From 2138611fdb42c5ac9312bc6b82c71264ca7253ce Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Sun, 17 May 2026 23:42:13 +0200 Subject: [PATCH] =?UTF-8?q?stage-4:=20multi-source=20Scoutline=20=E2=80=94?= =?UTF-8?q?=20CISA=20KEV=20+=20Feodo=20Tracker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Scoutline is now a source registry: urlhaus, cisa-kev, feodo. CISA KEV brings exploit/CVE cases, Feodo Tracker brings botnet C2 cases — real incident-type variety beyond URLhaus's malware monotone. Classifyline is source-aware (feed tag → incident type; ransomware-flagged KEV → critical). CLI gains fetch-cisa-kev, fetch-feodo, fetch-all. Both new feeds are keyless public download feeds (verified). Co-Authored-By: Claude Opus 4.7 --- README.md | 2 +- src/psyc/cli.py | 32 ++++++-- src/psyc/lines/classify.py | 29 ++++++-- src/psyc/lines/route.py | 2 +- src/psyc/lines/scout.py | 147 +++++++++++++++++++++++++++---------- 5 files changed, 157 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index b6033da..556bed1 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ python3 -m virtualenv .venv .venv/bin/pip install -e . .venv/bin/psyc init # create the sqlite db -.venv/bin/psyc fetch-urlhaus --limit 50 # ingest a URLhaus pass +.venv/bin/psyc fetch-all # ingest URLhaus + CISA KEV + Feodo Tracker .venv/bin/psyc serve --port 8767 # cockpit at http://127.0.0.1:8767 .venv/bin/psyc status # count of ingested cases ``` diff --git a/src/psyc/cli.py b/src/psyc/cli.py index 5a0615c..899724e 100644 --- a/src/psyc/cli.py +++ b/src/psyc/cli.py @@ -30,14 +30,34 @@ def status() -> None: typer.echo(f"cases: {db.case_count()}") -@app.command("fetch-urlhaus") -def fetch_urlhaus(limit: int = typer.Option(50, help="max rows to ingest from the feed")) -> None: +def _ingest(source: str, limit: int) -> None: db.init_db() - typer.echo(f"fetching URLhaus recent feed (limit={limit})…") - cases = scout.fetch_and_signal(limit=limit) + typer.echo(f"fetching {source} (limit={limit})…") + cases = scout.fetch_and_signal(source, limit=limit) for c in cases: db.upsert_case(c) - typer.echo(f"ingested {len(cases)} case(s). total now: {db.case_count()}") + typer.echo(f"ingested {len(cases)} case(s) from {source}. total now: {db.case_count()}") + + +@app.command("fetch-urlhaus") +def fetch_urlhaus(limit: int = typer.Option(50, help="max rows to ingest")) -> None: + _ingest("urlhaus", limit) + + +@app.command("fetch-cisa-kev") +def fetch_cisa_kev(limit: int = typer.Option(100, help="max vulnerabilities to ingest")) -> None: + _ingest("cisa-kev", limit) + + +@app.command("fetch-feodo") +def fetch_feodo(limit: int = typer.Option(50, help="max C2 records to ingest")) -> None: + _ingest("feodo", limit) + + +@app.command("fetch-all") +def fetch_all() -> None: + for source, limit in (("urlhaus", 50), ("cisa-kev", 100), ("feodo", 50)): + _ingest(source, limit) @app.command("classify-case") @@ -250,7 +270,7 @@ def demo() -> None: seal.generate_recipient_keys(recipient) typer.echo(f" + generated demo keys for {recipient}") typer.echo("fetching one URLhaus row…") - cases = scout.fetch_and_signal(limit=1) + cases = scout.fetch_and_signal("urlhaus", limit=1) if not cases: typer.echo("no cases ingested; URLhaus may be empty or unreachable", err=True) raise typer.Exit(1) diff --git a/src/psyc/lines/classify.py b/src/psyc/lines/classify.py index ef403c7..b9751ae 100644 --- a/src/psyc/lines/classify.py +++ b/src/psyc/lines/classify.py @@ -8,6 +8,12 @@ from psyc.models import Case, IncidentType, InternalClass, Severity, TLP _log = log.get(__name__) +_FEED_INCIDENT = { + "urlhaus": IncidentType.MALWARE, + "feodo": IncidentType.BOTNET, + "cisa-kev": IncidentType.EXPLOIT, +} + def classify(case: Case) -> Case: _classify_incident_type_and_tlp(case) @@ -27,10 +33,14 @@ def classify(case: Case) -> Case: def _classify_incident_type_and_tlp(case: Case) -> None: if case.classification.incident_type is not None: return - if case.source_type == "abuse_feed" and case.observables.urls: - case.classification.incident_type = IncidentType.MALWARE - if case.classification.tlp == TLP.AMBER: - case.classification.tlp = TLP.GREEN + incident = _FEED_INCIDENT.get(case.source_metadata.get("feed", "")) + if incident is None and case.observables.urls: + incident = IncidentType.MALWARE # fallback for un-tagged feeds + if incident is None: + return + case.classification.incident_type = incident + if case.classification.tlp == TLP.AMBER: + case.classification.tlp = TLP.GREEN def _classify_severity(case: Case) -> None: @@ -39,9 +49,14 @@ def _classify_severity(case: Case) -> None: if case.victim.critical_infrastructure: case.classification.severity = Severity.CRITICAL return - if case.classification.incident_type == IncidentType.MALWARE: - url_status = case.source_metadata.get("url_status", "") - case.classification.severity = Severity.HIGH if url_status == "online" else Severity.MEDIUM + incident = case.classification.incident_type + if incident == IncidentType.EXPLOIT: + ransomware = case.source_metadata.get("ransomware", "") + case.classification.severity = Severity.CRITICAL if ransomware == "Known" else Severity.HIGH + return + if incident in (IncidentType.MALWARE, IncidentType.BOTNET): + status = case.source_metadata.get("url_status") or case.source_metadata.get("status", "") + case.classification.severity = Severity.HIGH if status == "online" else Severity.MEDIUM return case.classification.severity = Severity.MEDIUM diff --git a/src/psyc/lines/route.py b/src/psyc/lines/route.py index 655ec56..7ca613c 100644 --- a/src/psyc/lines/route.py +++ b/src/psyc/lines/route.py @@ -56,7 +56,7 @@ DESTINATIONS: List[Destination] = [ name="CERT-Bund", kind="authority", max_tlp=TLP.RED, - accepts=[IncidentType.MALWARE, IncidentType.RANSOMWARE, IncidentType.PHISHING, IncidentType.EXPLOIT, IncidentType.DATA_LEAK, IncidentType.CREDENTIAL_LEAK], + accepts=[IncidentType.MALWARE, IncidentType.RANSOMWARE, IncidentType.PHISHING, IncidentType.EXPLOIT, IncidentType.BOTNET, IncidentType.DATA_LEAK, IncidentType.CREDENTIAL_LEAK], priority=1, payload_kind="sealed_evidence_package", countries=["DE"], diff --git a/src/psyc/lines/scout.py b/src/psyc/lines/scout.py index 85c8558..c4307ea 100644 --- a/src/psyc/lines/scout.py +++ b/src/psyc/lines/scout.py @@ -1,7 +1,9 @@ -"""Scoutline — Fetcher + Signalizer for URLhaus. +"""Scoutline — multi-source Fetcher + Signalizer. -Emits raw Case objects with source metadata + observables only. Classification, -victim/actor resolution, confidence scoring, sealing, and routing are downstream. +Each source pulls a public defensive feed and emits normalized Case objects +carrying observables + source metadata only; classification, mapping, sealing, +and routing happen downstream. Sources are registered in SOURCES; the +`source_metadata["feed"]` tag lets Classifyline assign the right incident type. """ from __future__ import annotations @@ -9,7 +11,7 @@ from __future__ import annotations import csv import io from datetime import datetime, timezone -from typing import Dict, Iterable, List, Optional +from typing import Callable, Dict, Iterable, List, Optional from urllib.parse import urlparse import httpx @@ -18,27 +20,33 @@ from psyc import log from psyc.models import Case, Observables -URLHAUS_RECENT_CSV = "https://urlhaus.abuse.ch/downloads/csv_recent/" USER_AGENT = "psyc/0.1 (defensive CTI; hackathon prototype)" +HTTP_TIMEOUT = 30.0 + +URLHAUS_RECENT_CSV = "https://urlhaus.abuse.ch/downloads/csv_recent/" +CISA_KEV_JSON = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json" +FEODO_BLOCKLIST_JSON = "https://feodotracker.abuse.ch/downloads/ipblocklist.json" _log = log.get(__name__) -def fetch_recent_csv(timeout: float = 30.0) -> str: - with httpx.Client(timeout=timeout, headers={"User-Agent": USER_AGENT}) as client: - resp = client.get(URLHAUS_RECENT_CSV) +def _http_get(url: str) -> httpx.Response: + with httpx.Client(timeout=HTTP_TIMEOUT, headers={"User-Agent": USER_AGENT}, follow_redirects=True) as client: + resp = client.get(url) resp.raise_for_status() - return resp.text + return resp -def _parse_urlhaus_date(s: str) -> datetime: +def _parse_dt(value: str, fmt: str) -> datetime: try: - return datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc) - except ValueError: + return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc) + except (ValueError, TypeError): return datetime.now(timezone.utc) -def parse_urlhaus_csv(csv_text: str) -> Iterable[Dict[str, str]]: +# --- URLhaus — malware distribution URLs --------------------------------- + +def _parse_urlhaus_csv(csv_text: str) -> Iterable[Dict[str, str]]: lines = [ln for ln in csv_text.splitlines() if ln and not ln.startswith("#")] if not lines: return @@ -47,24 +55,16 @@ def parse_urlhaus_csv(csv_text: str) -> Iterable[Dict[str, str]]: if len(fields) < 9: continue yield { - "id": fields[0], - "dateadded": fields[1], - "url": fields[2], - "url_status": fields[3], - "last_online": fields[4], - "threat": fields[5], - "tags": fields[6], - "urlhaus_link": fields[7], - "reporter": fields[8], + "id": fields[0], "dateadded": fields[1], "url": fields[2], + "url_status": fields[3], "last_online": fields[4], "threat": fields[5], + "tags": fields[6], "urlhaus_link": fields[7], "reporter": fields[8], } -def row_to_case(row: Dict[str, str]) -> Case: +def _urlhaus_row_to_case(row: Dict[str, str]) -> Case: url = row["url"] - parsed = urlparse(url) - host = parsed.hostname or "" + host = urlparse(url).hostname or "" tags = [t.strip() for t in row["tags"].split(",") if t.strip()] - observables = Observables(urls=[url], domains=[host] if host else []) summary = f"URLhaus: {row['threat'] or 'malware_distribution'} at {host or url}" if tags: summary += f" (tags: {', '.join(tags[:4])})" @@ -74,22 +74,89 @@ def row_to_case(row: Dict[str, str]) -> Case: source_type="abuse_feed", source_ref=row["urlhaus_link"], source_metadata=dict( - url_status=row["url_status"], - threat=row["threat"], - tags=row["tags"], - reporter=row["reporter"], + feed="urlhaus", url_status=row["url_status"], threat=row["threat"], + tags=row["tags"], reporter=row["reporter"], ), - observed_at=_parse_urlhaus_date(row["dateadded"]), - observables=observables, + observed_at=_parse_dt(row["dateadded"], "%Y-%m-%d %H:%M:%S"), + observables=Observables(urls=[url], domains=[host] if host else []), ) -def fetch_and_signal(limit: Optional[int] = None) -> List[Case]: - csv_text = fetch_recent_csv() - cases: List[Case] = [] - for row in parse_urlhaus_csv(csv_text): - cases.append(row_to_case(row)) - if limit is not None and len(cases) >= limit: - break - _log.info("scout.urlhaus.fetched", count=len(cases)) +def _fetch_urlhaus() -> List[Case]: + text = _http_get(URLHAUS_RECENT_CSV).text + return [_urlhaus_row_to_case(r) for r in _parse_urlhaus_csv(text)] + + +# --- CISA KEV — known exploited vulnerabilities -------------------------- + +def _kev_vuln_to_case(v: Dict[str, object]) -> Case: + cve = str(v["cveID"]) + vendor = str(v.get("vendorProject", "")) + product = str(v.get("product", "")) + vp = vendor if vendor == product else f"{vendor} {product}".strip() + summary = f"CISA KEV: {v.get('vulnerabilityName', cve)}" + if vp: + summary += f" — {vp}" + return Case( + case_id=f"PSYC-KEV-{cve}", + summary=summary, + source_type="advisory", + source_ref=f"https://nvd.nist.gov/vuln/detail/{cve}", + source_metadata=dict( + feed="cisa-kev", vendor=vendor, product=product, + ransomware=str(v.get("knownRansomwareCampaignUse", "Unknown")), + date_added=str(v.get("dateAdded", "")), due_date=str(v.get("dueDate", "")), + ), + observed_at=_parse_dt(str(v.get("dateAdded", "")), "%Y-%m-%d"), + observables=Observables(cves=[cve]), + ) + + +def _fetch_cisa_kev() -> List[Case]: + data = _http_get(CISA_KEV_JSON).json() + return [_kev_vuln_to_case(v) for v in data.get("vulnerabilities", [])] + + +# --- Feodo Tracker — botnet C2 servers ----------------------------------- + +def _feodo_record_to_case(r: Dict[str, object]) -> Case: + ip = str(r.get("ip_address", "")) + port = str(r.get("port", "")) + malware = str(r.get("malware", "botnet")) + return Case( + case_id=f"PSYC-FEODO-{ip}-{port}", + summary=f"Feodo Tracker: {malware} botnet C2 at {ip}:{port}", + source_type="abuse_feed", + source_ref="https://feodotracker.abuse.ch/browse/", + source_metadata=dict( + feed="feodo", malware=malware, status=str(r.get("status", "")), + port=port, country=str(r.get("country", "")), as_name=str(r.get("as_name", "")), + ), + observed_at=_parse_dt(str(r.get("first_seen", "")), "%Y-%m-%d %H:%M:%S"), + observables=Observables(ips=[ip] if ip else []), + ) + + +def _fetch_feodo() -> List[Case]: + data = _http_get(FEODO_BLOCKLIST_JSON).json() + return [_feodo_record_to_case(r) for r in data] + + +# --- registry + dispatch ------------------------------------------------- + +SOURCES: Dict[str, Callable[[], List[Case]]] = { + "urlhaus": _fetch_urlhaus, + "cisa-kev": _fetch_cisa_kev, + "feodo": _fetch_feodo, +} + + +def fetch_and_signal(source: str, limit: Optional[int] = None) -> List[Case]: + fetcher = SOURCES.get(source) + if fetcher is None: + raise ValueError(f"unknown source: {source}; known: {', '.join(SOURCES)}") + cases = fetcher() + if limit is not None: + cases = cases[:limit] + _log.info("scout.fetched", source=source, count=len(cases)) return cases