This document describes the Chunked RAG Augmentation feature, which enhances the RAG (Retrieval-Augmented Generation) system to provide better context coverage for multi-topic conversations.
# Current approach (simplified)

```python
def _query_rag_for_context(table: Table, ...):
    query_text = table.execute().to_csv(sep="|", index=False)  # Entire conversation
    query_vec = embed_query_text(query_text, model=embedding_model)
    results = store.search(query_vec, top_k=5)  # Single query
```
Problem: When a conversation covers multiple distinct topics, the single embedding "averages" across all topics, so retrieval can miss context that is relevant only to specific sub-topics.
Example: A 500-message conversation covering:

- Messages 1-150: Travel to Paris
- Messages 151-300: Debugging a coding issue
- Messages 301-500: Recipe discussions
Current RAG might only retrieve Paris-related posts (dominant theme), missing relevant coding and recipe posts.
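To make the failure mode concrete, here is a toy illustration with synthetic vectors (not real Gemini embeddings): if the whole-conversation embedding behaves roughly like a mean of its topic directions, its similarity to any single topic falls well below what a focused per-topic query would score.

```python
import numpy as np

# Three orthogonal unit vectors stand in for the Paris, coding, and recipe topics.
paris = np.array([1.0, 0.0, 0.0])
coding = np.array([0.0, 1.0, 0.0])
recipes = np.array([0.0, 0.0, 1.0])

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

# A single whole-conversation embedding acts like an average of the topics.
whole_conversation = (paris + coding + recipes) / 3

print(cosine(whole_conversation, paris))   # ≈ 0.577 for every topic
print(cosine(whole_conversation, coding))  # ≈ 0.577 -- under a 0.7 similarity threshold

# A per-topic chunk query scores 1.0 against its own topic instead.
print(cosine(coding, coding))  # 1.0
```

With a 0.7 minimum-similarity threshold (the default used below), the averaged query misses all three topics, while per-chunk queries hit each one.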
Use the same chunking strategy as RAG storage (`chunk_markdown()` from `rag/chunker.py`):

- Paragraph boundaries: Split on `\n\n`
- Max tokens: 1800 (safe under Gemini's 2048 limit)
- Overlap: 150 tokens between consecutive chunks
Rationale: Semantic alignment between query chunks and storage chunks improves retrieval quality.
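For reference, the query path invokes the chunker with exactly the parameters used at storage time. A minimal sketch (`conversation_markdown` is a placeholder for the consolidated conversation text; the import and signature match the implementation below):

```python
from egregora.agents.shared.rag.chunker import chunk_markdown

# Same parameters as RAG storage, so query chunks align with stored chunks.
chunks = chunk_markdown(
    conversation_markdown,  # placeholder: consolidated conversation text
    max_tokens=1800,        # safe under the embedding model's 2048-token limit
    overlap_tokens=150,     # overlap preserves context across chunk boundaries
)
```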
```python
def query_rag_per_chunk(
    chunks: list[str],
    store: VectorStore,
    embedding_model: str,
    top_k: int = 5,
    min_similarity_threshold: float = 0.7,
    retrieval_mode: str = "ann",
    retrieval_nprobe: int | None = None,
    retrieval_overfetch: int | None = None,
) -> list[dict]:
    """Query RAG for each chunk and collect all results.

    Args:
        chunks: List of text chunks from chunk_markdown()
        store: VectorStore instance
        embedding_model: Embedding model name (e.g., "google-gla:gemini-embedding-001")
        top_k: Number of results per chunk query
        min_similarity_threshold: Minimum cosine similarity (0-1)
        retrieval_mode: "ann" (approximate) or "exact" (brute-force)
        retrieval_nprobe: ANN nprobe parameter (IVF index)
        retrieval_overfetch: Candidate multiplier for ANN

    Returns:
        List of result dicts with keys:
        - content: chunk text
        - similarity: cosine similarity score
        - document_id: unique document identifier
        - post_slug: post slug (if post document)
        - post_title: post title
        - chunk_index: chunk index within document
        - metadata: additional metadata
    """
    all_results = []
    for i, chunk in enumerate(chunks):
        logger.debug("Querying RAG for chunk %d/%d", i + 1, len(chunks))
        query_vec = embed_query_text(chunk, model=embedding_model)
        results = store.search(
            query_vec=query_vec,
            top_k=top_k,
            min_similarity_threshold=min_similarity_threshold,
            mode=retrieval_mode,
            nprobe=retrieval_nprobe,
            overfetch=retrieval_overfetch,
        )
        # Convert Ibis table to dict records
        df = results.execute()
        chunk_results = df.to_dict("records")
        all_results.extend(chunk_results)

    logger.info("Collected %d total results from %d chunks", len(all_results), len(chunks))
    return all_results
```
```python
def _query_rag_for_context(
    table: Table,
    client: genai.Client,
    rag_dir: Path,
    *,
    embedding_model: str,
    retrieval_mode: str = "ann",
    retrieval_nprobe: int | None = None,
    retrieval_overfetch: int | None = None,
    return_records: bool = False,
) -> Result[RagContext, str] | tuple[str, list[dict[str, Any]]]:
    """Query RAG using chunked conversation approach.

    NEW IMPLEMENTATION (Phase 8):
    1. Consolidate messages to markdown
    2. Chunk markdown (paragraph boundaries, 1800 tokens, 150 overlap)
    3. Query RAG for each chunk (top-5 per chunk)
    4. Deduplicate: keep top-1 chunk per document
    5. Sort by similarity and return top-5 overall

    This provides better topic coverage for multi-topic conversations
    compared to the old single-query approach.
    """
    try:
        store = VectorStore(rag_dir / "chunks.parquet")

        # Step 1: Consolidate to markdown
        markdown = consolidate_messages_to_markdown(table)
        if not markdown.strip():
            logger.info("No messages to consolidate for RAG query")
            if return_records:
                return ("", [])
            return Failure(RagErrorReason.NO_HITS)

        # Step 2: Chunk markdown (reuse existing chunker)
        from egregora.agents.shared.rag.chunker import chunk_markdown

        chunks = chunk_markdown(markdown, max_tokens=1800, overlap_tokens=150)
        logger.info("Chunked conversation into %d chunks for RAG query", len(chunks))

        # Step 3: Query RAG for each chunk
        all_results = query_rag_per_chunk(
            chunks=chunks,
            store=store,
            embedding_model=embedding_model,
            top_k=5,
            min_similarity_threshold=0.7,
            retrieval_mode=retrieval_mode,
            retrieval_nprobe=retrieval_nprobe,
            retrieval_overfetch=retrieval_overfetch,
        )
        if not all_results:
            logger.info("No similar posts found (0 results from all chunks)")
            if return_records:
                return ("", [])
            return Failure(RagErrorReason.NO_HITS)

        # Step 4: Deduplicate (keep top-1 per document)
        deduped = deduplicate_by_document(all_results, n=1)

        # Step 5: Sort by similarity and take top-5
        final_results = sorted(
            deduped, key=lambda x: x.get("similarity", 0.0), reverse=True
        )[:5]
        logger.info(
            "RAG query complete: %d chunks → %d results → %d deduped → %d final",
            len(chunks),
            len(all_results),
            len(deduped),
            len(final_results),
        )

        # Step 6: Format context (reuse existing formatting logic)
        rag_text = "\n\n## Related Previous Posts (for continuity and linking):\n"
        rag_text += "You can reference these posts in your writing to maintain conversation continuity.\n\n"
        for row in final_results:
            rag_text += f"### [{row['post_title']}] ({row['post_date']})\n"
            rag_text += f"{row['content'][:400]}...\n"
            rag_text += f"- Tags: {', '.join(row['tags']) if row.get('tags') else 'none'}\n"
            rag_text += f"- Similarity: {row['similarity']:.2f}\n\n"

        if return_records:
            return (rag_text, final_results)
        return Success(RagContext(text=rag_text, records=final_results))
    except Exception as e:
        logger.error("RAG query failed: %s", e, exc_info=True)
        if return_records:
            return ("", [])
        return Failure(RagErrorReason.SYSTEM_ERROR)
```
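`deduplicate_by_document` is referenced above but not defined in this document. A minimal sketch of the intended behavior (keep the top-n highest-similarity chunks per document), assuming each result dict carries the `document_id` and `similarity` keys described in `query_rag_per_chunk`:

```python
from collections import defaultdict


def deduplicate_by_document(results: list[dict], n: int = 1) -> list[dict]:
    """Keep only the top-n highest-similarity chunks per document.

    Sketch implementation; assumes each result dict has the
    'document_id' and 'similarity' keys shown in query_rag_per_chunk.
    """
    by_doc: dict[str, list[dict]] = defaultdict(list)
    for result in results:
        by_doc[result["document_id"]].append(result)

    deduped: list[dict] = []
    for doc_results in by_doc.values():
        doc_results.sort(key=lambda x: x.get("similarity", 0.0), reverse=True)
        deduped.extend(doc_results[:n])
    return deduped
```

Keeping only the best chunk per document (n=1) prevents one long post from crowding out the other topics in the final top-5.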
```python
from dataclasses import dataclass


@dataclass
class RAGSettings:
    """RAG configuration."""

    enabled: bool = True
    top_k: int = 5
    mode: str = "ann"  # "ann" or "exact"
    nprobe: int | None = None
    overfetch: int | None = None

    # NEW (Phase 8): Chunked RAG settings
    chunks_per_query: int = 5  # top-k per chunk query
    dedup_strategy: str = "document"  # "document" or "none"
    dedup_max_per_doc: int = 1  # chunks to keep per document
```
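The wiring between these settings and the chunked query path is not shown in this document; a hypothetical sketch of how they might map onto the `query_rag_per_chunk` parameters:

```python
settings = RAGSettings()  # defaults: ANN mode, 5 results per chunk, top-1 per document

# Illustrative wiring only; the actual call site may differ.
all_results = query_rag_per_chunk(
    chunks=chunks,
    store=store,
    embedding_model=embedding_model,
    top_k=settings.chunks_per_query,
    retrieval_mode=settings.mode,
    retrieval_nprobe=settings.nprobe,
    retrieval_overfetch=settings.overfetch,
)
if settings.dedup_strategy == "document":
    all_results = deduplicate_by_document(all_results, n=settings.dedup_max_per_doc)
```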