diff --git a/tests/test_network_view.py b/tests/test_network_view.py index 152da47..49fde86 100644 --- a/tests/test_network_view.py +++ b/tests/test_network_view.py @@ -3,6 +3,7 @@ from __future__ import annotations import base64 +import json from datetime import datetime, timedelta, timezone from typing import Any, Dict from unittest.mock import patch @@ -11,11 +12,12 @@ import pytest from sqlalchemy import create_engine from psyc import db -from psyc.lines import federation, network_view +from psyc.lines import federation, network_view, translog from psyc.lines.network_view import ( NetworkEdge, NetworkNode, NetworkView, + build_admin_view, build_local_view, build_public_view, build_transitive_view, @@ -332,3 +334,325 @@ def test_transitive_view_skips_only_unknown_peers(fresh_db, fed_dir): assert "trusted.example" in calls assert "unknown.example" not in calls + + +# ---------- admin view: per-peer enrichment + corroboration + timeline --- + +def _no_transitive(): + """patch.object helper — silence network fetches in admin-view tests.""" + return patch.object(network_view, "_fetch_peer_network", return_value=None) + + +def test_admin_view_includes_stats_on_peer_nodes(fresh_db, fed_dir): + """Every non-self node must carry a `stats` dict in the admin view.""" + fp, pem = _make_peer_pubkey() + federation.register_peer("peer.example", fp, pem, status="trusted") + with _no_transitive(): + view = build_admin_view(include_transitive=False) + self_nodes = [n for n in view["nodes"] if n["is_self"]] + peer_nodes = [n for n in view["nodes"] if not n["is_self"]] + assert len(self_nodes) == 1 + assert len(peer_nodes) == 1 + # Self has no stats; peers do. + assert self_nodes[0]["stats"] is None + peer_stats = peer_nodes[0]["stats"] + assert isinstance(peer_stats, dict) + for key in ( + "signals_24h", "signals_total", "cases_24h", "iocs_24h", + "severity_breakdown", "ioc_type_breakdown", + "vouches_in_count", "vouches_out_count", + "quorum_contribution", "last_seen", "last_seen_relative", + "recent_translog", + ): + assert key in peer_stats, f"missing {key}" + # last_seen is None when no signals have landed yet. + assert peer_stats["last_seen"] is None + assert peer_stats["last_seen_relative"] == "—" + + +def test_admin_view_signals_24h_excludes_stale(fresh_db, fed_dir): + """signals_24h must count only rows inside the 24h window.""" + fp, pem = _make_peer_pubkey() + federation.register_peer("peer.example", fp, pem, status="trusted") + now_iso = datetime.now(timezone.utc).isoformat() + stale_iso = (datetime.now(timezone.utc) - timedelta(hours=30)).isoformat() + for i in range(3): + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id=f"v{i}", + signal_hash=f"h{i}", + received_at=now_iso, + raw_json="{}", + )) + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id="stale", + signal_hash="stale-hash", + received_at=stale_iso, + raw_json="{}", + )) + with _no_transitive(): + view = build_admin_view(include_transitive=False) + peer = next(n for n in view["nodes"] if not n["is_self"]) + assert peer["stats"]["signals_24h"] == 3 + # All-time total still sees the stale row. + assert peer["stats"]["signals_total"] == 4 + # last_seen is populated and the relative is a short string. + assert peer["stats"]["last_seen"] is not None + assert peer["stats"]["last_seen_relative"] != "—" + + +def test_admin_view_severity_and_ioc_breakdown_from_raw_json(fresh_db, fed_dir): + """severity_breakdown is read from raw_json for case rows, ioc_type for ioc rows.""" + fp, pem = _make_peer_pubkey() + federation.register_peer("peer.example", fp, pem, status="trusted") + now_iso = datetime.now(timezone.utc).isoformat() + cases = [ + {"severity": "critical", "case_id": "c1"}, + {"severity": "critical", "case_id": "c2"}, + {"severity": "high", "case_id": "c3"}, + {"severity": "low", "case_id": "c4"}, + ] + for c in cases: + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="case", + signal_id=c["case_id"], + signal_hash=f"hash-{c['case_id']}", + received_at=now_iso, + raw_json=json.dumps(c), + )) + iocs = [ + {"type": "url", "value": "https://a"}, + {"type": "url", "value": "https://b"}, + {"type": "domain", "value": "x.com"}, + {"type": "ip", "value": "1.2.3.4"}, + ] + for ioc in iocs: + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id=ioc["value"], + signal_hash=f"hash-{ioc['value']}", + received_at=now_iso, + raw_json=json.dumps(ioc), + )) + with _no_transitive(): + view = build_admin_view(include_transitive=False) + stats = next(n for n in view["nodes"] if not n["is_self"])["stats"] + assert stats["cases_24h"] == 4 + assert stats["iocs_24h"] == 4 + sev = stats["severity_breakdown"] + assert sev == {"critical": 2, "high": 1, "medium": 0, "low": 1} + ioc_t = stats["ioc_type_breakdown"] + assert ioc_t == {"url": 2, "domain": 1, "ip": 1, "hash": 0, "cve": 0} + + +def test_admin_view_vouch_in_out_counts(fresh_db, fed_dir): + """vouches_in_count counts vouches naming this peer; out counts what they've issued.""" + fp_a, pem_a = _make_peer_pubkey() + fp_b, pem_b = _make_peer_pubkey() + federation.register_peer("a.example", fp_a, pem_a, status="trusted") + federation.register_peer("b.example", fp_b, pem_b, status="trusted") + now = datetime.now(timezone.utc).isoformat() + # A vouches for B; we vouch for B too — B sees vouches_in=2. + db.upsert_vouch(dict( + voucher_fingerprint=fp_a, + target_fingerprint=fp_b, + issued_at=now, expires_at=None, signature="x", + )) + federation.issue_vouch(fp_b, ttl_days=30) + # B vouches for A — A sees vouches_in=1, B sees vouches_out=1. + db.upsert_vouch(dict( + voucher_fingerprint=fp_b, + target_fingerprint=fp_a, + issued_at=now, expires_at=None, signature="y", + )) + with _no_transitive(): + view = build_admin_view(include_transitive=False) + by_fp = {n["fingerprint"]: n for n in view["nodes"] if not n["is_self"]} + assert by_fp[fp_a]["stats"]["vouches_in_count"] == 1 + assert by_fp[fp_a]["stats"]["vouches_out_count"] == 1 # vouches for B + assert by_fp[fp_b]["stats"]["vouches_in_count"] == 2 # A + us + assert by_fp[fp_b]["stats"]["vouches_out_count"] == 1 # vouches for A + + +def test_admin_view_corroborated_signals(fresh_db, fed_dir): + """Pairs of peers reporting the same signal_hash → corroborated entry + edge.""" + fp_a, pem_a = _make_peer_pubkey() + fp_b, pem_b = _make_peer_pubkey() + federation.register_peer("a.example", fp_a, pem_a, status="trusted") + federation.register_peer("b.example", fp_b, pem_b, status="trusted") + now_iso = datetime.now(timezone.utc).isoformat() + for peer_fp in (fp_a, fp_b): + db.record_signal(dict( + peer_fingerprint=peer_fp, + signal_type="ioc", + signal_id="evil.com", + signal_hash="shared-hash-1", + received_at=now_iso, + raw_json="{}", + )) + # A also reports a hash B doesn't — should NOT corroborate. + db.record_signal(dict( + peer_fingerprint=fp_a, + signal_type="ioc", + signal_id="solo.com", + signal_hash="solo-hash", + received_at=now_iso, + raw_json="{}", + )) + with _no_transitive(): + view = build_admin_view(include_transitive=False) + corr = view["stats"]["corroborated_signals"] + hashes = {c["signal_hash"] for c in corr} + assert "shared-hash-1" in hashes + assert "solo-hash" not in hashes + shared = next(c for c in corr if c["signal_hash"] == "shared-hash-1") + assert set(shared["peer_fingerprints"]) == {fp_a, fp_b} + assert shared["peer_count"] == 2 + + # One corroborate edge between the pair (orientation-independent). + corr_edges = [e for e in view["edges"] if e["kind"] == "corroborate"] + assert len(corr_edges) == 1 + pair = {corr_edges[0]["source_fingerprint"], corr_edges[0]["target_fingerprint"]} + assert pair == {fp_a, fp_b} + assert corr_edges[0]["weight"] == 1.0 + + +def test_admin_view_signal_timeline_24h_returns_24_buckets(fresh_db, fed_dir): + """signal_timeline_24h is a 24-bucket list with correct totals.""" + fp, pem = _make_peer_pubkey() + federation.register_peer("peer.example", fp, pem, status="trusted") + now = datetime.now(timezone.utc) + # Two signals one hour ago, three signals five hours ago. + one_h = (now - timedelta(hours=1, minutes=5)).isoformat() + five_h = (now - timedelta(hours=5, minutes=5)).isoformat() + for i in range(2): + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id=f"a{i}", + signal_hash=f"h-a-{i}", + received_at=one_h, + raw_json="{}", + )) + for i in range(3): + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id=f"b{i}", + signal_hash=f"h-b-{i}", + received_at=five_h, + raw_json="{}", + )) + # Stale signal — must NOT show up. + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id="stale", + signal_hash="stale-hash", + received_at=(now - timedelta(hours=48)).isoformat(), + raw_json="{}", + )) + with _no_transitive(): + view = build_admin_view(include_transitive=False) + buckets = view["stats"]["signal_timeline_24h"] + assert isinstance(buckets, list) + assert len(buckets) == 24 + totals = [b["total"] for b in buckets] + assert sum(totals) == 5 # stale excluded + # Bucket hour_offsets are 0..23 in oldest-first order. + assert [b["hour_offset"] for b in buckets] == list(range(24)) + + +def test_admin_view_quorum_contribution(fresh_db, fed_dir): + """quorum_contribution counts this peer's distinct hashes that are quorum-met.""" + fp_a, pem_a = _make_peer_pubkey() + fp_b, pem_b = _make_peer_pubkey() + federation.register_peer("a.example", fp_a, pem_a, status="trusted") + federation.register_peer("b.example", fp_b, pem_b, status="trusted") + now_iso = datetime.now(timezone.utc).isoformat() + # Shared hash → both peers report it → quorum-met (default k=2). + for peer_fp in (fp_a, fp_b): + db.record_signal(dict( + peer_fingerprint=peer_fp, + signal_type="ioc", + signal_id="shared", + signal_hash="quorum-hash", + received_at=now_iso, + raw_json="{}", + )) + # Solo hash from A → not quorum-met. + db.record_signal(dict( + peer_fingerprint=fp_a, + signal_type="ioc", + signal_id="solo", + signal_hash="solo-hash", + received_at=now_iso, + raw_json="{}", + )) + with _no_transitive(): + view = build_admin_view(include_transitive=False) + by_fp = {n["fingerprint"]: n for n in view["nodes"] if not n["is_self"]} + assert by_fp[fp_a]["stats"]["quorum_contribution"] == 1 + assert by_fp[fp_b]["stats"]["quorum_contribution"] == 1 + + +def test_admin_view_recent_translog_per_peer(fresh_db, fed_dir): + """recent_translog lists entries where entry_data.peer_fingerprint matches.""" + fp_a, pem_a = _make_peer_pubkey() + fp_b, pem_b = _make_peer_pubkey() + federation.register_peer("a.example", fp_a, pem_a, status="trusted") + federation.register_peer("b.example", fp_b, pem_b, status="trusted") + # Append translog rows that name each peer. + translog.append("signal", {"peer_fingerprint": fp_a, "signal_type": "ioc", "signal_id": "x"}) + translog.append("signal", {"peer_fingerprint": fp_b, "signal_type": "case", "signal_id": "y"}) + translog.append("signal", {"peer_fingerprint": fp_a, "signal_type": "case", "signal_id": "z"}) + with _no_transitive(): + view = build_admin_view(include_transitive=False) + by_fp = {n["fingerprint"]: n for n in view["nodes"] if not n["is_self"]} + a_log = by_fp[fp_a]["stats"]["recent_translog"] + b_log = by_fp[fp_b]["stats"]["recent_translog"] + assert len(a_log) == 2 + assert len(b_log) == 1 + # Each row carries the documented shape. + for row in a_log + b_log: + assert set(row.keys()) == {"id", "entry_type", "timestamp", "hash"} + + +def test_public_view_still_has_no_stats(fresh_db, fed_dir): + """Public payload must not surface admin-only enrichments — sensitive. + + Even after `build_admin_view` has been invoked (which mutates node.stats + on the cached transitive view), the public view path must stay clean. + """ + fp, pem = _make_peer_pubkey() + federation.register_peer("trusted.example", fp, pem, status="trusted") + # Seed signals + corroborated hash so admin view has rich state. + now_iso = datetime.now(timezone.utc).isoformat() + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id="leak", + signal_hash="leak-hash", + received_at=now_iso, + raw_json=json.dumps({"type": "url", "value": "https://leak"}), + )) + # Build admin view first so any caching kicks in. + with _no_transitive(): + build_admin_view(include_transitive=False) + # Now build the public view and assert no admin-only fields leak. + payload = build_public_view() + flat = json.dumps(payload, default=str) + assert "signals_24h" not in flat + assert "severity_breakdown" not in flat + assert "corroborated_signals" not in flat + assert "signal_timeline_24h" not in flat + assert "recent_translog" not in flat + assert "leak-hash" not in flat + # Peer entries in the public view never carry a `stats` field. + for p in payload.get("peers", []): + assert "stats" not in p