diff --git a/src/psyc/db.py b/src/psyc/db.py index 5d19acb..cd0a6e5 100644 --- a/src/psyc/db.py +++ b/src/psyc/db.py @@ -313,6 +313,25 @@ def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None: conn.execute(stmt) +def pulse_setting_get(key: str, db_path: Path = DB_PATH) -> Optional[str]: + """Fetch one row from pulse_settings by key; returns None if absent.""" + stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == key) + with engine(db_path).connect() as conn: + row = conn.execute(stmt).fetchone() + return str(row.value) if row else None + + +def pulse_setting_set(key: str, value: str, db_path: Path = DB_PATH) -> None: + """Upsert one (key, value) into pulse_settings.""" + stmt = sqlite_insert(pulse_settings).values(key=key, value=value) + stmt = stmt.on_conflict_do_update( + index_elements=[pulse_settings.c.key], + set_=dict(value=stmt.excluded.value), + ) + with engine(db_path).begin() as conn: + conn.execute(stmt) + + # ---------- federation: peers + signal buffer ---------------------------- def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None: diff --git a/src/psyc/lines/pulse.py b/src/psyc/lines/pulse.py index 10473ba..20944a8 100644 --- a/src/psyc/lines/pulse.py +++ b/src/psyc/lines/pulse.py @@ -134,8 +134,48 @@ def _run_respond_propose() -> str: return f"proposed {proposed} action(s) for {touched} case(s)" +_DISCOVERY_SEEDS_KEY = "discovery_seeds" + + +def get_discovery_seeds() -> List[str]: + """Operator-curated seed list for the discovery walker. Newline-separated in DB.""" + raw = db.pulse_setting_get(_DISCOVERY_SEEDS_KEY) + if not raw: + return [] + return [line.strip() for line in raw.splitlines() if line.strip()] + + +def set_discovery_seeds(seeds: List[str]) -> None: + """Replace the seed list. Strips blanks + dedupes preserving order.""" + seen: set = set() + cleaned: List[str] = [] + for s in seeds: + v = (s or "").strip() + if not v or v in seen: + continue + seen.add(v) + cleaned.append(v) + db.pulse_setting_set(_DISCOVERY_SEEDS_KEY, "\n".join(cleaned)) + + def _run_peer_pull() -> str: - return "federation not yet active" + """Walk DNS-SD + recurse over peer-public lists from the operator's seeds. + + Records every fresh candidate into the `peers` table with status=unknown. + Vouching (sibling stage) is what eventually promotes them. + """ + from psyc.lines import discovery + + seeds = get_discovery_seeds() + if not seeds: + return "no seeds configured" + candidates = discovery.walk(seeds) + for c in candidates: + try: + discovery.record_candidate(c) + except Exception as exc: # noqa: BLE001 — one bad write must not abort the batch + _log.warning("pulse.peer_pull.record.error", domain=c.domain, error=str(exc)) + return f"discovered {len(candidates)} candidate(s) from {len(seeds)} seed(s)" def _run_vouch_refresh() -> str: