diff --git a/src/psyc/_federation_cli.py b/src/psyc/_federation_cli.py
index 2d40b8c..e36bfd7 100644
--- a/src/psyc/_federation_cli.py
+++ b/src/psyc/_federation_cli.py
@@ -13,7 +13,7 @@ import httpx
import typer
from psyc import db, log
-from psyc.lines import discovery, federation, pulse
+from psyc.lines import discovery, federation, pulse, translog
from psyc.result import Err, Ok
@@ -210,3 +210,80 @@ def register(typer_app: typer.Typer) -> None:
seeds = [s for s in seeds if s != d]
pulse.set_discovery_seeds(seeds)
typer.echo(f"removed seed {d}")
+
+ # ---------- vouching + quorum --------------------------------------
+
+ @typer_app.command("fed-vouch")
+ def fed_vouch(
+ target_fp: str = typer.Argument(..., help="target peer fingerprint (32 hex)"),
+ ttl_days: int = typer.Option(90, "--ttl-days", help="vouch lifetime in days"),
+ ) -> None:
+ """Issue a signed vouch for `target_fp`. Persists locally + rides our feed."""
+ db.init_db()
+ v = federation.issue_vouch(target_fp.strip(), ttl_days=ttl_days)
+ typer.echo(f"vouched: {v.target_fingerprint} (expires {v.expires_at})")
+
+ @typer_app.command("fed-unvouch")
+ def fed_unvouch(target_fp: str = typer.Argument(...)) -> None:
+ """Revoke OUR vouch for `target_fp`."""
+ db.init_db()
+ federation.revoke_vouch(target_fp.strip())
+ typer.echo(f"revoked vouch for {target_fp}")
+
+ @typer_app.command("fed-vouches")
+ def fed_vouches() -> None:
+ """List vouches WE have issued."""
+ db.init_db()
+ rows = federation.our_vouches()
+ if not rows:
+ typer.echo("(no vouches issued)")
+ return
+ for v in rows:
+ exp = v.expires_at.isoformat() if v.expires_at else "—"
+ typer.echo(f" {v.target_fingerprint} issued={v.issued_at.isoformat()[:16]} expires={exp[:16]}")
+
+ @typer_app.command("fed-quorum-set")
+ def fed_quorum_set(
+ trust: Optional[int] = typer.Option(None, "--trust", help="trust_min_vouchers"),
+ k: Optional[int] = typer.Option(None, "--k", help="signal_quorum_k"),
+ ) -> None:
+ """Update quorum thresholds. Either flag is optional — only changed values overwrite."""
+ db.init_db()
+ cfg = federation.quorum_config()
+ if trust is not None:
+ cfg.trust_min_vouchers = max(1, int(trust))
+ if k is not None:
+ cfg.signal_quorum_k = max(1, int(k))
+ federation.set_quorum_config(cfg)
+ typer.echo(f"quorum: trust_min_vouchers={cfg.trust_min_vouchers} signal_quorum_k={cfg.signal_quorum_k}")
+
+ # ---------- transparency log --------------------------------------
+
+ @typer_app.command("fed-log")
+ def fed_log(
+ limit: int = typer.Option(20, "--limit", help="number of entries to show"),
+ ) -> None:
+ """Print recent transparency-log entries (newest first)."""
+ db.init_db()
+ rows = translog.recent(limit=limit)
+ if not rows:
+ typer.echo("(transparency log empty)")
+ return
+ for e in rows:
+ typer.echo(
+ f" id={e.id:5d} {e.entry_type:6s} {e.timestamp[:19]} hash={e.entry_hash[:16]}…"
+ )
+
+ @typer_app.command("fed-log-verify")
+ def fed_log_verify() -> None:
+ """Re-walk the chain locally and report verification status."""
+ db.init_db()
+ result = translog.verify_chain()
+ head = translog.head()
+ head_hash = head.entry_hash if head else "(empty)"
+ if isinstance(result, Err):
+ typer.echo(f" ✗ broken: {result.reason}", err=True)
+ typer.echo(f" head_hash: {head_hash}")
+ raise typer.Exit(1)
+ typer.echo(f" ✓ verified {result.value} entries")
+ typer.echo(f" head_hash: {head_hash}")
diff --git a/src/psyc/cockpit/federation_routes.py b/src/psyc/cockpit/federation_routes.py
index ba7e4c2..90bdcb0 100644
--- a/src/psyc/cockpit/federation_routes.py
+++ b/src/psyc/cockpit/federation_routes.py
@@ -15,7 +15,8 @@ from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, Red
from fastapi.templating import Jinja2Templates
from psyc import db, log
-from psyc.lines import discovery, federation, pulse
+from psyc.lines import discovery, federation, pulse, translog
+from psyc.result import Err
_log = log.get(__name__)
@@ -191,4 +192,183 @@ def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
"""
return JSONResponse(_cached_public_peers())
+ # ---------- public vouches + transparency log --------------------
+
+ @app.get("/federation/vouches")
+ def federation_vouches() -> JSONResponse:
+ """Vouches WE have issued. Peers fetch this to learn who we trust."""
+ return JSONResponse({
+ "fingerprint": federation.node_fingerprint(),
+ "vouches": [v.model_dump(mode="json") for v in federation.our_vouches()],
+ })
+
+ @app.get("/federation/log")
+ def federation_log() -> JSONResponse:
+ """Last 100 transparency-log entries, newest first."""
+ entries = translog.recent(limit=100)
+ return JSONResponse({
+ "count": len(entries),
+ "entries": [e.model_dump(mode="json") for e in entries],
+ })
+
+ @app.get("/federation/log/verify")
+ def federation_log_verify() -> JSONResponse:
+ """Re-walk the chain locally and report status. Auditors poll this."""
+ result = translog.verify_chain()
+ head = translog.head()
+ head_hash = head.entry_hash if head else None
+ if isinstance(result, Err):
+ return JSONResponse({"error": result.reason, "head_hash": head_hash}, status_code=409)
+ return JSONResponse({"verified": result.value, "head_hash": head_hash})
+
+ # ---------- admin: vouches page ---------------------------------
+
+ @app.get("/admin/federation/vouches", response_class=HTMLResponse)
+ def admin_federation_vouches(request: Request) -> HTMLResponse:
+ if not _admin_ok(request):
+ return RedirectResponse("/admin", status_code=303)
+ peers = federation.list_peers()
+ cfg = federation.quorum_config()
+ ours = federation.our_vouches()
+ # Per-peer view: vouches naming each peer and whether quorum is met.
+ peer_rows = []
+ for p in peers:
+ vouches = federation.vouches_for(p.fingerprint)
+ peer_rows.append({
+ "peer": p,
+ "vouches": vouches,
+ "vouched": federation.is_vouched(p.fingerprint),
+ "eligible": federation.peer_is_listening_eligible(p.fingerprint),
+ })
+ return TEMPLATES.TemplateResponse(
+ request,
+ "admin_federation_vouches.html",
+ {
+ "fingerprint": federation.node_fingerprint(),
+ "our_vouches": ours,
+ "peer_rows": peer_rows,
+ "cfg": cfg,
+ },
+ )
+
+ @app.post("/admin/federation/vouches/issue")
+ def admin_federation_vouch_issue(
+ request: Request,
+ target_fingerprint: str = Form(...),
+ ttl_days: int = Form(90),
+ ) -> RedirectResponse:
+ if not _admin_ok(request):
+ raise HTTPException(status_code=403, detail="admin session required")
+ try:
+ federation.issue_vouch(target_fingerprint.strip(), ttl_days=ttl_days)
+ except Exception as exc:
+ _log.warning("federation.vouch.issue.error", error=str(exc))
+ return RedirectResponse("/admin/federation/vouches", status_code=303)
+
+ @app.post("/admin/federation/vouches/revoke")
+ def admin_federation_vouch_revoke(
+ request: Request,
+ target_fingerprint: str = Form(...),
+ ) -> RedirectResponse:
+ if not _admin_ok(request):
+ raise HTTPException(status_code=403, detail="admin session required")
+ federation.revoke_vouch(target_fingerprint.strip())
+ return RedirectResponse("/admin/federation/vouches", status_code=303)
+
+ # ---------- admin: transparency log page ------------------------
+
+ @app.get("/admin/federation/log", response_class=HTMLResponse)
+ def admin_federation_log(request: Request) -> HTMLResponse:
+ if not _admin_ok(request):
+ return RedirectResponse("/admin", status_code=303)
+ result = translog.verify_chain()
+ head = translog.head()
+ entries = translog.recent(limit=200)
+ verify_status: Dict[str, Any]
+ if isinstance(result, Err):
+ verify_status = {"ok": False, "reason": result.reason}
+ else:
+ verify_status = {"ok": True, "verified": result.value}
+ return TEMPLATES.TemplateResponse(
+ request,
+ "admin_federation_log.html",
+ {
+ "verify_status": verify_status,
+ "head_hash": head.entry_hash if head else "",
+ "head_id": head.id if head else 0,
+ "entries": entries,
+ },
+ )
+
+ # ---------- admin: quorum config + per-peer/per-hash view -------
+
+ @app.get("/admin/federation/quorum", response_class=HTMLResponse)
+ def admin_federation_quorum(request: Request) -> HTMLResponse:
+ if not _admin_ok(request):
+ return RedirectResponse("/admin", status_code=303)
+ cfg = federation.quorum_config()
+ peers = federation.list_peers()
+ peer_rows = [
+ {
+ "peer": p,
+ "vouched": federation.is_vouched(p.fingerprint),
+ "eligible": federation.peer_is_listening_eligible(p.fingerprint),
+ }
+ for p in peers
+ ]
+ # Group buffered signals by signal_hash and count distinct eligible peers.
+ signal_rows: Dict[str, Dict[str, Any]] = {}
+ for s in db.recent_signals(limit=500):
+ h = s.get("signal_hash") or ""
+ entry = signal_rows.setdefault(h, {
+ "signal_hash": h,
+ "signal_type": s.get("signal_type") or "",
+ "signal_id": s.get("signal_id") or "",
+ "peers": set(),
+ "latest": s.get("received_at") or "",
+ })
+ entry["peers"].add(s.get("peer_fingerprint") or "")
+ hash_summary = []
+ for h, row in signal_rows.items():
+ distinct_eligible = sum(
+ 1 for fp in row["peers"] if federation.peer_is_listening_eligible(fp)
+ )
+ hash_summary.append({
+ "signal_hash": h,
+ "signal_type": row["signal_type"],
+ "signal_id": row["signal_id"],
+ "distinct_peers": len(row["peers"]),
+ "distinct_eligible": distinct_eligible,
+ "quorum_met": distinct_eligible >= cfg.signal_quorum_k,
+ "latest": row["latest"],
+ })
+ hash_summary.sort(key=lambda r: r["latest"], reverse=True)
+ return TEMPLATES.TemplateResponse(
+ request,
+ "admin_federation_quorum.html",
+ {
+ "cfg": cfg,
+ "peer_rows": peer_rows,
+ "hash_summary": hash_summary,
+ },
+ )
+
+ @app.post("/admin/federation/quorum/save")
+ def admin_federation_quorum_save(
+ request: Request,
+ trust_min_vouchers: int = Form(...),
+ signal_quorum_k: int = Form(...),
+ ) -> RedirectResponse:
+ if not _admin_ok(request):
+ raise HTTPException(status_code=403, detail="admin session required")
+ try:
+ cfg = federation.QuorumConfig(
+ trust_min_vouchers=max(1, int(trust_min_vouchers)),
+ signal_quorum_k=max(1, int(signal_quorum_k)),
+ )
+ federation.set_quorum_config(cfg)
+ except Exception as exc:
+ _log.warning("federation.quorum.save.error", error=str(exc))
+ return RedirectResponse("/admin/federation/quorum", status_code=303)
+
_log.info("federation.routes.registered")
diff --git a/src/psyc/cockpit/templates/admin_federation.html b/src/psyc/cockpit/templates/admin_federation.html
index b9493bd..8d7bfaa 100644
--- a/src/psyc/cockpit/templates/admin_federation.html
+++ b/src/psyc/cockpit/templates/admin_federation.html
@@ -8,7 +8,7 @@
{{ peers|length }} peer{{ '' if peers|length == 1 else 's' }}
This node's Ed25519 identity. The fingerprint goes into a DNS TXT record so other psyc nodes can discover this one. The public key lets them verify any feed we publish — the private key never leaves this box.
-
node fingerprint
diff --git a/src/psyc/cockpit/templates/admin_federation_log.html b/src/psyc/cockpit/templates/admin_federation_log.html
new file mode 100644
index 0000000..341b24e
--- /dev/null
+++ b/src/psyc/cockpit/templates/admin_federation_log.html
@@ -0,0 +1,63 @@
+{% extends "base.html" %}
+{% block title %}Transparency Log — psyc admin{% endblock %}
+{% block content %}
+
+
+
+
Transparency Log
+ id @ {{ head_id }}
+
+ Every signal we accept from a peer is appended to a signed merkle chain. Each entry references the previous entry's hash, so tampering with any historical row invalidates every entry after. Auditors can re-walk and detect a bad peer historically — even one we trusted at the time.
+ ← back to federation · vouches · quorum config · public verify endpoint
+
+
+
chain verification
+ {% if verify_status.ok %}
+
verified {{ verify_status.verified }} entries walked, no breaks
+ {% else %}
+
BROKEN {{ verify_status.reason }}
+ {% endif %}
+ {% if head_hash %}
+
head hash
+
{{ head_hash }}
+ {% endif %}
+
+
+
+
+
+
Recent Entries
+ {{ entries|length }} of last 200
+
+ Newest first. Hashes are truncated for display — full values are at /federation/log.
+
+ {% if entries %}
+
+ | id | When | Type | Peer / target | Signal id | Hash |
+
+ {% for e in entries %}
+
+ | {{ e.id }} |
+ {{ (e.timestamp or '')[:19] | replace('T', ' ') }} |
+ {{ e.entry_type }} |
+
+ {% if e.entry_type == 'signal' %}
+ {{ (e.entry_data.peer_fingerprint or '')[:8] }}…
+ {% elif e.entry_type == 'vouch' %}
+ {{ (e.entry_data.voucher_fingerprint or '')[:8] }}…→{{ (e.entry_data.target_fingerprint or '')[:8] }}…
+ {% else %}
+ —
+ {% endif %}
+ |
+ {{ ((e.entry_data.signal_id or e.entry_data.target_fingerprint or '') | string)[:32] }} |
+ {{ e.entry_hash[:16] }}… |
+
+ {% endfor %}
+
+
+ {% else %}
+ (chain empty — no signals appended yet)
+ {% endif %}
+
+
+{% endblock %}
diff --git a/src/psyc/cockpit/templates/admin_federation_quorum.html b/src/psyc/cockpit/templates/admin_federation_quorum.html
new file mode 100644
index 0000000..25e91a4
--- /dev/null
+++ b/src/psyc/cockpit/templates/admin_federation_quorum.html
@@ -0,0 +1,92 @@
+{% extends "base.html" %}
+{% block title %}Quorum Config — psyc admin{% endblock %}
+{% block content %}
+
+
+
+
Quorum Configuration
+ trust={{ cfg.trust_min_vouchers }} k={{ cfg.signal_quorum_k }}
+
+ trust_min_vouchers — distinct trusted vouchers required to make a new peer listening-eligible. signal_quorum_k — distinct listening-eligible peers required to consider a signal_hash quorum-met. Both gates live in pulse_settings; raising them tightens trust, lowering them relaxes it.
+ ← back to federation · vouches · transparency log
+
+
+
+
+
+
+
Per-Peer Listening Eligibility
+ {{ peer_rows|length }}
+
+ A peer's feed gets ingested only when its fingerprint is eligible (directly trusted or vouched into trust).
+
+ {% if peer_rows %}
+
+ | Domain | Fingerprint | Status | Vouched | Eligible |
+
+ {% for row in peer_rows %}
+
+ | {{ row.peer.domain }} |
+ {{ row.peer.fingerprint[:8] }}…{{ row.peer.fingerprint[-8:] }} |
+
+ {% if row.peer.status == 'trusted' %}
+ trusted
+ {% elif row.peer.status == 'blocked' %}
+ blocked
+ {% else %}
+ {{ row.peer.status }}
+ {% endif %}
+ |
+ {% if row.vouched %}yes{% else %}no{% endif %} |
+ {% if row.eligible %}listening{% else %}muted{% endif %} |
+
+ {% endfor %}
+
+
+ {% else %}
+ (no peers registered yet)
+ {% endif %}
+
+
+
+
+
Signal Hashes in Buffer
+ {{ hash_summary|length }} hashes
+
+ Distinct eligible-peer counts per signal hash. Quorum is met when count ≥ {{ cfg.signal_quorum_k }}.
+
+ {% if hash_summary %}
+
+ | Latest | Type | Signal id | Hash | Distinct peers | Eligible | Quorum |
+
+ {% for r in hash_summary %}
+
+ | {{ (r.latest or '')[:19] | replace('T', ' ') }} |
+ {{ r.signal_type }} |
+ {{ r.signal_id[:48] }} |
+ {{ r.signal_hash[:16] }}… |
+ {{ r.distinct_peers }} |
+ {{ r.distinct_eligible }} |
+
+ {% if r.quorum_met %}
+ met
+ {% else %}
+ below
+ {% endif %}
+ |
+
+ {% endfor %}
+
+
+ {% else %}
+ (no signals in buffer yet)
+ {% endif %}
+
+
+{% endblock %}
diff --git a/src/psyc/cockpit/templates/admin_federation_vouches.html b/src/psyc/cockpit/templates/admin_federation_vouches.html
new file mode 100644
index 0000000..98f337e
--- /dev/null
+++ b/src/psyc/cockpit/templates/admin_federation_vouches.html
@@ -0,0 +1,110 @@
+{% extends "base.html" %}
+{% block title %}Federation Vouches — psyc admin{% endblock %}
+{% block content %}
+
+
+
+
Web of Trust
+ {{ our_vouches|length }} issued
+
+ A vouch is an Ed25519-signed assertion that we trust another node's fingerprint. Peers gossip our vouches with their feeds, so trust accumulates: once {{ cfg.trust_min_vouchers }} of our trusted peers vouches for a new fingerprint, it becomes listening-eligible — its signed feeds get ingested.
+ ← back to federation · quorum config · transparency log
+
+
+
our fingerprint
+
{{ fingerprint }}
+
+
+
+
+
+
Vouches We've Issued
+ {{ our_vouches|length }}
+
+ We've signed these — peers that fetch our feed will see them and may extend trust accordingly.
+
+ {% if our_vouches %}
+
+ | Target fingerprint | Issued | Expires | |
+
+ {% for v in our_vouches %}
+
+ | {{ v.target_fingerprint }} |
+ {{ v.issued_at.isoformat()[:16] | replace('T', ' ') }} |
+ {{ v.expires_at.isoformat()[:16] | replace('T', ' ') if v.expires_at else '—' }} |
+
+
+ |
+
+ {% endfor %}
+
+
+ {% else %}
+ (no vouches issued yet)
+ {% endif %}
+
+
+
+
+
Issue a Vouch
+
+ Vouch for a peer's fingerprint. Trusted peers see this and may treat the target as listening-eligible.
+
+
+
+
+
+
Per-Peer Quorum Status
+ {{ peer_rows|length }} peers
+
+ Threshold: {{ cfg.trust_min_vouchers }} distinct trusted vouchers required to make a non-trusted peer listening-eligible.
+
+ {% if peer_rows %}
+
+ | Peer | Status | Vouches | Quorum met | Eligible |
+
+ {% for row in peer_rows %}
+
+ {{ row.peer.domain }} {{ row.peer.fingerprint[:8] }}…{{ row.peer.fingerprint[-8:] }} |
+
+ {% if row.peer.status == 'trusted' %}
+ trusted
+ {% elif row.peer.status == 'blocked' %}
+ blocked
+ {% else %}
+ {{ row.peer.status }}
+ {% endif %}
+ |
+ {{ row.vouches|length }} |
+
+ {% if row.vouched %}
+ yes
+ {% else %}
+ no
+ {% endif %}
+ |
+
+ {% if row.eligible %}
+ listening
+ {% else %}
+ muted
+ {% endif %}
+ |
+
+ {% endfor %}
+
+
+ {% else %}
+ (no peers registered yet)
+ {% endif %}
+
+
+{% endblock %}
diff --git a/src/psyc/db.py b/src/psyc/db.py
index cd0a6e5..c355f9a 100644
--- a/src/psyc/db.py
+++ b/src/psyc/db.py
@@ -161,6 +161,35 @@ Index("federation_signals_hash_idx", federation_signals.c.signal_hash)
Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint)
Index("federation_signals_received_idx", federation_signals.c.received_at.desc())
+# Web-of-trust vouches — voucher signs an attestation that target is OK to listen to.
+# Quorum is reached when enough distinct trusted vouchers sign for the same target.
+vouches = Table(
+ "vouches", _metadata,
+ Column("id", Integer, primary_key=True, autoincrement=True),
+ Column("voucher_fingerprint", String, nullable=False),
+ Column("target_fingerprint", String, nullable=False),
+ Column("issued_at", String, nullable=False),
+ Column("expires_at", String, nullable=True),
+ Column("signature", Text, nullable=False), # base64 ed25519 sig
+)
+Index("vouches_unique_idx", vouches.c.voucher_fingerprint, vouches.c.target_fingerprint, unique=True)
+Index("vouches_target_idx", vouches.c.target_fingerprint)
+
+# Transparency log — append-only signed hash chain over every signal we receive.
+# Each entry references the previous entry's hash; tampering with any row breaks
+# verify_chain on every subsequent row.
+translog = Table(
+ "translog", _metadata,
+ Column("id", Integer, primary_key=True, autoincrement=True),
+ Column("prev_hash", String, nullable=False),
+ Column("entry_type", String, nullable=False), # signal | vouch | config
+ Column("entry_data", Text, nullable=False), # canonical JSON of payload
+ Column("timestamp", String, nullable=False),
+ Column("entry_hash", String, nullable=False),
+)
+Index("translog_hash_idx", translog.c.entry_hash)
+Index("translog_time_idx", translog.c.timestamp.desc())
+
_log = log.get(__name__)
_engine: Optional[Engine] = None
@@ -388,3 +417,113 @@ def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
+
+
+# ---------- federation: pulse_settings get/set (shared scratch kv) -------
+
+def setting_get(key: str, db_path: Path = DB_PATH) -> Optional[str]:
+ """Read one pulse_settings value by key. Returns None if absent."""
+ stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == key)
+ with engine(db_path).connect() as conn:
+ row = conn.execute(stmt).fetchone()
+ return None if row is None else str(row.value)
+
+
+def setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None:
+ """Upsert one pulse_settings entry."""
+ stmt = sqlite_insert(pulse_settings).values(key=key, value=value)
+ stmt = stmt.on_conflict_do_update(
+ index_elements=[pulse_settings.c.key],
+ set_=dict(value=stmt.excluded.value),
+ )
+ with engine(db_path).begin() as conn:
+ conn.execute(stmt)
+
+
+# ---------- federation: vouches ------------------------------------------
+
+def upsert_vouch(row: dict, db_path: Path = DB_PATH) -> None:
+ """Insert-or-update one vouch. Unique on (voucher_fp, target_fp)."""
+ stmt = sqlite_insert(vouches).values(**row)
+ update_cols = {k: stmt.excluded[k] for k in row if k not in ("voucher_fingerprint", "target_fingerprint")}
+ stmt = stmt.on_conflict_do_update(
+ index_elements=[vouches.c.voucher_fingerprint, vouches.c.target_fingerprint],
+ set_=update_cols,
+ )
+ with engine(db_path).begin() as conn:
+ conn.execute(stmt)
+
+
+def list_vouches(db_path: Path = DB_PATH) -> List[dict]:
+ stmt = select(vouches).order_by(vouches.c.issued_at.desc())
+ with engine(db_path).connect() as conn:
+ return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
+
+
+def vouches_by_target(target_fingerprint: str, db_path: Path = DB_PATH) -> List[dict]:
+ stmt = select(vouches).where(vouches.c.target_fingerprint == target_fingerprint)
+ with engine(db_path).connect() as conn:
+ return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
+
+
+def vouches_by_voucher(voucher_fingerprint: str, db_path: Path = DB_PATH) -> List[dict]:
+ stmt = select(vouches).where(vouches.c.voucher_fingerprint == voucher_fingerprint)
+ with engine(db_path).connect() as conn:
+ return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
+
+
+def delete_vouch(voucher_fingerprint: str, target_fingerprint: str, db_path: Path = DB_PATH) -> None:
+ stmt = vouches.delete().where(
+ (vouches.c.voucher_fingerprint == voucher_fingerprint)
+ & (vouches.c.target_fingerprint == target_fingerprint)
+ )
+ with engine(db_path).begin() as conn:
+ conn.execute(stmt)
+
+
+# ---------- transparency log ---------------------------------------------
+
+def translog_append(row: dict, db_path: Path = DB_PATH) -> int:
+ """Append one transparency-log entry. Returns inserted id."""
+ stmt = insert(translog).values(**row)
+ with engine(db_path).begin() as conn:
+ res = conn.execute(stmt)
+ return int(res.inserted_primary_key[0])
+
+
+def translog_head(db_path: Path = DB_PATH) -> Optional[dict]:
+ """Highest-id (latest) entry, or None if chain empty."""
+ stmt = select(translog).order_by(translog.c.id.desc()).limit(1)
+ with engine(db_path).connect() as conn:
+ row = conn.execute(stmt).fetchone()
+ return dict(row._mapping) if row else None
+
+
+def translog_get(entry_id: int, db_path: Path = DB_PATH) -> Optional[dict]:
+ stmt = select(translog).where(translog.c.id == entry_id)
+ with engine(db_path).connect() as conn:
+ row = conn.execute(stmt).fetchone()
+ return dict(row._mapping) if row else None
+
+
+def translog_after(entry_id: int, db_path: Path = DB_PATH) -> List[dict]:
+ """All entries with id > entry_id, oldest first — for sync."""
+ stmt = select(translog).where(translog.c.id > entry_id).order_by(translog.c.id.asc())
+ with engine(db_path).connect() as conn:
+ return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
+
+
+def translog_recent(limit: int = 100, db_path: Path = DB_PATH) -> List[dict]:
+ stmt = select(translog).order_by(translog.c.id.desc()).limit(limit)
+ with engine(db_path).connect() as conn:
+ return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
+
+
+def translog_range(start: int = 0, end: Optional[int] = None, db_path: Path = DB_PATH) -> List[dict]:
+ """All entries with start <= id (and id <= end if given), oldest first."""
+ cond = translog.c.id >= start
+ if end is not None:
+ cond = cond & (translog.c.id <= end)
+ stmt = select(translog).where(cond).order_by(translog.c.id.asc())
+ with engine(db_path).connect() as conn:
+ return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
diff --git a/src/psyc/lines/federation.py b/src/psyc/lines/federation.py
index bed46fb..059aa9d 100644
--- a/src/psyc/lines/federation.py
+++ b/src/psyc/lines/federation.py
@@ -24,6 +24,7 @@ from cryptography.hazmat.primitives.asymmetric import ed25519
from pydantic import BaseModel, Field
from psyc import DATA_DIR, db, log
+from psyc.lines import translog
from psyc.result import Err, Ok, Result
@@ -259,6 +260,9 @@ def build_signed_feed(window_hours: int = 24) -> Dict[str, Any]:
"window_hours": window_hours,
"cases": _build_case_records(window_hours),
"iocs": _build_ioc_records(window_hours),
+ # Vouches we've issued ride along with the feed so peers can learn
+ # who we trust and accumulate quorum on shared targets.
+ "vouches": [v.model_dump() for v in our_vouches()],
}
sig = sign_payload(canonical_json(payload))
payload["signature"] = base64.b64encode(sig).decode("ascii")
@@ -306,10 +310,16 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result
except Exception as exc:
return Err(f"bad pubkey: {exc}")
+ # Listening gate: only accept signals from peers we explicitly trust or
+ # that quorum of trusted peers vouches for. Unknown peers don't land here.
+ if not peer_is_listening_eligible(peer_fp):
+ return Err(f"peer not trusted: {peer_fp}")
+
now = datetime.now(timezone.utc).isoformat()
signal_ids: List[Tuple[str, str]] = []
cases = feed.get("cases") or []
iocs = feed.get("iocs") or []
+ feed_vouches = feed.get("vouches") or []
for c in cases:
case_id = c.get("case_id") or ""
@@ -323,6 +333,15 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result
raw_json=json.dumps(c, sort_keys=True),
))
signal_ids.append(("case", digest))
+ try:
+ translog.append("signal", {
+ "peer_fingerprint": peer_fp,
+ "signal_type": "case",
+ "signal_id": case_id,
+ "signal_hash": digest,
+ })
+ except Exception as exc: # transparency log is best-effort, never block ingest
+ _log.warning("federation.translog.append.fail", error=str(exc))
for i in iocs:
value = i.get("value") or ""
@@ -336,6 +355,36 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result
raw_json=json.dumps(i, sort_keys=True),
))
signal_ids.append(("ioc", digest))
+ try:
+ translog.append("signal", {
+ "peer_fingerprint": peer_fp,
+ "signal_type": "ioc",
+ "signal_id": value,
+ "signal_hash": digest,
+ })
+ except Exception as exc:
+ _log.warning("federation.translog.append.fail", error=str(exc))
+
+ # Vouch propagation — peer asserts who they trust. We only accept vouches
+ # whose declared voucher fingerprint matches the peer we just authenticated
+ # (so a peer can't forge vouches "from" someone else through us).
+ for v_raw in feed_vouches:
+ if not isinstance(v_raw, dict):
+ continue
+ try:
+ vouch = Vouch.model_validate(v_raw)
+ except Exception as exc:
+ _log.warning("federation.vouch.malformed", error=str(exc))
+ continue
+ if vouch.voucher_fingerprint != peer_fp:
+ _log.warning(
+ "federation.vouch.voucher_mismatch",
+ claimed=vouch.voucher_fingerprint, actual=peer_fp,
+ )
+ continue
+ accepted = accept_vouch(vouch, expected_pubkey_pem)
+ if isinstance(accepted, Err):
+ _log.warning("federation.vouch.rejected", reason=accepted.reason)
_log.info("federation.import.ok", peer=peer_fp, cases=len(cases), iocs=len(iocs))
return Ok(ImportSummary(
@@ -403,3 +452,279 @@ def set_peer_status(domain: str, status: str) -> None:
def remove_peer(domain: str) -> None:
db.remove_peer(domain)
+
+
+# ---------- vouching + quorum (stage 4) ---------------------------------
+#
+# The web of trust: a peer's fingerprint becomes "listening-eligible" when
+# either we directly trust it (peers.status == "trusted") or at least
+# `trust_min_vouchers` of our trusted peers have signed a vouch for it.
+#
+# Signal-level quorum: a federation_signals row is meaningful only when
+# `signal_quorum_k` distinct vouched peers have reported the same signal_hash.
+#
+# Vouches are short Pydantic records signed with the voucher's Ed25519 key
+# over canonical JSON of the body (everything but the signature field).
+
+
+class Vouch(BaseModel):
+ voucher_fingerprint: str
+ target_fingerprint: str
+ issued_at: datetime
+ expires_at: Optional[datetime] = None
+ signature: str = "" # base64 ed25519 sig over vouch_payload_bytes(...)
+
+
+class QuorumConfig(BaseModel):
+ trust_min_vouchers: int = 2
+ signal_quorum_k: int = 2
+
+
+_QC_TRUST_KEY = "wot_trust_min"
+_QC_K_KEY = "wot_quorum_k"
+
+
+def quorum_config() -> QuorumConfig:
+ """Live quorum settings, with sensible defaults if pulse_settings is empty."""
+ cfg = QuorumConfig()
+ t = db.setting_get(_QC_TRUST_KEY)
+ k = db.setting_get(_QC_K_KEY)
+ if t is not None:
+ try:
+ cfg.trust_min_vouchers = max(1, int(t))
+ except ValueError:
+ pass
+ if k is not None:
+ try:
+ cfg.signal_quorum_k = max(1, int(k))
+ except ValueError:
+ pass
+ return cfg
+
+
+def set_quorum_config(cfg: QuorumConfig) -> None:
+ """Persist quorum config into pulse_settings."""
+ db.setting_set(_QC_TRUST_KEY, str(cfg.trust_min_vouchers))
+ db.setting_set(_QC_K_KEY, str(cfg.signal_quorum_k))
+
+
+def vouch_payload_bytes(
+ voucher_fp: str,
+ target_fp: str,
+ issued_at: datetime,
+ expires_at: Optional[datetime],
+) -> bytes:
+ """Canonical JSON of the unsigned vouch body — what the voucher signs."""
+ body: Dict[str, Any] = {
+ "voucher_fingerprint": voucher_fp,
+ "target_fingerprint": target_fp,
+ "issued_at": issued_at.isoformat(),
+ "expires_at": expires_at.isoformat() if expires_at else None,
+ }
+ return canonical_json(body)
+
+
+def _store_vouch(v: Vouch) -> None:
+ db.upsert_vouch(dict(
+ voucher_fingerprint=v.voucher_fingerprint,
+ target_fingerprint=v.target_fingerprint,
+ issued_at=v.issued_at.isoformat(),
+ expires_at=v.expires_at.isoformat() if v.expires_at else None,
+ signature=v.signature,
+ ))
+
+
+def _row_to_vouch(row: Dict[str, Any]) -> Vouch:
+ return Vouch(
+ voucher_fingerprint=row["voucher_fingerprint"],
+ target_fingerprint=row["target_fingerprint"],
+ issued_at=datetime.fromisoformat(row["issued_at"]),
+ expires_at=datetime.fromisoformat(row["expires_at"]) if row.get("expires_at") else None,
+ signature=row.get("signature") or "",
+ )
+
+
+def issue_vouch(target_fingerprint: str, ttl_days: int = 90) -> Vouch:
+ """Sign a vouch for `target_fingerprint` with OUR key. Persists + returns it."""
+ our_fp = node_fingerprint()
+ issued_at = datetime.now(timezone.utc)
+ expires_at = issued_at + timedelta(days=ttl_days) if ttl_days > 0 else None
+ payload = vouch_payload_bytes(our_fp, target_fingerprint, issued_at, expires_at)
+ sig = sign_payload(payload)
+ vouch = Vouch(
+ voucher_fingerprint=our_fp,
+ target_fingerprint=target_fingerprint,
+ issued_at=issued_at,
+ expires_at=expires_at,
+ signature=base64.b64encode(sig).decode("ascii"),
+ )
+ _store_vouch(vouch)
+ try:
+ translog.append("vouch", {
+ "voucher_fingerprint": vouch.voucher_fingerprint,
+ "target_fingerprint": vouch.target_fingerprint,
+ "issued_at": vouch.issued_at.isoformat(),
+ "expires_at": vouch.expires_at.isoformat() if vouch.expires_at else None,
+ })
+ except Exception as exc:
+ _log.warning("federation.translog.append.fail", error=str(exc))
+ _log.info("federation.vouch.issued", target=target_fingerprint, ttl_days=ttl_days)
+ return vouch
+
+
+def accept_vouch(vouch: Vouch, voucher_pubkey_pem: str) -> Result[None, str]:
+ """Verify signature + expiry + voucher trust status, then persist.
+
+ Failure modes return Err with a short reason so the caller can log them.
+ A voucher whose status is not "trusted" in our peers table is refused —
+ we don't accept transitive vouches from unknown peers.
+ """
+ # Expiry first — cheapest check.
+ now = datetime.now(timezone.utc)
+ if vouch.expires_at is not None and vouch.expires_at < now:
+ return Err("vouch expired")
+
+ # Voucher must be a directly-trusted peer (no transitive trust at this layer).
+ voucher_status = None
+ for row in db.list_peers():
+ if row.get("fingerprint") == vouch.voucher_fingerprint:
+ voucher_status = row.get("status")
+ break
+ if voucher_status != "trusted":
+ return Err(f"voucher not trusted: {vouch.voucher_fingerprint}")
+
+ # The pubkey must match the declared voucher fingerprint.
+ try:
+ if _fingerprint_for_pubkey_pem(voucher_pubkey_pem) != vouch.voucher_fingerprint:
+ return Err("voucher pubkey does not match fingerprint")
+ except Exception as exc:
+ return Err(f"bad voucher pubkey: {exc}")
+
+ payload = vouch_payload_bytes(
+ vouch.voucher_fingerprint,
+ vouch.target_fingerprint,
+ vouch.issued_at,
+ vouch.expires_at,
+ )
+ try:
+ signature = base64.b64decode(vouch.signature)
+ except Exception:
+ return Err("vouch signature not base64")
+ if not verify_payload(payload, signature, voucher_pubkey_pem):
+ return Err("vouch signature invalid")
+
+ _store_vouch(vouch)
+ try:
+ translog.append("vouch", {
+ "voucher_fingerprint": vouch.voucher_fingerprint,
+ "target_fingerprint": vouch.target_fingerprint,
+ "issued_at": vouch.issued_at.isoformat(),
+ "expires_at": vouch.expires_at.isoformat() if vouch.expires_at else None,
+ "accepted": True,
+ })
+ except Exception as exc:
+ _log.warning("federation.translog.append.fail", error=str(exc))
+ _log.info("federation.vouch.accepted", voucher=vouch.voucher_fingerprint, target=vouch.target_fingerprint)
+ return Ok(None)
+
+
+def revoke_vouch(target_fingerprint: str) -> None:
+ """Delete OUR vouch naming `target_fingerprint`. No-op if absent."""
+ db.delete_vouch(node_fingerprint(), target_fingerprint)
+ _log.info("federation.vouch.revoked", target=target_fingerprint)
+
+
+def our_vouches() -> List[Vouch]:
+ """Vouches we have issued (filter for voucher_fingerprint == our fp)."""
+ return [_row_to_vouch(r) for r in db.vouches_by_voucher(node_fingerprint())]
+
+
+def vouches_for(target_fingerprint: str) -> List[Vouch]:
+ """Every vouch stored locally that names `target_fingerprint` as target."""
+ return [_row_to_vouch(r) for r in db.vouches_by_target(target_fingerprint)]
+
+
+def is_vouched(target_fingerprint: str, min_vouchers: Optional[int] = None) -> bool:
+ """True iff ≥`min_vouchers` distinct non-expired vouches from currently-trusted
+ peers name `target_fingerprint`.
+ """
+ cfg = quorum_config()
+ threshold = min_vouchers if min_vouchers is not None else cfg.trust_min_vouchers
+ if threshold <= 0:
+ return True
+ now = datetime.now(timezone.utc)
+ trusted_fps = {p.fingerprint for p in list_peers() if p.status == "trusted"}
+ distinct_vouchers: set = set()
+ for v in vouches_for(target_fingerprint):
+ if v.expires_at is not None and v.expires_at < now:
+ continue
+ if v.voucher_fingerprint not in trusted_fps:
+ continue
+ distinct_vouchers.add(v.voucher_fingerprint)
+ if len(distinct_vouchers) >= threshold:
+ return True
+ return False
+
+
+def peer_is_listening_eligible(fingerprint: str) -> bool:
+ """True iff the peer is directly trusted OR vouched into trust.
+
+ This is the gate used by `import_signed_feed`. Auto-response will share
+ this signature — keep it stable.
+ """
+ if not fingerprint:
+ return False
+ for p in list_peers():
+ if p.fingerprint == fingerprint:
+ if p.status == "trusted":
+ return True
+ if p.status == "blocked":
+ return False
+ break
+ return is_vouched(fingerprint)
+
+
+def is_quorum_met(signal_hash: str, k: Optional[int] = None) -> bool:
+ """True iff ≥k distinct vouched peers have reported `signal_hash`.
+
+ "Vouched" here means `peer_is_listening_eligible` — the same web-of-trust
+ set the import gate respects. Self-reports from the local node do not
+ count (they never end up in federation_signals).
+ """
+ cfg = quorum_config()
+ threshold = k if k is not None else cfg.signal_quorum_k
+ if threshold <= 0:
+ return True
+ rows = db.signals_for_hash(signal_hash)
+ distinct: set = set()
+ for r in rows:
+ fp = r.get("peer_fingerprint") or ""
+ if not fp or fp in distinct:
+ continue
+ if not peer_is_listening_eligible(fp):
+ continue
+ distinct.add(fp)
+ if len(distinct) >= threshold:
+ return True
+ return False
+
+
+def quorum_evidence(signal_hash: str) -> List[Tuple[str, datetime]]:
+ """(peer_fingerprint, received_at) tuples for one signal_hash — for UI display.
+
+ Only includes signals from currently listening-eligible peers, deduped
+ per fingerprint at the earliest receipt.
+ """
+ rows = db.signals_for_hash(signal_hash)
+ earliest: Dict[str, datetime] = {}
+ for r in rows:
+ fp = r.get("peer_fingerprint") or ""
+ if not fp or not peer_is_listening_eligible(fp):
+ continue
+ try:
+ ts = datetime.fromisoformat(r.get("received_at") or "")
+ except ValueError:
+ continue
+ if fp not in earliest or ts < earliest[fp]:
+ earliest[fp] = ts
+ return sorted(earliest.items(), key=lambda kv: kv[1])
diff --git a/src/psyc/lines/translog.py b/src/psyc/lines/translog.py
new file mode 100644
index 0000000..b4a1859
--- /dev/null
+++ b/src/psyc/lines/translog.py
@@ -0,0 +1,161 @@
+"""Transparency log — append-only signed merkle chain over federation signals.
+
+Every signal we receive from a peer (case, IOC, or accepted vouch) is appended
+as one `LogEntry`. Each entry's `entry_hash = sha256(canonical(prev_hash +
+entry_type + entry_data + timestamp))` references the previous head, so any
+tampering with a historical row invalidates every subsequent hash. The chain
+is public — auditors can re-fetch it and re-run `verify_chain` to detect a
+node that quietly mutated history (e.g. to hide a bad signal it accepted).
+
+Hash format: lowercase hex SHA-256 of the canonical JSON of
+``{"prev_hash": "...", "entry_type": "...", "entry_data": {...}, "timestamp": "..."}``.
+Genesis entries use ``prev_hash = "0" * 64``.
+"""
+
+from __future__ import annotations
+
+import hashlib
+import json
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+from pydantic import BaseModel, Field
+
+from psyc import db, log
+from psyc.result import Err, Ok, Result
+
+
+_log = log.get(__name__)
+
+GENESIS_PREV_HASH = "0" * 64
+
+
+class LogEntry(BaseModel):
+ id: int
+ prev_hash: str
+ entry_type: str
+ entry_data: Dict[str, Any] = Field(default_factory=dict)
+ timestamp: str
+ entry_hash: str
+
+
+def _canonical_json(obj: Dict[str, Any]) -> bytes:
+ return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
+
+
+def compute_entry_hash(prev_hash: str, entry_type: str, entry_data: Dict[str, Any], timestamp: str) -> str:
+ """Hex SHA-256 of canonical(prev_hash + entry_type + entry_data + timestamp)."""
+ payload: Dict[str, Any] = {
+ "prev_hash": prev_hash,
+ "entry_type": entry_type,
+ "entry_data": entry_data,
+ "timestamp": timestamp,
+ }
+ return hashlib.sha256(_canonical_json(payload)).hexdigest()
+
+
+def _row_to_entry(row: Dict[str, Any]) -> LogEntry:
+ raw = row.get("entry_data") or "{}"
+ try:
+ data = json.loads(raw)
+ except Exception:
+ data = {}
+ return LogEntry(
+ id=int(row["id"]),
+ prev_hash=str(row["prev_hash"]),
+ entry_type=str(row["entry_type"]),
+ entry_data=data if isinstance(data, dict) else {},
+ timestamp=str(row["timestamp"]),
+ entry_hash=str(row["entry_hash"]),
+ )
+
+
+def head(db_path: Path = db.DB_PATH) -> Optional[LogEntry]:
+ """Latest log entry, or None if the chain is empty."""
+ row = db.translog_head(db_path=db_path)
+ return _row_to_entry(row) if row else None
+
+
+def append(entry_type: str, entry_data: Dict[str, Any], db_path: Path = db.DB_PATH) -> LogEntry:
+ """Atomically append one entry to the chain. Returns the persisted entry."""
+ prev = db.translog_head(db_path=db_path)
+ prev_hash = str(prev["entry_hash"]) if prev else GENESIS_PREV_HASH
+ timestamp = datetime.now(timezone.utc).isoformat()
+ entry_hash = compute_entry_hash(prev_hash, entry_type, entry_data, timestamp)
+ new_id = db.translog_append(
+ dict(
+ prev_hash=prev_hash,
+ entry_type=entry_type,
+ entry_data=json.dumps(entry_data, sort_keys=True),
+ timestamp=timestamp,
+ entry_hash=entry_hash,
+ ),
+ db_path=db_path,
+ )
+ _log.info("translog.append", id=new_id, entry_type=entry_type, hash=entry_hash[:12])
+ return LogEntry(
+ id=new_id,
+ prev_hash=prev_hash,
+ entry_type=entry_type,
+ entry_data=entry_data,
+ timestamp=timestamp,
+ entry_hash=entry_hash,
+ )
+
+
+def verify_chain(start: int = 0, end: Optional[int] = None, db_path: Path = db.DB_PATH) -> Result[int, str]:
+ """Walk entries [start, end] in id order, recompute each hash, compare.
+
+ Returns Ok(n_verified) when every entry's recomputed hash equals the
+ stored one and each prev_hash matches the previous entry's stored hash.
+ Returns Err with the offending id + expected/got hashes otherwise.
+ """
+ rows = db.translog_range(start=start, end=end, db_path=db_path)
+ if not rows:
+ return Ok(0)
+ # Establish the prior hash anchor — either genesis (if walking from id=1)
+ # or the entry just before `start`.
+ first_id = int(rows[0]["id"])
+ if first_id <= 1:
+ prior_hash = GENESIS_PREV_HASH
+ else:
+ anchor = db.translog_get(first_id - 1, db_path=db_path)
+ if anchor is None:
+ return Err(f"missing anchor entry id={first_id - 1}")
+ prior_hash = str(anchor["entry_hash"])
+
+ verified = 0
+ for row in rows:
+ stored_prev = str(row["prev_hash"])
+ if stored_prev != prior_hash:
+ return Err(
+ f"broken at id={row['id']} expected_prev={prior_hash} got_prev={stored_prev}"
+ )
+ try:
+ data = json.loads(row.get("entry_data") or "{}")
+ except Exception:
+ return Err(f"broken at id={row['id']} entry_data not JSON")
+ if not isinstance(data, dict):
+ return Err(f"broken at id={row['id']} entry_data not an object")
+ recomputed = compute_entry_hash(
+ stored_prev, str(row["entry_type"]), data, str(row["timestamp"])
+ )
+ stored_hash = str(row["entry_hash"])
+ if recomputed != stored_hash:
+ return Err(
+ f"broken at id={row['id']} expected={recomputed} got={stored_hash}"
+ )
+ prior_hash = stored_hash
+ verified += 1
+ return Ok(verified)
+
+
+def recent(limit: int = 100, db_path: Path = db.DB_PATH) -> List[LogEntry]:
+ """The latest `limit` entries, newest first."""
+ return [_row_to_entry(r) for r in db.translog_recent(limit=limit, db_path=db_path)]
+
+
+def entries_after(entry_id: int, db_path: Path = db.DB_PATH) -> List[LogEntry]:
+ """All entries with id > entry_id, oldest first — for peer sync."""
+ return [_row_to_entry(r) for r in db.translog_after(entry_id, db_path=db_path)]
diff --git a/tests/test_federation.py b/tests/test_federation.py
index 3a0c599..b04fddb 100644
--- a/tests/test_federation.py
+++ b/tests/test_federation.py
@@ -159,6 +159,8 @@ def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir):
new_sig = peer_priv.sign(canonical_json(unsigned))
feed["signature"] = base64.b64encode(new_sig).decode("ascii")
+ # Stage 4 listening gate: peer must be trusted to land signals.
+ federation.register_peer("peer.example", peer_fp, peer_pub_pem, status="trusted")
result = import_signed_feed(feed, peer_pub_pem)
assert isinstance(result, Ok), getattr(result, "reason", "")
summary = result.value
diff --git a/tests/test_translog.py b/tests/test_translog.py
new file mode 100644
index 0000000..da50cb0
--- /dev/null
+++ b/tests/test_translog.py
@@ -0,0 +1,118 @@
+"""Transparency log — append, verify, tamper detection, sync slices."""
+
+from __future__ import annotations
+
+import json
+
+import pytest
+from sqlalchemy import create_engine, update
+
+from psyc import db
+from psyc.lines import translog
+from psyc.lines.translog import GENESIS_PREV_HASH
+from psyc.result import Err, Ok
+
+
+@pytest.fixture
+def fresh_db(tmp_path, monkeypatch):
+ test_db = tmp_path / "test.db"
+ eng = create_engine(f"sqlite:///{test_db}", future=True)
+ db._metadata.create_all(eng, checkfirst=True)
+ monkeypatch.setattr(db, "_engine", eng)
+ monkeypatch.setattr(db, "DB_PATH", test_db)
+ yield test_db
+
+
+def test_first_append_uses_genesis_prev_hash(fresh_db):
+ e = translog.append("signal", {"x": 1})
+ assert e.prev_hash == GENESIS_PREV_HASH
+ assert e.id >= 1
+ assert e.entry_type == "signal"
+ assert e.entry_data == {"x": 1}
+ # head matches
+ h = translog.head()
+ assert h is not None
+ assert h.id == e.id
+ assert h.entry_hash == e.entry_hash
+
+
+def test_append_chains_prev_hash(fresh_db):
+ e1 = translog.append("signal", {"a": 1})
+ e2 = translog.append("signal", {"b": 2})
+ e3 = translog.append("vouch", {"c": 3})
+ assert e2.prev_hash == e1.entry_hash
+ assert e3.prev_hash == e2.entry_hash
+ head = translog.head()
+ assert head is not None
+ assert head.entry_hash == e3.entry_hash
+
+
+def test_verify_chain_ok_round_trip(fresh_db):
+ translog.append("signal", {"a": 1})
+ translog.append("signal", {"b": 2})
+ translog.append("vouch", {"c": 3})
+ result = translog.verify_chain()
+ assert isinstance(result, Ok)
+ assert result.value == 3
+
+
+def test_verify_chain_empty_returns_zero(fresh_db):
+ result = translog.verify_chain()
+ assert isinstance(result, Ok)
+ assert result.value == 0
+
+
+def test_verify_chain_detects_tampered_data(fresh_db):
+ e1 = translog.append("signal", {"a": 1})
+ e2 = translog.append("signal", {"b": 2})
+
+ # Mutate entry_data of the first row directly in the DB; entry_hash stays
+ # the same but no longer matches the recomputed hash.
+ with db.engine().begin() as conn:
+ conn.execute(
+ update(db.translog)
+ .where(db.translog.c.id == e1.id)
+ .values(entry_data=json.dumps({"a": 999}, sort_keys=True))
+ )
+
+ result = translog.verify_chain()
+ assert isinstance(result, Err)
+ assert "broken at id=" in result.reason
+
+
+def test_verify_chain_detects_tampered_prev_hash(fresh_db):
+ translog.append("signal", {"a": 1})
+ e2 = translog.append("signal", {"b": 2})
+ # Flip e2.prev_hash so it no longer matches e1.entry_hash.
+ with db.engine().begin() as conn:
+ conn.execute(
+ update(db.translog)
+ .where(db.translog.c.id == e2.id)
+ .values(prev_hash="f" * 64)
+ )
+ result = translog.verify_chain()
+ assert isinstance(result, Err)
+ assert "broken at id=" in result.reason
+
+
+def test_entries_after_returns_correct_slice(fresh_db):
+ e1 = translog.append("signal", {"a": 1})
+ e2 = translog.append("signal", {"b": 2})
+ e3 = translog.append("signal", {"c": 3})
+
+ after_zero = translog.entries_after(0)
+ assert [e.id for e in after_zero] == [e1.id, e2.id, e3.id]
+
+ after_e1 = translog.entries_after(e1.id)
+ assert [e.id for e in after_e1] == [e2.id, e3.id]
+
+ after_e3 = translog.entries_after(e3.id)
+ assert after_e3 == []
+
+
+def test_recent_newest_first(fresh_db):
+ e1 = translog.append("signal", {"a": 1})
+ e2 = translog.append("signal", {"b": 2})
+ e3 = translog.append("signal", {"c": 3})
+ recent = translog.recent(limit=10)
+ assert [e.id for e in recent] == [e3.id, e2.id, e1.id]
diff --git a/tests/test_vouching.py b/tests/test_vouching.py
new file mode 100644
index 0000000..d9b312e
--- /dev/null
+++ b/tests/test_vouching.py
@@ -0,0 +1,336 @@
+"""Vouching + quorum — sign/verify, threshold logic, import gate."""
+
+from __future__ import annotations
+
+import base64
+import hashlib
+from datetime import datetime, timedelta, timezone
+
+import pytest
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import ed25519
+from sqlalchemy import create_engine
+
+from psyc import db
+from psyc.lines import federation
+from psyc.lines.federation import (
+ QuorumConfig,
+ Vouch,
+ accept_vouch,
+ build_signed_feed,
+ canonical_json,
+ import_signed_feed,
+ is_quorum_met,
+ is_vouched,
+ issue_vouch,
+ node_fingerprint,
+ our_vouches,
+ peer_is_listening_eligible,
+ public_key_pem,
+ quorum_config,
+ register_peer,
+ revoke_vouch,
+ set_quorum_config,
+ vouch_payload_bytes,
+)
+from psyc.result import Err, Ok
+
+
+@pytest.fixture
+def fresh_db(tmp_path, monkeypatch):
+ test_db = tmp_path / "test.db"
+ eng = create_engine(f"sqlite:///{test_db}", future=True)
+ db._metadata.create_all(eng, checkfirst=True)
+ monkeypatch.setattr(db, "_engine", eng)
+ monkeypatch.setattr(db, "DB_PATH", test_db)
+ yield test_db
+
+
+@pytest.fixture
+def fed_dir(tmp_path, monkeypatch):
+ d = tmp_path / "federation"
+ monkeypatch.setattr(federation, "FED_DIR", d)
+ monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
+ monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
+ yield d
+
+
+def _make_peer():
+ """Generate an Ed25519 keypair + matching fingerprint for a fake peer."""
+ priv = ed25519.Ed25519PrivateKey.generate()
+ pub = priv.public_key()
+ pub_pem = pub.public_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PublicFormat.SubjectPublicKeyInfo,
+ ).decode("ascii")
+ raw = pub.public_bytes(
+ encoding=serialization.Encoding.Raw,
+ format=serialization.PublicFormat.Raw,
+ )
+ fp = hashlib.sha256(raw).digest()[:16].hex()
+ return priv, pub_pem, fp
+
+
+def _sign_vouch(priv, voucher_fp, target_fp, issued_at, expires_at):
+ payload = vouch_payload_bytes(voucher_fp, target_fp, issued_at, expires_at)
+ sig = priv.sign(payload)
+ return base64.b64encode(sig).decode("ascii")
+
+
+# ---------- self-issued vouch round-trip --------------------------------
+
+def test_issue_vouch_roundtrip(fresh_db, fed_dir):
+ target = "ab" * 16
+ v = issue_vouch(target, ttl_days=30)
+ assert v.voucher_fingerprint == node_fingerprint()
+ assert v.target_fingerprint == target
+ assert v.expires_at is not None
+ # round-trip from storage
+ listed = our_vouches()
+ assert len(listed) == 1
+ assert listed[0].target_fingerprint == target
+ assert listed[0].signature == v.signature
+ # signature verifies under our own pubkey
+ payload = vouch_payload_bytes(
+ v.voucher_fingerprint, v.target_fingerprint, v.issued_at, v.expires_at
+ )
+ sig = base64.b64decode(v.signature)
+ assert federation.verify_payload(payload, sig, public_key_pem())
+
+
+def test_revoke_vouch_removes_only_our_entry(fresh_db, fed_dir):
+ target = "cd" * 16
+ issue_vouch(target, ttl_days=30)
+ assert len(our_vouches()) == 1
+ revoke_vouch(target)
+ assert our_vouches() == []
+
+
+# ---------- accept_vouch validation -------------------------------------
+
+def test_accept_vouch_rejects_expired(fresh_db, fed_dir):
+ priv, pem, fp = _make_peer()
+ register_peer("voucher.example", fp, pem, status="trusted")
+ issued = datetime.now(timezone.utc) - timedelta(days=10)
+ expired = datetime.now(timezone.utc) - timedelta(days=1)
+ sig = _sign_vouch(priv, fp, "target", issued, expired)
+ v = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
+ issued_at=issued, expires_at=expired, signature=sig)
+ result = accept_vouch(v, pem)
+ assert isinstance(result, Err)
+ assert "expired" in result.reason
+
+
+def test_accept_vouch_rejects_bad_signature(fresh_db, fed_dir):
+ priv, pem, fp = _make_peer()
+ register_peer("voucher.example", fp, pem, status="trusted")
+ issued = datetime.now(timezone.utc)
+ expires = issued + timedelta(days=30)
+ # Sign a different target then claim it's for "real-target".
+ real_sig = _sign_vouch(priv, fp, "other-target", issued, expires)
+ v = Vouch(voucher_fingerprint=fp, target_fingerprint="real-target",
+ issued_at=issued, expires_at=expires, signature=real_sig)
+ result = accept_vouch(v, pem)
+ assert isinstance(result, Err)
+ assert "signature" in result.reason
+
+
+def test_accept_vouch_rejects_voucher_not_trusted(fresh_db, fed_dir):
+ priv, pem, fp = _make_peer()
+ # Voucher exists but is "unknown" not "trusted".
+ register_peer("voucher.example", fp, pem, status="unknown")
+ issued = datetime.now(timezone.utc)
+ expires = issued + timedelta(days=30)
+ sig = _sign_vouch(priv, fp, "target", issued, expires)
+ v = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
+ issued_at=issued, expires_at=expires, signature=sig)
+ result = accept_vouch(v, pem)
+ assert isinstance(result, Err)
+ assert "not trusted" in result.reason
+
+
+def test_accept_vouch_ok_for_trusted_voucher(fresh_db, fed_dir):
+ priv, pem, fp = _make_peer()
+ register_peer("voucher.example", fp, pem, status="trusted")
+ issued = datetime.now(timezone.utc)
+ expires = issued + timedelta(days=30)
+ sig = _sign_vouch(priv, fp, "target", issued, expires)
+ v = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
+ issued_at=issued, expires_at=expires, signature=sig)
+ result = accept_vouch(v, pem)
+ assert isinstance(result, Ok)
+
+
+# ---------- is_vouched threshold ----------------------------------------
+
+def test_is_vouched_needs_distinct_vouchers(fresh_db, fed_dir):
+ """Two vouches from the same peer must NOT clear a threshold of 2."""
+ priv, pem, fp = _make_peer()
+ register_peer("voucher.example", fp, pem, status="trusted")
+
+ issued = datetime.now(timezone.utc)
+ expires = issued + timedelta(days=30)
+ sig = _sign_vouch(priv, fp, "target", issued, expires)
+ v1 = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
+ issued_at=issued, expires_at=expires, signature=sig)
+ assert isinstance(accept_vouch(v1, pem), Ok)
+
+ # Newer vouch from the SAME voucher — upsert replaces, count stays 1.
+ issued2 = issued + timedelta(seconds=1)
+ sig2 = _sign_vouch(priv, fp, "target", issued2, expires)
+ v2 = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
+ issued_at=issued2, expires_at=expires, signature=sig2)
+ assert isinstance(accept_vouch(v2, pem), Ok)
+
+ assert is_vouched("target", min_vouchers=2) is False
+ # Threshold of 1 should pass.
+ assert is_vouched("target", min_vouchers=1) is True
+
+
+def test_is_vouched_two_distinct_clear_threshold(fresh_db, fed_dir):
+ priv_a, pem_a, fp_a = _make_peer()
+ priv_b, pem_b, fp_b = _make_peer()
+ register_peer("a.example", fp_a, pem_a, status="trusted")
+ register_peer("b.example", fp_b, pem_b, status="trusted")
+
+ issued = datetime.now(timezone.utc)
+ expires = issued + timedelta(days=30)
+ va = Vouch(voucher_fingerprint=fp_a, target_fingerprint="target",
+ issued_at=issued, expires_at=expires,
+ signature=_sign_vouch(priv_a, fp_a, "target", issued, expires))
+ vb = Vouch(voucher_fingerprint=fp_b, target_fingerprint="target",
+ issued_at=issued, expires_at=expires,
+ signature=_sign_vouch(priv_b, fp_b, "target", issued, expires))
+ assert isinstance(accept_vouch(va, pem_a), Ok)
+ assert isinstance(accept_vouch(vb, pem_b), Ok)
+
+ assert is_vouched("target", min_vouchers=2) is True
+ assert is_vouched("target", min_vouchers=3) is False
+
+
+# ---------- quorum on signal_hash ---------------------------------------
+
+def test_is_quorum_met_counts_distinct_vouched_peers_only(fresh_db, fed_dir):
+ # Two trusted peers + one untrusted peer report the same signal_hash.
+ _, pem_a, fp_a = _make_peer()
+ _, pem_b, fp_b = _make_peer()
+ _, pem_c, fp_c = _make_peer()
+ register_peer("a.example", fp_a, pem_a, status="trusted")
+ register_peer("b.example", fp_b, pem_b, status="trusted")
+ register_peer("c.example", fp_c, pem_c, status="unknown") # not eligible
+
+ for fp in (fp_a, fp_b, fp_c, fp_a): # fp_a duplicated → still 1 distinct
+ db.record_signal(dict(
+ peer_fingerprint=fp,
+ signal_type="ioc",
+ signal_id="1.2.3.4",
+ signal_hash="h-aaa",
+ received_at=datetime.now(timezone.utc).isoformat(),
+ raw_json="{}",
+ ))
+
+ assert is_quorum_met("h-aaa", k=2) is True
+ assert is_quorum_met("h-aaa", k=3) is False # only 2 eligible distincts
+
+
+# ---------- quorum config persistence -----------------------------------
+
+def test_quorum_config_defaults_and_persistence(fresh_db, fed_dir):
+ cfg = quorum_config()
+ assert cfg.trust_min_vouchers == 2
+ assert cfg.signal_quorum_k == 2
+ set_quorum_config(QuorumConfig(trust_min_vouchers=3, signal_quorum_k=4))
+ cfg2 = quorum_config()
+ assert cfg2.trust_min_vouchers == 3
+ assert cfg2.signal_quorum_k == 4
+
+
+# ---------- import gate enforces listening eligibility ------------------
+
+def _signed_feed_from_peer(peer_priv, peer_fp, vouches=None):
+ """Build a feed claiming origin=peer_fp, signed with peer_priv."""
+ payload = {
+ "version": federation.FEED_VERSION,
+ "fingerprint": peer_fp,
+ "generated_at": datetime.now(timezone.utc).isoformat(),
+ "window_hours": 24,
+ "cases": [],
+ "iocs": [{
+ "value": "9.9.9.9",
+ "type": "ip",
+ "severity": "high",
+ "first_seen": datetime.now(timezone.utc).isoformat(),
+ "digest_sha256": "abc123",
+ }],
+ "vouches": vouches or [],
+ }
+ sig = peer_priv.sign(canonical_json(payload))
+ payload["signature"] = base64.b64encode(sig).decode("ascii")
+ return payload
+
+
+def test_import_feed_rejects_unknown_peer(fresh_db, fed_dir):
+ peer_priv, peer_pem, peer_fp = _make_peer()
+ feed = _signed_feed_from_peer(peer_priv, peer_fp)
+ result = import_signed_feed(feed, peer_pem)
+ assert isinstance(result, Err)
+ assert "not trusted" in result.reason
+
+
+def test_import_feed_accepts_directly_trusted_peer(fresh_db, fed_dir):
+ peer_priv, peer_pem, peer_fp = _make_peer()
+ register_peer("peer.example", peer_fp, peer_pem, status="trusted")
+ feed = _signed_feed_from_peer(peer_priv, peer_fp)
+ result = import_signed_feed(feed, peer_pem)
+ assert isinstance(result, Ok), getattr(result, "reason", "")
+
+
+def test_import_feed_accepts_vouched_peer(fresh_db, fed_dir):
+ # Two trusted peers vouch for a third — third becomes listening-eligible.
+ priv_a, pem_a, fp_a = _make_peer()
+ priv_b, pem_b, fp_b = _make_peer()
+ priv_c, pem_c, fp_c = _make_peer()
+ register_peer("a.example", fp_a, pem_a, status="trusted")
+ register_peer("b.example", fp_b, pem_b, status="trusted")
+
+ issued = datetime.now(timezone.utc)
+ expires = issued + timedelta(days=30)
+ va = Vouch(voucher_fingerprint=fp_a, target_fingerprint=fp_c,
+ issued_at=issued, expires_at=expires,
+ signature=_sign_vouch(priv_a, fp_a, fp_c, issued, expires))
+ vb = Vouch(voucher_fingerprint=fp_b, target_fingerprint=fp_c,
+ issued_at=issued, expires_at=expires,
+ signature=_sign_vouch(priv_b, fp_b, fp_c, issued, expires))
+ assert isinstance(accept_vouch(va, pem_a), Ok)
+ assert isinstance(accept_vouch(vb, pem_b), Ok)
+ assert peer_is_listening_eligible(fp_c) is True
+
+ feed = _signed_feed_from_peer(priv_c, fp_c)
+ result = import_signed_feed(feed, pem_c)
+ assert isinstance(result, Ok), getattr(result, "reason", "")
+
+
+def test_import_feed_propagates_vouches_in_payload(fresh_db, fed_dir):
+ """A trusted peer's feed carries a vouch the peer issued — we should
+ accept_vouch it and store it locally."""
+ peer_priv, peer_pem, peer_fp = _make_peer()
+ register_peer("peer.example", peer_fp, peer_pem, status="trusted")
+
+ target_fp = "ff" * 16
+ issued = datetime.now(timezone.utc)
+ expires = issued + timedelta(days=30)
+ peer_vouch = Vouch(
+ voucher_fingerprint=peer_fp,
+ target_fingerprint=target_fp,
+ issued_at=issued,
+ expires_at=expires,
+ signature=_sign_vouch(peer_priv, peer_fp, target_fp, issued, expires),
+ )
+ feed = _signed_feed_from_peer(peer_priv, peer_fp, vouches=[peer_vouch.model_dump(mode="json")])
+
+ result = import_signed_feed(feed, peer_pem)
+ assert isinstance(result, Ok), getattr(result, "reason", "")
+
+ # The vouch is now in our local store under the peer's fingerprint.
+ stored = federation.vouches_for(target_fp)
+ assert any(v.voucher_fingerprint == peer_fp for v in stored)