From 4a9f6ceb7f10343a8cc9fe9602833d78018ec85a Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:09:25 +0200 Subject: [PATCH 1/6] stage-vouch-a federation: vouches table + DB helpers Add `vouches` (voucher_fp, target_fp, issued_at, expires_at, signature) with unique (voucher, target) and target index, plus `translog` for the append-only signed merkle chain (id, prev_hash, entry_type, entry_data, timestamp, entry_hash). Also surface `setting_get` / `setting_set` helpers on pulse_settings so the quorum config has a place to live. Co-Authored-By: Claude Opus 4.7 --- src/psyc/db.py | 139 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) diff --git a/src/psyc/db.py b/src/psyc/db.py index 5d19acb..72f1c16 100644 --- a/src/psyc/db.py +++ b/src/psyc/db.py @@ -161,6 +161,35 @@ Index("federation_signals_hash_idx", federation_signals.c.signal_hash) Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint) Index("federation_signals_received_idx", federation_signals.c.received_at.desc()) +# Web-of-trust vouches — voucher signs an attestation that target is OK to listen to. +# Quorum is reached when enough distinct trusted vouchers sign for the same target. +vouches = Table( + "vouches", _metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("voucher_fingerprint", String, nullable=False), + Column("target_fingerprint", String, nullable=False), + Column("issued_at", String, nullable=False), + Column("expires_at", String, nullable=True), + Column("signature", Text, nullable=False), # base64 ed25519 sig +) +Index("vouches_unique_idx", vouches.c.voucher_fingerprint, vouches.c.target_fingerprint, unique=True) +Index("vouches_target_idx", vouches.c.target_fingerprint) + +# Transparency log — append-only signed hash chain over every signal we receive. +# Each entry references the previous entry's hash; tampering with any row breaks +# verify_chain on every subsequent row. +translog = Table( + "translog", _metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("prev_hash", String, nullable=False), + Column("entry_type", String, nullable=False), # signal | vouch | config + Column("entry_data", Text, nullable=False), # canonical JSON of payload + Column("timestamp", String, nullable=False), + Column("entry_hash", String, nullable=False), +) +Index("translog_hash_idx", translog.c.entry_hash) +Index("translog_time_idx", translog.c.timestamp.desc()) + _log = log.get(__name__) _engine: Optional[Engine] = None @@ -369,3 +398,113 @@ def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]: stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit) with engine(db_path).connect() as conn: return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +# ---------- federation: pulse_settings get/set (shared scratch kv) ------- + +def setting_get(key: str, db_path: Path = DB_PATH) -> Optional[str]: + """Read one pulse_settings value by key. Returns None if absent.""" + stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == key) + with engine(db_path).connect() as conn: + row = conn.execute(stmt).fetchone() + return None if row is None else str(row.value) + + +def setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None: + """Upsert one pulse_settings entry.""" + stmt = sqlite_insert(pulse_settings).values(key=key, value=value) + stmt = stmt.on_conflict_do_update( + index_elements=[pulse_settings.c.key], + set_=dict(value=stmt.excluded.value), + ) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +# ---------- federation: vouches ------------------------------------------ + +def upsert_vouch(row: dict, db_path: Path = DB_PATH) -> None: + """Insert-or-update one vouch. Unique on (voucher_fp, target_fp).""" + stmt = sqlite_insert(vouches).values(**row) + update_cols = {k: stmt.excluded[k] for k in row if k not in ("voucher_fingerprint", "target_fingerprint")} + stmt = stmt.on_conflict_do_update( + index_elements=[vouches.c.voucher_fingerprint, vouches.c.target_fingerprint], + set_=update_cols, + ) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +def list_vouches(db_path: Path = DB_PATH) -> List[dict]: + stmt = select(vouches).order_by(vouches.c.issued_at.desc()) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def vouches_by_target(target_fingerprint: str, db_path: Path = DB_PATH) -> List[dict]: + stmt = select(vouches).where(vouches.c.target_fingerprint == target_fingerprint) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def vouches_by_voucher(voucher_fingerprint: str, db_path: Path = DB_PATH) -> List[dict]: + stmt = select(vouches).where(vouches.c.voucher_fingerprint == voucher_fingerprint) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def delete_vouch(voucher_fingerprint: str, target_fingerprint: str, db_path: Path = DB_PATH) -> None: + stmt = vouches.delete().where( + (vouches.c.voucher_fingerprint == voucher_fingerprint) + & (vouches.c.target_fingerprint == target_fingerprint) + ) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +# ---------- transparency log --------------------------------------------- + +def translog_append(row: dict, db_path: Path = DB_PATH) -> int: + """Append one transparency-log entry. Returns inserted id.""" + stmt = insert(translog).values(**row) + with engine(db_path).begin() as conn: + res = conn.execute(stmt) + return int(res.inserted_primary_key[0]) + + +def translog_head(db_path: Path = DB_PATH) -> Optional[dict]: + """Highest-id (latest) entry, or None if chain empty.""" + stmt = select(translog).order_by(translog.c.id.desc()).limit(1) + with engine(db_path).connect() as conn: + row = conn.execute(stmt).fetchone() + return dict(row._mapping) if row else None + + +def translog_get(entry_id: int, db_path: Path = DB_PATH) -> Optional[dict]: + stmt = select(translog).where(translog.c.id == entry_id) + with engine(db_path).connect() as conn: + row = conn.execute(stmt).fetchone() + return dict(row._mapping) if row else None + + +def translog_after(entry_id: int, db_path: Path = DB_PATH) -> List[dict]: + """All entries with id > entry_id, oldest first — for sync.""" + stmt = select(translog).where(translog.c.id > entry_id).order_by(translog.c.id.asc()) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def translog_recent(limit: int = 100, db_path: Path = DB_PATH) -> List[dict]: + stmt = select(translog).order_by(translog.c.id.desc()).limit(limit) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def translog_range(start: int = 0, end: Optional[int] = None, db_path: Path = DB_PATH) -> List[dict]: + """All entries with start <= id (and id <= end if given), oldest first.""" + cond = translog.c.id >= start + if end is not None: + cond = cond & (translog.c.id <= end) + stmt = select(translog).where(cond).order_by(translog.c.id.asc()) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] From 7a510c7acfaf4080ffd46ae5c1a92c2034ef3905 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:09:32 +0200 Subject: [PATCH 2/6] stage-trans-a translog: append-only signed merkle chain + tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit translog.append computes sha256(canonical({prev_hash, entry_type, entry_data, timestamp})) and writes one row per call; the first entry uses prev_hash = "0"*64. verify_chain walks rows in id order, re-hashes each, and returns Err("broken at id=X expected=... got=...") on the first mismatch — so tampering with either entry_data or prev_hash invalidates every downstream row. recent / entries_after / head support peer sync and UI. Tests cover: genesis prev_hash, chained prev_hash, full-chain verify, tampered-data detection, tampered-prev_hash detection, slicing. Co-Authored-By: Claude Opus 4.7 --- src/psyc/lines/translog.py | 161 +++++++++++++++++++++++++++++++++++++ tests/test_translog.py | 118 +++++++++++++++++++++++++++ 2 files changed, 279 insertions(+) create mode 100644 src/psyc/lines/translog.py create mode 100644 tests/test_translog.py diff --git a/src/psyc/lines/translog.py b/src/psyc/lines/translog.py new file mode 100644 index 0000000..b4a1859 --- /dev/null +++ b/src/psyc/lines/translog.py @@ -0,0 +1,161 @@ +"""Transparency log — append-only signed merkle chain over federation signals. + +Every signal we receive from a peer (case, IOC, or accepted vouch) is appended +as one `LogEntry`. Each entry's `entry_hash = sha256(canonical(prev_hash + +entry_type + entry_data + timestamp))` references the previous head, so any +tampering with a historical row invalidates every subsequent hash. The chain +is public — auditors can re-fetch it and re-run `verify_chain` to detect a +node that quietly mutated history (e.g. to hide a bad signal it accepted). + +Hash format: lowercase hex SHA-256 of the canonical JSON of +``{"prev_hash": "...", "entry_type": "...", "entry_data": {...}, "timestamp": "..."}``. +Genesis entries use ``prev_hash = "0" * 64``. +""" + +from __future__ import annotations + +import hashlib +import json +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + +from psyc import db, log +from psyc.result import Err, Ok, Result + + +_log = log.get(__name__) + +GENESIS_PREV_HASH = "0" * 64 + + +class LogEntry(BaseModel): + id: int + prev_hash: str + entry_type: str + entry_data: Dict[str, Any] = Field(default_factory=dict) + timestamp: str + entry_hash: str + + +def _canonical_json(obj: Dict[str, Any]) -> bytes: + return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + + +def compute_entry_hash(prev_hash: str, entry_type: str, entry_data: Dict[str, Any], timestamp: str) -> str: + """Hex SHA-256 of canonical(prev_hash + entry_type + entry_data + timestamp).""" + payload: Dict[str, Any] = { + "prev_hash": prev_hash, + "entry_type": entry_type, + "entry_data": entry_data, + "timestamp": timestamp, + } + return hashlib.sha256(_canonical_json(payload)).hexdigest() + + +def _row_to_entry(row: Dict[str, Any]) -> LogEntry: + raw = row.get("entry_data") or "{}" + try: + data = json.loads(raw) + except Exception: + data = {} + return LogEntry( + id=int(row["id"]), + prev_hash=str(row["prev_hash"]), + entry_type=str(row["entry_type"]), + entry_data=data if isinstance(data, dict) else {}, + timestamp=str(row["timestamp"]), + entry_hash=str(row["entry_hash"]), + ) + + +def head(db_path: Path = db.DB_PATH) -> Optional[LogEntry]: + """Latest log entry, or None if the chain is empty.""" + row = db.translog_head(db_path=db_path) + return _row_to_entry(row) if row else None + + +def append(entry_type: str, entry_data: Dict[str, Any], db_path: Path = db.DB_PATH) -> LogEntry: + """Atomically append one entry to the chain. Returns the persisted entry.""" + prev = db.translog_head(db_path=db_path) + prev_hash = str(prev["entry_hash"]) if prev else GENESIS_PREV_HASH + timestamp = datetime.now(timezone.utc).isoformat() + entry_hash = compute_entry_hash(prev_hash, entry_type, entry_data, timestamp) + new_id = db.translog_append( + dict( + prev_hash=prev_hash, + entry_type=entry_type, + entry_data=json.dumps(entry_data, sort_keys=True), + timestamp=timestamp, + entry_hash=entry_hash, + ), + db_path=db_path, + ) + _log.info("translog.append", id=new_id, entry_type=entry_type, hash=entry_hash[:12]) + return LogEntry( + id=new_id, + prev_hash=prev_hash, + entry_type=entry_type, + entry_data=entry_data, + timestamp=timestamp, + entry_hash=entry_hash, + ) + + +def verify_chain(start: int = 0, end: Optional[int] = None, db_path: Path = db.DB_PATH) -> Result[int, str]: + """Walk entries [start, end] in id order, recompute each hash, compare. + + Returns Ok(n_verified) when every entry's recomputed hash equals the + stored one and each prev_hash matches the previous entry's stored hash. + Returns Err with the offending id + expected/got hashes otherwise. + """ + rows = db.translog_range(start=start, end=end, db_path=db_path) + if not rows: + return Ok(0) + # Establish the prior hash anchor — either genesis (if walking from id=1) + # or the entry just before `start`. + first_id = int(rows[0]["id"]) + if first_id <= 1: + prior_hash = GENESIS_PREV_HASH + else: + anchor = db.translog_get(first_id - 1, db_path=db_path) + if anchor is None: + return Err(f"missing anchor entry id={first_id - 1}") + prior_hash = str(anchor["entry_hash"]) + + verified = 0 + for row in rows: + stored_prev = str(row["prev_hash"]) + if stored_prev != prior_hash: + return Err( + f"broken at id={row['id']} expected_prev={prior_hash} got_prev={stored_prev}" + ) + try: + data = json.loads(row.get("entry_data") or "{}") + except Exception: + return Err(f"broken at id={row['id']} entry_data not JSON") + if not isinstance(data, dict): + return Err(f"broken at id={row['id']} entry_data not an object") + recomputed = compute_entry_hash( + stored_prev, str(row["entry_type"]), data, str(row["timestamp"]) + ) + stored_hash = str(row["entry_hash"]) + if recomputed != stored_hash: + return Err( + f"broken at id={row['id']} expected={recomputed} got={stored_hash}" + ) + prior_hash = stored_hash + verified += 1 + return Ok(verified) + + +def recent(limit: int = 100, db_path: Path = db.DB_PATH) -> List[LogEntry]: + """The latest `limit` entries, newest first.""" + return [_row_to_entry(r) for r in db.translog_recent(limit=limit, db_path=db_path)] + + +def entries_after(entry_id: int, db_path: Path = db.DB_PATH) -> List[LogEntry]: + """All entries with id > entry_id, oldest first — for peer sync.""" + return [_row_to_entry(r) for r in db.translog_after(entry_id, db_path=db_path)] diff --git a/tests/test_translog.py b/tests/test_translog.py new file mode 100644 index 0000000..da50cb0 --- /dev/null +++ b/tests/test_translog.py @@ -0,0 +1,118 @@ +"""Transparency log — append, verify, tamper detection, sync slices.""" + +from __future__ import annotations + +import json + +import pytest +from sqlalchemy import create_engine, update + +from psyc import db +from psyc.lines import translog +from psyc.lines.translog import GENESIS_PREV_HASH +from psyc.result import Err, Ok + + +@pytest.fixture +def fresh_db(tmp_path, monkeypatch): + test_db = tmp_path / "test.db" + eng = create_engine(f"sqlite:///{test_db}", future=True) + db._metadata.create_all(eng, checkfirst=True) + monkeypatch.setattr(db, "_engine", eng) + monkeypatch.setattr(db, "DB_PATH", test_db) + yield test_db + + +def test_first_append_uses_genesis_prev_hash(fresh_db): + e = translog.append("signal", {"x": 1}) + assert e.prev_hash == GENESIS_PREV_HASH + assert e.id >= 1 + assert e.entry_type == "signal" + assert e.entry_data == {"x": 1} + # head matches + h = translog.head() + assert h is not None + assert h.id == e.id + assert h.entry_hash == e.entry_hash + + +def test_append_chains_prev_hash(fresh_db): + e1 = translog.append("signal", {"a": 1}) + e2 = translog.append("signal", {"b": 2}) + e3 = translog.append("vouch", {"c": 3}) + assert e2.prev_hash == e1.entry_hash + assert e3.prev_hash == e2.entry_hash + head = translog.head() + assert head is not None + assert head.entry_hash == e3.entry_hash + + +def test_verify_chain_ok_round_trip(fresh_db): + translog.append("signal", {"a": 1}) + translog.append("signal", {"b": 2}) + translog.append("vouch", {"c": 3}) + result = translog.verify_chain() + assert isinstance(result, Ok) + assert result.value == 3 + + +def test_verify_chain_empty_returns_zero(fresh_db): + result = translog.verify_chain() + assert isinstance(result, Ok) + assert result.value == 0 + + +def test_verify_chain_detects_tampered_data(fresh_db): + e1 = translog.append("signal", {"a": 1}) + e2 = translog.append("signal", {"b": 2}) + + # Mutate entry_data of the first row directly in the DB; entry_hash stays + # the same but no longer matches the recomputed hash. + with db.engine().begin() as conn: + conn.execute( + update(db.translog) + .where(db.translog.c.id == e1.id) + .values(entry_data=json.dumps({"a": 999}, sort_keys=True)) + ) + + result = translog.verify_chain() + assert isinstance(result, Err) + assert "broken at id=" in result.reason + + +def test_verify_chain_detects_tampered_prev_hash(fresh_db): + translog.append("signal", {"a": 1}) + e2 = translog.append("signal", {"b": 2}) + # Flip e2.prev_hash so it no longer matches e1.entry_hash. + with db.engine().begin() as conn: + conn.execute( + update(db.translog) + .where(db.translog.c.id == e2.id) + .values(prev_hash="f" * 64) + ) + result = translog.verify_chain() + assert isinstance(result, Err) + assert "broken at id=" in result.reason + + +def test_entries_after_returns_correct_slice(fresh_db): + e1 = translog.append("signal", {"a": 1}) + e2 = translog.append("signal", {"b": 2}) + e3 = translog.append("signal", {"c": 3}) + + after_zero = translog.entries_after(0) + assert [e.id for e in after_zero] == [e1.id, e2.id, e3.id] + + after_e1 = translog.entries_after(e1.id) + assert [e.id for e in after_e1] == [e2.id, e3.id] + + after_e3 = translog.entries_after(e3.id) + assert after_e3 == [] + + +def test_recent_newest_first(fresh_db): + e1 = translog.append("signal", {"a": 1}) + e2 = translog.append("signal", {"b": 2}) + e3 = translog.append("signal", {"c": 3}) + recent = translog.recent(limit=10) + assert [e.id for e in recent] == [e3.id, e2.id, e1.id] From 234e6d98ba5ba3220cb2d22cb20c8fe49f4cbc9c Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:10:03 +0200 Subject: [PATCH 3/6] stage-vouch-b federation: vouch sign/verify + quorum API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the web-of-trust primitives, all keyed off the existing node keypair: - Vouch + QuorumConfig Pydantic models - vouch_payload_bytes — canonical-JSON body that the voucher signs - issue_vouch / accept_vouch / revoke_vouch / our_vouches / vouches_for - is_vouched(target, min_vouchers) — counts DISTINCT trusted vouchers, ignoring expired vouches and re-using QuorumConfig defaults - peer_is_listening_eligible(fp) — direct-trust OR vouched-in - is_quorum_met(signal_hash, k) — distinct listening-eligible peers reporting the same hash - quorum_evidence(signal_hash) — (peer_fp, received_at) tuples for UI - quorum_config / set_quorum_config — persisted in pulse_settings accept_vouch is paranoid: rejects expired vouches, vouchers that aren't currently "trusted" in our peers table, mismatched pubkey-fingerprint pairs, malformed base64, and Ed25519 verification failures — each with a short Err reason. Co-Authored-By: Claude Opus 4.7 --- src/psyc/lines/federation.py | 277 +++++++++++++++++++++++++++++++++++ 1 file changed, 277 insertions(+) diff --git a/src/psyc/lines/federation.py b/src/psyc/lines/federation.py index bed46fb..5439068 100644 --- a/src/psyc/lines/federation.py +++ b/src/psyc/lines/federation.py @@ -24,6 +24,7 @@ from cryptography.hazmat.primitives.asymmetric import ed25519 from pydantic import BaseModel, Field from psyc import DATA_DIR, db, log +from psyc.lines import translog from psyc.result import Err, Ok, Result @@ -403,3 +404,279 @@ def set_peer_status(domain: str, status: str) -> None: def remove_peer(domain: str) -> None: db.remove_peer(domain) + + +# ---------- vouching + quorum (stage 4) --------------------------------- +# +# The web of trust: a peer's fingerprint becomes "listening-eligible" when +# either we directly trust it (peers.status == "trusted") or at least +# `trust_min_vouchers` of our trusted peers have signed a vouch for it. +# +# Signal-level quorum: a federation_signals row is meaningful only when +# `signal_quorum_k` distinct vouched peers have reported the same signal_hash. +# +# Vouches are short Pydantic records signed with the voucher's Ed25519 key +# over canonical JSON of the body (everything but the signature field). + + +class Vouch(BaseModel): + voucher_fingerprint: str + target_fingerprint: str + issued_at: datetime + expires_at: Optional[datetime] = None + signature: str = "" # base64 ed25519 sig over vouch_payload_bytes(...) + + +class QuorumConfig(BaseModel): + trust_min_vouchers: int = 2 + signal_quorum_k: int = 2 + + +_QC_TRUST_KEY = "wot_trust_min" +_QC_K_KEY = "wot_quorum_k" + + +def quorum_config() -> QuorumConfig: + """Live quorum settings, with sensible defaults if pulse_settings is empty.""" + cfg = QuorumConfig() + t = db.setting_get(_QC_TRUST_KEY) + k = db.setting_get(_QC_K_KEY) + if t is not None: + try: + cfg.trust_min_vouchers = max(1, int(t)) + except ValueError: + pass + if k is not None: + try: + cfg.signal_quorum_k = max(1, int(k)) + except ValueError: + pass + return cfg + + +def set_quorum_config(cfg: QuorumConfig) -> None: + """Persist quorum config into pulse_settings.""" + db.setting_set(_QC_TRUST_KEY, str(cfg.trust_min_vouchers)) + db.setting_set(_QC_K_KEY, str(cfg.signal_quorum_k)) + + +def vouch_payload_bytes( + voucher_fp: str, + target_fp: str, + issued_at: datetime, + expires_at: Optional[datetime], +) -> bytes: + """Canonical JSON of the unsigned vouch body — what the voucher signs.""" + body: Dict[str, Any] = { + "voucher_fingerprint": voucher_fp, + "target_fingerprint": target_fp, + "issued_at": issued_at.isoformat(), + "expires_at": expires_at.isoformat() if expires_at else None, + } + return canonical_json(body) + + +def _store_vouch(v: Vouch) -> None: + db.upsert_vouch(dict( + voucher_fingerprint=v.voucher_fingerprint, + target_fingerprint=v.target_fingerprint, + issued_at=v.issued_at.isoformat(), + expires_at=v.expires_at.isoformat() if v.expires_at else None, + signature=v.signature, + )) + + +def _row_to_vouch(row: Dict[str, Any]) -> Vouch: + return Vouch( + voucher_fingerprint=row["voucher_fingerprint"], + target_fingerprint=row["target_fingerprint"], + issued_at=datetime.fromisoformat(row["issued_at"]), + expires_at=datetime.fromisoformat(row["expires_at"]) if row.get("expires_at") else None, + signature=row.get("signature") or "", + ) + + +def issue_vouch(target_fingerprint: str, ttl_days: int = 90) -> Vouch: + """Sign a vouch for `target_fingerprint` with OUR key. Persists + returns it.""" + our_fp = node_fingerprint() + issued_at = datetime.now(timezone.utc) + expires_at = issued_at + timedelta(days=ttl_days) if ttl_days > 0 else None + payload = vouch_payload_bytes(our_fp, target_fingerprint, issued_at, expires_at) + sig = sign_payload(payload) + vouch = Vouch( + voucher_fingerprint=our_fp, + target_fingerprint=target_fingerprint, + issued_at=issued_at, + expires_at=expires_at, + signature=base64.b64encode(sig).decode("ascii"), + ) + _store_vouch(vouch) + try: + translog.append("vouch", { + "voucher_fingerprint": vouch.voucher_fingerprint, + "target_fingerprint": vouch.target_fingerprint, + "issued_at": vouch.issued_at.isoformat(), + "expires_at": vouch.expires_at.isoformat() if vouch.expires_at else None, + }) + except Exception as exc: + _log.warning("federation.translog.append.fail", error=str(exc)) + _log.info("federation.vouch.issued", target=target_fingerprint, ttl_days=ttl_days) + return vouch + + +def accept_vouch(vouch: Vouch, voucher_pubkey_pem: str) -> Result[None, str]: + """Verify signature + expiry + voucher trust status, then persist. + + Failure modes return Err with a short reason so the caller can log them. + A voucher whose status is not "trusted" in our peers table is refused — + we don't accept transitive vouches from unknown peers. + """ + # Expiry first — cheapest check. + now = datetime.now(timezone.utc) + if vouch.expires_at is not None and vouch.expires_at < now: + return Err("vouch expired") + + # Voucher must be a directly-trusted peer (no transitive trust at this layer). + voucher_status = None + for row in db.list_peers(): + if row.get("fingerprint") == vouch.voucher_fingerprint: + voucher_status = row.get("status") + break + if voucher_status != "trusted": + return Err(f"voucher not trusted: {vouch.voucher_fingerprint}") + + # The pubkey must match the declared voucher fingerprint. + try: + if _fingerprint_for_pubkey_pem(voucher_pubkey_pem) != vouch.voucher_fingerprint: + return Err("voucher pubkey does not match fingerprint") + except Exception as exc: + return Err(f"bad voucher pubkey: {exc}") + + payload = vouch_payload_bytes( + vouch.voucher_fingerprint, + vouch.target_fingerprint, + vouch.issued_at, + vouch.expires_at, + ) + try: + signature = base64.b64decode(vouch.signature) + except Exception: + return Err("vouch signature not base64") + if not verify_payload(payload, signature, voucher_pubkey_pem): + return Err("vouch signature invalid") + + _store_vouch(vouch) + try: + translog.append("vouch", { + "voucher_fingerprint": vouch.voucher_fingerprint, + "target_fingerprint": vouch.target_fingerprint, + "issued_at": vouch.issued_at.isoformat(), + "expires_at": vouch.expires_at.isoformat() if vouch.expires_at else None, + "accepted": True, + }) + except Exception as exc: + _log.warning("federation.translog.append.fail", error=str(exc)) + _log.info("federation.vouch.accepted", voucher=vouch.voucher_fingerprint, target=vouch.target_fingerprint) + return Ok(None) + + +def revoke_vouch(target_fingerprint: str) -> None: + """Delete OUR vouch naming `target_fingerprint`. No-op if absent.""" + db.delete_vouch(node_fingerprint(), target_fingerprint) + _log.info("federation.vouch.revoked", target=target_fingerprint) + + +def our_vouches() -> List[Vouch]: + """Vouches we have issued (filter for voucher_fingerprint == our fp).""" + return [_row_to_vouch(r) for r in db.vouches_by_voucher(node_fingerprint())] + + +def vouches_for(target_fingerprint: str) -> List[Vouch]: + """Every vouch stored locally that names `target_fingerprint` as target.""" + return [_row_to_vouch(r) for r in db.vouches_by_target(target_fingerprint)] + + +def is_vouched(target_fingerprint: str, min_vouchers: Optional[int] = None) -> bool: + """True iff ≥`min_vouchers` distinct non-expired vouches from currently-trusted + peers name `target_fingerprint`. + """ + cfg = quorum_config() + threshold = min_vouchers if min_vouchers is not None else cfg.trust_min_vouchers + if threshold <= 0: + return True + now = datetime.now(timezone.utc) + trusted_fps = {p.fingerprint for p in list_peers() if p.status == "trusted"} + distinct_vouchers: set = set() + for v in vouches_for(target_fingerprint): + if v.expires_at is not None and v.expires_at < now: + continue + if v.voucher_fingerprint not in trusted_fps: + continue + distinct_vouchers.add(v.voucher_fingerprint) + if len(distinct_vouchers) >= threshold: + return True + return False + + +def peer_is_listening_eligible(fingerprint: str) -> bool: + """True iff the peer is directly trusted OR vouched into trust. + + This is the gate used by `import_signed_feed`. Auto-response will share + this signature — keep it stable. + """ + if not fingerprint: + return False + for p in list_peers(): + if p.fingerprint == fingerprint: + if p.status == "trusted": + return True + if p.status == "blocked": + return False + break + return is_vouched(fingerprint) + + +def is_quorum_met(signal_hash: str, k: Optional[int] = None) -> bool: + """True iff ≥k distinct vouched peers have reported `signal_hash`. + + "Vouched" here means `peer_is_listening_eligible` — the same web-of-trust + set the import gate respects. Self-reports from the local node do not + count (they never end up in federation_signals). + """ + cfg = quorum_config() + threshold = k if k is not None else cfg.signal_quorum_k + if threshold <= 0: + return True + rows = db.signals_for_hash(signal_hash) + distinct: set = set() + for r in rows: + fp = r.get("peer_fingerprint") or "" + if not fp or fp in distinct: + continue + if not peer_is_listening_eligible(fp): + continue + distinct.add(fp) + if len(distinct) >= threshold: + return True + return False + + +def quorum_evidence(signal_hash: str) -> List[Tuple[str, datetime]]: + """(peer_fingerprint, received_at) tuples for one signal_hash — for UI display. + + Only includes signals from currently listening-eligible peers, deduped + per fingerprint at the earliest receipt. + """ + rows = db.signals_for_hash(signal_hash) + earliest: Dict[str, datetime] = {} + for r in rows: + fp = r.get("peer_fingerprint") or "" + if not fp or not peer_is_listening_eligible(fp): + continue + try: + ts = datetime.fromisoformat(r.get("received_at") or "") + except ValueError: + continue + if fp not in earliest or ts < earliest[fp]: + earliest[fp] = ts + return sorted(earliest.items(), key=lambda kv: kv[1]) From eadd1aea3b0e5e3689215df668018e98260d6da8 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:10:36 +0200 Subject: [PATCH 4/6] stage-vouch-c federation: import gate + translog hook (stage-trans-b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit import_signed_feed now refuses any feed whose declared fingerprint isn't peer_is_listening_eligible (directly trusted OR vouched in), returning Err("peer not trusted: …") before any signal lands. For every case/IOC it does record, it also appends a "signal" entry to the transparency log (best-effort — logger warns but doesn't abort ingest if the append fails). This is the stage-trans-b hook: the import path is the chokepoint, so attaching the chain there gives us coverage of every peer-originated signal we've ever accepted. build_signed_feed now includes our_vouches() in the feed body so vouches propagate. On import we accept_vouch each one — but only if the embedded voucher_fingerprint matches the peer we just authenticated, so a peer can't forge vouches "from" someone else through us. test_federation: the long-standing round-trip test now first registers the synthetic peer as trusted so the gate lets it through. Co-Authored-By: Claude Opus 4.7 --- src/psyc/lines/federation.py | 48 ++++++++++++++++++++++++++++++++++++ tests/test_federation.py | 2 ++ 2 files changed, 50 insertions(+) diff --git a/src/psyc/lines/federation.py b/src/psyc/lines/federation.py index 5439068..059aa9d 100644 --- a/src/psyc/lines/federation.py +++ b/src/psyc/lines/federation.py @@ -260,6 +260,9 @@ def build_signed_feed(window_hours: int = 24) -> Dict[str, Any]: "window_hours": window_hours, "cases": _build_case_records(window_hours), "iocs": _build_ioc_records(window_hours), + # Vouches we've issued ride along with the feed so peers can learn + # who we trust and accumulate quorum on shared targets. + "vouches": [v.model_dump() for v in our_vouches()], } sig = sign_payload(canonical_json(payload)) payload["signature"] = base64.b64encode(sig).decode("ascii") @@ -307,10 +310,16 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result except Exception as exc: return Err(f"bad pubkey: {exc}") + # Listening gate: only accept signals from peers we explicitly trust or + # that quorum of trusted peers vouches for. Unknown peers don't land here. + if not peer_is_listening_eligible(peer_fp): + return Err(f"peer not trusted: {peer_fp}") + now = datetime.now(timezone.utc).isoformat() signal_ids: List[Tuple[str, str]] = [] cases = feed.get("cases") or [] iocs = feed.get("iocs") or [] + feed_vouches = feed.get("vouches") or [] for c in cases: case_id = c.get("case_id") or "" @@ -324,6 +333,15 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result raw_json=json.dumps(c, sort_keys=True), )) signal_ids.append(("case", digest)) + try: + translog.append("signal", { + "peer_fingerprint": peer_fp, + "signal_type": "case", + "signal_id": case_id, + "signal_hash": digest, + }) + except Exception as exc: # transparency log is best-effort, never block ingest + _log.warning("federation.translog.append.fail", error=str(exc)) for i in iocs: value = i.get("value") or "" @@ -337,6 +355,36 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result raw_json=json.dumps(i, sort_keys=True), )) signal_ids.append(("ioc", digest)) + try: + translog.append("signal", { + "peer_fingerprint": peer_fp, + "signal_type": "ioc", + "signal_id": value, + "signal_hash": digest, + }) + except Exception as exc: + _log.warning("federation.translog.append.fail", error=str(exc)) + + # Vouch propagation — peer asserts who they trust. We only accept vouches + # whose declared voucher fingerprint matches the peer we just authenticated + # (so a peer can't forge vouches "from" someone else through us). + for v_raw in feed_vouches: + if not isinstance(v_raw, dict): + continue + try: + vouch = Vouch.model_validate(v_raw) + except Exception as exc: + _log.warning("federation.vouch.malformed", error=str(exc)) + continue + if vouch.voucher_fingerprint != peer_fp: + _log.warning( + "federation.vouch.voucher_mismatch", + claimed=vouch.voucher_fingerprint, actual=peer_fp, + ) + continue + accepted = accept_vouch(vouch, expected_pubkey_pem) + if isinstance(accepted, Err): + _log.warning("federation.vouch.rejected", reason=accepted.reason) _log.info("federation.import.ok", peer=peer_fp, cases=len(cases), iocs=len(iocs)) return Ok(ImportSummary( diff --git a/tests/test_federation.py b/tests/test_federation.py index 3a0c599..b04fddb 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -159,6 +159,8 @@ def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir): new_sig = peer_priv.sign(canonical_json(unsigned)) feed["signature"] = base64.b64encode(new_sig).decode("ascii") + # Stage 4 listening gate: peer must be trusted to land signals. + federation.register_peer("peer.example", peer_fp, peer_pub_pem, status="trusted") result = import_signed_feed(feed, peer_pub_pem) assert isinstance(result, Ok), getattr(result, "reason", "") summary = result.value From 0e56fa70af34513d6826b1a6caf73d88b01458a8 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:11:03 +0200 Subject: [PATCH 5/6] stage-vouch-d federation: cockpit pages + CLI + public endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Public JSON endpoints (no auth): - GET /federation/vouches — our_vouches() so peers can pull our trust - GET /federation/log — last 100 transparency-log entries - GET /federation/log/verify — re-walks the chain, returns {verified, head_hash} or 409 with {error, head_hash} Admin pages (TOTP-gated): - /admin/federation/vouches — issued list, issue form, revoke buttons, per-peer quorum-met table - /admin/federation/log — chain verification status + last 200 entries - /admin/federation/quorum — config form + per-peer eligibility + per-signal-hash distinct-eligible-peer counts CLI: fed-vouch / fed-unvouch / fed-vouches / fed-quorum-set / fed-log / fed-log-verify, plus existing fed-* commands untouched. The base /admin/federation page now links to the three new sub-pages. Co-Authored-By: Claude Opus 4.7 --- src/psyc/_federation_cli.py | 79 +++++++- src/psyc/cockpit/federation_routes.py | 182 +++++++++++++++++- .../cockpit/templates/admin_federation.html | 2 +- .../templates/admin_federation_log.html | 63 ++++++ .../templates/admin_federation_quorum.html | 92 +++++++++ .../templates/admin_federation_vouches.html | 110 +++++++++++ 6 files changed, 525 insertions(+), 3 deletions(-) create mode 100644 src/psyc/cockpit/templates/admin_federation_log.html create mode 100644 src/psyc/cockpit/templates/admin_federation_quorum.html create mode 100644 src/psyc/cockpit/templates/admin_federation_vouches.html diff --git a/src/psyc/_federation_cli.py b/src/psyc/_federation_cli.py index e01bbe6..8a660ab 100644 --- a/src/psyc/_federation_cli.py +++ b/src/psyc/_federation_cli.py @@ -13,7 +13,7 @@ import httpx import typer from psyc import db, log -from psyc.lines import federation +from psyc.lines import federation, translog from psyc.result import Err @@ -133,3 +133,80 @@ def register(typer_app: typer.Typer) -> None: db.init_db() federation.remove_peer(domain) typer.echo(f"removed {domain}") + + # ---------- vouching + quorum -------------------------------------- + + @typer_app.command("fed-vouch") + def fed_vouch( + target_fp: str = typer.Argument(..., help="target peer fingerprint (32 hex)"), + ttl_days: int = typer.Option(90, "--ttl-days", help="vouch lifetime in days"), + ) -> None: + """Issue a signed vouch for `target_fp`. Persists locally + rides our feed.""" + db.init_db() + v = federation.issue_vouch(target_fp.strip(), ttl_days=ttl_days) + typer.echo(f"vouched: {v.target_fingerprint} (expires {v.expires_at})") + + @typer_app.command("fed-unvouch") + def fed_unvouch(target_fp: str = typer.Argument(...)) -> None: + """Revoke OUR vouch for `target_fp`.""" + db.init_db() + federation.revoke_vouch(target_fp.strip()) + typer.echo(f"revoked vouch for {target_fp}") + + @typer_app.command("fed-vouches") + def fed_vouches() -> None: + """List vouches WE have issued.""" + db.init_db() + rows = federation.our_vouches() + if not rows: + typer.echo("(no vouches issued)") + return + for v in rows: + exp = v.expires_at.isoformat() if v.expires_at else "—" + typer.echo(f" {v.target_fingerprint} issued={v.issued_at.isoformat()[:16]} expires={exp[:16]}") + + @typer_app.command("fed-quorum-set") + def fed_quorum_set( + trust: Optional[int] = typer.Option(None, "--trust", help="trust_min_vouchers"), + k: Optional[int] = typer.Option(None, "--k", help="signal_quorum_k"), + ) -> None: + """Update quorum thresholds. Either flag is optional — only changed values overwrite.""" + db.init_db() + cfg = federation.quorum_config() + if trust is not None: + cfg.trust_min_vouchers = max(1, int(trust)) + if k is not None: + cfg.signal_quorum_k = max(1, int(k)) + federation.set_quorum_config(cfg) + typer.echo(f"quorum: trust_min_vouchers={cfg.trust_min_vouchers} signal_quorum_k={cfg.signal_quorum_k}") + + # ---------- transparency log -------------------------------------- + + @typer_app.command("fed-log") + def fed_log( + limit: int = typer.Option(20, "--limit", help="number of entries to show"), + ) -> None: + """Print recent transparency-log entries (newest first).""" + db.init_db() + rows = translog.recent(limit=limit) + if not rows: + typer.echo("(transparency log empty)") + return + for e in rows: + typer.echo( + f" id={e.id:5d} {e.entry_type:6s} {e.timestamp[:19]} hash={e.entry_hash[:16]}…" + ) + + @typer_app.command("fed-log-verify") + def fed_log_verify() -> None: + """Re-walk the chain locally and report verification status.""" + db.init_db() + result = translog.verify_chain() + head = translog.head() + head_hash = head.entry_hash if head else "(empty)" + if isinstance(result, Err): + typer.echo(f" ✗ broken: {result.reason}", err=True) + typer.echo(f" head_hash: {head_hash}") + raise typer.Exit(1) + typer.echo(f" ✓ verified {result.value} entries") + typer.echo(f" head_hash: {head_hash}") diff --git a/src/psyc/cockpit/federation_routes.py b/src/psyc/cockpit/federation_routes.py index d402f38..6e4fabf 100644 --- a/src/psyc/cockpit/federation_routes.py +++ b/src/psyc/cockpit/federation_routes.py @@ -15,7 +15,8 @@ from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, Red from fastapi.templating import Jinja2Templates from psyc import db, log -from psyc.lines import federation +from psyc.lines import federation, translog +from psyc.result import Err _log = log.get(__name__) @@ -122,4 +123,183 @@ def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None: def federation_feed() -> JSONResponse: return JSONResponse(_cached_feed()) + # ---------- public vouches + transparency log -------------------- + + @app.get("/federation/vouches") + def federation_vouches() -> JSONResponse: + """Vouches WE have issued. Peers fetch this to learn who we trust.""" + return JSONResponse({ + "fingerprint": federation.node_fingerprint(), + "vouches": [v.model_dump(mode="json") for v in federation.our_vouches()], + }) + + @app.get("/federation/log") + def federation_log() -> JSONResponse: + """Last 100 transparency-log entries, newest first.""" + entries = translog.recent(limit=100) + return JSONResponse({ + "count": len(entries), + "entries": [e.model_dump(mode="json") for e in entries], + }) + + @app.get("/federation/log/verify") + def federation_log_verify() -> JSONResponse: + """Re-walk the chain locally and report status. Auditors poll this.""" + result = translog.verify_chain() + head = translog.head() + head_hash = head.entry_hash if head else None + if isinstance(result, Err): + return JSONResponse({"error": result.reason, "head_hash": head_hash}, status_code=409) + return JSONResponse({"verified": result.value, "head_hash": head_hash}) + + # ---------- admin: vouches page --------------------------------- + + @app.get("/admin/federation/vouches", response_class=HTMLResponse) + def admin_federation_vouches(request: Request) -> HTMLResponse: + if not _admin_ok(request): + return RedirectResponse("/admin", status_code=303) + peers = federation.list_peers() + cfg = federation.quorum_config() + ours = federation.our_vouches() + # Per-peer view: vouches naming each peer and whether quorum is met. + peer_rows = [] + for p in peers: + vouches = federation.vouches_for(p.fingerprint) + peer_rows.append({ + "peer": p, + "vouches": vouches, + "vouched": federation.is_vouched(p.fingerprint), + "eligible": federation.peer_is_listening_eligible(p.fingerprint), + }) + return TEMPLATES.TemplateResponse( + request, + "admin_federation_vouches.html", + { + "fingerprint": federation.node_fingerprint(), + "our_vouches": ours, + "peer_rows": peer_rows, + "cfg": cfg, + }, + ) + + @app.post("/admin/federation/vouches/issue") + def admin_federation_vouch_issue( + request: Request, + target_fingerprint: str = Form(...), + ttl_days: int = Form(90), + ) -> RedirectResponse: + if not _admin_ok(request): + raise HTTPException(status_code=403, detail="admin session required") + try: + federation.issue_vouch(target_fingerprint.strip(), ttl_days=ttl_days) + except Exception as exc: + _log.warning("federation.vouch.issue.error", error=str(exc)) + return RedirectResponse("/admin/federation/vouches", status_code=303) + + @app.post("/admin/federation/vouches/revoke") + def admin_federation_vouch_revoke( + request: Request, + target_fingerprint: str = Form(...), + ) -> RedirectResponse: + if not _admin_ok(request): + raise HTTPException(status_code=403, detail="admin session required") + federation.revoke_vouch(target_fingerprint.strip()) + return RedirectResponse("/admin/federation/vouches", status_code=303) + + # ---------- admin: transparency log page ------------------------ + + @app.get("/admin/federation/log", response_class=HTMLResponse) + def admin_federation_log(request: Request) -> HTMLResponse: + if not _admin_ok(request): + return RedirectResponse("/admin", status_code=303) + result = translog.verify_chain() + head = translog.head() + entries = translog.recent(limit=200) + verify_status: Dict[str, Any] + if isinstance(result, Err): + verify_status = {"ok": False, "reason": result.reason} + else: + verify_status = {"ok": True, "verified": result.value} + return TEMPLATES.TemplateResponse( + request, + "admin_federation_log.html", + { + "verify_status": verify_status, + "head_hash": head.entry_hash if head else "", + "head_id": head.id if head else 0, + "entries": entries, + }, + ) + + # ---------- admin: quorum config + per-peer/per-hash view ------- + + @app.get("/admin/federation/quorum", response_class=HTMLResponse) + def admin_federation_quorum(request: Request) -> HTMLResponse: + if not _admin_ok(request): + return RedirectResponse("/admin", status_code=303) + cfg = federation.quorum_config() + peers = federation.list_peers() + peer_rows = [ + { + "peer": p, + "vouched": federation.is_vouched(p.fingerprint), + "eligible": federation.peer_is_listening_eligible(p.fingerprint), + } + for p in peers + ] + # Group buffered signals by signal_hash and count distinct eligible peers. + signal_rows: Dict[str, Dict[str, Any]] = {} + for s in db.recent_signals(limit=500): + h = s.get("signal_hash") or "" + entry = signal_rows.setdefault(h, { + "signal_hash": h, + "signal_type": s.get("signal_type") or "", + "signal_id": s.get("signal_id") or "", + "peers": set(), + "latest": s.get("received_at") or "", + }) + entry["peers"].add(s.get("peer_fingerprint") or "") + hash_summary = [] + for h, row in signal_rows.items(): + distinct_eligible = sum( + 1 for fp in row["peers"] if federation.peer_is_listening_eligible(fp) + ) + hash_summary.append({ + "signal_hash": h, + "signal_type": row["signal_type"], + "signal_id": row["signal_id"], + "distinct_peers": len(row["peers"]), + "distinct_eligible": distinct_eligible, + "quorum_met": distinct_eligible >= cfg.signal_quorum_k, + "latest": row["latest"], + }) + hash_summary.sort(key=lambda r: r["latest"], reverse=True) + return TEMPLATES.TemplateResponse( + request, + "admin_federation_quorum.html", + { + "cfg": cfg, + "peer_rows": peer_rows, + "hash_summary": hash_summary, + }, + ) + + @app.post("/admin/federation/quorum/save") + def admin_federation_quorum_save( + request: Request, + trust_min_vouchers: int = Form(...), + signal_quorum_k: int = Form(...), + ) -> RedirectResponse: + if not _admin_ok(request): + raise HTTPException(status_code=403, detail="admin session required") + try: + cfg = federation.QuorumConfig( + trust_min_vouchers=max(1, int(trust_min_vouchers)), + signal_quorum_k=max(1, int(signal_quorum_k)), + ) + federation.set_quorum_config(cfg) + except Exception as exc: + _log.warning("federation.quorum.save.error", error=str(exc)) + return RedirectResponse("/admin/federation/quorum", status_code=303) + _log.info("federation.routes.registered") diff --git a/src/psyc/cockpit/templates/admin_federation.html b/src/psyc/cockpit/templates/admin_federation.html index b9493bd..8d7bfaa 100644 --- a/src/psyc/cockpit/templates/admin_federation.html +++ b/src/psyc/cockpit/templates/admin_federation.html @@ -8,7 +8,7 @@ {{ peers|length }} peer{{ '' if peers|length == 1 else 's' }}

This node's Ed25519 identity. The fingerprint goes into a DNS TXT record so other psyc nodes can discover this one. The public key lets them verify any feed we publish — the private key never leaves this box.

-

← back to admin

+

← back to admin  ·  vouches  ·  quorum config  ·  transparency log

node fingerprint
diff --git a/src/psyc/cockpit/templates/admin_federation_log.html b/src/psyc/cockpit/templates/admin_federation_log.html new file mode 100644 index 0000000..341b24e --- /dev/null +++ b/src/psyc/cockpit/templates/admin_federation_log.html @@ -0,0 +1,63 @@ +{% extends "base.html" %} +{% block title %}Transparency Log — psyc admin{% endblock %} +{% block content %} + +
+
+

Transparency Log

+ id @ {{ head_id }} +
+

Every signal we accept from a peer is appended to a signed merkle chain. Each entry references the previous entry's hash, so tampering with any historical row invalidates every entry after. Auditors can re-walk and detect a bad peer historically — even one we trusted at the time.

+

← back to federation  ·  vouches  ·  quorum config  ·  public verify endpoint

+ +
+
chain verification
+ {% if verify_status.ok %} +
verified   {{ verify_status.verified }} entries walked, no breaks
+ {% else %} +
BROKEN   {{ verify_status.reason }}
+ {% endif %} + {% if head_hash %} +
head hash
+
{{ head_hash }}
+ {% endif %} +
+
+ +
+
+

Recent Entries

+ {{ entries|length }} of last 200 +
+

Newest first. Hashes are truncated for display — full values are at /federation/log.

+ + {% if entries %} + + + + {% for e in entries %} + + + + + + + + + {% endfor %} + +
idWhenTypePeer / targetSignal idHash
{{ e.id }}{{ (e.timestamp or '')[:19] | replace('T', ' ') }}{{ e.entry_type }} + {% if e.entry_type == 'signal' %} + {{ (e.entry_data.peer_fingerprint or '')[:8] }}… + {% elif e.entry_type == 'vouch' %} + {{ (e.entry_data.voucher_fingerprint or '')[:8] }}…→{{ (e.entry_data.target_fingerprint or '')[:8] }}… + {% else %} + — + {% endif %} + {{ ((e.entry_data.signal_id or e.entry_data.target_fingerprint or '') | string)[:32] }}{{ e.entry_hash[:16] }}…
+ {% else %} +

(chain empty — no signals appended yet)

+ {% endif %} +
+ +{% endblock %} diff --git a/src/psyc/cockpit/templates/admin_federation_quorum.html b/src/psyc/cockpit/templates/admin_federation_quorum.html new file mode 100644 index 0000000..25e91a4 --- /dev/null +++ b/src/psyc/cockpit/templates/admin_federation_quorum.html @@ -0,0 +1,92 @@ +{% extends "base.html" %} +{% block title %}Quorum Config — psyc admin{% endblock %} +{% block content %} + +
+
+

