stage-33a pulse: scheduler module with pipeline registry

This commit is contained in:
m17hr1l
2026-06-06 16:03:22 +02:00
parent 6356c5535b
commit e710be6ebd

380
src/psyc/lines/pulse.py Normal file
View File

@@ -0,0 +1,380 @@
"""Pulseline — cron-style scheduler that drives every psyc pipeline on a cadence.
Each registered pipeline has an autonomy mode (manual / auto-propose /
auto-execute) and a cadence in seconds. tick() iterates every pipeline and
fires whichever ones are due (and enabled, and not manual, and the global kill
switch is off). State persists to SQLite via psyc.db so cadences survive
restarts. A background asyncio loop calls tick() at a fixed interval — the
cockpit lifespan attaches it.
NOTE: federation pipelines (peer-pull, vouch-refresh) are wired as placeholders
that return a no-op string. Real federation lands in a later stage.
"""
from __future__ import annotations
import asyncio
import traceback
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Callable, Dict, List, Optional, Tuple
from pydantic import BaseModel, Field
from psyc import db, log
_log = log.get(__name__)
class PulseMode(str, Enum):
MANUAL = "manual"
AUTO_PROPOSE = "auto-propose"
AUTO_EXECUTE = "auto-execute"
class Pipeline(BaseModel):
name: str
title: str
description: str
mode: PulseMode
cadence_seconds: int
enabled: bool = True
last_fired: Optional[datetime] = None
next_fire: Optional[datetime] = None
last_result: str = ""
last_outcome: str = "" # "ok" | "err" | "skipped" | ""
# ---------- pipeline runners --------------------------------------------------
def _run_fetch() -> str:
"""Fetch every enabled scout source; partial fetch is fine.
Skip-on-fail is critical: keyed feeds (threatfox, malware-bazaar, otx)
raise when their key isn't configured — we don't want one missing key to
block the public ones.
"""
from psyc.lines import scout
plan: Tuple[Tuple[str, Optional[int]], ...] = (
("urlhaus", 50),
("cisa-kev", 100),
("feodo", 50),
("threatfox", 200),
("malware-bazaar", 100),
("otx", 100),
)
total = 0
feeds_ok = 0
feeds_err: List[str] = []
for source, limit in plan:
try:
cases = scout.fetch_and_signal(source, limit=limit)
for c in cases:
db.upsert_case(c)
total += len(cases)
feeds_ok += 1
except Exception as exc: # noqa: BLE001 — partial fetch is the point
feeds_err.append(source)
_log.info("pulse.fetch.skip", source=source, error=str(exc)[:200])
tail = f" (skipped: {', '.join(feeds_err)})" if feeds_err else ""
return f"fetched {total} cases across {feeds_ok} feed(s){tail}"
def _run_classify() -> str:
from psyc.lines import classify
cases = db.list_cases(limit=10_000)
n = 0
for c in cases:
classify.classify(c)
db.upsert_case(c)
n += 1
return f"classified {n} case(s)"
def _run_prove() -> str:
from psyc.lines import proof
cases = db.list_cases(limit=10_000)
n = 0
for c in cases:
proof.prove(c)
db.upsert_case(c)
n += 1
return f"proved {n} case(s)"
def _run_reindex() -> str:
from psyc.lines import lookup
cases = db.list_cases(limit=1_000_000)
written = lookup.reindex(cases)
return f"indexed {written} IOC(s) from {len(cases)} case(s)"
def _run_respond_propose() -> str:
"""Propose response actions for high-severity cases that don't yet have any.
Strictly proposal-only: nothing is dispatched. The respond.propose_for_case
helper is already idempotent per case (skips cases that already have
actions), so re-running this on a tick is safe.
"""
from psyc.lines import respond
cases = db.list_cases(limit=10_000)
proposed = 0
touched = 0
for c in cases:
ids = respond.propose_for_case(c)
if ids:
proposed += len(ids)
touched += 1
return f"proposed {proposed} action(s) for {touched} case(s)"
def _run_peer_pull() -> str:
return "federation not yet active"
def _run_vouch_refresh() -> str:
return "federation not yet active"
# ---------- registry ----------------------------------------------------------
_REGISTRY: Dict[str, Callable[[], str]] = {
"fetch": _run_fetch,
"classify": _run_classify,
"prove": _run_prove,
"reindex": _run_reindex,
"respond": _run_respond_propose,
"peer-pull": _run_peer_pull,
"vouch-refresh": _run_vouch_refresh,
}
# Initial defaults — seeded once on first DB init. Tuples of
# (name, title, description, mode, cadence_seconds, enabled).
_DEFAULTS: Tuple[Tuple[str, str, str, PulseMode, int, bool], ...] = (
("fetch", "Scout · fetch feeds", "Pull every configured threat feed and ingest new cases.",
PulseMode.AUTO_EXECUTE, 900, True),
("classify", "Classify · label cases", "Assign incident type, severity, TLP, and internal class to every case.",
PulseMode.AUTO_EXECUTE, 300, True),
("prove", "Proof · score confidence", "Compute confidence (reliability · credibility · freshness) for every case.",
PulseMode.AUTO_EXECUTE, 300, True),
("reindex", "Lookup · rebuild IOC index","Rebuild the IOC reverse-index over the case corpus.",
PulseMode.AUTO_EXECUTE, 3600, True),
("respond", "Respond · propose actions", "Propose human-gated response actions for newly high-severity cases.",
PulseMode.AUTO_PROPOSE, 600, True),
("peer-pull", "Federation · peer pull", "(placeholder) Pull sealed cases from federated peers.",
PulseMode.MANUAL, 600, False),
("vouch-refresh","Federation · vouch refresh","(placeholder) Refresh per-peer vouching ledgers.",
PulseMode.MANUAL, 3600, False),
)
# ---------- helpers -----------------------------------------------------------
def _now() -> datetime:
return datetime.now(timezone.utc)
def _parse_dt(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
try:
return datetime.fromisoformat(value)
except ValueError:
return None
def _row_to_pipeline(row: dict) -> Pipeline:
return Pipeline(
name=row["name"],
title=row["title"],
description=row["description"],
mode=PulseMode(row["mode"]),
cadence_seconds=int(row["cadence_seconds"]),
enabled=bool(row["enabled"]),
last_fired=_parse_dt(row.get("last_fired")),
next_fire=_parse_dt(row.get("next_fire")),
last_result=row.get("last_result") or "",
last_outcome=row.get("last_outcome") or "",
)
def _pipeline_to_row(p: Pipeline) -> dict:
return dict(
name=p.name,
title=p.title,
description=p.description,
mode=p.mode.value,
cadence_seconds=int(p.cadence_seconds),
enabled=bool(p.enabled),
last_fired=p.last_fired.isoformat() if p.last_fired else None,
next_fire=p.next_fire.isoformat() if p.next_fire else None,
last_result=p.last_result or "",
last_outcome=p.last_outcome or "",
)
def seed_defaults() -> None:
"""Insert any default pipelines that aren't already in the DB. Idempotent."""
existing = {row["name"] for row in db.get_pulse_state()}
for name, title, desc, mode, cadence, enabled in _DEFAULTS:
if name in existing:
continue
p = Pipeline(
name=name, title=title, description=desc,
mode=mode, cadence_seconds=cadence, enabled=enabled,
next_fire=_now(), # first tick after install fires immediately if due
)
db.upsert_pulse_pipeline(_pipeline_to_row(p))
_log.info("pulse.defaults.seeded", count=len(_DEFAULTS))
def state() -> List[Pipeline]:
"""Every pipeline, ordered by name. Seeds defaults on the first call."""
rows = db.get_pulse_state()
if not rows:
seed_defaults()
rows = db.get_pulse_state()
return [_row_to_pipeline(r) for r in rows]
def _get_pipeline(name: str) -> Optional[Pipeline]:
for p in state():
if p.name == name:
return p
return None
def set_mode(name: str, mode: PulseMode) -> None:
p = _get_pipeline(name)
if p is None:
raise ValueError(f"unknown pipeline: {name}")
p.mode = mode
db.upsert_pulse_pipeline(_pipeline_to_row(p))
def set_cadence(name: str, seconds: int) -> None:
if seconds <= 0:
raise ValueError("cadence must be > 0 seconds")
p = _get_pipeline(name)
if p is None:
raise ValueError(f"unknown pipeline: {name}")
p.cadence_seconds = int(seconds)
db.upsert_pulse_pipeline(_pipeline_to_row(p))
def set_enabled(name: str, enabled: bool) -> None:
p = _get_pipeline(name)
if p is None:
raise ValueError(f"unknown pipeline: {name}")
p.enabled = bool(enabled)
db.upsert_pulse_pipeline(_pipeline_to_row(p))
def set_kill_switch(armed: bool) -> None:
db.kill_switch_set(armed)
_log.warning("pulse.killswitch.changed", armed=bool(armed))
def kill_switch_state() -> bool:
return db.kill_switch_get()
# ---------- the heartbeat -----------------------------------------------------
def _fire(p: Pipeline) -> Tuple[str, str]:
"""Run one pipeline. Returns (outcome, result_str). Persists the timestamps.
Outcome is "ok" if the runner returned, "err" if it raised.
"""
runner = _REGISTRY.get(p.name)
if runner is None:
outcome = "err"
result = f"no runner registered for '{p.name}'"
else:
try:
result = runner() or ""
outcome = "ok"
except Exception as exc: # noqa: BLE001 — log + record, don't crash the loop
outcome = "err"
result = f"{type(exc).__name__}: {exc}"
_log.warning("pulse.fire.error", name=p.name, error=result, trace=traceback.format_exc())
now = _now()
p.last_fired = now
p.next_fire = now + timedelta(seconds=max(1, p.cadence_seconds))
p.last_result = result[:500]
p.last_outcome = outcome
db.upsert_pulse_pipeline(_pipeline_to_row(p))
_log.info("pulse.fired", name=p.name, outcome=outcome, result=p.last_result)
return outcome, p.last_result
def _should_fire(p: Pipeline, now: datetime) -> bool:
if not p.enabled:
return False
if p.mode == PulseMode.MANUAL:
return False
if p.next_fire is None:
return True
return now >= p.next_fire
def tick() -> List[Tuple[str, str, str]]:
"""Single scheduler heartbeat. Returns (name, outcome, result_str) per pipeline.
Outcome is "ok" / "err" / "skipped" — every registered pipeline appears in
the return value so callers can see what was skipped and why.
"""
if kill_switch_state():
_log.info("pulse.tick.killed")
return [(p.name, "skipped", "kill switch armed") for p in state()]
now = _now()
out: List[Tuple[str, str, str]] = []
for p in state():
if not _should_fire(p, now):
out.append((p.name, "skipped", "not due"))
continue
outcome, result = _fire(p)
out.append((p.name, outcome, result))
return out
def run_now(name: str) -> Tuple[str, str]:
"""Manually fire one pipeline, bypassing cadence and mode. Honors kill switch.
Returns (outcome, result_str). Raises ValueError on unknown name.
"""
if kill_switch_state():
return ("skipped", "kill switch armed")
p = _get_pipeline(name)
if p is None:
raise ValueError(f"unknown pipeline: {name}")
return _fire(p)
# ---------- background loop ---------------------------------------------------
async def start_background_loop(interval_seconds: int = 30) -> None:
"""Long-running scheduler — calls tick() every interval. Launched from FastAPI lifespan.
Designed to run for the life of the process; cancellation is the normal stop signal.
"""
_log.info("pulse.loop.starting", interval=interval_seconds)
while True:
try:
tick()
except Exception as exc: # noqa: BLE001 — one bad tick must not kill the loop
_log.warning("pulse.loop.tick.error", error=str(exc), trace=traceback.format_exc())
try:
await asyncio.sleep(interval_seconds)
except asyncio.CancelledError:
_log.info("pulse.loop.cancelled")
raise