DynamoDB Streams, Lambda & AI Integration
Build event-driven AI pipelines with DynamoDB Streams, trigger Lambda functions on data changes, and integrate with AI services.
DynamoDB Streams
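DynamoDB Streams is a change data capture (CDC) feature. It records every item-level modification (insert, update, delete) made to a table and keeps those records available for 24 hours. Changes to the same item appear in the stream in the order they were made.
Depending on the stream view type configured for the table, each stream record contains some or all of: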
- The item's primary key
- The old image (before the change)
- The new image (after the change)
- The type of change (INSERT, MODIFY, REMOVE)
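For illustration, here is roughly what a single MODIFY record looks like when the stream's view type is NEW_AND_OLD_IMAGES (the other view types, KEYS_ONLY, NEW_IMAGE and OLD_IMAGE, omit one or both images). The sample values and the `changedAttributes` helper are assumptions made for this sketch; they are not part of any AWS SDK.

```javascript
// Sample record in the shape Lambda receives from a DynamoDB Stream.
// Attribute values use DynamoDB's typed JSON ({ S: ... }, { N: ... }).
const sampleRecord = {
  eventName: 'MODIFY',
  dynamodb: {
    Keys: { PK: { S: 'USER#42' }, SK: { S: 'ORDER#1001' } },
    OldImage: { PK: { S: 'USER#42' }, SK: { S: 'ORDER#1001' }, status: { S: 'PAID' } },
    NewImage: { PK: { S: 'USER#42' }, SK: { S: 'ORDER#1001' }, status: { S: 'SHIPPED' } },
    SequenceNumber: '111',
    StreamViewType: 'NEW_AND_OLD_IMAGES',
  },
};

// Hypothetical helper: names of attributes whose typed value differs
// between the old and new image (including added or removed attributes).
function changedAttributes(record) {
  const oldImage = record.dynamodb.OldImage ?? {};
  const newImage = record.dynamodb.NewImage ?? {};
  const names = new Set([...Object.keys(oldImage), ...Object.keys(newImage)]);
  return [...names].filter(
    (name) => JSON.stringify(oldImage[name]) !== JSON.stringify(newImage[name])
  );
}

console.log(changedAttributes(sampleRecord)); // [ 'status' ]
```

Comparing the two images like this only works when both are present, which is one reason to pick NEW_AND_OLD_IMAGES if your consumers need to detect what changed.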
Streams unlock event-driven architectures: instead of polling your database for changes, you react to changes as they happen.
DynamoDB Streams + Lambda
The most common pattern: attach an AWS Lambda function to a DynamoDB Stream. Lambda invokes your function for each batch of stream records.
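Because Lambda hands the function a whole batch, an unhandled error on one record causes the entire batch to be retried by default. One way to limit that, sketched below, is to tell Lambda which record failed so it retries from that point. This assumes the event source mapping has `ReportBatchItemFailures` enabled in its function response types; `processRecord` is a hypothetical placeholder for your own logic.

```javascript
// Hypothetical per-record logic; replace with real processing.
async function processRecord(record) {
  if (record.eventName !== 'REMOVE' && !record.dynamodb?.NewImage) {
    throw new Error('record has no NewImage');
  }
}

// Reports the first failed record instead of failing the whole batch.
// In a Lambda deployment this function would be exported as the handler.
const handler = async (event) => {
  for (const record of event.Records) {
    try {
      await processRecord(record);
    } catch (err) {
      console.error('Failed record', record.dynamodb?.SequenceNumber, err.message);
      // For DynamoDB Streams the identifier is the record's sequence number.
      // Lambda retries from this record onward, so stop at the first failure.
      return {
        batchItemFailures: [{ itemIdentifier: record.dynamodb.SequenceNumber }],
      };
    }
  }
  // An empty list tells Lambda the whole batch succeeded.
  return { batchItemFailures: [] };
};
```

Stopping at the first failure keeps records for the same item in order, at the cost of reprocessing the records that follow it in the batch, so the per-record logic should be safe to run more than once.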
// Lambda function triggered by a DynamoDB Stream.
// Assumes the stream view type is NEW_AND_OLD_IMAGES and that
// sendWelcomeEmail and sendShippingNotification are defined elsewhere.
export const handler = async (event) => {
  for (const record of event.Records) {
    const { eventName, dynamodb } = record;

    if (eventName === 'INSERT') {
      const newItem = dynamodb.NewImage;
      // Send a welcome email on new user. In a single-table design,
      // also check the sort key so other item types are skipped.
      if (newItem.PK?.S?.startsWith('USER#')) {
        const userId = newItem.PK.S.replace('USER#', '');
        await sendWelcomeEmail(userId);
      }
    }

    if (eventName === 'MODIFY') {
      const oldItem = dynamodb.OldImage;
      const newItem = dynamodb.NewImage;
      // Detect status changes on orders
      if (oldItem.SK?.S?.startsWith('ORDER#')) {
        const oldStatus = oldItem.status?.S;
        const newStatus = newItem.status?.S;
        if (oldStatus !== newStatus && newStatus === 'SHIPPED') {
          await sendShippingNotification(newItem);
        }
      }
    }
  }
};

AI Pipeline: Automatic Embedding Generation
A powerful pattern for AI applications: when a document is inserted or updated in DynamoDB, automatically generate an embedding and store it in a vector database for semantic search.
// './pinecone-client' is a local module that exports a configured Pinecone index
import { index } from './pinecone-client';

export const handler = async (event) => {
  // React to both new and updated documents; deletions are ignored here
  const changes = event.Records.filter(
    (r) => r.eventName === 'INSERT' || r.eventName === 'MODIFY'
  );

  for (const record of changes) {
    const item = record.dynamodb.NewImage;

    // Only process content items
    if (!item.SK?.S?.startsWith('CONTENT#')) continue;
    const text = item.body?.S;
    if (!text) continue;

    // Generate embedding via Voyage AI (Anthropic's embedding partner)
    const embeddingResponse = await fetch('https://api.voyageai.com/v1/embeddings', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${process.env.VOYAGE_API_KEY}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ input: text, model: 'voyage-3' }),
    });
    // Throw on HTTP errors so Lambda retries the batch
    if (!embeddingResponse.ok) {
      throw new Error(`Embedding request failed: ${embeddingResponse.status}`);
    }
    const { data } = await embeddingResponse.json();
    const embedding = data[0].embedding;

    // Store in Pinecone for semantic search; the ID is derived from the
    // item's key, so an update overwrites the earlier vector
    await index.upsert([{
      id: item.PK.S + '#' + item.SK.S,
      values: embedding,
      metadata: {
        title: item.title?.S,
        createdAt: item.createdAt?.S,
      },
    }]);
  }
};

DynamoDB with AWS AppSync (Real-time AI Responses)
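AWS AppSync can use DynamoDB as a data source and offers real-time GraphQL subscriptions. Subscriptions are triggered by GraphQL mutations, so changes written to the table by other services do not reach subscribers on their own. To push AI-generated output to the frontend as it is produced, write it to DynamoDB and have a stream-triggered Lambda function call an AppSync mutation: DynamoDB → Streams → Lambda → AppSync subscriptions. Writing every token as a separate item is costly, so batching tokens into chunks is usually the more practical choice.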
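One way to bridge a stream to AppSync subscribers is for the stream-triggered Lambda function to post a mutation to the AppSync GraphQL endpoint. The sketch below makes several assumptions: the `publishChunk` mutation and its fields are hypothetical and would have to exist in your schema, items carry a `text` attribute, the API uses API-key authorization, and the `APPSYNC_URL` and `APPSYNC_API_KEY` environment variables are names chosen for this example.

```javascript
// Hypothetical mutation. The schema must declare it, plus a subscription
// annotated with @aws_subscribe(mutations: ["publishChunk"]).
const PUBLISH_CHUNK = `
  mutation PublishChunk($conversationId: ID!, $text: String!) {
    publishChunk(conversationId: $conversationId, text: $text) {
      conversationId
      text
    }
  }`;

// Build the fetch() options for one stream record, or null to skip it.
function buildPublishRequest(record, apiKey) {
  const item = record.dynamodb?.NewImage;
  if (record.eventName === 'REMOVE' || !item?.text?.S) return null;
  return {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', 'x-api-key': apiKey },
    body: JSON.stringify({
      query: PUBLISH_CHUNK,
      variables: { conversationId: item.PK.S, text: item.text.S },
    }),
  };
}

// In a Lambda deployment this function would be exported as the handler.
const handler = async (event) => {
  for (const record of event.Records) {
    const request = buildPublishRequest(record, process.env.APPSYNC_API_KEY);
    if (!request) continue;
    const response = await fetch(process.env.APPSYNC_URL, request);
    if (!response.ok) throw new Error(`AppSync returned ${response.status}`);
  }
};
```

Keeping the request construction in a separate function makes it easy to unit test without calling AppSync. For production use, IAM authorization with signed requests is a common alternative to an API key.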
PartiQL: SQL-Like Queries
DynamoDB supports PartiQL, a SQL-compatible query language, as an alternative to the expression-based API:
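import { DynamoDBClient, ExecuteStatementCommand } from '@aws-sdk/client-dynamodb';

const client = new DynamoDBClient({});

const result = await client.send(new ExecuteStatementCommand({
  Statement: `SELECT * FROM "Users" WHERE email = 'alice@example.com'`,
}));

This is more readable for simple lookups, but it doesn't change DynamoDB's underlying constraints: unless the WHERE clause matches the partition key of the table or of an index you query, the statement runs as a full table scan. The lookup above is only efficient if email is a key attribute.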
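To illustrate the index route, the sketch below builds the input for `ExecuteStatementCommand` so that a lookup by email is served by a global secondary index. It assumes a GSI named `EmailIndex` with `email` as its partition key; both the index and the `selectByIndex` helper are hypothetical. The value is passed as a parameter rather than interpolated into the statement.

```javascript
// Hypothetical helper: builds the input for ExecuteStatementCommand.
// PartiQL addresses an index as "TableName"."IndexName".
function selectByIndex(tableName, indexName, attribute, value) {
  return {
    Statement: `SELECT * FROM "${tableName}"."${indexName}" WHERE ${attribute} = ?`,
    // Parameters use DynamoDB's typed attribute values
    Parameters: [{ S: value }],
  };
}

const input = selectByIndex('Users', 'EmailIndex', 'email', 'alice@example.com');
console.log(input.Statement);
// SELECT * FROM "Users"."EmailIndex" WHERE email = ?
```

The resulting object can be passed straight to the client, for example `await client.send(new ExecuteStatementCommand(input))`.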
Example
// DynamoDB + Lambda: auto-generate an AI summary on insert
import Anthropic from '@anthropic-ai/sdk';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, UpdateCommand } from '@aws-sdk/lib-dynamodb';

// The Anthropic client reads ANTHROPIC_API_KEY from the environment
const anthropic = new Anthropic();
const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));

export const handler = async (event) => {
  for (const record of event.Records) {
    if (record.eventName !== 'INSERT') continue;
    const item = record.dynamodb.NewImage;
    if (!item.body?.S) continue;

    const summary = await anthropic.messages.create({
      model: 'claude-haiku-4-5',
      max_tokens: 150,
      messages: [{
        role: 'user',
        content: `Summarize in one sentence: ${item.body.S}`
      }]
    });

    // This update emits a MODIFY event, which the INSERT filter above
    // skips, so the function does not retrigger itself.
    await docClient.send(new UpdateCommand({
      TableName: 'MyApp',
      Key: { PK: item.PK.S, SK: item.SK.S },
      UpdateExpression: 'SET aiSummary = :s',
      ExpressionAttributeValues: { ':s': summary.content[0].text },
    }));
  }
};