From 4a9f6ceb7f10343a8cc9fe9602833d78018ec85a Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sat, 6 Jun 2026 21:09:25 +0200 Subject: [PATCH] stage-vouch-a federation: vouches table + DB helpers Add `vouches` (voucher_fp, target_fp, issued_at, expires_at, signature) with unique (voucher, target) and target index, plus `translog` for the append-only signed merkle chain (id, prev_hash, entry_type, entry_data, timestamp, entry_hash). Also surface `setting_get` / `setting_set` helpers on pulse_settings so the quorum config has a place to live. Co-Authored-By: Claude Opus 4.7 --- src/psyc/db.py | 139 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) diff --git a/src/psyc/db.py b/src/psyc/db.py index 5d19acb..72f1c16 100644 --- a/src/psyc/db.py +++ b/src/psyc/db.py @@ -161,6 +161,35 @@ Index("federation_signals_hash_idx", federation_signals.c.signal_hash) Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint) Index("federation_signals_received_idx", federation_signals.c.received_at.desc()) +# Web-of-trust vouches — voucher signs an attestation that target is OK to listen to. +# Quorum is reached when enough distinct trusted vouchers sign for the same target. +vouches = Table( + "vouches", _metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("voucher_fingerprint", String, nullable=False), + Column("target_fingerprint", String, nullable=False), + Column("issued_at", String, nullable=False), + Column("expires_at", String, nullable=True), + Column("signature", Text, nullable=False), # base64 ed25519 sig +) +Index("vouches_unique_idx", vouches.c.voucher_fingerprint, vouches.c.target_fingerprint, unique=True) +Index("vouches_target_idx", vouches.c.target_fingerprint) + +# Transparency log — append-only signed hash chain over every signal we receive. +# Each entry references the previous entry's hash; tampering with any row breaks +# verify_chain on every subsequent row. +translog = Table( + "translog", _metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("prev_hash", String, nullable=False), + Column("entry_type", String, nullable=False), # signal | vouch | config + Column("entry_data", Text, nullable=False), # canonical JSON of payload + Column("timestamp", String, nullable=False), + Column("entry_hash", String, nullable=False), +) +Index("translog_hash_idx", translog.c.entry_hash) +Index("translog_time_idx", translog.c.timestamp.desc()) + _log = log.get(__name__) _engine: Optional[Engine] = None @@ -369,3 +398,113 @@ def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]: stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit) with engine(db_path).connect() as conn: return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +# ---------- federation: pulse_settings get/set (shared scratch kv) ------- + +def setting_get(key: str, db_path: Path = DB_PATH) -> Optional[str]: + """Read one pulse_settings value by key. Returns None if absent.""" + stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == key) + with engine(db_path).connect() as conn: + row = conn.execute(stmt).fetchone() + return None if row is None else str(row.value) + + +def setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None: + """Upsert one pulse_settings entry.""" + stmt = sqlite_insert(pulse_settings).values(key=key, value=value) + stmt = stmt.on_conflict_do_update( + index_elements=[pulse_settings.c.key], + set_=dict(value=stmt.excluded.value), + ) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +# ---------- federation: vouches ------------------------------------------ + +def upsert_vouch(row: dict, db_path: Path = DB_PATH) -> None: + """Insert-or-update one vouch. Unique on (voucher_fp, target_fp).""" + stmt = sqlite_insert(vouches).values(**row) + update_cols = {k: stmt.excluded[k] for k in row if k not in ("voucher_fingerprint", "target_fingerprint")} + stmt = stmt.on_conflict_do_update( + index_elements=[vouches.c.voucher_fingerprint, vouches.c.target_fingerprint], + set_=update_cols, + ) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +def list_vouches(db_path: Path = DB_PATH) -> List[dict]: + stmt = select(vouches).order_by(vouches.c.issued_at.desc()) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def vouches_by_target(target_fingerprint: str, db_path: Path = DB_PATH) -> List[dict]: + stmt = select(vouches).where(vouches.c.target_fingerprint == target_fingerprint) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def vouches_by_voucher(voucher_fingerprint: str, db_path: Path = DB_PATH) -> List[dict]: + stmt = select(vouches).where(vouches.c.voucher_fingerprint == voucher_fingerprint) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def delete_vouch(voucher_fingerprint: str, target_fingerprint: str, db_path: Path = DB_PATH) -> None: + stmt = vouches.delete().where( + (vouches.c.voucher_fingerprint == voucher_fingerprint) + & (vouches.c.target_fingerprint == target_fingerprint) + ) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +# ---------- transparency log --------------------------------------------- + +def translog_append(row: dict, db_path: Path = DB_PATH) -> int: + """Append one transparency-log entry. Returns inserted id.""" + stmt = insert(translog).values(**row) + with engine(db_path).begin() as conn: + res = conn.execute(stmt) + return int(res.inserted_primary_key[0]) + + +def translog_head(db_path: Path = DB_PATH) -> Optional[dict]: + """Highest-id (latest) entry, or None if chain empty.""" + stmt = select(translog).order_by(translog.c.id.desc()).limit(1) + with engine(db_path).connect() as conn: + row = conn.execute(stmt).fetchone() + return dict(row._mapping) if row else None + + +def translog_get(entry_id: int, db_path: Path = DB_PATH) -> Optional[dict]: + stmt = select(translog).where(translog.c.id == entry_id) + with engine(db_path).connect() as conn: + row = conn.execute(stmt).fetchone() + return dict(row._mapping) if row else None + + +def translog_after(entry_id: int, db_path: Path = DB_PATH) -> List[dict]: + """All entries with id > entry_id, oldest first — for sync.""" + stmt = select(translog).where(translog.c.id > entry_id).order_by(translog.c.id.asc()) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def translog_recent(limit: int = 100, db_path: Path = DB_PATH) -> List[dict]: + stmt = select(translog).order_by(translog.c.id.desc()).limit(limit) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def translog_range(start: int = 0, end: Optional[int] = None, db_path: Path = DB_PATH) -> List[dict]: + """All entries with start <= id (and id <= end if given), oldest first.""" + cond = translog.c.id >= start + if end is not None: + cond = cond & (translog.c.id <= end) + stmt = select(translog).where(cond).order_by(translog.c.id.asc()) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]