Automatic Trigger Data Pipeline with AWS using AWS CDK
Kate Vu

Publish Date: Jul 8

This post describes how to build an automated data pipeline that ingests data and applies a simple transformation. The pipeline uses AWS Step Functions for orchestration and AWS Glue to transform the data. Amazon SNS notifies the user when a run fails or finishes.
I usually build infrastructure via AWS CDK with TypeScript, but for this one I’ll use Python, with great help from GitHub Copilot along the way.

Prerequisites:

  • AWS Accounts

Architecture

Datapipeline diagram

AWS Resources:

  • Amazon S3
  • AWS Lambda
  • AWS Step Functions
  • AWS Glue
  • Amazon Athena
  • Amazon SNS

1. Create the CDK project

    mkdir aws-cdk-glue && cd aws-cdk-glue
    cdk init app --language python
    source .venv/bin/activate
    python3 -m pip install -r requirements.txt
    
2. Create S3 buckets

Let’s quickly create a list of S3 buckets we’ll need.

```
from aws_cdk import Stack, RemovalPolicy
from aws_cdk import aws_s3 as s3
from constructs import Construct

class S3BucketsStack(Stack):
    def __init__(
        self, scope: Construct, construct_id: str, bucket_names: list, **kwargs
    ) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create S3 buckets from the provided list of bucket names
        for bucket_name in bucket_names:
            s3.Bucket(
                self,
                f"S3Bucket-{bucket_name}",
                bucket_name=bucket_name,
                versioned=True,  # Enable versioning for the bucket
                removal_policy=RemovalPolicy.DESTROY,  # Delete the bucket with the stack (fails if the bucket is not empty)
                auto_delete_objects=False,  # Do not empty the bucket automatically on deletion
            )
```

Now, define the list of bucket names and create them in your app.py file:

# Create the S3 buckets stack
bucket_names = [
    "kate-source-data",
    "kate-staging-data",
    "kate-error-staging-data",
    "kate-transform-data",
    "kate-error-transform-data",
]
s3_buckets_stack = S3BucketsStack(
    app,
    construct_id="S3BucketsStack",
    bucket_names=bucket_names,
    env=environment,
)
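
S3 bucket names are shared across all AWS accounts, so a deployment fails if a name is malformed or already taken. The following is a minimal pre-flight sketch that checks a few of the documented naming rules before the list is passed to the stack. `invalid_bucket_names` is a hypothetical helper, not part of the CDK, and it covers only a subset of the rules (it cannot tell whether a name is already taken).

```python
import re

# Subset of the S3 general-purpose bucket naming rules: 3-63 characters,
# lowercase letters, digits, dots and hyphens, starting and ending with a
# letter or digit.
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")
# Names formatted like an IPv4 address are not allowed.
_IP_RE = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")


def invalid_bucket_names(bucket_names):
    """Return the names that break the subset of rules checked here."""
    bad = []
    for name in bucket_names:
        if not _NAME_RE.match(name) or ".." in name or _IP_RE.match(name):
            bad.append(name)
    return bad


bucket_names = [
    "kate-source-data",
    "kate-staging-data",
    "kate-error-staging-data",
    "kate-transform-data",
    "kate-error-transform-data",
]
problems = invalid_bucket_names(bucket_names)
if problems:
    raise ValueError(f"Invalid bucket names: {problems}")
```

Running a check like this in app.py before creating the stack turns a slow CloudFormation failure into an immediate local error.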

Run the command below to deploy the stack:

cdk deploy S3BucketsStack

Then head over to the AWS Console to verify that all the buckets have been created.

3. Generate data

To generate test data, run the following command:

python ./generate_data.py 1000 1000

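This will create two CSV files in the test_data folder:

  • customer_data.csv: contains 1,000 customer records
  • transaction_data.csv: contains 1,000 transaction records

These files will be used as source data in the pipeline. The transaction file uses the column name amounttt, which the transformation step later renames to amount.

The generate_data.py script:
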
import csv
import random
import argparse
import uuid
import os  # Import os for folder creation


def generate_customer_data(num_records):
    first_names = ["John", "Jane", "Alice", "Bob", "Charlie", "Diana"]
    last_names = ["Smith", "Doe", "Johnson", "Williams", "Brown", "Jones"]
    genders = ["Male", "Female"]

    data = []
    for i in range(1, num_records + 1):
        first_name = random.choice(first_names)
        last_name = random.choice(last_names)
        gender = random.choice(genders)
        data.append(
            {
                "id": i,
                "first_name": first_name,
                "last_name": last_name,
                "gender": gender,
            }
        )

    return data


def save_to_csv(data, folder, filename):
    # Ensure the folder exists
    os.makedirs(folder, exist_ok=True)
    filepath = os.path.join(folder, filename)
    with open(filepath, mode="w", newline="") as file:
        writer = csv.DictWriter(file, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)


def generate_transaction_data(customer_ids, num_records):
    transaction_types = ["Purchase", "Refund", "Exchange"]
    data = []
    for _ in range(num_records):
        transaction_id = str(uuid.uuid4())  # Generate a unique transaction ID
        customer_id = random.choice(customer_ids)
        transaction_type = random.choice(transaction_types)
        amount = round(random.uniform(10.0, 500.0), 2)
        data.append(
            {
                "transaction_id": transaction_id,
                "customer_id": customer_id,
                "transaction_type": transaction_type,
                "amounttt": amount,
            }
        )

    return data


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Generate customer and transaction data."
    )
    parser.add_argument(
        "num_customer_records", type=int, help="Number of customer records to generate"
    )
    parser.add_argument(
        "num_transaction_records",
        type=int,
        help="Number of transaction records to generate",
    )
    args = parser.parse_args()

    # Define the folder to save files
    folder = "test_data"

    # Generate customer data
    customer_data = generate_customer_data(args.num_customer_records)
    save_to_csv(customer_data, folder, "customer_data.csv")

    # Extract customer IDs
    customer_ids = [customer["id"] for customer in customer_data]

    # Generate transaction data
    transaction_data = generate_transaction_data(
        customer_ids, args.num_transaction_records
    )
    save_to_csv(transaction_data, folder, "transaction_data.csv")
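
Before uploading the files, it can help to confirm that every transaction points at a customer that exists. The sketch below is one way to do that with the standard library only. `check_generated_data` is a hypothetical helper, and it assumes the file and column names produced by the generator script.

```python
import csv
import os


def check_generated_data(folder="test_data"):
    """Return (customer count, transaction count, orphan count) for the CSVs."""
    with open(os.path.join(folder, "customer_data.csv"), newline="") as f:
        customer_ids = {row["id"] for row in csv.DictReader(f)}
    with open(os.path.join(folder, "transaction_data.csv"), newline="") as f:
        transactions = list(csv.DictReader(f))
    # A transaction is an orphan when its customer_id is not a known customer
    orphans = [t for t in transactions if t["customer_id"] not in customer_ids]
    return len(customer_ids), len(transactions), len(orphans)
```

Calling `check_generated_data()` after the generator has run should report zero orphans, because the generator only picks customer IDs it has just created.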

4. Create PySpark script to ingest the data

In this step, we’ll create a PySpark script to handle the data ingestion. The script will:

  • Check that all required files exist in the source data bucket. If any file is missing, it logs an error and exits, which fails the job.
  • If all required files are present:
    • Read the files from S3
    • Add ingestion_start_time and ingestion_finish_time columns
    • Write the data to the output path in Parquet format
  • If there’s an error while processing a file:
    • Copy the original file to the error bucket
    • Notify the SNS topic

import sys
import logging
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from awsglue.utils import getResolvedOptions
import boto3
from botocore.exceptions import ClientError

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def check_files_exist(s3_client, bucket, env_name, file_path, file_names):
    """
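    Check if the specified files exist in the given S3 bucket.

    :param s3_client: Boto3 S3 client
    :param bucket: Name of the S3 bucket
    :param env_name: Environment name (currently unused)
    :param file_path: Prefix (folder) that contains the files
    :param file_names: List of file names to check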
    """
    for file_name in file_names:
        try:
            s3_client.head_object(Bucket=bucket, Key=f"{file_path}/{file_name}")
            logger.info(f"File {file_path}/{file_name} exists in bucket {bucket}.")
        except s3_client.exceptions.ClientError as e:
            logger.error(
                f"File {file_path}/{file_name} does not exist in bucket {bucket}: {e} Exiting."
            )
            sys.exit(1)


def delete_directory_in_s3(s3_client, bucket, directory_path):
    """
    Delete all files in the specified directory in the S3 bucket.

    :param s3_client: Boto3 S3 client
    :param bucket: Name of the S3 bucket
    :param directory_path: Path of the directory to delete
    """
    try:
        # Paginate so that prefixes holding more than 1,000 objects are fully cleared
        paginator = s3_client.get_paginator("list_objects_v2")
        deleted_any = False
        for page in paginator.paginate(Bucket=bucket, Prefix=directory_path):
            for obj in page.get("Contents", []):
                s3_client.delete_object(Bucket=bucket, Key=obj["Key"])
                logger.info(f"Deleted file: {obj['Key']} from bucket {bucket}")
                deleted_any = True
        if not deleted_any:
            logger.info(f"No files found in {directory_path} to delete.")
    except Exception as e:
        logger.error(f"Error deleting directory {directory_path}: {e}")
        sys.exit(1)


def notify_sns(sns_client, topic_arn, message):
    """
    Send a notification to an SNS topic.

    :param sns_client: Boto3 SNS client
    :param topic_arn: ARN of the SNS topic
    :param message: Message to send
    """
    try:
        sns_client.publish(TopicArn=topic_arn, Message=message)
        logger.info(f"Notification sent to SNS topic: {topic_arn}")
    except ClientError as e:
        logger.error(f"Failed to send notification to SNS topic: {e}")
        sys.exit(1)


def process_file(
    spark,
    s3_client,
    sns_client,
    sns_topic_arn,
    input_bucket,
    output_bucket,
    error_bucket,
    file_path,
    file_name,
    env_name,
    current_time,
):
    """
    Process a single file: read from S3, transform, and write to S3.

    :param spark: SparkSession object
    :param s3_client: Boto3 S3 client
    :param sns_client: Boto3 SNS client
    :param sns_topic_arn: ARN of the SNS topic
    :param input_bucket: Name of the input S3 bucket
    :param output_bucket: Name of the output S3 bucket
    :param error_bucket: Name of the error S3 bucket
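    :param file_path: Prefix (folder) of the file in the input bucket
    :param file_name: Name of the file to process
    :param env_name: Environment name (e.g., dev, prod)
    :param current_time: UTC datetime used as the ingestion start time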
    """
    input_s3_path = f"s3://{input_bucket}/{file_path}/{file_name}"
    output_s3_path = (
        f"s3://{output_bucket}/{env_name}/staging_{file_name.split('.')[0]}/"
    )
    error_s3_path = f"s3://{error_bucket}/{env_name}/error_{file_name}"

    logger.info(f"Processing file: {file_name}")
    logger.info(f"Reading from: {input_s3_path}")

    try:
        # Get the current UTC time for ingestion start
        ingestion_start_time = current_time.strftime("%Y-%m-%d %H:%M:%S")

        # Read CSV file from S3
        df = spark.read.csv(input_s3_path, header=True, inferSchema=True)
        logger.info("Finished reading file from S3")
        if not df.columns:
            raise RuntimeError(
                "The DataFrame is empty. Cannot proceed with processing."
            )

        # Add ingestion_start_time column to the DataFrame
        df = df.withColumn("ingestion_start_time", lit(ingestion_start_time))

        # Get the current UTC time for ingestion finish
        ingestion_finish_time = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

        # Add ingestion_finish_time column to the DataFrame
        df = df.withColumn("ingestion_finish_time", lit(ingestion_finish_time))

        # Write the DataFrame to S3 in Parquet format
        logger.info(f"Writing to: {output_s3_path}")
        df.write.parquet(output_s3_path, mode="overwrite")
        logger.info(f"Successfully processed and saved file: {file_name}")

    except Exception as e:
        logger.error(f"Error processing file {file_name}: {e}")
        logger.info(f"Copying original file to error bucket: {error_s3_path}")
        try:
            s3_client.copy_object(
                Bucket=error_bucket,
                CopySource={"Bucket": input_bucket, "Key": f"{file_path}/{file_name}"},
                Key=f"{env_name}/error_{file_name}",
            )
            logger.info(f"Successfully copied file {file_name} to error bucket.")
        except ClientError as copy_error:
            logger.error(
                f"Failed to copy file {file_name} to error bucket: {copy_error}"
            )

        # Notify SNS about the error
        error_message = f"Error processing file {file_name} in environment {env_name}. Original file copied to error bucket."
        notify_sns(sns_client, sns_topic_arn, error_message)


def main():
    # Get arguments passed to the Glue job
    args = getResolvedOptions(
        sys.argv,
        [
            "env_name",
            "input_bucket",
            "output_bucket",
            "error_bucket",
            "file_path",
            "file_names",
            "sns_topic_arn",  # Added SNS topic ARN argument
            "JOB_NAME",
        ],
    )

    # Extract input and output S3 paths from arguments
    input_bucket = args["input_bucket"]
    output_bucket = args["output_bucket"]
    error_bucket = args["error_bucket"]
    file_path = args["file_path"]
    file_names = args["file_names"].split(
        ","
    )  # Expecting a comma-separated list of file names
    env_name = args["env_name"]
    sns_topic_arn = args["sns_topic_arn"]  # Extract SNS topic ARN

    # Initialize S3 and SNS clients
    s3_client = boto3.client("s3")
    sns_client = boto3.client("sns")

    # Check if files exist in the input bucket
    check_files_exist(s3_client, input_bucket, env_name, file_path, file_names)

    # Delete the entire directory in the output bucket before processing files
    output_directory_path = f"{env_name}/"
    delete_directory_in_s3(s3_client, output_bucket, output_directory_path)

    # Delete the error bucket folder before processing files
    error_directory_path = f"{env_name}/"
    delete_directory_in_s3(s3_client, error_bucket, error_directory_path)

    # Initialize Spark session
    spark = SparkSession.builder.appName(args["JOB_NAME"]).getOrCreate()

    # Process each file
    current_time = datetime.utcnow()
    for file_name in file_names:
        process_file(
            spark,
            s3_client,
            sns_client,
            sns_topic_arn,
            input_bucket,
            output_bucket,
            error_bucket,
            file_path,
            file_name,
            env_name,
            current_time,
        )

    # Stop the Spark session
    spark.stop()
    logger.info("Spark session stopped.")


if __name__ == "__main__":
    main()
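
The existence check in the script stops at the first missing file. A possible variant, sketched here, collects every missing key first so that a single log line or notification can list them all. `find_missing_keys` and the `exists` callable are hypothetical names. The lookup is injected so that the sketch runs without AWS access.

```python
def find_missing_keys(exists, file_path, file_names):
    """Return every expected key for which exists(key) is False."""
    keys = [f"{file_path}/{name}" for name in file_names]
    return [key for key in keys if not exists(key)]


# Stand-in for the bucket contents so the sketch runs without AWS.
# With boto3, `exists` could wrap s3_client.head_object and return False
# when the call raises ClientError.
present = {"test/customer_data.csv"}
missing = find_missing_keys(
    lambda key: key in present,
    "test",
    ["customer_data.csv", "transaction_data.csv"],
)
print(missing)  # ['test/transaction_data.csv']
```

Reporting all missing files at once saves a rerun when more than one upload has failed.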

5. Create PySpark script to transform the data
For this example, the transformation is very simple: it renames the misspelled amounttt column in the transaction_data file to amount.

import sys
import logging
from datetime import datetime
from pyspark.sql import SparkSession
from awsglue.utils import getResolvedOptions
from pyspark.sql.functions import col
import boto3
from botocore.exceptions import ClientError

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

def notify_sns(sns_client, topic_arn, message):
    """
    Send a notification to an SNS topic.

    :param sns_client: Boto3 SNS client
    :param topic_arn: ARN of the SNS topic
    :param message: Message to send
    """
    try:
        sns_client.publish(TopicArn=topic_arn, Message=message)
        logger.info(f"Notification sent to SNS topic: {topic_arn}")
    except ClientError as e:
        logger.error(f"Failed to send notification to SNS topic: {e}")

def process_file(
    spark,
    input_bucket,
    output_bucket,
    error_bucket,
    env_name,
    file_name,
    sns_client,
    sns_topic_arn,
):
    """
    Process individual files based on their type.

    :param spark: SparkSession object
    :param input_bucket: Name of the input S3 bucket
    :param output_bucket: Name of the output S3 bucket
    :param error_bucket: Name of the error S3 bucket
    :param env_name: Environment name (e.g., dev, prod)
    :param file_name: Name of the file to process
    :param sns_client: Boto3 SNS client
    :param sns_topic_arn: ARN of the SNS topic
    """
    input_path = f"s3://{input_bucket}/{env_name}/staging_{file_name}"
    output_path = f"s3://{output_bucket}/{env_name}/transformation_{file_name}"
    error_path = f"s3://{error_bucket}/{env_name}/{file_name}_error.log"

    logger.info(f"Processing file: {file_name}")
    logger.info(f"Reading data from: {input_path}")

    try:
        # Read Parquet file from S3
        df = spark.read.parquet(input_path)

        if file_name == "customer_data":
            # Copy customer_data as is
            logger.info("Copying customer_data without transformation.")
        elif file_name == "transaction_data":
            # Rename column 'amounttt' to 'amount' for transaction_data
            logger.info("Renaming column 'amounttt' to 'amount' for transaction_data.")
            df = df.withColumnRenamed("amounttt", "amount")
        else:
            logger.warning(f"Unknown file type: {file_name}. Skipping transformation.")

        # Write the processed data to S3 in Parquet format
        logger.info(f"Writing processed data to: {output_path}")
        df.write.parquet(output_path, mode="overwrite")

        logger.info(f"Successfully processed file: {file_name}")
    except Exception as e:
        logger.error(f"Error processing file {file_name}: {e}")
        logger.info(f"Copying original file to error bucket: {error_path}")
        try:
            s3_client = boto3.client("s3")
            s3_client.copy_object(
                Bucket=error_bucket,
                CopySource={
                    "Bucket": input_bucket,
                    "Key": f"{env_name}/staging_{file_name}",
                },
                Key=f"{env_name}/{file_name}_error.log",
            )
            logger.info(f"Successfully copied file {file_name} to error bucket.")
        except ClientError as copy_error:
            logger.error(
                f"Failed to copy file {file_name} to error bucket: {copy_error}"
            )

        # Notify SNS about the error
        error_message = f"Error processing file {file_name} in environment {env_name}. Original file copied to error bucket."
        notify_sns(sns_client, sns_topic_arn, error_message)

def main():
    # Get arguments passed to the Glue job
    args = getResolvedOptions(
        sys.argv,
        [
            "JOB_NAME",
            "env_name",
            "input_bucket",
            "output_bucket",
            "error_bucket",
            "file_names",
            "sns_topic_arn",  # Added SNS topic ARN argument
        ],
    )

    # Extract arguments
    env_name = args["env_name"]
    input_bucket = args["input_bucket"]
    output_bucket = args["output_bucket"]
    error_bucket = args["error_bucket"]
    file_names = args["file_names"].split(",")  # Comma-separated list of file names
    sns_topic_arn = args["sns_topic_arn"]

    # Initialize Spark session
    spark = SparkSession.builder.appName(args["JOB_NAME"]).getOrCreate()

    # Initialize SNS client
    sns_client = boto3.client("sns")

    # Loop through files and process them
    for file_name in file_names:
        process_file(
            spark,
            input_bucket,
            output_bucket,
            error_bucket,
            env_name,
            file_name,
            sns_client,
            sns_topic_arn,
        )

    # Stop the Spark session
    spark.stop()
    logger.info("Spark session stopped.")

if __name__ == "__main__":
    main()
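
The if/elif chain in the script works for two datasets. If more datasets or more renames are added, a lookup table may be easier to maintain. The sketch below shows the idea on plain column-name lists. `COLUMN_RENAMES` and `renamed_columns` are hypothetical names, not part of the script above.

```python
# Hypothetical rename table: dataset name -> {old column: new column}
COLUMN_RENAMES = {
    "transaction_data": {"amounttt": "amount"},
}


def renamed_columns(dataset, columns):
    """Return the column names after applying the renames for a dataset."""
    renames = COLUMN_RENAMES.get(dataset, {})
    return [renames.get(column, column) for column in columns]


print(
    renamed_columns(
        "transaction_data",
        ["transaction_id", "customer_id", "transaction_type", "amounttt"],
    )
)  # ['transaction_id', 'customer_id', 'transaction_type', 'amount']

# In the Glue job the same table could drive the DataFrame rename:
#     for old, new in COLUMN_RENAMES.get(file_name, {}).items():
#         df = df.withColumnRenamed(old, new)
```

Datasets without an entry pass through unchanged, which matches how the script copies customer_data as is.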

6. Create Glue jobs for ingestion and transformation
First, create a new construct to handle this. The construct will:

  • Create an IAM role that allows the Glue job to:
    • Read from the source data bucket
    • Write to the destination and error buckets
    • Publish messages to the SNS topic
  • Create the Glue job

from aws_cdk import (
    aws_glue as glue,
    aws_iam as iam,
    aws_s3_assets as s3_assets,
)
from constructs import Construct
import os.path as path


class GlueContruct(Construct):

    def __init__(
        self,
        scope: Construct,
        id: str,
        env_name: str,
        input_bucket: str,
        output_bucket: str,
        error_bucket: str,
        file_names: list,
        script_file_path: str,
        glue_job_prefix: str,
        sns_topic_arn: str,
        **kwargs,
    ) -> None:
        super().__init__(scope, id, **kwargs)
        # Define an IAM role for the Glue job
        glue_role = iam.Role(
            self,
            "GlueJobRole",
            assumed_by=iam.ServicePrincipal("glue.amazonaws.com"),
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name(
                    "service-role/AWSGlueServiceRole"
                )
            ],
        )

        # Add permissions to read from the input bucket and write to the output bucket
        glue_role.add_to_policy(
            iam.PolicyStatement(
                actions=["s3:GetObject", "s3:ListBucket"],
                resources=[
                    f"arn:aws:s3:::{input_bucket}/{env_name}/*",
                    f"arn:aws:s3:::{input_bucket}/{env_name}",
                ],
            )
        )

        glue_role.add_to_policy(
            iam.PolicyStatement(
                actions=[
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:DeleteObject",
                ],
                resources=[
                    f"arn:aws:s3:::{output_bucket}/{env_name}/*",
                    f"arn:aws:s3:::{output_bucket}/{env_name}",
                    f"arn:aws:s3:::{error_bucket}/{env_name}/*",
                    f"arn:aws:s3:::{error_bucket}/{env_name}",
                ],
            )
        )

        # Add permissions to publish messages to the SNS topic
        glue_role.add_to_policy(
            iam.PolicyStatement(
                actions=["sns:Publish"],
                resources=[sns_topic_arn],
            )
        )

        # Upload the Glue script to an S3 bucket using an S3 asset
        glue_script_asset = s3_assets.Asset(
            self,
            "GlueScriptAsset",
            path=path.join(
                path.dirname(__file__), script_file_path
            ),  # Replace with the local path to your script
        )

        glue_script_asset.grant_read(glue_role)

        # Define the Glue job
        self.glue_job = glue.CfnJob(
            self,
            glue_job_prefix + env_name,
            name=f"{glue_job_prefix}-{env_name}",
            role=glue_role.role_arn,
            command={
                "name": "glueetl",
                "scriptLocation": glue_script_asset.s3_object_url,
                "pythonVersion": "3",
            },
            default_arguments={
                "--env_name": env_name,
                "--input_bucket": input_bucket,
                "--output_bucket": output_bucket,
                "--error_bucket": error_bucket,
                "--file_names": ",".join(file_names),
                "--file_path": "test",  # Assuming the source files are under the 'test/' prefix for now
                "--sns_topic_arn": sns_topic_arn,
            },
            max_retries=0,
            timeout=10,
            glue_version="5.0",
        )
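
One detail worth knowing when scoping S3 permissions to a prefix: s3:GetObject, s3:PutObject and s3:DeleteObject apply to object ARNs, while s3:ListBucket is a bucket-level action and applies to the bucket ARN. The sketch below builds the two statements as plain IAM JSON so the shape is easy to inspect. `s3_prefix_statements` is a hypothetical helper, and the exact prefix pattern in the condition is an assumption that may need adjusting to how the job lists objects.

```python
import json


def s3_prefix_statements(bucket, prefix, object_actions):
    """Build IAM statements scoped to one prefix of one bucket."""
    return [
        {
            # Object-level actions take object ARNs
            "Effect": "Allow",
            "Action": sorted(object_actions),
            "Resource": [f"arn:aws:s3:::{bucket}/{prefix}/*"],
        },
        {
            # s3:ListBucket takes the bucket ARN; the s3:prefix condition
            # narrows the listing to keys under the prefix
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": [f"arn:aws:s3:::{bucket}"],
            "Condition": {"StringLike": {"s3:prefix": [f"{prefix}/*"]}},
        },
    ]


statements = s3_prefix_statements("kate-source-data", "test", ["s3:GetObject"])
print(json.dumps(statements, indent=2))
```

In CDK, the same shape can be expressed with `iam.PolicyStatement`, passing the condition through its `conditions` argument.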

Now we can use the new construct to create two Glue jobs:

  • Ingestion job: runs the PySpark script that validates and ingests the data
  • Transformation job: runs the script that corrects the column name

        # Create the Glue ingestion construct
        glue_ingestion = GlueContruct(
            self,
            "GlueIngestion",
            env_name=env_name,
            input_bucket=account_config["ingestion"]["input_bucket"],
            output_bucket=account_config["ingestion"]["output_bucket"],
            error_bucket=account_config["ingestion"]["error_bucket"],
            file_names=account_config["ingestion"]["file_names"],
            script_file_path="../../scripts/glue/ingestion.py",  # Path to your Glue script
            glue_job_prefix="IngestionJob",
            sns_topic_arn=sns_topic.topic_arn,  # Pass the SNS topic ARN
        )

        glue_transformation = GlueContruct(
            self,
            "GlueTransformation",
            env_name=env_name,
            input_bucket=account_config["transformation"]["input_bucket"],
            output_bucket=account_config["transformation"]["output_bucket"],
            error_bucket=account_config["transformation"]["error_bucket"],
            file_names=account_config["transformation"]["file_names"],
            script_file_path="../../scripts/glue/transformation.py",  # Path to your Glue script
            glue_job_prefix="TransformationJob",
            sns_topic_arn=sns_topic.topic_arn,  # Pass the SNS topic ARN
        )
        # Output Glue job names
        add_output(self, "GlueIngestionJobName", glue_ingestion.glue_job.name)
        add_output(self, "GlueTransformationJobName", glue_transformation.glue_job.name)
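
The snippet above reads its settings from `account_config`, which is not shown. The sketch below is one plausible shape for it, not the author's actual configuration. The bucket names reuse the list from step 2, and which bucket plays which role is an assumption. The file names differ between the two stages because the ingestion script reads files with their extension, while the transformation script compares against bare dataset names. `check_stage_handoff` is a hypothetical helper.

```python
# Hypothetical shape of account_config for one environment
account_config = {
    "ingestion": {
        "input_bucket": "kate-source-data",
        "output_bucket": "kate-staging-data",
        "error_bucket": "kate-error-staging-data",
        # The ingestion script reads these objects, so extensions are kept
        "file_names": ["customer_data.csv", "transaction_data.csv"],
    },
    "transformation": {
        "input_bucket": "kate-staging-data",
        "output_bucket": "kate-transform-data",
        "error_bucket": "kate-error-transform-data",
        # The transformation script compares against bare dataset names
        "file_names": ["customer_data", "transaction_data"],
    },
}


def check_stage_handoff(config):
    """Raise if the transformation stage would not find the ingestion output."""
    ingestion, transformation = config["ingestion"], config["transformation"]
    if transformation["input_bucket"] != ingestion["output_bucket"]:
        raise ValueError("transformation must read from the ingestion output bucket")
    # Ingestion writes to staging_<name without extension>
    produced = {name.split(".")[0] for name in ingestion["file_names"]}
    missing = set(transformation["file_names"]) - produced
    if missing:
        raise ValueError(f"no ingestion output for: {sorted(missing)}")


check_stage_handoff(account_config)
```

A check like this catches a mismatch between the two stages at synth time rather than in a failed Glue run.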

7. Create database, table, crawler
Since we want to query the data later using Amazon Athena, we need to create the following components:

  • An AWS Glue Database
  • AWS Glue Tables
  • Glue Crawlers to crawl the data and update the AWS Glue Data Catalog

Glue Crawlers can create tables automatically, which is convenient. However, creating the tables manually gives you more control over the metadata, which is useful if your AWS account has Lake Formation enabled and you want to manage access through the data lake.

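
A manually defined table only returns rows if its location matches the prefix the Glue jobs write to. The sketch below spells out that naming convention for the staging layer, assuming the table is named after the source file without its extension. The function names and the "test" environment name are hypothetical.

```python
import os


def staging_output_prefix(bucket, env_name, file_name):
    """Prefix the ingestion job writes to (mirrors the ingestion script)."""
    return f"s3://{bucket}/{env_name}/staging_{file_name.split('.')[0]}/"


def table_location(bucket, env_name, table_name):
    """Location a catalog table should point at for the same data."""
    return f"s3://{bucket}/{env_name}/{table_name}/"


file_name = "customer_data.csv"
table_name = f"staging_{os.path.splitext(file_name)[0]}"

# The table location and the job output prefix must be identical
assert table_location("kate-staging-data", "test", table_name) == staging_output_prefix(
    "kate-staging-data", "test", file_name
)
print(table_name)  # staging_customer_data
```

Note that `split('.')[0]` and `os.path.splitext` agree only for file names with a single dot, so names such as `data.v2.csv` would need the same rule on both sides.
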
import os
from aws_cdk import aws_glue as glue
from aws_cdk import aws_iam as iam
from constructs import Construct
import aws_cdk.aws_lakeformation as lakeformation
from aws_cdk_glue.utils.utils import add_output  # Import the add_output function


def create_glue_role(
    scope: Construct,
    id: str,
    env_name: str,
    output_bucket: str,
    account_id: str,
    region: str,
) -> iam.Role:
    """Create an IAM role for the Glue crawler with necessary permissions."""
    crawler_role = iam.Role(
        scope,
        id,
        assumed_by=iam.ServicePrincipal("glue.amazonaws.com"),
        managed_policies=[
            iam.ManagedPolicy.from_aws_managed_policy_name(
                "service-role/AWSGlueServiceRole"
            )
        ],
    )

    # Add permissions to access the staging bucket
    crawler_role.add_to_policy(
        iam.PolicyStatement(
            actions=[
                "s3:GetObject",
                "s3:ListBucket",
                "s3:PutObject",
                "s3:DeleteObject",
            ],
            resources=[
                f"arn:aws:s3:::{output_bucket}",
                f"arn:aws:s3:::{output_bucket}/{env_name}/*",
            ],
        )
    )

    # Add permissions to access the Glue Data Catalog database that Athena queries
    crawler_role.add_to_policy(
        iam.PolicyStatement(
            actions=[
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:GetTable",
                "glue:UpdateTable",
                "glue:CreateTable",
                "glue:UpdatePartition",
                "glue:GetPartition",
                "glue:BatchGetPartition",
                "glue:BatchCreatePartition",
            ],
            resources=[
                f"arn:aws:glue:{region}:{account_id}:catalog",
                f"arn:aws:glue:{region}:{account_id}:database/{env_name}_database",
            ],
        )
    )

    return crawler_role


def create_glue_table(
    scope: Construct,
    id: str,
    table_name: str,  # Pass table name as a parameter
    env_name: str,
    output_bucket: str,
    account_id: str,
    glue_database: glue.CfnDatabase,
) -> glue.CfnTable:
    """Create a Glue table for the Athena database."""
    glue_table = glue.CfnTable(
        scope,
        id,
        catalog_id=account_id,  # Assign account_id to catalog_id
        database_name=glue_database.ref,  # Reference the Glue database
        table_input={
            "name": table_name,  # Use the passed table name
            "storageDescriptor": {
                "location": f"s3://{output_bucket}/{env_name}/{table_name}/",  # Path to the data in S3
                "inputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",  # Input format for the table
                "outputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",  # Output format for the table
                "serdeInfo": {
                    "serializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",  # SerDe library
                    "parameters": {"classification": "Parquet"},
                },
            },
            "tableType": "EXTERNAL_TABLE",  # Define the table type as external
        },
    )
    return glue_table


class GlueTable(Construct):
    def __init__(
        self,
        scope: Construct,
        id: str,
        env_name: str,
        staging_bucket: str,
        staging_file_names: list,
        transformation_bucket: str,
        transformation_file_names: list,
        account_id: str,
        region: str,
        **kwargs,
    ) -> None:
        super().__init__(scope, id, **kwargs)
        database_name = f"{env_name}_database"

        tag_key = "kate"
        tag_values = ["test"]

        # Define the Glue database
        glue_database = glue.CfnDatabase(
            self,
            "GlueDatabase",
            catalog_id=account_id,
            database_input={"name": database_name},
            database_name=database_name,
        )

        lf_tag_pair_property = lakeformation.CfnTagAssociation.LFTagPairProperty(
            catalog_id=account_id, tag_key=tag_key, tag_values=tag_values
        )

        tag_association = lakeformation.CfnTagAssociation(
            self,
            "TagAssociation",
            lf_tags=[lf_tag_pair_property],
            resource=lakeformation.CfnTagAssociation.ResourceProperty(
                database=lakeformation.CfnTagAssociation.DatabaseResourceProperty(
                    catalog_id=account_id, name=database_name
                )
            ),
        )
        tag_association.node.add_dependency(glue_database)

        crawler_role_staging: iam.Role = create_glue_role(
            self,
            f"GlueCrawlerRoleStaging-{env_name}",
            env_name,
            staging_bucket,
            account_id,
            region,
        )

        # Grant permissions for database
        grant_staging_crawler_database_access = lakeformation.CfnPermissions(
            self,
            "LFDatabasePermissions",
            data_lake_principal={
                "dataLakePrincipalIdentifier": crawler_role_staging.role_arn
            },
            resource=lakeformation.CfnPermissions.ResourceProperty(
                database_resource=lakeformation.CfnPermissions.DatabaseResourceProperty(
                    catalog_id=account_id, name=database_name
                ),
            ),
            permissions=["ALTER", "DROP", "DESCRIBE", "CREATE_TABLE"],
        )

        grant_staging_crawler_database_access.node.add_dependency(
            glue_database, tag_association
        )

        for file_name in staging_file_names:
            # Remove file type from file name
            table_name = f"staging_{os.path.splitext(file_name)[0]}"  # Extract the base name without file extension

            # Create a Glue table for each file name
            glue_table = create_glue_table(
                self,
                f"StagingGlueTable-{table_name}",
                table_name=table_name,  # Use the file name as the table name
                env_name=env_name,
                output_bucket=staging_bucket,
                account_id=account_id,
                glue_database=glue_database,
            )
            # Grant permissions for tables
            grant_table_access = lakeformation.CfnPermissions(
                self,
                f"StagingGlueTableLFTablePermissions-{file_name}",
                data_lake_principal={
                    "dataLakePrincipalIdentifier": crawler_role_staging.role_arn
                },
                resource=lakeformation.CfnPermissions.ResourceProperty(
                    table_resource=lakeformation.CfnPermissions.TableResourceProperty(
                        database_name=database_name, name=table_name
                    )
                ),
                permissions=["SELECT", "ALTER", "DROP", "INSERT", "DESCRIBE"],
            )
            grant_table_access.node.add_dependency(glue_database, tag_association)

            # Output the Glue table name using add_output
            add_output(
                self,
                f"StagingGlueTableName-{table_name}",
                glue_table.ref,
            )

        # Define the Glue crawler
        glue_crawler_staging = glue.CfnCrawler(
            self,
            "GlueCrawlerStaging",
            name=f"{env_name}_staging_crawler",
            role=crawler_role_staging.role_arn,  # Use the created IAM role
            database_name=glue_database.ref,
            targets={"s3Targets": [{"path": f"s3://{staging_bucket}/{env_name}/"}]},
        )

        # Output the Glue crawler name using add_output
        add_output(
            self,
            "GlueStagingCrawlerName",
            glue_crawler_staging.ref,
        )

        # Transformation Process
        crawler_role_transformation: iam.Role = create_glue_role(
            self,
            f"GlueCrawlerRoleTransformation-{env_name}",
            env_name,
            transformation_bucket,
            account_id,
            region,
        )

        grant_transformation_crawler_database_access = lakeformation.CfnPermissions(
            self,
            "LFDatabasePermissionsTransformation",
            data_lake_principal={
                "dataLakePrincipalIdentifier": crawler_role_transformation.role_arn
            },
            resource=lakeformation.CfnPermissions.ResourceProperty(
                database_resource=lakeformation.CfnPermissions.DatabaseResourceProperty(
                    catalog_id=account_id, name=database_name
                ),
            ),
            permissions=["ALTER", "DROP", "DESCRIBE", "CREATE_TABLE"],
        )

        grant_transformation_crawler_database_access.node.add_dependency(
            glue_database, tag_association
        )

        for file_name in transformation_file_names:
            table_name = f"transformation_{os.path.splitext(file_name)[0]}"

            glue_table = create_glue_table(
                self,
                f"TransformationGlueTable-{table_name}",
                table_name=table_name,
                env_name=env_name,
                output_bucket=transformation_bucket,
                account_id=account_id,
                glue_database=glue_database,
            )

            grant_table_access = lakeformation.CfnPermissions(
                self,
                f"TransformationGlueTableLFTablePermissions-{file_name}",
                data_lake_principal={
                    "dataLakePrincipalIdentifier": crawler_role_transformation.role_arn
                },
                resource=lakeformation.CfnPermissions.ResourceProperty(
                    table_resource=lakeformation.CfnPermissions.TableResourceProperty(
                        database_name=database_name, name=table_name
                    )
                ),
                permissions=["SELECT", "ALTER", "DROP", "INSERT", "DESCRIBE"],
            )
            grant_table_access.node.add_dependency(glue_database, tag_association)

            # Output the Glue table name using add_output
            add_output(
                self,
                f"TransformationGlueTableName-{table_name}",
                glue_table.ref,
            )

        glue_crawler_transformation = glue.CfnCrawler(
            self,
            "GlueCrawlerTransformation",
            name=f"{env_name}_transformation_crawler",
            role=crawler_role_transformation.role_arn,
            database_name=glue_database.ref,
            targets={
                "s3Targets": [{"path": f"s3://{transformation_bucket}/{env_name}/"}]
            },
        )

        # Output the Glue crawler name using add_output
        add_output(
            self,
            "GlueTransformationCrawlerName",
            glue_crawler_transformation.ref,
        )

        self.glue_crawler_staging = glue_crawler_staging
        self.glue_crawler_transformation = glue_crawler_transformation

If Lake Formation is enabled in your AWS account, make sure your CDK execution role has the necessary permissions to create and access Glue resources.
In my case, I’m using LF-Tags to grant the required permissions to the CDK execution role.

DataLake_grant_permission

Also ensure that the Glue crawler roles have the appropriate Lake Formation permissions on the database and its tables.

8. Orchestrate the Flow Using Step Functions
Now we can orchestrate the entire pipeline using AWS Step Functions. We want the Step Function to complete only when all steps have run successfully.

  • For Glue jobs, it’s straightforward: add integration_pattern=sfn.IntegrationPattern.RUN_JOB to the task definition so that the Step Function waits for the job to complete before moving to the next step.
  • For crawlers, unfortunately, we cannot use RUN_JOB: the startCrawler call returns as soon as the crawler has started, not when it has finished.

integration_pattern (Optional[IntegrationPattern]) – AWS Step Functions integrates with services directly in the Amazon States Language. You can control these AWS services using service integration patterns. Depending on the AWS Service, the Service Integration Pattern availability will vary. Default: - IntegrationPattern.REQUEST_RESPONSE for most tasks. IntegrationPattern.RUN_JOB for the following exceptions: BatchSubmitJob, EmrAddStep, EmrCreateCluster, EmrTerminationCluster, and EmrContainersStartJobRun. (https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_stepfunctions_tasks/GlueStartJobRun.html)

To handle this, we can combine a Wait state, a getCrawler call, and a Choice state that loops until the crawler has actually completed. (https://repost.aws/questions/QUTgHzHs6bSN6n_79bCYohaQ/glue-crawler-in-state-machine-shows-as-complete-before-glue-data-catalog-is-updated)
The Step Function flow will look like this:


        # Define the ingestion Glue job task
        ingestion_glue_task = tasks.GlueStartJobRun(
            self,
            "IngestionGlueJob",
            glue_job_name=ingestion_glue_job_name,
            integration_pattern=sfn.IntegrationPattern.RUN_JOB,  # Wait for job completion
            arguments=sfn.TaskInput.from_object(
                {
                    "--file_path": sfn.JsonPath.string_at(
                        "$.file_path"
                    ),  # Pass file_path from input
                }
            ),
        )

        # Define the transformation Glue job task
        transformation_glue_task = tasks.GlueStartJobRun(
            self,
            "TransformationGlueJob",
            glue_job_name=transformation_glue_job_name,
            integration_pattern=sfn.IntegrationPattern.RUN_JOB,  # Wait for job completion
        )

        # Define the Glue crawler staging task
        glue_crawler_staging_task = tasks.CallAwsService(
            self,
            "GlueCrawlerStagingTask",
            service="glue",
            action="startCrawler",
            parameters={"Name": glue_crawler_staging_name},
            iam_resources=[
                f"arn:aws:glue:{region}:{account}:crawler/{glue_crawler_staging_name}"
            ],
        )

        # Define the Glue crawler transformation task
        glue_crawler_transformation_task = tasks.CallAwsService(
            self,
            "GlueCrawlerTransformationTask",
            service="glue",
            action="startCrawler",
            parameters={"Name": glue_crawler_transformation_name},
            iam_resources=[
                f"arn:aws:glue:{region}:{account}:crawler/{glue_crawler_transformation_name}"
            ],
        )

        # Define the SNS publish task
        sns_publish_task = tasks.CallAwsService(
            self,
            "SNSPublishTask",
            service="sns",
            action="publish",
            parameters={
                "TopicArn": sns_topic_arn,
                "Message": f"Step Function {env_name}-DataPipelineStateMachine has completed successfully.",
            },
            iam_resources=[f"arn:aws:sns:{region}:{account}:*"],
        )

        # Wait state for the staging crawler
        wait_staging = sfn.Wait(
            self,
            "WaitForStagingCrawler",
            time=sfn.WaitTime.duration(Duration.seconds(30)),
        )

        # GetCrawler state for the staging crawler
        get_staging_crawler = tasks.CallAwsService(
            self,
            "GetStagingCrawlerState",
            service="glue",
            action="getCrawler",
            parameters={"Name": glue_crawler_staging_name},
            iam_resources=[
                f"arn:aws:glue:{region}:{account}:crawler/{glue_crawler_staging_name}"
            ],
        )

        # Success and fail states for the staging crawler
        staging_success = sfn.Succeed(self, "StagingCrawlerSuccess")
        staging_failed = sfn.Fail(self, "StagingCrawlerFailed")

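        # Choice state for the staging crawler
        # Crawler.State is only ever READY, RUNNING or STOPPING; a failed run is
        # reported in Crawler.LastCrawl.Status, so check it once the crawler is READY
        staging_crawler_complete = sfn.Choice(self, "StagingCrawlerComplete")
        staging_crawler_complete.when(
            sfn.Condition.and_(
                sfn.Condition.string_equals("$.Crawler.State", "READY"),
                sfn.Condition.is_present("$.Crawler.LastCrawl.Status"),
                sfn.Condition.string_equals("$.Crawler.LastCrawl.Status", "FAILED"),
            ),
            staging_failed,
        )
        staging_crawler_complete.when(
            sfn.Condition.string_equals("$.Crawler.State", "READY"), staging_success
        )
        # Still RUNNING or STOPPING: go back to the Wait state and poll again
        staging_crawler_complete.otherwise(wait_staging)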

        # Wait state for the transformation crawler
        wait_transformation = sfn.Wait(
            self,
            "WaitForTransformationCrawler",
            time=sfn.WaitTime.duration(Duration.seconds(30)),
        )

        # GetCrawler state for the transformation crawler
        get_transformation_crawler = tasks.CallAwsService(
            self,
            "GetTransformationCrawlerState",
            service="glue",
            action="getCrawler",
            parameters={"Name": glue_crawler_transformation_name},
            iam_resources=[
                f"arn:aws:glue:{region}:{account}:crawler/{glue_crawler_transformation_name}"
            ],
        )

        # Success and fail states for the transformation crawler
        transformation_success = sfn.Succeed(self, "TransformationCrawlerSuccess")
        transformation_failed = sfn.Fail(self, "TransformationCrawlerFailed")

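        # Choice state for the transformation crawler
        # Crawler.State is only ever READY, RUNNING or STOPPING; a failed run is
        # reported in Crawler.LastCrawl.Status, so check it once the crawler is READY
        transformation_crawler_complete = sfn.Choice(
            self, "TransformationCrawlerComplete"
        )
        transformation_crawler_complete.when(
            sfn.Condition.and_(
                sfn.Condition.string_equals("$.Crawler.State", "READY"),
                sfn.Condition.is_present("$.Crawler.LastCrawl.Status"),
                sfn.Condition.string_equals("$.Crawler.LastCrawl.Status", "FAILED"),
            ),
            transformation_failed,
        )
        transformation_crawler_complete.when(
            sfn.Condition.string_equals("$.Crawler.State", "READY"),
            transformation_success,
        )
        # Still RUNNING or STOPPING: go back to the Wait state and poll again
        transformation_crawler_complete.otherwise(wait_transformation)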

        # Run transformation Glue job and Glue crawler staging in parallel
        parallel_tasks = sfn.Parallel(self, "ParallelTasks")
        parallel_tasks.branch(
            transformation_glue_task.next(glue_crawler_transformation_task)
            .next(wait_transformation)
            .next(get_transformation_crawler)
            .next(transformation_crawler_complete)
        )
        parallel_tasks.branch(
            glue_crawler_staging_task.next(wait_staging)
            .next(get_staging_crawler)
            .next(staging_crawler_complete)
        )

        # Chain the ingestion Glue job, the parallel branches, and the SNS publish task
        definition = ingestion_glue_task.next(parallel_tasks).next(sns_publish_task)

        # Create the Step Function
        self.state_machine = sfn.StateMachine(
            self,
            f"{env_name}-DataPipelineStateMachine",
            definition=definition,
        )
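If the Wait, GetCrawler and Choice loop is hard to picture, it behaves roughly like the plain-Python polling loop below. This is only an illustrative sketch and not part of the pipeline: `wait_for_crawler` and its parameters are hypothetical names, and the `get_crawler` callable is injected so that it could be `boto3.client("glue").get_crawler` in a real script or a stub in a test.

```python
import time
from typing import Callable


def wait_for_crawler(
    get_crawler: Callable[..., dict],
    name: str,
    poll_seconds: float = 30,
    sleep: Callable[[float], None] = time.sleep,
    max_polls: int = 120,
) -> str:
    """Poll a Glue crawler until it is READY, then return its last crawl status."""
    for _ in range(max_polls):
        sleep(poll_seconds)  # Wait state
        crawler = get_crawler(Name=name)["Crawler"]  # GetCrawler task
        if crawler["State"] == "READY":  # Choice state
            # The outcome of the run is reported separately from the state
            return crawler.get("LastCrawl", {}).get("Status", "UNKNOWN")
        # RUNNING or STOPPING: loop back and wait again
    raise TimeoutError(f"Crawler {name} was not READY after {max_polls} polls")
```

Unlike this loop, the state machine has no `max_polls` limit of its own; setting a timeout on the state machine would be one way to get the same safety net.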

To make the pipeline fully event-driven, we’ll add a trigger that starts the Step Function automatically whenever new files are added to the source bucket.

  • Set up an S3 event notification: configure the source S3 bucket to send an event to a Lambda function whenever a new object is created.
  • Lambda function: check whether all required files (e.g., customer_data.csv, transaction_data.csv) are present in the bucket. If they are, start the Step Function.

        # Reference the existing S3 bucket
        input_bucket = s3.Bucket.from_bucket_name(
            self,
            "ExistingInputBucket",
            bucket_name=input_bucket_name,
        )

        # Create a Lambda function to trigger the Step Function
        trigger_lambda = _lambda.Function(
            self,
            "TriggerLambda",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="trigger.handler",
            code=_lambda.Code.from_asset("./scripts/lambda/"),  # Path to Lambda code
            environment={
                "STEP_FUNCTION_ARN": self.state_machine.state_machine_arn,
                "REGION": region,
                "ACCOUNT": account,
                "BUCKET_NAME": input_bucket.bucket_name,
                "FILE_NAMES": ",".join(
                    file_names
                ),  # Convert list to comma-separated string
            },
        )

        # Grant permissions to the Lambda function
        input_bucket.grant_read(trigger_lambda)
        self.state_machine.grant_start_execution(trigger_lambda)

        # Add S3 event notification to invoke the Lambda function only for files in the env_name folder
        input_bucket.add_event_notification(
            s3.EventType.OBJECT_CREATED,
            s3_notifications.LambdaDestination(trigger_lambda),
            s3.NotificationKeyFilter(
                prefix=f"{env_name}/"
            ),  # Trigger only for files in env_name folder
        )
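The Lambda code itself lives in ./scripts/lambda/trigger.py and is not shown here. A minimal sketch of what such a handler could look like is below. It relies on the BUCKET_NAME, FILE_NAMES and STEP_FUNCTION_ARN environment variables set above; the helper name `find_missing_files`, the way the prefix is derived from the object key, and the shape of the `file_path` input are assumptions made for illustration.

```python
import json
import os
from urllib.parse import unquote_plus


def find_missing_files(s3_client, bucket: str, prefix: str, file_names: list) -> list:
    """Return the required file names that are not yet under s3://bucket/prefix/."""
    # A single call returns at most 1000 keys, which is enough for a small folder
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=f"{prefix}/")
    present = {obj["Key"] for obj in response.get("Contents", [])}
    return [name for name in file_names if f"{prefix}/{name}" not in present]


def handler(event, context):
    import boto3  # available in the Lambda Python runtime

    bucket = os.environ["BUCKET_NAME"]
    file_names = os.environ["FILE_NAMES"].split(",")

    # Object keys arrive URL-encoded in S3 event notifications
    key = unquote_plus(event["Records"][0]["s3"]["object"]["key"])
    prefix = key.split("/", 1)[0]  # the env_name folder from the notification filter

    missing = find_missing_files(boto3.client("s3"), bucket, prefix, file_names)
    if missing:
        # Not all files have arrived yet; wait for the next upload event
        return {"started": False, "missing": missing}

    boto3.client("stepfunctions").start_execution(
        stateMachineArn=os.environ["STEP_FUNCTION_ARN"],
        # Assumption: the ingestion job expects the folder location as file_path
        input=json.dumps({"file_path": f"s3://{bucket}/{prefix}/"}),
    )
    return {"started": True, "missing": []}
```

One thing to keep in mind with this approach: if the last two files land almost at the same time, both invocations may see a complete set and start two executions, so some form of de-duplication may be worth adding.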

9. Deploy and Verify resources in AWS
Now it’s time to deploy everything to your AWS environment. Run the following command:
cdk deploy DataPipelineStack
Once the deployment is complete, go to the AWS CloudFormation Console to verify that all resources have been created successfully.

10. Upload data to S3 bucket
Once the resources are deployed, upload the sample data files (customer_data.csv and transaction_data.csv) to the source S3 bucket.
You can do this either via the AWS Console or by using the AWS CLI.


aws s3 cp /path/to/source s3://bucket-name/ --recursive

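Make sure the files end up under the correct prefix, which must match the environment name (for example, s3://bucket-name/<env_name>/customer_data.csv), because the S3 event notification only fires for objects under that prefix.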
Once uploaded, the Lambda function will check for all required files and trigger the Step Function if everything is in place.
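If you prefer to script the upload in Python, a rough boto3 equivalent is sketched below. The function names, the local folder and the bucket name are placeholders chosen for illustration; the only thing taken from the pipeline is that each object key starts with the environment name.

```python
from pathlib import Path


def build_upload_plan(local_dir: str, env_name: str, file_names: list) -> list:
    """Pair each local file path with its S3 key under the env_name prefix."""
    return [
        (str(Path(local_dir) / name), f"{env_name}/{name}") for name in file_names
    ]


def upload_files(bucket_name: str, local_dir: str, env_name: str, file_names: list) -> None:
    import boto3  # imported here so the planning helper has no AWS dependency

    s3 = boto3.client("s3")
    for local_path, key in build_upload_plan(local_dir, env_name, file_names):
        # Each upload raises an ObjectCreated event for the trigger Lambda
        s3.upload_file(local_path, bucket_name, key)
```

For example, `upload_files("bucket-name", "/path/to/source", "dev", ["customer_data.csv", "transaction_data.csv"])` would place both files under the dev/ prefix.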

11. Monitor the Pipeline and Query Data with Athena
After uploading the data, you can monitor the entire process in the AWS Step Functions console and check the status of each step.
To stay informed, you can subscribe an email address or mobile phone number to the SNS topic.
This way, you’ll receive a notification when the Step Function completes successfully or if there’s an error during execution.

Email Notification

Finally, once the process is complete and the Glue catalog is updated, you can query the output data using Amazon Athena:

Athena
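The same query can also be started from Python with boto3. The sketch below is an assumption-heavy illustration: the database name, the table name (here following the transformation_ naming used for the Glue tables) and the results location are placeholders that you would replace with your own values.

```python
def build_athena_query(
    database: str, table: str, output_location: str, limit: int = 10
) -> dict:
    """Build the arguments for Athena's StartQueryExecution call."""
    return {
        "QueryString": f'SELECT * FROM "{table}" LIMIT {limit}',
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
    }


def run_query(database: str, table: str, output_location: str) -> str:
    import boto3  # imported here so the builder can be used without AWS access

    athena = boto3.client("athena")
    response = athena.start_query_execution(
        **build_athena_query(database, table, output_location)
    )
    # The query runs asynchronously; use this id to poll for status and results
    return response["QueryExecutionId"]
```

A call such as `run_query("my_database", "transformation_customer_data", "s3://my-athena-results/")` returns an execution id that can be passed to `get_query_execution` and `get_query_results`.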

Quick Notes and Thoughts

  • If you see folders named like *_$folder$ in your output S3 bucket, don’t worry: these are placeholders created by Hadoop when writing to a path that doesn’t yet exist. To avoid permission errors, make sure your Glue job role has the right permissions to create folders in the target S3 location.
  • On a side note: I love using Python, and it’s one of my favourite languages. But when it comes to AWS CDK, I often feel that TypeScript is more “native.” Maybe that’s because TypeScript was the first language supported by CDK? The documentation definitely feels more complete in TypeScript.

TypeScript was the first language supported by the AWS CDK, and much of the AWS CDK example code is written in TypeScript. This guide includes a topic specifically to show how to adapt TypeScript AWS CDK code for use with the other supported languages. For more information, see Comparing AWS CDK in TypeScript with other languages. (https://docs.aws.amazon.com/cdk/v2/guide/languages.html)
https://docs.aws.amazon.com/cdk/v2/guide/work-with.html#work-with-cdk-compare
