diff --git a/src/knowledge_rag/chunker.py b/src/knowledge_rag/chunker.py index 2bbc59a..d537cc7 100644 --- a/src/knowledge_rag/chunker.py +++ b/src/knowledge_rag/chunker.py @@ -49,6 +49,9 @@ class MarkdownChunker: # Get relative path for context rel_path = os.path.relpath(file_path) + # Get file modification time for change detection + file_mtime = os.path.getmtime(file_path) + # Split into sections based on headings sections = self._split_by_headings(body) @@ -64,6 +67,7 @@ class MarkdownChunker: metadata={ "source": rel_path, "file_name": os.path.basename(file_path), + "file_mtime": file_mtime, "heading": section.get("heading", ""), "section_index": i, "wiki_links": ",".join(wiki_links) if wiki_links else "", @@ -105,9 +109,7 @@ class MarkdownChunker: return all_chunks - def _extract_frontmatter( - self, content: str - ) -> tuple[Optional[dict], str]: + def _extract_frontmatter(self, content: str) -> tuple[Optional[dict], str]: """Extract YAML frontmatter from markdown.""" if not content.startswith("---"): return None, content diff --git a/src/knowledge_rag/server.py b/src/knowledge_rag/server.py index 62f0a44..85ceb43 100644 --- a/src/knowledge_rag/server.py +++ b/src/knowledge_rag/server.py @@ -35,9 +35,7 @@ class KnowledgeMCPServer: def __init__(self, vault_path: str | None = None): # Get vault path from environment or use default - self.vault_path = vault_path or os.environ.get( - "VAULT_PATH", "/data/vault" - ) + self.vault_path = vault_path or os.environ.get("VAULT_PATH", "/data/vault") # Ensure vault path exists if not Path(self.vault_path).exists(): @@ -45,9 +43,7 @@ class KnowledgeMCPServer: # Initialize components self.embedding_model = get_embedding_model() - self.vector_store = KnowledgeVectorStore( - embedding_model=self.embedding_model - ) + self.vector_store = KnowledgeVectorStore(embedding_model=self.embedding_model) self.chunker = MarkdownChunker() # Track indexing status @@ -69,9 +65,9 @@ class KnowledgeMCPServer: Tool( name="search_knowledge", description="Semantic search through the knowledge base. " - "Uses embeddings to find relevant content based on meaning, " - "not just keywords. Best for answering questions or finding " - "related concepts.", + "Uses embeddings to find relevant content based on meaning, " + "not just keywords. Best for answering questions or finding " + "related concepts.", inputSchema={ "type": "object", "properties": { @@ -91,8 +87,8 @@ class KnowledgeMCPServer: Tool( name="index_knowledge", description="Index or re-index the knowledge base. " - "Run this after adding new files to the vault. " - "Scans all markdown files and builds the search index.", + "Run this after adding new files to the vault. " + "Scans all markdown files and builds the search index.", inputSchema={ "type": "object", "properties": { @@ -115,9 +111,7 @@ class KnowledgeMCPServer: ] @self.server.call_tool() - async def call_tool( - name: str, arguments: dict | None - ) -> list[TextContent]: + async def call_tool(name: str, arguments: dict | None) -> list[TextContent]: """Handle tool calls.""" if name == "search_knowledge": return await self._search_knowledge(arguments or {}) @@ -128,9 +122,7 @@ class KnowledgeMCPServer: else: raise ValueError(f"Unknown tool: {name}") - async def _search_knowledge( - self, arguments: dict[str, Any] - ) -> list[TextContent]: + async def _search_knowledge(self, arguments: dict[str, Any]) -> list[TextContent]: """Search the knowledge base semantically.""" query = arguments.get("query", "") top_k = arguments.get("top_k", 5) @@ -153,7 +145,7 @@ class KnowledgeMCPServer: return [ TextContent( type="text", - text="No results found. Try indexing your knowledge base first." + text="No results found. Try indexing your knowledge base first.", ) ] @@ -181,45 +173,70 @@ class KnowledgeMCPServer: logger.exception("Search error") return [TextContent(type="text", text=f"Search error: {str(e)}")] - async def _index_knowledge( - self, arguments: dict[str, Any] - ) -> list[TextContent]: + async def _index_knowledge(self, arguments: dict[str, Any]) -> list[TextContent]: """Index the knowledge base.""" force = arguments.get("force", False) vault_path = Path(self.vault_path) if not vault_path.exists(): - return [ - TextContent( - type="text", - text=f"Vault path does not exist: {self.vault_path}" - ) - ] + return [TextContent(type="text", text=f"Vault path does not exist: {self.vault_path}")] try: # Clear existing index if forced if force: logger.info("Force re-indexing...") self.vector_store.clear() + chunks = self.chunker.chunk_directory(str(vault_path)) + new_chunks = chunks else: - logger.info("Indexing knowledge base...") + logger.info("Indexing knowledge base (incremental)...") - # Chunk all markdown files - chunks = self.chunker.chunk_directory(str(vault_path)) + # Chunk all markdown files + all_chunks = self.chunker.chunk_directory(str(vault_path)) - if not chunks: - return [ - TextContent( - type="text", - text="No markdown files found in vault." - ) - ] + if not all_chunks: + return [TextContent(type="text", text="No markdown files found in vault.")] - logger.info(f"Created {len(chunks)} chunks, adding to vector store...") + # Get current sources from the vault + current_sources = set() + for chunk in all_chunks: + source = chunk.metadata.get("source") + if source: + current_sources.add(source) + + # Get indexed sources and detect deleted files + indexed_sources = self.vector_store.get_existing_sources() + deleted_sources = indexed_sources - current_sources + + # Remove chunks from deleted files + if deleted_sources: + logger.info(f"Removing chunks from deleted files: {deleted_sources}") + for source in deleted_sources: + self.vector_store.remove_chunks_by_source(source) + + # Filter to only new/modified chunks + new_chunks = self.vector_store.filter_new_chunks(all_chunks) + + if not new_chunks: + # Check if we have any existing index + stats = self.vector_store.get_stats() + if stats["total_chunks"] > 0: + return [ + TextContent( + type="text", + text=f"No new or modified files to index.\n" + f"Total chunks in index: {stats['total_chunks']}", + ) + ] + # No existing index, fall through to index everything + new_chunks = all_chunks + + logger.info(f"Processing {len(new_chunks)} new/modified chunks...") # Add to vector store (this embeds them) - self.vector_store.add_nodes(chunks, embedding_model=self.embedding_model) + if new_chunks: + self.vector_store.add_nodes(new_chunks, embedding_model=self.embedding_model) self._indexed = True @@ -227,8 +244,8 @@ class KnowledgeMCPServer: return [ TextContent( type="text", - text=f"Successfully indexed {len(chunks)} chunks from the knowledge base.\n" - f"Total chunks in index: {stats['total_chunks']}" + text=f"Successfully indexed {len(new_chunks)} chunks from the knowledge base.\n" + f"Total chunks in index: {stats['total_chunks']}", ) ] @@ -247,10 +264,10 @@ class KnowledgeMCPServer: TextContent( type="text", text=f"Knowledge Base Statistics:\n" - f"- Vault path: {self.vault_path}\n" - f"- Markdown files: {len(md_files)}\n" - f"- Indexed chunks: {stats['total_chunks']}\n" - f"- Index status: {'Ready' if self._indexed else 'Not indexed'}" + f"- Vault path: {self.vault_path}\n" + f"- Markdown files: {len(md_files)}\n" + f"- Indexed chunks: {stats['total_chunks']}\n" + f"- Index status: {'Ready' if self._indexed else 'Not indexed'}", ) ] @@ -279,4 +296,5 @@ async def main(): if __name__ == "__main__": import asyncio + asyncio.run(main()) diff --git a/src/knowledge_rag/vector_store.py b/src/knowledge_rag/vector_store.py index 911fe63..9b3c8a0 100644 --- a/src/knowledge_rag/vector_store.py +++ b/src/knowledge_rag/vector_store.py @@ -45,8 +45,7 @@ class KnowledgeVectorStore: # Get or create collection self._collection = self._client.get_or_create_collection( - name=collection_name, - metadata={"description": "Knowledge base embeddings"} + name=collection_name, metadata={"description": "Knowledge base embeddings"} ) # Wrap in LlamaIndex vector store @@ -64,20 +63,22 @@ class KnowledgeVectorStore: """Get the LlamaIndex ChromaVectorStore.""" return self._vector_store - def add_nodes(self, nodes: List[TextNode], embedding_model: "BaseEmbedding | None" = None) -> None: + def add_nodes( + self, nodes: List[TextNode], embedding_model: "BaseEmbedding | None" = None + ) -> None: """Add nodes to the vector store.""" from llama_index.core import VectorStoreIndex, StorageContext - + # Use provided embedding model or the stored one model = embedding_model or self._embedding_model - + if model is None: raise ValueError("No embedding model provided") - + # First embed the nodes for node in nodes: node.embedding = model.get_text_embedding(node.text) - + # Then add to vector store self._vector_store.add(nodes) @@ -127,8 +128,11 @@ class KnowledgeVectorStore: """Clear all embeddings from the store.""" self._client.delete_collection(self._collection_name) self._collection = self._client.get_or_create_collection( - name=self._collection_name, - metadata={"description": "Knowledge base embeddings"} + name=self._collection_name, metadata={"description": "Knowledge base embeddings"} + ) + # Recreate the ChromaVectorStore wrapper with the new collection + self._vector_store = ChromaVectorStore( + chroma_collection=self._collection, ) def get_stats(self) -> dict[str, Any]: @@ -137,3 +141,78 @@ class KnowledgeVectorStore: "total_chunks": self._collection.count(), "collection_name": self._collection_name, } + + def get_indexed_files(self) -> dict[str, float]: + """Get all indexed files and their modification times. + + Returns: + Dict mapping source path to file modification time + """ + indexed_files = {} + + # Get all items from the collection + items = self._collection.get() + + if items and items.get("metadatas"): + for metadata in items["metadatas"]: + source = metadata.get("source") + file_mtime = metadata.get("file_mtime") + if source and file_mtime is not None: + # Store the latest mtime for each source + if source not in indexed_files or file_mtime > indexed_files[source]: + indexed_files[source] = file_mtime + + return indexed_files + + def get_existing_sources(self) -> set[str]: + """Get set of all indexed source file paths.""" + return set(self.get_indexed_files().keys()) + + def filter_new_chunks(self, nodes: List["TextNode"]) -> List["TextNode"]: + """Filter out chunks that are already indexed. + + Compares source file path and modification time to skip unchanged files. + + Args: + nodes: List of TextNode chunks to filter + + Returns: + List of chunks that are new or modified + """ + indexed_files = self.get_indexed_files() + + new_chunks = [] + for node in nodes: + source = node.metadata.get("source") + file_mtime = node.metadata.get("file_mtime") + + if source is None: + # Include chunks without source (shouldn't happen) + new_chunks.append(node) + continue + + # Check if this file has been indexed + indexed_mtime = indexed_files.get(source) + if indexed_mtime is None: + # New file, not in index + new_chunks.append(node) + elif file_mtime > indexed_mtime: + # File has been modified since last index + # First remove old chunks for this file + self._remove_chunks_by_source(source) + new_chunks.append(node) + # else: file unchanged, skip it + + return new_chunks + + def _remove_chunks_by_source(self, source: str) -> None: + """Remove all chunks from a specific source file.""" + # Get IDs of chunks to delete + items = self._collection.get(where={"source": source}) + + if items and items.get("ids"): + self._collection.delete(ids=items["ids"]) + + def remove_chunks_by_source(self, source: str) -> None: + """Public method to remove all chunks from a specific source file.""" + self._remove_chunks_by_source(source)