Compare commits
2 Commits
f88db2fdf7
...
d0a71d0226
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0a71d0226 | ||
|
|
9a2a31ec9a |
@@ -13,7 +13,7 @@ from psyc import db, log
|
||||
|
||||
load_dotenv() # per-dev .env (API keys) is loaded into os.environ for venv CLI
|
||||
from psyc.cockpit import inference
|
||||
from psyc.lines import classify, courier, proof, route, scout, seal, train
|
||||
from psyc.lines import classify, courier, lookup, proof, route, scout, seal, train
|
||||
from psyc.lines import map as map_line
|
||||
from psyc.models import Outcome
|
||||
from psyc.result import Err, Ok
|
||||
@@ -357,6 +357,45 @@ def reject(
|
||||
typer.echo(f"rejected #{pending_id}{(': ' + reason) if reason else ''}")
|
||||
|
||||
|
||||
@app.command("reindex")
|
||||
def reindex() -> None:
|
||||
"""Rebuild the IOC index from all cases."""
|
||||
db.init_db() # ensure the iocs table exists (idempotent)
|
||||
cases = db.list_cases(limit=1_000_000)
|
||||
n = lookup.reindex(cases)
|
||||
typer.echo(f"indexed {n} IOC(s) from {len(cases)} case(s). total: {db.ioc_count()}")
|
||||
|
||||
|
||||
@app.command("lookup")
|
||||
def lookup_ioc(value: str = typer.Argument(..., help="indicator: IP, domain, URL, hash, or CVE")) -> None:
|
||||
"""Look up an indicator across the case corpus."""
|
||||
rows = lookup.lookup(value)
|
||||
if not rows:
|
||||
typer.echo(f"'{value}' — not found in the corpus (no known-bad match)")
|
||||
return
|
||||
typer.echo(f"'{value}' — {len(rows)} match(es):")
|
||||
for r in rows:
|
||||
sev = r["severity"] or "?"
|
||||
typer.echo(f" [{r['ioc_type']}] {r['case_id']} feed={r['feed'] or '?'} severity={sev} seen={(r['first_seen'] or '')[:10]}")
|
||||
|
||||
|
||||
@app.command("export-blocklist")
|
||||
def export_blocklist(
|
||||
ioc_type: str = typer.Option("ip", "--type", "-t", help=f"one of: {', '.join(lookup.IOC_TYPES)}"),
|
||||
min_severity: str = typer.Option("", "--min-severity", help="low | medium | high | critical"),
|
||||
out: str = typer.Option("", "--out", help="write to file instead of stdout"),
|
||||
) -> None:
|
||||
"""Emit a deduplicated blocklist of indicators (firewall/DNS/SIEM ingestion)."""
|
||||
values = lookup.export_blocklist(ioc_type, min_severity or None)
|
||||
text = "\n".join(values)
|
||||
if out:
|
||||
from pathlib import Path as _Path
|
||||
_Path(out).write_text(text + "\n", encoding="utf-8")
|
||||
typer.echo(f"wrote {len(values)} {ioc_type}(s) → {out}")
|
||||
else:
|
||||
typer.echo(text)
|
||||
|
||||
|
||||
@app.command("mock-cert")
|
||||
def mock_cert_serve(host: str = "127.0.0.1", port: int = 8770) -> None:
|
||||
uvicorn.run("psyc.mock_cert:app", host=host, port=port)
|
||||
|
||||
@@ -6,7 +6,7 @@ from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from fastapi import FastAPI, Form, HTTPException, Request
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.templating import Jinja2Templates
|
||||
|
||||
@@ -14,6 +14,7 @@ from psyc import db, log
|
||||
from psyc.cockpit import inference, journey as journey_view
|
||||
from psyc.lines import courier as courier_line
|
||||
from psyc.lines import ledger as ledger_line
|
||||
from psyc.lines import lookup as lookup_line
|
||||
from psyc.lines import route as route_line
|
||||
from psyc.lines import seal as seal_line
|
||||
from psyc.lines import train as train_line
|
||||
@@ -110,6 +111,33 @@ def inference_status() -> dict:
|
||||
return {"online": adapter is not None, "adapter": adapter}
|
||||
|
||||
|
||||
@app.get("/lookup", response_class=HTMLResponse)
|
||||
def lookup_view(request: Request, q: str = "") -> HTMLResponse:
|
||||
query = q.strip()
|
||||
matches = lookup_line.lookup(query) if query else []
|
||||
counts = {t: len(lookup_line.export_blocklist(t)) for t in lookup_line.IOC_TYPES}
|
||||
return TEMPLATES.TemplateResponse(
|
||||
request,
|
||||
"lookup.html",
|
||||
{
|
||||
"query": query,
|
||||
"matches": matches,
|
||||
"searched": bool(query),
|
||||
"total_iocs": db.ioc_count(),
|
||||
"counts": counts,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@app.get("/export/blocklist", response_class=PlainTextResponse)
|
||||
def export_blocklist(type: str = "ip", min_severity: str = "") -> PlainTextResponse:
|
||||
if type not in lookup_line.IOC_TYPES:
|
||||
raise HTTPException(status_code=400, detail=f"unknown type: {type}")
|
||||
values = lookup_line.export_blocklist(type, min_severity or None)
|
||||
header = f"# psyc blocklist — type={type} min_severity={min_severity or 'any'} count={len(values)}\n"
|
||||
return PlainTextResponse(header + "\n".join(values) + "\n")
|
||||
|
||||
|
||||
@app.get("/queue", response_class=HTMLResponse)
|
||||
def queue_view(request: Request, status: str = "pending") -> HTMLResponse:
|
||||
from psyc.models import ApprovalStatus
|
||||
|
||||
@@ -314,3 +314,14 @@ tr.sev-low .sev-badge { color: var(--muted); }
|
||||
}
|
||||
.reject-reason::placeholder { color: var(--muted); }
|
||||
.outcome-pending_approval { background: rgba(251, 191, 36, 0.15); color: var(--amber); border: 1px solid rgba(251, 191, 36, 0.4); }
|
||||
|
||||
/* ── indicator lookup ───────────────────────────────────────── */
|
||||
.lookup-form { display: flex; gap: 8px; margin: 14px 0 18px; }
|
||||
.lookup-input {
|
||||
flex: 1; background: var(--bg); color: var(--text); border: 1px solid var(--border);
|
||||
border-radius: 4px; padding: 9px 12px; font: inherit; font-size: 14px;
|
||||
}
|
||||
.lookup-input:focus { outline: none; border-color: var(--accent); box-shadow: 0 0 0 3px var(--accent-glow); }
|
||||
.verdict { padding: 12px 16px; border-radius: 6px; margin: 14px 0; font-size: 14px; }
|
||||
.verdict-bad { background: rgba(248, 113, 113, 0.12); border: 1px solid var(--red); color: var(--red); }
|
||||
.verdict-clean { background: rgba(74, 222, 128, 0.10); border: 1px solid var(--green); color: var(--green); }
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
</a>
|
||||
<nav class="nav">
|
||||
<a href="/cases">Cases</a>
|
||||
<a href="/lookup">Lookup</a>
|
||||
<a href="/queue">Queue</a>
|
||||
<a href="/ledger">Ledger</a>
|
||||
<a href="/train">Trainline</a>
|
||||
|
||||
66
src/psyc/cockpit/templates/lookup.html
Normal file
66
src/psyc/cockpit/templates/lookup.html
Normal file
@@ -0,0 +1,66 @@
|
||||
{% extends "base.html" %}
|
||||
{% block title %}Lookup — psyc{% endblock %}
|
||||
{% block content %}
|
||||
<section class="panel">
|
||||
<div class="panel-head">
|
||||
<h1>Indicator Lookup</h1>
|
||||
<span class="count">{{ total_iocs }} indicators indexed</span>
|
||||
</div>
|
||||
<p class="page-intro">Paste any indicator — IP, domain, URL, file hash, or CVE — and psyc tells you whether it's known-bad across the whole case corpus, which feed flagged it, and at what severity. This is the "is this thing dangerous?" desk check.</p>
|
||||
<details class="page-help">
|
||||
<summary>how to use this view</summary>
|
||||
<div class="help-body">
|
||||
<p><b>How to use.</b> Type or paste an indicator and hit Look up. A green banner means it's clean (not in the corpus); a red banner means it matched known threat intel — open the case to see the full context.</p>
|
||||
<p><b>What you're seeing.</b> Matches come from the IOC index built across all {{ total_iocs }} indicators in the corpus. Lookup is case- and format-insensitive (EVIL.COM = evil.com).</p>
|
||||
<p><b>Why it matters.</b> A defender investigating an alert needs a fast verdict on a raw indicator — and a way to push the whole known-bad set into a firewall or DNS sinkhole (see Blocklist export below).</p>
|
||||
</div>
|
||||
</details>
|
||||
|
||||
<form method="get" action="/lookup" class="lookup-form">
|
||||
<input type="text" name="q" value="{{ query }}" placeholder="1.2.3.4 · evil.com · http://… · <sha256> · CVE-2024-3721" class="lookup-input" autofocus>
|
||||
<button type="submit" class="btn btn-approve">Look up</button>
|
||||
</form>
|
||||
|
||||
{% if searched %}
|
||||
{% if matches %}
|
||||
<div class="verdict verdict-bad">⚠ <strong>{{ query }}</strong> is KNOWN-BAD — {{ matches|length }} match(es) in the corpus</div>
|
||||
<table class="ledger">
|
||||
<thead>
|
||||
<tr><th>Type</th><th>Case</th><th>Feed</th><th>Severity</th><th>First seen</th></tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for m in matches %}
|
||||
<tr class="ledger-row sev-{{ m.severity or 'none' }}">
|
||||
<td>{{ m.ioc_type }}</td>
|
||||
<td class="lg-case"><a href="/cases/{{ m.case_id }}">{{ m.case_id }}</a></td>
|
||||
<td class="lg-dest">{{ m.feed or '—' }}</td>
|
||||
<td>{% if m.severity %}<span class="sev-badge">{{ m.severity }}</span>{% else %}—{% endif %}</td>
|
||||
<td class="lg-ts">{{ (m.first_seen or '')[:10] }}</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
{% else %}
|
||||
<div class="verdict verdict-clean">✓ <strong>{{ query }}</strong> is not in the corpus — no known-bad match</div>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
</section>
|
||||
|
||||
<section class="panel">
|
||||
<div class="panel-head"><h2>Blocklist export</h2></div>
|
||||
<p class="page-intro">Download the deduplicated set of known-bad indicators of one type as plain text — ready to paste into a firewall denylist, DNS sinkhole, or SIEM watchlist.</p>
|
||||
<table class="ledger">
|
||||
<thead><tr><th>Type</th><th>Count</th><th>Download (all)</th><th>Download (high+)</th></tr></thead>
|
||||
<tbody>
|
||||
{% for t, n in counts.items() %}
|
||||
<tr class="ledger-row">
|
||||
<td>{{ t }}</td>
|
||||
<td>{{ n }}</td>
|
||||
<td><a href="/export/blocklist?type={{ t }}" target="_blank">{{ t }} blocklist ▾</a></td>
|
||||
<td><a href="/export/blocklist?type={{ t }}&min_severity=high" target="_blank">{{ t }} (high+) ▾</a></td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
</section>
|
||||
{% endblock %}
|
||||
@@ -82,6 +82,20 @@ pending = Table(
|
||||
Index("pending_status_idx", pending.c.status)
|
||||
Index("pending_case_idx", pending.c.case_id)
|
||||
|
||||
iocs = Table(
|
||||
"iocs", _metadata,
|
||||
Column("id", Integer, primary_key=True, autoincrement=True),
|
||||
Column("value", String, nullable=False), # normalized indicator
|
||||
Column("ioc_type", String, nullable=False), # url | domain | ip | hash | cve
|
||||
Column("case_id", String, nullable=False),
|
||||
Column("feed", String, nullable=True),
|
||||
Column("severity", String, nullable=True),
|
||||
Column("first_seen", String, nullable=True),
|
||||
)
|
||||
Index("iocs_value_idx", iocs.c.value)
|
||||
Index("iocs_type_idx", iocs.c.ioc_type)
|
||||
Index("iocs_case_idx", iocs.c.case_id)
|
||||
|
||||
|
||||
_log = log.get(__name__)
|
||||
_engine: Optional[Engine] = None
|
||||
@@ -151,3 +165,34 @@ def case_count(db_path: Path = DB_PATH) -> int:
|
||||
stmt = select(func.count()).select_from(cases)
|
||||
with engine(db_path).connect() as conn:
|
||||
return conn.execute(stmt).scalar_one()
|
||||
|
||||
|
||||
# ---------- IOC index ----------------------------------------------------
|
||||
|
||||
def replace_iocs(rows: List[dict], db_path: Path = DB_PATH) -> int:
|
||||
"""Rebuild the IOC index: clear it, then bulk-insert rows. Returns count."""
|
||||
with engine(db_path).begin() as conn:
|
||||
conn.execute(iocs.delete())
|
||||
if rows:
|
||||
conn.execute(iocs.insert(), rows)
|
||||
return len(rows)
|
||||
|
||||
|
||||
def find_iocs(value: str, db_path: Path = DB_PATH) -> List[dict]:
|
||||
"""Exact-match lookup of one normalized indicator. Returns matching index rows."""
|
||||
stmt = select(iocs).where(iocs.c.value == value).order_by(iocs.c.first_seen.desc())
|
||||
with engine(db_path).connect() as conn:
|
||||
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
|
||||
|
||||
|
||||
def iocs_by_type(ioc_type: str, db_path: Path = DB_PATH) -> List[dict]:
|
||||
"""All index rows of one type, newest first — caller filters/dedupes."""
|
||||
stmt = select(iocs).where(iocs.c.ioc_type == ioc_type).order_by(iocs.c.first_seen.desc())
|
||||
with engine(db_path).connect() as conn:
|
||||
return [dict(r._mapping) for r in conn.execute(stmt).fetchall()]
|
||||
|
||||
|
||||
def ioc_count(db_path: Path = DB_PATH) -> int:
|
||||
stmt = select(func.count()).select_from(iocs)
|
||||
with engine(db_path).connect() as conn:
|
||||
return conn.execute(stmt).scalar_one()
|
||||
|
||||
102
src/psyc/lines/lookup.py
Normal file
102
src/psyc/lines/lookup.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""Lookupline — IOC index over the case corpus.
|
||||
|
||||
Turns the collected cases into a reverse index: indicator -> which cases,
|
||||
feeds, and severities mention it. This is the shared primitive behind
|
||||
"paste an indicator, is it known-bad?", asset matching, and blocklist export.
|
||||
Indicators are normalized so lookups are case- and format-insensitive.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
from psyc import db, log
|
||||
from psyc.models import Case
|
||||
|
||||
|
||||
_log = log.get(__name__)
|
||||
|
||||
# severity ordering for min-severity filters
|
||||
_SEVERITY_RANK: Dict[str, int] = {"low": 0, "medium": 1, "high": 2, "critical": 3}
|
||||
|
||||
IOC_TYPES = ("url", "domain", "ip", "hash", "cve")
|
||||
|
||||
|
||||
def normalize(value: str, ioc_type: str) -> str:
|
||||
"""Normalize an indicator for storage + lookup. CVEs upper, everything else lower."""
|
||||
v = value.strip()
|
||||
if ioc_type == "cve":
|
||||
return v.upper()
|
||||
return v.lower()
|
||||
|
||||
|
||||
def iter_case_iocs(case: Case) -> Iterable[Tuple[str, str]]:
|
||||
"""Yield (normalized_value, ioc_type) for every observable on a case."""
|
||||
obs = case.observables
|
||||
for u in obs.urls:
|
||||
yield normalize(u, "url"), "url"
|
||||
for d in obs.domains:
|
||||
yield normalize(d, "domain"), "domain"
|
||||
for ip in obs.ips:
|
||||
yield normalize(ip, "ip"), "ip"
|
||||
for h in obs.hashes:
|
||||
yield normalize(h, "hash"), "hash"
|
||||
for c in obs.cves:
|
||||
yield normalize(c, "cve"), "cve"
|
||||
|
||||
|
||||
def reindex(cases: Iterable[Case]) -> int:
|
||||
"""Rebuild the whole IOC index from the given cases. Returns rows written."""
|
||||
rows: List[dict] = []
|
||||
seen: set = set()
|
||||
for case in cases:
|
||||
feed = case.source_metadata.get("feed")
|
||||
sev = case.classification.severity.value if case.classification.severity else None
|
||||
first_seen = case.observed_at.isoformat() if case.observed_at else None
|
||||
for value, ioc_type in iter_case_iocs(case):
|
||||
if not value:
|
||||
continue
|
||||
key = (value, ioc_type, case.case_id)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
rows.append(dict(
|
||||
value=value, ioc_type=ioc_type, case_id=case.case_id,
|
||||
feed=feed, severity=sev, first_seen=first_seen,
|
||||
))
|
||||
written = db.replace_iocs(rows)
|
||||
_log.info("lookup.reindexed", iocs=written, cases=len(seen))
|
||||
return written
|
||||
|
||||
|
||||
def lookup(value: str) -> List[dict]:
|
||||
"""Look up one indicator across all types. Returns matching index rows (may be empty)."""
|
||||
# Try every type's normalization so callers don't need to know the type.
|
||||
candidates = {normalize(value, t) for t in IOC_TYPES}
|
||||
out: List[dict] = []
|
||||
seen_ids: set = set()
|
||||
for cand in candidates:
|
||||
for row in db.find_iocs(cand):
|
||||
if row["id"] not in seen_ids:
|
||||
seen_ids.add(row["id"])
|
||||
out.append(row)
|
||||
return out
|
||||
|
||||
|
||||
def export_blocklist(ioc_type: str, min_severity: Optional[str] = None) -> List[str]:
|
||||
"""Distinct indicator values of one type, optionally filtered by min severity."""
|
||||
if ioc_type not in IOC_TYPES:
|
||||
raise ValueError(f"unknown ioc_type: {ioc_type}; choices: {', '.join(IOC_TYPES)}")
|
||||
floor = _SEVERITY_RANK.get(min_severity, -1) if min_severity else -1
|
||||
values: List[str] = []
|
||||
seen: set = set()
|
||||
for row in db.iocs_by_type(ioc_type):
|
||||
if floor >= 0:
|
||||
rank = _SEVERITY_RANK.get(row["severity"] or "", -1)
|
||||
if rank < floor:
|
||||
continue
|
||||
v = row["value"]
|
||||
if v not in seen:
|
||||
seen.add(v)
|
||||
values.append(v)
|
||||
return values
|
||||
85
tests/test_lookup.py
Normal file
85
tests/test_lookup.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""Lookupline — IOC index, normalization, lookup, blocklist export."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
from psyc import db
|
||||
from psyc.lines import lookup
|
||||
from psyc.models import Severity
|
||||
from conftest import make_case
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fresh_db(tmp_path, monkeypatch):
|
||||
test_db = tmp_path / "test.db"
|
||||
eng = create_engine(f"sqlite:///{test_db}", future=True)
|
||||
db._metadata.create_all(eng, checkfirst=True)
|
||||
monkeypatch.setattr(db, "_engine", eng)
|
||||
monkeypatch.setattr(db, "DB_PATH", test_db)
|
||||
yield test_db
|
||||
|
||||
|
||||
def test_normalize_lowercases_except_cve():
|
||||
assert lookup.normalize("EVIL.COM", "domain") == "evil.com"
|
||||
assert lookup.normalize(" AbCdEf ", "hash") == "abcdef"
|
||||
assert lookup.normalize("cve-2026-0001", "cve") == "CVE-2026-0001"
|
||||
|
||||
|
||||
def test_iter_case_iocs_covers_all_types():
|
||||
case = make_case(
|
||||
feed="urlhaus",
|
||||
urls=["http://1.2.3.4/x"], domains=["EVIL.com"], ips=["1.2.3.4"],
|
||||
hashes=["AABBCC"], cves=["cve-2026-1"],
|
||||
)
|
||||
pairs = set(lookup.iter_case_iocs(case))
|
||||
assert ("http://1.2.3.4/x", "url") in pairs
|
||||
assert ("evil.com", "domain") in pairs # normalized
|
||||
assert ("1.2.3.4", "ip") in pairs
|
||||
assert ("aabbcc", "hash") in pairs # normalized
|
||||
assert ("CVE-2026-1", "cve") in pairs # upper
|
||||
|
||||
|
||||
def test_reindex_then_lookup_finds_case(fresh_db):
|
||||
case = make_case(feed="threatfox", ips=["9.9.9.9"], severity=Severity.HIGH)
|
||||
db.upsert_case(case)
|
||||
n = lookup.reindex([case])
|
||||
assert n == 1
|
||||
hits = lookup.lookup("9.9.9.9")
|
||||
assert len(hits) == 1
|
||||
assert hits[0]["case_id"] == case.case_id
|
||||
assert hits[0]["feed"] == "threatfox"
|
||||
assert hits[0]["severity"] == "high"
|
||||
|
||||
|
||||
def test_lookup_is_normalization_insensitive(fresh_db):
|
||||
case = make_case(feed="urlhaus", domains=["Evil.Example.COM"], severity=Severity.MEDIUM)
|
||||
lookup.reindex([case])
|
||||
# Query with different casing than stored — still matches.
|
||||
assert len(lookup.lookup("evil.example.com")) == 1
|
||||
assert len(lookup.lookup("EVIL.EXAMPLE.COM")) == 1
|
||||
|
||||
|
||||
def test_lookup_miss_returns_empty(fresh_db):
|
||||
lookup.reindex([make_case(feed="urlhaus", ips=["1.1.1.1"])])
|
||||
assert lookup.lookup("8.8.8.8") == []
|
||||
|
||||
|
||||
def test_export_blocklist_dedupes_and_filters_by_severity(fresh_db):
|
||||
high = make_case(feed="feodo", ips=["10.0.0.1"], severity=Severity.HIGH)
|
||||
med = make_case(feed="urlhaus", ips=["10.0.0.2"], severity=Severity.MEDIUM)
|
||||
dup = make_case(feed="threatfox", ips=["10.0.0.1"], severity=Severity.CRITICAL) # same IP as high
|
||||
lookup.reindex([high, med, dup])
|
||||
|
||||
all_ips = lookup.export_blocklist("ip")
|
||||
assert set(all_ips) == {"10.0.0.1", "10.0.0.2"} # deduped across cases
|
||||
|
||||
high_only = lookup.export_blocklist("ip", min_severity="high")
|
||||
assert "10.0.0.1" in high_only # high + critical pass
|
||||
assert "10.0.0.2" not in high_only # medium filtered out
|
||||
|
||||
|
||||
def test_export_blocklist_rejects_bad_type(fresh_db):
|
||||
with pytest.raises(ValueError):
|
||||
lookup.export_blocklist("mutex")
|
||||
Reference in New Issue
Block a user