Compare commits

..

15 Commits

Author SHA1 Message Date
m17hr1l
1675a2326e stage-33 wire pulse + federation: cockpit routes, CLI, nav links, SW bump 2026-06-06 16:15:48 +02:00
m17hr1l
de5ff09815 merge federation: ed25519 identity + signed feeds
# Conflicts:
#	src/psyc/db.py
2026-06-06 16:13:36 +02:00
m17hr1l
02ce6d791c merge pulse: scheduler line + autonomy dial 2026-06-06 16:11:17 +02:00
m17hr1l
d4229dd264 stage-fed-g federation: tests 2026-06-06 16:10:31 +02:00
m17hr1l
2ef0448165 stage-fed-f federation: CLI commands 2026-06-06 16:10:26 +02:00
m17hr1l
17b94acf6b stage-fed-e federation: cockpit admin page + public feed routes 2026-06-06 16:10:19 +02:00
m17hr1l
55ffd9da3d stage-fed-d federation: signed feed export + verified import 2026-06-06 16:09:53 +02:00
m17hr1l
63e3ff2777 stage-fed-c federation: db tables for peers + signal buffer 2026-06-06 16:08:36 +02:00
m17hr1l
50158f7fa8 stage-fed-b federation: dns record format 2026-06-06 16:08:31 +02:00
m17hr1l
4c35aad2bb stage-fed-a federation: ed25519 keypair + fingerprint 2026-06-06 16:08:03 +02:00
m17hr1l
a7c59c9faa stage-33e pulse: tests 2026-06-06 16:06:54 +02:00
m17hr1l
e071f289f2 stage-33d pulse: CLI commands 2026-06-06 16:05:14 +02:00
m17hr1l
26fbe08b65 stage-33c pulse: admin cockpit page 2026-06-06 16:04:39 +02:00
m17hr1l
4d67605371 stage-33b pulse: db tables + persistence 2026-06-06 16:03:30 +02:00
m17hr1l
e710be6ebd stage-33a pulse: scheduler module with pipeline registry 2026-06-06 16:03:22 +02:00
16 changed files with 2135 additions and 2 deletions

View File

