# Conflicts:
#	uv.lock
This commit is contained in:
Taylor Wilsdon
2026-02-24 10:13:14 -04:00
40 changed files with 3515 additions and 335 deletions

View File

@@ -1,15 +1,16 @@
"""
Temporary attachment storage for Gmail attachments.
Stores attachments in ./tmp directory and provides HTTP URLs for access.
Stores attachments to local disk and returns file paths for direct access.
Files are automatically cleaned up after expiration (default 1 hour).
"""
import base64
import logging
import os
import uuid
from pathlib import Path
from typing import Optional, Dict
from typing import NamedTuple, Optional, Dict
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
@@ -17,9 +18,24 @@ logger = logging.getLogger(__name__)
# Default expiration: 1 hour
DEFAULT_EXPIRATION_SECONDS = 3600
# Storage directory
STORAGE_DIR = Path("./tmp/attachments")
STORAGE_DIR.mkdir(parents=True, exist_ok=True)
# Storage directory - configurable via WORKSPACE_ATTACHMENT_DIR env var
# Uses absolute path to avoid creating tmp/ in arbitrary working directories (see #327)
_default_dir = str(Path.home() / ".workspace-mcp" / "attachments")
STORAGE_DIR = (
Path(os.getenv("WORKSPACE_ATTACHMENT_DIR", _default_dir)).expanduser().resolve()
)
def _ensure_storage_dir() -> None:
"""Create the storage directory on first use, not at import time."""
STORAGE_DIR.mkdir(parents=True, exist_ok=True, mode=0o700)
class SavedAttachment(NamedTuple):
"""Result of saving an attachment: provides both the UUID and the absolute file path."""
file_id: str
path: str
class AttachmentStorage:
@@ -34,9 +50,9 @@ class AttachmentStorage:
base64_data: str,
filename: Optional[str] = None,
mime_type: Optional[str] = None,
) -> str:
) -> SavedAttachment:
"""
Save an attachment and return a unique file ID.
Save an attachment to local disk.
Args:
base64_data: Base64-encoded attachment data
@@ -44,9 +60,11 @@ class AttachmentStorage:
mime_type: MIME type (optional)
Returns:
Unique file ID (UUID string)
SavedAttachment with file_id (UUID) and path (absolute file path)
"""
# Generate unique file ID
_ensure_storage_dir()
# Generate unique file ID for metadata tracking
file_id = str(uuid.uuid4())
# Decode base64 data
@@ -73,15 +91,39 @@ class AttachmentStorage:
}
extension = mime_to_ext.get(mime_type, "")
# Save file
file_path = STORAGE_DIR / f"{file_id}{extension}"
# Use original filename if available, with UUID suffix for uniqueness
if filename:
stem = Path(filename).stem
ext = Path(filename).suffix
save_name = f"{stem}_{file_id[:8]}{ext}"
else:
save_name = f"{file_id}{extension}"
# Save file with restrictive permissions (sensitive email/drive content)
file_path = STORAGE_DIR / save_name
try:
file_path.write_bytes(file_bytes)
fd = os.open(file_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
try:
total_written = 0
data_len = len(file_bytes)
while total_written < data_len:
written = os.write(fd, file_bytes[total_written:])
if written == 0:
raise OSError(
"os.write returned 0 bytes; could not write attachment data"
)
total_written += written
finally:
os.close(fd)
logger.info(
f"Saved attachment {file_id} ({len(file_bytes)} bytes) to {file_path}"
f"Saved attachment file_id={file_id} filename={filename or save_name} "
f"({len(file_bytes)} bytes) to {file_path}"
)
except Exception as e:
logger.error(f"Failed to save attachment to {file_path}: {e}")
logger.error(
f"Failed to save attachment file_id={file_id} "
f"filename={filename or save_name} to {file_path}: {e}"
)
raise
# Store metadata
@@ -95,7 +137,7 @@ class AttachmentStorage:
"expires_at": expires_at,
}
return file_id
return SavedAttachment(file_id=file_id, path=str(file_path))
def get_attachment_path(self, file_id: str) -> Optional[Path]:
"""
@@ -204,7 +246,6 @@ def get_attachment_url(file_id: str) -> str:
Returns:
Full URL to access the attachment
"""
import os
from core.config import WORKSPACE_MCP_PORT, WORKSPACE_MCP_BASE_URI
# Use external URL if set (for reverse proxy scenarios)

View File

@@ -195,7 +195,7 @@ async def _read_comments_impl(service, app_name: str, file_id: str) -> str:
service.comments()
.list(
fileId=file_id,
fields="comments(id,content,author,createdTime,modifiedTime,resolved,replies(content,author,id,createdTime,modifiedTime))",
fields="comments(id,content,author,createdTime,modifiedTime,resolved,quotedFileContent,replies(content,author,id,createdTime,modifiedTime))",
)
.execute
)
@@ -215,9 +215,13 @@ async def _read_comments_impl(service, app_name: str, file_id: str) -> str:
comment_id = comment.get("id", "")
status = " [RESOLVED]" if resolved else ""
quoted_text = comment.get("quotedFileContent", {}).get("value", "")
output.append(f"Comment ID: {comment_id}")
output.append(f"Author: {author}")
output.append(f"Created: {created}{status}")
if quoted_text:
output.append(f"Quoted text: {quoted_text}")
output.append(f"Content: {content}")
# Add replies if any

