Profile Agent

profile

Profile worker utilities.

ProfileWorker

ProfileWorker(ctx: WorkerContext)

Bases: BaseWorker

Worker that updates author profiles, with coalescing optimization.
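The page does not say how the coalescing works. One plausible reading is that when several pending updates target the same author, only the most recent one is processed and the earlier ones are marked as superseded. The sketch below illustrates that idea only; `ProfileTask`, `coalesce_by_author`, and the field names are hypothetical and are not taken from the egregora source.

```python
from dataclasses import dataclass


@dataclass
class ProfileTask:
    """Hypothetical task record; not the library's actual schema."""

    task_id: int
    author_id: str
    payload: str


def coalesce_by_author(tasks):
    """Keep only the most recent task per author and collect the superseded ones."""
    latest = {}
    superseded = []
    for task in tasks:  # assumed to be ordered oldest to newest
        previous = latest.get(task.author_id)
        if previous is not None:
            # An older task for the same author is replaced by the newer one.
            superseded.append(previous)
        latest[task.author_id] = task
    return list(latest.values()), superseded


tasks = [
    ProfileTask(1, "alice", "v1"),
    ProfileTask(2, "bob", "v1"),
    ProfileTask(3, "alice", "v2"),
]
to_run, skipped = coalesce_by_author(tasks)
```

Under this assumption, one profile update runs per author regardless of how many updates were queued, which is the usual motivation for coalescing.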

Source code in src/egregora/orchestration/worker_base.py
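def __init__(self, ctx: WorkerContext) -> None:
    self.ctx = ctx
    # Look the store up dynamically: a context without the attribute yields None.
    task_store = getattr(ctx, "task_store", None)
    # Fail fast if the store is missing (or falsy) instead of failing later at first use.
    if not task_store:
        msg = "TaskStore not found in PipelineContext; it must be initialized and injected."
        raise ValueError(msg)
    self.task_store = task_store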
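The constructor's validation can be exercised in isolation. The following is a minimal sketch rather than the library's own test code: `SketchWorker` copies only the check shown above, and `FakeTaskStore` and the `SimpleNamespace` contexts are hypothetical stand-ins for the real `WorkerContext` and task store.

```python
from types import SimpleNamespace


class SketchWorker:
    """Stand-in that reproduces only the constructor check shown above."""

    def __init__(self, ctx):
        self.ctx = ctx
        task_store = getattr(ctx, "task_store", None)
        if not task_store:
            raise ValueError("TaskStore not found in context")
        self.task_store = task_store


class FakeTaskStore:
    """Hypothetical placeholder for a real task store."""


# A context that carries a store is accepted.
ok = SketchWorker(SimpleNamespace(task_store=FakeTaskStore()))

# A context without a store is rejected at construction time.
try:
    SketchWorker(SimpleNamespace())
    failed = False
except ValueError:
    failed = True
```

Because the check is `if not task_store` rather than `is None`, a store object that evaluates as falsy (for example, one that defines `__len__` and is currently empty) would also be rejected.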