"""Shared integration fixtures: a fully-wired gateway app driven in-process. The integration tests build the real :func:`neuronetz_gateway.app.create_app` (lifespan + AuthMiddleware + routes) against testcontainers Postgres + Redis, with the upstream Ollama overridden to the in-process mock via ``get_ollama_client`` (the override contract documented in ``deps.py``). The fixture also creates a tenant + API key directly through the repositories so tests don't depend on the CLI being present, and seeds the in-process discovery cache with the mock catalogue so the model allowlist resolves without relying on the background poller racing against the test. Skips cleanly when Docker is unavailable (the postgres/redis container fixtures self-skip). All env vars are scoped to the fixture and the settings cache is cleared so the app reads them fresh. """ from __future__ import annotations import asyncio import secrets import uuid from collections.abc import AsyncIterator from dataclasses import dataclass from typing import Any import httpx import pytest import pytest_asyncio from argon2 import PasswordHasher from fastapi import FastAPI from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine from testcontainers.postgres import PostgresContainer from testcontainers.redis import RedisContainer from neuronetz_gateway.app import create_app from neuronetz_gateway.auth.hashing import build_hasher, hash_secret from neuronetz_gateway.auth.keys import generate_key from neuronetz_gateway.config import Settings, get_settings from neuronetz_gateway.db.repositories import ( ApiKeyRepository, KeyLimitRepository, TenantRepository, ) from neuronetz_gateway.deps import get_ollama_client from neuronetz_gateway.proxy.discovery import DiscoveredModel, names_of from neuronetz_gateway.proxy.ollama import OllamaClient from tests.integration.mock_ollama import DEFAULT_MODELS, create_mock_ollama # --- fixtures 
# ---------------------------------------------------------------------------


@dataclass(frozen=True, slots=True)
class IntegrationKey:
    """A test API key + the tenant it belongs to."""

    tenant_id: uuid.UUID  # id of the tenant row created for this key
    tenant_name: str
    key_id: uuid.UUID  # id of the API-key row
    full_key: str  # plaintext key from ``generate_key()``; only held in-process
    prefix: str  # key prefix stored alongside the hash


@dataclass(slots=True)
class IntegrationApp:
    """Built-app + driver bundle handed to integration tests."""

    app: FastAPI  # the real gateway app; its lifespan is running while in use
    base_url: str  # base URL the ``client`` fixture sends gateway traffic to
    settings: Settings  # settings resolved from the fixture-scoped env vars
    engine: AsyncEngine  # ``app.state.db_engine`` created by the lifespan
    sessionmaker: async_sessionmaker[Any]  # ``app.state.db_sessionmaker``
    redis_url: str
    postgres_url: str  # asyncpg-flavoured URL of the Postgres container
    hasher: PasswordHasher  # ``app.state.hasher``
    mock_upstream: FastAPI  # the in-process mock Ollama app


# Provenance note for the DDL run by ``_create_schema_and_tables``. It is built
# from ``db.models.Base.metadata`` plus the SPEC §5 trigger and is intended to
# mirror what the alembic 0001 migration emits; running it directly avoids
# invoking the alembic CLI from the test process. Keep it in sync with
# ``alembic/versions/0001_initial.py``.
_DDL_PATH_TO_MIGRATION = (
    "Built from db.models.Base.metadata + SPEC §5 trigger; see alembic 0001."
)

# Postgres enums referenced by the models. The columns use ``create_type=False``,
# so ``Base.metadata.create_all`` will not create these types itself.
_GATEWAY_ENUMS: tuple[tuple[str, tuple[str, ...]], ...] = (
    ("key_status", ("active", "disabled", "revoked")),
    ("tenant_status", ("active", "suspended", "closed")),
    ("budget_period", ("day", "month", "total")),
)


async def _create_schema_and_tables(engine: AsyncEngine) -> None:
    """Create the ``gateway`` schema, enums, tables, indexes, and trigger.

    Every statement is guarded (``IF NOT EXISTS``, a ``duplicate_object``
    handler, ``CREATE OR REPLACE``, ``DROP ... IF EXISTS``), so it can be run
    against a database that already has the schema. All statements execute in
    a single ``engine.begin()`` transaction.
    """
    from neuronetz_gateway.db.models import GATEWAY_SCHEMA, Base

    schema = f'"{GATEWAY_SCHEMA}"'

    async with engine.begin() as conn:
        await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema}"))

        for name, values in _GATEWAY_ENUMS:
            values_sql = ", ".join(f"'{v}'" for v in values)
            # CREATE TYPE has no IF NOT EXISTS; swallow duplicate_object instead.
            await conn.execute(
                text(
                    f"""
                    DO $$ BEGIN
                        CREATE TYPE {schema}.{name} AS ENUM ({values_sql});
                    EXCEPTION
                        WHEN duplicate_object THEN NULL;
                    END $$;
                    """
                )
            )

        await conn.run_sync(Base.metadata.create_all)

        # The NOTIFY trigger on gateway.revocations (SPEC §5): each inserted
        # row publishes its key_id on the ``key_revoked`` channel.
        await conn.execute(
            text(
                f"""
                CREATE OR REPLACE FUNCTION {schema}.notify_key_revoked()
                RETURNS trigger AS $$
                BEGIN
                    PERFORM pg_notify('key_revoked', NEW.key_id::text);
                    RETURN NEW;
                END;
                $$ LANGUAGE plpgsql;
                """
            )
        )
        await conn.execute(
            text(f"DROP TRIGGER IF EXISTS trg_notify_key_revoked ON {schema}.revocations")
        )
        await conn.execute(
            text(
                f"""
                CREATE TRIGGER trg_notify_key_revoked
                AFTER INSERT ON {schema}.revocations
                FOR EACH ROW EXECUTE FUNCTION {schema}.notify_key_revoked()
                """
            )
        )


