merge discovery: DNS-SD walker + public peers endpoint

This commit is contained in:
m17hr1l
2026-06-06 21:13:29 +02:00
8 changed files with 1016 additions and 5 deletions

View File

@@ -23,6 +23,7 @@ dependencies = [
"pyotp>=2.9",
"qrcode[pil]>=7.4",
"itsdangerous>=2.1",
"dnspython>=2.4",
]
[project.optional-dependencies]

View File

@@ -7,14 +7,14 @@ from __future__ import annotations
import json
from pathlib import Path
from typing import Optional
from typing import List, Optional
import httpx
import typer
from psyc import db, log
from psyc.lines import federation
from psyc.result import Err
from psyc.lines import discovery, federation, pulse
from psyc.result import Err, Ok
_log = log.get(__name__)
@@ -133,3 +133,80 @@ def register(typer_app: typer.Typer) -> None:
db.init_db()
federation.remove_peer(domain)
typer.echo(f"removed {domain}")
# ---------- discovery (DNS-SD walker) ----------------------------------
@typer_app.command("fed-resolve")
def fed_resolve(
domain: str = typer.Argument(..., help="domain to look up via _psyc._tcp.<domain>"),
timeout: float = typer.Option(5.0, "--timeout", help="DNS lookup timeout, seconds"),
) -> None:
"""Resolve a domain's psyc DNS-SD record. Prints fingerprint + port."""
result = discovery.resolve_psyc(domain, timeout=timeout)
if isinstance(result, Err):
typer.echo(f"error: {result.reason}", err=True)
raise typer.Exit(1)
c = result.value
typer.echo(f" domain {c.domain}")
typer.echo(f" fingerprint {c.fingerprint}")
typer.echo(f" port {c.port}")
typer.echo(f" source {c.source}")
@typer_app.command("fed-walk")
def fed_walk(
seeds: List[str] = typer.Argument(..., help="one or more seed domains"),
depth: int = typer.Option(2, "--depth", help="max BFS depth"),
max_peers: int = typer.Option(200, "--max-peers", help="cap discovered candidates"),
record: bool = typer.Option(False, "--record", help="persist candidates as status=unknown"),
) -> None:
"""Walk DNS-SD + peer-public from `seeds`. Prints discovered table."""
db.init_db()
cands = discovery.walk(seeds, max_depth=depth, max_peers=max_peers)
if not cands:
typer.echo("(no candidates discovered)")
return
typer.echo(f"{'domain':<32} {'fingerprint':<24} {'port':>5} source")
for c in cands:
fp = f"{c.fingerprint[:8]}{c.fingerprint[-8:]}"
typer.echo(f"{c.domain:<32} {fp:<24} {c.port:>5} {c.source}")
if record:
for c in cands:
discovery.record_candidate(c)
typer.echo(f"recorded {len(cands)} candidate(s) into peers table")
@typer_app.command("fed-seeds-list")
def fed_seeds_list() -> None:
"""Print the operator-curated discovery seed list."""
db.init_db()
seeds = pulse.get_discovery_seeds()
if not seeds:
typer.echo("(no seeds configured)")
return
for s in seeds:
typer.echo(s)
@typer_app.command("fed-seeds-add")
def fed_seeds_add(domain: str = typer.Argument(...)) -> None:
"""Append a seed domain (no-op if already present)."""
db.init_db()
seeds = pulse.get_discovery_seeds()
d = domain.strip()
if d in seeds:
typer.echo(f"{d} already a seed")
return
seeds.append(d)
pulse.set_discovery_seeds(seeds)
typer.echo(f"added seed {d}")
@typer_app.command("fed-seeds-remove")
def fed_seeds_remove(domain: str = typer.Argument(...)) -> None:
"""Remove a seed domain (no-op if absent)."""
db.init_db()
seeds = pulse.get_discovery_seeds()
d = domain.strip()
if d not in seeds:
typer.echo(f"{d} not in seeds")
return
seeds = [s for s in seeds if s != d]
pulse.set_discovery_seeds(seeds)
typer.echo(f"removed seed {d}")

View File

