From 9ab3271bc8ee1043e2b2080c545f9c74eb031176 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sun, 7 Jun 2026 01:17:11 +0200 Subject: [PATCH] stage-exp-f explore: tests --- tests/test_explore_view.py | 427 +++++++++++++++++++++++++++++++++++++ 1 file changed, 427 insertions(+) create mode 100644 tests/test_explore_view.py diff --git a/tests/test_explore_view.py b/tests/test_explore_view.py new file mode 100644 index 0000000..4424586 --- /dev/null +++ b/tests/test_explore_view.py @@ -0,0 +1,427 @@ +"""Federation explore view — public transparency payload tests. + +Sibling to `test_network_view.py`; focused on the explore-only shape: +shape contract, signature round-trip, no-leak invariants, transitive walk, +inbound vouches filter, and the corroboration counter. +""" + +from __future__ import annotations + +import base64 +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional +from unittest.mock import patch + +import pytest +from fastapi import FastAPI +from fastapi.templating import Jinja2Templates +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from starlette.middleware.sessions import SessionMiddleware + +from psyc import db +from psyc.cockpit import federation_routes +from psyc.lines import federation, network_view, translog +from psyc.lines.network_view import build_explore_view + + +# ---------- fixtures ---------------------------------------------------- + +@pytest.fixture +def fresh_db(tmp_path, monkeypatch): + test_db = tmp_path / "test.db" + eng = create_engine(f"sqlite:///{test_db}", future=True) + db._metadata.create_all(eng, checkfirst=True) + monkeypatch.setattr(db, "_engine", eng) + monkeypatch.setattr(db, "DB_PATH", test_db) + yield test_db + + +@pytest.fixture +def fed_dir(tmp_path, monkeypatch): + d = tmp_path / "federation" + monkeypatch.setattr(federation, "FED_DIR", d) + monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key") + monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub") + yield d + + +@pytest.fixture(autouse=True) +def reset_explore_caches(monkeypatch): + """Prevent route-level cache bleed between tests.""" + monkeypatch.setattr(network_view, "_TRANSITIVE_CACHE", {"ts": 0.0, "view": None}) + federation_routes._FEED_CACHE["payload"] = None + federation_routes._PUBLIC_PEERS_CACHE["payload"] = None + federation_routes._PUBLIC_NETWORK_CACHE["payload"] = None + if hasattr(federation_routes, "_EXPLORE_CACHE"): + federation_routes._EXPLORE_CACHE["payload"] = None + yield + + +def _make_peer_pubkey() -> tuple[str, str]: + """Return (fingerprint, pubkey_pem) for a synthetic peer keypair.""" + import hashlib + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ed25519 + priv = ed25519.Ed25519PrivateKey.generate() + pub = priv.public_key() + pem = pub.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("ascii") + raw = pub.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + fp = hashlib.sha256(raw).digest()[:16].hex() + return fp, pem + + +def _silence_explore_fetch(): + return patch.object(network_view, "_fetch_peer_explore", return_value=None) + + +def _silence_network_fetch(): + return patch.object(network_view, "_fetch_peer_network", return_value=None) + + +# ---------- schema ------------------------------------------------------ + +def test_explore_view_top_level_shape(fresh_db, fed_dir): + with _silence_explore_fetch(), _silence_network_fetch(): + payload = build_explore_view(node_domain="me.example") + for key in ( + "version", "fingerprint", "generated_at", + "node", "peers", "vouches", "vouches_out", "vouches_in", + "transitive_peers", "corroboration_count_24h", "signature", + ): + assert key in payload, f"missing {key}" + node = payload["node"] + for key in ( + "fingerprint", "domain", "generated_at", + "transparency_log_head_hash", "translog_entry_count", + "peer_count", "vouches_out_count", "vouches_in_count", + "corroboration_count_24h", "signals_count_24h", + ): + assert key in node, f"missing node.{key}" + assert node["domain"] == "me.example" + assert node["fingerprint"] == federation.node_fingerprint() + + +def test_explore_peer_carries_public_safe_stats(fresh_db, fed_dir): + fp, pem = _make_peer_pubkey() + federation.register_peer("peer.example", fp, pem, status="trusted") + now_iso = datetime.now(timezone.utc).isoformat() + for i in range(3): + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id=f"1.2.3.{i}", + signal_hash=f"hash-{i}", + received_at=now_iso, + raw_json=json.dumps({"type": "ip", "value": f"1.2.3.{i}"}), + )) + with _silence_explore_fetch(), _silence_network_fetch(): + payload = build_explore_view() + assert len(payload["peers"]) == 1 + p = payload["peers"][0] + # Public-safe stats present. + for key in ( + "signal_count_24h", "signal_count_total", + "cases_24h", "iocs_24h", + "quorum_contribution_24h", "last_seen", + ): + assert key in p + assert p["signal_count_24h"] == 3 + assert p["iocs_24h"] == 3 + assert p["cases_24h"] == 0 + # Sensitive fields are not surfaced per-peer. + assert "severity_breakdown" not in p + assert "ioc_type_breakdown" not in p + assert "recent_translog" not in p + + +# ---------- signature round-trip --------------------------------------- + +def test_explore_view_signature_round_trip(fresh_db, fed_dir): + fp, pem = _make_peer_pubkey() + federation.register_peer("trusted.example", fp, pem, status="trusted") + federation.issue_vouch(fp, ttl_days=30) + with _silence_explore_fetch(), _silence_network_fetch(): + payload = build_explore_view() + assert "signature" in payload + sig = base64.b64decode(payload["signature"]) + unsigned = {k: v for k, v in payload.items() if k != "signature"} + assert federation.verify_payload( + federation.canonical_json(unsigned), + sig, + federation.public_key_pem(), + ) is True + + +# ---------- no-leak invariants ----------------------------------------- + +def test_explore_view_has_no_ioc_values_or_case_ids_or_raw_json(fresh_db, fed_dir): + """Public payload must not expose IOC values, case_ids in raw form, or raw_json. + + This is the core transparency-vs-leakage contract: anyone can see who's + talking to whom and how much, but never what they're saying. + """ + fp, pem = _make_peer_pubkey() + federation.register_peer("trusted.example", fp, pem, status="trusted") + now_iso = datetime.now(timezone.utc).isoformat() + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id="evil-domain-do-not-leak.com", + signal_hash="ioc-hash-leak", + received_at=now_iso, + raw_json=json.dumps({"type": "domain", "value": "evil-domain-do-not-leak.com"}), + )) + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="case", + signal_id="CASE-SECRET-42", + signal_hash="case-hash-leak", + received_at=now_iso, + raw_json=json.dumps({"severity": "critical", "case_id": "CASE-SECRET-42"}), + )) + with _silence_explore_fetch(), _silence_network_fetch(): + payload = build_explore_view() + flat = json.dumps(payload, default=str) + # IOC values. + assert "evil-domain-do-not-leak.com" not in flat + # Case ids (raw). + assert "CASE-SECRET-42" not in flat + # raw_json shape never serialized. + assert "raw_json" not in flat + # Sector-leaking breakdowns. + assert "severity_breakdown" not in flat + assert "ioc_type_breakdown" not in flat + + +# ---------- transitive peers -------------------------------------------- + +def test_explore_transitive_peers_populated_from_peer_responses(fresh_db, fed_dir): + direct_fp, direct_pem = _make_peer_pubkey() + federation.register_peer("direct.example", direct_fp, direct_pem, status="trusted") + far_a, _ = _make_peer_pubkey() + far_b, _ = _make_peer_pubkey() + fake_payload: Dict[str, Any] = { + "fingerprint": direct_fp, + "peers": [ + {"fingerprint": far_a, "domain": "far-a.example"}, + {"fingerprint": far_b, "domain": "far-b.example"}, + ], + "vouches": [], + } + with patch.object(network_view, "_fetch_peer_explore", return_value=fake_payload): + payload = build_explore_view() + tps = payload["transitive_peers"] + fps = {t["fingerprint"] for t in tps} + assert far_a in fps + assert far_b in fps + via_fps = {t["via_peer_fingerprint"] for t in tps} + assert via_fps == {direct_fp} + + +def test_explore_transitive_peers_falls_back_to_network_endpoint(fresh_db, fed_dir): + """If a peer doesn't have /federation/explore/data (older node), fall back + to /federation/network — the public-view shape is the same {fingerprint, peers}.""" + direct_fp, direct_pem = _make_peer_pubkey() + federation.register_peer("direct.example", direct_fp, direct_pem, status="trusted") + far_fp, _ = _make_peer_pubkey() + fallback_payload: Dict[str, Any] = { + "fingerprint": direct_fp, + "peers": [{"fingerprint": far_fp, "domain": "far.example"}], + "vouches": [], + } + with patch.object(network_view, "_fetch_peer_explore", return_value=None), \ + patch.object(network_view, "_fetch_peer_network", return_value=fallback_payload): + payload = build_explore_view() + assert any(t["fingerprint"] == far_fp for t in payload["transitive_peers"]) + + +def test_explore_transitive_peers_dedupe_against_direct(fresh_db, fed_dir): + """If a transitive fp is already a direct peer, don't duplicate it.""" + direct_fp, direct_pem = _make_peer_pubkey() + federation.register_peer("direct.example", direct_fp, direct_pem, status="trusted") + fake_payload = { + "fingerprint": direct_fp, + # Direct peer's own fp echoed back — must be deduped. + "peers": [{"fingerprint": direct_fp, "domain": "direct.example"}], + "vouches": [], + } + with patch.object(network_view, "_fetch_peer_explore", return_value=fake_payload): + payload = build_explore_view() + assert payload["transitive_peers"] == [] + + +# ---------- vouches_in -------------------------------------------------- + +def test_explore_vouches_in_filters_to_target_self_and_trusted_vouchers(fresh_db, fed_dir): + """vouches_in includes ONLY entries naming us as target whose voucher we trust.""" + our_fp = federation.node_fingerprint() + fp_trusted, pem_t = _make_peer_pubkey() + fp_unknown, pem_u = _make_peer_pubkey() + federation.register_peer("trusted.example", fp_trusted, pem_t, status="trusted") + federation.register_peer("unknown.example", fp_unknown, pem_u, status="unknown") + now = datetime.now(timezone.utc).isoformat() + # Trusted peer vouches for us. + db.upsert_vouch(dict( + voucher_fingerprint=fp_trusted, + target_fingerprint=our_fp, + issued_at=now, + expires_at=None, + signature="trusted-sig", + )) + # Unknown peer also "vouches" for us — must NOT leak. + db.upsert_vouch(dict( + voucher_fingerprint=fp_unknown, + target_fingerprint=our_fp, + issued_at=now, + expires_at=None, + signature="rogue-sig", + )) + # Vouch naming someone else — must NOT appear in vouches_in. + other_fp, _ = _make_peer_pubkey() + db.upsert_vouch(dict( + voucher_fingerprint=fp_trusted, + target_fingerprint=other_fp, + issued_at=now, + expires_at=None, + signature="other-sig", + )) + with _silence_explore_fetch(), _silence_network_fetch(): + payload = build_explore_view() + vouchers = {v["voucher_fingerprint"] for v in payload["vouches_in"]} + assert vouchers == {fp_trusted} + # And the rogue signature is not anywhere in the payload. + assert "rogue-sig" not in json.dumps(payload, default=str) + + +# ---------- corroboration counter -------------------------------------- + +def test_explore_corroboration_count_matches_distinct_shared_hashes(fresh_db, fed_dir): + fp_a, pem_a = _make_peer_pubkey() + fp_b, pem_b = _make_peer_pubkey() + federation.register_peer("a.example", fp_a, pem_a, status="trusted") + federation.register_peer("b.example", fp_b, pem_b, status="trusted") + now_iso = datetime.now(timezone.utc).isoformat() + # Two shared hashes between A and B. + for h in ("shared-1", "shared-2"): + for fp in (fp_a, fp_b): + db.record_signal(dict( + peer_fingerprint=fp, + signal_type="ioc", + signal_id="x", + signal_hash=h, + received_at=now_iso, + raw_json="{}", + )) + # One solo hash — must NOT count. + db.record_signal(dict( + peer_fingerprint=fp_a, + signal_type="ioc", + signal_id="solo", + signal_hash="solo-hash", + received_at=now_iso, + raw_json="{}", + )) + with _silence_explore_fetch(), _silence_network_fetch(): + payload = build_explore_view() + assert payload["corroboration_count_24h"] == 2 + assert payload["node"]["corroboration_count_24h"] == 2 + + +# ---------- transparency log headline ---------------------------------- + +def test_explore_node_translog_headline_reflects_chain(fresh_db, fed_dir): + translog.append("vouch", {"foo": "bar"}) + translog.append("signal", {"x": 1}) + with _silence_explore_fetch(), _silence_network_fetch(): + payload = build_explore_view() + node = payload["node"] + assert node["translog_entry_count"] == 2 + assert isinstance(node["transparency_log_head_hash"], str) + assert len(node["transparency_log_head_hash"]) == 64 # hex sha256 + + +# ---------- HTTP routes ------------------------------------------------- + +def _mk_app() -> FastAPI: + app = FastAPI() + app.add_middleware(SessionMiddleware, secret_key="test-secret") + import tempfile + from pathlib import Path as _Path + # We need real templates for /federation/explore HTML response. + here = _Path(__file__).resolve().parent.parent / "src" / "psyc" / "cockpit" / "templates" + templates = Jinja2Templates(directory=str(here)) + federation_routes.register(app, templates) + return app + + +def test_federation_explore_endpoint_returns_html(fresh_db, fed_dir): + with _silence_explore_fetch(), _silence_network_fetch(): + client = TestClient(_mk_app()) + r = client.get("/federation/explore") + assert r.status_code == 200 + assert "text/html" in r.headers.get("content-type", "") + # Banner + page title are present. + body = r.text + assert "Federation Explorer" in body + + +def test_federation_explore_data_returns_signed_json(fresh_db, fed_dir): + fp, pem = _make_peer_pubkey() + federation.register_peer("trusted.example", fp, pem, status="trusted") + with _silence_explore_fetch(), _silence_network_fetch(): + client = TestClient(_mk_app()) + r = client.get("/federation/explore/data") + assert r.status_code == 200 + data = r.json() + assert "signature" in data + assert "node" in data + sig = base64.b64decode(data["signature"]) + unsigned = {k: v for k, v in data.items() if k != "signature"} + assert federation.verify_payload( + federation.canonical_json(unsigned), + sig, + federation.public_key_pem(), + ) is True + + +def test_federation_explore_data_has_cors_header(fresh_db, fed_dir): + """Other psyc nodes' explore pages need to fetch this from the browser.""" + with _silence_explore_fetch(), _silence_network_fetch(): + client = TestClient(_mk_app()) + r = client.get("/federation/explore/data") + assert r.status_code == 200 + assert r.headers.get("access-control-allow-origin") == "*" + + +def test_federation_info_has_explore_and_cors(fresh_db, fed_dir): + client = TestClient(_mk_app()) + r = client.get("/federation/info") + assert r.status_code == 200 + data = r.json() + assert data.get("explore") == "/federation/explore" + assert r.headers.get("access-control-allow-origin") == "*" + + +def test_existing_public_endpoints_have_cors_header(fresh_db, fed_dir): + """All public endpoints must be cross-origin fetchable for the explorer.""" + client = TestClient(_mk_app()) + for path in ( + "/federation/key", + "/federation/feed", + "/federation/vouches", + "/federation/log", + "/federation/log/verify", + "/federation/peers/public", + "/federation/network", + ): + r = client.get(path) + assert r.status_code in (200, 409), f"{path} status {r.status_code}" + assert r.headers.get("access-control-allow-origin") == "*", f"{path} missing CORS"