Files
neuronetz-gateway/tests/unit/test_discovery.py
Stephan Berbig 844b02aade tests: unit + integration suite (99 tests; ruff + mypy --strict clean)
Real test bodies (not stubs), driven against an in-process httpx.ASGITransport
override of the gateway's get_ollama_client dependency pointing at
tests/integration/mock_ollama.py.

Unit (target 100% on auth/, ratelimit/, budget/):
- argon2id roundtrip, wrong-key, garbage encoding, needs_rehash on param change
- key format/uniqueness/prefix extraction
- token counter (prompt_eval_count + eval_count, embeddings, missing-counts)
- translate (OpenAI <-> Ollama for chat/completion/embeddings, streaming chunks,
  /v1/models list shape)
- allowlist (hard-blocks, effective-set semantics across allow_all/inheritance/
  empty-discovered)
- discovery (parse, cache roundtrip with TTL, fail-closed, tolerates redis=None)
- sliding window (allow/block/reset/per-key vs per-tenant/cost-weighted)

Integration (testcontainers postgres + redis + in-process mock Ollama):
- auth flow (no/malformed/wrong key all return identical sanitized 401)
- proxy stream (NDJSON roundtrip, audit row's token counts match, hard-blocked
  endpoints uniformly 403)
- openai_compat (SSE chunks, data: [DONE], non-stream shape, /v1/models)
- model_discovery (allow_all sees all, default-deny sees allowed ∩ discovered,
  /v1/models filtered, unpermitted-but-installed = nonexistent = 403,
  empty cache denies even allow_all)
- rate_limit (429 + Retry-After + headers; Redis down ⇒ 503, never 200)
- budget (decrement + headers; pre-burned counter blocks next request)
- revocation (INSERT into gateway.revocations → NOTIFY → cache evicted → 401 ≤ 1s)

Includes a known-issue xfail flagging a bug in ratelimit/sliding_window.py:
the per-hit ZSET member uses id(object()) which returns the same id on
consecutive calls, causing same-millisecond hits to overwrite instead of
stacking. To be fixed in a follow-up commit.
2026-05-26 20:52:33 +02:00

143 lines
5.2 KiB
Python

"""Unit tests for model discovery (SPEC §4.6).
A background poller queries Ollama ``GET /api/tags``, parses the installed model
set, caches it in Redis (TTL) + in-process (:class:`DiscoveryCache`), and **fails
closed**: an empty/expired discovered set means no model resolves (deny). On an
upstream error ``refresh_once`` returns ``False`` and leaves the caches untouched
so they expire on their own TTL (stale-expired ⇒ empty ⇒ deny); discovery never
opens access.
Driven against the in-process ``mock_ollama`` upstream and the real
``redis_client`` testcontainer fixture (skips cleanly without Docker).
"""
from __future__ import annotations
from typing import Any
import httpx
import pytest
import redis.asyncio as aioredis
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from neuronetz_gateway.config import Settings
from neuronetz_gateway.proxy import discovery
def _ollama_client(app: FastAPI) -> httpx.AsyncClient:
    """Return an ``httpx.AsyncClient`` whose requests are served in-process by *app*.

    The client uses an ``httpx.ASGITransport`` bound to *app* and the base URL
    ``http://ollama``.
    """
    return httpx.AsyncClient(
        transport=httpx.ASGITransport(app=app),
        base_url="http://ollama",
    )
def _settings() -> Settings:
    """Return the ``Settings`` used by the ``refresh_once`` tests.

    Cache TTL is 120 seconds and the refresh interval is 60 seconds.
    """
    return Settings(
        model_discovery_cache_ttl_s=120,
        model_discovery_refresh_s=60,
    )
# --- pure parsing ----------------------------------------------------------
def test_names_of_extracts_model_names() -> None:
    """``names_of`` reduces a list of models to the frozenset of their names."""
    with_family = discovery.DiscoveredModel(name="llama3.1:8b", family="llama")
    name_only = discovery.DiscoveredModel(name="mistral:7b")

    extracted = discovery.names_of([with_family, name_only])

    assert extracted == frozenset({"llama3.1:8b", "mistral:7b"})
def test_names_of_empty() -> None:
    """``names_of`` on an empty list yields an empty frozenset."""
    extracted = discovery.names_of([])
    assert extracted == frozenset()
