Compare commits

...

1 Commits

Author SHA1 Message Date
1f3450c2f8 Add incremental indexing with deleted file detection
- Add file_mtime to chunk metadata for change detection
- Add get_indexed_files() and get_existing_sources() methods
- Add filter_new_chunks() to skip unchanged files
- Add remove_chunks_by_source() to delete orphaned chunks
- Update server to detect and remove deleted files on incremental index
- Fix clear() to recreate ChromaVectorStore wrapper
2026-03-04 16:24:27 -05:00
3 changed files with 156 additions and 57 deletions

View File

@ -49,6 +49,9 @@ class MarkdownChunker:
# Get relative path for context
rel_path = os.path.relpath(file_path)
# Get file modification time for change detection
file_mtime = os.path.getmtime(file_path)
# Split into sections based on headings
sections = self._split_by_headings(body)
@ -64,6 +67,7 @@ class MarkdownChunker:
metadata={
"source": rel_path,
"file_name": os.path.basename(file_path),
"file_mtime": file_mtime,
"heading": section.get("heading", ""),
"section_index": i,
"wiki_links": ",".join(wiki_links) if wiki_links else "",
@ -105,9 +109,7 @@ class MarkdownChunker:
return all_chunks
def _extract_frontmatter(
self, content: str
) -> tuple[Optional[dict], str]:
def _extract_frontmatter(self, content: str) -> tuple[Optional[dict], str]:
"""Extract YAML frontmatter from markdown."""
if not content.startswith("---"):
return None, content

View File

@ -35,9 +35,7 @@ class KnowledgeMCPServer:
def __init__(self, vault_path: str | None = None):
# Get vault path from environment or use default
self.vault_path = vault_path or os.environ.get(
"VAULT_PATH", "/data/vault"
)
self.vault_path = vault_path or os.environ.get("VAULT_PATH", "/data/vault")
# Ensure vault path exists
if not Path(self.vault_path).exists():
@ -45,9 +43,7 @@ class KnowledgeMCPServer:
# Initialize components
self.embedding_model = get_embedding_model()
self.vector_store = KnowledgeVectorStore(
embedding_model=self.embedding_model
)
self.vector_store = KnowledgeVectorStore(embedding_model=self.embedding_model)
self.chunker = MarkdownChunker()
# Track indexing status
@ -115,9 +111,7 @@ class KnowledgeMCPServer:
]
@self.server.call_tool()
async def call_tool(
name: str, arguments: dict | None
) -> list[TextContent]:
async def call_tool(name: str, arguments: dict | None) -> list[TextContent]:
"""Handle tool calls."""
if name == "search_knowledge":
return await self._search_knowledge(arguments or {})
@ -128,9 +122,7 @@ class KnowledgeMCPServer:
else:
raise ValueError(f"Unknown tool: {name}")
async def _search_knowledge(
self, arguments: dict[str, Any]
) -> list[TextContent]:
async def _search_knowledge(self, arguments: dict[str, Any]) -> list[TextContent]:
"""Search the knowledge base semantically."""
query = arguments.get("query", "")
top_k = arguments.get("top_k", 5)
@ -153,7 +145,7 @@ class KnowledgeMCPServer:
return [
TextContent(
type="text",
text="No results found. Try indexing your knowledge base first."
text="No results found. Try indexing your knowledge base first.",
)
]
@ -181,45 +173,70 @@ class KnowledgeMCPServer:
logger.exception("Search error")
return [TextContent(type="text", text=f"Search error: {str(e)}")]
async def _index_knowledge(
self, arguments: dict[str, Any]
) -> list[TextContent]:
async def _index_knowledge(self, arguments: dict[str, Any]) -> list[TextContent]:
"""Index the knowledge base."""
force = arguments.get("force", False)
vault_path = Path(self.vault_path)
if not vault_path.exists():
return [
TextContent(
type="text",
text=f"Vault path does not exist: {self.vault_path}"
)
]
return [TextContent(type="text", text=f"Vault path does not exist: {self.vault_path}")]
try:
# Clear existing index if forced
if force:
logger.info("Force re-indexing...")
self.vector_store.clear()
chunks = self.chunker.chunk_directory(str(vault_path))
new_chunks = chunks
else:
logger.info("Indexing knowledge base...")
logger.info("Indexing knowledge base (incremental)...")
# Chunk all markdown files
chunks = self.chunker.chunk_directory(str(vault_path))
all_chunks = self.chunker.chunk_directory(str(vault_path))
if not chunks:
if not all_chunks:
return [TextContent(type="text", text="No markdown files found in vault.")]
# Get current sources from the vault
current_sources = set()
for chunk in all_chunks:
source = chunk.metadata.get("source")
if source:
current_sources.add(source)
# Get indexed sources and detect deleted files
indexed_sources = self.vector_store.get_existing_sources()
deleted_sources = indexed_sources - current_sources
# Remove chunks from deleted files
if deleted_sources:
logger.info(f"Removing chunks from deleted files: {deleted_sources}")
for source in deleted_sources:
self.vector_store.remove_chunks_by_source(source)
# Filter to only new/modified chunks
new_chunks = self.vector_store.filter_new_chunks(all_chunks)
if not new_chunks:
# Check if we have any existing index
stats = self.vector_store.get_stats()
if stats["total_chunks"] > 0:
return [
TextContent(
type="text",
text="No markdown files found in vault."
text=f"No new or modified files to index.\n"
f"Total chunks in index: {stats['total_chunks']}",
)
]
# No existing index, fall through to index everything
new_chunks = all_chunks
logger.info(f"Created {len(chunks)} chunks, adding to vector store...")
logger.info(f"Processing {len(new_chunks)} new/modified chunks...")
# Add to vector store (this embeds them)
self.vector_store.add_nodes(chunks, embedding_model=self.embedding_model)
if new_chunks:
self.vector_store.add_nodes(new_chunks, embedding_model=self.embedding_model)
self._indexed = True
@ -227,8 +244,8 @@ class KnowledgeMCPServer:
return [
TextContent(
type="text",
text=f"Successfully indexed {len(chunks)} chunks from the knowledge base.\n"
f"Total chunks in index: {stats['total_chunks']}"
text=f"Successfully indexed {len(new_chunks)} chunks from the knowledge base.\n"
f"Total chunks in index: {stats['total_chunks']}",
)
]
@ -250,7 +267,7 @@ class KnowledgeMCPServer:
f"- Vault path: {self.vault_path}\n"
f"- Markdown files: {len(md_files)}\n"
f"- Indexed chunks: {stats['total_chunks']}\n"
f"- Index status: {'Ready' if self._indexed else 'Not indexed'}"
f"- Index status: {'Ready' if self._indexed else 'Not indexed'}",
)
]
@ -279,4 +296,5 @@ async def main():
if __name__ == "__main__":
import asyncio
asyncio.run(main())

