Files
psyc/tests/test_network_view.py

659 lines
25 KiB
Python

"""Network view — local + transitive + public payload tests."""
from __future__ import annotations
import base64
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict
from unittest.mock import patch
import pytest
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import federation, network_view, translog
from psyc.lines.network_view import (
NetworkEdge,
NetworkNode,
NetworkView,
build_admin_view,
build_local_view,
build_public_view,
build_transitive_view,
)
# ---------- fixtures ----------------------------------------------------
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
    """Point ``psyc.db`` at a throwaway SQLite file for the duration of a test.

    Builds the schema from ``db._metadata`` on a new engine located under
    ``tmp_path``, then swaps that engine and the file path into the ``db``
    module through ``monkeypatch`` (which restores the originals afterwards).

    Yields:
        Path of the temporary database file.
    """
    test_db = tmp_path / "test.db"
    eng = create_engine(f"sqlite:///{test_db}", future=True)
    db._metadata.create_all(eng, checkfirst=True)
    monkeypatch.setattr(db, "_engine", eng)
    monkeypatch.setattr(db, "DB_PATH", test_db)
    yield test_db
    # Teardown: close the engine's pooled connections so the per-test SQLite
    # file is not left open once the test has finished.
    eng.dispose()
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
    """Redirect the federation module's key locations into ``tmp_path``.

    Overrides ``FED_DIR``, ``PRIVATE_KEY_PATH`` and ``PUBLIC_KEY_PATH`` on
    ``federation`` for the duration of the test.

    Yields:
        The per-test federation directory path (not created here).
    """
    fed_root = tmp_path / "federation"
    overrides = {
        "FED_DIR": fed_root,
        "PRIVATE_KEY_PATH": fed_root / "node.key",
        "PUBLIC_KEY_PATH": fed_root / "node.pub",
    }
    for attr_name, location in overrides.items():
        monkeypatch.setattr(federation, attr_name, location)
    yield fed_root
@pytest.fixture(autouse=True)
def reset_transitive_cache(monkeypatch):
    """Start every test with an empty transitive-view cache.

    Autouse, so a view cached by one test cannot bleed into the next.
    """
    empty_cache = {"ts": 0.0, "view": None}
    monkeypatch.setattr(network_view, "_TRANSITIVE_CACHE", empty_cache)
    yield
def _make_peer_pubkey() -> tuple[str, str]:
    """Generate a synthetic Ed25519 peer key and describe its public half.

    Returns:
        ``(fingerprint, pubkey_pem)``. The fingerprint is the hex encoding of
        the first 16 bytes of SHA-256 over the raw public key; the PEM is the
        SubjectPublicKeyInfo encoding of that same key.
    """
    import hashlib

    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import ed25519

    public_key = ed25519.Ed25519PrivateKey.generate().public_key()
    pem_text = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode("ascii")
    raw_bytes = public_key.public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    # 32 hex characters == the first 16 digest bytes.
    fingerprint = hashlib.sha256(raw_bytes).hexdigest()[:32]
    return fingerprint, pem_text
# ---------- local view --------------------------------------------------
def test_local_view_empty_registry_yields_only_self(fresh_db, fed_dir):
    """With no peers registered, the local view contains only this node."""
    view = build_local_view()
    assert isinstance(view, NetworkView)

    # Exactly one node, and it describes us.
    assert len(view.nodes) == 1
    me = view.nodes[0]
    assert me.is_self is True
    assert me.distance == 0
    assert me.status == "self"
    assert me.fingerprint == federation.node_fingerprint()

    # Nothing to connect to, nothing counted.
    assert view.edges == []
    for counter in ("total_peers", "vouched_peers", "signals_buffered_24h"):
        assert view.stats[counter] == 0
