From d87bd710bb1c542a2ed39b825e929c4324e26f68 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Wed, 20 May 2026 22:14:18 +0200 Subject: [PATCH] stage-19: ThreatFox + MalwareBazaar + OTX Scoutline sources MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three new feeds — biggest near-term data-diversity win. ThreatFox brings multi-malware IOCs with threat_type signal (botnet_cc → BOTNET, payload_delivery → MALWARE, phishing → PHISHING). MalwareBazaar brings file-hash samples with signatures. OTX brings curated multi-source pulses with paragraph-form descriptions — by far the richest real-prose source. Auth: THREATFOX_AUTH_KEY (one abuse.ch key covers ThreatFox + MalwareBazaar) and OTX_API_KEY. fetch-all skips keyed feeds cleanly with where-to-get-it guidance instead of tracebacking. Proofline reliability table extended; abuse.ch sources rated B/2, OTX rated C/3 (community-driven). Co-Authored-By: Claude Opus 4.7 --- src/psyc/cli.py | 28 ++++- src/psyc/lines/classify.py | 16 ++- src/psyc/lines/proof.py | 9 +- src/psyc/lines/scout.py | 219 ++++++++++++++++++++++++++++++++++++- tests/test_classify.py | 23 ++++ tests/test_scout.py | 102 ++++++++++++++++- 6 files changed, 385 insertions(+), 12 deletions(-) diff --git a/src/psyc/cli.py b/src/psyc/cli.py index 29a5f97..995b9e8 100644 --- a/src/psyc/cli.py +++ b/src/psyc/cli.py @@ -88,10 +88,34 @@ def fetch_feodo(limit: int = typer.Option(50, help="max C2 records to ingest")) _ingest("feodo", limit) +@app.command("fetch-threatfox") +def fetch_threatfox(limit: int = typer.Option(200, help="max IOCs to ingest")) -> None: + """ThreatFox (abuse.ch) — needs THREATFOX_AUTH_KEY in .env.""" + _ingest("threatfox", limit) + + +@app.command("fetch-malware-bazaar") +def fetch_malware_bazaar(limit: int = typer.Option(100, help="max samples to ingest")) -> None: + """MalwareBazaar (abuse.ch) — also uses THREATFOX_AUTH_KEY.""" + _ingest("malware-bazaar", limit) + + +@app.command("fetch-otx") +def fetch_otx(limit: int = typer.Option(100, help="max pulse-cases to ingest")) -> None: + """AlienVault OTX — needs OTX_API_KEY in .env.""" + _ingest("otx", limit) + + @app.command("fetch-all") def fetch_all() -> None: - for source, limit in (("urlhaus", 50), ("cisa-kev", 100), ("feodo", 50)): - _ingest(source, limit) + """Fetch every configured source. Keyed feeds skip cleanly when the key is missing.""" + plan = (("urlhaus", 50), ("cisa-kev", 100), ("feodo", 50), + ("threatfox", 200), ("malware-bazaar", 100), ("otx", 100)) + for source, limit in plan: + try: + _ingest(source, limit) + except RuntimeError as exc: + typer.echo(f" skip {source}: {exc}", err=True) @app.command("classify-case") diff --git a/src/psyc/lines/classify.py b/src/psyc/lines/classify.py index b9751ae..35bd460 100644 --- a/src/psyc/lines/classify.py +++ b/src/psyc/lines/classify.py @@ -12,6 +12,16 @@ _FEED_INCIDENT = { "urlhaus": IncidentType.MALWARE, "feodo": IncidentType.BOTNET, "cisa-kev": IncidentType.EXPLOIT, + "malware-bazaar": IncidentType.MALWARE, + "otx": IncidentType.MALWARE, # default; OTX pulses span many types +} + +# ThreatFox carries its own type signal — map it instead of using a feed default. +_THREATFOX_THREAT_TYPE = { + "botnet_cc": IncidentType.BOTNET, + "payload_delivery": IncidentType.MALWARE, + "payload": IncidentType.MALWARE, + "phishing": IncidentType.PHISHING, } @@ -33,7 +43,11 @@ def classify(case: Case) -> Case: def _classify_incident_type_and_tlp(case: Case) -> None: if case.classification.incident_type is not None: return - incident = _FEED_INCIDENT.get(case.source_metadata.get("feed", "")) + feed = case.source_metadata.get("feed", "") + if feed == "threatfox": + incident = _THREATFOX_THREAT_TYPE.get(case.source_metadata.get("threat_type", ""), IncidentType.MALWARE) + else: + incident = _FEED_INCIDENT.get(feed) if incident is None and case.observables.urls: incident = IncidentType.MALWARE # fallback for un-tagged feeds if incident is None: diff --git a/src/psyc/lines/proof.py b/src/psyc/lines/proof.py index 1ff5afd..a759f10 100644 --- a/src/psyc/lines/proof.py +++ b/src/psyc/lines/proof.py @@ -23,9 +23,12 @@ _SHA_RE = re.compile(r"^[a-fA-F0-9]{32,64}$") # feed -> (Admiralty source reliability A-F, information credibility 1-6) _FEED_RELIABILITY = { - "cisa-kev": ("A", "1"), # government catalog, confirmed exploited - "urlhaus": ("B", "2"), # established CTI source, confirmed malware - "feodo": ("B", "2"), # established CTI source, confirmed C2 + "cisa-kev": ("A", "1"), # government catalog, confirmed exploited + "urlhaus": ("B", "2"), # established CTI source, confirmed malware + "feodo": ("B", "2"), # established CTI source, confirmed C2 + "threatfox": ("B", "2"), # abuse.ch CTI source + "malware-bazaar": ("B", "2"), # abuse.ch CTI source, confirmed sample + "otx": ("C", "3"), # community-driven, varying quality } diff --git a/src/psyc/lines/scout.py b/src/psyc/lines/scout.py index c4307ea..ba740de 100644 --- a/src/psyc/lines/scout.py +++ b/src/psyc/lines/scout.py @@ -10,14 +10,15 @@ from __future__ import annotations import csv import io +import os from datetime import datetime, timezone -from typing import Callable, Dict, Iterable, List, Optional +from typing import Any, Callable, Dict, Iterable, List, Optional from urllib.parse import urlparse import httpx from psyc import log -from psyc.models import Case, Observables +from psyc.models import Case, IncidentType, Observables USER_AGENT = "psyc/0.1 (defensive CTI; hackathon prototype)" @@ -26,17 +27,30 @@ HTTP_TIMEOUT = 30.0 URLHAUS_RECENT_CSV = "https://urlhaus.abuse.ch/downloads/csv_recent/" CISA_KEV_JSON = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json" FEODO_BLOCKLIST_JSON = "https://feodotracker.abuse.ch/downloads/ipblocklist.json" +THREATFOX_API = "https://threatfox-api.abuse.ch/api/v1/" +MALWARE_BAZAAR_API = "https://mb-api.abuse.ch/api/v1/" +OTX_PULSES_API = "https://otx.alienvault.com/api/v1/pulses/subscribed" _log = log.get(__name__) -def _http_get(url: str) -> httpx.Response: - with httpx.Client(timeout=HTTP_TIMEOUT, headers={"User-Agent": USER_AGENT}, follow_redirects=True) as client: - resp = client.get(url) +def _http(method: str, url: str, headers: Optional[Dict[str, str]] = None, json_body: Optional[Dict[str, Any]] = None) -> httpx.Response: + h = {"User-Agent": USER_AGENT} + if headers: + h.update(headers) + with httpx.Client(timeout=HTTP_TIMEOUT, headers=h, follow_redirects=True) as client: + if method.upper() == "POST": + resp = client.post(url, json=json_body) + else: + resp = client.get(url) resp.raise_for_status() return resp +def _http_get(url: str) -> httpx.Response: + return _http("GET", url) + + def _parse_dt(value: str, fmt: str) -> datetime: try: return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc) @@ -142,12 +156,207 @@ def _fetch_feodo() -> List[Case]: return [_feodo_record_to_case(r) for r in data] +# --- ThreatFox — multi-malware IOC feed (abuse.ch) ----------------------- + +# ThreatFox threat_type values → psyc IncidentType. +THREATFOX_THREAT_TYPE: Dict[str, IncidentType] = { + "botnet_cc": IncidentType.BOTNET, + "payload_delivery": IncidentType.MALWARE, + "payload": IncidentType.MALWARE, + "phishing": IncidentType.PHISHING, +} + + +def _threatfox_row_to_case(r: Dict[str, Any]) -> Optional[Case]: + ioc_value = str(r.get("ioc_value") or "").strip() + ioc_type = str(r.get("ioc_type") or "").lower() + if not ioc_value or not ioc_type: + return None + malware = str(r.get("malware_printable") or r.get("malware") or "unknown") + threat_type = str(r.get("threat_type") or "") + tags_raw = r.get("tags") or [] + tags = tags_raw if isinstance(tags_raw, list) else [] + + obs = Observables() + host = "" + if ioc_type in ("ip:port", "ipv4", "ipv6"): + ip = ioc_value.split(":")[0] + obs.ips = [ip] + elif ioc_type == "domain": + obs.domains = [ioc_value] + host = ioc_value + elif ioc_type == "url": + obs.urls = [ioc_value] + host = urlparse(ioc_value).hostname or "" + if host: + obs.domains = [host] + elif ioc_type in ("sha256_hash", "md5_hash", "sha1_hash"): + obs.hashes = [ioc_value] + else: + return None + + threat_label = threat_type.replace("_", " ") or "malware" + summary = f"ThreatFox: {malware} {threat_label} — {ioc_value}" + return Case( + case_id=f"PSYC-THREATFOX-{r.get('id', '')}", + summary=summary, + source_type="abuse_feed", + source_ref=str(r.get("reference") or f"https://threatfox.abuse.ch/ioc/{r.get('id', '')}/"), + source_metadata=dict( + feed="threatfox", + malware=malware, + threat_type=threat_type, + ioc_type=ioc_type, + confidence_level=str(r.get("confidence_level", "")), + tags=",".join(t for t in tags if t), + reporter=str(r.get("reporter", "")), + ), + observed_at=_parse_dt(str(r.get("first_seen_utc", "")), "%Y-%m-%d %H:%M:%S"), + observables=obs, + ) + + +def _fetch_threatfox() -> List[Case]: + key = os.environ.get("THREATFOX_AUTH_KEY", "").strip() + if not key: + raise RuntimeError("THREATFOX_AUTH_KEY not set — free abuse.ch auth-key from https://auth.abuse.ch/") + data = _http("POST", THREATFOX_API, headers={"Auth-Key": key}, json_body={"query": "get_iocs", "days": 1}).json() + rows = data.get("data") or [] + out: List[Case] = [] + for r in rows: + c = _threatfox_row_to_case(r) + if c is not None: + out.append(c) + return out + + +# --- MalwareBazaar — recent malware samples (abuse.ch) ------------------- + +def _mb_row_to_case(r: Dict[str, Any]) -> Optional[Case]: + sha256 = str(r.get("sha256_hash") or "") + if not sha256: + return None + sha1 = str(r.get("sha1_hash") or "") + md5 = str(r.get("md5_hash") or "") + file_name = str(r.get("file_name") or "unknown") + signature = str(r.get("signature") or "") + file_type = str(r.get("file_type") or "") + tags_raw = r.get("tags") or [] + tags = tags_raw if isinstance(tags_raw, list) else [] + hashes = [h for h in (sha256, sha1, md5) if h] + label = signature or "unsigned" + summary = f"MalwareBazaar: {label} {file_type} sample — {file_name}" + return Case( + case_id=f"PSYC-MBAZAAR-{sha256[:16]}", + summary=summary, + source_type="abuse_feed", + source_ref=f"https://bazaar.abuse.ch/sample/{sha256}/", + source_metadata=dict( + feed="malware-bazaar", + signature=signature, + file_type=file_type, + file_name=file_name, + tags=",".join(t for t in tags if t), + reporter=str(r.get("reporter", "")), + ), + observed_at=_parse_dt(str(r.get("first_seen") or ""), "%Y-%m-%d %H:%M:%S"), + observables=Observables(hashes=hashes), + ) + + +def _fetch_malware_bazaar() -> List[Case]: + key = os.environ.get("THREATFOX_AUTH_KEY", "").strip() + if not key: + raise RuntimeError("THREATFOX_AUTH_KEY not set — abuse.ch auth-key from https://auth.abuse.ch/ also covers MalwareBazaar") + data = _http("POST", MALWARE_BAZAAR_API, headers={"Auth-Key": key}, json_body={"query": "get_recent", "selector": "100"}).json() + rows = data.get("data") or [] + out: List[Case] = [] + for r in rows: + c = _mb_row_to_case(r) + if c is not None: + out.append(c) + return out + + +# --- AlienVault OTX — curated multi-source pulses ------------------------ + +_OTX_IOC_LIMIT_PER_PULSE = 50 + + +def _otx_pulse_to_case(p: Dict[str, Any]) -> Optional[Case]: + pulse_id = str(p.get("id") or "") + if not pulse_id: + return None + pulse_name = str(p.get("name") or "OTX pulse") + description = str(p.get("description") or "") + tags_raw = p.get("tags") or [] + tags = tags_raw if isinstance(tags_raw, list) else [] + tlp_pulse = str(p.get("tlp") or "white").upper() + indicators = p.get("indicators") or [] + + obs = Observables() + for ind in indicators[:_OTX_IOC_LIMIT_PER_PULSE]: + value = str(ind.get("indicator") or "").strip() + itype = str(ind.get("type") or "").lower() + if not value: + continue + if itype in ("ipv4", "ipv6"): + obs.ips.append(value) + elif itype in ("domain", "hostname"): + obs.domains.append(value) + elif itype == "url": + obs.urls.append(value) + host = urlparse(value).hostname or "" + if host and host not in obs.domains: + obs.domains.append(host) + elif itype in ("filehash-sha256", "filehash-sha1", "filehash-md5"): + obs.hashes.append(value) + elif itype == "cve": + obs.cves.append(value) + + if not (obs.urls or obs.domains or obs.ips or obs.hashes or obs.cves): + return None + + return Case( + case_id=f"PSYC-OTX-{pulse_id}", + summary=f"OTX: {pulse_name}", + source_type="threat_intel", + source_ref=f"https://otx.alienvault.com/pulse/{pulse_id}", + source_metadata=dict( + feed="otx", + pulse_name=pulse_name, + description=description[:2000], + tags=",".join(t for t in tags if t), + tlp_pulse=tlp_pulse, + ), + observed_at=_parse_dt(str(p.get("created") or "").split(".")[0], "%Y-%m-%dT%H:%M:%S"), + observables=obs, + ) + + +def _fetch_otx() -> List[Case]: + key = os.environ.get("OTX_API_KEY", "").strip() + if not key: + raise RuntimeError("OTX_API_KEY not set — free key at https://otx.alienvault.com → settings → API") + data = _http("GET", OTX_PULSES_API, headers={"X-OTX-API-KEY": key}).json() + pulses = data.get("results") or [] + out: List[Case] = [] + for p in pulses: + c = _otx_pulse_to_case(p) + if c is not None: + out.append(c) + return out + + # --- registry + dispatch ------------------------------------------------- SOURCES: Dict[str, Callable[[], List[Case]]] = { "urlhaus": _fetch_urlhaus, "cisa-kev": _fetch_cisa_kev, "feodo": _fetch_feodo, + "threatfox": _fetch_threatfox, + "malware-bazaar": _fetch_malware_bazaar, + "otx": _fetch_otx, } diff --git a/tests/test_classify.py b/tests/test_classify.py index f97e3ea..d17bc91 100644 --- a/tests/test_classify.py +++ b/tests/test_classify.py @@ -57,3 +57,26 @@ def test_classify_is_idempotent(): first = case.classification.model_copy(deep=True) classify(case) assert case.classification == first + + +def test_threatfox_botnet_cc_is_botnet(): + case = make_case(feed="threatfox", ips=["1.2.3.4"]) + case.source_metadata["threat_type"] = "botnet_cc" + assert classify(case).classification.incident_type is IncidentType.BOTNET + + +def test_threatfox_payload_delivery_is_malware(): + case = make_case(feed="threatfox", urls=["http://1.2.3.4/x.bin"]) + case.source_metadata["threat_type"] = "payload_delivery" + assert classify(case).classification.incident_type is IncidentType.MALWARE + + +def test_threatfox_phishing_threat_type_is_phishing(): + case = make_case(feed="threatfox", urls=["http://login.bad/example"]) + case.source_metadata["threat_type"] = "phishing" + assert classify(case).classification.incident_type is IncidentType.PHISHING + + +def test_malware_bazaar_is_malware(): + case = make_case(feed="malware-bazaar", hashes=["a" * 64]) + assert classify(case).classification.incident_type is IncidentType.MALWARE diff --git a/tests/test_scout.py b/tests/test_scout.py index 831c1e4..5f6587b 100644 --- a/tests/test_scout.py +++ b/tests/test_scout.py @@ -2,7 +2,14 @@ from __future__ import annotations -from psyc.lines.scout import _feodo_record_to_case, _kev_vuln_to_case, _parse_urlhaus_csv +from psyc.lines.scout import ( + _feodo_record_to_case, + _kev_vuln_to_case, + _mb_row_to_case, + _otx_pulse_to_case, + _parse_urlhaus_csv, + _threatfox_row_to_case, +) URLHAUS_CSV = """\ # comment line @@ -47,3 +54,96 @@ def test_feodo_record_to_case(): assert case.source_metadata["feed"] == "feodo" assert case.source_metadata["malware"] == "Emotet" assert case.source_metadata["status"] == "online" + + +def test_threatfox_row_url_to_case(): + row = { + "id": "1234567", + "ioc_value": "http://1.2.3.4/x.bin", + "ioc_type": "url", + "threat_type": "payload_delivery", + "malware_printable": "Cobalt Strike", + "first_seen_utc": "2026-05-19 10:00:00", + "confidence_level": 100, + "tags": ["c2", "stager"], + "reporter": "anon", + } + case = _threatfox_row_to_case(row) + assert case is not None + assert case.case_id == "PSYC-THREATFOX-1234567" + assert case.observables.urls == ["http://1.2.3.4/x.bin"] + assert case.observables.domains == ["1.2.3.4"] + assert case.source_metadata["feed"] == "threatfox" + assert case.source_metadata["malware"] == "Cobalt Strike" + assert case.source_metadata["threat_type"] == "payload_delivery" + + +def test_threatfox_row_ip_port_to_case(): + row = { + "id": "9999", + "ioc_value": "5.6.7.8:443", + "ioc_type": "ip:port", + "threat_type": "botnet_cc", + "malware_printable": "Qakbot", + "first_seen_utc": "2026-05-18 10:00:00", + } + case = _threatfox_row_to_case(row) + assert case is not None + assert case.observables.ips == ["5.6.7.8"] # port stripped + + +def test_threatfox_row_rejects_unknown_type(): + assert _threatfox_row_to_case({"id": "1", "ioc_value": "x", "ioc_type": "ja3_fp"}) is None + + +def test_malware_bazaar_row_to_case(): + row = { + "sha256_hash": "a" * 64, + "sha1_hash": "b" * 40, + "md5_hash": "c" * 32, + "file_name": "invoice.exe", + "signature": "AgentTesla", + "file_type": "exe", + "first_seen": "2026-05-19 10:00:00", + "tags": ["RAT", "stealer"], + } + case = _mb_row_to_case(row) + assert case is not None + assert case.case_id == "PSYC-MBAZAAR-" + "a" * 16 + assert case.observables.hashes == ["a" * 64, "b" * 40, "c" * 32] + assert case.source_metadata["feed"] == "malware-bazaar" + assert case.source_metadata["signature"] == "AgentTesla" + + +def test_otx_pulse_to_case_multi_indicator(): + pulse = { + "id": "pulse-abc", + "name": "APT-X campaign Q2 2026", + "description": "Threat actor APT-X distributed Cobalt Strike via spear-phishing emails targeting EU energy firms. The following indicators were recovered:", + "created": "2026-05-15T12:00:00.000000", + "tlp": "white", + "tags": ["apt-x", "energy"], + "indicators": [ + {"indicator": "1.2.3.4", "type": "IPv4"}, + {"indicator": "evil.example", "type": "domain"}, + {"indicator": "http://evil.example/payload.bin", "type": "URL"}, + {"indicator": "d" * 64, "type": "FileHash-SHA256"}, + {"indicator": "CVE-2026-1111", "type": "CVE"}, + {"indicator": "irrelevant", "type": "Mutex"}, # ignored + ], + } + case = _otx_pulse_to_case(pulse) + assert case is not None + assert case.case_id == "PSYC-OTX-pulse-abc" + assert case.observables.ips == ["1.2.3.4"] + assert "evil.example" in case.observables.domains + assert case.observables.urls == ["http://evil.example/payload.bin"] + assert case.observables.hashes == ["d" * 64] + assert case.observables.cves == ["CVE-2026-1111"] + assert "APT-X" in case.source_metadata["description"] + assert case.source_metadata["feed"] == "otx" + + +def test_otx_pulse_skips_when_no_recognized_indicators(): + pulse = {"id": "p1", "name": "x", "description": "", "indicators": [{"indicator": "x", "type": "Mutex"}]} + assert _otx_pulse_to_case(pulse) is None