diff --git a/src/psyc/lines/federation.py b/src/psyc/lines/federation.py index 5439068..059aa9d 100644 --- a/src/psyc/lines/federation.py +++ b/src/psyc/lines/federation.py @@ -260,6 +260,9 @@ def build_signed_feed(window_hours: int = 24) -> Dict[str, Any]: "window_hours": window_hours, "cases": _build_case_records(window_hours), "iocs": _build_ioc_records(window_hours), + # Vouches we've issued ride along with the feed so peers can learn + # who we trust and accumulate quorum on shared targets. + "vouches": [v.model_dump() for v in our_vouches()], } sig = sign_payload(canonical_json(payload)) payload["signature"] = base64.b64encode(sig).decode("ascii") @@ -307,10 +310,16 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result except Exception as exc: return Err(f"bad pubkey: {exc}") + # Listening gate: only accept signals from peers we explicitly trust or + # that quorum of trusted peers vouches for. Unknown peers don't land here. + if not peer_is_listening_eligible(peer_fp): + return Err(f"peer not trusted: {peer_fp}") + now = datetime.now(timezone.utc).isoformat() signal_ids: List[Tuple[str, str]] = [] cases = feed.get("cases") or [] iocs = feed.get("iocs") or [] + feed_vouches = feed.get("vouches") or [] for c in cases: case_id = c.get("case_id") or "" @@ -324,6 +333,15 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result raw_json=json.dumps(c, sort_keys=True), )) signal_ids.append(("case", digest)) + try: + translog.append("signal", { + "peer_fingerprint": peer_fp, + "signal_type": "case", + "signal_id": case_id, + "signal_hash": digest, + }) + except Exception as exc: # transparency log is best-effort, never block ingest + _log.warning("federation.translog.append.fail", error=str(exc)) for i in iocs: value = i.get("value") or "" @@ -337,6 +355,36 @@ def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result raw_json=json.dumps(i, sort_keys=True), )) signal_ids.append(("ioc", digest)) + try: + translog.append("signal", { + "peer_fingerprint": peer_fp, + "signal_type": "ioc", + "signal_id": value, + "signal_hash": digest, + }) + except Exception as exc: + _log.warning("federation.translog.append.fail", error=str(exc)) + + # Vouch propagation — peer asserts who they trust. We only accept vouches + # whose declared voucher fingerprint matches the peer we just authenticated + # (so a peer can't forge vouches "from" someone else through us). + for v_raw in feed_vouches: + if not isinstance(v_raw, dict): + continue + try: + vouch = Vouch.model_validate(v_raw) + except Exception as exc: + _log.warning("federation.vouch.malformed", error=str(exc)) + continue + if vouch.voucher_fingerprint != peer_fp: + _log.warning( + "federation.vouch.voucher_mismatch", + claimed=vouch.voucher_fingerprint, actual=peer_fp, + ) + continue + accepted = accept_vouch(vouch, expected_pubkey_pem) + if isinstance(accepted, Err): + _log.warning("federation.vouch.rejected", reason=accepted.reason) _log.info("federation.import.ok", peer=peer_fp, cases=len(cases), iocs=len(iocs)) return Ok(ImportSummary( diff --git a/tests/test_federation.py b/tests/test_federation.py index 3a0c599..b04fddb 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -159,6 +159,8 @@ def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir): new_sig = peer_priv.sign(canonical_json(unsigned)) feed["signature"] = base64.b64encode(new_sig).decode("ascii") + # Stage 4 listening gate: peer must be trusted to land signals. + federation.register_peer("peer.example", peer_fp, peer_pub_pem, status="trusted") result = import_signed_feed(feed, peer_pub_pem) assert isinstance(result, Ok), getattr(result, "reason", "") summary = result.value