diff --git a/src/psyc/lines/discovery.py b/src/psyc/lines/discovery.py index bb75c4d..e74969c 100644 --- a/src/psyc/lines/discovery.py +++ b/src/psyc/lines/discovery.py @@ -1,20 +1,19 @@ -"""Discovery — DNS-SD resolver + HTTP probes for internet-wide federation. +"""Discovery — DNS-SD resolver + BFS peer-walker for internet-wide federation. Federation identity (psyc.lines.federation) gives every node a stable Ed25519 keypair, a 32-hex fingerprint, and a DNS record format. This module is the -*finder*: given a domain you suspect runs psyc, walk its DNS-SD records to -learn the fingerprint and reach its public federation endpoints. +*finder*: given a seed domain you suspect runs psyc, walk its DNS-SD records +to learn the fingerprint, fetch its public peer list, and recurse. -The BFS peer-walker that recurses across `/federation/peers/public` lives in -the same module and lands in the next commit. Newly-discovered peers always -enter the `peers` table with status="unknown" — they do NOT become trusted by -being seen; vouching is a separate concern. +Newly-discovered peers always enter the `peers` table with status="unknown" — +they do NOT become trusted by being seen; vouching is a separate concern +(sibling module). Discovery only populates the candidate set. """ from __future__ import annotations from datetime import datetime, timezone -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Set, Tuple import dns.exception import dns.rdatatype @@ -31,6 +30,8 @@ _log = log.get(__name__) DEFAULT_TIMEOUT = 5.0 DEFAULT_PORT = 443 +DEFAULT_MAX_DEPTH = 2 +DEFAULT_MAX_PEERS = 200 _VALID_STATUSES = {"unknown", "trusted", "blocked", "vouched"} @@ -207,6 +208,97 @@ def fetch_public_peers(domain: str, port: int = DEFAULT_PORT, return Ok(out) +# ---------- BFS walker ------------------------------------------------------- + +def walk(seeds: List[str], max_depth: int = DEFAULT_MAX_DEPTH, + max_peers: int = DEFAULT_MAX_PEERS, timeout: float = DEFAULT_TIMEOUT) -> List[PeerCandidate]: + """Breadth-first walk from `seeds` → discovered candidates. + + For each domain: DNS-SD resolve → fetch /info to verify fingerprint → + fetch /peers/public → enqueue its domains for the next layer. Dedupes on + (domain, fingerprint). Skips our own fingerprint to avoid loops. All + errors are logged but non-fatal — one bad peer doesn't abort the walk. + """ + own_fp = "" + try: + own_fp = federation.node_fingerprint() + except Exception as exc: # noqa: BLE001 — discovery should work even pre-keygen + _log.info("discovery.walk.no_own_fp", error=str(exc)) + + seen_pairs: Set[Tuple[str, str]] = set() + seen_domains: Set[str] = set() + out: List[PeerCandidate] = [] + + # Queue of (domain, depth, source). Seeds enter at depth 0. + frontier: List[Tuple[str, int, str]] = [(d.strip(), 0, "dns-sd") for d in seeds if d and d.strip()] + next_layer: List[Tuple[str, int, str]] = [] + + while frontier: + domain, depth, source = frontier.pop(0) + if domain in seen_domains: + if not frontier: + frontier, next_layer = next_layer, [] + continue + seen_domains.add(domain) + if len(out) >= max_peers: + _log.info("discovery.walk.cap.peers", cap=max_peers) + break + + resolved = resolve_psyc(domain, timeout=timeout) + if isinstance(resolved, Err): + _log.info("discovery.resolve.skip", domain=domain, reason=resolved.reason) + if not frontier: + frontier, next_layer = next_layer, [] + continue + cand = resolved.value + cand.source = source if depth > 0 else "dns-sd" + + if cand.fingerprint == own_fp: + _log.info("discovery.skip.self", domain=domain) + if not frontier: + frontier, next_layer = next_layer, [] + continue + + pair = (cand.domain, cand.fingerprint) + if pair in seen_pairs: + if not frontier: + frontier, next_layer = next_layer, [] + continue + seen_pairs.add(pair) + + # Verify the live endpoint's fingerprint matches DNS. If we can't reach + # it, still record the DNS-discovered candidate — vouching can vet it + # later, and we don't want one HTTP outage to abort the walk. + info_res = fetch_peer_info(domain, port=cand.port, timeout=timeout, + expected_fingerprint=cand.fingerprint) + if isinstance(info_res, Err): + _log.info("discovery.info.skip", domain=domain, reason=info_res.reason) + out.append(cand) + if not frontier: + frontier, next_layer = next_layer, [] + continue + out.append(cand) + + # Recurse: fetch this peer's public-peers list, enqueue domains. + if depth + 1 <= max_depth: + peers_res = fetch_public_peers(domain, port=cand.port, timeout=timeout) + if isinstance(peers_res, Err): + _log.info("discovery.peers.skip", domain=domain, reason=peers_res.reason) + else: + for item in peers_res.value: + child_domain = str(item.get("domain", "")).strip() + if not child_domain or child_domain in seen_domains: + continue + child_source = f"peer-walk:{domain}" + next_layer.append((child_domain, depth + 1, child_source)) + + if not frontier: + frontier, next_layer = next_layer, [] + + _log.info("discovery.walk.done", seeds=len(seeds), discovered=len(out), max_depth=max_depth) + return out + + # ---------- persistence ------------------------------------------------------ def record_candidate(c: PeerCandidate, default_status: str = "unknown") -> None: