APIGW + Lambda + DynamoDB
In this post, let see a demo application which is accessible by the users via API GW and the back end logic for the application is served by AWS lambda function. For persistent storage, we are using DynamoDB table.
Lets start with DynamoDB table:
I created a DynamoDB table by the name "employee_table".
Table has a primary key "employee_id" of datatype Number. So, any query to the table must be made through the key "employee_id", else we need to scan the entire table.
Let's add an item to the table from console.
Now, we added an item to the table.
Let's do the same and other action via API GW and Lambda Functions. Let's create a lambda function. One lambda function, based on the query parameter it invokes the function within the lambda.
import json
import boto3
from botocore.exceptions import ClientError
from decimal import Decimal
from boto3.dynamodb.conditions import Key
# Initialize the DynamoDB client
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
dynamodb_table = dynamodb.Table('employee_table')
status_check_path = '/status'
employee_path = '/employee'
employees_path = '/employees'
def lambda_handler(event, context):
print('Request event: ', event)
response = None
try:
http_method = event.get('httpMethod')
path = event.get('path')
if http_method == 'GET' and path == status_check_path:
response = build_response(200, 'Service is operational')
elif http_method == 'GET' and path == employee_path:
employee_id = event['queryStringParameters']['employee_id']
response = get_employee(int(employee_id))
elif http_method == 'GET' and path == employees_path:
response = get_employees()
elif http_method == 'POST' and path == employee_path:
response = save_employee(json.loads(event['body']))
elif http_method == 'PATCH' and path == employee_path:
body = json.loads(event['body'])
response = modify_employee(body['employee_id '], body['updateKey'], body['updateValue'])
elif http_method == 'DELETE' and path == employee_path:
body = json.loads(event['body'])
response = delete_employee(body['employee_id '])
else:
response = build_response(404, '404 Not Found')
except Exception as e:
print('Error:', e)
response = build_response(400, 'Error processing request')
return response
def get_employee(employee_id):
try:
response = dynamodb_table.get_item(Key={'employee_id': employee_id})
return build_response(200, response.get('Item'))
except ClientError as e:
print('Error:', e)
return build_response(400, e.response['Error']['Message'])
def get_employees():
try:
scan_params = {
'TableName': dynamodb_table.name
}
return build_response(200, scan_dynamo_records(scan_params, []))
except ClientError as e:
print('Error:', e)
return build_response(400, e.response['Error']['Message'])
def scan_dynamo_records(scan_params, item_array):
response = dynamodb_table.scan(**scan_params)
item_array.extend(response.get('Items', []))
if 'LastEvaluatedKey' in response:
scan_params['ExclusiveStartKey'] = response['LastEvaluatedKey']
return scan_dynamo_records(scan_params, item_array)
else:
return {'employees': item_array}
def save_employee(request_body):
try:
dynamodb_table.put_item(Item=request_body)
body = {
'Operation': 'SAVE',
'Message': 'SUCCESS',
'Item': request_body
}
return build_response(200, body)
except ClientError as e:
print('Error:', e)
return build_response(400, e.response['Error']['Message'])
def modify_employee(employee_id, update_key, update_value):
try:
response = dynamodb_table.update_item(
Key={'employee_id': employee_id},
UpdateExpression=f'SET {update_key} = :value',
ExpressionAttributeValues={':value': update_value},
ReturnValues='UPDATED_NEW'
)
body = {
'Operation': 'UPDATE',
'Message': 'SUCCESS',
'UpdatedAttributes': response
}
return build_response(200, body)
except ClientError as e:
print('Error:', e)
return build_response(400, e.response['Error']['Message'])
def delete_employee(employee_id):
try:
response = dynamodb_table.delete_item(
Key={'employee_id': employee_id},
ReturnValues='ALL_OLD'
)
body = {
'Operation': 'DELETE',
'Message': 'SUCCESS',
'Item': response
}
return build_response(200, body)
except ClientError as e:
print('Error:', e)
return build_response(400, e.response['Error']['Message'])
class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Decimal):
# Check if it's an int or a float
if obj % 1 == 0:
return int(obj)
else:
return float(obj)
# Let the base class default method raise the TypeError
return super(DecimalEncoder, self).default(obj)
def build_response(status_code, body):
return {
'statusCode': status_code,
'headers': {
'Content-Type': 'application/json'
},
'body': json.dumps(body, cls=DecimalEncoder)
}
Below are the resources and the method I created and integrated to a single lambda function.
Let's look at the API GW resources and the method's.
Let's start with exploring the GET method of the resource /employee.
This method is to check if an employee exist in the database or not. So, this method must be invoked with the query parameter (Primary key which is employee_id). So, the API GW endpoint must be followed like below:
https://sddd.execute-api.us-east-1.amazonaws.com/production/employee?employee_id=1
Let's try the next method on how to add item to a table using POST method under the resource /employee.
Likewise. We can have our customer applications can be build leveraging APIGW with Lambda and DynamoDB.
Comments
Post a Comment