View File

@@ -27,6 +27,7 @@ drive:
- get_drive_file_content
- get_drive_file_download_url
- create_drive_file
- create_drive_folder
- import_to_google_doc
- share_drive_file
- get_drive_shareable_link
@@ -66,6 +67,7 @@ docs:
- list_docs_in_folder
- insert_doc_elements
- update_paragraph_style
- get_doc_as_markdown
complete:
- insert_doc_image
- update_doc_headers_footers
@@ -99,8 +101,10 @@ chat:
- send_message
- get_messages
- search_messages
- create_reaction
extended:
- list_spaces
- download_chat_attachment
complete: []
forms:

View File

@@ -7,6 +7,7 @@ import ssl
import asyncio
import functools
from pathlib import Path
from typing import List, Optional
from googleapiclient.errors import HttpError
@@ -29,6 +30,135 @@ class UserInputError(Exception):
pass
# Directories from which local file reads are allowed.
# The user's home directory is the default safe base.
# Override via ALLOWED_FILE_DIRS env var (os.pathsep-separated paths).
_ALLOWED_FILE_DIRS_ENV = "ALLOWED_FILE_DIRS"
def _get_allowed_file_dirs() -> list[Path]:
"""Return the list of directories from which local file access is permitted."""
env_val = os.environ.get(_ALLOWED_FILE_DIRS_ENV)
if env_val:
return [
Path(p).expanduser().resolve()
for p in env_val.split(os.pathsep)
if p.strip()
]
home = Path.home()
return [home] if home else []
def validate_file_path(file_path: str) -> Path:
"""
Validate that a file path is safe to read from the server filesystem.
Resolves the path canonically (following symlinks), then verifies it falls
within one of the allowed base directories. Rejects paths to sensitive
system locations regardless of allowlist.
Args:
file_path: The raw file path string to validate.
Returns:
Path: The resolved, validated Path object.
Raises:
ValueError: If the path is outside allowed directories or targets
a sensitive location.
"""
resolved = Path(file_path).resolve()
if not resolved.exists():
raise FileNotFoundError(f"Path does not exist: {resolved}")
# Block sensitive file patterns regardless of allowlist
resolved_str = str(resolved)
file_name = resolved.name.lower()
# Block .env files and variants (.env, .env.local, .env.production, etc.)
if file_name == ".env" or file_name.startswith(".env."):
raise ValueError(
f"Access to '{resolved_str}' is not allowed: "
".env files may contain secrets and cannot be read, uploaded, or attached."
)
# Block well-known sensitive system paths (including macOS /private variants)
sensitive_prefixes = (
"/proc",
"/sys",
"/dev",
"/etc/shadow",
"/etc/passwd",
"/private/etc/shadow",
"/private/etc/passwd",
)
for prefix in sensitive_prefixes:
if resolved_str == prefix or resolved_str.startswith(prefix + "/"):
raise ValueError(
f"Access to '{resolved_str}' is not allowed: "
"path is in a restricted system location."
)
# Block sensitive directories that commonly contain credentials/keys
sensitive_dirs = (
".ssh",
".aws",
".kube",
".gnupg",
".config/gcloud",
)
for sensitive_dir in sensitive_dirs:
home = Path.home()
blocked = home / sensitive_dir
if resolved == blocked or str(resolved).startswith(str(blocked) + "/"):
raise ValueError(
f"Access to '{resolved_str}' is not allowed: "
"path is in a directory that commonly contains secrets or credentials."
)
# Block other credential/secret file patterns
sensitive_names = {
".credentials",
".credentials.json",
"credentials.json",
"client_secret.json",
"client_secrets.json",
"service_account.json",
"service-account.json",
".npmrc",
".pypirc",
".netrc",
".git-credentials",
".docker/config.json",
}
if file_name in sensitive_names:
raise ValueError(
f"Access to '{resolved_str}' is not allowed: "
"this file commonly contains secrets or credentials."
)
allowed_dirs = _get_allowed_file_dirs()
if not allowed_dirs:
raise ValueError(
"No allowed file directories configured. "
"Set the ALLOWED_FILE_DIRS environment variable or ensure a home directory exists."
)
for allowed in allowed_dirs:
try:
resolved.relative_to(allowed)
return resolved
except ValueError:
continue
raise ValueError(
f"Access to '{resolved_str}' is not allowed: "
f"path is outside permitted directories ({', '.join(str(d) for d in allowed_dirs)}). "
"Set ALLOWED_FILE_DIRS to adjust."
)
def check_credentials_directory_permissions(credentials_dir: str = None) -> None:
"""
Check if the service has appropriate permissions to create and write to the .credentials directory.