335 lines
12 KiB
Python
335 lines
12 KiB
Python
"""Network view — local + transitive + public payload tests."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any, Dict
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine
|
|
|
|
from psyc import db
|
|
from psyc.lines import federation, network_view
|
|
from psyc.lines.network_view import (
|
|
NetworkEdge,
|
|
NetworkNode,
|
|
NetworkView,
|
|
build_local_view,
|
|
build_public_view,
|
|
build_transitive_view,
|
|
)
|
|
|
|
|
|
# ---------- fixtures ----------------------------------------------------
|
|
|
|
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
    """Point ``psyc.db`` at a throwaway SQLite database for one test.

    The schema described by ``db._metadata`` is created in a file under
    ``tmp_path``; ``db._engine`` and ``db.DB_PATH`` are patched to use it and
    are restored by ``monkeypatch`` when the test ends.

    Yields:
        The path of the temporary database file.
    """
    test_db = tmp_path / "test.db"
    eng = create_engine(f"sqlite:///{test_db}", future=True)
    db._metadata.create_all(eng, checkfirst=True)
    monkeypatch.setattr(db, "_engine", eng)
    monkeypatch.setattr(db, "DB_PATH", test_db)
    try:
        yield test_db
    finally:
        # Close pooled connections so the SQLite file handle is released
        # before pytest removes tmp_path (an open handle blocks deletion on
        # Windows and leaks one engine per test everywhere else).
        eng.dispose()
|
|
|
|
|
|
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
    """Redirect the federation directory and key paths into ``tmp_path``.

    Returns the (not yet created) ``federation`` directory path that the
    patched module attributes point at.
    """
    root = tmp_path / "federation"
    overrides = {
        "FED_DIR": root,
        "PRIVATE_KEY_PATH": root / "node.key",
        "PUBLIC_KEY_PATH": root / "node.pub",
    }
    for attr_name, replacement in overrides.items():
        monkeypatch.setattr(federation, attr_name, replacement)
    return root
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_transitive_cache(monkeypatch):
    """Start every test with an empty transitive-view cache.

    Applied automatically so a view cached by one test cannot leak into the
    next one.
    """
    empty_cache = {"ts": 0.0, "view": None}
    monkeypatch.setattr(network_view, "_TRANSITIVE_CACHE", empty_cache)
|
|
|
|
|
|
def _make_peer_pubkey() -> tuple[str, str]:
    """Generate a throwaway Ed25519 keypair and describe its public half.

    Returns:
        ``(fingerprint, pubkey_pem)`` where ``fingerprint`` is the hex form
        of the first 16 bytes of SHA-256 over the raw public key and
        ``pubkey_pem`` is the SubjectPublicKeyInfo PEM encoding as ASCII text.
    """
    import hashlib

    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import ed25519

    public_key = ed25519.Ed25519PrivateKey.generate().public_key()

    pem_text = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode("ascii")

    raw_bytes = public_key.public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    digest = hashlib.sha256(raw_bytes).digest()
    fingerprint = digest[:16].hex()

    return fingerprint, pem_text
|
|
|
|
|
|
# ---------- local view --------------------------------------------------
|
|
|
|
def test_local_view_empty_registry_yields_only_self(fresh_db, fed_dir):
    """With no registered peers the local view holds just this node."""
    local = build_local_view()
    assert isinstance(local, NetworkView)
    assert len(local.nodes) == 1

    me = local.nodes[0]
    assert me.is_self is True
    assert me.distance == 0
    assert me.status == "self"
    assert me.fingerprint == federation.node_fingerprint()

    assert local.edges == []
    # Every peer/signal counter must read zero on an empty registry.
    for counter in ("total_peers", "vouched_peers", "signals_buffered_24h"):
        assert local.stats[counter] == 0
|
|
|
|
|
|
def test_local_view_one_trusted_peer_no_edges(fresh_db, fed_dir):
    """A trusted peer shows up as a distance-1 node but adds no edges."""
    fingerprint, pem = _make_peer_pubkey()
    federation.register_peer("peer.example", fingerprint, pem, status="trusted")

    local = build_local_view()
    assert len(local.nodes) == 2

    remote = next(node for node in local.nodes if not node.is_self)
    assert remote.fingerprint == fingerprint
    assert remote.status == "trusted"
    assert remote.distance == 1
    assert remote.domain == "peer.example"

    assert local.edges == []
    stats = local.stats
    assert stats["total_peers"] == 1
    assert stats["vouched_peers"] == 1
|
|
|
|
|
|
def test_local_view_outbound_vouch_creates_edge(fresh_db, fed_dir):
    """A vouch we issue is rendered as one self -> peer vouch edge."""
    fingerprint, pem = _make_peer_pubkey()
    federation.register_peer("peer.example", fingerprint, pem, status="trusted")
    federation.issue_vouch(fingerprint, ttl_days=30)

    local = build_local_view()
    vouches = [edge for edge in local.edges if edge.kind == "vouch"]
    assert len(vouches) == 1

    edge = vouches[0]
    assert edge.source_fingerprint == federation.node_fingerprint()
    assert edge.target_fingerprint == fingerprint
    assert local.stats["vouches_issued"] == 1
|
|
|
|
|
|
def test_local_view_inbound_vouch_creates_edge(fresh_db, fed_dir):
    """A received vouch naming us as target renders as a peer -> self edge."""
    fingerprint, pem = _make_peer_pubkey()
    federation.register_peer("peer.example", fingerprint, pem, status="trusted")

    # Write the peer's vouch straight into the store instead of going through
    # accept_vouch: only the rendered shape is under test here.
    self_fp = federation.node_fingerprint()
    issued = datetime.now(timezone.utc)
    expires = issued + timedelta(days=30)
    db.upsert_vouch({
        "voucher_fingerprint": fingerprint,
        "target_fingerprint": self_fp,
        "issued_at": issued.isoformat(),
        "expires_at": expires.isoformat(),
        "signature": "x" * 88,
    })

    local = build_local_view()
    vouches = [edge for edge in local.edges if edge.kind == "vouch"]
    assert len(vouches) == 1

    edge = vouches[0]
    assert edge.source_fingerprint == fingerprint
    assert edge.target_fingerprint == self_fp
|
|
|
|
|
|
def test_local_view_bidirectional_vouches_collapse(fresh_db, fed_dir):
    """Mutual vouches are merged into a single edge flagged bidirectional."""
    fingerprint, pem = _make_peer_pubkey()
    federation.register_peer("peer.example", fingerprint, pem, status="trusted")
    federation.issue_vouch(fingerprint, ttl_days=30)

    # The reverse direction: the peer vouches for us.
    self_fp = federation.node_fingerprint()
    issued = datetime.now(timezone.utc)
    expires = issued + timedelta(days=30)
    db.upsert_vouch({
        "voucher_fingerprint": fingerprint,
        "target_fingerprint": self_fp,
        "issued_at": issued.isoformat(),
        "expires_at": expires.isoformat(),
        "signature": "x" * 88,
    })

    local = build_local_view()
    vouches = [edge for edge in local.edges if edge.kind == "vouch"]
    assert len(vouches) == 1
    assert vouches[0].bidirectional is True
|
|
|
|
|
|
def test_local_view_signal_edge_weight_is_24h_count(fresh_db, fed_dir):
    """The signal edge weight counts only signals received in the last 24h."""
    fingerprint, pem = _make_peer_pubkey()
    federation.register_peer("peer.example", fingerprint, pem, status="trusted")

    fresh_stamp = datetime.now(timezone.utc).isoformat()
    # Three in-window signals from the peer.
    for idx in range(3):
        db.record_signal({
            "peer_fingerprint": fingerprint,
            "signal_type": "ioc",
            "signal_id": f"1.2.3.{idx}",
            "signal_hash": f"hash-{idx}",
            "received_at": fresh_stamp,
            "raw_json": "{}",
        })

    # A 48-hour-old signal falls outside the window and must not be counted.
    stale_moment = datetime.now(timezone.utc) - timedelta(hours=48)
    db.record_signal({
        "peer_fingerprint": fingerprint,
        "signal_type": "ioc",
        "signal_id": "9.9.9.9",
        "signal_hash": "stale",
        "received_at": stale_moment.isoformat(),
        "raw_json": "{}",
    })

    local = build_local_view()
    signal_edges = [edge for edge in local.edges if edge.kind == "signal"]
    assert len(signal_edges) == 1

    edge = signal_edges[0]
    assert edge.weight == 3.0
    assert edge.source_fingerprint == fingerprint
    assert edge.target_fingerprint == federation.node_fingerprint()

    stats = local.stats
    assert stats["signals_buffered_24h"] == 3
    assert stats["distinct_signal_hashes_24h"] == 3
|
|
|
|
|
|
def test_local_view_blocked_peer_renders_with_blocked_status(fresh_db, fed_dir):
    """A blocked peer still appears in the local view, labelled as blocked."""
    fingerprint, pem = _make_peer_pubkey()
    federation.register_peer("blocked.example", fingerprint, pem, status="blocked")

    nodes = build_local_view().nodes
    remote = next(node for node in nodes if not node.is_self)
    assert remote.status == "blocked"
|
|
|
|
|
|
# ---------- public view + signature round-trip --------------------------
|
|
|
|
def test_public_view_excludes_unknown_and_blocked(fresh_db, fed_dir):
    """Only trusted peers are listed in the public payload."""
    # Generate all keypairs first, then register one peer per status.
    keys = {
        status: _make_peer_pubkey()
        for status in ("trusted", "unknown", "blocked")
    }
    for status, (fingerprint, pem) in keys.items():
        federation.register_peer(
            f"{status}.example", fingerprint, pem, status=status
        )

    published = build_public_view()
    listed = {peer["fingerprint"] for peer in published["peers"]}

    assert keys["trusted"][0] in listed
    assert keys["unknown"][0] not in listed
    assert keys["blocked"][0] not in listed
|
|
|
|
|
|
def test_public_view_signature_round_trip(fresh_db, fed_dir):
    """The public payload's signature verifies against our own public key."""
    peer_fp, peer_pem = _make_peer_pubkey()
    federation.register_peer("trusted.example", peer_fp, peer_pem, status="trusted")
    federation.issue_vouch(peer_fp, ttl_days=30)
    signed = build_public_view()

    assert "signature" in signed
    assert signed["fingerprint"] == federation.node_fingerprint()

    # Verify over the payload with the signature field removed.
    raw_signature = base64.b64decode(signed["signature"])
    body = dict(signed)
    del body["signature"]
    verified = federation.verify_payload(
        federation.canonical_json(body),
        raw_signature,
        federation.public_key_pem(),
    )
    assert verified is True

    # The vouch issued above must be part of the published payload.
    vouched = {vouch["target_fingerprint"] for vouch in signed["vouches"]}
    assert peer_fp in vouched