def _asyncpg_url(raw_url: str) -> str:
    """Rewrite a testcontainers Postgres URL to use the asyncpg driver.

    Recognises the ``postgresql+psycopg2://``, ``postgresql+psycopg://`` and
    plain ``postgresql://`` schemes; any other URL is returned unchanged.
    """
    for sync_prefix in ("postgresql+psycopg2://", "postgresql+psycopg://", "postgresql://"):
        if raw_url.startswith(sync_prefix):
            return "postgresql+asyncpg://" + raw_url.removeprefix(sync_prefix)
    return raw_url


@pytest_asyncio.fixture()
async def integration_app(
    postgres_container: PostgresContainer,
    redis_container: RedisContainer,
    monkeypatch: pytest.MonkeyPatch,
) -> AsyncIterator[IntegrationApp]:
    """Wire up the full gateway app against real Postgres + Redis + mock Ollama.

    Sets the gateway env vars via ``monkeypatch`` and creates the schema. It then
    builds the app with ``get_ollama_client`` overridden to the in-process mock,
    and keeps the app lifespan running for the duration of the test. The
    discovery cache is seeded with the mock catalogue.

    Skips if the lifespan did not populate ``app.state.hasher`` or
    ``app.state.db_sessionmaker``. The mock upstream HTTP client is closed and
    the settings cache is cleared on teardown, including when setup fails or
    the test is skipped.
    """
    # ---- URLs from the testcontainers (asyncpg driver for SQLAlchemy) ----
    pg_url = _asyncpg_url(postgres_container.get_connection_url())
    redis_host = redis_container.get_container_host_ip()
    redis_port = redis_container.get_exposed_port(6379)
    redis_url = f"redis://{redis_host}:{redis_port}/0"

    # ---- Settings env (cleared cache so create_app reads these) ----
    monkeypatch.setenv("DATABASE_URL", pg_url)
    monkeypatch.setenv("REDIS_URL", redis_url)
    monkeypatch.setenv("OLLAMA_BASE_URL", "http://ollama")  # any URL; overridden
    monkeypatch.setenv("GATEWAY_LOG_LEVEL", "WARNING")
    # Minimal argon2 cost so key hashing stays fast in tests.
    monkeypatch.setenv("ARGON2_TIME_COST", "1")
    monkeypatch.setenv("ARGON2_MEMORY_COST_KIB", "8")
    monkeypatch.setenv("ARGON2_PARALLELISM", "1")
    get_settings.cache_clear()

    try:
        settings = get_settings()

        # ---- Migrate schema directly (no alembic CLI needed) ----
        bootstrap_engine = create_async_engine(pg_url)
        try:
            await _create_schema_and_tables(bootstrap_engine)
        finally:
            # Dispose even when the DDL fails so pooled connections don't leak.
            await bootstrap_engine.dispose()

        app = create_app(settings)

        # Route the upstream proxy to the in-process mock Ollama. The client
        # context manager guarantees it is closed on every exit path.
        mock_upstream = create_mock_ollama()
        async with httpx.AsyncClient(
            transport=httpx.ASGITransport(app=mock_upstream),
            base_url="http://ollama",
        ) as mock_http:
            app.dependency_overrides[get_ollama_client] = lambda: OllamaClient(mock_http)

            # ASGITransport does not drive the lifespan; run it explicitly so
            # app.state is populated before any test traffic, like uvicorn does.
            async with app.router.lifespan_context(app):
                # Sanity: the lifespan must have placed these on app.state.
                if getattr(app.state, "hasher", None) is None:
                    pytest.skip("pending Backend: app.state.hasher not set by lifespan")
                if getattr(app.state, "db_sessionmaker", None) is None:
                    pytest.skip("pending Backend: db_sessionmaker not set by lifespan")

                # The lifespan installs app.state.discovery_cache, so it can only
                # be seeded now. Mirroring the mock catalogue keeps model-allowlist
                # checks deterministic instead of racing the background poller.
                models = [
                    DiscoveredModel(name=model_name, family=model_name.split(":", 1)[0])
                    for model_name in DEFAULT_MODELS
                ]
                await app.state.discovery_cache.set(models)

                yield IntegrationApp(
                    app=app,
                    base_url="http://gateway",
                    settings=settings,
                    engine=app.state.db_engine,
                    sessionmaker=app.state.db_sessionmaker,
                    redis_url=redis_url,
                    postgres_url=pg_url,
                    hasher=app.state.hasher,
                    mock_upstream=mock_upstream,
                )
    finally:
        # Drop settings built from the fixture-scoped env so later tests re-read.
        get_settings.cache_clear()