def test_local_view_one_trusted_peer_no_edges(fresh_db, fed_dir):
    """A trusted peer with no vouches or signals adds a node but no edge."""
    fingerprint, pem = _make_peer_pubkey()
    federation.register_peer("peer.example", fingerprint, pem, status="trusted")

    view = build_local_view()

    assert len(view.nodes) == 2
    remote = next(node for node in view.nodes if not node.is_self)
    assert remote.fingerprint == fingerprint
    assert remote.status == "trusted"
    assert remote.distance == 1
    assert remote.domain == "peer.example"

    assert view.edges == []
    for counter in ("total_peers", "vouched_peers"):
        assert view.stats[counter] == 1
def test_local_view_outbound_vouch_creates_edge(fresh_db, fed_dir):
    """Issuing a vouch for a peer yields one self → peer vouch edge."""
    target_fp, target_pem = _make_peer_pubkey()
    federation.register_peer("peer.example", target_fp, target_pem, status="trusted")
    federation.issue_vouch(target_fp, ttl_days=30)

    view = build_local_view()

    vouches = list(filter(lambda edge: edge.kind == "vouch", view.edges))
    assert len(vouches) == 1
    edge = vouches[0]
    assert edge.source_fingerprint == federation.node_fingerprint()
    assert edge.target_fingerprint == target_fp
    assert view.stats["vouches_issued"] == 1
def test_local_view_inbound_vouch_creates_edge(fresh_db, fed_dir):
    """A stored vouch naming us as target renders as a peer → self edge."""
    voucher_fp, voucher_pem = _make_peer_pubkey()
    federation.register_peer("peer.example", voucher_fp, voucher_pem, status="trusted")

    # The row is written straight to the DB rather than through accept_vouch:
    # only the rendered edge shape is under test here.
    self_fp = federation.node_fingerprint()
    issued = datetime.now(timezone.utc)
    expiry = issued + timedelta(days=30)
    db.upsert_vouch({
        "voucher_fingerprint": voucher_fp,
        "target_fingerprint": self_fp,
        "issued_at": issued.isoformat(),
        "expires_at": expiry.isoformat(),
        "signature": "x" * 88,
    })

    view = build_local_view()

    vouches = [edge for edge in view.edges if edge.kind == "vouch"]
    assert len(vouches) == 1
    edge = vouches[0]
    assert edge.source_fingerprint == voucher_fp
    assert edge.target_fingerprint == self_fp
def test_local_view_bidirectional_vouches_collapse(fresh_db, fed_dir):
    """Vouches in both directions are rendered as one bidirectional edge."""
    other_fp, other_pem = _make_peer_pubkey()
    federation.register_peer("peer.example", other_fp, other_pem, status="trusted")

    # Direction 1: we vouch for the peer.
    federation.issue_vouch(other_fp, ttl_days=30)

    # Direction 2: the peer vouches for us (row inserted directly).
    self_fp = federation.node_fingerprint()
    issued = datetime.now(timezone.utc)
    expiry = issued + timedelta(days=30)
    db.upsert_vouch({
        "voucher_fingerprint": other_fp,
        "target_fingerprint": self_fp,
        "issued_at": issued.isoformat(),
        "expires_at": expiry.isoformat(),
        "signature": "x" * 88,
    })

    view = build_local_view()

    vouches = [edge for edge in view.edges if edge.kind == "vouch"]
    assert len(vouches) == 1
    assert vouches[0].bidirectional is True
