merge federation: ed25519 identity + signed feeds

# Conflicts:
#	src/psyc/db.py
This commit is contained in:
m17hr1l
2026-06-06 16:13:36 +02:00
7 changed files with 1113 additions and 0 deletions

View File

@@ -16,6 +16,7 @@ dependencies = [
"httpx>=0.27", "httpx>=0.27",
"typer>=0.12", "typer>=0.12",
"pynacl>=1.5", "pynacl>=1.5",
"cryptography>=42.0",
"structlog>=24.1", "structlog>=24.1",
"sqlalchemy>=2.0", "sqlalchemy>=2.0",
"python-dotenv>=1.0", "python-dotenv>=1.0",

135
src/psyc/_federation_cli.py Normal file
View File

@@ -0,0 +1,135 @@
"""Federation CLI — keygen, DNS records, feed export, peer registry, verify.
Registered onto the top-level Typer app from cli.py so the surface stays flat.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Optional
import httpx
import typer
from psyc import db, log
from psyc.lines import federation
from psyc.result import Err
_log = log.get(__name__)
def register(typer_app: typer.Typer) -> None:
"""Mount all `fed-*` commands onto `typer_app`."""
@typer_app.command("fed-keygen")
def fed_keygen() -> None:
"""Generate the node's Ed25519 keypair (or load existing). Prints fingerprint."""
federation.node_keypair() # creates the files if missing
typer.echo(federation.node_fingerprint())
@typer_app.command("fed-dns")
def fed_dns(
domain: str = typer.Argument(..., help="public domain to advertise this node on"),
port: int = typer.Option(443, "--port", help="port psyc is reachable on"),
) -> None:
"""Print the DNS SRV + TXT records to publish under `domain`."""
rec = federation.dns_record(domain, port=port)
typer.echo(rec.human_instructions)
@typer_app.command("fed-feed")
def fed_feed(
window_hours: int = typer.Option(24, "--hours", help="lookback window (hours)"),
) -> None:
"""Build + print the signed feed JSON."""
db.init_db()
payload = federation.build_signed_feed(window_hours=window_hours)
typer.echo(json.dumps(payload, indent=2))
@typer_app.command("fed-verify")
def fed_verify(
peer_url: str = typer.Argument(..., help="peer base URL, e.g. https://peer.example"),
) -> None:
"""Fetch a peer's /federation/{info,key,feed} and verify the signature."""
peer_url = peer_url.rstrip("/")
try:
with httpx.Client(timeout=10.0) as client:
info = client.get(f"{peer_url}/federation/info").json()
key_text = client.get(f"{peer_url}/federation/key").text
feed = client.get(f"{peer_url}/federation/feed").json()
except Exception as exc:
typer.echo(f"error: fetch failed: {exc}", err=True)
raise typer.Exit(1)
# If the peer is already in the registry, prefer the stored pubkey
# (TOFU pin); otherwise warn and use the freshly fetched one.
declared_fp = info.get("fingerprint", "")
pubkey_pem = key_text
pinned = None
for p in federation.list_peers():
if p.fingerprint == declared_fp:
pinned = p
break
if pinned:
pubkey_pem = pinned.pubkey_pem
typer.echo(f" · using pinned pubkey for {pinned.domain}")
else:
typer.echo(" · WARNING: no pinned pubkey for this peer — trusting fetched key (TOFU)")
db.init_db()
result = federation.import_signed_feed(feed, pubkey_pem)
if isinstance(result, Err):
typer.echo(f" ✗ verification failed: {result.reason}", err=True)
raise typer.Exit(1)
s = result.value
typer.echo(f" ✓ verified peer {s.peer_fingerprint}")
typer.echo(f" cases: {s.cases_seen} iocs: {s.iocs_seen} signals buffered: {len(s.signal_ids)}")
@typer_app.command("fed-peer-add")
def fed_peer_add(
domain: str = typer.Argument(..., help="peer's public domain"),
fingerprint: str = typer.Argument(..., help="peer's 32-hex fingerprint"),
pubkey_file: Path = typer.Option(..., "--pubkey-file", help="path to peer's PEM public key"),
status: str = typer.Option("unknown", "--status", help="unknown | trusted | blocked"),
) -> None:
"""Register a peer's identity in the local registry."""
db.init_db()
pem = pubkey_file.read_text(encoding="utf-8")
federation.register_peer(domain, fingerprint, pem, status=status)
typer.echo(f"registered peer {domain} ({fingerprint[:8]}…) status={status}")
@typer_app.command("fed-peer-list")
def fed_peer_list() -> None:
"""List all registered peers."""
db.init_db()
rows = federation.list_peers()
if not rows:
typer.echo("(no peers registered)")
return
for p in rows:
typer.echo(
f" {p.status:8s} {p.domain:30s} {p.fingerprint[:8]}{p.fingerprint[-8:]}"
f" last_seen={(p.last_seen or '')[:16]}"
)
@typer_app.command("fed-peer-trust")
def fed_peer_trust(domain: str = typer.Argument(...)) -> None:
"""Mark a peer as trusted — their signals count toward quorum."""
db.init_db()
federation.set_peer_status(domain, "trusted")
typer.echo(f"{domain} → trusted")
@typer_app.command("fed-peer-block")
def fed_peer_block(domain: str = typer.Argument(...)) -> None:
"""Block a peer — ignore their feeds."""
db.init_db()
federation.set_peer_status(domain, "blocked")
typer.echo(f"{domain} → blocked")
@typer_app.command("fed-peer-remove")
def fed_peer_remove(domain: str = typer.Argument(...)) -> None:
"""Drop a peer from the registry."""
db.init_db()
federation.remove_peer(domain)
typer.echo(f"removed {domain}")

View File

@@ -0,0 +1,125 @@
"""Federation cockpit routes — admin page, public feed/key/info endpoints.
Wired into the FastAPI app by app.py via a single `register(app, TEMPLATES)`
call so the federation surface stays self-contained.
"""
from __future__ import annotations
import json
import time
from typing import Any, Dict, Optional, Tuple
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from psyc import db, log
from psyc.lines import federation
_log = log.get(__name__)
# Tiny in-memory cache for the signed feed — peers may poll, recomputing
# canonical JSON + signature on every hit would be wasteful.
_FEED_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
_FEED_TTL = 60.0
def _admin_ok(request: Request) -> bool:
return bool(request.session.get("admin_ok"))
def _cached_feed() -> Dict[str, Any]:
now = time.time()
if _FEED_CACHE["payload"] is None or (now - _FEED_CACHE["ts"]) > _FEED_TTL:
_FEED_CACHE["payload"] = federation.build_signed_feed()
_FEED_CACHE["ts"] = now
return _FEED_CACHE["payload"]
def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
"""Mount all federation routes onto `app`."""
@app.get("/admin/federation", response_class=HTMLResponse)
def admin_federation(request: Request) -> HTMLResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
host = request.url.hostname or "your-node.example"
suggested = request.query_params.get("domain", host)
rec = federation.dns_record(suggested)
peers = federation.list_peers()
signals = db.recent_signals(limit=20)
return TEMPLATES.TemplateResponse(
request,
"admin_federation.html",
{
"fingerprint": federation.node_fingerprint(),
"pubkey_pem": federation.public_key_pem(),
"suggested_domain": suggested,
"dns": rec,
"peers": peers,
"signals": signals,
},
)
@app.post("/admin/federation/peers/add")
def admin_federation_add_peer(
request: Request,
domain: str = Form(...),
fingerprint: str = Form(...),
pubkey_pem: str = Form(...),
status: str = Form("unknown"),
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
try:
federation.register_peer(domain.strip(), fingerprint.strip(), pubkey_pem.strip(), status=status)
except Exception as exc:
_log.warning("federation.peer.add.error", domain=domain, error=str(exc))
return RedirectResponse("/admin/federation", status_code=303)
@app.post("/admin/federation/peers/{domain}/status")
def admin_federation_set_status(
request: Request,
domain: str,
status: str = Form(...),
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
try:
federation.set_peer_status(domain, status)
except ValueError as exc:
_log.warning("federation.peer.status.bad", domain=domain, status=status, error=str(exc))
return RedirectResponse("/admin/federation", status_code=303)
@app.post("/admin/federation/peers/{domain}/remove")
def admin_federation_remove(
request: Request,
domain: str,
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
federation.remove_peer(domain)
return RedirectResponse("/admin/federation", status_code=303)
# ---------- public endpoints --------------------------------------
@app.get("/federation/info")
def federation_info() -> JSONResponse:
return JSONResponse({
"fingerprint": federation.node_fingerprint(),
"version": federation.FEED_VERSION,
"feed": federation.FEED_PATH,
"key": "/federation/key",
})
@app.get("/federation/key", response_class=PlainTextResponse)
def federation_key() -> PlainTextResponse:
return PlainTextResponse(federation.public_key_pem(), media_type="text/plain")
@app.get("/federation/feed")
def federation_feed() -> JSONResponse:
return JSONResponse(_cached_feed())
_log.info("federation.routes.registered")

View File

@@ -0,0 +1,132 @@
{% extends "base.html" %}
{% block title %}Federation — psyc admin{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Federation Identity</h1>
<span class="count">{{ peers|length }} peer{{ '' if peers|length == 1 else 's' }}</span>
</div>
<p class="page-intro">This node's Ed25519 identity. The fingerprint goes into a DNS TXT record so other psyc nodes can discover this one. The public key lets them verify any feed we publish — the private key never leaves this box.</p>
<p class="back"><a href="/admin">← back to admin</a></p>
<div class="card" style="margin-bottom:14px;">
<div class="lg-sub">node fingerprint</div>
<div style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:18px; word-break:break-all; margin:6px 0 12px; color:var(--accent); text-shadow:0 0 12px var(--accent-glow);">{{ fingerprint }}</div>
<details>
<summary class="lg-sub" style="cursor:pointer;">public key (PEM)</summary>
<pre style="background:var(--panel-2); border:1px solid var(--border); border-radius:6px; padding:10px; margin-top:8px; overflow-x:auto; font-size:11.5px;">{{ pubkey_pem }}</pre>
</details>
</div>
</section>
<section class="panel">
<div class="panel-head">
<h2>Publish via DNS</h2>
<span class="count">SRV + TXT records</span>
</div>
<p class="page-intro">Paste these into your zone file. Once they're live, any peer that knows your domain can discover the node and pin the right key without out-of-band coordination.</p>
<form method="get" action="/admin/federation" class="lookup-form" style="margin-bottom:12px;">
<input type="text" name="domain" value="{{ suggested_domain }}" class="lookup-input" placeholder="domain to publish on (e.g. psyc.example.com)">
<button type="submit" class="btn btn-enforce">regenerate</button>
</form>
<pre style="background:var(--panel-2); border:1px solid var(--border); border-radius:6px; padding:12px; overflow-x:auto; font-size:12px; line-height:1.5;">{{ dns.human_instructions }}</pre>
</section>
<section class="panel">
<div class="panel-head">
<h2>Known Peers</h2>
<span class="count">{{ peers|length }} registered</span>
</div>
<p class="page-intro">Trusted peers' feeds are signature-verified on every poll. Blocked peers are recorded but ignored. Unknown peers are kept for review — nothing flows from them until you set them trusted.</p>
{% if peers %}
<table class="ledger">
<thead><tr><th>Domain</th><th>Fingerprint</th><th>Status</th><th>Discovered</th><th>Last seen</th><th></th></tr></thead>
<tbody>
{% for p in peers %}
<tr class="ledger-row">
<td><strong>{{ p.domain }}</strong></td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">{{ p.fingerprint[:8] }}…{{ p.fingerprint[-8:] }}</td>
<td>
{% if p.status == 'trusted' %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">trusted</span>
{% elif p.status == 'blocked' %}
<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">blocked</span>
{% else %}
<span class="sev-badge">unknown</span>
{% endif %}
</td>
<td class="lg-ts">{{ (p.discovered_at or '')[:16] | replace('T', ' ') }}</td>
<td class="lg-ts">{{ ((p.last_seen or '')[:16] | replace('T', ' ')) or '—' }}</td>
<td>
<form method="post" action="/admin/federation/peers/{{ p.domain }}/status" class="queue-action">
<input type="hidden" name="status" value="trusted">
<button type="submit" class="btn btn-enforce" {% if p.status == 'trusted' %}disabled{% endif %}>trust</button>
</form>
<form method="post" action="/admin/federation/peers/{{ p.domain }}/status" class="queue-action">
<input type="hidden" name="status" value="blocked">
<button type="submit" class="btn btn-reject" {% if p.status == 'blocked' %}disabled{% endif %}>block</button>
</form>
<form method="post" action="/admin/federation/peers/{{ p.domain }}/remove" class="queue-action"
onsubmit="return confirm('Remove {{ p.domain }}? Their signals will no longer count toward quorum.');">
<button type="submit" class="btn">remove</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no peers yet — add one below)</p>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Add Peer</h2>
</div>
<p class="page-intro">Pin a peer's identity manually: their domain, their fingerprint (from their DNS TXT record), and the public key they publish at <code>/federation/key</code>.</p>
<form method="post" action="/admin/federation/peers/add" style="display:grid; gap:10px; max-width:680px;">
<input type="text" name="domain" placeholder="peer domain (e.g. peer.example.com)" class="lookup-input" required>
<input type="text" name="fingerprint" placeholder="fingerprint (32 hex chars)" class="lookup-input" maxlength="64" required style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">
<textarea name="pubkey_pem" placeholder="-----BEGIN PUBLIC KEY-----&#10;…&#10;-----END PUBLIC KEY-----" rows="6" class="lookup-input" required style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;"></textarea>
<select name="status" class="lookup-input">
<option value="unknown">unknown — record only, don't trust yet</option>
<option value="trusted">trusted — count toward quorum</option>
<option value="blocked">blocked — ignore</option>
</select>
<button type="submit" class="btn btn-enforce">+ register peer</button>
</form>
</section>
<section class="panel">
<div class="panel-head">
<h2>Recent Signals</h2>
<span class="count">last {{ signals|length }} of buffer</span>
</div>
<p class="page-intro">Verified federation signals from peers — case + IOC reports awaiting quorum. The signal buffer is what later quorum logic will count over.</p>
{% if signals %}
<table class="ledger">
<thead><tr><th>Received</th><th>Peer</th><th>Type</th><th>Id</th><th>Hash</th></tr></thead>
<tbody>
{% for s in signals %}
<tr class="ledger-row">
<td class="lg-ts">{{ (s.received_at or '')[:19] | replace('T', ' ') }}</td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">{{ s.peer_fingerprint[:8] }}…</td>
<td><span class="sev-badge">{{ s.signal_type }}</span></td>
<td style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ s.signal_id[:48] }}</td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ s.signal_hash[:16] }}…</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no signals received yet — quorum stage will populate this)</p>
{% endif %}
</section>
{% endblock %}

