Compare commits
15 Commits
6356c5535b
...
1675a2326e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1675a2326e | ||
|
|
de5ff09815 | ||
|
|
02ce6d791c | ||
|
|
d4229dd264 | ||
|
|
2ef0448165 | ||
|
|
17b94acf6b | ||
|
|
55ffd9da3d | ||
|
|
63e3ff2777 | ||
|
|
50158f7fa8 | ||
|
|
4c35aad2bb | ||
|
|
a7c59c9faa | ||
|
|
e071f289f2 | ||
|
|
26fbe08b65 | ||
|
|
4d67605371 | ||
|
|
e710be6ebd |
@@ -16,6 +16,7 @@ dependencies = [
|
|||||||
"httpx>=0.27",
|
"httpx>=0.27",
|
||||||
"typer>=0.12",
|
"typer>=0.12",
|
||||||
"pynacl>=1.5",
|
"pynacl>=1.5",
|
||||||
|
"cryptography>=42.0",
|
||||||
"structlog>=24.1",
|
"structlog>=24.1",
|
||||||
"sqlalchemy>=2.0",
|
"sqlalchemy>=2.0",
|
||||||
"python-dotenv>=1.0",
|
"python-dotenv>=1.0",
|
||||||
|
|||||||
135
src/psyc/_federation_cli.py
Normal file
135
src/psyc/_federation_cli.py
Normal file
@@ -0,0 +1,135 @@
|
|||||||
|
"""Federation CLI — keygen, DNS records, feed export, peer registry, verify.
|
||||||
|
|
||||||
|
Registered onto the top-level Typer app from cli.py so the surface stays flat.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import typer
|
||||||
|
|
||||||
|
from psyc import db, log
|
||||||
|
from psyc.lines import federation
|
||||||
|
from psyc.result import Err
|
||||||
|
|
||||||
|
|
||||||
|
_log = log.get(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def register(typer_app: typer.Typer) -> None:
|
||||||
|
"""Mount all `fed-*` commands onto `typer_app`."""
|
||||||
|
|
||||||
|
@typer_app.command("fed-keygen")
|
||||||
|
def fed_keygen() -> None:
|
||||||
|
"""Generate the node's Ed25519 keypair (or load existing). Prints fingerprint."""
|
||||||
|
federation.node_keypair() # creates the files if missing
|
||||||
|
typer.echo(federation.node_fingerprint())
|
||||||
|
|
||||||
|
@typer_app.command("fed-dns")
|
||||||
|
def fed_dns(
|
||||||
|
domain: str = typer.Argument(..., help="public domain to advertise this node on"),
|
||||||
|
port: int = typer.Option(443, "--port", help="port psyc is reachable on"),
|
||||||
|
) -> None:
|
||||||
|
"""Print the DNS SRV + TXT records to publish under `domain`."""
|
||||||
|
rec = federation.dns_record(domain, port=port)
|
||||||
|
typer.echo(rec.human_instructions)
|
||||||
|
|
||||||
|
@typer_app.command("fed-feed")
|
||||||
|
def fed_feed(
|
||||||
|
window_hours: int = typer.Option(24, "--hours", help="lookback window (hours)"),
|
||||||
|
) -> None:
|
||||||
|
"""Build + print the signed feed JSON."""
|
||||||
|
db.init_db()
|
||||||
|
payload = federation.build_signed_feed(window_hours=window_hours)
|
||||||
|
typer.echo(json.dumps(payload, indent=2))
|
||||||
|
|
||||||
|
@typer_app.command("fed-verify")
|
||||||
|
def fed_verify(
|
||||||
|
peer_url: str = typer.Argument(..., help="peer base URL, e.g. https://peer.example"),
|
||||||
|
) -> None:
|
||||||
|
"""Fetch a peer's /federation/{info,key,feed} and verify the signature."""
|
||||||
|
peer_url = peer_url.rstrip("/")
|
||||||
|
try:
|
||||||
|
with httpx.Client(timeout=10.0) as client:
|
||||||
|
info = client.get(f"{peer_url}/federation/info").json()
|
||||||
|
key_text = client.get(f"{peer_url}/federation/key").text
|
||||||
|
feed = client.get(f"{peer_url}/federation/feed").json()
|
||||||
|
except Exception as exc:
|
||||||
|
typer.echo(f"error: fetch failed: {exc}", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
|
# If the peer is already in the registry, prefer the stored pubkey
|
||||||
|
# (TOFU pin); otherwise warn and use the freshly fetched one.
|
||||||
|
declared_fp = info.get("fingerprint", "")
|
||||||
|
pubkey_pem = key_text
|
||||||
|
pinned = None
|
||||||
|
for p in federation.list_peers():
|
||||||
|
if p.fingerprint == declared_fp:
|
||||||
|
pinned = p
|
||||||
|
break
|
||||||
|
if pinned:
|
||||||
|
pubkey_pem = pinned.pubkey_pem
|
||||||
|
typer.echo(f" · using pinned pubkey for {pinned.domain}")
|
||||||
|
else:
|
||||||
|
typer.echo(" · WARNING: no pinned pubkey for this peer — trusting fetched key (TOFU)")
|
||||||
|
|
||||||
|
db.init_db()
|
||||||
|
result = federation.import_signed_feed(feed, pubkey_pem)
|
||||||
|
if isinstance(result, Err):
|
||||||
|
typer.echo(f" ✗ verification failed: {result.reason}", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
s = result.value
|
||||||
|
typer.echo(f" ✓ verified peer {s.peer_fingerprint}")
|
||||||
|
typer.echo(f" cases: {s.cases_seen} iocs: {s.iocs_seen} signals buffered: {len(s.signal_ids)}")
|
||||||
|
|
||||||
|
@typer_app.command("fed-peer-add")
|
||||||
|
def fed_peer_add(
|
||||||
|
domain: str = typer.Argument(..., help="peer's public domain"),
|
||||||
|
fingerprint: str = typer.Argument(..., help="peer's 32-hex fingerprint"),
|
||||||
|
pubkey_file: Path = typer.Option(..., "--pubkey-file", help="path to peer's PEM public key"),
|
||||||
|
status: str = typer.Option("unknown", "--status", help="unknown | trusted | blocked"),
|
||||||
|
) -> None:
|
||||||
|
"""Register a peer's identity in the local registry."""
|
||||||
|
db.init_db()
|
||||||
|
pem = pubkey_file.read_text(encoding="utf-8")
|
||||||
|
federation.register_peer(domain, fingerprint, pem, status=status)
|
||||||
|
typer.echo(f"registered peer {domain} ({fingerprint[:8]}…) status={status}")
|
||||||
|
|
||||||
|
@typer_app.command("fed-peer-list")
|
||||||
|
def fed_peer_list() -> None:
|
||||||
|
"""List all registered peers."""
|
||||||
|
db.init_db()
|
||||||
|
rows = federation.list_peers()
|
||||||
|
if not rows:
|
||||||
|
typer.echo("(no peers registered)")
|
||||||
|
return
|
||||||
|
for p in rows:
|
||||||
|
typer.echo(
|
||||||
|
f" {p.status:8s} {p.domain:30s} {p.fingerprint[:8]}…{p.fingerprint[-8:]}"
|
||||||
|
f" last_seen={(p.last_seen or '—')[:16]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
@typer_app.command("fed-peer-trust")
|
||||||
|
def fed_peer_trust(domain: str = typer.Argument(...)) -> None:
|
||||||
|
"""Mark a peer as trusted — their signals count toward quorum."""
|
||||||
|
db.init_db()
|
||||||
|
federation.set_peer_status(domain, "trusted")
|
||||||
|
typer.echo(f"{domain} → trusted")
|
||||||
|
|
||||||
|
@typer_app.command("fed-peer-block")
|
||||||
|
def fed_peer_block(domain: str = typer.Argument(...)) -> None:
|
||||||
|
"""Block a peer — ignore their feeds."""
|
||||||
|
db.init_db()
|
||||||
|
federation.set_peer_status(domain, "blocked")
|
||||||
|
typer.echo(f"{domain} → blocked")
|
||||||
|
|
||||||
|
@typer_app.command("fed-peer-remove")
|
||||||
|
def fed_peer_remove(domain: str = typer.Argument(...)) -> None:
|
||||||
|
"""Drop a peer from the registry."""
|
||||||
|
db.init_db()
|
||||||
|
federation.remove_peer(domain)
|
||||||
|
typer.echo(f"removed {domain}")
|
||||||
122
src/psyc/_pulse_cli.py
Normal file
122
src/psyc/_pulse_cli.py
Normal file
@@ -0,0 +1,122 @@
|
|||||||
|
"""Typer commands for the Pulse scheduler.
|
||||||
|
|
||||||
|
Imported and wired by `cli.py` via `register(app)`. Kept as a separate module
|
||||||
|
so the main CLI surface stays grep-friendly and the scheduler can grow its own
|
||||||
|
verbs without bloating cli.py.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import typer
|
||||||
|
|
||||||
|
from psyc import db
|
||||||
|
from psyc.lines import pulse
|
||||||
|
|
||||||
|
|
||||||
|
def _relative(dt: Optional[datetime]) -> str:
|
||||||
|
if dt is None:
|
||||||
|
return "—"
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
if dt.tzinfo is None:
|
||||||
|
dt = dt.replace(tzinfo=timezone.utc)
|
||||||
|
delta = int((dt - now).total_seconds())
|
||||||
|
past = delta < 0
|
||||||
|
secs = abs(delta)
|
||||||
|
if secs < 5:
|
||||||
|
return "now"
|
||||||
|
if secs < 60:
|
||||||
|
unit = f"{secs}s"
|
||||||
|
elif secs < 3600:
|
||||||
|
unit = f"{secs // 60}m"
|
||||||
|
elif secs < 86400:
|
||||||
|
unit = f"{secs // 3600}h"
|
||||||
|
else:
|
||||||
|
unit = f"{secs // 86400}d"
|
||||||
|
return f"{unit} ago" if past else f"in {unit}"
|
||||||
|
|
||||||
|
|
||||||
|
def register(typer_app: typer.Typer) -> None:
|
||||||
|
"""Add pulse-* commands to the given Typer app."""
|
||||||
|
|
||||||
|
@typer_app.command("pulse-status")
|
||||||
|
def pulse_status() -> None:
|
||||||
|
"""Print the pipeline table (mode · cadence · last-fired · next-fire · last-result)."""
|
||||||
|
db.init_db()
|
||||||
|
ks = pulse.kill_switch_state()
|
||||||
|
typer.echo(f"kill switch: {'ARMED' if ks else 'OFF'}")
|
||||||
|
rows = pulse.state()
|
||||||
|
if not rows:
|
||||||
|
typer.echo("(no pipelines registered)")
|
||||||
|
return
|
||||||
|
typer.echo(f"{'name':<16} {'mode':<14} {'cadence':>8} {'enabled':<7} {'last':<10} {'next':<10} result")
|
||||||
|
for p in rows:
|
||||||
|
typer.echo(
|
||||||
|
f"{p.name:<16} {p.mode.value:<14} {p.cadence_seconds:>6}s "
|
||||||
|
f"{'yes' if p.enabled else 'no':<7} "
|
||||||
|
f"{_relative(p.last_fired):<10} {_relative(p.next_fire):<10} "
|
||||||
|
f"{(p.last_result or '')[:60]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
@typer_app.command("pulse-tick")
|
||||||
|
def pulse_tick() -> None:
|
||||||
|
"""Run one scheduler heartbeat and print the per-pipeline outcomes."""
|
||||||
|
db.init_db()
|
||||||
|
results = pulse.tick()
|
||||||
|
for name, outcome, result in results:
|
||||||
|
marker = {"ok": "✓", "err": "✗", "skipped": "⊘"}.get(outcome, "·")
|
||||||
|
typer.echo(f" {marker} {name:<16} {outcome:<8} {result[:120]}")
|
||||||
|
|
||||||
|
@typer_app.command("pulse-set-mode")
|
||||||
|
def pulse_set_mode(
|
||||||
|
name: str = typer.Argument(..., help="pipeline name"),
|
||||||
|
mode: str = typer.Argument(..., help="manual | auto-propose | auto-execute"),
|
||||||
|
) -> None:
|
||||||
|
db.init_db()
|
||||||
|
try:
|
||||||
|
pulse.set_mode(name, pulse.PulseMode(mode))
|
||||||
|
except ValueError as exc:
|
||||||
|
typer.echo(f"error: {exc}", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
typer.echo(f"{name} mode → {mode}")
|
||||||
|
|
||||||
|
@typer_app.command("pulse-set-cadence")
|
||||||
|
def pulse_set_cadence(
|
||||||
|
name: str = typer.Argument(..., help="pipeline name"),
|
||||||
|
seconds: int = typer.Argument(..., help="cadence in seconds (>0)"),
|
||||||
|
) -> None:
|
||||||
|
db.init_db()
|
||||||
|
try:
|
||||||
|
pulse.set_cadence(name, seconds)
|
||||||
|
except ValueError as exc:
|
||||||
|
typer.echo(f"error: {exc}", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
typer.echo(f"{name} cadence → {seconds}s")
|
||||||
|
|
||||||
|
@typer_app.command("pulse-run")
|
||||||
|
def pulse_run(name: str = typer.Argument(..., help="pipeline name")) -> None:
|
||||||
|
"""Manually fire one pipeline (bypasses cadence; honors kill switch)."""
|
||||||
|
db.init_db()
|
||||||
|
try:
|
||||||
|
outcome, result = pulse.run_now(name)
|
||||||
|
except ValueError as exc:
|
||||||
|
typer.echo(f"error: {exc}", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
marker = {"ok": "✓", "err": "✗", "skipped": "⊘"}.get(outcome, "·")
|
||||||
|
typer.echo(f" {marker} {name}: {outcome} — {result}")
|
||||||
|
|
||||||
|
@typer_app.command("pulse-kill")
|
||||||
|
def pulse_kill() -> None:
|
||||||
|
"""Arm the kill switch — every pipeline halts on the next tick."""
|
||||||
|
db.init_db()
|
||||||
|
pulse.set_kill_switch(True)
|
||||||
|
typer.echo("kill switch ARMED — all pipelines halted")
|
||||||
|
|
||||||
|
@typer_app.command("pulse-unkill")
|
||||||
|
def pulse_unkill() -> None:
|
||||||
|
"""Disarm the kill switch — pulse resumes on the next tick."""
|
||||||
|
db.init_db()
|
||||||
|
pulse.set_kill_switch(False)
|
||||||
|
typer.echo("kill switch disarmed — pulse resumes")
|
||||||
@@ -17,11 +17,15 @@ from psyc.lines import classify, courier, lookup, proof, respond, route, scout,
|
|||||||
from psyc.lines import map as map_line
|
from psyc.lines import map as map_line
|
||||||
from psyc.models import Outcome
|
from psyc.models import Outcome
|
||||||
from psyc.result import Err, Ok
|
from psyc.result import Err, Ok
|
||||||
|
from psyc._federation_cli import register as _register_federation_cli
|
||||||
|
from psyc._pulse_cli import register as _register_pulse_cli
|
||||||
|
|
||||||
|
|
||||||
app = typer.Typer(add_completion=False, help="psyc — defensive CTI routing & sealing")
|
app = typer.Typer(add_completion=False, help="psyc — defensive CTI routing & sealing")
|
||||||
log.configure()
|
log.configure()
|
||||||
_log = log.get(__name__)
|
_log = log.get(__name__)
|
||||||
|
_register_pulse_cli(app)
|
||||||
|
_register_federation_cli(app)
|
||||||
|
|
||||||
|
|
||||||
@app.command("init")
|
@app.command("init")
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ from fastapi.templating import Jinja2Templates
|
|||||||
from starlette.middleware.sessions import SessionMiddleware
|
from starlette.middleware.sessions import SessionMiddleware
|
||||||
|
|
||||||
from psyc import db, log
|
from psyc import db, log
|
||||||
from psyc.cockpit import adminauth, case_visuals, docker_view, inference, journey as journey_view
|
from psyc.cockpit import adminauth, case_visuals, docker_view, federation_routes, inference, journey as journey_view, pulse_routes
|
||||||
from psyc.lines import courier as courier_line
|
from psyc.lines import courier as courier_line
|
||||||
from psyc.lines import ledger as ledger_line
|
from psyc.lines import ledger as ledger_line
|
||||||
from psyc.lines import lookup as lookup_line
|
from psyc.lines import lookup as lookup_line
|
||||||
@@ -35,6 +35,9 @@ app = FastAPI(title="psyc Operations Cockpit", version="0.1.0")
|
|||||||
app.add_middleware(SessionMiddleware, secret_key=adminauth.session_secret(), max_age=3600)
|
app.add_middleware(SessionMiddleware, secret_key=adminauth.session_secret(), max_age=3600)
|
||||||
app.mount("/static", StaticFiles(directory=str(HERE / "static")), name="static")
|
app.mount("/static", StaticFiles(directory=str(HERE / "static")), name="static")
|
||||||
|
|
||||||
|
pulse_routes.register(app, TEMPLATES)
|
||||||
|
federation_routes.register(app, TEMPLATES)
|
||||||
|
|
||||||
|
|
||||||
def _admin_ok(request: Request) -> bool:
|
def _admin_ok(request: Request) -> bool:
|
||||||
return bool(request.session.get("admin_ok"))
|
return bool(request.session.get("admin_ok"))
|
||||||
|
|||||||
125
src/psyc/cockpit/federation_routes.py
Normal file
125
src/psyc/cockpit/federation_routes.py
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
"""Federation cockpit routes — admin page, public feed/key/info endpoints.
|
||||||
|
|
||||||
|
Wired into the FastAPI app by app.py via a single `register(app, TEMPLATES)`
|
||||||
|
call so the federation surface stays self-contained.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
from typing import Any, Dict, Optional, Tuple
|
||||||
|
|
||||||
|
from fastapi import FastAPI, Form, HTTPException, Request
|
||||||
|
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, RedirectResponse
|
||||||
|
from fastapi.templating import Jinja2Templates
|
||||||
|
|
||||||
|
from psyc import db, log
|
||||||
|
from psyc.lines import federation
|
||||||
|
|
||||||
|
|
||||||
|
_log = log.get(__name__)
|
||||||
|
|
||||||
|
# Tiny in-memory cache for the signed feed — peers may poll, recomputing
|
||||||
|
# canonical JSON + signature on every hit would be wasteful.
|
||||||
|
_FEED_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
|
||||||
|
_FEED_TTL = 60.0
|
||||||
|
|
||||||
|
|
||||||
|
def _admin_ok(request: Request) -> bool:
|
||||||
|
return bool(request.session.get("admin_ok"))
|
||||||
|
|
||||||
|
|
||||||
|
def _cached_feed() -> Dict[str, Any]:
|
||||||
|
now = time.time()
|
||||||
|
if _FEED_CACHE["payload"] is None or (now - _FEED_CACHE["ts"]) > _FEED_TTL:
|
||||||
|
_FEED_CACHE["payload"] = federation.build_signed_feed()
|
||||||
|
_FEED_CACHE["ts"] = now
|
||||||
|
return _FEED_CACHE["payload"]
|
||||||
|
|
||||||
|
|
||||||
|
def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
|
||||||
|
"""Mount all federation routes onto `app`."""
|
||||||
|
|
||||||
|
@app.get("/admin/federation", response_class=HTMLResponse)
|
||||||
|
def admin_federation(request: Request) -> HTMLResponse:
|
||||||
|
if not _admin_ok(request):
|
||||||
|
return RedirectResponse("/admin", status_code=303)
|
||||||
|
host = request.url.hostname or "your-node.example"
|
||||||
|
suggested = request.query_params.get("domain", host)
|
||||||
|
rec = federation.dns_record(suggested)
|
||||||
|
peers = federation.list_peers()
|
||||||
|
signals = db.recent_signals(limit=20)
|
||||||
|
return TEMPLATES.TemplateResponse(
|
||||||
|
request,
|
||||||
|
"admin_federation.html",
|
||||||
|
{
|
||||||
|
"fingerprint": federation.node_fingerprint(),
|
||||||
|
"pubkey_pem": federation.public_key_pem(),
|
||||||
|
"suggested_domain": suggested,
|
||||||
|
"dns": rec,
|
||||||
|
"peers": peers,
|
||||||
|
"signals": signals,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
@app.post("/admin/federation/peers/add")
|
||||||
|
def admin_federation_add_peer(
|
||||||
|
request: Request,
|
||||||
|
domain: str = Form(...),
|
||||||
|
fingerprint: str = Form(...),
|
||||||
|
pubkey_pem: str = Form(...),
|
||||||
|
status: str = Form("unknown"),
|
||||||
|
) -> RedirectResponse:
|
||||||
|
if not _admin_ok(request):
|
||||||
|
raise HTTPException(status_code=403, detail="admin session required")
|
||||||
|
try:
|
||||||
|
federation.register_peer(domain.strip(), fingerprint.strip(), pubkey_pem.strip(), status=status)
|
||||||
|
except Exception as exc:
|
||||||
|
_log.warning("federation.peer.add.error", domain=domain, error=str(exc))
|
||||||
|
return RedirectResponse("/admin/federation", status_code=303)
|
||||||
|
|
||||||
|
@app.post("/admin/federation/peers/{domain}/status")
|
||||||
|
def admin_federation_set_status(
|
||||||
|
request: Request,
|
||||||
|
domain: str,
|
||||||
|
status: str = Form(...),
|
||||||
|
) -> RedirectResponse:
|
||||||
|
if not _admin_ok(request):
|
||||||
|
raise HTTPException(status_code=403, detail="admin session required")
|
||||||
|
try:
|
||||||
|
federation.set_peer_status(domain, status)
|
||||||
|
except ValueError as exc:
|
||||||
|
_log.warning("federation.peer.status.bad", domain=domain, status=status, error=str(exc))
|
||||||
|
return RedirectResponse("/admin/federation", status_code=303)
|
||||||
|
|
||||||
|
@app.post("/admin/federation/peers/{domain}/remove")
|
||||||
|
def admin_federation_remove(
|
||||||
|
request: Request,
|
||||||
|
domain: str,
|
||||||
|
) -> RedirectResponse:
|
||||||
|
if not _admin_ok(request):
|
||||||
|
raise HTTPException(status_code=403, detail="admin session required")
|
||||||
|
federation.remove_peer(domain)
|
||||||
|
return RedirectResponse("/admin/federation", status_code=303)
|
||||||
|
|
||||||
|
# ---------- public endpoints --------------------------------------
|
||||||
|
|
||||||
|
@app.get("/federation/info")
|
||||||
|
def federation_info() -> JSONResponse:
|
||||||
|
return JSONResponse({
|
||||||
|
"fingerprint": federation.node_fingerprint(),
|
||||||
|
"version": federation.FEED_VERSION,
|
||||||
|
"feed": federation.FEED_PATH,
|
||||||
|
"key": "/federation/key",
|
||||||
|
})
|
||||||
|
|
||||||
|
@app.get("/federation/key", response_class=PlainTextResponse)
|
||||||
|
def federation_key() -> PlainTextResponse:
|
||||||
|
return PlainTextResponse(federation.public_key_pem(), media_type="text/plain")
|
||||||
|
|
||||||
|
@app.get("/federation/feed")
|
||||||
|
def federation_feed() -> JSONResponse:
|
||||||
|
return JSONResponse(_cached_feed())
|
||||||
|
|
||||||
|
_log.info("federation.routes.registered")
|
||||||
120
src/psyc/cockpit/pulse_routes.py
Normal file
120
src/psyc/cockpit/pulse_routes.py
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
"""Cockpit routes for the Pulse scheduler — admin-gated.
|
||||||
|
|
||||||
|
The integration is intentionally single-call: `register(app, TEMPLATES)` adds
|
||||||
|
the routes AND wires the FastAPI startup hook that launches the background
|
||||||
|
scheduler loop. Caller in app.py just imports + invokes register().
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from fastapi import FastAPI, Form, Request
|
||||||
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||||
|
from fastapi.templating import Jinja2Templates
|
||||||
|
|
||||||
|
from psyc import log
|
||||||
|
from psyc.lines import pulse
|
||||||
|
|
||||||
|
|
||||||
|
_log = log.get(__name__)
|
||||||
|
|
||||||
|
TICK_INTERVAL_SECONDS = 30
|
||||||
|
|
||||||
|
|
||||||
|
def _admin_ok(request: Request) -> bool:
|
||||||
|
"""Mirror of the local helper in app.py — admin session is just session['admin_ok']."""
|
||||||
|
return bool(request.session.get("admin_ok"))
|
||||||
|
|
||||||
|
|
||||||
|
def _relative(dt: Optional[datetime]) -> str:
|
||||||
|
"""Human-friendly "3m ago" / "in 12m" / "now". None → '—'."""
|
||||||
|
if dt is None:
|
||||||
|
return "—"
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
if dt.tzinfo is None:
|
||||||
|
dt = dt.replace(tzinfo=timezone.utc)
|
||||||
|
delta = (dt - now).total_seconds()
|
||||||
|
past = delta < 0
|
||||||
|
secs = abs(int(delta))
|
||||||
|
if secs < 5:
|
||||||
|
return "now"
|
||||||
|
if secs < 60:
|
||||||
|
unit = f"{secs}s"
|
||||||
|
elif secs < 3600:
|
||||||
|
unit = f"{secs // 60}m"
|
||||||
|
elif secs < 86400:
|
||||||
|
unit = f"{secs // 3600}h"
|
||||||
|
else:
|
||||||
|
unit = f"{secs // 86400}d"
|
||||||
|
return f"{unit} ago" if past else f"in {unit}"
|
||||||
|
|
||||||
|
|
||||||
|
def register(app: FastAPI, templates: Jinja2Templates) -> None:
|
||||||
|
"""Attach the /admin/pulse routes and the background scheduler loop to `app`."""
|
||||||
|
|
||||||
|
@app.get("/admin/pulse", response_class=HTMLResponse)
|
||||||
|
def pulse_view(request: Request) -> HTMLResponse:
|
||||||
|
if not _admin_ok(request):
|
||||||
|
return RedirectResponse("/admin", status_code=303)
|
||||||
|
flash = request.query_params.get("flash", "")
|
||||||
|
pipelines = pulse.state()
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
request,
|
||||||
|
"admin_pulse.html",
|
||||||
|
{
|
||||||
|
"pipelines": pipelines,
|
||||||
|
"kill_switch": pulse.kill_switch_state(),
|
||||||
|
"tick_interval": TICK_INTERVAL_SECONDS,
|
||||||
|
"relative": _relative,
|
||||||
|
"flash": flash,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
@app.post("/admin/pulse/kill")
|
||||||
|
def pulse_toggle_kill(request: Request) -> RedirectResponse:
|
||||||
|
if not _admin_ok(request):
|
||||||
|
return RedirectResponse("/admin", status_code=303)
|
||||||
|
new = not pulse.kill_switch_state()
|
||||||
|
pulse.set_kill_switch(new)
|
||||||
|
flash = "kill switch ARMED — all pipelines halted" if new else "kill switch disarmed — pulse resumes"
|
||||||
|
return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303)
|
||||||
|
|
||||||
|
@app.post("/admin/pulse/{name}/update")
|
||||||
|
def pulse_update(
|
||||||
|
request: Request,
|
||||||
|
name: str,
|
||||||
|
mode: str = Form(...),
|
||||||
|
cadence_seconds: int = Form(...),
|
||||||
|
enabled: Optional[str] = Form(None),
|
||||||
|
) -> RedirectResponse:
|
||||||
|
if not _admin_ok(request):
|
||||||
|
return RedirectResponse("/admin", status_code=303)
|
||||||
|
try:
|
||||||
|
pulse.set_mode(name, pulse.PulseMode(mode))
|
||||||
|
pulse.set_cadence(name, int(cadence_seconds))
|
||||||
|
pulse.set_enabled(name, enabled is not None)
|
||||||
|
flash = f"updated {name}: mode={mode}, cadence={cadence_seconds}s, enabled={enabled is not None}"
|
||||||
|
except (ValueError, KeyError) as exc:
|
||||||
|
_log.warning("pulse.update.error", name=name, error=str(exc))
|
||||||
|
flash = f"update failed: {exc}"
|
||||||
|
return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303)
|
||||||
|
|
||||||
|
@app.post("/admin/pulse/{name}/run")
|
||||||
|
def pulse_run_now(request: Request, name: str) -> RedirectResponse:
|
||||||
|
if not _admin_ok(request):
|
||||||
|
return RedirectResponse("/admin", status_code=303)
|
||||||
|
try:
|
||||||
|
outcome, result = pulse.run_now(name)
|
||||||
|
flash = f"{name} → {outcome}: {result[:120]}"
|
||||||
|
except ValueError as exc:
|
||||||
|
flash = f"run failed: {exc}"
|
||||||
|
return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303)
|
||||||
|
|
||||||
|
@app.on_event("startup")
|
||||||
|
async def _start_pulse_loop() -> None:
|
||||||
|
# Fire-and-forget; the loop catches its own exceptions and self-restarts.
|
||||||
|
asyncio.create_task(pulse.start_background_loop(interval_seconds=TICK_INTERVAL_SECONDS))
|
||||||
|
_log.info("pulse.routes.registered", tick=TICK_INTERVAL_SECONDS)
|
||||||
@@ -5,7 +5,7 @@
|
|||||||
// This makes the cockpit installable as a PWA and survives flaky connections,
|
// This makes the cockpit installable as a PWA and survives flaky connections,
|
||||||
// without serving stale operational data behind the operator's back.
|
// without serving stale operational data behind the operator's back.
|
||||||
|
|
||||||
const CACHE_VERSION = "psyc-v2";
|
const CACHE_VERSION = "psyc-v3";
|
||||||
const STATIC_ASSETS = [
|
const STATIC_ASSETS = [
|
||||||
"/static/cockpit.css",
|
"/static/cockpit.css",
|
||||||
"/static/psyc-tokens.css",
|
"/static/psyc-tokens.css",
|
||||||
|
|||||||
132
src/psyc/cockpit/templates/admin_federation.html
Normal file
132
src/psyc/cockpit/templates/admin_federation.html
Normal file
@@ -0,0 +1,132 @@
|
|||||||
|
{% extends "base.html" %}
|
||||||
|
{% block title %}Federation — psyc admin{% endblock %}
|
||||||
|
{% block content %}
|
||||||
|
|
||||||
|
<section class="panel">
|
||||||
|
<div class="panel-head">
|
||||||
|
<h1>Federation Identity</h1>
|
||||||
|
<span class="count">{{ peers|length }} peer{{ '' if peers|length == 1 else 's' }}</span>
|
||||||
|
</div>
|
||||||
|
<p class="page-intro">This node's Ed25519 identity. The fingerprint goes into a DNS TXT record so other psyc nodes can discover this one. The public key lets them verify any feed we publish — the private key never leaves this box.</p>
|
||||||
|
<p class="back"><a href="/admin">← back to admin</a></p>
|
||||||
|
|
||||||
|
<div class="card" style="margin-bottom:14px;">
|
||||||
|
<div class="lg-sub">node fingerprint</div>
|
||||||
|
<div style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:18px; word-break:break-all; margin:6px 0 12px; color:var(--accent); text-shadow:0 0 12px var(--accent-glow);">{{ fingerprint }}</div>
|
||||||
|
<details>
|
||||||
|
<summary class="lg-sub" style="cursor:pointer;">public key (PEM)</summary>
|
||||||
|
<pre style="background:var(--panel-2); border:1px solid var(--border); border-radius:6px; padding:10px; margin-top:8px; overflow-x:auto; font-size:11.5px;">{{ pubkey_pem }}</pre>
|
||||||
|
</details>
|
||||||
|
</div>
|
||||||
|
</section>
|
||||||
|
|
||||||
|
<section class="panel">
|
||||||
|
<div class="panel-head">
|
||||||
|
<h2>Publish via DNS</h2>
|
||||||
|
<span class="count">SRV + TXT records</span>
|
||||||
|
</div>
|
||||||
|
<p class="page-intro">Paste these into your zone file. Once they're live, any peer that knows your domain can discover the node and pin the right key without out-of-band coordination.</p>
|
||||||
|
|
||||||
|
<form method="get" action="/admin/federation" class="lookup-form" style="margin-bottom:12px;">
|
||||||
|
<input type="text" name="domain" value="{{ suggested_domain }}" class="lookup-input" placeholder="domain to publish on (e.g. psyc.example.com)">
|
||||||
|
<button type="submit" class="btn btn-enforce">regenerate</button>
|
||||||
|
</form>
|
||||||
|
|
||||||
|
<pre style="background:var(--panel-2); border:1px solid var(--border); border-radius:6px; padding:12px; overflow-x:auto; font-size:12px; line-height:1.5;">{{ dns.human_instructions }}</pre>
|
||||||
|
</section>
|
||||||
|
|
||||||
|
<section class="panel">
|
||||||
|
<div class="panel-head">
|
||||||
|
<h2>Known Peers</h2>
|
||||||
|
<span class="count">{{ peers|length }} registered</span>
|
||||||
|
</div>
|
||||||
|
<p class="page-intro">Trusted peers' feeds are signature-verified on every poll. Blocked peers are recorded but ignored. Unknown peers are kept for review — nothing flows from them until you set them trusted.</p>
|
||||||
|
|
||||||
|
{% if peers %}
|
||||||
|
<table class="ledger">
|
||||||
|
<thead><tr><th>Domain</th><th>Fingerprint</th><th>Status</th><th>Discovered</th><th>Last seen</th><th></th></tr></thead>
|
||||||
|
<tbody>
|
||||||
|
{% for p in peers %}
|
||||||
|
<tr class="ledger-row">
|
||||||
|
<td><strong>{{ p.domain }}</strong></td>
|
||||||
|
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">{{ p.fingerprint[:8] }}…{{ p.fingerprint[-8:] }}</td>
|
||||||
|
<td>
|
||||||
|
{% if p.status == 'trusted' %}
|
||||||
|
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">trusted</span>
|
||||||
|
{% elif p.status == 'blocked' %}
|
||||||
|
<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">blocked</span>
|
||||||
|
{% else %}
|
||||||
|
<span class="sev-badge">unknown</span>
|
||||||
|
{% endif %}
|
||||||
|
</td>
|
||||||
|
<td class="lg-ts">{{ (p.discovered_at or '')[:16] | replace('T', ' ') }}</td>
|
||||||
|
<td class="lg-ts">{{ ((p.last_seen or '')[:16] | replace('T', ' ')) or '—' }}</td>
|
||||||
|
<td>
|
||||||
|
<form method="post" action="/admin/federation/peers/{{ p.domain }}/status" class="queue-action">
|
||||||
|
<input type="hidden" name="status" value="trusted">
|
||||||
|
<button type="submit" class="btn btn-enforce" {% if p.status == 'trusted' %}disabled{% endif %}>trust</button>
|
||||||
|
</form>
|
||||||
|
<form method="post" action="/admin/federation/peers/{{ p.domain }}/status" class="queue-action">
|
||||||
|
<input type="hidden" name="status" value="blocked">
|
||||||
|
<button type="submit" class="btn btn-reject" {% if p.status == 'blocked' %}disabled{% endif %}>block</button>
|
||||||
|
</form>
|
||||||
|
<form method="post" action="/admin/federation/peers/{{ p.domain }}/remove" class="queue-action"
|
||||||
|
onsubmit="return confirm('Remove {{ p.domain }}? Their signals will no longer count toward quorum.');">
|
||||||
|
<button type="submit" class="btn">remove</button>
|
||||||
|
</form>
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
|
{% endfor %}
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
{% else %}
|
||||||
|
<p class="page-intro">(no peers yet — add one below)</p>
|
||||||
|
{% endif %}
|
||||||
|
</section>
|
||||||
|
|
||||||
|
<section class="panel">
|
||||||
|
<div class="panel-head">
|
||||||
|
<h2>Add Peer</h2>
|
||||||
|
</div>
|
||||||
|
<p class="page-intro">Pin a peer's identity manually: their domain, their fingerprint (from their DNS TXT record), and the public key they publish at <code>/federation/key</code>.</p>
|
||||||
|
<form method="post" action="/admin/federation/peers/add" style="display:grid; gap:10px; max-width:680px;">
|
||||||
|
<input type="text" name="domain" placeholder="peer domain (e.g. peer.example.com)" class="lookup-input" required>
|
||||||
|
<input type="text" name="fingerprint" placeholder="fingerprint (32 hex chars)" class="lookup-input" maxlength="64" required style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">
|
||||||
|
<textarea name="pubkey_pem" placeholder="-----BEGIN PUBLIC KEY----- … -----END PUBLIC KEY-----" rows="6" class="lookup-input" required style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;"></textarea>
|
||||||
|
<select name="status" class="lookup-input">
|
||||||
|
<option value="unknown">unknown — record only, don't trust yet</option>
|
||||||
|
<option value="trusted">trusted — count toward quorum</option>
|
||||||
|
<option value="blocked">blocked — ignore</option>
|
||||||
|
</select>
|
||||||
|
<button type="submit" class="btn btn-enforce">+ register peer</button>
|
||||||
|
</form>
|
||||||
|
</section>
|
||||||
|
|
||||||
|
<section class="panel">
|
||||||
|
<div class="panel-head">
|
||||||
|
<h2>Recent Signals</h2>
|
||||||
|
<span class="count">last {{ signals|length }} of buffer</span>
|
||||||
|
</div>
|
||||||
|
<p class="page-intro">Verified federation signals from peers — case + IOC reports awaiting quorum. The signal buffer is what later quorum logic will count over.</p>
|
||||||
|
|
||||||
|
{% if signals %}
|
||||||
|
<table class="ledger">
|
||||||
|
<thead><tr><th>Received</th><th>Peer</th><th>Type</th><th>Id</th><th>Hash</th></tr></thead>
|
||||||
|
<tbody>
|
||||||
|
{% for s in signals %}
|
||||||
|
<tr class="ledger-row">
|
||||||
|
<td class="lg-ts">{{ (s.received_at or '')[:19] | replace('T', ' ') }}</td>
|
||||||
|
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">{{ s.peer_fingerprint[:8] }}…</td>
|
||||||
|
<td><span class="sev-badge">{{ s.signal_type }}</span></td>
|
||||||
|
<td style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ s.signal_id[:48] }}</td>
|
||||||
|
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ s.signal_hash[:16] }}…</td>
|
||||||
|
</tr>
|
||||||
|
{% endfor %}
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
{% else %}
|
||||||
|
<p class="page-intro">(no signals received yet — quorum stage will populate this)</p>
|
||||||
|
{% endif %}
|
||||||
|
</section>
|
||||||
|
|
||||||
|
{% endblock %}
|
||||||
101
src/psyc/cockpit/templates/admin_pulse.html
Normal file
101
src/psyc/cockpit/templates/admin_pulse.html
Normal file
@@ -0,0 +1,101 @@
|
|||||||
|
{% extends "base.html" %}
|
||||||
|
{% block title %}Pulse — psyc admin{% endblock %}
|
||||||
|
{% block content %}
|
||||||
|
<section class="panel">
|
||||||
|
<div class="panel-head">
|
||||||
|
<h1>Pulse — autonomous heartbeat</h1>
|
||||||
|
<span class="count">{{ pipelines|length }} pipeline{{ '' if pipelines|length == 1 else 's' }}</span>
|
||||||
|
</div>
|
||||||
|
<p class="page-intro">Cron-style scheduler that drives every psyc line on a cadence without human input. Each pipeline has an autonomy mode and a cadence in seconds. The kill switch halts everything instantly — it overrides cadence, mode, and the enabled flag.</p>
|
||||||
|
<p class="back"><a href="/admin">← back to admin</a></p>
|
||||||
|
|
||||||
|
{% if flash %}
|
||||||
|
<div class="verdict verdict-clean">{{ flash }}</div>
|
||||||
|
{% endif %}
|
||||||
|
</section>
|
||||||
|
|
||||||
|
<section class="panel">
|
||||||
|
<div class="panel-head">
|
||||||
|
<h2>Global kill switch</h2>
|
||||||
|
<span class="count">{{ 'ARMED' if kill_switch else 'OFF' }}</span>
|
||||||
|
</div>
|
||||||
|
{% if kill_switch %}
|
||||||
|
<div class="verdict" style="background:rgba(248,113,113,0.12); border:1px solid rgba(248,113,113,0.45); color:#fca5a5; padding:14px 18px; border-radius:6px; font-weight:700; letter-spacing:0.04em;">
|
||||||
|
✗ KILL SWITCH ARMED — every pipeline is paused. tick() returns "skipped" for everything. Run-now is also blocked. Toggle off to resume.
|
||||||
|
</div>
|
||||||
|
<form method="post" action="/admin/pulse/kill" style="margin-top:14px;">
|
||||||
|
<button type="submit" class="btn btn-approve">Disarm kill switch</button>
|
||||||
|
</form>
|
||||||
|
{% else %}
|
||||||
|
<div class="verdict verdict-clean">✓ Pulse is live — the background loop ticks every {{ tick_interval }}s.</div>
|
||||||
|
<form method="post" action="/admin/pulse/kill" style="margin-top:14px;"
|
||||||
|
onsubmit="return confirm('Arm the kill switch? Every pipeline halts immediately.');">
|
||||||
|
<button type="submit" class="btn btn-reject">Arm kill switch</button>
|
||||||
|
</form>
|
||||||
|
{% endif %}
|
||||||
|
</section>
|
||||||
|
|
||||||
|
<section class="panel">
|
||||||
|
<div class="panel-head">
|
||||||
|
<h2>Pipelines</h2>
|
||||||
|
<span class="count">{{ pipelines|selectattr('enabled')|list|length }} enabled</span>
|
||||||
|
</div>
|
||||||
|
<p class="page-intro">Mode: <code>auto-execute</code> fires the line, <code>auto-propose</code> stages proposals for human approval, <code>manual</code> runs only when you press “Run now”. Cadence is the gap between ticks; the loop wakes up every {{ tick_interval }}s.</p>
|
||||||
|
|
||||||
|
<table class="ledger">
|
||||||
|
<thead>
|
||||||
|
<tr>
|
||||||
|
<th style="width:24%;">Pipeline</th>
|
||||||
|
<th>Mode · cadence · enabled</th>
|
||||||
|
<th style="width:11%;">Last fired</th>
|
||||||
|
<th style="width:11%;">Next fire</th>
|
||||||
|
<th>Last result</th>
|
||||||
|
<th style="width:1%;"></th>
|
||||||
|
</tr>
|
||||||
|
</thead>
|
||||||
|
<tbody>
|
||||||
|
{% for p in pipelines %}
|
||||||
|
<tr class="ledger-row">
|
||||||
|
<td>
|
||||||
|
<strong>{{ p.title }}</strong>
|
||||||
|
<div class="lg-sub"><code>{{ p.name }}</code> · {{ p.description }}</div>
|
||||||
|
</td>
|
||||||
|
<td>
|
||||||
|
<form method="post" action="/admin/pulse/{{ p.name }}/update" class="queue-action" style="display:flex; gap:8px; align-items:center; flex-wrap:wrap;">
|
||||||
|
<select name="mode" style="background:var(--panel-2); color:var(--text); border:1px solid var(--border); border-radius:4px; padding:4px 6px;">
|
||||||
|
<option value="auto-execute" {% if p.mode.value == 'auto-execute' %}selected{% endif %}>auto-execute</option>
|
||||||
|
<option value="auto-propose" {% if p.mode.value == 'auto-propose' %}selected{% endif %}>auto-propose</option>
|
||||||
|
<option value="manual" {% if p.mode.value == 'manual' %}selected{% endif %}>manual</option>
|
||||||
|
</select>
|
||||||
|
<input type="number" name="cadence_seconds" value="{{ p.cadence_seconds }}" min="1" step="1"
|
||||||
|
style="background:var(--panel-2); color:var(--text); border:1px solid var(--border); border-radius:4px; padding:4px 6px; width:84px;" title="cadence in seconds">
|
||||||
|
<label style="display:inline-flex; align-items:center; gap:4px; font-size:12px;">
|
||||||
|
<input type="checkbox" name="enabled" value="1" {% if p.enabled %}checked{% endif %}> enabled
|
||||||
|
</label>
|
||||||
|
<button type="submit" class="btn">save</button>
|
||||||
|
</form>
|
||||||
|
</td>
|
||||||
|
<td class="lg-ts">
|
||||||
|
{% if p.last_fired %}<span title="{{ p.last_fired.isoformat() }}">{{ relative(p.last_fired) }}</span>{% else %}—{% endif %}
|
||||||
|
</td>
|
||||||
|
<td class="lg-ts">
|
||||||
|
{% if p.next_fire %}<span title="{{ p.next_fire.isoformat() }}">{{ relative(p.next_fire) }}</span>{% else %}—{% endif %}
|
||||||
|
</td>
|
||||||
|
<td class="lg-sub" style="max-width:0;">
|
||||||
|
{% if p.last_outcome == 'ok' %}<span style="color: var(--green);">✓</span>
|
||||||
|
{% elif p.last_outcome == 'err' %}<span style="color: var(--red);">✗</span>
|
||||||
|
{% elif p.last_outcome == 'skipped' %}<span style="color: var(--muted);">⊘</span>
|
||||||
|
{% endif %}
|
||||||
|
{{ (p.last_result or '—')[:60] }}{% if (p.last_result or '')|length > 60 %}…{% endif %}
|
||||||
|
</td>
|
||||||
|
<td>
|
||||||
|
<form method="post" action="/admin/pulse/{{ p.name }}/run" class="queue-action">
|
||||||
|
<button type="submit" class="btn btn-enforce" title="Fire now, regardless of cadence">Run now</button>
|
||||||
|
</form>
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
|
{% endfor %}
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
</section>
|
||||||
|
{% endblock %}
|
||||||
@@ -69,6 +69,8 @@
|
|||||||
</svg>
|
</svg>
|
||||||
Admin
|
Admin
|
||||||
</a>
|
</a>
|
||||||
|
<a href="/admin/pulse" class="nav-admin" title="Autonomy scheduler">Pulse</a>
|
||||||
|
<a href="/admin/federation" class="nav-admin" title="Federation peers">Federation</a>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
</nav>
|
</nav>
|
||||||
{% if request.session.get('admin_who') %}
|
{% if request.session.get('admin_who') %}
|
||||||
|
|||||||
155
src/psyc/db.py
155
src/psyc/db.py
@@ -114,6 +114,53 @@ Index("iocs_value_idx", iocs.c.value)
|
|||||||
Index("iocs_type_idx", iocs.c.ioc_type)
|
Index("iocs_type_idx", iocs.c.ioc_type)
|
||||||
Index("iocs_case_idx", iocs.c.case_id)
|
Index("iocs_case_idx", iocs.c.case_id)
|
||||||
|
|
||||||
|
pulse_pipelines = Table(
|
||||||
|
"pulse_pipelines", _metadata,
|
||||||
|
Column("name", String, primary_key=True),
|
||||||
|
Column("title", String, nullable=False),
|
||||||
|
Column("description", Text, nullable=False, default=""),
|
||||||
|
Column("mode", String, nullable=False), # manual | auto-propose | auto-execute
|
||||||
|
Column("cadence_seconds", Integer, nullable=False),
|
||||||
|
Column("enabled", Boolean, nullable=False, default=True),
|
||||||
|
Column("last_fired", String, nullable=True), # ISO timestamp or NULL
|
||||||
|
Column("next_fire", String, nullable=True), # ISO timestamp or NULL
|
||||||
|
Column("last_result", Text, nullable=False, default=""),
|
||||||
|
Column("last_outcome", String, nullable=False, default=""), # ok | err | skipped | ""
|
||||||
|
)
|
||||||
|
|
||||||
|
pulse_settings = Table(
|
||||||
|
"pulse_settings", _metadata,
|
||||||
|
Column("key", String, primary_key=True),
|
||||||
|
Column("value", String, nullable=False),
|
||||||
|
)
|
||||||
|
|
||||||
|
peers = Table(
|
||||||
|
"peers", _metadata,
|
||||||
|
Column("domain", String, primary_key=True),
|
||||||
|
Column("fingerprint", String, nullable=False),
|
||||||
|
Column("pubkey_pem", Text, nullable=False),
|
||||||
|
Column("status", String, nullable=False), # unknown | trusted | blocked
|
||||||
|
Column("discovered_at", String, nullable=False),
|
||||||
|
Column("last_seen", String, nullable=True),
|
||||||
|
Column("notes", Text, nullable=True),
|
||||||
|
)
|
||||||
|
Index("peers_fp_idx", peers.c.fingerprint)
|
||||||
|
Index("peers_status_idx", peers.c.status)
|
||||||
|
|
||||||
|
federation_signals = Table(
|
||||||
|
"federation_signals", _metadata,
|
||||||
|
Column("id", Integer, primary_key=True, autoincrement=True),
|
||||||
|
Column("peer_fingerprint", String, nullable=False),
|
||||||
|
Column("signal_type", String, nullable=False), # case | ioc
|
||||||
|
Column("signal_id", String, nullable=False), # case_id or ioc value
|
||||||
|
Column("signal_hash", String, nullable=False), # sha256 of canonical record
|
||||||
|
Column("received_at", String, nullable=False),
|
||||||
|
Column("raw_json", Text, nullable=False),
|
||||||
|
)
|
||||||
|
Index("federation_signals_hash_idx", federation_signals.c.signal_hash)
|
||||||
|
Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint)
|
||||||
|
Index("federation_signals_received_idx", federation_signals.c.received_at.desc())
|
||||||
|
|
||||||
|
|
||||||
_log = log.get(__name__)
|
_log = log.get(__name__)
|
||||||
_engine: Optional[Engine] = None
|
_engine: Optional[Engine] = None
|
||||||
@@ -214,3 +261,111 @@ def ioc_count(db_path: Path = DB_PATH) -> int:
|
|||||||
stmt = select(func.count()).select_from(iocs)
|
stmt = select(func.count()).select_from(iocs)
|
||||||
with engine(db_path).connect() as conn:
|
with engine(db_path).connect() as conn:
|
||||||
return conn.execute(stmt).scalar_one()
|
return conn.execute(stmt).scalar_one()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- pulse scheduler ----------------------------------------------
|
||||||
|
|
||||||
|
def get_pulse_state(db_path: Path = DB_PATH) -> List[dict]:
|
||||||
|
"""Every registered pipeline, ordered by name."""
|
||||||
|
stmt = select(pulse_pipelines).order_by(pulse_pipelines.c.name)
|
||||||
|
with engine(db_path).connect() as conn:
|
||||||
|
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
|
||||||
|
|
||||||
|
|
||||||
|
def upsert_pulse_pipeline(row: dict, db_path: Path = DB_PATH) -> None:
|
||||||
|
"""Insert or update one pipeline by name."""
|
||||||
|
stmt = sqlite_insert(pulse_pipelines).values(**row)
|
||||||
|
stmt = stmt.on_conflict_do_update(
|
||||||
|
index_elements=[pulse_pipelines.c.name],
|
||||||
|
set_=dict(
|
||||||
|
title=stmt.excluded.title,
|
||||||
|
description=stmt.excluded.description,
|
||||||
|
mode=stmt.excluded.mode,
|
||||||
|
cadence_seconds=stmt.excluded.cadence_seconds,
|
||||||
|
enabled=stmt.excluded.enabled,
|
||||||
|
last_fired=stmt.excluded.last_fired,
|
||||||
|
next_fire=stmt.excluded.next_fire,
|
||||||
|
last_result=stmt.excluded.last_result,
|
||||||
|
last_outcome=stmt.excluded.last_outcome,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
with engine(db_path).begin() as conn:
|
||||||
|
conn.execute(stmt)
|
||||||
|
|
||||||
|
|
||||||
|
def kill_switch_get(db_path: Path = DB_PATH) -> bool:
|
||||||
|
stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == "kill_switch")
|
||||||
|
with engine(db_path).connect() as conn:
|
||||||
|
row = conn.execute(stmt).fetchone()
|
||||||
|
if row is None:
|
||||||
|
return False
|
||||||
|
return str(row.value) == "1"
|
||||||
|
|
||||||
|
|
||||||
|
def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None:
|
||||||
|
value = "1" if armed else "0"
|
||||||
|
stmt = sqlite_insert(pulse_settings).values(key="kill_switch", value=value)
|
||||||
|
stmt = stmt.on_conflict_do_update(
|
||||||
|
index_elements=[pulse_settings.c.key],
|
||||||
|
set_=dict(value=stmt.excluded.value),
|
||||||
|
)
|
||||||
|
with engine(db_path).begin() as conn:
|
||||||
|
conn.execute(stmt)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- federation: peers + signal buffer ----------------------------
|
||||||
|
|
||||||
|
def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None:
|
||||||
|
"""Insert-or-update a peer by domain. `row` must include `domain`."""
|
||||||
|
stmt = sqlite_insert(peers).values(**row)
|
||||||
|
update_cols = {k: stmt.excluded[k] for k in row if k != "domain"}
|
||||||
|
stmt = stmt.on_conflict_do_update(index_elements=[peers.c.domain], set_=update_cols)
|
||||||
|
with engine(db_path).begin() as conn:
|
||||||
|
conn.execute(stmt)
|
||||||
|
|
||||||
|
|
||||||
|
def list_peers(db_path: Path = DB_PATH) -> List[dict]:
|
||||||
|
stmt = select(peers).order_by(peers.c.discovered_at.desc())
|
||||||
|
with engine(db_path).connect() as conn:
|
||||||
|
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
|
||||||
|
|
||||||
|
|
||||||
|
def get_peer(domain: str, db_path: Path = DB_PATH) -> Optional[dict]:
|
||||||
|
stmt = select(peers).where(peers.c.domain == domain)
|
||||||
|
with engine(db_path).connect() as conn:
|
||||||
|
row = conn.execute(stmt).fetchone()
|
||||||
|
return dict(row._mapping) if row else None
|
||||||
|
|
||||||
|
|
||||||
|
def set_peer_status(domain: str, status: str, db_path: Path = DB_PATH) -> None:
|
||||||
|
from sqlalchemy import update as sa_update
|
||||||
|
stmt = sa_update(peers).where(peers.c.domain == domain).values(status=status)
|
||||||
|
with engine(db_path).begin() as conn:
|
||||||
|
conn.execute(stmt)
|
||||||
|
|
||||||
|
|
||||||
|
def remove_peer(domain: str, db_path: Path = DB_PATH) -> None:
|
||||||
|
stmt = peers.delete().where(peers.c.domain == domain)
|
||||||
|
with engine(db_path).begin() as conn:
|
||||||
|
conn.execute(stmt)
|
||||||
|
|
||||||
|
|
||||||
|
def record_signal(row: dict, db_path: Path = DB_PATH) -> int:
|
||||||
|
"""Append one federation signal. Returns the inserted row id."""
|
||||||
|
stmt = insert(federation_signals).values(**row)
|
||||||
|
with engine(db_path).begin() as conn:
|
||||||
|
res = conn.execute(stmt)
|
||||||
|
return int(res.inserted_primary_key[0])
|
||||||
|
|
||||||
|
|
||||||
|
def signals_for_hash(signal_hash: str, db_path: Path = DB_PATH) -> List[dict]:
|
||||||
|
"""All recorded signals matching `signal_hash` — quorum-lookup primitive."""
|
||||||
|
stmt = select(federation_signals).where(federation_signals.c.signal_hash == signal_hash)
|
||||||
|
with engine(db_path).connect() as conn:
|
||||||
|
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
|
||||||
|
|
||||||
|
|
||||||
|
def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]:
|
||||||
|
stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit)
|
||||||
|
with engine(db_path).connect() as conn:
|
||||||
|
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
|
||||||
|
|||||||
405
src/psyc/lines/federation.py
Normal file
405
src/psyc/lines/federation.py
Normal file
@@ -0,0 +1,405 @@
|
|||||||
|
"""Federation — node identity, signed feeds, peer registry.
|
||||||
|
|
||||||
|
Identity layer for internet-wide federation of psyc nodes. Each node owns
|
||||||
|
an Ed25519 keypair persisted under DATA_DIR/federation/. The public key
|
||||||
|
fingerprint (first 16 bytes of SHA256(raw_pubkey) hex-encoded) goes into a
|
||||||
|
DNS TXT record so peers can discover and authenticate the node, and the
|
||||||
|
private key signs the outbound feed at /federation/feed.
|
||||||
|
|
||||||
|
This module is the *identity* primitives only — discovery walkers,
|
||||||
|
vouching/quorum, transparency log and auto-pull live in later stages.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ed25519
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
from psyc import DATA_DIR, db, log
|
||||||
|
from psyc.result import Err, Ok, Result
|
||||||
|
|
||||||
|
|
||||||
|
_log = log.get(__name__)
|
||||||
|
|
||||||
|
FED_DIR = DATA_DIR / "federation"
|
||||||
|
PRIVATE_KEY_PATH = FED_DIR / "node.key"
|
||||||
|
PUBLIC_KEY_PATH = FED_DIR / "node.pub"
|
||||||
|
|
||||||
|
FEED_VERSION = "psyc1"
|
||||||
|
FEED_ALG = "ed25519"
|
||||||
|
FEED_PATH = "/federation/feed"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- keypair persistence -----------------------------------------
|
||||||
|
|
||||||
|
def _ensure_dir() -> None:
|
||||||
|
FED_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def node_keypair() -> Tuple[ed25519.Ed25519PrivateKey, ed25519.Ed25519PublicKey]:
|
||||||
|
"""Return the node's Ed25519 keypair, generating + persisting it on first call.
|
||||||
|
|
||||||
|
Private key lands at data/federation/node.key (PEM, chmod 0600); public
|
||||||
|
at data/federation/node.pub (PEM). Idempotent — subsequent calls load
|
||||||
|
the existing files instead of generating new ones.
|
||||||
|
"""
|
||||||
|
_ensure_dir()
|
||||||
|
if PRIVATE_KEY_PATH.exists() and PUBLIC_KEY_PATH.exists():
|
||||||
|
priv_pem = PRIVATE_KEY_PATH.read_bytes()
|
||||||
|
priv = serialization.load_pem_private_key(priv_pem, password=None)
|
||||||
|
if not isinstance(priv, ed25519.Ed25519PrivateKey):
|
||||||
|
raise RuntimeError(f"federation key at {PRIVATE_KEY_PATH} is not Ed25519")
|
||||||
|
return priv, priv.public_key()
|
||||||
|
|
||||||
|
priv = ed25519.Ed25519PrivateKey.generate()
|
||||||
|
priv_pem = priv.private_bytes(
|
||||||
|
encoding=serialization.Encoding.PEM,
|
||||||
|
format=serialization.PrivateFormat.PKCS8,
|
||||||
|
encryption_algorithm=serialization.NoEncryption(),
|
||||||
|
)
|
||||||
|
pub = priv.public_key()
|
||||||
|
pub_pem = pub.public_bytes(
|
||||||
|
encoding=serialization.Encoding.PEM,
|
||||||
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||||
|
)
|
||||||
|
PRIVATE_KEY_PATH.write_bytes(priv_pem)
|
||||||
|
os.chmod(PRIVATE_KEY_PATH, 0o600)
|
||||||
|
PUBLIC_KEY_PATH.write_bytes(pub_pem)
|
||||||
|
_log.info("federation.keypair.generated", path=str(PRIVATE_KEY_PATH))
|
||||||
|
return priv, pub
|
||||||
|
|
||||||
|
|
||||||
|
def public_key_pem() -> str:
|
||||||
|
"""PEM-encoded public key as text — what peers store + verify against."""
|
||||||
|
_, pub = node_keypair()
|
||||||
|
return pub.public_bytes(
|
||||||
|
encoding=serialization.Encoding.PEM,
|
||||||
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||||
|
).decode("ascii")
|
||||||
|
|
||||||
|
|
||||||
|
def _raw_pubkey_bytes(pub: ed25519.Ed25519PublicKey) -> bytes:
|
||||||
|
return pub.public_bytes(
|
||||||
|
encoding=serialization.Encoding.Raw,
|
||||||
|
format=serialization.PublicFormat.Raw,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def node_fingerprint() -> str:
|
||||||
|
"""Short stable id for the node — first 16 bytes of SHA256(raw_pubkey), hex.
|
||||||
|
|
||||||
|
Lives in DNS TXT records; 32 hex chars is short enough to fit but long
|
||||||
|
enough to be collision-safe for any plausible peer population.
|
||||||
|
"""
|
||||||
|
_, pub = node_keypair()
|
||||||
|
digest = hashlib.sha256(_raw_pubkey_bytes(pub)).digest()
|
||||||
|
return digest[:16].hex()
|
||||||
|
|
||||||
|
|
||||||
|
def _fingerprint_for_pubkey_pem(pubkey_pem: str) -> str:
|
||||||
|
pub = serialization.load_pem_public_key(pubkey_pem.encode("ascii"))
|
||||||
|
if not isinstance(pub, ed25519.Ed25519PublicKey):
|
||||||
|
raise ValueError("not an Ed25519 public key")
|
||||||
|
return hashlib.sha256(_raw_pubkey_bytes(pub)).digest()[:16].hex()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- DNS record format -------------------------------------------
|
||||||
|
|
||||||
|
class DNSRecord(BaseModel):
|
||||||
|
"""The SRV + TXT pair an admin pastes into their zone file."""
|
||||||
|
srv_name: str
|
||||||
|
srv_target: str
|
||||||
|
srv_port: int
|
||||||
|
srv_priority: int = 10
|
||||||
|
srv_weight: int = 10
|
||||||
|
txt_name: str
|
||||||
|
txt_value: str
|
||||||
|
human_instructions: str
|
||||||
|
|
||||||
|
|
||||||
|
def dns_record(domain: str, port: int = 443) -> DNSRecord:
|
||||||
|
"""Build the DNS-SD-style records that advertise this node at `domain`."""
|
||||||
|
fp = node_fingerprint()
|
||||||
|
srv_name = f"_psyc._tcp.{domain}"
|
||||||
|
srv_target = f"{domain}."
|
||||||
|
txt_name = srv_name
|
||||||
|
txt_value = f"v={FEED_VERSION} fp={fp} alg={FEED_ALG} path={FEED_PATH}"
|
||||||
|
instructions = (
|
||||||
|
f"; psyc federation records for {domain}\n"
|
||||||
|
f"; ----------------------------------------------------------\n"
|
||||||
|
f"; 1) SRV record — locates this psyc node (host + port).\n"
|
||||||
|
f'{srv_name}. 3600 IN SRV 10 10 {port} {srv_target}\n'
|
||||||
|
f";\n"
|
||||||
|
f"; 2) TXT record — declares protocol version, key fingerprint,\n"
|
||||||
|
f"; signature algorithm, and the feed endpoint path.\n"
|
||||||
|
f'{txt_name}. 3600 IN TXT "{txt_value}"\n'
|
||||||
|
f"; ----------------------------------------------------------\n"
|
||||||
|
f"; Once these are live, federation peers can fetch:\n"
|
||||||
|
f"; https://{domain}{FEED_PATH} (signed feed JSON)\n"
|
||||||
|
f"; https://{domain}/federation/key (public key PEM)\n"
|
||||||
|
f"; https://{domain}/federation/info (capabilities)\n"
|
||||||
|
)
|
||||||
|
return DNSRecord(
|
||||||
|
srv_name=srv_name,
|
||||||
|
srv_target=srv_target,
|
||||||
|
srv_port=port,
|
||||||
|
txt_name=txt_name,
|
||||||
|
txt_value=txt_value,
|
||||||
|
human_instructions=instructions,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- signing -----------------------------------------------------
|
||||||
|
|
||||||
|
def canonical_json(obj: Dict[str, Any]) -> bytes:
|
||||||
|
"""Deterministic JSON serialization — what we sign + hash over."""
|
||||||
|
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
def sign_payload(payload: bytes) -> bytes:
|
||||||
|
"""Ed25519 signature over `payload`. Raw 64-byte sig."""
|
||||||
|
priv, _ = node_keypair()
|
||||||
|
return priv.sign(payload)
|
||||||
|
|
||||||
|
|
||||||
|
def verify_payload(payload: bytes, signature: bytes, pubkey_pem: str) -> bool:
|
||||||
|
"""True iff `signature` verifies under `pubkey_pem`. Never raises."""
|
||||||
|
try:
|
||||||
|
pub = serialization.load_pem_public_key(pubkey_pem.encode("ascii"))
|
||||||
|
if not isinstance(pub, ed25519.Ed25519PublicKey):
|
||||||
|
return False
|
||||||
|
pub.verify(signature, payload)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- feed export -------------------------------------------------
|
||||||
|
|
||||||
|
def _case_digest(case_record: Dict[str, Any]) -> str:
|
||||||
|
return hashlib.sha256(canonical_json(case_record)).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def _build_case_records(window_hours: int) -> List[Dict[str, Any]]:
|
||||||
|
cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
|
||||||
|
out: List[Dict[str, Any]] = []
|
||||||
|
for case in db.list_cases(limit=10_000):
|
||||||
|
if case.ingested_at < cutoff:
|
||||||
|
continue
|
||||||
|
record: Dict[str, Any] = {
|
||||||
|
"case_id": case.case_id,
|
||||||
|
"summary": case.summary,
|
||||||
|
"severity": case.classification.severity.value if case.classification.severity else None,
|
||||||
|
"incident_type": case.classification.incident_type.value if case.classification.incident_type else None,
|
||||||
|
"observed_at": case.observed_at.isoformat(),
|
||||||
|
"feed_source": case.source_metadata.get("feed", ""),
|
||||||
|
"iocs": (
|
||||||
|
[{"value": v, "type": "url"} for v in case.observables.urls]
|
||||||
|
+ [{"value": v, "type": "domain"} for v in case.observables.domains]
|
||||||
|
+ [{"value": v, "type": "ip"} for v in case.observables.ips]
|
||||||
|
+ [{"value": v, "type": "hash"} for v in case.observables.hashes]
|
||||||
|
+ [{"value": v, "type": "cve"} for v in case.observables.cves]
|
||||||
|
),
|
||||||
|
}
|
||||||
|
record["digest_sha256"] = _case_digest(
|
||||||
|
{k: v for k, v in record.items() if k != "digest_sha256"}
|
||||||
|
)
|
||||||
|
out.append(record)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _build_ioc_records(window_hours: int) -> List[Dict[str, Any]]:
|
||||||
|
cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
|
||||||
|
out: List[Dict[str, Any]] = []
|
||||||
|
seen: set = set()
|
||||||
|
for ioc_type in ("url", "domain", "ip", "hash", "cve"):
|
||||||
|
for row in db.iocs_by_type(ioc_type):
|
||||||
|
first_seen = row.get("first_seen")
|
||||||
|
if first_seen:
|
||||||
|
try:
|
||||||
|
if datetime.fromisoformat(first_seen) < cutoff:
|
||||||
|
continue
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
key = (row["value"], row["ioc_type"])
|
||||||
|
if key in seen:
|
||||||
|
continue
|
||||||
|
seen.add(key)
|
||||||
|
record = {
|
||||||
|
"value": row["value"],
|
||||||
|
"type": row["ioc_type"],
|
||||||
|
"severity": row.get("severity"),
|
||||||
|
"first_seen": first_seen,
|
||||||
|
}
|
||||||
|
record["digest_sha256"] = hashlib.sha256(canonical_json(record)).hexdigest()
|
||||||
|
out.append(record)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def build_signed_feed(window_hours: int = 24) -> Dict[str, Any]:
|
||||||
|
"""Build the JSON feed peers will pull from /federation/feed.
|
||||||
|
|
||||||
|
Pulls cases ingested in the last `window_hours` plus the corresponding
|
||||||
|
IOC slice, attaches per-record `digest_sha256` (so peers can later
|
||||||
|
quorum-match across nodes), and signs the canonical JSON of the whole
|
||||||
|
payload-minus-signature with our Ed25519 key.
|
||||||
|
"""
|
||||||
|
payload: Dict[str, Any] = {
|
||||||
|
"version": FEED_VERSION,
|
||||||
|
"fingerprint": node_fingerprint(),
|
||||||
|
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||||||
|
"window_hours": window_hours,
|
||||||
|
"cases": _build_case_records(window_hours),
|
||||||
|
"iocs": _build_ioc_records(window_hours),
|
||||||
|
}
|
||||||
|
sig = sign_payload(canonical_json(payload))
|
||||||
|
payload["signature"] = base64.b64encode(sig).decode("ascii")
|
||||||
|
return payload
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- import + quorum-signal buffer -------------------------------
|
||||||
|
|
||||||
|
class ImportSummary(BaseModel):
|
||||||
|
peer_fingerprint: str
|
||||||
|
cases_seen: int
|
||||||
|
iocs_seen: int
|
||||||
|
signal_ids: List[Tuple[str, str]] = Field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result[ImportSummary, str]:
|
||||||
|
"""Verify + record a peer's feed into the federation_signals buffer.
|
||||||
|
|
||||||
|
Does NOT merge into the local case store — that's the quorum stage's
|
||||||
|
job. The buffer is the per-hash signal log that quorum logic later
|
||||||
|
aggregates ("3 trusted peers reported this same IOC → promote").
|
||||||
|
"""
|
||||||
|
sig_b64 = feed.get("signature")
|
||||||
|
if not sig_b64:
|
||||||
|
return Err("missing signature")
|
||||||
|
try:
|
||||||
|
signature = base64.b64decode(sig_b64)
|
||||||
|
except Exception:
|
||||||
|
return Err("malformed signature (not base64)")
|
||||||
|
|
||||||
|
unsigned = {k: v for k, v in feed.items() if k != "signature"}
|
||||||
|
if not verify_payload(canonical_json(unsigned), signature, expected_pubkey_pem):
|
||||||
|
return Err("signature verification failed")
|
||||||
|
|
||||||
|
peer_fp = feed.get("fingerprint", "")
|
||||||
|
if not peer_fp:
|
||||||
|
return Err("missing fingerprint")
|
||||||
|
if peer_fp == node_fingerprint():
|
||||||
|
return Err("loop: own feed")
|
||||||
|
|
||||||
|
# Cross-check the declared fingerprint matches the pubkey we verified with.
|
||||||
|
try:
|
||||||
|
if _fingerprint_for_pubkey_pem(expected_pubkey_pem) != peer_fp:
|
||||||
|
return Err("fingerprint does not match provided pubkey")
|
||||||
|
except Exception as exc:
|
||||||
|
return Err(f"bad pubkey: {exc}")
|
||||||
|
|
||||||
|
now = datetime.now(timezone.utc).isoformat()
|
||||||
|
signal_ids: List[Tuple[str, str]] = []
|
||||||
|
cases = feed.get("cases") or []
|
||||||
|
iocs = feed.get("iocs") or []
|
||||||
|
|
||||||
|
for c in cases:
|
||||||
|
case_id = c.get("case_id") or ""
|
||||||
|
digest = c.get("digest_sha256") or hashlib.sha256(canonical_json(c)).hexdigest()
|
||||||
|
db.record_signal(dict(
|
||||||
|
peer_fingerprint=peer_fp,
|
||||||
|
signal_type="case",
|
||||||
|
signal_id=case_id,
|
||||||
|
signal_hash=digest,
|
||||||
|
received_at=now,
|
||||||
|
raw_json=json.dumps(c, sort_keys=True),
|
||||||
|
))
|
||||||
|
signal_ids.append(("case", digest))
|
||||||
|
|
||||||
|
for i in iocs:
|
||||||
|
value = i.get("value") or ""
|
||||||
|
digest = i.get("digest_sha256") or hashlib.sha256(canonical_json(i)).hexdigest()
|
||||||
|
db.record_signal(dict(
|
||||||
|
peer_fingerprint=peer_fp,
|
||||||
|
signal_type="ioc",
|
||||||
|
signal_id=value,
|
||||||
|
signal_hash=digest,
|
||||||
|
received_at=now,
|
||||||
|
raw_json=json.dumps(i, sort_keys=True),
|
||||||
|
))
|
||||||
|
signal_ids.append(("ioc", digest))
|
||||||
|
|
||||||
|
_log.info("federation.import.ok", peer=peer_fp, cases=len(cases), iocs=len(iocs))
|
||||||
|
return Ok(ImportSummary(
|
||||||
|
peer_fingerprint=peer_fp,
|
||||||
|
cases_seen=len(cases),
|
||||||
|
iocs_seen=len(iocs),
|
||||||
|
signal_ids=signal_ids,
|
||||||
|
))
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- peer registry ------------------------------------------------
|
||||||
|
|
||||||
|
class Peer(BaseModel):
|
||||||
|
domain: str
|
||||||
|
fingerprint: str
|
||||||
|
pubkey_pem: str
|
||||||
|
status: str = "unknown" # unknown | trusted | blocked
|
||||||
|
discovered_at: str
|
||||||
|
last_seen: Optional[str] = None
|
||||||
|
notes: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
def _row_to_peer(row: Dict[str, Any]) -> Peer:
|
||||||
|
return Peer(
|
||||||
|
domain=row["domain"],
|
||||||
|
fingerprint=row["fingerprint"],
|
||||||
|
pubkey_pem=row["pubkey_pem"],
|
||||||
|
status=row.get("status") or "unknown",
|
||||||
|
discovered_at=row.get("discovered_at") or "",
|
||||||
|
last_seen=row.get("last_seen"),
|
||||||
|
notes=row.get("notes"),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def register_peer(domain: str, fingerprint: str, pubkey_pem: str, status: str = "unknown") -> None:
|
||||||
|
"""Insert or update a peer in the registry. Idempotent on `domain`."""
|
||||||
|
now = datetime.now(timezone.utc).isoformat()
|
||||||
|
existing = db.get_peer(domain)
|
||||||
|
discovered_at = existing["discovered_at"] if existing else now
|
||||||
|
db.upsert_peer(dict(
|
||||||
|
domain=domain,
|
||||||
|
fingerprint=fingerprint,
|
||||||
|
pubkey_pem=pubkey_pem,
|
||||||
|
status=status,
|
||||||
|
discovered_at=discovered_at,
|
||||||
|
last_seen=now,
|
||||||
|
notes=existing.get("notes") if existing else None,
|
||||||
|
))
|
||||||
|
|
||||||
|
|
||||||
|
def list_peers() -> List[Peer]:
|
||||||
|
return [_row_to_peer(r) for r in db.list_peers()]
|
||||||
|
|
||||||
|
|
||||||
|
def get_peer(domain: str) -> Optional[Peer]:
|
||||||
|
row = db.get_peer(domain)
|
||||||
|
return _row_to_peer(row) if row else None
|
||||||
|
|
||||||
|
|
||||||
|
def set_peer_status(domain: str, status: str) -> None:
|
||||||
|
if status not in ("unknown", "trusted", "blocked"):
|
||||||
|
raise ValueError(f"unknown peer status: {status}")
|
||||||
|
db.set_peer_status(domain, status)
|
||||||
|
|
||||||
|
|
||||||
|
def remove_peer(domain: str) -> None:
|
||||||
|
db.remove_peer(domain)
|
||||||
380
src/psyc/lines/pulse.py
Normal file
380
src/psyc/lines/pulse.py
Normal file
@@ -0,0 +1,380 @@
|
|||||||
|
"""Pulseline — cron-style scheduler that drives every psyc pipeline on a cadence.
|
||||||
|
|
||||||
|
Each registered pipeline has an autonomy mode (manual / auto-propose /
|
||||||
|
auto-execute) and a cadence in seconds. tick() iterates every pipeline and
|
||||||
|
fires whichever ones are due (and enabled, and not manual, and the global kill
|
||||||
|
switch is off). State persists to SQLite via psyc.db so cadences survive
|
||||||
|
restarts. A background asyncio loop calls tick() at a fixed interval — the
|
||||||
|
cockpit lifespan attaches it.
|
||||||
|
|
||||||
|
NOTE: federation pipelines (peer-pull, vouch-refresh) are wired as placeholders
|
||||||
|
that return a no-op string. Real federation lands in a later stage.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import traceback
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from enum import Enum
|
||||||
|
from typing import Callable, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
from psyc import db, log
|
||||||
|
|
||||||
|
|
||||||
|
_log = log.get(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class PulseMode(str, Enum):
|
||||||
|
MANUAL = "manual"
|
||||||
|
AUTO_PROPOSE = "auto-propose"
|
||||||
|
AUTO_EXECUTE = "auto-execute"
|
||||||
|
|
||||||
|
|
||||||
|
class Pipeline(BaseModel):
|
||||||
|
name: str
|
||||||
|
title: str
|
||||||
|
description: str
|
||||||
|
mode: PulseMode
|
||||||
|
cadence_seconds: int
|
||||||
|
enabled: bool = True
|
||||||
|
last_fired: Optional[datetime] = None
|
||||||
|
next_fire: Optional[datetime] = None
|
||||||
|
last_result: str = ""
|
||||||
|
last_outcome: str = "" # "ok" | "err" | "skipped" | ""
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- pipeline runners --------------------------------------------------
|
||||||
|
|
||||||
|
def _run_fetch() -> str:
|
||||||
|
"""Fetch every enabled scout source; partial fetch is fine.
|
||||||
|
|
||||||
|
Skip-on-fail is critical: keyed feeds (threatfox, malware-bazaar, otx)
|
||||||
|
raise when their key isn't configured — we don't want one missing key to
|
||||||
|
block the public ones.
|
||||||
|
"""
|
||||||
|
from psyc.lines import scout
|
||||||
|
|
||||||
|
plan: Tuple[Tuple[str, Optional[int]], ...] = (
|
||||||
|
("urlhaus", 50),
|
||||||
|
("cisa-kev", 100),
|
||||||
|
("feodo", 50),
|
||||||
|
("threatfox", 200),
|
||||||
|
("malware-bazaar", 100),
|
||||||
|
("otx", 100),
|
||||||
|
)
|
||||||
|
total = 0
|
||||||
|
feeds_ok = 0
|
||||||
|
feeds_err: List[str] = []
|
||||||
|
for source, limit in plan:
|
||||||
|
try:
|
||||||
|
cases = scout.fetch_and_signal(source, limit=limit)
|
||||||
|
for c in cases:
|
||||||
|
db.upsert_case(c)
|
||||||
|
total += len(cases)
|
||||||
|
feeds_ok += 1
|
||||||
|
except Exception as exc: # noqa: BLE001 — partial fetch is the point
|
||||||
|
feeds_err.append(source)
|
||||||
|
_log.info("pulse.fetch.skip", source=source, error=str(exc)[:200])
|
||||||
|
tail = f" (skipped: {', '.join(feeds_err)})" if feeds_err else ""
|
||||||
|
return f"fetched {total} cases across {feeds_ok} feed(s){tail}"
|
||||||
|
|
||||||
|
|
||||||
|
def _run_classify() -> str:
|
||||||
|
from psyc.lines import classify
|
||||||
|
|
||||||
|
cases = db.list_cases(limit=10_000)
|
||||||
|
n = 0
|
||||||
|
for c in cases:
|
||||||
|
classify.classify(c)
|
||||||
|
db.upsert_case(c)
|
||||||
|
n += 1
|
||||||
|
return f"classified {n} case(s)"
|
||||||
|
|
||||||
|
|
||||||
|
def _run_prove() -> str:
|
||||||
|
from psyc.lines import proof
|
||||||
|
|
||||||
|
cases = db.list_cases(limit=10_000)
|
||||||
|
n = 0
|
||||||
|
for c in cases:
|
||||||
|
proof.prove(c)
|
||||||
|
db.upsert_case(c)
|
||||||
|
n += 1
|
||||||
|
return f"proved {n} case(s)"
|
||||||
|
|
||||||
|
|
||||||
|
def _run_reindex() -> str:
|
||||||
|
from psyc.lines import lookup
|
||||||
|
|
||||||
|
cases = db.list_cases(limit=1_000_000)
|
||||||
|
written = lookup.reindex(cases)
|
||||||
|
return f"indexed {written} IOC(s) from {len(cases)} case(s)"
|
||||||
|
|
||||||
|
|
||||||
|
def _run_respond_propose() -> str:
|
||||||
|
"""Propose response actions for high-severity cases that don't yet have any.
|
||||||
|
|
||||||
|
Strictly proposal-only: nothing is dispatched. The respond.propose_for_case
|
||||||
|
helper is already idempotent per case (skips cases that already have
|
||||||
|
actions), so re-running this on a tick is safe.
|
||||||
|
"""
|
||||||
|
from psyc.lines import respond
|
||||||
|
|
||||||
|
cases = db.list_cases(limit=10_000)
|
||||||
|
proposed = 0
|
||||||
|
touched = 0
|
||||||
|
for c in cases:
|
||||||
|
ids = respond.propose_for_case(c)
|
||||||
|
if ids:
|
||||||
|
proposed += len(ids)
|
||||||
|
touched += 1
|
||||||
|
return f"proposed {proposed} action(s) for {touched} case(s)"
|
||||||
|
|
||||||
|
|
||||||
|
def _run_peer_pull() -> str:
|
||||||
|
return "federation not yet active"
|
||||||
|
|
||||||
|
|
||||||
|
def _run_vouch_refresh() -> str:
|
||||||
|
return "federation not yet active"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- registry ----------------------------------------------------------
|
||||||
|
|
||||||
|
_REGISTRY: Dict[str, Callable[[], str]] = {
|
||||||
|
"fetch": _run_fetch,
|
||||||
|
"classify": _run_classify,
|
||||||
|
"prove": _run_prove,
|
||||||
|
"reindex": _run_reindex,
|
||||||
|
"respond": _run_respond_propose,
|
||||||
|
"peer-pull": _run_peer_pull,
|
||||||
|
"vouch-refresh": _run_vouch_refresh,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# Initial defaults — seeded once on first DB init. Tuples of
|
||||||
|
# (name, title, description, mode, cadence_seconds, enabled).
|
||||||
|
_DEFAULTS: Tuple[Tuple[str, str, str, PulseMode, int, bool], ...] = (
|
||||||
|
("fetch", "Scout · fetch feeds", "Pull every configured threat feed and ingest new cases.",
|
||||||
|
PulseMode.AUTO_EXECUTE, 900, True),
|
||||||
|
("classify", "Classify · label cases", "Assign incident type, severity, TLP, and internal class to every case.",
|
||||||
|
PulseMode.AUTO_EXECUTE, 300, True),
|
||||||
|
("prove", "Proof · score confidence", "Compute confidence (reliability · credibility · freshness) for every case.",
|
||||||
|
PulseMode.AUTO_EXECUTE, 300, True),
|
||||||
|
("reindex", "Lookup · rebuild IOC index","Rebuild the IOC reverse-index over the case corpus.",
|
||||||
|
PulseMode.AUTO_EXECUTE, 3600, True),
|
||||||
|
("respond", "Respond · propose actions", "Propose human-gated response actions for newly high-severity cases.",
|
||||||
|
PulseMode.AUTO_PROPOSE, 600, True),
|
||||||
|
("peer-pull", "Federation · peer pull", "(placeholder) Pull sealed cases from federated peers.",
|
||||||
|
PulseMode.MANUAL, 600, False),
|
||||||
|
("vouch-refresh","Federation · vouch refresh","(placeholder) Refresh per-peer vouching ledgers.",
|
||||||
|
PulseMode.MANUAL, 3600, False),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- helpers -----------------------------------------------------------
|
||||||
|
|
||||||
|
def _now() -> datetime:
|
||||||
|
return datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_dt(value: Optional[str]) -> Optional[datetime]:
|
||||||
|
if not value:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return datetime.fromisoformat(value)
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _row_to_pipeline(row: dict) -> Pipeline:
|
||||||
|
return Pipeline(
|
||||||
|
name=row["name"],
|
||||||
|
title=row["title"],
|
||||||
|
description=row["description"],
|
||||||
|
mode=PulseMode(row["mode"]),
|
||||||
|
cadence_seconds=int(row["cadence_seconds"]),
|
||||||
|
enabled=bool(row["enabled"]),
|
||||||
|
last_fired=_parse_dt(row.get("last_fired")),
|
||||||
|
next_fire=_parse_dt(row.get("next_fire")),
|
||||||
|
last_result=row.get("last_result") or "",
|
||||||
|
last_outcome=row.get("last_outcome") or "",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _pipeline_to_row(p: Pipeline) -> dict:
|
||||||
|
return dict(
|
||||||
|
name=p.name,
|
||||||
|
title=p.title,
|
||||||
|
description=p.description,
|
||||||
|
mode=p.mode.value,
|
||||||
|
cadence_seconds=int(p.cadence_seconds),
|
||||||
|
enabled=bool(p.enabled),
|
||||||
|
last_fired=p.last_fired.isoformat() if p.last_fired else None,
|
||||||
|
next_fire=p.next_fire.isoformat() if p.next_fire else None,
|
||||||
|
last_result=p.last_result or "",
|
||||||
|
last_outcome=p.last_outcome or "",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def seed_defaults() -> None:
|
||||||
|
"""Insert any default pipelines that aren't already in the DB. Idempotent."""
|
||||||
|
existing = {row["name"] for row in db.get_pulse_state()}
|
||||||
|
for name, title, desc, mode, cadence, enabled in _DEFAULTS:
|
||||||
|
if name in existing:
|
||||||
|
continue
|
||||||
|
p = Pipeline(
|
||||||
|
name=name, title=title, description=desc,
|
||||||
|
mode=mode, cadence_seconds=cadence, enabled=enabled,
|
||||||
|
next_fire=_now(), # first tick after install fires immediately if due
|
||||||
|
)
|
||||||
|
db.upsert_pulse_pipeline(_pipeline_to_row(p))
|
||||||
|
_log.info("pulse.defaults.seeded", count=len(_DEFAULTS))
|
||||||
|
|
||||||
|
|
||||||
|
def state() -> List[Pipeline]:
|
||||||
|
"""Every pipeline, ordered by name. Seeds defaults on the first call."""
|
||||||
|
rows = db.get_pulse_state()
|
||||||
|
if not rows:
|
||||||
|
seed_defaults()
|
||||||
|
rows = db.get_pulse_state()
|
||||||
|
return [_row_to_pipeline(r) for r in rows]
|
||||||
|
|
||||||
|
|
||||||
|
def _get_pipeline(name: str) -> Optional[Pipeline]:
|
||||||
|
for p in state():
|
||||||
|
if p.name == name:
|
||||||
|
return p
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def set_mode(name: str, mode: PulseMode) -> None:
|
||||||
|
p = _get_pipeline(name)
|
||||||
|
if p is None:
|
||||||
|
raise ValueError(f"unknown pipeline: {name}")
|
||||||
|
p.mode = mode
|
||||||
|
db.upsert_pulse_pipeline(_pipeline_to_row(p))
|
||||||
|
|
||||||
|
|
||||||
|
def set_cadence(name: str, seconds: int) -> None:
|
||||||
|
if seconds <= 0:
|
||||||
|
raise ValueError("cadence must be > 0 seconds")
|
||||||
|
p = _get_pipeline(name)
|
||||||
|
if p is None:
|
||||||
|
raise ValueError(f"unknown pipeline: {name}")
|
||||||
|
p.cadence_seconds = int(seconds)
|
||||||
|
db.upsert_pulse_pipeline(_pipeline_to_row(p))
|
||||||
|
|
||||||
|
|
||||||
|
def set_enabled(name: str, enabled: bool) -> None:
|
||||||
|
p = _get_pipeline(name)
|
||||||
|
if p is None:
|
||||||
|
raise ValueError(f"unknown pipeline: {name}")
|
||||||
|
p.enabled = bool(enabled)
|
||||||
|
db.upsert_pulse_pipeline(_pipeline_to_row(p))
|
||||||
|
|
||||||
|
|
||||||
|
def set_kill_switch(armed: bool) -> None:
|
||||||
|
db.kill_switch_set(armed)
|
||||||
|
_log.warning("pulse.killswitch.changed", armed=bool(armed))
|
||||||
|
|
||||||
|
|
||||||
|
def kill_switch_state() -> bool:
|
||||||
|
return db.kill_switch_get()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- the heartbeat -----------------------------------------------------
|
||||||
|
|
||||||
|
def _fire(p: Pipeline) -> Tuple[str, str]:
|
||||||
|
"""Run one pipeline. Returns (outcome, result_str). Persists the timestamps.
|
||||||
|
|
||||||
|
Outcome is "ok" if the runner returned, "err" if it raised.
|
||||||
|
"""
|
||||||
|
runner = _REGISTRY.get(p.name)
|
||||||
|
if runner is None:
|
||||||
|
outcome = "err"
|
||||||
|
result = f"no runner registered for '{p.name}'"
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
result = runner() or ""
|
||||||
|
outcome = "ok"
|
||||||
|
except Exception as exc: # noqa: BLE001 — log + record, don't crash the loop
|
||||||
|
outcome = "err"
|
||||||
|
result = f"{type(exc).__name__}: {exc}"
|
||||||
|
_log.warning("pulse.fire.error", name=p.name, error=result, trace=traceback.format_exc())
|
||||||
|
|
||||||
|
now = _now()
|
||||||
|
p.last_fired = now
|
||||||
|
p.next_fire = now + timedelta(seconds=max(1, p.cadence_seconds))
|
||||||
|
p.last_result = result[:500]
|
||||||
|
p.last_outcome = outcome
|
||||||
|
db.upsert_pulse_pipeline(_pipeline_to_row(p))
|
||||||
|
_log.info("pulse.fired", name=p.name, outcome=outcome, result=p.last_result)
|
||||||
|
return outcome, p.last_result
|
||||||
|
|
||||||
|
|
||||||
|
def _should_fire(p: Pipeline, now: datetime) -> bool:
|
||||||
|
if not p.enabled:
|
||||||
|
return False
|
||||||
|
if p.mode == PulseMode.MANUAL:
|
||||||
|
return False
|
||||||
|
if p.next_fire is None:
|
||||||
|
return True
|
||||||
|
return now >= p.next_fire
|
||||||
|
|
||||||
|
|
||||||
|
def tick() -> List[Tuple[str, str, str]]:
|
||||||
|
"""Single scheduler heartbeat. Returns (name, outcome, result_str) per pipeline.
|
||||||
|
|
||||||
|
Outcome is "ok" / "err" / "skipped" — every registered pipeline appears in
|
||||||
|
the return value so callers can see what was skipped and why.
|
||||||
|
"""
|
||||||
|
if kill_switch_state():
|
||||||
|
_log.info("pulse.tick.killed")
|
||||||
|
return [(p.name, "skipped", "kill switch armed") for p in state()]
|
||||||
|
|
||||||
|
now = _now()
|
||||||
|
out: List[Tuple[str, str, str]] = []
|
||||||
|
for p in state():
|
||||||
|
if not _should_fire(p, now):
|
||||||
|
out.append((p.name, "skipped", "not due"))
|
||||||
|
continue
|
||||||
|
outcome, result = _fire(p)
|
||||||
|
out.append((p.name, outcome, result))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def run_now(name: str) -> Tuple[str, str]:
|
||||||
|
"""Manually fire one pipeline, bypassing cadence and mode. Honors kill switch.
|
||||||
|
|
||||||
|
Returns (outcome, result_str). Raises ValueError on unknown name.
|
||||||
|
"""
|
||||||
|
if kill_switch_state():
|
||||||
|
return ("skipped", "kill switch armed")
|
||||||
|
p = _get_pipeline(name)
|
||||||
|
if p is None:
|
||||||
|
raise ValueError(f"unknown pipeline: {name}")
|
||||||
|
return _fire(p)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- background loop ---------------------------------------------------
|
||||||
|
|
||||||
|
async def start_background_loop(interval_seconds: int = 30) -> None:
|
||||||
|
"""Long-running scheduler — calls tick() every interval. Launched from FastAPI lifespan.
|
||||||
|
|
||||||
|
Designed to run for the life of the process; cancellation is the normal stop signal.
|
||||||
|
"""
|
||||||
|
_log.info("pulse.loop.starting", interval=interval_seconds)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
tick()
|
||||||
|
except Exception as exc: # noqa: BLE001 — one bad tick must not kill the loop
|
||||||
|
_log.warning("pulse.loop.tick.error", error=str(exc), trace=traceback.format_exc())
|
||||||
|
try:
|
||||||
|
await asyncio.sleep(interval_seconds)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
_log.info("pulse.loop.cancelled")
|
||||||
|
raise
|
||||||
230
tests/test_federation.py
Normal file
230
tests/test_federation.py
Normal file
@@ -0,0 +1,230 @@
|
|||||||
|
"""Federation — identity, signed feed, peer registry, signal buffer."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import os
|
||||||
|
import stat
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
|
||||||
|
from psyc import db
|
||||||
|
from psyc.lines import federation
|
||||||
|
from psyc.lines.federation import (
|
||||||
|
DNSRecord,
|
||||||
|
build_signed_feed,
|
||||||
|
canonical_json,
|
||||||
|
dns_record,
|
||||||
|
import_signed_feed,
|
||||||
|
node_fingerprint,
|
||||||
|
node_keypair,
|
||||||
|
public_key_pem,
|
||||||
|
sign_payload,
|
||||||
|
verify_payload,
|
||||||
|
)
|
||||||
|
from psyc.result import Err, Ok
|
||||||
|
from conftest import make_case
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fresh_db(tmp_path, monkeypatch):
|
||||||
|
test_db = tmp_path / "test.db"
|
||||||
|
eng = create_engine(f"sqlite:///{test_db}", future=True)
|
||||||
|
db._metadata.create_all(eng, checkfirst=True)
|
||||||
|
monkeypatch.setattr(db, "_engine", eng)
|
||||||
|
monkeypatch.setattr(db, "DB_PATH", test_db)
|
||||||
|
yield test_db
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fed_dir(tmp_path, monkeypatch):
|
||||||
|
"""Redirect federation key paths to a tmp dir so each test gets a fresh key."""
|
||||||
|
d = tmp_path / "federation"
|
||||||
|
monkeypatch.setattr(federation, "FED_DIR", d)
|
||||||
|
monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
|
||||||
|
monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
|
||||||
|
yield d
|
||||||
|
|
||||||
|
|
||||||
|
def test_keypair_persisted_with_correct_perms(fed_dir):
|
||||||
|
priv, pub = node_keypair()
|
||||||
|
assert federation.PRIVATE_KEY_PATH.exists()
|
||||||
|
assert federation.PUBLIC_KEY_PATH.exists()
|
||||||
|
mode = stat.S_IMODE(os.stat(federation.PRIVATE_KEY_PATH).st_mode)
|
||||||
|
assert mode == 0o600
|
||||||
|
|
||||||
|
|
||||||
|
def test_keypair_idempotent_across_calls(fed_dir):
|
||||||
|
priv1, pub1 = node_keypair()
|
||||||
|
priv2, pub2 = node_keypair()
|
||||||
|
# raw bytes match — same key loaded twice, not regenerated
|
||||||
|
raw1 = pub1.public_bytes(
|
||||||
|
encoding=federation.serialization.Encoding.Raw,
|
||||||
|
format=federation.serialization.PublicFormat.Raw,
|
||||||
|
)
|
||||||
|
raw2 = pub2.public_bytes(
|
||||||
|
encoding=federation.serialization.Encoding.Raw,
|
||||||
|
format=federation.serialization.PublicFormat.Raw,
|
||||||
|
)
|
||||||
|
assert raw1 == raw2
|
||||||
|
|
||||||
|
|
||||||
|
def test_fingerprint_is_stable_and_32_hex(fed_dir):
|
||||||
|
fp1 = node_fingerprint()
|
||||||
|
fp2 = node_fingerprint()
|
||||||
|
assert fp1 == fp2
|
||||||
|
assert len(fp1) == 32
|
||||||
|
assert all(c in "0123456789abcdef" for c in fp1)
|
||||||
|
|
||||||
|
|
||||||
|
def test_sign_verify_roundtrip(fed_dir):
|
||||||
|
payload = b"hello federation"
|
||||||
|
sig = sign_payload(payload)
|
||||||
|
assert verify_payload(payload, sig, public_key_pem()) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_verify_with_wrong_key_returns_false(fed_dir, tmp_path):
|
||||||
|
payload = b"the truth"
|
||||||
|
sig = sign_payload(payload)
|
||||||
|
|
||||||
|
# Build a *different* keypair in a separate directory and use its pubkey.
|
||||||
|
other = tmp_path / "other-federation"
|
||||||
|
other.mkdir()
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ed25519
|
||||||
|
other_priv = ed25519.Ed25519PrivateKey.generate()
|
||||||
|
other_pub_pem = other_priv.public_key().public_bytes(
|
||||||
|
encoding=serialization.Encoding.PEM,
|
||||||
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||||
|
).decode("ascii")
|
||||||
|
|
||||||
|
assert verify_payload(payload, sig, other_pub_pem) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_verify_with_garbage_pubkey_returns_false_no_raise(fed_dir):
|
||||||
|
sig = sign_payload(b"x")
|
||||||
|
assert verify_payload(b"x", sig, "not a pem") is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_canonical_json_is_deterministic():
|
||||||
|
a = canonical_json({"b": 1, "a": 2, "nested": {"y": 1, "x": 2}})
|
||||||
|
b = canonical_json({"a": 2, "b": 1, "nested": {"x": 2, "y": 1}})
|
||||||
|
assert a == b
|
||||||
|
|
||||||
|
|
||||||
|
def test_dns_record_txt_value_matches_spec(fed_dir):
|
||||||
|
rec = dns_record("example.com")
|
||||||
|
assert isinstance(rec, DNSRecord)
|
||||||
|
fp = node_fingerprint()
|
||||||
|
assert rec.srv_name == "_psyc._tcp.example.com"
|
||||||
|
assert rec.srv_target == "example.com."
|
||||||
|
assert rec.srv_port == 443
|
||||||
|
assert rec.txt_value == f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed"
|
||||||
|
# human instructions include both record lines
|
||||||
|
assert "_psyc._tcp.example.com" in rec.human_instructions
|
||||||
|
assert "SRV" in rec.human_instructions
|
||||||
|
assert "TXT" in rec.human_instructions
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir):
|
||||||
|
case = make_case(feed="urlhaus", ips=["1.1.1.1"], urls=["http://1.1.1.1/x"])
|
||||||
|
db.upsert_case(case)
|
||||||
|
feed = build_signed_feed(window_hours=24)
|
||||||
|
assert feed["version"] == "psyc1"
|
||||||
|
assert feed["fingerprint"] == node_fingerprint()
|
||||||
|
assert feed["signature"]
|
||||||
|
# cases entry made it in
|
||||||
|
assert any(c["case_id"] == case.case_id for c in feed["cases"])
|
||||||
|
|
||||||
|
# Import using our own pubkey against a *different* declared fingerprint:
|
||||||
|
# swap the fingerprint so import_signed_feed doesn't reject as a loop.
|
||||||
|
pub = public_key_pem()
|
||||||
|
# Use a fresh keypair to act as "peer" — sign a feed with that key.
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ed25519
|
||||||
|
import hashlib
|
||||||
|
peer_priv = ed25519.Ed25519PrivateKey.generate()
|
||||||
|
peer_pub_pem = peer_priv.public_key().public_bytes(
|
||||||
|
encoding=serialization.Encoding.PEM,
|
||||||
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||||
|
).decode("ascii")
|
||||||
|
peer_raw = peer_priv.public_key().public_bytes(
|
||||||
|
encoding=serialization.Encoding.Raw,
|
||||||
|
format=serialization.PublicFormat.Raw,
|
||||||
|
)
|
||||||
|
peer_fp = hashlib.sha256(peer_raw).digest()[:16].hex()
|
||||||
|
feed["fingerprint"] = peer_fp
|
||||||
|
unsigned = {k: v for k, v in feed.items() if k != "signature"}
|
||||||
|
new_sig = peer_priv.sign(canonical_json(unsigned))
|
||||||
|
feed["signature"] = base64.b64encode(new_sig).decode("ascii")
|
||||||
|
|
||||||
|
result = import_signed_feed(feed, peer_pub_pem)
|
||||||
|
assert isinstance(result, Ok), getattr(result, "reason", "")
|
||||||
|
summary = result.value
|
||||||
|
assert summary.peer_fingerprint == peer_fp
|
||||||
|
assert summary.cases_seen >= 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_import_with_wrong_pubkey_returns_err(fresh_db, fed_dir):
|
||||||
|
db.upsert_case(make_case(feed="urlhaus", ips=["2.2.2.2"]))
|
||||||
|
feed = build_signed_feed(window_hours=24)
|
||||||
|
# build a *different* pubkey to claim verification against
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ed25519
|
||||||
|
other_pub_pem = ed25519.Ed25519PrivateKey.generate().public_key().public_bytes(
|
||||||
|
encoding=serialization.Encoding.PEM,
|
||||||
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||||
|
).decode("ascii")
|
||||||
|
# also need to change fingerprint so the loop-check doesn't trigger first
|
||||||
|
feed["fingerprint"] = "deadbeef" * 4
|
||||||
|
result = import_signed_feed(feed, other_pub_pem)
|
||||||
|
assert isinstance(result, Err)
|
||||||
|
|
||||||
|
|
||||||
|
def test_import_own_feed_returns_loop_err(fresh_db, fed_dir):
|
||||||
|
db.upsert_case(make_case(feed="urlhaus", ips=["3.3.3.3"]))
|
||||||
|
feed = build_signed_feed(window_hours=24)
|
||||||
|
result = import_signed_feed(feed, public_key_pem())
|
||||||
|
assert isinstance(result, Err)
|
||||||
|
assert "loop" in result.reason
|
||||||
|
|
||||||
|
|
||||||
|
def test_record_signal_then_lookup_by_hash(fresh_db):
|
||||||
|
rid = db.record_signal(dict(
|
||||||
|
peer_fingerprint="abc123",
|
||||||
|
signal_type="ioc",
|
||||||
|
signal_id="1.2.3.4",
|
||||||
|
signal_hash="hash-aaa",
|
||||||
|
received_at="2026-01-01T00:00:00+00:00",
|
||||||
|
raw_json="{}",
|
||||||
|
))
|
||||||
|
assert rid > 0
|
||||||
|
rows = db.signals_for_hash("hash-aaa")
|
||||||
|
assert len(rows) == 1
|
||||||
|
assert rows[0]["peer_fingerprint"] == "abc123"
|
||||||
|
# second peer reports the same hash → both surface for quorum check
|
||||||
|
db.record_signal(dict(
|
||||||
|
peer_fingerprint="def456",
|
||||||
|
signal_type="ioc",
|
||||||
|
signal_id="1.2.3.4",
|
||||||
|
signal_hash="hash-aaa",
|
||||||
|
received_at="2026-01-01T00:01:00+00:00",
|
||||||
|
raw_json="{}",
|
||||||
|
))
|
||||||
|
rows = db.signals_for_hash("hash-aaa")
|
||||||
|
assert {r["peer_fingerprint"] for r in rows} == {"abc123", "def456"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_peer_registry_crud(fresh_db, fed_dir):
|
||||||
|
federation.register_peer("peer.example", "ff" * 16, "PEM", status="trusted")
|
||||||
|
peers = federation.list_peers()
|
||||||
|
assert len(peers) == 1
|
||||||
|
assert peers[0].domain == "peer.example"
|
||||||
|
assert peers[0].status == "trusted"
|
||||||
|
|
||||||
|
federation.set_peer_status("peer.example", "blocked")
|
||||||
|
assert federation.get_peer("peer.example").status == "blocked"
|
||||||
|
|
||||||
|
federation.remove_peer("peer.example")
|
||||||
|
assert federation.list_peers() == []
|
||||||
218
tests/test_pulse.py
Normal file
218
tests/test_pulse.py
Normal file
@@ -0,0 +1,218 @@
|
|||||||
|
"""Pulse scheduler tests — cadence, kill switch, mode gating, persistence."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
|
||||||
|
from psyc import db
|
||||||
|
from psyc.lines import pulse
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fresh_db(tmp_path, monkeypatch):
|
||||||
|
"""A temp SQLite + a swapped-in registry of fast, deterministic runners.
|
||||||
|
|
||||||
|
Real runners would talk to scout / classify / proof / etc — none of which
|
||||||
|
we want to exercise from a pulse-only test. We replace _REGISTRY in-place
|
||||||
|
so tick() drives the scheduler logic against trivial counters.
|
||||||
|
"""
|
||||||
|
test_db = tmp_path / "pulse.db"
|
||||||
|
test_engine = create_engine(f"sqlite:///{test_db}", future=True)
|
||||||
|
db._metadata.create_all(test_engine, checkfirst=True)
|
||||||
|
monkeypatch.setattr(db, "_engine", test_engine)
|
||||||
|
monkeypatch.setattr(db, "DB_PATH", test_db)
|
||||||
|
|
||||||
|
counters = {name: 0 for name in pulse._REGISTRY}
|
||||||
|
|
||||||
|
def _make(name: str):
|
||||||
|
def runner() -> str:
|
||||||
|
counters[name] += 1
|
||||||
|
return f"{name}: tick {counters[name]}"
|
||||||
|
return runner
|
||||||
|
|
||||||
|
fake_registry = {name: _make(name) for name in pulse._REGISTRY}
|
||||||
|
monkeypatch.setattr(pulse, "_REGISTRY", fake_registry)
|
||||||
|
return counters
|
||||||
|
|
||||||
|
|
||||||
|
def _set_last_fired(name: str, when: datetime, cadence: int) -> None:
|
||||||
|
"""Force a pipeline's last_fired + next_fire to a known instant."""
|
||||||
|
for p in pulse.state():
|
||||||
|
if p.name == name:
|
||||||
|
p.last_fired = when
|
||||||
|
p.next_fire = when + timedelta(seconds=cadence)
|
||||||
|
db.upsert_pulse_pipeline(pulse._pipeline_to_row(p))
|
||||||
|
return
|
||||||
|
raise AssertionError(f"unknown pipeline {name}")
|
||||||
|
|
||||||
|
|
||||||
|
def test_defaults_seeded_on_first_state(fresh_db):
|
||||||
|
pipelines = pulse.state()
|
||||||
|
names = {p.name for p in pipelines}
|
||||||
|
assert names == {"fetch", "classify", "prove", "reindex", "respond", "peer-pull", "vouch-refresh"}
|
||||||
|
# Spot-check a couple of defaults from the spec.
|
||||||
|
by_name = {p.name: p for p in pipelines}
|
||||||
|
assert by_name["fetch"].mode == pulse.PulseMode.AUTO_EXECUTE
|
||||||
|
assert by_name["fetch"].cadence_seconds == 900
|
||||||
|
assert by_name["peer-pull"].enabled is False
|
||||||
|
assert by_name["respond"].mode == pulse.PulseMode.AUTO_PROPOSE
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_persists_across_reads(fresh_db):
|
||||||
|
pulse.set_cadence("fetch", 42)
|
||||||
|
pulse.set_mode("classify", pulse.PulseMode.MANUAL)
|
||||||
|
pulse.set_enabled("reindex", False)
|
||||||
|
by_name = {p.name: p for p in pulse.state()}
|
||||||
|
assert by_name["fetch"].cadence_seconds == 42
|
||||||
|
assert by_name["classify"].mode == pulse.PulseMode.MANUAL
|
||||||
|
assert by_name["reindex"].enabled is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_tick_skips_not_due_pipelines(fresh_db):
|
||||||
|
# Push every pipeline well into the future — nothing should fire.
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
for p in pulse.state():
|
||||||
|
_set_last_fired(p.name, now, p.cadence_seconds)
|
||||||
|
results = pulse.tick()
|
||||||
|
outcomes = {name: outcome for name, outcome, _ in results}
|
||||||
|
assert all(o == "skipped" for o in outcomes.values()), outcomes
|
||||||
|
assert all(c == 0 for c in fresh_db.values()), fresh_db
|
||||||
|
|
||||||
|
|
||||||
|
def test_tick_fires_due_auto_pipelines(fresh_db):
|
||||||
|
# Force "fetch" to be due (last fired far in the past) and re-tick.
|
||||||
|
past = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||||
|
for p in pulse.state():
|
||||||
|
_set_last_fired(p.name, past, p.cadence_seconds)
|
||||||
|
results = pulse.tick()
|
||||||
|
by_name = {name: (outcome, result) for name, outcome, result in results}
|
||||||
|
# auto-execute + auto-propose modes should fire; manual should skip.
|
||||||
|
assert by_name["fetch"][0] == "ok"
|
||||||
|
assert by_name["classify"][0] == "ok"
|
||||||
|
assert by_name["prove"][0] == "ok"
|
||||||
|
assert by_name["reindex"][0] == "ok"
|
||||||
|
assert by_name["respond"][0] == "ok"
|
||||||
|
# Manual + disabled pipelines stay skipped.
|
||||||
|
assert by_name["peer-pull"][0] == "skipped"
|
||||||
|
assert by_name["vouch-refresh"][0] == "skipped"
|
||||||
|
|
||||||
|
|
||||||
|
def test_tick_respects_manual_mode(fresh_db):
|
||||||
|
past = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||||
|
_set_last_fired("fetch", past, 900)
|
||||||
|
pulse.set_mode("fetch", pulse.PulseMode.MANUAL)
|
||||||
|
results = pulse.tick()
|
||||||
|
by_name = {n: o for n, o, _ in results}
|
||||||
|
assert by_name["fetch"] == "skipped"
|
||||||
|
assert fresh_db["fetch"] == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_tick_respects_disabled(fresh_db):
|
||||||
|
past = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||||
|
_set_last_fired("fetch", past, 900)
|
||||||
|
pulse.set_enabled("fetch", False)
|
||||||
|
results = pulse.tick()
|
||||||
|
by_name = {n: o for n, o, _ in results}
|
||||||
|
assert by_name["fetch"] == "skipped"
|
||||||
|
assert fresh_db["fetch"] == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_kill_switch_halts_everything(fresh_db):
|
||||||
|
past = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||||
|
for p in pulse.state():
|
||||||
|
_set_last_fired(p.name, past, p.cadence_seconds)
|
||||||
|
pulse.set_kill_switch(True)
|
||||||
|
results = pulse.tick()
|
||||||
|
assert all(o == "skipped" for _, o, _ in results)
|
||||||
|
assert all(c == 0 for c in fresh_db.values())
|
||||||
|
# And killswitch state itself round-trips.
|
||||||
|
assert pulse.kill_switch_state() is True
|
||||||
|
pulse.set_kill_switch(False)
|
||||||
|
assert pulse.kill_switch_state() is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_tick_updates_last_fired_and_next_fire(fresh_db):
|
||||||
|
past = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||||
|
_set_last_fired("fetch", past, 900)
|
||||||
|
before = datetime.now(timezone.utc)
|
||||||
|
pulse.tick()
|
||||||
|
p = {x.name: x for x in pulse.state()}["fetch"]
|
||||||
|
assert p.last_fired is not None
|
||||||
|
assert p.last_fired >= before - timedelta(seconds=2)
|
||||||
|
assert p.next_fire is not None
|
||||||
|
assert p.next_fire > p.last_fired
|
||||||
|
delta = (p.next_fire - p.last_fired).total_seconds()
|
||||||
|
# 900s cadence, allow a 2s rounding window.
|
||||||
|
assert 898 <= delta <= 902
|
||||||
|
assert p.last_outcome == "ok"
|
||||||
|
assert "fetch: tick" in p.last_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_now_bypasses_cadence(fresh_db):
|
||||||
|
# Pipeline isn't due — run_now must still fire.
|
||||||
|
future = datetime.now(timezone.utc) + timedelta(hours=1)
|
||||||
|
for p in pulse.state():
|
||||||
|
p.last_fired = datetime.now(timezone.utc)
|
||||||
|
p.next_fire = future
|
||||||
|
db.upsert_pulse_pipeline(pulse._pipeline_to_row(p))
|
||||||
|
outcome, result = pulse.run_now("fetch")
|
||||||
|
assert outcome == "ok"
|
||||||
|
assert "fetch: tick" in result
|
||||||
|
assert fresh_db["fetch"] == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_now_respects_kill_switch(fresh_db):
|
||||||
|
pulse.set_kill_switch(True)
|
||||||
|
outcome, result = pulse.run_now("fetch")
|
||||||
|
assert outcome == "skipped"
|
||||||
|
assert "kill switch" in result
|
||||||
|
assert fresh_db["fetch"] == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_now_even_when_manual(fresh_db):
|
||||||
|
"""Manual mode blocks tick() but NOT run_now — that's the whole point of manual."""
|
||||||
|
pulse.set_mode("peer-pull", pulse.PulseMode.MANUAL)
|
||||||
|
pulse.set_enabled("peer-pull", True)
|
||||||
|
outcome, _ = pulse.run_now("peer-pull")
|
||||||
|
assert outcome == "ok"
|
||||||
|
assert fresh_db["peer-pull"] == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_runner_exception_recorded_as_err(fresh_db, monkeypatch):
|
||||||
|
def boom() -> str:
|
||||||
|
raise RuntimeError("nope")
|
||||||
|
# Inject a failing runner just for "fetch".
|
||||||
|
bad_registry = dict(pulse._REGISTRY)
|
||||||
|
bad_registry["fetch"] = boom
|
||||||
|
monkeypatch.setattr(pulse, "_REGISTRY", bad_registry)
|
||||||
|
|
||||||
|
past = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||||
|
_set_last_fired("fetch", past, 900)
|
||||||
|
results = pulse.tick()
|
||||||
|
by_name = {n: (o, r) for n, o, r in results}
|
||||||
|
assert by_name["fetch"][0] == "err"
|
||||||
|
assert "nope" in by_name["fetch"][1]
|
||||||
|
# And the failure is persisted, with next_fire still pushed forward so we
|
||||||
|
# don't busy-retry a broken runner every tick.
|
||||||
|
p = {x.name: x for x in pulse.state()}["fetch"]
|
||||||
|
assert p.last_outcome == "err"
|
||||||
|
assert p.next_fire is not None and p.next_fire > p.last_fired
|
||||||
|
|
||||||
|
|
||||||
|
def test_unknown_pipeline_raises(fresh_db):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
pulse.set_mode("does-not-exist", pulse.PulseMode.MANUAL)
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
pulse.set_cadence("does-not-exist", 60)
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
pulse.run_now("does-not-exist")
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalid_cadence_raises(fresh_db):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
pulse.set_cadence("fetch", 0)
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
pulse.set_cadence("fetch", -5)
|
||||||
Reference in New Issue
Block a user