stage-exp-a explore: public payload builder + tests

This commit is contained in:
m17hr1l
2026-06-07 01:11:17 +02:00
parent 925bf76a0b
commit 56466c334d
2 changed files with 298 additions and 0 deletions

View File

@@ -38,6 +38,7 @@ _log = log.get(__name__)
# Module-level tunables. One assignment per line: the side-by-side diff
# rendering had fused every unchanged line with a copy of itself, which is
# not valid Python.
SIGNAL_WINDOW_HOURS = 24
TRANSITIVE_CACHE_TTL = 300.0  # 5 minutes
TRANSITIVE_FETCH_TIMEOUT = 4.0
# Default `timeout` handed to the HTTP client by `_fetch_peer_explore`.
EXPLORE_FETCH_TIMEOUT = 4.0


# ---------- data model --------------------------------------------------
@@ -773,3 +774,256 @@ def build_admin_view(include_transitive: bool = True) -> Dict[str, Any]:
"stats": view.stats, "stats": view.stats,
"generated_at": view.generated_at.isoformat(), "generated_at": view.generated_at.isoformat(),
} }
# ---------- public explore payload --------------------------------------
#
# "Transparent security" view: the same shape a peer would see at
# /federation/network, plus per-peer counts (NEVER values), inbound vouches,
# and a thin distance-2 snapshot — enough for a public visitor to draw the
# mesh and walk to any peer's own explore page. Everything is signed.
# Maximum number of distinct distance-2 fingerprints that
# `_explore_transitive_peers` will return; once the list reaches this size
# the walk stops adding entries.
EXPLORE_TRANSITIVE_CAP = 50  # cap on distinct distance-2 fps to keep payload bounded
def _explore_peer_stats(
    peer_fp: str,
    now: datetime,
    signals_by_peer: Dict[str, List[Dict[str, Any]]],
    all_signal_counts: Dict[str, int],
    quorum_cache: Dict[str, bool],
) -> Dict[str, Any]:
    """Summarise one peer's activity as public-safe counters.

    Only magnitudes are returned: no IOC values, case summaries or raw
    payloads. Severity and IOC-type breakdowns are left out on purpose
    because they could hint at the sector being targeted.

    Args:
        peer_fp: Fingerprint whose rows are looked up in ``signals_by_peer``.
        now: Accepted for signature compatibility; not read by this function.
        signals_by_peer: Rows for the reporting window, keyed by peer
            fingerprint.
        all_signal_counts: All-time signal count per peer fingerprint.
        quorum_cache: Memo of ``signal_hash -> quorum met``. Missing hashes
            are resolved through ``federation.is_quorum_met`` and written
            back, so the same dict can be shared across peers.

    Returns:
        Dict with ``signal_count_24h``, ``signal_count_total``, ``cases_24h``,
        ``iocs_24h``, ``quorum_contribution_24h`` and ``last_seen`` (``None``
        when the peer has no rows or the newest row has no timestamp).
    """
    peer_rows = signals_by_peer.get(peer_fp, [])

    # Tally signal types in one pass; anything other than case/ioc is ignored.
    kinds = [entry.get("signal_type") or "" for entry in peer_rows]
    case_total = kinds.count("case")
    ioc_total = kinds.count("ioc")

    # Each distinct hash contributes at most once to the quorum counter.
    counted: set = set()
    quorum_hits = 0
    for entry in peer_rows:
        sig_hash = entry.get("signal_hash") or ""
        if not sig_hash or sig_hash in counted:
            continue
        counted.add(sig_hash)
        if sig_hash not in quorum_cache:
            quorum_cache[sig_hash] = federation.is_quorum_met(sig_hash)
        if quorum_cache[sig_hash]:
            quorum_hits += 1

    # Assumes rows arrive newest-first (per the original note about
    # recent_signals), so the first row carries the latest timestamp.
    newest = str(peer_rows[0].get("received_at") or "") if peer_rows else ""

    return {
        "signal_count_24h": len(peer_rows),
        "signal_count_total": all_signal_counts.get(peer_fp, 0),
        "cases_24h": case_total,
        "iocs_24h": ioc_total,
        "quorum_contribution_24h": quorum_hits,
        "last_seen": newest or None,
    }
