# ---------- discovery (DNS-SD walker) ----------------------------------
# NOTE: in the source file these commands are declared inside
# `register(typer_app)`; `typer_app` is that function's parameter and `_log`
# is the module-level logger.

@typer_app.command("fed-resolve")
def fed_resolve(
    domain: str = typer.Argument(..., help="domain to look up via _psyc._tcp."),
    timeout: float = typer.Option(5.0, "--timeout", help="DNS lookup timeout, seconds"),
) -> None:
    """Resolve a domain's psyc DNS-SD record. Prints fingerprint + port.

    Exits with status 1 and prints the resolver's reason to stderr when the
    lookup returns an `Err`.
    """
    result = discovery.resolve_psyc(domain, timeout=timeout)
    if isinstance(result, Err):
        typer.echo(f"error: {result.reason}", err=True)
        raise typer.Exit(1)
    c = result.value
    typer.echo(f" domain {c.domain}")
    typer.echo(f" fingerprint {c.fingerprint}")
    typer.echo(f" port {c.port}")
    typer.echo(f" source {c.source}")


@typer_app.command("fed-walk")
def fed_walk(
    seeds: List[str] = typer.Argument(..., help="one or more seed domains"),
    depth: int = typer.Option(2, "--depth", help="max BFS depth"),
    max_peers: int = typer.Option(200, "--max-peers", help="cap discovered candidates"),
    record: bool = typer.Option(False, "--record", help="persist candidates as status=unknown"),
) -> None:
    """Walk DNS-SD + peer-public from `seeds`. Prints discovered table.

    With `--record`, every candidate is written through
    `discovery.record_candidate`. Each write is isolated so one failure does
    not abort the batch; the command exits with status 1 if any write failed.
    """
    db.init_db()
    cands = discovery.walk(seeds, max_depth=depth, max_peers=max_peers)
    if not cands:
        typer.echo("(no candidates discovered)")
        return
    typer.echo(f"{'domain':<32} {'fingerprint':<24} {'port':>5} source")
    for c in cands:
        # Abbreviate the fingerprint to its first and last 8 characters.
        fp = f"{c.fingerprint[:8]}…{c.fingerprint[-8:]}"
        typer.echo(f"{c.domain:<32} {fp:<24} {c.port:>5} {c.source}")
    if not record:
        return
    recorded = 0
    for c in cands:
        try:
            discovery.record_candidate(c)
        except Exception as exc:  # noqa: BLE001 — one bad write must not abort the batch
            _log.warning("federation.cli.walk.record.error", domain=c.domain, error=str(exc))
            typer.echo(f"error: could not record {c.domain}: {exc}", err=True)
            continue
        recorded += 1
    # Report what was actually persisted, not what was merely discovered.
    typer.echo(f"recorded {recorded} candidate(s) into peers table")
    if recorded != len(cands):
        raise typer.Exit(1)


@typer_app.command("fed-seeds-list")
def fed_seeds_list() -> None:
    """Print the operator-curated discovery seed list."""
    db.init_db()
    seeds = pulse.get_discovery_seeds()
    if not seeds:
        typer.echo("(no seeds configured)")
        return
    for s in seeds:
        typer.echo(s)


@typer_app.command("fed-seeds-add")
def fed_seeds_add(domain: str = typer.Argument(...)) -> None:
    """Append a seed domain (no-op if already present)."""
    db.init_db()
    seeds = pulse.get_discovery_seeds()
    d = domain.strip()
    if d in seeds:
        typer.echo(f"{d} already a seed")
        return
    seeds.append(d)
    pulse.set_discovery_seeds(seeds)
    typer.echo(f"added seed {d}")


@typer_app.command("fed-seeds-remove")
def fed_seeds_remove(domain: str = typer.Argument(...)) -> None:
    """Remove a seed domain (no-op if absent)."""
    db.init_db()
    seeds = pulse.get_discovery_seeds()
    d = domain.strip()
    if d not in seeds:
        typer.echo(f"{d} not in seeds")
        return
    pulse.set_discovery_seeds([s for s in seeds if s != d])
    typer.echo(f"removed seed {d}")
# Cache for the public peers payload: rebuilt at most once per TTL window so
# repeated polls of /federation/peers/public do not hit the database each time.
_PUBLIC_PEERS_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
_PUBLIC_PEERS_TTL = 60.0


def _cached_public_peers() -> Any:
    """Return the public peer attestation, recomputing it once the TTL has expired."""
    now = time.time()
    expired = (now - _PUBLIC_PEERS_CACHE["ts"]) > _PUBLIC_PEERS_TTL
    if _PUBLIC_PEERS_CACHE["payload"] is None or expired:
        _PUBLIC_PEERS_CACHE["payload"] = discovery.public_peer_attestation()
        _PUBLIC_PEERS_CACHE["ts"] = now
    return _PUBLIC_PEERS_CACHE["payload"]


