merge auto-response: severity/quorum/local-only gated execution
# Conflicts: # src/psyc/db.py
This commit is contained in:
@@ -14,6 +14,7 @@ import typer
|
||||
|
||||
from psyc import db
|
||||
from psyc.lines import pulse
|
||||
from psyc.models import Severity
|
||||
|
||||
|
||||
def _relative(dt: Optional[datetime]) -> str:
|
||||
@@ -120,3 +121,62 @@ def register(typer_app: typer.Typer) -> None:
|
||||
db.init_db()
|
||||
pulse.set_kill_switch(False)
|
||||
typer.echo("kill switch disarmed — pulse resumes")
|
||||
|
||||
@typer_app.command("pulse-respond-config")
def pulse_respond_config(
    threshold: Optional[str] = typer.Option(
        None,
        "--threshold",
        help="min severity: low | medium | high | critical",
    ),
    quorum: Optional[bool] = typer.Option(
        None,
        "--quorum/--no-quorum",
        help="require quorum on federation-sourced cases",
    ),
    local_only: Optional[bool] = typer.Option(
        None,
        "--local-only/--no-local-only",
        help="when armed, auto-execute defers federation cases until quorum",
    ),
) -> None:
    """Adjust the respond pipeline's auto-fire gates from the command line.

    Each option is tri-state: ``None`` means it was not passed, in which case
    the stored value is left alone. After applying the changes the current
    values of all three gates are echoed.

    An unrecognised ``--threshold`` prints an error to stderr and exits with
    status 1; because the threshold is handled first, the other options are
    not applied in that case.
    """

    def _on_off(flag: bool) -> str:
        return "on" if flag else "off"

    db.init_db()

    if threshold is not None:
        try:
            pulse.set_respond_auto_threshold(Severity(threshold))
        except ValueError:
            typer.echo(f"error: unknown severity {threshold!r}", err=True)
            raise typer.Exit(1)

    if quorum is not None:
        pulse.set_respond_require_quorum(quorum)

    if local_only is not None:
        pulse.set_respond_local_only(local_only)

    # Read the gates back from storage so the echo reflects what was persisted.
    summary = " ".join((
        f"threshold={pulse.respond_auto_threshold().value}",
        f"quorum={_on_off(pulse.respond_require_quorum())}",
        f"local-only={_on_off(pulse.respond_local_only())}",
    ))
    typer.echo(summary)
|
||||
|
||||
@typer_app.command("pulse-respond-status")
def pulse_respond_status() -> None:
    """Show the respond pipeline's mode and gates, then up to 10 recent audit rows.

    The mode falls back to "manual" when ``pulse.state()`` has no pipeline
    named "respond". When there are no audit rows a placeholder line is
    printed instead of the table.
    """
    db.init_db()

    mode = next(
        (pipe.mode.value for pipe in pulse.state() if pipe.name == "respond"),
        "manual",
    )
    quorum_text = "yes" if pulse.respond_require_quorum() else "no"
    local_text = "yes" if pulse.respond_local_only() else "no"

    typer.echo(f"respond mode : {mode}")
    typer.echo(f"threshold : {pulse.respond_auto_threshold().value}")
    typer.echo(f"require quorum : {quorum_text}")
    typer.echo(f"local-only : {local_text}")

    entries = db.pulse_audit_recent("respond", limit=10)
    if not entries:
        typer.echo("(no audit entries yet)")
        return

    typer.echo("")
    typer.echo(f"{'timestamp':<28} {'action':<11} {'case_id':<22} detail")
    for entry in entries:
        # Nullable columns are coalesced so padding and slicing never see None.
        stamp = (entry['timestamp'] or '')[:27]
        decision = entry['action'] or ''
        case_ref = entry['case_id'] or '—'
        detail = (entry['detail'] or '')[:80]
        typer.echo(f"{stamp:<28} {decision:<11} {case_ref:<22} {detail}")
|
||||
|
||||
@@ -8,15 +8,16 @@ scheduler loop. Caller in app.py just imports + invokes register().
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import List, Optional
|
||||
|
||||
from fastapi import FastAPI, Form, Request
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
from fastapi.templating import Jinja2Templates
|
||||
|
||||
from psyc import log
|
||||
from psyc import db, log
|
||||
from psyc.lines import pulse
|
||||
from psyc.models import Severity
|
||||
|
||||
|
||||
_log = log.get(__name__)
|
||||
@@ -61,6 +62,10 @@ def register(app: FastAPI, templates: Jinja2Templates) -> None:
|
||||
return RedirectResponse("/admin", status_code=303)
|
||||
flash = request.query_params.get("flash", "")
|
||||
pipelines = pulse.state()
|
||||
respond_mode = next((p.mode.value for p in pipelines if p.name == "respond"), "manual")
|
||||
since = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat()
|
||||
auto_fired_24h = db.pulse_audit_count_since("respond", "auto-fire", since)
|
||||
audit_recent = db.pulse_audit_recent("respond", limit=5)
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"admin_pulse.html",
|
||||
@@ -70,6 +75,13 @@ def register(app: FastAPI, templates: Jinja2Templates) -> None:
|
||||
"tick_interval": TICK_INTERVAL_SECONDS,
|
||||
"relative": _relative,
|
||||
"flash": flash,
|
||||
"respond_mode": respond_mode,
|
||||
"respond_threshold": pulse.respond_auto_threshold().value,
|
||||
"respond_require_quorum": pulse.respond_require_quorum(),
|
||||
"respond_local_only": pulse.respond_local_only(),
|
||||
"respond_auto_fired_24h": auto_fired_24h,
|
||||
"respond_audit_recent": audit_recent,
|
||||
"severity_choices": [s.value for s in Severity],
|
||||
},
|
||||
)
|
||||
|
||||
@@ -113,6 +125,29 @@ def register(app: FastAPI, templates: Jinja2Templates) -> None:
|
||||
flash = f"run failed: {exc}"
|
||||
return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303)
|
||||
|
||||
@app.post("/admin/pulse/respond-config")
def pulse_respond_config(
    request: Request,
    threshold: str = Form(...),
    require_quorum: Optional[str] = Form(None),
    local_only: Optional[str] = Form(None),
) -> RedirectResponse:
    """Persist the respond auto-fire gates submitted from the admin form.

    Args:
        request: incoming request, used for the admin check.
        threshold: severity value chosen in the form's select box.
        require_quorum: checkbox field; present (any value) means "on",
            absent (``None``) means "off".
        local_only: checkbox field, same present/absent convention.

    Returns:
        A 303 redirect to ``/admin`` when the admin check fails, otherwise a
        303 redirect to ``/admin/pulse`` carrying the outcome in the ``flash``
        query parameter.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlencode

    if not _admin_ok(request):
        return RedirectResponse("/admin", status_code=303)

    quorum_on = require_quorum is not None
    local_on = local_only is not None
    try:
        sev = Severity(threshold)
        pulse.set_respond_auto_threshold(sev)
        pulse.set_respond_require_quorum(quorum_on)
        pulse.set_respond_local_only(local_on)
        flash = (
            f"respond gates updated: threshold={sev.value}, "
            f"quorum={'on' if quorum_on else 'off'}, "
            f"local-only={'on' if local_on else 'off'}"
        )
    except ValueError as exc:
        flash = f"respond-config failed: {exc}"

    # The error message can contain the user-supplied threshold; percent-encode
    # the flash so characters such as '&', '#', '+' or '%' cannot truncate the
    # message or inject extra query parameters into the redirect target.
    query = urlencode({"flash": flash})
    return RedirectResponse(f"/admin/pulse?{query}", status_code=303)
|
||||
|
||||
@app.on_event("startup")
|
||||
async def _start_pulse_loop() -> None:
|
||||
# Fire-and-forget; the loop catches its own exceptions and self-restarts.
|
||||
|
||||
@@ -35,6 +35,86 @@
|
||||
{% endif %}
|
||||
</section>
|
||||
|
||||
<section class="panel">
|
||||
<div class="panel-head">
|
||||
<h2>AUTO-RESPONSE STATE</h2>
|
||||
<span class="count">{{ respond_auto_fired_24h }} auto-fired in last 24h</span>
|
||||
</div>
|
||||
<p class="page-intro">When the <code>respond</code> pipeline runs in <code>auto-execute</code>, every PROPOSED action that passes all three gates fires automatically. Below shows the live config + audit trail.</p>
|
||||
|
||||
<div style="display:flex; gap:14px; flex-wrap:wrap; margin:14px 0;">
|
||||
{# Mode badge — traffic-light coloring. auto-execute is "armed" (red), auto-propose is amber, manual is green/safe. #}
|
||||
{% if respond_mode == 'auto-execute' %}
|
||||
<div style="padding:10px 14px; border-radius:6px; background:rgba(248,113,113,0.12); border:1px solid rgba(248,113,113,0.45); color:#fca5a5; font-weight:700; letter-spacing:0.04em;">
|
||||
MODE: auto-execute (ARMED)
|
||||
</div>
|
||||
{% elif respond_mode == 'auto-propose' %}
|
||||
<div style="padding:10px 14px; border-radius:6px; background:rgba(250,204,21,0.12); border:1px solid rgba(250,204,21,0.45); color:#fde047; font-weight:700; letter-spacing:0.04em;">
|
||||
MODE: auto-propose (staging only)
|
||||
</div>
|
||||
{% else %}
|
||||
<div style="padding:10px 14px; border-radius:6px; background:rgba(74,222,128,0.10); border:1px solid var(--green); color:var(--green); font-weight:700; letter-spacing:0.04em;">
|
||||
MODE: manual (no proposals)
|
||||
</div>
|
||||
{% endif %}
|
||||
<div style="padding:10px 14px; border-radius:6px; background:var(--panel-2); border:1px solid var(--border); color:var(--text);">
|
||||
Threshold: <strong>{{ respond_threshold|upper }}+</strong>
|
||||
</div>
|
||||
<div style="padding:10px 14px; border-radius:6px; background:var(--panel-2); border:1px solid var(--border); color:var(--text);">
|
||||
Quorum: <strong style="color: {{ 'var(--green)' if respond_require_quorum else 'var(--muted)' }};">{{ 'ON' if respond_require_quorum else 'OFF' }}</strong>
|
||||
</div>
|
||||
<div style="padding:10px 14px; border-radius:6px; background:var(--panel-2); border:1px solid var(--border); color:var(--text);">
|
||||
Local-only: <strong style="color: {{ 'var(--green)' if respond_local_only else 'var(--muted)' }};">{{ 'ON' if respond_local_only else 'OFF' }}</strong>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<form method="post" action="/admin/pulse/respond-config" style="display:flex; gap:10px; align-items:center; flex-wrap:wrap; margin-top:14px;">
|
||||
<label style="font-size:12px;">Min severity:
|
||||
<select name="threshold" style="background:var(--panel-2); color:var(--text); border:1px solid var(--border); border-radius:4px; padding:4px 6px;">
|
||||
{% for s in severity_choices %}
|
||||
<option value="{{ s }}" {% if s == respond_threshold %}selected{% endif %}>{{ s }}</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
</label>
|
||||
<label style="display:inline-flex; align-items:center; gap:4px; font-size:12px;">
|
||||
<input type="checkbox" name="require_quorum" value="1" {% if respond_require_quorum %}checked{% endif %}> require quorum
|
||||
</label>
|
||||
<label style="display:inline-flex; align-items:center; gap:4px; font-size:12px;">
|
||||
<input type="checkbox" name="local_only" value="1" {% if respond_local_only %}checked{% endif %}> local-only
|
||||
</label>
|
||||
<button type="submit" class="btn">save gates</button>
|
||||
</form>
|
||||
|
||||
{% if respond_audit_recent %}
|
||||
<table class="ledger" style="margin-top:18px;">
|
||||
<thead>
|
||||
<tr>
|
||||
<th style="width:18%;">timestamp</th>
|
||||
<th style="width:12%;">decision</th>
|
||||
<th style="width:18%;">case</th>
|
||||
<th>detail</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for row in respond_audit_recent %}
|
||||
<tr class="ledger-row">
|
||||
<td class="lg-ts">{{ row.timestamp }}</td>
|
||||
<td>
|
||||
{% if row.action == 'auto-fire' %}<span style="color: var(--green);">✓ {{ row.action }}</span>
|
||||
{% elif row.action == 'error' %}<span style="color: var(--red);">✗ {{ row.action }}</span>
|
||||
{% else %}<span style="color: var(--muted);">⊘ {{ row.action }}</span>{% endif %}
|
||||
</td>
|
||||
<td class="lg-sub"><code>{{ row.case_id or '—' }}</code>{% if row.action_id %} · #{{ row.action_id }}{% endif %}</td>
|
||||
<td class="lg-sub">{{ row.detail or '' }}</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
{% else %}
|
||||
<p class="lg-sub" style="margin-top:14px;">No auto-response decisions logged yet.</p>
|
||||
{% endif %}
|
||||
</section>
|
||||
|
||||
<section class="panel">
|
||||
<div class="panel-head">
|
||||
<h2>Pipelines</h2>
|
||||
|
||||
@@ -134,6 +134,19 @@ pulse_settings = Table(
|
||||
Column("value", String, nullable=False),
|
||||
)
|
||||
|
||||
# Append-only trail of pulse pipeline decisions.
pulse_audit = Table(
    "pulse_audit",
    _metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    # pipeline name: 'respond' | 'fetch' | ...
    Column("pipeline", String, nullable=False),
    # decision taken: 'auto-fire' | 'skipped' | 'error'
    Column("action", String, nullable=False),
    # response_actions.id when relevant
    Column("action_id", Integer, nullable=True),
    Column("case_id", String, nullable=True),
    Column("detail", Text, nullable=True),
    # ISO-formatted timestamp string
    Column("timestamp", String, nullable=False),
)
Index(
    "pulse_audit_pipeline_idx",
    pulse_audit.c.pipeline,
    pulse_audit.c.timestamp.desc(),
)
Index("pulse_audit_action_id_idx", pulse_audit.c.action_id)
|
||||
|
||||
peers = Table(
|
||||
"peers", _metadata,
|
||||
Column("domain", String, primary_key=True),
|
||||
@@ -361,6 +374,47 @@ def pulse_setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None:
|
||||
conn.execute(stmt)
|
||||
|
||||
|
||||
# ---------- pulse audit trail --------------------------------------------
|
||||
|
||||
def pulse_audit_record(row: dict, db_path: Path = DB_PATH) -> int:
    """Insert a single row into ``pulse_audit`` and return the new row id.

    ``row`` has to carry 'pipeline', 'action' and 'timestamp'; 'action_id',
    'case_id' and 'detail' may be omitted. The timestamp is supplied by the
    caller so that tests can pin it.
    """
    insert_stmt = insert(pulse_audit).values(**row)
    with engine(db_path).begin() as conn:
        outcome = conn.execute(insert_stmt)
        new_id = outcome.inserted_primary_key[0]
    return int(new_id)
|
||||
|
||||
|
||||
def pulse_audit_recent(pipeline: str, limit: int = 25, db_path: Path = DB_PATH) -> List[dict]:
    """Return the most recent audit rows for one pipeline, newest first.

    Args:
        pipeline: value of the ``pipeline`` column to filter on.
        limit: maximum number of rows to return.
        db_path: database location handed to ``engine()``.

    Returns:
        A list of plain dicts, one per row, keyed by column name.
    """
    stmt = (
        select(pulse_audit)
        .where(pulse_audit.c.pipeline == pipeline)
        # Rows written with an identical timestamp string would otherwise come
        # back in arbitrary order; the autoincrement id breaks the tie so the
        # later insert is listed first.
        .order_by(pulse_audit.c.timestamp.desc(), pulse_audit.c.id.desc())
        .limit(limit)
    )
    with engine(db_path).connect() as conn:
        return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
|
||||
|
||||
|
||||
def pulse_audit_count_since(
    pipeline: str, action: str, since_iso: str, db_path: Path = DB_PATH
) -> int:
    """Return how many audit rows match (pipeline, action) with timestamp >= ``since_iso``.

    Timestamps are stored as strings, so the comparison is a string comparison
    on the ``timestamp`` column.
    """
    audit = pulse_audit.c
    count_stmt = select(func.count()).select_from(pulse_audit).where(
        audit.pipeline == pipeline,
        audit.action == action,
        audit.timestamp >= since_iso,
    )
    with engine(db_path).connect() as conn:
        total = conn.execute(count_stmt).scalar_one()
    return int(total)
|
||||
|
||||
|
||||
# ---------- federation: peers + signal buffer ----------------------------
|
||||
|
||||
def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None:
|
||||
@@ -527,3 +581,18 @@ def translog_range(start: int = 0, end: Optional[int] = None, db_path: Path = DB
|
||||
stmt = select(translog).where(cond).order_by(translog.c.id.asc())
|
||||
with engine(db_path).connect() as conn:
|
||||
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
|
||||
|
||||
|
||||
def signals_for_case(case_id: str, db_path: Path = DB_PATH) -> List[dict]:
    """Fetch every federation signal of type 'case' whose signal_id is ``case_id``.

    An empty result means no peer ever sent us this case, i.e. it originated
    here; auto-fire treats such a case as locally-sourced.
    """
    signals = federation_signals.c
    query = select(federation_signals).where(
        signals.signal_type == "case",
        signals.signal_id == case_id,
    )
    with engine(db_path).connect() as conn:
        rows = conn.execute(query).fetchall()
    return [dict(row._mapping) for row in rows]
|
||||
|
||||
@@ -22,6 +22,7 @@ from typing import Callable, Dict, List, Optional, Tuple
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from psyc import db, log
|
||||
from psyc.models import Severity
|
||||
|
||||
|
||||
_log = log.get(__name__)
|
||||
@@ -33,6 +34,73 @@ class PulseMode(str, Enum):
|
||||
AUTO_EXECUTE = "auto-execute"
|
||||
|
||||
|
||||
# ---------- respond auto-fire gates -----------------------------------------
|
||||
# Persisted as rows in pulse_settings (key/value pairs). All defaults are
|
||||
# "safe" — quorum required, HIGH threshold, federation cases permitted only
|
||||
# when quorum-met.
|
||||
|
||||
_KEY_RESPOND_THRESHOLD = "respond_auto_threshold"
|
||||
_KEY_RESPOND_REQUIRE_QUORUM = "respond_require_quorum"
|
||||
_KEY_RESPOND_LOCAL_ONLY = "respond_local_only"
|
||||
|
||||
_DEFAULT_THRESHOLD = Severity.HIGH
|
||||
_DEFAULT_REQUIRE_QUORUM = True
|
||||
_DEFAULT_LOCAL_ONLY = False
|
||||
|
||||
|
||||
def _severity_rank(sev: Optional[Severity]) -> int:
    """Map a severity to its position on the LOW..CRITICAL scale (0..3).

    ``None`` and anything not in the table yield -1, which sorts below every
    real severity in threshold comparisons.
    """
    ascending = (Severity.LOW, Severity.MEDIUM, Severity.HIGH, Severity.CRITICAL)
    ranks = {level: position for position, level in enumerate(ascending)}
    # dict.get covers None as well: it is simply a key that is not present.
    return ranks.get(sev, -1)
|
||||
|
||||
|
||||
def respond_auto_threshold() -> Severity:
    """Return the stored minimum severity for auto-fire.

    Falls back to ``_DEFAULT_THRESHOLD`` when nothing is stored or when the
    stored string is not a valid ``Severity`` value.
    """
    stored = db.pulse_setting_get(_KEY_RESPOND_THRESHOLD)
    if stored is not None:
        try:
            return Severity(stored)
        except ValueError:
            # Unparseable stored value: fall through to the default.
            pass
    return _DEFAULT_THRESHOLD
|
||||
|
||||
|
||||
def set_respond_auto_threshold(sev: Severity) -> None:
    """Store ``sev`` as the minimum severity for auto-fire and log the change.

    Raises:
        ValueError: if ``sev`` is not a ``Severity`` instance.
    """
    if not isinstance(sev, Severity):
        raise ValueError(f"not a Severity: {sev!r}")
    level = sev.value
    db.pulse_setting_set(_KEY_RESPOND_THRESHOLD, level)
    _log.info("pulse.respond.threshold.changed", severity=level)
|
||||
|
||||
|
||||
def respond_require_quorum() -> bool:
    """Return whether the quorum gate is on.

    The setting is stored as the string "1" or "0"; when nothing is stored
    ``_DEFAULT_REQUIRE_QUORUM`` applies.
    """
    stored = db.pulse_setting_get(_KEY_RESPOND_REQUIRE_QUORUM)
    return _DEFAULT_REQUIRE_QUORUM if stored is None else stored == "1"
|
||||
|
||||
|
||||
def set_respond_require_quorum(state: bool) -> None:
    """Store the quorum gate as "1" (truthy ``state``) or "0" and log the change."""
    encoded = "1" if state else "0"
    db.pulse_setting_set(_KEY_RESPOND_REQUIRE_QUORUM, encoded)
    _log.info("pulse.respond.quorum.changed", required=bool(state))
|
||||
|
||||
|
||||
def respond_local_only() -> bool:
    """Return whether the local-only gate is on.

    The setting is stored as the string "1" or "0"; when nothing is stored
    ``_DEFAULT_LOCAL_ONLY`` applies.
    """
    stored = db.pulse_setting_get(_KEY_RESPOND_LOCAL_ONLY)
    return _DEFAULT_LOCAL_ONLY if stored is None else stored == "1"
|
||||
|
||||
|
||||
def set_respond_local_only(state: bool) -> None:
    """Store the local-only gate as "1" (truthy ``state``) or "0" and log the change."""
    encoded = "1" if state else "0"
    db.pulse_setting_set(_KEY_RESPOND_LOCAL_ONLY, encoded)
    _log.info("pulse.respond.local-only.changed", local_only=bool(state))
|
||||
|
||||
|
||||
class Pipeline(BaseModel):
|
||||
name: str
|
||||
title: str
|
||||
@@ -114,24 +182,218 @@ def _run_reindex() -> str:
|
||||
return f"indexed {written} IOC(s) from {len(cases)} case(s)"
|
||||
|
||||
|
||||
def _run_respond_propose() -> str:
|
||||
def _propose_for_recent_cases() -> int:
|
||||
"""Propose response actions for high-severity cases that don't yet have any.
|
||||
|
||||
Strictly proposal-only: nothing is dispatched. The respond.propose_for_case
|
||||
helper is already idempotent per case (skips cases that already have
|
||||
actions), so re-running this on a tick is safe.
|
||||
Returns total proposed-action count. Idempotent per case (respond's
|
||||
propose_for_case skips cases that already have actions).
|
||||
"""
|
||||
from psyc.lines import respond
|
||||
|
||||
cases = db.list_cases(limit=10_000)
|
||||
proposed = 0
|
||||
touched = 0
|
||||
for c in cases:
|
||||
ids = respond.propose_for_case(c)
|
||||
if ids:
|
||||
proposed += len(ids)
|
||||
touched += 1
|
||||
return f"proposed {proposed} action(s) for {touched} case(s)"
|
||||
return proposed
|
||||
|
||||
|
||||
def _current_mode(pipeline_name: str) -> PulseMode:
    """Return the mode of the named pipeline, or MANUAL when ``_get_pipeline`` returns None."""
    pipeline = _get_pipeline(pipeline_name)
    if pipeline is None:
        return PulseMode.MANUAL
    return pipeline.mode
|
||||
|
||||
|
||||
def _is_quorum_met(case_digest_hash: str) -> bool:
    """Ask ``federation.is_quorum_met`` about a case digest, failing safe.

    The federation module may not provide ``is_quorum_met`` yet. If the
    attribute is missing the answer is False, and if the import or the call
    raises, a warning is logged and the answer is also False, so the absence
    of a quorum signal never lets a federation case fire.
    """
    try:
        from psyc.lines import federation as _federation

        checker = getattr(_federation, "is_quorum_met", None)
        verdict = False if checker is None else bool(checker(case_digest_hash))
    except Exception as exc:  # noqa: BLE001 — defensive: any import / runtime miss → safe-false
        _log.warning("pulse.respond.quorum.unavailable", error=str(exc))
        verdict = False
    return verdict
|
||||
|
||||
|
||||
def _canonical_json_local(obj: Dict[str, object]) -> bytes:
|
||||
"""Deterministic JSON serialization — mirrors federation.canonical_json
|
||||
for the case-digest computation. Local copy so we don't hard-require
|
||||
federation to be importable.
|
||||
"""
|
||||
import json
|
||||
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
|
||||
|
||||
|
||||
def _case_digest_hash(case_id: str) -> str:
    """Compute the SHA-256 hex digest of the canonical JSON record for a case.

    The record layout is meant to match what federation builds for a case
    (see federation._build_case_records) so both sides arrive at the same
    digest. Returns "" when ``db.get_case`` does not yield an Ok result,
    e.g. because the row vanished mid-fire.
    """
    import hashlib
    from psyc.result import Ok as _Ok

    loaded = db.get_case(case_id)
    if not isinstance(loaded, _Ok):
        return ""

    case = loaded.value
    classification = case.classification
    observables = case.observables

    # IOC order is significant for the digest: urls, domains, ips, hashes, cves.
    typed_values = (
        ("url", observables.urls),
        ("domain", observables.domains),
        ("ip", observables.ips),
        ("hash", observables.hashes),
        ("cve", observables.cves),
    )
    iocs = [
        {"value": value, "type": kind}
        for kind, values in typed_values
        for value in values
    ]

    severity = classification.severity.value if classification.severity else None
    incident_type = (
        classification.incident_type.value if classification.incident_type else None
    )
    record = {
        "case_id": case.case_id,
        "summary": case.summary,
        "severity": severity,
        "incident_type": incident_type,
        "observed_at": case.observed_at.isoformat(),
        "feed_source": case.source_metadata.get("feed", ""),
        "iocs": iocs,
    }
    return hashlib.sha256(_canonical_json_local(record)).hexdigest()
|
||||
|
||||
|
||||
def _case_is_local(case_id: str) -> bool:
    """Report whether the case has no recorded federation signals, i.e. no peer ever pushed it to us."""
    peer_signals = db.signals_for_case(case_id)
    return not peer_signals
|
||||
|
||||
|
||||
def _audit(action: str, *, action_id: Optional[int] = None,
           case_id: Optional[str] = None, detail: str = "") -> None:
    """Write one audit row for the 'respond' pipeline.

    ``detail`` is cut to at most 500 characters and the row is stamped with
    the ISO form of ``_now()``.
    """
    entry = {
        "pipeline": "respond",
        "action": action,
        "action_id": action_id,
        "case_id": case_id,
        "detail": detail[:500],
        "timestamp": _now().isoformat(),
    }
    db.pulse_audit_record(entry)
|
||||
|
||||
|
||||
def _auto_fire_eligible() -> Tuple[int, int]:
    """Execute every PROPOSED action that clears all auto-fire gates.

    Gates, checked in order for each action (at most 100 per call):

    1. Severity: the action's severity must rank at or above the configured
       threshold; a missing or unparseable severity ranks below everything.
    2. Federation: a case that has federation signals (i.e. is not local)
       needs quorum whenever *either* the quorum gate or the local-only gate
       is armed. Locally-generated cases skip this gate.

    Every decision, whether fired, skipped or errored, is written to
    pulse_audit. An exception raised by ``respond.execute_action`` is caught
    and audited so it does not abort the rest of the batch.

    Returns:
        ``(fired_count, skipped_count)``; errors are counted as skipped.
    """
    from psyc.lines import respond
    from psyc.models import ActionStatus
    from psyc.result import Ok as _Ok

    # Read the gate configuration once so the whole batch sees one snapshot.
    threshold = respond_auto_threshold()
    threshold_rank = _severity_rank(threshold)
    require_quorum = respond_require_quorum()
    local_only = respond_local_only()

    fired = 0
    skipped = 0

    actions = respond.list_actions(status=ActionStatus.PROPOSED, limit=100)
    for action in actions:
        # Re-hydrate severity enum (action.severity is the .value string).
        try:
            sev_enum = Severity(action.severity) if action.severity else None
        except ValueError:
            sev_enum = None

        if _severity_rank(sev_enum) < threshold_rank:
            skipped += 1
            _audit(
                "skipped",
                action_id=action.id,
                case_id=action.case_id,
                detail=f"below threshold: severity={action.severity!r} < {threshold.value}",
            )
            continue

        is_local = _case_is_local(action.case_id)
        # Previously local-only was only consulted inside the require_quorum
        # branch, so with quorum off and local-only on a federation-sourced
        # case fired anyway. Local-only now defers federation cases until
        # quorum on its own, as its option help describes.
        if not is_local and (require_quorum or local_only):
            digest = _case_digest_hash(action.case_id)
            if not digest or not _is_quorum_met(digest):
                if local_only:
                    # local-only is armed but this case was imported via
                    # federation → defer until federation grants quorum
                    reason_text = "local-only armed + federation-sourced case"
                else:
                    reason_text = "no quorum on federation-sourced case"
                skipped += 1
                _audit(
                    "skipped",
                    action_id=action.id,
                    case_id=action.case_id,
                    detail=reason_text,
                )
                continue
        # else: both federation gates off, quorum met, or case is local → fire.

        try:
            result = respond.execute_action(action.id, approver="pulse-auto")
        except Exception as exc:  # noqa: BLE001 — one bad action shouldn't kill the batch
            skipped += 1
            _audit(
                "error",
                action_id=action.id,
                case_id=action.case_id,
                detail=f"execute raised: {type(exc).__name__}: {exc}",
            )
            _log.warning("pulse.respond.auto-fire.error",
                         action_id=action.id, error=str(exc))
            continue

        if isinstance(result, _Ok):
            fired += 1
            _log.info("pulse.respond.auto-fire",
                      action_id=action.id, case_id=action.case_id,
                      type=action.action_type.value, target=action.target)
            _audit(
                "auto-fire",
                action_id=action.id,
                case_id=action.case_id,
                detail=f"{action.action_type.value} → {action.target}",
            )
        else:
            # Err path — execute_action returned Err (e.g. SOAR sink down)
            reason = getattr(result, "reason", "unknown")
            skipped += 1
            _audit(
                "error",
                action_id=action.id,
                case_id=action.case_id,
                detail=f"execute failed: {reason}",
            )
            _log.warning("pulse.respond.auto-fire.failed",
                         action_id=action.id, reason=str(reason))

    return fired, skipped
|
||||
|
||||
|
||||
def _run_respond() -> str:
    """Runner for the respond pipeline: propose, then optionally auto-fire.

    Phase 1 always runs ``_propose_for_recent_cases``. Phase 2 runs only when
    the pipeline mode is auto-execute: PROPOSED actions that clear the
    severity/quorum/local-only gates are executed. The return value is a
    one-line summary of what happened.
    """
    proposed_total = _propose_for_recent_cases()
    mode = _current_mode("respond")

    if mode == PulseMode.AUTO_EXECUTE:
        fired, skipped = _auto_fire_eligible()
        return f"proposed {proposed_total}; auto-fired {fired}; skipped {skipped} (gate)"

    return f"proposed {proposed_total} actions; mode={mode.value} → no auto-fire"
|
||||
|
||||
|
||||
_DISCOVERY_SEEDS_KEY = "discovery_seeds"
|
||||
@@ -189,7 +451,7 @@ _REGISTRY: Dict[str, Callable[[], str]] = {
|
||||
"classify": _run_classify,
|
||||
"prove": _run_prove,
|
||||
"reindex": _run_reindex,
|
||||
"respond": _run_respond_propose,
|
||||
"respond": _run_respond,
|
||||
"peer-pull": _run_peer_pull,
|
||||
"vouch-refresh": _run_vouch_refresh,
|
||||
}
|
||||
|
||||
314
tests/test_pulse_respond.py
Normal file
314
tests/test_pulse_respond.py
Normal file
@@ -0,0 +1,314 @@
|
||||
"""Pulseline auto-response gating — severity threshold, quorum, local-only.
|
||||
|
||||
The runner here is the live `_run_respond` from pulse.py. We point it at a
|
||||
temp DB, monkeypatch federation.is_quorum_met to a controllable function, and
|
||||
swap respond.execute_action for a counter so we don't reach the SOAR sink.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Tuple
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
from psyc import db
|
||||
from psyc.lines import pulse, respond
|
||||
from psyc.lines import federation
|
||||
from psyc.models import (
|
||||
ActionStatus,
|
||||
ActionType,
|
||||
Case,
|
||||
Classification,
|
||||
Observables,
|
||||
ResponseAction,
|
||||
Severity,
|
||||
TLP,
|
||||
)
|
||||
from psyc.result import Ok
|
||||
|
||||
from conftest import make_case
|
||||
|
||||
|
||||
# ----- fixtures --------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
    """Point the db module at a throwaway SQLite file with all tables created.

    Yields the path of the temporary database. The pipeline mode is not
    touched here; tests that need auto-execute set it themselves.
    """
    db_file = tmp_path / "respond.db"
    temp_engine = create_engine(f"sqlite:///{db_file}", future=True)
    db._metadata.create_all(temp_engine, checkfirst=True)
    for attr, replacement in (("_engine", temp_engine), ("DB_PATH", db_file)):
        monkeypatch.setattr(db, attr, replacement)
    yield db_file
|
||||
|
||||
|
||||
@pytest.fixture
def fired(monkeypatch):
    """Replace respond.execute_action with a recorder so the SOAR sink is never reached.

    Returns the list that accumulates ``(action_id, approver)`` for each call.
    """
    calls: List[Tuple[int, str]] = []

    def fake_execute(action_id: int, approver: str = "operator"):
        calls.append((action_id, approver))
        # Hand back whatever get_action yields so callers see a realistic result.
        return respond.get_action(action_id)

    monkeypatch.setattr(respond, "execute_action", fake_execute)
    return calls
|
||||
|
||||
|
||||
@pytest.fixture
def quorum_yes(monkeypatch):
    """Make federation.is_quorum_met answer True for every digest."""

    def _always_met(h, k=None):
        return True

    # raising=False: install the stub even when federation lacks the attribute.
    monkeypatch.setattr(federation, "is_quorum_met", _always_met, raising=False)
|
||||
|
||||
|
||||
@pytest.fixture
def quorum_no(monkeypatch):
    """Make federation.is_quorum_met answer False for every digest."""

    def _never_met(h, k=None):
        return False

    # raising=False: install the stub even when federation lacks the attribute.
    monkeypatch.setattr(federation, "is_quorum_met", _never_met, raising=False)
|
||||
|
||||
|
||||
def _set_respond_mode(mode: pulse.PulseMode) -> None:
    """Switch the 'respond' pipeline to ``mode``."""
    pulse.set_mode("respond", mode)
|
||||
|
||||
|
||||
def _propose_one(case: Case) -> int:
    """Store ``case``, propose actions for it and return the first proposed action id."""
    db.upsert_case(case)
    proposed_ids = respond.propose_for_case(case)
    assert proposed_ids, "test setup expected at least one action proposed"
    first_id = proposed_ids[0]
    return first_id
|
||||
|
||||
|
||||
# ----- severity rank ---------------------------------------------------------
|
||||
|
||||
def test_severity_rank_ordering():
    """Ranks run 0..3 from LOW to CRITICAL, and None ranks -1."""
    expectations = (
        (Severity.LOW, 0),
        (Severity.MEDIUM, 1),
        (Severity.HIGH, 2),
        (Severity.CRITICAL, 3),
        (None, -1),
    )
    for severity, rank in expectations:
        assert pulse._severity_rank(severity) == rank
|
||||
|
||||
|
||||
# ----- runner mode gating ----------------------------------------------------
|
||||
|
||||
def test_runner_no_auto_fire_when_mode_is_propose(fresh_db, fired, quorum_yes):
    """With the mode left at its seeded default, the runner proposes but never fires."""
    high_case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(high_case)
    # default seed mode for respond is auto-propose → no auto-fire even with PROPOSED actions
    summary = pulse._run_respond()
    assert "no auto-fire" in summary
    assert fired == []
|
||||
|
||||
|
||||
def test_runner_no_auto_fire_when_manual(fresh_db, fired, quorum_yes):
    """In manual mode the runner reports 'no auto-fire' and executes nothing."""
    high_case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(high_case)
    _set_respond_mode(pulse.PulseMode.MANUAL)
    summary = pulse._run_respond()
    assert "no auto-fire" in summary
    assert fired == []
|
||||
|
||||
|
||||
# ----- severity threshold ----------------------------------------------------
|
||||
|
||||
def test_below_threshold_is_skipped(fresh_db, fired, quorum_yes):
    """Actions whose severity is under the threshold are audited as skipped, not fired."""
    from sqlalchemy import update as sa_update

    # propose_for_case only generates HIGH/CRITICAL actions, so propose for a
    # HIGH case first and then rewrite the stored severity by hand: the gate
    # must hold for any below-threshold severity we drop in.
    high_case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(high_case)
    respond.propose_for_case(high_case)

    # Demote every action to MEDIUM so all of them fall under a HIGH threshold.
    demote_all = sa_update(db.response_actions).values(severity=Severity.MEDIUM.value)
    with db.engine().begin() as conn:
        conn.execute(demote_all)

    pulse.set_respond_auto_threshold(Severity.HIGH)
    _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)

    pulse._run_respond()

    assert fired == [], "below-threshold action must not fire"
    recent = db.pulse_audit_recent("respond", limit=5)
    assert any(
        entry["action"] == "skipped" and "below threshold" in (entry["detail"] or "")
        for entry in recent
    )
|
||||
|
||||
|
||||
# ----- quorum gate -----------------------------------------------------------
|
||||
|
||||
def test_federation_case_no_quorum_skipped(fresh_db, fired, quorum_no):
    """A federation-sourced case is skipped, and audited, when quorum is required.

    Runs under the ``quorum_no`` fixture with quorum gating on and local-only
    off; nothing may fire and a "no quorum" skip must appear in the audit.
    """
    fed_case = make_case(feed="urlhaus", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(fed_case)

    # Inserting a signal row for the case id is what marks the case as
    # federation-sourced.
    signal_row = {
        "peer_fingerprint": "peer-a",
        "signal_type": "case",
        "signal_id": fed_case.case_id,
        "signal_hash": "dummyhash",
        "received_at": datetime.now(timezone.utc).isoformat(),
        "raw_json": "{}",
    }
    db.record_signal(signal_row)
    respond.propose_for_case(fed_case)

    pulse.set_respond_require_quorum(True)
    pulse.set_respond_local_only(False)
    pulse.set_respond_auto_threshold(Severity.HIGH)
    _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)

    pulse._run_respond()
    assert fired == []

    audit_rows = db.pulse_audit_recent("respond", limit=5)
    no_quorum_skips = [
        row
        for row in audit_rows
        if row["action"] == "skipped" and "no quorum" in (row["detail"] or "")
    ]
    assert no_quorum_skips
|
||||
|
||||
|
||||
def test_local_case_fires_when_quorum_required(fresh_db, fired, quorum_no):
    """A locally-generated case fires even with the quorum and local-only gates armed."""
    local_case = make_case(feed="urlhaus", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(local_case)

    # No signal row is recorded for this case, so it counts as locally-generated.
    respond.propose_for_case(local_case)

    # Both gates are armed; the local case is still expected to fire.
    pulse.set_respond_require_quorum(True)
    pulse.set_respond_local_only(True)
    pulse.set_respond_auto_threshold(Severity.HIGH)
    _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)

    pulse._run_respond()
    assert len(fired) >= 1

    audit_rows = db.pulse_audit_recent("respond", limit=10)
    auto_fired = [row for row in audit_rows if row["action"] == "auto-fire"]
    assert auto_fired
|
||||
|
||||
|
||||
def test_local_case_fires_local_only_off(fresh_db, fired, quorum_no):
    """A locally-generated case needs no quorum, so it fires with local-only switched off too."""
    local_case = make_case(
        feed="urlhaus", ips=["1.1.1.1"], severity=Severity.CRITICAL
    )
    db.upsert_case(local_case)
    respond.propose_for_case(local_case)

    pulse.set_respond_require_quorum(True)
    pulse.set_respond_local_only(False)
    pulse.set_respond_auto_threshold(Severity.HIGH)
    _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)

    pulse._run_respond()

    assert len(fired) >= 1
|
||||
|
||||
|
||||
def test_federation_case_with_quorum_fires(fresh_db, fired, quorum_yes):
    """Under the ``quorum_yes`` fixture a case with a recorded signal row fires."""
    fed_case = make_case(feed="urlhaus", ips=["2.2.2.2"], severity=Severity.HIGH)
    db.upsert_case(fed_case)

    # Record a signal row keyed on the case id before proposing actions.
    signal_row = {
        "peer_fingerprint": "peer-b",
        "signal_type": "case",
        "signal_id": fed_case.case_id,
        "signal_hash": "dummyhash2",
        "received_at": datetime.now(timezone.utc).isoformat(),
        "raw_json": "{}",
    }
    db.record_signal(signal_row)
    respond.propose_for_case(fed_case)

    pulse.set_respond_require_quorum(True)
    pulse.set_respond_local_only(False)
    pulse.set_respond_auto_threshold(Severity.HIGH)
    _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)

    pulse._run_respond()

    assert len(fired) >= 1
|
||||
|
||||
|
||||
def test_quorum_off_fires_federation_case(fresh_db, fired, quorum_no):
    """Once quorum gating is disabled entirely, a federation case fires as well."""
    fed_case = make_case(feed="urlhaus", ips=["3.3.3.3"], severity=Severity.HIGH)
    db.upsert_case(fed_case)

    # Record a signal row keyed on the case id before proposing actions.
    signal_row = {
        "peer_fingerprint": "peer-c",
        "signal_type": "case",
        "signal_id": fed_case.case_id,
        "signal_hash": "dummyhash3",
        "received_at": datetime.now(timezone.utc).isoformat(),
        "raw_json": "{}",
    }
    db.record_signal(signal_row)
    respond.propose_for_case(fed_case)

    # Only the quorum gate and the threshold are configured here; the
    # local-only setting is left at whatever the fresh database provides.
    pulse.set_respond_require_quorum(False)
    pulse.set_respond_auto_threshold(Severity.HIGH)
    _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)

    pulse._run_respond()

    assert len(fired) >= 1
|
||||
|
||||
|
||||
# ----- kill switch -----------------------------------------------------------
|
||||
|
||||
def test_kill_switch_blocks_tick(fresh_db, fired, quorum_yes):
    """With the kill switch armed, tick() must report only skips and fire nothing."""
    high_case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(high_case)
    respond.propose_for_case(high_case)

    # Put respond in auto-execute first, so only the kill switch stands
    # between the proposed actions and execution.
    _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
    pulse.set_kill_switch(True)

    tick_results = pulse.tick()

    # Each result is unpacked as a 3-tuple whose middle element is the outcome.
    # NOTE(review): all() is vacuously true for an empty result list; confirm
    # that tick() still reports an entry per line while the switch is armed.
    assert all(status == "skipped" for _, status, _ in tick_results)
    assert fired == []
|
||||
|
||||
|
||||
# ----- audit -----------------------------------------------------------------
|
||||
|
||||
def test_pulse_audit_records_fire_and_skip(fresh_db, fired, quorum_no):
    """One respond run must leave both an "auto-fire" and a "skipped" audit entry.

    Two HIGH-severity cases are set up: a local one that should fire, and a
    federation-sourced one that should be skipped for lack of quorum.
    """
    # Local case: expected to fire and to be audited as auto-fire.
    local_case = make_case(
        feed="urlhaus", ips=["10.0.0.1"], severity=Severity.HIGH, age_days=1
    )
    db.upsert_case(local_case)
    respond.propose_for_case(local_case)

    # Federation-sourced case without quorum: expected to be skipped and
    # audited as such.
    fed_case = make_case(
        feed="urlhaus", ips=["10.0.0.2"], severity=Severity.HIGH, age_days=2
    )
    db.upsert_case(fed_case)
    signal_row = {
        "peer_fingerprint": "peer-x",
        "signal_type": "case",
        "signal_id": fed_case.case_id,
        "signal_hash": "xhash",
        "received_at": datetime.now(timezone.utc).isoformat(),
        "raw_json": "{}",
    }
    db.record_signal(signal_row)
    respond.propose_for_case(fed_case)

    pulse.set_respond_require_quorum(True)
    pulse.set_respond_local_only(False)
    pulse.set_respond_auto_threshold(Severity.HIGH)
    _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
    pulse._run_respond()

    audit_rows = db.pulse_audit_recent("respond", limit=20)
    seen_actions = {row["action"] for row in audit_rows}
    assert "auto-fire" in seen_actions
    assert "skipped" in seen_actions
|
||||
|
||||
|
||||
def test_audit_count_since(fresh_db, fired, quorum_no):
    """pulse_audit_count_since must count an auto-fire recorded within the last hour."""
    local_case = make_case(feed="urlhaus", ips=["8.8.8.8"], severity=Severity.HIGH)
    db.upsert_case(local_case)
    respond.propose_for_case(local_case)

    pulse.set_respond_require_quorum(True)
    pulse.set_respond_auto_threshold(Severity.HIGH)
    _set_respond_mode(pulse.PulseMode.AUTO_EXECUTE)
    pulse._run_respond()

    from datetime import timedelta

    # The cutoff is passed as an ISO-8601 string, one hour before now (UTC).
    one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
    since = one_hour_ago.isoformat()

    recent_fires = db.pulse_audit_count_since("respond", "auto-fire", since)
    assert recent_fires >= 1
|
||||
|
||||
|
||||
# ----- config round-trip -----------------------------------------------------
|
||||
|
||||
def test_config_round_trips(fresh_db):
    """Check the gate values on a fresh database, then that each setter is read back."""
    # Values expected before anything has been written.
    assert pulse.respond_auto_threshold() == Severity.HIGH
    assert pulse.respond_require_quorum() is True
    assert pulse.respond_local_only() is False

    # Change every gate to a value that differs from the one checked above.
    pulse.set_respond_auto_threshold(Severity.CRITICAL)
    pulse.set_respond_require_quorum(False)
    pulse.set_respond_local_only(True)

    # Each getter must now return what was just written.
    assert pulse.respond_auto_threshold() == Severity.CRITICAL
    assert pulse.respond_require_quorum() is False
    assert pulse.respond_local_only() is True
|
||||
Reference in New Issue
Block a user