"""Federation explore view — public transparency payload tests.

Sibling to `test_network_view.py`; focused on the explore-only shape:
shape contract, signature round-trip, no-leak invariants, transitive walk,
inbound vouches filter, and the corroboration counter.
"""
from __future__ import annotations

import base64
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from unittest.mock import patch

import pytest
from fastapi import FastAPI
from fastapi.templating import Jinja2Templates
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from starlette.middleware.sessions import SessionMiddleware

from psyc import db
from psyc.cockpit import federation_routes
from psyc.lines import federation, network_view, translog
from psyc.lines.network_view import build_explore_view


# ---------- fixtures ----------------------------------------------------


@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
    """Point ``psyc.db`` at a brand-new SQLite file under ``tmp_path``.

    Creates the schema from ``db._metadata``, swaps ``db._engine`` and
    ``db.DB_PATH`` for the duration of the test, and yields the database path.
    """
    test_db = tmp_path / "test.db"
    eng = create_engine(f"sqlite:///{test_db}", future=True)
    db._metadata.create_all(eng, checkfirst=True)
    monkeypatch.setattr(db, "_engine", eng)
    monkeypatch.setattr(db, "DB_PATH", test_db)
    yield test_db
    # Release pooled connections so the temp database file is not held open
    # after the test finishes.
    eng.dispose()


@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
    """Redirect the federation directory and key paths into ``tmp_path``.

    Yields the (not necessarily existing) federation directory path.
    """
    d = tmp_path / "federation"
    monkeypatch.setattr(federation, "FED_DIR", d)
    monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
    monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
    yield d


@pytest.fixture(autouse=True)
def reset_explore_caches(monkeypatch):
    """Prevent route-level cache bleed between tests."""
    monkeypatch.setattr(
        network_view, "_TRANSITIVE_CACHE", {"ts": 0.0, "view": None}
    )
    federation_routes._FEED_CACHE["payload"] = None
    federation_routes._PUBLIC_PEERS_CACHE["payload"] = None
    federation_routes._PUBLIC_NETWORK_CACHE["payload"] = None
    # The explore cache is reset only when the routes module defines it.
    if hasattr(federation_routes, "_EXPLORE_CACHE"):
        federation_routes._EXPLORE_CACHE["payload"] = None
    yield


def _make_peer_pubkey() -> tuple[str, str]:
    """Return (fingerprint, pubkey_pem) for a synthetic peer keypair."""
    import hashlib

    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import ed25519

    priv = ed25519.Ed25519PrivateKey.generate()
    pub = priv.public_key()
    pem = pub.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode("ascii")
    raw = pub.public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    # Fingerprint = hex of the first 16 bytes of SHA-256 over the raw key.
    fp = hashlib.sha256(raw).digest()[:16].hex()
    return fp, pem


def _silence_explore_fetch():
    """Patch ``network_view._fetch_peer_explore`` to always return ``None``."""
    return patch.object(network_view, "_fetch_peer_explore", return_value=None)


def _silence_network_fetch():
    """Patch ``network_view._fetch_peer_network`` to always return ``None``."""
    return patch.object(network_view, "_fetch_peer_network", return_value=None)


def _assert_signed_by_this_node(payload: Dict[str, Any]) -> None:
    """Assert that ``payload["signature"]`` verifies with this node's key.

    The base64 signature is checked against the canonical JSON of every
    field of ``payload`` except ``signature`` itself.
    """
    assert "signature" in payload
    sig = base64.b64decode(payload["signature"])
    unsigned = {k: v for k, v in payload.items() if k != "signature"}
    assert federation.verify_payload(
        federation.canonical_json(unsigned),
        sig,
        federation.public_key_pem(),
    ) is True


# ---------- schema ------------------------------------------------------


def test_explore_view_top_level_shape(fresh_db, fed_dir):
    """The payload and its ``node`` summary carry every expected key."""
    with _silence_explore_fetch(), _silence_network_fetch():
        payload = build_explore_view(node_domain="me.example")

    for key in (
        "version",
        "fingerprint",
        "generated_at",
        "node",
        "peers",
        "vouches",
        "vouches_out",
        "vouches_in",
        "transitive_peers",
        "corroboration_count_24h",
        "signature",
    ):
        assert key in payload, f"missing {key}"

    node = payload["node"]
    for key in (
        "fingerprint",
        "domain",
        "generated_at",
        "transparency_log_head_hash",
        "translog_entry_count",
        "peer_count",
        "vouches_out_count",
        "vouches_in_count",
        "corroboration_count_24h",
        "signals_count_24h",
    ):
        assert key in node, f"missing node.{key}"

    assert node["domain"] == "me.example"
    assert node["fingerprint"] == federation.node_fingerprint()


