diff --git a/src/psyc/lines/pulse.py b/src/psyc/lines/pulse.py new file mode 100644 index 0000000..10473ba --- /dev/null +++ b/src/psyc/lines/pulse.py @@ -0,0 +1,380 @@ +"""Pulseline — cron-style scheduler that drives every psyc pipeline on a cadence. + +Each registered pipeline has an autonomy mode (manual / auto-propose / +auto-execute) and a cadence in seconds. tick() iterates every pipeline and +fires whichever ones are due (and enabled, and not manual, and the global kill +switch is off). State persists to SQLite via psyc.db so cadences survive +restarts. A background asyncio loop calls tick() at a fixed interval — the +cockpit lifespan attaches it. + +NOTE: federation pipelines (peer-pull, vouch-refresh) are wired as placeholders +that return a no-op string. Real federation lands in a later stage. +""" + +from __future__ import annotations + +import asyncio +import traceback +from datetime import datetime, timedelta, timezone +from enum import Enum +from typing import Callable, Dict, List, Optional, Tuple + +from pydantic import BaseModel, Field + +from psyc import db, log + + +_log = log.get(__name__) + + +class PulseMode(str, Enum): + MANUAL = "manual" + AUTO_PROPOSE = "auto-propose" + AUTO_EXECUTE = "auto-execute" + + +class Pipeline(BaseModel): + name: str + title: str + description: str + mode: PulseMode + cadence_seconds: int + enabled: bool = True + last_fired: Optional[datetime] = None + next_fire: Optional[datetime] = None + last_result: str = "" + last_outcome: str = "" # "ok" | "err" | "skipped" | "" + + +# ---------- pipeline runners -------------------------------------------------- + +def _run_fetch() -> str: + """Fetch every enabled scout source; partial fetch is fine. + + Skip-on-fail is critical: keyed feeds (threatfox, malware-bazaar, otx) + raise when their key isn't configured — we don't want one missing key to + block the public ones. + """ + from psyc.lines import scout + + plan: Tuple[Tuple[str, Optional[int]], ...] = ( + ("urlhaus", 50), + ("cisa-kev", 100), + ("feodo", 50), + ("threatfox", 200), + ("malware-bazaar", 100), + ("otx", 100), + ) + total = 0 + feeds_ok = 0 + feeds_err: List[str] = [] + for source, limit in plan: + try: + cases = scout.fetch_and_signal(source, limit=limit) + for c in cases: + db.upsert_case(c) + total += len(cases) + feeds_ok += 1 + except Exception as exc: # noqa: BLE001 — partial fetch is the point + feeds_err.append(source) + _log.info("pulse.fetch.skip", source=source, error=str(exc)[:200]) + tail = f" (skipped: {', '.join(feeds_err)})" if feeds_err else "" + return f"fetched {total} cases across {feeds_ok} feed(s){tail}" + + +def _run_classify() -> str: + from psyc.lines import classify + + cases = db.list_cases(limit=10_000) + n = 0 + for c in cases: + classify.classify(c) + db.upsert_case(c) + n += 1 + return f"classified {n} case(s)" + + +def _run_prove() -> str: + from psyc.lines import proof + + cases = db.list_cases(limit=10_000) + n = 0 + for c in cases: + proof.prove(c) + db.upsert_case(c) + n += 1 + return f"proved {n} case(s)" + + +def _run_reindex() -> str: + from psyc.lines import lookup + + cases = db.list_cases(limit=1_000_000) + written = lookup.reindex(cases) + return f"indexed {written} IOC(s) from {len(cases)} case(s)" + + +def _run_respond_propose() -> str: + """Propose response actions for high-severity cases that don't yet have any. + + Strictly proposal-only: nothing is dispatched. The respond.propose_for_case + helper is already idempotent per case (skips cases that already have + actions), so re-running this on a tick is safe. + """ + from psyc.lines import respond + + cases = db.list_cases(limit=10_000) + proposed = 0 + touched = 0 + for c in cases: + ids = respond.propose_for_case(c) + if ids: + proposed += len(ids) + touched += 1 + return f"proposed {proposed} action(s) for {touched} case(s)" + + +def _run_peer_pull() -> str: + return "federation not yet active" + + +def _run_vouch_refresh() -> str: + return "federation not yet active" + + +# ---------- registry ---------------------------------------------------------- + +_REGISTRY: Dict[str, Callable[[], str]] = { + "fetch": _run_fetch, + "classify": _run_classify, + "prove": _run_prove, + "reindex": _run_reindex, + "respond": _run_respond_propose, + "peer-pull": _run_peer_pull, + "vouch-refresh": _run_vouch_refresh, +} + + +# Initial defaults — seeded once on first DB init. Tuples of +# (name, title, description, mode, cadence_seconds, enabled). +_DEFAULTS: Tuple[Tuple[str, str, str, PulseMode, int, bool], ...] = ( + ("fetch", "Scout · fetch feeds", "Pull every configured threat feed and ingest new cases.", + PulseMode.AUTO_EXECUTE, 900, True), + ("classify", "Classify · label cases", "Assign incident type, severity, TLP, and internal class to every case.", + PulseMode.AUTO_EXECUTE, 300, True), + ("prove", "Proof · score confidence", "Compute confidence (reliability · credibility · freshness) for every case.", + PulseMode.AUTO_EXECUTE, 300, True), + ("reindex", "Lookup · rebuild IOC index","Rebuild the IOC reverse-index over the case corpus.", + PulseMode.AUTO_EXECUTE, 3600, True), + ("respond", "Respond · propose actions", "Propose human-gated response actions for newly high-severity cases.", + PulseMode.AUTO_PROPOSE, 600, True), + ("peer-pull", "Federation · peer pull", "(placeholder) Pull sealed cases from federated peers.", + PulseMode.MANUAL, 600, False), + ("vouch-refresh","Federation · vouch refresh","(placeholder) Refresh per-peer vouching ledgers.", + PulseMode.MANUAL, 3600, False), +) + + +# ---------- helpers ----------------------------------------------------------- + +def _now() -> datetime: + return datetime.now(timezone.utc) + + +def _parse_dt(value: Optional[str]) -> Optional[datetime]: + if not value: + return None + try: + return datetime.fromisoformat(value) + except ValueError: + return None + + +def _row_to_pipeline(row: dict) -> Pipeline: + return Pipeline( + name=row["name"], + title=row["title"], + description=row["description"], + mode=PulseMode(row["mode"]), + cadence_seconds=int(row["cadence_seconds"]), + enabled=bool(row["enabled"]), + last_fired=_parse_dt(row.get("last_fired")), + next_fire=_parse_dt(row.get("next_fire")), + last_result=row.get("last_result") or "", + last_outcome=row.get("last_outcome") or "", + ) + + +def _pipeline_to_row(p: Pipeline) -> dict: + return dict( + name=p.name, + title=p.title, + description=p.description, + mode=p.mode.value, + cadence_seconds=int(p.cadence_seconds), + enabled=bool(p.enabled), + last_fired=p.last_fired.isoformat() if p.last_fired else None, + next_fire=p.next_fire.isoformat() if p.next_fire else None, + last_result=p.last_result or "", + last_outcome=p.last_outcome or "", + ) + + +def seed_defaults() -> None: + """Insert any default pipelines that aren't already in the DB. Idempotent.""" + existing = {row["name"] for row in db.get_pulse_state()} + for name, title, desc, mode, cadence, enabled in _DEFAULTS: + if name in existing: + continue + p = Pipeline( + name=name, title=title, description=desc, + mode=mode, cadence_seconds=cadence, enabled=enabled, + next_fire=_now(), # first tick after install fires immediately if due + ) + db.upsert_pulse_pipeline(_pipeline_to_row(p)) + _log.info("pulse.defaults.seeded", count=len(_DEFAULTS)) + + +def state() -> List[Pipeline]: + """Every pipeline, ordered by name. Seeds defaults on the first call.""" + rows = db.get_pulse_state() + if not rows: + seed_defaults() + rows = db.get_pulse_state() + return [_row_to_pipeline(r) for r in rows] + + +def _get_pipeline(name: str) -> Optional[Pipeline]: + for p in state(): + if p.name == name: + return p + return None + + +def set_mode(name: str, mode: PulseMode) -> None: + p = _get_pipeline(name) + if p is None: + raise ValueError(f"unknown pipeline: {name}") + p.mode = mode + db.upsert_pulse_pipeline(_pipeline_to_row(p)) + + +def set_cadence(name: str, seconds: int) -> None: + if seconds <= 0: + raise ValueError("cadence must be > 0 seconds") + p = _get_pipeline(name) + if p is None: + raise ValueError(f"unknown pipeline: {name}") + p.cadence_seconds = int(seconds) + db.upsert_pulse_pipeline(_pipeline_to_row(p)) + + +def set_enabled(name: str, enabled: bool) -> None: + p = _get_pipeline(name) + if p is None: + raise ValueError(f"unknown pipeline: {name}") + p.enabled = bool(enabled) + db.upsert_pulse_pipeline(_pipeline_to_row(p)) + + +def set_kill_switch(armed: bool) -> None: + db.kill_switch_set(armed) + _log.warning("pulse.killswitch.changed", armed=bool(armed)) + + +def kill_switch_state() -> bool: + return db.kill_switch_get() + + +# ---------- the heartbeat ----------------------------------------------------- + +def _fire(p: Pipeline) -> Tuple[str, str]: + """Run one pipeline. Returns (outcome, result_str). Persists the timestamps. + + Outcome is "ok" if the runner returned, "err" if it raised. + """ + runner = _REGISTRY.get(p.name) + if runner is None: + outcome = "err" + result = f"no runner registered for '{p.name}'" + else: + try: + result = runner() or "" + outcome = "ok" + except Exception as exc: # noqa: BLE001 — log + record, don't crash the loop + outcome = "err" + result = f"{type(exc).__name__}: {exc}" + _log.warning("pulse.fire.error", name=p.name, error=result, trace=traceback.format_exc()) + + now = _now() + p.last_fired = now + p.next_fire = now + timedelta(seconds=max(1, p.cadence_seconds)) + p.last_result = result[:500] + p.last_outcome = outcome + db.upsert_pulse_pipeline(_pipeline_to_row(p)) + _log.info("pulse.fired", name=p.name, outcome=outcome, result=p.last_result) + return outcome, p.last_result + + +def _should_fire(p: Pipeline, now: datetime) -> bool: + if not p.enabled: + return False + if p.mode == PulseMode.MANUAL: + return False + if p.next_fire is None: + return True + return now >= p.next_fire + + +def tick() -> List[Tuple[str, str, str]]: + """Single scheduler heartbeat. Returns (name, outcome, result_str) per pipeline. + + Outcome is "ok" / "err" / "skipped" — every registered pipeline appears in + the return value so callers can see what was skipped and why. + """ + if kill_switch_state(): + _log.info("pulse.tick.killed") + return [(p.name, "skipped", "kill switch armed") for p in state()] + + now = _now() + out: List[Tuple[str, str, str]] = [] + for p in state(): + if not _should_fire(p, now): + out.append((p.name, "skipped", "not due")) + continue + outcome, result = _fire(p) + out.append((p.name, outcome, result)) + return out + + +def run_now(name: str) -> Tuple[str, str]: + """Manually fire one pipeline, bypassing cadence and mode. Honors kill switch. + + Returns (outcome, result_str). Raises ValueError on unknown name. + """ + if kill_switch_state(): + return ("skipped", "kill switch armed") + p = _get_pipeline(name) + if p is None: + raise ValueError(f"unknown pipeline: {name}") + return _fire(p) + + +# ---------- background loop --------------------------------------------------- + +async def start_background_loop(interval_seconds: int = 30) -> None: + """Long-running scheduler — calls tick() every interval. Launched from FastAPI lifespan. + + Designed to run for the life of the process; cancellation is the normal stop signal. + """ + _log.info("pulse.loop.starting", interval=interval_seconds) + while True: + try: + tick() + except Exception as exc: # noqa: BLE001 — one bad tick must not kill the loop + _log.warning("pulse.loop.tick.error", error=str(exc), trace=traceback.format_exc()) + try: + await asyncio.sleep(interval_seconds) + except asyncio.CancelledError: + _log.info("pulse.loop.cancelled") + raise