diff --git a/src/psyc/cockpit/adminauth.py b/src/psyc/cockpit/adminauth.py index 4cb6feb..a3e0630 100644 --- a/src/psyc/cockpit/adminauth.py +++ b/src/psyc/cockpit/adminauth.py @@ -1,9 +1,11 @@ -"""Admin gate — TOTP (authenticator-app) auth for the hidden /admin zone. +"""Admin gate — per-member TOTP auth for the hidden /admin zone. -A single TOTP secret is provisioned by scanning a QR code into an authenticator -app (Google Authenticator, Authy, …). After that, entry to /admin requires the -current rotating 6-digit code. Secret + session key persist under DATA_DIR so -they survive restarts; the file is gitignored with the rest of data/. +Each project member enrolls their own authenticator (own secret, own QR) under +a named slot. Login accepts a code matching ANY active member, so offboarding +is a per-member revoke — no shared secret, no re-enrolling everyone when one +person leaves. The first visit bootstraps an "owner" slot; further members are +added from inside the authenticated admin panel. State persists under DATA_DIR +(gitignored). """ from __future__ import annotations @@ -12,8 +14,8 @@ import base64 import io import json import secrets -from pathlib import Path -from typing import Tuple +from datetime import datetime, timezone +from typing import List, Optional, Tuple import pyotp import qrcode @@ -25,20 +27,37 @@ _log = log.get(__name__) _STATE_PATH = DATA_DIR / "admin_auth.json" _ISSUER = "psyc" -_ACCOUNT = "admin" + + +def _now() -> str: + return datetime.now(timezone.utc).isoformat() def _load() -> dict: if _STATE_PATH.exists(): - return json.loads(_STATE_PATH.read_text()) - state = { - "totp_secret": pyotp.random_base32(), + data = json.loads(_STATE_PATH.read_text()) + # Migrate the old single-secret format → member list. + if "members" not in data: + members = [] + pending = data.get("totp_secret") + if data.get("provisioned") and data.get("totp_secret"): + members = [_new_member("owner", data["totp_secret"])] + pending = None + data = { + "session_secret": data.get("session_secret", secrets.token_urlsafe(32)), + "pending_secret": pending, + "members": members, + } + _save(data) + return data + data = { "session_secret": secrets.token_urlsafe(32), - "provisioned": False, + "pending_secret": pyotp.random_base32(), + "members": [], } - _save(state) + _save(data) _log.info("adminauth.initialized", path=str(_STATE_PATH)) - return state + return data def _save(state: dict) -> None: @@ -46,43 +65,115 @@ def _save(state: dict) -> None: _STATE_PATH.write_text(json.dumps(state, indent=2)) +def _new_member(label: str, secret: str) -> dict: + return { + "id": secrets.token_hex(4), + "label": label, + "secret": secret, + "created_at": _now(), + "active": True, + "last_used": None, + } + + def session_secret() -> str: return _load()["session_secret"] -def is_provisioned() -> bool: - return bool(_load().get("provisioned")) +def members() -> List[dict]: + """Active members, without exposing their secrets.""" + return [ + {k: m[k] for k in ("id", "label", "created_at", "last_used")} + for m in _load()["members"] if m.get("active") + ] -def mark_provisioned() -> None: +def is_bootstrapped() -> bool: + return any(m.get("active") for m in _load()["members"]) + + +def verify(code: str) -> Optional[str]: + """Check a code against every active member (and the bootstrap slot). + + Returns the matched member label, or None. Updates last_used; promotes the + pending bootstrap secret into the first 'owner' member on first success. + """ + code = code.strip() state = _load() - if not state.get("provisioned"): - state["provisioned"] = True + for m in state["members"]: + if m.get("active") and pyotp.TOTP(m["secret"]).verify(code, valid_window=1): + m["last_used"] = _now() + _save(state) + _log.info("adminauth.verify.ok", member=m["label"]) + return m["label"] + # Bootstrap: no active members yet → accept the pending secret as owner. + if not any(m.get("active") for m in state["members"]) and state.get("pending_secret"): + if pyotp.TOTP(state["pending_secret"]).verify(code, valid_window=1): + owner = _new_member("owner", state["pending_secret"]) + owner["last_used"] = _now() + state["members"].append(owner) + state["pending_secret"] = None + _save(state) + _log.info("adminauth.bootstrapped") + return "owner" + _log.info("adminauth.verify.fail") + return None + + +def add_member(label: str) -> Tuple[str, str]: + """Enroll a new member. Returns (member_id, QR data-uri) to hand to them.""" + label = (label or "member").strip()[:40] + secret = pyotp.random_base32() + state = _load() + m = _new_member(label, secret) + state["members"].append(m) + _save(state) + _log.info("adminauth.member.added", member=label, id=m["id"]) + return m["id"], _qr_for(secret, label) + + +def revoke_member(member_id: str) -> bool: + state = _load() + for m in state["members"]: + if m["id"] == member_id and m.get("active"): + m["active"] = False + m["revoked_at"] = _now() + _save(state) + _log.info("adminauth.member.revoked", id=member_id, label=m["label"]) + return True + return False + + +def member_qr(member_id: str) -> Optional[str]: + """One-time QR for a just-created member (admin-only surface).""" + for m in _load()["members"]: + if m["id"] == member_id and m.get("active"): + return _qr_for(m["secret"], m["label"]) + return None + + +def bootstrap_qr() -> str: + """QR for the initial owner enrollment (only meaningful before bootstrap).""" + state = _load() + if not state.get("pending_secret"): + state["pending_secret"] = pyotp.random_base32() _save(state) - _log.info("adminauth.provisioned") + return _qr_for(state["pending_secret"], "owner") -def verify(code: str) -> bool: - """Check a 6-digit TOTP code (±1 window for clock skew).""" - secret = _load()["totp_secret"] - ok = pyotp.TOTP(secret).verify(code.strip(), valid_window=1) - if ok: - mark_provisioned() - _log.info("adminauth.verify", ok=ok) - return ok - - -def provisioning_qr_data_uri() -> Tuple[str, str]: - """Return (otpauth_uri, data:image/png;base64 QR) for authenticator setup.""" - secret = _load()["totp_secret"] - uri = pyotp.TOTP(secret).provisioning_uri(name=_ACCOUNT, issuer_name=_ISSUER) +def _qr_for(secret: str, label: str) -> str: + uri = pyotp.TOTP(secret).provisioning_uri(name=label, issuer_name=_ISSUER) img = qrcode.make(uri) buf = io.BytesIO() img.save(buf, format="PNG") - data_uri = "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode() - return uri, data_uri + return "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode() -def current_code() -> str: - """The code right now — used only by tests / local verification, never shown.""" - return pyotp.TOTP(_load()["totp_secret"]).now() +def current_code(member_id: Optional[str] = None) -> str: + """Live code for tests/local verification — never shown in the UI.""" + state = _load() + if member_id: + for m in state["members"]: + if m["id"] == member_id: + return pyotp.TOTP(m["secret"]).now() + return pyotp.TOTP(state["pending_secret"]).now() diff --git a/src/psyc/cockpit/app.py b/src/psyc/cockpit/app.py index c42d68b..3015287 100644 --- a/src/psyc/cockpit/app.py +++ b/src/psyc/cockpit/app.py @@ -185,25 +185,50 @@ def response_reject(action_id: int, approver: str = Form("operator"), reason: st @app.get("/admin", response_class=HTMLResponse) def admin_home(request: Request) -> HTMLResponse: if _admin_ok(request): - return TEMPLATES.TemplateResponse(request, "admin.html", {}) - # Not authenticated — show the gate. QR only until first provisioned. - ctx = {"provisioned": adminauth.is_provisioned(), "error": request.query_params.get("error", "")} + enrolled = request.query_params.get("enrolled", "") + new_qr = adminauth.member_qr(enrolled) if enrolled else None + return TEMPLATES.TemplateResponse( + request, "admin.html", + {"members": adminauth.members(), "who": request.session.get("admin_who", ""), + "new_qr": new_qr, "new_label": request.query_params.get("label", "")}, + ) + # Not authenticated — show the gate. Bootstrap QR only until first member exists. + ctx = {"provisioned": adminauth.is_bootstrapped(), "error": request.query_params.get("error", "")} if not ctx["provisioned"]: - _, ctx["qr"] = adminauth.provisioning_qr_data_uri() + ctx["qr"] = adminauth.bootstrap_qr() return TEMPLATES.TemplateResponse(request, "admin_gate.html", ctx) @app.post("/admin/verify") def admin_verify(request: Request, code: str = Form(...)) -> RedirectResponse: - if adminauth.verify(code): + who = adminauth.verify(code) + if who: request.session["admin_ok"] = True + request.session["admin_who"] = who return RedirectResponse("/admin", status_code=303) return RedirectResponse("/admin?error=1", status_code=303) +@app.post("/admin/members") +def admin_add_member(request: Request, label: str = Form("member")) -> RedirectResponse: + if not _admin_ok(request): + raise HTTPException(status_code=403, detail="admin session required") + member_id, _ = adminauth.add_member(label) + return RedirectResponse(f"/admin?enrolled={member_id}&label={label}", status_code=303) + + +@app.post("/admin/members/{member_id}/revoke") +def admin_revoke_member(request: Request, member_id: str) -> RedirectResponse: + if not _admin_ok(request): + raise HTTPException(status_code=403, detail="admin session required") + adminauth.revoke_member(member_id) + return RedirectResponse("/admin", status_code=303) + + @app.get("/admin/logout") def admin_logout(request: Request) -> RedirectResponse: request.session.pop("admin_ok", None) + request.session.pop("admin_who", None) return RedirectResponse("/admin", status_code=303) diff --git a/src/psyc/cockpit/static/cockpit.css b/src/psyc/cockpit/static/cockpit.css index a4fa42a..ab7bf35 100644 --- a/src/psyc/cockpit/static/cockpit.css +++ b/src/psyc/cockpit/static/cockpit.css @@ -465,3 +465,9 @@ tr.sev-low .sev-badge { color: var(--muted); } .admin-tile { padding: 18px; background: linear-gradient(180deg, var(--panel-2), rgba(18,22,30,0.6)); border: 1px solid var(--border); border-radius: 10px; } .admin-tile h2 { font-size: 15px; margin: 0 0 6px; } .admin-tile p { font-size: 13px; color: var(--muted); margin: 0; } + +/* ── admin: enrollment card ─────────────────────────────────── */ +.enroll-card { display: flex; gap: 16px; align-items: center; padding: 16px; margin: 14px 0; background: var(--panel-2); border: 1px solid var(--accent); border-radius: 10px; box-shadow: 0 0 18px var(--accent-glow); } +.enroll-card .gate-qr { width: 130px; height: 130px; } +.enroll-body h3 { margin: 0 0 6px; font-size: 15px; } +.enroll-body p { margin: 0; font-size: 13px; color: var(--muted); } diff --git a/src/psyc/cockpit/templates/admin.html b/src/psyc/cockpit/templates/admin.html index 51ee68e..63ab66a 100644 --- a/src/psyc/cockpit/templates/admin.html +++ b/src/psyc/cockpit/templates/admin.html @@ -4,16 +4,56 @@

Admin Control Center

- lock ⏻ + {% if who %}signed in as {{ who }} · {% endif %}lock ⏻
-

Authenticated. This is the secured zone — TOTP-gated, hidden from the nav. Infrastructure visibility and the rest of the control system get built in here.

+

The secured zone — TOTP-gated, hidden from the nav. Manage who can get in here, and (next) watch the live infrastructure.

✓ Admin session active — expires after 60 min idle.
+
-
-
-

Docker topology

-

Live container + network map. Wiring next (stage-26b) via a read-only socket-proxy.

+
+
+

Access Control

+ {{ members|length }} enrolled +
+

Every member enrolls their own authenticator — no shared secret. To offboard someone, revoke their slot; everyone else keeps working, no re-enrollment.

+ + {% if new_qr %} +
+
enrollment QR
+
+

Enroll “{{ new_label or 'member' }}”

+

Have them scan this once with Google Authenticator / Authy. It won't be shown again — reload to hide it.

+ {% endif %} + + + + + {% for m in members %} + + + + + + + {% endfor %} + +
MemberEnrolledLast used
{{ m.label }}{{ (m.created_at or '')[:16] | replace('T', ' ') }}{{ ((m.last_used or '')[:16] | replace('T', ' ')) or '—' }} +
+ +
+
+ +
+ + +
+
+ +
+

Docker topology

+

Live container + network map. Wiring next via a read-only socket-proxy.

{% endblock %} diff --git a/tests/test_adminauth.py b/tests/test_adminauth.py new file mode 100644 index 0000000..e41e2c8 --- /dev/null +++ b/tests/test_adminauth.py @@ -0,0 +1,86 @@ +"""Admin gate — per-member TOTP enrollment + revocation isolation.""" + +from __future__ import annotations + +import json + +import pyotp +import pytest + +from psyc.cockpit import adminauth + + +@pytest.fixture +def fresh_state(tmp_path, monkeypatch): + monkeypatch.setattr(adminauth, "_STATE_PATH", tmp_path / "admin_auth.json") + yield tmp_path / "admin_auth.json" + + +def _code_for_secret(secret: str) -> str: + return pyotp.TOTP(secret).now() + + +def _secret_of(state_path, member_id: str) -> str: + data = json.loads(state_path.read_text()) + return next(m["secret"] for m in data["members"] if m["id"] == member_id) + + +def test_starts_unbootstrapped(fresh_state): + assert adminauth.is_bootstrapped() is False + assert adminauth.members() == [] + + +def test_bootstrap_promotes_pending_to_owner(fresh_state): + code = adminauth.current_code() # pending secret + assert adminauth.verify(code) == "owner" + assert adminauth.is_bootstrapped() is True + assert [m["label"] for m in adminauth.members()] == ["owner"] + + +def test_add_member_then_each_secret_authenticates(fresh_state): + adminauth.verify(adminauth.current_code()) # bootstrap owner + aid, _ = adminauth.add_member("alice") + bid, _ = adminauth.add_member("bob") + assert adminauth.verify(_code_for_secret(_secret_of(fresh_state, aid))) == "alice" + assert adminauth.verify(_code_for_secret(_secret_of(fresh_state, bid))) == "bob" + + +def test_revoke_isolates_one_member(fresh_state): + adminauth.verify(adminauth.current_code()) + aid, _ = adminauth.add_member("alice") + bid, _ = adminauth.add_member("bob") + a_secret = _secret_of(fresh_state, aid) + b_secret = _secret_of(fresh_state, bid) + + assert adminauth.revoke_member(aid) is True + + # Alice's code no longer works… + assert adminauth.verify(_code_for_secret(a_secret)) is None + # …but Bob's still does. + assert adminauth.verify(_code_for_secret(b_secret)) == "bob" + # Alice is gone from the active roster, Bob remains. + labels = [m["label"] for m in adminauth.members()] + assert "alice" not in labels and "bob" in labels + + +def test_members_never_expose_secrets(fresh_state): + adminauth.verify(adminauth.current_code()) + adminauth.add_member("alice") + for m in adminauth.members(): + assert "secret" not in m + + +def test_revoke_unknown_id_is_false(fresh_state): + adminauth.verify(adminauth.current_code()) + assert adminauth.revoke_member("deadbeef") is False + + +def test_migrates_old_single_secret_format(fresh_state): + # Simulate the pre-stage-27 state file. + old_secret = pyotp.random_base32() + fresh_state.write_text(json.dumps({ + "totp_secret": old_secret, "session_secret": "s", "provisioned": True, + })) + # Old enrolled secret should authenticate as the migrated 'owner'. + assert adminauth.verify(_code_for_secret(old_secret)) == "owner" + assert [m["label"] for m in adminauth.members()] == ["owner"]