stage-27: per-member TOTP enrollment + individual revocation

Replaces the single shared admin secret with named per-member
enrollments — no more "one key for everyone". First visit bootstraps an
'owner' slot; further members are added from inside the admin panel,
each scanning their own QR. Login accepts a code matching any active
member and records who got in. Offboarding is a per-member revoke: that
person's codes stop immediately, everyone else is unaffected, nobody
re-enrolls. Old single-secret state migrates to an 'owner' member.

Admin panel gains an Access Control table (member, enrolled, last used,
revoke) + add-member form that shows the new QR once. 7 tests including
revocation isolation; verified the full lifecycle live (bootstrap → add
→ authenticate → revoke → rejected while owner persists).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
m17hr1l
2026-05-23 00:46:45 +02:00
parent 4a832964a3
commit cb7bef4e40
5 changed files with 298 additions and 50 deletions

View File

@@ -1,9 +1,11 @@
"""Admin gate — TOTP (authenticator-app) auth for the hidden /admin zone. """Admin gate — per-member TOTP auth for the hidden /admin zone.
A single TOTP secret is provisioned by scanning a QR code into an authenticator Each project member enrolls their own authenticator (own secret, own QR) under
app (Google Authenticator, Authy, …). After that, entry to /admin requires the a named slot. Login accepts a code matching ANY active member, so offboarding
current rotating 6-digit code. Secret + session key persist under DATA_DIR so is a per-member revoke — no shared secret, no re-enrolling everyone when one
they survive restarts; the file is gitignored with the rest of data/. person leaves. The first visit bootstraps an "owner" slot; further members are
added from inside the authenticated admin panel. State persists under DATA_DIR
(gitignored).
""" """
from __future__ import annotations from __future__ import annotations
@@ -12,8 +14,8 @@ import base64
import io import io
import json import json
import secrets import secrets
from pathlib import Path from datetime import datetime, timezone
from typing import Tuple from typing import List, Optional, Tuple
import pyotp import pyotp
import qrcode import qrcode
@@ -25,20 +27,37 @@ _log = log.get(__name__)
_STATE_PATH = DATA_DIR / "admin_auth.json" _STATE_PATH = DATA_DIR / "admin_auth.json"
_ISSUER = "psyc" _ISSUER = "psyc"
_ACCOUNT = "admin"
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _load() -> dict: def _load() -> dict:
if _STATE_PATH.exists(): if _STATE_PATH.exists():
return json.loads(_STATE_PATH.read_text()) data = json.loads(_STATE_PATH.read_text())
state = { # Migrate the old single-secret format → member list.
"totp_secret": pyotp.random_base32(), if "members" not in data:
members = []
pending = data.get("totp_secret")
if data.get("provisioned") and data.get("totp_secret"):
members = [_new_member("owner", data["totp_secret"])]
pending = None
data = {
"session_secret": data.get("session_secret", secrets.token_urlsafe(32)),
"pending_secret": pending,
"members": members,
}
_save(data)
return data
data = {
"session_secret": secrets.token_urlsafe(32), "session_secret": secrets.token_urlsafe(32),
"provisioned": False, "pending_secret": pyotp.random_base32(),
"members": [],
} }
_save(state) _save(data)
_log.info("adminauth.initialized", path=str(_STATE_PATH)) _log.info("adminauth.initialized", path=str(_STATE_PATH))
return state return data
def _save(state: dict) -> None: def _save(state: dict) -> None:
@@ -46,43 +65,115 @@ def _save(state: dict) -> None:
_STATE_PATH.write_text(json.dumps(state, indent=2)) _STATE_PATH.write_text(json.dumps(state, indent=2))
def _new_member(label: str, secret: str) -> dict:
return {
"id": secrets.token_hex(4),
"label": label,
"secret": secret,
"created_at": _now(),
"active": True,
"last_used": None,
}
def session_secret() -> str: def session_secret() -> str:
return _load()["session_secret"] return _load()["session_secret"]
def is_provisioned() -> bool: def members() -> List[dict]:
return bool(_load().get("provisioned")) """Active members, without exposing their secrets."""
return [
{k: m[k] for k in ("id", "label", "created_at", "last_used")}
for m in _load()["members"] if m.get("active")
]
def mark_provisioned() -> None: def is_bootstrapped() -> bool:
return any(m.get("active") for m in _load()["members"])
def verify(code: str) -> Optional[str]:
"""Check a code against every active member (and the bootstrap slot).
Returns the matched member label, or None. Updates last_used; promotes the
pending bootstrap secret into the first 'owner' member on first success.
"""
code = code.strip()
state = _load() state = _load()
if not state.get("provisioned"): for m in state["members"]:
state["provisioned"] = True if m.get("active") and pyotp.TOTP(m["secret"]).verify(code, valid_window=1):
m["last_used"] = _now()
_save(state)
_log.info("adminauth.verify.ok", member=m["label"])
return m["label"]
# Bootstrap: no active members yet → accept the pending secret as owner.
if not any(m.get("active") for m in state["members"]) and state.get("pending_secret"):
if pyotp.TOTP(state["pending_secret"]).verify(code, valid_window=1):
owner = _new_member("owner", state["pending_secret"])
owner["last_used"] = _now()
state["members"].append(owner)
state["pending_secret"] = None
_save(state)
_log.info("adminauth.bootstrapped")
return "owner"
_log.info("adminauth.verify.fail")
return None
def add_member(label: str) -> Tuple[str, str]:
"""Enroll a new member. Returns (member_id, QR data-uri) to hand to them."""
label = (label or "member").strip()[:40]
secret = pyotp.random_base32()
state = _load()
m = _new_member(label, secret)
state["members"].append(m)
_save(state)
_log.info("adminauth.member.added", member=label, id=m["id"])
return m["id"], _qr_for(secret, label)
def revoke_member(member_id: str) -> bool:
state = _load()
for m in state["members"]:
if m["id"] == member_id and m.get("active"):
m["active"] = False
m["revoked_at"] = _now()
_save(state)
_log.info("adminauth.member.revoked", id=member_id, label=m["label"])
return True
return False
def member_qr(member_id: str) -> Optional[str]:
"""One-time QR for a just-created member (admin-only surface)."""
for m in _load()["members"]:
if m["id"] == member_id and m.get("active"):
return _qr_for(m["secret"], m["label"])
return None
def bootstrap_qr() -> str:
"""QR for the initial owner enrollment (only meaningful before bootstrap)."""
state = _load()
if not state.get("pending_secret"):
state["pending_secret"] = pyotp.random_base32()
_save(state) _save(state)
_log.info("adminauth.provisioned") return _qr_for(state["pending_secret"], "owner")
def verify(code: str) -> bool: def _qr_for(secret: str, label: str) -> str:
"""Check a 6-digit TOTP code (±1 window for clock skew).""" uri = pyotp.TOTP(secret).provisioning_uri(name=label, issuer_name=_ISSUER)
secret = _load()["totp_secret"]
ok = pyotp.TOTP(secret).verify(code.strip(), valid_window=1)
if ok:
mark_provisioned()
_log.info("adminauth.verify", ok=ok)
return ok
def provisioning_qr_data_uri() -> Tuple[str, str]:
"""Return (otpauth_uri, data:image/png;base64 QR) for authenticator setup."""
secret = _load()["totp_secret"]
uri = pyotp.TOTP(secret).provisioning_uri(name=_ACCOUNT, issuer_name=_ISSUER)
img = qrcode.make(uri) img = qrcode.make(uri)
buf = io.BytesIO() buf = io.BytesIO()
img.save(buf, format="PNG") img.save(buf, format="PNG")
data_uri = "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode() return "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode()
return uri, data_uri
def current_code() -> str: def current_code(member_id: Optional[str] = None) -> str:
"""The code right now — used only by tests / local verification, never shown.""" """Live code for tests/local verification never shown in the UI."""
return pyotp.TOTP(_load()["totp_secret"]).now() state = _load()
if member_id:
for m in state["members"]:
if m["id"] == member_id:
return pyotp.TOTP(m["secret"]).now()
return pyotp.TOTP(state["pending_secret"]).now()

