merge vouching+translog: web-of-trust + signed merkle audit log

# Conflicts:
#	src/psyc/_federation_cli.py
#	src/psyc/cockpit/federation_routes.py
This commit is contained in:
m17hr1l
2026-06-06 21:15:11 +02:00
12 changed files with 1606 additions and 3 deletions

View File

@@ -13,7 +13,7 @@ import httpx
import typer import typer
from psyc import db, log from psyc import db, log
from psyc.lines import discovery, federation, pulse from psyc.lines import discovery, federation, pulse, translog
from psyc.result import Err, Ok from psyc.result import Err, Ok
@@ -210,3 +210,80 @@ def register(typer_app: typer.Typer) -> None:
seeds = [s for s in seeds if s != d] seeds = [s for s in seeds if s != d]
pulse.set_discovery_seeds(seeds) pulse.set_discovery_seeds(seeds)
typer.echo(f"removed seed {d}") typer.echo(f"removed seed {d}")
# ---------- vouching + quorum --------------------------------------
@typer_app.command("fed-vouch")
def fed_vouch(
target_fp: str = typer.Argument(..., help="target peer fingerprint (32 hex)"),
ttl_days: int = typer.Option(90, "--ttl-days", help="vouch lifetime in days"),
) -> None:
"""Issue a signed vouch for `target_fp`. Persists locally + rides our feed."""
db.init_db()
v = federation.issue_vouch(target_fp.strip(), ttl_days=ttl_days)
typer.echo(f"vouched: {v.target_fingerprint} (expires {v.expires_at})")
@typer_app.command("fed-unvouch")
def fed_unvouch(target_fp: str = typer.Argument(...)) -> None:
"""Revoke OUR vouch for `target_fp`."""
db.init_db()
federation.revoke_vouch(target_fp.strip())
typer.echo(f"revoked vouch for {target_fp}")
@typer_app.command("fed-vouches")
def fed_vouches() -> None:
"""List vouches WE have issued."""
db.init_db()
rows = federation.our_vouches()
if not rows:
typer.echo("(no vouches issued)")
return
for v in rows:
exp = v.expires_at.isoformat() if v.expires_at else ""
typer.echo(f" {v.target_fingerprint} issued={v.issued_at.isoformat()[:16]} expires={exp[:16]}")
@typer_app.command("fed-quorum-set")
def fed_quorum_set(
trust: Optional[int] = typer.Option(None, "--trust", help="trust_min_vouchers"),
k: Optional[int] = typer.Option(None, "--k", help="signal_quorum_k"),
) -> None:
"""Update quorum thresholds. Either flag is optional — only changed values overwrite."""
db.init_db()
cfg = federation.quorum_config()
if trust is not None:
cfg.trust_min_vouchers = max(1, int(trust))
if k is not None:
cfg.signal_quorum_k = max(1, int(k))
federation.set_quorum_config(cfg)
typer.echo(f"quorum: trust_min_vouchers={cfg.trust_min_vouchers} signal_quorum_k={cfg.signal_quorum_k}")
# ---------- transparency log --------------------------------------
@typer_app.command("fed-log")
def fed_log(
limit: int = typer.Option(20, "--limit", help="number of entries to show"),
) -> None:
"""Print recent transparency-log entries (newest first)."""
db.init_db()
rows = translog.recent(limit=limit)
if not rows:
typer.echo("(transparency log empty)")
return
for e in rows:
typer.echo(
f" id={e.id:5d} {e.entry_type:6s} {e.timestamp[:19]} hash={e.entry_hash[:16]}"
)
@typer_app.command("fed-log-verify")
def fed_log_verify() -> None:
"""Re-walk the chain locally and report verification status."""
db.init_db()
result = translog.verify_chain()
head = translog.head()
head_hash = head.entry_hash if head else "(empty)"
if isinstance(result, Err):
typer.echo(f" ✗ broken: {result.reason}", err=True)
typer.echo(f" head_hash: {head_hash}")
raise typer.Exit(1)
typer.echo(f" ✓ verified {result.value} entries")
typer.echo(f" head_hash: {head_hash}")

View File

