Compare commits

..

1 Commit

Author SHA1 Message Date
1f3450c2f8 Add incremental indexing with deleted file detection
- Add file_mtime to chunk metadata for change detection
- Add get_indexed_files() and get_existing_sources() methods
- Add filter_new_chunks() to skip unchanged files
- Add remove_chunks_by_source() to delete orphaned chunks
- Update server to detect and remove deleted files on incremental index
- Fix clear() to recreate ChromaVectorStore wrapper
2026-03-04 16:24:27 -05:00
3 changed files with 156 additions and 57 deletions

View File

@ -49,6 +49,9 @@ class MarkdownChunker:
# Get relative path for context # Get relative path for context
rel_path = os.path.relpath(file_path) rel_path = os.path.relpath(file_path)
# Get file modification time for change detection
file_mtime = os.path.getmtime(file_path)
# Split into sections based on headings # Split into sections based on headings
sections = self._split_by_headings(body) sections = self._split_by_headings(body)
@ -64,6 +67,7 @@ class MarkdownChunker:
metadata={ metadata={
"source": rel_path, "source": rel_path,
"file_name": os.path.basename(file_path), "file_name": os.path.basename(file_path),
"file_mtime": file_mtime,
"heading": section.get("heading", ""), "heading": section.get("heading", ""),
"section_index": i, "section_index": i,
"wiki_links": ",".join(wiki_links) if wiki_links else "", "wiki_links": ",".join(wiki_links) if wiki_links else "",
@ -105,9 +109,7 @@ class MarkdownChunker:
return all_chunks return all_chunks
def _extract_frontmatter( def _extract_frontmatter(self, content: str) -> tuple[Optional[dict], str]:
self, content: str
) -> tuple[Optional[dict], str]:
"""Extract YAML frontmatter from markdown.""" """Extract YAML frontmatter from markdown."""
if not content.startswith("---"): if not content.startswith("---"):
return None, content return None, content

View File

@ -35,9 +35,7 @@ class KnowledgeMCPServer:
def __init__(self, vault_path: str | None = None): def __init__(self, vault_path: str | None = None):
# Get vault path from environment or use default # Get vault path from environment or use default
self.vault_path = vault_path or os.environ.get( self.vault_path = vault_path or os.environ.get("VAULT_PATH", "/data/vault")
"VAULT_PATH", "/data/vault"
)
# Ensure vault path exists # Ensure vault path exists
if not Path(self.vault_path).exists(): if not Path(self.vault_path).exists():
@ -45,9 +43,7 @@ class KnowledgeMCPServer:
# Initialize components # Initialize components
self.embedding_model = get_embedding_model() self.embedding_model = get_embedding_model()
self.vector_store = KnowledgeVectorStore( self.vector_store = KnowledgeVectorStore(embedding_model=self.embedding_model)
embedding_model=self.embedding_model
)
self.chunker = MarkdownChunker() self.chunker = MarkdownChunker()
# Track indexing status # Track indexing status
@ -69,9 +65,9 @@ class KnowledgeMCPServer:
Tool( Tool(
name="search_knowledge", name="search_knowledge",
description="Semantic search through the knowledge base. " description="Semantic search through the knowledge base. "
"Uses embeddings to find relevant content based on meaning, " "Uses embeddings to find relevant content based on meaning, "
"not just keywords. Best for answering questions or finding " "not just keywords. Best for answering questions or finding "
"related concepts.", "related concepts.",
inputSchema={ inputSchema={
"type": "object", "type": "object",
"properties": { "properties": {
@ -91,8 +87,8 @@ class KnowledgeMCPServer:
Tool( Tool(
name="index_knowledge", name="index_knowledge",
description="Index or re-index the knowledge base. " description="Index or re-index the knowledge base. "
"Run this after adding new files to the vault. " "Run this after adding new files to the vault. "
"Scans all markdown files and builds the search index.", "Scans all markdown files and builds the search index.",
inputSchema={ inputSchema={
"type": "object", "type": "object",
"properties": { "properties": {
@ -115,9 +111,7 @@ class KnowledgeMCPServer:
] ]
@self.server.call_tool() @self.server.call_tool()
async def call_tool( async def call_tool(name: str, arguments: dict | None) -> list[TextContent]:
name: str, arguments: dict | None
) -> list[TextContent]:
"""Handle tool calls.""" """Handle tool calls."""
if name == "search_knowledge": if name == "search_knowledge":
return await self._search_knowledge(arguments or {}) return await self._search_knowledge(arguments or {})
@ -128,9 +122,7 @@ class KnowledgeMCPServer:
else: else:
raise ValueError(f"Unknown tool: {name}") raise ValueError(f"Unknown tool: {name}")
async def _search_knowledge( async def _search_knowledge(self, arguments: dict[str, Any]) -> list[TextContent]:
self, arguments: dict[str, Any]
) -> list[TextContent]:
"""Search the knowledge base semantically.""" """Search the knowledge base semantically."""
query = arguments.get("query", "") query = arguments.get("query", "")
top_k = arguments.get("top_k", 5) top_k = arguments.get("top_k", 5)
@ -153,7 +145,7 @@ class KnowledgeMCPServer:
return [ return [
TextContent( TextContent(
type="text", type="text",
text="No results found. Try indexing your knowledge base first." text="No results found. Try indexing your knowledge base first.",
) )
] ]
@ -181,45 +173,70 @@ class KnowledgeMCPServer:
logger.exception("Search error") logger.exception("Search error")
return [TextContent(type="text", text=f"Search error: {str(e)}")] return [TextContent(type="text", text=f"Search error: {str(e)}")]
async def _index_knowledge( async def _index_knowledge(self, arguments: dict[str, Any]) -> list[TextContent]:
self, arguments: dict[str, Any]
) -> list[TextContent]:
"""Index the knowledge base.""" """Index the knowledge base."""
force = arguments.get("force", False) force = arguments.get("force", False)
vault_path = Path(self.vault_path) vault_path = Path(self.vault_path)
if not vault_path.exists(): if not vault_path.exists():
return [ return [TextContent(type="text", text=f"Vault path does not exist: {self.vault_path}")]
TextContent(
type="text",
text=f"Vault path does not exist: {self.vault_path}"
)
]
try: try:
# Clear existing index if forced # Clear existing index if forced
if force: if force:
logger.info("Force re-indexing...") logger.info("Force re-indexing...")
self.vector_store.clear() self.vector_store.clear()
chunks = self.chunker.chunk_directory(str(vault_path))
new_chunks = chunks
else: else:
logger.info("Indexing knowledge base...") logger.info("Indexing knowledge base (incremental)...")
# Chunk all markdown files # Chunk all markdown files
chunks = self.chunker.chunk_directory(str(vault_path)) all_chunks = self.chunker.chunk_directory(str(vault_path))
if not chunks: if not all_chunks:
return [ return [TextContent(type="text", text="No markdown files found in vault.")]
TextContent(
type="text",
text="No markdown files found in vault."
)
]
logger.info(f"Created {len(chunks)} chunks, adding to vector store...") # Get current sources from the vault
current_sources = set()
for chunk in all_chunks:
source = chunk.metadata.get("source")
if source:
current_sources.add(source)
# Get indexed sources and detect deleted files
indexed_sources = self.vector_store.get_existing_sources()
deleted_sources = indexed_sources - current_sources
# Remove chunks from deleted files
if deleted_sources:
logger.info(f"Removing chunks from deleted files: {deleted_sources}")
for source in deleted_sources:
self.vector_store.remove_chunks_by_source(source)
# Filter to only new/modified chunks
new_chunks = self.vector_store.filter_new_chunks(all_chunks)
if not new_chunks:
# Check if we have any existing index
stats = self.vector_store.get_stats()
if stats["total_chunks"] > 0:
return [
TextContent(
type="text",
text=f"No new or modified files to index.\n"
f"Total chunks in index: {stats['total_chunks']}",
)
]
# No existing index, fall through to index everything
new_chunks = all_chunks
logger.info(f"Processing {len(new_chunks)} new/modified chunks...")
# Add to vector store (this embeds them) # Add to vector store (this embeds them)
self.vector_store.add_nodes(chunks, embedding_model=self.embedding_model) if new_chunks:
self.vector_store.add_nodes(new_chunks, embedding_model=self.embedding_model)
self._indexed = True self._indexed = True
@ -227,8 +244,8 @@ class KnowledgeMCPServer:
return [ return [
TextContent( TextContent(
type="text", type="text",
text=f"Successfully indexed {len(chunks)} chunks from the knowledge base.\n" text=f"Successfully indexed {len(new_chunks)} chunks from the knowledge base.\n"
f"Total chunks in index: {stats['total_chunks']}" f"Total chunks in index: {stats['total_chunks']}",
) )
] ]
@ -247,10 +264,10 @@ class KnowledgeMCPServer:
TextContent( TextContent(
type="text", type="text",
text=f"Knowledge Base Statistics:\n" text=f"Knowledge Base Statistics:\n"
f"- Vault path: {self.vault_path}\n" f"- Vault path: {self.vault_path}\n"
f"- Markdown files: {len(md_files)}\n" f"- Markdown files: {len(md_files)}\n"
f"- Indexed chunks: {stats['total_chunks']}\n" f"- Indexed chunks: {stats['total_chunks']}\n"
f"- Index status: {'Ready' if self._indexed else 'Not indexed'}" f"- Index status: {'Ready' if self._indexed else 'Not indexed'}",
) )
] ]
@ -279,4 +296,5 @@ async def main():
if __name__ == "__main__": if __name__ == "__main__":
import asyncio import asyncio
asyncio.run(main()) asyncio.run(main())

View File

@ -45,8 +45,7 @@ class KnowledgeVectorStore:
# Get or create collection # Get or create collection
self._collection = self._client.get_or_create_collection( self._collection = self._client.get_or_create_collection(
name=collection_name, name=collection_name, metadata={"description": "Knowledge base embeddings"}
metadata={"description": "Knowledge base embeddings"}
) )
# Wrap in LlamaIndex vector store # Wrap in LlamaIndex vector store
@ -64,7 +63,9 @@ class KnowledgeVectorStore:
"""Get the LlamaIndex ChromaVectorStore.""" """Get the LlamaIndex ChromaVectorStore."""
return self._vector_store return self._vector_store
def add_nodes(self, nodes: List[TextNode], embedding_model: "BaseEmbedding | None" = None) -> None: def add_nodes(
self, nodes: List[TextNode], embedding_model: "BaseEmbedding | None" = None
) -> None:
"""Add nodes to the vector store.""" """Add nodes to the vector store."""
from llama_index.core import VectorStoreIndex, StorageContext from llama_index.core import VectorStoreIndex, StorageContext
@ -127,8 +128,11 @@ class KnowledgeVectorStore:
"""Clear all embeddings from the store.""" """Clear all embeddings from the store."""
self._client.delete_collection(self._collection_name) self._client.delete_collection(self._collection_name)
self._collection = self._client.get_or_create_collection( self._collection = self._client.get_or_create_collection(
name=self._collection_name, name=self._collection_name, metadata={"description": "Knowledge base embeddings"}
metadata={"description": "Knowledge base embeddings"} )
# Recreate the ChromaVectorStore wrapper with the new collection
self._vector_store = ChromaVectorStore(
chroma_collection=self._collection,
) )
def get_stats(self) -> dict[str, Any]: def get_stats(self) -> dict[str, Any]:
@ -137,3 +141,78 @@ class KnowledgeVectorStore:
"total_chunks": self._collection.count(), "total_chunks": self._collection.count(),
"collection_name": self._collection_name, "collection_name": self._collection_name,
} }
def get_indexed_files(self) -> dict[str, float]:
    """Return a mapping of indexed source paths to their modification times.

    Scans the metadata of every chunk in the collection and keeps the
    newest ``file_mtime`` seen for each ``source`` path, so callers can
    compare it against the on-disk mtime to detect changed files.

    Returns:
        Dict mapping source path to the latest recorded modification time.
    """
    indexed_files: dict[str, float] = {}
    # Fetch only metadatas; the default get() also returns documents,
    # which is wasted work for a metadata-only scan of a large collection.
    items = self._collection.get(include=["metadatas"])
    if items and items.get("metadatas"):
        for metadata in items["metadatas"]:
            source = metadata.get("source")
            file_mtime = metadata.get("file_mtime")
            if source and file_mtime is not None:
                # Keep the newest mtime if a file's chunks disagree.
                if source not in indexed_files or file_mtime > indexed_files[source]:
                    indexed_files[source] = file_mtime
    return indexed_files
def get_existing_sources(self) -> set[str]:
    """Return the set of source file paths currently present in the index."""
    # The keys of get_indexed_files() are exactly the indexed sources.
    return set(self.get_indexed_files())
def filter_new_chunks(self, nodes: List["TextNode"]) -> List["TextNode"]:
    """Filter out chunks whose source file is already indexed and unchanged.

    Compares each chunk's ``source``/``file_mtime`` metadata against the
    index. Chunks from unknown files are kept; chunks from files whose
    mtime is newer than the indexed one are kept after the stale chunks
    for that file have been removed from the store.

    Args:
        nodes: List of TextNode chunks to filter.

    Returns:
        List of chunks that are new or belong to modified files.
    """
    indexed_files = self.get_indexed_files()
    # Sources whose stale chunks were already purged during this call,
    # so a modified file with many chunks is only deleted from the
    # store once instead of once per chunk.
    purged: set[str] = set()
    new_chunks: List["TextNode"] = []
    for node in nodes:
        source = node.metadata.get("source")
        if source is None:
            # Include chunks without source (shouldn't happen)
            new_chunks.append(node)
            continue
        indexed_mtime = indexed_files.get(source)
        if indexed_mtime is None:
            # New file, not in the index yet.
            new_chunks.append(node)
            continue
        file_mtime = node.metadata.get("file_mtime")
        # A missing mtime cannot be compared, so treat the file as
        # modified rather than raising TypeError on None > float.
        if file_mtime is None or file_mtime > indexed_mtime:
            if source not in purged:
                # Drop the stale chunks before re-adding the new ones.
                self._remove_chunks_by_source(source)
                purged.add(source)
            new_chunks.append(node)
        # else: file unchanged, skip it
    return new_chunks
def _remove_chunks_by_source(self, source: str) -> None:
"""Remove all chunks from a specific source file."""
# Get IDs of chunks to delete
items = self._collection.get(where={"source": source})
if items and items.get("ids"):
self._collection.delete(ids=items["ids"])
def remove_chunks_by_source(self, source: str) -> None:
    """Remove all indexed chunks originating from the given source file."""
    # Public wrapper: delegate to the private implementation.
    self._remove_chunks_by_source(source)