stage-vouch-a federation: vouches table + DB helpers

Add `vouches` (voucher_fp, target_fp, issued_at, expires_at, signature)
with unique (voucher, target) and target index, plus `translog` for the
append-only signed merkle chain (id, prev_hash, entry_type, entry_data,
timestamp, entry_hash). Also surface `setting_get` / `setting_set`
helpers on pulse_settings so the quorum config has a place to live.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
m17hr1l
2026-06-06 21:09:25 +02:00
parent 1675a2326e
commit 4a9f6ceb7f

View File

@@ -161,6 +161,35 @@ Index("federation_signals_hash_idx", federation_signals.c.signal_hash)
Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint)
Index("federation_signals_received_idx", federation_signals.c.received_at.desc())
# Web-of-trust vouches — voucher signs an attestation that target is OK to listen to.
# Quorum is reached when enough distinct trusted vouchers sign for the same target.
vouches = Table(
"vouches", _metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("voucher_fingerprint", String, nullable=False),
Column("target_fingerprint", String, nullable=False),
Column("issued_at", String, nullable=False),
Column("expires_at", String, nullable=True),
Column("signature", Text, nullable=False), # base64 ed25519 sig
)
Index("vouches_unique_idx", vouches.c.voucher_fingerprint, vouches.c.target_fingerprint, unique=True)
Index("vouches_target_idx", vouches.c.target_fingerprint)
# Transparency log — append-only signed hash chain over every signal we receive.
# Each entry references the previous entry's hash; tampering with any row breaks
# verify_chain on every subsequent row.
translog = Table(
"translog", _metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("prev_hash", String, nullable=False),
Column("entry_type", String, nullable=False), # signal | vouch | config
Column("entry_data", Text, nullable=False), # canonical JSON of payload
Column("timestamp", String, nullable=False),
Column("entry_hash", String, nullable=False),
)
Index("translog_hash_idx", translog.c.entry_hash)
Index("translog_time_idx", translog.c.timestamp.desc())
_log = log.get(__name__)
_engine: Optional[Engine] = None
@@ -369,3 +398,113 @@ def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
# ---------- federation: pulse_settings get/set (shared scratch kv) -------
def setting_get(key: str, db_path: Path = DB_PATH) -> Optional[str]:
"""Read one pulse_settings value by key. Returns None if absent."""
stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == key)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return None if row is None else str(row.value)
def setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None:
"""Upsert one pulse_settings entry."""
stmt = sqlite_insert(pulse_settings).values(key=key, value=value)
stmt = stmt.on_conflict_do_update(
index_elements=[pulse_settings.c.key],
set_=dict(value=stmt.excluded.value),
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
# ---------- federation: vouches ------------------------------------------
def upsert_vouch(row: dict, db_path: Path = DB_PATH) -> None:
"""Insert-or-update one vouch. Unique on (voucher_fp, target_fp)."""
stmt = sqlite_insert(vouches).values(**row)
update_cols = {k: stmt.excluded[k] for k in row if k not in ("voucher_fingerprint", "target_fingerprint")}
stmt = stmt.on_conflict_do_update(
index_elements=[vouches.c.voucher_fingerprint, vouches.c.target_fingerprint],
set_=update_cols,
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
def list_vouches(db_path: Path = DB_PATH) -> List[dict]:
stmt = select(vouches).order_by(vouches.c.issued_at.desc())
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def vouches_by_target(target_fingerprint: str, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(vouches).where(vouches.c.target_fingerprint == target_fingerprint)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def vouches_by_voucher(voucher_fingerprint: str, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(vouches).where(vouches.c.voucher_fingerprint == voucher_fingerprint)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def delete_vouch(voucher_fingerprint: str, target_fingerprint: str, db_path: Path = DB_PATH) -> None:
stmt = vouches.delete().where(
(vouches.c.voucher_fingerprint == voucher_fingerprint)
& (vouches.c.target_fingerprint == target_fingerprint)
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
# ---------- transparency log ---------------------------------------------
def translog_append(row: dict, db_path: Path = DB_PATH) -> int:
"""Append one transparency-log entry. Returns inserted id."""
stmt = insert(translog).values(**row)
with engine(db_path).begin() as conn:
res = conn.execute(stmt)
return int(res.inserted_primary_key[0])
def translog_head(db_path: Path = DB_PATH) -> Optional[dict]:
"""Highest-id (latest) entry, or None if chain empty."""
stmt = select(translog).order_by(translog.c.id.desc()).limit(1)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return dict(row._mapping) if row else None
def translog_get(entry_id: int, db_path: Path = DB_PATH) -> Optional[dict]:
stmt = select(translog).where(translog.c.id == entry_id)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return dict(row._mapping) if row else None
def translog_after(entry_id: int, db_path: Path = DB_PATH) -> List[dict]:
"""All entries with id > entry_id, oldest first — for sync."""
stmt = select(translog).where(translog.c.id > entry_id).order_by(translog.c.id.asc())
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def translog_recent(limit: int = 100, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(translog).order_by(translog.c.id.desc()).limit(limit)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def translog_range(start: int = 0, end: Optional[int] = None, db_path: Path = DB_PATH) -> List[dict]:
"""All entries with start <= id (and id <= end if given), oldest first."""
cond = translog.c.id >= start
if end is not None:
cond = cond & (translog.c.id <= end)
stmt = select(translog).where(cond).order_by(translog.c.id.asc())
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]