Skip to content

Writer Agent

writer

Pydantic-AI powered writer agent.

This module implements the writer workflow using Pydantic-AI. It acts as the Composition Root for the agent, assembling core tools and capabilities before executing the conversation through a pydantic_ai.Agent.

JournalEntry dataclass

JournalEntry(
    entry_type: str,
    content: str,
    timestamp: datetime | None = None,
    tool_name: str | None = None,
)

Represents a single entry in the intercalated journal log.

WriterJournalEntryParams dataclass

WriterJournalEntryParams(
    intercalated_log: list[JournalEntry],
    window_label: str,
    output_sink: OutputSink,
    posts_published: int,
    profiles_updated: int,
    window_start: datetime,
    window_end: datetime,
    total_tokens: int = 0,
)

Parameters for saving a journal entry.

WriterFinalizationParams dataclass

WriterFinalizationParams(
    saved_posts: list[str],
    saved_profiles: list[str],
    resources: WriterResources,
    deps: WriterDeps,
    cache: PipelineCache,
    signature: str,
)

Parameters for finalizing writer results.

write_posts_with_pydantic_agent

write_posts_with_pydantic_agent(
    *,
    prompt: str,
    config: EgregoraConfig,
    context: WriterDeps,
    test_model: AgentModel | None = None,
    max_tokens_override: int | None = None,
    api_key_override: str | None = None,
) -> tuple[list[str], list[str]]

Execute the writer flow using Pydantic-AI agent tooling.

Source code in src/egregora/agents/writer.py
@_sleep_and_retry
@_limits(calls=100, period=60)
def write_posts_with_pydantic_agent(
    *,
    prompt: str,
    config: EgregoraConfig,
    context: WriterDeps,
    test_model: AgentModel | None = None,
    max_tokens_override: int | None = None,
    api_key_override: str | None = None,
) -> tuple[list[str], list[str]]:
    """Execute the writer flow using Pydantic-AI agent tooling.

    Builds the model and agent, runs the agent to completion on a dedicated
    event loop (retrying transient failures via tenacity), then extracts the
    saved posts/profiles from the tool results and persists the journal log.

    Args:
        prompt: Rendered system prompt for the writer agent.
        config: Global Egregora configuration (model selection, limits).
        context: Writer dependencies injected into the agent run.
        test_model: Optional stand-in model for tests.
        max_tokens_override: Optional max-token cap (OpenRouter models only).
        api_key_override: Optional API key overriding the configured one.

    Returns:
        Tuple of (saved posts, saved profiles) as returned by the tools.

    Raises:
        RuntimeError: If the writer loop is already running, or the agent
            produced no result.
        AgentError: If nothing was saved and the response contained no
            tool calls at all.
    """
    logger.info("Running writer via Pydantic-AI backend")

    model = create_writer_model(config, context, prompt, test_model, api_key=api_key_override)
    model_settings: ModelSettings | None = None
    if config.models.writer.startswith("openrouter:"):
        # OpenRouter models get an explicit token cap; default to 1024.
        model_settings = {"max_tokens": max_tokens_override or 1024}
    agent = setup_writer_agent(model, prompt, config=config, model_settings=model_settings)

    if context.resources.quota:
        context.resources.quota.reserve(1)

    logger.info(
        "Starting writer agent: period=%s messages=%d xml_chars=%d",
        context.window_label,
        len(context.messages),
        len(context.conversation_xml),
    )

    if not context.messages:
        logger.warning("Writer agent called with 0 messages for window %s", context.window_label)
        return [], []

    # Log the first 500 characters of the XML for debugging
    logger.debug("Conversation XML snippet (first 500 chars): %s", context.conversation_xml[:500])

    # Define usage limits
    usage_limits = UsageLimits(
        request_limit=15,  # Reasonable limit for tool loops
    )

    result = None

    def _run_agent_sync(loop: asyncio.AbstractEventLoop) -> Any:
        """Run the agent coroutine to completion on *loop* (which must be idle)."""

        async def _run_async() -> Any:
            return await agent.run(
                "Analyze the conversation context provided and write posts/profiles as needed.",
                deps=context,
                usage_limits=usage_limits,
            )

        if loop.is_running():
            msg = "Writer loop already running; cannot run synchronously."
            raise RuntimeError(msg)
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(_run_async())
        finally:
            asyncio.set_event_loop(None)

    loop = _get_writer_loop()
    try:
        # Retry the whole agent run on transient failures (tenacity policy).
        for attempt in Retrying(stop=RETRY_STOP, wait=RETRY_WAIT, retry=RETRY_IF, reraise=True):
            with attempt:
                result = _run_agent_sync(loop)
    except Exception as e:
        logger.exception("Error during agent run: %s", e)
        raise
    finally:
        # NOTE(review): if _get_writer_loop() ever returns a shared/cached
        # loop, closing it here would break subsequent calls — confirm it
        # creates a fresh loop per invocation.
        loop.close()

    if not result:
        msg = "Agent failed to return a result"
        raise RuntimeError(msg)

    # Log total tool calls made
    tool_calls = [
        p for m in result.all_messages() if hasattr(m, "parts") for p in m.parts if hasattr(p, "tool_name")
    ]
    logger.info("Agent run complete. Total tool calls attempt: %d", len(tool_calls))
    for tc in tool_calls:
        logger.info("  - Called tool: %s", getattr(tc, "tool_name", "unknown"))

    # Query usage once and reuse it below instead of calling result.usage()
    # again for the journal entry and the completion log line.
    usage = result.usage()
    total_tokens = usage.total_tokens if usage else 0
    if context.resources.usage:
        context.resources.usage.record(usage)
    messages = result.all_messages()
    saved_posts, saved_profiles = _extract_tool_results(messages)
    if not saved_posts and not saved_profiles:
        has_tool_calls = any(
            isinstance(part, ToolCallPart) for message in messages for part in getattr(message, "parts", [])
        )
        if not has_tool_calls:
            msg = "Writer response did not include any tool calls."
            raise AgentError(msg)
    intercalated_log = _extract_intercalated_log(messages)
    # TODO: [Taskmaster] Refactor complex journal fallback logic
    if not intercalated_log:
        # Reuse the messages list already fetched above.
        fallback_content = _extract_journal_content(messages)
        if fallback_content:
            # Strip frontmatter if present (to avoid double frontmatter)
            if fallback_content.strip().startswith("---"):
                try:
                    _, _, body = fallback_content.strip().split("---", 2)
                    fallback_content = body.strip()
                except ValueError:
                    pass  # Failed to split, keep original

            intercalated_log = [JournalEntry("journal", fallback_content, datetime.now(tz=UTC))]
        else:
            intercalated_log = [
                JournalEntry(
                    "journal",
                    "Writer agent completed without emitting a detailed reasoning trace.",
                    datetime.now(tz=UTC),
                )
            ]
    _save_journal_to_file(
        WriterJournalEntryParams(
            intercalated_log=intercalated_log,
            window_label=context.window_label,
            output_sink=context.resources.output,
            posts_published=len(saved_posts),
            profiles_updated=len(saved_profiles),
            window_start=context.window_start,
            window_end=context.window_end,
            total_tokens=total_tokens,
        )
    )

    logger.info(
        "Writer agent completed: period=%s posts=%d profiles=%d tokens=%d",
        context.window_label,
        len(saved_posts),
        len(saved_profiles),
        total_tokens,
    )

    return saved_posts, saved_profiles

