Add HTTP URL-based attachment serving for Gmail attachments

This commit implements a new feature that allows Gmail attachments to be
served via HTTP URLs instead of returning base64-encoded data in the tool
response. This avoids consuming LLM context window space and token budgets
for large attachments.

Architecture:
-------------
The implementation works in both stdio and streamable-http transport modes:

1. Temp File Storage (core/attachment_storage.py):
   - New AttachmentStorage class manages temporary file storage in ./tmp/attachments/
   - Uses UUID-based file IDs to prevent guessing/unauthorized access
   - Tracks metadata: filename, mime type, size, creation/expiration times
   - Files expire after 1 hour (configurable via expiration_seconds); an expired file is deleted when it is next requested or when cleanup_expired() is called
   - Handles base64 decoding and file writing

2. HTTP Route Handlers:
   - Added /attachments/{file_id} route to main FastMCP server (streamable-http mode)
   - Added same route to MinimalOAuthServer (stdio mode)
   - Both routes serve files with proper Content-Type headers via FileResponse
   - Returns 404 for expired or missing attachments

3. Modified get_gmail_attachment_content():
   - Now saves attachments to temp storage and returns HTTP URL
   - Attempts to fetch filename/mimeType from message metadata (best effort)
   - Handles stateless mode gracefully (skips file saving, shows preview)
   - Falls back to base64 preview if file saving fails
   - URL generation respects WORKSPACE_EXTERNAL_URL for reverse proxy setups

Key Features:
-------------
- Works in both stdio and streamable-http modes (uses existing HTTP servers)
- Respects stateless mode (no file writes when WORKSPACE_MCP_STATELESS_MODE=true)
- Secure: UUID-based file IDs prevent unauthorized access
- Automatic expiration: Files stop being served after 1 hour and are deleted on the next access or cleanup_expired() call
- Reverse proxy support: Uses WORKSPACE_EXTERNAL_URL if configured
- Graceful degradation: Falls back to preview if storage fails

Benefits:
---------
- Avoids context window bloat: Large attachments don't consume LLM tokens
- Better performance: Clients can stream/download files directly
- More efficient: No need to decode base64 in client applications
- Works across network boundaries: URLs accessible from any client

The feature maintains backward compatibility - if file saving fails or stateless
mode is enabled, the function falls back to showing a base64 preview.
This commit is contained in:
Josh Dzielak
2025-11-29 15:06:57 +01:00
parent 0402b1a0b8
commit ee1db221af
4 changed files with 360 additions and 13 deletions

218
core/attachment_storage.py Normal file
View File

