diff --git a/src/psyc/db.py b/src/psyc/db.py index 1b12436..0f7506d 100644 --- a/src/psyc/db.py +++ b/src/psyc/db.py @@ -114,6 +114,33 @@ Index("iocs_value_idx", iocs.c.value) Index("iocs_type_idx", iocs.c.ioc_type) Index("iocs_case_idx", iocs.c.case_id) +peers = Table( + "peers", _metadata, + Column("domain", String, primary_key=True), + Column("fingerprint", String, nullable=False), + Column("pubkey_pem", Text, nullable=False), + Column("status", String, nullable=False), # unknown | trusted | blocked + Column("discovered_at", String, nullable=False), + Column("last_seen", String, nullable=True), + Column("notes", Text, nullable=True), +) +Index("peers_fp_idx", peers.c.fingerprint) +Index("peers_status_idx", peers.c.status) + +federation_signals = Table( + "federation_signals", _metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("peer_fingerprint", String, nullable=False), + Column("signal_type", String, nullable=False), # case | ioc + Column("signal_id", String, nullable=False), # case_id or ioc value + Column("signal_hash", String, nullable=False), # sha256 of canonical record + Column("received_at", String, nullable=False), + Column("raw_json", Text, nullable=False), +) +Index("federation_signals_hash_idx", federation_signals.c.signal_hash) +Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint) +Index("federation_signals_received_idx", federation_signals.c.received_at.desc()) + _log = log.get(__name__) _engine: Optional[Engine] = None @@ -214,3 +241,61 @@ def ioc_count(db_path: Path = DB_PATH) -> int: stmt = select(func.count()).select_from(iocs) with engine(db_path).connect() as conn: return conn.execute(stmt).scalar_one() + + +# ---------- federation: peers + signal buffer ---------------------------- + +def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None: + """Insert-or-update a peer by domain. `row` must include `domain`.""" + stmt = sqlite_insert(peers).values(**row) + update_cols = {k: stmt.excluded[k] for k in row if k != "domain"} + stmt = stmt.on_conflict_do_update(index_elements=[peers.c.domain], set_=update_cols) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +def list_peers(db_path: Path = DB_PATH) -> List[dict]: + stmt = select(peers).order_by(peers.c.discovered_at.desc()) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def get_peer(domain: str, db_path: Path = DB_PATH) -> Optional[dict]: + stmt = select(peers).where(peers.c.domain == domain) + with engine(db_path).connect() as conn: + row = conn.execute(stmt).fetchone() + return dict(row._mapping) if row else None + + +def set_peer_status(domain: str, status: str, db_path: Path = DB_PATH) -> None: + from sqlalchemy import update as sa_update + stmt = sa_update(peers).where(peers.c.domain == domain).values(status=status) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +def remove_peer(domain: str, db_path: Path = DB_PATH) -> None: + stmt = peers.delete().where(peers.c.domain == domain) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + +def record_signal(row: dict, db_path: Path = DB_PATH) -> int: + """Append one federation signal. Returns the inserted row id.""" + stmt = insert(federation_signals).values(**row) + with engine(db_path).begin() as conn: + res = conn.execute(stmt) + return int(res.inserted_primary_key[0]) + + +def signals_for_hash(signal_hash: str, db_path: Path = DB_PATH) -> List[dict]: + """All recorded signals matching `signal_hash` — quorum-lookup primitive.""" + stmt = select(federation_signals).where(federation_signals.c.signal_hash == signal_hash) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()] + + +def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]: + stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit) + with engine(db_path).connect() as conn: + return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]