stage-3a: Mapline GeoResolver — host IP → country via ip-api.com

Cases now carry a resolved hosting country, which feeds the country-scoped
destination policy. CN-hosted URLhaus malware correctly stays gated off
CERT-Bund (only DE) while still firing MISP-Community + URLhaus.
psyc demo runs the map step between classify and seal.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
m17hr1l
2026-05-14 14:13:31 +02:00
parent 3f18e5aa8e
commit da4792c179
2 changed files with 118 additions and 0 deletions

View File

@@ -9,6 +9,7 @@ import uvicorn
from psyc import db, log
from psyc.lines import classify, courier, route, scout, seal
from psyc.lines import map as map_line
from psyc.models import Outcome
from psyc.result import Err, Ok
@@ -65,6 +66,30 @@ def classify_all() -> None:
typer.echo(f"classified {len(cases)} case(s).")
@app.command("map-case")
def map_case(case_id: str) -> None:
case_result = db.get_case(case_id)
if isinstance(case_result, Err):
typer.echo(f"error: {case_result.reason}", err=True)
raise typer.Exit(1)
case = map_line.resolve(case_result.value)
db.upsert_case(case)
typer.echo(f"mapped {case.case_id}: country={case.victim.country or ''} ips={case.observables.ips}")
@app.command("map-all")
def map_all(limit: int = typer.Option(50, help="max cases to process this run")) -> None:
cases = db.list_cases(limit=limit)
resolved = 0
for c in cases:
before = c.victim.country
mapped = map_line.resolve(c)
if mapped.victim.country != before:
db.upsert_case(mapped)
resolved += 1
typer.echo(f"resolved {resolved} new country/ies across {len(cases)} case(s).")
@app.command("seal-keys-gen")
def seal_keys_gen(recipient: str) -> None:
path = seal.generate_recipient_keys(recipient)
@@ -202,6 +227,9 @@ def demo() -> None:
f"{case.classification.severity.value if case.classification.severity else ''} / "
f"TLP:{case.classification.tlp.value}"
)
case = map_line.resolve(case)
db.upsert_case(case)
typer.echo(f" + mapped: hosting country = {case.victim.country or ''}")
plaintext = case.model_dump_json().encode("utf-8")
metadata = dict(
case_id=case.case_id,

90
src/psyc/lines/map.py Normal file
View File

@@ -0,0 +1,90 @@
"""Mapline — victim / actor / jurisdiction resolution.
Current worker: GeoResolver. Resolves a case's primary host to a country code
via ip-api.com (free, no auth, 45 req/min). For malicious-infrastructure
cases (URLhaus etc.) "victim.country" carries the hosting-country semantic;
documented in psyc.lines.route's destination policy.
"""
from __future__ import annotations
import socket
from typing import Optional
from urllib.parse import urlparse
import httpx
from psyc import log
from psyc.models import Case
from psyc.result import Err, Ok, Result
_log = log.get(__name__)
IP_API_URL = "http://ip-api.com/json/{ip}?fields=status,country,countryCode,isp,as,message"
USER_AGENT = "psyc/0.1 (defensive CTI; hackathon prototype)"
DNS_TIMEOUT_SECONDS = 3.0
def resolve(case: Case) -> Case:
if case.victim.country:
return case
ip = _case_ip(case)
if not ip:
_log.info("map.geo.no_target", case_id=case.case_id)
return case
country_result = _geoip_country(ip)
if isinstance(country_result, Err):
_log.info("map.geo.skip", case_id=case.case_id, ip=ip, reason=country_result.reason)
return case
case.victim.country = country_result.value
if ip not in case.observables.ips:
case.observables.ips.append(ip)
_log.info("map.geo.resolved", case_id=case.case_id, ip=ip, country=country_result.value)
return case
def _case_ip(case: Case) -> Optional[str]:
if case.observables.ips:
return case.observables.ips[0]
if case.observables.domains:
return _resolve_dns(case.observables.domains[0])
if case.observables.urls:
host = urlparse(case.observables.urls[0]).hostname
if host:
return _resolve_dns(host)
return None
def _resolve_dns(host: str) -> Optional[str]:
if _looks_like_ip(host):
return host
prev_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(DNS_TIMEOUT_SECONDS)
try:
return socket.gethostbyname(host)
except (socket.gaierror, socket.herror, socket.timeout):
return None
finally:
socket.setdefaulttimeout(prev_timeout)
def _looks_like_ip(host: str) -> bool:
parts = host.split(".")
return len(parts) == 4 and all(p.isdigit() and 0 <= int(p) < 256 for p in parts)
def _geoip_country(ip: str, timeout: float = 5.0) -> Result[str, str]:
try:
with httpx.Client(timeout=timeout, headers={"User-Agent": USER_AGENT}) as client:
r = client.get(IP_API_URL.format(ip=ip))
r.raise_for_status()
body = r.json()
except httpx.HTTPError as exc:
return Err(f"ip-api request failed: {exc}")
if body.get("status") != "success":
return Err(f"ip-api: {body.get('message', 'unknown error')}")
code = body.get("countryCode")
if not code:
return Err("ip-api returned no countryCode")
return Ok(code)