rag:
  enabled: true                    # Enable RAG for writer agent
  top_k: 5                         # Number of results to retrieve
  min_similarity_threshold: 0.7    # Minimum similarity score (0.0-1.0)
  indexable_types: ["POST"]        # Document types to index

  # Embedding router settings
  embedding_max_batch_size: 100    # Max texts per batch request
  embedding_timeout: 60.0          # Request timeout (seconds)
  embedding_max_retries: 5         # Max retries on error

paths:
  lancedb_dir: .egregora/lancedb   # Vector database location
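The embedding settings above map directly onto the router factory documented later on this page. A minimal sketch of loading them and constructing a router, assuming the configuration lives in a YAML file and PyYAML is available (the file name and loader shown here are illustrative, not egregora's actual config machinery):

import yaml  # assumption: PyYAML is installed; egregora's real config loader may differ

from egregora.rag.embedding_router import create_embedding_router

with open("egregora.yml") as f:  # hypothetical file name
    config = yaml.safe_load(f)

rag_cfg = config["rag"]

router = create_embedding_router(
    model="models/gemini-embedding-001",  # model name taken from the router docstring below
    max_batch_size=rag_cfg["embedding_max_batch_size"],
    timeout=rag_cfg["embedding_timeout"],
)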
import asyncio

from egregora.rag import index_documents, search
from egregora.rag.models import RAGQueryRequest
from egregora.data_primitives.document import Document, DocumentType

# Index documents
async def index_docs():
    doc = Document(
        content="# Python Async\n\nGuide to async programming...",
        type=DocumentType.POST,
        metadata={"title": "Async Programming"},
    )
    await index_documents([doc])

asyncio.run(index_docs())

# Search
async def search_docs():
    request = RAGQueryRequest(text="async programming best practices", top_k=5)
    response = await search(request)
    for hit in response.hits:
        print(f"Score: {hit.score:.2f}")
        print(f"Text: {hit.text[:100]}...")
        print()

asyncio.run(search_docs())
# Search with SQL filtering
request = RAGQueryRequest(
    text="machine learning",
    top_k=10,
    filters="metadata_json LIKE '%python%'",
)
response = await search(request)
from egregora.rag import index_documents

async def custom_embed(texts: list[str], task_type: str) -> list[list[float]]:
    """Custom embedding function."""
    # Your embedding logic here; this placeholder returns zero vectors of an arbitrary dimension
    embeddings = [[0.0] * 768 for _ in texts]
    return embeddings

# Index with custom embeddings
await index_documents(documents=[doc1, doc2], embedding_fn=custom_embed)
Index a batch of Documents into the RAG knowledge base.
The implementation is responsible for:

- Filtering to text documents (skipping bytes content)
- Chunking per document according to backend strategy
- Computing embeddings for all chunks
- Upserting into the vector store
defindex_documents(self,docs:Sequence[Document])->None:"""Index a batch of Documents into the RAG knowledge base. The implementation is responsible for: - Filtering to text documents (skipping bytes content) - Chunking per document according to backend strategy - Computing embeddings for all chunks - Upserting into the vector store Args: docs: Sequence of Document instances to index Raises: ValueError: If documents are invalid or cannot be indexed RuntimeError: If embedding or storage operations fail """...
def __init__(
    self,
    db_dir: Path,
    table_name: str,
    embed_fn: EmbedFn,
    indexable_types: set[Any] | None = None,
) -> None:
    """Initialize LanceDB RAG backend.

    Args:
        db_dir: Directory for LanceDB database
        table_name: Name of the table to store embeddings
        embed_fn: Function that takes texts and returns embeddings
        indexable_types: Set of DocumentType values to index (optional)
    """
    self._db_dir = db_dir
    self._table_name = table_name
    self._embed_fn = embed_fn
    self._indexable_types = indexable_types

    # Initialize LanceDB connection
    db_dir.mkdir(parents=True, exist_ok=True)
    self._db = lancedb.connect(str(db_dir))

    # Create or open table using Pydantic schema
    if table_name not in self._db.table_names():
        logger.info("Creating new LanceDB table: %s", table_name)
        # Use Pydantic schema for type-safe table creation
        self._table = self._db.create_table(
            table_name,
            schema=RagChunkModel,
            mode="overwrite",
        )
    else:
        logger.info("Opening existing LanceDB table: %s", table_name)
        self._table = self._db.open_table(table_name)
def index_documents(self, docs: Sequence[Document]) -> None:
    """Index a batch of Documents into the RAG knowledge base.

    Implementation:
    1. Convert Documents to chunks using ingestion module
    2. Compute embeddings for all chunk texts
    3. Atomic upsert into LanceDB (merge_insert with update/insert)

    Args:
        docs: Sequence of Document instances to index

    Raises:
        ValueError: If documents are invalid
        RuntimeError: If embedding or storage operations fail
    """
    # Convert documents to chunks
    chunks = chunks_from_documents(docs, indexable_types=self._indexable_types)
    if not chunks:
        logger.info("No chunks to index (empty or filtered documents)")
        return

    logger.info("Indexing %d chunks from %d documents", len(chunks), len(docs))

    # Extract texts for embedding
    texts = [c.text for c in chunks]

    # Compute embeddings with RETRIEVAL_DOCUMENT task type
    try:
        embeddings = self._embed_fn(texts, "RETRIEVAL_DOCUMENT")
    except Exception as e:
        msg = f"Failed to compute embeddings: {e}"
        raise RuntimeError(msg) from e

    if len(embeddings) != len(chunks):
        msg = f"Embedding count mismatch: got {len(embeddings)}, expected {len(chunks)}"
        raise RuntimeError(msg)

    # Prepare rows using Pydantic models (ensures schema consistency)
    rows: list[RagChunkModel] = []
    for chunk, emb in zip(chunks, embeddings, strict=True):
        rows.append(
            RagChunkModel(
                chunk_id=chunk.chunk_id,
                document_id=chunk.document_id,
                text=chunk.text,
                vector=np.asarray(emb, dtype=np.float32),
                metadata_json=json.dumps(chunk.metadata),
            )
        )

    # Atomic upsert using merge_insert (no race conditions)
    try:
        self._table.merge_insert("chunk_id").when_matched_update_all().when_not_matched_insert_all().execute(rows)
        logger.info("Successfully indexed %d chunks (atomic upsert)", len(rows))
    except Exception as e:
        msg = f"Failed to upsert chunks to LanceDB: {e}"
        raise RuntimeError(msg) from e
def query(self, request: RAGQueryRequest) -> RAGQueryResponse:
    """Execute vector search in the knowledge base.

    Implementation:
    1. Embed the query text
    2. Run vector search in LanceDB
    3. Convert results to RAGHit objects

    Args:
        request: Query parameters (text, top_k, filters)

    Returns:
        Response containing ranked RAGHit results

    Raises:
        ValueError: If query parameters are invalid
        RuntimeError: If search operation fails
    """
    top_k = request.top_k

    # Embed query with RETRIEVAL_QUERY task type
    try:
        query_emb = self._embed_fn([request.text], "RETRIEVAL_QUERY")[0]
    except Exception as e:
        msg = f"Failed to embed query: {e}"
        raise RuntimeError(msg) from e

    query_vec = np.asarray(query_emb, dtype=np.float32)

    # Execute search using Arrow (zero-copy, no Pandas)
    try:
        q = self._table.search(query_vec).metric("cosine").limit(top_k)

        # Apply filters if provided
        # LanceDB supports SQL-like WHERE clauses for pre-filtering
        if request.filters:
            q = q.where(request.filters)

        # Execute and get results as Arrow table (zero-copy)
        arrow_table = q.to_arrow()
    except Exception as e:
        msg = f"LanceDB search failed: {e}"
        raise RuntimeError(msg) from e

    # Convert Arrow table to Python dicts (fast native method)
    # This is much faster than iterating over pandas rows
    hits: list[RAGHit] = []
    for row in arrow_table.to_pylist():
        # LanceDB exposes a distance column (usually "_distance")
        distance = float(row.get("_distance", 0.0))

        # Convert cosine distance to a similarity score:
        # distance lies in [0, 2], so score = 1 - distance lies in [-1, 1];
        # for non-negative similarities this lands in [0, 1], and lower
        # distances always map to higher scores.
        score = 1.0 - distance

        # Extract and deserialize metadata
        metadata_json = row.get("metadata_json", "{}")
        try:
            meta = json.loads(metadata_json) if metadata_json else {}
        except json.JSONDecodeError:
            logger.warning("Failed to decode metadata JSON, using empty dict")
            meta = {}

        hits.append(
            RAGHit(
                document_id=row["document_id"],
                chunk_id=row["chunk_id"],
                text=row["text"],
                metadata=meta,
                score=score,
            )
        )

    logger.info("Found %d hits for query (top_k=%d)", len(hits), top_k)
    return RAGQueryResponse(hits=hits)
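As a quick sanity check on the distance-to-score conversion (a standalone snippet, not part of the backend): with cosine distance, identical vectors give distance 0 and score 1, while orthogonal vectors give distance 1 and score 0.

import numpy as np

def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine distance (1 - cosine similarity), the metric used above."""
    sim = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
    return 1.0 - sim

a = np.array([1.0, 0.0])
b = np.array([0.0, 1.0])

print(1.0 - cosine_distance(a, a))  # identical vectors:  distance 0.0 -> score 1.0
print(1.0 - cosine_distance(a, b))  # orthogonal vectors: distance 1.0 -> score 0.0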
def get_all_post_vectors(self) -> tuple[list[str], np.ndarray]:
    """Retrieve IDs and centroid vectors for all indexed posts.

    Returns:
        (doc_ids, vectors_matrix)
    """
    # Fetch all vectors (zero-copy Arrow)
    # In a real scenario, filter by "document_type" metadata if possible
    arrow_table = self._table.search().limit(None).to_arrow()

    doc_vectors: dict[str, list[np.ndarray]] = {}

    # Aggregate chunks by document ID
    for batch in arrow_table.to_batches():
        d = batch.to_pydict()
        ids = d["document_id"]
        vecs = d["vector"]
        # Assume we only process POSTs based on upstream logic or metadata checks
        for i, doc_id in enumerate(ids):
            if doc_id not in doc_vectors:
                doc_vectors[doc_id] = []
            doc_vectors[doc_id].append(vecs[i])

    if not doc_vectors:
        return [], np.array([])

    # Compute centroids (mean of chunk vectors)
    final_ids = []
    final_vecs = []
    for doc_id, vec_list in doc_vectors.items():
        centroid = np.mean(np.stack(vec_list), axis=0)
        final_ids.append(doc_id)
        final_vecs.append(centroid)

    return final_ids, np.array(final_vecs)
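One way these centroids could be used downstream (a hedged sketch; the related-posts logic itself is not shown in this section) is to build a post-to-post similarity matrix:

import numpy as np

# "backend" is an instance of the LanceDB backend shown above
doc_ids, vectors = backend.get_all_post_vectors()

if doc_ids:
    # L2-normalize the centroids so dot products become cosine similarities
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    unit = vectors / np.clip(norms, 1e-12, None)
    similarity = unit @ unit.T  # similarity[i, j] = cosine similarity of post i and post j

    # Most similar other post for the first document
    scores = similarity[0].copy()
    scores[0] = -np.inf  # ignore self-similarity
    print(doc_ids[int(np.argmax(scores))])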
Dual-queue embedding router with independent rate limit tracking.
Routes embedding requests to either the single or the batch Google Gemini API endpoint, maximizing throughput by using whichever endpoint is currently available.
Architecture
Two independent queues (single + batch)
Two independent rate limiters
Smart routing: prefer the single endpoint for low latency, fall back to batch when it is rate-limited
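A short usage sketch of the router on its own, created via create_embedding_router (documented later on this page), assuming GOOGLE_API_KEY is set in the environment and that the model name from the constructor docstring is appropriate:

from egregora.rag.embedding_router import create_embedding_router

# Start a router and its two background workers (single + batch)
router = create_embedding_router(model="models/gemini-embedding-001")

# Document-side embeddings
doc_vectors = router.embed(
    ["First document text", "Second document text"],
    "RETRIEVAL_DOCUMENT",
)

# Query-side embedding
query_vector = router.embed(["async programming best practices"], "RETRIEVAL_QUERY")[0]

print(len(doc_vectors), len(query_vector))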
def is_available(self) -> bool:
    """Check if endpoint is available for requests."""
    with self._lock:
        if self.state == RateLimitState.AVAILABLE:
            return True
        if time.time() >= self.available_at:
            # Window expired, reset to available
            self.state = RateLimitState.AVAILABLE
            self.consecutive_errors = 0
            return True
        return False
def mark_rate_limited(self, retry_after: float = 60.0) -> None:
    """Mark endpoint as rate limited."""
    with self._lock:
        self.state = RateLimitState.RATE_LIMITED
        self.available_at = time.time() + retry_after
        logger.warning(
            "%s endpoint rate limited. Available again at %s (in %.1fs)",
            self.endpoint_type.value,
            time.strftime("%H:%M:%S", time.localtime(self.available_at)),
            retry_after,
        )
def mark_error(self, backoff_seconds: float = 2.0) -> None:
    """Mark endpoint as having an error."""
    with self._lock:
        self.consecutive_errors += 1
        if self.consecutive_errors >= self.max_consecutive_errors:
            msg = f"{self.endpoint_type.value} endpoint failed {self.consecutive_errors} times"
            raise RuntimeError(msg)
        self.state = RateLimitState.ERROR
        self.available_at = time.time() + backoff_seconds
        logger.warning(
            "%s endpoint error #%d. Backing off for %.1fs",
            self.endpoint_type.value,
            self.consecutive_errors,
            backoff_seconds,
        )
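The three methods above form a small per-endpoint state machine. A brief lifecycle sketch, assuming RateLimiter and EndpointType live in the same embedding_router module as the router itself and that a fresh limiter starts in the AVAILABLE state (both assumptions):

import time

from egregora.rag.embedding_router import EndpointType, RateLimiter

limiter = RateLimiter(EndpointType.SINGLE)

print(limiter.is_available())   # True (assuming the initial state is AVAILABLE)

limiter.mark_rate_limited(retry_after=2.0)
print(limiter.is_available())   # False: inside the 2-second rate-limit window

time.sleep(2.1)
print(limiter.is_available())   # True again: window expired, state reset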
def stop(self) -> None:
    """Stop background worker."""
    if self.worker_thread and self.worker_thread.is_alive():
        self.stop_event.set()
        # Wake up worker if blocked on queue.get
        # We can't easily interrupt queue.get, but we can put a sentinel or rely on daemon thread
        # For clean shutdown, we can put a dummy request or just let it die if daemon=True
        # But let's try to join
        self.worker_thread.join(timeout=1.0)
        logger.info("Stopped %s endpoint worker", self.endpoint_type.value)
def submit(self, texts: list[str], task_type: str) -> list[list[float]]:
    """Submit request and wait for result."""
    future: Future[list[list[float]]] = Future()
    request = EmbeddingRequest(texts, task_type, future)
    self.queue.put(request)
    return future.result()
def __init__(
    self,
    *,
    model: str,
    api_key: str | None = None,
    max_batch_size: int = 100,
    timeout: float = 60.0,
) -> None:
    """Initialize router with dual queues.

    Args:
        model: Google embedding model (e.g., "models/gemini-embedding-001")
        api_key: Google API key (defaults to GOOGLE_API_KEY env var)
        max_batch_size: Maximum texts per batch request
        timeout: HTTP timeout in seconds
    """
    effective_api_key = api_key or get_google_api_key()

    # Create dual rate limiters
    self.batch_limiter = RateLimiter(EndpointType.BATCH)
    self.single_limiter = RateLimiter(EndpointType.SINGLE)

    # Create dual queues
    self.batch_queue = EndpointQueue(
        endpoint_type=EndpointType.BATCH,
        rate_limiter=self.batch_limiter,
        model=model,
        max_batch_size=max_batch_size,
        api_key=effective_api_key,
        timeout=timeout,
    )
    self.single_queue = EndpointQueue(
        endpoint_type=EndpointType.SINGLE,
        rate_limiter=self.single_limiter,
        model=model,
        max_batch_size=1,
        api_key=effective_api_key,
        timeout=timeout,
    )
def start(self) -> None:
    """Start background workers."""
    self.batch_queue.start()
    self.single_queue.start()
    logger.info("Embedding router started with dual-queue architecture")
embed(texts: Annotated[Sequence[str], 'Texts to embed'], task_type: Annotated[str, 'Task type (RETRIEVAL_DOCUMENT or RETRIEVAL_QUERY)']) -> Annotated[list[list[float]], 'Embedding vectors']
Route embedding request to optimal endpoint.
Priority: single endpoint (low latency) > batch endpoint (fallback). Always prefer single for lower latency; use batch only when single is exhausted.
Parameters:

    texts (Annotated[Sequence[str], 'Texts to embed']): List of texts to embed. Required.
    task_type (Annotated[str, 'Task type (RETRIEVAL_DOCUMENT or RETRIEVAL_QUERY)']): Task type for embeddings. Required.

Returns:

    Annotated[list[list[float]], 'Embedding vectors']: List of embedding vectors.
Source code in src/egregora/rag/embedding_router.py
def embed(
    self,
    texts: Annotated[Sequence[str], "Texts to embed"],
    task_type: Annotated[str, "Task type (RETRIEVAL_DOCUMENT or RETRIEVAL_QUERY)"],
) -> Annotated[list[list[float]], "Embedding vectors"]:
    """Route embedding request to optimal endpoint.

    Priority: single endpoint (low latency) > batch endpoint (fallback)
    Always prefer single for lower latency, use batch only when single is exhausted.

    Args:
        texts: List of texts to embed
        task_type: Task type for embeddings

    Returns:
        List of embedding vectors
    """
    texts_list = list(texts)
    if not texts_list:
        return []

    # Priority routing: single (low latency) first, then batch (fallback)
    if self.single_queue.is_available():
        # Single endpoint available - use it for low latency
        logger.debug("Routing %d text(s) to single endpoint (low latency)", len(texts_list))
        try:
            return self.single_queue.submit(texts_list, task_type)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == HTTP_TOO_MANY_REQUESTS and self.batch_queue.is_available():
                # Got 429 on single, fallback to batch
                logger.info("Single endpoint hit rate limit, falling back to batch endpoint")
                return self.batch_queue.submit(texts_list, task_type)
            raise

    if self.batch_queue.is_available():
        # Single exhausted, fallback to batch
        logger.debug("Single endpoint exhausted, routing %d texts to batch endpoint", len(texts_list))
        try:
            return self.batch_queue.submit(texts_list, task_type)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == HTTP_TOO_MANY_REQUESTS and self.single_queue.is_available():
                # Got 429 on batch, fallback to single
                logger.info("Batch endpoint hit rate limit, falling back to single endpoint")
                return self.single_queue.submit(texts_list, task_type)
            raise

    # Both exhausted - wait for single (lower latency)
    logger.debug("Both endpoints rate-limited, waiting for single endpoint")
    return self.single_queue.submit(texts_list, task_type)
def create_embedding_router(
    *,
    model: str,
    api_key: str | None = None,
    max_batch_size: int = 100,
    timeout: float = 60.0,
) -> EmbeddingRouter:
    """Create and start a dedicated embedding router instance."""
    router = EmbeddingRouter(
        model=model,
        api_key=api_key,
        max_batch_size=max_batch_size,
        timeout=timeout,
    )
    router.start()
    return router
def get_router(
    *,
    model: str,
    api_key: str | None = None,
    max_batch_size: int = 100,
    timeout: float = 60.0,
) -> EmbeddingRouter:
    """Get or create global embedding router singleton.

    Prefer :func:`create_embedding_router` when the caller owns lifecycle
    management. This helper is kept for backwards compatibility.
    """
    global _router
    with _router_lock:
        if _router is None:
            _router = create_embedding_router(
                model=model,
                api_key=api_key,
                max_batch_size=max_batch_size,
                timeout=timeout,
            )
        return _router
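Putting the pieces together: the router's embed method has exactly the (texts, task_type) -> vectors shape that the LanceDB backend's embed_fn parameter expects, so the two can be wired directly. A sketch under stated assumptions: the backend class name LanceDBRAGBackend, its import path, and the table name are hypothetical, since only the constructor signature appears in this section.

from pathlib import Path

from egregora.rag.embedding_router import get_router
# Hypothetical import: the LanceDB backend's class name and module are not shown in this section
from egregora.rag.backend import LanceDBRAGBackend

# Reuse the process-wide router singleton
router = get_router(model="models/gemini-embedding-001")

# The router's embed(texts, task_type) callable slots directly into embed_fn
backend = LanceDBRAGBackend(
    db_dir=Path(".egregora/lancedb"),  # matches paths.lancedb_dir in the configuration above
    table_name="rag_chunks",           # hypothetical table name
    embed_fn=router.embed,
)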