stage-auto-c pulse: respond runner with gates + auto-fire path

Replace the propose-only respond runner with a two-phase runner: phase 1
always proposes actions for fresh high-severity cases (unchanged); phase 2
fires when pipeline mode is auto-execute and the action clears all gates.

Gates:
- severity ≥ configured threshold
- if require_quorum is on, federation-sourced cases must hit
  federation.is_quorum_met (wrapped in try/except so we tolerate the
  sibling agent not having shipped that function yet — fallback is
  "no quorum metric → don't auto-fire", the safe default)
- locally-generated cases (no row in federation_signals for their case_id)
  bypass the quorum check
- when local-only is armed, federation-sourced cases never auto-fire
  even with quorum

Every decision (auto-fire, skipped, error) records a pulse_audit row so
the cockpit and CLI can show history. Per-action try/except keeps one
bad action from aborting the whole batch.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
m17hr1l
2026-06-06 21:11:39 +02:00
parent 31ec1557ec
commit e66c3d3359

View File

@@ -182,24 +182,218 @@ def _run_reindex() -> str:
return f"indexed {written} IOC(s) from {len(cases)} case(s)" return f"indexed {written} IOC(s) from {len(cases)} case(s)"
def _run_respond_propose() -> str: def _propose_for_recent_cases() -> int:
"""Propose response actions for high-severity cases that don't yet have any. """Propose response actions for high-severity cases that don't yet have any.
Strictly proposal-only: nothing is dispatched. The respond.propose_for_case Returns total proposed-action count. Idempotent per case (respond's
helper is already idempotent per case (skips cases that already have propose_for_case skips cases that already have actions).
actions), so re-running this on a tick is safe.
""" """
from psyc.lines import respond from psyc.lines import respond
cases = db.list_cases(limit=10_000) cases = db.list_cases(limit=10_000)
proposed = 0 proposed = 0
touched = 0
for c in cases: for c in cases:
ids = respond.propose_for_case(c) ids = respond.propose_for_case(c)
if ids: proposed += len(ids)
proposed += len(ids) return proposed
touched += 1
return f"proposed {proposed} action(s) for {touched} case(s)"
def _current_mode(pipeline_name: str) -> PulseMode:
p = _get_pipeline(pipeline_name)
return p.mode if p is not None else PulseMode.MANUAL
def _is_quorum_met(case_digest_hash: str) -> bool:
"""Wrapper for federation.is_quorum_met that tolerates the sibling agent
not having shipped the function yet.
If federation lacks `is_quorum_met`, we fall back to False — the safe
default ("no quorum signal → don't fire federation cases").
"""
try:
from psyc.lines import federation as _federation
fn = getattr(_federation, "is_quorum_met", None)
if fn is None:
return False
return bool(fn(case_digest_hash))
except Exception as exc: # noqa: BLE001 — defensive: any import / runtime miss → safe-false
_log.warning("pulse.respond.quorum.unavailable", error=str(exc))
return False
def _canonical_json_local(obj: Dict[str, object]) -> bytes:
"""Deterministic JSON serialization — mirrors federation.canonical_json
for the case-digest computation. Local copy so we don't hard-require
federation to be importable.
"""
import json
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
def _case_digest_hash(case_id: str) -> str:
"""SHA-256 of the canonical JSON of {case_id: ...} — what federation hashes.
Returns "" if the case can't be loaded (e.g. row vanished mid-fire).
"""
import hashlib
from psyc.result import Ok as _Ok
got = db.get_case(case_id)
if not isinstance(got, _Ok):
return ""
case = got.value
# Mirror federation._build_case_records' record shape so digests match.
record = {
"case_id": case.case_id,
"summary": case.summary,
"severity": case.classification.severity.value if case.classification.severity else None,
"incident_type": case.classification.incident_type.value if case.classification.incident_type else None,
"observed_at": case.observed_at.isoformat(),
"feed_source": case.source_metadata.get("feed", ""),
"iocs": (
[{"value": v, "type": "url"} for v in case.observables.urls]
+ [{"value": v, "type": "domain"} for v in case.observables.domains]
+ [{"value": v, "type": "ip"} for v in case.observables.ips]
+ [{"value": v, "type": "hash"} for v in case.observables.hashes]
+ [{"value": v, "type": "cve"} for v in case.observables.cves]
),
}
return hashlib.sha256(_canonical_json_local(record)).hexdigest()
def _case_is_local(case_id: str) -> bool:
"""True if no federation peer has ever pushed us this case_id."""
return len(db.signals_for_case(case_id)) == 0
def _audit(action: str, *, action_id: Optional[int] = None,
case_id: Optional[str] = None, detail: str = "") -> None:
db.pulse_audit_record(dict(
pipeline="respond",
action=action,
action_id=action_id,
case_id=case_id,
detail=detail[:500],
timestamp=_now().isoformat(),
))
def _auto_fire_eligible() -> Tuple[int, int]:
"""Iterate PROPOSED actions and execute the ones that clear every gate.
Returns (fired_count, skipped_count). Records a pulse_audit row for every
decision (fired or skipped-with-reason) so the cockpit can show history.
A single failing action never aborts the batch.
"""
from psyc.lines import respond
from psyc.models import ActionStatus
from psyc.result import Ok as _Ok
threshold = respond_auto_threshold()
threshold_rank = _severity_rank(threshold)
require_quorum = respond_require_quorum()
local_only = respond_local_only()
fired = 0
skipped = 0
actions = respond.list_actions(status=ActionStatus.PROPOSED, limit=100)
for action in actions:
# Re-hydrate severity enum (action.severity is the .value string).
try:
sev_enum = Severity(action.severity) if action.severity else None
except ValueError:
sev_enum = None
if _severity_rank(sev_enum) < threshold_rank:
skipped += 1
_audit(
"skipped",
action_id=action.id,
case_id=action.case_id,
detail=f"below threshold: severity={action.severity!r} < {threshold.value}",
)
continue
is_local = _case_is_local(action.case_id)
if require_quorum and not is_local:
digest = _case_digest_hash(action.case_id)
if not digest or not _is_quorum_met(digest):
if local_only:
# local-only is armed but this case was imported via federation
# → defer (don't fire) until federation grants quorum
skipped += 1
_audit(
"skipped",
action_id=action.id,
case_id=action.case_id,
detail="local-only armed + federation-sourced case",
)
continue
skipped += 1
_audit(
"skipped",
action_id=action.id,
case_id=action.case_id,
detail="no quorum on federation-sourced case",
)
continue
# else: quorum disabled, or case is locally-generated → fire.
try:
result = respond.execute_action(action.id, approver="pulse-auto")
except Exception as exc: # noqa: BLE001 — one bad action shouldn't kill the batch
skipped += 1
_audit(
"error",
action_id=action.id,
case_id=action.case_id,
detail=f"execute raised: {type(exc).__name__}: {exc}",
)
_log.warning("pulse.respond.auto-fire.error",
action_id=action.id, error=str(exc))
continue
if isinstance(result, _Ok):
fired += 1
_log.info("pulse.respond.auto-fire",
action_id=action.id, case_id=action.case_id,
type=action.action_type.value, target=action.target)
_audit(
"auto-fire",
action_id=action.id,
case_id=action.case_id,
detail=f"{action.action_type.value}{action.target}",
)
else:
# Err path — execute_action returned Err (e.g. SOAR sink down)
reason = getattr(result, "reason", "unknown")
skipped += 1
_audit(
"error",
action_id=action.id,
case_id=action.case_id,
detail=f"execute failed: {reason}",
)
_log.warning("pulse.respond.auto-fire.failed",
action_id=action.id, reason=str(reason))
return fired, skipped
def _run_respond() -> str:
"""Propose + (when mode is auto-execute) auto-fire eligible PROPOSED actions.
Two phases:
1. Always propose new actions for high-severity cases (existing behavior).
2. If pipeline mode is auto-execute, iterate PROPOSED actions and execute
those that clear severity/quorum/local-only gates.
"""
propose_count = _propose_for_recent_cases()
mode = _current_mode("respond")
if mode != PulseMode.AUTO_EXECUTE:
return f"proposed {propose_count} actions; mode={mode.value} → no auto-fire"
fired, skipped = _auto_fire_eligible()
return f"proposed {propose_count}; auto-fired {fired}; skipped {skipped} (gate)"
def _run_peer_pull() -> str: def _run_peer_pull() -> str:
@@ -217,7 +411,7 @@ _REGISTRY: Dict[str, Callable[[], str]] = {
"classify": _run_classify, "classify": _run_classify,
"prove": _run_prove, "prove": _run_prove,
"reindex": _run_reindex, "reindex": _run_reindex,
"respond": _run_respond_propose, "respond": _run_respond,
"peer-pull": _run_peer_pull, "peer-pull": _run_peer_pull,
"vouch-refresh": _run_vouch_refresh, "vouch-refresh": _run_vouch_refresh,
} }