@@ -0,0 +1,218 @@
"""
Temporary attachment storage for Gmail attachments.
Stores attachments in ./tmp directory and provides HTTP URLs for access.
Files are automatically cleaned up after expiration (default 1 hour).
"""
import base64
import logging
import os
import time
import uuid
from pathlib import Path
from typing import Optional, Dict
from datetime import datetime, timedelta
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Default lifetime of a stored attachment, in seconds (3600 s = 1 hour).
DEFAULT_EXPIRATION_SECONDS = 3600
# Storage directory. The path is relative, so it resolves against the
# current working directory of the running process.
STORAGE_DIR = Path("./tmp/attachments")
# Create the directory (and any missing parents) at import time; this is a
# no-op when it already exists.
STORAGE_DIR.mkdir(parents=True, exist_ok=True)
class AttachmentStorage:
    """Manage temporary on-disk storage of decoded email attachments.

    Each attachment is written to ``STORAGE_DIR`` under a random UUID-based
    name and tracked in an in-memory metadata table that records its expiry
    time. The table lives only on this instance, so files written by another
    instance (or an earlier process) cannot be looked up through it.
    """

    # Extension fallback used when the original filename provides no suffix.
    _MIME_TO_EXT: Dict[str, str] = {
        "image/jpeg": ".jpg",
        "image/png": ".png",
        "image/gif": ".gif",
        "application/pdf": ".pdf",
        "application/zip": ".zip",
        "text/plain": ".txt",
        "text/html": ".html",
    }

    def __init__(self, expiration_seconds: int = DEFAULT_EXPIRATION_SECONDS):
        """
        Create an empty storage tracker.

        Args:
            expiration_seconds: Lifetime of each saved attachment, in seconds
        """
        self.expiration_seconds = expiration_seconds
        # file_id -> {file_path, filename, mime_type, size, created_at, expires_at}
        self._metadata: Dict[str, Dict] = {}

    def save_attachment(
        self,
        base64_data: str,
        filename: Optional[str] = None,
        mime_type: Optional[str] = None,
    ) -> str:
        """
        Decode an attachment, write it to disk and return a unique file ID.

        Args:
            base64_data: Base64-encoded attachment data (URL-safe or standard
                alphabet; missing trailing ``=`` padding is tolerated)
            filename: Original filename (optional)
            mime_type: MIME type (optional)

        Returns:
            Unique file ID (UUID string)

        Raises:
            ValueError: If the data cannot be base64-decoded
            Exception: Whatever the filesystem raises if the file cannot be
                written (logged, then re-raised unchanged)
        """
        # Purge stale entries first so that neither the metadata table nor the
        # storage directory grows without bound in a long-running process.
        self.cleanup_expired()

        # Generate unique file ID
        file_id = str(uuid.uuid4())

        # Decode base64 data. base64url producers may omit the trailing "="
        # padding, which the decoder rejects, so restore it before decoding.
        try:
            missing = -len(base64_data) % 4
            if missing:
                pad = "=" if isinstance(base64_data, str) else b"="
                base64_data = base64_data + pad * missing
            file_bytes = base64.urlsafe_b64decode(base64_data)
        except Exception as e:
            logger.error(f"Failed to decode base64 attachment data: {e}")
            raise ValueError(f"Invalid base64 data: {e}") from e

        # Prefer the original filename's suffix; fall back to the MIME type
        # when there is no filename or the filename has no suffix.
        extension = Path(filename).suffix if filename else ""
        if not extension and mime_type:
            extension = self._MIME_TO_EXT.get(mime_type, "")

        # Save file
        file_path = STORAGE_DIR / f"{file_id}{extension}"
        try:
            # The directory is created at import time, but recreate it in case
            # it was removed while the process was running.
            file_path.parent.mkdir(parents=True, exist_ok=True)
            file_path.write_bytes(file_bytes)
            logger.info(f"Saved attachment {file_id} ({len(file_bytes)} bytes) to {file_path}")
        except Exception as e:
            logger.error(f"Failed to save attachment to {file_path}: {e}")
            raise

        # Store metadata; derive both timestamps from a single clock reading
        # so that expires_at - created_at is exactly the configured lifetime.
        created_at = datetime.now()
        self._metadata[file_id] = {
            "file_path": str(file_path),
            "filename": filename or f"attachment{extension}",
            "mime_type": mime_type or "application/octet-stream",
            "size": len(file_bytes),
            "created_at": created_at,
            "expires_at": created_at + timedelta(seconds=self.expiration_seconds),
        }
        return file_id

    def get_attachment_path(self, file_id: str) -> Optional[Path]:
        """
        Get the file path for an attachment ID.

        An expired attachment is deleted as a side effect of the lookup, and
        a metadata entry whose file has disappeared is dropped.

        Args:
            file_id: Unique file ID

        Returns:
            Path object if file exists and not expired, None otherwise
        """
        metadata = self._metadata.get(file_id)
        if metadata is None:
            logger.warning(f"Attachment {file_id} not found in metadata")
            return None

        file_path = Path(metadata["file_path"])

        # Check if expired
        if datetime.now() > metadata["expires_at"]:
            logger.info(f"Attachment {file_id} has expired, cleaning up")
            self._cleanup_file(file_id)
            return None

        # Check if file exists
        if not file_path.exists():
            logger.warning(f"Attachment file {file_path} does not exist")
            del self._metadata[file_id]
            return None

        return file_path

    def get_attachment_metadata(self, file_id: str) -> Optional[Dict]:
        """
        Get metadata for an attachment.

        An expired attachment is deleted as a side effect of the lookup.

        Args:
            file_id: Unique file ID

        Returns:
            A copy of the metadata dict if it exists and is not expired,
            None otherwise
        """
        stored = self._metadata.get(file_id)
        if stored is None:
            return None

        # Hand out a copy so callers cannot mutate the internal record.
        metadata = stored.copy()

        # Check if expired
        if datetime.now() > metadata["expires_at"]:
            self._cleanup_file(file_id)
            return None

        return metadata

    def _cleanup_file(self, file_id: str) -> None:
        """Remove the stored file (best effort) and its metadata entry."""
        metadata = self._metadata.pop(file_id, None)
        if metadata is None:
            return

        file_path = Path(metadata["file_path"])
        try:
            if file_path.exists():
                file_path.unlink()
                logger.debug(f"Deleted expired attachment file: {file_path}")
        except Exception as e:
            # Deletion is best effort: the metadata is already gone, so the
            # attachment can no longer be served even if the file lingers.
            logger.warning(f"Failed to delete attachment file {file_path}: {e}")

    def cleanup_expired(self) -> int:
        """
        Clean up expired attachments.

        Returns:
            Number of files cleaned up
        """
        now = datetime.now()
        # Collect IDs first: _cleanup_file mutates the dict being scanned.
        expired_ids = [
            file_id
            for file_id, metadata in self._metadata.items()
            if now > metadata["expires_at"]
        ]
        for file_id in expired_ids:
            self._cleanup_file(file_id)
        return len(expired_ids)