@@ -15,7 +15,8 @@ from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, Red
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from psyc import db, log from psyc import db, log
from psyc.lines import discovery, federation, pulse from psyc.lines import discovery, federation, pulse, translog
from psyc.result import Err
_log = log.get(__name__) _log = log.get(__name__)
@@ -191,4 +192,183 @@ def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
""" """
return JSONResponse(_cached_public_peers()) return JSONResponse(_cached_public_peers())
# ---------- public vouches + transparency log --------------------
@app.get("/federation/vouches")
def federation_vouches() -> JSONResponse:
"""Vouches WE have issued. Peers fetch this to learn who we trust."""
return JSONResponse({
"fingerprint": federation.node_fingerprint(),
"vouches": [v.model_dump(mode="json") for v in federation.our_vouches()],
})
@app.get("/federation/log")
def federation_log() -> JSONResponse:
"""Last 100 transparency-log entries, newest first."""
entries = translog.recent(limit=100)
return JSONResponse({
"count": len(entries),
"entries": [e.model_dump(mode="json") for e in entries],
})
@app.get("/federation/log/verify")
def federation_log_verify() -> JSONResponse:
"""Re-walk the chain locally and report status. Auditors poll this."""
result = translog.verify_chain()
head = translog.head()
head_hash = head.entry_hash if head else None
if isinstance(result, Err):
return JSONResponse({"error": result.reason, "head_hash": head_hash}, status_code=409)
return JSONResponse({"verified": result.value, "head_hash": head_hash})
# ---------- admin: vouches page ---------------------------------
@app.get("/admin/federation/vouches", response_class=HTMLResponse)
def admin_federation_vouches(request: Request) -> HTMLResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
peers = federation.list_peers()
cfg = federation.quorum_config()
ours = federation.our_vouches()
# Per-peer view: vouches naming each peer and whether quorum is met.
peer_rows = []
for p in peers:
vouches = federation.vouches_for(p.fingerprint)
peer_rows.append({
"peer": p,
"vouches": vouches,
"vouched": federation.is_vouched(p.fingerprint),
"eligible": federation.peer_is_listening_eligible(p.fingerprint),
})
return TEMPLATES.TemplateResponse(
request,
"admin_federation_vouches.html",
{
"fingerprint": federation.node_fingerprint(),
"our_vouches": ours,
"peer_rows": peer_rows,
"cfg": cfg,
},
)
@app.post("/admin/federation/vouches/issue")
def admin_federation_vouch_issue(
request: Request,
target_fingerprint: str = Form(...),
ttl_days: int = Form(90),
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
try:
federation.issue_vouch(target_fingerprint.strip(), ttl_days=ttl_days)
except Exception as exc:
_log.warning("federation.vouch.issue.error", error=str(exc))
return RedirectResponse("/admin/federation/vouches", status_code=303)
@app.post("/admin/federation/vouches/revoke")
def admin_federation_vouch_revoke(
request: Request,
target_fingerprint: str = Form(...),
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
federation.revoke_vouch(target_fingerprint.strip())
return RedirectResponse("/admin/federation/vouches", status_code=303)
# ---------- admin: transparency log page ------------------------
@app.get("/admin/federation/log", response_class=HTMLResponse)
def admin_federation_log(request: Request) -> HTMLResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
result = translog.verify_chain()
head = translog.head()
entries = translog.recent(limit=200)
verify_status: Dict[str, Any]
if isinstance(result, Err):
verify_status = {"ok": False, "reason": result.reason}
else:
verify_status = {"ok": True, "verified": result.value}
return TEMPLATES.TemplateResponse(
request,
"admin_federation_log.html",
{
"verify_status": verify_status,
"head_hash": head.entry_hash if head else "",
"head_id": head.id if head else 0,
"entries": entries,
},
)
# ---------- admin: quorum config + per-peer/per-hash view -------
@app.get("/admin/federation/quorum", response_class=HTMLResponse)
def admin_federation_quorum(request: Request) -> HTMLResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
cfg = federation.quorum_config()
peers = federation.list_peers()
peer_rows = [
{
"peer": p,
"vouched": federation.is_vouched(p.fingerprint),
"eligible": federation.peer_is_listening_eligible(p.fingerprint),
}
for p in peers
]
# Group buffered signals by signal_hash and count distinct eligible peers.
signal_rows: Dict[str, Dict[str, Any]] = {}
for s in db.recent_signals(limit=500):
h = s.get("signal_hash") or ""
entry = signal_rows.setdefault(h, {
"signal_hash": h,
"signal_type": s.get("signal_type") or "",
"signal_id": s.get("signal_id") or "",
"peers": set(),
"latest": s.get("received_at") or "",
})
entry["peers"].add(s.get("peer_fingerprint") or "")
hash_summary = []
for h, row in signal_rows.items():
distinct_eligible = sum(
1 for fp in row["peers"] if federation.peer_is_listening_eligible(fp)
)
hash_summary.append({
"signal_hash": h,
"signal_type": row["signal_type"],
"signal_id": row["signal_id"],
"distinct_peers": len(row["peers"]),
"distinct_eligible": distinct_eligible,
"quorum_met": distinct_eligible >= cfg.signal_quorum_k,
"latest": row["latest"],
})
hash_summary.sort(key=lambda r: r["latest"], reverse=True)
return TEMPLATES.TemplateResponse(
request,
"admin_federation_quorum.html",
{
"cfg": cfg,
"peer_rows": peer_rows,
"hash_summary": hash_summary,
},
)
@app.post("/admin/federation/quorum/save")
def admin_federation_quorum_save(
request: Request,
trust_min_vouchers: int = Form(...),
signal_quorum_k: int = Form(...),
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
try:
cfg = federation.QuorumConfig(
trust_min_vouchers=max(1, int(trust_min_vouchers)),
signal_quorum_k=max(1, int(signal_quorum_k)),
)
federation.set_quorum_config(cfg)
except Exception as exc:
_log.warning("federation.quorum.save.error", error=str(exc))
return RedirectResponse("/admin/federation/quorum", status_code=303)
_log.info("federation.routes.registered") _log.info("federation.routes.registered")

View File

@@ -8,7 +8,7 @@
<span class="count">{{ peers|length }} peer{{ '' if peers|length == 1 else 's' }}</span> <span class="count">{{ peers|length }} peer{{ '' if peers|length == 1 else 's' }}</span>
</div> </div>
<p class="page-intro">This node's Ed25519 identity. The fingerprint goes into a DNS TXT record so other psyc nodes can discover this one. The public key lets them verify any feed we publish — the private key never leaves this box.</p> <p class="page-intro">This node's Ed25519 identity. The fingerprint goes into a DNS TXT record so other psyc nodes can discover this one. The public key lets them verify any feed we publish — the private key never leaves this box.</p>
<p class="back"><a href="/admin">← back to admin</a></p> <p class="back"><a href="/admin">← back to admin</a> &nbsp;·&nbsp; <a href="/admin/federation/vouches">vouches</a> &nbsp;·&nbsp; <a href="/admin/federation/quorum">quorum config</a> &nbsp;·&nbsp; <a href="/admin/federation/log">transparency log</a></p>
<div class="card" style="margin-bottom:14px;"> <div class="card" style="margin-bottom:14px;">
<div class="lg-sub">node fingerprint</div> <div class="lg-sub">node fingerprint</div>

View File

@@ -0,0 +1,63 @@
{% extends "base.html" %}
{% block title %}Transparency Log — psyc admin{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Transparency Log</h1>
<span class="count">id @ {{ head_id }}</span>
</div>
<p class="page-intro">Every signal we accept from a peer is appended to a signed merkle chain. Each entry references the previous entry's hash, so tampering with any historical row invalidates every entry after. Auditors can re-walk and detect a bad peer historically — even one we trusted at the time.</p>
<p class="back"><a href="/admin/federation">← back to federation</a> &nbsp;·&nbsp; <a href="/admin/federation/vouches">vouches</a> &nbsp;·&nbsp; <a href="/admin/federation/quorum">quorum config</a> &nbsp;·&nbsp; <a href="/federation/log/verify">public verify endpoint</a></p>
<div class="card" style="margin-bottom:14px;">
<div class="lg-sub">chain verification</div>
{% if verify_status.ok %}
<div style="margin:6px 0;"><span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">verified</span> &nbsp; {{ verify_status.verified }} entries walked, no breaks</div>
{% else %}
<div style="margin:6px 0;"><span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">BROKEN</span> &nbsp; {{ verify_status.reason }}</div>
{% endif %}
{% if head_hash %}
<div class="lg-sub" style="margin-top:8px;">head hash</div>
<div style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:13px; word-break:break-all; color:var(--accent);">{{ head_hash }}</div>
{% endif %}
</div>
</section>
<section class="panel">
<div class="panel-head">
<h2>Recent Entries</h2>
<span class="count">{{ entries|length }} of last 200</span>
</div>
<p class="page-intro">Newest first. Hashes are truncated for display — full values are at <code>/federation/log</code>.</p>
{% if entries %}
<table class="ledger">
<thead><tr><th>id</th><th>When</th><th>Type</th><th>Peer / target</th><th>Signal id</th><th>Hash</th></tr></thead>
<tbody>
{% for e in entries %}
<tr class="ledger-row">
<td>{{ e.id }}</td>
<td class="lg-ts">{{ (e.timestamp or '')[:19] | replace('T', ' ') }}</td>
<td><span class="sev-badge">{{ e.entry_type }}</span></td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">
{% if e.entry_type == 'signal' %}
{{ (e.entry_data.peer_fingerprint or '')[:8] }}…
{% elif e.entry_type == 'vouch' %}
{{ (e.entry_data.voucher_fingerprint or '')[:8] }}…→{{ (e.entry_data.target_fingerprint or '')[:8] }}…
{% else %}
{% endif %}
</td>
<td style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ ((e.entry_data.signal_id or e.entry_data.target_fingerprint or '') | string)[:32] }}</td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ e.entry_hash[:16] }}…</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(chain empty — no signals appended yet)</p>
{% endif %}
</section>
{% endblock %}

View File