Quorum Configuration

+ trust={{ cfg.trust_min_vouchers }} k={{ cfg.signal_quorum_k }} +
+

trust_min_vouchers — distinct trusted vouchers required to make a new peer listening-eligible. signal_quorum_k — distinct listening-eligible peers required to consider a signal_hash quorum-met. Both gates live in pulse_settings; raising them tightens trust, lowering them relaxes it.

+

← back to federation  ·  vouches  ·  transparency log

+ +
+ + + + + +
+
+ +
+
+

Per-Peer Listening Eligibility

+ {{ peer_rows|length }} +
+

A peer's feed gets ingested only when its fingerprint is eligible (directly trusted or vouched into trust).

+ + {% if peer_rows %} + + + + {% for row in peer_rows %} + + + + + + + + {% endfor %} + +
DomainFingerprintStatusVouchedEligible
{{ row.peer.domain }}{{ row.peer.fingerprint[:8] }}…{{ row.peer.fingerprint[-8:] }} + {% if row.peer.status == 'trusted' %} + trusted + {% elif row.peer.status == 'blocked' %} + blocked + {% else %} + {{ row.peer.status }} + {% endif %} + {% if row.vouched %}yes{% else %}no{% endif %}{% if row.eligible %}listening{% else %}muted{% endif %}
+ {% else %} +

