If you’ve built an LLM demo and watched it embarrass you the moment a real user asks a real question, you already know the gap between “it works on my data” and “it works in production.” This post closes that gap.
We’ll build a production-shaped RAG backend end-to-end:
- PostgreSQL + pgvector for vector storage
- FastAPI for the API
- Real chunking, real embeddings, real hybrid (vector + BM25 / full-text) retrieval
- Citation-aware prompt assembly
- The parts every other tutorial skips: indexing, dimension drift, eval, and cost
By the end you’ll have a service you can actually deploy, not a notebook.
Prefer the long-form code? Full project on my portfolio at rajpoot.dev .
Why pgvector (and why now)
In 2026 the dedicated-vector-DB hype has cooled. Most teams running fewer than ~50M vectors are happiest on Postgres + pgvector because:
- One database, one backup story, one ACL model.
- You can
JOINbetween embeddings and your business tables. - HNSW indexes in pgvector are competitive with Pinecone/Weaviate at small to mid scale.
- It’s the lowest-ops path that still gives you grown-up performance.
If you cross ~50–100M vectors, revisit. Until then: pgvector wins.
The architecture
┌──────────────┐ ingest ┌──────────────┐
│ Documents │ ───────────▶ │ Chunker │
└──────────────┘ └──────┬───────┘
│
▼
┌──────────────┐
│ Embedder │ ── OpenAI / Voyage / Cohere
└──────┬───────┘
│
▼
┌──────────────┐
│ Postgres + │
│ pgvector │
└──────────────┘
▲
│ retrieve
┌────────────┐ /ask ┌────────┴───────┐ prompt ┌──────────────┐
│ Client │ ───────────▶ │ FastAPI │ ───────────▶ │ LLM │
└────────────┘ └────────────────┘ └──────────────┘
Three subsystems: ingest, retrieve, generate. Each one fails differently. Building them as separate, testable units is the difference between a toy and a product.
1. Postgres setup
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm; -- for trigram + ILIKE acceleration
CREATE TABLE documents (
id BIGSERIAL PRIMARY KEY,
source TEXT NOT NULL,
title TEXT,
url TEXT,
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE chunks (
id BIGSERIAL PRIMARY KEY,
document_id BIGINT REFERENCES documents(id) ON DELETE CASCADE,
ord INT NOT NULL, -- position within document
content TEXT NOT NULL,
tokens INT NOT NULL,
embedding vector(1536), -- text-embedding-3-small
tsv tsvector
GENERATED ALWAYS AS (to_tsvector('english', content)) STORED,
created_at TIMESTAMPTZ DEFAULT now()
);
-- HNSW vector index — ANN, fast, in-memory
CREATE INDEX chunks_embedding_hnsw
ON chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Lexical index for hybrid search
CREATE INDEX chunks_tsv_gin ON chunks USING GIN (tsv);
CREATE INDEX chunks_doc_id ON chunks (document_id);
A few production notes most tutorials skip:
- Dimension is locked at table creation. If you switch from
text-embedding-3-small(1536) to a 3072-dim model, you’ll need a new column or table. Plan for it. - HNSW > IVF for almost all RAG workloads — better recall, no training step, supports updates without retrain.
- Generated
tsvcolumn lets you do hybrid search without re-tokenizing on every insert.
2. Chunking that doesn’t sabotage retrieval
Chunking is where 80% of RAG quality is decided. Bad chunking → bad retrieval → no prompt rewriting will save you.
# app/chunking.py
from __future__ import annotations
import re
from dataclasses import dataclass
import tiktoken
ENC = tiktoken.encoding_for_model("text-embedding-3-small")
@dataclass(frozen=True)
class Chunk:
text: str
tokens: int
ord: int
def chunk_markdown(
text: str,
target_tokens: int = 400,
overlap_tokens: int = 60,
) -> list[Chunk]:
"""Recursive-character chunking with token-aware sizing.
1. Split on headings (## / ###) — preserves topical boundaries.
2. Inside each section, split on paragraphs.
3. Pack paragraphs until target_tokens; carry overlap to next chunk.
"""
sections = re.split(r"(?m)^(?=#{2,3} )", text)
out: list[Chunk] = []
ord_ = 0
buf: list[str] = []
buf_tokens = 0
def flush():
nonlocal ord_, buf, buf_tokens
if not buf:
return
body = "\n\n".join(buf).strip()
if body:
out.append(Chunk(body, buf_tokens, ord_))
ord_ += 1
buf, buf_tokens = [], 0
for section in sections:
for para in re.split(r"\n{2,}", section):
tokens = len(ENC.encode(para))
if buf_tokens + tokens > target_tokens and buf:
flush()
# carry overlap from previous chunk
if overlap_tokens and out:
tail = ENC.decode(ENC.encode(out[-1].text)[-overlap_tokens:])
buf.append(tail)
buf_tokens = overlap_tokens
buf.append(para)
buf_tokens += tokens
flush()
return out
Why this and not “split on 1000 chars”?
- Headings are semantic landmarks. Don’t cut across them.
- Paragraph boundaries are how humans wrote the source — respect them.
- Token count, not character count. Embeddings see tokens, your bills bill tokens.
- Overlap rescues retrieval when an answer straddles two chunks.
3. Embedder with batching and retries
# app/embed.py
from __future__ import annotations
import os
from typing import Iterable
import httpx
OPENAI_KEY = os.environ["OPENAI_API_KEY"]
MODEL = "text-embedding-3-small"
BATCH = 96 # OpenAI accepts up to 2048; 96 is friendlier on retries
async def embed_texts(client: httpx.AsyncClient, texts: list[str]) -> list[list[float]]:
out: list[list[float]] = []
for i in range(0, len(texts), BATCH):
chunk = texts[i : i + BATCH]
resp = await client.post(
"https://api.openai.com/v1/embeddings",
headers={"Authorization": f"Bearer {OPENAI_KEY}"},
json={"model": MODEL, "input": chunk},
timeout=30.0,
)
resp.raise_for_status()
out.extend([d["embedding"] for d in resp.json()["data"]])
return out
Production additions you’ll want soon:
- Retry with exponential backoff on 429/5xx (use
tenacity). - Idempotency: store a content hash with each chunk so re-ingestion skips unchanged text.
- Batched DB writes with
COPYorexecutemany— embedding 100k chunks with single inserts is unusably slow.
4. Hybrid retrieval (vector + BM25)
Pure vector search loses to hybrid search on every benchmark I’ve ever run. Reciprocal Rank Fusion (RRF) is the standard combiner — simple, robust, no tuning:
# app/retrieve.py
from __future__ import annotations
import asyncpg
K = 8 # final results
VECTOR_K = 30
LEXICAL_K = 30
RRF_C = 60 # standard RRF constant
async def retrieve(
pool: asyncpg.Pool,
query: str,
query_embedding: list[float],
) -> list[dict]:
async with pool.acquire() as conn:
# Vector search — cosine distance; lower is better.
vec = await conn.fetch(
"""
SELECT id, document_id, content, ord,
1 - (embedding <=> $1) AS score
FROM chunks
ORDER BY embedding <=> $1
LIMIT $2
""",
query_embedding, VECTOR_K,
)
# Lexical search — websearch_to_tsquery handles phrases & operators.
lex = await conn.fetch(
"""
SELECT id, document_id, content, ord,
ts_rank(tsv, websearch_to_tsquery('english', $1)) AS score
FROM chunks
WHERE tsv @@ websearch_to_tsquery('english', $1)
ORDER BY score DESC
LIMIT $2
""",
query, LEXICAL_K,
)
# RRF fusion
ranks: dict[int, float] = {}
for rank, row in enumerate(vec, start=1):
ranks[row["id"]] = ranks.get(row["id"], 0.0) + 1.0 / (RRF_C + rank)
for rank, row in enumerate(lex, start=1):
ranks[row["id"]] = ranks.get(row["id"], 0.0) + 1.0 / (RRF_C + rank)
by_id = {r["id"]: dict(r) for r in (*vec, *lex)}
fused = sorted(by_id.values(), key=lambda r: ranks[r["id"]], reverse=True)
return fused[:K]
Why this beats vector-only:
- Vector search wins on paraphrase / semantic similarity.
- Lexical search wins on exact terms, codes, IDs, names.
- RRF gives you both without tuning weights.
5. Prompt assembly with citations
# app/prompt.py
SYSTEM = """You answer strictly from the provided context.
If the context doesn't contain the answer, say "I don't have that in the docs."
Always cite sources as [1], [2], ... matching the numbered chunks below."""
def build_prompt(question: str, chunks: list[dict]) -> list[dict]:
ctx = "\n\n".join(f"[{i+1}] {c['content']}" for i, c in enumerate(chunks))
return [
{"role": "system", "content": SYSTEM},
{"role": "user", "content": f"Context:\n{ctx}\n\nQuestion: {question}"},
]
Critical detail: make the model cite by index, then look up the document URLs server-side. Models hallucinate URLs; they don’t hallucinate [3].
6. The FastAPI service
# app/main.py
from __future__ import annotations
from contextlib import asynccontextmanager
import asyncpg, httpx
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from .embed import embed_texts
from .retrieve import retrieve
from .prompt import build_prompt
class AskIn(BaseModel):
question: str
class Citation(BaseModel):
index: int
document_id: int
snippet: str
class AskOut(BaseModel):
answer: str
citations: list[Citation]
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.pool = await asyncpg.create_pool(dsn=DATABASE_URL, min_size=2, max_size=10)
app.state.http = httpx.AsyncClient(timeout=30.0)
yield
await app.state.pool.close()
await app.state.http.aclose()
app = FastAPI(lifespan=lifespan)
@app.post("/ask", response_model=AskOut)
async def ask(payload: AskIn):
if not payload.question.strip():
raise HTTPException(400, "empty question")
[q_emb] = await embed_texts(app.state.http, [payload.question])
chunks = await retrieve(app.state.pool, payload.question, q_emb)
if not chunks:
return AskOut(answer="I don't have that in the docs.", citations=[])
messages = build_prompt(payload.question, chunks)
resp = await app.state.http.post(
"https://api.openai.com/v1/chat/completions",
headers={"Authorization": f"Bearer {OPENAI_KEY}"},
json={"model": "gpt-4o-mini", "messages": messages, "temperature": 0.1},
)
resp.raise_for_status()
answer = resp.json()["choices"][0]["message"]["content"]
citations = [
Citation(index=i + 1, document_id=c["document_id"], snippet=c["content"][:240])
for i, c in enumerate(chunks)
]
return AskOut(answer=answer, citations=citations)
Notes:
lifespankeeps the connection pool and HTTP client across requests. Don’t open them per request.temperature=0.1for grounded answers. Higher creativity makes hallucination more likely.- Bound your context. With 8 chunks × ~400 tokens ≈ 3.2k context tokens — comfortable for most current models.
7. The parts most tutorials skip
Re-ingestion idempotency
ALTER TABLE chunks ADD COLUMN content_hash TEXT;
CREATE UNIQUE INDEX chunks_doc_hash ON chunks (document_id, content_hash);
Compute sha256(content) on insert. Skip if exists. This is what lets your nightly ingest job actually be nightly.
Tuning HNSW for recall
-- per-session knob; bigger = better recall, slower query
SET hnsw.ef_search = 100;
I’ve found ef_search = 100 is a good default for k = 30 retrieval. Go higher if recall suffers, lower if latency hurts.
Eval, not vibes
Pick a starter eval set early — even 30 hand-curated (question, expected_chunk_ids) pairs. Run it on every deploy. RAG quality silently regresses when you upgrade the embedding model or change chunking. Catch it before users do.
I’ll write a dedicated eval post next; for now, the rule is: if you can’t measure it, you can’t improve it.
Cost
For 1M chunks × text-embedding-3-small (~$0.02/1M tokens):
- ~400 tokens/chunk × 1M = 400M tokens → $8 one-time.
- Per query: 1 query embed + ~3k input tokens to GPT-4o-mini ≈ $0.0005.
Hybrid search costs you Postgres CPU, not API dollars. Lean into it.
What’s next
This is a backend that actually works in production. Things to add as you grow:
- A reranker (Cohere Rerank or BGE) on the top 30 fused candidates → top 8.
- Conversational rewrites — turn follow-up questions into standalone queries before retrieval.
- Per-tenant filters with Postgres RLS so multi-tenant data never leaks.
- Streaming responses with FastAPI’s
StreamingResponseand SSE. - Evaluations on every deploy — see my next post on RAG eval (coming soon).
If you want a worked-out project to clone, the full repo for this lives on rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .