This post describes how to build an automated data pipeline that ingests data and applies a simple transformation. The pipeline uses AWS Step Functions for orchestration and AWS Glue to transform the data. Amazon SNS notifies the user when there is an error or when the run finishes.
I usually build infrastructure via AWS CDK with TypeScript, but for this one, I’ll use Python — with great help from GitHub Copilot along the way.
Prerequisites:
- An AWS account
Architecture
AWS Resources:
- AWS S3
- AWS Lambda
- AWS Step Functions
- AWS Glue
- AWS Athena
- AWS SNS
Create the CDK project:
```
mkdir aws-cdk-glue && cd aws-cdk-glue
cdk init app --language python
source .venv/bin/activate
python3 -m pip install -r requirements.txt
```
Create S3 buckets
Let’s quickly create a list of S3 buckets we’ll need.
```
from aws_cdk import Stack, RemovalPolicy
from aws_cdk import aws_s3 as s3
from constructs import Construct


class S3BucketsStack(Stack):
    def __init__(
        self, scope: Construct, construct_id: str, bucket_names: list, **kwargs
    ) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create S3 buckets from the provided list of bucket names
        for bucket_name in bucket_names:
            s3.Bucket(
                self,
                f"S3Bucket-{bucket_name}",
                bucket_name=bucket_name,
                versioned=True,  # Enable versioning for the bucket
                removal_policy=RemovalPolicy.DESTROY,  # Delete the bucket on stack deletion
                auto_delete_objects=False,  # Do not empty the bucket automatically; deleting a non-empty bucket fails
            )
```
Now, define the list of bucket names and create them in your app.py file:
```
# Create the S3 buckets stack
bucket_names = [
    "kate-source-data",
    "kate-staging-data",
    "kate-error-staging-data",
    "kate-transform-data",
    "kate-error-transform-data",
]
s3_buckets_stack = S3BucketsStack(
    app,
    construct_id="S3BucketsStack",
    bucket_names=bucket_names,
    env=environment,
)
```
Run the command below to deploy the stack:
cdk deploy S3BucketsStack
Then head over to the AWS Console to verify that all the buckets have been created.
3. Generate data
To generate test data, run the following command:
python ./generate_data.py 1000 1000
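This will create two CSV files in the test_data folder:
- customer_data.csv: contains 1,000 customer records
- transaction_data.csv: contains 1,000 transaction records
These files will be used as source data in the pipeline. Note that the amounttt column in transaction_data.csv is misspelled on purpose; the transformation step corrects it later.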
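Before uploading the files, it can be worth checking that they are consistent with each other. The sketch below is not part of the original pipeline. It assumes only the column names that generate_data.py writes (id, transaction_id, customer_id), and find_orphan_transactions is a hypothetical helper name.

```python
import csv
import io


def find_orphan_transactions(customer_file, transaction_file):
    """Return transaction IDs whose customer_id has no matching customer row."""
    customer_ids = {row["id"] for row in csv.DictReader(customer_file)}
    return [
        row["transaction_id"]
        for row in csv.DictReader(transaction_file)
        if row["customer_id"] not in customer_ids
    ]


# Tiny in-memory example; on disk you would open test_data/customer_data.csv
# and test_data/transaction_data.csv instead.
customers = io.StringIO("id,first_name,last_name,gender\n1,Jane,Doe,Female\n")
transactions = io.StringIO(
    "transaction_id,customer_id,transaction_type,amounttt\n"
    "t-1,1,Purchase,10.5\n"
    "t-2,99,Refund,20.0\n"
)
orphans = find_orphan_transactions(customers, transactions)
print(orphans)
```

With the generated data the list should be empty, because every customer_id is drawn from the generated customer IDs.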
import csv
import random
import argparse
import uuid
import os # Import os for folder creation
def generate_customer_data(num_records):
first_names = ["John", "Jane", "Alice", "Bob", "Charlie", "Diana"]
last_names = ["Smith", "Doe", "Johnson", "Williams", "Brown", "Jones"]
genders = ["Male", "Female"]
data = []
for i in range(1, num_records + 1):
first_name = random.choice(first_names)
last_name = random.choice(last_names)
gender = random.choice(genders)
data.append(
{
"id": i,
"first_name": first_name,
"last_name": last_name,
"gender": gender,
}
)
return data
def save_to_csv(data, folder, filename):
# Ensure the folder exists
os.makedirs(folder, exist_ok=True)
filepath = os.path.join(folder, filename)
with open(filepath, mode="w", newline="") as file:
writer = csv.DictWriter(file, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
def generate_transaction_data(customer_ids, num_records):
transaction_types = ["Purchase", "Refund", "Exchange"]
data = []
for _ in range(num_records):
transaction_id = str(uuid.uuid4()) # Generate a unique transaction ID
customer_id = random.choice(customer_ids)
transaction_type = random.choice(transaction_types)
amount = round(random.uniform(10.0, 500.0), 2)
data.append(
{
"transaction_id": transaction_id,
"customer_id": customer_id,
"transaction_type": transaction_type,
"amounttt": amount,
}
)
return data
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Generate customer and transaction data."
)
parser.add_argument(
"num_customer_records", type=int, help="Number of customer records to generate"
)
parser.add_argument(
"num_transaction_records",
type=int,
help="Number of transaction records to generate",
)
args = parser.parse_args()
# Define the folder to save files
folder = "test_data"
# Generate customer data
customer_data = generate_customer_data(args.num_customer_records)
save_to_csv(customer_data, folder, "customer_data.csv")
# Extract customer IDs
customer_ids = [customer["id"] for customer in customer_data]
# Generate transaction data
transaction_data = generate_transaction_data(
customer_ids, args.num_transaction_records
)
save_to_csv(transaction_data, folder, "transaction_data.csv")
4. Create PySpark script to ingest the data
In this step, we’ll create a PySpark script to handle the data ingestion. The script will:
- Check if all required files exist in the source data bucket. If any file is missing, it logs an error and stops the job.
- If all required files are present:
  - Read the files from S3
  - Add ingestion_start_time and ingestion_finish_time columns
  - Write the data to the output path in Parquet format
- If there’s an error while processing the files:
  - Copy the original file to the error bucket
  - Notify the SNS topic
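Note that check_files_exist in the script below only logs and exits when a file is missing. If you also want an SNS message in that case, one option is to collect the missing keys first and publish once. This is a minimal sketch under assumptions: find_missing_files and notify_if_missing are hypothetical names, the clients are passed in so the logic can be tried without AWS, and the broad except is for brevity only.

```python
def find_missing_files(s3_client, bucket, file_path, file_names):
    """Return the keys that head_object could not find."""
    missing = []
    for file_name in file_names:
        key = f"{file_path}/{file_name}"
        try:
            s3_client.head_object(Bucket=bucket, Key=key)
        except Exception:  # narrow to botocore.exceptions.ClientError in a real job
            missing.append(key)
    return missing


def notify_if_missing(s3_client, sns_client, topic_arn, bucket, file_path, file_names):
    """Publish one SNS message listing all missing keys; return True if any were missing."""
    missing = find_missing_files(s3_client, bucket, file_path, file_names)
    if missing:
        sns_client.publish(
            TopicArn=topic_arn,
            Message=f"Missing files in bucket {bucket}: {', '.join(missing)}",
        )
    return bool(missing)
```

In main(), the job could call notify_if_missing(...) right after creating the clients and call sys.exit(1) when it returns True.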
import sys
import logging
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from awsglue.utils import getResolvedOptions
import boto3
from botocore.exceptions import ClientError
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def check_files_exist(s3_client, bucket, env_name, file_path, file_names):
"""
Check if the specified files exist in the given S3 bucket.
:param s3_client: Boto3 S3 client
:param bucket: Name of the S3 bucket
:param file_names: List of file names to check
"""
for file_name in file_names:
try:
s3_client.head_object(Bucket=bucket, Key=f"{file_path}/{file_name}")
logger.info(f"File {file_path}/{file_name} exists in bucket {bucket}.")
except s3_client.exceptions.ClientError as e:
logger.error(
f"File {file_path}/{file_name} does not exist in bucket {bucket}: {e} Exiting."
)
sys.exit(1)
def delete_directory_in_s3(s3_client, bucket, directory_path):
"""
Delete all files in the specified directory in the S3 bucket.
:param s3_client: Boto3 S3 client
:param bucket: Name of the S3 bucket
:param directory_path: Path of the directory to delete
"""
try:
objects = s3_client.list_objects_v2(Bucket=bucket, Prefix=directory_path)
if "Contents" in objects:
for obj in objects["Contents"]:
s3_client.delete_object(Bucket=bucket, Key=obj["Key"])
logger.info(f"Deleted file: {obj['Key']} from bucket {bucket}")
else:
logger.info(f"No files found in {directory_path} to delete.")
except Exception as e:
logger.error(f"Error deleting directory {directory_path}: {e}")
sys.exit(1)
def notify_sns(sns_client, topic_arn, message):
"""
Send a notification to an SNS topic.
:param sns_client: Boto3 SNS client
:param topic_arn: ARN of the SNS topic
:param message: Message to send
"""
try:
sns_client.publish(TopicArn=topic_arn, Message=message)
logger.info(f"Notification sent to SNS topic: {topic_arn}")
except ClientError as e:
logger.error(f"Failed to send notification to SNS topic: {e}")
sys.exit(1)
def process_file(
spark,
s3_client,
sns_client,
sns_topic_arn,
input_bucket,
output_bucket,
error_bucket,
file_path,
file_name,
env_name,
current_time,
):
"""
Process a single file: read from S3, transform, and write to S3.
:param spark: SparkSession object
:param s3_client: Boto3 S3 client
:param sns_client: Boto3 SNS client
:param sns_topic_arn: ARN of the SNS topic
:param input_bucket: Name of the input S3 bucket
:param output_bucket: Name of the output S3 bucket
:param error_bucket: Name of the error S3 bucket
:param file_name: Name of the file to process
:param env_name: Environment name (e.g., dev, prod)
"""
input_s3_path = f"s3://{input_bucket}/{file_path}/{file_name}"
output_s3_path = (
f"s3://{output_bucket}/{env_name}/staging_{file_name.split('.')[0]}/"
)
error_s3_path = f"s3://{error_bucket}/{env_name}/error_{file_name}"
logger.info(f"Processing file: {file_name}")
logger.info(f"Reading from: {input_s3_path}")
try:
# Get the current UTC time for ingestion start
ingestion_start_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
# Read CSV file from S3
df = spark.read.csv(input_s3_path, header=True, inferSchema=True)
logger.info("Finished reading file from S3")
if not df.columns:
raise RuntimeError(
"The DataFrame is empty. Cannot proceed with processing."
)
# Add ingestion_start_time column to the DataFrame
df = df.withColumn("ingestion_start_time", lit(ingestion_start_time))
# Get the current UTC time for ingestion finish
ingestion_finish_time = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
# Add ingestion_finish_time column to the DataFrame
df = df.withColumn("ingestion_finish_time", lit(ingestion_finish_time))
# Write the DataFrame to S3 in Parquet format
logger.info(f"Writing to: {output_s3_path}")
df.write.parquet(output_s3_path, mode="overwrite")
logger.info(f"Successfully processed and saved file: {file_name}")
except Exception as e:
logger.error(f"Error processing file {file_name}: {e}")
logger.info(f"Copying original file to error bucket: {error_s3_path}")
try:
s3_client.copy_object(
Bucket=error_bucket,
CopySource={"Bucket": input_bucket, "Key": f"{file_path}/{file_name}"},
Key=f"{env_name}/error_{file_name}",
)
logger.info(f"Successfully copied file {file_name} to error bucket.")
except ClientError as copy_error:
logger.error(
f"Failed to copy file {file_name} to error bucket: {copy_error}"
)
# Notify SNS about the error
error_message = f"Error processing file {file_name} in environment {env_name}. Original file copied to error bucket."
notify_sns(sns_client, sns_topic_arn, error_message)
def main():
# Get arguments passed to the Glue job
args = getResolvedOptions(
sys.argv,
[
"env_name",
"input_bucket",
"output_bucket",
"error_bucket",
"file_path",
"file_names",
"sns_topic_arn", # Added SNS topic ARN argument
"JOB_NAME",
],
)
# Extract input and output S3 paths from arguments
input_bucket = args["input_bucket"]
output_bucket = args["output_bucket"]
error_bucket = args["error_bucket"]
file_path = args["file_path"]
file_names = args["file_names"].split(
","
) # Expecting a comma-separated list of file names
env_name = args["env_name"]
sns_topic_arn = args["sns_topic_arn"] # Extract SNS topic ARN
# Initialize S3 and SNS clients
s3_client = boto3.client("s3")
sns_client = boto3.client("sns")
# Check if files exist in the input bucket
check_files_exist(s3_client, input_bucket, env_name, file_path, file_names)
# Delete the entire directory in the output bucket before processing files
output_directory_path = f"{env_name}/"
delete_directory_in_s3(s3_client, output_bucket, output_directory_path)
# Delete the error bucket folder before processing files
error_directory_path = f"{env_name}/"
delete_directory_in_s3(s3_client, error_bucket, error_directory_path)
# Initialize Spark session
spark = SparkSession.builder.appName(args["JOB_NAME"]).getOrCreate()
# Process each file
current_time = datetime.utcnow()
for file_name in file_names:
process_file(
spark,
s3_client,
sns_client,
sns_topic_arn,
input_bucket,
output_bucket,
error_bucket,
file_path,
file_name,
env_name,
current_time,
)
# Stop the Spark session
spark.stop()
logger.info("Spark session stopped.")
if __name__ == "__main__":
main()
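One thing to watch in delete_directory_in_s3: a single list_objects_v2 call returns at most 1,000 keys, so a prefix with more objects would only be partly cleaned. A possible variant, sketched here with a hypothetical name (delete_prefix), uses the boto3 paginator and deletes each page in one batch. It takes the client as a parameter, as the original function does.

```python
def delete_prefix(s3_client, bucket, prefix):
    """Delete every object under prefix, following pagination; return the count."""
    deleted = 0
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if keys:
            # Each page holds at most 1,000 keys, which is also the
            # per-request limit of delete_objects
            s3_client.delete_objects(Bucket=bucket, Delete={"Objects": keys})
            deleted += len(keys)
    return deleted
```

Calling delete_prefix(s3_client, output_bucket, f"{env_name}/") would then replace the existing call in main(). Because the buckets are versioned, this adds delete markers rather than removing old versions, which is the same behaviour as the original function.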
5. Create PySpark script to transform the data
For this example, the transformation is very simple: it only corrects a misspelled column name (amounttt becomes amount) in the transaction_data file.
import sys
import logging
from datetime import datetime
from pyspark.sql import SparkSession
from awsglue.utils import getResolvedOptions
from pyspark.sql.functions import col
import boto3
from botocore.exceptions import ClientError
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def notify_sns(sns_client, topic_arn, message):
"""
Send a notification to an SNS topic.
:param sns_client: Boto3 SNS client
:param topic_arn: ARN of the SNS topic
:param message: Message to send
"""
try:
sns_client.publish(TopicArn=topic_arn, Message=message)
logger.info(f"Notification sent to SNS topic: {topic_arn}")
except ClientError as e:
logger.error(f"Failed to send notification to SNS topic: {e}")
def process_file(
spark,
input_bucket,
output_bucket,
error_bucket,
env_name,
file_name,
sns_client,
sns_topic_arn,
):
"""
Process individual files based on their type.
:param spark: SparkSession object
:param input_bucket: Name of the input S3 bucket
:param output_bucket: Name of the output S3 bucket
:param error_bucket: Name of the error S3 bucket
:param env_name: Environment name (e.g., dev, prod)
:param file_name: Name of the file to process
:param sns_client: Boto3 SNS client
:param sns_topic_arn: ARN of the SNS topic
"""
input_path = f"s3://{input_bucket}/{env_name}/staging_{file_name}"
output_path = f"s3://{output_bucket}/{env_name}/transformation_{file_name}"
error_path = f"s3://{error_bucket}/{env_name}/{file_name}_error.log"
logger.info(f"Processing file: {file_name}")
logger.info(f"Reading data from: {input_path}")
try:
# Read Parquet file from S3
df = spark.read.parquet(input_path)
if file_name == "customer_data":
# Copy customer_data as is
logger.info("Copying customer_data without transformation.")
elif file_name == "transaction_data":
# Rename column 'amounttt' to 'amount' for transaction_data
logger.info("Renaming column 'amounttt' to 'amount' for transaction_data.")
df = df.withColumnRenamed("amounttt", "amount")
else:
logger.warning(f"Unknown file type: {file_name}. Skipping transformation.")
# Write the processed data to S3 in Parquet format
logger.info(f"Writing processed data to: {output_path}")
df.write.parquet(output_path, mode="overwrite")
logger.info(f"Successfully processed file: {file_name}")
except Exception as e:
logger.error(f"Error processing file {file_name}: {e}")
logger.info(f"Copying original file to error bucket: {error_path}")
try:
s3_client = boto3.client("s3")
s3_client.copy_object(
Bucket=error_bucket,
CopySource={
"Bucket": input_bucket,
"Key": f"{env_name}/staging_{file_name}",
},
Key=f"{env_name}/{file_name}_error.log",
)
logger.info(f"Successfully copied file {file_name} to error bucket.")
except ClientError as copy_error:
logger.error(
f"Failed to copy file {file_name} to error bucket: {copy_error}"
)
# Notify SNS about the error
error_message = f"Error processing file {file_name} in environment {env_name}. Original file copied to error bucket."
notify_sns(sns_client, sns_topic_arn, error_message)
def main():
# Get arguments passed to the Glue job
args = getResolvedOptions(
sys.argv,
[
"JOB_NAME",
"env_name",
"input_bucket",
"output_bucket",
"error_bucket",
"file_names",
"sns_topic_arn", # Added SNS topic ARN argument
],
)
# Extract arguments
env_name = args["env_name"]
input_bucket = args["input_bucket"]
output_bucket = args["output_bucket"]
error_bucket = args["error_bucket"]
file_names = args["file_names"].split(",") # Comma-separated list of file names
sns_topic_arn = args["sns_topic_arn"]
# Initialize Spark session
spark = SparkSession.builder.appName(args["JOB_NAME"]).getOrCreate()
# Initialize SNS client
sns_client = boto3.client("sns")
# Loop through files and process them
for file_name in file_names:
process_file(
spark,
input_bucket,
output_bucket,
error_bucket,
env_name,
file_name,
sns_client,
sns_topic_arn,
)
# Stop the Spark session
spark.stop()
logger.info("Spark session stopped.")
if __name__ == "__main__":
main()
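A caveat on the error handling above: Spark writes Parquet output as a directory of part files, so {env_name}/staging_{file_name} is most likely a prefix rather than a single object, and copy_object on that key would probably fail. If you want the staging data in the error bucket, one approach is to copy every object under the prefix. This is a sketch under that assumption; copy_prefix is a hypothetical helper, not part of the original script.

```python
def copy_prefix(s3_client, source_bucket, source_prefix, dest_bucket, dest_prefix):
    """Copy every object under source_prefix to dest_prefix; return the new keys."""
    copied = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=source_bucket, Prefix=source_prefix):
        for obj in page.get("Contents", []):
            # Keep the part of the key that follows the source prefix
            relative_key = obj["Key"][len(source_prefix):]
            dest_key = f"{dest_prefix}{relative_key}"
            s3_client.copy_object(
                Bucket=dest_bucket,
                CopySource={"Bucket": source_bucket, "Key": obj["Key"]},
                Key=dest_key,
            )
            copied.append(dest_key)
    return copied
```

In the except block this could be called as copy_prefix(s3_client, input_bucket, f"{env_name}/staging_{file_name}/", error_bucket, f"{env_name}/{file_name}_error/").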
6. Create Glue jobs for ingestion and transformation
First, create a new construct to handle this. The construct will:
- Create an IAM role for the Glue jobs that can:
  - Read from the source data bucket
  - Write to the destination and error buckets
  - Publish messages to the SNS topic
- Create the Glue job
from aws_cdk import (
aws_glue as glue,
aws_iam as iam,
aws_s3_assets as s3_assets,
)
from constructs import Construct
import os.path as path
class GlueContruct(Construct):
def __init__(
self,
scope: Construct,
id: str,
env_name: str,
input_bucket: str,
output_bucket: str,
error_bucket: str,
file_names: list,
script_file_path: str,
glue_job_prefix: str,
sns_topic_arn: str,
**kwargs,
) -> None:
super().__init__(scope, id, **kwargs)
# Define an IAM role for the Glue job
glue_role = iam.Role(
self,
"GlueJobRole",
assumed_by=iam.ServicePrincipal("glue.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name(
"service-role/AWSGlueServiceRole"
)
],
)
# Add permissions to read from the input bucket and write to the output bucket.
# s3:ListBucket applies to the bucket ARN itself; object actions apply to the keys under it.
glue_role.add_to_policy(
    iam.PolicyStatement(
        actions=["s3:GetObject", "s3:ListBucket"],
        resources=[
            f"arn:aws:s3:::{input_bucket}",
            f"arn:aws:s3:::{input_bucket}/{env_name}/*",
        ],
    )
)
glue_role.add_to_policy(
    iam.PolicyStatement(
        actions=[
            "s3:PutObject",
            "s3:GetObject",
            "s3:ListBucket",
            "s3:DeleteObject",
        ],
        resources=[
            f"arn:aws:s3:::{output_bucket}",
            f"arn:aws:s3:::{output_bucket}/{env_name}/*",
            f"arn:aws:s3:::{error_bucket}",
            f"arn:aws:s3:::{error_bucket}/{env_name}/*",
        ],
    )
)
# Add permissions to publish messages to the SNS topic
glue_role.add_to_policy(
iam.PolicyStatement(
actions=["sns:Publish"],
resources=[sns_topic_arn],
)
)
# Upload the Glue script to an S3 bucket using an S3 asset
glue_script_asset = s3_assets.Asset(
self,
"GlueScriptAsset",
path=path.join(
path.dirname(__file__), script_file_path
), # Replace with the local path to your script
)
glue_script_asset.grant_read(glue_role)
# Define the Glue job
self.glue_job = glue.CfnJob(
self,
glue_job_prefix + env_name,
name=f"{glue_job_prefix}-{env_name}",
role=glue_role.role_arn,
command={
"name": "glueetl",
"scriptLocation": glue_script_asset.s3_object_url,
"pythonVersion": "3",
},
default_arguments={
    "--env_name": env_name,
    "--input_bucket": input_bucket,
    "--output_bucket": output_bucket,
    "--error_bucket": error_bucket,
    "--file_names": ",".join(file_names),
    "--file_path": "test",  # Default source folder; the state machine passes the real value per run
    "--sns_topic_arn": sns_topic_arn,
},
max_retries=0,
timeout=10,
glue_version="5.0",
)
Now we can use the new construct to create the two Glue jobs:
- Ingestion job — runs the PySpark script that validates and ingests the data
- Transformation job — runs the script that corrects the column name
# Create the Glue ingestion construct
glue_ingestion = GlueContruct(
self,
"GlueIngestion",
env_name=env_name,
input_bucket=account_config["ingestion"]["input_bucket"],
output_bucket=account_config["ingestion"]["output_bucket"],
error_bucket=account_config["ingestion"]["error_bucket"],
file_names=account_config["ingestion"]["file_names"],
script_file_path="../../scripts/glue/ingestion.py", # Path to your Glue script
glue_job_prefix="IngestionJob",
sns_topic_arn=sns_topic.topic_arn, # Pass the SNS topic ARN
)
glue_transformation = GlueContruct(
self,
"GlueTransformation",
env_name=env_name,
input_bucket=account_config["transformation"]["input_bucket"],
output_bucket=account_config["transformation"]["output_bucket"],
error_bucket=account_config["transformation"]["error_bucket"],
file_names=account_config["transformation"]["file_names"],
script_file_path="../../scripts/glue/transformation.py", # Path to your Glue script
glue_job_prefix="TransformationJob",
sns_topic_arn=sns_topic.topic_arn, # Pass the SNS topic ARN
)
# Output Glue job names
add_output(self, "GlueIngestionJobName", glue_ingestion.glue_job.name)
add_output(self, "GlueTransformationJobName", glue_transformation.glue_job.name)
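The snippet above reads its settings from account_config, which is not shown in this post. As an illustration only, it could look like the dictionary below. The mapping of buckets to stages is my assumption based on the bucket names created earlier. The file names follow from the scripts: ingestion reads the raw CSV files, while the transformation script compares file_name against "customer_data" and reads staging_{file_name}, so its names carry no extension.

```python
# Hypothetical configuration; adjust bucket names to your own account.
account_config = {
    "ingestion": {
        "input_bucket": "kate-source-data",
        "output_bucket": "kate-staging-data",
        "error_bucket": "kate-error-staging-data",
        # Ingestion reads the raw CSV files, so names keep their extension
        "file_names": ["customer_data.csv", "transaction_data.csv"],
    },
    "transformation": {
        "input_bucket": "kate-staging-data",
        "output_bucket": "kate-transform-data",
        "error_bucket": "kate-error-transform-data",
        # Transformation reads staging_<name>/ prefixes, so no extension here
        "file_names": ["customer_data", "transaction_data"],
    },
}

# The ingestion job writes to staging_<name without extension>/, which must
# line up with what the transformation job reads.
staging_prefixes = [
    f"staging_{name.split('.')[0]}"
    for name in account_config["ingestion"]["file_names"]
]
expected = [
    f"staging_{name}" for name in account_config["transformation"]["file_names"]
]
print(staging_prefixes == expected)
```

A small check like the last few lines can catch a mismatch between the two file lists before anything is deployed.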
7. Create database, table, crawler
Since we want to query the data later using Amazon Athena, we need to create the following components:
- An AWS Glue Database
- AWS Glue Tables
- Glue Crawlers to crawl the data and update the AWS Glue Data Catalog
Glue Crawlers can create tables automatically, which is convenient. However, creating the tables manually gives you more control over the metadata, which can be useful if your AWS account has Lake Formation enabled and you want to manage access via the data lake.
import os
from aws_cdk import aws_glue as glue
from aws_cdk import aws_iam as iam
from constructs import Construct
import aws_cdk.aws_lakeformation as lakeformation
from aws_cdk_glue.utils.utils import add_output # Import the add_output function
def create_glue_role(
scope: Construct,
id: str,
env_name: str,
output_bucket: str,
account_id: str,
region: str,
) -> iam.Role:
"""Create an IAM role for the Glue crawler with necessary permissions."""
crawler_role = iam.Role(
scope,
id,
assumed_by=iam.ServicePrincipal("glue.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name(
"service-role/AWSGlueServiceRole"
)
],
)
# Add permissions to access the staging bucket
crawler_role.add_to_policy(
iam.PolicyStatement(
actions=[
"s3:GetObject",
"s3:ListBucket",
"s3:PutObject",
"s3:DeleteObject",
],
resources=[
f"arn:aws:s3:::{output_bucket}",
f"arn:aws:s3:::{output_bucket}/{env_name}/*",
],
)
)
# Add permissions to read and update the Glue Data Catalog database that Athena queries
crawler_role.add_to_policy(
iam.PolicyStatement(
actions=[
"glue:GetDatabase",
"glue:GetDatabases",
"glue:GetTable",
"glue:UpdateTable",
"glue:CreateTable",
"glue:UpdatePartition",
"glue:GetPartition",
"glue:BatchGetPartition",
"glue:BatchCreatePartition",
],
resources=[
f"arn:aws:glue:{region}:{account_id}:catalog",
f"arn:aws:glue:{region}:{account_id}:database/{env_name}_database",
],
)
)
return crawler_role
def create_glue_table(
scope: Construct,
id: str,
table_name: str, # Pass table name as a parameter
env_name: str,
output_bucket: str,
account_id: str,
glue_database: glue.CfnDatabase,
) -> glue.CfnTable:
"""Create a Glue table for the Athena database."""
glue_table = glue.CfnTable(
scope,
id,
catalog_id=account_id, # Assign account_id to catalog_id
database_name=glue_database.ref, # Reference the Glue database
table_input={
"name": table_name, # Use the passed table name
"storageDescriptor": {
"location": f"s3://{output_bucket}/{env_name}/{table_name}/", # Path to the data in S3
"inputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", # Input format for the table
"outputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", # Output format for the table
"serdeInfo": {
"serializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", # SerDe library
"parameters": {"classification": "Parquet"},
},
},
"tableType": "EXTERNAL_TABLE", # Define the table type as external
},
)
return glue_table
class GlueTable(Construct):
def __init__(
self,
scope: Construct,
id: str,
env_name: str,
staging_bucket: str,
staging_file_names: list,
transformation_bucket: str,
transformation_file_names: list,
account_id: str,
region: str,
**kwargs,
) -> None:
super().__init__(scope, id, **kwargs)
database_name = f"{env_name}_database"
tag_key = "kate"
tag_values = ["test"]
# Define the Glue database
glue_database = glue.CfnDatabase(
self,
"GlueDatabase",
catalog_id=account_id,
database_input={"name": database_name},
database_name=database_name,
)
lf_tag_pair_property = lakeformation.CfnTagAssociation.LFTagPairProperty(
catalog_id=account_id, tag_key=tag_key, tag_values=tag_values
)
tag_association = lakeformation.CfnTagAssociation(
self,
"TagAssociation",
lf_tags=[lf_tag_pair_property],
resource=lakeformation.CfnTagAssociation.ResourceProperty(
database=lakeformation.CfnTagAssociation.DatabaseResourceProperty(
catalog_id=account_id, name=database_name
)
),
)
tag_association.node.add_dependency(glue_database)
crawler_role_staging: iam.Role = create_glue_role(
self,
f"GlueCrawlerRoleStaging-{env_name}",
env_name,
staging_bucket,
account_id,
region,
)
# Grant permissions for database
grant_staging_crawler_database_access = lakeformation.CfnPermissions(
self,
"LFDatabasePermissions",
data_lake_principal={
"dataLakePrincipalIdentifier": crawler_role_staging.role_arn
},
resource=lakeformation.CfnPermissions.ResourceProperty(
database_resource=lakeformation.CfnPermissions.DatabaseResourceProperty(
catalog_id=account_id, name=database_name
),
),
permissions=["ALTER", "DROP", "DESCRIBE", "CREATE_TABLE"],
)
grant_staging_crawler_database_access.node.add_dependency(
glue_database, tag_association
)
for file_name in staging_file_names:
# Remove file type from file name
table_name = f"staging_{os.path.splitext(file_name)[0]}" # Extract the base name without file extension
# Create a Glue table for each file name
glue_table = create_glue_table(
self,
f"StagingGlueTable-{table_name}",
table_name=table_name, # Use the file name as the table name
env_name=env_name,
output_bucket=staging_bucket,
account_id=account_id,
glue_database=glue_database,
)
# Grant permissions for tables
grant_table_access = lakeformation.CfnPermissions(
self,
f"StagingGlueTableLFTablePermissions-{file_name}",
data_lake_principal={
"dataLakePrincipalIdentifier": crawler_role_staging.role_arn
},
resource=lakeformation.CfnPermissions.ResourceProperty(
table_resource=lakeformation.CfnPermissions.TableResourceProperty(
database_name=database_name, name=table_name
)
),
permissions=["SELECT", "ALTER", "DROP", "INSERT", "DESCRIBE"],
)
grant_table_access.node.add_dependency(glue_database, tag_association)
# Output the Glue table name using add_output
add_output(
self,
f"StagingGlueTableName-{table_name}",
glue_table.ref,
)
# Define the Glue crawler
glue_crawler_staging = glue.CfnCrawler(
self,
"GlueCrawlerStaging",
name=f"{env_name}_staging_crawler",
role=crawler_role_staging.role_arn, # Use the created IAM role
database_name=glue_database.ref,
targets={"s3Targets": [{"path": f"s3://{staging_bucket}/{env_name}/"}]},
)
# Output the Glue crawler name using add_output
add_output(
self,
"GlueStagingCrawlerName",
glue_crawler_staging.ref,
)
# Transformation Process
crawler_role_transformation: iam.Role = create_glue_role(
self,
f"GlueCrawlerRoleTransformation-{env_name}",
env_name,
transformation_bucket,
account_id,
region,
)
grant_transformation_crawler_database_access = lakeformation.CfnPermissions(
self,
"LFDatabasePermissionsTransformation",
data_lake_principal={
"dataLakePrincipalIdentifier": crawler_role_transformation.role_arn
},
resource=lakeformation.CfnPermissions.ResourceProperty(
database_resource=lakeformation.CfnPermissions.DatabaseResourceProperty(
catalog_id=account_id, name=database_name
),
),
permissions=["ALTER", "DROP", "DESCRIBE", "CREATE_TABLE"],
)
grant_transformation_crawler_database_access.node.add_dependency(
glue_database, tag_association
)
for file_name in transformation_file_names:
table_name = f"transformation_{os.path.splitext(file_name)[0]}"
glue_table = create_glue_table(
self,
f"TransformationGlueTable-{table_name}",
table_name=table_name,
env_name=env_name,
output_bucket=transformation_bucket,
account_id=account_id,
glue_database=glue_database,
)
grant_table_access = lakeformation.CfnPermissions(
self,
f"TransformationGlueTableLFTablePermissions-{file_name}",
data_lake_principal={
"dataLakePrincipalIdentifier": crawler_role_transformation.role_arn
},
resource=lakeformation.CfnPermissions.ResourceProperty(
table_resource=lakeformation.CfnPermissions.TableResourceProperty(
database_name=database_name, name=table_name
)
),
permissions=["SELECT", "ALTER", "DROP", "INSERT", "DESCRIBE"],
)
grant_table_access.node.add_dependency(glue_database, tag_association)
# Output the Glue table name using add_output
add_output(
self,
f"TransformationGlueTableName-{table_name}",
glue_table.ref,
)
glue_crawler_transformation = glue.CfnCrawler(
self,
"GlueCrawlerTransformation",
name=f"{env_name}_transformation_crawler",
role=crawler_role_transformation.role_arn,
database_name=glue_database.ref,
targets={
"s3Targets": [{"path": f"s3://{transformation_bucket}/{env_name}/"}]
},
)
# Output the Glue crawler name using add_output
add_output(
self,
"GlueTransformationCrawlerName",
glue_crawler_transformation.ref,
)
self.glue_crawler_staging = glue_crawler_staging
self.glue_crawler_transformation = glue_crawler_transformation
If Lake Formation is enabled in your AWS account, make sure your CDK execution role has the necessary permissions to create and access Glue resources.
In my case, I’m using LF-Tags to grant the required permissions to the CDK execution role. Also make sure the Glue crawler role has the appropriate Lake Formation permissions.
8. Orchestrate the Flow Using Step Functions
Now we can orchestrate the entire pipeline using AWS Step Functions. We want the state machine to succeed only when all steps run successfully.
- For Glue jobs, it’s straightforward. Add the following parameter to the task definition so that the state machine waits for the job to complete before moving to the next step:
integration_pattern=sfn.IntegrationPattern.RUN_JOB
- For crawlers, unfortunately, we cannot use RUN_JOB. The startCrawler call returns as soon as the crawler has been started, not when it has finished. For reference, the CDK documentation describes the parameter as follows:
integration_pattern (Optional[IntegrationPattern]) – AWS Step Functions integrates with services directly in the Amazon States Language. You can control these AWS services using service integration patterns. Depending on the AWS Service, the Service Integration Pattern availability will vary. Default: - IntegrationPattern.REQUEST_RESPONSE for most tasks. IntegrationPattern.RUN_JOB for the following exceptions: BatchSubmitJob, EmrAddStep, EmrCreateCluster, EmrTerminationCluster, and EmrContainersStartJobRun. (https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_stepfunctions_tasks/GlueStartJobRun.html)
To handle this, we can combine a Wait state, a getCrawler call, and a Choice state that loops until the crawler has actually completed. (https://repost.aws/questions/QUTgHzHs6bSN6n_79bCYohaQ/glue-crawler-in-state-machine-shows-as-complete-before-glue-data-catalog-is-updated)
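To make the wait loop easier to picture, here is the same logic written as plain Python. It is only an illustration of what the state machine does, not code from the pipeline. wait_for_crawler is a hypothetical name, and get_crawler stands for the Glue client's get_crawler call, passed in so the loop can be tried without AWS. A crawler's State is READY, RUNNING, or STOPPING; the outcome of the last run is reported separately in LastCrawl.Status.

```python
import time


def wait_for_crawler(get_crawler, name, poll_seconds=30, max_polls=40, sleep=time.sleep):
    """Poll until the crawler is READY again, then return its LastCrawl status."""
    for _ in range(max_polls):
        sleep(poll_seconds)  # corresponds to the Wait state
        crawler = get_crawler(Name=name)["Crawler"]  # the getCrawler task
        if crawler["State"] == "READY":  # the Choice state
            return crawler.get("LastCrawl", {}).get("Status", "UNKNOWN")
    raise TimeoutError(f"Crawler {name} did not finish after {max_polls} polls")
```

With boto3 this could be called as wait_for_crawler(boto3.client("glue").get_crawler, "my_crawler"), where the crawler name is a placeholder.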
The Step Functions flow looks like this:
# Define the ingestion Glue job task
ingestion_glue_task = tasks.GlueStartJobRun(
self,
"IngestionGlueJob",
glue_job_name=ingestion_glue_job_name,
integration_pattern=sfn.IntegrationPattern.RUN_JOB, # Wait for job completion
arguments=sfn.TaskInput.from_object(
{
"--file_path": sfn.JsonPath.string_at(
"$.file_path"
), # Pass file_path from input
}
),
)
# Define the transformation Glue job task
transformation_glue_task = tasks.GlueStartJobRun(
self,
"TransformationGlueJob",
glue_job_name=transformation_glue_job_name,
integration_pattern=sfn.IntegrationPattern.RUN_JOB, # Wait for job completion
)
# Define the Glue crawler staging task
glue_crawler_staging_task = tasks.CallAwsService(
self,
"GlueCrawlerStagingTask",
service="glue",
action="startCrawler",
parameters={"Name": glue_crawler_staging_name},
iam_resources=[
f"arn:aws:glue:{region}:{account}:crawler/{glue_crawler_staging_name}"
],
)
# Define the Glue crawler transformation task
glue_crawler_transformation_task = tasks.CallAwsService(
self,
"GlueCrawlerTransformationTask",
service="glue",
action="startCrawler",
parameters={"Name": glue_crawler_transformation_name},
iam_resources=[
f"arn:aws:glue:{region}:{account}:crawler/{glue_crawler_transformation_name}"
],
)
# Define the SNS publish task
sns_publish_task = tasks.CallAwsService(
self,
"SNSPublishTask",
service="sns",
action="publish",
parameters={
"TopicArn": sns_topic_arn,
"Message": f"Step Function {env_name}-DataPipelineStateMachine has completed successfully.",
},
iam_resources=[f"arn:aws:sns:{region}:{account}:*"],
)
# Wait state for the staging crawler
wait_staging = sfn.Wait(
self,
"WaitForStagingCrawler",
time=sfn.WaitTime.duration(Duration.seconds(30)),
)
# GetCrawler state for the staging crawler
get_staging_crawler = tasks.CallAwsService(
self,
"GetStagingCrawlerState",
service="glue",
action="getCrawler",
parameters={"Name": glue_crawler_staging_name},
iam_resources=[
f"arn:aws:glue:{region}:{account}:crawler/{glue_crawler_staging_name}"
],
)
# Success and fail states for the staging crawler
staging_success = sfn.Succeed(self, "StagingCrawlerSuccess")
staging_failed = sfn.Fail(self, "StagingCrawlerFailed")
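# Choice state for the staging crawler
# Crawler.State is only ever READY, RUNNING, or STOPPING; the result of the run
# is reported in Crawler.LastCrawl.Status, so failure is checked there first
staging_crawler_complete = sfn.Choice(self, "StagingCrawlerComplete")
staging_crawler_complete.when(
    sfn.Condition.and_(
        sfn.Condition.string_equals("$.Crawler.State", "READY"),
        sfn.Condition.is_present("$.Crawler.LastCrawl.Status"),
        sfn.Condition.string_equals("$.Crawler.LastCrawl.Status", "FAILED"),
    ),
    staging_failed,
)
staging_crawler_complete.when(
    sfn.Condition.string_equals("$.Crawler.State", "READY"), staging_success
)
staging_crawler_complete.otherwise(wait_staging)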

# Wait state for the transformation crawler
wait_transformation = sfn.Wait(
    self,
    "WaitForTransformationCrawler",
    time=sfn.WaitTime.duration(Duration.seconds(30)),
)

# GetCrawler state for the transformation crawler
get_transformation_crawler = tasks.CallAwsService(
    self,
    "GetTransformationCrawlerState",
    service="glue",
    action="getCrawler",
    parameters={"Name": glue_crawler_transformation_name},
    iam_resources=[
        f"arn:aws:glue:{region}:{account}:crawler/{glue_crawler_transformation_name}"
    ],
)
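
# Success and fail states for the transformation crawler
transformation_success = sfn.Succeed(self, "TransformationCrawlerSuccess")
transformation_failed = sfn.Fail(self, "TransformationCrawlerFailed")

# Choice state for the transformation crawler.
# Crawler.State is only ever READY, RUNNING, or STOPPING, so a failed run
# shows up as State == READY with LastCrawl.Status == FAILED.
transformation_crawler_complete = sfn.Choice(
    self, "TransformationCrawlerComplete"
)
transformation_crawler_complete.when(
    sfn.Condition.and_(
        sfn.Condition.string_equals("$.Crawler.State", "READY"),
        sfn.Condition.is_present("$.Crawler.LastCrawl.Status"),
        sfn.Condition.string_equals("$.Crawler.LastCrawl.Status", "FAILED"),
    ),
    transformation_failed,
)
transformation_crawler_complete.when(
    sfn.Condition.string_equals("$.Crawler.State", "READY"),
    transformation_success,
)
# Still RUNNING or STOPPING: wait and poll again
transformation_crawler_complete.otherwise(wait_transformation)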

# Run two branches in parallel:
#   1. transformation Glue job, then the transformation crawler
#   2. staging crawler
parallel_tasks = sfn.Parallel(self, "ParallelTasks")
parallel_tasks.branch(
    transformation_glue_task.next(glue_crawler_transformation_task)
    .next(wait_transformation)
    .next(get_transformation_crawler)
    .next(transformation_crawler_complete)
)
parallel_tasks.branch(
    glue_crawler_staging_task.next(wait_staging)
    .next(get_staging_crawler)
    .next(staging_crawler_complete)
)

# Chain the ingestion Glue job, the parallel tasks, and the SNS publish task
definition = ingestion_glue_task.next(parallel_tasks).next(sns_publish_task)

# Create the Step Function
# (definition_body replaces the `definition` prop, which is deprecated in recent CDK v2 releases)
self.state_machine = sfn.StateMachine(
    self,
    f"{env_name}-DataPipelineStateMachine",
    definition_body=sfn.DefinitionBody.from_chainable(definition),
)
To make the pipeline fully event-driven, we’ll add a trigger that starts the Step Function automatically whenever new files are added to the source bucket.
- Set up an S3 event notification: Configure the source S3 bucket to send an event to a Lambda function whenever a new object is created.
- Lambda function: Check whether all required files (e.g., customer_data.csv, transaction_data.csv) are present in the bucket. If they are, start the Step Function.
# Reference the existing S3 bucket
input_bucket = s3.Bucket.from_bucket_name(
    self,
    "ExistingInputBucket",
    bucket_name=input_bucket_name,
)

# Create a Lambda function to trigger the Step Function
trigger_lambda = _lambda.Function(
    self,
    "TriggerLambda",
    runtime=_lambda.Runtime.PYTHON_3_12,  # Python 3.9 has reached end of life
    handler="trigger.handler",  # handler() in scripts/lambda/trigger.py
    code=_lambda.Code.from_asset("./scripts/lambda/"),  # Path to Lambda code
    environment={
        "STEP_FUNCTION_ARN": self.state_machine.state_machine_arn,
        "REGION": region,
        "ACCOUNT": account,
        "BUCKET_NAME": input_bucket.bucket_name,
        # Environment variables are strings, so pass the list comma-separated
        "FILE_NAMES": ",".join(file_names),
    },
)

# Grant permissions to the Lambda function
input_bucket.grant_read(trigger_lambda)
self.state_machine.grant_start_execution(trigger_lambda)

# Add S3 event notification to invoke the Lambda function only for files in the env_name folder
input_bucket.add_event_notification(
    s3.EventType.OBJECT_CREATED,
    s3_notifications.LambdaDestination(trigger_lambda),
    s3.NotificationKeyFilter(
        prefix=f"{env_name}/"
    ),  # Trigger only for files in env_name folder
)
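The CDK code above points the function at trigger.handler in ./scripts/lambda/. As a rough idea of what that handler might look like, here is a minimal sketch that reads the same environment variables. The helper names (expected_keys, find_missing), the return values, and the input passed to the execution are my own assumptions, and it assumes the required files sit directly under the env_name folder and that the folder holds fewer than 1,000 objects (one list_objects_v2 page).

```python
import json
import os
from urllib.parse import unquote_plus


def expected_keys(prefix: str, file_names: list[str]) -> set[str]:
    """Full object keys the pipeline needs before it can start."""
    return {f"{prefix}{name}" for name in file_names}


def find_missing(expected: set[str], present: set[str]) -> list[str]:
    """Required keys that have not been uploaded yet, sorted for stable logs."""
    return sorted(expected - present)


def handler(event, context):
    # boto3 ships with the Lambda Python runtime; importing it here keeps
    # the helpers above usable without the AWS SDK installed.
    import boto3

    bucket = os.environ["BUCKET_NAME"]
    file_names = os.environ["FILE_NAMES"].split(",")

    # S3 event keys are URL-encoded (spaces arrive as "+").
    key = unquote_plus(event["Records"][0]["s3"]["object"]["key"])
    # Assumption: the first path segment is the env_name folder.
    prefix = key.split("/", 1)[0] + "/"

    listing = boto3.client("s3").list_objects_v2(Bucket=bucket, Prefix=prefix)
    present = {obj["Key"] for obj in listing.get("Contents", [])}

    missing = find_missing(expected_keys(prefix, file_names), present)
    if missing:
        print(f"Not starting yet, still waiting for: {missing}")
        return {"started": False, "missing": missing}

    execution = boto3.client("stepfunctions").start_execution(
        stateMachineArn=os.environ["STEP_FUNCTION_ARN"],
        # Hypothetical input payload; the state machine above does not read it.
        input=json.dumps({"bucket": bucket, "prefix": prefix}),
    )
    return {"started": True, "executionArn": execution["executionArn"]}
```

One thing to keep in mind: every upload fires its own event, so two files landing at almost the same time can both see a complete set and start two executions. For a test pipeline that is usually fine, but it is worth knowing.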
9. Deploy and verify resources in AWS
Now it’s time to deploy everything to your AWS environment. Run the following command:
cdk deploy DataPipelineStack
Once the deployment is complete, go to the AWS CloudFormation Console to verify that all resources have been created successfully.
10. Upload data to S3 bucket
Once the resources are deployed, upload the sample data files (customer_data.csv and transaction_data.csv) to the source S3 bucket.
You can do this either via the AWS Console or by using the AWS CLI.
aws s3 cp /path/to/source s3://bucket-name/ --recursive
Make sure the files are placed under the correct prefix, which should match the environment name.
Once uploaded, the Lambda function will check for all required files and trigger the Step Function if everything is in place.
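If you prefer Python over the CLI, the same upload can be sketched with boto3. This is only an illustration: upload_sample_files is a hypothetical helper, and the "dev" environment name in the usage comment is an assumption. The point is that each key is built as env_name/file name, so the files land under the prefix the event notification is filtered on.

```python
from pathlib import Path


def upload_sample_files(s3_client, bucket: str, env_name: str, paths: list[str]) -> list[str]:
    """Upload each local file to s3://<bucket>/<env_name>/<file name> and return the keys."""
    keys = []
    for path in paths:
        key = f"{env_name}/{Path(path).name}"
        # boto3 S3 client signature: upload_file(Filename, Bucket, Key)
        s3_client.upload_file(path, bucket, key)
        keys.append(key)
    return keys


# Example usage (assumes AWS credentials are configured and env_name is "dev"):
#   import boto3
#   upload_sample_files(
#       boto3.client("s3"),
#       bucket="kate-source-data",
#       env_name="dev",
#       paths=["customer_data.csv", "transaction_data.csv"],
#   )
```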
11. Monitor the pipeline and query data with Athena
After uploading the data, you can monitor the entire process in the AWS Step Functions console and check the status of each step.
To stay informed, you can subscribe an email address or mobile phone number to the SNS topic.
This way, you’ll receive a notification when the Step Function completes successfully or if there’s any error during execution.
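Subscribing an email address can be done in the console, or with a few lines of boto3. The sketch below is one way to do it; subscribe_email is a hypothetical helper and the topic ARN and address in the usage comment are placeholders. SNS sends a confirmation email first, and notifications only start arriving after the link in it has been clicked.

```python
def subscribe_email(sns_client, topic_arn: str, email: str) -> str:
    """Subscribe an email address to the topic and return the subscription ARN."""
    response = sns_client.subscribe(
        TopicArn=topic_arn,
        Protocol="email",
        Endpoint=email,
        # Return the ARN even while the subscription is pending confirmation
        ReturnSubscriptionArn=True,
    )
    return response["SubscriptionArn"]


# Example usage (placeholder values):
#   import boto3
#   subscribe_email(boto3.client("sns"), "<your-topic-arn>", "you@example.com")
```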
Finally, once the process is complete and the Glue catalog is updated, you can query the output data using Amazon Athena.
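Queries can be run from the Athena console, but here is a minimal boto3 sketch for running one from a script. run_athena_query is a hypothetical helper, and the database, table, and results location in the usage comment are placeholders: use whatever names your crawlers created in the Glue catalog.

```python
import time


def run_athena_query(athena_client, query, database, output_location, poll_seconds=2.0):
    """Start a query, poll until it reaches a final state, and return (query_id, state)."""
    start = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = start["QueryExecutionId"]

    while True:
        execution = athena_client.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return query_id, state
        time.sleep(poll_seconds)


# Example usage (placeholder names):
#   import boto3
#   run_athena_query(
#       boto3.client("athena"),
#       query="SELECT * FROM <your_table> LIMIT 10",
#       database="<your_glue_database>",
#       output_location="s3://<your-athena-results-bucket>/",
#   )
```

Once the state is SUCCEEDED, the rows can be fetched with the client's get_query_results call, or read from the CSV that Athena writes to the output location.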
Quick Notes and Thoughts
- If you see objects named like *_$folder$ in your output S3 bucket, don’t worry: they are placeholders that Hadoop creates when writing to a path that doesn’t exist yet. To avoid permission errors, make sure your Glue job role is allowed to write objects to the target S3 location.
- On a side note: I love using Python, and it’s one of my favourite languages. But when it comes to AWS CDK, I often feel that TypeScript is more “native.” Maybe that’s because TypeScript was the first language supported by CDK? The documentation definitely feels more complete in TypeScript.
From the AWS CDK documentation (https://docs.aws.amazon.com/cdk/v2/guide/languages.html): “TypeScript was the first language supported by the AWS CDK, and much of the AWS CDK example code is written in TypeScript. This guide includes a topic specifically to show how to adapt TypeScript AWS CDK code for use with the other supported languages. For more information, see Comparing AWS CDK in TypeScript with other languages.”
Comparing AWS CDK in TypeScript with other languages: https://docs.aws.amazon.com/cdk/v2/guide/work-with.html#work-with-cdk-compare