Auto Upload an S3 File to Redshift

  • Create an S3 bucket
  • On the bucket's Properties tab, scroll down to Event notifications and create an event for both POST and PUT uploads (s3:ObjectCreated:Post and s3:ObjectCreated:Put)
  • Create a Lambda Python function
  • Add a trigger to the Lambda function – specify the S3 bucket above
  • Write the Lambda code in Python using psycopg2 (and, if needed, Boto3, the AWS SDK for Python) to run the COPY command against Redshift. psycopg2 can be packaged as a Lambda layer; add that layer to the Lambda function you are creating (Python 3.7 or higher).
  • Add an IAM role that grants Redshift and Lambda the access they need to S3

Lambda code will look like this:

import json
import os
from urllib.parse import unquote_plus

import psycopg2

def _s3_path_from_record(record):
    """Return the ``s3://bucket/key`` URI for one S3 event-notification record.

    S3 URL-encodes the object key in event notifications (e.g. a space
    arrives as ``+``), so the key is decoded here; using the raw value would
    make COPY look for an object that does not exist.
    """
    s3_bucket = record['s3']['bucket']['name']
    print("Bucket name is {}".format(s3_bucket))
    s3_key = unquote_plus(record['s3']['object']['key'])
    print("Bucket key name is {}".format(s3_key))
    return "s3://{}/{}".format(s3_bucket, s3_key)


def _copy_statement(tablename, from_path):
    """Build a Redshift COPY statement and its bound parameters.

    Returns ``(sql, params)`` for ``cursor.execute(sql, params)``. The S3 path
    and the credentials are passed as parameters so psycopg2 quotes/escapes
    them (an S3 key containing ``'`` would otherwise break the SQL). The table
    name comes from configuration and is formatted in directly.

    Authorization: if the ``iam_role`` environment variable is set, its value
    is used as ``IAM_ROLE`` (Redshift assumes that role to read S3, so no keys
    are needed). Otherwise the ``AWS_Access_key`` / ``AWS_Access_Secret``
    environment variables are sent via ``CREDENTIALS``.
    """
    iam_role = os.getenv('iam_role')
    if iam_role:
        sql = "COPY {} FROM %s IAM_ROLE %s CSV;".format(tablename)
        return sql, (from_path, iam_role)
    credentials = 'aws_access_key_id={};aws_secret_access_key={}'.format(
        os.getenv('AWS_Access_key'), os.getenv('AWS_Access_Secret'))
    sql = "COPY {} FROM %s CREDENTIALS %s CSV;".format(tablename)
    return sql, (from_path, credentials)


def lambda_handler(event, context):
    """Load every S3 object referenced by an S3 event into a Redshift table.

    Each record in ``event['Records']`` is turned into an ``s3://`` path and
    loaded with a ``COPY ... CSV`` into the table named by the ``tablename``
    environment variable. Each COPY is committed individually, so objects
    loaded before a failing record stay loaded.

    Connection settings are read from the environment variables ``dbname``,
    ``host``, ``port`` (default ``'5439'``), ``user`` and ``password``. See
    ``_copy_statement`` for how S3 access is authorized.

    Args:
        event: S3 event-notification payload; must contain a ``Records`` list.
        context: Lambda context object (unused).

    Raises:
        Whatever psycopg2 raises on connection or COPY failure; the
        connection is closed before the exception propagates.
    """
    print("event collected is {}".format(event))
    records = event['Records']
    if not records:
        # Nothing to load -- don't open a database connection.
        return

    tablename = os.getenv('tablename')
    # One connection for the whole event instead of one per record.
    connection = psycopg2.connect(dbname=os.getenv('dbname'),
                                  host=os.getenv('host'),
                                  port=os.getenv('port', '5439'),
                                  user=os.getenv('user'),
                                  password=os.getenv('password'))
    print('connected....')
    try:
        for record in records:
            from_path = _s3_path_from_record(record)
            print("from path {}".format(from_path))
            query, params = _copy_statement(tablename, from_path)
            # Log only the SQL template: params may hold the secret access key.
            print("query is {}".format(query))
            with connection.cursor() as curs:
                print('cursor obtained....')
                curs.execute(query, params)
            print('cursor closed....')
            connection.commit()
            print('copy executed and committed....')
    finally:
        # Always release the connection, even if a COPY fails.
        connection.close()
        print('connection closed....')
Scroll to Top