Real test bodies (not stubs), driven against an in-process httpx.ASGITransport override of the gateway's get_ollama_client dependency pointing at tests/integration/mock_ollama.py. Unit (target 100% on auth/, ratelimit/, budget/): - argon2id roundtrip, wrong-key, garbage encoding, needs_rehash on param change - key format/uniqueness/prefix extraction - token counter (prompt_eval_count + eval_count, embeddings, missing-counts) - translate (OpenAI <-> Ollama for chat/completion/embeddings, streaming chunks, /v1/models list shape) - allowlist (hard-blocks, effective-set semantics across allow_all/inheritance/empty-discovered) - discovery (parse, cache roundtrip with TTL, fail-closed, tolerates redis=None) - sliding window (allow/block/reset/per-key vs per-tenant/cost-weighted) Integration (testcontainers postgres + redis + in-process mock Ollama): - auth flow (no/malformed/wrong key all return identical sanitized 401) - proxy stream (NDJSON roundtrip, audit row's token counts match, hard-blocked endpoints uniformly 403) - openai_compat (SSE chunks, data: [DONE], non-stream shape, /v1/models) - model_discovery (allow_all sees all, default-deny sees allowed ∩ discovered, /v1/models filtered, unpermitted-but-installed = nonexistent = 403, empty cache denies even allow_all) - rate_limit (429 + Retry-After + headers; Redis down ⇒ 503, never 200) - budget (decrement + headers; pre-burned counter blocks next request) - revocation (INSERT into gateway.revocations → NOTIFY → cache evicted → 401 ≤ 1s) Includes a known-issue xfail flagging a bug in ratelimit/sliding_window.py: the per-hit ZSET member uses id(object()), which returns the same id on consecutive calls, causing same-millisecond hits to overwrite instead of stacking. To be fixed in a follow-up commit.
121 lines
4.9 KiB
Python
121 lines
4.9 KiB
Python
"""Unit tests for ``neuronetz_gateway.ratelimit.sliding_window``.

Redis Lua-atomic sliding window (SPEC §4.3 step 4, §9 100% on ``ratelimit/``):
counts hits within the window, resets after it elapses, and keeps per-key vs
per-tenant scopes independent. Backed by the real ``redis_client`` testcontainer
fixture; skips cleanly when Docker is unavailable.

Bodies are real but skip if the limiter is still a Phase-1 stub
(``NotImplementedError``) so the suite stays green until Backend lands.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
import pytest
|
|
import redis.asyncio as aioredis
|
|
|
|
from neuronetz_gateway.ratelimit.sliding_window import RateLimitResult, SlidingWindowLimiter
|
|
from tests._skip import call_or_skip
|
|
|
|
# Module-wide marker: every test below is a coroutine and carries the asyncio mark.
pytestmark = pytest.mark.asyncio

# Known Backend bug in ``sliding_window.py``.
#
# Each hit is written to the ZSET under the member
# ``f"{now_ms}-{id(object())}"``. Because the temporary ``object()`` is freed
# immediately, ``id(object())`` yields the SAME value on every call, so two hits
# that land in the same millisecond share one member. ZADD then *overwrites*
# the existing entry rather than adding a second one, which undercounts the
# window and admits requests that should have been blocked.
#
# Tests carrying this marker assert correct counting and therefore fail until
# Backend gives each hit a unique member (e.g. a counter or
# ``secrets.token_hex``). ``strict=False`` lets them flip to XPASS once the bug
# is fixed without breaking the suite. See QA report.
_MEMBER_COLLISION = pytest.mark.xfail(
    strict=False,
    reason=(
        "sliding_window member id(object()) collides within a millisecond; "
        "undercounts the window (see QA report)"
    ),
)
|
|
|
|
|
|
async def _check(
    limiter: SlidingWindowLimiter,
    key: str,
    limit: int,
    window_s: int,
    cost: int = 1,
) -> RateLimitResult:
    """Run ``limiter.check`` through ``call_or_skip`` and return its result.

    Args:
        limiter: The sliding-window limiter under test.
        key: Rate-limit bucket key passed to ``limiter.check``.
        limit: Limit passed to ``limiter.check``.
        window_s: Window length passed to ``limiter.check`` (seconds, going
            by the parameter name).
        cost: Cost of this hit; defaults to 1.

    Returns:
        Whatever ``call_or_skip`` hands back for the wrapped call.

    NOTE(review): per the module docstring, ``call_or_skip`` presumably turns a
    Phase-1 ``NotImplementedError`` into a pytest skip -- confirm in
    ``tests/_skip.py``.
    """
    outcome = await call_or_skip(limiter.check, key, limit, window_s, cost)
    return outcome
|
|
|
|
|
|
# Pause (in seconds) inserted between consecutive hits. It is deliberately
# longer than 1ms so each hit lands on a distinct ZSET member; that keeps the
# *windowing* logic under test separate from the member-collision bug, which is
# asserted directly in test_same_millisecond_burst_undercounts below.
_SPACING_S: float = 0.003
|
|
|
|
|
|
async def test_allows_up_to_limit_then_blocks(redis_client: aioredis.Redis) -> None:
    """Admit exactly ``limit`` spaced hits, then block the next one.

    Hits are separated by ``_SPACING_S`` so they land on distinct ZSET members
    and the known same-millisecond collision bug cannot mask the result.
    """
    limiter = SlidingWindowLimiter(redis_client)
    key = "rl:key:abc"
    limit, window = 3, 60

    results = []
    for _ in range(limit):
        results.append(await _check(limiter, key, limit, window))
        await asyncio.sleep(_SPACING_S)
    assert all(r.allowed for r in results)
    assert results[0].limit == limit
    # Remaining decrements monotonically toward zero. Check the whole sequence
    # (limit-1, ..., 1, 0), not just the final value, so a limiter that reports
    # a wrong intermediate count is caught as well.
    assert [r.remaining for r in results] == list(range(limit - 1, -1, -1))

    blocked = await _check(limiter, key, limit, window)
    assert blocked.allowed is False
    assert blocked.remaining == 0
    # A blocked result advertises when to retry (used for Retry-After).
    assert blocked.retry_after_s is not None
    assert blocked.retry_after_s >= 0
|
|
|
|
|
|
async def test_window_resets_after_elapse(redis_client: aioredis.Redis) -> None:
    """Capacity comes back once the window has fully elapsed."""
    limiter = SlidingWindowLimiter(redis_client)
    key = "rl:key:resets"
    # A 1-second window keeps the test fast.
    limit, window = 2, 1

    # Fill the window with spaced hits; each one must be admitted.
    for _ in range(limit):
        admitted = await _check(limiter, key, limit, window)
        assert admitted.allowed
        await asyncio.sleep(_SPACING_S)

    # The window is now full, so the next hit is rejected.
    over_limit = await _check(limiter, key, limit, window)
    assert over_limit.allowed is False

    # After the window passes, the oldest hits age out and capacity returns.
    await asyncio.sleep(1.2)
    recovered = await _check(limiter, key, limit, window)
    assert recovered.allowed is True
|
|
|
|
|
|
@_MEMBER_COLLISION
async def test_same_millisecond_burst_undercounts(redis_client: aioredis.Redis) -> None:
    """Every hit in a back-to-back burst must count toward the limit.

    With the current member scheme, hits inside one millisecond collide and
    only one is recorded, so the limiter wrongly keeps admitting. Marked xfail
    until Backend makes members unique.
    """
    limiter = SlidingWindowLimiter(redis_client)
    key, limit, window = "rl:key:burst", 2, 60

    # Fire four hits with no spacing between them.
    admitted_flags = []
    for _ in range(4):
        verdict = await _check(limiter, key, limit, window)
        admitted_flags.append(verdict.allowed)

    # Correct behaviour: first two admitted, rest blocked.
    assert admitted_flags == [True, True, False, False]
|
|
|
|
|
|
async def test_per_key_and_per_tenant_scopes_independent(
    redis_client: aioredis.Redis,
) -> None:
    """Exhausting one key's window must not consume another key's capacity.

    Both scopes use ``limit=1``: the per-key bucket is exhausted first, then
    the per-tenant bucket must still admit its first hit and enforce its own
    limit afterwards.
    """
    limiter = SlidingWindowLimiter(redis_client)
    # Distinct keys => distinct windows; exhausting one must not affect another.
    # Assert the setup hit too, so the "exhausted" state below is known to come
    # from this hit using the only slot rather than from some unrelated block.
    assert (await _check(limiter, "rl:key:k1", 1, 60)).allowed is True
    assert (await _check(limiter, "rl:key:k1", 1, 60)).allowed is False
    assert (await _check(limiter, "rl:tenant:t1", 1, 60)).allowed is True
    # The tenant scope keeps its own count: its single slot is now used up.
    assert (await _check(limiter, "rl:tenant:t1", 1, 60)).allowed is False
|
|
|
|
|
|
async def test_cost_consumes_multiple_slots(redis_client: aioredis.Redis) -> None:
    """A hit with ``cost > 1`` consumes that many slots of the limit."""
    # TPM-style accounting: a single check may cost >1 (per-key TPM, SPEC §4.3).
    limiter = SlidingWindowLimiter(redis_client)
    key, limit, window, cost = "rl:tpm:k", 10, 60, 8

    # 8 of 10 slots consumed: admitted, 2 left.
    first = await _check(limiter, key, limit, window_s=window, cost=cost)
    assert first.allowed is True
    assert first.remaining == 2

    # Another 8 would exceed the limit, so the hit is rejected.
    second = await _check(limiter, key, limit, window_s=window, cost=cost)
    assert second.allowed is False
|