Add Plaud MCP server (plaud_mcp.py) — 7 tools

This commit is contained in:
2026-05-12 12:27:32 -05:00
parent e89096f9ce
commit 99f8aa7b8e
+338
View File
@@ -0,0 +1,338 @@
#!/usr/bin/env python3
"""
Plaud MCP Server for MPM
Connects to the Plaud API and exposes tools for listing, searching,
and reading transcripts, summaries, and notes from Plaud recordings.
Credentials: reads token from ~/.plaud/config.json (written by plaud-connector
import-token command). Falls back to PLAUD_TOKEN env var.
"""
import json
import os
import urllib.request
import urllib.error
import urllib.parse
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from mcp.server.fastmcp import FastMCP
# ── Configuration ─────────────────────────────────────────────────────────────
# US region: api.plaud.ai | EU region: api.plaud.eu
PLAUD_REGION = os.environ.get("PLAUD_REGION", "us")
BASE_URL = "https://api.plaud.eu" if PLAUD_REGION == "eu" else "https://api.plaud.ai"
CONFIG_PATH = Path.home() / ".plaud" / "config.json"
# ── Token resolution ──────────────────────────────────────────────────────────
def _load_token() -> Optional[str]:
"""
Resolve the Plaud bearer token. Priority:
1. PLAUD_TOKEN env var
2. ~/.plaud/config.json → token.token (plaud-connector format)
3. ~/.plaud/config.json → token (bare string fallback)
"""
env_token = os.environ.get("PLAUD_TOKEN", "").strip()
if env_token:
return env_token
if CONFIG_PATH.exists():
try:
raw = CONFIG_PATH.read_text()
data = json.loads(raw)
# plaud-connector / plaud-toolkit format: { "token": { "token": "...", "expiresAt": ... } }
token_block = data.get("token")
if isinstance(token_block, dict):
t = token_block.get("token", "").strip()
if t:
return t
# bare string fallback
if isinstance(token_block, str) and token_block.strip():
return token_block.strip()
except Exception:
pass
return None
def _get_token() -> str:
token = _load_token()
if not token:
raise RuntimeError(
"No Plaud token found. Run:\n"
" npx tsx ~/Developer/plaud-connector/packages/cli/bin/plaud.ts import-token app \"<jwt>\"\n"
"or set the PLAUD_TOKEN environment variable."
)
return token
# ── HTTP helpers ──────────────────────────────────────────────────────────────
def _api_get(path: str, params: Optional[dict] = None) -> dict:
"""GET request to the Plaud API. Returns parsed JSON."""
token = _get_token()
url = f"{BASE_URL}{path}"
if params:
query = "&".join(f"{k}={urllib.parse.quote(str(v))}" for k, v in params.items() if v is not None)
if query:
url = f"{url}?{query}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"Bearer {token}")
req.add_header("Content-Type", "application/json")
req.add_header("Accept", "application/json")
req.add_header("User-Agent", "plaud-mpm-cowork/0.1.0")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"Plaud API error {e.code}: {body[:300]}")
except urllib.error.URLError as e:
raise RuntimeError(f"Network error reaching Plaud API: {e.reason}")
# ── Data helpers ──────────────────────────────────────────────────────────────
def _fmt_duration(ms: int) -> str:
mins = ms // 60000
if mins < 60:
return f"{mins}m"
return f"{mins // 60}h {mins % 60}m"
def _fmt_date(ts_ms: int) -> str:
try:
dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
return dt.strftime("%Y-%m-%d %H:%M UTC")
except Exception:
return str(ts_ms)
def _summarise_recording(r: dict) -> dict:
"""Return a compact dict suitable for listing."""
return {
"id": r.get("id", ""),
"title": r.get("filename") or r.get("title") or "(untitled)",
"date": _fmt_date(r.get("start_time", 0)),
"duration": _fmt_duration(r.get("duration", 0)),
"has_transcript": bool(r.get("is_trans")),
"has_summary": bool(r.get("auto_sum_note") or r.get("is_sum")),
}
# ── MCP server ────────────────────────────────────────────────────────────────
mcp = FastMCP("plaud-mpm")
@mcp.tool()
def plaud_list_recordings(
limit: int = 50,
only_with_transcript: bool = False,
) -> str:
"""
List Plaud recordings newest-first.
Args:
limit: Max number to return (default 50, max 200).
only_with_transcript: If true, only return recordings that have a transcript.
"""
resp = _api_get("/file/simple/web")
recordings = resp.get("data", resp) if isinstance(resp, dict) else resp
if not isinstance(recordings, list):
recordings = []
if only_with_transcript:
recordings = [r for r in recordings if r.get("is_trans")]
recordings = recordings[:min(limit, 200)]
result = [_summarise_recording(r) for r in recordings]
return json.dumps(result, indent=2)
@mcp.tool()
def plaud_search_recordings(
title_contains: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
only_with_transcript: bool = False,
max_results: int = 20,
) -> str:
"""
Search recordings by title and/or date range.
Args:
title_contains: Substring to match in the recording title (case-insensitive).
start_date: ISO date string, e.g. '2026-04-01'. Only recordings on or after this date.
end_date: ISO date string, e.g. '2026-05-01'. Only recordings on or before this date.
only_with_transcript: If true, only return recordings that have a transcript.
max_results: Max number of results to return (default 20).
"""
resp = _api_get("/file/simple/web")
recordings = resp.get("data", resp) if isinstance(resp, dict) else resp
if not isinstance(recordings, list):
recordings = []
def matches(r: dict) -> bool:
if only_with_transcript and not r.get("is_trans"):
return False
title = (r.get("filename") or r.get("title") or "").lower()
if title_contains and title_contains.lower() not in title:
return False
if start_date or end_date:
ts_ms = r.get("start_time", 0)
rec_date = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).date()
if start_date:
if rec_date < datetime.fromisoformat(start_date).date():
return False
if end_date:
if rec_date > datetime.fromisoformat(end_date).date():
return False
return True
matched = [r for r in recordings if matches(r)][:max_results]
return json.dumps([_summarise_recording(r) for r in matched], indent=2)
@mcp.tool()
def plaud_get_transcript(recording_id: str) -> str:
"""
Get the full transcript for a Plaud recording.
Args:
recording_id: The recording ID (from plaud_list_recordings).
"""
resp = _api_get(f"/file/detail/{recording_id}")
data = resp.get("data", resp) if isinstance(resp, dict) else resp
title = data.get("filename") or data.get("title") or "(untitled)"
transcript = data.get("transcript") or ""
# Some recordings store transcript in source_list segments
if not transcript:
source_list = data.get("source_list") or []
if source_list:
transcript = "\n".join(
f"[{s.get('speaker', '')}] {s.get('content', '')}" if s.get("speaker")
else s.get("content", "")
for s in source_list
)
return json.dumps({
"id": recording_id,
"title": title,
"date": _fmt_date(data.get("start_time", 0)),
"duration": _fmt_duration(data.get("duration", 0)),
"transcript": transcript or "No transcript available for this recording.",
}, indent=2)
@mcp.tool()
def plaud_get_summary(recording_id: str) -> str:
"""
Get the AI-generated summary for a Plaud recording.
Args:
recording_id: The recording ID (from plaud_list_recordings).
"""
resp = _api_get(f"/file/detail/{recording_id}")
data = resp.get("data", resp) if isinstance(resp, dict) else resp
title = data.get("filename") or data.get("title") or "(untitled)"
summary = data.get("auto_sum_note") or ""
return json.dumps({
"id": recording_id,
"title": title,
"date": _fmt_date(data.get("start_time", 0)),
"summary": summary or "No AI summary available for this recording.",
}, indent=2)
@mcp.tool()
def plaud_get_notes(recording_id: str) -> str:
"""
Get AI-generated notes and action items for a Plaud recording.
Returns all note entries from the content_list (action items, key points, etc.).
Args:
recording_id: The recording ID (from plaud_list_recordings).
"""
resp = _api_get(f"/file/detail/{recording_id}")
data = resp.get("data", resp) if isinstance(resp, dict) else resp
title = data.get("filename") or data.get("title") or "(untitled)"
content_list = data.get("content_list") or []
notes = []
for item in content_list:
note_entry = {
"type": item.get("data_type") or item.get("template_type") or "note",
"title": item.get("template_name") or item.get("tab_name") or "",
"content": item.get("note") or item.get("content") or "",
}
if note_entry["content"]:
notes.append(note_entry)
return json.dumps({
"id": recording_id,
"title": title,
"date": _fmt_date(data.get("start_time", 0)),
"notes": notes if notes else [{"type": "none", "content": "No notes available for this recording."}],
}, indent=2)
@mcp.tool()
def plaud_get_recording_detail(recording_id: str) -> str:
"""
Get full metadata for a Plaud recording including title, date, duration,
transcript availability, summary availability, and note count.
Args:
recording_id: The recording ID (from plaud_list_recordings).
"""
resp = _api_get(f"/file/detail/{recording_id}")
data = resp.get("data", resp) if isinstance(resp, dict) else resp
return json.dumps({
"id": recording_id,
"title": data.get("filename") or data.get("title") or "(untitled)",
"date": _fmt_date(data.get("start_time", 0)),
"duration": _fmt_duration(data.get("duration", 0)),
"has_transcript": bool(data.get("is_trans")),
"has_summary": bool(data.get("auto_sum_note") or data.get("is_sum")),
"note_count": len(data.get("content_list") or []),
"language": data.get("language") or "unknown",
"device": data.get("device_name") or data.get("device") or "unknown",
"tags": data.get("filetag") or [],
}, indent=2)
@mcp.tool()
def plaud_user_info() -> str:
"""Get the current Plaud account information."""
try:
resp = _api_get("/user/info")
data = resp.get("data", resp) if isinstance(resp, dict) else resp
return json.dumps({
"email": data.get("email") or data.get("username") or "unknown",
"name": data.get("name") or data.get("display_name") or "",
"region": PLAUD_REGION,
"api_base": BASE_URL,
"token_loaded": bool(_load_token()),
}, indent=2)
except Exception as e:
return json.dumps({
"region": PLAUD_REGION,
"api_base": BASE_URL,
"token_loaded": bool(_load_token()),
"note": str(e),
}, indent=2)
if __name__ == "__main__":
mcp.run()