Real test bodies (not stubs), driven against an in-process httpx.ASGITransport override of the gateway's get_ollama_client dependency pointing at tests/integration/mock_ollama.py. Unit (target 100% on auth/, ratelimit/, budget/): - argon2id roundtrip, wrong-key, garbage encoding, needs_rehash on param change - key format/uniqueness/prefix extraction - token counter (prompt_eval_count + eval_count, embeddings, missing-counts) - translate (OpenAI <-> Ollama for chat/completion/embeddings, streaming chunks, /v1/models list shape) - allowlist (hard-blocks, effective-set semantics across allow_all/inheritance/ empty-discovered) - discovery (parse, cache roundtrip with TTL, fail-closed, tolerates redis=None) - sliding window (allow/block/reset/per-key vs per-tenant/cost-weighted) Integration (testcontainers postgres + redis + in-process mock Ollama): - auth flow (no/malformed/wrong key all return identical sanitized 401) - proxy stream (NDJSON roundtrip, audit row's token counts match, hard-blocked endpoints uniformly 403) - openai_compat (SSE chunks, data: [DONE], non-stream shape, /v1/models) - model_discovery (allow_all sees all, default-deny sees allowed ∩ discovered, /v1/models filtered, unpermitted-but-installed = nonexistent = 403, empty cache denies even allow_all) - rate_limit (429 + Retry-After + headers; Redis down ⇒ 503, never 200) - budget (decrement + headers; pre-burned counter blocks next request) - revocation (INSERT into gateway.revocations → NOTIFY → cache evicted → 401 ≤ 1s) Includes a known-issue xfail flagging a bug in ratelimit/sliding_window.py: the per-hit ZSET member uses id(object()) which returns the same id on consecutive calls, causing same-millisecond hits to overwrite instead of stacking. To be fixed in a follow-up commit.
94 lines
3.7 KiB
Python
94 lines
3.7 KiB
Python
"""Unit tests for ``neuronetz_gateway.auth.hashing`` (argon2id wrapper).
|
|
|
|
Covers SPEC §3/§9: hash, constant-time verify, wrong-key rejection, and
|
|
rehash-on-parameter-change. Targets 100% coverage of ``auth/hashing.py``.
|
|
|
|
These bodies are real; while ``hashing.py`` is a Phase-1 stub each call routes
|
|
through :func:`tests._skip.call_or_skip`, which skips (not fails) on
|
|
``NotImplementedError`` and lets the test self-activate once Backend lands.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from argon2 import PasswordHasher
|
|
|
|
from neuronetz_gateway.auth import hashing
|
|
from neuronetz_gateway.config import Settings
|
|
from tests._skip import call_or_skip
|
|
|
|
# A fast hasher so the suite stays quick; the wrapper must round-trip regardless
|
|
# of the cost parameters, which are what we actually exercise here.
|
|
_FAST = PasswordHasher(time_cost=1, memory_cost=8, parallelism=1)
|
|
SECRET = "nz_abcdef012345deadbeefcafebabe00112233" # noqa: S105 - test fixture, not a real key
|
|
|
|
|
|
def _settings(**overrides: object) -> Settings:
|
|
base: dict[str, object] = {
|
|
"argon2_time_cost": 1,
|
|
"argon2_memory_cost_kib": 8,
|
|
"argon2_parallelism": 1,
|
|
}
|
|
base.update(overrides)
|
|
return Settings(**base) # type: ignore[arg-type] # pydantic-settings kwargs
|
|
|
|
|
|
def test_build_hasher_uses_settings() -> None:
|
|
hasher = hashing.build_hasher(_settings(argon2_time_cost=2))
|
|
assert isinstance(hasher, PasswordHasher)
|
|
assert hasher.time_cost == 2
|
|
|
|
|
|
def test_hash_secret_returns_argon2id_encoded() -> None:
|
|
encoded = call_or_skip(hashing.hash_secret, _FAST, SECRET)
|
|
assert isinstance(encoded, str)
|
|
# argon2id encoded hashes begin with the variant tag.
|
|
assert encoded.startswith("$argon2id$")
|
|
# Never echoes the plaintext back.
|
|
assert SECRET not in encoded
|
|
|
|
|
|
def test_hash_is_salted_unique_per_call() -> None:
|
|
a = call_or_skip(hashing.hash_secret, _FAST, SECRET)
|
|
b = call_or_skip(hashing.hash_secret, _FAST, SECRET)
|
|
assert a != b # random salt => distinct encodings
|
|
|
|
|
|
def test_verify_secret_roundtrip_true() -> None:
|
|
encoded = call_or_skip(hashing.hash_secret, _FAST, SECRET)
|
|
assert call_or_skip(hashing.verify_secret, _FAST, encoded, SECRET) is True
|
|
|
|
|
|
def test_verify_wrong_secret_returns_false_not_raise() -> None:
|
|
encoded = call_or_skip(hashing.hash_secret, _FAST, SECRET)
|
|
# Constant-time path must return False, never propagate argon2's
|
|
# VerifyMismatchError to the caller (auth must fail closed, not 500).
|
|
result = call_or_skip(hashing.verify_secret, _FAST, encoded, SECRET + "x")
|
|
assert result is False
|
|
|
|
|
|
def test_verify_garbage_encoding_returns_false() -> None:
|
|
# An invalid/corrupt stored hash must verify False, not raise.
|
|
result = call_or_skip(hashing.verify_secret, _FAST, "not-a-valid-hash", SECRET)
|
|
assert result is False
|
|
|
|
|
|
def test_needs_rehash_false_for_current_params() -> None:
|
|
encoded = call_or_skip(hashing.hash_secret, _FAST, SECRET)
|
|
assert call_or_skip(hashing.needs_rehash, _FAST, encoded) is False
|
|
|
|
|
|
def test_needs_rehash_true_when_params_change() -> None:
|
|
weak = PasswordHasher(time_cost=1, memory_cost=8, parallelism=1)
|
|
strong = PasswordHasher(time_cost=3, memory_cost=64, parallelism=1)
|
|
encoded = call_or_skip(hashing.hash_secret, weak, SECRET)
|
|
# A hash made with weaker params must be flagged for rehash by a stronger
|
|
# configured hasher (SPEC §9 rehash-on-parameter-change).
|
|
assert call_or_skip(hashing.needs_rehash, strong, encoded) is True
|
|
|
|
|
|
@pytest.mark.parametrize("secret", ["", "a", "x" * 4096])
|
|
def test_hash_verify_edge_lengths(secret: str) -> None:
|
|
encoded = call_or_skip(hashing.hash_secret, _FAST, secret)
|
|
assert call_or_skip(hashing.verify_secret, _FAST, encoded, secret) is True
|