stage-4: multi-source Scoutline — CISA KEV + Feodo Tracker

Scoutline is now a source registry: urlhaus, cisa-kev, feodo. CISA KEV brings
exploit/CVE cases, Feodo Tracker brings botnet C2 cases — real incident-type
variety beyond URLhaus's malware monotone. Classifyline is source-aware
(feed tag → incident type; ransomware-flagged KEV → critical). CLI gains
fetch-cisa-kev, fetch-feodo, fetch-all. Both new feeds are keyless public
download feeds (verified).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
m17hr1l
2026-05-17 23:42:13 +02:00
parent b4c66c2e87
commit 2138611fdb
5 changed files with 157 additions and 55 deletions

View File

@@ -45,7 +45,7 @@ python3 -m virtualenv .venv
.venv/bin/pip install -e .
.venv/bin/psyc init # create the sqlite db
.venv/bin/psyc fetch-urlhaus --limit 50 # ingest a URLhaus pass
.venv/bin/psyc fetch-all # ingest URLhaus + CISA KEV + Feodo Tracker
.venv/bin/psyc serve --port 8767 # cockpit at http://127.0.0.1:8767
.venv/bin/psyc status # count of ingested cases
```

View File

@@ -30,14 +30,34 @@ def status() -> None:
typer.echo(f"cases: {db.case_count()}")
@app.command("fetch-urlhaus")
def fetch_urlhaus(limit: int = typer.Option(50, help="max rows to ingest from the feed")) -> None:
def _ingest(source: str, limit: int) -> None:
db.init_db()
typer.echo(f"fetching URLhaus recent feed (limit={limit})…")
cases = scout.fetch_and_signal(limit=limit)
typer.echo(f"fetching {source} (limit={limit})…")
cases = scout.fetch_and_signal(source, limit=limit)
for c in cases:
db.upsert_case(c)
typer.echo(f"ingested {len(cases)} case(s). total now: {db.case_count()}")
typer.echo(f"ingested {len(cases)} case(s) from {source}. total now: {db.case_count()}")
# CLI entry point: ingest a capped pass of the URLhaus feed.
# (Kept as comments rather than a docstring so the generated --help text is unchanged.)
@app.command("fetch-urlhaus")
def fetch_urlhaus(
    limit: int = typer.Option(50, help="max rows to ingest"),
) -> None:
    _ingest(source="urlhaus", limit=limit)
# CLI entry point: ingest a capped pass of the CISA KEV feed.
# (Kept as comments rather than a docstring so the generated --help text is unchanged.)
@app.command("fetch-cisa-kev")
def fetch_cisa_kev(
    limit: int = typer.Option(100, help="max vulnerabilities to ingest"),
) -> None:
    _ingest(source="cisa-kev", limit=limit)
# CLI entry point: ingest a capped pass of the Feodo Tracker feed.
# (Kept as comments rather than a docstring so the generated --help text is unchanged.)
@app.command("fetch-feodo")
def fetch_feodo(
    limit: int = typer.Option(50, help="max C2 records to ingest"),
) -> None:
    _ingest(source="feodo", limit=limit)
# CLI entry point: ingest every feed in turn, each with its own fixed cap.
# (Kept as comments rather than a docstring so the generated --help text is unchanged.)
@app.command("fetch-all")
def fetch_all() -> None:
    # Insertion order of the mapping is the ingestion order.
    caps = {"urlhaus": 50, "cisa-kev": 100, "feodo": 50}
    for feed_name, cap in caps.items():
        _ingest(feed_name, cap)
@app.command("classify-case")
@@ -250,7 +270,7 @@ def demo() -> None:
seal.generate_recipient_keys(recipient)
typer.echo(f" + generated demo keys for {recipient}")
typer.echo("fetching one URLhaus row…")
cases = scout.fetch_and_signal(limit=1)
cases = scout.fetch_and_signal("urlhaus", limit=1)
if not cases:
typer.echo("no cases ingested; URLhaus may be empty or unreachable", err=True)
raise typer.Exit(1)

View File

@@ -8,6 +8,12 @@ from psyc.models import Case, IncidentType, InternalClass, Severity, TLP
_log = log.get(__name__)
# Lookup table from a case's `source_metadata["feed"]` tag to the incident
# type assigned during classification. Feeds that are not listed here are
# handled by the caller's fallback (cases carrying URLs default to MALWARE).
_FEED_INCIDENT = {
"urlhaus": IncidentType.MALWARE,
"feodo": IncidentType.BOTNET,
"cisa-kev": IncidentType.EXPLOIT,
}
def classify(case: Case) -> Case:
_classify_incident_type_and_tlp(case)
@@ -27,10 +33,14 @@ def classify(case: Case) -> Case:
def _classify_incident_type_and_tlp(case: Case) -> None:
if case.classification.incident_type is not None:
return
if case.source_type == "abuse_feed" and case.observables.urls:
case.classification.incident_type = IncidentType.MALWARE
if case.classification.tlp == TLP.AMBER:
case.classification.tlp = TLP.GREEN
incident = _FEED_INCIDENT.get(case.source_metadata.get("feed", ""))
if incident is None and case.observables.urls:
incident = IncidentType.MALWARE # fallback for un-tagged feeds
if incident is None:
return
case.classification.incident_type = incident
if case.classification.tlp == TLP.AMBER:
case.classification.tlp = TLP.GREEN
def _classify_severity(case: Case) -> None:
@@ -39,9 +49,14 @@ def _classify_severity(case: Case) -> None:
if case.victim.critical_infrastructure:
case.classification.severity = Severity.CRITICAL
return
if case.classification.incident_type == IncidentType.MALWARE:
url_status = case.source_metadata.get("url_status", "")
case.classification.severity = Severity.HIGH if url_status == "online" else Severity.MEDIUM
incident = case.classification.incident_type
if incident == IncidentType.EXPLOIT:
ransomware = case.source_metadata.get("ransomware", "")
case.classification.severity = Severity.CRITICAL if ransomware == "Known" else Severity.HIGH
return
if incident in (IncidentType.MALWARE, IncidentType.BOTNET):
status = case.source_metadata.get("url_status") or case.source_metadata.get("status", "")
case.classification.severity = Severity.HIGH if status == "online" else Severity.MEDIUM
return
case.classification.severity = Severity.MEDIUM

View File

@@ -56,7 +56,7 @@ DESTINATIONS: List[Destination] = [
name="CERT-Bund",
kind="authority",
max_tlp=TLP.RED,
accepts=[IncidentType.MALWARE, IncidentType.RANSOMWARE, IncidentType.PHISHING, IncidentType.EXPLOIT, IncidentType.DATA_LEAK, IncidentType.CREDENTIAL_LEAK],
accepts=[IncidentType.MALWARE, IncidentType.RANSOMWARE, IncidentType.PHISHING, IncidentType.EXPLOIT, IncidentType.BOTNET, IncidentType.DATA_LEAK, IncidentType.CREDENTIAL_LEAK],
priority=1,
payload_kind="sealed_evidence_package",
countries=["DE"],

View File

@@ -1,7 +1,9 @@
"""Scoutline — Fetcher + Signalizer for URLhaus.
"""Scoutline — multi-source Fetcher + Signalizer.
Emits raw Case objects with source metadata + observables only. Classification,
victim/actor resolution, confidence scoring, sealing, and routing are downstream.
Each source pulls a public defensive feed and emits normalized Case objects
carrying observables + source metadata only; classification, mapping, sealing,
and routing happen downstream. Sources are registered in SOURCES; the
`source_metadata["feed"]` tag lets Classifyline assign the right incident type.
"""
from __future__ import annotations
@@ -9,7 +11,7 @@ from __future__ import annotations
import csv
import io
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional
from typing import Callable, Dict, Iterable, List, Optional
from urllib.parse import urlparse
import httpx
@@ -18,27 +20,33 @@ from psyc import log
from psyc.models import Case, Observables
URLHAUS_RECENT_CSV = "https://urlhaus.abuse.ch/downloads/csv_recent/"
USER_AGENT = "psyc/0.1 (defensive CTI; hackathon prototype)"
HTTP_TIMEOUT = 30.0
URLHAUS_RECENT_CSV = "https://urlhaus.abuse.ch/downloads/csv_recent/"
CISA_KEV_JSON = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
FEODO_BLOCKLIST_JSON = "https://feodotracker.abuse.ch/downloads/ipblocklist.json"
_log = log.get(__name__)
def fetch_recent_csv(timeout: float = 30.0) -> str:
with httpx.Client(timeout=timeout, headers={"User-Agent": USER_AGENT}) as client:
resp = client.get(URLHAUS_RECENT_CSV)
def _http_get(url: str) -> httpx.Response:
with httpx.Client(timeout=HTTP_TIMEOUT, headers={"User-Agent": USER_AGENT}, follow_redirects=True) as client:
resp = client.get(url)
resp.raise_for_status()
return resp.text
return resp
def _parse_urlhaus_date(s: str) -> datetime:
def _parse_dt(value: str, fmt: str) -> datetime:
try:
return datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
except ValueError:
return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
return datetime.now(timezone.utc)
def parse_urlhaus_csv(csv_text: str) -> Iterable[Dict[str, str]]:
# --- URLhaus — malware distribution URLs ---------------------------------
def _parse_urlhaus_csv(csv_text: str) -> Iterable[Dict[str, str]]:
lines = [ln for ln in csv_text.splitlines() if ln and not ln.startswith("#")]
if not lines:
return
@@ -47,24 +55,16 @@ def parse_urlhaus_csv(csv_text: str) -> Iterable[Dict[str, str]]:
if len(fields) < 9:
continue
yield {
"id": fields[0],
"dateadded": fields[1],
"url": fields[2],
"url_status": fields[3],
"last_online": fields[4],
"threat": fields[5],
"tags": fields[6],
"urlhaus_link": fields[7],
"reporter": fields[8],
"id": fields[0], "dateadded": fields[1], "url": fields[2],
"url_status": fields[3], "last_online": fields[4], "threat": fields[5],
"tags": fields[6], "urlhaus_link": fields[7], "reporter": fields[8],
}
def row_to_case(row: Dict[str, str]) -> Case:
def _urlhaus_row_to_case(row: Dict[str, str]) -> Case:
url = row["url"]
parsed = urlparse(url)
host = parsed.hostname or ""
host = urlparse(url).hostname or ""
tags = [t.strip() for t in row["tags"].split(",") if t.strip()]
observables = Observables(urls=[url], domains=[host] if host else [])
summary = f"URLhaus: {row['threat'] or 'malware_distribution'} at {host or url}"
if tags:
summary += f" (tags: {', '.join(tags[:4])})"
@@ -74,22 +74,89 @@ def row_to_case(row: Dict[str, str]) -> Case:
source_type="abuse_feed",
source_ref=row["urlhaus_link"],
source_metadata=dict(
url_status=row["url_status"],
threat=row["threat"],
tags=row["tags"],
reporter=row["reporter"],
feed="urlhaus", url_status=row["url_status"], threat=row["threat"],
tags=row["tags"], reporter=row["reporter"],
),
observed_at=_parse_urlhaus_date(row["dateadded"]),
observables=observables,
observed_at=_parse_dt(row["dateadded"], "%Y-%m-%d %H:%M:%S"),
observables=Observables(urls=[url], domains=[host] if host else []),
)
def fetch_and_signal(limit: Optional[int] = None) -> List[Case]:
csv_text = fetch_recent_csv()
cases: List[Case] = []
for row in parse_urlhaus_csv(csv_text):
cases.append(row_to_case(row))
if limit is not None and len(cases) >= limit:
break
_log.info("scout.urlhaus.fetched", count=len(cases))
def _fetch_urlhaus() -> List[Case]:
    """Download the URLhaus recent CSV and turn every parsed row into a Case."""
    response = _http_get(URLHAUS_RECENT_CSV)
    rows = _parse_urlhaus_csv(response.text)
    return list(map(_urlhaus_row_to_case, rows))
# --- CISA KEV — known exploited vulnerabilities --------------------------
def _kev_vuln_to_case(v: Dict[str, object]) -> Case:
    """Convert one CISA KEV vulnerability record into a raw Case.

    Args:
        v: A single entry from the KEV JSON "vulnerabilities" list. It must
            contain "cveID"; every other field is optional and defaults to
            an empty string (or "Unknown" for the ransomware flag).

    Returns:
        A Case tagged ``feed="cisa-kev"`` whose only observable is the CVE id.

    Raises:
        KeyError: if the record has no "cveID" key.
    """
    cve = str(v["cveID"])
    vendor = str(v.get("vendorProject", ""))
    product = str(v.get("product", ""))
    # Collapse to a single label when vendor and product are the same string,
    # otherwise join them; strip() covers the case where one side is empty.
    vp = vendor if vendor == product else f"{vendor} {product}".strip()
    summary = f"CISA KEV: {v.get('vulnerabilityName', cve)}"
    if vp:
        # Separate the label from the vulnerability name; appending it bare
        # fused the two into one unreadable word ("...VulnerabilityVendor").
        summary += f" ({vp})"
    date_added = str(v.get("dateAdded", ""))
    return Case(
        case_id=f"PSYC-KEV-{cve}",
        summary=summary,
        source_type="advisory",
        source_ref=f"https://nvd.nist.gov/vuln/detail/{cve}",
        source_metadata=dict(
            feed="cisa-kev",
            vendor=vendor,
            product=product,
            ransomware=str(v.get("knownRansomwareCampaignUse", "Unknown")),
            date_added=date_added,
            due_date=str(v.get("dueDate", "")),
        ),
        # An unparseable or missing date is handled inside _parse_dt.
        observed_at=_parse_dt(date_added, "%Y-%m-%d"),
        observables=Observables(cves=[cve]),
    )
def _fetch_cisa_kev() -> List[Case]:
    """Download the CISA KEV JSON and turn each listed vulnerability into a Case."""
    catalog = _http_get(CISA_KEV_JSON).json()
    # A payload without the "vulnerabilities" key yields no cases.
    entries = catalog.get("vulnerabilities", [])
    return list(map(_kev_vuln_to_case, entries))
# --- Feodo Tracker — botnet C2 servers -----------------------------------
def _feodo_record_to_case(r: Dict[str, object]) -> Case:
    """Convert one Feodo Tracker blocklist record into a raw Case.

    Every field is read with a default and stringified; the case id and the
    summary are both built from the record's IP address and port.
    """

    def _text(key: str, fallback: object = "") -> str:
        # Stringify a record field, substituting `fallback` when the key is absent.
        return str(r.get(key, fallback))

    ip = _text("ip_address")
    port = _text("port")
    malware = _text("malware", "botnet")

    metadata = {
        "feed": "feodo",
        "malware": malware,
        "status": _text("status"),
        "port": port,
        "country": _text("country"),
        "as_name": _text("as_name"),
    }
    # Only record the IP observable when the record actually carried one.
    observed_ips = [ip] if ip else []

    return Case(
        case_id=f"PSYC-FEODO-{ip}-{port}",
        summary=f"Feodo Tracker: {malware} botnet C2 at {ip}:{port}",
        source_type="abuse_feed",
        source_ref="https://feodotracker.abuse.ch/browse/",
        source_metadata=metadata,
        observed_at=_parse_dt(_text("first_seen"), "%Y-%m-%d %H:%M:%S"),
        observables=Observables(ips=observed_ips),
    )
def _fetch_feodo() -> List[Case]:
    """Download the Feodo Tracker blocklist JSON and turn each record into a Case."""
    records = _http_get(FEODO_BLOCKLIST_JSON).json()
    return list(map(_feodo_record_to_case, records))
# --- registry + dispatch -------------------------------------------------
# Registry of source name -> zero-argument fetcher returning that source's
# cases. fetch_and_signal resolves its `source` argument against this table
# and lists these keys in its "unknown source" error.
SOURCES: Dict[str, Callable[[], List[Case]]] = {
"urlhaus": _fetch_urlhaus,
"cisa-kev": _fetch_cisa_kev,
"feodo": _fetch_feodo,
}
def fetch_and_signal(source: str, limit: Optional[int] = None) -> List[Case]:
    """Fetch one registered source and return at most ``limit`` of its cases.

    Args:
        source: Key into SOURCES naming the feed to pull.
        limit: Maximum number of cases to return. ``None`` returns everything
            the fetcher produced; zero or a negative value returns no cases.

    Returns:
        The cases produced by the source's fetcher, truncated to ``limit``.

    Raises:
        ValueError: if ``source`` is not a key of SOURCES.
    """
    fetcher = SOURCES.get(source)
    if fetcher is None:
        raise ValueError(f"unknown source: {source}; known: {', '.join(SOURCES)}")
    cases = fetcher()
    if limit is not None:
        # Clamp at zero: a negative slice bound would drop items from the END
        # of the list rather than cap the count, which is never what a
        # "max rows" limit means.
        cases = cases[: max(limit, 0)]
    _log.info("scout.fetched", source=source, count=len(cases))
    return cases