"""Topology export — whitelist sanitization unit tests. The big invariant: nothing from docker_view.topology() escapes that isn't in the Pydantic schema. We assert via model_fields introspection AND via a JSON-dump scan over a fixture that contains every dangerous field. The /federation/topology endpoint contract lives in the sibling tests added alongside it; this module covers the builder + the sanitizer in isolation. """ from __future__ import annotations import json from typing import Any, Dict from unittest.mock import patch import pytest from sqlalchemy import create_engine from psyc import db from psyc.cockpit import docker_view from psyc.lines import federation, topology_export from psyc.lines.topology_export import ( TopologyContainer, TopologyExport, TopologyNetwork, _filter_image_name, build_export, ) # ---------- fixtures ---------------------------------------------------- @pytest.fixture def fresh_db(tmp_path, monkeypatch): test_db = tmp_path / "test.db" eng = create_engine(f"sqlite:///{test_db}", future=True) db._metadata.create_all(eng, checkfirst=True) monkeypatch.setattr(db, "_engine", eng) monkeypatch.setattr(db, "DB_PATH", test_db) yield test_db @pytest.fixture def fed_dir(tmp_path, monkeypatch): d = tmp_path / "federation" monkeypatch.setattr(federation, "FED_DIR", d) monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key") monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub") yield d # ---------- fixture data: hostile docker_view output -------------------- # This payload has every leaky field docker_view *could* surface, plus # nested env-style data — used to prove the export is whitelist-only. _LEAKY_TOPOLOGY: Dict[str, Any] = { "containers": [ { "id": "abcdef1234567890ffff", "name": "psyc-cockpit-1", "image": "registry.example/psyc:1.2", "state": "running", "status": "Up 5 minutes (healthy)", "networks": [ {"name": "backend", "ip": "172.20.0.5", "gateway": "172.20.0.1", "mac": "02:42:ac:14:00:05"}, {"name": "frontend", "ip": "172.21.0.7", "gateway": "172.21.0.1", "mac": "02:42:ac:15:00:07"}, ], "ports": ["0.0.0.0:8767->8767/tcp"], "published_ports": ["8767/tcp"], # These are NOT current docker_view fields but defend in depth — # if a future docker_view change adds them, sanitizer drops them. "env": ["SECRET_TOKEN=abc123", "DB_PASSWORD=hunter2"], "mounts": ["/var/run/docker.sock", "/etc/secrets:/secrets"], "labels": {"com.docker.compose.project": "psyc", "secret_label": "shh"}, }, { "id": "fedcba0987654321", "name": "some-stopped", "image": "alpine", "state": "exited", "status": "Exited (0) 2 hours ago", "networks": [], "ports": [], "published_ports": [], }, ], "networks": [ { "id": "n1", "name": "backend", "driver": "bridge", "scope": "local", "internal": False, "subnet": "172.20.0.0/16", "gateway": "172.20.0.1", "containers": [ {"id": "abcdef123456", "name": "psyc-cockpit-1", "ip": "172.20.0.5", "mac": "02:42:ac:14:00:05"}, ], }, { "id": "n2", "name": "internal-only", "driver": "bridge", "scope": "local", "internal": True, "subnet": "10.99.0.0/16", "gateway": "10.99.0.1", "containers": [], }, ], "host": {"name": "docker-host-secret-internal.example.com", "os": "linux", "ncpu": 8}, "error": None, "proxy": "http://docker-socket-proxy:2375", } # Sensitive strings that MUST NOT appear anywhere in the export JSON. _FORBIDDEN_STRINGS = ( "SECRET_TOKEN", "DB_PASSWORD", "hunter2", "abc123", "/var/run/docker.sock", "/etc/secrets", "secret_label", "shh", "172.20.0.5", "172.21.0.7", # IPs "02:42:ac", # MAC prefix "172.20.0.1", # gateway "172.20.0.0/16", "10.99.0.0/16", # subnets "0.0.0.0:8767", # port mapping "internal.example.com", # full host ) # ---------- model field introspection ----------------------------------- def test_container_model_has_no_dangerous_fields(): fields = set(TopologyContainer.model_fields.keys()) # whitelist — must match the design contract exactly assert fields == { "name", "short_id", "image", "state", "health", "networks", "service", "started_at", } # explicit deny-list, double-belt for forbidden in ("env", "environment", "mounts", "volumes", "labels", "ip", "ip_address", "ipaddress", "ports", "published_ports", "mac", "gateway"): assert forbidden not in fields, f"{forbidden} must not be a field" def test_network_model_has_no_dangerous_fields(): fields = set(TopologyNetwork.model_fields.keys()) assert fields == {"name", "driver", "internal", "container_count"} for forbidden in ("subnet", "gateway", "labels", "ipam", "containers", "scope", "id"): assert forbidden not in fields, f"{forbidden} must not be a field" def test_export_model_top_level_fields(): fields = set(TopologyExport.model_fields.keys()) assert fields == { "node_fingerprint", "generated_at", "host_name", "container_count", "network_count", "containers", "networks", } # ---------- image-name filter ------------------------------------------- def test_filter_image_strips_basic_auth_prefix(): # user:pass@host/repo:tag → host/repo:tag (creds gone) assert _filter_image_name("user:pass@host/repo:tag") == "host/repo:tag" def test_filter_image_drops_digest_suffix(): assert _filter_image_name( "nginx:1.25@sha256:abcdef0123" ) == "nginx:1.25" def test_filter_image_passes_clean_refs_untouched(): assert _filter_image_name("psyc:latest") == "psyc:latest" assert _filter_image_name( "ghcr.io/example/psyc:v0.3.1" ) == "ghcr.io/example/psyc:v0.3.1" def test_filter_image_handles_empty(): assert _filter_image_name("") == "" assert _filter_image_name(None) == "" # type: ignore[arg-type] # ---------- build_export contract --------------------------------------- def test_build_export_returns_empty_when_docker_view_raises(fresh_db, fed_dir, monkeypatch): def boom(): raise docker_view.DockerProxyError("connection refused") monkeypatch.setattr(docker_view, "topology", boom) out = build_export() assert isinstance(out, TopologyExport) assert out.container_count == 0 assert out.containers == [] assert out.networks == [] # fingerprint is still real (federation key was generated) assert len(out.node_fingerprint) == 32 def test_build_export_returns_empty_when_docker_view_reports_error(fresh_db, fed_dir, monkeypatch): monkeypatch.setattr(docker_view, "topology", lambda: { "containers": [], "networks": [], "host": {"name": "x"}, "error": "containers: refused", "proxy": "x", }) out = build_export() assert out.container_count == 0 assert out.containers == [] def test_build_export_sanitizes_every_field(fresh_db, fed_dir, monkeypatch): monkeypatch.setattr(docker_view, "topology", lambda: _LEAKY_TOPOLOGY) out = build_export() # Containers came through, but as TopologyContainer (no leaky attrs). assert out.container_count == 2 by_name = {c.name: c for c in out.containers} cp = by_name["psyc-cockpit-1"] assert cp.short_id == "abcdef123456" assert cp.image == "registry.example/psyc:1.2" assert cp.state == "running" assert cp.health == "healthy" assert cp.networks == ["backend", "frontend"] assert cp.service is None # Networks came through, sanitized. assert out.network_count == 2 by_net = {n.name: n for n in out.networks} assert by_net["backend"].driver == "bridge" assert by_net["backend"].internal is False assert by_net["backend"].container_count == 1 assert by_net["internal-only"].internal is True def test_export_json_contains_no_dangerous_strings(fresh_db, fed_dir, monkeypatch): """Strict no-leak: serialize and grep for everything sensitive.""" monkeypatch.setattr(docker_view, "topology", lambda: _LEAKY_TOPOLOGY) out = build_export() blob = json.dumps(out.model_dump(mode="json")) for forbidden in _FORBIDDEN_STRINGS: assert forbidden not in blob, f"leak: {forbidden!r} appeared in export JSON" def test_build_export_caps_at_max_containers(fresh_db, fed_dir, monkeypatch): fake = { "containers": [ {"id": f"id{i:04d}", "name": f"c{i}", "image": "x", "state": "running", "status": "Up", "networks": []} for i in range(topology_export.MAX_CONTAINERS + 50) ], "networks": [], "host": {"name": "h"}, "error": None, "proxy": "", } monkeypatch.setattr(docker_view, "topology", lambda: fake) out = build_export() assert out.container_count == topology_export.MAX_CONTAINERS