write_posts_for_window

write_posts_for_window(params: WindowProcessingParams) -> dict[str, Any]

Public entry point for the writer agent.

Source code in src/egregora/agents/writer.py
def write_posts_for_window(params: WindowProcessingParams) -> dict[str, Any]:
    """Public entry point for the writer agent.

    Orchestrates a single window: builds context + cache signature, serves
    a validated cache hit if available, otherwise runs the Pydantic-AI
    writer and finalizes (persists, indexes, caches) the results.

    Returns:
        Mapping with RESULT_KEY_POSTS and RESULT_KEY_PROFILES lists.
    """
    if params.smoke_test:
        logger.info("Smoke test mode: skipping writer agent.")
        return {RESULT_KEY_POSTS: [], RESULT_KEY_PROFILES: []}

    # We check if messages list is empty
    if not params.messages:
        logger.warning("write_posts_for_window called with 0 messages for window %s", params.window_label)
        return {RESULT_KEY_POSTS: [], RESULT_KEY_PROFILES: []}

    # NEW: Trace message count
    logger.info("Writer agent received %d messages for processing", len(params.messages))

    # Single source of truth for the human-readable window label (previously
    # this f-string was duplicated in three places and could drift).
    window_label = f"{params.window_start:%Y-%m-%d %H:%M} to {params.window_end:%H:%M}"

    # 1. Prepare dependencies (partial, will update with context later)
    resources = params.resources
    if params.run_id and resources.run_id is None:
        # Create new resources with run_id
        resources = dataclasses.replace(resources, run_id=params.run_id)

    # 2. Build context and calculate signature
    # We need to build context first to get XML for signature
    writer_context, signature = build_context_and_signature(
        WriterContextParams(
            table=params.table,
            resources=resources,
            cache=params.cache,
            config=params.config,
            window_label=window_label,
            adapter_content_summary=params.adapter_content_summary,
            adapter_generation_instructions=params.adapter_generation_instructions,
        ),
        resources.prompts_dir,
    )

    # 3. Check L3 cache
    cached_result = check_writer_cache(
        params.cache,
        signature,
        window_label,
        resources.usage,
    )
    if cached_result:
        # TODO: [Taskmaster] Refactor brittle cache validation logic
        # Validate cached posts still exist on disk (they may be missing if output dir is fresh)
        cached_posts = cached_result.get(RESULT_KEY_POSTS, [])
        if cached_posts:
            # Check if at least one post file exists
            posts_exist = True
            if hasattr(resources.output, "posts_dir"):
                posts_exist = any(
                    list(resources.output.posts_dir.glob(f"*{slug}*.md"))
                    for slug in cached_posts[:1]  # Check first post only for speed
                )

            if not posts_exist:
                logger.warning(
                    "⚠️ Cached posts not found on disk, regenerating for window %s",
                    window_label,
                )
                # Invalidate this cache entry
                params.cache.writer.delete(signature)
            else:
                _regenerate_site_indices(resources.output)
                return cached_result
        else:
            return cached_result

    logger.info("Using Pydantic AI backend for writer")

    # 4. Create Deps with the generated context
    deps = prepare_writer_dependencies(
        WriterDepsParams(
            window_start=params.window_start,
            window_end=params.window_end,
            resources=resources,
            model_name=params.config.models.writer,
            messages=params.messages,
            table=params.table,
            config=params.config,
            conversation_xml=writer_context.conversation_xml,
            active_authors=writer_context.active_authors,
            adapter_content_summary=params.adapter_content_summary,
            adapter_generation_instructions=params.adapter_generation_instructions,
        )
    )

    # Trace final deps message count
    logger.info("WriterDeps initialized with %d messages", len(deps.messages))

    # 5. Render prompt and execute agent
    # NOTE: _render_writer_prompt uses writer_context, which we stripped RAG/Profiles from.
    # The Jinja template must be robust to missing/empty rag_context/profiles_context
    # OR we need to trust the dynamic system prompts to fill them in.
    # The current Jinja template (viewed earlier) has placeholders:
    # {% if profiles_context %}{{ profiles_context }}{% endif %}
    # If they are empty strings, they won't render in the user prompt, which is what we want,
    # because they will be injected by system prompts.

    prompt = _render_writer_prompt(writer_context, deps.resources.prompts_dir)

    # Execute writer with error handling (removed economic mode - never worked)
    saved_posts, saved_profiles = _execute_writer_with_error_handling(prompt, params.config, deps)

    # 6. Finalize results (output, RAG indexing, caching)
    return _finalize_writer_results(
        WriterFinalizationParams(
            saved_posts=saved_posts,
            saved_profiles=saved_profiles,
            resources=resources,
            deps=deps,
            cache=params.cache,
            signature=signature,
        )
    )

