stage-18: approval queue — human gate before evidence leaves
CERT-Bund (authority) requires_approval by default; PSYC_REQUIRE_APPROVAL=1 forces every routable submission through the queue. Courier branches at execute_routes: approval-required → freeze payload + enqueue, no HTTP; else submit directly as before. Approve dispatches the frozen payload to mock-cert and writes the ledger row (detail=approved_by=…); reject writes a ledger row with the reviewer's reason. CLI: queue / approve / reject. Cockpit /queue page with POST approve / reject and counts. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -284,6 +284,55 @@ def submit_case(case_id: str) -> None:
|
|||||||
typer.echo(f" ⊘ {b.destination_name}: {b.reason} (logged)")
|
typer.echo(f" ⊘ {b.destination_name}: {b.reason} (logged)")
|
||||||
|
|
||||||
|
|
||||||
|
@app.command("queue")
|
||||||
|
def queue_list(
|
||||||
|
status: str = typer.Option("pending", help="pending | approved | rejected | all"),
|
||||||
|
limit: int = typer.Option(50, help="max rows"),
|
||||||
|
) -> None:
|
||||||
|
"""List the approval queue."""
|
||||||
|
from psyc.models import ApprovalStatus
|
||||||
|
status_filter = None if status == "all" else ApprovalStatus(status)
|
||||||
|
rows = courier.list_pending(status=status_filter, limit=limit)
|
||||||
|
if not rows:
|
||||||
|
typer.echo(f"(no submissions with status={status})")
|
||||||
|
return
|
||||||
|
for p in rows:
|
||||||
|
rev = f" by {p.reviewer}" if p.reviewer else ""
|
||||||
|
typer.echo(
|
||||||
|
f" #{p.id} {p.status.value:9s} {p.destination_name:16s} {p.case_id} "
|
||||||
|
f"({p.payload_kind}, tlp={p.tlp.value}){rev}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.command("approve")
|
||||||
|
def approve(
|
||||||
|
pending_id: int = typer.Argument(..., help="pending submission id"),
|
||||||
|
reviewer: str = typer.Option("operator", "--by", help="reviewer identity"),
|
||||||
|
) -> None:
|
||||||
|
"""Approve a pending submission — dispatches to its destination."""
|
||||||
|
result = courier.dispatch_pending(pending_id, reviewer=reviewer)
|
||||||
|
if isinstance(result, Err):
|
||||||
|
typer.echo(f"error: {result.reason}", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
r = result.value
|
||||||
|
rcpt = f" → {r.receipt_id}" if r.receipt_id else ""
|
||||||
|
typer.echo(f"approved #{pending_id} · {r.destination_name}: {r.outcome.value}{rcpt}")
|
||||||
|
|
||||||
|
|
||||||
|
@app.command("reject")
|
||||||
|
def reject(
|
||||||
|
pending_id: int = typer.Argument(..., help="pending submission id"),
|
||||||
|
reviewer: str = typer.Option("operator", "--by", help="reviewer identity"),
|
||||||
|
reason: str = typer.Option("", "--reason", help="rejection reason"),
|
||||||
|
) -> None:
|
||||||
|
"""Reject a pending submission — nothing leaves; ledger row written."""
|
||||||
|
result = courier.reject_pending(pending_id, reviewer=reviewer, reason=reason)
|
||||||
|
if isinstance(result, Err):
|
||||||
|
typer.echo(f"error: {result.reason}", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
typer.echo(f"rejected #{pending_id}{(': ' + reason) if reason else ''}")
|
||||||
|
|
||||||
|
|
||||||
@app.command("mock-cert")
|
@app.command("mock-cert")
|
||||||
def mock_cert_serve(host: str = "127.0.0.1", port: int = 8770) -> None:
|
def mock_cert_serve(host: str = "127.0.0.1", port: int = 8770) -> None:
|
||||||
uvicorn.run("psyc.mock_cert:app", host=host, port=port)
|
uvicorn.run("psyc.mock_cert:app", host=host, port=port)
|
||||||
|
|||||||
@@ -5,13 +5,14 @@ from __future__ import annotations
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
from fastapi import FastAPI, HTTPException, Request
|
from fastapi import FastAPI, Form, HTTPException, Request
|
||||||
from fastapi.responses import HTMLResponse
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||||
from fastapi.staticfiles import StaticFiles
|
from fastapi.staticfiles import StaticFiles
|
||||||
from fastapi.templating import Jinja2Templates
|
from fastapi.templating import Jinja2Templates
|
||||||
|
|
||||||
from psyc import db, log
|
from psyc import db, log
|
||||||
from psyc.cockpit import inference, journey as journey_view
|
from psyc.cockpit import inference, journey as journey_view
|
||||||
|
from psyc.lines import courier as courier_line
|
||||||
from psyc.lines import ledger as ledger_line
|
from psyc.lines import ledger as ledger_line
|
||||||
from psyc.lines import route as route_line
|
from psyc.lines import route as route_line
|
||||||
from psyc.lines import seal as seal_line
|
from psyc.lines import seal as seal_line
|
||||||
@@ -102,3 +103,36 @@ def healthz() -> dict:
|
|||||||
def inference_status() -> dict:
|
def inference_status() -> dict:
|
||||||
adapter = inference.server_adapter()
|
adapter = inference.server_adapter()
|
||||||
return {"online": adapter is not None, "adapter": adapter}
|
return {"online": adapter is not None, "adapter": adapter}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/queue", response_class=HTMLResponse)
|
||||||
|
def queue_view(request: Request, status: str = "pending") -> HTMLResponse:
|
||||||
|
from psyc.models import ApprovalStatus
|
||||||
|
status_filter = None if status == "all" else ApprovalStatus(status)
|
||||||
|
rows = courier_line.list_pending(status=status_filter, limit=200)
|
||||||
|
counts = {
|
||||||
|
"pending": courier_line.pending_count(ApprovalStatus.PENDING),
|
||||||
|
"approved": courier_line.pending_count(ApprovalStatus.APPROVED),
|
||||||
|
"rejected": courier_line.pending_count(ApprovalStatus.REJECTED),
|
||||||
|
}
|
||||||
|
return TEMPLATES.TemplateResponse(
|
||||||
|
request,
|
||||||
|
"queue.html",
|
||||||
|
{"rows": rows, "counts": counts, "current_status": status},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/queue/approve/{pid}")
|
||||||
|
def queue_approve(pid: int, reviewer: str = Form("operator")) -> RedirectResponse:
|
||||||
|
result = courier_line.dispatch_pending(pid, reviewer=reviewer)
|
||||||
|
if isinstance(result, Err):
|
||||||
|
_log.warning("cockpit.queue.approve.error", pending_id=pid, reason=result.reason)
|
||||||
|
return RedirectResponse("/queue", status_code=303)
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/queue/reject/{pid}")
|
||||||
|
def queue_reject(pid: int, reviewer: str = Form("operator"), reason: str = Form("")) -> RedirectResponse:
|
||||||
|
result = courier_line.reject_pending(pid, reviewer=reviewer, reason=reason)
|
||||||
|
if isinstance(result, Err):
|
||||||
|
_log.warning("cockpit.queue.reject.error", pending_id=pid, reason=result.reason)
|
||||||
|
return RedirectResponse("/queue", status_code=303)
|
||||||
|
|||||||
@@ -285,3 +285,32 @@ tr.sev-low .sev-badge { color: var(--muted); }
|
|||||||
border-radius: 4px; padding: 4px 12px; font: inherit; font-size: 12px; cursor: pointer;
|
border-radius: 4px; padding: 4px 12px; font: inherit; font-size: 12px; cursor: pointer;
|
||||||
}
|
}
|
||||||
.replay-btn:hover { border-color: var(--accent); }
|
.replay-btn:hover { border-color: var(--accent); }
|
||||||
|
|
||||||
|
/* ── approval queue ─────────────────────────────────────────── */
|
||||||
|
.queue-tabs { display: flex; gap: 4px; margin: 12px 0 16px; border-bottom: 1px solid var(--border); }
|
||||||
|
.queue-tab {
|
||||||
|
padding: 6px 14px; color: var(--muted); border: 1px solid transparent;
|
||||||
|
border-bottom: none; border-radius: 4px 4px 0 0; font-size: 12px;
|
||||||
|
text-transform: uppercase; letter-spacing: 0.05em;
|
||||||
|
}
|
||||||
|
.queue-tab:hover { color: var(--text); }
|
||||||
|
.queue-tab.is-active {
|
||||||
|
color: var(--accent); border-color: var(--border) var(--border) transparent;
|
||||||
|
background: var(--panel-2);
|
||||||
|
}
|
||||||
|
.queue-action { display: inline-flex; gap: 4px; margin-right: 6px; align-items: center; }
|
||||||
|
.btn {
|
||||||
|
background: var(--panel-2); color: var(--text); border: 1px solid var(--border);
|
||||||
|
border-radius: 3px; padding: 4px 10px; font: inherit; font-size: 12px; cursor: pointer;
|
||||||
|
}
|
||||||
|
.btn:hover { border-color: var(--accent); }
|
||||||
|
.btn-approve { color: var(--green); border-color: rgba(74, 222, 128, 0.35); }
|
||||||
|
.btn-approve:hover { background: rgba(74, 222, 128, 0.08); border-color: var(--green); }
|
||||||
|
.btn-reject { color: var(--red); border-color: rgba(248, 113, 113, 0.35); }
|
||||||
|
.btn-reject:hover { background: rgba(248, 113, 113, 0.08); border-color: var(--red); }
|
||||||
|
.reject-reason {
|
||||||
|
background: var(--bg); color: var(--text); border: 1px solid var(--border);
|
||||||
|
border-radius: 3px; padding: 3px 6px; font: inherit; font-size: 11px; width: 130px;
|
||||||
|
}
|
||||||
|
.reject-reason::placeholder { color: var(--muted); }
|
||||||
|
.outcome-pending_approval { background: rgba(251, 191, 36, 0.15); color: var(--amber); border: 1px solid rgba(251, 191, 36, 0.4); }
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
</a>
|
</a>
|
||||||
<nav class="nav">
|
<nav class="nav">
|
||||||
<a href="/cases">Cases</a>
|
<a href="/cases">Cases</a>
|
||||||
|
<a href="/queue">Queue</a>
|
||||||
<a href="/ledger">Ledger</a>
|
<a href="/ledger">Ledger</a>
|
||||||
<a href="/train">Trainline</a>
|
<a href="/train">Trainline</a>
|
||||||
</nav>
|
</nav>
|
||||||
|
|||||||
73
src/psyc/cockpit/templates/queue.html
Normal file
73
src/psyc/cockpit/templates/queue.html
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
{% extends "base.html" %}
|
||||||
|
{% block title %}Approval Queue — psyc{% endblock %}
|
||||||
|
{% block content %}
|
||||||
|
<section class="panel">
|
||||||
|
<div class="panel-head">
|
||||||
|
<h1>Submission Approval Queue</h1>
|
||||||
|
<span class="count">{{ counts.pending }} pending · {{ counts.approved }} approved · {{ counts.rejected }} rejected</span>
|
||||||
|
</div>
|
||||||
|
<p class="page-intro">Nothing leaves psyc to an authority destination without a human signing off. Routing builds the payload, freezes it here, and waits. You approve — Courier dispatches and the Ledger records. You reject — nothing leaves, and the rejection is recorded too.</p>
|
||||||
|
<details class="page-help">
|
||||||
|
<summary>how to use this view</summary>
|
||||||
|
<div class="help-body">
|
||||||
|
<p><b>How to use.</b> Each row is a payload waiting on you. Click the case to inspect it before deciding. Approve sends it now; Reject blocks it forever (the case can still be re-submitted later from CLI if appropriate).</p>
|
||||||
|
<p><b>What you're seeing.</b> Pending submissions for destinations marked <code>requires_approval=True</code> — CERT-Bund by default, or <em>all</em> destinations when <code>PSYC_REQUIRE_APPROVAL=1</code>.</p>
|
||||||
|
<p><b>Why it matters.</b> The dossier mandates a human gate before evidence reaches real authority systems. This is that gate. The frozen payload guarantees the reviewer approves exactly what gets sent — not a re-derived version that might have drifted.</p>
|
||||||
|
</div>
|
||||||
|
</details>
|
||||||
|
|
||||||
|
<div class="queue-tabs">
|
||||||
|
<a class="queue-tab{% if current_status == 'pending' %} is-active{% endif %}" href="/queue?status=pending">pending ({{ counts.pending }})</a>
|
||||||
|
<a class="queue-tab{% if current_status == 'approved' %} is-active{% endif %}" href="/queue?status=approved">approved ({{ counts.approved }})</a>
|
||||||
|
<a class="queue-tab{% if current_status == 'rejected' %} is-active{% endif %}" href="/queue?status=rejected">rejected ({{ counts.rejected }})</a>
|
||||||
|
<a class="queue-tab{% if current_status == 'all' %} is-active{% endif %}" href="/queue?status=all">all</a>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
{% if not rows %}
|
||||||
|
<p class="empty">Queue is clear. Either nothing is pending, or no destination on the current cases requires approval. Set <code>PSYC_REQUIRE_APPROVAL=1</code> to force every routable submission through this gate.</p>
|
||||||
|
{% else %}
|
||||||
|
<table class="ledger">
|
||||||
|
<thead>
|
||||||
|
<tr>
|
||||||
|
<th>#</th>
|
||||||
|
<th>Created</th>
|
||||||
|
<th>Case</th>
|
||||||
|
<th>Destination</th>
|
||||||
|
<th>Payload</th>
|
||||||
|
<th>TLP</th>
|
||||||
|
<th>Hash</th>
|
||||||
|
<th>Status</th>
|
||||||
|
<th>Action</th>
|
||||||
|
</tr>
|
||||||
|
</thead>
|
||||||
|
<tbody>
|
||||||
|
{% for p in rows %}
|
||||||
|
<tr class="ledger-row{% if p.status.value == 'rejected' %} is-rejected{% elif p.status.value == 'approved' %} is-actioned{% endif %}">
|
||||||
|
<td>#{{ p.id }}</td>
|
||||||
|
<td class="lg-ts">{{ p.created_at.strftime('%Y-%m-%d %H:%M:%S') }}</td>
|
||||||
|
<td class="lg-case"><a href="/cases/{{ p.case_id }}">{{ p.case_id }}</a></td>
|
||||||
|
<td class="lg-dest">{{ p.destination_name }}</td>
|
||||||
|
<td>{{ p.payload_kind }}</td>
|
||||||
|
<td><span class="tlp-badge tlp-{{ p.tlp.value }}">{{ p.tlp.value }}</span></td>
|
||||||
|
<td class="lg-hash">{{ p.payload_hash[:12] }}…</td>
|
||||||
|
<td><span class="outcome-badge outcome-{{ 'submitted' if p.status.value == 'approved' else ('rejected' if p.status.value == 'rejected' else 'pending_approval') }}">{{ p.status.value }}</span></td>
|
||||||
|
<td>
|
||||||
|
{% if p.status.value == 'pending' %}
|
||||||
|
<form method="post" action="/queue/approve/{{ p.id }}" class="queue-action">
|
||||||
|
<button type="submit" class="btn btn-approve">approve</button>
|
||||||
|
</form>
|
||||||
|
<form method="post" action="/queue/reject/{{ p.id }}" class="queue-action">
|
||||||
|
<input type="text" name="reason" placeholder="reason (optional)" class="reject-reason">
|
||||||
|
<button type="submit" class="btn btn-reject">reject</button>
|
||||||
|
</form>
|
||||||
|
{% else %}
|
||||||
|
<span class="lg-sub">{{ p.reviewer or '—' }}{% if p.reason %} · {{ p.reason }}{% endif %}</span>
|
||||||
|
{% endif %}
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
|
{% endfor %}
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
{% endif %}
|
||||||
|
</section>
|
||||||
|
{% endblock %}
|
||||||
@@ -64,6 +64,24 @@ ledger = Table(
|
|||||||
Index("ledger_case_idx", ledger.c.case_id)
|
Index("ledger_case_idx", ledger.c.case_id)
|
||||||
Index("ledger_time_idx", ledger.c.timestamp.desc())
|
Index("ledger_time_idx", ledger.c.timestamp.desc())
|
||||||
|
|
||||||
|
pending = Table(
|
||||||
|
"pending_submissions", _metadata,
|
||||||
|
Column("id", Integer, primary_key=True, autoincrement=True),
|
||||||
|
Column("case_id", String, nullable=False),
|
||||||
|
Column("destination_name", String, nullable=False),
|
||||||
|
Column("payload_kind", String, nullable=False),
|
||||||
|
Column("payload_hash", String, nullable=False),
|
||||||
|
Column("payload_json", Text, nullable=False),
|
||||||
|
Column("tlp", String, nullable=False),
|
||||||
|
Column("created_at", String, nullable=False),
|
||||||
|
Column("status", String, nullable=False),
|
||||||
|
Column("reviewer", String, nullable=True),
|
||||||
|
Column("reviewed_at", String, nullable=True),
|
||||||
|
Column("reason", String, nullable=True),
|
||||||
|
)
|
||||||
|
Index("pending_status_idx", pending.c.status)
|
||||||
|
Index("pending_case_idx", pending.c.case_id)
|
||||||
|
|
||||||
|
|
||||||
_log = log.get(__name__)
|
_log = log.get(__name__)
|
||||||
_engine: Optional[Engine] = None
|
_engine: Optional[Engine] = None
|
||||||
|
|||||||
@@ -1,18 +1,21 @@
|
|||||||
"""Courier — payload building + HTTP submission to destination endpoints."""
|
"""Courier — payload building + HTTP submission, with optional approval queue."""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
|
from datetime import datetime, timezone
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
|
from sqlalchemy import select, update
|
||||||
|
|
||||||
from psyc import log
|
from psyc import db, log
|
||||||
from psyc.lines import ledger as ledger_line
|
from psyc.lines import ledger as ledger_line
|
||||||
from psyc.lines.route import BlockedRoute, Route, endpoint_for
|
from psyc.lines.route import BlockedRoute, Route, endpoint_for
|
||||||
from psyc.models import Case, Outcome, SealedPackage
|
from psyc.models import ApprovalStatus, Case, Outcome, PendingSubmission, SealedPackage, TLP
|
||||||
from psyc.result import Err, Ok, Result
|
from psyc.result import Err, Ok, Result
|
||||||
|
|
||||||
|
|
||||||
@@ -122,8 +125,32 @@ def execute_blocked_routes(case: Case, blocked: List[BlockedRoute]) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _force_approval() -> bool:
|
||||||
|
return os.environ.get("PSYC_REQUIRE_APPROVAL", "").lower() in ("1", "true", "yes")
|
||||||
|
|
||||||
|
|
||||||
|
def _enqueue_pending(case: Case, route: Route, payload: Dict[str, Any], payload_hash: str) -> int:
|
||||||
|
now = datetime.now(timezone.utc).isoformat()
|
||||||
|
stmt = db.pending.insert().values(
|
||||||
|
case_id=case.case_id,
|
||||||
|
destination_name=route.destination_name,
|
||||||
|
payload_kind=route.payload_kind,
|
||||||
|
payload_hash=payload_hash,
|
||||||
|
payload_json=json.dumps(payload, sort_keys=True),
|
||||||
|
tlp=case.classification.tlp.value,
|
||||||
|
created_at=now,
|
||||||
|
status=ApprovalStatus.PENDING.value,
|
||||||
|
)
|
||||||
|
with db.engine().begin() as conn:
|
||||||
|
res = conn.execute(stmt)
|
||||||
|
pid = int(res.inserted_primary_key[0])
|
||||||
|
_log.info("courier.queued", case_id=case.case_id, destination=route.destination_name, pending_id=pid)
|
||||||
|
return pid
|
||||||
|
|
||||||
|
|
||||||
def execute_routes(case: Case, routes: List[Route], sealed_pkg: Optional[SealedPackage] = None) -> List[SubmitResult]:
|
def execute_routes(case: Case, routes: List[Route], sealed_pkg: Optional[SealedPackage] = None) -> List[SubmitResult]:
|
||||||
results: List[SubmitResult] = []
|
results: List[SubmitResult] = []
|
||||||
|
force = _force_approval()
|
||||||
for r in routes:
|
for r in routes:
|
||||||
endpoint = endpoint_for(r.destination_name)
|
endpoint = endpoint_for(r.destination_name)
|
||||||
if endpoint is None:
|
if endpoint is None:
|
||||||
@@ -140,6 +167,14 @@ def execute_routes(case: Case, routes: List[Route], sealed_pkg: Optional[SealedP
|
|||||||
continue
|
continue
|
||||||
payload = build_payload(case, r.payload_kind, sealed_pkg)
|
payload = build_payload(case, r.payload_kind, sealed_pkg)
|
||||||
payload_hash = _hash_payload(payload)
|
payload_hash = _hash_payload(payload)
|
||||||
|
if r.requires_approval or force:
|
||||||
|
pid = _enqueue_pending(case, r, payload, payload_hash)
|
||||||
|
results.append(SubmitResult(
|
||||||
|
destination_name=r.destination_name,
|
||||||
|
outcome=Outcome.PENDING_APPROVAL,
|
||||||
|
detail=f"pending_id={pid}",
|
||||||
|
))
|
||||||
|
continue
|
||||||
result = submit(endpoint, payload)
|
result = submit(endpoint, payload)
|
||||||
if isinstance(result, Err):
|
if isinstance(result, Err):
|
||||||
ledger_line.write(
|
ledger_line.write(
|
||||||
@@ -166,3 +201,129 @@ def execute_routes(case: Case, routes: List[Route], sealed_pkg: Optional[SealedP
|
|||||||
)
|
)
|
||||||
results.append(SubmitResult(destination_name=r.destination_name, outcome=outcome, receipt_id=receipt.receipt_id))
|
results.append(SubmitResult(destination_name=r.destination_name, outcome=outcome, receipt_id=receipt.receipt_id))
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
def _row_to_pending(row: Any) -> PendingSubmission:
|
||||||
|
return PendingSubmission(
|
||||||
|
id=row.id,
|
||||||
|
case_id=row.case_id,
|
||||||
|
destination_name=row.destination_name,
|
||||||
|
payload_kind=row.payload_kind,
|
||||||
|
payload_hash=row.payload_hash,
|
||||||
|
payload_json=row.payload_json,
|
||||||
|
tlp=TLP(row.tlp),
|
||||||
|
created_at=datetime.fromisoformat(row.created_at),
|
||||||
|
status=ApprovalStatus(row.status),
|
||||||
|
reviewer=row.reviewer,
|
||||||
|
reviewed_at=datetime.fromisoformat(row.reviewed_at) if row.reviewed_at else None,
|
||||||
|
reason=row.reason,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def list_pending(status: Optional[ApprovalStatus] = ApprovalStatus.PENDING, limit: int = 200) -> List[PendingSubmission]:
|
||||||
|
stmt = select(db.pending)
|
||||||
|
if status is not None:
|
||||||
|
stmt = stmt.where(db.pending.c.status == status.value)
|
||||||
|
stmt = stmt.order_by(db.pending.c.created_at.desc()).limit(limit)
|
||||||
|
with db.engine().connect() as conn:
|
||||||
|
rows = conn.execute(stmt).fetchall()
|
||||||
|
return [_row_to_pending(r) for r in rows]
|
||||||
|
|
||||||
|
|
||||||
|
def get_pending(pid: int) -> Result[PendingSubmission, str]:
|
||||||
|
stmt = select(db.pending).where(db.pending.c.id == pid)
|
||||||
|
with db.engine().connect() as conn:
|
||||||
|
row = conn.execute(stmt).fetchone()
|
||||||
|
if row is None:
|
||||||
|
return Err(f"pending submission not found: {pid}")
|
||||||
|
return Ok(_row_to_pending(row))
|
||||||
|
|
||||||
|
|
||||||
|
def pending_count(status: ApprovalStatus = ApprovalStatus.PENDING) -> int:
|
||||||
|
from sqlalchemy import func as sa_func
|
||||||
|
stmt = select(sa_func.count()).select_from(db.pending).where(db.pending.c.status == status.value)
|
||||||
|
with db.engine().connect() as conn:
|
||||||
|
return int(conn.execute(stmt).scalar_one())
|
||||||
|
|
||||||
|
|
||||||
|
def dispatch_pending(pid: int, reviewer: str = "operator") -> Result[SubmitResult, str]:
|
||||||
|
"""Approve and submit a pending entry — POST to destination, write ledger, mark approved."""
|
||||||
|
pending_r = get_pending(pid)
|
||||||
|
if isinstance(pending_r, Err):
|
||||||
|
return Err(pending_r.reason)
|
||||||
|
p = pending_r.value
|
||||||
|
if p.status != ApprovalStatus.PENDING:
|
||||||
|
return Err(f"pending submission {pid} is already {p.status.value}")
|
||||||
|
endpoint = endpoint_for(p.destination_name)
|
||||||
|
if endpoint is None:
|
||||||
|
return Err(f"no endpoint configured for {p.destination_name}")
|
||||||
|
payload = json.loads(p.payload_json)
|
||||||
|
result = submit(endpoint, payload)
|
||||||
|
now = datetime.now(timezone.utc).isoformat()
|
||||||
|
if isinstance(result, Err):
|
||||||
|
ledger_line.write(
|
||||||
|
case_id=p.case_id,
|
||||||
|
destination=p.destination_name,
|
||||||
|
payload_hash=p.payload_hash,
|
||||||
|
submitter_identity=SUBMITTER_IDENTITY,
|
||||||
|
tlp=p.tlp,
|
||||||
|
outcome=Outcome.FAILED,
|
||||||
|
detail=result.reason,
|
||||||
|
)
|
||||||
|
with db.engine().begin() as conn:
|
||||||
|
conn.execute(update(db.pending).where(db.pending.c.id == pid).values(
|
||||||
|
status=ApprovalStatus.APPROVED.value,
|
||||||
|
reviewer=reviewer,
|
||||||
|
reviewed_at=now,
|
||||||
|
reason=f"submit failed: {result.reason}",
|
||||||
|
))
|
||||||
|
return Ok(SubmitResult(destination_name=p.destination_name, outcome=Outcome.FAILED, detail=result.reason))
|
||||||
|
receipt = result.value
|
||||||
|
outcome = _STATUS_TO_OUTCOME.get(receipt.status, Outcome.SUBMITTED)
|
||||||
|
ledger_line.write(
|
||||||
|
case_id=p.case_id,
|
||||||
|
destination=p.destination_name,
|
||||||
|
payload_hash=p.payload_hash,
|
||||||
|
submitter_identity=SUBMITTER_IDENTITY,
|
||||||
|
tlp=p.tlp,
|
||||||
|
outcome=outcome,
|
||||||
|
response_id=receipt.receipt_id,
|
||||||
|
detail=f"approved_by={reviewer}",
|
||||||
|
)
|
||||||
|
with db.engine().begin() as conn:
|
||||||
|
conn.execute(update(db.pending).where(db.pending.c.id == pid).values(
|
||||||
|
status=ApprovalStatus.APPROVED.value,
|
||||||
|
reviewer=reviewer,
|
||||||
|
reviewed_at=now,
|
||||||
|
))
|
||||||
|
_log.info("courier.approved", pending_id=pid, reviewer=reviewer, outcome=outcome.value)
|
||||||
|
return Ok(SubmitResult(destination_name=p.destination_name, outcome=outcome, receipt_id=receipt.receipt_id))
|
||||||
|
|
||||||
|
|
||||||
|
def reject_pending(pid: int, reviewer: str = "operator", reason: str = "") -> Result[None, str]:
|
||||||
|
"""Reject a pending entry — write ledger reject row, mark rejected. Nothing leaves."""
|
||||||
|
pending_r = get_pending(pid)
|
||||||
|
if isinstance(pending_r, Err):
|
||||||
|
return Err(pending_r.reason)
|
||||||
|
p = pending_r.value
|
||||||
|
if p.status != ApprovalStatus.PENDING:
|
||||||
|
return Err(f"pending submission {pid} is already {p.status.value}")
|
||||||
|
now = datetime.now(timezone.utc).isoformat()
|
||||||
|
ledger_line.write(
|
||||||
|
case_id=p.case_id,
|
||||||
|
destination=p.destination_name,
|
||||||
|
payload_hash=p.payload_hash,
|
||||||
|
submitter_identity=SUBMITTER_IDENTITY,
|
||||||
|
tlp=p.tlp,
|
||||||
|
outcome=Outcome.REJECTED,
|
||||||
|
detail=f"rejected_by={reviewer}: {reason}" if reason else f"rejected_by={reviewer}",
|
||||||
|
)
|
||||||
|
with db.engine().begin() as conn:
|
||||||
|
conn.execute(update(db.pending).where(db.pending.c.id == pid).values(
|
||||||
|
status=ApprovalStatus.REJECTED.value,
|
||||||
|
reviewer=reviewer,
|
||||||
|
reviewed_at=now,
|
||||||
|
reason=reason or None,
|
||||||
|
))
|
||||||
|
_log.info("courier.rejected", pending_id=pid, reviewer=reviewer)
|
||||||
|
return Ok(None)
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ class Destination(BaseModel):
|
|||||||
priority: int
|
priority: int
|
||||||
payload_kind: str
|
payload_kind: str
|
||||||
countries: List[str] = Field(default_factory=list)
|
countries: List[str] = Field(default_factory=list)
|
||||||
|
requires_approval: bool = False
|
||||||
|
|
||||||
|
|
||||||
class Route(BaseModel):
|
class Route(BaseModel):
|
||||||
@@ -45,6 +46,7 @@ class Route(BaseModel):
|
|||||||
priority: int
|
priority: int
|
||||||
payload_kind: str
|
payload_kind: str
|
||||||
max_tlp_allowed: TLP
|
max_tlp_allowed: TLP
|
||||||
|
requires_approval: bool = False
|
||||||
|
|
||||||
|
|
||||||
class BlockedRoute(BaseModel):
|
class BlockedRoute(BaseModel):
|
||||||
@@ -61,6 +63,7 @@ DESTINATIONS: List[Destination] = [
|
|||||||
priority=1,
|
priority=1,
|
||||||
payload_kind="sealed_evidence_package",
|
payload_kind="sealed_evidence_package",
|
||||||
countries=["DE"],
|
countries=["DE"],
|
||||||
|
requires_approval=True,
|
||||||
),
|
),
|
||||||
Destination(
|
Destination(
|
||||||
name="MISP-Community",
|
name="MISP-Community",
|
||||||
@@ -111,6 +114,7 @@ def plan(case: Case) -> Tuple[List[Route], List[BlockedRoute]]:
|
|||||||
priority=d.priority,
|
priority=d.priority,
|
||||||
payload_kind=d.payload_kind,
|
payload_kind=d.payload_kind,
|
||||||
max_tlp_allowed=d.max_tlp,
|
max_tlp_allowed=d.max_tlp,
|
||||||
|
requires_approval=d.requires_approval,
|
||||||
))
|
))
|
||||||
routes.sort(key=lambda r: r.priority)
|
routes.sort(key=lambda r: r.priority)
|
||||||
_log.info("route.planned", case_id=case.case_id, allowed=len(routes), blocked=len(blocked))
|
_log.info("route.planned", case_id=case.case_id, allowed=len(routes), blocked=len(blocked))
|
||||||
|
|||||||
@@ -133,6 +133,7 @@ class Outcome(str, Enum):
|
|||||||
REJECTED = "rejected"
|
REJECTED = "rejected"
|
||||||
ACTIONED = "actioned"
|
ACTIONED = "actioned"
|
||||||
FAILED = "failed"
|
FAILED = "failed"
|
||||||
|
PENDING_APPROVAL = "pending_approval"
|
||||||
|
|
||||||
|
|
||||||
class LedgerEntry(BaseModel):
|
class LedgerEntry(BaseModel):
|
||||||
@@ -146,3 +147,24 @@ class LedgerEntry(BaseModel):
|
|||||||
response_id: Optional[str] = None
|
response_id: Optional[str] = None
|
||||||
outcome: Outcome
|
outcome: Outcome
|
||||||
detail: Optional[str] = None
|
detail: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
class ApprovalStatus(str, Enum):
|
||||||
|
PENDING = "pending"
|
||||||
|
APPROVED = "approved"
|
||||||
|
REJECTED = "rejected"
|
||||||
|
|
||||||
|
|
||||||
|
class PendingSubmission(BaseModel):
|
||||||
|
id: Optional[int] = None
|
||||||
|
case_id: str
|
||||||
|
destination_name: str
|
||||||
|
payload_kind: str
|
||||||
|
payload_hash: str
|
||||||
|
payload_json: str # frozen payload — what will be sent on approval
|
||||||
|
tlp: TLP
|
||||||
|
created_at: datetime
|
||||||
|
status: ApprovalStatus = ApprovalStatus.PENDING
|
||||||
|
reviewer: Optional[str] = None
|
||||||
|
reviewed_at: Optional[datetime] = None
|
||||||
|
reason: Optional[str] = None
|
||||||
|
|||||||
130
tests/test_courier.py
Normal file
130
tests/test_courier.py
Normal file
@@ -0,0 +1,130 @@
|
|||||||
|
"""Courier approval-queue tests — gating, dispatch, rejection."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from sqlalchemy import create_engine, select
|
||||||
|
|
||||||
|
from psyc import db
|
||||||
|
from psyc.lines import courier, ledger as ledger_line
|
||||||
|
from psyc.lines.route import Route
|
||||||
|
from psyc.models import ApprovalStatus, Outcome, TLP
|
||||||
|
from psyc.result import Ok
|
||||||
|
from conftest import make_case
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fresh_db(tmp_path, monkeypatch):
|
||||||
|
test_db = tmp_path / "test.db"
|
||||||
|
test_engine = create_engine(f"sqlite:///{test_db}", future=True)
|
||||||
|
db._metadata.create_all(test_engine, checkfirst=True)
|
||||||
|
monkeypatch.setattr(db, "_engine", test_engine)
|
||||||
|
monkeypatch.setattr(db, "DB_PATH", test_db)
|
||||||
|
yield test_db
|
||||||
|
|
||||||
|
|
||||||
|
def _malware_url_route(requires_approval: bool) -> Route:
|
||||||
|
return Route(
|
||||||
|
destination_name="URLhaus",
|
||||||
|
priority=3,
|
||||||
|
payload_kind="malware_url_report",
|
||||||
|
max_tlp_allowed=TLP.GREEN,
|
||||||
|
requires_approval=requires_approval,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_execute_routes_enqueues_when_approval_required(fresh_db, monkeypatch):
|
||||||
|
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
|
||||||
|
case.classification.tlp = TLP.GREEN
|
||||||
|
|
||||||
|
# No HTTP should happen — submit must NOT be called on the approval branch.
|
||||||
|
def boom(*a, **kw):
|
||||||
|
raise AssertionError("submit() must not be called when approval is required")
|
||||||
|
monkeypatch.setattr(courier, "submit", boom)
|
||||||
|
|
||||||
|
results = courier.execute_routes(case, [_malware_url_route(requires_approval=True)])
|
||||||
|
|
||||||
|
assert len(results) == 1
|
||||||
|
assert results[0].outcome is Outcome.PENDING_APPROVAL
|
||||||
|
pending = courier.list_pending()
|
||||||
|
assert len(pending) == 1
|
||||||
|
assert pending[0].destination_name == "URLhaus"
|
||||||
|
assert pending[0].status is ApprovalStatus.PENDING
|
||||||
|
assert pending[0].payload_hash # frozen hash present
|
||||||
|
assert pending[0].payload_json # frozen payload present
|
||||||
|
|
||||||
|
|
||||||
|
def test_execute_routes_force_approval_via_env(fresh_db, monkeypatch):
|
||||||
|
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
|
||||||
|
case.classification.tlp = TLP.GREEN
|
||||||
|
monkeypatch.setenv("PSYC_REQUIRE_APPROVAL", "1")
|
||||||
|
monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("must not submit")))
|
||||||
|
|
||||||
|
results = courier.execute_routes(case, [_malware_url_route(requires_approval=False)])
|
||||||
|
|
||||||
|
assert results[0].outcome is Outcome.PENDING_APPROVAL
|
||||||
|
assert courier.pending_count() == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_pending_submits_and_marks_approved(fresh_db, monkeypatch):
|
||||||
|
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
|
||||||
|
case.classification.tlp = TLP.GREEN
|
||||||
|
monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("queue phase")))
|
||||||
|
courier.execute_routes(case, [_malware_url_route(requires_approval=True)])
|
||||||
|
pid = courier.list_pending()[0].id
|
||||||
|
|
||||||
|
# Now approve — submit IS called, returns a receipt.
|
||||||
|
receipt = courier.Receipt(receipt_id="r-001", destination="urlhaus", status="acknowledged", response_body={})
|
||||||
|
monkeypatch.setattr(courier, "submit", lambda *a, **kw: Ok(receipt))
|
||||||
|
|
||||||
|
result = courier.dispatch_pending(pid, reviewer="alice")
|
||||||
|
|
||||||
|
assert isinstance(result, Ok)
|
||||||
|
assert result.value.outcome is Outcome.ACKNOWLEDGED
|
||||||
|
assert result.value.receipt_id == "r-001"
|
||||||
|
# Pending row now marked approved.
|
||||||
|
refreshed = courier.get_pending(pid).value
|
||||||
|
assert refreshed.status is ApprovalStatus.APPROVED
|
||||||
|
assert refreshed.reviewer == "alice"
|
||||||
|
# Ledger has a corresponding row.
|
||||||
|
entries = ledger_line.list_by_case(case.case_id, limit=10)
|
||||||
|
assert any(e.outcome is Outcome.ACKNOWLEDGED and e.destination == "URLhaus" for e in entries)
|
||||||
|
|
||||||
|
|
||||||
|
def test_reject_pending_writes_rejection_to_ledger(fresh_db, monkeypatch):
|
||||||
|
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
|
||||||
|
case.classification.tlp = TLP.GREEN
|
||||||
|
monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("queue phase")))
|
||||||
|
courier.execute_routes(case, [_malware_url_route(requires_approval=True)])
|
||||||
|
pid = courier.list_pending()[0].id
|
||||||
|
|
||||||
|
# Reject — submit must NOT be called.
|
||||||
|
def boom(*a, **kw):
|
||||||
|
raise AssertionError("rejected submissions must not POST")
|
||||||
|
monkeypatch.setattr(courier, "submit", boom)
|
||||||
|
|
||||||
|
result = courier.reject_pending(pid, reviewer="bob", reason="payload looks wrong")
|
||||||
|
|
||||||
|
assert isinstance(result, Ok)
|
||||||
|
refreshed = courier.get_pending(pid).value
|
||||||
|
assert refreshed.status is ApprovalStatus.REJECTED
|
||||||
|
assert refreshed.reviewer == "bob"
|
||||||
|
assert refreshed.reason == "payload looks wrong"
|
||||||
|
entries = ledger_line.list_by_case(case.case_id, limit=10)
|
||||||
|
assert any(e.outcome is Outcome.REJECTED and "rejected_by=bob" in (e.detail or "") for e in entries)
|
||||||
|
|
||||||
|
|
||||||
|
def test_double_approve_is_rejected(fresh_db, monkeypatch):
|
||||||
|
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
|
||||||
|
case.classification.tlp = TLP.GREEN
|
||||||
|
monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("queue phase")))
|
||||||
|
courier.execute_routes(case, [_malware_url_route(requires_approval=True)])
|
||||||
|
pid = courier.list_pending()[0].id
|
||||||
|
|
||||||
|
receipt = courier.Receipt(receipt_id="r-002", destination="urlhaus", status="acknowledged", response_body={})
|
||||||
|
monkeypatch.setattr(courier, "submit", lambda *a, **kw: Ok(receipt))
|
||||||
|
courier.dispatch_pending(pid, reviewer="alice")
|
||||||
|
|
||||||
|
# Second approval of the same id must fail — no double-submission.
|
||||||
|
again = courier.dispatch_pending(pid, reviewer="alice")
|
||||||
|
assert not isinstance(again, Ok)
|
||||||
Reference in New Issue
Block a user