import os, hmac, hashlib, json, logging, uuid, re from datetime import datetime, timezone from urllib.parse import urljoin from flask import Flask, request, jsonify import pytz, sqlite3 from apscheduler.schedulers.background import BackgroundScheduler import requests, urllib3 urllib3.disable_warnings() logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) app = Flask(__name__, static_folder="static", static_url_path="") DB_PATH = os.environ.get("DB_PATH", "/data/dashboard.db") TZ = os.environ.get("TZ", "America/Chicago") DASHBOARD_BASE_URL = os.environ.get("DASHBOARD_BASE_URL", "").rstrip("/") # Seed values for the auto-created "Default" controller (only used on first boot # when the controllers table is empty). After that, manage controllers via the UI. SEED_HOST = os.environ.get("UNIFI_HOST", "") SEED_PORT = int(os.environ.get("UNIFI_PORT", "12445")) SEED_TOKEN = os.environ.get("UNIFI_API_TOKEN", "") SEED_SECRET = os.environ.get("WEBHOOK_SECRET", "") def get_db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") return conn def _column_exists(db, table, column): rows = db.execute(f"PRAGMA table_info({table})").fetchall() return any(r["name"] == column for r in rows) def _table_exists(db, table): row = db.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table,) ).fetchone() return row is not None def init_db(): with get_db() as db: db.execute( """ CREATE TABLE IF NOT EXISTS controllers ( id TEXT PRIMARY KEY, name TEXT NOT NULL, host TEXT NOT NULL, port INTEGER NOT NULL DEFAULT 12445, api_token TEXT NOT NULL, webhook_secret TEXT NOT NULL DEFAULT '', webhook_id TEXT NOT NULL DEFAULT '', enabled INTEGER NOT NULL DEFAULT 1, created_at TEXT NOT NULL, last_sync_at TEXT ) """ ) db.execute( """ CREATE TABLE IF NOT EXISTS badge_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, controller_id TEXT, actor_id TEXT NOT NULL, ts TEXT NOT NULL, date TEXT NOT NULL ) """ ) # Migrate legacy badge_events that pre-date the controller_id column. if not _column_exists(db, "badge_events", "controller_id"): db.execute("ALTER TABLE badge_events ADD COLUMN controller_id TEXT") # Migrate legacy user_cache (single-PK on actor_id) to composite PK. legacy_user_cache = _table_exists(db, "user_cache") and not _column_exists( db, "user_cache", "controller_id" ) if legacy_user_cache: db.execute("ALTER TABLE user_cache RENAME TO user_cache_legacy") db.execute( """ CREATE TABLE IF NOT EXISTS user_cache ( controller_id TEXT NOT NULL, actor_id TEXT NOT NULL, full_name TEXT NOT NULL, updated_at TEXT NOT NULL, PRIMARY KEY (controller_id, actor_id) ) """ ) # Seed a Default controller from env vars when the table is empty. existing = db.execute("SELECT COUNT(*) AS n FROM controllers").fetchone()["n"] default_id = None if existing == 0 and SEED_HOST and SEED_TOKEN: default_id = str(uuid.uuid4()) db.execute( """ INSERT INTO controllers (id, name, host, port, api_token, webhook_secret, webhook_id, enabled, created_at) VALUES (?, 'Default', ?, ?, ?, ?, '', 1, ?) """, ( default_id, SEED_HOST, SEED_PORT, SEED_TOKEN, SEED_SECRET, datetime.now(timezone.utc).isoformat(), ), ) log.info("Seeded Default controller %s from env vars", default_id[:8]) # Backfill controller_id on legacy badge_events and user_cache rows. if default_id is None: row = db.execute( "SELECT id FROM controllers ORDER BY created_at LIMIT 1" ).fetchone() default_id = row["id"] if row else None if default_id: db.execute( "UPDATE badge_events SET controller_id = ? WHERE controller_id IS NULL", (default_id,), ) if legacy_user_cache: db.execute( """ INSERT OR IGNORE INTO user_cache (controller_id, actor_id, full_name, updated_at) SELECT ?, actor_id, full_name, updated_at FROM user_cache_legacy """, (default_id,), ) db.execute("DROP TABLE user_cache_legacy") db.commit() def controller_base(host, port): return f"https://{host}:{port}/api/v1/developer" def fetch_controller_users(host, port, token): r = requests.get( f"{controller_base(host, port)}/users", headers={"Authorization": f"Bearer {token}"}, verify=False, timeout=10, ) return r def sync_controller(controller_id): with get_db() as db: c = db.execute( "SELECT * FROM controllers WHERE id = ? AND enabled = 1", (controller_id,) ).fetchone() if not c: return 0 try: r = fetch_controller_users(c["host"], c["port"], c["api_token"]) if r.status_code != 200: log.warning( "User sync failed for controller %s: %s %s", c["name"], r.status_code, r.text[:200], ) return 0 users = r.json().get("data", []) except Exception as e: log.error("sync_controller(%s) network error: %s", c["name"], e) return 0 now_iso = datetime.now(timezone.utc).isoformat() with get_db() as db: for u in users: actor_id = u.get("id") if not actor_id: continue full_name = (u.get("full_name") or "").strip() if not full_name: full_name = f"{u.get('first_name','')} {u.get('last_name','')}".strip() db.execute( """ INSERT INTO user_cache (controller_id, actor_id, full_name, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT(controller_id, actor_id) DO UPDATE SET full_name = excluded.full_name, updated_at = excluded.updated_at """, (controller_id, actor_id, full_name or f"User {actor_id[:8]}", now_iso), ) db.execute( "UPDATE controllers SET last_sync_at = ? WHERE id = ?", (now_iso, controller_id) ) db.commit() log.info("Synced %d users from controller %s", len(users), c["name"]) return len(users) def sync_all_controllers(): with get_db() as db: rows = db.execute("SELECT id FROM controllers WHERE enabled = 1").fetchall() for r in rows: sync_controller(r["id"]) def register_webhook(host, port, token, dashboard_url, name): r = requests.post( f"{controller_base(host, port)}/webhooks/endpoints", headers={ "Authorization": f"Bearer {token}", "Content-Type": "application/json", }, json={ "name": name, "endpoint": dashboard_url, "events": ["access.door.unlock"], }, verify=False, timeout=10, ) return r def delete_webhook(host, port, token, webhook_id): if not webhook_id: return None try: return requests.delete( f"{controller_base(host, port)}/webhooks/endpoints/{webhook_id}", headers={"Authorization": f"Bearer {token}"}, verify=False, timeout=10, ) except Exception as e: log.warning("delete_webhook error: %s", e) return None def verify_signature(secret, payload_bytes, sig_header): if not secret: return True # controller has no secret stored yet — accept (LAN-trust mode) if not sig_header: return False try: parts = dict(p.split("=", 1) for p in sig_header.split(",")) timestamp = parts.get("t", "") received = parts.get("v1", "") if not timestamp or not received: return False signed_payload = f"{timestamp}.".encode() + payload_bytes expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest() return hmac.compare_digest(expected, received) except Exception as e: log.warning("Signature parse error: %s", e) return False def resolve_dashboard_base(): if DASHBOARD_BASE_URL: return DASHBOARD_BASE_URL return request.host_url.rstrip("/") def controller_to_dict(row): return { "id": row["id"], "name": row["name"], "host": row["host"], "port": row["port"], "enabled": bool(row["enabled"]), "has_webhook": bool(row["webhook_id"]), "last_sync_at": row["last_sync_at"], } # --------------------------------------------------------------------------- # Static + dashboard data # --------------------------------------------------------------------------- @app.route("/") def index(): return app.send_static_file("index.html") @app.route("/api/first-badge-status") def first_badge_status(): date = request.args.get("date", datetime.now(pytz.timezone(TZ)).strftime("%Y-%m-%d")) cutoff = request.args.get("cutoff", "09:00") controller_filter = request.args.get("controller_id", "").strip() or None if not re.match(r"^\d{2}:\d{2}$", cutoff): cutoff = "09:00" cutoff_end = cutoff + ":59" sql = """ SELECT b.actor_id, b.controller_id, c.name AS source, MIN(b.ts) AS first_ts, MAX(b.ts) AS latest_ts, COALESCE( u.full_name, 'Unknown (' || SUBSTR(b.actor_id,1,8) || '...)' ) AS name FROM badge_events b LEFT JOIN user_cache u ON u.actor_id = b.actor_id AND u.controller_id = b.controller_id LEFT JOIN controllers c ON c.id = b.controller_id WHERE b.date = ? """ params = [date] if controller_filter: sql += " AND b.controller_id = ?" params.append(controller_filter) sql += " GROUP BY b.actor_id, b.controller_id ORDER BY first_ts ASC" with get_db() as db: rows = db.execute(sql, params).fetchall() result = [] for r in rows: first = r["first_ts"] latest = r["latest_ts"] result.append({ "actor_id": r["actor_id"], "name": r["name"], "source": r["source"] or "—", "first_ts": first, "latest_ts": latest if latest != first else None, "status": "ON TIME" if first <= cutoff_end else "LATE", }) return jsonify(result) # --------------------------------------------------------------------------- # Webhook ingestion — per-controller endpoint, plus legacy compat alias # --------------------------------------------------------------------------- def _ingest_webhook(controller_id): raw = request.get_data() with get_db() as db: c = db.execute( "SELECT * FROM controllers WHERE id = ?", (controller_id,) ).fetchone() if not c: return jsonify({"error": "unknown controller"}), 404 if not verify_signature(c["webhook_secret"], raw, request.headers.get("Signature", "")): log.warning("Signature mismatch for controller %s", c["name"]) return jsonify({"error": "invalid signature"}), 401 try: payload = json.loads(raw) except Exception: return jsonify({"error": "bad json"}), 400 event = payload.get("event") or payload.get("event_object_id", "") or "" data = payload.get("data") or {} actor_obj = data.get("actor") or {} actor = actor_obj.get("id") if "access.door.unlock" not in str(event): return jsonify({"status": "ignored"}), 200 if not actor: return jsonify({"error": "no actor"}), 400 tz = pytz.timezone(TZ) ts = None top_ts_ms = payload.get("timestamp") if isinstance(top_ts_ms, (int, float)) and top_ts_ms > 1e10: ts = datetime.fromtimestamp(top_ts_ms / 1000.0, tz=pytz.utc) if ts is None: published = (data.get("event") or {}).get("published") if isinstance(published, (int, float)) and published > 1e10: ts = datetime.fromtimestamp(published / 1000.0, tz=pytz.utc) if ts is None: for field in ("created_at", "time", "occurred_at"): raw_ts = payload.get(field) if raw_ts: try: ts = datetime.fromisoformat(str(raw_ts).replace("Z", "+00:00")) break except Exception: pass if ts is None: ts = datetime.now(tz=tz) ts_local = ts.astimezone(tz) date = ts_local.strftime("%Y-%m-%d") ts_str = ts_local.strftime("%H:%M:%S") with get_db() as db: db.execute( "INSERT INTO badge_events (controller_id, actor_id, ts, date) VALUES (?, ?, ?, ?)", (controller_id, actor, ts_str, date), ) db.commit() log.info( "Badge-in: controller=%s actor=%s date=%s ts=%s", c["name"], actor, date, ts_str, ) return jsonify({"status": "ok"}), 200 @app.route("/api/unifi-access/", methods=["POST"]) def receive_webhook(controller_id): return _ingest_webhook(controller_id) @app.route("/api/unifi-access", methods=["POST"]) def receive_webhook_legacy(): """Compat alias for installs registered before per-controller URLs existed. Routes to the oldest controller (the env-seeded Default).""" with get_db() as db: row = db.execute( "SELECT id FROM controllers ORDER BY created_at LIMIT 1" ).fetchone() if not row: return jsonify({"error": "no controllers configured"}), 404 return _ingest_webhook(row["id"]) # --------------------------------------------------------------------------- # Controller management # --------------------------------------------------------------------------- @app.route("/api/controllers", methods=["GET"]) def list_controllers(): with get_db() as db: rows = db.execute( "SELECT * FROM controllers ORDER BY created_at" ).fetchall() return jsonify([controller_to_dict(r) for r in rows]) @app.route("/api/controllers", methods=["POST"]) def add_controller(): body = request.get_json(silent=True) or {} name = (body.get("name") or "").strip() host = (body.get("host") or "").strip() port = int(body.get("port") or 12445) api_token = (body.get("api_token") or "").strip() if not name or not host or not api_token: return jsonify({"error": "name, host, and api_token are required"}), 400 controller_id = str(uuid.uuid4()) dashboard_base = resolve_dashboard_base() endpoint_url = urljoin(dashboard_base + "/", f"api/unifi-access/{controller_id}") try: r = register_webhook(host, port, api_token, endpoint_url, f"Dashboard — {name}") except Exception as e: return jsonify({"error": f"webhook registration failed: {e}"}), 502 if r.status_code >= 300: return jsonify({ "error": "webhook registration rejected by controller", "status_code": r.status_code, "response": r.text[:500], }), 502 try: payload = r.json() except Exception: return jsonify({"error": "unparseable controller response", "raw": r.text[:500]}), 502 data = payload.get("data") or {} webhook_id = data.get("id", "") webhook_secret = data.get("secret", "") with get_db() as db: db.execute( """ INSERT INTO controllers (id, name, host, port, api_token, webhook_secret, webhook_id, enabled, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?) """, ( controller_id, name, host, port, api_token, webhook_secret, webhook_id, datetime.now(timezone.utc).isoformat(), ), ) db.commit() sync_controller(controller_id) with get_db() as db: row = db.execute("SELECT * FROM controllers WHERE id = ?", (controller_id,)).fetchone() return jsonify(controller_to_dict(row)), 201 @app.route("/api/controllers/", methods=["PATCH"]) def update_controller(controller_id): body = request.get_json(silent=True) or {} fields, values = [], [] if "name" in body: fields.append("name = ?"); values.append((body["name"] or "").strip()) if "enabled" in body: fields.append("enabled = ?"); values.append(1 if body["enabled"] else 0) if not fields: return jsonify({"error": "no updatable fields provided"}), 400 values.append(controller_id) with get_db() as db: cur = db.execute( f"UPDATE controllers SET {', '.join(fields)} WHERE id = ?", values ) db.commit() if cur.rowcount == 0: return jsonify({"error": "not found"}), 404 row = db.execute("SELECT * FROM controllers WHERE id = ?", (controller_id,)).fetchone() return jsonify(controller_to_dict(row)) @app.route("/api/controllers/", methods=["DELETE"]) def remove_controller(controller_id): with get_db() as db: c = db.execute("SELECT * FROM controllers WHERE id = ?", (controller_id,)).fetchone() if not c: return jsonify({"error": "not found"}), 404 delete_webhook(c["host"], c["port"], c["api_token"], c["webhook_id"]) with get_db() as db: db.execute("DELETE FROM user_cache WHERE controller_id = ?", (controller_id,)) db.execute("DELETE FROM badge_events WHERE controller_id = ?", (controller_id,)) db.execute("DELETE FROM controllers WHERE id = ?", (controller_id,)) db.commit() return jsonify({"status": "ok"}) @app.route("/api/controllers//test", methods=["POST"]) def test_controller(controller_id): with get_db() as db: c = db.execute("SELECT * FROM controllers WHERE id = ?", (controller_id,)).fetchone() if not c: return jsonify({"error": "not found"}), 404 try: r = fetch_controller_users(c["host"], c["port"], c["api_token"]) ok = r.status_code == 200 user_count = len(r.json().get("data", [])) if ok else None return jsonify({ "ok": ok, "status_code": r.status_code, "user_count": user_count, "message": "Connected" if ok else r.text[:200], }) except Exception as e: return jsonify({"ok": False, "message": str(e)}), 200 @app.route("/api/controllers//sync", methods=["POST"]) def sync_one(controller_id): n = sync_controller(controller_id) return jsonify({"status": "ok", "synced": n}) # --------------------------------------------------------------------------- # Misc admin # --------------------------------------------------------------------------- @app.route("/api/sync-users") def manual_sync_all(): sync_all_controllers() return jsonify({"status": "synced"}) @app.route("/api/reset-day", methods=["DELETE"]) def reset_day(): date = request.args.get("date", datetime.now(pytz.timezone(TZ)).strftime("%Y-%m-%d")) controller_id = request.args.get("controller_id", "").strip() or None sql = "DELETE FROM badge_events WHERE date = ?" params = [date] if controller_id: sql += " AND controller_id = ?" params.append(controller_id) with get_db() as db: cur = db.execute(sql, params) db.commit() return jsonify({"status": "ok", "deleted": cur.rowcount, "date": date}) # --------------------------------------------------------------------------- # Boot # --------------------------------------------------------------------------- with app.app_context(): init_db() sync_all_controllers() scheduler = BackgroundScheduler() scheduler.add_job(sync_all_controllers, "interval", hours=6) scheduler.start() if __name__ == "__main__": app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 8000)))