stage-2: full pipeline — Classifyline → Sealine → Routeline → Courier → Ledger + mock CERT

Adds the end-to-end demo chain. PyNaCl sealed boxes implement the dossier's Model A
authority public-key encryption; SQLAlchemy ledger records every submission and every
policy-blocked route. Cockpit gains /ledger and an enriched case detail (sealed-package
card, routes panel, per-case audit). Mock CERT FastAPI app on :8770 stands in for the
real authority endpoints. `psyc demo` runs the whole chain on a fresh URLhaus row.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
m17hr1l
2026-05-14 13:44:43 +02:00
parent e04c6c96d8
commit 3f18e5aa8e
18 changed files with 1253 additions and 35 deletions

5
.gitignore vendored
View File

@@ -8,10 +8,7 @@ dist/
*.pyc
# data / runtime
data/*.db
data/*.db-journal
data/sealed/
data/keys/
data/
# env
.env

View File

@@ -1 +1,9 @@
"""psyc — defensive CTI routing & evidence-sealing platform."""
from pathlib import Path
__version__ = "0.1.0"
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
DATA_DIR = PROJECT_ROOT / "data"

View File

@@ -2,11 +2,15 @@
from __future__ import annotations
from typing import List
import typer
import uvicorn
from psyc import db, log
from psyc.lines import scout
from psyc.lines import classify, courier, route, scout, seal
from psyc.models import Outcome
from psyc.result import Err, Ok
app = typer.Typer(add_completion=False, help="psyc — defensive CTI routing & sealing")
@@ -35,6 +39,201 @@ def fetch_urlhaus(limit: int = typer.Option(50, help="max rows to ingest from th
typer.echo(f"ingested {len(cases)} case(s). total now: {db.case_count()}")
@app.command("classify-case")
def classify_case(case_id: str) -> None:
case_result = db.get_case(case_id)
if isinstance(case_result, Err):
typer.echo(f"error: {case_result.reason}", err=True)
raise typer.Exit(1)
case = classify.classify(case_result.value)
db.upsert_case(case)
typer.echo(
f"classified {case.case_id}: "
f"{case.classification.incident_type.value if case.classification.incident_type else ''} / "
f"{case.classification.severity.value if case.classification.severity else ''} / "
f"TLP:{case.classification.tlp.value} / "
f"class:{case.classification.internal_class.value if case.classification.internal_class else ''}"
)
@app.command("classify-all")
def classify_all() -> None:
cases = db.list_cases(limit=10_000)
for c in cases:
classify.classify(c)
db.upsert_case(c)
typer.echo(f"classified {len(cases)} case(s).")
@app.command("seal-keys-gen")
def seal_keys_gen(recipient: str) -> None:
path = seal.generate_recipient_keys(recipient)
typer.echo(f"keys generated for {recipient}: {path}")
@app.command("seal-keys-list")
def seal_keys_list() -> None:
recipients = seal.list_recipients()
if not recipients:
typer.echo("(no recipients)")
return
for r in recipients:
typer.echo(r)
@app.command("seal-pack")
def seal_pack(case_id: str, recipient: List[str] = typer.Option(..., "--recipient", "-r", help="recipient name (repeatable)")) -> None:
case_result = db.get_case(case_id)
if isinstance(case_result, Err):
typer.echo(f"error: {case_result.reason}", err=True)
raise typer.Exit(1)
case = case_result.value
plaintext = case.model_dump_json().encode("utf-8")
metadata = dict(
case_id=case.case_id,
tlp=case.classification.tlp.value,
severity=case.classification.severity.value if case.classification.severity else "",
incident_type=case.classification.incident_type.value if case.classification.incident_type else "",
)
pkg_result = seal.seal(plaintext, recipient, metadata=metadata)
if isinstance(pkg_result, Err):
typer.echo(f"error: {pkg_result.reason}", err=True)
raise typer.Exit(1)
pkg = pkg_result.value
case.evidence.sealed_package_id = pkg.package_id
case.evidence.payload_hash = pkg.plaintext_hash
db.upsert_case(case)
typer.echo(f"sealed: {pkg.package_id}{', '.join(recipient)}")
@app.command("seal-list")
def seal_list() -> None:
pkgs = seal.list_packages()
if not pkgs:
typer.echo("(no sealed packages)")
return
for p in pkgs:
typer.echo(p)
@app.command("seal-show")
def seal_show(package_id: str) -> None:
result = seal.load_package(package_id)
if isinstance(result, Err):
typer.echo(f"error: {result.reason}", err=True)
raise typer.Exit(1)
typer.echo(result.value.model_dump_json(indent=2))
@app.command("seal-unseal")
def seal_unseal(package_id: str, recipient: str) -> None:
result = seal.unseal(package_id, recipient)
if isinstance(result, Err):
typer.echo(f"error: {result.reason}", err=True)
raise typer.Exit(1)
typer.echo(result.value.decode("utf-8"))
@app.command("route-plan")
def route_plan(case_id: str) -> None:
case_result = db.get_case(case_id)
if isinstance(case_result, Err):
typer.echo(f"error: {case_result.reason}", err=True)
raise typer.Exit(1)
case = case_result.value
routes, blocked = route.plan(case)
typer.echo(f"case {case_id}{len(routes)} route(s), {len(blocked)} blocked")
for r in routes:
typer.echo(f"{r.destination_name} (priority={r.priority}, payload={r.payload_kind}, max_tlp={r.max_tlp_allowed.value})")
for b in blocked:
typer.echo(f"{b.destination_name}: {b.reason}")
@app.command("submit")
def submit_case(case_id: str) -> None:
case_result = db.get_case(case_id)
if isinstance(case_result, Err):
typer.echo(f"error: {case_result.reason}", err=True)
raise typer.Exit(1)
case = case_result.value
routes, blocked = route.plan(case)
sealed_pkg = None
if case.evidence.sealed_package_id:
pkg_result = seal.load_package(case.evidence.sealed_package_id)
if isinstance(pkg_result, Ok):
sealed_pkg = pkg_result.value
typer.echo(f"submitting {case_id}{len(routes)} route(s), {len(blocked)} blocked")
courier.execute_blocked_routes(case, blocked)
results = courier.execute_routes(case, routes, sealed_pkg=sealed_pkg)
for r in results:
marker = "" if r.outcome != Outcome.FAILED else ""
rcpt = f"{r.receipt_id}" if r.receipt_id else ""
detail = f" ({r.detail})" if r.detail else ""
typer.echo(f" {marker} {r.destination_name}: {r.outcome.value}{rcpt}{detail}")
for b in blocked:
typer.echo(f"{b.destination_name}: {b.reason} (logged)")
@app.command("mock-cert")
def mock_cert_serve(host: str = "127.0.0.1", port: int = 8770) -> None:
uvicorn.run("psyc.mock_cert:app", host=host, port=port)
@app.command("demo")
def demo() -> None:
db.init_db()
typer.echo("== psyc demo: scout → classify → seal → route → submit → ledger ==")
for recipient in ("CERT-Bund", "MISP-Community"):
if recipient not in seal.list_recipients():
seal.generate_recipient_keys(recipient)
typer.echo(f" + generated demo keys for {recipient}")
typer.echo("fetching one URLhaus row…")
cases = scout.fetch_and_signal(limit=1)
if not cases:
typer.echo("no cases ingested; URLhaus may be empty or unreachable", err=True)
raise typer.Exit(1)
case = cases[0]
db.upsert_case(case)
typer.echo(f" + ingested {case.case_id}")
case = classify.classify(case)
db.upsert_case(case)
typer.echo(
f" + classified: {case.classification.incident_type.value if case.classification.incident_type else ''} / "
f"{case.classification.severity.value if case.classification.severity else ''} / "
f"TLP:{case.classification.tlp.value}"
)
plaintext = case.model_dump_json().encode("utf-8")
metadata = dict(
case_id=case.case_id,
tlp=case.classification.tlp.value,
severity=case.classification.severity.value if case.classification.severity else "",
incident_type=case.classification.incident_type.value if case.classification.incident_type else "",
)
pkg_result = seal.seal(plaintext, ["CERT-Bund", "MISP-Community"], metadata=metadata)
if isinstance(pkg_result, Err):
typer.echo(f"seal failed: {pkg_result.reason}", err=True)
raise typer.Exit(1)
pkg = pkg_result.value
case.evidence.sealed_package_id = pkg.package_id
case.evidence.payload_hash = pkg.plaintext_hash
db.upsert_case(case)
typer.echo(f" + sealed: {pkg.package_id}")
routes, blocked = route.plan(case)
courier.execute_blocked_routes(case, blocked)
results = courier.execute_routes(case, routes, sealed_pkg=pkg)
typer.echo(f" + routed: {len(routes)} allowed, {len(blocked)} blocked")
for r in results:
marker = "" if r.outcome != Outcome.FAILED else ""
rcpt = f"{r.receipt_id}" if r.receipt_id else ""
detail = f" ({r.detail})" if r.detail else ""
typer.echo(f" {marker} {r.destination_name}: {r.outcome.value}{rcpt}{detail}")
for b in blocked:
typer.echo(f"{b.destination_name}: {b.reason}")
typer.echo("")
typer.echo(f"inspect: http://127.0.0.1:8767/cases/{case.case_id}")
typer.echo(f"ledger: http://127.0.0.1:8767/ledger")
@app.command("serve")
def serve(host: str = "127.0.0.1", port: int = 8000, reload: bool = False) -> None:
db.init_db()

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from pathlib import Path
from typing import List
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
@@ -10,6 +11,10 @@ from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from psyc import db, log
from psyc.lines import ledger as ledger_line
from psyc.lines import route as route_line
from psyc.lines import seal as seal_line
from psyc.lines.route import BlockedRoute, Route
from psyc.result import Err
@@ -41,7 +46,33 @@ def case_detail(request: Request, case_id: str) -> HTMLResponse:
if isinstance(result, Err):
_log.info("cockpit.case.miss", case_id=case_id)
raise HTTPException(status_code=404, detail=result.reason)
return TEMPLATES.TemplateResponse(request, "case_detail.html", {"case": result.value})
case = result.value
routes: List[Route] = []
blocked: List[BlockedRoute] = []
if case.classification.incident_type is not None:
routes, blocked = route_line.plan(case)
sealed_pkg = None
if case.evidence.sealed_package_id:
pkg_result = seal_line.load_package(case.evidence.sealed_package_id)
if not isinstance(pkg_result, Err):
sealed_pkg = pkg_result.value
case_ledger = ledger_line.list_by_case(case_id, limit=50)
return TEMPLATES.TemplateResponse(
request,
"case_detail.html",
{"case": case, "routes": routes, "blocked": blocked, "sealed_pkg": sealed_pkg, "ledger": case_ledger},
)
@app.get("/ledger", response_class=HTMLResponse)
def ledger_view(request: Request) -> HTMLResponse:
entries = ledger_line.list_recent(limit=200)
total = ledger_line.count()
return TEMPLATES.TemplateResponse(request, "ledger.html", {"entries": entries, "total": total})
@app.get("/healthz")

View File

@@ -0,0 +1,234 @@
/* psyc-tokens.css
* Augments cockpit.css. Drop in after it:
* <link rel="stylesheet" href="cockpit.css">
* <link rel="stylesheet" href="psyc-tokens.css">
*
* Dark theme only. Monospace stack inherited from cockpit.css.
* All text-bearing tokens below hit WCAG AA (>=4.5:1) on --panel (#161a22).
*/
:root {
/* ---- Navy scale (anchored on cockpit bg/panel/border) ---- */
--navy-50: #d8dee9; /* body text (12.9:1) */
--navy-100: #aeb6c4; /* secondary text (7.7:1) */
--navy-200: #7d8597; /* muted (4.6:1) */
--navy-300: #5a6478; /* hairline emphasis */
--navy-400: #3a4458; /* strong border */
--navy-500: #262d3a; /* default border */
--navy-600: #1c2230; /* raised surface (panel-2) */
--navy-700: #161a22; /* panel surface */
--navy-800: #0f1115; /* app bg */
/* ---- Cyan scale (anchored on --accent #1ec8ff) ---- */
--cyan-50: #e0f7ff; /* highest-contrast text on panel */
--cyan-100: #b8eaff;
--cyan-200: #7dd6ff;
--cyan-300: #1ec8ff; /* accent */
--cyan-400: #0ba6db;
--cyan-500: #0982ad;
--cyan-600: #0a5f7d; /* filled-badge bg */
--cyan-700: #07394d; /* deepest */
/* ---- TLP (text-on-panel; AA verified) ---- */
--tlp-red: #ff5d5d; /* 5.3:1 */
--tlp-amber: #fbbf24; /* 11.0:1 */
--tlp-green: #4ade80; /* 11.4:1 */
--tlp-clear: #cbd5e1; /* 11.5:1 */
/* paired filled-badge tokens */
--tlp-red-bg: #7f1d1d; --tlp-red-fg: #fee2e2;
--tlp-amber-bg: #78350f; --tlp-amber-fg: #fef3c7;
--tlp-green-bg: #14532d; --tlp-green-fg: #d1fae5;
--tlp-clear-bg: #1c2230; --tlp-clear-fg: #cbd5e1;
/* ---- Severity (text-on-panel; AA verified) ---- */
--sev-low: #94a3b8; /* 6.6:1 */
--sev-medium: #fde047; /* 13.4:1 */
--sev-high: #fb923c; /* 7.8:1 */
--sev-critical: #ff5d5d; /* 5.3:1, paired with glow */
/* ---- Outcome (text-on-panel; AA verified) ---- */
--out-submitted: #7dd6ff; /* 9.0:1 cyan */
--out-acknowledged: #c4b5fd; /* 8.4:1 violet */
--out-rejected: #f87171; /* 6.0:1 red */
--out-actioned: #4ade80; /* 11.4:1 green */
/* ---- Spacing (4-step) ---- */
--sp-1: 4px;
--sp-2: 8px;
--sp-3: 16px;
--sp-4: 24px;
/* ---- Type (4-step; px + weight) ---- */
--fs-xs: 11px; --fw-xs: 600; /* badges, eyebrows */
--fs-sm: 12px; --fw-sm: 400; /* meta, table head */
--fs-md: 14px; --fw-md: 400; /* body (cockpit base) */
--fs-lg: 18px; --fw-lg: 600; /* panel titles */
}
/* ===== Badges ============================================================ */
/* Shared base — overrides cockpit's .sev-badge / .tlp-badge with tokens. */
.sev-badge,
.tlp-badge,
.outcome-badge {
display: inline-block;
padding: 2px var(--sp-2);
border-radius: 3px;
font-size: var(--fs-xs);
font-weight: var(--fw-xs);
letter-spacing: 0.6px;
text-transform: uppercase;
line-height: 1.6;
border: 1px solid transparent;
white-space: nowrap;
}
/* Severity — outlined, text-driven (works on panel or row hover) */
.sev-badge { background: var(--navy-600); color: var(--navy-100); border-color: var(--navy-500); border-radius: 10px; }
.sev-badge.sev-low, tr.sev-low .sev-badge { color: var(--sev-low); border-color: var(--navy-400); }
.sev-badge.sev-medium, tr.sev-medium .sev-badge { color: var(--sev-medium); border-color: color-mix(in oklab, var(--sev-medium) 40%, var(--navy-500)); }
.sev-badge.sev-high, tr.sev-high .sev-badge { color: var(--sev-high); border-color: color-mix(in oklab, var(--sev-high) 50%, var(--navy-500)); }
.sev-badge.sev-critical, tr.sev-critical .sev-badge {
color: var(--sev-critical);
border-color: var(--sev-critical);
box-shadow: 0 0 0 1px color-mix(in oklab, var(--sev-critical) 25%, transparent);
}
/* TLP — filled chips (FIRST.org-style emphasis) */
.tlp-badge.tlp-RED { background: var(--tlp-red-bg); color: var(--tlp-red-fg); }
.tlp-badge.tlp-AMBER { background: var(--tlp-amber-bg); color: var(--tlp-amber-fg); }
.tlp-badge.tlp-GREEN { background: var(--tlp-green-bg); color: var(--tlp-green-fg); }
.tlp-badge.tlp-CLEAR { background: var(--tlp-clear-bg); color: var(--tlp-clear-fg); border-color: var(--navy-500); }
/* Outcome — text-driven with a leading dot for scannability in dense rows */
.outcome-badge { background: var(--navy-600); border-color: var(--navy-500); border-radius: 10px; }
.outcome-badge::before {
content: "";
display: inline-block;
width: 6px; height: 6px; border-radius: 50%;
margin-right: 6px; vertical-align: 1px;
background: currentColor;
box-shadow: 0 0 6px currentColor;
}
.outcome-badge.outcome-submitted { color: var(--out-submitted); }
.outcome-badge.outcome-acknowledged { color: var(--out-acknowledged); }
.outcome-badge.outcome-rejected { color: var(--out-rejected); border-color: color-mix(in oklab, var(--out-rejected) 35%, var(--navy-500)); }
.outcome-badge.outcome-actioned { color: var(--out-actioned); border-color: color-mix(in oklab, var(--out-actioned) 35%, var(--navy-500)); }
/* ===== Routes ============================================================ */
/* Block-level row inside a case "Routes" panel. Markup:
<div class="route route-allowed"><span class="route-dest">…</span>
<span class="route-reason">…</span></div> */
.route-allowed,
.route-blocked {
display: grid;
grid-template-columns: 1fr auto;
align-items: baseline;
gap: var(--sp-3);
padding: var(--sp-2) var(--sp-3);
border: 1px solid var(--navy-500);
border-left-width: 3px;
border-radius: 4px;
background: var(--navy-600);
font-size: var(--fs-md);
margin-bottom: var(--sp-1);
}
.route-allowed { border-left-color: var(--out-actioned); }
.route-blocked { border-left-color: var(--out-rejected); }
.route-allowed .route-reason,
.route-blocked .route-reason {
font-size: var(--fs-sm);
color: var(--navy-200);
}
.route-allowed .route-dest::before { content: "→ "; color: var(--out-actioned); }
.route-blocked .route-dest::before { content: "⊘ "; color: var(--out-rejected); }
.route-blocked .route-dest { color: var(--navy-100); text-decoration: line-through; text-decoration-color: var(--navy-300); }
/* ===== Ledger row ======================================================= */
/* Tabular dense row for /ledger. Markup:
<tr class="ledger-row">
<td class="lg-ts">…</td><td class="lg-case">…</td><td class="lg-dest">…</td>
<td class="lg-hash">…</td><td class="lg-sub">…</td>
<td><span class="tlp-badge tlp-AMBER">AMBER</span></td>
<td><span class="outcome-badge outcome-actioned">ACTIONED</span></td>
</tr> */
.ledger-row > td {
padding: var(--sp-1) var(--sp-2);
border-bottom: 1px solid var(--navy-500);
font-size: var(--fs-sm);
vertical-align: baseline;
}
.ledger-row:hover > td { background: var(--navy-600); }
.ledger-row .lg-ts { color: var(--navy-200); white-space: nowrap; width: 1%; }
.ledger-row .lg-case { color: var(--cyan-200); white-space: nowrap; width: 1%; }
.ledger-row .lg-dest { color: var(--navy-50); white-space: nowrap; }
.ledger-row .lg-hash {
color: var(--navy-100);
font-variant-ligatures: none;
max-width: 22ch;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
direction: rtl; /* clip the head of the hash, keep the tail */
text-align: left;
}
.ledger-row .lg-sub { color: var(--navy-100); white-space: nowrap; }
.ledger-row.is-rejected > td { background: color-mix(in oklab, var(--out-rejected) 6%, transparent); }
.ledger-row.is-actioned > td { background: color-mix(in oklab, var(--out-actioned) 5%, transparent); }
/* ===== Sealed package card =============================================== */
/* Panel/card variant for case detail + /sealed/{id}. Markup:
<section class="sealed-package-card">
<header class="sp-head">
<h2>SEALED PACKAGE</h2>
<code class="sp-id">pkg_01H…</code>
</header>
<dl class="sp-meta">…</dl>
<ul class="sp-recipients">…</ul>
</section> */
.sealed-package-card {
position: relative;
background:
linear-gradient(180deg, color-mix(in oklab, var(--cyan-700) 35%, var(--navy-700)) 0, var(--navy-700) 64px),
var(--navy-700);
border: 1px solid var(--navy-500);
border-radius: 6px;
padding: var(--sp-3) var(--sp-4) var(--sp-4);
}
.sealed-package-card::before {
content: "";
position: absolute; inset: 0 0 auto 0; height: 2px;
background: linear-gradient(90deg, transparent, var(--cyan-300), transparent);
opacity: 0.7;
}
.sealed-package-card .sp-head {
display: flex; justify-content: space-between; align-items: baseline;
gap: var(--sp-3); margin-bottom: var(--sp-3);
}
.sealed-package-card .sp-head h2 {
margin: 0;
font-size: var(--fs-xs); font-weight: var(--fw-xs);
letter-spacing: 1.4px; text-transform: uppercase;
color: var(--cyan-200);
}
.sealed-package-card .sp-id {
font-size: var(--fs-sm); color: var(--navy-50);
background: var(--navy-600); border: 1px solid var(--navy-500);
padding: 2px var(--sp-2); border-radius: 3px;
}
.sealed-package-card .sp-meta {
display: grid; grid-template-columns: 160px 1fr; gap: var(--sp-1) var(--sp-3);
margin: 0 0 var(--sp-3); font-size: var(--fs-md);
}
.sealed-package-card .sp-meta dt { color: var(--navy-200); }
.sealed-package-card .sp-meta dd { margin: 0; color: var(--navy-50); }
.sealed-package-card .sp-meta .sp-hash { font-size: var(--fs-sm); color: var(--navy-100); word-break: break-all; }
.sealed-package-card .sp-recipients {
list-style: none; margin: 0; padding: 0;
display: flex; flex-wrap: wrap; gap: var(--sp-1) var(--sp-2);
}
.sealed-package-card .sp-recipients > li {
font-size: var(--fs-sm); color: var(--navy-50);
background: var(--navy-600); border: 1px solid var(--navy-500);
padding: 2px var(--sp-2); border-radius: 3px;
}
.sealed-package-card .sp-recipients > li .sp-size { color: var(--navy-200); margin-left: var(--sp-2); }

