Files
psyc/tests/test_scout.py
m17hr1l d87bd710bb stage-19: ThreatFox + MalwareBazaar + OTX Scoutline sources
Three new feeds — biggest near-term data-diversity win. ThreatFox brings
multi-malware IOCs with threat_type signal (botnet_cc → BOTNET,
payload_delivery → MALWARE, phishing → PHISHING). MalwareBazaar brings
file-hash samples with signatures. OTX brings curated multi-source pulses
with paragraph-form descriptions — by far the richest real-prose source.

Auth: THREATFOX_AUTH_KEY (one abuse.ch key covers ThreatFox + MalwareBazaar)
and OTX_API_KEY. fetch-all skips keyed feeds cleanly with where-to-get-it
guidance instead of tracebacking. Proofline reliability table extended;
abuse.ch sources rated B/2, OTX rated C/3 (community-driven).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 22:14:18 +02:00

150 lines
5.2 KiB
Python

"""Scoutline parser tests — feed rows to normalized Case objects."""
from __future__ import annotations
from psyc.lines.scout import (
_feodo_record_to_case,
_kev_vuln_to_case,
_mb_row_to_case,
_otx_pulse_to_case,
_parse_urlhaus_csv,
_threatfox_row_to_case,
)
# Minimal URLhaus CSV fixture: one "#" comment line followed by a single
# fully-quoted data row. The URLhaus parser test feeds this to
# _parse_urlhaus_csv and expects exactly one parsed row back.
URLHAUS_CSV = """\
# comment line
"3846688","2026-05-14 11:01:14","http://1.2.3.4/x","online","2026-05-14","malware_download","elf,mirai","https://urlhaus.abuse.ch/url/3846688/","reporter1"
"""
def test__parse_urlhaus_csv_skips_comments_and_parses_rows():
    """The ``#`` comment line is dropped and the one data row is parsed."""
    parsed = [*_parse_urlhaus_csv(URLHAUS_CSV)]

    # Only the data row may come back; the comment line must not be yielded.
    assert len(parsed) == 1

    first_row = parsed[0]
    assert first_row["url"] == "http://1.2.3.4/x"
    assert first_row["url_status"] == "online"
def test_kev_vuln_to_case():
    """A KEV vulnerability entry becomes a Case identified by its CVE ID."""
    kev_entry = {
        "cveID": "CVE-2026-0300",
        "vendorProject": "Microsoft",
        "product": "Exchange",
        "vulnerabilityName": "Exchange XSS",
        "dateAdded": "2026-05-15",
        "knownRansomwareCampaignUse": "Known",
    }

    result = _kev_vuln_to_case(kev_entry)

    # Identity and observables are both derived from the CVE ID.
    assert result.case_id == "PSYC-KEV-CVE-2026-0300"
    assert result.observables.cves == ["CVE-2026-0300"]

    # Feed provenance and the ransomware flag are carried in the metadata.
    metadata = result.source_metadata
    assert metadata["feed"] == "cisa-kev"
    assert metadata["ransomware"] == "Known"
def test_feodo_record_to_case():
    """A Feodo record maps its IP, malware name and status onto a Case."""
    feodo_entry = {
        "ip_address": "162.243.103.246",
        "port": 8080,
        "status": "online",
        "malware": "Emotet",
        "country": "US",
        "first_seen": "2022-06-04 21:24:53",
    }

    result = _feodo_record_to_case(feodo_entry)

    assert result.observables.ips == ["162.243.103.246"]

    metadata = result.source_metadata
    assert metadata["feed"] == "feodo"
    assert metadata["malware"] == "Emotet"
    assert metadata["status"] == "online"
def test_threatfox_row_url_to_case():
    """A ThreatFox ``url`` row yields a case with the URL, its host and metadata."""
    url_row = {
        "id": "1234567",
        "ioc_value": "http://1.2.3.4/x.bin",
        "ioc_type": "url",
        "threat_type": "payload_delivery",
        "malware_printable": "Cobalt Strike",
        "first_seen_utc": "2026-05-19 10:00:00",
        "confidence_level": 100,
        "tags": ["c2", "stager"],
        "reporter": "anon",
    }

    result = _threatfox_row_to_case(url_row)

    assert result is not None
    assert result.case_id == "PSYC-THREATFOX-1234567"

    found = result.observables
    assert found.urls == ["http://1.2.3.4/x.bin"]
    # The host part of the URL is expected to be reported under domains.
    assert found.domains == ["1.2.3.4"]

    metadata = result.source_metadata
    assert metadata["feed"] == "threatfox"
    assert metadata["malware"] == "Cobalt Strike"
    assert metadata["threat_type"] == "payload_delivery"
