Files
psyc/tests/test_federation.py
2026-06-06 16:10:31 +02:00

231 lines
8.0 KiB
Python

"""Federation — identity, signed feed, peer registry, signal buffer."""
from __future__ import annotations
import base64
import os
import stat
import pytest
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import federation
from psyc.lines.federation import (
DNSRecord,
build_signed_feed,
canonical_json,
dns_record,
import_signed_feed,
node_fingerprint,
node_keypair,
public_key_pem,
sign_payload,
verify_payload,
)
from psyc.result import Err, Ok
from conftest import make_case
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
"""Redirect federation key paths to a tmp dir so each test gets a fresh key."""
d = tmp_path / "federation"
monkeypatch.setattr(federation, "FED_DIR", d)
monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
yield d
def test_keypair_persisted_with_correct_perms(fed_dir):
priv, pub = node_keypair()
assert federation.PRIVATE_KEY_PATH.exists()
assert federation.PUBLIC_KEY_PATH.exists()
mode = stat.S_IMODE(os.stat(federation.PRIVATE_KEY_PATH).st_mode)
assert mode == 0o600
def test_keypair_idempotent_across_calls(fed_dir):
priv1, pub1 = node_keypair()
priv2, pub2 = node_keypair()
# raw bytes match — same key loaded twice, not regenerated
raw1 = pub1.public_bytes(
encoding=federation.serialization.Encoding.Raw,
format=federation.serialization.PublicFormat.Raw,
)
raw2 = pub2.public_bytes(
encoding=federation.serialization.Encoding.Raw,
format=federation.serialization.PublicFormat.Raw,
)
assert raw1 == raw2
def test_fingerprint_is_stable_and_32_hex(fed_dir):
fp1 = node_fingerprint()
fp2 = node_fingerprint()
assert fp1 == fp2
assert len(fp1) == 32
assert all(c in "0123456789abcdef" for c in fp1)
def test_sign_verify_roundtrip(fed_dir):
payload = b"hello federation"
sig = sign_payload(payload)
assert verify_payload(payload, sig, public_key_pem()) is True
def test_verify_with_wrong_key_returns_false(fed_dir, tmp_path):
payload = b"the truth"
sig = sign_payload(payload)
# Build a *different* keypair in a separate directory and use its pubkey.
other = tmp_path / "other-federation"
other.mkdir()
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
other_priv = ed25519.Ed25519PrivateKey.generate()
other_pub_pem = other_priv.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
assert verify_payload(payload, sig, other_pub_pem) is False
def test_verify_with_garbage_pubkey_returns_false_no_raise(fed_dir):
sig = sign_payload(b"x")
assert verify_payload(b"x", sig, "not a pem") is False
def test_canonical_json_is_deterministic():
a = canonical_json({"b": 1, "a": 2, "nested": {"y": 1, "x": 2}})
b = canonical_json({"a": 2, "b": 1, "nested": {"x": 2, "y": 1}})
assert a == b
def test_dns_record_txt_value_matches_spec(fed_dir):
rec = dns_record("example.com")
assert isinstance(rec, DNSRecord)
fp = node_fingerprint()
assert rec.srv_name == "_psyc._tcp.example.com"
assert rec.srv_target == "example.com."
assert rec.srv_port == 443
assert rec.txt_value == f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed"
# human instructions include both record lines
assert "_psyc._tcp.example.com" in rec.human_instructions
assert "SRV" in rec.human_instructions
assert "TXT" in rec.human_instructions
def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir):
case = make_case(feed="urlhaus", ips=["1.1.1.1"], urls=["http://1.1.1.1/x"])
db.upsert_case(case)
feed = build_signed_feed(window_hours=24)
assert feed["version"] == "psyc1"
assert feed["fingerprint"] == node_fingerprint()
assert feed["signature"]
# cases entry made it in
assert any(c["case_id"] == case.case_id for c in feed["cases"])
# Import using our own pubkey against a *different* declared fingerprint:
# swap the fingerprint so import_signed_feed doesn't reject as a loop.
pub = public_key_pem()
# Use a fresh keypair to act as "peer" — sign a feed with that key.
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
import hashlib
peer_priv = ed25519.Ed25519PrivateKey.generate()
peer_pub_pem = peer_priv.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
peer_raw = peer_priv.public_key().public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
peer_fp = hashlib.sha256(peer_raw).digest()[:16].hex()
feed["fingerprint"] = peer_fp
unsigned = {k: v for k, v in feed.items() if k != "signature"}
new_sig = peer_priv.sign(canonical_json(unsigned))
feed["signature"] = base64.b64encode(new_sig).decode("ascii")
result = import_signed_feed(feed, peer_pub_pem)
assert isinstance(result, Ok), getattr(result, "reason", "")
summary = result.value
assert summary.peer_fingerprint == peer_fp
assert summary.cases_seen >= 1
def test_import_with_wrong_pubkey_returns_err(fresh_db, fed_dir):
db.upsert_case(make_case(feed="urlhaus", ips=["2.2.2.2"]))
feed = build_signed_feed(window_hours=24)
# build a *different* pubkey to claim verification against
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
other_pub_pem = ed25519.Ed25519PrivateKey.generate().public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
# also need to change fingerprint so the loop-check doesn't trigger first
feed["fingerprint"] = "deadbeef" * 4
result = import_signed_feed(feed, other_pub_pem)
assert isinstance(result, Err)
def test_import_own_feed_returns_loop_err(fresh_db, fed_dir):
db.upsert_case(make_case(feed="urlhaus", ips=["3.3.3.3"]))
feed = build_signed_feed(window_hours=24)
result = import_signed_feed(feed, public_key_pem())
assert isinstance(result, Err)
assert "loop" in result.reason
def test_record_signal_then_lookup_by_hash(fresh_db):
rid = db.record_signal(dict(
peer_fingerprint="abc123",
signal_type="ioc",
signal_id="1.2.3.4",
signal_hash="hash-aaa",
received_at="2026-01-01T00:00:00+00:00",
raw_json="{}",
))
assert rid > 0
rows = db.signals_for_hash("hash-aaa")
assert len(rows) == 1
assert rows[0]["peer_fingerprint"] == "abc123"
# second peer reports the same hash → both surface for quorum check
db.record_signal(dict(
peer_fingerprint="def456",
signal_type="ioc",
signal_id="1.2.3.4",
signal_hash="hash-aaa",
received_at="2026-01-01T00:01:00+00:00",
raw_json="{}",
))
rows = db.signals_for_hash("hash-aaa")
assert {r["peer_fingerprint"] for r in rows} == {"abc123", "def456"}
def test_peer_registry_crud(fresh_db, fed_dir):
federation.register_peer("peer.example", "ff" * 16, "PEM", status="trusted")
peers = federation.list_peers()
assert len(peers) == 1
assert peers[0].domain == "peer.example"
assert peers[0].status == "trusted"
federation.set_peer_status("peer.example", "blocked")
assert federation.get_peer("peer.example").status == "blocked"
federation.remove_peer("peer.example")
assert federation.list_peers() == []