stage-disc-b discovery: peer walker (BFS)
This commit is contained in:
@@ -1,20 +1,19 @@
|
||||
"""Discovery — DNS-SD resolver + HTTP probes for internet-wide federation.
|
||||
"""Discovery — DNS-SD resolver + BFS peer-walker for internet-wide federation.
|
||||
|
||||
Federation identity (psyc.lines.federation) gives every node a stable Ed25519
|
||||
keypair, a 32-hex fingerprint, and a DNS record format. This module is the
|
||||
*finder*: given a domain you suspect runs psyc, walk its DNS-SD records to
|
||||
learn the fingerprint and reach its public federation endpoints.
|
||||
*finder*: given a seed domain you suspect runs psyc, walk its DNS-SD records
|
||||
to learn the fingerprint, fetch its public peer list, and recurse.
|
||||
|
||||
The BFS peer-walker that recurses across `/federation/peers/public` lives in
|
||||
the same module and lands in the next commit. Newly-discovered peers always
|
||||
enter the `peers` table with status="unknown" — they do NOT become trusted by
|
||||
being seen; vouching is a separate concern.
|
||||
Newly-discovered peers always enter the `peers` table with status="unknown" —
|
||||
they do NOT become trusted by being seen; vouching is a separate concern
|
||||
(sibling module). Discovery only populates the candidate set.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple
|
||||
|
||||
import dns.exception
|
||||
import dns.rdatatype
|
||||
@@ -31,6 +30,8 @@ _log = log.get(__name__)
|
||||
|
||||
DEFAULT_TIMEOUT = 5.0
|
||||
DEFAULT_PORT = 443
|
||||
DEFAULT_MAX_DEPTH = 2
|
||||
DEFAULT_MAX_PEERS = 200
|
||||
|
||||
_VALID_STATUSES = {"unknown", "trusted", "blocked", "vouched"}
|
||||
|
||||
@@ -207,6 +208,97 @@ def fetch_public_peers(domain: str, port: int = DEFAULT_PORT,
|
||||
return Ok(out)
|
||||
|
||||
|
||||
# ---------- BFS walker -------------------------------------------------------
|
||||
|
||||
def walk(seeds: List[str], max_depth: int = DEFAULT_MAX_DEPTH,
|
||||
max_peers: int = DEFAULT_MAX_PEERS, timeout: float = DEFAULT_TIMEOUT) -> List[PeerCandidate]:
|
||||
"""Breadth-first walk from `seeds` → discovered candidates.
|
||||
|
||||
For each domain: DNS-SD resolve → fetch /info to verify fingerprint →
|
||||
fetch /peers/public → enqueue its domains for the next layer. Dedupes on
|
||||
(domain, fingerprint). Skips our own fingerprint to avoid loops. All
|
||||
errors are logged but non-fatal — one bad peer doesn't abort the walk.
|
||||
"""
|
||||
own_fp = ""
|
||||
try:
|
||||
own_fp = federation.node_fingerprint()
|
||||
except Exception as exc: # noqa: BLE001 — discovery should work even pre-keygen
|
||||
_log.info("discovery.walk.no_own_fp", error=str(exc))
|
||||
|
||||
seen_pairs: Set[Tuple[str, str]] = set()
|
||||
seen_domains: Set[str] = set()
|
||||
out: List[PeerCandidate] = []
|
||||
|
||||
# Queue of (domain, depth, source). Seeds enter at depth 0.
|
||||
frontier: List[Tuple[str, int, str]] = [(d.strip(), 0, "dns-sd") for d in seeds if d and d.strip()]
|
||||
next_layer: List[Tuple[str, int, str]] = []
|
||||
|
||||
while frontier:
|
||||
domain, depth, source = frontier.pop(0)
|
||||
if domain in seen_domains:
|
||||
if not frontier:
|
||||
frontier, next_layer = next_layer, []
|
||||
continue
|
||||
seen_domains.add(domain)
|
||||
if len(out) >= max_peers:
|
||||
_log.info("discovery.walk.cap.peers", cap=max_peers)
|
||||
break
|
||||
|
||||
resolved = resolve_psyc(domain, timeout=timeout)
|
||||
if isinstance(resolved, Err):
|
||||
_log.info("discovery.resolve.skip", domain=domain, reason=resolved.reason)
|
||||
if not frontier:
|
||||
frontier, next_layer = next_layer, []
|
||||
continue
|
||||
cand = resolved.value
|
||||
cand.source = source if depth > 0 else "dns-sd"
|
||||
|
||||
if cand.fingerprint == own_fp:
|
||||
_log.info("discovery.skip.self", domain=domain)
|
||||
if not frontier:
|
||||
frontier, next_layer = next_layer, []
|
||||
continue
|
||||
|
||||
pair = (cand.domain, cand.fingerprint)
|
||||
if pair in seen_pairs:
|
||||
if not frontier:
|
||||
frontier, next_layer = next_layer, []
|
||||
continue
|
||||
seen_pairs.add(pair)
|
||||
|
||||
# Verify the live endpoint's fingerprint matches DNS. If we can't reach
|
||||
# it, still record the DNS-discovered candidate — vouching can vet it
|
||||
# later, and we don't want one HTTP outage to abort the walk.
|
||||
info_res = fetch_peer_info(domain, port=cand.port, timeout=timeout,
|
||||
expected_fingerprint=cand.fingerprint)
|
||||
if isinstance(info_res, Err):
|
||||
_log.info("discovery.info.skip", domain=domain, reason=info_res.reason)
|
||||
out.append(cand)
|
||||
if not frontier:
|
||||
frontier, next_layer = next_layer, []
|
||||
continue
|
||||
out.append(cand)
|
||||
|
||||
# Recurse: fetch this peer's public-peers list, enqueue domains.
|
||||
if depth + 1 <= max_depth:
|
||||
peers_res = fetch_public_peers(domain, port=cand.port, timeout=timeout)
|
||||
if isinstance(peers_res, Err):
|
||||
_log.info("discovery.peers.skip", domain=domain, reason=peers_res.reason)
|
||||
else:
|
||||
for item in peers_res.value:
|
||||
child_domain = str(item.get("domain", "")).strip()
|
||||
if not child_domain or child_domain in seen_domains:
|
||||
continue
|
||||
child_source = f"peer-walk:{domain}"
|
||||
next_layer.append((child_domain, depth + 1, child_source))
|
||||
|
||||
if not frontier:
|
||||
frontier, next_layer = next_layer, []
|
||||
|
||||
_log.info("discovery.walk.done", seeds=len(seeds), discovered=len(out), max_depth=max_depth)
|
||||
return out
|
||||
|
||||
|
||||
# ---------- persistence ------------------------------------------------------
|
||||
|
||||
def record_candidate(c: PeerCandidate, default_status: str = "unknown") -> None:
|
||||
|
||||
Reference in New Issue
Block a user