The home page goes from a flat event stream to something that reads like a news blog:

- Featured-case hero card at the top, picked as the highest-severity case (CRITICAL > HIGH, ties broken by recency) from the last 7 days. Wide, with a procedurally generated SVG hero behind a gradient overlay that carries title + severity + TLP + feed + ingest time.
- Recent activity is now grouped under Today / Yesterday / Earlier this week / Older bucket headers.
- Each item gets a left-border severity accent (red CRITICAL, amber HIGH, muted MEDIUM/LOW) so the page is scannable at a glance.

Images: new cockpit/case_visuals.py generates SVGs from case data — zero external image gen, zero curated assets. Every visual is deterministic from case_id (so a case keeps its identity across sessions) and themed to its severity:

- case_hero_svg() — 880x220 hero with severity radial glow, a faint scan grid, a particle constellation with auto-connecting lines, HUD corner brackets, and the case id whispered in the bottom-right.
- case_glyph_svg() — small mirror-symmetric identicon (5-grid), severity-colored, shown beside each case news item in place of an emoji icon. Two case_ids → two distinct glyphs; same id → same glyph.

7 news tests pass; visual sanity print confirms hero is deterministic and uses the right severity accent.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
103 lines
4.3 KiB
Python
103 lines
4.3 KiB
Python
"""Newsline — start-page digest aggregation."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine
|
|
|
|
from psyc import db
|
|
from psyc.lines import ledger as ledger_line
|
|
from psyc.lines import news
|
|
from psyc.models import Outcome, Severity, TLP
|
|
from conftest import make_case
|
|
|
|
|
|
@pytest.fixture
def fresh_db(tmp_path, monkeypatch):
    """Point ``psyc.db`` at an empty, throwaway SQLite database.

    Creates ``test.db`` under pytest's ``tmp_path``, builds the schema from
    ``db._metadata`` and patches ``db._engine`` and ``db.DB_PATH`` so code
    that reads those module attributes uses the temporary database.

    Yields:
        Path of the temporary database file.
    """
    test_db = tmp_path / "test.db"
    eng = create_engine(f"sqlite:///{test_db}", future=True)
    try:
        db._metadata.create_all(eng, checkfirst=True)
        monkeypatch.setattr(db, "_engine", eng)
        monkeypatch.setattr(db, "DB_PATH", test_db)
        yield test_db
    finally:
        # Close any connections the engine still holds so handles on the
        # temporary file do not pile up across tests (and do not block
        # tmp_path cleanup on platforms that lock open files).
        eng.dispose()
|
|
|
|
|
|
def test_kpis_count_basic(fresh_db):
    """kpis() reports the total case count and the HIGH + CRITICAL count."""
    # Each spec uses a different age_days so make_case() doesn't produce
    # colliding case_ids.
    case_specs = [
        {"feed": "urlhaus", "urls": ["http://1.1.1.1/x"], "severity": Severity.LOW, "age_days": 1},
        {"feed": "feodo", "ips": ["2.2.2.2"], "severity": Severity.HIGH, "age_days": 0},
        {"feed": "urlhaus", "urls": ["http://3.3.3.3/x"], "severity": Severity.CRITICAL, "age_days": 2},
    ]
    for spec in case_specs:
        db.upsert_case(make_case(**spec))

    summary = news.kpis()

    assert summary["cases"] == 3
    # One HIGH plus one CRITICAL case were stored.
    assert summary["high_total"] == 2
|
|
|
|
|
|
def test_recent_items_interleaves_ledger_and_cases(fresh_db):
    """recent_items() mixes case and ledger entries, newest first."""
    case = make_case(feed="feodo", ips=["9.9.9.9"], severity=Severity.HIGH)
    db.upsert_case(case)
    ledger_line.write(
        case_id=case.case_id,
        destination="CERT-Bund",
        payload_hash="",
        submitter_identity="x",
        tlp=TLP.AMBER,
        outcome=Outcome.ACTIONED,
    )

    feed = news.recent_items(limit=10)

    seen_kinds = {entry.kind for entry in feed}
    assert "case" in seen_kinds
    assert "enforced" in seen_kinds

    # The returned list must already be in descending timestamp order.
    newest_first = sorted(feed, key=lambda entry: entry.timestamp, reverse=True)
    assert feed == newest_first
|
|
|
|
|
|
def test_feed_health_groups_by_feed(fresh_db):
    """feed_health() returns one row per feed, largest count first."""
    case_specs = [
        {"feed": "urlhaus", "urls": ["http://a/"], "age_days": 1},
        {"feed": "urlhaus", "urls": ["http://b/"], "age_days": 2},
        {"feed": "otx", "ips": ["1.1.1.1"], "age_days": 1},
    ]
    for spec in case_specs:
        db.upsert_case(make_case(**spec))

    health = news.feed_health()

    counts = {row.feed: row.count for row in health}
    assert counts["urlhaus"] == 2
    assert counts["otx"] == 1

    # The feed with the most cases leads the list.
    assert health[0].feed == "urlhaus"
|
|
|
|
|
|
def test_bucket_items_groups_by_recency(fresh_db):
    """bucket_items() labels items by age and orders buckets newest first."""
    from datetime import datetime, timedelta, timezone

    now = datetime.now(timezone.utc)
    # (age in days, headline) — one item intended for each bucket.
    samples = [(0, "t1"), (1, "t2"), (3, "t3"), (14, "t4")]
    items = [
        news.NewsItem(
            timestamp=now - timedelta(days=age),
            kind="case",
            headline=headline,
            body="",
            icon="•",
        )
        for age, headline in samples
    ]

    buckets = news.bucket_items(items)

    # All four buckets should appear, most recent bucket first.
    assert [bucket.label for bucket in buckets] == [
        "Today",
        "Yesterday",
        "Earlier this week",
        "Older",
    ]
|
|
|
|
|
|
def test_featured_case_picks_highest_severity(fresh_db):
    """featured_case() returns the CRITICAL case when LOW/HIGH/CRITICAL exist."""
    case_specs = [
        {"feed": "urlhaus", "urls": ["http://1.1.1.1/x"], "severity": Severity.LOW, "age_days": 0},
        {"feed": "urlhaus", "urls": ["http://2.2.2.2/x"], "severity": Severity.HIGH, "age_days": 1},
        {"feed": "urlhaus", "urls": ["http://3.3.3.3/x"], "severity": Severity.CRITICAL, "age_days": 2},
    ]
    for spec in case_specs:
        db.upsert_case(make_case(**spec))

    featured = news.featured_case()

    assert featured is not None
    assert featured.classification.severity is Severity.CRITICAL
|
|
|
|
|
|
def test_featured_case_none_when_nothing_high(fresh_db):
    """featured_case() returns None when only LOW and MEDIUM cases exist."""
    case_specs = [
        {"feed": "urlhaus", "urls": ["http://1.1.1.1/x"], "severity": Severity.LOW, "age_days": 0},
        {"feed": "urlhaus", "urls": ["http://2.2.2.2/x"], "severity": Severity.MEDIUM, "age_days": 1},
    ]
    for spec in case_specs:
        db.upsert_case(make_case(**spec))

    featured = news.featured_case()

    assert featured is None
|
|
|
|
|
|
def test_outcome_kinds_match_render_map(fresh_db):
    """Every Outcome written to the ledger yields a news item (no KeyError)."""
    case = make_case(feed="urlhaus", ips=["1.2.3.4"])
    db.upsert_case(case)

    outcomes = list(Outcome)
    for outcome in outcomes:
        ledger_line.write(
            case_id=case.case_id,
            destination="X",
            payload_hash="",
            submitter_identity="x",
            tlp=TLP.AMBER,
            outcome=outcome,
        )

    feed = news.recent_items(limit=200)

    # At least one non-case item for each outcome we wrote.
    ledger_entries = [entry for entry in feed if entry.kind != "case"]
    assert len(ledger_entries) >= len(outcomes)
|