(no peers registered yet)

+ {% endif %} +
+ +
+
+

Signal Hashes in Buffer

+ {{ hash_summary|length }} hashes +
+

Distinct eligible-peer counts per signal hash. Quorum is met when count ≥ {{ cfg.signal_quorum_k }}.

+ + {% if hash_summary %} + + + + {% for r in hash_summary %} + + + + + + + + + + {% endfor %} + +
LatestTypeSignal idHashDistinct peersEligibleQuorum
{{ (r.latest or '')[:19] | replace('T', ' ') }}{{ r.signal_type }}{{ r.signal_id[:48] }}{{ r.signal_hash[:16] }}…{{ r.distinct_peers }}{{ r.distinct_eligible }} + {% if r.quorum_met %} + met + {% else %} + below + {% endif %} +
+ {% else %} +

(no signals in buffer yet)

+ {% endif %} +
+ +{% endblock %} diff --git a/src/psyc/cockpit/templates/admin_federation_vouches.html b/src/psyc/cockpit/templates/admin_federation_vouches.html new file mode 100644 index 0000000..98f337e --- /dev/null +++ b/src/psyc/cockpit/templates/admin_federation_vouches.html @@ -0,0 +1,110 @@ +{% extends "base.html" %} +{% block title %}Federation Vouches — psyc admin{% endblock %} +{% block content %} + +
+
+

