CERT-Bund (authority) requires_approval by default; PSYC_REQUIRE_APPROVAL=1 forces every routable submission through the queue. Courier branches at execute_routes: approval-required → freeze payload + enqueue, no HTTP; else submit directly as before. Approve dispatches the frozen payload to mock-cert and writes the ledger row (detail=approved_by=…); reject writes a ledger row with the reviewer's reason. CLI: queue / approve / reject. Cockpit /queue page with POST approve / reject and counts. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
131 lines
5.5 KiB
Python
131 lines
5.5 KiB
Python
"""Courier approval-queue tests — gating, dispatch, rejection."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine, select
|
|
|
|
from psyc import db
|
|
from psyc.lines import courier, ledger as ledger_line
|
|
from psyc.lines.route import Route
|
|
from psyc.models import ApprovalStatus, Outcome, TLP
|
|
from psyc.result import Ok
|
|
from conftest import make_case
|
|
|
|
|
|
@pytest.fixture
|
|
def fresh_db(tmp_path, monkeypatch):
|
|
test_db = tmp_path / "test.db"
|
|
test_engine = create_engine(f"sqlite:///{test_db}", future=True)
|
|
db._metadata.create_all(test_engine, checkfirst=True)
|
|
monkeypatch.setattr(db, "_engine", test_engine)
|
|
monkeypatch.setattr(db, "DB_PATH", test_db)
|
|
yield test_db
|
|
|
|
|
|
def _malware_url_route(requires_approval: bool) -> Route:
|
|
return Route(
|
|
destination_name="URLhaus",
|
|
priority=3,
|
|
payload_kind="malware_url_report",
|
|
max_tlp_allowed=TLP.GREEN,
|
|
requires_approval=requires_approval,
|
|
)
|
|
|
|
|
|
def test_execute_routes_enqueues_when_approval_required(fresh_db, monkeypatch):
|
|
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
|
|
case.classification.tlp = TLP.GREEN
|
|
|
|
# No HTTP should happen — submit must NOT be called on the approval branch.
|
|
def boom(*a, **kw):
|
|
raise AssertionError("submit() must not be called when approval is required")
|
|
monkeypatch.setattr(courier, "submit", boom)
|
|
|
|
results = courier.execute_routes(case, [_malware_url_route(requires_approval=True)])
|
|
|
|
assert len(results) == 1
|
|
assert results[0].outcome is Outcome.PENDING_APPROVAL
|
|
pending = courier.list_pending()
|
|
assert len(pending) == 1
|
|
assert pending[0].destination_name == "URLhaus"
|
|
assert pending[0].status is ApprovalStatus.PENDING
|
|
assert pending[0].payload_hash # frozen hash present
|
|
assert pending[0].payload_json # frozen payload present
|
|
|
|
|
|
def test_execute_routes_force_approval_via_env(fresh_db, monkeypatch):
|
|
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
|
|
case.classification.tlp = TLP.GREEN
|
|
monkeypatch.setenv("PSYC_REQUIRE_APPROVAL", "1")
|
|
monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("must not submit")))
|
|
|
|
results = courier.execute_routes(case, [_malware_url_route(requires_approval=False)])
|
|
|
|
assert results[0].outcome is Outcome.PENDING_APPROVAL
|
|
assert courier.pending_count() == 1
|
|
|
|
|
|
def test_dispatch_pending_submits_and_marks_approved(fresh_db, monkeypatch):
|
|
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
|
|
case.classification.tlp = TLP.GREEN
|
|
monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("queue phase")))
|
|
courier.execute_routes(case, [_malware_url_route(requires_approval=True)])
|
|
pid = courier.list_pending()[0].id
|
|
|
|
# Now approve — submit IS called, returns a receipt.
|
|
receipt = courier.Receipt(receipt_id="r-001", destination="urlhaus", status="acknowledged", response_body={})
|
|
monkeypatch.setattr(courier, "submit", lambda *a, **kw: Ok(receipt))
|
|
|
|
result = courier.dispatch_pending(pid, reviewer="alice")
|
|
|
|
assert isinstance(result, Ok)
|
|
assert result.value.outcome is Outcome.ACKNOWLEDGED
|
|
assert result.value.receipt_id == "r-001"
|
|
# Pending row now marked approved.
|
|
refreshed = courier.get_pending(pid).value
|
|
assert refreshed.status is ApprovalStatus.APPROVED
|
|
assert refreshed.reviewer == "alice"
|
|
# Ledger has a corresponding row.
|
|
entries = ledger_line.list_by_case(case.case_id, limit=10)
|
|
assert any(e.outcome is Outcome.ACKNOWLEDGED and e.destination == "URLhaus" for e in entries)
|
|
|
|
|
|
def test_reject_pending_writes_rejection_to_ledger(fresh_db, monkeypatch):
|
|
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
|
|
case.classification.tlp = TLP.GREEN
|
|
monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("queue phase")))
|
|
courier.execute_routes(case, [_malware_url_route(requires_approval=True)])
|
|
pid = courier.list_pending()[0].id
|
|
|
|
# Reject — submit must NOT be called.
|
|
def boom(*a, **kw):
|
|
raise AssertionError("rejected submissions must not POST")
|
|
monkeypatch.setattr(courier, "submit", boom)
|
|
|
|
result = courier.reject_pending(pid, reviewer="bob", reason="payload looks wrong")
|
|
|
|
assert isinstance(result, Ok)
|
|
refreshed = courier.get_pending(pid).value
|
|
assert refreshed.status is ApprovalStatus.REJECTED
|
|
assert refreshed.reviewer == "bob"
|
|
assert refreshed.reason == "payload looks wrong"
|
|
entries = ledger_line.list_by_case(case.case_id, limit=10)
|
|
assert any(e.outcome is Outcome.REJECTED and "rejected_by=bob" in (e.detail or "") for e in entries)
|
|
|
|
|
|
def test_double_approve_is_rejected(fresh_db, monkeypatch):
|
|
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
|
|
case.classification.tlp = TLP.GREEN
|
|
monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("queue phase")))
|
|
courier.execute_routes(case, [_malware_url_route(requires_approval=True)])
|
|
pid = courier.list_pending()[0].id
|
|
|
|
receipt = courier.Receipt(receipt_id="r-002", destination="urlhaus", status="acknowledged", response_body={})
|
|
monkeypatch.setattr(courier, "submit", lambda *a, **kw: Ok(receipt))
|
|
courier.dispatch_pending(pid, reviewer="alice")
|
|
|
|
# Second approval of the same id must fail — no double-submission.
|
|
again = courier.dispatch_pending(pid, reviewer="alice")
|
|
assert not isinstance(again, Ok)
|