Building RAG

Full RAG Pipeline

Put it all together — build a complete, production-ready RAG system.

Complete RAG System

A production RAG pipeline has two stages:

Ingestion Pipeline (offline):

  1. Load documents (PDF, web, database)
  2. Chunk documents
  3. Generate embeddings
  4. Store in vector database
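Step 2 (chunking) is the one step the full example below does not implement, so here is a minimal sketch of fixed-size chunking with overlap. The 500-character chunk size and 50-character overlap are illustrative defaults, not recommendations:

```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into fixed-size chunks, overlapping so that
    sentences cut at a boundary still appear whole in one chunk."""
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += chunk_size - overlap  # step forward, keeping `overlap` chars
    return chunks
```

Real pipelines usually chunk on semantic boundaries (paragraphs, headings, sentences) rather than raw character counts, but the overlap idea carries over unchanged.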

Query Pipeline (online):

  1. Receive user question
  2. Embed the question
  3. Retrieve top-k similar chunks
  4. Rerank results (optional)
  5. Generate answer with retrieved context

Advanced Techniques

  • HyDE (Hypothetical Document Embeddings): Generate a hypothetical answer, then search
  • Multi-query: Generate multiple query variations for better coverage
  • Reranking: Use a cross-encoder to rerank initial results
  • Contextual compression: Extract only relevant parts of retrieved chunks
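Multi-query retrieval needs a way to merge the ranked lists returned for each query variation. Reciprocal rank fusion (RRF) is a common merging strategy; this sketch assumes each list holds document IDs ordered best-first (k=60 is the constant typically used with RRF):

```python
from collections import defaultdict

def rrf_merge(ranked_lists: list[list[str]], k: int = 60) -> list[str]:
    """Merge several ranked result lists with reciprocal rank fusion:
    each document scores 1 / (k + rank) per list it appears in."""
    scores: dict[str, float] = defaultdict(float)
    for results in ranked_lists:
        for rank, doc_id in enumerate(results, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores, key=scores.get, reverse=True)
```

Documents that appear near the top of several lists accumulate the highest fused score, so results confirmed by multiple query variations rise above one-off hits.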

Example

python
import anthropic
import chromadb
from chromadb.utils import embedding_functions

class RAGSystem:
    def __init__(self):
        self.anthropic = anthropic.Anthropic()
        self.chroma = chromadb.Client()
        self.embed_fn = embedding_functions.DefaultEmbeddingFunction()

        self.collection = self.chroma.get_or_create_collection(
            name="knowledge_base",
            embedding_function=self.embed_fn,
            metadata={"hnsw:space": "cosine"}  # cosine distance, so 1 - distance is a similarity
        )

    def ingest(self, documents: list[dict]):
        """Add documents to the knowledge base."""
        texts = [doc['content'] for doc in documents]
        ids = [doc['id'] for doc in documents]
        metadatas = [doc.get('metadata', {}) for doc in documents]

        self.collection.upsert(
            documents=texts,
            ids=ids,
            metadatas=metadatas
        )
        print(f"Ingested {len(documents)} documents")

    def retrieve(self, query: str, top_k: int = 5) -> list[dict]:
        """Retrieve relevant documents for a query."""
        results = self.collection.query(
            query_texts=[query],
            n_results=top_k,
            include=["documents", "distances", "metadatas"]
        )

        retrieved = []
        for doc, dist, meta in zip(
            results['documents'][0],
            results['distances'][0],
            results['metadatas'][0]
        ):
            retrieved.append({
                "content": doc,
                "distance": dist,
                "metadata": meta,
                "relevance_score": 1 - dist  # valid only if the collection uses cosine distance
            })

        return retrieved

    def generate_answer(self, question: str, context_docs: list[dict]) -> str:
        """Generate an answer using retrieved context."""
        context = "\n\n".join([
            f"[Source: {doc['metadata'].get('source', 'unknown')}]\n{doc['content']}"
            for doc in context_docs
        ])

        response = self.anthropic.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1000,
            system="""You are a helpful assistant. Answer questions based ONLY on the provided context.
If the context doesn't contain enough information, say so clearly.
Always cite which sources you used.""",
            messages=[{
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {question}"
            }]
        )
        return response.content[0].text

    def query(self, question: str) -> dict:
        """Full RAG query pipeline."""
        # 1. Retrieve relevant documents
        docs = self.retrieve(question)

        # 2. Filter by relevance; fall back to the top 3 if nothing passes
        relevant_docs = [d for d in docs if d['relevance_score'] > 0.3]
        used_docs = relevant_docs or docs[:3]

        # 3. Generate answer
        answer = self.generate_answer(question, used_docs)

        return {
            "question": question,
            "answer": answer,
            "sources": [d['metadata'] for d in used_docs],
            "num_sources": len(used_docs)
        }