def _discovery_redirect(flash: str) -> RedirectResponse:
    """Redirect (303) to the discovery admin page with `flash` URL-encoded.

    The flash text can contain arbitrary exception text, so it must be
    percent-encoded rather than spliced into the query string verbatim.
    """
    from urllib.parse import urlencode

    query = urlencode({"flash": flash})
    return RedirectResponse(f"/admin/federation/discovery?{query}", status_code=303)


# ---------- discovery (DNS-SD walker) ----------------------------
# NOTE: in the source file the routes below are declared inside
# `register(app, TEMPLATES)`; `app` and `TEMPLATES` are its parameters.

@app.get("/admin/federation/discovery", response_class=HTMLResponse)
def admin_discovery(request: Request) -> HTMLResponse:
    """Render the discovery admin page: seed list plus every peer in the registry."""
    if not _admin_ok(request):
        return RedirectResponse("/admin", status_code=303)
    seeds = pulse.get_discovery_seeds()
    candidates = federation.list_peers()
    flash = request.query_params.get("flash") or ""
    return TEMPLATES.TemplateResponse(
        request,
        "admin_discovery.html",
        {
            "seeds": seeds,
            "seeds_text": "\n".join(seeds),
            "candidates": candidates,
            "flash": flash,
        },
    )


@app.post("/admin/federation/discovery/seeds")
def admin_discovery_seeds(
    request: Request,
    seeds: str = Form(""),
) -> RedirectResponse:
    """Replace the seed list from the submitted textarea (one domain per line)."""
    if not _admin_ok(request):
        raise HTTPException(status_code=403, detail="admin session required")
    # set_discovery_seeds strips blanks and dedupes, so raw lines are fine here.
    pulse.set_discovery_seeds(seeds.splitlines())
    return _discovery_redirect("seeds saved")


@app.post("/admin/federation/discovery/walk")
def admin_discovery_walk(request: Request) -> RedirectResponse:
    """Run one discovery walk from the configured seeds and record the results.

    Any exception from the walk or from recording is logged and surfaced to
    the operator as a flash message instead of a 500.
    """
    if not _admin_ok(request):
        raise HTTPException(status_code=403, detail="admin session required")
    seeds = pulse.get_discovery_seeds()
    if not seeds:
        return _discovery_redirect("no seeds configured")
    try:
        cands = discovery.walk(seeds)
        for c in cands:
            discovery.record_candidate(c)
        msg = f"discovered {len(cands)} candidates from {len(seeds)} seed(s)"
    except Exception as exc:  # noqa: BLE001 — surface the error to the operator
        _log.warning("federation.discovery.walk.error", error=str(exc))
        msg = f"walk failed: {str(exc)[:80]}"
    return _discovery_redirect(msg)


# ---------- public endpoints --------------------------------------

@app.get("/federation/peers/public")
def federation_peers_public() -> JSONResponse:
    """Publicly attested peer list — what other psyc walkers discover us through.

    Only trusted peers leak; unknown + blocked are internal state and must
    never appear here.
    """
    return JSONResponse(_cached_public_peers())
+
+

Peer discovery

+ {{ candidates|length }} candidate{{ '' if candidates|length == 1 else 's' }} +
+

Walk DNS-SD records from a seed domain you know runs psyc, then recurse through its public peer list. Newly-found peers land here with status unknown — vouching is what eventually promotes them. Once seeds exist, enabling the peer-pull pulse pipeline runs this on a cadence.

+

← back to federation

+ + {% if flash %} +
{{ flash }}
+ {% endif %} +
+ +
+
+

Seed domains

+ {{ seeds|length }} configured +
+

One domain per line. Each seed is resolved via its _psyc._tcp.<domain> SRV and TXT records; its /federation/peers/public list is then fetched and walked recursively.

+
+ +
+ +
+
+ +
+ +
+
+ +
+
+

Recent candidates

+ {{ candidates|length }} in registry +
+

Every peer in the registry, including every peer the walker has ever found, newest first. Trusted/blocked statuses are preserved across re-walks — discovery never demotes a peer the operator has classified.

+ + {% if candidates %} + + + + {% for p in candidates %} + + + + + + + + + {% endfor %} + +
DomainFingerprintStatusDiscoveredLast seenNotes
{{ p.domain }}{{ p.fingerprint[:8] }}…{{ p.fingerprint[-8:] }} + {% if p.status == 'trusted' %} + trusted + {% elif p.status == 'blocked' %} + blocked + {% else %} + {{ p.status }} + {% endif %} + {{ (p.discovered_at or '')[:16] | replace('T', ' ') }}{{ ((p.last_seen or '')[:16] | replace('T', ' ')) or '—' }}{{ (p.notes or '')[:60] }}
+ {% else %} +

(no candidates yet — add a seed above and walk)

