diff --git a/src/psyc/db.py b/src/psyc/db.py index 8d58fac..4526182 100644 --- a/src/psyc/db.py +++ b/src/psyc/db.py @@ -134,6 +134,19 @@ pulse_settings = Table( Column("value", String, nullable=False), ) +pulse_audit = Table( + "pulse_audit", _metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("pipeline", String, nullable=False), # 'respond' | 'fetch' | ... + Column("action", String, nullable=False), # 'auto-fire' | 'skipped' | 'error' + Column("action_id", Integer, nullable=True), # response_actions.id when relevant + Column("case_id", String, nullable=True), + Column("detail", Text, nullable=True), + Column("timestamp", String, nullable=False), # ISO +) +Index("pulse_audit_pipeline_idx", pulse_audit.c.pipeline, pulse_audit.c.timestamp.desc()) +Index("pulse_audit_action_id_idx", pulse_audit.c.action_id) + peers = Table( "peers", _metadata, Column("domain", String, primary_key=True), @@ -332,6 +345,47 @@ def pulse_setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None: conn.execute(stmt) +# ---------- pulse audit trail -------------------------------------------- + +def pulse_audit_record(row: dict, db_path: Path = DB_PATH) -> int: + """Append one pulse_audit row. Returns its id. + + `row` must include 'pipeline', 'action', 'timestamp'. action_id, case_id, + detail are optional. Caller controls timestamp so tests can pin it. + """ + stmt = insert(pulse_audit).values(**row) + with engine(db_path).begin() as conn: + res = conn.execute(stmt) + return int(res.inserted_primary_key[0]) + + +def pulse_audit_recent(pipeline: str, limit: int = 25, db_path: Path = DB_PATH) -> List[dict]: + """Most-recent audit rows for one pipeline (newest first).""" + stmt = ( + select(pulse_audit) + .where(pulse_audit.c.pipeline == pipeline) + .order_by(pulse_audit.c.timestamp.desc()) + .limit(limit) + ) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def pulse_audit_count_since( + pipeline: str, action: str, since_iso: str, db_path: Path = DB_PATH +) -> int: + """Count audit rows for (pipeline, action) at or after `since_iso`.""" + stmt = ( + select(func.count()) + .select_from(pulse_audit) + .where(pulse_audit.c.pipeline == pipeline) + .where(pulse_audit.c.action == action) + .where(pulse_audit.c.timestamp >= since_iso) + ) + with engine(db_path).connect() as conn: + return int(conn.execute(stmt).scalar_one()) + + # ---------- federation: peers + signal buffer ---------------------------- def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None: @@ -388,3 +442,18 @@ def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]: stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit) with engine(db_path).connect() as conn: return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def signals_for_case(case_id: str, db_path: Path = DB_PATH) -> List[dict]: + """All federation signals attached to this case_id (signal_type='case'). + + Empty list means no peer has ever sent us this case → we generated it + ourselves and it counts as locally-sourced for auto-fire purposes. + """ + stmt = ( + select(federation_signals) + .where(federation_signals.c.signal_type == "case") + .where(federation_signals.c.signal_id == case_id) + ) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]