From ff88aba56975869e30f64e1e2c2c777cebdcc714 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:08:15 +0200 Subject: [PATCH] stage-disc-e discovery: tests --- tests/test_discovery.py | 376 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 376 insertions(+) create mode 100644 tests/test_discovery.py diff --git a/tests/test_discovery.py b/tests/test_discovery.py new file mode 100644 index 0000000..46275cf --- /dev/null +++ b/tests/test_discovery.py @@ -0,0 +1,376 @@ +"""Discovery — DNS-SD parse + resolver, BFS walker, persistence, public endpoint.""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional +from unittest.mock import MagicMock, patch + +import dns.exception +import dns.resolver +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from starlette.middleware.sessions import SessionMiddleware + +from psyc import db +from psyc.cockpit import federation_routes +from psyc.lines import discovery, federation, pulse +from psyc.lines.discovery import ( + PeerCandidate, + _parse_txt_value, + fetch_peer_info, + fetch_public_peers, + public_peer_attestation, + record_candidate, + resolve_psyc, + walk, +) +from psyc.result import Err, Ok + + +# ---------- fixtures --------------------------------------------------------- + +@pytest.fixture +def fresh_db(tmp_path, monkeypatch): + test_db = tmp_path / "test.db" + eng = create_engine(f"sqlite:///{test_db}", future=True) + db._metadata.create_all(eng, checkfirst=True) + monkeypatch.setattr(db, "_engine", eng) + monkeypatch.setattr(db, "DB_PATH", test_db) + yield test_db + + +@pytest.fixture +def fed_dir(tmp_path, monkeypatch): + d = tmp_path / "federation" + monkeypatch.setattr(federation, "FED_DIR", d) + monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key") + monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub") + yield d + + +def _mk_srv(port: int = 443) -> Any: + rd = MagicMock() + rd.port = port + return rd + + +def _mk_txt(value: str) -> Any: + rd = MagicMock() + rd.strings = [value.encode("utf-8")] + return rd + + +# ---------- TXT parser ------------------------------------------------------- + +def test_parse_txt_valid(): + txt = "v=psyc1 fp=" + "a" * 32 + " alg=ed25519 path=/federation/feed" + res = _parse_txt_value(txt) + assert isinstance(res, Ok) + assert res.value["fp"] == "a" * 32 + assert res.value["alg"] == "ed25519" + + +def test_parse_txt_tolerates_token_order(): + txt = "path=/federation/feed alg=ed25519 fp=" + "f" * 32 + " v=psyc1" + res = _parse_txt_value(txt) + assert isinstance(res, Ok) + + +def test_parse_txt_rejects_wrong_version(): + txt = "v=psyc2 fp=" + "a" * 32 + " alg=ed25519" + res = _parse_txt_value(txt) + assert isinstance(res, Err) + assert "version" in res.reason + + +def test_parse_txt_rejects_bad_fingerprint_length(): + txt = "v=psyc1 fp=deadbeef alg=ed25519" + res = _parse_txt_value(txt) + assert isinstance(res, Err) + assert "fingerprint" in res.reason + + +def test_parse_txt_rejects_non_hex_fingerprint(): + txt = "v=psyc1 fp=" + "z" * 32 + " alg=ed25519" + res = _parse_txt_value(txt) + assert isinstance(res, Err) + + +def test_parse_txt_rejects_malformed_token(): + txt = "v=psyc1 fp=" + "a" * 32 + " alg ed25519" + res = _parse_txt_value(txt) + assert isinstance(res, Err) + assert "malformed" in res.reason + + +def test_parse_txt_rejects_wrong_alg(): + txt = "v=psyc1 fp=" + "a" * 32 + " alg=rsa" + res = _parse_txt_value(txt) + assert isinstance(res, Err) + + +# ---------- resolve_psyc ----------------------------------------------------- + +def test_resolve_psyc_happy_path(): + fp = "1" * 32 + txt = f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed" + + def fake_resolve(self, name, rdtype): + if rdtype == "SRV": + return [_mk_srv(port=8443)] + if rdtype == "TXT": + return [_mk_txt(txt)] + raise dns.exception.DNSException("unexpected") + + with patch.object(dns.resolver.Resolver, "resolve", fake_resolve): + res = resolve_psyc("peer.example.com") + assert isinstance(res, Ok) + cand = res.value + assert cand.domain == "peer.example.com" + assert cand.fingerprint == fp + assert cand.port == 8443 + assert cand.source == "dns-sd" + + +def test_resolve_psyc_nxdomain_returns_err(): + def fake_resolve(self, name, rdtype): + raise dns.resolver.NXDOMAIN() + + with patch.object(dns.resolver.Resolver, "resolve", fake_resolve): + res = resolve_psyc("nothere.example") + assert isinstance(res, Err) + assert "NXDOMAIN" in res.reason + + +def test_resolve_psyc_txt_malformed_returns_err(): + def fake_resolve(self, name, rdtype): + if rdtype == "SRV": + return [_mk_srv()] + return [_mk_txt("v=psyc1 fp=garbage alg=ed25519")] + + with patch.object(dns.resolver.Resolver, "resolve", fake_resolve): + res = resolve_psyc("peer.example") + assert isinstance(res, Err) + assert "TXT" in res.reason or "fingerprint" in res.reason + + +def test_resolve_psyc_no_answer_returns_err(): + def fake_resolve(self, name, rdtype): + raise dns.resolver.NoAnswer() + + with patch.object(dns.resolver.Resolver, "resolve", fake_resolve): + res = resolve_psyc("peer.example") + assert isinstance(res, Err) + assert "NoAnswer" in res.reason + + +# ---------- walk ------------------------------------------------------------- + +def _stub_resolve(catalog: Dict[str, str]): + """Build a resolve_psyc stub that returns each domain's catalog fingerprint.""" + def _stub(domain: str, timeout: float = 5.0): + if domain not in catalog: + return Err(f"no record for {domain}") + return Ok(PeerCandidate( + domain=domain, + fingerprint=catalog[domain], + port=443, + source="dns-sd", + )) + return _stub + + +def _stub_fetch_info_ok(*args, **kwargs): + return Ok({"fingerprint": kwargs.get("expected_fingerprint", "")}) + + +def _stub_fetch_peers_factory(graph: Dict[str, List[Dict[str, str]]]): + def _stub(domain: str, port: int = 443, timeout: float = 5.0): + return Ok(graph.get(domain, [])) + return _stub + + +def test_walk_dedupes_by_fingerprint(fresh_db, fed_dir, monkeypatch): + # Two seeds, same fingerprint via different domains → only one survives the (domain,fp) dedupe + # but distinct domains both surface; the (domain, fp) pair just shouldn't repeat. + fp = "9" * 32 + catalog = {"a.example": fp, "b.example": fp} + monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog)) + monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok) + monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({})) + out = walk(["a.example", "b.example", "a.example"], max_depth=1) + # both unique domains made it in; the duplicate seed didn't re-enter + assert len(out) == 2 + domains = {c.domain for c in out} + assert domains == {"a.example", "b.example"} + + +def test_walk_respects_max_depth(fresh_db, fed_dir, monkeypatch): + catalog = {"d0.example": "0" * 32, "d1.example": "1" * 32, "d2.example": "2" * 32} + graph = { + "d0.example": [{"domain": "d1.example", "fingerprint": "1" * 32}], + "d1.example": [{"domain": "d2.example", "fingerprint": "2" * 32}], + } + monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog)) + monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok) + monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory(graph)) + out = walk(["d0.example"], max_depth=1) + domains = {c.domain for c in out} + # depth 0: d0; depth 1: d1; depth 2 (d2) is excluded by max_depth=1 + assert "d0.example" in domains and "d1.example" in domains + assert "d2.example" not in domains + + +def test_walk_respects_max_peers(fresh_db, fed_dir, monkeypatch): + catalog = {f"d{i}.example": f"{i:032x}" for i in range(10)} + graph = {"d0.example": [{"domain": f"d{i}.example", "fingerprint": f"{i:032x}"} for i in range(1, 10)]} + monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog)) + monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok) + monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory(graph)) + out = walk(["d0.example"], max_depth=2, max_peers=3) + assert len(out) <= 3 + + +def test_walk_skips_own_fingerprint(fresh_db, fed_dir, monkeypatch): + own = federation.node_fingerprint() + catalog = {"self.example": own, "peer.example": "f" * 32} + monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog)) + monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok) + monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({})) + out = walk(["self.example", "peer.example"], max_depth=1) + domains = {c.domain for c in out} + assert "self.example" not in domains + assert "peer.example" in domains + + +def test_walk_one_failure_does_not_abort(fresh_db, fed_dir, monkeypatch): + catalog = {"good.example": "a" * 32} # bad.example is absent → Err on resolve + monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog)) + monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok) + monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({})) + out = walk(["bad.example", "good.example"], max_depth=1) + assert len(out) == 1 + assert out[0].domain == "good.example" + + +# ---------- record_candidate ------------------------------------------------- + +def test_record_candidate_inserts_as_unknown(fresh_db): + c = PeerCandidate(domain="new.example", fingerprint="a" * 32, source="dns-sd") + record_candidate(c) + row = db.get_peer("new.example") + assert row is not None + assert row["status"] == "unknown" + assert row["fingerprint"] == "a" * 32 + + +def test_record_candidate_preserves_trusted(fresh_db, fed_dir): + federation.register_peer("vip.example", "b" * 32, "PEM", status="trusted") + # walker re-discovers it + c = PeerCandidate(domain="vip.example", fingerprint="b" * 32, source="peer-walk:other.example") + record_candidate(c) + row = db.get_peer("vip.example") + assert row["status"] == "trusted" + + +def test_record_candidate_preserves_blocked(fresh_db, fed_dir): + federation.register_peer("bad.example", "c" * 32, "PEM", status="blocked") + c = PeerCandidate(domain="bad.example", fingerprint="c" * 32, source="dns-sd") + record_candidate(c) + row = db.get_peer("bad.example") + assert row["status"] == "blocked" + + +def test_record_candidate_updates_last_seen(fresh_db): + c = PeerCandidate(domain="repeat.example", fingerprint="d" * 32, source="dns-sd") + record_candidate(c) + first = db.get_peer("repeat.example") + # second pass — last_seen advances, discovered_at stays + c2 = PeerCandidate(domain="repeat.example", fingerprint="d" * 32, source="dns-sd") + record_candidate(c2) + second = db.get_peer("repeat.example") + assert second["discovered_at"] == first["discovered_at"] + + +# ---------- public attestation ----------------------------------------------- + +def test_public_peer_attestation_only_trusted(fresh_db, fed_dir): + federation.register_peer("trusted.example", "1" * 32, "PEM", status="trusted") + federation.register_peer("unknown.example", "2" * 32, "PEM", status="unknown") + federation.register_peer("blocked.example", "3" * 32, "PEM", status="blocked") + out = public_peer_attestation() + domains = {p["domain"] for p in out} + assert domains == {"trusted.example"} + + +def test_public_peer_attestation_payload_shape(fresh_db, fed_dir): + federation.register_peer("t.example", "f" * 32, "PEM", status="trusted") + out = public_peer_attestation() + assert len(out) == 1 + entry = out[0] + assert set(entry.keys()) == {"domain", "fingerprint", "first_seen"} + + +# ---------- public endpoint via TestClient ----------------------------------- + +def _mk_app() -> FastAPI: + app = FastAPI() + app.add_middleware(SessionMiddleware, secret_key="test-secret") + # Templates aren't exercised by the public endpoints we care about here. + from fastapi.templating import Jinja2Templates + import tempfile, os + tdir = tempfile.mkdtemp() + templates = Jinja2Templates(directory=tdir) + federation_routes.register(app, templates) + return app + + +def test_public_peers_endpoint_excludes_unknown_blocked(fresh_db, fed_dir): + federation.register_peer("ok.example", "a" * 32, "PEM", status="trusted") + federation.register_peer("rude.example", "b" * 32, "PEM", status="blocked") + federation.register_peer("new.example", "c" * 32, "PEM", status="unknown") + # Flush in-memory cache from any earlier test. + federation_routes._PUBLIC_PEERS_CACHE["payload"] = None + federation_routes._PUBLIC_PEERS_CACHE["ts"] = 0.0 + app = _mk_app() + client = TestClient(app) + r = client.get("/federation/peers/public") + assert r.status_code == 200 + body = r.json() + domains = {p["domain"] for p in body} + assert "ok.example" in domains + assert "rude.example" not in domains + assert "new.example" not in domains + + +# ---------- pulse integration ------------------------------------------------ + +def test_discovery_seeds_roundtrip(fresh_db): + assert pulse.get_discovery_seeds() == [] + pulse.set_discovery_seeds(["a.example", "b.example", "a.example", "", " "]) + # dedupe + strip blanks + assert pulse.get_discovery_seeds() == ["a.example", "b.example"] + + +def test_peer_pull_pipeline_no_seeds(fresh_db, fed_dir, monkeypatch): + # peer-pull runner returns a clean message when nothing's configured. + outcome, result = pulse.run_now("peer-pull") + assert outcome == "ok" + assert "no seeds" in result + + +def test_peer_pull_pipeline_with_seeds(fresh_db, fed_dir, monkeypatch): + pulse.set_discovery_seeds(["good.example"]) + catalog = {"good.example": "e" * 32} + monkeypatch.setattr(discovery, "resolve_psyc", _stub_resolve(catalog)) + monkeypatch.setattr(discovery, "fetch_peer_info", _stub_fetch_info_ok) + monkeypatch.setattr(discovery, "fetch_public_peers", _stub_fetch_peers_factory({})) + outcome, result = pulse.run_now("peer-pull") + assert outcome == "ok" + assert "discovered 1" in result + # And it was recorded. + row = db.get_peer("good.example") + assert row is not None + assert row["status"] == "unknown"