merge pulse: scheduler line + autonomy dial

This commit is contained in:
m17hr1l
2026-06-06 16:11:17 +02:00
6 changed files with 1011 additions and 0 deletions

122
src/psyc/_pulse_cli.py Normal file
View File

@@ -0,0 +1,122 @@
"""Typer commands for the Pulse scheduler.
Imported and wired by `cli.py` via `register(app)`. Kept as a separate module
so the main CLI surface stays grep-friendly and the scheduler can grow its own
verbs without bloating cli.py.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
import typer
from psyc import db
from psyc.lines import pulse
def _relative(dt: Optional[datetime]) -> str:
if dt is None:
return ""
now = datetime.now(timezone.utc)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
delta = int((dt - now).total_seconds())
past = delta < 0
secs = abs(delta)
if secs < 5:
return "now"
if secs < 60:
unit = f"{secs}s"
elif secs < 3600:
unit = f"{secs // 60}m"
elif secs < 86400:
unit = f"{secs // 3600}h"
else:
unit = f"{secs // 86400}d"
return f"{unit} ago" if past else f"in {unit}"
def register(typer_app: typer.Typer) -> None:
"""Add pulse-* commands to the given Typer app."""
@typer_app.command("pulse-status")
def pulse_status() -> None:
"""Print the pipeline table (mode · cadence · last-fired · next-fire · last-result)."""
db.init_db()
ks = pulse.kill_switch_state()
typer.echo(f"kill switch: {'ARMED' if ks else 'OFF'}")
rows = pulse.state()
if not rows:
typer.echo("(no pipelines registered)")
return
typer.echo(f"{'name':<16} {'mode':<14} {'cadence':>8} {'enabled':<7} {'last':<10} {'next':<10} result")
for p in rows:
typer.echo(
f"{p.name:<16} {p.mode.value:<14} {p.cadence_seconds:>6}s "
f"{'yes' if p.enabled else 'no':<7} "
f"{_relative(p.last_fired):<10} {_relative(p.next_fire):<10} "
f"{(p.last_result or '')[:60]}"
)
@typer_app.command("pulse-tick")
def pulse_tick() -> None:
"""Run one scheduler heartbeat and print the per-pipeline outcomes."""
db.init_db()
results = pulse.tick()
for name, outcome, result in results:
marker = {"ok": "", "err": "", "skipped": ""}.get(outcome, "·")
typer.echo(f" {marker} {name:<16} {outcome:<8} {result[:120]}")
@typer_app.command("pulse-set-mode")
def pulse_set_mode(
name: str = typer.Argument(..., help="pipeline name"),
mode: str = typer.Argument(..., help="manual | auto-propose | auto-execute"),
) -> None:
db.init_db()
try:
pulse.set_mode(name, pulse.PulseMode(mode))
except ValueError as exc:
typer.echo(f"error: {exc}", err=True)
raise typer.Exit(1)
typer.echo(f"{name} mode → {mode}")
@typer_app.command("pulse-set-cadence")
def pulse_set_cadence(
name: str = typer.Argument(..., help="pipeline name"),
seconds: int = typer.Argument(..., help="cadence in seconds (>0)"),
) -> None:
db.init_db()
try:
pulse.set_cadence(name, seconds)
except ValueError as exc:
typer.echo(f"error: {exc}", err=True)
raise typer.Exit(1)
typer.echo(f"{name} cadence → {seconds}s")
@typer_app.command("pulse-run")
def pulse_run(name: str = typer.Argument(..., help="pipeline name")) -> None:
"""Manually fire one pipeline (bypasses cadence; honors kill switch)."""
db.init_db()
try:
outcome, result = pulse.run_now(name)
except ValueError as exc:
typer.echo(f"error: {exc}", err=True)
raise typer.Exit(1)
marker = {"ok": "", "err": "", "skipped": ""}.get(outcome, "·")
typer.echo(f" {marker} {name}: {outcome}{result}")
@typer_app.command("pulse-kill")
def pulse_kill() -> None:
"""Arm the kill switch — every pipeline halts on the next tick."""
db.init_db()
pulse.set_kill_switch(True)
typer.echo("kill switch ARMED — all pipelines halted")
@typer_app.command("pulse-unkill")
def pulse_unkill() -> None:
"""Disarm the kill switch — pulse resumes on the next tick."""
db.init_db()
pulse.set_kill_switch(False)
typer.echo("kill switch disarmed — pulse resumes")

View File

@@ -0,0 +1,120 @@
"""Cockpit routes for the Pulse scheduler — admin-gated.
The integration is intentionally single-call: `register(app, TEMPLATES)` adds
the routes AND wires the FastAPI startup hook that launches the background
scheduler loop. Caller in app.py just imports + invokes register().
"""
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from typing import Optional
from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from psyc import log
from psyc.lines import pulse
_log = log.get(__name__)
TICK_INTERVAL_SECONDS = 30
def _admin_ok(request: Request) -> bool:
"""Mirror of the local helper in app.py — admin session is just session['admin_ok']."""
return bool(request.session.get("admin_ok"))
def _relative(dt: Optional[datetime]) -> str:
"""Human-friendly "3m ago" / "in 12m" / "now". None → ''."""
if dt is None:
return ""
now = datetime.now(timezone.utc)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
delta = (dt - now).total_seconds()
past = delta < 0
secs = abs(int(delta))
if secs < 5:
return "now"
if secs < 60:
unit = f"{secs}s"
elif secs < 3600:
unit = f"{secs // 60}m"
elif secs < 86400:
unit = f"{secs // 3600}h"
else:
unit = f"{secs // 86400}d"
return f"{unit} ago" if past else f"in {unit}"
def register(app: FastAPI, templates: Jinja2Templates) -> None:
"""Attach the /admin/pulse routes and the background scheduler loop to `app`."""
@app.get("/admin/pulse", response_class=HTMLResponse)
def pulse_view(request: Request) -> HTMLResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
flash = request.query_params.get("flash", "")
pipelines = pulse.state()
return templates.TemplateResponse(
request,
"admin_pulse.html",
{
"pipelines": pipelines,
"kill_switch": pulse.kill_switch_state(),
"tick_interval": TICK_INTERVAL_SECONDS,
"relative": _relative,
"flash": flash,
},
)
@app.post("/admin/pulse/kill")
def pulse_toggle_kill(request: Request) -> RedirectResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
new = not pulse.kill_switch_state()
pulse.set_kill_switch(new)
flash = "kill switch ARMED — all pipelines halted" if new else "kill switch disarmed — pulse resumes"
return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303)
@app.post("/admin/pulse/{name}/update")
def pulse_update(
request: Request,
name: str,
mode: str = Form(...),
cadence_seconds: int = Form(...),
enabled: Optional[str] = Form(None),
) -> RedirectResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
try:
pulse.set_mode(name, pulse.PulseMode(mode))
pulse.set_cadence(name, int(cadence_seconds))
pulse.set_enabled(name, enabled is not None)
flash = f"updated {name}: mode={mode}, cadence={cadence_seconds}s, enabled={enabled is not None}"
except (ValueError, KeyError) as exc:
_log.warning("pulse.update.error", name=name, error=str(exc))
flash = f"update failed: {exc}"
return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303)
@app.post("/admin/pulse/{name}/run")
def pulse_run_now(request: Request, name: str) -> RedirectResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
try:
outcome, result = pulse.run_now(name)
flash = f"{name}{outcome}: {result[:120]}"
except ValueError as exc:
flash = f"run failed: {exc}"
return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303)
@app.on_event("startup")
async def _start_pulse_loop() -> None:
# Fire-and-forget; the loop catches its own exceptions and self-restarts.
asyncio.create_task(pulse.start_background_loop(interval_seconds=TICK_INTERVAL_SECONDS))
_log.info("pulse.routes.registered", tick=TICK_INTERVAL_SECONDS)

View File

@@ -0,0 +1,101 @@
{% extends "base.html" %}
{% block title %}Pulse — psyc admin{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Pulse — autonomous heartbeat</h1>
<span class="count">{{ pipelines|length }} pipeline{{ '' if pipelines|length == 1 else 's' }}</span>
</div>
<p class="page-intro">Cron-style scheduler that drives every psyc line on a cadence without human input. Each pipeline has an autonomy mode and a cadence in seconds. The kill switch halts everything instantly — it overrides cadence, mode, and the enabled flag.</p>
<p class="back"><a href="/admin">← back to admin</a></p>
{% if flash %}
<div class="verdict verdict-clean">{{ flash }}</div>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Global kill switch</h2>
<span class="count">{{ 'ARMED' if kill_switch else 'OFF' }}</span>
</div>
{% if kill_switch %}
<div class="verdict" style="background:rgba(248,113,113,0.12); border:1px solid rgba(248,113,113,0.45); color:#fca5a5; padding:14px 18px; border-radius:6px; font-weight:700; letter-spacing:0.04em;">
✗ KILL SWITCH ARMED — every pipeline is paused. tick() returns "skipped" for everything. Run-now is also blocked. Toggle off to resume.
</div>
<form method="post" action="/admin/pulse/kill" style="margin-top:14px;">
<button type="submit" class="btn btn-approve">Disarm kill switch</button>
</form>
{% else %}
<div class="verdict verdict-clean">✓ Pulse is live — the background loop ticks every {{ tick_interval }}s.</div>
<form method="post" action="/admin/pulse/kill" style="margin-top:14px;"
onsubmit="return confirm('Arm the kill switch? Every pipeline halts immediately.');">
<button type="submit" class="btn btn-reject">Arm kill switch</button>
</form>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Pipelines</h2>
<span class="count">{{ pipelines|selectattr('enabled')|list|length }} enabled</span>
</div>
<p class="page-intro">Mode: <code>auto-execute</code> fires the line, <code>auto-propose</code> stages proposals for human approval, <code>manual</code> runs only when you press “Run now”. Cadence is the gap between ticks; the loop wakes up every {{ tick_interval }}s.</p>
<table class="ledger">
<thead>
<tr>
<th style="width:24%;">Pipeline</th>
<th>Mode · cadence · enabled</th>
<th style="width:11%;">Last fired</th>
<th style="width:11%;">Next fire</th>
<th>Last result</th>
<th style="width:1%;"></th>
</tr>
</thead>
<tbody>
{% for p in pipelines %}
<tr class="ledger-row">
<td>
<strong>{{ p.title }}</strong>
<div class="lg-sub"><code>{{ p.name }}</code> · {{ p.description }}</div>
</td>
<td>
<form method="post" action="/admin/pulse/{{ p.name }}/update" class="queue-action" style="display:flex; gap:8px; align-items:center; flex-wrap:wrap;">
<select name="mode" style="background:var(--panel-2); color:var(--text); border:1px solid var(--border); border-radius:4px; padding:4px 6px;">
<option value="auto-execute" {% if p.mode.value == 'auto-execute' %}selected{% endif %}>auto-execute</option>
<option value="auto-propose" {% if p.mode.value == 'auto-propose' %}selected{% endif %}>auto-propose</option>
<option value="manual" {% if p.mode.value == 'manual' %}selected{% endif %}>manual</option>
</select>
<input type="number" name="cadence_seconds" value="{{ p.cadence_seconds }}" min="1" step="1"
style="background:var(--panel-2); color:var(--text); border:1px solid var(--border); border-radius:4px; padding:4px 6px; width:84px;" title="cadence in seconds">
<label style="display:inline-flex; align-items:center; gap:4px; font-size:12px;">
<input type="checkbox" name="enabled" value="1" {% if p.enabled %}checked{% endif %}> enabled
</label>
<button type="submit" class="btn">save</button>
</form>
</td>
<td class="lg-ts">
{% if p.last_fired %}<span title="{{ p.last_fired.isoformat() }}">{{ relative(p.last_fired) }}</span>{% else %}—{% endif %}
</td>
<td class="lg-ts">
{% if p.next_fire %}<span title="{{ p.next_fire.isoformat() }}">{{ relative(p.next_fire) }}</span>{% else %}—{% endif %}
</td>
<td class="lg-sub" style="max-width:0;">
{% if p.last_outcome == 'ok' %}<span style="color: var(--green);"></span>
{% elif p.last_outcome == 'err' %}<span style="color: var(--red);"></span>
{% elif p.last_outcome == 'skipped' %}<span style="color: var(--muted);"></span>
{% endif %}
{{ (p.last_result or '—')[:60] }}{% if (p.last_result or '')|length > 60 %}…{% endif %}
</td>
<td>
<form method="post" action="/admin/pulse/{{ p.name }}/run" class="queue-action">
<button type="submit" class="btn btn-enforce" title="Fire now, regardless of cadence">Run now</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</section>
{% endblock %}

View File

@@ -114,6 +114,26 @@ Index("iocs_value_idx", iocs.c.value)
Index("iocs_type_idx", iocs.c.ioc_type)
Index("iocs_case_idx", iocs.c.case_id)
pulse_pipelines = Table(
"pulse_pipelines", _metadata,
Column("name", String, primary_key=True),
Column("title", String, nullable=False),
Column("description", Text, nullable=False, default=""),
Column("mode", String, nullable=False), # manual | auto-propose | auto-execute
Column("cadence_seconds", Integer, nullable=False),
Column("enabled", Boolean, nullable=False, default=True),
Column("last_fired", String, nullable=True), # ISO timestamp or NULL
Column("next_fire", String, nullable=True), # ISO timestamp or NULL
Column("last_result", Text, nullable=False, default=""),
Column("last_outcome", String, nullable=False, default=""), # ok | err | skipped | ""
)
pulse_settings = Table(
"pulse_settings", _metadata,
Column("key", String, primary_key=True),
Column("value", String, nullable=False),
)
_log = log.get(__name__)
_engine: Optional[Engine] = None
@@ -214,3 +234,53 @@ def ioc_count(db_path: Path = DB_PATH) -> int:
stmt = select(func.count()).select_from(iocs)
with engine(db_path).connect() as conn:
return conn.execute(stmt).scalar_one()
# ---------- pulse scheduler ----------------------------------------------
def get_pulse_state(db_path: Path = DB_PATH) -> List[dict]:
"""Every registered pipeline, ordered by name."""
stmt = select(pulse_pipelines).order_by(pulse_pipelines.c.name)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def upsert_pulse_pipeline(row: dict, db_path: Path = DB_PATH) -> None:
"""Insert or update one pipeline by name."""
stmt = sqlite_insert(pulse_pipelines).values(**row)
stmt = stmt.on_conflict_do_update(
index_elements=[pulse_pipelines.c.name],
set_=dict(
title=stmt.excluded.title,
description=stmt.excluded.description,
mode=stmt.excluded.mode,
cadence_seconds=stmt.excluded.cadence_seconds,
enabled=stmt.excluded.enabled,
last_fired=stmt.excluded.last_fired,
next_fire=stmt.excluded.next_fire,
last_result=stmt.excluded.last_result,
last_outcome=stmt.excluded.last_outcome,
),
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
def kill_switch_get(db_path: Path = DB_PATH) -> bool:
stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == "kill_switch")
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
if row is None:
return False
return str(row.value) == "1"
def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None:
value = "1" if armed else "0"
stmt = sqlite_insert(pulse_settings).values(key="kill_switch", value=value)
stmt = stmt.on_conflict_do_update(
index_elements=[pulse_settings.c.key],
set_=dict(value=stmt.excluded.value),
)
with engine(db_path).begin() as conn:
conn.execute(stmt)

380
src/psyc/lines/pulse.py Normal file
View File

@@ -0,0 +1,380 @@
"""Pulseline — cron-style scheduler that drives every psyc pipeline on a cadence.
Each registered pipeline has an autonomy mode (manual / auto-propose /
auto-execute) and a cadence in seconds. tick() iterates every pipeline and
fires whichever ones are due (and enabled, and not manual, and the global kill
switch is off). State persists to SQLite via psyc.db so cadences survive
restarts. A background asyncio loop calls tick() at a fixed interval — the
cockpit lifespan attaches it.
NOTE: federation pipelines (peer-pull, vouch-refresh) are wired as placeholders
that return a no-op string. Real federation lands in a later stage.
"""
from __future__ import annotations
import asyncio
import traceback
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Callable, Dict, List, Optional, Tuple
from pydantic import BaseModel, Field
from psyc import db, log
_log = log.get(__name__)
class PulseMode(str, Enum):
MANUAL = "manual"
AUTO_PROPOSE = "auto-propose"
AUTO_EXECUTE = "auto-execute"
class Pipeline(BaseModel):
name: str
title: str
description: str
mode: PulseMode
cadence_seconds: int
enabled: bool = True
last_fired: Optional[datetime] = None
next_fire: Optional[datetime] = None
last_result: str = ""
last_outcome: str = "" # "ok" | "err" | "skipped" | ""
# ---------- pipeline runners --------------------------------------------------
def _run_fetch() -> str:
"""Fetch every enabled scout source; partial fetch is fine.
Skip-on-fail is critical: keyed feeds (threatfox, malware-bazaar, otx)
raise when their key isn't configured — we don't want one missing key to
block the public ones.
"""
from psyc.lines import scout
plan: Tuple[Tuple[str, Optional[int]], ...] = (
("urlhaus", 50),
("cisa-kev", 100),
("feodo", 50),
("threatfox", 200),
("malware-bazaar", 100),
("otx", 100),
)
total = 0
feeds_ok = 0
feeds_err: List[str] = []
for source, limit in plan:
try:
cases = scout.fetch_and_signal(source, limit=limit)
for c in cases:
db.upsert_case(c)
total += len(cases)
feeds_ok += 1
except Exception as exc: # noqa: BLE001 — partial fetch is the point
feeds_err.append(source)
_log.info("pulse.fetch.skip", source=source, error=str(exc)[:200])
tail = f" (skipped: {', '.join(feeds_err)})" if feeds_err else ""
return f"fetched {total} cases across {feeds_ok} feed(s){tail}"
def _run_classify() -> str:
from psyc.lines import classify
cases = db.list_cases(limit=10_000)
n = 0
for c in cases:
classify.classify(c)
db.upsert_case(c)
n += 1
return f"classified {n} case(s)"
def _run_prove() -> str:
from psyc.lines import proof
cases = db.list_cases(limit=10_000)
n = 0
for c in cases:
proof.prove(c)
db.upsert_case(c)
n += 1
return f"proved {n} case(s)"
def _run_reindex() -> str:
from psyc.lines import lookup
cases = db.list_cases(limit=1_000_000)
written = lookup.reindex(cases)
return f"indexed {written} IOC(s) from {len(cases)} case(s)"
def _run_respond_propose() -> str:
"""Propose response actions for high-severity cases that don't yet have any.
Strictly proposal-only: nothing is dispatched. The respond.propose_for_case
helper is already idempotent per case (skips cases that already have
actions), so re-running this on a tick is safe.
"""
from psyc.lines import respond
cases = db.list_cases(limit=10_000)
proposed = 0
touched = 0
for c in cases:
ids = respond.propose_for_case(c)
if ids:
proposed += len(ids)
touched += 1
return f"proposed {proposed} action(s) for {touched} case(s)"
def _run_peer_pull() -> str:
return "federation not yet active"
def _run_vouch_refresh() -> str:
return "federation not yet active"
# ---------- registry ----------------------------------------------------------
_REGISTRY: Dict[str, Callable[[], str]] = {
"fetch": _run_fetch,
"classify": _run_classify,
"prove": _run_prove,
"reindex": _run_reindex,
"respond": _run_respond_propose,
"peer-pull": _run_peer_pull,
"vouch-refresh": _run_vouch_refresh,
}
# Initial defaults — seeded once on first DB init. Tuples of
# (name, title, description, mode, cadence_seconds, enabled).
_DEFAULTS: Tuple[Tuple[str, str, str, PulseMode, int, bool], ...] = (
("fetch", "Scout · fetch feeds", "Pull every configured threat feed and ingest new cases.",
PulseMode.AUTO_EXECUTE, 900, True),
("classify", "Classify · label cases", "Assign incident type, severity, TLP, and internal class to every case.",
PulseMode.AUTO_EXECUTE, 300, True),
("prove", "Proof · score confidence", "Compute confidence (reliability · credibility · freshness) for every case.",
PulseMode.AUTO_EXECUTE, 300, True),
("reindex", "Lookup · rebuild IOC index","Rebuild the IOC reverse-index over the case corpus.",
PulseMode.AUTO_EXECUTE, 3600, True),
("respond", "Respond · propose actions", "Propose human-gated response actions for newly high-severity cases.",
PulseMode.AUTO_PROPOSE, 600, True),
("peer-pull", "Federation · peer pull", "(placeholder) Pull sealed cases from federated peers.",
PulseMode.MANUAL, 600, False),
("vouch-refresh","Federation · vouch refresh","(placeholder) Refresh per-peer vouching ledgers.",
PulseMode.MANUAL, 3600, False),
)
# ---------- helpers -----------------------------------------------------------
def _now() -> datetime:
return datetime.now(timezone.utc)
def _parse_dt(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
try:
return datetime.fromisoformat(value)
except ValueError:
return None
def _row_to_pipeline(row: dict) -> Pipeline:
return Pipeline(
name=row["name"],
title=row["title"],
description=row["description"],
mode=PulseMode(row["mode"]),
cadence_seconds=int(row["cadence_seconds"]),
enabled=bool(row["enabled"]),
last_fired=_parse_dt(row.get("last_fired")),
next_fire=_parse_dt(row.get("next_fire")),
last_result=row.get("last_result") or "",
last_outcome=row.get("last_outcome") or "",
)
def _pipeline_to_row(p: Pipeline) -> dict:
return dict(
name=p.name,
title=p.title,
description=p.description,
mode=p.mode.value,
cadence_seconds=int(p.cadence_seconds),
enabled=bool(p.enabled),
last_fired=p.last_fired.isoformat() if p.last_fired else None,
next_fire=p.next_fire.isoformat() if p.next_fire else None,
last_result=p.last_result or "",
last_outcome=p.last_outcome or "",
)
def seed_defaults() -> None:
"""Insert any default pipelines that aren't already in the DB. Idempotent."""
existing = {row["name"] for row in db.get_pulse_state()}
for name, title, desc, mode, cadence, enabled in _DEFAULTS:
if name in existing:
continue
p = Pipeline(
name=name, title=title, description=desc,
mode=mode, cadence_seconds=cadence, enabled=enabled,
next_fire=_now(), # first tick after install fires immediately if due
)
db.upsert_pulse_pipeline(_pipeline_to_row(p))
_log.info("pulse.defaults.seeded", count=len(_DEFAULTS))
def state() -> List[Pipeline]:
"""Every pipeline, ordered by name. Seeds defaults on the first call."""
rows = db.get_pulse_state()
if not rows:
seed_defaults()
rows = db.get_pulse_state()
return [_row_to_pipeline(r) for r in rows]
def _get_pipeline(name: str) -> Optional[Pipeline]:
for p in state():
if p.name == name:
return p
return None
def set_mode(name: str, mode: PulseMode) -> None:
p = _get_pipeline(name)
if p is None:
raise ValueError(f"unknown pipeline: {name}")
p.mode = mode
db.upsert_pulse_pipeline(_pipeline_to_row(p))
def set_cadence(name: str, seconds: int) -> None:
if seconds <= 0:
raise ValueError("cadence must be > 0 seconds")
p = _get_pipeline(name)
if p is None:
raise ValueError(f"unknown pipeline: {name}")
p.cadence_seconds = int(seconds)
db.upsert_pulse_pipeline(_pipeline_to_row(p))
def set_enabled(name: str, enabled: bool) -> None:
p = _get_pipeline(name)
if p is None:
raise ValueError(f"unknown pipeline: {name}")
p.enabled = bool(enabled)
db.upsert_pulse_pipeline(_pipeline_to_row(p))
def set_kill_switch(armed: bool) -> None:
db.kill_switch_set(armed)
_log.warning("pulse.killswitch.changed", armed=bool(armed))
def kill_switch_state() -> bool:
return db.kill_switch_get()
# ---------- the heartbeat -----------------------------------------------------
def _fire(p: Pipeline) -> Tuple[str, str]:
"""Run one pipeline. Returns (outcome, result_str). Persists the timestamps.
Outcome is "ok" if the runner returned, "err" if it raised.
"""
runner = _REGISTRY.get(p.name)
if runner is None:
outcome = "err"
result = f"no runner registered for '{p.name}'"
else:
try:
result = runner() or ""
outcome = "ok"
except Exception as exc: # noqa: BLE001 — log + record, don't crash the loop
outcome = "err"
result = f"{type(exc).__name__}: {exc}"
_log.warning("pulse.fire.error", name=p.name, error=result, trace=traceback.format_exc())
now = _now()
p.last_fired = now
p.next_fire = now + timedelta(seconds=max(1, p.cadence_seconds))
p.last_result = result[:500]
p.last_outcome = outcome
db.upsert_pulse_pipeline(_pipeline_to_row(p))
_log.info("pulse.fired", name=p.name, outcome=outcome, result=p.last_result)
return outcome, p.last_result
def _should_fire(p: Pipeline, now: datetime) -> bool:
if not p.enabled:
return False
if p.mode == PulseMode.MANUAL:
return False
if p.next_fire is None:
return True
return now >= p.next_fire
def tick() -> List[Tuple[str, str, str]]:
"""Single scheduler heartbeat. Returns (name, outcome, result_str) per pipeline.
Outcome is "ok" / "err" / "skipped" — every registered pipeline appears in
the return value so callers can see what was skipped and why.
"""
if kill_switch_state():
_log.info("pulse.tick.killed")
return [(p.name, "skipped", "kill switch armed") for p in state()]
now = _now()
out: List[Tuple[str, str, str]] = []
for p in state():
if not _should_fire(p, now):
out.append((p.name, "skipped", "not due"))
continue
outcome, result = _fire(p)
out.append((p.name, outcome, result))
return out
def run_now(name: str) -> Tuple[str, str]:
"""Manually fire one pipeline, bypassing cadence and mode. Honors kill switch.
Returns (outcome, result_str). Raises ValueError on unknown name.
"""
if kill_switch_state():
return ("skipped", "kill switch armed")
p = _get_pipeline(name)
if p is None:
raise ValueError(f"unknown pipeline: {name}")
return _fire(p)
# ---------- background loop ---------------------------------------------------
async def start_background_loop(interval_seconds: int = 30) -> None:
"""Long-running scheduler — calls tick() every interval. Launched from FastAPI lifespan.
Designed to run for the life of the process; cancellation is the normal stop signal.
"""
_log.info("pulse.loop.starting", interval=interval_seconds)
while True:
try:
tick()
except Exception as exc: # noqa: BLE001 — one bad tick must not kill the loop
_log.warning("pulse.loop.tick.error", error=str(exc), trace=traceback.format_exc())
try:
await asyncio.sleep(interval_seconds)
except asyncio.CancelledError:
_log.info("pulse.loop.cancelled")
raise

218
tests/test_pulse.py Normal file
View File

@@ -0,0 +1,218 @@
"""Pulse scheduler tests — cadence, kill switch, mode gating, persistence."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import pytest
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import pulse
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
"""A temp SQLite + a swapped-in registry of fast, deterministic runners.
Real runners would talk to scout / classify / proof / etc — none of which
we want to exercise from a pulse-only test. We replace _REGISTRY in-place
so tick() drives the scheduler logic against trivial counters.
"""
test_db = tmp_path / "pulse.db"
test_engine = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(test_engine, checkfirst=True)
monkeypatch.setattr(db, "_engine", test_engine)
monkeypatch.setattr(db, "DB_PATH", test_db)
counters = {name: 0 for name in pulse._REGISTRY}
def _make(name: str):
def runner() -> str:
counters[name] += 1
return f"{name}: tick {counters[name]}"
return runner
fake_registry = {name: _make(name) for name in pulse._REGISTRY}
monkeypatch.setattr(pulse, "_REGISTRY", fake_registry)
return counters
def _set_last_fired(name: str, when: datetime, cadence: int) -> None:
"""Force a pipeline's last_fired + next_fire to a known instant."""
for p in pulse.state():
if p.name == name:
p.last_fired = when
p.next_fire = when + timedelta(seconds=cadence)
db.upsert_pulse_pipeline(pulse._pipeline_to_row(p))
return
raise AssertionError(f"unknown pipeline {name}")
def test_defaults_seeded_on_first_state(fresh_db):
pipelines = pulse.state()
names = {p.name for p in pipelines}
assert names == {"fetch", "classify", "prove", "reindex", "respond", "peer-pull", "vouch-refresh"}
# Spot-check a couple of defaults from the spec.
by_name = {p.name: p for p in pipelines}
assert by_name["fetch"].mode == pulse.PulseMode.AUTO_EXECUTE
assert by_name["fetch"].cadence_seconds == 900
assert by_name["peer-pull"].enabled is False
assert by_name["respond"].mode == pulse.PulseMode.AUTO_PROPOSE
def test_state_persists_across_reads(fresh_db):
pulse.set_cadence("fetch", 42)
pulse.set_mode("classify", pulse.PulseMode.MANUAL)
pulse.set_enabled("reindex", False)
by_name = {p.name: p for p in pulse.state()}
assert by_name["fetch"].cadence_seconds == 42
assert by_name["classify"].mode == pulse.PulseMode.MANUAL
assert by_name["reindex"].enabled is False
def test_tick_skips_not_due_pipelines(fresh_db):
# Push every pipeline well into the future — nothing should fire.
now = datetime.now(timezone.utc)
for p in pulse.state():
_set_last_fired(p.name, now, p.cadence_seconds)
results = pulse.tick()
outcomes = {name: outcome for name, outcome, _ in results}
assert all(o == "skipped" for o in outcomes.values()), outcomes
assert all(c == 0 for c in fresh_db.values()), fresh_db
def test_tick_fires_due_auto_pipelines(fresh_db):
# Force "fetch" to be due (last fired far in the past) and re-tick.
past = datetime.now(timezone.utc) - timedelta(hours=24)
for p in pulse.state():
_set_last_fired(p.name, past, p.cadence_seconds)
results = pulse.tick()
by_name = {name: (outcome, result) for name, outcome, result in results}
# auto-execute + auto-propose modes should fire; manual should skip.
assert by_name["fetch"][0] == "ok"
assert by_name["classify"][0] == "ok"
assert by_name["prove"][0] == "ok"
assert by_name["reindex"][0] == "ok"
assert by_name["respond"][0] == "ok"
# Manual + disabled pipelines stay skipped.
assert by_name["peer-pull"][0] == "skipped"
assert by_name["vouch-refresh"][0] == "skipped"
def test_tick_respects_manual_mode(fresh_db):
past = datetime.now(timezone.utc) - timedelta(hours=24)
_set_last_fired("fetch", past, 900)
pulse.set_mode("fetch", pulse.PulseMode.MANUAL)
results = pulse.tick()
by_name = {n: o for n, o, _ in results}
assert by_name["fetch"] == "skipped"
assert fresh_db["fetch"] == 0
def test_tick_respects_disabled(fresh_db):
past = datetime.now(timezone.utc) - timedelta(hours=24)
_set_last_fired("fetch", past, 900)
pulse.set_enabled("fetch", False)
results = pulse.tick()
by_name = {n: o for n, o, _ in results}
assert by_name["fetch"] == "skipped"
assert fresh_db["fetch"] == 0
def test_kill_switch_halts_everything(fresh_db):
past = datetime.now(timezone.utc) - timedelta(hours=24)
for p in pulse.state():
_set_last_fired(p.name, past, p.cadence_seconds)
pulse.set_kill_switch(True)
results = pulse.tick()
assert all(o == "skipped" for _, o, _ in results)
assert all(c == 0 for c in fresh_db.values())
# And killswitch state itself round-trips.
assert pulse.kill_switch_state() is True
pulse.set_kill_switch(False)
assert pulse.kill_switch_state() is False
def test_tick_updates_last_fired_and_next_fire(fresh_db):
past = datetime.now(timezone.utc) - timedelta(hours=24)
_set_last_fired("fetch", past, 900)
before = datetime.now(timezone.utc)
pulse.tick()
p = {x.name: x for x in pulse.state()}["fetch"]
assert p.last_fired is not None
assert p.last_fired >= before - timedelta(seconds=2)
assert p.next_fire is not None
assert p.next_fire > p.last_fired
delta = (p.next_fire - p.last_fired).total_seconds()
# 900s cadence, allow a 2s rounding window.
assert 898 <= delta <= 902
assert p.last_outcome == "ok"
assert "fetch: tick" in p.last_result
def test_run_now_bypasses_cadence(fresh_db):
# Pipeline isn't due — run_now must still fire.
future = datetime.now(timezone.utc) + timedelta(hours=1)
for p in pulse.state():
p.last_fired = datetime.now(timezone.utc)
p.next_fire = future
db.upsert_pulse_pipeline(pulse._pipeline_to_row(p))
outcome, result = pulse.run_now("fetch")
assert outcome == "ok"
assert "fetch: tick" in result
assert fresh_db["fetch"] == 1
def test_run_now_respects_kill_switch(fresh_db):
pulse.set_kill_switch(True)
outcome, result = pulse.run_now("fetch")
assert outcome == "skipped"
assert "kill switch" in result
assert fresh_db["fetch"] == 0
def test_run_now_even_when_manual(fresh_db):
"""Manual mode blocks tick() but NOT run_now — that's the whole point of manual."""
pulse.set_mode("peer-pull", pulse.PulseMode.MANUAL)
pulse.set_enabled("peer-pull", True)
outcome, _ = pulse.run_now("peer-pull")
assert outcome == "ok"
assert fresh_db["peer-pull"] == 1
def test_runner_exception_recorded_as_err(fresh_db, monkeypatch):
def boom() -> str:
raise RuntimeError("nope")
# Inject a failing runner just for "fetch".
bad_registry = dict(pulse._REGISTRY)
bad_registry["fetch"] = boom
monkeypatch.setattr(pulse, "_REGISTRY", bad_registry)
past = datetime.now(timezone.utc) - timedelta(hours=24)
_set_last_fired("fetch", past, 900)
results = pulse.tick()
by_name = {n: (o, r) for n, o, r in results}
assert by_name["fetch"][0] == "err"
assert "nope" in by_name["fetch"][1]
# And the failure is persisted, with next_fire still pushed forward so we
# don't busy-retry a broken runner every tick.
p = {x.name: x for x in pulse.state()}["fetch"]
assert p.last_outcome == "err"
assert p.next_fire is not None and p.next_fire > p.last_fired
def test_unknown_pipeline_raises(fresh_db):
with pytest.raises(ValueError):
pulse.set_mode("does-not-exist", pulse.PulseMode.MANUAL)
with pytest.raises(ValueError):
pulse.set_cadence("does-not-exist", 60)
with pytest.raises(ValueError):
pulse.run_now("does-not-exist")
def test_invalid_cadence_raises(fresh_db):
with pytest.raises(ValueError):
pulse.set_cadence("fetch", 0)
with pytest.raises(ValueError):
pulse.set_cadence("fetch", -5)