"""Unit tests for ``neuronetz_gateway.ratelimit.sliding_window``. Redis Lua-atomic sliding window (SPEC §4.3 step 4, §9 100% on ``ratelimit/``): counts hits within the window, resets after it elapses, and keeps per-key vs per-tenant scopes independent. Backed by the real ``redis_client`` testcontainer fixture; skips cleanly when Docker is unavailable. Bodies are real but skip if the limiter is still a Phase-1 stub (``NotImplementedError``) so the suite stays green until Backend lands. """ from __future__ import annotations import asyncio import pytest import redis.asyncio as aioredis from neuronetz_gateway.ratelimit.sliding_window import RateLimitResult, SlidingWindowLimiter from tests._skip import call_or_skip pytestmark = pytest.mark.asyncio # Known Backend bug (sliding_window.py): the per-hit ZSET member is # ``f"{now_ms}-{id(object())}"``. ``id(object())`` returns the SAME value on # every call (the temporary object is freed immediately), so two hits landing in # the same millisecond produce an identical member; ZADD then *overwrites* # instead of adding a second entry, undercounting the window and admitting # requests that should be blocked. These two tests assert correct counting and # therefore fail until Backend gives each hit a unique member (e.g. a counter or # ``secrets.token_hex``). ``strict=False`` so they flip to XPASS once fixed # without breaking the suite. See QA report. _MEMBER_COLLISION = pytest.mark.xfail( reason="sliding_window member id(object()) collides within a millisecond; " "undercounts the window (see QA report)", strict=False, ) async def _check( limiter: SlidingWindowLimiter, key: str, limit: int, window_s: int, cost: int = 1 ) -> RateLimitResult: return await call_or_skip(limiter.check, key, limit, window_s, cost) # A spacing larger than 1ms so consecutive hits land on distinct ZSET members, # isolating the *windowing* logic from the separate member-collision bug # (asserted directly in test_same_millisecond_burst_undercounts below). _SPACING_S = 0.003 async def test_allows_up_to_limit_then_blocks(redis_client: aioredis.Redis) -> None: limiter = SlidingWindowLimiter(redis_client) key = "rl:key:abc" limit, window = 3, 60 results = [] for _ in range(limit): results.append(await _check(limiter, key, limit, window)) await asyncio.sleep(_SPACING_S) assert all(r.allowed for r in results) assert results[0].limit == limit # Remaining decrements monotonically toward zero. assert results[-1].remaining == 0 blocked = await _check(limiter, key, limit, window) assert blocked.allowed is False assert blocked.remaining == 0 # A blocked result advertises when to retry (used for Retry-After). assert blocked.retry_after_s is not None assert blocked.retry_after_s >= 0 async def test_window_resets_after_elapse(redis_client: aioredis.Redis) -> None: limiter = SlidingWindowLimiter(redis_client) key = "rl:key:resets" limit, window = 2, 1 # 1-second window for a fast test assert (await _check(limiter, key, limit, window)).allowed await asyncio.sleep(_SPACING_S) assert (await _check(limiter, key, limit, window)).allowed await asyncio.sleep(_SPACING_S) assert (await _check(limiter, key, limit, window)).allowed is False # After the window passes, the oldest hits age out and capacity returns. await asyncio.sleep(1.2) assert (await _check(limiter, key, limit, window)).allowed is True @_MEMBER_COLLISION async def test_same_millisecond_burst_undercounts(redis_client: aioredis.Redis) -> None: # A burst of hits within one millisecond must still each count. With the # current member scheme they collide and only one is recorded, so the # limiter wrongly keeps admitting. xfail until Backend makes members unique. limiter = SlidingWindowLimiter(redis_client) key = "rl:key:burst" limit, window = 2, 60 results = [await _check(limiter, key, limit, window) for _ in range(4)] # Correct behaviour: first two admitted, rest blocked. assert [r.allowed for r in results] == [True, True, False, False] async def test_per_key_and_per_tenant_scopes_independent( redis_client: aioredis.Redis, ) -> None: limiter = SlidingWindowLimiter(redis_client) # Distinct keys => distinct windows; exhausting one must not affect another. await _check(limiter, "rl:key:k1", 1, 60) assert (await _check(limiter, "rl:key:k1", 1, 60)).allowed is False assert (await _check(limiter, "rl:tenant:t1", 1, 60)).allowed is True async def test_cost_consumes_multiple_slots(redis_client: aioredis.Redis) -> None: # TPM-style accounting: a single check may cost >1 (per-key TPM, SPEC §4.3). limiter = SlidingWindowLimiter(redis_client) first = await _check(limiter, "rl:tpm:k", limit=10, window_s=60, cost=8) assert first.allowed is True assert first.remaining == 2 second = await _check(limiter, "rl:tpm:k", limit=10, window_s=60, cost=8) assert second.allowed is False