diff --git a/src/psyc/cli.py b/src/psyc/cli.py index 8af34f8..12ecdfe 100644 --- a/src/psyc/cli.py +++ b/src/psyc/cli.py @@ -9,6 +9,7 @@ import uvicorn from psyc import db, log from psyc.lines import classify, courier, route, scout, seal +from psyc.lines import map as map_line from psyc.models import Outcome from psyc.result import Err, Ok @@ -65,6 +66,30 @@ def classify_all() -> None: typer.echo(f"classified {len(cases)} case(s).") +@app.command("map-case") +def map_case(case_id: str) -> None: + case_result = db.get_case(case_id) + if isinstance(case_result, Err): + typer.echo(f"error: {case_result.reason}", err=True) + raise typer.Exit(1) + case = map_line.resolve(case_result.value) + db.upsert_case(case) + typer.echo(f"mapped {case.case_id}: country={case.victim.country or '—'} ips={case.observables.ips}") + + +@app.command("map-all") +def map_all(limit: int = typer.Option(50, help="max cases to process this run")) -> None: + cases = db.list_cases(limit=limit) + resolved = 0 + for c in cases: + before = c.victim.country + mapped = map_line.resolve(c) + if mapped.victim.country != before: + db.upsert_case(mapped) + resolved += 1 + typer.echo(f"resolved {resolved} new country/ies across {len(cases)} case(s).") + + @app.command("seal-keys-gen") def seal_keys_gen(recipient: str) -> None: path = seal.generate_recipient_keys(recipient) @@ -202,6 +227,9 @@ def demo() -> None: f"{case.classification.severity.value if case.classification.severity else '—'} / " f"TLP:{case.classification.tlp.value}" ) + case = map_line.resolve(case) + db.upsert_case(case) + typer.echo(f" + mapped: hosting country = {case.victim.country or '—'}") plaintext = case.model_dump_json().encode("utf-8") metadata = dict( case_id=case.case_id, diff --git a/src/psyc/lines/map.py b/src/psyc/lines/map.py new file mode 100644 index 0000000..7a69dfd --- /dev/null +++ b/src/psyc/lines/map.py @@ -0,0 +1,90 @@ +"""Mapline — victim / actor / jurisdiction resolution. + +Current worker: GeoResolver. Resolves a case's primary host to a country code +via ip-api.com (free, no auth, 45 req/min). For malicious-infrastructure +cases (URLhaus etc.) "victim.country" carries the hosting-country semantic; +documented in psyc.lines.route's destination policy. +""" + +from __future__ import annotations + +import socket +from typing import Optional +from urllib.parse import urlparse + +import httpx + +from psyc import log +from psyc.models import Case +from psyc.result import Err, Ok, Result + + +_log = log.get(__name__) + +IP_API_URL = "http://ip-api.com/json/{ip}?fields=status,country,countryCode,isp,as,message" +USER_AGENT = "psyc/0.1 (defensive CTI; hackathon prototype)" +DNS_TIMEOUT_SECONDS = 3.0 + + +def resolve(case: Case) -> Case: + if case.victim.country: + return case + ip = _case_ip(case) + if not ip: + _log.info("map.geo.no_target", case_id=case.case_id) + return case + country_result = _geoip_country(ip) + if isinstance(country_result, Err): + _log.info("map.geo.skip", case_id=case.case_id, ip=ip, reason=country_result.reason) + return case + case.victim.country = country_result.value + if ip not in case.observables.ips: + case.observables.ips.append(ip) + _log.info("map.geo.resolved", case_id=case.case_id, ip=ip, country=country_result.value) + return case + + +def _case_ip(case: Case) -> Optional[str]: + if case.observables.ips: + return case.observables.ips[0] + if case.observables.domains: + return _resolve_dns(case.observables.domains[0]) + if case.observables.urls: + host = urlparse(case.observables.urls[0]).hostname + if host: + return _resolve_dns(host) + return None + + +def _resolve_dns(host: str) -> Optional[str]: + if _looks_like_ip(host): + return host + prev_timeout = socket.getdefaulttimeout() + socket.setdefaulttimeout(DNS_TIMEOUT_SECONDS) + try: + return socket.gethostbyname(host) + except (socket.gaierror, socket.herror, socket.timeout): + return None + finally: + socket.setdefaulttimeout(prev_timeout) + + +def _looks_like_ip(host: str) -> bool: + parts = host.split(".") + return len(parts) == 4 and all(p.isdigit() and 0 <= int(p) < 256 for p in parts) + + +def _geoip_country(ip: str, timeout: float = 5.0) -> Result[str, str]: + try: + with httpx.Client(timeout=timeout, headers={"User-Agent": USER_AGENT}) as client: + r = client.get(IP_API_URL.format(ip=ip)) + r.raise_for_status() + body = r.json() + except httpx.HTTPError as exc: + return Err(f"ip-api request failed: {exc}") + if body.get("status") != "success": + return Err(f"ip-api: {body.get('message', 'unknown error')}") + code = body.get("countryCode") + if not code: + return Err("ip-api returned no countryCode") + return Ok(code)