""" Google Gmail MCP Tools This module provides MCP tools for interacting with the Gmail API. """ import logging import asyncio import base64 import re import ssl import mimetypes from html.parser import HTMLParser from typing import Annotated, Optional, List, Dict, Literal, Any from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email import encoders from email.utils import formataddr from pydantic import Field from googleapiclient.errors import HttpError from auth.service_decorator import require_google_service from core.utils import handle_http_errors, validate_file_path, UserInputError from core.server import server from auth.scopes import ( GMAIL_SEND_SCOPE, GMAIL_COMPOSE_SCOPE, GMAIL_MODIFY_SCOPE, GMAIL_LABELS_SCOPE, ) logger = logging.getLogger(__name__) GMAIL_BATCH_SIZE = 25 GMAIL_REQUEST_DELAY = 0.1 HTML_BODY_TRUNCATE_LIMIT = 20000 GMAIL_METADATA_HEADERS = [ "Subject", "From", "To", "Cc", "Message-ID", "In-Reply-To", "References", "Date", ] LOW_VALUE_TEXT_PLACEHOLDERS = ( "your client does not support html", "view this email in your browser", "open this email in your browser", ) LOW_VALUE_TEXT_FOOTER_MARKERS = ( "mailing list", "mailman/listinfo", "unsubscribe", "list-unsubscribe", "manage preferences", ) LOW_VALUE_TEXT_HTML_DIFF_MIN = 80 class _HTMLTextExtractor(HTMLParser): """Extract readable text from HTML using stdlib.""" def __init__(self): super().__init__() self._text = [] self._skip = False def handle_starttag(self, tag, attrs): self._skip = tag in ("script", "style") def handle_endtag(self, tag): if tag in ("script", "style"): self._skip = False def handle_data(self, data): if not self._skip: self._text.append(data) def get_text(self) -> str: return " ".join("".join(self._text).split()) def _html_to_text(html: str) -> str: """Convert HTML to readable plain text.""" try: parser = _HTMLTextExtractor() parser.feed(html) return parser.get_text() except Exception: return html 
def _extract_message_body(payload):
    """
    Helper function to extract plain text body from a Gmail message payload.
    (Maintained for backward compatibility)

    Args:
        payload (dict): The message payload from Gmail API

    Returns:
        str: The plain text body content, or empty string if not found
    """
    return _extract_message_bodies(payload).get("text", "")


# Maps the MIME types we extract to the key they are stored under in the result.
_BODY_SLOT_BY_MIME_TYPE = {"text/plain": "text", "text/html": "html"}


def _decode_body_data(body_data):
    """Decode a base64url-encoded body value into text.

    Missing ``=`` padding is restored before decoding so unpadded data is not
    rejected; bytes that are not valid UTF-8 are dropped instead of raising.
    """
    pad_char = "=" if isinstance(body_data, str) else b"="
    padded = body_data + pad_char * (-len(body_data) % 4)
    return base64.urlsafe_b64decode(padded).decode("utf-8", errors="ignore")


def _store_part_body(part, bodies, description):
    """Decode ``part``'s inline body into ``bodies`` if its slot is still empty.

    ``description`` only names the part in the warning logged on failure.
    """
    slot = _BODY_SLOT_BY_MIME_TYPE.get(part.get("mimeType") or "")
    if slot is None or bodies[slot]:
        # Not a text/plain or text/html part, or that body was already found:
        # skip the decode entirely.
        return
    body_data = (part.get("body") or {}).get("data")
    if not body_data:
        return
    try:
        bodies[slot] = _decode_body_data(body_data)
    except Exception as e:
        # Best-effort: one undecodable part must not abort the extraction.
        logger.warning(f"Failed to decode {description}: {e}")


def _extract_message_bodies(payload):
    """
    Helper function to extract both plain text and HTML bodies from a Gmail message payload.

    Sub-parts are visited breadth-first, descending only into ``multipart/*``
    parts; the first non-empty ``text/plain`` and ``text/html`` bodies found
    win. The payload's own inline body is consulted last, as a fallback.

    Args:
        payload (dict): The message payload from Gmail API

    Returns:
        dict: Dictionary with 'text' and 'html' keys containing body content
            (empty strings when a body is missing or cannot be decoded)
    """
    # Local import: deque gives O(1) pops from the front of the BFS queue
    # without requiring a change to the module-level imports.
    from collections import deque

    bodies = {"text": "", "html": ""}
    if not payload:
        return bodies

    pending = deque(payload.get("parts") or [])
    while pending and not (bodies["text"] and bodies["html"]):
        part = pending.popleft()
        _store_part_body(part, bodies, "body part")
        mime_type = part.get("mimeType") or ""
        if mime_type.startswith("multipart/"):
            pending.extend(part.get("parts") or [])

    # Single-part messages carry their body directly on the payload.
    _store_part_body(payload, bodies, "main payload body")
    return bodies
format message body content with HTML fallback and truncation. Detects useless text/plain fallbacks (e.g., "Your client does not support HTML"). Args: text_body: Plain text body content html_body: HTML body content Returns: Formatted body content string """ text_stripped = text_body.strip() html_stripped = html_body.strip() html_text = _html_to_text(html_stripped).strip() if html_stripped else "" plain_lower = " ".join(text_stripped.split()).lower() html_lower = " ".join(html_text.split()).lower() plain_is_low_value = plain_lower and ( any(marker in plain_lower for marker in LOW_VALUE_TEXT_PLACEHOLDERS) or ( any(marker in plain_lower for marker in LOW_VALUE_TEXT_FOOTER_MARKERS) and len(html_lower) >= len(plain_lower) + LOW_VALUE_TEXT_HTML_DIFF_MIN ) or ( len(html_lower) >= len(plain_lower) + LOW_VALUE_TEXT_HTML_DIFF_MIN and html_lower.endswith(plain_lower) ) ) # Prefer plain text, but fall back to HTML when plain text is empty or clearly low-value. use_html = html_text and ( not text_stripped or "