stage-23: IOC index + lookup — the actionable keystone
New iocs table (value, type, case_id, feed, severity, first_seen) + lines/lookup.py: normalize() (CVE upper, rest lower), reindex() to rebuild from the corpus, lookup() (normalization-insensitive, scans all types), export_blocklist() (deduped, min-severity filter). CLI: psyc reindex / lookup <indicator> / export-blocklist --type --min-severity. Verified on the live corpus: 1288 IOCs from 598 cases; lookup of a real IP/CVE resolves to its case+feed+severity; 8.8.8.8 correctly misses; blocklist export yields 148 IPs / 289 domains / 150 URLs / 514 hashes / 108 CVEs. This primitive backs the upcoming search UI, asset matching, and watchlist alerting. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
85
tests/test_lookup.py
Normal file
85
tests/test_lookup.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""Lookupline — IOC index, normalization, lookup, blocklist export."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
from psyc import db
|
||||
from psyc.lines import lookup
|
||||
from psyc.models import Severity
|
||||
from conftest import make_case
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fresh_db(tmp_path, monkeypatch):
|
||||
test_db = tmp_path / "test.db"
|
||||
eng = create_engine(f"sqlite:///{test_db}", future=True)
|
||||
db._metadata.create_all(eng, checkfirst=True)
|
||||
monkeypatch.setattr(db, "_engine", eng)
|
||||
monkeypatch.setattr(db, "DB_PATH", test_db)
|
||||
yield test_db
|
||||
|
||||
|
||||
def test_normalize_lowercases_except_cve():
|
||||
assert lookup.normalize("EVIL.COM", "domain") == "evil.com"
|
||||
assert lookup.normalize(" AbCdEf ", "hash") == "abcdef"
|
||||
assert lookup.normalize("cve-2026-0001", "cve") == "CVE-2026-0001"
|
||||
|
||||
|
||||
def test_iter_case_iocs_covers_all_types():
|
||||
case = make_case(
|
||||
feed="urlhaus",
|
||||
urls=["http://1.2.3.4/x"], domains=["EVIL.com"], ips=["1.2.3.4"],
|
||||
hashes=["AABBCC"], cves=["cve-2026-1"],
|
||||
)
|
||||
pairs = set(lookup.iter_case_iocs(case))
|
||||
assert ("http://1.2.3.4/x", "url") in pairs
|
||||
assert ("evil.com", "domain") in pairs # normalized
|
||||
assert ("1.2.3.4", "ip") in pairs
|
||||
assert ("aabbcc", "hash") in pairs # normalized
|
||||
assert ("CVE-2026-1", "cve") in pairs # upper
|
||||
|
||||
|
||||
def test_reindex_then_lookup_finds_case(fresh_db):
|
||||
case = make_case(feed="threatfox", ips=["9.9.9.9"], severity=Severity.HIGH)
|
||||
db.upsert_case(case)
|
||||
n = lookup.reindex([case])
|
||||
assert n == 1
|
||||
hits = lookup.lookup("9.9.9.9")
|
||||
assert len(hits) == 1
|
||||
assert hits[0]["case_id"] == case.case_id
|
||||
assert hits[0]["feed"] == "threatfox"
|
||||
assert hits[0]["severity"] == "high"
|
||||
|
||||
|
||||
def test_lookup_is_normalization_insensitive(fresh_db):
|
||||
case = make_case(feed="urlhaus", domains=["Evil.Example.COM"], severity=Severity.MEDIUM)
|
||||
lookup.reindex([case])
|
||||
# Query with different casing than stored — still matches.
|
||||
assert len(lookup.lookup("evil.example.com")) == 1
|
||||
assert len(lookup.lookup("EVIL.EXAMPLE.COM")) == 1
|
||||
|
||||
|
||||
def test_lookup_miss_returns_empty(fresh_db):
|
||||
lookup.reindex([make_case(feed="urlhaus", ips=["1.1.1.1"])])
|
||||
assert lookup.lookup("8.8.8.8") == []
|
||||
|
||||
|
||||
def test_export_blocklist_dedupes_and_filters_by_severity(fresh_db):
|
||||
high = make_case(feed="feodo", ips=["10.0.0.1"], severity=Severity.HIGH)
|
||||
med = make_case(feed="urlhaus", ips=["10.0.0.2"], severity=Severity.MEDIUM)
|
||||
dup = make_case(feed="threatfox", ips=["10.0.0.1"], severity=Severity.CRITICAL) # same IP as high
|
||||
lookup.reindex([high, med, dup])
|
||||
|
||||
all_ips = lookup.export_blocklist("ip")
|
||||
assert set(all_ips) == {"10.0.0.1", "10.0.0.2"} # deduped across cases
|
||||
|
||||
high_only = lookup.export_blocklist("ip", min_severity="high")
|
||||
assert "10.0.0.1" in high_only # high + critical pass
|
||||
assert "10.0.0.2" not in high_only # medium filtered out
|
||||
|
||||
|
||||
def test_export_blocklist_rejects_bad_type(fresh_db):
|
||||
with pytest.raises(ValueError):
|
||||
lookup.export_blocklist("mutex")
|
||||
Reference in New Issue
Block a user