def _fetch_peer_explore(domain: str, timeout: float = EXPLORE_FETCH_TIMEOUT) -> Optional[Dict[str, Any]]:
    """Fetch ``/federation/explore/data`` from a peer over HTTPS.

    Strictly best-effort so that a slow or broken peer cannot abort the
    explore walk: nothing raised while requesting or decoding escapes this
    function.

    Args:
        domain: Peer host name; an empty value short-circuits to ``None``.
        timeout: Passed straight to ``httpx.Client(timeout=...)``.

    Returns:
        The decoded JSON body when it is a dict. ``None`` for an empty
        domain, for any exception during the request (logged at info
        level), or when the body is valid JSON but not an object.
    """
    if not domain:
        return None

    endpoint = f"https://{domain}/federation/explore/data"
    try:
        with httpx.Client(timeout=timeout) as http:
            response = http.get(endpoint)
            response.raise_for_status()
            body = response.json()
    except Exception as err:  # noqa: BLE001
        # Deliberately broad: transport errors, bad status codes and JSON
        # decode failures are all treated as "skip this peer".
        _log.info("network_view.explore.transitive.skip", domain=domain, reason=str(err)[:120])
        return None

    return body if isinstance(body, dict) else None
def _explore_transitive_peers(
    trusted_peers: List[Tuple[str, Optional[str]]],
    own_fp: str,
    own_peer_fps: set,
) -> List[Dict[str, Any]]:
    """Collect distance-2 fingerprints advertised by our trusted peers.

    Each trusted peer with a domain is asked for its explore feed, falling
    back to the older ``/federation/network`` feed via `_fetch_peer_network`
    for nodes that do not serve explore data yet. Fingerprints that are not
    ours and not already direct peers are recorded once; the first peer to
    introduce a fingerprint wins, which keeps the ``via`` attribution stable.

    Remote feeds are untrusted input. A feed whose ``peers`` value is not a
    list, an entry that is not a dict, or a ``domain`` that is not a
    non-empty string is ignored rather than allowed to raise or to be copied
    into our signed payload.

    Args:
        trusted_peers: ``(fingerprint, domain)`` pairs to walk, in order.
        own_fp: This node's fingerprint (never reported as transitive).
        own_peer_fps: Fingerprints of direct peers (never reported either).

    Returns:
        Up to ``EXPLORE_TRANSITIVE_CAP`` dicts of the form
        ``{"fingerprint", "domain", "via_peer_fingerprint"}``.
    """
    seen: set = set(own_peer_fps)
    seen.add(own_fp)
    out: List[Dict[str, Any]] = []

    for parent_fp, parent_domain in trusted_peers:
        if not parent_domain:
            continue
        # Once the cap is hit no later peer can contribute: stop walking
        # instead of iterating the remainder of the list.
        if len(out) >= EXPLORE_TRANSITIVE_CAP:
            break

        data = _fetch_peer_explore(parent_domain)
        if not data:
            # Fall back to the older /federation/network endpoint — older
            # psyc nodes won't have /federation/explore/data yet.
            data = _fetch_peer_network(parent_domain)
        if not isinstance(data, dict):
            continue

        their_peers = data.get("peers")
        if not isinstance(their_peers, (list, tuple)):
            # e.g. {"peers": 5} would otherwise raise TypeError on iteration
            # and take the whole explore payload down with it.
            continue

        for pp in their_peers:
            if not isinstance(pp, dict):
                continue
            fp = str(pp.get("fingerprint") or "")
            if not fp or fp in seen:
                continue
            seen.add(fp)
            remote_domain = pp.get("domain")
            out.append({
                "fingerprint": fp,
                # Only a real, non-empty string is republished.
                "domain": remote_domain if isinstance(remote_domain, str) and remote_domain else None,
                "via_peer_fingerprint": parent_fp,
            })
            if len(out) >= EXPLORE_TRANSITIVE_CAP:
                break

    return out
def _explore_vouches_out() -> List[Dict[str, Any]]:
    """Serialise the vouches from `federation.our_vouches()`, signature included."""
    return [
        {
            "voucher_fingerprint": v.voucher_fingerprint,
            "target_fingerprint": v.target_fingerprint,
            "issued_at": v.issued_at.isoformat(),
            "expires_at": v.expires_at.isoformat() if v.expires_at else None,
            "signature": v.signature,
        }
        for v in federation.our_vouches()
    ]


