Compare commits

...

2 Commits

Author SHA1 Message Date
m17hr1l
d0a71d0226 stage-24: indicator lookup page + blocklist download in cockpit
Surfaces the stage-23 index in the UI. New /lookup page: paste any
indicator (IP/domain/URL/hash/CVE) → red KNOWN-BAD banner with the
matching cases/feeds/severities, or green clean banner. New
/export/blocklist endpoint returns deduplicated plain-text indicator
lists (all or high+ severity) for firewall/DNS/SIEM ingestion, linked
from a download table on the lookup page. Lookup added to topbar nav.

Verified live: lookup of a real corpus IP returns the OTX case;
8.8.8.8 returns clean; blocklist endpoint emits 26 high-severity IPs
with a descriptive header line.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-22 23:55:50 +02:00
m17hr1l
9a2a31ec9a stage-23: IOC index + lookup — the actionable keystone
New iocs table (value, type, case_id, feed, severity, first_seen) +
lines/lookup.py: normalize() (CVE upper, rest lower), reindex() to
rebuild from the corpus, lookup() (normalization-insensitive, scans all
types), export_blocklist() (deduped, min-severity filter).

CLI: psyc reindex / lookup <indicator> / export-blocklist --type --min-severity.

Verified on the live corpus: 1288 IOCs from 598 cases; lookup of a real
IP/CVE resolves to its case+feed+severity; 8.8.8.8 correctly misses;
blocklist export yields 148 IPs / 289 domains / 150 URLs / 514 hashes /
108 CVEs. This primitive backs the upcoming search UI, asset matching,
and watchlist alerting.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-22 23:39:05 +02:00
8 changed files with 379 additions and 2 deletions

View File

