stage-25: response actions — human-gated enforcement + the disco

Closes the loop: intel -> decision -> enforcement -> audit. High/critical
cases propose response actions (alert SOC, push IOCs to perimeter
firewall+DNS). Nothing fires automatically — each sits PROPOSED until a
human approves, then it's POSTed to the enforcement sink (PSYC_SOAR_URL,
default mock-cert /soar/enforce) and written to the ledger as ACTIONED.

- models: ActionType / ActionStatus / ResponseAction
- db: response_actions table
- lines/respond.py: propose_for_case (idempotent, sev-gated), execute_action
  (fire + ledger + mark), reject_action; mock SOAR endpoint in mock_cert
- cockpit /response page: proposed/enforced/declined tabs,  Enforce +
  decline, and the disco — a full-screen strobe + "ENFORCED" + IOC-scatter
  animation that fires on approval (respects prefers-reduced-motion)
- cli: respond / actions / act-approve / act-reject
- 8 tests; verified the full loop live (propose -> enforce -> disco ->
  SOAR receipt -> ledger ACTIONED row)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
m17hr1l
2026-05-23 00:24:31 +02:00
parent d0a71d0226
commit 73a932d8be
11 changed files with 635 additions and 1 deletions

144
tests/test_respond.py Normal file
View File

@@ -0,0 +1,144 @@
"""Respondline — proposal gating, human-gated execution, rejection."""
from __future__ import annotations
import pytest
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import ledger as ledger_line
from psyc.lines import respond
from psyc.models import ActionStatus, ActionType, Outcome, Severity
from psyc.result import Ok
from conftest import make_case
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
def test_low_severity_proposes_nothing(fresh_db):
case = make_case(feed="urlhaus", ips=["1.2.3.4"], severity=Severity.LOW)
db.upsert_case(case)
assert respond.propose_for_case(case) == []
def test_high_severity_proposes_alert_and_blocklist(fresh_db):
case = make_case(feed="feodo", ips=["9.9.9.9"], domains=["evil.com"], severity=Severity.HIGH)
db.upsert_case(case)
ids = respond.propose_for_case(case)
assert len(ids) == 2
actions = respond.list_actions(status=ActionStatus.PROPOSED)
types = {a.action_type for a in actions}
assert types == {ActionType.ALERT, ActionType.BLOCKLIST}
def test_proposal_is_idempotent_per_case(fresh_db):
case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.CRITICAL)
db.upsert_case(case)
respond.propose_for_case(case)
assert respond.propose_for_case(case) == [] # second call adds nothing
assert respond.action_count(ActionStatus.PROPOSED) == 2
def test_blocklist_skipped_when_no_network_iocs(fresh_db):
case = make_case(feed="malware-bazaar", hashes=["a" * 64], severity=Severity.HIGH)
db.upsert_case(case)
respond.propose_for_case(case)
actions = respond.list_actions()
# hash-only case → alert yes, blocklist no
assert {a.action_type for a in actions} == {ActionType.ALERT}
def test_execute_fires_and_marks_executed(fresh_db, monkeypatch):
case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
db.upsert_case(case)
aid = respond.propose_for_case(case)[0]
captured = {}
class _Resp:
def raise_for_status(self): pass
def json(self): return {"receipt_id": "MOCK-AB12"}
class _Client:
def __init__(self, *a, **k): pass
def __enter__(self): return self
def __exit__(self, *a): return False
def post(self, url, json):
captured["url"] = url
captured["payload"] = json
return _Resp()
monkeypatch.setattr(respond.httpx, "Client", _Client)
result = respond.execute_action(aid, approver="alice")
assert isinstance(result, Ok)
assert result.value.status is ActionStatus.EXECUTED
assert result.value.approver == "alice"
assert captured["payload"]["approved_by"] == "alice"
# ledger has an ACTIONED row
entries = ledger_line.list_by_case(case.case_id, limit=10)
assert any(e.outcome is Outcome.ACTIONED for e in entries)
def test_execute_failure_marks_failed(fresh_db, monkeypatch):
case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
db.upsert_case(case)
aid = respond.propose_for_case(case)[0]
class _Client:
def __init__(self, *a, **k): pass
def __enter__(self): return self
def __exit__(self, *a): return False
def post(self, url, json): raise RuntimeError("sink down")
monkeypatch.setattr(respond.httpx, "Client", _Client)
result = respond.execute_action(aid)
assert not isinstance(result, Ok)
assert respond.get_action(aid).value.status is ActionStatus.FAILED
def test_reject_fires_nothing_and_records(fresh_db, monkeypatch):
case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
db.upsert_case(case)
aid = respond.propose_for_case(case)[0]
def boom(*a, **k):
raise AssertionError("reject must not POST")
monkeypatch.setattr(respond.httpx, "Client", boom)
result = respond.reject_action(aid, approver="bob", reason="false positive")
assert isinstance(result, Ok)
a = respond.get_action(aid).value
assert a.status is ActionStatus.REJECTED
assert a.approver == "bob"
entries = ledger_line.list_by_case(case.case_id, limit=10)
assert any(e.outcome is Outcome.REJECTED and "false positive" in (e.detail or "") for e in entries)
def test_double_execute_refused(fresh_db, monkeypatch):
case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
db.upsert_case(case)
aid = respond.propose_for_case(case)[0]
class _Resp:
def raise_for_status(self): pass
def json(self): return {"receipt_id": "X"}
class _Client:
def __init__(self, *a, **k): pass
def __enter__(self): return self
def __exit__(self, *a): return False
def post(self, url, json): return _Resp()
monkeypatch.setattr(respond.httpx, "Client", _Client)
respond.execute_action(aid)
again = respond.execute_action(aid)
assert not isinstance(again, Ok) # already executed