@@ -0,0 +1,92 @@
{% extends "base.html" %}
{% block title %}Quorum Config — psyc admin{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Quorum Configuration</h1>
<span class="count">trust={{ cfg.trust_min_vouchers }} k={{ cfg.signal_quorum_k }}</span>
</div>
<p class="page-intro"><strong>trust_min_vouchers</strong> — distinct trusted vouchers required to make a new peer listening-eligible. <strong>signal_quorum_k</strong> — distinct listening-eligible peers required to consider a signal_hash quorum-met. Both gates live in pulse_settings; raising them tightens trust, lowering them relaxes it.</p>
<p class="back"><a href="/admin/federation">← back to federation</a> &nbsp;·&nbsp; <a href="/admin/federation/vouches">vouches</a> &nbsp;·&nbsp; <a href="/admin/federation/log">transparency log</a></p>
<form method="post" action="/admin/federation/quorum/save" style="display:grid; gap:10px; max-width:520px;">
<label class="lg-sub">trust_min_vouchers</label>
<input type="number" name="trust_min_vouchers" value="{{ cfg.trust_min_vouchers }}" min="1" max="50" class="lookup-input" required>
<label class="lg-sub">signal_quorum_k</label>
<input type="number" name="signal_quorum_k" value="{{ cfg.signal_quorum_k }}" min="1" max="50" class="lookup-input" required>
<button type="submit" class="btn btn-enforce">save config</button>
</form>
</section>
<section class="panel">
<div class="panel-head">
<h2>Per-Peer Listening Eligibility</h2>
<span class="count">{{ peer_rows|length }}</span>
</div>
<p class="page-intro">A peer's feed gets ingested only when its fingerprint is eligible (directly trusted or vouched into trust).</p>
{% if peer_rows %}
<table class="ledger">
<thead><tr><th>Domain</th><th>Fingerprint</th><th>Status</th><th>Vouched</th><th>Eligible</th></tr></thead>
<tbody>
{% for row in peer_rows %}
<tr class="ledger-row">
<td><strong>{{ row.peer.domain }}</strong></td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">{{ row.peer.fingerprint[:8] }}…{{ row.peer.fingerprint[-8:] }}</td>
<td>
{% if row.peer.status == 'trusted' %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">trusted</span>
{% elif row.peer.status == 'blocked' %}
<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">blocked</span>
{% else %}
<span class="sev-badge">{{ row.peer.status }}</span>
{% endif %}
</td>
<td>{% if row.vouched %}<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">yes</span>{% else %}<span class="sev-badge">no</span>{% endif %}</td>
<td>{% if row.eligible %}<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">listening</span>{% else %}<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">muted</span>{% endif %}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no peers registered yet)</p>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Signal Hashes in Buffer</h2>
<span class="count">{{ hash_summary|length }} hashes</span>
</div>
<p class="page-intro">Distinct eligible-peer counts per signal hash. Quorum is met when count ≥ {{ cfg.signal_quorum_k }}.</p>
{% if hash_summary %}
<table class="ledger">
<thead><tr><th>Latest</th><th>Type</th><th>Signal id</th><th>Hash</th><th>Distinct peers</th><th>Eligible</th><th>Quorum</th></tr></thead>
<tbody>
{% for r in hash_summary %}
<tr class="ledger-row">
<td class="lg-ts">{{ (r.latest or '')[:19] | replace('T', ' ') }}</td>
<td><span class="sev-badge">{{ r.signal_type }}</span></td>
<td style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ r.signal_id[:48] }}</td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ r.signal_hash[:16] }}…</td>
<td>{{ r.distinct_peers }}</td>
<td>{{ r.distinct_eligible }}</td>
<td>
{% if r.quorum_met %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">met</span>
{% else %}
<span class="sev-badge">below</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no signals in buffer yet)</p>
{% endif %}
</section>
{% endblock %}

View File