def _explore_vouches_in(our_fp: str, trusted_fps: set) -> List[Dict[str, Any]]:
    """Return stored vouches that target us and were issued by a trusted peer."""
    inbound: List[Dict[str, Any]] = []
    for row in db.list_vouches():
        if (row.get("target_fingerprint") or "") != our_fp:
            continue
        voucher_fp = row.get("voucher_fingerprint") or ""
        # Self-vouches and vouches from identities we do not trust are
        # never surfaced: otherwise any stranger could show up here.
        if voucher_fp == our_fp or voucher_fp not in trusted_fps:
            continue
        inbound.append({
            "voucher_fingerprint": voucher_fp,
            "target_fingerprint": our_fp,
            "issued_at": row.get("issued_at") or "",
            "expires_at": row.get("expires_at") or None,
            "signature": row.get("signature") or "",
        })
    return inbound


def _explore_corroboration_count(fresh_signals) -> int:
    """Count distinct signal hashes reported by at least two distinct peers."""
    peers_by_hash: Dict[str, set] = {}
    for row in fresh_signals:
        sig_hash = row.get("signal_hash") or ""
        peer_fp = row.get("peer_fingerprint") or ""
        # A row without a peer fingerprint cannot corroborate anything;
        # counting "" as a peer would inflate the figure.
        if not sig_hash or not peer_fp:
            continue
        peers_by_hash.setdefault(sig_hash, set()).add(peer_fp)
    return sum(1 for fps in peers_by_hash.values() if len(fps) >= 2)


def build_explore_view(node_domain: Optional[str] = None) -> Dict[str, Any]:
    """Build the signed public explorer payload for /federation/explore/data.

    The payload contains:

    * ``node`` — headline numbers about THIS node (counts only).
    * ``peers`` — one row per peer whose status is ``"trusted"``, carrying
      domain, fingerprint, ``first_seen`` and the counters produced by
      `_explore_peer_stats`. Peers in any other status are never listed.
    * ``vouches`` / ``vouches_out`` — vouches we issued (``vouches`` is kept
      for shape compatibility with /federation/network).
    * ``vouches_in`` — vouches naming us as target whose voucher is one of
      our trusted peers.
    * ``transitive_peers`` — distance-2 fingerprints gathered by
      `_explore_transitive_peers`.
    * ``corroboration_count_24h`` — number of distinct signal hashes seen
      from at least two distinct peers among the fresh signals.

    No IOC values, case ids, raw_json, severity or ioc-type breakdowns are
    included. The payload without the ``signature`` key is signed via
    ``federation.sign_payload`` over ``federation.canonical_json`` and the
    base64 signature is attached as ``signature``.

    NOTE(review): nothing in this function caches the transitive snapshot,
    so every call performs the peer walk, which may issue outbound requests
    to each trusted peer. Confirm whether the route layer caches the result;
    peers serving the same endpoint will fetch each other.

    Args:
        node_domain: Domain to advertise in the ``node`` block, if known.

    Returns:
        The payload dict, including its ``signature``.
    """
    our_fp = federation.node_fingerprint()
    now = datetime.now(timezone.utc)

    # One scan of the signal window plus the all-time counts; the quorum
    # memo is shared across peers so each hash is resolved at most once.
    signals_by_peer, fresh_signals = _index_signals_24h(now)
    all_signal_counts = _all_signals_by_peer_count()
    quorum_cache: Dict[str, bool] = {}

    peer_rows: List[Dict[str, Any]] = []
    trusted_peers_for_walk: List[Tuple[str, Optional[str]]] = []
    trusted_fps: set = set()
    for p in federation.list_peers():
        if p.status != "trusted":
            continue
        trusted_fps.add(p.fingerprint)
        trusted_peers_for_walk.append((p.fingerprint, p.domain))
        stats = _explore_peer_stats(
            peer_fp=p.fingerprint,
            now=now,
            signals_by_peer=signals_by_peer,
            all_signal_counts=all_signal_counts,
            quorum_cache=quorum_cache,
        )
        peer_rows.append({
            "domain": p.domain,
            "fingerprint": p.fingerprint,
            "first_seen": p.discovered_at,
            **stats,
        })

    vouches_out = _explore_vouches_out()
    vouches_in = _explore_vouches_in(our_fp, trusted_fps)
    transitive_peers = _explore_transitive_peers(
        trusted_peers_for_walk, our_fp, trusted_fps,
    )
    corroboration_count_24h = _explore_corroboration_count(fresh_signals)

    # Transparency log headline numbers — chain head + length, never bodies.
    head_entry = translog.head()
    translog_head_hash = head_entry.entry_hash if head_entry else None
    translog_entry_count = int(head_entry.id) if head_entry else 0

    node_block: Dict[str, Any] = {
        "fingerprint": our_fp,
        "domain": node_domain,
        "generated_at": now.isoformat(),
        "transparency_log_head_hash": translog_head_hash,
        "translog_entry_count": translog_entry_count,
        "peer_count": len(peer_rows),
        "vouches_out_count": len(vouches_out),
        "vouches_in_count": len(vouches_in),
        "corroboration_count_24h": corroboration_count_24h,
        "signals_count_24h": sum(p["signal_count_24h"] for p in peer_rows),
    }
    payload: Dict[str, Any] = {
        "version": federation.FEED_VERSION,
        "fingerprint": our_fp,
        "generated_at": now.isoformat(),
        "node": node_block,
        "peers": peer_rows,
        "vouches": vouches_out,  # kept for shape-compat with /federation/network
        "vouches_out": vouches_out,
        "vouches_in": vouches_in,
        "transitive_peers": transitive_peers,
        "corroboration_count_24h": corroboration_count_24h,
    }

    # Sign before attaching the signature so it covers everything else.
    sig = federation.sign_payload(federation.canonical_json(payload))
    payload["signature"] = base64.b64encode(sig).decode("ascii")
    return payload

