"""Bootstrap CLI (Typer) per SPEC §11. Entry point: ``neuronetz-gateway = neuronetz_gateway.cli.manage:app``. This is the *only* supported way to create tenants and keys (AGENT_PROMPT non-negotiable #10: the CLI must work before the first manual ``curl``). Each command opens its own short-lived async engine against ``DATABASE_URL``, does its unit of work in a transaction, and exits. The full API key is printed exactly once, at creation, and never stored or logged. ``list-models`` reads the discovery cache from Redis (SPEC §4.6); with ``--tenant`` it also resolves and prints that tenant's effective model set. """ from __future__ import annotations import asyncio from collections.abc import Awaitable, Callable from pathlib import Path from typing import Annotated import typer from neuronetz_gateway.auth.hashing import build_hasher, hash_secret from neuronetz_gateway.auth.keys import generate_key from neuronetz_gateway.config import BackendSpec, Settings, get_settings from neuronetz_gateway.db.models import BudgetPeriod, KeyStatus from neuronetz_gateway.db.repositories import ( ApiKeyRepository, BudgetRepository, KeyLimitRepository, RevocationRepository, TenantRepository, ) from neuronetz_gateway.db.session import create_engine, create_session_factory, session_scope from neuronetz_gateway.proxy.allowlist import resolve_effective_models from neuronetz_gateway.proxy.discovery import read_discovered_from_redis app = typer.Typer( name="neuronetz-gateway", help="Bootstrap CLI for the neuronetz-gateway (tenants, keys, budgets).", no_args_is_help=True, add_completion=False, ) def _run[T](coro_factory: Callable[[Settings], Awaitable[T]]) -> T: """Execute an async unit of work against a fresh engine, then dispose it.""" async def _main() -> T: settings = get_settings() engine = create_engine(settings) try: return await coro_factory(settings) finally: await engine.dispose() return asyncio.run(_main()) @app.command("create-tenant") def create_tenant( name: Annotated[str, typer.Option("--name", help="Unique tenant name.")], rpm: Annotated[int, typer.Option("--rpm", help="Requests-per-minute limit.")] = 60, tpm: Annotated[int, typer.Option("--tpm", help="Tokens-per-minute limit.")] = 100_000, concurrent: Annotated[ int, typer.Option("--concurrent", help="Concurrent-connection cap.") ] = 8, allow_all_models: Annotated[ bool, typer.Option( "--allow-all-models/--no-allow-all-models", help="Opt the tenant into using any installed model.", ), ] = False, ) -> None: """Create a tenant with optional rate limits and model policy.""" async def work(settings: Settings) -> None: factory = create_session_factory(create_engine(settings)) async with session_scope(factory) as session: tenants = TenantRepository(session) if await tenants.get_by_name(name) is not None: raise typer.BadParameter(f"tenant {name!r} already exists") tenant = await tenants.create( name=name, rpm=rpm, tpm=tpm, concurrent=concurrent, allow_all_models=allow_all_models, ) typer.echo(f"created tenant {tenant.name} ({tenant.id})") typer.echo(f" allow_all_models={allow_all_models} rpm={rpm} tpm={tpm}") _run(work) @app.command("create-key") def create_key( tenant: Annotated[str, typer.Option("--tenant", help="Owning tenant name.")], name: Annotated[str, typer.Option("--name", help="Human-readable key name.")], scopes: Annotated[ str, typer.Option("--scopes", help="Comma-separated scopes.") ] = "chat,embeddings", ) -> None: """Create an API key for a tenant. The full key is printed exactly once.""" async def work(settings: Settings) -> None: factory = create_session_factory(create_engine(settings)) hasher = build_hasher(settings) scope_list = [s.strip() for s in scopes.split(",") if s.strip()] async with session_scope(factory) as session: tenants = TenantRepository(session) tenant_row = await tenants.get_by_name(tenant) if tenant_row is None: raise typer.BadParameter(f"unknown tenant {tenant!r}") generated = generate_key() key_hash = hash_secret(hasher, generated.full_key) keys = ApiKeyRepository(session) created = await keys.create( tenant_id=tenant_row.id, prefix=generated.prefix, key_hash=key_hash, name=name, scopes=scope_list, ) typer.echo(f"created key {created.name} for tenant {tenant} (prefix {created.prefix})") typer.echo("") typer.secho("API KEY (shown once — store it now):", fg=typer.colors.YELLOW, bold=True) typer.secho(generated.full_key, fg=typer.colors.GREEN, bold=True) _run(work) @app.command("revoke-key") def revoke_key( prefix: Annotated[str, typer.Option("--prefix", help="Key prefix to revoke.")], ) -> None: """Revoke a key by its prefix (sets status + writes the revocation outbox).""" async def work(settings: Settings) -> None: factory = create_session_factory(create_engine(settings)) async with session_scope(factory) as session: keys = ApiKeyRepository(session) key = await keys.get_by_prefix(prefix) if key is None: raise typer.BadParameter(f"no key with prefix {prefix!r}") await keys.set_status(key.id, KeyStatus.revoked) await RevocationRepository(session).insert(key.id, reason="cli revoke") typer.echo(f"revoked key {prefix} ({key.id})") _run(work) @app.command("list-keys") def list_keys( tenant: Annotated[str, typer.Option("--tenant", help="Tenant whose keys to list.")], ) -> None: """List a tenant's keys (prefixes and metadata, never full keys).""" async def work(settings: Settings) -> None: factory = create_session_factory(create_engine(settings)) async with session_scope(factory) as session: tenants = TenantRepository(session) tenant_row = await tenants.get_by_name(tenant) if tenant_row is None: raise typer.BadParameter(f"unknown tenant {tenant!r}") rows = await ApiKeyRepository(session).list_for_tenant(tenant_row.id) if not rows: typer.echo("(no keys)") return for key in rows: typer.echo( f"{key.prefix} status={key.status.value:<8} " f"name={key.name!r} created={key.created_at.isoformat()}" ) _run(work) @app.command("show-usage") def show_usage( tenant: Annotated[str, typer.Option("--tenant", help="Tenant to report usage for.")], period: Annotated[str, typer.Option("--period", help="Period: day|month|total.")] = "day", ) -> None: """Show token/request usage for a tenant in a period.""" async def work(settings: Settings) -> None: try: period_enum = BudgetPeriod(period) except ValueError as exc: raise typer.BadParameter("period must be one of day|month|total") from exc factory = create_session_factory(create_engine(settings)) async with session_scope(factory) as session: tenant_row = await TenantRepository(session).get_by_name(tenant) if tenant_row is None: raise typer.BadParameter(f"unknown tenant {tenant!r}") tokens_in, tokens_out, requests = await BudgetRepository(session).usage_for_tenant( tenant_row.id, period_enum ) typer.echo(f"usage for {tenant} (period={period}):") typer.echo(f" requests={requests} tokens_in={tokens_in} tokens_out={tokens_out}") _run(work) @app.command("set-budget") def set_budget( key: Annotated[str, typer.Option("--key", help="Key prefix to set budget on.")], daily: Annotated[int | None, typer.Option("--daily", help="Daily token budget.")] = None, monthly: Annotated[ int | None, typer.Option("--monthly", help="Monthly token budget.") ] = None, total: Annotated[int | None, typer.Option("--total", help="Lifetime token budget.")] = None, ) -> None: """Set per-key token budgets.""" async def work(settings: Settings) -> None: if daily is None and monthly is None and total is None: raise typer.BadParameter("provide at least one of --daily/--monthly/--total") factory = create_session_factory(create_engine(settings)) async with session_scope(factory) as session: key_row = await ApiKeyRepository(session).get_by_prefix(key) if key_row is None: raise typer.BadParameter(f"no key with prefix {key!r}") await KeyLimitRepository(session).upsert_budget( key_row.id, tokens_daily=daily, tokens_monthly=monthly, tokens_total=total, ) typer.echo(f"set budget on {key}: daily={daily} monthly={monthly} total={total}") _run(work) @app.command("set-models") def set_models( tenant: Annotated[str, typer.Option("--tenant", help="Tenant to set models for.")], models: Annotated[ str | None, typer.Option("--models", help="Comma-separated model allowlist.") ] = None, allow_all: Annotated[ bool | None, typer.Option( "--allow-all/--no-allow-all", help="Opt into / out of allow_all_models for the tenant.", ), ] = None, ) -> None: """Set a tenant's model allowlist and/or its allow_all_models flag.""" async def work(settings: Settings) -> None: if models is None and allow_all is None: raise typer.BadParameter("provide --models and/or --allow-all/--no-allow-all") allowed = ( [m.strip() for m in models.split(",") if m.strip()] if models is not None else None ) factory = create_session_factory(create_engine(settings)) async with session_scope(factory) as session: tenants = TenantRepository(session) tenant_row = await tenants.get_by_name(tenant) if tenant_row is None: raise typer.BadParameter(f"unknown tenant {tenant!r}") await tenants.set_models( tenant_row.id, allowed_models=allowed, allow_all_models=allow_all ) typer.echo(f"updated models for {tenant}: allowed={allowed} allow_all={allow_all}") _run(work) @app.command("list-models") def list_models( tenant: Annotated[ str | None, typer.Option("--tenant", help="Also show this tenant's effective set.") ] = None, ) -> None: """Show live-discovered models per backend (and, with --tenant, the effective set). Spins up a short-lived poll against every configured backend to surface which models are currently live where, then (optionally) computes the given tenant's effective allow-list intersection. """ import httpx import redis.asyncio as redis from neuronetz_gateway.proxy.discovery import fetch_tags, names_of from neuronetz_gateway.proxy.router import build_backend_headers async def _poll_one(backend: BackendSpec) -> tuple[str, list[str]]: try: async with httpx.AsyncClient( base_url=backend.base_url, timeout=httpx.Timeout(connect=5, read=10, write=10, pool=10), headers=build_backend_headers(backend), ) as client: return backend.name, sorted(names_of(await fetch_tags(client))) except (httpx.HTTPError, ValueError) as exc: typer.secho( f" ({backend.name}: probe failed — {type(exc).__name__})", fg=typer.colors.RED, ) return backend.name, [] async def work(settings: Settings) -> None: backends = settings.effective_backends() results = [await _poll_one(b) for b in backends] union: set[str] = set() typer.echo("live-discovered models, by backend:") for name, models in results: typer.echo(f" [{name}]") if not models: typer.echo(" (none)") for m in models: typer.echo(f" {m}") union.update(models) typer.echo(f"\nunion across all backends: {len(union)} unique model(s)") # Surface what Redis currently has cached too (for sanity vs the live poll). client = redis.from_url(settings.redis_url, decode_responses=True) try: cached = await read_discovered_from_redis(client) finally: await client.aclose() if cached: typer.echo( f"redis cache (gateway:models:discovered): {len(cached)} model(s)" ) if tenant is None: return discovered = frozenset(union) factory = create_session_factory(create_engine(settings)) async with session_scope(factory) as session: tenants = TenantRepository(session) tenant_row = await tenants.get_by_name(tenant) if tenant_row is None: raise typer.BadParameter(f"unknown tenant {tenant!r}") limits = await tenants.get_limits(tenant_row.id) if limits is None: raise typer.BadParameter(f"tenant {tenant!r} has no limits row") effective = resolve_effective_models( allow_all=limits.allow_all_models, allowed_models=tuple(limits.allowed_models), discovered=discovered, ) typer.echo(f"effective set for tenant {tenant}:") for name in sorted(effective): typer.echo(f" {name}") _run(work) @app.command("probe-ollama") def probe_ollama( *, timeout: Annotated[float, typer.Option(help="Per-request timeout in seconds.")] = 10.0, ) -> None: """Probe every configured Ollama backend (GET /api/version + /api/tags). Iterates every entry returned by :meth:`Settings.effective_backends` (i.e. the OLLAMA_BACKENDS JSON list, or a single synthesized backend from the legacy OLLAMA_BASE_URL fields). Reports per-backend status and the first 5 model names; exits non-zero if any backend fails. Tokens are never printed — only whether one was attached. """ import httpx from neuronetz_gateway.proxy.router import build_backend_headers settings = get_settings() backends = settings.effective_backends() async def _probe_one(backend: BackendSpec, idx: int, total: int) -> int: marker = f"[{idx + 1}/{total}] {backend.name}" typer.echo("") typer.secho(marker, bold=True) typer.echo(f" target: {backend.base_url}") typer.echo( f" auth: {('sending ' + backend.auth_header) if backend.has_auth else 'no token'}" ) probe_timeout = httpx.Timeout( connect=settings.ollama_connect_timeout_s, read=timeout, write=timeout, pool=timeout, ) async with httpx.AsyncClient( base_url=backend.base_url, timeout=probe_timeout, headers=build_backend_headers(backend), ) as client: errors = 0 for path in ("/api/version", "/api/tags"): try: resp = await client.get(path) except httpx.HTTPError as exc: typer.secho( f" GET {path} ✗ transport error: {type(exc).__name__}", fg=typer.colors.RED, ) errors += 1 continue if resp.status_code >= 400: typer.secho( f" GET {path} ✗ HTTP {resp.status_code}", fg=typer.colors.RED, ) if resp.status_code in (401, 403): typer.echo( " upstream rejected the credentials — check the " "auth_token for this backend." ) errors += 1 continue if path == "/api/version": typer.secho(f" GET {path} ✓ HTTP 200", fg=typer.colors.GREEN) else: ct = resp.headers.get("content-type", "") body = resp.json() if ct.startswith("application/json") else {} n = len(body.get("models", [])) typer.secho( f" GET {path} ✓ HTTP 200, {n} model(s) discovered", fg=typer.colors.GREEN, ) for m in body.get("models", [])[:5]: typer.echo(f" · {m.get('name') or m.get('model')}") if n > 5: typer.echo(f" … and {n - 5} more") return errors async def _go() -> int: total_errors = 0 for idx, backend in enumerate(backends): total_errors += await _probe_one(backend, idx, len(backends)) return total_errors errors = asyncio.run(_go()) typer.echo("") if errors: typer.secho(f"{errors} probe(s) failed.", fg=typer.colors.RED, bold=True) raise typer.Exit(code=1) typer.secho( f"all {len(backends)} backend(s) reachable and authenticated.", fg=typer.colors.GREEN, bold=True, ) typer.secho("upstream reachable and authenticated.", fg=typer.colors.GREEN, bold=True) def _emit_paste_fallback(path: Path, line: str, exc: BaseException) -> None: """Print a clean fallback message when --write-env can't write the file. The bootstrap CLI usually runs inside the gateway container (uid 10001), but the host-mounted .env is typically owned by the host user — so the in-container process can't write it. Instead of crashing with a traceback, print the line the user can paste, and tell them how to retry with the right privileges. """ typer.secho(f"✗ could not write {path}: {type(exc).__name__}", fg=typer.colors.RED) typer.echo(f" ({exc})") typer.echo("") typer.echo("Two ways forward:") typer.echo(" 1. Re-run the same command with `docker exec -u root …` so the in-container") typer.echo(" process can write to the host-mounted .env.") typer.echo(" 2. Paste this into your .env manually, then `docker compose up -d gateway`:") typer.echo("") typer.echo(f" OLLAMA_BACKENDS={line}") typer.echo("") @app.command("list-backends") def list_backends() -> None: """Show the configured Ollama backends (tokens redacted). Reads from ``OLLAMA_BACKENDS`` if set, otherwise displays the single "default" backend synthesized from the legacy single-backend env vars. """ import os from neuronetz_gateway.cli.backends import parse as parse_backends from neuronetz_gateway.cli.backends import redacted_dump raw = os.environ.get("OLLAMA_BACKENDS", "").strip() settings = get_settings() if raw: backends = parse_backends(raw) typer.echo(f"OLLAMA_BACKENDS is set ({len(backends)} backend(s)):") else: backends = settings.effective_backends() typer.echo("OLLAMA_BACKENDS is empty — using single-backend fallback:") for idx, b in enumerate(backends, start=1): typer.echo(f" [{idx}] {b.name}") for k, v in redacted_dump(b).items(): if k == "name": continue typer.echo(f" {k}: {v}") @app.command("add-backend") def add_backend( name: Annotated[ str, typer.Argument(help="Unique backend name (e.g. 'embedded', 'neuro-ollama').") ], base_url: Annotated[ str, typer.Argument(help="Base URL of the Ollama API (e.g. http://neuro-ollama:11434).") ], *, token: Annotated[ str | None, typer.Option("--token", help="Bearer token (omit if no auth).") ] = None, header: Annotated[ str, typer.Option("--header", help="Auth header name; default Authorization.") ] = "Authorization", scheme: Annotated[ str, typer.Option("--scheme", help="Auth scheme prefix; default Bearer.") ] = "Bearer", skip_validate: Annotated[ bool, typer.Option("--no-validate", help="Skip the upstream probe (NOT recommended).") ] = False, write_env: Annotated[ str | None, typer.Option( "--write-env", help="If given, replace the OLLAMA_BACKENDS line in this .env file in place.", ), ] = None, replace: Annotated[ bool, typer.Option("--replace", help="If a backend with this name exists, update it.") ] = False, dry_run: Annotated[ bool, typer.Option("--dry-run", help="Show what would change; don't probe or write.") ] = False, ) -> None: """Add a backend to OLLAMA_BACKENDS (or update with --replace), probe-validate it, and print/write the resulting line. Without --write-env, just prints the new ``OLLAMA_BACKENDS=...`` line for you to paste into your .env. With --write-env , edits the file in place (atomic temp-file + rename). Recreating the gateway container is on you — typically ``docker compose up -d gateway``. """ import os import httpx from pydantic import SecretStr from neuronetz_gateway.cli.backends import ( add_or_replace, serialize, update_env_file, ) from neuronetz_gateway.cli.backends import ( parse as parse_backends, ) from neuronetz_gateway.proxy.router import build_backend_headers raw = os.environ.get("OLLAMA_BACKENDS", "").strip() settings = get_settings() current = parse_backends(raw) if raw else list(settings.effective_backends()) secret = SecretStr(token) if token else None new = BackendSpec( name=name, base_url=base_url, auth_token=secret, auth_header=header, auth_scheme=scheme, ) try: updated = add_or_replace(current, new, replace=replace) except ValueError as exc: typer.secho(str(exc), fg=typer.colors.RED) raise typer.Exit(code=1) from None if dry_run: typer.echo("(dry-run) would set:") typer.echo(f" OLLAMA_BACKENDS={serialize(updated)}") return if not skip_validate: async def _probe() -> tuple[int, str]: async with httpx.AsyncClient( base_url=new.base_url, timeout=httpx.Timeout(connect=5, read=10, write=10, pool=10), headers=build_backend_headers(new), ) as client: try: resp = await client.get("/api/tags") except httpx.HTTPError as exc: return 0, f"transport error: {type(exc).__name__}" if resp.status_code >= 400: return resp.status_code, f"HTTP {resp.status_code}" ct = resp.headers.get("content-type", "") body = resp.json() if ct.startswith("application/json") else {} count = len(body.get("models", [])) if isinstance(body, dict) else 0 return resp.status_code, f"HTTP 200, {count} model(s)" status, msg = asyncio.run(_probe()) if status != 200: typer.secho(f"✗ probe failed for {new.name}: {msg}", fg=typer.colors.RED) if status in (401, 403): typer.echo(" credentials rejected — check --token / --header / --scheme.") typer.echo(" (use --no-validate to skip this check; not recommended.)") raise typer.Exit(code=1) typer.secho(f"✓ probe ok for {new.name}: {msg}", fg=typer.colors.GREEN) line = serialize(updated) if write_env is not None: path = Path(write_env) try: update_env_file(path, line) except (PermissionError, OSError) as exc: _emit_paste_fallback(path, line, exc) raise typer.Exit(code=1) from exc typer.echo(f"✓ updated {path}") typer.echo(" recreate the gateway: docker compose up -d gateway") else: typer.echo("") typer.echo("Add this to your .env (then `docker compose up -d gateway`):") typer.echo("") typer.echo(f" OLLAMA_BACKENDS={line}") typer.echo("") @app.command("remove-backend") def remove_backend( name: Annotated[str, typer.Argument(help="Name of the backend to remove.")], *, write_env: Annotated[ str | None, typer.Option("--write-env", help="If given, update this .env file in place."), ] = None, dry_run: Annotated[ bool, typer.Option("--dry-run", help="Show what would change; don't write.") ] = False, ) -> None: """Remove a backend from OLLAMA_BACKENDS and emit the resulting line.""" import os from neuronetz_gateway.cli.backends import ( parse as parse_backends, ) from neuronetz_gateway.cli.backends import ( remove as remove_backend_from_list, ) from neuronetz_gateway.cli.backends import ( serialize, update_env_file, ) raw = os.environ.get("OLLAMA_BACKENDS", "").strip() if not raw: typer.secho("OLLAMA_BACKENDS is empty — nothing to remove.", fg=typer.colors.YELLOW) raise typer.Exit(code=1) current = parse_backends(raw) try: updated = remove_backend_from_list(current, name) except ValueError as exc: typer.secho(str(exc), fg=typer.colors.RED) raise typer.Exit(code=1) from None line = serialize(updated) if dry_run: typer.echo("(dry-run) would set:") typer.echo(f" OLLAMA_BACKENDS={line}") return if write_env is not None: path = Path(write_env) try: update_env_file(path, line) except (PermissionError, OSError) as exc: _emit_paste_fallback(path, line, exc) raise typer.Exit(code=1) from exc typer.echo(f"✓ updated {write_env}") typer.echo(" recreate the gateway: docker compose up -d gateway") else: typer.echo("") typer.echo("Replace your OLLAMA_BACKENDS in .env with:") typer.echo("") typer.echo(f" OLLAMA_BACKENDS={line}") typer.echo("") def main() -> None: """Console-script entry point.""" app() if __name__ == "__main__": main()