Skip to content

Enricher Agent

enricher

Enrichment agent logic for processing URLs and media.

This module implements the enrichment workflow using Pydantic-AI agents. It provides:

- EnrichmentWorker (orchestrates URL and media enrichment)
- Async orchestration via enrich_table

EnrichmentOutput

Bases: BaseModel

Structured output for enrichment agents.

EnrichmentRuntimeContext dataclass

EnrichmentRuntimeContext(
    cache: EnrichmentCache,
    output_sink: Any,
    site_root: Path | None = None,
    duckdb_connection: Backend | None = None,
    target_table: str | None = None,
    usage_tracker: UsageTracker | None = None,
    pii_prevention: dict[str, Any] | None = None,
    task_store: Any | None = None,
)

Runtime context for enrichment execution.

MediaEnrichmentConfig dataclass

MediaEnrichmentConfig(
    media_mapping: MediaMapping, max_enrichments: int, enable_media: bool
)

Config for media enrichment enqueueing.

EnrichmentWorker

EnrichmentWorker(
    ctx: PipelineContext, enrichment_config: EnrichmentSettings | None = None
)

Bases: BaseWorker

Worker that processes queued URL and media enrichment tasks (e.g. image description).

Source code in src/egregora/agents/enricher.py
def __init__(
    self,
    ctx: PipelineContext,
    enrichment_config: EnrichmentSettings | None = None,
) -> None:
    """Set up the worker's state, staging area, key rotator and source ZIP index.

    Args:
        ctx: Pipeline context; forwarded to the base worker and kept on ``self.ctx``.
        enrichment_config: Optional settings stored as an override for the
            ``enrichment_config`` property.

    When ``ctx.input_path`` points at an existing regular file it is opened as a
    ZIP archive, validated with ``validate_zip_contents`` and indexed by
    lower-cased base name. ``OSError`` / ``zipfile.BadZipFile`` during that step
    are logged as a warning and leave the worker without a ZIP handle.
    """
    super().__init__(ctx)
    self.ctx: PipelineContext = ctx
    self._enrichment_config_override = enrichment_config
    self.zip_handle: zipfile.ZipFile | None = None
    self.media_index: dict[str, str] = {}
    # Media is staged in a throwaway temp directory that close() removes.
    self.staging_dir = tempfile.TemporaryDirectory(prefix="egregora_staging_")
    self.staged_files: set[str] = set()

    # One rotator per worker so its state is reused across batches.
    # Rotation counts as enabled when the setting is absent from the config.
    use_rotation = getattr(self.enrichment_config, "model_rotation_enabled", True)
    self.rotator: ModelKeyRotator | None = None
    if use_rotation:
        from egregora.llm.providers.model_key_rotator import ModelKeyRotator

        self.rotator = ModelKeyRotator(
            models=getattr(self.enrichment_config, "rotation_models", None),
        )

    source = self.ctx.input_path
    if not (source and source.exists() and source.is_file()):
        return

    try:
        self.zip_handle = zipfile.ZipFile(source, "r")
        validate_zip_contents(self.zip_handle)
        # Map lower-cased base name -> full archive member name for O(1) lookup.
        # Members sharing a base name overwrite each other; the last one wins.
        for member in self.zip_handle.infolist():
            if member.is_dir():
                continue
            self.media_index[Path(member.filename).name.lower()] = member.filename
    except (OSError, zipfile.BadZipFile) as exc:
        logger.warning("Failed to open source ZIP %s: %s", self.ctx.input_path, exc)
        if self.zip_handle:
            self.zip_handle.close()
            self.zip_handle = None

enrichment_config property

enrichment_config: EnrichmentSettings

Get effective enrichment configuration.

close

close() -> None

Explicitly close the ZIP handle to release resources.

Should be called when done with the worker. Also called by `__exit__` for context manager support.

