Files
neuronetz-gateway/tests/integration/test_budget.py
Stephan Berbig 844b02aade tests: unit + integration suite (99 tests; ruff + mypy --strict clean)
Real test bodies (not stubs), driven against an in-process httpx.ASGITransport
override of the gateway's get_ollama_client dependency pointing at
tests/integration/mock_ollama.py.

Unit (target 100% on auth/, ratelimit/, budget/):
- argon2id roundtrip, wrong-key, garbage encoding, needs_rehash on param change
- key format/uniqueness/prefix extraction
- token counter (prompt_eval_count + eval_count, embeddings, missing-counts)
- translate (OpenAI <-> Ollama for chat/completion/embeddings, streaming chunks,
  /v1/models list shape)
- allowlist (hard-blocks, effective-set semantics across allow_all/inheritance/
  empty-discovered)
- discovery (parse, cache roundtrip with TTL, fail-closed, tolerates redis=None)
- sliding window (allow/block/reset/per-key vs per-tenant/cost-weighted)

Integration (testcontainers postgres + redis + in-process mock Ollama):
- auth flow (no/malformed/wrong key all return identical sanitized 401)
- proxy stream (NDJSON roundtrip, audit row's token counts match, hard-blocked
  endpoints uniformly 403)
- openai_compat (SSE chunks, data: [DONE], non-stream shape, /v1/models)
- model_discovery (allow_all sees all, default-deny sees allowed ∩ discovered,
  /v1/models filtered, unpermitted-but-installed = nonexistent = 403,
  empty cache denies even allow_all)
- rate_limit (429 + Retry-After + headers; Redis down ⇒ 503, never 200)
- budget (decrement + headers; pre-burned counter blocks next request)
- revocation (INSERT into gateway.revocations → NOTIFY → cache evicted → 401 ≤ 1s)

Includes a known-issue xfail flagging a bug in ratelimit/sliding_window.py:
the per-hit ZSET member uses id(object()) which returns the same id on
consecutive calls, causing same-millisecond hits to overwrite instead of
stacking. To be fixed in a follow-up commit.
2026-05-26 20:52:33 +02:00

84 lines
2.8 KiB
Python

"""Integration tests for token budgets (SPEC §4.3 step 5, §6.5, §12).
* A request returns the SPEC §6.5 budget headers
(``X-Budget-Period``, ``X-Budget-Tokens-Remaining``).
* When the daily budget is exhausted the next request is blocked with a
sanitized ``budget_exceeded`` error.
"""
from __future__ import annotations
import asyncio
import httpx
import pytest
from neuronetz_gateway.budget.counter import BudgetCounter
from neuronetz_gateway.db.models import BudgetPeriod
from tests.integration.conftest import (
IntegrationApp,
_create_tenant_and_key,
)
from tests.integration.mock_ollama import DEFAULT_MODELS
pytestmark = pytest.mark.asyncio
async def _chat(client: httpx.AsyncClient, key_full: str) -> httpx.Response:
return await client.post(
"/api/chat",
headers={"Authorization": f"Bearer {key_full}"},
json={
"model": "llama3.1:8b",
"messages": [{"role": "user", "content": "hello"}],
"stream": False,
},
)
async def test_budget_headers_present_on_response(
integration_app: IntegrationApp, client: httpx.AsyncClient
) -> None:
key = await _create_tenant_and_key(
integration_app,
tokens_daily=1_000_000,
allowed_models=list(DEFAULT_MODELS),
)
resp = await _chat(client, key.full_key)
assert resp.status_code == 200
# SPEC §6.5
assert resp.headers.get("X-Budget-Period") in {"day", "month", "total"}
assert resp.headers.get("X-Budget-Tokens-Remaining") is not None
async def test_budget_blocks_when_exhausted(
integration_app: IntegrationApp, client: httpx.AsyncClient
) -> None:
# Tiny daily budget; the first request itself will spend more than it,
# leaving remaining <= 0 so a follow-up must be blocked.
key = await _create_tenant_and_key(
integration_app,
tokens_daily=1,
allowed_models=list(DEFAULT_MODELS),
)
# Pre-burn the Redis budget counter so the *next* request is blocked
# deterministically (don't depend on post-stream accounting timing).
redis_client = integration_app.app.state.redis
counter = BudgetCounter(redis_client)
# Consume more than the daily limit so check() reports exhausted.
await counter.consume(str(key.key_id), BudgetPeriod.day, 1000)
# Give Redis a moment so the next request observes the consumed value.
await asyncio.sleep(0.01)
resp = await _chat(client, key.full_key)
# Must not be a 200 — fail-closed / descriptive error.
assert resp.status_code != 200
body = resp.json()
assert body["error"]["code"] in {"budget_exceeded", "rate_limited"}
assert body["error"]["request_id"]
# Message is descriptive but sanitized (no upstream / internal details).
msg = body["error"]["message"].lower()
for needle in ("ollama", "redis", "postgres", "traceback"):
assert needle not in msg