+ {% endif %} +
def pulse_setting_get(key: str, db_path: Path = DB_PATH) -> Optional[str]:
    """Fetch one value from pulse_settings by key.

    Returns None when the key is absent. A stored NULL is also reported as
    None rather than the string "None".
    NOTE(review): whether the `value` column is nullable is not visible here.
    """
    stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == key)
    with engine(db_path).connect() as conn:
        row = conn.execute(stmt).fetchone()
    if row is None or row.value is None:
        return None
    return str(row.value)


def pulse_setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None:
    """Upsert one (key, value) into pulse_settings."""
    stmt = sqlite_insert(pulse_settings).values(key=key, value=value)
    # On a key collision, overwrite the stored value with the incoming one.
    stmt = stmt.on_conflict_do_update(
        index_elements=[pulse_settings.c.key],
        set_=dict(value=stmt.excluded.value),
    )
    with engine(db_path).begin() as conn:
        conn.execute(stmt)
+""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Set, Tuple + +import dns.exception +import dns.rdatatype +import dns.resolver +import httpx +from pydantic import BaseModel, Field + +from psyc import db, log +from psyc.lines import federation +from psyc.result import Err, Ok, Result + + +_log = log.get(__name__) + +DEFAULT_TIMEOUT = 5.0 +DEFAULT_PORT = 443 +DEFAULT_MAX_DEPTH = 2 +DEFAULT_MAX_PEERS = 200 + +_VALID_STATUSES = {"unknown", "trusted", "blocked", "vouched"} + + +# ---------- candidate model -------------------------------------------------- + +class PeerCandidate(BaseModel): + """A peer found by the resolver/walker — not yet vetted, just observed.""" + domain: str + fingerprint: str + port: int = DEFAULT_PORT + source: str # "dns-sd" | "peer-walk:" + discovered_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +# ---------- DNS-SD resolver -------------------------------------------------- + +def _parse_txt_value(txt: str) -> Result[Dict[str, str], str]: + """Parse `v=psyc1 fp= alg=ed25519 path=...` → dict. Tolerant of order.""" + out: Dict[str, str] = {} + for token in txt.strip().split(): + if "=" not in token: + return Err(f"malformed TXT token (no '='): {token!r}") + k, v = token.split("=", 1) + out[k.strip()] = v.strip() + if out.get("v") != federation.FEED_VERSION: + return Err(f"unsupported version: {out.get('v')!r}") + fp = out.get("fp", "") + if len(fp) != 32 or any(c not in "0123456789abcdef" for c in fp.lower()): + return Err(f"bad fingerprint: {fp!r}") + if out.get("alg") and out["alg"] != federation.FEED_ALG: + return Err(f"unsupported alg: {out.get('alg')!r}") + return Ok(out) + + +def _flatten_txt(rdata: Any) -> str: + """DNS TXT records are a sequence of byte-strings — join them. 
Tolerant of mocks.""" + strings = getattr(rdata, "strings", None) + if strings is None: + return str(rdata).strip('"') + parts: List[str] = [] + for s in strings: + if isinstance(s, bytes): + parts.append(s.decode("utf-8", errors="replace")) + else: + parts.append(str(s)) + return "".join(parts) + + +def resolve_psyc(domain: str, timeout: float = DEFAULT_TIMEOUT) -> Result[PeerCandidate, str]: + """Look up `_psyc._tcp.` SRV + TXT → return a candidate. + + SRV gives target+port; TXT carries `v=psyc1 fp=... alg=ed25519 path=...`. + Any DNS failure, parse failure, or missing record returns Err. + """ + name = f"_psyc._tcp.{domain}" + resolver = dns.resolver.Resolver() + resolver.lifetime = timeout + resolver.timeout = timeout + + # SRV: target host + port. + port = DEFAULT_PORT + try: + srv_answers = resolver.resolve(name, "SRV") + except dns.resolver.NXDOMAIN: + return Err(f"no SRV record at {name} (NXDOMAIN)") + except dns.resolver.NoAnswer: + return Err(f"no SRV record at {name} (NoAnswer)") + except dns.exception.Timeout: + return Err(f"SRV lookup timed out for {name}") + except dns.exception.DNSException as exc: + return Err(f"SRV lookup failed for {name}: {exc}") + srv_list = list(srv_answers) + if not srv_list: + return Err(f"no SRV record at {name} (empty)") + try: + port = int(srv_list[0].port) + except Exception: + port = DEFAULT_PORT + + # TXT: fingerprint + protocol metadata. 
+ try: + txt_answers = resolver.resolve(name, "TXT") + except dns.resolver.NXDOMAIN: + return Err(f"no TXT record at {name} (NXDOMAIN)") + except dns.resolver.NoAnswer: + return Err(f"no TXT record at {name} (NoAnswer)") + except dns.exception.Timeout: + return Err(f"TXT lookup timed out for {name}") + except dns.exception.DNSException as exc: + return Err(f"TXT lookup failed for {name}: {exc}") + txt_list = list(txt_answers) + if not txt_list: + return Err(f"no TXT record at {name} (empty)") + + last_err: Optional[str] = None + for rdata in txt_list: + value = _flatten_txt(rdata) + parsed = _parse_txt_value(value) + if isinstance(parsed, Ok): + return Ok(PeerCandidate( + domain=domain, + fingerprint=parsed.value["fp"].lower(), + port=port, + source="dns-sd", + )) + last_err = parsed.reason + return Err(f"no usable psyc TXT record at {name}: {last_err}") + + +# ---------- HTTP probes ------------------------------------------------------ + +def _base_url(domain: str, port: int) -> str: + if port == 443: + return f"https://{domain}" + return f"https://{domain}:{port}" + + +def fetch_peer_info(domain: str, port: int = DEFAULT_PORT, timeout: float = DEFAULT_TIMEOUT, + expected_fingerprint: Optional[str] = None) -> Result[Dict[str, Any], str]: + """GET /federation/info on a peer. Cross-checks fingerprint if provided. + + The cross-check defends against MITM-injected TXT records — DNS said one + fingerprint, the live node's HTTPS-served info MUST agree. 
+ """ + url = _base_url(domain, port) + "/federation/info" + try: + with httpx.Client(timeout=timeout) as client: + r = client.get(url) + r.raise_for_status() + data = r.json() + except httpx.HTTPStatusError as exc: + return Err(f"HTTP {exc.response.status_code} from {url}") + except httpx.RequestError as exc: + return Err(f"network error fetching {url}: {exc}") + except ValueError as exc: + return Err(f"non-JSON response from {url}: {exc}") + except Exception as exc: # noqa: BLE001 — anything weird is a failure + return Err(f"fetch failed for {url}: {exc}") + if not isinstance(data, dict): + return Err(f"unexpected info shape from {url}: {type(data).__name__}") + declared = str(data.get("fingerprint", "")).lower() + if expected_fingerprint and declared != expected_fingerprint.lower(): + return Err( + f"fingerprint mismatch for {domain}: DNS said {expected_fingerprint!r} " + f"but /federation/info said {declared!r}" + ) + return Ok(data) + + +def fetch_public_peers(domain: str, port: int = DEFAULT_PORT, + timeout: float = DEFAULT_TIMEOUT) -> Result[List[Dict[str, Any]], str]: + """GET /federation/peers/public on a peer. 
Returns the list as-is for the walker to dedupe.""" + url = _base_url(domain, port) + "/federation/peers/public" + try: + with httpx.Client(timeout=timeout) as client: + r = client.get(url) + r.raise_for_status() + data = r.json() + except httpx.HTTPStatusError as exc: + return Err(f"HTTP {exc.response.status_code} from {url}") + except httpx.RequestError as exc: + return Err(f"network error fetching {url}: {exc}") + except ValueError as exc: + return Err(f"non-JSON response from {url}: {exc}") + except Exception as exc: # noqa: BLE001 + return Err(f"fetch failed for {url}: {exc}") + if not isinstance(data, list): + return Err(f"unexpected peers shape from {url}: {type(data).__name__}") + out: List[Dict[str, Any]] = [] + for item in data: + if isinstance(item, dict) and item.get("domain") and item.get("fingerprint"): + out.append(item) + return Ok(out) + + +# ---------- BFS walker ------------------------------------------------------- + +def walk(seeds: List[str], max_depth: int = DEFAULT_MAX_DEPTH, + max_peers: int = DEFAULT_MAX_PEERS, timeout: float = DEFAULT_TIMEOUT) -> List[PeerCandidate]: + """Breadth-first walk from `seeds` → discovered candidates. + + For each domain: DNS-SD resolve → fetch /info to verify fingerprint → + fetch /peers/public → enqueue its domains for the next layer. Dedupes on + (domain, fingerprint). Skips our own fingerprint to avoid loops. All + errors are logged but non-fatal — one bad peer doesn't abort the walk. + """ + own_fp = "" + try: + own_fp = federation.node_fingerprint() + except Exception as exc: # noqa: BLE001 — discovery should work even pre-keygen + _log.info("discovery.walk.no_own_fp", error=str(exc)) + + seen_pairs: Set[Tuple[str, str]] = set() + seen_domains: Set[str] = set() + out: List[PeerCandidate] = [] + + # Queue of (domain, depth, source). Seeds enter at depth 0. 
+ frontier: List[Tuple[str, int, str]] = [(d.strip(), 0, "dns-sd") for d in seeds if d and d.strip()] + next_layer: List[Tuple[str, int, str]] = [] + + while frontier: + domain, depth, source = frontier.pop(0) + if domain in seen_domains: + if not frontier: + frontier, next_layer = next_layer, [] + continue + seen_domains.add(domain) + if len(out) >= max_peers: + _log.info("discovery.walk.cap.peers", cap=max_peers) + break + + resolved = resolve_psyc(domain, timeout=timeout) + if isinstance(resolved, Err): + _log.info("discovery.resolve.skip", domain=domain, reason=resolved.reason) + if not frontier: + frontier, next_layer = next_layer, [] + continue + cand = resolved.value + cand.source = source if depth > 0 else "dns-sd" + + if cand.fingerprint == own_fp: + _log.info("discovery.skip.self", domain=domain) + if not frontier: + frontier, next_layer = next_layer, [] + continue + + pair = (cand.domain, cand.fingerprint) + if pair in seen_pairs: + if not frontier: + frontier, next_layer = next_layer, [] + continue + seen_pairs.add(pair) + + # Verify the live endpoint's fingerprint matches DNS. If we can't reach + # it, still record the DNS-discovered candidate — vouching can vet it + # later, and we don't want one HTTP outage to abort the walk. + info_res = fetch_peer_info(domain, port=cand.port, timeout=timeout, + expected_fingerprint=cand.fingerprint) + if isinstance(info_res, Err): + _log.info("discovery.info.skip", domain=domain, reason=info_res.reason) + out.append(cand) + if not frontier: + frontier, next_layer = next_layer, [] + continue + out.append(cand) + + # Recurse: fetch this peer's public-peers list, enqueue domains. 
+ if depth + 1 <= max_depth: + peers_res = fetch_public_peers(domain, port=cand.port, timeout=timeout) + if isinstance(peers_res, Err): + _log.info("discovery.peers.skip", domain=domain, reason=peers_res.reason) + else: + for item in peers_res.value: + child_domain = str(item.get("domain", "")).strip() + if not child_domain or child_domain in seen_domains: + continue + child_source = f"peer-walk:{domain}" + next_layer.append((child_domain, depth + 1, child_source)) + + if not frontier: + frontier, next_layer = next_layer, [] + + _log.info("discovery.walk.done", seeds=len(seeds), discovered=len(out), max_depth=max_depth) + return out + + +# ---------- persistence ------------------------------------------------------ + +def record_candidate(c: PeerCandidate, default_status: str = "unknown") -> None: + """Upsert a discovered candidate into the peers table. + + Preserves any existing trusted/blocked status — discovery NEVER demotes a + peer the operator has already classified. Only updates last_seen. 
+ """ + if default_status not in _VALID_STATUSES: + default_status = "unknown" + existing = db.get_peer(c.domain) + now = c.discovered_at.isoformat() + if existing: + status = existing.get("status") or default_status + if status not in ("trusted", "blocked"): + status = default_status + db.upsert_peer(dict( + domain=c.domain, + fingerprint=existing.get("fingerprint") or c.fingerprint, + pubkey_pem=existing.get("pubkey_pem") or "", + status=status, + discovered_at=existing.get("discovered_at") or now, + last_seen=now, + notes=existing.get("notes"), + )) + return + db.upsert_peer(dict( + domain=c.domain, + fingerprint=c.fingerprint, + pubkey_pem="", # populated when we successfully fetch /federation/key during vouching + status=default_status, + discovered_at=now, + last_seen=now, + notes=f"discovered via {c.source}", + )) + _log.info("discovery.recorded", domain=c.domain, fp=c.fingerprint, source=c.source) + + +# ---------- public attestation ----------------------------------------------- + +def public_peer_attestation() -> List[Dict[str, Any]]: + """List of peers we'll publicly attest to. Only `trusted` — never leaks unknown/blocked. + + This is the surface that other psyc nodes' walkers read from us. We never + expose unknown or blocked peers — those are internal classification state. + """ + out: List[Dict[str, Any]] = [] + for row in db.list_peers(): + if row.get("status") != "trusted": + continue + out.append({ + "domain": row["domain"], + "fingerprint": row["fingerprint"], + "first_seen": row.get("discovered_at"), + }) + return out diff --git a/src/psyc/lines/pulse.py b/src/psyc/lines/pulse.py index 10473ba..20944a8 100644 --- a/src/psyc/lines/pulse.py +++ b/src/psyc/lines/pulse.py @@ -134,8 +134,48 @@ def _run_respond_propose() -> str: return f"proposed {proposed} action(s) for {touched} case(s)" +_DISCOVERY_SEEDS_KEY = "discovery_seeds" + + +def get_discovery_seeds() -> List[str]: + """Operator-curated seed list for the discovery walker. 
_DISCOVERY_SEEDS_KEY = "discovery_seeds"


