stage-fed-f federation: CLI commands

This commit is contained in:
m17hr1l
2026-06-06 16:10:26 +02:00
parent 17b94acf6b
commit 2ef0448165

135
src/psyc/_federation_cli.py Normal file
View File

@@ -0,0 +1,135 @@
"""Federation CLI — keygen, DNS records, feed export, peer registry, verify.
Registered onto the top-level Typer app from cli.py so the surface stays flat.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Optional
import httpx
import typer
from psyc import db, log
from psyc.lines import federation
from psyc.result import Err
_log = log.get(__name__)
def register(typer_app: typer.Typer) -> None:
"""Mount all `fed-*` commands onto `typer_app`."""
@typer_app.command("fed-keygen")
def fed_keygen() -> None:
"""Generate the node's Ed25519 keypair (or load existing). Prints fingerprint."""
federation.node_keypair() # creates the files if missing
typer.echo(federation.node_fingerprint())
@typer_app.command("fed-dns")
def fed_dns(
domain: str = typer.Argument(..., help="public domain to advertise this node on"),
port: int = typer.Option(443, "--port", help="port psyc is reachable on"),
) -> None:
"""Print the DNS SRV + TXT records to publish under `domain`."""
rec = federation.dns_record(domain, port=port)
typer.echo(rec.human_instructions)
@typer_app.command("fed-feed")
def fed_feed(
window_hours: int = typer.Option(24, "--hours", help="lookback window (hours)"),
) -> None:
"""Build + print the signed feed JSON."""
db.init_db()
payload = federation.build_signed_feed(window_hours=window_hours)
typer.echo(json.dumps(payload, indent=2))
@typer_app.command("fed-verify")
def fed_verify(
peer_url: str = typer.Argument(..., help="peer base URL, e.g. https://peer.example"),
) -> None:
"""Fetch a peer's /federation/{info,key,feed} and verify the signature."""
peer_url = peer_url.rstrip("/")
try:
with httpx.Client(timeout=10.0) as client:
info = client.get(f"{peer_url}/federation/info").json()
key_text = client.get(f"{peer_url}/federation/key").text
feed = client.get(f"{peer_url}/federation/feed").json()
except Exception as exc:
typer.echo(f"error: fetch failed: {exc}", err=True)
raise typer.Exit(1)
# If the peer is already in the registry, prefer the stored pubkey
# (TOFU pin); otherwise warn and use the freshly fetched one.
declared_fp = info.get("fingerprint", "")
pubkey_pem = key_text
pinned = None
for p in federation.list_peers():
if p.fingerprint == declared_fp:
pinned = p
break
if pinned:
pubkey_pem = pinned.pubkey_pem
typer.echo(f" · using pinned pubkey for {pinned.domain}")
else:
typer.echo(" · WARNING: no pinned pubkey for this peer — trusting fetched key (TOFU)")
db.init_db()
result = federation.import_signed_feed(feed, pubkey_pem)
if isinstance(result, Err):
typer.echo(f" ✗ verification failed: {result.reason}", err=True)
raise typer.Exit(1)
s = result.value
typer.echo(f" ✓ verified peer {s.peer_fingerprint}")
typer.echo(f" cases: {s.cases_seen} iocs: {s.iocs_seen} signals buffered: {len(s.signal_ids)}")
@typer_app.command("fed-peer-add")
def fed_peer_add(
domain: str = typer.Argument(..., help="peer's public domain"),
fingerprint: str = typer.Argument(..., help="peer's 32-hex fingerprint"),
pubkey_file: Path = typer.Option(..., "--pubkey-file", help="path to peer's PEM public key"),
status: str = typer.Option("unknown", "--status", help="unknown | trusted | blocked"),
) -> None:
"""Register a peer's identity in the local registry."""
db.init_db()
pem = pubkey_file.read_text(encoding="utf-8")
federation.register_peer(domain, fingerprint, pem, status=status)
typer.echo(f"registered peer {domain} ({fingerprint[:8]}…) status={status}")
@typer_app.command("fed-peer-list")
def fed_peer_list() -> None:
"""List all registered peers."""
db.init_db()
rows = federation.list_peers()
if not rows:
typer.echo("(no peers registered)")
return
for p in rows:
typer.echo(
f" {p.status:8s} {p.domain:30s} {p.fingerprint[:8]}{p.fingerprint[-8:]}"
f" last_seen={(p.last_seen or '')[:16]}"
)
@typer_app.command("fed-peer-trust")
def fed_peer_trust(domain: str = typer.Argument(...)) -> None:
"""Mark a peer as trusted — their signals count toward quorum."""
db.init_db()
federation.set_peer_status(domain, "trusted")
typer.echo(f"{domain} → trusted")
@typer_app.command("fed-peer-block")
def fed_peer_block(domain: str = typer.Argument(...)) -> None:
"""Block a peer — ignore their feeds."""
db.init_db()
federation.set_peer_status(domain, "blocked")
typer.echo(f"{domain} → blocked")
@typer_app.command("fed-peer-remove")
def fed_peer_remove(domain: str = typer.Argument(...)) -> None:
"""Drop a peer from the registry."""
db.init_db()
federation.remove_peer(domain)
typer.echo(f"removed {domain}")