def test_threatfox_row_ip_port_to_case():
    """A ThreatFox ``ip:port`` row contributes only the bare IP address."""
    ip_port_row = {
        "id": "9999",
        "ioc_value": "5.6.7.8:443",
        "ioc_type": "ip:port",
        "threat_type": "botnet_cc",
        "malware_printable": "Qakbot",
        "first_seen_utc": "2026-05-18 10:00:00",
    }

    result = _threatfox_row_to_case(ip_port_row)

    assert result is not None
    # The ":443" suffix must not appear in the recorded IP.
    assert result.observables.ips == ["5.6.7.8"]
def test_threatfox_row_rejects_unknown_type():
    """A row whose ``ioc_type`` is not handled produces no case at all."""
    unsupported_row = {"id": "1", "ioc_value": "x", "ioc_type": "ja3_fp"}
    outcome = _threatfox_row_to_case(unsupported_row)
    assert outcome is None
def test_malware_bazaar_row_to_case():
    """A MalwareBazaar sample row yields a case carrying all three hashes."""
    sample_row = {
        "sha256_hash": "a" * 64,
        "sha1_hash": "b" * 40,
        "md5_hash": "c" * 32,
        "file_name": "invoice.exe",
        "signature": "AgentTesla",
        "file_type": "exe",
        "first_seen": "2026-05-19 10:00:00",
        "tags": ["RAT", "stealer"],
    }

    result = _mb_row_to_case(sample_row)

    assert result is not None
    assert result.case_id == "PSYC-MBAZAAR-" + "a" * 16
    # Hashes are expected in SHA-256, SHA-1, MD5 order.
    assert result.observables.hashes == ["a" * 64, "b" * 40, "c" * 32]

    metadata = result.source_metadata
    assert metadata["feed"] == "malware-bazaar"
    assert metadata["signature"] == "AgentTesla"
def test_otx_pulse_to_case_multi_indicator():
    """An OTX pulse with mixed indicator types is sorted into observables."""
    mixed_indicators = [
        {"indicator": "1.2.3.4", "type": "IPv4"},
        {"indicator": "evil.example", "type": "domain"},
        {"indicator": "http://evil.example/payload.bin", "type": "URL"},
        {"indicator": "d" * 64, "type": "FileHash-SHA256"},
        {"indicator": "CVE-2026-1111", "type": "CVE"},
        # A Mutex indicator is expected to be dropped by the parser.
        {"indicator": "irrelevant", "type": "Mutex"},
    ]
    otx_pulse = {
        "id": "pulse-abc",
        "name": "APT-X campaign Q2 2026",
        "description": "Threat actor APT-X distributed Cobalt Strike via spear-phishing emails targeting EU energy firms. The following indicators were recovered:",
        "created": "2026-05-15T12:00:00.000000",
        "tlp": "white",
        "tags": ["apt-x", "energy"],
        "indicators": mixed_indicators,
    }

    result = _otx_pulse_to_case(otx_pulse)

    assert result is not None
    assert result.case_id == "PSYC-OTX-pulse-abc"

    found = result.observables
    assert found.ips == ["1.2.3.4"]
    # Membership rather than equality: other domains may be present too.
    assert "evil.example" in found.domains
    assert found.urls == ["http://evil.example/payload.bin"]
    assert found.hashes == ["d" * 64]
    assert found.cves == ["CVE-2026-1111"]

    metadata = result.source_metadata
    assert "APT-X" in metadata["description"]
    assert metadata["feed"] == "otx"
def test_otx_pulse_skips_when_no_recognized_indicators():
    """A pulse whose only indicator is a Mutex produces no case."""
    mutex_only_pulse = {
        "id": "p1",
        "name": "x",
        "description": "",
        "indicators": [{"indicator": "x", "type": "Mutex"}],
    }
    outcome = _otx_pulse_to_case(mutex_only_pulse)
    assert outcome is None