- F1 case_detail.html: scheme-check source_ref href (block javascript: URLs) - F2 admin.html / F3 admin_federation.html: replace inline onsubmit confirm() with data-attr + global handler in base.html (no more label/domain interpolation into onsubmit attribute string) - federation.register_peer: validate hostname + fingerprint regex at ingest - federation_explore.html: window.PSYC_EXPLORE via | tojson - federation_network.js: DOMAIN_RE guard on peer-supplied domain before building cross-origin fetch URL (also closes open-redirect via 'open their explorer' button) - app.py: nosniff + Referrer-Policy: no-referrer + X-Frame-Options: DENY - sw.js: psyc-v11 cache bump CSP deferred — needs inline scripts moved to external files first. Tests: +2 cases, 245/245 green.
263 lines
9.2 KiB
Python
263 lines
9.2 KiB
Python
"""Federation — identity, signed feed, peer registry, signal buffer."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import os
|
|
import stat
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine
|
|
|
|
from psyc import db
|
|
from psyc.lines import federation
|
|
from psyc.lines.federation import (
|
|
DNSRecord,
|
|
build_signed_feed,
|
|
canonical_json,
|
|
dns_record,
|
|
import_signed_feed,
|
|
node_fingerprint,
|
|
node_keypair,
|
|
public_key_pem,
|
|
sign_payload,
|
|
verify_payload,
|
|
)
|
|
from psyc.result import Err, Ok
|
|
from conftest import make_case
|
|
|
|
|
|
@pytest.fixture
|
|
def fresh_db(tmp_path, monkeypatch):
|
|
test_db = tmp_path / "test.db"
|
|
eng = create_engine(f"sqlite:///{test_db}", future=True)
|
|
db._metadata.create_all(eng, checkfirst=True)
|
|
monkeypatch.setattr(db, "_engine", eng)
|
|
monkeypatch.setattr(db, "DB_PATH", test_db)
|
|
yield test_db
|
|
|
|
|
|
@pytest.fixture
|
|
def fed_dir(tmp_path, monkeypatch):
|
|
"""Redirect federation key paths to a tmp dir so each test gets a fresh key."""
|
|
d = tmp_path / "federation"
|
|
monkeypatch.setattr(federation, "FED_DIR", d)
|
|
monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
|
|
monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
|
|
yield d
|
|
|
|
|
|
def test_keypair_persisted_with_correct_perms(fed_dir):
|
|
priv, pub = node_keypair()
|
|
assert federation.PRIVATE_KEY_PATH.exists()
|
|
assert federation.PUBLIC_KEY_PATH.exists()
|
|
mode = stat.S_IMODE(os.stat(federation.PRIVATE_KEY_PATH).st_mode)
|
|
assert mode == 0o600
|
|
|
|
|
|
def test_keypair_idempotent_across_calls(fed_dir):
|
|
priv1, pub1 = node_keypair()
|
|
priv2, pub2 = node_keypair()
|
|
# raw bytes match — same key loaded twice, not regenerated
|
|
raw1 = pub1.public_bytes(
|
|
encoding=federation.serialization.Encoding.Raw,
|
|
format=federation.serialization.PublicFormat.Raw,
|
|
)
|
|
raw2 = pub2.public_bytes(
|
|
encoding=federation.serialization.Encoding.Raw,
|
|
format=federation.serialization.PublicFormat.Raw,
|
|
)
|
|
assert raw1 == raw2
|
|
|
|
|
|
def test_fingerprint_is_stable_and_32_hex(fed_dir):
|
|
fp1 = node_fingerprint()
|
|
fp2 = node_fingerprint()
|
|
assert fp1 == fp2
|
|
assert len(fp1) == 32
|
|
assert all(c in "0123456789abcdef" for c in fp1)
|
|
|
|
|
|
def test_sign_verify_roundtrip(fed_dir):
|
|
payload = b"hello federation"
|
|
sig = sign_payload(payload)
|
|
assert verify_payload(payload, sig, public_key_pem()) is True
|
|
|
|
|
|
def test_verify_with_wrong_key_returns_false(fed_dir, tmp_path):
|
|
payload = b"the truth"
|
|
sig = sign_payload(payload)
|
|
|
|
# Build a *different* keypair in a separate directory and use its pubkey.
|
|
other = tmp_path / "other-federation"
|
|
other.mkdir()
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ed25519
|
|
other_priv = ed25519.Ed25519PrivateKey.generate()
|
|
other_pub_pem = other_priv.public_key().public_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
|
).decode("ascii")
|
|
|
|
assert verify_payload(payload, sig, other_pub_pem) is False
|
|
|
|
|
|
def test_verify_with_garbage_pubkey_returns_false_no_raise(fed_dir):
|
|
sig = sign_payload(b"x")
|
|
assert verify_payload(b"x", sig, "not a pem") is False
|
|
|
|
|
|
def test_canonical_json_is_deterministic():
|
|
a = canonical_json({"b": 1, "a": 2, "nested": {"y": 1, "x": 2}})
|
|
b = canonical_json({"a": 2, "b": 1, "nested": {"x": 2, "y": 1}})
|
|
assert a == b
|
|
|
|
|
|
def test_dns_record_txt_value_matches_spec(fed_dir):
|
|
rec = dns_record("example.com")
|
|
assert isinstance(rec, DNSRecord)
|
|
fp = node_fingerprint()
|
|
assert rec.srv_name == "_psyc._tcp.example.com"
|
|
assert rec.srv_target == "example.com."
|
|
assert rec.srv_port == 443
|
|
assert rec.txt_value == f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed"
|
|
# human instructions include both record lines
|
|
assert "_psyc._tcp.example.com" in rec.human_instructions
|
|
assert "SRV" in rec.human_instructions
|
|
assert "TXT" in rec.human_instructions
|
|
|
|
|
|
def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir):
|
|
case = make_case(feed="urlhaus", ips=["1.1.1.1"], urls=["http://1.1.1.1/x"])
|
|
db.upsert_case(case)
|
|
feed = build_signed_feed(window_hours=24)
|
|
assert feed["version"] == "psyc1"
|
|
assert feed["fingerprint"] == node_fingerprint()
|
|
assert feed["signature"]
|
|
# cases entry made it in
|
|
assert any(c["case_id"] == case.case_id for c in feed["cases"])
|
|
|
|
# Import using our own pubkey against a *different* declared fingerprint:
|
|
# swap the fingerprint so import_signed_feed doesn't reject as a loop.
|
|
pub = public_key_pem()
|
|
# Use a fresh keypair to act as "peer" — sign a feed with that key.
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ed25519
|
|
import hashlib
|
|
peer_priv = ed25519.Ed25519PrivateKey.generate()
|
|
peer_pub_pem = peer_priv.public_key().public_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
|
).decode("ascii")
|
|
peer_raw = peer_priv.public_key().public_bytes(
|
|
encoding=serialization.Encoding.Raw,
|
|
format=serialization.PublicFormat.Raw,
|
|
)
|
|
peer_fp = hashlib.sha256(peer_raw).digest()[:16].hex()
|
|
feed["fingerprint"] = peer_fp
|
|
unsigned = {k: v for k, v in feed.items() if k != "signature"}
|
|
new_sig = peer_priv.sign(canonical_json(unsigned))
|
|
feed["signature"] = base64.b64encode(new_sig).decode("ascii")
|
|
|
|
# Stage 4 listening gate: peer must be trusted to land signals.
|
|
federation.register_peer("peer.example", peer_fp, peer_pub_pem, status="trusted")
|
|
result = import_signed_feed(feed, peer_pub_pem)
|
|
assert isinstance(result, Ok), getattr(result, "reason", "")
|
|
summary = result.value
|
|
assert summary.peer_fingerprint == peer_fp
|
|
assert summary.cases_seen >= 1
|
|
|
|
|
|
def test_import_with_wrong_pubkey_returns_err(fresh_db, fed_dir):
|
|
db.upsert_case(make_case(feed="urlhaus", ips=["2.2.2.2"]))
|
|
feed = build_signed_feed(window_hours=24)
|
|
# build a *different* pubkey to claim verification against
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ed25519
|
|
other_pub_pem = ed25519.Ed25519PrivateKey.generate().public_key().public_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
|
).decode("ascii")
|
|
# also need to change fingerprint so the loop-check doesn't trigger first
|
|
feed["fingerprint"] = "deadbeef" * 4
|
|
result = import_signed_feed(feed, other_pub_pem)
|
|
assert isinstance(result, Err)
|
|
|
|
|
|
def test_import_own_feed_returns_loop_err(fresh_db, fed_dir):
|
|
db.upsert_case(make_case(feed="urlhaus", ips=["3.3.3.3"]))
|
|
feed = build_signed_feed(window_hours=24)
|
|
result = import_signed_feed(feed, public_key_pem())
|
|
assert isinstance(result, Err)
|
|
assert "loop" in result.reason
|
|
|
|
|
|
def test_record_signal_then_lookup_by_hash(fresh_db):
|
|
rid = db.record_signal(dict(
|
|
peer_fingerprint="abc123",
|
|
signal_type="ioc",
|
|
signal_id="1.2.3.4",
|
|
signal_hash="hash-aaa",
|
|
received_at="2026-01-01T00:00:00+00:00",
|
|
raw_json="{}",
|
|
))
|
|
assert rid > 0
|
|
rows = db.signals_for_hash("hash-aaa")
|
|
assert len(rows) == 1
|
|
assert rows[0]["peer_fingerprint"] == "abc123"
|
|
# second peer reports the same hash → both surface for quorum check
|
|
db.record_signal(dict(
|
|
peer_fingerprint="def456",
|
|
signal_type="ioc",
|
|
signal_id="1.2.3.4",
|
|
signal_hash="hash-aaa",
|
|
received_at="2026-01-01T00:01:00+00:00",
|
|
raw_json="{}",
|
|
))
|
|
rows = db.signals_for_hash("hash-aaa")
|
|
assert {r["peer_fingerprint"] for r in rows} == {"abc123", "def456"}
|
|
|
|
|
|
def test_peer_registry_crud(fresh_db, fed_dir):
|
|
federation.register_peer("peer.example", "ff" * 16, "PEM", status="trusted")
|
|
peers = federation.list_peers()
|
|
assert len(peers) == 1
|
|
assert peers[0].domain == "peer.example"
|
|
assert peers[0].status == "trusted"
|
|
|
|
federation.set_peer_status("peer.example", "blocked")
|
|
assert federation.get_peer("peer.example").status == "blocked"
|
|
|
|
federation.remove_peer("peer.example")
|
|
assert federation.list_peers() == []
|
|
|
|
|
|
def test_register_peer_rejects_malformed_domain(fresh_db, fed_dir):
|
|
"""XSS guard: domain must look like a hostname (+ optional :port)."""
|
|
import pytest
|
|
bad = [
|
|
"evil.com'); alert(1); //",
|
|
"evil.com<script>",
|
|
"evil.com onclick=alert(1)",
|
|
"",
|
|
"evil com", # space
|
|
"/etc/passwd",
|
|
"evil.com/?phish=1",
|
|
]
|
|
for d in bad:
|
|
with pytest.raises(ValueError):
|
|
federation.register_peer(d, "ff" * 16, "PEM")
|
|
# And good ones still pass:
|
|
for d in ["peer.example.com", "peer.example.com:8443", "peer-1.example", "127.0.0.1:8767"]:
|
|
federation.register_peer(d, "ff" * 16, "PEM")
|
|
federation.remove_peer(d)
|
|
|
|
|
|
def test_register_peer_rejects_malformed_fingerprint(fresh_db, fed_dir):
|
|
"""Defense-in-depth: fingerprint must be 32 hex chars."""
|
|
import pytest
|
|
with pytest.raises(ValueError):
|
|
federation.register_peer("peer.example", "not-hex", "PEM")
|
|
with pytest.raises(ValueError):
|
|
federation.register_peer("peer.example", "ff" * 8, "PEM") # too short
|