def get_discovery_seeds() -> List[str]:
    """Operator-curated seed list for the discovery walker. Newline-separated in DB."""
    raw = db.pulse_setting_get(_DISCOVERY_SEEDS_KEY)
    if not raw:
        return []
    return [line.strip() for line in raw.splitlines() if line.strip()]


def set_discovery_seeds(seeds: List[str]) -> None:
    """Replace the seed list. Strips blanks + dedupes preserving order.

    Storage is one domain per line, so an entry that itself contains line
    breaks is split into its lines before deduping. This keeps what is
    stored identical to what `get_discovery_seeds` will read back.
    """
    cleaned: List[str] = []
    seen: set = set()
    for entry in seeds:
        for piece in (entry or "").splitlines():
            value = piece.strip()
            if not value or value in seen:
                continue
            seen.add(value)
            cleaned.append(value)
    db.pulse_setting_set(_DISCOVERY_SEEDS_KEY, "\n".join(cleaned))


def _run_peer_pull() -> str:
    """Walk DNS-SD + recurse over peer-public lists from the operator's seeds.

    Records every fresh candidate into the `peers` table with status=unknown.
    Vouching (sibling stage) is what eventually promotes them. Returns a
    one-line summary; write failures are logged and counted in that summary
    instead of being silently absorbed.
    """
    # Imported here rather than at module top, as in the original.
    from psyc.lines import discovery

    seeds = get_discovery_seeds()
    if not seeds:
        return "no seeds configured"
    candidates = discovery.walk(seeds)
    failed = 0
    for c in candidates:
        try:
            discovery.record_candidate(c)
        except Exception as exc:  # noqa: BLE001 — one bad write must not abort the batch
            failed += 1
            _log.warning("pulse.peer_pull.record.error", domain=c.domain, error=str(exc))
    summary = f"discovered {len(candidates)} candidate(s) from {len(seeds)} seed(s)"
    if failed:
        summary += f"; {failed} failed to record"
    return summary
