Files
psyc/tests/test_scout.py
m17hr1l 85830be9fa stage-19-fix: ThreatFox + MalwareBazaar — real API shape
Live test against abuse.ch revealed two issues with the stage-19 wiring:

- ThreatFox returns `ioc` (not `ioc_value`) and `first_seen` (not
  `first_seen_utc`); the parenthesized names are older field names from
  stale docs. The parser now reads the real names and falls back to the
  old aliases defensively. It also captures `malware_malpedia`
  (per-family writeup URL) and `threat_type_desc` for richer downstream
  prose.
- MalwareBazaar's API expects form-encoded bodies, unlike ThreatFox's
  JSON. Extended _http with form_body=; MB fetcher switched to it.

Verified live: 10 ThreatFox cases landed with mixed botnet/malware
classification (4/6 split from threat_type signal — first real
incident-type diversity from a single feed). 10 MalwareBazaar cases
landed with sha256+sha1 hash observables and exe/file_type metadata.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 22:25:56 +02:00

150 lines
5.1 KiB
Python

"""Scoutline parser tests — feed rows to normalized Case objects."""
from __future__ import annotations
from psyc.lines.scout import (
_feodo_record_to_case,
_kev_vuln_to_case,
_mb_row_to_case,
_otx_pulse_to_case,
_parse_urlhaus_csv,
_threatfox_row_to_case,
)
# Minimal URLhaus-style CSV fixture: one `#` comment line followed by a single
# fully-quoted data row, each terminated by a newline.
URLHAUS_CSV = (
    "# comment line\n"
    '"3846688","2026-05-14 11:01:14","http://1.2.3.4/x","online",'
    '"2026-05-14","malware_download","elf,mirai",'
    '"https://urlhaus.abuse.ch/url/3846688/","reporter1"\n'
)
def test__parse_urlhaus_csv_skips_comments_and_parses_rows():
    """The `#` comment line is dropped and the single data row is parsed.

    The parsed row is checked through its ``url`` and ``url_status`` keys.
    """
    parsed = list(_parse_urlhaus_csv(URLHAUS_CSV))
    # Two input lines, but only the data row may survive.
    assert len(parsed) == 1
    only_row = parsed[0]
    assert only_row["url"] == "http://1.2.3.4/x"
    assert only_row["url_status"] == "online"
def test_kev_vuln_to_case():
    """A KEV vulnerability entry becomes a case identified by its CVE id.

    Checks the case id, the CVE observable, and the ``feed`` / ``ransomware``
    entries of the source metadata.
    """
    cve = "CVE-2026-0300"
    kev_entry = {
        "cveID": cve,
        "vendorProject": "Microsoft",
        "product": "Exchange",
        "vulnerabilityName": "Exchange XSS",
        "dateAdded": "2026-05-15",
        "knownRansomwareCampaignUse": "Known",
    }

    result = _kev_vuln_to_case(kev_entry)

    assert result.case_id == f"PSYC-KEV-{cve}"
    assert result.observables.cves == [cve]
    metadata = result.source_metadata
    assert metadata["feed"] == "cisa-kev"
    assert metadata["ransomware"] == "Known"
def test_feodo_record_to_case():
    """A Feodo record yields an IP observable plus feed/malware/status metadata."""
    c2_ip = "162.243.103.246"
    feodo_entry = {
        "ip_address": c2_ip,
        "port": 8080,
        "status": "online",
        "malware": "Emotet",
        "country": "US",
        "first_seen": "2022-06-04 21:24:53",
    }

    result = _feodo_record_to_case(feodo_entry)

    assert result.observables.ips == [c2_ip]
    metadata = result.source_metadata
    assert metadata["feed"] == "feodo"
    assert metadata["malware"] == "Emotet"
    assert metadata["status"] == "online"
def test_threatfox_row_url_to_case():
    """A ThreatFox ``url`` IOC row maps to a case with URL and host observables.

    The row uses the ``ioc`` / ``first_seen`` field names. The URL's host is
    expected to show up under ``domains`` as well.
    """
    ioc_url = "http://1.2.3.4/x.bin"
    tf_row = {
        "id": "1234567",
        "ioc": ioc_url,
        "ioc_type": "url",
        "threat_type": "payload_delivery",
        "malware_printable": "Cobalt Strike",
        "first_seen": "2026-05-19 10:00:00",
        "confidence_level": 100,
        "tags": ["c2", "stager"],
        "reporter": "anon",
    }

    result = _threatfox_row_to_case(tf_row)

    assert result is not None
    assert result.case_id == "PSYC-THREATFOX-1234567"
    assert result.observables.urls == [ioc_url]
    assert result.observables.domains == ["1.2.3.4"]
    metadata = result.source_metadata
    assert metadata["feed"] == "threatfox"
    assert metadata["malware"] == "Cobalt Strike"
    assert metadata["threat_type"] == "payload_delivery"