View File

@ -45,8 +45,7 @@ class KnowledgeVectorStore:
# Get or create collection
self._collection = self._client.get_or_create_collection(
name=collection_name,
metadata={"description": "Knowledge base embeddings"}
name=collection_name, metadata={"description": "Knowledge base embeddings"}
)
# Wrap in LlamaIndex vector store
@ -64,7 +63,9 @@ class KnowledgeVectorStore:
"""Get the LlamaIndex ChromaVectorStore."""
return self._vector_store
def add_nodes(self, nodes: List[TextNode], embedding_model: "BaseEmbedding | None" = None) -> None:
def add_nodes(
self, nodes: List[TextNode], embedding_model: "BaseEmbedding | None" = None
) -> None:
"""Add nodes to the vector store."""
from llama_index.core import VectorStoreIndex, StorageContext
@ -127,8 +128,11 @@ class KnowledgeVectorStore:
"""Clear all embeddings from the store."""
self._client.delete_collection(self._collection_name)
self._collection = self._client.get_or_create_collection(
name=self._collection_name,
metadata={"description": "Knowledge base embeddings"}
name=self._collection_name, metadata={"description": "Knowledge base embeddings"}
)
# Recreate the ChromaVectorStore wrapper with the new collection
self._vector_store = ChromaVectorStore(
chroma_collection=self._collection,
)
def get_stats(self) -> dict[str, Any]:
@ -137,3 +141,78 @@ class KnowledgeVectorStore:
"total_chunks": self._collection.count(),
"collection_name": self._collection_name,
}
def get_indexed_files(self) -> dict[str, float]:
"""Get all indexed files and their modification times.
Returns:
Dict mapping source path to file modification time
"""
indexed_files = {}
# Get all items from the collection
items = self._collection.get()
if items and items.get("metadatas"):
for metadata in items["metadatas"]:
source = metadata.get("source")
file_mtime = metadata.get("file_mtime")
if source and file_mtime is not None:
# Store the latest mtime for each source
if source not in indexed_files or file_mtime > indexed_files[source]:
indexed_files[source] = file_mtime
return indexed_files
def get_existing_sources(self) -> set[str]:
"""Get set of all indexed source file paths."""
return set(self.get_indexed_files().keys())
def filter_new_chunks(self, nodes: List["TextNode"]) -> List["TextNode"]:
"""Filter out chunks that are already indexed.
Compares source file path and modification time to skip unchanged files.
Args:
nodes: List of TextNode chunks to filter
Returns:
List of chunks that are new or modified
"""
indexed_files = self.get_indexed_files()
new_chunks = []
for node in nodes:
source = node.metadata.get("source")
file_mtime = node.metadata.get("file_mtime")
if source is None:
# Include chunks without source (shouldn't happen)
new_chunks.append(node)
continue
# Check if this file has been indexed
indexed_mtime = indexed_files.get(source)
if indexed_mtime is None:
# New file, not in index
new_chunks.append(node)
elif file_mtime > indexed_mtime:
# File has been modified since last index
# First remove old chunks for this file
self._remove_chunks_by_source(source)
new_chunks.append(node)
# else: file unchanged, skip it
return new_chunks
def _remove_chunks_by_source(self, source: str) -> None:
"""Remove all chunks from a specific source file."""
# Get IDs of chunks to delete
items = self._collection.get(where={"source": source})
if items and items.get("ids"):
self._collection.delete(ids=items["ids"])
def remove_chunks_by_source(self, source: str) -> None:
"""Public method to remove all chunks from a specific source file."""
self._remove_chunks_by_source(source)