camposvinicius/aws-snowflake-etl

ETL for Business Team

This is a data pipeline built with the purpose of serving a business team.

The pipeline itself is a relatively simple sequence of steps, shown in the image below, with AWS Step Functions as our main orchestrator.


stepfunction


All resources are created with CloudFormation, in two stacks. The first, dataengineer-requirements, creates the bucket for our code and the repository for our Docker images. The second, dataengineer-stack, creates the rest of the infrastructure the pipeline needs.

requirements.yaml

AWSTemplateFormatVersion: "2010-09-09"
Description: Data Engineer Requirements

Resources:

  ## BUCKETS 
  
  CodesZone:
    Type: AWS::S3::Bucket    
    Properties:
      BucketName: data-codeszone    
      Tags:
        - Key: DataBucket
          Value: CodesZone

  ## ECR REPOSITORY

  ECRLambdaExtractImage: 
    Type: AWS::ECR::Repository
    Properties: 
      RepositoryName: "lambda-extract-image"
      ImageScanningConfiguration: 
        ScanOnPush: true

Outputs:

  ## BUCKETS OUTPUT

  CodesZoneBucketName:
    Description: The CodesZone Bucket Name
    Value: !Ref CodesZone

  ## ECR REPOSITORY OUTPUT    

  ECRLambdaExtractImageArn:
    Value: !GetAtt ECRLambdaExtractImage.Arn

deploy.yaml

AWSTemplateFormatVersion: "2010-09-09"
Description: Data Engineer Stack

Resources:

  ## BUCKETS 

  RawZone:
    Type: AWS::S3::Bucket    
    Properties:
      BucketName: data-rawzone    
      Tags:
        - Key: DataBucket
          Value: RawZone
  
  ProcessingZone:
    Type: AWS::S3::Bucket    
    Properties:
      BucketName: data-processingzone    
      Tags:
        - Key: DataBucket
          Value: ProcessingZone
  
  DeliveryZone:
    Type: AWS::S3::Bucket    
    Properties:
      BucketName: data-deliveryzone    
      Tags:
        - Key: DataBucket
          Value: DeliveryZone
  
  QueryResultsZone:
    Type: AWS::S3::Bucket    
    Properties:
      BucketName: data-queryresultszone    
      Tags:
        - Key: DataBucket
          Value: QueryResultsZone

  ## GLUE RESOURCES

  GlueDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: "deliverydatabase"

  GlueCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Name: "deliverycrawler"
      Role: Glue-S3
      DatabaseName: !Ref GlueDatabase
      RecrawlPolicy: 
        RecrawlBehavior: CRAWL_EVERYTHING        
      Targets:
        S3Targets:
          - Path: s3://data-deliveryzone
      SchemaChangePolicy:
        UpdateBehavior: UPDATE_IN_DATABASE
        DeleteBehavior: LOG 
      Configuration: "{\"Version\":1.0,\"Grouping\":{\"TableGroupingPolicy\":\"CombineCompatibleSchemas\"}}"    
      Tags: {
        DataCrawler: DeliveryCrawler
      }

  ## LAMBDA EXTRACT

  LambdaExtract:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: lambda-extract
      Code: 
        ImageUri: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_REGION_NAME>.amazonaws.com/lambda-extract-image:latest            
      Role: arn:aws:iam::<YOUR_ACCOUNT_ID>:role/LambdaExecutionRoleData
      Timeout: 600
      MemorySize: 450
      PackageType: Image

  ## GLUE SPARK JOBS      

  GlueSparkJob1:
    Type: AWS::Glue::Job
    Properties:
      Role: Glue-S3
      Name: gluesparkjob1
      Command:
        Name: glueetl
        ScriptLocation: "s3://data-codeszone/glue_job_1.py"
      ExecutionProperty:
        MaxConcurrentRuns: 1
      MaxRetries: 0
      GlueVersion: 4.0

  GlueSparkJob2:
    Type: AWS::Glue::Job
    Properties:
      Role: Glue-S3
      Name: gluesparkjob2
      Command:
        Name: glueetl
        ScriptLocation: "s3://data-codeszone/glue_job_2.py"
      ExecutionProperty:
        MaxConcurrentRuns: 1
      MaxRetries: 0
      GlueVersion: 4.0

  ## GLUE WORKFLOW STRUCTURE

  GlueWorkflow:
    Type: AWS::Glue::Workflow
    Properties: 
      Name: glue-workflow
      Description: Workflow for orchestrating the Glue Job

  StartWorkflow:
    Type: AWS::Glue::Trigger
    Properties:
      WorkflowName: !Ref GlueWorkflow
      Name: start-workflow
      Description: Start Workflow
      Type: ON_DEMAND
      Actions:
        - JobName: !Ref GlueSparkJob1

  WatchingGlueSparkJob1:
    Type: AWS::Glue::Trigger
    Properties:
      WorkflowName: !Ref GlueWorkflow
      Name: watching-glue-spark-job1
      Description: Watching Glue Spark Job1
      Type: CONDITIONAL
      StartOnCreation: True
      Actions:
        - JobName: !Ref GlueSparkJob2
      Predicate:
        Conditions:
          - LogicalOperator: EQUALS
            JobName: !Ref GlueSparkJob1
            State: SUCCEEDED

  WatchingGlueSparkJob2:
    Type: AWS::Glue::Trigger
    Properties:
      WorkflowName: !Ref GlueWorkflow
      Name: watching-glue-spark-job2
      Description: Watching Glue Spark Job2
      Type: CONDITIONAL
      StartOnCreation: True
      Predicate:
        Conditions:
          - LogicalOperator: EQUALS
            JobName: !Ref GlueSparkJob2
            State: SUCCEEDED      
      Actions:
        - CrawlerName: !Ref GlueCrawler

  ## STEP FUNCTION

  WorkflowStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: Workflow-StateMachine
      DefinitionString: |-
        {
          "Comment": "Workflow State Machine",
          "StartAt": "lambda-extract",
          "States": {
            "lambda-extract": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "OutputPath": "$.Payload",
              "Parameters": {
                "Payload.$": "$",
                "FunctionName": "arn:aws:lambda:<YOUR_REGION_NAME>:<YOUR_ACCOUNT_ID>:function:lambda-extract:$LATEST"
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "Lambda.ServiceException",
                    "Lambda.AWSLambdaException",
                    "Lambda.SdkClientException",
                    "Lambda.TooManyRequestsException"
                  ],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 6,
                  "BackoffRate": 2
                }
              ],
              "Next": "glue-workflow"
            },
            "glue-workflow": {
              "Type": "Task",
              "End": true,
              "Parameters": {
                "Name": "glue-workflow"
              },
              "Resource": "arn:aws:states:::aws-sdk:glue:startWorkflowRun"
            }
          }
        }
      RoleArn: arn:aws:iam::<YOUR_ACCOUNT_ID>:role/service-role/StepFunctions-IngestionDatalakeStateMachine-role-a46669ee
      Tags:
        -
          Key: "RunWorkflow"
          Value: "TriggerLambda"
        -
          Key: "RunWorkflow"
          Value: "TriggerGlueWorkflow"            
          

Outputs:

  ## BUCKETS OUTPUT

  RawZoneBucketName:
    Description: The RawZone Bucket Name
    Value: !Ref RawZone

  ProcessingZoneBucketName:
    Description: The ProcessingZone Bucket Name
    Value: !Ref ProcessingZone

  DeliveryZoneBucketName:
    Description: The DeliveryZone Bucket Name
    Value: !Ref DeliveryZone

  QueryResultsZoneBucketName:
    Description: The QueryResultsZone Bucket Name
    Value: !Ref QueryResultsZone

  ## GLUE RESOURCES OUTPUT

  GlueDatabaseName:
    Description: The Glue Database Name
    Value: !Ref GlueDatabase
    
  GlueCrawlerName:
    Description: The Glue Crawler Name
    Value: !Ref GlueCrawler

  ## LAMBDA EXTRACT OUTPUT

  LambdaExtractArn:
    Value: !GetAtt LambdaExtract.Arn    
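
The Retry block of the lambda-extract state in deploy.yaml sets IntervalSeconds, MaxAttempts and BackoffRate. As a rough illustration of what those values imply, the sketch below computes the wait before each retry, assuming the usual Step Functions behaviour where the first retry waits IntervalSeconds and each later wait is multiplied by BackoffRate. The function name retry_delays is ours, not part of any AWS API.

```python
def retry_delays(interval_seconds, max_attempts, backoff_rate):
    """Return the wait, in seconds, before each retry attempt."""
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]


# Values copied from the lambda-extract state in deploy.yaml.
delays = retry_delays(interval_seconds=2, max_attempts=6, backoff_rate=2)
print(delays)       # [2, 4, 8, 16, 32, 64]
print(sum(delays))  # 126
```

Under that assumption, a Lambda invocation that keeps hitting one of the listed errors is retried for roughly two minutes of accumulated waiting before the state fails.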

CloudFormation


We've also created two CI/CD pipelines: one that creates the resources whenever there is a push to the master branch, and another, triggered manually, that destroys them.


cicd-construct
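
The creation pipeline has to bring up the two stacks in order. The sketch below shows one way that ordering could be expressed with the boto3 CloudFormation client; it is not the repository's actual workflow. The function name deploy_in_order and the template_dir parameter are hypothetical, and the sketch leaves out the container image push that has to happen between the two stacks, since the Lambda function references that image.

```python
from pathlib import Path

# Stack names and template files as named in this README. Order matters:
# the second stack uses the bucket and ECR repository created by the first.
STACKS = [
    ("dataengineer-requirements", "requirements.yaml"),
    ("dataengineer-stack", "deploy.yaml"),
]


def deploy_in_order(cfn, stacks=STACKS, template_dir="."):
    """Create each stack and wait for it to finish before starting the next.

    `cfn` is expected to behave like boto3.client("cloudformation").
    """
    created = []
    for stack_name, template_file in stacks:
        body = Path(template_dir, template_file).read_text()
        cfn.create_stack(StackName=stack_name, TemplateBody=body)
        # The waiter raises if the stack ends up failed or rolled back.
        cfn.get_waiter("stack_create_complete").wait(StackName=stack_name)
        created.append(stack_name)
    return created
```

With AWS credentials configured, it would be called as deploy_in_order(boto3.client("cloudformation")). Passing the client in as an argument keeps the function easy to exercise with a stub.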


This set of buckets makes up our datalake. The data goes through the complete ETL process across the main layers, which here we call data-rawzone, data-processingzone and data-deliveryzone.


Datalake


Now let's get into the lambda function code and understand what it is all about.


import requests, io, tempfile, os, boto3
from zipfile import ZipFile

file_name = '2m-Sales-Records.zip'
bucket = "data-rawzone"
url = 'https://eforexcel.com/wp/wp-content/uploads/2020/09/2m-Sales-Records.zip'

def lambda_handler(event, context):

    headers = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:60.0) Gecko/20100101 Firefox/60.0"
    }

    s3 = boto3.resource('s3')

    with tempfile.TemporaryDirectory() as temp_path:
        temp_dir = os.path.join(temp_path, 'temp')
        with open(temp_dir, 'wb') as f:
            req = requests.get(url, headers=headers)  
            f.write(req.content)
        
        s3.Bucket(bucket).upload_file(temp_dir, file_name)
    
        zip_obj = s3.Object(bucket_name=bucket, key=file_name)
        buffer = io.BytesIO(zip_obj.get()["Body"].read())
        
        z = ZipFile(buffer)
        for filename in z.namelist():
            s3.meta.client.upload_fileobj(
                z.open(filename),
                Bucket=bucket,
                Key='data/' + f'{filename}'
            )

    for file in s3.Bucket(bucket).objects.all():
        key = file.key

        ## Removing the whitespaces in the filename

        if key.startswith('data/'):
            new_key = key.replace(" ", "_")

            ## Only rename when the key changes: S3 rejects copying an object onto itself
            if new_key != key:
                s3.Object(bucket, new_key).copy_from(CopySource={'Bucket': bucket, 'Key': key})
                s3.Object(bucket, key).delete()

        ## Moving original file to another prefix

        elif key.endswith('.zip') and not key.startswith('old-data/'):
            new_key = 'old-data/' + key

            ## Errors are no longer swallowed, so a failed copy fails the invocation
            s3.Object(bucket, new_key).copy_from(CopySource={'Bucket': bucket, 'Key': key})
            s3.Object(bucket, key).delete()

Basically, we have a containerized Lambda function that makes a request and writes the response content, which is a zip file, into a temporary directory. It then uploads the zip to S3, extracts the CSV file inside it, replaces the blank spaces in the filename with underscores, and moves the zip file to another prefix within the same bucket.

This is the result: the data prefix holds the renamed CSV file and the old-data prefix holds the zip file.


raw
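
The unzip and rename steps described above could also be done in a single pass, without reading the zip back from S3 or renaming objects afterwards. The following is a minimal sketch of that alternative, not the code the function actually runs. The name upload_members and the upload callback are hypothetical; the callback stands in for whatever writes a file object to S3.

```python
import io
from zipfile import ZipFile


def upload_members(zip_bytes, upload, prefix="data/"):
    """Stream every file in the archive to `upload(fileobj, key)`.

    Spaces in member names are replaced with underscores before the upload,
    so no later rename pass over the bucket is needed. Returns the keys used.
    """
    keys = []
    with ZipFile(io.BytesIO(zip_bytes)) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue  # directory entries carry no data
            key = prefix + info.filename.replace(" ", "_")
            with archive.open(info) as member:
                upload(member, key)
            keys.append(key)
    return keys
```

Inside the handler, the callback could be something like lambda f, k: s3.meta.client.upload_fileobj(f, Bucket=bucket, Key=k), with req.content passed as zip_bytes.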


After that, a Glue workflow made of Spark jobs and a crawler does the following:

Read the CSV file from the raw zone → Save it as Parquet in a single partition in the processing zone → Save it as Parquet partitioned by COUNTRY in the delivery zone → Trigger the Glue crawler to make the table available in Athena.


glueworkflow
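
In the state machine, the glue-workflow task calls startWorkflowRun and then ends, so the execution appears to finish once the run has been started rather than when the jobs are done. If you need to wait for the workflow from a script, one option is to poll its status. The sketch below assumes a client that behaves like boto3.client("glue"); the function name run_workflow_and_wait and its sleep parameter are hypothetical.

```python
import time

# Workflow run statuses after which nothing further will happen.
TERMINAL_STATES = {"COMPLETED", "STOPPED", "ERROR"}


def run_workflow_and_wait(glue, name="glue-workflow", poll_seconds=30, sleep=time.sleep):
    """Start a Glue workflow run and poll until it reaches a terminal status."""
    run_id = glue.start_workflow_run(Name=name)["RunId"]
    while True:
        run = glue.get_workflow_run(Name=name, RunId=run_id)["Run"]
        if run["Status"] in TERMINAL_STATES:
            return run_id, run["Status"]
        sleep(poll_seconds)
```

The sleep function is injectable only so the loop can be tested without real delays.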

You can check what the two Glue job scripts are doing in code1 and code2.

The results:


processingzone


deliverzone


After the crawler runs, here is our final table with the data schema already enriched with proper types. All the fields in our original schema were strings; you can check how they were converted in our Glue Spark job 1.


table


WITH metrics as (
    select 
        country,
        order_date,
        total_revenue,
        row_number() over (partition by country order by total_revenue desc) as R
    from data_deliveryzone where country in ('Canada', 'United States of America')
)
SELECT
    country,
    order_date,
    total_revenue
FROM
    metrics
where R <=2    
order by 3 desc;

We use this simple query as an example to get the two highest revenues for Canada and the USA, along with their order dates.
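
For readers less familiar with window functions, the same selection can be sketched in plain Python: number the rows within each country by descending revenue, keep row numbers up to 2, then sort what is left. The function name top_n_per_country is ours, and the sample rows in the demo are invented for illustration, not taken from the dataset.

```python
from collections import defaultdict


def top_n_per_country(rows, n=2):
    """Mimic ROW_NUMBER() OVER (PARTITION BY country ORDER BY total_revenue DESC) <= n."""
    by_country = defaultdict(list)
    for row in rows:
        by_country[row["country"]].append(row)

    picked = []
    for group in by_country.values():
        group.sort(key=lambda r: r["total_revenue"], reverse=True)
        picked.extend(group[:n])  # rows whose row number is 1..n

    # Final ORDER BY 3 DESC: sort the surviving rows by total_revenue.
    return sorted(picked, key=lambda r: r["total_revenue"], reverse=True)


# Invented sample rows, for illustration only.
sample = [
    {"country": "Canada", "order_date": "2015-01-01", "total_revenue": 100},
    {"country": "Canada", "order_date": "2016-01-01", "total_revenue": 300},
    {"country": "Canada", "order_date": "2017-01-01", "total_revenue": 200},
]
for row in top_n_per_country(sample):
    print(row["country"], row["order_date"], row["total_revenue"])
```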

And finally we have the destruction of resources via our CICD pipeline that we mentioned at the beginning.


cicd-destruction


cloudformationdestruction

You can see the code used to construct the resources here and to destroy them here.

-------------------------------------------------- EXTRA BONUS --------------------------------------------------

Let's imagine that you need to make the data available not only in Athena but also in Snowflake.

Well, there are many ways to do this; let's show you one of them. First we can create a Snowflake integration with our datalake using this structure:

create or replace storage integration datadelivery
type = external_stage
storage_provider = s3
enabled = true
storage_aws_role_arn = 'arn:aws:iam::YOUR_ACCOUNT_ID:role/RoleToAccessS3DataDeliveryZone'
storage_allowed_locations = (
    's3://data-deliveryzone/data/'
);

sf-integration


Remember that you need to create a role with a policy that grants access to the S3 bucket you want.

You can follow this step-by-step guide to understand how to configure the integration and the role.
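
As a rough idea of what that role needs, the sketch below builds the two JSON documents involved: a permissions policy for reading the delivery zone prefix and a trust policy that lets Snowflake assume the role. It follows the general shape used for Snowflake S3 storage integrations, but treat it as an assumption to check against the guide. The function names are ours, and the user ARN and external ID shown are placeholders for the values Snowflake reports when you run DESC INTEGRATION datadelivery.

```python
import json


def s3_read_policy(bucket, prefix):
    """Permissions policy: read objects under the prefix and list the bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:GetObjectVersion"],
                "Resource": f"arn:aws:s3:::{bucket}/{prefix}/*",
            },
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
                "Resource": f"arn:aws:s3:::{bucket}",
                "Condition": {"StringLike": {"s3:prefix": [f"{prefix}/*"]}},
            },
        ],
    }


def snowflake_trust_policy(snowflake_user_arn, external_id):
    """Trust policy: allow the Snowflake-side IAM user to assume this role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": snowflake_user_arn},
                "Action": "sts:AssumeRole",
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ],
    }


# Bucket and prefix come from storage_allowed_locations above.
print(json.dumps(s3_read_policy("data-deliveryzone", "data"), indent=2))
# Placeholders: replace with STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID.
print(json.dumps(snowflake_trust_policy("<SNOWFLAKE_USER_ARN>", "<EXTERNAL_ID>"), indent=2))
```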

After that, you create a STAGE using the integration you just created, which is basically where the files will be visible inside Snowflake.


create or replace stage DELIVERYZONE.DELIVERYZONE.STAGE_DELIVERYZONE
url='s3://data-deliveryzone/data/'
storage_integration = datadelivery;

sf-stage


You will also need to create the file format that Snowflake will use to read and ingest your files, which in our case is Parquet.

CREATE OR REPLACE FILE FORMAT DELIVERYZONE.DELIVERYZONE.PARQUET
TYPE = PARQUET
SNAPPY_COMPRESSION = TRUE
BINARY_AS_TEXT = TRUE
TRIM_SPACE = FALSE
NULL_IF = ('NULL', 'NUL', '');

sf-fileformat


Now you can make the table available directly on top of your datalake via an external table. First map the schema of your table, following the idea below. In this case we have a partitioned table, and to help you map the partition you can run a query like this one to inspect the file paths and metadata:


select 
    metadata$filename, 
    (split_part(split_part(metadata$filename, '/', 2),'=', 2)::TEXT)
FROM @DELIVERYZONE.DELIVERYZONE.STAGE_DELIVERYZONE (file_format => 'DELIVERYZONE.DELIVERYZONE.PARQUET') t;

sf-metadatafilename
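
The nested split_part call in the query above takes the second path segment of metadata$filename and then the text after the equals sign. The sketch below reproduces that logic in Python to make the two steps explicit. The helper split_part is a simplified stand-in that handles positive indexes only, and the example path is hypothetical, since real file names depend on how Spark wrote the partitions.

```python
def split_part(text, delimiter, part):
    """Simplified stand-in for SQL split_part with a positive, 1-based index."""
    pieces = text.split(delimiter)
    return pieces[part - 1] if 1 <= part <= len(pieces) else ""


def country_from_filename(filename):
    """Extract the partition value from a path like data/COUNTRY=<value>/<file>."""
    folder = split_part(filename, "/", 2)  # e.g. "COUNTRY=Canada"
    return split_part(folder, "=", 2)      # e.g. "Canada"


# Hypothetical path, for illustration only.
print(country_from_filename("data/COUNTRY=Canada/part-00000.snappy.parquet"))  # Canada
```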


create or replace external table DELIVERYZONE.DELIVERYZONE.TABLE_DELIVERYZONE (
  REGION string AS (value:REGION::string), 
  ITEM_TYPE string AS (value:ITEM_TYPE::string), 
  SALES_CHANNEL string AS (value:SALES_CHANNEL::string), 
  ORDER_PRIORITY string AS (value:ORDER_PRIORITY::string), 
  ORDER_DATE date AS (value:ORDER_DATE::date), 
  ORDER_ID int AS (value:ORDER_ID::int), 
  SHIP_DATE date AS (value:SHIP_DATE::date), 
  UNITS_SOLD int AS (value:UNITS_SOLD::int), 
  UNIT_PRICE decimal(10,2) AS (value:UNIT_PRICE::decimal(10,2)), 
  UNIT_COST decimal(10,2) AS (value:UNIT_COST::decimal(10,2)), 
  TOTAL_REVENUE decimal(10,2) AS (value:TOTAL_REVENUE::decimal(10,2)), 
  TOTAL_COST decimal(10,2) AS (value:TOTAL_COST::decimal(10,2)), 
  TOTAL_PROFIT decimal(10,2) AS (value:TOTAL_PROFIT::decimal(10,2)), 
  COUNTRY STRING AS (split_part(split_part(metadata$filename, '/', 2),'=', 2)::TEXT)
    ) 
  partition by (COUNTRY)
  location = @DELIVERYZONE.DELIVERYZONE.STAGE_DELIVERYZONE
  auto_refresh = true 
  file_format = (FORMAT_NAME='DELIVERYZONE.DELIVERYZONE.PARQUET'); 

And here is our external table created.


sf-external_table_creation



sf-external_table_preview