@@ -13,7 +13,7 @@ from psyc import db, log
load_dotenv() # per-dev .env (API keys) is loaded into os.environ for venv CLI
from psyc.cockpit import inference
from psyc.lines import classify, courier, proof, route, scout, seal, train
from psyc.lines import classify, courier, lookup, proof, route, scout, seal, train
from psyc.lines import map as map_line
from psyc.models import Outcome
from psyc.result import Err, Ok
@@ -357,6 +357,45 @@ def reject(
typer.echo(f"rejected #{pending_id}{(': ' + reason) if reason else ''}")
@app.command("reindex")
def reindex() -> None:
    """Rebuild the IOC index from every stored case and print the totals."""
    # Make sure the schema (including the iocs table) is present; safe to repeat.
    db.init_db()
    corpus = db.list_cases(limit=1_000_000)
    indexed = lookup.reindex(corpus)
    summary = f"indexed {indexed} IOC(s) from {len(corpus)} case(s). total: {db.ioc_count()}"
    typer.echo(summary)
@app.command("lookup")
def lookup_ioc(value: str = typer.Argument(..., help="indicator: IP, domain, URL, hash, or CVE")) -> None:
    """Look up an indicator across the case corpus.

    Prints a "not found" notice when nothing matches; otherwise prints a
    header with the number of matches followed by one line per index row
    (type, case id, feed, severity, first-seen date).
    """
    rows = lookup.lookup(value)
    if not rows:
        typer.echo(f"'{value}' — not found in the corpus (no known-bad match)")
        return
    # Keep a separator between the quoted indicator and the hit count;
    # without it the output reads e.g. "'1.2.3.4'3 match(es):".
    typer.echo(f"'{value}' — {len(rows)} match(es):")
    for r in rows:
        sev = r["severity"] or "?"
        feed = r["feed"] or "?"
        seen = (r["first_seen"] or "")[:10]  # first 10 chars only (the date part)
        typer.echo(f" [{r['ioc_type']}] {r['case_id']} feed={feed} severity={sev} seen={seen}")
@app.command("export-blocklist")
def export_blocklist(
    ioc_type: str = typer.Option("ip", "--type", "-t", help=f"one of: {', '.join(lookup.IOC_TYPES)}"),
    min_severity: str = typer.Option("", "--min-severity", help="low | medium | high | critical"),
    out: str = typer.Option("", "--out", help="write to file instead of stdout"),
) -> None:
    """Emit a deduplicated blocklist of indicators (firewall/DNS/SIEM ingestion).

    Values are printed one per line to stdout, or written to ``--out`` (UTF-8,
    trailing newline) when a path is given. Invalid options rejected by the
    lookup layer are reported on stderr and exit with status 2.
    """
    try:
        values = lookup.export_blocklist(ioc_type, min_severity or None)
    except ValueError as exc:
        # The lookup layer signals bad arguments (e.g. an unknown --type) with
        # ValueError; show the message as a usage error instead of a traceback.
        typer.echo(f"error: {exc}", err=True)
        raise typer.Exit(code=2) from exc
    text = "\n".join(values)
    if out:
        from pathlib import Path as _Path
        _Path(out).write_text(text + "\n", encoding="utf-8")
        typer.echo(f"wrote {len(values)} {ioc_type}(s) → {out}")
    else:
        typer.echo(text)
@app.command("mock-cert")
def mock_cert_serve(host: str = "127.0.0.1", port: int = 8770) -> None:
uvicorn.run("psyc.mock_cert:app", host=host, port=port)

View File

@@ -6,7 +6,7 @@ from pathlib import Path
from typing import List
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
@@ -14,6 +14,7 @@ from psyc import db, log
from psyc.cockpit import inference, journey as journey_view
from psyc.lines import courier as courier_line
from psyc.lines import ledger as ledger_line
from psyc.lines import lookup as lookup_line
from psyc.lines import route as route_line
from psyc.lines import seal as seal_line
from psyc.lines import train as train_line
@@ -110,6 +111,33 @@ def inference_status() -> dict:
return {"online": adapter is not None, "adapter": adapter}
@app.get("/lookup", response_class=HTMLResponse)
def lookup_view(request: Request, q: str = "") -> HTMLResponse:
    """Render the indicator lookup page.

    A non-blank ``q`` is searched in the IOC index and its matches are passed
    to the template; the page also receives the per-type blocklist sizes and
    the total number of indexed indicators.
    """
    query = q.strip()
    if query:
        matches = lookup_line.lookup(query)
    else:
        matches = []
    # Size of each per-type blocklist, shown in the download table.
    counts = {}
    for ioc_type in lookup_line.IOC_TYPES:
        counts[ioc_type] = len(lookup_line.export_blocklist(ioc_type))
    context = {
        "query": query,
        "matches": matches,
        "searched": bool(query),
        "total_iocs": db.ioc_count(),
        "counts": counts,
    }
    return TEMPLATES.TemplateResponse(request, "lookup.html", context)
@app.get("/export/blocklist", response_class=PlainTextResponse)
def export_blocklist(type: str = "ip", min_severity: str = "") -> PlainTextResponse:
    """Return a plain-text blocklist: a ``#`` header line, then one indicator per line.

    Query parameters:
        type: indicator type; must be one of ``lookup_line.IOC_TYPES``.
            (The name shadows the builtin but is the public query-string key.)
        min_severity: optional floor — low, medium, high or critical,
            matched case-insensitively. Blank means no filter.

    Raises:
        HTTPException: 400 for an unknown ``type`` or ``min_severity``.
    """
    if type not in lookup_line.IOC_TYPES:
        raise HTTPException(status_code=400, detail=f"unknown type: {type}")
    # Reject unknown severities instead of silently serving the unfiltered
    # list under a header that claims a filter was applied.
    severity = min_severity.strip().lower()
    if severity and severity not in ("low", "medium", "high", "critical"):
        raise HTTPException(status_code=400, detail=f"unknown min_severity: {min_severity}")
    values = lookup_line.export_blocklist(type, severity or None)
    header = f"# psyc blocklist — type={type} min_severity={severity or 'any'} count={len(values)}\n"
    # Terminate each value with its own newline so an empty list yields just
    # the header, not a header followed by a blank line.
    body = "".join(f"{v}\n" for v in values)
    return PlainTextResponse(header + body)
@app.get("/queue", response_class=HTMLResponse)
def queue_view(request: Request, status: str = "pending") -> HTMLResponse:
from psyc.models import ApprovalStatus

View File

@@ -314,3 +314,14 @@ tr.sev-low .sev-badge { color: var(--muted); }
}
.reject-reason::placeholder { color: var(--muted); }
.outcome-pending_approval { background: rgba(251, 191, 36, 0.15); color: var(--amber); border: 1px solid rgba(251, 191, 36, 0.4); }
/* ── indicator lookup ───────────────────────────────────────── */
/* Search form on the lookup page: input and button laid out in a flex row. */
.lookup-form { display: flex; gap: 8px; margin: 14px 0 18px; }
/* Text input; flex: 1 lets it take the remaining width of the form row. */
.lookup-input {
flex: 1; background: var(--bg); color: var(--text); border: 1px solid var(--border);
border-radius: 4px; padding: 9px 12px; font: inherit; font-size: 14px;
}
/* Focus state: the default outline is replaced by an accent border plus glow ring. */
.lookup-input:focus { outline: none; border-color: var(--accent); box-shadow: 0 0 0 3px var(--accent-glow); }
/* Result banner shown after a lookup; the -bad / -clean modifiers set the colors. */
.verdict { padding: 12px 16px; border-radius: 6px; margin: 14px 0; font-size: 14px; }
/* Red banner: the indicator matched the corpus. */
.verdict-bad { background: rgba(248, 113, 113, 0.12); border: 1px solid var(--red); color: var(--red); }
/* Green banner: no match in the corpus. */
.verdict-clean { background: rgba(74, 222, 128, 0.10); border: 1px solid var(--green); color: var(--green); }