@@ -15,7 +15,7 @@ from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, Red
from fastapi.templating import Jinja2Templates
from psyc import db, log
from psyc.lines import federation
from psyc.lines import discovery, federation, pulse
_log = log.get(__name__)
@@ -25,6 +25,10 @@ _log = log.get(__name__)
_FEED_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
_FEED_TTL = 60.0
# Mirror the feed cache for the public peers list — same poll-load pattern.
_PUBLIC_PEERS_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
_PUBLIC_PEERS_TTL = 60.0
def _admin_ok(request: Request) -> bool:
return bool(request.session.get("admin_ok"))
@@ -38,6 +42,14 @@ def _cached_feed() -> Dict[str, Any]:
return _FEED_CACHE["payload"]
def _cached_public_peers() -> Any:
now = time.time()
if _PUBLIC_PEERS_CACHE["payload"] is None or (now - _PUBLIC_PEERS_CACHE["ts"]) > _PUBLIC_PEERS_TTL:
_PUBLIC_PEERS_CACHE["payload"] = discovery.public_peer_attestation()
_PUBLIC_PEERS_CACHE["ts"] = now
return _PUBLIC_PEERS_CACHE["payload"]
def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
"""Mount all federation routes onto `app`."""
@@ -103,6 +115,54 @@ def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
federation.remove_peer(domain)
return RedirectResponse("/admin/federation", status_code=303)
# ---------- discovery (DNS-SD walker) ----------------------------
@app.get("/admin/federation/discovery", response_class=HTMLResponse)
def admin_discovery(request: Request) -> HTMLResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
seeds = pulse.get_discovery_seeds()
candidates = federation.list_peers()
flash = request.query_params.get("flash") or ""
return TEMPLATES.TemplateResponse(
request,
"admin_discovery.html",
{
"seeds": seeds,
"seeds_text": "\n".join(seeds),
"candidates": candidates,
"flash": flash,
},
)
@app.post("/admin/federation/discovery/seeds")
def admin_discovery_seeds(
request: Request,
seeds: str = Form(""),
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
lines = [line for line in seeds.splitlines()]
pulse.set_discovery_seeds(lines)
return RedirectResponse("/admin/federation/discovery?flash=seeds+saved", status_code=303)
@app.post("/admin/federation/discovery/walk")
def admin_discovery_walk(request: Request) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
seeds = pulse.get_discovery_seeds()
if not seeds:
return RedirectResponse("/admin/federation/discovery?flash=no+seeds+configured", status_code=303)
try:
cands = discovery.walk(seeds)
for c in cands:
discovery.record_candidate(c)
msg = f"discovered+{len(cands)}+candidates+from+{len(seeds)}+seed(s)"
except Exception as exc: # noqa: BLE001 — surface the error to the operator
_log.warning("federation.discovery.walk.error", error=str(exc))
msg = f"walk+failed:+{str(exc)[:80]}"
return RedirectResponse(f"/admin/federation/discovery?flash={msg}", status_code=303)
# ---------- public endpoints --------------------------------------
@app.get("/federation/info")
@@ -122,4 +182,13 @@ def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
def federation_feed() -> JSONResponse:
return JSONResponse(_cached_feed())
@app.get("/federation/peers/public")
def federation_peers_public() -> JSONResponse:
"""Publicly attested peer list — what other psyc walkers discover us through.
Only trusted peers leak; unknown + blocked are internal state and must
never appear here.
"""
return JSONResponse(_cached_public_peers())
_log.info("federation.routes.registered")

View File

@@ -0,0 +1,72 @@
{% extends "base.html" %}
{% block title %}Discovery — psyc admin{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Peer discovery</h1>
<span class="count">{{ candidates|length }} candidate{{ '' if candidates|length == 1 else 's' }}</span>
</div>
<p class="page-intro">Walk DNS-SD records from a seed domain you know runs psyc, then recurse through its public peer list. Newly-found peers land here with status <code>unknown</code> — vouching is what eventually promotes them. Once seeds exist, enabling the <code>peer-pull</code> pulse pipeline runs this on a cadence.</p>
<p class="back"><a href="/admin/federation">← back to federation</a></p>
{% if flash %}
<div class="verdict verdict-clean">{{ flash }}</div>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Seed domains</h2>
<span class="count">{{ seeds|length }} configured</span>
</div>
<p class="page-intro">One domain per line. Each seed is resolved via <code>_psyc._tcp.&lt;domain&gt;</code> SRV+TXT; its <code>/federation/peers/public</code> is fetched and recursed.</p>
<form method="post" action="/admin/federation/discovery/seeds" style="display:grid; gap:10px; max-width:680px;">
<textarea name="seeds" rows="6" class="lookup-input" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:12px;" placeholder="peer1.example.com&#10;peer2.example.org">{{ seeds_text }}</textarea>
<div style="display:flex; gap:10px;">
<button type="submit" class="btn btn-enforce">save seeds</button>
</div>
</form>
<form method="post" action="/admin/federation/discovery/walk" style="margin-top:14px;">
<button type="submit" class="btn btn-approve" {% if not seeds %}disabled{% endif %}>walk now</button>
</form>
</section>
<section class="panel">
<div class="panel-head">
<h2>Recent candidates</h2>
<span class="count">{{ candidates|length }} in registry</span>
</div>
<p class="page-intro">Every peer the walker has ever found, newest first. Trusted/blocked statuses are preserved across re-walks — discovery never demotes a peer the operator has classified.</p>
{% if candidates %}
<table class="ledger">
<thead><tr><th>Domain</th><th>Fingerprint</th><th>Status</th><th>Discovered</th><th>Last seen</th><th>Notes</th></tr></thead>
<tbody>
{% for p in candidates %}
<tr class="ledger-row">
<td><strong>{{ p.domain }}</strong></td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">{{ p.fingerprint[:8] }}…{{ p.fingerprint[-8:] }}</td>
<td>
{% if p.status == 'trusted' %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">trusted</span>
{% elif p.status == 'blocked' %}
<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">blocked</span>
{% else %}
<span class="sev-badge">{{ p.status }}</span>
{% endif %}
</td>
<td class="lg-ts">{{ (p.discovered_at or '')[:16] | replace('T', ' ') }}</td>
<td class="lg-ts">{{ ((p.last_seen or '')[:16] | replace('T', ' ')) or '—' }}</td>
<td class="lg-sub">{{ (p.notes or '')[:60] }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no candidates yet — add a seed above and walk)</p>
{% endif %}
</section>
{% endblock %}

View File

@@ -313,6 +313,25 @@ def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None:
conn.execute(stmt)
def pulse_setting_get(key: str, db_path: Path = DB_PATH) -> Optional[str]:
"""Fetch one row from pulse_settings by key; returns None if absent."""
stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == key)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return str(row.value) if row else None
def pulse_setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None:
"""Upsert one (key, value) into pulse_settings."""
stmt = sqlite_insert(pulse_settings).values(key=key, value=value)
stmt = stmt.on_conflict_do_update(
index_elements=[pulse_settings.c.key],
set_=dict(value=stmt.excluded.value),
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
# ---------- federation: peers + signal buffer ----------------------------
def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None:

357
src/psyc/lines/discovery.py Normal file
View File

@@ -0,0 +1,357 @@
"""Discovery — DNS-SD resolver + BFS peer-walker for internet-wide federation.
Federation identity (psyc.lines.federation) gives every node a stable Ed25519
keypair, a 32-hex fingerprint, and a DNS record format. This module is the
*finder*: given a seed domain you suspect runs psyc, walk its DNS-SD records
to learn the fingerprint, fetch its public peer list, and recurse.
Newly-discovered peers always enter the `peers` table with status="unknown"
they do NOT become trusted by being seen; vouching is a separate concern
(sibling module). Discovery only populates the candidate set.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Set, Tuple
import dns.exception
import dns.rdatatype
import dns.resolver
import httpx
from pydantic import BaseModel, Field
from psyc import db, log
from psyc.lines import federation
from psyc.result import Err, Ok, Result
_log = log.get(__name__)
DEFAULT_TIMEOUT = 5.0
DEFAULT_PORT = 443
DEFAULT_MAX_DEPTH = 2
DEFAULT_MAX_PEERS = 200
_VALID_STATUSES = {"unknown", "trusted", "blocked", "vouched"}
# ---------- candidate model --------------------------------------------------
class PeerCandidate(BaseModel):
"""A peer found by the resolver/walker — not yet vetted, just observed."""
domain: str
fingerprint: str
port: int = DEFAULT_PORT
source: str # "dns-sd" | "peer-walk:<source-domain>"
discovered_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
# ---------- DNS-SD resolver --------------------------------------------------
def _parse_txt_value(txt: str) -> Result[Dict[str, str], str]:
"""Parse `v=psyc1 fp=<hex> alg=ed25519 path=...` → dict. Tolerant of order."""
out: Dict[str, str] = {}
for token in txt.strip().split():
if "=" not in token:
return Err(f"malformed TXT token (no '='): {token!r}")
k, v = token.split("=", 1)
out[k.strip()] = v.strip()
if out.get("v") != federation.FEED_VERSION:
return Err(f"unsupported version: {out.get('v')!r}")
fp = out.get("fp", "")
if len(fp) != 32 or any(c not in "0123456789abcdef" for c in fp.lower()):
return Err(f"bad fingerprint: {fp!r}")
if out.get("alg") and out["alg"] != federation.FEED_ALG:
return Err(f"unsupported alg: {out.get('alg')!r}")
return Ok(out)
def _flatten_txt(rdata: Any) -> str:
"""DNS TXT records are a sequence of byte-strings — join them. Tolerant of mocks."""
strings = getattr(rdata, "strings", None)
if strings is None:
return str(rdata).strip('"')
parts: List[str] = []
for s in strings:
if isinstance(s, bytes):
parts.append(s.decode("utf-8", errors="replace"))
else:
parts.append(str(s))
return "".join(parts)
def resolve_psyc(domain: str, timeout: float = DEFAULT_TIMEOUT) -> Result[PeerCandidate, str]:
"""Look up `_psyc._tcp.<domain>` SRV + TXT → return a candidate.
SRV gives target+port; TXT carries `v=psyc1 fp=... alg=ed25519 path=...`.
Any DNS failure, parse failure, or missing record returns Err.
"""
name = f"_psyc._tcp.{domain}"
resolver = dns.resolver.Resolver()
resolver.lifetime = timeout
resolver.timeout = timeout
# SRV: target host + port.
port = DEFAULT_PORT
try:
srv_answers = resolver.resolve(name, "SRV")
except dns.resolver.NXDOMAIN:
return Err(f"no SRV record at {name} (NXDOMAIN)")
except dns.resolver.NoAnswer:
return Err(f"no SRV record at {name} (NoAnswer)")
except dns.exception.Timeout:
return Err(f"SRV lookup timed out for {name}")
except dns.exception.DNSException as exc:
return Err(f"SRV lookup failed for {name}: {exc}")
srv_list = list(srv_answers)
if not srv_list:
return Err(f"no SRV record at {name} (empty)")
try:
port = int(srv_list[0].port)
except Exception:
port = DEFAULT_PORT
# TXT: fingerprint + protocol metadata.
try:
txt_answers = resolver.resolve(name, "TXT")
except dns.resolver.NXDOMAIN:
return Err(f"no TXT record at {name} (NXDOMAIN)")
except dns.resolver.NoAnswer:
return Err(f"no TXT record at {name} (NoAnswer)")
except dns.exception.Timeout:
return Err(f"TXT lookup timed out for {name}")
except dns.exception.DNSException as exc:
return Err(f"TXT lookup failed for {name}: {exc}")
txt_list = list(txt_answers)
if not txt_list:
return Err(f"no TXT record at {name} (empty)")
last_err: Optional[str] = None
for rdata in txt_list:
value = _flatten_txt(rdata)
parsed = _parse_txt_value(value)
if isinstance(parsed, Ok):
return Ok(PeerCandidate(
domain=domain,
fingerprint=parsed.value["fp"].lower(),
port=port,
source="dns-sd",
))
last_err = parsed.reason
return Err(f"no usable psyc TXT record at {name}: {last_err}")
# ---------- HTTP probes ------------------------------------------------------
def _base_url(domain: str, port: int) -> str:
if port == 443:
return f"https://{domain}"
return f"https://{domain}:{port}"
def fetch_peer_info(domain: str, port: int = DEFAULT_PORT, timeout: float = DEFAULT_TIMEOUT,
expected_fingerprint: Optional[str] = None) -> Result[Dict[str, Any], str]:
"""GET /federation/info on a peer. Cross-checks fingerprint if provided.
The cross-check defends against MITM-injected TXT records — DNS said one
fingerprint, the live node's HTTPS-served info MUST agree.
"""
url = _base_url(domain, port) + "/federation/info"
try:
with httpx.Client(timeout=timeout) as client:
r = client.get(url)
r.raise_for_status()
data = r.json()
except httpx.HTTPStatusError as exc:
return Err(f"HTTP {exc.response.status_code} from {url}")
except httpx.RequestError as exc:
return Err(f"network error fetching {url}: {exc}")
except ValueError as exc:
return Err(f"non-JSON response from {url}: {exc}")
except Exception as exc: # noqa: BLE001 — anything weird is a failure
return Err(f"fetch failed for {url}: {exc}")
if not isinstance(data, dict):
return Err(f"unexpected info shape from {url}: {type(data).__name__}")
declared = str(data.get("fingerprint", "")).lower()
if expected_fingerprint and declared != expected_fingerprint.lower():
return Err(
f"fingerprint mismatch for {domain}: DNS said {expected_fingerprint!r} "
f"but /federation/info said {declared!r}"
)
return Ok(data)
def fetch_public_peers(domain: str, port: int = DEFAULT_PORT,
timeout: float = DEFAULT_TIMEOUT) -> Result[List[Dict[str, Any]], str]:
"""GET /federation/peers/public on a peer. Returns the list as-is for the walker to dedupe."""
url = _base_url(domain, port) + "/federation/peers/public"
try:
with httpx.Client(timeout=timeout) as client:
r = client.get(url)
r.raise_for_status()
data = r.json()
except httpx.HTTPStatusError as exc:
return Err(f"HTTP {exc.response.status_code} from {url}")
except httpx.RequestError as exc:
return Err(f"network error fetching {url}: {exc}")
except ValueError as exc:
return Err(f"non-JSON response from {url}: {exc}")
except Exception as exc: # noqa: BLE001
return Err(f"fetch failed for {url}: {exc}")
if not isinstance(data, list):
return Err(f"unexpected peers shape from {url}: {type(data).__name__}")
out: List[Dict[str, Any]] = []
for item in data:
if isinstance(item, dict) and item.get("domain") and item.get("fingerprint"):
out.append(item)
return Ok(out)
# ---------- BFS walker -------------------------------------------------------
def walk(seeds: List[str], max_depth: int = DEFAULT_MAX_DEPTH,
max_peers: int = DEFAULT_MAX_PEERS, timeout: float = DEFAULT_TIMEOUT) -> List[PeerCandidate]:
"""Breadth-first walk from `seeds` → discovered candidates.
For each domain: DNS-SD resolve → fetch /info to verify fingerprint →
fetch /peers/public → enqueue its domains for the next layer. Dedupes on
(domain, fingerprint). Skips our own fingerprint to avoid loops. All
errors are logged but non-fatal — one bad peer doesn't abort the walk.
"""
own_fp = ""
try:
own_fp = federation.node_fingerprint()
except Exception as exc: # noqa: BLE001 — discovery should work even pre-keygen
_log.info("discovery.walk.no_own_fp", error=str(exc))
seen_pairs: Set[Tuple[str, str]] = set()
seen_domains: Set[str] = set()
out: List[PeerCandidate] = []
# Queue of (domain, depth, source). Seeds enter at depth 0.
frontier: List[Tuple[str, int, str]] = [(d.strip(), 0, "dns-sd") for d in seeds if d and d.strip()]
next_layer: List[Tuple[str, int, str]] = []
while frontier:
domain, depth, source = frontier.pop(0)
if domain in seen_domains:
if not frontier:
frontier, next_layer = next_layer, []
continue
seen_domains.add(domain)
if len(out) >= max_peers:
_log.info("discovery.walk.cap.peers", cap=max_peers)
break
resolved = resolve_psyc(domain, timeout=timeout)
if isinstance(resolved, Err):
_log.info("discovery.resolve.skip", domain=domain, reason=resolved.reason)
if not frontier:
frontier, next_layer = next_layer, []
continue
cand = resolved.value
cand.source = source if depth > 0 else "dns-sd"
if cand.fingerprint == own_fp:
_log.info("discovery.skip.self", domain=domain)
if not frontier:
frontier, next_layer = next_layer, []
continue
pair = (cand.domain, cand.fingerprint)
if pair in seen_pairs:
if not frontier:
frontier, next_layer = next_layer, []
continue
seen_pairs.add(pair)
# Verify the live endpoint's fingerprint matches DNS. If we can't reach
# it, still record the DNS-discovered candidate — vouching can vet it
# later, and we don't want one HTTP outage to abort the walk.
info_res = fetch_peer_info(domain, port=cand.port, timeout=timeout,
expected_fingerprint=cand.fingerprint)
if isinstance(info_res, Err):
_log.info("discovery.info.skip", domain=domain, reason=info_res.reason)
out.append(cand)
if not frontier:
frontier, next_layer = next_layer, []
continue
out.append(cand)
# Recurse: fetch this peer's public-peers list, enqueue domains.
if depth + 1 <= max_depth:
peers_res = fetch_public_peers(domain, port=cand.port, timeout=timeout)
if isinstance(peers_res, Err):
_log.info("discovery.peers.skip", domain=domain, reason=peers_res.reason)
else:
for item in peers_res.value:
child_domain = str(item.get("domain", "")).strip()
if not child_domain or child_domain in seen_domains:
continue
child_source = f"peer-walk:{domain}"
next_layer.append((child_domain, depth + 1, child_source))
if not frontier:
frontier, next_layer = next_layer, []
_log.info("discovery.walk.done", seeds=len(seeds), discovered=len(out), max_depth=max_depth)
return out
# ---------- persistence ------------------------------------------------------
def record_candidate(c: PeerCandidate, default_status: str = "unknown") -> None:
"""Upsert a discovered candidate into the peers table.
Preserves any existing trusted/blocked status — discovery NEVER demotes a
peer the operator has already classified. Only updates last_seen.
"""
if default_status not in _VALID_STATUSES:
default_status = "unknown"
existing = db.get_peer(c.domain)
now = c.discovered_at.isoformat()
if existing:
status = existing.get("status") or default_status
if status not in ("trusted", "blocked"):
status = default_status
db.upsert_peer(dict(
domain=c.domain,
fingerprint=existing.get("fingerprint") or c.fingerprint,
pubkey_pem=existing.get("pubkey_pem") or "",
status=status,
discovered_at=existing.get("discovered_at") or now,
last_seen=now,
notes=existing.get("notes"),
))
return
db.upsert_peer(dict(
domain=c.domain,
fingerprint=c.fingerprint,
pubkey_pem="", # populated when we successfully fetch /federation/key during vouching
status=default_status,
discovered_at=now,
last_seen=now,
notes=f"discovered via {c.source}",
))
_log.info("discovery.recorded", domain=c.domain, fp=c.fingerprint, source=c.source)
# ---------- public attestation -----------------------------------------------
def public_peer_attestation() -> List[Dict[str, Any]]:
"""List of peers we'll publicly attest to. Only `trusted` — never leaks unknown/blocked.
This is the surface that other psyc nodes' walkers read from us. We never
expose unknown or blocked peers — those are internal classification state.
"""
out: List[Dict[str, Any]] = []
for row in db.list_peers():
if row.get("status") != "trusted":
continue
out.append({
"domain": row["domain"],
"fingerprint": row["fingerprint"],
"first_seen": row.get("discovered_at"),
})
return out

View File

@@ -134,8 +134,48 @@ def _run_respond_propose() -> str:
return f"proposed {proposed} action(s) for {touched} case(s)"
_DISCOVERY_SEEDS_KEY = "discovery_seeds"
def get_discovery_seeds() -> List[str]:
"""Operator-curated seed list for the discovery walker. Newline-separated in DB."""
raw = db.pulse_setting_get(_DISCOVERY_SEEDS_KEY)
if not raw:
return []
return [line.strip() for line in raw.splitlines() if line.strip()]
def set_discovery_seeds(seeds: List[str]) -> None:
"""Replace the seed list. Strips blanks + dedupes preserving order."""
seen: set = set()
cleaned: List[str] = []
for s in seeds:
v = (s or "").strip()
if not v or v in seen:
continue
seen.add(v)
cleaned.append(v)
db.pulse_setting_set(_DISCOVERY_SEEDS_KEY, "\n".join(cleaned))
def _run_peer_pull() -> str:
return "federation not yet active"
"""Walk DNS-SD + recurse over peer-public lists from the operator's seeds.
Records every fresh candidate into the `peers` table with status=unknown.
Vouching (sibling stage) is what eventually promotes them.
"""
from psyc.lines import discovery
seeds = get_discovery_seeds()
if not seeds:
return "no seeds configured"
candidates = discovery.walk(seeds)
for c in candidates:
try:
discovery.record_candidate(c)
except Exception as exc: # noqa: BLE001 — one bad write must not abort the batch
_log.warning("pulse.peer_pull.record.error", domain=c.domain, error=str(exc))
return f"discovered {len(candidates)} candidate(s) from {len(seeds)} seed(s)"
def _run_vouch_refresh() -> str:

376
tests/test_discovery.py Normal file
View File

@@ -0,0 +1,376 @@
"""Discovery — DNS-SD parse + resolver, BFS walker, persistence, public endpoint."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from unittest.mock import MagicMock, patch
import dns.exception
import dns.resolver
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from starlette.middleware.sessions import SessionMiddleware
from psyc import db
from psyc.cockpit import federation_routes
from psyc.lines import discovery, federation, pulse
from psyc.lines.discovery import (
PeerCandidate,
_parse_txt_value,
fetch_peer_info,
fetch_public_peers,
public_peer_attestation,
record_candidate,
resolve_psyc,
walk,
)
from psyc.result import Err, Ok
# ---------- fixtures ---------------------------------------------------------
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
d = tmp_path / "federation"
monkeypatch.setattr(federation, "FED_DIR", d)
monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
yield d
def _mk_srv(port: int = 443) -> Any:
rd = MagicMock()
rd.port = port
return rd
def _mk_txt(value: str) -> Any:
rd = MagicMock()
rd.strings = [value.encode("utf-8")]
return rd
# ---------- TXT parser -------------------------------------------------------
def test_parse_txt_valid():
txt = "v=psyc1 fp=" + "a" * 32 + " alg=ed25519 path=/federation/feed"
res = _parse_txt_value(txt)
assert isinstance(res, Ok)
assert res.value["fp"] == "a" * 32
assert res.value["alg"] == "ed25519"
def test_parse_txt_tolerates_token_order():
txt = "path=/federation/feed alg=ed25519 fp=" + "f" * 32 + " v=psyc1"
res = _parse_txt_value(txt)
assert isinstance(res, Ok)
def test_parse_txt_rejects_wrong_version():
txt = "v=psyc2 fp=" + "a" * 32 + " alg=ed25519"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
assert "version" in res.reason
def test_parse_txt_rejects_bad_fingerprint_length():
txt = "v=psyc1 fp=deadbeef alg=ed25519"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
assert "fingerprint" in res.reason
def test_parse_txt_rejects_non_hex_fingerprint():
txt = "v=psyc1 fp=" + "z" * 32 + " alg=ed25519"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
def test_parse_txt_rejects_malformed_token():
txt = "v=psyc1 fp=" + "a" * 32 + " alg ed25519"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
assert "malformed" in res.reason
def test_parse_txt_rejects_wrong_alg():
txt = "v=psyc1 fp=" + "a" * 32 + " alg=rsa"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
# ---------- resolve_psyc -----------------------------------------------------
def test_resolve_psyc_happy_path():
fp = "1" * 32
txt = f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed"
def fake_resolve(self, name, rdtype):
if rdtype == "SRV":
return [_mk_srv(port=8443)]
if rdtype == "TXT":
return [_mk_txt(txt)]
raise dns.exception.DNSException("unexpected")
with patch.object(dns.resolver.Resolver, "resolve", fake_resolve):
res = resolve_psyc("peer.example.com")
assert isinstance(res, Ok)
cand = res.value
assert cand.domain == "peer.example.com"
assert cand.fingerprint == fp
assert cand.port == 8443
assert cand.source == "dns-sd"
def test_resolve_psyc_nxdomain_returns_err():
def fake_resolve(self, name, rdtype):
raise dns.resolver.NXDOMAIN()
with patch.object(dns.resolver.Resolver, "resolve", fake_resolve):
res = resolve_psyc("nothere.example")
assert isinstance(res, Err)
assert "NXDOMAIN" in res.reason
def test_resolve_psyc_txt_malformed_returns_err():
def fake_resolve(self, name, rdtype):
if rdtype == "SRV":
return [_mk_srv()]
return [_mk_txt("v=psyc1 fp=garbage alg=ed25519")]
with patch.object(dns.resolver.Resolver, "resolve", fake_resolve):
res = resolve_psyc("peer.example")
assert isinstance(res, Err)
assert "TXT" in res.reason or "fingerprint" in res.reason
def test_resolve_psyc_no_answer_returns_err():
def fake_resolve(self, name, rdtype):
raise dns.resolver.NoAnswer()
with patch.object(dns.resolver.Resolver, "resolve", fake_resolve):
res = resolve_psyc("peer.example")
assert isinstance(res, Err)
assert "NoAnswer" in res.reason
# ---------- walk -------------------------------------------------------------
def _stub_resolve(catalog: Dict[str, str]):
"""Build a resolve_psyc stub that returns each domain's catalog fingerprint."""
def _stub(domain: str, timeout: float = 5.0):
if domain not in catalog:
return Err(f"no record for {domain}")
return Ok(PeerCandidate(
domain=domain,
fingerprint=catalog[domain],
port=443,
source="dns-sd",
))
return _stub
def _stub_fetch_info_ok(*args, **kwargs):
return Ok({"fingerprint": kwargs.get("expected_fingerprint", "")})
def _stub_fetch_peers_factory(graph: Dict[str, List[Dict[str, str]]]):
def _stub(domain: str, port: int = 443, timeout: float = 5.0):
return Ok(graph.get(domain, []))
return _stub
def test_walk_dedupes_by_fingerprint(fresh_db, fed_dir, monkeypatch):
# Two seeds, same fingerprint via different domains → only one survives the (domain,fp) dedupe
# but distinct domains both surface; the (domain, fp) pair just shouldn't repeat.
fp = "9" * 32
catalog = {"a.example": fp, "b.example": fp}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({}))
out = walk(["a.example", "b.example", "a.example"], max_depth=1)
# both unique domains made it in; the duplicate seed didn't re-enter
assert len(out) == 2
domains = {c.domain for c in out}
assert domains == {"a.example", "b.example"}
def test_walk_respects_max_depth(fresh_db, fed_dir, monkeypatch):
catalog = {"d0.example": "0" * 32, "d1.example": "1" * 32, "d2.example": "2" * 32}
graph = {
"d0.example": [{"domain": "d1.example", "fingerprint": "1" * 32}],
"d1.example": [{"domain": "d2.example", "fingerprint": "2" * 32}],
}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory(graph))
out = walk(["d0.example"], max_depth=1)
domains = {c.domain for c in out}
# depth 0: d0; depth 1: d1; depth 2 (d2) is excluded by max_depth=1
assert "d0.example" in domains and "d1.example" in domains
assert "d2.example" not in domains
def test_walk_respects_max_peers(fresh_db, fed_dir, monkeypatch):
catalog = {f"d{i}.example": f"{i:032x}" for i in range(10)}
graph = {"d0.example": [{"domain": f"d{i}.example", "fingerprint": f"{i:032x}"} for i in range(1, 10)]}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory(graph))
out = walk(["d0.example"], max_depth=2, max_peers=3)
assert len(out) <= 3
def test_walk_skips_own_fingerprint(fresh_db, fed_dir, monkeypatch):
own = federation.node_fingerprint()
catalog = {"self.example": own, "peer.example": "f" * 32}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({}))
out = walk(["self.example", "peer.example"], max_depth=1)
domains = {c.domain for c in out}
assert "self.example" not in domains
assert "peer.example" in domains
def test_walk_one_failure_does_not_abort(fresh_db, fed_dir, monkeypatch):
catalog = {"good.example": "a" * 32} # bad.example is absent → Err on resolve
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({}))
out = walk(["bad.example", "good.example"], max_depth=1)
assert len(out) == 1
assert out[0].domain == "good.example"
# ---------- record_candidate -------------------------------------------------
def test_record_candidate_inserts_as_unknown(fresh_db):
c = PeerCandidate(domain="new.example", fingerprint="a" * 32, source="dns-sd")
record_candidate(c)
row = db.get_peer("new.example")
assert row is not None
assert row["status"] == "unknown"
assert row["fingerprint"] == "a" * 32
def test_record_candidate_preserves_trusted(fresh_db, fed_dir):
federation.register_peer("vip.example", "b" * 32, "PEM", status="trusted")
# walker re-discovers it
c = PeerCandidate(domain="vip.example", fingerprint="b" * 32, source="peer-walk:other.example")
record_candidate(c)
row = db.get_peer("vip.example")
assert row["status"] == "trusted"
def test_record_candidate_preserves_blocked(fresh_db, fed_dir):
federation.register_peer("bad.example", "c" * 32, "PEM", status="blocked")
c = PeerCandidate(domain="bad.example", fingerprint="c" * 32, source="dns-sd")
record_candidate(c)
row = db.get_peer("bad.example")
assert row["status"] == "blocked"
def test_record_candidate_updates_last_seen(fresh_db):
c = PeerCandidate(domain="repeat.example", fingerprint="d" * 32, source="dns-sd")
record_candidate(c)
first = db.get_peer("repeat.example")
# second pass — last_seen advances, discovered_at stays
c2 = PeerCandidate(domain="repeat.example", fingerprint="d" * 32, source="dns-sd")
record_candidate(c2)
second = db.get_peer("repeat.example")
assert second["discovered_at"] == first["discovered_at"]
# ---------- public attestation -----------------------------------------------
def test_public_peer_attestation_only_trusted(fresh_db, fed_dir):
federation.register_peer("trusted.example", "1" * 32, "PEM", status="trusted")
federation.register_peer("unknown.example", "2" * 32, "PEM", status="unknown")
federation.register_peer("blocked.example", "3" * 32, "PEM", status="blocked")
out = public_peer_attestation()
domains = {p["domain"] for p in out}
assert domains == {"trusted.example"}
def test_public_peer_attestation_payload_shape(fresh_db, fed_dir):
federation.register_peer("t.example", "f" * 32, "PEM", status="trusted")
out = public_peer_attestation()
assert len(out) == 1
entry = out[0]
assert set(entry.keys()) == {"domain", "fingerprint", "first_seen"}
# ---------- public endpoint via TestClient -----------------------------------
def _mk_app() -> FastAPI:
app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key="test-secret")
# Templates aren't exercised by the public endpoints we care about here.
from fastapi.templating import Jinja2Templates
import tempfile, os
tdir = tempfile.mkdtemp()
templates = Jinja2Templates(directory=tdir)
federation_routes.register(app, templates)
return app
def test_public_peers_endpoint_excludes_unknown_blocked(fresh_db, fed_dir):
federation.register_peer("ok.example", "a" * 32, "PEM", status="trusted")
federation.register_peer("rude.example", "b" * 32, "PEM", status="blocked")
federation.register_peer("new.example", "c" * 32, "PEM", status="unknown")
# Flush in-memory cache from any earlier test.
federation_routes._PUBLIC_PEERS_CACHE["payload"] = None
federation_routes._PUBLIC_PEERS_CACHE["ts"] = 0.0
app = _mk_app()
client = TestClient(app)
r = client.get("/federation/peers/public")
assert r.status_code == 200
body = r.json()
domains = {p["domain"] for p in body}
assert "ok.example" in domains
assert "rude.example" not in domains
assert "new.example" not in domains
# ---------- pulse integration ------------------------------------------------
def test_discovery_seeds_roundtrip(fresh_db):
assert pulse.get_discovery_seeds() == []
pulse.set_discovery_seeds(["a.example", "b.example", "a.example", "", " "])
# dedupe + strip blanks
assert pulse.get_discovery_seeds() == ["a.example", "b.example"]
def test_peer_pull_pipeline_no_seeds(fresh_db, fed_dir, monkeypatch):
# peer-pull runner returns a clean message when nothing's configured.
outcome, result = pulse.run_now("peer-pull")
assert outcome == "ok"
assert "no seeds" in result
def test_peer_pull_pipeline_with_seeds(fresh_db, fed_dir, monkeypatch):
pulse.set_discovery_seeds(["good.example"])
catalog = {"good.example": "e" * 32}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({}))
outcome, result = pulse.run_now("peer-pull")
assert outcome == "ok"
assert "discovered 1" in result
# And it was recorded.
row = db.get_peer("good.example")
assert row is not None
assert row["status"] == "unknown"