# --- fetch + parse against the mock upstream -------------------------------
@pytest.mark.asyncio
async def test_fetch_tags_parses_mock_catalogue(mock_ollama_app: FastAPI) -> None:
    """``fetch_tags`` returns the mock upstream's models with parsed metadata."""
    async with _ollama_client(mock_ollama_app) as client:
        fetched = await discovery.fetch_tags(client)

    expected_subset = {"llama3.1:8b", "mistral:7b", "nomic-embed-text"}
    assert expected_subset <= discovery.names_of(fetched)

    # Sanitized metadata is captured (family parsed from the mock's details).
    index = {model.name: model for model in fetched}
    llama = index["llama3.1:8b"]
    assert llama.family == "llama3.1"
    assert llama.parameter_size == "8B"
@pytest.mark.asyncio
async def test_fetch_tags_raises_on_upstream_error() -> None:
    """A 500 from ``/api/tags`` makes ``fetch_tags`` raise ``httpx.HTTPError``."""
    failing_upstream = FastAPI()

    @failing_upstream.get("/api/tags")
    async def _tags() -> Any:
        # Stand-in upstream that always answers with a server error.
        return JSONResponse({"error": "boom"}, status_code=500)

    async with _ollama_client(failing_upstream) as client:
        with pytest.raises(httpx.HTTPError):
            await discovery.fetch_tags(client)
# --- redis cache round-trip ------------------------------------------------
@pytest.mark.asyncio
async def test_redis_cache_roundtrip(redis_client: aioredis.Redis) -> None:
    """Models written to Redis read back as the same name set, with a TTL set."""
    model_names = ("llama3.1:8b", "x:1b")
    stored = [discovery.DiscoveredModel(name=model_name) for model_name in model_names]

    await discovery.write_discovered_to_redis(redis_client, stored, ttl_s=120)
    read_back = await discovery.read_discovered_from_redis(redis_client)
    assert read_back == frozenset({"llama3.1:8b", "x:1b"})

    # TTL was applied (staleness expires) per SPEC §4.6.
    remaining_ttl = await redis_client.ttl(discovery.REDIS_DISCOVERED_KEY)
    assert 0 < remaining_ttl <= 120
@pytest.mark.asyncio
async def test_read_discovered_miss_returns_empty(redis_client: aioredis.Redis) -> None:
    """A missing discovery key reads back as an empty set (fail-closed deny).

    The key is deleted first so the miss is established by this test itself,
    rather than depending on the fixture handing over an empty database or on
    this test running before one that writes the key.
    """
    await redis_client.delete(discovery.REDIS_DISCOVERED_KEY)

    # Cache miss / expiry => empty set => fail-closed deny.
    assert await discovery.read_discovered_from_redis(redis_client) == frozenset()
# --- refresh_once: success and fail-closed ---------------------------------
@pytest.mark.asyncio
async def test_refresh_once_populates_in_process_and_redis(
    redis_client: aioredis.Redis, mock_ollama_app: FastAPI
) -> None:
    """A successful refresh fills both the in-process cache and Redis."""
    in_process = discovery.DiscoveryCache()

    async with _ollama_client(mock_ollama_app) as client:
        refreshed = await discovery.refresh_once(
            client, redis_client, in_process, _settings()
        )

    assert refreshed is True
    assert {"llama3.1:8b", "mistral:7b", "nomic-embed-text"} <= in_process.names

    # Mirrored into Redis under the SPEC §4.6 key.
    mirrored = await discovery.read_discovered_from_redis(redis_client)
    assert {"llama3.1:8b", "mistral:7b"} <= mirrored
@pytest.mark.asyncio
async def test_refresh_once_fail_closed_on_upstream_error(
    redis_client: aioredis.Redis,
) -> None:
    """An upstream 500 makes ``refresh_once`` return False with an empty cache."""
    failing_upstream = FastAPI()

    @failing_upstream.get("/api/tags")
    async def _tags() -> Any:
        # Stand-in upstream that always answers with a server error.
        return JSONResponse({"error": "boom"}, status_code=500)

    in_process = discovery.DiscoveryCache()
    async with _ollama_client(failing_upstream) as client:
        refreshed = await discovery.refresh_once(
            client, redis_client, in_process, _settings()
        )

    # Refresh reports failure; the in-process cache stays empty (no models
    # resolve) — discovery never opens access on error.
    assert refreshed is False
    assert in_process.names == frozenset()
@pytest.mark.asyncio
async def test_refresh_once_tolerates_missing_redis(mock_ollama_app: FastAPI) -> None:
    """``refresh_once`` with ``None`` for Redis still fills the in-process cache.

    The Redis fill is best-effort: passing ``None`` must not crash the poller.
    """
    in_process = discovery.DiscoveryCache()

    async with _ollama_client(mock_ollama_app) as client:
        refreshed = await discovery.refresh_once(client, None, in_process, _settings())

    assert refreshed is True
    assert "llama3.1:8b" in in_process.names