stage-fed-c federation: db tables for peers + signal buffer
This commit is contained in:
@@ -114,6 +114,33 @@ Index("iocs_value_idx", iocs.c.value)
|
||||
Index("iocs_type_idx", iocs.c.ioc_type)
|
||||
Index("iocs_case_idx", iocs.c.case_id)
|
||||
|
||||
peers = Table(
|
||||
"peers", _metadata,
|
||||
Column("domain", String, primary_key=True),
|
||||
Column("fingerprint", String, nullable=False),
|
||||
Column("pubkey_pem", Text, nullable=False),
|
||||
Column("status", String, nullable=False), # unknown | trusted | blocked
|
||||
Column("discovered_at", String, nullable=False),
|
||||
Column("last_seen", String, nullable=True),
|
||||
Column("notes", Text, nullable=True),
|
||||
)
|
||||
Index("peers_fp_idx", peers.c.fingerprint)
|
||||
Index("peers_status_idx", peers.c.status)
|
||||
|
||||
federation_signals = Table(
|
||||
"federation_signals", _metadata,
|
||||
Column("id", Integer, primary_key=True, autoincrement=True),
|
||||
Column("peer_fingerprint", String, nullable=False),
|
||||
Column("signal_type", String, nullable=False), # case | ioc
|
||||
Column("signal_id", String, nullable=False), # case_id or ioc value
|
||||
Column("signal_hash", String, nullable=False), # sha256 of canonical record
|
||||
Column("received_at", String, nullable=False),
|
||||
Column("raw_json", Text, nullable=False),
|
||||
)
|
||||
Index("federation_signals_hash_idx", federation_signals.c.signal_hash)
|
||||
Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint)
|
||||
Index("federation_signals_received_idx", federation_signals.c.received_at.desc())
|
||||
|
||||
|
||||
_log = log.get(__name__)
|
||||
_engine: Optional[Engine] = None
|
||||
@@ -214,3 +241,61 @@ def ioc_count(db_path: Path = DB_PATH) -> int:
|
||||
stmt = select(func.count()).select_from(iocs)
|
||||
with engine(db_path).connect() as conn:
|
||||
return conn.execute(stmt).scalar_one()
|
||||
|
||||
|
||||
# ---------- federation: peers + signal buffer ----------------------------
|
||||
|
||||
def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None:
|
||||
"""Insert-or-update a peer by domain. `row` must include `domain`."""
|
||||
stmt = sqlite_insert(peers).values(**row)
|
||||
update_cols = {k: stmt.excluded[k] for k in row if k != "domain"}
|
||||
stmt = stmt.on_conflict_do_update(index_elements=[peers.c.domain], set_=update_cols)
|
||||
with engine(db_path).begin() as conn:
|
||||
conn.execute(stmt)
|
||||
|
||||
|
||||
def list_peers(db_path: Path = DB_PATH) -> List[dict]:
|
||||
stmt = select(peers).order_by(peers.c.discovered_at.desc())
|
||||
with engine(db_path).connect() as conn:
|
||||
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
|
||||
|
||||
|
||||
def get_peer(domain: str, db_path: Path = DB_PATH) -> Optional[dict]:
|
||||
stmt = select(peers).where(peers.c.domain == domain)
|
||||
with engine(db_path).connect() as conn:
|
||||
row = conn.execute(stmt).fetchone()
|
||||
return dict(row._mapping) if row else None
|
||||
|
||||
|
||||
def set_peer_status(domain: str, status: str, db_path: Path = DB_PATH) -> None:
|
||||
from sqlalchemy import update as sa_update
|
||||
stmt = sa_update(peers).where(peers.c.domain == domain).values(status=status)
|
||||
with engine(db_path).begin() as conn:
|
||||
conn.execute(stmt)
|
||||
|
||||
|
||||
def remove_peer(domain: str, db_path: Path = DB_PATH) -> None:
|
||||
stmt = peers.delete().where(peers.c.domain == domain)
|
||||
with engine(db_path).begin() as conn:
|
||||
conn.execute(stmt)
|
||||
|
||||
|
||||
def record_signal(row: dict, db_path: Path = DB_PATH) -> int:
|
||||
"""Append one federation signal. Returns the inserted row id."""
|
||||
stmt = insert(federation_signals).values(**row)
|
||||
with engine(db_path).begin() as conn:
|
||||
res = conn.execute(stmt)
|
||||
return int(res.inserted_primary_key[0])
|
||||
|
||||
|
||||
def signals_for_hash(signal_hash: str, db_path: Path = DB_PATH) -> List[dict]:
|
||||
"""All recorded signals matching `signal_hash` — quorum-lookup primitive."""
|
||||
stmt = select(federation_signals).where(federation_signals.c.signal_hash == signal_hash)
|
||||
with engine(db_path).connect() as conn:
|
||||
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
|
||||
|
||||
|
||||
def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]:
|
||||
stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit)
|
||||
with engine(db_path).connect() as conn:
|
||||
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
|
||||
|
||||
Reference in New Issue
Block a user