AI & Production
Cassandra for AI and ML Workloads
How Cassandra powers large-scale AI feature stores, event streaming for ML pipelines, and real-time recommendation systems.
Cassandra's Role in AI Infrastructure
As AI applications scale, the data infrastructure supporting them must scale too. Cassandra has become a cornerstone of enterprise AI infrastructure for several reasons:
Feature Stores
Machine learning models need features — computed attributes of entities (users, products, sessions) — served with low latency at prediction time. A feature store is the infrastructure layer that computes, stores, and serves these features.
Cassandra is widely used as the online (low-latency serving) layer of feature stores because:
- Sub-millisecond point lookups by entity ID
- Handles millions of writes per second from feature computation pipelines
- High availability means models never go without features
Feast, the most popular open-source feature store, uses Cassandra as an online store.
# Cassandra as a feature store (conceptual)
from cassandra.cluster import Cluster
cluster = Cluster(['localhost'])
session = cluster.connect('ml_features')
# Write features computed by batch pipeline
session.execute("""
INSERT INTO user_features (user_id, feature_time, avg_purchase_value,
purchase_frequency, last_category)
VALUES (%s, %s, %s, %s, %s)
""", (user_id, datetime.utcnow(), avg_value, frequency, category))
# Serve features at prediction time (< 1ms)
row = session.execute(
"SELECT * FROM user_features WHERE user_id = %s", (user_id,)
).one()
features = [row.avg_purchase_value, row.purchase_frequency]
prediction = model.predict([features])Event Streaming for ML Pipelines
Modern ML pipelines consume streams of events — clicks, purchases, searches — to train and update models continuously. Cassandra can serve as a high-throughput event log:
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy
import uuid
from datetime import datetime
cluster = Cluster(
['cassandra-1', 'cassandra-2', 'cassandra-3'],
load_balancing_policy=DCAwareRoundRobinPolicy(local_dc='us-east')
)
session = cluster.connect('events')
# Log user event
def log_event(user_id, event_type, properties):
session.execute_async("""
INSERT INTO user_events
(user_id, event_id, event_time, event_type, properties)
VALUES (%s, %s, %s, %s, %s)
""", (user_id, uuid.uuid4(), datetime.utcnow(),
event_type, str(properties)))
# Non-blocking: fire and forget
log_event('user-123', 'product_view', {'product_id': 'sku-456'})DataStax Astra DB and Vector Search
DataStax (the primary commercial Cassandra provider) has extended Cassandra with vector search capabilities in Astra DB, enabling semantic search over embedded data alongside the traditional high-write use cases.
# Astra DB with vector search (DataStax extension)
from astrapy import DataAPIClient
client = DataAPIClient("AstraCS:...")
db = client.get_database("https://your-db.apps.astra.datastax.com")
collection = db.get_collection("products")
# Insert with embedding
collection.insert_one({
"name": "Wireless Headphones",
"description": "Premium noise-cancelling...",
"$vector": [0.12, -0.34, 0.56, ...] # embedding from AI model
})
# Vector similarity search
results = collection.find(
{},
sort={"$vector": query_embedding},
limit=10
)DataStax Astra DB: Cassandra as a Service
For teams that want Cassandra's scalability without managing the infrastructure, DataStax Astra DB is the leading managed option. It provides:
- Serverless Cassandra in the cloud (AWS, GCP, Azure)
- Built-in vector search for AI applications
- A Data API (REST/JSON) so you don't need to write CQL
- A generous free tier for development
Example
# Python driver (cassandra-driver)
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
auth = PlainTextAuthProvider('username', 'password')
cluster = Cluster(['localhost'], auth_provider=auth)
session = cluster.connect('myapp')
# Prepared statement (prevents injection, faster execution)
insert_stmt = session.prepare("""
INSERT INTO events (user_id, ts, action)
VALUES (?, ?, ?)
""")
# Batch insert
from cassandra.query import BatchStatement
batch = BatchStatement()
for event in events:
batch.add(insert_stmt, (event.user_id, event.ts, event.action))
session.execute(batch)
cluster.shutdown()Want to run this code interactively?
Try in Compiler