"""Courier approval-queue tests — gating, dispatch, rejection.""" from __future__ import annotations import pytest from sqlalchemy import create_engine, select from psyc import db from psyc.lines import courier, ledger as ledger_line from psyc.lines.route import Route from psyc.models import ApprovalStatus, Outcome, TLP from psyc.result import Ok from conftest import make_case @pytest.fixture def fresh_db(tmp_path, monkeypatch): test_db = tmp_path / "test.db" test_engine = create_engine(f"sqlite:///{test_db}", future=True) db._metadata.create_all(test_engine, checkfirst=True) monkeypatch.setattr(db, "_engine", test_engine) monkeypatch.setattr(db, "DB_PATH", test_db) yield test_db def _malware_url_route(requires_approval: bool) -> Route: return Route( destination_name="URLhaus", priority=3, payload_kind="malware_url_report", max_tlp_allowed=TLP.GREEN, requires_approval=requires_approval, ) def test_execute_routes_enqueues_when_approval_required(fresh_db, monkeypatch): case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"]) case.classification.tlp = TLP.GREEN # No HTTP should happen — submit must NOT be called on the approval branch. def boom(*a, **kw): raise AssertionError("submit() must not be called when approval is required") monkeypatch.setattr(courier, "submit", boom) results = courier.execute_routes(case, [_malware_url_route(requires_approval=True)]) assert len(results) == 1 assert results[0].outcome is Outcome.PENDING_APPROVAL pending = courier.list_pending() assert len(pending) == 1 assert pending[0].destination_name == "URLhaus" assert pending[0].status is ApprovalStatus.PENDING assert pending[0].payload_hash # frozen hash present assert pending[0].payload_json # frozen payload present def test_execute_routes_force_approval_via_env(fresh_db, monkeypatch): case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"]) case.classification.tlp = TLP.GREEN monkeypatch.setenv("PSYC_REQUIRE_APPROVAL", "1") monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("must not submit"))) results = courier.execute_routes(case, [_malware_url_route(requires_approval=False)]) assert results[0].outcome is Outcome.PENDING_APPROVAL assert courier.pending_count() == 1 def test_dispatch_pending_submits_and_marks_approved(fresh_db, monkeypatch): case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"]) case.classification.tlp = TLP.GREEN monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("queue phase"))) courier.execute_routes(case, [_malware_url_route(requires_approval=True)]) pid = courier.list_pending()[0].id # Now approve — submit IS called, returns a receipt. receipt = courier.Receipt(receipt_id="r-001", destination="urlhaus", status="acknowledged", response_body={}) monkeypatch.setattr(courier, "submit", lambda *a, **kw: Ok(receipt)) result = courier.dispatch_pending(pid, reviewer="alice") assert isinstance(result, Ok) assert result.value.outcome is Outcome.ACKNOWLEDGED assert result.value.receipt_id == "r-001" # Pending row now marked approved. refreshed = courier.get_pending(pid).value assert refreshed.status is ApprovalStatus.APPROVED assert refreshed.reviewer == "alice" # Ledger has a corresponding row. entries = ledger_line.list_by_case(case.case_id, limit=10) assert any(e.outcome is Outcome.ACKNOWLEDGED and e.destination == "URLhaus" for e in entries) def test_reject_pending_writes_rejection_to_ledger(fresh_db, monkeypatch): case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"]) case.classification.tlp = TLP.GREEN monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("queue phase"))) courier.execute_routes(case, [_malware_url_route(requires_approval=True)]) pid = courier.list_pending()[0].id # Reject — submit must NOT be called. def boom(*a, **kw): raise AssertionError("rejected submissions must not POST") monkeypatch.setattr(courier, "submit", boom) result = courier.reject_pending(pid, reviewer="bob", reason="payload looks wrong") assert isinstance(result, Ok) refreshed = courier.get_pending(pid).value assert refreshed.status is ApprovalStatus.REJECTED assert refreshed.reviewer == "bob" assert refreshed.reason == "payload looks wrong" entries = ledger_line.list_by_case(case.case_id, limit=10) assert any(e.outcome is Outcome.REJECTED and "rejected_by=bob" in (e.detail or "") for e in entries) def test_double_approve_is_rejected(fresh_db, monkeypatch): case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"]) case.classification.tlp = TLP.GREEN monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("queue phase"))) courier.execute_routes(case, [_malware_url_route(requires_approval=True)]) pid = courier.list_pending()[0].id receipt = courier.Receipt(receipt_id="r-002", destination="urlhaus", status="acknowledged", response_body={}) monkeypatch.setattr(courier, "submit", lambda *a, **kw: Ok(receipt)) courier.dispatch_pending(pid, reviewer="alice") # Second approval of the same id must fail — no double-submission. again = courier.dispatch_pending(pid, reviewer="alice") assert not isinstance(again, Ok)