From 234e6d98ba5ba3220cb2d22cb20c8fe49f4cbc9c Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:10:03 +0200 Subject: [PATCH] stage-vouch-b federation: vouch sign/verify + quorum API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the web-of-trust primitives, all keyed off the existing node keypair: - Vouch + QuorumConfig Pydantic models - vouch_payload_bytes — canonical-JSON body that the voucher signs - issue_vouch / accept_vouch / revoke_vouch / our_vouches / vouches_for - is_vouched(target, min_vouchers) — counts DISTINCT trusted vouchers, ignoring expired vouches and re-using QuorumConfig defaults - peer_is_listening_eligible(fp) — direct-trust OR vouched-in - is_quorum_met(signal_hash, k) — distinct listening-eligible peers reporting the same hash - quorum_evidence(signal_hash) — (peer_fp, received_at) tuples for UI - quorum_config / set_quorum_config — persisted in pulse_settings accept_vouch is paranoid: rejects expired vouches, vouchers that aren't currently "trusted" in our peers table, mismatched pubkey-fingerprint pairs, malformed base64, and Ed25519 verification failures — each with a short Err reason. Co-Authored-By: Claude Opus 4.7 --- src/psyc/lines/federation.py | 277 +++++++++++++++++++++++++++++++++++ 1 file changed, 277 insertions(+) diff --git a/src/psyc/lines/federation.py b/src/psyc/lines/federation.py index bed46fb..5439068 100644 --- a/src/psyc/lines/federation.py +++ b/src/psyc/lines/federation.py @@ -24,6 +24,7 @@ from cryptography.hazmat.primitives.asymmetric import ed25519 from pydantic import BaseModel, Field from psyc import DATA_DIR, db, log +from psyc.lines import translog from psyc.result import Err, Ok, Result @@ -403,3 +404,279 @@ def set_peer_status(domain: str, status: str) -> None: def remove_peer(domain: str) -> None: db.remove_peer(domain) + + +# ---------- vouching + quorum (stage 4) --------------------------------- +# +# The web of trust: a peer's fingerprint becomes "listening-eligible" when +# either we directly trust it (peers.status == "trusted") or at least +# `trust_min_vouchers` of our trusted peers have signed a vouch for it. +# +# Signal-level quorum: a federation_signals row is meaningful only when +# `signal_quorum_k` distinct vouched peers have reported the same signal_hash. +# +# Vouches are short Pydantic records signed with the voucher's Ed25519 key +# over canonical JSON of the body (everything but the signature field). + + +class Vouch(BaseModel): + voucher_fingerprint: str + target_fingerprint: str + issued_at: datetime + expires_at: Optional[datetime] = None + signature: str = "" # base64 ed25519 sig over vouch_payload_bytes(...) + + +class QuorumConfig(BaseModel): + trust_min_vouchers: int = 2 + signal_quorum_k: int = 2 + + +_QC_TRUST_KEY = "wot_trust_min" +_QC_K_KEY = "wot_quorum_k" + + +def quorum_config() -> QuorumConfig: + """Live quorum settings, with sensible defaults if pulse_settings is empty.""" + cfg = QuorumConfig() + t = db.setting_get(_QC_TRUST_KEY) + k = db.setting_get(_QC_K_KEY) + if t is not None: + try: + cfg.trust_min_vouchers = max(1, int(t)) + except ValueError: + pass + if k is not None: + try: + cfg.signal_quorum_k = max(1, int(k)) + except ValueError: + pass + return cfg + + +def set_quorum_config(cfg: QuorumConfig) -> None: + """Persist quorum config into pulse_settings.""" + db.setting_set(_QC_TRUST_KEY, str(cfg.trust_min_vouchers)) + db.setting_set(_QC_K_KEY, str(cfg.signal_quorum_k)) + + +def vouch_payload_bytes( + voucher_fp: str, + target_fp: str, + issued_at: datetime, + expires_at: Optional[datetime], +) -> bytes: + """Canonical JSON of the unsigned vouch body — what the voucher signs.""" + body: Dict[str, Any] = { + "voucher_fingerprint": voucher_fp, + "target_fingerprint": target_fp, + "issued_at": issued_at.isoformat(), + "expires_at": expires_at.isoformat() if expires_at else None, + } + return canonical_json(body) + + +def _store_vouch(v: Vouch) -> None: + db.upsert_vouch(dict( + voucher_fingerprint=v.voucher_fingerprint, + target_fingerprint=v.target_fingerprint, + issued_at=v.issued_at.isoformat(), + expires_at=v.expires_at.isoformat() if v.expires_at else None, + signature=v.signature, + )) + + +def _row_to_vouch(row: Dict[str, Any]) -> Vouch: + return Vouch( + voucher_fingerprint=row["voucher_fingerprint"], + target_fingerprint=row["target_fingerprint"], + issued_at=datetime.fromisoformat(row["issued_at"]), + expires_at=datetime.fromisoformat(row["expires_at"]) if row.get("expires_at") else None, + signature=row.get("signature") or "", + ) + + +def issue_vouch(target_fingerprint: str, ttl_days: int = 90) -> Vouch: + """Sign a vouch for `target_fingerprint` with OUR key. Persists + returns it.""" + our_fp = node_fingerprint() + issued_at = datetime.now(timezone.utc) + expires_at = issued_at + timedelta(days=ttl_days) if ttl_days > 0 else None + payload = vouch_payload_bytes(our_fp, target_fingerprint, issued_at, expires_at) + sig = sign_payload(payload) + vouch = Vouch( + voucher_fingerprint=our_fp, + target_fingerprint=target_fingerprint, + issued_at=issued_at, + expires_at=expires_at, + signature=base64.b64encode(sig).decode("ascii"), + ) + _store_vouch(vouch) + try: + translog.append("vouch", { + "voucher_fingerprint": vouch.voucher_fingerprint, + "target_fingerprint": vouch.target_fingerprint, + "issued_at": vouch.issued_at.isoformat(), + "expires_at": vouch.expires_at.isoformat() if vouch.expires_at else None, + }) + except Exception as exc: + _log.warning("federation.translog.append.fail", error=str(exc)) + _log.info("federation.vouch.issued", target=target_fingerprint, ttl_days=ttl_days) + return vouch + + +def accept_vouch(vouch: Vouch, voucher_pubkey_pem: str) -> Result[None, str]: + """Verify signature + expiry + voucher trust status, then persist. + + Failure modes return Err with a short reason so the caller can log them. + A voucher whose status is not "trusted" in our peers table is refused — + we don't accept transitive vouches from unknown peers. + """ + # Expiry first — cheapest check. + now = datetime.now(timezone.utc) + if vouch.expires_at is not None and vouch.expires_at < now: + return Err("vouch expired") + + # Voucher must be a directly-trusted peer (no transitive trust at this layer). + voucher_status = None + for row in db.list_peers(): + if row.get("fingerprint") == vouch.voucher_fingerprint: + voucher_status = row.get("status") + break + if voucher_status != "trusted": + return Err(f"voucher not trusted: {vouch.voucher_fingerprint}") + + # The pubkey must match the declared voucher fingerprint. + try: + if _fingerprint_for_pubkey_pem(voucher_pubkey_pem) != vouch.voucher_fingerprint: + return Err("voucher pubkey does not match fingerprint") + except Exception as exc: + return Err(f"bad voucher pubkey: {exc}") + + payload = vouch_payload_bytes( + vouch.voucher_fingerprint, + vouch.target_fingerprint, + vouch.issued_at, + vouch.expires_at, + ) + try: + signature = base64.b64decode(vouch.signature) + except Exception: + return Err("vouch signature not base64") + if not verify_payload(payload, signature, voucher_pubkey_pem): + return Err("vouch signature invalid") + + _store_vouch(vouch) + try: + translog.append("vouch", { + "voucher_fingerprint": vouch.voucher_fingerprint, + "target_fingerprint": vouch.target_fingerprint, + "issued_at": vouch.issued_at.isoformat(), + "expires_at": vouch.expires_at.isoformat() if vouch.expires_at else None, + "accepted": True, + }) + except Exception as exc: + _log.warning("federation.translog.append.fail", error=str(exc)) + _log.info("federation.vouch.accepted", voucher=vouch.voucher_fingerprint, target=vouch.target_fingerprint) + return Ok(None) + + +def revoke_vouch(target_fingerprint: str) -> None: + """Delete OUR vouch naming `target_fingerprint`. No-op if absent.""" + db.delete_vouch(node_fingerprint(), target_fingerprint) + _log.info("federation.vouch.revoked", target=target_fingerprint) + + +def our_vouches() -> List[Vouch]: + """Vouches we have issued (filter for voucher_fingerprint == our fp).""" + return [_row_to_vouch(r) for r in db.vouches_by_voucher(node_fingerprint())] + + +def vouches_for(target_fingerprint: str) -> List[Vouch]: + """Every vouch stored locally that names `target_fingerprint` as target.""" + return [_row_to_vouch(r) for r in db.vouches_by_target(target_fingerprint)] + + +def is_vouched(target_fingerprint: str, min_vouchers: Optional[int] = None) -> bool: + """True iff ≥`min_vouchers` distinct non-expired vouches from currently-trusted + peers name `target_fingerprint`. + """ + cfg = quorum_config() + threshold = min_vouchers if min_vouchers is not None else cfg.trust_min_vouchers + if threshold <= 0: + return True + now = datetime.now(timezone.utc) + trusted_fps = {p.fingerprint for p in list_peers() if p.status == "trusted"} + distinct_vouchers: set = set() + for v in vouches_for(target_fingerprint): + if v.expires_at is not None and v.expires_at < now: + continue + if v.voucher_fingerprint not in trusted_fps: + continue + distinct_vouchers.add(v.voucher_fingerprint) + if len(distinct_vouchers) >= threshold: + return True + return False + + +def peer_is_listening_eligible(fingerprint: str) -> bool: + """True iff the peer is directly trusted OR vouched into trust. + + This is the gate used by `import_signed_feed`. Auto-response will share + this signature — keep it stable. + """ + if not fingerprint: + return False + for p in list_peers(): + if p.fingerprint == fingerprint: + if p.status == "trusted": + return True + if p.status == "blocked": + return False + break + return is_vouched(fingerprint) + + +def is_quorum_met(signal_hash: str, k: Optional[int] = None) -> bool: + """True iff ≥k distinct vouched peers have reported `signal_hash`. + + "Vouched" here means `peer_is_listening_eligible` — the same web-of-trust + set the import gate respects. Self-reports from the local node do not + count (they never end up in federation_signals). + """ + cfg = quorum_config() + threshold = k if k is not None else cfg.signal_quorum_k + if threshold <= 0: + return True + rows = db.signals_for_hash(signal_hash) + distinct: set = set() + for r in rows: + fp = r.get("peer_fingerprint") or "" + if not fp or fp in distinct: + continue + if not peer_is_listening_eligible(fp): + continue + distinct.add(fp) + if len(distinct) >= threshold: + return True + return False + + +def quorum_evidence(signal_hash: str) -> List[Tuple[str, datetime]]: + """(peer_fingerprint, received_at) tuples for one signal_hash — for UI display. + + Only includes signals from currently listening-eligible peers, deduped + per fingerprint at the earliest receipt. + """ + rows = db.signals_for_hash(signal_hash) + earliest: Dict[str, datetime] = {} + for r in rows: + fp = r.get("peer_fingerprint") or "" + if not fp or not peer_is_listening_eligible(fp): + continue + try: + ts = datetime.fromisoformat(r.get("received_at") or "") + except ValueError: + continue + if fp not in earliest or ts < earliest[fp]: + earliest[fp] = ts + return sorted(earliest.items(), key=lambda kv: kv[1])