Compare commits

...

6 Commits

Author SHA1 Message Date
m17hr1l
ee387abcd4 stage-21: swap inference server to psyc-v5 adapter
v5 trained on 598 ex/task (20× v4's 30), with --defang-frac 0.5 over
the new ThreatFox + MalwareBazaar + OTX corpus. Final train_loss 0.3225
vs v4's 0.7397 (56% reduction), 60m20s wall clock on a 3090.

Live eval before swap:
- severity (botnet, ONLINE): v4 high / v5 high — tied, both correct
- ioc_extraction with defanged input (hxxps://, [.], (.), [dot]):
    v4 kept hxxps:// in output (failed canonicalization)
    v5 returned canonical https:// — defang training paid off
- ioc_extraction on real OTX-style prose (never trained on this shape):
    v5 cleanly extracted 2 domains + 1 IP + 1 SHA256 + 1 CVE

Cockpit /api/inference-status confirms the swap:
  {"online":true,"adapter":"/data/adapters/psyc-v5/final"}

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 23:55:47 +02:00
m17hr1l
376c5b6f4a stage-19-fix2: OTX — narrow by modified_since, longer timeout
The /pulses/subscribed endpoint enumerates every curated feed a fresh
account is auto-subscribed to. On its own that's enough to 504 from
OTX's backend regardless of client timeout. Narrowing by
modified_since=now-7d brings the response back to a single-second fetch.

Also: _http now accepts params + per-call timeout overrides (OTX uses
120s). The CLI --limit still slices post-fetch.

Verified live: 10 OTX pulse-cases ingested, each carrying real
paragraph-form descriptions (Mirai, macOS Stealer, FlowerStorm PhaaS,
Vidar v1.5, manufacturing intrusion) — exactly the real-prose source
the IOC extractor's been missing.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 22:39:24 +02:00
m17hr1l
f6fa52839f stage-20: defanging pipeline for IOC-extraction augmentation
Real CTI prose defangs IOCs (1[.]2[.]3[.]4, hxxp://, evil[dot]com) so they
don't auto-link in email/chat. A model trained only on canonical inputs
will fail to extract them.

New lines/defang.py: defang_ip, defang_domain, defang_url, defang_text —
four dot-styles ([.], (.), [dot], {.}) plus protocol defanging
(http→hxxp, https→hxxps). Each occurrence picks its style independently
since real advisories don't keep one style across paragraphs.

train.BuildOptions adds defang_frac (default 0.0) and seed; build()
threads options + a seeded Random through the example builders so
the augmentation is reproducible. Only _ex_ioc_extraction reads it
today — output stays canonical so the model learns messy→canonical.

CLI: train-build and train-build-all gain --defang-frac and --seed.
8 new tests including a frac=1.0 / output-canonical integration check.
The pipeline runs but is dormant at defang_frac=0.0 — psyc-v5 dataset
build will set 0.5 once OTX cases land.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 22:33:52 +02:00
m17hr1l
85830be9fa stage-19-fix: ThreatFox + MalwareBazaar — real API shape
Live test against abuse.ch revealed two issues with the stage-19 wiring:

- ThreatFox returns `ioc` (not `ioc_value`) and `first_seen` (not
  `first_seen_utc`) — older field names from stale docs. Parser now reads
  the real names and falls back to the old aliases defensively. Also
  captures `malware_malpedia` (per-family writeup URL) and
  `threat_type_desc` for richer downstream prose.
- MalwareBazaar's API expects form-encoded bodies, unlike ThreatFox's
  JSON. Extended _http with form_body=; MB fetcher switched to it.

Verified live: 10 ThreatFox cases landed with mixed botnet/malware
classification (4/6 split from threat_type signal — first real
incident-type diversity from a single feed). 10 MalwareBazaar cases
landed with sha256+sha1 hash observables and exe/file_type metadata.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 22:25:56 +02:00
m17hr1l
d87bd710bb stage-19: ThreatFox + MalwareBazaar + OTX Scoutline sources
Three new feeds — biggest near-term data-diversity win. ThreatFox brings
multi-malware IOCs with threat_type signal (botnet_cc → BOTNET,
payload_delivery → MALWARE, phishing → PHISHING). MalwareBazaar brings
file-hash samples with signatures. OTX brings curated multi-source pulses
with paragraph-form descriptions — by far the richest real-prose source.

Auth: THREATFOX_AUTH_KEY (one abuse.ch key covers ThreatFox + MalwareBazaar)
and OTX_API_KEY. fetch-all skips keyed feeds cleanly with where-to-get-it
guidance instead of tracebacking. Proofline reliability table extended;
abuse.ch sources rated B/2, OTX rated C/3 (community-driven).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 22:14:18 +02:00
m17hr1l
994a5c642f stage-18: approval queue — human gate before evidence leaves
CERT-Bund (authority) requires_approval by default; PSYC_REQUIRE_APPROVAL=1
forces every routable submission through the queue. Courier branches at
execute_routes: approval-required → freeze payload + enqueue, no HTTP; else
submit directly as before. Approve dispatches the frozen payload to mock-cert
and writes the ledger row (detail=approved_by=…); reject writes a ledger row
with the reviewer's reason. CLI: queue / approve / reject. Cockpit /queue
page with POST approve / reject and counts.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 21:42:08 +02:00
19 changed files with 1135 additions and 30 deletions

View File

@@ -51,7 +51,7 @@ services:
# `--profile gpu`. Uses the psyc-trainer image (built from Dockerfile.train).
inference:
image: psyc-trainer
command: ["/scripts/serve_model.py", "--adapter", "/data/adapters/psyc-v4/final", "--host", "0.0.0.0", "--port", "8771"]
command: ["/scripts/serve_model.py", "--adapter", "/data/adapters/psyc-v5/final", "--host", "0.0.0.0", "--port", "8771"]
volumes:
- ./data:/data
- ./scripts:/scripts

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from typing import List
from typing import List, Optional
import typer
import uvicorn
@@ -88,10 +88,34 @@ def fetch_feodo(limit: int = typer.Option(50, help="max C2 records to ingest"))
_ingest("feodo", limit)
@app.command("fetch-threatfox")
def fetch_threatfox(limit: int = typer.Option(200, help="max IOCs to ingest")) -> None:
"""ThreatFox (abuse.ch) — needs THREATFOX_AUTH_KEY in .env."""
_ingest("threatfox", limit)
@app.command("fetch-malware-bazaar")
def fetch_malware_bazaar(limit: int = typer.Option(100, help="max samples to ingest")) -> None:
"""MalwareBazaar (abuse.ch) — also uses THREATFOX_AUTH_KEY."""
_ingest("malware-bazaar", limit)
@app.command("fetch-otx")
def fetch_otx(limit: int = typer.Option(100, help="max pulse-cases to ingest")) -> None:
"""AlienVault OTX — needs OTX_API_KEY in .env."""
_ingest("otx", limit)
@app.command("fetch-all")
def fetch_all() -> None:
for source, limit in (("urlhaus", 50), ("cisa-kev", 100), ("feodo", 50)):
_ingest(source, limit)
"""Fetch every configured source. Keyed feeds skip cleanly when the key is missing."""
plan = (("urlhaus", 50), ("cisa-kev", 100), ("feodo", 50),
("threatfox", 200), ("malware-bazaar", 100), ("otx", 100))
for source, limit in plan:
try:
_ingest(source, limit)
except RuntimeError as exc:
typer.echo(f" skip {source}: {exc}", err=True)
@app.command("classify-case")
@@ -284,6 +308,55 @@ def submit_case(case_id: str) -> None:
typer.echo(f"{b.destination_name}: {b.reason} (logged)")
@app.command("queue")
def queue_list(
status: str = typer.Option("pending", help="pending | approved | rejected | all"),
limit: int = typer.Option(50, help="max rows"),
) -> None:
"""List the approval queue."""
from psyc.models import ApprovalStatus
status_filter = None if status == "all" else ApprovalStatus(status)
rows = courier.list_pending(status=status_filter, limit=limit)
if not rows:
typer.echo(f"(no submissions with status={status})")
return
for p in rows:
rev = f" by {p.reviewer}" if p.reviewer else ""
typer.echo(
f" #{p.id} {p.status.value:9s} {p.destination_name:16s} {p.case_id} "
f"({p.payload_kind}, tlp={p.tlp.value}){rev}"
)
@app.command("approve")
def approve(
pending_id: int = typer.Argument(..., help="pending submission id"),
reviewer: str = typer.Option("operator", "--by", help="reviewer identity"),
) -> None:
"""Approve a pending submission — dispatches to its destination."""
result = courier.dispatch_pending(pending_id, reviewer=reviewer)
if isinstance(result, Err):
typer.echo(f"error: {result.reason}", err=True)
raise typer.Exit(1)
r = result.value
rcpt = f"{r.receipt_id}" if r.receipt_id else ""
typer.echo(f"approved #{pending_id} · {r.destination_name}: {r.outcome.value}{rcpt}")
@app.command("reject")
def reject(
pending_id: int = typer.Argument(..., help="pending submission id"),
reviewer: str = typer.Option("operator", "--by", help="reviewer identity"),
reason: str = typer.Option("", "--reason", help="rejection reason"),
) -> None:
"""Reject a pending submission — nothing leaves; ledger row written."""
result = courier.reject_pending(pending_id, reviewer=reviewer, reason=reason)
if isinstance(result, Err):
typer.echo(f"error: {result.reason}", err=True)
raise typer.Exit(1)
typer.echo(f"rejected #{pending_id}{(': ' + reason) if reason else ''}")
@app.command("mock-cert")
def mock_cert_serve(host: str = "127.0.0.1", port: int = 8770) -> None:
uvicorn.run("psyc.mock_cert:app", host=host, port=port)
@@ -293,12 +366,15 @@ def mock_cert_serve(host: str = "127.0.0.1", port: int = 8770) -> None:
def train_build(
task: str = typer.Option(..., "--task", "-t", help=f"one of: {', '.join(train.TASKS)}"),
limit: int = typer.Option(10_000, help="max cases to process"),
defang_frac: float = typer.Option(0.0, "--defang-frac", help="fraction of ioc_extraction inputs to defang ([0.0, 1.0])"),
seed: Optional[int] = typer.Option(None, "--seed", help="rng seed for reproducible defanging"),
) -> None:
if task not in train.TASKS:
typer.echo(f"unknown task: {task}; choices: {', '.join(train.TASKS)}", err=True)
raise typer.Exit(1)
cases = db.list_cases(limit=limit)
report = train.build(task, cases)
options = train.BuildOptions(defang_frac=defang_frac, seed=seed)
report = train.build(task, cases, options=options)
typer.echo(f"task: {report.task}")
typer.echo(f"path: {report.path}")
typer.echo(f" written: {report.written}")
@@ -309,10 +385,15 @@ def train_build(
@app.command("train-build-all")
def train_build_all(limit: int = typer.Option(10_000, help="max cases per task")) -> None:
def train_build_all(
limit: int = typer.Option(10_000, help="max cases per task"),
defang_frac: float = typer.Option(0.0, "--defang-frac", help="fraction of ioc_extraction inputs to defang ([0.0, 1.0])"),
seed: Optional[int] = typer.Option(None, "--seed", help="rng seed for reproducible defanging"),
) -> None:
cases = db.list_cases(limit=limit)
options = train.BuildOptions(defang_frac=defang_frac, seed=seed)
for task in train.TASKS:
report = train.build(task, cases)
report = train.build(task, cases, options=options)
typer.echo(f" {task}: wrote {report.written}{report.path.name}")

View File

@@ -5,13 +5,14 @@ from __future__ import annotations
from pathlib import Path
from typing import List
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from psyc import db, log
from psyc.cockpit import inference, journey as journey_view
from psyc.lines import courier as courier_line
from psyc.lines import ledger as ledger_line
from psyc.lines import route as route_line
from psyc.lines import seal as seal_line
@@ -102,3 +103,36 @@ def healthz() -> dict:
def inference_status() -> dict:
adapter = inference.server_adapter()
return {"online": adapter is not None, "adapter": adapter}
@app.get("/queue", response_class=HTMLResponse)
def queue_view(request: Request, status: str = "pending") -> HTMLResponse:
from psyc.models import ApprovalStatus
status_filter = None if status == "all" else ApprovalStatus(status)
rows = courier_line.list_pending(status=status_filter, limit=200)
counts = {
"pending": courier_line.pending_count(ApprovalStatus.PENDING),
"approved": courier_line.pending_count(ApprovalStatus.APPROVED),
"rejected": courier_line.pending_count(ApprovalStatus.REJECTED),
}
return TEMPLATES.TemplateResponse(
request,
"queue.html",
{"rows": rows, "counts": counts, "current_status": status},
)
@app.post("/queue/approve/{pid}")
def queue_approve(pid: int, reviewer: str = Form("operator")) -> RedirectResponse:
result = courier_line.dispatch_pending(pid, reviewer=reviewer)
if isinstance(result, Err):
_log.warning("cockpit.queue.approve.error", pending_id=pid, reason=result.reason)
return RedirectResponse("/queue", status_code=303)
@app.post("/queue/reject/{pid}")
def queue_reject(pid: int, reviewer: str = Form("operator"), reason: str = Form("")) -> RedirectResponse:
result = courier_line.reject_pending(pid, reviewer=reviewer, reason=reason)
if isinstance(result, Err):
_log.warning("cockpit.queue.reject.error", pending_id=pid, reason=result.reason)
return RedirectResponse("/queue", status_code=303)

View File

@@ -285,3 +285,32 @@ tr.sev-low .sev-badge { color: var(--muted); }
border-radius: 4px; padding: 4px 12px; font: inherit; font-size: 12px; cursor: pointer;
}
.replay-btn:hover { border-color: var(--accent); }
/* ── approval queue ─────────────────────────────────────────── */
.queue-tabs { display: flex; gap: 4px; margin: 12px 0 16px; border-bottom: 1px solid var(--border); }
.queue-tab {
padding: 6px 14px; color: var(--muted); border: 1px solid transparent;
border-bottom: none; border-radius: 4px 4px 0 0; font-size: 12px;
text-transform: uppercase; letter-spacing: 0.05em;
}
.queue-tab:hover { color: var(--text); }
.queue-tab.is-active {
color: var(--accent); border-color: var(--border) var(--border) transparent;
background: var(--panel-2);
}
.queue-action { display: inline-flex; gap: 4px; margin-right: 6px; align-items: center; }
.btn {
background: var(--panel-2); color: var(--text); border: 1px solid var(--border);
border-radius: 3px; padding: 4px 10px; font: inherit; font-size: 12px; cursor: pointer;
}
.btn:hover { border-color: var(--accent); }
.btn-approve { color: var(--green); border-color: rgba(74, 222, 128, 0.35); }
.btn-approve:hover { background: rgba(74, 222, 128, 0.08); border-color: var(--green); }
.btn-reject { color: var(--red); border-color: rgba(248, 113, 113, 0.35); }
.btn-reject:hover { background: rgba(248, 113, 113, 0.08); border-color: var(--red); }
.reject-reason {
background: var(--bg); color: var(--text); border: 1px solid var(--border);
border-radius: 3px; padding: 3px 6px; font: inherit; font-size: 11px; width: 130px;
}
.reject-reason::placeholder { color: var(--muted); }
.outcome-pending_approval { background: rgba(251, 191, 36, 0.15); color: var(--amber); border: 1px solid rgba(251, 191, 36, 0.4); }

View File

@@ -18,6 +18,7 @@
</a>
<nav class="nav">
<a href="/cases">Cases</a>
<a href="/queue">Queue</a>
<a href="/ledger">Ledger</a>
<a href="/train">Trainline</a>
</nav>

View File

@@ -0,0 +1,73 @@
{% extends "base.html" %}
{% block title %}Approval Queue — psyc{% endblock %}
{% block content %}
<section class="panel">
<div class="panel-head">
<h1>Submission Approval Queue</h1>
<span class="count">{{ counts.pending }} pending · {{ counts.approved }} approved · {{ counts.rejected }} rejected</span>
</div>
<p class="page-intro">Nothing leaves psyc to an authority destination without a human signing off. Routing builds the payload, freezes it here, and waits. You approve — Courier dispatches and the Ledger records. You reject — nothing leaves, and the rejection is recorded too.</p>
<details class="page-help">
<summary>how to use this view</summary>
<div class="help-body">
<p><b>How to use.</b> Each row is a payload waiting on you. Click the case to inspect it before deciding. Approve sends it now; Reject blocks it forever (the case can still be re-submitted later from CLI if appropriate).</p>
<p><b>What you're seeing.</b> Pending submissions for destinations marked <code>requires_approval=True</code> — CERT-Bund by default, or <em>all</em> destinations when <code>PSYC_REQUIRE_APPROVAL=1</code>.</p>
<p><b>Why it matters.</b> The dossier mandates a human gate before evidence reaches real authority systems. This is that gate. The frozen payload guarantees the reviewer approves exactly what gets sent — not a re-derived version that might have drifted.</p>
</div>
</details>
<div class="queue-tabs">
<a class="queue-tab{% if current_status == 'pending' %} is-active{% endif %}" href="/queue?status=pending">pending ({{ counts.pending }})</a>
<a class="queue-tab{% if current_status == 'approved' %} is-active{% endif %}" href="/queue?status=approved">approved ({{ counts.approved }})</a>
<a class="queue-tab{% if current_status == 'rejected' %} is-active{% endif %}" href="/queue?status=rejected">rejected ({{ counts.rejected }})</a>
<a class="queue-tab{% if current_status == 'all' %} is-active{% endif %}" href="/queue?status=all">all</a>
</div>
{% if not rows %}
<p class="empty">Queue is clear. Either nothing is pending, or no destination on the current cases requires approval. Set <code>PSYC_REQUIRE_APPROVAL=1</code> to force every routable submission through this gate.</p>
{% else %}
<table class="ledger">
<thead>
<tr>
<th>#</th>
<th>Created</th>
<th>Case</th>
<th>Destination</th>
<th>Payload</th>
<th>TLP</th>
<th>Hash</th>
<th>Status</th>
<th>Action</th>
</tr>
</thead>
<tbody>
{% for p in rows %}
<tr class="ledger-row{% if p.status.value == 'rejected' %} is-rejected{% elif p.status.value == 'approved' %} is-actioned{% endif %}">
<td>#{{ p.id }}</td>
<td class="lg-ts">{{ p.created_at.strftime('%Y-%m-%d %H:%M:%S') }}</td>
<td class="lg-case"><a href="/cases/{{ p.case_id }}">{{ p.case_id }}</a></td>
<td class="lg-dest">{{ p.destination_name }}</td>
<td>{{ p.payload_kind }}</td>
<td><span class="tlp-badge tlp-{{ p.tlp.value }}">{{ p.tlp.value }}</span></td>
<td class="lg-hash">{{ p.payload_hash[:12] }}…</td>
<td><span class="outcome-badge outcome-{{ 'submitted' if p.status.value == 'approved' else ('rejected' if p.status.value == 'rejected' else 'pending_approval') }}">{{ p.status.value }}</span></td>
<td>
{% if p.status.value == 'pending' %}
<form method="post" action="/queue/approve/{{ p.id }}" class="queue-action">
<button type="submit" class="btn btn-approve">approve</button>
</form>
<form method="post" action="/queue/reject/{{ p.id }}" class="queue-action">
<input type="text" name="reason" placeholder="reason (optional)" class="reject-reason">
<button type="submit" class="btn btn-reject">reject</button>
</form>
{% else %}
<span class="lg-sub">{{ p.reviewer or '—' }}{% if p.reason %} · {{ p.reason }}{% endif %}</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endif %}
</section>
{% endblock %}

View File

@@ -64,6 +64,24 @@ ledger = Table(
Index("ledger_case_idx", ledger.c.case_id)
Index("ledger_time_idx", ledger.c.timestamp.desc())
pending = Table(
"pending_submissions", _metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("case_id", String, nullable=False),
Column("destination_name", String, nullable=False),
Column("payload_kind", String, nullable=False),
Column("payload_hash", String, nullable=False),
Column("payload_json", Text, nullable=False),
Column("tlp", String, nullable=False),
Column("created_at", String, nullable=False),
Column("status", String, nullable=False),
Column("reviewer", String, nullable=True),
Column("reviewed_at", String, nullable=True),
Column("reason", String, nullable=True),
)
Index("pending_status_idx", pending.c.status)
Index("pending_case_idx", pending.c.case_id)
_log = log.get(__name__)
_engine: Optional[Engine] = None

View File

@@ -12,6 +12,16 @@ _FEED_INCIDENT = {
"urlhaus": IncidentType.MALWARE,
"feodo": IncidentType.BOTNET,
"cisa-kev": IncidentType.EXPLOIT,
"malware-bazaar": IncidentType.MALWARE,
"otx": IncidentType.MALWARE, # default; OTX pulses span many types
}
# ThreatFox carries its own type signal — map it instead of using a feed default.
_THREATFOX_THREAT_TYPE = {
"botnet_cc": IncidentType.BOTNET,
"payload_delivery": IncidentType.MALWARE,
"payload": IncidentType.MALWARE,
"phishing": IncidentType.PHISHING,
}
@@ -33,7 +43,11 @@ def classify(case: Case) -> Case:
def _classify_incident_type_and_tlp(case: Case) -> None:
if case.classification.incident_type is not None:
return
incident = _FEED_INCIDENT.get(case.source_metadata.get("feed", ""))
feed = case.source_metadata.get("feed", "")
if feed == "threatfox":
incident = _THREATFOX_THREAT_TYPE.get(case.source_metadata.get("threat_type", ""), IncidentType.MALWARE)
else:
incident = _FEED_INCIDENT.get(feed)
if incident is None and case.observables.urls:
incident = IncidentType.MALWARE # fallback for un-tagged feeds
if incident is None:

View File

@@ -1,18 +1,21 @@
"""Courier — payload building + HTTP submission to destination endpoints."""
"""Courier — payload building + HTTP submission, with optional approval queue."""
from __future__ import annotations
import hashlib
import json
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
import httpx
from pydantic import BaseModel, Field
from sqlalchemy import select, update
from psyc import log
from psyc import db, log
from psyc.lines import ledger as ledger_line
from psyc.lines.route import BlockedRoute, Route, endpoint_for
from psyc.models import Case, Outcome, SealedPackage
from psyc.models import ApprovalStatus, Case, Outcome, PendingSubmission, SealedPackage, TLP
from psyc.result import Err, Ok, Result
@@ -122,8 +125,32 @@ def execute_blocked_routes(case: Case, blocked: List[BlockedRoute]) -> None:
)
def _force_approval() -> bool:
return os.environ.get("PSYC_REQUIRE_APPROVAL", "").lower() in ("1", "true", "yes")
def _enqueue_pending(case: Case, route: Route, payload: Dict[str, Any], payload_hash: str) -> int:
now = datetime.now(timezone.utc).isoformat()
stmt = db.pending.insert().values(
case_id=case.case_id,
destination_name=route.destination_name,
payload_kind=route.payload_kind,
payload_hash=payload_hash,
payload_json=json.dumps(payload, sort_keys=True),
tlp=case.classification.tlp.value,
created_at=now,
status=ApprovalStatus.PENDING.value,
)
with db.engine().begin() as conn:
res = conn.execute(stmt)
pid = int(res.inserted_primary_key[0])
_log.info("courier.queued", case_id=case.case_id, destination=route.destination_name, pending_id=pid)
return pid
def execute_routes(case: Case, routes: List[Route], sealed_pkg: Optional[SealedPackage] = None) -> List[SubmitResult]:
results: List[SubmitResult] = []
force = _force_approval()
for r in routes:
endpoint = endpoint_for(r.destination_name)
if endpoint is None:
@@ -140,6 +167,14 @@ def execute_routes(case: Case, routes: List[Route], sealed_pkg: Optional[SealedP
continue
payload = build_payload(case, r.payload_kind, sealed_pkg)
payload_hash = _hash_payload(payload)
if r.requires_approval or force:
pid = _enqueue_pending(case, r, payload, payload_hash)
results.append(SubmitResult(
destination_name=r.destination_name,
outcome=Outcome.PENDING_APPROVAL,
detail=f"pending_id={pid}",
))
continue
result = submit(endpoint, payload)
if isinstance(result, Err):
ledger_line.write(
@@ -166,3 +201,129 @@ def execute_routes(case: Case, routes: List[Route], sealed_pkg: Optional[SealedP
)
results.append(SubmitResult(destination_name=r.destination_name, outcome=outcome, receipt_id=receipt.receipt_id))
return results
def _row_to_pending(row: Any) -> PendingSubmission:
return PendingSubmission(
id=row.id,
case_id=row.case_id,
destination_name=row.destination_name,
payload_kind=row.payload_kind,
payload_hash=row.payload_hash,
payload_json=row.payload_json,
tlp=TLP(row.tlp),
created_at=datetime.fromisoformat(row.created_at),
status=ApprovalStatus(row.status),
reviewer=row.reviewer,
reviewed_at=datetime.fromisoformat(row.reviewed_at) if row.reviewed_at else None,
reason=row.reason,
)
def list_pending(status: Optional[ApprovalStatus] = ApprovalStatus.PENDING, limit: int = 200) -> List[PendingSubmission]:
stmt = select(db.pending)
if status is not None:
stmt = stmt.where(db.pending.c.status == status.value)
stmt = stmt.order_by(db.pending.c.created_at.desc()).limit(limit)
with db.engine().connect() as conn:
rows = conn.execute(stmt).fetchall()
return [_row_to_pending(r) for r in rows]
def get_pending(pid: int) -> Result[PendingSubmission, str]:
stmt = select(db.pending).where(db.pending.c.id == pid)
with db.engine().connect() as conn:
row = conn.execute(stmt).fetchone()
if row is None:
return Err(f"pending submission not found: {pid}")
return Ok(_row_to_pending(row))
def pending_count(status: ApprovalStatus = ApprovalStatus.PENDING) -> int:
from sqlalchemy import func as sa_func
stmt = select(sa_func.count()).select_from(db.pending).where(db.pending.c.status == status.value)
with db.engine().connect() as conn:
return int(conn.execute(stmt).scalar_one())
def dispatch_pending(pid: int, reviewer: str = "operator") -> Result[SubmitResult, str]:
"""Approve and submit a pending entry — POST to destination, write ledger, mark approved."""
pending_r = get_pending(pid)
if isinstance(pending_r, Err):
return Err(pending_r.reason)
p = pending_r.value
if p.status != ApprovalStatus.PENDING:
return Err(f"pending submission {pid} is already {p.status.value}")
endpoint = endpoint_for(p.destination_name)
if endpoint is None:
return Err(f"no endpoint configured for {p.destination_name}")
payload = json.loads(p.payload_json)
result = submit(endpoint, payload)
now = datetime.now(timezone.utc).isoformat()
if isinstance(result, Err):
ledger_line.write(
case_id=p.case_id,
destination=p.destination_name,
payload_hash=p.payload_hash,
submitter_identity=SUBMITTER_IDENTITY,
tlp=p.tlp,
outcome=Outcome.FAILED,
detail=result.reason,
)
with db.engine().begin() as conn:
conn.execute(update(db.pending).where(db.pending.c.id == pid).values(
status=ApprovalStatus.APPROVED.value,
reviewer=reviewer,
reviewed_at=now,
reason=f"submit failed: {result.reason}",
))
return Ok(SubmitResult(destination_name=p.destination_name, outcome=Outcome.FAILED, detail=result.reason))
receipt = result.value
outcome = _STATUS_TO_OUTCOME.get(receipt.status, Outcome.SUBMITTED)
ledger_line.write(
case_id=p.case_id,
destination=p.destination_name,
payload_hash=p.payload_hash,
submitter_identity=SUBMITTER_IDENTITY,
tlp=p.tlp,
outcome=outcome,
response_id=receipt.receipt_id,
detail=f"approved_by={reviewer}",
)
with db.engine().begin() as conn:
conn.execute(update(db.pending).where(db.pending.c.id == pid).values(
status=ApprovalStatus.APPROVED.value,
reviewer=reviewer,
reviewed_at=now,
))
_log.info("courier.approved", pending_id=pid, reviewer=reviewer, outcome=outcome.value)
return Ok(SubmitResult(destination_name=p.destination_name, outcome=outcome, receipt_id=receipt.receipt_id))
def reject_pending(pid: int, reviewer: str = "operator", reason: str = "") -> Result[None, str]:
"""Reject a pending entry — write ledger reject row, mark rejected. Nothing leaves."""
pending_r = get_pending(pid)
if isinstance(pending_r, Err):
return Err(pending_r.reason)
p = pending_r.value
if p.status != ApprovalStatus.PENDING:
return Err(f"pending submission {pid} is already {p.status.value}")
now = datetime.now(timezone.utc).isoformat()
ledger_line.write(
case_id=p.case_id,
destination=p.destination_name,
payload_hash=p.payload_hash,
submitter_identity=SUBMITTER_IDENTITY,
tlp=p.tlp,
outcome=Outcome.REJECTED,
detail=f"rejected_by={reviewer}: {reason}" if reason else f"rejected_by={reviewer}",
)
with db.engine().begin() as conn:
conn.execute(update(db.pending).where(db.pending.c.id == pid).values(
status=ApprovalStatus.REJECTED.value,
reviewer=reviewer,
reviewed_at=now,
reason=reason or None,
))
_log.info("courier.rejected", pending_id=pid, reviewer=reviewer)
return Ok(None)

73
src/psyc/lines/defang.py Normal file
View File

@@ -0,0 +1,73 @@
"""Defanging — IOC obfuscation styles common in real CTI prose.
Real advisories don't write `1.2.3.4` and `http://evil.com` verbatim; they
defang IOCs into bracket/paren/word forms (`1[.]2[.]3[.]4`, `hxxp://evil[.]com`)
so indicators don't auto-link in email/chat clients. Training the IOC extractor
purely on canonical inputs leaves it brittle. This module corrupts canonical
IOCs into common defanged forms for use as training-time data augmentation.
"""
from __future__ import annotations
import random
from typing import List, Optional
# Dot replacement styles seen in the wild, in rough frequency order.
_DOT_FORMS = ("[.]", "(.)", "[dot]", "{.}")
_PROTOCOL_FORMS = {
"http://": "hxxp://",
"https://": "hxxps://",
}
def _rng(r: Optional[random.Random]) -> random.Random:
return r if r is not None else random.Random()
def defang_ip(ip: str, rng: Optional[random.Random] = None) -> str:
"""`1.2.3.4` → `1[.]2[.]3[.]4` (one randomly chosen dot style)."""
return ip.replace(".", _rng(rng).choice(_DOT_FORMS))
def defang_domain(domain: str, rng: Optional[random.Random] = None) -> str:
"""`evil.com` → `evil[.]com`."""
return domain.replace(".", _rng(rng).choice(_DOT_FORMS))
def defang_url(url: str, rng: Optional[random.Random] = None) -> str:
"""`http://evil.com/x` → `hxxp://evil[.]com/x` — protocol + dot defanging."""
r = _rng(rng)
out = url
for proto, replacement in _PROTOCOL_FORMS.items():
if out.startswith(proto):
out = replacement + out[len(proto):]
break
out = out.replace(".", r.choice(_DOT_FORMS))
return out
def defang_text(
text: str,
ips: List[str],
domains: List[str],
urls: List[str],
rng: Optional[random.Random] = None,
) -> str:
"""Defang every occurrence of the given IOCs inside a free-text body.
URLs are replaced before domains (URLs contain domain substrings, so
domain-first would corrupt the URL match). Likewise IPs last. Each
occurrence picks its own dot-style independently — real advisories don't
keep one style consistent across paragraphs.
"""
r = _rng(rng)
out = text
for u in sorted(set(urls), key=len, reverse=True):
out = out.replace(u, defang_url(u, r))
for d in sorted(set(domains), key=len, reverse=True):
out = out.replace(d, defang_domain(d, r))
for i in sorted(set(ips), key=len, reverse=True):
out = out.replace(i, defang_ip(i, r))
return out

View File

@@ -23,9 +23,12 @@ _SHA_RE = re.compile(r"^[a-fA-F0-9]{32,64}$")
# feed -> (Admiralty source reliability A-F, information credibility 1-6)
_FEED_RELIABILITY = {
"cisa-kev": ("A", "1"), # government catalog, confirmed exploited
"urlhaus": ("B", "2"), # established CTI source, confirmed malware
"feodo": ("B", "2"), # established CTI source, confirmed C2
"cisa-kev": ("A", "1"), # government catalog, confirmed exploited
"urlhaus": ("B", "2"), # established CTI source, confirmed malware
"feodo": ("B", "2"), # established CTI source, confirmed C2
"threatfox": ("B", "2"), # abuse.ch CTI source
"malware-bazaar": ("B", "2"), # abuse.ch CTI source, confirmed sample
"otx": ("C", "3"), # community-driven, varying quality
}

View File

@@ -38,6 +38,7 @@ class Destination(BaseModel):
priority: int
payload_kind: str
countries: List[str] = Field(default_factory=list)
requires_approval: bool = False
class Route(BaseModel):
@@ -45,6 +46,7 @@ class Route(BaseModel):
priority: int
payload_kind: str
max_tlp_allowed: TLP
requires_approval: bool = False
class BlockedRoute(BaseModel):
@@ -61,6 +63,7 @@ DESTINATIONS: List[Destination] = [
priority=1,
payload_kind="sealed_evidence_package",
countries=["DE"],
requires_approval=True,
),
Destination(
name="MISP-Community",
@@ -111,6 +114,7 @@ def plan(case: Case) -> Tuple[List[Route], List[BlockedRoute]]:
priority=d.priority,
payload_kind=d.payload_kind,
max_tlp_allowed=d.max_tlp,
requires_approval=d.requires_approval,
))
routes.sort(key=lambda r: r.priority)
_log.info("route.planned", case_id=case.case_id, allowed=len(routes), blocked=len(blocked))

View File

@@ -10,14 +10,15 @@ from __future__ import annotations
import csv
import io
from datetime import datetime, timezone
from typing import Callable, Dict, Iterable, List, Optional
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, Iterable, List, Optional
from urllib.parse import urlparse
import httpx
from psyc import log
from psyc.models import Case, Observables
from psyc.models import Case, IncidentType, Observables
USER_AGENT = "psyc/0.1 (defensive CTI; hackathon prototype)"
@@ -26,17 +27,41 @@ HTTP_TIMEOUT = 30.0
URLHAUS_RECENT_CSV = "https://urlhaus.abuse.ch/downloads/csv_recent/"
CISA_KEV_JSON = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
FEODO_BLOCKLIST_JSON = "https://feodotracker.abuse.ch/downloads/ipblocklist.json"
THREATFOX_API = "https://threatfox-api.abuse.ch/api/v1/"
MALWARE_BAZAAR_API = "https://mb-api.abuse.ch/api/v1/"
OTX_PULSES_API = "https://otx.alienvault.com/api/v1/pulses/subscribed"
_log = log.get(__name__)
def _http_get(url: str) -> httpx.Response:
with httpx.Client(timeout=HTTP_TIMEOUT, headers={"User-Agent": USER_AGENT}, follow_redirects=True) as client:
resp = client.get(url)
def _http(
method: str,
url: str,
headers: Optional[Dict[str, str]] = None,
json_body: Optional[Dict[str, Any]] = None,
form_body: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
timeout: float = HTTP_TIMEOUT,
) -> httpx.Response:
h = {"User-Agent": USER_AGENT}
if headers:
h.update(headers)
with httpx.Client(timeout=timeout, headers=h, follow_redirects=True) as client:
if method.upper() == "POST":
if form_body is not None:
resp = client.post(url, data=form_body, params=params)
else:
resp = client.post(url, json=json_body, params=params)
else:
resp = client.get(url, params=params)
resp.raise_for_status()
return resp
def _http_get(url: str) -> httpx.Response:
return _http("GET", url)
def _parse_dt(value: str, fmt: str) -> datetime:
try:
return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
@@ -142,12 +167,221 @@ def _fetch_feodo() -> List[Case]:
return [_feodo_record_to_case(r) for r in data]
# --- ThreatFox — multi-malware IOC feed (abuse.ch) -----------------------
# ThreatFox threat_type values → psyc IncidentType.
THREATFOX_THREAT_TYPE: Dict[str, IncidentType] = {
"botnet_cc": IncidentType.BOTNET,
"payload_delivery": IncidentType.MALWARE,
"payload": IncidentType.MALWARE,
"phishing": IncidentType.PHISHING,
}
def _threatfox_row_to_case(r: Dict[str, Any]) -> Optional[Case]:
# API field is `ioc` (the `_value` alias is older docs); date is `first_seen`.
ioc_value = str(r.get("ioc") or r.get("ioc_value") or "").strip()
ioc_type = str(r.get("ioc_type") or "").lower()
if not ioc_value or not ioc_type:
return None
malware = str(r.get("malware_printable") or r.get("malware") or "unknown")
threat_type = str(r.get("threat_type") or "")
tags_raw = r.get("tags") or []
tags = tags_raw if isinstance(tags_raw, list) else []
obs = Observables()
host = ""
if ioc_type in ("ip:port", "ipv4", "ipv6"):
ip = ioc_value.split(":")[0]
obs.ips = [ip]
elif ioc_type == "domain":
obs.domains = [ioc_value]
host = ioc_value
elif ioc_type == "url":
obs.urls = [ioc_value]
host = urlparse(ioc_value).hostname or ""
if host:
obs.domains = [host]
elif ioc_type in ("sha256_hash", "md5_hash", "sha1_hash"):
obs.hashes = [ioc_value]
else:
return None
threat_label = threat_type.replace("_", " ") or "malware"
summary = f"ThreatFox: {malware} {threat_label}{ioc_value}"
first_seen = str(r.get("first_seen") or r.get("first_seen_utc") or "")
return Case(
case_id=f"PSYC-THREATFOX-{r.get('id', '')}",
summary=summary,
source_type="abuse_feed",
source_ref=str(r.get("reference") or f"https://threatfox.abuse.ch/ioc/{r.get('id', '')}/"),
source_metadata=dict(
feed="threatfox",
malware=malware,
malware_malpedia=str(r.get("malware_malpedia") or ""),
threat_type=threat_type,
threat_type_desc=str(r.get("threat_type_desc") or ""),
ioc_type=ioc_type,
confidence_level=str(r.get("confidence_level", "")),
tags=",".join(t for t in tags if t),
reporter=str(r.get("reporter", "")),
),
observed_at=_parse_dt(first_seen, "%Y-%m-%d %H:%M:%S"),
observables=obs,
)
def _fetch_threatfox() -> List[Case]:
key = os.environ.get("THREATFOX_AUTH_KEY", "").strip()
if not key:
raise RuntimeError("THREATFOX_AUTH_KEY not set — free abuse.ch auth-key from https://auth.abuse.ch/")
data = _http("POST", THREATFOX_API, headers={"Auth-Key": key}, json_body={"query": "get_iocs", "days": 1}).json()
rows = data.get("data") or []
out: List[Case] = []
for r in rows:
c = _threatfox_row_to_case(r)
if c is not None:
out.append(c)
return out
# --- MalwareBazaar — recent malware samples (abuse.ch) -------------------
def _mb_row_to_case(r: Dict[str, Any]) -> Optional[Case]:
sha256 = str(r.get("sha256_hash") or "")
if not sha256:
return None
sha1 = str(r.get("sha1_hash") or "")
md5 = str(r.get("md5_hash") or "")
file_name = str(r.get("file_name") or "unknown")
signature = str(r.get("signature") or "")
file_type = str(r.get("file_type") or "")
tags_raw = r.get("tags") or []
tags = tags_raw if isinstance(tags_raw, list) else []
hashes = [h for h in (sha256, sha1, md5) if h]
label = signature or "unsigned"
summary = f"MalwareBazaar: {label} {file_type} sample — {file_name}"
return Case(
case_id=f"PSYC-MBAZAAR-{sha256[:16]}",
summary=summary,
source_type="abuse_feed",
source_ref=f"https://bazaar.abuse.ch/sample/{sha256}/",
source_metadata=dict(
feed="malware-bazaar",
signature=signature,
file_type=file_type,
file_name=file_name,
tags=",".join(t for t in tags if t),
reporter=str(r.get("reporter", "")),
),
observed_at=_parse_dt(str(r.get("first_seen") or ""), "%Y-%m-%d %H:%M:%S"),
observables=Observables(hashes=hashes),
)
def _fetch_malware_bazaar() -> List[Case]:
key = os.environ.get("THREATFOX_AUTH_KEY", "").strip()
if not key:
raise RuntimeError("THREATFOX_AUTH_KEY not set — abuse.ch auth-key from https://auth.abuse.ch/ also covers MalwareBazaar")
# MalwareBazaar expects form-encoded body (unlike ThreatFox which takes JSON).
data = _http("POST", MALWARE_BAZAAR_API, headers={"Auth-Key": key}, form_body={"query": "get_recent", "selector": "100"}).json()
rows = data.get("data") or []
out: List[Case] = []
for r in rows:
c = _mb_row_to_case(r)
if c is not None:
out.append(c)
return out
# --- AlienVault OTX — curated multi-source pulses ------------------------
_OTX_IOC_LIMIT_PER_PULSE = 50
def _otx_pulse_to_case(p: Dict[str, Any]) -> Optional[Case]:
pulse_id = str(p.get("id") or "")
if not pulse_id:
return None
pulse_name = str(p.get("name") or "OTX pulse")
description = str(p.get("description") or "")
tags_raw = p.get("tags") or []
tags = tags_raw if isinstance(tags_raw, list) else []
tlp_pulse = str(p.get("tlp") or "white").upper()
indicators = p.get("indicators") or []
obs = Observables()
for ind in indicators[:_OTX_IOC_LIMIT_PER_PULSE]:
value = str(ind.get("indicator") or "").strip()
itype = str(ind.get("type") or "").lower()
if not value:
continue
if itype in ("ipv4", "ipv6"):
obs.ips.append(value)
elif itype in ("domain", "hostname"):
obs.domains.append(value)
elif itype == "url":
obs.urls.append(value)
host = urlparse(value).hostname or ""
if host and host not in obs.domains:
obs.domains.append(host)
elif itype in ("filehash-sha256", "filehash-sha1", "filehash-md5"):
obs.hashes.append(value)
elif itype == "cve":
obs.cves.append(value)
if not (obs.urls or obs.domains or obs.ips or obs.hashes or obs.cves):
return None
return Case(
case_id=f"PSYC-OTX-{pulse_id}",
summary=f"OTX: {pulse_name}",
source_type="threat_intel",
source_ref=f"https://otx.alienvault.com/pulse/{pulse_id}",
source_metadata=dict(
feed="otx",
pulse_name=pulse_name,
description=description[:2000],
tags=",".join(t for t in tags if t),
tlp_pulse=tlp_pulse,
),
observed_at=_parse_dt(str(p.get("created") or "").split(".")[0], "%Y-%m-%dT%H:%M:%S"),
observables=obs,
)
def _fetch_otx() -> List[Case]:
key = os.environ.get("OTX_API_KEY", "").strip()
if not key:
raise RuntimeError("OTX_API_KEY not set — free key at https://otx.alienvault.com → settings → API")
# OTX subscribes a new account to many curated feeds, so the unfiltered
# /pulses/subscribed page can 504 on its own backend. modified_since
# narrows to recent pulses; page size 20 caps the response.
since = (datetime.now(timezone.utc) - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%S")
data = _http(
"GET", OTX_PULSES_API,
headers={"X-OTX-API-KEY": key},
params={"limit": 20, "modified_since": since},
timeout=120.0,
).json()
pulses = data.get("results") or []
out: List[Case] = []
for p in pulses:
c = _otx_pulse_to_case(p)
if c is not None:
out.append(c)
return out
# --- registry + dispatch -------------------------------------------------
SOURCES: Dict[str, Callable[[], List[Case]]] = {
"urlhaus": _fetch_urlhaus,
"cisa-kev": _fetch_cisa_kev,
"feodo": _fetch_feodo,
"threatfox": _fetch_threatfox,
"malware-bazaar": _fetch_malware_bazaar,
"otx": _fetch_otx,
}

View File

@@ -15,6 +15,7 @@ restricted source types, never empty input/output.
from __future__ import annotations
import json
import random
import re
from datetime import datetime, timezone
from pathlib import Path
@@ -24,10 +25,18 @@ from pydantic import BaseModel, Field
from psyc import DATA_DIR, log
from psyc.lines import classify as classify_line
from psyc.lines import defang as defang_line
from psyc.lines import route as route_line
from psyc.models import Case, TLP
class BuildOptions(BaseModel):
"""Per-build configuration. Currently only ioc_extraction reads any field."""
defang_frac: float = 0.0 # in [0.0, 1.0] — fraction of ioc_extraction inputs to defang
seed: Optional[int] = None # reproducible RNG when set
_log = log.get(__name__)
DATASETS_DIR = DATA_DIR / "datasets"
@@ -60,7 +69,11 @@ class DatasetReport(BaseModel):
# ---------- ExampleBuilder per task ---------------------------------------
def _ex_ioc_extraction(case: Case) -> Optional[Example]:
def _ex_ioc_extraction(
case: Case,
options: Optional["BuildOptions"] = None,
rng: Optional[random.Random] = None,
) -> Optional[Example]:
obs = case.observables
if not (obs.urls or obs.domains or obs.ips or obs.hashes or obs.cves):
return None
@@ -81,6 +94,13 @@ def _ex_ioc_extraction(case: Case) -> Optional[Example]:
body.append("Related CVEs: " + ", ".join(obs.cves) + ".")
if tags:
body.append(f"Tags: {tags}.")
body_text = " ".join(body)
# Defanging augmentation: with probability options.defang_frac, replace IOCs
# in the input with common real-world defanged forms (1[.]2[.]3[.]4,
# hxxp://, etc.). Output stays canonical so the model learns the mapping.
if options is not None and rng is not None and options.defang_frac > 0.0:
if rng.random() < options.defang_frac:
body_text = defang_line.defang_text(body_text, obs.ips, obs.domains, obs.urls, rng)
output_obj = {
"urls": obs.urls,
"domains": obs.domains,
@@ -90,7 +110,7 @@ def _ex_ioc_extraction(case: Case) -> Optional[Example]:
}
return Example(
instruction="Extract all indicators of compromise from the advisory and return JSON with keys: urls, domains, ips, hashes, cves.",
input=" ".join(body),
input=body_text,
output=json.dumps(output_obj, ensure_ascii=False),
task="ioc_extraction",
case_id=case.case_id,
@@ -119,7 +139,11 @@ def severity_features(case: Case) -> Dict[str, object]:
}
def _ex_severity_classification(case: Case) -> Optional[Example]:
def _ex_severity_classification(
case: Case,
options: Optional["BuildOptions"] = None,
rng: Optional[random.Random] = None,
) -> Optional[Example]:
if case.classification.severity is None:
return None
return Example(
@@ -132,7 +156,11 @@ def _ex_severity_classification(case: Case) -> Optional[Example]:
)
def _ex_routing_decision(case: Case) -> Optional[Example]:
def _ex_routing_decision(
case: Case,
options: Optional["BuildOptions"] = None,
rng: Optional[random.Random] = None,
) -> Optional[Example]:
if case.classification.incident_type is None:
return None
routes, blocked = route_line.plan(case)
@@ -158,7 +186,11 @@ def _ex_routing_decision(case: Case) -> Optional[Example]:
)
def _ex_tlp_assignment(case: Case) -> Optional[Example]:
def _ex_tlp_assignment(
case: Case,
options: Optional["BuildOptions"] = None,
rng: Optional[random.Random] = None,
) -> Optional[Example]:
input_obj = {
"source_type": case.source_type,
"incident_type": case.classification.incident_type.value if case.classification.incident_type else None,
@@ -217,10 +249,12 @@ def _next_version(task: str) -> int:
return (max(used) + 1) if used else 1
def build(task: str, cases: Iterable[Case]) -> DatasetReport:
def build(task: str, cases: Iterable[Case], options: Optional[BuildOptions] = None) -> DatasetReport:
if task not in _BUILDERS:
raise ValueError(f"unknown task: {task}; choices: {sorted(_BUILDERS)}")
builder = _BUILDERS[task]
options = options or BuildOptions()
rng = random.Random(options.seed)
version = _next_version(task)
path = DATASETS_DIR / f"{task}-v{version}.jsonl"
written = 0
@@ -230,7 +264,7 @@ def build(task: str, cases: Iterable[Case]) -> DatasetReport:
skipped_empty = 0
with path.open("w", encoding="utf-8") as fh:
for case in cases:
example = builder(case)
example = builder(case, options, rng)
if example is None:
skipped_empty += 1
continue

View File

@@ -133,6 +133,7 @@ class Outcome(str, Enum):
REJECTED = "rejected"
ACTIONED = "actioned"
FAILED = "failed"
PENDING_APPROVAL = "pending_approval"
class LedgerEntry(BaseModel):
@@ -146,3 +147,24 @@ class LedgerEntry(BaseModel):
response_id: Optional[str] = None
outcome: Outcome
detail: Optional[str] = None
class ApprovalStatus(str, Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
class PendingSubmission(BaseModel):
id: Optional[int] = None
case_id: str
destination_name: str
payload_kind: str
payload_hash: str
payload_json: str # frozen payload — what will be sent on approval
tlp: TLP
created_at: datetime
status: ApprovalStatus = ApprovalStatus.PENDING
reviewer: Optional[str] = None
reviewed_at: Optional[datetime] = None
reason: Optional[str] = None

View File

@@ -57,3 +57,26 @@ def test_classify_is_idempotent():
first = case.classification.model_copy(deep=True)
classify(case)
assert case.classification == first
def test_threatfox_botnet_cc_is_botnet():
case = make_case(feed="threatfox", ips=["1.2.3.4"])
case.source_metadata["threat_type"] = "botnet_cc"
assert classify(case).classification.incident_type is IncidentType.BOTNET
def test_threatfox_payload_delivery_is_malware():
case = make_case(feed="threatfox", urls=["http://1.2.3.4/x.bin"])
case.source_metadata["threat_type"] = "payload_delivery"
assert classify(case).classification.incident_type is IncidentType.MALWARE
def test_threatfox_phishing_threat_type_is_phishing():
case = make_case(feed="threatfox", urls=["http://login.bad/example"])
case.source_metadata["threat_type"] = "phishing"
assert classify(case).classification.incident_type is IncidentType.PHISHING
def test_malware_bazaar_is_malware():
case = make_case(feed="malware-bazaar", hashes=["a" * 64])
assert classify(case).classification.incident_type is IncidentType.MALWARE

130
tests/test_courier.py Normal file
View File

@@ -0,0 +1,130 @@
"""Courier approval-queue tests — gating, dispatch, rejection."""
from __future__ import annotations
import pytest
from sqlalchemy import create_engine, select
from psyc import db
from psyc.lines import courier, ledger as ledger_line
from psyc.lines.route import Route
from psyc.models import ApprovalStatus, Outcome, TLP
from psyc.result import Ok
from conftest import make_case
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
test_engine = create_engine(f"sqlite:///{test_db}", future=True)
db._metadata.create_all(test_engine, checkfirst=True)
monkeypatch.setattr(db, "_engine", test_engine)
monkeypatch.setattr(db, "DB_PATH", test_db)
yield test_db
def _malware_url_route(requires_approval: bool) -> Route:
return Route(
destination_name="URLhaus",
priority=3,
payload_kind="malware_url_report",
max_tlp_allowed=TLP.GREEN,
requires_approval=requires_approval,
)
def test_execute_routes_enqueues_when_approval_required(fresh_db, monkeypatch):
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
case.classification.tlp = TLP.GREEN
# No HTTP should happen — submit must NOT be called on the approval branch.
def boom(*a, **kw):
raise AssertionError("submit() must not be called when approval is required")
monkeypatch.setattr(courier, "submit", boom)
results = courier.execute_routes(case, [_malware_url_route(requires_approval=True)])
assert len(results) == 1
assert results[0].outcome is Outcome.PENDING_APPROVAL
pending = courier.list_pending()
assert len(pending) == 1
assert pending[0].destination_name == "URLhaus"
assert pending[0].status is ApprovalStatus.PENDING
assert pending[0].payload_hash # frozen hash present
assert pending[0].payload_json # frozen payload present
def test_execute_routes_force_approval_via_env(fresh_db, monkeypatch):
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
case.classification.tlp = TLP.GREEN
monkeypatch.setenv("PSYC_REQUIRE_APPROVAL", "1")
monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("must not submit")))
results = courier.execute_routes(case, [_malware_url_route(requires_approval=False)])
assert results[0].outcome is Outcome.PENDING_APPROVAL
assert courier.pending_count() == 1
def test_dispatch_pending_submits_and_marks_approved(fresh_db, monkeypatch):
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
case.classification.tlp = TLP.GREEN
monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("queue phase")))
courier.execute_routes(case, [_malware_url_route(requires_approval=True)])
pid = courier.list_pending()[0].id
# Now approve — submit IS called, returns a receipt.
receipt = courier.Receipt(receipt_id="r-001", destination="urlhaus", status="acknowledged", response_body={})
monkeypatch.setattr(courier, "submit", lambda *a, **kw: Ok(receipt))
result = courier.dispatch_pending(pid, reviewer="alice")
assert isinstance(result, Ok)
assert result.value.outcome is Outcome.ACKNOWLEDGED
assert result.value.receipt_id == "r-001"
# Pending row now marked approved.
refreshed = courier.get_pending(pid).value
assert refreshed.status is ApprovalStatus.APPROVED
assert refreshed.reviewer == "alice"
# Ledger has a corresponding row.
entries = ledger_line.list_by_case(case.case_id, limit=10)
assert any(e.outcome is Outcome.ACKNOWLEDGED and e.destination == "URLhaus" for e in entries)
def test_reject_pending_writes_rejection_to_ledger(fresh_db, monkeypatch):
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
case.classification.tlp = TLP.GREEN
monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("queue phase")))
courier.execute_routes(case, [_malware_url_route(requires_approval=True)])
pid = courier.list_pending()[0].id
# Reject — submit must NOT be called.
def boom(*a, **kw):
raise AssertionError("rejected submissions must not POST")
monkeypatch.setattr(courier, "submit", boom)
result = courier.reject_pending(pid, reviewer="bob", reason="payload looks wrong")
assert isinstance(result, Ok)
refreshed = courier.get_pending(pid).value
assert refreshed.status is ApprovalStatus.REJECTED
assert refreshed.reviewer == "bob"
assert refreshed.reason == "payload looks wrong"
entries = ledger_line.list_by_case(case.case_id, limit=10)
assert any(e.outcome is Outcome.REJECTED and "rejected_by=bob" in (e.detail or "") for e in entries)
def test_double_approve_is_rejected(fresh_db, monkeypatch):
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
case.classification.tlp = TLP.GREEN
monkeypatch.setattr(courier, "submit", lambda *a, **kw: (_ for _ in ()).throw(AssertionError("queue phase")))
courier.execute_routes(case, [_malware_url_route(requires_approval=True)])
pid = courier.list_pending()[0].id
receipt = courier.Receipt(receipt_id="r-002", destination="urlhaus", status="acknowledged", response_body={})
monkeypatch.setattr(courier, "submit", lambda *a, **kw: Ok(receipt))
courier.dispatch_pending(pid, reviewer="alice")
# Second approval of the same id must fail — no double-submission.
again = courier.dispatch_pending(pid, reviewer="alice")
assert not isinstance(again, Ok)

71
tests/test_defang.py Normal file
View File

@@ -0,0 +1,71 @@
"""Defanging — IOC obfuscation styles for training-data augmentation."""
from __future__ import annotations
import json
import random
from psyc.lines.defang import defang_domain, defang_ip, defang_text, defang_url
from psyc.lines.train import BuildOptions, _ex_ioc_extraction
from conftest import make_case
def test_defang_ip_breaks_canonical_form():
out = defang_ip("1.2.3.4", random.Random(0))
assert "1.2.3.4" not in out # canonical IP substring no longer appears
assert "1" in out and "4" in out # digits preserved
assert any(form in out for form in ("[.]", "(.)", "[dot]", "{.}"))
def test_defang_domain_preserves_label_text():
out = defang_domain("evil.example.com", random.Random(1))
assert "evil" in out and "example" in out and "com" in out
assert "evil.example.com" not in out # canonical domain broken
def test_defang_url_defangs_protocol_and_breaks_canonical_form():
out = defang_url("http://evil.example.com/payload.bin", random.Random(2))
assert out.startswith("hxxp://") # protocol defanged
assert "http://" not in out
assert "evil.example.com" not in out # host part defanged
def test_defang_url_handles_https():
assert defang_url("https://evil.com/x", random.Random(0)).startswith("hxxps://")
def test_defang_text_substitutes_every_listed_ioc():
text = "See URL http://1.2.3.4/x and IP 1.2.3.4 and domain evil.com please."
out = defang_text(text, ips=["1.2.3.4"], domains=["evil.com"], urls=["http://1.2.3.4/x"], rng=random.Random(3))
# No canonical IOC string should remain anywhere in the corrupted body.
assert "http://" not in out
assert "1.2.3.4" not in out
assert "evil.com" not in out
# Surrounding prose is preserved.
assert "See URL" in out and "please" in out
def test_ioc_extraction_with_defang_frac_1_corrupts_input_only():
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], domains=["1.2.3.4"], ips=["1.2.3.4"])
options = BuildOptions(defang_frac=1.0, seed=42)
rng = random.Random(options.seed)
ex = _ex_ioc_extraction(case, options, rng)
assert ex is not None
# Input has been defanged.
assert "1.2.3.4" not in ex.input
assert "http://" not in ex.input
# Output stays canonical so the model learns the inverse mapping.
output = json.loads(ex.output)
assert "1.2.3.4" in output["ips"]
assert "http://1.2.3.4/x" in output["urls"]
def test_ioc_extraction_with_defang_frac_0_is_canonical():
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
options = BuildOptions(defang_frac=0.0, seed=0)
rng = random.Random(0)
ex = _ex_ioc_extraction(case, options, rng)
assert ex is not None
# No defanging → input keeps the canonical IOCs.
assert "http://1.2.3.4/x" in ex.input
assert "1.2.3.4" in ex.input