|
|
|
|
|
|
def test_public_view_omits_signals(fresh_db, fed_dir):
    """The public payload must not reveal which peer reported what."""
    fingerprint, pem = _make_peer_pubkey()
    federation.register_peer("trusted.example", fingerprint, pem, status="trusted")

    received = datetime.now(timezone.utc).isoformat()
    db.record_signal({
        "peer_fingerprint": fingerprint,
        "signal_type": "ioc",
        "signal_id": "1.2.3.4",
        "signal_hash": "secret-hash",
        "received_at": received,
        "raw_json": "{}",
    })

    published = build_public_view()
    # Neither the recorded hash nor a top-level "signals" key may appear.
    rendered = str(published)
    assert "secret-hash" not in rendered
    assert "signals" not in published
|
|
|
|
|
|
# ---------- transitive view ---------------------------------------------
|
|
|
|
def test_transitive_view_adds_distance_2_nodes(fresh_db, fed_dir):
    """Peers reported by a direct peer appear as distance-2 nodes."""
    direct_fp, direct_pem = _make_peer_pubkey()
    federation.register_peer("direct.example", direct_fp, direct_pem, status="trusted")

    # Two further peers, known only through the direct peer's report.
    remote_a = _make_peer_pubkey()[0]
    remote_b = _make_peer_pubkey()[0]
    reported: Dict[str, Any] = {
        "fingerprint": direct_fp,
        "peers": [
            {"fingerprint": remote_a, "domain": "far-a.example"},
            {"fingerprint": remote_b, "domain": "far-b.example"},
        ],
        "vouches": [],
    }
    with patch.object(network_view, "_fetch_peer_network", return_value=reported):
        result = build_transitive_view(force_refresh=True)

    seen_distances = {node.distance for node in result.nodes}
    assert {0, 1, 2} <= seen_distances

    second_hop = {node.fingerprint for node in result.nodes if node.distance == 2}
    assert remote_a in second_hop
    assert remote_b in second_hop

    # One "knows" edge from the direct peer to each transitive node.
    knows_edges = [edge for edge in result.edges if edge.kind == "knows"]
    assert len(knows_edges) == 2
    assert all(edge.source_fingerprint == direct_fp for edge in knows_edges)
    assert result.stats["transitive_nodes"] == 2
|
|
|
|
|
|
def test_transitive_view_failed_fetch_does_not_abort(fresh_db, fed_dir):
    """One peer's failed fetch must not stop the others from being merged."""
    fp_a, pem_a = _make_peer_pubkey()
    fp_b, pem_b = _make_peer_pubkey()
    federation.register_peer("peer-a.example", fp_a, pem_a, status="trusted")
    federation.register_peer("peer-b.example", fp_b, pem_b, status="trusted")

    remote_fp = _make_peer_pubkey()[0]

    def fake_fetch(domain, timeout=4.0):
        # Any domain other than peer A answers with peer B's report.
        if domain != "peer-a.example":
            return {
                "fingerprint": fp_b,
                "peers": [{"fingerprint": remote_fp, "domain": "far.example"}],
                "vouches": [],
            }
        return None  # simulate a fetch failure

    with patch.object(network_view, "_fetch_peer_network", side_effect=fake_fetch):
        result = build_transitive_view(force_refresh=True)

    # Both direct peers survive; the only transitive node comes from B.
    present = [node.fingerprint for node in result.nodes]
    assert any(fp == fp_a for fp in present)
    assert any(fp == fp_b for fp in present)
    assert any(
        node.fingerprint == remote_fp and node.distance == 2
        for node in result.nodes
    )
    assert result.stats["transitive_nodes"] == 1
|
|
|
|
|
|
def test_transitive_view_skips_only_unknown_peers(fresh_db, fed_dir):
    """The fetcher is invoked for the trusted peer and never for the unknown one."""
    unknown_fp, unknown_pem = _make_peer_pubkey()
    trusted_fp, trusted_pem = _make_peer_pubkey()
    federation.register_peer("unknown.example", unknown_fp, unknown_pem, status="unknown")
    federation.register_peer("trusted.example", trusted_fp, trusted_pem, status="trusted")

    queried = []

    def fake_fetch(domain, timeout=4.0):
        # Record which domain was asked; the reply itself is irrelevant here.
        queried.append(domain)
        return None

    with patch.object(network_view, "_fetch_peer_network", side_effect=fake_fetch):
        build_transitive_view(force_refresh=True)

    assert "trusted.example" in queried
    assert "unknown.example" not in queried
|