Web of Trust

+ {{ our_vouches|length }} issued +
+

A vouch is an Ed25519-signed assertion that we trust another node's fingerprint. Peers gossip our vouches with their feeds, so trust accumulates: once {{ cfg.trust_min_vouchers }} of our trusted peers vouches for a new fingerprint, it becomes listening-eligible — its signed feeds get ingested.

+

← back to federation  ·  quorum config  ·  transparency log

+ +
+
our fingerprint
+
{{ fingerprint }}
+
+
+ +
+
+

Vouches We've Issued

+ {{ our_vouches|length }} +
+

We've signed these — peers that fetch our feed will see them and may extend trust accordingly.

+ + {% if our_vouches %} + + + + {% for v in our_vouches %} + + + + + + + {% endfor %} + +
Target fingerprintIssuedExpires
{{ v.target_fingerprint }}{{ v.issued_at.isoformat()[:16] | replace('T', ' ') }}{{ v.expires_at.isoformat()[:16] | replace('T', ' ') if v.expires_at else '—' }} +
+ + +
+
+ {% else %} +

(no vouches issued yet)

+ {% endif %} +
+ +
+
+

Issue a Vouch

+
+

Vouch for a peer's fingerprint. Trusted peers see this and may treat the target as listening-eligible.

+
+ + + +
+
+ +
+
+

