diff --git a/src/psyc/lines/translog.py b/src/psyc/lines/translog.py new file mode 100644 index 0000000..b4a1859 --- /dev/null +++ b/src/psyc/lines/translog.py @@ -0,0 +1,161 @@ +"""Transparency log — append-only signed merkle chain over federation signals. + +Every signal we receive from a peer (case, IOC, or accepted vouch) is appended +as one `LogEntry`. Each entry's `entry_hash = sha256(canonical(prev_hash + +entry_type + entry_data + timestamp))` references the previous head, so any +tampering with a historical row invalidates every subsequent hash. The chain +is public — auditors can re-fetch it and re-run `verify_chain` to detect a +node that quietly mutated history (e.g. to hide a bad signal it accepted). + +Hash format: lowercase hex SHA-256 of the canonical JSON of +``{"prev_hash": "...", "entry_type": "...", "entry_data": {...}, "timestamp": "..."}``. +Genesis entries use ``prev_hash = "0" * 64``. +""" + +from __future__ import annotations + +import hashlib +import json +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + +from psyc import db, log +from psyc.result import Err, Ok, Result + + +_log = log.get(__name__) + +GENESIS_PREV_HASH = "0" * 64 + + +class LogEntry(BaseModel): + id: int + prev_hash: str + entry_type: str + entry_data: Dict[str, Any] = Field(default_factory=dict) + timestamp: str + entry_hash: str + + +def _canonical_json(obj: Dict[str, Any]) -> bytes: + return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + + +def compute_entry_hash(prev_hash: str, entry_type: str, entry_data: Dict[str, Any], timestamp: str) -> str: + """Hex SHA-256 of canonical(prev_hash + entry_type + entry_data + timestamp).""" + payload: Dict[str, Any] = { + "prev_hash": prev_hash, + "entry_type": entry_type, + "entry_data": entry_data, + "timestamp": timestamp, + } + return hashlib.sha256(_canonical_json(payload)).hexdigest() + + +def _row_to_entry(row: Dict[str, Any]) -> LogEntry: + raw = row.get("entry_data") or "{}" + try: + data = json.loads(raw) + except Exception: + data = {} + return LogEntry( + id=int(row["id"]), + prev_hash=str(row["prev_hash"]), + entry_type=str(row["entry_type"]), + entry_data=data if isinstance(data, dict) else {}, + timestamp=str(row["timestamp"]), + entry_hash=str(row["entry_hash"]), + ) + + +def head(db_path: Path = db.DB_PATH) -> Optional[LogEntry]: + """Latest log entry, or None if the chain is empty.""" + row = db.translog_head(db_path=db_path) + return _row_to_entry(row) if row else None + + +def append(entry_type: str, entry_data: Dict[str, Any], db_path: Path = db.DB_PATH) -> LogEntry: + """Atomically append one entry to the chain. Returns the persisted entry.""" + prev = db.translog_head(db_path=db_path) + prev_hash = str(prev["entry_hash"]) if prev else GENESIS_PREV_HASH + timestamp = datetime.now(timezone.utc).isoformat() + entry_hash = compute_entry_hash(prev_hash, entry_type, entry_data, timestamp) + new_id = db.translog_append( + dict( + prev_hash=prev_hash, + entry_type=entry_type, + entry_data=json.dumps(entry_data, sort_keys=True), + timestamp=timestamp, + entry_hash=entry_hash, + ), + db_path=db_path, + ) + _log.info("translog.append", id=new_id, entry_type=entry_type, hash=entry_hash[:12]) + return LogEntry( + id=new_id, + prev_hash=prev_hash, + entry_type=entry_type, + entry_data=entry_data, + timestamp=timestamp, + entry_hash=entry_hash, + ) + + +def verify_chain(start: int = 0, end: Optional[int] = None, db_path: Path = db.DB_PATH) -> Result[int, str]: + """Walk entries [start, end] in id order, recompute each hash, compare. + + Returns Ok(n_verified) when every entry's recomputed hash equals the + stored one and each prev_hash matches the previous entry's stored hash. + Returns Err with the offending id + expected/got hashes otherwise. + """ + rows = db.translog_range(start=start, end=end, db_path=db_path) + if not rows: + return Ok(0) + # Establish the prior hash anchor — either genesis (if walking from id=1) + # or the entry just before `start`. + first_id = int(rows[0]["id"]) + if first_id <= 1: + prior_hash = GENESIS_PREV_HASH + else: + anchor = db.translog_get(first_id - 1, db_path=db_path) + if anchor is None: + return Err(f"missing anchor entry id={first_id - 1}") + prior_hash = str(anchor["entry_hash"]) + + verified = 0 + for row in rows: + stored_prev = str(row["prev_hash"]) + if stored_prev != prior_hash: + return Err( + f"broken at id={row['id']} expected_prev={prior_hash} got_prev={stored_prev}" + ) + try: + data = json.loads(row.get("entry_data") or "{}") + except Exception: + return Err(f"broken at id={row['id']} entry_data not JSON") + if not isinstance(data, dict): + return Err(f"broken at id={row['id']} entry_data not an object") + recomputed = compute_entry_hash( + stored_prev, str(row["entry_type"]), data, str(row["timestamp"]) + ) + stored_hash = str(row["entry_hash"]) + if recomputed != stored_hash: + return Err( + f"broken at id={row['id']} expected={recomputed} got={stored_hash}" + ) + prior_hash = stored_hash + verified += 1 + return Ok(verified) + + +def recent(limit: int = 100, db_path: Path = db.DB_PATH) -> List[LogEntry]: + """The latest `limit` entries, newest first.""" + return [_row_to_entry(r) for r in db.translog_recent(limit=limit, db_path=db_path)] + + +def entries_after(entry_id: int, db_path: Path = db.DB_PATH) -> List[LogEntry]: + """All entries with id > entry_id, oldest first — for peer sync.""" + return [_row_to_entry(r) for r in db.translog_after(entry_id, db_path=db_path)] diff --git a/tests/test_translog.py b/tests/test_translog.py new file mode 100644 index 0000000..da50cb0 --- /dev/null +++ b/tests/test_translog.py @@ -0,0 +1,118 @@ +"""Transparency log — append, verify, tamper detection, sync slices.""" + +from __future__ import annotations + +import json + +import pytest +from sqlalchemy import create_engine, update + +from psyc import db +from psyc.lines import translog +from psyc.lines.translog import GENESIS_PREV_HASH +from psyc.result import Err, Ok + + +@pytest.fixture +def fresh_db(tmp_path, monkeypatch): + test_db = tmp_path / "test.db" + eng = create_engine(f"sqlite:///{test_db}", future=True) + db._metadata.create_all(eng, checkfirst=True) + monkeypatch.setattr(db, "_engine", eng) + monkeypatch.setattr(db, "DB_PATH", test_db) + yield test_db + + +def test_first_append_uses_genesis_prev_hash(fresh_db): + e = translog.append("signal", {"x": 1}) + assert e.prev_hash == GENESIS_PREV_HASH + assert e.id >= 1 + assert e.entry_type == "signal" + assert e.entry_data == {"x": 1} + # head matches + h = translog.head() + assert h is not None + assert h.id == e.id + assert h.entry_hash == e.entry_hash + + +def test_append_chains_prev_hash(fresh_db): + e1 = translog.append("signal", {"a": 1}) + e2 = translog.append("signal", {"b": 2}) + e3 = translog.append("vouch", {"c": 3}) + assert e2.prev_hash == e1.entry_hash + assert e3.prev_hash == e2.entry_hash + head = translog.head() + assert head is not None + assert head.entry_hash == e3.entry_hash + + +def test_verify_chain_ok_round_trip(fresh_db): + translog.append("signal", {"a": 1}) + translog.append("signal", {"b": 2}) + translog.append("vouch", {"c": 3}) + result = translog.verify_chain() + assert isinstance(result, Ok) + assert result.value == 3 + + +def test_verify_chain_empty_returns_zero(fresh_db): + result = translog.verify_chain() + assert isinstance(result, Ok) + assert result.value == 0 + + +def test_verify_chain_detects_tampered_data(fresh_db): + e1 = translog.append("signal", {"a": 1}) + e2 = translog.append("signal", {"b": 2}) + + # Mutate entry_data of the first row directly in the DB; entry_hash stays + # the same but no longer matches the recomputed hash. + with db.engine().begin() as conn: + conn.execute( + update(db.translog) + .where(db.translog.c.id == e1.id) + .values(entry_data=json.dumps({"a": 999}, sort_keys=True)) + ) + + result = translog.verify_chain() + assert isinstance(result, Err) + assert "broken at id=" in result.reason + + +def test_verify_chain_detects_tampered_prev_hash(fresh_db): + translog.append("signal", {"a": 1}) + e2 = translog.append("signal", {"b": 2}) + # Flip e2.prev_hash so it no longer matches e1.entry_hash. + with db.engine().begin() as conn: + conn.execute( + update(db.translog) + .where(db.translog.c.id == e2.id) + .values(prev_hash="f" * 64) + ) + result = translog.verify_chain() + assert isinstance(result, Err) + assert "broken at id=" in result.reason + + +def test_entries_after_returns_correct_slice(fresh_db): + e1 = translog.append("signal", {"a": 1}) + e2 = translog.append("signal", {"b": 2}) + e3 = translog.append("signal", {"c": 3}) + + after_zero = translog.entries_after(0) + assert [e.id for e in after_zero] == [e1.id, e2.id, e3.id] + + after_e1 = translog.entries_after(e1.id) + assert [e.id for e in after_e1] == [e2.id, e3.id] + + after_e3 = translog.entries_after(e3.id) + assert after_e3 == [] + + +def test_recent_newest_first(fresh_db): + e1 = translog.append("signal", {"a": 1}) + e2 = translog.append("signal", {"b": 2}) + e3 = translog.append("signal", {"c": 3}) + recent = translog.recent(limit=10) + assert [e.id for e in recent] == [e3.id, e2.id, e1.id]