"""Markdown-aware document chunking for Obsidian vault.""" import os import re from pathlib import Path from typing import List, Optional from llama_index.core.schema import TextNode class MarkdownChunker: """Intelligent markdown chunker for Obsidian vaults. Chunks markdown files while preserving: - Document/folder structure context - Code blocks as atomic units - Heading hierarchy - Wiki links as metadata """ # Default chunk settings DEFAULT_CHUNK_SIZE = 512 DEFAULT_CHUNK_OVERLAP = 50 def __init__( self, chunk_size: int = DEFAULT_CHUNK_SIZE, chunk_overlap: int = DEFAULT_CHUNK_OVERLAP, ): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap def chunk_file(self, file_path: str, content: str) -> List[TextNode]: """Chunk a single markdown file. Args: file_path: Path to the markdown file content: Raw markdown content Returns: List of TextNode chunks with metadata """ # Extract frontmatter if present frontmatter, body = self._extract_frontmatter(content) # Extract wiki links for metadata wiki_links = self._extract_wiki_links(body) # Get relative path for context rel_path = os.path.relpath(file_path) # Get file modification time for change detection file_mtime = os.path.getmtime(file_path) # Split into sections based on headings sections = self._split_by_headings(body) chunks = [] for i, section in enumerate(sections): if not section["content"].strip(): continue # Create chunk with metadata # Note: wiki_links must be a string for ChromaDB compatibility node = TextNode( text=section["content"], metadata={ "source": rel_path, "file_name": os.path.basename(file_path), "file_mtime": file_mtime, "heading": section.get("heading", ""), "section_index": i, "wiki_links": ",".join(wiki_links) if wiki_links else "", "has_frontmatter": frontmatter is not None, }, excluded_embed_metadata_keys=["wiki_links"], excluded_search_metadata_keys=["wiki_links"], ) chunks.append(node) return chunks def chunk_directory(self, dir_path: str) -> List[TextNode]: """Chunk all markdown files in a directory recursively. Args: dir_path: Root directory containing markdown files Returns: List of all TextNode chunks """ all_chunks = [] dir_path = Path(dir_path) if not dir_path.exists(): raise FileNotFoundError(f"Directory not found: {dir_path}") # Find all .md files md_files = list(dir_path.rglob("*.md")) for md_file in md_files: try: content = md_file.read_text(encoding="utf-8") chunks = self.chunk_file(str(md_file), content) all_chunks.extend(chunks) except Exception as e: print(f"Error chunking {md_file}: {e}") continue return all_chunks def _extract_frontmatter(self, content: str) -> tuple[Optional[dict], str]: """Extract YAML frontmatter from markdown.""" if not content.startswith("---"): return None, content # Find closing --- lines = content.split("\n") if len(lines) < 3: return None, content frontmatter_lines = [] body_start = 2 for i in range(1, len(lines)): if lines[i].strip() == "---": body_start = i + 1 break frontmatter_lines.append(lines[i]) # Parse simple key-value frontmatter frontmatter = {} for line in frontmatter_lines: if ":" in line: key, value = line.split(":", 1) frontmatter[key.strip()] = value.strip() body = "\n".join(lines[body_start:]) return frontmatter, body def _extract_wiki_links(self, content: str) -> List[str]: """Extract [[wiki links]] from markdown content.""" wiki_link_pattern = r"\[\[([^\]|]+)(?:\|[^\]]+)?\]]" return re.findall(wiki_link_pattern, content) def _split_by_headings(self, content: str) -> List[dict]: """Split content by markdown headings while preserving context.""" # Split by heading lines (# ## ### etc) heading_pattern = r"^(#{1,6})\s+(.+)$" sections = [] current_section = { "heading": "", "content": "", } lines = content.split("\n") for line in lines: match = re.match(heading_pattern, line) if match: # Save current section if non-empty if current_section["content"].strip(): sections.append(current_section) # Start new section level = len(match.group(1)) heading_text = match.group(2).strip() current_section = { "heading": heading_text, "content": line + "\n", } else: current_section["content"] += line + "\n" # Don't forget the last section if current_section["content"].strip(): sections.append(current_section) # If no headings found, treat entire content as one section if not sections: sections = [{"heading": "", "content": content}] return sections