Closes the loop: intel -> decision -> enforcement -> audit.

High/critical cases propose response actions (alert the SOC, push IOCs to the perimeter firewall + DNS). Nothing fires automatically — each action sits PROPOSED until a human approves it; it is then POSTed to the enforcement sink (PSYC_SOAR_URL, default mock-cert /soar/enforce) and written to the ledger as ACTIONED.

- models: ActionType / ActionStatus / ResponseAction
- db: response_actions table
- lines/respond.py: propose_for_case (idempotent, sev-gated), execute_action (fire + ledger + mark), reject_action; mock SOAR endpoint in mock_cert
- cockpit /response page: proposed/enforced/declined tabs, ⚡ Enforce + decline, and the disco — a full-screen strobe + "ENFORCED" + IOC-scatter animation that fires on approval (respects prefers-reduced-motion)
- cli: respond / actions / act-approve / act-reject
- 8 tests; verified the full loop live (propose -> enforce -> disco -> SOAR receipt -> ledger ACTIONED row)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
145 lines
5.1 KiB
Python
145 lines
5.1 KiB
Python
"""Respondline — proposal gating, human-gated execution, rejection."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine
|
|
|
|
from psyc import db
|
|
from psyc.lines import ledger as ledger_line
|
|
from psyc.lines import respond
|
|
from psyc.models import ActionStatus, ActionType, Outcome, Severity
|
|
from psyc.result import Ok
|
|
from conftest import make_case
|
|
|
|
|
|
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
    """Point ``psyc.db`` at an empty, per-test SQLite database.

    A database file is created under pytest's ``tmp_path``, the tables
    registered on ``db._metadata`` are created in it, and ``db._engine`` /
    ``db.DB_PATH`` are monkeypatched to the new engine and file for the
    duration of the test.

    Yields:
        The path of the temporary SQLite file.
    """
    test_db = tmp_path / "test.db"
    eng = create_engine(f"sqlite:///{test_db}", future=True)
    db._metadata.create_all(eng, checkfirst=True)
    monkeypatch.setattr(db, "_engine", eng)
    monkeypatch.setattr(db, "DB_PATH", test_db)
    yield test_db
    # Release pooled connections so the SQLite file handle is not leaked
    # from one test to the next (and tmp_path cleanup is not blocked).
    eng.dispose()
|
|
|
|
|
|
def test_low_severity_proposes_nothing(fresh_db):
    """A LOW-severity case must yield an empty list of proposed actions."""
    low_case = make_case(feed="urlhaus", ips=["1.2.3.4"], severity=Severity.LOW)
    db.upsert_case(low_case)

    proposed_ids = respond.propose_for_case(low_case)

    assert proposed_ids == []
|
|
|
|
|
|
def test_high_severity_proposes_alert_and_blocklist(fresh_db):
    """A HIGH case with an IP and a domain gets two proposals: ALERT and BLOCKLIST."""
    high_case = make_case(
        feed="feodo",
        ips=["9.9.9.9"],
        domains=["evil.com"],
        severity=Severity.HIGH,
    )
    db.upsert_case(high_case)

    proposed_ids = respond.propose_for_case(high_case)
    assert len(proposed_ids) == 2

    # Collect the distinct action types currently waiting for approval.
    pending = respond.list_actions(status=ActionStatus.PROPOSED)
    seen_types = set()
    for action in pending:
        seen_types.add(action.action_type)
    assert seen_types == {ActionType.ALERT, ActionType.BLOCKLIST}
|
|
|
|
|
|
def test_proposal_is_idempotent_per_case(fresh_db):
    """Proposing twice for the same case must not create additional actions."""
    critical_case = make_case(
        feed="feodo",
        ips=["9.9.9.9"],
        severity=Severity.CRITICAL,
    )
    db.upsert_case(critical_case)

    respond.propose_for_case(critical_case)
    repeat_ids = respond.propose_for_case(critical_case)

    # The second call adds nothing; only the two original proposals remain.
    assert repeat_ids == []
    assert respond.action_count(ActionStatus.PROPOSED) == 2
|
|
|
|
|
|
def test_blocklist_skipped_when_no_network_iocs(fresh_db):
    """A hash-only HIGH case gets an ALERT proposal but no BLOCKLIST one."""
    hash_only_case = make_case(
        feed="malware-bazaar",
        hashes=["a" * 64],
        severity=Severity.HIGH,
    )
    db.upsert_case(hash_only_case)
    respond.propose_for_case(hash_only_case)

    # No IPs or domains on the case, so ALERT is the only expected type.
    proposed_types = set()
    for action in respond.list_actions():
        proposed_types.add(action.action_type)
    assert proposed_types == {ActionType.ALERT}
|
|
|
|
|
|
def test_execute_fires_and_marks_executed(fresh_db, monkeypatch):
    """Approving a proposed action marks it EXECUTED and ledgers an ACTIONED row.

    ``respond.httpx.Client`` is replaced with an in-memory stand-in that
    records the URL and JSON payload it is asked to POST.
    """
    high_case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(high_case)
    action_id = respond.propose_for_case(high_case)[0]

    captured = {}

    class _FakeResponse:
        def raise_for_status(self):
            return None

        def json(self):
            return {"receipt_id": "MOCK-AB12"}

    class _RecordingClient:
        def __init__(self, *args, **kwargs):
            pass

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            return False

        def post(self, url, json):
            captured.update(url=url, payload=json)
            return _FakeResponse()

    monkeypatch.setattr(respond.httpx, "Client", _RecordingClient)

    outcome = respond.execute_action(action_id, approver="alice")

    assert isinstance(outcome, Ok)
    assert outcome.value.status is ActionStatus.EXECUTED
    assert outcome.value.approver == "alice"
    assert captured["payload"]["approved_by"] == "alice"

    # The ledger must carry an ACTIONED entry for this case.
    ledger_rows = ledger_line.list_by_case(high_case.case_id, limit=10)
    assert any(row.outcome is Outcome.ACTIONED for row in ledger_rows)
|
|
|
|
|
|
def test_execute_failure_marks_failed(fresh_db, monkeypatch):
    """If the POST raises, execute_action returns a non-Ok result and the action is FAILED."""
    high_case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(high_case)
    action_id = respond.propose_for_case(high_case)[0]

    class _BrokenClient:
        """Stand-in for ``httpx.Client`` whose ``post`` always raises."""

        def __init__(self, *args, **kwargs):
            pass

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            return False

        def post(self, url, json):
            raise RuntimeError("sink down")

    monkeypatch.setattr(respond.httpx, "Client", _BrokenClient)

    outcome = respond.execute_action(action_id)
    assert not isinstance(outcome, Ok)

    stored = respond.get_action(action_id).value
    assert stored.status is ActionStatus.FAILED
|
|
|
|
|
|
def test_reject_fires_nothing_and_records(fresh_db, monkeypatch):
    """Rejecting an action never builds an HTTP client and records the decision.

    The action must end up REJECTED with the approver stored, and the ledger
    must hold a REJECTED entry whose detail contains the supplied reason.
    """
    high_case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(high_case)
    action_id = respond.propose_for_case(high_case)[0]

    def _forbid_client(*args, **kwargs):
        raise AssertionError("reject must not POST")

    # Any attempt to construct a client during rejection fails the test.
    monkeypatch.setattr(respond.httpx, "Client", _forbid_client)

    outcome = respond.reject_action(action_id, approver="bob", reason="false positive")
    assert isinstance(outcome, Ok)

    stored = respond.get_action(action_id).value
    assert stored.status is ActionStatus.REJECTED
    assert stored.approver == "bob"

    def _is_reasoned_rejection(entry):
        # ``detail`` may be None, so fall back to an empty string.
        return entry.outcome is Outcome.REJECTED and "false positive" in (entry.detail or "")

    ledger_rows = ledger_line.list_by_case(high_case.case_id, limit=10)
    assert any(_is_reasoned_rejection(row) for row in ledger_rows)
|
|
|
|
|
|
def test_double_execute_refused(fresh_db, monkeypatch):
    """An action that has been executed once cannot be executed again.

    ``respond.httpx.Client`` is replaced with a stand-in whose POST always
    succeeds, so the only reason for the second call to be refused is that
    the action was already executed.
    """
    case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(case)
    aid = respond.propose_for_case(case)[0]

    class _Resp:
        def raise_for_status(self):
            pass

        def json(self):
            return {"receipt_id": "X"}

    class _Client:
        def __init__(self, *a, **k):
            pass

        def __enter__(self):
            return self

        def __exit__(self, *a):
            return False

        def post(self, url, json):
            return _Resp()

    monkeypatch.setattr(respond.httpx, "Client", _Client)

    first = respond.execute_action(aid)
    # Without this check the test would pass vacuously if the first
    # execution failed for an unrelated reason.
    assert isinstance(first, Ok)

    again = respond.execute_action(aid)
    assert not isinstance(again, Ok)  # already executed
|