Persistence

Database layer: SQLAlchemy models, session management, and repositories for users, experiments, streams, and control sets. Used by the HTTP layer and background jobs.

Overview

  • ORM Models — User, ControlSet, and related models. Map to PostgreSQL tables.

  • Session — Async engine, session factory, init/close lifecycle.

  • Repositories — UserRepository, StreamRepository, ControlSetRepository. Encapsulate queries and CRUD.

Design Decisions

Async SQLAlchemy

All database operations use async sessions with asyncpg, an asyncio-native PostgreSQL driver. Queries are awaited, so database I/O does not block the event loop. This matters for SSE streaming, where many concurrent connections share the same process.
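The following stdlib-only simulation shows why awaiting database I/O matters. `asyncio.sleep` stands in for an awaited asyncpg query; that substitution is an assumption for illustration. `fake_query` and `serve_streams` are hypothetical names. Each coroutine yields to the event loop while it waits, so ten concurrent "connections" finish in roughly the time of one query rather than ten.

```python
import asyncio
import time

QUERY_LATENCY = 0.1  # seconds; stand-in for a round trip to PostgreSQL


async def fake_query(stream_id: int) -> str:
    # Hypothetical stand-in for `await session.execute(...)`: awaiting here
    # hands control back to the event loop instead of blocking it.
    await asyncio.sleep(QUERY_LATENCY)
    return f"projection for stream {stream_id}"


async def serve_streams(n: int) -> list[str]:
    # Simulate n SSE connections in one process, each issuing one query.
    return await asyncio.gather(*(fake_query(i) for i in range(n)))


def timed_run(n: int) -> tuple[list[str], float]:
    start = time.perf_counter()
    results = asyncio.run(serve_streams(n))
    return results, time.perf_counter() - start


results, elapsed = timed_run(10)
print(f"{len(results)} queries in {elapsed:.2f}s")  # close to QUERY_LATENCY, not 10x it
```

A blocking driver called from the same coroutines would serialize the ten waits. That is the situation the async engine avoids.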

Repository pattern

Each domain entity (users, streams, control sets) has its own repository class that encapsulates queries. Services never construct raw SQL — they call repository methods. This makes testing easier (mock the repository, not the database) and keeps SQL details out of business logic.

Alembic for migrations

Schema migrations use Alembic with async-compatible migration scripts. create_all is retained for development convenience (fresh databases), but production-like environments should use Alembic to apply schema changes incrementally.

UUID primary keys

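Users, streams, and control sets use UUID primary keys (via the custom GUID type decorator). These identifiers are globally unique and non-sequential. They can be generated anywhere without coordination, and they do not leak information such as row counts or creation order the way sequential IDs do. Some tables (ExperimentRow, GeneSetRow, Operation) use string identifiers instead.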
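This short stdlib illustration uses `uuid.uuid4()` to show coordination-free generation and the 36-character text form. Whether the models generate IDs with `uuid4` specifically is an assumption; the text form is the same for every UUID version.

```python
import uuid

# Two "workers" generate keys independently, with no shared counter or DB sequence.
worker_a = [uuid.uuid4() for _ in range(1000)]
worker_b = [uuid.uuid4() for _ in range(1000)]

all_ids = worker_a + worker_b
assert len(set(all_ids)) == len(all_ids)  # collisions are astronomically unlikely

# The canonical text form is 36 characters (32 hex digits + 4 hyphens),
# which is why a CHAR(36) column can hold it.
text = str(worker_a[0])
print(len(text))  # 36
assert uuid.UUID(text) == worker_a[0]  # round-trips losslessly
```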

Note

Schema migrations use Alembic (see alembic/versions/). create_all is retained for development convenience on fresh databases.

ORM Models

Purpose: SQLAlchemy models for users, control sets, experiments, streams, gene sets, and operations. Define the schema and relationships.

Key classes: User, ControlSet, ExperimentRow, Stream

SQLAlchemy ORM models.

class veupath_chatbot.persistence.models.GUID(*args, **kwargs)[source]

Bases: TypeDecorator

Platform-independent GUID type.

Uses CHAR(36) and stores UUIDs as strings. Returns proper UUID objects on read so that Python-side comparisons (e.g. stream.user_id == some_uuid) work correctly.

impl

alias of CHAR

cache_ok = True

Indicate if statements using this ExternalType are “safe to cache”.

The default value None will emit a warning and then not allow caching of a statement which includes this type. Set to False to disable statements using this type from being cached at all without a warning. When set to True, the object’s class and selected elements from its state will be used as part of the cache key. For example, using a TypeDecorator:

from sqlalchemy.types import String, TypeDecorator

class MyType(TypeDecorator):
    impl = String

    cache_ok = True

    def __init__(self, choices):
        self.choices = tuple(choices)
        self.internal_only = True

The cache key for the above type would be equivalent to:

>>> MyType(["a", "b", "c"])._static_cache_key
(<class '__main__.MyType'>, ('choices', ('a', 'b', 'c')))

The caching scheme will extract attributes from the type that correspond to the names of parameters in the __init__() method. Above, the “choices” attribute becomes part of the cache key but “internal_only” does not, because there is no parameter named “internal_only”.

The requirements for cacheable elements are that they are hashable and that they produce the same rendered SQL for expressions using this type every time for a given cache value.

To accommodate datatypes that refer to unhashable structures such as dictionaries, sets and lists, these objects can be made "cacheable" by assigning hashable structures to the attributes whose names correspond with the names of the arguments. For example, a datatype that accepts a dictionary of lookup values may publish this as a sorted series of tuples. Given a previously un-cacheable type as:

from sqlalchemy.types import UserDefinedType

class LookupType(UserDefinedType):
    """a custom type that accepts a dictionary as a parameter.

    this is the non-cacheable version, as "self.lookup" is not
    hashable.

    """

    def __init__(self, lookup):
        self.lookup = lookup

    def get_col_spec(self, **kw):
        return "VARCHAR(255)"

    def bind_processor(self, dialect): ...  # works with "self.lookup" ...

Where “lookup” is a dictionary. The type will not be able to generate a cache key:

>>> type_ = LookupType({"a": 10, "b": 20})
>>> type_._static_cache_key
<stdin>:1: SAWarning: UserDefinedType LookupType({'a': 10, 'b': 20}) will not
produce a cache key because the ``cache_ok`` flag is not set to True.
Set this flag to True if this type object's state is safe to use
in a cache key, or False to disable this warning.
symbol('no_cache')

If we did set up such a cache key, it wouldn’t be usable. We would get a tuple structure that contains a dictionary inside of it, which cannot itself be used as a key in a “cache dictionary” such as SQLAlchemy’s statement cache, since Python dictionaries aren’t hashable:

>>> # set cache_ok = True
>>> type_.cache_ok = True

>>> # this is the cache key it would generate
>>> key = type_._static_cache_key
>>> key
(<class '__main__.LookupType'>, ('lookup', {'a': 10, 'b': 20}))

>>> # however this key is not hashable, will fail when used with
>>> # SQLAlchemy statement cache
>>> some_cache = {key: "some sql value"}
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'dict'

The type may be made cacheable by assigning a sorted tuple of tuples to the “.lookup” attribute:

from sqlalchemy.types import UserDefinedType

class LookupType(UserDefinedType):
    """a custom type that accepts a dictionary as a parameter.

    The dictionary is stored both as itself in a private variable,
    and published in a public variable as a sorted tuple of tuples,
    which is hashable and will also return the same value for any
    two equivalent dictionaries.  Note it assumes the keys and
    values of the dictionary are themselves hashable.

    """

    cache_ok = True

    def __init__(self, lookup):
        self._lookup = lookup

        # assume keys/values of "lookup" are hashable; otherwise
        # they would also need to be converted in some way here
        self.lookup = tuple((key, lookup[key]) for key in sorted(lookup))

    def get_col_spec(self, **kw):
        return "VARCHAR(255)"

    def bind_processor(self, dialect): ...  # works with "self._lookup" ...

Where above, the cache key for LookupType({"a": 10, "b": 20}) will be:

>>> LookupType({"a": 10, "b": 20})._static_cache_key
(<class '__main__.LookupType'>, ('lookup', (('a', 10), ('b', 20))))

Added in version 1.4.14: added the cache_ok flag to allow some configurability of caching for TypeDecorator classes.

Added in version 1.4.28: added the ExternalType mixin, which generalizes the cache_ok flag to both the TypeDecorator and UserDefinedType classes.

load_dialect_impl(dialect)[source]

Return a TypeEngine object corresponding to a dialect.

This is an end-user override hook that can be used to provide differing types depending on the given dialect. It is used by the TypeDecorator implementation of type_engine() to help determine what type should ultimately be returned for a given TypeDecorator.

By default returns self.impl.

Return type:

TypeEngine

process_bind_param(value, dialect)[source]

Receive a bound parameter value to be converted.

Custom subclasses of TypeDecorator should override this method to provide custom behaviors for incoming data values. This method is called at statement execution time and is passed the literal Python data value which is to be associated with a bound parameter in the statement.

The operation could be anything desired to perform custom behavior, such as transforming or serializing data. This could also be used as a hook for validating logic.

Parameters:
  • value (UUID | str | None) – Data to operate upon, of any type expected by this method in the subclass. Can be None.

  • dialect (Dialect) – the Dialect in use.

Return type:

str | None

See also

Augmenting Existing Types

TypeDecorator.process_result_value()

process_result_value(value, dialect)[source]

Receive a result-row column value to be converted.

Custom subclasses of TypeDecorator should override this method to provide custom behaviors for data values being received in result rows coming from the database. This method is called at result fetching time and is passed the literal Python data value that's extracted from a database result row.

The operation could be anything desired to perform custom behavior, such as transforming or deserializing data.

Parameters:
  • value (str | UUID | None) – Data to operate upon, of any type expected by this method in the subclass. Can be None.

  • dialect (Dialect) – the Dialect in use.

Return type:

UUID | None

See also

Augmenting Existing Types

TypeDecorator.process_bind_param()

class veupath_chatbot.persistence.models.Base(**kwargs)[source]

Bases: DeclarativeBase

Base class for all models.

type_annotation_map = {uuid.UUID: GUID, JSONArray: sqlalchemy.JSON, JSONObject: sqlalchemy.JSON}

Maps Python annotation types to column types. Attributes annotated Mapped[UUID] are stored with the GUID type above. Attributes annotated JSONArray or JSONObject are stored as JSON.

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

metadata: ClassVar[MetaData] = MetaData()

Refers to the MetaData collection that will be used for new Table objects.

registry: ClassVar[registry] = <sqlalchemy.orm.decl_api.registry object>

Refers to the registry in use where new Mapper objects will be associated.

class veupath_chatbot.persistence.models.User(**kwargs)[source]

Bases: Base

User model for tracking strategies.

id: Mapped[UUID]
external_id: Mapped[str | None]
created_at: Mapped[datetime]
streams: Mapped[list[Stream]]

class veupath_chatbot.persistence.models.ControlSet(**kwargs)[source]

Bases: Base

Reusable control gene set with provenance metadata.

id: Mapped[UUID]
user_id: Mapped[UUID | None]
name: Mapped[str]
site_id: Mapped[str]
record_type: Mapped[str]
positive_ids: Mapped[JSONArray]
negative_ids: Mapped[JSONArray]
source: Mapped[str | None]
tags: Mapped[JSONArray]
provenance_notes: Mapped[str | None]
version: Mapped[int]
is_public: Mapped[bool]
created_at: Mapped[datetime]

class veupath_chatbot.persistence.models.ExperimentRow(**kwargs)[source]

Bases: Base

Persisted experiment with full JSON blob.

id: Mapped[str]
site_id: Mapped[str]
user_id: Mapped[str | None]
name: Mapped[str]
status: Mapped[str]
data: Mapped[JSONObject]
batch_id: Mapped[str | None]
benchmark_id: Mapped[str | None]
created_at: Mapped[datetime]
updated_at: Mapped[datetime]

class veupath_chatbot.persistence.models.Stream(**kwargs)[source]

Bases: Base

A conversation stream — the identity of a chat conversation.

All mutable state is derived from events in Redis. This table only holds identity and ownership.

id: Mapped[UUID]
user_id: Mapped[UUID]
site_id: Mapped[str]
experiment_id: Mapped[str | None]
created_at: Mapped[datetime]
user: Mapped[User]

class veupath_chatbot.persistence.models.StreamProjection(**kwargs)[source]

Bases: Base

Materialized projection of a conversation stream.

Derived from events — rebuildable by replaying the Redis stream. This is a CACHE for fast reads, not a source of truth.

stream_id: Mapped[UUID]
name: Mapped[str]
record_type: Mapped[str | None]
wdk_strategy_id: Mapped[int | None]
is_saved: Mapped[bool]
model_id: Mapped[str | None]
message_count: Mapped[int]
step_count: Mapped[int]
plan: Mapped[JSONObject]
steps: Mapped[JSONArray]
root_step_id: Mapped[str | None]
result_count: Mapped[int | None]
last_event_id: Mapped[str | None]
updated_at: Mapped[datetime]
dismissed_at: Mapped[datetime | None]
site_id: Mapped[str]
gene_set_id: Mapped[str | None]
gene_set_auto_imported: Mapped[bool]
stream: Mapped[Stream]

class veupath_chatbot.persistence.models.GeneSetRow(**kwargs)[source]

Bases: Base

Persisted gene set for workbench analysis.

id: Mapped[str]
user_id: Mapped[str | None]
site_id: Mapped[str]
name: Mapped[str]
gene_ids: Mapped[JSONArray]
source: Mapped[str]
wdk_strategy_id: Mapped[int | None]
wdk_step_id: Mapped[int | None]
search_name: Mapped[str | None]
record_type: Mapped[str | None]
parameters: Mapped[JSONObject | None]
parent_set_ids: Mapped[JSONArray]
operation: Mapped[str | None]
step_count: Mapped[int]
created_at: Mapped[datetime]

class veupath_chatbot.persistence.models.Operation(**kwargs)[source]

Bases: Base

Tracks active and completed operations for client discovery.

operation_id: Mapped[str]
stream_id: Mapped[UUID]
type: Mapped[str]
status: Mapped[str]
created_at: Mapped[datetime]
completed_at: Mapped[datetime | None]
stream: Mapped[Stream]

Session Management

Purpose: Async engine, session factory, and lifecycle hooks (init, close). Used by FastAPI dependency injection and background tasks.

Key functions: get_db_session(), init_db(), close_db()

SQLAlchemy async engine and session management.

veupath_chatbot.persistence.session.get_engine()[source]

Get the lazily-initialized async engine.

Return type:

AsyncEngine

veupath_chatbot.persistence.session.async_session_factory()[source]

Create a new async session from the lazily-initialized factory.

Return type:

AsyncSession

async veupath_chatbot.persistence.session.get_db_session()[source]

FastAPI dependency that yields a database session (an AsyncSession) for use in request handlers.

Return type:

AsyncGenerator[AsyncSession]

async veupath_chatbot.persistence.session.init_db()[source]

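Initialize the database by creating all tables from the ORM models (create_all). This is intended for fresh development databases; production-like environments apply schema changes with Alembic.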

async veupath_chatbot.persistence.session.close_db()[source]

Close database connections.

Repositories

Purpose: Data access layer. Encapsulates queries and CRUD operations. Split into domain-specific repository modules.

User repository.

class veupath_chatbot.persistence.repositories.user.UserRepository(session)[source]

Bases: object

User CRUD operations.

__init__(session)[source]
async get_by_id(user_id)[source]

Get user by ID.

Return type:

User | None

async get_or_create(user_id)[source]

Get the existing user with this ID, or create one if none exists.

Return type:

User

async get_or_create_by_external_id(external_id)[source]

Look up a user by external identity (e.g. VEuPathDB email).

Creates a new row if none exists yet, avoiding race conditions with INSERT ... ON CONFLICT.

Return type:

User
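Here is a minimal sketch of the insert-or-ignore-then-select approach behind `get_or_create_by_external_id`. It uses the stdlib `sqlite3` module with an in-memory database as a stand-in for PostgreSQL. The `ON CONFLICT (column) DO NOTHING` clause has the same form in both databases (SQLite 3.24+). The `users` table shape is a hypothetical simplification. In SQLAlchemy, the PostgreSQL form can be written with `sqlalchemy.dialects.postgresql.insert(...).on_conflict_do_nothing(...)`, though this page does not say which form the repository uses.

```python
import sqlite3
import uuid


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    # The UNIQUE constraint on external_id is what makes ON CONFLICT fire.
    conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, external_id TEXT UNIQUE)")
    return conn


def get_or_create_by_external_id(conn: sqlite3.Connection, external_id: str) -> str:
    # Try to insert; if a row with this external_id already exists
    # (possibly inserted concurrently), silently do nothing.
    conn.execute(
        "INSERT INTO users (id, external_id) VALUES (?, ?) "
        "ON CONFLICT(external_id) DO NOTHING",
        (str(uuid.uuid4()), external_id),
    )
    # Either way exactly one matching row now exists; read its id back.
    row = conn.execute(
        "SELECT id FROM users WHERE external_id = ?", (external_id,)
    ).fetchone()
    return row[0]


conn = make_db()
first = get_or_create_by_external_id(conn, "alice@example.org")
second = get_or_create_by_external_id(conn, "alice@example.org")
print(first == second)  # True
```

A plain "SELECT, then INSERT if missing" leaves a window in which two requests both see no row and both insert. Letting the unique constraint arbitrate removes that window.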

async create(external_id=None)[source]

Create a new user.

Return type:

User

Repository for stream (conversation) identity and projections.

class veupath_chatbot.persistence.repositories.stream.StreamRepository(session)[source]

Bases: object

Data access for conversation streams and their projections.

__init__(session)[source]
async create(user_id, site_id, *, stream_id=None, name='', experiment_id=None)[source]
Return type:

Stream

async get_by_id(stream_id)[source]
Return type:

Stream | None

async find_by_experiment(user_id, experiment_id)[source]

Find an existing stream for a user + experiment combination.

Return type:

Stream | None

async delete(stream_id)[source]
async get_projection(stream_id)[source]
Return type:

StreamProjection | None

async list_projections(user_id, site_id=None, limit=50)[source]
Return type:

list[StreamProjection]

async get_by_wdk_strategy_id(user_id, wdk_strategy_id)[source]
Return type:

StreamProjection | None

async update_projection(stream_id, *, name=None, record_type=None, wdk_strategy_id=None, wdk_strategy_id_set=False, is_saved=None, is_saved_set=False, plan=None, step_count=None, result_count=None, result_count_set=False, gene_set_id=None, gene_set_id_set=False, gene_set_auto_imported=None)[source]

Dynamically update a StreamProjection based on provided kwargs.

Steps and root_step_id are derived from plan at read time; only plan and a denormalized step_count are persisted on write.

async dismiss(stream_id)[source]

Soft-delete: mark a projection as dismissed (hidden from main list).

async restore(stream_id)[source]

Un-dismiss: restore a dismissed projection and reset it for a fresh WDK import.

async list_dismissed_projections(user_id, site_id=None, limit=50)[source]
Return type:

list[StreamProjection]

async prune_wdk_orphans(user_id, site_id, live_wdk_ids)[source]

Delete streams whose projections have wdk_strategy_id not in the live set.

Returns the number of pruned streams.

Return type:

int

async register_operation(operation_id, stream_id, op_type)[source]
Return type:

Operation

async complete_operation(operation_id)[source]
async fail_operation(operation_id)[source]
async cancel_operation(operation_id)[source]
async get_active_operations(stream_id)[source]
Return type:

list[Operation]

async list_active_operations(op_type=None)[source]
Return type:

list[Operation]

Control set repository.

class veupath_chatbot.persistence.repositories.control_set.ControlSetRepository(session)[source]

Bases: object

Control set CRUD operations.

__init__(session)[source]
async get_by_id(control_set_id)[source]

Get control set by ID.

Return type:

ControlSet | None

async list_by_site(site_id, user_id=None, tags=None, limit=100)[source]

List control sets for a site, including both public sets and the user's own sets.

Return type:

list[ControlSet]

async create(*, name, site_id, record_type, positive_ids, negative_ids, source=None, tags=None, provenance_notes=None, is_public=False, user_id=None)[source]

Create a new control set.

Return type:

ControlSet

async delete(control_set_id)[source]

Delete a control set.

Return type:

bool