    @dataclass(frozen=True, slots=True)
    class UrlContext:
        """Context information required when generating canonical URLs."""

        base_url: str = ""  # Base URL (e.g., "https://example.com")
        site_prefix: str = ""  # Site prefix (e.g., "/blog")
        base_path: Path | None = None  # Filesystem base path
        locale: str | None = None  # Locale for i18n (e.g., "en", "pt-BR")

    class UrlConvention(Protocol):
        """Contract for deterministic URL generation strategies.

        Pure function pattern: same document → same URL
        No I/O, no side effects - just URL calculation.
        """

        @property
        def name(self) -> str:
            """Return a short identifier describing the convention."""
            ...

        @property
        def version(self) -> str:
            """Return a semantic version or timestamp string for compatibility checks."""
            ...

        def canonical_url(self, document: Document, ctx: UrlContext) -> str:
            """Calculate the canonical URL for ``document`` within ``ctx``."""
            ...

Key Properties:

- Deterministic: Same document always produces same URL
- Pure: No I/O operations, no side effects
- Versioned: name and version for compatibility tracking
- Context-aware: Uses UrlContext for environment-specific configuration

Why This Matters:

- SEO: Stable URLs across rebuilds prevent broken links
- Testing: Pure functions are easy to test
- Flexibility: Swap conventions without changing callers
- Compatibility: Version tracking enables gradual migration

    @runtime_checkable
    class InputAdapter(Protocol):
        """Adapter for reading external data sources and converting to IR."""

        def read(self) -> Iterator[Table]:
            """Read from source and yield Ibis tables conforming to IR_MESSAGE_SCHEMA.

            Returns:
                Iterator of Ibis tables with IR_MESSAGE_SCHEMA columns
            """
            ...

        @property
        def metadata(self) -> dict[str, Any]:
            """Return metadata about the input source."""
            ...

Available Implementations:

- WhatsAppAdapter - Parse WhatsApp chat exports
- IperonTJROAdapter - Brazilian judicial records API
- SelfInputAdapter - Re-ingest existing posts

Key Responsibilities:

- Parse external format
- Convert to IR_MESSAGE_SCHEMA
- Handle privacy/anonymization at source
- Yield data as Ibis tables (not pandas)

    from collections.abc import Iterator
    from pathlib import Path
    from typing import Any

    import ibis
    from ibis.expr.types import Table


    class MyAdapter:
        def __init__(self, source_path: Path):
            self.source_path = source_path

        def read(self) -> Iterator[Table]:
            # Parse your format (parse_my_format is a placeholder for your own parser)
            data = parse_my_format(self.source_path)

            # Convert to IR_MESSAGE_SCHEMA
            table = ibis.memtable(data).select(
                message_id=...,
                conversation_id=...,
                author_id=...,
                content=...,
                timestamp=...,
                # ... all IR_MESSAGE_SCHEMA columns
            )
            yield table

        @property
        def metadata(self) -> dict[str, Any]:
            return {
                "source_type": "my-format",
                "source_path": str(self.source_path),
                "version": "1.0",
            }

    @runtime_checkable
    class OutputAdapter(Protocol):
        """Adapter for persisting documents to external formats."""

        def persist(self, document: Document) -> None:
            """Persist document to output format.

            Must be idempotent - repeated calls with same document should overwrite.
            """
            ...

        def documents(self) -> Iterator[Document]:
            """Iterate over all documents in output format.

            Returns:
                Iterator for memory efficiency (not list)
            """
            ...

Available Implementations:

- MkDocsAdapter - Generate MkDocs sites
- ParquetAdapter - Export to Parquet format

Key Responsibilities:

- Convert Document to target format
- Idempotent writes (overwrite on repeat)
- Lazy document iteration
- Handle filesystem layout

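
As a rough illustration of these responsibilities, the sketch below writes one JSON file per document. It is not one of the shipped adapters: `JsonDirAdapter` is a hypothetical name, and `FakeDocument` is a stand-in for `Document` that assumes only an `id` and a `content` field. It also assumes document ids are safe to use as file names.

```python
from __future__ import annotations

import json
import tempfile
from collections.abc import Iterator
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass(frozen=True)
class FakeDocument:
    """Hypothetical stand-in for Document; `id` and `content` are assumed fields."""

    id: str
    content: str


class JsonDirAdapter:
    """Hypothetical adapter: one JSON file per document in a directory."""

    def __init__(self, root: Path) -> None:
        self.root = root
        self.root.mkdir(parents=True, exist_ok=True)

    def persist(self, document: FakeDocument) -> None:
        # The file name derives from the document id, so a repeated call
        # overwrites the same file instead of creating a duplicate.
        path = self.root / f"{document.id}.json"
        path.write_text(json.dumps(asdict(document)), encoding="utf-8")

    def documents(self) -> Iterator[FakeDocument]:
        # Generator: each file is read only when the caller asks for it.
        for path in sorted(self.root.glob("*.json")):
            yield FakeDocument(**json.loads(path.read_text(encoding="utf-8")))


adapter = JsonDirAdapter(Path(tempfile.mkdtemp()))
adapter.persist(FakeDocument(id="post-1", content="draft"))
adapter.persist(FakeDocument(id="post-1", content="final"))  # same id: overwrites
stored = list(adapter.documents())
```

After the two `persist` calls, `stored` holds a single document whose content is the second version, which is the overwrite-on-repeat behaviour the protocol asks for.
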
    class RAGBackend(Protocol):
        """Protocol for RAG vector storage backends."""

        async def index_documents(
            self,
            documents: Sequence[Document],
            *,
            embedding_fn: Callable[[Sequence[str], str], Awaitable[list[list[float]]]],
        ) -> int:
            """Index documents for retrieval.

            Args:
                documents: Documents to index
                embedding_fn: Async function to generate embeddings
                    Signature: (texts, task_type) -> embeddings

            Returns:
                Number of chunks indexed
            """
            ...

        async def search(self, request: RAGQueryRequest) -> RAGQueryResponse:
            """Search indexed documents.

            Args:
                request: Search request with query text, top_k, filters

            Returns:
                Response with scored results
            """
            ...

        async def delete_all(self) -> None:
            """Delete all indexed documents."""
            ...

Available Implementations:

- LanceDBRAGBackend - LanceDB vector storage (current)

Key Properties:

- Fully async: All methods are async
- Embedding injection: Backend doesn't know about embedding models
- Chunking: Backend handles chunking internally
- Task types: Supports asymmetric embeddings (RETRIEVAL_DOCUMENT vs RETRIEVAL_QUERY)

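
To make embedding injection and the two task types concrete, here is a toy in-memory backend. It is a sketch under several assumptions rather than the real implementation: documents are plain strings instead of `Document` objects, `search` takes the query text and an `embedding_fn` directly because `RAGQueryRequest` is not defined in this block, each document is treated as a single chunk, and `toy_embed` is a hypothetical embedding function that merely counts vowels.

```python
from __future__ import annotations

import asyncio
import math
from collections.abc import Awaitable, Callable, Sequence

EmbeddingFn = Callable[[Sequence[str], str], Awaitable[list[list[float]]]]


async def toy_embed(texts: Sequence[str], task_type: str) -> list[list[float]]:
    """Hypothetical embedding function: counts of 'a', 'e', and 'o' per text.

    A real function would call an embedding model and pass task_type along.
    """
    return [[float(t.count(c)) for c in "aeo"] for t in texts]


def cosine(u: list[float], v: list[float]) -> float:
    norm = math.sqrt(sum(x * x for x in u)) * math.sqrt(sum(x * x for x in v))
    return sum(x * y for x, y in zip(u, v)) / norm if norm else 0.0


class InMemoryBackend:
    """Hypothetical backend that stores (text, vector) pairs in a list."""

    def __init__(self) -> None:
        self.rows: list[tuple[str, list[float]]] = []

    async def index_documents(
        self, documents: Sequence[str], *, embedding_fn: EmbeddingFn
    ) -> int:
        # One chunk per document here; a real backend would split long texts.
        vectors = await embedding_fn(documents, "RETRIEVAL_DOCUMENT")
        self.rows.extend(zip(documents, vectors))
        return len(vectors)

    async def search(
        self, text: str, top_k: int, *, embedding_fn: EmbeddingFn
    ) -> list[tuple[float, str]]:
        # Queries are embedded with the query task type, documents with the other.
        (query_vector,) = await embedding_fn([text], "RETRIEVAL_QUERY")
        scored = [(cosine(query_vector, vec), doc) for doc, vec in self.rows]
        return sorted(scored, reverse=True)[:top_k]

    async def delete_all(self) -> None:
        self.rows.clear()


async def demo() -> tuple[int, list[tuple[float, str]]]:
    backend = InMemoryBackend()
    count = await backend.index_documents(["banana", "cocoon"], embedding_fn=toy_embed)
    hits = await backend.search("papaya", top_k=1, embedding_fn=toy_embed)
    return count, hits


count, hits = asyncio.run(demo())
```

The backend never imports a model; swapping `toy_embed` for a real embedding client would not require changing the storage code, which is the point of injecting the function.
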
    from egregora.rag import LanceDBRAGBackend, RAGQueryRequest, index_documents

    # Index documents
    backend = LanceDBRAGBackend(db_path=Path(".egregora/lancedb"))
    count = await index_documents([doc1, doc2, doc3])
    print(f"Indexed {count} chunks")

    # Search
    request = RAGQueryRequest(text="how to use RAG", top_k=5, min_similarity=0.7)
    response = await backend.search(request)
    for hit in response.hits:
        print(f"{hit.score:.2f}: {hit.text[:100]}")

    class TableStorage(Protocol):
        """Protocol for table storage operations."""

        def write_table(self, table: Table, name: str, *, checkpoint: bool = False) -> None:
            """Write Ibis table to storage."""
            ...

        def read_table(self, name: str) -> Table:
            """Read Ibis table from storage."""
            ...

        def table_exists(self, name: str) -> bool:
            """Check if table exists."""
            ...

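
For tests, a dictionary-backed stand-in is often enough to satisfy this protocol. The sketch below is hypothetical: `InMemoryTableStorage` is not part of the library, `Table` is aliased to `Any` so the block has no Ibis dependency, and the `checkpoint` flag is accepted but ignored because its semantics are not described here.

```python
from __future__ import annotations

from typing import Any

Table = Any  # Stand-in for the Ibis Table type, so the sketch has no dependencies.


class InMemoryTableStorage:
    """Hypothetical storage that keeps tables in a dict keyed by name."""

    def __init__(self) -> None:
        self._tables: dict[str, Table] = {}

    def write_table(self, table: Table, name: str, *, checkpoint: bool = False) -> None:
        # `checkpoint` is accepted for signature compatibility but ignored here.
        self._tables[name] = table

    def read_table(self, name: str) -> Table:
        if name not in self._tables:
            raise KeyError(f"no table named {name!r}")
        return self._tables[name]

    def table_exists(self, name: str) -> bool:
        return name in self._tables


storage = InMemoryTableStorage()
rows = [{"message_id": "m1", "content": "hello"}]  # plain list standing in for a table
storage.write_table(rows, "messages")
```
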
    # Test with mock implementation
    class MockOutputAdapter:
        def __init__(self):
            self.documents_dict = {}

        def persist(self, document: Document) -> None:
            self.documents_dict[document.id] = document

        def documents(self) -> Iterator[Document]:
            yield from self.documents_dict.values()

    # Verify protocol compliance
    from egregora.data_primitives.protocols import OutputAdapter

    assert isinstance(MockOutputAdapter(), OutputAdapter)