def test_local_view_signal_edge_weight_is_24h_count(fresh_db, fed_dir):
    """The peer → self signal edge is weighted by signals seen in the window."""
    sender_fp, sender_pem = _make_peer_pubkey()
    federation.register_peer("peer.example", sender_fp, sender_pem, status="trusted")

    # Three signals from the peer, all timestamped "now".
    fresh_ts = datetime.now(timezone.utc).isoformat()
    for i in range(3):
        db.record_signal({
            "peer_fingerprint": sender_fp,
            "signal_type": "ioc",
            "signal_id": f"1.2.3.{i}",
            "signal_hash": f"hash-{i}",
            "received_at": fresh_ts,
            "raw_json": "{}",
        })

    # One 48-hour-old signal, which the view must leave out of its counts.
    stale_ts = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
    db.record_signal({
        "peer_fingerprint": sender_fp,
        "signal_type": "ioc",
        "signal_id": "9.9.9.9",
        "signal_hash": "stale",
        "received_at": stale_ts,
        "raw_json": "{}",
    })

    view = build_local_view()

    signal_edges = [edge for edge in view.edges if edge.kind == "signal"]
    assert len(signal_edges) == 1
    edge = signal_edges[0]
    assert edge.weight == 3.0
    assert edge.source_fingerprint == sender_fp
    assert edge.target_fingerprint == federation.node_fingerprint()
    for counter in ("signals_buffered_24h", "distinct_signal_hashes_24h"):
        assert view.stats[counter] == 3
def test_local_view_blocked_peer_renders_with_blocked_status(fresh_db, fed_dir):
    """A peer registered as blocked still appears, carrying that status."""
    blocked_fp, blocked_pem = _make_peer_pubkey()
    federation.register_peer("blocked.example", blocked_fp, blocked_pem, status="blocked")

    view = build_local_view()

    remote = next(node for node in view.nodes if not node.is_self)
    assert remote.status == "blocked"
# ---------- public view + signature round-trip --------------------------
def test_public_view_excludes_unknown_and_blocked(fresh_db, fed_dir):
    """Only the trusted peer is listed in the public payload's ``peers``."""
    trusted_fp, trusted_pem = _make_peer_pubkey()
    unknown_fp, unknown_pem = _make_peer_pubkey()
    blocked_fp, blocked_pem = _make_peer_pubkey()
    registrations = (
        ("trusted.example", trusted_fp, trusted_pem, "trusted"),
        ("unknown.example", unknown_fp, unknown_pem, "unknown"),
        ("blocked.example", blocked_fp, blocked_pem, "blocked"),
    )
    for domain, fingerprint, pem, status in registrations:
        federation.register_peer(domain, fingerprint, pem, status=status)

    payload = build_public_view()

    published = {entry["fingerprint"] for entry in payload["peers"]}
    assert trusted_fp in published
    assert unknown_fp not in published
    assert blocked_fp not in published
def test_public_view_signature_round_trip(fresh_db, fed_dir):
    """The public payload's signature verifies against our own public key."""
    peer_fp, peer_pem = _make_peer_pubkey()
    federation.register_peer("trusted.example", peer_fp, peer_pem, status="trusted")
    federation.issue_vouch(peer_fp, ttl_days=30)

    payload = build_public_view()

    assert "signature" in payload
    assert payload["fingerprint"] == federation.node_fingerprint()

    # Verify over the canonical JSON of everything except the signature itself.
    signature_bytes = base64.b64decode(payload["signature"])
    unsigned = {key: value for key, value in payload.items() if key != "signature"}
    canonical = federation.canonical_json(unsigned)
    verified = federation.verify_payload(
        canonical,
        signature_bytes,
        federation.public_key_pem(),
    )
    assert verified is True

    # The vouch issued above is part of the payload.
    vouched_targets = {vouch["target_fingerprint"] for vouch in payload["vouches"]}
    assert peer_fp in vouched_targets
def test_public_view_omits_signals(fresh_db, fed_dir):
    """The public payload must not reveal which peer reported which signal."""
    peer_fp, peer_pem = _make_peer_pubkey()
    federation.register_peer("trusted.example", peer_fp, peer_pem, status="trusted")
    received_ts = datetime.now(timezone.utc).isoformat()
    db.record_signal({
        "peer_fingerprint": peer_fp,
        "signal_type": "ioc",
        "signal_id": "1.2.3.4",
        "signal_hash": "secret-hash",
        "received_at": received_ts,
        "raw_json": "{}",
    })

    payload = build_public_view()

    # Search the stringified payload so nested values are covered too.
    rendered = str(payload)
    assert "secret-hash" not in rendered
    assert "signals" not in payload
