stage-disc-c discovery: pulse pipeline wiring + seeds settings
This commit is contained in:
@@ -313,6 +313,25 @@ def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None:
|
|||||||
conn.execute(stmt)
|
conn.execute(stmt)
|
||||||
|
|
||||||
|
|
||||||
|
def pulse_setting_get(key: str, db_path: Path = DB_PATH) -> Optional[str]:
|
||||||
|
"""Fetch one row from pulse_settings by key; returns None if absent."""
|
||||||
|
stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == key)
|
||||||
|
with engine(db_path).connect() as conn:
|
||||||
|
row = conn.execute(stmt).fetchone()
|
||||||
|
return str(row.value) if row else None
|
||||||
|
|
||||||
|
|
||||||
|
def pulse_setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None:
|
||||||
|
"""Upsert one (key, value) into pulse_settings."""
|
||||||
|
stmt = sqlite_insert(pulse_settings).values(key=key, value=value)
|
||||||
|
stmt = stmt.on_conflict_do_update(
|
||||||
|
index_elements=[pulse_settings.c.key],
|
||||||
|
set_=dict(value=stmt.excluded.value),
|
||||||
|
)
|
||||||
|
with engine(db_path).begin() as conn:
|
||||||
|
conn.execute(stmt)
|
||||||
|
|
||||||
|
|
||||||
# ---------- federation: peers + signal buffer ----------------------------
|
# ---------- federation: peers + signal buffer ----------------------------
|
||||||
|
|
||||||
def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None:
|
def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None:
|
||||||
|
|||||||
@@ -134,8 +134,48 @@ def _run_respond_propose() -> str:
|
|||||||
return f"proposed {proposed} action(s) for {touched} case(s)"
|
return f"proposed {proposed} action(s) for {touched} case(s)"
|
||||||
|
|
||||||
|
|
||||||
|
_DISCOVERY_SEEDS_KEY = "discovery_seeds"
|
||||||
|
|
||||||
|
|
||||||
|
def get_discovery_seeds() -> List[str]:
|
||||||
|
"""Operator-curated seed list for the discovery walker. Newline-separated in DB."""
|
||||||
|
raw = db.pulse_setting_get(_DISCOVERY_SEEDS_KEY)
|
||||||
|
if not raw:
|
||||||
|
return []
|
||||||
|
return [line.strip() for line in raw.splitlines() if line.strip()]
|
||||||
|
|
||||||
|
|
||||||
|
def set_discovery_seeds(seeds: List[str]) -> None:
|
||||||
|
"""Replace the seed list. Strips blanks + dedupes preserving order."""
|
||||||
|
seen: set = set()
|
||||||
|
cleaned: List[str] = []
|
||||||
|
for s in seeds:
|
||||||
|
v = (s or "").strip()
|
||||||
|
if not v or v in seen:
|
||||||
|
continue
|
||||||
|
seen.add(v)
|
||||||
|
cleaned.append(v)
|
||||||
|
db.pulse_setting_set(_DISCOVERY_SEEDS_KEY, "\n".join(cleaned))
|
||||||
|
|
||||||
|
|
||||||
def _run_peer_pull() -> str:
|
def _run_peer_pull() -> str:
|
||||||
return "federation not yet active"
|
"""Walk DNS-SD + recurse over peer-public lists from the operator's seeds.
|
||||||
|
|
||||||
|
Records every fresh candidate into the `peers` table with status=unknown.
|
||||||
|
Vouching (sibling stage) is what eventually promotes them.
|
||||||
|
"""
|
||||||
|
from psyc.lines import discovery
|
||||||
|
|
||||||
|
seeds = get_discovery_seeds()
|
||||||
|
if not seeds:
|
||||||
|
return "no seeds configured"
|
||||||
|
candidates = discovery.walk(seeds)
|
||||||
|
for c in candidates:
|
||||||
|
try:
|
||||||
|
discovery.record_candidate(c)
|
||||||
|
except Exception as exc: # noqa: BLE001 — one bad write must not abort the batch
|
||||||
|
_log.warning("pulse.peer_pull.record.error", domain=c.domain, error=str(exc))
|
||||||
|
return f"discovered {len(candidates)} candidate(s) from {len(seeds)} seed(s)"
|
||||||
|
|
||||||
|
|
||||||
def _run_vouch_refresh() -> str:
|
def _run_vouch_refresh() -> str:
|
||||||
|
|||||||
Reference in New Issue
Block a user