diff --git a/src/psyc/lines/network_view.py b/src/psyc/lines/network_view.py index d15359a..d8ba344 100644 --- a/src/psyc/lines/network_view.py +++ b/src/psyc/lines/network_view.py @@ -38,6 +38,7 @@ _log = log.get(__name__) SIGNAL_WINDOW_HOURS = 24 TRANSITIVE_CACHE_TTL = 300.0 # 5 minutes TRANSITIVE_FETCH_TIMEOUT = 4.0 +EXPLORE_FETCH_TIMEOUT = 4.0 # ---------- data model -------------------------------------------------- @@ -773,3 +774,256 @@ def build_admin_view(include_transitive: bool = True) -> Dict[str, Any]: "stats": view.stats, "generated_at": view.generated_at.isoformat(), } + + +# ---------- public explore payload -------------------------------------- +# +# "Transparent security" view: the same shape a peer would see at +# /federation/network, plus per-peer counts (NEVER values), inbound vouches, +# and a thin distance-2 snapshot — enough for a public visitor to draw the +# mesh and walk to any peer's own explore page. Everything is signed. + +EXPLORE_TRANSITIVE_CAP = 50 # cap on distinct distance-2 fps to keep payload bounded + + +def _explore_peer_stats( + peer_fp: str, + now: datetime, + signals_by_peer: Dict[str, List[Dict[str, Any]]], + all_signal_counts: Dict[str, int], + quorum_cache: Dict[str, bool], +) -> Dict[str, Any]: + """Per-peer COUNTS only — no IOC values, no case summaries, no raw_json. + + Counts split by signal_type (cases vs iocs) are safe to expose since + the magnitude of "how chatty is this peer" is already implicit in the + 24h signal count. We deliberately omit severity + ioc_type breakdowns + here — those could hint at the target sector. + """ + rows = signals_by_peer.get(peer_fp, []) + cases_24h = 0 + iocs_24h = 0 + last_seen_iso = "" + seen_hashes: set = set() + quorum_contribution_24h = 0 + for row in rows: + st = row.get("signal_type") or "" + if st == "case": + cases_24h += 1 + elif st == "ioc": + iocs_24h += 1 + h = row.get("signal_hash") or "" + if h and h not in seen_hashes: + seen_hashes.add(h) + if h not in quorum_cache: + quorum_cache[h] = federation.is_quorum_met(h) + if quorum_cache[h]: + quorum_contribution_24h += 1 + if rows: + # recent_signals returns newest-first → first row is latest. + last_seen_iso = str(rows[0].get("received_at") or "") + return { + "signal_count_24h": len(rows), + "signal_count_total": all_signal_counts.get(peer_fp, 0), + "cases_24h": cases_24h, + "iocs_24h": iocs_24h, + "quorum_contribution_24h": quorum_contribution_24h, + "last_seen": last_seen_iso or None, + } + + +def _fetch_peer_explore(domain: str, timeout: float = EXPLORE_FETCH_TIMEOUT) -> Optional[Dict[str, Any]]: + """GET /federation/explore/data on a peer. Returns dict on success. + + Mirrors `_fetch_peer_network`'s failure semantics: one slow/broken peer + must never abort the explore walk. + """ + if not domain: + return None + url = f"https://{domain}/federation/explore/data" + try: + with httpx.Client(timeout=timeout) as client: + r = client.get(url) + r.raise_for_status() + data = r.json() + except Exception as exc: # noqa: BLE001 + _log.info("network_view.explore.transitive.skip", domain=domain, reason=str(exc)[:120]) + return None + if not isinstance(data, dict): + return None + return data + + +def _explore_transitive_peers( + trusted_peers: List[Tuple[str, Optional[str]]], + own_fp: str, + own_peer_fps: set, +) -> List[Dict[str, Any]]: + """Distance-2 fps learned from trusted peers' explore/data feeds. + + Returns [{fingerprint, via_peer_fingerprint, domain}] entries. Capped at + `EXPLORE_TRANSITIVE_CAP` to keep the public payload bounded — first peer + to introduce a fingerprint wins so the via attribution stays stable. + """ + seen: set = set(own_peer_fps) + seen.add(own_fp) + out: List[Dict[str, Any]] = [] + for parent_fp, parent_domain in trusted_peers: + if not parent_domain or len(out) >= EXPLORE_TRANSITIVE_CAP: + continue + data = _fetch_peer_explore(parent_domain) + if not data: + # Fall back to the older /federation/network endpoint — older + # psyc nodes won't have /federation/explore/data yet. + data = _fetch_peer_network(parent_domain) + if not data: + continue + their_peers = data.get("peers") or [] + for pp in their_peers: + if not isinstance(pp, dict): + continue + fp = str(pp.get("fingerprint") or "") + if not fp or fp in seen: + continue + seen.add(fp) + out.append({ + "fingerprint": fp, + "domain": pp.get("domain") or None, + "via_peer_fingerprint": parent_fp, + }) + if len(out) >= EXPLORE_TRANSITIVE_CAP: + break + return out + + +def build_explore_view(node_domain: Optional[str] = None) -> Dict[str, Any]: + """Signed public explorer payload for /federation/explore/data. + + Extends `build_public_view` with: + * `node` — headline stats about THIS node (counts only) + * `peers[].*_count_24h` — per-peer chatter levels (no values leak) + * `vouches_in` — who has vouched for us (we only include vouchers + whose peer we currently trust, so signatures don't + leak unknown identities) + * `transitive_peers` — distance-2 fingerprints learned from each + trusted peer's public explore/network feed. + Cached aggressively (mirrors transitive cache). + * `corroboration_count_24h` — # distinct signal_hashes seen from ≥2 + peers in the 24h window. + + The whole payload (sans signature) is Ed25519-signed over canonical JSON. + No IOC values, case_ids, raw_json, severity or ioc-type breakdowns are + included — anything that could leak the target sector or who reported + what stays inside `build_admin_view`. + """ + our_fp = federation.node_fingerprint() + now = datetime.now(timezone.utc) + + # Reuse the 24h signal bucket scan + all-time count + quorum cache. + signals_by_peer, fresh_signals = _index_signals_24h(now) + all_signal_counts = _all_signals_by_peer_count() + quorum_cache: Dict[str, bool] = {} + + # Build the trusted-peer rows (the only ones we expose), with public-safe + # stats. Unknown + blocked never leak — see `build_public_view`. + peer_rows: List[Dict[str, Any]] = [] + trusted_peers_for_walk: List[Tuple[str, Optional[str]]] = [] + trusted_fps: set = set() + for p in federation.list_peers(): + if p.status != "trusted": + continue + trusted_fps.add(p.fingerprint) + trusted_peers_for_walk.append((p.fingerprint, p.domain)) + stats = _explore_peer_stats( + peer_fp=p.fingerprint, + now=now, + signals_by_peer=signals_by_peer, + all_signal_counts=all_signal_counts, + quorum_cache=quorum_cache, + ) + peer_rows.append({ + "domain": p.domain, + "fingerprint": p.fingerprint, + "first_seen": p.discovered_at, + **stats, + }) + + # Vouches WE've issued — same shape as build_public_view + signature. + vouches_out: List[Dict[str, Any]] = [] + for v in federation.our_vouches(): + vouches_out.append({ + "voucher_fingerprint": v.voucher_fingerprint, + "target_fingerprint": v.target_fingerprint, + "issued_at": v.issued_at.isoformat(), + "expires_at": v.expires_at.isoformat() if v.expires_at else None, + "signature": v.signature, + }) + + # Vouches IN — only those naming us as target where we trust the voucher. + # We don't surface vouches from unknown identities: doing so would let any + # stranger forge an inbound vouch and show up here. + vouches_in: List[Dict[str, Any]] = [] + for row in db.list_vouches(): + if (row.get("target_fingerprint") or "") != our_fp: + continue + voucher_fp = row.get("voucher_fingerprint") or "" + if voucher_fp == our_fp: + continue + if voucher_fp not in trusted_fps: + continue + vouches_in.append({ + "voucher_fingerprint": voucher_fp, + "target_fingerprint": our_fp, + "issued_at": row.get("issued_at") or "", + "expires_at": row.get("expires_at") or None, + "signature": row.get("signature") or "", + }) + + # Transitive snapshot. The aim is "one fetch surfaces N hops" — distance-2 + # fingerprints learned from each trusted peer's own explore/network feed. + transitive_peers = _explore_transitive_peers( + trusted_peers_for_walk, our_fp, trusted_fps, + ) + + # Corroboration: # distinct hashes seen from ≥2 distinct peers in 24h. + by_hash: Dict[str, set] = {} + for row in fresh_signals: + h = row.get("signal_hash") or "" + if not h: + continue + by_hash.setdefault(h, set()).add(row.get("peer_fingerprint") or "") + corroboration_count_24h = sum(1 for fps in by_hash.values() if len(fps) >= 2) + + # Transparency log headline numbers — chain head + length, never bodies. + head_entry = translog.head() + translog_head_hash = head_entry.entry_hash if head_entry else None + translog_entry_count = int(head_entry.id) if head_entry else 0 + + node_block: Dict[str, Any] = { + "fingerprint": our_fp, + "domain": node_domain, + "generated_at": now.isoformat(), + "transparency_log_head_hash": translog_head_hash, + "translog_entry_count": translog_entry_count, + "peer_count": len(peer_rows), + "vouches_out_count": len(vouches_out), + "vouches_in_count": len(vouches_in), + "corroboration_count_24h": corroboration_count_24h, + "signals_count_24h": sum(p["signal_count_24h"] for p in peer_rows), + } + + payload: Dict[str, Any] = { + "version": federation.FEED_VERSION, + "fingerprint": our_fp, + "generated_at": now.isoformat(), + "node": node_block, + "peers": peer_rows, + "vouches": vouches_out, # kept for shape-compat with /federation/network + "vouches_out": vouches_out, + "vouches_in": vouches_in, + "transitive_peers": transitive_peers, + "corroboration_count_24h": corroboration_count_24h, + } + sig = federation.sign_payload(federation.canonical_json(payload)) + payload["signature"] = base64.b64encode(sig).decode("ascii") + return payload diff --git a/tests/test_network_view.py b/tests/test_network_view.py index 49fde86..d7f3027 100644 --- a/tests/test_network_view.py +++ b/tests/test_network_view.py @@ -18,6 +18,7 @@ from psyc.lines.network_view import ( NetworkNode, NetworkView, build_admin_view, + build_explore_view, build_local_view, build_public_view, build_transitive_view, @@ -623,6 +624,49 @@ def test_admin_view_recent_translog_per_peer(fresh_db, fed_dir): assert set(row.keys()) == {"id", "entry_type", "timestamp", "hash"} +def test_explore_view_omits_ioc_values_case_ids_and_raw_json(fresh_db, fed_dir): + """The public explore payload must NEVER expose IOC values, case_ids, or raw_json. + + This is the load-bearing transparency-vs-leakage contract that lives at + the network-view layer — anyone can audit who's talking to whom and how + much, but never *what* they're saying. + """ + fp, pem = _make_peer_pubkey() + federation.register_peer("trusted.example", fp, pem, status="trusted") + now_iso = datetime.now(timezone.utc).isoformat() + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id="evil-domain-do-not-leak.com", + signal_hash="ioc-hash-leak", + received_at=now_iso, + raw_json=json.dumps({"type": "domain", "value": "evil-domain-do-not-leak.com"}), + )) + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="case", + signal_id="CASE-SECRET-42", + signal_hash="case-hash-leak", + received_at=now_iso, + raw_json=json.dumps({"severity": "critical", "case_id": "CASE-SECRET-42"}), + )) + with patch.object(network_view, "_fetch_peer_explore", return_value=None), \ + patch.object(network_view, "_fetch_peer_network", return_value=None): + payload = build_explore_view() + flat = json.dumps(payload, default=str) + assert "evil-domain-do-not-leak.com" not in flat + assert "CASE-SECRET-42" not in flat + assert "raw_json" not in flat + # Sector-leaking breakdowns must not appear either. + assert "severity_breakdown" not in flat + assert "ioc_type_breakdown" not in flat + # And peer rows carry only public-safe counts. + for p in payload.get("peers", []): + assert "severity_breakdown" not in p + assert "ioc_type_breakdown" not in p + assert "recent_translog" not in p + + def test_public_view_still_has_no_stats(fresh_db, fed_dir): """Public payload must not surface admin-only enrichments — sensitive.