View File

@@ -134,6 +134,33 @@ pulse_settings = Table(
Column("value", String, nullable=False), Column("value", String, nullable=False),
) )
peers = Table(
"peers", _metadata,
Column("domain", String, primary_key=True),
Column("fingerprint", String, nullable=False),
Column("pubkey_pem", Text, nullable=False),
Column("status", String, nullable=False), # unknown | trusted | blocked
Column("discovered_at", String, nullable=False),
Column("last_seen", String, nullable=True),
Column("notes", Text, nullable=True),
)
Index("peers_fp_idx", peers.c.fingerprint)
Index("peers_status_idx", peers.c.status)
federation_signals = Table(
"federation_signals", _metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("peer_fingerprint", String, nullable=False),
Column("signal_type", String, nullable=False), # case | ioc
Column("signal_id", String, nullable=False), # case_id or ioc value
Column("signal_hash", String, nullable=False), # sha256 of canonical record
Column("received_at", String, nullable=False),
Column("raw_json", Text, nullable=False),
)
Index("federation_signals_hash_idx", federation_signals.c.signal_hash)
Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint)
Index("federation_signals_received_idx", federation_signals.c.received_at.desc())
_log = log.get(__name__) _log = log.get(__name__)
_engine: Optional[Engine] = None _engine: Optional[Engine] = None
@@ -284,3 +311,61 @@ def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None:
) )
with engine(db_path).begin() as conn: with engine(db_path).begin() as conn:
conn.execute(stmt) conn.execute(stmt)
# ---------- federation: peers + signal buffer ----------------------------
def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None:
"""Insert-or-update a peer by domain. `row` must include `domain`."""
stmt = sqlite_insert(peers).values(**row)
update_cols = {k: stmt.excluded[k] for k in row if k != "domain"}
stmt = stmt.on_conflict_do_update(index_elements=[peers.c.domain], set_=update_cols)
with engine(db_path).begin() as conn:
conn.execute(stmt)
def list_peers(db_path: Path = DB_PATH) -> List[dict]:
stmt = select(peers).order_by(peers.c.discovered_at.desc())
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def get_peer(domain: str, db_path: Path = DB_PATH) -> Optional[dict]:
stmt = select(peers).where(peers.c.domain == domain)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return dict(row._mapping) if row else None
def set_peer_status(domain: str, status: str, db_path: Path = DB_PATH) -> None:
from sqlalchemy import update as sa_update
stmt = sa_update(peers).where(peers.c.domain == domain).values(status=status)
with engine(db_path).begin() as conn:
conn.execute(stmt)
def remove_peer(domain: str, db_path: Path = DB_PATH) -> None:
stmt = peers.delete().where(peers.c.domain == domain)
with engine(db_path).begin() as conn:
conn.execute(stmt)
def record_signal(row: dict, db_path: Path = DB_PATH) -> int:
"""Append one federation signal. Returns the inserted row id."""
stmt = insert(federation_signals).values(**row)
with engine(db_path).begin() as conn:
res = conn.execute(stmt)
return int(res.inserted_primary_key[0])
def signals_for_hash(signal_hash: str, db_path: Path = DB_PATH) -> List[dict]:
"""All recorded signals matching `signal_hash` — quorum-lookup primitive."""
stmt = select(federation_signals).where(federation_signals.c.signal_hash == signal_hash)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]

