From e504b3dbcfd3b435c2fce5d2b23143ff9bdbe5d0 Mon Sep 17 00:00:00 2001 From: m17hr1l Date: Mon, 18 May 2026 23:36:41 +0200 Subject: [PATCH] stage-14: pytest test suite over the worker lines 38 tests covering the pure worker-line logic: Classifyline rules, Routeline TLP/country/incident-type gates, Sealine seal/unseal round-trip, Proofline confidence scoring, Mapline CVEResolver escalation, Trainline dataset well-posedness (the v1/v3 input-signal bugs are now regression-guarded), and the Scoutline feed parsers. pytest added as a dev extra. Co-Authored-By: Claude Opus 4.7 --- pyproject.toml | 6 +++++ tests/conftest.py | 40 ++++++++++++++++++++++++++++ tests/test_classify.py | 59 ++++++++++++++++++++++++++++++++++++++++++ tests/test_map.py | 42 ++++++++++++++++++++++++++++++ tests/test_proof.py | 42 ++++++++++++++++++++++++++++++ tests/test_route.py | 52 +++++++++++++++++++++++++++++++++++++ tests/test_scout.py | 49 +++++++++++++++++++++++++++++++++++ tests/test_seal.py | 58 +++++++++++++++++++++++++++++++++++++++++ tests/test_train.py | 55 +++++++++++++++++++++++++++++++++++++++ 9 files changed, 403 insertions(+) create mode 100644 tests/conftest.py create mode 100644 tests/test_classify.py create mode 100644 tests/test_map.py create mode 100644 tests/test_proof.py create mode 100644 tests/test_route.py create mode 100644 tests/test_scout.py create mode 100644 tests/test_seal.py create mode 100644 tests/test_train.py diff --git a/pyproject.toml b/pyproject.toml index 39b2511..3f9ab98 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,12 +20,18 @@ dependencies = [ "sqlalchemy>=2.0", ] +[project.optional-dependencies] +dev = ["pytest>=8.0"] + [project.scripts] psyc = "psyc.cli:app" [tool.hatch.build.targets.wheel] packages = ["src/psyc"] +[tool.pytest.ini_options] +testpaths = ["tests"] + [tool.ruff] line-length = 120 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..61321e0 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,40 @@ +"""Shared test fixtures — Case builders for the worker-line tests.""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from typing import Optional + +import pytest + +from psyc.models import Case, Classification, IncidentType, Observables, Severity, TLP + + +def make_case( + feed: str = "urlhaus", + incident_type: Optional[IncidentType] = None, + severity: Optional[Severity] = None, + tlp: TLP = TLP.AMBER, + country: str = "", + age_days: int = 1, + **observables: list, +) -> Case: + """A Case with controllable feed, classification, age, and observables.""" + case = Case( + case_id=f"TEST-{feed}-{age_days}", + summary=f"test case from {feed}", + source_type="abuse_feed", + observed_at=datetime.now(timezone.utc) - timedelta(days=age_days), + observables=Observables(**observables), + classification=Classification(incident_type=incident_type, severity=severity, tlp=tlp), + ) + case.source_metadata["feed"] = feed + case.victim.country = country + return case + + +@pytest.fixture +def urlhaus_case() -> Case: + c = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], domains=["1.2.3.4"], ips=["1.2.3.4"]) + c.source_metadata["url_status"] = "online" + return c diff --git a/tests/test_classify.py b/tests/test_classify.py new file mode 100644 index 0000000..f97e3ea --- /dev/null +++ b/tests/test_classify.py @@ -0,0 +1,59 @@ +"""Classifyline rule tests.""" + +from __future__ import annotations + +from psyc.lines.classify import classify +from psyc.models import IncidentType, InternalClass, Severity, TLP +from conftest import make_case + + +def test_urlhaus_feed_is_malware(): + case = classify(make_case(feed="urlhaus", urls=["http://1.2.3.4/x"])) + assert case.classification.incident_type is IncidentType.MALWARE + assert case.classification.tlp is TLP.GREEN + + +def test_cisa_kev_feed_is_exploit(): + case = classify(make_case(feed="cisa-kev", cves=["CVE-2026-0001"])) + assert case.classification.incident_type is IncidentType.EXPLOIT + + +def test_feodo_feed_is_botnet(): + case = classify(make_case(feed="feodo", ips=["1.2.3.4"])) + assert case.classification.incident_type is IncidentType.BOTNET + + +def test_malware_severity_tracks_url_status(): + online = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"]) + online.source_metadata["url_status"] = "online" + assert classify(online).classification.severity is Severity.HIGH + + offline = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"]) + offline.source_metadata["url_status"] = "offline" + assert classify(offline).classification.severity is Severity.MEDIUM + + +def test_ransomware_kev_is_critical(): + case = make_case(feed="cisa-kev", cves=["CVE-2026-0001"]) + case.source_metadata["ransomware"] = "Known" + assert classify(case).classification.severity is Severity.CRITICAL + + +def test_critical_infrastructure_forces_critical(): + case = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"]) + case.victim.critical_infrastructure = True + assert classify(case).classification.severity is Severity.CRITICAL + + +def test_internal_class_from_severity(): + assert classify(make_case(feed="cisa-kev", cves=["CVE-2026-1"])).classification.internal_class is InternalClass.C + crit = make_case(feed="urlhaus", urls=["http://1.2.3.4/x"]) + crit.victim.critical_infrastructure = True + assert classify(crit).classification.internal_class is InternalClass.A + + +def test_classify_is_idempotent(): + case = classify(make_case(feed="urlhaus", urls=["http://1.2.3.4/x"])) + first = case.classification.model_copy(deep=True) + classify(case) + assert case.classification == first diff --git a/tests/test_map.py b/tests/test_map.py new file mode 100644 index 0000000..1cd2dad --- /dev/null +++ b/tests/test_map.py @@ -0,0 +1,42 @@ +"""Mapline tests — CVEResolver KEV cross-check.""" + +from __future__ import annotations + +from psyc.lines.map import _looks_like_ip, kev_cve_set, resolve_cves +from psyc.models import Severity +from conftest import make_case + + +def test_kev_cve_set_only_from_kev_cases(): + kev = make_case(feed="cisa-kev", cves=["CVE-2026-0001"]) + urlhaus = make_case(feed="urlhaus", cves=["CVE-2099-9999"]) # not KEV-sourced + assert kev_cve_set([kev, urlhaus]) == {"CVE-2026-0001"} + + +def test_resolve_cves_flags_and_escalates_non_kev_case(): + kev_set = {"CVE-2026-0001"} + case = make_case(feed="urlhaus", cves=["CVE-2026-0001"], severity=Severity.LOW) + resolve_cves(case, kev_set) + assert case.source_metadata["kev_cves"] == "CVE-2026-0001" + assert case.classification.severity is Severity.HIGH + + +def test_resolve_cves_does_not_escalate_kev_source_case(): + kev_set = {"CVE-2026-0001"} + case = make_case(feed="cisa-kev", cves=["CVE-2026-0001"], severity=Severity.LOW) + resolve_cves(case, kev_set) + # its own CVE is in KEV by definition — no self-escalation + assert case.classification.severity is Severity.LOW + + +def test_resolve_cves_noop_without_match(): + case = make_case(feed="urlhaus", cves=["CVE-2099-9999"], severity=Severity.MEDIUM) + resolve_cves(case, {"CVE-2026-0001"}) + assert "kev_cves" not in case.source_metadata + assert case.classification.severity is Severity.MEDIUM + + +def test_looks_like_ip(): + assert _looks_like_ip("8.8.8.8") + assert not _looks_like_ip("example.com") + assert not _looks_like_ip("999.1.1.1") diff --git a/tests/test_proof.py b/tests/test_proof.py new file mode 100644 index 0000000..67b5550 --- /dev/null +++ b/tests/test_proof.py @@ -0,0 +1,42 @@ +"""Proofline confidence-scoring tests.""" + +from __future__ import annotations + +from psyc.lines.proof import prove +from conftest import make_case + + +def test_kev_source_is_high_confidence(): + case = prove(make_case(feed="cisa-kev", cves=["CVE-2026-0001"], age_days=1)) + assert case.confidence.source_reliability == "A" + assert case.confidence.level == "high" + + +def test_urlhaus_source_is_medium_confidence(): + case = prove(make_case(feed="urlhaus", urls=["http://1.2.3.4/x"], age_days=1)) + assert case.confidence.source_reliability == "B" + assert case.confidence.level == "medium" + + +def test_freshness_buckets(): + assert prove(make_case(age_days=1)).confidence.freshness == "new" + assert prove(make_case(age_days=7)).confidence.freshness == "recent" + assert prove(make_case(age_days=30)).confidence.freshness == "stale" + assert prove(make_case(age_days=200)).confidence.freshness == "resurfaced" + + +def test_stale_kev_case_is_docked_to_medium(): + case = prove(make_case(feed="cisa-kev", cves=["CVE-2026-0001"], age_days=200)) + assert case.confidence.level == "medium" # high docked by staleness + + +def test_malformed_ioc_drops_confidence_to_low(): + case = make_case(feed="cisa-kev", ips=["999.999.0.1"], age_days=1) + proved = prove(case) + assert proved.confidence.iocs_valid is False + assert proved.confidence.level == "low" + + +def test_valid_iocs_pass(): + case = prove(make_case(feed="urlhaus", ips=["8.8.8.8"], cves=["CVE-2026-1234"], age_days=1)) + assert case.confidence.iocs_valid is True diff --git a/tests/test_route.py b/tests/test_route.py new file mode 100644 index 0000000..e2fcd7f --- /dev/null +++ b/tests/test_route.py @@ -0,0 +1,52 @@ +"""Routeline policy-gate tests.""" + +from __future__ import annotations + +from psyc.lines.route import plan +from psyc.models import IncidentType, Severity, TLP +from conftest import make_case + + +def _dest_names(routes): + return {r.destination_name for r in routes} + + +def _blocked_reasons(blocked): + return {b.destination_name: b.reason for b in blocked} + + +def test_green_malware_routes_to_misp_and_urlhaus(): + case = make_case(incident_type=IncidentType.MALWARE, severity=Severity.HIGH, tlp=TLP.GREEN) + routes, blocked = plan(case) + assert {"MISP-Community", "URLhaus"} <= _dest_names(routes) + + +def test_tlp_ceiling_blocks_abuseipdb(): + # AbuseIPDB max TLP is CLEAR; a GREEN case must be blocked there + case = make_case(incident_type=IncidentType.MALWARE, severity=Severity.HIGH, tlp=TLP.GREEN) + _, blocked = plan(case) + assert _blocked_reasons(blocked).get("AbuseIPDB") == "tlp_exceeded" + + +def test_country_gate_blocks_cert_bund_when_not_de(): + case = make_case(incident_type=IncidentType.MALWARE, severity=Severity.HIGH, tlp=TLP.AMBER, country="CN") + _, blocked = plan(case) + assert _blocked_reasons(blocked).get("CERT-Bund") == "country_mismatch" + + +def test_country_gate_allows_cert_bund_for_de(): + case = make_case(incident_type=IncidentType.MALWARE, severity=Severity.HIGH, tlp=TLP.AMBER, country="DE") + routes, _ = plan(case) + assert "CERT-Bund" in _dest_names(routes) + + +def test_incident_type_gate_blocks_urlhaus_for_non_malware(): + case = make_case(incident_type=IncidentType.BOTNET, severity=Severity.HIGH, tlp=TLP.GREEN) + _, blocked = plan(case) + assert _blocked_reasons(blocked).get("URLhaus") == "incident_type_mismatch" + + +def test_routes_sorted_by_priority(): + case = make_case(incident_type=IncidentType.MALWARE, severity=Severity.HIGH, tlp=TLP.GREEN, country="DE") + routes, _ = plan(case) + assert [r.priority for r in routes] == sorted(r.priority for r in routes) diff --git a/tests/test_scout.py b/tests/test_scout.py new file mode 100644 index 0000000..831c1e4 --- /dev/null +++ b/tests/test_scout.py @@ -0,0 +1,49 @@ +"""Scoutline parser tests — feed rows to normalized Case objects.""" + +from __future__ import annotations + +from psyc.lines.scout import _feodo_record_to_case, _kev_vuln_to_case, _parse_urlhaus_csv + +URLHAUS_CSV = """\ +# comment line +"3846688","2026-05-14 11:01:14","http://1.2.3.4/x","online","2026-05-14","malware_download","elf,mirai","https://urlhaus.abuse.ch/url/3846688/","reporter1" +""" + + +def test__parse_urlhaus_csv_skips_comments_and_parses_rows(): + rows = list(_parse_urlhaus_csv(URLHAUS_CSV)) + assert len(rows) == 1 + assert rows[0]["url"] == "http://1.2.3.4/x" + assert rows[0]["url_status"] == "online" + + +def test_kev_vuln_to_case(): + vuln = { + "cveID": "CVE-2026-0300", + "vendorProject": "Microsoft", + "product": "Exchange", + "vulnerabilityName": "Exchange XSS", + "dateAdded": "2026-05-15", + "knownRansomwareCampaignUse": "Known", + } + case = _kev_vuln_to_case(vuln) + assert case.case_id == "PSYC-KEV-CVE-2026-0300" + assert case.observables.cves == ["CVE-2026-0300"] + assert case.source_metadata["feed"] == "cisa-kev" + assert case.source_metadata["ransomware"] == "Known" + + +def test_feodo_record_to_case(): + record = { + "ip_address": "162.243.103.246", + "port": 8080, + "status": "online", + "malware": "Emotet", + "country": "US", + "first_seen": "2022-06-04 21:24:53", + } + case = _feodo_record_to_case(record) + assert case.observables.ips == ["162.243.103.246"] + assert case.source_metadata["feed"] == "feodo" + assert case.source_metadata["malware"] == "Emotet" + assert case.source_metadata["status"] == "online" diff --git a/tests/test_seal.py b/tests/test_seal.py new file mode 100644 index 0000000..60d01dc --- /dev/null +++ b/tests/test_seal.py @@ -0,0 +1,58 @@ +"""Sealine — sealed-box encryption round-trip tests.""" + +from __future__ import annotations + +import pytest + +from psyc.lines import seal +from psyc.result import Err, Ok + + +@pytest.fixture(autouse=True) +def _isolate_seal_dirs(tmp_path, monkeypatch): + monkeypatch.setattr(seal, "KEYS_DIR", tmp_path / "keys") + monkeypatch.setattr(seal, "SEALED_DIR", tmp_path / "sealed") + + +def test_seal_unseal_round_trip(): + seal.generate_recipient_keys("CERT-Test") + plaintext = b'{"case": "evidence", "secret": true}' + pkg = seal.seal(plaintext, ["CERT-Test"]) + assert isinstance(pkg, Ok) + out = seal.unseal(pkg.value.package_id, "CERT-Test") + assert isinstance(out, Ok) + assert out.value == plaintext + + +def test_seal_to_unknown_recipient_errors(): + result = seal.seal(b"data", ["Nobody"]) + assert isinstance(result, Err) + + +def test_unseal_with_wrong_recipient_errors(): + seal.generate_recipient_keys("CERT-A") + seal.generate_recipient_keys("CERT-B") + pkg = seal.seal(b"data", ["CERT-A"]) + assert isinstance(pkg, Ok) + # CERT-B has keys but is not on the package + assert isinstance(seal.unseal(pkg.value.package_id, "CERT-B"), Err) + + +def test_multi_recipient_each_can_unseal(): + seal.generate_recipient_keys("CERT-Bund") + seal.generate_recipient_keys("MISP") + plaintext = b"shared evidence" + pkg = seal.seal(plaintext, ["CERT-Bund", "MISP"]) + assert isinstance(pkg, Ok) + for recipient in ("CERT-Bund", "MISP"): + out = seal.unseal(pkg.value.package_id, recipient) + assert isinstance(out, Ok) and out.value == plaintext + + +def test_plaintext_hash_recorded(): + import hashlib + seal.generate_recipient_keys("R") + plaintext = b"hash me" + pkg = seal.seal(plaintext, ["R"]) + assert isinstance(pkg, Ok) + assert pkg.value.plaintext_hash == hashlib.sha256(plaintext).hexdigest() diff --git a/tests/test_train.py b/tests/test_train.py new file mode 100644 index 0000000..e7ab3cc --- /dev/null +++ b/tests/test_train.py @@ -0,0 +1,55 @@ +"""Trainline dataset-builder tests — the well-posedness properties.""" + +from __future__ import annotations + +import json + +from psyc.lines.train import ( + _ex_ioc_extraction, + _ex_severity_classification, + quality_gate, +) +from psyc.models import IncidentType, Severity, TLP +from conftest import make_case + + +def test_ioc_extraction_is_well_posed(): + """Every IOC in the output must also appear in the input — the v1 bug.""" + case = make_case(feed="urlhaus", urls=["http://1.2.3.4:8080/x"], domains=["1.2.3.4"], ips=["1.2.3.4"]) + ex = _ex_ioc_extraction(case) + assert ex is not None + output = json.loads(ex.output) + for bucket in output.values(): + for ioc in bucket: + assert ioc in ex.input, f"{ioc!r} not derivable from the input" + + +def test_ioc_extraction_includes_cve_only_cases(): + case = make_case(feed="cisa-kev", cves=["CVE-2026-0001"]) + ex = _ex_ioc_extraction(case) + assert ex is not None + assert "CVE-2026-0001" in ex.input + + +def test_severity_input_carries_status_signal(): + """The severity task input must contain the online/offline status (v3 bug).""" + case = make_case(feed="feodo", ips=["1.2.3.4"], severity=Severity.HIGH, + incident_type=IncidentType.BOTNET) + case.source_metadata["status"] = "online" + ex = _ex_severity_classification(case) + assert ex is not None + assert "online" in ex.input + + +def test_quality_gate_drops_tlp_red(): + case = make_case(tlp=TLP.RED, urls=["http://1.2.3.4/x"]) + ex = _ex_ioc_extraction(case) + assert ex is not None + assert quality_gate(ex, case) == "tlp_red" + + +def test_quality_gate_passes_clean_example(): + case = make_case(feed="urlhaus", tlp=TLP.GREEN, urls=["http://1.2.3.4/x"], ips=["1.2.3.4"]) + ex = _ex_ioc_extraction(case) + assert ex is not None + assert quality_gate(ex, case) is None