diff --git a/tests/test_pulse.py b/tests/test_pulse.py new file mode 100644 index 0000000..b327585 --- /dev/null +++ b/tests/test_pulse.py @@ -0,0 +1,218 @@ +"""Pulse scheduler tests — cadence, kill switch, mode gating, persistence.""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy import create_engine + +from psyc import db +from psyc.lines import pulse + + +@pytest.fixture +def fresh_db(tmp_path, monkeypatch): + """A temp SQLite + a swapped-in registry of fast, deterministic runners. + + Real runners would talk to scout / classify / proof / etc — none of which + we want to exercise from a pulse-only test. We replace _REGISTRY in-place + so tick() drives the scheduler logic against trivial counters. + """ + test_db = tmp_path / "pulse.db" + test_engine = create_engine(f"sqlite:///{test_db}", future=True) + db._metadata.create_all(test_engine, checkfirst=True) + monkeypatch.setattr(db, "_engine", test_engine) + monkeypatch.setattr(db, "DB_PATH", test_db) + + counters = {name: 0 for name in pulse._REGISTRY} + + def _make(name: str): + def runner() -> str: + counters[name] += 1 + return f"{name}: tick {counters[name]}" + return runner + + fake_registry = {name: _make(name) for name in pulse._REGISTRY} + monkeypatch.setattr(pulse, "_REGISTRY", fake_registry) + return counters + + +def _set_last_fired(name: str, when: datetime, cadence: int) -> None: + """Force a pipeline's last_fired + next_fire to a known instant.""" + for p in pulse.state(): + if p.name == name: + p.last_fired = when + p.next_fire = when + timedelta(seconds=cadence) + db.upsert_pulse_pipeline(pulse._pipeline_to_row(p)) + return + raise AssertionError(f"unknown pipeline {name}") + + +def test_defaults_seeded_on_first_state(fresh_db): + pipelines = pulse.state() + names = {p.name for p in pipelines} + assert names == {"fetch", "classify", "prove", "reindex", "respond", "peer-pull", "vouch-refresh"} + # Spot-check a couple of defaults from the spec. + by_name = {p.name: p for p in pipelines} + assert by_name["fetch"].mode == pulse.PulseMode.AUTO_EXECUTE + assert by_name["fetch"].cadence_seconds == 900 + assert by_name["peer-pull"].enabled is False + assert by_name["respond"].mode == pulse.PulseMode.AUTO_PROPOSE + + +def test_state_persists_across_reads(fresh_db): + pulse.set_cadence("fetch", 42) + pulse.set_mode("classify", pulse.PulseMode.MANUAL) + pulse.set_enabled("reindex", False) + by_name = {p.name: p for p in pulse.state()} + assert by_name["fetch"].cadence_seconds == 42 + assert by_name["classify"].mode == pulse.PulseMode.MANUAL + assert by_name["reindex"].enabled is False + + +def test_tick_skips_not_due_pipelines(fresh_db): + # Push every pipeline well into the future — nothing should fire. + now = datetime.now(timezone.utc) + for p in pulse.state(): + _set_last_fired(p.name, now, p.cadence_seconds) + results = pulse.tick() + outcomes = {name: outcome for name, outcome, _ in results} + assert all(o == "skipped" for o in outcomes.values()), outcomes + assert all(c == 0 for c in fresh_db.values()), fresh_db + + +def test_tick_fires_due_auto_pipelines(fresh_db): + # Force "fetch" to be due (last fired far in the past) and re-tick. + past = datetime.now(timezone.utc) - timedelta(hours=24) + for p in pulse.state(): + _set_last_fired(p.name, past, p.cadence_seconds) + results = pulse.tick() + by_name = {name: (outcome, result) for name, outcome, result in results} + # auto-execute + auto-propose modes should fire; manual should skip. + assert by_name["fetch"][0] == "ok" + assert by_name["classify"][0] == "ok" + assert by_name["prove"][0] == "ok" + assert by_name["reindex"][0] == "ok" + assert by_name["respond"][0] == "ok" + # Manual + disabled pipelines stay skipped. + assert by_name["peer-pull"][0] == "skipped" + assert by_name["vouch-refresh"][0] == "skipped" + + +def test_tick_respects_manual_mode(fresh_db): + past = datetime.now(timezone.utc) - timedelta(hours=24) + _set_last_fired("fetch", past, 900) + pulse.set_mode("fetch", pulse.PulseMode.MANUAL) + results = pulse.tick() + by_name = {n: o for n, o, _ in results} + assert by_name["fetch"] == "skipped" + assert fresh_db["fetch"] == 0 + + +def test_tick_respects_disabled(fresh_db): + past = datetime.now(timezone.utc) - timedelta(hours=24) + _set_last_fired("fetch", past, 900) + pulse.set_enabled("fetch", False) + results = pulse.tick() + by_name = {n: o for n, o, _ in results} + assert by_name["fetch"] == "skipped" + assert fresh_db["fetch"] == 0 + + +def test_kill_switch_halts_everything(fresh_db): + past = datetime.now(timezone.utc) - timedelta(hours=24) + for p in pulse.state(): + _set_last_fired(p.name, past, p.cadence_seconds) + pulse.set_kill_switch(True) + results = pulse.tick() + assert all(o == "skipped" for _, o, _ in results) + assert all(c == 0 for c in fresh_db.values()) + # And killswitch state itself round-trips. + assert pulse.kill_switch_state() is True + pulse.set_kill_switch(False) + assert pulse.kill_switch_state() is False + + +def test_tick_updates_last_fired_and_next_fire(fresh_db): + past = datetime.now(timezone.utc) - timedelta(hours=24) + _set_last_fired("fetch", past, 900) + before = datetime.now(timezone.utc) + pulse.tick() + p = {x.name: x for x in pulse.state()}["fetch"] + assert p.last_fired is not None + assert p.last_fired >= before - timedelta(seconds=2) + assert p.next_fire is not None + assert p.next_fire > p.last_fired + delta = (p.next_fire - p.last_fired).total_seconds() + # 900s cadence, allow a 2s rounding window. + assert 898 <= delta <= 902 + assert p.last_outcome == "ok" + assert "fetch: tick" in p.last_result + + +def test_run_now_bypasses_cadence(fresh_db): + # Pipeline isn't due — run_now must still fire. + future = datetime.now(timezone.utc) + timedelta(hours=1) + for p in pulse.state(): + p.last_fired = datetime.now(timezone.utc) + p.next_fire = future + db.upsert_pulse_pipeline(pulse._pipeline_to_row(p)) + outcome, result = pulse.run_now("fetch") + assert outcome == "ok" + assert "fetch: tick" in result + assert fresh_db["fetch"] == 1 + + +def test_run_now_respects_kill_switch(fresh_db): + pulse.set_kill_switch(True) + outcome, result = pulse.run_now("fetch") + assert outcome == "skipped" + assert "kill switch" in result + assert fresh_db["fetch"] == 0 + + +def test_run_now_even_when_manual(fresh_db): + """Manual mode blocks tick() but NOT run_now — that's the whole point of manual.""" + pulse.set_mode("peer-pull", pulse.PulseMode.MANUAL) + pulse.set_enabled("peer-pull", True) + outcome, _ = pulse.run_now("peer-pull") + assert outcome == "ok" + assert fresh_db["peer-pull"] == 1 + + +def test_runner_exception_recorded_as_err(fresh_db, monkeypatch): + def boom() -> str: + raise RuntimeError("nope") + # Inject a failing runner just for "fetch". + bad_registry = dict(pulse._REGISTRY) + bad_registry["fetch"] = boom + monkeypatch.setattr(pulse, "_REGISTRY", bad_registry) + + past = datetime.now(timezone.utc) - timedelta(hours=24) + _set_last_fired("fetch", past, 900) + results = pulse.tick() + by_name = {n: (o, r) for n, o, r in results} + assert by_name["fetch"][0] == "err" + assert "nope" in by_name["fetch"][1] + # And the failure is persisted, with next_fire still pushed forward so we + # don't busy-retry a broken runner every tick. + p = {x.name: x for x in pulse.state()}["fetch"] + assert p.last_outcome == "err" + assert p.next_fire is not None and p.next_fire > p.last_fired + + +def test_unknown_pipeline_raises(fresh_db): + with pytest.raises(ValueError): + pulse.set_mode("does-not-exist", pulse.PulseMode.MANUAL) + with pytest.raises(ValueError): + pulse.set_cadence("does-not-exist", 60) + with pytest.raises(ValueError): + pulse.run_now("does-not-exist") + + +def test_invalid_cadence_raises(fresh_db): + with pytest.raises(ValueError): + pulse.set_cadence("fetch", 0) + with pytest.raises(ValueError): + pulse.set_cadence("fetch", -5)