stage-disc-e discovery: tests

This commit is contained in:
m17hr1l
2026-06-06 21:08:15 +02:00
parent 9b49f768ca
commit ff88aba569

376
tests/test_discovery.py Normal file
View File

@@ -0,0 +1,376 @@
"""Discovery — DNS-SD parse + resolver, BFS walker, persistence, public endpoint."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from unittest.mock import MagicMock, patch
import dns.exception
import dns.resolver
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from starlette.middleware.sessions import SessionMiddleware
from psyc import db
from psyc.cockpit import federation_routes
from psyc.lines import discovery, federation, pulse
from psyc.lines.discovery import (
PeerCandidate,
_parse_txt_value,
fetch_peer_info,
fetch_public_peers,
public_peer_attestation,
record_candidate,
resolve_psyc,
walk,
)
from psyc.result import Err, Ok
# ---------- fixtures ---------------------------------------------------------
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
d = tmp_path / "federation"
monkeypatch.setattr(federation, "FED_DIR", d)
monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
yield d
def _mk_srv(port: int = 443) -> Any:
rd = MagicMock()
rd.port = port
return rd
def _mk_txt(value: str) -> Any:
rd = MagicMock()
rd.strings = [value.encode("utf-8")]
return rd
# ---------- TXT parser -------------------------------------------------------
def test_parse_txt_valid():
txt = "v=psyc1 fp=" + "a" * 32 + " alg=ed25519 path=/federation/feed"
res = _parse_txt_value(txt)
assert isinstance(res, Ok)
assert res.value["fp"] == "a" * 32
assert res.value["alg"] == "ed25519"
def test_parse_txt_tolerates_token_order():
txt = "path=/federation/feed alg=ed25519 fp=" + "f" * 32 + " v=psyc1"
res = _parse_txt_value(txt)
assert isinstance(res, Ok)
def test_parse_txt_rejects_wrong_version():
txt = "v=psyc2 fp=" + "a" * 32 + " alg=ed25519"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
assert "version" in res.reason
def test_parse_txt_rejects_bad_fingerprint_length():
txt = "v=psyc1 fp=deadbeef alg=ed25519"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
assert "fingerprint" in res.reason
def test_parse_txt_rejects_non_hex_fingerprint():
txt = "v=psyc1 fp=" + "z" * 32 + " alg=ed25519"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
def test_parse_txt_rejects_malformed_token():
txt = "v=psyc1 fp=" + "a" * 32 + " alg ed25519"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
assert "malformed" in res.reason
def test_parse_txt_rejects_wrong_alg():
txt = "v=psyc1 fp=" + "a" * 32 + " alg=rsa"
res = _parse_txt_value(txt)
assert isinstance(res, Err)
# ---------- resolve_psyc -----------------------------------------------------
def test_resolve_psyc_happy_path():
fp = "1" * 32
txt = f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed"
def fake_resolve(self, name, rdtype):
if rdtype == "SRV":
return [_mk_srv(port=8443)]
if rdtype == "TXT":
return [_mk_txt(txt)]
raise dns.exception.DNSException("unexpected")
with patch.object(dns.resolver.Resolver, "resolve", fake_resolve):
res = resolve_psyc("peer.example.com")
assert isinstance(res, Ok)
cand = res.value
assert cand.domain == "peer.example.com"
assert cand.fingerprint == fp
assert cand.port == 8443
assert cand.source == "dns-sd"
def test_resolve_psyc_nxdomain_returns_err():
def fake_resolve(self, name, rdtype):
raise dns.resolver.NXDOMAIN()
with patch.object(dns.resolver.Resolver, "resolve", fake_resolve):
res = resolve_psyc("nothere.example")
assert isinstance(res, Err)
assert "NXDOMAIN" in res.reason
def test_resolve_psyc_txt_malformed_returns_err():
def fake_resolve(self, name, rdtype):
if rdtype == "SRV":
return [_mk_srv()]
return [_mk_txt("v=psyc1 fp=garbage alg=ed25519")]
with patch.object(dns.resolver.Resolver, "resolve", fake_resolve):
res = resolve_psyc("peer.example")
assert isinstance(res, Err)
assert "TXT" in res.reason or "fingerprint" in res.reason
def test_resolve_psyc_no_answer_returns_err():
def fake_resolve(self, name, rdtype):
raise dns.resolver.NoAnswer()
with patch.object(dns.resolver.Resolver, "resolve", fake_resolve):
res = resolve_psyc("peer.example")
assert isinstance(res, Err)
assert "NoAnswer" in res.reason
# ---------- walk -------------------------------------------------------------
def _stub_resolve(catalog: Dict[str, str]):
"""Build a resolve_psyc stub that returns each domain's catalog fingerprint."""
def _stub(domain: str, timeout: float = 5.0):
if domain not in catalog:
return Err(f"no record for {domain}")
return Ok(PeerCandidate(
domain=domain,
fingerprint=catalog[domain],
port=443,
source="dns-sd",
))
return _stub
def _stub_fetch_info_ok(*args, **kwargs):
return Ok({"fingerprint": kwargs.get("expected_fingerprint", "")})
def _stub_fetch_peers_factory(graph: Dict[str, List[Dict[str, str]]]):
def _stub(domain: str, port: int = 443, timeout: float = 5.0):
return Ok(graph.get(domain, []))
return _stub
def test_walk_dedupes_by_fingerprint(fresh_db, fed_dir, monkeypatch):
# Two seeds, same fingerprint via different domains → only one survives the (domain,fp) dedupe
# but distinct domains both surface; the (domain, fp) pair just shouldn't repeat.
fp = "9" * 32
catalog = {"a.example": fp, "b.example": fp}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({}))
out = walk(["a.example", "b.example", "a.example"], max_depth=1)
# both unique domains made it in; the duplicate seed didn't re-enter
assert len(out) == 2
domains = {c.domain for c in out}
assert domains == {"a.example", "b.example"}
def test_walk_respects_max_depth(fresh_db, fed_dir, monkeypatch):
catalog = {"d0.example": "0" * 32, "d1.example": "1" * 32, "d2.example": "2" * 32}
graph = {
"d0.example": [{"domain": "d1.example", "fingerprint": "1" * 32}],
"d1.example": [{"domain": "d2.example", "fingerprint": "2" * 32}],
}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory(graph))
out = walk(["d0.example"], max_depth=1)
domains = {c.domain for c in out}
# depth 0: d0; depth 1: d1; depth 2 (d2) is excluded by max_depth=1
assert "d0.example" in domains and "d1.example" in domains
assert "d2.example" not in domains
def test_walk_respects_max_peers(fresh_db, fed_dir, monkeypatch):
catalog = {f"d{i}.example": f"{i:032x}" for i in range(10)}
graph = {"d0.example": [{"domain": f"d{i}.example", "fingerprint": f"{i:032x}"} for i in range(1, 10)]}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory(graph))
out = walk(["d0.example"], max_depth=2, max_peers=3)
assert len(out) <= 3
def test_walk_skips_own_fingerprint(fresh_db, fed_dir, monkeypatch):
own = federation.node_fingerprint()
catalog = {"self.example": own, "peer.example": "f" * 32}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({}))
out = walk(["self.example", "peer.example"], max_depth=1)
domains = {c.domain for c in out}
assert "self.example" not in domains
assert "peer.example" in domains
def test_walk_one_failure_does_not_abort(fresh_db, fed_dir, monkeypatch):
catalog = {"good.example": "a" * 32} # bad.example is absent → Err on resolve
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({}))
out = walk(["bad.example", "good.example"], max_depth=1)
assert len(out) == 1
assert out[0].domain == "good.example"
# ---------- record_candidate -------------------------------------------------
def test_record_candidate_inserts_as_unknown(fresh_db):
c = PeerCandidate(domain="new.example", fingerprint="a" * 32, source="dns-sd")
record_candidate(c)
row = db.get_peer("new.example")
assert row is not None
assert row["status"] == "unknown"
assert row["fingerprint"] == "a" * 32
def test_record_candidate_preserves_trusted(fresh_db, fed_dir):
federation.register_peer("vip.example", "b" * 32, "PEM", status="trusted")
# walker re-discovers it
c = PeerCandidate(domain="vip.example", fingerprint="b" * 32, source="peer-walk:other.example")
record_candidate(c)
row = db.get_peer("vip.example")
assert row["status"] == "trusted"
def test_record_candidate_preserves_blocked(fresh_db, fed_dir):
federation.register_peer("bad.example", "c" * 32, "PEM", status="blocked")
c = PeerCandidate(domain="bad.example", fingerprint="c" * 32, source="dns-sd")
record_candidate(c)
row = db.get_peer("bad.example")
assert row["status"] == "blocked"
def test_record_candidate_updates_last_seen(fresh_db):
c = PeerCandidate(domain="repeat.example", fingerprint="d" * 32, source="dns-sd")
record_candidate(c)
first = db.get_peer("repeat.example")
# second pass — last_seen advances, discovered_at stays
c2 = PeerCandidate(domain="repeat.example", fingerprint="d" * 32, source="dns-sd")
record_candidate(c2)
second = db.get_peer("repeat.example")
assert second["discovered_at"] == first["discovered_at"]
# ---------- public attestation -----------------------------------------------
def test_public_peer_attestation_only_trusted(fresh_db, fed_dir):
federation.register_peer("trusted.example", "1" * 32, "PEM", status="trusted")
federation.register_peer("unknown.example", "2" * 32, "PEM", status="unknown")
federation.register_peer("blocked.example", "3" * 32, "PEM", status="blocked")
out = public_peer_attestation()
domains = {p["domain"] for p in out}
assert domains == {"trusted.example"}
def test_public_peer_attestation_payload_shape(fresh_db, fed_dir):
federation.register_peer("t.example", "f" * 32, "PEM", status="trusted")
out = public_peer_attestation()
assert len(out) == 1
entry = out[0]
assert set(entry.keys()) == {"domain", "fingerprint", "first_seen"}
# ---------- public endpoint via TestClient -----------------------------------
def _mk_app() -> FastAPI:
app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key="test-secret")
# Templates aren't exercised by the public endpoints we care about here.
from fastapi.templating import Jinja2Templates
import tempfile, os
tdir = tempfile.mkdtemp()
templates = Jinja2Templates(directory=tdir)
federation_routes.register(app, templates)
return app
def test_public_peers_endpoint_excludes_unknown_blocked(fresh_db, fed_dir):
federation.register_peer("ok.example", "a" * 32, "PEM", status="trusted")
federation.register_peer("rude.example", "b" * 32, "PEM", status="blocked")
federation.register_peer("new.example", "c" * 32, "PEM", status="unknown")
# Flush in-memory cache from any earlier test.
federation_routes._PUBLIC_PEERS_CACHE["payload"] = None
federation_routes._PUBLIC_PEERS_CACHE["ts"] = 0.0
app = _mk_app()
client = TestClient(app)
r = client.get("/federation/peers/public")
assert r.status_code == 200
body = r.json()
domains = {p["domain"] for p in body}
assert "ok.example" in domains
assert "rude.example" not in domains
assert "new.example" not in domains
# ---------- pulse integration ------------------------------------------------
def test_discovery_seeds_roundtrip(fresh_db):
assert pulse.get_discovery_seeds() == []
pulse.set_discovery_seeds(["a.example", "b.example", "a.example", "", " "])
# dedupe + strip blanks
assert pulse.get_discovery_seeds() == ["a.example", "b.example"]
def test_peer_pull_pipeline_no_seeds(fresh_db, fed_dir, monkeypatch):
# peer-pull runner returns a clean message when nothing's configured.
outcome, result = pulse.run_now("peer-pull")
assert outcome == "ok"
assert "no seeds" in result
def test_peer_pull_pipeline_with_seeds(fresh_db, fed_dir, monkeypatch):
pulse.set_discovery_seeds(["good.example"])
catalog = {"good.example": "e" * 32}
monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog))
monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok)
monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({}))
outcome, result = pulse.run_now("peer-pull")
assert outcome == "ok"
assert "discovered 1" in result
# And it was recorded.
row = db.get_peer("good.example")
assert row is not None
assert row["status"] == "unknown"