stage-netd-f network detail: tests for admin enrichment (stats/corroboration/timeline)

This commit is contained in:
m17hr1l
2026-06-07 01:00:39 +02:00
parent 980cf74b76
commit 0d9baef4c8

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import base64
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict
from unittest.mock import patch
@@ -11,11 +12,12 @@ import pytest
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import federation, network_view
from psyc.lines import federation, network_view, translog
from psyc.lines.network_view import (
NetworkEdge,
NetworkNode,
NetworkView,
build_admin_view,
build_local_view,
build_public_view,
build_transitive_view,
@@ -332,3 +334,325 @@ def test_transitive_view_skips_only_unknown_peers(fresh_db, fed_dir):
assert "trusted.example" in calls
assert "unknown.example" not in calls
# ---------- admin view: per-peer enrichment + corroboration + timeline ---
def _no_transitive():
"""patch.object helper — silence network fetches in admin-view tests."""
return patch.object(network_view, "_fetch_peer_network", return_value=None)
def test_admin_view_includes_stats_on_peer_nodes(fresh_db, fed_dir):
"""Every non-self node must carry a `stats` dict in the admin view."""
fp, pem = _make_peer_pubkey()
federation.register_peer("peer.example", fp, pem, status="trusted")
with _no_transitive():
view = build_admin_view(include_transitive=False)
self_nodes = [n for n in view["nodes"] if n["is_self"]]
peer_nodes = [n for n in view["nodes"] if not n["is_self"]]
assert len(self_nodes) == 1
assert len(peer_nodes) == 1
# Self has no stats; peers do.
assert self_nodes[0]["stats"] is None
peer_stats = peer_nodes[0]["stats"]
assert isinstance(peer_stats, dict)
for key in (
"signals_24h", "signals_total", "cases_24h", "iocs_24h",
"severity_breakdown", "ioc_type_breakdown",
"vouches_in_count", "vouches_out_count",
"quorum_contribution", "last_seen", "last_seen_relative",
"recent_translog",
):
assert key in peer_stats, f"missing {key}"
# last_seen is None when no signals have landed yet.
assert peer_stats["last_seen"] is None
assert peer_stats["last_seen_relative"] == ""
def test_admin_view_signals_24h_excludes_stale(fresh_db, fed_dir):
"""signals_24h must count only rows inside the 24h window."""
fp, pem = _make_peer_pubkey()
federation.register_peer("peer.example", fp, pem, status="trusted")
now_iso = datetime.now(timezone.utc).isoformat()
stale_iso = (datetime.now(timezone.utc) - timedelta(hours=30)).isoformat()
for i in range(3):
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="ioc",
signal_id=f"v{i}",
signal_hash=f"h{i}",
received_at=now_iso,
raw_json="{}",
))
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="ioc",
signal_id="stale",
signal_hash="stale-hash",
received_at=stale_iso,
raw_json="{}",
))
with _no_transitive():
view = build_admin_view(include_transitive=False)
peer = next(n for n in view["nodes"] if not n["is_self"])
assert peer["stats"]["signals_24h"] == 3
# All-time total still sees the stale row.
assert peer["stats"]["signals_total"] == 4
# last_seen is populated and the relative is a short string.
assert peer["stats"]["last_seen"] is not None
assert peer["stats"]["last_seen_relative"] != ""
def test_admin_view_severity_and_ioc_breakdown_from_raw_json(fresh_db, fed_dir):
"""severity_breakdown is read from raw_json for case rows, ioc_type for ioc rows."""
fp, pem = _make_peer_pubkey()
federation.register_peer("peer.example", fp, pem, status="trusted")
now_iso = datetime.now(timezone.utc).isoformat()
cases = [
{"severity": "critical", "case_id": "c1"},
{"severity": "critical", "case_id": "c2"},
{"severity": "high", "case_id": "c3"},
{"severity": "low", "case_id": "c4"},
]
for c in cases:
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="case",
signal_id=c["case_id"],
signal_hash=f"hash-{c['case_id']}",
received_at=now_iso,
raw_json=json.dumps(c),
))
iocs = [
{"type": "url", "value": "https://a"},
{"type": "url", "value": "https://b"},
{"type": "domain", "value": "x.com"},
{"type": "ip", "value": "1.2.3.4"},
]
for ioc in iocs:
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="ioc",
signal_id=ioc["value"],
signal_hash=f"hash-{ioc['value']}",
received_at=now_iso,
raw_json=json.dumps(ioc),
))
with _no_transitive():
view = build_admin_view(include_transitive=False)
stats = next(n for n in view["nodes"] if not n["is_self"])["stats"]
assert stats["cases_24h"] == 4
assert stats["iocs_24h"] == 4
sev = stats["severity_breakdown"]
assert sev == {"critical": 2, "high": 1, "medium": 0, "low": 1}
ioc_t = stats["ioc_type_breakdown"]
assert ioc_t == {"url": 2, "domain": 1, "ip": 1, "hash": 0, "cve": 0}
def test_admin_view_vouch_in_out_counts(fresh_db, fed_dir):
"""vouches_in_count counts vouches naming this peer; out counts what they've issued."""
fp_a, pem_a = _make_peer_pubkey()
fp_b, pem_b = _make_peer_pubkey()
federation.register_peer("a.example", fp_a, pem_a, status="trusted")
federation.register_peer("b.example", fp_b, pem_b, status="trusted")
now = datetime.now(timezone.utc).isoformat()
# A vouches for B; we vouch for B too — B sees vouches_in=2.
db.upsert_vouch(dict(
voucher_fingerprint=fp_a,
target_fingerprint=fp_b,
issued_at=now, expires_at=None, signature="x",
))
federation.issue_vouch(fp_b, ttl_days=30)
# B vouches for A — A sees vouches_in=1, B sees vouches_out=1.
db.upsert_vouch(dict(
voucher_fingerprint=fp_b,
target_fingerprint=fp_a,
issued_at=now, expires_at=None, signature="y",
))
with _no_transitive():
view = build_admin_view(include_transitive=False)
by_fp = {n["fingerprint"]: n for n in view["nodes"] if not n["is_self"]}
assert by_fp[fp_a]["stats"]["vouches_in_count"] == 1
assert by_fp[fp_a]["stats"]["vouches_out_count"] == 1 # vouches for B
assert by_fp[fp_b]["stats"]["vouches_in_count"] == 2 # A + us
assert by_fp[fp_b]["stats"]["vouches_out_count"] == 1 # vouches for A
def test_admin_view_corroborated_signals(fresh_db, fed_dir):
"""Pairs of peers reporting the same signal_hash → corroborated entry + edge."""
fp_a, pem_a = _make_peer_pubkey()
fp_b, pem_b = _make_peer_pubkey()
federation.register_peer("a.example", fp_a, pem_a, status="trusted")
federation.register_peer("b.example", fp_b, pem_b, status="trusted")
now_iso = datetime.now(timezone.utc).isoformat()
for peer_fp in (fp_a, fp_b):
db.record_signal(dict(
peer_fingerprint=peer_fp,
signal_type="ioc",
signal_id="evil.com",
signal_hash="shared-hash-1",
received_at=now_iso,
raw_json="{}",
))
# A also reports a hash B doesn't — should NOT corroborate.
db.record_signal(dict(
peer_fingerprint=fp_a,
signal_type="ioc",
signal_id="solo.com",
signal_hash="solo-hash",
received_at=now_iso,
raw_json="{}",
))
with _no_transitive():
view = build_admin_view(include_transitive=False)
corr = view["stats"]["corroborated_signals"]
hashes = {c["signal_hash"] for c in corr}
assert "shared-hash-1" in hashes
assert "solo-hash" not in hashes
shared = next(c for c in corr if c["signal_hash"] == "shared-hash-1")
assert set(shared["peer_fingerprints"]) == {fp_a, fp_b}
assert shared["peer_count"] == 2
# One corroborate edge between the pair (orientation-independent).
corr_edges = [e for e in view["edges"] if e["kind"] == "corroborate"]
assert len(corr_edges) == 1
pair = {corr_edges[0]["source_fingerprint"], corr_edges[0]["target_fingerprint"]}
assert pair == {fp_a, fp_b}
assert corr_edges[0]["weight"] == 1.0
def test_admin_view_signal_timeline_24h_returns_24_buckets(fresh_db, fed_dir):
"""signal_timeline_24h is a 24-bucket list with correct totals."""
fp, pem = _make_peer_pubkey()
federation.register_peer("peer.example", fp, pem, status="trusted")
now = datetime.now(timezone.utc)
# Two signals one hour ago, three signals five hours ago.
one_h = (now - timedelta(hours=1, minutes=5)).isoformat()
five_h = (now - timedelta(hours=5, minutes=5)).isoformat()
for i in range(2):
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="ioc",
signal_id=f"a{i}",
signal_hash=f"h-a-{i}",
received_at=one_h,
raw_json="{}",
))
for i in range(3):
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="ioc",
signal_id=f"b{i}",
signal_hash=f"h-b-{i}",
received_at=five_h,
raw_json="{}",
))
# Stale signal — must NOT show up.
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="ioc",
signal_id="stale",
signal_hash="stale-hash",
received_at=(now - timedelta(hours=48)).isoformat(),
raw_json="{}",
))
with _no_transitive():
view = build_admin_view(include_transitive=False)
buckets = view["stats"]["signal_timeline_24h"]
assert isinstance(buckets, list)
assert len(buckets) == 24
totals = [b["total"] for b in buckets]
assert sum(totals) == 5 # stale excluded
# Bucket hour_offsets are 0..23 in oldest-first order.
assert [b["hour_offset"] for b in buckets] == list(range(24))
def test_admin_view_quorum_contribution(fresh_db, fed_dir):
"""quorum_contribution counts this peer's distinct hashes that are quorum-met."""
fp_a, pem_a = _make_peer_pubkey()
fp_b, pem_b = _make_peer_pubkey()
federation.register_peer("a.example", fp_a, pem_a, status="trusted")
federation.register_peer("b.example", fp_b, pem_b, status="trusted")
now_iso = datetime.now(timezone.utc).isoformat()
# Shared hash → both peers report it → quorum-met (default k=2).
for peer_fp in (fp_a, fp_b):
db.record_signal(dict(
peer_fingerprint=peer_fp,
signal_type="ioc",
signal_id="shared",
signal_hash="quorum-hash",
received_at=now_iso,
raw_json="{}",
))
# Solo hash from A → not quorum-met.
db.record_signal(dict(
peer_fingerprint=fp_a,
signal_type="ioc",
signal_id="solo",
signal_hash="solo-hash",
received_at=now_iso,
raw_json="{}",
))
with _no_transitive():
view = build_admin_view(include_transitive=False)
by_fp = {n["fingerprint"]: n for n in view["nodes"] if not n["is_self"]}
assert by_fp[fp_a]["stats"]["quorum_contribution"] == 1
assert by_fp[fp_b]["stats"]["quorum_contribution"] == 1
def test_admin_view_recent_translog_per_peer(fresh_db, fed_dir):
"""recent_translog lists entries where entry_data.peer_fingerprint matches."""
fp_a, pem_a = _make_peer_pubkey()
fp_b, pem_b = _make_peer_pubkey()
federation.register_peer("a.example", fp_a, pem_a, status="trusted")
federation.register_peer("b.example", fp_b, pem_b, status="trusted")
# Append translog rows that name each peer.
translog.append("signal", {"peer_fingerprint": fp_a, "signal_type": "ioc", "signal_id": "x"})
translog.append("signal", {"peer_fingerprint": fp_b, "signal_type": "case", "signal_id": "y"})
translog.append("signal", {"peer_fingerprint": fp_a, "signal_type": "case", "signal_id": "z"})
with _no_transitive():
view = build_admin_view(include_transitive=False)
by_fp = {n["fingerprint"]: n for n in view["nodes"] if not n["is_self"]}
a_log = by_fp[fp_a]["stats"]["recent_translog"]
b_log = by_fp[fp_b]["stats"]["recent_translog"]
assert len(a_log) == 2
assert len(b_log) == 1
# Each row carries the documented shape.
for row in a_log + b_log:
assert set(row.keys()) == {"id", "entry_type", "timestamp", "hash"}
def test_public_view_still_has_no_stats(fresh_db, fed_dir):
"""Public payload must not surface admin-only enrichments — sensitive.
Even after `build_admin_view` has been invoked (which mutates node.stats
on the cached transitive view), the public view path must stay clean.
"""
fp, pem = _make_peer_pubkey()
federation.register_peer("trusted.example", fp, pem, status="trusted")
# Seed signals + corroborated hash so admin view has rich state.
now_iso = datetime.now(timezone.utc).isoformat()
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="ioc",
signal_id="leak",
signal_hash="leak-hash",
received_at=now_iso,
raw_json=json.dumps({"type": "url", "value": "https://leak"}),
))
# Build admin view first so any caching kicks in.
with _no_transitive():
build_admin_view(include_transitive=False)
# Now build the public view and assert no admin-only fields leak.
payload = build_public_view()
flat = json.dumps(payload, default=str)
assert "signals_24h" not in flat
assert "severity_breakdown" not in flat
assert "corroborated_signals" not in flat
assert "signal_timeline_24h" not in flat
assert "recent_translog" not in flat
assert "leak-hash" not in flat
# Peer entries in the public view never carry a `stats` field.
for p in payload.get("peers", []):
assert "stats" not in p