# ---------- transitive view ---------------------------------------------
def test_transitive_view_adds_distance_2_nodes(fresh_db, fed_dir):
    """Peers reported by a direct peer appear at distance 2 with "knows" edges."""
    direct_fp, direct_pem = _make_peer_pubkey()
    federation.register_peer("direct.example", direct_fp, direct_pem, status="trusted")

    # Canned response: the direct peer advertises two peers of its own.
    far_fp_a, _ = _make_peer_pubkey()
    far_fp_b, _ = _make_peer_pubkey()
    advertised = [
        {"fingerprint": far_fp_a, "domain": "far-a.example"},
        {"fingerprint": far_fp_b, "domain": "far-b.example"},
    ]
    fake_payload: Dict[str, Any] = {
        "fingerprint": direct_fp,
        "peers": advertised,
        "vouches": [],
    }

    fetch_stub = patch.object(network_view, "_fetch_peer_network", return_value=fake_payload)
    with fetch_stub:
        view = build_transitive_view(force_refresh=True)

    # Self (0), the direct peer (1) and the advertised peers (2) are all present.
    distances = sorted(node.distance for node in view.nodes)
    for expected_distance in (0, 1, 2):
        assert expected_distance in distances

    second_hop = {node.fingerprint for node in view.nodes if node.distance == 2}
    assert far_fp_a in second_hop
    assert far_fp_b in second_hop

    # One "knows" edge from the direct peer to each second-hop node.
    knows_edges = [edge for edge in view.edges if edge.kind == "knows"]
    assert len(knows_edges) == 2
    assert all(edge.source_fingerprint == direct_fp for edge in knows_edges)
    assert view.stats["transitive_nodes"] == 2
def test_transitive_view_failed_fetch_does_not_abort(fresh_db, fed_dir):
    """One peer's failed fetch does not stop another peer's results landing."""
    fp_a, pem_a = _make_peer_pubkey()
    fp_b, pem_b = _make_peer_pubkey()
    federation.register_peer("peer-a.example", fp_a, pem_a, status="trusted")
    federation.register_peer("peer-b.example", fp_b, pem_b, status="trusted")
    far_fp, _ = _make_peer_pubkey()

    def fake_fetch(domain, timeout=4.0):
        """Answer on behalf of peer B; return None (a failed fetch) for peer A."""
        if domain != "peer-a.example":
            return {
                "fingerprint": fp_b,
                "peers": [{"fingerprint": far_fp, "domain": "far.example"}],
                "vouches": [],
            }
        return None

    with patch.object(network_view, "_fetch_peer_network", side_effect=fake_fetch):
        view = build_transitive_view(force_refresh=True)

    # Both direct peers survive; the only second-hop node comes from B.
    seen_fingerprints = [node.fingerprint for node in view.nodes]
    assert fp_a in seen_fingerprints
    assert fp_b in seen_fingerprints
    assert any(node.fingerprint == far_fp and node.distance == 2 for node in view.nodes)
    assert view.stats["transitive_nodes"] == 1
def test_transitive_view_skips_only_unknown_peers(fresh_db, fed_dir):
    """The fetcher is invoked for the trusted peer but not for the unknown one."""
    unknown_fp, unknown_pem = _make_peer_pubkey()
    trusted_fp, trusted_pem = _make_peer_pubkey()
    federation.register_peer("unknown.example", unknown_fp, unknown_pem, status="unknown")
    federation.register_peer("trusted.example", trusted_fp, trusted_pem, status="trusted")

    queried_domains = []

    def fake_fetch(domain, timeout=4.0):
        """Record which domain was asked for and report a failed fetch."""
        queried_domains.append(domain)
        return None

    with patch.object(network_view, "_fetch_peer_network", side_effect=fake_fetch):
        build_transitive_view(force_refresh=True)

    assert "trusted.example" in queried_domains
    assert "unknown.example" not in queried_domains
