"""Locust load-test skeleton for neuronetz-gateway. Phase 1 provides a *runnable structure* only; Phase 3/5 fill in the real scenarios that validate SPEC §9 / §12 (100 concurrent users for 5 minutes, p99 gateway overhead < 25 ms, correct 429 behavior at the limit). Run (once the gateway is up):: NEURONETZ_API_KEY=nz_... \\ locust -f tests/load/locustfile.py \\ --host http://localhost:8080 Configuration via environment variables: * ``NEURONETZ_API_KEY`` - Bearer token to send (placeholder by default). * ``NEURONETZ_MODEL`` - model name to request (default ``llama3.1:8b``). """ from __future__ import annotations import os from locust import HttpUser, between, task API_KEY = os.environ.get("NEURONETZ_API_KEY", "nz_PLACEHOLDER0000replace_me_with_real_key") MODEL = os.environ.get("NEURONETZ_MODEL", "llama3.1:8b") # locust resolves to Any under mypy --strict via the pyproject override # (``ignore_missing_imports = true`` for ``locust.*``), so no per-line ignores # are needed for the inheritance or decorators here. class GatewayUser(HttpUser): """Simulates a client hitting the OpenAI-compatible chat endpoint.""" # Realistic think time between requests; tune in Phase 3. wait_time = between(1, 3) @property def _auth_headers(self) -> dict[str, str]: return { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json", } @task(3) def chat_completion_non_streaming(self) -> None: """Baseline non-streaming chat completion.""" payload = { "model": MODEL, "messages": [{"role": "user", "content": "ping"}], "stream": False, } with self.client.post( "/v1/chat/completions", json=payload, headers=self._auth_headers, name="/v1/chat/completions", catch_response=True, ) as resp: # Phase 3: assert latency budget + token-accounting headers here. if resp.status_code >= 500: resp.failure(f"server error: {resp.status_code}") else: resp.success() @task(1) def chat_completion_streaming(self) -> None: """Streaming chat completion (SSE). Scenario filled in Phase 3.""" payload = { "model": MODEL, "messages": [{"role": "user", "content": "stream please"}], "stream": True, } with self.client.post( "/v1/chat/completions", json=payload, headers=self._auth_headers, name="/v1/chat/completions [stream]", catch_response=True, ) as resp: if resp.status_code >= 500: resp.failure(f"server error: {resp.status_code}") else: resp.success()