diff --git a/src/psyc/_pulse_cli.py b/src/psyc/_pulse_cli.py
new file mode 100644
index 0000000..97fee7b
--- /dev/null
+++ b/src/psyc/_pulse_cli.py
@@ -0,0 +1,122 @@
+"""Typer commands for the Pulse scheduler.
+
+Imported and wired by `cli.py` via `register(app)`. Kept as a separate module
+so the main CLI surface stays grep-friendly and the scheduler can grow its own
+verbs without bloating cli.py.
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import Optional
+
+import typer
+
+from psyc import db
+from psyc.lines import pulse
+
+# Glyph shown next to each outcome; anything unrecognised renders as "·".
+_OUTCOME_MARKERS = {"ok": "✓", "err": "✗", "skipped": "⊘"}
+
+
+def _relative(dt: Optional[datetime]) -> str:
+    """Render `dt` relative to the current time.
+
+    Returns "—" for None, "now" within five seconds either way, otherwise
+    "<n><unit> ago" for the past or "in <n><unit>" for the future, with the
+    unit being s, m, h or d. Naive datetimes are treated as UTC.
+    """
+    if dt is None:
+        return "—"
+    now = datetime.now(timezone.utc)
+    if dt.tzinfo is None:
+        dt = dt.replace(tzinfo=timezone.utc)
+    delta = int((dt - now).total_seconds())
+    past = delta < 0
+    secs = abs(delta)
+    if secs < 5:
+        return "now"
+    if secs < 60:
+        unit = f"{secs}s"
+    elif secs < 3600:
+        unit = f"{secs // 60}m"
+    elif secs < 86400:
+        unit = f"{secs // 3600}h"
+    else:
+        unit = f"{secs // 86400}d"
+    return f"{unit} ago" if past else f"in {unit}"
+
+
+def register(typer_app: typer.Typer) -> None:
+    """Add pulse-* commands to the given Typer app."""
+
+    @typer_app.command("pulse-status")
+    def pulse_status() -> None:
+        """Print the pipeline table (mode · cadence · last-fired · next-fire · last-result)."""
+        db.init_db()
+        ks = pulse.kill_switch_state()
+        typer.echo(f"kill switch: {'ARMED' if ks else 'OFF'}")
+        rows = pulse.state()
+        if not rows:
+            typer.echo("(no pipelines registered)")
+            return
+        typer.echo(f"{'name':<16} {'mode':<14} {'cadence':>8} {'enabled':<7} {'last':<10} {'next':<10} result")
+        for p in rows:
+            # Cadence is 7 digits + "s" = 8 columns, matching the header width
+            # so the columns to its right line up.
+            typer.echo(
+                f"{p.name:<16} {p.mode.value:<14} {p.cadence_seconds:>7}s "
+                f"{'yes' if p.enabled else 'no':<7} "
+                f"{_relative(p.last_fired):<10} {_relative(p.next_fire):<10} "
+                f"{(p.last_result or '')[:60]}"
+            )
+
+    @typer_app.command("pulse-tick")
+    def pulse_tick() -> None:
+        """Run one scheduler heartbeat and print the per-pipeline outcomes."""
+        db.init_db()
+        results = pulse.tick()
+        for name, outcome, result in results:
+            marker = _OUTCOME_MARKERS.get(outcome, "·")
+            typer.echo(f" {marker} {name:<16} {outcome:<8} {result[:120]}")
+
+    @typer_app.command("pulse-set-mode")
+    def pulse_set_mode(
+        name: str = typer.Argument(..., help="pipeline name"),
+        mode: str = typer.Argument(..., help="manual | auto-propose | auto-execute"),
+    ) -> None:
+        """Set a pipeline's autonomy mode."""
+        db.init_db()
+        try:
+            # PulseMode(mode) raises ValueError for an unknown mode string, and
+            # set_mode raises ValueError for an unknown pipeline name.
+            pulse.set_mode(name, pulse.PulseMode(mode))
+        except ValueError as exc:
+            typer.echo(f"error: {exc}", err=True)
+            raise typer.Exit(1) from exc
+        typer.echo(f"{name} mode → {mode}")
+
+    @typer_app.command("pulse-set-cadence")
+    def pulse_set_cadence(
+        name: str = typer.Argument(..., help="pipeline name"),
+        seconds: int = typer.Argument(..., help="cadence in seconds (>0)"),
+    ) -> None:
+        """Set a pipeline's cadence in seconds."""
+        db.init_db()
+        try:
+            pulse.set_cadence(name, seconds)
+        except ValueError as exc:
+            typer.echo(f"error: {exc}", err=True)
+            raise typer.Exit(1) from exc
+        typer.echo(f"{name} cadence → {seconds}s")
+
+    @typer_app.command("pulse-run")
+    def pulse_run(name: str = typer.Argument(..., help="pipeline name")) -> None:
+        """Manually fire one pipeline (bypasses cadence; honors kill switch)."""
+        db.init_db()
+        try:
+            outcome, result = pulse.run_now(name)
+        except ValueError as exc:
+            typer.echo(f"error: {exc}", err=True)
+            raise typer.Exit(1) from exc
+        marker = _OUTCOME_MARKERS.get(outcome, "·")
+        typer.echo(f" {marker} {name}: {outcome} — {result}")
+
+    @typer_app.command("pulse-kill")
+    def pulse_kill() -> None:
+        """Arm the kill switch — every pipeline halts on the next tick."""
+        db.init_db()
+        pulse.set_kill_switch(True)
+        typer.echo("kill switch ARMED — all pipelines halted")
+
+    @typer_app.command("pulse-unkill")
+    def pulse_unkill() -> None:
+        """Disarm the kill switch — pulse resumes on the next tick."""
+        db.init_db()
+        pulse.set_kill_switch(False)
+        typer.echo("kill switch disarmed — pulse resumes")
diff --git a/src/psyc/cockpit/pulse_routes.py b/src/psyc/cockpit/pulse_routes.py
new file mode 100644
index 0000000..7291ff6
--- /dev/null
+++ b/src/psyc/cockpit/pulse_routes.py
@@ -0,0 +1,120 @@
+"""Cockpit routes for the Pulse scheduler — admin-gated.
+
+The integration is intentionally single-call: `register(app, TEMPLATES)` adds
+the routes AND wires the FastAPI startup hook that launches the background
+scheduler loop. Caller in app.py just imports + invokes register().
+"""
+
+from __future__ import annotations
+
+import asyncio
+from datetime import datetime, timezone
+from typing import Optional
+from urllib.parse import quote
+
+from fastapi import FastAPI, Form, Request
+from fastapi.responses import HTMLResponse, RedirectResponse, Response
+from fastapi.templating import Jinja2Templates
+
+from psyc import log
+from psyc.lines import pulse
+
+
+_log = log.get(__name__)
+
+TICK_INTERVAL_SECONDS = 30
+
+
+def _admin_ok(request: Request) -> bool:
+    """Mirror of the local helper in app.py — admin session is just session['admin_ok']."""
+    return bool(request.session.get("admin_ok"))
+
+
+def _relative(dt: Optional[datetime]) -> str:
+    """Human-friendly "3m ago" / "in 12m" / "now". None → '—'."""
+    if dt is None:
+        return "—"
+    now = datetime.now(timezone.utc)
+    if dt.tzinfo is None:
+        dt = dt.replace(tzinfo=timezone.utc)
+    delta = (dt - now).total_seconds()
+    past = delta < 0
+    secs = abs(int(delta))
+    if secs < 5:
+        return "now"
+    if secs < 60:
+        unit = f"{secs}s"
+    elif secs < 3600:
+        unit = f"{secs // 60}m"
+    elif secs < 86400:
+        unit = f"{secs // 3600}h"
+    else:
+        unit = f"{secs // 86400}d"
+    return f"{unit} ago" if past else f"in {unit}"
+
+
+def _to_admin() -> RedirectResponse:
+    """303 to the admin login page, used when the session is not an admin session."""
+    return RedirectResponse("/admin", status_code=303)
+
+
+def _to_pulse(flash: str) -> RedirectResponse:
+    """303 back to /admin/pulse carrying `flash` as a query parameter.
+
+    The message is percent-encoded in full (safe="") because it can contain
+    exception text or runner output: a raw "&", "#", "+" or "%" would
+    otherwise truncate or corrupt the value when the query string is parsed.
+    """
+    return RedirectResponse(f"/admin/pulse?flash={quote(flash, safe='')}", status_code=303)
+
+
+def register(app: FastAPI, templates: Jinja2Templates) -> None:
+    """Attach the /admin/pulse routes and the background scheduler loop to `app`."""
+
+    @app.get("/admin/pulse", response_class=HTMLResponse)
+    def pulse_view(request: Request) -> Response:
+        """Render the pipeline table, or redirect to /admin without an admin session."""
+        if not _admin_ok(request):
+            return _to_admin()
+        flash = request.query_params.get("flash", "")
+        pipelines = pulse.state()
+        return templates.TemplateResponse(
+            request,
+            "admin_pulse.html",
+            {
+                "pipelines": pipelines,
+                "kill_switch": pulse.kill_switch_state(),
+                "tick_interval": TICK_INTERVAL_SECONDS,
+                "relative": _relative,
+                "flash": flash,
+            },
+        )
+
+    @app.post("/admin/pulse/kill")
+    def pulse_toggle_kill(request: Request) -> RedirectResponse:
+        """Flip the global kill switch and report the new state."""
+        if not _admin_ok(request):
+            return _to_admin()
+        new = not pulse.kill_switch_state()
+        pulse.set_kill_switch(new)
+        flash = "kill switch ARMED — all pipelines halted" if new else "kill switch disarmed — pulse resumes"
+        return _to_pulse(flash)
+
+    @app.post("/admin/pulse/{name}/update")
+    def pulse_update(
+        request: Request,
+        name: str,
+        mode: str = Form(...),
+        cadence_seconds: int = Form(...),
+        enabled: Optional[str] = Form(None),
+    ) -> RedirectResponse:
+        """Apply mode, cadence and enabled flag from the row's form.
+
+        `enabled` is a checkbox: the field is present when ticked and absent
+        otherwise, hence the `is not None` test. The three setters run in
+        sequence, so a failure part-way leaves the earlier ones applied.
+        """
+        if not _admin_ok(request):
+            return _to_admin()
+        try:
+            pulse.set_mode(name, pulse.PulseMode(mode))
+            pulse.set_cadence(name, int(cadence_seconds))
+            pulse.set_enabled(name, enabled is not None)
+            flash = f"updated {name}: mode={mode}, cadence={cadence_seconds}s, enabled={enabled is not None}"
+        except (ValueError, KeyError) as exc:
+            _log.warning("pulse.update.error", name=name, error=str(exc))
+            flash = f"update failed: {exc}"
+        return _to_pulse(flash)
+
+    @app.post("/admin/pulse/{name}/run")
+    def pulse_run_now(request: Request, name: str) -> RedirectResponse:
+        """Fire one pipeline immediately and report its outcome."""
+        if not _admin_ok(request):
+            return _to_admin()
+        try:
+            outcome, result = pulse.run_now(name)
+            flash = f"{name} → {outcome}: {result[:120]}"
+        except ValueError as exc:
+            flash = f"run failed: {exc}"
+        return _to_pulse(flash)
+
+    @app.on_event("startup")
+    async def _start_pulse_loop() -> None:
+        # The event loop only keeps weak references to tasks, so hold a strong
+        # one on app.state; otherwise the scheduler task may be garbage
+        # collected while it is still running.
+        app.state.pulse_task = asyncio.create_task(
+            pulse.start_background_loop(interval_seconds=TICK_INTERVAL_SECONDS)
+        )
+
+    _log.info("pulse.routes.registered", tick=TICK_INTERVAL_SECONDS)
diff --git a/src/psyc/cockpit/templates/admin_pulse.html b/src/psyc/cockpit/templates/admin_pulse.html
new file mode 100644
index 0000000..f542881
--- /dev/null
+++ b/src/psyc/cockpit/templates/admin_pulse.html
@@ -0,0 +1,101 @@
+{% extends "base.html" %}
+{% block title %}Pulse — psyc admin{% endblock %}
+{% block content %}
+
+
+

