From 0dbeb056c5caadbb6aafd9295840644cef07bd1a Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:09:46 +0200 Subject: [PATCH 1/5] stage-auto-a pulse: respond config (threshold/quorum/local-only) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Persisted-key/value helpers for the respond-pipeline auto-fire gates: - respond_auto_threshold / set_… (Severity, default HIGH) - respond_require_quorum / set_… (bool, default True) - respond_local_only / set_… (bool, default False) Plus a _severity_rank helper for threshold comparison. Backing store is the existing pulse_settings table; this commit also adds generic pulse_setting_get / pulse_setting_set helpers in db.py so future pulse settings don't each need their own column-pair helper. Co-Authored-By: Claude Opus 4.7 --- src/psyc/db.py | 19 ++++++++++++ src/psyc/lines/pulse.py | 68 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/src/psyc/db.py b/src/psyc/db.py index 5d19acb..8d58fac 100644 --- a/src/psyc/db.py +++ b/src/psyc/db.py @@ -313,6 +313,25 @@ def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None: conn.execute(stmt) +def pulse_setting_get(key: str, db_path: Path = DB_PATH) -> Optional[str]: + """Read one pulse_settings row by key. None if unset.""" + stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == key) + with engine(db_path).connect() as conn: + row = conn.execute(stmt).fetchone() + return None if row is None else str(row.value) + + +def pulse_setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None: + """Upsert one pulse_settings row by key. Both args are strings.""" + stmt = sqlite_insert(pulse_settings).values(key=key, value=value) + stmt = stmt.on_conflict_do_update( + index_elements=[pulse_settings.c.key], + set_=dict(value=stmt.excluded.value), + ) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + # ---------- federation: peers + signal buffer ---------------------------- def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None: diff --git a/src/psyc/lines/pulse.py b/src/psyc/lines/pulse.py index 10473ba..baeb1b9 100644 --- a/src/psyc/lines/pulse.py +++ b/src/psyc/lines/pulse.py @@ -22,6 +22,7 @@ from typing import Callable, Dict, List, Optional, Tuple from pydantic import BaseModel, Field from psyc import db, log +from psyc.models import Severity _log = log.get(__name__) @@ -33,6 +34,73 @@ class PulseMode(str, Enum): AUTO_EXECUTE = "auto-execute" +# ---------- respond auto-fire gates ----------------------------------------- +# Persisted as rows in pulse_settings (key/value pairs). All defaults are +# "safe" — quorum required, HIGH threshold, federation cases permitted only +# when quorum-met. + +_KEY_RESPOND_THRESHOLD = "respond_auto_threshold" +_KEY_RESPOND_REQUIRE_QUORUM = "respond_require_quorum" +_KEY_RESPOND_LOCAL_ONLY = "respond_local_only" + +_DEFAULT_THRESHOLD = Severity.HIGH +_DEFAULT_REQUIRE_QUORUM = True +_DEFAULT_LOCAL_ONLY = False + + +def _severity_rank(sev: Optional[Severity]) -> int: + """Rank order for severity threshold comparison. Unknown / None → -1.""" + if sev is None: + return -1 + return { + Severity.LOW: 0, + Severity.MEDIUM: 1, + Severity.HIGH: 2, + Severity.CRITICAL: 3, + }.get(sev, -1) + + +def respond_auto_threshold() -> Severity: + raw = db.pulse_setting_get(_KEY_RESPOND_THRESHOLD) + if raw is None: + return _DEFAULT_THRESHOLD + try: + return Severity(raw) + except ValueError: + return _DEFAULT_THRESHOLD + + +def set_respond_auto_threshold(sev: Severity) -> None: + if not isinstance(sev, Severity): + raise ValueError(f"not a Severity: {sev!r}") + db.pulse_setting_set(_KEY_RESPOND_THRESHOLD, sev.value) + _log.info("pulse.respond.threshold.changed", severity=sev.value) + + +def respond_require_quorum() -> bool: + raw = db.pulse_setting_get(_KEY_RESPOND_REQUIRE_QUORUM) + if raw is None: + return _DEFAULT_REQUIRE_QUORUM + return raw == "1" + + +def set_respond_require_quorum(state: bool) -> None: + db.pulse_setting_set(_KEY_RESPOND_REQUIRE_QUORUM, "1" if state else "0") + _log.info("pulse.respond.quorum.changed", required=bool(state)) + + +def respond_local_only() -> bool: + raw = db.pulse_setting_get(_KEY_RESPOND_LOCAL_ONLY) + if raw is None: + return _DEFAULT_LOCAL_ONLY + return raw == "1" + + +def set_respond_local_only(state: bool) -> None: + db.pulse_setting_set(_KEY_RESPOND_LOCAL_ONLY, "1" if state else "0") + _log.info("pulse.respond.local-only.changed", local_only=bool(state)) + + class Pipeline(BaseModel): name: str title: str From 31ec1557ec0b1b9326399036e5ced54088179a33 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:10:38 +0200 Subject: [PATCH 2/5] stage-auto-b pulse: pulse_audit table + history MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a per-decision audit log so the cockpit + CLI can show what the auto-response runner did each tick: - pulse_audit table: id, pipeline, action ('auto-fire'|'skipped'|'error'), action_id, case_id, detail, timestamp - helpers: pulse_audit_record, pulse_audit_recent, pulse_audit_count_since - indexes on (pipeline, timestamp desc) and on action_id Also add db.signals_for_case(case_id) — checks the federation_signals buffer to tell whether a case was peer-sourced. Used by the runner to decide if a quorum check is required. Co-Authored-By: Claude Opus 4.7 --- src/psyc/db.py | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/src/psyc/db.py b/src/psyc/db.py index 8d58fac..4526182 100644 --- a/src/psyc/db.py +++ b/src/psyc/db.py @@ -134,6 +134,19 @@ pulse_settings = Table( Column("value", String, nullable=False), ) +pulse_audit = Table( + "pulse_audit", _metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("pipeline", String, nullable=False), # 'respond' | 'fetch' | ... + Column("action", String, nullable=False), # 'auto-fire' | 'skipped' | 'error' + Column("action_id", Integer, nullable=True), # response_actions.id when relevant + Column("case_id", String, nullable=True), + Column("detail", Text, nullable=True), + Column("timestamp", String, nullable=False), # ISO +) +Index("pulse_audit_pipeline_idx", pulse_audit.c.pipeline, pulse_audit.c.timestamp.desc()) +Index("pulse_audit_action_id_idx", pulse_audit.c.action_id) + peers = Table( "peers", _metadata, Column("domain", String, primary_key=True), @@ -332,6 +345,47 @@ def pulse_setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None: conn.execute(stmt) +# ---------- pulse audit trail -------------------------------------------- + +def pulse_audit_record(row: dict, db_path: Path = DB_PATH) -> int: + """Append one pulse_audit row. Returns its id. + + `row` must include 'pipeline', 'action', 'timestamp'. action_id, case_id, + detail are optional. Caller controls timestamp so tests can pin it. + """ + stmt = insert(pulse_audit).values(**row) + with engine(db_path).begin() as conn: + res = conn.execute(stmt) + return int(res.inserted_primary_key[0]) + + +def pulse_audit_recent(pipeline: str, limit: int = 25, db_path: Path = DB_PATH) -> List[dict]: + """Most-recent audit rows for one pipeline (newest first).""" + stmt = ( + select(pulse_audit) + .where(pulse_audit.c.pipeline == pipeline) + .order_by(pulse_audit.c.timestamp.desc()) + .limit(limit) + ) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def pulse_audit_count_since( + pipeline: str, action: str, since_iso: str, db_path: Path = DB_PATH +) -> int: + """Count audit rows for (pipeline, action) at or after `since_iso`.""" + stmt = ( + select(func.count()) + .select_from(pulse_audit) + .where(pulse_audit.c.pipeline == pipeline) + .where(pulse_audit.c.action == action) + .where(pulse_audit.c.timestamp >= since_iso) + ) + with engine(db_path).connect() as conn: + return int(conn.execute(stmt).scalar_one()) + + # ---------- federation: peers + signal buffer ---------------------------- def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None: @@ -388,3 +442,18 @@ def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]: stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit) with engine(db_path).connect() as conn: return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def signals_for_case(case_id: str, db_path: Path = DB_PATH) -> List[dict]: + """All federation signals attached to this case_id (signal_type='case'). + + Empty list means no peer has ever sent us this case → we generated it + ourselves and it counts as locally-sourced for auto-fire purposes. + """ + stmt = ( + select(federation_signals) + .where(federation_signals.c.signal_type == "case") + .where(federation_signals.c.signal_id == case_id) + ) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] From e66c3d3359ade8aa2ce89b7954613adcf00853c0 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:11:39 +0200 Subject: [PATCH 3/5] stage-auto-c pulse: respond runner with gates + auto-fire path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the propose-only respond runner with a two-phase runner: phase 1 always proposes actions for fresh high-severity cases (unchanged); phase 2 fires when pipeline mode is auto-execute and the action clears all gates. Gates: - severity ≥ configured threshold - if require_quorum is on, federation-sourced cases must hit federation.is_quorum_met (wrapped in try/except so we tolerate the sibling agent not having shipped that function yet — fallback is "no quorum metric → don't auto-fire", the safe default) - locally-generated cases (no row in federation_signals for their case_id) bypass the quorum check - when local-only is armed, federation-sourced cases never auto-fire even with quorum Every decision (auto-fire, skipped, error) records a pulse_audit row so the cockpit and CLI can show history. Per-action try/except keeps one bad action from aborting the whole batch. Co-Authored-By: Claude Opus 4.7 --- src/psyc/lines/pulse.py | 214 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 204 insertions(+), 10 deletions(-) diff --git a/src/psyc/lines/pulse.py b/src/psyc/lines/pulse.py index baeb1b9..6d84ca4 100644 --- a/src/psyc/lines/pulse.py +++ b/src/psyc/lines/pulse.py @@ -182,24 +182,218 @@ def _run_reindex() -> str: return f"indexed {written} IOC(s) from {len(cases)} case(s)" -def _run_respond_propose() -> str: +def _propose_for_recent_cases() -> int: """Propose response actions for high-severity cases that don't yet have any. - Strictly proposal-only: nothing is dispatched. The respond.propose_for_case - helper is already idempotent per case (skips cases that already have - actions), so re-running this on a tick is safe. + Returns total proposed-action count. Idempotent per case (respond's + propose_for_case skips cases that already have actions). """ from psyc.lines import respond cases = db.list_cases(limit=10_000) proposed = 0 - touched = 0 for c in cases: ids = respond.propose_for_case(c) - if ids: - proposed += len(ids) - touched += 1 - return f"proposed {proposed} action(s) for {touched} case(s)" + proposed += len(ids) + return proposed + + +def _current_mode(pipeline_name: str) -> PulseMode: + p = _get_pipeline(pipeline_name) + return p.mode if p is not None else PulseMode.MANUAL + + +def _is_quorum_met(case_digest_hash: str) -> bool: + """Wrapper for federation.is_quorum_met that tolerates the sibling agent + not having shipped the function yet. + + If federation lacks `is_quorum_met`, we fall back to False — the safe + default ("no quorum signal → don't fire federation cases"). + """ + try: + from psyc.lines import federation as _federation + fn = getattr(_federation, "is_quorum_met", None) + if fn is None: + return False + return bool(fn(case_digest_hash)) + except Exception as exc: # noqa: BLE001 — defensive: any import / runtime miss → safe-false + _log.warning("pulse.respond.quorum.unavailable", error=str(exc)) + return False + + +def _canonical_json_local(obj: Dict[str, object]) -> bytes: + """Deterministic JSON serialization — mirrors federation.canonical_json + for the case-digest computation. Local copy so we don't hard-require + federation to be importable. + """ + import json + return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + + +def _case_digest_hash(case_id: str) -> str: + """SHA-256 of the canonical JSON of {case_id: ...} — what federation hashes. + + Returns "" if the case can't be loaded (e.g. row vanished mid-fire). + """ + import hashlib + from psyc.result import Ok as _Ok + got = db.get_case(case_id) + if not isinstance(got, _Ok): + return "" + case = got.value + # Mirror federation._build_case_records' record shape so digests match. + record = { + "case_id": case.case_id, + "summary": case.summary, + "severity": case.classification.severity.value if case.classification.severity else None, + "incident_type": case.classification.incident_type.value if case.classification.incident_type else None, + "observed_at": case.observed_at.isoformat(), + "feed_source": case.source_metadata.get("feed", ""), + "iocs": ( + [{"value": v, "type": "url"} for v in case.observables.urls] + + [{"value": v, "type": "domain"} for v in case.observables.domains] + + [{"value": v, "type": "ip"} for v in case.observables.ips] + + [{"value": v, "type": "hash"} for v in case.observables.hashes] + + [{"value": v, "type": "cve"} for v in case.observables.cves] + ), + } + return hashlib.sha256(_canonical_json_local(record)).hexdigest() + + +def _case_is_local(case_id: str) -> bool: + """True if no federation peer has ever pushed us this case_id.""" + return len(db.signals_for_case(case_id)) == 0 + + +def _audit(action: str, *, action_id: Optional[int] = None, + case_id: Optional[str] = None, detail: str = "") -> None: + db.pulse_audit_record(dict( + pipeline="respond", + action=action, + action_id=action_id, + case_id=case_id, + detail=detail[:500], + timestamp=_now().isoformat(), + )) + + +def _auto_fire_eligible() -> Tuple[int, int]: + """Iterate PROPOSED actions and execute the ones that clear every gate. + + Returns (fired_count, skipped_count). Records a pulse_audit row for every + decision (fired or skipped-with-reason) so the cockpit can show history. + A single failing action never aborts the batch. + """ + from psyc.lines import respond + from psyc.models import ActionStatus + from psyc.result import Ok as _Ok + + threshold = respond_auto_threshold() + threshold_rank = _severity_rank(threshold) + require_quorum = respond_require_quorum() + local_only = respond_local_only() + + fired = 0 + skipped = 0 + + actions = respond.list_actions(status=ActionStatus.PROPOSED, limit=100) + for action in actions: + # Re-hydrate severity enum (action.severity is the .value string). + try: + sev_enum = Severity(action.severity) if action.severity else None + except ValueError: + sev_enum = None + + if _severity_rank(sev_enum) < threshold_rank: + skipped += 1 + _audit( + "skipped", + action_id=action.id, + case_id=action.case_id, + detail=f"below threshold: severity={action.severity!r} < {threshold.value}", + ) + continue + + is_local = _case_is_local(action.case_id) + if require_quorum and not is_local: + digest = _case_digest_hash(action.case_id) + if not digest or not _is_quorum_met(digest): + if local_only: + # local-only is armed but this case was imported via federation + # → defer (don't fire) until federation grants quorum + skipped += 1 + _audit( + "skipped", + action_id=action.id, + case_id=action.case_id, + detail="local-only armed + federation-sourced case", + ) + continue + skipped += 1 + _audit( + "skipped", + action_id=action.id, + case_id=action.case_id, + detail="no quorum on federation-sourced case", + ) + continue + # else: quorum disabled, or case is locally-generated → fire. + + try: + result = respond.execute_action(action.id, approver="pulse-auto") + except Exception as exc: # noqa: BLE001 — one bad action shouldn't kill the batch + skipped += 1 + _audit( + "error", + action_id=action.id, + case_id=action.case_id, + detail=f"execute raised: {type(exc).__name__}: {exc}", + ) + _log.warning("pulse.respond.auto-fire.error", + action_id=action.id, error=str(exc)) + continue + + if isinstance(result, _Ok): + fired += 1 + _log.info("pulse.respond.auto-fire", + action_id=action.id, case_id=action.case_id, + type=action.action_type.value, target=action.target) + _audit( + "auto-fire", + action_id=action.id, + case_id=action.case_id, + detail=f"{action.action_type.value} → {action.target}", + ) + else: + # Err path — execute_action returned Err (e.g. SOAR sink down) + reason = getattr(result, "reason", "unknown") + skipped += 1 + _audit( + "error", + action_id=action.id, + case_id=action.case_id, + detail=f"execute failed: {reason}", + ) + _log.warning("pulse.respond.auto-fire.failed", + action_id=action.id, reason=str(reason)) + + return fired, skipped + + +def _run_respond() -> str: + """Propose + (when mode is auto-execute) auto-fire eligible PROPOSED actions. + + Two phases: + 1. Always propose new actions for high-severity cases (existing behavior). + 2. If pipeline mode is auto-execute, iterate PROPOSED actions and execute + those that clear severity/quorum/local-only gates. + """ + propose_count = _propose_for_recent_cases() + mode = _current_mode("respond") + if mode != PulseMode.AUTO_EXECUTE: + return f"proposed {propose_count} actions; mode={mode.value} → no auto-fire" + fired, skipped = _auto_fire_eligible() + return f"proposed {propose_count}; auto-fired {fired}; skipped {skipped} (gate)" def _run_peer_pull() -> str: @@ -217,7 +411,7 @@ _REGISTRY: Dict[str, Callable[[], str]] = { "classify": _run_classify, "prove": _run_prove, "reindex": _run_reindex, - "respond": _run_respond_propose, + "respond": _run_respond, "peer-pull": _run_peer_pull, "vouch-refresh": _run_vouch_refresh, } From f5ca928f92e29059a924a2aa883f40759c057bb0 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:11:52 +0200 Subject: [PATCH 4/5] stage-auto-d pulse: cockpit auto-response state panel + CLI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cockpit: - /admin/pulse now renders an "AUTO-RESPONSE STATE" panel above the pipeline table — mode badge (traffic-light colored), threshold, quorum on/off, local-only on/off, auto-fired-in-24h count, last 5 audit entries, and a one-form save for threshold + gates. - POST /admin/pulse/respond-config writes the new gates. CLI: - pulse-respond-config [--threshold …] [--quorum/--no-quorum] [--local-only/--no-local-only] Args left unset are unchanged; echoes the post-state. - pulse-respond-status prints mode, gates, and the last 10 audit entries. Co-Authored-By: Claude Opus 4.7 --- src/psyc/_pulse_cli.py | 60 ++++++++++++++++ src/psyc/cockpit/pulse_routes.py | 41 ++++++++++- src/psyc/cockpit/templates/admin_pulse.html | 80 +++++++++++++++++++++ 3 files changed, 178 insertions(+), 3 deletions(-) diff --git a/src/psyc/_pulse_cli.py b/src/psyc/_pulse_cli.py index 97fee7b..84bd8a5 100644 --- a/src/psyc/_pulse_cli.py +++ b/src/psyc/_pulse_cli.py @@ -14,6 +14,7 @@ import typer from psyc import db from psyc.lines import pulse +from psyc.models import Severity def _relative(dt: Optional[datetime]) -> str: @@ -120,3 +121,62 @@ def register(typer_app: typer.Typer) -> None: db.init_db() pulse.set_kill_switch(False) typer.echo("kill switch disarmed — pulse resumes") + + @typer_app.command("pulse-respond-config") + def pulse_respond_config( + threshold: Optional[str] = typer.Option( + None, "--threshold", help="min severity: low | medium | high | critical" + ), + quorum: Optional[bool] = typer.Option( + None, "--quorum/--no-quorum", help="require quorum on federation-sourced cases" + ), + local_only: Optional[bool] = typer.Option( + None, "--local-only/--no-local-only", + help="when armed, auto-execute defers federation cases until quorum" + ), + ) -> None: + """Update the respond-pipeline auto-fire gates. Args left unset are unchanged.""" + db.init_db() + if threshold is not None: + try: + pulse.set_respond_auto_threshold(Severity(threshold)) + except ValueError: + typer.echo(f"error: unknown severity {threshold!r}", err=True) + raise typer.Exit(1) + if quorum is not None: + pulse.set_respond_require_quorum(quorum) + if local_only is not None: + pulse.set_respond_local_only(local_only) + typer.echo( + f"threshold={pulse.respond_auto_threshold().value} " + f"quorum={'on' if pulse.respond_require_quorum() else 'off'} " + f"local-only={'on' if pulse.respond_local_only() else 'off'}" + ) + + @typer_app.command("pulse-respond-status") + def pulse_respond_status() -> None: + """Print the respond-pipeline gates + the last 10 audit entries.""" + db.init_db() + mode = "manual" + for p in pulse.state(): + if p.name == "respond": + mode = p.mode.value + break + typer.echo(f"respond mode : {mode}") + typer.echo(f"threshold : {pulse.respond_auto_threshold().value}") + typer.echo(f"require quorum : {'yes' if pulse.respond_require_quorum() else 'no'}") + typer.echo(f"local-only : {'yes' if pulse.respond_local_only() else 'no'}") + + audit = db.pulse_audit_recent("respond", limit=10) + if not audit: + typer.echo("(no audit entries yet)") + return + typer.echo("") + typer.echo(f"{'timestamp':<28} {'action':<11} {'case_id':<22} detail") + for row in audit: + typer.echo( + f"{(row['timestamp'] or '')[:27]:<28} " + f"{(row['action'] or ''):<11} " + f"{(row['case_id'] or '—'):<22} " + f"{(row['detail'] or '')[:80]}" + ) diff --git a/src/psyc/cockpit/pulse_routes.py b/src/psyc/cockpit/pulse_routes.py index 7291ff6..c3b5e21 100644 --- a/src/psyc/cockpit/pulse_routes.py +++ b/src/psyc/cockpit/pulse_routes.py @@ -8,15 +8,16 @@ scheduler loop. Caller in app.py just imports + invokes register(). from __future__ import annotations import asyncio -from datetime import datetime, timezone -from typing import Optional +from datetime import datetime, timedelta, timezone +from typing import List, Optional from fastapi import FastAPI, Form, Request from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates -from psyc import log +from psyc import db, log from psyc.lines import pulse +from psyc.models import Severity _log = log.get(__name__) @@ -61,6 +62,10 @@ def register(app: FastAPI, templates: Jinja2Templates) -> None: return RedirectResponse("/admin", status_code=303) flash = request.query_params.get("flash", "") pipelines = pulse.state() + respond_mode = next((p.mode.value for p in pipelines if p.name == "respond"), "manual") + since = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat() + auto_fired_24h = db.pulse_audit_count_since("respond", "auto-fire", since) + audit_recent = db.pulse_audit_recent("respond", limit=5) return templates.TemplateResponse( request, "admin_pulse.html", @@ -70,6 +75,13 @@ def register(app: FastAPI, templates: Jinja2Templates) -> None: "tick_interval": TICK_INTERVAL_SECONDS, "relative": _relative, "flash": flash, + "respond_mode": respond_mode, + "respond_threshold": pulse.respond_auto_threshold().value, + "respond_require_quorum": pulse.respond_require_quorum(), + "respond_local_only": pulse.respond_local_only(), + "respond_auto_fired_24h": auto_fired_24h, + "respond_audit_recent": audit_recent, + "severity_choices": [s.value for s in Severity], }, ) @@ -113,6 +125,29 @@ def register(app: FastAPI, templates: Jinja2Templates) -> None: flash = f"run failed: {exc}" return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303) + @app.post("/admin/pulse/respond-config") + def pulse_respond_config( + request: Request, + threshold: str = Form(...), + require_quorum: Optional[str] = Form(None), + local_only: Optional[str] = Form(None), + ) -> RedirectResponse: + if not _admin_ok(request): + return RedirectResponse("/admin", status_code=303) + try: + sev = Severity(threshold) + pulse.set_respond_auto_threshold(sev) + pulse.set_respond_require_quorum(require_quorum is not None) + pulse.set_respond_local_only(local_only is not None) + flash = ( + f"respond gates updated: threshold={sev.value}, " + f"quorum={'on' if require_quorum is not None else 'off'}, " + f"local-only={'on' if local_only is not None else 'off'}" + ) + except ValueError as exc: + flash = f"respond-config failed: {exc}" + return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303) + @app.on_event("startup") async def _start_pulse_loop() -> None: # Fire-and-forget; the loop catches its own exceptions and self-restarts. diff --git a/src/psyc/cockpit/templates/admin_pulse.html b/src/psyc/cockpit/templates/admin_pulse.html index f542881..2cc4b8e 100644 --- a/src/psyc/cockpit/templates/admin_pulse.html +++ b/src/psyc/cockpit/templates/admin_pulse.html @@ -35,6 +35,86 @@ {% endif %} +
+
+

AUTO-RESPONSE STATE

+ {{ respond_auto_fired_24h }} auto-fired in last 24h +
+

When the respond pipeline runs in auto-execute, every PROPOSED action that passes all three gates fires automatically. Below shows the live config + audit trail.

+ +
+ {# Mode badge — traffic-light coloring. auto-execute is "armed" (red), auto-propose is amber, manual is green/safe. #} + {% if respond_mode == 'auto-execute' %} +
+ MODE: auto-execute (ARMED) +
+ {% elif respond_mode == 'auto-propose' %} +
+ MODE: auto-propose (staging only) +
+ {% else %} +
+ MODE: manual (no proposals) +
+ {% endif %} +
+ Threshold: {{ respond_threshold|upper }}+ +
+
+ Quorum: {{ 'ON' if respond_require_quorum else 'OFF' }} +
+
+ Local-only: {{ 'ON' if respond_local_only else 'OFF' }} +
+
+ +
+ + + + +
+ + {% if respond_audit_recent %} + + + + + + + + + + + {% for row in respond_audit_recent %} + + + + + + + {% endfor %} + +
timestampdecisioncasedetail
{{ row.timestamp }} + {% if row.action == 'auto-fire' %}✓ {{ row.action }} + {% elif row.action == 'error' %}✗ {{ row.action }} + {% else %}⊘ {{ row.action }}{% endif %} + {{ row.case_id or '—' }}{% if row.action_id %} · #{{ row.action_id }}{% endif %}{{ row.detail or '' }}
+ {% else %} +

No auto-response decisions logged yet.

+ {% endif %} +
+

Pipelines

From c5472b3134684ad8a44ae2a041f424dcb077f8ec Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:12:02 +0200 Subject: [PATCH 5/5] stage-auto-e pulse: tests for auto-response gating MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cover the auto-fire decision matrix: - _severity_rank ordering - mode != auto-execute → never fires (auto-propose, manual) - below-threshold action is skipped + audited - federation case + no quorum → skipped + audited "no quorum" - federation case + quorum met → fires - local case + quorum required + local-only on → still fires - local case + quorum required + local-only off → still fires - quorum gating disabled → federation cases fire too - kill switch armed → tick() skips everything - pulse_audit records both auto-fire and skip rows - audit_count_since returns the per-action counts the cockpit needs - config round-trips through pulse_settings Tests patch federation.is_quorum_met (raising=False so the sibling agent can ship the real function later without breaking these), and swap respond.execute_action for a counter so no SOAR sink call escapes. Co-Authored-By: Claude Opus 4.7 --- tests/test_pulse_respond.py | 314 ++++++++++++++++++++++++++++++++++++ 1 file changed, 314 insertions(+) create mode 100644 tests/test_pulse_respond.py diff --git a/tests/test_pulse_respond.py b/tests/test_pulse_respond.py new file mode 100644 index 0000000..c940b3c --- /dev/null +++ b/tests/test_pulse_respond.py @@ -0,0 +1,314 @@ +"""Pulseline auto-response gating — severity threshold, quorum, local-only. + +The runner here is the live `_run_respond` from pulse.py. We point it at a +temp DB, monkeypatch federation.is_quorum_met to a controllable function, and +swap respond.execute_action for a counter so we don't reach the SOAR sink. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import List, Tuple + +import pytest +from sqlalchemy import create_engine + +from psyc import db +from psyc.lines import pulse, respond +from psyc.lines import federation +from psyc.models import ( + ActionStatus, + ActionType, + Case, + Classification, + Observables, + ResponseAction, + Severity, + TLP, +) +from psyc.result import Ok + +from conftest import make_case + + +# ----- fixtures -------------------------------------------------------------- + +@pytest.fixture +def fresh_db(tmp_path, monkeypatch): + """Temp SQLite + the real runner registry. Mode pinned to auto-execute.""" + test_db = tmp_path / "respond.db" + eng = create_engine(f"sqlite:///{test_db}", future=True) + db._metadata.create_all(eng, checkfirst=True) + monkeypatch.setattr(db, "_engine", eng) + monkeypatch.setattr(db, "DB_PATH", test_db) + yield test_db + + +@pytest.fixture +def fired(monkeypatch): + """Capture every execute_action(action_id, approver=...) — no SOAR sink call.""" + log: List[Tuple[int, str]] = [] + + def fake_execute(action_id: int, approver: str = "operator"): + log.append((action_id, approver)) + # Re-read the action so we can return a realistic Ok value + got = respond.get_action(action_id) + return got if isinstance(got, Ok) else got + + monkeypatch.setattr(respond, "execute_action", fake_execute) + return log + + +@pytest.fixture +def quorum_yes(monkeypatch): + monkeypatch.setattr(federation, "is_quorum_met", + lambda h, k=None: True, raising=False) + + +@pytest.fixture +def quorum_no(monkeypatch): + monkeypatch.setattr(federation, "is_quorum_met", + lambda h, k=None: False, raising=False) + + +def _set_respond_mode(mode: pulse.PulseMode) -> None: + pulse.set_mode("respond", mode) + + +def _propose_one(case: Case) -> int: + db.upsert_case(case) + ids = respond.propose_for_case(case) + assert ids, "test setup expected at least one action proposed" + return ids[0] + + +# ----- severity rank --------------------------------------------------------- + +def test_severity_rank_ordering(): + assert pulse._severity_rank(Severity.LOW) == 0 + assert pulse._severity_rank(Severity.MEDIUM) == 1 + assert pulse._severity_rank(Severity.HIGH) == 2 + assert pulse._severity_rank(Severity.CRITICAL) == 3 + assert pulse._severity_rank(None) == -1 + + +# ----- runner mode gating ---------------------------------------------------- + +def test_runner_no_auto_fire_when_mode_is_propose(fresh_db, fired, quorum_yes): + case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH) + db.upsert_case(case) + # default seed mode for respond is auto-propose → no auto-fire even with PROPOSED actions + result = pulse._run_respond() + assert "no auto-fire" in result + assert fired == [] + + +def test_runner_no_auto_fire_when_manual(fresh_db, fired, quorum_yes): + case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH) + db.upsert_case(case) + _set_respond_mode(pulse.PulseMode.MANUAL) + result = pulse._run_respond() + assert "no auto-fire" in result + assert fired == [] + + +# ----- severity threshold ---------------------------------------------------- + +def test_below_threshold_is_skipped(fresh_db, fired, quorum_yes): + # Propose an action carrying severity=MEDIUM by hand — propose_for_case + # only generates HIGH/CRITICAL actions, but the gate must still work for + # any below-threshold severity we drop in. + case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH) + db.upsert_case(case) + respond.propose_for_case(case) + # Demote every action's severity to MEDIUM so all should be skipped under HIGH threshold. + from sqlalchemy import update as sa_update + with db.engine().begin() as conn: + conn.execute(sa_update(db.response_actions).values(severity=Severity.MEDIUM.value)) + pulse.set_respond_auto_threshold(Severity.HIGH) + _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE) + + pulse._run_respond() + assert fired == [], "below-threshold action must not fire" + audit = db.pulse_audit_recent("respond", limit=5) + assert any(r["action"] == "skipped" and "below threshold" in (r["detail"] or "") for r in audit) + + +# ----- quorum gate ----------------------------------------------------------- + +def test_federation_case_no_quorum_skipped(fresh_db, fired, quorum_no): + case = make_case(feed="urlhaus", ips=["9.9.9.9"], severity=Severity.HIGH) + db.upsert_case(case) + # Mark this case as federation-sourced by inserting a signal row for it. + db.record_signal(dict( + peer_fingerprint="peer-a", + signal_type="case", + signal_id=case.case_id, + signal_hash="dummyhash", + received_at=datetime.now(timezone.utc).isoformat(), + raw_json="{}", + )) + respond.propose_for_case(case) + + pulse.set_respond_require_quorum(True) + pulse.set_respond_local_only(False) + pulse.set_respond_auto_threshold(Severity.HIGH) + _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE) + + pulse._run_respond() + assert fired == [] + audit = db.pulse_audit_recent("respond", limit=5) + assert any(r["action"] == "skipped" and "no quorum" in (r["detail"] or "") for r in audit) + + +def test_local_case_fires_when_quorum_required(fresh_db, fired, quorum_no): + """Locally-generated cases bypass quorum — they're our own work.""" + case = make_case(feed="urlhaus", ips=["9.9.9.9"], severity=Severity.HIGH) + db.upsert_case(case) + # No federation_signals row → locally-generated + respond.propose_for_case(case) + + pulse.set_respond_require_quorum(True) + pulse.set_respond_local_only(True) # both armed; local cases still fire + pulse.set_respond_auto_threshold(Severity.HIGH) + _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE) + + pulse._run_respond() + assert len(fired) >= 1 + audit = db.pulse_audit_recent("respond", limit=10) + assert any(r["action"] == "auto-fire" for r in audit) + + +def test_local_case_fires_local_only_off(fresh_db, fired, quorum_no): + """Even with local_only OFF, a locally-generated case still fires (no quorum needed).""" + case = make_case(feed="urlhaus", ips=["1.1.1.1"], severity=Severity.CRITICAL) + db.upsert_case(case) + respond.propose_for_case(case) + + pulse.set_respond_require_quorum(True) + pulse.set_respond_local_only(False) + pulse.set_respond_auto_threshold(Severity.HIGH) + _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE) + + pulse._run_respond() + assert len(fired) >= 1 + + +def test_federation_case_with_quorum_fires(fresh_db, fired, quorum_yes): + case = make_case(feed="urlhaus", ips=["2.2.2.2"], severity=Severity.HIGH) + db.upsert_case(case) + db.record_signal(dict( + peer_fingerprint="peer-b", + signal_type="case", + signal_id=case.case_id, + signal_hash="dummyhash2", + received_at=datetime.now(timezone.utc).isoformat(), + raw_json="{}", + )) + respond.propose_for_case(case) + + pulse.set_respond_require_quorum(True) + pulse.set_respond_local_only(False) + pulse.set_respond_auto_threshold(Severity.HIGH) + _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE) + + pulse._run_respond() + assert len(fired) >= 1 + + +def test_quorum_off_fires_federation_case(fresh_db, fired, quorum_no): + """With quorum gating disabled entirely, federation cases fire too.""" + case = make_case(feed="urlhaus", ips=["3.3.3.3"], severity=Severity.HIGH) + db.upsert_case(case) + db.record_signal(dict( + peer_fingerprint="peer-c", + signal_type="case", + signal_id=case.case_id, + signal_hash="dummyhash3", + received_at=datetime.now(timezone.utc).isoformat(), + raw_json="{}", + )) + respond.propose_for_case(case) + + pulse.set_respond_require_quorum(False) + pulse.set_respond_auto_threshold(Severity.HIGH) + _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE) + + pulse._run_respond() + assert len(fired) >= 1 + + +# ----- kill switch ----------------------------------------------------------- + +def test_kill_switch_blocks_tick(fresh_db, fired, quorum_yes): + """The parent tick() skips everything when kill switch is armed.""" + case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH) + db.upsert_case(case) + respond.propose_for_case(case) + _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE) + pulse.set_kill_switch(True) + results = pulse.tick() + assert all(o == "skipped" for _, o, _ in results) + assert fired == [] + + +# ----- audit ----------------------------------------------------------------- + +def test_pulse_audit_records_fire_and_skip(fresh_db, fired, quorum_no): + # Local case → should fire and audit auto-fire + local = make_case(feed="urlhaus", ips=["10.0.0.1"], severity=Severity.HIGH, age_days=1) + db.upsert_case(local) + respond.propose_for_case(local) + + # Federation-sourced case w/o quorum → should skip and audit skip + fedcase = make_case(feed="urlhaus", ips=["10.0.0.2"], severity=Severity.HIGH, age_days=2) + db.upsert_case(fedcase) + db.record_signal(dict( + peer_fingerprint="peer-x", + signal_type="case", + signal_id=fedcase.case_id, + signal_hash="xhash", + received_at=datetime.now(timezone.utc).isoformat(), + raw_json="{}", + )) + respond.propose_for_case(fedcase) + + pulse.set_respond_require_quorum(True) + pulse.set_respond_local_only(False) + pulse.set_respond_auto_threshold(Severity.HIGH) + _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE) + pulse._run_respond() + + audit = db.pulse_audit_recent("respond", limit=20) + actions = {r["action"] for r in audit} + assert "auto-fire" in actions + assert "skipped" in actions + + +def test_audit_count_since(fresh_db, fired, quorum_no): + case = make_case(feed="urlhaus", ips=["8.8.8.8"], severity=Severity.HIGH) + db.upsert_case(case) + respond.propose_for_case(case) + pulse.set_respond_require_quorum(True) + pulse.set_respond_auto_threshold(Severity.HIGH) + _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE) + pulse._run_respond() + from datetime import timedelta + since = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat() + assert db.pulse_audit_count_since("respond", "auto-fire", since) >= 1 + + +# ----- config round-trip ----------------------------------------------------- + +def test_config_round_trips(fresh_db): + assert pulse.respond_auto_threshold() == Severity.HIGH + assert pulse.respond_require_quorum() is True + assert pulse.respond_local_only() is False + + pulse.set_respond_auto_threshold(Severity.CRITICAL) + pulse.set_respond_require_quorum(False) + pulse.set_respond_local_only(True) + + assert pulse.respond_auto_threshold() == Severity.CRITICAL + assert pulse.respond_require_quorum() is False + assert pulse.respond_local_only() is True