
RAG (Retrieval-Augmented Generation) API

The RAG module provides vector-based document retrieval using LanceDB for fast similarity search.

Overview

Egregora's RAG system enables the writer agent to search through past content for relevant context. It uses:

  • LanceDB for vector storage and similarity search
  • Google Gemini embeddings (models/gemini-embedding-001)
  • Dual-queue embedding router for optimal API quota utilization
  • Async architecture for efficient I/O

Configuration

Configure RAG in .egregora/config.yml:

YAML
rag:
  enabled: true                     # Enable RAG for writer agent
  top_k: 5                          # Number of results to retrieve
  min_similarity_threshold: 0.7     # Minimum similarity score (0.0-1.0)
  indexable_types: ["POST"]         # Document types to index

  # Embedding router settings
  embedding_max_batch_size: 100     # Max texts per batch request
  embedding_timeout: 60.0           # Request timeout (seconds)
  embedding_max_retries: 5          # Max retries on error

paths:
  lancedb_dir: .egregora/lancedb    # Vector database location

Data Models

See RAG Data Models
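
For orientation, the request and response shapes used throughout this page look roughly like the sketch below. Field names are taken from the examples and source on this page; the Pydantic base class and defaults are assumptions, so see the RAG Data Models page for the actual definitions.

Python
# Illustrative sketch only, not the real definitions.
from pydantic import BaseModel

class RAGQueryRequest(BaseModel):
    text: str                   # query text to embed and search with
    top_k: int = 5              # number of hits to return (default assumed)
    filters: str | None = None  # optional SQL-like WHERE clause

class RAGHit(BaseModel):
    document_id: str            # original document the chunk came from
    chunk_id: str
    text: str
    metadata: dict
    score: float                # similarity score; higher is more relevant

class RAGQueryResponse(BaseModel):
    hits: list[RAGHit]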

Usage Examples

Python
import asyncio
from egregora.rag import index_documents, search
from egregora.rag.models import RAGQueryRequest
from egregora.data_primitives.document import Document, DocumentType

# Index documents
async def index_docs():
    doc = Document(
        content="# Python Async\n\nGuide to async programming...",
        type=DocumentType.POST,
        metadata={"title": "Async Programming"}
    )
    await index_documents([doc])

asyncio.run(index_docs())

# Search
async def search_docs():
    request = RAGQueryRequest(
        text="async programming best practices",
        top_k=5
    )
    response = await search(request)

    for hit in response.hits:
        print(f"Score: {hit.score:.2f}")
        print(f"Text: {hit.text[:100]}...")
        print()

asyncio.run(search_docs())

Search with Filters

Python
# Search with SQL filtering
request = RAGQueryRequest(
    text="machine learning",
    top_k=10,
    filters="metadata_json LIKE '%python%'"
)
response = await search(request)

Custom Embedding Function

Python
from egregora.rag import index_documents

async def custom_embed(texts: list[str], task_type: str) -> list[list[float]]:
    """Custom embedding function."""
    # Replace with your embedding logic; this placeholder returns zero vectors
    # (the 768-dimension size is an arbitrary choice for the example).
    embeddings = [[0.0] * 768 for _ in texts]
    return embeddings

# Index with custom embeddings
await index_documents(
    documents=[doc1, doc2],
    embedding_fn=custom_embed
)

Backend

RAGBackend Protocol

RAGBackend

Bases: Protocol

Interface for RAG backends.

This protocol defines the contract that all RAG backend implementations must satisfy. Backends are responsible for:

  • Filtering documents to text-only content
  • Chunking documents into manageable pieces
  • Computing embeddings for chunks
  • Upserting chunks into the vector store
  • Executing vector similarity search

Implementations:

  • LanceDBRAGBackend: New LanceDB implementation (recommended)
Functions
index_documents
Python
index_documents(docs: Sequence[Document]) -> None

Index a batch of Documents into the RAG knowledge base.

The implementation is responsible for:

  • Filtering to text documents (skipping bytes content)
  • Chunking per document according to backend strategy
  • Computing embeddings for all chunks
  • Upserting into the vector store

Parameters:

  • docs (Sequence[Document], required): Sequence of Document instances to index

Raises:

  • ValueError: If documents are invalid or cannot be indexed
  • RuntimeError: If embedding or storage operations fail

Source code in src/egregora/rag/backend.py
Python
def index_documents(self, docs: Sequence[Document]) -> None:
    """Index a batch of Documents into the RAG knowledge base.

    The implementation is responsible for:
    - Filtering to text documents (skipping bytes content)
    - Chunking per document according to backend strategy
    - Computing embeddings for all chunks
    - Upserting into the vector store

    Args:
        docs: Sequence of Document instances to index

    Raises:
        ValueError: If documents are invalid or cannot be indexed
        RuntimeError: If embedding or storage operations fail

    """
    ...
query
Python
query(request: RAGQueryRequest) -> RAGQueryResponse

Execute vector search in the knowledge base.

Returns ranked hits pointing back to original documents.

Parameters:

  • request (RAGQueryRequest, required): Query parameters (text, top_k, filters)

Returns:

  • RAGQueryResponse: Response containing ranked RAGHit results

Raises:

  • ValueError: If query parameters are invalid
  • RuntimeError: If search operation fails

Source code in src/egregora/rag/backend.py
Python
def query(self, request: RAGQueryRequest) -> RAGQueryResponse:
    """Execute vector search in the knowledge base.

    Returns ranked hits pointing back to original documents.

    Args:
        request: Query parameters (text, top_k, filters)

    Returns:
        Response containing ranked RAGHit results

    Raises:
        ValueError: If query parameters are invalid
        RuntimeError: If search operation fails

    """
    ...
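
To illustrate the contract, here is a minimal in-memory backend sketch that satisfies the protocol. It skips chunking (one chunk per document), and the class name InMemoryRAGBackend plus the egregora.rag.models import path for RAGHit and RAGQueryResponse are assumptions for this example.

Python
import numpy as np

from egregora.rag.models import RAGHit, RAGQueryRequest, RAGQueryResponse

class InMemoryRAGBackend:
    """Toy RAGBackend: one chunk per document, exact cosine search in memory."""

    def __init__(self, embed_fn) -> None:
        self._embed_fn = embed_fn
        self._rows: list[tuple[str, str, dict, np.ndarray]] = []

    def index_documents(self, docs) -> None:
        text_docs = [d for d in docs if isinstance(d.content, str)]  # skip bytes content
        vectors = self._embed_fn([d.content for d in text_docs], "RETRIEVAL_DOCUMENT")
        for doc, vec in zip(text_docs, vectors, strict=True):
            doc_id = f"doc-{len(self._rows)}"  # real backends use the Document's own id
            self._rows.append((doc_id, doc.content, doc.metadata, np.asarray(vec, dtype=np.float32)))

    def query(self, request: RAGQueryRequest) -> RAGQueryResponse:
        query_vec = np.asarray(self._embed_fn([request.text], "RETRIEVAL_QUERY")[0], dtype=np.float32)
        hits = []
        for doc_id, text, meta, vec in self._rows:
            # Cosine similarity between the query vector and the stored chunk vector
            score = float(query_vec @ vec / (np.linalg.norm(query_vec) * np.linalg.norm(vec)))
            hits.append(RAGHit(document_id=doc_id, chunk_id=doc_id, text=text, metadata=meta, score=score))
        hits.sort(key=lambda h: h.score, reverse=True)
        return RAGQueryResponse(hits=hits[: request.top_k])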

LanceDB Implementation

LanceDBRAGBackend

Python
LanceDBRAGBackend(db_dir: Path, table_name: str, embed_fn: EmbedFn, indexable_types: set[Any] | None = None)

LanceDB-based RAG backend.

Responsibilities
  • Convert Documents to chunks using ingestion module
  • Compute embeddings with provided embed_fn
  • Upsert into LanceDB table
  • Run vector search and return RAGHit objects
Architecture
  • Uses LanceDB for vector storage and search
  • Stores chunks with embeddings, metadata, and full-text
  • Supports both ANN and exact similarity search
  • Dependency injection for embedding function (no direct Gemini coupling)
Schema
  • chunk_id: string (primary key)
  • document_id: string
  • text: string
  • vector: embedding (LanceDB vector column)
  • metadata_json: JSON-serialized metadata (string)
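
A rough sketch of such a chunk model, assuming LanceDB's Pydantic integration and an arbitrary embedding dimension of 768 (the real RagChunkModel lives in the source tree):

Python
from lancedb.pydantic import LanceModel, Vector

class RagChunkModel(LanceModel):
    chunk_id: str        # primary key used by merge_insert
    document_id: str     # back-reference to the original Document
    text: str            # chunk full text
    vector: Vector(768)  # embedding column (dimension is an assumption here)
    metadata_json: str   # JSON-serialized chunk metadata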

Initialize LanceDB RAG backend.

Parameters:

  • db_dir (Path, required): Directory for LanceDB database
  • table_name (str, required): Name of the table to store embeddings
  • embed_fn (EmbedFn, required): Function that takes texts and returns embeddings
  • indexable_types (set[Any] | None, default None): Set of DocumentType values to index (optional)
Source code in src/egregora/rag/lancedb_backend.py
Python
def __init__(
    self,
    db_dir: Path,
    table_name: str,
    embed_fn: EmbedFn,
    indexable_types: set[Any] | None = None,
) -> None:
    """Initialize LanceDB RAG backend.

    Args:
        db_dir: Directory for LanceDB database
        table_name: Name of the table to store embeddings
        embed_fn: Function that takes texts and returns embeddings
        indexable_types: Set of DocumentType values to index (optional)

    """
    self._db_dir = db_dir
    self._table_name = table_name
    self._embed_fn = embed_fn
    self._indexable_types = indexable_types

    # Initialize LanceDB connection
    db_dir.mkdir(parents=True, exist_ok=True)
    self._db = lancedb.connect(str(db_dir))

    # Create or open table using Pydantic schema
    if table_name not in self._db.table_names():
        logger.info("Creating new LanceDB table: %s", table_name)
        # Use Pydantic schema for type-safe table creation
        self._table = self._db.create_table(
            table_name,
            schema=RagChunkModel,
            mode="overwrite",
        )
    else:
        logger.info("Opening existing LanceDB table: %s", table_name)
        self._table = self._db.open_table(table_name)
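
A minimal wiring sketch, assuming the import paths follow the source file locations shown on this page; the table name is an arbitrary choice for the example:

Python
from pathlib import Path

from egregora.data_primitives.document import DocumentType
from egregora.rag.embedding_router import create_embedding_router
from egregora.rag.lancedb_backend import LanceDBRAGBackend

router = create_embedding_router(model="models/gemini-embedding-001")
backend = LanceDBRAGBackend(
    db_dir=Path(".egregora/lancedb"),
    table_name="rag_chunks",              # arbitrary table name for this sketch
    embed_fn=router.embed,                # EmbedFn: (texts, task_type) -> vectors
    indexable_types={DocumentType.POST},
)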
index_documents
Python
index_documents(docs: Sequence[Document]) -> None

Index a batch of Documents into the RAG knowledge base.

Implementation
  1. Convert Documents to chunks using ingestion module
  2. Compute embeddings for all chunk texts
  3. Atomic upsert into LanceDB (merge_insert with update/insert)

Parameters:

  • docs (Sequence[Document], required): Sequence of Document instances to index

Raises:

  • ValueError: If documents are invalid
  • RuntimeError: If embedding or storage operations fail

Source code in src/egregora/rag/lancedb_backend.py
Python
def index_documents(self, docs: Sequence[Document]) -> None:
    """Index a batch of Documents into the RAG knowledge base.

    Implementation:
        1. Convert Documents to chunks using ingestion module
        2. Compute embeddings for all chunk texts
        3. Atomic upsert into LanceDB (merge_insert with update/insert)

    Args:
        docs: Sequence of Document instances to index

    Raises:
        ValueError: If documents are invalid
        RuntimeError: If embedding or storage operations fail

    """
    # Convert documents to chunks
    chunks = chunks_from_documents(docs, indexable_types=self._indexable_types)

    if not chunks:
        logger.info("No chunks to index (empty or filtered documents)")
        return

    logger.info("Indexing %d chunks from %d documents", len(chunks), len(docs))

    # Extract texts for embedding
    texts = [c.text for c in chunks]

    # Compute embeddings with RETRIEVAL_DOCUMENT task type
    try:
        embeddings = self._embed_fn(texts, "RETRIEVAL_DOCUMENT")
    except Exception as e:
        msg = f"Failed to compute embeddings: {e}"
        raise RuntimeError(msg) from e

    if len(embeddings) != len(chunks):
        msg = f"Embedding count mismatch: got {len(embeddings)}, expected {len(chunks)}"
        raise RuntimeError(msg)

    # Prepare rows using Pydantic models (ensures schema consistency)
    rows: list[RagChunkModel] = []
    for chunk, emb in zip(chunks, embeddings, strict=True):
        rows.append(
            RagChunkModel(
                chunk_id=chunk.chunk_id,
                document_id=chunk.document_id,
                text=chunk.text,
                vector=np.asarray(emb, dtype=np.float32),
                metadata_json=json.dumps(chunk.metadata),
            )
        )

    # Atomic upsert using merge_insert (no race conditions)
    try:
        self._table.merge_insert(
            "chunk_id"
        ).when_matched_update_all().when_not_matched_insert_all().execute(rows)
        logger.info("Successfully indexed %d chunks (atomic upsert)", len(rows))
    except Exception as e:
        msg = f"Failed to upsert chunks to LanceDB: {e}"
        raise RuntimeError(msg) from e
query
Python
query(request: RAGQueryRequest) -> RAGQueryResponse

Execute vector search in the knowledge base.

Implementation
  1. Embed the query text
  2. Run vector search in LanceDB
  3. Convert results to RAGHit objects

Parameters:

  • request (RAGQueryRequest, required): Query parameters (text, top_k, filters)

Returns:

  • RAGQueryResponse: Response containing ranked RAGHit results

Raises:

  • ValueError: If query parameters are invalid
  • RuntimeError: If search operation fails

Source code in src/egregora/rag/lancedb_backend.py
Python
def query(self, request: RAGQueryRequest) -> RAGQueryResponse:
    """Execute vector search in the knowledge base.

    Implementation:
        1. Embed the query text
        2. Run vector search in LanceDB
        3. Convert results to RAGHit objects

    Args:
        request: Query parameters (text, top_k, filters)

    Returns:
        Response containing ranked RAGHit results

    Raises:
        ValueError: If query parameters are invalid
        RuntimeError: If search operation fails

    """
    top_k = request.top_k

    # Embed query with RETRIEVAL_QUERY task type
    try:
        query_emb = self._embed_fn([request.text], "RETRIEVAL_QUERY")[0]
    except Exception as e:
        msg = f"Failed to embed query: {e}"
        raise RuntimeError(msg) from e

    query_vec = np.asarray(query_emb, dtype=np.float32)

    # Execute search using Arrow (zero-copy, no Pandas)
    try:
        q = self._table.search(query_vec).metric("cosine").limit(top_k)

        # Apply filters if provided
        # LanceDB supports SQL-like WHERE clauses for pre-filtering
        if request.filters:
            q = q.where(request.filters)

        # Execute and get results as Arrow table (zero-copy)
        arrow_table = q.to_arrow()
    except Exception as e:
        msg = f"LanceDB search failed: {e}"
        raise RuntimeError(msg) from e

    # Convert Arrow table to Python dicts (fast native method)
    # This is much faster than iterating over pandas rows
    hits: list[RAGHit] = []
    for row in arrow_table.to_pylist():
        # LanceDB exposes a distance column (usually "_distance")
        distance = float(row.get("_distance", 0.0))

        # Convert cosine distance to a similarity score.
        # Cosine distance lies in [0, 2]; score = 1 - distance recovers the
        # cosine similarity in [-1, 1], so lower distances yield higher scores.
        score = 1.0 - distance

        # Extract and deserialize metadata
        metadata_json = row.get("metadata_json", "{}")
        try:
            meta = json.loads(metadata_json) if metadata_json else {}
        except json.JSONDecodeError:
            logger.warning("Failed to decode metadata JSON, using empty dict")
            meta = {}

        hits.append(
            RAGHit(
                document_id=row["document_id"],
                chunk_id=row["chunk_id"],
                text=row["text"],
                metadata=meta,
                score=score,
            )
        )

    logger.info("Found %d hits for query (top_k=%d)", len(hits), top_k)
    return RAGQueryResponse(hits=hits)
get_all_post_vectors
Python
get_all_post_vectors() -> tuple[list[str], np.ndarray]

Retrieve IDs and Centroid Vectors for all indexed posts.

Returns:

  • tuple[list[str], ndarray]: (doc_ids, vectors_matrix)

Source code in src/egregora/rag/lancedb_backend.py
Python
def get_all_post_vectors(self) -> tuple[list[str], np.ndarray]:
    """Retrieve IDs and Centroid Vectors for all indexed posts.

    Returns:
        (doc_ids, vectors_matrix)

    """
    # Fetch all vectors (Zero-copy Arrow)
    # In a real scenario, filter by "document_type" metadata if possible
    arrow_table = self._table.search().limit(None).to_arrow()

    doc_vectors: dict[str, list[np.ndarray]] = {}

    # Aggregate chunks by document ID
    for batch in arrow_table.to_batches():
        d = batch.to_pydict()
        ids = d["document_id"]
        vecs = d["vector"]
        # Assume we only process POSTs based on upstream logic or metadata checks

        for i, doc_id in enumerate(ids):
            if doc_id not in doc_vectors:
                doc_vectors[doc_id] = []
            doc_vectors[doc_id].append(vecs[i])

    if not doc_vectors:
        return [], np.array([])

    # Compute centroids (Mean of chunk vectors)
    final_ids = []
    final_vecs = []

    for doc_id, vec_list in doc_vectors.items():
        centroid = np.mean(np.stack(vec_list), axis=0)
        final_ids.append(doc_id)
        final_vecs.append(centroid)

    return final_ids, np.array(final_vecs)
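
The centroid matrix makes it easy to compare posts directly; a quick sketch, reusing the backend instance from the wiring example above:

Python
import numpy as np

doc_ids, vectors = backend.get_all_post_vectors()
if len(doc_ids) >= 2:
    # Normalize centroids so the dot product equals cosine similarity
    normed = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    sims = normed @ normed.T
    np.fill_diagonal(sims, -1.0)  # ignore self-similarity
    i, j = np.unravel_index(np.argmax(sims), sims.shape)
    print(f"Most similar posts: {doc_ids[i]} / {doc_ids[j]} (cosine={sims[i, j]:.2f})")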

Embedding Router

The dual-queue embedding router optimizes API quota utilization:

  • Single endpoint: Low-latency, preferred for queries (1 request/sec)
  • Batch endpoint: High-throughput, used for bulk indexing (1000 embeddings/min)
  • Automatic fallback: Falls back between endpoints on 429 errors
  • Request batching: Reduces API calls by up to 100x
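
A minimal usage sketch (the import path is assumed from the source file location src/egregora/rag/embedding_router.py):

Python
from egregora.rag.embedding_router import create_embedding_router

router = create_embedding_router(model="models/gemini-embedding-001")
try:
    # Queries use RETRIEVAL_QUERY; bulk indexing uses RETRIEVAL_DOCUMENT
    vectors = router.embed(["async programming best practices"], "RETRIEVAL_QUERY")
    print(f"Got {len(vectors)} vector(s) of dimension {len(vectors[0])}")
finally:
    router.stop()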

embedding_router

Dual-queue embedding router with independent rate limit tracking.

Routes embedding requests to either the single or batch Google Gemini API endpoint, maximizing throughput by sending work to whichever endpoint is currently available.

Architecture
  • Two independent queues (single + batch)
  • Two independent rate limiters
  • Smart routing: prefer batch for efficiency, fallback to single when rate-limited
  • Request accumulation during rate limit waits
  • Thread-based concurrency (synchronous I/O)
EndpointType

Bases: Enum

Type of embedding endpoint.

RateLimitState

Bases: Enum

Rate limit state for an endpoint.

RateLimiter dataclass
Python
RateLimiter(endpoint_type: EndpointType, state: RateLimitState = RateLimitState.AVAILABLE, available_at: float = 0.0, consecutive_errors: int = 0, max_consecutive_errors: int = 5, _lock: Lock = threading.Lock())

Tracks rate limit state for a single endpoint.

is_available
Python
is_available() -> bool

Check if endpoint is available for requests.

Source code in src/egregora/rag/embedding_router.py
Python
def is_available(self) -> bool:
    """Check if endpoint is available for requests."""
    with self._lock:
        if self.state == RateLimitState.AVAILABLE:
            return True
        if time.time() >= self.available_at:
            # Window expired, reset to available
            self.state = RateLimitState.AVAILABLE
            self.consecutive_errors = 0
            return True
        return False
mark_rate_limited
Python
mark_rate_limited(retry_after: float = 60.0) -> None

Mark endpoint as rate limited.

Source code in src/egregora/rag/embedding_router.py
Python
def mark_rate_limited(self, retry_after: float = 60.0) -> None:
    """Mark endpoint as rate limited."""
    with self._lock:
        self.state = RateLimitState.RATE_LIMITED
        self.available_at = time.time() + retry_after
        logger.warning(
            "%s endpoint rate limited. Available again at %s (in %.1fs)",
            self.endpoint_type.value,
            time.strftime("%H:%M:%S", time.localtime(self.available_at)),
            retry_after,
        )
mark_error
Python
mark_error(backoff_seconds: float = 2.0) -> None

Mark endpoint as having an error.

Source code in src/egregora/rag/embedding_router.py
Python
def mark_error(self, backoff_seconds: float = 2.0) -> None:
    """Mark endpoint as having an error."""
    with self._lock:
        self.consecutive_errors += 1
        if self.consecutive_errors >= self.max_consecutive_errors:
            msg = f"{self.endpoint_type.value} endpoint failed {self.consecutive_errors} times"
            raise RuntimeError(msg)
        self.state = RateLimitState.ERROR
        self.available_at = time.time() + backoff_seconds
        logger.warning(
            "%s endpoint error #%d. Backing off for %.1fs",
            self.endpoint_type.value,
            self.consecutive_errors,
            backoff_seconds,
        )
mark_success
Python
mark_success() -> None

Mark successful request.

Source code in src/egregora/rag/embedding_router.py
Python
def mark_success(self) -> None:
    """Mark successful request."""
    with self._lock:
        self.state = RateLimitState.AVAILABLE
        self.consecutive_errors = 0
        self.available_at = 0.0
EmbeddingRequest dataclass
Python
EmbeddingRequest(texts: list[str], task_type: str, future: Future[list[list[float]]], submitted_at: float = time.time())

A pending embedding request.

EndpointQueue dataclass
Python
EndpointQueue(endpoint_type: EndpointType, rate_limiter: RateLimiter, model: str, queue: Queue[EmbeddingRequest] = queue.Queue(), worker_thread: Thread | None = None, stop_event: Event = threading.Event(), max_batch_size: int = 100, api_key: str = get_google_api_key(), timeout: float = 60.0)

Queue and worker for a single endpoint type.

start
Python
start() -> None

Start background worker.

Source code in src/egregora/rag/embedding_router.py
Python
def start(self) -> None:
    """Start background worker."""
    if self.worker_thread is None or not self.worker_thread.is_alive():
        self.stop_event.clear()
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()
        logger.info("Started %s endpoint worker", self.endpoint_type.value)
stop
Python
stop() -> None

Stop background worker.

Source code in src/egregora/rag/embedding_router.py
Python
def stop(self) -> None:
    """Stop background worker."""
    if self.worker_thread and self.worker_thread.is_alive():
        self.stop_event.set()
        # queue.get cannot easily be interrupted; the worker runs as a daemon
        # thread, so a short bounded join gives a best-effort clean shutdown.
        self.worker_thread.join(timeout=1.0)
        logger.info("Stopped %s endpoint worker", self.endpoint_type.value)
submit
Python
submit(texts: list[str], task_type: str) -> list[list[float]]

Submit request and wait for result.

Source code in src/egregora/rag/embedding_router.py
Python
def submit(self, texts: list[str], task_type: str) -> list[list[float]]:
    """Submit request and wait for result."""
    future: Future[list[list[float]]] = Future()
    request = EmbeddingRequest(texts, task_type, future)
    self.queue.put(request)
    return future.result()
is_available
Python
is_available() -> bool

Check if endpoint is available.

Source code in src/egregora/rag/embedding_router.py
Python
def is_available(self) -> bool:
    """Check if endpoint is available."""
    return self.rate_limiter.is_available()
EmbeddingRouter
Python
EmbeddingRouter(*, model: str, api_key: str | None = None, max_batch_size: int = 100, timeout: float = 60.0)

Routes embedding requests to optimal endpoint based on availability.

Initialize router with dual queues.

Parameters:

  • model (str, required): Google embedding model (e.g., "models/gemini-embedding-001")
  • api_key (str | None, default None): Google API key (defaults to GOOGLE_API_KEY env var)
  • max_batch_size (int, default 100): Maximum texts per batch request
  • timeout (float, default 60.0): HTTP timeout in seconds
Source code in src/egregora/rag/embedding_router.py
Python
def __init__(
    self,
    *,
    model: str,
    api_key: str | None = None,
    max_batch_size: int = 100,
    timeout: float = 60.0,
) -> None:
    """Initialize router with dual queues.

    Args:
        model: Google embedding model (e.g., "models/gemini-embedding-001")
        api_key: Google API key (defaults to GOOGLE_API_KEY env var)
        max_batch_size: Maximum texts per batch request
        timeout: HTTP timeout in seconds

    """
    effective_api_key = api_key or get_google_api_key()

    # Create dual rate limiters
    self.batch_limiter = RateLimiter(EndpointType.BATCH)
    self.single_limiter = RateLimiter(EndpointType.SINGLE)

    # Create dual queues
    self.batch_queue = EndpointQueue(
        endpoint_type=EndpointType.BATCH,
        rate_limiter=self.batch_limiter,
        model=model,
        max_batch_size=max_batch_size,
        api_key=effective_api_key,
        timeout=timeout,
    )
    self.single_queue = EndpointQueue(
        endpoint_type=EndpointType.SINGLE,
        rate_limiter=self.single_limiter,
        model=model,
        max_batch_size=1,
        api_key=effective_api_key,
        timeout=timeout,
    )
start
Python
start() -> None

Start background workers.

Source code in src/egregora/rag/embedding_router.py
Python
def start(self) -> None:
    """Start background workers."""
    self.batch_queue.start()
    self.single_queue.start()
    logger.info("Embedding router started with dual-queue architecture")
stop
Python
stop() -> None

Stop background workers.

Source code in src/egregora/rag/embedding_router.py
Python
def stop(self) -> None:
    """Stop background workers."""
    self.batch_queue.stop()
    self.single_queue.stop()
    logger.info("Embedding router stopped")
embed
Python
embed(texts: Annotated[Sequence[str], 'Texts to embed'], task_type: Annotated[str, 'Task type (RETRIEVAL_DOCUMENT or RETRIEVAL_QUERY)']) -> Annotated[list[list[float]], 'Embedding vectors']

Route embedding request to optimal endpoint.

Priority: single endpoint (low latency) > batch endpoint (fallback). Always prefer single for lower latency; use batch only when single is exhausted.

Parameters:

  • texts (Sequence[str], required): List of texts to embed
  • task_type (str, required): Task type (RETRIEVAL_DOCUMENT or RETRIEVAL_QUERY)

Returns:

  • list[list[float]]: List of embedding vectors

Source code in src/egregora/rag/embedding_router.py
Python
def embed(
    self,
    texts: Annotated[Sequence[str], "Texts to embed"],
    task_type: Annotated[str, "Task type (RETRIEVAL_DOCUMENT or RETRIEVAL_QUERY)"],
) -> Annotated[list[list[float]], "Embedding vectors"]:
    """Route embedding request to optimal endpoint.

    Priority: single endpoint (low latency) > batch endpoint (fallback)
    Always prefer single for lower latency, use batch only when single is exhausted.

    Args:
        texts: List of texts to embed
        task_type: Task type for embeddings

    Returns:
        List of embedding vectors

    """
    texts_list = list(texts)
    if not texts_list:
        return []

    # Priority routing: single (low latency) first, then batch (fallback)
    if self.single_queue.is_available():
        # Single endpoint available - use it for low latency
        logger.debug("Routing %d text(s) to single endpoint (low latency)", len(texts_list))
        try:
            return self.single_queue.submit(texts_list, task_type)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == HTTP_TOO_MANY_REQUESTS and self.batch_queue.is_available():
                # Got 429 on single, fallback to batch
                logger.info("Single endpoint hit rate limit, falling back to batch endpoint")
                return self.batch_queue.submit(texts_list, task_type)
            raise
    if self.batch_queue.is_available():
        # Single exhausted, fallback to batch
        logger.debug("Single endpoint exhausted, routing %d texts to batch endpoint", len(texts_list))
        try:
            return self.batch_queue.submit(texts_list, task_type)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == HTTP_TOO_MANY_REQUESTS and self.single_queue.is_available():
                # Got 429 on batch, fallback to single
                logger.info("Batch endpoint hit rate limit, falling back to single endpoint")
                return self.single_queue.submit(texts_list, task_type)
            raise
    # Both exhausted - wait for single (lower latency)
    logger.debug("Both endpoints rate-limited, waiting for single endpoint")
    return self.single_queue.submit(texts_list, task_type)
create_embedding_router
Python
create_embedding_router(*, model: str, api_key: str | None = None, max_batch_size: int = 100, timeout: float = 60.0) -> EmbeddingRouter

Create and start a dedicated embedding router instance.

Source code in src/egregora/rag/embedding_router.py
Python
def create_embedding_router(
    *,
    model: str,
    api_key: str | None = None,
    max_batch_size: int = 100,
    timeout: float = 60.0,
) -> EmbeddingRouter:
    """Create and start a dedicated embedding router instance."""
    router = EmbeddingRouter(
        model=model,
        api_key=api_key,
        max_batch_size=max_batch_size,
        timeout=timeout,
    )
    router.start()
    return router
get_router
Python
get_router(*, model: str, api_key: str | None = None, max_batch_size: int = 100, timeout: float = 60.0) -> EmbeddingRouter

Get or create global embedding router singleton.

Prefer create_embedding_router when the caller owns lifecycle management. This helper is kept for backwards compatibility.

Source code in src/egregora/rag/embedding_router.py
Python
def get_router(
    *,
    model: str,
    api_key: str | None = None,
    max_batch_size: int = 100,
    timeout: float = 60.0,
) -> EmbeddingRouter:
    """Get or create global embedding router singleton.

    Prefer :func:`create_embedding_router` when the caller owns lifecycle
    management. This helper is kept for backwards compatibility.
    """
    global _router
    with _router_lock:
        if _router is None:
            _router = create_embedding_router(
                model=model,
                api_key=api_key,
                max_batch_size=max_batch_size,
                timeout=timeout,
            )
    return _router
shutdown_router
Python
shutdown_router() -> None

Shutdown global router (for cleanup).

Source code in src/egregora/rag/embedding_router.py
Python
def shutdown_router() -> None:
    """Shutdown global router (for cleanup)."""
    global _router
    with _router_lock:
        if _router is not None:
            _router.stop()
            _router = None
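
A short lifecycle sketch for the singleton helpers (prefer create_embedding_router when the caller owns lifecycle management, as noted above):

Python
from egregora.rag.embedding_router import get_router, shutdown_router

router = get_router(model="models/gemini-embedding-001")
try:
    vectors = router.embed(["machine learning"], "RETRIEVAL_DOCUMENT")
finally:
    shutdown_router()  # stops workers and clears the global instance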

Performance Tips

  1. Batch indexing: Index documents in batches for better throughput
  2. Top-K selection: Use 5-10 for best performance/relevance tradeoff
  3. Filters: Apply SQL filters to narrow search space
  4. Similarity threshold: Set to 0.7+ to filter low-relevance results (see the sketch after this list)
  5. Async usage: Always use await - don't block the event loop
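
For the similarity-threshold tip, hits can be filtered client-side against the configured value; a sketch reusing the search example from earlier on this page:

Python
MIN_SIMILARITY = 0.7  # mirrors rag.min_similarity_threshold in config.yml

response = await search(RAGQueryRequest(text="async programming", top_k=10))
relevant = [hit for hit in response.hits if hit.score >= MIN_SIMILARITY]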

Architecture

Text Only
┌─────────────────┐
│  Writer Agent   │
└────────┬────────┘
         │ RAGQueryRequest
         ▼
┌─────────────────┐
│  RAG Backend    │  ← Protocol-based (swappable)
│   (LanceDB)     │
└────────┬────────┘
    ┌────┴─────┬──────────┐
    ▼          ▼          ▼
┌────────┐ ┌────────┐ ┌──────────┐
│ Vector │ │ Embed  │ │ Chunking │
│ Search │ │ Router │ │ Logic    │
└────────┘ └────────┘ └──────────┘

See Also