diff --git a/pyproject.toml b/pyproject.toml
index 74b3cd4..6a4f932 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,6 +16,7 @@ dependencies = [
"httpx>=0.27",
"typer>=0.12",
"pynacl>=1.5",
+ "cryptography>=42.0",
"structlog>=24.1",
"sqlalchemy>=2.0",
"python-dotenv>=1.0",
diff --git a/src/psyc/_federation_cli.py b/src/psyc/_federation_cli.py
new file mode 100644
index 0000000..e01bbe6
--- /dev/null
+++ b/src/psyc/_federation_cli.py
@@ -0,0 +1,135 @@
+"""Federation CLI — keygen, DNS records, feed export, peer registry, verify.
+
+Registered onto the top-level Typer app from cli.py so the surface stays flat.
+"""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Optional
+
+import httpx
+import typer
+
+from psyc import db, log
+from psyc.lines import federation
+from psyc.result import Err
+
+
+_log = log.get(__name__)
+
+
+def register(typer_app: typer.Typer) -> None:
+ """Mount all `fed-*` commands onto `typer_app`."""
+
+ @typer_app.command("fed-keygen")
+ def fed_keygen() -> None:
+ """Generate the node's Ed25519 keypair (or load existing). Prints fingerprint."""
+ federation.node_keypair() # creates the files if missing
+ typer.echo(federation.node_fingerprint())
+
+ @typer_app.command("fed-dns")
+ def fed_dns(
+ domain: str = typer.Argument(..., help="public domain to advertise this node on"),
+ port: int = typer.Option(443, "--port", help="port psyc is reachable on"),
+ ) -> None:
+ """Print the DNS SRV + TXT records to publish under `domain`."""
+ rec = federation.dns_record(domain, port=port)
+ typer.echo(rec.human_instructions)
+
+ @typer_app.command("fed-feed")
+ def fed_feed(
+ window_hours: int = typer.Option(24, "--hours", help="lookback window (hours)"),
+ ) -> None:
+ """Build + print the signed feed JSON."""
+ db.init_db()
+ payload = federation.build_signed_feed(window_hours=window_hours)
+ typer.echo(json.dumps(payload, indent=2))
+
+ @typer_app.command("fed-verify")
+ def fed_verify(
+ peer_url: str = typer.Argument(..., help="peer base URL, e.g. https://peer.example"),
+ ) -> None:
+ """Fetch a peer's /federation/{info,key,feed} and verify the signature."""
+ peer_url = peer_url.rstrip("/")
+ try:
+ with httpx.Client(timeout=10.0) as client:
+ info = client.get(f"{peer_url}/federation/info").json()
+ key_text = client.get(f"{peer_url}/federation/key").text
+ feed = client.get(f"{peer_url}/federation/feed").json()
+ except Exception as exc:
+ typer.echo(f"error: fetch failed: {exc}", err=True)
+ raise typer.Exit(1)
+
+ # If the peer is already in the registry, prefer the stored pubkey
+ # (TOFU pin); otherwise warn and use the freshly fetched one.
+ declared_fp = info.get("fingerprint", "")
+ pubkey_pem = key_text
+ pinned = None
+ for p in federation.list_peers():
+ if p.fingerprint == declared_fp:
+ pinned = p
+ break
+ if pinned:
+ pubkey_pem = pinned.pubkey_pem
+ typer.echo(f" · using pinned pubkey for {pinned.domain}")
+ else:
+ typer.echo(" · WARNING: no pinned pubkey for this peer — trusting fetched key (TOFU)")
+
+ db.init_db()
+ result = federation.import_signed_feed(feed, pubkey_pem)
+ if isinstance(result, Err):
+ typer.echo(f" ✗ verification failed: {result.reason}", err=True)
+ raise typer.Exit(1)
+ s = result.value
+ typer.echo(f" ✓ verified peer {s.peer_fingerprint}")
+ typer.echo(f" cases: {s.cases_seen} iocs: {s.iocs_seen} signals buffered: {len(s.signal_ids)}")
+
+ @typer_app.command("fed-peer-add")
+ def fed_peer_add(
+ domain: str = typer.Argument(..., help="peer's public domain"),
+ fingerprint: str = typer.Argument(..., help="peer's 32-hex fingerprint"),
+ pubkey_file: Path = typer.Option(..., "--pubkey-file", help="path to peer's PEM public key"),
+ status: str = typer.Option("unknown", "--status", help="unknown | trusted | blocked"),
+ ) -> None:
+ """Register a peer's identity in the local registry."""
+ db.init_db()
+ pem = pubkey_file.read_text(encoding="utf-8")
+ federation.register_peer(domain, fingerprint, pem, status=status)
+ typer.echo(f"registered peer {domain} ({fingerprint[:8]}…) status={status}")
+
+ @typer_app.command("fed-peer-list")
+ def fed_peer_list() -> None:
+ """List all registered peers."""
+ db.init_db()
+ rows = federation.list_peers()
+ if not rows:
+ typer.echo("(no peers registered)")
+ return
+ for p in rows:
+ typer.echo(
+ f" {p.status:8s} {p.domain:30s} {p.fingerprint[:8]}…{p.fingerprint[-8:]}"
+ f" last_seen={(p.last_seen or '—')[:16]}"
+ )
+
+ @typer_app.command("fed-peer-trust")
+ def fed_peer_trust(domain: str = typer.Argument(...)) -> None:
+ """Mark a peer as trusted — their signals count toward quorum."""
+ db.init_db()
+ federation.set_peer_status(domain, "trusted")
+ typer.echo(f"{domain} → trusted")
+
+ @typer_app.command("fed-peer-block")
+ def fed_peer_block(domain: str = typer.Argument(...)) -> None:
+ """Block a peer — ignore their feeds."""
+ db.init_db()
+ federation.set_peer_status(domain, "blocked")
+ typer.echo(f"{domain} → blocked")
+
+ @typer_app.command("fed-peer-remove")
+ def fed_peer_remove(domain: str = typer.Argument(...)) -> None:
+ """Drop a peer from the registry."""
+ db.init_db()
+ federation.remove_peer(domain)
+ typer.echo(f"removed {domain}")
diff --git a/src/psyc/cockpit/federation_routes.py b/src/psyc/cockpit/federation_routes.py
new file mode 100644
index 0000000..d402f38
--- /dev/null
+++ b/src/psyc/cockpit/federation_routes.py
@@ -0,0 +1,125 @@
+"""Federation cockpit routes — admin page, public feed/key/info endpoints.
+
+Wired into the FastAPI app by app.py via a single `register(app, TEMPLATES)`
+call so the federation surface stays self-contained.
+"""
+
+from __future__ import annotations
+
+import json
+import time
+from typing import Any, Dict, Optional, Tuple
+
+from fastapi import FastAPI, Form, HTTPException, Request
+from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, RedirectResponse
+from fastapi.templating import Jinja2Templates
+
+from psyc import db, log
+from psyc.lines import federation
+
+
+_log = log.get(__name__)
+
+# Tiny in-memory cache for the signed feed — peers may poll, recomputing
+# canonical JSON + signature on every hit would be wasteful.
+_FEED_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
+_FEED_TTL = 60.0
+
+
+def _admin_ok(request: Request) -> bool:
+ return bool(request.session.get("admin_ok"))
+
+
+def _cached_feed() -> Dict[str, Any]:
+ now = time.time()
+ if _FEED_CACHE["payload"] is None or (now - _FEED_CACHE["ts"]) > _FEED_TTL:
+ _FEED_CACHE["payload"] = federation.build_signed_feed()
+ _FEED_CACHE["ts"] = now
+ return _FEED_CACHE["payload"]
+
+
+def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
+ """Mount all federation routes onto `app`."""
+
+ @app.get("/admin/federation", response_class=HTMLResponse)
+ def admin_federation(request: Request) -> HTMLResponse:
+ if not _admin_ok(request):
+ return RedirectResponse("/admin", status_code=303)
+ host = request.url.hostname or "your-node.example"
+ suggested = request.query_params.get("domain", host)
+ rec = federation.dns_record(suggested)
+ peers = federation.list_peers()
+ signals = db.recent_signals(limit=20)
+ return TEMPLATES.TemplateResponse(
+ request,
+ "admin_federation.html",
+ {
+ "fingerprint": federation.node_fingerprint(),
+ "pubkey_pem": federation.public_key_pem(),
+ "suggested_domain": suggested,
+ "dns": rec,
+ "peers": peers,
+ "signals": signals,
+ },
+ )
+
+ @app.post("/admin/federation/peers/add")
+ def admin_federation_add_peer(
+ request: Request,
+ domain: str = Form(...),
+ fingerprint: str = Form(...),
+ pubkey_pem: str = Form(...),
+ status: str = Form("unknown"),
+ ) -> RedirectResponse:
+ if not _admin_ok(request):
+ raise HTTPException(status_code=403, detail="admin session required")
+ try:
+ federation.register_peer(domain.strip(), fingerprint.strip(), pubkey_pem.strip(), status=status)
+ except Exception as exc:
+ _log.warning("federation.peer.add.error", domain=domain, error=str(exc))
+ return RedirectResponse("/admin/federation", status_code=303)
+
+ @app.post("/admin/federation/peers/{domain}/status")
+ def admin_federation_set_status(
+ request: Request,
+ domain: str,
+ status: str = Form(...),
+ ) -> RedirectResponse:
+ if not _admin_ok(request):
+ raise HTTPException(status_code=403, detail="admin session required")
+ try:
+ federation.set_peer_status(domain, status)
+ except ValueError as exc:
+ _log.warning("federation.peer.status.bad", domain=domain, status=status, error=str(exc))
+ return RedirectResponse("/admin/federation", status_code=303)
+
+ @app.post("/admin/federation/peers/{domain}/remove")
+ def admin_federation_remove(
+ request: Request,
+ domain: str,
+ ) -> RedirectResponse:
+ if not _admin_ok(request):
+ raise HTTPException(status_code=403, detail="admin session required")
+ federation.remove_peer(domain)
+ return RedirectResponse("/admin/federation", status_code=303)
+
+ # ---------- public endpoints --------------------------------------
+
+ @app.get("/federation/info")
+ def federation_info() -> JSONResponse:
+ return JSONResponse({
+ "fingerprint": federation.node_fingerprint(),
+ "version": federation.FEED_VERSION,
+ "feed": federation.FEED_PATH,
+ "key": "/federation/key",
+ })
+
+ @app.get("/federation/key", response_class=PlainTextResponse)
+ def federation_key() -> PlainTextResponse:
+ return PlainTextResponse(federation.public_key_pem(), media_type="text/plain")
+
+ @app.get("/federation/feed")
+ def federation_feed() -> JSONResponse:
+ return JSONResponse(_cached_feed())
+
+ _log.info("federation.routes.registered")
diff --git a/src/psyc/cockpit/templates/admin_federation.html b/src/psyc/cockpit/templates/admin_federation.html
new file mode 100644
index 0000000..b9493bd
--- /dev/null
+++ b/src/psyc/cockpit/templates/admin_federation.html
@@ -0,0 +1,132 @@
+{% extends "base.html" %}
+{% block title %}Federation — psyc admin{% endblock %}
+{% block content %}
+
+
+
+
Federation Identity
+ {{ peers|length }} peer{{ '' if peers|length == 1 else 's' }}
+
+ This node's Ed25519 identity. The fingerprint goes into a DNS TXT record so other psyc nodes can discover this one. The public key lets them verify any feed we publish — the private key never leaves this box.
+ ← back to admin
+
+
+
node fingerprint
+
{{ fingerprint }}
+
+ public key (PEM)
+ {{ pubkey_pem }}
+
+
+
+
+
+
+
Publish via DNS
+ SRV + TXT records
+
+ Paste these into your zone file. Once they're live, any peer that knows your domain can discover the node and pin the right key without out-of-band coordination.
+
+
+
+ {{ dns.human_instructions }}
+
+
+
+
+
Known Peers
+ {{ peers|length }} registered
+
+ Trusted peers' feeds are signature-verified on every poll. Blocked peers are recorded but ignored. Unknown peers are kept for review — nothing flows from them until you set them trusted.
+
+ {% if peers %}
+
+ | Domain | Fingerprint | Status | Discovered | Last seen | |
+
+ {% for p in peers %}
+
+ | {{ p.domain }} |
+ {{ p.fingerprint[:8] }}…{{ p.fingerprint[-8:] }} |
+
+ {% if p.status == 'trusted' %}
+ trusted
+ {% elif p.status == 'blocked' %}
+ blocked
+ {% else %}
+ unknown
+ {% endif %}
+ |
+ {{ (p.discovered_at or '')[:16] | replace('T', ' ') }} |
+ {{ ((p.last_seen or '')[:16] | replace('T', ' ')) or '—' }} |
+
+
+
+
+ |
+
+ {% endfor %}
+
+
+ {% else %}
+ (no peers yet — add one below)
+ {% endif %}
+
+
+
+
+
Add Peer
+
+ Pin a peer's identity manually: their domain, their fingerprint (from their DNS TXT record), and the public key they publish at /federation/key.
+
+
+
+
+
+
Recent Signals
+ last {{ signals|length }} of buffer
+
+ Verified federation signals from peers — case + IOC reports awaiting quorum. The signal buffer is what later quorum logic will count over.
+
+ {% if signals %}
+
+ | Received | Peer | Type | Id | Hash |
+
+ {% for s in signals %}
+
+ | {{ (s.received_at or '')[:19] | replace('T', ' ') }} |
+ {{ s.peer_fingerprint[:8] }}… |
+ {{ s.signal_type }} |
+ {{ s.signal_id[:48] }} |
+ {{ s.signal_hash[:16] }}… |
+
+ {% endfor %}
+
+
+ {% else %}
+ (no signals received yet — quorum stage will populate this)
+ {% endif %}
+
+
+{% endblock %}
diff --git a/src/psyc/db.py b/src/psyc/db.py
index 273154c..5d19acb 100644
--- a/src/psyc/db.py
+++ b/src/psyc/db.py
@@ -134,6 +134,33 @@ pulse_settings = Table(
Column("value", String, nullable=False),
)
+peers = Table(
+ "peers", _metadata,
+ Column("domain", String, primary_key=True),
+ Column("fingerprint", String, nullable=False),
+ Column("pubkey_pem", Text, nullable=False),
+ Column("status", String, nullable=False), # unknown | trusted | blocked
+ Column("discovered_at", String, nullable=False),
+ Column("last_seen", String, nullable=True),
+ Column("notes", Text, nullable=True),
+)
+Index("peers_fp_idx", peers.c.fingerprint)
+Index("peers_status_idx", peers.c.status)
+
+federation_signals = Table(
+ "federation_signals", _metadata,
+ Column("id", Integer, primary_key=True, autoincrement=True),
+ Column("peer_fingerprint", String, nullable=False),
+ Column("signal_type", String, nullable=False), # case | ioc
+ Column("signal_id", String, nullable=False), # case_id or ioc value
+ Column("signal_hash", String, nullable=False), # sha256 of canonical record
+ Column("received_at", String, nullable=False),
+ Column("raw_json", Text, nullable=False),
+)
+Index("federation_signals_hash_idx", federation_signals.c.signal_hash)
+Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint)
+Index("federation_signals_received_idx", federation_signals.c.received_at.desc())
+
_log = log.get(__name__)
_engine: Optional[Engine] = None
@@ -284,3 +311,61 @@ def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None:
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
+
+
+# ---------- federation: peers + signal buffer ----------------------------
+
+def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None:
+ """Insert-or-update a peer by domain. `row` must include `domain`."""
+ stmt = sqlite_insert(peers).values(**row)
+ update_cols = {k: stmt.excluded[k] for k in row if k != "domain"}
+ stmt = stmt.on_conflict_do_update(index_elements=[peers.c.domain], set_=update_cols)
+ with engine(db_path).begin() as conn:
+ conn.execute(stmt)
+
+
+def list_peers(db_path: Path = DB_PATH) -> List[dict]:
+ stmt = select(peers).order_by(peers.c.discovered_at.desc())
+ with engine(db_path).connect() as conn:
+ return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
+
+
+def get_peer(domain: str, db_path: Path = DB_PATH) -> Optional[dict]:
+ stmt = select(peers).where(peers.c.domain == domain)
+ with engine(db_path).connect() as conn:
+ row = conn.execute(stmt).fetchone()
+ return dict(row._mapping) if row else None
+
+
+def set_peer_status(domain: str, status: str, db_path: Path = DB_PATH) -> None:
+ from sqlalchemy import update as sa_update
+ stmt = sa_update(peers).where(peers.c.domain == domain).values(status=status)
+ with engine(db_path).begin() as conn:
+ conn.execute(stmt)
+
+
+def remove_peer(domain: str, db_path: Path = DB_PATH) -> None:
+ stmt = peers.delete().where(peers.c.domain == domain)
+ with engine(db_path).begin() as conn:
+ conn.execute(stmt)
+
+
+def record_signal(row: dict, db_path: Path = DB_PATH) -> int:
+ """Append one federation signal. Returns the inserted row id."""
+ stmt = insert(federation_signals).values(**row)
+ with engine(db_path).begin() as conn:
+ res = conn.execute(stmt)
+ return int(res.inserted_primary_key[0])
+
+
+def signals_for_hash(signal_hash: str, db_path: Path = DB_PATH) -> List[dict]:
+ """All recorded signals matching `signal_hash` — quorum-lookup primitive."""
+ stmt = select(federation_signals).where(federation_signals.c.signal_hash == signal_hash)
+ with engine(db_path).connect() as conn:
+ return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
+
+
+def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]:
+ stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit)
+ with engine(db_path).connect() as conn:
+ return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
diff --git a/src/psyc/lines/federation.py b/src/psyc/lines/federation.py
new file mode 100644
index 0000000..bed46fb
--- /dev/null
+++ b/src/psyc/lines/federation.py
@@ -0,0 +1,405 @@
+"""Federation — node identity, signed feeds, peer registry.
+
+Identity layer for internet-wide federation of psyc nodes. Each node owns
+an Ed25519 keypair persisted under DATA_DIR/federation/. The public key
+fingerprint (first 16 bytes of SHA256(raw_pubkey) hex-encoded) goes into a
+DNS TXT record so peers can discover and authenticate the node, and the
+private key signs the outbound feed at /federation/feed.
+
+This module is the *identity* primitives only — discovery walkers,
+vouching/quorum, transparency log and auto-pull live in later stages.
+"""
+
+from __future__ import annotations
+
+import base64
+import hashlib
+import json
+import os
+from datetime import datetime, timedelta, timezone
+from typing import Any, Dict, List, Optional, Tuple
+
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import ed25519
+from pydantic import BaseModel, Field
+
+from psyc import DATA_DIR, db, log
+from psyc.result import Err, Ok, Result
+
+
+_log = log.get(__name__)
+
+FED_DIR = DATA_DIR / "federation"
+PRIVATE_KEY_PATH = FED_DIR / "node.key"
+PUBLIC_KEY_PATH = FED_DIR / "node.pub"
+
+FEED_VERSION = "psyc1"
+FEED_ALG = "ed25519"
+FEED_PATH = "/federation/feed"
+
+
+# ---------- keypair persistence -----------------------------------------
+
+def _ensure_dir() -> None:
+ FED_DIR.mkdir(parents=True, exist_ok=True)
+
+
+def node_keypair() -> Tuple[ed25519.Ed25519PrivateKey, ed25519.Ed25519PublicKey]:
+ """Return the node's Ed25519 keypair, generating + persisting it on first call.
+
+ Private key lands at data/federation/node.key (PEM, chmod 0600); public
+ at data/federation/node.pub (PEM). Idempotent — subsequent calls load
+ the existing files instead of generating new ones.
+ """
+ _ensure_dir()
+ if PRIVATE_KEY_PATH.exists() and PUBLIC_KEY_PATH.exists():
+ priv_pem = PRIVATE_KEY_PATH.read_bytes()
+ priv = serialization.load_pem_private_key(priv_pem, password=None)
+ if not isinstance(priv, ed25519.Ed25519PrivateKey):
+ raise RuntimeError(f"federation key at {PRIVATE_KEY_PATH} is not Ed25519")
+ return priv, priv.public_key()
+
+ priv = ed25519.Ed25519PrivateKey.generate()
+ priv_pem = priv.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.PKCS8,
+ encryption_algorithm=serialization.NoEncryption(),
+ )
+ pub = priv.public_key()
+ pub_pem = pub.public_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PublicFormat.SubjectPublicKeyInfo,
+ )
+ PRIVATE_KEY_PATH.write_bytes(priv_pem)
+ os.chmod(PRIVATE_KEY_PATH, 0o600)
+ PUBLIC_KEY_PATH.write_bytes(pub_pem)
+ _log.info("federation.keypair.generated", path=str(PRIVATE_KEY_PATH))
+ return priv, pub
+
+
+def public_key_pem() -> str:
+ """PEM-encoded public key as text — what peers store + verify against."""
+ _, pub = node_keypair()
+ return pub.public_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PublicFormat.SubjectPublicKeyInfo,
+ ).decode("ascii")
+
+
+def _raw_pubkey_bytes(pub: ed25519.Ed25519PublicKey) -> bytes:
+ return pub.public_bytes(
+ encoding=serialization.Encoding.Raw,
+ format=serialization.PublicFormat.Raw,
+ )
+
+
+def node_fingerprint() -> str:
+ """Short stable id for the node — first 16 bytes of SHA256(raw_pubkey), hex.
+
+ Lives in DNS TXT records; 32 hex chars is short enough to fit but long
+ enough to be collision-safe for any plausible peer population.
+ """
+ _, pub = node_keypair()
+ digest = hashlib.sha256(_raw_pubkey_bytes(pub)).digest()
+ return digest[:16].hex()
+
+
+def _fingerprint_for_pubkey_pem(pubkey_pem: str) -> str:
+ pub = serialization.load_pem_public_key(pubkey_pem.encode("ascii"))
+ if not isinstance(pub, ed25519.Ed25519PublicKey):
+ raise ValueError("not an Ed25519 public key")
+ return hashlib.sha256(_raw_pubkey_bytes(pub)).digest()[:16].hex()
+
+
+# ---------- DNS record format -------------------------------------------
+
+class DNSRecord(BaseModel):
+ """The SRV + TXT pair an admin pastes into their zone file."""
+ srv_name: str
+ srv_target: str
+ srv_port: int
+ srv_priority: int = 10
+ srv_weight: int = 10
+ txt_name: str
+ txt_value: str
+ human_instructions: str
+
+
+def dns_record(domain: str, port: int = 443) -> DNSRecord:
+ """Build the DNS-SD-style records that advertise this node at `domain`."""
+ fp = node_fingerprint()
+ srv_name = f"_psyc._tcp.{domain}"
+ srv_target = f"{domain}."
+ txt_name = srv_name
+ txt_value = f"v={FEED_VERSION} fp={fp} alg={FEED_ALG} path={FEED_PATH}"
+ instructions = (
+ f"; psyc federation records for {domain}\n"
+ f"; ----------------------------------------------------------\n"
+ f"; 1) SRV record — locates this psyc node (host + port).\n"
+ f'{srv_name}. 3600 IN SRV 10 10 {port} {srv_target}\n'
+ f";\n"
+ f"; 2) TXT record — declares protocol version, key fingerprint,\n"
+ f"; signature algorithm, and the feed endpoint path.\n"
+ f'{txt_name}. 3600 IN TXT "{txt_value}"\n'
+ f"; ----------------------------------------------------------\n"
+ f"; Once these are live, federation peers can fetch:\n"
+ f"; https://{domain}{FEED_PATH} (signed feed JSON)\n"
+ f"; https://{domain}/federation/key (public key PEM)\n"
+ f"; https://{domain}/federation/info (capabilities)\n"
+ )
+ return DNSRecord(
+ srv_name=srv_name,
+ srv_target=srv_target,
+ srv_port=port,
+ txt_name=txt_name,
+ txt_value=txt_value,
+ human_instructions=instructions,
+ )
+
+
+# ---------- signing -----------------------------------------------------
+
+def canonical_json(obj: Dict[str, Any]) -> bytes:
+ """Deterministic JSON serialization — what we sign + hash over."""
+ return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
+
+
+def sign_payload(payload: bytes) -> bytes:
+ """Ed25519 signature over `payload`. Raw 64-byte sig."""
+ priv, _ = node_keypair()
+ return priv.sign(payload)
+
+
+def verify_payload(payload: bytes, signature: bytes, pubkey_pem: str) -> bool:
+ """True iff `signature` verifies under `pubkey_pem`. Never raises."""
+ try:
+ pub = serialization.load_pem_public_key(pubkey_pem.encode("ascii"))
+ if not isinstance(pub, ed25519.Ed25519PublicKey):
+ return False
+ pub.verify(signature, payload)
+ return True
+ except Exception:
+ return False
+
+
+# ---------- feed export -------------------------------------------------
+
+def _case_digest(case_record: Dict[str, Any]) -> str:
+ return hashlib.sha256(canonical_json(case_record)).hexdigest()
+
+
+def _build_case_records(window_hours: int) -> List[Dict[str, Any]]:
+ cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
+ out: List[Dict[str, Any]] = []
+ for case in db.list_cases(limit=10_000):
+ if case.ingested_at < cutoff:
+ continue
+ record: Dict[str, Any] = {
+ "case_id": case.case_id,
+ "summary": case.summary,
+ "severity": case.classification.severity.value if case.classification.severity else None,
+ "incident_type": case.classification.incident_type.value if case.classification.incident_type else None,
+ "observed_at": case.observed_at.isoformat(),
+ "feed_source": case.source_metadata.get("feed", ""),
+ "iocs": (
+ [{"value": v, "type": "url"} for v in case.observables.urls]
+ + [{"value": v, "type": "domain"} for v in case.observables.domains]
+ + [{"value": v, "type": "ip"} for v in case.observables.ips]
+ + [{"value": v, "type": "hash"} for v in case.observables.hashes]
+ + [{"value": v, "type": "cve"} for v in case.observables.cves]
+ ),
+ }
+ record["digest_sha256"] = _case_digest(
+ {k: v for k, v in record.items() if k != "digest_sha256"}
+ )
+ out.append(record)
+ return out
+
+
+def _build_ioc_records(window_hours: int) -> List[Dict[str, Any]]:
+ cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
+ out: List[Dict[str, Any]] = []
+ seen: set = set()
+ for ioc_type in ("url", "domain", "ip", "hash", "cve"):
+ for row in db.iocs_by_type(ioc_type):
+ first_seen = row.get("first_seen")
+ if first_seen:
+ try:
+ if datetime.fromisoformat(first_seen) < cutoff:
+ continue
+ except ValueError:
+ pass
+ key = (row["value"], row["ioc_type"])
+ if key in seen:
+ continue
+ seen.add(key)
+ record = {
+ "value": row["value"],
+ "type": row["ioc_type"],
+ "severity": row.get("severity"),
+ "first_seen": first_seen,
+ }
+ record["digest_sha256"] = hashlib.sha256(canonical_json(record)).hexdigest()
+ out.append(record)
+ return out
+
+
+def build_signed_feed(window_hours: int = 24) -> Dict[str, Any]:
+ """Build the JSON feed peers will pull from /federation/feed.
+
+ Pulls cases ingested in the last `window_hours` plus the corresponding
+ IOC slice, attaches per-record `digest_sha256` (so peers can later
+ quorum-match across nodes), and signs the canonical JSON of the whole
+ payload-minus-signature with our Ed25519 key.
+ """
+ payload: Dict[str, Any] = {
+ "version": FEED_VERSION,
+ "fingerprint": node_fingerprint(),
+ "generated_at": datetime.now(timezone.utc).isoformat(),
+ "window_hours": window_hours,
+ "cases": _build_case_records(window_hours),
+ "iocs": _build_ioc_records(window_hours),
+ }
+ sig = sign_payload(canonical_json(payload))
+ payload["signature"] = base64.b64encode(sig).decode("ascii")
+ return payload
+
+
+# ---------- import + quorum-signal buffer -------------------------------
+
+class ImportSummary(BaseModel):
+ peer_fingerprint: str
+ cases_seen: int
+ iocs_seen: int
+ signal_ids: List[Tuple[str, str]] = Field(default_factory=list)
+
+
+def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result[ImportSummary, str]:
+ """Verify + record a peer's feed into the federation_signals buffer.
+
+ Does NOT merge into the local case store — that's the quorum stage's
+ job. The buffer is the per-hash signal log that quorum logic later
+ aggregates ("3 trusted peers reported this same IOC → promote").
+ """
+ sig_b64 = feed.get("signature")
+ if not sig_b64:
+ return Err("missing signature")
+ try:
+ signature = base64.b64decode(sig_b64)
+ except Exception:
+ return Err("malformed signature (not base64)")
+
+ unsigned = {k: v for k, v in feed.items() if k != "signature"}
+ if not verify_payload(canonical_json(unsigned), signature, expected_pubkey_pem):
+ return Err("signature verification failed")
+
+ peer_fp = feed.get("fingerprint", "")
+ if not peer_fp:
+ return Err("missing fingerprint")
+ if peer_fp == node_fingerprint():
+ return Err("loop: own feed")
+
+ # Cross-check the declared fingerprint matches the pubkey we verified with.
+ try:
+ if _fingerprint_for_pubkey_pem(expected_pubkey_pem) != peer_fp:
+ return Err("fingerprint does not match provided pubkey")
+ except Exception as exc:
+ return Err(f"bad pubkey: {exc}")
+
+ now = datetime.now(timezone.utc).isoformat()
+ signal_ids: List[Tuple[str, str]] = []
+ cases = feed.get("cases") or []
+ iocs = feed.get("iocs") or []
+
+ for c in cases:
+ case_id = c.get("case_id") or ""
+ digest = c.get("digest_sha256") or hashlib.sha256(canonical_json(c)).hexdigest()
+ db.record_signal(dict(
+ peer_fingerprint=peer_fp,
+ signal_type="case",
+ signal_id=case_id,
+ signal_hash=digest,
+ received_at=now,
+ raw_json=json.dumps(c, sort_keys=True),
+ ))
+ signal_ids.append(("case", digest))
+
+ for i in iocs:
+ value = i.get("value") or ""
+ digest = i.get("digest_sha256") or hashlib.sha256(canonical_json(i)).hexdigest()
+ db.record_signal(dict(
+ peer_fingerprint=peer_fp,
+ signal_type="ioc",
+ signal_id=value,
+ signal_hash=digest,
+ received_at=now,
+ raw_json=json.dumps(i, sort_keys=True),
+ ))
+ signal_ids.append(("ioc", digest))
+
+ _log.info("federation.import.ok", peer=peer_fp, cases=len(cases), iocs=len(iocs))
+ return Ok(ImportSummary(
+ peer_fingerprint=peer_fp,
+ cases_seen=len(cases),
+ iocs_seen=len(iocs),
+ signal_ids=signal_ids,
+ ))
+
+
+# ---------- peer registry ------------------------------------------------
+
+class Peer(BaseModel):
+ domain: str
+ fingerprint: str
+ pubkey_pem: str
+ status: str = "unknown" # unknown | trusted | blocked
+ discovered_at: str
+ last_seen: Optional[str] = None
+ notes: Optional[str] = None
+
+
+def _row_to_peer(row: Dict[str, Any]) -> Peer:
+ return Peer(
+ domain=row["domain"],
+ fingerprint=row["fingerprint"],
+ pubkey_pem=row["pubkey_pem"],
+ status=row.get("status") or "unknown",
+ discovered_at=row.get("discovered_at") or "",
+ last_seen=row.get("last_seen"),
+ notes=row.get("notes"),
+ )
+
+
+def register_peer(domain: str, fingerprint: str, pubkey_pem: str, status: str = "unknown") -> None:
+ """Insert or update a peer in the registry. Idempotent on `domain`."""
+ now = datetime.now(timezone.utc).isoformat()
+ existing = db.get_peer(domain)
+ discovered_at = existing["discovered_at"] if existing else now
+ db.upsert_peer(dict(
+ domain=domain,
+ fingerprint=fingerprint,
+ pubkey_pem=pubkey_pem,
+ status=status,
+ discovered_at=discovered_at,
+ last_seen=now,
+ notes=existing.get("notes") if existing else None,
+ ))
+
+
+def list_peers() -> List[Peer]:
+ return [_row_to_peer(r) for r in db.list_peers()]
+
+
+def get_peer(domain: str) -> Optional[Peer]:
+ row = db.get_peer(domain)
+ return _row_to_peer(row) if row else None
+
+
+def set_peer_status(domain: str, status: str) -> None:
+ if status not in ("unknown", "trusted", "blocked"):
+ raise ValueError(f"unknown peer status: {status}")
+ db.set_peer_status(domain, status)
+
+
+def remove_peer(domain: str) -> None:
+ db.remove_peer(domain)
diff --git a/tests/test_federation.py b/tests/test_federation.py
new file mode 100644
index 0000000..3a0c599
--- /dev/null
+++ b/tests/test_federation.py
@@ -0,0 +1,230 @@
+"""Federation — identity, signed feed, peer registry, signal buffer."""
+
+from __future__ import annotations
+
+import base64
+import os
+import stat
+
+import pytest
+from sqlalchemy import create_engine
+
+from psyc import db
+from psyc.lines import federation
+from psyc.lines.federation import (
+ DNSRecord,
+ build_signed_feed,
+ canonical_json,
+ dns_record,
+ import_signed_feed,
+ node_fingerprint,
+ node_keypair,
+ public_key_pem,
+ sign_payload,
+ verify_payload,
+)
+from psyc.result import Err, Ok
+from conftest import make_case
+
+
+@pytest.fixture
+def fresh_db(tmp_path, monkeypatch):
+ test_db = tmp_path / "test.db"
+ eng = create_engine(f"sqlite:///{test_db}", future=True)
+ db._metadata.create_all(eng, checkfirst=True)
+ monkeypatch.setattr(db, "_engine", eng)
+ monkeypatch.setattr(db, "DB_PATH", test_db)
+ yield test_db
+
+
+@pytest.fixture
+def fed_dir(tmp_path, monkeypatch):
+ """Redirect federation key paths to a tmp dir so each test gets a fresh key."""
+ d = tmp_path / "federation"
+ monkeypatch.setattr(federation, "FED_DIR", d)
+ monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
+ monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
+ yield d
+
+
+def test_keypair_persisted_with_correct_perms(fed_dir):
+ priv, pub = node_keypair()
+ assert federation.PRIVATE_KEY_PATH.exists()
+ assert federation.PUBLIC_KEY_PATH.exists()
+ mode = stat.S_IMODE(os.stat(federation.PRIVATE_KEY_PATH).st_mode)
+ assert mode == 0o600
+
+
+def test_keypair_idempotent_across_calls(fed_dir):
+ priv1, pub1 = node_keypair()
+ priv2, pub2 = node_keypair()
+ # raw bytes match — same key loaded twice, not regenerated
+ raw1 = pub1.public_bytes(
+ encoding=federation.serialization.Encoding.Raw,
+ format=federation.serialization.PublicFormat.Raw,
+ )
+ raw2 = pub2.public_bytes(
+ encoding=federation.serialization.Encoding.Raw,
+ format=federation.serialization.PublicFormat.Raw,
+ )
+ assert raw1 == raw2
+
+
+def test_fingerprint_is_stable_and_32_hex(fed_dir):
+ fp1 = node_fingerprint()
+ fp2 = node_fingerprint()
+ assert fp1 == fp2
+ assert len(fp1) == 32
+ assert all(c in "0123456789abcdef" for c in fp1)
+
+
+def test_sign_verify_roundtrip(fed_dir):
+ payload = b"hello federation"
+ sig = sign_payload(payload)
+ assert verify_payload(payload, sig, public_key_pem()) is True
+
+
+def test_verify_with_wrong_key_returns_false(fed_dir, tmp_path):
+ payload = b"the truth"
+ sig = sign_payload(payload)
+
+ # Build a *different* keypair in a separate directory and use its pubkey.
+ other = tmp_path / "other-federation"
+ other.mkdir()
+ from cryptography.hazmat.primitives import serialization
+ from cryptography.hazmat.primitives.asymmetric import ed25519
+ other_priv = ed25519.Ed25519PrivateKey.generate()
+ other_pub_pem = other_priv.public_key().public_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PublicFormat.SubjectPublicKeyInfo,
+ ).decode("ascii")
+
+ assert verify_payload(payload, sig, other_pub_pem) is False
+
+
+def test_verify_with_garbage_pubkey_returns_false_no_raise(fed_dir):
+ sig = sign_payload(b"x")
+ assert verify_payload(b"x", sig, "not a pem") is False
+
+
+def test_canonical_json_is_deterministic():
+ a = canonical_json({"b": 1, "a": 2, "nested": {"y": 1, "x": 2}})
+ b = canonical_json({"a": 2, "b": 1, "nested": {"x": 2, "y": 1}})
+ assert a == b
+
+
+def test_dns_record_txt_value_matches_spec(fed_dir):
+ rec = dns_record("example.com")
+ assert isinstance(rec, DNSRecord)
+ fp = node_fingerprint()
+ assert rec.srv_name == "_psyc._tcp.example.com"
+ assert rec.srv_target == "example.com."
+ assert rec.srv_port == 443
+ assert rec.txt_value == f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed"
+ # human instructions include both record lines
+ assert "_psyc._tcp.example.com" in rec.human_instructions
+ assert "SRV" in rec.human_instructions
+ assert "TXT" in rec.human_instructions
+
+
+def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir):
+ case = make_case(feed="urlhaus", ips=["1.1.1.1"], urls=["http://1.1.1.1/x"])
+ db.upsert_case(case)
+ feed = build_signed_feed(window_hours=24)
+ assert feed["version"] == "psyc1"
+ assert feed["fingerprint"] == node_fingerprint()
+ assert feed["signature"]
+ # cases entry made it in
+ assert any(c["case_id"] == case.case_id for c in feed["cases"])
+
+ # Import using our own pubkey against a *different* declared fingerprint:
+ # swap the fingerprint so import_signed_feed doesn't reject as a loop.
+ pub = public_key_pem()
+ # Use a fresh keypair to act as "peer" — sign a feed with that key.
+ from cryptography.hazmat.primitives import serialization
+ from cryptography.hazmat.primitives.asymmetric import ed25519
+ import hashlib
+ peer_priv = ed25519.Ed25519PrivateKey.generate()
+ peer_pub_pem = peer_priv.public_key().public_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PublicFormat.SubjectPublicKeyInfo,
+ ).decode("ascii")
+ peer_raw = peer_priv.public_key().public_bytes(
+ encoding=serialization.Encoding.Raw,
+ format=serialization.PublicFormat.Raw,
+ )
+ peer_fp = hashlib.sha256(peer_raw).digest()[:16].hex()
+ feed["fingerprint"] = peer_fp
+ unsigned = {k: v for k, v in feed.items() if k != "signature"}
+ new_sig = peer_priv.sign(canonical_json(unsigned))
+ feed["signature"] = base64.b64encode(new_sig).decode("ascii")
+
+ result = import_signed_feed(feed, peer_pub_pem)
+ assert isinstance(result, Ok), getattr(result, "reason", "")
+ summary = result.value
+ assert summary.peer_fingerprint == peer_fp
+ assert summary.cases_seen >= 1
+
+
+def test_import_with_wrong_pubkey_returns_err(fresh_db, fed_dir):
+ db.upsert_case(make_case(feed="urlhaus", ips=["2.2.2.2"]))
+ feed = build_signed_feed(window_hours=24)
+ # build a *different* pubkey to claim verification against
+ from cryptography.hazmat.primitives import serialization
+ from cryptography.hazmat.primitives.asymmetric import ed25519
+ other_pub_pem = ed25519.Ed25519PrivateKey.generate().public_key().public_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PublicFormat.SubjectPublicKeyInfo,
+ ).decode("ascii")
+ # also need to change fingerprint so the loop-check doesn't trigger first
+ feed["fingerprint"] = "deadbeef" * 4
+ result = import_signed_feed(feed, other_pub_pem)
+ assert isinstance(result, Err)
+
+
+def test_import_own_feed_returns_loop_err(fresh_db, fed_dir):
+ db.upsert_case(make_case(feed="urlhaus", ips=["3.3.3.3"]))
+ feed = build_signed_feed(window_hours=24)
+ result = import_signed_feed(feed, public_key_pem())
+ assert isinstance(result, Err)
+ assert "loop" in result.reason
+
+
+def test_record_signal_then_lookup_by_hash(fresh_db):
+ rid = db.record_signal(dict(
+ peer_fingerprint="abc123",
+ signal_type="ioc",
+ signal_id="1.2.3.4",
+ signal_hash="hash-aaa",
+ received_at="2026-01-01T00:00:00+00:00",
+ raw_json="{}",
+ ))
+ assert rid > 0
+ rows = db.signals_for_hash("hash-aaa")
+ assert len(rows) == 1
+ assert rows[0]["peer_fingerprint"] == "abc123"
+ # second peer reports the same hash → both surface for quorum check
+ db.record_signal(dict(
+ peer_fingerprint="def456",
+ signal_type="ioc",
+ signal_id="1.2.3.4",
+ signal_hash="hash-aaa",
+ received_at="2026-01-01T00:01:00+00:00",
+ raw_json="{}",
+ ))
+ rows = db.signals_for_hash("hash-aaa")
+ assert {r["peer_fingerprint"] for r in rows} == {"abc123", "def456"}
+
+
+def test_peer_registry_crud(fresh_db, fed_dir):
+ federation.register_peer("peer.example", "ff" * 16, "PEM", status="trusted")
+ peers = federation.list_peers()
+ assert len(peers) == 1
+ assert peers[0].domain == "peer.example"
+ assert peers[0].status == "trusted"
+
+ federation.set_peer_status("peer.example", "blocked")
+ assert federation.get_peer("peer.example").status == "blocked"
+
+ federation.remove_peer("peer.example")
+ assert federation.list_peers() == []