Files
psyc/tests/test_vouching.py
m17hr1l f4148d86a6 stage-vouch-e federation: tests for vouching + quorum gate
test_vouching covers the contract that auto-response and other agents
will gate on:

- issue_vouch round-trip (sign + verify under our own pubkey)
- accept_vouch rejects expired vouches
- accept_vouch rejects mismatched signatures
- accept_vouch rejects vouchers whose peers.status != "trusted"
- accept_vouch happy path
- is_vouched needs DISTINCT vouchers (two upserts from one peer == 1)
- is_vouched clears threshold with two distinct trusted vouchers
- is_quorum_met counts only listening-eligible peers (untrusted +
  duplicate rows don't count)
- quorum_config defaults + pulse_settings persistence
- import_signed_feed rejects unknown peer ("not trusted")
- import_signed_feed accepts directly-trusted peer
- import_signed_feed accepts a peer made eligible via two vouches
- import_signed_feed stores vouches embedded in a trusted peer's feed

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 21:11:18 +02:00

337 lines
13 KiB
Python

"""Vouching + quorum — sign/verify, threshold logic, import gate."""
from __future__ import annotations
import base64
import hashlib
from datetime import datetime, timedelta, timezone
import pytest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import federation
from psyc.lines.federation import (
QuorumConfig,
Vouch,
accept_vouch,
build_signed_feed,
canonical_json,
import_signed_feed,
is_quorum_met,
is_vouched,
issue_vouch,
node_fingerprint,
our_vouches,
peer_is_listening_eligible,
public_key_pem,
quorum_config,
register_peer,
revoke_vouch,
set_quorum_config,
vouch_payload_bytes,
)
from psyc.result import Err, Ok
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
    """Point ``psyc.db`` at a throwaway SQLite file for one test.

    Creates the tables described by ``db._metadata`` in a new database under
    ``tmp_path``, then patches ``db._engine`` and ``db.DB_PATH`` so the code
    under test uses it. Yields the path of the database file.
    """
    db_file = tmp_path / "test.db"
    engine = create_engine(f"sqlite:///{db_file}", future=True)
    db._metadata.create_all(engine, checkfirst=True)
    # monkeypatch restores both attributes automatically after the test.
    for attr_name, replacement in (("_engine", engine), ("DB_PATH", db_file)):
        monkeypatch.setattr(db, attr_name, replacement)
    yield db_file
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
    """Redirect the federation module's directory and key paths into ``tmp_path``.

    Patches ``FED_DIR``, ``PRIVATE_KEY_PATH`` and ``PUBLIC_KEY_PATH`` on
    ``psyc.lines.federation`` and yields the (not pre-created) directory path.
    """
    fed_root = tmp_path / "federation"
    overrides = {
        "FED_DIR": fed_root,
        "PRIVATE_KEY_PATH": fed_root / "node.key",
        "PUBLIC_KEY_PATH": fed_root / "node.pub",
    }
    for attr_name, replacement in overrides.items():
        monkeypatch.setattr(federation, attr_name, replacement)
    yield fed_root
def _make_peer():
    """Create a fake peer identity for tests.

    Returns a ``(private_key, public_key_pem, fingerprint)`` tuple where the
    PEM is the SubjectPublicKeyInfo encoding of a fresh Ed25519 public key and
    the fingerprint is the hex form of the first 16 bytes of the SHA-256 of
    the raw public key bytes.
    """
    private_key = ed25519.Ed25519PrivateKey.generate()
    public_key = private_key.public_key()

    raw_public = public_key.public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    # 32 hex characters == the first 16 digest bytes.
    fingerprint = hashlib.sha256(raw_public).hexdigest()[:32]

    pem_bytes = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return private_key, pem_bytes.decode("ascii"), fingerprint
def _sign_vouch(priv, voucher_fp, target_fp, issued_at, expires_at):
    """Sign the vouch payload with *priv* and return the signature as base64 text.

    The signed bytes are whatever ``vouch_payload_bytes`` produces for the
    given voucher/target fingerprints and timestamps.
    """
    raw_signature = priv.sign(
        vouch_payload_bytes(voucher_fp, target_fp, issued_at, expires_at)
    )
    return base64.b64encode(raw_signature).decode("ascii")
# ---------- self-issued vouch round-trip --------------------------------
def test_issue_vouch_roundtrip(fresh_db, fed_dir):
    """A vouch we issue is listed back by our_vouches() and verifies under our key."""
    target_fp = "ab" * 16
    issued = issue_vouch(target_fp, ttl_days=30)

    # The returned vouch names this node as the voucher and carries an expiry.
    assert issued.voucher_fingerprint == node_fingerprint()
    assert issued.target_fingerprint == target_fp
    assert issued.expires_at is not None

    # Listing our vouches yields exactly that one vouch.
    stored = our_vouches()
    assert len(stored) == 1
    first = stored[0]
    assert first.target_fingerprint == target_fp
    assert first.signature == issued.signature

    # The signature must check out against our own public key.
    signed_bytes = vouch_payload_bytes(
        issued.voucher_fingerprint,
        issued.target_fingerprint,
        issued.issued_at,
        issued.expires_at,
    )
    raw_signature = base64.b64decode(issued.signature)
    assert federation.verify_payload(signed_bytes, raw_signature, public_key_pem())
def test_revoke_vouch_removes_only_our_entry(fresh_db, fed_dir):
    """Revoking a vouch we issued leaves our_vouches() empty.

    NOTE(review): despite the name, no vouch from another voucher is set up
    here, so the "only our entry" part is not actually exercised.
    """
    target_fp = "cd" * 16
    issue_vouch(target_fp, ttl_days=30)
    issued_before = our_vouches()
    assert len(issued_before) == 1

    revoke_vouch(target_fp)
    assert our_vouches() == []
# ---------- accept_vouch validation -------------------------------------
def test_accept_vouch_rejects_expired(fresh_db, fed_dir):
    """A correctly signed vouch whose expiry lies in the past is refused."""
    voucher_key, voucher_pem, voucher_fp = _make_peer()
    register_peer("voucher.example", voucher_fp, voucher_pem, status="trusted")

    # Issued ten days ago, expired yesterday.
    issued_at = datetime.now(timezone.utc) - timedelta(days=10)
    expires_at = datetime.now(timezone.utc) - timedelta(days=1)
    stale = Vouch(
        voucher_fingerprint=voucher_fp,
        target_fingerprint="target",
        issued_at=issued_at,
        expires_at=expires_at,
        signature=_sign_vouch(
            voucher_key, voucher_fp, "target", issued_at, expires_at
        ),
    )

    outcome = accept_vouch(stale, voucher_pem)
    assert isinstance(outcome, Err)
    assert "expired" in outcome.reason
def test_accept_vouch_rejects_bad_signature(fresh_db, fed_dir):
    """A vouch whose signature covers a different target is refused."""
    voucher_key, voucher_pem, voucher_fp = _make_peer()
    register_peer("voucher.example", voucher_fp, voucher_pem, status="trusted")

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=30)

    # The signature is genuine, but it was produced for "other-target";
    # the vouch below claims it is for "real-target".
    mismatched_sig = _sign_vouch(
        voucher_key, voucher_fp, "other-target", issued_at, expires_at
    )
    forged = Vouch(
        voucher_fingerprint=voucher_fp,
        target_fingerprint="real-target",
        issued_at=issued_at,
        expires_at=expires_at,
        signature=mismatched_sig,
    )

    outcome = accept_vouch(forged, voucher_pem)
    assert isinstance(outcome, Err)
    assert "signature" in outcome.reason
def test_accept_vouch_rejects_voucher_not_trusted(fresh_db, fed_dir):
    """A valid vouch is refused when its voucher is registered but not trusted."""
    voucher_key, voucher_pem, voucher_fp = _make_peer()
    # The voucher is known to us, but only with status "unknown".
    register_peer("voucher.example", voucher_fp, voucher_pem, status="unknown")

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=30)
    vouch = Vouch(
        voucher_fingerprint=voucher_fp,
        target_fingerprint="target",
        issued_at=issued_at,
        expires_at=expires_at,
        signature=_sign_vouch(
            voucher_key, voucher_fp, "target", issued_at, expires_at
        ),
    )

    outcome = accept_vouch(vouch, voucher_pem)
    assert isinstance(outcome, Err)
    assert "not trusted" in outcome.reason
def test_accept_vouch_ok_for_trusted_voucher(fresh_db, fed_dir):
    """Happy path: an unexpired, correctly signed vouch from a trusted peer is accepted."""
    voucher_key, voucher_pem, voucher_fp = _make_peer()
    register_peer("voucher.example", voucher_fp, voucher_pem, status="trusted")

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=30)
    vouch = Vouch(
        voucher_fingerprint=voucher_fp,
        target_fingerprint="target",
        issued_at=issued_at,
        expires_at=expires_at,
        signature=_sign_vouch(
            voucher_key, voucher_fp, "target", issued_at, expires_at
        ),
    )

    outcome = accept_vouch(vouch, voucher_pem)
    assert isinstance(outcome, Ok)
# ---------- is_vouched threshold ----------------------------------------
def test_is_vouched_needs_distinct_vouchers(fresh_db, fed_dir):
    """Two accepted vouches from one voucher satisfy a threshold of 1 but not 2."""
    voucher_key, voucher_pem, voucher_fp = _make_peer()
    register_peer("voucher.example", voucher_fp, voucher_pem, status="trusted")

    first_issue = datetime.now(timezone.utc)
    expires_at = first_issue + timedelta(days=30)

    # Same voucher, same target; the second vouch is one second newer.
    # Both are accepted individually.
    for issued_at in (first_issue, first_issue + timedelta(seconds=1)):
        vouch = Vouch(
            voucher_fingerprint=voucher_fp,
            target_fingerprint="target",
            issued_at=issued_at,
            expires_at=expires_at,
            signature=_sign_vouch(
                voucher_key, voucher_fp, "target", issued_at, expires_at
            ),
        )
        assert isinstance(accept_vouch(vouch, voucher_pem), Ok)

    # Only one distinct voucher stands behind the target.
    assert is_vouched("target", min_vouchers=2) is False
    assert is_vouched("target", min_vouchers=1) is True
def test_is_vouched_two_distinct_clear_threshold(fresh_db, fed_dir):
    """Vouches from two different trusted peers satisfy a threshold of 2, not 3."""
    vouchers = [_make_peer(), _make_peer()]
    for hostname, (_, pem, fp) in zip(("a.example", "b.example"), vouchers):
        register_peer(hostname, fp, pem, status="trusted")

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=30)

    # Build both vouches first, then submit them in the same order.
    vouches = [
        Vouch(
            voucher_fingerprint=fp,
            target_fingerprint="target",
            issued_at=issued_at,
            expires_at=expires_at,
            signature=_sign_vouch(key, fp, "target", issued_at, expires_at),
        )
        for key, _, fp in vouchers
    ]
    for vouch, (_, pem, _) in zip(vouches, vouchers):
        assert isinstance(accept_vouch(vouch, pem), Ok)

    assert is_vouched("target", min_vouchers=2) is True
    assert is_vouched("target", min_vouchers=3) is False
# ---------- quorum on signal_hash ---------------------------------------
def test_is_quorum_met_counts_distinct_vouched_peers_only(fresh_db, fed_dir):
    """Quorum on a signal hash is reached at k=2 but not k=3.

    Two trusted peers and one "unknown" peer report the same signal_hash,
    and one trusted peer reports it twice.
    """
    roster = (
        ("a.example", "trusted"),
        ("b.example", "trusted"),
        ("c.example", "unknown"),  # not eligible
    )
    fingerprints = []
    for hostname, status in roster:
        _, pem, fp = _make_peer()
        register_peer(hostname, fp, pem, status=status)
        fingerprints.append(fp)
    fp_a, fp_b, fp_c = fingerprints

    # fp_a reports twice; the k=3 assertion below expects that to count once.
    reporters = [fp_a, fp_b, fp_c, fp_a]
    for reporter_fp in reporters:
        db.record_signal({
            "peer_fingerprint": reporter_fp,
            "signal_type": "ioc",
            "signal_id": "1.2.3.4",
            "signal_hash": "h-aaa",
            "received_at": datetime.now(timezone.utc).isoformat(),
            "raw_json": "{}",
        })

    assert is_quorum_met("h-aaa", k=2) is True
    # Only two eligible, distinct reporters exist.
    assert is_quorum_met("h-aaa", k=3) is False
# ---------- quorum config persistence -----------------------------------
def test_quorum_config_defaults_and_persistence(fresh_db, fed_dir):
    """quorum_config() starts at 2/2 and returns what set_quorum_config() stored."""
    defaults = quorum_config()
    assert (defaults.trust_min_vouchers, defaults.signal_quorum_k) == (2, 2)

    set_quorum_config(QuorumConfig(trust_min_vouchers=3, signal_quorum_k=4))

    reloaded = quorum_config()
    assert (reloaded.trust_min_vouchers, reloaded.signal_quorum_k) == (3, 4)
# ---------- import gate enforces listening eligibility ------------------
def _signed_feed_from_peer(peer_priv, peer_fp, vouches=None):
    """Assemble a feed dict that claims *peer_fp* as origin, signed by *peer_priv*.

    The feed has no cases, a single IP indicator, and the given *vouches*
    (an empty list when none are supplied). The signature covers
    ``canonical_json`` of the feed before the ``"signature"`` key is added,
    and is stored base64-encoded.
    """
    feed = {
        "version": federation.FEED_VERSION,
        "fingerprint": peer_fp,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "window_hours": 24,
        "cases": [],
    }
    indicator = {
        "value": "9.9.9.9",
        "type": "ip",
        "severity": "high",
        "first_seen": datetime.now(timezone.utc).isoformat(),
        "digest_sha256": "abc123",
    }
    feed["iocs"] = [indicator]
    feed["vouches"] = vouches if vouches else []

    # Sign first, attach afterwards: the signature must not cover itself.
    raw_signature = peer_priv.sign(canonical_json(feed))
    feed["signature"] = base64.b64encode(raw_signature).decode("ascii")
    return feed
def test_import_feed_rejects_unknown_peer(fresh_db, fed_dir):
    """A validly signed feed from a peer we never registered is refused."""
    signing_key, pem, fingerprint = _make_peer()
    outcome = import_signed_feed(
        _signed_feed_from_peer(signing_key, fingerprint), pem
    )
    assert isinstance(outcome, Err)
    assert "not trusted" in outcome.reason
def test_import_feed_accepts_directly_trusted_peer(fresh_db, fed_dir):
    """A feed signed by a peer registered as "trusted" imports successfully."""
    signing_key, pem, fingerprint = _make_peer()
    register_peer("peer.example", fingerprint, pem, status="trusted")

    outcome = import_signed_feed(
        _signed_feed_from_peer(signing_key, fingerprint), pem
    )
    # On failure, surface the rejection reason in the assertion message.
    assert isinstance(outcome, Ok), getattr(outcome, "reason", "")
def test_import_feed_accepts_vouched_peer(fresh_db, fed_dir):
    """An unregistered peer vouched for by two trusted peers may deliver a feed."""
    vouchers = [_make_peer(), _make_peer()]
    newcomer_key, newcomer_pem, newcomer_fp = _make_peer()

    # Only the two vouchers are registered; the newcomer never is.
    for hostname, (_, pem, fp) in zip(("a.example", "b.example"), vouchers):
        register_peer(hostname, fp, pem, status="trusted")

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=30)
    vouches = [
        Vouch(
            voucher_fingerprint=fp,
            target_fingerprint=newcomer_fp,
            issued_at=issued_at,
            expires_at=expires_at,
            signature=_sign_vouch(key, fp, newcomer_fp, issued_at, expires_at),
        )
        for key, _, fp in vouchers
    ]
    for vouch, (_, pem, _) in zip(vouches, vouchers):
        assert isinstance(accept_vouch(vouch, pem), Ok)

    # Two distinct trusted vouchers make the newcomer listening-eligible.
    assert peer_is_listening_eligible(newcomer_fp) is True

    outcome = import_signed_feed(
        _signed_feed_from_peer(newcomer_key, newcomer_fp), newcomer_pem
    )
    assert isinstance(outcome, Ok), getattr(outcome, "reason", "")
def test_import_feed_propagates_vouches_in_payload(fresh_db, fed_dir):
    """A vouch embedded in a trusted peer's feed ends up in our local store.

    After importing the feed, ``federation.vouches_for`` for the vouch's
    target must include an entry whose voucher is the feed's peer.
    """
    signing_key, pem, fingerprint = _make_peer()
    register_peer("peer.example", fingerprint, pem, status="trusted")

    vouched_fp = "ff" * 16
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=30)
    embedded = Vouch(
        voucher_fingerprint=fingerprint,
        target_fingerprint=vouched_fp,
        issued_at=issued_at,
        expires_at=expires_at,
        signature=_sign_vouch(
            signing_key, fingerprint, vouched_fp, issued_at, expires_at
        ),
    )

    # The vouch travels inside the feed as its JSON-mode dump.
    feed = _signed_feed_from_peer(
        signing_key, fingerprint, vouches=[embedded.model_dump(mode="json")]
    )
    outcome = import_signed_feed(feed, pem)
    assert isinstance(outcome, Ok), getattr(outcome, "reason", "")

    recorded_vouchers = [
        v.voucher_fingerprint for v in federation.vouches_for(vouched_fp)
    ]
    assert fingerprint in recorded_vouchers