Skip to content

Content Repository

repository

Data access layer for content (Posts, Profiles, Media, Journals).

This repository handles routing document operations to the unified documents table in DuckDB.

ContentRepository

ContentRepository(db: DuckDBStorageManager)

Repository for content document operations.

Source code in src/egregora/database/repository.py
def __init__(self, db: DuckDBStorageManager) -> None:
    """Bind the repository to a storage manager.

    Args:
        db: Storage manager, kept on the instance as ``self.db``.
    """
    # Name of the unified table that holds every document type.
    self._table_name = "documents"
    self.db = db

save

save(doc: Document) -> None

Save document to the unified documents table.

Source code in src/egregora/database/repository.py
def save(self, doc: Document) -> None:
    """Save document to the unified documents table.

    Builds one row from the fields shared by every document type, adds the
    columns specific to ``doc.type``, and hands the row to
    ``self.db.replace_rows`` keyed on ``id``.

    Args:
        doc: Document to persist. ``bytes`` content is decoded as UTF-8 with
            undecodable bytes dropped.

    Raises:
        DatabaseOperationError: If ``replace_rows`` fails. Errors raised while
            building the row are not wrapped.
    """
    # 1. Base mapping (common to all types)
    # NOTE(review): errors="ignore" silently discards undecodable bytes, so
    # non-UTF-8 payloads are stored lossily — confirm this is intended.
    row: dict[str, Any] = {
        "id": doc.document_id,  # Use stable ID property
        "content": doc.content
        if isinstance(doc.content, str)
        else doc.content.decode("utf-8", errors="ignore"),
        "created_at": doc.created_at,
        "source_checksum": doc.internal_metadata.get("checksum"),
        "doc_type": doc.type.value,
        "status": doc.metadata.get("status", "published"),  # Default status if not provided
        "extensions": None,
    }

    # 2. Type-specific mapping
    if doc.type == DocumentType.POST:
        # "status" is deliberately not repeated here: the base mapping already
        # reads it with the "published" default, and re-reading it without a
        # default would overwrite that default with None.
        row.update(
            {
                "title": doc.metadata.get("title"),
                "slug": doc.slug,  # Use property that handles fallback
                "date": doc.internal_metadata.get("date"),
                "summary": doc.metadata.get("summary"),
                "authors": doc.metadata.get("authors", []),
                "tags": doc.metadata.get("tags", []),
            }
        )

    elif doc.type == DocumentType.PROFILE:
        row.update(
            {
                "subject_uuid": doc.internal_metadata.get("subject_uuid")
                or doc.document_id,  # Fallback to ID if needed
                "title": doc.metadata.get("title") or doc.metadata.get("name"),
                "alias": doc.internal_metadata.get("alias"),
                "summary": doc.metadata.get("summary") or doc.metadata.get("bio"),
                "avatar_url": doc.metadata.get("avatar_url"),
                "interests": doc.metadata.get("interests", []),
            }
        )

    elif doc.type == DocumentType.MEDIA:
        row.update(
            {
                "filename": doc.internal_metadata.get("filename"),
                "mime_type": doc.internal_metadata.get("mime_type"),
                "media_type": doc.internal_metadata.get("media_type"),
                "phash": doc.internal_metadata.get("phash"),
            }
        )

    elif doc.type == DocumentType.JOURNAL:
        row.update(
            {
                "title": doc.metadata.get("title") or doc.metadata.get("window_label"),
                "window_start": doc.internal_metadata.get("window_start"),
                "window_end": doc.internal_metadata.get("window_end"),
            }
        )

    # DocumentType.ANNOTATION (and any other type) is stored with the base
    # mapping only; there are no type-specific columns to add.

    # Only the write itself is wrapped, so callers see a single domain error
    # for storage failures.
    try:
        self.db.replace_rows("documents", [row], by_keys={"id": row["id"]})
    except Exception as e:
        msg = f"Failed to save document {row['id']}: {e}"
        raise DatabaseOperationError(msg) from e

get_all

get_all() -> Iterator[Document]

Stream all documents from the unified table.

Source code in src/egregora/database/repository.py
def get_all(self) -> Iterator[Document]:
    """Yield every row of the documents table as a ``Document``.

    The table is read and turned into record dicts via
    ``execute().to_dict(orient="records")`` before the first item is yielded;
    each record is converted with ``_row_to_document`` as it is handed out.

    Raises:
        DatabaseOperationError: If reading the table or converting a row fails.
    """
    try:
        table = self.db.read_table("documents")
        records = table.execute().to_dict(orient="records")
        for record in records:
            # Hand back Document objects rather than raw dicts.
            yield self._row_to_document(record)
    except Exception as exc:
        message = f"Failed to get all documents: {exc}"
        raise DatabaseOperationError(message) from exc

get

get(doc_type: DocumentType, identifier: str) -> Document

Retrieve a single document by type and identifier.

Source code in src/egregora/database/repository.py
def get(self, doc_type: DocumentType, identifier: str) -> Document:
    """Fetch one document of the given type by its identifier.

    The identifier is matched against the ``id`` column first. For posts
    only, a miss on ``id`` is followed by a lookup on the ``slug`` column.

    Args:
        doc_type: Type the document row must have.
        identifier: Document id, or a slug when ``doc_type`` is a post.

    Returns:
        The matching row converted with ``_row_to_document``.

    Raises:
        DocumentNotFoundError: If no row matches.
        DatabaseOperationError: If any other error occurs during the lookup.
    """
    try:
        table = self.db.read_table("documents")

        def first_match(column: str):
            # At most one row of the requested type whose `column` equals the identifier.
            condition = (table.doc_type == doc_type.value) & (getattr(table, column) == identifier)
            return table.filter(condition).limit(1).execute()

        # Only posts may also be addressed by slug.
        slug_fallback = doc_type == DocumentType.POST

        found = first_match("id")
        if slug_fallback and found.empty:
            found = first_match("slug")

        if found.empty:
            raise DocumentNotFoundError(doc_type.value, identifier)

        record = found.to_dict(orient="records")[0]
        return self._row_to_document(record)
    except DocumentNotFoundError:
        # Not-found is part of the contract; let it through unwrapped.
        raise
    except Exception as exc:
        # Everything else (query-layer errors included) becomes a domain error.
        message = f"Failed to get document: {exc}"
        raise DatabaseOperationError(message) from exc

list

list(doc_type: DocumentType | None = None) -> Iterator[dict[str, Any]]

List document metadata.

Source code in src/egregora/database/repository.py
def list(self, doc_type: DocumentType | None = None) -> Iterator[dict[str, Any]]:
    """Yield rows of the documents table as plain dictionaries.

    Args:
        doc_type: When given, only rows whose ``doc_type`` column equals
            ``doc_type.value`` are yielded; otherwise every row is.

    Raises:
        DatabaseOperationError: If reading or querying the table fails.
            There is no fallback to another table.
    """
    try:
        query = self.db.read_table("documents")
        if doc_type:
            query = query.filter(query.doc_type == doc_type.value)

        records = query.execute().to_dict(orient="records")
        yield from records
    except Exception as exc:
        # Any failure, including a missing table, surfaces as a domain error.
        message = f"Failed to list documents: {exc}"
        raise DatabaseOperationError(message) from exc