diff --git a/tests/test_vouching.py b/tests/test_vouching.py new file mode 100644 index 0000000..d9b312e --- /dev/null +++ b/tests/test_vouching.py @@ -0,0 +1,336 @@ +"""Vouching + quorum — sign/verify, threshold logic, import gate.""" + +from __future__ import annotations + +import base64 +import hashlib +from datetime import datetime, timedelta, timezone + +import pytest +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ed25519 +from sqlalchemy import create_engine + +from psyc import db +from psyc.lines import federation +from psyc.lines.federation import ( + QuorumConfig, + Vouch, + accept_vouch, + build_signed_feed, + canonical_json, + import_signed_feed, + is_quorum_met, + is_vouched, + issue_vouch, + node_fingerprint, + our_vouches, + peer_is_listening_eligible, + public_key_pem, + quorum_config, + register_peer, + revoke_vouch, + set_quorum_config, + vouch_payload_bytes, +) +from psyc.result import Err, Ok + + +@pytest.fixture +def fresh_db(tmp_path, monkeypatch): + test_db = tmp_path / "test.db" + eng = create_engine(f"sqlite:///{test_db}", future=True) + db._metadata.create_all(eng, checkfirst=True) + monkeypatch.setattr(db, "_engine", eng) + monkeypatch.setattr(db, "DB_PATH", test_db) + yield test_db + + +@pytest.fixture +def fed_dir(tmp_path, monkeypatch): + d = tmp_path / "federation" + monkeypatch.setattr(federation, "FED_DIR", d) + monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key") + monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub") + yield d + + +def _make_peer(): + """Generate an Ed25519 keypair + matching fingerprint for a fake peer.""" + priv = ed25519.Ed25519PrivateKey.generate() + pub = priv.public_key() + pub_pem = pub.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("ascii") + raw = pub.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + fp = hashlib.sha256(raw).digest()[:16].hex() + return priv, pub_pem, fp + + +def _sign_vouch(priv, voucher_fp, target_fp, issued_at, expires_at): + payload = vouch_payload_bytes(voucher_fp, target_fp, issued_at, expires_at) + sig = priv.sign(payload) + return base64.b64encode(sig).decode("ascii") + + +# ---------- self-issued vouch round-trip -------------------------------- + +def test_issue_vouch_roundtrip(fresh_db, fed_dir): + target = "ab" * 16 + v = issue_vouch(target, ttl_days=30) + assert v.voucher_fingerprint == node_fingerprint() + assert v.target_fingerprint == target + assert v.expires_at is not None + # round-trip from storage + listed = our_vouches() + assert len(listed) == 1 + assert listed[0].target_fingerprint == target + assert listed[0].signature == v.signature + # signature verifies under our own pubkey + payload = vouch_payload_bytes( + v.voucher_fingerprint, v.target_fingerprint, v.issued_at, v.expires_at + ) + sig = base64.b64decode(v.signature) + assert federation.verify_payload(payload, sig, public_key_pem()) + + +def test_revoke_vouch_removes_only_our_entry(fresh_db, fed_dir): + target = "cd" * 16 + issue_vouch(target, ttl_days=30) + assert len(our_vouches()) == 1 + revoke_vouch(target) + assert our_vouches() == [] + + +# ---------- accept_vouch validation ------------------------------------- + +def test_accept_vouch_rejects_expired(fresh_db, fed_dir): + priv, pem, fp = _make_peer() + register_peer("voucher.example", fp, pem, status="trusted") + issued = datetime.now(timezone.utc) - timedelta(days=10) + expired = datetime.now(timezone.utc) - timedelta(days=1) + sig = _sign_vouch(priv, fp, "target", issued, expired) + v = Vouch(voucher_fingerprint=fp, target_fingerprint="target", + issued_at=issued, expires_at=expired, signature=sig) + result = accept_vouch(v, pem) + assert isinstance(result, Err) + assert "expired" in result.reason + + +def test_accept_vouch_rejects_bad_signature(fresh_db, fed_dir): + priv, pem, fp = _make_peer() + register_peer("voucher.example", fp, pem, status="trusted") + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + # Sign a different target then claim it's for "real-target". + real_sig = _sign_vouch(priv, fp, "other-target", issued, expires) + v = Vouch(voucher_fingerprint=fp, target_fingerprint="real-target", + issued_at=issued, expires_at=expires, signature=real_sig) + result = accept_vouch(v, pem) + assert isinstance(result, Err) + assert "signature" in result.reason + + +def test_accept_vouch_rejects_voucher_not_trusted(fresh_db, fed_dir): + priv, pem, fp = _make_peer() + # Voucher exists but is "unknown" not "trusted". + register_peer("voucher.example", fp, pem, status="unknown") + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + sig = _sign_vouch(priv, fp, "target", issued, expires) + v = Vouch(voucher_fingerprint=fp, target_fingerprint="target", + issued_at=issued, expires_at=expires, signature=sig) + result = accept_vouch(v, pem) + assert isinstance(result, Err) + assert "not trusted" in result.reason + + +def test_accept_vouch_ok_for_trusted_voucher(fresh_db, fed_dir): + priv, pem, fp = _make_peer() + register_peer("voucher.example", fp, pem, status="trusted") + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + sig = _sign_vouch(priv, fp, "target", issued, expires) + v = Vouch(voucher_fingerprint=fp, target_fingerprint="target", + issued_at=issued, expires_at=expires, signature=sig) + result = accept_vouch(v, pem) + assert isinstance(result, Ok) + + +# ---------- is_vouched threshold ---------------------------------------- + +def test_is_vouched_needs_distinct_vouchers(fresh_db, fed_dir): + """Two vouches from the same peer must NOT clear a threshold of 2.""" + priv, pem, fp = _make_peer() + register_peer("voucher.example", fp, pem, status="trusted") + + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + sig = _sign_vouch(priv, fp, "target", issued, expires) + v1 = Vouch(voucher_fingerprint=fp, target_fingerprint="target", + issued_at=issued, expires_at=expires, signature=sig) + assert isinstance(accept_vouch(v1, pem), Ok) + + # Newer vouch from the SAME voucher — upsert replaces, count stays 1. + issued2 = issued + timedelta(seconds=1) + sig2 = _sign_vouch(priv, fp, "target", issued2, expires) + v2 = Vouch(voucher_fingerprint=fp, target_fingerprint="target", + issued_at=issued2, expires_at=expires, signature=sig2) + assert isinstance(accept_vouch(v2, pem), Ok) + + assert is_vouched("target", min_vouchers=2) is False + # Threshold of 1 should pass. + assert is_vouched("target", min_vouchers=1) is True + + +def test_is_vouched_two_distinct_clear_threshold(fresh_db, fed_dir): + priv_a, pem_a, fp_a = _make_peer() + priv_b, pem_b, fp_b = _make_peer() + register_peer("a.example", fp_a, pem_a, status="trusted") + register_peer("b.example", fp_b, pem_b, status="trusted") + + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + va = Vouch(voucher_fingerprint=fp_a, target_fingerprint="target", + issued_at=issued, expires_at=expires, + signature=_sign_vouch(priv_a, fp_a, "target", issued, expires)) + vb = Vouch(voucher_fingerprint=fp_b, target_fingerprint="target", + issued_at=issued, expires_at=expires, + signature=_sign_vouch(priv_b, fp_b, "target", issued, expires)) + assert isinstance(accept_vouch(va, pem_a), Ok) + assert isinstance(accept_vouch(vb, pem_b), Ok) + + assert is_vouched("target", min_vouchers=2) is True + assert is_vouched("target", min_vouchers=3) is False + + +# ---------- quorum on signal_hash --------------------------------------- + +def test_is_quorum_met_counts_distinct_vouched_peers_only(fresh_db, fed_dir): + # Two trusted peers + one untrusted peer report the same signal_hash. + _, pem_a, fp_a = _make_peer() + _, pem_b, fp_b = _make_peer() + _, pem_c, fp_c = _make_peer() + register_peer("a.example", fp_a, pem_a, status="trusted") + register_peer("b.example", fp_b, pem_b, status="trusted") + register_peer("c.example", fp_c, pem_c, status="unknown") # not eligible + + for fp in (fp_a, fp_b, fp_c, fp_a): # fp_a duplicated → still 1 distinct + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id="1.2.3.4", + signal_hash="h-aaa", + received_at=datetime.now(timezone.utc).isoformat(), + raw_json="{}", + )) + + assert is_quorum_met("h-aaa", k=2) is True + assert is_quorum_met("h-aaa", k=3) is False # only 2 eligible distincts + + +# ---------- quorum config persistence ----------------------------------- + +def test_quorum_config_defaults_and_persistence(fresh_db, fed_dir): + cfg = quorum_config() + assert cfg.trust_min_vouchers == 2 + assert cfg.signal_quorum_k == 2 + set_quorum_config(QuorumConfig(trust_min_vouchers=3, signal_quorum_k=4)) + cfg2 = quorum_config() + assert cfg2.trust_min_vouchers == 3 + assert cfg2.signal_quorum_k == 4 + + +# ---------- import gate enforces listening eligibility ------------------ + +def _signed_feed_from_peer(peer_priv, peer_fp, vouches=None): + """Build a feed claiming origin=peer_fp, signed with peer_priv.""" + payload = { + "version": federation.FEED_VERSION, + "fingerprint": peer_fp, + "generated_at": datetime.now(timezone.utc).isoformat(), + "window_hours": 24, + "cases": [], + "iocs": [{ + "value": "9.9.9.9", + "type": "ip", + "severity": "high", + "first_seen": datetime.now(timezone.utc).isoformat(), + "digest_sha256": "abc123", + }], + "vouches": vouches or [], + } + sig = peer_priv.sign(canonical_json(payload)) + payload["signature"] = base64.b64encode(sig).decode("ascii") + return payload + + +def test_import_feed_rejects_unknown_peer(fresh_db, fed_dir): + peer_priv, peer_pem, peer_fp = _make_peer() + feed = _signed_feed_from_peer(peer_priv, peer_fp) + result = import_signed_feed(feed, peer_pem) + assert isinstance(result, Err) + assert "not trusted" in result.reason + + +def test_import_feed_accepts_directly_trusted_peer(fresh_db, fed_dir): + peer_priv, peer_pem, peer_fp = _make_peer() + register_peer("peer.example", peer_fp, peer_pem, status="trusted") + feed = _signed_feed_from_peer(peer_priv, peer_fp) + result = import_signed_feed(feed, peer_pem) + assert isinstance(result, Ok), getattr(result, "reason", "") + + +def test_import_feed_accepts_vouched_peer(fresh_db, fed_dir): + # Two trusted peers vouch for a third — third becomes listening-eligible. + priv_a, pem_a, fp_a = _make_peer() + priv_b, pem_b, fp_b = _make_peer() + priv_c, pem_c, fp_c = _make_peer() + register_peer("a.example", fp_a, pem_a, status="trusted") + register_peer("b.example", fp_b, pem_b, status="trusted") + + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + va = Vouch(voucher_fingerprint=fp_a, target_fingerprint=fp_c, + issued_at=issued, expires_at=expires, + signature=_sign_vouch(priv_a, fp_a, fp_c, issued, expires)) + vb = Vouch(voucher_fingerprint=fp_b, target_fingerprint=fp_c, + issued_at=issued, expires_at=expires, + signature=_sign_vouch(priv_b, fp_b, fp_c, issued, expires)) + assert isinstance(accept_vouch(va, pem_a), Ok) + assert isinstance(accept_vouch(vb, pem_b), Ok) + assert peer_is_listening_eligible(fp_c) is True + + feed = _signed_feed_from_peer(priv_c, fp_c) + result = import_signed_feed(feed, pem_c) + assert isinstance(result, Ok), getattr(result, "reason", "") + + +def test_import_feed_propagates_vouches_in_payload(fresh_db, fed_dir): + """A trusted peer's feed carries a vouch the peer issued — we should + accept_vouch it and store it locally.""" + peer_priv, peer_pem, peer_fp = _make_peer() + register_peer("peer.example", peer_fp, peer_pem, status="trusted") + + target_fp = "ff" * 16 + issued = datetime.now(timezone.utc) + expires = issued + timedelta(days=30) + peer_vouch = Vouch( + voucher_fingerprint=peer_fp, + target_fingerprint=target_fp, + issued_at=issued, + expires_at=expires, + signature=_sign_vouch(peer_priv, peer_fp, target_fp, issued, expires), + ) + feed = _signed_feed_from_peer(peer_priv, peer_fp, vouches=[peer_vouch.model_dump(mode="json")]) + + result = import_signed_feed(feed, peer_pem) + assert isinstance(result, Ok), getattr(result, "reason", "") + + # The vouch is now in our local store under the peer's fingerprint. + stored = federation.vouches_for(target_fp) + assert any(v.voucher_fingerprint == peer_fp for v in stored)