diff --git a/tests/test_network_view.py b/tests/test_network_view.py new file mode 100644 index 0000000..152da47 --- /dev/null +++ b/tests/test_network_view.py @@ -0,0 +1,334 @@ +"""Network view — local + transitive + public payload tests.""" + +from __future__ import annotations + +import base64 +from datetime import datetime, timedelta, timezone +from typing import Any, Dict +from unittest.mock import patch + +import pytest +from sqlalchemy import create_engine + +from psyc import db +from psyc.lines import federation, network_view +from psyc.lines.network_view import ( + NetworkEdge, + NetworkNode, + NetworkView, + build_local_view, + build_public_view, + build_transitive_view, +) + + +# ---------- fixtures ---------------------------------------------------- + +@pytest.fixture +def fresh_db(tmp_path, monkeypatch): + test_db = tmp_path / "test.db" + eng = create_engine(f"sqlite:///{test_db}", future=True) + db._metadata.create_all(eng, checkfirst=True) + monkeypatch.setattr(db, "_engine", eng) + monkeypatch.setattr(db, "DB_PATH", test_db) + yield test_db + + +@pytest.fixture +def fed_dir(tmp_path, monkeypatch): + d = tmp_path / "federation" + monkeypatch.setattr(federation, "FED_DIR", d) + monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key") + monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub") + yield d + + +@pytest.fixture(autouse=True) +def reset_transitive_cache(monkeypatch): + """Prevent cache bleed between tests.""" + monkeypatch.setattr(network_view, "_TRANSITIVE_CACHE", {"ts": 0.0, "view": None}) + yield + + +def _make_peer_pubkey() -> tuple[str, str]: + """Return (fingerprint, pubkey_pem) for a synthetic peer keypair.""" + import hashlib + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ed25519 + priv = ed25519.Ed25519PrivateKey.generate() + pub = priv.public_key() + pem = pub.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("ascii") + raw = pub.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + fp = hashlib.sha256(raw).digest()[:16].hex() + return fp, pem + + +# ---------- local view -------------------------------------------------- + +def test_local_view_empty_registry_yields_only_self(fresh_db, fed_dir): + view = build_local_view() + assert isinstance(view, NetworkView) + assert len(view.nodes) == 1 + self_node = view.nodes[0] + assert self_node.is_self is True + assert self_node.distance == 0 + assert self_node.status == "self" + assert self_node.fingerprint == federation.node_fingerprint() + assert view.edges == [] + assert view.stats["total_peers"] == 0 + assert view.stats["vouched_peers"] == 0 + assert view.stats["signals_buffered_24h"] == 0 + + +def test_local_view_one_trusted_peer_no_edges(fresh_db, fed_dir): + peer_fp, peer_pem = _make_peer_pubkey() + federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted") + view = build_local_view() + assert len(view.nodes) == 2 + peer_node = next(n for n in view.nodes if not n.is_self) + assert peer_node.fingerprint == peer_fp + assert peer_node.status == "trusted" + assert peer_node.distance == 1 + assert peer_node.domain == "peer.example" + assert view.edges == [] + assert view.stats["total_peers"] == 1 + assert view.stats["vouched_peers"] == 1 + + +def test_local_view_outbound_vouch_creates_edge(fresh_db, fed_dir): + peer_fp, peer_pem = _make_peer_pubkey() + federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted") + federation.issue_vouch(peer_fp, ttl_days=30) + view = build_local_view() + vouch_edges = [e for e in view.edges if e.kind == "vouch"] + assert len(vouch_edges) == 1 + e = vouch_edges[0] + assert e.source_fingerprint == federation.node_fingerprint() + assert e.target_fingerprint == peer_fp + assert view.stats["vouches_issued"] == 1 + + +def test_local_view_inbound_vouch_creates_edge(fresh_db, fed_dir): + """Vouches received that name us as target → peer → self edge.""" + peer_fp, peer_pem = _make_peer_pubkey() + federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted") + # Insert a vouch where peer vouches FOR us, bypassing accept_vouch (which + # we don't need to exercise here — the question is render shape). + our_fp = federation.node_fingerprint() + now = datetime.now(timezone.utc) + db.upsert_vouch(dict( + voucher_fingerprint=peer_fp, + target_fingerprint=our_fp, + issued_at=now.isoformat(), + expires_at=(now + timedelta(days=30)).isoformat(), + signature="x" * 88, + )) + view = build_local_view() + vouch_edges = [e for e in view.edges if e.kind == "vouch"] + assert len(vouch_edges) == 1 + e = vouch_edges[0] + assert e.source_fingerprint == peer_fp + assert e.target_fingerprint == our_fp + + +def test_local_view_bidirectional_vouches_collapse(fresh_db, fed_dir): + peer_fp, peer_pem = _make_peer_pubkey() + federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted") + federation.issue_vouch(peer_fp, ttl_days=30) + # And peer vouches back at us. + our_fp = federation.node_fingerprint() + now = datetime.now(timezone.utc) + db.upsert_vouch(dict( + voucher_fingerprint=peer_fp, + target_fingerprint=our_fp, + issued_at=now.isoformat(), + expires_at=(now + timedelta(days=30)).isoformat(), + signature="x" * 88, + )) + view = build_local_view() + vouch_edges = [e for e in view.edges if e.kind == "vouch"] + assert len(vouch_edges) == 1 + assert vouch_edges[0].bidirectional is True + + +def test_local_view_signal_edge_weight_is_24h_count(fresh_db, fed_dir): + peer_fp, peer_pem = _make_peer_pubkey() + federation.register_peer("peer.example", peer_fp, peer_pem, status="trusted") + now_iso = datetime.now(timezone.utc).isoformat() + # Three signals from this peer within the window. + for i in range(3): + db.record_signal(dict( + peer_fingerprint=peer_fp, + signal_type="ioc", + signal_id=f"1.2.3.{i}", + signal_hash=f"hash-{i}", + received_at=now_iso, + raw_json="{}", + )) + # One stale signal outside the window — must be ignored. + stale = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat() + db.record_signal(dict( + peer_fingerprint=peer_fp, + signal_type="ioc", + signal_id="9.9.9.9", + signal_hash="stale", + received_at=stale, + raw_json="{}", + )) + view = build_local_view() + sig_edges = [e for e in view.edges if e.kind == "signal"] + assert len(sig_edges) == 1 + assert sig_edges[0].weight == 3.0 + assert sig_edges[0].source_fingerprint == peer_fp + assert sig_edges[0].target_fingerprint == federation.node_fingerprint() + assert view.stats["signals_buffered_24h"] == 3 + assert view.stats["distinct_signal_hashes_24h"] == 3 + + +def test_local_view_blocked_peer_renders_with_blocked_status(fresh_db, fed_dir): + fp, pem = _make_peer_pubkey() + federation.register_peer("blocked.example", fp, pem, status="blocked") + view = build_local_view() + peer = next(n for n in view.nodes if not n.is_self) + assert peer.status == "blocked" + + +# ---------- public view + signature round-trip -------------------------- + +def test_public_view_excludes_unknown_and_blocked(fresh_db, fed_dir): + fp_t, pem_t = _make_peer_pubkey() + fp_u, pem_u = _make_peer_pubkey() + fp_b, pem_b = _make_peer_pubkey() + federation.register_peer("trusted.example", fp_t, pem_t, status="trusted") + federation.register_peer("unknown.example", fp_u, pem_u, status="unknown") + federation.register_peer("blocked.example", fp_b, pem_b, status="blocked") + + payload = build_public_view() + fps = {p["fingerprint"] for p in payload["peers"]} + assert fp_t in fps + assert fp_u not in fps + assert fp_b not in fps + + +def test_public_view_signature_round_trip(fresh_db, fed_dir): + fp, pem = _make_peer_pubkey() + federation.register_peer("trusted.example", fp, pem, status="trusted") + federation.issue_vouch(fp, ttl_days=30) + payload = build_public_view() + + assert "signature" in payload + assert payload["fingerprint"] == federation.node_fingerprint() + + sig = base64.b64decode(payload["signature"]) + unsigned = {k: v for k, v in payload.items() if k != "signature"} + assert federation.verify_payload( + federation.canonical_json(unsigned), + sig, + federation.public_key_pem(), + ) is True + + # Vouch we issued is in the payload. + targets = {v["target_fingerprint"] for v in payload["vouches"]} + assert fp in targets + + +def test_public_view_omits_signals(fresh_db, fed_dir): + """Public payload must not leak who's reporting what.""" + fp, pem = _make_peer_pubkey() + federation.register_peer("trusted.example", fp, pem, status="trusted") + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id="1.2.3.4", + signal_hash="secret-hash", + received_at=datetime.now(timezone.utc).isoformat(), + raw_json="{}", + )) + payload = build_public_view() + # No signal-shaped fields anywhere in the payload. + flat = str(payload) + assert "secret-hash" not in flat + assert "signals" not in payload + + +# ---------- transitive view --------------------------------------------- + +def test_transitive_view_adds_distance_2_nodes(fresh_db, fed_dir): + direct_fp, direct_pem = _make_peer_pubkey() + federation.register_peer("direct.example", direct_fp, direct_pem, status="trusted") + # The peer reports two peers of its own. + far_fp_a, _ = _make_peer_pubkey() + far_fp_b, _ = _make_peer_pubkey() + fake_payload: Dict[str, Any] = { + "fingerprint": direct_fp, + "peers": [ + {"fingerprint": far_fp_a, "domain": "far-a.example"}, + {"fingerprint": far_fp_b, "domain": "far-b.example"}, + ], + "vouches": [], + } + with patch.object(network_view, "_fetch_peer_network", return_value=fake_payload): + view = build_transitive_view(force_refresh=True) + + distances = sorted(n.distance for n in view.nodes) + assert 0 in distances and 1 in distances and 2 in distances + transitive_fps = {n.fingerprint for n in view.nodes if n.distance == 2} + assert far_fp_a in transitive_fps + assert far_fp_b in transitive_fps + # "knows" edges from direct peer to each transitive. + knows = [e for e in view.edges if e.kind == "knows"] + assert len(knows) == 2 + assert all(e.source_fingerprint == direct_fp for e in knows) + assert view.stats["transitive_nodes"] == 2 + + +def test_transitive_view_failed_fetch_does_not_abort(fresh_db, fed_dir): + fp_a, pem_a = _make_peer_pubkey() + fp_b, pem_b = _make_peer_pubkey() + federation.register_peer("peer-a.example", fp_a, pem_a, status="trusted") + federation.register_peer("peer-b.example", fp_b, pem_b, status="trusted") + + far_fp, _ = _make_peer_pubkey() + + def fake_fetch(domain, timeout=4.0): + if domain == "peer-a.example": + return None # simulate a fetch failure + return { + "fingerprint": fp_b, + "peers": [{"fingerprint": far_fp, "domain": "far.example"}], + "vouches": [], + } + + with patch.object(network_view, "_fetch_peer_network", side_effect=fake_fetch): + view = build_transitive_view(force_refresh=True) + # Direct nodes both present, transitive only from B. + assert any(n.fingerprint == fp_a for n in view.nodes) + assert any(n.fingerprint == fp_b for n in view.nodes) + assert any(n.fingerprint == far_fp and n.distance == 2 for n in view.nodes) + assert view.stats["transitive_nodes"] == 1 + + +def test_transitive_view_skips_only_unknown_peers(fresh_db, fed_dir): + """Unknown peers shouldn't be queried — fetcher only called for trusted.""" + fp_unknown, pem_u = _make_peer_pubkey() + fp_trusted, pem_t = _make_peer_pubkey() + federation.register_peer("unknown.example", fp_unknown, pem_u, status="unknown") + federation.register_peer("trusted.example", fp_trusted, pem_t, status="trusted") + + calls = [] + + def fake_fetch(domain, timeout=4.0): + calls.append(domain) + return None + + with patch.object(network_view, "_fetch_peer_network", side_effect=fake_fetch): + build_transitive_view(force_refresh=True) + + assert "trusted.example" in calls + assert "unknown.example" not in calls