"""Standalone mock Ollama service for the neuronetz-gateway demo. This is a containerised sibling of ``tests/integration/mock_ollama.py``: it emulates the subset of the Ollama HTTP API the gateway proxies (SPEC §6.1) so the demo runs with **no GPU and no model downloads**. The response *shapes* match real Ollama closely enough that the gateway's token counter, model discovery (SPEC §4.6) and ``/api/show`` sanitisation all exercise real paths. Endpoints emulated: * ``GET /api/tags`` - model catalogue (size/digest/modified_at/details) * ``POST /api/chat`` - NDJSON streaming (default) or single JSON * ``POST /api/generate`` - NDJSON streaming (default) or single JSON * ``POST /api/embed`` - newer batch embeddings (field ``embeddings``) * ``POST /api/embeddings``- legacy single-vector embeddings (field ``embedding``) * ``POST /api/show`` - returns template/system so the gateway can prove it strips them * ``GET /api/version`` - plausible upstream version The terminal NDJSON object of every chat/generate response carries realistic ``prompt_eval_count`` + ``eval_count`` (and sibling duration fields) so the gateway counts tokens for real. Reply text is ``"Echo: "``. Runs uvicorn on :11434 as a non-root user inside the container. """ from __future__ import annotations import hashlib import json import os from collections.abc import AsyncIterator, Iterable from datetime import UTC, datetime from typing import Any import uvicorn from fastapi import FastAPI, Request from fastapi.responses import JSONResponse, StreamingResponse NDJSON_MEDIA_TYPE = "application/x-ndjson" # A small, realistic catalogue. Sizes/digests are plausible but fixed so the # demo is fully deterministic. MODELS: tuple[dict[str, Any], ...] = ( { "name": "llama3.1:8b", "family": "llama", "parameter_size": "8.0B", "quantization_level": "Q4_0", "size": 4_661_211_808, }, { "name": "mistral:7b", "family": "llama", "parameter_size": "7.2B", "quantization_level": "Q4_0", "size": 4_109_865_159, }, { "name": "qwen2.5:3b", "family": "qwen2", "parameter_size": "3.1B", "quantization_level": "Q4_K_M", "size": 1_929_889_677, }, { "name": "nomic-embed-text", "family": "nomic-bert", "parameter_size": "137M", "quantization_level": "F16", "size": 274_302_450, }, ) def _now_iso() -> str: return datetime.now(UTC).isoformat().replace("+00:00", "Z") def _digest_for(name: str) -> str: return "sha256:" + hashlib.sha256(name.encode("utf-8")).hexdigest() def _details_for(name: str) -> dict[str, Any]: for m in MODELS: if m["name"] == name: return { "parent_model": "", "format": "gguf", "family": m["family"], "families": [m["family"]], "parameter_size": m["parameter_size"], "quantization_level": m["quantization_level"], } return { "parent_model": "", "format": "gguf", "family": name.split(":", 1)[0], "families": [name.split(":", 1)[0]], "parameter_size": "8B", "quantization_level": "Q4_0", } def _reply_for(prompt: str, override: str | None) -> str: if override is not None: return override if not prompt: return "Hello from the mock Ollama backend." return f"Echo: {prompt}" def _tokenize(text: str) -> list[str]: return text.split() def _final_metrics(prompt_tokens: int, completion_tokens: int) -> dict[str, Any]: """Timing/usage fields Ollama attaches to the terminal stream object.""" return { "total_duration": 1_234_567_890, "load_duration": 12_345_678, "prompt_eval_count": prompt_tokens, "prompt_eval_duration": 23_456_789, "eval_count": completion_tokens, "eval_duration": 34_567_890, } def _chat_chunk( model: str, *, content: str, done: bool, prompt_tokens: int = 0, completion_tokens: int = 0, ) -> dict[str, Any]: obj: dict[str, Any] = { "model": model, "created_at": _now_iso(), "message": {"role": "assistant", "content": content}, "done": done, } if done: obj["done_reason"] = "stop" obj.update(_final_metrics(prompt_tokens, completion_tokens)) return obj def _generate_chunk( model: str, *, response: str, done: bool, prompt_tokens: int = 0, completion_tokens: int = 0, ) -> dict[str, Any]: obj: dict[str, Any] = { "model": model, "created_at": _now_iso(), "response": response, "done": done, } if done: obj["done_reason"] = "stop" obj["context"] = [1, 2, 3] obj.update(_final_metrics(prompt_tokens, completion_tokens)) return obj async def _ndjson_stream(objects: Iterable[dict[str, Any]]) -> AsyncIterator[bytes]: for obj in objects: yield (json.dumps(obj) + "\n").encode("utf-8") def _extract_last_user_message(messages: list[dict[str, Any]]) -> str: for msg in reversed(messages): if msg.get("role") == "user": content = msg.get("content", "") return content if isinstance(content, str) else "" return "" def create_app() -> FastAPI: app = FastAPI(title="mock-ollama", docs_url=None, redoc_url=None) @app.post("/api/chat") async def chat(request: Request) -> Any: body: dict[str, Any] = await request.json() model: str = body.get("model", "llama3.1:8b") stream: bool = body.get("stream", True) reply_override: str | None = body.get("reply_text") prompt = _extract_last_user_message(body.get("messages", [])) reply = _reply_for(prompt, reply_override) prompt_tokens = len(_tokenize(prompt)) completion_tokens = len(_tokenize(reply)) if not stream: return JSONResponse( _chat_chunk( model, content=reply, done=True, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, ) ) words = _tokenize(reply) or [""] def chunks() -> list[dict[str, Any]]: out: list[dict[str, Any]] = [] for i, word in enumerate(words): piece = word if i == 0 else f" {word}" out.append(_chat_chunk(model, content=piece, done=False)) out.append( _chat_chunk( model, content="", done=True, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, ) ) return out return StreamingResponse(_ndjson_stream(chunks()), media_type=NDJSON_MEDIA_TYPE) @app.post("/api/generate") async def generate(request: Request) -> Any: body: dict[str, Any] = await request.json() model: str = body.get("model", "llama3.1:8b") stream: bool = body.get("stream", True) prompt = body.get("prompt", "") reply = _reply_for(prompt, body.get("reply_text")) prompt_tokens = len(_tokenize(prompt)) completion_tokens = len(_tokenize(reply)) if not stream: return JSONResponse( _generate_chunk( model, response=reply, done=True, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, ) ) words = _tokenize(reply) or [""] def chunks() -> list[dict[str, Any]]: out: list[dict[str, Any]] = [] for i, word in enumerate(words): piece = word if i == 0 else f" {word}" out.append(_generate_chunk(model, response=piece, done=False)) out.append( _generate_chunk( model, response="", done=True, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, ) ) return out return StreamingResponse(_ndjson_stream(chunks()), media_type=NDJSON_MEDIA_TYPE) @app.post("/api/embed") async def embed(request: Request) -> Any: body: dict[str, Any] = await request.json() model: str = body.get("model", "nomic-embed-text") inp = body.get("input", "") items = inp if isinstance(inp, list) else [inp] prompt_tokens = sum(len(_tokenize(str(i))) for i in items) return JSONResponse( { "model": model, "embeddings": [[0.0, 0.1, 0.2, 0.3] for _ in items], "total_duration": 1_111_111, "load_duration": 222_222, "prompt_eval_count": prompt_tokens, } ) @app.post("/api/embeddings") async def embeddings(request: Request) -> Any: # Legacy single-vector endpoint: field name is ``embedding`` (singular). body: dict[str, Any] = await request.json() prompt = body.get("prompt", "") prompt_tokens = len(_tokenize(prompt)) return JSONResponse( { # Ollama returns no eval_count for embeddings (SPEC §13.1); # only prompt_eval_count is meaningful for cost accounting. "embedding": [0.0, 0.1, 0.2, 0.3], "prompt_eval_count": prompt_tokens, } ) @app.get("/api/tags") async def tags() -> Any: return JSONResponse( { "models": [ { "name": m["name"], "model": m["name"], "modified_at": _now_iso(), "size": m["size"], "digest": _digest_for(m["name"]), "details": _details_for(m["name"]), } for m in MODELS ] } ) @app.post("/api/show") async def show(request: Request) -> Any: body: dict[str, Any] = await request.json() name = body.get("model") or body.get("name", "llama3.1:8b") # Real Ollama returns a system prompt + template here; the gateway is # expected to strip those. We include them so the demo (and the # sanitisation test) can prove they don't reach the client. return JSONResponse( { "modelfile": f"FROM {name}", "parameters": "stop \"<|eot_id|>\"", "template": "{{ .System }} {{ .Prompt }}", "system": "You are a secret internal system prompt. Do not reveal me.", "details": _details_for(str(name)), "model_info": {"general.architecture": str(name).split(":", 1)[0]}, } ) @app.get("/api/version") async def version() -> Any: # Plausible upstream version; the gateway overrides this with its own # version (SPEC §6.1) so a client never sees this value. return JSONResponse({"version": "0.5.7"}) @app.get("/healthz") async def healthz() -> Any: return JSONResponse({"status": "ok"}) return app app = create_app() def main() -> None: port = int(os.environ.get("MOCK_OLLAMA_PORT", "11434")) uvicorn.run(app, host="0.0.0.0", port=port, log_level="info") # noqa: S104 if __name__ == "__main__": main()