View File

@@ -185,25 +185,50 @@ def response_reject(action_id: int, approver: str = Form("operator"), reason: st
@app.get("/admin", response_class=HTMLResponse) @app.get("/admin", response_class=HTMLResponse)
def admin_home(request: Request) -> HTMLResponse: def admin_home(request: Request) -> HTMLResponse:
if _admin_ok(request): if _admin_ok(request):
return TEMPLATES.TemplateResponse(request, "admin.html", {}) enrolled = request.query_params.get("enrolled", "")
# Not authenticated — show the gate. QR only until first provisioned. new_qr = adminauth.member_qr(enrolled) if enrolled else None
ctx = {"provisioned": adminauth.is_provisioned(), "error": request.query_params.get("error", "")} return TEMPLATES.TemplateResponse(
request, "admin.html",
{"members": adminauth.members(), "who": request.session.get("admin_who", ""),
"new_qr": new_qr, "new_label": request.query_params.get("label", "")},
)
# Not authenticated — show the gate. Bootstrap QR only until first member exists.
ctx = {"provisioned": adminauth.is_bootstrapped(), "error": request.query_params.get("error", "")}
if not ctx["provisioned"]: if not ctx["provisioned"]:
_, ctx["qr"] = adminauth.provisioning_qr_data_uri() ctx["qr"] = adminauth.bootstrap_qr()
return TEMPLATES.TemplateResponse(request, "admin_gate.html", ctx) return TEMPLATES.TemplateResponse(request, "admin_gate.html", ctx)
@app.post("/admin/verify") @app.post("/admin/verify")
def admin_verify(request: Request, code: str = Form(...)) -> RedirectResponse: def admin_verify(request: Request, code: str = Form(...)) -> RedirectResponse:
if adminauth.verify(code): who = adminauth.verify(code)
if who:
request.session["admin_ok"] = True request.session["admin_ok"] = True
request.session["admin_who"] = who
return RedirectResponse("/admin", status_code=303) return RedirectResponse("/admin", status_code=303)
return RedirectResponse("/admin?error=1", status_code=303) return RedirectResponse("/admin?error=1", status_code=303)
@app.post("/admin/members")
def admin_add_member(request: Request, label: str = Form("member")) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
member_id, _ = adminauth.add_member(label)
return RedirectResponse(f"/admin?enrolled={member_id}&label={label}", status_code=303)
@app.post("/admin/members/{member_id}/revoke")
def admin_revoke_member(request: Request, member_id: str) -> RedirectResponse:
if not _admin_ok(request):
raise HTTPException(status_code=403, detail="admin session required")
adminauth.revoke_member(member_id)
return RedirectResponse("/admin", status_code=303)
@app.get("/admin/logout") @app.get("/admin/logout")
def admin_logout(request: Request) -> RedirectResponse: def admin_logout(request: Request) -> RedirectResponse:
request.session.pop("admin_ok", None) request.session.pop("admin_ok", None)
request.session.pop("admin_who", None)
return RedirectResponse("/admin", status_code=303) return RedirectResponse("/admin", status_code=303)

View File

@@ -465,3 +465,9 @@ tr.sev-low .sev-badge { color: var(--muted); }
.admin-tile { padding: 18px; background: linear-gradient(180deg, var(--panel-2), rgba(18,22,30,0.6)); border: 1px solid var(--border); border-radius: 10px; } .admin-tile { padding: 18px; background: linear-gradient(180deg, var(--panel-2), rgba(18,22,30,0.6)); border: 1px solid var(--border); border-radius: 10px; }
.admin-tile h2 { font-size: 15px; margin: 0 0 6px; } .admin-tile h2 { font-size: 15px; margin: 0 0 6px; }
.admin-tile p { font-size: 13px; color: var(--muted); margin: 0; } .admin-tile p { font-size: 13px; color: var(--muted); margin: 0; }
/* ── admin: enrollment card ─────────────────────────────────── */
.enroll-card { display: flex; gap: 16px; align-items: center; padding: 16px; margin: 14px 0; background: var(--panel-2); border: 1px solid var(--accent); border-radius: 10px; box-shadow: 0 0 18px var(--accent-glow); }
.enroll-card .gate-qr { width: 130px; height: 130px; }
.enroll-body h3 { margin: 0 0 6px; font-size: 15px; }
.enroll-body p { margin: 0; font-size: 13px; color: var(--muted); }

View File

@@ -4,16 +4,56 @@
<section class="panel"> <section class="panel">
<div class="panel-head"> <div class="panel-head">
<h1>Admin Control Center</h1> <h1>Admin Control Center</h1>
<span class="count"><a href="/admin/logout" class="lg-sub">lock ⏻</a></span> <span class="count">{% if who %}signed in as <strong>{{ who }}</strong> · {% endif %}<a href="/admin/logout" class="lg-sub">lock ⏻</a></span>
</div> </div>
<p class="page-intro">Authenticated. This is the secured zone — TOTP-gated, hidden from the nav. Infrastructure visibility and the rest of the control system get built in here.</p> <p class="page-intro">The secured zone — TOTP-gated, hidden from the nav. Manage who can get in here, and (next) watch the live infrastructure.</p>
<div class="verdict verdict-clean">✓ Admin session active — expires after 60 min idle.</div> <div class="verdict verdict-clean">✓ Admin session active — expires after 60 min idle.</div>
</section>
<div class="admin-grid"> <section class="panel">
<div class="admin-tile admin-tile-pending"> <div class="panel-head">
<h2>Docker topology</h2> <h2>Access Control</h2>
<p>Live container + network map. <em>Wiring next (stage-26b) via a read-only socket-proxy.</em></p> <span class="count">{{ members|length }} enrolled</span>
</div>
<p class="page-intro">Every member enrolls their own authenticator — no shared secret. To offboard someone, revoke their slot; everyone else keeps working, no re-enrollment.</p>
{% if new_qr %}
<div class="enroll-card">
<div class="gate-qr-frame" style="margin:0;"><img class="gate-qr" src="{{ new_qr }}" alt="enrollment QR"></div>
<div class="enroll-body">
<h3>Enroll “{{ new_label or 'member' }}”</h3>
<p>Have them scan this <strong>once</strong> with Google&nbsp;Authenticator / Authy. It won't be shown again — reload to hide it.</p>
</div> </div>
</div> </div>
{% endif %}
<table class="ledger">
<thead><tr><th>Member</th><th>Enrolled</th><th>Last used</th><th></th></tr></thead>
<tbody>
{% for m in members %}
<tr class="ledger-row">
<td><strong>{{ m.label }}</strong></td>
<td class="lg-ts">{{ (m.created_at or '')[:16] | replace('T', ' ') }}</td>
<td class="lg-ts">{{ ((m.last_used or '')[:16] | replace('T', ' ')) or '—' }}</td>
<td>
<form method="post" action="/admin/members/{{ m.id }}/revoke" class="queue-action"
onsubmit="return confirm('Revoke {{ m.label }}? Their codes stop working immediately.');">
<button type="submit" class="btn btn-reject">revoke</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
<form method="post" action="/admin/members" class="lookup-form" style="margin-top:14px;">
<input type="text" name="label" placeholder="new member name (e.g. alice)" class="lookup-input" maxlength="40">
<button type="submit" class="btn btn-enforce">+ Add member</button>
</form>
</section>
<section class="panel">
<div class="panel-head"><h2>Docker topology</h2></div>
<p class="page-intro">Live container + network map. <em>Wiring next via a read-only socket-proxy.</em></p>
</section> </section>
{% endblock %} {% endblock %}

86
tests/test_adminauth.py Normal file
View File

@@ -0,0 +1,86 @@
"""Admin gate — per-member TOTP enrollment + revocation isolation."""
from __future__ import annotations
import json
import pyotp
import pytest
from psyc.cockpit import adminauth
@pytest.fixture
def fresh_state(tmp_path, monkeypatch):
monkeypatch.setattr(adminauth, "_STATE_PATH", tmp_path / "admin_auth.json")
yield tmp_path / "admin_auth.json"
def _code_for_secret(secret: str) -> str:
return pyotp.TOTP(secret).now()
def _secret_of(state_path, member_id: str) -> str:
data = json.loads(state_path.read_text())
return next(m["secret"] for m in data["members"] if m["id"] == member_id)
def test_starts_unbootstrapped(fresh_state):
assert adminauth.is_bootstrapped() is False
assert adminauth.members() == []
def test_bootstrap_promotes_pending_to_owner(fresh_state):
code = adminauth.current_code() # pending secret
assert adminauth.verify(code) == "owner"
assert adminauth.is_bootstrapped() is True
assert [m["label"] for m in adminauth.members()] == ["owner"]
def test_add_member_then_each_secret_authenticates(fresh_state):
adminauth.verify(adminauth.current_code()) # bootstrap owner
aid, _ = adminauth.add_member("alice")
bid, _ = adminauth.add_member("bob")
assert adminauth.verify(_code_for_secret(_secret_of(fresh_state, aid))) == "alice"
assert adminauth.verify(_code_for_secret(_secret_of(fresh_state, bid))) == "bob"
def test_revoke_isolates_one_member(fresh_state):
adminauth.verify(adminauth.current_code())
aid, _ = adminauth.add_member("alice")
bid, _ = adminauth.add_member("bob")
a_secret = _secret_of(fresh_state, aid)
b_secret = _secret_of(fresh_state, bid)
assert adminauth.revoke_member(aid) is True
# Alice's code no longer works…
assert adminauth.verify(_code_for_secret(a_secret)) is None
# …but Bob's still does.
assert adminauth.verify(_code_for_secret(b_secret)) == "bob"
# Alice is gone from the active roster, Bob remains.
labels = [m["label"] for m in adminauth.members()]
assert "alice" not in labels and "bob" in labels
def test_members_never_expose_secrets(fresh_state):
adminauth.verify(adminauth.current_code())
adminauth.add_member("alice")
for m in adminauth.members():
assert "secret" not in m
def test_revoke_unknown_id_is_false(fresh_state):
adminauth.verify(adminauth.current_code())
assert adminauth.revoke_member("deadbeef") is False
def test_migrates_old_single_secret_format(fresh_state):
# Simulate the pre-stage-27 state file.
old_secret = pyotp.random_base32()
fresh_state.write_text(json.dumps({
"totp_secret": old_secret, "session_secret": "s", "provisioned": True,
}))
# Old enrolled secret should authenticate as the migrated 'owner'.
assert adminauth.verify(_code_for_secret(old_secret)) == "owner"
assert [m["label"] for m in adminauth.members()] == ["owner"]