Merge pull request #7 from jasonMPM/v7

Update app.py
This commit was merged in pull request #7.
This commit is contained in:
jasonMPM
2026-03-04 22:03:58 -06:00
committed by GitHub

119
app.py
View File

@@ -19,13 +19,15 @@ WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "")
DB_PATH = os.environ.get("DB_PATH", "/data/dashboard.db") DB_PATH = os.environ.get("DB_PATH", "/data/dashboard.db")
TZ = os.environ.get("TZ", "America/Chicago") TZ = os.environ.get("TZ", "America/Chicago")
UNIFI_BASE = f"https://{UNIFI_HOST}:{UNIFI_PORT}/api/v1/developer" UNIFI_BASE = f"https://{UNIFI_HOST}:{UNIFI_PORT}/api/v1/developer"
def get_db(): def get_db():
conn = sqlite3.connect(DB_PATH) conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
return conn return conn
def init_db(): def init_db():
with get_db() as db: with get_db() as db:
db.execute(""" db.execute("""
@@ -45,12 +47,14 @@ def init_db():
""") """)
db.commit() db.commit()
def sync_unifi_users(): def sync_unifi_users():
try: try:
r = requests.get( r = requests.get(
f"{UNIFI_BASE}/users", f"{UNIFI_BASE}/users",
headers={"Authorization": f"Bearer {UNIFI_TOKEN}"}, headers={"Authorization": f"Bearer {UNIFI_TOKEN}"},
verify=False, timeout=10 verify=False,
timeout=10,
) )
if r.status_code != 200: if r.status_code != 200:
log.warning("User sync failed: %s %s", r.status_code, r.text[:200]) log.warning("User sync failed: %s %s", r.status_code, r.text[:200])
@@ -58,37 +62,52 @@ def sync_unifi_users():
users = r.json().get("data", []) users = r.json().get("data", [])
with get_db() as db: with get_db() as db:
for u in users: for u in users:
db.execute(""" full_name = (u.get("full_name") or "").strip()
if not full_name:
full_name = f"{u.get('first_name','')} {u.get('last_name','')}".strip()
db.execute(
"""
INSERT INTO user_cache (actor_id, full_name, updated_at) INSERT INTO user_cache (actor_id, full_name, updated_at)
VALUES (?, ?, ?) VALUES (?, ?, ?)
ON CONFLICT(actor_id) DO UPDATE SET ON CONFLICT(actor_id) DO UPDATE SET
full_name = excluded.full_name, full_name = excluded.full_name,
updated_at = excluded.updated_at updated_at = excluded.updated_at
""", (u["id"], u.get("full_name", "").strip() or """,
f"{u.get('first_name','')} {u.get('last_name','')}".strip(), (
datetime.utcnow().isoformat())) u["id"],
full_name or f"User {u['id'][:8]}",
datetime.utcnow().isoformat(),
),
)
db.commit() db.commit()
log.info("Synced %d users from UniFi Access", len(users)) log.info("Synced %d users from UniFi Access", len(users))
except Exception as e: except Exception as e:
log.error("sync_unifi_users error: %s", e) log.error("sync_unifi_users error: %s", e)
def verify_signature(payload_bytes, sig_header): def verify_signature(payload_bytes, sig_header):
""" """
UniFi Access signature format (from official API docs section 11.7): UniFi Access signature format (API docs section 11.7):
Header name : Signature Header name : Signature
Header value: t=<unix_timestamp>,v1=<hex_hmac_sha256> Header value: t=<unix_timestamp>,v1=<hex_hmac_sha256>
Signed data : f"{timestamp}.{raw_body}" Signed data : f"{timestamp}.{raw_body}"
Returns True if valid, or if no WEBHOOK_SECRET is configured. Returns True if valid, or if no WEBHOOK_SECRET is configured.
""" """
if not WEBHOOK_SECRET: if not WEBHOOK_SECRET:
# If no secret configured, accept all (useful for initial testing)
return True return True
if not sig_header: if not sig_header:
log.warning("No Signature header present") log.warning("No Signature header present")
return False return False
try: try:
parts = dict(p.split("=", 1) for p in sig_header.split(",")) parts = dict(p.split("=", 1) for p in sig_header.split(","))
timestamp = parts.get("t", "") timestamp = parts.get("t", "")
received = parts.get("v1", "") received = parts.get("v1", "")
if not timestamp or not received:
log.warning("Signature header missing t or v1: %s", sig_header)
return False
signed_payload = f"{timestamp}.".encode() + payload_bytes signed_payload = f"{timestamp}.".encode() + payload_bytes
expected = hmac.new( expected = hmac.new(
WEBHOOK_SECRET.encode(), signed_payload, hashlib.sha256 WEBHOOK_SECRET.encode(), signed_payload, hashlib.sha256
@@ -98,15 +117,16 @@ def verify_signature(payload_bytes, sig_header):
log.warning("Signature parse error: %s", e) log.warning("Signature parse error: %s", e)
return False return False
@app.route("/") @app.route("/")
def index(): def index():
return app.send_static_file("index.html") return app.send_static_file("index.html")
@app.route("/api/unifi-access", methods=["POST"]) @app.route("/api/unifi-access", methods=["POST"])
def receive_webhook(): def receive_webhook():
raw = request.get_data() raw = request.get_data()
# Optional signature verification
sig = request.headers.get("Signature", "") sig = request.headers.get("Signature", "")
if not verify_signature(raw, sig): if not verify_signature(raw, sig):
log.warning("Webhook signature mismatch") log.warning("Webhook signature mismatch")
@@ -119,72 +139,100 @@ def receive_webhook():
log.info("Webhook received: %s", json.dumps(payload)[:300]) log.info("Webhook received: %s", json.dumps(payload)[:300])
# Support both UniFi Access developer API and legacy Alarm Manager formats event = payload.get("event") or payload.get("event_object_id", "") or ""
event = payload.get("event") or payload.get("event_object_id", "")
actor = (payload.get("actor") or {}).get("id") or payload.get("actor_id", "")
ts_raw = payload.get("timestamp") or payload.get("created_at") or datetime.utcnow().isoformat()
if "door.unlock" not in str(event) and "access.door.unlock" not in str(event): # Data block per UniFi Access docs: payload["data"]["actor"], ["event"], etc.
data = payload.get("data") or {}
actor_obj = data.get("actor") or {}
actor = actor_obj.get("id") or payload.get("actor_id", "")
if "access.door.unlock" not in str(event):
# Ignore other notification types
return jsonify({"status": "ignored"}), 200 return jsonify({"status": "ignored"}), 200
if not actor: if not actor:
log.warning("Webhook has no actor id: %s", json.dumps(payload)[:300])
return jsonify({"error": "no actor"}), 400 return jsonify({"error": "no actor"}), 400
tz = pytz.timezone(TZ) # Prefer data.event.published (ms since epoch) if present
ts = datetime.fromisoformat(str(ts_raw).replace("Z", "+00:00")) event_meta = data.get("event") or {}
ts_l = ts.astimezone(tz) ts_ms = event_meta.get("published")
date = ts_l.strftime("%Y-%m-%d") if ts_ms:
ts_s = ts_l.strftime("%H:%M:%S") ts = datetime.fromtimestamp(ts_ms / 1000.0, tz=pytz.utc)
else:
ts_raw = (
payload.get("timestamp")
or payload.get("created_at")
or datetime.utcnow().isoformat()
)
ts = datetime.fromisoformat(str(ts_raw).replace("Z", "+00:00"))
tz = pytz.timezone(TZ)
ts_local = ts.astimezone(tz)
date = ts_local.strftime("%Y-%m-%d")
ts_str = ts_local.strftime("%H:%M:%S")
with get_db() as db: with get_db() as db:
db.execute( db.execute(
"INSERT INTO badge_events (actor_id, ts, date) VALUES (?, ?, ?)", "INSERT INTO badge_events (actor_id, ts, date) VALUES (?, ?, ?)",
(actor, ts_s, date) (actor, ts_str, date),
) )
db.commit() db.commit()
log.info("Badge-in recorded: actor=%s date=%s ts=%s", actor, date, ts_s) log.info("Badge-in recorded: actor=%s date=%s ts=%s", actor, date, ts_str)
return jsonify({"status": "ok"}), 200 return jsonify({"status": "ok"}), 200
@app.route("/api/first-badge-status") @app.route("/api/first-badge-status")
def first_badge_status(): def first_badge_status():
date = request.args.get("date", datetime.now().strftime("%Y-%m-%d")) date = request.args.get("date", datetime.now().strftime("%Y-%m-%d"))
cutoff = request.args.get("cutoff", "09:00") cutoff = request.args.get("cutoff", "09:00") # HH:MM
with get_db() as db: with get_db() as db:
rows = db.execute(""" rows = db.execute(
"""
SELECT SELECT
b.actor_id, b.actor_id,
MIN(b.ts) AS first_ts, MIN(b.ts) AS first_ts,
MAX(b.ts) AS latest_ts, MAX(b.ts) AS latest_ts,
COALESCE(u.full_name, 'Unknown (' || SUBSTR(b.actor_id,1,8) || '...)') AS name COALESCE(
u.full_name,
'Unknown (' || SUBSTR(b.actor_id,1,8) || '...)'
) AS name
FROM badge_events b FROM badge_events b
LEFT JOIN user_cache u ON u.actor_id = b.actor_id LEFT JOIN user_cache u ON u.actor_id = b.actor_id
WHERE b.date = ? WHERE b.date = ?
GROUP BY b.actor_id GROUP BY b.actor_id
ORDER BY first_ts ASC ORDER BY first_ts ASC
""", (date,)).fetchall() """,
(date,),
).fetchall()
result = [] result = []
for r in rows: for r in rows:
first = r["first_ts"] first = r["first_ts"]
latest = r["latest_ts"] latest = r["latest_ts"]
# Compare as strings HH:MM:SS against cutoff HH:MM (treat <= cutoff:59 as on time)
status = "ON TIME" if first <= cutoff + ":59" else "LATE" status = "ON TIME" if first <= cutoff + ":59" else "LATE"
result.append({ result.append(
"actor_id": r["actor_id"], {
"name": r["name"], "actor_id": r["actor_id"],
"first_ts": first, "name": r["name"],
"latest_ts": latest if latest != first else None, "first_ts": first,
"status": status "latest_ts": latest if latest != first else None,
}) "status": status,
}
)
return jsonify(result) return jsonify(result)
@app.route("/api/sync-users") @app.route("/api/sync-users")
def manual_sync(): def manual_sync():
sync_unifi_users() sync_unifi_users()
return jsonify({"status": "synced"}) return jsonify({"status": "synced"})
@app.route("/api/reset-day", methods=["DELETE"]) @app.route("/api/reset-day", methods=["DELETE"])
def reset_day(): def reset_day():
date = request.args.get("date", datetime.now().strftime("%Y-%m-%d")) date = request.args.get("date", datetime.now().strftime("%Y-%m-%d"))
@@ -193,6 +241,7 @@ def reset_day():
db.commit() db.commit()
return jsonify({"status": "ok", "deleted": cur.rowcount, "date": date}) return jsonify({"status": "ok", "deleted": cur.rowcount, "date": date})
# Initialise DB and kick off background scheduler at import time # Initialise DB and kick off background scheduler at import time
with app.app_context(): with app.app_context():
init_db() init_db()