"""Newsline — start-page digest aggregation.""" from __future__ import annotations import pytest from sqlalchemy import create_engine from psyc import db from psyc.lines import ledger as ledger_line from psyc.lines import news from psyc.models import Outcome, Severity, TLP from conftest import make_case @pytest.fixture def fresh_db(tmp_path, monkeypatch): test_db = tmp_path / "test.db" eng = create_engine(f"sqlite:///{test_db}", future=True) db._metadata.create_all(eng, checkfirst=True) monkeypatch.setattr(db, "_engine", eng) monkeypatch.setattr(db, "DB_PATH", test_db) yield test_db def test_kpis_count_basic(fresh_db): # vary age_days so make_case() doesn't collide its case_id db.upsert_case(make_case(feed="urlhaus", urls=["http://1.1.1.1/x"], severity=Severity.LOW, age_days=1)) db.upsert_case(make_case(feed="feodo", ips=["2.2.2.2"], severity=Severity.HIGH, age_days=0)) db.upsert_case(make_case(feed="urlhaus", urls=["http://3.3.3.3/x"], severity=Severity.CRITICAL, age_days=2)) k = news.kpis() assert k["cases"] == 3 assert k["high_total"] == 2 # high + critical def test_recent_items_interleaves_ledger_and_cases(fresh_db): c = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH) db.upsert_case(c) ledger_line.write( case_id=c.case_id, destination="CERT-Bund", payload_hash="", submitter_identity="x", tlp=TLP.AMBER, outcome=Outcome.ACTIONED, ) items = news.recent_items(limit=10) kinds = {i.kind for i in items} assert "case" in kinds assert "enforced" in kinds # newest-first ordering assert items == sorted(items, key=lambda i: i.timestamp, reverse=True) def test_feed_health_groups_by_feed(fresh_db): db.upsert_case(make_case(feed="urlhaus", urls=["http://a/"], age_days=1)) db.upsert_case(make_case(feed="urlhaus", urls=["http://b/"], age_days=2)) db.upsert_case(make_case(feed="otx", ips=["1.1.1.1"], age_days=1)) h = news.feed_health() by_feed = {f.feed: f for f in h} assert by_feed["urlhaus"].count == 2 assert by_feed["otx"].count == 1 # highest count first assert h[0].feed == "urlhaus" def test_bucket_items_groups_by_recency(fresh_db): from datetime import datetime, timedelta, timezone now = datetime.now(timezone.utc) items = [ news.NewsItem(timestamp=now, kind="case", headline="t1", body="", icon="•"), news.NewsItem(timestamp=now - timedelta(days=1), kind="case", headline="t2", body="", icon="•"), news.NewsItem(timestamp=now - timedelta(days=3), kind="case", headline="t3", body="", icon="•"), news.NewsItem(timestamp=now - timedelta(days=14), kind="case", headline="t4", body="", icon="•"), ] labels = [b.label for b in news.bucket_items(items)] # all four buckets should appear, in chronological order assert labels == ["Today", "Yesterday", "Earlier this week", "Older"] def test_featured_case_picks_highest_severity(fresh_db): db.upsert_case(make_case(feed="urlhaus", urls=["http://1.1.1.1/x"], severity=Severity.LOW, age_days=0)) db.upsert_case(make_case(feed="urlhaus", urls=["http://2.2.2.2/x"], severity=Severity.HIGH, age_days=1)) db.upsert_case(make_case(feed="urlhaus", urls=["http://3.3.3.3/x"], severity=Severity.CRITICAL, age_days=2)) f = news.featured_case() assert f is not None assert f.classification.severity is Severity.CRITICAL def test_featured_case_none_when_nothing_high(fresh_db): db.upsert_case(make_case(feed="urlhaus", urls=["http://1.1.1.1/x"], severity=Severity.LOW, age_days=0)) db.upsert_case(make_case(feed="urlhaus", urls=["http://2.2.2.2/x"], severity=Severity.MEDIUM, age_days=1)) assert news.featured_case() is None def test_outcome_kinds_match_render_map(fresh_db): # Every Outcome should produce a NewsItem (no KeyError). c = make_case(feed="urlhaus", ips=["1.2.3.4"]) db.upsert_case(c) for outcome in Outcome: ledger_line.write( case_id=c.case_id, destination="X", payload_hash="", submitter_identity="x", tlp=TLP.AMBER, outcome=outcome, ) items = news.recent_items(limit=200) # at least one item per outcome we wrote assert len([i for i in items if i.kind != "case"]) >= len(list(Outcome))