View File

@@ -18,6 +18,7 @@ from psyc.lines.network_view import (
NetworkNode, NetworkNode,
NetworkView, NetworkView,
build_admin_view, build_admin_view,
build_explore_view,
build_local_view, build_local_view,
build_public_view, build_public_view,
build_transitive_view, build_transitive_view,
@@ -623,6 +624,49 @@ def test_admin_view_recent_translog_per_peer(fresh_db, fed_dir):
assert set(row.keys()) == {"id", "entry_type", "timestamp", "hash"} assert set(row.keys()) == {"id", "entry_type", "timestamp", "hash"}
def test_explore_view_omits_ioc_values_case_ids_and_raw_json(fresh_db, fed_dir):
    """Explore payload exposes who talks and how much, never what is said.

    Registers one trusted peer, records an IOC signal and a case signal
    carrying recognisable secrets, builds the explore view with both remote
    fetchers stubbed out, and checks that none of the secrets or admin-only
    breakdown keys appear anywhere in the serialised payload.
    """
    peer_fp, peer_pem = _make_peer_pubkey()
    federation.register_peer("trusted.example", peer_fp, peer_pem, status="trusted")
    received = datetime.now(timezone.utc).isoformat()

    db.record_signal({
        "peer_fingerprint": peer_fp,
        "signal_type": "ioc",
        "signal_id": "evil-domain-do-not-leak.com",
        "signal_hash": "ioc-hash-leak",
        "received_at": received,
        "raw_json": json.dumps({"type": "domain", "value": "evil-domain-do-not-leak.com"}),
    })
    db.record_signal({
        "peer_fingerprint": peer_fp,
        "signal_type": "case",
        "signal_id": "CASE-SECRET-42",
        "signal_hash": "case-hash-leak",
        "received_at": received,
        "raw_json": json.dumps({"severity": "critical", "case_id": "CASE-SECRET-42"}),
    })

    # No network: both the explore fetch and its legacy fallback return None.
    with patch.object(network_view, "_fetch_peer_explore", return_value=None):
        with patch.object(network_view, "_fetch_peer_network", return_value=None):
            payload = build_explore_view()

    flat = json.dumps(payload, default=str)

    # Signal contents must not leak in any field, at any depth.
    for secret in ("evil-domain-do-not-leak.com", "CASE-SECRET-42", "raw_json"):
        assert secret not in flat

    # Sector-leaking breakdowns must not appear either.
    for breakdown in ("severity_breakdown", "ioc_type_breakdown"):
        assert breakdown not in flat

    # And peer rows carry only public-safe counts.
    admin_only_keys = ("severity_breakdown", "ioc_type_breakdown", "recent_translog")
    for peer_row in payload.get("peers", []):
        for key in admin_only_keys:
            assert key not in peer_row
def test_public_view_still_has_no_stats(fresh_db, fed_dir): def test_public_view_still_has_no_stats(fresh_db, fed_dir):
"""Public payload must not surface admin-only enrichments — sensitive. """Public payload must not surface admin-only enrichments — sensitive.