@pytest_asyncio.fixture()
async def client(integration_app: IntegrationApp) -> AsyncIterator[httpx.AsyncClient]:
    """An ASGI httpx client bound to the wired gateway app."""
    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=integration_app.app),
        base_url=integration_app.base_url,
    ) as c:
        yield c


async def _create_tenant_and_key(
    integration_app: IntegrationApp,
    *,
    tenant_name: str | None = None,
    allow_all_models: bool = False,
    allowed_models: list[str] | None = None,
    tokens_daily: int | None = None,
    rpm: int = 60,
    tpm: int = 100_000,
) -> IntegrationKey:
    """Insert a tenant + API key directly via the repositories.

    Mirrors what the bootstrap CLI does, without invoking it. The full key is
    only available here in the test process — never logged or persisted.

    Args:
        integration_app: the wired app whose settings and sessionmaker are used.
        tenant_name: tenant name; a falsy value means a random ``acme-<hex>`` name.
        allow_all_models: opt the tenant into all models.
        allowed_models: explicit model allowlist. ``set_models`` is only called
            when this is not ``None`` or ``allow_all_models`` is true.
        tokens_daily: when given, a daily token budget is set on the key; the
            monthly and total budgets are left as ``None``.
        rpm: tenant requests-per-minute limit passed to ``TenantRepository.create``.
        tpm: tenant tokens-per-minute limit passed to ``TenantRepository.create``.

    Returns:
        The created tenant/key identifiers plus the plaintext key.
    """
    name = tenant_name or f"acme-{secrets.token_hex(4)}"
    gen = generate_key()
    hasher = build_hasher(integration_app.settings)
    encoded = hash_secret(hasher, gen.full_key)

    async with integration_app.sessionmaker() as session:
        tenants = TenantRepository(session)
        tenant = await tenants.create(
            name=name, rpm=rpm, tpm=tpm, concurrent=8, allow_all_models=allow_all_models
        )
        if allowed_models is not None or allow_all_models:
            await tenants.set_models(
                tenant.id,
                allowed_models=allowed_models,
                allow_all_models=allow_all_models,
            )

        key = await ApiKeyRepository(session).create(
            tenant_id=tenant.id,
            prefix=gen.prefix,
            key_hash=encoded,
            name="test-key",
            scopes=["chat", "embeddings"],
        )

        if tokens_daily is not None:
            await KeyLimitRepository(session).upsert_budget(
                key.id,
                tokens_daily=tokens_daily,
                tokens_monthly=None,
                tokens_total=None,
            )

        await session.commit()

    return IntegrationKey(
        tenant_id=tenant.id,
        tenant_name=name,
        key_id=key.id,
        full_key=gen.full_key,
        prefix=gen.prefix,
    )


@pytest_asyncio.fixture()
async def api_key(integration_app: IntegrationApp) -> IntegrationKey:
    """A default tenant + key with explicit access to the mock-catalogue models."""
    return await _create_tenant_and_key(
        integration_app,
        allowed_models=list(DEFAULT_MODELS),
        allow_all_models=False,
    )


@pytest_asyncio.fixture()
async def allow_all_key(integration_app: IntegrationApp) -> IntegrationKey:
    """A tenant with ``allow_all_models`` opted in."""
    return await _create_tenant_and_key(
        integration_app, allow_all_models=True, allowed_models=[]
    )


@pytest_asyncio.fixture()
async def restricted_key(integration_app: IntegrationApp) -> IntegrationKey:
    """A default-deny tenant whose allowlist only includes one mock model."""
    return await _create_tenant_and_key(
        integration_app,
        allow_all_models=False,
        allowed_models=["llama3.1:8b"],
    )
# Re-export the private tenant/key builder alongside the fixtures, so that
# individual tests can mint extra keys without going through pytest's fixture
# machinery.
__all__ = [
    "IntegrationApp",
    "IntegrationKey",
    "_create_tenant_and_key",
    "allow_all_key",
    "api_key",
    "client",
    "integration_app",
    "restricted_key",
]

# ``asyncio`` and ``names_of`` are imported at the top of the module but not
# otherwise referenced here; touching them keeps static analysers from
# reporting them as unused imports.
_ = asyncio, names_of