diff --git a/src/psyc/cli.py b/src/psyc/cli.py index d978e6c..b953400 100644 --- a/src/psyc/cli.py +++ b/src/psyc/cli.py @@ -9,7 +9,7 @@ import uvicorn from psyc import db, log from psyc.cockpit import inference -from psyc.lines import classify, courier, route, scout, seal, train +from psyc.lines import classify, courier, proof, route, scout, seal, train from psyc.lines import map as map_line from psyc.models import Outcome from psyc.result import Err, Ok @@ -87,6 +87,27 @@ def classify_all() -> None: typer.echo(f"classified {len(cases)} case(s).") +@app.command("prove-case") +def prove_case(case_id: str) -> None: + case_result = db.get_case(case_id) + if isinstance(case_result, Err): + typer.echo(f"error: {case_result.reason}", err=True) + raise typer.Exit(1) + case = proof.prove(case_result.value) + db.upsert_case(case) + c = case.confidence + typer.echo(f"proved {case.case_id}: confidence {c.level} · reliability {c.source_reliability}{c.information_credibility} · {c.freshness}") + + +@app.command("prove-all") +def prove_all() -> None: + cases = db.list_cases(limit=10_000) + for c in cases: + proof.prove(c) + db.upsert_case(c) + typer.echo(f"proved {len(cases)} case(s).") + + @app.command("map-case") def map_case(case_id: str) -> None: case_result = db.get_case(case_id) @@ -285,6 +306,9 @@ def demo() -> None: f"{case.classification.severity.value if case.classification.severity else '—'} / " f"TLP:{case.classification.tlp.value}" ) + case = proof.prove(case) + db.upsert_case(case) + typer.echo(f" + proved: confidence {case.confidence.level} · reliability {case.confidence.source_reliability}{case.confidence.information_credibility} · {case.confidence.freshness}") case = map_line.resolve(case) db.upsert_case(case) typer.echo(f" + mapped: hosting country = {case.victim.country or '—'}") diff --git a/src/psyc/cockpit/static/cockpit.css b/src/psyc/cockpit/static/cockpit.css index 8cb156b..828479a 100644 --- a/src/psyc/cockpit/static/cockpit.css +++ b/src/psyc/cockpit/static/cockpit.css @@ -73,7 +73,7 @@ h1, h2, h3, height: 48px; width: 160px; object-fit: cover; object-position: center; display: block; - filter: drop-shadow(0 0 3px var(--accent-glow)) drop-shadow(0 0 11px var(--accent-glow)); + filter: drop-shadow(0 0 5px var(--accent)) drop-shadow(0 0 15px rgba(30, 200, 255, 0.5)); } .brand-sub { color: var(--muted); font-size: 11px; diff --git a/src/psyc/lines/proof.py b/src/psyc/lines/proof.py new file mode 100644 index 0000000..1ff5afd --- /dev/null +++ b/src/psyc/lines/proof.py @@ -0,0 +1,96 @@ +"""Proofline — validates indicators, measures freshness, scores confidence. + +Workers: IOCChecker (are the observables well-formed?), FreshnessChecker (how +old is the signal?), ConfidenceScorer (Admiralty source-reliability + an +overall confidence level). Fills case.confidence; runs after Scoutline. +""" + +from __future__ import annotations + +import re +from datetime import datetime, timezone + +from psyc import log +from psyc.models import Case + + +_log = log.get(__name__) + +_CVE_RE = re.compile(r"^CVE-\d{4}-\d{4,}$", re.IGNORECASE) +_IPV4_RE = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$") +_DOMAIN_RE = re.compile(r"^[a-z0-9-]+(\.[a-z0-9-]+)+$", re.IGNORECASE) +_SHA_RE = re.compile(r"^[a-fA-F0-9]{32,64}$") + +# feed -> (Admiralty source reliability A-F, information credibility 1-6) +_FEED_RELIABILITY = { + "cisa-kev": ("A", "1"), # government catalog, confirmed exploited + "urlhaus": ("B", "2"), # established CTI source, confirmed malware + "feodo": ("B", "2"), # established CTI source, confirmed C2 +} + + +def prove(case: Case) -> Case: + _check_iocs(case) + _check_freshness(case) + _score_confidence(case) + _log.info( + "proof.scored", + case_id=case.case_id, + level=case.confidence.level, + reliability=case.confidence.source_reliability, + freshness=case.confidence.freshness, + iocs_valid=case.confidence.iocs_valid, + ) + return case + + +def _ipv4_ok(value: str) -> bool: + if not _IPV4_RE.match(value): + return False + return all(0 <= int(part) <= 255 for part in value.split(".")) + + +def _check_iocs(case: Case) -> None: + obs = case.observables + valid = True + for ip in obs.ips: + if not _ipv4_ok(ip): + valid = False + for cve in obs.cves: + if not _CVE_RE.match(cve): + valid = False + for h in obs.hashes: + if not _SHA_RE.match(h): + valid = False + for d in obs.domains: + # an IP used as a host is a valid observable even though it is not a domain + if not (_DOMAIN_RE.match(d) or _ipv4_ok(d)): + valid = False + case.confidence.iocs_valid = valid + + +def _check_freshness(case: Case) -> None: + age_days = (datetime.now(timezone.utc) - case.observed_at).days + if age_days <= 2: + case.confidence.freshness = "new" + elif age_days <= 14: + case.confidence.freshness = "recent" + elif age_days <= 90: + case.confidence.freshness = "stale" + else: + case.confidence.freshness = "resurfaced" + + +def _score_confidence(case: Case) -> None: + feed = case.source_metadata.get("feed", "") + reliability, credibility = _FEED_RELIABILITY.get(feed, ("C", "3")) + case.confidence.source_reliability = reliability + case.confidence.information_credibility = credibility + + # level: start from source reliability, then dock for staleness / bad IOCs + level = "high" if reliability == "A" else "medium" + if case.confidence.freshness in ("stale", "resurfaced"): + level = "medium" if level == "high" else "low" + if not case.confidence.iocs_valid: + level = "low" + case.confidence.level = level diff --git a/src/psyc/models.py b/src/psyc/models.py index 748b070..f49ddca 100644 --- a/src/psyc/models.py +++ b/src/psyc/models.py @@ -55,9 +55,11 @@ class Classification(BaseModel): class Confidence(BaseModel): - level: str = "low" - source_reliability: str = "unknown" - information_credibility: str = "unknown" + level: str = "low" # low | medium | high + source_reliability: str = "unknown" # Admiralty A-F + information_credibility: str = "unknown" # Admiralty 1-6 + freshness: str = "unknown" # new | recent | stale | resurfaced + iocs_valid: bool = True class Victim(BaseModel):