Merge pull request #531 from taylorwilsdon/tool_consolidation

enh: Tool Consolidation
This commit is contained in:
Taylor Wilsdon
2026-03-01 17:09:02 -05:00
committed by GitHub
15 changed files with 1693 additions and 2173 deletions

View File

@@ -846,9 +846,7 @@ cp .env.oauth21 .env
|------|------|-------------|
| `list_calendars` | **Core** | List accessible calendars |
| `get_events` | **Core** | Retrieve events with time range filtering |
| `create_event` | **Core** | Create events with attachments & reminders |
| `modify_event` | **Core** | Update existing events |
| `delete_event` | Extended | Remove events |
| `manage_event` | **Core** | Create, update, or delete calendar events |
</td>
<td width="50%" valign="top">
@@ -863,15 +861,11 @@ cp .env.oauth21 .env
| `create_drive_file` | **Core** | Create files or fetch from URLs |
| `create_drive_folder` | **Core** | Create empty folders in Drive or shared drives |
| `import_to_google_doc` | **Core** | Import files (MD, DOCX, HTML, etc.) as Google Docs |
| `share_drive_file` | **Core** | Share file with users/groups/domains/anyone |
| `get_drive_shareable_link` | **Core** | Get shareable links for a file |
| `list_drive_items` | Extended | List folder contents |
| `copy_drive_file` | Extended | Copy existing files (templates) with optional renaming |
| `update_drive_file` | Extended | Update file metadata, move between folders |
| `batch_share_drive_file` | Extended | Share file with multiple recipients |
| `update_drive_permission` | Extended | Modify permission role |
| `remove_drive_permission` | Extended | Revoke file access |
| `transfer_drive_ownership` | Extended | Transfer file ownership to another user |
| `manage_drive_access` | Extended | Grant, update, revoke permissions, and transfer ownership |
| `set_drive_file_permissions` | Extended | Set link sharing and file-level sharing settings |
| `get_drive_file_permissions` | Complete | Get detailed file permissions |
| `check_drive_file_public_access` | Complete | Check public sharing status |
@@ -894,7 +888,9 @@ cp .env.oauth21 .env
| `get_gmail_thread_content` | Extended | Get full thread content |
| `modify_gmail_message_labels` | Extended | Modify message labels |
| `list_gmail_labels` | Extended | List available labels |
| `list_gmail_filters` | Extended | List Gmail filters |
| `manage_gmail_label` | Extended | Create/update/delete labels |
| `manage_gmail_filter` | Extended | Create or delete Gmail filters |
| `draft_gmail_message` | Extended | Create drafts |
| `get_gmail_threads_content_batch` | Complete | Batch retrieve thread content |
| `batch_modify_gmail_message_labels` | Complete | Batch modify labels |
@@ -965,7 +961,8 @@ Saved files expire after 1 hour and are cleaned up automatically.
| `export_doc_to_pdf` | Extended | Export document to PDF |
| `create_table_with_data` | Complete | Create data tables |
| `debug_table_structure` | Complete | Debug table issues |
| `*_document_comments` | Complete | Read, Reply, Create, Resolve |
| `list_document_comments` | Complete | List all document comments |
| `manage_document_comment` | Complete | Create, reply to, or resolve comments |
</td>
</tr>
@@ -984,7 +981,9 @@ Saved files expire after 1 hour and are cleaned up automatically.
| `get_spreadsheet_info` | Extended | Get spreadsheet metadata |
| `format_sheet_range` | Extended | Apply colors, number formats, text wrapping, alignment, bold/italic, font size |
| `create_sheet` | Complete | Add sheets to existing files |
| `*_sheet_comment` | Complete | Read/create/reply/resolve comments |
| `list_spreadsheet_comments` | Complete | List all spreadsheet comments |
| `manage_spreadsheet_comment` | Complete | Create, reply to, or resolve comments |
| `manage_conditional_formatting` | Complete | Add, update, or delete conditional formatting rules |
</td>
<td width="50%" valign="top">
@@ -998,7 +997,8 @@ Saved files expire after 1 hour and are cleaned up automatically.
| `batch_update_presentation` | Extended | Apply multiple updates |
| `get_page` | Extended | Get specific slide information |
| `get_page_thumbnail` | Extended | Generate slide thumbnails |
| `*_presentation_comment` | Complete | Read/create/reply/resolve comments |
| `list_presentation_comments` | Complete | List all presentation comments |
| `manage_presentation_comment` | Complete | Create, reply to, or resolve comments |
</td>
</tr>
@@ -1025,12 +1025,10 @@ Saved files expire after 1 hour and are cleaned up automatically.
|------|------|-------------|
| `list_tasks` | **Core** | List tasks with filtering |
| `get_task` | **Core** | Retrieve task details |
| `create_task` | **Core** | Create tasks with hierarchy |
| `update_task` | **Core** | Modify task properties |
| `delete_task` | Extended | Remove tasks |
| `move_task` | Complete | Reposition tasks |
| `clear_completed_tasks` | Complete | Hide completed tasks |
| `*_task_list` | Complete | List/get/create/update/delete task lists |
| `manage_task` | **Core** | Create, update, delete, or move tasks |
| `list_task_lists` | Complete | List task lists |
| `get_task_list` | Complete | Get task list details |
| `manage_task_list` | Complete | Create, update, delete task lists, or clear completed tasks |
</td>
</tr>
@@ -1044,14 +1042,11 @@ Saved files expire after 1 hour and are cleaned up automatically.
| `search_contacts` | **Core** | Search contacts by name, email, phone |
| `get_contact` | **Core** | Retrieve detailed contact info |
| `list_contacts` | **Core** | List contacts with pagination |
| `create_contact` | **Core** | Create new contacts |
| `update_contact` | Extended | Update existing contacts |
| `delete_contact` | Extended | Delete contacts |
| `manage_contact` | **Core** | Create, update, or delete contacts |
| `list_contact_groups` | Extended | List contact groups/labels |
| `get_contact_group` | Extended | Get group details with members |
| `batch_*_contacts` | Complete | Batch create/update/delete contacts |
| `*_contact_group` | Complete | Create/update/delete contact groups |
| `modify_contact_group_members` | Complete | Add/remove contacts from groups |
| `manage_contacts_batch` | Complete | Batch create, update, or delete contacts |
| `manage_contact_group` | Complete | Create, update, delete groups, or modify membership |
</td>
</tr>
@@ -1076,9 +1071,8 @@ Saved files expire after 1 hour and are cleaned up automatically.
| Tool | Tier | Description |
|------|------|-------------|
| `search_custom` | **Core** | Perform web searches |
| `search_custom` | **Core** | Perform web searches (supports site restrictions via sites parameter) |
| `get_search_engine_info` | Complete | Retrieve search engine metadata |
| `search_custom_siterestrict` | Extended | Search within specific domains |
</td>
</tr>
@@ -1095,10 +1089,8 @@ Saved files expire after 1 hour and are cleaned up automatically.
| `create_script_project` | **Core** | Create new standalone or bound project |
| `update_script_content` | **Core** | Update or create script files |
| `run_script_function` | **Core** | Execute function with parameters |
| `create_deployment` | Extended | Create new script deployment |
| `list_deployments` | Extended | List all project deployments |
| `update_deployment` | Extended | Update deployment configuration |
| `delete_deployment` | Extended | Remove deployment |
| `manage_deployment` | Extended | Create, update, or delete script deployments |
| `list_script_processes` | Extended | View recent executions and status |
</td>

View File