@@ -0,0 +1,110 @@
{% extends "base.html" %}
{% block title %}Federation Vouches — psyc admin{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Web of Trust</h1>
<span class="count">{{ our_vouches|length }} issued</span>
</div>
<p class="page-intro">A vouch is an Ed25519-signed assertion that we trust another node's fingerprint. Peers gossip our vouches with their feeds, so trust accumulates: once {{ cfg.trust_min_vouchers }} of our trusted peers vouches for a new fingerprint, it becomes <em>listening-eligible</em> — its signed feeds get ingested.</p>
<p class="back"><a href="/admin/federation">← back to federation</a> &nbsp;·&nbsp; <a href="/admin/federation/quorum">quorum config</a> &nbsp;·&nbsp; <a href="/admin/federation/log">transparency log</a></p>
<div class="card" style="margin-bottom:14px;">
<div class="lg-sub">our fingerprint</div>
<div style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:14px; word-break:break-all; color:var(--accent);">{{ fingerprint }}</div>
</div>
</section>
<section class="panel">
<div class="panel-head">
<h2>Vouches We've Issued</h2>
<span class="count">{{ our_vouches|length }}</span>
</div>
<p class="page-intro">We've signed these — peers that fetch our feed will see them and may extend trust accordingly.</p>
{% if our_vouches %}
<table class="ledger">
<thead><tr><th>Target fingerprint</th><th>Issued</th><th>Expires</th><th></th></tr></thead>
<tbody>
{% for v in our_vouches %}
<tr class="ledger-row">
<td style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:12px;">{{ v.target_fingerprint }}</td>
<td class="lg-ts">{{ v.issued_at.isoformat()[:16] | replace('T', ' ') }}</td>
<td class="lg-ts">{{ v.expires_at.isoformat()[:16] | replace('T', ' ') if v.expires_at else '—' }}</td>
<td>
<form method="post" action="/admin/federation/vouches/revoke" class="queue-action"
onsubmit="return confirm('Revoke vouch for {{ v.target_fingerprint[:8] }}…?');">
<input type="hidden" name="target_fingerprint" value="{{ v.target_fingerprint }}">
<button type="submit" class="btn btn-reject">revoke</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no vouches issued yet)</p>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Issue a Vouch</h2>
</div>
<p class="page-intro">Vouch for a peer's fingerprint. Trusted peers see this and may treat the target as listening-eligible.</p>
<form method="post" action="/admin/federation/vouches/issue" style="display:grid; gap:10px; max-width:680px;">
<input type="text" name="target_fingerprint" placeholder="target fingerprint (32 hex chars)" class="lookup-input" maxlength="64" required style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">
<input type="number" name="ttl_days" value="90" min="1" max="3650" class="lookup-input" placeholder="ttl days">
<button type="submit" class="btn btn-enforce">+ issue vouch</button>
</form>
</section>
<section class="panel">
<div class="panel-head">
<h2>Per-Peer Quorum Status</h2>
<span class="count">{{ peer_rows|length }} peers</span>
</div>
<p class="page-intro">Threshold: {{ cfg.trust_min_vouchers }} distinct trusted vouchers required to make a non-trusted peer listening-eligible.</p>
{% if peer_rows %}
<table class="ledger">
<thead><tr><th>Peer</th><th>Status</th><th>Vouches</th><th>Quorum met</th><th>Eligible</th></tr></thead>
<tbody>
{% for row in peer_rows %}
<tr class="ledger-row">
<td><strong>{{ row.peer.domain }}</strong><br><span class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">{{ row.peer.fingerprint[:8] }}…{{ row.peer.fingerprint[-8:] }}</span></td>
<td>
{% if row.peer.status == 'trusted' %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">trusted</span>
{% elif row.peer.status == 'blocked' %}
<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">blocked</span>
{% else %}
<span class="sev-badge">{{ row.peer.status }}</span>
{% endif %}
</td>
<td>{{ row.vouches|length }}</td>
<td>
{% if row.vouched %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">yes</span>
{% else %}
<span class="sev-badge">no</span>
{% endif %}
</td>
<td>
{% if row.eligible %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">listening</span>
{% else %}
<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">muted</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no peers registered yet)</p>
{% endif %}
</section>
{% endblock %}

View File

@@ -161,6 +161,35 @@ Index("federation_signals_hash_idx", federation_signals.c.signal_hash)
Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint) Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint)
Index("federation_signals_received_idx", federation_signals.c.received_at.desc()) Index("federation_signals_received_idx", federation_signals.c.received_at.desc())
# Web-of-trust vouches — voucher signs an attestation that target is OK to listen to.
# Quorum is reached when enough distinct trusted vouchers sign for the same target.
vouches = Table(
"vouches", _metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("voucher_fingerprint", String, nullable=False),
Column("target_fingerprint", String, nullable=False),
Column("issued_at", String, nullable=False),
Column("expires_at", String, nullable=True),
Column("signature", Text, nullable=False), # base64 ed25519 sig
)
Index("vouches_unique_idx", vouches.c.voucher_fingerprint, vouches.c.target_fingerprint, unique=True)
Index("vouches_target_idx", vouches.c.target_fingerprint)
# Transparency log — append-only signed hash chain over every signal we receive.
# Each entry references the previous entry's hash; tampering with any row breaks
# verify_chain on every subsequent row.
translog = Table(
"translog", _metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("prev_hash", String, nullable=False),
Column("entry_type", String, nullable=False), # signal | vouch | config
Column("entry_data", Text, nullable=False), # canonical JSON of payload
Column("timestamp", String, nullable=False),
Column("entry_hash", String, nullable=False),
)
Index("translog_hash_idx", translog.c.entry_hash)
Index("translog_time_idx", translog.c.timestamp.desc())
_log = log.get(__name__) _log = log.get(__name__)
_engine: Optional[Engine] = None _engine: Optional[Engine] = None
@@ -388,3 +417,113 @@ def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit) stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit)
with engine(db_path).connect() as conn: with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
# ---------- federation: pulse_settings get/set (shared scratch kv) -------
def setting_get(key: str, db_path: Path = DB_PATH) -> Optional[str]:
"""Read one pulse_settings value by key. Returns None if absent."""
stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == key)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return None if row is None else str(row.value)
def setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None:
"""Upsert one pulse_settings entry."""
stmt = sqlite_insert(pulse_settings).values(key=key, value=value)
stmt = stmt.on_conflict_do_update(
index_elements=[pulse_settings.c.key],
set_=dict(value=stmt.excluded.value),
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
# ---------- federation: vouches ------------------------------------------
def upsert_vouch(row: dict, db_path: Path = DB_PATH) -> None:
"""Insert-or-update one vouch. Unique on (voucher_fp, target_fp)."""
stmt = sqlite_insert(vouches).values(**row)
update_cols = {k: stmt.excluded[k] for k in row if k not in ("voucher_fingerprint", "target_fingerprint")}
stmt = stmt.on_conflict_do_update(
index_elements=[vouches.c.voucher_fingerprint, vouches.c.target_fingerprint],
set_=update_cols,
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
def list_vouches(db_path: Path = DB_PATH) -> List[dict]:
stmt = select(vouches).order_by(vouches.c.issued_at.desc())
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def vouches_by_target(target_fingerprint: str, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(vouches).where(vouches.c.target_fingerprint == target_fingerprint)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def vouches_by_voucher(voucher_fingerprint: str, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(vouches).where(vouches.c.voucher_fingerprint == voucher_fingerprint)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def delete_vouch(voucher_fingerprint: str, target_fingerprint: str, db_path: Path = DB_PATH) -> None:
stmt = vouches.delete().where(
(vouches.c.voucher_fingerprint == voucher_fingerprint)
& (vouches.c.target_fingerprint == target_fingerprint)
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
# ---------- transparency log ---------------------------------------------
def translog_append(row: dict, db_path: Path = DB_PATH) -> int:
"""Append one transparency-log entry. Returns inserted id."""
stmt = insert(translog).values(**row)
with engine(db_path).begin() as conn:
res = conn.execute(stmt)
return int(res.inserted_primary_key[0])
def translog_head(db_path: Path = DB_PATH) -> Optional[dict]:
"""Highest-id (latest) entry, or None if chain empty."""
stmt = select(translog).order_by(translog.c.id.desc()).limit(1)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return dict(row._mapping) if row else None
def translog_get(entry_id: int, db_path: Path = DB_PATH) -> Optional[dict]:
stmt = select(translog).where(translog.c.id == entry_id)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return dict(row._mapping) if row else None
def translog_after(entry_id: int, db_path: Path = DB_PATH) -> List[dict]:
"""All entries with id > entry_id, oldest first — for sync."""
stmt = select(translog).where(translog.c.id > entry_id).order_by(translog.c.id.asc())
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def translog_recent(limit: int = 100, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(translog).order_by(translog.c.id.desc()).limit(limit)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def translog_range(start: int = 0, end: Optional[int] = None, db_path: Path = DB_PATH) -> List[dict]:
"""All entries with start <= id (and id <= end if given), oldest first."""
cond = translog.c.id >= start
if end is not None:
cond = cond & (translog.c.id <= end)
stmt = select(translog).where(cond).order_by(translog.c.id.asc())
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]

View File

@@ -24,6 +24,7 @@ from cryptography.hazmat.primitives.asymmetric import ed25519
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from psyc import DATA_DIR, db, log from psyc import DATA_DIR, db, log
from psyc.lines import translog
from psyc.result import Err, Ok, Result from psyc.result import Err, Ok, Result
@@ -259,6 +260,9 @@ def build_signed_feed(window_hours: int = 24) -> Dict[str, Any]:
"window_hours": window_hours, "window_hours": window_hours,
"cases": _build_case_records(window_hours), "cases": _build_case_records(window_hours),
"iocs": _build_ioc_records(window_hours), "iocs": _build_ioc_records(window_hours),
# Vouches we've issued ride along with the feed so peers can learn
# who we trust and accumulate quorum on shared targets.
"vouches": [v.model_dump() for v in our_vouches()],
} }
sig = sign_payload(canonical_json(payload)) sig = sign_payload(canonical_json(payload))
payload["signature"] = base64.b64encode(sig).decode("ascii") payload["signature"] = base64.b64encode(sig).decode("ascii")
@@ -306,10 +310,16 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result
except Exception as exc: except Exception as exc:
return Err(f"bad pubkey: {exc}") return Err(f"bad pubkey: {exc}")
# Listening gate: only accept signals from peers we explicitly trust or
# that quorum of trusted peers vouches for. Unknown peers don't land here.
if not peer_is_listening_eligible(peer_fp):
return Err(f"peer not trusted: {peer_fp}")
now = datetime.now(timezone.utc).isoformat() now = datetime.now(timezone.utc).isoformat()
signal_ids: List[Tuple[str, str]] = [] signal_ids: List[Tuple[str, str]] = []
cases = feed.get("cases") or [] cases = feed.get("cases") or []
iocs = feed.get("iocs") or [] iocs = feed.get("iocs") or []
feed_vouches = feed.get("vouches") or []
for c in cases: for c in cases:
case_id = c.get("case_id") or "" case_id = c.get("case_id") or ""
@@ -323,6 +333,15 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result
raw_json=json.dumps(c, sort_keys=True), raw_json=json.dumps(c, sort_keys=True),
)) ))
signal_ids.append(("case", digest)) signal_ids.append(("case", digest))
try:
translog.append("signal", {
"peer_fingerprint": peer_fp,
"signal_type": "case",
"signal_id": case_id,
"signal_hash": digest,
})
except Exception as exc: # transparency log is best-effort, never block ingest
_log.warning("federation.translog.append.fail", error=str(exc))
for i in iocs: for i in iocs:
value = i.get("value") or "" value = i.get("value") or ""
@@ -336,6 +355,36 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result
raw_json=json.dumps(i, sort_keys=True), raw_json=json.dumps(i, sort_keys=True),
)) ))
signal_ids.append(("ioc", digest)) signal_ids.append(("ioc", digest))
try:
translog.append("signal", {
"peer_fingerprint": peer_fp,
"signal_type": "ioc",
"signal_id": value,
"signal_hash": digest,
})
except Exception as exc:
_log.warning("federation.translog.append.fail", error=str(exc))
# Vouch propagation — peer asserts who they trust. We only accept vouches
# whose declared voucher fingerprint matches the peer we just authenticated
# (so a peer can't forge vouches "from" someone else through us).
for v_raw in feed_vouches:
if not isinstance(v_raw, dict):
continue
try:
vouch = Vouch.model_validate(v_raw)
except Exception as exc:
_log.warning("federation.vouch.malformed", error=str(exc))
continue
if vouch.voucher_fingerprint != peer_fp:
_log.warning(
"federation.vouch.voucher_mismatch",
claimed=vouch.voucher_fingerprint, actual=peer_fp,
)
continue
accepted = accept_vouch(vouch, expected_pubkey_pem)
if isinstance(accepted, Err):
_log.warning("federation.vouch.rejected", reason=accepted.reason)
_log.info("federation.import.ok", peer=peer_fp, cases=len(cases), iocs=len(iocs)) _log.info("federation.import.ok", peer=peer_fp, cases=len(cases), iocs=len(iocs))
return Ok(ImportSummary( return Ok(ImportSummary(
@@ -403,3 +452,279 @@ def set_peer_status(domain: str, status: str) -> None:
def remove_peer(domain: str) -> None: def remove_peer(domain: str) -> None:
db.remove_peer(domain) db.remove_peer(domain)
# ---------- vouching + quorum (stage 4) ---------------------------------
#
# The web of trust: a peer's fingerprint becomes "listening-eligible" when
# either we directly trust it (peers.status == "trusted") or at least
# `trust_min_vouchers` of our trusted peers have signed a vouch for it.
#
# Signal-level quorum: a federation_signals row is meaningful only when
# `signal_quorum_k` distinct vouched peers have reported the same signal_hash.
#
# Vouches are short Pydantic records signed with the voucher's Ed25519 key
# over canonical JSON of the body (everything but the signature field).
class Vouch(BaseModel):
voucher_fingerprint: str
target_fingerprint: str
issued_at: datetime
expires_at: Optional[datetime] = None
signature: str = "" # base64 ed25519 sig over vouch_payload_bytes(...)
class QuorumConfig(BaseModel):
trust_min_vouchers: int = 2
signal_quorum_k: int = 2
_QC_TRUST_KEY = "wot_trust_min"
_QC_K_KEY = "wot_quorum_k"
def quorum_config() -> QuorumConfig:
"""Live quorum settings, with sensible defaults if pulse_settings is empty."""
cfg = QuorumConfig()
t = db.setting_get(_QC_TRUST_KEY)
k = db.setting_get(_QC_K_KEY)
if t is not None:
try:
cfg.trust_min_vouchers = max(1, int(t))
except ValueError:
pass
if k is not None:
try:
cfg.signal_quorum_k = max(1, int(k))
except ValueError:
pass
return cfg
def set_quorum_config(cfg: QuorumConfig) -> None:
"""Persist quorum config into pulse_settings."""
db.setting_set(_QC_TRUST_KEY, str(cfg.trust_min_vouchers))
db.setting_set(_QC_K_KEY, str(cfg.signal_quorum_k))
def vouch_payload_bytes(
voucher_fp: str,
target_fp: str,
issued_at: datetime,
expires_at: Optional[datetime],
) -> bytes:
"""Canonical JSON of the unsigned vouch body — what the voucher signs."""
body: Dict[str, Any] = {
"voucher_fingerprint": voucher_fp,
"target_fingerprint": target_fp,
"issued_at": issued_at.isoformat(),
"expires_at": expires_at.isoformat() if expires_at else None,
}
return canonical_json(body)
def _store_vouch(v: Vouch) -> None:
db.upsert_vouch(dict(
voucher_fingerprint=v.voucher_fingerprint,
target_fingerprint=v.target_fingerprint,
issued_at=v.issued_at.isoformat(),
expires_at=v.expires_at.isoformat() if v.expires_at else None,
signature=v.signature,
))
def _row_to_vouch(row: Dict[str, Any]) -> Vouch:
return Vouch(
voucher_fingerprint=row["voucher_fingerprint"],
target_fingerprint=row["target_fingerprint"],
issued_at=datetime.fromisoformat(row["issued_at"]),
expires_at=datetime.fromisoformat(row["expires_at"]) if row.get("expires_at") else None,
signature=row.get("signature") or "",
)
def issue_vouch(target_fingerprint: str, ttl_days: int = 90) -> Vouch:
"""Sign a vouch for `target_fingerprint` with OUR key. Persists + returns it."""
our_fp = node_fingerprint()
issued_at = datetime.now(timezone.utc)
expires_at = issued_at + timedelta(days=ttl_days) if ttl_days > 0 else None
payload = vouch_payload_bytes(our_fp, target_fingerprint, issued_at, expires_at)
sig = sign_payload(payload)
vouch = Vouch(
voucher_fingerprint=our_fp,
target_fingerprint=target_fingerprint,
issued_at=issued_at,
expires_at=expires_at,
signature=base64.b64encode(sig).decode("ascii"),
)
_store_vouch(vouch)
try:
translog.append("vouch", {
"voucher_fingerprint": vouch.voucher_fingerprint,
"target_fingerprint": vouch.target_fingerprint,
"issued_at": vouch.issued_at.isoformat(),
"expires_at": vouch.expires_at.isoformat() if vouch.expires_at else None,
})
except Exception as exc:
_log.warning("federation.translog.append.fail", error=str(exc))
_log.info("federation.vouch.issued", target=target_fingerprint, ttl_days=ttl_days)
return vouch
def accept_vouch(vouch: Vouch, voucher_pubkey_pem: str) -> Result[None, str]:
"""Verify signature + expiry + voucher trust status, then persist.
Failure modes return Err with a short reason so the caller can log them.
A voucher whose status is not "trusted" in our peers table is refused —
we don't accept transitive vouches from unknown peers.
"""
# Expiry first — cheapest check.
now = datetime.now(timezone.utc)
if vouch.expires_at is not None and vouch.expires_at < now:
return Err("vouch expired")
# Voucher must be a directly-trusted peer (no transitive trust at this layer).
voucher_status = None
for row in db.list_peers():
if row.get("fingerprint") == vouch.voucher_fingerprint:
voucher_status = row.get("status")
break
if voucher_status != "trusted":
return Err(f"voucher not trusted: {vouch.voucher_fingerprint}")
# The pubkey must match the declared voucher fingerprint.
try:
if _fingerprint_for_pubkey_pem(voucher_pubkey_pem) != vouch.voucher_fingerprint:
return Err("voucher pubkey does not match fingerprint")
except Exception as exc:
return Err(f"bad voucher pubkey: {exc}")
payload = vouch_payload_bytes(
vouch.voucher_fingerprint,
vouch.target_fingerprint,
vouch.issued_at,
vouch.expires_at,
)
try:
signature = base64.b64decode(vouch.signature)
except Exception:
return Err("vouch signature not base64")
if not verify_payload(payload, signature, voucher_pubkey_pem):
return Err("vouch signature invalid")
_store_vouch(vouch)
try:
translog.append("vouch", {
"voucher_fingerprint": vouch.voucher_fingerprint,
"target_fingerprint": vouch.target_fingerprint,
"issued_at": vouch.issued_at.isoformat(),
"expires_at": vouch.expires_at.isoformat() if vouch.expires_at else None,
"accepted": True,
})
except Exception as exc:
_log.warning("federation.translog.append.fail", error=str(exc))
_log.info("federation.vouch.accepted", voucher=vouch.voucher_fingerprint, target=vouch.target_fingerprint)
return Ok(None)
def revoke_vouch(target_fingerprint: str) -> None:
"""Delete OUR vouch naming `target_fingerprint`. No-op if absent."""
db.delete_vouch(node_fingerprint(), target_fingerprint)
_log.info("federation.vouch.revoked", target=target_fingerprint)
def our_vouches() -> List[Vouch]:
"""Vouches we have issued (filter for voucher_fingerprint == our fp)."""
return [_row_to_vouch(r) for r in db.vouches_by_voucher(node_fingerprint())]
def vouches_for(target_fingerprint: str) -> List[Vouch]:
"""Every vouch stored locally that names `target_fingerprint` as target."""
return [_row_to_vouch(r) for r in db.vouches_by_target(target_fingerprint)]
def is_vouched(target_fingerprint: str, min_vouchers: Optional[int] = None) -> bool:
"""True iff ≥`min_vouchers` distinct non-expired vouches from currently-trusted
peers name `target_fingerprint`.
"""
cfg = quorum_config()
threshold = min_vouchers if min_vouchers is not None else cfg.trust_min_vouchers
if threshold <= 0:
return True
now = datetime.now(timezone.utc)
trusted_fps = {p.fingerprint for p in list_peers() if p.status == "trusted"}
distinct_vouchers: set = set()
for v in vouches_for(target_fingerprint):
if v.expires_at is not None and v.expires_at < now:
continue
if v.voucher_fingerprint not in trusted_fps:
continue
distinct_vouchers.add(v.voucher_fingerprint)
if len(distinct_vouchers) >= threshold:
return True
return False
def peer_is_listening_eligible(fingerprint: str) -> bool:
"""True iff the peer is directly trusted OR vouched into trust.
This is the gate used by `import_signed_feed`. Auto-response will share
this signature — keep it stable.
"""
if not fingerprint:
return False
for p in list_peers():
if p.fingerprint == fingerprint:
if p.status == "trusted":
return True
if p.status == "blocked":
return False
break
return is_vouched(fingerprint)
def is_quorum_met(signal_hash: str, k: Optional[int] = None) -> bool:
"""True iff ≥k distinct vouched peers have reported `signal_hash`.
"Vouched" here means `peer_is_listening_eligible` — the same web-of-trust
set the import gate respects. Self-reports from the local node do not
count (they never end up in federation_signals).
"""
cfg = quorum_config()
threshold = k if k is not None else cfg.signal_quorum_k
if threshold <= 0:
return True
rows = db.signals_for_hash(signal_hash)
distinct: set = set()
for r in rows:
fp = r.get("peer_fingerprint") or ""
if not fp or fp in distinct:
continue
if not peer_is_listening_eligible(fp):
continue
distinct.add(fp)
if len(distinct) >= threshold:
return True
return False
def quorum_evidence(signal_hash: str) -> List[Tuple[str, datetime]]:
"""(peer_fingerprint, received_at) tuples for one signal_hash — for UI display.
Only includes signals from currently listening-eligible peers, deduped
per fingerprint at the earliest receipt.
"""
rows = db.signals_for_hash(signal_hash)
earliest: Dict[str, datetime] = {}
for r in rows:
fp = r.get("peer_fingerprint") or ""
if not fp or not peer_is_listening_eligible(fp):
continue
try:
ts = datetime.fromisoformat(r.get("received_at") or "")
except ValueError:
continue
if fp not in earliest or ts < earliest[fp]:
earliest[fp] = ts
return sorted(earliest.items(), key=lambda kv: kv[1])

161
src/psyc/lines/translog.py Normal file
View File

@@ -0,0 +1,161 @@
"""Transparency log — append-only signed merkle chain over federation signals.
Every signal we receive from a peer (case, IOC, or accepted vouch) is appended
as one `LogEntry`. Each entry's `entry_hash = sha256(canonical(prev_hash +
entry_type + entry_data + timestamp))` references the previous head, so any
tampering with a historical row invalidates every subsequent hash. The chain
is public — auditors can re-fetch it and re-run `verify_chain` to detect a
node that quietly mutated history (e.g. to hide a bad signal it accepted).
Hash format: lowercase hex SHA-256 of the canonical JSON of
``{"prev_hash": "...", "entry_type": "...", "entry_data": {...}, "timestamp": "..."}``.
Genesis entries use ``prev_hash = "0" * 64``.
"""
from __future__ import annotations
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from psyc import db, log
from psyc.result import Err, Ok, Result
_log = log.get(__name__)
GENESIS_PREV_HASH = "0" * 64
class LogEntry(BaseModel):
id: int
prev_hash: str
entry_type: str
entry_data: Dict[str, Any] = Field(default_factory=dict)
timestamp: str
entry_hash: str
def _canonical_json(obj: Dict[str, Any]) -> bytes:
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
def compute_entry_hash(prev_hash: str, entry_type: str, entry_data: Dict[str, Any], timestamp: str) -> str:
"""Hex SHA-256 of canonical(prev_hash + entry_type + entry_data + timestamp)."""
payload: Dict[str, Any] = {
"prev_hash": prev_hash,
"entry_type": entry_type,
"entry_data": entry_data,
"timestamp": timestamp,
}
return hashlib.sha256(_canonical_json(payload)).hexdigest()
def _row_to_entry(row: Dict[str, Any]) -> LogEntry:
raw = row.get("entry_data") or "{}"
try:
data = json.loads(raw)
except Exception:
data = {}
return LogEntry(
id=int(row["id"]),
prev_hash=str(row["prev_hash"]),
entry_type=str(row["entry_type"]),
entry_data=data if isinstance(data, dict) else {},
timestamp=str(row["timestamp"]),
entry_hash=str(row["entry_hash"]),
)
def head(db_path: Path = db.DB_PATH) -> Optional[LogEntry]:
"""Latest log entry, or None if the chain is empty."""
row = db.translog_head(db_path=db_path)
return _row_to_entry(row) if row else None
def append(entry_type: str, entry_data: Dict[str, Any], db_path: Path = db.DB_PATH) -> LogEntry:
"""Atomically append one entry to the chain. Returns the persisted entry."""
prev = db.translog_head(db_path=db_path)
prev_hash = str(prev["entry_hash"]) if prev else GENESIS_PREV_HASH
timestamp = datetime.now(timezone.utc).isoformat()
entry_hash = compute_entry_hash(prev_hash, entry_type, entry_data, timestamp)
new_id = db.translog_append(
dict(
prev_hash=prev_hash,
entry_type=entry_type,
entry_data=json.dumps(entry_data, sort_keys=True),
timestamp=timestamp,
entry_hash=entry_hash,
),
db_path=db_path,
)
_log.info("translog.append", id=new_id, entry_type=entry_type, hash=entry_hash[:12])
return LogEntry(
id=new_id,
prev_hash=prev_hash,
entry_type=entry_type,
entry_data=entry_data,
timestamp=timestamp,
entry_hash=entry_hash,
)
def verify_chain(start: int = 0, end: Optional[int] = None, db_path: Path = db.DB_PATH) -> Result[int, str]:
"""Walk entries [start, end] in id order, recompute each hash, compare.
Returns Ok(n_verified) when every entry's recomputed hash equals the
stored one and each prev_hash matches the previous entry's stored hash.
Returns Err with the offending id + expected/got hashes otherwise.
"""
rows = db.translog_range(start=start, end=end, db_path=db_path)
if not rows:
return Ok(0)
# Establish the prior hash anchor — either genesis (if walking from id=1)
# or the entry just before `start`.
first_id = int(rows[0]["id"])
if first_id <= 1:
prior_hash = GENESIS_PREV_HASH
else:
anchor = db.translog_get(first_id - 1, db_path=db_path)
if anchor is None:
return Err(f"missing anchor entry id={first_id - 1}")
prior_hash = str(anchor["entry_hash"])
verified = 0
for row in rows:
stored_prev = str(row["prev_hash"])
if stored_prev != prior_hash:
return Err(
f"broken at id={row['id']} expected_prev={prior_hash} got_prev={stored_prev}"
)
try:
data = json.loads(row.get("entry_data") or "{}")
except Exception:
return Err(f"broken at id={row['id']} entry_data not JSON")
if not isinstance(data, dict):
return Err(f"broken at id={row['id']} entry_data not an object")
recomputed = compute_entry_hash(
stored_prev, str(row["entry_type"]), data, str(row["timestamp"])
)
stored_hash = str(row["entry_hash"])
if recomputed != stored_hash:
return Err(
f"broken at id={row['id']} expected={recomputed} got={stored_hash}"
)
prior_hash = stored_hash
verified += 1
return Ok(verified)
def recent(limit: int = 100, db_path: Path = db.DB_PATH) -> List[LogEntry]:
"""The latest `limit` entries, newest first."""
return [_row_to_entry(r) for r in db.translog_recent(limit=limit, db_path=db_path)]
def entries_after(entry_id: int, db_path: Path = db.DB_PATH) -> List[LogEntry]:
"""All entries with id > entry_id, oldest first — for peer sync."""
return [_row_to_entry(r) for r in db.translog_after(entry_id, db_path=db_path)]

View File

@@ -159,6 +159,8 @@ def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir):
new_sig = peer_priv.sign(canonical_json(unsigned)) new_sig = peer_priv.sign(canonical_json(unsigned))
feed["signature"] = base64.b64encode(new_sig).decode("ascii") feed["signature"] = base64.b64encode(new_sig).decode("ascii")
# Stage 4 listening gate: peer must be trusted to land signals.
federation.register_peer("peer.example", peer_fp, peer_pub_pem, status="trusted")
result = import_signed_feed(feed, peer_pub_pem) result = import_signed_feed(feed, peer_pub_pem)
assert isinstance(result, Ok), getattr(result, "reason", "") assert isinstance(result, Ok), getattr(result, "reason", "")
summary = result.value summary = result.value

118
tests/test_translog.py Normal file
View File

@@ -0,0 +1,118 @@
"""Transparency log — append, verify, tamper detection, sync slices."""
from __future__ import annotations
import json
import pytest
from sqlalchemy import create_engine, update
from psyc import db
from psyc.lines import translog
from psyc.lines.translog import GENESIS_PREV_HASH
from psyc.result import Err, Ok
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
def test_first_append_uses_genesis_prev_hash(fresh_db):
e = translog.append("signal", {"x": 1})
assert e.prev_hash == GENESIS_PREV_HASH
assert e.id >= 1
assert e.entry_type == "signal"
assert e.entry_data == {"x": 1}
# head matches
h = translog.head()
assert h is not None
assert h.id == e.id
assert h.entry_hash == e.entry_hash
def test_append_chains_prev_hash(fresh_db):
e1 = translog.append("signal", {"a": 1})
e2 = translog.append("signal", {"b": 2})
e3 = translog.append("vouch", {"c": 3})
assert e2.prev_hash == e1.entry_hash
assert e3.prev_hash == e2.entry_hash
head = translog.head()
assert head is not None
assert head.entry_hash == e3.entry_hash
def test_verify_chain_ok_round_trip(fresh_db):
translog.append("signal", {"a": 1})
translog.append("signal", {"b": 2})
translog.append("vouch", {"c": 3})
result = translog.verify_chain()
assert isinstance(result, Ok)
assert result.value == 3
def test_verify_chain_empty_returns_zero(fresh_db):
result = translog.verify_chain()
assert isinstance(result, Ok)
assert result.value == 0
def test_verify_chain_detects_tampered_data(fresh_db):
e1 = translog.append("signal", {"a": 1})
e2 = translog.append("signal", {"b": 2})
# Mutate entry_data of the first row directly in the DB; entry_hash stays
# the same but no longer matches the recomputed hash.
with db.engine().begin() as conn:
conn.execute(
update(db.translog)
.where(db.translog.c.id == e1.id)
.values(entry_data=json.dumps({"a": 999}, sort_keys=True))
)
result = translog.verify_chain()
assert isinstance(result, Err)
assert "broken at id=" in result.reason
def test_verify_chain_detects_tampered_prev_hash(fresh_db):
translog.append("signal", {"a": 1})
e2 = translog.append("signal", {"b": 2})
# Flip e2.prev_hash so it no longer matches e1.entry_hash.
with db.engine().begin() as conn:
conn.execute(
update(db.translog)
.where(db.translog.c.id == e2.id)
.values(prev_hash="f" * 64)
)
result = translog.verify_chain()
assert isinstance(result, Err)
assert "broken at id=" in result.reason
def test_entries_after_returns_correct_slice(fresh_db):
e1 = translog.append("signal", {"a": 1})
e2 = translog.append("signal", {"b": 2})
e3 = translog.append("signal", {"c": 3})
after_zero = translog.entries_after(0)
assert [e.id for e in after_zero] == [e1.id, e2.id, e3.id]
after_e1 = translog.entries_after(e1.id)
assert [e.id for e in after_e1] == [e2.id, e3.id]
after_e3 = translog.entries_after(e3.id)
assert after_e3 == []
def test_recent_newest_first(fresh_db):
e1 = translog.append("signal", {"a": 1})
e2 = translog.append("signal", {"b": 2})
e3 = translog.append("signal", {"c": 3})
recent = translog.recent(limit=10)
assert [e.id for e in recent] == [e3.id, e2.id, e1.id]

336
tests/test_vouching.py Normal file
View File

@@ -0,0 +1,336 @@
"""Vouching + quorum — sign/verify, threshold logic, import gate."""
from __future__ import annotations
import base64
import hashlib
from datetime import datetime, timedelta, timezone
import pytest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import federation
from psyc.lines.federation import (
QuorumConfig,
Vouch,
accept_vouch,
build_signed_feed,
canonical_json,
import_signed_feed,
is_quorum_met,
is_vouched,
issue_vouch,
node_fingerprint,
our_vouches,
peer_is_listening_eligible,
public_key_pem,
quorum_config,
register_peer,
revoke_vouch,
set_quorum_config,
vouch_payload_bytes,
)
from psyc.result import Err, Ok
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
d = tmp_path / "federation"
monkeypatch.setattr(federation, "FED_DIR", d)
monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
yield d
def _make_peer():
"""Generate an Ed25519 keypair + matching fingerprint for a fake peer."""
priv = ed25519.Ed25519PrivateKey.generate()
pub = priv.public_key()
pub_pem = pub.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
raw = pub.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
fp = hashlib.sha256(raw).digest()[:16].hex()
return priv, pub_pem, fp
def _sign_vouch(priv, voucher_fp, target_fp, issued_at, expires_at):
payload = vouch_payload_bytes(voucher_fp, target_fp, issued_at, expires_at)
sig = priv.sign(payload)
return base64.b64encode(sig).decode("ascii")
# ---------- self-issued vouch round-trip --------------------------------
def test_issue_vouch_roundtrip(fresh_db, fed_dir):
target = "ab" * 16
v = issue_vouch(target, ttl_days=30)
assert v.voucher_fingerprint == node_fingerprint()
assert v.target_fingerprint == target
assert v.expires_at is not None
# round-trip from storage
listed = our_vouches()
assert len(listed) == 1
assert listed[0].target_fingerprint == target
assert listed[0].signature == v.signature
# signature verifies under our own pubkey
payload = vouch_payload_bytes(
v.voucher_fingerprint, v.target_fingerprint, v.issued_at, v.expires_at
)
sig = base64.b64decode(v.signature)
assert federation.verify_payload(payload, sig, public_key_pem())
def test_revoke_vouch_removes_only_our_entry(fresh_db, fed_dir):
target = "cd" * 16
issue_vouch(target, ttl_days=30)
assert len(our_vouches()) == 1
revoke_vouch(target)
assert our_vouches() == []
# ---------- accept_vouch validation -------------------------------------
def test_accept_vouch_rejects_expired(fresh_db, fed_dir):
priv, pem, fp = _make_peer()
register_peer("voucher.example", fp, pem, status="trusted")
issued = datetime.now(timezone.utc) - timedelta(days=10)
expired = datetime.now(timezone.utc) - timedelta(days=1)
sig = _sign_vouch(priv, fp, "target", issued, expired)
v = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
issued_at=issued, expires_at=expired, signature=sig)
result = accept_vouch(v, pem)
assert isinstance(result, Err)
assert "expired" in result.reason
def test_accept_vouch_rejects_bad_signature(fresh_db, fed_dir):
priv, pem, fp = _make_peer()
register_peer("voucher.example", fp, pem, status="trusted")
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
# Sign a different target then claim it's for "real-target".
real_sig = _sign_vouch(priv, fp, "other-target", issued, expires)
v = Vouch(voucher_fingerprint=fp, target_fingerprint="real-target",
issued_at=issued, expires_at=expires, signature=real_sig)
result = accept_vouch(v, pem)
assert isinstance(result, Err)
assert "signature" in result.reason
def test_accept_vouch_rejects_voucher_not_trusted(fresh_db, fed_dir):
priv, pem, fp = _make_peer()
# Voucher exists but is "unknown" not "trusted".
register_peer("voucher.example", fp, pem, status="unknown")
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
sig = _sign_vouch(priv, fp, "target", issued, expires)
v = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
issued_at=issued, expires_at=expires, signature=sig)
result = accept_vouch(v, pem)
assert isinstance(result, Err)
assert "not trusted" in result.reason
def test_accept_vouch_ok_for_trusted_voucher(fresh_db, fed_dir):
priv, pem, fp = _make_peer()
register_peer("voucher.example", fp, pem, status="trusted")
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
sig = _sign_vouch(priv, fp, "target", issued, expires)
v = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
issued_at=issued, expires_at=expires, signature=sig)
result = accept_vouch(v, pem)
assert isinstance(result, Ok)
# ---------- is_vouched threshold ----------------------------------------
def test_is_vouched_needs_distinct_vouchers(fresh_db, fed_dir):
"""Two vouches from the same peer must NOT clear a threshold of 2."""
priv, pem, fp = _make_peer()
register_peer("voucher.example", fp, pem, status="trusted")
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
sig = _sign_vouch(priv, fp, "target", issued, expires)
v1 = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
issued_at=issued, expires_at=expires, signature=sig)
assert isinstance(accept_vouch(v1, pem), Ok)
# Newer vouch from the SAME voucher — upsert replaces, count stays 1.
issued2 = issued + timedelta(seconds=1)
sig2 = _sign_vouch(priv, fp, "target", issued2, expires)
v2 = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
issued_at=issued2, expires_at=expires, signature=sig2)
assert isinstance(accept_vouch(v2, pem), Ok)
assert is_vouched("target", min_vouchers=2) is False
# Threshold of 1 should pass.
assert is_vouched("target", min_vouchers=1) is True
def test_is_vouched_two_distinct_clear_threshold(fresh_db, fed_dir):
priv_a, pem_a, fp_a = _make_peer()
priv_b, pem_b, fp_b = _make_peer()
register_peer("a.example", fp_a, pem_a, status="trusted")
register_peer("b.example", fp_b, pem_b, status="trusted")
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
va = Vouch(voucher_fingerprint=fp_a, target_fingerprint="target",
issued_at=issued, expires_at=expires,
signature=_sign_vouch(priv_a, fp_a, "target", issued, expires))
vb = Vouch(voucher_fingerprint=fp_b, target_fingerprint="target",
issued_at=issued, expires_at=expires,
signature=_sign_vouch(priv_b, fp_b, "target", issued, expires))
assert isinstance(accept_vouch(va, pem_a), Ok)
assert isinstance(accept_vouch(vb, pem_b), Ok)
assert is_vouched("target", min_vouchers=2) is True
assert is_vouched("target", min_vouchers=3) is False
# ---------- quorum on signal_hash ---------------------------------------
def test_is_quorum_met_counts_distinct_vouched_peers_only(fresh_db, fed_dir):
# Two trusted peers + one untrusted peer report the same signal_hash.
_, pem_a, fp_a = _make_peer()
_, pem_b, fp_b = _make_peer()
_, pem_c, fp_c = _make_peer()
register_peer("a.example", fp_a, pem_a, status="trusted")
register_peer("b.example", fp_b, pem_b, status="trusted")
register_peer("c.example", fp_c, pem_c, status="unknown") # not eligible
for fp in (fp_a, fp_b, fp_c, fp_a): # fp_a duplicated → still 1 distinct
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="ioc",
signal_id="1.2.3.4",
signal_hash="h-aaa",
received_at=datetime.now(timezone.utc).isoformat(),
raw_json="{}",
))
assert is_quorum_met("h-aaa", k=2) is True
assert is_quorum_met("h-aaa", k=3) is False # only 2 eligible distincts
# ---------- quorum config persistence -----------------------------------
def test_quorum_config_defaults_and_persistence(fresh_db, fed_dir):
cfg = quorum_config()
assert cfg.trust_min_vouchers == 2
assert cfg.signal_quorum_k == 2
set_quorum_config(QuorumConfig(trust_min_vouchers=3, signal_quorum_k=4))
cfg2 = quorum_config()
assert cfg2.trust_min_vouchers == 3
assert cfg2.signal_quorum_k == 4
# ---------- import gate enforces listening eligibility ------------------
def _signed_feed_from_peer(peer_priv, peer_fp, vouches=None):
"""Build a feed claiming origin=peer_fp, signed with peer_priv."""
payload = {
"version": federation.FEED_VERSION,
"fingerprint": peer_fp,
"generated_at": datetime.now(timezone.utc).isoformat(),
"window_hours": 24,
"cases": [],
"iocs": [{
"value": "9.9.9.9",
"type": "ip",
"severity": "high",
"first_seen": datetime.now(timezone.utc).isoformat(),
"digest_sha256": "abc123",
}],
"vouches": vouches or [],
}
sig = peer_priv.sign(canonical_json(payload))
payload["signature"] = base64.b64encode(sig).decode("ascii")
return payload
def test_import_feed_rejects_unknown_peer(fresh_db, fed_dir):
peer_priv, peer_pem, peer_fp = _make_peer()
feed = _signed_feed_from_peer(peer_priv, peer_fp)
result = import_signed_feed(feed, peer_pem)
assert isinstance(result, Err)
assert "not trusted" in result.reason
def test_import_feed_accepts_directly_trusted_peer(fresh_db, fed_dir):
peer_priv, peer_pem, peer_fp = _make_peer()
register_peer("peer.example", peer_fp, peer_pem, status="trusted")
feed = _signed_feed_from_peer(peer_priv, peer_fp)
result = import_signed_feed(feed, peer_pem)
assert isinstance(result, Ok), getattr(result, "reason", "")
def test_import_feed_accepts_vouched_peer(fresh_db, fed_dir):
# Two trusted peers vouch for a third — third becomes listening-eligible.
priv_a, pem_a, fp_a = _make_peer()
priv_b, pem_b, fp_b = _make_peer()
priv_c, pem_c, fp_c = _make_peer()
register_peer("a.example", fp_a, pem_a, status="trusted")
register_peer("b.example", fp_b, pem_b, status="trusted")
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
va = Vouch(voucher_fingerprint=fp_a, target_fingerprint=fp_c,
issued_at=issued, expires_at=expires,
signature=_sign_vouch(priv_a, fp_a, fp_c, issued, expires))
vb = Vouch(voucher_fingerprint=fp_b, target_fingerprint=fp_c,
issued_at=issued, expires_at=expires,
signature=_sign_vouch(priv_b, fp_b, fp_c, issued, expires))
assert isinstance(accept_vouch(va, pem_a), Ok)
assert isinstance(accept_vouch(vb, pem_b), Ok)
assert peer_is_listening_eligible(fp_c) is True
feed = _signed_feed_from_peer(priv_c, fp_c)
result = import_signed_feed(feed, pem_c)
assert isinstance(result, Ok), getattr(result, "reason", "")
def test_import_feed_propagates_vouches_in_payload(fresh_db, fed_dir):
"""A trusted peer's feed carries a vouch the peer issued — we should
accept_vouch it and store it locally."""
peer_priv, peer_pem, peer_fp = _make_peer()
register_peer("peer.example", peer_fp, peer_pem, status="trusted")
target_fp = "ff" * 16
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
peer_vouch = Vouch(
voucher_fingerprint=peer_fp,
target_fingerprint=target_fp,
issued_at=issued,
expires_at=expires,
signature=_sign_vouch(peer_priv, peer_fp, target_fp, issued, expires),
)
feed = _signed_feed_from_peer(peer_priv, peer_fp, vouches=[peer_vouch.model_dump(mode="json")])
result = import_signed_feed(feed, peer_pem)
assert isinstance(result, Ok), getattr(result, "reason", "")
# The vouch is now in our local store under the peer's fingerprint.
stored = federation.vouches_for(target_fp)
assert any(v.voucher_fingerprint == peer_fp for v in stored)