AI & Production

Cassandra for AI and ML Workloads

How Cassandra powers large-scale AI feature stores, event streaming for ML pipelines, and real-time recommendation systems.

Cassandra's Role in AI Infrastructure

As AI applications scale, the data infrastructure supporting them must scale too. Cassandra has become a cornerstone of enterprise AI infrastructure for several reasons:

Feature Stores

Machine learning models need features — computed attributes of entities (users, products, sessions) — served with low latency at prediction time. A feature store is the infrastructure layer that computes, stores, and serves these features.

Cassandra is widely used as the online (low-latency serving) layer of feature stores because:

  • Low-latency point lookups by entity ID (typically single-digit milliseconds)
  • Scales to millions of writes per second from feature computation pipelines
  • High availability means feature serving keeps working even when nodes fail

Feast, one of the most popular open-source feature stores, supports Cassandra (and Astra DB) as an online store.
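As a sketch of what that wiring looks like, a Feast feature_store.yaml pointing at a Cassandra online store might resemble the following (the project name, hostnames, and keyspace are placeholders, not values from a real deployment):

```yaml
project: recs
provider: local
online_store:
  type: cassandra
  hosts:
    - cassandra-1
    - cassandra-2
  keyspace: feast_features
```

Feast then writes materialized feature values into this keyspace and reads them back at serving time.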

python
# Cassandra as a feature store (conceptual)
from datetime import datetime
from cassandra.cluster import Cluster

cluster = Cluster(['localhost'])
session = cluster.connect('ml_features')

# Write features computed by a batch pipeline
# (user_id, avg_value, frequency, category come from that pipeline)
session.execute("""
  INSERT INTO user_features (user_id, feature_time, avg_purchase_value,
    purchase_frequency, last_category)
  VALUES (%s, %s, %s, %s, %s)
""", (user_id, datetime.utcnow(), avg_value, frequency, category))

# Serve features at prediction time (low single-digit milliseconds)
row = session.execute(
  "SELECT * FROM user_features WHERE user_id = %s", (user_id,)
).one()
features = [row.avg_purchase_value, row.purchase_frequency]
prediction = model.predict([features])
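Features go stale between pipeline runs, and Cassandra's per-write TTL is a common way to expire them automatically rather than serving outdated values. A minimal sketch, reusing the user_features table above (the 24-hour TTL is an illustrative assumption tied to a daily recompute, not a recommendation):

```python
# Sketch: expire stale features automatically with a Cassandra write TTL.
# Table and column names follow the user_features example above.

FEATURE_TTL_SECONDS = 24 * 60 * 60  # assumes the batch pipeline runs daily

INSERT_WITH_TTL = f"""
  INSERT INTO user_features (user_id, feature_time, avg_purchase_value,
    purchase_frequency, last_category)
  VALUES (%s, %s, %s, %s, %s)
  USING TTL {FEATURE_TTL_SECONDS}
"""

# At write time the pipeline would execute:
# session.execute(INSERT_WITH_TTL, (user_id, now, avg_value, frequency, category))
```

If the pipeline misses a run, the row simply disappears and the model falls back to whatever default-feature handling it has, instead of predicting on day-old data.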

Event Streaming for ML Pipelines

Modern ML pipelines consume streams of events — clicks, purchases, searches — to train and update models continuously. Cassandra can serve as a high-throughput event log:

python
import json
import uuid
from datetime import datetime

from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy

cluster = Cluster(
  ['cassandra-1', 'cassandra-2', 'cassandra-3'],
  load_balancing_policy=DCAwareRoundRobinPolicy(local_dc='us-east')
)
session = cluster.connect('events')

# Log a user event without blocking the request path
def log_event(user_id, event_type, properties):
  future = session.execute_async("""
    INSERT INTO user_events
    (user_id, event_id, event_time, event_type, properties)
    VALUES (%s, %s, %s, %s, %s)
  """, (user_id, uuid.uuid4(), datetime.utcnow(),
        event_type, json.dumps(properties)))
  # Pure fire-and-forget drops errors silently; log failures instead
  future.add_errback(lambda exc: print(f"event write failed: {exc}"))

log_event('user-123', 'product_view', {'product_id': 'sku-456'})
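One caveat with an event log keyed only by user_id: a single user's entire history lands in one partition, which grows without bound. A common fix is to add a time bucket to the partition key. A minimal sketch of that pattern (the table name, schema, and per-day bucket size are illustrative assumptions, not part of the example above):

```python
from datetime import datetime

# Sketch: time-bucket the partition key so one user's event history
# doesn't become a single unbounded partition. Assumes a table like:
#   CREATE TABLE user_events_by_day (
#     user_id text, day text, event_time timestamp, event_type text,
#     properties text,
#     PRIMARY KEY ((user_id, day), event_time)
#   )

def day_bucket(ts: datetime) -> str:
    """Partition bucket: one partition per user per calendar day."""
    return ts.strftime("%Y-%m-%d")

def event_key(user_id: str, ts: datetime) -> tuple:
    """Composite partition key an insert into user_events_by_day would use."""
    return (user_id, day_bucket(ts))

# e.g. event_key('user-123', datetime(2024, 5, 17, 9, 30))
# -> ('user-123', '2024-05-17')
```

A training job can then read back a bounded time range by iterating over the (user_id, day) partitions it needs instead of scanning one huge partition.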

DataStax Astra DB and Vector Search

DataStax (the primary commercial Cassandra vendor) has extended Cassandra with vector search in Astra DB (the capability has since also shipped in open-source Cassandra 5.0), enabling semantic search over embedding vectors alongside the traditional high-write use cases.

python
# Astra DB with vector search (DataStax extension)
from astrapy import DataAPIClient

client = DataAPIClient("AstraCS:...")
db = client.get_database("https://your-db.apps.astra.datastax.com")

collection = db.get_collection("products")

# Insert with embedding
collection.insert_one({
  "name": "Wireless Headphones",
  "description": "Premium noise-cancelling...",
  "$vector": [0.12, -0.34, 0.56, ...]  # embedding from AI model
})

# Vector similarity search
results = collection.find(
  {},
  sort={"$vector": query_embedding},
  limit=10
)
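Under the hood, sort={"$vector": query_embedding} ranks documents by vector similarity between the query embedding and each stored $vector. To make that concrete, here is a small pure-Python sketch of cosine-style ranking (cosine is one common metric; the actual metric used by a collection is configurable, and the toy 2-dimensional vectors below are purely illustrative):

```python
import math

# Rank documents by cosine similarity to a query embedding,
# mimicking what a $vector sort does conceptually.

def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

query = [1.0, 0.0]
docs = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [0.7, 0.7]}

# Highest similarity first, like a vector search result page
ranked = sorted(docs, key=lambda d: cosine_similarity(query, docs[d]),
                reverse=True)
# ranked == ['a', 'c', 'b']: 'a' points the same direction as the query
```

Real embeddings have hundreds or thousands of dimensions, and the database uses an approximate nearest-neighbor index rather than comparing against every document, but the ranking idea is the same.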

DataStax Astra DB: Cassandra as a Service

For teams that want Cassandra's scalability without managing the infrastructure, DataStax Astra DB is a leading managed option. It provides:

  • Serverless Cassandra in the cloud (AWS, GCP, Azure)
  • Built-in vector search for AI applications
  • A Data API (REST/JSON) so you don't need to write CQL
  • A generous free tier for development

Example

python
# Python driver (cassandra-driver)
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

auth = PlainTextAuthProvider('username', 'password')
cluster = Cluster(['localhost'], auth_provider=auth)
session = cluster.connect('myapp')

# Prepared statement (prevents injection, faster execution)
insert_stmt = session.prepare("""
  INSERT INTO events (user_id, ts, action)
  VALUES (?, ?, ?)
""")

# Batch insert. Caution: batches spanning many partitions are an
# anti-pattern in Cassandra; keep batches within a single partition,
# or use concurrent async writes for bulk loading instead.
batch = BatchStatement()
for event in events:
  batch.add(insert_stmt, (event.user_id, event.ts, event.action))

session.execute(batch)
cluster.shutdown()
