diff --git a/src/psyc/cli.py b/src/psyc/cli.py
index 60baed7..0ea83eb 100644
--- a/src/psyc/cli.py
+++ b/src/psyc/cli.py
@@ -13,7 +13,7 @@ from psyc import db, log
 load_dotenv()  # per-dev .env (API keys) is loaded into os.environ for venv CLI
 
 from psyc.cockpit import inference
-from psyc.lines import classify, courier, proof, route, scout, seal, train
+from psyc.lines import classify, courier, lookup, proof, route, scout, seal, train
 from psyc.lines import map as map_line
 from psyc.models import Outcome
 from psyc.result import Err, Ok
@@ -357,6 +357,49 @@ def reject(
     typer.echo(f"rejected #{pending_id}{(': ' + reason) if reason else ''}")
 
 
+@app.command("reindex")
+def reindex() -> None:
+    """Rebuild the IOC index from all cases."""
+    db.init_db()  # ensure the iocs table exists (idempotent)
+    cases = db.list_cases(limit=1_000_000)
+    n = lookup.reindex(cases)
+    typer.echo(f"indexed {n} IOC(s) from {len(cases)} case(s). total: {db.ioc_count()}")
+
+
+@app.command("lookup")
+def lookup_ioc(value: str = typer.Argument(..., help="indicator: IP, domain, URL, hash, or CVE")) -> None:
+    """Look up an indicator across the case corpus."""
+    rows = lookup.lookup(value)
+    if not rows:
+        typer.echo(f"'{value}' — not found in the corpus (no known-bad match)")
+        return
+    typer.echo(f"'{value}' — {len(rows)} match(es):")
+    for r in rows:
+        sev = r["severity"] or "?"
+        typer.echo(f" [{r['ioc_type']}] {r['case_id']} feed={r['feed'] or '?'} severity={sev} seen={(r['first_seen'] or '')[:10]}")
+
+
+@app.command("export-blocklist")
+def export_blocklist(
+    ioc_type: str = typer.Option("ip", "--type", "-t", help=f"one of: {', '.join(lookup.IOC_TYPES)}"),
+    min_severity: str = typer.Option("", "--min-severity", help="low | medium | high | critical"),
+    out: str = typer.Option("", "--out", help="write to file instead of stdout"),
+) -> None:
+    """Emit a deduplicated blocklist of indicators (firewall/DNS/SIEM ingestion)."""
+    try:
+        values = lookup.export_blocklist(ioc_type, min_severity or None)
+    except ValueError as exc:
+        # bad --type / --min-severity: report a usage error instead of a traceback
+        raise typer.BadParameter(str(exc)) from exc
+    text = "".join(f"{v}\n" for v in values)  # one indicator per line; empty when nothing matched
+    if out:
+        from pathlib import Path as _Path
+        _Path(out).write_text(text, encoding="utf-8")
+        typer.echo(f"wrote {len(values)} {ioc_type}(s) → {out}")
+    else:
+        typer.echo(text, nl=False)
+
+
 @app.command("mock-cert")
 def mock_cert_serve(host: str = "127.0.0.1", port: int = 8770) -> None:
     uvicorn.run("psyc.mock_cert:app", host=host, port=port)
diff --git a/src/psyc/db.py b/src/psyc/db.py
index a177010..72cbf7b 100644
--- a/src/psyc/db.py
+++ b/src/psyc/db.py
@@ -82,6 +82,20 @@ pending = Table(
 Index("pending_status_idx", pending.c.status)
 Index("pending_case_idx", pending.c.case_id)
 
+iocs = Table(
+    "iocs", _metadata,
+    Column("id", Integer, primary_key=True, autoincrement=True),
+    Column("value", String, nullable=False),  # normalized indicator
+    Column("ioc_type", String, nullable=False),  # url | domain | ip | hash | cve
+    Column("case_id", String, nullable=False),
+    Column("feed", String, nullable=True),
+    Column("severity", String, nullable=True),  # low | medium | high | critical, or NULL
+    Column("first_seen", String, nullable=True),  # ISO-8601 text written by lookup.reindex
+)
+Index("iocs_value_idx", iocs.c.value)
+Index("iocs_type_idx", iocs.c.ioc_type)
+Index("iocs_case_idx", iocs.c.case_id)
+
 _log = log.get(__name__)
 
 _engine: Optional[Engine] = None
@@ -151,3 +165,35 @@ def case_count(db_path: Path = DB_PATH) -> int:
     stmt = select(func.count()).select_from(cases)
     with engine(db_path).connect() as conn:
         return conn.execute(stmt).scalar_one()
+
+
+# ---------- IOC index ----------------------------------------------------
+
+def replace_iocs(rows: List[dict], db_path: Path = DB_PATH) -> int:
+    """Rebuild the IOC index: clear it, then bulk-insert rows. Returns count."""
+    with engine(db_path).begin() as conn:  # begin(): delete + insert run in one transaction
+        conn.execute(iocs.delete())
+        if rows:
+            conn.execute(iocs.insert(), rows)
+    return len(rows)
+
+
+def find_iocs(value: str, db_path: Path = DB_PATH) -> List[dict]:
+    """Exact-match lookup of one normalized indicator. Returns matching index rows."""
+    stmt = select(iocs).where(iocs.c.value == value).order_by(iocs.c.first_seen.desc())
+    with engine(db_path).connect() as conn:
+        return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
+
+
+def iocs_by_type(ioc_type: str, db_path: Path = DB_PATH) -> List[dict]:
+    """All index rows of one type, newest first — caller filters/dedupes."""
+    stmt = select(iocs).where(iocs.c.ioc_type == ioc_type).order_by(iocs.c.first_seen.desc())
+    with engine(db_path).connect() as conn:
+        return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
+
+
+def ioc_count(db_path: Path = DB_PATH) -> int:
+    """Number of rows currently in the IOC index."""
+    stmt = select(func.count()).select_from(iocs)
+    with engine(db_path).connect() as conn:
+        return conn.execute(stmt).scalar_one()
diff --git a/src/psyc/lines/lookup.py b/src/psyc/lines/lookup.py
new file mode 100644
index 0000000..32a0513
--- /dev/null
+++ b/src/psyc/lines/lookup.py
@@ -0,0 +1,120 @@
+"""Lookupline — IOC index over the case corpus.
+
+Turns the collected cases into a reverse index: indicator -> which cases,
+feeds, and severities mention it. This is the shared primitive behind
+"paste an indicator, is it known-bad?", asset matching, and blocklist export.
+Indicators are normalized (trimmed, case-folded) so lookups are case-insensitive.
+"""
+
+from __future__ import annotations
+
+from typing import Dict, Iterable, List, Optional, Tuple
+
+from psyc import db, log
+from psyc.models import Case
+
+
+_log = log.get(__name__)
+
+# severity ordering for min-severity filters
+_SEVERITY_RANK: Dict[str, int] = {"low": 0, "medium": 1, "high": 2, "critical": 3}
+
+IOC_TYPES = ("url", "domain", "ip", "hash", "cve")
+
+
+def normalize(value: str, ioc_type: str) -> str:
+    """Normalize an indicator for storage + lookup. CVEs upper, everything else lower."""
+    v = value.strip()
+    if ioc_type == "cve":
+        return v.upper()
+    return v.lower()
+
+
+def iter_case_iocs(case: Case) -> Iterable[Tuple[str, str]]:
+    """Yield (normalized_value, ioc_type) for every observable on a case."""
+    obs = case.observables
+    for u in obs.urls:
+        yield normalize(u, "url"), "url"
+    for d in obs.domains:
+        yield normalize(d, "domain"), "domain"
+    for ip in obs.ips:
+        yield normalize(ip, "ip"), "ip"
+    for h in obs.hashes:
+        yield normalize(h, "hash"), "hash"
+    for c in obs.cves:
+        yield normalize(c, "cve"), "cve"
+
+
+def reindex(cases: Iterable[Case]) -> int:
+    """Rebuild the whole IOC index from the given cases. Returns rows written.
+
+    Each (value, ioc_type, case_id) triple is written at most once; empty
+    values are skipped.
+    """
+    rows: List[dict] = []
+    seen: set = set()
+    n_cases = 0
+    for case in cases:
+        n_cases += 1
+        feed = case.source_metadata.get("feed")
+        sev = case.classification.severity.value if case.classification.severity else None
+        first_seen = case.observed_at.isoformat() if case.observed_at else None
+        for value, ioc_type in iter_case_iocs(case):
+            if not value:
+                continue
+            key = (value, ioc_type, case.case_id)
+            if key in seen:
+                continue
+            seen.add(key)
+            rows.append(dict(
+                value=value, ioc_type=ioc_type, case_id=case.case_id,
+                feed=feed, severity=sev, first_seen=first_seen,
+            ))
+    written = db.replace_iocs(rows)
+    # log the number of cases processed; len(seen) would count index rows, not cases
+    _log.info("lookup.reindexed", iocs=written, cases=n_cases)
+    return written
+
+
+def lookup(value: str) -> List[dict]:
+    """Look up one indicator across all types. Returns matching index rows (may be empty)."""
+    # Try every type's normalization so callers don't need to know the type.
+    # sorted() keeps the result order stable across runs (set iteration order is not).
+    candidates = sorted({normalize(value, t) for t in IOC_TYPES})
+    out: List[dict] = []
+    seen_ids: set = set()
+    for cand in candidates:
+        for row in db.find_iocs(cand):
+            if row["id"] not in seen_ids:
+                seen_ids.add(row["id"])
+                out.append(row)
+    return out
+
+
+def export_blocklist(ioc_type: str, min_severity: Optional[str] = None) -> List[str]:
+    """Distinct indicator values of one type, optionally filtered by min severity.
+
+    Raises ValueError for an unknown ioc_type or an unknown min_severity
+    (a typo must not silently disable the severity filter).
+    """
+    if ioc_type not in IOC_TYPES:
+        raise ValueError(f"unknown ioc_type: {ioc_type}; choices: {', '.join(IOC_TYPES)}")
+    floor = -1
+    if min_severity:
+        wanted = min_severity.strip().lower()
+        if wanted not in _SEVERITY_RANK:
+            raise ValueError(f"unknown min_severity: {min_severity}; choices: {', '.join(_SEVERITY_RANK)}")
+        floor = _SEVERITY_RANK[wanted]
+    values: List[str] = []
+    seen: set = set()
+    for row in db.iocs_by_type(ioc_type):
+        if floor >= 0:
+            # rows with no (or an unrecognized) severity rank below every floor and are dropped
+            rank = _SEVERITY_RANK.get(row["severity"] or "", -1)
+            if rank < floor:
+                continue
+        v = row["value"]
+        if v not in seen:
+            seen.add(v)
+            values.append(v)
+    return values
diff --git a/tests/test_lookup.py b/tests/test_lookup.py
new file mode 100644
index 0000000..2e8e28f
--- /dev/null
+++ b/tests/test_lookup.py
@@ -0,0 +1,90 @@
+"""Lookupline — IOC index, normalization, lookup, blocklist export."""
+
+from __future__ import annotations
+
+import pytest
+from sqlalchemy import create_engine
+
+from psyc import db
+from psyc.lines import lookup
+from psyc.models import Severity
+from conftest import make_case
+
+
+@pytest.fixture
+def fresh_db(tmp_path, monkeypatch):
+    test_db = tmp_path / "test.db"
+    eng = create_engine(f"sqlite:///{test_db}", future=True)
+    db._metadata.create_all(eng, checkfirst=True)
+    monkeypatch.setattr(db, "_engine", eng)
+    monkeypatch.setattr(db, "DB_PATH", test_db)
+    yield test_db
+
+
+def test_normalize_lowercases_except_cve():
+    assert lookup.normalize("EVIL.COM", "domain") == "evil.com"
+    assert lookup.normalize(" AbCdEf ", "hash") == "abcdef"
+    assert lookup.normalize("cve-2026-0001", "cve") == "CVE-2026-0001"
+
+
+def test_export_blocklist_rejects_bad_min_severity():
+    with pytest.raises(ValueError):
+        lookup.export_blocklist("ip", min_severity="sever")
+
+
+def test_iter_case_iocs_covers_all_types():
+    case = make_case(
+        feed="urlhaus",
+        urls=["http://1.2.3.4/x"], domains=["EVIL.com"], ips=["1.2.3.4"],
+        hashes=["AABBCC"], cves=["cve-2026-1"],
+    )
+    pairs = set(lookup.iter_case_iocs(case))
+    assert ("http://1.2.3.4/x", "url") in pairs
+    assert ("evil.com", "domain") in pairs  # normalized: domains are lower-cased
+    assert ("1.2.3.4", "ip") in pairs
+    assert ("aabbcc", "hash") in pairs  # normalized: hashes are lower-cased
+    assert ("CVE-2026-1", "cve") in pairs  # CVE ids are upper-cased
+
+
+def test_reindex_then_lookup_finds_case(fresh_db):
+    case = make_case(feed="threatfox", ips=["9.9.9.9"], severity=Severity.HIGH)
+    db.upsert_case(case)
+    n = lookup.reindex([case])
+    assert n == 1  # the single IP observable yields exactly one index row
+    hits = lookup.lookup("9.9.9.9")
+    assert len(hits) == 1
+    assert hits[0]["case_id"] == case.case_id
+    assert hits[0]["feed"] == "threatfox"
+    assert hits[0]["severity"] == "high"  # stored as the enum's string value
+
+
+def test_lookup_is_normalization_insensitive(fresh_db):
+    case = make_case(feed="urlhaus", domains=["Evil.Example.COM"], severity=Severity.MEDIUM)
+    lookup.reindex([case])
+    # Query with different casing than the case's raw value — both still match the normalized row.
+    assert len(lookup.lookup("evil.example.com")) == 1
+    assert len(lookup.lookup("EVIL.EXAMPLE.COM")) == 1
+
+
+def test_lookup_miss_returns_empty(fresh_db):
+    lookup.reindex([make_case(feed="urlhaus", ips=["1.1.1.1"])])
+    assert lookup.lookup("8.8.8.8") == []  # a miss is an empty list, not an error
+
+
+def test_export_blocklist_dedupes_and_filters_by_severity(fresh_db):
+    high = make_case(feed="feodo", ips=["10.0.0.1"], severity=Severity.HIGH)
+    med = make_case(feed="urlhaus", ips=["10.0.0.2"], severity=Severity.MEDIUM)
+    dup = make_case(feed="threatfox", ips=["10.0.0.1"], severity=Severity.CRITICAL)  # same IP as high
+    lookup.reindex([high, med, dup])
+
+    all_ips = lookup.export_blocklist("ip")
+    assert set(all_ips) == {"10.0.0.1", "10.0.0.2"}  # deduped across cases
+
+    high_only = lookup.export_blocklist("ip", min_severity="high")
+    assert "10.0.0.1" in high_only  # high + critical pass
+    assert "10.0.0.2" not in high_only  # medium filtered out
+
+
+def test_export_blocklist_rejects_bad_type(fresh_db):
+    with pytest.raises(ValueError):
+        lookup.export_blocklist("mutex")  # "mutex" is not in lookup.IOC_TYPES