From 4c35aad2bb6be2ae0797ba544bfe0ebfa62162f9 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 16:08:03 +0200 Subject: [PATCH 1/7] stage-fed-a federation: ed25519 keypair + fingerprint --- pyproject.toml | 1 + src/psyc/lines/federation.py | 95 ++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 src/psyc/lines/federation.py diff --git a/pyproject.toml b/pyproject.toml index 74b3cd4..6a4f932 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "httpx>=0.27", "typer>=0.12", "pynacl>=1.5", + "cryptography>=42.0", "structlog>=24.1", "sqlalchemy>=2.0", "python-dotenv>=1.0", diff --git a/src/psyc/lines/federation.py b/src/psyc/lines/federation.py new file mode 100644 index 0000000..28c6d9f --- /dev/null +++ b/src/psyc/lines/federation.py @@ -0,0 +1,95 @@ +"""Federation — node identity, signed feeds, peer registry. + +Identity layer for internet-wide federation of psyc nodes. Each node owns +an Ed25519 keypair persisted under DATA_DIR/federation/. The public key +fingerprint (first 16 bytes of SHA256(raw_pubkey) hex-encoded) goes into a +DNS TXT record so peers can discover and authenticate the node, and the +private key signs the outbound feed at /federation/feed. + +This module is the *identity* primitives only — discovery walkers, +vouching/quorum, transparency log and auto-pull live in later stages. +""" + +from __future__ import annotations + +import hashlib +import os +from typing import Tuple + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ed25519 + +from psyc import DATA_DIR, log + + +_log = log.get(__name__) + +FED_DIR = DATA_DIR / "federation" +PRIVATE_KEY_PATH = FED_DIR / "node.key" +PUBLIC_KEY_PATH = FED_DIR / "node.pub" + + +# ---------- keypair persistence ----------------------------------------- + +def _ensure_dir() -> None: + FED_DIR.mkdir(parents=True, exist_ok=True) + + +def node_keypair() -> Tuple[ed25519.Ed25519PrivateKey, ed25519.Ed25519PublicKey]: + """Return the node's Ed25519 keypair, generating + persisting it on first call. + + Private key lands at data/federation/node.key (PEM, chmod 0600); public + at data/federation/node.pub (PEM). Idempotent — subsequent calls load + the existing files instead of generating new ones. + """ + _ensure_dir() + if PRIVATE_KEY_PATH.exists() and PUBLIC_KEY_PATH.exists(): + priv_pem = PRIVATE_KEY_PATH.read_bytes() + priv = serialization.load_pem_private_key(priv_pem, password=None) + if not isinstance(priv, ed25519.Ed25519PrivateKey): + raise RuntimeError(f"federation key at {PRIVATE_KEY_PATH} is not Ed25519") + return priv, priv.public_key() + + priv = ed25519.Ed25519PrivateKey.generate() + priv_pem = priv.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + pub = priv.public_key() + pub_pem = pub.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + PRIVATE_KEY_PATH.write_bytes(priv_pem) + os.chmod(PRIVATE_KEY_PATH, 0o600) + PUBLIC_KEY_PATH.write_bytes(pub_pem) + _log.info("federation.keypair.generated", path=str(PRIVATE_KEY_PATH)) + return priv, pub + + +def public_key_pem() -> str: + """PEM-encoded public key as text — what peers store + verify against.""" + _, pub = node_keypair() + return pub.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("ascii") + + +def _raw_pubkey_bytes(pub: ed25519.Ed25519PublicKey) -> bytes: + return pub.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + + +def node_fingerprint() -> str: + """Short stable id for the node — first 16 bytes of SHA256(raw_pubkey), hex. + + Lives in DNS TXT records; 32 hex chars is short enough to fit but long + enough to be collision-safe for any plausible peer population. + """ + _, pub = node_keypair() + digest = hashlib.sha256(_raw_pubkey_bytes(pub)).digest() + return digest[:16].hex() From 50158f7fa8d2cc8d78fb769dbf75efb0adaa295c Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 16:08:31 +0200 Subject: [PATCH 2/7] stage-fed-b federation: dns record format --- src/psyc/lines/federation.py | 58 ++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/src/psyc/lines/federation.py b/src/psyc/lines/federation.py index 28c6d9f..b031c1c 100644 --- a/src/psyc/lines/federation.py +++ b/src/psyc/lines/federation.py @@ -18,6 +18,7 @@ from typing import Tuple from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ed25519 +from pydantic import BaseModel from psyc import DATA_DIR, log @@ -28,6 +29,10 @@ FED_DIR = DATA_DIR / "federation" PRIVATE_KEY_PATH = FED_DIR / "node.key" PUBLIC_KEY_PATH = FED_DIR / "node.pub" +FEED_VERSION = "psyc1" +FEED_ALG = "ed25519" +FEED_PATH = "/federation/feed" + # ---------- keypair persistence ----------------------------------------- @@ -93,3 +98,56 @@ def node_fingerprint() -> str: _, pub = node_keypair() digest = hashlib.sha256(_raw_pubkey_bytes(pub)).digest() return digest[:16].hex() + + +def _fingerprint_for_pubkey_pem(pubkey_pem: str) -> str: + pub = serialization.load_pem_public_key(pubkey_pem.encode("ascii")) + if not isinstance(pub, ed25519.Ed25519PublicKey): + raise ValueError("not an Ed25519 public key") + return hashlib.sha256(_raw_pubkey_bytes(pub)).digest()[:16].hex() + + +# ---------- DNS record format ------------------------------------------- + +class DNSRecord(BaseModel): + """The SRV + TXT pair an admin pastes into their zone file.""" + srv_name: str + srv_target: str + srv_port: int + srv_priority: int = 10 + srv_weight: int = 10 + txt_name: str + txt_value: str + human_instructions: str + + +def dns_record(domain: str, port: int = 443) -> DNSRecord: + """Build the DNS-SD-style records that advertise this node at `domain`.""" + fp = node_fingerprint() + srv_name = f"_psyc._tcp.{domain}" + srv_target = f"{domain}." + txt_name = srv_name + txt_value = f"v={FEED_VERSION} fp={fp} alg={FEED_ALG} path={FEED_PATH}" + instructions = ( + f"; psyc federation records for {domain}\n" + f"; ----------------------------------------------------------\n" + f"; 1) SRV record — locates this psyc node (host + port).\n" + f'{srv_name}. 3600 IN SRV 10 10 {port} {srv_target}\n' + f";\n" + f"; 2) TXT record — declares protocol version, key fingerprint,\n" + f"; signature algorithm, and the feed endpoint path.\n" + f'{txt_name}. 3600 IN TXT "{txt_value}"\n' + f"; ----------------------------------------------------------\n" + f"; Once these are live, federation peers can fetch:\n" + f"; https://{domain}{FEED_PATH} (signed feed JSON)\n" + f"; https://{domain}/federation/key (public key PEM)\n" + f"; https://{domain}/federation/info (capabilities)\n" + ) + return DNSRecord( + srv_name=srv_name, + srv_target=srv_target, + srv_port=port, + txt_name=txt_name, + txt_value=txt_value, + human_instructions=instructions, + ) From 63e3ff2777d21cc3b5367a20ea11c0fe15e96bc1 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 16:08:36 +0200 Subject: [PATCH 3/7] stage-fed-c federation: db tables for peers + signal buffer --- src/psyc/db.py | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/src/psyc/db.py b/src/psyc/db.py index 1b12436..0f7506d 100644 --- a/src/psyc/db.py +++ b/src/psyc/db.py @@ -114,6 +114,33 @@ Index("iocs_value_idx", iocs.c.value) Index("iocs_type_idx", iocs.c.ioc_type) Index("iocs_case_idx", iocs.c.case_id) +peers = Table( + "peers", _metadata, + Column("domain", String, primary_key=True), + Column("fingerprint", String, nullable=False), + Column("pubkey_pem", Text, nullable=False), + Column("status", String, nullable=False), # unknown | trusted | blocked + Column("discovered_at", String, nullable=False), + Column("last_seen", String, nullable=True), + Column("notes", Text, nullable=True), +) +Index("peers_fp_idx", peers.c.fingerprint) +Index("peers_status_idx", peers.c.status) + +federation_signals = Table( + "federation_signals", _metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("peer_fingerprint", String, nullable=False), + Column("signal_type", String, nullable=False), # case | ioc + Column("signal_id", String, nullable=False), # case_id or ioc value + Column("signal_hash", String, nullable=False), # sha256 of canonical record + Column("received_at", String, nullable=False), + Column("raw_json", Text, nullable=False), +) +Index("federation_signals_hash_idx", federation_signals.c.signal_hash) +Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint) +Index("federation_signals_received_idx", federation_signals.c.received_at.desc()) + _log = log.get(__name__) _engine: Optional[Engine] = None @@ -214,3 +241,61 @@ def ioc_count(db_path: Path = DB_PATH) -> int: stmt = select(func.count()).select_from(iocs) with engine(db_path).connect() as conn: return conn.execute(stmt).scalar_one() + + +# ---------- federation: peers + signal buffer ---------------------------- + +def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None: + """Insert-or-update a peer by domain. `row` must include `domain`.""" + stmt = sqlite_insert(peers).values(**row) + update_cols = {k: stmt.excluded[k] for k in row if k != "domain"} + stmt = stmt.on_conflict_do_update(index_elements=[peers.c.domain], set_=update_cols) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +def list_peers(db_path: Path = DB_PATH) -> List[dict]: + stmt = select(peers).order_by(peers.c.discovered_at.desc()) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def get_peer(domain: str, db_path: Path = DB_PATH) -> Optional[dict]: + stmt = select(peers).where(peers.c.domain == domain) + with engine(db_path).connect() as conn: + row = conn.execute(stmt).fetchone() + return dict(row._mapping) if row else None + + +def set_peer_status(domain: str, status: str, db_path: Path = DB_PATH) -> None: + from sqlalchemy import update as sa_update + stmt = sa_update(peers).where(peers.c.domain == domain).values(status=status) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +def remove_peer(domain: str, db_path: Path = DB_PATH) -> None: + stmt = peers.delete().where(peers.c.domain == domain) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +def record_signal(row: dict, db_path: Path = DB_PATH) -> int: + """Append one federation signal. Returns the inserted row id.""" + stmt = insert(federation_signals).values(**row) + with engine(db_path).begin() as conn: + res = conn.execute(stmt) + return int(res.inserted_primary_key[0]) + + +def signals_for_hash(signal_hash: str, db_path: Path = DB_PATH) -> List[dict]: + """All recorded signals matching `signal_hash` — quorum-lookup primitive.""" + stmt = select(federation_signals).where(federation_signals.c.signal_hash == signal_hash) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]: + stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] From 55ffd9da3d61090c999c7fdc777de1087701a0ac Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 16:09:53 +0200 Subject: [PATCH 4/7] stage-fed-d federation: signed feed export + verified import --- src/psyc/lines/federation.py | 258 ++++++++++++++++++++++++++++++++++- 1 file changed, 255 insertions(+), 3 deletions(-) diff --git a/src/psyc/lines/federation.py b/src/psyc/lines/federation.py index b031c1c..bed46fb 100644 --- a/src/psyc/lines/federation.py +++ b/src/psyc/lines/federation.py @@ -12,15 +12,19 @@ vouching/quorum, transparency log and auto-pull live in later stages. from __future__ import annotations +import base64 import hashlib +import json import os -from typing import Tuple +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional, Tuple from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ed25519 -from pydantic import BaseModel +from pydantic import BaseModel, Field -from psyc import DATA_DIR, log +from psyc import DATA_DIR, db, log +from psyc.result import Err, Ok, Result _log = log.get(__name__) @@ -151,3 +155,251 @@ def dns_record(domain: str, port: int = 443) -> DNSRecord: txt_value=txt_value, human_instructions=instructions, ) + + +# ---------- signing ----------------------------------------------------- + +def canonical_json(obj: Dict[str, Any]) -> bytes: + """Deterministic JSON serialization — what we sign + hash over.""" + return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + + +def sign_payload(payload: bytes) -> bytes: + """Ed25519 signature over `payload`. Raw 64-byte sig.""" + priv, _ = node_keypair() + return priv.sign(payload) + + +def verify_payload(payload: bytes, signature: bytes, pubkey_pem: str) -> bool: + """True iff `signature` verifies under `pubkey_pem`. Never raises.""" + try: + pub = serialization.load_pem_public_key(pubkey_pem.encode("ascii")) + if not isinstance(pub, ed25519.Ed25519PublicKey): + return False + pub.verify(signature, payload) + return True + except Exception: + return False + + +# ---------- feed export ------------------------------------------------- + +def _case_digest(case_record: Dict[str, Any]) -> str: + return hashlib.sha256(canonical_json(case_record)).hexdigest() + + +def _build_case_records(window_hours: int) -> List[Dict[str, Any]]: + cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours) + out: List[Dict[str, Any]] = [] + for case in db.list_cases(limit=10_000): + if case.ingested_at < cutoff: + continue + record: Dict[str, Any] = { + "case_id": case.case_id, + "summary": case.summary, + "severity": case.classification.severity.value if case.classification.severity else None, + "incident_type": case.classification.incident_type.value if case.classification.incident_type else None, + "observed_at": case.observed_at.isoformat(), + "feed_source": case.source_metadata.get("feed", ""), + "iocs": ( + [{"value": v, "type": "url"} for v in case.observables.urls] + + [{"value": v, "type": "domain"} for v in case.observables.domains] + + [{"value": v, "type": "ip"} for v in case.observables.ips] + + [{"value": v, "type": "hash"} for v in case.observables.hashes] + + [{"value": v, "type": "cve"} for v in case.observables.cves] + ), + } + record["digest_sha256"] = _case_digest( + {k: v for k, v in record.items() if k != "digest_sha256"} + ) + out.append(record) + return out + + +def _build_ioc_records(window_hours: int) -> List[Dict[str, Any]]: + cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours) + out: List[Dict[str, Any]] = [] + seen: set = set() + for ioc_type in ("url", "domain", "ip", "hash", "cve"): + for row in db.iocs_by_type(ioc_type): + first_seen = row.get("first_seen") + if first_seen: + try: + if datetime.fromisoformat(first_seen) < cutoff: + continue + except ValueError: + pass + key = (row["value"], row["ioc_type"]) + if key in seen: + continue + seen.add(key) + record = { + "value": row["value"], + "type": row["ioc_type"], + "severity": row.get("severity"), + "first_seen": first_seen, + } + record["digest_sha256"] = hashlib.sha256(canonical_json(record)).hexdigest() + out.append(record) + return out + + +def build_signed_feed(window_hours: int = 24) -> Dict[str, Any]: + """Build the JSON feed peers will pull from /federation/feed. + + Pulls cases ingested in the last `window_hours` plus the corresponding + IOC slice, attaches per-record `digest_sha256` (so peers can later + quorum-match across nodes), and signs the canonical JSON of the whole + payload-minus-signature with our Ed25519 key. + """ + payload: Dict[str, Any] = { + "version": FEED_VERSION, + "fingerprint": node_fingerprint(), + "generated_at": datetime.now(timezone.utc).isoformat(), + "window_hours": window_hours, + "cases": _build_case_records(window_hours), + "iocs": _build_ioc_records(window_hours), + } + sig = sign_payload(canonical_json(payload)) + payload["signature"] = base64.b64encode(sig).decode("ascii") + return payload + + +# ---------- import + quorum-signal buffer ------------------------------- + +class ImportSummary(BaseModel): + peer_fingerprint: str + cases_seen: int + iocs_seen: int + signal_ids: List[Tuple[str, str]] = Field(default_factory=list) + + +def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result[ImportSummary, str]: + """Verify + record a peer's feed into the federation_signals buffer. + + Does NOT merge into the local case store — that's the quorum stage's + job. The buffer is the per-hash signal log that quorum logic later + aggregates ("3 trusted peers reported this same IOC → promote"). + """ + sig_b64 = feed.get("signature") + if not sig_b64: + return Err("missing signature") + try: + signature = base64.b64decode(sig_b64) + except Exception: + return Err("malformed signature (not base64)") + + unsigned = {k: v for k, v in feed.items() if k != "signature"} + if not verify_payload(canonical_json(unsigned), signature, expected_pubkey_pem): + return Err("signature verification failed") + + peer_fp = feed.get("fingerprint", "") + if not peer_fp: + return Err("missing fingerprint") + if peer_fp == node_fingerprint(): + return Err("loop: own feed") + + # Cross-check the declared fingerprint matches the pubkey we verified with. + try: + if _fingerprint_for_pubkey_pem(expected_pubkey_pem) != peer_fp: + return Err("fingerprint does not match provided pubkey") + except Exception as exc: + return Err(f"bad pubkey: {exc}") + + now = datetime.now(timezone.utc).isoformat() + signal_ids: List[Tuple[str, str]] = [] + cases = feed.get("cases") or [] + iocs = feed.get("iocs") or [] + + for c in cases: + case_id = c.get("case_id") or "" + digest = c.get("digest_sha256") or hashlib.sha256(canonical_json(c)).hexdigest() + db.record_signal(dict( + peer_fingerprint=peer_fp, + signal_type="case", + signal_id=case_id, + signal_hash=digest, + received_at=now, + raw_json=json.dumps(c, sort_keys=True), + )) + signal_ids.append(("case", digest)) + + for i in iocs: + value = i.get("value") or "" + digest = i.get("digest_sha256") or hashlib.sha256(canonical_json(i)).hexdigest() + db.record_signal(dict( + peer_fingerprint=peer_fp, + signal_type="ioc", + signal_id=value, + signal_hash=digest, + received_at=now, + raw_json=json.dumps(i, sort_keys=True), + )) + signal_ids.append(("ioc", digest)) + + _log.info("federation.import.ok", peer=peer_fp, cases=len(cases), iocs=len(iocs)) + return Ok(ImportSummary( + peer_fingerprint=peer_fp, + cases_seen=len(cases), + iocs_seen=len(iocs), + signal_ids=signal_ids, + )) + + +# ---------- peer registry ------------------------------------------------ + +class Peer(BaseModel): + domain: str + fingerprint: str + pubkey_pem: str + status: str = "unknown" # unknown | trusted | blocked + discovered_at: str + last_seen: Optional[str] = None + notes: Optional[str] = None + + +def _row_to_peer(row: Dict[str, Any]) -> Peer: + return Peer( + domain=row["domain"], + fingerprint=row["fingerprint"], + pubkey_pem=row["pubkey_pem"], + status=row.get("status") or "unknown", + discovered_at=row.get("discovered_at") or "", + last_seen=row.get("last_seen"), + notes=row.get("notes"), + ) + + +def register_peer(domain: str, fingerprint: str, pubkey_pem: str, status: str = "unknown") -> None: + """Insert or update a peer in the registry. Idempotent on `domain`.""" + now = datetime.now(timezone.utc).isoformat() + existing = db.get_peer(domain) + discovered_at = existing["discovered_at"] if existing else now + db.upsert_peer(dict( + domain=domain, + fingerprint=fingerprint, + pubkey_pem=pubkey_pem, + status=status, + discovered_at=discovered_at, + last_seen=now, + notes=existing.get("notes") if existing else None, + )) + + +def list_peers() -> List[Peer]: + return [_row_to_peer(r) for r in db.list_peers()] + + +def get_peer(domain: str) -> Optional[Peer]: + row = db.get_peer(domain) + return _row_to_peer(row) if row else None + + +def set_peer_status(domain: str, status: str) -> None: + if status not in ("unknown", "trusted", "blocked"): + raise ValueError(f"unknown peer status: {status}") + db.set_peer_status(domain, status) + + +def remove_peer(domain: str) -> None: + db.remove_peer(domain) From 17b94acf6bca965654acd83f59c9c7a0da3d90be Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 16:10:19 +0200 Subject: [PATCH 5/7] stage-fed-e federation: cockpit admin page + public feed routes --- src/psyc/cockpit/federation_routes.py | 125 +++++++++++++++++ .../cockpit/templates/admin_federation.html | 132 ++++++++++++++++++ 2 files changed, 257 insertions(+) create mode 100644 src/psyc/cockpit/federation_routes.py create mode 100644 src/psyc/cockpit/templates/admin_federation.html diff --git a/src/psyc/cockpit/federation_routes.py b/src/psyc/cockpit/federation_routes.py new file mode 100644 index 0000000..d402f38 --- /dev/null +++ b/src/psyc/cockpit/federation_routes.py @@ -0,0 +1,125 @@ +"""Federation cockpit routes — admin page, public feed/key/info endpoints. + +Wired into the FastAPI app by app.py via a single `register(app, TEMPLATES)` +call so the federation surface stays self-contained. +""" + +from __future__ import annotations + +import json +import time +from typing import Any, Dict, Optional, Tuple + +from fastapi import FastAPI, Form, HTTPException, Request +from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, RedirectResponse +from fastapi.templating import Jinja2Templates + +from psyc import db, log +from psyc.lines import federation + + +_log = log.get(__name__) + +# Tiny in-memory cache for the signed feed — peers may poll, recomputing +# canonical JSON + signature on every hit would be wasteful. +_FEED_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None} +_FEED_TTL = 60.0 + + +def _admin_ok(request: Request) -> bool: + return bool(request.session.get("admin_ok")) + + +def _cached_feed() -> Dict[str, Any]: + now = time.time() + if _FEED_CACHE["payload"] is None or (now - _FEED_CACHE["ts"]) > _FEED_TTL: + _FEED_CACHE["payload"] = federation.build_signed_feed() + _FEED_CACHE["ts"] = now + return _FEED_CACHE["payload"] + + +def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None: + """Mount all federation routes onto `app`.""" + + @app.get("/admin/federation", response_class=HTMLResponse) + def admin_federation(request: Request) -> HTMLResponse: + if not _admin_ok(request): + return RedirectResponse("/admin", status_code=303) + host = request.url.hostname or "your-node.example" + suggested = request.query_params.get("domain", host) + rec = federation.dns_record(suggested) + peers = federation.list_peers() + signals = db.recent_signals(limit=20) + return TEMPLATES.TemplateResponse( + request, + "admin_federation.html", + { + "fingerprint": federation.node_fingerprint(), + "pubkey_pem": federation.public_key_pem(), + "suggested_domain": suggested, + "dns": rec, + "peers": peers, + "signals": signals, + }, + ) + + @app.post("/admin/federation/peers/add") + def admin_federation_add_peer( + request: Request, + domain: str = Form(...), + fingerprint: str = Form(...), + pubkey_pem: str = Form(...), + status: str = Form("unknown"), + ) -> RedirectResponse: + if not _admin_ok(request): + raise HTTPException(status_code=403, detail="admin session required") + try: + federation.register_peer(domain.strip(), fingerprint.strip(), pubkey_pem.strip(), status=status) + except Exception as exc: + _log.warning("federation.peer.add.error", domain=domain, error=str(exc)) + return RedirectResponse("/admin/federation", status_code=303) + + @app.post("/admin/federation/peers/{domain}/status") + def admin_federation_set_status( + request: Request, + domain: str, + status: str = Form(...), + ) -> RedirectResponse: + if not _admin_ok(request): + raise HTTPException(status_code=403, detail="admin session required") + try: + federation.set_peer_status(domain, status) + except ValueError as exc: + _log.warning("federation.peer.status.bad", domain=domain, status=status, error=str(exc)) + return RedirectResponse("/admin/federation", status_code=303) + + @app.post("/admin/federation/peers/{domain}/remove") + def admin_federation_remove( + request: Request, + domain: str, + ) -> RedirectResponse: + if not _admin_ok(request): + raise HTTPException(status_code=403, detail="admin session required") + federation.remove_peer(domain) + return RedirectResponse("/admin/federation", status_code=303) + + # ---------- public endpoints -------------------------------------- + + @app.get("/federation/info") + def federation_info() -> JSONResponse: + return JSONResponse({ + "fingerprint": federation.node_fingerprint(), + "version": federation.FEED_VERSION, + "feed": federation.FEED_PATH, + "key": "/federation/key", + }) + + @app.get("/federation/key", response_class=PlainTextResponse) + def federation_key() -> PlainTextResponse: + return PlainTextResponse(federation.public_key_pem(), media_type="text/plain") + + @app.get("/federation/feed") + def federation_feed() -> JSONResponse: + return JSONResponse(_cached_feed()) + + _log.info("federation.routes.registered") diff --git a/src/psyc/cockpit/templates/admin_federation.html b/src/psyc/cockpit/templates/admin_federation.html new file mode 100644 index 0000000..b9493bd --- /dev/null +++ b/src/psyc/cockpit/templates/admin_federation.html @@ -0,0 +1,132 @@ +{% extends "base.html" %} +{% block title %}Federation — psyc admin{% endblock %} +{% block content %} + +
+
+

