diff --git a/tests/test_federation.py b/tests/test_federation.py new file mode 100644 index 0000000..3a0c599 --- /dev/null +++ b/tests/test_federation.py @@ -0,0 +1,230 @@ +"""Federation — identity, signed feed, peer registry, signal buffer.""" + +from __future__ import annotations + +import base64 +import os +import stat + +import pytest +from sqlalchemy import create_engine + +from psyc import db +from psyc.lines import federation +from psyc.lines.federation import ( + DNSRecord, + build_signed_feed, + canonical_json, + dns_record, + import_signed_feed, + node_fingerprint, + node_keypair, + public_key_pem, + sign_payload, + verify_payload, +) +from psyc.result import Err, Ok +from conftest import make_case + + +@pytest.fixture +def fresh_db(tmp_path, monkeypatch): + test_db = tmp_path / "test.db" + eng = create_engine(f"sqlite:///{test_db}", future=True) + db._metadata.create_all(eng, checkfirst=True) + monkeypatch.setattr(db, "_engine", eng) + monkeypatch.setattr(db, "DB_PATH", test_db) + yield test_db + + +@pytest.fixture +def fed_dir(tmp_path, monkeypatch): + """Redirect federation key paths to a tmp dir so each test gets a fresh key.""" + d = tmp_path / "federation" + monkeypatch.setattr(federation, "FED_DIR", d) + monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key") + monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub") + yield d + + +def test_keypair_persisted_with_correct_perms(fed_dir): + priv, pub = node_keypair() + assert federation.PRIVATE_KEY_PATH.exists() + assert federation.PUBLIC_KEY_PATH.exists() + mode = stat.S_IMODE(os.stat(federation.PRIVATE_KEY_PATH).st_mode) + assert mode == 0o600 + + +def test_keypair_idempotent_across_calls(fed_dir): + priv1, pub1 = node_keypair() + priv2, pub2 = node_keypair() + # raw bytes match — same key loaded twice, not regenerated + raw1 = pub1.public_bytes( + encoding=federation.serialization.Encoding.Raw, + format=federation.serialization.PublicFormat.Raw, + ) + raw2 = pub2.public_bytes( + encoding=federation.serialization.Encoding.Raw, + format=federation.serialization.PublicFormat.Raw, + ) + assert raw1 == raw2 + + +def test_fingerprint_is_stable_and_32_hex(fed_dir): + fp1 = node_fingerprint() + fp2 = node_fingerprint() + assert fp1 == fp2 + assert len(fp1) == 32 + assert all(c in "0123456789abcdef" for c in fp1) + + +def test_sign_verify_roundtrip(fed_dir): + payload = b"hello federation" + sig = sign_payload(payload) + assert verify_payload(payload, sig, public_key_pem()) is True + + +def test_verify_with_wrong_key_returns_false(fed_dir, tmp_path): + payload = b"the truth" + sig = sign_payload(payload) + + # Build a *different* keypair in a separate directory and use its pubkey. + other = tmp_path / "other-federation" + other.mkdir() + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ed25519 + other_priv = ed25519.Ed25519PrivateKey.generate() + other_pub_pem = other_priv.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("ascii") + + assert verify_payload(payload, sig, other_pub_pem) is False + + +def test_verify_with_garbage_pubkey_returns_false_no_raise(fed_dir): + sig = sign_payload(b"x") + assert verify_payload(b"x", sig, "not a pem") is False + + +def test_canonical_json_is_deterministic(): + a = canonical_json({"b": 1, "a": 2, "nested": {"y": 1, "x": 2}}) + b = canonical_json({"a": 2, "b": 1, "nested": {"x": 2, "y": 1}}) + assert a == b + + +def test_dns_record_txt_value_matches_spec(fed_dir): + rec = dns_record("example.com") + assert isinstance(rec, DNSRecord) + fp = node_fingerprint() + assert rec.srv_name == "_psyc._tcp.example.com" + assert rec.srv_target == "example.com." + assert rec.srv_port == 443 + assert rec.txt_value == f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed" + # human instructions include both record lines + assert "_psyc._tcp.example.com" in rec.human_instructions + assert "SRV" in rec.human_instructions + assert "TXT" in rec.human_instructions + + +def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir): + case = make_case(feed="urlhaus", ips=["1.1.1.1"], urls=["http://1.1.1.1/x"]) + db.upsert_case(case) + feed = build_signed_feed(window_hours=24) + assert feed["version"] == "psyc1" + assert feed["fingerprint"] == node_fingerprint() + assert feed["signature"] + # cases entry made it in + assert any(c["case_id"] == case.case_id for c in feed["cases"]) + + # Import using our own pubkey against a *different* declared fingerprint: + # swap the fingerprint so import_signed_feed doesn't reject as a loop. + pub = public_key_pem() + # Use a fresh keypair to act as "peer" — sign a feed with that key. + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ed25519 + import hashlib + peer_priv = ed25519.Ed25519PrivateKey.generate() + peer_pub_pem = peer_priv.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("ascii") + peer_raw = peer_priv.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + peer_fp = hashlib.sha256(peer_raw).digest()[:16].hex() + feed["fingerprint"] = peer_fp + unsigned = {k: v for k, v in feed.items() if k != "signature"} + new_sig = peer_priv.sign(canonical_json(unsigned)) + feed["signature"] = base64.b64encode(new_sig).decode("ascii") + + result = import_signed_feed(feed, peer_pub_pem) + assert isinstance(result, Ok), getattr(result, "reason", "") + summary = result.value + assert summary.peer_fingerprint == peer_fp + assert summary.cases_seen >= 1 + + +def test_import_with_wrong_pubkey_returns_err(fresh_db, fed_dir): + db.upsert_case(make_case(feed="urlhaus", ips=["2.2.2.2"])) + feed = build_signed_feed(window_hours=24) + # build a *different* pubkey to claim verification against + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ed25519 + other_pub_pem = ed25519.Ed25519PrivateKey.generate().public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("ascii") + # also need to change fingerprint so the loop-check doesn't trigger first + feed["fingerprint"] = "deadbeef" * 4 + result = import_signed_feed(feed, other_pub_pem) + assert isinstance(result, Err) + + +def test_import_own_feed_returns_loop_err(fresh_db, fed_dir): + db.upsert_case(make_case(feed="urlhaus", ips=["3.3.3.3"])) + feed = build_signed_feed(window_hours=24) + result = import_signed_feed(feed, public_key_pem()) + assert isinstance(result, Err) + assert "loop" in result.reason + + +def test_record_signal_then_lookup_by_hash(fresh_db): + rid = db.record_signal(dict( + peer_fingerprint="abc123", + signal_type="ioc", + signal_id="1.2.3.4", + signal_hash="hash-aaa", + received_at="2026-01-01T00:00:00+00:00", + raw_json="{}", + )) + assert rid > 0 + rows = db.signals_for_hash("hash-aaa") + assert len(rows) == 1 + assert rows[0]["peer_fingerprint"] == "abc123" + # second peer reports the same hash → both surface for quorum check + db.record_signal(dict( + peer_fingerprint="def456", + signal_type="ioc", + signal_id="1.2.3.4", + signal_hash="hash-aaa", + received_at="2026-01-01T00:01:00+00:00", + raw_json="{}", + )) + rows = db.signals_for_hash("hash-aaa") + assert {r["peer_fingerprint"] for r in rows} == {"abc123", "def456"} + + +def test_peer_registry_crud(fresh_db, fed_dir): + federation.register_peer("peer.example", "ff" * 16, "PEM", status="trusted") + peers = federation.list_peers() + assert len(peers) == 1 + assert peers[0].domain == "peer.example" + assert peers[0].status == "trusted" + + federation.set_peer_status("peer.example", "blocked") + assert federation.get_peer("peer.example").status == "blocked" + + federation.remove_peer("peer.example") + assert federation.list_peers() == []