From bc61b9a3a17371e4f585d819036328314ef5e8b0 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Mon, 18 May 2026 23:31:13 +0200 Subject: [PATCH] =?UTF-8?q?stage-13:=20CVEResolver=20=E2=80=94=20cross-che?= =?UTF-8?q?ck=20cases=20against=20the=20CISA=20KEV=20catalog?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mapline gains kev_cve_set() (the known-exploited CVE set, derived from the already-ingested KEV cases) and resolve_cves() — flags any of a case's CVEs that are known-exploited and escalates a non-KEV case's severity to HIGH when one surfaces. Folded into map-case / map-all / demo. Honest limit: only KEV-sourced cases carry CVEs today, so the cross-check is largely self-referential until a CVE-bearing source or model extraction feeds CVEs into other cases — the escalation path is verified against a synthetic case. Co-Authored-By: Claude Opus 4.7 --- src/psyc/cli.py | 29 +++++++++++++++++++++-------- src/psyc/lines/map.py | 33 ++++++++++++++++++++++++++++++--- 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/src/psyc/cli.py b/src/psyc/cli.py index b953400..0a6df27 100644 --- a/src/psyc/cli.py +++ b/src/psyc/cli.py @@ -114,22 +114,32 @@ def map_case(case_id: str) -> None: if isinstance(case_result, Err): typer.echo(f"error: {case_result.reason}", err=True) raise typer.Exit(1) + kev = map_line.kev_cve_set(db.list_cases(limit=10_000)) case = map_line.resolve(case_result.value) + case = map_line.resolve_cves(case, kev) db.upsert_case(case) - typer.echo(f"mapped {case.case_id}: country={case.victim.country or '—'} ips={case.observables.ips}") + line = f"mapped {case.case_id}: country={case.victim.country or '—'} ips={case.observables.ips}" + if case.source_metadata.get("kev_cves"): + line += f" · known-exploited CVEs: {case.source_metadata['kev_cves']}" + typer.echo(line) @app.command("map-all") def map_all(limit: int = typer.Option(50, help="max cases to process this run")) -> None: cases = db.list_cases(limit=limit) - resolved = 0 + kev = map_line.kev_cve_set(db.list_cases(limit=10_000)) + geo = 0 + kev_hits = 0 for c in cases: before = c.victim.country - mapped = map_line.resolve(c) - if mapped.victim.country != before: - db.upsert_case(mapped) - resolved += 1 - typer.echo(f"resolved {resolved} new country/ies across {len(cases)} case(s).") + c = map_line.resolve(c) + c = map_line.resolve_cves(c, kev) + db.upsert_case(c) + if c.victim.country != before: + geo += 1 + if c.source_metadata.get("kev_cves"): + kev_hits += 1 + typer.echo(f"mapped {len(cases)} case(s): {geo} geo-resolved, {kev_hits} with known-exploited CVEs.") @app.command("seal-keys-gen") @@ -309,9 +319,12 @@ def demo() -> None: case = proof.prove(case) db.upsert_case(case) typer.echo(f" + proved: confidence {case.confidence.level} · reliability {case.confidence.source_reliability}{case.confidence.information_credibility} · {case.confidence.freshness}") + kev = map_line.kev_cve_set(db.list_cases(limit=10_000)) case = map_line.resolve(case) + case = map_line.resolve_cves(case, kev) db.upsert_case(case) - typer.echo(f" + mapped: hosting country = {case.victim.country or '—'}") + kev_note = f" · KEV CVEs: {case.source_metadata['kev_cves']}" if case.source_metadata.get("kev_cves") else "" + typer.echo(f" + mapped: hosting country = {case.victim.country or '—'}{kev_note}") plaintext = case.model_dump_json().encode("utf-8") metadata = dict( case_id=case.case_id, diff --git a/src/psyc/lines/map.py b/src/psyc/lines/map.py index 7a69dfd..7189a4c 100644 --- a/src/psyc/lines/map.py +++ b/src/psyc/lines/map.py @@ -1,6 +1,6 @@ """Mapline — victim / actor / jurisdiction resolution. -Current worker: GeoResolver. Resolves a case's primary host to a country code +Workers: GeoResolver + CVEResolver. GeoResolver resolves a case's primary host to a country code via ip-api.com (free, no auth, 45 req/min). For malicious-infrastructure cases (URLhaus etc.) "victim.country" carries the hosting-country semantic; documented in psyc.lines.route's destination policy. @@ -9,13 +9,13 @@ documented in psyc.lines.route's destination policy. from __future__ import annotations import socket -from typing import Optional +from typing import Iterable, Optional, Set from urllib.parse import urlparse import httpx from psyc import log -from psyc.models import Case +from psyc.models import Case, Severity from psyc.result import Err, Ok, Result @@ -88,3 +88,30 @@ def _geoip_country(ip: str, timeout: float = 5.0) -> Result[str, str]: if not code: return Err("ip-api returned no countryCode") return Ok(code) + + +# --- CVEResolver — cross-check case CVEs against the CISA KEV catalog -------- + +def kev_cve_set(cases: Iterable[Case]) -> Set[str]: + """The set of CVE IDs carried by CISA KEV cases — the known-exploited catalog.""" + out: Set[str] = set() + for case in cases: + if case.source_metadata.get("feed") == "cisa-kev": + out.update(cve.upper() for cve in case.observables.cves) + return out + + +def resolve_cves(case: Case, kev_cves: Set[str]) -> Case: + """Flag any of the case's CVEs that are known-exploited; escalate if so.""" + if not case.observables.cves: + return case + hits = sorted(c for c in case.observables.cves if c.upper() in kev_cves) + if not hits: + return case + case.source_metadata["kev_cves"] = ",".join(hits) + # a known-exploited CVE surfacing on a non-KEV case is a real escalation + if case.source_metadata.get("feed") != "cisa-kev": + if case.classification.severity in (None, Severity.LOW, Severity.MEDIUM): + case.classification.severity = Severity.HIGH + _log.info("map.cve.kev_match", case_id=case.case_id, kev_cves=hits) + return case