psyc.lines import discovery, federation, pulse +from psyc.lines.discovery import ( + PeerCandidate, + _parse_txt_value, + fetch_peer_info, + fetch_public_peers, + public_peer_attestation, + record_candidate, + resolve_psyc, + walk, +) +from psyc.result import Err, Ok + + +# ---------- fixtures --------------------------------------------------------- + +@pytest.fixture +def fresh_db(tmp_path, monkeypatch): + test_db = tmp_path / "test.db" + eng = create_engine(f"sqlite:///{test_db}", future=True) + db._metadata.create_all(eng, checkfirst=True) + monkeypatch.setattr(db, "_engine", eng) + monkeypatch.setattr(db, "DB_PATH", test_db) + yield test_db + + +@pytest.fixture +def fed_dir(tmp_path, monkeypatch): + d = tmp_path / "federation" + monkeypatch.setattr(federation, "FED_DIR", d) + monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key") + monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub") + yield d + + +def _mk_srv(port: int = 443) -> Any: + rd = MagicMock() + rd.port = port + return rd + + +def _mk_txt(value: str) -> Any: + rd = MagicMock() + rd.strings = [value.encode("utf-8")] + return rd + + +# ---------- TXT parser ------------------------------------------------------- + +def test_parse_txt_valid(): + txt = "v=psyc1 fp=" + "a" * 32 + " alg=ed25519 path=/federation/feed" + res = _parse_txt_value(txt) + assert isinstance(res, Ok) + assert res.value["fp"] == "a" * 32 + assert res.value["alg"] == "ed25519" + + +def test_parse_txt_tolerates_token_order(): + txt = "path=/federation/feed alg=ed25519 fp=" + "f" * 32 + " v=psyc1" + res = _parse_txt_value(txt) + assert isinstance(res, Ok) + + +def test_parse_txt_rejects_wrong_version(): + txt = "v=psyc2 fp=" + "a" * 32 + " alg=ed25519" + res = _parse_txt_value(txt) + assert isinstance(res, Err) + assert "version" in res.reason + + +def test_parse_txt_rejects_bad_fingerprint_length(): + txt = "v=psyc1 fp=deadbeef alg=ed25519" + res = _parse_txt_value(txt) + assert isinstance(res, Err) + assert 
"fingerprint" in res.reason + + +def test_parse_txt_rejects_non_hex_fingerprint(): + txt = "v=psyc1 fp=" + "z" * 32 + " alg=ed25519" + res = _parse_txt_value(txt) + assert isinstance(res, Err) + + +def test_parse_txt_rejects_malformed_token(): + txt = "v=psyc1 fp=" + "a" * 32 + " alg ed25519" + res = _parse_txt_value(txt) + assert isinstance(res, Err) + assert "malformed" in res.reason + + +def test_parse_txt_rejects_wrong_alg(): + txt = "v=psyc1 fp=" + "a" * 32 + " alg=rsa" + res = _parse_txt_value(txt) + assert isinstance(res, Err) + + +# ---------- resolve_psyc ----------------------------------------------------- + +def test_resolve_psyc_happy_path(): + fp = "1" * 32 + txt = f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed" + + def fake_resolve(self, name, rdtype): + if rdtype == "SRV": + return [_mk_srv(port=8443)] + if rdtype == "TXT": + return [_mk_txt(txt)] + raise dns.exception.DNSException("unexpected") + + with patch.object(dns.resolver.Resolver, "resolve", fake_resolve): + res = resolve_psyc("peer.example.com") + assert isinstance(res, Ok) + cand = res.value + assert cand.domain == "peer.example.com" + assert cand.fingerprint == fp + assert cand.port == 8443 + assert cand.source == "dns-sd" + + +def test_resolve_psyc_nxdomain_returns_err(): + def fake_resolve(self, name, rdtype): + raise dns.resolver.NXDOMAIN() + + with patch.object(dns.resolver.Resolver, "resolve", fake_resolve): + res = resolve_psyc("nothere.example") + assert isinstance(res, Err) + assert "NXDOMAIN" in res.reason + + +def test_resolve_psyc_txt_malformed_returns_err(): + def fake_resolve(self, name, rdtype): + if rdtype == "SRV": + return [_mk_srv()] + return [_mk_txt("v=psyc1 fp=garbage alg=ed25519")] + + with patch.object(dns.resolver.Resolver, "resolve", fake_resolve): + res = resolve_psyc("peer.example") + assert isinstance(res, Err) + assert "TXT" in res.reason or "fingerprint" in res.reason + + +def test_resolve_psyc_no_answer_returns_err(): + def fake_resolve(self, name, 
rdtype): + raise dns.resolver.NoAnswer() + + with patch.object(dns.resolver.Resolver, "resolve", fake_resolve): + res = resolve_psyc("peer.example") + assert isinstance(res, Err) + assert "NoAnswer" in res.reason + + +# ---------- walk ------------------------------------------------------------- + +def _stub_resolve(catalog: Dict[str, str]): + """Build a resolve_psyc stub that returns each domain's catalog fingerprint.""" + def _stub(domain: str, timeout: float = 5.0): + if domain not in catalog: + return Err(f"no record for {domain}") + return Ok(PeerCandidate( + domain=domain, + fingerprint=catalog[domain], + port=443, + source="dns-sd", + )) + return _stub + + +def _stub_fetch_info_ok(*args, **kwargs): + return Ok({"fingerprint": kwargs.get("expected_fingerprint", "")}) + + +def _stub_fetch_peers_factory(graph: Dict[str, List[Dict[str, str]]]): + def _stub(domain: str, port: int = 443, timeout: float = 5.0): + return Ok(graph.get(domain, [])) + return _stub + + +def test_walk_dedupes_by_fingerprint(fresh_db, fed_dir, monkeypatch): + # Two seeds, same fingerprint via different domains → only one survives the (domain,fp) dedupe + # but distinct domains both surface; the (domain, fp) pair just shouldn't repeat. 
+ fp = "9" * 32 + catalog = {"a.example": fp, "b.example": fp} + monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog)) + monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok) + monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({})) + out = walk(["a.example", "b.example", "a.example"], max_depth=1) + # both unique domains made it in; the duplicate seed didn't re-enter + assert len(out) == 2 + domains = {c.domain for c in out} + assert domains == {"a.example", "b.example"} + + +def test_walk_respects_max_depth(fresh_db, fed_dir, monkeypatch): + catalog = {"d0.example": "0" * 32, "d1.example": "1" * 32, "d2.example": "2" * 32} + graph = { + "d0.example": [{"domain": "d1.example", "fingerprint": "1" * 32}], + "d1.example": [{"domain": "d2.example", "fingerprint": "2" * 32}], + } + monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog)) + monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok) + monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory(graph)) + out = walk(["d0.example"], max_depth=1) + domains = {c.domain for c in out} + # depth 0: d0; depth 1: d1; depth 2 (d2) is excluded by max_depth=1 + assert "d0.example" in domains and "d1.example" in domains + assert "d2.example" not in domains + + +def test_walk_respects_max_peers(fresh_db, fed_dir, monkeypatch): + catalog = {f"d{i}.example": f"{i:032x}" for i in range(10)} + graph = {"d0.example": [{"domain": f"d{i}.example", "fingerprint": f"{i:032x}"} for i in range(1, 10)]} + monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog)) + monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok) + monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory(graph)) + out = walk(["d0.example"], max_depth=2, max_peers=3) + assert len(out) <= 3 + + +def test_walk_skips_own_fingerprint(fresh_db, fed_dir, monkeypatch): + own = federation.node_fingerprint() + 
catalog = {"self.example": own, "peer.example": "f" * 32} + monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog)) + monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok) + monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({})) + out = walk(["self.example", "peer.example"], max_depth=1) + domains = {c.domain for c in out} + assert "self.example" not in domains + assert "peer.example" in domains + + +def test_walk_one_failure_does_not_abort(fresh_db, fed_dir, monkeypatch): + catalog = {"good.example": "a" * 32} # bad.example is absent → Err on resolve + monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog)) + monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok) + monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({})) + out = walk(["bad.example", "good.example"], max_depth=1) + assert len(out) == 1 + assert out[0].domain == "good.example" + + +# ---------- record_candidate ------------------------------------------------- + +def test_record_candidate_inserts_as_unknown(fresh_db): + c = PeerCandidate(domain="new.example", fingerprint="a" * 32, source="dns-sd") + record_candidate(c) + row = db.get_peer("new.example") + assert row is not None + assert row["status"] == "unknown" + assert row["fingerprint"] == "a" * 32 + + +def test_record_candidate_preserves_trusted(fresh_db, fed_dir): + federation.register_peer("vip.example", "b" * 32, "PEM", status="trusted") + # walker re-discovers it + c = PeerCandidate(domain="vip.example", fingerprint="b" * 32, source="peer-walk:other.example") + record_candidate(c) + row = db.get_peer("vip.example") + assert row["status"] == "trusted" + + +def test_record_candidate_preserves_blocked(fresh_db, fed_dir): + federation.register_peer("bad.example", "c" * 32, "PEM", status="blocked") + c = PeerCandidate(domain="bad.example", fingerprint="c" * 32, source="dns-sd") + record_candidate(c) + row = 
db.get_peer("bad.example") + assert row["status"] == "blocked" + + +def test_record_candidate_updates_last_seen(fresh_db): + c = PeerCandidate(domain="repeat.example", fingerprint="d" * 32, source="dns-sd") + record_candidate(c) + first = db.get_peer("repeat.example") + # second pass — last_seen advances, discovered_at stays + c2 = PeerCandidate(domain="repeat.example", fingerprint="d" * 32, source="dns-sd") + record_candidate(c2) + second = db.get_peer("repeat.example") + assert second["discovered_at"] == first["discovered_at"] + + +# ---------- public attestation ----------------------------------------------- + +def test_public_peer_attestation_only_trusted(fresh_db, fed_dir): + federation.register_peer("trusted.example", "1" * 32, "PEM", status="trusted") + federation.register_peer("unknown.example", "2" * 32, "PEM", status="unknown") + federation.register_peer("blocked.example", "3" * 32, "PEM", status="blocked") + out = public_peer_attestation() + domains = {p["domain"] for p in out} + assert domains == {"trusted.example"} + + +def test_public_peer_attestation_payload_shape(fresh_db, fed_dir): + federation.register_peer("t.example", "f" * 32, "PEM", status="trusted") + out = public_peer_attestation() + assert len(out) == 1 + entry = out[0] + assert set(entry.keys()) == {"domain", "fingerprint", "first_seen"} + + +# ---------- public endpoint via TestClient ----------------------------------- + +def _mk_app() -> FastAPI: + app = FastAPI() + app.add_middleware(SessionMiddleware, secret_key="test-secret") + # Templates aren't exercised by the public endpoints we care about here. 
+ from fastapi.templating import Jinja2Templates + import tempfile, os + tdir = tempfile.mkdtemp() + templates = Jinja2Templates(directory=tdir) + federation_routes.register(app, templates) + return app + + +def test_public_peers_endpoint_excludes_unknown_blocked(fresh_db, fed_dir): + federation.register_peer("ok.example", "a" * 32, "PEM", status="trusted") + federation.register_peer("rude.example", "b" * 32, "PEM", status="blocked") + federation.register_peer("new.example", "c" * 32, "PEM", status="unknown") + # Flush in-memory cache from any earlier test. + federation_routes._PUBLIC_PEERS_CACHE["payload"] = None + federation_routes._PUBLIC_PEERS_CACHE["ts"] = 0.0 + app = _mk_app() + client = TestClient(app) + r = client.get("/federation/peers/public") + assert r.status_code == 200 + body = r.json() + domains = {p["domain"] for p in body} + assert "ok.example" in domains + assert "rude.example" not in domains + assert "new.example" not in domains + + +# ---------- pulse integration ------------------------------------------------ + +def test_discovery_seeds_roundtrip(fresh_db): + assert pulse.get_discovery_seeds() == [] + pulse.set_discovery_seeds(["a.example", "b.example", "a.example", "", " "]) + # dedupe + strip blanks + assert pulse.get_discovery_seeds() == ["a.example", "b.example"] + + +def test_peer_pull_pipeline_no_seeds(fresh_db, fed_dir, monkeypatch): + # peer-pull runner returns a clean message when nothing's configured. 
+ outcome, result = pulse.run_now("peer-pull") + assert outcome == "ok" + assert "no seeds" in result + + +def test_peer_pull_pipeline_with_seeds(fresh_db, fed_dir, monkeypatch): + pulse.set_discovery_seeds(["good.example"]) + catalog = {"good.example": "e" * 32} + monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog)) + monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok) + monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({})) + outcome, result = pulse.run_now("peer-pull") + assert outcome == "ok" + assert "discovered 1" in result + # And it was recorded. + row = db.get_peer("good.example") + assert row is not None + assert row["status"] == "unknown"