Merge branch 'main' of https://github.com/taylorwilsdon/google_workspace_mcp into tool_consolidation

This commit is contained in:
Taylor Wilsdon
2026-03-01 12:36:10 -05:00
2 changed files with 115 additions and 12 deletions

View File

@@ -64,11 +64,11 @@ A production-ready MCP server that integrates all major Google Workspace service
<td width="50%" valign="top">
**<span style="color:#72898f">@</span> Gmail** • **<span style="color:#72898f">≡</span> Drive** • **<span style="color:#72898f">⧖</span> Calendar** **<span style="color:#72898f">≡</span> Docs**
- Complete Gmail management, end to end coverage
- Complete Gmail management, end-to-end coverage
- Full calendar management with advanced features
- File operations with Office format support
- Document creation, editing & comments
- Deep, exhaustive support for fine grained editing
- Deep, exhaustive support for fine-grained editing
---

View File

@@ -19,9 +19,10 @@ from email import encoders
from email.utils import formataddr
from pydantic import Field
from googleapiclient.errors import HttpError
from auth.service_decorator import require_google_service
from core.utils import handle_http_errors, validate_file_path
from core.utils import handle_http_errors, validate_file_path, UserInputError
from core.server import server
from auth.scopes import (
GMAIL_SEND_SCOPE,
@@ -172,6 +173,74 @@ def _format_body_content(text_body: str, html_body: str) -> str:
return "[No readable content found]"
def _append_signature_to_body(
body: str, body_format: Literal["plain", "html"], signature_html: str
) -> str:
"""Append a Gmail signature to the outgoing body, preserving body format."""
if not signature_html or not signature_html.strip():
return body
if body_format == "html":
separator = "<br><br>" if body.strip() else ""
return f"{body}{separator}{signature_html}"
signature_text = _html_to_text(signature_html).strip()
if not signature_text:
return body
separator = "\n\n" if body.strip() else ""
return f"{body}{separator}{signature_text}"
async def _get_send_as_signature_html(service, from_email: Optional[str] = None) -> str:
"""
Fetch signature HTML from Gmail send-as settings.
Returns empty string when the account has no signature configured or the
OAuth token cannot access settings endpoints.
"""
try:
response = await asyncio.to_thread(
service.users().settings().sendAs().list(userId="me").execute
)
except HttpError as e:
status = getattr(getattr(e, "resp", None), "status", None)
if status in {401, 403}:
logger.info(
"Skipping Gmail signature fetch: missing auth/scope for settings endpoint."
)
return ""
logger.warning(f"Failed to fetch Gmail send-as signatures: {e}")
return ""
except Exception as e:
logger.warning(f"Failed to fetch Gmail send-as signatures: {e}")
return ""
send_as_entries = response.get("sendAs", [])
if not send_as_entries:
return ""
if from_email:
from_email_normalized = from_email.strip().lower()
for entry in send_as_entries:
if entry.get("sendAsEmail", "").strip().lower() == from_email_normalized:
return entry.get("signature", "") or ""
for entry in send_as_entries:
if entry.get("isPrimary"):
return entry.get("signature", "") or ""
return send_as_entries[0].get("signature", "") or ""
def _format_attachment_result(attached_count: int, requested_count: int) -> str:
"""Format attachment result message for user-facing responses."""
if requested_count <= 0:
return ""
if attached_count == requested_count:
return f" with {attached_count} attachment(s)"
return f" with {attached_count}/{requested_count} attachment(s) attached"
def _extract_attachments(payload: dict) -> List[Dict[str, Any]]:
"""
Extract attachment metadata from a Gmail message payload.
@@ -241,7 +310,7 @@ def _prepare_gmail_message(
from_email: Optional[str] = None,
from_name: Optional[str] = None,
attachments: Optional[List[Dict[str, str]]] = None,
) -> tuple[str, Optional[str]]:
) -> tuple[str, Optional[str], int]:
"""
Prepare a Gmail message with threading and attachment support.
@@ -260,7 +329,7 @@ def _prepare_gmail_message(
attachments: Optional list of attachments. Each can have 'path' (file path) OR 'content' (base64) + 'filename'
Returns:
Tuple of (raw_message, thread_id) where raw_message is base64 encoded
Tuple of (raw_message, thread_id, attached_count) where raw_message is base64 encoded
"""
# Handle reply subject formatting
reply_subject = subject
@@ -273,6 +342,7 @@ def _prepare_gmail_message(
raise ValueError("body_format must be either 'plain' or 'html'.")
# Use multipart if attachments are provided
attached_count = 0
if attachments:
message = MIMEMultipart()
message.attach(MIMEText(body, normalized_format))
@@ -344,6 +414,7 @@ def _prepare_gmail_message(
)
message.attach(part)
attached_count += 1
logger.info(f"Attached file: {filename} ({len(file_data)} bytes)")
except Exception as e:
logger.error(f"Failed to attach {filename or file_path}: {e}")
@@ -382,7 +453,7 @@ def _prepare_gmail_message(
# Encode message
raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
return raw_message, thread_id
return raw_message, thread_id, attached_count
def _generate_gmail_web_url(item_id: str, account_index: int = 0) -> str:
@@ -1177,7 +1248,7 @@ async def send_gmail_message(
# Prepare the email message
# Use from_email (Send As alias) if provided, otherwise default to authenticated user
sender_email = from_email or user_google_email
raw_message, thread_id_final = _prepare_gmail_message(
raw_message, thread_id_final, attached_count = _prepare_gmail_message(
subject=subject,
body=body,
to=to,
@@ -1192,6 +1263,12 @@ async def send_gmail_message(
attachments=attachments if attachments else None,
)
requested_attachment_count = len(attachments or [])
if requested_attachment_count > 0 and attached_count == 0:
raise UserInputError(
"No valid attachments were added. Verify each attachment path/content and retry."
)
send_body = {"raw": raw_message}
# Associate with thread if provided
@@ -1204,8 +1281,11 @@ async def send_gmail_message(
)
message_id = sent_message.get("id")
if attachments:
return f"Email sent with {len(attachments)} attachment(s)! Message ID: {message_id}"
if requested_attachment_count > 0:
attachment_info = _format_attachment_result(
attached_count, requested_attachment_count
)
return f"Email sent{attachment_info}! Message ID: {message_id}"
return f"Email sent! Message ID: {message_id}"
@@ -1271,6 +1351,12 @@ async def draft_gmail_message(
description="Optional list of attachments. Each can have: 'path' (file path, auto-encodes), OR 'content' (standard base64, not urlsafe) + 'filename'. Optional 'mime_type' (auto-detected from path if not provided).",
),
] = None,
include_signature: Annotated[
bool,
Field(
description="Whether to append the Gmail signature from Settings > Signature when available. Defaults to true.",
),
] = True,
) -> str:
"""
Creates a draft email in the user's Gmail account. Supports both new drafts and reply drafts with optional attachments.
@@ -1300,6 +1386,8 @@ async def draft_gmail_message(
- 'content' (required): Standard base64-encoded file content (not urlsafe)
- 'filename' (required): Name of the file
- 'mime_type' (optional): MIME type (defaults to 'application/octet-stream')
include_signature (bool): Whether to append Gmail signature HTML from send-as settings.
If unavailable (e.g., missing gmail.settings.basic scope), the draft is still created without signature.
Returns:
str: Confirmation message with the created draft's ID.
@@ -1363,9 +1451,16 @@ async def draft_gmail_message(
# Prepare the email message
# Use from_email (Send As alias) if provided, otherwise default to authenticated user
sender_email = from_email or user_google_email
raw_message, thread_id_final = _prepare_gmail_message(
draft_body = body
if include_signature:
signature_html = await _get_send_as_signature_html(
service, from_email=sender_email
)
draft_body = _append_signature_to_body(draft_body, body_format, signature_html)
raw_message, thread_id_final, attached_count = _prepare_gmail_message(
subject=subject,
body=body,
body=draft_body,
body_format=body_format,
to=to,
cc=cc,
@@ -1378,6 +1473,12 @@ async def draft_gmail_message(
attachments=attachments,
)
requested_attachment_count = len(attachments or [])
if requested_attachment_count > 0 and attached_count == 0:
raise UserInputError(
"No valid attachments were added. Verify each attachment path/content and retry."
)
# Create a draft instead of sending
draft_body = {"message": {"raw": raw_message}}
@@ -1390,7 +1491,9 @@ async def draft_gmail_message(
service.users().drafts().create(userId="me", body=draft_body).execute
)
draft_id = created_draft.get("id")
attachment_info = f" with {len(attachments)} attachment(s)" if attachments else ""
attachment_info = _format_attachment_result(
attached_count, requested_attachment_count
)
return f"Draft created{attachment_info}! Draft ID: {draft_id}"