Files
psyc/tests/test_pulse.py
2026-06-06 16:06:54 +02:00

219 lines
7.9 KiB
Python

"""Pulse scheduler tests — cadence, kill switch, mode gating, persistence."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import pytest
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import pulse
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
    """Provide a temp SQLite database plus a registry of trivial counting runners.

    Real runners would talk to scout / classify / proof / etc — none of which
    we want to exercise from a pulse-only test. ``pulse._REGISTRY`` is replaced
    with stand-ins that only bump a counter, so ``tick()`` drives the scheduler
    logic against fast, deterministic callables.

    Yields:
        A dict mapping every registered pipeline name to the number of times
        its stand-in runner has been called so far.
    """
    test_db = tmp_path / "pulse.db"
    test_engine = create_engine(f"sqlite:///{test_db}", future=True)
    db._metadata.create_all(test_engine, checkfirst=True)
    monkeypatch.setattr(db, "_engine", test_engine)
    monkeypatch.setattr(db, "DB_PATH", test_db)

    counters = {name: 0 for name in pulse._REGISTRY}

    def _make(name: str):
        # Factory so each runner closes over its own ``name`` rather than the
        # comprehension variable.
        def runner() -> str:
            counters[name] += 1
            return f"{name}: tick {counters[name]}"

        return runner

    fake_registry = {name: _make(name) for name in pulse._REGISTRY}
    monkeypatch.setattr(pulse, "_REGISTRY", fake_registry)

    yield counters

    # Teardown: close the engine's pooled connections so the temporary
    # database file is not left open once the test has finished.
    test_engine.dispose()
def _set_last_fired(name: str, when: datetime, cadence: int) -> None:
    """Pin a pipeline's ``last_fired`` / ``next_fire`` to known instants.

    ``next_fire`` is set to ``when`` plus ``cadence`` seconds and the modified
    pipeline is written back through ``db.upsert_pulse_pipeline``.

    Raises:
        AssertionError: if ``pulse.state()`` has no pipeline called ``name``.
    """
    target = next((p for p in pulse.state() if p.name == name), None)
    if target is None:
        raise AssertionError(f"unknown pipeline {name}")

    target.last_fired = when
    target.next_fire = when + timedelta(seconds=cadence)
    db.upsert_pulse_pipeline(pulse._pipeline_to_row(target))
def test_defaults_seeded_on_first_state(fresh_db):
    """The first ``state()`` read exposes the complete default pipeline set."""
    expected_names = {
        "fetch",
        "classify",
        "prove",
        "reindex",
        "respond",
        "peer-pull",
        "vouch-refresh",
    }
    defaults = {p.name: p for p in pulse.state()}
    assert set(defaults) == expected_names

    # Spot-check a couple of defaults from the spec.
    fetch = defaults["fetch"]
    assert fetch.mode == pulse.PulseMode.AUTO_EXECUTE
    assert fetch.cadence_seconds == 900
    assert defaults["peer-pull"].enabled is False
    assert defaults["respond"].mode == pulse.PulseMode.AUTO_PROPOSE
def test_state_persists_across_reads(fresh_db):
    """Values written by the setters are visible in a later ``state()`` read."""
    pulse.set_cadence("fetch", 42)
    pulse.set_mode("classify", pulse.PulseMode.MANUAL)
    pulse.set_enabled("reindex", False)

    reloaded = {p.name: p for p in pulse.state()}

    assert reloaded["fetch"].cadence_seconds == 42
    assert reloaded["classify"].mode == pulse.PulseMode.MANUAL
    assert reloaded["reindex"].enabled is False
def test_tick_skips_not_due_pipelines(fresh_db):
    """A tick fires nothing when every pipeline has only just been fired."""
    # Mark each pipeline as fired "now", which places its next_fire one full
    # cadence ahead — nothing should be due.
    stamp = datetime.now(timezone.utc)
    for pipeline in pulse.state():
        _set_last_fired(pipeline.name, stamp, pipeline.cadence_seconds)

    outcomes = {}
    for name, outcome, _result in pulse.tick():
        outcomes[name] = outcome

    assert not [o for o in outcomes.values() if o != "skipped"], outcomes
    # No stand-in runner may have been invoked.
    assert not [count for count in fresh_db.values() if count != 0], fresh_db
def test_tick_fires_due_auto_pipelines(fresh_db):
    """Overdue pipelines in an automatic mode fire; the rest are skipped."""
    # Make every pipeline overdue (last fired a day ago), then tick once.
    overdue = datetime.now(timezone.utc) - timedelta(hours=24)
    for pipeline in pulse.state():
        _set_last_fired(pipeline.name, overdue, pipeline.cadence_seconds)

    outcome_of = {name: outcome for name, outcome, _result in pulse.tick()}

    # auto-execute + auto-propose modes should fire.
    for name in ("fetch", "classify", "prove", "reindex", "respond"):
        assert outcome_of[name] == "ok", name

    # Manual + disabled pipelines stay skipped even though they are overdue.
    for name in ("peer-pull", "vouch-refresh"):
        assert outcome_of[name] == "skipped", name
def test_tick_respects_manual_mode(fresh_db):
    """An overdue pipeline switched to MANUAL is not fired by ``tick()``."""
    overdue = datetime.now(timezone.utc) - timedelta(hours=24)
    _set_last_fired("fetch", overdue, 900)
    pulse.set_mode("fetch", pulse.PulseMode.MANUAL)

    outcome_of = {name: outcome for name, outcome, _result in pulse.tick()}

    assert outcome_of["fetch"] == "skipped"
    # The stand-in runner must not have been called at all.
    assert fresh_db["fetch"] == 0
def test_tick_respects_disabled(fresh_db):
    """An overdue pipeline that has been disabled is not fired by ``tick()``."""
    overdue = datetime.now(timezone.utc) - timedelta(hours=24)
    _set_last_fired("fetch", overdue, 900)
    pulse.set_enabled("fetch", False)

    outcome_of = {name: outcome for name, outcome, _result in pulse.tick()}

    assert outcome_of["fetch"] == "skipped"
    # The stand-in runner must not have been called at all.
    assert fresh_db["fetch"] == 0
def test_kill_switch_halts_everything(fresh_db):
    """With the kill switch on, a tick skips every pipeline, overdue or not."""
    overdue = datetime.now(timezone.utc) - timedelta(hours=24)
    for pipeline in pulse.state():
        _set_last_fired(pipeline.name, overdue, pipeline.cadence_seconds)
    pulse.set_kill_switch(True)

    outcomes = [outcome for _name, outcome, _result in pulse.tick()]

    assert outcomes.count("skipped") == len(outcomes)
    assert not [count for count in fresh_db.values() if count != 0]

    # And killswitch state itself round-trips.
    assert pulse.kill_switch_state() is True
    pulse.set_kill_switch(False)
    assert pulse.kill_switch_state() is False
def test_tick_updates_last_fired_and_next_fire(fresh_db):
    """Firing a pipeline stamps last_fired, schedules next_fire, records the result."""
    overdue = datetime.now(timezone.utc) - timedelta(hours=24)
    _set_last_fired("fetch", overdue, 900)
    started = datetime.now(timezone.utc)

    pulse.tick()

    fetch = {p.name: p for p in pulse.state()}["fetch"]
    tolerance = timedelta(seconds=2)

    assert fetch.last_fired is not None
    assert started - fetch.last_fired <= tolerance
    assert fetch.next_fire is not None
    assert fetch.next_fire > fetch.last_fired

    # 900s cadence, allow a 2s rounding window.
    gap = fetch.next_fire - fetch.last_fired
    assert timedelta(seconds=898) <= gap <= timedelta(seconds=902)

    assert fetch.last_outcome == "ok"
    assert "fetch: tick" in fetch.last_result
def test_run_now_bypasses_cadence(fresh_db):
    """``run_now`` fires a pipeline even when its next_fire is still ahead."""
    # Schedule every pipeline an hour out so none of them is due.
    an_hour_ahead = datetime.now(timezone.utc) + timedelta(hours=1)
    for pipeline in pulse.state():
        pipeline.last_fired = datetime.now(timezone.utc)
        pipeline.next_fire = an_hour_ahead
        row = pulse._pipeline_to_row(pipeline)
        db.upsert_pulse_pipeline(row)

    outcome, result = pulse.run_now("fetch")

    assert outcome == "ok"
    assert "fetch: tick" in result
    assert fresh_db["fetch"] == 1
def test_run_now_respects_kill_switch(fresh_db):
    """``run_now`` refuses to fire while the kill switch is engaged."""
    pulse.set_kill_switch(True)

    outcome, result = pulse.run_now("fetch")

    assert (outcome, fresh_db["fetch"]) == ("skipped", 0)
    assert "kill switch" in result
def test_run_now_even_when_manual(fresh_db):
    """Manual mode blocks tick() but NOT run_now — that's the whole point of manual."""
    name = "peer-pull"
    pulse.set_mode(name, pulse.PulseMode.MANUAL)
    pulse.set_enabled(name, True)

    outcome, _result = pulse.run_now(name)

    assert outcome == "ok"
    assert fresh_db[name] == 1
def test_runner_exception_recorded_as_err(fresh_db, monkeypatch):
    """A runner that raises is reported and persisted with outcome ``err``."""

    def boom() -> str:
        raise RuntimeError("nope")

    # Inject a failing runner just for "fetch"; every other entry is kept.
    monkeypatch.setattr(pulse, "_REGISTRY", {**pulse._REGISTRY, "fetch": boom})

    overdue = datetime.now(timezone.utc) - timedelta(hours=24)
    _set_last_fired("fetch", overdue, 900)

    reported = {name: (outcome, result) for name, outcome, result in pulse.tick()}
    fetch_outcome, fetch_result = reported["fetch"]
    assert fetch_outcome == "err"
    assert "nope" in fetch_result

    # And the failure is persisted, with next_fire still pushed forward so we
    # don't busy-retry a broken runner every tick.
    fetch = {p.name: p for p in pulse.state()}["fetch"]
    assert fetch.last_outcome == "err"
    assert fetch.next_fire is not None
    assert fetch.next_fire > fetch.last_fired
def test_unknown_pipeline_raises(fresh_db):
    """Addressing a pipeline name that does not exist raises ``ValueError``."""
    ghost = "does-not-exist"
    attempts = (
        lambda: pulse.set_mode(ghost, pulse.PulseMode.MANUAL),
        lambda: pulse.set_cadence(ghost, 60),
        lambda: pulse.run_now(ghost),
    )
    for attempt in attempts:
        with pytest.raises(ValueError):
            attempt()
def test_invalid_cadence_raises(fresh_db):
    """Zero and negative cadences are rejected with ``ValueError``."""
    for bad_cadence in (0, -5):
        with pytest.raises(ValueError):
            pulse.set_cadence("fetch", bad_cadence)