# ---------- admin view: per-peer enrichment + corroboration + timeline ---
def _no_transitive():
    """Build a patcher that makes ``_fetch_peer_network`` return ``None``.

    Used as a context manager by the admin-view tests so that no peer network
    fetch is attempted.
    """
    silent_fetch = patch.object(network_view, "_fetch_peer_network", return_value=None)
    return silent_fetch
def test_admin_view_includes_stats_on_peer_nodes(fresh_db, fed_dir):
    """In the admin view, peer nodes carry a ``stats`` dict and self does not."""
    peer_fp, peer_pem = _make_peer_pubkey()
    federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted")

    with _no_transitive():
        view = build_admin_view(include_transitive=False)

    # Split the nodes into "us" and "everyone else".
    self_nodes, peer_nodes = [], []
    for node in view["nodes"]:
        bucket = self_nodes if node["is_self"] else peer_nodes
        bucket.append(node)
    assert len(self_nodes) == 1
    assert len(peer_nodes) == 1

    # Self carries no stats; the peer carries the full documented key set.
    assert self_nodes[0]["stats"] is None
    peer_stats = peer_nodes[0]["stats"]
    assert isinstance(peer_stats, dict)
    expected_keys = (
        "signals_24h", "signals_total", "cases_24h", "iocs_24h",
        "severity_breakdown", "ioc_type_breakdown",
        "vouches_in_count", "vouches_out_count",
        "quorum_contribution", "last_seen", "last_seen_relative",
        "recent_translog",
    )
    for key in expected_keys:
        assert key in peer_stats, f"missing {key}"

    # No signals were recorded, so there is no last-seen information.
    assert peer_stats["last_seen"] is None
    assert peer_stats["last_seen_relative"] == ""
def test_admin_view_signals_24h_excludes_stale(fresh_db, fed_dir):
    """``signals_24h`` skips the 30-hour-old row; ``signals_total`` keeps it."""
    peer_fp, peer_pem = _make_peer_pubkey()
    federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted")

    fresh_ts = datetime.now(timezone.utc).isoformat()
    stale_ts = (datetime.now(timezone.utc) - timedelta(hours=30)).isoformat()

    # Three current signals ...
    for i in range(3):
        db.record_signal({
            "peer_fingerprint": peer_fp,
            "signal_type": "ioc",
            "signal_id": f"v{i}",
            "signal_hash": f"h{i}",
            "received_at": fresh_ts,
            "raw_json": "{}",
        })
    # ... and one that is 30 hours old.
    db.record_signal({
        "peer_fingerprint": peer_fp,
        "signal_type": "ioc",
        "signal_id": "stale",
        "signal_hash": "stale-hash",
        "received_at": stale_ts,
        "raw_json": "{}",
    })

    with _no_transitive():
        view = build_admin_view(include_transitive=False)

    peer_stats = next(node for node in view["nodes"] if not node["is_self"])["stats"]
    assert peer_stats["signals_24h"] == 3
    # The all-time counter includes the stale row.
    assert peer_stats["signals_total"] == 4
    # Signals exist, so both last-seen fields are filled in.
    assert peer_stats["last_seen"] is not None
    assert peer_stats["last_seen_relative"] != ""