def test_explore_peer_carries_public_safe_stats(fresh_db, fed_dir):
    """A peer entry exposes aggregate counters but no sensitive breakdowns."""
    fp, pem = _make_peer_pubkey()
    federation.register_peer("peer.example", fp, pem, status="trusted")

    now_iso = datetime.now(timezone.utc).isoformat()
    for i in range(3):
        db.record_signal(dict(
            peer_fingerprint=fp,
            signal_type="ioc",
            signal_id=f"1.2.3.{i}",
            signal_hash=f"hash-{i}",
            received_at=now_iso,
            raw_json=json.dumps({"type": "ip", "value": f"1.2.3.{i}"}),
        ))

    with _silence_explore_fetch(), _silence_network_fetch():
        payload = build_explore_view()

    assert len(payload["peers"]) == 1
    p = payload["peers"][0]

    # Public-safe stats present.
    for key in (
        "signal_count_24h",
        "signal_count_total",
        "cases_24h",
        "iocs_24h",
        "quorum_contribution_24h",
        "last_seen",
    ):
        assert key in p
    assert p["signal_count_24h"] == 3
    assert p["iocs_24h"] == 3
    assert p["cases_24h"] == 0

    # Sensitive fields are not surfaced per-peer.
    assert "severity_breakdown" not in p
    assert "ioc_type_breakdown" not in p
    assert "recent_translog" not in p


# ---------- signature round-trip ---------------------------------------


def test_explore_view_signature_round_trip(fresh_db, fed_dir):
    """The payload's signature verifies against this node's public key."""
    fp, pem = _make_peer_pubkey()
    federation.register_peer("trusted.example", fp, pem, status="trusted")
    federation.issue_vouch(fp, ttl_days=30)

    with _silence_explore_fetch(), _silence_network_fetch():
        payload = build_explore_view()

    _assert_signed_by_this_node(payload)


# ---------- no-leak invariants -----------------------------------------


def test_explore_view_has_no_ioc_values_or_case_ids_or_raw_json(fresh_db, fed_dir):
    """Public payload must not expose IOC values, case_ids in raw form, or raw_json.

    This is the core transparency-vs-leakage contract: anyone can see who's
    talking to whom and how much, but never what they're saying.
    """
    fp, pem = _make_peer_pubkey()
    federation.register_peer("trusted.example", fp, pem, status="trusted")

    now_iso = datetime.now(timezone.utc).isoformat()
    db.record_signal(dict(
        peer_fingerprint=fp,
        signal_type="ioc",
        signal_id="evil-domain-do-not-leak.com",
        signal_hash="ioc-hash-leak",
        received_at=now_iso,
        raw_json=json.dumps(
            {"type": "domain", "value": "evil-domain-do-not-leak.com"}
        ),
    ))
    db.record_signal(dict(
        peer_fingerprint=fp,
        signal_type="case",
        signal_id="CASE-SECRET-42",
        signal_hash="case-hash-leak",
        received_at=now_iso,
        raw_json=json.dumps(
            {"severity": "critical", "case_id": "CASE-SECRET-42"}
        ),
    ))

    with _silence_explore_fetch(), _silence_network_fetch():
        payload = build_explore_view()

    # Search the whole serialized payload so nested fields are covered too.
    flat = json.dumps(payload, default=str)
    # IOC values.
    assert "evil-domain-do-not-leak.com" not in flat
    # Case ids (raw).
    assert "CASE-SECRET-42" not in flat
    # raw_json shape never serialized.
    assert "raw_json" not in flat
    # Sector-leaking breakdowns.
    assert "severity_breakdown" not in flat
    assert "ioc_type_breakdown" not in flat


# ---------- transitive peers --------------------------------------------


def test_explore_transitive_peers_populated_from_peer_responses(fresh_db, fed_dir):
    """Peers listed in a direct peer's explore response become transitive peers."""
    direct_fp, direct_pem = _make_peer_pubkey()
    federation.register_peer(
        "direct.example", direct_fp, direct_pem, status="trusted"
    )

    far_a, _ = _make_peer_pubkey()
    far_b, _ = _make_peer_pubkey()
    fake_payload: Dict[str, Any] = {
        "fingerprint": direct_fp,
        "peers": [
            {"fingerprint": far_a, "domain": "far-a.example"},
            {"fingerprint": far_b, "domain": "far-b.example"},
        ],
        "vouches": [],
    }

    with patch.object(
        network_view, "_fetch_peer_explore", return_value=fake_payload
    ):
        payload = build_explore_view()

    tps = payload["transitive_peers"]
    fps = {t["fingerprint"] for t in tps}
    assert far_a in fps
    assert far_b in fps
    via_fps = {t["via_peer_fingerprint"] for t in tps}
    assert via_fps == {direct_fp}


def test_explore_transitive_peers_falls_back_to_network_endpoint(fresh_db, fed_dir):
    """If a peer doesn't have /federation/explore/data (older node), fall back
    to /federation/network — the public-view shape is the same
    {fingerprint, peers}."""
    direct_fp, direct_pem = _make_peer_pubkey()
    federation.register_peer(
        "direct.example", direct_fp, direct_pem, status="trusted"
    )

    far_fp, _ = _make_peer_pubkey()
    fallback_payload: Dict[str, Any] = {
        "fingerprint": direct_fp,
        "peers": [{"fingerprint": far_fp, "domain": "far.example"}],
        "vouches": [],
    }

    with patch.object(network_view, "_fetch_peer_explore", return_value=None), \
         patch.object(network_view, "_fetch_peer_network", return_value=fallback_payload):
        payload = build_explore_view()

    assert any(t["fingerprint"] == far_fp for t in payload["transitive_peers"])


def test_explore_transitive_peers_dedupe_against_direct(fresh_db, fed_dir):
    """If a transitive fp is already a direct peer, don't duplicate it."""
    direct_fp, direct_pem = _make_peer_pubkey()
    federation.register_peer(
        "direct.example", direct_fp, direct_pem, status="trusted"
    )

    fake_payload = {
        "fingerprint": direct_fp,
        # Direct peer's own fp echoed back — must be deduped.
        "peers": [{"fingerprint": direct_fp, "domain": "direct.example"}],
        "vouches": [],
    }

    with patch.object(
        network_view, "_fetch_peer_explore", return_value=fake_payload
    ):
        payload = build_explore_view()

    assert payload["transitive_peers"] == []


# ---------- vouches_in --------------------------------------------------


def test_explore_vouches_in_filters_to_target_self_and_trusted_vouchers(fresh_db, fed_dir):
    """vouches_in includes ONLY entries naming us as target whose voucher we trust."""
    our_fp = federation.node_fingerprint()
    fp_trusted, pem_t = _make_peer_pubkey()
    fp_unknown, pem_u = _make_peer_pubkey()
    federation.register_peer("trusted.example", fp_trusted, pem_t, status="trusted")
    federation.register_peer("unknown.example", fp_unknown, pem_u, status="unknown")

    now = datetime.now(timezone.utc).isoformat()
    # Trusted peer vouches for us.
    db.upsert_vouch(dict(
        voucher_fingerprint=fp_trusted,
        target_fingerprint=our_fp,
        issued_at=now,
        expires_at=None,
        signature="trusted-sig",
    ))
    # Unknown peer also "vouches" for us — must NOT leak.
    db.upsert_vouch(dict(
        voucher_fingerprint=fp_unknown,
        target_fingerprint=our_fp,
        issued_at=now,
        expires_at=None,
        signature="rogue-sig",
    ))
    # Vouch naming someone else — must NOT appear in vouches_in.
    other_fp, _ = _make_peer_pubkey()
    db.upsert_vouch(dict(
        voucher_fingerprint=fp_trusted,
        target_fingerprint=other_fp,
        issued_at=now,
        expires_at=None,
        signature="other-sig",
    ))

    with _silence_explore_fetch(), _silence_network_fetch():
        payload = build_explore_view()

    vouchers = {v["voucher_fingerprint"] for v in payload["vouches_in"]}
    assert vouchers == {fp_trusted}
    # And the rogue signature is not anywhere in the payload.
    assert "rogue-sig" not in json.dumps(payload, default=str)


# ---------- corroboration counter --------------------------------------


def test_explore_corroboration_count_matches_distinct_shared_hashes(fresh_db, fed_dir):
    """Only signal hashes reported by both peers count as corroborated."""
    fp_a, pem_a = _make_peer_pubkey()
    fp_b, pem_b = _make_peer_pubkey()
    federation.register_peer("a.example", fp_a, pem_a, status="trusted")
    federation.register_peer("b.example", fp_b, pem_b, status="trusted")

    now_iso = datetime.now(timezone.utc).isoformat()
    # Two shared hashes between A and B.
    for h in ("shared-1", "shared-2"):
        for fp in (fp_a, fp_b):
            db.record_signal(dict(
                peer_fingerprint=fp,
                signal_type="ioc",
                signal_id="x",
                signal_hash=h,
                received_at=now_iso,
                raw_json="{}",
            ))
    # One solo hash — must NOT count.
    db.record_signal(dict(
        peer_fingerprint=fp_a,
        signal_type="ioc",
        signal_id="solo",
        signal_hash="solo-hash",
        received_at=now_iso,
        raw_json="{}",
    ))

    with _silence_explore_fetch(), _silence_network_fetch():
        payload = build_explore_view()

    assert payload["corroboration_count_24h"] == 2
    assert payload["node"]["corroboration_count_24h"] == 2


# ---------- transparency log headline ----------------------------------


def test_explore_node_translog_headline_reflects_chain(fresh_db, fed_dir):
    """``node`` reports the translog entry count and a 64-char head hash."""
    translog.append("vouch", {"foo": "bar"})
    translog.append("signal", {"x": 1})

    with _silence_explore_fetch(), _silence_network_fetch():
        payload = build_explore_view()

    node = payload["node"]
    assert node["translog_entry_count"] == 2
    assert isinstance(node["transparency_log_head_hash"], str)
    assert len(node["transparency_log_head_hash"]) == 64  # hex sha256


# ---------- HTTP routes -------------------------------------------------


def _mk_app() -> FastAPI:
    """Build a FastAPI app with session middleware and the federation routes."""
    app = FastAPI()
    app.add_middleware(SessionMiddleware, secret_key="test-secret")

    from pathlib import Path as _Path

    # We need real templates for /federation/explore HTML response.
    templates_dir = (
        _Path(__file__).resolve().parent.parent
        / "src" / "psyc" / "cockpit" / "templates"
    )
    templates = Jinja2Templates(directory=str(templates_dir))
    federation_routes.register(app, templates)
    return app


def test_federation_explore_endpoint_returns_html(fresh_db, fed_dir):
    """GET /federation/explore answers 200 with the explorer HTML page."""
    # The request runs while the peer-fetch patches are active.
    with _silence_explore_fetch(), _silence_network_fetch():
        client = TestClient(_mk_app())
        r = client.get("/federation/explore")

    assert r.status_code == 200
    assert "text/html" in r.headers.get("content-type", "")
    # Banner + page title are present.
    body = r.text
    assert "Federation Explorer" in body


def test_federation_explore_data_returns_signed_json(fresh_db, fed_dir):
    """GET /federation/explore/data returns JSON signed by this node."""
    fp, pem = _make_peer_pubkey()
    federation.register_peer("trusted.example", fp, pem, status="trusted")

    # The request runs while the peer-fetch patches are active.
    with _silence_explore_fetch(), _silence_network_fetch():
        client = TestClient(_mk_app())
        r = client.get("/federation/explore/data")

    assert r.status_code == 200
    data = r.json()
    assert "node" in data
    _assert_signed_by_this_node(data)


def test_federation_explore_data_has_cors_header(fresh_db, fed_dir):
    """Other psyc nodes' explore pages need to fetch this from the browser."""
    with _silence_explore_fetch(), _silence_network_fetch():
        client = TestClient(_mk_app())
        r = client.get("/federation/explore/data")

    assert r.status_code == 200
    assert r.headers.get("access-control-allow-origin") == "*"


def test_federation_info_has_explore_and_cors(fresh_db, fed_dir):
    """/federation/info advertises the explore path and allows any origin."""
    client = TestClient(_mk_app())
    r = client.get("/federation/info")

    assert r.status_code == 200
    data = r.json()
    assert data.get("explore") == "/federation/explore"
    assert r.headers.get("access-control-allow-origin") == "*"


def test_existing_public_endpoints_have_cors_header(fresh_db, fed_dir):
    """All public endpoints must be cross-origin fetchable for the explorer."""
    client = TestClient(_mk_app())
    for path in (
        "/federation/key",
        "/federation/feed",
        "/federation/vouches",
        "/federation/log",
        "/federation/log/verify",
        "/federation/peers/public",
        "/federation/network",
    ):
        r = client.get(path)
        # 409 is accepted alongside 200; the CORS header is required either way.
        assert r.status_code in (200, 409), f"{path} status {r.status_code}"
        assert r.headers.get("access-control-allow-origin") == "*", f"{path} missing CORS"