stage-net-a network view: data model + local view builder

This commit is contained in:
m17hr1l
2026-06-07 00:36:29 +02:00
parent 77533eccb1
commit fbad78a611

View File

@@ -0,0 +1,423 @@
"""Network view — federation graph: self + peers + edges (vouches + signals).
The federation/identity layer (psyc.lines.federation) gives us node identity
and a peer registry; discovery walks DNS-SD; vouching adds the web-of-trust;
the signal buffer records what peers have told us. This module is the
*visualization primitive* — it stitches those into a node+edge graph the
cockpit renders as a force-directed map of the federation we sit in.
Three views:
* build_local_view() — what we know first-hand: self at center, direct
peers around us, edges = vouches + 24h signals.
* build_transitive_view() — local_view plus peers-of-peers, fetched from
each trusted peer's /federation/network endpoint.
Cached aggressively to avoid spamming peers.
* build_public_view() — JSON-safe payload to publish at /federation/network.
Only TRUSTED peers + our outbound vouches; signed.
"""
from __future__ import annotations
import base64
import json
import threading
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
import httpx
from pydantic import BaseModel, Field
from psyc import db, log
from psyc.lines import federation
_log = log.get(__name__)
SIGNAL_WINDOW_HOURS = 24
TRANSITIVE_CACHE_TTL = 300.0 # 5 minutes
TRANSITIVE_FETCH_TIMEOUT = 4.0
# ---------- data model --------------------------------------------------
class NetworkNode(BaseModel):
"""One vertex on the federation map.
`distance` is the topological hop count from self: 0 for self, 1 for
directly-registered peers, 2 for peers-of-peers discovered via the
transitive fetch. `status` is the trust label the UI colors by.
"""
fingerprint: str
domain: Optional[str] = None
label: str
status: str # "self" | "trusted" | "vouched" | "unknown" | "blocked"
is_self: bool = False
distance: int = 1
class NetworkEdge(BaseModel):
"""One edge on the federation map.
`kind` drives stroke style in the UI: vouch = solid, signal = dashed
flow with thickness ∝ weight, knows = dotted grey transitive hint.
"""
source_fingerprint: str
target_fingerprint: str
kind: str # "vouch" | "signal" | "knows"
weight: float = 1.0
label: str = ""
bidirectional: bool = False
class NetworkView(BaseModel):
"""Renderable snapshot — nodes + edges + headline stats."""
nodes: List[NetworkNode] = Field(default_factory=list)
edges: List[NetworkEdge] = Field(default_factory=list)
generated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
stats: Dict[str, Any] = Field(default_factory=dict)
# ---------- internal helpers --------------------------------------------
def _short_fp(fp: str) -> str:
if len(fp) >= 16:
return f"{fp[:8]}{fp[-8:]}"
return fp
def _peer_status_label(peer: federation.Peer) -> str:
"""Map peers.status + vouch-eligibility into the UI status taxonomy."""
if peer.status == "trusted":
return "trusted"
if peer.status == "blocked":
return "blocked"
# unknown — but if a quorum of trusted peers have vouched, surface as "vouched"
if federation.peer_is_listening_eligible(peer.fingerprint):
return "vouched"
return "unknown"
def _signal_counts_24h() -> Tuple[Dict[str, int], int, int]:
"""Per-peer signal count over the last `SIGNAL_WINDOW_HOURS`, plus totals.
Returns (counts_by_fingerprint, total_buffered, distinct_hashes).
"""
cutoff = (datetime.now(timezone.utc) - timedelta(hours=SIGNAL_WINDOW_HOURS)).isoformat()
counts: Dict[str, int] = {}
distinct_hashes: set = set()
total = 0
# recent_signals gives newest first; one call is enough for a 24h window
# under any realistic volume — the buffer is small.
for row in db.recent_signals(limit=10_000):
received = str(row.get("received_at") or "")
if received < cutoff:
break
fp = row.get("peer_fingerprint") or ""
if not fp:
continue
counts[fp] = counts.get(fp, 0) + 1
distinct_hashes.add(row.get("signal_hash") or "")
total += 1
return counts, total, len(distinct_hashes)
# ---------- local view --------------------------------------------------
def build_local_view() -> NetworkView:
"""Everything we know first-hand — self + direct peers + their edges to us.
Self sits at distance 0. Each row from `db.list_peers()` becomes a
distance-1 node, with `status` derived from `peers.status` (falling
back to "vouched" if a vouch quorum names them) or "blocked".
Edges:
* self -> peer vouch — for every peer we've issued a vouch for
* peer -> self vouch — for every received vouch naming our fp
* self -> peer signal — one per peer that has sent us ≥1 signal in 24h,
weight = count, label = "N signals"
Stats line up with what the admin page header shows.
"""
our_fp = federation.node_fingerprint()
peers = federation.list_peers()
nodes: List[NetworkNode] = [
NetworkNode(
fingerprint=our_fp,
label="self",
status="self",
is_self=True,
distance=0,
)
]
peer_by_fp: Dict[str, federation.Peer] = {}
for p in peers:
if p.fingerprint == our_fp:
# Defensive — should never happen, but don't double-render self.
continue
peer_by_fp[p.fingerprint] = p
nodes.append(NetworkNode(
fingerprint=p.fingerprint,
domain=p.domain,
label=p.domain or _short_fp(p.fingerprint),
status=_peer_status_label(p),
is_self=False,
distance=1,
))
edges: List[NetworkEdge] = []
# Vouches WE have issued — self -> target.
out_vouch_targets: set = set()
for v in federation.our_vouches():
# Only render an edge if the target is in our peer registry
# OR is the federation broadcast target — but for the local map we
# only show edges to nodes we render, so skip strangers.
if v.target_fingerprint not in peer_by_fp:
continue
out_vouch_targets.add(v.target_fingerprint)
edges.append(NetworkEdge(
source_fingerprint=our_fp,
target_fingerprint=v.target_fingerprint,
kind="vouch",
weight=1.0,
label="vouched",
))
# Vouches received — non-self vouches where target == our fp.
for v in federation.vouches_for(our_fp):
if v.voucher_fingerprint == our_fp:
continue
if v.voucher_fingerprint not in peer_by_fp:
# Voucher not in our peer table → no node to attach to. Skip;
# the local map is honest about what's renderable.
continue
# Mark bidirectional on the existing outbound vouch when applicable so
# the UI can collapse the two lines.
existing = next(
(e for e in edges
if e.kind == "vouch"
and e.source_fingerprint == our_fp
and e.target_fingerprint == v.voucher_fingerprint),
None,
)
if existing is not None:
existing.bidirectional = True
existing.label = "vouched ↔"
continue
edges.append(NetworkEdge(
source_fingerprint=v.voucher_fingerprint,
target_fingerprint=our_fp,
kind="vouch",
weight=1.0,
label="vouches us",
))
# Signal edges — one per peer that has reported anything in 24h.
sig_counts, total_signals, distinct_hashes = _signal_counts_24h()
for fp, count in sig_counts.items():
if fp not in peer_by_fp:
# Signals only land from listening-eligible peers, but if the
# operator just demoted a peer the buffer can still contain
# rows referencing a now-deleted peer. Skip cleanly.
continue
edges.append(NetworkEdge(
source_fingerprint=fp,
target_fingerprint=our_fp,
kind="signal",
weight=float(count),
label=f"{count} signals/24h",
))
# Quorum-met count over the signals we've seen in the window.
quorum_met = 0
seen_hashes: set = set()
for row in db.recent_signals(limit=10_000):
h = row.get("signal_hash") or ""
if not h or h in seen_hashes:
continue
seen_hashes.add(h)
if federation.is_quorum_met(h):
quorum_met += 1
vouched_peers = sum(
1 for n in nodes
if not n.is_self and n.status in ("trusted", "vouched")
)
stats: Dict[str, Any] = {
"total_peers": len(peer_by_fp),
"vouched_peers": vouched_peers,
"signals_buffered_24h": total_signals,
"distinct_signal_hashes_24h": distinct_hashes,
"quorum_met_count": quorum_met,
"vouches_issued": len(out_vouch_targets),
}
return NetworkView(nodes=nodes, edges=edges, stats=stats)
# ---------- transitive view ---------------------------------------------
_TRANSITIVE_CACHE: Dict[str, Any] = {"ts": 0.0, "view": None}
_TRANSITIVE_LOCK = threading.Lock()
def _fetch_peer_network(domain: str, timeout: float = TRANSITIVE_FETCH_TIMEOUT) -> Optional[Dict[str, Any]]:
"""GET /federation/network on a peer. Returns dict on success, None otherwise.
Failure modes are logged at info — one slow/broken peer must NEVER abort
the transitive walk.
"""
if not domain:
return None
url = f"https://{domain}/federation/network"
try:
with httpx.Client(timeout=timeout) as client:
r = client.get(url)
r.raise_for_status()
data = r.json()
except Exception as exc: # noqa: BLE001 — every failure mode is "skip this peer"
_log.info("network_view.transitive.skip", domain=domain, reason=str(exc)[:120])
return None
if not isinstance(data, dict):
_log.info("network_view.transitive.bad_shape", domain=domain, kind=type(data).__name__)
return None
return data
def build_transitive_view(force_refresh: bool = False) -> NetworkView:
"""Local view + distance-2 nodes pulled from each trusted peer.
Cached for `TRANSITIVE_CACHE_TTL` seconds — peer fan-out is expensive.
Cache is opt-out via `force_refresh` (the admin "re-fetch" button).
"""
now = time.time()
if not force_refresh:
with _TRANSITIVE_LOCK:
cached = _TRANSITIVE_CACHE.get("view")
cached_ts = float(_TRANSITIVE_CACHE.get("ts") or 0.0)
if cached is not None and (now - cached_ts) < TRANSITIVE_CACHE_TTL:
return cached
view = build_local_view()
our_fp = view.nodes[0].fingerprint
existing_fps: set = {n.fingerprint for n in view.nodes}
# Index direct peers by domain so we can attribute distance=2 to a parent.
direct_by_fp: Dict[str, NetworkNode] = {
n.fingerprint: n for n in view.nodes if not n.is_self
}
direct_trusted = [n for n in direct_by_fp.values() if n.status == "trusted" and n.domain]
transitive_count = 0
for parent in direct_trusted:
data = _fetch_peer_network(parent.domain or "")
if data is None:
continue
# The peer's payload mirrors build_public_view's shape:
# {fingerprint, peers: [{fingerprint, domain}], vouches: [{voucher, target}]}.
their_peers = data.get("peers") or []
for pp in their_peers:
if not isinstance(pp, dict):
continue
fp = str(pp.get("fingerprint") or "")
if not fp or fp == our_fp:
continue
if fp in existing_fps:
# Already a direct peer (or our self) — but still draw the
# "knows" edge from parent to it to express the topology.
view.edges.append(NetworkEdge(
source_fingerprint=parent.fingerprint,
target_fingerprint=fp,
kind="knows",
weight=0.5,
label="knows",
))
continue
domain = str(pp.get("domain") or "") or None
view.nodes.append(NetworkNode(
fingerprint=fp,
domain=domain,
label=domain or _short_fp(fp),
status="unknown",
is_self=False,
distance=2,
))
existing_fps.add(fp)
transitive_count += 1
view.edges.append(NetworkEdge(
source_fingerprint=parent.fingerprint,
target_fingerprint=fp,
kind="knows",
weight=0.5,
label="knows",
))
view.stats["transitive_nodes"] = transitive_count
with _TRANSITIVE_LOCK:
_TRANSITIVE_CACHE["view"] = view
_TRANSITIVE_CACHE["ts"] = now
return view
# ---------- public payload ----------------------------------------------
def build_public_view() -> Dict[str, Any]:
"""JSON-safe public payload returned from GET /federation/network.
Only TRUSTED peers leak — never unknown or blocked. Vouches we've
issued ride along so peers can reconstruct the WoT shape. The whole
payload is signed with our Ed25519 key (canonical-JSON, then base64).
Signal hashes are deliberately omitted — those are internal state and
would leak who's reporting what to whom.
"""
our_fp = federation.node_fingerprint()
trusted_peers: List[Dict[str, Any]] = []
for p in federation.list_peers():
if p.status != "trusted":
continue
trusted_peers.append({
"domain": p.domain,
"fingerprint": p.fingerprint,
"first_seen": p.discovered_at,
})
vouches: List[Dict[str, Any]] = []
for v in federation.our_vouches():
vouches.append({
"voucher_fingerprint": v.voucher_fingerprint,
"target_fingerprint": v.target_fingerprint,
"issued_at": v.issued_at.isoformat(),
"expires_at": v.expires_at.isoformat() if v.expires_at else None,
})
payload: Dict[str, Any] = {
"version": federation.FEED_VERSION,
"fingerprint": our_fp,
"generated_at": datetime.now(timezone.utc).isoformat(),
"peers": trusted_peers,
"vouches": vouches,
}
sig = federation.sign_payload(federation.canonical_json(payload))
payload["signature"] = base64.b64encode(sig).decode("ascii")
return payload
# ---------- admin-only payload (data endpoint) --------------------------
def build_admin_view(include_transitive: bool = True) -> Dict[str, Any]:
"""Full view for the admin data endpoint — JSON-serializable.
Unlike `build_public_view`, this DOES include unknown + blocked peers
and recent signal hashes — it's only ever served behind admin auth.
"""
view = build_transitive_view() if include_transitive else build_local_view()
return {
"self_fingerprint": view.nodes[0].fingerprint,
"nodes": [n.model_dump() for n in view.nodes],
"edges": [e.model_dump() for e in view.edges],
"stats": view.stats,
"generated_at": view.generated_at.isoformat(),
}