stage-vouch-b federation: vouch sign/verify + quorum API

Add the web-of-trust primitives, all keyed off the existing node keypair:

- Vouch + QuorumConfig Pydantic models
- vouch_payload_bytes — canonical-JSON body that the voucher signs
- issue_vouch / accept_vouch / revoke_vouch / our_vouches / vouches_for
- is_vouched(target, min_vouchers) — counts DISTINCT trusted vouchers,
  ignoring expired vouches and re-using QuorumConfig defaults
- peer_is_listening_eligible(fp) — direct-trust OR vouched-in
- is_quorum_met(signal_hash, k) — distinct listening-eligible peers
  reporting the same hash
- quorum_evidence(signal_hash) — (peer_fp, received_at) tuples for UI
- quorum_config / set_quorum_config — persisted in pulse_settings

accept_vouch is paranoid: rejects expired vouches, vouchers that aren't
currently "trusted" in our peers table, mismatched pubkey-fingerprint
pairs, malformed base64, and Ed25519 verification failures — each with
a short Err reason.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
m17hr1l
2026-06-06 21:10:03 +02:00
parent 7a510c7acf
commit 234e6d98ba

View File

@@ -24,6 +24,7 @@ from cryptography.hazmat.primitives.asymmetric import ed25519
from pydantic import BaseModel, Field
from psyc import DATA_DIR, db, log
from psyc.lines import translog
from psyc.result import Err, Ok, Result
@@ -403,3 +404,279 @@ def set_peer_status(domain: str, status: str) -> None:
def remove_peer(domain: str) -> None:
db.remove_peer(domain)
# ---------- vouching + quorum (stage 4) ---------------------------------
#
# The web of trust: a peer's fingerprint becomes "listening-eligible" when
# either we directly trust it (peers.status == "trusted") or at least
# `trust_min_vouchers` of our trusted peers have signed a vouch for it.
#
# Signal-level quorum: a federation_signals row is meaningful only when
# `signal_quorum_k` distinct vouched peers have reported the same signal_hash.
#
# Vouches are short Pydantic records signed with the voucher's Ed25519 key
# over canonical JSON of the body (everything but the signature field).
class Vouch(BaseModel):
voucher_fingerprint: str
target_fingerprint: str
issued_at: datetime
expires_at: Optional[datetime] = None
signature: str = "" # base64 ed25519 sig over vouch_payload_bytes(...)
class QuorumConfig(BaseModel):
trust_min_vouchers: int = 2
signal_quorum_k: int = 2
_QC_TRUST_KEY = "wot_trust_min"
_QC_K_KEY = "wot_quorum_k"
def quorum_config() -> QuorumConfig:
"""Live quorum settings, with sensible defaults if pulse_settings is empty."""
cfg = QuorumConfig()
t = db.setting_get(_QC_TRUST_KEY)
k = db.setting_get(_QC_K_KEY)
if t is not None:
try:
cfg.trust_min_vouchers = max(1, int(t))
except ValueError:
pass
if k is not None:
try:
cfg.signal_quorum_k = max(1, int(k))
except ValueError:
pass
return cfg
def set_quorum_config(cfg: QuorumConfig) -> None:
"""Persist quorum config into pulse_settings."""
db.setting_set(_QC_TRUST_KEY, str(cfg.trust_min_vouchers))
db.setting_set(_QC_K_KEY, str(cfg.signal_quorum_k))
def vouch_payload_bytes(
voucher_fp: str,
target_fp: str,
issued_at: datetime,
expires_at: Optional[datetime],
) -> bytes:
"""Canonical JSON of the unsigned vouch body — what the voucher signs."""
body: Dict[str, Any] = {
"voucher_fingerprint": voucher_fp,
"target_fingerprint": target_fp,
"issued_at": issued_at.isoformat(),
"expires_at": expires_at.isoformat() if expires_at else None,
}
return canonical_json(body)
def _store_vouch(v: Vouch) -> None:
db.upsert_vouch(dict(
voucher_fingerprint=v.voucher_fingerprint,
target_fingerprint=v.target_fingerprint,
issued_at=v.issued_at.isoformat(),
expires_at=v.expires_at.isoformat() if v.expires_at else None,
signature=v.signature,
))
def _row_to_vouch(row: Dict[str, Any]) -> Vouch:
return Vouch(
voucher_fingerprint=row["voucher_fingerprint"],
target_fingerprint=row["target_fingerprint"],
issued_at=datetime.fromisoformat(row["issued_at"]),
expires_at=datetime.fromisoformat(row["expires_at"]) if row.get("expires_at") else None,
signature=row.get("signature") or "",
)
def issue_vouch(target_fingerprint: str, ttl_days: int = 90) -> Vouch:
"""Sign a vouch for `target_fingerprint` with OUR key. Persists + returns it."""
our_fp = node_fingerprint()
issued_at = datetime.now(timezone.utc)
expires_at = issued_at + timedelta(days=ttl_days) if ttl_days > 0 else None
payload = vouch_payload_bytes(our_fp, target_fingerprint, issued_at, expires_at)
sig = sign_payload(payload)
vouch = Vouch(
voucher_fingerprint=our_fp,
target_fingerprint=target_fingerprint,
issued_at=issued_at,
expires_at=expires_at,
signature=base64.b64encode(sig).decode("ascii"),
)
_store_vouch(vouch)
try:
translog.append("vouch", {
"voucher_fingerprint": vouch.voucher_fingerprint,
"target_fingerprint": vouch.target_fingerprint,
"issued_at": vouch.issued_at.isoformat(),
"expires_at": vouch.expires_at.isoformat() if vouch.expires_at else None,
})
except Exception as exc:
_log.warning("federation.translog.append.fail", error=str(exc))
_log.info("federation.vouch.issued", target=target_fingerprint, ttl_days=ttl_days)
return vouch
def accept_vouch(vouch: Vouch, voucher_pubkey_pem: str) -> Result[None, str]:
"""Verify signature + expiry + voucher trust status, then persist.
Failure modes return Err with a short reason so the caller can log them.
A voucher whose status is not "trusted" in our peers table is refused —
we don't accept transitive vouches from unknown peers.
"""
# Expiry first — cheapest check.
now = datetime.now(timezone.utc)
if vouch.expires_at is not None and vouch.expires_at < now:
return Err("vouch expired")
# Voucher must be a directly-trusted peer (no transitive trust at this layer).
voucher_status = None
for row in db.list_peers():
if row.get("fingerprint") == vouch.voucher_fingerprint:
voucher_status = row.get("status")
break
if voucher_status != "trusted":
return Err(f"voucher not trusted: {vouch.voucher_fingerprint}")
# The pubkey must match the declared voucher fingerprint.
try:
if _fingerprint_for_pubkey_pem(voucher_pubkey_pem) != vouch.voucher_fingerprint:
return Err("voucher pubkey does not match fingerprint")
except Exception as exc:
return Err(f"bad voucher pubkey: {exc}")
payload = vouch_payload_bytes(
vouch.voucher_fingerprint,
vouch.target_fingerprint,
vouch.issued_at,
vouch.expires_at,
)
try:
signature = base64.b64decode(vouch.signature)
except Exception:
return Err("vouch signature not base64")
if not verify_payload(payload, signature, voucher_pubkey_pem):
return Err("vouch signature invalid")
_store_vouch(vouch)
try:
translog.append("vouch", {
"voucher_fingerprint": vouch.voucher_fingerprint,
"target_fingerprint": vouch.target_fingerprint,
"issued_at": vouch.issued_at.isoformat(),
"expires_at": vouch.expires_at.isoformat() if vouch.expires_at else None,
"accepted": True,
})
except Exception as exc:
_log.warning("federation.translog.append.fail", error=str(exc))
_log.info("federation.vouch.accepted", voucher=vouch.voucher_fingerprint, target=vouch.target_fingerprint)
return Ok(None)
def revoke_vouch(target_fingerprint: str) -> None:
"""Delete OUR vouch naming `target_fingerprint`. No-op if absent."""
db.delete_vouch(node_fingerprint(), target_fingerprint)
_log.info("federation.vouch.revoked", target=target_fingerprint)
def our_vouches() -> List[Vouch]:
"""Vouches we have issued (filter for voucher_fingerprint == our fp)."""
return [_row_to_vouch(r) for r in db.vouches_by_voucher(node_fingerprint())]
def vouches_for(target_fingerprint: str) -> List[Vouch]:
"""Every vouch stored locally that names `target_fingerprint` as target."""
return [_row_to_vouch(r) for r in db.vouches_by_target(target_fingerprint)]
def is_vouched(target_fingerprint: str, min_vouchers: Optional[int] = None) -> bool:
"""True iff ≥`min_vouchers` distinct non-expired vouches from currently-trusted
peers name `target_fingerprint`.
"""
cfg = quorum_config()
threshold = min_vouchers if min_vouchers is not None else cfg.trust_min_vouchers
if threshold <= 0:
return True
now = datetime.now(timezone.utc)
trusted_fps = {p.fingerprint for p in list_peers() if p.status == "trusted"}
distinct_vouchers: set = set()
for v in vouches_for(target_fingerprint):
if v.expires_at is not None and v.expires_at < now:
continue
if v.voucher_fingerprint not in trusted_fps:
continue
distinct_vouchers.add(v.voucher_fingerprint)
if len(distinct_vouchers) >= threshold:
return True
return False
def peer_is_listening_eligible(fingerprint: str) -> bool:
"""True iff the peer is directly trusted OR vouched into trust.
This is the gate used by `import_signed_feed`. Auto-response will share
this signature — keep it stable.
"""
if not fingerprint:
return False
for p in list_peers():
if p.fingerprint == fingerprint:
if p.status == "trusted":
return True
if p.status == "blocked":
return False
break
return is_vouched(fingerprint)
def is_quorum_met(signal_hash: str, k: Optional[int] = None) -> bool:
"""True iff ≥k distinct vouched peers have reported `signal_hash`.
"Vouched" here means `peer_is_listening_eligible` — the same web-of-trust
set the import gate respects. Self-reports from the local node do not
count (they never end up in federation_signals).
"""
cfg = quorum_config()
threshold = k if k is not None else cfg.signal_quorum_k
if threshold <= 0:
return True
rows = db.signals_for_hash(signal_hash)
distinct: set = set()
for r in rows:
fp = r.get("peer_fingerprint") or ""
if not fp or fp in distinct:
continue
if not peer_is_listening_eligible(fp):
continue
distinct.add(fp)
if len(distinct) >= threshold:
return True
return False
def quorum_evidence(signal_hash: str) -> List[Tuple[str, datetime]]:
"""(peer_fingerprint, received_at) tuples for one signal_hash — for UI display.
Only includes signals from currently listening-eligible peers, deduped
per fingerprint at the earliest receipt.
"""
rows = db.signals_for_hash(signal_hash)
earliest: Dict[str, datetime] = {}
for r in rows:
fp = r.get("peer_fingerprint") or ""
if not fp or not peer_is_listening_eligible(fp):
continue
try:
ts = datetime.fromisoformat(r.get("received_at") or "")
except ValueError:
continue
if fp not in earliest or ts < earliest[fp]:
earliest[fp] = ts
return sorted(earliest.items(), key=lambda kv: kv[1])