stage-14: pytest test suite over the worker lines

38 tests covering the pure worker-line logic: Classifyline rules, Routeline
TLP/country/incident-type gates, Sealine seal/unseal round-trip, Proofline
confidence scoring, Mapline CVEResolver escalation, Trainline dataset
well-posedness (the v1/v3 input-signal bugs are now regression-guarded), and
the Scoutline feed parsers. pytest added as a dev extra.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
m17hr1l
2026-05-18 23:36:41 +02:00
parent bc61b9a3a1
commit e504b3dbcf
9 changed files with 403 additions and 0 deletions

View File

@@ -20,12 +20,18 @@ dependencies = [
"sqlalchemy>=2.0", "sqlalchemy>=2.0",
] ]
[project.optional-dependencies]
dev = ["pytest>=8.0"]
[project.scripts] [project.scripts]
psyc = "psyc.cli:app" psyc = "psyc.cli:app"
[tool.hatch.build.targets.wheel] [tool.hatch.build.targets.wheel]
packages = ["src/psyc"] packages = ["src/psyc"]
[tool.pytest.ini_options]
testpaths = ["tests"]
[tool.ruff] [tool.ruff]
line-length = 120 line-length = 120

40
tests/conftest.py Normal file
View File

@@ -0,0 +1,40 @@
"""Shared test fixtures — Case builders for the worker-line tests."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Optional
import pytest
from psyc.models import Case, Classification, IncidentType, Observables, Severity, TLP
def make_case(
feed: str = "urlhaus",
incident_type: Optional[IncidentType] = None,
severity: Optional[Severity] = None,
tlp: TLP = TLP.AMBER,
country: str = "",
age_days: int = 1,
**observables: list,
) -> Case:
"""A Case with controllable feed, classification, age, and observables."""
case = Case(
case_id=f"TEST-{feed}-{age_days}",
summary=f"test case from {feed}",
source_type="abuse_feed",
observed_at=datetime.now(timezone.utc) - timedelta(days=age_days),
observables=Observables(**observables),
classification=Classification(incident_type=incident_type, severity=severity, tlp=tlp),
)
case.source_metadata["feed"] = feed
case.victim.country = country
return case
@pytest.fixture
def urlhaus_case() -> Case:
c = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], domains=["1.2.3.4"], ips=["1.2.3.4"])
c.source_metadata["url_status"] = "online"
return c

59
tests/test_classify.py Normal file
View File

@@ -0,0 +1,59 @@
"""Classifyline rule tests."""
from __future__ import annotations
from psyc.lines.classify import classify
from psyc.models import IncidentType, InternalClass, Severity, TLP
from conftest import make_case
def test_urlhaus_feed_is_malware():
case = classify(make_case(feed="urlhaus", urls=["http://1.2.3.4/x"]))
assert case.classification.incident_type is IncidentType.MALWARE
assert case.classification.tlp is TLP.GREEN
def test_cisa_kev_feed_is_exploit():
case = classify(make_case(feed="cisa-kev", cves=["CVE-2026-0001"]))
assert case.classification.incident_type is IncidentType.EXPLOIT
def test_feodo_feed_is_botnet():
case = classify(make_case(feed="feodo", ips=["1.2.3.4"]))
assert case.classification.incident_type is IncidentType.BOTNET
def test_malware_severity_tracks_url_status():
online = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"])
online.source_metadata["url_status"] = "online"
assert classify(online).classification.severity is Severity.HIGH
offline = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"])
offline.source_metadata["url_status"] = "offline"
assert classify(offline).classification.severity is Severity.MEDIUM
def test_ransomware_kev_is_critical():
case = make_case(feed="cisa-kev", cves=["CVE-2026-0001"])
case.source_metadata["ransomware"] = "Known"
assert classify(case).classification.severity is Severity.CRITICAL
def test_critical_infrastructure_forces_critical():
case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"])
case.victim.critical_infrastructure = True
assert classify(case).classification.severity is Severity.CRITICAL
def test_internal_class_from_severity():
assert classify(make_case(feed="cisa-kev", cves=["CVE-2026-1"])).classification.internal_class is InternalClass.C
crit = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"])
crit.victim.critical_infrastructure = True
assert classify(crit).classification.internal_class is InternalClass.A
def test_classify_is_idempotent():
case = classify(make_case(feed="urlhaus", urls=["http://1.2.3.4/x"]))
first = case.classification.model_copy(deep=True)
classify(case)
assert case.classification == first

42
tests/test_map.py Normal file
View File

@@ -0,0 +1,42 @@
"""Mapline tests — CVEResolver KEV cross-check."""
from __future__ import annotations
from psyc.lines.map import _looks_like_ip, kev_cve_set, resolve_cves
from psyc.models import Severity
from conftest import make_case
def test_kev_cve_set_only_from_kev_cases():
kev = make_case(feed="cisa-kev", cves=["CVE-2026-0001"])
urlhaus = make_case(feed="urlhaus", cves=["CVE-2099-9999"]) # not KEV-sourced
assert kev_cve_set([kev, urlhaus]) == {"CVE-2026-0001"}
def test_resolve_cves_flags_and_escalates_non_kev_case():
kev_set = {"CVE-2026-0001"}
case = make_case(feed="urlhaus", cves=["CVE-2026-0001"], severity=Severity.LOW)
resolve_cves(case, kev_set)
assert case.source_metadata["kev_cves"] == "CVE-2026-0001"
assert case.classification.severity is Severity.HIGH
def test_resolve_cves_does_not_escalate_kev_source_case():
kev_set = {"CVE-2026-0001"}
case = make_case(feed="cisa-kev", cves=["CVE-2026-0001"], severity=Severity.LOW)
resolve_cves(case, kev_set)
# its own CVE is in KEV by definition — no self-escalation
assert case.classification.severity is Severity.LOW
def test_resolve_cves_noop_without_match():
case = make_case(feed="urlhaus", cves=["CVE-2099-9999"], severity=Severity.MEDIUM)
resolve_cves(case, {"CVE-2026-0001"})
assert "kev_cves" not in case.source_metadata
assert case.classification.severity is Severity.MEDIUM
def test_looks_like_ip():
assert _looks_like_ip("8.8.8.8")
assert not _looks_like_ip("example.com")
assert not _looks_like_ip("999.1.1.1")

42
tests/test_proof.py Normal file
View File

@@ -0,0 +1,42 @@
"""Proofline confidence-scoring tests."""
from __future__ import annotations
from psyc.lines.proof import prove
from conftest import make_case
def test_kev_source_is_high_confidence():
case = prove(make_case(feed="cisa-kev", cves=["CVE-2026-0001"], age_days=1))
assert case.confidence.source_reliability == "A"
assert case.confidence.level == "high"
def test_urlhaus_source_is_medium_confidence():
case = prove(make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], age_days=1))
assert case.confidence.source_reliability == "B"
assert case.confidence.level == "medium"
def test_freshness_buckets():
assert prove(make_case(age_days=1)).confidence.freshness == "new"
assert prove(make_case(age_days=7)).confidence.freshness == "recent"
assert prove(make_case(age_days=30)).confidence.freshness == "stale"
assert prove(make_case(age_days=200)).confidence.freshness == "resurfaced"
def test_stale_kev_case_is_docked_to_medium():
case = prove(make_case(feed="cisa-kev", cves=["CVE-2026-0001"], age_days=200))
assert case.confidence.level == "medium" # high docked by staleness
def test_malformed_ioc_drops_confidence_to_low():
case = make_case(feed="cisa-kev", ips=["999.999.0.1"], age_days=1)
proved = prove(case)
assert proved.confidence.iocs_valid is False
assert proved.confidence.level == "low"
def test_valid_iocs_pass():
case = prove(make_case(feed="urlhaus", ips=["8.8.8.8"], cves=["CVE-2026-1234"], age_days=1))
assert case.confidence.iocs_valid is True

52
tests/test_route.py Normal file
View File

@@ -0,0 +1,52 @@
"""Routeline policy-gate tests."""
from __future__ import annotations
from psyc.lines.route import plan
from psyc.models import IncidentType, Severity, TLP
from conftest import make_case
def _dest_names(routes):
return {r.destination_name for r in routes}
def _blocked_reasons(blocked):
return {b.destination_name: b.reason for b in blocked}
def test_green_malware_routes_to_misp_and_urlhaus():
case = make_case(incident_type=IncidentType.MALWARE, severity=Severity.HIGH, tlp=TLP.GREEN)
routes, blocked = plan(case)
assert {"MISP-Community", "URLhaus"} <= _dest_names(routes)
def test_tlp_ceiling_blocks_abuseipdb():
# AbuseIPDB max TLP is CLEAR; a GREEN case must be blocked there
case = make_case(incident_type=IncidentType.MALWARE, severity=Severity.HIGH, tlp=TLP.GREEN)
_, blocked = plan(case)
assert _blocked_reasons(blocked).get("AbuseIPDB") == "tlp_exceeded"
def test_country_gate_blocks_cert_bund_when_not_de():
case = make_case(incident_type=IncidentType.MALWARE, severity=Severity.HIGH, tlp=TLP.AMBER, country="CN")
_, blocked = plan(case)
assert _blocked_reasons(blocked).get("CERT-Bund") == "country_mismatch"
def test_country_gate_allows_cert_bund_for_de():
case = make_case(incident_type=IncidentType.MALWARE, severity=Severity.HIGH, tlp=TLP.AMBER, country="DE")
routes, _ = plan(case)
assert "CERT-Bund" in _dest_names(routes)
def test_incident_type_gate_blocks_urlhaus_for_non_malware():
case = make_case(incident_type=IncidentType.BOTNET, severity=Severity.HIGH, tlp=TLP.GREEN)
_, blocked = plan(case)
assert _blocked_reasons(blocked).get("URLhaus") == "incident_type_mismatch"
def test_routes_sorted_by_priority():
case = make_case(incident_type=IncidentType.MALWARE, severity=Severity.HIGH, tlp=TLP.GREEN, country="DE")
routes, _ = plan(case)
assert [r.priority for r in routes] == sorted(r.priority for r in routes)

49
tests/test_scout.py Normal file
View File

@@ -0,0 +1,49 @@
"""Scoutline parser tests — feed rows to normalized Case objects."""
from __future__ import annotations
from psyc.lines.scout import _feodo_record_to_case, _kev_vuln_to_case, _parse_urlhaus_csv
URLHAUS_CSV = """\
# comment line
"3846688","2026-05-14 11:01:14","http://1.2.3.4/x","online","2026-05-14","malware_download","elf,mirai","https://urlhaus.abuse.ch/url/3846688/","reporter1"
"""
def test__parse_urlhaus_csv_skips_comments_and_parses_rows():
rows = list(_parse_urlhaus_csv(URLHAUS_CSV))
assert len(rows) == 1
assert rows[0]["url"] == "http://1.2.3.4/x"
assert rows[0]["url_status"] == "online"
def test_kev_vuln_to_case():
vuln = {
"cveID": "CVE-2026-0300",
"vendorProject": "Microsoft",
"product": "Exchange",
"vulnerabilityName": "Exchange XSS",
"dateAdded": "2026-05-15",
"knownRansomwareCampaignUse": "Known",
}
case = _kev_vuln_to_case(vuln)
assert case.case_id == "PSYC-KEV-CVE-2026-0300"
assert case.observables.cves == ["CVE-2026-0300"]
assert case.source_metadata["feed"] == "cisa-kev"
assert case.source_metadata["ransomware"] == "Known"
def test_feodo_record_to_case():
record = {
"ip_address": "162.243.103.246",
"port": 8080,
"status": "online",
"malware": "Emotet",
"country": "US",
"first_seen": "2022-06-04 21:24:53",
}
case = _feodo_record_to_case(record)
assert case.observables.ips == ["162.243.103.246"]
assert case.source_metadata["feed"] == "feodo"
assert case.source_metadata["malware"] == "Emotet"
assert case.source_metadata["status"] == "online"

58
tests/test_seal.py Normal file
View File

@@ -0,0 +1,58 @@
"""Sealine — sealed-box encryption round-trip tests."""
from __future__ import annotations
import pytest
from psyc.lines import seal
from psyc.result import Err, Ok
@pytest.fixture(autouse=True)
def _isolate_seal_dirs(tmp_path, monkeypatch):
monkeypatch.setattr(seal, "KEYS_DIR", tmp_path / "keys")
monkeypatch.setattr(seal, "SEALED_DIR", tmp_path / "sealed")
def test_seal_unseal_round_trip():
seal.generate_recipient_keys("CERT-Test")
plaintext = b'{"case": "evidence", "secret": true}'
pkg = seal.seal(plaintext, ["CERT-Test"])
assert isinstance(pkg, Ok)
out = seal.unseal(pkg.value.package_id, "CERT-Test")
assert isinstance(out, Ok)
assert out.value == plaintext
def test_seal_to_unknown_recipient_errors():
result = seal.seal(b"data", ["Nobody"])
assert isinstance(result, Err)
def test_unseal_with_wrong_recipient_errors():
seal.generate_recipient_keys("CERT-A")
seal.generate_recipient_keys("CERT-B")
pkg = seal.seal(b"data", ["CERT-A"])
assert isinstance(pkg, Ok)
# CERT-B has keys but is not on the package
assert isinstance(seal.unseal(pkg.value.package_id, "CERT-B"), Err)
def test_multi_recipient_each_can_unseal():
seal.generate_recipient_keys("CERT-Bund")
seal.generate_recipient_keys("MISP")
plaintext = b"shared evidence"
pkg = seal.seal(plaintext, ["CERT-Bund", "MISP"])
assert isinstance(pkg, Ok)
for recipient in ("CERT-Bund", "MISP"):
out = seal.unseal(pkg.value.package_id, recipient)
assert isinstance(out, Ok) and out.value == plaintext
def test_plaintext_hash_recorded():
import hashlib
seal.generate_recipient_keys("R")
plaintext = b"hash me"
pkg = seal.seal(plaintext, ["R"])
assert isinstance(pkg, Ok)
assert pkg.value.plaintext_hash == hashlib.sha256(plaintext).hexdigest()

55
tests/test_train.py Normal file
View File

@@ -0,0 +1,55 @@
"""Trainline dataset-builder tests — the well-posedness properties."""
from __future__ import annotations
import json
from psyc.lines.train import (
_ex_ioc_extraction,
_ex_severity_classification,
quality_gate,
)
from psyc.models import IncidentType, Severity, TLP
from conftest import make_case
def test_ioc_extraction_is_well_posed():
"""Every IOC in the output must also appear in the input — the v1 bug."""
case = make_case(feed="urlhaus", urls=["http://1.2.3.4:8080/x"], domains=["1.2.3.4"], ips=["1.2.3.4"])
ex = _ex_ioc_extraction(case)
assert ex is not None
output = json.loads(ex.output)
for bucket in output.values():
for ioc in bucket:
assert ioc in ex.input, f"{ioc!r} not derivable from the input"
def test_ioc_extraction_includes_cve_only_cases():
case = make_case(feed="cisa-kev", cves=["CVE-2026-0001"])
ex = _ex_ioc_extraction(case)
assert ex is not None
assert "CVE-2026-0001" in ex.input
def test_severity_input_carries_status_signal():
"""The severity task input must contain the online/offline status (v3 bug)."""
case = make_case(feed="feodo", ips=["1.2.3.4"], severity=Severity.HIGH,
incident_type=IncidentType.BOTNET)
case.source_metadata["status"] = "online"
ex = _ex_severity_classification(case)
assert ex is not None
assert "online" in ex.input
def test_quality_gate_drops_tlp_red():
case = make_case(tlp=TLP.RED, urls=["http://1.2.3.4/x"])
ex = _ex_ioc_extraction(case)
assert ex is not None
assert quality_gate(ex, case) == "tlp_red"
def test_quality_gate_passes_clean_example():
case = make_case(feed="urlhaus", tlp=TLP.GREEN, urls=["http://1.2.3.4/x"], ips=["1.2.3.4"])
ex = _ex_ioc_extraction(case)
assert ex is not None
assert quality_gate(ex, case) is None