AI & Serverless

DynamoDB Streams, Lambda & AI Integration

Build event-driven AI pipelines with DynamoDB Streams, trigger Lambda functions on data changes, and integrate with AI services.

DynamoDB Streams

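DynamoDB Streams is a change data capture (CDC) feature. Once you enable it on a table, it captures every item-level modification (insert, update, delete) and keeps the resulting records for 24 hours. Each modification appears in the stream exactly once, and the records for a given item appear in the same order as the modifications made to that item.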

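Each stream record contains:

  • The item's primary key
  • The old image (the item before the change), if the stream's view type includes it
  • The new image (the item after the change), if the stream's view type includes it
  • The type of change (INSERT, MODIFY, REMOVE)

Which images are included depends on the view type chosen when the stream is enabled: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, or NEW_AND_OLD_IMAGES. An INSERT never has an old image, and a REMOVE never has a new image.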
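Streams are off by default. When you turn one on, you also pick a view type, which controls whether records carry the old image, the new image, both, or only the keys. The sketch below uses the AWS SDK for JavaScript v3 and assumes a table named `MyApp`, the name used later in this section. The `buildEnableStreamInput` helper is illustrative, and it avoids SDK imports so it can run on its own.

```javascript
// The four view types DynamoDB accepts for a stream.
const STREAM_VIEW_TYPES = ['KEYS_ONLY', 'NEW_IMAGE', 'OLD_IMAGE', 'NEW_AND_OLD_IMAGES'];

// Hypothetical helper: builds the input for UpdateTableCommand
// (@aws-sdk/client-dynamodb) that enables a stream on `tableName`.
function buildEnableStreamInput(tableName, viewType = 'NEW_AND_OLD_IMAGES') {
  if (!STREAM_VIEW_TYPES.includes(viewType)) {
    throw new Error(`Unknown stream view type: ${viewType}`);
  }
  return {
    TableName: tableName,
    StreamSpecification: { StreamEnabled: true, StreamViewType: viewType },
  };
}

// With the SDK installed you would send it like this (not run here):
//   import { DynamoDBClient, UpdateTableCommand } from '@aws-sdk/client-dynamodb';
//   const client = new DynamoDBClient({});
//   await client.send(new UpdateTableCommand(buildEnableStreamInput('MyApp')));
console.log(JSON.stringify(buildEnableStreamInput('MyApp')));
// {"TableName":"MyApp","StreamSpecification":{"StreamEnabled":true,"StreamViewType":"NEW_AND_OLD_IMAGES"}}
```

The helper defaults to NEW_AND_OLD_IMAGES because the order status check later in this section needs both the old and the new values of an item.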

Streams unlock event-driven architectures: instead of polling your database for changes, you react to changes as they happen.

DynamoDB Streams + Lambda

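The most common pattern is to attach an AWS Lambda function to a DynamoDB Stream. Lambda polls the stream's shards through an event source mapping and invokes your function synchronously, passing each batch of stream records in `event.Records`.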
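Record images use DynamoDB's typed attribute-value JSON. A string attribute arrives as `{ S: 'USER#123' }`, and a number arrives as a string inside `N`, such as `{ N: '3' }`. That is why the handler below reads fields through `.S`. The `unmarshall` function in `@aws-sdk/util-dynamodb` converts such a map into a plain object.

To show what that conversion does without any dependencies, the sketch below hand-rolls a simplified decoder. It covers only the S, N, BOOL, NULL, M, and L types. `decodeImage` and the sample record are illustrative; real code should use `unmarshall`.

```javascript
// Simplified, illustrative decoder for DynamoDB attribute values.
// Handles only S, N, BOOL, NULL, M and L; production code should use
// unmarshall() from @aws-sdk/util-dynamodb, which covers every type.
function fromAttributeValue(av) {
  if ('S' in av) return av.S;
  if ('N' in av) return Number(av.N); // numbers arrive as strings
  if ('BOOL' in av) return av.BOOL;
  if ('NULL' in av) return null;
  if ('M' in av) return decodeImage(av.M);
  if ('L' in av) return av.L.map(fromAttributeValue);
  throw new Error(`Unsupported attribute type: ${Object.keys(av).join(',')}`);
}

// Decodes a whole image (e.g. record.dynamodb.NewImage) into a plain object.
function decodeImage(image) {
  const result = {};
  for (const [name, av] of Object.entries(image ?? {})) {
    result[name] = fromAttributeValue(av);
  }
  return result;
}

// A trimmed stream record shaped like what Lambda delivers (values are made up).
const record = {
  eventName: 'INSERT',
  dynamodb: {
    Keys: { PK: { S: 'USER#123' }, SK: { S: 'PROFILE' } },
    NewImage: {
      PK: { S: 'USER#123' },
      SK: { S: 'PROFILE' },
      loginCount: { N: '3' },
      tags: { L: [{ S: 'beta' }] },
    },
  },
};

console.log(JSON.stringify(decodeImage(record.dynamodb.NewImage)));
// {"PK":"USER#123","SK":"PROFILE","loginCount":3,"tags":["beta"]}
```

Because numbers go through `Number()`, values beyond `Number.MAX_SAFE_INTEGER` would lose precision in this sketch.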

javascript
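// Lambda function triggered by a DynamoDB Stream.
// sendWelcomeEmail and sendShippingNotification are placeholders for
// your own code; they are not defined here.
export const handler = async (event) => {
  for (const record of event.Records) {
    const { eventName, dynamodb } = record;

    if (eventName === 'INSERT') {
      const newItem = dynamodb.NewImage;

      // In a single-table design, other item types (such as ORDER# items)
      // can share the USER# partition key, so check the sort key too.
      // 'PROFILE' is an assumed sort key for user items.
      if (newItem.PK?.S?.startsWith('USER#') && newItem.SK?.S === 'PROFILE') {
        const userId = newItem.PK.S.replace('USER#', '');

        // Send welcome email on new user
        await sendWelcomeEmail(userId);
      }
    }

    if (eventName === 'MODIFY') {
      // Both images are present only with the NEW_AND_OLD_IMAGES view type
      const oldItem = dynamodb.OldImage;
      const newItem = dynamodb.NewImage;

      // Detect status changes on orders
      if (newItem?.SK?.S?.startsWith('ORDER#')) {
        const oldStatus = oldItem?.status?.S;
        const newStatus = newItem.status?.S;

        if (oldStatus !== newStatus && newStatus === 'SHIPPED') {
          await sendShippingNotification(newItem);
        }
      }
    }
  }
};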
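If the function throws, Lambda retries the whole batch. By default it keeps retrying, which holds up later records from that shard, until the batch succeeds or the records expire from the stream.

If you add `ReportBatchItemFailures` to the event source mapping's `FunctionResponseTypes`, the function can instead return the sequence number of the record that failed. Lambda then retries from that record onward.

The sketch below assumes your per-record logic is wrapped in a function. `makeHandler` and the logging processor are illustrative names. In a deployed function you would export `handler`, as the example above does.

```javascript
// Wraps per-record logic so a failure reports only the first failed
// record instead of failing (and retrying) the whole batch.
// Requires FunctionResponseTypes: ['ReportBatchItemFailures'] on the
// event source mapping; without it, Lambda ignores this return value.
function makeHandler(processRecord) {
  return async (event) => {
    for (const record of event.Records) {
      try {
        await processRecord(record);
      } catch (err) {
        console.error('Record failed:', record.dynamodb.SequenceNumber, err.message);
        // Stop at the first failure: Lambda retries starting from this
        // record, so the records after it are reprocessed in order.
        return {
          batchItemFailures: [{ itemIdentifier: record.dynamodb.SequenceNumber }],
        };
      }
    }
    // An empty list tells Lambda the whole batch succeeded.
    return { batchItemFailures: [] };
  };
}

// Hypothetical per-record logic (e.g. the INSERT/MODIFY branches above).
const handler = makeHandler(async (record) => {
  console.log(record.eventName, record.dynamodb.SequenceNumber);
});
```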

AI Pipeline: Automatic Embedding Generation

A powerful pattern for AI applications: when a document is inserted or updated in DynamoDB, automatically generate an embedding and store it in a vector database for semantic search.

javascript
// Local module exporting a configured Pinecone index (not shown)
import { index } from './pinecone-client';

export const handler = async (event) => {
  // Embed new documents and re-embed updated ones
  const changes = event.Records.filter(
    (r) => r.eventName === 'INSERT' || r.eventName === 'MODIFY'
  );

  for (const record of changes) {
    const item = record.dynamodb.NewImage;

    // Only process content items
    if (!item.SK?.S?.startsWith('CONTENT#')) continue;

    const text = item.body?.S;
    if (!text) continue;

    // Anthropic has no embedding model of its own; its docs point to Voyage AI
    const embeddingResponse = await fetch('https://api.voyageai.com/v1/embeddings', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${process.env.VOYAGE_API_KEY}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ input: text, model: 'voyage-3', input_type: 'document' }),
    });
    if (!embeddingResponse.ok) {
      // Throwing makes Lambda retry the batch
      throw new Error(`Voyage API error: ${embeddingResponse.status}`);
    }
    const { data } = await embeddingResponse.json();
    const embedding = data[0].embedding;

    // Store in Pinecone for semantic search. The ID reuses the item's key,
    // so an update overwrites the previous vector.
    await index.upsert([{
      id: `${item.PK.S}#${item.SK.S}`,
      values: embedding,
      metadata: {
        title: item.title?.S,
        createdAt: item.createdAt?.S,
      },
    }]);
  }
};
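The Voyage embeddings endpoint accepts a list of strings as `input`. That means a whole batch of stream records can be embedded in one request instead of one request per record, subject to the API's per-request limits.

The sketch below covers the parts that don't touch the network:

- gathering the documents;
- building the request body;
- pairing each returned embedding with its document through the `index` field of each `data` entry.

The function names are illustrative.

```javascript
// Gathers every CONTENT# item with a body from one batch of stream
// records, using the same filters as the handler above.
function collectDocuments(records) {
  const docs = [];
  for (const record of records) {
    if (record.eventName !== 'INSERT' && record.eventName !== 'MODIFY') continue;
    const item = record.dynamodb.NewImage;
    const text = item?.body?.S;
    if (!item?.SK?.S?.startsWith('CONTENT#') || !text) continue;
    docs.push({ id: `${item.PK.S}#${item.SK.S}`, text });
  }
  return docs;
}

// Request body for POST https://api.voyageai.com/v1/embeddings;
// all texts go in a single `input` list.
function embeddingRequestBody(docs) {
  return JSON.stringify({
    input: docs.map((d) => d.text),
    model: 'voyage-3',
    input_type: 'document',
  });
}

// Pairs each returned embedding with its document: data[i].index is the
// position of the corresponding text in `input`.
function toVectors(docs, responseJson) {
  return responseJson.data.map(({ index, embedding }) => ({
    id: docs[index].id,
    values: embedding,
  }));
}
```

The resulting array can then go to Pinecone in a single `index.upsert(vectors)` call rather than one upsert per document.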

DynamoDB with AWS AppSync (Real-time AI Responses)

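AWS AppSync can use DynamoDB as a native data source for its GraphQL resolvers, and it pushes real-time updates to clients through GraphQL subscriptions. Subscriptions fire on mutations sent through the AppSync API, not on writes made directly to DynamoDB. Streaming AI output to the frontend therefore usually takes this path: write the response to DynamoDB in chunks as the model generates it → DynamoDB Streams → a Lambda function that calls an AppSync mutation → subscribed clients receive each chunk. Writing chunks rather than individual tokens keeps the number of table writes down.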
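AppSync subscriptions are triggered by mutations sent through AppSync, so the function reading the stream has to publish each change as a mutation. Here is a minimal sketch of that Lambda step. It assumes:

- an AppSync API that uses API key authorization;
- a hypothetical `publishChunk` mutation, with a matching subscription defined in your schema;
- hypothetical `APPSYNC_URL` and `APPSYNC_API_KEY` environment variables for the endpoint and key;
- illustrative item attributes `responseId` and `chunk`.

The request-building step is a plain function, so it can be checked without calling AWS.

```javascript
// Hypothetical mutation; your schema must define it and a subscription on it.
const PUBLISH_CHUNK = `
  mutation PublishChunk($responseId: ID!, $chunk: String!) {
    publishChunk(responseId: $responseId, chunk: $chunk) {
      responseId
      chunk
    }
  }
`;

// Turns an INSERT stream record into a GraphQL request body, or returns
// null if the record isn't a response chunk. Attribute names are assumed.
function toChunkRequest(record) {
  if (record.eventName !== 'INSERT') return null;
  const item = record.dynamodb.NewImage;
  const responseId = item?.responseId?.S;
  const chunk = item?.chunk?.S;
  if (!responseId || !chunk) return null;
  return { query: PUBLISH_CHUNK, variables: { responseId, chunk } };
}

// Lambda handler (export it from your module when deploying).
const handler = async (event) => {
  for (const record of event.Records) {
    const body = toChunkRequest(record);
    if (!body) continue;

    const res = await fetch(process.env.APPSYNC_URL, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'x-api-key': process.env.APPSYNC_API_KEY,
      },
      body: JSON.stringify(body),
    });
    const result = await res.json();
    // GraphQL errors come back in the response body's `errors` field
    if (!res.ok || result.errors) {
      throw new Error(`AppSync mutation failed: ${JSON.stringify(result.errors ?? res.status)}`);
    }
  }
};
```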

PartiQL: SQL-Like Queries

DynamoDB supports PartiQL, a SQL-compatible query language, as an alternative to the expression-based API:

javascript
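import { DynamoDBClient, ExecuteStatementCommand } from '@aws-sdk/client-dynamodb';

const client = new DynamoDBClient({});

// Assumes email is the Users table's partition key; the ? placeholder
// takes its value from Parameters instead of string concatenation
const result = await client.send(new ExecuteStatementCommand({
  Statement: 'SELECT * FROM "Users" WHERE email = ?',
  Parameters: [{ S: 'alice@example.com' }],
}));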

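This is more readable for simple lookups, but it doesn't change DynamoDB's underlying constraints. For a SELECT to avoid a full table scan, its WHERE clause must match the partition key with an equality or IN condition. That key can belong to the table or to an index named in the FROM clause (`FROM "Table"."IndexName"`). Otherwise DynamoDB scans the whole table and consumes read capacity for every item it reads.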
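A single SELECT response holds at most 1 MB of data. A `NextToken` in the response means more results remain, and you pass it back on the next call.

Below is a minimal pagination sketch. To stay self-contained, it takes a `send` function that performs one ExecuteStatement call rather than importing the SDK. `executeAll` is an illustrative name.

```javascript
// Collects every page of a PartiQL SELECT by following NextToken.
// `send` performs one ExecuteStatement call; with the SDK you would pass
//   (input) => client.send(new ExecuteStatementCommand(input))
async function executeAll(send, statement, parameters) {
  const items = [];
  let nextToken;
  do {
    const page = await send({
      Statement: statement,
      Parameters: parameters,
      NextToken: nextToken, // undefined on the first call
    });
    items.push(...(page.Items ?? []));
    nextToken = page.NextToken;
  } while (nextToken);
  return items;
}
```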

Example

javascript
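// DynamoDB + Lambda: auto-generate an AI summary on insert
import Anthropic from '@anthropic-ai/sdk';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, UpdateCommand } from '@aws-sdk/lib-dynamodb';

const anthropic = new Anthropic(); // reads ANTHROPIC_API_KEY from the environment
const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));

export const handler = async (event) => {
  for (const record of event.Records) {
    // INSERT only: the UpdateCommand below produces a MODIFY event,
    // so handling MODIFY here would re-trigger this function
    if (record.eventName !== 'INSERT') continue;

    const item = record.dynamodb.NewImage;
    if (!item.body?.S) continue;

    const summary = await anthropic.messages.create({
      model: 'claude-haiku-4-5',
      max_tokens: 150,
      messages: [{
        role: 'user',
        content: `Summarize in one sentence: ${item.body.S}`,
      }],
    });

    // Take the first text block of the response
    const text = summary.content.find((block) => block.type === 'text')?.text;
    if (!text) continue;

    await docClient.send(new UpdateCommand({
      TableName: 'MyApp',
      Key: { PK: item.PK.S, SK: item.SK.S },
      UpdateExpression: 'SET aiSummary = :s',
      ExpressionAttributeValues: { ':s': text },
    }));
  }
};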
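You may also want summaries refreshed when an item's body is edited. Handling MODIFY events creates a loop risk, because the function's own `aiSummary` write comes back as another MODIFY.

One way to break that loop is to summarize only when `body` is new or differs from the old image. A write that changes only `aiSummary` is then ignored. The sketch below assumes the NEW_AND_OLD_IMAGES view type, and `needsSummary` is an illustrative name.

```javascript
// Decides whether a stream record needs a (new) summary.
// Assumes the NEW_AND_OLD_IMAGES stream view type.
function needsSummary(record) {
  const newBody = record.dynamodb.NewImage?.body?.S;
  if (!newBody) return false; // REMOVE events and items without a body

  if (record.eventName === 'INSERT') return true;
  if (record.eventName === 'MODIFY') {
    // A write that only sets aiSummary leaves body unchanged, so it is skipped
    return record.dynamodb.OldImage?.body?.S !== newBody;
  }
  return false;
}
```

In the handler above, `if (!needsSummary(record)) continue;` would replace the INSERT-only check. Each body edit then costs one extra invocation, for the function's own write, which this check filters out.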
