stage-auto-b pulse: pulse_audit table + history

Add a per-decision audit log so the cockpit + CLI can show what the
auto-response runner did each tick:
- pulse_audit table: id, pipeline, action ('auto-fire'|'skipped'|'error'),
  action_id, case_id, detail, timestamp
- helpers: pulse_audit_record, pulse_audit_recent, pulse_audit_count_since
- indexes on (pipeline, timestamp desc) and on action_id

Also add db.signals_for_case(case_id) — checks the federation_signals
buffer to tell whether a case was peer-sourced. Used by the runner to
decide if a quorum check is required.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
m17hr1l
2026-06-06 21:10:38 +02:00
parent 0dbeb056c5
commit 31ec1557ec

View File

@@ -134,6 +134,19 @@ pulse_settings = Table(
Column("value", String, nullable=False),
)
pulse_audit = Table(
"pulse_audit", _metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("pipeline", String, nullable=False), # 'respond' | 'fetch' | ...
Column("action", String, nullable=False), # 'auto-fire' | 'skipped' | 'error'
Column("action_id", Integer, nullable=True), # response_actions.id when relevant
Column("case_id", String, nullable=True),
Column("detail", Text, nullable=True),
Column("timestamp", String, nullable=False), # ISO
)
Index("pulse_audit_pipeline_idx", pulse_audit.c.pipeline, pulse_audit.c.timestamp.desc())
Index("pulse_audit_action_id_idx", pulse_audit.c.action_id)
peers = Table(
"peers", _metadata,
Column("domain", String, primary_key=True),
@@ -332,6 +345,47 @@ def pulse_setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None:
conn.execute(stmt)
# ---------- pulse audit trail --------------------------------------------
def pulse_audit_record(row: dict, db_path: Path = DB_PATH) -> int:
"""Append one pulse_audit row. Returns its id.
`row` must include 'pipeline', 'action', 'timestamp'. action_id, case_id,
detail are optional. Caller controls timestamp so tests can pin it.
"""
stmt = insert(pulse_audit).values(**row)
with engine(db_path).begin() as conn:
res = conn.execute(stmt)
return int(res.inserted_primary_key[0])
def pulse_audit_recent(pipeline: str, limit: int = 25, db_path: Path = DB_PATH) -> List[dict]:
"""Most-recent audit rows for one pipeline (newest first)."""
stmt = (
select(pulse_audit)
.where(pulse_audit.c.pipeline == pipeline)
.order_by(pulse_audit.c.timestamp.desc())
.limit(limit)
)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def pulse_audit_count_since(
pipeline: str, action: str, since_iso: str, db_path: Path = DB_PATH
) -> int:
"""Count audit rows for (pipeline, action) at or after `since_iso`."""
stmt = (
select(func.count())
.select_from(pulse_audit)
.where(pulse_audit.c.pipeline == pipeline)
.where(pulse_audit.c.action == action)
.where(pulse_audit.c.timestamp >= since_iso)
)
with engine(db_path).connect() as conn:
return int(conn.execute(stmt).scalar_one())
# ---------- federation: peers + signal buffer ----------------------------
def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None:
@@ -388,3 +442,18 @@ def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def signals_for_case(case_id: str, db_path: Path = DB_PATH) -> List[dict]:
"""All federation signals attached to this case_id (signal_type='case').
Empty list means no peer has ever sent us this case → we generated it
ourselves and it counts as locally-sourced for auto-fire purposes.
"""
stmt = (
select(federation_signals)
.where(federation_signals.c.signal_type == "case")
.where(federation_signals.c.signal_id == case_id)
)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]