Source code in src/egregora/agents/enricher.py
def close(self) -> None:
    """Release the worker's resources: the source ZIP handle and the staging directory.

    Call this once the worker is no longer needed; ``__exit__`` calls it as well,
    so using the worker as a context manager releases everything automatically.
    An ``OSError`` raised while releasing either resource is logged at debug
    level and not propagated. Calling ``close`` again afterwards is a no-op
    because both attributes are reset to ``None``.
    """
    archive = self.zip_handle
    if archive:
        try:
            archive.close()
        except OSError:
            logger.debug("Error closing ZIP handle", exc_info=True)
        finally:
            # Reset even when closing failed: the index is useless without the handle.
            self.zip_handle = None
            self.media_index = {}

    # The staging directory is ephemeral; remove it together with its bookkeeping.
    staging = self.staging_dir
    if staging:
        try:
            staging.cleanup()
        except OSError:
            logger.debug("Error cleaning up staging directory", exc_info=True)
        finally:
            self.staging_dir = None
            self.staged_files = set()

__enter__

__enter__() -> Self

Context manager entry.

Source code in src/egregora/agents/enricher.py
def __enter__(self) -> Self:
    """Context manager entry."""
    return self

__exit__

__exit__(
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> None

Context manager exit - ensures ZIP handle is closed.

Source code in src/egregora/agents/enricher.py
def __exit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> None:
    """Context manager exit - ensures ZIP handle is closed."""
    self.close()

run

run() -> int

Process pending enrichment tasks in batches.

Source code in src/egregora/agents/enricher.py
def run(self) -> int:
    """Fetch pending URL and media enrichment tasks and process them.

    Returns:
        The number of tasks reported as processed by the URL and media batch
        handlers; ``0`` when enrichment is disabled or nothing is pending.
    """
    if not self.enrichment_config.enabled:
        logger.info("Enrichment is disabled. Skipping enrichment worker.")
        return 0

    # A nominal batch of 50 is assumed; it is handed to _determine_concurrency
    # only as a placeholder count so the key/config state can be inspected.
    nominal_batch = 50
    concurrency = self._determine_concurrency(nominal_batch)

    # Fetch enough work for several batches to be handled in parallel.
    fetch_limit = nominal_batch * concurrency

    url_tasks = self.task_store.fetch_pending(task_type="enrich_url", limit=fetch_limit)
    media_tasks = self.task_store.fetch_pending(task_type="enrich_media", limit=fetch_limit)

    url_total = len(url_tasks)
    media_total = len(media_tasks)
    total_tasks = url_total + media_total
    if total_tasks == 0:
        return 0

    logger.info(
        "[Enrichment] Processing %d tasks (URL: %d, Media: %d) with concurrency %d",
        total_tasks,
        url_total,
        media_total,
        concurrency,
    )

    processed = 0

    # URL tasks are handled first, then media tasks.
    if url_tasks:
        done = self._process_url_batch(url_tasks)
        processed += done
        logger.info("[Enrichment] URL batch complete: %d/%d", done, url_total)

    if media_tasks:
        done = self._process_media_batch(media_tasks)
        processed += done
        logger.info("[Enrichment] Media batch complete: %d/%d", done, media_total)

    logger.info("Enrichment complete: %d/%d tasks processed", processed, total_tasks)
    return processed

load_file_as_binary_content

load_file_as_binary_content(
    file_path: Path, max_size_mb: int = 20
) -> BinaryContent

Load a file as BinaryContent for pydantic-ai agents.

Source code in src/egregora/agents/enricher.py
def load_file_as_binary_content(file_path: Path, max_size_mb: int = 20) -> BinaryContent:
    """Read a file from disk and wrap its bytes as ``BinaryContent`` for pydantic-ai agents.

    Args:
        file_path: Location of the file to load.
        max_size_mb: Largest accepted file size, in mebibytes.

    Returns:
        ``BinaryContent`` holding the file's bytes and a media type guessed from
        the file name (``application/octet-stream`` when no guess is available).

    Raises:
        EnrichmentFileError: If the path does not exist or the file is larger
            than ``max_size_mb``.
    """
    if not file_path.exists():
        msg = f"File not found: {file_path}"
        raise EnrichmentFileError(msg)

    bytes_per_mb = 1024 * 1024
    actual_bytes = file_path.stat().st_size
    # Check the size before reading so an oversized file is never loaded into memory.
    if actual_bytes > max_size_mb * bytes_per_mb:
        size_mb = actual_bytes / bytes_per_mb
        msg = f"File too large: {size_mb:.2f}MB exceeds {max_size_mb}MB limit. File: {file_path.name}"
        raise EnrichmentFileError(msg)

    guessed_type, _encoding = mimetypes.guess_type(str(file_path))
    return BinaryContent(
        data=file_path.read_bytes(),
        media_type=guessed_type or "application/octet-stream",
    )

fetch_url_with_jina async

fetch_url_with_jina(ctx: RunContext[Any], url: str) -> str

Fetch URL content using Jina.ai Reader.

Use this tool ONLY if the standard 'WebFetchTool' fails to retrieve meaningful content. Examples of when to use this:

- The standard fetch returns "JavaScript is required" or "Access Denied" (403/429).
- The content is empty or contains only cookie/GDPR banners.
- The page is a Single Page Application (SPA) that didn't render.

Source code in src/egregora/agents/enricher.py
async def fetch_url_with_jina(ctx: RunContext[Any], url: str) -> str:
    """Fetch URL content using Jina.ai Reader.

    Use this tool ONLY if the standard 'WebFetchTool' fails to retrieve meaningful content.
    Examples of when to use this:
    - The standard fetch returns "JavaScript is required" or "Access Denied" (403/429).
    - The content is empty or contains only cookie/GDPR banners.
    - The page is a Single Page Application (SPA) that didn't render.
    """
    # NOTE(review): the docstring above reads as an agent tool description and is
    # presumably shown to the model, so it is kept verbatim. `ctx` is not used in
    # the body; it is presumably required by the tool signature. Failures surface
    # as JinaFetchError, chained to the underlying httpx error.
    reader_url = f"https://r.jina.ai/{url}"

    # Ask the reader to caption images and to leave the images themselves out.
    request_headers = {"X-With-Generated-Alt": "true", "X-Retain-Images": "none"}

    async with httpx.AsyncClient() as http:
        try:
            # The reader answers with Markdown unless told otherwise.
            reply = await http.get(reader_url, headers=request_headers, timeout=30.0)
            reply.raise_for_status()
            return reply.text
        except (httpx.RequestError, httpx.HTTPStatusError) as exc:
            msg = f"Jina fetch failed: {exc}"
            raise JinaFetchError(msg) from exc

schedule_enrichment

schedule_enrichment(
    messages_table: Table,
    media_mapping: MediaMapping,
    enrichment_settings: EnrichmentSettings,
    context: EnrichmentRuntimeContext,
    run_id: UUID | None = None,
) -> None

Schedule enrichment tasks for background processing.

Source code in src/egregora/agents/enricher.py
def schedule_enrichment(
    messages_table: Table,
    media_mapping: MediaMapping,
    enrichment_settings: EnrichmentSettings,
    context: EnrichmentRuntimeContext,
    run_id: uuid.UUID | None = None,
) -> None:
    """Enqueue URL and media enrichment tasks so they can be processed in the background.

    Args:
        messages_table: Table of messages to scan; nothing is scheduled when it is empty.
        media_mapping: Media mapping passed on to the media enqueueing step.
        enrichment_settings: Supplies ``max_enrichments``, ``enable_url`` and ``enable_media``.
        context: Runtime context; scheduling is skipped with a warning when it
            has no usable ``task_store``.
        run_id: Identifier attached to the enqueued tasks; a random UUID is
            generated when omitted.
    """
    # A missing attribute and a falsy value are treated the same way.
    if not getattr(context, "task_store", None):
        logger.warning("TaskStore not available in context; skipping enrichment scheduling.")
        return

    row_count = messages_table.count().execute()
    if row_count == 0:
        return

    current_run_id = run_id or uuid.uuid4()
    # The same cap is handed to both the URL and the media enqueueing step.
    enrichment_cap = enrichment_settings.max_enrichments

    url_count = _enqueue_url_enrichments(
        messages_table,
        enrichment_cap,
        context,
        current_run_id,
        enable_url=enrichment_settings.enable_url,
    )
    media_count = _enqueue_media_enrichments(
        messages_table,
        context,
        current_run_id,
        MediaEnrichmentConfig(
            media_mapping=media_mapping,
            max_enrichments=enrichment_cap,
            enable_media=enrichment_settings.enable_media,
        ),
    )
    logger.info("Scheduled %d URL tasks and %d Media tasks", url_count, media_count)