View File

@@ -5,6 +5,7 @@
<title>{% block title %}psyc cockpit{% endblock %}</title>
<link rel="icon" type="image/png" href="/static/psyc-logo.png">
<link rel="stylesheet" href="/static/cockpit.css">
<link rel="stylesheet" href="/static/psyc-tokens.css">
</head>
<body>
<header class="topbar">
@@ -14,6 +15,7 @@
</a>
<nav class="nav">
<a href="/cases">Cases</a>
<a href="/ledger">Ledger</a>
<a href="/healthz">Health</a>
</nav>
</header>

View File

@@ -10,7 +10,7 @@
<div class="card">
<h2>Classification</h2>
<dl>
<dt>Severity</dt><dd>{{ case.classification.severity.value if case.classification.severity else '—' }}</dd>
<dt>Severity</dt><dd>{% if case.classification.severity %}<span class="sev-badge sev-{{ case.classification.severity.value }}">{{ case.classification.severity.value }}</span>{% else %}—{% endif %}</dd>
<dt>TLP</dt><dd><span class="tlp-badge tlp-{{ case.classification.tlp.value }}">{{ case.classification.tlp.value }}</span></dd>
<dt>Incident type</dt><dd>{{ case.classification.incident_type.value if case.classification.incident_type else '—' }}</dd>
<dt>Internal class</dt><dd>{{ case.classification.internal_class.value if case.classification.internal_class else '—' }}</dd>
@@ -47,4 +47,77 @@
</div>
</div>
</section>
{% if sealed_pkg %}
<section class="sealed-package-card" style="margin-top: 20px;">
<header class="sp-head">
<h2>SEALED PACKAGE</h2>
<code class="sp-id">{{ sealed_pkg.package_id }}</code>
</header>
<dl class="sp-meta">
<dt>Created</dt><dd>{{ sealed_pkg.created_at.strftime('%Y-%m-%d %H:%M UTC') }}</dd>
<dt>Plaintext SHA-256</dt><dd class="sp-hash">{{ sealed_pkg.plaintext_hash }}</dd>
<dt>Recipients</dt>
<dd>
<ul class="sp-recipients">
{% for r in sealed_pkg.recipients %}
<li>{{ r }}<span class="sp-size">{{ (sealed_pkg.ciphertext_per_recipient[r] | length) // 4 * 3 }} B</span></li>
{% endfor %}
</ul>
</dd>
</dl>
</section>
{% endif %}
{% if routes or blocked %}
<section class="panel" style="margin-top: 20px;">
<div class="panel-head">
<h1>Routes</h1>
<span class="count">{{ routes|length }} allowed · {{ blocked|length }} blocked</span>
</div>
{% for r in routes %}
<div class="route-allowed">
<span class="route-dest">{{ r.destination_name }}</span>
<span class="route-reason">priority {{ r.priority }} · {{ r.payload_kind }} · max_tlp {{ r.max_tlp_allowed.value }}</span>
</div>
{% endfor %}
{% for b in blocked %}
<div class="route-blocked">
<span class="route-dest">{{ b.destination_name }}</span>
<span class="route-reason">{{ b.reason }}</span>
</div>
{% endfor %}
</section>
{% endif %}
{% if ledger %}
<section class="panel" style="margin-top: 20px;">
<div class="panel-head">
<h1>Ledger (this case)</h1>
<span class="count">{{ ledger|length }} entr{{ 'y' if ledger|length == 1 else 'ies' }}</span>
</div>
<table class="ledger">
<thead>
<tr>
<th>Timestamp</th>
<th>Destination</th>
<th>Payload hash</th>
<th>TLP</th>
<th>Outcome</th>
</tr>
</thead>
<tbody>
{% for e in ledger %}
<tr class="ledger-row{% if e.outcome.value == 'rejected' %} is-rejected{% elif e.outcome.value == 'actioned' %} is-actioned{% endif %}">
<td class="lg-ts">{{ e.timestamp.strftime('%H:%M:%S') }}</td>
<td class="lg-dest">{{ e.destination }}</td>
<td class="lg-hash">{{ e.payload_hash or '—' }}</td>
<td><span class="tlp-badge tlp-{{ e.tlp.value }}">{{ e.tlp.value }}</span></td>
<td><span class="outcome-badge outcome-{{ e.outcome.value }}">{{ e.outcome.value }}</span></td>
</tr>
{% endfor %}
</tbody>
</table>
</section>
{% endif %}
{% endblock %}

View File

@@ -0,0 +1,40 @@
{% extends "base.html" %}
{% block title %}Ledger — psyc{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Immutable Audit Ledger</h1>
<span class="count">{{ total }} entr{{ 'y' if total == 1 else 'ies' }}</span>
</div>
{% if not entries %}
<p class="empty">No ledger entries yet. Run <code>psyc submit &lt;case_id&gt;</code> or <code>psyc demo</code>.</p>
{% else %}
<table class="ledger">
<thead>
<tr>
<th>Timestamp</th>
<th>Case</th>
<th>Destination</th>
<th>Payload hash</th>
<th>Submitter</th>
<th>TLP</th>
<th>Outcome</th>
</tr>
</thead>
<tbody>
{% for e in entries %}
<tr class="ledger-row{% if e.outcome.value == 'rejected' %} is-rejected{% elif e.outcome.value == 'actioned' %} is-actioned{% endif %}">
<td class="lg-ts">{{ e.timestamp.strftime('%Y-%m-%d %H:%M:%S') }}</td>
<td class="lg-case"><a href="/cases/{{ e.case_id }}">{{ e.case_id }}</a></td>
<td class="lg-dest">{{ e.destination }}</td>
<td class="lg-hash">{{ e.payload_hash or '—' }}</td>
<td class="lg-sub">{{ e.submitter_identity }}</td>
<td><span class="tlp-badge tlp-{{ e.tlp.value }}">{{ e.tlp.value }}</span></td>
<td><span class="outcome-badge outcome-{{ e.outcome.value }}">{{ e.outcome.value }}</span></td>
</tr>
{% endfor %}
</tbody>
</table>
{% endif %}
</section>
{% endblock %}

View File

@@ -23,12 +23,12 @@ from sqlalchemy import (
)
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from psyc import log
from psyc import DATA_DIR, log
from psyc.models import Case
from psyc.result import Err, Ok, Result
DB_PATH = Path(__file__).resolve().parent.parent.parent / "data" / "psyc.db"
DB_PATH = DATA_DIR / "psyc.db"
_metadata = MetaData()

View File

@@ -0,0 +1 @@
"""Worker line modules — one file per pipeline stage."""

View File

@@ -0,0 +1,58 @@
"""Classifyline — assigns incident_type, severity, TLP, internal_class."""
from __future__ import annotations
from psyc import log
from psyc.models import Case, IncidentType, InternalClass, Severity, TLP
_log = log.get(__name__)
def classify(case: Case) -> Case:
_classify_incident_type_and_tlp(case)
_classify_severity(case)
_classify_internal_class(case)
_log.info(
"classify.applied",
case_id=case.case_id,
incident_type=case.classification.incident_type.value if case.classification.incident_type else None,
severity=case.classification.severity.value if case.classification.severity else None,
tlp=case.classification.tlp.value,
internal_class=case.classification.internal_class.value if case.classification.internal_class else None,
)
return case
def _classify_incident_type_and_tlp(case: Case) -> None:
if case.classification.incident_type is not None:
return
if case.source_type == "abuse_feed" and case.observables.urls:
case.classification.incident_type = IncidentType.MALWARE
if case.classification.tlp == TLP.AMBER:
case.classification.tlp = TLP.GREEN
def _classify_severity(case: Case) -> None:
if case.classification.severity is not None:
return
if case.victim.critical_infrastructure:
case.classification.severity = Severity.CRITICAL
return
if case.classification.incident_type == IncidentType.MALWARE:
url_status = case.source_metadata.get("url_status", "")
case.classification.severity = Severity.HIGH if url_status == "online" else Severity.MEDIUM
return
case.classification.severity = Severity.MEDIUM
def _classify_internal_class(case: Case) -> None:
if case.classification.internal_class is not None:
return
if case.victim.critical_infrastructure or case.classification.severity == Severity.CRITICAL:
case.classification.internal_class = InternalClass.A
return
if case.classification.severity == Severity.HIGH:
case.classification.internal_class = InternalClass.C
return
case.classification.internal_class = InternalClass.D

168
src/psyc/lines/courier.py Normal file
View File

@@ -0,0 +1,168 @@
"""Courier — payload building + HTTP submission to destination endpoints."""
from __future__ import annotations
import hashlib
import json
from typing import Any, Dict, List, Optional
import httpx
from pydantic import BaseModel, Field
from psyc import log
from psyc.lines import ledger as ledger_line
from psyc.lines.route import BlockedRoute, Route, endpoint_for
from psyc.models import Case, Outcome, SealedPackage
from psyc.result import Err, Ok, Result
_log = log.get(__name__)
DEFAULT_TIMEOUT = 10.0
SUBMITTER_IDENTITY = "psyc/courier@0.1"
_STATUS_TO_OUTCOME: Dict[str, Outcome] = {
"submitted": Outcome.SUBMITTED,
"acknowledged": Outcome.ACKNOWLEDGED,
"actioned": Outcome.ACTIONED,
"rejected": Outcome.REJECTED,
}
class Receipt(BaseModel):
receipt_id: str
destination: str
status: str
response_body: Dict[str, Any] = Field(default_factory=dict)
class SubmitResult(BaseModel):
destination_name: str
outcome: Outcome
receipt_id: Optional[str] = None
detail: Optional[str] = None
def build_payload(case: Case, payload_kind: str, sealed_pkg: Optional[SealedPackage] = None) -> Dict[str, Any]:
if payload_kind == "sealed_evidence_package":
if sealed_pkg is None:
return {"case_id": case.case_id, "error": "no sealed package available"}
return sealed_pkg.model_dump(mode="json")
if payload_kind == "stix_indicators":
return {
"spec_version": "2.1",
"type": "bundle",
"case_id": case.case_id,
"tlp": case.classification.tlp.value,
"objects": [
{
"type": "indicator",
"id": f"indicator--{case.case_id}-{i}",
"pattern": f"[url:value = '{u}']",
"valid_from": case.observed_at.isoformat(),
}
for i, u in enumerate(case.observables.urls)
],
}
if payload_kind == "malware_url_report":
return {
"case_id": case.case_id,
"urls": case.observables.urls,
"threat": case.source_metadata.get("threat", "malware_distribution"),
"tags": [t for t in case.source_metadata.get("tags", "").split(",") if t],
}
if payload_kind == "ip_abuse_report":
return {
"case_id": case.case_id,
"ips": case.observables.ips,
"category": "malware_distribution",
"comment": case.summary,
}
return {"case_id": case.case_id, "payload_kind": payload_kind}
def _hash_payload(payload: Dict[str, Any]) -> str:
return hashlib.sha256(json.dumps(payload, sort_keys=True).encode("utf-8")).hexdigest()
def submit(endpoint: str, payload: Dict[str, Any], timeout: float = DEFAULT_TIMEOUT) -> Result[Receipt, str]:
try:
with httpx.Client(timeout=timeout) as client:
r = client.post(endpoint, json=payload)
r.raise_for_status()
body = r.json()
except httpx.HTTPStatusError as exc:
_log.warning("courier.http.error", endpoint=endpoint, status=exc.response.status_code)
return Err(f"HTTP {exc.response.status_code}: {exc.response.text[:200]}")
except httpx.RequestError as exc:
_log.warning("courier.request.error", endpoint=endpoint, error=str(exc))
return Err(f"network error: {exc}")
except Exception as exc:
_log.warning("courier.unknown.error", endpoint=endpoint, error=str(exc))
return Err(f"submit failed: {exc}")
_log.info("courier.submitted", endpoint=endpoint, receipt_id=body.get("receipt_id", ""))
return Ok(Receipt(
receipt_id=body.get("receipt_id", ""),
destination=body.get("destination", ""),
status=body.get("status", "submitted"),
response_body=body,
))
def execute_blocked_routes(case: Case, blocked: List[BlockedRoute]) -> None:
for b in blocked:
ledger_line.write(
case_id=case.case_id,
destination=b.destination_name,
payload_hash="",
submitter_identity="psyc/route-planner@0.1",
tlp=case.classification.tlp,
outcome=Outcome.REJECTED,
detail=b.reason,
)
def execute_routes(case: Case, routes: List[Route], sealed_pkg: Optional[SealedPackage] = None) -> List[SubmitResult]:
results: List[SubmitResult] = []
for r in routes:
endpoint = endpoint_for(r.destination_name)
if endpoint is None:
ledger_line.write(
case_id=case.case_id,
destination=r.destination_name,
payload_hash="",
submitter_identity=SUBMITTER_IDENTITY,
tlp=case.classification.tlp,
outcome=Outcome.FAILED,
detail="no endpoint configured",
)
results.append(SubmitResult(destination_name=r.destination_name, outcome=Outcome.FAILED, detail="no endpoint configured"))
continue
payload = build_payload(case, r.payload_kind, sealed_pkg)
payload_hash = _hash_payload(payload)
result = submit(endpoint, payload)
if isinstance(result, Err):
ledger_line.write(
case_id=case.case_id,
destination=r.destination_name,
payload_hash=payload_hash,
submitter_identity=SUBMITTER_IDENTITY,
tlp=case.classification.tlp,
outcome=Outcome.FAILED,
detail=result.reason,
)
results.append(SubmitResult(destination_name=r.destination_name, outcome=Outcome.FAILED, detail=result.reason))
continue
receipt = result.value
outcome = _STATUS_TO_OUTCOME.get(receipt.status, Outcome.SUBMITTED)
ledger_line.write(
case_id=case.case_id,
destination=r.destination_name,
payload_hash=payload_hash,
submitter_identity=SUBMITTER_IDENTITY,
tlp=case.classification.tlp,
outcome=outcome,
response_id=receipt.receipt_id,
)
results.append(SubmitResult(destination_name=r.destination_name, outcome=outcome, receipt_id=receipt.receipt_id))
return results

75
src/psyc/lines/ledger.py Normal file
View File

@@ -0,0 +1,75 @@
"""Ledgerline — append-only audit records for external submissions and destructive actions."""
from __future__ import annotations
from datetime import datetime, timezone
from typing import List, Optional
from sqlalchemy import func, insert, select
from psyc import db, log
from psyc.models import LedgerEntry, Outcome, TLP
_log = log.get(__name__)
def write(
case_id: str,
destination: str,
payload_hash: str,
submitter_identity: str,
tlp: TLP,
outcome: Outcome,
response_id: Optional[str] = None,
detail: Optional[str] = None,
) -> None:
stmt = insert(db.ledger).values(
timestamp=datetime.now(timezone.utc).isoformat(),
case_id=case_id,
destination=destination,
payload_hash=payload_hash,
submitter_identity=submitter_identity,
tlp=tlp.value,
response_id=response_id,
outcome=outcome.value,
detail=detail,
)
with db.engine().begin() as conn:
conn.execute(stmt)
_log.info("ledger.write", case_id=case_id, destination=destination, outcome=outcome.value, response_id=response_id)
def _row_to_entry(r) -> LedgerEntry:
return LedgerEntry(
id=r.id,
timestamp=datetime.fromisoformat(r.timestamp),
case_id=r.case_id,
destination=r.destination,
payload_hash=r.payload_hash,
submitter_identity=r.submitter_identity,
tlp=TLP(r.tlp),
response_id=r.response_id,
outcome=Outcome(r.outcome),
detail=r.detail,
)
def list_recent(limit: int = 200) -> List[LedgerEntry]:
stmt = select(db.ledger).order_by(db.ledger.c.timestamp.desc()).limit(limit)
with db.engine().connect() as conn:
rows = conn.execute(stmt).fetchall()
return [_row_to_entry(r) for r in rows]
def list_by_case(case_id: str, limit: int = 200) -> List[LedgerEntry]:
stmt = select(db.ledger).where(db.ledger.c.case_id == case_id).order_by(db.ledger.c.timestamp.desc()).limit(limit)
with db.engine().connect() as conn:
rows = conn.execute(stmt).fetchall()
return [_row_to_entry(r) for r in rows]
def count() -> int:
stmt = select(func.count()).select_from(db.ledger)
with db.engine().connect() as conn:
return conn.execute(stmt).scalar_one()

116
src/psyc/lines/route.py Normal file
View File

@@ -0,0 +1,116 @@
"""Routeline — destination matrix + RoutePlanner."""
from __future__ import annotations
from typing import Dict, List, Optional, Tuple
from pydantic import BaseModel, Field
from psyc import log
from psyc.models import Case, IncidentType, TLP
_log = log.get(__name__)
_TLP_RANK: Dict[TLP, int] = {TLP.CLEAR: 0, TLP.GREEN: 1, TLP.AMBER: 2, TLP.RED: 3}
MOCK_CERT_BASE = "http://127.0.0.1:8770"
ENDPOINTS: Dict[str, str] = {
"CERT-Bund": f"{MOCK_CERT_BASE}/cert-bund/submit",
"MISP-Community": f"{MOCK_CERT_BASE}/misp/event",
"URLhaus": f"{MOCK_CERT_BASE}/urlhaus/submit",
"AbuseIPDB": f"{MOCK_CERT_BASE}/abuseipdb/report",
}
def endpoint_for(destination_name: str) -> Optional[str]:
return ENDPOINTS.get(destination_name)
class Destination(BaseModel):
name: str
kind: str
max_tlp: TLP
accepts: List[IncidentType] = Field(default_factory=list)
priority: int
payload_kind: str
countries: List[str] = Field(default_factory=list)
class Route(BaseModel):
destination_name: str
priority: int
payload_kind: str
max_tlp_allowed: TLP
class BlockedRoute(BaseModel):
destination_name: str
reason: str
DESTINATIONS: List[Destination] = [
Destination(
name="CERT-Bund",
kind="authority",
max_tlp=TLP.RED,
accepts=[IncidentType.MALWARE, IncidentType.RANSOMWARE, IncidentType.PHISHING, IncidentType.EXPLOIT, IncidentType.DATA_LEAK, IncidentType.CREDENTIAL_LEAK],
priority=1,
payload_kind="sealed_evidence_package",
countries=["DE"],
),
Destination(
name="MISP-Community",
kind="cti_sharing",
max_tlp=TLP.AMBER,
accepts=list(IncidentType),
priority=2,
payload_kind="stix_indicators",
),
Destination(
name="URLhaus",
kind="public_abuse",
max_tlp=TLP.GREEN,
accepts=[IncidentType.MALWARE],
priority=3,
payload_kind="malware_url_report",
),
Destination(
name="AbuseIPDB",
kind="public_abuse",
max_tlp=TLP.CLEAR,
accepts=[IncidentType.MALWARE, IncidentType.BOTNET, IncidentType.PHISHING],
priority=4,
payload_kind="ip_abuse_report",
),
]
def _tlp_allowed(submission_tlp: TLP, dest_max_tlp: TLP) -> bool:
return _TLP_RANK[submission_tlp] <= _TLP_RANK[dest_max_tlp]
def plan(case: Case) -> Tuple[List[Route], List[BlockedRoute]]:
routes: List[Route] = []
blocked: List[BlockedRoute] = []
for d in DESTINATIONS:
if not _tlp_allowed(case.classification.tlp, d.max_tlp):
blocked.append(BlockedRoute(destination_name=d.name, reason="tlp_exceeded"))
continue
if case.classification.incident_type is not None and case.classification.incident_type not in d.accepts:
blocked.append(BlockedRoute(destination_name=d.name, reason="incident_type_mismatch"))
continue
if d.countries and case.victim.country not in d.countries:
blocked.append(BlockedRoute(destination_name=d.name, reason="country_mismatch"))
continue
routes.append(Route(
destination_name=d.name,
priority=d.priority,
payload_kind=d.payload_kind,
max_tlp_allowed=d.max_tlp,
))
routes.sort(key=lambda r: r.priority)
_log.info("route.planned", case_id=case.case_id, allowed=len(routes), blocked=len(blocked))
return routes, blocked

View File

@@ -1,4 +1,8 @@
"""Scoutline — Fetcher + Signalizer for URLhaus."""
"""Scoutline — Fetcher + Signalizer for URLhaus.
Emits raw Case objects with source metadata + observables only. Classification,
victim/actor resolution, confidence scoring, sealing, and routing are downstream.
"""
from __future__ import annotations
@@ -11,15 +15,7 @@ from urllib.parse import urlparse
import httpx
from psyc import log
from psyc.models import (
Case,
Classification,
Confidence,
IncidentType,
Observables,
Severity,
TLP,
)
from psyc.models import Case, Observables
URLHAUS_RECENT_CSV = "https://urlhaus.abuse.ch/downloads/csv_recent/"
@@ -68,31 +64,22 @@ def row_to_case(row: Dict[str, str]) -> Case:
parsed = urlparse(url)
host = parsed.hostname or ""
tags = [t.strip() for t in row["tags"].split(",") if t.strip()]
severity = Severity.HIGH if row["url_status"] == "online" else Severity.MEDIUM
classification = Classification(
severity=severity,
tlp=TLP.GREEN,
incident_type=IncidentType.MALWARE,
)
confidence = Confidence(
level="medium",
source_reliability="B",
information_credibility="2",
)
observables = Observables(urls=[url], domains=[host] if host else [])
summary = f"URLhaus: {row['threat'] or 'malware_distribution'} at {host or url}"
if tags:
summary += f" (tags: {', '.join(tags[:4])})"
return Case(
case_id=f"PSYC-URLHAUS-{row['id']}",
summary=summary,
source_type="abuse_feed",
source_ref=row["urlhaus_link"],
source_metadata=dict(
url_status=row["url_status"],
threat=row["threat"],
tags=row["tags"],
reporter=row["reporter"],
),
observed_at=_parse_urlhaus_date(row["dateadded"]),
classification=classification,
confidence=confidence,
observables=observables,
)

113
src/psyc/lines/seal.py Normal file
View File

@@ -0,0 +1,113 @@
"""Sealine — PyNaCl sealed-box evidence encryption + recipient key management."""
from __future__ import annotations
import base64
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from uuid import uuid4
from nacl.public import PrivateKey, PublicKey, SealedBox
from psyc import DATA_DIR, log
from psyc.models import SealedPackage
from psyc.result import Err, Ok, Result
KEYS_DIR = DATA_DIR / "keys"
SEALED_DIR = DATA_DIR / "sealed"
_log = log.get(__name__)
def generate_recipient_keys(recipient: str) -> Path:
KEYS_DIR.mkdir(parents=True, exist_ok=True)
sk = PrivateKey.generate()
pk = sk.public_key
pub_path = KEYS_DIR / f"{recipient}.pub"
sec_path = KEYS_DIR / f"{recipient}.sec"
pub_path.write_text(base64.b64encode(bytes(pk)).decode())
sec_path.write_text(base64.b64encode(bytes(sk)).decode())
sec_path.chmod(0o600)
_log.info("seal.keys.generated", recipient=recipient, pub_path=str(pub_path))
return pub_path
def load_public_key(recipient: str) -> Result[PublicKey, str]:
pub_path = KEYS_DIR / f"{recipient}.pub"
if not pub_path.exists():
return Err(f"no public key for recipient: {recipient}")
return Ok(PublicKey(base64.b64decode(pub_path.read_text())))
def load_private_key(recipient: str) -> Result[PrivateKey, str]:
sec_path = KEYS_DIR / f"{recipient}.sec"
if not sec_path.exists():
return Err(f"no private key for recipient: {recipient}")
return Ok(PrivateKey(base64.b64decode(sec_path.read_text())))
def list_recipients() -> List[str]:
if not KEYS_DIR.exists():
return []
return sorted(p.stem for p in KEYS_DIR.glob("*.pub"))
def seal(plaintext: bytes, recipients: List[str], metadata: Optional[Dict[str, str]] = None) -> Result[SealedPackage, str]:
if not recipients:
return Err("no recipients specified")
SEALED_DIR.mkdir(parents=True, exist_ok=True)
plaintext_hash = hashlib.sha256(plaintext).hexdigest()
ciphertext_per_recipient: Dict[str, str] = {}
for name in recipients:
pk_result = load_public_key(name)
if isinstance(pk_result, Err):
return Err(f"recipient {name}: {pk_result.reason}")
box = SealedBox(pk_result.value)
ciphertext = box.encrypt(plaintext)
ciphertext_per_recipient[name] = base64.b64encode(ciphertext).decode()
package = SealedPackage(
package_id=f"PKG-{uuid4().hex[:12].upper()}",
created_at=datetime.now(timezone.utc),
plaintext_hash=plaintext_hash,
recipients=list(recipients),
ciphertext_per_recipient=ciphertext_per_recipient,
metadata=metadata or {},
)
pkg_path = SEALED_DIR / f"{package.package_id}.json"
pkg_path.write_text(package.model_dump_json(indent=2))
_log.info("seal.packaged", package_id=package.package_id, recipients=recipients, plaintext_hash=plaintext_hash[:16])
return Ok(package)
def load_package(package_id: str) -> Result[SealedPackage, str]:
pkg_path = SEALED_DIR / f"{package_id}.json"
if not pkg_path.exists():
return Err(f"no sealed package: {package_id}")
return Ok(SealedPackage.model_validate_json(pkg_path.read_text()))
def list_packages() -> List[str]:
if not SEALED_DIR.exists():
return []
return sorted(p.stem for p in SEALED_DIR.glob("PKG-*.json"))
def unseal(package_id: str, recipient: str) -> Result[bytes, str]:
pkg_result = load_package(package_id)
if isinstance(pkg_result, Err):
return pkg_result
pkg = pkg_result.value
if recipient not in pkg.ciphertext_per_recipient:
return Err(f"recipient {recipient} is not on this package")
sk_result = load_private_key(recipient)
if isinstance(sk_result, Err):
return sk_result
box = SealedBox(sk_result.value)
try:
plaintext = box.decrypt(base64.b64decode(pkg.ciphertext_per_recipient[recipient]))
except Exception as exc:
return Err(f"decrypt failed: {exc}")
return Ok(plaintext)

85
src/psyc/mock_cert.py Normal file
View File

@@ -0,0 +1,85 @@
"""Mock CERT / abuse-API receiver for the hackathon demo.
Pretends to be CERT-Bund, MISP, URLhaus, and AbuseIPDB. Accepts JSON payloads,
returns a receipt id + status. Submissions are kept in memory AND mirrored to
data/mock_cert/<receipt_id>.json so the demo has an artifact trail.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any, Dict, List
from uuid import uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field
from psyc import DATA_DIR, log
_log = log.get(__name__)
MOCK_LOG_DIR = DATA_DIR / "mock_cert"
class MockSubmission(BaseModel):
receipt_id: str
destination: str
status: str
received_at: datetime
payload: Dict[str, Any] = Field(default_factory=dict)
_submissions: List[MockSubmission] = []
def _record(destination: str, status: str, payload: Dict[str, Any]) -> MockSubmission:
sub = MockSubmission(
receipt_id=f"MOCK-{uuid4().hex[:12].upper()}",
destination=destination,
status=status,
received_at=datetime.now(timezone.utc),
payload=payload,
)
_submissions.append(sub)
MOCK_LOG_DIR.mkdir(parents=True, exist_ok=True)
(MOCK_LOG_DIR / f"{sub.receipt_id}.json").write_text(sub.model_dump_json(indent=2))
_log.info("mock_cert.received", destination=destination, receipt_id=sub.receipt_id, status=status)
return sub
app = FastAPI(title="psyc mock-cert", version="0.1.0")
@app.post("/cert-bund/submit")
def submit_cert_bund(payload: Dict[str, Any]) -> Dict[str, Any]:
sub = _record("CERT-Bund", "acknowledged", payload)
return {"receipt_id": sub.receipt_id, "destination": sub.destination, "status": sub.status, "received_at": sub.received_at.isoformat()}
@app.post("/misp/event")
def submit_misp(payload: Dict[str, Any]) -> Dict[str, Any]:
sub = _record("MISP-Community", "submitted", payload)
return {"receipt_id": sub.receipt_id, "destination": sub.destination, "status": sub.status, "received_at": sub.received_at.isoformat()}
@app.post("/urlhaus/submit")
def submit_urlhaus(payload: Dict[str, Any]) -> Dict[str, Any]:
sub = _record("URLhaus", "actioned", payload)
return {"receipt_id": sub.receipt_id, "destination": sub.destination, "status": sub.status, "received_at": sub.received_at.isoformat()}
@app.post("/abuseipdb/report")
def submit_abuseipdb(payload: Dict[str, Any]) -> Dict[str, Any]:
sub = _record("AbuseIPDB", "submitted", payload)
return {"receipt_id": sub.receipt_id, "destination": sub.destination, "status": sub.status, "received_at": sub.received_at.isoformat()}
@app.get("/received")
def received() -> Dict[str, Any]:
return {"count": len(_submissions), "submissions": [s.model_dump(mode="json") for s in _submissions]}
@app.get("/healthz")
def healthz() -> Dict[str, str]:
return {"status": "ok", "received": str(len(_submissions))}

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
from datetime import datetime, timezone
from enum import Enum
from typing import List, Optional
from typing import Dict, List, Optional
from uuid import uuid4
from pydantic import BaseModel, Field
@@ -104,6 +104,7 @@ class Case(BaseModel):
summary: str = ""
source_type: str = ""
source_ref: str = ""
source_metadata: Dict[str, str] = Field(default_factory=dict)
observed_at: datetime = Field(default_factory=utcnow)
ingested_at: datetime = Field(default_factory=utcnow)
classification: Classification = Field(default_factory=Classification)
@@ -113,3 +114,33 @@ class Case(BaseModel):
observables: Observables = Field(default_factory=Observables)
evidence: Evidence = Field(default_factory=Evidence)
routing: Routing = Field(default_factory=Routing)
class SealedPackage(BaseModel):
package_id: str
created_at: datetime
plaintext_hash: str
recipients: List[str] = Field(default_factory=list)
ciphertext_per_recipient: Dict[str, str] = Field(default_factory=dict)
metadata: Dict[str, str] = Field(default_factory=dict)
class Outcome(str, Enum):
SUBMITTED = "submitted"
ACKNOWLEDGED = "acknowledged"
REJECTED = "rejected"
ACTIONED = "actioned"
FAILED = "failed"
class LedgerEntry(BaseModel):
id: int
timestamp: datetime
case_id: str
destination: str
payload_hash: str
submitter_identity: str
tlp: TLP
response_id: Optional[str] = None
outcome: Outcome
detail: Optional[str] = None