diff --git a/src/psyc/lines/network_view.py b/src/psyc/lines/network_view.py new file mode 100644 index 0000000..f33fc14 --- /dev/null +++ b/src/psyc/lines/network_view.py @@ -0,0 +1,423 @@ +"""Network view — federation graph: self + peers + edges (vouches + signals). + +The federation/identity layer (psyc.lines.federation) gives us node identity +and a peer registry; discovery walks DNS-SD; vouching adds the web-of-trust; +the signal buffer records what peers have told us. This module is the +*visualization primitive* — it stitches those into a node+edge graph the +cockpit renders as a force-directed map of the federation we sit in. + +Three views: + * build_local_view() — what we know first-hand: self at center, direct + peers around us, edges = vouches + 24h signals. + * build_transitive_view() — local_view plus peers-of-peers, fetched from + each trusted peer's /federation/network endpoint. + Cached aggressively to avoid spamming peers. + * build_public_view() — JSON-safe payload to publish at /federation/network. + Only TRUSTED peers + our outbound vouches; signed. +""" + +from __future__ import annotations + +import base64 +import json +import threading +import time +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional, Tuple + +import httpx +from pydantic import BaseModel, Field + +from psyc import db, log +from psyc.lines import federation + + +_log = log.get(__name__) + + +SIGNAL_WINDOW_HOURS = 24 +TRANSITIVE_CACHE_TTL = 300.0 # 5 minutes +TRANSITIVE_FETCH_TIMEOUT = 4.0 + + +# ---------- data model -------------------------------------------------- + +class NetworkNode(BaseModel): + """One vertex on the federation map. + + `distance` is the topological hop count from self: 0 for self, 1 for + directly-registered peers, 2 for peers-of-peers discovered via the + transitive fetch. `status` is the trust label the UI colors by. + """ + fingerprint: str + domain: Optional[str] = None + label: str + status: str # "self" | "trusted" | "vouched" | "unknown" | "blocked" + is_self: bool = False + distance: int = 1 + + +class NetworkEdge(BaseModel): + """One edge on the federation map. + + `kind` drives stroke style in the UI: vouch = solid, signal = dashed + flow with thickness ∝ weight, knows = dotted grey transitive hint. + """ + source_fingerprint: str + target_fingerprint: str + kind: str # "vouch" | "signal" | "knows" + weight: float = 1.0 + label: str = "" + bidirectional: bool = False + + +class NetworkView(BaseModel): + """Renderable snapshot — nodes + edges + headline stats.""" + nodes: List[NetworkNode] = Field(default_factory=list) + edges: List[NetworkEdge] = Field(default_factory=list) + generated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + stats: Dict[str, Any] = Field(default_factory=dict) + + +# ---------- internal helpers -------------------------------------------- + +def _short_fp(fp: str) -> str: + if len(fp) >= 16: + return f"{fp[:8]}…{fp[-8:]}" + return fp + + +def _peer_status_label(peer: federation.Peer) -> str: + """Map peers.status + vouch-eligibility into the UI status taxonomy.""" + if peer.status == "trusted": + return "trusted" + if peer.status == "blocked": + return "blocked" + # unknown — but if a quorum of trusted peers have vouched, surface as "vouched" + if federation.peer_is_listening_eligible(peer.fingerprint): + return "vouched" + return "unknown" + + +def _signal_counts_24h() -> Tuple[Dict[str, int], int, int]: + """Per-peer signal count over the last `SIGNAL_WINDOW_HOURS`, plus totals. + + Returns (counts_by_fingerprint, total_buffered, distinct_hashes). + """ + cutoff = (datetime.now(timezone.utc) - timedelta(hours=SIGNAL_WINDOW_HOURS)).isoformat() + counts: Dict[str, int] = {} + distinct_hashes: set = set() + total = 0 + # recent_signals gives newest first; one call is enough for a 24h window + # under any realistic volume — the buffer is small. + for row in db.recent_signals(limit=10_000): + received = str(row.get("received_at") or "") + if received < cutoff: + break + fp = row.get("peer_fingerprint") or "" + if not fp: + continue + counts[fp] = counts.get(fp, 0) + 1 + distinct_hashes.add(row.get("signal_hash") or "") + total += 1 + return counts, total, len(distinct_hashes) + + +# ---------- local view -------------------------------------------------- + +def build_local_view() -> NetworkView: + """Everything we know first-hand — self + direct peers + their edges to us. + + Self sits at distance 0. Each row from `db.list_peers()` becomes a + distance-1 node, with `status` derived from `peers.status` (falling + back to "vouched" if a vouch quorum names them) or "blocked". + + Edges: + * self -> peer vouch — for every peer we've issued a vouch for + * peer -> self vouch — for every received vouch naming our fp + * self -> peer signal — one per peer that has sent us ≥1 signal in 24h, + weight = count, label = "N signals" + + Stats line up with what the admin page header shows. + """ + our_fp = federation.node_fingerprint() + peers = federation.list_peers() + nodes: List[NetworkNode] = [ + NetworkNode( + fingerprint=our_fp, + label="self", + status="self", + is_self=True, + distance=0, + ) + ] + peer_by_fp: Dict[str, federation.Peer] = {} + for p in peers: + if p.fingerprint == our_fp: + # Defensive — should never happen, but don't double-render self. + continue + peer_by_fp[p.fingerprint] = p + nodes.append(NetworkNode( + fingerprint=p.fingerprint, + domain=p.domain, + label=p.domain or _short_fp(p.fingerprint), + status=_peer_status_label(p), + is_self=False, + distance=1, + )) + + edges: List[NetworkEdge] = [] + + # Vouches WE have issued — self -> target. + out_vouch_targets: set = set() + for v in federation.our_vouches(): + # Only render an edge if the target is in our peer registry + # OR is the federation broadcast target — but for the local map we + # only show edges to nodes we render, so skip strangers. + if v.target_fingerprint not in peer_by_fp: + continue + out_vouch_targets.add(v.target_fingerprint) + edges.append(NetworkEdge( + source_fingerprint=our_fp, + target_fingerprint=v.target_fingerprint, + kind="vouch", + weight=1.0, + label="vouched", + )) + + # Vouches received — non-self vouches where target == our fp. + for v in federation.vouches_for(our_fp): + if v.voucher_fingerprint == our_fp: + continue + if v.voucher_fingerprint not in peer_by_fp: + # Voucher not in our peer table → no node to attach to. Skip; + # the local map is honest about what's renderable. + continue + # Mark bidirectional on the existing outbound vouch when applicable so + # the UI can collapse the two lines. + existing = next( + (e for e in edges + if e.kind == "vouch" + and e.source_fingerprint == our_fp + and e.target_fingerprint == v.voucher_fingerprint), + None, + ) + if existing is not None: + existing.bidirectional = True + existing.label = "vouched ↔" + continue + edges.append(NetworkEdge( + source_fingerprint=v.voucher_fingerprint, + target_fingerprint=our_fp, + kind="vouch", + weight=1.0, + label="vouches us", + )) + + # Signal edges — one per peer that has reported anything in 24h. + sig_counts, total_signals, distinct_hashes = _signal_counts_24h() + for fp, count in sig_counts.items(): + if fp not in peer_by_fp: + # Signals only land from listening-eligible peers, but if the + # operator just demoted a peer the buffer can still contain + # rows referencing a now-deleted peer. Skip cleanly. + continue + edges.append(NetworkEdge( + source_fingerprint=fp, + target_fingerprint=our_fp, + kind="signal", + weight=float(count), + label=f"{count} signals/24h", + )) + + # Quorum-met count over the signals we've seen in the window. + quorum_met = 0 + seen_hashes: set = set() + for row in db.recent_signals(limit=10_000): + h = row.get("signal_hash") or "" + if not h or h in seen_hashes: + continue + seen_hashes.add(h) + if federation.is_quorum_met(h): + quorum_met += 1 + + vouched_peers = sum( + 1 for n in nodes + if not n.is_self and n.status in ("trusted", "vouched") + ) + stats: Dict[str, Any] = { + "total_peers": len(peer_by_fp), + "vouched_peers": vouched_peers, + "signals_buffered_24h": total_signals, + "distinct_signal_hashes_24h": distinct_hashes, + "quorum_met_count": quorum_met, + "vouches_issued": len(out_vouch_targets), + } + + return NetworkView(nodes=nodes, edges=edges, stats=stats) + + +# ---------- transitive view --------------------------------------------- + +_TRANSITIVE_CACHE: Dict[str, Any] = {"ts": 0.0, "view": None} +_TRANSITIVE_LOCK = threading.Lock() + + +def _fetch_peer_network(domain: str, timeout: float = TRANSITIVE_FETCH_TIMEOUT) -> Optional[Dict[str, Any]]: + """GET /federation/network on a peer. Returns dict on success, None otherwise. + + Failure modes are logged at info — one slow/broken peer must NEVER abort + the transitive walk. + """ + if not domain: + return None + url = f"https://{domain}/federation/network" + try: + with httpx.Client(timeout=timeout) as client: + r = client.get(url) + r.raise_for_status() + data = r.json() + except Exception as exc: # noqa: BLE001 — every failure mode is "skip this peer" + _log.info("network_view.transitive.skip", domain=domain, reason=str(exc)[:120]) + return None + if not isinstance(data, dict): + _log.info("network_view.transitive.bad_shape", domain=domain, kind=type(data).__name__) + return None + return data + + +def build_transitive_view(force_refresh: bool = False) -> NetworkView: + """Local view + distance-2 nodes pulled from each trusted peer. + + Cached for `TRANSITIVE_CACHE_TTL` seconds — peer fan-out is expensive. + Cache is opt-out via `force_refresh` (the admin "re-fetch" button). + """ + now = time.time() + if not force_refresh: + with _TRANSITIVE_LOCK: + cached = _TRANSITIVE_CACHE.get("view") + cached_ts = float(_TRANSITIVE_CACHE.get("ts") or 0.0) + if cached is not None and (now - cached_ts) < TRANSITIVE_CACHE_TTL: + return cached + + view = build_local_view() + our_fp = view.nodes[0].fingerprint + existing_fps: set = {n.fingerprint for n in view.nodes} + + # Index direct peers by domain so we can attribute distance=2 to a parent. + direct_by_fp: Dict[str, NetworkNode] = { + n.fingerprint: n for n in view.nodes if not n.is_self + } + direct_trusted = [n for n in direct_by_fp.values() if n.status == "trusted" and n.domain] + + transitive_count = 0 + for parent in direct_trusted: + data = _fetch_peer_network(parent.domain or "") + if data is None: + continue + # The peer's payload mirrors build_public_view's shape: + # {fingerprint, peers: [{fingerprint, domain}], vouches: [{voucher, target}]}. + their_peers = data.get("peers") or [] + for pp in their_peers: + if not isinstance(pp, dict): + continue + fp = str(pp.get("fingerprint") or "") + if not fp or fp == our_fp: + continue + if fp in existing_fps: + # Already a direct peer (or our self) — but still draw the + # "knows" edge from parent to it to express the topology. + view.edges.append(NetworkEdge( + source_fingerprint=parent.fingerprint, + target_fingerprint=fp, + kind="knows", + weight=0.5, + label="knows", + )) + continue + domain = str(pp.get("domain") or "") or None + view.nodes.append(NetworkNode( + fingerprint=fp, + domain=domain, + label=domain or _short_fp(fp), + status="unknown", + is_self=False, + distance=2, + )) + existing_fps.add(fp) + transitive_count += 1 + view.edges.append(NetworkEdge( + source_fingerprint=parent.fingerprint, + target_fingerprint=fp, + kind="knows", + weight=0.5, + label="knows", + )) + + view.stats["transitive_nodes"] = transitive_count + + with _TRANSITIVE_LOCK: + _TRANSITIVE_CACHE["view"] = view + _TRANSITIVE_CACHE["ts"] = now + return view + + +# ---------- public payload ---------------------------------------------- + +def build_public_view() -> Dict[str, Any]: + """JSON-safe public payload returned from GET /federation/network. + + Only TRUSTED peers leak — never unknown or blocked. Vouches we've + issued ride along so peers can reconstruct the WoT shape. The whole + payload is signed with our Ed25519 key (canonical-JSON, then base64). + + Signal hashes are deliberately omitted — those are internal state and + would leak who's reporting what to whom. + """ + our_fp = federation.node_fingerprint() + trusted_peers: List[Dict[str, Any]] = [] + for p in federation.list_peers(): + if p.status != "trusted": + continue + trusted_peers.append({ + "domain": p.domain, + "fingerprint": p.fingerprint, + "first_seen": p.discovered_at, + }) + vouches: List[Dict[str, Any]] = [] + for v in federation.our_vouches(): + vouches.append({ + "voucher_fingerprint": v.voucher_fingerprint, + "target_fingerprint": v.target_fingerprint, + "issued_at": v.issued_at.isoformat(), + "expires_at": v.expires_at.isoformat() if v.expires_at else None, + }) + + payload: Dict[str, Any] = { + "version": federation.FEED_VERSION, + "fingerprint": our_fp, + "generated_at": datetime.now(timezone.utc).isoformat(), + "peers": trusted_peers, + "vouches": vouches, + } + sig = federation.sign_payload(federation.canonical_json(payload)) + payload["signature"] = base64.b64encode(sig).decode("ascii") + return payload + + +# ---------- admin-only payload (data endpoint) -------------------------- + +def build_admin_view(include_transitive: bool = True) -> Dict[str, Any]: + """Full view for the admin data endpoint — JSON-serializable. + + Unlike `build_public_view`, this DOES include unknown + blocked peers + and recent signal hashes — it's only ever served behind admin auth. + """ + view = build_transitive_view() if include_transitive else build_local_view() + return { + "self_fingerprint": view.nodes[0].fingerprint, + "nodes": [n.model_dump() for n in view.nodes], + "edges": [e.model_dump() for e in view.edges], + "stats": view.stats, + "generated_at": view.generated_at.isoformat(), + }