stage-12: Proofline — confidence scoring + brighter logo glow

New lines/proof.py: IOCChecker validates observables are well-formed,
FreshnessChecker ages observed_at into new/recent/stale/resurfaced,
ConfidenceScorer sets the Admiralty source-reliability code and an overall
confidence level. Fills case.confidence (previously left at defaults).
CLI prove-case / prove-all; folded into psyc demo. Logo glow strengthened
to a solid-cyan drop-shadow so it reads against the dark topbar.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
m17hr1l
2026-05-18 23:28:37 +02:00
parent ce375211f3
commit 94e17d4452
4 changed files with 127 additions and 5 deletions

View File

@@ -9,7 +9,7 @@ import uvicorn
from psyc import db, log
from psyc.cockpit import inference
from psyc.lines import classify, courier, route, scout, seal, train
from psyc.lines import classify, courier, proof, route, scout, seal, train
from psyc.lines import map as map_line
from psyc.models import Outcome
from psyc.result import Err, Ok
@@ -87,6 +87,27 @@ def classify_all() -> None:
typer.echo(f"classified {len(cases)} case(s).")
@app.command("prove-case")
def prove_case(case_id: str) -> None:
case_result = db.get_case(case_id)
if isinstance(case_result, Err):
typer.echo(f"error: {case_result.reason}", err=True)
raise typer.Exit(1)
case = proof.prove(case_result.value)
db.upsert_case(case)
c = case.confidence
typer.echo(f"proved {case.case_id}: confidence {c.level} · reliability {c.source_reliability}{c.information_credibility} · {c.freshness}")
@app.command("prove-all")
def prove_all() -> None:
cases = db.list_cases(limit=10_000)
for c in cases:
proof.prove(c)
db.upsert_case(c)
typer.echo(f"proved {len(cases)} case(s).")
@app.command("map-case")
def map_case(case_id: str) -> None:
case_result = db.get_case(case_id)
@@ -285,6 +306,9 @@ def demo() -> None:
f"{case.classification.severity.value if case.classification.severity else ''} / "
f"TLP:{case.classification.tlp.value}"
)
case = proof.prove(case)
db.upsert_case(case)
typer.echo(f" + proved: confidence {case.confidence.level} · reliability {case.confidence.source_reliability}{case.confidence.information_credibility} · {case.confidence.freshness}")
case = map_line.resolve(case)
db.upsert_case(case)
typer.echo(f" + mapped: hosting country = {case.victim.country or ''}")

View File

@@ -73,7 +73,7 @@ h1, h2, h3,
height: 48px; width: 160px;
object-fit: cover; object-position: center;
display: block;
filter: drop-shadow(0 0 3px var(--accent-glow)) drop-shadow(0 0 11px var(--accent-glow));
filter: drop-shadow(0 0 5px var(--accent)) drop-shadow(0 0 15px rgba(30, 200, 255, 0.5));
}
.brand-sub {
color: var(--muted); font-size: 11px;

96
src/psyc/lines/proof.py Normal file
View File

@@ -0,0 +1,96 @@
"""Proofline — validates indicators, measures freshness, scores confidence.
Workers: IOCChecker (are the observables well-formed?), FreshnessChecker (how
old is the signal?), ConfidenceScorer (Admiralty source-reliability + an
overall confidence level). Fills case.confidence; runs after Scoutline.
"""
from __future__ import annotations
import re
from datetime import datetime, timezone
from psyc import log
from psyc.models import Case
_log = log.get(__name__)
_CVE_RE = re.compile(r"^CVE-\d{4}-\d{4,}$", re.IGNORECASE)
_IPV4_RE = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")
_DOMAIN_RE = re.compile(r"^[a-z0-9-]+(\.[a-z0-9-]+)+$", re.IGNORECASE)
_SHA_RE = re.compile(r"^[a-fA-F0-9]{32,64}$")
# feed -> (Admiralty source reliability A-F, information credibility 1-6)
_FEED_RELIABILITY = {
"cisa-kev": ("A", "1"), # government catalog, confirmed exploited
"urlhaus": ("B", "2"), # established CTI source, confirmed malware
"feodo": ("B", "2"), # established CTI source, confirmed C2
}
def prove(case: Case) -> Case:
_check_iocs(case)
_check_freshness(case)
_score_confidence(case)
_log.info(
"proof.scored",
case_id=case.case_id,
level=case.confidence.level,
reliability=case.confidence.source_reliability,
freshness=case.confidence.freshness,
iocs_valid=case.confidence.iocs_valid,
)
return case
def _ipv4_ok(value: str) -> bool:
if not _IPV4_RE.match(value):
return False
return all(0 <= int(part) <= 255 for part in value.split("."))
def _check_iocs(case: Case) -> None:
obs = case.observables
valid = True
for ip in obs.ips:
if not _ipv4_ok(ip):
valid = False
for cve in obs.cves:
if not _CVE_RE.match(cve):
valid = False
for h in obs.hashes:
if not _SHA_RE.match(h):
valid = False
for d in obs.domains:
# an IP used as a host is a valid observable even though it is not a domain
if not (_DOMAIN_RE.match(d) or _ipv4_ok(d)):
valid = False
case.confidence.iocs_valid = valid
def _check_freshness(case: Case) -> None:
age_days = (datetime.now(timezone.utc) - case.observed_at).days
if age_days <= 2:
case.confidence.freshness = "new"
elif age_days <= 14:
case.confidence.freshness = "recent"
elif age_days <= 90:
case.confidence.freshness = "stale"
else:
case.confidence.freshness = "resurfaced"
def _score_confidence(case: Case) -> None:
feed = case.source_metadata.get("feed", "")
reliability, credibility = _FEED_RELIABILITY.get(feed, ("C", "3"))
case.confidence.source_reliability = reliability
case.confidence.information_credibility = credibility
# level: start from source reliability, then dock for staleness / bad IOCs
level = "high" if reliability == "A" else "medium"
if case.confidence.freshness in ("stale", "resurfaced"):
level = "medium" if level == "high" else "low"
if not case.confidence.iocs_valid:
level = "low"
case.confidence.level = level

View File

@@ -55,9 +55,11 @@ class Classification(BaseModel):
class Confidence(BaseModel):
level: str = "low"
source_reliability: str = "unknown"
information_credibility: str = "unknown"
level: str = "low" # low | medium | high
source_reliability: str = "unknown" # Admiralty A-F
information_credibility: str = "unknown" # Admiralty 1-6
freshness: str = "unknown" # new | recent | stale | resurfaced
iocs_valid: bool = True
class Victim(BaseModel):