stage-exp-f explore: tests

This commit is contained in:
m17hr1l
2026-06-07 01:17:11 +02:00
parent c2bd68e246
commit 9ab3271bc8

427
tests/test_explore_view.py Normal file
View File

@@ -0,0 +1,427 @@
"""Federation explore view — public transparency payload tests.
Sibling to `test_network_view.py`; focused on the explore-only shape:
shape contract, signature round-trip, no-leak invariants, transitive walk,
inbound vouches filter, and the corroboration counter.
"""
from __future__ import annotations
import base64
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from unittest.mock import patch
import pytest
from fastapi import FastAPI
from fastapi.templating import Jinja2Templates
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from starlette.middleware.sessions import SessionMiddleware
from psyc import db
from psyc.cockpit import federation_routes
from psyc.lines import federation, network_view, translog
from psyc.lines.network_view import build_explore_view
# ---------- fixtures ----------------------------------------------------
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
d = tmp_path / "federation"
monkeypatch.setattr(federation, "FED_DIR", d)
monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
yield d
@pytest.fixture(autouse=True)
def reset_explore_caches(monkeypatch):
"""Prevent route-level cache bleed between tests."""
monkeypatch.setattr(network_view, "_TRANSITIVE_CACHE", {"ts": 0.0, "view": None})
federation_routes._FEED_CACHE["payload"] = None
federation_routes._PUBLIC_PEERS_CACHE["payload"] = None
federation_routes._PUBLIC_NETWORK_CACHE["payload"] = None
if hasattr(federation_routes, "_EXPLORE_CACHE"):
federation_routes._EXPLORE_CACHE["payload"] = None
yield
def _make_peer_pubkey() -> tuple[str, str]:
"""Return (fingerprint, pubkey_pem) for a synthetic peer keypair."""
import hashlib
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
priv = ed25519.Ed25519PrivateKey.generate()
pub = priv.public_key()
pem = pub.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
raw = pub.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
fp = hashlib.sha256(raw).digest()[:16].hex()
return fp, pem
def _silence_explore_fetch():
return patch.object(network_view, "_fetch_peer_explore", return_value=None)
def _silence_network_fetch():
return patch.object(network_view, "_fetch_peer_network", return_value=None)
# ---------- schema ------------------------------------------------------
def test_explore_view_top_level_shape(fresh_db, fed_dir):
with _silence_explore_fetch(), _silence_network_fetch():
payload = build_explore_view(node_domain="me.example")
for key in (
"version", "fingerprint", "generated_at",
"node", "peers", "vouches", "vouches_out", "vouches_in",
"transitive_peers", "corroboration_count_24h", "signature",
):
assert key in payload, f"missing {key}"
node = payload["node"]
for key in (
"fingerprint", "domain", "generated_at",
"transparency_log_head_hash", "translog_entry_count",
"peer_count", "vouches_out_count", "vouches_in_count",
"corroboration_count_24h", "signals_count_24h",
):
assert key in node, f"missing node.{key}"
assert node["domain"] == "me.example"
assert node["fingerprint"] == federation.node_fingerprint()
def test_explore_peer_carries_public_safe_stats(fresh_db, fed_dir):
fp, pem = _make_peer_pubkey()
federation.register_peer("peer.example", fp, pem, status="trusted")
now_iso = datetime.now(timezone.utc).isoformat()
for i in range(3):
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="ioc",
signal_id=f"1.2.3.{i}",
signal_hash=f"hash-{i}",
received_at=now_iso,
raw_json=json.dumps({"type": "ip", "value": f"1.2.3.{i}"}),
))
with _silence_explore_fetch(), _silence_network_fetch():
payload = build_explore_view()
assert len(payload["peers"]) == 1
p = payload["peers"][0]
# Public-safe stats present.
for key in (
"signal_count_24h", "signal_count_total",
"cases_24h", "iocs_24h",
"quorum_contribution_24h", "last_seen",
):
assert key in p
assert p["signal_count_24h"] == 3
assert p["iocs_24h"] == 3
assert p["cases_24h"] == 0
# Sensitive fields are not surfaced per-peer.
assert "severity_breakdown" not in p
assert "ioc_type_breakdown" not in p
assert "recent_translog" not in p
# ---------- signature round-trip ---------------------------------------
def test_explore_view_signature_round_trip(fresh_db, fed_dir):
fp, pem = _make_peer_pubkey()
federation.register_peer("trusted.example", fp, pem, status="trusted")
federation.issue_vouch(fp, ttl_days=30)
with _silence_explore_fetch(), _silence_network_fetch():
payload = build_explore_view()
assert "signature" in payload
sig = base64.b64decode(payload["signature"])
unsigned = {k: v for k, v in payload.items() if k != "signature"}
assert federation.verify_payload(
federation.canonical_json(unsigned),
sig,
federation.public_key_pem(),
) is True
# ---------- no-leak invariants -----------------------------------------
def test_explore_view_has_no_ioc_values_or_case_ids_or_raw_json(fresh_db, fed_dir):
"""Public payload must not expose IOC values, case_ids in raw form, or raw_json.
This is the core transparency-vs-leakage contract: anyone can see who's
talking to whom and how much, but never what they're saying.
"""
fp, pem = _make_peer_pubkey()
federation.register_peer("trusted.example", fp, pem, status="trusted")
now_iso = datetime.now(timezone.utc).isoformat()
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="ioc",
signal_id="evil-domain-do-not-leak.com",
signal_hash="ioc-hash-leak",
received_at=now_iso,
raw_json=json.dumps({"type": "domain", "value": "evil-domain-do-not-leak.com"}),
))
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="case",
signal_id="CASE-SECRET-42",
signal_hash="case-hash-leak",
received_at=now_iso,
raw_json=json.dumps({"severity": "critical", "case_id": "CASE-SECRET-42"}),
))
with _silence_explore_fetch(), _silence_network_fetch():
payload = build_explore_view()
flat = json.dumps(payload, default=str)
# IOC values.
assert "evil-domain-do-not-leak.com" not in flat
# Case ids (raw).
assert "CASE-SECRET-42" not in flat
# raw_json shape never serialized.
assert "raw_json" not in flat
# Sector-leaking breakdowns.
assert "severity_breakdown" not in flat
assert "ioc_type_breakdown" not in flat
# ---------- transitive peers --------------------------------------------
def test_explore_transitive_peers_populated_from_peer_responses(fresh_db, fed_dir):
direct_fp, direct_pem = _make_peer_pubkey()
federation.register_peer("direct.example", direct_fp, direct_pem, status="trusted")
far_a, _ = _make_peer_pubkey()
far_b, _ = _make_peer_pubkey()
fake_payload: Dict[str, Any] = {
"fingerprint": direct_fp,
"peers": [
{"fingerprint": far_a, "domain": "far-a.example"},
{"fingerprint": far_b, "domain": "far-b.example"},
],
"vouches": [],
}
with patch.object(network_view, "_fetch_peer_explore", return_value=fake_payload):
payload = build_explore_view()
tps = payload["transitive_peers"]
fps = {t["fingerprint"] for t in tps}
assert far_a in fps
assert far_b in fps
via_fps = {t["via_peer_fingerprint"] for t in tps}
assert via_fps == {direct_fp}
def test_explore_transitive_peers_falls_back_to_network_endpoint(fresh_db, fed_dir):
"""If a peer doesn't have /federation/explore/data (older node), fall back
to /federation/network — the public-view shape is the same {fingerprint, peers}."""
direct_fp, direct_pem = _make_peer_pubkey()
federation.register_peer("direct.example", direct_fp, direct_pem, status="trusted")
far_fp, _ = _make_peer_pubkey()
fallback_payload: Dict[str, Any] = {
"fingerprint": direct_fp,
"peers": [{"fingerprint": far_fp, "domain": "far.example"}],
"vouches": [],
}
with patch.object(network_view, "_fetch_peer_explore", return_value=None), \
patch.object(network_view, "_fetch_peer_network", return_value=fallback_payload):
payload = build_explore_view()
assert any(t["fingerprint"] == far_fp for t in payload["transitive_peers"])
def test_explore_transitive_peers_dedupe_against_direct(fresh_db, fed_dir):
"""If a transitive fp is already a direct peer, don't duplicate it."""
direct_fp, direct_pem = _make_peer_pubkey()
federation.register_peer("direct.example", direct_fp, direct_pem, status="trusted")
fake_payload = {
"fingerprint": direct_fp,
# Direct peer's own fp echoed back — must be deduped.
"peers": [{"fingerprint": direct_fp, "domain": "direct.example"}],
"vouches": [],
}
with patch.object(network_view, "_fetch_peer_explore", return_value=fake_payload):
payload = build_explore_view()
assert payload["transitive_peers"] == []
# ---------- vouches_in --------------------------------------------------
def test_explore_vouches_in_filters_to_target_self_and_trusted_vouchers(fresh_db, fed_dir):
"""vouches_in includes ONLY entries naming us as target whose voucher we trust."""
our_fp = federation.node_fingerprint()
fp_trusted, pem_t = _make_peer_pubkey()
fp_unknown, pem_u = _make_peer_pubkey()
federation.register_peer("trusted.example", fp_trusted, pem_t, status="trusted")
federation.register_peer("unknown.example", fp_unknown, pem_u, status="unknown")
now = datetime.now(timezone.utc).isoformat()
# Trusted peer vouches for us.
db.upsert_vouch(dict(
voucher_fingerprint=fp_trusted,
target_fingerprint=our_fp,
issued_at=now,
expires_at=None,
signature="trusted-sig",
))
# Unknown peer also "vouches" for us — must NOT leak.
db.upsert_vouch(dict(
voucher_fingerprint=fp_unknown,
target_fingerprint=our_fp,
issued_at=now,
expires_at=None,
signature="rogue-sig",
))
# Vouch naming someone else — must NOT appear in vouches_in.
other_fp, _ = _make_peer_pubkey()
db.upsert_vouch(dict(
voucher_fingerprint=fp_trusted,
target_fingerprint=other_fp,
issued_at=now,
expires_at=None,
signature="other-sig",
))
with _silence_explore_fetch(), _silence_network_fetch():
payload = build_explore_view()
vouchers = {v["voucher_fingerprint"] for v in payload["vouches_in"]}
assert vouchers == {fp_trusted}
# And the rogue signature is not anywhere in the payload.
assert "rogue-sig" not in json.dumps(payload, default=str)
# ---------- corroboration counter --------------------------------------
def test_explore_corroboration_count_matches_distinct_shared_hashes(fresh_db, fed_dir):
fp_a, pem_a = _make_peer_pubkey()
fp_b, pem_b = _make_peer_pubkey()
federation.register_peer("a.example", fp_a, pem_a, status="trusted")
federation.register_peer("b.example", fp_b, pem_b, status="trusted")
now_iso = datetime.now(timezone.utc).isoformat()
# Two shared hashes between A and B.
for h in ("shared-1", "shared-2"):
for fp in (fp_a, fp_b):
db.record_signal(dict(
peer_fingerprint=fp,
signal_type="ioc",
signal_id="x",
signal_hash=h,
received_at=now_iso,
raw_json="{}",
))
# One solo hash — must NOT count.
db.record_signal(dict(
peer_fingerprint=fp_a,
signal_type="ioc",
signal_id="solo",
signal_hash="solo-hash",
received_at=now_iso,
raw_json="{}",
))
with _silence_explore_fetch(), _silence_network_fetch():
payload = build_explore_view()
assert payload["corroboration_count_24h"] == 2
assert payload["node"]["corroboration_count_24h"] == 2
# ---------- transparency log headline ----------------------------------
def test_explore_node_translog_headline_reflects_chain(fresh_db, fed_dir):
translog.append("vouch", {"foo": "bar"})
translog.append("signal", {"x": 1})
with _silence_explore_fetch(), _silence_network_fetch():
payload = build_explore_view()
node = payload["node"]
assert node["translog_entry_count"] == 2
assert isinstance(node["transparency_log_head_hash"], str)
assert len(node["transparency_log_head_hash"]) == 64 # hex sha256
# ---------- HTTP routes -------------------------------------------------
def _mk_app() -> FastAPI:
app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key="test-secret")
import tempfile
from pathlib import Path as _Path
# We need real templates for /federation/explore HTML response.
here = _Path(__file__).resolve().parent.parent / "src" / "psyc" / "cockpit" / "templates"
templates = Jinja2Templates(directory=str(here))
federation_routes.register(app, templates)
return app
def test_federation_explore_endpoint_returns_html(fresh_db, fed_dir):
with _silence_explore_fetch(), _silence_network_fetch():
client = TestClient(_mk_app())
r = client.get("/federation/explore")
assert r.status_code == 200
assert "text/html" in r.headers.get("content-type", "")
# Banner + page title are present.
body = r.text
assert "Federation Explorer" in body
def test_federation_explore_data_returns_signed_json(fresh_db, fed_dir):
fp, pem = _make_peer_pubkey()
federation.register_peer("trusted.example", fp, pem, status="trusted")
with _silence_explore_fetch(), _silence_network_fetch():
client = TestClient(_mk_app())
r = client.get("/federation/explore/data")
assert r.status_code == 200
data = r.json()
assert "signature" in data
assert "node" in data
sig = base64.b64decode(data["signature"])
unsigned = {k: v for k, v in data.items() if k != "signature"}
assert federation.verify_payload(
federation.canonical_json(unsigned),
sig,
federation.public_key_pem(),
) is True
def test_federation_explore_data_has_cors_header(fresh_db, fed_dir):
"""Other psyc nodes' explore pages need to fetch this from the browser."""
with _silence_explore_fetch(), _silence_network_fetch():
client = TestClient(_mk_app())
r = client.get("/federation/explore/data")
assert r.status_code == 200
assert r.headers.get("access-control-allow-origin") == "*"
def test_federation_info_has_explore_and_cors(fresh_db, fed_dir):
client = TestClient(_mk_app())
r = client.get("/federation/info")
assert r.status_code == 200
data = r.json()
assert data.get("explore") == "/federation/explore"
assert r.headers.get("access-control-allow-origin") == "*"
def test_existing_public_endpoints_have_cors_header(fresh_db, fed_dir):
"""All public endpoints must be cross-origin fetchable for the explorer."""
client = TestClient(_mk_app())
for path in (
"/federation/key",
"/federation/feed",
"/federation/vouches",
"/federation/log",
"/federation/log/verify",
"/federation/peers/public",
"/federation/network",
):
r = client.get(path)
assert r.status_code in (200, 409), f"{path} status {r.status_code}"
assert r.headers.get("access-control-allow-origin") == "*", f"{path} missing CORS"