RAG in Production [P4]: Backend Implementation - Building the Engine with FastAPI & LangChain
Deep dive into the source code of a real-world RAG backend. We will implement ingestion and query pipelines using FastAPI, LangChain, and Celery.
"Code is poetry when it's clean, but in AI, it's more like a messy laboratory. Our job is to turn that laboratory into a factory." In this post, we will build the core engine of our RAG system using Python's most powerful tools.
Table of Contents
- Project Structure
- Dependency Injection & Configuration
- The Core Ingestion Pipeline
- The Query Intelligence Pipeline
- Implementing Streaming Responses (SSE)
- Asynchronous Tasks with Celery
- Testing Strategy
- Conclusion & Next Post
Project Structure
A production-ready project needs a clear separation of layers. We followed a variation of the Clean Architecture:
```
rag-backend/
├── app/
│   ├── api/            # FastAPI routes
│   │   ├── v1/
│   │   │   ├── ingestion.py
│   │   │   └── query.py
│   ├── core/           # App configuration & security
│   ├── models/         # Pydantic models (DTOs)
│   ├── services/       # Business logic (the "brain")
│   │   ├── embedding.py
│   │   ├── vector_store.py
│   │   └── llm.py
│   ├── workers/        # Celery tasks for async background jobs
│   └── main.py         # App entry point
├── tests/              # Pytest suite
├── .env.example        # Environment variables template
├── pyproject.toml      # Dependencies (Poetry)
└── Dockerfile
```
Dependency Injection & Configuration
We use pydantic-settings to manage configurations. This ensures type safety and easy environment variable loading.
```python
# app/core/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    OPENAI_API_KEY: str
    QDRANT_URL: str = "http://localhost:6333"
    QDRANT_API_KEY: str | None = None
    EMBEDDING_MODEL: str = "text-embedding-3-small"
    LLM_MODEL: str = "gpt-4-turbo"
    MAX_RETRIEVAL_RESULTS: int = 5

    # pydantic-settings v2 style; the inner `class Config` is deprecated
    model_config = SettingsConfigDict(env_file=".env")


settings = Settings()
```
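To wire these settings into request handlers, a common pattern is a cached provider that FastAPI's `Depends` can call. The sketch below uses a stand-in dataclass so it runs without pydantic; in the real app the provider would return the `Settings` class above.

```python
# Sketch: a cached settings provider for dependency injection.
# `DemoSettings` is a stand-in; the real app would return the
# pydantic-settings class defined in app/core/config.py.
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    QDRANT_URL: str = "http://localhost:6333"
    MAX_RETRIEVAL_RESULTS: int = 5


@lru_cache
def get_settings() -> DemoSettings:
    # Built once; every call (and every Depends(get_settings)) reuses it
    return DemoSettings()


print(get_settings() is get_settings())  # True: the same cached instance
```

Caching matters because `BaseSettings` re-reads the environment and `.env` file on every instantiation; `lru_cache` makes that a one-time cost.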
The Core Ingestion Pipeline
The goal of this pipeline is to take raw documents and store them as vectors. We use LangChain for its rich set of document loaders and splitters.
```python
# app/services/ingestion.py
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

from app.services.vector_store import VectorStoreService


class IngestionService:
    def __init__(self, vector_store: VectorStoreService):
        self.vector_store = vector_store
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=150,
            separators=["\n\n", "\n", " ", ""],
        )

    async def process_pdf(self, file_path: str, metadata: dict) -> int:
        # 1. Load the document (PyPDFLoader yields one Document per page)
        loader = PyPDFLoader(file_path)
        docs = loader.load()

        # 2. Split into overlapping chunks
        chunks = self.text_splitter.split_documents(docs)

        # 3. Attach metadata (department, source, etc.) for filtered retrieval
        for chunk in chunks:
            chunk.metadata.update(metadata)

        # 4. Upsert into the vector DB
        await self.vector_store.upsert_documents(chunks)
        return len(chunks)
```
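LangChain's splitter does the heavy lifting above, but the `chunk_size`/`chunk_overlap` semantics are worth making concrete. This is a deliberately naive sliding-window sketch, not LangChain's actual recursive algorithm:

```python
def split_with_overlap(text: str, chunk_size: int = 800, overlap: int = 150) -> list[str]:
    """Naive character-window splitter: each chunk repeats the last
    `overlap` characters of the previous one, so a fact that straddles a
    boundary still appears intact in at least one chunk."""
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]


chunks = split_with_overlap("x" * 2000)
print(len(chunks))  # 4 windows, starting at offsets 0, 650, 1300, 1950
```

Edge cases (empty text, overlap ≥ chunk_size) are ignored here; the real splitter also prefers paragraph and sentence boundaries over hard character cuts, which is why the separator list above goes from `"\n\n"` down to `""`.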
The Query Intelligence Pipeline
This is where we implement the Retrieve → Augment → Generate logic.
```python
# app/services/query.py
from app.services.vector_store import VectorStoreService
from app.services.llm import LLMService


class QueryService:
    def __init__(self, vector_store: VectorStoreService, llm: LLMService):
        self.vector_store = vector_store
        self.llm = llm

    async def answer_question(self, query: str, user_context: dict) -> dict:
        # 1. Semantic retrieval with security filtering:
        #    only retrieve documents the user is allowed to see
        context_docs = await self.vector_store.search(
            query=query,
            filter_metadata={"department_id": user_context["department_id"]},
        )

        # 2. Augment the prompt with the retrieved context
        context_text = "\n\n".join(doc.page_content for doc in context_docs)
        sources = {
            doc.metadata["source_url"]
            for doc in context_docs
            if doc.metadata.get("source_url")  # skip chunks without a source
        }
        prompt = self._build_prompt(query, context_text)

        # 3. Generate the answer
        answer = await self.llm.generate(prompt)
        return {"answer": answer, "sources": list(sources)}

    def _build_prompt(self, query: str, context: str) -> str:
        return f"""
You are a professional assistant. Answer based only on the provided context.
If the answer is not in the context, say you don't know.
---
CONTEXT:
{context}
---
QUESTION: {query}
"""
```
Implementing Streaming Responses (SSE)
In production, waiting 5–10 seconds for a full answer feels slow. Users prefer seeing words appear instantly. We use Server-Sent Events (SSE).
```python
# app/api/v1/query.py
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse

from app.services.query import QueryService
# get_query_service is an assumed factory that builds QueryService with its
# dependencies; a bare Depends() cannot construct a class that takes
# vector_store and llm arguments.
from app.api.deps import get_query_service

router = APIRouter()


@router.get("/ask-stream")
async def ask_stream(query: str, service: QueryService = Depends(get_query_service)):
    async def event_generator():
        async for chunk in service.answer_question_stream(query):
            # Frame each token according to the SSE wire format
            yield f"data: {chunk}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
```
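The SSE wire format is just `data: <payload>` followed by a blank line per event. That framing can be exercised without FastAPI at all; here `fake_llm_stream` is a stand-in for `service.answer_question_stream(query)`:

```python
import asyncio


async def fake_llm_stream():
    # Stand-in for the token stream coming back from the LLM
    for token in ["The", " answer", " is", " 42."]:
        await asyncio.sleep(0)  # yield control, as a real model call would
        yield token


async def collect_events():
    events = []
    async for chunk in fake_llm_stream():
        events.append(f"data: {chunk}\n\n")  # SSE framing: data line + blank line
    return events


events = asyncio.run(collect_events())
print(events[0])  # "data: The" followed by the required blank line
```

One practical caveat: if tokens can contain newlines, each line of the payload needs its own `data:` prefix, or the browser's `EventSource` will misparse the event.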
Asynchronous Tasks with Celery
Ingesting large PDF files or thousands of Confluence pages can take minutes. We shouldn't block the API.
```python
# app/workers/tasks.py
from celery import Celery

from app.services.ingestion import IngestionService

celery_app = Celery("rag_workers", broker="redis://localhost:6379/0")


@celery_app.task(name="process_heavy_ingestion")
def process_heavy_ingestion_task(source_type: str, source_id: str):
    # Runs in a worker process, outside the request/response cycle
    service = IngestionService(...)
    service.run_sync_from_source(source_type, source_id)
```
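Celery's broker and workers can't run inside a blog snippet, but the fire-and-forget contract is easy to illustrate with the stdlib: submit the job, return immediately, collect the result later. `ThreadPoolExecutor` stands in for the Celery worker here, and the function names are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor


def process_heavy_ingestion(source_type: str, source_id: str) -> str:
    # Placeholder for minutes of real parsing/embedding work
    return f"ingested {source_type}:{source_id}"


executor = ThreadPoolExecutor(max_workers=2)

# An API handler would submit and respond with a task id right away;
# with Celery you call task.delay(...) and later poll its AsyncResult.
future = executor.submit(process_heavy_ingestion, "pdf", "doc-123")
print(future.result())  # ingested pdf:doc-123
```

The difference in production is durability: Celery persists the job in Redis, so it survives API restarts and can be retried, which an in-process thread pool cannot do.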
Testing Strategy
Testing AI systems is hard because output is non-deterministic. We use a 3-layer approach:
- Unit Tests: Mock LLM and Vector DB to test the pipeline logic.
- Integration Tests: Run a real Qdrant in a Docker container (using testcontainers).
- Evaluation Tests: Use RAGAS to measure faithfulness and relevance of generated answers against a ground truth dataset.
```python
# tests/test_ingestion.py
from unittest.mock import MagicMock

from app.services.ingestion import IngestionService


def test_chunking_logic():
    # The vector store is mocked: we only exercise the splitter configuration
    service = IngestionService(vector_store=MagicMock())
    text = "A very long document... " * 100
    chunks = service.text_splitter.split_text(text)
    assert len(chunks) > 1
    assert all(len(c) <= 800 for c in chunks)
```
Conclusion & Next Post
We've laid out the technical foundation of our RAG system. Building a backend with FastAPI gives us high performance, while LangChain streamlines the complex AI workflows.
3 Key Takeaways:
- Layered architecture makes the system testable and maintainable.
- Streaming via SSE is critical for a good user experience.
- Background workers are necessary for heavy data processing.
👉 Next Post: [Post 05] Vector Database Deep Dive - Optimizing Qdrant for Scale
In the next post, we will leave the Python code for a bit and dive into the world of Vector Databases. How does HNSW indexing work? How to choose the right distance metric (Cosine vs Euclidean)? And how to tune Qdrant for sub-second retrieval over millions of vectors?
📬 Have you faced issues with streaming LLM responses? Let's discuss in the comments!
Author: [Your Name]
Series: RAG in Production — The Journey of Building a Real-world AI System
Tags: FastAPI LangChain Python RAG Backend Development
Series • Part 4 of 11