def test_threatfox_row_ip_port_to_case():
    """A ThreatFox ``ip:port`` IOC row keeps only the IP as an observable."""
    tf_row = {
        "id": "9999",
        "ioc": "5.6.7.8:443",
        "ioc_type": "ip:port",
        "threat_type": "botnet_cc",
        "malware_printable": "Qakbot",
        "first_seen": "2026-05-18 10:00:00",
    }

    result = _threatfox_row_to_case(tf_row)

    assert result is not None
    # The ":443" suffix must not leak into the IP observable.
    assert result.observables.ips == ["5.6.7.8"]
def test_threatfox_row_rejects_unknown_type():
    """A ThreatFox row with the ``ja3_fp`` IOC type produces no case."""
    unsupported_row = {"id": "1", "ioc": "x", "ioc_type": "ja3_fp"}
    result = _threatfox_row_to_case(unsupported_row)
    assert result is None
def test_malware_bazaar_row_to_case():
    """A MalwareBazaar sample row maps to a case carrying all three hashes.

    Hash observables are expected in sha256, sha1, md5 order.
    """
    sha256 = "a" * 64
    sha1 = "b" * 40
    md5 = "c" * 32
    sample_row = {
        "sha256_hash": sha256,
        "sha1_hash": sha1,
        "md5_hash": md5,
        "file_name": "invoice.exe",
        "signature": "AgentTesla",
        "file_type": "exe",
        "first_seen": "2026-05-19 10:00:00",
        "tags": ["RAT", "stealer"],
    }

    result = _mb_row_to_case(sample_row)

    assert result is not None
    assert result.case_id == "PSYC-MBAZAAR-" + "a" * 16
    assert result.observables.hashes == [sha256, sha1, md5]
    metadata = result.source_metadata
    assert metadata["feed"] == "malware-bazaar"
    assert metadata["signature"] == "AgentTesla"
def test_otx_pulse_to_case_multi_indicator():
    """An OTX pulse with mixed indicator types is sorted into observables.

    IPv4, domain, URL, SHA-256 and CVE indicators must each land in their
    observable list; the ``Mutex`` indicator is expected to be ignored.
    """
    sha256 = "d" * 64
    # (value, OTX indicator type) pairs, expanded into indicator dicts below.
    indicator_pairs = [
        ("1.2.3.4", "IPv4"),
        ("evil.example", "domain"),
        ("http://evil.example/payload.bin", "URL"),
        (sha256, "FileHash-SHA256"),
        ("CVE-2026-1111", "CVE"),
        ("irrelevant", "Mutex"),  # ignored
    ]
    otx_pulse = {
        "id": "pulse-abc",
        "name": "APT-X campaign Q2 2026",
        "description": (
            "Threat actor APT-X distributed Cobalt Strike via spear-phishing "
            "emails targeting EU energy firms. "
            "The following indicators were recovered:"
        ),
        "created": "2026-05-15T12:00:00.000000",
        "tlp": "white",
        "tags": ["apt-x", "energy"],
        "indicators": [
            {"indicator": value, "type": kind} for value, kind in indicator_pairs
        ],
    }

    result = _otx_pulse_to_case(otx_pulse)

    assert result is not None
    assert result.case_id == "PSYC-OTX-pulse-abc"
    assert result.observables.ips == ["1.2.3.4"]
    # Membership, not equality: other domains may be present as well.
    assert "evil.example" in result.observables.domains
    assert result.observables.urls == ["http://evil.example/payload.bin"]
    assert result.observables.hashes == [sha256]
    assert result.observables.cves == ["CVE-2026-1111"]
    assert "APT-X" in result.source_metadata["description"]
    assert result.source_metadata["feed"] == "otx"
def test_otx_pulse_skips_when_no_recognized_indicators():
    """A pulse whose only indicator is a ``Mutex`` produces no case."""
    mutex_only_pulse = {
        "id": "p1",
        "name": "x",
        "description": "",
        "indicators": [{"indicator": "x", "type": "Mutex"}],
    }
    result = _otx_pulse_to_case(mutex_only_pulse)
    assert result is None