Files
neuronetz-gateway/tests/unit/test_hashing.py
Stephan Berbig 844b02aade tests: unit + integration suite (99 tests; ruff + mypy --strict clean)
Real test bodies (not stubs), driven against an in-process httpx.ASGITransport
override of the gateway's get_ollama_client dependency pointing at
tests/integration/mock_ollama.py.

Unit (target 100% on auth/, ratelimit/, budget/):
- argon2id roundtrip, wrong-key, garbage encoding, needs_rehash on param change
- key format/uniqueness/prefix extraction
- token counter (prompt_eval_count + eval_count, embeddings, missing-counts)
- translate (OpenAI <-> Ollama for chat/completion/embeddings, streaming chunks,
  /v1/models list shape)
- allowlist (hard-blocks, effective-set semantics across allow_all/inheritance/
  empty-discovered)
- discovery (parse, cache roundtrip with TTL, fail-closed, tolerates redis=None)
- sliding window (allow/block/reset/per-key vs per-tenant/cost-weighted)

Integration (testcontainers postgres + redis + in-process mock Ollama):
- auth flow (no/malformed/wrong key all return identical sanitized 401)
- proxy stream (NDJSON roundtrip, audit row's token counts match, hard-blocked
  endpoints uniformly 403)
- openai_compat (SSE chunks, data: [DONE], non-stream shape, /v1/models)
- model_discovery (allow_all sees all, default-deny sees allowed ∩ discovered,
  /v1/models filtered, unpermitted-but-installed = nonexistent = 403,
  empty cache denies even allow_all)
- rate_limit (429 + Retry-After + headers; Redis down ⇒ 503, never 200)
- budget (decrement + headers; pre-burned counter blocks next request)
- revocation (INSERT into gateway.revocations → NOTIFY → cache evicted → 401 ≤ 1s)

Includes a known-issue xfail flagging a bug in ratelimit/sliding_window.py:
the per-hit ZSET member uses id(object()) which returns the same id on
consecutive calls, causing same-millisecond hits to overwrite instead of
stacking. To be fixed in a follow-up commit.
2026-05-26 20:52:33 +02:00

94 lines
3.7 KiB
Python

"""Unit tests for ``neuronetz_gateway.auth.hashing`` (argon2id wrapper).
Covers SPEC §3/§9: hash, constant-time verify, wrong-key rejection, and
rehash-on-parameter-change. Targets 100% coverage of ``auth/hashing.py``.
These bodies are real; while ``hashing.py`` is a Phase-1 stub each call routes
through :func:`tests._skip.call_or_skip`, which skips (not fails) on
``NotImplementedError`` and lets the test self-activate once Backend lands.
"""
from __future__ import annotations
import pytest
from argon2 import PasswordHasher
from neuronetz_gateway.auth import hashing
from neuronetz_gateway.config import Settings
from tests._skip import call_or_skip
# A fast hasher so the suite stays quick; the wrapper must round-trip regardless
# of the cost parameters, which are what we actually exercise here.
_FAST = PasswordHasher(time_cost=1, memory_cost=8, parallelism=1)
SECRET = "nz_abcdef012345deadbeefcafebabe00112233" # noqa: S105 - test fixture, not a real key
def _settings(**overrides: object) -> Settings:
base: dict[str, object] = {
"argon2_time_cost": 1,
"argon2_memory_cost_kib": 8,
"argon2_parallelism": 1,
}
base.update(overrides)
return Settings(**base) # type: ignore[arg-type] # pydantic-settings kwargs
def test_build_hasher_uses_settings() -> None:
hasher = hashing.build_hasher(_settings(argon2_time_cost=2))
assert isinstance(hasher, PasswordHasher)
assert hasher.time_cost == 2
def test_hash_secret_returns_argon2id_encoded() -> None:
encoded = call_or_skip(hashing.hash_secret, _FAST, SECRET)
assert isinstance(encoded, str)
# argon2id encoded hashes begin with the variant tag.
assert encoded.startswith("$argon2id$")
# Never echoes the plaintext back.
assert SECRET not in encoded
def test_hash_is_salted_unique_per_call() -> None:
a = call_or_skip(hashing.hash_secret, _FAST, SECRET)
b = call_or_skip(hashing.hash_secret, _FAST, SECRET)
assert a != b # random salt => distinct encodings
def test_verify_secret_roundtrip_true() -> None:
encoded = call_or_skip(hashing.hash_secret, _FAST, SECRET)
assert call_or_skip(hashing.verify_secret, _FAST, encoded, SECRET) is True
def test_verify_wrong_secret_returns_false_not_raise() -> None:
encoded = call_or_skip(hashing.hash_secret, _FAST, SECRET)
# Constant-time path must return False, never propagate argon2's
# VerifyMismatchError to the caller (auth must fail closed, not 500).
result = call_or_skip(hashing.verify_secret, _FAST, encoded, SECRET + "x")
assert result is False
def test_verify_garbage_encoding_returns_false() -> None:
# An invalid/corrupt stored hash must verify False, not raise.
result = call_or_skip(hashing.verify_secret, _FAST, "not-a-valid-hash", SECRET)
assert result is False
def test_needs_rehash_false_for_current_params() -> None:
encoded = call_or_skip(hashing.hash_secret, _FAST, SECRET)
assert call_or_skip(hashing.needs_rehash, _FAST, encoded) is False
def test_needs_rehash_true_when_params_change() -> None:
weak = PasswordHasher(time_cost=1, memory_cost=8, parallelism=1)
strong = PasswordHasher(time_cost=3, memory_cost=64, parallelism=1)
encoded = call_or_skip(hashing.hash_secret, weak, SECRET)
# A hash made with weaker params must be flagged for rehash by a stronger
# configured hasher (SPEC §9 rehash-on-parameter-change).
assert call_or_skip(hashing.needs_rehash, strong, encoded) is True
@pytest.mark.parametrize("secret", ["", "a", "x" * 4096])
def test_hash_verify_edge_lengths(secret: str) -> None:
encoded = call_or_skip(hashing.hash_secret, _FAST, secret)
assert call_or_skip(hashing.verify_secret, _FAST, encoded, secret) is True