Using Lambda to load data into DynamoDB

Shivam Shrivastava · Published in AWS Tip · 5 min read · Apr 17, 2022

Isn’t it annoying when, in an ETL pipeline, the upstream system fails to deliver its data on time? Your job fails because the data is missing, or you have to retrigger it manually when the data finally arrives.
Using AWS Lambda, we can trigger jobs the moment the data is delivered to us, avoiding all that hassle!

Let us follow the above scenario and load data into DynamoDB from an S3 file drop, using Lambda:

Step 1: Create an IAM role that has programmatic access to S3 (AmazonS3FullAccess) and DynamoDB (AmazonDynamoDBFullAccess).
Note: If you are not familiar with how to generate programmatic access keys, please read the following article.
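Before wiring anything else up, it’s worth confirming the generated key pair actually works. A minimal sketch, assuming placeholder credentials and the ap-south-1 region used later in this article:

import boto3

# Placeholders: substitute the key pair generated for your IAM identity.
ACCESS_KEY = 'YOUR_ACCESS_KEY_ID'
SECRET_KEY = 'YOUR_SECRET_ACCESS_KEY'

session = boto3.Session(aws_access_key_id=ACCESS_KEY,
                        aws_secret_access_key=SECRET_KEY,
                        region_name='ap-south-1')

# STS echoes back the identity the keys belong to; an error here means
# the keys are wrong or inactive.
print(session.client('sts').get_caller_identity()['Arn'])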

Step 2: Create the Lambda function that will execute our desired solution:

Navigate to the AWS Lambda service -> select “Functions” from the index on the left -> click on “Create Function”

AWS Lambda: Functions

The above step navigates you to a new page where you fill in the form with the Lambda function’s configuration. Select “Author from scratch”, then provide the “Function name” and the “Runtime”. When done, click on “Create Function” in the bottom right corner of the page.

Function Configuration
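If you prefer to script this step instead of clicking through the console, the same function can be created with boto3. This is a minimal sketch, assuming a hypothetical function name, a placeholder execution-role ARN, and a handler file named lambda_function.py sitting next to the script:

import io
import zipfile
import boto3

lambda_client = boto3.client('lambda', region_name='ap-south-1')

# Package the handler file into an in-memory zip archive, since the
# Lambda API expects a deployment package rather than raw source.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w') as archive:
    archive.write('lambda_function.py')
buffer.seek(0)

lambda_client.create_function(
    FunctionName='load-pokemon-data',  # hypothetical name
    Runtime='python3.9',
    Role='arn:aws:iam::123456789012:role/lambda-s3-dynamodb-role',  # placeholder ARN
    Handler='lambda_function.lambda_handler',
    Code={'ZipFile': buffer.read()},
)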

The “Code Source” console will be available now. Set it aside for the moment while we set up the source and target locations for our program.

Step 3: Set up a source S3 bucket where the source file will be placed.
Create an S3 bucket and navigate to its properties:

Bucket Properties
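For reference, creating the bucket from code is a one-liner. A minimal sketch, assuming the bucket name used later in this article and the ap-south-1 region:

import boto3

s3_client = boto3.client('s3', region_name='ap-south-1')

# Outside us-east-1, S3 requires an explicit LocationConstraint.
s3_client.create_bucket(
    Bucket='pokemon-battle-team-bucket',
    CreateBucketConfiguration={'LocationConstraint': 'ap-south-1'},
)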

Scroll down to “Event Notifications” and click on “Create event notification”:

Event Notifications

Input the name of the event and select the action that should trigger it (such as uploading a file or deleting one); in our case the Event Type will be “Put”:

Scroll to the bottom of the page and you will find the “Destination” form. Select “Lambda Function” and, from the drop-down, select the Lambda function we created above.
Note: If you can’t find the Lambda function you created in the above step, it may be because you created the bucket and the function in two different regions.

Destination Details
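The same wiring can be done in code. A minimal sketch, reusing the hypothetical function name and placeholder account ID from the earlier sketch; note that outside the console, S3 must also be granted permission to invoke the function (the console adds this grant for you):

import boto3

s3_client = boto3.client('s3', region_name='ap-south-1')
lambda_client = boto3.client('lambda', region_name='ap-south-1')

# Placeholder ARN; copy the real one from the function's page.
function_arn = 'arn:aws:lambda:ap-south-1:123456789012:function:load-pokemon-data'

# Allow S3 to invoke the function; the console grants this implicitly.
lambda_client.add_permission(
    FunctionName='load-pokemon-data',
    StatementId='s3-invoke-permission',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::pokemon-battle-team-bucket',
)

# Fire the function on every object PUT into the bucket.
s3_client.put_bucket_notification_configuration(
    Bucket='pokemon-battle-team-bucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': function_arn,
            'Events': ['s3:ObjectCreated:Put'],
        }]
    },
)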

Step 4: Create the target, a DynamoDB table. Navigate to the DynamoDB service, select “Tables” from the index on the left and click on “Create Table”:

DynamoDB Console

Provide the details, such as the table name and the partition key, and select “Create Table” at the bottom right of the page. Alternatively, if you want to create the table via code, go through the following article; a minimal sketch is also shown below.

Create Table: DynamoDB
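A minimal boto3 sketch of the same step, assuming a table named pokemon with a numeric partition key id (matching the data we load later) and on-demand billing:

import boto3

dynamodb = boto3.client('dynamodb', region_name='ap-south-1')

dynamodb.create_table(
    TableName='pokemon',
    KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],  # partition key
    AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'N'}],
    BillingMode='PAY_PER_REQUEST',  # on-demand capacity
)

# create_table is asynchronous; wait until the table is usable.
dynamodb.get_waiter('table_exists').wait(TableName='pokemon')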

Step 5: Once the table is created, navigate back to the Lambda function we created above, input the following code, and click on “Deploy”:

import json
import boto3

# Placeholders: substitute your own key pair. (In practice, attaching an
# execution role to the Lambda avoids hardcoding credentials at all.)
ACCESS_KEY = 'YOUR_ACCESS_KEY_ID'
SECRET_KEY = 'YOUR_SECRET_ACCESS_KEY'

#To get the table object from DynamoDB
def getTable(table_name, access_key, secret_key):
    #Session object to connect as the IAM user
    session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key)
    #Create a DynamoDB resource
    dynamoDB = session.resource('dynamodb', region_name='ap-south-1')
    pokemon_table = dynamoDB.Table(table_name)
    return pokemon_table

s3_client = boto3.client('s3', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, region_name='ap-south-1')

def lambda_handler(event, context):
    #Result variable to capture whether data has been loaded or not
    data_loaded = False
    try:
        #Fetch bucket and key information from the event object
        bucket_name = event['Records'][0]['s3']['bucket']['name']
        key = event['Records'][0]['s3']['object']['key']

        #Read the data from the S3 file that triggered the event
        json_data = s3_client.get_object(Bucket=bucket_name, Key=key)
        pokemon_data = json.loads(json_data['Body'].read())

        #Get the DynamoDB table where we want to populate our data
        pokemon_table = getTable("pokemon", ACCESS_KEY, SECRET_KEY)

        for data in pokemon_data:
            #Fetch only the required information (renamed to avoid
            #shadowing Python's built-in id and type)
            pokemon_id = data['id']
            name = data['name']['english']
            pokemon_type = data['type'][0]
            #Insert the data point into the table
            pokemon_table.put_item(Item={'id': pokemon_id, 'name': name, 'type': pokemon_type})

        data_loaded = True

    except Exception as e:
        print(event)
        print(e)

    finally:
        return data_loaded
Deploy Code
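For reference, the handler above only reads a couple of fields out of the event that S3 delivers. A trimmed sketch of that structure, using our sample names (the real event carries many more fields):

# Only the fields the handler actually uses:
sample_event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'pokemon-battle-team-bucket'},
            'object': {'key': 'pokemon.json'},
        }
    }]
}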

Step 6: Validate the function by uploading a file to the S3 bucket we created above:

import boto3

# Same placeholder key pair as before.
ACCESS_KEY = 'YOUR_ACCESS_KEY_ID'
SECRET_KEY = 'YOUR_SECRET_ACCESS_KEY'

def write_to_s3_bucket(client, bucket_name, path, data):
    try:
        response = client.put_object(Bucket=bucket_name, Key=path, Body=data)
        return response['ResponseMetadata']['HTTPStatusCode']
    except Exception as e:
        raise e

#Create a file inside the bucket
pokemon_data = """
[{
"id": 1,
"name": {
"english": "Bulbasaur",
"japanese": "フシギダネ",
"chinese": "妙蛙种子",
"french": "Bulbizarre"
},
"type": [
"Grass",
"Poison"
],
"base": {
"HP": 45,
"Attack": 49,
"Defense": 49,
"Sp. Attack": 65,
"Sp. Defense": 65,
"Speed": 45
}
},
{
"id": 2,
"name": {
"english": "Ivysaur",
"japanese": "フシギソウ",
"chinese": "妙蛙草",
"french": "Herbizarre"
},
"type": [
"Grass",
"Poison"
],
"base": {
"HP": 60,
"Attack": 62,
"Defense": 63,
"Sp. Attack": 80,
"Sp. Defense": 80,
"Speed": 60
}
},
{
"id": 3,
"name": {
"english": "Venusaur",
"japanese": "フシギバナ",
"chinese": "妙蛙花",
"french": "Florizarre"
},
"type": [
"Grass",
"Poison"
],
"base": {
"HP": 80,
"Attack": 82,
"Defense": 83,
"Sp. Attack": 100,
"Sp. Defense": 100,
"Speed": 80
}
},
{
"id": 4,
"name": {
"english": "Charmander",
"japanese": "ヒトカゲ",
"chinese": "小火龙",
"french": "Salamèche"
},
"type": [
"Fire"
],
"base": {
"HP": 39,
"Attack": 52,
"Defense": 43,
"Sp. Attack": 60,
"Sp. Defense": 50,
"Speed": 65
}
},
{
"id": 5,
"name": {
"english": "Charmeleon",
"japanese": "リザード",
"chinese": "火恐龙",
"french": "Reptincel"
},
"type": [
"Fire"
],
"base": {
"HP": 58,
"Attack": 64,
"Defense": 58,
"Sp. Attack": 80,
"Sp. Defense": 65,
"Speed": 80
}
}]
"""
s3_client = boto3.client('s3', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, region_name='ap-south-1')
BUCKET_NAME = "pokemon-battle-team-bucket"
response = write_to_s3_bucket(s3_client, BUCKET_NAME, "pokemon.json", pokemon_data)

Open “DynamoDB” -> “Tables” -> select the table we created -> “Explore Table Items” -> “Items Returned”:

Data Loaded in DynamoDB
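You can also verify from code rather than the console. A minimal sketch, assuming the same table name; scan() is fine for a handful of items like this sample dataset:

import boto3

dynamodb = boto3.resource('dynamodb', region_name='ap-south-1')
table = dynamodb.Table('pokemon')

# Print every item the Lambda loaded.
for item in table.scan()['Items']:
    print(item['id'], item['name'], item['type'])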
