Compare commits
1 Commits
46afc4c256
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 1f3450c2f8 |
@ -49,6 +49,9 @@ class MarkdownChunker:
|
||||
# Get relative path for context
|
||||
rel_path = os.path.relpath(file_path)
|
||||
|
||||
# Get file modification time for change detection
|
||||
file_mtime = os.path.getmtime(file_path)
|
||||
|
||||
# Split into sections based on headings
|
||||
sections = self._split_by_headings(body)
|
||||
|
||||
@ -64,6 +67,7 @@ class MarkdownChunker:
|
||||
metadata={
|
||||
"source": rel_path,
|
||||
"file_name": os.path.basename(file_path),
|
||||
"file_mtime": file_mtime,
|
||||
"heading": section.get("heading", ""),
|
||||
"section_index": i,
|
||||
"wiki_links": ",".join(wiki_links) if wiki_links else "",
|
||||
@ -105,9 +109,7 @@ class MarkdownChunker:
|
||||
|
||||
return all_chunks
|
||||
|
||||
def _extract_frontmatter(
|
||||
self, content: str
|
||||
) -> tuple[Optional[dict], str]:
|
||||
def _extract_frontmatter(self, content: str) -> tuple[Optional[dict], str]:
|
||||
"""Extract YAML frontmatter from markdown."""
|
||||
if not content.startswith("---"):
|
||||
return None, content
|
||||
|
||||
@ -35,9 +35,7 @@ class KnowledgeMCPServer:
|
||||
|
||||
def __init__(self, vault_path: str | None = None):
|
||||
# Get vault path from environment or use default
|
||||
self.vault_path = vault_path or os.environ.get(
|
||||
"VAULT_PATH", "/data/vault"
|
||||
)
|
||||
self.vault_path = vault_path or os.environ.get("VAULT_PATH", "/data/vault")
|
||||
|
||||
# Ensure vault path exists
|
||||
if not Path(self.vault_path).exists():
|
||||
@ -45,9 +43,7 @@ class KnowledgeMCPServer:
|
||||
|
||||
# Initialize components
|
||||
self.embedding_model = get_embedding_model()
|
||||
self.vector_store = KnowledgeVectorStore(
|
||||
embedding_model=self.embedding_model
|
||||
)
|
||||
self.vector_store = KnowledgeVectorStore(embedding_model=self.embedding_model)
|
||||
self.chunker = MarkdownChunker()
|
||||
|
||||
# Track indexing status
|
||||
@ -69,9 +65,9 @@ class KnowledgeMCPServer:
|
||||
Tool(
|
||||
name="search_knowledge",
|
||||
description="Semantic search through the knowledge base. "
|
||||
"Uses embeddings to find relevant content based on meaning, "
|
||||
"not just keywords. Best for answering questions or finding "
|
||||
"related concepts.",
|
||||
"Uses embeddings to find relevant content based on meaning, "
|
||||
"not just keywords. Best for answering questions or finding "
|
||||
"related concepts.",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@ -91,8 +87,8 @@ class KnowledgeMCPServer:
|
||||
Tool(
|
||||
name="index_knowledge",
|
||||
description="Index or re-index the knowledge base. "
|
||||
"Run this after adding new files to the vault. "
|
||||
"Scans all markdown files and builds the search index.",
|
||||
"Run this after adding new files to the vault. "
|
||||
"Scans all markdown files and builds the search index.",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@ -115,9 +111,7 @@ class KnowledgeMCPServer:
|
||||
]
|
||||
|
||||
@self.server.call_tool()
|
||||
async def call_tool(
|
||||
name: str, arguments: dict | None
|
||||
) -> list[TextContent]:
|
||||
async def call_tool(name: str, arguments: dict | None) -> list[TextContent]:
|
||||
"""Handle tool calls."""
|
||||
if name == "search_knowledge":
|
||||
return await self._search_knowledge(arguments or {})
|
||||
@ -128,9 +122,7 @@ class KnowledgeMCPServer:
|
||||
else:
|
||||
raise ValueError(f"Unknown tool: {name}")
|
||||
|
||||
async def _search_knowledge(
|
||||
self, arguments: dict[str, Any]
|
||||
) -> list[TextContent]:
|
||||
async def _search_knowledge(self, arguments: dict[str, Any]) -> list[TextContent]:
|
||||
"""Search the knowledge base semantically."""
|
||||
query = arguments.get("query", "")
|
||||
top_k = arguments.get("top_k", 5)
|
||||
@ -153,7 +145,7 @@ class KnowledgeMCPServer:
|
||||
return [
|
||||
TextContent(
|
||||
type="text",
|
||||
text="No results found. Try indexing your knowledge base first."
|
||||
text="No results found. Try indexing your knowledge base first.",
|
||||
)
|
||||
]
|
||||
|
||||
@ -181,45 +173,70 @@ class KnowledgeMCPServer:
|
||||
logger.exception("Search error")
|
||||
return [TextContent(type="text", text=f"Search error: {str(e)}")]
|
||||
|
||||
async def _index_knowledge(
|
||||
self, arguments: dict[str, Any]
|
||||
) -> list[TextContent]:
|
||||
async def _index_knowledge(self, arguments: dict[str, Any]) -> list[TextContent]:
|
||||
"""Index the knowledge base."""
|
||||
force = arguments.get("force", False)
|
||||
|
||||
vault_path = Path(self.vault_path)
|
||||
|
||||
if not vault_path.exists():
|
||||
return [
|
||||
TextContent(
|
||||
type="text",
|
||||
text=f"Vault path does not exist: {self.vault_path}"
|
||||
)
|
||||
]
|
||||
return [TextContent(type="text", text=f"Vault path does not exist: {self.vault_path}")]
|
||||
|
||||
try:
|
||||
# Clear existing index if forced
|
||||
if force:
|
||||
logger.info("Force re-indexing...")
|
||||
self.vector_store.clear()
|
||||
chunks = self.chunker.chunk_directory(str(vault_path))
|
||||
new_chunks = chunks
|
||||
else:
|
||||
logger.info("Indexing knowledge base...")
|
||||
logger.info("Indexing knowledge base (incremental)...")
|
||||
|
||||
# Chunk all markdown files
|
||||
chunks = self.chunker.chunk_directory(str(vault_path))
|
||||
# Chunk all markdown files
|
||||
all_chunks = self.chunker.chunk_directory(str(vault_path))
|
||||
|
||||
if not chunks:
|
||||
return [
|
||||
TextContent(
|
||||
type="text",
|
||||
text="No markdown files found in vault."
|
||||
)
|
||||
]
|
||||
if not all_chunks:
|
||||
return [TextContent(type="text", text="No markdown files found in vault.")]
|
||||
|
||||
logger.info(f"Created {len(chunks)} chunks, adding to vector store...")
|
||||
# Get current sources from the vault
|
||||
current_sources = set()
|
||||
for chunk in all_chunks:
|
||||
source = chunk.metadata.get("source")
|
||||
if source:
|
||||
current_sources.add(source)
|
||||
|
||||
# Get indexed sources and detect deleted files
|
||||
indexed_sources = self.vector_store.get_existing_sources()
|
||||
deleted_sources = indexed_sources - current_sources
|
||||
|
||||
# Remove chunks from deleted files
|
||||
if deleted_sources:
|
||||
logger.info(f"Removing chunks from deleted files: {deleted_sources}")
|
||||
for source in deleted_sources:
|
||||
self.vector_store.remove_chunks_by_source(source)
|
||||
|
||||
# Filter to only new/modified chunks
|
||||
new_chunks = self.vector_store.filter_new_chunks(all_chunks)
|
||||
|
||||
if not new_chunks:
|
||||
# Check if we have any existing index
|
||||
stats = self.vector_store.get_stats()
|
||||
if stats["total_chunks"] > 0:
|
||||
return [
|
||||
TextContent(
|
||||
type="text",
|
||||
text=f"No new or modified files to index.\n"
|
||||
f"Total chunks in index: {stats['total_chunks']}",
|
||||
)
|
||||
]
|
||||
# No existing index, fall through to index everything
|
||||
new_chunks = all_chunks
|
||||
|
||||
logger.info(f"Processing {len(new_chunks)} new/modified chunks...")
|
||||
|
||||
# Add to vector store (this embeds them)
|
||||
self.vector_store.add_nodes(chunks, embedding_model=self.embedding_model)
|
||||
if new_chunks:
|
||||
self.vector_store.add_nodes(new_chunks, embedding_model=self.embedding_model)
|
||||
|
||||
self._indexed = True
|
||||
|
||||
@ -227,8 +244,8 @@ class KnowledgeMCPServer:
|
||||
return [
|
||||
TextContent(
|
||||
type="text",
|
||||
text=f"Successfully indexed {len(chunks)} chunks from the knowledge base.\n"
|
||||
f"Total chunks in index: {stats['total_chunks']}"
|
||||
text=f"Successfully indexed {len(new_chunks)} chunks from the knowledge base.\n"
|
||||
f"Total chunks in index: {stats['total_chunks']}",
|
||||
)
|
||||
]
|
||||
|
||||
@ -247,10 +264,10 @@ class KnowledgeMCPServer:
|
||||
TextContent(
|
||||
type="text",
|
||||
text=f"Knowledge Base Statistics:\n"
|
||||
f"- Vault path: {self.vault_path}\n"
|
||||
f"- Markdown files: {len(md_files)}\n"
|
||||
f"- Indexed chunks: {stats['total_chunks']}\n"
|
||||
f"- Index status: {'Ready' if self._indexed else 'Not indexed'}"
|
||||
f"- Vault path: {self.vault_path}\n"
|
||||
f"- Markdown files: {len(md_files)}\n"
|
||||
f"- Indexed chunks: {stats['total_chunks']}\n"
|
||||
f"- Index status: {'Ready' if self._indexed else 'Not indexed'}",
|
||||
)
|
||||
]
|
||||
|
||||
@ -279,4 +296,5 @@ async def main():
|
||||
|
||||
if __name__ == "__main__":
|
||||
import asyncio
|
||||
|
||||
asyncio.run(main())
|
||||
|
||||
@ -45,8 +45,7 @@ class KnowledgeVectorStore:
|
||||
|
||||
# Get or create collection
|
||||
self._collection = self._client.get_or_create_collection(
|
||||
name=collection_name,
|
||||
metadata={"description": "Knowledge base embeddings"}
|
||||
name=collection_name, metadata={"description": "Knowledge base embeddings"}
|
||||
)
|
||||
|
||||
# Wrap in LlamaIndex vector store
|
||||
@ -64,7 +63,9 @@ class KnowledgeVectorStore:
|
||||
"""Get the LlamaIndex ChromaVectorStore."""
|
||||
return self._vector_store
|
||||
|
||||
def add_nodes(self, nodes: List[TextNode], embedding_model: "BaseEmbedding | None" = None) -> None:
|
||||
def add_nodes(
|
||||
self, nodes: List[TextNode], embedding_model: "BaseEmbedding | None" = None
|
||||
) -> None:
|
||||
"""Add nodes to the vector store."""
|
||||
from llama_index.core import VectorStoreIndex, StorageContext
|
||||
|
||||
@ -127,8 +128,11 @@ class KnowledgeVectorStore:
|
||||
"""Clear all embeddings from the store."""
|
||||
self._client.delete_collection(self._collection_name)
|
||||
self._collection = self._client.get_or_create_collection(
|
||||
name=self._collection_name,
|
||||
metadata={"description": "Knowledge base embeddings"}
|
||||
name=self._collection_name, metadata={"description": "Knowledge base embeddings"}
|
||||
)
|
||||
# Recreate the ChromaVectorStore wrapper with the new collection
|
||||
self._vector_store = ChromaVectorStore(
|
||||
chroma_collection=self._collection,
|
||||
)
|
||||
|
||||
def get_stats(self) -> dict[str, Any]:
|
||||
@ -137,3 +141,78 @@ class KnowledgeVectorStore:
|
||||
"total_chunks": self._collection.count(),
|
||||
"collection_name": self._collection_name,
|
||||
}
|
||||
|
||||
def get_indexed_files(self) -> dict[str, float]:
|
||||
"""Get all indexed files and their modification times.
|
||||
|
||||
Returns:
|
||||
Dict mapping source path to file modification time
|
||||
"""
|
||||
indexed_files = {}
|
||||
|
||||
# Get all items from the collection
|
||||
items = self._collection.get()
|
||||
|
||||
if items and items.get("metadatas"):
|
||||
for metadata in items["metadatas"]:
|
||||
source = metadata.get("source")
|
||||
file_mtime = metadata.get("file_mtime")
|
||||
if source and file_mtime is not None:
|
||||
# Store the latest mtime for each source
|
||||
if source not in indexed_files or file_mtime > indexed_files[source]:
|
||||
indexed_files[source] = file_mtime
|
||||
|
||||
return indexed_files
|
||||
|
||||
def get_existing_sources(self) -> set[str]:
|
||||
"""Get set of all indexed source file paths."""
|
||||
return set(self.get_indexed_files().keys())
|
||||
|
||||
def filter_new_chunks(self, nodes: List["TextNode"]) -> List["TextNode"]:
|
||||
"""Filter out chunks that are already indexed.
|
||||
|
||||
Compares source file path and modification time to skip unchanged files.
|
||||
|
||||
Args:
|
||||
nodes: List of TextNode chunks to filter
|
||||
|
||||
Returns:
|
||||
List of chunks that are new or modified
|
||||
"""
|
||||
indexed_files = self.get_indexed_files()
|
||||
|
||||
new_chunks = []
|
||||
for node in nodes:
|
||||
source = node.metadata.get("source")
|
||||
file_mtime = node.metadata.get("file_mtime")
|
||||
|
||||
if source is None:
|
||||
# Include chunks without source (shouldn't happen)
|
||||
new_chunks.append(node)
|
||||
continue
|
||||
|
||||
# Check if this file has been indexed
|
||||
indexed_mtime = indexed_files.get(source)
|
||||
if indexed_mtime is None:
|
||||
# New file, not in index
|
||||
new_chunks.append(node)
|
||||
elif file_mtime > indexed_mtime:
|
||||
# File has been modified since last index
|
||||
# First remove old chunks for this file
|
||||
self._remove_chunks_by_source(source)
|
||||
new_chunks.append(node)
|
||||
# else: file unchanged, skip it
|
||||
|
||||
return new_chunks
|
||||
|
||||
def _remove_chunks_by_source(self, source: str) -> None:
|
||||
"""Remove all chunks from a specific source file."""
|
||||
# Get IDs of chunks to delete
|
||||
items = self._collection.get(where={"source": source})
|
||||
|
||||
if items and items.get("ids"):
|
||||
self._collection.delete(ids=items["ids"])
|
||||
|
||||
def remove_chunks_by_source(self, source: str) -> None:
|
||||
"""Public method to remove all chunks from a specific source file."""
|
||||
self._remove_chunks_by_source(source)
|
||||
|
||||
Reference in New Issue
Block a user