"""Network view — local + transitive + public payload tests.""" from __future__ import annotations import base64 import json from datetime import datetime, timedelta, timezone from typing import Any, Dict from unittest.mock import patch import pytest from sqlalchemy import create_engine from psyc import db from psyc.lines import federation, network_view, translog from psyc.lines.network_view import ( NetworkEdge, NetworkNode, NetworkView, build_admin_view, build_explore_view, build_local_view, build_public_view, build_transitive_view, ) # ---------- fixtures ---------------------------------------------------- @pytest.fixture def fresh_db(tmp_path, monkeypatch): test_db = tmp_path / "test.db" eng = create_engine(f"sqlite:///{test_db}", future=True) db._metadata.create_all(eng, checkfirst=True) monkeypatch.setattr(db, "_engine", eng) monkeypatch.setattr(db, "DB_PATH", test_db) yield test_db @pytest.fixture def fed_dir(tmp_path, monkeypatch): d = tmp_path / "federation" monkeypatch.setattr(federation, "FED_DIR", d) monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key") monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub") yield d @pytest.fixture(autouse=True) def reset_transitive_cache(monkeypatch): """Prevent cache bleed between tests.""" monkeypatch.setattr(network_view, "_TRANSITIVE_CACHE", {"ts": 0.0, "view": None}) yield def _make_peer_pubkey() -> tuple[str, str]: """Return (fingerprint, pubkey_pem) for a synthetic peer keypair.""" import hashlib from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ed25519 priv = ed25519.Ed25519PrivateKey.generate() pub = priv.public_key() pem = pub.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo, ).decode("ascii") raw = pub.public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw, ) fp = hashlib.sha256(raw).digest()[:16].hex() return fp, pem # ---------- local view -------------------------------------------------- def test_local_view_empty_registry_yields_only_self(fresh_db, fed_dir): view = build_local_view() assert isinstance(view, NetworkView) assert len(view.nodes) == 1 self_node = view.nodes[0] assert self_node.is_self is True assert self_node.distance == 0 assert self_node.status == "self" assert self_node.fingerprint == federation.node_fingerprint() assert view.edges == [] assert view.stats["total_peers"] == 0 assert view.stats["vouched_peers"] == 0 assert view.stats["signals_buffered_24h"] == 0 def test_local_view_one_trusted_peer_no_edges(fresh_db, fed_dir): peer_fp, peer_pem = _make_peer_pubkey() federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted") view = build_local_view() assert len(view.nodes) == 2 peer_node = next(n for n in view.nodes if not n.is_self) assert peer_node.fingerprint == peer_fp assert peer_node.status == "trusted" assert peer_node.distance == 1 assert peer_node.domain == "peer.example" assert view.edges == [] assert view.stats["total_peers"] == 1 assert view.stats["vouched_peers"] == 1 def test_local_view_outbound_vouch_creates_edge(fresh_db, fed_dir): peer_fp, peer_pem = _make_peer_pubkey() federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted") federation.issue_vouch(peer_fp, ttl_days=30) view = build_local_view() vouch_edges = [e for e in view.edges if e.kind == "vouch"] assert len(vouch_edges) == 1 e = vouch_edges[0] assert e.source_fingerprint == federation.node_fingerprint() assert e.target_fingerprint == peer_fp assert view.stats["vouches_issued"] == 1 def test_local_view_inbound_vouch_creates_edge(fresh_db, fed_dir): """Vouches received that name us as target → peer → self edge.""" peer_fp, peer_pem = _make_peer_pubkey() federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted") # Insert a vouch where peer vouches FOR us, bypassing accept_vouch (which # we don't need to exercise here — the question is render shape). our_fp = federation.node_fingerprint() now = datetime.now(timezone.utc) db.upsert_vouch(dict( voucher_fingerprint=peer_fp, target_fingerprint=our_fp, issued_at=now.isoformat(), expires_at=(now + timedelta(days=30)).isoformat(), signature="x" * 88, )) view = build_local_view() vouch_edges = [e for e in view.edges if e.kind == "vouch"] assert len(vouch_edges) == 1 e = vouch_edges[0] assert e.source_fingerprint == peer_fp assert e.target_fingerprint == our_fp def test_local_view_bidirectional_vouches_collapse(fresh_db, fed_dir): peer_fp, peer_pem = _make_peer_pubkey() federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted") federation.issue_vouch(peer_fp, ttl_days=30) # And peer vouches back at us. our_fp = federation.node_fingerprint() now = datetime.now(timezone.utc) db.upsert_vouch(dict( voucher_fingerprint=peer_fp, target_fingerprint=our_fp, issued_at=now.isoformat(), expires_at=(now + timedelta(days=30)).isoformat(), signature="x" * 88, )) view = build_local_view() vouch_edges = [e for e in view.edges if e.kind == "vouch"] assert len(vouch_edges) == 1 assert vouch_edges[0].bidirectional is True def test_local_view_signal_edge_weight_is_24h_count(fresh_db, fed_dir): peer_fp, peer_pem = _make_peer_pubkey() federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted") now_iso = datetime.now(timezone.utc).isoformat() # Three signals from this peer within the window. for i in range(3): db.record_signal(dict( peer_fingerprint=peer_fp, signal_type="ioc", signal_id=f"1.2.3.{i}", signal_hash=f"hash-{i}", received_at=now_iso, raw_json="{}", )) # One stale signal outside the window — must be ignored. stale = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat() db.record_signal(dict( peer_fingerprint=peer_fp, signal_type="ioc", signal_id="9.9.9.9", signal_hash="stale", received_at=stale, raw_json="{}", )) view = build_local_view() sig_edges = [e for e in view.edges if e.kind == "signal"] assert len(sig_edges) == 1 assert sig_edges[0].weight == 3.0 assert sig_edges[0].source_fingerprint == peer_fp assert sig_edges[0].target_fingerprint == federation.node_fingerprint() assert view.stats["signals_buffered_24h"] == 3 assert view.stats["distinct_signal_hashes_24h"] == 3 def test_local_view_blocked_peer_renders_with_blocked_status(fresh_db, fed_dir): fp, pem = _make_peer_pubkey() federation.register_peer("blocked.example", fp, pem, status="blocked") view = build_local_view() peer = next(n for n in view.nodes if not n.is_self) assert peer.status == "blocked" # ---------- public view + signature round-trip -------------------------- def test_public_view_excludes_unknown_and_blocked(fresh_db, fed_dir): fp_t, pem_t = _make_peer_pubkey() fp_u, pem_u = _make_peer_pubkey() fp_b, pem_b = _make_peer_pubkey() federation.register_peer("trusted.example", fp_t, pem_t, status="trusted") federation.register_peer("unknown.example", fp_u, pem_u, status="unknown") federation.register_peer("blocked.example", fp_b, pem_b, status="blocked") payload = build_public_view() fps = {p["fingerprint"] for p in payload["peers"]} assert fp_t in fps assert fp_u not in fps assert fp_b not in fps def test_public_view_signature_round_trip(fresh_db, fed_dir): fp, pem = _make_peer_pubkey() federation.register_peer("trusted.example", fp, pem, status="trusted") federation.issue_vouch(fp, ttl_days=30) payload = build_public_view() assert "signature" in payload assert payload["fingerprint"] == federation.node_fingerprint() sig = base64.b64decode(payload["signature"]) unsigned = {k: v for k, v in payload.items() if k != "signature"} assert federation.verify_payload( federation.canonical_json(unsigned), sig, federation.public_key_pem(), ) is True # Vouch we issued is in the payload. targets = {v["target_fingerprint"] for v in payload["vouches"]} assert fp in targets def test_public_view_omits_signals(fresh_db, fed_dir): """Public payload must not leak who's reporting what.""" fp, pem = _make_peer_pubkey() federation.register_peer("trusted.example", fp, pem, status="trusted") db.record_signal(dict( peer_fingerprint=fp, signal_type="ioc", signal_id="1.2.3.4", signal_hash="secret-hash", received_at=datetime.now(timezone.utc).isoformat(), raw_json="{}", )) payload = build_public_view() # No signal-shaped fields anywhere in the payload. flat = str(payload) assert "secret-hash" not in flat assert "signals" not in payload # ---------- transitive view --------------------------------------------- def test_transitive_view_adds_distance_2_nodes(fresh_db, fed_dir): direct_fp, direct_pem = _make_peer_pubkey() federation.register_peer("direct.example", direct_fp, direct_pem, status="trusted") # The peer reports two peers of its own. far_fp_a, _ = _make_peer_pubkey() far_fp_b, _ = _make_peer_pubkey() fake_payload: Dict[str, Any] = { "fingerprint": direct_fp, "peers": [ {"fingerprint": far_fp_a, "domain": "far-a.example"}, {"fingerprint": far_fp_b, "domain": "far-b.example"}, ], "vouches": [], } with patch.object(network_view, "_fetch_peer_network", return_value=fake_payload): view = build_transitive_view(force_refresh=True) distances = sorted(n.distance for n in view.nodes) assert 0 in distances and 1 in distances and 2 in distances transitive_fps = {n.fingerprint for n in view.nodes if n.distance == 2} assert far_fp_a in transitive_fps assert far_fp_b in transitive_fps # "knows" edges from direct peer to each transitive. knows = [e for e in view.edges if e.kind == "knows"] assert len(knows) == 2 assert all(e.source_fingerprint == direct_fp for e in knows) assert view.stats["transitive_nodes"] == 2 def test_transitive_view_failed_fetch_does_not_abort(fresh_db, fed_dir): fp_a, pem_a = _make_peer_pubkey() fp_b, pem_b = _make_peer_pubkey() federation.register_peer("peer-a.example", fp_a, pem_a, status="trusted") federation.register_peer("peer-b.example", fp_b, pem_b, status="trusted") far_fp, _ = _make_peer_pubkey() def fake_fetch(domain, timeout=4.0): if domain == "peer-a.example": return None # simulate a fetch failure return { "fingerprint": fp_b, "peers": [{"fingerprint": far_fp, "domain": "far.example"}], "vouches": [], } with patch.object(network_view, "_fetch_peer_network", side_effect=fake_fetch): view = build_transitive_view(force_refresh=True) # Direct nodes both present, transitive only from B. assert any(n.fingerprint == fp_a for n in view.nodes) assert any(n.fingerprint == fp_b for n in view.nodes) assert any(n.fingerprint == far_fp and n.distance == 2 for n in view.nodes) assert view.stats["transitive_nodes"] == 1 def test_transitive_view_skips_only_unknown_peers(fresh_db, fed_dir): """Unknown peers shouldn't be queried — fetcher only called for trusted.""" fp_unknown, pem_u = _make_peer_pubkey() fp_trusted, pem_t = _make_peer_pubkey() federation.register_peer("unknown.example", fp_unknown, pem_u, status="unknown") federation.register_peer("trusted.example", fp_trusted, pem_t, status="trusted") calls = [] def fake_fetch(domain, timeout=4.0): calls.append(domain) return None with patch.object(network_view, "_fetch_peer_network", side_effect=fake_fetch): build_transitive_view(force_refresh=True) assert "trusted.example" in calls assert "unknown.example" not in calls # ---------- admin view: per-peer enrichment + corroboration + timeline --- def _no_transitive(): """patch.object helper — silence network fetches in admin-view tests.""" return patch.object(network_view, "_fetch_peer_network", return_value=None) def test_admin_view_includes_stats_on_peer_nodes(fresh_db, fed_dir): """Every non-self node must carry a `stats` dict in the admin view.""" fp, pem = _make_peer_pubkey() federation.register_peer("peer.example", fp, pem, status="trusted") with _no_transitive(): view = build_admin_view(include_transitive=False) self_nodes = [n for n in view["nodes"] if n["is_self"]] peer_nodes = [n for n in view["nodes"] if not n["is_self"]] assert len(self_nodes) == 1 assert len(peer_nodes) == 1 # Self has no stats; peers do. assert self_nodes[0]["stats"] is None peer_stats = peer_nodes[0]["stats"] assert isinstance(peer_stats, dict) for key in ( "signals_24h", "signals_total", "cases_24h", "iocs_24h", "severity_breakdown", "ioc_type_breakdown", "vouches_in_count", "vouches_out_count", "quorum_contribution", "last_seen", "last_seen_relative", "recent_translog", ): assert key in peer_stats, f"missing {key}" # last_seen is None when no signals have landed yet. assert peer_stats["last_seen"] is None assert peer_stats["last_seen_relative"] == "—" def test_admin_view_signals_24h_excludes_stale(fresh_db, fed_dir): """signals_24h must count only rows inside the 24h window.""" fp, pem = _make_peer_pubkey() federation.register_peer("peer.example", fp, pem, status="trusted") now_iso = datetime.now(timezone.utc).isoformat() stale_iso = (datetime.now(timezone.utc) - timedelta(hours=30)).isoformat() for i in range(3): db.record_signal(dict( peer_fingerprint=fp, signal_type="ioc", signal_id=f"v{i}", signal_hash=f"h{i}", received_at=now_iso, raw_json="{}", )) db.record_signal(dict( peer_fingerprint=fp, signal_type="ioc", signal_id="stale", signal_hash="stale-hash", received_at=stale_iso, raw_json="{}", )) with _no_transitive(): view = build_admin_view(include_transitive=False) peer = next(n for n in view["nodes"] if not n["is_self"]) assert peer["stats"]["signals_24h"] == 3 # All-time total still sees the stale row. assert peer["stats"]["signals_total"] == 4 # last_seen is populated and the relative is a short string. assert peer["stats"]["last_seen"] is not None assert peer["stats"]["last_seen_relative"] != "—" def test_admin_view_severity_and_ioc_breakdown_from_raw_json(fresh_db, fed_dir): """severity_breakdown is read from raw_json for case rows, ioc_type for ioc rows.""" fp, pem = _make_peer_pubkey() federation.register_peer("peer.example", fp, pem, status="trusted") now_iso = datetime.now(timezone.utc).isoformat() cases = [ {"severity": "critical", "case_id": "c1"}, {"severity": "critical", "case_id": "c2"}, {"severity": "high", "case_id": "c3"}, {"severity": "low", "case_id": "c4"}, ] for c in cases: db.record_signal(dict( peer_fingerprint=fp, signal_type="case", signal_id=c["case_id"], signal_hash=f"hash-{c['case_id']}", received_at=now_iso, raw_json=json.dumps(c), )) iocs = [ {"type": "url", "value": "https://a"}, {"type": "url", "value": "https://b"}, {"type": "domain", "value": "x.com"}, {"type": "ip", "value": "1.2.3.4"}, ] for ioc in iocs: db.record_signal(dict( peer_fingerprint=fp, signal_type="ioc", signal_id=ioc["value"], signal_hash=f"hash-{ioc['value']}", received_at=now_iso, raw_json=json.dumps(ioc), )) with _no_transitive(): view = build_admin_view(include_transitive=False) stats = next(n for n in view["nodes"] if not n["is_self"])["stats"] assert stats["cases_24h"] == 4 assert stats["iocs_24h"] == 4 sev = stats["severity_breakdown"] assert sev == {"critical": 2, "high": 1, "medium": 0, "low": 1} ioc_t = stats["ioc_type_breakdown"] assert ioc_t == {"url": 2, "domain": 1, "ip": 1, "hash": 0, "cve": 0} def test_admin_view_vouch_in_out_counts(fresh_db, fed_dir): """vouches_in_count counts vouches naming this peer; out counts what they've issued.""" fp_a, pem_a = _make_peer_pubkey() fp_b, pem_b = _make_peer_pubkey() federation.register_peer("a.example", fp_a, pem_a, status="trusted") federation.register_peer("b.example", fp_b, pem_b, status="trusted") now = datetime.now(timezone.utc).isoformat() # A vouches for B; we vouch for B too — B sees vouches_in=2. db.upsert_vouch(dict( voucher_fingerprint=fp_a, target_fingerprint=fp_b, issued_at=now, expires_at=None, signature="x", )) federation.issue_vouch(fp_b, ttl_days=30) # B vouches for A — A sees vouches_in=1, B sees vouches_out=1. db.upsert_vouch(dict( voucher_fingerprint=fp_b, target_fingerprint=fp_a, issued_at=now, expires_at=None, signature="y", )) with _no_transitive(): view = build_admin_view(include_transitive=False) by_fp = {n["fingerprint"]: n for n in view["nodes"] if not n["is_self"]} assert by_fp[fp_a]["stats"]["vouches_in_count"] == 1 assert by_fp[fp_a]["stats"]["vouches_out_count"] == 1 # vouches for B assert by_fp[fp_b]["stats"]["vouches_in_count"] == 2 # A + us assert by_fp[fp_b]["stats"]["vouches_out_count"] == 1 # vouches for A def test_admin_view_corroborated_signals(fresh_db, fed_dir): """Pairs of peers reporting the same signal_hash → corroborated entry + edge.""" fp_a, pem_a = _make_peer_pubkey() fp_b, pem_b = _make_peer_pubkey() federation.register_peer("a.example", fp_a, pem_a, status="trusted") federation.register_peer("b.example", fp_b, pem_b, status="trusted") now_iso = datetime.now(timezone.utc).isoformat() for peer_fp in (fp_a, fp_b): db.record_signal(dict( peer_fingerprint=peer_fp, signal_type="ioc", signal_id="evil.com", signal_hash="shared-hash-1", received_at=now_iso, raw_json="{}", )) # A also reports a hash B doesn't — should NOT corroborate. db.record_signal(dict( peer_fingerprint=fp_a, signal_type="ioc", signal_id="solo.com", signal_hash="solo-hash", received_at=now_iso, raw_json="{}", )) with _no_transitive(): view = build_admin_view(include_transitive=False) corr = view["stats"]["corroborated_signals"] hashes = {c["signal_hash"] for c in corr} assert "shared-hash-1" in hashes assert "solo-hash" not in hashes shared = next(c for c in corr if c["signal_hash"] == "shared-hash-1") assert set(shared["peer_fingerprints"]) == {fp_a, fp_b} assert shared["peer_count"] == 2 # One corroborate edge between the pair (orientation-independent). corr_edges = [e for e in view["edges"] if e["kind"] == "corroborate"] assert len(corr_edges) == 1 pair = {corr_edges[0]["source_fingerprint"], corr_edges[0]["target_fingerprint"]} assert pair == {fp_a, fp_b} assert corr_edges[0]["weight"] == 1.0 def test_admin_view_signal_timeline_24h_returns_24_buckets(fresh_db, fed_dir): """signal_timeline_24h is a 24-bucket list with correct totals.""" fp, pem = _make_peer_pubkey() federation.register_peer("peer.example", fp, pem, status="trusted") now = datetime.now(timezone.utc) # Two signals one hour ago, three signals five hours ago. one_h = (now - timedelta(hours=1, minutes=5)).isoformat() five_h = (now - timedelta(hours=5, minutes=5)).isoformat() for i in range(2): db.record_signal(dict( peer_fingerprint=fp, signal_type="ioc", signal_id=f"a{i}", signal_hash=f"h-a-{i}", received_at=one_h, raw_json="{}", )) for i in range(3): db.record_signal(dict( peer_fingerprint=fp, signal_type="ioc", signal_id=f"b{i}", signal_hash=f"h-b-{i}", received_at=five_h, raw_json="{}", )) # Stale signal — must NOT show up. db.record_signal(dict( peer_fingerprint=fp, signal_type="ioc", signal_id="stale", signal_hash="stale-hash", received_at=(now - timedelta(hours=48)).isoformat(), raw_json="{}", )) with _no_transitive(): view = build_admin_view(include_transitive=False) buckets = view["stats"]["signal_timeline_24h"] assert isinstance(buckets, list) assert len(buckets) == 24 totals = [b["total"] for b in buckets] assert sum(totals) == 5 # stale excluded # Bucket hour_offsets are 0..23 in oldest-first order. assert [b["hour_offset"] for b in buckets] == list(range(24)) def test_admin_view_quorum_contribution(fresh_db, fed_dir): """quorum_contribution counts this peer's distinct hashes that are quorum-met.""" fp_a, pem_a = _make_peer_pubkey() fp_b, pem_b = _make_peer_pubkey() federation.register_peer("a.example", fp_a, pem_a, status="trusted") federation.register_peer("b.example", fp_b, pem_b, status="trusted") now_iso = datetime.now(timezone.utc).isoformat() # Shared hash → both peers report it → quorum-met (default k=2). for peer_fp in (fp_a, fp_b): db.record_signal(dict( peer_fingerprint=peer_fp, signal_type="ioc", signal_id="shared", signal_hash="quorum-hash", received_at=now_iso, raw_json="{}", )) # Solo hash from A → not quorum-met. db.record_signal(dict( peer_fingerprint=fp_a, signal_type="ioc", signal_id="solo", signal_hash="solo-hash", received_at=now_iso, raw_json="{}", )) with _no_transitive(): view = build_admin_view(include_transitive=False) by_fp = {n["fingerprint"]: n for n in view["nodes"] if not n["is_self"]} assert by_fp[fp_a]["stats"]["quorum_contribution"] == 1 assert by_fp[fp_b]["stats"]["quorum_contribution"] == 1 def test_admin_view_recent_translog_per_peer(fresh_db, fed_dir): """recent_translog lists entries where entry_data.peer_fingerprint matches.""" fp_a, pem_a = _make_peer_pubkey() fp_b, pem_b = _make_peer_pubkey() federation.register_peer("a.example", fp_a, pem_a, status="trusted") federation.register_peer("b.example", fp_b, pem_b, status="trusted") # Append translog rows that name each peer. translog.append("signal", {"peer_fingerprint": fp_a, "signal_type": "ioc", "signal_id": "x"}) translog.append("signal", {"peer_fingerprint": fp_b, "signal_type": "case", "signal_id": "y"}) translog.append("signal", {"peer_fingerprint": fp_a, "signal_type": "case", "signal_id": "z"}) with _no_transitive(): view = build_admin_view(include_transitive=False) by_fp = {n["fingerprint"]: n for n in view["nodes"] if not n["is_self"]} a_log = by_fp[fp_a]["stats"]["recent_translog"] b_log = by_fp[fp_b]["stats"]["recent_translog"] assert len(a_log) == 2 assert len(b_log) == 1 # Each row carries the documented shape. for row in a_log + b_log: assert set(row.keys()) == {"id", "entry_type", "timestamp", "hash"} def test_explore_view_omits_ioc_values_case_ids_and_raw_json(fresh_db, fed_dir): """The public explore payload must NEVER expose IOC values, case_ids, or raw_json. This is the load-bearing transparency-vs-leakage contract that lives at the network-view layer — anyone can audit who's talking to whom and how much, but never *what* they're saying. """ fp, pem = _make_peer_pubkey() federation.register_peer("trusted.example", fp, pem, status="trusted") now_iso = datetime.now(timezone.utc).isoformat() db.record_signal(dict( peer_fingerprint=fp, signal_type="ioc", signal_id="evil-domain-do-not-leak.com", signal_hash="ioc-hash-leak", received_at=now_iso, raw_json=json.dumps({"type": "domain", "value": "evil-domain-do-not-leak.com"}), )) db.record_signal(dict( peer_fingerprint=fp, signal_type="case", signal_id="CASE-SECRET-42", signal_hash="case-hash-leak", received_at=now_iso, raw_json=json.dumps({"severity": "critical", "case_id": "CASE-SECRET-42"}), )) with patch.object(network_view, "_fetch_peer_explore", return_value=None), \ patch.object(network_view, "_fetch_peer_network", return_value=None): payload = build_explore_view() flat = json.dumps(payload, default=str) assert "evil-domain-do-not-leak.com" not in flat assert "CASE-SECRET-42" not in flat assert "raw_json" not in flat # Sector-leaking breakdowns must not appear either. assert "severity_breakdown" not in flat assert "ioc_type_breakdown" not in flat # And peer rows carry only public-safe counts. for p in payload.get("peers", []): assert "severity_breakdown" not in p assert "ioc_type_breakdown" not in p assert "recent_translog" not in p def test_public_view_still_has_no_stats(fresh_db, fed_dir): """Public payload must not surface admin-only enrichments — sensitive. Even after `build_admin_view` has been invoked (which mutates node.stats on the cached transitive view), the public view path must stay clean. """ fp, pem = _make_peer_pubkey() federation.register_peer("trusted.example", fp, pem, status="trusted") # Seed signals + corroborated hash so admin view has rich state. now_iso = datetime.now(timezone.utc).isoformat() db.record_signal(dict( peer_fingerprint=fp, signal_type="ioc", signal_id="leak", signal_hash="leak-hash", received_at=now_iso, raw_json=json.dumps({"type": "url", "value": "https://leak"}), )) # Build admin view first so any caching kicks in. with _no_transitive(): build_admin_view(include_transitive=False) # Now build the public view and assert no admin-only fields leak. payload = build_public_view() flat = json.dumps(payload, default=str) assert "signals_24h" not in flat assert "severity_breakdown" not in flat assert "corroborated_signals" not in flat assert "signal_timeline_24h" not in flat assert "recent_translog" not in flat assert "leak-hash" not in flat # Peer entries in the public view never carry a `stats` field. for p in payload.get("peers", []): assert "stats" not in p