From f6fa52839f7b5ecefcc08d3fffc5ab5f756007ab Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Wed, 20 May 2026 22:33:52 +0200 Subject: [PATCH] stage-20: defanging pipeline for IOC-extraction augmentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Real CTI prose defangs IOCs (1[.]2[.]3[.]4, hxxp://, evil[dot]com) so they don't auto-link in email/chat. A model trained only on canonical inputs will fail to extract them. New lines/defang.py: defang_ip, defang_domain, defang_url, defang_text — four dot-styles ([.], (.), [dot], {.}) plus protocol defanging (http→hxxp, https→hxxps). Each occurrence picks its style independently since real advisories don't keep one style across paragraphs. train.BuildOptions adds defang_frac (default 0.0) and seed; build() threads options + a seeded Random through the example builders so the augmentation is reproducible. Only _ex_ioc_extraction reads it today — output stays canonical so the model learns messy→canonical. CLI: train-build and train-build-all gain --defang-frac and --seed. 8 new tests including a frac=1.0 / output-canonical integration check. The pipeline runs but is dormant at defang_frac=0.0 — psyc-v5 dataset build will set 0.5 once OTX cases land. Co-Authored-By: Claude Opus 4.7 --- src/psyc/cli.py | 16 ++++++--- src/psyc/lines/defang.py | 73 ++++++++++++++++++++++++++++++++++++++++ src/psyc/lines/train.py | 48 ++++++++++++++++++++++---- tests/test_defang.py | 71 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 197 insertions(+), 11 deletions(-) create mode 100644 src/psyc/lines/defang.py create mode 100644 tests/test_defang.py diff --git a/src/psyc/cli.py b/src/psyc/cli.py index 995b9e8..60baed7 100644 --- a/src/psyc/cli.py +++ b/src/psyc/cli.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import List +from typing import List, Optional import typer import uvicorn @@ -366,12 +366,15 @@ def mock_cert_serve(host: str = "127.0.0.1", port: int = 8770) -> None: def train_build( task: str = typer.Option(..., "--task", "-t", help=f"one of: {', '.join(train.TASKS)}"), limit: int = typer.Option(10_000, help="max cases to process"), + defang_frac: float = typer.Option(0.0, "--defang-frac", help="fraction of ioc_extraction inputs to defang ([0.0, 1.0])"), + seed: Optional[int] = typer.Option(None, "--seed", help="rng seed for reproducible defanging"), ) -> None: if task not in train.TASKS: typer.echo(f"unknown task: {task}; choices: {', '.join(train.TASKS)}", err=True) raise typer.Exit(1) cases = db.list_cases(limit=limit) - report = train.build(task, cases) + options = train.BuildOptions(defang_frac=defang_frac, seed=seed) + report = train.build(task, cases, options=options) typer.echo(f"task: {report.task}") typer.echo(f"path: {report.path}") typer.echo(f" written: {report.written}") @@ -382,10 +385,15 @@ def train_build( @app.command("train-build-all") -def train_build_all(limit: int = typer.Option(10_000, help="max cases per task")) -> None: +def train_build_all( + limit: int = typer.Option(10_000, help="max cases per task"), + defang_frac: float = typer.Option(0.0, "--defang-frac", help="fraction of ioc_extraction inputs to defang ([0.0, 1.0])"), + seed: Optional[int] = typer.Option(None, "--seed", help="rng seed for reproducible defanging"), +) -> None: cases = db.list_cases(limit=limit) + options = train.BuildOptions(defang_frac=defang_frac, seed=seed) for task in train.TASKS: - report = train.build(task, cases) + report = train.build(task, cases, options=options) typer.echo(f" {task}: wrote {report.written} → {report.path.name}") diff --git a/src/psyc/lines/defang.py b/src/psyc/lines/defang.py new file mode 100644 index 0000000..27a79a6 --- /dev/null +++ b/src/psyc/lines/defang.py @@ -0,0 +1,73 @@ +"""Defanging — IOC obfuscation styles common in real CTI prose. + +Real advisories don't write `1.2.3.4` and `http://evil.com` verbatim; they +defang IOCs into bracket/paren/word forms (`1[.]2[.]3[.]4`, `hxxp://evil[.]com`) +so indicators don't auto-link in email/chat clients. Training the IOC extractor +purely on canonical inputs leaves it brittle. This module corrupts canonical +IOCs into common defanged forms for use as training-time data augmentation. +""" + +from __future__ import annotations + +import random +from typing import List, Optional + + +# Dot replacement styles seen in the wild, in rough frequency order. +_DOT_FORMS = ("[.]", "(.)", "[dot]", "{.}") + +_PROTOCOL_FORMS = { + "http://": "hxxp://", + "https://": "hxxps://", +} + + +def _rng(r: Optional[random.Random]) -> random.Random: + return r if r is not None else random.Random() + + +def defang_ip(ip: str, rng: Optional[random.Random] = None) -> str: + """`1.2.3.4` → `1[.]2[.]3[.]4` (one randomly chosen dot style).""" + return ip.replace(".", _rng(rng).choice(_DOT_FORMS)) + + +def defang_domain(domain: str, rng: Optional[random.Random] = None) -> str: + """`evil.com` → `evil[.]com`.""" + return domain.replace(".", _rng(rng).choice(_DOT_FORMS)) + + +def defang_url(url: str, rng: Optional[random.Random] = None) -> str: + """`http://evil.com/x` → `hxxp://evil[.]com/x` — protocol + dot defanging.""" + r = _rng(rng) + out = url + for proto, replacement in _PROTOCOL_FORMS.items(): + if out.startswith(proto): + out = replacement + out[len(proto):] + break + out = out.replace(".", r.choice(_DOT_FORMS)) + return out + + +def defang_text( + text: str, + ips: List[str], + domains: List[str], + urls: List[str], + rng: Optional[random.Random] = None, +) -> str: + """Defang every occurrence of the given IOCs inside a free-text body. + + URLs are replaced before domains (URLs contain domain substrings, so + domain-first would corrupt the URL match). Likewise IPs last. Each + occurrence picks its own dot-style independently — real advisories don't + keep one style consistent across paragraphs. + """ + r = _rng(rng) + out = text + for u in sorted(set(urls), key=len, reverse=True): + out = out.replace(u, defang_url(u, r)) + for d in sorted(set(domains), key=len, reverse=True): + out = out.replace(d, defang_domain(d, r)) + for i in sorted(set(ips), key=len, reverse=True): + out = out.replace(i, defang_ip(i, r)) + return out diff --git a/src/psyc/lines/train.py b/src/psyc/lines/train.py index 49c4d4c..8e225a2 100644 --- a/src/psyc/lines/train.py +++ b/src/psyc/lines/train.py @@ -15,6 +15,7 @@ restricted source types, never empty input/output. from __future__ import annotations import json +import random import re from datetime import datetime, timezone from pathlib import Path @@ -24,10 +25,18 @@ from pydantic import BaseModel, Field from psyc import DATA_DIR, log from psyc.lines import classify as classify_line +from psyc.lines import defang as defang_line from psyc.lines import route as route_line from psyc.models import Case, TLP +class BuildOptions(BaseModel): + """Per-build configuration. Currently only ioc_extraction reads any field.""" + + defang_frac: float = 0.0 # in [0.0, 1.0] — fraction of ioc_extraction inputs to defang + seed: Optional[int] = None # reproducible RNG when set + + _log = log.get(__name__) DATASETS_DIR = DATA_DIR / "datasets" @@ -60,7 +69,11 @@ class DatasetReport(BaseModel): # ---------- ExampleBuilder per task --------------------------------------- -def _ex_ioc_extraction(case: Case) -> Optional[Example]: +def _ex_ioc_extraction( + case: Case, + options: Optional["BuildOptions"] = None, + rng: Optional[random.Random] = None, +) -> Optional[Example]: obs = case.observables if not (obs.urls or obs.domains or obs.ips or obs.hashes or obs.cves): return None @@ -81,6 +94,13 @@ def _ex_ioc_extraction(case: Case) -> Optional[Example]: body.append("Related CVEs: " + ", ".join(obs.cves) + ".") if tags: body.append(f"Tags: {tags}.") + body_text = " ".join(body) + # Defanging augmentation: with probability options.defang_frac, replace IOCs + # in the input with common real-world defanged forms (1[.]2[.]3[.]4, + # hxxp://, etc.). Output stays canonical so the model learns the mapping. + if options is not None and rng is not None and options.defang_frac > 0.0: + if rng.random() < options.defang_frac: + body_text = defang_line.defang_text(body_text, obs.ips, obs.domains, obs.urls, rng) output_obj = { "urls": obs.urls, "domains": obs.domains, @@ -90,7 +110,7 @@ def _ex_ioc_extraction(case: Case) -> Optional[Example]: } return Example( instruction="Extract all indicators of compromise from the advisory and return JSON with keys: urls, domains, ips, hashes, cves.", - input=" ".join(body), + input=body_text, output=json.dumps(output_obj, ensure_ascii=False), task="ioc_extraction", case_id=case.case_id, @@ -119,7 +139,11 @@ def severity_features(case: Case) -> Dict[str, object]: } -def _ex_severity_classification(case: Case) -> Optional[Example]: +def _ex_severity_classification( + case: Case, + options: Optional["BuildOptions"] = None, + rng: Optional[random.Random] = None, +) -> Optional[Example]: if case.classification.severity is None: return None return Example( @@ -132,7 +156,11 @@ def _ex_severity_classification(case: Case) -> Optional[Example]: ) -def _ex_routing_decision(case: Case) -> Optional[Example]: +def _ex_routing_decision( + case: Case, + options: Optional["BuildOptions"] = None, + rng: Optional[random.Random] = None, +) -> Optional[Example]: if case.classification.incident_type is None: return None routes, blocked = route_line.plan(case) @@ -158,7 +186,11 @@ def _ex_routing_decision(case: Case) -> Optional[Example]: ) -def _ex_tlp_assignment(case: Case) -> Optional[Example]: +def _ex_tlp_assignment( + case: Case, + options: Optional["BuildOptions"] = None, + rng: Optional[random.Random] = None, +) -> Optional[Example]: input_obj = { "source_type": case.source_type, "incident_type": case.classification.incident_type.value if case.classification.incident_type else None, @@ -217,10 +249,12 @@ def _next_version(task: str) -> int: return (max(used) + 1) if used else 1 -def build(task: str, cases: Iterable[Case]) -> DatasetReport: +def build(task: str, cases: Iterable[Case], options: Optional[BuildOptions] = None) -> DatasetReport: if task not in _BUILDERS: raise ValueError(f"unknown task: {task}; choices: {sorted(_BUILDERS)}") builder = _BUILDERS[task] + options = options or BuildOptions() + rng = random.Random(options.seed) version = _next_version(task) path = DATASETS_DIR / f"{task}-v{version}.jsonl" written = 0 @@ -230,7 +264,7 @@ def build(task: str, cases: Iterable[Case]) -> DatasetReport: skipped_empty = 0 with path.open("w", encoding="utf-8") as fh: for case in cases: - example = builder(case) + example = builder(case, options, rng) if example is None: skipped_empty += 1 continue diff --git a/tests/test_defang.py b/tests/test_defang.py new file mode 100644 index 0000000..3825439 --- /dev/null +++ b/tests/test_defang.py @@ -0,0 +1,71 @@ +"""Defanging — IOC obfuscation styles for training-data augmentation.""" + +from __future__ import annotations + +import json +import random + +from psyc.lines.defang import defang_domain, defang_ip, defang_text, defang_url +from psyc.lines.train import BuildOptions, _ex_ioc_extraction +from conftest import make_case + + +def test_defang_ip_breaks_canonical_form(): + out = defang_ip("1.2.3.4", random.Random(0)) + assert "1.2.3.4" not in out # canonical IP substring no longer appears + assert "1" in out and "4" in out # digits preserved + assert any(form in out for form in ("[.]", "(.)", "[dot]", "{.}")) + + +def test_defang_domain_preserves_label_text(): + out = defang_domain("evil.example.com", random.Random(1)) + assert "evil" in out and "example" in out and "com" in out + assert "evil.example.com" not in out # canonical domain broken + + +def test_defang_url_defangs_protocol_and_breaks_canonical_form(): + out = defang_url("http://evil.example.com/payload.bin", random.Random(2)) + assert out.startswith("hxxp://") # protocol defanged + assert "http://" not in out + assert "evil.example.com" not in out # host part defanged + + +def test_defang_url_handles_https(): + assert defang_url("https://evil.com/x", random.Random(0)).startswith("hxxps://") + + +def test_defang_text_substitutes_every_listed_ioc(): + text = "See URL http://1.2.3.4/x and IP 1.2.3.4 and domain evil.com please." + out = defang_text(text, ips=["1.2.3.4"], domains=["evil.com"], urls=["http://1.2.3.4/x"], rng=random.Random(3)) + # No canonical IOC string should remain anywhere in the corrupted body. + assert "http://" not in out + assert "1.2.3.4" not in out + assert "evil.com" not in out + # Surrounding prose is preserved. + assert "See URL" in out and "please" in out + + +def test_ioc_extraction_with_defang_frac_1_corrupts_input_only(): + case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], domains=["1.2.3.4"], ips=["1.2.3.4"]) + options = BuildOptions(defang_frac=1.0, seed=42) + rng = random.Random(options.seed) + ex = _ex_ioc_extraction(case, options, rng) + assert ex is not None + # Input has been defanged. + assert "1.2.3.4" not in ex.input + assert "http://" not in ex.input + # Output stays canonical so the model learns the inverse mapping. + output = json.loads(ex.output) + assert "1.2.3.4" in output["ips"] + assert "http://1.2.3.4/x" in output["urls"] + + +def test_ioc_extraction_with_defang_frac_0_is_canonical(): + case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], ips=["1.2.3.4"]) + options = BuildOptions(defang_frac=0.0, seed=0) + rng = random.Random(0) + ex = _ex_ioc_extraction(case, options, rng) + assert ex is not None + # No defanging → input keeps the canonical IOCs. + assert "http://1.2.3.4/x" in ex.input + assert "1.2.3.4" in ex.input