View File

@@ -2,7 +2,14 @@
from __future__ import annotations
from psyc.lines.scout import _feodo_record_to_case, _kev_vuln_to_case, _parse_urlhaus_csv
from psyc.lines.scout import (
_feodo_record_to_case,
_kev_vuln_to_case,
_mb_row_to_case,
_otx_pulse_to_case,
_parse_urlhaus_csv,
_threatfox_row_to_case,
)
URLHAUS_CSV = """\
# comment line
@@ -47,3 +54,96 @@ def test_feodo_record_to_case():
assert case.source_metadata["feed"] == "feodo"
assert case.source_metadata["malware"] == "Emotet"
assert case.source_metadata["status"] == "online"
def test_threatfox_row_url_to_case():
row = {
"id": "1234567",
"ioc": "http://1.2.3.4/x.bin",
"ioc_type": "url",
"threat_type": "payload_delivery",
"malware_printable": "Cobalt Strike",
"first_seen": "2026-05-19 10:00:00",
"confidence_level": 100,
"tags": ["c2", "stager"],
"reporter": "anon",
}
case = _threatfox_row_to_case(row)
assert case is not None
assert case.case_id == "PSYC-THREATFOX-1234567"
assert case.observables.urls == ["http://1.2.3.4/x.bin"]
assert case.observables.domains == ["1.2.3.4"]
assert case.source_metadata["feed"] == "threatfox"
assert case.source_metadata["malware"] == "Cobalt Strike"
assert case.source_metadata["threat_type"] == "payload_delivery"
def test_threatfox_row_ip_port_to_case():
row = {
"id": "9999",
"ioc": "5.6.7.8:443",
"ioc_type": "ip:port",
"threat_type": "botnet_cc",
"malware_printable": "Qakbot",
"first_seen": "2026-05-18 10:00:00",
}
case = _threatfox_row_to_case(row)
assert case is not None
assert case.observables.ips == ["5.6.7.8"] # port stripped
def test_threatfox_row_rejects_unknown_type():
assert _threatfox_row_to_case({"id": "1", "ioc": "x", "ioc_type": "ja3_fp"}) is None
def test_malware_bazaar_row_to_case():
row = {
"sha256_hash": "a" * 64,
"sha1_hash": "b" * 40,
"md5_hash": "c" * 32,
"file_name": "invoice.exe",
"signature": "AgentTesla",
"file_type": "exe",
"first_seen": "2026-05-19 10:00:00",
"tags": ["RAT", "stealer"],
}
case = _mb_row_to_case(row)
assert case is not None
assert case.case_id == "PSYC-MBAZAAR-" + "a" * 16
assert case.observables.hashes == ["a" * 64, "b" * 40, "c" * 32]
assert case.source_metadata["feed"] == "malware-bazaar"
assert case.source_metadata["signature"] == "AgentTesla"
def test_otx_pulse_to_case_multi_indicator():
pulse = {
"id": "pulse-abc",
"name": "APT-X campaign Q2 2026",
"description": "Threat actor APT-X distributed Cobalt Strike via spear-phishing emails targeting EU energy firms. The following indicators were recovered:",
"created": "2026-05-15T12:00:00.000000",
"tlp": "white",
"tags": ["apt-x", "energy"],
"indicators": [
{"indicator": "1.2.3.4", "type": "IPv4"},
{"indicator": "evil.example", "type": "domain"},
{"indicator": "http://evil.example/payload.bin", "type": "URL"},
{"indicator": "d" * 64, "type": "FileHash-SHA256"},
{"indicator": "CVE-2026-1111", "type": "CVE"},
{"indicator": "irrelevant", "type": "Mutex"}, # ignored
],
}
case = _otx_pulse_to_case(pulse)
assert case is not None
assert case.case_id == "PSYC-OTX-pulse-abc"
assert case.observables.ips == ["1.2.3.4"]
assert "evil.example" in case.observables.domains
assert case.observables.urls == ["http://evil.example/payload.bin"]
assert case.observables.hashes == ["d" * 64]
assert case.observables.cves == ["CVE-2026-1111"]
assert "APT-X" in case.source_metadata["description"]
assert case.source_metadata["feed"] == "otx"
def test_otx_pulse_skips_when_no_recognized_indicators():
pulse = {"id": "p1", "name": "x", "description": "", "indicators": [{"indicator": "x", "type": "Mutex"}]}
assert _otx_pulse_to_case(pulse) is None