stage-disc-a discovery: dnssd resolver + public peers endpoint

This commit is contained in:
m17hr1l
2026-06-06 21:03:33 +02:00
parent 1675a2326e
commit de6204819b
3 changed files with 288 additions and 1 deletions

View File

@@ -23,6 +23,7 @@ dependencies = [
"pyotp>=2.9", "pyotp>=2.9",
"qrcode[pil]>=7.4", "qrcode[pil]>=7.4",
"itsdangerous>=2.1", "itsdangerous>=2.1",
"dnspython>=2.4",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@@ -15,7 +15,7 @@ from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, Red
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from psyc import db, log from psyc import db, log
from psyc.lines import federation from psyc.lines import discovery, federation
_log = log.get(__name__) _log = log.get(__name__)
@@ -25,6 +25,10 @@ _log = log.get(__name__)
_FEED_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None} _FEED_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
_FEED_TTL = 60.0 _FEED_TTL = 60.0
# Mirror the feed cache for the public peers list — same poll-load pattern.
_PUBLIC_PEERS_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
_PUBLIC_PEERS_TTL = 60.0
def _admin_ok(request: Request) -> bool: def _admin_ok(request: Request) -> bool:
return bool(request.session.get("admin_ok")) return bool(request.session.get("admin_ok"))
@@ -38,6 +42,14 @@ def _cached_feed() -> Dict[str, Any]:
return _FEED_CACHE["payload"] return _FEED_CACHE["payload"]
def _cached_public_peers() -> Any:
now = time.time()
if _PUBLIC_PEERS_CACHE["payload"] is None or (now - _PUBLIC_PEERS_CACHE["ts"]) > _PUBLIC_PEERS_TTL:
_PUBLIC_PEERS_CACHE["payload"] = discovery.public_peer_attestation()
_PUBLIC_PEERS_CACHE["ts"] = now
return _PUBLIC_PEERS_CACHE["payload"]
def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None: def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
"""Mount all federation routes onto `app`.""" """Mount all federation routes onto `app`."""
@@ -122,4 +134,13 @@ def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
def federation_feed() -> JSONResponse: def federation_feed() -> JSONResponse:
return JSONResponse(_cached_feed()) return JSONResponse(_cached_feed())
@app.get("/federation/peers/public")
def federation_peers_public() -> JSONResponse:
"""Publicly attested peer list — what other psyc walkers discover us through.
Only trusted peers leak; unknown + blocked are internal state and must
never appear here.
"""
return JSONResponse(_cached_public_peers())
_log.info("federation.routes.registered") _log.info("federation.routes.registered")

265
src/psyc/lines/discovery.py Normal file
View File

@@ -0,0 +1,265 @@
"""Discovery — DNS-SD resolver + HTTP probes for internet-wide federation.
Federation identity (psyc.lines.federation) gives every node a stable Ed25519
keypair, a 32-hex fingerprint, and a DNS record format. This module is the
*finder*: given a domain you suspect runs psyc, walk its DNS-SD records to
learn the fingerprint and reach its public federation endpoints.
The BFS peer-walker that recurses across `/federation/peers/public` lives in
the same module and lands in the next commit. Newly-discovered peers always
enter the `peers` table with status="unknown" — they do NOT become trusted by
being seen; vouching is a separate concern.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
import dns.exception
import dns.rdatatype
import dns.resolver
import httpx
from pydantic import BaseModel, Field
from psyc import db, log
from psyc.lines import federation
from psyc.result import Err, Ok, Result
_log = log.get(__name__)
DEFAULT_TIMEOUT = 5.0
DEFAULT_PORT = 443
_VALID_STATUSES = {"unknown", "trusted", "blocked", "vouched"}
# ---------- candidate model --------------------------------------------------
class PeerCandidate(BaseModel):
"""A peer found by the resolver/walker — not yet vetted, just observed."""
domain: str
fingerprint: str
port: int = DEFAULT_PORT
source: str # "dns-sd" | "peer-walk:<source-domain>"
discovered_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
# ---------- DNS-SD resolver --------------------------------------------------
def _parse_txt_value(txt: str) -> Result[Dict[str, str], str]:
"""Parse `v=psyc1 fp=<hex> alg=ed25519 path=...` → dict. Tolerant of order."""
out: Dict[str, str] = {}
for token in txt.strip().split():
if "=" not in token:
return Err(f"malformed TXT token (no '='): {token!r}")
k, v = token.split("=", 1)
out[k.strip()] = v.strip()
if out.get("v") != federation.FEED_VERSION:
return Err(f"unsupported version: {out.get('v')!r}")
fp = out.get("fp", "")
if len(fp) != 32 or any(c not in "0123456789abcdef" for c in fp.lower()):
return Err(f"bad fingerprint: {fp!r}")
if out.get("alg") and out["alg"] != federation.FEED_ALG:
return Err(f"unsupported alg: {out.get('alg')!r}")
return Ok(out)
def _flatten_txt(rdata: Any) -> str:
"""DNS TXT records are a sequence of byte-strings — join them. Tolerant of mocks."""
strings = getattr(rdata, "strings", None)
if strings is None:
return str(rdata).strip('"')
parts: List[str] = []
for s in strings:
if isinstance(s, bytes):
parts.append(s.decode("utf-8", errors="replace"))
else:
parts.append(str(s))
return "".join(parts)
def resolve_psyc(domain: str, timeout: float = DEFAULT_TIMEOUT) -> Result[PeerCandidate, str]:
"""Look up `_psyc._tcp.<domain>` SRV + TXT → return a candidate.
SRV gives target+port; TXT carries `v=psyc1 fp=... alg=ed25519 path=...`.
Any DNS failure, parse failure, or missing record returns Err.
"""
name = f"_psyc._tcp.{domain}"
resolver = dns.resolver.Resolver()
resolver.lifetime = timeout
resolver.timeout = timeout
# SRV: target host + port.
port = DEFAULT_PORT
try:
srv_answers = resolver.resolve(name, "SRV")
except dns.resolver.NXDOMAIN:
return Err(f"no SRV record at {name} (NXDOMAIN)")
except dns.resolver.NoAnswer:
return Err(f"no SRV record at {name} (NoAnswer)")
except dns.exception.Timeout:
return Err(f"SRV lookup timed out for {name}")
except dns.exception.DNSException as exc:
return Err(f"SRV lookup failed for {name}: {exc}")
srv_list = list(srv_answers)
if not srv_list:
return Err(f"no SRV record at {name} (empty)")
try:
port = int(srv_list[0].port)
except Exception:
port = DEFAULT_PORT
# TXT: fingerprint + protocol metadata.
try:
txt_answers = resolver.resolve(name, "TXT")
except dns.resolver.NXDOMAIN:
return Err(f"no TXT record at {name} (NXDOMAIN)")
except dns.resolver.NoAnswer:
return Err(f"no TXT record at {name} (NoAnswer)")
except dns.exception.Timeout:
return Err(f"TXT lookup timed out for {name}")
except dns.exception.DNSException as exc:
return Err(f"TXT lookup failed for {name}: {exc}")
txt_list = list(txt_answers)
if not txt_list:
return Err(f"no TXT record at {name} (empty)")
last_err: Optional[str] = None
for rdata in txt_list:
value = _flatten_txt(rdata)
parsed = _parse_txt_value(value)
if isinstance(parsed, Ok):
return Ok(PeerCandidate(
domain=domain,
fingerprint=parsed.value["fp"].lower(),
port=port,
source="dns-sd",
))
last_err = parsed.reason
return Err(f"no usable psyc TXT record at {name}: {last_err}")
# ---------- HTTP probes ------------------------------------------------------
def _base_url(domain: str, port: int) -> str:
if port == 443:
return f"https://{domain}"
return f"https://{domain}:{port}"
def fetch_peer_info(domain: str, port: int = DEFAULT_PORT, timeout: float = DEFAULT_TIMEOUT,
expected_fingerprint: Optional[str] = None) -> Result[Dict[str, Any], str]:
"""GET /federation/info on a peer. Cross-checks fingerprint if provided.
The cross-check defends against MITM-injected TXT records — DNS said one
fingerprint, the live node's HTTPS-served info MUST agree.
"""
url = _base_url(domain, port) + "/federation/info"
try:
with httpx.Client(timeout=timeout) as client:
r = client.get(url)
r.raise_for_status()
data = r.json()
except httpx.HTTPStatusError as exc:
return Err(f"HTTP {exc.response.status_code} from {url}")
except httpx.RequestError as exc:
return Err(f"network error fetching {url}: {exc}")
except ValueError as exc:
return Err(f"non-JSON response from {url}: {exc}")
except Exception as exc: # noqa: BLE001 — anything weird is a failure
return Err(f"fetch failed for {url}: {exc}")
if not isinstance(data, dict):
return Err(f"unexpected info shape from {url}: {type(data).__name__}")
declared = str(data.get("fingerprint", "")).lower()
if expected_fingerprint and declared != expected_fingerprint.lower():
return Err(
f"fingerprint mismatch for {domain}: DNS said {expected_fingerprint!r} "
f"but /federation/info said {declared!r}"
)
return Ok(data)
def fetch_public_peers(domain: str, port: int = DEFAULT_PORT,
timeout: float = DEFAULT_TIMEOUT) -> Result[List[Dict[str, Any]], str]:
"""GET /federation/peers/public on a peer. Returns the list as-is for the walker to dedupe."""
url = _base_url(domain, port) + "/federation/peers/public"
try:
with httpx.Client(timeout=timeout) as client:
r = client.get(url)
r.raise_for_status()
data = r.json()
except httpx.HTTPStatusError as exc:
return Err(f"HTTP {exc.response.status_code} from {url}")
except httpx.RequestError as exc:
return Err(f"network error fetching {url}: {exc}")
except ValueError as exc:
return Err(f"non-JSON response from {url}: {exc}")
except Exception as exc: # noqa: BLE001
return Err(f"fetch failed for {url}: {exc}")
if not isinstance(data, list):
return Err(f"unexpected peers shape from {url}: {type(data).__name__}")
out: List[Dict[str, Any]] = []
for item in data:
if isinstance(item, dict) and item.get("domain") and item.get("fingerprint"):
out.append(item)
return Ok(out)
# ---------- persistence ------------------------------------------------------
def record_candidate(c: PeerCandidate, default_status: str = "unknown") -> None:
"""Upsert a discovered candidate into the peers table.
Preserves any existing trusted/blocked status — discovery NEVER demotes a
peer the operator has already classified. Only updates last_seen.
"""
if default_status not in _VALID_STATUSES:
default_status = "unknown"
existing = db.get_peer(c.domain)
now = c.discovered_at.isoformat()
if existing:
status = existing.get("status") or default_status
if status not in ("trusted", "blocked"):
status = default_status
db.upsert_peer(dict(
domain=c.domain,
fingerprint=existing.get("fingerprint") or c.fingerprint,
pubkey_pem=existing.get("pubkey_pem") or "",
status=status,
discovered_at=existing.get("discovered_at") or now,
last_seen=now,
notes=existing.get("notes"),
))
return
db.upsert_peer(dict(
domain=c.domain,
fingerprint=c.fingerprint,
pubkey_pem="", # populated when we successfully fetch /federation/key during vouching
status=default_status,
discovered_at=now,
last_seen=now,
notes=f"discovered via {c.source}",
))
_log.info("discovery.recorded", domain=c.domain, fp=c.fingerprint, source=c.source)
# ---------- public attestation -----------------------------------------------
def public_peer_attestation() -> List[Dict[str, Any]]:
"""List of peers we'll publicly attest to. Only `trusted` — never leaks unknown/blocked.
This is the surface that other psyc nodes' walkers read from us. We never
expose unknown or blocked peers — those are internal classification state.
"""
out: List[Dict[str, Any]] = []
for row in db.list_peers():
if row.get("status") != "trusted":
continue
out.append({
"domain": row["domain"],
"fingerprint": row["fingerprint"],
"first_seen": row.get("discovered_at"),
})
return out