Compare commits

...

20 Commits

Author SHA1 Message Date
m17hr1l
3e737d61b3 stage-34 wire admin nav: federation hub links to discovery + SW v4 2026-06-06 21:17:57 +02:00
m17hr1l
a53aacfdd8 merge auto-response: severity/quorum/local-only gated execution
# Conflicts:
#	src/psyc/db.py
2026-06-06 21:17:20 +02:00
m17hr1l
53ba537ce8 merge vouching+translog: web-of-trust + signed merkle audit log
# Conflicts:
#	src/psyc/_federation_cli.py
#	src/psyc/cockpit/federation_routes.py
2026-06-06 21:15:11 +02:00
m17hr1l
726117b19b merge discovery: DNS-SD walker + public peers endpoint 2026-06-06 21:13:29 +02:00
m17hr1l
c5472b3134 stage-auto-e pulse: tests for auto-response gating
Cover the auto-fire decision matrix:
- _severity_rank ordering
- mode != auto-execute → never fires (auto-propose, manual)
- below-threshold action is skipped + audited
- federation case + no quorum → skipped + audited "no quorum"
- federation case + quorum met → fires
- local case + quorum required + local-only on → still fires
- local case + quorum required + local-only off → still fires
- quorum gating disabled → federation cases fire too
- kill switch armed → tick() skips everything
- pulse_audit records both auto-fire and skip rows
- audit_count_since returns the per-action counts the cockpit needs
- config round-trips through pulse_settings

Tests patch federation.is_quorum_met (raising=False so the sibling
agent can ship the real function later without breaking these), and
swap respond.execute_action for a counter so no SOAR sink call escapes.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 21:12:02 +02:00
m17hr1l
f5ca928f92 stage-auto-d pulse: cockpit auto-response state panel + CLI
Cockpit:
- /admin/pulse now renders an "AUTO-RESPONSE STATE" panel above the
  pipeline table — mode badge (traffic-light colored), threshold,
  quorum on/off, local-only on/off, auto-fired-in-24h count, last 5
  audit entries, and a one-form save for threshold + gates.
- POST /admin/pulse/respond-config writes the new gates.

CLI:
- pulse-respond-config [--threshold …] [--quorum/--no-quorum]
                       [--local-only/--no-local-only]
  Args left unset are unchanged; echoes the post-state.
- pulse-respond-status prints mode, gates, and the last 10 audit
  entries.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 21:11:52 +02:00
m17hr1l
e66c3d3359 stage-auto-c pulse: respond runner with gates + auto-fire path
Replace the propose-only respond runner with a two-phase runner: phase 1
always proposes actions for fresh high-severity cases (unchanged); phase 2
fires when pipeline mode is auto-execute and the action clears all gates.

Gates:
- severity ≥ configured threshold
- if require_quorum is on, federation-sourced cases must hit
  federation.is_quorum_met (wrapped in try/except so we tolerate the
  sibling agent not having shipped that function yet — fallback is
  "no quorum metric → don't auto-fire", the safe default)
- locally-generated cases (no row in federation_signals for their case_id)
  bypass the quorum check
- when local-only is armed, federation-sourced cases never auto-fire
  even with quorum

Every decision (auto-fire, skipped, error) records a pulse_audit row so
the cockpit and CLI can show history. Per-action try/except keeps one
bad action from aborting the whole batch.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 21:11:39 +02:00
m17hr1l
f4148d86a6 stage-vouch-e federation: tests for vouching + quorum gate
test_vouching covers the contract auto-response and other agents will
gate on:

- issue_vouch round-trip (sign + verify under our own pubkey)
- accept_vouch rejects expired vouches
- accept_vouch rejects mismatched signatures
- accept_vouch rejects vouchers whose peers.status != "trusted"
- accept_vouch happy path
- is_vouched needs DISTINCT vouchers (two upserts from one peer == 1)
- is_vouched clears threshold with two distinct trusted vouchers
- is_quorum_met counts only listening-eligible peers (untrusted +
  duplicate rows don't count)
- quorum_config defaults + pulse_settings persistence
- import_signed_feed rejects unknown peer ("not trusted")
- import_signed_feed accepts directly-trusted peer
- import_signed_feed accepts a peer made eligible via two vouches
- import_signed_feed stores vouches embedded in a trusted peer's feed

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 21:11:18 +02:00
m17hr1l
0e56fa70af stage-vouch-d federation: cockpit pages + CLI + public endpoints
Public JSON endpoints (no auth):
- GET /federation/vouches — our_vouches() so peers can pull our trust
- GET /federation/log — last 100 transparency-log entries
- GET /federation/log/verify — re-walks the chain, returns
  {verified, head_hash} or 409 with {error, head_hash}

Admin pages (TOTP-gated):
- /admin/federation/vouches — issued list, issue form, revoke
  buttons, per-peer quorum-met table
- /admin/federation/log — chain verification status + last 200 entries
- /admin/federation/quorum — config form + per-peer eligibility +
  per-signal-hash distinct-eligible-peer counts

CLI: fed-vouch / fed-unvouch / fed-vouches / fed-quorum-set / fed-log /
fed-log-verify, plus existing fed-* commands untouched.

The base /admin/federation page now links to the three new sub-pages.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 21:11:03 +02:00
m17hr1l
31ec1557ec stage-auto-b pulse: pulse_audit table + history
Add a per-decision audit log so the cockpit + CLI can show what the
auto-response runner did each tick:
- pulse_audit table: id, pipeline, action ('auto-fire'|'skipped'|'error'),
  action_id, case_id, detail, timestamp
- helpers: pulse_audit_record, pulse_audit_recent, pulse_audit_count_since
- indexes on (pipeline, timestamp desc) and on action_id

Also add db.signals_for_case(case_id) — checks the federation_signals
buffer to tell whether a case was peer-sourced. Used by the runner to
decide if a quorum check is required.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 21:10:38 +02:00
m17hr1l
eadd1aea3b stage-vouch-c federation: import gate + translog hook (stage-trans-b)
import_signed_feed now refuses any feed whose declared fingerprint isn't
peer_is_listening_eligible (directly trusted OR vouched in), returning
Err("peer not trusted: …") before any signal lands.

For every case/IOC it does record, it also appends a "signal" entry to
the transparency log (best-effort — logger warns but doesn't abort
ingest if the append fails). This is the stage-trans-b hook: the
import path is the chokepoint, so attaching the chain there gives
us coverage of every peer-originated signal we've ever accepted.

build_signed_feed now includes our_vouches() in the feed body so vouches
propagate. On import we accept_vouch each one — but only if the embedded
voucher_fingerprint matches the peer we just authenticated, so a peer
can't forge vouches "from" someone else through us.

test_federation: the long-standing round-trip test now first registers
the synthetic peer as trusted so the gate lets it through.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 21:10:36 +02:00
m17hr1l
234e6d98ba stage-vouch-b federation: vouch sign/verify + quorum API
Add the web-of-trust primitives, all keyed off the existing node keypair:

- Vouch + QuorumConfig Pydantic models
- vouch_payload_bytes — canonical-JSON body that the voucher signs
- issue_vouch / accept_vouch / revoke_vouch / our_vouches / vouches_for
- is_vouched(target, min_vouchers) — counts DISTINCT trusted vouchers,
  ignoring expired vouches and re-using QuorumConfig defaults
- peer_is_listening_eligible(fp) — direct-trust OR vouched-in
- is_quorum_met(signal_hash, k) — distinct listening-eligible peers
  reporting the same hash
- quorum_evidence(signal_hash) — (peer_fp, received_at) tuples for UI
- quorum_config / set_quorum_config — persisted in pulse_settings

accept_vouch is paranoid: rejects expired vouches, vouchers that aren't
currently "trusted" in our peers table, mismatched pubkey-fingerprint
pairs, malformed base64, and Ed25519 verification failures — each with
a short Err reason.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 21:10:03 +02:00
m17hr1l
0dbeb056c5 stage-auto-a pulse: respond config (threshold/quorum/local-only)
Persisted-key/value helpers for the respond-pipeline auto-fire gates:
- respond_auto_threshold / set_… (Severity, default HIGH)
- respond_require_quorum / set_… (bool, default True)
- respond_local_only / set_…     (bool, default False)
Plus a _severity_rank helper for threshold comparison.

Backing store is the existing pulse_settings table; this commit also adds
generic pulse_setting_get / pulse_setting_set helpers in db.py so future
pulse settings don't each need their own column-pair helper.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 21:09:46 +02:00
m17hr1l
7a510c7acf stage-trans-a translog: append-only signed merkle chain + tests
translog.append computes
sha256(canonical({prev_hash, entry_type, entry_data, timestamp})) and
writes one row per call; the first entry uses prev_hash = "0"*64.
verify_chain walks rows in id order, re-hashes each, and returns
Err("broken at id=X expected=... got=...") on the first mismatch — so
tampering with either entry_data or prev_hash invalidates every
downstream row. recent / entries_after / head support peer sync and UI.

Tests cover: genesis prev_hash, chained prev_hash, full-chain verify,
tampered-data detection, tampered-prev_hash detection, slicing.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 21:09:32 +02:00
m17hr1l
4a9f6ceb7f stage-vouch-a federation: vouches table + DB helpers
Add `vouches` (voucher_fp, target_fp, issued_at, expires_at, signature)
with unique (voucher, target) and target index, plus `translog` for the
append-only signed merkle chain (id, prev_hash, entry_type, entry_data,
timestamp, entry_hash). Also surface `setting_get` / `setting_set`
helpers on pulse_settings so the quorum config has a place to live.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 21:09:25 +02:00
m17hr1l
ff88aba569 stage-disc-e discovery: tests 2026-06-06 21:08:15 +02:00
m17hr1l
9b49f768ca stage-disc-d discovery: cockpit + CLI 2026-06-06 21:06:39 +02:00
m17hr1l
ddb40ff92c stage-disc-c discovery: pulse pipeline wiring + seeds settings 2026-06-06 21:04:54 +02:00
m17hr1l
6241a21af5 stage-disc-b discovery: peer walker (BFS) 2026-06-06 21:04:17 +02:00
m17hr1l
de6204819b stage-disc-a discovery: dnssd resolver + public peers endpoint 2026-06-06 21:03:33 +02:00
22 changed files with 3454 additions and 20 deletions

View File

@@ -23,6 +23,7 @@ dependencies = [
"pyotp>=2.9",
"qrcode[pil]>=7.4",
"itsdangerous>=2.1",
"dnspython>=2.4",
]
[project.optional-dependencies]

View File

@@ -7,14 +7,14 @@ from __future__ import annotations
import json
from pathlib import Path
from typing import Optional
from typing import List, Optional
import httpx
import typer
from psyc import db, log
from psyc.lines import federation
from psyc.result import Err
from psyc.lines import discovery, federation, pulse, translog
from psyc.result import Err, Ok
_log = log.get(__name__)
@@ -133,3 +133,157 @@ def register(typer_app: typer.Typer) -> None:
db.init_db()
federation.remove_peer(domain)
typer.echo(f"removed {domain}")
# ---------- discovery (DNS-SD walker) ----------------------------------
@typer_app.command("fed-resolve")
def fed_resolve(
domain: str = typer.Argument(..., help="domain to look up via _psyc._tcp.<domain>"),
timeout: float = typer.Option(5.0, "--timeout", help="DNS lookup timeout, seconds"),
) -> None:
"""Resolve a domain's psyc DNS-SD record. Prints fingerprint + port."""
result = discovery.resolve_psyc(domain, timeout=timeout)
if isinstance(result, Err):
typer.echo(f"error: {result.reason}", err=True)
raise typer.Exit(1)
c = result.value
typer.echo(f" domain {c.domain}")
typer.echo(f" fingerprint {c.fingerprint}")
typer.echo(f" port {c.port}")
typer.echo(f" source {c.source}")
@typer_app.command("fed-walk")
def fed_walk(
seeds: List[str] = typer.Argument(..., help="one or more seed domains"),
depth: int = typer.Option(2, "--depth", help="max BFS depth"),
max_peers: int = typer.Option(200, "--max-peers", help="cap discovered candidates"),
record: bool = typer.Option(False, "--record", help="persist candidates as status=unknown"),
) -> None:
"""Walk DNS-SD + peer-public from `seeds`. Prints discovered table."""
db.init_db()
cands = discovery.walk(seeds, max_depth=depth, max_peers=max_peers)
if not cands:
typer.echo("(no candidates discovered)")
return
typer.echo(f"{'domain':<32} {'fingerprint':<24} {'port':>5} source")
for c in cands:
fp = f"{c.fingerprint[:8]}{c.fingerprint[-8:]}"
typer.echo(f"{c.domain:<32} {fp:<24} {c.port:>5} {c.source}")
if record:
for c in cands:
discovery.record_candidate(c)
typer.echo(f"recorded {len(cands)} candidate(s) into peers table")
@typer_app.command("fed-seeds-list")
def fed_seeds_list() -> None:
"""Print the operator-curated discovery seed list."""
db.init_db()
seeds = pulse.get_discovery_seeds()
if not seeds:
typer.echo("(no seeds configured)")
return
for s in seeds:
typer.echo(s)
@typer_app.command("fed-seeds-add")
def fed_seeds_add(domain: str = typer.Argument(...)) -> None:
"""Append a seed domain (no-op if already present)."""
db.init_db()
seeds = pulse.get_discovery_seeds()
d = domain.strip()
if d in seeds:
typer.echo(f"{d} already a seed")
return
seeds.append(d)
pulse.set_discovery_seeds(seeds)
typer.echo(f"added seed {d}")
@typer_app.command("fed-seeds-remove")
def fed_seeds_remove(domain: str = typer.Argument(...)) -> None:
"""Remove a seed domain (no-op if absent)."""
db.init_db()
seeds = pulse.get_discovery_seeds()
d = domain.strip()
if d not in seeds:
typer.echo(f"{d} not in seeds")
return
seeds = [s for s in seeds if s != d]
pulse.set_discovery_seeds(seeds)
typer.echo(f"removed seed {d}")
# ---------- vouching + quorum --------------------------------------
@typer_app.command("fed-vouch")
def fed_vouch(
target_fp: str = typer.Argument(..., help="target peer fingerprint (32 hex)"),
ttl_days: int = typer.Option(90, "--ttl-days", help="vouch lifetime in days"),
) -> None:
"""Issue a signed vouch for `target_fp`. Persists locally + rides our feed."""
db.init_db()
v = federation.issue_vouch(target_fp.strip(), ttl_days=ttl_days)
typer.echo(f"vouched: {v.target_fingerprint} (expires {v.expires_at})")
@typer_app.command("fed-unvouch")
def fed_unvouch(target_fp: str = typer.Argument(...)) -> None:
"""Revoke OUR vouch for `target_fp`."""
db.init_db()
federation.revoke_vouch(target_fp.strip())
typer.echo(f"revoked vouch for {target_fp}")
@typer_app.command("fed-vouches")
def fed_vouches() -> None:
"""List vouches WE have issued."""
db.init_db()
rows = federation.our_vouches()
if not rows:
typer.echo("(no vouches issued)")
return
for v in rows:
exp = v.expires_at.isoformat() if v.expires_at else ""
typer.echo(f" {v.target_fingerprint} issued={v.issued_at.isoformat()[:16]} expires={exp[:16]}")
@typer_app.command("fed-quorum-set")
def fed_quorum_set(
trust: Optional[int] = typer.Option(None, "--trust", help="trust_min_vouchers"),
k: Optional[int] = typer.Option(None, "--k", help="signal_quorum_k"),
) -> None:
"""Update quorum thresholds. Either flag is optional — only changed values overwrite."""
db.init_db()
cfg = federation.quorum_config()
if trust is not None:
cfg.trust_min_vouchers = max(1, int(trust))
if k is not None:
cfg.signal_quorum_k = max(1, int(k))
federation.set_quorum_config(cfg)
typer.echo(f"quorum: trust_min_vouchers={cfg.trust_min_vouchers} signal_quorum_k={cfg.signal_quorum_k}")
# ---------- transparency log --------------------------------------
@typer_app.command("fed-log")
def fed_log(
limit: int = typer.Option(20, "--limit", help="number of entries to show"),
) -> None:
"""Print recent transparency-log entries (newest first)."""
db.init_db()
rows = translog.recent(limit=limit)
if not rows:
typer.echo("(transparency log empty)")
return
for e in rows:
typer.echo(
f" id={e.id:5d} {e.entry_type:6s} {e.timestamp[:19]} hash={e.entry_hash[:16]}"
)
@typer_app.command("fed-log-verify")
def fed_log_verify() -> None:
"""Re-walk the chain locally and report verification status."""
db.init_db()
result = translog.verify_chain()
head = translog.head()
head_hash = head.entry_hash if head else "(empty)"
if isinstance(result, Err):
typer.echo(f" ✗ broken: {result.reason}", err=True)
typer.echo(f" head_hash: {head_hash}")
raise typer.Exit(1)
typer.echo(f" ✓ verified {result.value} entries")
typer.echo(f" head_hash: {head_hash}")

View File

@@ -14,6 +14,7 @@ import typer
from psyc import db
from psyc.lines import pulse
from psyc.models import Severity
def _relative(dt: Optional[datetime]) -> str:
@@ -120,3 +121,62 @@ def register(typer_app: typer.Typer) -> None:
db.init_db()
pulse.set_kill_switch(False)
typer.echo("kill switch disarmed — pulse resumes")
@typer_app.command("pulse-respond-config")
def pulse_respond_config(
threshold: Optional[str] = typer.Option(
None, "--threshold", help="min severity: low | medium | high | critical"
),
quorum: Optional[bool] = typer.Option(
None, "--quorum/--no-quorum", help="require quorum on federation-sourced cases"
),
local_only: Optional[bool] = typer.Option(
None, "--local-only/--no-local-only",
help="when armed, auto-execute defers federation cases until quorum"
),
) -> None:
"""Update the respond-pipeline auto-fire gates. Args left unset are unchanged."""
db.init_db()
if threshold is not None:
try:
pulse.set_respond_auto_threshold(Severity(threshold))
except ValueError:
typer.echo(f"error: unknown severity {threshold!r}", err=True)
raise typer.Exit(1)
if quorum is not None:
pulse.set_respond_require_quorum(quorum)
if local_only is not None:
pulse.set_respond_local_only(local_only)
typer.echo(
f"threshold={pulse.respond_auto_threshold().value} "
f"quorum={'on' if pulse.respond_require_quorum() else 'off'} "
f"local-only={'on' if pulse.respond_local_only() else 'off'}"
)
@typer_app.command("pulse-respond-status")
def pulse_respond_status() -> None:
"""Print the respond-pipeline gates + the last 10 audit entries."""
db.init_db()
mode = "manual"
for p in pulse.state():
if p.name == "respond":
mode = p.mode.value
break
typer.echo(f"respond mode : {mode}")
typer.echo(f"threshold : {pulse.respond_auto_threshold().value}")
typer.echo(f"require quorum : {'yes' if pulse.respond_require_quorum() else 'no'}")
typer.echo(f"local-only : {'yes' if pulse.respond_local_only() else 'no'}")
audit = db.pulse_audit_recent("respond", limit=10)
if not audit:
typer.echo("(no audit entries yet)")
return
typer.echo("")
typer.echo(f"{'timestamp':<28} {'action':<11} {'case_id':<22} detail")
for row in audit:
typer.echo(
f"{(row['timestamp'] or '')[:27]:<28} "
f"{(row['action'] or ''):<11} "
f"{(row['case_id'] or ''):<22} "
f"{(row['detail'] or '')[:80]}"
)

View File

@@ -15,7 +15,8 @@ from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, Red
from fastapi.templating import Jinja2Templates
from psyc import db, log
from psyc.lines import federation
from psyc.lines import discovery, federation, pulse, translog
from psyc.result import Err
_log = log.get(__name__)
@@ -25,6 +26,10 @@ _log = log.get(__name__)
_FEED_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
_FEED_TTL = 60.0
# Mirror the feed cache for the public peers list — same poll-load pattern.
_PUBLIC_PEERS_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
_PUBLIC_PEERS_TTL = 60.0
def _admin_ok(request: Request) -> bool:
return bool(request.session.get("admin_ok"))
@@ -38,6 +43,14 @@ def _cached_feed() -> Dict[str, Any]:
return _FEED_CACHE["payload"]
def _cached_public_peers() -> Any:
now = time.time()
if _PUBLIC_PEERS_CACHE["payload"] is None or (now - _PUBLIC_PEERS_CACHE["ts"]) > _PUBLIC_PEERS_TTL:
_PUBLIC_PEERS_CACHE["payload"] = discovery.public_peer_attestation()
_PUBLIC_PEERS_CACHE["ts"] = now
return _PUBLIC_PEERS_CACHE["payload"]
def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
"""Mount all federation routes onto `app`."""
@@ -103,6 +116,54 @@ def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
federation.remove_peer(domain)
return RedirectResponse("/admin/federation", status_code=303)
# ---------- discovery (DNS-SD walker) ----------------------------
@app.get("/admin/federation/discovery", response_class=HTMLResponse)
def admin_discovery(request: Request) -> HTMLResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
seeds = pulse.get_discovery_seeds()
candidates = federation.list_peers()
flash = request.query_params.get("flash") or ""
return TEMPLATES.TemplateResponse(
request,
"admin_discovery.html",
{
"seeds": seeds,
"seeds_text": "\n".join(seeds),
"candidates": candidates,
"flash": flash,
},
)
@app.post("/admin/federation/discovery/seeds")
def admin_discovery_seeds(
request: Request,
seeds: str = Form(""),
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
lines = [line for line in seeds.splitlines()]
pulse.set_discovery_seeds(lines)
return RedirectResponse("/admin/federation/discovery?flash=seeds+saved", status_code=303)
@app.post("/admin/federation/discovery/walk")
def admin_discovery_walk(request: Request) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
seeds = pulse.get_discovery_seeds()
if not seeds:
return RedirectResponse("/admin/federation/discovery?flash=no+seeds+configured", status_code=303)
try:
cands = discovery.walk(seeds)
for c in cands:
discovery.record_candidate(c)
msg = f"discovered+{len(cands)}+candidates+from+{len(seeds)}+seed(s)"
except Exception as exc: # noqa: BLE001 — surface the error to the operator
_log.warning("federation.discovery.walk.error", error=str(exc))
msg = f"walk+failed:+{str(exc)[:80]}"
return RedirectResponse(f"/admin/federation/discovery?flash={msg}", status_code=303)
# ---------- public endpoints --------------------------------------
@app.get("/federation/info")
@@ -122,4 +183,192 @@ def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
def federation_feed() -> JSONResponse:
return JSONResponse(_cached_feed())
@app.get("/federation/peers/public")
def federation_peers_public() -> JSONResponse:
"""Publicly attested peer list — what other psyc walkers discover us through.
Only trusted peers leak; unknown + blocked are internal state and must
never appear here.
"""
return JSONResponse(_cached_public_peers())
# ---------- public vouches + transparency log --------------------
@app.get("/federation/vouches")
def federation_vouches() -> JSONResponse:
"""Vouches WE have issued. Peers fetch this to learn who we trust."""
return JSONResponse({
"fingerprint": federation.node_fingerprint(),
"vouches": [v.model_dump(mode="json") for v in federation.our_vouches()],
})
@app.get("/federation/log")
def federation_log() -> JSONResponse:
"""Last 100 transparency-log entries, newest first."""
entries = translog.recent(limit=100)
return JSONResponse({
"count": len(entries),
"entries": [e.model_dump(mode="json") for e in entries],
})
@app.get("/federation/log/verify")
def federation_log_verify() -> JSONResponse:
"""Re-walk the chain locally and report status. Auditors poll this."""
result = translog.verify_chain()
head = translog.head()
head_hash = head.entry_hash if head else None
if isinstance(result, Err):
return JSONResponse({"error": result.reason, "head_hash": head_hash}, status_code=409)
return JSONResponse({"verified": result.value, "head_hash": head_hash})
# ---------- admin: vouches page ---------------------------------
@app.get("/admin/federation/vouches", response_class=HTMLResponse)
def admin_federation_vouches(request: Request) -> HTMLResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
peers = federation.list_peers()
cfg = federation.quorum_config()
ours = federation.our_vouches()
# Per-peer view: vouches naming each peer and whether quorum is met.
peer_rows = []
for p in peers:
vouches = federation.vouches_for(p.fingerprint)
peer_rows.append({
"peer": p,
"vouches": vouches,
"vouched": federation.is_vouched(p.fingerprint),
"eligible": federation.peer_is_listening_eligible(p.fingerprint),
})
return TEMPLATES.TemplateResponse(
request,
"admin_federation_vouches.html",
{
"fingerprint": federation.node_fingerprint(),
"our_vouches": ours,
"peer_rows": peer_rows,
"cfg": cfg,
},
)
@app.post("/admin/federation/vouches/issue")
def admin_federation_vouch_issue(
request: Request,
target_fingerprint: str = Form(...),
ttl_days: int = Form(90),
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
try:
federation.issue_vouch(target_fingerprint.strip(), ttl_days=ttl_days)
except Exception as exc:
_log.warning("federation.vouch.issue.error", error=str(exc))
return RedirectResponse("/admin/federation/vouches", status_code=303)
@app.post("/admin/federation/vouches/revoke")
def admin_federation_vouch_revoke(
request: Request,
target_fingerprint: str = Form(...),
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
federation.revoke_vouch(target_fingerprint.strip())
return RedirectResponse("/admin/federation/vouches", status_code=303)
# ---------- admin: transparency log page ------------------------
@app.get("/admin/federation/log", response_class=HTMLResponse)
def admin_federation_log(request: Request) -> HTMLResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
result = translog.verify_chain()
head = translog.head()
entries = translog.recent(limit=200)
verify_status: Dict[str, Any]
if isinstance(result, Err):
verify_status = {"ok": False, "reason": result.reason}
else:
verify_status = {"ok": True, "verified": result.value}
return TEMPLATES.TemplateResponse(
request,
"admin_federation_log.html",
{
"verify_status": verify_status,
"head_hash": head.entry_hash if head else "",
"head_id": head.id if head else 0,
"entries": entries,
},
)
# ---------- admin: quorum config + per-peer/per-hash view -------
@app.get("/admin/federation/quorum", response_class=HTMLResponse)
def admin_federation_quorum(request: Request) -> HTMLResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
cfg = federation.quorum_config()
peers = federation.list_peers()
peer_rows = [
{
"peer": p,
"vouched": federation.is_vouched(p.fingerprint),
"eligible": federation.peer_is_listening_eligible(p.fingerprint),
}
for p in peers
]
# Group buffered signals by signal_hash and count distinct eligible peers.
signal_rows: Dict[str, Dict[str, Any]] = {}
for s in db.recent_signals(limit=500):
h = s.get("signal_hash") or ""
entry = signal_rows.setdefault(h, {
"signal_hash": h,
"signal_type": s.get("signal_type") or "",
"signal_id": s.get("signal_id") or "",
"peers": set(),
"latest": s.get("received_at") or "",
})
entry["peers"].add(s.get("peer_fingerprint") or "")
hash_summary = []
for h, row in signal_rows.items():
distinct_eligible = sum(
1 for fp in row["peers"] if federation.peer_is_listening_eligible(fp)
)
hash_summary.append({
"signal_hash": h,
"signal_type": row["signal_type"],
"signal_id": row["signal_id"],
"distinct_peers": len(row["peers"]),
"distinct_eligible": distinct_eligible,
"quorum_met": distinct_eligible >= cfg.signal_quorum_k,
"latest": row["latest"],
})
hash_summary.sort(key=lambda r: r["latest"], reverse=True)
return TEMPLATES.TemplateResponse(
request,
"admin_federation_quorum.html",
{
"cfg": cfg,
"peer_rows": peer_rows,
"hash_summary": hash_summary,
},
)
@app.post("/admin/federation/quorum/save")
def admin_federation_quorum_save(
request: Request,
trust_min_vouchers: int = Form(...),
signal_quorum_k: int = Form(...),
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
try:
cfg = federation.QuorumConfig(
trust_min_vouchers=max(1, int(trust_min_vouchers)),
signal_quorum_k=max(1, int(signal_quorum_k)),
)
federation.set_quorum_config(cfg)
except Exception as exc:
_log.warning("federation.quorum.save.error", error=str(exc))
return RedirectResponse("/admin/federation/quorum", status_code=303)
_log.info("federation.routes.registered")

View File

@@ -8,15 +8,16 @@ scheduler loop. Caller in app.py just imports + invokes register().
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from typing import Optional
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from psyc import log
from psyc import db, log
from psyc.lines import pulse
from psyc.models import Severity
_log = log.get(__name__)
@@ -61,6 +62,10 @@ def register(app: FastAPI, templates: Jinja2Templates) -> None:
return RedirectResponse("/admin", status_code=303)
flash = request.query_params.get("flash", "")
pipelines = pulse.state()
respond_mode = next((p.mode.value for p in pipelines if p.name == "respond"), "manual")
since = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat()
auto_fired_24h = db.pulse_audit_count_since("respond", "auto-fire", since)
audit_recent = db.pulse_audit_recent("respond", limit=5)
return templates.TemplateResponse(
request,
"admin_pulse.html",
@@ -70,6 +75,13 @@ def register(app: FastAPI, templates: Jinja2Templates) -> None:
"tick_interval": TICK_INTERVAL_SECONDS,
"relative": _relative,
"flash": flash,
"respond_mode": respond_mode,
"respond_threshold": pulse.respond_auto_threshold().value,
"respond_require_quorum": pulse.respond_require_quorum(),
"respond_local_only": pulse.respond_local_only(),
"respond_auto_fired_24h": auto_fired_24h,
"respond_audit_recent": audit_recent,
"severity_choices": [s.value for s in Severity],
},
)
@@ -113,6 +125,29 @@ def register(app: FastAPI, templates: Jinja2Templates) -> None:
flash = f"run failed: {exc}"
return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303)
@app.post("/admin/pulse/respond-config")
def pulse_respond_config(
request: Request,
threshold: str = Form(...),
require_quorum: Optional[str] = Form(None),
local_only: Optional[str] = Form(None),
) -> RedirectResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
try:
sev = Severity(threshold)
pulse.set_respond_auto_threshold(sev)
pulse.set_respond_require_quorum(require_quorum is not None)
pulse.set_respond_local_only(local_only is not None)
flash = (
f"respond gates updated: threshold={sev.value}, "
f"quorum={'on' if require_quorum is not None else 'off'}, "
f"local-only={'on' if local_only is not None else 'off'}"
)
except ValueError as exc:
flash = f"respond-config failed: {exc}"
return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303)
@app.on_event("startup")
async def _start_pulse_loop() -> None:
# Fire-and-forget; the loop catches its own exceptions and self-restarts.

View File

@@ -5,7 +5,7 @@
// This makes the cockpit installable as a PWA and survives flaky connections,
// without serving stale operational data behind the operator's back.
const CACHE_VERSION = "psyc-v3";
const CACHE_VERSION = "psyc-v4";
const STATIC_ASSETS = [
"/static/cockpit.css",
"/static/psyc-tokens.css",

View File

@@ -0,0 +1,72 @@
{% extends "base.html" %}
{% block title %}Discovery — psyc admin{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Peer discovery</h1>
<span class="count">{{ candidates|length }} candidate{{ '' if candidates|length == 1 else 's' }}</span>
</div>
<p class="page-intro">Walk DNS-SD records from a seed domain you know runs psyc, then recurse through its public peer list. Newly-found peers land here with status <code>unknown</code> — vouching is what eventually promotes them. Once seeds exist, enabling the <code>peer-pull</code> pulse pipeline runs this on a cadence.</p>
<p class="back"><a href="/admin/federation">← back to federation</a></p>
{% if flash %}
<div class="verdict verdict-clean">{{ flash }}</div>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Seed domains</h2>
<span class="count">{{ seeds|length }} configured</span>
</div>
<p class="page-intro">One domain per line. Each seed is resolved via <code>_psyc._tcp.&lt;domain&gt;</code> SRV+TXT; its <code>/federation/peers/public</code> is fetched and recursed.</p>
<form method="post" action="/admin/federation/discovery/seeds" style="display:grid; gap:10px; max-width:680px;">
<textarea name="seeds" rows="6" class="lookup-input" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:12px;" placeholder="peer1.example.com&#10;peer2.example.org">{{ seeds_text }}</textarea>
<div style="display:flex; gap:10px;">
<button type="submit" class="btn btn-enforce">save seeds</button>
</div>
</form>
<form method="post" action="/admin/federation/discovery/walk" style="margin-top:14px;">
<button type="submit" class="btn btn-approve" {% if not seeds %}disabled{% endif %}>walk now</button>
</form>
</section>
<section class="panel">
<div class="panel-head">
<h2>Recent candidates</h2>
<span class="count">{{ candidates|length }} in registry</span>
</div>
<p class="page-intro">Every peer the walker has ever found, newest first. Trusted/blocked statuses are preserved across re-walks — discovery never demotes a peer the operator has classified.</p>
{% if candidates %}
<table class="ledger">
<thead><tr><th>Domain</th><th>Fingerprint</th><th>Status</th><th>Discovered</th><th>Last seen</th><th>Notes</th></tr></thead>
<tbody>
{% for p in candidates %}
<tr class="ledger-row">
<td><strong>{{ p.domain }}</strong></td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">{{ p.fingerprint[:8] }}…{{ p.fingerprint[-8:] }}</td>
<td>
{% if p.status == 'trusted' %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">trusted</span>
{% elif p.status == 'blocked' %}
<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">blocked</span>
{% else %}
<span class="sev-badge">{{ p.status }}</span>
{% endif %}
</td>
<td class="lg-ts">{{ (p.discovered_at or '')[:16] | replace('T', ' ') }}</td>
<td class="lg-ts">{{ ((p.last_seen or '')[:16] | replace('T', ' ')) or '—' }}</td>
<td class="lg-sub">{{ (p.notes or '')[:60] }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no candidates yet — add a seed above and walk)</p>
{% endif %}
</section>
{% endblock %}

View File

@@ -8,7 +8,7 @@
<span class="count">{{ peers|length }} peer{{ '' if peers|length == 1 else 's' }}</span>
</div>
<p class="page-intro">This node's Ed25519 identity. The fingerprint goes into a DNS TXT record so other psyc nodes can discover this one. The public key lets them verify any feed we publish — the private key never leaves this box.</p>
<p class="back"><a href="/admin">← back to admin</a></p>
<p class="back"><a href="/admin">← back to admin</a> &nbsp;·&nbsp; <a href="/admin/federation/discovery">discovery</a> &nbsp;·&nbsp; <a href="/admin/federation/vouches">vouches</a> &nbsp;·&nbsp; <a href="/admin/federation/quorum">quorum config</a> &nbsp;·&nbsp; <a href="/admin/federation/log">transparency log</a></p>
<div class="card" style="margin-bottom:14px;">
<div class="lg-sub">node fingerprint</div>

View File

@@ -0,0 +1,63 @@
{% extends "base.html" %}
{% block title %}Transparency Log — psyc admin{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Transparency Log</h1>
<span class="count">id @ {{ head_id }}</span>
</div>
<p class="page-intro">Every signal we accept from a peer is appended to a signed merkle chain. Each entry references the previous entry's hash, so tampering with any historical row invalidates every entry after. Auditors can re-walk and detect a bad peer historically — even one we trusted at the time.</p>
<p class="back"><a href="/admin/federation">← back to federation</a> &nbsp;·&nbsp; <a href="/admin/federation/vouches">vouches</a> &nbsp;·&nbsp; <a href="/admin/federation/quorum">quorum config</a> &nbsp;·&nbsp; <a href="/federation/log/verify">public verify endpoint</a></p>
<div class="card" style="margin-bottom:14px;">
<div class="lg-sub">chain verification</div>
{% if verify_status.ok %}
<div style="margin:6px 0;"><span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">verified</span> &nbsp; {{ verify_status.verified }} entries walked, no breaks</div>
{% else %}
<div style="margin:6px 0;"><span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">BROKEN</span> &nbsp; {{ verify_status.reason }}</div>
{% endif %}
{% if head_hash %}
<div class="lg-sub" style="margin-top:8px;">head hash</div>
<div style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:13px; word-break:break-all; color:var(--accent);">{{ head_hash }}</div>
{% endif %}
</div>
</section>
<section class="panel">
<div class="panel-head">
<h2>Recent Entries</h2>
<span class="count">{{ entries|length }} of last 200</span>
</div>
<p class="page-intro">Newest first. Hashes are truncated for display — full values are at <code>/federation/log</code>.</p>
{% if entries %}
<table class="ledger">
<thead><tr><th>id</th><th>When</th><th>Type</th><th>Peer / target</th><th>Signal id</th><th>Hash</th></tr></thead>
<tbody>
{% for e in entries %}
<tr class="ledger-row">
<td>{{ e.id }}</td>
<td class="lg-ts">{{ (e.timestamp or '')[:19] | replace('T', ' ') }}</td>
<td><span class="sev-badge">{{ e.entry_type }}</span></td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">
{% if e.entry_type == 'signal' %}
{{ (e.entry_data.peer_fingerprint or '')[:8] }}…
{% elif e.entry_type == 'vouch' %}
{{ (e.entry_data.voucher_fingerprint or '')[:8] }}…→{{ (e.entry_data.target_fingerprint or '')[:8] }}…
{% else %}
{% endif %}
</td>
<td style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ ((e.entry_data.signal_id or e.entry_data.target_fingerprint or '') | string)[:32] }}</td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ e.entry_hash[:16] }}…</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(chain empty — no signals appended yet)</p>
{% endif %}
</section>
{% endblock %}

View File

@@ -0,0 +1,92 @@
{% extends "base.html" %}
{% block title %}Quorum Config — psyc admin{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Quorum Configuration</h1>
<span class="count">trust={{ cfg.trust_min_vouchers }} k={{ cfg.signal_quorum_k }}</span>
</div>
<p class="page-intro"><strong>trust_min_vouchers</strong> — distinct trusted vouchers required to make a new peer listening-eligible. <strong>signal_quorum_k</strong> — distinct listening-eligible peers required to consider a signal_hash quorum-met. Both gates live in pulse_settings; raising them tightens trust, lowering them relaxes it.</p>
<p class="back"><a href="/admin/federation">← back to federation</a> &nbsp;·&nbsp; <a href="/admin/federation/vouches">vouches</a> &nbsp;·&nbsp; <a href="/admin/federation/log">transparency log</a></p>
<form method="post" action="/admin/federation/quorum/save" style="display:grid; gap:10px; max-width:520px;">
<label class="lg-sub">trust_min_vouchers</label>
<input type="number" name="trust_min_vouchers" value="{{ cfg.trust_min_vouchers }}" min="1" max="50" class="lookup-input" required>
<label class="lg-sub">signal_quorum_k</label>
<input type="number" name="signal_quorum_k" value="{{ cfg.signal_quorum_k }}" min="1" max="50" class="lookup-input" required>
<button type="submit" class="btn btn-enforce">save config</button>
</form>
</section>
<section class="panel">
<div class="panel-head">
<h2>Per-Peer Listening Eligibility</h2>
<span class="count">{{ peer_rows|length }}</span>
</div>
<p class="page-intro">A peer's feed gets ingested only when its fingerprint is eligible (directly trusted or vouched into trust).</p>
{% if peer_rows %}
<table class="ledger">
<thead><tr><th>Domain</th><th>Fingerprint</th><th>Status</th><th>Vouched</th><th>Eligible</th></tr></thead>
<tbody>
{% for row in peer_rows %}
<tr class="ledger-row">
<td><strong>{{ row.peer.domain }}</strong></td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">{{ row.peer.fingerprint[:8] }}…{{ row.peer.fingerprint[-8:] }}</td>
<td>
{% if row.peer.status == 'trusted' %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">trusted</span>
{% elif row.peer.status == 'blocked' %}
<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">blocked</span>
{% else %}
<span class="sev-badge">{{ row.peer.status }}</span>
{% endif %}
</td>
<td>{% if row.vouched %}<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">yes</span>{% else %}<span class="sev-badge">no</span>{% endif %}</td>
<td>{% if row.eligible %}<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">listening</span>{% else %}<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">muted</span>{% endif %}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no peers registered yet)</p>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Signal Hashes in Buffer</h2>
<span class="count">{{ hash_summary|length }} hashes</span>
</div>
<p class="page-intro">Distinct eligible-peer counts per signal hash. Quorum is met when count ≥ {{ cfg.signal_quorum_k }}.</p>
{% if hash_summary %}
<table class="ledger">
<thead><tr><th>Latest</th><th>Type</th><th>Signal id</th><th>Hash</th><th>Distinct peers</th><th>Eligible</th><th>Quorum</th></tr></thead>
<tbody>
{% for r in hash_summary %}
<tr class="ledger-row">
<td class="lg-ts">{{ (r.latest or '')[:19] | replace('T', ' ') }}</td>
<td><span class="sev-badge">{{ r.signal_type }}</span></td>
<td style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ r.signal_id[:48] }}</td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ r.signal_hash[:16] }}…</td>
<td>{{ r.distinct_peers }}</td>
<td>{{ r.distinct_eligible }}</td>
<td>
{% if r.quorum_met %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">met</span>
{% else %}
<span class="sev-badge">below</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no signals in buffer yet)</p>
{% endif %}
</section>
{% endblock %}

