Real test bodies (not stubs), driven against an in-process httpx.ASGITransport override of the gateway's get_ollama_client dependency pointing at tests/integration/mock_ollama.py.

Unit (target 100% on auth/, ratelimit/, budget/):
- argon2id roundtrip, wrong-key, garbage encoding, needs_rehash on param change
- key format/uniqueness/prefix extraction
- token counter (prompt_eval_count + eval_count, embeddings, missing-counts)
- translate (OpenAI <-> Ollama for chat/completion/embeddings, streaming chunks, /v1/models list shape)
- allowlist (hard-blocks, effective-set semantics across allow_all/inheritance/empty-discovered)
- discovery (parse, cache roundtrip with TTL, fail-closed, tolerates redis=None)
- sliding window (allow/block/reset/per-key vs per-tenant/cost-weighted)

Integration (testcontainers postgres + redis + in-process mock Ollama):
- auth flow (no/malformed/wrong key all return identical sanitized 401)
- proxy stream (NDJSON roundtrip, audit row's token counts match, hard-blocked endpoints uniformly 403)
- openai_compat (SSE chunks, data: [DONE], non-stream shape, /v1/models)
- model_discovery (allow_all sees all, default-deny sees allowed ∩ discovered, /v1/models filtered, unpermitted-but-installed = nonexistent = 403, empty cache denies even allow_all)
- rate_limit (429 + Retry-After + headers; Redis down ⇒ 503, never 200)
- budget (decrement + headers; pre-burned counter blocks next request)
- revocation (INSERT into gateway.revocations → NOTIFY → cache evicted → 401 ≤ 1s)

Includes a known-issue xfail flagging a bug in ratelimit/sliding_window.py: the per-hit ZSET member uses id(object()), which returns the same id on consecutive calls, causing same-millisecond hits to overwrite each other instead of stacking. To be fixed in a follow-up commit.
85 lines
2.8 KiB
Python
85 lines
2.8 KiB
Python
"""Locust load-test skeleton for neuronetz-gateway.
|
|
|
|
Phase 1 provides a *runnable structure* only; Phase 3/5 fill in the real
|
|
scenarios that validate SPEC §9 / §12 (100 concurrent users for 5 minutes,
|
|
p99 gateway overhead < 25 ms, correct 429 behavior at the limit).
|
|
|
|
Run (once the gateway is up)::
|
|
|
|
NEURONETZ_API_KEY=nz_... \\
|
|
locust -f tests/load/locustfile.py \\
|
|
--host http://localhost:8080
|
|
|
|
Configuration via environment variables:
|
|
|
|
* ``NEURONETZ_API_KEY`` - Bearer token to send (placeholder by default).
|
|
* ``NEURONETZ_MODEL`` - model name to request (default ``llama3.1:8b``).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
|
|
from locust import HttpUser, between, task
|
|
|
|
# Bearer token sent with every request; the fallback is an obvious placeholder
# so a missing NEURONETZ_API_KEY is easy to spot in gateway logs.
API_KEY = os.getenv(
    "NEURONETZ_API_KEY",
    "nz_PLACEHOLDER0000replace_me_with_real_key",
)
# Model name placed in each chat-completion payload.
MODEL = os.getenv("NEURONETZ_MODEL", "llama3.1:8b")
|
|
|
|
|
|
# locust resolves to Any under mypy --strict via the pyproject override
# (``ignore_missing_imports = true`` for ``locust.*``), so no per-line ignores
# are needed for the inheritance or decorators here.
|
|
class GatewayUser(HttpUser):
    """Simulates a client hitting the OpenAI-compatible chat endpoint.

    Two weighted tasks POST to ``/v1/chat/completions``: a non-streaming
    request (weight 3) and a streaming request (weight 1). Both share the
    same outcome rule, implemented in :meth:`_post_chat`:

    * no HTTP response at all (status ``0``) -> failure
    * any ``5xx`` status                     -> failure
    * everything else (including ``4xx``)    -> success
    """

    # Realistic think time between requests; tune in Phase 3.
    wait_time = between(1, 3)

    @property
    def _auth_headers(self) -> dict[str, str]:
        """Return the bearer-token and JSON content-type request headers."""
        return {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        }

    def _post_chat(self, payload: dict[str, object], name: str) -> None:
        """POST *payload* to the chat endpoint and record the outcome.

        Args:
            payload: JSON body of the chat-completion request.
            name: Label under which the request is grouped in the statistics.
        """
        with self.client.post(
            "/v1/chat/completions",
            json=payload,
            headers=self._auth_headers,
            name=name,
            catch_response=True,
        ) as resp:
            # Phase 3: assert latency budget + token-accounting headers here.
            status = resp.status_code
            if not status:
                # With catch_response=True the outcome is whatever we mark it.
                # A status of 0 means no HTTP response was received
                # (connection refused, timeout, ...); marking that as success
                # would make an unreachable gateway look healthy.
                resp.failure(f"no response: {getattr(resp, 'error', None)}")
            elif status >= 500:
                resp.failure(f"server error: {status}")
            else:
                resp.success()

    @task(3)
    def chat_completion_non_streaming(self) -> None:
        """Baseline non-streaming chat completion."""
        payload: dict[str, object] = {
            "model": MODEL,
            "messages": [{"role": "user", "content": "ping"}],
            "stream": False,
        }
        self._post_chat(payload, name="/v1/chat/completions")

    @task(1)
    def chat_completion_streaming(self) -> None:
        """Streaming chat completion (SSE). Scenario filled in Phase 3."""
        payload: dict[str, object] = {
            "model": MODEL,
            "messages": [{"role": "user", "content": "stream please"}],
            "stream": True,
        }
        self._post_chat(payload, name="/v1/chat/completions [stream]")
|