Pulse — autonomous heartbeat

+ {{ pipelines|length }} pipeline{{ '' if pipelines|length == 1 else 's' }} +
+

Cron-style scheduler that drives every psyc line on a cadence without human input. Each pipeline has an autonomy mode and a cadence in seconds. The kill switch halts everything instantly — it overrides cadence, mode, and the enabled flag.

+

← back to admin

+ + {% if flash %} +
{{ flash }}
+ {% endif %} +
+ +
+
+

Global kill switch

+ {{ 'ARMED' if kill_switch else 'OFF' }} +
+ {% if kill_switch %} +
+ ✗ KILL SWITCH ARMED — every pipeline is paused. tick() returns "skipped" for everything. Run-now is also blocked. Toggle off to resume. +
+
+ +
+ {% else %} +
✓ Pulse is live — the background loop ticks every {{ tick_interval }}s.
+
+ +
+ {% endif %} +
+ +
+
+

Pipelines

+ {{ pipelines|selectattr('enabled')|list|length }} enabled +
+

Mode: auto-execute fires the line, auto-propose stages proposals for human approval, manual runs only when you press “Run now”. Cadence is the minimum gap between two runs of the same pipeline; the loop wakes up every {{ tick_interval }}s and fires whichever pipelines are due.

+ + + + + + + + + + + + + + {% for p in pipelines %} + + + + + + + + + {% endfor %} + +
PipelineMode · cadence · enabledLast firedNext fireLast result
+ {{ p.title }} +
{{ p.name }} · {{ p.description }}
+
+
+ + + + +
+
+ {% if p.last_fired %}{{ relative(p.last_fired) }}{% else %}—{% endif %} + + {% if p.next_fire %}{{ relative(p.next_fire) }}{% else %}—{% endif %} + + {% if p.last_outcome == 'ok' %} + {% elif p.last_outcome == 'err' %} + {% elif p.last_outcome == 'skipped' %} + {% endif %} + {{ (p.last_result or '—')[:60] }}{% if (p.last_result or '')|length > 60 %}…{% endif %} + +
+ +
+
+
+{% endblock %}
diff --git a/src/psyc/db.py b/src/psyc/db.py
index 1b12436..273154c 100644
--- a/src/psyc/db.py
+++ b/src/psyc/db.py
@@ -114,6 +114,26 @@ Index("iocs_value_idx", iocs.c.value)
 Index("iocs_type_idx", iocs.c.ioc_type)
 Index("iocs_case_idx", iocs.c.case_id)
 
+pulse_pipelines = Table(
+    "pulse_pipelines", _metadata,
+    Column("name", String, primary_key=True),
+    Column("title", String, nullable=False),
+    Column("description", Text, nullable=False, default=""),
+    Column("mode", String, nullable=False),  # manual | auto-propose | auto-execute
+    Column("cadence_seconds", Integer, nullable=False),
+    Column("enabled", Boolean, nullable=False, default=True),
+    Column("last_fired", String, nullable=True),  # ISO timestamp or NULL
+    Column("next_fire", String, nullable=True),  # ISO timestamp or NULL
+    Column("last_result", Text, nullable=False, default=""),
+    Column("last_outcome", String, nullable=False, default=""),  # ok | err | skipped | ""
+)
+
+pulse_settings = Table(
+    "pulse_settings", _metadata,
+    Column("key", String, primary_key=True),
+    Column("value", String, nullable=False),
+)
+
 _log = log.get(__name__)
 
 _engine: Optional[Engine] = None
@@ -214,3 +234,53 @@ def ioc_count(db_path: Path = DB_PATH) -> int:
     stmt = select(func.count()).select_from(iocs)
     with engine(db_path).connect() as conn:
         return conn.execute(stmt).scalar_one()
+
+
+# ---------- pulse scheduler ----------------------------------------------
+
+def get_pulse_state(db_path: Path = DB_PATH) -> List[dict]:
+    """Return one plain dict per row of `pulse_pipelines`, sorted by name."""
+    query = select(pulse_pipelines).order_by(pulse_pipelines.c.name)
+    with engine(db_path).connect() as conn:
+        records = conn.execute(query).fetchall()
+        return [dict(record._mapping) for record in records]
+
+
+def upsert_pulse_pipeline(row: dict, db_path: Path = DB_PATH) -> None:
+    """Write one pipeline row, keyed on `name`.
+
+    A new name is inserted; an existing name has every other column
+    overwritten with the values from `row`.
+    """
+    insert_stmt = sqlite_insert(pulse_pipelines).values(**row)
+    # Every column except the primary key is replaced on conflict.
+    overwritten = (
+        "title",
+        "description",
+        "mode",
+        "cadence_seconds",
+        "enabled",
+        "last_fired",
+        "next_fire",
+        "last_result",
+        "last_outcome",
+    )
+    upsert_stmt = insert_stmt.on_conflict_do_update(
+        index_elements=[pulse_pipelines.c.name],
+        set_={column: insert_stmt.excluded[column] for column in overwritten},
+    )
+    with engine(db_path).begin() as conn:
+        conn.execute(upsert_stmt)
+
+
+def kill_switch_get(db_path: Path = DB_PATH) -> bool:
+    """Return True only when the stored `kill_switch` setting is "1".
+
+    A missing row counts as not armed.
+    """
+    query = select(pulse_settings.c.value).where(pulse_settings.c.key == "kill_switch")
+    with engine(db_path).connect() as conn:
+        stored = conn.execute(query).fetchone()
+    return stored is not None and str(stored.value) == "1"
+
+
+def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None:
+    """Persist the kill switch as "1" (armed) or "0" under the `kill_switch` key."""
+    insert_stmt = sqlite_insert(pulse_settings).values(
+        key="kill_switch",
+        value="1" if armed else "0",
+    )
+    upsert_stmt = insert_stmt.on_conflict_do_update(
+        index_elements=[pulse_settings.c.key],
+        set_={"value": insert_stmt.excluded.value},
+    )
+    with engine(db_path).begin() as conn:
+        conn.execute(upsert_stmt)
diff --git a/src/psyc/lines/pulse.py b/src/psyc/lines/pulse.py
new file mode 100644
index 0000000..10473ba
--- /dev/null
+++ b/src/psyc/lines/pulse.py
@@ -0,0 +1,380 @@
+"""Pulseline — cron-style scheduler that drives every psyc pipeline on a cadence.
+
+Each registered pipeline has an autonomy mode (manual / auto-propose /
+auto-execute) and a cadence in seconds. tick() iterates every pipeline and
+fires whichever ones are due (and enabled, and not manual, and the global kill
+switch is off). State persists to SQLite via psyc.db so cadences survive
+restarts. A background asyncio loop calls tick() at a fixed interval — the
+cockpit lifespan attaches it.
+
+NOTE: federation pipelines (peer-pull, vouch-refresh) are wired as placeholders
+that return a no-op string. Real federation lands in a later stage.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import traceback
+from datetime import datetime, timedelta, timezone
+from enum import Enum
+from typing import Callable, Dict, List, Optional, Tuple
+
+from pydantic import BaseModel, Field
+
+from psyc import db, log
+
+
+_log = log.get(__name__)
+
+
+class PulseMode(str, Enum):
+    """Autonomy mode of a pipeline; the values are the strings stored in the DB."""
+
+    MANUAL = "manual"
+    AUTO_PROPOSE = "auto-propose"
+    AUTO_EXECUTE = "auto-execute"
+
+
+class Pipeline(BaseModel):
+    """One scheduler entry: its configuration plus the record of its last run."""
+
+    name: str
+    title: str
+    description: str
+    mode: PulseMode
+    cadence_seconds: int
+    enabled: bool = True
+    last_fired: Optional[datetime] = None
+    next_fire: Optional[datetime] = None
+    last_result: str = ""
+    last_outcome: str = ""  # "ok" | "err" | "skipped" | ""
+
+
+# ---------- pipeline runners --------------------------------------------------
+
+def _run_fetch() -> str:
+    """Fetch every enabled scout source; partial fetch is fine.
+
+    Skip-on-fail is critical: keyed feeds (threatfox, malware-bazaar, otx)
+    raise when their key isn't configured — we don't want one missing key to
+    block the public ones.
+    """
+    from psyc.lines import scout
+
+    plan: Tuple[Tuple[str, Optional[int]], ...] = (
+        ("urlhaus", 50),
+        ("cisa-kev", 100),
+        ("feodo", 50),
+        ("threatfox", 200),
+        ("malware-bazaar", 100),
+        ("otx", 100),
+    )
+    total = 0
+    feeds_ok = 0
+    feeds_err: List[str] = []
+    for source, limit in plan:
+        try:
+            cases = scout.fetch_and_signal(source, limit=limit)
+            for c in cases:
+                db.upsert_case(c)
+            total += len(cases)
+            feeds_ok += 1
+        except Exception as exc:  # noqa: BLE001 — partial fetch is the point
+            feeds_err.append(source)
+            _log.info("pulse.fetch.skip", source=source, error=str(exc)[:200])
+    tail = f" (skipped: {', '.join(feeds_err)})" if feeds_err else ""
+    return f"fetched {total} cases across {feeds_ok} feed(s){tail}"
+
+
+def _run_classify() -> str:
+    """Run classify.classify over up to 10,000 cases and write each one back."""
+    from psyc.lines import classify
+
+    cases = db.list_cases(limit=10_000)
+    n = 0
+    for c in cases:
+        classify.classify(c)
+        db.upsert_case(c)
+        n += 1
+    return f"classified {n} case(s)"
+
+
+def _run_prove() -> str:
+    """Run proof.prove over up to 10,000 cases and write each one back."""
+    from psyc.lines import proof
+
+    cases = db.list_cases(limit=10_000)
+    n = 0
+    for c in cases:
+        proof.prove(c)
+        db.upsert_case(c)
+        n += 1
+    return f"proved {n} case(s)"
+
+
+def _run_reindex() -> str:
+    """Hand up to 1,000,000 cases to lookup.reindex and report what it wrote."""
+    from psyc.lines import lookup
+
+    cases = db.list_cases(limit=1_000_000)
+    written = lookup.reindex(cases)
+    return f"indexed {written} IOC(s) from {len(cases)} case(s)"
+
+
+def _run_respond_propose() -> str:
+    """Propose response actions for high-severity cases that don't yet have any.
+
+    Strictly proposal-only: nothing is dispatched. The respond.propose_for_case
+    helper is already idempotent per case (skips cases that already have
+    actions), so re-running this on a tick is safe.
+    """
+    from psyc.lines import respond
+
+    cases = db.list_cases(limit=10_000)
+    proposed = 0
+    touched = 0
+    for c in cases:
+        ids = respond.propose_for_case(c)
+        if ids:
+            proposed += len(ids)
+            touched += 1
+    return f"proposed {proposed} action(s) for {touched} case(s)"
+
+
+def _run_peer_pull() -> str:
+    """Placeholder runner; federation is not implemented yet."""
+    return "federation not yet active"
+
+
+def _run_vouch_refresh() -> str:
+    """Placeholder runner; federation is not implemented yet."""
+    return "federation not yet active"
+
+
+# ---------- registry ----------------------------------------------------------
+
+_REGISTRY: Dict[str, Callable[[], str]] = {
+    "fetch": _run_fetch,
+    "classify": _run_classify,
+    "prove": _run_prove,
+    "reindex": _run_reindex,
+    "respond": _run_respond_propose,
+    "peer-pull": _run_peer_pull,
+    "vouch-refresh": _run_vouch_refresh,
+}
+
+
+# Initial defaults — seeded once on first DB init. Tuples of
+# (name, title, description, mode, cadence_seconds, enabled).
+_DEFAULTS: Tuple[Tuple[str, str, str, PulseMode, int, bool], ...] = (
+    ("fetch", "Scout · fetch feeds", "Pull every configured threat feed and ingest new cases.",
+     PulseMode.AUTO_EXECUTE, 900, True),
+    ("classify", "Classify · label cases", "Assign incident type, severity, TLP, and internal class to every case.",
+     PulseMode.AUTO_EXECUTE, 300, True),
+    ("prove", "Proof · score confidence", "Compute confidence (reliability · credibility · freshness) for every case.",
+     PulseMode.AUTO_EXECUTE, 300, True),
+    ("reindex", "Lookup · rebuild IOC index", "Rebuild the IOC reverse-index over the case corpus.",
+     PulseMode.AUTO_EXECUTE, 3600, True),
+    ("respond", "Respond · propose actions", "Propose human-gated response actions for newly high-severity cases.",
+     PulseMode.AUTO_PROPOSE, 600, True),
+    ("peer-pull", "Federation · peer pull", "(placeholder) Pull sealed cases from federated peers.",
+     PulseMode.MANUAL, 600, False),
+    ("vouch-refresh", "Federation · vouch refresh", "(placeholder) Refresh per-peer vouching ledgers.",
+     PulseMode.MANUAL, 3600, False),
+)
+
+
+# ---------- helpers -----------------------------------------------------------
+
+def _now() -> datetime:
+    """Current time as a timezone-aware UTC datetime."""
+    return datetime.now(timezone.utc)
+
+
+def _parse_dt(value: Optional[str]) -> Optional[datetime]:
+    """Parse a stored ISO timestamp; empty or unparseable input gives None.
+
+    A timestamp without an offset is taken to be UTC so the result is always
+    timezone-aware. _should_fire compares it with the aware value from _now(),
+    and comparing naive with aware datetimes raises TypeError.
+    """
+    if not value:
+        return None
+    try:
+        parsed = datetime.fromisoformat(value)
+    except ValueError:
+        return None
+    if parsed.tzinfo is None:
+        parsed = parsed.replace(tzinfo=timezone.utc)
+    return parsed
+
+
+def _row_to_pipeline(row: dict) -> Pipeline:
+    """Build a Pipeline from a `pulse_pipelines` row dict."""
+    return Pipeline(
+        name=row["name"],
+        title=row["title"],
+        description=row["description"],
+        mode=PulseMode(row["mode"]),
+        cadence_seconds=int(row["cadence_seconds"]),
+        enabled=bool(row["enabled"]),
+        last_fired=_parse_dt(row.get("last_fired")),
+        next_fire=_parse_dt(row.get("next_fire")),
+        last_result=row.get("last_result") or "",
+        last_outcome=row.get("last_outcome") or "",
+    )
+
+
+def _pipeline_to_row(p: Pipeline) -> dict:
+    """Flatten a Pipeline into the dict shape db.upsert_pulse_pipeline expects."""
+    return dict(
+        name=p.name,
+        title=p.title,
+        description=p.description,
+        mode=p.mode.value,
+        cadence_seconds=int(p.cadence_seconds),
+        enabled=bool(p.enabled),
+        last_fired=p.last_fired.isoformat() if p.last_fired else None,
+        next_fire=p.next_fire.isoformat() if p.next_fire else None,
+        last_result=p.last_result or "",
+        last_outcome=p.last_outcome or "",
+    )
+
+
+def seed_defaults() -> None:
+    """Insert any default pipelines that aren't already in the DB. Idempotent."""
+    existing = {row["name"] for row in db.get_pulse_state()}
+    inserted = 0
+    for name, title, desc, mode, cadence, enabled in _DEFAULTS:
+        if name in existing:
+            continue
+        p = Pipeline(
+            name=name, title=title, description=desc,
+            mode=mode, cadence_seconds=cadence, enabled=enabled,
+            next_fire=_now(),  # first tick after install fires immediately if due
+        )
+        db.upsert_pulse_pipeline(_pipeline_to_row(p))
+        inserted += 1
+    if inserted:
+        # Report what was actually written, not the size of the defaults table.
+        _log.info("pulse.defaults.seeded", count=inserted)
+
+
+def state() -> List[Pipeline]:
+    """Every pipeline, ordered by name. Seeds defaults on the first call."""
+    rows = db.get_pulse_state()
+    if not rows:
+        seed_defaults()
+        rows = db.get_pulse_state()
+    return [_row_to_pipeline(r) for r in rows]
+
+
+def _get_pipeline(name: str) -> Optional[Pipeline]:
+    """Return the pipeline called `name`, or None when there is no such pipeline."""
+    for p in state():
+        if p.name == name:
+            return p
+    return None
+
+
+def set_mode(name: str, mode: PulseMode) -> None:
+    """Persist a new autonomy mode. Raises ValueError on unknown name."""
+    p = _get_pipeline(name)
+    if p is None:
+        raise ValueError(f"unknown pipeline: {name}")
+    p.mode = mode
+    db.upsert_pulse_pipeline(_pipeline_to_row(p))
+
+
+def set_cadence(name: str, seconds: int) -> None:
+    """Persist a new cadence. Raises ValueError if seconds <= 0 or the name is unknown."""
+    if seconds <= 0:
+        raise ValueError("cadence must be > 0 seconds")
+    p = _get_pipeline(name)
+    if p is None:
+        raise ValueError(f"unknown pipeline: {name}")
+    p.cadence_seconds = int(seconds)
+    db.upsert_pulse_pipeline(_pipeline_to_row(p))
+
+
+def set_enabled(name: str, enabled: bool) -> None:
+    """Persist the enabled flag. Raises ValueError on unknown name."""
+    p = _get_pipeline(name)
+    if p is None:
+        raise ValueError(f"unknown pipeline: {name}")
+    p.enabled = bool(enabled)
+    db.upsert_pulse_pipeline(_pipeline_to_row(p))
+
+
+def set_kill_switch(armed: bool) -> None:
+    """Persist the kill switch state and log the change at warning level."""
+    db.kill_switch_set(armed)
+    _log.warning("pulse.killswitch.changed", armed=bool(armed))
+
+
+def kill_switch_state() -> bool:
+    """True when the kill switch is armed."""
+    return db.kill_switch_get()
+
+
+# ---------- the heartbeat -----------------------------------------------------
+
+def _fire(p: Pipeline) -> Tuple[str, str]:
+    """Run one pipeline. Returns (outcome, result_str). Persists the timestamps.
+
+    Outcome is "ok" if the runner returned, "err" if it raised or if no runner
+    is registered under the pipeline's name. next_fire is pushed forward in
+    every case, so a failing runner is retried on its cadence, not every tick.
+    """
+    runner = _REGISTRY.get(p.name)
+    if runner is None:
+        outcome = "err"
+        result = f"no runner registered for '{p.name}'"
+    else:
+        try:
+            result = runner() or ""
+            outcome = "ok"
+        except Exception as exc:  # noqa: BLE001 — log + record, don't crash the loop
+            outcome = "err"
+            result = f"{type(exc).__name__}: {exc}"
+            _log.warning("pulse.fire.error", name=p.name, error=result, trace=traceback.format_exc())
+
+    now = _now()
+    p.last_fired = now
+    p.next_fire = now + timedelta(seconds=max(1, p.cadence_seconds))
+    p.last_result = result[:500]
+    p.last_outcome = outcome
+    db.upsert_pulse_pipeline(_pipeline_to_row(p))
+    _log.info("pulse.fired", name=p.name, outcome=outcome, result=p.last_result)
+    return outcome, p.last_result
+
+
+def _skip_reason(p: Pipeline, now: datetime) -> Optional[str]:
+    """Why tick() must not fire `p` at `now`, or None when it should fire."""
+    if not p.enabled:
+        return "disabled"
+    if p.mode == PulseMode.MANUAL:
+        return "manual mode"
+    if p.next_fire is not None and now < p.next_fire:
+        return "not due"
+    return None
+
+
+def _should_fire(p: Pipeline, now: datetime) -> bool:
+    """True when `p` is enabled, not manual, and due (or has never been scheduled)."""
+    return _skip_reason(p, now) is None
+
+
+def tick() -> List[Tuple[str, str, str]]:
+    """Single scheduler heartbeat. Returns (name, outcome, result_str) per pipeline.
+
+    Outcome is "ok" / "err" / "skipped" — every registered pipeline appears in
+    the return value so callers can see what was skipped and why.
+    """
+    if kill_switch_state():
+        _log.info("pulse.tick.killed")
+        return [(p.name, "skipped", "kill switch armed") for p in state()]
+
+    now = _now()
+    out: List[Tuple[str, str, str]] = []
+    for p in state():
+        reason = _skip_reason(p, now)
+        if reason is not None:
+            out.append((p.name, "skipped", reason))
+            continue
+        # Runners can take a while; re-check so that arming the switch during
+        # a tick stops the pipelines that have not started yet.
+        if kill_switch_state():
+            out.append((p.name, "skipped", "kill switch armed"))
+            continue
+        outcome, result = _fire(p)
+        out.append((p.name, outcome, result))
+    return out
+
+
+def run_now(name: str) -> Tuple[str, str]:
+    """Manually fire one pipeline, bypassing cadence and mode. Honors kill switch.
+
+    Returns (outcome, result_str). Raises ValueError on unknown name; the name
+    is validated first, so that holds even while the kill switch is armed.
+    """
+    p = _get_pipeline(name)
+    if p is None:
+        raise ValueError(f"unknown pipeline: {name}")
+    if kill_switch_state():
+        return ("skipped", "kill switch armed")
+    return _fire(p)
+
+
+# ---------- background loop ---------------------------------------------------
+
+async def start_background_loop(interval_seconds: int = 30) -> None:
+    """Long-running scheduler — calls tick() every interval. Launched from FastAPI lifespan.
+
+    Designed to run for the life of the process; cancellation is the normal stop signal.
+    """
+    _log.info("pulse.loop.starting", interval=interval_seconds)
+    loop = asyncio.get_running_loop()
+    try:
+        while True:
+            try:
+                # tick() is synchronous and does blocking DB and network work.
+                # Run it on the default executor so the event loop stays free
+                # to serve requests while the pipelines run.
+                await loop.run_in_executor(None, tick)
+            except Exception as exc:  # noqa: BLE001 — one bad tick must not kill the loop
+                _log.warning("pulse.loop.tick.error", error=str(exc), trace=traceback.format_exc())
+            await asyncio.sleep(interval_seconds)
+    except asyncio.CancelledError:
+        _log.info("pulse.loop.cancelled")
+        raise
diff --git a/tests/test_pulse.py b/tests/test_pulse.py
new file mode 100644
index 0000000..b327585
--- /dev/null
+++ b/tests/test_pulse.py
@@ -0,0 +1,218 @@
+"""Pulse scheduler tests — cadence, kill switch, mode gating, persistence."""
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta, timezone
+
+import pytest
+from sqlalchemy import create_engine
+
+from psyc import db
+from psyc.lines import pulse
+
+
+@pytest.fixture
+def fresh_db(tmp_path, monkeypatch):
+    """A temp SQLite + a swapped-in registry of fast, deterministic runners.
+
+    Real runners would talk to scout / classify / proof / etc — none of which
+    we want to exercise from a pulse-only test. We replace _REGISTRY in-place
+    so tick() drives the scheduler logic against trivial counters.
+    """
+    test_db = tmp_path / "pulse.db"
+    test_engine = create_engine(f"sqlite:///{test_db}", future=True)
+    db._metadata.create_all(test_engine, checkfirst=True)
+    # Point psyc.db at the temp engine/path for the duration of the test.
+    monkeypatch.setattr(db, "_engine", test_engine)
+    monkeypatch.setattr(db, "DB_PATH", test_db)
+
+    # One call counter per registered pipeline; returned so tests can assert on it.
+    counters = {name: 0 for name in pulse._REGISTRY}
+
+    def _make(name: str):
+        def runner() -> str:
+            counters[name] += 1
+            return f"{name}: tick {counters[name]}"
+        return runner
+
+    fake_registry = {name: _make(name) for name in pulse._REGISTRY}
+    monkeypatch.setattr(pulse, "_REGISTRY", fake_registry)
+    return counters
+
+
+def _set_last_fired(name: str, when: datetime, cadence: int) -> None:
+    """Force a pipeline's last_fired + next_fire to a known instant."""
+    for p in pulse.state():
+        if p.name == name:
+            p.last_fired = when
+            p.next_fire = when + timedelta(seconds=cadence)
+            db.upsert_pulse_pipeline(pulse._pipeline_to_row(p))
+            return
+    raise AssertionError(f"unknown pipeline {name}")
+
+
+def test_defaults_seeded_on_first_state(fresh_db):
+    """The first state() call on an empty DB seeds all seven default pipelines."""
+    pipelines = pulse.state()
+    names = {p.name for p in pipelines}
+    assert names == {"fetch", "classify", "prove", "reindex", "respond", "peer-pull", "vouch-refresh"}
+    # Spot-check a couple of defaults from the spec.
+    by_name = {p.name: p for p in pipelines}
+    assert by_name["fetch"].mode == pulse.PulseMode.AUTO_EXECUTE
+    assert by_name["fetch"].cadence_seconds == 900
+    assert by_name["peer-pull"].enabled is False
+    assert by_name["respond"].mode == pulse.PulseMode.AUTO_PROPOSE
+
+
+def test_state_persists_across_reads(fresh_db):
+    """Values written by the set_* helpers come back from a later state() read."""
+    pulse.set_cadence("fetch", 42)
+    pulse.set_mode("classify", pulse.PulseMode.MANUAL)
+    pulse.set_enabled("reindex", False)
+    by_name = {p.name: p for p in pulse.state()}
+    assert by_name["fetch"].cadence_seconds == 42
+    assert by_name["classify"].mode == pulse.PulseMode.MANUAL
+    assert by_name["reindex"].enabled is False
+
+
+def test_tick_skips_not_due_pipelines(fresh_db):
+    """Nothing fires when every next_fire lies a full cadence in the future."""
+    # Push every pipeline well into the future — nothing should fire.
+    now = datetime.now(timezone.utc)
+    for p in pulse.state():
+        _set_last_fired(p.name, now, p.cadence_seconds)
+    results = pulse.tick()
+    outcomes = {name: outcome for name, outcome, _ in results}
+    assert all(o == "skipped" for o in outcomes.values()), outcomes
+    assert all(c == 0 for c in fresh_db.values()), fresh_db
+
+
+def test_tick_fires_due_auto_pipelines(fresh_db):
+    """Overdue auto-* pipelines fire; manual + disabled ones are skipped."""
+    # Force every pipeline to be due (last fired far in the past) and tick.
+    past = datetime.now(timezone.utc) - timedelta(hours=24)
+    for p in pulse.state():
+        _set_last_fired(p.name, past, p.cadence_seconds)
+    results = pulse.tick()
+    by_name = {name: (outcome, result) for name, outcome, result in results}
+    # auto-execute + auto-propose modes should fire; manual should skip.
+    assert by_name["fetch"][0] == "ok"
+    assert by_name["classify"][0] == "ok"
+    assert by_name["prove"][0] == "ok"
+    assert by_name["reindex"][0] == "ok"
+    assert by_name["respond"][0] == "ok"
+    # Manual + disabled pipelines stay skipped.
+    assert by_name["peer-pull"][0] == "skipped"
+    assert by_name["vouch-refresh"][0] == "skipped"
+
+
+def test_tick_respects_manual_mode(fresh_db):
+    """An overdue pipeline switched to manual is skipped and its runner never runs."""
+    past = datetime.now(timezone.utc) - timedelta(hours=24)
+    _set_last_fired("fetch", past, 900)
+    pulse.set_mode("fetch", pulse.PulseMode.MANUAL)
+    results = pulse.tick()
+    by_name = {n: o for n, o, _ in results}
+    assert by_name["fetch"] == "skipped"
+    assert fresh_db["fetch"] == 0
+
+
+def test_tick_respects_disabled(fresh_db):
+    """An overdue but disabled pipeline is skipped and its runner never runs."""
+    past = datetime.now(timezone.utc) - timedelta(hours=24)
+    _set_last_fired("fetch", past, 900)
+    pulse.set_enabled("fetch", False)
+    results = pulse.tick()
+    by_name = {n: o for n, o, _ in results}
+    assert by_name["fetch"] == "skipped"
+    assert fresh_db["fetch"] == 0
+
+
+def test_kill_switch_halts_everything(fresh_db):
+    """With the kill switch armed, tick() skips every pipeline, even overdue ones."""
+    past = datetime.now(timezone.utc) - timedelta(hours=24)
+    for p in pulse.state():
+        _set_last_fired(p.name, past, p.cadence_seconds)
+    pulse.set_kill_switch(True)
+    results = pulse.tick()
+    assert all(o == "skipped" for _, o, _ in results)
+    assert all(c == 0 for c in fresh_db.values())
+    # And killswitch state itself round-trips.
+    assert pulse.kill_switch_state() is True
+    pulse.set_kill_switch(False)
+    assert pulse.kill_switch_state() is False
+
+
+def test_tick_updates_last_fired_and_next_fire(fresh_db):
+    """Firing stamps last_fired ≈ now and next_fire = last_fired + cadence."""
+    past = datetime.now(timezone.utc) - timedelta(hours=24)
+    _set_last_fired("fetch", past, 900)
+    before = datetime.now(timezone.utc)
+    pulse.tick()
+    p = {x.name: x for x in pulse.state()}["fetch"]
+    assert p.last_fired is not None
+    assert p.last_fired >= before - timedelta(seconds=2)
+    assert p.next_fire is not None
+    assert p.next_fire > p.last_fired
+    delta = (p.next_fire - p.last_fired).total_seconds()
+    # 900s cadence, allow a 2s rounding window.
+    assert 898 <= delta <= 902
+    assert p.last_outcome == "ok"
+    assert "fetch: tick" in p.last_result
+
+
+def test_run_now_bypasses_cadence(fresh_db):
+    """run_now fires a pipeline whose next_fire is still an hour away."""
+    # Pipeline isn't due — run_now must still fire.
+    future = datetime.now(timezone.utc) + timedelta(hours=1)
+    for p in pulse.state():
+        p.last_fired = datetime.now(timezone.utc)
+        p.next_fire = future
+        db.upsert_pulse_pipeline(pulse._pipeline_to_row(p))
+    outcome, result = pulse.run_now("fetch")
+    assert outcome == "ok"
+    assert "fetch: tick" in result
+    assert fresh_db["fetch"] == 1
+
+
+def test_run_now_respects_kill_switch(fresh_db):
+    """run_now returns "skipped" and does not call the runner while the switch is armed."""
+    pulse.set_kill_switch(True)
+    outcome, result = pulse.run_now("fetch")
+    assert outcome == "skipped"
+    assert "kill switch" in result
+    assert fresh_db["fetch"] == 0
+
+
+def test_run_now_even_when_manual(fresh_db):
+    """Manual mode blocks tick() but NOT run_now — that's the whole point of manual."""
+    pulse.set_mode("peer-pull", pulse.PulseMode.MANUAL)
+    pulse.set_enabled("peer-pull", True)
+    outcome, _ = pulse.run_now("peer-pull")
+    assert outcome == "ok"
+    assert fresh_db["peer-pull"] == 1
+
+
+def test_runner_exception_recorded_as_err(fresh_db, monkeypatch):
+    """A raising runner yields outcome "err", is persisted, and is still rescheduled."""
+    def boom() -> str:
+        raise RuntimeError("nope")
+    # Inject a failing runner just for "fetch".
+    bad_registry = dict(pulse._REGISTRY)
+    bad_registry["fetch"] = boom
+    monkeypatch.setattr(pulse, "_REGISTRY", bad_registry)
+
+    past = datetime.now(timezone.utc) - timedelta(hours=24)
+    _set_last_fired("fetch", past, 900)
+    results = pulse.tick()
+    by_name = {n: (o, r) for n, o, r in results}
+    assert by_name["fetch"][0] == "err"
+    assert "nope" in by_name["fetch"][1]
+    # And the failure is persisted, with next_fire still pushed forward so we
+    # don't busy-retry a broken runner every tick.
+    p = {x.name: x for x in pulse.state()}["fetch"]
+    assert p.last_outcome == "err"
+    assert p.next_fire is not None and p.next_fire > p.last_fired
+
+
+def test_unknown_pipeline_raises(fresh_db):
+    """set_mode, set_cadence and run_now all raise ValueError for an unknown name."""
+    with pytest.raises(ValueError):
+        pulse.set_mode("does-not-exist", pulse.PulseMode.MANUAL)
+    with pytest.raises(ValueError):
+        pulse.set_cadence("does-not-exist", 60)
+    with pytest.raises(ValueError):
+        pulse.run_now("does-not-exist")
+
+
+def test_invalid_cadence_raises(fresh_db):
+    """Zero and negative cadences are rejected with ValueError."""
+    with pytest.raises(ValueError):
+        pulse.set_cadence("fetch", 0)
+    with pytest.raises(ValueError):
+        pulse.set_cadence("fetch", -5)