# Shared module-level instance, created lazily on first request
_attachment_storage: Optional[AttachmentStorage] = None


def get_attachment_storage() -> AttachmentStorage:
    """Return the shared AttachmentStorage, constructing it on the first call."""
    global _attachment_storage
    if _attachment_storage is not None:
        return _attachment_storage
    _attachment_storage = AttachmentStorage()
    return _attachment_storage
def get_attachment_url(file_id: str) -> str:
    """
    Generate a URL for accessing an attachment.

    The base is taken from the ``WORKSPACE_EXTERNAL_URL`` environment
    variable when it is set to a non-blank value (for reverse proxy
    scenarios); otherwise it is built from ``WORKSPACE_MCP_BASE_URI`` and
    ``WORKSPACE_MCP_PORT`` in ``core.config``.

    Args:
        file_id: Unique file ID

    Returns:
        Full URL to access the attachment
    """
    # Surrounding whitespace (easy to pick up from .env files) would
    # otherwise end up inside the URL and defeat the trailing-slash trim;
    # a blank value is treated the same as an unset one.
    external_url = (os.getenv("WORKSPACE_EXTERNAL_URL") or "").strip()
    if external_url:
        base_url = external_url.rstrip("/")
    else:
        # Imported here rather than at module level, and only on the branch
        # that actually needs the configured host and port.
        from core.config import WORKSPACE_MCP_PORT, WORKSPACE_MCP_BASE_URI

        base_url = f"{WORKSPACE_MCP_BASE_URI}:{WORKSPACE_MCP_PORT}"
    return f"{base_url}/attachments/{file_id}"

View File

@@ -2,7 +2,7 @@ import logging
from typing import List, Optional
from importlib import metadata
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.middleware import Middleware
@@ -156,6 +156,33 @@ async def health_check(request: Request):
"transport": get_transport_mode()
})
@server.custom_route("/attachments/{file_id}", methods=["GET"])
async def serve_attachment(file_id: str, request: Request):
    """Serve a previously stored attachment through a FileResponse.

    Answers with a 404 JSON error when the storage has no live metadata for
    ``file_id`` or cannot provide a file path for it.
    """
    # NOTE(review): Starlette-style route endpoints are normally invoked with
    # only `request`; confirm that `server.custom_route` really supplies
    # `file_id` as an argument, otherwise it has to be read from
    # `request.path_params["file_id"]`.
    from core.attachment_storage import get_attachment_storage

    def not_found(message: str) -> JSONResponse:
        # Both failure cases share the same response shape and status code.
        return JSONResponse({"error": message}, status_code=404)

    store = get_attachment_storage()

    # The metadata lookup comes first; it carries the filename and MIME type.
    info = store.get_attachment_metadata(file_id)
    if not info:
        return not_found("Attachment not found or expired")

    # The path lookup additionally verifies that the file is still on disk.
    path_on_disk = store.get_attachment_path(file_id)
    if not path_on_disk:
        return not_found("Attachment file not found")

    return FileResponse(
        path=str(path_on_disk),
        media_type=info["mime_type"],
        filename=info["filename"],
    )
async def legacy_oauth2_callback(request: Request) -> HTMLResponse:
state = request.query_params.get("state")
code = request.query_params.get("code")