Files
psyc/tests/test_topology_export.py

314 lines
12 KiB
Python

"""Topology export — whitelist sanitization + endpoint contract.
The big invariant: nothing from docker_view.topology() escapes that isn't
in the Pydantic schema. We assert via model_fields introspection AND via a
JSON-dump scan over a fixture that contains every dangerous field.
"""
from __future__ import annotations
import json
from typing import Any, Dict, List
from unittest.mock import patch
import pytest
from fastapi import FastAPI
from fastapi.templating import Jinja2Templates
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from starlette.middleware.sessions import SessionMiddleware
from psyc import db
from psyc.cockpit import docker_view, federation_routes
from psyc.lines import federation, topology_export
from psyc.lines.topology_export import (
TopologyContainer,
TopologyExport,
TopologyNetwork,
_filter_image_name,
build_export,
)
# ---------- fixtures ----------------------------------------------------
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
d = tmp_path / "federation"
monkeypatch.setattr(federation, "FED_DIR", d)
monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
yield d
@pytest.fixture(autouse=True)
def reset_topology_cache():
if hasattr(federation_routes, "_TOPOLOGY_CACHE"):
federation_routes._TOPOLOGY_CACHE["payload"] = None
federation_routes._TOPOLOGY_CACHE["ts"] = 0.0
yield
# ---------- fixture data: hostile docker_view output --------------------
# This payload has every leaky field docker_view *could* surface, plus
# nested env-style data — used to prove the export is whitelist-only.
_LEAKY_TOPOLOGY: Dict[str, Any] = {
"containers": [
{
"id": "abcdef1234567890ffff",
"name": "psyc-cockpit-1",
"image": "registry.example/psyc:1.2",
"state": "running",
"status": "Up 5 minutes (healthy)",
"networks": [
{"name": "backend", "ip": "172.20.0.5", "gateway": "172.20.0.1", "mac": "02:42:ac:14:00:05"},
{"name": "frontend", "ip": "172.21.0.7", "gateway": "172.21.0.1", "mac": "02:42:ac:15:00:07"},
],
"ports": ["0.0.0.0:8767->8767/tcp"],
"published_ports": ["8767/tcp"],
# These are NOT current docker_view fields but defend in depth —
# if a future docker_view change adds them, sanitizer drops them.
"env": ["SECRET_TOKEN=abc123", "DB_PASSWORD=hunter2"],
"mounts": ["/var/run/docker.sock", "/etc/secrets:/secrets"],
"labels": {"com.docker.compose.project": "psyc", "secret_label": "shh"},
},
{
"id": "fedcba0987654321",
"name": "some-stopped",
"image": "alpine",
"state": "exited",
"status": "Exited (0) 2 hours ago",
"networks": [],
"ports": [],
"published_ports": [],
},
],
"networks": [
{
"id": "n1", "name": "backend", "driver": "bridge", "scope": "local",
"internal": False, "subnet": "172.20.0.0/16", "gateway": "172.20.0.1",
"containers": [
{"id": "abcdef123456", "name": "psyc-cockpit-1", "ip": "172.20.0.5", "mac": "02:42:ac:14:00:05"},
],
},
{
"id": "n2", "name": "internal-only", "driver": "bridge", "scope": "local",
"internal": True, "subnet": "10.99.0.0/16", "gateway": "10.99.0.1",
"containers": [],
},
],
"host": {"name": "docker-host-secret-internal.example.com", "os": "linux", "ncpu": 8},
"error": None,
"proxy": "http://docker-socket-proxy:2375",
}
# Sensitive strings that MUST NOT appear anywhere in the export JSON.
_FORBIDDEN_STRINGS = (
"SECRET_TOKEN", "DB_PASSWORD", "hunter2", "abc123",
"/var/run/docker.sock", "/etc/secrets",
"secret_label", "shh",
"172.20.0.5", "172.21.0.7", # IPs
"02:42:ac", # MAC prefix
"172.20.0.1", # gateway
"172.20.0.0/16", "10.99.0.0/16", # subnets
"0.0.0.0:8767", # port mapping
"internal.example.com", # full host
)
# ---------- model field introspection -----------------------------------
def test_container_model_has_no_dangerous_fields():
fields = set(TopologyContainer.model_fields.keys())
# whitelist — must match the design contract exactly
assert fields == {
"name", "short_id", "image", "state", "health",
"networks", "service", "started_at",
}
# explicit deny-list, double-belt
for forbidden in ("env", "environment", "mounts", "volumes",
"labels", "ip", "ip_address", "ipaddress",
"ports", "published_ports", "mac", "gateway"):
assert forbidden not in fields, f"{forbidden} must not be a field"
def test_network_model_has_no_dangerous_fields():
fields = set(TopologyNetwork.model_fields.keys())
assert fields == {"name", "driver", "internal", "container_count"}
for forbidden in ("subnet", "gateway", "labels", "ipam",
"containers", "scope", "id"):
assert forbidden not in fields, f"{forbidden} must not be a field"
def test_export_model_top_level_fields():
fields = set(TopologyExport.model_fields.keys())
assert fields == {
"node_fingerprint", "generated_at", "host_name",
"container_count", "network_count", "containers", "networks",
}
# ---------- image-name filter -------------------------------------------
def test_filter_image_strips_basic_auth_prefix():
# user:pass@host/repo:tag → host/repo:tag (creds gone)
assert _filter_image_name("user:pass@host/repo:tag") == "host/repo:tag"
def test_filter_image_drops_digest_suffix():
assert _filter_image_name(
"nginx:1.25@sha256:abcdef0123"
) == "nginx:1.25"
def test_filter_image_passes_clean_refs_untouched():
assert _filter_image_name("psyc:latest") == "psyc:latest"
assert _filter_image_name(
"ghcr.io/example/psyc:v0.3.1"
) == "ghcr.io/example/psyc:v0.3.1"
def test_filter_image_handles_empty():
assert _filter_image_name("") == ""
assert _filter_image_name(None) == "" # type: ignore[arg-type]
# ---------- build_export contract ---------------------------------------
def test_build_export_returns_empty_when_docker_view_raises(fresh_db, fed_dir, monkeypatch):
def boom():
raise docker_view.DockerProxyError("connection refused")
monkeypatch.setattr(docker_view, "topology", boom)
out = build_export()
assert isinstance(out, TopologyExport)
assert out.container_count == 0
assert out.containers == []
assert out.networks == []
# fingerprint is still real (federation key was generated)
assert len(out.node_fingerprint) == 32
def test_build_export_returns_empty_when_docker_view_reports_error(fresh_db, fed_dir, monkeypatch):
monkeypatch.setattr(docker_view, "topology", lambda: {
"containers": [], "networks": [], "host": {"name": "x"},
"error": "containers: refused", "proxy": "x",
})
out = build_export()
assert out.container_count == 0
assert out.containers == []
def test_build_export_sanitizes_every_field(fresh_db, fed_dir, monkeypatch):
monkeypatch.setattr(docker_view, "topology", lambda: _LEAKY_TOPOLOGY)
out = build_export()
# Containers came through, but as TopologyContainer (no leaky attrs).
assert out.container_count == 2
by_name = {c.name: c for c in out.containers}
cp = by_name["psyc-cockpit-1"]
assert cp.short_id == "abcdef123456"
assert cp.image == "registry.example/psyc:1.2"
assert cp.state == "running"
assert cp.health == "healthy"
assert cp.networks == ["backend", "frontend"]
assert cp.service is None
# Networks came through, sanitized.
assert out.network_count == 2
by_net = {n.name: n for n in out.networks}
assert by_net["backend"].driver == "bridge"
assert by_net["backend"].internal is False
assert by_net["backend"].container_count == 1
assert by_net["internal-only"].internal is True
def test_export_json_contains_no_dangerous_strings(fresh_db, fed_dir, monkeypatch):
"""Strict no-leak: serialize and grep for everything sensitive."""
monkeypatch.setattr(docker_view, "topology", lambda: _LEAKY_TOPOLOGY)
out = build_export()
blob = json.dumps(out.model_dump(mode="json"))
for forbidden in _FORBIDDEN_STRINGS:
assert forbidden not in blob, f"leak: {forbidden!r} appeared in export JSON"
def test_build_export_caps_at_max_containers(fresh_db, fed_dir, monkeypatch):
fake = {
"containers": [
{"id": f"id{i:04d}", "name": f"c{i}", "image": "x", "state": "running", "status": "Up", "networks": []}
for i in range(topology_export.MAX_CONTAINERS + 50)
],
"networks": [], "host": {"name": "h"}, "error": None, "proxy": "",
}
monkeypatch.setattr(docker_view, "topology", lambda: fake)
out = build_export()
assert out.container_count == topology_export.MAX_CONTAINERS
# ---------- HTTP endpoint -----------------------------------------------
def _mk_app() -> FastAPI:
app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key="test-secret")
from pathlib import Path as _Path
here = _Path(__file__).resolve().parent.parent / "src" / "psyc" / "cockpit" / "templates"
templates = Jinja2Templates(directory=str(here))
federation_routes.register(app, templates)
return app
def test_federation_topology_endpoint_returns_json_with_cors(fresh_db, fed_dir, monkeypatch):
monkeypatch.setattr(docker_view, "topology", lambda: _LEAKY_TOPOLOGY)
client = TestClient(_mk_app())
r = client.get("/federation/topology")
assert r.status_code == 200
assert r.headers.get("access-control-allow-origin") == "*"
data = r.json()
# Schema check.
for key in ("node_fingerprint", "generated_at", "host_name",
"container_count", "network_count", "containers", "networks"):
assert key in data
assert data["container_count"] == 2
assert len(data["containers"]) == 2
# No leaks in the wire response either.
blob = r.text
for forbidden in _FORBIDDEN_STRINGS:
assert forbidden not in blob, f"leak via endpoint: {forbidden!r}"
def test_federation_topology_endpoint_resilient_when_docker_unavailable(fresh_db, fed_dir, monkeypatch):
def boom():
raise docker_view.DockerProxyError("proxy down")
monkeypatch.setattr(docker_view, "topology", boom)
client = TestClient(_mk_app())
r = client.get("/federation/topology")
assert r.status_code == 200
data = r.json()
assert data["container_count"] == 0
assert data["containers"] == []
def test_federation_topology_cache_short_circuits_repeated_calls(fresh_db, fed_dir, monkeypatch):
"""Within TTL, a second hit must not re-call docker_view."""
calls = {"n": 0}
def counted():
calls["n"] += 1
return _LEAKY_TOPOLOGY
monkeypatch.setattr(docker_view, "topology", counted)
client = TestClient(_mk_app())
r1 = client.get("/federation/topology")
r2 = client.get("/federation/topology")
assert r1.status_code == 200 and r2.status_code == 200
assert calls["n"] == 1, "cache should suppress the second docker_view call"