diff --git a/src/psyc/lines/pulse.py b/src/psyc/lines/pulse.py index baeb1b9..6d84ca4 100644 --- a/src/psyc/lines/pulse.py +++ b/src/psyc/lines/pulse.py @@ -182,24 +182,218 @@ def _run_reindex() -> str: return f"indexed {written} IOC(s) from {len(cases)} case(s)" -def _run_respond_propose() -> str: +def _propose_for_recent_cases() -> int: """Propose response actions for high-severity cases that don't yet have any. - Strictly proposal-only: nothing is dispatched. The respond.propose_for_case - helper is already idempotent per case (skips cases that already have - actions), so re-running this on a tick is safe. + Returns total proposed-action count. Idempotent per case (respond's + propose_for_case skips cases that already have actions). """ from psyc.lines import respond cases = db.list_cases(limit=10_000) proposed = 0 - touched = 0 for c in cases: ids = respond.propose_for_case(c) - if ids: - proposed += len(ids) - touched += 1 - return f"proposed {proposed} action(s) for {touched} case(s)" + proposed += len(ids) + return proposed + + +def _current_mode(pipeline_name: str) -> PulseMode: + p = _get_pipeline(pipeline_name) + return p.mode if p is not None else PulseMode.MANUAL + + +def _is_quorum_met(case_digest_hash: str) -> bool: + """Wrapper for federation.is_quorum_met that tolerates the sibling agent + not having shipped the function yet. + + If federation lacks `is_quorum_met`, we fall back to False — the safe + default ("no quorum signal → don't fire federation cases"). + """ + try: + from psyc.lines import federation as _federation + fn = getattr(_federation, "is_quorum_met", None) + if fn is None: + return False + return bool(fn(case_digest_hash)) + except Exception as exc: # noqa: BLE001 — defensive: any import / runtime miss → safe-false + _log.warning("pulse.respond.quorum.unavailable", error=str(exc)) + return False + + +def _canonical_json_local(obj: Dict[str, object]) -> bytes: + """Deterministic JSON serialization — mirrors federation.canonical_json + for the case-digest computation. Local copy so we don't hard-require + federation to be importable. + """ + import json + return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + + +def _case_digest_hash(case_id: str) -> str: + """SHA-256 of the canonical JSON of {case_id: ...} — what federation hashes. + + Returns "" if the case can't be loaded (e.g. row vanished mid-fire). + """ + import hashlib + from psyc.result import Ok as _Ok + got = db.get_case(case_id) + if not isinstance(got, _Ok): + return "" + case = got.value + # Mirror federation._build_case_records' record shape so digests match. + record = { + "case_id": case.case_id, + "summary": case.summary, + "severity": case.classification.severity.value if case.classification.severity else None, + "incident_type": case.classification.incident_type.value if case.classification.incident_type else None, + "observed_at": case.observed_at.isoformat(), + "feed_source": case.source_metadata.get("feed", ""), + "iocs": ( + [{"value": v, "type": "url"} for v in case.observables.urls] + + [{"value": v, "type": "domain"} for v in case.observables.domains] + + [{"value": v, "type": "ip"} for v in case.observables.ips] + + [{"value": v, "type": "hash"} for v in case.observables.hashes] + + [{"value": v, "type": "cve"} for v in case.observables.cves] + ), + } + return hashlib.sha256(_canonical_json_local(record)).hexdigest() + + +def _case_is_local(case_id: str) -> bool: + """True if no federation peer has ever pushed us this case_id.""" + return len(db.signals_for_case(case_id)) == 0 + + +def _audit(action: str, *, action_id: Optional[int] = None, + case_id: Optional[str] = None, detail: str = "") -> None: + db.pulse_audit_record(dict( + pipeline="respond", + action=action, + action_id=action_id, + case_id=case_id, + detail=detail[:500], + timestamp=_now().isoformat(), + )) + + +def _auto_fire_eligible() -> Tuple[int, int]: + """Iterate PROPOSED actions and execute the ones that clear every gate. + + Returns (fired_count, skipped_count). Records a pulse_audit row for every + decision (fired or skipped-with-reason) so the cockpit can show history. + A single failing action never aborts the batch. + """ + from psyc.lines import respond + from psyc.models import ActionStatus + from psyc.result import Ok as _Ok + + threshold = respond_auto_threshold() + threshold_rank = _severity_rank(threshold) + require_quorum = respond_require_quorum() + local_only = respond_local_only() + + fired = 0 + skipped = 0 + + actions = respond.list_actions(status=ActionStatus.PROPOSED, limit=100) + for action in actions: + # Re-hydrate severity enum (action.severity is the .value string). + try: + sev_enum = Severity(action.severity) if action.severity else None + except ValueError: + sev_enum = None + + if _severity_rank(sev_enum) < threshold_rank: + skipped += 1 + _audit( + "skipped", + action_id=action.id, + case_id=action.case_id, + detail=f"below threshold: severity={action.severity!r} < {threshold.value}", + ) + continue + + is_local = _case_is_local(action.case_id) + if require_quorum and not is_local: + digest = _case_digest_hash(action.case_id) + if not digest or not _is_quorum_met(digest): + if local_only: + # local-only is armed but this case was imported via federation + # → defer (don't fire) until federation grants quorum + skipped += 1 + _audit( + "skipped", + action_id=action.id, + case_id=action.case_id, + detail="local-only armed + federation-sourced case", + ) + continue + skipped += 1 + _audit( + "skipped", + action_id=action.id, + case_id=action.case_id, + detail="no quorum on federation-sourced case", + ) + continue + # else: quorum disabled, or case is locally-generated → fire. + + try: + result = respond.execute_action(action.id, approver="pulse-auto") + except Exception as exc: # noqa: BLE001 — one bad action shouldn't kill the batch + skipped += 1 + _audit( + "error", + action_id=action.id, + case_id=action.case_id, + detail=f"execute raised: {type(exc).__name__}: {exc}", + ) + _log.warning("pulse.respond.auto-fire.error", + action_id=action.id, error=str(exc)) + continue + + if isinstance(result, _Ok): + fired += 1 + _log.info("pulse.respond.auto-fire", + action_id=action.id, case_id=action.case_id, + type=action.action_type.value, target=action.target) + _audit( + "auto-fire", + action_id=action.id, + case_id=action.case_id, + detail=f"{action.action_type.value} → {action.target}", + ) + else: + # Err path — execute_action returned Err (e.g. SOAR sink down) + reason = getattr(result, "reason", "unknown") + skipped += 1 + _audit( + "error", + action_id=action.id, + case_id=action.case_id, + detail=f"execute failed: {reason}", + ) + _log.warning("pulse.respond.auto-fire.failed", + action_id=action.id, reason=str(reason)) + + return fired, skipped + + +def _run_respond() -> str: + """Propose + (when mode is auto-execute) auto-fire eligible PROPOSED actions. + + Two phases: + 1. Always propose new actions for high-severity cases (existing behavior). + 2. If pipeline mode is auto-execute, iterate PROPOSED actions and execute + those that clear severity/quorum/local-only gates. + """ + propose_count = _propose_for_recent_cases() + mode = _current_mode("respond") + if mode != PulseMode.AUTO_EXECUTE: + return f"proposed {propose_count} actions; mode={mode.value} → no auto-fire" + fired, skipped = _auto_fire_eligible() + return f"proposed {propose_count}; auto-fired {fired}; skipped {skipped} (gate)" def _run_peer_pull() -> str: @@ -217,7 +411,7 @@ _REGISTRY: Dict[str, Callable[[], str]] = { "classify": _run_classify, "prove": _run_prove, "reindex": _run_reindex, - "respond": _run_respond_propose, + "respond": _run_respond, "peer-pull": _run_peer_pull, "vouch-refresh": _run_vouch_refresh, }