From c6c5d3b2ea0d08c72247d2b03cce9e3dedfdcee3 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sun, 7 Jun 2026 00:52:41 +0200 Subject: [PATCH] stage-netd-a network detail: enrich peer stats (signals/severity/vouches/quorum) --- src/psyc/lines/network_view.py | 360 ++++++++++++++++++++++++++++++++- 1 file changed, 356 insertions(+), 4 deletions(-) diff --git a/src/psyc/lines/network_view.py b/src/psyc/lines/network_view.py index f33fc14..d15359a 100644 --- a/src/psyc/lines/network_view.py +++ b/src/psyc/lines/network_view.py @@ -29,7 +29,7 @@ import httpx from pydantic import BaseModel, Field from psyc import db, log -from psyc.lines import federation +from psyc.lines import federation, translog _log = log.get(__name__) @@ -48,6 +48,11 @@ class NetworkNode(BaseModel): `distance` is the topological hop count from self: 0 for self, 1 for directly-registered peers, 2 for peers-of-peers discovered via the transitive fetch. `status` is the trust label the UI colors by. + + `stats` carries the admin-only per-peer enrichments (24h signal counts, + severity breakdown, vouch tallies, quorum contribution, etc.) and is + populated by `build_admin_view`. It stays empty in the public/local + views so the public JSON never leaks operational state. """ fingerprint: str domain: Optional[str] = None @@ -55,17 +60,19 @@ class NetworkNode(BaseModel): status: str # "self" | "trusted" | "vouched" | "unknown" | "blocked" is_self: bool = False distance: int = 1 + stats: Optional[Dict[str, Any]] = None class NetworkEdge(BaseModel): """One edge on the federation map. `kind` drives stroke style in the UI: vouch = solid, signal = dashed - flow with thickness ∝ weight, knows = dotted grey transitive hint. + flow with thickness ∝ weight, knows = dotted grey transitive hint, + corroborate = dotted faint accent (two peers share a signal_hash). """ source_fingerprint: str target_fingerprint: str - kind: str # "vouch" | "signal" | "knows" + kind: str # "vouch" | "signal" | "knows" | "corroborate" weight: float = 1.0 label: str = "" bidirectional: bool = False @@ -405,6 +412,246 @@ def build_public_view() -> Dict[str, Any]: return payload +# ---------- admin-only enrichment helpers ------------------------------- +# +# These build the rich per-peer stats the cockpit detail panel renders. They +# read directly from the federation_signals / vouches / translog tables and +# are only ever called from `build_admin_view` — the public view must stay +# slim to avoid leaking operational state to peers. + +SEVERITY_LEVELS = ("critical", "high", "medium", "low") +IOC_TYPES = ("url", "domain", "ip", "hash", "cve") +SEVERITY_SCAN_LIMIT = 1000 +TRANSLOG_PER_PEER_LIMIT = 10 +CORROBORATED_LIMIT = 50 + + +def _relative_time(iso_ts: str, now: datetime) -> str: + """Compact "3m ago" / "1h ago" / "—" for the tooltip + node badge.""" + if not iso_ts: + return "—" + try: + ts = datetime.fromisoformat(iso_ts) + except ValueError: + return "—" + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + delta = now - ts + secs = int(delta.total_seconds()) + if secs < 0: + return "just now" + if secs < 60: + return f"{secs}s ago" + if secs < 3600: + return f"{secs // 60}m ago" + if secs < 86400: + return f"{secs // 3600}h ago" + return f"{secs // 86400}d ago" + + +def _decode_raw_json(raw: Any) -> Optional[Dict[str, Any]]: + """federation_signals.raw_json is stored as a JSON string; parse defensively.""" + if not raw: + return None + if isinstance(raw, dict): + return raw + if not isinstance(raw, str): + return None + try: + v = json.loads(raw) + except Exception: + return None + return v if isinstance(v, dict) else None + + +def _peer_stats( + peer_fp: str, + now: datetime, + signals_24h_rows: List[Dict[str, Any]], + all_signals_for_peer_count: int, + vouches_in: int, + vouches_out: int, + quorum_contribution: int, + last_seen_iso: str, + recent_translog: List[Dict[str, Any]], +) -> Dict[str, Any]: + """Aggregate one peer's 24h slice + tallies into the cockpit-facing dict.""" + cases_24h = 0 + iocs_24h = 0 + severity_breakdown: Dict[str, int] = {k: 0 for k in SEVERITY_LEVELS} + ioc_type_breakdown: Dict[str, int] = {k: 0 for k in IOC_TYPES} + # We pulled rows newest-first; cap severity/ioc decoding to keep this fast. + decoded = 0 + for row in signals_24h_rows: + st = row.get("signal_type") or "" + if st == "case": + cases_24h += 1 + if decoded < SEVERITY_SCAN_LIMIT: + payload = _decode_raw_json(row.get("raw_json")) + if payload: + sev = str(payload.get("severity") or "").lower() + if sev in severity_breakdown: + severity_breakdown[sev] += 1 + decoded += 1 + elif st == "ioc": + iocs_24h += 1 + if decoded < SEVERITY_SCAN_LIMIT: + payload = _decode_raw_json(row.get("raw_json")) + if payload: + t = str(payload.get("type") or "").lower() + if t in ioc_type_breakdown: + ioc_type_breakdown[t] += 1 + decoded += 1 + return { + "signals_24h": len(signals_24h_rows), + "signals_total": all_signals_for_peer_count, + "cases_24h": cases_24h, + "iocs_24h": iocs_24h, + "severity_breakdown": severity_breakdown, + "ioc_type_breakdown": ioc_type_breakdown, + "vouches_in_count": vouches_in, + "vouches_out_count": vouches_out, + "quorum_contribution": quorum_contribution, + "last_seen": last_seen_iso or None, + "last_seen_relative": _relative_time(last_seen_iso, now), + "recent_translog": recent_translog, + } + + +def _index_signals_24h(now: datetime) -> Tuple[Dict[str, List[Dict[str, Any]]], List[Dict[str, Any]]]: + """Bucket the 24h signal buffer by peer_fingerprint and return all rows. + + Two return values so the caller can both walk per-peer rows and compute + cross-cutting structures (corroboration pairs, timeline buckets) in one + pass over the buffer. + """ + cutoff = (now - timedelta(hours=SIGNAL_WINDOW_HOURS)).isoformat() + by_peer: Dict[str, List[Dict[str, Any]]] = {} + fresh: List[Dict[str, Any]] = [] + for row in db.recent_signals(limit=10_000): + received = str(row.get("received_at") or "") + if received < cutoff: + break + fp = row.get("peer_fingerprint") or "" + if not fp: + continue + by_peer.setdefault(fp, []).append(row) + fresh.append(row) + return by_peer, fresh + + +def _all_signals_by_peer_count() -> Dict[str, int]: + """All-time count of federation_signals rows per peer_fingerprint.""" + counts: Dict[str, int] = {} + # 50k cap — well above any realistic working set, and bounded so a + # runaway signal flood can't OOM the admin page render. + for row in db.recent_signals(limit=50_000): + fp = row.get("peer_fingerprint") or "" + if not fp: + continue + counts[fp] = counts.get(fp, 0) + 1 + return counts + + +def _recent_translog_for_peer(peer_fp: str, all_entries: List[Any]) -> List[Dict[str, Any]]: + """Up to TRANSLOG_PER_PEER_LIMIT translog rows that name this peer. + + Walks the pre-fetched batch (newest first) so we make one DB roundtrip + for the whole admin view rather than one per peer. + """ + out: List[Dict[str, Any]] = [] + for entry in all_entries: + data = entry.entry_data or {} + if not isinstance(data, dict): + continue + if data.get("peer_fingerprint") != peer_fp: + continue + out.append({ + "id": entry.id, + "entry_type": entry.entry_type, + "timestamp": entry.timestamp, + "hash": entry.entry_hash, + }) + if len(out) >= TRANSLOG_PER_PEER_LIMIT: + break + return out + + +def _corroborated_signals( + fresh_signals: List[Dict[str, Any]], + peer_fps: set, +) -> List[Dict[str, Any]]: + """signal_hashes seen from ≥2 distinct known peers in last 24h. + + `peer_fps` is the set of peers we render in the graph — corroboration + edges that touch peers outside it have nowhere to anchor visually, so + we drop them. + """ + by_hash: Dict[str, Dict[str, Any]] = {} + for row in fresh_signals: + h = row.get("signal_hash") or "" + if not h: + continue + fp = row.get("peer_fingerprint") or "" + if fp not in peer_fps: + continue + entry = by_hash.setdefault(h, { + "signal_hash": h, + "signal_type": row.get("signal_type") or "", + "signal_id": row.get("signal_id") or "", + "peers": set(), + }) + entry["peers"].add(fp) + out: List[Dict[str, Any]] = [] + for h, entry in by_hash.items(): + if len(entry["peers"]) < 2: + continue + peers_sorted = sorted(entry["peers"]) + out.append({ + "signal_hash": h, + "signal_type": entry["signal_type"], + "signal_id": entry["signal_id"], + "peer_count": len(peers_sorted), + "peer_fingerprints": peers_sorted, + "quorum_met": federation.is_quorum_met(h), + }) + # Higher peer-counts first so the UI shows the strongest corroborations on top. + out.sort(key=lambda r: r["peer_count"], reverse=True) + return out[:CORROBORATED_LIMIT] + + +def _signal_timeline_24h( + fresh_signals: List[Dict[str, Any]], + now: datetime, +) -> List[Dict[str, Any]]: + """24 hourly buckets, oldest first. Each bucket: total + per-peer counts. + + `hour_offset` runs 0..23 where 0 is "23–24 hours ago" and 23 is the + current hour — left-to-right oldest-to-newest matches how operators + read a timeline. + """ + buckets: List[Dict[str, Any]] = [ + {"hour_offset": i, "total": 0, "per_peer": {}} for i in range(24) + ] + for row in fresh_signals: + try: + ts = datetime.fromisoformat(str(row.get("received_at") or "")) + except ValueError: + continue + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + hours_ago = int((now - ts).total_seconds() // 3600) + if hours_ago < 0 or hours_ago >= 24: + continue + idx = 23 - hours_ago + b = buckets[idx] + b["total"] += 1 + fp = row.get("peer_fingerprint") or "" + if fp: + b["per_peer"][fp] = b["per_peer"].get(fp, 0) + 1 + return buckets + + # ---------- admin-only payload (data endpoint) -------------------------- def build_admin_view(include_transitive: bool = True) -> Dict[str, Any]: @@ -412,10 +659,115 @@ def build_admin_view(include_transitive: bool = True) -> Dict[str, Any]: Unlike `build_public_view`, this DOES include unknown + blocked peers and recent signal hashes — it's only ever served behind admin auth. + + Each non-self node gets a `stats` block: + * 24h signal counts (total / cases / iocs) + * severity + ioc-type breakdowns from raw_json + * vouches in/out tallies + * how many of this peer's signal_hashes are quorum-met + * last_seen ISO + relative ("3m ago") + * up to 10 recent translog rows that name them + + Top-level `stats` gains: + * `corroborated_signals` — pairs of peers that share a signal_hash + in the last 24h. Drives the corroboration edges below. + * `signal_timeline_24h` — 24 hourly buckets for the bottom-of-page + timeline strip. + + And the edge list gains a `kind="corroborate"` for every pair of peers + that share ≥1 signal_hash in the 24h window. Edge weight = number of + shared hashes for that pair. """ view = build_transitive_view() if include_transitive else build_local_view() + our_fp = view.nodes[0].fingerprint + now = datetime.now(timezone.utc) + + # Pre-fetch the tables we'll query per-peer so the admin render is one + # batch of DB hits, not one-per-node. + signals_by_peer, fresh_signals = _index_signals_24h(now) + all_signal_counts = _all_signals_by_peer_count() + recent_translog_entries = translog.recent(limit=500) + + # Vouch tallies per peer (in/out). + vouches_in: Dict[str, int] = {} + vouches_out: Dict[str, int] = {} + for row in db.list_vouches(): + target = row.get("target_fingerprint") or "" + voucher = row.get("voucher_fingerprint") or "" + if target: + vouches_in[target] = vouches_in.get(target, 0) + 1 + if voucher: + vouches_out[voucher] = vouches_out.get(voucher, 0) + 1 + + # Per-peer quorum contribution — distinct signal_hashes from this peer + # that are quorum-met. Cached per-hash within this build to dedupe work + # across peers reporting the same hash. + quorum_cache: Dict[str, bool] = {} + + def _quorum_for_hash(h: str) -> bool: + if h in quorum_cache: + return quorum_cache[h] + v = federation.is_quorum_met(h) + quorum_cache[h] = v + return v + + peer_fps: set = set() + for node in view.nodes: + if node.is_self: + continue + peer_fps.add(node.fingerprint) + peer_rows = signals_by_peer.get(node.fingerprint, []) + last_seen_iso = "" + if peer_rows: + # recent_signals returns newest-first → first row is latest. + last_seen_iso = str(peer_rows[0].get("received_at") or "") + peer_quorum_contrib = 0 + seen_hashes: set = set() + for r in peer_rows: + h = r.get("signal_hash") or "" + if not h or h in seen_hashes: + continue + seen_hashes.add(h) + if _quorum_for_hash(h): + peer_quorum_contrib += 1 + node.stats = _peer_stats( + peer_fp=node.fingerprint, + now=now, + signals_24h_rows=peer_rows, + all_signals_for_peer_count=all_signal_counts.get(node.fingerprint, 0), + vouches_in=vouches_in.get(node.fingerprint, 0), + vouches_out=vouches_out.get(node.fingerprint, 0), + quorum_contribution=peer_quorum_contrib, + last_seen_iso=last_seen_iso, + recent_translog=_recent_translog_for_peer(node.fingerprint, recent_translog_entries), + ) + + # Corroboration: pairs of rendered peers that share a signal_hash. + corroborated = _corroborated_signals(fresh_signals, peer_fps) + # Per-pair shared-hash count → corroborate edges. + pair_counts: Dict[Tuple[str, str], int] = {} + for entry in corroborated: + fps = entry["peer_fingerprints"] + for i in range(len(fps)): + for j in range(i + 1, len(fps)): + a, b = fps[i], fps[j] + key = (a, b) if a < b else (b, a) + pair_counts[key] = pair_counts.get(key, 0) + 1 + for (a, b), count in pair_counts.items(): + view.edges.append(NetworkEdge( + source_fingerprint=a, + target_fingerprint=b, + kind="corroborate", + weight=float(count), + label=f"{count} shared signals", + )) + + # Top-level stats — keep existing, layer on the new admin extras. + view.stats["corroborated_signals"] = corroborated + view.stats["signal_timeline_24h"] = _signal_timeline_24h(fresh_signals, now) + return { - "self_fingerprint": view.nodes[0].fingerprint, + "self_fingerprint": our_fp, "nodes": [n.model_dump() for n in view.nodes], "edges": [e.model_dump() for e in view.edges], "stats": view.stats,