From de6204819b85a1d3951f96ecb2d9c19aac715a13 Mon Sep 17 00:00:00 2001
From: m17hr1l
Date: Sat, 6 Jun 2026 21:03:33 +0200
Subject: [PATCH] stage-disc-a discovery: dnssd resolver + public peers endpoint

---
 pyproject.toml                        |   1 +
 src/psyc/cockpit/federation_routes.py |  23 ++-
 src/psyc/lines/discovery.py           | 305 ++++++++++++++++++++++++++
 3 files changed, 328 insertions(+), 1 deletion(-)
 create mode 100644 src/psyc/lines/discovery.py

diff --git a/pyproject.toml b/pyproject.toml
index 6a4f932..287e2d6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -23,6 +23,7 @@ dependencies = [
     "pyotp>=2.9",
     "qrcode[pil]>=7.4",
     "itsdangerous>=2.1",
+    "dnspython>=2.4",
 ]
 
 [project.optional-dependencies]
diff --git a/src/psyc/cockpit/federation_routes.py b/src/psyc/cockpit/federation_routes.py
index d402f38..b7d5f8c 100644
--- a/src/psyc/cockpit/federation_routes.py
+++ b/src/psyc/cockpit/federation_routes.py
@@ -15,7 +15,7 @@ from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, Red
 from fastapi.templating import Jinja2Templates
 
 from psyc import db, log
-from psyc.lines import federation
+from psyc.lines import discovery, federation
 
 _log = log.get(__name__)
 
@@ -25,6 +25,10 @@ _log = log.get(__name__)
 _FEED_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
 _FEED_TTL = 60.0
 
+# Mirror the feed cache for the public peers list — same poll-load pattern.
+_PUBLIC_PEERS_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
+_PUBLIC_PEERS_TTL = 60.0
+
 
 def _admin_ok(request: Request) -> bool:
     return bool(request.session.get("admin_ok"))
@@ -38,6 +42,14 @@ def _cached_feed() -> Dict[str, Any]:
     return _FEED_CACHE["payload"]
 
 
+def _cached_public_peers() -> Any:
+    now = time.time()
+    if _PUBLIC_PEERS_CACHE["payload"] is None or (now - _PUBLIC_PEERS_CACHE["ts"]) > _PUBLIC_PEERS_TTL:
+        _PUBLIC_PEERS_CACHE["payload"] = discovery.public_peer_attestation()
+        _PUBLIC_PEERS_CACHE["ts"] = now
+    return _PUBLIC_PEERS_CACHE["payload"]
+
+
 def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
     """Mount all federation routes onto `app`."""
 
@@ -122,4 +134,13 @@ def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
     def federation_feed() -> JSONResponse:
         return JSONResponse(_cached_feed())
 
+    @app.get("/federation/peers/public")
+    def federation_peers_public() -> JSONResponse:
+        """Publicly attested peer list — what other psyc walkers discover us through.
+
+        Only trusted peers leak; unknown + blocked are internal state and must
+        never appear here.
+        """
+        return JSONResponse(_cached_public_peers())
+
     _log.info("federation.routes.registered")
