"""Lookupline — IOC index, normalization, lookup, blocklist export."""
from __future__ import annotations

import pytest
from sqlalchemy import create_engine

from psyc import db
from psyc.lines import lookup
from psyc.models import Severity

from conftest import make_case


@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
    """Point ``psyc.db`` at an empty SQLite file for the duration of one test.

    Creates the schema from ``db._metadata`` in a per-test temp file, then
    swaps ``db._engine`` and ``db.DB_PATH`` via ``monkeypatch`` so they are
    restored automatically afterwards.

    Yields:
        Path of the temporary database file.
    """
    test_db = tmp_path / "test.db"
    eng = create_engine(f"sqlite:///{test_db}", future=True)
    db._metadata.create_all(eng, checkfirst=True)
    monkeypatch.setattr(db, "_engine", eng)
    monkeypatch.setattr(db, "DB_PATH", test_db)
    yield test_db
    # Teardown: close pooled connections so the temp file is not left open
    # (open SQLite handles can block tmp_path cleanup on some platforms).
    eng.dispose()


def test_normalize_lowercases_except_cve():
    """Domains and hashes are lowercased (and stripped); CVE ids are uppercased."""
    assert lookup.normalize("EVIL.COM", "domain") == "evil.com"
    assert lookup.normalize("  AbCdEf ", "hash") == "abcdef"
    assert lookup.normalize("cve-2026-0001", "cve") == "CVE-2026-0001"


def test_iter_case_iocs_covers_all_types():
    """``iter_case_iocs`` yields a normalized ``(value, type)`` pair per IOC kind."""
    case = make_case(
        feed="urlhaus",
        urls=["http://1.2.3.4/x"],
        domains=["EVIL.com"],
        ips=["1.2.3.4"],
        hashes=["AABBCC"],
        cves=["cve-2026-1"],
    )
    pairs = set(lookup.iter_case_iocs(case))
    assert ("http://1.2.3.4/x", "url") in pairs
    assert ("evil.com", "domain") in pairs  # normalized
    assert ("1.2.3.4", "ip") in pairs
    assert ("aabbcc", "hash") in pairs  # normalized
    assert ("CVE-2026-1", "cve") in pairs  # upper


def test_reindex_then_lookup_finds_case(fresh_db):
    """After reindexing a stored case, looking up its IP returns that case's metadata."""
    case = make_case(feed="threatfox", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(case)
    n = lookup.reindex([case])
    # One case carrying one IOC: reindex reports 1.
    assert n == 1

    hits = lookup.lookup("9.9.9.9")
    assert len(hits) == 1
    assert hits[0]["case_id"] == case.case_id
    assert hits[0]["feed"] == "threatfox"
    assert hits[0]["severity"] == "high"


def test_lookup_is_normalization_insensitive(fresh_db):
    """Lookups match regardless of the casing used at index or query time."""
    case = make_case(feed="urlhaus", domains=["Evil.Example.COM"], severity=Severity.MEDIUM)
    lookup.reindex([case])
    # Query with different casing than stored — still matches.
    assert len(lookup.lookup("evil.example.com")) == 1
    assert len(lookup.lookup("EVIL.EXAMPLE.COM")) == 1


def test_lookup_miss_returns_empty(fresh_db):
    """An indicator that was never indexed yields an empty list."""
    lookup.reindex([make_case(feed="urlhaus", ips=["1.1.1.1"])])
    assert lookup.lookup("8.8.8.8") == []


def test_export_blocklist_dedupes_and_filters_by_severity(fresh_db):
    """The blocklist lists each IP once and honours the ``min_severity`` floor."""
    high = make_case(feed="feodo", ips=["10.0.0.1"], severity=Severity.HIGH)
    med = make_case(feed="urlhaus", ips=["10.0.0.2"], severity=Severity.MEDIUM)
    dup = make_case(feed="threatfox", ips=["10.0.0.1"], severity=Severity.CRITICAL)  # same IP as high
    lookup.reindex([high, med, dup])

    all_ips = lookup.export_blocklist("ip")
    # Compare as a sorted list rather than a set: a set comparison would
    # silently collapse duplicates and never exercise the dedupe behavior.
    assert sorted(all_ips) == ["10.0.0.1", "10.0.0.2"]  # deduped across cases

    high_only = lookup.export_blocklist("ip", min_severity="high")
    assert "10.0.0.1" in high_only  # high + critical pass
    assert "10.0.0.2" not in high_only  # medium filtered out


def test_export_blocklist_rejects_bad_type(fresh_db):
    """An unsupported IOC type raises ``ValueError``."""
    with pytest.raises(ValueError):
        lookup.export_blocklist("mutex")