"""Scoutline parser tests — feed rows to normalized Case objects.""" from __future__ import annotations from psyc.lines.scout import ( _feodo_record_to_case, _kev_vuln_to_case, _mb_row_to_case, _otx_pulse_to_case, _parse_urlhaus_csv, _threatfox_row_to_case, ) URLHAUS_CSV = """\ # comment line "3846688","2026-05-14 11:01:14","http://1.2.3.4/x","online","2026-05-14","malware_download","elf,mirai","https://urlhaus.abuse.ch/url/3846688/","reporter1" """ def test__parse_urlhaus_csv_skips_comments_and_parses_rows(): rows = list(_parse_urlhaus_csv(URLHAUS_CSV)) assert len(rows) == 1 assert rows[0]["url"] == "http://1.2.3.4/x" assert rows[0]["url_status"] == "online" def test_kev_vuln_to_case(): vuln = { "cveID": "CVE-2026-0300", "vendorProject": "Microsoft", "product": "Exchange", "vulnerabilityName": "Exchange XSS", "dateAdded": "2026-05-15", "knownRansomwareCampaignUse": "Known", } case = _kev_vuln_to_case(vuln) assert case.case_id == "PSYC-KEV-CVE-2026-0300" assert case.observables.cves == ["CVE-2026-0300"] assert case.source_metadata["feed"] == "cisa-kev" assert case.source_metadata["ransomware"] == "Known" def test_feodo_record_to_case(): record = { "ip_address": "162.243.103.246", "port": 8080, "status": "online", "malware": "Emotet", "country": "US", "first_seen": "2022-06-04 21:24:53", } case = _feodo_record_to_case(record) assert case.observables.ips == ["162.243.103.246"] assert case.source_metadata["feed"] == "feodo" assert case.source_metadata["malware"] == "Emotet" assert case.source_metadata["status"] == "online" def test_threatfox_row_url_to_case(): row = { "id": "1234567", "ioc": "http://1.2.3.4/x.bin", "ioc_type": "url", "threat_type": "payload_delivery", "malware_printable": "Cobalt Strike", "first_seen": "2026-05-19 10:00:00", "confidence_level": 100, "tags": ["c2", "stager"], "reporter": "anon", } case = _threatfox_row_to_case(row) assert case is not None assert case.case_id == "PSYC-THREATFOX-1234567" assert case.observables.urls == ["http://1.2.3.4/x.bin"] assert case.observables.domains == ["1.2.3.4"] assert case.source_metadata["feed"] == "threatfox" assert case.source_metadata["malware"] == "Cobalt Strike" assert case.source_metadata["threat_type"] == "payload_delivery" def test_threatfox_row_ip_port_to_case(): row = { "id": "9999", "ioc": "5.6.7.8:443", "ioc_type": "ip:port", "threat_type": "botnet_cc", "malware_printable": "Qakbot", "first_seen": "2026-05-18 10:00:00", } case = _threatfox_row_to_case(row) assert case is not None assert case.observables.ips == ["5.6.7.8"] # port stripped def test_threatfox_row_rejects_unknown_type(): assert _threatfox_row_to_case({"id": "1", "ioc": "x", "ioc_type": "ja3_fp"}) is None def test_malware_bazaar_row_to_case(): row = { "sha256_hash": "a" * 64, "sha1_hash": "b" * 40, "md5_hash": "c" * 32, "file_name": "invoice.exe", "signature": "AgentTesla", "file_type": "exe", "first_seen": "2026-05-19 10:00:00", "tags": ["RAT", "stealer"], } case = _mb_row_to_case(row) assert case is not None assert case.case_id == "PSYC-MBAZAAR-" + "a" * 16 assert case.observables.hashes == ["a" * 64, "b" * 40, "c" * 32] assert case.source_metadata["feed"] == "malware-bazaar" assert case.source_metadata["signature"] == "AgentTesla" def test_otx_pulse_to_case_multi_indicator(): pulse = { "id": "pulse-abc", "name": "APT-X campaign Q2 2026", "description": "Threat actor APT-X distributed Cobalt Strike via spear-phishing emails targeting EU energy firms. The following indicators were recovered:", "created": "2026-05-15T12:00:00.000000", "tlp": "white", "tags": ["apt-x", "energy"], "indicators": [ {"indicator": "1.2.3.4", "type": "IPv4"}, {"indicator": "evil.example", "type": "domain"}, {"indicator": "http://evil.example/payload.bin", "type": "URL"}, {"indicator": "d" * 64, "type": "FileHash-SHA256"}, {"indicator": "CVE-2026-1111", "type": "CVE"}, {"indicator": "irrelevant", "type": "Mutex"}, # ignored ], } case = _otx_pulse_to_case(pulse) assert case is not None assert case.case_id == "PSYC-OTX-pulse-abc" assert case.observables.ips == ["1.2.3.4"] assert "evil.example" in case.observables.domains assert case.observables.urls == ["http://evil.example/payload.bin"] assert case.observables.hashes == ["d" * 64] assert case.observables.cves == ["CVE-2026-1111"] assert "APT-X" in case.source_metadata["description"] assert case.source_metadata["feed"] == "otx" def test_otx_pulse_skips_when_no_recognized_indicators(): pulse = {"id": "p1", "name": "x", "description": "", "indicators": [{"indicator": "x", "type": "Mutex"}]} assert _otx_pulse_to_case(pulse) is None