translog.append computes
sha256(canonical({prev_hash, entry_type, entry_data, timestamp})) and
writes one row per call; the first entry uses prev_hash = "0"*64.
verify_chain walks rows in id order, re-hashes each, and returns
Err("broken at id=X expected=... got=...") on the first mismatch — so
tampering with either entry_data or prev_hash invalidates every
downstream row. recent / entries_after / head support peer sync and UI.
Tests cover: genesis prev_hash, chained prev_hash, full-chain verify,
tampered-data detection, tampered-prev_hash detection, slicing.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
119 lines
3.6 KiB
Python
119 lines
3.6 KiB
Python
"""Transparency log — append, verify, tamper detection, sync slices."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine, update
|
|
|
|
from psyc import db
|
|
from psyc.lines import translog
|
|
from psyc.lines.translog import GENESIS_PREV_HASH
|
|
from psyc.result import Err, Ok
|
|
|
|
|
|
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
    """Give one test an empty SQLite database wired into ``psyc.db``.

    A database file is created under ``tmp_path``, the schema held in
    ``db._metadata`` is created in it, and ``db._engine`` / ``db.DB_PATH``
    are patched to point at it for the duration of the test.

    Yields:
        The path of the temporary database file.
    """
    test_db = tmp_path / "test.db"
    eng = create_engine(f"sqlite:///{test_db}", future=True)
    try:
        db._metadata.create_all(eng, checkfirst=True)
        monkeypatch.setattr(db, "_engine", eng)
        monkeypatch.setattr(db, "DB_PATH", test_db)
        yield test_db
    finally:
        # Close the engine's pooled connections so the per-test SQLite file
        # is not left open once the test has finished.
        eng.dispose()
|
|
|
|
|
|
def test_first_append_uses_genesis_prev_hash(fresh_db):
    """The first appended entry links to the genesis hash and becomes head."""
    entry = translog.append("signal", {"x": 1})

    assert entry.prev_hash == GENESIS_PREV_HASH
    assert entry.id >= 1
    assert entry.entry_type == "signal"
    assert entry.entry_data == {"x": 1}

    # The log head must be the entry that was just written.
    latest = translog.head()
    assert latest is not None
    assert (latest.id, latest.entry_hash) == (entry.id, entry.entry_hash)
|
|
|
|
|
|
def test_append_chains_prev_hash(fresh_db):
    """Each entry's prev_hash equals the entry_hash of the entry before it."""
    first = translog.append("signal", {"a": 1})
    second = translog.append("signal", {"b": 2})
    third = translog.append("vouch", {"c": 3})

    # Check every consecutive (earlier, later) pair in append order.
    for earlier, later in ((first, second), (second, third)):
        assert later.prev_hash == earlier.entry_hash

    tip = translog.head()
    assert tip is not None
    assert tip.entry_hash == third.entry_hash
|
|
|
|
|
|
def test_verify_chain_ok_round_trip(fresh_db):
    """An untouched three-entry log verifies as Ok carrying the value 3."""
    appended = (
        ("signal", {"a": 1}),
        ("signal", {"b": 2}),
        ("vouch", {"c": 3}),
    )
    for entry_type, payload in appended:
        translog.append(entry_type, payload)

    outcome = translog.verify_chain()
    assert isinstance(outcome, Ok)
    assert outcome.value == 3
|
|
|
|
|
|
def test_verify_chain_empty_returns_zero(fresh_db):
    """Verifying a log with no entries yields Ok carrying the value 0."""
    outcome = translog.verify_chain()

    assert isinstance(outcome, Ok)
    assert outcome.value == 0
|
|
|
|
|
|
def test_verify_chain_detects_tampered_data(fresh_db):
    """Rewriting a row's entry_data directly in the DB breaks verification."""
    tampered = translog.append("signal", {"a": 1})
    # A second entry so the tampered row is not the last one in the chain.
    translog.append("signal", {"b": 2})

    # Mutate entry_data of the first row directly in the DB; entry_hash stays
    # the same but no longer matches the recomputed hash.
    with db.engine().begin() as conn:
        conn.execute(
            update(db.translog)
            .where(db.translog.c.id == tampered.id)
            .values(entry_data=json.dumps({"a": 999}, sort_keys=True))
        )

    result = translog.verify_chain()
    assert isinstance(result, Err)
    # The break must be reported at the tampered row itself, not merely
    # somewhere in the chain.
    assert f"broken at id={tampered.id}" in result.reason
|
|
|
|
|
|
def test_verify_chain_detects_tampered_prev_hash(fresh_db):
    """Rewriting a row's prev_hash directly in the DB breaks verification."""
    translog.append("signal", {"a": 1})
    tampered = translog.append("signal", {"b": 2})

    # Flip the second row's prev_hash so it no longer matches the first
    # row's entry_hash.
    with db.engine().begin() as conn:
        conn.execute(
            update(db.translog)
            .where(db.translog.c.id == tampered.id)
            .values(prev_hash="f" * 64)
        )

    result = translog.verify_chain()
    assert isinstance(result, Err)
    # The break must be reported at the row whose prev_hash was rewritten.
    assert f"broken at id={tampered.id}" in result.reason
|
|
|
|
|
|
def test_entries_after_returns_correct_slice(fresh_db):
    """entries_after returns only entries appended after the given id, in order."""
    payloads = ({"a": 1}, {"b": 2}, {"c": 3})
    ids = [translog.append("signal", payload).id for payload in payloads]

    def ids_after(marker):
        return [entry.id for entry in translog.entries_after(marker)]

    assert ids_after(0) == ids
    assert ids_after(ids[0]) == ids[1:]
    # Asking for entries after the newest id yields an empty list.
    assert translog.entries_after(ids[-1]) == []
|
|
|
|
|
|
def test_recent_newest_first(fresh_db):
    """recent() lists entries in reverse append order (newest first)."""
    payloads = ({"a": 1}, {"b": 2}, {"c": 3})
    appended = [translog.append("signal", payload) for payload in payloads]

    listed = translog.recent(limit=10)

    expected_ids = [entry.id for entry in reversed(appended)]
    assert [entry.id for entry in listed] == expected_ids
|