@@ -16,6 +16,7 @@ dependencies = [
"httpx>=0.27",
"typer>=0.12",
"pynacl>=1.5",
"cryptography>=42.0",
"structlog>=24.1",
"sqlalchemy>=2.0",
"python-dotenv>=1.0",

135
src/psyc/_federation_cli.py Normal file
View File

@@ -0,0 +1,135 @@
"""Federation CLI — keygen, DNS records, feed export, peer registry, verify.
Registered onto the top-level Typer app from cli.py so the surface stays flat.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Optional
import httpx
import typer
from psyc import db, log
from psyc.lines import federation
from psyc.result import Err
_log = log.get(__name__)
def register(typer_app: typer.Typer) -> None:
"""Mount all `fed-*` commands onto `typer_app`."""
@typer_app.command("fed-keygen")
def fed_keygen() -> None:
"""Generate the node's Ed25519 keypair (or load existing). Prints fingerprint."""
federation.node_keypair() # creates the files if missing
typer.echo(federation.node_fingerprint())
@typer_app.command("fed-dns")
def fed_dns(
domain: str = typer.Argument(..., help="public domain to advertise this node on"),
port: int = typer.Option(443, "--port", help="port psyc is reachable on"),
) -> None:
"""Print the DNS SRV + TXT records to publish under `domain`."""
rec = federation.dns_record(domain, port=port)
typer.echo(rec.human_instructions)
@typer_app.command("fed-feed")
def fed_feed(
window_hours: int = typer.Option(24, "--hours", help="lookback window (hours)"),
) -> None:
"""Build + print the signed feed JSON."""
db.init_db()
payload = federation.build_signed_feed(window_hours=window_hours)
typer.echo(json.dumps(payload, indent=2))
@typer_app.command("fed-verify")
def fed_verify(
peer_url: str = typer.Argument(..., help="peer base URL, e.g. https://peer.example"),
) -> None:
"""Fetch a peer's /federation/{info,key,feed} and verify the signature."""
peer_url = peer_url.rstrip("/")
try:
with httpx.Client(timeout=10.0) as client:
info = client.get(f"{peer_url}/federation/info").json()
key_text = client.get(f"{peer_url}/federation/key").text
feed = client.get(f"{peer_url}/federation/feed").json()
except Exception as exc:
typer.echo(f"error: fetch failed: {exc}", err=True)
raise typer.Exit(1)
# If the peer is already in the registry, prefer the stored pubkey
# (TOFU pin); otherwise warn and use the freshly fetched one.
declared_fp = info.get("fingerprint", "")
pubkey_pem = key_text
pinned = None
for p in federation.list_peers():
if p.fingerprint == declared_fp:
pinned = p
break
if pinned:
pubkey_pem = pinned.pubkey_pem
typer.echo(f" · using pinned pubkey for {pinned.domain}")
else:
typer.echo(" · WARNING: no pinned pubkey for this peer — trusting fetched key (TOFU)")
db.init_db()
result = federation.import_signed_feed(feed, pubkey_pem)
if isinstance(result, Err):
typer.echo(f" ✗ verification failed: {result.reason}", err=True)
raise typer.Exit(1)
s = result.value
typer.echo(f" ✓ verified peer {s.peer_fingerprint}")
typer.echo(f" cases: {s.cases_seen} iocs: {s.iocs_seen} signals buffered: {len(s.signal_ids)}")
@typer_app.command("fed-peer-add")
def fed_peer_add(
domain: str = typer.Argument(..., help="peer's public domain"),
fingerprint: str = typer.Argument(..., help="peer's 32-hex fingerprint"),
pubkey_file: Path = typer.Option(..., "--pubkey-file", help="path to peer's PEM public key"),
status: str = typer.Option("unknown", "--status", help="unknown | trusted | blocked"),
) -> None:
"""Register a peer's identity in the local registry."""
db.init_db()
pem = pubkey_file.read_text(encoding="utf-8")
federation.register_peer(domain, fingerprint, pem, status=status)
typer.echo(f"registered peer {domain} ({fingerprint[:8]}…) status={status}")
@typer_app.command("fed-peer-list")
def fed_peer_list() -> None:
"""List all registered peers."""
db.init_db()
rows = federation.list_peers()
if not rows:
typer.echo("(no peers registered)")
return
for p in rows:
typer.echo(
f" {p.status:8s} {p.domain:30s} {p.fingerprint[:8]}{p.fingerprint[-8:]}"
f" last_seen={(p.last_seen or '')[:16]}"
)
@typer_app.command("fed-peer-trust")
def fed_peer_trust(domain: str = typer.Argument(...)) -> None:
"""Mark a peer as trusted — their signals count toward quorum."""
db.init_db()
federation.set_peer_status(domain, "trusted")
typer.echo(f"{domain} → trusted")
@typer_app.command("fed-peer-block")
def fed_peer_block(domain: str = typer.Argument(...)) -> None:
"""Block a peer — ignore their feeds."""
db.init_db()
federation.set_peer_status(domain, "blocked")
typer.echo(f"{domain} → blocked")
@typer_app.command("fed-peer-remove")
def fed_peer_remove(domain: str = typer.Argument(...)) -> None:
"""Drop a peer from the registry."""
db.init_db()
federation.remove_peer(domain)
typer.echo(f"removed {domain}")

122
src/psyc/_pulse_cli.py Normal file
View File

@@ -0,0 +1,122 @@
"""Typer commands for the Pulse scheduler.
Imported and wired by `cli.py` via `register(app)`. Kept as a separate module
so the main CLI surface stays grep-friendly and the scheduler can grow its own
verbs without bloating cli.py.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
import typer
from psyc import db
from psyc.lines import pulse
def _relative(dt: Optional[datetime]) -> str:
if dt is None:
return ""
now = datetime.now(timezone.utc)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
delta = int((dt - now).total_seconds())
past = delta < 0
secs = abs(delta)
if secs < 5:
return "now"
if secs < 60:
unit = f"{secs}s"
elif secs < 3600:
unit = f"{secs // 60}m"
elif secs < 86400:
unit = f"{secs // 3600}h"
else:
unit = f"{secs // 86400}d"
return f"{unit} ago" if past else f"in {unit}"
def register(typer_app: typer.Typer) -> None:
"""Add pulse-* commands to the given Typer app."""
@typer_app.command("pulse-status")
def pulse_status() -> None:
"""Print the pipeline table (mode · cadence · last-fired · next-fire · last-result)."""
db.init_db()
ks = pulse.kill_switch_state()
typer.echo(f"kill switch: {'ARMED' if ks else 'OFF'}")
rows = pulse.state()
if not rows:
typer.echo("(no pipelines registered)")
return
typer.echo(f"{'name':<16} {'mode':<14} {'cadence':>8} {'enabled':<7} {'last':<10} {'next':<10} result")
for p in rows:
typer.echo(
f"{p.name:<16} {p.mode.value:<14} {p.cadence_seconds:>6}s "
f"{'yes' if p.enabled else 'no':<7} "
f"{_relative(p.last_fired):<10} {_relative(p.next_fire):<10} "
f"{(p.last_result or '')[:60]}"
)
@typer_app.command("pulse-tick")
def pulse_tick() -> None:
"""Run one scheduler heartbeat and print the per-pipeline outcomes."""
db.init_db()
results = pulse.tick()
for name, outcome, result in results:
marker = {"ok": "", "err": "", "skipped": ""}.get(outcome, "·")
typer.echo(f" {marker} {name:<16} {outcome:<8} {result[:120]}")
@typer_app.command("pulse-set-mode")
def pulse_set_mode(
name: str = typer.Argument(..., help="pipeline name"),
mode: str = typer.Argument(..., help="manual | auto-propose | auto-execute"),
) -> None:
db.init_db()
try:
pulse.set_mode(name, pulse.PulseMode(mode))
except ValueError as exc:
typer.echo(f"error: {exc}", err=True)
raise typer.Exit(1)
typer.echo(f"{name} mode → {mode}")
@typer_app.command("pulse-set-cadence")
def pulse_set_cadence(
name: str = typer.Argument(..., help="pipeline name"),
seconds: int = typer.Argument(..., help="cadence in seconds (>0)"),
) -> None:
db.init_db()
try:
pulse.set_cadence(name, seconds)
except ValueError as exc:
typer.echo(f"error: {exc}", err=True)
raise typer.Exit(1)
typer.echo(f"{name} cadence → {seconds}s")
@typer_app.command("pulse-run")
def pulse_run(name: str = typer.Argument(..., help="pipeline name")) -> None:
"""Manually fire one pipeline (bypasses cadence; honors kill switch)."""
db.init_db()
try:
outcome, result = pulse.run_now(name)
except ValueError as exc:
typer.echo(f"error: {exc}", err=True)
raise typer.Exit(1)
marker = {"ok": "", "err": "", "skipped": ""}.get(outcome, "·")
typer.echo(f" {marker} {name}: {outcome}{result}")
@typer_app.command("pulse-kill")
def pulse_kill() -> None:
"""Arm the kill switch — every pipeline halts on the next tick."""
db.init_db()
pulse.set_kill_switch(True)
typer.echo("kill switch ARMED — all pipelines halted")
@typer_app.command("pulse-unkill")
def pulse_unkill() -> None:
"""Disarm the kill switch — pulse resumes on the next tick."""
db.init_db()
pulse.set_kill_switch(False)
typer.echo("kill switch disarmed — pulse resumes")

View File

@@ -17,11 +17,15 @@ from psyc.lines import classify, courier, lookup, proof, respond, route, scout,
from psyc.lines import map as map_line
from psyc.models import Outcome
from psyc.result import Err, Ok
from psyc._federation_cli import register as _register_federation_cli
from psyc._pulse_cli import register as _register_pulse_cli
app = typer.Typer(add_completion=False, help="psyc — defensive CTI routing & sealing")
log.configure()
_log = log.get(__name__)
_register_pulse_cli(app)
_register_federation_cli(app)
@app.command("init")

View File

@@ -12,7 +12,7 @@ from fastapi.templating import Jinja2Templates
from starlette.middleware.sessions import SessionMiddleware
from psyc import db, log
from psyc.cockpit import adminauth, case_visuals, docker_view, inference, journey as journey_view
from psyc.cockpit import adminauth, case_visuals, docker_view, federation_routes, inference, journey as journey_view, pulse_routes
from psyc.lines import courier as courier_line
from psyc.lines import ledger as ledger_line
from psyc.lines import lookup as lookup_line
@@ -35,6 +35,9 @@ app = FastAPI(title="psyc Operations Cockpit", version="0.1.0")
app.add_middleware(SessionMiddleware, secret_key=adminauth.session_secret(), max_age=3600)
app.mount("/static", StaticFiles(directory=str(HERE / "static")), name="static")
pulse_routes.register(app, TEMPLATES)
federation_routes.register(app, TEMPLATES)
def _admin_ok(request: Request) -> bool:
return bool(request.session.get("admin_ok"))

View File

@@ -0,0 +1,125 @@
"""Federation cockpit routes — admin page, public feed/key/info endpoints.
Wired into the FastAPI app by app.py via a single `register(app, TEMPLATES)`
call so the federation surface stays self-contained.
"""
from __future__ import annotations
import json
import time
from typing import Any, Dict, Optional, Tuple
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from psyc import db, log
from psyc.lines import federation
_log = log.get(__name__)
# Tiny in-memory cache for the signed feed — peers may poll, recomputing
# canonical JSON + signature on every hit would be wasteful.
_FEED_CACHE: Dict[str, Any] = {"ts": 0.0, "payload": None}
_FEED_TTL = 60.0
def _admin_ok(request: Request) -> bool:
return bool(request.session.get("admin_ok"))
def _cached_feed() -> Dict[str, Any]:
now = time.time()
if _FEED_CACHE["payload"] is None or (now - _FEED_CACHE["ts"]) > _FEED_TTL:
_FEED_CACHE["payload"] = federation.build_signed_feed()
_FEED_CACHE["ts"] = now
return _FEED_CACHE["payload"]
def register(app: FastAPI, TEMPLATES: Jinja2Templates) -> None:
"""Mount all federation routes onto `app`."""
@app.get("/admin/federation", response_class=HTMLResponse)
def admin_federation(request: Request) -> HTMLResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
host = request.url.hostname or "your-node.example"
suggested = request.query_params.get("domain", host)
rec = federation.dns_record(suggested)
peers = federation.list_peers()
signals = db.recent_signals(limit=20)
return TEMPLATES.TemplateResponse(
request,
"admin_federation.html",
{
"fingerprint": federation.node_fingerprint(),
"pubkey_pem": federation.public_key_pem(),
"suggested_domain": suggested,
"dns": rec,
"peers": peers,
"signals": signals,
},
)
@app.post("/admin/federation/peers/add")
def admin_federation_add_peer(
request: Request,
domain: str = Form(...),
fingerprint: str = Form(...),
pubkey_pem: str = Form(...),
status: str = Form("unknown"),
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
try:
federation.register_peer(domain.strip(), fingerprint.strip(), pubkey_pem.strip(), status=status)
except Exception as exc:
_log.warning("federation.peer.add.error", domain=domain, error=str(exc))
return RedirectResponse("/admin/federation", status_code=303)
@app.post("/admin/federation/peers/{domain}/status")
def admin_federation_set_status(
request: Request,
domain: str,
status: str = Form(...),
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
try:
federation.set_peer_status(domain, status)
except ValueError as exc:
_log.warning("federation.peer.status.bad", domain=domain, status=status, error=str(exc))
return RedirectResponse("/admin/federation", status_code=303)
@app.post("/admin/federation/peers/{domain}/remove")
def admin_federation_remove(
request: Request,
domain: str,
) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
federation.remove_peer(domain)
return RedirectResponse("/admin/federation", status_code=303)
# ---------- public endpoints --------------------------------------
@app.get("/federation/info")
def federation_info() -> JSONResponse:
return JSONResponse({
"fingerprint": federation.node_fingerprint(),
"version": federation.FEED_VERSION,
"feed": federation.FEED_PATH,
"key": "/federation/key",
})
@app.get("/federation/key", response_class=PlainTextResponse)
def federation_key() -> PlainTextResponse:
return PlainTextResponse(federation.public_key_pem(), media_type="text/plain")
@app.get("/federation/feed")
def federation_feed() -> JSONResponse:
return JSONResponse(_cached_feed())
_log.info("federation.routes.registered")

View File

@@ -0,0 +1,120 @@
"""Cockpit routes for the Pulse scheduler — admin-gated.
The integration is intentionally single-call: `register(app, TEMPLATES)` adds
the routes AND wires the FastAPI startup hook that launches the background
scheduler loop. Caller in app.py just imports + invokes register().
"""
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from typing import Optional
from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from psyc import log
from psyc.lines import pulse
_log = log.get(__name__)
TICK_INTERVAL_SECONDS = 30
def _admin_ok(request: Request) -> bool:
"""Mirror of the local helper in app.py — admin session is just session['admin_ok']."""
return bool(request.session.get("admin_ok"))
def _relative(dt: Optional[datetime]) -> str:
"""Human-friendly "3m ago" / "in 12m" / "now". None → ''."""
if dt is None:
return ""
now = datetime.now(timezone.utc)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
delta = (dt - now).total_seconds()
past = delta < 0
secs = abs(int(delta))
if secs < 5:
return "now"
if secs < 60:
unit = f"{secs}s"
elif secs < 3600:
unit = f"{secs // 60}m"
elif secs < 86400:
unit = f"{secs // 3600}h"
else:
unit = f"{secs // 86400}d"
return f"{unit} ago" if past else f"in {unit}"
def register(app: FastAPI, templates: Jinja2Templates) -> None:
"""Attach the /admin/pulse routes and the background scheduler loop to `app`."""
@app.get("/admin/pulse", response_class=HTMLResponse)
def pulse_view(request: Request) -> HTMLResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
flash = request.query_params.get("flash", "")
pipelines = pulse.state()
return templates.TemplateResponse(
request,
"admin_pulse.html",
{
"pipelines": pipelines,
"kill_switch": pulse.kill_switch_state(),
"tick_interval": TICK_INTERVAL_SECONDS,
"relative": _relative,
"flash": flash,
},
)
@app.post("/admin/pulse/kill")
def pulse_toggle_kill(request: Request) -> RedirectResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
new = not pulse.kill_switch_state()
pulse.set_kill_switch(new)
flash = "kill switch ARMED — all pipelines halted" if new else "kill switch disarmed — pulse resumes"
return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303)
@app.post("/admin/pulse/{name}/update")
def pulse_update(
request: Request,
name: str,
mode: str = Form(...),
cadence_seconds: int = Form(...),
enabled: Optional[str] = Form(None),
) -> RedirectResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
try:
pulse.set_mode(name, pulse.PulseMode(mode))
pulse.set_cadence(name, int(cadence_seconds))
pulse.set_enabled(name, enabled is not None)
flash = f"updated {name}: mode={mode}, cadence={cadence_seconds}s, enabled={enabled is not None}"
except (ValueError, KeyError) as exc:
_log.warning("pulse.update.error", name=name, error=str(exc))
flash = f"update failed: {exc}"
return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303)
@app.post("/admin/pulse/{name}/run")
def pulse_run_now(request: Request, name: str) -> RedirectResponse:
if not _admin_ok(request):
return RedirectResponse("/admin", status_code=303)
try:
outcome, result = pulse.run_now(name)
flash = f"{name}{outcome}: {result[:120]}"
except ValueError as exc:
flash = f"run failed: {exc}"
return RedirectResponse(f"/admin/pulse?flash={flash}", status_code=303)
@app.on_event("startup")
async def _start_pulse_loop() -> None:
# Fire-and-forget; the loop catches its own exceptions and self-restarts.
asyncio.create_task(pulse.start_background_loop(interval_seconds=TICK_INTERVAL_SECONDS))
_log.info("pulse.routes.registered", tick=TICK_INTERVAL_SECONDS)

View File

@@ -5,7 +5,7 @@
// This makes the cockpit installable as a PWA and survives flaky connections,
// without serving stale operational data behind the operator's back.
const CACHE_VERSION = "psyc-v2";
const CACHE_VERSION = "psyc-v3";
const STATIC_ASSETS = [
"/static/cockpit.css",
"/static/psyc-tokens.css",

View File

@@ -0,0 +1,132 @@
{% extends "base.html" %}
{% block title %}Federation — psyc admin{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Federation Identity</h1>
<span class="count">{{ peers|length }} peer{{ '' if peers|length == 1 else 's' }}</span>
</div>
<p class="page-intro">This node's Ed25519 identity. The fingerprint goes into a DNS TXT record so other psyc nodes can discover this one. The public key lets them verify any feed we publish — the private key never leaves this box.</p>
<p class="back"><a href="/admin">← back to admin</a></p>
<div class="card" style="margin-bottom:14px;">
<div class="lg-sub">node fingerprint</div>
<div style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:18px; word-break:break-all; margin:6px 0 12px; color:var(--accent); text-shadow:0 0 12px var(--accent-glow);">{{ fingerprint }}</div>
<details>
<summary class="lg-sub" style="cursor:pointer;">public key (PEM)</summary>
<pre style="background:var(--panel-2); border:1px solid var(--border); border-radius:6px; padding:10px; margin-top:8px; overflow-x:auto; font-size:11.5px;">{{ pubkey_pem }}</pre>
</details>
</div>
</section>
<section class="panel">
<div class="panel-head">
<h2>Publish via DNS</h2>
<span class="count">SRV + TXT records</span>
</div>
<p class="page-intro">Paste these into your zone file. Once they're live, any peer that knows your domain can discover the node and pin the right key without out-of-band coordination.</p>
<form method="get" action="/admin/federation" class="lookup-form" style="margin-bottom:12px;">
<input type="text" name="domain" value="{{ suggested_domain }}" class="lookup-input" placeholder="domain to publish on (e.g. psyc.example.com)">
<button type="submit" class="btn btn-enforce">regenerate</button>
</form>
<pre style="background:var(--panel-2); border:1px solid var(--border); border-radius:6px; padding:12px; overflow-x:auto; font-size:12px; line-height:1.5;">{{ dns.human_instructions }}</pre>
</section>
<section class="panel">
<div class="panel-head">
<h2>Known Peers</h2>
<span class="count">{{ peers|length }} registered</span>
</div>
<p class="page-intro">Trusted peers' feeds are signature-verified on every poll. Blocked peers are recorded but ignored. Unknown peers are kept for review — nothing flows from them until you set them trusted.</p>
{% if peers %}
<table class="ledger">
<thead><tr><th>Domain</th><th>Fingerprint</th><th>Status</th><th>Discovered</th><th>Last seen</th><th></th></tr></thead>
<tbody>
{% for p in peers %}
<tr class="ledger-row">
<td><strong>{{ p.domain }}</strong></td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">{{ p.fingerprint[:8] }}…{{ p.fingerprint[-8:] }}</td>
<td>
{% if p.status == 'trusted' %}
<span class="sev-badge" style="background:rgba(74,222,128,0.10); color:var(--green); border-color:var(--green);">trusted</span>
{% elif p.status == 'blocked' %}
<span class="sev-badge" style="background:rgba(248,113,113,0.10); color:var(--red); border-color:var(--red);">blocked</span>
{% else %}
<span class="sev-badge">unknown</span>
{% endif %}
</td>
<td class="lg-ts">{{ (p.discovered_at or '')[:16] | replace('T', ' ') }}</td>
<td class="lg-ts">{{ ((p.last_seen or '')[:16] | replace('T', ' ')) or '—' }}</td>
<td>
<form method="post" action="/admin/federation/peers/{{ p.domain }}/status" class="queue-action">
<input type="hidden" name="status" value="trusted">
<button type="submit" class="btn btn-enforce" {% if p.status == 'trusted' %}disabled{% endif %}>trust</button>
</form>
<form method="post" action="/admin/federation/peers/{{ p.domain }}/status" class="queue-action">
<input type="hidden" name="status" value="blocked">
<button type="submit" class="btn btn-reject" {% if p.status == 'blocked' %}disabled{% endif %}>block</button>
</form>
<form method="post" action="/admin/federation/peers/{{ p.domain }}/remove" class="queue-action"
onsubmit="return confirm('Remove {{ p.domain }}? Their signals will no longer count toward quorum.');">
<button type="submit" class="btn">remove</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no peers yet — add one below)</p>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Add Peer</h2>
</div>
<p class="page-intro">Pin a peer's identity manually: their domain, their fingerprint (from their DNS TXT record), and the public key they publish at <code>/federation/key</code>.</p>
<form method="post" action="/admin/federation/peers/add" style="display:grid; gap:10px; max-width:680px;">
<input type="text" name="domain" placeholder="peer domain (e.g. peer.example.com)" class="lookup-input" required>
<input type="text" name="fingerprint" placeholder="fingerprint (32 hex chars)" class="lookup-input" maxlength="64" required style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">
<textarea name="pubkey_pem" placeholder="-----BEGIN PUBLIC KEY-----&#10;…&#10;-----END PUBLIC KEY-----" rows="6" class="lookup-input" required style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;"></textarea>
<select name="status" class="lookup-input">
<option value="unknown">unknown — record only, don't trust yet</option>
<option value="trusted">trusted — count toward quorum</option>
<option value="blocked">blocked — ignore</option>
</select>
<button type="submit" class="btn btn-enforce">+ register peer</button>
</form>
</section>
<section class="panel">
<div class="panel-head">
<h2>Recent Signals</h2>
<span class="count">last {{ signals|length }} of buffer</span>
</div>
<p class="page-intro">Verified federation signals from peers — case + IOC reports awaiting quorum. The signal buffer is what later quorum logic will count over.</p>
{% if signals %}
<table class="ledger">
<thead><tr><th>Received</th><th>Peer</th><th>Type</th><th>Id</th><th>Hash</th></tr></thead>
<tbody>
{% for s in signals %}
<tr class="ledger-row">
<td class="lg-ts">{{ (s.received_at or '')[:19] | replace('T', ' ') }}</td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace;">{{ s.peer_fingerprint[:8] }}…</td>
<td><span class="sev-badge">{{ s.signal_type }}</span></td>
<td style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ s.signal_id[:48] }}</td>
<td class="lg-sub" style="font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:11.5px;">{{ s.signal_hash[:16] }}…</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="page-intro">(no signals received yet — quorum stage will populate this)</p>
{% endif %}
</section>
{% endblock %}

View File

@@ -0,0 +1,101 @@
{% extends "base.html" %}
{% block title %}Pulse — psyc admin{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Pulse — autonomous heartbeat</h1>
<span class="count">{{ pipelines|length }} pipeline{{ '' if pipelines|length == 1 else 's' }}</span>
</div>
<p class="page-intro">Cron-style scheduler that drives every psyc line on a cadence without human input. Each pipeline has an autonomy mode and a cadence in seconds. The kill switch halts everything instantly — it overrides cadence, mode, and the enabled flag.</p>
<p class="back"><a href="/admin">← back to admin</a></p>
{% if flash %}
<div class="verdict verdict-clean">{{ flash }}</div>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Global kill switch</h2>
<span class="count">{{ 'ARMED' if kill_switch else 'OFF' }}</span>
</div>
{% if kill_switch %}
<div class="verdict" style="background:rgba(248,113,113,0.12); border:1px solid rgba(248,113,113,0.45); color:#fca5a5; padding:14px 18px; border-radius:6px; font-weight:700; letter-spacing:0.04em;">
✗ KILL SWITCH ARMED — every pipeline is paused. tick() returns "skipped" for everything. Run-now is also blocked. Toggle off to resume.
</div>
<form method="post" action="/admin/pulse/kill" style="margin-top:14px;">
<button type="submit" class="btn btn-approve">Disarm kill switch</button>
</form>
{% else %}
<div class="verdict verdict-clean">✓ Pulse is live — the background loop ticks every {{ tick_interval }}s.</div>
<form method="post" action="/admin/pulse/kill" style="margin-top:14px;"
onsubmit="return confirm('Arm the kill switch? Every pipeline halts immediately.');">
<button type="submit" class="btn btn-reject">Arm kill switch</button>
</form>
{% endif %}
</section>
<section class="panel">
<div class="panel-head">
<h2>Pipelines</h2>
<span class="count">{{ pipelines|selectattr('enabled')|list|length }} enabled</span>
</div>
<p class="page-intro">Mode: <code>auto-execute</code> fires the line, <code>auto-propose</code> stages proposals for human approval, <code>manual</code> runs only when you press “Run now”. Cadence is the gap between ticks; the loop wakes up every {{ tick_interval }}s.</p>
<table class="ledger">
<thead>
<tr>
<th style="width:24%;">Pipeline</th>
<th>Mode · cadence · enabled</th>
<th style="width:11%;">Last fired</th>
<th style="width:11%;">Next fire</th>
<th>Last result</th>
<th style="width:1%;"></th>
</tr>
</thead>
<tbody>
{% for p in pipelines %}
<tr class="ledger-row">
<td>
<strong>{{ p.title }}</strong>
<div class="lg-sub"><code>{{ p.name }}</code> · {{ p.description }}</div>
</td>
<td>
<form method="post" action="/admin/pulse/{{ p.name }}/update" class="queue-action" style="display:flex; gap:8px; align-items:center; flex-wrap:wrap;">
<select name="mode" style="background:var(--panel-2); color:var(--text); border:1px solid var(--border); border-radius:4px; padding:4px 6px;">
<option value="auto-execute" {% if p.mode.value == 'auto-execute' %}selected{% endif %}>auto-execute</option>
<option value="auto-propose" {% if p.mode.value == 'auto-propose' %}selected{% endif %}>auto-propose</option>
<option value="manual" {% if p.mode.value == 'manual' %}selected{% endif %}>manual</option>
</select>
<input type="number" name="cadence_seconds" value="{{ p.cadence_seconds }}" min="1" step="1"
style="background:var(--panel-2); color:var(--text); border:1px solid var(--border); border-radius:4px; padding:4px 6px; width:84px;" title="cadence in seconds">
<label style="display:inline-flex; align-items:center; gap:4px; font-size:12px;">
<input type="checkbox" name="enabled" value="1" {% if p.enabled %}checked{% endif %}> enabled
</label>
<button type="submit" class="btn">save</button>
</form>
</td>
<td class="lg-ts">
{% if p.last_fired %}<span title="{{ p.last_fired.isoformat() }}">{{ relative(p.last_fired) }}</span>{% else %}—{% endif %}
</td>
<td class="lg-ts">
{% if p.next_fire %}<span title="{{ p.next_fire.isoformat() }}">{{ relative(p.next_fire) }}</span>{% else %}—{% endif %}
</td>
<td class="lg-sub" style="max-width:0;">
{% if p.last_outcome == 'ok' %}<span style="color: var(--green);"></span>
{% elif p.last_outcome == 'err' %}<span style="color: var(--red);"></span>
{% elif p.last_outcome == 'skipped' %}<span style="color: var(--muted);"></span>
{% endif %}
{{ (p.last_result or '—')[:60] }}{% if (p.last_result or '')|length > 60 %}…{% endif %}
</td>
<td>
<form method="post" action="/admin/pulse/{{ p.name }}/run" class="queue-action">
<button type="submit" class="btn btn-enforce" title="Fire now, regardless of cadence">Run now</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</section>
{% endblock %}

View File

@@ -69,6 +69,8 @@
</svg>
Admin
</a>
<a href="/admin/pulse" class="nav-admin" title="Autonomy scheduler">Pulse</a>
<a href="/admin/federation" class="nav-admin" title="Federation peers">Federation</a>
{% endif %}
</nav>
{% if request.session.get('admin_who') %}

View File

@@ -114,6 +114,53 @@ Index("iocs_value_idx", iocs.c.value)
Index("iocs_type_idx", iocs.c.ioc_type)
Index("iocs_case_idx", iocs.c.case_id)
pulse_pipelines = Table(
"pulse_pipelines", _metadata,
Column("name", String, primary_key=True),
Column("title", String, nullable=False),
Column("description", Text, nullable=False, default=""),
Column("mode", String, nullable=False), # manual | auto-propose | auto-execute
Column("cadence_seconds", Integer, nullable=False),
Column("enabled", Boolean, nullable=False, default=True),
Column("last_fired", String, nullable=True), # ISO timestamp or NULL
Column("next_fire", String, nullable=True), # ISO timestamp or NULL
Column("last_result", Text, nullable=False, default=""),
Column("last_outcome", String, nullable=False, default=""), # ok | err | skipped | ""
)
pulse_settings = Table(
"pulse_settings", _metadata,
Column("key", String, primary_key=True),
Column("value", String, nullable=False),
)
peers = Table(
"peers", _metadata,
Column("domain", String, primary_key=True),
Column("fingerprint", String, nullable=False),
Column("pubkey_pem", Text, nullable=False),
Column("status", String, nullable=False), # unknown | trusted | blocked
Column("discovered_at", String, nullable=False),
Column("last_seen", String, nullable=True),
Column("notes", Text, nullable=True),
)
Index("peers_fp_idx", peers.c.fingerprint)
Index("peers_status_idx", peers.c.status)
federation_signals = Table(
"federation_signals", _metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("peer_fingerprint", String, nullable=False),
Column("signal_type", String, nullable=False), # case | ioc
Column("signal_id", String, nullable=False), # case_id or ioc value
Column("signal_hash", String, nullable=False), # sha256 of canonical record
Column("received_at", String, nullable=False),
Column("raw_json", Text, nullable=False),
)
Index("federation_signals_hash_idx", federation_signals.c.signal_hash)
Index("federation_signals_peer_idx", federation_signals.c.peer_fingerprint)
Index("federation_signals_received_idx", federation_signals.c.received_at.desc())
_log = log.get(__name__)
_engine: Optional[Engine] = None
@@ -214,3 +261,111 @@ def ioc_count(db_path: Path = DB_PATH) -> int:
stmt = select(func.count()).select_from(iocs)
with engine(db_path).connect() as conn:
return conn.execute(stmt).scalar_one()
# ---------- pulse scheduler ----------------------------------------------
def get_pulse_state(db_path: Path = DB_PATH) -> List[dict]:
"""Every registered pipeline, ordered by name."""
stmt = select(pulse_pipelines).order_by(pulse_pipelines.c.name)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def upsert_pulse_pipeline(row: dict, db_path: Path = DB_PATH) -> None:
"""Insert or update one pipeline by name."""
stmt = sqlite_insert(pulse_pipelines).values(**row)
stmt = stmt.on_conflict_do_update(
index_elements=[pulse_pipelines.c.name],
set_=dict(
title=stmt.excluded.title,
description=stmt.excluded.description,
mode=stmt.excluded.mode,
cadence_seconds=stmt.excluded.cadence_seconds,
enabled=stmt.excluded.enabled,
last_fired=stmt.excluded.last_fired,
next_fire=stmt.excluded.next_fire,
last_result=stmt.excluded.last_result,
last_outcome=stmt.excluded.last_outcome,
),
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
def kill_switch_get(db_path: Path = DB_PATH) -> bool:
stmt = select(pulse_settings.c.value).where(pulse_settings.c.key == "kill_switch")
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
if row is None:
return False
return str(row.value) == "1"
def kill_switch_set(armed: bool, db_path: Path = DB_PATH) -> None:
value = "1" if armed else "0"
stmt = sqlite_insert(pulse_settings).values(key="kill_switch", value=value)
stmt = stmt.on_conflict_do_update(
index_elements=[pulse_settings.c.key],
set_=dict(value=stmt.excluded.value),
)
with engine(db_path).begin() as conn:
conn.execute(stmt)
# ---------- federation: peers + signal buffer ----------------------------
def upsert_peer(row: dict, db_path: Path = DB_PATH) -> None:
"""Insert-or-update a peer by domain. `row` must include `domain`."""
stmt = sqlite_insert(peers).values(**row)
update_cols = {k: stmt.excluded[k] for k in row if k != "domain"}
stmt = stmt.on_conflict_do_update(index_elements=[peers.c.domain], set_=update_cols)
with engine(db_path).begin() as conn:
conn.execute(stmt)
def list_peers(db_path: Path = DB_PATH) -> List[dict]:
stmt = select(peers).order_by(peers.c.discovered_at.desc())
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def get_peer(domain: str, db_path: Path = DB_PATH) -> Optional[dict]:
stmt = select(peers).where(peers.c.domain == domain)
with engine(db_path).connect() as conn:
row = conn.execute(stmt).fetchone()
return dict(row._mapping) if row else None
def set_peer_status(domain: str, status: str, db_path: Path = DB_PATH) -> None:
from sqlalchemy import update as sa_update
stmt = sa_update(peers).where(peers.c.domain == domain).values(status=status)
with engine(db_path).begin() as conn:
conn.execute(stmt)
def remove_peer(domain: str, db_path: Path = DB_PATH) -> None:
stmt = peers.delete().where(peers.c.domain == domain)
with engine(db_path).begin() as conn:
conn.execute(stmt)
def record_signal(row: dict, db_path: Path = DB_PATH) -> int:
"""Append one federation signal. Returns the inserted row id."""
stmt = insert(federation_signals).values(**row)
with engine(db_path).begin() as conn:
res = conn.execute(stmt)
return int(res.inserted_primary_key[0])
def signals_for_hash(signal_hash: str, db_path: Path = DB_PATH) -> List[dict]:
"""All recorded signals matching `signal_hash` — quorum-lookup primitive."""
stmt = select(federation_signals).where(federation_signals.c.signal_hash == signal_hash)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
def recent_signals(limit: int = 200, db_path: Path = DB_PATH) -> List[dict]:
stmt = select(federation_signals).order_by(federation_signals.c.received_at.desc()).limit(limit)
with engine(db_path).connect() as conn:
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]

View File

@@ -0,0 +1,405 @@
"""Federation — node identity, signed feeds, peer registry.
Identity layer for internet-wide federation of psyc nodes. Each node owns
an Ed25519 keypair persisted under DATA_DIR/federation/. The public key
fingerprint (first 16 bytes of SHA256(raw_pubkey) hex-encoded) goes into a
DNS TXT record so peers can discover and authenticate the node, and the
private key signs the outbound feed at /federation/feed.
This module is the *identity* primitives only — discovery walkers,
vouching/quorum, transparency log and auto-pull live in later stages.
"""
from __future__ import annotations
import base64
import hashlib
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
from pydantic import BaseModel, Field
from psyc import DATA_DIR, db, log
from psyc.result import Err, Ok, Result
_log = log.get(__name__)
FED_DIR = DATA_DIR / "federation"
PRIVATE_KEY_PATH = FED_DIR / "node.key"
PUBLIC_KEY_PATH = FED_DIR / "node.pub"
FEED_VERSION = "psyc1"
FEED_ALG = "ed25519"
FEED_PATH = "/federation/feed"
# ---------- keypair persistence -----------------------------------------
def _ensure_dir() -> None:
FED_DIR.mkdir(parents=True, exist_ok=True)
def node_keypair() -> Tuple[ed25519.Ed25519PrivateKey, ed25519.Ed25519PublicKey]:
"""Return the node's Ed25519 keypair, generating + persisting it on first call.
Private key lands at data/federation/node.key (PEM, chmod 0600); public
at data/federation/node.pub (PEM). Idempotent — subsequent calls load
the existing files instead of generating new ones.
"""
_ensure_dir()
if PRIVATE_KEY_PATH.exists() and PUBLIC_KEY_PATH.exists():
priv_pem = PRIVATE_KEY_PATH.read_bytes()
priv = serialization.load_pem_private_key(priv_pem, password=None)
if not isinstance(priv, ed25519.Ed25519PrivateKey):
raise RuntimeError(f"federation key at {PRIVATE_KEY_PATH} is not Ed25519")
return priv, priv.public_key()
priv = ed25519.Ed25519PrivateKey.generate()
priv_pem = priv.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
pub = priv.public_key()
pub_pem = pub.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
PRIVATE_KEY_PATH.write_bytes(priv_pem)
os.chmod(PRIVATE_KEY_PATH, 0o600)
PUBLIC_KEY_PATH.write_bytes(pub_pem)
_log.info("federation.keypair.generated", path=str(PRIVATE_KEY_PATH))
return priv, pub
def public_key_pem() -> str:
"""PEM-encoded public key as text — what peers store + verify against."""
_, pub = node_keypair()
return pub.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
def _raw_pubkey_bytes(pub: ed25519.Ed25519PublicKey) -> bytes:
return pub.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
def node_fingerprint() -> str:
"""Short stable id for the node — first 16 bytes of SHA256(raw_pubkey), hex.
Lives in DNS TXT records; 32 hex chars is short enough to fit but long
enough to be collision-safe for any plausible peer population.
"""
_, pub = node_keypair()
digest = hashlib.sha256(_raw_pubkey_bytes(pub)).digest()
return digest[:16].hex()
def _fingerprint_for_pubkey_pem(pubkey_pem: str) -> str:
pub = serialization.load_pem_public_key(pubkey_pem.encode("ascii"))
if not isinstance(pub, ed25519.Ed25519PublicKey):
raise ValueError("not an Ed25519 public key")
return hashlib.sha256(_raw_pubkey_bytes(pub)).digest()[:16].hex()
# ---------- DNS record format -------------------------------------------
class DNSRecord(BaseModel):
"""The SRV + TXT pair an admin pastes into their zone file."""
srv_name: str
srv_target: str
srv_port: int
srv_priority: int = 10
srv_weight: int = 10
txt_name: str
txt_value: str
human_instructions: str
def dns_record(domain: str, port: int = 443) -> DNSRecord:
"""Build the DNS-SD-style records that advertise this node at `domain`."""
fp = node_fingerprint()
srv_name = f"_psyc._tcp.{domain}"
srv_target = f"{domain}."
txt_name = srv_name
txt_value = f"v={FEED_VERSION} fp={fp} alg={FEED_ALG} path={FEED_PATH}"
instructions = (
f"; psyc federation records for {domain}\n"
f"; ----------------------------------------------------------\n"
f"; 1) SRV record — locates this psyc node (host + port).\n"
f'{srv_name}. 3600 IN SRV 10 10 {port} {srv_target}\n'
f";\n"
f"; 2) TXT record — declares protocol version, key fingerprint,\n"
f"; signature algorithm, and the feed endpoint path.\n"
f'{txt_name}. 3600 IN TXT "{txt_value}"\n'
f"; ----------------------------------------------------------\n"
f"; Once these are live, federation peers can fetch:\n"
f"; https://{domain}{FEED_PATH} (signed feed JSON)\n"
f"; https://{domain}/federation/key (public key PEM)\n"
f"; https://{domain}/federation/info (capabilities)\n"
)
return DNSRecord(
srv_name=srv_name,
srv_target=srv_target,
srv_port=port,
txt_name=txt_name,
txt_value=txt_value,
human_instructions=instructions,
)
# ---------- signing -----------------------------------------------------
def canonical_json(obj: Dict[str, Any]) -> bytes:
"""Deterministic JSON serialization — what we sign + hash over."""
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
def sign_payload(payload: bytes) -> bytes:
"""Ed25519 signature over `payload`. Raw 64-byte sig."""
priv, _ = node_keypair()
return priv.sign(payload)
def verify_payload(payload: bytes, signature: bytes, pubkey_pem: str) -> bool:
"""True iff `signature` verifies under `pubkey_pem`. Never raises."""
try:
pub = serialization.load_pem_public_key(pubkey_pem.encode("ascii"))
if not isinstance(pub, ed25519.Ed25519PublicKey):
return False
pub.verify(signature, payload)
return True
except Exception:
return False
# ---------- feed export -------------------------------------------------
def _case_digest(case_record: Dict[str, Any]) -> str:
return hashlib.sha256(canonical_json(case_record)).hexdigest()
def _build_case_records(window_hours: int) -> List[Dict[str, Any]]:
cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
out: List[Dict[str, Any]] = []
for case in db.list_cases(limit=10_000):
if case.ingested_at < cutoff:
continue
record: Dict[str, Any] = {
"case_id": case.case_id,
"summary": case.summary,
"severity": case.classification.severity.value if case.classification.severity else None,
"incident_type": case.classification.incident_type.value if case.classification.incident_type else None,
"observed_at": case.observed_at.isoformat(),
"feed_source": case.source_metadata.get("feed", ""),
"iocs": (
[{"value": v, "type": "url"} for v in case.observables.urls]
+ [{"value": v, "type": "domain"} for v in case.observables.domains]
+ [{"value": v, "type": "ip"} for v in case.observables.ips]
+ [{"value": v, "type": "hash"} for v in case.observables.hashes]
+ [{"value": v, "type": "cve"} for v in case.observables.cves]
),
}
record["digest_sha256"] = _case_digest(
{k: v for k, v in record.items() if k != "digest_sha256"}
)
out.append(record)
return out
def _build_ioc_records(window_hours: int) -> List[Dict[str, Any]]:
cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
out: List[Dict[str, Any]] = []
seen: set = set()
for ioc_type in ("url", "domain", "ip", "hash", "cve"):
for row in db.iocs_by_type(ioc_type):
first_seen = row.get("first_seen")
if first_seen:
try:
if datetime.fromisoformat(first_seen) < cutoff:
continue
except ValueError:
pass
key = (row["value"], row["ioc_type"])
if key in seen:
continue
seen.add(key)
record = {
"value": row["value"],
"type": row["ioc_type"],
"severity": row.get("severity"),
"first_seen": first_seen,
}
record["digest_sha256"] = hashlib.sha256(canonical_json(record)).hexdigest()
out.append(record)
return out
def build_signed_feed(window_hours: int = 24) -> Dict[str, Any]:
"""Build the JSON feed peers will pull from /federation/feed.
Pulls cases ingested in the last `window_hours` plus the corresponding
IOC slice, attaches per-record `digest_sha256` (so peers can later
quorum-match across nodes), and signs the canonical JSON of the whole
payload-minus-signature with our Ed25519 key.
"""
payload: Dict[str, Any] = {
"version": FEED_VERSION,
"fingerprint": node_fingerprint(),
"generated_at": datetime.now(timezone.utc).isoformat(),
"window_hours": window_hours,
"cases": _build_case_records(window_hours),
"iocs": _build_ioc_records(window_hours),
}
sig = sign_payload(canonical_json(payload))
payload["signature"] = base64.b64encode(sig).decode("ascii")
return payload
# ---------- import + quorum-signal buffer -------------------------------
class ImportSummary(BaseModel):
peer_fingerprint: str
cases_seen: int
iocs_seen: int
signal_ids: List[Tuple[str, str]] = Field(default_factory=list)
def import_signed_feed(feed: Dict[str, Any], expected_pubkey_pem: str) -> Result[ImportSummary, str]:
"""Verify + record a peer's feed into the federation_signals buffer.
Does NOT merge into the local case store — that's the quorum stage's
job. The buffer is the per-hash signal log that quorum logic later
aggregates ("3 trusted peers reported this same IOC → promote").
"""
sig_b64 = feed.get("signature")
if not sig_b64:
return Err("missing signature")
try:
signature = base64.b64decode(sig_b64)
except Exception:
return Err("malformed signature (not base64)")
unsigned = {k: v for k, v in feed.items() if k != "signature"}
if not verify_payload(canonical_json(unsigned), signature, expected_pubkey_pem):
return Err("signature verification failed")
peer_fp = feed.get("fingerprint", "")
if not peer_fp:
return Err("missing fingerprint")
if peer_fp == node_fingerprint():
return Err("loop: own feed")
# Cross-check the declared fingerprint matches the pubkey we verified with.
try:
if _fingerprint_for_pubkey_pem(expected_pubkey_pem) != peer_fp:
return Err("fingerprint does not match provided pubkey")
except Exception as exc:
return Err(f"bad pubkey: {exc}")
now = datetime.now(timezone.utc).isoformat()
signal_ids: List[Tuple[str, str]] = []
cases = feed.get("cases") or []
iocs = feed.get("iocs") or []
for c in cases:
case_id = c.get("case_id") or ""
digest = c.get("digest_sha256") or hashlib.sha256(canonical_json(c)).hexdigest()
db.record_signal(dict(
peer_fingerprint=peer_fp,
signal_type="case",
signal_id=case_id,
signal_hash=digest,
received_at=now,
raw_json=json.dumps(c, sort_keys=True),
))
signal_ids.append(("case", digest))
for i in iocs:
value = i.get("value") or ""
digest = i.get("digest_sha256") or hashlib.sha256(canonical_json(i)).hexdigest()
db.record_signal(dict(
peer_fingerprint=peer_fp,
signal_type="ioc",
signal_id=value,
signal_hash=digest,
received_at=now,
raw_json=json.dumps(i, sort_keys=True),
))
signal_ids.append(("ioc", digest))
_log.info("federation.import.ok", peer=peer_fp, cases=len(cases), iocs=len(iocs))
return Ok(ImportSummary(
peer_fingerprint=peer_fp,
cases_seen=len(cases),
iocs_seen=len(iocs),
signal_ids=signal_ids,
))
# ---------- peer registry ------------------------------------------------
class Peer(BaseModel):
domain: str
fingerprint: str
pubkey_pem: str
status: str = "unknown" # unknown | trusted | blocked
discovered_at: str
last_seen: Optional[str] = None
notes: Optional[str] = None
def _row_to_peer(row: Dict[str, Any]) -> Peer:
return Peer(
domain=row["domain"],
fingerprint=row["fingerprint"],
pubkey_pem=row["pubkey_pem"],
status=row.get("status") or "unknown",
discovered_at=row.get("discovered_at") or "",
last_seen=row.get("last_seen"),
notes=row.get("notes"),
)
def register_peer(domain: str, fingerprint: str, pubkey_pem: str, status: str = "unknown") -> None:
"""Insert or update a peer in the registry. Idempotent on `domain`."""
now = datetime.now(timezone.utc).isoformat()
existing = db.get_peer(domain)
discovered_at = existing["discovered_at"] if existing else now
db.upsert_peer(dict(
domain=domain,
fingerprint=fingerprint,
pubkey_pem=pubkey_pem,
status=status,
discovered_at=discovered_at,
last_seen=now,
notes=existing.get("notes") if existing else None,
))
def list_peers() -> List[Peer]:
return [_row_to_peer(r) for r in db.list_peers()]
def get_peer(domain: str) -> Optional[Peer]:
row = db.get_peer(domain)
return _row_to_peer(row) if row else None
def set_peer_status(domain: str, status: str) -> None:
if status not in ("unknown", "trusted", "blocked"):
raise ValueError(f"unknown peer status: {status}")
db.set_peer_status(domain, status)
def remove_peer(domain: str) -> None:
db.remove_peer(domain)

380
src/psyc/lines/pulse.py Normal file
View File

@@ -0,0 +1,380 @@
"""Pulseline — cron-style scheduler that drives every psyc pipeline on a cadence.
Each registered pipeline has an autonomy mode (manual / auto-propose /
auto-execute) and a cadence in seconds. tick() iterates every pipeline and
fires whichever ones are due (and enabled, and not manual, and the global kill
switch is off). State persists to SQLite via psyc.db so cadences survive
restarts. A background asyncio loop calls tick() at a fixed interval — the
cockpit lifespan attaches it.
NOTE: federation pipelines (peer-pull, vouch-refresh) are wired as placeholders
that return a no-op string. Real federation lands in a later stage.
"""
from __future__ import annotations
import asyncio
import traceback
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Callable, Dict, List, Optional, Tuple
from pydantic import BaseModel, Field
from psyc import db, log
_log = log.get(__name__)
class PulseMode(str, Enum):
MANUAL = "manual"
AUTO_PROPOSE = "auto-propose"
AUTO_EXECUTE = "auto-execute"
class Pipeline(BaseModel):
name: str
title: str
description: str
mode: PulseMode
cadence_seconds: int
enabled: bool = True
last_fired: Optional[datetime] = None
next_fire: Optional[datetime] = None
last_result: str = ""
last_outcome: str = "" # "ok" | "err" | "skipped" | ""
# ---------- pipeline runners --------------------------------------------------
def _run_fetch() -> str:
"""Fetch every enabled scout source; partial fetch is fine.
Skip-on-fail is critical: keyed feeds (threatfox, malware-bazaar, otx)
raise when their key isn't configured — we don't want one missing key to
block the public ones.
"""
from psyc.lines import scout
plan: Tuple[Tuple[str, Optional[int]], ...] = (
("urlhaus", 50),
("cisa-kev", 100),
("feodo", 50),
("threatfox", 200),
("malware-bazaar", 100),
("otx", 100),
)
total = 0
feeds_ok = 0
feeds_err: List[str] = []
for source, limit in plan:
try:
cases = scout.fetch_and_signal(source, limit=limit)
for c in cases:
db.upsert_case(c)
total += len(cases)
feeds_ok += 1
except Exception as exc: # noqa: BLE001 — partial fetch is the point
feeds_err.append(source)
_log.info("pulse.fetch.skip", source=source, error=str(exc)[:200])
tail = f" (skipped: {', '.join(feeds_err)})" if feeds_err else ""
return f"fetched {total} cases across {feeds_ok} feed(s){tail}"
def _run_classify() -> str:
from psyc.lines import classify
cases = db.list_cases(limit=10_000)
n = 0
for c in cases:
classify.classify(c)
db.upsert_case(c)
n += 1
return f"classified {n} case(s)"
def _run_prove() -> str:
from psyc.lines import proof
cases = db.list_cases(limit=10_000)
n = 0
for c in cases:
proof.prove(c)
db.upsert_case(c)
n += 1
return f"proved {n} case(s)"
def _run_reindex() -> str:
from psyc.lines import lookup
cases = db.list_cases(limit=1_000_000)
written = lookup.reindex(cases)
return f"indexed {written} IOC(s) from {len(cases)} case(s)"
def _run_respond_propose() -> str:
"""Propose response actions for high-severity cases that don't yet have any.
Strictly proposal-only: nothing is dispatched. The respond.propose_for_case
helper is already idempotent per case (skips cases that already have
actions), so re-running this on a tick is safe.
"""
from psyc.lines import respond
cases = db.list_cases(limit=10_000)
proposed = 0
touched = 0
for c in cases:
ids = respond.propose_for_case(c)
if ids:
proposed += len(ids)
touched += 1
return f"proposed {proposed} action(s) for {touched} case(s)"
def _run_peer_pull() -> str:
return "federation not yet active"
def _run_vouch_refresh() -> str:
return "federation not yet active"
# ---------- registry ----------------------------------------------------------
_REGISTRY: Dict[str, Callable[[], str]] = {
"fetch": _run_fetch,
"classify": _run_classify,
"prove": _run_prove,
"reindex": _run_reindex,
"respond": _run_respond_propose,
"peer-pull": _run_peer_pull,
"vouch-refresh": _run_vouch_refresh,
}
# Initial defaults — seeded once on first DB init. Tuples of
# (name, title, description, mode, cadence_seconds, enabled).
_DEFAULTS: Tuple[Tuple[str, str, str, PulseMode, int, bool], ...] = (
("fetch", "Scout · fetch feeds", "Pull every configured threat feed and ingest new cases.",
PulseMode.AUTO_EXECUTE, 900, True),
("classify", "Classify · label cases", "Assign incident type, severity, TLP, and internal class to every case.",
PulseMode.AUTO_EXECUTE, 300, True),
("prove", "Proof · score confidence", "Compute confidence (reliability · credibility · freshness) for every case.",
PulseMode.AUTO_EXECUTE, 300, True),
("reindex", "Lookup · rebuild IOC index","Rebuild the IOC reverse-index over the case corpus.",
PulseMode.AUTO_EXECUTE, 3600, True),
("respond", "Respond · propose actions", "Propose human-gated response actions for newly high-severity cases.",
PulseMode.AUTO_PROPOSE, 600, True),
("peer-pull", "Federation · peer pull", "(placeholder) Pull sealed cases from federated peers.",
PulseMode.MANUAL, 600, False),
("vouch-refresh","Federation · vouch refresh","(placeholder) Refresh per-peer vouching ledgers.",
PulseMode.MANUAL, 3600, False),
)
# ---------- helpers -----------------------------------------------------------
def _now() -> datetime:
return datetime.now(timezone.utc)
def _parse_dt(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
try:
return datetime.fromisoformat(value)
except ValueError:
return None
def _row_to_pipeline(row: dict) -> Pipeline:
return Pipeline(
name=row["name"],
title=row["title"],
description=row["description"],
mode=PulseMode(row["mode"]),
cadence_seconds=int(row["cadence_seconds"]),
enabled=bool(row["enabled"]),
last_fired=_parse_dt(row.get("last_fired")),
next_fire=_parse_dt(row.get("next_fire")),
last_result=row.get("last_result") or "",
last_outcome=row.get("last_outcome") or "",
)
def _pipeline_to_row(p: Pipeline) -> dict:
return dict(
name=p.name,
title=p.title,
description=p.description,
mode=p.mode.value,
cadence_seconds=int(p.cadence_seconds),
enabled=bool(p.enabled),
last_fired=p.last_fired.isoformat() if p.last_fired else None,
next_fire=p.next_fire.isoformat() if p.next_fire else None,
last_result=p.last_result or "",
last_outcome=p.last_outcome or "",
)
def seed_defaults() -> None:
"""Insert any default pipelines that aren't already in the DB. Idempotent."""
existing = {row["name"] for row in db.get_pulse_state()}
for name, title, desc, mode, cadence, enabled in _DEFAULTS:
if name in existing:
continue
p = Pipeline(
name=name, title=title, description=desc,
mode=mode, cadence_seconds=cadence, enabled=enabled,
next_fire=_now(), # first tick after install fires immediately if due
)
db.upsert_pulse_pipeline(_pipeline_to_row(p))
_log.info("pulse.defaults.seeded", count=len(_DEFAULTS))
def state() -> List[Pipeline]:
"""Every pipeline, ordered by name. Seeds defaults on the first call."""
rows = db.get_pulse_state()
if not rows:
seed_defaults()
rows = db.get_pulse_state()
return [_row_to_pipeline(r) for r in rows]
def _get_pipeline(name: str) -> Optional[Pipeline]:
for p in state():
if p.name == name:
return p
return None
def set_mode(name: str, mode: PulseMode) -> None:
p = _get_pipeline(name)
if p is None:
raise ValueError(f"unknown pipeline: {name}")
p.mode = mode
db.upsert_pulse_pipeline(_pipeline_to_row(p))
def set_cadence(name: str, seconds: int) -> None:
if seconds <= 0:
raise ValueError("cadence must be > 0 seconds")
p = _get_pipeline(name)
if p is None:
raise ValueError(f"unknown pipeline: {name}")
p.cadence_seconds = int(seconds)
db.upsert_pulse_pipeline(_pipeline_to_row(p))
def set_enabled(name: str, enabled: bool) -> None:
p = _get_pipeline(name)
if p is None:
raise ValueError(f"unknown pipeline: {name}")
p.enabled = bool(enabled)
db.upsert_pulse_pipeline(_pipeline_to_row(p))
def set_kill_switch(armed: bool) -> None:
db.kill_switch_set(armed)
_log.warning("pulse.killswitch.changed", armed=bool(armed))
def kill_switch_state() -> bool:
return db.kill_switch_get()
# ---------- the heartbeat -----------------------------------------------------
def _fire(p: Pipeline) -> Tuple[str, str]:
"""Run one pipeline. Returns (outcome, result_str). Persists the timestamps.
Outcome is "ok" if the runner returned, "err" if it raised.
"""
runner = _REGISTRY.get(p.name)
if runner is None:
outcome = "err"
result = f"no runner registered for '{p.name}'"
else:
try:
result = runner() or ""
outcome = "ok"
except Exception as exc: # noqa: BLE001 — log + record, don't crash the loop
outcome = "err"
result = f"{type(exc).__name__}: {exc}"
_log.warning("pulse.fire.error", name=p.name, error=result, trace=traceback.format_exc())
now = _now()
p.last_fired = now
p.next_fire = now + timedelta(seconds=max(1, p.cadence_seconds))
p.last_result = result[:500]
p.last_outcome = outcome
db.upsert_pulse_pipeline(_pipeline_to_row(p))
_log.info("pulse.fired", name=p.name, outcome=outcome, result=p.last_result)
return outcome, p.last_result
def _should_fire(p: Pipeline, now: datetime) -> bool:
if not p.enabled:
return False
if p.mode == PulseMode.MANUAL:
return False
if p.next_fire is None:
return True
return now >= p.next_fire
def tick() -> List[Tuple[str, str, str]]:
"""Single scheduler heartbeat. Returns (name, outcome, result_str) per pipeline.
Outcome is "ok" / "err" / "skipped" — every registered pipeline appears in
the return value so callers can see what was skipped and why.
"""
if kill_switch_state():
_log.info("pulse.tick.killed")
return [(p.name, "skipped", "kill switch armed") for p in state()]
now = _now()
out: List[Tuple[str, str, str]] = []
for p in state():
if not _should_fire(p, now):
out.append((p.name, "skipped", "not due"))
continue
outcome, result = _fire(p)
out.append((p.name, outcome, result))
return out
def run_now(name: str) -> Tuple[str, str]:
"""Manually fire one pipeline, bypassing cadence and mode. Honors kill switch.
Returns (outcome, result_str). Raises ValueError on unknown name.
"""
if kill_switch_state():
return ("skipped", "kill switch armed")
p = _get_pipeline(name)
if p is None:
raise ValueError(f"unknown pipeline: {name}")
return _fire(p)
# ---------- background loop ---------------------------------------------------
async def start_background_loop(interval_seconds: int = 30) -> None:
"""Long-running scheduler — calls tick() every interval. Launched from FastAPI lifespan.
Designed to run for the life of the process; cancellation is the normal stop signal.
"""
_log.info("pulse.loop.starting", interval=interval_seconds)
while True:
try:
tick()
except Exception as exc: # noqa: BLE001 — one bad tick must not kill the loop
_log.warning("pulse.loop.tick.error", error=str(exc), trace=traceback.format_exc())
try:
await asyncio.sleep(interval_seconds)
except asyncio.CancelledError:
_log.info("pulse.loop.cancelled")
raise

230
tests/test_federation.py Normal file
View File

@@ -0,0 +1,230 @@
"""Federation — identity, signed feed, peer registry, signal buffer."""
from __future__ import annotations
import base64
import os
import stat
import pytest
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import federation
from psyc.lines.federation import (
DNSRecord,
build_signed_feed,
canonical_json,
dns_record,
import_signed_feed,
node_fingerprint,
node_keypair,
public_key_pem,
sign_payload,
verify_payload,
)
from psyc.result import Err, Ok
from conftest import make_case
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
eng = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(eng, checkfirst=True)
monkeypatch.setattr(db, "_engine", eng)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
@pytest.fixture
def fed_dir(tmp_path, monkeypatch):
"""Redirect federation key paths to a tmp dir so each test gets a fresh key."""
d = tmp_path / "federation"
monkeypatch.setattr(federation, "FED_DIR", d)
monkeypatch.setattr(federation, "PRIVATE_KEY_PATH", d / "node.key")
monkeypatch.setattr(federation, "PUBLIC_KEY_PATH", d / "node.pub")
yield d
def test_keypair_persisted_with_correct_perms(fed_dir):
priv, pub = node_keypair()
assert federation.PRIVATE_KEY_PATH.exists()
assert federation.PUBLIC_KEY_PATH.exists()
mode = stat.S_IMODE(os.stat(federation.PRIVATE_KEY_PATH).st_mode)
assert mode == 0o600
def test_keypair_idempotent_across_calls(fed_dir):
priv1, pub1 = node_keypair()
priv2, pub2 = node_keypair()
# raw bytes match — same key loaded twice, not regenerated
raw1 = pub1.public_bytes(
encoding=federation.serialization.Encoding.Raw,
format=federation.serialization.PublicFormat.Raw,
)
raw2 = pub2.public_bytes(
encoding=federation.serialization.Encoding.Raw,
format=federation.serialization.PublicFormat.Raw,
)
assert raw1 == raw2
def test_fingerprint_is_stable_and_32_hex(fed_dir):
fp1 = node_fingerprint()
fp2 = node_fingerprint()
assert fp1 == fp2
assert len(fp1) == 32
assert all(c in "0123456789abcdef" for c in fp1)
def test_sign_verify_roundtrip(fed_dir):
payload = b"hello federation"
sig = sign_payload(payload)
assert verify_payload(payload, sig, public_key_pem()) is True
def test_verify_with_wrong_key_returns_false(fed_dir, tmp_path):
payload = b"the truth"
sig = sign_payload(payload)
# Build a *different* keypair in a separate directory and use its pubkey.
other = tmp_path / "other-federation"
other.mkdir()
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
other_priv = ed25519.Ed25519PrivateKey.generate()
other_pub_pem = other_priv.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
assert verify_payload(payload, sig, other_pub_pem) is False
def test_verify_with_garbage_pubkey_returns_false_no_raise(fed_dir):
sig = sign_payload(b"x")
assert verify_payload(b"x", sig, "not a pem") is False
def test_canonical_json_is_deterministic():
a = canonical_json({"b": 1, "a": 2, "nested": {"y": 1, "x": 2}})
b = canonical_json({"a": 2, "b": 1, "nested": {"x": 2, "y": 1}})
assert a == b
def test_dns_record_txt_value_matches_spec(fed_dir):
rec = dns_record("example.com")
assert isinstance(rec, DNSRecord)
fp = node_fingerprint()
assert rec.srv_name == "_psyc._tcp.example.com"
assert rec.srv_target == "example.com."
assert rec.srv_port == 443
assert rec.txt_value == f"v=psyc1 fp={fp} alg=ed25519 path=/federation/feed"
# human instructions include both record lines
assert "_psyc._tcp.example.com" in rec.human_instructions
assert "SRV" in rec.human_instructions
assert "TXT" in rec.human_instructions
def test_build_then_import_signed_feed_roundtrip(fresh_db, fed_dir):
case = make_case(feed="urlhaus", ips=["1.1.1.1"], urls=["http://1.1.1.1/x"])
db.upsert_case(case)
feed = build_signed_feed(window_hours=24)
assert feed["version"] == "psyc1"
assert feed["fingerprint"] == node_fingerprint()
assert feed["signature"]
# cases entry made it in
assert any(c["case_id"] == case.case_id for c in feed["cases"])
# Import using our own pubkey against a *different* declared fingerprint:
# swap the fingerprint so import_signed_feed doesn't reject as a loop.
pub = public_key_pem()
# Use a fresh keypair to act as "peer" — sign a feed with that key.
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
import hashlib
peer_priv = ed25519.Ed25519PrivateKey.generate()
peer_pub_pem = peer_priv.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
peer_raw = peer_priv.public_key().public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
peer_fp = hashlib.sha256(peer_raw).digest()[:16].hex()
feed["fingerprint"] = peer_fp
unsigned = {k: v for k, v in feed.items() if k != "signature"}
new_sig = peer_priv.sign(canonical_json(unsigned))
feed["signature"] = base64.b64encode(new_sig).decode("ascii")
result = import_signed_feed(feed, peer_pub_pem)
assert isinstance(result, Ok), getattr(result, "reason", "")
summary = result.value
assert summary.peer_fingerprint == peer_fp
assert summary.cases_seen >= 1
def test_import_with_wrong_pubkey_returns_err(fresh_db, fed_dir):
db.upsert_case(make_case(feed="urlhaus", ips=["2.2.2.2"]))
feed = build_signed_feed(window_hours=24)
# build a *different* pubkey to claim verification against
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
other_pub_pem = ed25519.Ed25519PrivateKey.generate().public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("ascii")
# also need to change fingerprint so the loop-check doesn't trigger first
feed["fingerprint"] = "deadbeef" * 4
result = import_signed_feed(feed, other_pub_pem)
assert isinstance(result, Err)
def test_import_own_feed_returns_loop_err(fresh_db, fed_dir):
db.upsert_case(make_case(feed="urlhaus", ips=["3.3.3.3"]))
feed = build_signed_feed(window_hours=24)
result = import_signed_feed(feed, public_key_pem())
assert isinstance(result, Err)
assert "loop" in result.reason
def test_record_signal_then_lookup_by_hash(fresh_db):
rid = db.record_signal(dict(
peer_fingerprint="abc123",
signal_type="ioc",
signal_id="1.2.3.4",
signal_hash="hash-aaa",
received_at="2026-01-01T00:00:00+00:00",
raw_json="{}",
))
assert rid > 0
rows = db.signals_for_hash("hash-aaa")
assert len(rows) == 1
assert rows[0]["peer_fingerprint"] == "abc123"
# second peer reports the same hash → both surface for quorum check
db.record_signal(dict(
peer_fingerprint="def456",
signal_type="ioc",
signal_id="1.2.3.4",
signal_hash="hash-aaa",
received_at="2026-01-01T00:01:00+00:00",
raw_json="{}",
))
rows = db.signals_for_hash("hash-aaa")
assert {r["peer_fingerprint"] for r in rows} == {"abc123", "def456"}
def test_peer_registry_crud(fresh_db, fed_dir):
federation.register_peer("peer.example", "ff" * 16, "PEM", status="trusted")
peers = federation.list_peers()
assert len(peers) == 1
assert peers[0].domain == "peer.example"
assert peers[0].status == "trusted"
federation.set_peer_status("peer.example", "blocked")
assert federation.get_peer("peer.example").status == "blocked"
federation.remove_peer("peer.example")
assert federation.list_peers() == []

218
tests/test_pulse.py Normal file
View File

@@ -0,0 +1,218 @@
"""Pulse scheduler tests — cadence, kill switch, mode gating, persistence."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import pytest
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import pulse
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
"""A temp SQLite + a swapped-in registry of fast, deterministic runners.
Real runners would talk to scout / classify / proof / etc — none of which
we want to exercise from a pulse-only test. We replace _REGISTRY in-place
so tick() drives the scheduler logic against trivial counters.
"""
test_db = tmp_path / "pulse.db"
test_engine = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(test_engine, checkfirst=True)
monkeypatch.setattr(db, "_engine", test_engine)
monkeypatch.setattr(db, "DB_PATH", test_db)
counters = {name: 0 for name in pulse._REGISTRY}
def _make(name: str):
def runner() -> str:
counters[name] += 1
return f"{name}: tick {counters[name]}"
return runner
fake_registry = {name: _make(name) for name in pulse._REGISTRY}
monkeypatch.setattr(pulse, "_REGISTRY", fake_registry)
return counters
def _set_last_fired(name: str, when: datetime, cadence: int) -> None:
"""Force a pipeline's last_fired + next_fire to a known instant."""
for p in pulse.state():
if p.name == name:
p.last_fired = when
p.next_fire = when + timedelta(seconds=cadence)
db.upsert_pulse_pipeline(pulse._pipeline_to_row(p))
return
raise AssertionError(f"unknown pipeline {name}")
def test_defaults_seeded_on_first_state(fresh_db):
pipelines = pulse.state()
names = {p.name for p in pipelines}
assert names == {"fetch", "classify", "prove", "reindex", "respond", "peer-pull", "vouch-refresh"}
# Spot-check a couple of defaults from the spec.
by_name = {p.name: p for p in pipelines}
assert by_name["fetch"].mode == pulse.PulseMode.AUTO_EXECUTE
assert by_name["fetch"].cadence_seconds == 900
assert by_name["peer-pull"].enabled is False
assert by_name["respond"].mode == pulse.PulseMode.AUTO_PROPOSE
def test_state_persists_across_reads(fresh_db):
pulse.set_cadence("fetch", 42)
pulse.set_mode("classify", pulse.PulseMode.MANUAL)
pulse.set_enabled("reindex", False)
by_name = {p.name: p for p in pulse.state()}
assert by_name["fetch"].cadence_seconds == 42
assert by_name["classify"].mode == pulse.PulseMode.MANUAL
assert by_name["reindex"].enabled is False
def test_tick_skips_not_due_pipelines(fresh_db):
# Push every pipeline well into the future — nothing should fire.
now = datetime.now(timezone.utc)
for p in pulse.state():
_set_last_fired(p.name, now, p.cadence_seconds)
results = pulse.tick()
outcomes = {name: outcome for name, outcome, _ in results}
assert all(o == "skipped" for o in outcomes.values()), outcomes
assert all(c == 0 for c in fresh_db.values()), fresh_db
def test_tick_fires_due_auto_pipelines(fresh_db):
# Force "fetch" to be due (last fired far in the past) and re-tick.
past = datetime.now(timezone.utc) - timedelta(hours=24)
for p in pulse.state():
_set_last_fired(p.name, past, p.cadence_seconds)
results = pulse.tick()
by_name = {name: (outcome, result) for name, outcome, result in results}
# auto-execute + auto-propose modes should fire; manual should skip.
assert by_name["fetch"][0] == "ok"
assert by_name["classify"][0] == "ok"
assert by_name["prove"][0] == "ok"
assert by_name["reindex"][0] == "ok"
assert by_name["respond"][0] == "ok"
# Manual + disabled pipelines stay skipped.
assert by_name["peer-pull"][0] == "skipped"
assert by_name["vouch-refresh"][0] == "skipped"
def test_tick_respects_manual_mode(fresh_db):
past = datetime.now(timezone.utc) - timedelta(hours=24)
_set_last_fired("fetch", past, 900)
pulse.set_mode("fetch", pulse.PulseMode.MANUAL)
results = pulse.tick()
by_name = {n: o for n, o, _ in results}
assert by_name["fetch"] == "skipped"
assert fresh_db["fetch"] == 0
def test_tick_respects_disabled(fresh_db):
past = datetime.now(timezone.utc) - timedelta(hours=24)
_set_last_fired("fetch", past, 900)
pulse.set_enabled("fetch", False)
results = pulse.tick()
by_name = {n: o for n, o, _ in results}
assert by_name["fetch"] == "skipped"
assert fresh_db["fetch"] == 0
def test_kill_switch_halts_everything(fresh_db):
past = datetime.now(timezone.utc) - timedelta(hours=24)
for p in pulse.state():
_set_last_fired(p.name, past, p.cadence_seconds)
pulse.set_kill_switch(True)
results = pulse.tick()
assert all(o == "skipped" for _, o, _ in results)
assert all(c == 0 for c in fresh_db.values())
# And killswitch state itself round-trips.
assert pulse.kill_switch_state() is True
pulse.set_kill_switch(False)
assert pulse.kill_switch_state() is False
def test_tick_updates_last_fired_and_next_fire(fresh_db):
past = datetime.now(timezone.utc) - timedelta(hours=24)
_set_last_fired("fetch", past, 900)
before = datetime.now(timezone.utc)
pulse.tick()
p = {x.name: x for x in pulse.state()}["fetch"]
assert p.last_fired is not None
assert p.last_fired >= before - timedelta(seconds=2)
assert p.next_fire is not None
assert p.next_fire > p.last_fired
delta = (p.next_fire - p.last_fired).total_seconds()
# 900s cadence, allow a 2s rounding window.
assert 898 <= delta <= 902
assert p.last_outcome == "ok"
assert "fetch: tick" in p.last_result
def test_run_now_bypasses_cadence(fresh_db):
# Pipeline isn't due — run_now must still fire.
future = datetime.now(timezone.utc) + timedelta(hours=1)
for p in pulse.state():
p.last_fired = datetime.now(timezone.utc)
p.next_fire = future
db.upsert_pulse_pipeline(pulse._pipeline_to_row(p))
outcome, result = pulse.run_now("fetch")
assert outcome == "ok"
assert "fetch: tick" in result
assert fresh_db["fetch"] == 1
def test_run_now_respects_kill_switch(fresh_db):
pulse.set_kill_switch(True)
outcome, result = pulse.run_now("fetch")
assert outcome == "skipped"
assert "kill switch" in result
assert fresh_db["fetch"] == 0
def test_run_now_even_when_manual(fresh_db):
"""Manual mode blocks tick() but NOT run_now — that's the whole point of manual."""
pulse.set_mode("peer-pull", pulse.PulseMode.MANUAL)
pulse.set_enabled("peer-pull", True)
outcome, _ = pulse.run_now("peer-pull")
assert outcome == "ok"
assert fresh_db["peer-pull"] == 1
def test_runner_exception_recorded_as_err(fresh_db, monkeypatch):
def boom() -> str:
raise RuntimeError("nope")
# Inject a failing runner just for "fetch".
bad_registry = dict(pulse._REGISTRY)
bad_registry["fetch"] = boom
monkeypatch.setattr(pulse, "_REGISTRY", bad_registry)
past = datetime.now(timezone.utc) - timedelta(hours=24)
_set_last_fired("fetch", past, 900)
results = pulse.tick()
by_name = {n: (o, r) for n, o, r in results}
assert by_name["fetch"][0] == "err"
assert "nope" in by_name["fetch"][1]
# And the failure is persisted, with next_fire still pushed forward so we
# don't busy-retry a broken runner every tick.
p = {x.name: x for x in pulse.state()}["fetch"]
assert p.last_outcome == "err"
assert p.next_fire is not None and p.next_fire > p.last_fired
def test_unknown_pipeline_raises(fresh_db):
with pytest.raises(ValueError):
pulse.set_mode("does-not-exist", pulse.PulseMode.MANUAL)
with pytest.raises(ValueError):
pulse.set_cadence("does-not-exist", 60)
with pytest.raises(ValueError):
pulse.run_now("does-not-exist")
def test_invalid_cadence_raises(fresh_db):
with pytest.raises(ValueError):
pulse.set_cadence("fetch", 0)
with pytest.raises(ValueError):
pulse.set_cadence("fetch", -5)