stage-33b pulse: db tables + persistence

This commit is contained in:
m17hr1l
2026-06-06 16:03:30 +02:00
parent e710be6ebd
commit 4d67605371

View File

@@ -114,6 +114,26 @@ Index("iocs_value_idx", iocs.c.value)
Index("iocs_type_idx", iocs.c.ioc_type)
Index("iocs_case_idx", iocs.c.case_id)
pulse_pipelines = Table(
"pulse_pipelines", _metadata,
Column("name", String, primary_key=True),
Column("title", String, nullable=False),
Column("description", Text, nullable=False, default=""),
Column("mode", String, nullable=False), # manual | auto-propose | auto-execute
Column("cadence_seconds", Integer, nullable=False),
Column("enabled", Boolean, nullable=False, default=True),
Column("last_fired", String, nullable=True), # ISO timestamp or NULL
Column("next_fire", String, nullable=True), # ISO timestamp or NULL
Column("last_result", Text, nullable=False, default=""),
Column("last_outcome", String, nullable=False, default=""), # ok | err | skipped | ""
)
pulse_settings = Table(
"pulse_settings", _metadata,
Column("key", String, primary_key=True),
Column("value", String, nullable=False),
)
_log = log.get(__name__)
_engine: Optional[Engine] = None
@@ -214,3 +234,53 @@ def ioc_count(db_path: Path = DB_PATH) -> int:
stmt = select(func.count()).select_from(iocs)
with engine(db_path).connect() as conn:
return conn.execute(stmt).scalar_one()
# ---------- pulse scheduler ----------------------------------------------
def get_pulse_state(db_path: Path = DB_PATH) -> List[dict]:
"""Every registered pipeline, ordered by name."""
stmt = select(pulse_pipelines).order_by(pulse_pipelines.c.name)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def upsert_pulse_pipeline(row: dict, db_path: Path = DB_PATH) -> None:
"""Insert or update one pipeline by name."""
stmt = sqlite_insert(pulse_pipelines).values(**row)
stmt = stmt.on_conflict_do_update(
index_elements=[pulse_pipelines.c.name],
set_=dict(
title=stmt.excluded.title,
description=stmt.excluded.description,
mode=stmt.excluded.mode,
cadence_seconds=stmt.excluded.cadence_seconds,
enabled=stmt.excluded.enabled,
last_fired=stmt.excluded.last_fired,
next_fire=stmt.excluded.next_fire,
last_result=stmt.excluded.last_result,
last_outcome=stmt.excluded.last_outcome,
),
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
def kill_switch_get(db_path: Path = DB_PATH) -> bool:
stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == "kill_switch")
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
if row is None:
return False
return str(row.value) == "1"
def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None:
value = "1" if armed else "0"
stmt = sqlite_insert(pulse_settings).values(key="kill_switch", value=value)
stmt = stmt.on_conflict_do_update(
index_elements=[pulse_settings.c.key],
set_=dict(value=stmt.excluded.value),
)
with engine(db_path).begin() as conn:
conn.execute(stmt)