stage-13: CVEResolver — cross-check cases against the CISA KEV catalog

Mapline gains kev_cve_set() (the known-exploited CVE set, derived from the
already-ingested KEV cases) and resolve_cves() — flags any of a case's CVEs
that are known-exploited and escalates a non-KEV case's severity to HIGH when
one surfaces. Folded into map-case / map-all / demo.

Honest limit: only KEV-sourced cases carry CVEs today, so the cross-check is
largely self-referential until a CVE-bearing source or model extraction feeds
CVEs into other cases — the escalation path is verified against a synthetic case.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
m17hr1l
2026-05-18 23:31:13 +02:00
parent 94e17d4452
commit bc61b9a3a1
2 changed files with 51 additions and 11 deletions

View File

@@ -114,22 +114,32 @@ def map_case(case_id: str) -> None:
if isinstance(case_result, Err):
typer.echo(f"error: {case_result.reason}", err=True)
raise typer.Exit(1)
kev = map_line.kev_cve_set(db.list_cases(limit=10_000))
case = map_line.resolve(case_result.value)
case = map_line.resolve_cves(case, kev)
db.upsert_case(case)
typer.echo(f"mapped {case.case_id}: country={case.victim.country or ''} ips={case.observables.ips}")
line = f"mapped {case.case_id}: country={case.victim.country or ''} ips={case.observables.ips}"
if case.source_metadata.get("kev_cves"):
line += f" · known-exploited CVEs: {case.source_metadata['kev_cves']}"
typer.echo(line)
@app.command("map-all")
def map_all(limit: int = typer.Option(50, help="max cases to process this run")) -> None:
cases = db.list_cases(limit=limit)
resolved = 0
kev = map_line.kev_cve_set(db.list_cases(limit=10_000))
geo = 0
kev_hits = 0
for c in cases:
before = c.victim.country
mapped = map_line.resolve(c)
if mapped.victim.country != before:
db.upsert_case(mapped)
resolved += 1
typer.echo(f"resolved {resolved} new country/ies across {len(cases)} case(s).")
c = map_line.resolve(c)
c = map_line.resolve_cves(c, kev)
db.upsert_case(c)
if c.victim.country != before:
geo += 1
if c.source_metadata.get("kev_cves"):
kev_hits += 1
typer.echo(f"mapped {len(cases)} case(s): {geo} geo-resolved, {kev_hits} with known-exploited CVEs.")
@app.command("seal-keys-gen")
@@ -309,9 +319,12 @@ def demo() -> None:
case = proof.prove(case)
db.upsert_case(case)
typer.echo(f" + proved: confidence {case.confidence.level} · reliability {case.confidence.source_reliability}{case.confidence.information_credibility} · {case.confidence.freshness}")
kev = map_line.kev_cve_set(db.list_cases(limit=10_000))
case = map_line.resolve(case)
case = map_line.resolve_cves(case, kev)
db.upsert_case(case)
typer.echo(f" + mapped: hosting country = {case.victim.country or ''}")
kev_note = f" · KEV CVEs: {case.source_metadata['kev_cves']}" if case.source_metadata.get("kev_cves") else ""
typer.echo(f" + mapped: hosting country = {case.victim.country or ''}{kev_note}")
plaintext = case.model_dump_json().encode("utf-8")
metadata = dict(
case_id=case.case_id,

View File

@@ -1,6 +1,6 @@
"""Mapline — victim / actor / jurisdiction resolution.
Current worker: GeoResolver. Resolves a case's primary host to a country code
Workers: GeoResolver + CVEResolver. GeoResolver resolves a case's primary host to a country code
via ip-api.com (free, no auth, 45 req/min). For malicious-infrastructure
cases (URLhaus etc.) "victim.country" carries the hosting-country semantic;
documented in psyc.lines.route's destination policy.
@@ -9,13 +9,13 @@ documented in psyc.lines.route's destination policy.
from __future__ import annotations
import socket
from typing import Optional
from typing import Iterable, Optional, Set
from urllib.parse import urlparse
import httpx
from psyc import log
from psyc.models import Case
from psyc.models import Case, Severity
from psyc.result import Err, Ok, Result
@@ -88,3 +88,30 @@ def _geoip_country(ip: str, timeout: float = 5.0) -> Result[str, str]:
if not code:
return Err("ip-api returned no countryCode")
return Ok(code)
# --- CVEResolver — cross-check case CVEs against the CISA KEV catalog --------
def kev_cve_set(cases: Iterable[Case]) -> Set[str]:
"""The set of CVE IDs carried by CISA KEV cases — the known-exploited catalog."""
out: Set[str] = set()
for case in cases:
if case.source_metadata.get("feed") == "cisa-kev":
out.update(cve.upper() for cve in case.observables.cves)
return out
def resolve_cves(case: Case, kev_cves: Set[str]) -> Case:
"""Flag any of the case's CVEs that are known-exploited; escalate if so."""
if not case.observables.cves:
return case
hits = sorted(c for c in case.observables.cves if c.upper() in kev_cves)
if not hits:
return case
case.source_metadata["kev_cves"] = ",".join(hits)
# a known-exploited CVE surfacing on a non-KEV case is a real escalation
if case.source_metadata.get("feed") != "cisa-kev":
if case.classification.severity in (None, Severity.LOW, Severity.MEDIUM):
case.classification.severity = Severity.HIGH
_log.info("map.cve.kev_match", case_id=case.case_id, kev_cves=hits)
return case