RAG in Production [P4]: Backend Implementation - Building the Engine with FastAPI & LangChain
RAG in Production — The Journey of Building a Real-world AI System


Deep dive into the source code of a real-world RAG backend. We will implement ingestion and query pipelines using FastAPI, LangChain, and Celery.

Truong Pham · Software Engineer
Published: April 2, 2024
Stack: FastAPI · LangChain · Python · Backend

"Code is poetry when it's clean, but in AI, it's more like a messy laboratory. Our job is to turn that laboratory into a factory." In this post, we will build the core engine of our RAG system using Python's most powerful tools.


Table of Contents

  1. Project Structure
  2. Dependency Injection & Configuration
  3. The Core Ingestion Pipeline
  4. The Query Intelligence Pipeline
  5. Implementing Streaming Responses (SSE)
  6. Asynchronous Tasks with Celery
  7. Testing Strategy
  8. Conclusion & Next Post

Project Structure

A production-ready project needs a clear separation of layers. We follow a variation of Clean Architecture:

rag-backend/
├── app/
│   ├── api/                # FastAPI routes
│   │   ├── v1/
│   │   │   ├── ingestion.py
│   │   │   └── query.py
│   ├── core/               # App configuration & security
│   ├── models/             # Pydantic models (DTOs)
│   ├── services/           # Business logic (The "Brain")
│   │   ├── embedding.py
│   │   ├── vector_store.py
│   │   └── llm.py
│   ├── workers/            # Celery tasks for async background jobs
│   └── main.py             # App entry point
├── tests/                  # Pytest suite
├── .env.example            # Environment variables template
├── pyproject.toml          # Dependencies (Poetry)
└── Dockerfile

Dependency Injection & Configuration

We use pydantic-settings to manage configurations. This ensures type safety and easy environment variable loading.

# app/core/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # pydantic v2 style: model_config replaces the deprecated inner Config class
    model_config = SettingsConfigDict(env_file=".env")

    OPENAI_API_KEY: str
    QDRANT_URL: str = "http://localhost:6333"
    QDRANT_API_KEY: str | None = None
    
    EMBEDDING_MODEL: str = "text-embedding-3-small"
    LLM_MODEL: str = "gpt-4-turbo"
    
    MAX_RETRIEVAL_RESULTS: int = 5

settings = Settings()
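The module-level `settings = Settings()` singleton works, but FastAPI projects often expose settings through a cached provider function instead, so tests can swap it out via `dependency_overrides`. A minimal sketch of the pattern, using a plain dataclass stand-in for the pydantic class (field names here are illustrative):

```python
from dataclasses import dataclass
from functools import lru_cache

@dataclass(frozen=True)
class AppSettings:
    # Stand-in for the pydantic Settings above (illustrative defaults)
    qdrant_url: str = "http://localhost:6333"
    max_retrieval_results: int = 5

@lru_cache
def get_settings() -> AppSettings:
    # Constructed once on first call; every later call returns the same object
    return AppSettings()

assert get_settings() is get_settings()  # cached: one shared instance
```

In a route you would then write `settings: AppSettings = Depends(get_settings)`, and a test can override `get_settings` without touching global state.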

The Core Ingestion Pipeline

The goal of this pipeline is to take raw documents and store them as vectors. We use LangChain for its rich set of document loaders and splitters.

# app/services/ingestion.py
from langchain_community.document_loaders import PyPDFLoader, ConfluenceLoader  # ConfluenceLoader backs the wiki sync path (not shown here)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from app.services.vector_store import VectorStoreService

class IngestionService:
    def __init__(self, vector_store: VectorStoreService):
        self.vector_store = vector_store
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=150,
            separators=["\n\n", "\n", " ", ""]
        )

    async def process_pdf(self, file_path: str, metadata: dict):
        # 1. Load document
        loader = PyPDFLoader(file_path)
        docs = loader.load()
        
        # 2. Split into chunks
        chunks = self.text_splitter.split_documents(docs)
        
        # 3. Add metadata (department, source, etc.)
        for chunk in chunks:
            chunk.metadata.update(metadata)
            
        # 4. Upload to Vector DB
        await self.vector_store.upsert_documents(chunks)
        
        return len(chunks)
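To build intuition for what `chunk_size=800` and `chunk_overlap=150` actually mean, here is a deliberately naive fixed-window chunker. The real `RecursiveCharacterTextSplitter` additionally prefers to cut at the listed separators (paragraphs, then lines, then spaces) instead of mid-word, but the size/overlap arithmetic is the same idea:

```python
def sliding_chunks(text: str, chunk_size: int = 800, overlap: int = 150) -> list[str]:
    """Naive fixed-window chunker: each chunk repeats the last `overlap`
    characters of the previous one so context isn't cut mid-thought."""
    step = chunk_size - overlap
    # Stop before len(text) - overlap so the tail isn't emitted twice
    return [text[i:i + chunk_size] for i in range(0, max(len(text) - overlap, 1), step)]

doc = "word " * 400                          # 2000 characters
chunks = sliding_chunks(doc)
print(len(chunks))                           # 3 chunks of at most 800 characters
print(chunks[0][-150:] == chunks[1][:150])   # consecutive chunks share 150 chars
```

The overlap is what lets a retrieved chunk carry enough surrounding context to stand on its own in the prompt.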

The Query Intelligence Pipeline

This is where we implement the Retrieve → Augment → Generate logic.

# app/services/query.py
from app.services.vector_store import VectorStoreService
from app.services.llm import LLMService

class QueryService:
    def __init__(self, vector_store: VectorStoreService, llm: LLMService):
        self.vector_store = vector_store
        self.llm = llm

    async def answer_question(self, query: str, user_context: dict):
        # 1. Semantic Retrieval with Security Filtering
        # We only retrieve documents the user is allowed to see
        context_docs = await self.vector_store.search(
            query=query, 
            filter_metadata={"department_id": user_context["department_id"]}
        )
        
        # 2. Augment Prompt
        context_text = "\n\n".join(doc.page_content for doc in context_docs)
        # Deduplicate while preserving retrieval order; skip chunks without a source_url
        sources = list(dict.fromkeys(
            doc.metadata["source_url"]
            for doc in context_docs
            if doc.metadata.get("source_url")
        ))
        
        prompt = self._build_prompt(query, context_text)
        
        # 3. Generate Answer
        answer = await self.llm.generate(prompt)
        
        return {
            "answer": answer,
            "sources": sources
        }

    def _build_prompt(self, query: str, context: str) -> str:
        return f"""
        You are a professional assistant. Answer based only on the provided context.
        If the answer is not in the context, say you don't know.
        
        ---
        CONTEXT:
        {context}
        ---
        QUESTION: {query}
        """
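The `vector_store.search` call above hides the access-control detail. With qdrant-client, the metadata filter can be pushed down into the engine rather than applied post-hoc in Python, so documents the user may not see never leave the database. A sketch of how `VectorStoreService.search` might build that filter (the collection name and the `embed` callable are assumptions, not shown elsewhere in this post):

```python
# app/services/vector_store.py (sketch)
from qdrant_client import AsyncQdrantClient, models

class VectorStoreService:
    def __init__(self, client: AsyncQdrantClient, embed):
        self.client = client
        self.embed = embed  # async callable: str -> list[float] (assumed)

    async def search(self, query: str, filter_metadata: dict, limit: int = 5):
        vector = await self.embed(query)
        # One exact-match condition per metadata key; Qdrant applies the
        # filter during the ANN search itself
        query_filter = models.Filter(must=[
            models.FieldCondition(key=k, match=models.MatchValue(value=v))
            for k, v in filter_metadata.items()
        ])
        hits = await self.client.search(
            collection_name="documents",  # assumed collection name
            query_vector=vector,
            query_filter=query_filter,
            limit=limit,
        )
        return hits  # ScoredPoints; the real service maps payloads back to Documents
```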

Implementing Streaming Responses (SSE)

In production, waiting 5–10 seconds for a full answer feels slow. Users prefer seeing words appear instantly. We use Server-Sent Events (SSE).

# app/api/v1/query.py
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from app.services.query import QueryService
from app.api.deps import get_query_service  # DI provider wiring the services (path assumed)

router = APIRouter()

@router.get("/ask-stream")
async def ask_stream(query: str, service: QueryService = Depends(get_query_service)):
    async def event_generator():
        async for chunk in service.answer_question_stream(query):
            # SSE framing: each event is "data: <payload>" followed by a blank line
            yield f"data: {chunk}\n\n"
            
    return StreamingResponse(event_generator(), media_type="text/event-stream")
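The endpoint calls `answer_question_stream`, which we haven't shown. Its shape is an async generator. Here is a self-contained sketch, with a fake token stream standing in for the real LLM call (the token values are made up); note that JSON-encoding each token matters, because a raw newline inside a token would prematurely terminate an SSE frame:

```python
import asyncio
import json
from typing import AsyncIterator

async def fake_token_stream(prompt: str) -> AsyncIterator[str]:
    # Stand-in for the streaming LLM call (hypothetical tokens)
    for token in ["RAG ", "answers ", "arrive ", "incrementally."]:
        await asyncio.sleep(0)  # simulate awaiting the model
        yield token

async def answer_question_stream(query: str) -> AsyncIterator[str]:
    # json.dumps escapes newlines in tokens; "\n\n" would otherwise end a frame early
    async for token in fake_token_stream(query):
        yield f"data: {json.dumps(token)}\n\n"
    yield "data: [DONE]\n\n"  # sentinel so the client knows to close the connection

async def collect(query: str) -> list[str]:
    # Collect eagerly for demonstration; FastAPI consumes the generator lazily
    return [frame async for frame in answer_question_stream(query)]

frames = asyncio.run(collect("What is RAG?"))
print(len(frames))  # 5 frames: 4 tokens + the [DONE] sentinel
```

The real implementation would run the retrieval step first, then forward tokens from the LLM client's streaming API in the same loop.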

Asynchronous Tasks with Celery

Ingesting large PDF files or thousands of Confluence pages can take minutes. We shouldn't block the API.

# app/workers/tasks.py
from celery import Celery
from app.services.ingestion import IngestionService

celery_app = Celery("rag_workers", broker="redis://localhost:6379/0")

@celery_app.task(name="process_heavy_ingestion")
def process_heavy_ingestion_task(source_type: str, source_id: str):
    # This runs in the background
    service = IngestionService(...)
    service.run_sync_from_source(source_type, source_id)
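The task above still needs to be triggered from the API side. A sketch of the enqueue endpoint (route path and function names are assumptions): `.delay()` pushes the job to Redis and returns immediately with a task id, so the HTTP request never blocks on ingestion.

```python
# app/api/v1/ingestion.py (sketch)
from fastapi import APIRouter
from app.workers.tasks import process_heavy_ingestion_task

router = APIRouter()

@router.post("/sync")
async def trigger_sync(source_type: str, source_id: str):
    # Enqueue to the broker and return; a Celery worker picks the job up
    result = process_heavy_ingestion_task.delay(source_type, source_id)
    return {"task_id": result.id, "status": "queued"}
```

The client can poll a status endpoint with the returned `task_id` to track progress.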

Testing Strategy

Testing AI systems is hard because output is non-deterministic. We use a 3-layer approach:

  1. Unit Tests: Mock LLM and Vector DB to test the pipeline logic.
  2. Integration Tests: Run a real Qdrant in a Docker container (using testcontainers).
  3. Evaluation Tests: Use RAGAS to measure faithfulness and relevance of generated answers against a ground truth dataset.

# tests/test_ingestion.py
from unittest.mock import MagicMock

from app.services.ingestion import IngestionService

def test_chunking_logic():
    service = IngestionService(vector_store=MagicMock())
    text = "A very long document... " * 100
    chunks = service.text_splitter.split_text(text)
    
    assert len(chunks) > 1
    assert all(len(c) <= 800 for c in chunks)
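The same layer-1 approach extends to the async services with `unittest.mock.AsyncMock`. The `answer` function below is an inline stand-in for `QueryService.answer_question`'s retrieve → generate flow (the real test would import the service itself); the point is that faking the vector store and LLM lets us assert on pipeline wiring without any network calls:

```python
import asyncio
from unittest.mock import AsyncMock

class Doc:
    """Minimal stand-in for a LangChain Document (just the fields we read)."""
    def __init__(self, content: str, source: str):
        self.page_content = content
        self.metadata = {"source_url": source}

async def answer(vector_store, llm, query: str) -> dict:
    # Inline stand-in for the retrieve -> augment -> generate flow
    docs = await vector_store.search(query=query)
    context = "\n\n".join(d.page_content for d in docs)
    answer_text = await llm.generate(f"{context}\n\nQUESTION: {query}")
    return {"answer": answer_text,
            "sources": [d.metadata["source_url"] for d in docs]}

async def test_pipeline_composes_retrieval_and_generation():
    vector_store = AsyncMock()
    vector_store.search.return_value = [Doc("Refunds take 5 days.", "https://wiki/refunds")]
    llm = AsyncMock()
    llm.generate.return_value = "Refunds take 5 days."

    result = await answer(vector_store, llm, "How long do refunds take?")

    vector_store.search.assert_awaited_once_with(query="How long do refunds take?")
    assert result["sources"] == ["https://wiki/refunds"]

asyncio.run(test_pipeline_composes_retrieval_and_generation())
```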

Conclusion & Next Post

We've laid out the technical foundation of our RAG system. Building a backend with FastAPI gives us high performance, while LangChain streamlines the complex AI workflows.

3 Key Takeaways:

  1. Layered architecture makes the system testable and maintainable.
  2. Streaming via SSE is critical for a good user experience.
  3. Background workers are necessary for heavy data processing.

👉 Next Post: RAG in Production [P5]: Vector Database Design - Optimizing Qdrant for Scale

In the next post, we will leave the Python code for a bit and dive into the world of Vector Databases. How does HNSW indexing work? How to choose the right distance metric (Cosine vs Euclidean)? And how to tune Qdrant for sub-second retrieval over millions of vectors?


📬 Have you faced issues with streaming LLM responses? Let's discuss in the comments!


Author: Truong Pham · Series: RAG in Production — The Journey of Building a Real-world AI System · Tags: FastAPI, LangChain, Python, RAG, Backend Development

Series • Part 4 of 11