View File

@@ -0,0 +1,110 @@
{% extends "base.html" %}
{% block title %}Federation Vouches — psyc admin{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Web of Trust</h1>
<span class="count">{{ our_vouches|length }} issued</span>
</div>
<p class="page-intro">A vouch is an Ed25519-signed assertion that we trust another node's fingerprint. Peers gossip our vouches with their feeds, so trust accumulates: once {{ cfg.trust_min_vouchers }} of our trusted peers vouches for a new fingerprint, it becomes <em>listening-eligible</em> — its signed feeds get ingested.</p>
<p class="back"><a href="/admin/federation">← back to federation</a> &nbsp;·&nbsp; <a href="/admin/federation/quorum">quorum config</a> &nbsp;·&nbsp; <a href="/admin/federation/log">transparency log</a></p>
<div class="card" style="margin-bottom:14px;">
<div class="lg-sub">our fingerprint</div>
<div style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:14px; word-break:break-all; color:var(--accent);">{{ fingerprint }}</div>
</div>
</section>
<section class="panel">
<div class="panel-head">
<h2>Vouches We've Issued</h2>
<span class="count">{{ our_vouches|length }}</span>
</div>
<p class="page-intro">We've signed these — peers that fetch our feed will see them and may extend trust accordingly.</p>
{% if our_vouches %}
<table class="ledger">
<thead><tr><th>Target fingerprint</th><th>Issued</th><th>Expires</th><th></th></tr></thead>
<tbody>
{% for v in our_vouches %}
<tr class="ledger-row">
<td style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:12px;">{{ v.target_fingerprint }}</td>
<td class="lg-ts">{{ v.issued_at.isoformat()[:16] | replace('T', ' ') }}</td>
<td class="lg-ts">{{ v.expires_at.isoformat()[:16] | replace('T', ' ') if v.expires_at else '—' }}</td>
<td>
<form method="post" action="/admin/federation/vouches/revoke" class="queue-action"
onsubmit="return confirm('Revoke vouch for {{ v.target_fingerprint[:8] }}…?');">
<input type="hidden" name="target_fingerprint" value="{{ v.target_fingerprint }}">
<button type="submit" class="btn btn-reject">revoke</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no vouches issued yet)</p>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Issue a Vouch</h2>
</div>
<p class="page-intro">Vouch for a peer's fingerprint. Trusted peers see this and may treat the target as listening-eligible.</p>
<form method="post" action="/admin/federation/vouches/issue" style="display:grid; gap:10px; max-width:680px;">
<input type="text" name="target_fingerprint" placeholder="target fingerprint (32 hex chars)" class="lookup-input" maxlength="64" required style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">
<input type="number" name="ttl_days" value="90" min="1" max="3650" class="lookup-input" placeholder="ttl days">
<button type="submit" class="btn btn-enforce">+ issue vouch</button>
</form>
</section>
<section class="panel">
<div class="panel-head">
<h2>Per-Peer Quorum Status</h2>
<span class="count">{{ peer_rows|length }} peers</span>
</div>
<p class="page-intro">Threshold: {{ cfg.trust_min_vouchers }} distinct trusted vouchers required to make a non-trusted peer listening-eligible.</p>
{% if peer_rows %}
<table class="ledger">
<thead><tr><th>Peer</th><th>Status</th><th>Vouches</th><th>Quorum met</th><th>Eligible</th></tr></thead>
<tbody>
{% for row in peer_rows %}
<tr class="ledger-row">
<td><strong>{{ row.peer.domain }}</strong><br><span class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">{{ row.peer.fingerprint[:8] }}…{{ row.peer.fingerprint[-8:] }}</span></td>
<td>
{% if row.peer.status == 'trusted' %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">trusted</span>
{% elif row.peer.status == 'blocked' %}
<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">blocked</span>
{% else %}
<span class="sev-badge">{{ row.peer.status }}</span>
{% endif %}
</td>
<td>{{ row.vouches|length }}</td>
<td>
{% if row.vouched %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">yes</span>
{% else %}
<span class="sev-badge">no</span>
{% endif %}
</td>
<td>
{% if row.eligible %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">listening</span>
{% else %}
<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">muted</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no peers registered yet)</p>
{% endif %}
</section>
{% endblock %}

View File

@@ -35,6 +35,86 @@
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>AUTO-RESPONSE STATE</h2>
<span class="count">{{ respond_auto_fired_24h }} auto-fired in last 24h</span>
</div>
<p class="page-intro">When the <code>respond</code> pipeline runs in <code>auto-execute</code>, every PROPOSED action that passes all three gates fires automatically. Below shows the live config + audit trail.</p>
<div style="display:flex; gap:14px; flex-wrap:wrap; margin:14px 0;">
{# Mode badge — traffic-light coloring. auto-execute is "armed" (red), auto-propose is amber, manual is green/safe. #}
{% if respond_mode == 'auto-execute' %}
<div style="padding:10px 14px; border-radius:6px; background:rgba(248,113,113,0.12); border:1px solid rgba(248,113,113,0.45); color:#fca5a5; font-weight:700; letter-spacing:0.04em;">
MODE: auto-execute (ARMED)
</div>
{% elif respond_mode == 'auto-propose' %}
<div style="padding:10px 14px; border-radius:6px; background:rgba(250,204,21,0.12); border:1px solid rgba(250,204,21,0.45); color:#fde047; font-weight:700; letter-spacing:0.04em;">
MODE: auto-propose (staging only)
</div>
{% else %}
<div style="padding:10px 14px; border-radius:6px; background:rgba(74,222,128,0.10); border:1px solid var(--green); color:var(--green); font-weight:700; letter-spacing:0.04em;">
MODE: manual (no proposals)
</div>
{% endif %}
<div style="padding:10px 14px; border-radius:6px; background:var(--panel-2); border:1px solid var(--border); color:var(--text);">
Threshold: <strong>{{ respond_threshold|upper }}+</strong>
</div>
<div style="padding:10px 14px; border-radius:6px; background:var(--panel-2); border:1px solid var(--border); color:var(--text);">
Quorum: <strong style="color: {{ 'var(--green)' if respond_require_quorum else 'var(--muted)' }};">{{ 'ON' if respond_require_quorum else 'OFF' }}</strong>
</div>
<div style="padding:10px 14px; border-radius:6px; background:var(--panel-2); border:1px solid var(--border); color:var(--text);">
Local-only: <strong style="color: {{ 'var(--green)' if respond_local_only else 'var(--muted)' }};">{{ 'ON' if respond_local_only else 'OFF' }}</strong>
</div>
</div>
<form method="post" action="/admin/pulse/respond-config" style="display:flex; gap:10px; align-items:center; flex-wrap:wrap; margin-top:14px;">
<label style="font-size:12px;">Min severity:
<select name="threshold" style="background:var(--panel-2); color:var(--text); border:1px solid var(--border); border-radius:4px; padding:4px 6px;">
{% for s in severity_choices %}
<option value="{{ s }}" {% if s == respond_threshold %}selected{% endif %}>{{ s }}</option>
{% endfor %}
</select>
</label>
<label style="display:inline-flex; align-items:center; gap:4px; font-size:12px;">
<input type="checkbox" name="require_quorum" value="1" {% if respond_require_quorum %}checked{% endif %}> require quorum
</label>
<label style="display:inline-flex; align-items:center; gap:4px; font-size:12px;">
<input type="checkbox" name="local_only" value="1" {% if respond_local_only %}checked{% endif %}> local-only
</label>
<button type="submit" class="btn">save gates</button>
</form>
{% if respond_audit_recent %}
<table class="ledger" style="margin-top:18px;">
<thead>
<tr>
<th style="width:18%;">timestamp</th>
<th style="width:12%;">decision</th>
<th style="width:18%;">case</th>
<th>detail</th>
</tr>
</thead>
<tbody>
{% for row in respond_audit_recent %}
<tr class="ledger-row">
<td class="lg-ts">{{ row.timestamp }}</td>
<td>
{% if row.action == 'auto-fire' %}<span style="color: var(--green);">✓ {{ row.action }}</span>
{% elif row.action == 'error' %}<span style="color: var(--red);">✗ {{ row.action }}</span>
{% else %}<span style="color: var(--muted);">⊘ {{ row.action }}</span>{% endif %}
</td>
<td class="lg-sub"><code>{{ row.case_id or '—' }}</code>{% if row.action_id %} · #{{ row.action_id }}{% endif %}</td>
<td class="lg-sub">{{ row.detail or '' }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="lg-sub" style="margin-top:14px;">No auto-response decisions logged yet.</p>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Pipelines</h2>

View File

@@ -134,6 +134,19 @@ pulse_settings = Table(
Column("value", String, nullable=False),
)
pulse_audit = Table(
"pulse_audit", _metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("pipeline", String, nullable=False), # 'respond' | 'fetch' | ...
Column("action", String, nullable=False), # 'auto-fire' | 'skipped' | 'error'
Column("action_id", Integer, nullable=True), # response_actions.id when relevant
Column("case_id", String, nullable=True),
Column("detail", Text, nullable=True),
Column("timestamp", String, nullable=False), # ISO
)
Index("pulse_audit_pipeline_idx", pulse_audit.c.pipeline, pulse_audit.c.timestamp.desc())
Index("pulse_audit_action_id_idx", pulse_audit.c.action_id)
peers = Table(
"peers", _metadata,
Column("domain", String, primary_key=True),
@@ -161,6 +174,35 @@ Index("federation_signals_hash_idx", federation_signals.c.signal_hash)
Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint)
Index("federation_signals_received_idx", federation_signals.c.received_at.desc())
# Web-of-trust vouches — voucher signs an attestation that target is OK to listen to.
# Quorum is reached when enough distinct trusted vouchers sign for the same target.
vouches = Table(
"vouches", _metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("voucher_fingerprint", String, nullable=False),
Column("target_fingerprint", String, nullable=False),
Column("issued_at", String, nullable=False),
Column("expires_at", String, nullable=True),
Column("signature", Text, nullable=False), # base64 ed25519 sig
)
Index("vouches_unique_idx", vouches.c.voucher_fingerprint, vouches.c.target_fingerprint, unique=True)
Index("vouches_target_idx", vouches.c.target_fingerprint)
# Transparency log — append-only signed hash chain over every signal we receive.
# Each entry references the previous entry's hash; tampering with any row breaks
# verify_chain on every subsequent row.
translog = Table(
"translog", _metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("prev_hash", String, nullable=False),
Column("entry_type", String, nullable=False), # signal | vouch | config
Column("entry_data", Text, nullable=False), # canonical JSON of payload
Column("timestamp", String, nullable=False),
Column("entry_hash", String, nullable=False),
)
Index("translog_hash_idx", translog.c.entry_hash)
Index("translog_time_idx", translog.c.timestamp.desc())
_log = log.get(__name__)
_engine: Optional[Engine] = None
@@ -313,6 +355,66 @@ def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None:
conn.execute(stmt)
def pulse_setting_get(key: str, db_path: Path = DB_PATH) -> Optional[str]:
"""Fetch one row from pulse_settings by key; returns None if absent."""
stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == key)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return str(row.value) if row else None
def pulse_setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None:
"""Upsert one (key, value) into pulse_settings."""
stmt = sqlite_insert(pulse_settings).values(key=key, value=value)
stmt = stmt.on_conflict_do_update(
index_elements=[pulse_settings.c.key],
set_=dict(value=stmt.excluded.value),
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
# ---------- pulse audit trail --------------------------------------------
def pulse_audit_record(row: dict, db_path: Path = DB_PATH) -> int:
"""Append one pulse_audit row. Returns its id.
`row` must include 'pipeline', 'action', 'timestamp'. action_id, case_id,
detail are optional. Caller controls timestamp so tests can pin it.
"""
stmt = insert(pulse_audit).values(**row)
with engine(db_path).begin() as conn:
res = conn.execute(stmt)
return int(res.inserted_primary_key[0])
def pulse_audit_recent(pipeline: str, limit: int = 25, db_path: Path = DB_PATH) -> List[dict]:
"""Most-recent audit rows for one pipeline (newest first)."""
stmt = (
select(pulse_audit)
.where(pulse_audit.c.pipeline == pipeline)
.order_by(pulse_audit.c.timestamp.desc())
.limit(limit)
)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def pulse_audit_count_since(
pipeline: str, action: str, since_iso: str, db_path: Path = DB_PATH
) -> int:
"""Count audit rows for (pipeline, action) at or after `since_iso`."""
stmt = (
select(func.count())
.select_from(pulse_audit)
.where(pulse_audit.c.pipeline == pipeline)
.where(pulse_audit.c.action == action)
.where(pulse_audit.c.timestamp >= since_iso)
)
with engine(db_path).connect() as conn:
return int(conn.execute(stmt).scalar_one())
# ---------- federation: peers + signal buffer ----------------------------
def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None:
@@ -369,3 +471,128 @@ def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
# ---------- federation: pulse_settings get/set (shared scratch kv) -------
def setting_get(key: str, db_path: Path = DB_PATH) -> Optional[str]:
"""Read one pulse_settings value by key. Returns None if absent."""
stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == key)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return None if row is None else str(row.value)
def setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None:
"""Upsert one pulse_settings entry."""
stmt = sqlite_insert(pulse_settings).values(key=key, value=value)
stmt = stmt.on_conflict_do_update(
index_elements=[pulse_settings.c.key],
set_=dict(value=stmt.excluded.value),
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
# ---------- federation: vouches ------------------------------------------
def upsert_vouch(row: dict, db_path: Path = DB_PATH) -> None:
"""Insert-or-update one vouch. Unique on (voucher_fp, target_fp)."""
stmt = sqlite_insert(vouches).values(**row)
update_cols = {k: stmt.excluded[k] for k in row if k not in ("voucher_fingerprint", "target_fingerprint")}
stmt = stmt.on_conflict_do_update(
index_elements=[vouches.c.voucher_fingerprint, vouches.c.target_fingerprint],
set_=update_cols,
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
def list_vouches(db_path: Path = DB_PATH) -> List[dict]:
stmt = select(vouches).order_by(vouches.c.issued_at.desc())
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def vouches_by_target(target_fingerprint: str, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(vouches).where(vouches.c.target_fingerprint == target_fingerprint)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def vouches_by_voucher(voucher_fingerprint: str, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(vouches).where(vouches.c.voucher_fingerprint == voucher_fingerprint)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def delete_vouch(voucher_fingerprint: str, target_fingerprint: str, db_path: Path = DB_PATH) -> None:
stmt = vouches.delete().where(
(vouches.c.voucher_fingerprint == voucher_fingerprint)
& (vouches.c.target_fingerprint == target_fingerprint)
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
# ---------- transparency log ---------------------------------------------
def translog_append(row: dict, db_path: Path = DB_PATH) -> int:
"""Append one transparency-log entry. Returns inserted id."""
stmt = insert(translog).values(**row)
with engine(db_path).begin() as conn:
res = conn.execute(stmt)
return int(res.inserted_primary_key[0])
def translog_head(db_path: Path = DB_PATH) -> Optional[dict]:
"""Highest-id (latest) entry, or None if chain empty."""
stmt = select(translog).order_by(translog.c.id.desc()).limit(1)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return dict(row._mapping) if row else None
def translog_get(entry_id: int, db_path: Path = DB_PATH) -> Optional[dict]:
stmt = select(translog).where(translog.c.id == entry_id)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return dict(row._mapping) if row else None
def translog_after(entry_id: int, db_path: Path = DB_PATH) -> List[dict]:
"""All entries with id > entry_id, oldest first — for sync."""
stmt = select(translog).where(translog.c.id > entry_id).order_by(translog.c.id.asc())
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def translog_recent(limit: int = 100, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(translog).order_by(translog.c.id.desc()).limit(limit)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def translog_range(start: int = 0, end: Optional[int] = None, db_path: Path = DB_PATH) -> List[dict]:
"""All entries with start <= id (and id <= end if given), oldest first."""
cond = translog.c.id >= start
if end is not None:
cond = cond & (translog.c.id <= end)
stmt = select(translog).where(cond).order_by(translog.c.id.asc())
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def signals_for_case(case_id: str, db_path: Path = DB_PATH) -> List[dict]:
"""All federation signals attached to this case_id (signal_type='case').
Empty list means no peer has ever sent us this case → we generated it
ourselves and it counts as locally-sourced for auto-fire purposes.
"""
stmt = (
select(federation_signals)
.where(federation_signals.c.signal_type == "case")
.where(federation_signals.c.signal_id == case_id)
)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]

357
src/psyc/lines/discovery.py Normal file
View File

@@ -0,0 +1,357 @@
"""Discovery — DNS-SD resolver + BFS peer-walker for internet-wide federation.
Federation identity (psyc.lines.federation) gives every node a stable Ed25519
keypair, a 32-hex fingerprint, and a DNS record format. This module is the
*finder*: given a seed domain you suspect runs psyc, walk its DNS-SD records
to learn the fingerprint, fetch its public peer list, and recurse.
Newly-discovered peers always enter the `peers` table with status="unknown"
they do NOT become trusted by being seen; vouching is a separate concern
(sibling module). Discovery only populates the candidate set.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Set, Tuple
import dns.exception
import dns.rdatatype
import dns.resolver
import httpx
from pydantic import BaseModel, Field
from psyc import db, log
from psyc.lines import federation
from psyc.result import Err, Ok, Result
_log = log.get(__name__)
DEFAULT_TIMEOUT = 5.0
DEFAULT_PORT = 443
DEFAULT_MAX_DEPTH = 2
DEFAULT_MAX_PEERS = 200
_VALID_STATUSES = {"unknown", "trusted", "blocked", "vouched"}
# ---------- candidate model --------------------------------------------------
class PeerCandidate(BaseModel):
"""A peer found by the resolver/walker — not yet vetted, just observed."""
domain: str
fingerprint: str
port: int = DEFAULT_PORT
source: str # "dns-sd" | "peer-walk:<source-domain>"
discovered_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
# ---------- DNS-SD resolver --------------------------------------------------
def _parse_txt_value(txt: str) -> Result[Dict[str, str], str]:
"""Parse `v=psyc1 fp=<hex> alg=ed25519 path=...` → dict. Tolerant of order."""
out: Dict[str, str] = {}
for token in txt.strip().split():
if "=" not in token:
return Err(f"malformed TXT token (no '='): {token!r}")
k, v = token.split("=", 1)
out[k.strip()] = v.strip()
if out.get("v") != federation.FEED_VERSION:
return Err(f"unsupported version: {out.get('v')!r}")
fp = out.get("fp", "")
if len(fp) != 32 or any(c not in "0123456789abcdef" for c in fp.lower()):
return Err(f"bad fingerprint: {fp!r}")
if out.get("alg") and out["alg"] != federation.FEED_ALG:
return Err(f"unsupported alg: {out.get('alg')!r}")
return Ok(out)
def _flatten_txt(rdata: Any) -> str:
"""DNS TXT records are a sequence of byte-strings — join them. Tolerant of mocks."""
strings = getattr(rdata, "strings", None)
if strings is None:
return str(rdata).strip('"')
parts: List[str] = []
for s in strings:
if isinstance(s, bytes):
parts.append(s.decode("utf-8", errors="replace"))
else:
parts.append(str(s))
return "".join(parts)
def resolve_psyc(domain: str, timeout: float = DEFAULT_TIMEOUT) -> Result[PeerCandidate, str]:
"""Look up `_psyc._tcp.<domain>` SRV + TXT → return a candidate.
SRV gives target+port; TXT carries `v=psyc1 fp=... alg=ed25519 path=...`.
Any DNS failure, parse failure, or missing record returns Err.
"""
name = f"_psyc._tcp.{domain}"
resolver = dns.resolver.Resolver()
resolver.lifetime = timeout
resolver.timeout = timeout
# SRV: target host + port.
port = DEFAULT_PORT
try:
srv_answers = resolver.resolve(name, "SRV")
except dns.resolver.NXDOMAIN:
return Err(f"no SRV record at {name} (NXDOMAIN)")
except dns.resolver.NoAnswer:
return Err(f"no SRV record at {name} (NoAnswer)")
except dns.exception.Timeout:
return Err(f"SRV lookup timed out for {name}")
except dns.exception.DNSException as exc:
return Err(f"SRV lookup failed for {name}: {exc}")
srv_list = list(srv_answers)
if not srv_list:
return Err(f"no SRV record at {name} (empty)")
try:
port = int(srv_list[0].port)
except Exception:
port = DEFAULT_PORT
# TXT: fingerprint + protocol metadata.
try:
txt_answers = resolver.resolve(name, "TXT")
except dns.resolver.NXDOMAIN:
return Err(f"no TXT record at {name} (NXDOMAIN)")
except dns.resolver.NoAnswer:
return Err(f"no TXT record at {name} (NoAnswer)")
except dns.exception.Timeout:
return Err(f"TXT lookup timed out for {name}")
except dns.exception.DNSException as exc:
return Err(f"TXT lookup failed for {name}: {exc}")
txt_list = list(txt_answers)
if not txt_list:
return Err(f"no TXT record at {name} (empty)")
last_err: Optional[str] = None
for rdata in txt_list:
value = _flatten_txt(rdata)
parsed = _parse_txt_value(value)
if isinstance(parsed, Ok):
return Ok(PeerCandidate(
domain=domain,
fingerprint=parsed.value["fp"].lower(),
port=port,
source="dns-sd",
))
last_err = parsed.reason
return Err(f"no usable psyc TXT record at {name}: {last_err}")
# ---------- HTTP probes ------------------------------------------------------
def _base_url(domain: str, port: int) -> str:
if port == 443:
return f"https://{domain}"
return f"https://{domain}:{port}"
def fetch_peer_info(domain: str, port: int = DEFAULT_PORT, timeout: float = DEFAULT_TIMEOUT,
expected_fingerprint: Optional[str] = None) -> Result[Dict[str, Any], str]:
"""GET /federation/info on a peer. Cross-checks fingerprint if provided.
The cross-check defends against MITM-injected TXT records — DNS said one
fingerprint, the live node's HTTPS-served info MUST agree.
"""
url = _base_url(domain, port) + "/federation/info"
try:
with httpx.Client(timeout=timeout) as client:
r = client.get(url)
r.raise_for_status()
data = r.json()
except httpx.HTTPStatusError as exc:
return Err(f"HTTP {exc.response.status_code} from {url}")
except httpx.RequestError as exc:
return Err(f"network error fetching {url}: {exc}")
except ValueError as exc:
return Err(f"non-JSON response from {url}: {exc}")
except Exception as exc: # noqa: BLE001 — anything weird is a failure
return Err(f"fetch failed for {url}: {exc}")
if not isinstance(data, dict):
return Err(f"unexpected info shape from {url}: {type(data).__name__}")
declared = str(data.get("fingerprint", "")).lower()
if expected_fingerprint and declared != expected_fingerprint.lower():
return Err(
f"fingerprint mismatch for {domain}: DNS said {expected_fingerprint!r} "
f"but /federation/info said {declared!r}"
)
return Ok(data)
def fetch_public_peers(domain: str, port: int = DEFAULT_PORT,
timeout: float = DEFAULT_TIMEOUT) -> Result[List[Dict[str, Any]], str]:
"""GET /federation/peers/public on a peer. Returns the list as-is for the walker to dedupe."""
url = _base_url(domain, port) + "/federation/peers/public"
try:
with httpx.Client(timeout=timeout) as client:
r = client.get(url)
r.raise_for_status()
data = r.json()
except httpx.HTTPStatusError as exc:
return Err(f"HTTP {exc.response.status_code} from {url}")
except httpx.RequestError as exc:
return Err(f"network error fetching {url}: {exc}")
except ValueError as exc:
return Err(f"non-JSON response from {url}: {exc}")
except Exception as exc: # noqa: BLE001
return Err(f"fetch failed for {url}: {exc}")
if not isinstance(data, list):
return Err(f"unexpected peers shape from {url}: {type(data).__name__}")
out: List[Dict[str, Any]] = []
for item in data:
if isinstance(item, dict) and item.get("domain") and item.get("fingerprint"):
out.append(item)
return Ok(out)
# ---------- BFS walker -------------------------------------------------------
def walk(seeds: List[str], max_depth: int = DEFAULT_MAX_DEPTH,
max_peers: int = DEFAULT_MAX_PEERS, timeout: float = DEFAULT_TIMEOUT) -> List[PeerCandidate]:
"""Breadth-first walk from `seeds` → discovered candidates.
For each domain: DNS-SD resolve → fetch /info to verify fingerprint →
fetch /peers/public → enqueue its domains for the next layer. Dedupes on
(domain, fingerprint). Skips our own fingerprint to avoid loops. All
errors are logged but non-fatal — one bad peer doesn't abort the walk.
"""
own_fp = ""
try:
own_fp = federation.node_fingerprint()
except Exception as exc: # noqa: BLE001 — discovery should work even pre-keygen
_log.info("discovery.walk.no_own_fp", error=str(exc))
seen_pairs: Set[Tuple[str, str]] = set()
seen_domains: Set[str] = set()
out: List[PeerCandidate] = []
# Queue of (domain, depth, source). Seeds enter at depth 0.
frontier: List[Tuple[str, int, str]] = [(d.strip(), 0, "dns-sd") for d in seeds if d and d.strip()]
next_layer: List[Tuple[str, int, str]] = []
while frontier:
domain, depth, source = frontier.pop(0)
if domain in seen_domains:
if not frontier:
frontier, next_layer = next_layer, []
continue
seen_domains.add(domain)
if len(out) >= max_peers:
_log.info("discovery.walk.cap.peers", cap=max_peers)
break
resolved = resolve_psyc(domain, timeout=timeout)
if isinstance(resolved, Err):
_log.info("discovery.resolve.skip", domain=domain, reason=resolved.reason)
if not frontier:
frontier, next_layer = next_layer, []
continue
cand = resolved.value
cand.source = source if depth > 0 else "dns-sd"
if cand.fingerprint == own_fp:
_log.info("discovery.skip.self", domain=domain)
if not frontier:
frontier, next_layer = next_layer, []
continue
pair = (cand.domain, cand.fingerprint)
if pair in seen_pairs:
if not frontier:
frontier, next_layer = next_layer, []
continue
seen_pairs.add(pair)
# Verify the live endpoint's fingerprint matches DNS. If we can't reach
# it, still record the DNS-discovered candidate — vouching can vet it
# later, and we don't want one HTTP outage to abort the walk.
info_res = fetch_peer_info(domain, port=cand.port, timeout=timeout,
expected_fingerprint=cand.fingerprint)
if isinstance(info_res, Err):
_log.info("discovery.info.skip", domain=domain, reason=info_res.reason)
out.append(cand)
if not frontier:
frontier, next_layer = next_layer, []
continue
out.append(cand)
# Recurse: fetch this peer's public-peers list, enqueue domains.
if depth + 1 <= max_depth:
peers_res = fetch_public_peers(domain, port=cand.port, timeout=timeout)
if isinstance(peers_res, Err):
_log.info("discovery.peers.skip", domain=domain, reason=peers_res.reason)
else:
for item in peers_res.value:
child_domain = str(item.get("domain", "")).strip()
if not child_domain or child_domain in seen_domains:
continue
child_source = f"peer-walk:{domain}"
next_layer.append((child_domain, depth + 1, child_source))
if not frontier:
frontier, next_layer = next_layer, []
_log.info("discovery.walk.done", seeds=len(seeds), discovered=len(out), max_depth=max_depth)
return out
# ---------- persistence ------------------------------------------------------
def record_candidate(c: PeerCandidate, default_status: str = "unknown") -> None:
"""Upsert a discovered candidate into the peers table.
Preserves any existing trusted/blocked status — discovery NEVER demotes a
peer the operator has already classified. Only updates last_seen.
"""
if default_status not in _VALID_STATUSES:
default_status = "unknown"
existing = db.get_peer(c.domain)
now = c.discovered_at.isoformat()
if existing:
status = existing.get("status") or default_status
if status not in ("trusted", "blocked"):
status = default_status
db.upsert_peer(dict(
domain=c.domain,
fingerprint=existing.get("fingerprint") or c.fingerprint,
pubkey_pem=existing.get("pubkey_pem") or "",
status=status,
discovered_at=existing.get("discovered_at") or now,
last_seen=now,
notes=existing.get("notes"),
))
return
db.upsert_peer(dict(
domain=c.domain,
fingerprint=c.fingerprint,
pubkey_pem="", # populated when we successfully fetch /federation/key during vouching
status=default_status,
discovered_at=now,
last_seen=now,
notes=f"discovered via {c.source}",
))
_log.info("discovery.recorded", domain=c.domain, fp=c.fingerprint, source=c.source)
# ---------- public attestation -----------------------------------------------
def public_peer_attestation() -> List[Dict[str, Any]]:
"""List of peers we'll publicly attest to. Only `trusted` — never leaks unknown/blocked.
This is the surface that other psyc nodes' walkers read from us. We never
expose unknown or blocked peers — those are internal classification state.
"""
out: List[Dict[str, Any]] = []
for row in db.list_peers():
if row.get("status") != "trusted":
continue
out.append({
"domain": row["domain"],
"fingerprint": row["fingerprint"],
"first_seen": row.get("discovered_at"),
})
return out

View File

@@ -24,6 +24,7 @@ from cryptography.hazmat.primitives.asymmetric import ed25519
from pydantic import BaseModel, Field
from psyc import DATA_DIR, db, log
from psyc.lines import translog
from psyc.result import Err, Ok, Result
@@ -259,6 +260,9 @@ def build_signed_feed(window_hours: int = 24) -> Dict[str, Any]:
"window_hours": window_hours,
"cases": _build_case_records(window_hours),
"iocs": _build_ioc_records(window_hours),
# Vouches we've issued ride along with the feed so peers can learn
# who we trust and accumulate quorum on shared targets.
"vouches": [v.model_dump() for v in our_vouches()],
}
sig = sign_payload(canonical_json(payload))
payload["signature"] = base64.b64encode(sig).decode("ascii")
@@ -306,10 +310,16 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result
except Exception as exc:
return Err(f"bad pubkey: {exc}")
# Listening gate: only accept signals from peers we explicitly trust or
# that quorum of trusted peers vouches for. Unknown peers don't land here.
if not peer_is_listening_eligible(peer_fp):
return Err(f"peer not trusted: {peer_fp}")
now = datetime.now(timezone.utc).isoformat()
signal_ids: List[Tuple[str, str]] = []
cases = feed.get("cases") or []
iocs = feed.get("iocs") or []
feed_vouches = feed.get("vouches") or []
for c in cases:
case_id = c.get("case_id") or ""
@@ -323,6 +333,15 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result
raw_json=json.dumps(c, sort_keys=True),
))
signal_ids.append(("case", digest))
try:
translog.append("signal", {
"peer_fingerprint": peer_fp,
"signal_type": "case",
"signal_id": case_id,
"signal_hash": digest,
})
except Exception as exc: # transparency log is best-effort, never block ingest
_log.warning("federation.translog.append.fail", error=str(exc))
for i in iocs:
value = i.get("value") or ""
@@ -336,6 +355,36 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result
raw_json=json.dumps(i, sort_keys=True),
))
signal_ids.append(("ioc", digest))
try:
translog.append("signal", {
"peer_fingerprint": peer_fp,
"signal_type": "ioc",
"signal_id": value,
"signal_hash": digest,
})
except Exception as exc:
_log.warning("federation.translog.append.fail", error=str(exc))
# Vouch propagation — peer asserts who they trust. We only accept vouches
# whose declared voucher fingerprint matches the peer we just authenticated
# (so a peer can't forge vouches "from" someone else through us).
for v_raw in feed_vouches:
if not isinstance(v_raw, dict):
continue
try:
vouch = Vouch.model_validate(v_raw)
except Exception as exc:
_log.warning("federation.vouch.malformed", error=str(exc))
continue
if vouch.voucher_fingerprint != peer_fp:
_log.warning(
"federation.vouch.voucher_mismatch",
claimed=vouch.voucher_fingerprint, actual=peer_fp,
)
continue
accepted = accept_vouch(vouch, expected_pubkey_pem)
if isinstance(accepted, Err):
_log.warning("federation.vouch.rejected", reason=accepted.reason)
_log.info("federation.import.ok", peer=peer_fp, cases=len(cases), iocs=len(iocs))
return Ok(ImportSummary(
@@ -403,3 +452,279 @@ def set_peer_status(domain: str, status: str) -> None:
def remove_peer(domain: str) -> None:
db.remove_peer(domain)
# ---------- vouching + quorum (stage 4) ---------------------------------
#
# The web of trust: a peer's fingerprint becomes "listening-eligible" when
# either we directly trust it (peers.status == "trusted") or at least
# `trust_min_vouchers` of our trusted peers have signed a vouch for it.
#
# Signal-level quorum: a federation_signals row is meaningful only when
# `signal_quorum_k` distinct vouched peers have reported the same signal_hash.
#
# Vouches are short Pydantic records signed with the voucher's Ed25519 key
# over canonical JSON of the body (everything but the signature field).
class Vouch(BaseModel):
voucher_fingerprint: str
target_fingerprint: str
issued_at: datetime
expires_at: Optional[datetime] = None
signature: str = "" # base64 ed25519 sig over vouch_payload_bytes(...)
class QuorumConfig(BaseModel):
trust_min_vouchers: int = 2
signal_quorum_k: int = 2
_QC_TRUST_KEY = "wot_trust_min"
_QC_K_KEY = "wot_quorum_k"
def quorum_config() -> QuorumConfig:
"""Live quorum settings, with sensible defaults if pulse_settings is empty."""
cfg = QuorumConfig()
t = db.setting_get(_QC_TRUST_KEY)
k = db.setting_get(_QC_K_KEY)
if t is not None:
try:
cfg.trust_min_vouchers = max(1, int(t))
except ValueError:
pass
if k is not None:
try:
cfg.signal_quorum_k = max(1, int(k))
except ValueError:
pass
return cfg
def set_quorum_config(cfg: QuorumConfig) -> None:
"""Persist quorum config into pulse_settings."""
db.setting_set(_QC_TRUST_KEY, str(cfg.trust_min_vouchers))
db.setting_set(_QC_K_KEY, str(cfg.signal_quorum_k))
def vouch_payload_bytes(
voucher_fp: str,
target_fp: str,
issued_at: datetime,
expires_at: Optional[datetime],
) -> bytes:
"""Canonical JSON of the unsigned vouch body — what the voucher signs."""
body: Dict[str, Any] = {
"voucher_fingerprint": voucher_fp,
"target_fingerprint": target_fp,
"issued_at": issued_at.isoformat(),
"expires_at": expires_at.isoformat() if expires_at else None,
}
return canonical_json(body)
def _store_vouch(v: Vouch) -> None:
db.upsert_vouch(dict(
voucher_fingerprint=v.voucher_fingerprint,
target_fingerprint=v.target_fingerprint,
issued_at=v.issued_at.isoformat(),
expires_at=v.expires_at.isoformat() if v.expires_at else None,
signature=v.signature,
))
def _row_to_vouch(row: Dict[str, Any]) -> Vouch:
return Vouch(
voucher_fingerprint=row["voucher_fingerprint"],
target_fingerprint=row["target_fingerprint"],
issued_at=datetime.fromisoformat(row["issued_at"]),
expires_at=datetime.fromisoformat(row["expires_at"]) if row.get("expires_at") else None,
signature=row.get("signature") or "",
)
def issue_vouch(target_fingerprint: str, ttl_days: int = 90) -> Vouch:
"""Sign a vouch for `target_fingerprint` with OUR key. Persists + returns it."""
our_fp = node_fingerprint()
issued_at = datetime.now(timezone.utc)
expires_at = issued_at + timedelta(days=ttl_days) if ttl_days > 0 else None
payload = vouch_payload_bytes(our_fp, target_fingerprint, issued_at, expires_at)
sig = sign_payload(payload)
vouch = Vouch(
voucher_fingerprint=our_fp,
target_fingerprint=target_fingerprint,
issued_at=issued_at,
expires_at=expires_at,
signature=base64.b64encode(sig).decode("ascii"),
)
_store_vouch(vouch)
try:
translog.append("vouch", {
"voucher_fingerprint": vouch.voucher_fingerprint,
"target_fingerprint": vouch.target_fingerprint,
"issued_at": vouch.issued_at.isoformat(),
"expires_at": vouch.expires_at.isoformat() if vouch.expires_at else None,
})
except Exception as exc:
_log.warning("federation.translog.append.fail", error=str(exc))
_log.info("federation.vouch.issued", target=target_fingerprint, ttl_days=ttl_days)
return vouch
def accept_vouch(vouch: Vouch, voucher_pubkey_pem: str) -> Result[None, str]:
"""Verify signature + expiry + voucher trust status, then persist.
Failure modes return Err with a short reason so the caller can log them.
A voucher whose status is not "trusted" in our peers table is refused —
we don't accept transitive vouches from unknown peers.
"""
# Expiry first — cheapest check.
now = datetime.now(timezone.utc)
if vouch.expires_at is not None and vouch.expires_at < now:
return Err("vouch expired")
# Voucher must be a directly-trusted peer (no transitive trust at this layer).
voucher_status = None
for row in db.list_peers():
if row.get("fingerprint") == vouch.voucher_fingerprint:
voucher_status = row.get("status")
break
if voucher_status != "trusted":
return Err(f"voucher not trusted: {vouch.voucher_fingerprint}")
# The pubkey must match the declared voucher fingerprint.
try:
if _fingerprint_for_pubkey_pem(voucher_pubkey_pem) != vouch.voucher_fingerprint:
return Err("voucher pubkey does not match fingerprint")
except Exception as exc:
return Err(f"bad voucher pubkey: {exc}")
payload = vouch_payload_bytes(
vouch.voucher_fingerprint,
vouch.target_fingerprint,
vouch.issued_at,
vouch.expires_at,
)
try:
signature = base64.b64decode(vouch.signature)
except Exception:
return Err("vouch signature not base64")
if not verify_payload(payload, signature, voucher_pubkey_pem):
return Err("vouch signature invalid")
_store_vouch(vouch)
try:
translog.append("vouch", {
"voucher_fingerprint": vouch.voucher_fingerprint,
"target_fingerprint": vouch.target_fingerprint,
"issued_at": vouch.issued_at.isoformat(),
"expires_at": vouch.expires_at.isoformat() if vouch.expires_at else None,
"accepted": True,
})
except Exception as exc:
_log.warning("federation.translog.append.fail", error=str(exc))
_log.info("federation.vouch.accepted", voucher=vouch.voucher_fingerprint, target=vouch.target_fingerprint)
return Ok(None)
def revoke_vouch(target_fingerprint: str) -> None:
"""Delete OUR vouch naming `target_fingerprint`. No-op if absent."""
db.delete_vouch(node_fingerprint(), target_fingerprint)
_log.info("federation.vouch.revoked", target=target_fingerprint)
def our_vouches() -> List[Vouch]:
"""Vouches we have issued (filter for voucher_fingerprint == our fp)."""
return [_row_to_vouch(r) for r in db.vouches_by_voucher(node_fingerprint())]
def vouches_for(target_fingerprint: str) -> List[Vouch]:
"""Every vouch stored locally that names `target_fingerprint` as target."""
return [_row_to_vouch(r) for r in db.vouches_by_target(target_fingerprint)]
def is_vouched(target_fingerprint: str, min_vouchers: Optional[int] = None) -> bool:
"""True iff ≥`min_vouchers` distinct non-expired vouches from currently-trusted
peers name `target_fingerprint`.
"""
cfg = quorum_config()
threshold = min_vouchers if min_vouchers is not None else cfg.trust_min_vouchers
if threshold <= 0:
return True
now = datetime.now(timezone.utc)
trusted_fps = {p.fingerprint for p in list_peers() if p.status == "trusted"}
distinct_vouchers: set = set()
for v in vouches_for(target_fingerprint):
if v.expires_at is not None and v.expires_at < now:
continue
if v.voucher_fingerprint not in trusted_fps:
continue
distinct_vouchers.add(v.voucher_fingerprint)
if len(distinct_vouchers) >= threshold:
return True
return False
def peer_is_listening_eligible(fingerprint: str) -> bool:
"""True iff the peer is directly trusted OR vouched into trust.
This is the gate used by `import_signed_feed`. Auto-response will share
this signature — keep it stable.
"""
if not fingerprint:
return False
for p in list_peers():
if p.fingerprint == fingerprint:
if p.status == "trusted":
return True
if p.status == "blocked":
return False
break
return is_vouched(fingerprint)
def is_quorum_met(signal_hash: str, k: Optional[int] = None) -> bool:
"""True iff ≥k distinct vouched peers have reported `signal_hash`.
"Vouched" here means `peer_is_listening_eligible` — the same web-of-trust
set the import gate respects. Self-reports from the local node do not
count (they never end up in federation_signals).
"""
cfg = quorum_config()
threshold = k if k is not None else cfg.signal_quorum_k
if threshold <= 0:
return True
rows = db.signals_for_hash(signal_hash)
distinct: set = set()
for r in rows:
fp = r.get("peer_fingerprint") or ""
if not fp or fp in distinct:
continue
if not peer_is_listening_eligible(fp):
continue
distinct.add(fp)
if len(distinct) >= threshold:
return True
return False
def quorum_evidence(signal_hash: str) -> List[Tuple[str, datetime]]:
"""(peer_fingerprint, received_at) tuples for one signal_hash — for UI display.
Only includes signals from currently listening-eligible peers, deduped
per fingerprint at the earliest receipt.
"""
rows = db.signals_for_hash(signal_hash)
earliest: Dict[str, datetime] = {}
for r in rows:
fp = r.get("peer_fingerprint") or ""
if not fp or not peer_is_listening_eligible(fp):
continue
try:
ts = datetime.fromisoformat(r.get("received_at") or "")
except ValueError:
continue
if fp not in earliest or ts < earliest[fp]:
earliest[fp] = ts
return sorted(earliest.items(), key=lambda kv: kv[1])

View File

@@ -22,6 +22,7 @@ from typing import Callable, Dict, List, Optional, Tuple
from pydantic import BaseModel, Field
from psyc import db, log
from psyc.models import Severity
_log = log.get(__name__)
@@ -33,6 +34,73 @@ class PulseMode(str, Enum):
AUTO_EXECUTE = "auto-execute"
# ---------- respond auto-fire gates -----------------------------------------
# Persisted as rows in pulse_settings (key/value pairs). All defaults are
# "safe" — quorum required, HIGH threshold, federation cases permitted only
# when quorum-met.
_KEY_RESPOND_THRESHOLD = "respond_auto_threshold"
_KEY_RESPOND_REQUIRE_QUORUM = "respond_require_quorum"
_KEY_RESPOND_LOCAL_ONLY = "respond_local_only"
_DEFAULT_THRESHOLD = Severity.HIGH
_DEFAULT_REQUIRE_QUORUM = True
_DEFAULT_LOCAL_ONLY = False
def _severity_rank(sev: Optional[Severity]) -> int:
"""Rank order for severity threshold comparison. Unknown / None → -1."""
if sev is None:
return -1
return {
Severity.LOW: 0,
Severity.MEDIUM: 1,
Severity.HIGH: 2,
Severity.CRITICAL: 3,
}.get(sev, -1)
def respond_auto_threshold() -> Severity:
raw = db.pulse_setting_get(_KEY_RESPOND_THRESHOLD)
if raw is None:
return _DEFAULT_THRESHOLD
try:
return Severity(raw)
except ValueError:
return _DEFAULT_THRESHOLD
def set_respond_auto_threshold(sev: Severity) -> None:
if not isinstance(sev, Severity):
raise ValueError(f"not a Severity: {sev!r}")
db.pulse_setting_set(_KEY_RESPOND_THRESHOLD, sev.value)
_log.info("pulse.respond.threshold.changed", severity=sev.value)
def respond_require_quorum() -> bool:
raw = db.pulse_setting_get(_KEY_RESPOND_REQUIRE_QUORUM)
if raw is None:
return _DEFAULT_REQUIRE_QUORUM
return raw == "1"
def set_respond_require_quorum(state: bool) -> None:
db.pulse_setting_set(_KEY_RESPOND_REQUIRE_QUORUM, "1" if state else "0")
_log.info("pulse.respond.quorum.changed", required=bool(state))
def respond_local_only() -> bool:
raw = db.pulse_setting_get(_KEY_RESPOND_LOCAL_ONLY)
if raw is None:
return _DEFAULT_LOCAL_ONLY
return raw == "1"
def set_respond_local_only(state: bool) -> None:
db.pulse_setting_set(_KEY_RESPOND_LOCAL_ONLY, "1" if state else "0")
_log.info("pulse.respond.local-only.changed", local_only=bool(state))
class Pipeline(BaseModel):
name: str
title: str
@@ -114,28 +182,262 @@ def _run_reindex() -> str:
return f"indexed {written} IOC(s) from {len(cases)} case(s)"
def _run_respond_propose() -> str:
def _propose_for_recent_cases() -> int:
"""Propose response actions for high-severity cases that don't yet have any.
Strictly proposal-only: nothing is dispatched. The respond.propose_for_case
helper is already idempotent per case (skips cases that already have
actions), so re-running this on a tick is safe.
Returns total proposed-action count. Idempotent per case (respond's
propose_for_case skips cases that already have actions).
"""
from psyc.lines import respond
cases = db.list_cases(limit=10_000)
proposed = 0
touched = 0
for c in cases:
ids = respond.propose_for_case(c)
if ids:
proposed += len(ids)
touched += 1
return f"proposed {proposed} action(s) for {touched} case(s)"
proposed += len(ids)
return proposed
def _current_mode(pipeline_name: str) -> PulseMode:
p = _get_pipeline(pipeline_name)
return p.mode if p is not None else PulseMode.MANUAL
def _is_quorum_met(case_digest_hash: str) -> bool:
"""Wrapper for federation.is_quorum_met that tolerates the sibling agent
not having shipped the function yet.
If federation lacks `is_quorum_met`, we fall back to False — the safe
default ("no quorum signal → don't fire federation cases").
"""
try:
from psyc.lines import federation as _federation
fn = getattr(_federation, "is_quorum_met", None)
if fn is None:
return False
return bool(fn(case_digest_hash))
except Exception as exc: # noqa: BLE001 — defensive: any import / runtime miss → safe-false
_log.warning("pulse.respond.quorum.unavailable", error=str(exc))
return False
def _canonical_json_local(obj: Dict[str, object]) -> bytes:
"""Deterministic JSON serialization — mirrors federation.canonical_json
for the case-digest computation. Local copy so we don't hard-require
federation to be importable.
"""
import json
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
def _case_digest_hash(case_id: str) -> str:
"""SHA-256 of the canonical JSON of {case_id: ...} — what federation hashes.
Returns "" if the case can't be loaded (e.g. row vanished mid-fire).
"""
import hashlib
from psyc.result import Ok as _Ok
got = db.get_case(case_id)
if not isinstance(got, _Ok):
return ""
case = got.value
# Mirror federation._build_case_records' record shape so digests match.
record = {
"case_id": case.case_id,
"summary": case.summary,
"severity": case.classification.severity.value if case.classification.severity else None,
"incident_type": case.classification.incident_type.value if case.classification.incident_type else None,
"observed_at": case.observed_at.isoformat(),
"feed_source": case.source_metadata.get("feed", ""),
"iocs": (
[{"value": v, "type": "url"} for v in case.observables.urls]
+ [{"value": v, "type": "domain"} for v in case.observables.domains]
+ [{"value": v, "type": "ip"} for v in case.observables.ips]
+ [{"value": v, "type": "hash"} for v in case.observables.hashes]
+ [{"value": v, "type": "cve"} for v in case.observables.cves]
),
}
return hashlib.sha256(_canonical_json_local(record)).hexdigest()
def _case_is_local(case_id: str) -> bool:
"""True if no federation peer has ever pushed us this case_id."""
return len(db.signals_for_case(case_id)) == 0
def _audit(action: str, *, action_id: Optional[int] = None,
case_id: Optional[str] = None, detail: str = "") -> None:
db.pulse_audit_record(dict(
pipeline="respond",
action=action,
action_id=action_id,
case_id=case_id,
detail=detail[:500],
timestamp=_now().isoformat(),
))
def _auto_fire_eligible() -> Tuple[int, int]:
"""Iterate PROPOSED actions and execute the ones that clear every gate.
Returns (fired_count, skipped_count). Records a pulse_audit row for every
decision (fired or skipped-with-reason) so the cockpit can show history.
A single failing action never aborts the batch.
"""
from psyc.lines import respond
from psyc.models import ActionStatus
from psyc.result import Ok as _Ok
threshold = respond_auto_threshold()
threshold_rank = _severity_rank(threshold)
require_quorum = respond_require_quorum()
local_only = respond_local_only()
fired = 0
skipped = 0
actions = respond.list_actions(status=ActionStatus.PROPOSED, limit=100)
for action in actions:
# Re-hydrate severity enum (action.severity is the .value string).
try:
sev_enum = Severity(action.severity) if action.severity else None
except ValueError:
sev_enum = None
if _severity_rank(sev_enum) < threshold_rank:
skipped += 1
_audit(
"skipped",
action_id=action.id,
case_id=action.case_id,
detail=f"below threshold: severity={action.severity!r} < {threshold.value}",
)
continue
is_local = _case_is_local(action.case_id)
if require_quorum and not is_local:
digest = _case_digest_hash(action.case_id)
if not digest or not _is_quorum_met(digest):
if local_only:
# local-only is armed but this case was imported via federation
# → defer (don't fire) until federation grants quorum
skipped += 1
_audit(
"skipped",
action_id=action.id,
case_id=action.case_id,
detail="local-only armed + federation-sourced case",
)
continue
skipped += 1
_audit(
"skipped",
action_id=action.id,
case_id=action.case_id,
detail="no quorum on federation-sourced case",
)
continue
# else: quorum disabled, or case is locally-generated → fire.
try:
result = respond.execute_action(action.id, approver="pulse-auto")
except Exception as exc: # noqa: BLE001 — one bad action shouldn't kill the batch
skipped += 1
_audit(
"error",
action_id=action.id,
case_id=action.case_id,
detail=f"execute raised: {type(exc).__name__}: {exc}",
)
_log.warning("pulse.respond.auto-fire.error",
action_id=action.id, error=str(exc))
continue
if isinstance(result, _Ok):
fired += 1
_log.info("pulse.respond.auto-fire",
action_id=action.id, case_id=action.case_id,
type=action.action_type.value, target=action.target)
_audit(
"auto-fire",
action_id=action.id,
case_id=action.case_id,
detail=f"{action.action_type.value}{action.target}",
)
else:
# Err path — execute_action returned Err (e.g. SOAR sink down)
reason = getattr(result, "reason", "unknown")
skipped += 1
_audit(
"error",
action_id=action.id,
case_id=action.case_id,
detail=f"execute failed: {reason}",
)
_log.warning("pulse.respond.auto-fire.failed",
action_id=action.id, reason=str(reason))
return fired, skipped
def _run_respond() -> str:
"""Propose + (when mode is auto-execute) auto-fire eligible PROPOSED actions.
Two phases:
1. Always propose new actions for high-severity cases (existing behavior).
2. If pipeline mode is auto-execute, iterate PROPOSED actions and execute
those that clear severity/quorum/local-only gates.
"""
propose_count = _propose_for_recent_cases()
mode = _current_mode("respond")
if mode != PulseMode.AUTO_EXECUTE:
return f"proposed {propose_count} actions; mode={mode.value} → no auto-fire"
fired, skipped = _auto_fire_eligible()
return f"proposed {propose_count}; auto-fired {fired}; skipped {skipped} (gate)"
_DISCOVERY_SEEDS_KEY = "discovery_seeds"
def get_discovery_seeds() -> List[str]:
"""Operator-curated seed list for the discovery walker. Newline-separated in DB."""
raw = db.pulse_setting_get(_DISCOVERY_SEEDS_KEY)
if not raw:
return []
return [line.strip() for line in raw.splitlines() if line.strip()]
def set_discovery_seeds(seeds: List[str]) -> None:
"""Replace the seed list. Strips blanks + dedupes preserving order."""
seen: set = set()
cleaned: List[str] = []
for s in seeds:
v = (s or "").strip()
if not v or v in seen:
continue
seen.add(v)
cleaned.append(v)
db.pulse_setting_set(_DISCOVERY_SEEDS_KEY, "\n".join(cleaned))
def _run_peer_pull() -> str:
return "federation not yet active"
"""Walk DNS-SD + recurse over peer-public lists from the operator's seeds.
Records every fresh candidate into the `peers` table with status=unknown.
Vouching (sibling stage) is what eventually promotes them.
"""
from psyc.lines import discovery
seeds = get_discovery_seeds()
if not seeds:
return "no seeds configured"
candidates = discovery.walk(seeds)
for c in candidates:
try:
discovery.record_candidate(c)
except Exception as exc: # noqa: BLE001 — one bad write must not abort the batch
_log.warning("pulse.peer_pull.record.error", domain=c.domain, error=str(exc))
return f"discovered {len(candidates)} candidate(s) from {len(seeds)} seed(s)"
def _run_vouch_refresh() -> str:
@@ -149,7 +451,7 @@ _REGISTRY: Dict[str, Callable[[], str]] = {
"classify": _run_classify,
"prove": _run_prove,
"reindex": _run_reindex,
"respond": _run_respond_propose,
"respond": _run_respond,
"peer-pull": _run_peer_pull,
"vouch-refresh": _run_vouch_refresh,
}

161
src/psyc/lines/translog.py Normal file
View File

@@ -0,0 +1,161 @@
"""Transparency log — append-only signed merkle chain over federation signals.
Every signal we receive from a peer (case, IOC, or accepted vouch) is appended
as one `LogEntry`. Each entry's `entry_hash = sha256(canonical(prev_hash +
entry_type + entry_data + timestamp))` references the previous head, so any
tampering with a historical row invalidates every subsequent hash. The chain
is public — auditors can re-fetch it and re-run `verify_chain` to detect a
node that quietly mutated history (e.g. to hide a bad signal it accepted).
Hash format: lowercase hex SHA-256 of the canonical JSON of
``{"prev_hash": "...", "entry_type": "...", "entry_data": {...}, "timestamp": "..."}``.
Genesis entries use ``prev_hash = "0" * 64``.
"""
from __future__ import annotations
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from psyc import db, log
from psyc.result import Err, Ok, Result
_log = log.get(__name__)
GENESIS_PREV_HASH = "0" * 64
class LogEntry(BaseModel):
id: int
prev_hash: str
entry_type: str
entry_data: Dict[str, Any] = Field(default_factory=dict)
timestamp: str
entry_hash: str
def _canonical_json(obj: Dict[str, Any]) -> bytes:
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
def compute_entry_hash(prev_hash: str, entry_type: str, entry_data: Dict[str, Any], timestamp: str) -> str:
"""Hex SHA-256 of canonical(prev_hash + entry_type + entry_data + timestamp)."""
payload: Dict[str, Any] = {
"prev_hash": prev_hash,
"entry_type": entry_type,
"entry_data": entry_data,
"timestamp": timestamp,
}
return hashlib.sha256(_canonical_json(payload)).hexdigest()
def _row_to_entry(row: Dict[str, Any]) -> LogEntry:
raw = row.get("entry_data") or "{}"
try:
data = json.loads(raw)
except Exception:
data = {}
return LogEntry(
id=int(row["id"]),
prev_hash=str(row["prev_hash"]),
entry_type=str(row["entry_type"]),
entry_data=data if isinstance(data, dict) else {},
timestamp=str(row["timestamp"]),
entry_hash=str(row["entry_hash"]),
)
def head(db_path: Path = db.DB_PATH) -> Optional[LogEntry]:
"""Latest log entry, or None if the chain is empty."""
row = db.translog_head(db_path=db_path)
return _row_to_entry(row) if row else None
def append(entry_type: str, entry_data: Dict[str, Any], db_path: Path = db.DB_PATH) -> LogEntry:
"""Atomically append one entry to the chain. Returns the persisted entry."""
prev = db.translog_head(db_path=db_path)
prev_hash = str(prev["entry_hash"]) if prev else GENESIS_PREV_HASH
timestamp = datetime.now(timezone.utc).isoformat()
entry_hash = compute_entry_hash(prev_hash, entry_type, entry_data, timestamp)
new_id = db.translog_append(
dict(
prev_hash=prev_hash,
entry_type=entry_type,
entry_data=json.dumps(entry_data, sort_keys=True),
timestamp=timestamp,
entry_hash=entry_hash,
),
db_path=db_path,
)
_log.info("translog.append", id=new_id, entry_type=entry_type, hash=entry_hash[:12])
return LogEntry(
id=new_id,
prev_hash=prev_hash,
entry_type=entry_type,
entry_data=entry_data,
timestamp=timestamp,
entry_hash=entry_hash,
)
def verify_chain(start: int = 0, end: Optional[int] = None, db_path: Path = db.DB_PATH) -> Result[int, str]:
"""Walk entries [start, end] in id order, recompute each hash, compare.
Returns Ok(n_verified) when every entry's recomputed hash equals the
stored one and each prev_hash matches the previous entry's stored hash.
Returns Err with the offending id + expected/got hashes otherwise.
"""
rows = db.translog_range(start=start, end=end, db_path=db_path)
if not rows:
return Ok(0)
# Establish the prior hash anchor — either genesis (if walking from id=1)
# or the entry just before `start`.
first_id = int(rows[0]["id"])
if first_id <= 1:
prior_hash = GENESIS_PREV_HASH
else:
anchor = db.translog_get(first_id - 1, db_path=db_path)
if anchor is None:
return Err(f"missing anchor entry id={first_id - 1}")
prior_hash = str(anchor["entry_hash"])
verified = 0
for row in rows:
stored_prev = str(row["prev_hash"])
if stored_prev != prior_hash:
return Err(
f"broken at id={row['id']} expected_prev={prior_hash} got_prev={stored_prev}"
)
try:
data = json.loads(row.get("entry_data") or "{}")
except Exception:
return Err(f"broken at id={row['id']} entry_data not JSON")
if not isinstance(data, dict):
return Err(f"broken at id={row['id']} entry_data not an object")
recomputed = compute_entry_hash(
stored_prev, str(row["entry_type"]), data, str(row["timestamp"])
)
stored_hash = str(row["entry_hash"])
if recomputed != stored_hash:
return Err(
f"broken at id={row['id']} expected={recomputed} got={stored_hash}"
)
prior_hash = stored_hash
verified += 1
return Ok(verified)
def recent(limit: int = 100, db_path: Path = db.DB_PATH) -> List[LogEntry]:
"""The latest `limit` entries, newest first."""
return [_row_to_entry(r) for r in db.translog_recent(limit=limit, db_path=db_path)]
def entries_after(entry_id: int, db_path: Path = db.DB_PATH) -> List[LogEntry]:
"""All entries with id > entry_id, oldest first — for peer sync."""
return [_row_to_entry(r) for r in db.translog_after(entry_id, db_path=db_path)]

376
tests/test_discovery.py Normal file
View File

@@ -0,0 +1,376 @@
"""Discovery — DNS-SD parse + resolver, BFS walker, persistence, public endpoint."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from unittest.mock import MagicMock, patch
import dns.exception
import dns.resolver
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from starlette.middleware.sessions import SessionMiddleware
from psyc import db
from psyc.cockpit import federation_routes
from psyc.lines import discovery, federation, pulse
from psyc.lines.discovery import (
PeerCandidate,
_parse_txt_value,
fetch_peer_info,
fetch_public_peers,
public_peer_attestation,
record_candidate,
resolve_psyc,
walk,
)
from psyc.result import Err, Ok
# ---------- fixtures ---------------------------------------------------------
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
d = tmp_path / "federation"
monkeypatch.setattr(federation, "FED_DIR", d)
monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
yield d
def _mk_srv(port: int = 443) -> Any:
rd = MagicMock()
rd.port = port
return rd
def _mk_txt(value: str) -> Any:
rd = MagicMock()
rd.strings = [value.encode("utf-8")]
return rd
# ---------- TXT parser -------------------------------------------------------
def test_parse_txt_valid():
txt = "v=psyc1 fp=" + "a" * 32 + " alg=ed25519 path=/federation/feed"
res = _parse_txt_value(txt)
assert isinstance(res, Ok)
assert res.value["fp"] == "a" * 32
assert res.value["alg"] == "ed25519"
def test_parse_txt_tolerates_token_order():
txt = "path=/federation/feed alg=ed25519 fp=" + "f" * 32 + " v=psyc1"
res = _parse_txt_value(txt)
assert isinstance(res, Ok)
def test_parse_txt_rejects_wrong_version():
txt = "v=psyc2 fp=" + "a" * 32 + " alg=ed25519"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
assert "version" in res.reason
def test_parse_txt_rejects_bad_fingerprint_length():
txt = "v=psyc1 fp=deadbeef alg=ed25519"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
assert "fingerprint" in res.reason
def test_parse_txt_rejects_non_hex_fingerprint():
txt = "v=psyc1 fp=" + "z" * 32 + " alg=ed25519"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
def test_parse_txt_rejects_malformed_token():
txt = "v=psyc1 fp=" + "a" * 32 + " alg ed25519"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
assert "malformed" in res.reason
def test_parse_txt_rejects_wrong_alg():
txt = "v=psyc1 fp=" + "a" * 32 + " alg=rsa"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
# ---------- resolve_psyc -----------------------------------------------------
def test_resolve_psyc_happy_path():
fp = "1" * 32
txt = f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed"
def fake_resolve(self, name, rdtype):
if rdtype == "SRV":
return [_mk_srv(port=8443)]
if rdtype == "TXT":
return [_mk_txt(txt)]
raise dns.exception.DNSException("unexpected")
with patch.object(dns.resolver.Resolver, "resolve", fake_resolve):
res = resolve_psyc("peer.example.com")
assert isinstance(res, Ok)
cand = res.value
assert cand.domain == "peer.example.com"
assert cand.fingerprint == fp
assert cand.port == 8443
assert cand.source == "dns-sd"
def test_resolve_psyc_nxdomain_returns_err():
def fake_resolve(self, name, rdtype):
raise dns.resolver.NXDOMAIN()
with patch.object(dns.resolver.Resolver, "resolve", fake_resolve):
res = resolve_psyc("nothere.example")
assert isinstance(res, Err)
assert "NXDOMAIN" in res.reason
def test_resolve_psyc_txt_malformed_returns_err():
def fake_resolve(self, name, rdtype):
if rdtype == "SRV":
return [_mk_srv()]
return [_mk_txt("v=psyc1 fp=garbage alg=ed25519")]
with patch.object(dns.resolver.Resolver, "resolve", fake_resolve):
res = resolve_psyc("peer.example")
assert isinstance(res, Err)
assert "TXT" in res.reason or "fingerprint" in res.reason
def test_resolve_psyc_no_answer_returns_err():
def fake_resolve(self, name, rdtype):
raise dns.resolver.NoAnswer()
with patch.object(dns.resolver.Resolver, "resolve", fake_resolve):
res = resolve_psyc("peer.example")
assert isinstance(res, Err)
assert "NoAnswer" in res.reason
# ---------- walk -------------------------------------------------------------
def _stub_resolve(catalog: Dict[str, str]):
"""Build a resolve_psyc stub that returns each domain's catalog fingerprint."""
def _stub(domain: str, timeout: float = 5.0):
if domain not in catalog:
return Err(f"no record for {domain}")
return Ok(PeerCandidate(
domain=domain,
fingerprint=catalog[domain],
port=443,
source="dns-sd",
))
return _stub
def _stub_fetch_info_ok(*args, **kwargs):
return Ok({"fingerprint": kwargs.get("expected_fingerprint", "")})
def _stub_fetch_peers_factory(graph: Dict[str, List[Dict[str, str]]]):
def _stub(domain: str, port: int = 443, timeout: float = 5.0):
return Ok(graph.get(domain, []))
return _stub
def test_walk_dedupes_by_fingerprint(fresh_db, fed_dir, monkeypatch):
# Two seeds, same fingerprint via different domains → only one survives the (domain,fp) dedupe
# but distinct domains both surface; the (domain, fp) pair just shouldn't repeat.
fp = "9" * 32
catalog = {"a.example": fp, "b.example": fp}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({}))
out = walk(["a.example", "b.example", "a.example"], max_depth=1)
# both unique domains made it in; the duplicate seed didn't re-enter
assert len(out) == 2
domains = {c.domain for c in out}
assert domains == {"a.example", "b.example"}
def test_walk_respects_max_depth(fresh_db, fed_dir, monkeypatch):
catalog = {"d0.example": "0" * 32, "d1.example": "1" * 32, "d2.example": "2" * 32}
graph = {
"d0.example": [{"domain": "d1.example", "fingerprint": "1" * 32}],
"d1.example": [{"domain": "d2.example", "fingerprint": "2" * 32}],
}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory(graph))
out = walk(["d0.example"], max_depth=1)
domains = {c.domain for c in out}
# depth 0: d0; depth 1: d1; depth 2 (d2) is excluded by max_depth=1
assert "d0.example" in domains and "d1.example" in domains
assert "d2.example" not in domains
def test_walk_respects_max_peers(fresh_db, fed_dir, monkeypatch):
catalog = {f"d{i}.example": f"{i:032x}" for i in range(10)}
graph = {"d0.example": [{"domain": f"d{i}.example", "fingerprint": f"{i:032x}"} for i in range(1, 10)]}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory(graph))
out = walk(["d0.example"], max_depth=2, max_peers=3)
assert len(out) <= 3
def test_walk_skips_own_fingerprint(fresh_db, fed_dir, monkeypatch):
own = federation.node_fingerprint()
catalog = {"self.example": own, "peer.example": "f" * 32}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({}))
out = walk(["self.example", "peer.example"], max_depth=1)
domains = {c.domain for c in out}
assert "self.example" not in domains
assert "peer.example" in domains
def test_walk_one_failure_does_not_abort(fresh_db, fed_dir, monkeypatch):
catalog = {"good.example": "a" * 32} # bad.example is absent → Err on resolve
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({}))
out = walk(["bad.example", "good.example"], max_depth=1)
assert len(out) == 1
assert out[0].domain == "good.example"
# ---------- record_candidate -------------------------------------------------
def test_record_candidate_inserts_as_unknown(fresh_db):
c = PeerCandidate(domain="new.example", fingerprint="a" * 32, source="dns-sd")
record_candidate(c)
row = db.get_peer("new.example")
assert row is not None
assert row["status"] == "unknown"
assert row["fingerprint"] == "a" * 32
def test_record_candidate_preserves_trusted(fresh_db, fed_dir):
federation.register_peer("vip.example", "b" * 32, "PEM", status="trusted")
# walker re-discovers it
c = PeerCandidate(domain="vip.example", fingerprint="b" * 32, source="peer-walk:other.example")
record_candidate(c)
row = db.get_peer("vip.example")
assert row["status"] == "trusted"
def test_record_candidate_preserves_blocked(fresh_db, fed_dir):
federation.register_peer("bad.example", "c" * 32, "PEM", status="blocked")
c = PeerCandidate(domain="bad.example", fingerprint="c" * 32, source="dns-sd")
record_candidate(c)
row = db.get_peer("bad.example")
assert row["status"] == "blocked"
def test_record_candidate_updates_last_seen(fresh_db):
c = PeerCandidate(domain="repeat.example", fingerprint="d" * 32, source="dns-sd")
record_candidate(c)
first = db.get_peer("repeat.example")
# second pass — last_seen advances, discovered_at stays
c2 = PeerCandidate(domain="repeat.example", fingerprint="d" * 32, source="dns-sd")
record_candidate(c2)
second = db.get_peer("repeat.example")
assert second["discovered_at"] == first["discovered_at"]
# ---------- public attestation -----------------------------------------------
def test_public_peer_attestation_only_trusted(fresh_db, fed_dir):
federation.register_peer("trusted.example", "1" * 32, "PEM", status="trusted")
federation.register_peer("unknown.example", "2" * 32, "PEM", status="unknown")
federation.register_peer("blocked.example", "3" * 32, "PEM", status="blocked")
out = public_peer_attestation()
domains = {p["domain"] for p in out}
assert domains == {"trusted.example"}
def test_public_peer_attestation_payload_shape(fresh_db, fed_dir):
federation.register_peer("t.example", "f" * 32, "PEM", status="trusted")
out = public_peer_attestation()
assert len(out) == 1
entry = out[0]
assert set(entry.keys()) == {"domain", "fingerprint", "first_seen"}
# ---------- public endpoint via TestClient -----------------------------------
def _mk_app() -> FastAPI:
app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key="test-secret")
# Templates aren't exercised by the public endpoints we care about here.
from fastapi.templating import Jinja2Templates
import tempfile, os
tdir = tempfile.mkdtemp()
templates = Jinja2Templates(directory=tdir)
federation_routes.register(app, templates)
return app
def test_public_peers_endpoint_excludes_unknown_blocked(fresh_db, fed_dir):
federation.register_peer("ok.example", "a" * 32, "PEM", status="trusted")
federation.register_peer("rude.example", "b" * 32, "PEM", status="blocked")
federation.register_peer("new.example", "c" * 32, "PEM", status="unknown")
# Flush in-memory cache from any earlier test.
federation_routes._PUBLIC_PEERS_CACHE["payload"] = None
federation_routes._PUBLIC_PEERS_CACHE["ts"] = 0.0
app = _mk_app()
client = TestClient(app)
r = client.get("/federation/peers/public")
assert r.status_code == 200
body = r.json()
domains = {p["domain"] for p in body}
assert "ok.example" in domains
assert "rude.example" not in domains
assert "new.example" not in domains
# ---------- pulse integration ------------------------------------------------
def test_discovery_seeds_roundtrip(fresh_db):
assert pulse.get_discovery_seeds() == []
pulse.set_discovery_seeds(["a.example", "b.example", "a.example", "", " "])
# dedupe + strip blanks
assert pulse.get_discovery_seeds() == ["a.example", "b.example"]
def test_peer_pull_pipeline_no_seeds(fresh_db, fed_dir, monkeypatch):
# peer-pull runner returns a clean message when nothing's configured.
outcome, result = pulse.run_now("peer-pull")
assert outcome == "ok"
assert "no seeds" in result
def test_peer_pull_pipeline_with_seeds(fresh_db, fed_dir, monkeypatch):
pulse.set_discovery_seeds(["good.example"])
catalog = {"good.example": "e" * 32}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({}))
outcome, result = pulse.run_now("peer-pull")
assert outcome == "ok"
assert "discovered 1" in result
# And it was recorded.
row = db.get_peer("good.example")
assert row is not None
assert row["status"] == "unknown"

View File

@@ -159,6 +159,8 @@ def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir):
new_sig = peer_priv.sign(canonical_json(unsigned))
feed["signature"] = base64.b64encode(new_sig).decode("ascii")
# Stage 4 listening gate: peer must be trusted to land signals.
federation.register_peer("peer.example", peer_fp, peer_pub_pem, status="trusted")
result = import_signed_feed(feed, peer_pub_pem)
assert isinstance(result, Ok), getattr(result, "reason", "")
summary = result.value

314
tests/test_pulse_respond.py Normal file
View File

@@ -0,0 +1,314 @@
"""Pulseline auto-response gating — severity threshold, quorum, local-only.
The runner here is the live `_run_respond` from pulse.py. We point it at a
temp DB, monkeypatch federation.is_quorum_met to a controllable function, and
swap respond.execute_action for a counter so we don't reach the SOAR sink.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import List, Tuple
import pytest
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import pulse, respond
from psyc.lines import federation
from psyc.models import (
ActionStatus,
ActionType,
Case,
Classification,
Observables,
ResponseAction,
Severity,
TLP,
)
from psyc.result import Ok
from conftest import make_case
# ----- fixtures --------------------------------------------------------------
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
"""Temp SQLite + the real runner registry. Mode pinned to auto-execute."""
test_db = tmp_path / "respond.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
@pytest.fixture
def fired(monkeypatch):
"""Capture every execute_action(action_id, approver=...) — no SOAR sink call."""
log: List[Tuple[int, str]] = []
def fake_execute(action_id: int, approver: str = "operator"):
log.append((action_id, approver))
# Re-read the action so we can return a realistic Ok value
got = respond.get_action(action_id)
return got if isinstance(got, Ok) else got
monkeypatch.setattr(respond, "execute_action", fake_execute)
return log
@pytest.fixture
def quorum_yes(monkeypatch):
monkeypatch.setattr(federation, "is_quorum_met",
lambda h, k=None: True, raising=False)
@pytest.fixture
def quorum_no(monkeypatch):
monkeypatch.setattr(federation, "is_quorum_met",
lambda h, k=None: False, raising=False)
def _set_respond_mode(mode: pulse.PulseMode) -> None:
pulse.set_mode("respond", mode)
def _propose_one(case: Case) -> int:
db.upsert_case(case)
ids = respond.propose_for_case(case)
assert ids, "test setup expected at least one action proposed"
return ids[0]
# ----- severity rank ---------------------------------------------------------
def test_severity_rank_ordering():
assert pulse._severity_rank(Severity.LOW) == 0
assert pulse._severity_rank(Severity.MEDIUM) == 1
assert pulse._severity_rank(Severity.HIGH) == 2
assert pulse._severity_rank(Severity.CRITICAL) == 3
assert pulse._severity_rank(None) == -1
# ----- runner mode gating ----------------------------------------------------
def test_runner_no_auto_fire_when_mode_is_propose(fresh_db, fired, quorum_yes):
case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
db.upsert_case(case)
# default seed mode for respond is auto-propose → no auto-fire even with PROPOSED actions
result = pulse._run_respond()
assert "no auto-fire" in result
assert fired == []
def test_runner_no_auto_fire_when_manual(fresh_db, fired, quorum_yes):
case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
db.upsert_case(case)
_set_respond_mode(pulse.PulseMode.MANUAL)
result = pulse._run_respond()
assert "no auto-fire" in result
assert fired == []
# ----- severity threshold ----------------------------------------------------
def test_below_threshold_is_skipped(fresh_db, fired, quorum_yes):
# Propose an action carrying severity=MEDIUM by hand — propose_for_case
# only generates HIGH/CRITICAL actions, but the gate must still work for
# any below-threshold severity we drop in.
case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
db.upsert_case(case)
respond.propose_for_case(case)
# Demote every action's severity to MEDIUM so all should be skipped under HIGH threshold.
from sqlalchemy import update as sa_update
with db.engine().begin() as conn:
conn.execute(sa_update(db.response_actions).values(severity=Severity.MEDIUM.value))
pulse.set_respond_auto_threshold(Severity.HIGH)
_set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
pulse._run_respond()
assert fired == [], "below-threshold action must not fire"
audit = db.pulse_audit_recent("respond", limit=5)
assert any(r["action"] == "skipped" and "below threshold" in (r["detail"] or "") for r in audit)
# ----- quorum gate -----------------------------------------------------------
def test_federation_case_no_quorum_skipped(fresh_db, fired, quorum_no):
case = make_case(feed="urlhaus", ips=["9.9.9.9"], severity=Severity.HIGH)
db.upsert_case(case)
# Mark this case as federation-sourced by inserting a signal row for it.
db.record_signal(dict(
peer_fingerprint="peer-a",
signal_type="case",
signal_id=case.case_id,
signal_hash="dummyhash",
received_at=datetime.now(timezone.utc).isoformat(),
raw_json="{}",
))
respond.propose_for_case(case)
pulse.set_respond_require_quorum(True)
pulse.set_respond_local_only(False)
pulse.set_respond_auto_threshold(Severity.HIGH)
_set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
pulse._run_respond()
assert fired == []
audit = db.pulse_audit_recent("respond", limit=5)
assert any(r["action"] == "skipped" and "no quorum" in (r["detail"] or "") for r in audit)
def test_local_case_fires_when_quorum_required(fresh_db, fired, quorum_no):
"""Locally-generated cases bypass quorum — they're our own work."""
case = make_case(feed="urlhaus", ips=["9.9.9.9"], severity=Severity.HIGH)
db.upsert_case(case)
# No federation_signals row → locally-generated
respond.propose_for_case(case)
pulse.set_respond_require_quorum(True)
pulse.set_respond_local_only(True) # both armed; local cases still fire
pulse.set_respond_auto_threshold(Severity.HIGH)
_set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
pulse._run_respond()
assert len(fired) >= 1
audit = db.pulse_audit_recent("respond", limit=10)
assert any(r["action"] == "auto-fire" for r in audit)
def test_local_case_fires_local_only_off(fresh_db, fired, quorum_no):
"""Even with local_only OFF, a locally-generated case still fires (no quorum needed)."""
case = make_case(feed="urlhaus", ips=["1.1.1.1"], severity=Severity.CRITICAL)
db.upsert_case(case)
respond.propose_for_case(case)
pulse.set_respond_require_quorum(True)
pulse.set_respond_local_only(False)
pulse.set_respond_auto_threshold(Severity.HIGH)
_set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
pulse._run_respond()
assert len(fired) >= 1
def test_federation_case_with_quorum_fires(fresh_db, fired, quorum_yes):
case = make_case(feed="urlhaus", ips=["2.2.2.2"], severity=Severity.HIGH)
db.upsert_case(case)
db.record_signal(dict(
peer_fingerprint="peer-b",
signal_type="case",
signal_id=case.case_id,
signal_hash="dummyhash2",
received_at=datetime.now(timezone.utc).isoformat(),
raw_json="{}",
))
respond.propose_for_case(case)
pulse.set_respond_require_quorum(True)
pulse.set_respond_local_only(False)
pulse.set_respond_auto_threshold(Severity.HIGH)
_set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
pulse._run_respond()
assert len(fired) >= 1
def test_quorum_off_fires_federation_case(fresh_db, fired, quorum_no):
"""With quorum gating disabled entirely, federation cases fire too."""
case = make_case(feed="urlhaus", ips=["3.3.3.3"], severity=Severity.HIGH)
db.upsert_case(case)
db.record_signal(dict(
peer_fingerprint="peer-c",
signal_type="case",
signal_id=case.case_id,
signal_hash="dummyhash3",
received_at=datetime.now(timezone.utc).isoformat(),
raw_json="{}",
))
respond.propose_for_case(case)
pulse.set_respond_require_quorum(False)
pulse.set_respond_auto_threshold(Severity.HIGH)
_set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
pulse._run_respond()
assert len(fired) >= 1
# ----- kill switch -----------------------------------------------------------
def test_kill_switch_blocks_tick(fresh_db, fired, quorum_yes):
"""The parent tick() skips everything when kill switch is armed."""
case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
db.upsert_case(case)
respond.propose_for_case(case)
_set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
pulse.set_kill_switch(True)
results = pulse.tick()
assert all(o == "skipped" for _, o, _ in results)
assert fired == []
# ----- audit -----------------------------------------------------------------
def test_pulse_audit_records_fire_and_skip(fresh_db, fired, quorum_no):
# Local case → should fire and audit auto-fire
local = make_case(feed="urlhaus", ips=["10.0.0.1"], severity=Severity.HIGH, age_days=1)
db.upsert_case(local)
respond.propose_for_case(local)
# Federation-sourced case w/o quorum → should skip and audit skip
fedcase = make_case(feed="urlhaus", ips=["10.0.0.2"], severity=Severity.HIGH, age_days=2)
db.upsert_case(fedcase)
db.record_signal(dict(
peer_fingerprint="peer-x",
signal_type="case",
signal_id=fedcase.case_id,
signal_hash="xhash",
received_at=datetime.now(timezone.utc).isoformat(),
raw_json="{}",
))
respond.propose_for_case(fedcase)
pulse.set_respond_require_quorum(True)
pulse.set_respond_local_only(False)
pulse.set_respond_auto_threshold(Severity.HIGH)
_set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
pulse._run_respond()
audit = db.pulse_audit_recent("respond", limit=20)
actions = {r["action"] for r in audit}
assert "auto-fire" in actions
assert "skipped" in actions
def test_audit_count_since(fresh_db, fired, quorum_no):
case = make_case(feed="urlhaus", ips=["8.8.8.8"], severity=Severity.HIGH)
db.upsert_case(case)
respond.propose_for_case(case)
pulse.set_respond_require_quorum(True)
pulse.set_respond_auto_threshold(Severity.HIGH)
_set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
pulse._run_respond()
from datetime import timedelta
since = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
assert db.pulse_audit_count_since("respond", "auto-fire", since) >= 1
# ----- config round-trip -----------------------------------------------------
def test_config_round_trips(fresh_db):
assert pulse.respond_auto_threshold() == Severity.HIGH
assert pulse.respond_require_quorum() is True
assert pulse.respond_local_only() is False
pulse.set_respond_auto_threshold(Severity.CRITICAL)
pulse.set_respond_require_quorum(False)
pulse.set_respond_local_only(True)
assert pulse.respond_auto_threshold() == Severity.CRITICAL
assert pulse.respond_require_quorum() is False
assert pulse.respond_local_only() is True

118
tests/test_translog.py Normal file
View File

@@ -0,0 +1,118 @@
"""Transparency log — append, verify, tamper detection, sync slices."""
from __future__ import annotations
import json
import pytest
from sqlalchemy import create_engine, update
from psyc import db
from psyc.lines import translog
from psyc.lines.translog import GENESIS_PREV_HASH
from psyc.result import Err, Ok
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
def test_first_append_uses_genesis_prev_hash(fresh_db):
e = translog.append("signal", {"x": 1})
assert e.prev_hash == GENESIS_PREV_HASH
assert e.id >= 1
assert e.entry_type == "signal"
assert e.entry_data == {"x": 1}
# head matches
h = translog.head()
assert h is not None
assert h.id == e.id
assert h.entry_hash == e.entry_hash
def test_append_chains_prev_hash(fresh_db):
e1 = translog.append("signal", {"a": 1})
e2 = translog.append("signal", {"b": 2})
e3 = translog.append("vouch", {"c": 3})
assert e2.prev_hash == e1.entry_hash
assert e3.prev_hash == e2.entry_hash
head = translog.head()
assert head is not None
assert head.entry_hash == e3.entry_hash
def test_verify_chain_ok_round_trip(fresh_db):
translog.append("signal", {"a": 1})
translog.append("signal", {"b": 2})
translog.append("vouch", {"c": 3})
result = translog.verify_chain()
assert isinstance(result, Ok)
assert result.value == 3
def test_verify_chain_empty_returns_zero(fresh_db):
result = translog.verify_chain()
assert isinstance(result, Ok)
assert result.value == 0
def test_verify_chain_detects_tampered_data(fresh_db):
e1 = translog.append("signal", {"a": 1})
e2 = translog.append("signal", {"b": 2})
# Mutate entry_data of the first row directly in the DB; entry_hash stays
# the same but no longer matches the recomputed hash.
with db.engine().begin() as conn:
conn.execute(
update(db.translog)
.where(db.translog.c.id == e1.id)
.values(entry_data=json.dumps({"a": 999}, sort_keys=True))
)
result = translog.verify_chain()
assert isinstance(result, Err)
assert "broken at id=" in result.reason
def test_verify_chain_detects_tampered_prev_hash(fresh_db):
translog.append("signal", {"a": 1})
e2 = translog.append("signal", {"b": 2})
# Flip e2.prev_hash so it no longer matches e1.entry_hash.
with db.engine().begin() as conn:
conn.execute(
update(db.translog)
.where(db.translog.c.id == e2.id)
.values(prev_hash="f" * 64)
)
result = translog.verify_chain()
assert isinstance(result, Err)
assert "broken at id=" in result.reason
def test_entries_after_returns_correct_slice(fresh_db):
e1 = translog.append("signal", {"a": 1})
e2 = translog.append("signal", {"b": 2})
e3 = translog.append("signal", {"c": 3})
after_zero = translog.entries_after(0)
assert [e.id for e in after_zero] == [e1.id, e2.id, e3.id]
after_e1 = translog.entries_after(e1.id)
assert [e.id for e in after_e1] == [e2.id, e3.id]
after_e3 = translog.entries_after(e3.id)
assert after_e3 == []
def test_recent_newest_first(fresh_db):
e1 = translog.append("signal", {"a": 1})
e2 = translog.append("signal", {"b": 2})
e3 = translog.append("signal", {"c": 3})
recent = translog.recent(limit=10)
assert [e.id for e in recent] == [e3.id, e2.id, e1.id]

336
tests/test_vouching.py Normal file
View File

@@ -0,0 +1,336 @@
"""Vouching + quorum — sign/verify, threshold logic, import gate."""
from __future__ import annotations
import base64
import hashlib
from datetime import datetime, timedelta, timezone
import pytest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import federation
from psyc.lines.federation import (
QuorumConfig,
Vouch,
accept_vouch,
build_signed_feed,
canonical_json,
import_signed_feed,
is_quorum_met,
is_vouched,
issue_vouch,
node_fingerprint,
our_vouches,
peer_is_listening_eligible,
public_key_pem,
quorum_config,
register_peer,
revoke_vouch,
set_quorum_config,
vouch_payload_bytes,
)
from psyc.result import Err, Ok
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
d = tmp_path / "federation"
monkeypatch.setattr(federation, "FED_DIR", d)
monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
yield d
def _make_peer():
"""Generate an Ed25519 keypair + matching fingerprint for a fake peer."""
priv = ed25519.Ed25519PrivateKey.generate()
pub = priv.public_key()
pub_pem = pub.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
raw = pub.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
fp = hashlib.sha256(raw).digest()[:16].hex()
return priv, pub_pem, fp
def _sign_vouch(priv, voucher_fp, target_fp, issued_at, expires_at):
payload = vouch_payload_bytes(voucher_fp, target_fp, issued_at, expires_at)
sig = priv.sign(payload)
return base64.b64encode(sig).decode("ascii")
# ---------- self-issued vouch round-trip --------------------------------
def test_issue_vouch_roundtrip(fresh_db, fed_dir):
target = "ab" * 16
v = issue_vouch(target, ttl_days=30)
assert v.voucher_fingerprint == node_fingerprint()
assert v.target_fingerprint == target
assert v.expires_at is not None
# round-trip from storage
listed = our_vouches()
assert len(listed) == 1
assert listed[0].target_fingerprint == target
assert listed[0].signature == v.signature
# signature verifies under our own pubkey
payload = vouch_payload_bytes(
v.voucher_fingerprint, v.target_fingerprint, v.issued_at, v.expires_at
)
sig = base64.b64decode(v.signature)
assert federation.verify_payload(payload, sig, public_key_pem())
def test_revoke_vouch_removes_only_our_entry(fresh_db, fed_dir):
target = "cd" * 16
issue_vouch(target, ttl_days=30)
assert len(our_vouches()) == 1
revoke_vouch(target)
assert our_vouches() == []
# ---------- accept_vouch validation -------------------------------------
def test_accept_vouch_rejects_expired(fresh_db, fed_dir):
priv, pem, fp = _make_peer()
register_peer("voucher.example", fp, pem, status="trusted")
issued = datetime.now(timezone.utc) - timedelta(days=10)
expired = datetime.now(timezone.utc) - timedelta(days=1)
sig = _sign_vouch(priv, fp, "target", issued, expired)
v = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
issued_at=issued, expires_at=expired, signature=sig)
result = accept_vouch(v, pem)
assert isinstance(result, Err)
assert "expired" in result.reason
def test_accept_vouch_rejects_bad_signature(fresh_db, fed_dir):
priv, pem, fp = _make_peer()
register_peer("voucher.example", fp, pem, status="trusted")
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
# Sign a different target then claim it's for "real-target".
real_sig = _sign_vouch(priv, fp, "other-target", issued, expires)
v = Vouch(voucher_fingerprint=fp, target_fingerprint="real-target",
issued_at=issued, expires_at=expires, signature=real_sig)
result = accept_vouch(v, pem)
assert isinstance(result, Err)
assert "signature" in result.reason
def test_accept_vouch_rejects_voucher_not_trusted(fresh_db, fed_dir):
priv, pem, fp = _make_peer()
# Voucher exists but is "unknown" not "trusted".
register_peer("voucher.example", fp, pem, status="unknown")
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
sig = _sign_vouch(priv, fp, "target", issued, expires)
v = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
issued_at=issued, expires_at=expires, signature=sig)
result = accept_vouch(v, pem)
assert isinstance(result, Err)
assert "not trusted" in result.reason
def test_accept_vouch_ok_for_trusted_voucher(fresh_db, fed_dir):
priv, pem, fp = _make_peer()
register_peer("voucher.example", fp, pem, status="trusted")
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
sig = _sign_vouch(priv, fp, "target", issued, expires)
v = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
issued_at=issued, expires_at=expires, signature=sig)
result = accept_vouch(v, pem)
assert isinstance(result, Ok)
# ---------- is_vouched threshold ----------------------------------------
def test_is_vouched_needs_distinct_vouchers(fresh_db, fed_dir):
"""Two vouches from the same peer must NOT clear a threshold of 2."""
priv, pem, fp = _make_peer()
register_peer("voucher.example", fp, pem, status="trusted")
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
sig = _sign_vouch(priv, fp, "target", issued, expires)
v1 = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
issued_at=issued, expires_at=expires, signature=sig)
assert isinstance(accept_vouch(v1, pem), Ok)
# Newer vouch from the SAME voucher — upsert replaces, count stays 1.
issued2 = issued + timedelta(seconds=1)
sig2 = _sign_vouch(priv, fp, "target", issued2, expires)
v2 = Vouch(voucher_fingerprint=fp, target_fingerprint="target",
issued_at=issued2, expires_at=expires, signature=sig2)
assert isinstance(accept_vouch(v2, pem), Ok)
assert is_vouched("target", min_vouchers=2) is False
# Threshold of 1 should pass.
assert is_vouched("target", min_vouchers=1) is True
def test_is_vouched_two_distinct_clear_threshold(fresh_db, fed_dir):
priv_a, pem_a, fp_a = _make_peer()
priv_b, pem_b, fp_b = _make_peer()
register_peer("a.example", fp_a, pem_a, status="trusted")
register_peer("b.example", fp_b, pem_b, status="trusted")
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
va = Vouch(voucher_fingerprint=fp_a, target_fingerprint="target",
issued_at=issued, expires_at=expires,
signature=_sign_vouch(priv_a, fp_a, "target", issued, expires))
vb = Vouch(voucher_fingerprint=fp_b, target_fingerprint="target",
issued_at=issued, expires_at=expires,
signature=_sign_vouch(priv_b, fp_b, "target", issued, expires))
assert isinstance(accept_vouch(va, pem_a), Ok)
assert isinstance(accept_vouch(vb, pem_b), Ok)
assert is_vouched("target", min_vouchers=2) is True
assert is_vouched("target", min_vouchers=3) is False
# ---------- quorum on signal_hash ---------------------------------------
def test_is_quorum_met_counts_distinct_vouched_peers_only(fresh_db, fed_dir):
# Two trusted peers + one untrusted peer report the same signal_hash.
_, pem_a, fp_a = _make_peer()
_, pem_b, fp_b = _make_peer()
_, pem_c, fp_c = _make_peer()
register_peer("a.example", fp_a, pem_a, status="trusted")
register_peer("b.example", fp_b, pem_b, status="trusted")
register_peer("c.example", fp_c, pem_c, status="unknown") # not eligible
for fp in (fp_a, fp_b, fp_c, fp_a): # fp_a duplicated → still 1 distinct
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="ioc",
signal_id="1.2.3.4",
signal_hash="h-aaa",
received_at=datetime.now(timezone.utc).isoformat(),
raw_json="{}",
))
assert is_quorum_met("h-aaa", k=2) is True
assert is_quorum_met("h-aaa", k=3) is False # only 2 eligible distincts
# ---------- quorum config persistence -----------------------------------
def test_quorum_config_defaults_and_persistence(fresh_db, fed_dir):
cfg = quorum_config()
assert cfg.trust_min_vouchers == 2
assert cfg.signal_quorum_k == 2
set_quorum_config(QuorumConfig(trust_min_vouchers=3, signal_quorum_k=4))
cfg2 = quorum_config()
assert cfg2.trust_min_vouchers == 3
assert cfg2.signal_quorum_k == 4
# ---------- import gate enforces listening eligibility ------------------
def _signed_feed_from_peer(peer_priv, peer_fp, vouches=None):
"""Build a feed claiming origin=peer_fp, signed with peer_priv."""
payload = {
"version": federation.FEED_VERSION,
"fingerprint": peer_fp,
"generated_at": datetime.now(timezone.utc).isoformat(),
"window_hours": 24,
"cases": [],
"iocs": [{
"value": "9.9.9.9",
"type": "ip",
"severity": "high",
"first_seen": datetime.now(timezone.utc).isoformat(),
"digest_sha256": "abc123",
}],
"vouches": vouches or [],
}
sig = peer_priv.sign(canonical_json(payload))
payload["signature"] = base64.b64encode(sig).decode("ascii")
return payload
def test_import_feed_rejects_unknown_peer(fresh_db, fed_dir):
peer_priv, peer_pem, peer_fp = _make_peer()
feed = _signed_feed_from_peer(peer_priv, peer_fp)
result = import_signed_feed(feed, peer_pem)
assert isinstance(result, Err)
assert "not trusted" in result.reason
def test_import_feed_accepts_directly_trusted_peer(fresh_db, fed_dir):
peer_priv, peer_pem, peer_fp = _make_peer()
register_peer("peer.example", peer_fp, peer_pem, status="trusted")
feed = _signed_feed_from_peer(peer_priv, peer_fp)
result = import_signed_feed(feed, peer_pem)
assert isinstance(result, Ok), getattr(result, "reason", "")
def test_import_feed_accepts_vouched_peer(fresh_db, fed_dir):
# Two trusted peers vouch for a third — third becomes listening-eligible.
priv_a, pem_a, fp_a = _make_peer()
priv_b, pem_b, fp_b = _make_peer()
priv_c, pem_c, fp_c = _make_peer()
register_peer("a.example", fp_a, pem_a, status="trusted")
register_peer("b.example", fp_b, pem_b, status="trusted")
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
va = Vouch(voucher_fingerprint=fp_a, target_fingerprint=fp_c,
issued_at=issued, expires_at=expires,
signature=_sign_vouch(priv_a, fp_a, fp_c, issued, expires))
vb = Vouch(voucher_fingerprint=fp_b, target_fingerprint=fp_c,
issued_at=issued, expires_at=expires,
signature=_sign_vouch(priv_b, fp_b, fp_c, issued, expires))
assert isinstance(accept_vouch(va, pem_a), Ok)
assert isinstance(accept_vouch(vb, pem_b), Ok)
assert peer_is_listening_eligible(fp_c) is True
feed = _signed_feed_from_peer(priv_c, fp_c)
result = import_signed_feed(feed, pem_c)
assert isinstance(result, Ok), getattr(result, "reason", "")
def test_import_feed_propagates_vouches_in_payload(fresh_db, fed_dir):
"""A trusted peer's feed carries a vouch the peer issued — we should
accept_vouch it and store it locally."""
peer_priv, peer_pem, peer_fp = _make_peer()
register_peer("peer.example", peer_fp, peer_pem, status="trusted")
target_fp = "ff" * 16
issued = datetime.now(timezone.utc)
expires = issued + timedelta(days=30)
peer_vouch = Vouch(
voucher_fingerprint=peer_fp,
target_fingerprint=target_fp,
issued_at=issued,
expires_at=expires,
signature=_sign_vouch(peer_priv, peer_fp, target_fp, issued, expires),
)
feed = _signed_feed_from_peer(peer_priv, peer_fp, vouches=[peer_vouch.model_dump(mode="json")])
result = import_signed_feed(feed, peer_pem)
assert isinstance(result, Ok), getattr(result, "reason", "")
# The vouch is now in our local store under the peer's fingerprint.
stored = federation.vouches_for(target_fp)
assert any(v.voucher_fingerprint == peer_fp for v in stored)