Files
neuronetz-gateway/tests/integration/test_openai_compat.py
Stephan Berbig 844b02aade tests: unit + integration suite (99 tests; ruff + mypy --strict clean)
Real test bodies (not stubs), driven against an in-process httpx.ASGITransport
override of the gateway's get_ollama_client dependency pointing at
tests/integration/mock_ollama.py.

Unit (target 100% on auth/, ratelimit/, budget/):
- argon2id roundtrip, wrong-key, garbage encoding, needs_rehash on param change
- key format/uniqueness/prefix extraction
- token counter (prompt_eval_count + eval_count, embeddings, missing-counts)
- translate (OpenAI <-> Ollama for chat/completion/embeddings, streaming chunks,
  /v1/models list shape)
- allowlist (hard-blocks, effective-set semantics across allow_all/inheritance/
  empty-discovered)
- discovery (parse, cache roundtrip with TTL, fail-closed, tolerates redis=None)
- sliding window (allow/block/reset/per-key vs per-tenant/cost-weighted)

Integration (testcontainers postgres + redis + in-process mock Ollama):
- auth flow (no/malformed/wrong key all return identical sanitized 401)
- proxy stream (NDJSON roundtrip, audit row's token counts match, hard-blocked
  endpoints uniformly 403)
- openai_compat (SSE chunks, data: [DONE], non-stream shape, /v1/models)
- model_discovery (allow_all sees all, default-deny sees allowed ∩ discovered,
  /v1/models filtered, unpermitted-but-installed = nonexistent = 403,
  empty cache denies even allow_all)
- rate_limit (429 + Retry-After + headers; Redis down ⇒ 503, never 200)
- budget (decrement + headers; pre-burned counter blocks next request)
- revocation (INSERT into gateway.revocations → NOTIFY → cache evicted → 401 ≤ 1s)

Includes a known-issue xfail flagging a bug in ratelimit/sliding_window.py:
the per-hit ZSET member uses id(object()), which can return the same id on
consecutive calls (the temporary object is freed immediately, so its address
is reused), causing same-millisecond hits to overwrite each other instead of
stacking. To be fixed in a follow-up commit.
2026-05-26 20:52:33 +02:00

86 lines
2.9 KiB
Python

"""Integration tests for the OpenAI-compatible surface (SPEC §6.3, §12).
* ``/v1/chat/completions`` streaming SSE: every event is ``data: {...}\\n\\n``
  and the stream terminates with ``data: [DONE]\\n\\n``.
* Non-streaming ``/v1/chat/completions`` returns the OpenAI ``chat.completion``
  shape with a single ``choices[0].message`` and ``usage``.
* ``/v1/models`` returns the tenant's *effective* discovered set in the
  OpenAI model-list format.
"""
from __future__ import annotations
import json
import httpx
import pytest
from tests.integration.conftest import IntegrationKey
from tests.integration.mock_ollama import DEFAULT_MODELS
# Module-level mark: pytest applies it to every test in this file, so the
# ``async def`` tests below run as asyncio tests (requires pytest-asyncio).
pytestmark = pytest.mark.asyncio
async def test_chat_completions_sse_ends_with_done(
    client: httpx.AsyncClient, api_key: IntegrationKey
) -> None:
    """Streaming ``/v1/chat/completions`` speaks SSE and ends with ``[DONE]``.

    Sends a ``stream: True`` chat request, collects every non-blank line of
    the response, and checks that:

    * the response is a 200 with a ``text/event-stream`` content type,
    * at least one line was received and every line is a ``data: `` line,
    * the final line is the ``data: [DONE]`` sentinel, and
    * the first non-sentinel line decodes to a ``chat.completion.chunk``
      whose first choice has index 0.

    ``client`` and ``api_key`` are supplied by the test harness (presumably
    fixtures from ``tests/integration/conftest.py`` -- confirm there).
    """
    done_marker = "data: [DONE]"

    async with client.stream(
        "POST",
        "/v1/chat/completions",
        headers={"Authorization": f"Bearer {api_key.full_key}"},
        json={
            "model": "llama3.1:8b",
            "messages": [{"role": "user", "content": "hi"}],
            "stream": True,
        },
    ) as resp:
        assert resp.status_code == 200
        assert "text/event-stream" in resp.headers.get("content-type", "")
        # Blank lines are only the separators between SSE events; drop them.
        events = [line async for line in resp.aiter_lines() if line]

    # Guard first: ``all()`` is vacuously true for an empty list and
    # ``events[-1]`` would raise IndexError instead of a readable failure.
    assert events, "stream produced no SSE events"

    # SSE framing: every line we kept is a ``data: `` line.
    assert all(e.startswith("data: ") for e in events), events
    assert events[-1] == done_marker, events

    # Parse one delta chunk to confirm OpenAI shape.  ``next`` gets a default
    # because a bare StopIteration escaping a coroutine is re-raised as an
    # opaque RuntimeError rather than a test failure.
    chunk_line = next((e for e in events if e != done_marker), None)
    assert chunk_line is not None, "stream contained no chunk before [DONE]"
    payload = json.loads(chunk_line.removeprefix("data: "))
    assert payload["object"] == "chat.completion.chunk"
    assert payload["choices"][0]["index"] == 0
async def test_chat_completions_non_streaming_shape(
    client: httpx.AsyncClient, api_key: IntegrationKey
) -> None:
    """Non-streaming ``/v1/chat/completions`` returns a ``chat.completion``.

    Posts a ``stream: False`` chat request and checks the decoded JSON body:
    the top-level ``object`` tag, the role of the first choice's message, and
    that ``usage.total_tokens`` is a non-negative number.
    """
    auth_headers = {"Authorization": f"Bearer {api_key.full_key}"}
    request_body = {
        "model": "llama3.1:8b",
        "messages": [{"role": "user", "content": "hi"}],
        "stream": False,
    }

    resp = await client.post(
        "/v1/chat/completions", headers=auth_headers, json=request_body
    )

    # Include the raw body so a non-200 shows the gateway's error payload.
    assert resp.status_code == 200, resp.text

    completion = resp.json()
    assert completion["object"] == "chat.completion"
    first_choice = completion["choices"][0]
    assert first_choice["message"]["role"] == "assistant"
    assert completion["usage"]["total_tokens"] >= 0
async def test_v1_models_returns_effective_set(
    client: httpx.AsyncClient, api_key: IntegrationKey
) -> None:
    """``/v1/models`` lists the tenant's models in the OpenAI list format.

    Checks that the response is a 200 whose body is an ``object: "list"``
    envelope, that every entry of ``DEFAULT_MODELS`` appears among the
    returned ``id`` values, and that each entry is tagged ``object: "model"``.
    """
    resp = await client.get(
        "/v1/models", headers={"Authorization": f"Bearer {api_key.full_key}"}
    )
    # Carry the body in the failure message, matching the non-streaming test.
    assert resp.status_code == 200, resp.text

    body = resp.json()
    assert body["object"] == "list"

    ids = {m["id"] for m in body["data"]}
    # ``api_key``'s tenant was created with the full DEFAULT_MODELS allowlist.
    missing = set(DEFAULT_MODELS) - ids
    assert not missing, f"models missing from /v1/models: {missing!r}"

    for model in body["data"]:
        assert model["object"] == "model", model