"""Vouching + quorum — sign/verify, threshold logic, import gate.""" from __future__ import annotations import base64 import hashlib from datetime import datetime, timedelta, timezone import pytest from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ed25519 from sqlalchemy import create_engine from psyc import db from psyc.lines import federation from psyc.lines.federation import ( QuorumConfig, Vouch, accept_vouch, build_signed_feed, canonical_json, import_signed_feed, is_quorum_met, is_vouched, issue_vouch, node_fingerprint, our_vouches, peer_is_listening_eligible, public_key_pem, quorum_config, register_peer, revoke_vouch, set_quorum_config, vouch_payload_bytes, ) from psyc.result import Err, Ok @pytest.fixture def fresh_db(tmp_path, monkeypatch): test_db = tmp_path / "test.db" eng = create_engine(f"sqlite:///{test_db}", future=True) db._metadata.create_all(eng, checkfirst=True) monkeypatch.setattr(db, "_engine", eng) monkeypatch.setattr(db, "DB_PATH", test_db) yield test_db @pytest.fixture def fed_dir(tmp_path, monkeypatch): d = tmp_path / "federation" monkeypatch.setattr(federation, "FED_DIR", d) monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key") monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub") yield d def _make_peer(): """Generate an Ed25519 keypair + matching fingerprint for a fake peer.""" priv = ed25519.Ed25519PrivateKey.generate() pub = priv.public_key() pub_pem = pub.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo, ).decode("ascii") raw = pub.public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw, ) fp = hashlib.sha256(raw).digest()[:16].hex() return priv, pub_pem, fp def _sign_vouch(priv, voucher_fp, target_fp, issued_at, expires_at): payload = vouch_payload_bytes(voucher_fp, target_fp, issued_at, expires_at) sig = priv.sign(payload) return base64.b64encode(sig).decode("ascii") # ---------- self-issued vouch round-trip -------------------------------- def test_issue_vouch_roundtrip(fresh_db, fed_dir): target = "ab" * 16 v = issue_vouch(target, ttl_days=30) assert v.voucher_fingerprint == node_fingerprint() assert v.target_fingerprint == target assert v.expires_at is not None # round-trip from storage listed = our_vouches() assert len(listed) == 1 assert listed[0].target_fingerprint == target assert listed[0].signature == v.signature # signature verifies under our own pubkey payload = vouch_payload_bytes( v.voucher_fingerprint, v.target_fingerprint, v.issued_at, v.expires_at ) sig = base64.b64decode(v.signature) assert federation.verify_payload(payload, sig, public_key_pem()) def test_revoke_vouch_removes_only_our_entry(fresh_db, fed_dir): target = "cd" * 16 issue_vouch(target, ttl_days=30) assert len(our_vouches()) == 1 revoke_vouch(target) assert our_vouches() == [] # ---------- accept_vouch validation ------------------------------------- def test_accept_vouch_rejects_expired(fresh_db, fed_dir): priv, pem, fp = _make_peer() register_peer("voucher.example", fp, pem, status="trusted") issued = datetime.now(timezone.utc) - timedelta(days=10) expired = datetime.now(timezone.utc) - timedelta(days=1) sig = _sign_vouch(priv, fp, "target", issued, expired) v = Vouch(voucher_fingerprint=fp, target_fingerprint="target", issued_at=issued, expires_at=expired, signature=sig) result = accept_vouch(v, pem) assert isinstance(result, Err) assert "expired" in result.reason def test_accept_vouch_rejects_bad_signature(fresh_db, fed_dir): priv, pem, fp = _make_peer() register_peer("voucher.example", fp, pem, status="trusted") issued = datetime.now(timezone.utc) expires = issued + timedelta(days=30) # Sign a different target then claim it's for "real-target". real_sig = _sign_vouch(priv, fp, "other-target", issued, expires) v = Vouch(voucher_fingerprint=fp, target_fingerprint="real-target", issued_at=issued, expires_at=expires, signature=real_sig) result = accept_vouch(v, pem) assert isinstance(result, Err) assert "signature" in result.reason def test_accept_vouch_rejects_voucher_not_trusted(fresh_db, fed_dir): priv, pem, fp = _make_peer() # Voucher exists but is "unknown" not "trusted". register_peer("voucher.example", fp, pem, status="unknown") issued = datetime.now(timezone.utc) expires = issued + timedelta(days=30) sig = _sign_vouch(priv, fp, "target", issued, expires) v = Vouch(voucher_fingerprint=fp, target_fingerprint="target", issued_at=issued, expires_at=expires, signature=sig) result = accept_vouch(v, pem) assert isinstance(result, Err) assert "not trusted" in result.reason def test_accept_vouch_ok_for_trusted_voucher(fresh_db, fed_dir): priv, pem, fp = _make_peer() register_peer("voucher.example", fp, pem, status="trusted") issued = datetime.now(timezone.utc) expires = issued + timedelta(days=30) sig = _sign_vouch(priv, fp, "target", issued, expires) v = Vouch(voucher_fingerprint=fp, target_fingerprint="target", issued_at=issued, expires_at=expires, signature=sig) result = accept_vouch(v, pem) assert isinstance(result, Ok) # ---------- is_vouched threshold ---------------------------------------- def test_is_vouched_needs_distinct_vouchers(fresh_db, fed_dir): """Two vouches from the same peer must NOT clear a threshold of 2.""" priv, pem, fp = _make_peer() register_peer("voucher.example", fp, pem, status="trusted") issued = datetime.now(timezone.utc) expires = issued + timedelta(days=30) sig = _sign_vouch(priv, fp, "target", issued, expires) v1 = Vouch(voucher_fingerprint=fp, target_fingerprint="target", issued_at=issued, expires_at=expires, signature=sig) assert isinstance(accept_vouch(v1, pem), Ok) # Newer vouch from the SAME voucher — upsert replaces, count stays 1. issued2 = issued + timedelta(seconds=1) sig2 = _sign_vouch(priv, fp, "target", issued2, expires) v2 = Vouch(voucher_fingerprint=fp, target_fingerprint="target", issued_at=issued2, expires_at=expires, signature=sig2) assert isinstance(accept_vouch(v2, pem), Ok) assert is_vouched("target", min_vouchers=2) is False # Threshold of 1 should pass. assert is_vouched("target", min_vouchers=1) is True def test_is_vouched_two_distinct_clear_threshold(fresh_db, fed_dir): priv_a, pem_a, fp_a = _make_peer() priv_b, pem_b, fp_b = _make_peer() register_peer("a.example", fp_a, pem_a, status="trusted") register_peer("b.example", fp_b, pem_b, status="trusted") issued = datetime.now(timezone.utc) expires = issued + timedelta(days=30) va = Vouch(voucher_fingerprint=fp_a, target_fingerprint="target", issued_at=issued, expires_at=expires, signature=_sign_vouch(priv_a, fp_a, "target", issued, expires)) vb = Vouch(voucher_fingerprint=fp_b, target_fingerprint="target", issued_at=issued, expires_at=expires, signature=_sign_vouch(priv_b, fp_b, "target", issued, expires)) assert isinstance(accept_vouch(va, pem_a), Ok) assert isinstance(accept_vouch(vb, pem_b), Ok) assert is_vouched("target", min_vouchers=2) is True assert is_vouched("target", min_vouchers=3) is False # ---------- quorum on signal_hash --------------------------------------- def test_is_quorum_met_counts_distinct_vouched_peers_only(fresh_db, fed_dir): # Two trusted peers + one untrusted peer report the same signal_hash. _, pem_a, fp_a = _make_peer() _, pem_b, fp_b = _make_peer() _, pem_c, fp_c = _make_peer() register_peer("a.example", fp_a, pem_a, status="trusted") register_peer("b.example", fp_b, pem_b, status="trusted") register_peer("c.example", fp_c, pem_c, status="unknown") # not eligible for fp in (fp_a, fp_b, fp_c, fp_a): # fp_a duplicated → still 1 distinct db.record_signal(dict( peer_fingerprint=fp, signal_type="ioc", signal_id="1.2.3.4", signal_hash="h-aaa", received_at=datetime.now(timezone.utc).isoformat(), raw_json="{}", )) assert is_quorum_met("h-aaa", k=2) is True assert is_quorum_met("h-aaa", k=3) is False # only 2 eligible distincts # ---------- quorum config persistence ----------------------------------- def test_quorum_config_defaults_and_persistence(fresh_db, fed_dir): cfg = quorum_config() assert cfg.trust_min_vouchers == 2 assert cfg.signal_quorum_k == 2 set_quorum_config(QuorumConfig(trust_min_vouchers=3, signal_quorum_k=4)) cfg2 = quorum_config() assert cfg2.trust_min_vouchers == 3 assert cfg2.signal_quorum_k == 4 # ---------- import gate enforces listening eligibility ------------------ def _signed_feed_from_peer(peer_priv, peer_fp, vouches=None): """Build a feed claiming origin=peer_fp, signed with peer_priv.""" payload = { "version": federation.FEED_VERSION, "fingerprint": peer_fp, "generated_at": datetime.now(timezone.utc).isoformat(), "window_hours": 24, "cases": [], "iocs": [{ "value": "9.9.9.9", "type": "ip", "severity": "high", "first_seen": datetime.now(timezone.utc).isoformat(), "digest_sha256": "abc123", }], "vouches": vouches or [], } sig = peer_priv.sign(canonical_json(payload)) payload["signature"] = base64.b64encode(sig).decode("ascii") return payload def test_import_feed_rejects_unknown_peer(fresh_db, fed_dir): peer_priv, peer_pem, peer_fp = _make_peer() feed = _signed_feed_from_peer(peer_priv, peer_fp) result = import_signed_feed(feed, peer_pem) assert isinstance(result, Err) assert "not trusted" in result.reason def test_import_feed_accepts_directly_trusted_peer(fresh_db, fed_dir): peer_priv, peer_pem, peer_fp = _make_peer() register_peer("peer.example", peer_fp, peer_pem, status="trusted") feed = _signed_feed_from_peer(peer_priv, peer_fp) result = import_signed_feed(feed, peer_pem) assert isinstance(result, Ok), getattr(result, "reason", "") def test_import_feed_accepts_vouched_peer(fresh_db, fed_dir): # Two trusted peers vouch for a third — third becomes listening-eligible. priv_a, pem_a, fp_a = _make_peer() priv_b, pem_b, fp_b = _make_peer() priv_c, pem_c, fp_c = _make_peer() register_peer("a.example", fp_a, pem_a, status="trusted") register_peer("b.example", fp_b, pem_b, status="trusted") issued = datetime.now(timezone.utc) expires = issued + timedelta(days=30) va = Vouch(voucher_fingerprint=fp_a, target_fingerprint=fp_c, issued_at=issued, expires_at=expires, signature=_sign_vouch(priv_a, fp_a, fp_c, issued, expires)) vb = Vouch(voucher_fingerprint=fp_b, target_fingerprint=fp_c, issued_at=issued, expires_at=expires, signature=_sign_vouch(priv_b, fp_b, fp_c, issued, expires)) assert isinstance(accept_vouch(va, pem_a), Ok) assert isinstance(accept_vouch(vb, pem_b), Ok) assert peer_is_listening_eligible(fp_c) is True feed = _signed_feed_from_peer(priv_c, fp_c) result = import_signed_feed(feed, pem_c) assert isinstance(result, Ok), getattr(result, "reason", "") def test_import_feed_propagates_vouches_in_payload(fresh_db, fed_dir): """A trusted peer's feed carries a vouch the peer issued — we should accept_vouch it and store it locally.""" peer_priv, peer_pem, peer_fp = _make_peer() register_peer("peer.example", peer_fp, peer_pem, status="trusted") target_fp = "ff" * 16 issued = datetime.now(timezone.utc) expires = issued + timedelta(days=30) peer_vouch = Vouch( voucher_fingerprint=peer_fp, target_fingerprint=target_fp, issued_at=issued, expires_at=expires, signature=_sign_vouch(peer_priv, peer_fp, target_fp, issued, expires), ) feed = _signed_feed_from_peer(peer_priv, peer_fp, vouches=[peer_vouch.model_dump(mode="json")]) result = import_signed_feed(feed, peer_pem) assert isinstance(result, Ok), getattr(result, "reason", "") # The vouch is now in our local store under the peer's fingerprint. stored = federation.vouches_for(target_fp) assert any(v.voucher_fingerprint == peer_fp for v in stored)