diff --git a/src/psyc/db.py b/src/psyc/db.py index 1b12436..273154c 100644 --- a/src/psyc/db.py +++ b/src/psyc/db.py @@ -114,6 +114,26 @@ Index("iocs_value_idx", iocs.c.value) Index("iocs_type_idx", iocs.c.ioc_type) Index("iocs_case_idx", iocs.c.case_id) +pulse_pipelines = Table( + "pulse_pipelines", _metadata, + Column("name", String, primary_key=True), + Column("title", String, nullable=False), + Column("description", Text, nullable=False, default=""), + Column("mode", String, nullable=False), # manual | auto-propose | auto-execute + Column("cadence_seconds", Integer, nullable=False), + Column("enabled", Boolean, nullable=False, default=True), + Column("last_fired", String, nullable=True), # ISO timestamp or NULL + Column("next_fire", String, nullable=True), # ISO timestamp or NULL + Column("last_result", Text, nullable=False, default=""), + Column("last_outcome", String, nullable=False, default=""), # ok | err | skipped | "" +) + +pulse_settings = Table( + "pulse_settings", _metadata, + Column("key", String, primary_key=True), + Column("value", String, nullable=False), +) + _log = log.get(__name__) _engine: Optional[Engine] = None @@ -214,3 +234,53 @@ def ioc_count(db_path: Path = DB_PATH) -> int: stmt = select(func.count()).select_from(iocs) with engine(db_path).connect() as conn: return conn.execute(stmt).scalar_one() + + +# ---------- pulse scheduler ---------------------------------------------- + +def get_pulse_state(db_path: Path = DB_PATH) -> List[dict]: + """Every registered pipeline, ordered by name.""" + stmt = select(pulse_pipelines).order_by(pulse_pipelines.c.name) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def upsert_pulse_pipeline(row: dict, db_path: Path = DB_PATH) -> None: + """Insert or update one pipeline by name.""" + stmt = sqlite_insert(pulse_pipelines).values(**row) + stmt = stmt.on_conflict_do_update( + index_elements=[pulse_pipelines.c.name], + set_=dict( + title=stmt.excluded.title, + description=stmt.excluded.description, + mode=stmt.excluded.mode, + cadence_seconds=stmt.excluded.cadence_seconds, + enabled=stmt.excluded.enabled, + last_fired=stmt.excluded.last_fired, + next_fire=stmt.excluded.next_fire, + last_result=stmt.excluded.last_result, + last_outcome=stmt.excluded.last_outcome, + ), + ) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +def kill_switch_get(db_path: Path = DB_PATH) -> bool: + stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == "kill_switch") + with engine(db_path).connect() as conn: + row = conn.execute(stmt).fetchone() + if row is None: + return False + return str(row.value) == "1" + + +def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None: + value = "1" if armed else "0" + stmt = sqlite_insert(pulse_settings).values(key="kill_switch", value=value) + stmt = stmt.on_conflict_do_update( + index_elements=[pulse_settings.c.key], + set_=dict(value=stmt.excluded.value), + ) + with engine(db_path).begin() as conn: + conn.execute(stmt)