Per-Peer Quorum Status

+ {{ peer_rows|length }} peers +
+

Threshold: {{ cfg.trust_min_vouchers }} distinct trusted vouchers required to make a non-trusted peer listening-eligible.

+ + {% if peer_rows %} + + + + {% for row in peer_rows %} + + + + + + + + {% endfor %} + +
PeerStatusVouchesQuorum metEligible
{{ row.peer.domain }}
{{ row.peer.fingerprint[:8] }}…{{ row.peer.fingerprint[-8:] }}
+ {% if row.peer.status == 'trusted' %} + trusted + {% elif row.peer.status == 'blocked' %} + blocked + {% else %} + {{ row.peer.status }} + {% endif %} + {{ row.vouches|length }} + {% if row.vouched %} + yes + {% else %} + no + {% endif %} + + {% if row.eligible %} + listening + {% else %} + muted + {% endif %} +
+ {% else %} +

(no peers registered yet)

+ {% endif %} +
+ +{% endblock %} From f4148d86a6c274ce89446f616bbb5259e4a5517a Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:11:18 +0200 Subject: [PATCH 6/6] stage-vouch-e federation: tests for vouching + quorum gate test_vouching covers the contract auto-response and other agents will gate on: - issue_vouch round-trip (sign + verify under our own pubkey) - accept_vouch rejects expired vouches - accept_vouch rejects mismatched signatures - accept_vouch rejects vouchers whose peers.status != "trusted" - accept_vouch happy path - is_vouched needs DISTINCT vouchers (two upserts from one peer == 1) - is_vouched clears threshold with two distinct trusted vouchers - is_quorum_met counts only listening-eligible peers (untrusted + duplicate rows don't count) - quorum_config defaults + pulse_settings persistence - import_signed_feed rejects unknown peer ("not trusted") - import_signed_feed accepts directly-trusted peer - import_signed_feed accepts a peer made eligible via two vouches - import_signed_feed stores vouches embedded in a trusted peer's feed Co-Authored-By: Claude Opus 4.7 --- tests/test_vouching.py | 336 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 336 insertions(+) create mode 100644 tests/test_vouching.py diff --git a/tests/test_vouching.py b/tests/test_vouching.py new file mode 100644 index 0000000..d9b312e --- /dev/null +++ b/tests/test_vouching.py @@ -0,0 +1,336 @@ +"""Vouching + quorum — sign/verify, threshold logic, import gate.""" + +from __future__ import annotations + +import base64 +import hashlib +from datetime import datetime, timedelta, timezone + +import pytest +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ed25519 +from sqlalchemy import create_engine + +from psyc import db +from psyc.lines import federation +from psyc.lines.federation import ( + QuorumConfig, + Vouch, + accept_vouch, + build_signed_feed, + canonical_json, + import_signed_feed, + is_quorum_met, + is_vouched, + issue_vouch, + node_fingerprint, + our_vouches, + peer_is_listening_eligible, + public_key_pem, + quorum_config, + register_peer, + revoke_vouch, + set_quorum_config, + vouch_payload_bytes, +) +from psyc.result import Err, Ok + + +@pytest.fixture +def fresh_db(tmp_path, monkeypatch): + test_db = tmp_path / "test.db" + eng = create_engine(f"sqlite:///{test_db}", future=True) + db._metadata.create_all(eng, checkfirst=True) + monkeypatch.setattr(db, "_engine", eng) + monkeypatch.setattr(db, "DB_PATH", test_db) + yield test_db + + +@pytest.fixture +def fed_dir(tmp_path, monkeypatch): + d = tmp_path / "federation" + monkeypatch.setattr(federation, "FED_DIR", d) + monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key") + monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub") + yield d + + +def _make_peer(): + """Generate an Ed25519 keypair + matching fingerprint for a fake peer.""" + priv = ed25519.Ed25519PrivateKey.generate() + pub = priv.public_key() + pub_pem = pub.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("ascii") + raw = pub.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + fp = hashlib.sha256(raw).digest()[:16].hex() + return priv, pub_pem, fp + + +def _sign_vouch(priv, voucher_fp, target_fp, issued_at, expires_at): + payload = vouch_payload_bytes(voucher_fp, target_fp, issued_at, expires_at) + sig = priv.sign(payload) + return base64.b64encode(sig).decode("ascii") + + +# ---------- self-issued vouch round-trip -------------------------------- + +def test_issue_vouch_roundtrip(fresh_db, fed_dir): + target = "ab" * 16 + v = issue_vouch(target, ttl_days=30) + assert v.voucher_fingerprint == node_fingerprint() + assert v.target_fingerprint == target + assert v.expires_at is not None + # round-trip from storage + listed = our_vouches() + assert len(listed) == 1 + assert listed[0].target_fingerprint == target + assert listed[0].signature == v.signature + # signature verifies under our own pubkey + payload = vouch_payload_bytes( + v.voucher_fingerprint, v.target_fingerprint, v.issued_at, v.expires_at + ) + sig = base64.b64decode(v.signature) + assert federation.verify_payload(payload, sig, public_key_pem()) + + +def test_revoke_vouch_removes_only_our_entry(fresh_db, fed_dir): + target = "cd" * 16 + issue_vouch(target, ttl_days=30) + assert len(our_vouches()) == 1 + revoke_vouch(target) + assert our_vouches() == [] + + +# ---------- accept_vouch validation ------------------------------------- + +def test_accept_vouch_rejects_expired(fresh_db, fed_dir): + priv, pem, fp = _make_peer() + register_peer("voucher.example", fp, pem, status="trusted") + issued = datetime.now(timezone.utc) - timedelta(days=10) + expired = datetime.now(timezone.utc) - timedelta(days=1) + sig = _sign_vouch(priv, fp, "target", issued, expired) + v = Vouch(voucher_fingerprint=fp, target_fingerprint="target", + issued_at=issued, expires_at=expired, signature=sig) + result = accept_vouch(v, pem) + assert isinstance(result, Err) + assert "expired" in result.reason + + +def test_accept_vouch_rejects_bad_signature(fresh_db, fed_dir): + priv, pem, fp = _make_peer() + register_peer("voucher.example", fp, pem, status="trusted") + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + # Sign a different target then claim it's for "real-target". + real_sig = _sign_vouch(priv, fp, "other-target", issued, expires) + v = Vouch(voucher_fingerprint=fp, target_fingerprint="real-target", + issued_at=issued, expires_at=expires, signature=real_sig) + result = accept_vouch(v, pem) + assert isinstance(result, Err) + assert "signature" in result.reason + + +def test_accept_vouch_rejects_voucher_not_trusted(fresh_db, fed_dir): + priv, pem, fp = _make_peer() + # Voucher exists but is "unknown" not "trusted". + register_peer("voucher.example", fp, pem, status="unknown") + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + sig = _sign_vouch(priv, fp, "target", issued, expires) + v = Vouch(voucher_fingerprint=fp, target_fingerprint="target", + issued_at=issued, expires_at=expires, signature=sig) + result = accept_vouch(v, pem) + assert isinstance(result, Err) + assert "not trusted" in result.reason + + +def test_accept_vouch_ok_for_trusted_voucher(fresh_db, fed_dir): + priv, pem, fp = _make_peer() + register_peer("voucher.example", fp, pem, status="trusted") + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + sig = _sign_vouch(priv, fp, "target", issued, expires) + v = Vouch(voucher_fingerprint=fp, target_fingerprint="target", + issued_at=issued, expires_at=expires, signature=sig) + result = accept_vouch(v, pem) + assert isinstance(result, Ok) + + +# ---------- is_vouched threshold ---------------------------------------- + +def test_is_vouched_needs_distinct_vouchers(fresh_db, fed_dir): + """Two vouches from the same peer must NOT clear a threshold of 2.""" + priv, pem, fp = _make_peer() + register_peer("voucher.example", fp, pem, status="trusted") + + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + sig = _sign_vouch(priv, fp, "target", issued, expires) + v1 = Vouch(voucher_fingerprint=fp, target_fingerprint="target", + issued_at=issued, expires_at=expires, signature=sig) + assert isinstance(accept_vouch(v1, pem), Ok) + + # Newer vouch from the SAME voucher — upsert replaces, count stays 1. + issued2 = issued + timedelta(seconds=1) + sig2 = _sign_vouch(priv, fp, "target", issued2, expires) + v2 = Vouch(voucher_fingerprint=fp, target_fingerprint="target", + issued_at=issued2, expires_at=expires, signature=sig2) + assert isinstance(accept_vouch(v2, pem), Ok) + + assert is_vouched("target", min_vouchers=2) is False + # Threshold of 1 should pass. + assert is_vouched("target", min_vouchers=1) is True + + +def test_is_vouched_two_distinct_clear_threshold(fresh_db, fed_dir): + priv_a, pem_a, fp_a = _make_peer() + priv_b, pem_b, fp_b = _make_peer() + register_peer("a.example", fp_a, pem_a, status="trusted") + register_peer("b.example", fp_b, pem_b, status="trusted") + + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + va = Vouch(voucher_fingerprint=fp_a, target_fingerprint="target", + issued_at=issued, expires_at=expires, + signature=_sign_vouch(priv_a, fp_a, "target", issued, expires)) + vb = Vouch(voucher_fingerprint=fp_b, target_fingerprint="target", + issued_at=issued, expires_at=expires, + signature=_sign_vouch(priv_b, fp_b, "target", issued, expires)) + assert isinstance(accept_vouch(va, pem_a), Ok) + assert isinstance(accept_vouch(vb, pem_b), Ok) + + assert is_vouched("target", min_vouchers=2) is True + assert is_vouched("target", min_vouchers=3) is False + + +# ---------- quorum on signal_hash --------------------------------------- + +def test_is_quorum_met_counts_distinct_vouched_peers_only(fresh_db, fed_dir): + # Two trusted peers + one untrusted peer report the same signal_hash. + _, pem_a, fp_a = _make_peer() + _, pem_b, fp_b = _make_peer() + _, pem_c, fp_c = _make_peer() + register_peer("a.example", fp_a, pem_a, status="trusted") + register_peer("b.example", fp_b, pem_b, status="trusted") + register_peer("c.example", fp_c, pem_c, status="unknown") # not eligible + + for fp in (fp_a, fp_b, fp_c, fp_a): # fp_a duplicated → still 1 distinct + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id="1.2.3.4", + signal_hash="h-aaa", + received_at=datetime.now(timezone.utc).isoformat(), + raw_json="{}", + )) + + assert is_quorum_met("h-aaa", k=2) is True + assert is_quorum_met("h-aaa", k=3) is False # only 2 eligible distincts + + +# ---------- quorum config persistence ----------------------------------- + +def test_quorum_config_defaults_and_persistence(fresh_db, fed_dir): + cfg = quorum_config() + assert cfg.trust_min_vouchers == 2 + assert cfg.signal_quorum_k == 2 + set_quorum_config(QuorumConfig(trust_min_vouchers=3, signal_quorum_k=4)) + cfg2 = quorum_config() + assert cfg2.trust_min_vouchers == 3 + assert cfg2.signal_quorum_k == 4 + + +# ---------- import gate enforces listening eligibility ------------------ + +def _signed_feed_from_peer(peer_priv, peer_fp, vouches=None): + """Build a feed claiming origin=peer_fp, signed with peer_priv.""" + payload = { + "version": federation.FEED_VERSION, + "fingerprint": peer_fp, + "generated_at": datetime.now(timezone.utc).isoformat(), + "window_hours": 24, + "cases": [], + "iocs": [{ + "value": "9.9.9.9", + "type": "ip", + "severity": "high", + "first_seen": datetime.now(timezone.utc).isoformat(), + "digest_sha256": "abc123", + }], + "vouches": vouches or [], + } + sig = peer_priv.sign(canonical_json(payload)) + payload["signature"] = base64.b64encode(sig).decode("ascii") + return payload + + +def test_import_feed_rejects_unknown_peer(fresh_db, fed_dir): + peer_priv, peer_pem, peer_fp = _make_peer() + feed = _signed_feed_from_peer(peer_priv, peer_fp) + result = import_signed_feed(feed, peer_pem) + assert isinstance(result, Err) + assert "not trusted" in result.reason + + +def test_import_feed_accepts_directly_trusted_peer(fresh_db, fed_dir): + peer_priv, peer_pem, peer_fp = _make_peer() + register_peer("peer.example", peer_fp, peer_pem, status="trusted") + feed = _signed_feed_from_peer(peer_priv, peer_fp) + result = import_signed_feed(feed, peer_pem) + assert isinstance(result, Ok), getattr(result, "reason", "") + + +def test_import_feed_accepts_vouched_peer(fresh_db, fed_dir): + # Two trusted peers vouch for a third — third becomes listening-eligible. + priv_a, pem_a, fp_a = _make_peer() + priv_b, pem_b, fp_b = _make_peer() + priv_c, pem_c, fp_c = _make_peer() + register_peer("a.example", fp_a, pem_a, status="trusted") + register_peer("b.example", fp_b, pem_b, status="trusted") + + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + va = Vouch(voucher_fingerprint=fp_a, target_fingerprint=fp_c, + issued_at=issued, expires_at=expires, + signature=_sign_vouch(priv_a, fp_a, fp_c, issued, expires)) + vb = Vouch(voucher_fingerprint=fp_b, target_fingerprint=fp_c, + issued_at=issued, expires_at=expires, + signature=_sign_vouch(priv_b, fp_b, fp_c, issued, expires)) + assert isinstance(accept_vouch(va, pem_a), Ok) + assert isinstance(accept_vouch(vb, pem_b), Ok) + assert peer_is_listening_eligible(fp_c) is True + + feed = _signed_feed_from_peer(priv_c, fp_c) + result = import_signed_feed(feed, pem_c) + assert isinstance(result, Ok), getattr(result, "reason", "") + + +def test_import_feed_propagates_vouches_in_payload(fresh_db, fed_dir): + """A trusted peer's feed carries a vouch the peer issued — we should + accept_vouch it and store it locally.""" + peer_priv, peer_pem, peer_fp = _make_peer() + register_peer("peer.example", peer_fp, peer_pem, status="trusted") + + target_fp = "ff" * 16 + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + peer_vouch = Vouch( + voucher_fingerprint=peer_fp, + target_fingerprint=target_fp, + issued_at=issued, + expires_at=expires, + signature=_sign_vouch(peer_priv, peer_fp, target_fp, issued, expires), + ) + feed = _signed_feed_from_peer(peer_priv, peer_fp, vouches=[peer_vouch.model_dump(mode="json")]) + + result = import_signed_feed(feed, peer_pem) + assert isinstance(result, Ok), getattr(result, "reason", "") + + # The vouch is now in our local store under the peer's fingerprint. + stored = federation.vouches_for(target_fp) + assert any(v.voucher_fingerprint == peer_fp for v in stored)