stage-netd-a network detail: enrich peer stats (signals/severity/vouches/quorum)

This commit is contained in:
m17hr1l
2026-06-07 00:52:41 +02:00
parent e33c5b41f5
commit c6c5d3b2ea

View File

@@ -29,7 +29,7 @@ import httpx
from pydantic import BaseModel, Field
from psyc import db, log
from psyc.lines import federation
from psyc.lines import federation, translog
_log = log.get(__name__)
@@ -48,6 +48,11 @@ class NetworkNode(BaseModel):
`distance` is the topological hop count from self: 0 for self, 1 for
directly-registered peers, 2 for peers-of-peers discovered via the
transitive fetch. `status` is the trust label the UI colors by.
`stats` carries the admin-only per-peer enrichments (24h signal counts,
severity breakdown, vouch tallies, quorum contribution, etc.) and is
populated by `build_admin_view`. It stays empty in the public/local
views so the public JSON never leaks operational state.
"""
fingerprint: str
domain: Optional[str] = None
@@ -55,17 +60,19 @@ class NetworkNode(BaseModel):
status: str # "self" | "trusted" | "vouched" | "unknown" | "blocked"
is_self: bool = False
distance: int = 1
stats: Optional[Dict[str, Any]] = None
class NetworkEdge(BaseModel):
"""One edge on the federation map.
`kind` drives stroke style in the UI: vouch = solid, signal = dashed
flow with thickness ∝ weight, knows = dotted grey transitive hint.
flow with thickness ∝ weight, knows = dotted grey transitive hint,
corroborate = dotted faint accent (two peers share a signal_hash).
"""
source_fingerprint: str
target_fingerprint: str
kind: str # "vouch" | "signal" | "knows"
kind: str # "vouch" | "signal" | "knows" | "corroborate"
weight: float = 1.0
label: str = ""
bidirectional: bool = False
@@ -405,6 +412,246 @@ def build_public_view() -> Dict[str, Any]:
return payload
# ---------- admin-only enrichment helpers -------------------------------
#
# These build the rich per-peer stats the cockpit detail panel renders. They
# read directly from the federation_signals / vouches / translog tables and
# are only ever called from `build_admin_view` — the public view must stay
# slim to avoid leaking operational state to peers.
SEVERITY_LEVELS = ("critical", "high", "medium", "low")
IOC_TYPES = ("url", "domain", "ip", "hash", "cve")
SEVERITY_SCAN_LIMIT = 1000
TRANSLOG_PER_PEER_LIMIT = 10
CORROBORATED_LIMIT = 50
def _relative_time(iso_ts: str, now: datetime) -> str:
"""Compact "3m ago" / "1h ago" / "" for the tooltip + node badge."""
if not iso_ts:
return ""
try:
ts = datetime.fromisoformat(iso_ts)
except ValueError:
return ""
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
delta = now - ts
secs = int(delta.total_seconds())
if secs < 0:
return "just now"
if secs < 60:
return f"{secs}s ago"
if secs < 3600:
return f"{secs // 60}m ago"
if secs < 86400:
return f"{secs // 3600}h ago"
return f"{secs // 86400}d ago"
def _decode_raw_json(raw: Any) -> Optional[Dict[str, Any]]:
"""federation_signals.raw_json is stored as a JSON string; parse defensively."""
if not raw:
return None
if isinstance(raw, dict):
return raw
if not isinstance(raw, str):
return None
try:
v = json.loads(raw)
except Exception:
return None
return v if isinstance(v, dict) else None
def _peer_stats(
peer_fp: str,
now: datetime,
signals_24h_rows: List[Dict[str, Any]],
all_signals_for_peer_count: int,
vouches_in: int,
vouches_out: int,
quorum_contribution: int,
last_seen_iso: str,
recent_translog: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""Aggregate one peer's 24h slice + tallies into the cockpit-facing dict."""
cases_24h = 0
iocs_24h = 0
severity_breakdown: Dict[str, int] = {k: 0 for k in SEVERITY_LEVELS}
ioc_type_breakdown: Dict[str, int] = {k: 0 for k in IOC_TYPES}
# We pulled rows newest-first; cap severity/ioc decoding to keep this fast.
decoded = 0
for row in signals_24h_rows:
st = row.get("signal_type") or ""
if st == "case":
cases_24h += 1
if decoded < SEVERITY_SCAN_LIMIT:
payload = _decode_raw_json(row.get("raw_json"))
if payload:
sev = str(payload.get("severity") or "").lower()
if sev in severity_breakdown:
severity_breakdown[sev] += 1
decoded += 1
elif st == "ioc":
iocs_24h += 1
if decoded < SEVERITY_SCAN_LIMIT:
payload = _decode_raw_json(row.get("raw_json"))
if payload:
t = str(payload.get("type") or "").lower()
if t in ioc_type_breakdown:
ioc_type_breakdown[t] += 1
decoded += 1
return {
"signals_24h": len(signals_24h_rows),
"signals_total": all_signals_for_peer_count,
"cases_24h": cases_24h,
"iocs_24h": iocs_24h,
"severity_breakdown": severity_breakdown,
"ioc_type_breakdown": ioc_type_breakdown,
"vouches_in_count": vouches_in,
"vouches_out_count": vouches_out,
"quorum_contribution": quorum_contribution,
"last_seen": last_seen_iso or None,
"last_seen_relative": _relative_time(last_seen_iso, now),
"recent_translog": recent_translog,
}
def _index_signals_24h(now: datetime) -> Tuple[Dict[str, List[Dict[str, Any]]], List[Dict[str, Any]]]:
"""Bucket the 24h signal buffer by peer_fingerprint and return all rows.
Two return values so the caller can both walk per-peer rows and compute
cross-cutting structures (corroboration pairs, timeline buckets) in one
pass over the buffer.
"""
cutoff = (now - timedelta(hours=SIGNAL_WINDOW_HOURS)).isoformat()
by_peer: Dict[str, List[Dict[str, Any]]] = {}
fresh: List[Dict[str, Any]] = []
for row in db.recent_signals(limit=10_000):
received = str(row.get("received_at") or "")
if received < cutoff:
break
fp = row.get("peer_fingerprint") or ""
if not fp:
continue
by_peer.setdefault(fp, []).append(row)
fresh.append(row)
return by_peer, fresh
def _all_signals_by_peer_count() -> Dict[str, int]:
"""All-time count of federation_signals rows per peer_fingerprint."""
counts: Dict[str, int] = {}
# 50k cap — well above any realistic working set, and bounded so a
# runaway signal flood can't OOM the admin page render.
for row in db.recent_signals(limit=50_000):
fp = row.get("peer_fingerprint") or ""
if not fp:
continue
counts[fp] = counts.get(fp, 0) + 1
return counts
def _recent_translog_for_peer(peer_fp: str, all_entries: List[Any]) -> List[Dict[str, Any]]:
"""Up to TRANSLOG_PER_PEER_LIMIT translog rows that name this peer.
Walks the pre-fetched batch (newest first) so we make one DB roundtrip
for the whole admin view rather than one per peer.
"""
out: List[Dict[str, Any]] = []
for entry in all_entries:
data = entry.entry_data or {}
if not isinstance(data, dict):
continue
if data.get("peer_fingerprint") != peer_fp:
continue
out.append({
"id": entry.id,
"entry_type": entry.entry_type,
"timestamp": entry.timestamp,
"hash": entry.entry_hash,
})
if len(out) >= TRANSLOG_PER_PEER_LIMIT:
break
return out
def _corroborated_signals(
fresh_signals: List[Dict[str, Any]],
peer_fps: set,
) -> List[Dict[str, Any]]:
"""signal_hashes seen from ≥2 distinct known peers in last 24h.
`peer_fps` is the set of peers we render in the graph — corroboration
edges that touch peers outside it have nowhere to anchor visually, so
we drop them.
"""
by_hash: Dict[str, Dict[str, Any]] = {}
for row in fresh_signals:
h = row.get("signal_hash") or ""
if not h:
continue
fp = row.get("peer_fingerprint") or ""
if fp not in peer_fps:
continue
entry = by_hash.setdefault(h, {
"signal_hash": h,
"signal_type": row.get("signal_type") or "",
"signal_id": row.get("signal_id") or "",
"peers": set(),
})
entry["peers"].add(fp)
out: List[Dict[str, Any]] = []
for h, entry in by_hash.items():
if len(entry["peers"]) < 2:
continue
peers_sorted = sorted(entry["peers"])
out.append({
"signal_hash": h,
"signal_type": entry["signal_type"],
"signal_id": entry["signal_id"],
"peer_count": len(peers_sorted),
"peer_fingerprints": peers_sorted,
"quorum_met": federation.is_quorum_met(h),
})
# Higher peer-counts first so the UI shows the strongest corroborations on top.
out.sort(key=lambda r: r["peer_count"], reverse=True)
return out[:CORROBORATED_LIMIT]
def _signal_timeline_24h(
fresh_signals: List[Dict[str, Any]],
now: datetime,
) -> List[Dict[str, Any]]:
"""24 hourly buckets, oldest first. Each bucket: total + per-peer counts.
`hour_offset` runs 0..23 where 0 is "2324 hours ago" and 23 is the
current hour — left-to-right oldest-to-newest matches how operators
read a timeline.
"""
buckets: List[Dict[str, Any]] = [
{"hour_offset": i, "total": 0, "per_peer": {}} for i in range(24)
]
for row in fresh_signals:
try:
ts = datetime.fromisoformat(str(row.get("received_at") or ""))
except ValueError:
continue
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
hours_ago = int((now - ts).total_seconds() // 3600)
if hours_ago < 0 or hours_ago >= 24:
continue
idx = 23 - hours_ago
b = buckets[idx]
b["total"] += 1
fp = row.get("peer_fingerprint") or ""
if fp:
b["per_peer"][fp] = b["per_peer"].get(fp, 0) + 1
return buckets
# ---------- admin-only payload (data endpoint) --------------------------
def build_admin_view(include_transitive: bool = True) -> Dict[str, Any]:
@@ -412,10 +659,115 @@ def build_admin_view(include_transitive: bool = True) -> Dict[str, Any]:
Unlike `build_public_view`, this DOES include unknown + blocked peers
and recent signal hashes — it's only ever served behind admin auth.
Each non-self node gets a `stats` block:
* 24h signal counts (total / cases / iocs)
* severity + ioc-type breakdowns from raw_json
* vouches in/out tallies
* how many of this peer's signal_hashes are quorum-met
* last_seen ISO + relative ("3m ago")
* up to 10 recent translog rows that name them
Top-level `stats` gains:
* `corroborated_signals` — pairs of peers that share a signal_hash
in the last 24h. Drives the corroboration edges below.
* `signal_timeline_24h` — 24 hourly buckets for the bottom-of-page
timeline strip.
And the edge list gains a `kind="corroborate"` for every pair of peers
that share ≥1 signal_hash in the 24h window. Edge weight = number of
shared hashes for that pair.
"""
view = build_transitive_view() if include_transitive else build_local_view()
our_fp = view.nodes[0].fingerprint
now = datetime.now(timezone.utc)
# Pre-fetch the tables we'll query per-peer so the admin render is one
# batch of DB hits, not one-per-node.
signals_by_peer, fresh_signals = _index_signals_24h(now)
all_signal_counts = _all_signals_by_peer_count()
recent_translog_entries = translog.recent(limit=500)
# Vouch tallies per peer (in/out).
vouches_in: Dict[str, int] = {}
vouches_out: Dict[str, int] = {}
for row in db.list_vouches():
target = row.get("target_fingerprint") or ""
voucher = row.get("voucher_fingerprint") or ""
if target:
vouches_in[target] = vouches_in.get(target, 0) + 1
if voucher:
vouches_out[voucher] = vouches_out.get(voucher, 0) + 1
# Per-peer quorum contribution — distinct signal_hashes from this peer
# that are quorum-met. Cached per-hash within this build to dedupe work
# across peers reporting the same hash.
quorum_cache: Dict[str, bool] = {}
def _quorum_for_hash(h: str) -> bool:
if h in quorum_cache:
return quorum_cache[h]
v = federation.is_quorum_met(h)
quorum_cache[h] = v
return v
peer_fps: set = set()
for node in view.nodes:
if node.is_self:
continue
peer_fps.add(node.fingerprint)
peer_rows = signals_by_peer.get(node.fingerprint, [])
last_seen_iso = ""
if peer_rows:
# recent_signals returns newest-first → first row is latest.
last_seen_iso = str(peer_rows[0].get("received_at") or "")
peer_quorum_contrib = 0
seen_hashes: set = set()
for r in peer_rows:
h = r.get("signal_hash") or ""
if not h or h in seen_hashes:
continue
seen_hashes.add(h)
if _quorum_for_hash(h):
peer_quorum_contrib += 1
node.stats = _peer_stats(
peer_fp=node.fingerprint,
now=now,
signals_24h_rows=peer_rows,
all_signals_for_peer_count=all_signal_counts.get(node.fingerprint, 0),
vouches_in=vouches_in.get(node.fingerprint, 0),
vouches_out=vouches_out.get(node.fingerprint, 0),
quorum_contribution=peer_quorum_contrib,
last_seen_iso=last_seen_iso,
recent_translog=_recent_translog_for_peer(node.fingerprint, recent_translog_entries),
)
# Corroboration: pairs of rendered peers that share a signal_hash.
corroborated = _corroborated_signals(fresh_signals, peer_fps)
# Per-pair shared-hash count → corroborate edges.
pair_counts: Dict[Tuple[str, str], int] = {}
for entry in corroborated:
fps = entry["peer_fingerprints"]
for i in range(len(fps)):
for j in range(i + 1, len(fps)):
a, b = fps[i], fps[j]
key = (a, b) if a < b else (b, a)
pair_counts[key] = pair_counts.get(key, 0) + 1
for (a, b), count in pair_counts.items():
view.edges.append(NetworkEdge(
source_fingerprint=a,
target_fingerprint=b,
kind="corroborate",
weight=float(count),
label=f"{count} shared signals",
))
# Top-level stats — keep existing, layer on the new admin extras.
view.stats["corroborated_signals"] = corroborated
view.stats["signal_timeline_24h"] = _signal_timeline_24h(fresh_signals, now)
return {
"self_fingerprint": view.nodes[0].fingerprint,
"self_fingerprint": our_fp,
"nodes": [n.model_dump() for n in view.nodes],
"edges": [e.model_dump() for e in view.edges],
"stats": view.stats,