View File

@@ -0,0 +1,405 @@
"""Federation — node identity, signed feeds, peer registry.
Identity layer for internet-wide federation of psyc nodes. Each node owns
an Ed25519 keypair persisted under DATA_DIR/federation/. The public key
fingerprint (first 16 bytes of SHA256(raw_pubkey) hex-encoded) goes into a
DNS TXT record so peers can discover and authenticate the node, and the
private key signs the outbound feed at /federation/feed.
This module is the *identity* primitives only — discovery walkers,
vouching/quorum, transparency log and auto-pull live in later stages.
"""
from __future__ import annotations
import base64
import hashlib
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
from pydantic import BaseModel, Field
from psyc import DATA_DIR, db, log
from psyc.result import Err, Ok, Result
_log = log.get(__name__)
FED_DIR = DATA_DIR / "federation"
PRIVATE_KEY_PATH = FED_DIR / "node.key"
PUBLIC_KEY_PATH = FED_DIR / "node.pub"
FEED_VERSION = "psyc1"
FEED_ALG = "ed25519"
FEED_PATH = "/federation/feed"
# ---------- keypair persistence -----------------------------------------
def _ensure_dir() -> None:
FED_DIR.mkdir(parents=True, exist_ok=True)
def node_keypair() -> Tuple[ed25519.Ed25519PrivateKey, ed25519.Ed25519PublicKey]:
"""Return the node's Ed25519 keypair, generating + persisting it on first call.
Private key lands at data/federation/node.key (PEM, chmod 0600); public
at data/federation/node.pub (PEM). Idempotent — subsequent calls load
the existing files instead of generating new ones.
"""
_ensure_dir()
if PRIVATE_KEY_PATH.exists() and PUBLIC_KEY_PATH.exists():
priv_pem = PRIVATE_KEY_PATH.read_bytes()
priv = serialization.load_pem_private_key(priv_pem, password=None)
if not isinstance(priv, ed25519.Ed25519PrivateKey):
raise RuntimeError(f"federation key at {PRIVATE_KEY_PATH} is not Ed25519")
return priv, priv.public_key()
priv = ed25519.Ed25519PrivateKey.generate()
priv_pem = priv.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
pub = priv.public_key()
pub_pem = pub.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
PRIVATE_KEY_PATH.write_bytes(priv_pem)
os.chmod(PRIVATE_KEY_PATH, 0o600)
PUBLIC_KEY_PATH.write_bytes(pub_pem)
_log.info("federation.keypair.generated", path=str(PRIVATE_KEY_PATH))
return priv, pub
def public_key_pem() -> str:
"""PEM-encoded public key as text — what peers store + verify against."""
_, pub = node_keypair()
return pub.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
def _raw_pubkey_bytes(pub: ed25519.Ed25519PublicKey) -> bytes:
return pub.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
def node_fingerprint() -> str:
"""Short stable id for the node — first 16 bytes of SHA256(raw_pubkey), hex.
Lives in DNS TXT records; 32 hex chars is short enough to fit but long
enough to be collision-safe for any plausible peer population.
"""
_, pub = node_keypair()
digest = hashlib.sha256(_raw_pubkey_bytes(pub)).digest()
return digest[:16].hex()
def _fingerprint_for_pubkey_pem(pubkey_pem: str) -> str:
pub = serialization.load_pem_public_key(pubkey_pem.encode("ascii"))
if not isinstance(pub, ed25519.Ed25519PublicKey):
raise ValueError("not an Ed25519 public key")
return hashlib.sha256(_raw_pubkey_bytes(pub)).digest()[:16].hex()
# ---------- DNS record format -------------------------------------------
class DNSRecord(BaseModel):
"""The SRV + TXT pair an admin pastes into their zone file."""
srv_name: str
srv_target: str
srv_port: int
srv_priority: int = 10
srv_weight: int = 10
txt_name: str
txt_value: str
human_instructions: str
def dns_record(domain: str, port: int = 443) -> DNSRecord:
"""Build the DNS-SD-style records that advertise this node at `domain`."""
fp = node_fingerprint()
srv_name = f"_psyc._tcp.{domain}"
srv_target = f"{domain}."
txt_name = srv_name
txt_value = f"v={FEED_VERSION} fp={fp} alg={FEED_ALG} path={FEED_PATH}"
instructions = (
f"; psyc federation records for {domain}\n"
f"; ----------------------------------------------------------\n"
f"; 1) SRV record — locates this psyc node (host + port).\n"
f'{srv_name}. 3600 IN SRV 10 10 {port} {srv_target}\n'
f";\n"
f"; 2) TXT record — declares protocol version, key fingerprint,\n"
f"; signature algorithm, and the feed endpoint path.\n"
f'{txt_name}. 3600 IN TXT "{txt_value}"\n'
f"; ----------------------------------------------------------\n"
f"; Once these are live, federation peers can fetch:\n"
f"; https://{domain}{FEED_PATH} (signed feed JSON)\n"
f"; https://{domain}/federation/key (public key PEM)\n"
f"; https://{domain}/federation/info (capabilities)\n"
)
return DNSRecord(
srv_name=srv_name,
srv_target=srv_target,
srv_port=port,
txt_name=txt_name,
txt_value=txt_value,
human_instructions=instructions,
)
# ---------- signing -----------------------------------------------------
def canonical_json(obj: Dict[str, Any]) -> bytes:
"""Deterministic JSON serialization — what we sign + hash over."""
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
def sign_payload(payload: bytes) -> bytes:
"""Ed25519 signature over `payload`. Raw 64-byte sig."""
priv, _ = node_keypair()
return priv.sign(payload)
def verify_payload(payload: bytes, signature: bytes, pubkey_pem: str) -> bool:
"""True iff `signature` verifies under `pubkey_pem`. Never raises."""
try:
pub = serialization.load_pem_public_key(pubkey_pem.encode("ascii"))
if not isinstance(pub, ed25519.Ed25519PublicKey):
return False
pub.verify(signature, payload)
return True
except Exception:
return False
# ---------- feed export -------------------------------------------------
def _case_digest(case_record: Dict[str, Any]) -> str:
return hashlib.sha256(canonical_json(case_record)).hexdigest()
def _build_case_records(window_hours: int) -> List[Dict[str, Any]]:
cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
out: List[Dict[str, Any]] = []
for case in db.list_cases(limit=10_000):
if case.ingested_at < cutoff:
continue
record: Dict[str, Any] = {
"case_id": case.case_id,
"summary": case.summary,
"severity": case.classification.severity.value if case.classification.severity else None,
"incident_type": case.classification.incident_type.value if case.classification.incident_type else None,
"observed_at": case.observed_at.isoformat(),
"feed_source": case.source_metadata.get("feed", ""),
"iocs": (
[{"value": v, "type": "url"} for v in case.observables.urls]
+ [{"value": v, "type": "domain"} for v in case.observables.domains]
+ [{"value": v, "type": "ip"} for v in case.observables.ips]
+ [{"value": v, "type": "hash"} for v in case.observables.hashes]
+ [{"value": v, "type": "cve"} for v in case.observables.cves]
),
}
record["digest_sha256"] = _case_digest(
{k: v for k, v in record.items() if k != "digest_sha256"}
)
out.append(record)
return out
def _build_ioc_records(window_hours: int) -> List[Dict[str, Any]]:
cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
out: List[Dict[str, Any]] = []
seen: set = set()
for ioc_type in ("url", "domain", "ip", "hash", "cve"):
for row in db.iocs_by_type(ioc_type):
first_seen = row.get("first_seen")
if first_seen:
try:
if datetime.fromisoformat(first_seen) < cutoff:
continue
except ValueError:
pass
key = (row["value"], row["ioc_type"])
if key in seen:
continue
seen.add(key)
record = {
"value": row["value"],
"type": row["ioc_type"],
"severity": row.get("severity"),
"first_seen": first_seen,
}
record["digest_sha256"] = hashlib.sha256(canonical_json(record)).hexdigest()
out.append(record)
return out
def build_signed_feed(window_hours: int = 24) -> Dict[str, Any]:
"""Build the JSON feed peers will pull from /federation/feed.
Pulls cases ingested in the last `window_hours` plus the corresponding
IOC slice, attaches per-record `digest_sha256` (so peers can later
quorum-match across nodes), and signs the canonical JSON of the whole
payload-minus-signature with our Ed25519 key.
"""
payload: Dict[str, Any] = {
"version": FEED_VERSION,
"fingerprint": node_fingerprint(),
"generated_at": datetime.now(timezone.utc).isoformat(),
"window_hours": window_hours,
"cases": _build_case_records(window_hours),
"iocs": _build_ioc_records(window_hours),
}
sig = sign_payload(canonical_json(payload))
payload["signature"] = base64.b64encode(sig).decode("ascii")
return payload
# ---------- import + quorum-signal buffer -------------------------------
class ImportSummary(BaseModel):
peer_fingerprint: str
cases_seen: int
iocs_seen: int
signal_ids: List[Tuple[str, str]] = Field(default_factory=list)
def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result[ImportSummary, str]:
"""Verify + record a peer's feed into the federation_signals buffer.
Does NOT merge into the local case store — that's the quorum stage's
job. The buffer is the per-hash signal log that quorum logic later
aggregates ("3 trusted peers reported this same IOC → promote").
"""
sig_b64 = feed.get("signature")
if not sig_b64:
return Err("missing signature")
try:
signature = base64.b64decode(sig_b64)
except Exception:
return Err("malformed signature (not base64)")
unsigned = {k: v for k, v in feed.items() if k != "signature"}
if not verify_payload(canonical_json(unsigned), signature, expected_pubkey_pem):
return Err("signature verification failed")
peer_fp = feed.get("fingerprint", "")
if not peer_fp:
return Err("missing fingerprint")
if peer_fp == node_fingerprint():
return Err("loop: own feed")
# Cross-check the declared fingerprint matches the pubkey we verified with.
try:
if _fingerprint_for_pubkey_pem(expected_pubkey_pem) != peer_fp:
return Err("fingerprint does not match provided pubkey")
except Exception as exc:
return Err(f"bad pubkey: {exc}")
now = datetime.now(timezone.utc).isoformat()
signal_ids: List[Tuple[str, str]] = []
cases = feed.get("cases") or []
iocs = feed.get("iocs") or []
for c in cases:
case_id = c.get("case_id") or ""
digest = c.get("digest_sha256") or hashlib.sha256(canonical_json(c)).hexdigest()
db.record_signal(dict(
peer_fingerprint=peer_fp,
signal_type="case",
signal_id=case_id,
signal_hash=digest,
received_at=now,
raw_json=json.dumps(c, sort_keys=True),
))
signal_ids.append(("case", digest))
for i in iocs:
value = i.get("value") or ""
digest = i.get("digest_sha256") or hashlib.sha256(canonical_json(i)).hexdigest()
db.record_signal(dict(
peer_fingerprint=peer_fp,
signal_type="ioc",
signal_id=value,
signal_hash=digest,
received_at=now,
raw_json=json.dumps(i, sort_keys=True),
))
signal_ids.append(("ioc", digest))
_log.info("federation.import.ok", peer=peer_fp, cases=len(cases), iocs=len(iocs))
return Ok(ImportSummary(
peer_fingerprint=peer_fp,
cases_seen=len(cases),
iocs_seen=len(iocs),
signal_ids=signal_ids,
))
# ---------- peer registry ------------------------------------------------
class Peer(BaseModel):
domain: str
fingerprint: str
pubkey_pem: str
status: str = "unknown" # unknown | trusted | blocked
discovered_at: str
last_seen: Optional[str] = None
notes: Optional[str] = None
def _row_to_peer(row: Dict[str, Any]) -> Peer:
return Peer(
domain=row["domain"],
fingerprint=row["fingerprint"],
pubkey_pem=row["pubkey_pem"],
status=row.get("status") or "unknown",
discovered_at=row.get("discovered_at") or "",
last_seen=row.get("last_seen"),
notes=row.get("notes"),
)
def register_peer(domain: str, fingerprint: str, pubkey_pem: str, status: str = "unknown") -> None:
"""Insert or update a peer in the registry. Idempotent on `domain`."""
now = datetime.now(timezone.utc).isoformat()
existing = db.get_peer(domain)
discovered_at = existing["discovered_at"] if existing else now
db.upsert_peer(dict(
domain=domain,
fingerprint=fingerprint,
pubkey_pem=pubkey_pem,
status=status,
discovered_at=discovered_at,
last_seen=now,
notes=existing.get("notes") if existing else None,
))
def list_peers() -> List[Peer]:
return [_row_to_peer(r) for r in db.list_peers()]
def get_peer(domain: str) -> Optional[Peer]:
row = db.get_peer(domain)
return _row_to_peer(row) if row else None
def set_peer_status(domain: str, status: str) -> None:
if status not in ("unknown", "trusted", "blocked"):
raise ValueError(f"unknown peer status: {status}")
db.set_peer_status(domain, status)
def remove_peer(domain: str) -> None:
db.remove_peer(domain)