load_format_instructions

load_format_instructions(
    site_root: Path | None, *, registry: OutputSinkRegistry | None = None
) -> str

Load output format instructions for the writer agent.

Source code in src/egregora/agents/writer.py
def load_format_instructions(site_root: Path | None, *, registry: OutputSinkRegistry | None = None) -> str:
    """Load output format instructions for the writer agent.

    Tries to detect the output format from *site_root* first; when detection
    fails (or no root is given), falls back to the registered "mkdocs"
    format, and finally to an empty string if that is not registered either.
    """
    active_registry = registry or create_default_output_registry()

    detected = active_registry.detect_format(site_root) if site_root else None
    if detected:
        return detected.get_format_instructions()

    try:
        fallback = active_registry.get_format("mkdocs")
    except KeyError:
        return ""
    return fallback.get_format_instructions()

get_top_authors

get_top_authors(table: Table, limit: int = 20) -> list[str]

Get top N active authors by message count.

Deprecated: Use Message DTO filtering instead.

Source code in src/egregora/agents/writer.py
def get_top_authors(table: Table, limit: int = 20) -> list[str]:
    """Get top N active authors by message count.

    Filters out system/egregora rows and null/empty author UUIDs, then
    ranks remaining authors by descending message count.

    Deprecated: Use Message DTO filtering instead.

    Args:
        table: Ibis table with an ``author_uuid`` column.
        limit: Maximum number of authors to return.

    Returns:
        Author UUIDs as strings, most active first (empty list if none).
    """
    author_counts = (
        table.filter(~table.author_uuid.cast("string").isin(["system", "egregora"]))
        .filter(table.author_uuid.notnull())
        .filter(table.author_uuid.cast("string") != "")
        .group_by("author_uuid")
        .aggregate(count=table.author_uuid.count())
        .order_by(ibis.desc("count"))
        .limit(limit)
    )
    # Execute once: the previous extra `count().execute()` empty-check ran the
    # query twice, and tolist() already yields [] for an empty result.
    return cast("list[str]", author_counts.author_uuid.cast("string").execute().tolist())