View File

@@ -18,6 +18,7 @@
</a>
<nav class="nav">
<a href="/cases">Cases</a>
<a href="/lookup">Lookup</a>
<a href="/queue">Queue</a>
<a href="/ledger">Ledger</a>
<a href="/train">Trainline</a>

View File

@@ -0,0 +1,66 @@
{% extends "base.html" %}
{% block title %}Lookup — psyc{% endblock %}
{% block content %}
{# Lookup panel: search form plus, once a query was submitted, the verdict banner and match table.
   Context used here: total_iocs, query, searched, matches. #}
<section class="panel">
<div class="panel-head">
<h1>Indicator Lookup</h1>
<span class="count">{{ total_iocs }} indicators indexed</span>
</div>
<p class="page-intro">Paste any indicator — IP, domain, URL, file hash, or CVE — and psyc tells you whether it's known-bad across the whole case corpus, which feed flagged it, and at what severity. This is the "is this thing dangerous?" desk check.</p>
<details class="page-help">
<summary>how to use this view</summary>
<div class="help-body">
<p><b>How to use.</b> Type or paste an indicator and hit Look up. A green banner means it's clean (not in the corpus); a red banner means it matched known threat intel — open the case to see the full context.</p>
<p><b>What you're seeing.</b> Matches come from the IOC index built across all {{ total_iocs }} indicators in the corpus. Lookup is case- and format-insensitive (EVIL.COM = evil.com).</p>
<p><b>Why it matters.</b> A defender investigating an alert needs a fast verdict on a raw indicator — and a way to push the whole known-bad set into a firewall or DNS sinkhole (see Blocklist export below).</p>
</div>
</details>
{# GET form: submits the indicator back to /lookup as the "q" query parameter. #}
<form method="get" action="/lookup" class="lookup-form">
<input type="text" name="q" value="{{ query }}" placeholder="1.2.3.4 · evil.com · http://… · &lt;sha256&gt; · CVE-2024-3721" class="lookup-input" autofocus>
<button type="submit" class="btn btn-approve">Look up</button>
</form>
{# "searched" is true only when a non-blank query was submitted; no banner is shown otherwise. #}
{% if searched %}
{% if matches %}
<div class="verdict verdict-bad"><strong>{{ query }}</strong> is KNOWN-BAD — {{ matches|length }} match(es) in the corpus</div>
<table class="ledger">
<thead>
<tr><th>Type</th><th>Case</th><th>Feed</th><th>Severity</th><th>First seen</th></tr>
</thead>
<tbody>
{# One row per matching index entry; the sev-* row class falls back to sev-none when severity is empty. #}
{% for m in matches %}
<tr class="ledger-row sev-{{ m.severity or 'none' }}">
<td>{{ m.ioc_type }}</td>
<td class="lg-case"><a href="/cases/{{ m.case_id }}">{{ m.case_id }}</a></td>
<td class="lg-dest">{{ m.feed or '—' }}</td>
<td>{% if m.severity %}<span class="sev-badge">{{ m.severity }}</span>{% else %}—{% endif %}</td>
{# Only the first 10 characters of first_seen are shown. #}
<td class="lg-ts">{{ (m.first_seen or '')[:10] }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<div class="verdict verdict-clean"><strong>{{ query }}</strong> is not in the corpus — no known-bad match</div>
{% endif %}
{% endif %}
</section>
{# Blocklist export panel: one row per indicator type with download links to /export/blocklist.
   "counts" maps each type to the size of its blocklist. #}
<section class="panel">
<div class="panel-head"><h2>Blocklist export</h2></div>
<p class="page-intro">Download the deduplicated set of known-bad indicators of one type as plain text — ready to paste into a firewall denylist, DNS sinkhole, or SIEM watchlist.</p>
<table class="ledger">
<thead><tr><th>Type</th><th>Count</th><th>Download (all)</th><th>Download (high+)</th></tr></thead>
<tbody>
{% for t, n in counts.items() %}
<tr class="ledger-row">
<td>{{ t }}</td>
<td>{{ n }}</td>
{# First link: every indicator of the type; second link: only those at severity high or above. #}
<td><a href="/export/blocklist?type={{ t }}" target="_blank">{{ t }} blocklist ▾</a></td>
<td><a href="/export/blocklist?type={{ t }}&min_severity=high" target="_blank">{{ t }} (high+) ▾</a></td>
</tr>
{% endfor %}
</tbody>
</table>
</section>
{% endblock %}

View File

@@ -82,6 +82,20 @@ pending = Table(
Index("pending_status_idx", pending.c.status)
Index("pending_case_idx", pending.c.case_id)
# IOC index table: each row ties one indicator value to a case that mentions it.
# The table is rebuilt wholesale by replace_iocs() rather than updated in place.
iocs = Table(
"iocs", _metadata,
# Surrogate key; lets callers de-duplicate rows by id.
Column("id", Integer, primary_key=True, autoincrement=True),
Column("value", String, nullable=False), # normalized indicator
Column("ioc_type", String, nullable=False), # url | domain | ip | hash | cve
# Identifier of the case the indicator came from.
Column("case_id", String, nullable=False),
# Source feed name, when known.
Column("feed", String, nullable=True),
# Severity stored as a string, when the case has one.
Column("severity", String, nullable=True),
# Stored as a string; find_iocs()/iocs_by_type() sort on it descending.
Column("first_seen", String, nullable=True),
)
# Indexes for the equality filters on value and ioc_type; case_id is indexed as well.
Index("iocs_value_idx", iocs.c.value)
Index("iocs_type_idx", iocs.c.ioc_type)
Index("iocs_case_idx", iocs.c.case_id)
_log = log.get(__name__)
_engine: Optional[Engine] = None
@@ -151,3 +165,34 @@ def case_count(db_path: Path = DB_PATH) -> int:
stmt = select(func.count()).select_from(cases)
with engine(db_path).connect() as conn:
return conn.execute(stmt).scalar_one()
# ---------- IOC index ----------------------------------------------------
def replace_iocs(rows: List[dict], db_path: Path = DB_PATH) -> int:
    """Replace the contents of the IOC index with ``rows``.

    Every existing row is deleted and the new rows are bulk-inserted inside
    one ``begin()`` block. Returns ``len(rows)``.
    """
    with engine(db_path).begin() as tx:
        tx.execute(iocs.delete())
        # Skip the insert when there is nothing to write.
        if rows:
            tx.execute(iocs.insert(), rows)
    return len(rows)
def find_iocs(value: str, db_path: Path = DB_PATH) -> List[dict]:
    """Return the index rows whose ``value`` equals the given string exactly.

    Rows come back as plain dicts ordered by ``first_seen`` descending. The
    comparison is exact, so pass an already-normalized indicator.
    """
    query = (
        select(iocs)
        .where(iocs.c.value == value)
        .order_by(iocs.c.first_seen.desc())
    )
    with engine(db_path).connect() as conn:
        result = conn.execute(query)
        return [dict(record._mapping) for record in result]
def iocs_by_type(ioc_type: str, db_path: Path = DB_PATH) -> List[dict]:
    """Return every index row of one indicator type as plain dicts.

    Rows are ordered by ``first_seen`` descending; no filtering or
    de-duplication happens here — that is left to the caller.
    """
    query = (
        select(iocs)
        .where(iocs.c.ioc_type == ioc_type)
        .order_by(iocs.c.first_seen.desc())
    )
    with engine(db_path).connect() as conn:
        result = conn.execute(query)
        return [dict(record._mapping) for record in result]
def ioc_count(db_path: Path = DB_PATH) -> int:
    """Return the number of rows currently in the ``iocs`` table."""
    with engine(db_path).connect() as conn:
        counted = conn.execute(select(func.count()).select_from(iocs))
        return counted.scalar_one()

102
src/psyc/lines/lookup.py Normal file
View File

@@ -0,0 +1,102 @@
"""Lookupline — IOC index over the case corpus.
Turns the collected cases into a reverse index: indicator -> which cases,
feeds, and severities mention it. This is the shared primitive behind
"paste an indicator, is it known-bad?", asset matching, and blocklist export.
Indicators are normalized so lookups are case- and format-insensitive.
"""
from __future__ import annotations
from typing import Dict, Iterable, List, Optional, Tuple
from psyc import db, log
from psyc.models import Case
_log = log.get(__name__)
# severity ordering for min-severity filters
_SEVERITY_RANK: Dict[str, int] = {"low": 0, "medium": 1, "high": 2, "critical": 3}
IOC_TYPES = ("url", "domain", "ip", "hash", "cve")
def normalize(value: str, ioc_type: str) -> str:
    """Return the canonical form of an indicator, used for storage and lookup.

    Surrounding whitespace is stripped; CVE identifiers are upper-cased and
    every other indicator type is lower-cased.
    """
    trimmed = value.strip()
    return trimmed.upper() if ioc_type == "cve" else trimmed.lower()
def iter_case_iocs(case: Case) -> Iterable[Tuple[str, str]]:
    """Yield ``(normalized_value, ioc_type)`` for each observable on ``case``.

    Observables are visited in a fixed order: URLs, domains, IPs, hashes,
    then CVEs.
    """
    observables = case.observables
    grouped = (
        ("url", observables.urls),
        ("domain", observables.domains),
        ("ip", observables.ips),
        ("hash", observables.hashes),
        ("cve", observables.cves),
    )
    for ioc_type, raw_values in grouped:
        for raw in raw_values:
            yield normalize(raw, ioc_type), ioc_type
def reindex(cases: Iterable[Case]) -> int:
    """Rebuild the whole IOC index from the given cases. Returns rows written.

    Each case contributes one row per distinct (value, type) observable,
    carrying the case's feed, severity and ``observed_at`` timestamp. Empty
    values are skipped. The existing index is replaced, not merged.
    """
    rows: List[dict] = []
    seen: set = set()
    case_total = 0  # counted here because `cases` may be a one-shot iterable
    for case in cases:
        case_total += 1
        feed = case.source_metadata.get("feed")
        sev = case.classification.severity.value if case.classification.severity else None
        first_seen = case.observed_at.isoformat() if case.observed_at else None
        for value, ioc_type in iter_case_iocs(case):
            if not value:
                continue
            # De-duplicate repeated observables within the same case.
            key = (value, ioc_type, case.case_id)
            if key in seen:
                continue
            seen.add(key)
            rows.append(dict(
                value=value, ioc_type=ioc_type, case_id=case.case_id,
                feed=feed, severity=sev, first_seen=first_seen,
            ))
    written = db.replace_iocs(rows)
    # Log the number of cases processed; len(seen) would be the number of
    # IOC rows, not cases.
    _log.info("lookup.reindexed", iocs=written, cases=case_total)
    return written
def lookup(value: str) -> List[dict]:
    """Find one indicator in the index without the caller naming its type.

    The value is normalized once per known IOC type and each distinct
    normalized form is queried. Rows are returned once each, keyed by their
    ``id``; the list is empty when nothing matches.
    """
    candidates = set()
    for ioc_type in IOC_TYPES:
        candidates.add(normalize(value, ioc_type))
    by_id: Dict[int, dict] = {}
    for candidate in candidates:
        for row in db.find_iocs(candidate):
            # Keep the first occurrence of each row id.
            by_id.setdefault(row["id"], row)
    return list(by_id.values())
def export_blocklist(ioc_type: str, min_severity: Optional[str] = None) -> List[str]:
    """Distinct indicator values of one type, optionally filtered by min severity.

    Args:
        ioc_type: one of ``IOC_TYPES``.
        min_severity: optional floor (a key of ``_SEVERITY_RANK``, matched
            case-insensitively). ``None`` or blank disables the filter. With
            a floor set, rows with no or unrecognized severity are excluded.

    Returns:
        Values in the order the index returns them, first occurrence kept.

    Raises:
        ValueError: if ``ioc_type`` or ``min_severity`` is not recognized.
    """
    if ioc_type not in IOC_TYPES:
        raise ValueError(f"unknown ioc_type: {ioc_type}; choices: {', '.join(IOC_TYPES)}")
    floor = -1  # -1 means "no filter": every rank, including unknown (-1), passes
    wanted = (min_severity or "").strip().lower()
    if wanted:
        # Fail loudly on a typo instead of silently exporting the unfiltered list.
        if wanted not in _SEVERITY_RANK:
            raise ValueError(
                f"unknown min_severity: {min_severity}; choices: {', '.join(_SEVERITY_RANK)}"
            )
        floor = _SEVERITY_RANK[wanted]
    values: List[str] = []
    seen: set = set()
    for row in db.iocs_by_type(ioc_type):
        if _SEVERITY_RANK.get(row["severity"] or "", -1) < floor:
            continue
        v = row["value"]
        if v not in seen:
            seen.add(v)
            values.append(v)
    return values

85
tests/test_lookup.py Normal file
View File

@@ -0,0 +1,85 @@
"""Lookupline — IOC index, normalization, lookup, blocklist export."""
from __future__ import annotations
import pytest
from sqlalchemy import create_engine
from psyc import db
from psyc.lines import lookup
from psyc.models import Severity
from conftest import make_case
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
    """Point ``psyc.db`` at a throwaway SQLite file with the schema created.

    Yields the path of the temporary database. The engine is disposed on
    teardown so its connections to the temp file are released.
    """
    test_db = tmp_path / "test.db"
    eng = create_engine(f"sqlite:///{test_db}", future=True)
    db._metadata.create_all(eng, checkfirst=True)
    monkeypatch.setattr(db, "_engine", eng)
    monkeypatch.setattr(db, "DB_PATH", test_db)
    try:
        yield test_db
    finally:
        eng.dispose()
def test_normalize_lowercases_except_cve():
    """normalize() strips and lower-cases everything except CVEs, which are upper-cased."""
    expectations = [
        (("EVIL.COM", "domain"), "evil.com"),
        ((" AbCdEf ", "hash"), "abcdef"),
        (("cve-2026-0001", "cve"), "CVE-2026-0001"),
    ]
    for args, expected in expectations:
        assert lookup.normalize(*args) == expected
def test_iter_case_iocs_covers_all_types():
    """Every observable kind on a case is yielded, already normalized."""
    sample = make_case(
        feed="urlhaus",
        urls=["http://1.2.3.4/x"],
        domains=["EVIL.com"],
        ips=["1.2.3.4"],
        hashes=["AABBCC"],
        cves=["cve-2026-1"],
    )
    expected = {
        ("http://1.2.3.4/x", "url"),
        ("evil.com", "domain"),  # lower-cased
        ("1.2.3.4", "ip"),
        ("aabbcc", "hash"),  # lower-cased
        ("CVE-2026-1", "cve"),  # upper-cased
    }
    produced = set(lookup.iter_case_iocs(sample))
    assert expected <= produced
def test_reindex_then_lookup_finds_case(fresh_db):
    """After reindexing one case, looking up its IP returns that case's row."""
    sample = make_case(feed="threatfox", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(sample)
    assert lookup.reindex([sample]) == 1
    hits = lookup.lookup("9.9.9.9")
    assert len(hits) == 1
    hit = hits[0]
    found = (hit["case_id"], hit["feed"], hit["severity"])
    assert found == (sample.case_id, "threatfox", "high")
def test_lookup_is_normalization_insensitive(fresh_db):
    """A domain stored in mixed case is found regardless of the query's casing."""
    stored = make_case(feed="urlhaus", domains=["Evil.Example.COM"], severity=Severity.MEDIUM)
    lookup.reindex([stored])
    # Neither spelling matches the casing the case was created with.
    for spelling in ("evil.example.com", "EVIL.EXAMPLE.COM"):
        assert len(lookup.lookup(spelling)) == 1
def test_lookup_miss_returns_empty(fresh_db):
    """An indicator that is absent from the index yields an empty list."""
    only_case = make_case(feed="urlhaus", ips=["1.1.1.1"])
    lookup.reindex([only_case])
    misses = lookup.lookup("8.8.8.8")
    assert misses == []
def test_export_blocklist_dedupes_and_filters_by_severity(fresh_db):
    """export_blocklist() lists each value once and honors the severity floor."""
    high = make_case(feed="feodo", ips=["10.0.0.1"], severity=Severity.HIGH)
    med = make_case(feed="urlhaus", ips=["10.0.0.2"], severity=Severity.MEDIUM)
    dup = make_case(feed="threatfox", ips=["10.0.0.1"], severity=Severity.CRITICAL)  # same IP as high
    lookup.reindex([high, med, dup])
    all_ips = lookup.export_blocklist("ip")
    # Compare as a sorted list, not a set: a set would hide a duplicated
    # 10.0.0.1 and leave the de-duplication claim unchecked.
    assert sorted(all_ips) == ["10.0.0.1", "10.0.0.2"]
    high_only = lookup.export_blocklist("ip", min_severity="high")
    assert "10.0.0.1" in high_only  # high + critical pass
    assert "10.0.0.2" not in high_only  # medium filtered out
def test_export_blocklist_rejects_bad_type(fresh_db):
    """An indicator type outside IOC_TYPES is refused with ValueError."""
    unsupported_type = "mutex"
    with pytest.raises(ValueError):
        lookup.export_blocklist(unsupported_type)