230
tests/test_federation.py Normal file
View File

@@ -0,0 +1,230 @@
"""Federation — identity, signed feed, peer registry, signal buffer."""
from __future__ import annotations
import base64
import os
import stat
import pytest
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import federation
from psyc.lines.federation import (
DNSRecord,
build_signed_feed,
canonical_json,
dns_record,
import_signed_feed,
node_fingerprint,
node_keypair,
public_key_pem,
sign_payload,
verify_payload,
)
from psyc.result import Err, Ok
from conftest import make_case
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
"""Redirect federation key paths to a tmp dir so each test gets a fresh key."""
d = tmp_path / "federation"
monkeypatch.setattr(federation, "FED_DIR", d)
monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
yield d
def test_keypair_persisted_with_correct_perms(fed_dir):
priv, pub = node_keypair()
assert federation.PRIVATE_KEY_PATH.exists()
assert federation.PUBLIC_KEY_PATH.exists()
mode = stat.S_IMODE(os.stat(federation.PRIVATE_KEY_PATH).st_mode)
assert mode == 0o600
def test_keypair_idempotent_across_calls(fed_dir):
priv1, pub1 = node_keypair()
priv2, pub2 = node_keypair()
# raw bytes match — same key loaded twice, not regenerated
raw1 = pub1.public_bytes(
encoding=federation.serialization.Encoding.Raw,
format=federation.serialization.PublicFormat.Raw,
)
raw2 = pub2.public_bytes(
encoding=federation.serialization.Encoding.Raw,
format=federation.serialization.PublicFormat.Raw,
)
assert raw1 == raw2
def test_fingerprint_is_stable_and_32_hex(fed_dir):
fp1 = node_fingerprint()
fp2 = node_fingerprint()
assert fp1 == fp2
assert len(fp1) == 32
assert all(c in "0123456789abcdef" for c in fp1)
def test_sign_verify_roundtrip(fed_dir):
payload = b"hello federation"
sig = sign_payload(payload)
assert verify_payload(payload, sig, public_key_pem()) is True
def test_verify_with_wrong_key_returns_false(fed_dir, tmp_path):
payload = b"the truth"
sig = sign_payload(payload)
# Build a *different* keypair in a separate directory and use its pubkey.
other = tmp_path / "other-federation"
other.mkdir()
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
other_priv = ed25519.Ed25519PrivateKey.generate()
other_pub_pem = other_priv.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
assert verify_payload(payload, sig, other_pub_pem) is False
def test_verify_with_garbage_pubkey_returns_false_no_raise(fed_dir):
sig = sign_payload(b"x")
assert verify_payload(b"x", sig, "not a pem") is False
def test_canonical_json_is_deterministic():
a = canonical_json({"b": 1, "a": 2, "nested": {"y": 1, "x": 2}})
b = canonical_json({"a": 2, "b": 1, "nested": {"x": 2, "y": 1}})
assert a == b
def test_dns_record_txt_value_matches_spec(fed_dir):
rec = dns_record("example.com")
assert isinstance(rec, DNSRecord)
fp = node_fingerprint()
assert rec.srv_name == "_psyc._tcp.example.com"
assert rec.srv_target == "example.com."
assert rec.srv_port == 443
assert rec.txt_value == f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed"
# human instructions include both record lines
assert "_psyc._tcp.example.com" in rec.human_instructions
assert "SRV" in rec.human_instructions
assert "TXT" in rec.human_instructions
def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir):
case = make_case(feed="urlhaus", ips=["1.1.1.1"], urls=["http://1.1.1.1/x"])
db.upsert_case(case)
feed = build_signed_feed(window_hours=24)
assert feed["version"] == "psyc1"
assert feed["fingerprint"] == node_fingerprint()
assert feed["signature"]
# cases entry made it in
assert any(c["case_id"] == case.case_id for c in feed["cases"])
# Import using our own pubkey against a *different* declared fingerprint:
# swap the fingerprint so import_signed_feed doesn't reject as a loop.
pub = public_key_pem()
# Use a fresh keypair to act as "peer" — sign a feed with that key.
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
import hashlib
peer_priv = ed25519.Ed25519PrivateKey.generate()
peer_pub_pem = peer_priv.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
peer_raw = peer_priv.public_key().public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
peer_fp = hashlib.sha256(peer_raw).digest()[:16].hex()
feed["fingerprint"] = peer_fp
unsigned = {k: v for k, v in feed.items() if k != "signature"}
new_sig = peer_priv.sign(canonical_json(unsigned))
feed["signature"] = base64.b64encode(new_sig).decode("ascii")
result = import_signed_feed(feed, peer_pub_pem)
assert isinstance(result, Ok), getattr(result, "reason", "")
summary = result.value
assert summary.peer_fingerprint == peer_fp
assert summary.cases_seen >= 1
def test_import_with_wrong_pubkey_returns_err(fresh_db, fed_dir):
db.upsert_case(make_case(feed="urlhaus", ips=["2.2.2.2"]))
feed = build_signed_feed(window_hours=24)
# build a *different* pubkey to claim verification against
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
other_pub_pem = ed25519.Ed25519PrivateKey.generate().public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
# also need to change fingerprint so the loop-check doesn't trigger first
feed["fingerprint"] = "deadbeef" * 4
result = import_signed_feed(feed, other_pub_pem)
assert isinstance(result, Err)
def test_import_own_feed_returns_loop_err(fresh_db, fed_dir):
db.upsert_case(make_case(feed="urlhaus", ips=["3.3.3.3"]))
feed = build_signed_feed(window_hours=24)
result = import_signed_feed(feed, public_key_pem())
assert isinstance(result, Err)
assert "loop" in result.reason
def test_record_signal_then_lookup_by_hash(fresh_db):
rid = db.record_signal(dict(
peer_fingerprint="abc123",
signal_type="ioc",
signal_id="1.2.3.4",
signal_hash="hash-aaa",
received_at="2026-01-01T00:00:00+00:00",
raw_json="{}",
))
assert rid > 0
rows = db.signals_for_hash("hash-aaa")
assert len(rows) == 1
assert rows[0]["peer_fingerprint"] == "abc123"
# second peer reports the same hash → both surface for quorum check
db.record_signal(dict(
peer_fingerprint="def456",
signal_type="ioc",
signal_id="1.2.3.4",
signal_hash="hash-aaa",
received_at="2026-01-01T00:01:00+00:00",
raw_json="{}",
))
rows = db.signals_for_hash("hash-aaa")
assert {r["peer_fingerprint"] for r in rows} == {"abc123", "def456"}
def test_peer_registry_crud(fresh_db, fed_dir):
federation.register_peer("peer.example", "ff" * 16, "PEM", status="trusted")
peers = federation.list_peers()
assert len(peers) == 1
assert peers[0].domain == "peer.example"
assert peers[0].status == "trusted"
federation.set_peer_status("peer.example", "blocked")
assert federation.get_peer("peer.example").status == "blocked"
federation.remove_peer("peer.example")
assert federation.list_peers() == []