def test_admin_view_severity_and_ioc_breakdown_from_raw_json(fresh_db, fed_dir):
    """Breakdowns reflect ``severity`` of case rows and ``type`` of ioc rows."""
    peer_fp, peer_pem = _make_peer_pubkey()
    federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted")
    received_ts = datetime.now(timezone.utc).isoformat()

    # Four case signals: 2 critical, 1 high, 1 low.
    case_payloads = [
        {"severity": "critical", "case_id": "c1"},
        {"severity": "critical", "case_id": "c2"},
        {"severity": "high", "case_id": "c3"},
        {"severity": "low", "case_id": "c4"},
    ]
    for c in case_payloads:
        db.record_signal({
            "peer_fingerprint": peer_fp,
            "signal_type": "case",
            "signal_id": c["case_id"],
            "signal_hash": f"hash-{c['case_id']}",
            "received_at": received_ts,
            "raw_json": json.dumps(c),
        })

    # Four ioc signals: 2 url, 1 domain, 1 ip.
    ioc_payloads = [
        {"type": "url", "value": "https://a"},
        {"type": "url", "value": "https://b"},
        {"type": "domain", "value": "x.com"},
        {"type": "ip", "value": "1.2.3.4"},
    ]
    for ioc in ioc_payloads:
        db.record_signal({
            "peer_fingerprint": peer_fp,
            "signal_type": "ioc",
            "signal_id": ioc["value"],
            "signal_hash": f"hash-{ioc['value']}",
            "received_at": received_ts,
            "raw_json": json.dumps(ioc),
        })

    with _no_transitive():
        view = build_admin_view(include_transitive=False)

    peer_node = next(node for node in view["nodes"] if not node["is_self"])
    stats = peer_node["stats"]
    assert stats["cases_24h"] == 4
    assert stats["iocs_24h"] == 4

    # Categories with no rows are still present, with a zero count.
    expected_severity = {"critical": 2, "high": 1, "medium": 0, "low": 1}
    assert stats["severity_breakdown"] == expected_severity
    expected_ioc_types = {"url": 2, "domain": 1, "ip": 1, "hash": 0, "cve": 0}
    assert stats["ioc_type_breakdown"] == expected_ioc_types
def test_admin_view_vouch_in_out_counts(fresh_db, fed_dir):
    """Per-peer vouch counters: ``in`` = vouches naming the peer, ``out`` = issued by it."""
    fp_a, pem_a = _make_peer_pubkey()
    fp_b, pem_b = _make_peer_pubkey()
    federation.register_peer("a.example", fp_a, pem_a, status="trusted")
    federation.register_peer("b.example", fp_b, pem_b, status="trusted")
    issued_ts = datetime.now(timezone.utc).isoformat()

    # B is vouched for twice: once by A (direct row) and once by us.
    db.upsert_vouch({
        "voucher_fingerprint": fp_a,
        "target_fingerprint": fp_b,
        "issued_at": issued_ts,
        "expires_at": None,
        "signature": "x",
    })
    federation.issue_vouch(fp_b, ttl_days=30)

    # B vouches for A, giving A one inbound and B one outbound vouch.
    db.upsert_vouch({
        "voucher_fingerprint": fp_b,
        "target_fingerprint": fp_a,
        "issued_at": issued_ts,
        "expires_at": None,
        "signature": "y",
    })

    with _no_transitive():
        view = build_admin_view(include_transitive=False)

    stats_by_fp = {
        node["fingerprint"]: node["stats"]
        for node in view["nodes"]
        if not node["is_self"]
    }
    stats_a = stats_by_fp[fp_a]
    stats_b = stats_by_fp[fp_b]
    assert stats_a["vouches_in_count"] == 1
    assert stats_a["vouches_out_count"] == 1  # A → B
    assert stats_b["vouches_in_count"] == 2  # from A and from us
    assert stats_b["vouches_out_count"] == 1  # B → A
def test_admin_view_corroborated_signals(fresh_db, fed_dir):
    """A hash reported by two peers is listed as corroborated and gets an edge."""
    fp_a, pem_a = _make_peer_pubkey()
    fp_b, pem_b = _make_peer_pubkey()
    federation.register_peer("a.example", fp_a, pem_a, status="trusted")
    federation.register_peer("b.example", fp_b, pem_b, status="trusted")
    received_ts = datetime.now(timezone.utc).isoformat()

    # Both peers report the same hash.
    for reporter_fp in (fp_a, fp_b):
        db.record_signal({
            "peer_fingerprint": reporter_fp,
            "signal_type": "ioc",
            "signal_id": "evil.com",
            "signal_hash": "shared-hash-1",
            "received_at": received_ts,
            "raw_json": "{}",
        })
    # Only A reports this one, so it must not count as corroborated.
    db.record_signal({
        "peer_fingerprint": fp_a,
        "signal_type": "ioc",
        "signal_id": "solo.com",
        "signal_hash": "solo-hash",
        "received_at": received_ts,
        "raw_json": "{}",
    })

    with _no_transitive():
        view = build_admin_view(include_transitive=False)

    corroborated = view["stats"]["corroborated_signals"]
    corroborated_hashes = {entry["signal_hash"] for entry in corroborated}
    assert "shared-hash-1" in corroborated_hashes
    assert "solo-hash" not in corroborated_hashes

    shared_entry = next(
        entry for entry in corroborated if entry["signal_hash"] == "shared-hash-1"
    )
    assert set(shared_entry["peer_fingerprints"]) == {fp_a, fp_b}
    assert shared_entry["peer_count"] == 2

    # Exactly one corroborate edge joins the pair; its direction is not checked.
    corroborate_edges = [edge for edge in view["edges"] if edge["kind"] == "corroborate"]
    assert len(corroborate_edges) == 1
    edge = corroborate_edges[0]
    endpoints = {edge["source_fingerprint"], edge["target_fingerprint"]}
    assert endpoints == {fp_a, fp_b}
    assert edge["weight"] == 1.0
def test_admin_view_signal_timeline_24h_returns_24_buckets(fresh_db, fed_dir):
    """``signal_timeline_24h`` has 24 buckets whose totals sum to the in-window count."""
    peer_fp, peer_pem = _make_peer_pubkey()
    federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted")

    reference = datetime.now(timezone.utc)
    about_one_hour_ago = (reference - timedelta(hours=1, minutes=5)).isoformat()
    about_five_hours_ago = (reference - timedelta(hours=5, minutes=5)).isoformat()

    # Two signals a little over one hour old.
    for i in range(2):
        db.record_signal({
            "peer_fingerprint": peer_fp,
            "signal_type": "ioc",
            "signal_id": f"a{i}",
            "signal_hash": f"h-a-{i}",
            "received_at": about_one_hour_ago,
            "raw_json": "{}",
        })
    # Three signals a little over five hours old.
    for i in range(3):
        db.record_signal({
            "peer_fingerprint": peer_fp,
            "signal_type": "ioc",
            "signal_id": f"b{i}",
            "signal_hash": f"h-b-{i}",
            "received_at": about_five_hours_ago,
            "raw_json": "{}",
        })
    # One 48-hour-old signal that must stay out of the timeline.
    db.record_signal({
        "peer_fingerprint": peer_fp,
        "signal_type": "ioc",
        "signal_id": "stale",
        "signal_hash": "stale-hash",
        "received_at": (reference - timedelta(hours=48)).isoformat(),
        "raw_json": "{}",
    })

    with _no_transitive():
        view = build_admin_view(include_transitive=False)

    timeline = view["stats"]["signal_timeline_24h"]
    assert isinstance(timeline, list)
    assert len(timeline) == 24
    # 2 + 3 in-window signals; the stale one is not counted.
    assert sum(bucket["total"] for bucket in timeline) == 5
    # hour_offset runs 0..23 in list order.
    offsets = [bucket["hour_offset"] for bucket in timeline]
    assert offsets == list(range(24))