@@ -47,7 +47,7 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only
## 🛠 Tools Reference
### Gmail (11 tools)
### Gmail (10 tools)
| Tool | Tier | Description |
|------|------|-------------|
@@ -60,39 +60,42 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only
| `list_gmail_labels` | Extended | List all system and user labels |
| `manage_gmail_label` | Extended | Create, update, delete labels |
| `modify_gmail_message_labels` | Extended | Add/remove labels (archive, trash, etc.) |
| `manage_gmail_filter` | Extended | Create or delete Gmail filters |
| `get_gmail_threads_content_batch` | Complete | Batch retrieve threads |
| `batch_modify_gmail_message_labels` | Complete | Bulk label operations |
**Also includes:** `get_gmail_attachment_content`, `list_gmail_filters`, `create_gmail_filter`, `delete_gmail_filter`
**Also includes:** `get_gmail_attachment_content`, `list_gmail_filters`
### Google Drive (7 tools)
### Google Drive (10 tools)
| Tool | Tier | Description |
|------|------|-------------|
| `search_drive_files` | Core | Search files with Drive query syntax or free text |
| `get_drive_file_content` | Core | Read content from Docs, Sheets, Office files (.docx, .xlsx, .pptx) |
| `get_drive_file_download_url` | Core | Download Drive files to local disk |
| `create_drive_file` | Core | Create files from content or URL (supports file://, http://, https://) |
| `create_drive_folder` | Core | Create empty folders in Drive or shared drives |
| `import_to_google_doc` | Core | Import files (MD, DOCX, HTML, etc.) as Google Docs |
| `get_drive_shareable_link` | Core | Get shareable links for a file |
| `list_drive_items` | Extended | List folder contents with shared drive support |
| `copy_drive_file` | Extended | Copy existing files (templates) with optional renaming |
| `update_drive_file` | Extended | Update metadata, move between folders, star, trash |
| `get_drive_file_permissions` | Complete | Check sharing status and permissions |
| `manage_drive_access` | Extended | Grant, update, revoke permissions, and transfer ownership |
| `set_drive_file_permissions` | Extended | Set link sharing and file-level sharing settings |
| `get_drive_file_permissions` | Complete | Get detailed file permissions |
| `check_drive_file_public_access` | Complete | Verify public link sharing for Docs image insertion |
**Also includes:** `get_drive_file_download_url` for generating download URLs
### Google Calendar (5 tools)
### Google Calendar (3 tools)
| Tool | Tier | Description |
|------|------|-------------|
| `list_calendars` | Core | List all accessible calendars |
| `get_events` | Core | Query events by time range, search, or specific ID |
| `create_event` | Core | Create events with attendees, reminders, Google Meet, attachments |
| `modify_event` | Core | Update any event property including conferencing |
| `delete_event` | Extended | Remove events |
| `manage_event` | Core | Create, update, or delete calendar events |
**Event features:** Timezone support, transparency (busy/free), visibility settings, up to 5 custom reminders
**Event features:** Timezone support, transparency (busy/free), visibility settings, up to 5 custom reminders, Google Meet integration, attendees, attachments
### Google Docs (16 tools)
### Google Docs (14 tools)
| Tool | Tier | Description |
|------|------|-------------|
@@ -103,6 +106,8 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only
| `find_and_replace_doc` | Extended | Global find/replace with case matching |
| `list_docs_in_folder` | Extended | List Docs in a specific folder |
| `insert_doc_elements` | Extended | Add tables, lists, page breaks |
| `update_paragraph_style` | Extended | Apply heading styles, lists (bulleted/numbered with nesting), and paragraph formatting |
| `get_doc_as_markdown` | Extended | Export document as formatted Markdown with optional comments |
| `export_doc_to_pdf` | Extended | Export to PDF and save to Drive |
| `insert_doc_image` | Complete | Insert images from Drive or URLs |
| `update_doc_headers_footers` | Complete | Modify headers/footers |
@@ -110,10 +115,10 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only
| `inspect_doc_structure` | Complete | Analyze document structure for safe insertion points |
| `create_table_with_data` | Complete | Create and populate tables in one operation |
| `debug_table_structure` | Complete | Debug table cell positions and content |
| `list_document_comments` | Complete | List all document comments |
| `manage_document_comment` | Complete | Create, reply to, or resolve comments |
**Comments:** `read_document_comments`, `create_document_comment`, `reply_to_document_comment`, `resolve_document_comment`
### Google Sheets (13 tools)
### Google Sheets (9 tools)
| Tool | Tier | Description |
|------|------|-------------|
@@ -124,13 +129,11 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only
| `get_spreadsheet_info` | Extended | Get metadata, sheets, conditional formats |
| `format_sheet_range` | Extended | Apply colors, number formats, text wrapping, alignment, bold/italic, font size |
| `create_sheet` | Complete | Add sheets to existing spreadsheets |
| `add_conditional_formatting` | Complete | Add boolean or gradient rules |
| `update_conditional_formatting` | Complete | Modify existing rules |
| `delete_conditional_formatting` | Complete | Remove formatting rules |
| `list_spreadsheet_comments` | Complete | List all spreadsheet comments |
| `manage_spreadsheet_comment` | Complete | Create, reply to, or resolve comments |
| `manage_conditional_formatting` | Complete | Add, update, or delete conditional formatting rules |
**Comments:** `read_spreadsheet_comments`, `create_spreadsheet_comment`, `reply_to_spreadsheet_comment`, `resolve_spreadsheet_comment`
### Google Slides (9 tools)
### Google Slides (7 tools)
| Tool | Tier | Description |
|------|------|-------------|
@@ -139,8 +142,8 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only
| `batch_update_presentation` | Extended | Apply multiple updates (create slides, shapes, etc.) |
| `get_page` | Extended | Get specific slide details and elements |
| `get_page_thumbnail` | Extended | Generate PNG thumbnails |
**Comments:** `read_presentation_comments`, `create_presentation_comment`, `reply_to_presentation_comment`, `resolve_presentation_comment`
| `list_presentation_comments` | Complete | List all presentation comments |
| `manage_presentation_comment` | Complete | Create, reply to, or resolve comments |
### Google Forms (6 tools)
@@ -153,24 +156,18 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only
| `get_form_response` | Complete | Get individual response details |
| `batch_update_form` | Complete | Execute batch updates to forms (questions, items, settings) |
### Google Tasks (12 tools)
### Google Tasks (5 tools)
| Tool | Tier | Description |
|------|------|-------------|
| `list_tasks` | Core | List tasks with filtering, subtask hierarchy preserved |
| `get_task` | Core | Get task details |
| `create_task` | Core | Create tasks with notes, due dates, parent/sibling positioning |
| `update_task` | Core | Update task properties |
| `delete_task` | Extended | Remove tasks |
| `manage_task` | Core | Create, update, delete, or move tasks |
| `list_task_lists` | Complete | List all task lists |
| `get_task_list` | Complete | Get task list details |
| `create_task_list` | Complete | Create new task lists |
| `update_task_list` | Complete | Rename task lists |
| `delete_task_list` | Complete | Delete task lists (and all tasks) |
| `move_task` | Complete | Reposition or move between lists |
| `clear_completed_tasks` | Complete | Hide completed tasks |
| `manage_task_list` | Complete | Create, update, delete task lists, or clear completed tasks |
### Google Apps Script (11 tools)
### Google Apps Script (9 tools)
| Tool | Tier | Description |
|------|------|-------------|
@@ -180,16 +177,27 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only
| `create_script_project` | Core | Create new standalone or bound project |
| `update_script_content` | Core | Update or create script files |
| `run_script_function` | Core | Execute function with parameters |
| `create_deployment` | Extended | Create new script deployment |
| `list_deployments` | Extended | List all project deployments |
| `update_deployment` | Extended | Update deployment configuration |
| `delete_deployment` | Extended | Remove deployment |
| `manage_deployment` | Extended | Create, update, or delete script deployments |
| `list_script_processes` | Extended | View recent executions and status |
**Enables:** Cross-app automation, persistent workflows, custom business logic execution, script development and debugging
**Note:** Trigger management is not currently supported via MCP tools.
### Google Contacts (7 tools)
| Tool | Tier | Description |
|------|------|-------------|
| `search_contacts` | Core | Search contacts by name, email, phone |
| `get_contact` | Core | Retrieve detailed contact info |
| `list_contacts` | Core | List contacts with pagination |
| `manage_contact` | Core | Create, update, or delete contacts |
| `list_contact_groups` | Extended | List contact groups/labels |
| `get_contact_group` | Extended | Get group details with members |
| `manage_contacts_batch` | Complete | Batch create, update, or delete contacts |
| `manage_contact_group` | Complete | Create, update, delete groups, or modify membership |
### Google Chat (4 tools)
| Tool | Tier | Description |
@@ -199,12 +207,11 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only
| `search_messages` | Core | Search across chat history |
| `list_spaces` | Extended | List rooms and DMs |
### Google Custom Search (3 tools)
### Google Custom Search (2 tools)
| Tool | Tier | Description |
|------|------|-------------|
| `search_custom` | Core | Web search with filters (date, file type, language, safe search) |
| `search_custom_siterestrict` | Extended | Search within specific domains |
| `search_custom` | Core | Web search with filters (date, file type, language, safe search, site restrictions via sites parameter) |
| `get_search_engine_info` | Complete | Get search engine metadata |
**Requires:** `GOOGLE_PSE_API_KEY` and `GOOGLE_PSE_ENGINE_ID` environment variables
@@ -219,7 +226,7 @@ Choose a tier based on your needs:
|------|-------|----------|
| **Core** | ~30 | Essential operations: search, read, create, send |
| **Extended** | ~50 | Core + management: labels, folders, batch ops |
| **Complete** | ~80 | Full API: comments, headers, admin functions |
| **Complete** | 111 | Full API: comments, headers, admin functions |
```bash
uvx workspace-mcp --tool-tier core # Start minimal

View File

@@ -7,7 +7,7 @@ All Google Workspace apps (Docs, Sheets, Slides) use the Drive API for comment o
import logging
import asyncio
from typing import Optional
from auth.service_decorator import require_google_service
from core.server import server
@@ -16,6 +16,38 @@ from core.utils import handle_http_errors
logger = logging.getLogger(__name__)
async def _manage_comment_dispatch(
service,
app_name: str,
file_id: str,
action: str,
comment_content: Optional[str] = None,
comment_id: Optional[str] = None,
) -> str:
"""Route comment management actions to the appropriate implementation."""
action_lower = action.lower().strip()
if action_lower == "create":
if not comment_content:
raise ValueError("comment_content is required for create action")
return await _create_comment_impl(service, app_name, file_id, comment_content)
elif action_lower == "reply":
if not comment_id or not comment_content:
raise ValueError(
"comment_id and comment_content are required for reply action"
)
return await _reply_to_comment_impl(
service, app_name, file_id, comment_id, comment_content
)
elif action_lower == "resolve":
if not comment_id:
raise ValueError("comment_id is required for resolve action")
return await _resolve_comment_impl(service, app_name, file_id, comment_id)
else:
raise ValueError(
f"Invalid action '{action_lower}'. Must be 'create', 'reply', or 'resolve'."
)
def create_comment_tools(app_name: str, file_id_param: str):
"""
Factory function to create comment management tools for a specific Google Workspace app.
@@ -25,165 +57,114 @@ def create_comment_tools(app_name: str, file_id_param: str):
file_id_param: Parameter name for the file ID (e.g., "document_id", "spreadsheet_id", "presentation_id")
Returns:
Dict containing the four comment management functions with unique names
Dict containing the comment management functions with unique names
"""
# Create unique function names based on the app type
read_func_name = f"read_{app_name}_comments"
create_func_name = f"create_{app_name}_comment"
reply_func_name = f"reply_to_{app_name}_comment"
resolve_func_name = f"resolve_{app_name}_comment"
# --- Consolidated tools ---
list_func_name = f"list_{app_name}_comments"
manage_func_name = f"manage_{app_name}_comment"
# Create functions without decorators first, then apply decorators with proper names
if file_id_param == "document_id":
@require_google_service("drive", "drive_read")
@handle_http_errors(read_func_name, service_type="drive")
async def read_comments(
@handle_http_errors(list_func_name, service_type="drive")
async def list_comments(
service, user_google_email: str, document_id: str
) -> str:
"""Read all comments from a Google Document."""
"""List all comments from a Google Document."""
return await _read_comments_impl(service, app_name, document_id)
@require_google_service("drive", "drive_file")
@handle_http_errors(create_func_name, service_type="drive")
async def create_comment(
service, user_google_email: str, document_id: str, comment_content: str
) -> str:
"""Create a new comment on a Google Document."""
return await _create_comment_impl(
service, app_name, document_id, comment_content
)
@require_google_service("drive", "drive_file")
@handle_http_errors(reply_func_name, service_type="drive")
async def reply_to_comment(
@handle_http_errors(manage_func_name, service_type="drive")
async def manage_comment(
service,
user_google_email: str,
document_id: str,
comment_id: str,
reply_content: str,
action: str,
comment_content: Optional[str] = None,
comment_id: Optional[str] = None,
) -> str:
"""Reply to a specific comment in a Google Document."""
return await _reply_to_comment_impl(
service, app_name, document_id, comment_id, reply_content
)
"""Manage comments on a Google Document.
@require_google_service("drive", "drive_file")
@handle_http_errors(resolve_func_name, service_type="drive")
async def resolve_comment(
service, user_google_email: str, document_id: str, comment_id: str
) -> str:
"""Resolve a comment in a Google Document."""
return await _resolve_comment_impl(
service, app_name, document_id, comment_id
Actions:
- create: Create a new comment. Requires comment_content.
- reply: Reply to a comment. Requires comment_id and comment_content.
- resolve: Resolve a comment. Requires comment_id.
"""
return await _manage_comment_dispatch(
service, app_name, document_id, action, comment_content, comment_id
)
elif file_id_param == "spreadsheet_id":
@require_google_service("drive", "drive_read")
@handle_http_errors(read_func_name, service_type="drive")
async def read_comments(
@handle_http_errors(list_func_name, service_type="drive")
async def list_comments(
service, user_google_email: str, spreadsheet_id: str
) -> str:
"""Read all comments from a Google Spreadsheet."""
"""List all comments from a Google Spreadsheet."""
return await _read_comments_impl(service, app_name, spreadsheet_id)
@require_google_service("drive", "drive_file")
@handle_http_errors(create_func_name, service_type="drive")
async def create_comment(
service, user_google_email: str, spreadsheet_id: str, comment_content: str
) -> str:
"""Create a new comment on a Google Spreadsheet."""
return await _create_comment_impl(
service, app_name, spreadsheet_id, comment_content
)
@require_google_service("drive", "drive_file")
@handle_http_errors(reply_func_name, service_type="drive")
async def reply_to_comment(
@handle_http_errors(manage_func_name, service_type="drive")
async def manage_comment(
service,
user_google_email: str,
spreadsheet_id: str,
comment_id: str,
reply_content: str,
action: str,
comment_content: Optional[str] = None,
comment_id: Optional[str] = None,
) -> str:
"""Reply to a specific comment in a Google Spreadsheet."""
return await _reply_to_comment_impl(
service, app_name, spreadsheet_id, comment_id, reply_content
)
"""Manage comments on a Google Spreadsheet.
@require_google_service("drive", "drive_file")
@handle_http_errors(resolve_func_name, service_type="drive")
async def resolve_comment(
service, user_google_email: str, spreadsheet_id: str, comment_id: str
) -> str:
"""Resolve a comment in a Google Spreadsheet."""
return await _resolve_comment_impl(
service, app_name, spreadsheet_id, comment_id
Actions:
- create: Create a new comment. Requires comment_content.
- reply: Reply to a comment. Requires comment_id and comment_content.
- resolve: Resolve a comment. Requires comment_id.
"""
return await _manage_comment_dispatch(
service, app_name, spreadsheet_id, action, comment_content, comment_id
)
elif file_id_param == "presentation_id":
@require_google_service("drive", "drive_read")
@handle_http_errors(read_func_name, service_type="drive")
async def read_comments(
@handle_http_errors(list_func_name, service_type="drive")
async def list_comments(
service, user_google_email: str, presentation_id: str
) -> str:
"""Read all comments from a Google Presentation."""
"""List all comments from a Google Presentation."""
return await _read_comments_impl(service, app_name, presentation_id)
@require_google_service("drive", "drive_file")
@handle_http_errors(create_func_name, service_type="drive")
async def create_comment(
service, user_google_email: str, presentation_id: str, comment_content: str
) -> str:
"""Create a new comment on a Google Presentation."""
return await _create_comment_impl(
service, app_name, presentation_id, comment_content
)
@require_google_service("drive", "drive_file")
@handle_http_errors(reply_func_name, service_type="drive")
async def reply_to_comment(
@handle_http_errors(manage_func_name, service_type="drive")
async def manage_comment(
service,
user_google_email: str,
presentation_id: str,
comment_id: str,
reply_content: str,
action: str,
comment_content: Optional[str] = None,
comment_id: Optional[str] = None,
) -> str:
"""Reply to a specific comment in a Google Presentation."""
return await _reply_to_comment_impl(
service, app_name, presentation_id, comment_id, reply_content
"""Manage comments on a Google Presentation.
Actions:
- create: Create a new comment. Requires comment_content.
- reply: Reply to a comment. Requires comment_id and comment_content.
- resolve: Resolve a comment. Requires comment_id.
"""
return await _manage_comment_dispatch(
service, app_name, presentation_id, action, comment_content, comment_id
)
@require_google_service("drive", "drive_file")
@handle_http_errors(resolve_func_name, service_type="drive")
async def resolve_comment(
service, user_google_email: str, presentation_id: str, comment_id: str
) -> str:
"""Resolve a comment in a Google Presentation."""
return await _resolve_comment_impl(
service, app_name, presentation_id, comment_id
)
# Set the proper function names and register with server
read_comments.__name__ = read_func_name
create_comment.__name__ = create_func_name
reply_to_comment.__name__ = reply_func_name
resolve_comment.__name__ = resolve_func_name
# Register tools with the server using the proper names
server.tool()(read_comments)
server.tool()(create_comment)
server.tool()(reply_to_comment)
server.tool()(resolve_comment)
list_comments.__name__ = list_func_name
manage_comment.__name__ = manage_func_name
server.tool()(list_comments)
server.tool()(manage_comment)
return {
"read_comments": read_comments,
"create_comment": create_comment,
"reply_to_comment": reply_to_comment,
"resolve_comment": resolve_comment,
"list_comments": list_comments,
"manage_comment": manage_comment,
}

View File

@@ -13,8 +13,7 @@ gmail:
- manage_gmail_label
- draft_gmail_message
- list_gmail_filters
- create_gmail_filter
- delete_gmail_filter
- manage_gmail_filter
complete:
- get_gmail_threads_content_batch
@@ -29,16 +28,12 @@ drive:
- create_drive_file
- create_drive_folder
- import_to_google_doc
- share_drive_file
- get_drive_shareable_link
extended:
- list_drive_items
- copy_drive_file
- update_drive_file
- update_drive_permission
- remove_drive_permission
- transfer_drive_ownership
- batch_share_drive_file
- manage_drive_access
- set_drive_file_permissions
complete:
- get_drive_file_permissions
@@ -48,10 +43,8 @@ calendar:
core:
- list_calendars
- get_events
- create_event
- modify_event
- manage_event
extended:
- delete_event
- query_freebusy
complete: []
@@ -75,10 +68,8 @@ docs:
- inspect_doc_structure
- create_table_with_data
- debug_table_structure
- read_document_comments
- create_document_comment
- reply_to_document_comment
- resolve_document_comment
- list_document_comments
- manage_document_comment
sheets:
core:
@@ -91,10 +82,9 @@ sheets:
- format_sheet_range
complete:
- create_sheet
- read_spreadsheet_comments
- create_spreadsheet_comment
- reply_to_spreadsheet_comment
- resolve_spreadsheet_comment
- list_spreadsheet_comments
- manage_spreadsheet_comment
- manage_conditional_formatting
chat:
core:
@@ -127,53 +117,37 @@ slides:
- get_page
- get_page_thumbnail
complete:
- read_presentation_comments
- create_presentation_comment
- reply_to_presentation_comment
- resolve_presentation_comment
- list_presentation_comments
- manage_presentation_comment
tasks:
core:
- get_task
- list_tasks
- create_task
- update_task
extended:
- delete_task
- manage_task
extended: []
complete:
- list_task_lists
- get_task_list
- create_task_list
- update_task_list
- delete_task_list
- move_task
- clear_completed_tasks
- manage_task_list
contacts:
core:
- search_contacts
- get_contact
- list_contacts
- create_contact
- manage_contact
extended:
- update_contact
- delete_contact
- list_contact_groups
- get_contact_group
complete:
- batch_create_contacts
- batch_update_contacts
- batch_delete_contacts
- create_contact_group
- update_contact_group
- delete_contact_group
- modify_contact_group_members
- manage_contacts_batch
- manage_contact_group
search:
core:
- search_custom
extended:
- search_custom_siterestrict
extended: []
complete:
- get_search_engine_info
@@ -187,10 +161,8 @@ appscript:
- run_script_function
- generate_trigger_code
extended:
- create_deployment
- manage_deployment
- list_deployments
- update_deployment
- delete_deployment
- delete_script_project
- list_versions
- create_version

View File

@@ -464,31 +464,57 @@ async def _create_deployment_impl(
@server.tool()
@handle_http_errors("create_deployment", service_type="script")
@handle_http_errors("manage_deployment", service_type="script")
@require_google_service("script", "script_deployments")
async def create_deployment(
async def manage_deployment(
service: Any,
user_google_email: str,
action: str,
script_id: str,
description: str,
deployment_id: Optional[str] = None,
description: Optional[str] = None,
version_description: Optional[str] = None,
) -> str:
"""
Creates a new deployment of the script.
Manages Apps Script deployments. Supports creating, updating, and deleting deployments.
Args:
service: Injected Google API service client
user_google_email: User's email address
action: Action to perform - "create", "update", or "delete"
script_id: The script project ID
description: Deployment description
version_description: Optional version description
deployment_id: The deployment ID (required for update and delete)
description: Deployment description (required for create and update)
version_description: Optional version description (for create only)
Returns:
str: Formatted string with deployment details
str: Formatted string with deployment details or confirmation
"""
action = action.lower().strip()
if action == "create":
if description is None or description.strip() == "":
raise ValueError("description is required for create action")
return await _create_deployment_impl(
service, user_google_email, script_id, description, version_description
)
elif action == "update":
if not deployment_id:
raise ValueError("deployment_id is required for update action")
if description is None or description.strip() == "":
raise ValueError("description is required for update action")
return await _update_deployment_impl(
service, user_google_email, script_id, deployment_id, description
)
elif action == "delete":
if not deployment_id:
raise ValueError("deployment_id is required for delete action")
return await _delete_deployment_impl(
service, user_google_email, script_id, deployment_id
)
else:
raise ValueError(
f"Invalid action '{action}'. Must be 'create', 'update', or 'delete'."
)
async def _list_deployments_impl(
@@ -578,34 +604,6 @@ async def _update_deployment_impl(
return "\n".join(output)
@server.tool()
@handle_http_errors("update_deployment", service_type="script")
@require_google_service("script", "script_deployments")
async def update_deployment(
service: Any,
user_google_email: str,
script_id: str,
deployment_id: str,
description: Optional[str] = None,
) -> str:
"""
Updates an existing deployment configuration.
Args:
service: Injected Google API service client
user_google_email: User's email address
script_id: The script project ID
deployment_id: The deployment ID to update
description: Optional new description
Returns:
str: Formatted string confirming update
"""
return await _update_deployment_impl(
service, user_google_email, script_id, deployment_id, description
)
async def _delete_deployment_impl(
service: Any,
user_google_email: str,
@@ -630,32 +628,6 @@ async def _delete_deployment_impl(
return output
@server.tool()
@handle_http_errors("delete_deployment", service_type="script")
@require_google_service("script", "script_deployments")
async def delete_deployment(
service: Any,
user_google_email: str,
script_id: str,
deployment_id: str,
) -> str:
"""
Deletes a deployment.
Args:
service: Injected Google API service client
user_google_email: User's email address
script_id: The script project ID
deployment_id: The deployment ID to delete
Returns:
str: Confirmation message
"""
return await _delete_deployment_impl(
service, user_google_email, script_id, deployment_id
)
async def _list_script_processes_impl(
service: Any,
user_google_email: str,

View File

@@ -534,10 +534,14 @@ async def get_events(
return text_output
@server.tool()
@handle_http_errors("create_event", service_type="calendar")
@require_google_service("calendar", "calendar_events")
async def create_event(
# ---------------------------------------------------------------------------
# Internal implementation functions for event create/modify/delete.
# These are called by both the consolidated ``manage_event`` tool and the
# legacy single-action tools.
# ---------------------------------------------------------------------------
async def _create_event_impl(
service,
user_google_email: str,
summary: str,
@@ -558,32 +562,7 @@ async def create_event(
guests_can_invite_others: Optional[bool] = None,
guests_can_see_other_guests: Optional[bool] = None,
) -> str:
"""
Creates a new event.
Args:
user_google_email (str): The user's Google email address. Required.
summary (str): Event title.
start_time (str): Start time (RFC3339, e.g., "2023-10-27T10:00:00-07:00" or "2023-10-27" for all-day).
end_time (str): End time (RFC3339, e.g., "2023-10-27T11:00:00-07:00" or "2023-10-28" for all-day).
calendar_id (str): Calendar ID (default: 'primary').
description (Optional[str]): Event description.
location (Optional[str]): Event location.
attendees (Optional[List[str]]): Attendee email addresses.
timezone (Optional[str]): Timezone (e.g., "America/New_York").
attachments (Optional[List[str]]): List of Google Drive file URLs or IDs to attach to the event.
add_google_meet (bool): Whether to add a Google Meet video conference to the event. Defaults to False.
reminders (Optional[Union[str, List[Dict[str, Any]]]]): JSON string or list of reminder objects. Each should have 'method' ("popup" or "email") and 'minutes' (0-40320). Max 5 reminders. Example: '[{"method": "popup", "minutes": 15}]' or [{"method": "popup", "minutes": 15}]
use_default_reminders (bool): Whether to use calendar's default reminders. If False, uses custom reminders. Defaults to True.
transparency (Optional[str]): Event transparency for busy/free status. "opaque" shows as Busy (default), "transparent" shows as Available/Free. Defaults to None (uses Google Calendar default).
visibility (Optional[str]): Event visibility. "default" uses calendar default, "public" is visible to all, "private" is visible only to attendees, "confidential" is same as private (legacy). Defaults to None (uses Google Calendar default).
guests_can_modify (Optional[bool]): Whether attendees other than the organizer can modify the event. Defaults to None (uses Google Calendar default of False).
guests_can_invite_others (Optional[bool]): Whether attendees other than the organizer can invite others to the event. Defaults to None (uses Google Calendar default of True).
guests_can_see_other_guests (Optional[bool]): Whether attendees other than the organizer can see who the event's attendees are. Defaults to None (uses Google Calendar default of True).
Returns:
str: Confirmation message of the successful event creation with event link.
"""
"""Internal implementation for creating a calendar event."""
logger.info(
f"[create_event] Invoked. Email: '{user_google_email}', Summary: {summary}"
)
@@ -809,10 +788,7 @@ def _normalize_attendees(
return normalized if normalized else None
@server.tool()
@handle_http_errors("modify_event", service_type="calendar")
@require_google_service("calendar", "calendar_events")
async def modify_event(
async def _modify_event_impl(
service,
user_google_email: str,
event_id: str,
@@ -834,33 +810,7 @@ async def modify_event(
guests_can_invite_others: Optional[bool] = None,
guests_can_see_other_guests: Optional[bool] = None,
) -> str:
"""
Modifies an existing event.
Args:
user_google_email (str): The user's Google email address. Required.
event_id (str): The ID of the event to modify.
calendar_id (str): Calendar ID (default: 'primary').
summary (Optional[str]): New event title.
start_time (Optional[str]): New start time (RFC3339, e.g., "2023-10-27T10:00:00-07:00" or "2023-10-27" for all-day).
end_time (Optional[str]): New end time (RFC3339, e.g., "2023-10-27T11:00:00-07:00" or "2023-10-28" for all-day).
description (Optional[str]): New event description.
location (Optional[str]): New event location.
attendees (Optional[Union[List[str], List[Dict[str, Any]]]]): Attendees as email strings or objects with metadata. Supports: ["email@example.com"] or [{"email": "email@example.com", "responseStatus": "accepted", "organizer": true, "optional": true}]. When using objects, existing metadata (responseStatus, organizer, optional) is preserved. New attendees default to responseStatus="needsAction".
timezone (Optional[str]): New timezone (e.g., "America/New_York").
add_google_meet (Optional[bool]): Whether to add or remove Google Meet video conference. If True, adds Google Meet; if False, removes it; if None, leaves unchanged.
reminders (Optional[Union[str, List[Dict[str, Any]]]]): JSON string or list of reminder objects to replace existing reminders. Each should have 'method' ("popup" or "email") and 'minutes' (0-40320). Max 5 reminders. Example: '[{"method": "popup", "minutes": 15}]' or [{"method": "popup", "minutes": 15}]
use_default_reminders (Optional[bool]): Whether to use calendar's default reminders. If specified, overrides current reminder settings.
transparency (Optional[str]): Event transparency for busy/free status. "opaque" shows as Busy, "transparent" shows as Available/Free. If None, preserves existing transparency setting.
visibility (Optional[str]): Event visibility. "default" uses calendar default, "public" is visible to all, "private" is visible only to attendees, "confidential" is same as private (legacy). If None, preserves existing visibility setting.
color_id (Optional[str]): Event color ID (1-11). If None, preserves existing color.
guests_can_modify (Optional[bool]): Whether attendees other than the organizer can modify the event. If None, preserves existing setting.
guests_can_invite_others (Optional[bool]): Whether attendees other than the organizer can invite others to the event. If None, preserves existing setting.
guests_can_see_other_guests (Optional[bool]): Whether attendees other than the organizer can see who the event's attendees are. If None, preserves existing setting.
Returns:
str: Confirmation message of the successful event modification with event link.
"""
"""Internal implementation for modifying a calendar event."""
logger.info(
f"[modify_event] Invoked. Email: '{user_google_email}', Event ID: {event_id}"
)
@@ -1075,23 +1025,13 @@ async def modify_event(
return confirmation_message
@server.tool()
@handle_http_errors("delete_event", service_type="calendar")
@require_google_service("calendar", "calendar_events")
async def delete_event(
service, user_google_email: str, event_id: str, calendar_id: str = "primary"
async def _delete_event_impl(
service,
user_google_email: str,
event_id: str,
calendar_id: str = "primary",
) -> str:
"""
Deletes an existing event.
Args:
user_google_email (str): The user's Google email address. Required.
event_id (str): The ID of the event to delete.
calendar_id (str): Calendar ID (default: 'primary').
Returns:
str: Confirmation message of the successful event deletion.
"""
"""Internal implementation for deleting a calendar event."""
logger.info(
f"[delete_event] Invoked. Email: '{user_google_email}', Event ID: {event_id}"
)
@@ -1133,6 +1073,141 @@ async def delete_event(
return confirmation_message
# ---------------------------------------------------------------------------
# Consolidated event management tool
# ---------------------------------------------------------------------------
@server.tool()
@handle_http_errors("manage_event", service_type="calendar")
@require_google_service("calendar", "calendar_events")
async def manage_event(
service,
user_google_email: str,
action: str,
summary: Optional[str] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
event_id: Optional[str] = None,
calendar_id: str = "primary",
description: Optional[str] = None,
location: Optional[str] = None,
attendees: Optional[Union[List[str], List[Dict[str, Any]]]] = None,
timezone: Optional[str] = None,
attachments: Optional[List[str]] = None,
add_google_meet: Optional[bool] = None,
reminders: Optional[Union[str, List[Dict[str, Any]]]] = None,
use_default_reminders: Optional[bool] = None,
transparency: Optional[str] = None,
visibility: Optional[str] = None,
color_id: Optional[str] = None,
guests_can_modify: Optional[bool] = None,
guests_can_invite_others: Optional[bool] = None,
guests_can_see_other_guests: Optional[bool] = None,
) -> str:
"""
Manages calendar events. Supports creating, updating, and deleting events.
Args:
user_google_email (str): The user's Google email address. Required.
action (str): Action to perform - "create", "update", or "delete".
summary (Optional[str]): Event title (required for create).
start_time (Optional[str]): Start time in RFC3339 format (required for create).
end_time (Optional[str]): End time in RFC3339 format (required for create).
event_id (Optional[str]): Event ID (required for update and delete).
calendar_id (str): Calendar ID (default: 'primary').
description (Optional[str]): Event description.
location (Optional[str]): Event location.
attendees (Optional[Union[List[str], List[Dict[str, Any]]]]): Attendee email addresses or objects.
timezone (Optional[str]): Timezone (e.g., "America/New_York").
attachments (Optional[List[str]]): List of Google Drive file URLs or IDs to attach.
add_google_meet (Optional[bool]): Whether to add/remove Google Meet.
reminders (Optional[Union[str, List[Dict[str, Any]]]]): Custom reminder objects.
use_default_reminders (Optional[bool]): Whether to use default reminders.
transparency (Optional[str]): "opaque" (busy) or "transparent" (free).
visibility (Optional[str]): "default", "public", "private", or "confidential".
color_id (Optional[str]): Event color ID (1-11, update only).
guests_can_modify (Optional[bool]): Whether attendees can modify.
guests_can_invite_others (Optional[bool]): Whether attendees can invite others.
guests_can_see_other_guests (Optional[bool]): Whether attendees can see other guests.
Returns:
str: Confirmation message with event details.
"""
action_lower = action.lower().strip()
if action_lower == "create":
if not summary or not start_time or not end_time:
raise ValueError(
"summary, start_time, and end_time are required for create action"
)
return await _create_event_impl(
service=service,
user_google_email=user_google_email,
summary=summary,
start_time=start_time,
end_time=end_time,
calendar_id=calendar_id,
description=description,
location=location,
attendees=attendees,
timezone=timezone,
attachments=attachments,
add_google_meet=add_google_meet or False,
reminders=reminders,
use_default_reminders=use_default_reminders
if use_default_reminders is not None
else True,
transparency=transparency,
visibility=visibility,
guests_can_modify=guests_can_modify,
guests_can_invite_others=guests_can_invite_others,
guests_can_see_other_guests=guests_can_see_other_guests,
)
elif action_lower == "update":
if not event_id:
raise ValueError("event_id is required for update action")
return await _modify_event_impl(
service=service,
user_google_email=user_google_email,
event_id=event_id,
calendar_id=calendar_id,
summary=summary,
start_time=start_time,
end_time=end_time,
description=description,
location=location,
attendees=attendees,
timezone=timezone,
add_google_meet=add_google_meet,
reminders=reminders,
use_default_reminders=use_default_reminders,
transparency=transparency,
visibility=visibility,
color_id=color_id,
guests_can_modify=guests_can_modify,
guests_can_invite_others=guests_can_invite_others,
guests_can_see_other_guests=guests_can_see_other_guests,
)
elif action_lower == "delete":
if not event_id:
raise ValueError("event_id is required for delete action")
return await _delete_event_impl(
service=service,
user_google_email=user_google_email,
event_id=event_id,
calendar_id=calendar_id,
)
else:
raise ValueError(
f"Invalid action '{action_lower}'. Must be 'create', 'update', or 'delete'."
)
# ---------------------------------------------------------------------------
# Legacy single-action tools (deprecated -- prefer ``manage_event``)
# ---------------------------------------------------------------------------
@server.tool()
@handle_http_errors("query_freebusy", is_read_only=True, service_type="calendar")
@require_google_service("calendar", "calendar_read")

View File

@@ -13,7 +13,7 @@ from mcp import Resource
from auth.service_decorator import require_google_service
from core.server import server
from core.utils import handle_http_errors
from core.utils import UserInputError, handle_http_errors
logger = logging.getLogger(__name__)
@@ -249,11 +249,14 @@ async def list_contacts(
"""
logger.info(f"[list_contacts] Invoked. Email: '{user_google_email}'")
try:
if page_size < 1:
raise UserInputError("page_size must be >= 1")
page_size = min(page_size, 1000)
params: Dict[str, Any] = {
"resourceName": "people/me",
"personFields": DEFAULT_PERSON_FIELDS,
"pageSize": min(page_size, 1000),
"pageSize": page_size,
}
if page_token:
@@ -272,7 +275,9 @@ async def list_contacts(
if not connections:
return f"No contacts found for {user_google_email}."
response = f"Contacts for {user_google_email} ({len(connections)} of {total_people}):\n\n"
response = (
f"Contacts for {user_google_email} ({len(connections)} of {total_people}):\n\n"
)
for person in connections:
response += _format_contact(person) + "\n\n"
@@ -283,15 +288,6 @@ async def list_contacts(
logger.info(f"Found {len(connections)} contacts for {user_google_email}")
return response
except HttpError as error:
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool()
@require_google_service("people", "contacts_read")
@@ -321,7 +317,6 @@ async def get_contact(
f"[get_contact] Invoked. Email: '{user_google_email}', Contact: {resource_name}"
)
try:
person = await asyncio.to_thread(
service.people()
.get(resourceName=resource_name, personFields=DETAILED_PERSON_FIELDS)
@@ -334,19 +329,6 @@ async def get_contact(
logger.info(f"Retrieved contact {resource_name} for {user_google_email}")
return response
except HttpError as error:
if error.resp.status == 404:
message = f"Contact not found: {contact_id}"
logger.warning(message)
raise Exception(message)
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool()
@require_google_service("people", "contacts_read")
@@ -372,7 +354,10 @@ async def search_contacts(
f"[search_contacts] Invoked. Email: '{user_google_email}', Query: '{query}'"
)
try:
if page_size < 1:
raise UserInputError("page_size must be >= 1")
page_size = min(page_size, 30)
# Warm up the search cache if needed
await _warmup_search_cache(service, user_google_email)
@@ -381,7 +366,7 @@ async def search_contacts(
.searchContacts(
query=query,
readMask=DEFAULT_PERSON_FIELDS,
pageSize=min(page_size, 30),
pageSize=page_size,
)
.execute
)
@@ -402,22 +387,15 @@ async def search_contacts(
)
return response
except HttpError as error:
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool()
@require_google_service("people", "contacts")
@handle_http_errors("create_contact", service_type="people")
async def create_contact(
@handle_http_errors("manage_contact", service_type="people")
async def manage_contact(
service: Resource,
user_google_email: str,
action: str,
contact_id: Optional[str] = None,
given_name: Optional[str] = None,
family_name: Optional[str] = None,
email: Optional[str] = None,
@@ -427,26 +405,35 @@ async def create_contact(
notes: Optional[str] = None,
) -> str:
"""
Create a new contact.
Create, update, or delete a contact. Consolidated tool replacing create_contact,
update_contact, and delete_contact.
Args:
user_google_email (str): The user's Google email address. Required.
given_name (Optional[str]): First name.
family_name (Optional[str]): Last name.
email (Optional[str]): Email address.
phone (Optional[str]): Phone number.
organization (Optional[str]): Company/organization name.
job_title (Optional[str]): Job title.
notes (Optional[str]): Additional notes.
action (str): The action to perform: "create", "update", or "delete".
contact_id (Optional[str]): The contact ID. Required for "update" and "delete" actions.
given_name (Optional[str]): First name (for create/update).
family_name (Optional[str]): Last name (for create/update).
email (Optional[str]): Email address (for create/update).
phone (Optional[str]): Phone number (for create/update).
organization (Optional[str]): Company/organization name (for create/update).
job_title (Optional[str]): Job title (for create/update).
notes (Optional[str]): Additional notes (for create/update).
Returns:
str: Confirmation with the new contact's details.
str: Result of the action performed.
"""
logger.info(
f"[create_contact] Invoked. Email: '{user_google_email}', Name: '{given_name} {family_name}'"
action = action.lower().strip()
if action not in ("create", "update", "delete"):
raise UserInputError(
f"Invalid action '{action}'. Must be 'create', 'update', or 'delete'."
)
try:
logger.info(
f"[manage_contact] Invoked. Action: '{action}', Email: '{user_google_email}'"
)
if action == "create":
body = _build_person_body(
given_name=given_name,
family_name=family_name,
@@ -458,7 +445,7 @@ async def create_contact(
)
if not body:
raise Exception(
raise UserInputError(
"At least one field (name, email, phone, etc.) must be provided."
)
@@ -471,69 +458,22 @@ async def create_contact(
response = f"Contact Created for {user_google_email}:\n\n"
response += _format_contact(result, detailed=True)
contact_id = result.get("resourceName", "").replace("people/", "")
logger.info(f"Created contact {contact_id} for {user_google_email}")
created_id = result.get("resourceName", "").replace("people/", "")
logger.info(f"Created contact {created_id} for {user_google_email}")
return response
except HttpError as error:
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
# update and delete both require contact_id
if not contact_id:
raise UserInputError(f"contact_id is required for '{action}' action.")
# =============================================================================
# Extended Tier Tools
# =============================================================================
@server.tool()
@require_google_service("people", "contacts")
@handle_http_errors("update_contact", service_type="people")
async def update_contact(
service: Resource,
user_google_email: str,
contact_id: str,
given_name: Optional[str] = None,
family_name: Optional[str] = None,
email: Optional[str] = None,
phone: Optional[str] = None,
organization: Optional[str] = None,
job_title: Optional[str] = None,
notes: Optional[str] = None,
) -> str:
"""
Update an existing contact. Note: This replaces fields, not merges them.
Args:
user_google_email (str): The user's Google email address. Required.
contact_id (str): The contact ID to update.
given_name (Optional[str]): New first name.
family_name (Optional[str]): New last name.
email (Optional[str]): New email address.
phone (Optional[str]): New phone number.
organization (Optional[str]): New company/organization name.
job_title (Optional[str]): New job title.
notes (Optional[str]): New notes.
Returns:
str: Confirmation with updated contact details.
"""
# Normalize resource name
if not contact_id.startswith("people/"):
resource_name = f"people/{contact_id}"
else:
resource_name = contact_id
logger.info(
f"[update_contact] Invoked. Email: '{user_google_email}', Contact: {resource_name}"
)
try:
# First fetch the contact to get the etag
if action == "update":
# Fetch the contact to get the etag
current = await asyncio.to_thread(
service.people()
.get(resourceName=resource_name, personFields=DETAILED_PERSON_FIELDS)
@@ -544,7 +484,6 @@ async def update_contact(
if not etag:
raise Exception("Unable to get contact etag for update.")
# Build update body
body = _build_person_body(
given_name=given_name,
family_name=family_name,
@@ -556,13 +495,12 @@ async def update_contact(
)
if not body:
raise Exception(
raise UserInputError(
"At least one field (name, email, phone, etc.) must be provided."
)
body["etag"] = etag
# Determine which fields to update
update_person_fields = []
if "names" in body:
update_person_fields.append("names")
@@ -594,70 +532,19 @@ async def update_contact(
logger.info(f"Updated contact {resource_name} for {user_google_email}")
return response
except HttpError as error:
if error.resp.status == 404:
message = f"Contact not found: {contact_id}"
logger.warning(message)
raise Exception(message)
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool()
@require_google_service("people", "contacts")
@handle_http_errors("delete_contact", service_type="people")
async def delete_contact(
service: Resource,
user_google_email: str,
contact_id: str,
) -> str:
"""
Delete a contact.
Args:
user_google_email (str): The user's Google email address. Required.
contact_id (str): The contact ID to delete.
Returns:
str: Confirmation message.
"""
# Normalize resource name
if not contact_id.startswith("people/"):
resource_name = f"people/{contact_id}"
else:
resource_name = contact_id
logger.info(
f"[delete_contact] Invoked. Email: '{user_google_email}', Contact: {resource_name}"
)
try:
# action == "delete"
await asyncio.to_thread(
service.people().deleteContact(resourceName=resource_name).execute
)
response = f"Contact {contact_id} has been deleted for {user_google_email}."
logger.info(f"Deleted contact {resource_name} for {user_google_email}")
return response
except HttpError as error:
if error.resp.status == 404:
message = f"Contact not found: {contact_id}"
logger.warning(message)
raise Exception(message)
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
# =============================================================================
# Extended Tier Tools
# =============================================================================
@server.tool()
@@ -682,9 +569,12 @@ async def list_contact_groups(
"""
logger.info(f"[list_contact_groups] Invoked. Email: '{user_google_email}'")
try:
if page_size < 1:
raise UserInputError("page_size must be >= 1")
page_size = min(page_size, 1000)
params: Dict[str, Any] = {
"pageSize": min(page_size, 1000),
"pageSize": page_size,
"groupFields": CONTACT_GROUP_FIELDS,
}
@@ -719,15 +609,6 @@ async def list_contact_groups(
logger.info(f"Found {len(groups)} contact groups for {user_google_email}")
return response
except HttpError as error:
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool()
@require_google_service("people", "contacts_read")
@@ -759,12 +640,15 @@ async def get_contact_group(
f"[get_contact_group] Invoked. Email: '{user_google_email}', Group: {resource_name}"
)
try:
if max_members < 1:
raise UserInputError("max_members must be >= 1")
max_members = min(max_members, 1000)
result = await asyncio.to_thread(
service.contactGroups()
.get(
resourceName=resource_name,
maxMembers=min(max_members, 1000),
maxMembers=max_members,
groupFields=CONTACT_GROUP_FIELDS,
)
.execute
@@ -790,19 +674,6 @@ async def get_contact_group(
logger.info(f"Retrieved contact group {resource_name} for {user_google_email}")
return response
except HttpError as error:
if error.resp.status == 404:
message = f"Contact group not found: {group_id}"
logger.warning(message)
raise Exception(message)
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
# =============================================================================
# Complete Tier Tools
@@ -811,40 +682,49 @@ async def get_contact_group(
@server.tool()
@require_google_service("people", "contacts")
@handle_http_errors("batch_create_contacts", service_type="people")
async def batch_create_contacts(
@handle_http_errors("manage_contacts_batch", service_type="people")
async def manage_contacts_batch(
service: Resource,
user_google_email: str,
contacts: List[Dict[str, str]],
action: str,
contacts: Optional[List[Dict[str, str]]] = None,
updates: Optional[List[Dict[str, str]]] = None,
contact_ids: Optional[List[str]] = None,
) -> str:
"""
Create multiple contacts in a batch operation.
Batch create, update, or delete contacts. Consolidated tool replacing
batch_create_contacts, batch_update_contacts, and batch_delete_contacts.
Args:
user_google_email (str): The user's Google email address. Required.
contacts (List[Dict[str, str]]): List of contact dictionaries with fields:
- given_name: First name
- family_name: Last name
- email: Email address
- phone: Phone number
- organization: Company name
- job_title: Job title
action (str): The action to perform: "create", "update", or "delete".
contacts (Optional[List[Dict[str, str]]]): List of contact dicts for "create" action.
Each dict may contain: given_name, family_name, email, phone, organization, job_title.
updates (Optional[List[Dict[str, str]]]): List of update dicts for "update" action.
Each dict must contain contact_id and may contain: given_name, family_name,
email, phone, organization, job_title.
contact_ids (Optional[List[str]]): List of contact IDs for "delete" action.
Returns:
str: Confirmation with created contacts.
str: Result of the batch action performed.
"""
logger.info(
f"[batch_create_contacts] Invoked. Email: '{user_google_email}', Count: {len(contacts)}"
action = action.lower().strip()
if action not in ("create", "update", "delete"):
raise UserInputError(
f"Invalid action '{action}'. Must be 'create', 'update', or 'delete'."
)
try:
logger.info(
f"[manage_contacts_batch] Invoked. Action: '{action}', Email: '{user_google_email}'"
)
if action == "create":
if not contacts:
raise Exception("At least one contact must be provided.")
raise UserInputError("contacts parameter is required for 'create' action.")
if len(contacts) > 200:
raise Exception("Maximum 200 contacts can be created in a batch.")
raise UserInputError("Maximum 200 contacts can be created in a batch.")
# Build batch request body
contact_bodies = []
for contact in contacts:
body = _build_person_body(
@@ -859,7 +739,7 @@ async def batch_create_contacts(
contact_bodies.append({"contactPerson": body})
if not contact_bodies:
raise Exception("No valid contact data provided.")
raise UserInputError("No valid contact data provided.")
batch_body = {
"contacts": contact_bodies,
@@ -884,63 +764,23 @@ async def batch_create_contacts(
)
return response
except HttpError as error:
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool()
@require_google_service("people", "contacts")
@handle_http_errors("batch_update_contacts", service_type="people")
async def batch_update_contacts(
service: Resource,
user_google_email: str,
updates: List[Dict[str, str]],
) -> str:
"""
Update multiple contacts in a batch operation.
Args:
user_google_email (str): The user's Google email address. Required.
updates (List[Dict[str, str]]): List of update dictionaries with fields:
- contact_id: The contact ID to update (required)
- given_name: New first name
- family_name: New last name
- email: New email address
- phone: New phone number
- organization: New company name
- job_title: New job title
Returns:
str: Confirmation with updated contacts.
"""
logger.info(
f"[batch_update_contacts] Invoked. Email: '{user_google_email}', Count: {len(updates)}"
)
try:
if action == "update":
if not updates:
raise Exception("At least one update must be provided.")
raise UserInputError("updates parameter is required for 'update' action.")
if len(updates) > 200:
raise Exception("Maximum 200 contacts can be updated in a batch.")
raise UserInputError("Maximum 200 contacts can be updated in a batch.")
# First, fetch all contacts to get their etags
# Fetch all contacts to get their etags
resource_names = []
for update in updates:
contact_id = update.get("contact_id")
if not contact_id:
raise Exception("Each update must include a contact_id.")
if not contact_id.startswith("people/"):
contact_id = f"people/{contact_id}"
resource_names.append(contact_id)
cid = update.get("contact_id")
if not cid:
raise UserInputError("Each update must include a contact_id.")
if not cid.startswith("people/"):
cid = f"people/{cid}"
resource_names.append(cid)
# Batch get contacts for etags
batch_get_result = await asyncio.to_thread(
service.people()
.getBatchGet(
@@ -951,25 +791,24 @@ async def batch_update_contacts(
)
etags = {}
for response in batch_get_result.get("responses", []):
person = response.get("person", {})
resource_name = person.get("resourceName")
for resp in batch_get_result.get("responses", []):
person = resp.get("person", {})
rname = person.get("resourceName")
etag = person.get("etag")
if resource_name and etag:
etags[resource_name] = etag
if rname and etag:
etags[rname] = etag
# Build batch update body
update_bodies = []
update_fields_set: set = set()
for update in updates:
contact_id = update.get("contact_id", "")
if not contact_id.startswith("people/"):
contact_id = f"people/{contact_id}"
cid = update.get("contact_id", "")
if not cid.startswith("people/"):
cid = f"people/{cid}"
etag = etags.get(contact_id)
etag = etags.get(cid)
if not etag:
logger.warning(f"No etag found for {contact_id}, skipping")
logger.warning(f"No etag found for {cid}, skipping")
continue
body = _build_person_body(
@@ -982,11 +821,10 @@ async def batch_update_contacts(
)
if body:
body["resourceName"] = contact_id
body["resourceName"] = cid
body["etag"] = etag
update_bodies.append({"person": body})
# Track which fields are being updated
if "names" in body:
update_fields_set.add("names")
if "emailAddresses" in body:
@@ -997,7 +835,7 @@ async def batch_update_contacts(
update_fields_set.add("organizations")
if not update_bodies:
raise Exception("No valid update data provided.")
raise UserInputError("No valid update data provided.")
batch_body = {
"contacts": update_bodies,
@@ -1014,7 +852,7 @@ async def batch_update_contacts(
response = f"Batch Update Results for {user_google_email}:\n\n"
response += f"Updated {len(update_results)} contacts:\n\n"
for resource_name, update_result in update_results.items():
for rname, update_result in update_results.items():
person = update_result.get("person", {})
response += _format_contact(person) + "\n\n"
@@ -1023,52 +861,19 @@ async def batch_update_contacts(
)
return response
except HttpError as error:
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool()
@require_google_service("people", "contacts")
@handle_http_errors("batch_delete_contacts", service_type="people")
async def batch_delete_contacts(
service: Resource,
user_google_email: str,
contact_ids: List[str],
) -> str:
"""
Delete multiple contacts in a batch operation.
Args:
user_google_email (str): The user's Google email address. Required.
contact_ids (List[str]): List of contact IDs to delete.
Returns:
str: Confirmation message.
"""
logger.info(
f"[batch_delete_contacts] Invoked. Email: '{user_google_email}', Count: {len(contact_ids)}"
)
try:
# action == "delete"
if not contact_ids:
raise Exception("At least one contact ID must be provided.")
raise UserInputError("contact_ids parameter is required for 'delete' action.")
if len(contact_ids) > 500:
raise Exception("Maximum 500 contacts can be deleted in a batch.")
raise UserInputError("Maximum 500 contacts can be deleted in a batch.")
# Normalize resource names
resource_names = []
for contact_id in contact_ids:
if not contact_id.startswith("people/"):
resource_names.append(f"people/{contact_id}")
for cid in contact_ids:
if not cid.startswith("people/"):
resource_names.append(f"people/{cid}")
else:
resource_names.append(contact_id)
resource_names.append(cid)
batch_body = {"resourceNames": resource_names}
@@ -1077,45 +882,56 @@ async def batch_delete_contacts(
)
response = f"Batch deleted {len(contact_ids)} contacts for {user_google_email}."
logger.info(
f"Batch deleted {len(contact_ids)} contacts for {user_google_email}"
)
logger.info(f"Batch deleted {len(contact_ids)} contacts for {user_google_email}")
return response
except HttpError as error:
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool()
@require_google_service("people", "contacts")
@handle_http_errors("create_contact_group", service_type="people")
async def create_contact_group(
@handle_http_errors("manage_contact_group", service_type="people")
async def manage_contact_group(
service: Resource,
user_google_email: str,
name: str,
action: str,
group_id: Optional[str] = None,
name: Optional[str] = None,
delete_contacts: bool = False,
add_contact_ids: Optional[List[str]] = None,
remove_contact_ids: Optional[List[str]] = None,
) -> str:
"""
Create a new contact group (label).
Create, update, delete a contact group, or modify its members. Consolidated tool
replacing create_contact_group, update_contact_group, delete_contact_group, and
modify_contact_group_members.
Args:
user_google_email (str): The user's Google email address. Required.
name (str): The name of the new contact group.
action (str): The action to perform: "create", "update", "delete", or "modify_members".
group_id (Optional[str]): The contact group ID. Required for "update", "delete",
and "modify_members" actions.
name (Optional[str]): The group name. Required for "create" and "update" actions.
delete_contacts (bool): If True and action is "delete", also delete contacts in
the group (default: False).
add_contact_ids (Optional[List[str]]): Contact IDs to add (for "modify_members").
remove_contact_ids (Optional[List[str]]): Contact IDs to remove (for "modify_members").
Returns:
str: Confirmation with the new group details.
str: Result of the action performed.
"""
logger.info(
f"[create_contact_group] Invoked. Email: '{user_google_email}', Name: '{name}'"
action = action.lower().strip()
if action not in ("create", "update", "delete", "modify_members"):
raise UserInputError(
f"Invalid action '{action}'. Must be 'create', 'update', 'delete', or 'modify_members'."
)
try:
logger.info(
f"[manage_contact_group] Invoked. Action: '{action}', Email: '{user_google_email}'"
)
if action == "create":
if not name:
raise UserInputError("name is required for 'create' action.")
body = {"contactGroup": {"name": name}}
result = await asyncio.to_thread(
@@ -1123,58 +939,31 @@ async def create_contact_group(
)
resource_name = result.get("resourceName", "")
group_id = resource_name.replace("contactGroups/", "")
created_group_id = resource_name.replace("contactGroups/", "")
created_name = result.get("name", name)
response = f"Contact Group Created for {user_google_email}:\n\n"
response += f"Name: {created_name}\n"
response += f"ID: {group_id}\n"
response += f"ID: {created_group_id}\n"
response += f"Type: {result.get('groupType', 'USER_CONTACT_GROUP')}\n"
logger.info(f"Created contact group '{name}' for {user_google_email}")
return response
except HttpError as error:
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
# All other actions require group_id
if not group_id:
raise UserInputError(f"group_id is required for '{action}' action.")
@server.tool()
@require_google_service("people", "contacts")
@handle_http_errors("update_contact_group", service_type="people")
async def update_contact_group(
service: Resource,
user_google_email: str,
group_id: str,
name: str,
) -> str:
"""
Update a contact group's name.
Args:
user_google_email (str): The user's Google email address. Required.
group_id (str): The contact group ID to update.
name (str): The new name for the contact group.
Returns:
str: Confirmation with updated group details.
"""
# Normalize resource name
if not group_id.startswith("contactGroups/"):
resource_name = f"contactGroups/{group_id}"
else:
resource_name = group_id
logger.info(
f"[update_contact_group] Invoked. Email: '{user_google_email}', Group: {resource_name}"
)
if action == "update":
if not name:
raise UserInputError("name is required for 'update' action.")
try:
body = {"contactGroup": {"name": name}}
result = await asyncio.to_thread(
@@ -1192,51 +981,7 @@ async def update_contact_group(
logger.info(f"Updated contact group {resource_name} for {user_google_email}")
return response
except HttpError as error:
if error.resp.status == 404:
message = f"Contact group not found: {group_id}"
logger.warning(message)
raise Exception(message)
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool()
@require_google_service("people", "contacts")
@handle_http_errors("delete_contact_group", service_type="people")
async def delete_contact_group(
service: Resource,
user_google_email: str,
group_id: str,
delete_contacts: bool = False,
) -> str:
"""
Delete a contact group.
Args:
user_google_email (str): The user's Google email address. Required.
group_id (str): The contact group ID to delete.
delete_contacts (bool): If True, also delete contacts in the group (default: False).
Returns:
str: Confirmation message.
"""
# Normalize resource name
if not group_id.startswith("contactGroups/"):
resource_name = f"contactGroups/{group_id}"
else:
resource_name = group_id
logger.info(
f"[delete_contact_group] Invoked. Email: '{user_google_email}', Group: {resource_name}"
)
try:
if action == "delete":
await asyncio.to_thread(
service.contactGroups()
.delete(resourceName=resource_name, deleteContacts=delete_contacts)
@@ -1252,84 +997,36 @@ async def delete_contact_group(
logger.info(f"Deleted contact group {resource_name} for {user_google_email}")
return response
except HttpError as error:
if error.resp.status == 404:
message = f"Contact group not found: {group_id}"
logger.warning(message)
raise Exception(message)
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool()
@require_google_service("people", "contacts")
@handle_http_errors("modify_contact_group_members", service_type="people")
async def modify_contact_group_members(
service: Resource,
user_google_email: str,
group_id: str,
add_contact_ids: Optional[List[str]] = None,
remove_contact_ids: Optional[List[str]] = None,
) -> str:
"""
Add or remove contacts from a contact group.
Args:
user_google_email (str): The user's Google email address. Required.
group_id (str): The contact group ID.
add_contact_ids (Optional[List[str]]): Contact IDs to add to the group.
remove_contact_ids (Optional[List[str]]): Contact IDs to remove from the group.
Returns:
str: Confirmation with results.
"""
# Normalize resource name
if not group_id.startswith("contactGroups/"):
resource_name = f"contactGroups/{group_id}"
else:
resource_name = group_id
logger.info(
f"[modify_contact_group_members] Invoked. Email: '{user_google_email}', Group: {resource_name}"
)
try:
# action == "modify_members"
if not add_contact_ids and not remove_contact_ids:
raise Exception(
raise UserInputError(
"At least one of add_contact_ids or remove_contact_ids must be provided."
)
body: Dict[str, Any] = {}
modify_body: Dict[str, Any] = {}
if add_contact_ids:
# Normalize resource names
add_names = []
for contact_id in add_contact_ids:
if not contact_id.startswith("people/"):
add_names.append(f"people/{contact_id}")
else:
add_names.append(contact_id)
body["resourceNamesToAdd"] = add_names
modify_body["resourceNamesToAdd"] = add_names
if remove_contact_ids:
# Normalize resource names
remove_names = []
for contact_id in remove_contact_ids:
if not contact_id.startswith("people/"):
remove_names.append(f"people/{contact_id}")
else:
remove_names.append(contact_id)
body["resourceNamesToRemove"] = remove_names
modify_body["resourceNamesToRemove"] = remove_names
result = await asyncio.to_thread(
service.contactGroups()
.members()
.modify(resourceName=resource_name, body=body)
.modify(resourceName=resource_name, body=modify_body)
.execute
)
@@ -1353,16 +1050,3 @@ async def modify_contact_group_members(
f"Modified contact group members for {resource_name} for {user_google_email}"
)
return response
except HttpError as error:
if error.resp.status == 404:
message = f"Contact group not found: {group_id}"
logger.warning(message)
raise Exception(message)
message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'."
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)

View File

@@ -1678,7 +1678,5 @@ async def get_doc_as_markdown(
_comment_tools = create_comment_tools("document", "document_id")
# Extract and register the functions
read_doc_comments = _comment_tools["read_comments"]
create_doc_comment = _comment_tools["create_comment"]
reply_to_comment = _comment_tools["reply_to_comment"]
resolve_comment = _comment_tools["resolve_comment"]
list_document_comments = _comment_tools["list_comments"]
manage_document_comment = _comment_tools["manage_comment"]

View File

@@ -1774,61 +1774,101 @@ async def get_drive_shareable_link(
@server.tool()
@handle_http_errors("share_drive_file", is_read_only=False, service_type="drive")
@handle_http_errors("manage_drive_access", is_read_only=False, service_type="drive")
@require_google_service("drive", "drive_file")
async def share_drive_file(
async def manage_drive_access(
service,
user_google_email: str,
file_id: str,
action: str,
share_with: Optional[str] = None,
role: str = "reader",
role: Optional[str] = None,
share_type: str = "user",
permission_id: Optional[str] = None,
recipients: Optional[List[Dict[str, Any]]] = None,
send_notification: bool = True,
email_message: Optional[str] = None,
expiration_time: Optional[str] = None,
allow_file_discovery: Optional[bool] = None,
new_owner_email: Optional[str] = None,
move_to_new_owners_root: bool = False,
) -> str:
"""
Shares a Google Drive file or folder with a user, group, domain, or anyone with the link.
Consolidated tool for managing Google Drive file and folder access permissions.
When sharing a folder, all files inside inherit the permission.
Supports granting, batch-granting, updating, revoking permissions, and
transferring file ownership -- all through a single entry point.
Args:
user_google_email (str): The user's Google email address. Required.
file_id (str): The ID of the file or folder to share. Required.
share_with (Optional[str]): Email address (for user/group), domain name (for domain), or omit for 'anyone'.
role (str): Permission role - 'reader', 'commenter', or 'writer'. Defaults to 'reader'.
share_type (str): Type of sharing - 'user', 'group', 'domain', or 'anyone'. Defaults to 'user'.
send_notification (bool): Whether to send a notification email. Defaults to True.
email_message (Optional[str]): Custom message for the notification email.
expiration_time (Optional[str]): Expiration time in RFC 3339 format (e.g., "2025-01-15T00:00:00Z"). Permission auto-revokes after this time.
allow_file_discovery (Optional[bool]): For 'domain' or 'anyone' shares - whether the file can be found via search. Defaults to None (API default).
file_id (str): The ID of the file or folder. Required.
action (str): The access management action to perform. Required. One of:
- "grant": Share with a single user, group, domain, or anyone.
- "grant_batch": Share with multiple recipients in one call.
- "update": Modify an existing permission (role or expiration).
- "revoke": Remove an existing permission.
- "transfer_owner": Transfer file ownership to another user.
share_with (Optional[str]): Email address (user/group), domain name (domain),
or omit for 'anyone'. Used by "grant".
role (Optional[str]): Permission role -- 'reader', 'commenter', or 'writer'.
Used by "grant" (defaults to 'reader') and "update".
share_type (str): Type of sharing -- 'user', 'group', 'domain', or 'anyone'.
Used by "grant". Defaults to 'user'.
permission_id (Optional[str]): The permission ID to modify or remove.
Required for "update" and "revoke" actions.
recipients (Optional[List[Dict[str, Any]]]): List of recipient objects for
"grant_batch". Each should have: email (str), role (str, optional),
share_type (str, optional), expiration_time (str, optional). For domain
shares use 'domain' field instead of 'email'.
send_notification (bool): Whether to send notification emails. Defaults to True.
Used by "grant" and "grant_batch".
email_message (Optional[str]): Custom notification email message.
Used by "grant" and "grant_batch".
expiration_time (Optional[str]): Expiration in RFC 3339 format
(e.g., "2025-01-15T00:00:00Z"). Used by "grant" and "update".
allow_file_discovery (Optional[bool]): For 'domain'/'anyone' shares, whether
the file appears in search. Used by "grant".
new_owner_email (Optional[str]): Email of the new owner.
Required for "transfer_owner".
move_to_new_owners_root (bool): Move file to the new owner's My Drive root.
Defaults to False. Used by "transfer_owner".
Returns:
str: Confirmation with permission details and shareable link.
str: Confirmation with details of the permission change applied.
"""
logger.info(
f"[share_drive_file] Invoked. Email: '{user_google_email}', File ID: '{file_id}', Share with: '{share_with}', Role: '{role}', Type: '{share_type}'"
valid_actions = ("grant", "grant_batch", "update", "revoke", "transfer_owner")
if action not in valid_actions:
raise ValueError(
f"Invalid action '{action}'. Must be one of: {', '.join(valid_actions)}"
)
validate_share_role(role)
logger.info(
f"[manage_drive_access] Invoked. Email: '{user_google_email}', "
f"File ID: '{file_id}', Action: '{action}'"
)
# --- grant: share with a single recipient ---
if action == "grant":
effective_role = role or "reader"
validate_share_role(effective_role)
validate_share_type(share_type)
if share_type in ("user", "group") and not share_with:
raise ValueError(f"share_with is required for share_type '{share_type}'")
if share_type == "domain" and not share_with:
raise ValueError("share_with (domain name) is required for share_type 'domain'")
raise ValueError(
"share_with (domain name) is required for share_type 'domain'"
)
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name, webViewLink"
)
file_id = resolved_file_id
permission_body = {
permission_body: Dict[str, Any] = {
"type": share_type,
"role": role,
"role": effective_role,
}
if share_type in ("user", "group"):
permission_body["emailAddress"] = share_with
elif share_type == "domain":
@@ -1841,13 +1881,12 @@ async def share_drive_file(
if share_type in ("domain", "anyone") and allow_file_discovery is not None:
permission_body["allowFileDiscovery"] = allow_file_discovery
create_params = {
create_params: Dict[str, Any] = {
"fileId": file_id,
"body": permission_body,
"supportsAllDrives": True,
"fields": "id, type, role, emailAddress, domain, expirationTime",
}
if share_type in ("user", "group"):
create_params["sendNotificationEmail"] = send_notification
if email_message:
@@ -1857,7 +1896,8 @@ async def share_drive_file(
service.permissions().create(**create_params).execute
)
output_parts = [
return "\n".join(
[
f"Successfully shared '{file_metadata.get('name', 'Unknown')}'",
"",
"Permission created:",
@@ -1865,65 +1905,26 @@ async def share_drive_file(
"",
f"View link: {file_metadata.get('webViewLink', 'N/A')}",
]
return "\n".join(output_parts)
@server.tool()
@handle_http_errors("batch_share_drive_file", is_read_only=False, service_type="drive")
@require_google_service("drive", "drive_file")
async def batch_share_drive_file(
service,
user_google_email: str,
file_id: str,
recipients: List[Dict[str, Any]],
send_notification: bool = True,
email_message: Optional[str] = None,
) -> str:
"""
Shares a Google Drive file or folder with multiple users or groups in a single operation.
Each recipient can have a different role and optional expiration time.
Note: Each recipient is processed sequentially. For very large recipient lists,
consider splitting into multiple calls.
Args:
user_google_email (str): The user's Google email address. Required.
file_id (str): The ID of the file or folder to share. Required.
recipients (List[Dict]): List of recipient objects. Each should have:
- email (str): Recipient email address. Required for 'user' or 'group' share_type.
- role (str): Permission role - 'reader', 'commenter', or 'writer'. Defaults to 'reader'.
- share_type (str, optional): 'user', 'group', or 'domain'. Defaults to 'user'.
- expiration_time (str, optional): Expiration in RFC 3339 format (e.g., "2025-01-15T00:00:00Z").
For domain shares, use 'domain' field instead of 'email':
- domain (str): Domain name. Required when share_type is 'domain'.
send_notification (bool): Whether to send notification emails. Defaults to True.
email_message (Optional[str]): Custom message for notification emails.
Returns:
str: Summary of created permissions with success/failure for each recipient.
"""
logger.info(
f"[batch_share_drive_file] Invoked. Email: '{user_google_email}', File ID: '{file_id}', Recipients: {len(recipients)}"
)
# --- grant_batch: share with multiple recipients ---
if action == "grant_batch":
if not recipients:
raise ValueError("recipients list is required for 'grant_batch' action")
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name, webViewLink"
)
file_id = resolved_file_id
if not recipients:
raise ValueError("recipients list cannot be empty")
results = []
results: List[str] = []
success_count = 0
failure_count = 0
for recipient in recipients:
share_type = recipient.get("share_type", "user")
r_share_type = recipient.get("share_type", "user")
if share_type == "domain":
if r_share_type == "domain":
domain = recipient.get("domain")
if not domain:
results.append(" - Skipped: missing domain for domain share")
@@ -1931,64 +1932,62 @@ async def batch_share_drive_file(
continue
identifier = domain
else:
email = recipient.get("email")
if not email:
r_email = recipient.get("email")
if not r_email:
results.append(" - Skipped: missing email address")
failure_count += 1
continue
identifier = email
identifier = r_email
role = recipient.get("role", "reader")
r_role = recipient.get("role", "reader")
try:
validate_share_role(role)
validate_share_role(r_role)
except ValueError as e:
results.append(f" - {identifier}: Failed - {e}")
failure_count += 1
continue
try:
validate_share_type(share_type)
validate_share_type(r_share_type)
except ValueError as e:
results.append(f" - {identifier}: Failed - {e}")
failure_count += 1
continue
permission_body = {
"type": share_type,
"role": role,
r_perm_body: Dict[str, Any] = {
"type": r_share_type,
"role": r_role,
}
if share_type == "domain":
permission_body["domain"] = identifier
if r_share_type == "domain":
r_perm_body["domain"] = identifier
else:
permission_body["emailAddress"] = identifier
r_perm_body["emailAddress"] = identifier
if recipient.get("expiration_time"):
try:
validate_expiration_time(recipient["expiration_time"])
permission_body["expirationTime"] = recipient["expiration_time"]
r_perm_body["expirationTime"] = recipient["expiration_time"]
except ValueError as e:
results.append(f" - {identifier}: Failed - {e}")
failure_count += 1
continue
create_params = {
r_create_params: Dict[str, Any] = {
"fileId": file_id,
"body": permission_body,
"body": r_perm_body,
"supportsAllDrives": True,
"fields": "id, type, role, emailAddress, domain, expirationTime",
}
if share_type in ("user", "group"):
create_params["sendNotificationEmail"] = send_notification
if r_share_type in ("user", "group"):
r_create_params["sendNotificationEmail"] = send_notification
if email_message:
create_params["emailMessage"] = email_message
r_create_params["emailMessage"] = email_message
try:
created_permission = await asyncio.to_thread(
service.permissions().create(**create_params).execute
created_perm = await asyncio.to_thread(
service.permissions().create(**r_create_params).execute
)
results.append(f" - {format_permission_info(created_permission)}")
results.append(f" - {format_permission_info(created_perm)}")
success_count += 1
except HttpError as e:
results.append(f" - {identifier}: Failed - {str(e)}")
@@ -2008,40 +2007,16 @@ async def batch_share_drive_file(
f"View link: {file_metadata.get('webViewLink', 'N/A')}",
]
)
return "\n".join(output_parts)
@server.tool()
@handle_http_errors("update_drive_permission", is_read_only=False, service_type="drive")
@require_google_service("drive", "drive_file")
async def update_drive_permission(
service,
user_google_email: str,
file_id: str,
permission_id: str,
role: Optional[str] = None,
expiration_time: Optional[str] = None,
) -> str:
"""
Updates an existing permission on a Google Drive file or folder.
Args:
user_google_email (str): The user's Google email address. Required.
file_id (str): The ID of the file or folder. Required.
permission_id (str): The ID of the permission to update (from get_drive_file_permissions). Required.
role (Optional[str]): New role - 'reader', 'commenter', or 'writer'. If not provided, role unchanged.
expiration_time (Optional[str]): Expiration time in RFC 3339 format (e.g., "2025-01-15T00:00:00Z"). Set or update when permission expires.
Returns:
str: Confirmation with updated permission details.
"""
logger.info(
f"[update_drive_permission] Invoked. Email: '{user_google_email}', File ID: '{file_id}', Permission ID: '{permission_id}', Role: '{role}'"
)
# --- update: modify an existing permission ---
if action == "update":
if not permission_id:
raise ValueError("permission_id is required for 'update' action")
if not role and not expiration_time:
raise ValueError("Must provide at least one of: role, expiration_time")
raise ValueError(
"Must provide at least one of: role, expiration_time for 'update' action"
)
if role:
validate_share_role(role)
@@ -2053,8 +2028,8 @@ async def update_drive_permission(
)
file_id = resolved_file_id
# Google API requires role in update body, so fetch current if not provided
if not role:
effective_role = role
if not effective_role:
current_permission = await asyncio.to_thread(
service.permissions()
.get(
@@ -2065,9 +2040,9 @@ async def update_drive_permission(
)
.execute
)
role = current_permission.get("role")
effective_role = current_permission.get("role")
update_body = {"role": role}
update_body: Dict[str, Any] = {"role": effective_role}
if expiration_time:
update_body["expirationTime"] = expiration_time
@@ -2083,40 +2058,20 @@ async def update_drive_permission(
.execute
)
output_parts = [
return "\n".join(
[
f"Successfully updated permission on '{file_metadata.get('name', 'Unknown')}'",
"",
"Updated permission:",
f" - {format_permission_info(updated_permission)}",
]
return "\n".join(output_parts)
@server.tool()
@handle_http_errors("remove_drive_permission", is_read_only=False, service_type="drive")
@require_google_service("drive", "drive_file")
async def remove_drive_permission(
service,
user_google_email: str,
file_id: str,
permission_id: str,
) -> str:
"""
Removes a permission from a Google Drive file or folder, revoking access.
Args:
user_google_email (str): The user's Google email address. Required.
file_id (str): The ID of the file or folder. Required.
permission_id (str): The ID of the permission to remove (from get_drive_file_permissions). Required.
Returns:
str: Confirmation of the removed permission.
"""
logger.info(
f"[remove_drive_permission] Invoked. Email: '{user_google_email}', File ID: '{file_id}', Permission ID: '{permission_id}'"
)
# --- revoke: remove an existing permission ---
if action == "revoke":
if not permission_id:
raise ValueError("permission_id is required for 'revoke' action")
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name"
)
@@ -2124,15 +2079,63 @@ async def remove_drive_permission(
await asyncio.to_thread(
service.permissions()
.delete(fileId=file_id, permissionId=permission_id, supportsAllDrives=True)
.delete(
fileId=file_id,
permissionId=permission_id,
supportsAllDrives=True,
)
.execute
)
output_parts = [
return "\n".join(
[
f"Successfully removed permission from '{file_metadata.get('name', 'Unknown')}'",
"",
f"Permission ID '{permission_id}' has been revoked.",
]
)
# --- transfer_owner: transfer file ownership ---
# action == "transfer_owner"
if not new_owner_email:
raise ValueError("new_owner_email is required for 'transfer_owner' action")
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name, owners"
)
file_id = resolved_file_id
current_owners = file_metadata.get("owners", [])
current_owner_emails = [o.get("emailAddress", "") for o in current_owners]
transfer_body: Dict[str, Any] = {
"type": "user",
"role": "owner",
"emailAddress": new_owner_email,
}
await asyncio.to_thread(
service.permissions()
.create(
fileId=file_id,
body=transfer_body,
transferOwnership=True,
moveToNewOwnersRoot=move_to_new_owners_root,
supportsAllDrives=True,
fields="id, type, role, emailAddress",
)
.execute
)
output_parts = [
f"Successfully transferred ownership of '{file_metadata.get('name', 'Unknown')}'",
"",
f"New owner: {new_owner_email}",
f"Previous owner(s): {', '.join(current_owner_emails) or 'Unknown'}",
]
if move_to_new_owners_root:
output_parts.append(f"File moved to {new_owner_email}'s My Drive root.")
output_parts.extend(["", "Note: Previous owner now has editor access."])
return "\n".join(output_parts)
@@ -2209,79 +2212,6 @@ async def copy_drive_file(
return "\n".join(output_parts)
@server.tool()
@handle_http_errors(
"transfer_drive_ownership", is_read_only=False, service_type="drive"
)
@require_google_service("drive", "drive_file")
async def transfer_drive_ownership(
service,
user_google_email: str,
file_id: str,
new_owner_email: str,
move_to_new_owners_root: bool = False,
) -> str:
"""
Transfers ownership of a Google Drive file or folder to another user.
This is an irreversible operation. The current owner will become an editor.
Only works within the same Google Workspace domain or for personal accounts.
Args:
user_google_email (str): The user's Google email address. Required.
file_id (str): The ID of the file or folder to transfer. Required.
new_owner_email (str): Email address of the new owner. Required.
move_to_new_owners_root (bool): If True, moves the file to the new owner's My Drive root. Defaults to False.
Returns:
str: Confirmation of the ownership transfer.
"""
logger.info(
f"[transfer_drive_ownership] Invoked. Email: '{user_google_email}', File ID: '{file_id}', New owner: '{new_owner_email}'"
)
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name, owners"
)
file_id = resolved_file_id
current_owners = file_metadata.get("owners", [])
current_owner_emails = [o.get("emailAddress", "") for o in current_owners]
permission_body = {
"type": "user",
"role": "owner",
"emailAddress": new_owner_email,
}
await asyncio.to_thread(
service.permissions()
.create(
fileId=file_id,
body=permission_body,
transferOwnership=True,
moveToNewOwnersRoot=move_to_new_owners_root,
supportsAllDrives=True,
fields="id, type, role, emailAddress",
)
.execute
)
output_parts = [
f"Successfully transferred ownership of '{file_metadata.get('name', 'Unknown')}'",
"",
f"New owner: {new_owner_email}",
f"Previous owner(s): {', '.join(current_owner_emails) or 'Unknown'}",
]
if move_to_new_owners_root:
output_parts.append(f"File moved to {new_owner_email}'s My Drive root.")
output_parts.extend(["", "Note: Previous owner now has editor access."])
return "\n".join(output_parts)
@server.tool()
@handle_http_errors(
"set_drive_file_permissions", is_read_only=False, service_type="drive"

View File

@@ -1925,39 +1925,37 @@ async def list_gmail_filters(service, user_google_email: str) -> str:
@server.tool()
@handle_http_errors("create_gmail_filter", service_type="gmail")
@handle_http_errors("manage_gmail_filter", service_type="gmail")
@require_google_service("gmail", "gmail_settings_basic")
async def create_gmail_filter(
async def manage_gmail_filter(
service,
user_google_email: str,
criteria: Annotated[
Dict[str, Any],
Field(
description="Filter criteria object as defined in the Gmail API.",
),
],
action: Annotated[
Dict[str, Any],
Field(
description="Filter action object as defined in the Gmail API.",
),
],
action: str,
criteria: Optional[Dict[str, Any]] = None,
filter_action: Optional[Dict[str, Any]] = None,
filter_id: Optional[str] = None,
) -> str:
"""
Creates a Gmail filter using the users.settings.filters API.
Manages Gmail filters. Supports creating and deleting filters.
Args:
user_google_email (str): The user's Google email address. Required.
criteria (Dict[str, Any]): Criteria for matching messages.
action (Dict[str, Any]): Actions to apply to matched messages.
action (str): Action to perform - "create" or "delete".
criteria (Optional[Dict[str, Any]]): Filter criteria object (required for create).
filter_action (Optional[Dict[str, Any]]): Filter action object (required for create). Named 'filter_action' to avoid shadowing the 'action' parameter.
filter_id (Optional[str]): ID of the filter to delete (required for delete).
Returns:
str: Confirmation message with the created filter ID.
str: Confirmation message with filter details.
"""
logger.info("[create_gmail_filter] Invoked")
filter_body = {"criteria": criteria, "action": action}
action_lower = action.lower().strip()
if action_lower == "create":
if not criteria or not filter_action:
raise ValueError(
"criteria and filter_action are required for create action"
)
logger.info("[manage_gmail_filter] Creating filter")
filter_body = {"criteria": criteria, "action": filter_action}
created_filter = await asyncio.to_thread(
service.users()
.settings()
@@ -1965,47 +1963,33 @@ async def create_gmail_filter(
.create(userId="me", body=filter_body)
.execute
)
filter_id = created_filter.get("id", "(unknown)")
return f"Filter created successfully!\nFilter ID: {filter_id}"
@server.tool()
@handle_http_errors("delete_gmail_filter", service_type="gmail")
@require_google_service("gmail", "gmail_settings_basic")
async def delete_gmail_filter(
service,
user_google_email: str,
filter_id: str = Field(..., description="ID of the filter to delete."),
) -> str:
"""
Deletes a Gmail filter by ID.
Args:
user_google_email (str): The user's Google email address. Required.
filter_id (str): The ID of the filter to delete.
Returns:
str: Confirmation message for the deletion.
"""
logger.info(f"[delete_gmail_filter] Invoked. Filter ID: '{filter_id}'")
fid = created_filter.get("id", "(unknown)")
return f"Filter created successfully!\nFilter ID: {fid}"
elif action_lower == "delete":
if not filter_id:
raise ValueError("filter_id is required for delete action")
logger.info(f"[manage_gmail_filter] Deleting filter {filter_id}")
filter_details = await asyncio.to_thread(
service.users().settings().filters().get(userId="me", id=filter_id).execute
)
await asyncio.to_thread(
service.users().settings().filters().delete(userId="me", id=filter_id).execute
service.users()
.settings()
.filters()
.delete(userId="me", id=filter_id)
.execute
)
criteria = filter_details.get("criteria", {})
action = filter_details.get("action", {})
criteria_info = filter_details.get("criteria", {})
action_info = filter_details.get("action", {})
return (
"Filter deleted successfully!\n"
f"Filter ID: {filter_id}\n"
f"Criteria: {criteria or '(none)'}\n"
f"Action: {action or '(none)'}"
f"Criteria: {criteria_info or '(none)'}\n"
f"Action: {action_info or '(none)'}"
)
else:
raise ValueError(
f"Invalid action '{action_lower}'. Must be 'create' or 'delete'."
)

View File

@@ -33,6 +33,7 @@ async def search_custom(
file_type: Optional[str] = None,
language: Optional[str] = None,
country: Optional[str] = None,
sites: Optional[List[str]] = None,
) -> str:
"""
Performs a search using Google Custom Search JSON API.
@@ -50,6 +51,7 @@ async def search_custom(
file_type (Optional[str]): Filter by file type (e.g., "pdf", "doc").
language (Optional[str]): Language code for results (e.g., "lang_en").
country (Optional[str]): Country code for results (e.g., "countryUS").
sites (Optional[List[str]]): List of sites/domains to restrict search to (e.g., ["example.com", "docs.example.com"]). When provided, results are limited to these sites.
Returns:
str: Formatted search results including title, link, and snippet for each result.
@@ -71,6 +73,12 @@ async def search_custom(
f"[search_custom] Invoked. Email: '{user_google_email}', Query: '{q}', CX: '{cx}'"
)
# Apply site restriction if sites are provided
if sites:
site_query = " OR ".join([f"site:{site}" for site in sites])
q = f"{q} ({site_query})"
logger.info(f"[search_custom] Applied site restriction: {sites}")
# Build the request parameters
params = {
"key": api_key,
@@ -224,50 +232,3 @@ async def get_search_engine_info(service, user_google_email: str) -> str:
logger.info(f"Search engine info retrieved successfully for {user_google_email}")
return confirmation_message
@server.tool()
@handle_http_errors(
"search_custom_siterestrict", is_read_only=True, service_type="customsearch"
)
@require_google_service("customsearch", "customsearch")
async def search_custom_siterestrict(
service,
user_google_email: str,
q: str,
sites: List[str],
num: int = 10,
start: int = 1,
safe: Literal["active", "moderate", "off"] = "off",
) -> str:
"""
Performs a search restricted to specific sites using Google Custom Search.
Args:
user_google_email (str): The user's Google email address. Required.
q (str): The search query. Required.
sites (List[str]): List of sites/domains to search within.
num (int): Number of results to return (1-10). Defaults to 10.
start (int): The index of the first result to return (1-based). Defaults to 1.
safe (Literal["active", "moderate", "off"]): Safe search level. Defaults to "off".
Returns:
str: Formatted search results from the specified sites.
"""
logger.info(
f"[search_custom_siterestrict] Invoked. Email: '{user_google_email}', Query: '{q}', Sites: {sites}"
)
# Build site restriction query
site_query = " OR ".join([f"site:{site}" for site in sites])
full_query = f"{q} ({site_query})"
# Use the main search function with the modified query
return await search_custom(
service=service,
user_google_email=user_google_email,
q=full_query,
num=num,
start=start,
safe=safe,
)

View File

@@ -729,51 +729,88 @@ async def format_sheet_range(
@server.tool()
@handle_http_errors("add_conditional_formatting", service_type="sheets")
@handle_http_errors("manage_conditional_formatting", service_type="sheets")
@require_google_service("sheets", "sheets_write")
async def add_conditional_formatting(
async def manage_conditional_formatting(
service,
user_google_email: str,
spreadsheet_id: str,
range_name: str,
condition_type: str,
action: str,
range_name: Optional[str] = None,
condition_type: Optional[str] = None,
condition_values: Optional[Union[str, List[Union[str, int, float]]]] = None,
background_color: Optional[str] = None,
text_color: Optional[str] = None,
rule_index: Optional[int] = None,
gradient_points: Optional[Union[str, List[dict]]] = None,
sheet_name: Optional[str] = None,
) -> str:
"""
Adds a conditional formatting rule to a range.
Manages conditional formatting rules on a Google Sheet. Supports adding,
updating, and deleting conditional formatting rules via a single tool.
Args:
user_google_email (str): The user's Google email address. Required.
spreadsheet_id (str): The ID of the spreadsheet. Required.
range_name (str): A1-style range (optionally with sheet name). Required.
condition_type (str): Sheets condition type (e.g., NUMBER_GREATER, TEXT_CONTAINS, DATE_BEFORE, CUSTOM_FORMULA).
condition_values (Optional[Union[str, List[Union[str, int, float]]]]): Values for the condition; accepts a list or a JSON string representing a list. Depends on condition_type.
background_color (Optional[str]): Hex background color to apply when condition matches.
action (str): The operation to perform. Must be one of "add", "update",
or "delete".
range_name (Optional[str]): A1-style range (optionally with sheet name).
Required for "add". Optional for "update" (preserves existing ranges
if omitted). Not used for "delete".
condition_type (Optional[str]): Sheets condition type (e.g., NUMBER_GREATER,
TEXT_CONTAINS, DATE_BEFORE, CUSTOM_FORMULA). Required for "add".
Optional for "update" (preserves existing type if omitted).
condition_values (Optional[Union[str, List[Union[str, int, float]]]]): Values
for the condition; accepts a list or a JSON string representing a list.
Depends on condition_type. Used by "add" and "update".
background_color (Optional[str]): Hex background color to apply when
condition matches. Used by "add" and "update".
text_color (Optional[str]): Hex text color to apply when condition matches.
rule_index (Optional[int]): Optional position to insert the rule (0-based) within the sheet's rules.
gradient_points (Optional[Union[str, List[dict]]]): List (or JSON list) of gradient points for a color scale. If provided, a gradient rule is created and boolean parameters are ignored.
Used by "add" and "update".
rule_index (Optional[int]): 0-based index of the rule. For "add", optionally
specifies insertion position. Required for "update" and "delete".
gradient_points (Optional[Union[str, List[dict]]]): List (or JSON list) of
gradient points for a color scale. If provided, a gradient rule is created
and boolean parameters are ignored. Used by "add" and "update".
sheet_name (Optional[str]): Sheet name to locate the rule when range_name is
omitted. Defaults to the first sheet. Used by "update" and "delete".
Returns:
str: Confirmation of the added rule.
str: Confirmation of the operation and the current rule state.
"""
logger.info(
"[add_conditional_formatting] Invoked. Email: '%s', Spreadsheet: %s, Range: %s, Type: %s, Values: %s",
user_google_email,
spreadsheet_id,
range_name,
condition_type,
condition_values,
allowed_actions = {"add", "update", "delete"}
action_normalized = action.strip().lower()
if action_normalized not in allowed_actions:
raise UserInputError(
f"action must be one of {sorted(allowed_actions)}, got '{action}'."
)
if rule_index is not None and (not isinstance(rule_index, int) or rule_index < 0):
raise UserInputError("rule_index must be a non-negative integer when provided.")
logger.info(
"[manage_conditional_formatting] Invoked. Action: '%s', Email: '%s', Spreadsheet: %s",
action_normalized,
user_google_email,
spreadsheet_id,
)
if action_normalized == "add":
if not range_name:
raise UserInputError("range_name is required for action 'add'.")
if not condition_type and not gradient_points:
raise UserInputError(
"condition_type (or gradient_points) is required for action 'add'."
)
if rule_index is not None and (
not isinstance(rule_index, int) or rule_index < 0
):
raise UserInputError(
"rule_index must be a non-negative integer when provided."
)
condition_values_list = _parse_condition_values(condition_values)
gradient_points_list = _parse_gradient_points(gradient_points)
condition_values_list = (
None if gradient_points_list else _parse_condition_values(condition_values)
)
sheets, sheet_titles = await _fetch_sheets_with_rules(service, spreadsheet_id)
grid_range = _parse_a1_range(range_name, sheets)
@@ -793,7 +830,8 @@ async def add_conditional_formatting(
insert_at = rule_index if rule_index is not None else len(current_rules)
if insert_at > len(current_rules):
raise UserInputError(
f"rule_index {insert_at} is out of range for sheet '{target_sheet.get('properties', {}).get('title', 'Unknown')}' "
f"rule_index {insert_at} is out of range for sheet "
f"'{target_sheet.get('properties', {}).get('title', 'Unknown')}' "
f"(current count: {len(current_rules)})."
)
@@ -845,60 +883,25 @@ async def add_conditional_formatting(
return "\n".join(
[
f"Added conditional format on '{range_name}' in spreadsheet {spreadsheet_id} "
f"for {user_google_email}: {rule_desc}{values_desc}; format: {format_desc}.",
f"Added conditional format on '{range_name}' in spreadsheet "
f"{spreadsheet_id} for {user_google_email}: "
f"{rule_desc}{values_desc}; format: {format_desc}.",
state_text,
]
)
@server.tool()
@handle_http_errors("update_conditional_formatting", service_type="sheets")
@require_google_service("sheets", "sheets_write")
async def update_conditional_formatting(
service,
user_google_email: str,
spreadsheet_id: str,
rule_index: int,
range_name: Optional[str] = None,
condition_type: Optional[str] = None,
condition_values: Optional[Union[str, List[Union[str, int, float]]]] = None,
background_color: Optional[str] = None,
text_color: Optional[str] = None,
sheet_name: Optional[str] = None,
gradient_points: Optional[Union[str, List[dict]]] = None,
) -> str:
"""
Updates an existing conditional formatting rule by index on a sheet.
Args:
user_google_email (str): The user's Google email address. Required.
spreadsheet_id (str): The ID of the spreadsheet. Required.
range_name (Optional[str]): A1-style range to apply the updated rule (optionally with sheet name). If omitted, existing ranges are preserved.
rule_index (int): Index of the rule to update (0-based).
condition_type (Optional[str]): Sheets condition type. If omitted, the existing rule's type is preserved.
condition_values (Optional[Union[str, List[Union[str, int, float]]]]): Values for the condition.
background_color (Optional[str]): Hex background color when condition matches.
text_color (Optional[str]): Hex text color when condition matches.
sheet_name (Optional[str]): Sheet name to locate the rule when range_name is omitted. Defaults to first sheet.
gradient_points (Optional[Union[str, List[dict]]]): If provided, updates the rule to a gradient color scale using these points.
Returns:
str: Confirmation of the updated rule and the current rule state.
"""
logger.info(
"[update_conditional_formatting] Invoked. Email: '%s', Spreadsheet: %s, Range: %s, Rule Index: %s",
user_google_email,
spreadsheet_id,
range_name,
rule_index,
)
elif action_normalized == "update":
if rule_index is None:
raise UserInputError("rule_index is required for action 'update'.")
if not isinstance(rule_index, int) or rule_index < 0:
raise UserInputError("rule_index must be a non-negative integer.")
condition_values_list = _parse_condition_values(condition_values)
gradient_points_list = _parse_gradient_points(gradient_points)
condition_values_list = (
None
if gradient_points_list is not None
else _parse_condition_values(condition_values)
)
sheets, sheet_titles = await _fetch_sheets_with_rules(service, spreadsheet_id)
@@ -907,7 +910,9 @@ async def update_conditional_formatting(
if range_name:
grid_range = _parse_a1_range(range_name, sheets)
for sheet in sheets:
if sheet.get("properties", {}).get("sheetId") == grid_range.get("sheetId"):
if sheet.get("properties", {}).get("sheetId") == grid_range.get(
"sheetId"
):
target_sheet = sheet
break
else:
@@ -925,7 +930,8 @@ async def update_conditional_formatting(
rules = target_sheet.get("conditionalFormats", []) or []
if rule_index >= len(rules):
raise UserInputError(
f"rule_index {rule_index} is out of range for sheet '{sheet_title}' (current count: {len(rules)})."
f"rule_index {rule_index} is out of range for sheet "
f"'{sheet_title}' (current count: {len(rules)})."
)
existing_rule = rules[rule_index]
@@ -945,9 +951,18 @@ async def update_conditional_formatting(
rule_desc = "gradient"
format_desc = f"gradient points {len(gradient_points_list)}"
elif "gradientRule" in existing_rule:
if any([background_color, text_color, condition_type, condition_values_list]):
if any(
[
background_color,
text_color,
condition_type,
condition_values_list,
]
):
raise UserInputError(
"Existing rule is a gradient rule. Provide gradient_points to update it, or omit formatting/condition parameters to keep it unchanged."
"Existing rule is a gradient rule. Provide gradient_points "
"to update it, or omit formatting/condition parameters to "
"keep it unchanged."
)
new_rule = {
"ranges": ranges_to_use,
@@ -995,7 +1010,9 @@ async def update_conditional_formatting(
del new_format["textFormat"]
if not new_format:
raise UserInputError("At least one format option must remain on the rule.")
raise UserInputError(
"At least one format option must remain on the rule."
)
new_rule = {
"ranges": ranges_to_use,
@@ -1017,7 +1034,9 @@ async def update_conditional_formatting(
"foregroundColor"
):
format_parts.append("text color updated")
format_desc = ", ".join(format_parts) if format_parts else "format preserved"
format_desc = (
", ".join(format_parts) if format_parts else "format preserved"
)
new_rules_state = copy.deepcopy(rules)
new_rules_state[rule_index] = new_rule
@@ -1046,43 +1065,17 @@ async def update_conditional_formatting(
return "\n".join(
[
f"Updated conditional format at index {rule_index} on sheet '{sheet_title}' in spreadsheet {spreadsheet_id} "
f"for {user_google_email}: {rule_desc}{values_desc}; format: {format_desc}.",
f"Updated conditional format at index {rule_index} on sheet "
f"'{sheet_title}' in spreadsheet {spreadsheet_id} "
f"for {user_google_email}: "
f"{rule_desc}{values_desc}; format: {format_desc}.",
state_text,
]
)
@server.tool()
@handle_http_errors("delete_conditional_formatting", service_type="sheets")
@require_google_service("sheets", "sheets_write")
async def delete_conditional_formatting(
service,
user_google_email: str,
spreadsheet_id: str,
rule_index: int,
sheet_name: Optional[str] = None,
) -> str:
"""
Deletes an existing conditional formatting rule by index on a sheet.
Args:
user_google_email (str): The user's Google email address. Required.
spreadsheet_id (str): The ID of the spreadsheet. Required.
rule_index (int): Index of the rule to delete (0-based).
sheet_name (Optional[str]): Name of the sheet that contains the rule. Defaults to the first sheet if not provided.
Returns:
str: Confirmation of the deletion and the current rule state.
"""
logger.info(
"[delete_conditional_formatting] Invoked. Email: '%s', Spreadsheet: %s, Sheet: %s, Rule Index: %s",
user_google_email,
spreadsheet_id,
sheet_name,
rule_index,
)
else: # action_normalized == "delete"
if rule_index is None:
raise UserInputError("rule_index is required for action 'delete'.")
if not isinstance(rule_index, int) or rule_index < 0:
raise UserInputError("rule_index must be a non-negative integer.")
@@ -1095,7 +1088,8 @@ async def delete_conditional_formatting(
rules = target_sheet.get("conditionalFormats", []) or []
if rule_index >= len(rules):
raise UserInputError(
f"rule_index {rule_index} is out of range for sheet '{target_sheet_name}' (current count: {len(rules)})."
f"rule_index {rule_index} is out of range for sheet "
f"'{target_sheet_name}' (current count: {len(rules)})."
)
new_rules_state = copy.deepcopy(rules)
@@ -1124,7 +1118,9 @@ async def delete_conditional_formatting(
return "\n".join(
[
f"Deleted conditional format at index {rule_index} on sheet '{target_sheet_name}' in spreadsheet {spreadsheet_id} for {user_google_email}.",
f"Deleted conditional format at index {rule_index} on sheet "
f"'{target_sheet_name}' in spreadsheet {spreadsheet_id} "
f"for {user_google_email}.",
state_text,
]
)
@@ -1232,7 +1228,5 @@ async def create_sheet(
_comment_tools = create_comment_tools("spreadsheet", "spreadsheet_id")
# Extract and register the functions
read_sheet_comments = _comment_tools["read_comments"]
create_sheet_comment = _comment_tools["create_comment"]
reply_to_sheet_comment = _comment_tools["reply_to_comment"]
resolve_sheet_comment = _comment_tools["resolve_comment"]
list_spreadsheet_comments = _comment_tools["list_comments"]
manage_spreadsheet_comment = _comment_tools["manage_comment"]

View File

@@ -322,13 +322,9 @@ You can view or download the thumbnail using the provided URL."""
# Create comment management tools for slides
_comment_tools = create_comment_tools("presentation", "presentation_id")
read_presentation_comments = _comment_tools["read_comments"]
create_presentation_comment = _comment_tools["create_comment"]
reply_to_presentation_comment = _comment_tools["reply_to_comment"]
resolve_presentation_comment = _comment_tools["resolve_comment"]
list_presentation_comments = _comment_tools["list_comments"]
manage_presentation_comment = _comment_tools["manage_comment"]
# Aliases for backwards compatibility and intuitive naming
read_slide_comments = read_presentation_comments
create_slide_comment = create_presentation_comment
reply_to_slide_comment = reply_to_presentation_comment
resolve_slide_comment = resolve_presentation_comment
list_slide_comments = list_presentation_comments
manage_slide_comment = manage_presentation_comment

View File

@@ -15,7 +15,7 @@ from mcp import Resource
from auth.oauth_config import is_oauth21_enabled, is_external_oauth21_provider
from auth.service_decorator import require_google_service
from core.server import server
from core.utils import handle_http_errors
from core.utils import UserInputError, handle_http_errors
logger = logging.getLogger(__name__)
@@ -193,27 +193,17 @@ async def get_task_list(
raise Exception(message)
@server.tool() # type: ignore
@require_google_service("tasks", "tasks") # type: ignore
@handle_http_errors("create_task_list", service_type="tasks") # type: ignore
async def create_task_list(
# --- Task list _impl functions ---
async def _create_task_list_impl(
service: Resource, user_google_email: str, title: str
) -> str:
"""
Create a new task list.
Args:
user_google_email (str): The user's Google email address. Required.
title (str): The title of the new task list.
Returns:
str: Confirmation message with the new task list ID and details.
"""
"""Implementation for creating a new task list."""
logger.info(
f"[create_task_list] Invoked. Email: '{user_google_email}', Title: '{title}'"
)
try:
body = {"title": title}
result = await asyncio.to_thread(service.tasklists().insert(body=body).execute)
@@ -229,38 +219,15 @@ async def create_task_list(
)
return response
except HttpError as error:
message = _format_reauth_message(error, user_google_email)
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool() # type: ignore
@require_google_service("tasks", "tasks") # type: ignore
@handle_http_errors("update_task_list", service_type="tasks") # type: ignore
async def update_task_list(
async def _update_task_list_impl(
service: Resource, user_google_email: str, task_list_id: str, title: str
) -> str:
"""
Update an existing task list.
Args:
user_google_email (str): The user's Google email address. Required.
task_list_id (str): The ID of the task list to update.
title (str): The new title for the task list.
Returns:
str: Confirmation message with updated task list details.
"""
"""Implementation for updating an existing task list."""
logger.info(
f"[update_task_list] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}, New Title: '{title}'"
)
try:
body = {"id": task_list_id, "title": title}
result = await asyncio.to_thread(
@@ -277,54 +244,104 @@ async def update_task_list(
)
return response
except HttpError as error:
message = _format_reauth_message(error, user_google_email)
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool() # type: ignore
@require_google_service("tasks", "tasks") # type: ignore
@handle_http_errors("delete_task_list", service_type="tasks") # type: ignore
async def delete_task_list(
async def _delete_task_list_impl(
service: Resource, user_google_email: str, task_list_id: str
) -> str:
"""
Delete a task list. Note: This will also delete all tasks in the list.
Args:
user_google_email (str): The user's Google email address. Required.
task_list_id (str): The ID of the task list to delete.
Returns:
str: Confirmation message.
"""
"""Implementation for deleting a task list."""
logger.info(
f"[delete_task_list] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}"
)
try:
await asyncio.to_thread(
service.tasklists().delete(tasklist=task_list_id).execute
)
await asyncio.to_thread(service.tasklists().delete(tasklist=task_list_id).execute)
response = f"Task list {task_list_id} has been deleted for {user_google_email}. All tasks in this list have also been deleted."
logger.info(f"Deleted task list {task_list_id} for {user_google_email}")
return response
except HttpError as error:
message = _format_reauth_message(error, user_google_email)
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
async def _clear_completed_tasks_impl(
service: Resource, user_google_email: str, task_list_id: str
) -> str:
"""Implementation for clearing completed tasks from a task list."""
logger.info(
f"[clear_completed_tasks] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}"
)
await asyncio.to_thread(service.tasks().clear(tasklist=task_list_id).execute)
response = f"All completed tasks have been cleared from task list {task_list_id} for {user_google_email}. The tasks are now hidden and won't appear in default task list views."
logger.info(
f"Cleared completed tasks from list {task_list_id} for {user_google_email}"
)
return response
# --- Consolidated manage_task_list tool ---
@server.tool() # type: ignore
@require_google_service("tasks", "tasks") # type: ignore
@handle_http_errors("manage_task_list", service_type="tasks") # type: ignore
async def manage_task_list(
service: Resource,
user_google_email: str,
action: str,
task_list_id: Optional[str] = None,
title: Optional[str] = None,
) -> str:
"""
Manage task lists: create, update, delete, or clear completed tasks.
Args:
user_google_email (str): The user's Google email address. Required.
action (str): The action to perform. Must be one of: "create", "update", "delete", "clear_completed".
task_list_id (Optional[str]): The ID of the task list. Required for "update", "delete", and "clear_completed" actions.
title (Optional[str]): The title for the task list. Required for "create" and "update" actions.
Returns:
str: Result of the requested action.
"""
logger.info(
f"[manage_task_list] Invoked. Email: '{user_google_email}', Action: '{action}'"
)
valid_actions = ("create", "update", "delete", "clear_completed")
if action not in valid_actions:
raise UserInputError(
f"Invalid action '{action}'. Must be one of: {', '.join(valid_actions)}"
)
if action == "create":
if not title:
raise UserInputError("'title' is required for the 'create' action.")
return await _create_task_list_impl(service, user_google_email, title)
if action == "update":
if not task_list_id:
raise UserInputError("'task_list_id' is required for the 'update' action.")
if not title:
raise UserInputError("'title' is required for the 'update' action.")
return await _update_task_list_impl(
service, user_google_email, task_list_id, title
)
if action == "delete":
if not task_list_id:
raise UserInputError("'task_list_id' is required for the 'delete' action.")
return await _delete_task_list_impl(service, user_google_email, task_list_id)
# action == "clear_completed"
if not task_list_id:
raise UserInputError(
"'task_list_id' is required for the 'clear_completed' action."
)
return await _clear_completed_tasks_impl(service, user_google_email, task_list_id)
# --- Task tools ---
@server.tool() # type: ignore
@@ -633,10 +650,10 @@ async def get_task(
raise Exception(message)
@server.tool() # type: ignore
@require_google_service("tasks", "tasks") # type: ignore
@handle_http_errors("create_task", service_type="tasks") # type: ignore
async def create_task(
# --- Task _impl functions ---
async def _create_task_impl(
service: Resource,
user_google_email: str,
task_list_id: str,
@@ -646,26 +663,11 @@ async def create_task(
parent: Optional[str] = None,
previous: Optional[str] = None,
) -> str:
"""
Create a new task in a task list.
Args:
user_google_email (str): The user's Google email address. Required.
task_list_id (str): The ID of the task list to create the task in.
title (str): The title of the task.
notes (Optional[str]): Notes/description for the task.
due (Optional[str]): Due date in RFC 3339 format (e.g., "2024-12-31T23:59:59Z").
parent (Optional[str]): Parent task ID (for subtasks).
previous (Optional[str]): Previous sibling task ID (for positioning).
Returns:
str: Confirmation message with the new task ID and details.
"""
"""Implementation for creating a new task in a task list."""
logger.info(
f"[create_task] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}, Title: '{title}'"
)
try:
body = {"title": title}
if notes:
body["notes"] = notes
@@ -698,20 +700,8 @@ async def create_task(
)
return response
except HttpError as error:
message = _format_reauth_message(error, user_google_email)
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool() # type: ignore
@require_google_service("tasks", "tasks") # type: ignore
@handle_http_errors("update_task", service_type="tasks") # type: ignore
async def update_task(
async def _update_task_impl(
service: Resource,
user_google_email: str,
task_list_id: str,
@@ -721,26 +711,11 @@ async def update_task(
status: Optional[str] = None,
due: Optional[str] = None,
) -> str:
"""
Update an existing task.
Args:
user_google_email (str): The user's Google email address. Required.
task_list_id (str): The ID of the task list containing the task.
task_id (str): The ID of the task to update.
title (Optional[str]): New title for the task.
notes (Optional[str]): New notes/description for the task.
status (Optional[str]): New status ("needsAction" or "completed").
due (Optional[str]): New due date in RFC 3339 format.
Returns:
str: Confirmation message with updated task details.
"""
"""Implementation for updating an existing task."""
logger.info(
f"[update_task] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}, Task ID: {task_id}"
)
try:
# First get the current task to build the update body
current_task = await asyncio.to_thread(
service.tasks().get(tasklist=task_list_id, task=task_id).execute
@@ -765,9 +740,7 @@ async def update_task(
body["due"] = current_task["due"]
result = await asyncio.to_thread(
service.tasks()
.update(tasklist=task_list_id, task=task_id, body=body)
.execute
service.tasks().update(tasklist=task_list_id, task=task_id, body=body).execute
)
response = f"""Task Updated for {user_google_email}:
@@ -786,38 +759,15 @@ async def update_task(
logger.info(f"Updated task {task_id} for {user_google_email}")
return response
except HttpError as error:
message = _format_reauth_message(error, user_google_email)
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool() # type: ignore
@require_google_service("tasks", "tasks") # type: ignore
@handle_http_errors("delete_task", service_type="tasks") # type: ignore
async def delete_task(
async def _delete_task_impl(
service: Resource, user_google_email: str, task_list_id: str, task_id: str
) -> str:
"""
Delete a task from a task list.
Args:
user_google_email (str): The user's Google email address. Required.
task_list_id (str): The ID of the task list containing the task.
task_id (str): The ID of the task to delete.
Returns:
str: Confirmation message.
"""
"""Implementation for deleting a task from a task list."""
logger.info(
f"[delete_task] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}, Task ID: {task_id}"
)
try:
await asyncio.to_thread(
service.tasks().delete(tasklist=task_list_id, task=task_id).execute
)
@@ -827,20 +777,8 @@ async def delete_task(
logger.info(f"Deleted task {task_id} for {user_google_email}")
return response
except HttpError as error:
message = _format_reauth_message(error, user_google_email)
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
@server.tool() # type: ignore
@require_google_service("tasks", "tasks") # type: ignore
@handle_http_errors("move_task", service_type="tasks") # type: ignore
async def move_task(
async def _move_task_impl(
service: Resource,
user_google_email: str,
task_list_id: str,
@@ -849,25 +787,11 @@ async def move_task(
previous: Optional[str] = None,
destination_task_list: Optional[str] = None,
) -> str:
"""
Move a task to a different position or parent within the same list, or to a different list.
Args:
user_google_email (str): The user's Google email address. Required.
task_list_id (str): The ID of the current task list containing the task.
task_id (str): The ID of the task to move.
parent (Optional[str]): New parent task ID (for making it a subtask).
previous (Optional[str]): Previous sibling task ID (for positioning).
destination_task_list (Optional[str]): Destination task list ID (for moving between lists).
Returns:
str: Confirmation message with updated task details.
"""
"""Implementation for moving a task to a different position, parent, or list."""
logger.info(
f"[move_task] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}, Task ID: {task_id}"
)
try:
params = {"tasklist": task_list_id, "task": task_id}
if parent:
params["parent"] = parent
@@ -903,51 +827,108 @@ async def move_task(
logger.info(f"Moved task {task_id} for {user_google_email}")
return response
except HttpError as error:
message = _format_reauth_message(error, user_google_email)
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
# --- Consolidated manage_task tool ---
@server.tool() # type: ignore
@require_google_service("tasks", "tasks") # type: ignore
@handle_http_errors("clear_completed_tasks", service_type="tasks") # type: ignore
async def clear_completed_tasks(
service: Resource, user_google_email: str, task_list_id: str
@handle_http_errors("manage_task", service_type="tasks") # type: ignore
async def manage_task(
service: Resource,
user_google_email: str,
action: str,
task_list_id: str,
task_id: Optional[str] = None,
title: Optional[str] = None,
notes: Optional[str] = None,
status: Optional[str] = None,
due: Optional[str] = None,
parent: Optional[str] = None,
previous: Optional[str] = None,
destination_task_list: Optional[str] = None,
) -> str:
"""
Clear all completed tasks from a task list. The tasks will be marked as hidden.
Manage tasks: create, update, delete, or move tasks within task lists.
Args:
user_google_email (str): The user's Google email address. Required.
task_list_id (str): The ID of the task list to clear completed tasks from.
action (str): The action to perform. Must be one of: "create", "update", "delete", "move".
task_list_id (str): The ID of the task list. Required for all actions.
task_id (Optional[str]): The ID of the task. Required for "update", "delete", and "move" actions.
title (Optional[str]): The title of the task. Required for "create", optional for "update".
notes (Optional[str]): Notes/description for the task. Used by "create" and "update" actions.
status (Optional[str]): Task status ("needsAction" or "completed"). Used by "update" action.
due (Optional[str]): Due date in RFC 3339 format (e.g., "2024-12-31T23:59:59Z"). Used by "create" and "update" actions.
parent (Optional[str]): Parent task ID (for subtasks). Used by "create" and "move" actions.
previous (Optional[str]): Previous sibling task ID (for positioning). Used by "create" and "move" actions.
destination_task_list (Optional[str]): Destination task list ID (for moving between lists). Used by "move" action.
Returns:
str: Confirmation message.
str: Result of the requested action.
"""
logger.info(
f"[clear_completed_tasks] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}"
f"[manage_task] Invoked. Email: '{user_google_email}', Action: '{action}', Task List ID: {task_list_id}"
)
try:
await asyncio.to_thread(service.tasks().clear(tasklist=task_list_id).execute)
allowed_statuses = {"needsAction", "completed"}
if status is not None and status not in allowed_statuses:
raise UserInputError("invalid status: must be 'needsAction' or 'completed'")
response = f"All completed tasks have been cleared from task list {task_list_id} for {user_google_email}. The tasks are now hidden and won't appear in default task list views."
logger.info(
f"Cleared completed tasks from list {task_list_id} for {user_google_email}"
valid_actions = ("create", "update", "delete", "move")
if action not in valid_actions:
raise UserInputError(
f"Invalid action '{action}'. Must be one of: {', '.join(valid_actions)}"
)
return response
except HttpError as error:
message = _format_reauth_message(error, user_google_email)
logger.error(message, exc_info=True)
raise Exception(message)
except Exception as e:
message = f"Unexpected error: {e}."
logger.exception(message)
raise Exception(message)
if action == "create":
if status is not None:
raise UserInputError("'status' is only supported for the 'update' action.")
if not title:
raise UserInputError("'title' is required for the 'create' action.")
return await _create_task_impl(
service,
user_google_email,
task_list_id,
title,
notes=notes,
due=due,
parent=parent,
previous=previous,
)
if action == "update":
if status is not None and status not in allowed_statuses:
raise UserInputError("invalid status: must be 'needsAction' or 'completed'")
if not task_id:
raise UserInputError("'task_id' is required for the 'update' action.")
return await _update_task_impl(
service,
user_google_email,
task_list_id,
task_id,
title=title,
notes=notes,
status=status,
due=due,
)
if action == "delete":
if not task_id:
raise UserInputError("'task_id' is required for the 'delete' action.")
return await _delete_task_impl(
service, user_google_email, task_list_id, task_id
)
# action == "move"
if not task_id:
raise UserInputError("'task_id' is required for the 'move' action.")
return await _move_task_impl(
service,
user_google_email,
task_list_id,
task_id,
parent=parent,
previous=previous,
destination_task_list=destination_task_list,
)

View File

@@ -291,9 +291,7 @@ class TestImports:
assert hasattr(contacts_tools, "list_contacts")
assert hasattr(contacts_tools, "get_contact")
assert hasattr(contacts_tools, "search_contacts")
assert hasattr(contacts_tools, "create_contact")
assert hasattr(contacts_tools, "update_contact")
assert hasattr(contacts_tools, "delete_contact")
assert hasattr(contacts_tools, "manage_contact")
def test_import_group_tools(self):
"""Test that group tools can be imported."""
@@ -301,18 +299,13 @@ class TestImports:
assert hasattr(contacts_tools, "list_contact_groups")
assert hasattr(contacts_tools, "get_contact_group")
assert hasattr(contacts_tools, "create_contact_group")
assert hasattr(contacts_tools, "update_contact_group")
assert hasattr(contacts_tools, "delete_contact_group")
assert hasattr(contacts_tools, "modify_contact_group_members")
assert hasattr(contacts_tools, "manage_contact_group")
def test_import_batch_tools(self):
"""Test that batch tools can be imported."""
from gcontacts import contacts_tools
assert hasattr(contacts_tools, "batch_create_contacts")
assert hasattr(contacts_tools, "batch_update_contacts")
assert hasattr(contacts_tools, "batch_delete_contacts")
assert hasattr(contacts_tools, "manage_contacts_batch")
class TestConstants: