stage-33e pulse: tests
This commit is contained in:
218
tests/test_pulse.py
Normal file
218
tests/test_pulse.py
Normal file
@@ -0,0 +1,218 @@
|
||||
"""Pulse scheduler tests — cadence, kill switch, mode gating, persistence."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
from psyc import db
|
||||
from psyc.lines import pulse
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fresh_db(tmp_path, monkeypatch):
|
||||
"""A temp SQLite + a swapped-in registry of fast, deterministic runners.
|
||||
|
||||
Real runners would talk to scout / classify / proof / etc — none of which
|
||||
we want to exercise from a pulse-only test. We replace _REGISTRY in-place
|
||||
so tick() drives the scheduler logic against trivial counters.
|
||||
"""
|
||||
test_db = tmp_path / "pulse.db"
|
||||
test_engine = create_engine(f"sqlite:///{test_db}", future=True)
|
||||
db._metadata.create_all(test_engine, checkfirst=True)
|
||||
monkeypatch.setattr(db, "_engine", test_engine)
|
||||
monkeypatch.setattr(db, "DB_PATH", test_db)
|
||||
|
||||
counters = {name: 0 for name in pulse._REGISTRY}
|
||||
|
||||
def _make(name: str):
|
||||
def runner() -> str:
|
||||
counters[name] += 1
|
||||
return f"{name}: tick {counters[name]}"
|
||||
return runner
|
||||
|
||||
fake_registry = {name: _make(name) for name in pulse._REGISTRY}
|
||||
monkeypatch.setattr(pulse, "_REGISTRY", fake_registry)
|
||||
return counters
|
||||
|
||||
|
||||
def _set_last_fired(name: str, when: datetime, cadence: int) -> None:
|
||||
"""Force a pipeline's last_fired + next_fire to a known instant."""
|
||||
for p in pulse.state():
|
||||
if p.name == name:
|
||||
p.last_fired = when
|
||||
p.next_fire = when + timedelta(seconds=cadence)
|
||||
db.upsert_pulse_pipeline(pulse._pipeline_to_row(p))
|
||||
return
|
||||
raise AssertionError(f"unknown pipeline {name}")
|
||||
|
||||
|
||||
def test_defaults_seeded_on_first_state(fresh_db):
|
||||
pipelines = pulse.state()
|
||||
names = {p.name for p in pipelines}
|
||||
assert names == {"fetch", "classify", "prove", "reindex", "respond", "peer-pull", "vouch-refresh"}
|
||||
# Spot-check a couple of defaults from the spec.
|
||||
by_name = {p.name: p for p in pipelines}
|
||||
assert by_name["fetch"].mode == pulse.PulseMode.AUTO_EXECUTE
|
||||
assert by_name["fetch"].cadence_seconds == 900
|
||||
assert by_name["peer-pull"].enabled is False
|
||||
assert by_name["respond"].mode == pulse.PulseMode.AUTO_PROPOSE
|
||||
|
||||
|
||||
def test_state_persists_across_reads(fresh_db):
|
||||
pulse.set_cadence("fetch", 42)
|
||||
pulse.set_mode("classify", pulse.PulseMode.MANUAL)
|
||||
pulse.set_enabled("reindex", False)
|
||||
by_name = {p.name: p for p in pulse.state()}
|
||||
assert by_name["fetch"].cadence_seconds == 42
|
||||
assert by_name["classify"].mode == pulse.PulseMode.MANUAL
|
||||
assert by_name["reindex"].enabled is False
|
||||
|
||||
|
||||
def test_tick_skips_not_due_pipelines(fresh_db):
|
||||
# Push every pipeline well into the future — nothing should fire.
|
||||
now = datetime.now(timezone.utc)
|
||||
for p in pulse.state():
|
||||
_set_last_fired(p.name, now, p.cadence_seconds)
|
||||
results = pulse.tick()
|
||||
outcomes = {name: outcome for name, outcome, _ in results}
|
||||
assert all(o == "skipped" for o in outcomes.values()), outcomes
|
||||
assert all(c == 0 for c in fresh_db.values()), fresh_db
|
||||
|
||||
|
||||
def test_tick_fires_due_auto_pipelines(fresh_db):
|
||||
# Force "fetch" to be due (last fired far in the past) and re-tick.
|
||||
past = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||
for p in pulse.state():
|
||||
_set_last_fired(p.name, past, p.cadence_seconds)
|
||||
results = pulse.tick()
|
||||
by_name = {name: (outcome, result) for name, outcome, result in results}
|
||||
# auto-execute + auto-propose modes should fire; manual should skip.
|
||||
assert by_name["fetch"][0] == "ok"
|
||||
assert by_name["classify"][0] == "ok"
|
||||
assert by_name["prove"][0] == "ok"
|
||||
assert by_name["reindex"][0] == "ok"
|
||||
assert by_name["respond"][0] == "ok"
|
||||
# Manual + disabled pipelines stay skipped.
|
||||
assert by_name["peer-pull"][0] == "skipped"
|
||||
assert by_name["vouch-refresh"][0] == "skipped"
|
||||
|
||||
|
||||
def test_tick_respects_manual_mode(fresh_db):
|
||||
past = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||
_set_last_fired("fetch", past, 900)
|
||||
pulse.set_mode("fetch", pulse.PulseMode.MANUAL)
|
||||
results = pulse.tick()
|
||||
by_name = {n: o for n, o, _ in results}
|
||||
assert by_name["fetch"] == "skipped"
|
||||
assert fresh_db["fetch"] == 0
|
||||
|
||||
|
||||
def test_tick_respects_disabled(fresh_db):
|
||||
past = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||
_set_last_fired("fetch", past, 900)
|
||||
pulse.set_enabled("fetch", False)
|
||||
results = pulse.tick()
|
||||
by_name = {n: o for n, o, _ in results}
|
||||
assert by_name["fetch"] == "skipped"
|
||||
assert fresh_db["fetch"] == 0
|
||||
|
||||
|
||||
def test_kill_switch_halts_everything(fresh_db):
|
||||
past = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||
for p in pulse.state():
|
||||
_set_last_fired(p.name, past, p.cadence_seconds)
|
||||
pulse.set_kill_switch(True)
|
||||
results = pulse.tick()
|
||||
assert all(o == "skipped" for _, o, _ in results)
|
||||
assert all(c == 0 for c in fresh_db.values())
|
||||
# And killswitch state itself round-trips.
|
||||
assert pulse.kill_switch_state() is True
|
||||
pulse.set_kill_switch(False)
|
||||
assert pulse.kill_switch_state() is False
|
||||
|
||||
|
||||
def test_tick_updates_last_fired_and_next_fire(fresh_db):
|
||||
past = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||
_set_last_fired("fetch", past, 900)
|
||||
before = datetime.now(timezone.utc)
|
||||
pulse.tick()
|
||||
p = {x.name: x for x in pulse.state()}["fetch"]
|
||||
assert p.last_fired is not None
|
||||
assert p.last_fired >= before - timedelta(seconds=2)
|
||||
assert p.next_fire is not None
|
||||
assert p.next_fire > p.last_fired
|
||||
delta = (p.next_fire - p.last_fired).total_seconds()
|
||||
# 900s cadence, allow a 2s rounding window.
|
||||
assert 898 <= delta <= 902
|
||||
assert p.last_outcome == "ok"
|
||||
assert "fetch: tick" in p.last_result
|
||||
|
||||
|
||||
def test_run_now_bypasses_cadence(fresh_db):
|
||||
# Pipeline isn't due — run_now must still fire.
|
||||
future = datetime.now(timezone.utc) + timedelta(hours=1)
|
||||
for p in pulse.state():
|
||||
p.last_fired = datetime.now(timezone.utc)
|
||||
p.next_fire = future
|
||||
db.upsert_pulse_pipeline(pulse._pipeline_to_row(p))
|
||||
outcome, result = pulse.run_now("fetch")
|
||||
assert outcome == "ok"
|
||||
assert "fetch: tick" in result
|
||||
assert fresh_db["fetch"] == 1
|
||||
|
||||
|
||||
def test_run_now_respects_kill_switch(fresh_db):
|
||||
pulse.set_kill_switch(True)
|
||||
outcome, result = pulse.run_now("fetch")
|
||||
assert outcome == "skipped"
|
||||
assert "kill switch" in result
|
||||
assert fresh_db["fetch"] == 0
|
||||
|
||||
|
||||
def test_run_now_even_when_manual(fresh_db):
|
||||
"""Manual mode blocks tick() but NOT run_now — that's the whole point of manual."""
|
||||
pulse.set_mode("peer-pull", pulse.PulseMode.MANUAL)
|
||||
pulse.set_enabled("peer-pull", True)
|
||||
outcome, _ = pulse.run_now("peer-pull")
|
||||
assert outcome == "ok"
|
||||
assert fresh_db["peer-pull"] == 1
|
||||
|
||||
|
||||
def test_runner_exception_recorded_as_err(fresh_db, monkeypatch):
|
||||
def boom() -> str:
|
||||
raise RuntimeError("nope")
|
||||
# Inject a failing runner just for "fetch".
|
||||
bad_registry = dict(pulse._REGISTRY)
|
||||
bad_registry["fetch"] = boom
|
||||
monkeypatch.setattr(pulse, "_REGISTRY", bad_registry)
|
||||
|
||||
past = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||
_set_last_fired("fetch", past, 900)
|
||||
results = pulse.tick()
|
||||
by_name = {n: (o, r) for n, o, r in results}
|
||||
assert by_name["fetch"][0] == "err"
|
||||
assert "nope" in by_name["fetch"][1]
|
||||
# And the failure is persisted, with next_fire still pushed forward so we
|
||||
# don't busy-retry a broken runner every tick.
|
||||
p = {x.name: x for x in pulse.state()}["fetch"]
|
||||
assert p.last_outcome == "err"
|
||||
assert p.next_fire is not None and p.next_fire > p.last_fired
|
||||
|
||||
|
||||
def test_unknown_pipeline_raises(fresh_db):
|
||||
with pytest.raises(ValueError):
|
||||
pulse.set_mode("does-not-exist", pulse.PulseMode.MANUAL)
|
||||
with pytest.raises(ValueError):
|
||||
pulse.set_cadence("does-not-exist", 60)
|
||||
with pytest.raises(ValueError):
|
||||
pulse.run_now("does-not-exist")
|
||||
|
||||
|
||||
def test_invalid_cadence_raises(fresh_db):
|
||||
with pytest.raises(ValueError):
|
||||
pulse.set_cadence("fetch", 0)
|
||||
with pytest.raises(ValueError):
|
||||
pulse.set_cadence("fetch", -5)
|
||||
Reference in New Issue
Block a user