Pipelines
diff --git a/src/psyc/db.py b/src/psyc/db.py
index c355f9a..a6e66b0 100644
--- a/src/psyc/db.py
+++ b/src/psyc/db.py
@@ -134,6 +134,19 @@ pulse_settings = Table(
Column("value", String, nullable=False),
)
+pulse_audit = Table(
+ "pulse_audit", _metadata,
+ Column("id", Integer, primary_key=True, autoincrement=True),
+ Column("pipeline", String, nullable=False), # 'respond' | 'fetch' | ...
+ Column("action", String, nullable=False), # 'auto-fire' | 'skipped' | 'error'
+ Column("action_id", Integer, nullable=True), # response_actions.id when relevant
+ Column("case_id", String, nullable=True),
+ Column("detail", Text, nullable=True),
+ Column("timestamp", String, nullable=False), # ISO
+)
+Index("pulse_audit_pipeline_idx", pulse_audit.c.pipeline, pulse_audit.c.timestamp.desc())
+Index("pulse_audit_action_id_idx", pulse_audit.c.action_id)
+
peers = Table(
"peers", _metadata,
Column("domain", String, primary_key=True),
@@ -361,6 +374,47 @@ def pulse_setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None:
conn.execute(stmt)
+# ---------- pulse audit trail --------------------------------------------
+
+def pulse_audit_record(row: dict, db_path: Path = DB_PATH) -> int:
+ """Append one pulse_audit row. Returns its id.
+
+ `row` must include 'pipeline', 'action', 'timestamp'. action_id, case_id,
+ detail are optional. Caller controls timestamp so tests can pin it.
+ """
+ stmt = insert(pulse_audit).values(**row)
+ with engine(db_path).begin() as conn:
+ res = conn.execute(stmt)
+ return int(res.inserted_primary_key[0])
+
+
+def pulse_audit_recent(pipeline: str, limit: int = 25, db_path: Path = DB_PATH) -> List[dict]:
+ """Most-recent audit rows for one pipeline (newest first)."""
+ stmt = (
+ select(pulse_audit)
+ .where(pulse_audit.c.pipeline == pipeline)
+ .order_by(pulse_audit.c.timestamp.desc())
+ .limit(limit)
+ )
+ with engine(db_path).connect() as conn:
+ return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
+
+
+def pulse_audit_count_since(
+ pipeline: str, action: str, since_iso: str, db_path: Path = DB_PATH
+) -> int:
+ """Count audit rows for (pipeline, action) at or after `since_iso`."""
+ stmt = (
+ select(func.count())
+ .select_from(pulse_audit)
+ .where(pulse_audit.c.pipeline == pipeline)
+ .where(pulse_audit.c.action == action)
+ .where(pulse_audit.c.timestamp >= since_iso)
+ )
+ with engine(db_path).connect() as conn:
+ return int(conn.execute(stmt).scalar_one())
+
+
# ---------- federation: peers + signal buffer ----------------------------
def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None:
@@ -527,3 +581,18 @@ def translog_range(start: int = 0, end: Optional[int] = None, db_path: Path = DB
stmt = select(translog).where(cond).order_by(translog.c.id.asc())
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
+
+
+def signals_for_case(case_id: str, db_path: Path = DB_PATH) -> List[dict]:
+ """All federation signals attached to this case_id (signal_type='case').
+
+ Empty list means no peer has ever sent us this case → we generated it
+ ourselves and it counts as locally-sourced for auto-fire purposes.
+ """
+ stmt = (
+ select(federation_signals)
+ .where(federation_signals.c.signal_type == "case")
+ .where(federation_signals.c.signal_id == case_id)
+ )
+ with engine(db_path).connect() as conn:
+ return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
diff --git a/src/psyc/lines/pulse.py b/src/psyc/lines/pulse.py
index 20944a8..e06b638 100644
--- a/src/psyc/lines/pulse.py
+++ b/src/psyc/lines/pulse.py
@@ -22,6 +22,7 @@ from typing import Callable, Dict, List, Optional, Tuple
from pydantic import BaseModel, Field
from psyc import db, log
+from psyc.models import Severity
_log = log.get(__name__)
@@ -33,6 +34,73 @@ class PulseMode(str, Enum):
AUTO_EXECUTE = "auto-execute"
+# ---------- respond auto-fire gates -----------------------------------------
+# Persisted as rows in pulse_settings (key/value pairs). All defaults are
+# "safe" — quorum required, HIGH threshold, federation cases permitted only
+# when quorum-met.
+
+_KEY_RESPOND_THRESHOLD = "respond_auto_threshold"
+_KEY_RESPOND_REQUIRE_QUORUM = "respond_require_quorum"
+_KEY_RESPOND_LOCAL_ONLY = "respond_local_only"
+
+_DEFAULT_THRESHOLD = Severity.HIGH
+_DEFAULT_REQUIRE_QUORUM = True
+_DEFAULT_LOCAL_ONLY = False
+
+
+def _severity_rank(sev: Optional[Severity]) -> int:
+ """Rank order for severity threshold comparison. Unknown / None → -1."""
+ if sev is None:
+ return -1
+ return {
+ Severity.LOW: 0,
+ Severity.MEDIUM: 1,
+ Severity.HIGH: 2,
+ Severity.CRITICAL: 3,
+ }.get(sev, -1)
+
+
+def respond_auto_threshold() -> Severity:
+ raw = db.pulse_setting_get(_KEY_RESPOND_THRESHOLD)
+ if raw is None:
+ return _DEFAULT_THRESHOLD
+ try:
+ return Severity(raw)
+ except ValueError:
+ return _DEFAULT_THRESHOLD
+
+
+def set_respond_auto_threshold(sev: Severity) -> None:
+ if not isinstance(sev, Severity):
+ raise ValueError(f"not a Severity: {sev!r}")
+ db.pulse_setting_set(_KEY_RESPOND_THRESHOLD, sev.value)
+ _log.info("pulse.respond.threshold.changed", severity=sev.value)
+
+
+def respond_require_quorum() -> bool:
+ raw = db.pulse_setting_get(_KEY_RESPOND_REQUIRE_QUORUM)
+ if raw is None:
+ return _DEFAULT_REQUIRE_QUORUM
+ return raw == "1"
+
+
+def set_respond_require_quorum(state: bool) -> None:
+ db.pulse_setting_set(_KEY_RESPOND_REQUIRE_QUORUM, "1" if state else "0")
+ _log.info("pulse.respond.quorum.changed", required=bool(state))
+
+
+def respond_local_only() -> bool:
+ raw = db.pulse_setting_get(_KEY_RESPOND_LOCAL_ONLY)
+ if raw is None:
+ return _DEFAULT_LOCAL_ONLY
+ return raw == "1"
+
+
+def set_respond_local_only(state: bool) -> None:
+ db.pulse_setting_set(_KEY_RESPOND_LOCAL_ONLY, "1" if state else "0")
+ _log.info("pulse.respond.local-only.changed", local_only=bool(state))
+
+
class Pipeline(BaseModel):
name: str
title: str
@@ -114,24 +182,218 @@ def _run_reindex() -> str:
return f"indexed {written} IOC(s) from {len(cases)} case(s)"
-def _run_respond_propose() -> str:
+def _propose_for_recent_cases() -> int:
"""Propose response actions for high-severity cases that don't yet have any.
- Strictly proposal-only: nothing is dispatched. The respond.propose_for_case
- helper is already idempotent per case (skips cases that already have
- actions), so re-running this on a tick is safe.
+ Returns total proposed-action count. Idempotent per case (respond's
+ propose_for_case skips cases that already have actions).
"""
from psyc.lines import respond
cases = db.list_cases(limit=10_000)
proposed = 0
- touched = 0
for c in cases:
ids = respond.propose_for_case(c)
- if ids:
- proposed += len(ids)
- touched += 1
- return f"proposed {proposed} action(s) for {touched} case(s)"
+ proposed += len(ids)
+ return proposed
+
+
+def _current_mode(pipeline_name: str) -> PulseMode:
+ p = _get_pipeline(pipeline_name)
+ return p.mode if p is not None else PulseMode.MANUAL
+
+
+def _is_quorum_met(case_digest_hash: str) -> bool:
+ """Wrapper for federation.is_quorum_met that tolerates the sibling agent
+ not having shipped the function yet.
+
+ If federation lacks `is_quorum_met`, we fall back to False — the safe
+ default ("no quorum signal → don't fire federation cases").
+ """
+ try:
+ from psyc.lines import federation as _federation
+ fn = getattr(_federation, "is_quorum_met", None)
+ if fn is None:
+ return False
+ return bool(fn(case_digest_hash))
+ except Exception as exc: # noqa: BLE001 — defensive: any import / runtime miss → safe-false
+ _log.warning("pulse.respond.quorum.unavailable", error=str(exc))
+ return False
+
+
+def _canonical_json_local(obj: Dict[str, object]) -> bytes:
+ """Deterministic JSON serialization — mirrors federation.canonical_json
+ for the case-digest computation. Local copy so we don't hard-require
+ federation to be importable.
+ """
+ import json
+ return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
+
+
+def _case_digest_hash(case_id: str) -> str:
+ """SHA-256 of the canonical JSON of {case_id: ...} — what federation hashes.
+
+ Returns "" if the case can't be loaded (e.g. row vanished mid-fire).
+ """
+ import hashlib
+ from psyc.result import Ok as _Ok
+ got = db.get_case(case_id)
+ if not isinstance(got, _Ok):
+ return ""
+ case = got.value
+ # Mirror federation._build_case_records' record shape so digests match.
+ record = {
+ "case_id": case.case_id,
+ "summary": case.summary,
+ "severity": case.classification.severity.value if case.classification.severity else None,
+ "incident_type": case.classification.incident_type.value if case.classification.incident_type else None,
+ "observed_at": case.observed_at.isoformat(),
+ "feed_source": case.source_metadata.get("feed", ""),
+ "iocs": (
+ [{"value": v, "type": "url"} for v in case.observables.urls]
+ + [{"value": v, "type": "domain"} for v in case.observables.domains]
+ + [{"value": v, "type": "ip"} for v in case.observables.ips]
+ + [{"value": v, "type": "hash"} for v in case.observables.hashes]
+ + [{"value": v, "type": "cve"} for v in case.observables.cves]
+ ),
+ }
+ return hashlib.sha256(_canonical_json_local(record)).hexdigest()
+
+
+def _case_is_local(case_id: str) -> bool:
+ """True if no federation peer has ever pushed us this case_id."""
+ return len(db.signals_for_case(case_id)) == 0
+
+
+def _audit(action: str, *, action_id: Optional[int] = None,
+ case_id: Optional[str] = None, detail: str = "") -> None:
+ db.pulse_audit_record(dict(
+ pipeline="respond",
+ action=action,
+ action_id=action_id,
+ case_id=case_id,
+ detail=detail[:500],
+ timestamp=_now().isoformat(),
+ ))
+
+
+def _auto_fire_eligible() -> Tuple[int, int]:
+ """Iterate PROPOSED actions and execute the ones that clear every gate.
+
+ Returns (fired_count, skipped_count). Records a pulse_audit row for every
+ decision (fired or skipped-with-reason) so the cockpit can show history.
+ A single failing action never aborts the batch.
+ """
+ from psyc.lines import respond
+ from psyc.models import ActionStatus
+ from psyc.result import Ok as _Ok
+
+ threshold = respond_auto_threshold()
+ threshold_rank = _severity_rank(threshold)
+ require_quorum = respond_require_quorum()
+ local_only = respond_local_only()
+
+ fired = 0
+ skipped = 0
+
+ actions = respond.list_actions(status=ActionStatus.PROPOSED, limit=100)
+ for action in actions:
+ # Re-hydrate severity enum (action.severity is the .value string).
+ try:
+ sev_enum = Severity(action.severity) if action.severity else None
+ except ValueError:
+ sev_enum = None
+
+ if _severity_rank(sev_enum) < threshold_rank:
+ skipped += 1
+ _audit(
+ "skipped",
+ action_id=action.id,
+ case_id=action.case_id,
+ detail=f"below threshold: severity={action.severity!r} < {threshold.value}",
+ )
+ continue
+
+ is_local = _case_is_local(action.case_id)
+ if require_quorum and not is_local:
+ digest = _case_digest_hash(action.case_id)
+ if not digest or not _is_quorum_met(digest):
+ if local_only:
+ # local-only is armed but this case was imported via federation
+ # → defer (don't fire) until federation grants quorum
+ skipped += 1
+ _audit(
+ "skipped",
+ action_id=action.id,
+ case_id=action.case_id,
+ detail="local-only armed + federation-sourced case",
+ )
+ continue
+ skipped += 1
+ _audit(
+ "skipped",
+ action_id=action.id,
+ case_id=action.case_id,
+ detail="no quorum on federation-sourced case",
+ )
+ continue
+ # else: quorum disabled, or case is locally-generated → fire.
+
+ try:
+ result = respond.execute_action(action.id, approver="pulse-auto")
+ except Exception as exc: # noqa: BLE001 — one bad action shouldn't kill the batch
+ skipped += 1
+ _audit(
+ "error",
+ action_id=action.id,
+ case_id=action.case_id,
+ detail=f"execute raised: {type(exc).__name__}: {exc}",
+ )
+ _log.warning("pulse.respond.auto-fire.error",
+ action_id=action.id, error=str(exc))
+ continue
+
+ if isinstance(result, _Ok):
+ fired += 1
+ _log.info("pulse.respond.auto-fire",
+ action_id=action.id, case_id=action.case_id,
+ type=action.action_type.value, target=action.target)
+ _audit(
+ "auto-fire",
+ action_id=action.id,
+ case_id=action.case_id,
+ detail=f"{action.action_type.value} → {action.target}",
+ )
+ else:
+ # Err path — execute_action returned Err (e.g. SOAR sink down)
+ reason = getattr(result, "reason", "unknown")
+ skipped += 1
+ _audit(
+ "error",
+ action_id=action.id,
+ case_id=action.case_id,
+ detail=f"execute failed: {reason}",
+ )
+ _log.warning("pulse.respond.auto-fire.failed",
+ action_id=action.id, reason=str(reason))
+
+ return fired, skipped
+
+
+def _run_respond() -> str:
+ """Propose + (when mode is auto-execute) auto-fire eligible PROPOSED actions.
+
+ Two phases:
+ 1. Always propose new actions for high-severity cases (existing behavior).
+ 2. If pipeline mode is auto-execute, iterate PROPOSED actions and execute
+ those that clear severity/quorum/local-only gates.
+ """
+ propose_count = _propose_for_recent_cases()
+ mode = _current_mode("respond")
+ if mode != PulseMode.AUTO_EXECUTE:
+ return f"proposed {propose_count} actions; mode={mode.value} → no auto-fire"
+ fired, skipped = _auto_fire_eligible()
+ return f"proposed {propose_count}; auto-fired {fired}; skipped {skipped} (gate)"
_DISCOVERY_SEEDS_KEY = "discovery_seeds"
@@ -189,7 +451,7 @@ _REGISTRY: Dict[str, Callable[[], str]] = {
"classify": _run_classify,
"prove": _run_prove,
"reindex": _run_reindex,
- "respond": _run_respond_propose,
+ "respond": _run_respond,
"peer-pull": _run_peer_pull,
"vouch-refresh": _run_vouch_refresh,
}
diff --git a/tests/test_pulse_respond.py b/tests/test_pulse_respond.py
new file mode 100644
index 0000000..c940b3c
--- /dev/null
+++ b/tests/test_pulse_respond.py
@@ -0,0 +1,314 @@
+"""Pulseline auto-response gating — severity threshold, quorum, local-only.
+
+The runner here is the live `_run_respond` from pulse.py. We point it at a
+temp DB, monkeypatch federation.is_quorum_met to a controllable function, and
+swap respond.execute_action for a counter so we don't reach the SOAR sink.
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import List, Tuple
+
+import pytest
+from sqlalchemy import create_engine
+
+from psyc import db
+from psyc.lines import pulse, respond
+from psyc.lines import federation
+from psyc.models import (
+ ActionStatus,
+ ActionType,
+ Case,
+ Classification,
+ Observables,
+ ResponseAction,
+ Severity,
+ TLP,
+)
+from psyc.result import Ok
+
+from conftest import make_case
+
+
+# ----- fixtures --------------------------------------------------------------
+
+@pytest.fixture
+def fresh_db(tmp_path, monkeypatch):
+ """Temp SQLite + the real runner registry. Mode pinned to auto-execute."""
+ test_db = tmp_path / "respond.db"
+ eng = create_engine(f"sqlite:///{test_db}", future=True)
+ db._metadata.create_all(eng, checkfirst=True)
+ monkeypatch.setattr(db, "_engine", eng)
+ monkeypatch.setattr(db, "DB_PATH", test_db)
+ yield test_db
+
+
+@pytest.fixture
+def fired(monkeypatch):
+ """Capture every execute_action(action_id, approver=...) — no SOAR sink call."""
+ log: List[Tuple[int, str]] = []
+
+ def fake_execute(action_id: int, approver: str = "operator"):
+ log.append((action_id, approver))
+ # Re-read the action so we can return a realistic Ok value
+ got = respond.get_action(action_id)
+ return got if isinstance(got, Ok) else got
+
+ monkeypatch.setattr(respond, "execute_action", fake_execute)
+ return log
+
+
+@pytest.fixture
+def quorum_yes(monkeypatch):
+ monkeypatch.setattr(federation, "is_quorum_met",
+ lambda h, k=None: True, raising=False)
+
+
+@pytest.fixture
+def quorum_no(monkeypatch):
+ monkeypatch.setattr(federation, "is_quorum_met",
+ lambda h, k=None: False, raising=False)
+
+
+def _set_respond_mode(mode: pulse.PulseMode) -> None:
+ pulse.set_mode("respond", mode)
+
+
+def _propose_one(case: Case) -> int:
+ db.upsert_case(case)
+ ids = respond.propose_for_case(case)
+ assert ids, "test setup expected at least one action proposed"
+ return ids[0]
+
+
+# ----- severity rank ---------------------------------------------------------
+
+def test_severity_rank_ordering():
+ assert pulse._severity_rank(Severity.LOW) == 0
+ assert pulse._severity_rank(Severity.MEDIUM) == 1
+ assert pulse._severity_rank(Severity.HIGH) == 2
+ assert pulse._severity_rank(Severity.CRITICAL) == 3
+ assert pulse._severity_rank(None) == -1
+
+
+# ----- runner mode gating ----------------------------------------------------
+
+def test_runner_no_auto_fire_when_mode_is_propose(fresh_db, fired, quorum_yes):
+ case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
+ db.upsert_case(case)
+ # default seed mode for respond is auto-propose → no auto-fire even with PROPOSED actions
+ result = pulse._run_respond()
+ assert "no auto-fire" in result
+ assert fired == []
+
+
+def test_runner_no_auto_fire_when_manual(fresh_db, fired, quorum_yes):
+ case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
+ db.upsert_case(case)
+ _set_respond_mode(pulse.PulseMode.MANUAL)
+ result = pulse._run_respond()
+ assert "no auto-fire" in result
+ assert fired == []
+
+
+# ----- severity threshold ----------------------------------------------------
+
+def test_below_threshold_is_skipped(fresh_db, fired, quorum_yes):
+ # Propose an action carrying severity=MEDIUM by hand — propose_for_case
+ # only generates HIGH/CRITICAL actions, but the gate must still work for
+ # any below-threshold severity we drop in.
+ case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
+ db.upsert_case(case)
+ respond.propose_for_case(case)
+ # Demote every action's severity to MEDIUM so all should be skipped under HIGH threshold.
+ from sqlalchemy import update as sa_update
+ with db.engine().begin() as conn:
+ conn.execute(sa_update(db.response_actions).values(severity=Severity.MEDIUM.value))
+ pulse.set_respond_auto_threshold(Severity.HIGH)
+ _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
+
+ pulse._run_respond()
+ assert fired == [], "below-threshold action must not fire"
+ audit = db.pulse_audit_recent("respond", limit=5)
+ assert any(r["action"] == "skipped" and "below threshold" in (r["detail"] or "") for r in audit)
+
+
+# ----- quorum gate -----------------------------------------------------------
+
+def test_federation_case_no_quorum_skipped(fresh_db, fired, quorum_no):
+ case = make_case(feed="urlhaus", ips=["9.9.9.9"], severity=Severity.HIGH)
+ db.upsert_case(case)
+ # Mark this case as federation-sourced by inserting a signal row for it.
+ db.record_signal(dict(
+ peer_fingerprint="peer-a",
+ signal_type="case",
+ signal_id=case.case_id,
+ signal_hash="dummyhash",
+ received_at=datetime.now(timezone.utc).isoformat(),
+ raw_json="{}",
+ ))
+ respond.propose_for_case(case)
+
+ pulse.set_respond_require_quorum(True)
+ pulse.set_respond_local_only(False)
+ pulse.set_respond_auto_threshold(Severity.HIGH)
+ _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
+
+ pulse._run_respond()
+ assert fired == []
+ audit = db.pulse_audit_recent("respond", limit=5)
+ assert any(r["action"] == "skipped" and "no quorum" in (r["detail"] or "") for r in audit)
+
+
+def test_local_case_fires_when_quorum_required(fresh_db, fired, quorum_no):
+ """Locally-generated cases bypass quorum — they're our own work."""
+ case = make_case(feed="urlhaus", ips=["9.9.9.9"], severity=Severity.HIGH)
+ db.upsert_case(case)
+ # No federation_signals row → locally-generated
+ respond.propose_for_case(case)
+
+ pulse.set_respond_require_quorum(True)
+ pulse.set_respond_local_only(True) # both armed; local cases still fire
+ pulse.set_respond_auto_threshold(Severity.HIGH)
+ _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
+
+ pulse._run_respond()
+ assert len(fired) >= 1
+ audit = db.pulse_audit_recent("respond", limit=10)
+ assert any(r["action"] == "auto-fire" for r in audit)
+
+
+def test_local_case_fires_local_only_off(fresh_db, fired, quorum_no):
+ """Even with local_only OFF, a locally-generated case still fires (no quorum needed)."""
+ case = make_case(feed="urlhaus", ips=["1.1.1.1"], severity=Severity.CRITICAL)
+ db.upsert_case(case)
+ respond.propose_for_case(case)
+
+ pulse.set_respond_require_quorum(True)
+ pulse.set_respond_local_only(False)
+ pulse.set_respond_auto_threshold(Severity.HIGH)
+ _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
+
+ pulse._run_respond()
+ assert len(fired) >= 1
+
+
+def test_federation_case_with_quorum_fires(fresh_db, fired, quorum_yes):
+ case = make_case(feed="urlhaus", ips=["2.2.2.2"], severity=Severity.HIGH)
+ db.upsert_case(case)
+ db.record_signal(dict(
+ peer_fingerprint="peer-b",
+ signal_type="case",
+ signal_id=case.case_id,
+ signal_hash="dummyhash2",
+ received_at=datetime.now(timezone.utc).isoformat(),
+ raw_json="{}",
+ ))
+ respond.propose_for_case(case)
+
+ pulse.set_respond_require_quorum(True)
+ pulse.set_respond_local_only(False)
+ pulse.set_respond_auto_threshold(Severity.HIGH)
+ _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
+
+ pulse._run_respond()
+ assert len(fired) >= 1
+
+
+def test_quorum_off_fires_federation_case(fresh_db, fired, quorum_no):
+ """With quorum gating disabled entirely, federation cases fire too."""
+ case = make_case(feed="urlhaus", ips=["3.3.3.3"], severity=Severity.HIGH)
+ db.upsert_case(case)
+ db.record_signal(dict(
+ peer_fingerprint="peer-c",
+ signal_type="case",
+ signal_id=case.case_id,
+ signal_hash="dummyhash3",
+ received_at=datetime.now(timezone.utc).isoformat(),
+ raw_json="{}",
+ ))
+ respond.propose_for_case(case)
+
+ pulse.set_respond_require_quorum(False)
+ pulse.set_respond_auto_threshold(Severity.HIGH)
+ _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
+
+ pulse._run_respond()
+ assert len(fired) >= 1
+
+
+# ----- kill switch -----------------------------------------------------------
+
+def test_kill_switch_blocks_tick(fresh_db, fired, quorum_yes):
+ """The parent tick() skips everything when kill switch is armed."""
+ case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
+ db.upsert_case(case)
+ respond.propose_for_case(case)
+ _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
+ pulse.set_kill_switch(True)
+ results = pulse.tick()
+ assert all(o == "skipped" for _, o, _ in results)
+ assert fired == []
+
+
+# ----- audit -----------------------------------------------------------------
+
+def test_pulse_audit_records_fire_and_skip(fresh_db, fired, quorum_no):
+ # Local case → should fire and audit auto-fire
+ local = make_case(feed="urlhaus", ips=["10.0.0.1"], severity=Severity.HIGH, age_days=1)
+ db.upsert_case(local)
+ respond.propose_for_case(local)
+
+ # Federation-sourced case w/o quorum → should skip and audit skip
+ fedcase = make_case(feed="urlhaus", ips=["10.0.0.2"], severity=Severity.HIGH, age_days=2)
+ db.upsert_case(fedcase)
+ db.record_signal(dict(
+ peer_fingerprint="peer-x",
+ signal_type="case",
+ signal_id=fedcase.case_id,
+ signal_hash="xhash",
+ received_at=datetime.now(timezone.utc).isoformat(),
+ raw_json="{}",
+ ))
+ respond.propose_for_case(fedcase)
+
+ pulse.set_respond_require_quorum(True)
+ pulse.set_respond_local_only(False)
+ pulse.set_respond_auto_threshold(Severity.HIGH)
+ _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
+ pulse._run_respond()
+
+ audit = db.pulse_audit_recent("respond", limit=20)
+ actions = {r["action"] for r in audit}
+ assert "auto-fire" in actions
+ assert "skipped" in actions
+
+
+def test_audit_count_since(fresh_db, fired, quorum_no):
+ case = make_case(feed="urlhaus", ips=["8.8.8.8"], severity=Severity.HIGH)
+ db.upsert_case(case)
+ respond.propose_for_case(case)
+ pulse.set_respond_require_quorum(True)
+ pulse.set_respond_auto_threshold(Severity.HIGH)
+ _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
+ pulse._run_respond()
+ from datetime import timedelta
+ since = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
+ assert db.pulse_audit_count_since("respond", "auto-fire", since) >= 1
+
+
+# ----- config round-trip -----------------------------------------------------
+
+def test_config_round_trips(fresh_db):
+ assert pulse.respond_auto_threshold() == Severity.HIGH
+ assert pulse.respond_require_quorum() is True
+ assert pulse.respond_local_only() is False
+
+ pulse.set_respond_auto_threshold(Severity.CRITICAL)
+ pulse.set_respond_require_quorum(False)
+ pulse.set_respond_local_only(True)
+
+ assert pulse.respond_auto_threshold() == Severity.CRITICAL
+ assert pulse.respond_require_quorum() is False
+ assert pulse.respond_local_only() is True