Initial setup: Knowledge base RAG system with LlamaIndex and ChromaDB

- Add Python project with uv package manager
- Implement LlamaIndex + ChromaDB RAG pipeline
- Add sentence-transformers for local embeddings (all-MiniLM-L6-v2)
- Create MCP server with semantic search, indexing, and stats tools
- Add Markdown chunker with heading/wikilink/frontmatter support
- Add Dockerfile and docker-compose.yaml for self-hosted deployment
- Include sample Obsidian vault files for testing
- Add .gitignore and .env.example
This commit is contained in:
2026-03-03 20:42:42 -05:00
parent 94dd158d1c
commit 11c3f705ce
11 changed files with 5319 additions and 0 deletions

282
src/knowledge_rag/server.py Normal file
View File

@@ -0,0 +1,282 @@
"""MCP server for knowledge base RAG system."""
import os
import sys
import logging
from pathlib import Path
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from pydantic import AnyUrl
from .chunker import MarkdownChunker
from .embeddings import get_embedding_model
from .vector_store import KnowledgeVectorStore
# Module-wide logging setup, applied once at import time.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)
class KnowledgeMCPServer:
    """MCP server for semantic search in Obsidian vault.

    Provides tools to:
    - Search the knowledge base semantically
    - Index/update the knowledge base
    - Get statistics about indexed content
    """

    def __init__(self, vault_path: str | None = None):
        """Initialize the RAG pipeline and register MCP handlers.

        Args:
            vault_path: Root directory of the markdown vault. Falls back
                to the VAULT_PATH environment variable, then "/data/vault".
        """
        # Get vault path from environment or use default
        self.vault_path = vault_path or os.environ.get(
            "VAULT_PATH", "/data/vault"
        )
        # Warn but don't fail, so the server can still start and the tools
        # can report a useful error to the client later.
        if not Path(self.vault_path).exists():
            logger.warning("Vault path does not exist: %s", self.vault_path)
        # Initialize RAG components: embeddings, vector store, chunker.
        self.embedding_model = get_embedding_model()
        self.vector_store = KnowledgeVectorStore(
            embedding_model=self.embedding_model
        )
        self.chunker = MarkdownChunker()
        # True once an indexing pass has completed in this process.
        self._indexed = False
        # Create MCP server and register request handlers.
        self.server = Server("knowledge-rag")
        self._register_handlers()

    def _register_handlers(self):
        """Register MCP request handlers (tool listing and dispatch)."""

        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            """List available MCP tools."""
            return [
                Tool(
                    name="search_knowledge",
                    description="Semantic search through the knowledge base. "
                    "Uses embeddings to find relevant content based on meaning, "
                    "not just keywords. Best for answering questions or finding "
                    "related concepts.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "The search query in natural language",
                            },
                            "top_k": {
                                "type": "integer",
                                "description": "Number of results to return",
                                "default": 5,
                            },
                        },
                        "required": ["query"],
                    },
                ),
                Tool(
                    name="index_knowledge",
                    description="Index or re-index the knowledge base. "
                    "Run this after adding new files to the vault. "
                    "Scans all markdown files and builds the search index.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "force": {
                                "type": "boolean",
                                "description": "Force re-index (clear existing index first)",
                                "default": False,
                            },
                        },
                    },
                ),
                Tool(
                    name="get_knowledge_stats",
                    description="Get statistics about the indexed knowledge base.",
                    inputSchema={
                        "type": "object",
                        "properties": {},
                    },
                ),
            ]

        @self.server.call_tool()
        async def call_tool(
            name: str, arguments: dict | None
        ) -> list[TextContent]:
            """Dispatch a tool call to the matching private handler."""
            if name == "search_knowledge":
                return await self._search_knowledge(arguments or {})
            elif name == "index_knowledge":
                return await self._index_knowledge(arguments or {})
            elif name == "get_knowledge_stats":
                return await self._get_stats()
            else:
                raise ValueError(f"Unknown tool: {name}")

    async def _search_knowledge(
        self, arguments: dict[str, Any]
    ) -> list[TextContent]:
        """Search the knowledge base semantically.

        Args:
            arguments: Tool arguments; expects "query" (str) and an
                optional "top_k" (int, default 5).

        Returns:
            A single-element list with the formatted results, or an
            error/empty-result message.
        """
        query = arguments.get("query", "")
        top_k = arguments.get("top_k", 5)
        if not query:
            return [TextContent(type="text", text="Query cannot be empty.")]
        # Lazily build the index if no indexing pass has run yet.
        if not self._indexed:
            await self._index_knowledge({})
        try:
            # Search with embeddings
            results = self.vector_store.search(
                query=query,
                top_k=top_k,
            )
            if not results:
                return [
                    TextContent(
                        type="text",
                        text="No results found. Try indexing your knowledge base first."
                    )
                ]
            # Format each hit: source file, optional heading, score, snippet.
            output = []
            for i, result in enumerate(results, 1):
                source = result["metadata"].get("file_name", "unknown")
                heading = result["metadata"].get("heading", "")
                score = result.get("score", 0)
                text = result["text"][:500]  # Truncate long text
                if len(result["text"]) > 500:
                    text += "..."
                output.append(
                    f"--- Result {i} ---\n"
                    f"Source: {source}"
                    + (f" > {heading}" if heading else "")
                    + f"\nRelevance: {score:.2f}\n\n{text}\n"
                )
            return [TextContent(type="text", text="\n".join(output))]
        except Exception as e:
            # Boundary handler: report the failure to the MCP client
            # instead of crashing the server loop.
            logger.exception("Search error")
            return [TextContent(type="text", text=f"Search error: {str(e)}")]

    async def _index_knowledge(
        self, arguments: dict[str, Any]
    ) -> list[TextContent]:
        """Index the knowledge base.

        Args:
            arguments: Tool arguments; optional "force" (bool) clears the
                existing index before re-indexing.

        Returns:
            A single-element list describing the indexing outcome.
        """
        force = arguments.get("force", False)
        vault_path = Path(self.vault_path)
        if not vault_path.exists():
            return [
                TextContent(
                    type="text",
                    text=f"Vault path does not exist: {self.vault_path}"
                )
            ]
        try:
            # Clear existing index if forced
            if force:
                logger.info("Force re-indexing...")
                self.vector_store.clear()
            else:
                # NOTE(review): without force the existing index is kept;
                # whether re-adding the same chunks creates duplicates
                # depends on KnowledgeVectorStore.add_nodes — confirm.
                logger.info("Indexing knowledge base...")
            # Chunk all markdown files under the vault.
            chunks = self.chunker.chunk_directory(str(vault_path))
            if not chunks:
                return [
                    TextContent(
                        type="text",
                        text="No markdown files found in vault."
                    )
                ]
            logger.info(
                "Created %d chunks, adding to vector store...", len(chunks)
            )
            # Add to vector store (this embeds them)
            self.vector_store.add_nodes(chunks, embedding_model=self.embedding_model)
            self._indexed = True
            stats = self.vector_store.get_stats()
            return [
                TextContent(
                    type="text",
                    text=f"Successfully indexed {len(chunks)} chunks from the knowledge base.\n"
                    f"Total chunks in index: {stats['total_chunks']}"
                )
            ]
        except Exception as e:
            # Boundary handler: surface the error to the client.
            logger.exception("Indexing error")
            return [TextContent(type="text", text=f"Indexing error: {str(e)}")]

    async def _get_stats(self) -> list[TextContent]:
        """Report knowledge-base statistics (file count, chunk count, status)."""
        stats = self.vector_store.get_stats()
        vault_path = Path(self.vault_path)
        # Count markdown files recursively; empty if the vault is missing.
        md_files = list(vault_path.rglob("*.md")) if vault_path.exists() else []
        return [
            TextContent(
                type="text",
                text=f"Knowledge Base Statistics:\n"
                f"- Vault path: {self.vault_path}\n"
                f"- Markdown files: {len(md_files)}\n"
                f"- Indexed chunks: {stats['total_chunks']}\n"
                f"- Index status: {'Ready' if self._indexed else 'Not indexed'}"
            )
        ]

    async def run(self):
        """Run the MCP server over stdio, auto-indexing on startup."""
        # Fix: the original used a no-placeholder f-string (ruff F541);
        # use lazy %-style args so formatting is skipped when suppressed.
        logger.info("Starting Knowledge RAG MCP Server")
        logger.info("Vault path: %s", self.vault_path)
        # Auto-index on startup
        await self._index_knowledge({})
        # Serve MCP requests over stdin/stdout until the stream closes.
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options(),
            )
async def main():
    """Construct the knowledge server and run it until shutdown."""
    await KnowledgeMCPServer().run()
if __name__ == "__main__":
import asyncio
asyncio.run(main())