"""Alembic environment for neuronetz-gateway (async engine). Reads ``DATABASE_URL`` from the environment (the same value the app uses, ``postgresql+asyncpg://...``). Ensures schema ``gateway`` exists and pins the Alembic version table into that schema so migration bookkeeping never collides with the ``console`` schema in the shared database. """ from __future__ import annotations import asyncio import os from logging.config import fileConfig from alembic import context from sqlalchemy import pool, text from sqlalchemy.engine import Connection from sqlalchemy.ext.asyncio import async_engine_from_config from neuronetz_gateway.config import get_settings from neuronetz_gateway.db.models import GATEWAY_SCHEMA, Base config = context.config if config.config_file_name is not None: fileConfig(config.config_file_name) target_metadata = Base.metadata def _database_url() -> str: """Resolve the async database URL from env, falling back to settings.""" return os.environ.get("DATABASE_URL") or get_settings().database_url def _configure_context(connection: Connection) -> None: """Configure migration context with the gateway schema + version table.""" context.configure( connection=connection, target_metadata=target_metadata, version_table="alembic_version", version_table_schema=GATEWAY_SCHEMA, include_schemas=True, compare_type=True, ) def run_migrations_offline() -> None: """Run migrations in 'offline' mode (emit SQL without a DBAPI connection).""" context.configure( url=_database_url(), target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, version_table="alembic_version", version_table_schema=GATEWAY_SCHEMA, include_schemas=True, ) with context.begin_transaction(): context.run_migrations() def _do_run_migrations(connection: Connection) -> None: """Ensure the schema exists, then run migrations within a transaction. The ``CREATE SCHEMA`` is committed in its own transaction before configuring Alembic. Under SQLAlchemy 2.0, ``execute()`` auto-begins a transaction; if it were left open, Alembic's ``begin_transaction()`` would treat the connection as caller-managed and become a no-op that never commits, so the whole migration (and the schema) would be rolled back on connection close. Committing here leaves the connection clean so Alembic owns — and commits — its own transaction. """ connection.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{GATEWAY_SCHEMA}"')) connection.commit() _configure_context(connection) with context.begin_transaction(): context.run_migrations() async def run_migrations_online() -> None: """Run migrations in 'online' mode using an async engine.""" configuration = config.get_section(config.config_ini_section) or {} configuration["sqlalchemy.url"] = _database_url() connectable = async_engine_from_config( configuration, prefix="sqlalchemy.", poolclass=pool.NullPool, ) async with connectable.connect() as connection: await connection.run_sync(_do_run_migrations) await connectable.dispose() if context.is_offline_mode(): run_migrations_offline() else: asyncio.run(run_migrations_online())