Skip to content

Database Schemas

schemas

Database schema definitions for Egregora V2 (Clean Break).

This module defines the strictly typed, append-only tables for the new architecture.

create_table_if_not_exists

create_table_if_not_exists(
    conn: DatabaseConnection,
    table_name: str,
    schema: Schema,
    *,
    overwrite: bool = False,
    check_constraints: dict[str, str] | None = None,
    primary_key: str | list[str] | None = None,
    foreign_keys: list[str] | None = None,
) -> None

Create a table using Ibis if it doesn't already exist.

Parameters:

Name Type Description Default
conn DatabaseConnection

Database connection (Ibis or raw DuckDB)

required
table_name str

Name of the table to create

required
schema Schema

Ibis schema definition

required
overwrite bool

If True, drop existing table first

False
check_constraints dict[str, str] | None

Optional dict mapping constraint_name -> check_expression. Example: {"chk_status": "status IN ('draft', 'published')"}

None
primary_key str | list[str] | None

Optional column name (or list of names) to set as PRIMARY KEY

None
foreign_keys list[str] | None

Optional list of foreign key constraint strings. Example: ["FOREIGN KEY (parent_id) REFERENCES documents(id)"]

None
Source code in src/egregora/database/schemas.py
def create_table_if_not_exists(
    conn: DatabaseConnection,
    table_name: str,
    schema: ibis.Schema,
    *,
    overwrite: bool = False,
    check_constraints: dict[str, str] | None = None,
    primary_key: str | list[str] | None = None,
    foreign_keys: list[str] | None = None,
) -> None:
    """Create a table using Ibis if it doesn't already exist.

    Two code paths exist, selected by the connection type:

    * Raw ``duckdb.DuckDBPyConnection``: a single ``CREATE TABLE`` statement is
      built that declares the columns together with the PRIMARY KEY,
      FOREIGN KEY and CHECK clauses.
    * Anything else (treated as an Ibis backend): the table is created through
      ``create_table`` only when it is missing or ``overwrite`` is set, then the
      primary key and CHECK constraints are applied through ``add_primary_key``
      and ``add_check_constraint``. Foreign keys are not applied on this path;
      a warning is logged when any were requested.

    Args:
        conn: Database connection (Ibis or raw DuckDB)
        table_name: Name of the table to create
        schema: Ibis schema definition
        overwrite: If True, drop existing table first
        check_constraints: Optional dict of constraint_name -> check_expression
                          Example: {"chk_status": "status IN ('draft', 'published')"}
        primary_key: Optional column name (or list of names) to set as PRIMARY KEY
        foreign_keys: Optional list of foreign key constraint strings
                      Example: ["FOREIGN KEY (parent_id) REFERENCES documents(id)"]

    Note:
        ``check_constraints`` expressions and ``foreign_keys`` clauses are
        interpolated into the SQL verbatim (only identifiers are quoted), so
        they must come from trusted code, never from user input.

    """
    # Explicitly check if the connection is a raw DuckDB connection.
    # This is more reliable than duck-typing with hasattr.
    if isinstance(conn, duckdb.DuckDBPyConnection):
        # Raw duckdb connection - build the CREATE TABLE statement with constraints.
        if overwrite:
            conn.execute(f"DROP TABLE IF EXISTS {quote_identifier(table_name)}")

        columns_sql = ", ".join(
            f"{quote_identifier(name)} {ibis_to_duckdb_type(dtype)}" for name, dtype in schema.items()
        )

        all_clauses = [columns_sql]

        # Add PRIMARY KEY
        if primary_key:
            if isinstance(primary_key, list):
                pk_cols = ", ".join(quote_identifier(c) for c in primary_key)
            else:
                pk_cols = quote_identifier(primary_key)
            all_clauses.append(f"PRIMARY KEY ({pk_cols})")

        # Add FOREIGN KEYs (already complete clause strings, appended as-is)
        if foreign_keys:
            all_clauses.extend(foreign_keys)

        # Add CHECK constraints to CREATE TABLE statement
        if check_constraints:
            for constraint_name, check_expr in check_constraints.items():
                all_clauses.append(f"CONSTRAINT {quote_identifier(constraint_name)} CHECK ({check_expr})")

        # After an explicit DROP the plain form is used; otherwise creation is a no-op
        # when the table is already present.
        create_verb = "CREATE TABLE" if overwrite else "CREATE TABLE IF NOT EXISTS"
        create_sql = f"{create_verb} {quote_identifier(table_name)} ({', '.join(all_clauses)})"
        conn.execute(create_sql)
    else:
        # Assume Ibis connection for all other cases.
        # We need to cast because mypy doesn't fully narrow the negative case of isinstance
        # when the other type is behind a TYPE_CHECKING guard or Any-like.
        ibis_conn = cast("BaseBackend", conn)

        if table_name not in ibis_conn.list_tables() or overwrite:
            ibis_conn.create_table(table_name, schema=schema, overwrite=overwrite)
            # Constraints cannot be declared through Ibis' create_table, so they are
            # applied afterwards through the ALTER TABLE helpers.
            if primary_key:
                add_primary_key(conn, table_name, primary_key)
            if check_constraints:
                for constraint_name, check_expr in check_constraints.items():
                    add_check_constraint(conn, table_name, constraint_name, check_expr)
            if foreign_keys:
                # Previously these were dropped without any signal; surface it so callers
                # know referential integrity is not enforced for this table.
                logger.warning(
                    "Foreign keys for table %s were not applied on the Ibis connection path; "
                    "pass a raw DuckDB connection to declare them in CREATE TABLE",
                    table_name,
                )

ibis_to_duckdb_type

ibis_to_duckdb_type(ibis_type: DataType) -> str

Convert Ibis data type to DuckDB SQL type string.

Parameters:

Name Type Description Default
ibis_type DataType

Ibis data type

required

Returns:

Type Description
str

DuckDB SQL type string

Source code in src/egregora/database/schemas.py
def ibis_to_duckdb_type(ibis_type: ibis.expr.datatypes.DataType) -> str:
    """Translate an Ibis data type into the matching DuckDB SQL type name.

    The type is probed through its ``is_*`` predicate methods; the first
    predicate that exists, is callable and returns a truthy value decides the
    result. Array types are translated recursively from their ``value_type``.
    Anything unrecognised falls back to the upper-cased ``str()`` of the type.

    Args:
        ibis_type: Ibis data type

    Returns:
        DuckDB SQL type string

    """

    def satisfies(predicate_name: str) -> bool:
        # A missing or non-callable attribute simply counts as "no match".
        check = getattr(ibis_type, predicate_name, None)
        return callable(check) and bool(check())

    # Ordered (predicate, SQL type) pairs. Ibis dtypes are value objects (not classes)
    # in 9.x, which is why predicates are used here rather than isinstance checks.
    scalar_sql_types = (
        ("is_timestamp", "TIMESTAMP WITH TIME ZONE"),
        ("is_date", "DATE"),
        ("is_string", "VARCHAR"),
        ("is_int64", "BIGINT"),
        ("is_int32", "INTEGER"),
        ("is_float64", "DOUBLE PRECISION"),
        ("is_float32", "FLOAT"),
        ("is_boolean", "BOOLEAN"),
        ("is_binary", "BLOB"),
        ("is_uuid", "UUID"),
        ("is_json", "JSON"),
    )
    for predicate_name, duckdb_type in scalar_sql_types:
        if satisfies(predicate_name):
            return duckdb_type

    # Nested type: an array is spelled as "<element type>[]".
    if satisfies("is_array"):
        return f"{ibis_to_duckdb_type(ibis_type.value_type)}[]"

    # Last resort: reuse the type's own textual form.
    return str(ibis_type).upper()

add_primary_key

add_primary_key(
    conn: DatabaseConnection, table_name: str, column_name: str | list[str]
) -> None

Add a primary key constraint to an existing table.

Parameters:

Name Type Description Default
conn DatabaseConnection

Database connection (Ibis or raw DuckDB)

required
table_name str

Name of the table

required
column_name str | list[str]

Column(s) to use as primary key

required
Note

DuckDB requires ALTER TABLE for primary key constraints.

Source code in src/egregora/database/schemas.py
def add_primary_key(conn: DatabaseConnection, table_name: str, column_name: str | list[str]) -> None:
    """Attach a PRIMARY KEY constraint to a table that already exists.

    Issues ``ALTER TABLE ... ADD CONSTRAINT pk_<table_name> PRIMARY KEY (...)``
    with every identifier quoted.

    Args:
        conn: Database connection (Ibis or raw DuckDB)
        table_name: Name of the table
        column_name: Column(s) to use as primary key

    Note:
        Best effort: a ``duckdb.Error`` or ``RuntimeError`` raised while building
        or running the statement is logged at debug level and suppressed.

    """
    try:
        target_table = quote_identifier(table_name)
        constraint = quote_identifier(f"pk_{table_name}")

        # Treat a single column as a one-element composite key.
        key_columns = column_name if isinstance(column_name, list) else [column_name]
        column_list = ", ".join(quote_identifier(col) for col in key_columns)

        _execute_on_connection(
            conn,
            f"ALTER TABLE {target_table} ADD CONSTRAINT {constraint} PRIMARY KEY ({column_list})",
        )
    except (duckdb.Error, RuntimeError) as exc:
        # The constraint may already be present; RuntimeError can surface from
        # Ibis backends on SQL errors. Either way this is non-fatal.
        logger.debug("Could not add primary key to %s.%s: %s", table_name, column_name, exc)

ensure_identity_column

ensure_identity_column(
    conn: DatabaseConnection,
    table_name: str,
    column_name: str,
    *,
    generated: str = "ALWAYS",
) -> None

Ensure a column is configured as an identity column in DuckDB.

Source code in src/egregora/database/schemas.py
def ensure_identity_column(
    conn: DatabaseConnection, table_name: str, column_name: str, *, generated: str = "ALWAYS"
) -> None:
    """Try to mark a column as an identity column.

    Runs ``ALTER TABLE ... ALTER COLUMN ... SET GENERATED <mode> AS IDENTITY``.

    Args:
        conn: Database connection (Ibis or raw DuckDB)
        table_name: Name of the table
        column_name: Column to configure as identity
        generated: Generation mode, either ``"ALWAYS"`` or ``"BY DEFAULT"``

    Raises:
        ValueError: If ``generated`` is not one of the two accepted modes.

    Note:
        Best effort: a ``duckdb.Error`` or ``RuntimeError`` from the statement is
        logged at debug level and suppressed.

    """
    allowed_modes = ("ALWAYS", "BY DEFAULT")
    if generated not in allowed_modes:
        error_text = f"Invalid identity generation mode: {generated!r}"
        raise ValueError(error_text)

    try:
        target_table = quote_identifier(table_name)
        target_column = quote_identifier(column_name)
        _execute_on_connection(
            conn,
            f"ALTER TABLE {target_table} ALTER COLUMN {target_column} SET GENERATED {generated} AS IDENTITY",
        )
    except (duckdb.Error, RuntimeError) as exc:
        # Non-fatal: the column may already be an identity column, or the backend
        # may not support the statement. RuntimeError covers non-DuckDB backends.
        logger.debug(
            "Could not set identity on %s.%s (generated=%s): %s", table_name, column_name, generated, exc
        )

create_index

create_index(
    conn: DatabaseConnection,
    table_name: str,
    index_name: str,
    column_name: str,
    index_type: str = "HNSW",
) -> None

Create an index on a table.

Parameters:

Name Type Description Default
conn DatabaseConnection

Database connection (Ibis or raw DuckDB)

required
table_name str

Name of the table

required
index_name str

Name for the index

required
column_name str

Column to index

required
index_type str

Type of index (HNSW for vector search, standard otherwise)

'HNSW'
Note

For vector columns, use index_type='HNSW' with cosine metric (optimized for 768-dim embeddings). Uses CREATE INDEX IF NOT EXISTS to handle already-existing indexes.

Source code in src/egregora/database/schemas.py
def create_index(
    conn: DatabaseConnection, table_name: str, index_name: str, column_name: str, index_type: str = "HNSW"
) -> None:
    """Create a single-column index unless one with that name already exists.

    Args:
        conn: Database connection (Ibis or raw DuckDB)
        table_name: Name of the table
        index_name: Name for the index
        column_name: Column to index
        index_type: ``"HNSW"`` builds an HNSW index with the cosine metric;
            any other value builds a standard index

    Note:
        The statement uses ``CREATE INDEX IF NOT EXISTS``, so repeating the call
        for an existing index is harmless. Errors from execution are not caught.

    """
    index_ident = quote_identifier(index_name)
    table_ident = quote_identifier(table_name)
    column_ident = quote_identifier(column_name)

    # Both variants share the same prefix; only the tail differs.
    statement = f"CREATE INDEX IF NOT EXISTS {index_ident} ON {table_ident}"
    if index_type == "HNSW":
        statement += f" USING HNSW ({column_ident}) WITH (metric = 'cosine')"
    else:
        statement += f" ({column_ident})"

    _execute_on_connection(conn, statement)

add_check_constraint

add_check_constraint(
    conn: DatabaseConnection,
    table_name: str,
    constraint_name: str,
    check_expression: str,
) -> None

Add a CHECK constraint to an existing table.

Parameters:

Name Type Description Default
conn DatabaseConnection

Database connection (Ibis or raw DuckDB)

required
table_name str

Name of the table

required
constraint_name str

Name for the constraint

required
check_expression str

SQL expression for the constraint (e.g., "status IN ('draft', 'published')")

required
Note

DuckDB requires ALTER TABLE for check constraints. Idempotent: silently succeeds if constraint already exists.

Source code in src/egregora/database/schemas.py
def add_check_constraint(
    conn: DatabaseConnection, table_name: str, constraint_name: str, check_expression: str
) -> None:
    """Attach a named CHECK constraint to a table that already exists.

    Issues ``ALTER TABLE ... ADD CONSTRAINT <name> CHECK (<expression>)``. The
    table and constraint names are quoted; the expression is inserted verbatim.

    Args:
        conn: Database connection (Ibis or raw DuckDB)
        table_name: Name of the table
        constraint_name: Name for the constraint
        check_expression: SQL expression for the constraint (e.g., "status IN ('draft', 'published')")

    Note:
        Best effort: a ``duckdb.Error`` or ``RuntimeError`` (for instance because
        the constraint already exists) is logged at debug level and suppressed.

    """
    try:
        target_table = quote_identifier(table_name)
        constraint = quote_identifier(constraint_name)
        _execute_on_connection(
            conn,
            f"ALTER TABLE {target_table} ADD CONSTRAINT {constraint} CHECK ({check_expression})",
        )
    except (duckdb.Error, RuntimeError) as exc:
        # Treated as non-fatal so repeated schema setup keeps working.
        logger.debug("Could not add CHECK constraint to %s: %s", table_name, exc)

get_table_check_constraints

get_table_check_constraints(table_name: str) -> dict[str, str]

Get CHECK constraints for a table based on business logic.

Parameters:

Name Type Description Default
table_name str

Name of the table

required

Returns:

Type Description
dict[str, str]

Dictionary mapping constraint names to CHECK expressions

Note

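This function defines business rules at the database level by specifying CHECK constraints for enum-like fields and for the columns each document type requires. Currently supports: posts.status (one of VALID_POST_STATUSES); tasks.status (one of VALID_TASK_STATUSES) and tasks.task_type (one of VALID_TASK_TYPES); annotations.parent_type (one of VALID_ANNOTATION_PARENT_TYPES); documents (required columns per doc_type, post status in VALID_POST_STATUSES, media type in VALID_MEDIA_TYPES); messages.media_type (NULL or one of VALID_MEDIA_TYPES); document_relations.relation_type (one of VALID_RELATION_TYPES). Any other table name yields an empty dictionary.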

Source code in src/egregora/database/schemas.py
def get_table_check_constraints(table_name: str) -> dict[str, str]:
    """Return the CHECK constraints that encode business rules for a table.

    Args:
        table_name: Name of the table

    Returns:
        Dictionary mapping constraint names to CHECK expressions; empty when
        the table has no constraints defined here.

    Note:
        Tables covered:
        - posts: status in VALID_POST_STATUSES
        - tasks: status in VALID_TASK_STATUSES, task_type in VALID_TASK_TYPES
        - annotations: parent_type in VALID_ANNOTATION_PARENT_TYPES
        - documents: required columns per doc_type, plus post status and
          media type membership
        - messages: media_type is NULL or in VALID_MEDIA_TYPES
        - document_relations: relation_type in VALID_RELATION_TYPES

    """

    def quoted_csv(values) -> str:
        # Render values as a comma-separated list of single-quoted SQL literals.
        return ", ".join(f"'{value}'" for value in values)

    if table_name == "posts":
        return {"chk_posts_status": f"status IN ({quoted_csv(VALID_POST_STATUSES)})"}

    if table_name == "tasks":
        status_list = quoted_csv(VALID_TASK_STATUSES)
        task_type_list = quoted_csv(VALID_TASK_TYPES)
        return {
            "chk_tasks_status": f"status IN ({status_list})",
            "chk_tasks_task_type": f"task_type IN ({task_type_list})",
        }

    if table_name == "annotations":
        parent_type_list = quoted_csv(VALID_ANNOTATION_PARENT_TYPES)
        return {"chk_annotations_parent_type": f"parent_type IN ({parent_type_list})"}

    if table_name == "documents":
        post_status_list = quoted_csv(VALID_POST_STATUSES)
        media_type_list = quoted_csv(VALID_MEDIA_TYPES)
        # Each rule has the shape "(not this doc_type) OR (requirement)", so it only
        # constrains rows of the matching doc_type.
        return {
            "chk_doc_post_req": "(doc_type != 'post') OR (title IS NOT NULL AND slug IS NOT NULL AND status IS NOT NULL)",
            "chk_doc_post_status": f"(doc_type != 'post') OR (status IN ({post_status_list}))",
            "chk_doc_profile_req": "(doc_type != 'profile') OR (title IS NOT NULL AND subject_uuid IS NOT NULL)",
            "chk_doc_journal_req": "(doc_type != 'journal') OR (title IS NOT NULL AND window_start IS NOT NULL AND window_end IS NOT NULL)",
            "chk_doc_media_req": "(doc_type != 'media') OR (filename IS NOT NULL)",
            "chk_doc_media_type": f"(doc_type != 'media') OR (media_type IN ({media_type_list}))",
        }

    if table_name == "messages":
        media_type_list = quoted_csv(VALID_MEDIA_TYPES)
        return {"chk_messages_media_type": f"(media_type IS NULL) OR (media_type IN ({media_type_list}))"}

    if table_name == "document_relations":
        relation_type_list = quoted_csv(VALID_RELATION_TYPES)
        return {"chk_doc_relations_type": f"relation_type IN ({relation_type_list})"}

    return {}

get_table_foreign_keys

get_table_foreign_keys(table_name: str) -> list[str]

Get Foreign Key constraints for a table.

Parameters:

Name Type Description Default
table_name str

Name of the table

required

Returns:

Name Type Description
list[str]

List of FOREIGN KEY clause strings.

Example list[str]

["FOREIGN KEY (parent_id) REFERENCES documents(id)"]

Source code in src/egregora/database/schemas.py
def get_table_foreign_keys(table_name: str) -> list[str]:
    """Return the FOREIGN KEY clauses declared for a table.

    Args:
        table_name: Name of the table

    Returns:
        List of FOREIGN KEY clause strings; empty when the table has none.
        Example: ["FOREIGN KEY (parent_id) REFERENCES documents(id)"]

    """
    # Built on every call so each caller receives its own list objects.
    clauses_by_table: dict[str, list[str]] = {
        "annotations": ["FOREIGN KEY (parent_id) REFERENCES documents(id)"],
        "document_relations": [
            "FOREIGN KEY (source_id) REFERENCES documents(id)",
            "FOREIGN KEY (target_id) REFERENCES documents(id)",
        ],
        "entity_aliases": ["FOREIGN KEY (target_id) REFERENCES documents(id)"],
    }
    return clauses_by_table.get(table_name, [])