MCP RAG Server with LangChain & ChromaDB

Build an MCP vector-database server with FastMCP, ChromaDB, and Ollama embeddings — ingest PDFs from a file, folder, or URL, then query it from a Streamlit agent.

Jun 17, 2026 · 8 min read

Topics You Will Master

Building an MCP RAG server with FastMCP, ChromaDB, and Ollama nomic-embed-text
Ingesting PDFs from a file path, a folder, or a URL into a vector store
Exposing ingest, retrieve, db-info, and clear tools over MCP
Driving the server from a Streamlit agent with langchain-mcp-adapters

Retrieval-Augmented Generation (RAG) grounds an LLM in your own documents. In this lesson you expose a complete RAG pipeline as an MCP server: it ingests PDFs into ChromaDB using Ollama embeddings, and offers retrieval as tools any MCP client can call. A Streamlit chat app then uses those tools through a local LangChain agent.

The whole stack runs on your machine — FastMCP for the protocol, ChromaDB for storage, Ollama nomic-embed-text for embeddings, and PyPDF2 for text extraction.

Note

Prerequisites: Ollama running with nomic-embed-text and qwen3 pulled; uv add fastmcp chromadb langchain-chroma langchain-ollama langchain-text-splitters langchain-mcp-adapters langgraph langchain streamlit pypdf2 requests (langchain-text-splitters is listed explicitly because server.py imports it directly). For RAG fundamentals, see RAG — Chat with Your Own Documents and Vector Stores and Retrievals.


Architecture

The ingestion pipeline: PDF to text to chunks to embeddings to ChromaDB

The data-processing flow is a classic indexing pipeline wrapped in MCP:

PLAINTEXT
PDF (file / folder / URL)
  -> PyPDF2 text extraction
  -> RecursiveCharacterTextSplitter (chunks)
  -> Ollama nomic-embed-text (embeddings)
  -> ChromaDB collection "documents"

The server has six layers: an MCP layer (FastMCP routes tool calls), a processing layer (PDF extraction and chunking), an AI layer (Ollama embeddings), a storage layer (ChromaDB), a search layer (similarity search with scoring), and an integration layer (LangChain ties it together).
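
To make the flow concrete without Ollama or ChromaDB, here is a toy, dependency-free sketch of the same stages (chunk, embed, store, search). It is a stand-in built on stated assumptions, not the server's code: a bag-of-words Counter replaces nomic-embed-text, a dict replaces the ChromaDB collection, and chunking is a naive split on blank lines. The names embed, cosine, ToyVectorStore, and index_document are hypothetical.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    # Toy stand-in for nomic-embed-text: a sparse bag-of-words count vector.
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    # Cosine similarity of two sparse vectors; 0.0 if either vector is empty.
    dot = sum(count * b[term] for term, count in a.items())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


class ToyVectorStore:
    # Toy stand-in for the ChromaDB "documents" collection: id -> (text, vector, metadata).
    def __init__(self) -> None:
        self.rows: dict = {}

    def add(self, ids: list, texts: list, metadatas: list) -> None:
        for row_id, text, meta in zip(ids, texts, metadatas):
            self.rows[row_id] = (text, embed(text), meta)  # reusing an ID replaces the row

    def search(self, query: str, k: int = 3) -> list:
        # Score every row against the query and return the k best (score, id, text) tuples.
        qv = embed(query)
        scored = [(cosine(qv, vec), row_id, text) for row_id, (text, vec, _) in self.rows.items()]
        scored.sort(key=lambda row: row[0], reverse=True)
        return scored[:k]


def index_document(store: ToyVectorStore, name: str, text: str) -> int:
    # Chunk (naive blank-line split), then embed and store each chunk with metadata.
    chunks = [c.strip() for c in text.split("\n\n") if c.strip()]
    ids = [f"{name}_chunk_{i}" for i in range(len(chunks))]
    metas = [{"source": name, "chunk_index": i, "total_chunks": len(chunks)} for i in range(len(chunks))]
    store.add(ids, chunks, metas)
    return len(chunks)


if __name__ == "__main__":
    store = ToyVectorStore()
    index_document(store, "whitepaper", "Deployment costs depend on GPU hours.\n\nThe encoder is a transformer.")
    print(store.search("deployment costs", k=1))
```

The real server swaps each toy piece for its production counterpart, but the order of operations is the same.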


Configuration and static initialization

Create server.py. The configuration up top is the single place to tune the pipeline:

PYTHON
import os
import requests
from typing import List, Dict, Any
from pathlib import Path

from langchain_chroma import Chroma
from langchain_ollama import OllamaEmbeddings
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from PyPDF2 import PdfReader
from fastmcp import FastMCP

current_dir = Path(__file__).parent
CHROMA_PATH = os.path.join(current_dir, "chroma_db")
EMBED_MODEL = "nomic-embed-text"
OLLAMA_BASE_URL = "http://localhost:11434"
CHUNK_SIZE = 4096
CHUNK_OVERLAP = CHUNK_SIZE // 10  # 10% overlap
COLLECTION_NAME = "documents"

mcp = FastMCP("langchain-vector-db")

The shared objects (the embeddings client, the splitter, and the vector store) are created once at module load rather than lazily. Every tool call then reuses the same instances through module-level globals instead of rebuilding them and reopening the ChromaDB directory on each request.

PYTHON
embeddings = OllamaEmbeddings(model=EMBED_MODEL, base_url=OLLAMA_BASE_URL)

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
    length_function=len,
    separators=["\n\n", "\n", " ", ""],
)

vectorstore = Chroma(
    persist_directory=CHROMA_PATH,
    embedding_function=embeddings,
    collection_name=COLLECTION_NAME,
)
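
For contrast, the lazy alternative can be sketched with functools.lru_cache: the object is built on the first call, and every later call returns the same cached instance. FakeVectorStore, get_vectorstore, and retrieve_tool are hypothetical stand-ins (no Chroma or Ollama involved), not part of server.py.

```python
from functools import lru_cache


class FakeVectorStore:
    # Hypothetical stand-in for Chroma(...); counts how many times it is constructed.
    instances = 0

    def __init__(self) -> None:
        FakeVectorStore.instances += 1


@lru_cache(maxsize=None)
def get_vectorstore() -> FakeVectorStore:
    # Built on the first call; later calls return the cached instance.
    return FakeVectorStore()


def retrieve_tool(query: str) -> int:
    # Hypothetical tool body: fetches the shared store instead of reading a module global.
    store = get_vectorstore()
    return id(store)
```

The trade-off is timing: lazy construction moves the startup cost, and any construction error, to the first tool call. A reset like clear_db could then call get_vectorstore.cache_clear() after deleting the collection instead of rebinding a global.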

Note

A 4096-character chunk with 10% overlap suits long technical PDFs. Smaller chunks improve precision but increase the number of vectors; tune CHUNK_SIZE to your documents.
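
To get a feel for what 4096 characters with a 409-character overlap means, here is a simplified fixed-window splitter. It is an approximation, not RecursiveCharacterTextSplitter: the real splitter prefers to break at "\n\n", "\n", and " ", so its chunks are often shorter and its count can differ. fixed_windows and estimate_chunks are hypothetical helpers.

```python
import math

CHUNK_SIZE = 4096
CHUNK_OVERLAP = CHUNK_SIZE // 10  # 409 characters


def fixed_windows(text: str, size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    # Slide a window of `size` characters forward by (size - overlap) until the end is covered.
    if not text:
        return []
    step = size - overlap
    chunks, start = [], 0
    while True:
        chunks.append(text[start:start + size])
        if start + size >= len(text):
            return chunks
        start += step


def estimate_chunks(text_len: int, size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> int:
    # Closed form of the loop above: one window, plus one per extra step needed to reach the end.
    if text_len <= 0:
        return 0
    if text_len <= size:
        return 1
    return 1 + math.ceil((text_len - size) / (size - overlap))
```

Under this approximation, a 100,000-character document comes out at 28 windows. Halving CHUNK_SIZE roughly doubles the count, which is the precision-versus-vector-count trade-off the note describes.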


Helper functions

Three helpers handle text extraction, single-PDF processing, and downloading a PDF from a URL.

PYTHON
def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract text from a PDF using PyPDF2."""
    try:
        reader = PdfReader(pdf_path)
        text = ""
        for page in reader.pages:
            page_text = page.extract_text()
            if page_text:
                text += page_text + "\n"
        return text
    except Exception as e:
        print(f"Error reading PDF {pdf_path}: {e}")
        return ""


def process_single_pdf(pdf_path: str) -> int:
    """Process one PDF and add its chunks to the vector store."""
    text = extract_text_from_pdf(pdf_path)
    if not text.strip():
        # Scanned/image-only PDFs (or pages yielding only newlines) give nothing to index
        print(f"No text extracted from {pdf_path}")
        return 0

    doc = Document(
        page_content=text,
        metadata={"source": str(pdf_path), "filename": Path(pdf_path).name},
    )
    chunks = text_splitter.split_documents([doc])
    for i, chunk in enumerate(chunks):
        chunk.metadata["chunk_index"] = i
        chunk.metadata["total_chunks"] = len(chunks)

    # Deterministic IDs: re-ingesting the same file reuses the same IDs
    ids = [f"{Path(pdf_path).stem}_chunk_{i}" for i in range(len(chunks))]
    vectorstore.add_documents(documents=chunks, ids=ids)
    return len(chunks)


def download_pdf(url: str, download_dir: str = "./downloads") -> str:
    """Download a PDF from a URL to a local folder."""
    os.makedirs(download_dir, exist_ok=True)
    url_path = url.split("?")[0]  # drop the query string before deriving a filename
    filename = Path(url_path).name
    if not filename.lower().endswith(".pdf"):
        filename = f"downloaded_{Path(url_path).stem}.pdf"
    local_path = os.path.join(download_dir, filename)

    # A timeout keeps a stalled connection from hanging the tool call indefinitely
    response = requests.get(url, stream=True, timeout=60)
    response.raise_for_status()
    with open(local_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
    return local_path
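
The ID scheme {stem}_chunk_{i} keeps re-ingestion of the same file deterministic, but two files with the same name in different folders (say a/report.pdf and b/report.pdf) produce identical IDs, so one file's chunks can overwrite the other's. A hypothetical variant, make_chunk_ids, mixes in a short hash of the resolved path so the IDs stay deterministic while separating same-named files:

```python
import hashlib
from pathlib import Path


def make_chunk_ids(pdf_path: str, num_chunks: int) -> list[str]:
    # Hypothetical replacement for f"{Path(pdf_path).stem}_chunk_{i}".
    # An 8-hex-digit hash of the absolute path separates same-named files in
    # different folders, while the same path always yields the same IDs.
    resolved = str(Path(pdf_path).resolve())
    digest = hashlib.sha256(resolved.encode("utf-8")).hexdigest()[:8]
    stem = Path(pdf_path).stem
    return [f"{stem}_{digest}_chunk_{i}" for i in range(num_chunks)]
```

Inside process_single_pdf this would replace the ids line with ids = make_chunk_ids(pdf_path, len(chunks)). Note that relative paths resolve against the server's working directory.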

The MCP tools

The RAG server's four tools: ingest, retrieve, db-info, and clear

ingest_pdf

One tool handles three source types — a URL, a folder of PDFs, or a single file — and reports how many chunks were added.

PYTHON
@mcp.tool()
async def ingest_pdf(source: str) -> Dict[str, Any]:
    """
    Ingest a PDF from a folder, file path, or URL.

    Args:
        source: a URL, a folder path (all PDFs), or a single PDF file path.
    Returns:
        Status and number of chunks added.
    """
    try:
        total_chunks = 0
        processed_files = []

        if source.startswith(("http://", "https://")):
            local_path = download_pdf(source)
            total_chunks += process_single_pdf(local_path)
            processed_files.append(local_path)

        elif os.path.isdir(source):
            for pdf_file in Path(source).glob("*.pdf"):
                total_chunks += process_single_pdf(str(pdf_file))
                processed_files.append(str(pdf_file))

        elif os.path.isfile(source) and source.endswith(".pdf"):
            total_chunks += process_single_pdf(source)
            processed_files.append(source)

        else:
            return {"status": "error",
                    "message": f"Invalid source: {source}. Must be a PDF file, folder, or URL."}

        return {"status": "success", "chunks_added": total_chunks,
                "files_processed": len(processed_files), "files": processed_files}
    except Exception as e:
        return {"status": "error", "message": str(e)}
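
Because the branch order decides how an ambiguous string is treated, it can help to pull the dispatch into a pure function that is easy to test. A sketch, assuming you are free to refactor: classify_source and list_pdfs are hypothetical helpers, and unlike the tool above they match the .pdf extension case-insensitively (Path.glob("*.pdf") is case-sensitive on POSIX systems, so REPORT.PDF would be skipped on Linux).

```python
import os
from pathlib import Path


def classify_source(source: str) -> str:
    # Same branch order as ingest_pdf: URL first, then folder, then single PDF file.
    if source.startswith(("http://", "https://")):
        return "url"
    if os.path.isdir(source):
        return "folder"
    if os.path.isfile(source) and source.lower().endswith(".pdf"):
        return "file"
    return "invalid"


def list_pdfs(folder: str) -> list[str]:
    # Case-insensitive, sorted listing of the PDFs directly inside `folder`.
    return sorted(
        str(p) for p in Path(folder).iterdir()
        if p.is_file() and p.suffix.lower() == ".pdf"
    )
```

Sorting the listing also makes the files_processed order in the tool's response stable across runs.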

retrieve

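Returns the top-N most similar chunks. similarity_search_with_score returns a distance (lower means closer), and the tool reports 1 - distance as a similarity score. Chroma collections use squared L2 distance by default, so treat that score as a rough ranking signal rather than a true cosine similarity; it can drop below zero.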

A query embedded and matched to return the top-N most similar chunks

PYTHON
@mcp.tool()
async def retrieve(query: str, n: int = 5) -> List[Dict[str, Any]]:
    """Retrieve the top-N chunks for a query."""
    try:
        results = vectorstore.similarity_search_with_score(query, k=n)
        chunks = []
        for doc, score in results:
            chunks.append({
                "text": doc.page_content,
                "metadata": doc.metadata,
                "similarity_score": float(1 - score),
                "distance": float(score),
            })
        return chunks
    except Exception as e:
        return [{"error": str(e)}]
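
If the embeddings are unit-length (an assumption worth checking for your embedding model), squared L2 distance and cosine similarity are tied by d = 2 - 2·cos, so cos = 1 - d/2. This pure-Python sketch checks that identity and shows why 1 - d can be negative even for mildly similar vectors. The helper names are hypothetical.

```python
import math


def normalize(v: list[float]) -> list[float]:
    # Scale a vector to unit length (assumed to match how the embeddings arrive).
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]


def squared_l2(a: list[float], b: list[float]) -> float:
    # Squared Euclidean distance, the metric behind Chroma's default "l2" space.
    return sum((x - y) ** 2 for x, y in zip(a, b))


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def cosine_from_squared_l2(distance: float) -> float:
    # Valid only for unit vectors: ||a - b||^2 = 2 - 2*cos(a, b), so cos = 1 - d / 2.
    return 1 - distance / 2


a = normalize([1.0, 2.0, 3.0])
b = normalize([3.0, -1.0, 0.5])
d = squared_l2(a, b)
print(cosine_from_squared_l2(d), cosine_similarity(a, b))  # agree up to float rounding
print(1 - d)  # what retrieve reports for this pair: negative, although cos is positive
```

If you want retrieve to report a true cosine similarity, one option is to create the collection with cosine distance (for example, collection_metadata={"hnsw:space": "cosine"} on the Chroma constructor; verify this against your Chroma version) and re-ingest, since a collection's distance function is fixed when it is created.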

db_info and clear_db

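Inspect and reset the collection: db_info reports the total chunk count plus the unique sources found in a sample of up to 100 chunks, and clear_db deletes the collection and recreates it empty.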

PYTHON
@mcp.tool()
async def db_info() -> Dict[str, Any]:
    """Get ChromaDB collection statistics."""
    try:
        collection = vectorstore._collection
        count = collection.count()
        sources = set()
        if count > 0:
            sample = collection.get(limit=min(100, count), include=["metadatas"])
            for metadata in sample.get("metadatas", []) or []:
                if metadata and "source" in metadata:
                    sources.add(metadata["source"])
        return {
            "database_path": CHROMA_PATH,
            "collection_name": COLLECTION_NAME,
            "embedding_model": EMBED_MODEL,
            "total_chunks": count,
            "unique_sources": list(sources),
            "num_sources": len(sources),
        }
    except Exception as e:
        return {"error": str(e)}


@mcp.tool()
async def clear_db() -> Dict[str, Any]:
    """Clear all data from the database."""
    # Rebind the module-level store so the other tools see the fresh collection
    global vectorstore
    try:
        vectorstore.delete_collection()
        vectorstore = Chroma(
            persist_directory=CHROMA_PATH,
            embedding_function=embeddings,
            collection_name=COLLECTION_NAME,
        )
        return {"status": "success", "message": "Database cleared and reset"}
    except Exception as e:
        return {"status": "error", "message": str(e)}


if __name__ == "__main__":
    mcp.run(transport="stdio")
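
db_info only inspects the first 100 chunks, so on a large collection unique_sources can miss documents. Chroma's Collection.get accepts limit and offset, so a hypothetical helper, collect_sources, can page through every row instead. The sketch is written against that get(limit=..., offset=..., include=[...]) call and exercised in the test with a fake collection; the page size of 100 is an arbitrary choice.

```python
def collect_sources(collection, page_size: int = 100) -> set[str]:
    # Page through collection.get(...) until a short (or empty) page comes back.
    sources: set[str] = set()
    offset = 0
    while True:
        page = collection.get(limit=page_size, offset=offset, include=["metadatas"])
        metadatas = page.get("metadatas") or []
        for meta in metadatas:
            if meta and "source" in meta:
                sources.add(meta["source"])
        if len(metadatas) < page_size:
            return sources
        offset += page_size
```

In db_info this would be called as collect_sources(vectorstore._collection), at the cost of one request per page.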

Caution

clear_db deletes the entire collection and cannot be undone. Expose destructive tools deliberately, and consider removing them from servers connected to a shared host.


A Streamlit client agent

A Streamlit agent calling the RAG server's tools over stdio

The client is a Streamlit chat app. It launches the server over stdio, converts the MCP tools into LangChain tools with langchain-mcp-adapters, and builds an agent that decides when to ingest or retrieve. Create app.py:

PYTHON
import streamlit as st
import asyncio
from langchain_ollama import ChatOllama
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain.agents import create_agent

# Update to the absolute path of your server.py
server_params = StdioServerParameters(
    command="uv",
    args=["--directory",
          "C:\\Users\\your-username\\projects\\mcp-course\\08 MCP RAG with LangChain",
          "run", "server.py"])

st.title(":brain: Streamlit App for MCP RAG with Ollama LLM")
st.write("LEARN LLM @ KGP Talkie: https://www.youtube.com/kgptalkie")

model = ChatOllama(model="qwen3", base_url="http://localhost:11434/")

if "chat_history" not in st.session_state:
    st.session_state["chat_history"] = []

with st.form("llm-form"):
    text = st.text_area("Enter your question here.")
    submit = st.form_submit_button("Submit")
    new_chat = st.form_submit_button("New Chat")
    debug_info = st.checkbox("Show Debug Info")

async def generate_response_async(user_message):
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await load_mcp_tools(session)
            agent = create_agent(model, tools)
            agent_response = await agent.ainvoke(
                {"messages": [{"role": "user", "content": user_message}]}
            )
            if debug_info:
                st.write("### Debug - Agent Response")
                st.write(agent_response)
            return agent_response.get("messages")[-1].content

def generate_response(user_message):
    # Streamlit's script thread has no event loop; asyncio.run creates one,
    # runs the coroutine to completion, and closes the loop afterwards.
    return asyncio.run(generate_response_async(user_message))

if submit and text:
    with st.spinner("Generating response..."):
        response = generate_response(text)
        st.session_state["chat_history"].append({"user": text, "assistant": response})

if new_chat:
    st.session_state["chat_history"] = []

if st.session_state["chat_history"]:
    st.write("## Chat History")
    for chat in reversed(st.session_state["chat_history"]):
        st.write(f"**:adult: User**: {chat['user']}")
        st.write(f"**:brain: Assistant**: {chat['assistant']}")
        st.write("---")
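
Streamlit runs app.py in a script thread that has no event loop of its own, so each submission has to start one to drive generate_response_async. The sketch below simulates that situation with a plain threading.Thread and asyncio.run, which creates a fresh loop, runs the coroutine, and closes the loop. fake_agent_call and run_in_script_thread are hypothetical stand-ins for the real MCP and agent round trip.

```python
import asyncio
import threading


async def fake_agent_call(message: str) -> str:
    # Hypothetical stand-in for generate_response_async: yields to the loop once, then answers.
    await asyncio.sleep(0)
    return f"echo: {message}"


def generate_response(message: str) -> str:
    # A new event loop per call; nothing is left open between submissions.
    return asyncio.run(fake_agent_call(message))


def run_in_script_thread(message: str) -> str:
    # Simulate Streamlit's script thread: a worker thread with no running event loop.
    result = {}
    worker = threading.Thread(target=lambda: result.update(value=generate_response(message)))
    worker.start()
    worker.join()
    return result["value"]
```

Because each call gets its own loop, the stdio server subprocess is started and torn down per question, which keeps the client simple at the cost of some startup latency.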

Important

Set --directory to the absolute path of the folder containing your server.py. Replace your-username with your actual Windows username.

On Linux/macOS: use a path like /home/your-username/projects/mcp-course/08 MCP RAG with LangChain.

Note

create_agent is the LangChain v1 helper. On LangChain/LangGraph v0.3 use from langgraph.prebuilt import create_react_agent as create_agent instead.

Run the app:

BASH
uv run streamlit run app.py

Ask it to ingest first, then query:

PLAINTEXT
You: Ingest the PDF at https://example.com/whitepaper.pdf
Assistant: Added 12 chunks from 1 file to the vector database.

You: What does the document say about deployment costs?
Assistant: According to the ingested document, deployment costs are driven by ...

Tip

The same server works in Claude Desktop — register it like any stdio server (see Connect MCP Servers to Claude Desktop) and ask Claude to ingest and query PDFs directly.


Recap

  • A RAG pipeline becomes reusable when exposed as MCP tools: ingest_pdf, retrieve, db_info, clear_db.
  • Initializing embeddings, splitter, and vector store once at module load keeps the server fast and memory-efficient.
  • langchain-mcp-adapters turns MCP tools into LangChain tools so any agent can call them — see github.com/langchain-ai/langchain-mcp-adapters.

Next you will scale this idea into a multi-server research assistant that combines retrieval with live web crawling in Research Assistant with MCP and LangGraph.