diff --git a/src/psyc/lines/discovery.py b/src/psyc/lines/discovery.py
new file mode 100644
index 0000000..bb75c4d
--- /dev/null
+++ b/src/psyc/lines/discovery.py
@@ -0,0 +1,305 @@
+"""Discovery — DNS-SD resolver + HTTP probes for internet-wide federation.
+
+Federation identity (psyc.lines.federation) gives every node a stable Ed25519
+keypair, a 32-hex fingerprint, and a DNS record format. This module is the
+*finder*: given a domain you suspect runs psyc, walk its DNS-SD records to
+learn the fingerprint and reach its public federation endpoints.
+
+The BFS peer-walker that recurses across `/federation/peers/public` lives in
+the same module and lands in the next commit. Newly-discovered peers always
+enter the `peers` table with status="unknown" — they do NOT become trusted by
+being seen; vouching is a separate concern.
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import Any, Dict, List, Optional, Tuple
+
+import dns.exception
+import dns.rdatatype
+import dns.resolver
+import httpx
+from pydantic import BaseModel, Field
+
+from psyc import db, log
+from psyc.lines import federation
+from psyc.result import Err, Ok, Result
+
+
+_log = log.get(__name__)
+
+DEFAULT_TIMEOUT = 5.0
+DEFAULT_PORT = 443
+
+_VALID_STATUSES = {"unknown", "trusted", "blocked", "vouched"}
+
+# Statuses that record a classification decision; re-discovery never resets them.
+_STICKY_STATUSES = ("trusted", "blocked", "vouched")
+
+
+# ---------- candidate model --------------------------------------------------
+
+class PeerCandidate(BaseModel):
+    """A peer found by the resolver/walker — not yet vetted, just observed."""
+    domain: str
+    fingerprint: str
+    port: int = DEFAULT_PORT
+    source: str  # "dns-sd" | "peer-walk:..."
+    discovered_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+
+
+# ---------- DNS-SD resolver --------------------------------------------------
+
+def _parse_txt_value(txt: str) -> Result[Dict[str, str], str]:
+    """Parse `v=psyc1 fp=... alg=ed25519 path=...` → dict. Tolerant of order.
+
+    Returns Err for a token without '=', a version other than
+    federation.FEED_VERSION, a fingerprint that is not 32 hex characters, or
+    an `alg` (when present) other than federation.FEED_ALG.
+    """
+    out: Dict[str, str] = {}
+    for token in txt.strip().split():
+        if "=" not in token:
+            return Err(f"malformed TXT token (no '='): {token!r}")
+        k, v = token.split("=", 1)
+        out[k.strip()] = v.strip()
+    if out.get("v") != federation.FEED_VERSION:
+        return Err(f"unsupported version: {out.get('v')!r}")
+    fp = out.get("fp", "")
+    if len(fp) != 32 or any(c not in "0123456789abcdef" for c in fp.lower()):
+        return Err(f"bad fingerprint: {fp!r}")
+    if out.get("alg") and out["alg"] != federation.FEED_ALG:
+        return Err(f"unsupported alg: {out.get('alg')!r}")
+    return Ok(out)
+
+
+def _flatten_txt(rdata: Any) -> str:
+    """DNS TXT records are a sequence of byte-strings — join them. Tolerant of mocks."""
+    strings = getattr(rdata, "strings", None)
+    if strings is None:
+        return str(rdata).strip('"')
+    parts: List[str] = []
+    for s in strings:
+        if isinstance(s, bytes):
+            parts.append(s.decode("utf-8", errors="replace"))
+        else:
+            parts.append(str(s))
+    return "".join(parts)
+
+
+def _lookup(resolver: Any, name: str, rdtype: str) -> Result[List[Any], str]:
+    """Resolve one record type at `name`; any DNS failure or empty answer is an Err.
+
+    Shared by the SRV and TXT lookups so both report failures the same way.
+    """
+    try:
+        answers = resolver.resolve(name, rdtype)
+    except dns.resolver.NXDOMAIN:
+        return Err(f"no {rdtype} record at {name} (NXDOMAIN)")
+    except dns.resolver.NoAnswer:
+        return Err(f"no {rdtype} record at {name} (NoAnswer)")
+    except dns.exception.Timeout:
+        return Err(f"{rdtype} lookup timed out for {name}")
+    except dns.exception.DNSException as exc:
+        return Err(f"{rdtype} lookup failed for {name}: {exc}")
+    records = list(answers)
+    if not records:
+        return Err(f"no {rdtype} record at {name} (empty)")
+    return Ok(records)
+
+
+def _srv_rank(rdata: Any) -> Tuple[int, int]:
+    """Sort key for SRV records: lowest priority first, then highest weight."""
+    try:
+        return int(getattr(rdata, "priority", 0)), -int(getattr(rdata, "weight", 0))
+    except (TypeError, ValueError):
+        return 0, 0
+
+
+def _pick_srv_port(records: List[Any]) -> int:
+    """Port of the preferred SRV record, or DEFAULT_PORT when it has no usable port.
+
+    Preference follows RFC 2782 priority order; the weighted-random spread among
+    equal priorities is replaced by a deterministic highest-weight pick. Ties keep
+    the first record in answer order.
+    """
+    best = min(records, key=_srv_rank)
+    try:
+        port = int(best.port)
+    except (AttributeError, TypeError, ValueError):
+        return DEFAULT_PORT
+    return port if 0 < port <= 65535 else DEFAULT_PORT
+
+
+def resolve_psyc(domain: str, timeout: float = DEFAULT_TIMEOUT) -> Result[PeerCandidate, str]:
+    """Look up `_psyc._tcp.{domain}` SRV + TXT → return a candidate.
+
+    SRV supplies the port (its target is not used — the candidate keeps the
+    queried `domain`); TXT carries `v=psyc1 fp=... alg=ed25519 path=...`.
+    Any DNS failure, parse failure, or missing record returns Err.
+    """
+    name = f"_psyc._tcp.{domain}"
+    resolver = dns.resolver.Resolver()
+    resolver.lifetime = timeout
+    resolver.timeout = timeout
+
+    # SRV: port of the preferred record.
+    srv = _lookup(resolver, name, "SRV")
+    if not isinstance(srv, Ok):
+        return Err(srv.reason)
+    port = _pick_srv_port(srv.value)
+
+    # TXT: fingerprint + protocol metadata.
+    txt = _lookup(resolver, name, "TXT")
+    if not isinstance(txt, Ok):
+        return Err(txt.reason)
+
+    # A name may carry unrelated TXT records; the first one that parses wins.
+    last_err: Optional[str] = None
+    for rdata in txt.value:
+        parsed = _parse_txt_value(_flatten_txt(rdata))
+        if isinstance(parsed, Ok):
+            return Ok(PeerCandidate(
+                domain=domain,
+                fingerprint=parsed.value["fp"].lower(),
+                port=port,
+                source="dns-sd",
+            ))
+        last_err = parsed.reason
+    return Err(f"no usable psyc TXT record at {name}: {last_err}")
+
+
+# ---------- HTTP probes ------------------------------------------------------
+
+def _base_url(domain: str, port: int) -> str:
+    """HTTPS origin for a peer; the port is omitted when it is the HTTPS default."""
+    if port == 443:
+        return f"https://{domain}"
+    return f"https://{domain}:{port}"
+
+
+def _get_json(url: str, timeout: float) -> Result[Any, str]:
+    """GET `url` and decode the body as JSON; every failure mode becomes an Err.
+
+    Shared by the federation probes so they report errors identically.
+    """
+    try:
+        with httpx.Client(timeout=timeout) as client:
+            r = client.get(url)
+            r.raise_for_status()
+            return Ok(r.json())
+    except httpx.HTTPStatusError as exc:
+        return Err(f"HTTP {exc.response.status_code} from {url}")
+    except httpx.RequestError as exc:
+        return Err(f"network error fetching {url}: {exc}")
+    except ValueError as exc:
+        return Err(f"non-JSON response from {url}: {exc}")
+    except Exception as exc:  # noqa: BLE001 — anything weird is a failure
+        return Err(f"fetch failed for {url}: {exc}")
+
+
+def fetch_peer_info(domain: str, port: int = DEFAULT_PORT, timeout: float = DEFAULT_TIMEOUT,
+                    expected_fingerprint: Optional[str] = None) -> Result[Dict[str, Any], str]:
+    """GET /federation/info on a peer. Cross-checks fingerprint if provided.
+
+    The cross-check defends against MITM-injected TXT records — DNS said one
+    fingerprint, the live node's HTTPS-served info MUST agree.
+    """
+    url = _base_url(domain, port) + "/federation/info"
+    fetched = _get_json(url, timeout)
+    if not isinstance(fetched, Ok):
+        return Err(fetched.reason)
+    data = fetched.value
+    if not isinstance(data, dict):
+        return Err(f"unexpected info shape from {url}: {type(data).__name__}")
+    declared = str(data.get("fingerprint", "")).lower()
+    if expected_fingerprint and declared != expected_fingerprint.lower():
+        return Err(
+            f"fingerprint mismatch for {domain}: DNS said {expected_fingerprint!r} "
+            f"but /federation/info said {declared!r}"
+        )
+    return Ok(data)
+
+
+def fetch_public_peers(domain: str, port: int = DEFAULT_PORT,
+                       timeout: float = DEFAULT_TIMEOUT) -> Result[List[Dict[str, Any]], str]:
+    """GET /federation/peers/public on a peer. Returns the list as-is for the walker to dedupe.
+
+    Entries that are not dicts, or lack a truthy `domain` or `fingerprint`, are dropped.
+    """
+    url = _base_url(domain, port) + "/federation/peers/public"
+    fetched = _get_json(url, timeout)
+    if not isinstance(fetched, Ok):
+        return Err(fetched.reason)
+    data = fetched.value
+    if not isinstance(data, list):
+        return Err(f"unexpected peers shape from {url}: {type(data).__name__}")
+    return Ok([
+        item for item in data
+        if isinstance(item, dict) and item.get("domain") and item.get("fingerprint")
+    ])
+
+
+# ---------- persistence ------------------------------------------------------
+
+def record_candidate(c: PeerCandidate, default_status: str = "unknown") -> None:
+    """Upsert a discovered candidate into the peers table.
+
+    Preserves any existing trusted/blocked/vouched status — discovery NEVER
+    demotes a peer that has already been classified. For a known peer the
+    stored fingerprint, key, discovery time and notes are kept when present;
+    only last_seen (and an unclassified status) are refreshed.
+    """
+    if default_status not in _VALID_STATUSES:
+        default_status = "unknown"
+    existing = db.get_peer(c.domain)
+    now = c.discovered_at.isoformat()
+    if existing:
+        status = existing.get("status")
+        if status not in _STICKY_STATUSES:
+            status = default_status
+        known_fp = existing.get("fingerprint") or c.fingerprint
+        db.upsert_peer(dict(
+            domain=c.domain,
+            fingerprint=known_fp,
+            pubkey_pem=existing.get("pubkey_pem") or "",
+            status=status,
+            discovered_at=existing.get("discovered_at") or now,
+            last_seen=now,
+            notes=existing.get("notes"),
+        ))
+        # seen_fp differs from fp when the peer now advertises another fingerprint.
+        _log.info("discovery.refreshed", domain=c.domain, fp=known_fp,
+                  seen_fp=c.fingerprint, source=c.source)
+        return
+    db.upsert_peer(dict(
+        domain=c.domain,
+        fingerprint=c.fingerprint,
+        pubkey_pem="",  # populated when we successfully fetch /federation/key during vouching
+        status=default_status,
+        discovered_at=now,
+        last_seen=now,
+        notes=f"discovered via {c.source}",
+    ))
+    _log.info("discovery.recorded", domain=c.domain, fp=c.fingerprint, source=c.source)
+
+
+# ---------- public attestation -----------------------------------------------
+
+def public_peer_attestation() -> List[Dict[str, Any]]:
+    """List of peers we'll publicly attest to. Only `trusted` — never leaks unknown/blocked.
+
+    This is the surface that other psyc nodes' walkers read from us. We never
+    expose unknown or blocked peers — those are internal classification state.
+    """
+    out: List[Dict[str, Any]] = []
+    for row in db.list_peers():
+        if row.get("status") != "trusted":
+            continue
+        out.append({
+            "domain": row["domain"],
+            "fingerprint": row["fingerprint"],
+            "first_seen": row.get("discovered_at"),
+        })
+    return out