def test_admin_view_quorum_contribution(fresh_db, fed_dir):
    """Each peer gets credit for the one hash both reported, not for a solo hash."""
    fp_a, pem_a = _make_peer_pubkey()
    fp_b, pem_b = _make_peer_pubkey()
    federation.register_peer("a.example", fp_a, pem_a, status="trusted")
    federation.register_peer("b.example", fp_b, pem_b, status="trusted")
    received_ts = datetime.now(timezone.utc).isoformat()

    # Reported by both peers, so it meets quorum (default k=2, per the original note).
    for reporter_fp in (fp_a, fp_b):
        db.record_signal({
            "peer_fingerprint": reporter_fp,
            "signal_type": "ioc",
            "signal_id": "shared",
            "signal_hash": "quorum-hash",
            "received_at": received_ts,
            "raw_json": "{}",
        })
    # Reported by A alone, so it falls short of quorum.
    db.record_signal({
        "peer_fingerprint": fp_a,
        "signal_type": "ioc",
        "signal_id": "solo",
        "signal_hash": "solo-hash",
        "received_at": received_ts,
        "raw_json": "{}",
    })

    with _no_transitive():
        view = build_admin_view(include_transitive=False)

    peers_by_fp = {
        node["fingerprint"]: node for node in view["nodes"] if not node["is_self"]
    }
    for reporter_fp in (fp_a, fp_b):
        assert peers_by_fp[reporter_fp]["stats"]["quorum_contribution"] == 1
def test_admin_view_recent_translog_per_peer(fresh_db, fed_dir):
    """``recent_translog`` holds only the entries that name the given peer."""
    fp_a, pem_a = _make_peer_pubkey()
    fp_b, pem_b = _make_peer_pubkey()
    federation.register_peer("a.example", fp_a, pem_a, status="trusted")
    federation.register_peer("b.example", fp_b, pem_b, status="trusted")

    # Two entries naming A with one for B in between.
    entries = (
        {"peer_fingerprint": fp_a, "signal_type": "ioc", "signal_id": "x"},
        {"peer_fingerprint": fp_b, "signal_type": "case", "signal_id": "y"},
        {"peer_fingerprint": fp_a, "signal_type": "case", "signal_id": "z"},
    )
    for entry_data in entries:
        translog.append("signal", entry_data)

    with _no_transitive():
        view = build_admin_view(include_transitive=False)

    peers_by_fp = {
        node["fingerprint"]: node for node in view["nodes"] if not node["is_self"]
    }
    log_a = peers_by_fp[fp_a]["stats"]["recent_translog"]
    log_b = peers_by_fp[fp_b]["stats"]["recent_translog"]
    assert len(log_a) == 2
    assert len(log_b) == 1

    # Every row exposes exactly these four fields.
    row_shape = {"id", "entry_type", "timestamp", "hash"}
    for row in log_a + log_b:
        assert set(row.keys()) == row_shape
def test_public_view_still_has_no_stats(fresh_db, fed_dir):
    """Admin-only enrichments never appear in the public payload.

    The admin view is built first, so that any state it leaves behind
    (caching, mutated nodes) would be visible to the public build that
    follows; the public payload must stay clean regardless.
    """
    peer_fp, peer_pem = _make_peer_pubkey()
    federation.register_peer("trusted.example", peer_fp, peer_pem, status="trusted")

    # Give the admin view something sensitive to enrich.
    received_ts = datetime.now(timezone.utc).isoformat()
    db.record_signal({
        "peer_fingerprint": peer_fp,
        "signal_type": "ioc",
        "signal_id": "leak",
        "signal_hash": "leak-hash",
        "received_at": received_ts,
        "raw_json": json.dumps({"type": "url", "value": "https://leak"}),
    })

    # Admin build first, result discarded.
    with _no_transitive():
        build_admin_view(include_transitive=False)

    # Public build second: none of the admin-only markers may show up.
    payload = build_public_view()
    serialized = json.dumps(payload, default=str)
    forbidden_markers = (
        "signals_24h",
        "severity_breakdown",
        "corroborated_signals",
        "signal_timeline_24h",
        "recent_translog",
        "leak-hash",
    )
    for marker in forbidden_markers:
        assert marker not in serialized

    # Public peer entries have no `stats` key at all.
    for peer_entry in payload.get("peers", []):
        assert "stats" not in peer_entry