Persistence
Database layer: SQLAlchemy models, session management, and repositories for users, experiments, streams, and control sets. Used by the HTTP layer and background jobs.
Overview
ORM Models — User, ControlSet, and related models. Map to PostgreSQL tables.
Session — Async engine, session factory, init/close lifecycle.
Repositories — UserRepository, StreamRepository, ControlSetRepository. Encapsulate queries and CRUD.
Design Decisions
Async SQLAlchemy
All database operations use async sessions with
asyncpg, an asyncio-native PostgreSQL driver. Queries are awaited, so database
I/O does not block the event loop, which matters for SSE streaming where many
concurrent connections share the same process.
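The effect of a blocking call on other connections can be illustrated with the standard library alone. This is a minimal sketch, not project code: `heartbeat` stands in for an SSE connection, and `blocking_query` / `async_query` are hypothetical stand-ins for a synchronous driver call and an awaited asyncpg call.

```python
import asyncio
import time


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    """Stand-in for an SSE connection that should keep emitting events."""
    while not stop.is_set():
        ticks.append(1)
        await asyncio.sleep(0.01)


async def blocking_query() -> None:
    # Hypothetical sync driver call: holds the event loop for the whole wait.
    time.sleep(0.2)


async def async_query() -> None:
    # Hypothetical async driver call: yields to the event loop while waiting.
    await asyncio.sleep(0.2)


async def count_ticks_during(query) -> int:
    """Run one query and count how often the heartbeat ran meanwhile."""
    ticks: list = []
    stop = asyncio.Event()
    task = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat emit its first tick
    await query()
    stop.set()
    await task
    return len(ticks)


blocked_ticks = asyncio.run(count_ticks_during(blocking_query))
async_ticks = asyncio.run(count_ticks_during(async_query))
print(f"heartbeat ticks: blocking={blocked_ticks}, async={async_ticks}")
```

With the blocking stand-in the heartbeat stalls after its first tick; with the awaited one it keeps running for the duration of the query.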
Repository pattern
Each domain entity (users, streams, control sets) has its own repository class that encapsulates queries. Services never construct raw SQL — they call repository methods. This makes testing easier (mock the repository, not the database) and keeps SQL details out of business logic.
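A minimal sketch of the testing benefit, assuming a hypothetical `StreamService` (not part of the documented API) that depends only on the repository's `find_by_experiment` and `create` methods. The site ID and experiment ID values are illustrative.

```python
import asyncio
import uuid
from unittest.mock import AsyncMock


class StreamService:
    """Hypothetical service: talks to the repository, never to the database."""

    def __init__(self, streams) -> None:
        self._streams = streams

    async def get_or_create_for_experiment(self, user_id, site_id, experiment_id):
        existing = await self._streams.find_by_experiment(user_id, experiment_id)
        if existing is not None:
            return existing
        return await self._streams.create(
            user_id, site_id, experiment_id=experiment_id
        )


# The repository is replaced by a mock, so no database is needed.
repo = AsyncMock()
repo.find_by_experiment.return_value = None  # no stream exists yet
new_stream = object()  # placeholder for a Stream row
repo.create.return_value = new_stream

user_id = uuid.uuid4()
service = StreamService(repo)
result = asyncio.run(
    service.get_or_create_for_experiment(user_id, "plasmodb", "exp-1")
)

repo.find_by_experiment.assert_awaited_once_with(user_id, "exp-1")
repo.create.assert_awaited_once_with(user_id, "plasmodb", experiment_id="exp-1")
```

The test asserts on repository calls rather than on SQL, which is the point of keeping queries behind the repository interface.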
Alembic for migrations
Schema migrations use Alembic with async-compatible
migration scripts. create_all is retained for development convenience
(fresh databases), but production-like environments should use Alembic to
apply schema changes incrementally.
UUID primary keys
All entities use UUID primary keys (via the custom
GUID type decorator) for globally unique, non-sequential identifiers. This
allows distributed ID generation without coordination and prevents information
leakage from sequential IDs.
Note
Schema migrations use Alembic (see alembic/versions/).
create_all is retained for development convenience on fresh databases.
ORM Models
Purpose: SQLAlchemy models for users, control sets, experiments, streams, gene sets, and operations. Define the schema and relationships.
Key classes: User, ControlSet, ExperimentRow, Stream
SQLAlchemy ORM models.
- class veupath_chatbot.persistence.models.GUID(*args, **kwargs)
Bases: TypeDecorator
Platform-independent GUID type.
Uses CHAR(36) and stores UUIDs as strings. Returns proper UUID objects on read so that Python-side comparisons (e.g. stream.user_id == some_uuid) work correctly.
- cache_ok = True
Indicate if statements using this ExternalType are “safe to cache”.
The default value None will emit a warning and then not allow caching of a statement which includes this type. Set to False to disable statements using this type from being cached at all without a warning. When set to True, the object’s class and selected elements from its state will be used as part of the cache key. For example, using a TypeDecorator:

    class MyType(TypeDecorator):
        impl = String

        cache_ok = True

        def __init__(self, choices):
            self.choices = tuple(choices)
            self.internal_only = True

The cache key for the above type would be equivalent to:

    >>> MyType(["a", "b", "c"])._static_cache_key
    (<class '__main__.MyType'>, ('choices', ('a', 'b', 'c')))

The caching scheme will extract attributes from the type that correspond to the names of parameters in the __init__() method. Above, the “choices” attribute becomes part of the cache key but “internal_only” does not, because there is no parameter named “internal_only”.
The requirements for cacheable elements is that they are hashable and also that they indicate the same SQL rendered for expressions using this type every time for a given cache value.
To accommodate for datatypes that refer to unhashable structures such as dictionaries, sets and lists, these objects can be made “cacheable” by assigning hashable structures to the attributes whose names correspond with the names of the arguments. For example, a datatype which accepts a dictionary of lookup values may publish this as a sorted series of tuples. Given a previously un-cacheable type as:

    class LookupType(UserDefinedType):
        """a custom type that accepts a dictionary as a parameter.

        this is the non-cacheable version, as "self.lookup" is not
        hashable.

        """

        def __init__(self, lookup):
            self.lookup = lookup

        def get_col_spec(self, **kw):
            return "VARCHAR(255)"

        def bind_processor(self, dialect):
            ...  # works with "self.lookup" ...

Where “lookup” is a dictionary. The type will not be able to generate a cache key:

    >>> type_ = LookupType({"a": 10, "b": 20})
    >>> type_._static_cache_key
    <stdin>:1: SAWarning: UserDefinedType LookupType({'a': 10, 'b': 20}) will not
    produce a cache key because the ``cache_ok`` flag is not set to True.
    Set this flag to True if this type object's state is safe to use
    in a cache key, or False to disable this warning.
    symbol('no_cache')

If we did set up such a cache key, it wouldn’t be usable. We would get a tuple structure that contains a dictionary inside of it, which cannot itself be used as a key in a “cache dictionary” such as SQLAlchemy’s statement cache, since Python dictionaries aren’t hashable:

    >>> # set cache_ok = True
    >>> type_.cache_ok = True
    >>> # this is the cache key it would generate
    >>> key = type_._static_cache_key
    >>> key
    (<class '__main__.LookupType'>, ('lookup', {'a': 10, 'b': 20}))
    >>> # however this key is not hashable, will fail when used with
    >>> # SQLAlchemy statement cache
    >>> some_cache = {key: "some sql value"}
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    TypeError: unhashable type: 'dict'

The type may be made cacheable by assigning a sorted tuple of tuples to the “.lookup” attribute:

    class LookupType(UserDefinedType):
        """a custom type that accepts a dictionary as a parameter.

        The dictionary is stored both as itself in a private variable,
        and published in a public variable as a sorted tuple of tuples,
        which is hashable and will also return the same value for any
        two equivalent dictionaries.  Note it assumes the keys and
        values of the dictionary are themselves hashable.

        """

        cache_ok = True

        def __init__(self, lookup):
            self._lookup = lookup

            # assume keys/values of "lookup" are hashable; otherwise
            # they would also need to be converted in some way here
            self.lookup = tuple((key, lookup[key]) for key in sorted(lookup))

        def get_col_spec(self, **kw):
            return "VARCHAR(255)"

        def bind_processor(self, dialect):
            ...  # works with "self._lookup" ...

Where above, the cache key for LookupType({"a": 10, "b": 20}) will be:

    >>> LookupType({"a": 10, "b": 20})._static_cache_key
    (<class '__main__.LookupType'>, ('lookup', (('a', 10), ('b', 20))))

Added in version 1.4.14: added the cache_ok flag to allow some configurability of caching for TypeDecorator classes.
Added in version 1.4.28: added the ExternalType mixin which generalizes the cache_ok flag to both the TypeDecorator and UserDefinedType classes.
- load_dialect_impl(dialect)
Return a TypeEngine object corresponding to a dialect.
This is an end-user override hook that can be used to provide differing types depending on the given dialect. It is used by the TypeDecorator implementation of type_engine() to help determine what type should ultimately be returned for a given TypeDecorator.
By default returns self.impl.
- Return type:
- process_bind_param(value, dialect)
Receive a bound parameter value to be converted.
Custom subclasses of _types.TypeDecorator should override this method to provide custom behaviors for incoming data values. This method is called at statement execution time and is passed the literal Python data value which is to be associated with a bound parameter in the statement.
The operation could be anything desired to perform custom behavior, such as transforming or serializing data. This could also be used as a hook for validating logic.
- process_result_value(value, dialect)
Receive a result-row column value to be converted.
Custom subclasses of _types.TypeDecorator should override this method to provide custom behaviors for data values being received in result rows coming from the database. This method is called at result fetching time and is passed the literal Python data value that’s extracted from a database result row.
The operation could be anything desired to perform custom behavior, such as transforming or deserializing data.
- class veupath_chatbot.persistence.models.Base(**kwargs)
Bases: DeclarativeBase
Base class for all models.
- type_annotation_map = {<class 'uuid.UUID'>: <class 'veupath_chatbot.persistence.models.GUID'>, JSONArray: <class 'sqlalchemy.sql.sqltypes.JSON'>, JSONObject: <class 'sqlalchemy.sql.sqltypes.JSON'>}
- __init__(**kwargs)
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- class veupath_chatbot.persistence.models.User(**kwargs)
Bases: Base
User model for tracking strategies.
- __init__(**kwargs)
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- class veupath_chatbot.persistence.models.ControlSet(**kwargs)
Bases: Base
Reusable control gene set with provenance metadata.
- __init__(**kwargs)
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- class veupath_chatbot.persistence.models.ExperimentRow(**kwargs)
Bases: Base
Persisted experiment with full JSON blob.
- data: Mapped[JSONObject]
- __init__(**kwargs)
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- class veupath_chatbot.persistence.models.Stream(**kwargs)
Bases: Base
A conversation stream: the identity of a chat conversation.
All mutable state is derived from events in Redis. This table only holds identity and ownership.
- __init__(**kwargs)
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- class veupath_chatbot.persistence.models.StreamProjection(**kwargs)
Bases: Base
Materialized projection of a conversation stream.
Derived from events and rebuildable by replaying the Redis stream. This is a CACHE for fast reads, not a source of truth.
- plan: Mapped[JSONObject]
- __init__(**kwargs)
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
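The idea that a StreamProjection is rebuildable from events can be sketched as a fold over an event list. This is an illustration under assumptions: the event types (`renamed`, `plan_updated`), the event dictionaries, and the `steps` key inside the plan are all hypothetical, and a plain list stands in for the Redis stream.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable


@dataclass
class ProjectionSketch:
    """Hypothetical read model: a few fields derived purely from events."""

    name: str = ""
    plan: Dict[str, Any] = field(default_factory=dict)
    step_count: int = 0


def apply_event(projection: ProjectionSketch, event: Dict[str, Any]) -> None:
    """Fold one event into the projection (event shapes are assumptions)."""
    if event["type"] == "renamed":
        projection.name = event["name"]
    elif event["type"] == "plan_updated":
        projection.plan = event["plan"]
        # Denormalized count, recomputed whenever the plan changes.
        projection.step_count = len(event["plan"].get("steps", []))


def rebuild(events: Iterable[Dict[str, Any]]) -> ProjectionSketch:
    """Replay the full event history into a fresh projection."""
    projection = ProjectionSketch()
    for event in events:
        apply_event(projection, event)
    return projection


events = [
    {"type": "renamed", "name": "Draft"},
    {"type": "plan_updated", "plan": {"steps": [{"id": "s1"}, {"id": "s2"}]}},
    {"type": "renamed", "name": "Kinase search"},
]

projection = rebuild(events)
print(projection.name, projection.step_count)
```

Because the projection is a pure function of the event history, a stale or lost row can be discarded and recomputed, which is why the table can be treated as a cache.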
- class veupath_chatbot.persistence.models.GeneSetRow(**kwargs)
Bases: Base
Persisted gene set for workbench analysis.
- parameters: Mapped[JSONObject | None]
- __init__(**kwargs)
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- class veupath_chatbot.persistence.models.Operation(**kwargs)
Bases: Base
Tracks active and completed operations for client discovery.
- __init__(**kwargs)
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
Session Management
Purpose: Async engine, session factory, and lifecycle hooks (init, close). Used by FastAPI dependency injection and background tasks.
Key functions: get_db_session(), init_db(), close_db()
SQLAlchemy async engine and session management.
- veupath_chatbot.persistence.session.get_engine()
Get the lazily-initialized async engine.
- Return type:
- veupath_chatbot.persistence.session.async_session_factory()
Create a new async session from the lazily-initialized factory.
- Return type:
- async veupath_chatbot.persistence.session.get_db_session()
Dependency to get database session.
- Return type:
Repositories
Purpose: Data access layer. Encapsulates queries and CRUD operations. Split into domain-specific repository modules.
User repository.
- class veupath_chatbot.persistence.repositories.user.UserRepository(session)
Bases: object
User CRUD operations.
Repository for stream (conversation) identity + projections.
- class veupath_chatbot.persistence.repositories.stream.StreamRepository(session)
Bases: object
Data access for conversation streams and their projections.
- async create(user_id, site_id, *, stream_id=None, name='', experiment_id=None)
- Return type:
- async find_by_experiment(user_id, experiment_id)
Find an existing stream for a user + experiment combination.
- Return type:
Stream | None
- async get_projection(stream_id)
- Return type:
StreamProjection | None
- async get_by_wdk_strategy_id(user_id, wdk_strategy_id)
- Return type:
StreamProjection | None
- async update_projection(stream_id, *, name=None, record_type=None, wdk_strategy_id=None, wdk_strategy_id_set=False, is_saved=None, is_saved_set=False, plan=None, step_count=None, result_count=None, result_count_set=False, gene_set_id=None, gene_set_id_set=False, gene_set_auto_imported=None)
Dynamically update a StreamProjection based on provided kwargs.
Steps and root_step_id are derived from plan at read time; only plan and a denormalized step_count are persisted on write.
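The paired `*_set` flags in the signature suggest a way to tell "leave unchanged" apart from "explicitly set to None". The sketch below shows that pattern under this assumption; it is one reading of the signature, not confirmed behavior, and `build_projection_updates` is a hypothetical helper covering only a few of the parameters.

```python
from typing import Any, Dict, Optional


def build_projection_updates(
    *,
    name: Optional[str] = None,
    wdk_strategy_id: Optional[int] = None,
    wdk_strategy_id_set: bool = False,
    result_count: Optional[int] = None,
    result_count_set: bool = False,
) -> Dict[str, Any]:
    """Collect only the columns the caller actually wants to change."""
    updates: Dict[str, Any] = {}

    # Plain parameters: None means "not provided", so the column is skipped.
    if name is not None:
        updates["name"] = name

    # Nullable parameters: the *_set flag lets None through as a real value.
    if wdk_strategy_id_set or wdk_strategy_id is not None:
        updates["wdk_strategy_id"] = wdk_strategy_id
    if result_count_set or result_count is not None:
        updates["result_count"] = result_count

    return updates


# Rename only: nullable columns are left untouched.
rename_only = build_projection_updates(name="Kinase search")

# Clear the strategy link: None is written because the flag is set.
clear_link = build_projection_updates(wdk_strategy_id_set=True)
```

The resulting dictionary could then be applied to the row or passed to an UPDATE statement; that step is omitted here.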
- async dismiss(stream_id)
Soft-delete: mark a projection as dismissed (hidden from main list).
- async restore(stream_id)
Un-dismiss: restore a dismissed projection and reset for fresh WDK import.
Control set repository.
- class veupath_chatbot.persistence.repositories.control_set.ControlSetRepository(session)
Bases: object
Control set CRUD operations.
- async get_by_id(control_set_id)
Get control set by ID.
- Return type:
ControlSet | None
- async list_by_site(site_id, user_id=None, tags=None, limit=100)
List control sets for a site, including public ones and user-owned.
- Return type: