test_vouching covers the contract that auto-response and other agents
will gate on:
- issue_vouch round-trip (sign + verify under our own pubkey)
- accept_vouch rejects expired vouches
- accept_vouch rejects mismatched signatures
- accept_vouch rejects vouchers whose peers.status != "trusted"
- accept_vouch happy path
- is_vouched needs DISTINCT vouchers (two upserts from one peer == 1)
- is_vouched clears threshold with two distinct trusted vouchers
- is_quorum_met counts only listening-eligible peers (untrusted +
duplicate rows don't count)
- quorum_config defaults + pulse_settings persistence
- import_signed_feed rejects unknown peer ("not trusted")
- import_signed_feed accepts directly-trusted peer
- import_signed_feed accepts a peer made eligible via two vouches
- import_signed_feed stores vouches embedded in a trusted peer's feed
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
337 lines
13 KiB
Python
337 lines
13 KiB
Python
"""Vouching + quorum — sign/verify, threshold logic, import gate."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import pytest
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ed25519
|
|
from sqlalchemy import create_engine
|
|
|
|
from psyc import db
|
|
from psyc.lines import federation
|
|
from psyc.lines.federation import (
|
|
QuorumConfig,
|
|
Vouch,
|
|
accept_vouch,
|
|
build_signed_feed,
|
|
canonical_json,
|
|
import_signed_feed,
|
|
is_quorum_met,
|
|
is_vouched,
|
|
issue_vouch,
|
|
node_fingerprint,
|
|
our_vouches,
|
|
peer_is_listening_eligible,
|
|
public_key_pem,
|
|
quorum_config,
|
|
register_peer,
|
|
revoke_vouch,
|
|
set_quorum_config,
|
|
vouch_payload_bytes,
|
|
)
|
|
from psyc.result import Err, Ok
|
|
|
|
|
|
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
    """Point ``psyc.db`` at a brand-new SQLite file for one test.

    The schema is created from ``db._metadata`` in a database file under
    ``tmp_path``; ``db._engine`` and ``db.DB_PATH`` are then patched via
    ``monkeypatch`` so they refer to that database.

    Yields:
        The path of the temporary database file.
    """
    test_db = tmp_path / "test.db"
    eng = create_engine(f"sqlite:///{test_db}", future=True)
    db._metadata.create_all(eng, checkfirst=True)
    monkeypatch.setattr(db, "_engine", eng)
    monkeypatch.setattr(db, "DB_PATH", test_db)
    yield test_db
    # Teardown: release pooled connections so the SQLite file is not left
    # open once the test that used this engine has finished.
    eng.dispose()
|
|
|
|
|
|
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
    """Redirect the federation module's key locations into ``tmp_path``.

    Patches ``FED_DIR``, ``PRIVATE_KEY_PATH`` and ``PUBLIC_KEY_PATH`` on
    ``psyc.lines.federation``. The directory itself is not created here.

    Yields:
        The ``tmp_path / "federation"`` directory path.
    """
    base = tmp_path / "federation"
    overrides = {
        "FED_DIR": base,
        "PRIVATE_KEY_PATH": base / "node.key",
        "PUBLIC_KEY_PATH": base / "node.pub",
    }
    for attr_name, location in overrides.items():
        monkeypatch.setattr(federation, attr_name, location)
    yield base
|
|
|
|
|
|
def _make_peer():
    """Create a throwaway Ed25519 identity for a fake peer.

    Returns:
        A ``(private_key, public_key_pem, fingerprint)`` tuple. The PEM is
        the ASCII SubjectPublicKeyInfo encoding of the public key; the
        fingerprint is the hex form of the first 16 bytes of SHA-256 over
        the raw public key bytes.
    """
    private_key = ed25519.Ed25519PrivateKey.generate()
    public_key = private_key.public_key()

    pem_text = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode("ascii")

    raw_bytes = public_key.public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    # 32 hex characters == the first 16 digest bytes.
    fingerprint = hashlib.sha256(raw_bytes).hexdigest()[:32]

    return private_key, pem_text, fingerprint
|
|
|
|
|
|
def _sign_vouch(priv, voucher_fp, target_fp, issued_at, expires_at):
    """Sign a vouch with ``priv`` and return the signature as base64 text.

    The signed bytes are whatever ``vouch_payload_bytes`` produces for the
    given voucher, target, and timestamps.
    """
    signature = priv.sign(
        vouch_payload_bytes(voucher_fp, target_fp, issued_at, expires_at)
    )
    return base64.b64encode(signature).decode("ascii")
|
|
|
|
|
|
# ---------- self-issued vouch round-trip --------------------------------
|
|
|
|
def test_issue_vouch_roundtrip(fresh_db, fed_dir):
    """A self-issued vouch is stored, listed, and verifies under our key."""
    target_fp = "ab" * 16
    issued = issue_vouch(target_fp, ttl_days=30)

    assert issued.voucher_fingerprint == node_fingerprint()
    assert issued.target_fingerprint == target_fp
    assert issued.expires_at is not None

    # Reading back from storage yields exactly the vouch we just issued.
    stored = our_vouches()
    assert len(stored) == 1
    first = stored[0]
    assert first.target_fingerprint == target_fp
    assert first.signature == issued.signature

    # The stored signature must verify against our own public key.
    signed_bytes = vouch_payload_bytes(
        issued.voucher_fingerprint,
        issued.target_fingerprint,
        issued.issued_at,
        issued.expires_at,
    )
    raw_signature = base64.b64decode(issued.signature)
    assert federation.verify_payload(signed_bytes, raw_signature, public_key_pem())
|
|
|
|
|
|
def test_revoke_vouch_removes_only_our_entry(fresh_db, fed_dir):
    """Revoking a vouch we issued leaves our own vouch list empty.

    NOTE(review): only ``our_vouches()`` is inspected here; the test does
    not check that vouches from other peers for the same target survive.
    """
    target_fp = "cd" * 16

    issue_vouch(target_fp, ttl_days=30)
    assert len(our_vouches()) == 1

    revoke_vouch(target_fp)
    assert our_vouches() == []
|
|
|
|
|
|
# ---------- accept_vouch validation -------------------------------------
|
|
|
|
def test_accept_vouch_rejects_expired(fresh_db, fed_dir):
    """A validly signed vouch whose expiry lies in the past is refused."""
    voucher_key, voucher_pem, voucher_fp = _make_peer()
    register_peer("voucher.example", voucher_fp, voucher_pem, status="trusted")

    issued_at = datetime.now(timezone.utc) - timedelta(days=10)
    expires_at = datetime.now(timezone.utc) - timedelta(days=1)
    stale = Vouch(
        voucher_fingerprint=voucher_fp,
        target_fingerprint="target",
        issued_at=issued_at,
        expires_at=expires_at,
        signature=_sign_vouch(
            voucher_key, voucher_fp, "target", issued_at, expires_at
        ),
    )

    outcome = accept_vouch(stale, voucher_pem)
    assert isinstance(outcome, Err)
    assert "expired" in outcome.reason
|
|
|
|
|
|
def test_accept_vouch_rejects_bad_signature(fresh_db, fed_dir):
    """A signature made for one target must not validate for another."""
    voucher_key, voucher_pem, voucher_fp = _make_peer()
    register_peer("voucher.example", voucher_fp, voucher_pem, status="trusted")

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=30)

    # The signature covers "other-target", but the vouch claims "real-target".
    mismatched_sig = _sign_vouch(
        voucher_key, voucher_fp, "other-target", issued_at, expires_at
    )
    forged = Vouch(
        voucher_fingerprint=voucher_fp,
        target_fingerprint="real-target",
        issued_at=issued_at,
        expires_at=expires_at,
        signature=mismatched_sig,
    )

    outcome = accept_vouch(forged, voucher_pem)
    assert isinstance(outcome, Err)
    assert "signature" in outcome.reason
|
|
|
|
|
|
def test_accept_vouch_rejects_voucher_not_trusted(fresh_db, fed_dir):
    """A vouch from a registered peer with status "unknown" is refused."""
    voucher_key, voucher_pem, voucher_fp = _make_peer()
    # The voucher is known to us, but registered as "unknown", not "trusted".
    register_peer("voucher.example", voucher_fp, voucher_pem, status="unknown")

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=30)
    vouch = Vouch(
        voucher_fingerprint=voucher_fp,
        target_fingerprint="target",
        issued_at=issued_at,
        expires_at=expires_at,
        signature=_sign_vouch(
            voucher_key, voucher_fp, "target", issued_at, expires_at
        ),
    )

    outcome = accept_vouch(vouch, voucher_pem)
    assert isinstance(outcome, Err)
    assert "not trusted" in outcome.reason
|
|
|
|
|
|
def test_accept_vouch_ok_for_trusted_voucher(fresh_db, fed_dir):
    """Happy path: an unexpired, correctly signed vouch from a trusted peer."""
    voucher_key, voucher_pem, voucher_fp = _make_peer()
    register_peer("voucher.example", voucher_fp, voucher_pem, status="trusted")

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=30)
    vouch = Vouch(
        voucher_fingerprint=voucher_fp,
        target_fingerprint="target",
        issued_at=issued_at,
        expires_at=expires_at,
        signature=_sign_vouch(
            voucher_key, voucher_fp, "target", issued_at, expires_at
        ),
    )

    outcome = accept_vouch(vouch, voucher_pem)
    assert isinstance(outcome, Ok)
|
|
|
|
|
|
# ---------- is_vouched threshold ----------------------------------------
|
|
|
|
def test_is_vouched_needs_distinct_vouchers(fresh_db, fed_dir):
    """Repeated vouches from one peer count once toward the threshold."""
    voucher_key, voucher_pem, voucher_fp = _make_peer()
    register_peer("voucher.example", voucher_fp, voucher_pem, status="trusted")

    first_issued = datetime.now(timezone.utc)
    expires_at = first_issued + timedelta(days=30)

    # The second vouch is one second newer but has the same voucher and
    # target; it is expected to replace the first, not add a second voucher.
    for issued_at in (first_issued, first_issued + timedelta(seconds=1)):
        vouch = Vouch(
            voucher_fingerprint=voucher_fp,
            target_fingerprint="target",
            issued_at=issued_at,
            expires_at=expires_at,
            signature=_sign_vouch(
                voucher_key, voucher_fp, "target", issued_at, expires_at
            ),
        )
        assert isinstance(accept_vouch(vouch, voucher_pem), Ok)

    assert is_vouched("target", min_vouchers=2) is False
    # A single distinct voucher still satisfies a threshold of 1.
    assert is_vouched("target", min_vouchers=1) is True
|
|
|
|
|
|
def test_is_vouched_two_distinct_clear_threshold(fresh_db, fed_dir):
    """Two different trusted vouchers satisfy a threshold of 2, but not 3."""
    peer_a = _make_peer()
    peer_b = _make_peer()
    register_peer("a.example", peer_a[2], peer_a[1], status="trusted")
    register_peer("b.example", peer_b[2], peer_b[1], status="trusted")

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=30)

    # Each trusted peer issues one vouch for the same target.
    for key, pem, fingerprint in (peer_a, peer_b):
        vouch = Vouch(
            voucher_fingerprint=fingerprint,
            target_fingerprint="target",
            issued_at=issued_at,
            expires_at=expires_at,
            signature=_sign_vouch(
                key, fingerprint, "target", issued_at, expires_at
            ),
        )
        assert isinstance(accept_vouch(vouch, pem), Ok)

    assert is_vouched("target", min_vouchers=2) is True
    assert is_vouched("target", min_vouchers=3) is False
|
|
|
|
|
|
# ---------- quorum on signal_hash ---------------------------------------
|
|
|
|
def test_is_quorum_met_counts_distinct_vouched_peers_only(fresh_db, fed_dir):
    """Quorum counts each trusted reporter once and skips the unknown peer."""
    # Two trusted peers and one "unknown" peer all report the same hash.
    identities = [_make_peer() for _ in range(3)]
    roster = (
        ("a.example", "trusted"),
        ("b.example", "trusted"),
        ("c.example", "unknown"),  # not eligible
    )
    for (_, pem, fingerprint), (host, status) in zip(identities, roster):
        register_peer(host, fingerprint, pem, status=status)

    fp_a, fp_b, fp_c = (fingerprint for _, _, fingerprint in identities)

    # fp_a reports twice; it must still count as a single distinct peer.
    for reporter in (fp_a, fp_b, fp_c, fp_a):
        db.record_signal({
            "peer_fingerprint": reporter,
            "signal_type": "ioc",
            "signal_id": "1.2.3.4",
            "signal_hash": "h-aaa",
            "received_at": datetime.now(timezone.utc).isoformat(),
            "raw_json": "{}",
        })

    assert is_quorum_met("h-aaa", k=2) is True
    # Only two eligible, distinct reporters exist, so k=3 cannot be met.
    assert is_quorum_met("h-aaa", k=3) is False
|
|
|
|
|
|
# ---------- quorum config persistence -----------------------------------
|
|
|
|
def test_quorum_config_defaults_and_persistence(fresh_db, fed_dir):
    """Defaults are 2/2, and values written with the setter are read back."""
    defaults = quorum_config()
    assert (defaults.trust_min_vouchers, defaults.signal_quorum_k) == (2, 2)

    set_quorum_config(QuorumConfig(trust_min_vouchers=3, signal_quorum_k=4))

    reloaded = quorum_config()
    assert (reloaded.trust_min_vouchers, reloaded.signal_quorum_k) == (3, 4)
|
|
|
|
|
|
# ---------- import gate enforces listening eligibility ------------------
|
|
|
|
def _signed_feed_from_peer(peer_priv, peer_fp, vouches=None):
    """Build a feed dict claiming ``peer_fp`` as origin, signed by ``peer_priv``.

    The feed contains no cases, one fixed IP IOC, and the given ``vouches``
    (an empty list when none are supplied). The signature is computed over
    ``canonical_json`` of the feed before the ``"signature"`` key is added,
    and stored as base64 text.
    """
    generated_at = datetime.now(timezone.utc).isoformat()
    sample_ioc = {
        "value": "9.9.9.9",
        "type": "ip",
        "severity": "high",
        "first_seen": datetime.now(timezone.utc).isoformat(),
        "digest_sha256": "abc123",
    }
    feed = {
        "version": federation.FEED_VERSION,
        "fingerprint": peer_fp,
        "generated_at": generated_at,
        "window_hours": 24,
        "cases": [],
        "iocs": [sample_ioc],
        "vouches": vouches if vouches else [],
    }

    # Sign first, then attach the signature, so it is not part of what is signed.
    raw_signature = peer_priv.sign(canonical_json(feed))
    feed["signature"] = base64.b64encode(raw_signature).decode("ascii")
    return feed
|
|
|
|
|
|
def test_import_feed_rejects_unknown_peer(fresh_db, fed_dir):
    """A correctly signed feed from a never-registered peer is refused."""
    stranger_key, stranger_pem, stranger_fp = _make_peer()

    outcome = import_signed_feed(
        _signed_feed_from_peer(stranger_key, stranger_fp),
        stranger_pem,
    )

    assert isinstance(outcome, Err)
    assert "not trusted" in outcome.reason
|
|
|
|
|
|
def test_import_feed_accepts_directly_trusted_peer(fresh_db, fed_dir):
    """A feed from a peer registered as "trusted" is imported."""
    trusted_key, trusted_pem, trusted_fp = _make_peer()
    register_peer("peer.example", trusted_fp, trusted_pem, status="trusted")

    outcome = import_signed_feed(
        _signed_feed_from_peer(trusted_key, trusted_fp),
        trusted_pem,
    )

    # On failure, show the rejection reason in the assertion message.
    assert isinstance(outcome, Ok), getattr(outcome, "reason", "")
|
|
|
|
|
|
def test_import_feed_accepts_vouched_peer(fresh_db, fed_dir):
    """An unregistered peer vouched for by two trusted peers may send feeds."""
    sponsor_a = _make_peer()
    sponsor_b = _make_peer()
    newcomer_key, newcomer_pem, newcomer_fp = _make_peer()
    register_peer("a.example", sponsor_a[2], sponsor_a[1], status="trusted")
    register_peer("b.example", sponsor_b[2], sponsor_b[1], status="trusted")

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=30)

    # Both trusted sponsors vouch for the newcomer, who is never registered.
    for key, pem, fingerprint in (sponsor_a, sponsor_b):
        vouch = Vouch(
            voucher_fingerprint=fingerprint,
            target_fingerprint=newcomer_fp,
            issued_at=issued_at,
            expires_at=expires_at,
            signature=_sign_vouch(
                key, fingerprint, newcomer_fp, issued_at, expires_at
            ),
        )
        assert isinstance(accept_vouch(vouch, pem), Ok)

    assert peer_is_listening_eligible(newcomer_fp) is True

    outcome = import_signed_feed(
        _signed_feed_from_peer(newcomer_key, newcomer_fp),
        newcomer_pem,
    )
    assert isinstance(outcome, Ok), getattr(outcome, "reason", "")
|
|
|
|
|
|
def test_import_feed_propagates_vouches_in_payload(fresh_db, fed_dir):
    """Vouches embedded in a trusted peer's feed end up in our local store.

    The feed carries a vouch issued by the sending peer; after a successful
    import, that vouch must be retrievable for its target.
    """
    sender_key, sender_pem, sender_fp = _make_peer()
    register_peer("peer.example", sender_fp, sender_pem, status="trusted")

    vouched_fp = "ff" * 16
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=30)
    embedded = Vouch(
        voucher_fingerprint=sender_fp,
        target_fingerprint=vouched_fp,
        issued_at=issued_at,
        expires_at=expires_at,
        signature=_sign_vouch(
            sender_key, sender_fp, vouched_fp, issued_at, expires_at
        ),
    )

    feed = _signed_feed_from_peer(
        sender_key,
        sender_fp,
        vouches=[embedded.model_dump(mode="json")],
    )
    outcome = import_signed_feed(feed, sender_pem)
    assert isinstance(outcome, Ok), getattr(outcome, "reason", "")

    # The sender's vouch should now be listed among those for the target.
    voucher_fps = [v.voucher_fingerprint for v in federation.vouches_for(vouched_fp)]
    assert sender_fp in voucher_fps
|