Federation Identity

+ {{ peers|length }} peer{{ '' if peers|length == 1 else 's' }} +
+

This node's Ed25519 identity. The fingerprint goes into a DNS TXT record so other psyc nodes can discover this one. The public key lets them verify any feed we publish — the private key never leaves this box.

+

← back to admin

+ +
+
node fingerprint
+
{{ fingerprint }}
+
+ public key (PEM) +
{{ pubkey_pem }}
+
+
+
+ +
+
+

Publish via DNS

+ SRV + TXT records +
+

Paste these into your zone file. Once they're live, any peer that knows your domain can discover the node and pin the right key without out-of-band coordination.

+ +
+ + +
+ +
{{ dns.human_instructions }}
+
+ +
+
+

Known Peers

+ {{ peers|length }} registered +
+

Trusted peers' feeds are signature-verified on every poll. Blocked peers are recorded but ignored. Unknown peers are kept for review — nothing flows from them until you set them trusted.

+ + {% if peers %} + + + + {% for p in peers %} + + + + + + + + + {% endfor %} + +
DomainFingerprintStatusDiscoveredLast seen
{{ p.domain }}{{ p.fingerprint[:8] }}…{{ p.fingerprint[-8:] }} + {% if p.status == 'trusted' %} + trusted + {% elif p.status == 'blocked' %} + blocked + {% else %} + unknown + {% endif %} + {{ (p.discovered_at or '')[:16] | replace('T', ' ') }}{{ ((p.last_seen or '')[:16] | replace('T', ' ')) or '—' }} +
+ + +
+
+ + +
+
+ +
+
+ {% else %} +

(no peers yet — add one below)

+ {% endif %} +
+ +
+
+

Add Peer

+
+

Pin a peer's identity manually: their domain, their fingerprint (from their DNS TXT record), and the public key they publish at /federation/key.

+
+ + + + + +
+
+ +
+
+

Recent Signals

+ last {{ signals|length }} of buffer +
+

Verified federation signals from peers — case + IOC reports awaiting quorum. The signal buffer is what later quorum logic will count over.

+ + {% if signals %} + + + + {% for s in signals %} + + + + + + + + {% endfor %} + +
ReceivedPeerTypeIdHash
{{ (s.received_at or '')[:19] | replace('T', ' ') }}{{ s.peer_fingerprint[:8] }}…{{ s.signal_type }}{{ s.signal_id[:48] }}{{ s.signal_hash[:16] }}…
+ {% else %} +

(no signals received yet — quorum stage will populate this)

+ {% endif %} +
+ +{% endblock %} From 2ef0448165dbd9f681c67626481d4ab62e4b1333 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 16:10:26 +0200 Subject: [PATCH 6/7] stage-fed-f federation: CLI commands --- src/psyc/_federation_cli.py | 135 ++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 src/psyc/_federation_cli.py diff --git a/src/psyc/_federation_cli.py b/src/psyc/_federation_cli.py new file mode 100644 index 0000000..e01bbe6 --- /dev/null +++ b/src/psyc/_federation_cli.py @@ -0,0 +1,135 @@ +"""Federation CLI — keygen, DNS records, feed export, peer registry, verify. + +Registered onto the top-level Typer app from cli.py so the surface stays flat. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Optional + +import httpx +import typer + +from psyc import db, log +from psyc.lines import federation +from psyc.result import Err + + +_log = log.get(__name__) + + +def register(typer_app: typer.Typer) -> None: + """Mount all `fed-*` commands onto `typer_app`.""" + + @typer_app.command("fed-keygen") + def fed_keygen() -> None: + """Generate the node's Ed25519 keypair (or load existing). Prints fingerprint.""" + federation.node_keypair() # creates the files if missing + typer.echo(federation.node_fingerprint()) + + @typer_app.command("fed-dns") + def fed_dns( + domain: str = typer.Argument(..., help="public domain to advertise this node on"), + port: int = typer.Option(443, "--port", help="port psyc is reachable on"), + ) -> None: + """Print the DNS SRV + TXT records to publish under `domain`.""" + rec = federation.dns_record(domain, port=port) + typer.echo(rec.human_instructions) + + @typer_app.command("fed-feed") + def fed_feed( + window_hours: int = typer.Option(24, "--hours", help="lookback window (hours)"), + ) -> None: + """Build + print the signed feed JSON.""" + db.init_db() + payload = federation.build_signed_feed(window_hours=window_hours) + typer.echo(json.dumps(payload, indent=2)) + + @typer_app.command("fed-verify") + def fed_verify( + peer_url: str = typer.Argument(..., help="peer base URL, e.g. https://peer.example"), + ) -> None: + """Fetch a peer's /federation/{info,key,feed} and verify the signature.""" + peer_url = peer_url.rstrip("/") + try: + with httpx.Client(timeout=10.0) as client: + info = client.get(f"{peer_url}/federation/info").json() + key_text = client.get(f"{peer_url}/federation/key").text + feed = client.get(f"{peer_url}/federation/feed").json() + except Exception as exc: + typer.echo(f"error: fetch failed: {exc}", err=True) + raise typer.Exit(1) + + # If the peer is already in the registry, prefer the stored pubkey + # (TOFU pin); otherwise warn and use the freshly fetched one. + declared_fp = info.get("fingerprint", "") + pubkey_pem = key_text + pinned = None + for p in federation.list_peers(): + if p.fingerprint == declared_fp: + pinned = p + break + if pinned: + pubkey_pem = pinned.pubkey_pem + typer.echo(f" · using pinned pubkey for {pinned.domain}") + else: + typer.echo(" · WARNING: no pinned pubkey for this peer — trusting fetched key (TOFU)") + + db.init_db() + result = federation.import_signed_feed(feed, pubkey_pem) + if isinstance(result, Err): + typer.echo(f" ✗ verification failed: {result.reason}", err=True) + raise typer.Exit(1) + s = result.value + typer.echo(f" ✓ verified peer {s.peer_fingerprint}") + typer.echo(f" cases: {s.cases_seen} iocs: {s.iocs_seen} signals buffered: {len(s.signal_ids)}") + + @typer_app.command("fed-peer-add") + def fed_peer_add( + domain: str = typer.Argument(..., help="peer's public domain"), + fingerprint: str = typer.Argument(..., help="peer's 32-hex fingerprint"), + pubkey_file: Path = typer.Option(..., "--pubkey-file", help="path to peer's PEM public key"), + status: str = typer.Option("unknown", "--status", help="unknown | trusted | blocked"), + ) -> None: + """Register a peer's identity in the local registry.""" + db.init_db() + pem = pubkey_file.read_text(encoding="utf-8") + federation.register_peer(domain, fingerprint, pem, status=status) + typer.echo(f"registered peer {domain} ({fingerprint[:8]}…) status={status}") + + @typer_app.command("fed-peer-list") + def fed_peer_list() -> None: + """List all registered peers.""" + db.init_db() + rows = federation.list_peers() + if not rows: + typer.echo("(no peers registered)") + return + for p in rows: + typer.echo( + f" {p.status:8s} {p.domain:30s} {p.fingerprint[:8]}…{p.fingerprint[-8:]}" + f" last_seen={(p.last_seen or '—')[:16]}" + ) + + @typer_app.command("fed-peer-trust") + def fed_peer_trust(domain: str = typer.Argument(...)) -> None: + """Mark a peer as trusted — their signals count toward quorum.""" + db.init_db() + federation.set_peer_status(domain, "trusted") + typer.echo(f"{domain} → trusted") + + @typer_app.command("fed-peer-block") + def fed_peer_block(domain: str = typer.Argument(...)) -> None: + """Block a peer — ignore their feeds.""" + db.init_db() + federation.set_peer_status(domain, "blocked") + typer.echo(f"{domain} → blocked") + + @typer_app.command("fed-peer-remove") + def fed_peer_remove(domain: str = typer.Argument(...)) -> None: + """Drop a peer from the registry.""" + db.init_db() + federation.remove_peer(domain) + typer.echo(f"removed {domain}") From d4229dd26493874cca4f3ce9d03e46cfd6a38919 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 16:10:31 +0200 Subject: [PATCH 7/7] stage-fed-g federation: tests --- tests/test_federation.py | 230 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 230 insertions(+) create mode 100644 tests/test_federation.py diff --git a/tests/test_federation.py b/tests/test_federation.py new file mode 100644 index 0000000..3a0c599 --- /dev/null +++ b/tests/test_federation.py @@ -0,0 +1,230 @@ +"""Federation — identity, signed feed, peer registry, signal buffer.""" + +from __future__ import annotations + +import base64 +import os +import stat + +import pytest +from sqlalchemy import create_engine + +from psyc import db +from psyc.lines import federation +from psyc.lines.federation import ( + DNSRecord, + build_signed_feed, + canonical_json, + dns_record, + import_signed_feed, + node_fingerprint, + node_keypair, + public_key_pem, + sign_payload, + verify_payload, +) +from psyc.result import Err, Ok +from conftest import make_case + + +@pytest.fixture +def fresh_db(tmp_path, monkeypatch): + test_db = tmp_path / "test.db" + eng = create_engine(f"sqlite:///{test_db}", future=True) + db._metadata.create_all(eng, checkfirst=True) + monkeypatch.setattr(db, "_engine", eng) + monkeypatch.setattr(db, "DB_PATH", test_db) + yield test_db + + +@pytest.fixture +def fed_dir(tmp_path, monkeypatch): + """Redirect federation key paths to a tmp dir so each test gets a fresh key.""" + d = tmp_path / "federation" + monkeypatch.setattr(federation, "FED_DIR", d) + monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key") + monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub") + yield d + + +def test_keypair_persisted_with_correct_perms(fed_dir): + priv, pub = node_keypair() + assert federation.PRIVATE_KEY_PATH.exists() + assert federation.PUBLIC_KEY_PATH.exists() + mode = stat.S_IMODE(os.stat(federation.PRIVATE_KEY_PATH).st_mode) + assert mode == 0o600 + + +def test_keypair_idempotent_across_calls(fed_dir): + priv1, pub1 = node_keypair() + priv2, pub2 = node_keypair() + # raw bytes match — same key loaded twice, not regenerated + raw1 = pub1.public_bytes( + encoding=federation.serialization.Encoding.Raw, + format=federation.serialization.PublicFormat.Raw, + ) + raw2 = pub2.public_bytes( + encoding=federation.serialization.Encoding.Raw, + format=federation.serialization.PublicFormat.Raw, + ) + assert raw1 == raw2 + + +def test_fingerprint_is_stable_and_32_hex(fed_dir): + fp1 = node_fingerprint() + fp2 = node_fingerprint() + assert fp1 == fp2 + assert len(fp1) == 32 + assert all(c in "0123456789abcdef" for c in fp1) + + +def test_sign_verify_roundtrip(fed_dir): + payload = b"hello federation" + sig = sign_payload(payload) + assert verify_payload(payload, sig, public_key_pem()) is True + + +def test_verify_with_wrong_key_returns_false(fed_dir, tmp_path): + payload = b"the truth" + sig = sign_payload(payload) + + # Build a *different* keypair in a separate directory and use its pubkey. + other = tmp_path / "other-federation" + other.mkdir() + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ed25519 + other_priv = ed25519.Ed25519PrivateKey.generate() + other_pub_pem = other_priv.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("ascii") + + assert verify_payload(payload, sig, other_pub_pem) is False + + +def test_verify_with_garbage_pubkey_returns_false_no_raise(fed_dir): + sig = sign_payload(b"x") + assert verify_payload(b"x", sig, "not a pem") is False + + +def test_canonical_json_is_deterministic(): + a = canonical_json({"b": 1, "a": 2, "nested": {"y": 1, "x": 2}}) + b = canonical_json({"a": 2, "b": 1, "nested": {"x": 2, "y": 1}}) + assert a == b + + +def test_dns_record_txt_value_matches_spec(fed_dir): + rec = dns_record("example.com") + assert isinstance(rec, DNSRecord) + fp = node_fingerprint() + assert rec.srv_name == "_psyc._tcp.example.com" + assert rec.srv_target == "example.com." + assert rec.srv_port == 443 + assert rec.txt_value == f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed" + # human instructions include both record lines + assert "_psyc._tcp.example.com" in rec.human_instructions + assert "SRV" in rec.human_instructions + assert "TXT" in rec.human_instructions + + +def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir): + case = make_case(feed="urlhaus", ips=["1.1.1.1"], urls=["http://1.1.1.1/x"]) + db.upsert_case(case) + feed = build_signed_feed(window_hours=24) + assert feed["version"] == "psyc1" + assert feed["fingerprint"] == node_fingerprint() + assert feed["signature"] + # cases entry made it in + assert any(c["case_id"] == case.case_id for c in feed["cases"]) + + # Import using our own pubkey against a *different* declared fingerprint: + # swap the fingerprint so import_signed_feed doesn't reject as a loop. + pub = public_key_pem() + # Use a fresh keypair to act as "peer" — sign a feed with that key. + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ed25519 + import hashlib + peer_priv = ed25519.Ed25519PrivateKey.generate() + peer_pub_pem = peer_priv.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("ascii") + peer_raw = peer_priv.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + peer_fp = hashlib.sha256(peer_raw).digest()[:16].hex() + feed["fingerprint"] = peer_fp + unsigned = {k: v for k, v in feed.items() if k != "signature"} + new_sig = peer_priv.sign(canonical_json(unsigned)) + feed["signature"] = base64.b64encode(new_sig).decode("ascii") + + result = import_signed_feed(feed, peer_pub_pem) + assert isinstance(result, Ok), getattr(result, "reason", "") + summary = result.value + assert summary.peer_fingerprint == peer_fp + assert summary.cases_seen >= 1 + + +def test_import_with_wrong_pubkey_returns_err(fresh_db, fed_dir): + db.upsert_case(make_case(feed="urlhaus", ips=["2.2.2.2"])) + feed = build_signed_feed(window_hours=24) + # build a *different* pubkey to claim verification against + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ed25519 + other_pub_pem = ed25519.Ed25519PrivateKey.generate().public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("ascii") + # also need to change fingerprint so the loop-check doesn't trigger first + feed["fingerprint"] = "deadbeef" * 4 + result = import_signed_feed(feed, other_pub_pem) + assert isinstance(result, Err) + + +def test_import_own_feed_returns_loop_err(fresh_db, fed_dir): + db.upsert_case(make_case(feed="urlhaus", ips=["3.3.3.3"])) + feed = build_signed_feed(window_hours=24) + result = import_signed_feed(feed, public_key_pem()) + assert isinstance(result, Err) + assert "loop" in result.reason + + +def test_record_signal_then_lookup_by_hash(fresh_db): + rid = db.record_signal(dict( + peer_fingerprint="abc123", + signal_type="ioc", + signal_id="1.2.3.4", + signal_hash="hash-aaa", + received_at="2026-01-01T00:00:00+00:00", + raw_json="{}", + )) + assert rid > 0 + rows = db.signals_for_hash("hash-aaa") + assert len(rows) == 1 + assert rows[0]["peer_fingerprint"] == "abc123" + # second peer reports the same hash → both surface for quorum check + db.record_signal(dict( + peer_fingerprint="def456", + signal_type="ioc", + signal_id="1.2.3.4", + signal_hash="hash-aaa", + received_at="2026-01-01T00:01:00+00:00", + raw_json="{}", + )) + rows = db.signals_for_hash("hash-aaa") + assert {r["peer_fingerprint"] for r in rows} == {"abc123", "def456"} + + +def test_peer_registry_crud(fresh_db, fed_dir): + federation.register_peer("peer.example", "ff" * 16, "PEM", status="trusted") + peers = federation.list_peers() + assert len(peers) == 1 + assert peers[0].domain == "peer.example" + assert peers[0].status == "trusted" + + federation.set_peer_status("peer.example", "blocked") + assert federation.get_peer("peer.example").status == "blocked" + + federation.remove_peer("peer.example") + assert federation.list_peers() == []