From 6753531e9d42e68a67318cb3291425ca73031889 Mon Sep 17 00:00:00 2001 From: Taylor Wilsdon Date: Sun, 1 Mar 2026 12:36:09 -0500 Subject: [PATCH] refactor tools to consolidate all modify actions --- README.md | 111 +++- README_NEW.md | 87 +-- core/comments.py | 199 +++---- core/tool_tiers.yaml | 64 +- gappsscript/apps_script_tools.py | 94 ++- gcalendar/calendar_tools.py | 231 +++++--- gcontacts/contacts_tools.py | 966 ++++++++++++------------------- gdrive/drive_tools.py | 638 +++++++++----------- gmail/gmail_tools.py | 114 ++-- gsearch/search_tools.py | 53 +- gsheets/sheets_tools.py | 729 +++++++++++------------ gtasks/tasks_tools.py | 350 ++++++----- 12 files changed, 1712 insertions(+), 1924 deletions(-) diff --git a/README.md b/README.md index c6146f1..45a9c3d 100644 --- a/README.md +++ b/README.md @@ -846,9 +846,7 @@ cp .env.oauth21 .env |------|------|-------------| | `list_calendars` | **Core** | List accessible calendars | | `get_events` | **Core** | Retrieve events with time range filtering | -| `create_event` | **Core** | Create events with attachments & reminders | -| `modify_event` | **Core** | Update existing events | -| `delete_event` | Extended | Remove events | +| `manage_event` | **Core** | Create, update, or delete calendar events | @@ -863,15 +861,11 @@ cp .env.oauth21 .env | `create_drive_file` | **Core** | Create files or fetch from URLs | | `create_drive_folder` | **Core** | Create empty folders in Drive or shared drives | | `import_to_google_doc` | **Core** | Import files (MD, DOCX, HTML, etc.) as Google Docs | -| `share_drive_file` | **Core** | Share file with users/groups/domains/anyone | | `get_drive_shareable_link` | **Core** | Get shareable links for a file | | `list_drive_items` | Extended | List folder contents | | `copy_drive_file` | Extended | Copy existing files (templates) with optional renaming | | `update_drive_file` | Extended | Update file metadata, move between folders | -| `batch_share_drive_file` | Extended | Share file with multiple recipients | -| `update_drive_permission` | Extended | Modify permission role | -| `remove_drive_permission` | Extended | Revoke file access | -| `transfer_drive_ownership` | Extended | Transfer file ownership to another user | +| `manage_drive_access` | Extended | Grant, update, revoke permissions, and transfer ownership | | `set_drive_file_permissions` | Extended | Set link sharing and file-level sharing settings | | `get_drive_file_permissions` | Complete | Get detailed file permissions | | `check_drive_file_public_access` | Complete | Check public sharing status | @@ -894,7 +888,9 @@ cp .env.oauth21 .env | `get_gmail_thread_content` | Extended | Get full thread content | | `modify_gmail_message_labels` | Extended | Modify message labels | | `list_gmail_labels` | Extended | List available labels | +| `list_gmail_filters` | Extended | List Gmail filters | | `manage_gmail_label` | Extended | Create/update/delete labels | +| `manage_gmail_filter` | Extended | Create or delete Gmail filters | | `draft_gmail_message` | Extended | Create drafts | | `get_gmail_threads_content_batch` | Complete | Batch retrieve thread content | | `batch_modify_gmail_message_labels` | Complete | Batch modify labels | @@ -965,7 +961,8 @@ Saved files expire after 1 hour and are cleaned up automatically. | `export_doc_to_pdf` | Extended | Export document to PDF | | `create_table_with_data` | Complete | Create data tables | | `debug_table_structure` | Complete | Debug table issues | -| `*_document_comments` | Complete | Read, Reply, Create, Resolve | +| `list_document_comments` | Complete | List all document comments | +| `manage_document_comment` | Complete | Create, reply to, or resolve comments | @@ -984,7 +981,9 @@ Saved files expire after 1 hour and are cleaned up automatically. | `get_spreadsheet_info` | Extended | Get spreadsheet metadata | | `format_sheet_range` | Extended | Apply colors, number formats, text wrapping, alignment, bold/italic, font size | | `create_sheet` | Complete | Add sheets to existing files | -| `*_sheet_comment` | Complete | Read/create/reply/resolve comments | +| `list_spreadsheet_comments` | Complete | List all spreadsheet comments | +| `manage_spreadsheet_comment` | Complete | Create, reply to, or resolve comments | +| `manage_conditional_formatting` | Complete | Add, update, or delete conditional formatting rules | @@ -998,7 +997,8 @@ Saved files expire after 1 hour and are cleaned up automatically. | `batch_update_presentation` | Extended | Apply multiple updates | | `get_page` | Extended | Get specific slide information | | `get_page_thumbnail` | Extended | Generate slide thumbnails | -| `*_presentation_comment` | Complete | Read/create/reply/resolve comments | +| `list_presentation_comments` | Complete | List all presentation comments | +| `manage_presentation_comment` | Complete | Create, reply to, or resolve comments | @@ -1025,12 +1025,10 @@ Saved files expire after 1 hour and are cleaned up automatically. |------|------|-------------| | `list_tasks` | **Core** | List tasks with filtering | | `get_task` | **Core** | Retrieve task details | -| `create_task` | **Core** | Create tasks with hierarchy | -| `update_task` | **Core** | Modify task properties | -| `delete_task` | Extended | Remove tasks | -| `move_task` | Complete | Reposition tasks | -| `clear_completed_tasks` | Complete | Hide completed tasks | -| `*_task_list` | Complete | List/get/create/update/delete task lists | +| `manage_task` | **Core** | Create, update, delete, or move tasks | +| `list_task_lists` | Complete | List task lists | +| `get_task_list` | Complete | Get task list details | +| `manage_task_list` | Complete | Create, update, delete task lists, or clear completed tasks | @@ -1044,14 +1042,11 @@ Saved files expire after 1 hour and are cleaned up automatically. | `search_contacts` | **Core** | Search contacts by name, email, phone | | `get_contact` | **Core** | Retrieve detailed contact info | | `list_contacts` | **Core** | List contacts with pagination | -| `create_contact` | **Core** | Create new contacts | -| `update_contact` | Extended | Update existing contacts | -| `delete_contact` | Extended | Delete contacts | +| `manage_contact` | **Core** | Create, update, or delete contacts | | `list_contact_groups` | Extended | List contact groups/labels | | `get_contact_group` | Extended | Get group details with members | -| `batch_*_contacts` | Complete | Batch create/update/delete contacts | -| `*_contact_group` | Complete | Create/update/delete contact groups | -| `modify_contact_group_members` | Complete | Add/remove contacts from groups | +| `manage_contacts_batch` | Complete | Batch create, update, or delete contacts | +| `manage_contact_group` | Complete | Create, update, delete groups, or modify membership | @@ -1076,9 +1071,8 @@ Saved files expire after 1 hour and are cleaned up automatically. | Tool | Tier | Description | |------|------|-------------| -| `search_custom` | **Core** | Perform web searches | +| `search_custom` | **Core** | Perform web searches (supports site restrictions via sites parameter) | | `get_search_engine_info` | Complete | Retrieve search engine metadata | -| `search_custom_siterestrict` | Extended | Search within specific domains | @@ -1095,10 +1089,8 @@ Saved files expire after 1 hour and are cleaned up automatically. | `create_script_project` | **Core** | Create new standalone or bound project | | `update_script_content` | **Core** | Update or create script files | | `run_script_function` | **Core** | Execute function with parameters | -| `create_deployment` | Extended | Create new script deployment | | `list_deployments` | Extended | List all project deployments | -| `update_deployment` | Extended | Update deployment configuration | -| `delete_deployment` | Extended | Remove deployment | +| `manage_deployment` | Extended | Create, update, or delete script deployments | | `list_script_processes` | Extended | View recent executions and status | @@ -1113,6 +1105,69 @@ Saved files expire after 1 hour and are cleaned up automatically. --- +### 🔄 Tool Consolidation & Migration + +**Tool Count Reduction**: This release consolidates 139 tools down to 111 tools (20% reduction) by combining CRUD operations into action-based `manage_*` tools. + +
+Migration Mapping ← Old tool → New tool mapping + +| Old Tool | New Tool | Action Parameter | +|----------|----------|------------------| +| `create_event` | `manage_event` | `action="create"` | +| `modify_event` | `manage_event` | `action="update"` | +| `delete_event` | `manage_event` | `action="delete"` | +| `share_drive_file` | `manage_drive_access` | `action="grant"` | +| `batch_share_drive_file` | `manage_drive_access` | `action="grant_batch"` | +| `update_drive_permission` | `manage_drive_access` | `action="update"` | +| `remove_drive_permission` | `manage_drive_access` | `action="revoke"` | +| `transfer_drive_ownership` | `manage_drive_access` | `action="transfer_owner"` | +| `create_gmail_filter` | `manage_gmail_filter` | `action="create"` | +| `delete_gmail_filter` | `manage_gmail_filter` | `action="delete"` | +| `create_task` | `manage_task` | `action="create"` | +| `update_task` | `manage_task` | `action="update"` | +| `delete_task` | `manage_task` | `action="delete"` | +| `move_task` | `manage_task` | `action="move"` | +| `create_task_list` | `manage_task_list` | `action="create"` | +| `update_task_list` | `manage_task_list` | `action="update"` | +| `delete_task_list` | `manage_task_list` | `action="delete"` | +| `clear_completed_tasks` | `manage_task_list` | `action="clear_completed"` | +| `create_contact` | `manage_contact` | `action="create"` | +| `update_contact` | `manage_contact` | `action="update"` | +| `delete_contact` | `manage_contact` | `action="delete"` | +| `batch_create_contacts` | `manage_contacts_batch` | `action="create"` | +| `batch_update_contacts` | `manage_contacts_batch` | `action="update"` | +| `batch_delete_contacts` | `manage_contacts_batch` | `action="delete"` | +| `create_contact_group` | `manage_contact_group` | `action="create"` | +| `update_contact_group` | `manage_contact_group` | `action="update"` | +| `delete_contact_group` | `manage_contact_group` | `action="delete"` | +| `modify_contact_group_members` | `manage_contact_group` | `action="modify_members"` | +| `create_deployment` | `manage_deployment` | `action="create"` | +| `update_deployment` | `manage_deployment` | `action="update"` | +| `delete_deployment` | `manage_deployment` | `action="delete"` | +| `add_conditional_formatting` | `manage_conditional_formatting` | `action="add"` | +| `update_conditional_formatting` | `manage_conditional_formatting` | `action="update"` | +| `delete_conditional_formatting` | `manage_conditional_formatting` | `action="delete"` | +| `read_document_comments` | `list_document_comments` | N/A (renamed) | +| `create_document_comment` | `manage_document_comment` | `action="create"` | +| `reply_to_document_comment` | `manage_document_comment` | `action="reply"` | +| `resolve_document_comment` | `manage_document_comment` | `action="resolve"` | +| `read_spreadsheet_comments` | `list_spreadsheet_comments` | N/A (renamed) | +| `create_spreadsheet_comment` | `manage_spreadsheet_comment` | `action="create"` | +| `reply_to_spreadsheet_comment` | `manage_spreadsheet_comment` | `action="reply"` | +| `resolve_spreadsheet_comment` | `manage_spreadsheet_comment` | `action="resolve"` | +| `read_presentation_comments` | `list_presentation_comments` | N/A (renamed) | +| `create_presentation_comment` | `manage_presentation_comment` | `action="create"` | +| `reply_to_presentation_comment` | `manage_presentation_comment` | `action="reply"` | +| `resolve_presentation_comment` | `manage_presentation_comment` | `action="resolve"` | +| `search_custom_siterestrict` | `search_custom` | Use `sites` parameter | + +**Breaking Change**: Legacy tools have been removed. Use the new consolidated tools with appropriate action parameters. + +
+ +--- + ### Connect to Claude Desktop The server supports two transport modes: diff --git a/README_NEW.md b/README_NEW.md index 7debb00..9e01ba0 100644 --- a/README_NEW.md +++ b/README_NEW.md @@ -47,7 +47,7 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only ## 🛠 Tools Reference -### Gmail (11 tools) +### Gmail (10 tools) | Tool | Tier | Description | |------|------|-------------| @@ -60,39 +60,42 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only | `list_gmail_labels` | Extended | List all system and user labels | | `manage_gmail_label` | Extended | Create, update, delete labels | | `modify_gmail_message_labels` | Extended | Add/remove labels (archive, trash, etc.) | +| `manage_gmail_filter` | Extended | Create or delete Gmail filters | | `get_gmail_threads_content_batch` | Complete | Batch retrieve threads | | `batch_modify_gmail_message_labels` | Complete | Bulk label operations | -**Also includes:** `get_gmail_attachment_content`, `list_gmail_filters`, `create_gmail_filter`, `delete_gmail_filter` +**Also includes:** `get_gmail_attachment_content`, `list_gmail_filters` -### Google Drive (7 tools) +### Google Drive (10 tools) | Tool | Tier | Description | |------|------|-------------| | `search_drive_files` | Core | Search files with Drive query syntax or free text | | `get_drive_file_content` | Core | Read content from Docs, Sheets, Office files (.docx, .xlsx, .pptx) | +| `get_drive_file_download_url` | Core | Download Drive files to local disk | | `create_drive_file` | Core | Create files from content or URL (supports file://, http://, https://) | | `create_drive_folder` | Core | Create empty folders in Drive or shared drives | +| `import_to_google_doc` | Core | Import files (MD, DOCX, HTML, etc.) as Google Docs | +| `get_drive_shareable_link` | Core | Get shareable links for a file | | `list_drive_items` | Extended | List folder contents with shared drive support | +| `copy_drive_file` | Extended | Copy existing files (templates) with optional renaming | | `update_drive_file` | Extended | Update metadata, move between folders, star, trash | -| `get_drive_file_permissions` | Complete | Check sharing status and permissions | +| `manage_drive_access` | Extended | Grant, update, revoke permissions, and transfer ownership | +| `set_drive_file_permissions` | Extended | Set link sharing and file-level sharing settings | +| `get_drive_file_permissions` | Complete | Get detailed file permissions | | `check_drive_file_public_access` | Complete | Verify public link sharing for Docs image insertion | -**Also includes:** `get_drive_file_download_url` for generating download URLs - -### Google Calendar (5 tools) +### Google Calendar (3 tools) | Tool | Tier | Description | |------|------|-------------| | `list_calendars` | Core | List all accessible calendars | | `get_events` | Core | Query events by time range, search, or specific ID | -| `create_event` | Core | Create events with attendees, reminders, Google Meet, attachments | -| `modify_event` | Core | Update any event property including conferencing | -| `delete_event` | Extended | Remove events | +| `manage_event` | Core | Create, update, or delete calendar events | -**Event features:** Timezone support, transparency (busy/free), visibility settings, up to 5 custom reminders +**Event features:** Timezone support, transparency (busy/free), visibility settings, up to 5 custom reminders, Google Meet integration, attendees, attachments -### Google Docs (16 tools) +### Google Docs (14 tools) | Tool | Tier | Description | |------|------|-------------| @@ -103,6 +106,8 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only | `find_and_replace_doc` | Extended | Global find/replace with case matching | | `list_docs_in_folder` | Extended | List Docs in a specific folder | | `insert_doc_elements` | Extended | Add tables, lists, page breaks | +| `update_paragraph_style` | Extended | Apply heading styles, lists (bulleted/numbered with nesting), and paragraph formatting | +| `get_doc_as_markdown` | Extended | Export document as formatted Markdown with optional comments | | `export_doc_to_pdf` | Extended | Export to PDF and save to Drive | | `insert_doc_image` | Complete | Insert images from Drive or URLs | | `update_doc_headers_footers` | Complete | Modify headers/footers | @@ -110,10 +115,10 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only | `inspect_doc_structure` | Complete | Analyze document structure for safe insertion points | | `create_table_with_data` | Complete | Create and populate tables in one operation | | `debug_table_structure` | Complete | Debug table cell positions and content | +| `list_document_comments` | Complete | List all document comments | +| `manage_document_comment` | Complete | Create, reply to, or resolve comments | -**Comments:** `read_document_comments`, `create_document_comment`, `reply_to_document_comment`, `resolve_document_comment` - -### Google Sheets (13 tools) +### Google Sheets (9 tools) | Tool | Tier | Description | |------|------|-------------| @@ -124,13 +129,11 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only | `get_spreadsheet_info` | Extended | Get metadata, sheets, conditional formats | | `format_sheet_range` | Extended | Apply colors, number formats, text wrapping, alignment, bold/italic, font size | | `create_sheet` | Complete | Add sheets to existing spreadsheets | -| `add_conditional_formatting` | Complete | Add boolean or gradient rules | -| `update_conditional_formatting` | Complete | Modify existing rules | -| `delete_conditional_formatting` | Complete | Remove formatting rules | +| `list_spreadsheet_comments` | Complete | List all spreadsheet comments | +| `manage_spreadsheet_comment` | Complete | Create, reply to, or resolve comments | +| `manage_conditional_formatting` | Complete | Add, update, or delete conditional formatting rules | -**Comments:** `read_spreadsheet_comments`, `create_spreadsheet_comment`, `reply_to_spreadsheet_comment`, `resolve_spreadsheet_comment` - -### Google Slides (9 tools) +### Google Slides (7 tools) | Tool | Tier | Description | |------|------|-------------| @@ -139,8 +142,8 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only | `batch_update_presentation` | Extended | Apply multiple updates (create slides, shapes, etc.) | | `get_page` | Extended | Get specific slide details and elements | | `get_page_thumbnail` | Extended | Generate PNG thumbnails | - -**Comments:** `read_presentation_comments`, `create_presentation_comment`, `reply_to_presentation_comment`, `resolve_presentation_comment` +| `list_presentation_comments` | Complete | List all presentation comments | +| `manage_presentation_comment` | Complete | Create, reply to, or resolve comments | ### Google Forms (6 tools) @@ -153,24 +156,18 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only | `get_form_response` | Complete | Get individual response details | | `batch_update_form` | Complete | Execute batch updates to forms (questions, items, settings) | -### Google Tasks (12 tools) +### Google Tasks (5 tools) | Tool | Tier | Description | |------|------|-------------| | `list_tasks` | Core | List tasks with filtering, subtask hierarchy preserved | | `get_task` | Core | Get task details | -| `create_task` | Core | Create tasks with notes, due dates, parent/sibling positioning | -| `update_task` | Core | Update task properties | -| `delete_task` | Extended | Remove tasks | +| `manage_task` | Core | Create, update, delete, or move tasks | | `list_task_lists` | Complete | List all task lists | | `get_task_list` | Complete | Get task list details | -| `create_task_list` | Complete | Create new task lists | -| `update_task_list` | Complete | Rename task lists | -| `delete_task_list` | Complete | Delete task lists (and all tasks) | -| `move_task` | Complete | Reposition or move between lists | -| `clear_completed_tasks` | Complete | Hide completed tasks | +| `manage_task_list` | Complete | Create, update, delete task lists, or clear completed tasks | -### Google Apps Script (11 tools) +### Google Apps Script (9 tools) | Tool | Tier | Description | |------|------|-------------| @@ -180,16 +177,27 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only | `create_script_project` | Core | Create new standalone or bound project | | `update_script_content` | Core | Update or create script files | | `run_script_function` | Core | Execute function with parameters | -| `create_deployment` | Extended | Create new script deployment | | `list_deployments` | Extended | List all project deployments | -| `update_deployment` | Extended | Update deployment configuration | -| `delete_deployment` | Extended | Remove deployment | +| `manage_deployment` | Extended | Create, update, or delete script deployments | | `list_script_processes` | Extended | View recent executions and status | **Enables:** Cross-app automation, persistent workflows, custom business logic execution, script development and debugging **Note:** Trigger management is not currently supported via MCP tools. +### Google Contacts (7 tools) + +| Tool | Tier | Description | +|------|------|-------------| +| `search_contacts` | Core | Search contacts by name, email, phone | +| `get_contact` | Core | Retrieve detailed contact info | +| `list_contacts` | Core | List contacts with pagination | +| `manage_contact` | Core | Create, update, or delete contacts | +| `list_contact_groups` | Extended | List contact groups/labels | +| `get_contact_group` | Extended | Get group details with members | +| `manage_contacts_batch` | Complete | Batch create, update, or delete contacts | +| `manage_contact_group` | Complete | Create, update, delete groups, or modify membership | + ### Google Chat (4 tools) | Tool | Tier | Description | @@ -199,12 +207,11 @@ export OAUTHLIB_INSECURE_TRANSPORT=1 # Development only | `search_messages` | Core | Search across chat history | | `list_spaces` | Extended | List rooms and DMs | -### Google Custom Search (3 tools) +### Google Custom Search (2 tools) | Tool | Tier | Description | |------|------|-------------| -| `search_custom` | Core | Web search with filters (date, file type, language, safe search) | -| `search_custom_siterestrict` | Extended | Search within specific domains | +| `search_custom` | Core | Web search with filters (date, file type, language, safe search, site restrictions via sites parameter) | | `get_search_engine_info` | Complete | Get search engine metadata | **Requires:** `GOOGLE_PSE_API_KEY` and `GOOGLE_PSE_ENGINE_ID` environment variables @@ -219,7 +226,7 @@ Choose a tier based on your needs: |------|-------|----------| | **Core** | ~30 | Essential operations: search, read, create, send | | **Extended** | ~50 | Core + management: labels, folders, batch ops | -| **Complete** | ~80 | Full API: comments, headers, admin functions | +| **Complete** | 111 | Full API: comments, headers, admin functions | ```bash uvx workspace-mcp --tool-tier core # Start minimal diff --git a/core/comments.py b/core/comments.py index c07d878..5b3b0b8 100644 --- a/core/comments.py +++ b/core/comments.py @@ -7,7 +7,7 @@ All Google Workspace apps (Docs, Sheets, Slides) use the Drive API for comment o import logging import asyncio - +from typing import Optional from auth.service_decorator import require_google_service from core.server import server @@ -16,6 +16,28 @@ from core.utils import handle_http_errors logger = logging.getLogger(__name__) +async def _manage_comment_dispatch( + service, app_name: str, file_id: str, action: str, + comment_content: Optional[str] = None, comment_id: Optional[str] = None, +) -> str: + """Route comment management actions to the appropriate implementation.""" + action_lower = action.lower().strip() + if action_lower == "create": + if not comment_content: + raise ValueError("comment_content is required for create action") + return await _create_comment_impl(service, app_name, file_id, comment_content) + elif action_lower == "reply": + if not comment_id or not comment_content: + raise ValueError("comment_id and comment_content are required for reply action") + return await _reply_to_comment_impl(service, app_name, file_id, comment_id, comment_content) + elif action_lower == "resolve": + if not comment_id: + raise ValueError("comment_id is required for resolve action") + return await _resolve_comment_impl(service, app_name, file_id, comment_id) + else: + raise ValueError(f"Invalid action '{action_lower}'. Must be 'create', 'reply', or 'resolve'.") + + def create_comment_tools(app_name: str, file_id_param: str): """ Factory function to create comment management tools for a specific Google Workspace app. @@ -25,165 +47,102 @@ def create_comment_tools(app_name: str, file_id_param: str): file_id_param: Parameter name for the file ID (e.g., "document_id", "spreadsheet_id", "presentation_id") Returns: - Dict containing the four comment management functions with unique names + Dict containing the comment management functions with unique names """ - # Create unique function names based on the app type - read_func_name = f"read_{app_name}_comments" - create_func_name = f"create_{app_name}_comment" - reply_func_name = f"reply_to_{app_name}_comment" - resolve_func_name = f"resolve_{app_name}_comment" + # --- Consolidated tools --- + list_func_name = f"list_{app_name}_comments" + manage_func_name = f"manage_{app_name}_comment" - # Create functions without decorators first, then apply decorators with proper names if file_id_param == "document_id": @require_google_service("drive", "drive_read") - @handle_http_errors(read_func_name, service_type="drive") - async def read_comments( + @handle_http_errors(list_func_name, service_type="drive") + async def list_comments( service, user_google_email: str, document_id: str ) -> str: - """Read all comments from a Google Document.""" + """List all comments from a Google Document.""" return await _read_comments_impl(service, app_name, document_id) @require_google_service("drive", "drive_file") - @handle_http_errors(create_func_name, service_type="drive") - async def create_comment( - service, user_google_email: str, document_id: str, comment_content: str + @handle_http_errors(manage_func_name, service_type="drive") + async def manage_comment( + service, user_google_email: str, document_id: str, action: str, + comment_content: Optional[str] = None, comment_id: Optional[str] = None, ) -> str: - """Create a new comment on a Google Document.""" - return await _create_comment_impl( - service, app_name, document_id, comment_content - ) + """Manage comments on a Google Document. - @require_google_service("drive", "drive_file") - @handle_http_errors(reply_func_name, service_type="drive") - async def reply_to_comment( - service, - user_google_email: str, - document_id: str, - comment_id: str, - reply_content: str, - ) -> str: - """Reply to a specific comment in a Google Document.""" - return await _reply_to_comment_impl( - service, app_name, document_id, comment_id, reply_content - ) - - @require_google_service("drive", "drive_file") - @handle_http_errors(resolve_func_name, service_type="drive") - async def resolve_comment( - service, user_google_email: str, document_id: str, comment_id: str - ) -> str: - """Resolve a comment in a Google Document.""" - return await _resolve_comment_impl( - service, app_name, document_id, comment_id + Actions: + - create: Create a new comment. Requires comment_content. + - reply: Reply to a comment. Requires comment_id and comment_content. + - resolve: Resolve a comment. Requires comment_id. + """ + return await _manage_comment_dispatch( + service, app_name, document_id, action, comment_content, comment_id ) elif file_id_param == "spreadsheet_id": @require_google_service("drive", "drive_read") - @handle_http_errors(read_func_name, service_type="drive") - async def read_comments( + @handle_http_errors(list_func_name, service_type="drive") + async def list_comments( service, user_google_email: str, spreadsheet_id: str ) -> str: - """Read all comments from a Google Spreadsheet.""" + """List all comments from a Google Spreadsheet.""" return await _read_comments_impl(service, app_name, spreadsheet_id) @require_google_service("drive", "drive_file") - @handle_http_errors(create_func_name, service_type="drive") - async def create_comment( - service, user_google_email: str, spreadsheet_id: str, comment_content: str + @handle_http_errors(manage_func_name, service_type="drive") + async def manage_comment( + service, user_google_email: str, spreadsheet_id: str, action: str, + comment_content: Optional[str] = None, comment_id: Optional[str] = None, ) -> str: - """Create a new comment on a Google Spreadsheet.""" - return await _create_comment_impl( - service, app_name, spreadsheet_id, comment_content - ) + """Manage comments on a Google Spreadsheet. - @require_google_service("drive", "drive_file") - @handle_http_errors(reply_func_name, service_type="drive") - async def reply_to_comment( - service, - user_google_email: str, - spreadsheet_id: str, - comment_id: str, - reply_content: str, - ) -> str: - """Reply to a specific comment in a Google Spreadsheet.""" - return await _reply_to_comment_impl( - service, app_name, spreadsheet_id, comment_id, reply_content - ) - - @require_google_service("drive", "drive_file") - @handle_http_errors(resolve_func_name, service_type="drive") - async def resolve_comment( - service, user_google_email: str, spreadsheet_id: str, comment_id: str - ) -> str: - """Resolve a comment in a Google Spreadsheet.""" - return await _resolve_comment_impl( - service, app_name, spreadsheet_id, comment_id + Actions: + - create: Create a new comment. Requires comment_content. + - reply: Reply to a comment. Requires comment_id and comment_content. + - resolve: Resolve a comment. Requires comment_id. + """ + return await _manage_comment_dispatch( + service, app_name, spreadsheet_id, action, comment_content, comment_id ) elif file_id_param == "presentation_id": @require_google_service("drive", "drive_read") - @handle_http_errors(read_func_name, service_type="drive") - async def read_comments( + @handle_http_errors(list_func_name, service_type="drive") + async def list_comments( service, user_google_email: str, presentation_id: str ) -> str: - """Read all comments from a Google Presentation.""" + """List all comments from a Google Presentation.""" return await _read_comments_impl(service, app_name, presentation_id) @require_google_service("drive", "drive_file") - @handle_http_errors(create_func_name, service_type="drive") - async def create_comment( - service, user_google_email: str, presentation_id: str, comment_content: str + @handle_http_errors(manage_func_name, service_type="drive") + async def manage_comment( + service, user_google_email: str, presentation_id: str, action: str, + comment_content: Optional[str] = None, comment_id: Optional[str] = None, ) -> str: - """Create a new comment on a Google Presentation.""" - return await _create_comment_impl( - service, app_name, presentation_id, comment_content + """Manage comments on a Google Presentation. + + Actions: + - create: Create a new comment. Requires comment_content. + - reply: Reply to a comment. Requires comment_id and comment_content. + - resolve: Resolve a comment. Requires comment_id. + """ + return await _manage_comment_dispatch( + service, app_name, presentation_id, action, comment_content, comment_id ) - @require_google_service("drive", "drive_file") - @handle_http_errors(reply_func_name, service_type="drive") - async def reply_to_comment( - service, - user_google_email: str, - presentation_id: str, - comment_id: str, - reply_content: str, - ) -> str: - """Reply to a specific comment in a Google Presentation.""" - return await _reply_to_comment_impl( - service, app_name, presentation_id, comment_id, reply_content - ) - - @require_google_service("drive", "drive_file") - @handle_http_errors(resolve_func_name, service_type="drive") - async def resolve_comment( - service, user_google_email: str, presentation_id: str, comment_id: str - ) -> str: - """Resolve a comment in a Google Presentation.""" - return await _resolve_comment_impl( - service, app_name, presentation_id, comment_id - ) - - # Set the proper function names and register with server - read_comments.__name__ = read_func_name - create_comment.__name__ = create_func_name - reply_to_comment.__name__ = reply_func_name - resolve_comment.__name__ = resolve_func_name - - # Register tools with the server using the proper names - server.tool()(read_comments) - server.tool()(create_comment) - server.tool()(reply_to_comment) - server.tool()(resolve_comment) + list_comments.__name__ = list_func_name + manage_comment.__name__ = manage_func_name + server.tool()(list_comments) + server.tool()(manage_comment) return { - "read_comments": read_comments, - "create_comment": create_comment, - "reply_to_comment": reply_to_comment, - "resolve_comment": resolve_comment, + "list_comments": list_comments, + "manage_comment": manage_comment, } diff --git a/core/tool_tiers.yaml b/core/tool_tiers.yaml index ca929d7..666833b 100644 --- a/core/tool_tiers.yaml +++ b/core/tool_tiers.yaml @@ -13,8 +13,7 @@ gmail: - manage_gmail_label - draft_gmail_message - list_gmail_filters - - create_gmail_filter - - delete_gmail_filter + - manage_gmail_filter complete: - get_gmail_threads_content_batch @@ -29,16 +28,12 @@ drive: - create_drive_file - create_drive_folder - import_to_google_doc - - share_drive_file - get_drive_shareable_link extended: - list_drive_items - copy_drive_file - update_drive_file - - update_drive_permission - - remove_drive_permission - - transfer_drive_ownership - - batch_share_drive_file + - manage_drive_access - set_drive_file_permissions complete: - get_drive_file_permissions @@ -48,10 +43,8 @@ calendar: core: - list_calendars - get_events - - create_event - - modify_event + - manage_event extended: - - delete_event - query_freebusy complete: [] @@ -75,10 +68,8 @@ docs: - inspect_doc_structure - create_table_with_data - debug_table_structure - - read_document_comments - - create_document_comment - - reply_to_document_comment - - resolve_document_comment + - list_document_comments + - manage_document_comment sheets: core: @@ -91,10 +82,9 @@ sheets: - format_sheet_range complete: - create_sheet - - read_spreadsheet_comments - - create_spreadsheet_comment - - reply_to_spreadsheet_comment - - resolve_spreadsheet_comment + - list_spreadsheet_comments + - manage_spreadsheet_comment + - manage_conditional_formatting chat: core: @@ -127,53 +117,37 @@ slides: - get_page - get_page_thumbnail complete: - - read_presentation_comments - - create_presentation_comment - - reply_to_presentation_comment - - resolve_presentation_comment + - list_presentation_comments + - manage_presentation_comment tasks: core: - get_task - list_tasks - - create_task - - update_task - extended: - - delete_task + - manage_task + extended: [] complete: - list_task_lists - get_task_list - - create_task_list - - update_task_list - - delete_task_list - - move_task - - clear_completed_tasks + - manage_task_list contacts: core: - search_contacts - get_contact - list_contacts - - create_contact + - manage_contact extended: - - update_contact - - delete_contact - list_contact_groups - get_contact_group complete: - - batch_create_contacts - - batch_update_contacts - - batch_delete_contacts - - create_contact_group - - update_contact_group - - delete_contact_group - - modify_contact_group_members + - manage_contacts_batch + - manage_contact_group search: core: - search_custom - extended: - - search_custom_siterestrict + extended: [] complete: - get_search_engine_info @@ -187,10 +161,8 @@ appscript: - run_script_function - generate_trigger_code extended: - - create_deployment + - manage_deployment - list_deployments - - update_deployment - - delete_deployment - delete_script_project - list_versions - create_version diff --git a/gappsscript/apps_script_tools.py b/gappsscript/apps_script_tools.py index 20cba10..a99f549 100644 --- a/gappsscript/apps_script_tools.py +++ b/gappsscript/apps_script_tools.py @@ -464,31 +464,55 @@ async def _create_deployment_impl( @server.tool() -@handle_http_errors("create_deployment", service_type="script") +@handle_http_errors("manage_deployment", service_type="script") @require_google_service("script", "script_deployments") -async def create_deployment( +async def manage_deployment( service: Any, user_google_email: str, + action: str, script_id: str, - description: str, + deployment_id: Optional[str] = None, + description: Optional[str] = None, version_description: Optional[str] = None, ) -> str: """ - Creates a new deployment of the script. + Manages Apps Script deployments. Supports creating, updating, and deleting deployments. Args: service: Injected Google API service client user_google_email: User's email address + action: Action to perform - "create", "update", or "delete" script_id: The script project ID - description: Deployment description - version_description: Optional version description + deployment_id: The deployment ID (required for update and delete) + description: Deployment description (required for create, optional for update) + version_description: Optional version description (for create only) Returns: - str: Formatted string with deployment details + str: Formatted string with deployment details or confirmation """ - return await _create_deployment_impl( - service, user_google_email, script_id, description, version_description - ) + action = action.lower().strip() + if action == "create": + if not description: + raise ValueError("description is required for create action") + return await _create_deployment_impl( + service, user_google_email, script_id, description, version_description + ) + elif action == "update": + if not deployment_id: + raise ValueError("deployment_id is required for update action") + return await _update_deployment_impl( + service, user_google_email, script_id, deployment_id, description + ) + elif action == "delete": + if not deployment_id: + raise ValueError("deployment_id is required for delete action") + return await _delete_deployment_impl( + service, user_google_email, script_id, deployment_id + ) + else: + raise ValueError(f"Invalid action '{action}'. Must be 'create', 'update', or 'delete'.") + + async def _list_deployments_impl( @@ -578,32 +602,6 @@ async def _update_deployment_impl( return "\n".join(output) -@server.tool() -@handle_http_errors("update_deployment", service_type="script") -@require_google_service("script", "script_deployments") -async def update_deployment( - service: Any, - user_google_email: str, - script_id: str, - deployment_id: str, - description: Optional[str] = None, -) -> str: - """ - Updates an existing deployment configuration. - - Args: - service: Injected Google API service client - user_google_email: User's email address - script_id: The script project ID - deployment_id: The deployment ID to update - description: Optional new description - - Returns: - str: Formatted string confirming update - """ - return await _update_deployment_impl( - service, user_google_email, script_id, deployment_id, description - ) async def _delete_deployment_impl( @@ -630,30 +628,6 @@ async def _delete_deployment_impl( return output -@server.tool() -@handle_http_errors("delete_deployment", service_type="script") -@require_google_service("script", "script_deployments") -async def delete_deployment( - service: Any, - user_google_email: str, - script_id: str, - deployment_id: str, -) -> str: - """ - Deletes a deployment. - - Args: - service: Injected Google API service client - user_google_email: User's email address - script_id: The script project ID - deployment_id: The deployment ID to delete - - Returns: - str: Confirmation message - """ - return await _delete_deployment_impl( - service, user_google_email, script_id, deployment_id - ) async def _list_script_processes_impl( diff --git a/gcalendar/calendar_tools.py b/gcalendar/calendar_tools.py index 4bc51f7..5bff99c 100644 --- a/gcalendar/calendar_tools.py +++ b/gcalendar/calendar_tools.py @@ -534,10 +534,15 @@ async def get_events( return text_output -@server.tool() -@handle_http_errors("create_event", service_type="calendar") -@require_google_service("calendar", "calendar_events") -async def create_event( + +# --------------------------------------------------------------------------- +# Internal implementation functions for event create/modify/delete. +# These are called by both the consolidated ``manage_event`` tool and the +# legacy single-action tools. +# --------------------------------------------------------------------------- + + +async def _create_event_impl( service, user_google_email: str, summary: str, @@ -558,32 +563,7 @@ async def create_event( guests_can_invite_others: Optional[bool] = None, guests_can_see_other_guests: Optional[bool] = None, ) -> str: - """ - Creates a new event. - - Args: - user_google_email (str): The user's Google email address. Required. - summary (str): Event title. - start_time (str): Start time (RFC3339, e.g., "2023-10-27T10:00:00-07:00" or "2023-10-27" for all-day). - end_time (str): End time (RFC3339, e.g., "2023-10-27T11:00:00-07:00" or "2023-10-28" for all-day). - calendar_id (str): Calendar ID (default: 'primary'). - description (Optional[str]): Event description. - location (Optional[str]): Event location. - attendees (Optional[List[str]]): Attendee email addresses. - timezone (Optional[str]): Timezone (e.g., "America/New_York"). - attachments (Optional[List[str]]): List of Google Drive file URLs or IDs to attach to the event. - add_google_meet (bool): Whether to add a Google Meet video conference to the event. Defaults to False. - reminders (Optional[Union[str, List[Dict[str, Any]]]]): JSON string or list of reminder objects. Each should have 'method' ("popup" or "email") and 'minutes' (0-40320). Max 5 reminders. Example: '[{"method": "popup", "minutes": 15}]' or [{"method": "popup", "minutes": 15}] - use_default_reminders (bool): Whether to use calendar's default reminders. If False, uses custom reminders. Defaults to True. - transparency (Optional[str]): Event transparency for busy/free status. "opaque" shows as Busy (default), "transparent" shows as Available/Free. Defaults to None (uses Google Calendar default). - visibility (Optional[str]): Event visibility. "default" uses calendar default, "public" is visible to all, "private" is visible only to attendees, "confidential" is same as private (legacy). Defaults to None (uses Google Calendar default). - guests_can_modify (Optional[bool]): Whether attendees other than the organizer can modify the event. Defaults to None (uses Google Calendar default of False). - guests_can_invite_others (Optional[bool]): Whether attendees other than the organizer can invite others to the event. Defaults to None (uses Google Calendar default of True). - guests_can_see_other_guests (Optional[bool]): Whether attendees other than the organizer can see who the event's attendees are. Defaults to None (uses Google Calendar default of True). - - Returns: - str: Confirmation message of the successful event creation with event link. - """ + """Internal implementation for creating a calendar event.""" logger.info( f"[create_event] Invoked. Email: '{user_google_email}', Summary: {summary}" ) @@ -809,10 +789,8 @@ def _normalize_attendees( return normalized if normalized else None -@server.tool() -@handle_http_errors("modify_event", service_type="calendar") -@require_google_service("calendar", "calendar_events") -async def modify_event( + +async def _modify_event_impl( service, user_google_email: str, event_id: str, @@ -834,33 +812,7 @@ async def modify_event( guests_can_invite_others: Optional[bool] = None, guests_can_see_other_guests: Optional[bool] = None, ) -> str: - """ - Modifies an existing event. - - Args: - user_google_email (str): The user's Google email address. Required. - event_id (str): The ID of the event to modify. - calendar_id (str): Calendar ID (default: 'primary'). - summary (Optional[str]): New event title. - start_time (Optional[str]): New start time (RFC3339, e.g., "2023-10-27T10:00:00-07:00" or "2023-10-27" for all-day). - end_time (Optional[str]): New end time (RFC3339, e.g., "2023-10-27T11:00:00-07:00" or "2023-10-28" for all-day). - description (Optional[str]): New event description. - location (Optional[str]): New event location. - attendees (Optional[Union[List[str], List[Dict[str, Any]]]]): Attendees as email strings or objects with metadata. Supports: ["email@example.com"] or [{"email": "email@example.com", "responseStatus": "accepted", "organizer": true, "optional": true}]. When using objects, existing metadata (responseStatus, organizer, optional) is preserved. New attendees default to responseStatus="needsAction". - timezone (Optional[str]): New timezone (e.g., "America/New_York"). - add_google_meet (Optional[bool]): Whether to add or remove Google Meet video conference. If True, adds Google Meet; if False, removes it; if None, leaves unchanged. - reminders (Optional[Union[str, List[Dict[str, Any]]]]): JSON string or list of reminder objects to replace existing reminders. Each should have 'method' ("popup" or "email") and 'minutes' (0-40320). Max 5 reminders. Example: '[{"method": "popup", "minutes": 15}]' or [{"method": "popup", "minutes": 15}] - use_default_reminders (Optional[bool]): Whether to use calendar's default reminders. If specified, overrides current reminder settings. - transparency (Optional[str]): Event transparency for busy/free status. "opaque" shows as Busy, "transparent" shows as Available/Free. If None, preserves existing transparency setting. - visibility (Optional[str]): Event visibility. "default" uses calendar default, "public" is visible to all, "private" is visible only to attendees, "confidential" is same as private (legacy). If None, preserves existing visibility setting. - color_id (Optional[str]): Event color ID (1-11). If None, preserves existing color. - guests_can_modify (Optional[bool]): Whether attendees other than the organizer can modify the event. If None, preserves existing setting. - guests_can_invite_others (Optional[bool]): Whether attendees other than the organizer can invite others to the event. If None, preserves existing setting. - guests_can_see_other_guests (Optional[bool]): Whether attendees other than the organizer can see who the event's attendees are. If None, preserves existing setting. - - Returns: - str: Confirmation message of the successful event modification with event link. - """ + """Internal implementation for modifying a calendar event.""" logger.info( f"[modify_event] Invoked. Email: '{user_google_email}', Event ID: {event_id}" ) @@ -1075,23 +1027,13 @@ async def modify_event( return confirmation_message -@server.tool() -@handle_http_errors("delete_event", service_type="calendar") -@require_google_service("calendar", "calendar_events") -async def delete_event( - service, user_google_email: str, event_id: str, calendar_id: str = "primary" +async def _delete_event_impl( + service, + user_google_email: str, + event_id: str, + calendar_id: str = "primary", ) -> str: - """ - Deletes an existing event. - - Args: - user_google_email (str): The user's Google email address. Required. - event_id (str): The ID of the event to delete. - calendar_id (str): Calendar ID (default: 'primary'). - - Returns: - str: Confirmation message of the successful event deletion. - """ + """Internal implementation for deleting a calendar event.""" logger.info( f"[delete_event] Invoked. Email: '{user_google_email}', Event ID: {event_id}" ) @@ -1133,6 +1075,141 @@ async def delete_event( return confirmation_message +# --------------------------------------------------------------------------- +# Consolidated event management tool +# --------------------------------------------------------------------------- + + +@server.tool() +@handle_http_errors("manage_event", service_type="calendar") +@require_google_service("calendar", "calendar_events") +async def manage_event( + service, + user_google_email: str, + action: str, + summary: Optional[str] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + event_id: Optional[str] = None, + calendar_id: str = "primary", + description: Optional[str] = None, + location: Optional[str] = None, + attendees: Optional[Union[List[str], List[Dict[str, Any]]]] = None, + timezone: Optional[str] = None, + attachments: Optional[List[str]] = None, + add_google_meet: Optional[bool] = None, + reminders: Optional[Union[str, List[Dict[str, Any]]]] = None, + use_default_reminders: Optional[bool] = None, + transparency: Optional[str] = None, + visibility: Optional[str] = None, + color_id: Optional[str] = None, + guests_can_modify: Optional[bool] = None, + guests_can_invite_others: Optional[bool] = None, + guests_can_see_other_guests: Optional[bool] = None, +) -> str: + """ + Manages calendar events. Supports creating, updating, and deleting events. + + Args: + user_google_email (str): The user's Google email address. Required. + action (str): Action to perform - "create", "update", or "delete". + summary (Optional[str]): Event title (required for create). + start_time (Optional[str]): Start time in RFC3339 format (required for create). + end_time (Optional[str]): End time in RFC3339 format (required for create). + event_id (Optional[str]): Event ID (required for update and delete). + calendar_id (str): Calendar ID (default: 'primary'). + description (Optional[str]): Event description. + location (Optional[str]): Event location. + attendees (Optional[Union[List[str], List[Dict[str, Any]]]]): Attendee email addresses or objects. + timezone (Optional[str]): Timezone (e.g., "America/New_York"). + attachments (Optional[List[str]]): List of Google Drive file URLs or IDs to attach. + add_google_meet (Optional[bool]): Whether to add/remove Google Meet. + reminders (Optional[Union[str, List[Dict[str, Any]]]]): Custom reminder objects. + use_default_reminders (Optional[bool]): Whether to use default reminders. + transparency (Optional[str]): "opaque" (busy) or "transparent" (free). + visibility (Optional[str]): "default", "public", "private", or "confidential". + color_id (Optional[str]): Event color ID (1-11, update only). + guests_can_modify (Optional[bool]): Whether attendees can modify. + guests_can_invite_others (Optional[bool]): Whether attendees can invite others. + guests_can_see_other_guests (Optional[bool]): Whether attendees can see other guests. + + Returns: + str: Confirmation message with event details. + """ + action_lower = action.lower().strip() + if action_lower == "create": + if not summary or not start_time or not end_time: + raise ValueError("summary, start_time, and end_time are required for create action") + return await _create_event_impl( + service=service, + user_google_email=user_google_email, + summary=summary, + start_time=start_time, + end_time=end_time, + calendar_id=calendar_id, + description=description, + location=location, + attendees=attendees, + timezone=timezone, + attachments=attachments, + add_google_meet=add_google_meet or False, + reminders=reminders, + use_default_reminders=use_default_reminders if use_default_reminders is not None else True, + transparency=transparency, + visibility=visibility, + guests_can_modify=guests_can_modify, + guests_can_invite_others=guests_can_invite_others, + guests_can_see_other_guests=guests_can_see_other_guests, + ) + elif action_lower == "update": + if not event_id: + raise ValueError("event_id is required for update action") + return await _modify_event_impl( + service=service, + user_google_email=user_google_email, + event_id=event_id, + calendar_id=calendar_id, + summary=summary, + start_time=start_time, + end_time=end_time, + description=description, + location=location, + attendees=attendees, + timezone=timezone, + add_google_meet=add_google_meet, + reminders=reminders, + use_default_reminders=use_default_reminders, + transparency=transparency, + visibility=visibility, + color_id=color_id, + guests_can_modify=guests_can_modify, + guests_can_invite_others=guests_can_invite_others, + guests_can_see_other_guests=guests_can_see_other_guests, + ) + elif action_lower == "delete": + if not event_id: + raise ValueError("event_id is required for delete action") + return await _delete_event_impl( + service=service, + user_google_email=user_google_email, + event_id=event_id, + calendar_id=calendar_id, + ) + else: + raise ValueError(f"Invalid action '{action_lower}'. Must be 'create', 'update', or 'delete'.") + + +# --------------------------------------------------------------------------- +# Legacy single-action tools (deprecated -- prefer ``manage_event``) +# --------------------------------------------------------------------------- + + + + + + + + @server.tool() @handle_http_errors("query_freebusy", is_read_only=True, service_type="calendar") @require_google_service("calendar", "calendar_read") diff --git a/gcontacts/contacts_tools.py b/gcontacts/contacts_tools.py index 38e08f8..012a4a0 100644 --- a/gcontacts/contacts_tools.py +++ b/gcontacts/contacts_tools.py @@ -414,10 +414,12 @@ async def search_contacts( @server.tool() @require_google_service("people", "contacts") -@handle_http_errors("create_contact", service_type="people") -async def create_contact( +@handle_http_errors("manage_contact", service_type="people") +async def manage_contact( service: Resource, user_google_email: str, + action: str, + contact_id: Optional[str] = None, given_name: Optional[str] = None, family_name: Optional[str] = None, email: Optional[str] = None, @@ -427,222 +429,140 @@ async def create_contact( notes: Optional[str] = None, ) -> str: """ - Create a new contact. + Create, update, or delete a contact. Consolidated tool replacing create_contact, + update_contact, and delete_contact. Args: user_google_email (str): The user's Google email address. Required. - given_name (Optional[str]): First name. - family_name (Optional[str]): Last name. - email (Optional[str]): Email address. - phone (Optional[str]): Phone number. - organization (Optional[str]): Company/organization name. - job_title (Optional[str]): Job title. - notes (Optional[str]): Additional notes. + action (str): The action to perform: "create", "update", or "delete". + contact_id (Optional[str]): The contact ID. Required for "update" and "delete" actions. + given_name (Optional[str]): First name (for create/update). + family_name (Optional[str]): Last name (for create/update). + email (Optional[str]): Email address (for create/update). + phone (Optional[str]): Phone number (for create/update). + organization (Optional[str]): Company/organization name (for create/update). + job_title (Optional[str]): Job title (for create/update). + notes (Optional[str]): Additional notes (for create/update). Returns: - str: Confirmation with the new contact's details. + str: Result of the action performed. """ + action = action.lower().strip() + if action not in ("create", "update", "delete"): + raise Exception( + f"Invalid action '{action}'. Must be 'create', 'update', or 'delete'." + ) + logger.info( - f"[create_contact] Invoked. Email: '{user_google_email}', Name: '{given_name} {family_name}'" + f"[manage_contact] Invoked. Action: '{action}', Email: '{user_google_email}'" ) try: - body = _build_person_body( - given_name=given_name, - family_name=family_name, - email=email, - phone=phone, - organization=organization, - job_title=job_title, - notes=notes, - ) - - if not body: - raise Exception( - "At least one field (name, email, phone, etc.) must be provided." + if action == "create": + body = _build_person_body( + given_name=given_name, + family_name=family_name, + email=email, + phone=phone, + organization=organization, + job_title=job_title, + notes=notes, ) - result = await asyncio.to_thread( - service.people() - .createContact(body=body, personFields=DETAILED_PERSON_FIELDS) - .execute - ) + if not body: + raise Exception( + "At least one field (name, email, phone, etc.) must be provided." + ) - response = f"Contact Created for {user_google_email}:\n\n" - response += _format_contact(result, detailed=True) - - contact_id = result.get("resourceName", "").replace("people/", "") - logger.info(f"Created contact {contact_id} for {user_google_email}") - return response - - except HttpError as error: - message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'." - logger.error(message, exc_info=True) - raise Exception(message) - except Exception as e: - message = f"Unexpected error: {e}." - logger.exception(message) - raise Exception(message) - - -# ============================================================================= -# Extended Tier Tools -# ============================================================================= - - -@server.tool() -@require_google_service("people", "contacts") -@handle_http_errors("update_contact", service_type="people") -async def update_contact( - service: Resource, - user_google_email: str, - contact_id: str, - given_name: Optional[str] = None, - family_name: Optional[str] = None, - email: Optional[str] = None, - phone: Optional[str] = None, - organization: Optional[str] = None, - job_title: Optional[str] = None, - notes: Optional[str] = None, -) -> str: - """ - Update an existing contact. Note: This replaces fields, not merges them. - - Args: - user_google_email (str): The user's Google email address. Required. - contact_id (str): The contact ID to update. - given_name (Optional[str]): New first name. - family_name (Optional[str]): New last name. - email (Optional[str]): New email address. - phone (Optional[str]): New phone number. - organization (Optional[str]): New company/organization name. - job_title (Optional[str]): New job title. - notes (Optional[str]): New notes. - - Returns: - str: Confirmation with updated contact details. - """ - # Normalize resource name - if not contact_id.startswith("people/"): - resource_name = f"people/{contact_id}" - else: - resource_name = contact_id - - logger.info( - f"[update_contact] Invoked. Email: '{user_google_email}', Contact: {resource_name}" - ) - - try: - # First fetch the contact to get the etag - current = await asyncio.to_thread( - service.people() - .get(resourceName=resource_name, personFields=DETAILED_PERSON_FIELDS) - .execute - ) - - etag = current.get("etag") - if not etag: - raise Exception("Unable to get contact etag for update.") - - # Build update body - body = _build_person_body( - given_name=given_name, - family_name=family_name, - email=email, - phone=phone, - organization=organization, - job_title=job_title, - notes=notes, - ) - - if not body: - raise Exception( - "At least one field (name, email, phone, etc.) must be provided." + result = await asyncio.to_thread( + service.people() + .createContact(body=body, personFields=DETAILED_PERSON_FIELDS) + .execute ) - body["etag"] = etag + response = f"Contact Created for {user_google_email}:\n\n" + response += _format_contact(result, detailed=True) - # Determine which fields to update - update_person_fields = [] - if "names" in body: - update_person_fields.append("names") - if "emailAddresses" in body: - update_person_fields.append("emailAddresses") - if "phoneNumbers" in body: - update_person_fields.append("phoneNumbers") - if "organizations" in body: - update_person_fields.append("organizations") - if "biographies" in body: - update_person_fields.append("biographies") - if "addresses" in body: - update_person_fields.append("addresses") + created_id = result.get("resourceName", "").replace("people/", "") + logger.info(f"Created contact {created_id} for {user_google_email}") + return response - result = await asyncio.to_thread( - service.people() - .updateContact( - resourceName=resource_name, - body=body, - updatePersonFields=",".join(update_person_fields), - personFields=DETAILED_PERSON_FIELDS, + # update and delete both require contact_id + if not contact_id: + raise Exception(f"contact_id is required for '{action}' action.") + + # Normalize resource name + if not contact_id.startswith("people/"): + resource_name = f"people/{contact_id}" + else: + resource_name = contact_id + + if action == "update": + # Fetch the contact to get the etag + current = await asyncio.to_thread( + service.people() + .get(resourceName=resource_name, personFields=DETAILED_PERSON_FIELDS) + .execute ) - .execute - ) - response = f"Contact Updated for {user_google_email}:\n\n" - response += _format_contact(result, detailed=True) + etag = current.get("etag") + if not etag: + raise Exception("Unable to get contact etag for update.") - logger.info(f"Updated contact {resource_name} for {user_google_email}") - return response + body = _build_person_body( + given_name=given_name, + family_name=family_name, + email=email, + phone=phone, + organization=organization, + job_title=job_title, + notes=notes, + ) - except HttpError as error: - if error.resp.status == 404: - message = f"Contact not found: {contact_id}" - logger.warning(message) - raise Exception(message) - message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'." - logger.error(message, exc_info=True) - raise Exception(message) - except Exception as e: - message = f"Unexpected error: {e}." - logger.exception(message) - raise Exception(message) + if not body: + raise Exception( + "At least one field (name, email, phone, etc.) must be provided." + ) + body["etag"] = etag -@server.tool() -@require_google_service("people", "contacts") -@handle_http_errors("delete_contact", service_type="people") -async def delete_contact( - service: Resource, - user_google_email: str, - contact_id: str, -) -> str: - """ - Delete a contact. + update_person_fields = [] + if "names" in body: + update_person_fields.append("names") + if "emailAddresses" in body: + update_person_fields.append("emailAddresses") + if "phoneNumbers" in body: + update_person_fields.append("phoneNumbers") + if "organizations" in body: + update_person_fields.append("organizations") + if "biographies" in body: + update_person_fields.append("biographies") + if "addresses" in body: + update_person_fields.append("addresses") - Args: - user_google_email (str): The user's Google email address. Required. - contact_id (str): The contact ID to delete. + result = await asyncio.to_thread( + service.people() + .updateContact( + resourceName=resource_name, + body=body, + updatePersonFields=",".join(update_person_fields), + personFields=DETAILED_PERSON_FIELDS, + ) + .execute + ) - Returns: - str: Confirmation message. - """ - # Normalize resource name - if not contact_id.startswith("people/"): - resource_name = f"people/{contact_id}" - else: - resource_name = contact_id + response = f"Contact Updated for {user_google_email}:\n\n" + response += _format_contact(result, detailed=True) - logger.info( - f"[delete_contact] Invoked. Email: '{user_google_email}', Contact: {resource_name}" - ) + logger.info(f"Updated contact {resource_name} for {user_google_email}") + return response - try: + # action == "delete" await asyncio.to_thread( service.people().deleteContact(resourceName=resource_name).execute ) response = f"Contact {contact_id} has been deleted for {user_google_email}." - logger.info(f"Deleted contact {resource_name} for {user_google_email}") return response @@ -660,6 +580,11 @@ async def delete_contact( raise Exception(message) +# ============================================================================= +# Extended Tier Tools +# ============================================================================= + + @server.tool() @require_google_service("people", "contacts_read") @handle_http_errors("list_contact_groups", service_type="people") @@ -811,264 +736,205 @@ async def get_contact_group( @server.tool() @require_google_service("people", "contacts") -@handle_http_errors("batch_create_contacts", service_type="people") -async def batch_create_contacts( +@handle_http_errors("manage_contacts_batch", service_type="people") +async def manage_contacts_batch( service: Resource, user_google_email: str, - contacts: List[Dict[str, str]], + action: str, + contacts: Optional[List[Dict[str, str]]] = None, + updates: Optional[List[Dict[str, str]]] = None, + contact_ids: Optional[List[str]] = None, ) -> str: """ - Create multiple contacts in a batch operation. + Batch create, update, or delete contacts. Consolidated tool replacing + batch_create_contacts, batch_update_contacts, and batch_delete_contacts. Args: user_google_email (str): The user's Google email address. Required. - contacts (List[Dict[str, str]]): List of contact dictionaries with fields: - - given_name: First name - - family_name: Last name - - email: Email address - - phone: Phone number - - organization: Company name - - job_title: Job title + action (str): The action to perform: "create", "update", or "delete". + contacts (Optional[List[Dict[str, str]]]): List of contact dicts for "create" action. + Each dict may contain: given_name, family_name, email, phone, organization, job_title. + updates (Optional[List[Dict[str, str]]]): List of update dicts for "update" action. + Each dict must contain contact_id and may contain: given_name, family_name, + email, phone, organization, job_title. + contact_ids (Optional[List[str]]): List of contact IDs for "delete" action. Returns: - str: Confirmation with created contacts. + str: Result of the batch action performed. """ + action = action.lower().strip() + if action not in ("create", "update", "delete"): + raise Exception( + f"Invalid action '{action}'. Must be 'create', 'update', or 'delete'." + ) + logger.info( - f"[batch_create_contacts] Invoked. Email: '{user_google_email}', Count: {len(contacts)}" + f"[manage_contacts_batch] Invoked. Action: '{action}', Email: '{user_google_email}'" ) try: - if not contacts: - raise Exception("At least one contact must be provided.") + if action == "create": + if not contacts: + raise Exception( + "contacts parameter is required for 'create' action." + ) - if len(contacts) > 200: - raise Exception("Maximum 200 contacts can be created in a batch.") + if len(contacts) > 200: + raise Exception("Maximum 200 contacts can be created in a batch.") - # Build batch request body - contact_bodies = [] - for contact in contacts: - body = _build_person_body( - given_name=contact.get("given_name"), - family_name=contact.get("family_name"), - email=contact.get("email"), - phone=contact.get("phone"), - organization=contact.get("organization"), - job_title=contact.get("job_title"), - ) - if body: - contact_bodies.append({"contactPerson": body}) + contact_bodies = [] + for contact in contacts: + body = _build_person_body( + given_name=contact.get("given_name"), + family_name=contact.get("family_name"), + email=contact.get("email"), + phone=contact.get("phone"), + organization=contact.get("organization"), + job_title=contact.get("job_title"), + ) + if body: + contact_bodies.append({"contactPerson": body}) - if not contact_bodies: - raise Exception("No valid contact data provided.") + if not contact_bodies: + raise Exception("No valid contact data provided.") - batch_body = { - "contacts": contact_bodies, - "readMask": DEFAULT_PERSON_FIELDS, - } + batch_body = { + "contacts": contact_bodies, + "readMask": DEFAULT_PERSON_FIELDS, + } - result = await asyncio.to_thread( - service.people().batchCreateContacts(body=batch_body).execute - ) - - created_people = result.get("createdPeople", []) - - response = f"Batch Create Results for {user_google_email}:\n\n" - response += f"Created {len(created_people)} contacts:\n\n" - - for item in created_people: - person = item.get("person", {}) - response += _format_contact(person) + "\n\n" - - logger.info( - f"Batch created {len(created_people)} contacts for {user_google_email}" - ) - return response - - except HttpError as error: - message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'." - logger.error(message, exc_info=True) - raise Exception(message) - except Exception as e: - message = f"Unexpected error: {e}." - logger.exception(message) - raise Exception(message) - - -@server.tool() -@require_google_service("people", "contacts") -@handle_http_errors("batch_update_contacts", service_type="people") -async def batch_update_contacts( - service: Resource, - user_google_email: str, - updates: List[Dict[str, str]], -) -> str: - """ - Update multiple contacts in a batch operation. - - Args: - user_google_email (str): The user's Google email address. Required. - updates (List[Dict[str, str]]): List of update dictionaries with fields: - - contact_id: The contact ID to update (required) - - given_name: New first name - - family_name: New last name - - email: New email address - - phone: New phone number - - organization: New company name - - job_title: New job title - - Returns: - str: Confirmation with updated contacts. - """ - logger.info( - f"[batch_update_contacts] Invoked. Email: '{user_google_email}', Count: {len(updates)}" - ) - - try: - if not updates: - raise Exception("At least one update must be provided.") - - if len(updates) > 200: - raise Exception("Maximum 200 contacts can be updated in a batch.") - - # First, fetch all contacts to get their etags - resource_names = [] - for update in updates: - contact_id = update.get("contact_id") - if not contact_id: - raise Exception("Each update must include a contact_id.") - if not contact_id.startswith("people/"): - contact_id = f"people/{contact_id}" - resource_names.append(contact_id) - - # Batch get contacts for etags - batch_get_result = await asyncio.to_thread( - service.people() - .getBatchGet( - resourceNames=resource_names, - personFields="metadata", - ) - .execute - ) - - etags = {} - for response in batch_get_result.get("responses", []): - person = response.get("person", {}) - resource_name = person.get("resourceName") - etag = person.get("etag") - if resource_name and etag: - etags[resource_name] = etag - - # Build batch update body - update_bodies = [] - update_fields_set: set = set() - - for update in updates: - contact_id = update.get("contact_id", "") - if not contact_id.startswith("people/"): - contact_id = f"people/{contact_id}" - - etag = etags.get(contact_id) - if not etag: - logger.warning(f"No etag found for {contact_id}, skipping") - continue - - body = _build_person_body( - given_name=update.get("given_name"), - family_name=update.get("family_name"), - email=update.get("email"), - phone=update.get("phone"), - organization=update.get("organization"), - job_title=update.get("job_title"), + result = await asyncio.to_thread( + service.people().batchCreateContacts(body=batch_body).execute ) - if body: - body["resourceName"] = contact_id - body["etag"] = etag - update_bodies.append({"person": body}) + created_people = result.get("createdPeople", []) - # Track which fields are being updated - if "names" in body: - update_fields_set.add("names") - if "emailAddresses" in body: - update_fields_set.add("emailAddresses") - if "phoneNumbers" in body: - update_fields_set.add("phoneNumbers") - if "organizations" in body: - update_fields_set.add("organizations") + response = f"Batch Create Results for {user_google_email}:\n\n" + response += f"Created {len(created_people)} contacts:\n\n" - if not update_bodies: - raise Exception("No valid update data provided.") + for item in created_people: + person = item.get("person", {}) + response += _format_contact(person) + "\n\n" - batch_body = { - "contacts": update_bodies, - "updateMask": ",".join(update_fields_set), - "readMask": DEFAULT_PERSON_FIELDS, - } + logger.info( + f"Batch created {len(created_people)} contacts for {user_google_email}" + ) + return response - result = await asyncio.to_thread( - service.people().batchUpdateContacts(body=batch_body).execute - ) + if action == "update": + if not updates: + raise Exception( + "updates parameter is required for 'update' action." + ) - update_results = result.get("updateResult", {}) + if len(updates) > 200: + raise Exception("Maximum 200 contacts can be updated in a batch.") - response = f"Batch Update Results for {user_google_email}:\n\n" - response += f"Updated {len(update_results)} contacts:\n\n" + # Fetch all contacts to get their etags + resource_names = [] + for update in updates: + cid = update.get("contact_id") + if not cid: + raise Exception("Each update must include a contact_id.") + if not cid.startswith("people/"): + cid = f"people/{cid}" + resource_names.append(cid) - for resource_name, update_result in update_results.items(): - person = update_result.get("person", {}) - response += _format_contact(person) + "\n\n" + batch_get_result = await asyncio.to_thread( + service.people() + .getBatchGet( + resourceNames=resource_names, + personFields="metadata", + ) + .execute + ) - logger.info( - f"Batch updated {len(update_results)} contacts for {user_google_email}" - ) - return response + etags = {} + for resp in batch_get_result.get("responses", []): + person = resp.get("person", {}) + rname = person.get("resourceName") + etag = person.get("etag") + if rname and etag: + etags[rname] = etag - except HttpError as error: - message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'." - logger.error(message, exc_info=True) - raise Exception(message) - except Exception as e: - message = f"Unexpected error: {e}." - logger.exception(message) - raise Exception(message) + update_bodies = [] + update_fields_set: set = set() + for update in updates: + cid = update.get("contact_id", "") + if not cid.startswith("people/"): + cid = f"people/{cid}" -@server.tool() -@require_google_service("people", "contacts") -@handle_http_errors("batch_delete_contacts", service_type="people") -async def batch_delete_contacts( - service: Resource, - user_google_email: str, - contact_ids: List[str], -) -> str: - """ - Delete multiple contacts in a batch operation. + etag = etags.get(cid) + if not etag: + logger.warning(f"No etag found for {cid}, skipping") + continue - Args: - user_google_email (str): The user's Google email address. Required. - contact_ids (List[str]): List of contact IDs to delete. + body = _build_person_body( + given_name=update.get("given_name"), + family_name=update.get("family_name"), + email=update.get("email"), + phone=update.get("phone"), + organization=update.get("organization"), + job_title=update.get("job_title"), + ) - Returns: - str: Confirmation message. - """ - logger.info( - f"[batch_delete_contacts] Invoked. Email: '{user_google_email}', Count: {len(contact_ids)}" - ) + if body: + body["resourceName"] = cid + body["etag"] = etag + update_bodies.append({"person": body}) - try: + if "names" in body: + update_fields_set.add("names") + if "emailAddresses" in body: + update_fields_set.add("emailAddresses") + if "phoneNumbers" in body: + update_fields_set.add("phoneNumbers") + if "organizations" in body: + update_fields_set.add("organizations") + + if not update_bodies: + raise Exception("No valid update data provided.") + + batch_body = { + "contacts": update_bodies, + "updateMask": ",".join(update_fields_set), + "readMask": DEFAULT_PERSON_FIELDS, + } + + result = await asyncio.to_thread( + service.people().batchUpdateContacts(body=batch_body).execute + ) + + update_results = result.get("updateResult", {}) + + response = f"Batch Update Results for {user_google_email}:\n\n" + response += f"Updated {len(update_results)} contacts:\n\n" + + for rname, update_result in update_results.items(): + person = update_result.get("person", {}) + response += _format_contact(person) + "\n\n" + + logger.info( + f"Batch updated {len(update_results)} contacts for {user_google_email}" + ) + return response + + # action == "delete" if not contact_ids: - raise Exception("At least one contact ID must be provided.") + raise Exception( + "contact_ids parameter is required for 'delete' action." + ) if len(contact_ids) > 500: raise Exception("Maximum 500 contacts can be deleted in a batch.") - # Normalize resource names resource_names = [] - for contact_id in contact_ids: - if not contact_id.startswith("people/"): - resource_names.append(f"people/{contact_id}") + for cid in contact_ids: + if not cid.startswith("people/"): + resource_names.append(f"people/{cid}") else: - resource_names.append(contact_id) + resource_names.append(cid) batch_body = {"resourceNames": resource_names} @@ -1077,7 +943,6 @@ async def batch_delete_contacts( ) response = f"Batch deleted {len(contact_ids)} contacts for {user_google_email}." - logger.info( f"Batch deleted {len(contact_ids)} contacts for {user_google_email}" ) @@ -1095,241 +960,150 @@ async def batch_delete_contacts( @server.tool() @require_google_service("people", "contacts") -@handle_http_errors("create_contact_group", service_type="people") -async def create_contact_group( +@handle_http_errors("manage_contact_group", service_type="people") +async def manage_contact_group( service: Resource, user_google_email: str, - name: str, -) -> str: - """ - Create a new contact group (label). - - Args: - user_google_email (str): The user's Google email address. Required. - name (str): The name of the new contact group. - - Returns: - str: Confirmation with the new group details. - """ - logger.info( - f"[create_contact_group] Invoked. Email: '{user_google_email}', Name: '{name}'" - ) - - try: - body = {"contactGroup": {"name": name}} - - result = await asyncio.to_thread( - service.contactGroups().create(body=body).execute - ) - - resource_name = result.get("resourceName", "") - group_id = resource_name.replace("contactGroups/", "") - created_name = result.get("name", name) - - response = f"Contact Group Created for {user_google_email}:\n\n" - response += f"Name: {created_name}\n" - response += f"ID: {group_id}\n" - response += f"Type: {result.get('groupType', 'USER_CONTACT_GROUP')}\n" - - logger.info(f"Created contact group '{name}' for {user_google_email}") - return response - - except HttpError as error: - message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'." - logger.error(message, exc_info=True) - raise Exception(message) - except Exception as e: - message = f"Unexpected error: {e}." - logger.exception(message) - raise Exception(message) - - -@server.tool() -@require_google_service("people", "contacts") -@handle_http_errors("update_contact_group", service_type="people") -async def update_contact_group( - service: Resource, - user_google_email: str, - group_id: str, - name: str, -) -> str: - """ - Update a contact group's name. - - Args: - user_google_email (str): The user's Google email address. Required. - group_id (str): The contact group ID to update. - name (str): The new name for the contact group. - - Returns: - str: Confirmation with updated group details. - """ - # Normalize resource name - if not group_id.startswith("contactGroups/"): - resource_name = f"contactGroups/{group_id}" - else: - resource_name = group_id - - logger.info( - f"[update_contact_group] Invoked. Email: '{user_google_email}', Group: {resource_name}" - ) - - try: - body = {"contactGroup": {"name": name}} - - result = await asyncio.to_thread( - service.contactGroups() - .update(resourceName=resource_name, body=body) - .execute - ) - - updated_name = result.get("name", name) - - response = f"Contact Group Updated for {user_google_email}:\n\n" - response += f"Name: {updated_name}\n" - response += f"ID: {group_id}\n" - - logger.info(f"Updated contact group {resource_name} for {user_google_email}") - return response - - except HttpError as error: - if error.resp.status == 404: - message = f"Contact group not found: {group_id}" - logger.warning(message) - raise Exception(message) - message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'." - logger.error(message, exc_info=True) - raise Exception(message) - except Exception as e: - message = f"Unexpected error: {e}." - logger.exception(message) - raise Exception(message) - - -@server.tool() -@require_google_service("people", "contacts") -@handle_http_errors("delete_contact_group", service_type="people") -async def delete_contact_group( - service: Resource, - user_google_email: str, - group_id: str, + action: str, + group_id: Optional[str] = None, + name: Optional[str] = None, delete_contacts: bool = False, -) -> str: - """ - Delete a contact group. - - Args: - user_google_email (str): The user's Google email address. Required. - group_id (str): The contact group ID to delete. - delete_contacts (bool): If True, also delete contacts in the group (default: False). - - Returns: - str: Confirmation message. - """ - # Normalize resource name - if not group_id.startswith("contactGroups/"): - resource_name = f"contactGroups/{group_id}" - else: - resource_name = group_id - - logger.info( - f"[delete_contact_group] Invoked. Email: '{user_google_email}', Group: {resource_name}" - ) - - try: - await asyncio.to_thread( - service.contactGroups() - .delete(resourceName=resource_name, deleteContacts=delete_contacts) - .execute - ) - - response = f"Contact group {group_id} has been deleted for {user_google_email}." - if delete_contacts: - response += " Contacts in the group were also deleted." - else: - response += " Contacts in the group were preserved." - - logger.info(f"Deleted contact group {resource_name} for {user_google_email}") - return response - - except HttpError as error: - if error.resp.status == 404: - message = f"Contact group not found: {group_id}" - logger.warning(message) - raise Exception(message) - message = f"API error: {error}. You might need to re-authenticate. LLM: Try 'start_google_auth' with the user's email ({user_google_email}) and service_name='Google Contacts'." - logger.error(message, exc_info=True) - raise Exception(message) - except Exception as e: - message = f"Unexpected error: {e}." - logger.exception(message) - raise Exception(message) - - -@server.tool() -@require_google_service("people", "contacts") -@handle_http_errors("modify_contact_group_members", service_type="people") -async def modify_contact_group_members( - service: Resource, - user_google_email: str, - group_id: str, add_contact_ids: Optional[List[str]] = None, remove_contact_ids: Optional[List[str]] = None, ) -> str: """ - Add or remove contacts from a contact group. + Create, update, delete a contact group, or modify its members. Consolidated tool + replacing create_contact_group, update_contact_group, delete_contact_group, and + modify_contact_group_members. Args: user_google_email (str): The user's Google email address. Required. - group_id (str): The contact group ID. - add_contact_ids (Optional[List[str]]): Contact IDs to add to the group. - remove_contact_ids (Optional[List[str]]): Contact IDs to remove from the group. + action (str): The action to perform: "create", "update", "delete", or "modify_members". + group_id (Optional[str]): The contact group ID. Required for "update", "delete", + and "modify_members" actions. + name (Optional[str]): The group name. Required for "create" and "update" actions. + delete_contacts (bool): If True and action is "delete", also delete contacts in + the group (default: False). + add_contact_ids (Optional[List[str]]): Contact IDs to add (for "modify_members"). + remove_contact_ids (Optional[List[str]]): Contact IDs to remove (for "modify_members"). Returns: - str: Confirmation with results. + str: Result of the action performed. """ - # Normalize resource name - if not group_id.startswith("contactGroups/"): - resource_name = f"contactGroups/{group_id}" - else: - resource_name = group_id + action = action.lower().strip() + if action not in ("create", "update", "delete", "modify_members"): + raise Exception( + f"Invalid action '{action}'. Must be 'create', 'update', 'delete', or 'modify_members'." + ) logger.info( - f"[modify_contact_group_members] Invoked. Email: '{user_google_email}', Group: {resource_name}" + f"[manage_contact_group] Invoked. Action: '{action}', Email: '{user_google_email}'" ) try: + if action == "create": + if not name: + raise Exception("name is required for 'create' action.") + + body = {"contactGroup": {"name": name}} + + result = await asyncio.to_thread( + service.contactGroups().create(body=body).execute + ) + + resource_name = result.get("resourceName", "") + created_group_id = resource_name.replace("contactGroups/", "") + created_name = result.get("name", name) + + response = f"Contact Group Created for {user_google_email}:\n\n" + response += f"Name: {created_name}\n" + response += f"ID: {created_group_id}\n" + response += f"Type: {result.get('groupType', 'USER_CONTACT_GROUP')}\n" + + logger.info(f"Created contact group '{name}' for {user_google_email}") + return response + + # All other actions require group_id + if not group_id: + raise Exception(f"group_id is required for '{action}' action.") + + # Normalize resource name + if not group_id.startswith("contactGroups/"): + resource_name = f"contactGroups/{group_id}" + else: + resource_name = group_id + + if action == "update": + if not name: + raise Exception("name is required for 'update' action.") + + body = {"contactGroup": {"name": name}} + + result = await asyncio.to_thread( + service.contactGroups() + .update(resourceName=resource_name, body=body) + .execute + ) + + updated_name = result.get("name", name) + + response = f"Contact Group Updated for {user_google_email}:\n\n" + response += f"Name: {updated_name}\n" + response += f"ID: {group_id}\n" + + logger.info( + f"Updated contact group {resource_name} for {user_google_email}" + ) + return response + + if action == "delete": + await asyncio.to_thread( + service.contactGroups() + .delete(resourceName=resource_name, deleteContacts=delete_contacts) + .execute + ) + + response = f"Contact group {group_id} has been deleted for {user_google_email}." + if delete_contacts: + response += " Contacts in the group were also deleted." + else: + response += " Contacts in the group were preserved." + + logger.info( + f"Deleted contact group {resource_name} for {user_google_email}" + ) + return response + + # action == "modify_members" if not add_contact_ids and not remove_contact_ids: raise Exception( "At least one of add_contact_ids or remove_contact_ids must be provided." ) - body: Dict[str, Any] = {} + modify_body: Dict[str, Any] = {} if add_contact_ids: - # Normalize resource names add_names = [] for contact_id in add_contact_ids: if not contact_id.startswith("people/"): add_names.append(f"people/{contact_id}") else: add_names.append(contact_id) - body["resourceNamesToAdd"] = add_names + modify_body["resourceNamesToAdd"] = add_names if remove_contact_ids: - # Normalize resource names remove_names = [] for contact_id in remove_contact_ids: if not contact_id.startswith("people/"): remove_names.append(f"people/{contact_id}") else: remove_names.append(contact_id) - body["resourceNamesToRemove"] = remove_names + modify_body["resourceNamesToRemove"] = remove_names result = await asyncio.to_thread( service.contactGroups() .members() - .modify(resourceName=resource_name, body=body) + .modify(resourceName=resource_name, body=modify_body) .execute ) @@ -1366,3 +1140,5 @@ async def modify_contact_group_members( message = f"Unexpected error: {e}." logger.exception(message) raise Exception(message) + + diff --git a/gdrive/drive_tools.py b/gdrive/drive_tools.py index 8f34576..16f4b4c 100644 --- a/gdrive/drive_tools.py +++ b/gdrive/drive_tools.py @@ -1774,367 +1774,372 @@ async def get_drive_shareable_link( @server.tool() -@handle_http_errors("share_drive_file", is_read_only=False, service_type="drive") +@handle_http_errors("manage_drive_access", is_read_only=False, service_type="drive") @require_google_service("drive", "drive_file") -async def share_drive_file( +async def manage_drive_access( service, user_google_email: str, file_id: str, + action: str, share_with: Optional[str] = None, - role: str = "reader", + role: Optional[str] = None, share_type: str = "user", + permission_id: Optional[str] = None, + recipients: Optional[List[Dict[str, Any]]] = None, send_notification: bool = True, email_message: Optional[str] = None, expiration_time: Optional[str] = None, allow_file_discovery: Optional[bool] = None, + new_owner_email: Optional[str] = None, + move_to_new_owners_root: bool = False, ) -> str: """ - Shares a Google Drive file or folder with a user, group, domain, or anyone with the link. + Consolidated tool for managing Google Drive file and folder access permissions. - When sharing a folder, all files inside inherit the permission. + Supports granting, batch-granting, updating, revoking permissions, and + transferring file ownership -- all through a single entry point. Args: user_google_email (str): The user's Google email address. Required. - file_id (str): The ID of the file or folder to share. Required. - share_with (Optional[str]): Email address (for user/group), domain name (for domain), or omit for 'anyone'. - role (str): Permission role - 'reader', 'commenter', or 'writer'. Defaults to 'reader'. - share_type (str): Type of sharing - 'user', 'group', 'domain', or 'anyone'. Defaults to 'user'. - send_notification (bool): Whether to send a notification email. Defaults to True. - email_message (Optional[str]): Custom message for the notification email. - expiration_time (Optional[str]): Expiration time in RFC 3339 format (e.g., "2025-01-15T00:00:00Z"). Permission auto-revokes after this time. - allow_file_discovery (Optional[bool]): For 'domain' or 'anyone' shares - whether the file can be found via search. Defaults to None (API default). - - Returns: - str: Confirmation with permission details and shareable link. - """ - logger.info( - f"[share_drive_file] Invoked. Email: '{user_google_email}', File ID: '{file_id}', Share with: '{share_with}', Role: '{role}', Type: '{share_type}'" - ) - - validate_share_role(role) - validate_share_type(share_type) - - if share_type in ("user", "group") and not share_with: - raise ValueError(f"share_with is required for share_type '{share_type}'") - if share_type == "domain" and not share_with: - raise ValueError("share_with (domain name) is required for share_type 'domain'") - - resolved_file_id, file_metadata = await resolve_drive_item( - service, file_id, extra_fields="name, webViewLink" - ) - file_id = resolved_file_id - - permission_body = { - "type": share_type, - "role": role, - } - - if share_type in ("user", "group"): - permission_body["emailAddress"] = share_with - elif share_type == "domain": - permission_body["domain"] = share_with - - if expiration_time: - validate_expiration_time(expiration_time) - permission_body["expirationTime"] = expiration_time - - if share_type in ("domain", "anyone") and allow_file_discovery is not None: - permission_body["allowFileDiscovery"] = allow_file_discovery - - create_params = { - "fileId": file_id, - "body": permission_body, - "supportsAllDrives": True, - "fields": "id, type, role, emailAddress, domain, expirationTime", - } - - if share_type in ("user", "group"): - create_params["sendNotificationEmail"] = send_notification - if email_message: - create_params["emailMessage"] = email_message - - created_permission = await asyncio.to_thread( - service.permissions().create(**create_params).execute - ) - - output_parts = [ - f"Successfully shared '{file_metadata.get('name', 'Unknown')}'", - "", - "Permission created:", - f" - {format_permission_info(created_permission)}", - "", - f"View link: {file_metadata.get('webViewLink', 'N/A')}", - ] - - return "\n".join(output_parts) - - -@server.tool() -@handle_http_errors("batch_share_drive_file", is_read_only=False, service_type="drive") -@require_google_service("drive", "drive_file") -async def batch_share_drive_file( - service, - user_google_email: str, - file_id: str, - recipients: List[Dict[str, Any]], - send_notification: bool = True, - email_message: Optional[str] = None, -) -> str: - """ - Shares a Google Drive file or folder with multiple users or groups in a single operation. - - Each recipient can have a different role and optional expiration time. - - Note: Each recipient is processed sequentially. For very large recipient lists, - consider splitting into multiple calls. - - Args: - user_google_email (str): The user's Google email address. Required. - file_id (str): The ID of the file or folder to share. Required. - recipients (List[Dict]): List of recipient objects. Each should have: - - email (str): Recipient email address. Required for 'user' or 'group' share_type. - - role (str): Permission role - 'reader', 'commenter', or 'writer'. Defaults to 'reader'. - - share_type (str, optional): 'user', 'group', or 'domain'. Defaults to 'user'. - - expiration_time (str, optional): Expiration in RFC 3339 format (e.g., "2025-01-15T00:00:00Z"). - For domain shares, use 'domain' field instead of 'email': - - domain (str): Domain name. Required when share_type is 'domain'. + file_id (str): The ID of the file or folder. Required. + action (str): The access management action to perform. Required. One of: + - "grant": Share with a single user, group, domain, or anyone. + - "grant_batch": Share with multiple recipients in one call. + - "update": Modify an existing permission (role or expiration). + - "revoke": Remove an existing permission. + - "transfer_owner": Transfer file ownership to another user. + share_with (Optional[str]): Email address (user/group), domain name (domain), + or omit for 'anyone'. Used by "grant". + role (Optional[str]): Permission role -- 'reader', 'commenter', or 'writer'. + Used by "grant" (defaults to 'reader') and "update". + share_type (str): Type of sharing -- 'user', 'group', 'domain', or 'anyone'. + Used by "grant". Defaults to 'user'. + permission_id (Optional[str]): The permission ID to modify or remove. + Required for "update" and "revoke" actions. + recipients (Optional[List[Dict[str, Any]]]): List of recipient objects for + "grant_batch". Each should have: email (str), role (str, optional), + share_type (str, optional), expiration_time (str, optional). For domain + shares use 'domain' field instead of 'email'. send_notification (bool): Whether to send notification emails. Defaults to True. - email_message (Optional[str]): Custom message for notification emails. + Used by "grant" and "grant_batch". + email_message (Optional[str]): Custom notification email message. + Used by "grant" and "grant_batch". + expiration_time (Optional[str]): Expiration in RFC 3339 format + (e.g., "2025-01-15T00:00:00Z"). Used by "grant" and "update". + allow_file_discovery (Optional[bool]): For 'domain'/'anyone' shares, whether + the file appears in search. Used by "grant". + new_owner_email (Optional[str]): Email of the new owner. + Required for "transfer_owner". + move_to_new_owners_root (bool): Move file to the new owner's My Drive root. + Defaults to False. Used by "transfer_owner". Returns: - str: Summary of created permissions with success/failure for each recipient. + str: Confirmation with details of the permission change applied. """ + valid_actions = ("grant", "grant_batch", "update", "revoke", "transfer_owner") + if action not in valid_actions: + raise ValueError( + f"Invalid action '{action}'. Must be one of: {', '.join(valid_actions)}" + ) + logger.info( - f"[batch_share_drive_file] Invoked. Email: '{user_google_email}', File ID: '{file_id}', Recipients: {len(recipients)}" + f"[manage_drive_access] Invoked. Email: '{user_google_email}', " + f"File ID: '{file_id}', Action: '{action}'" ) - resolved_file_id, file_metadata = await resolve_drive_item( - service, file_id, extra_fields="name, webViewLink" - ) - file_id = resolved_file_id + # --- grant: share with a single recipient --- + if action == "grant": + effective_role = role or "reader" + validate_share_role(effective_role) + validate_share_type(share_type) - if not recipients: - raise ValueError("recipients list cannot be empty") + if share_type in ("user", "group") and not share_with: + raise ValueError( + f"share_with is required for share_type '{share_type}'" + ) + if share_type == "domain" and not share_with: + raise ValueError( + "share_with (domain name) is required for share_type 'domain'" + ) - results = [] - success_count = 0 - failure_count = 0 + resolved_file_id, file_metadata = await resolve_drive_item( + service, file_id, extra_fields="name, webViewLink" + ) + file_id = resolved_file_id - for recipient in recipients: - share_type = recipient.get("share_type", "user") - - if share_type == "domain": - domain = recipient.get("domain") - if not domain: - results.append(" - Skipped: missing domain for domain share") - failure_count += 1 - continue - identifier = domain - else: - email = recipient.get("email") - if not email: - results.append(" - Skipped: missing email address") - failure_count += 1 - continue - identifier = email - - role = recipient.get("role", "reader") - try: - validate_share_role(role) - except ValueError as e: - results.append(f" - {identifier}: Failed - {e}") - failure_count += 1 - continue - - try: - validate_share_type(share_type) - except ValueError as e: - results.append(f" - {identifier}: Failed - {e}") - failure_count += 1 - continue - - permission_body = { + permission_body: Dict[str, Any] = { "type": share_type, - "role": role, + "role": effective_role, } + if share_type in ("user", "group"): + permission_body["emailAddress"] = share_with + elif share_type == "domain": + permission_body["domain"] = share_with - if share_type == "domain": - permission_body["domain"] = identifier - else: - permission_body["emailAddress"] = identifier + if expiration_time: + validate_expiration_time(expiration_time) + permission_body["expirationTime"] = expiration_time - if recipient.get("expiration_time"): - try: - validate_expiration_time(recipient["expiration_time"]) - permission_body["expirationTime"] = recipient["expiration_time"] - except ValueError as e: - results.append(f" - {identifier}: Failed - {e}") - failure_count += 1 - continue + if share_type in ("domain", "anyone") and allow_file_discovery is not None: + permission_body["allowFileDiscovery"] = allow_file_discovery - create_params = { + create_params: Dict[str, Any] = { "fileId": file_id, "body": permission_body, "supportsAllDrives": True, "fields": "id, type, role, emailAddress, domain, expirationTime", } - if share_type in ("user", "group"): create_params["sendNotificationEmail"] = send_notification if email_message: create_params["emailMessage"] = email_message - try: - created_permission = await asyncio.to_thread( - service.permissions().create(**create_params).execute - ) - results.append(f" - {format_permission_info(created_permission)}") - success_count += 1 - except HttpError as e: - results.append(f" - {identifier}: Failed - {str(e)}") - failure_count += 1 + created_permission = await asyncio.to_thread( + service.permissions().create(**create_params).execute + ) - output_parts = [ - f"Batch share results for '{file_metadata.get('name', 'Unknown')}'", - "", - f"Summary: {success_count} succeeded, {failure_count} failed", - "", - "Results:", - ] - output_parts.extend(results) - output_parts.extend( - [ + return "\n".join([ + f"Successfully shared '{file_metadata.get('name', 'Unknown')}'", + "", + "Permission created:", + f" - {format_permission_info(created_permission)}", "", f"View link: {file_metadata.get('webViewLink', 'N/A')}", + ]) + + # --- grant_batch: share with multiple recipients --- + if action == "grant_batch": + if not recipients: + raise ValueError("recipients list is required for 'grant_batch' action") + + resolved_file_id, file_metadata = await resolve_drive_item( + service, file_id, extra_fields="name, webViewLink" + ) + file_id = resolved_file_id + + results: List[str] = [] + success_count = 0 + failure_count = 0 + + for recipient in recipients: + r_share_type = recipient.get("share_type", "user") + + if r_share_type == "domain": + domain = recipient.get("domain") + if not domain: + results.append(" - Skipped: missing domain for domain share") + failure_count += 1 + continue + identifier = domain + else: + r_email = recipient.get("email") + if not r_email: + results.append(" - Skipped: missing email address") + failure_count += 1 + continue + identifier = r_email + + r_role = recipient.get("role", "reader") + try: + validate_share_role(r_role) + except ValueError as e: + results.append(f" - {identifier}: Failed - {e}") + failure_count += 1 + continue + + try: + validate_share_type(r_share_type) + except ValueError as e: + results.append(f" - {identifier}: Failed - {e}") + failure_count += 1 + continue + + r_perm_body: Dict[str, Any] = { + "type": r_share_type, + "role": r_role, + } + if r_share_type == "domain": + r_perm_body["domain"] = identifier + else: + r_perm_body["emailAddress"] = identifier + + if recipient.get("expiration_time"): + try: + validate_expiration_time(recipient["expiration_time"]) + r_perm_body["expirationTime"] = recipient["expiration_time"] + except ValueError as e: + results.append(f" - {identifier}: Failed - {e}") + failure_count += 1 + continue + + r_create_params: Dict[str, Any] = { + "fileId": file_id, + "body": r_perm_body, + "supportsAllDrives": True, + "fields": "id, type, role, emailAddress, domain, expirationTime", + } + if r_share_type in ("user", "group"): + r_create_params["sendNotificationEmail"] = send_notification + if email_message: + r_create_params["emailMessage"] = email_message + + try: + created_perm = await asyncio.to_thread( + service.permissions().create(**r_create_params).execute + ) + results.append(f" - {format_permission_info(created_perm)}") + success_count += 1 + except HttpError as e: + results.append(f" - {identifier}: Failed - {str(e)}") + failure_count += 1 + + output_parts = [ + f"Batch share results for '{file_metadata.get('name', 'Unknown')}'", + "", + f"Summary: {success_count} succeeded, {failure_count} failed", + "", + "Results:", ] - ) + output_parts.extend(results) + output_parts.extend([ + "", + f"View link: {file_metadata.get('webViewLink', 'N/A')}", + ]) + return "\n".join(output_parts) - return "\n".join(output_parts) + # --- update: modify an existing permission --- + if action == "update": + if not permission_id: + raise ValueError("permission_id is required for 'update' action") + if not role and not expiration_time: + raise ValueError( + "Must provide at least one of: role, expiration_time for 'update' action" + ) + if role: + validate_share_role(role) + if expiration_time: + validate_expiration_time(expiration_time) -@server.tool() -@handle_http_errors("update_drive_permission", is_read_only=False, service_type="drive") -@require_google_service("drive", "drive_file") -async def update_drive_permission( - service, - user_google_email: str, - file_id: str, - permission_id: str, - role: Optional[str] = None, - expiration_time: Optional[str] = None, -) -> str: - """ - Updates an existing permission on a Google Drive file or folder. + resolved_file_id, file_metadata = await resolve_drive_item( + service, file_id, extra_fields="name" + ) + file_id = resolved_file_id - Args: - user_google_email (str): The user's Google email address. Required. - file_id (str): The ID of the file or folder. Required. - permission_id (str): The ID of the permission to update (from get_drive_file_permissions). Required. - role (Optional[str]): New role - 'reader', 'commenter', or 'writer'. If not provided, role unchanged. - expiration_time (Optional[str]): Expiration time in RFC 3339 format (e.g., "2025-01-15T00:00:00Z"). Set or update when permission expires. + effective_role = role + if not effective_role: + current_permission = await asyncio.to_thread( + service.permissions() + .get( + fileId=file_id, + permissionId=permission_id, + supportsAllDrives=True, + fields="role", + ) + .execute + ) + effective_role = current_permission.get("role") - Returns: - str: Confirmation with updated permission details. - """ - logger.info( - f"[update_drive_permission] Invoked. Email: '{user_google_email}', File ID: '{file_id}', Permission ID: '{permission_id}', Role: '{role}'" - ) + update_body: Dict[str, Any] = {"role": effective_role} + if expiration_time: + update_body["expirationTime"] = expiration_time - if not role and not expiration_time: - raise ValueError("Must provide at least one of: role, expiration_time") - - if role: - validate_share_role(role) - if expiration_time: - validate_expiration_time(expiration_time) - - resolved_file_id, file_metadata = await resolve_drive_item( - service, file_id, extra_fields="name" - ) - file_id = resolved_file_id - - # Google API requires role in update body, so fetch current if not provided - if not role: - current_permission = await asyncio.to_thread( + updated_permission = await asyncio.to_thread( service.permissions() - .get( + .update( fileId=file_id, permissionId=permission_id, + body=update_body, supportsAllDrives=True, - fields="role", + fields="id, type, role, emailAddress, domain, expirationTime", ) .execute ) - role = current_permission.get("role") - update_body = {"role": role} - if expiration_time: - update_body["expirationTime"] = expiration_time + return "\n".join([ + f"Successfully updated permission on '{file_metadata.get('name', 'Unknown')}'", + "", + "Updated permission:", + f" - {format_permission_info(updated_permission)}", + ]) - updated_permission = await asyncio.to_thread( + # --- revoke: remove an existing permission --- + if action == "revoke": + if not permission_id: + raise ValueError("permission_id is required for 'revoke' action") + + resolved_file_id, file_metadata = await resolve_drive_item( + service, file_id, extra_fields="name" + ) + file_id = resolved_file_id + + await asyncio.to_thread( + service.permissions() + .delete( + fileId=file_id, + permissionId=permission_id, + supportsAllDrives=True, + ) + .execute + ) + + return "\n".join([ + f"Successfully removed permission from '{file_metadata.get('name', 'Unknown')}'", + "", + f"Permission ID '{permission_id}' has been revoked.", + ]) + + # --- transfer_owner: transfer file ownership --- + # action == "transfer_owner" + if not new_owner_email: + raise ValueError("new_owner_email is required for 'transfer_owner' action") + + resolved_file_id, file_metadata = await resolve_drive_item( + service, file_id, extra_fields="name, owners" + ) + file_id = resolved_file_id + + current_owners = file_metadata.get("owners", []) + current_owner_emails = [o.get("emailAddress", "") for o in current_owners] + + transfer_body: Dict[str, Any] = { + "type": "user", + "role": "owner", + "emailAddress": new_owner_email, + } + + await asyncio.to_thread( service.permissions() - .update( + .create( fileId=file_id, - permissionId=permission_id, - body=update_body, + body=transfer_body, + transferOwnership=True, + moveToNewOwnersRoot=move_to_new_owners_root, supportsAllDrives=True, - fields="id, type, role, emailAddress, domain, expirationTime", + fields="id, type, role, emailAddress", ) .execute ) output_parts = [ - f"Successfully updated permission on '{file_metadata.get('name', 'Unknown')}'", + f"Successfully transferred ownership of '{file_metadata.get('name', 'Unknown')}'", "", - "Updated permission:", - f" - {format_permission_info(updated_permission)}", + f"New owner: {new_owner_email}", + f"Previous owner(s): {', '.join(current_owner_emails) or 'Unknown'}", ] + if move_to_new_owners_root: + output_parts.append(f"File moved to {new_owner_email}'s My Drive root.") + output_parts.extend(["", "Note: Previous owner now has editor access."]) return "\n".join(output_parts) -@server.tool() -@handle_http_errors("remove_drive_permission", is_read_only=False, service_type="drive") -@require_google_service("drive", "drive_file") -async def remove_drive_permission( - service, - user_google_email: str, - file_id: str, - permission_id: str, -) -> str: - """ - Removes a permission from a Google Drive file or folder, revoking access. - Args: - user_google_email (str): The user's Google email address. Required. - file_id (str): The ID of the file or folder. Required. - permission_id (str): The ID of the permission to remove (from get_drive_file_permissions). Required. - Returns: - str: Confirmation of the removed permission. - """ - logger.info( - f"[remove_drive_permission] Invoked. Email: '{user_google_email}', File ID: '{file_id}', Permission ID: '{permission_id}'" - ) - resolved_file_id, file_metadata = await resolve_drive_item( - service, file_id, extra_fields="name" - ) - file_id = resolved_file_id - await asyncio.to_thread( - service.permissions() - .delete(fileId=file_id, permissionId=permission_id, supportsAllDrives=True) - .execute - ) - output_parts = [ - f"Successfully removed permission from '{file_metadata.get('name', 'Unknown')}'", - "", - f"Permission ID '{permission_id}' has been revoked.", - ] - return "\n".join(output_parts) @server.tool() @@ -2209,77 +2214,6 @@ async def copy_drive_file( return "\n".join(output_parts) -@server.tool() -@handle_http_errors( - "transfer_drive_ownership", is_read_only=False, service_type="drive" -) -@require_google_service("drive", "drive_file") -async def transfer_drive_ownership( - service, - user_google_email: str, - file_id: str, - new_owner_email: str, - move_to_new_owners_root: bool = False, -) -> str: - """ - Transfers ownership of a Google Drive file or folder to another user. - - This is an irreversible operation. The current owner will become an editor. - Only works within the same Google Workspace domain or for personal accounts. - - Args: - user_google_email (str): The user's Google email address. Required. - file_id (str): The ID of the file or folder to transfer. Required. - new_owner_email (str): Email address of the new owner. Required. - move_to_new_owners_root (bool): If True, moves the file to the new owner's My Drive root. Defaults to False. - - Returns: - str: Confirmation of the ownership transfer. - """ - logger.info( - f"[transfer_drive_ownership] Invoked. Email: '{user_google_email}', File ID: '{file_id}', New owner: '{new_owner_email}'" - ) - - resolved_file_id, file_metadata = await resolve_drive_item( - service, file_id, extra_fields="name, owners" - ) - file_id = resolved_file_id - - current_owners = file_metadata.get("owners", []) - current_owner_emails = [o.get("emailAddress", "") for o in current_owners] - - permission_body = { - "type": "user", - "role": "owner", - "emailAddress": new_owner_email, - } - - await asyncio.to_thread( - service.permissions() - .create( - fileId=file_id, - body=permission_body, - transferOwnership=True, - moveToNewOwnersRoot=move_to_new_owners_root, - supportsAllDrives=True, - fields="id, type, role, emailAddress", - ) - .execute - ) - - output_parts = [ - f"Successfully transferred ownership of '{file_metadata.get('name', 'Unknown')}'", - "", - f"New owner: {new_owner_email}", - f"Previous owner(s): {', '.join(current_owner_emails) or 'Unknown'}", - ] - - if move_to_new_owners_root: - output_parts.append(f"File moved to {new_owner_email}'s My Drive root.") - - output_parts.extend(["", "Note: Previous owner now has editor access."]) - - return "\n".join(output_parts) @server.tool() diff --git a/gmail/gmail_tools.py b/gmail/gmail_tools.py index a56034a..d054bcb 100644 --- a/gmail/gmail_tools.py +++ b/gmail/gmail_tools.py @@ -1822,88 +1822,68 @@ async def list_gmail_filters(service, user_google_email: str) -> str: @server.tool() -@handle_http_errors("create_gmail_filter", service_type="gmail") +@handle_http_errors("manage_gmail_filter", service_type="gmail") @require_google_service("gmail", "gmail_settings_basic") -async def create_gmail_filter( +async def manage_gmail_filter( service, user_google_email: str, - criteria: Annotated[ - Dict[str, Any], - Field( - description="Filter criteria object as defined in the Gmail API.", - ), - ], - action: Annotated[ - Dict[str, Any], - Field( - description="Filter action object as defined in the Gmail API.", - ), - ], + action: str, + criteria: Optional[Dict[str, Any]] = None, + filter_action: Optional[Dict[str, Any]] = None, + filter_id: Optional[str] = None, ) -> str: """ - Creates a Gmail filter using the users.settings.filters API. + Manages Gmail filters. Supports creating and deleting filters. Args: user_google_email (str): The user's Google email address. Required. - criteria (Dict[str, Any]): Criteria for matching messages. - action (Dict[str, Any]): Actions to apply to matched messages. + action (str): Action to perform - "create" or "delete". + criteria (Optional[Dict[str, Any]]): Filter criteria object (required for create). + filter_action (Optional[Dict[str, Any]]): Filter action object (required for create). Named 'filter_action' to avoid shadowing the 'action' parameter. + filter_id (Optional[str]): ID of the filter to delete (required for delete). Returns: - str: Confirmation message with the created filter ID. + str: Confirmation message with filter details. """ - logger.info("[create_gmail_filter] Invoked") - - filter_body = {"criteria": criteria, "action": action} - - created_filter = await asyncio.to_thread( - service.users() - .settings() - .filters() - .create(userId="me", body=filter_body) - .execute - ) - - filter_id = created_filter.get("id", "(unknown)") - return f"Filter created successfully!\nFilter ID: {filter_id}" + action_lower = action.lower().strip() + if action_lower == "create": + if not criteria or not filter_action: + raise ValueError("criteria and filter_action are required for create action") + logger.info("[manage_gmail_filter] Creating filter") + filter_body = {"criteria": criteria, "action": filter_action} + created_filter = await asyncio.to_thread( + service.users() + .settings() + .filters() + .create(userId="me", body=filter_body) + .execute + ) + fid = created_filter.get("id", "(unknown)") + return f"Filter created successfully!\nFilter ID: {fid}" + elif action_lower == "delete": + if not filter_id: + raise ValueError("filter_id is required for delete action") + logger.info(f"[manage_gmail_filter] Deleting filter {filter_id}") + filter_details = await asyncio.to_thread( + service.users().settings().filters().get(userId="me", id=filter_id).execute + ) + await asyncio.to_thread( + service.users().settings().filters().delete(userId="me", id=filter_id).execute + ) + criteria_info = filter_details.get("criteria", {}) + action_info = filter_details.get("action", {}) + return ( + "Filter deleted successfully!\n" + f"Filter ID: {filter_id}\n" + f"Criteria: {criteria_info or '(none)'}\n" + f"Action: {action_info or '(none)'}" + ) + else: + raise ValueError(f"Invalid action '{action_lower}'. Must be 'create' or 'delete'.") -@server.tool() -@handle_http_errors("delete_gmail_filter", service_type="gmail") -@require_google_service("gmail", "gmail_settings_basic") -async def delete_gmail_filter( - service, - user_google_email: str, - filter_id: str = Field(..., description="ID of the filter to delete."), -) -> str: - """ - Deletes a Gmail filter by ID. - Args: - user_google_email (str): The user's Google email address. Required. - filter_id (str): The ID of the filter to delete. - Returns: - str: Confirmation message for the deletion. - """ - logger.info(f"[delete_gmail_filter] Invoked. Filter ID: '{filter_id}'") - - filter_details = await asyncio.to_thread( - service.users().settings().filters().get(userId="me", id=filter_id).execute - ) - - await asyncio.to_thread( - service.users().settings().filters().delete(userId="me", id=filter_id).execute - ) - - criteria = filter_details.get("criteria", {}) - action = filter_details.get("action", {}) - - return ( - "Filter deleted successfully!\n" - f"Filter ID: {filter_id}\n" - f"Criteria: {criteria or '(none)'}\n" - f"Action: {action or '(none)'}" - ) @server.tool() diff --git a/gsearch/search_tools.py b/gsearch/search_tools.py index 6afb0fd..607f8c9 100644 --- a/gsearch/search_tools.py +++ b/gsearch/search_tools.py @@ -33,6 +33,7 @@ async def search_custom( file_type: Optional[str] = None, language: Optional[str] = None, country: Optional[str] = None, + sites: Optional[List[str]] = None, ) -> str: """ Performs a search using Google Custom Search JSON API. @@ -50,6 +51,7 @@ async def search_custom( file_type (Optional[str]): Filter by file type (e.g., "pdf", "doc"). language (Optional[str]): Language code for results (e.g., "lang_en"). country (Optional[str]): Country code for results (e.g., "countryUS"). + sites (Optional[List[str]]): List of sites/domains to restrict search to (e.g., ["example.com", "docs.example.com"]). When provided, results are limited to these sites. Returns: str: Formatted search results including title, link, and snippet for each result. @@ -71,6 +73,12 @@ async def search_custom( f"[search_custom] Invoked. Email: '{user_google_email}', Query: '{q}', CX: '{cx}'" ) + # Apply site restriction if sites are provided + if sites: + site_query = " OR ".join([f"site:{site}" for site in sites]) + q = f"{q} ({site_query})" + logger.info(f"[search_custom] Applied site restriction: {sites}") + # Build the request parameters params = { "key": api_key, @@ -226,48 +234,3 @@ async def get_search_engine_info(service, user_google_email: str) -> str: return confirmation_message -@server.tool() -@handle_http_errors( - "search_custom_siterestrict", is_read_only=True, service_type="customsearch" -) -@require_google_service("customsearch", "customsearch") -async def search_custom_siterestrict( - service, - user_google_email: str, - q: str, - sites: List[str], - num: int = 10, - start: int = 1, - safe: Literal["active", "moderate", "off"] = "off", -) -> str: - """ - Performs a search restricted to specific sites using Google Custom Search. - - Args: - user_google_email (str): The user's Google email address. Required. - q (str): The search query. Required. - sites (List[str]): List of sites/domains to search within. - num (int): Number of results to return (1-10). Defaults to 10. - start (int): The index of the first result to return (1-based). Defaults to 1. - safe (Literal["active", "moderate", "off"]): Safe search level. Defaults to "off". - - Returns: - str: Formatted search results from the specified sites. - """ - logger.info( - f"[search_custom_siterestrict] Invoked. Email: '{user_google_email}', Query: '{q}', Sites: {sites}" - ) - - # Build site restriction query - site_query = " OR ".join([f"site:{site}" for site in sites]) - full_query = f"{q} ({site_query})" - - # Use the main search function with the modified query - return await search_custom( - service=service, - user_google_email=user_google_email, - q=full_query, - num=num, - start=start, - safe=safe, - ) diff --git a/gsheets/sheets_tools.py b/gsheets/sheets_tools.py index ef9501d..0633cd2 100644 --- a/gsheets/sheets_tools.py +++ b/gsheets/sheets_tools.py @@ -729,405 +729,420 @@ async def format_sheet_range( @server.tool() -@handle_http_errors("add_conditional_formatting", service_type="sheets") +@handle_http_errors("manage_conditional_formatting", service_type="sheets") @require_google_service("sheets", "sheets_write") -async def add_conditional_formatting( +async def manage_conditional_formatting( service, user_google_email: str, spreadsheet_id: str, - range_name: str, - condition_type: str, - condition_values: Optional[Union[str, List[Union[str, int, float]]]] = None, - background_color: Optional[str] = None, - text_color: Optional[str] = None, - rule_index: Optional[int] = None, - gradient_points: Optional[Union[str, List[dict]]] = None, -) -> str: - """ - Adds a conditional formatting rule to a range. - - Args: - user_google_email (str): The user's Google email address. Required. - spreadsheet_id (str): The ID of the spreadsheet. Required. - range_name (str): A1-style range (optionally with sheet name). Required. - condition_type (str): Sheets condition type (e.g., NUMBER_GREATER, TEXT_CONTAINS, DATE_BEFORE, CUSTOM_FORMULA). - condition_values (Optional[Union[str, List[Union[str, int, float]]]]): Values for the condition; accepts a list or a JSON string representing a list. Depends on condition_type. - background_color (Optional[str]): Hex background color to apply when condition matches. - text_color (Optional[str]): Hex text color to apply when condition matches. - rule_index (Optional[int]): Optional position to insert the rule (0-based) within the sheet's rules. - gradient_points (Optional[Union[str, List[dict]]]): List (or JSON list) of gradient points for a color scale. If provided, a gradient rule is created and boolean parameters are ignored. - - Returns: - str: Confirmation of the added rule. - """ - logger.info( - "[add_conditional_formatting] Invoked. Email: '%s', Spreadsheet: %s, Range: %s, Type: %s, Values: %s", - user_google_email, - spreadsheet_id, - range_name, - condition_type, - condition_values, - ) - - if rule_index is not None and (not isinstance(rule_index, int) or rule_index < 0): - raise UserInputError("rule_index must be a non-negative integer when provided.") - - condition_values_list = _parse_condition_values(condition_values) - gradient_points_list = _parse_gradient_points(gradient_points) - - sheets, sheet_titles = await _fetch_sheets_with_rules(service, spreadsheet_id) - grid_range = _parse_a1_range(range_name, sheets) - - target_sheet = None - for sheet in sheets: - if sheet.get("properties", {}).get("sheetId") == grid_range.get("sheetId"): - target_sheet = sheet - break - if target_sheet is None: - raise UserInputError( - "Target sheet not found while adding conditional formatting." - ) - - current_rules = target_sheet.get("conditionalFormats", []) or [] - - insert_at = rule_index if rule_index is not None else len(current_rules) - if insert_at > len(current_rules): - raise UserInputError( - f"rule_index {insert_at} is out of range for sheet '{target_sheet.get('properties', {}).get('title', 'Unknown')}' " - f"(current count: {len(current_rules)})." - ) - - if gradient_points_list: - new_rule = _build_gradient_rule([grid_range], gradient_points_list) - rule_desc = "gradient" - values_desc = "" - applied_parts = [f"gradient points {len(gradient_points_list)}"] - else: - rule, cond_type_normalized = _build_boolean_rule( - [grid_range], - condition_type, - condition_values_list, - background_color, - text_color, - ) - new_rule = rule - rule_desc = cond_type_normalized - values_desc = "" - if condition_values_list: - values_desc = f" with values {condition_values_list}" - applied_parts = [] - if background_color: - applied_parts.append(f"background {background_color}") - if text_color: - applied_parts.append(f"text {text_color}") - - new_rules_state = copy.deepcopy(current_rules) - new_rules_state.insert(insert_at, new_rule) - - add_rule_request = {"rule": new_rule} - if rule_index is not None: - add_rule_request["index"] = rule_index - - request_body = {"requests": [{"addConditionalFormatRule": add_rule_request}]} - - await asyncio.to_thread( - service.spreadsheets() - .batchUpdate(spreadsheetId=spreadsheet_id, body=request_body) - .execute - ) - - format_desc = ", ".join(applied_parts) if applied_parts else "format applied" - - sheet_title = target_sheet.get("properties", {}).get("title", "Unknown") - state_text = _format_conditional_rules_section( - sheet_title, new_rules_state, sheet_titles, indent="" - ) - - return "\n".join( - [ - f"Added conditional format on '{range_name}' in spreadsheet {spreadsheet_id} " - f"for {user_google_email}: {rule_desc}{values_desc}; format: {format_desc}.", - state_text, - ] - ) - - -@server.tool() -@handle_http_errors("update_conditional_formatting", service_type="sheets") -@require_google_service("sheets", "sheets_write") -async def update_conditional_formatting( - service, - user_google_email: str, - spreadsheet_id: str, - rule_index: int, + action: str, range_name: Optional[str] = None, condition_type: Optional[str] = None, condition_values: Optional[Union[str, List[Union[str, int, float]]]] = None, background_color: Optional[str] = None, text_color: Optional[str] = None, - sheet_name: Optional[str] = None, + rule_index: Optional[int] = None, gradient_points: Optional[Union[str, List[dict]]] = None, + sheet_name: Optional[str] = None, ) -> str: """ - Updates an existing conditional formatting rule by index on a sheet. + Manages conditional formatting rules on a Google Sheet. Supports adding, + updating, and deleting conditional formatting rules via a single tool. Args: user_google_email (str): The user's Google email address. Required. spreadsheet_id (str): The ID of the spreadsheet. Required. - range_name (Optional[str]): A1-style range to apply the updated rule (optionally with sheet name). If omitted, existing ranges are preserved. - rule_index (int): Index of the rule to update (0-based). - condition_type (Optional[str]): Sheets condition type. If omitted, the existing rule's type is preserved. - condition_values (Optional[Union[str, List[Union[str, int, float]]]]): Values for the condition. - background_color (Optional[str]): Hex background color when condition matches. - text_color (Optional[str]): Hex text color when condition matches. - sheet_name (Optional[str]): Sheet name to locate the rule when range_name is omitted. Defaults to first sheet. - gradient_points (Optional[Union[str, List[dict]]]): If provided, updates the rule to a gradient color scale using these points. + action (str): The operation to perform. Must be one of "add", "update", + or "delete". + range_name (Optional[str]): A1-style range (optionally with sheet name). + Required for "add". Optional for "update" (preserves existing ranges + if omitted). Not used for "delete". + condition_type (Optional[str]): Sheets condition type (e.g., NUMBER_GREATER, + TEXT_CONTAINS, DATE_BEFORE, CUSTOM_FORMULA). Required for "add". + Optional for "update" (preserves existing type if omitted). + condition_values (Optional[Union[str, List[Union[str, int, float]]]]): Values + for the condition; accepts a list or a JSON string representing a list. + Depends on condition_type. Used by "add" and "update". + background_color (Optional[str]): Hex background color to apply when + condition matches. Used by "add" and "update". + text_color (Optional[str]): Hex text color to apply when condition matches. + Used by "add" and "update". + rule_index (Optional[int]): 0-based index of the rule. For "add", optionally + specifies insertion position. Required for "update" and "delete". + gradient_points (Optional[Union[str, List[dict]]]): List (or JSON list) of + gradient points for a color scale. If provided, a gradient rule is created + and boolean parameters are ignored. Used by "add" and "update". + sheet_name (Optional[str]): Sheet name to locate the rule when range_name is + omitted. Defaults to the first sheet. Used by "update" and "delete". Returns: - str: Confirmation of the updated rule and the current rule state. + str: Confirmation of the operation and the current rule state. """ + allowed_actions = {"add", "update", "delete"} + action_normalized = action.strip().lower() + if action_normalized not in allowed_actions: + raise UserInputError( + f"action must be one of {sorted(allowed_actions)}, got '{action}'." + ) + logger.info( - "[update_conditional_formatting] Invoked. Email: '%s', Spreadsheet: %s, Range: %s, Rule Index: %s", + "[manage_conditional_formatting] Invoked. Action: '%s', Email: '%s', Spreadsheet: %s", + action_normalized, user_google_email, spreadsheet_id, - range_name, - rule_index, ) - if not isinstance(rule_index, int) or rule_index < 0: - raise UserInputError("rule_index must be a non-negative integer.") + if action_normalized == "add": + if not range_name: + raise UserInputError("range_name is required for action 'add'.") + if not condition_type and not gradient_points: + raise UserInputError( + "condition_type (or gradient_points) is required for action 'add'." + ) - condition_values_list = _parse_condition_values(condition_values) - gradient_points_list = _parse_gradient_points(gradient_points) + if rule_index is not None and ( + not isinstance(rule_index, int) or rule_index < 0 + ): + raise UserInputError( + "rule_index must be a non-negative integer when provided." + ) - sheets, sheet_titles = await _fetch_sheets_with_rules(service, spreadsheet_id) + condition_values_list = _parse_condition_values(condition_values) + gradient_points_list = _parse_gradient_points(gradient_points) - target_sheet = None - grid_range = None - if range_name: + sheets, sheet_titles = await _fetch_sheets_with_rules( + service, spreadsheet_id + ) grid_range = _parse_a1_range(range_name, sheets) + + target_sheet = None for sheet in sheets: - if sheet.get("properties", {}).get("sheetId") == grid_range.get("sheetId"): + if ( + sheet.get("properties", {}).get("sheetId") + == grid_range.get("sheetId") + ): target_sheet = sheet break - else: + if target_sheet is None: + raise UserInputError( + "Target sheet not found while adding conditional formatting." + ) + + current_rules = target_sheet.get("conditionalFormats", []) or [] + + insert_at = rule_index if rule_index is not None else len(current_rules) + if insert_at > len(current_rules): + raise UserInputError( + f"rule_index {insert_at} is out of range for sheet " + f"'{target_sheet.get('properties', {}).get('title', 'Unknown')}' " + f"(current count: {len(current_rules)})." + ) + + if gradient_points_list: + new_rule = _build_gradient_rule([grid_range], gradient_points_list) + rule_desc = "gradient" + values_desc = "" + applied_parts = [f"gradient points {len(gradient_points_list)}"] + else: + rule, cond_type_normalized = _build_boolean_rule( + [grid_range], + condition_type, + condition_values_list, + background_color, + text_color, + ) + new_rule = rule + rule_desc = cond_type_normalized + values_desc = "" + if condition_values_list: + values_desc = f" with values {condition_values_list}" + applied_parts = [] + if background_color: + applied_parts.append(f"background {background_color}") + if text_color: + applied_parts.append(f"text {text_color}") + + new_rules_state = copy.deepcopy(current_rules) + new_rules_state.insert(insert_at, new_rule) + + add_rule_request = {"rule": new_rule} + if rule_index is not None: + add_rule_request["index"] = rule_index + + request_body = { + "requests": [{"addConditionalFormatRule": add_rule_request}] + } + + await asyncio.to_thread( + service.spreadsheets() + .batchUpdate(spreadsheetId=spreadsheet_id, body=request_body) + .execute + ) + + format_desc = ( + ", ".join(applied_parts) if applied_parts else "format applied" + ) + + sheet_title = target_sheet.get("properties", {}).get("title", "Unknown") + state_text = _format_conditional_rules_section( + sheet_title, new_rules_state, sheet_titles, indent="" + ) + + return "\n".join( + [ + f"Added conditional format on '{range_name}' in spreadsheet " + f"{spreadsheet_id} for {user_google_email}: " + f"{rule_desc}{values_desc}; format: {format_desc}.", + state_text, + ] + ) + + elif action_normalized == "update": + if rule_index is None: + raise UserInputError("rule_index is required for action 'update'.") + if not isinstance(rule_index, int) or rule_index < 0: + raise UserInputError("rule_index must be a non-negative integer.") + + condition_values_list = _parse_condition_values(condition_values) + gradient_points_list = _parse_gradient_points(gradient_points) + + sheets, sheet_titles = await _fetch_sheets_with_rules( + service, spreadsheet_id + ) + + target_sheet = None + grid_range = None + if range_name: + grid_range = _parse_a1_range(range_name, sheets) + for sheet in sheets: + if ( + sheet.get("properties", {}).get("sheetId") + == grid_range.get("sheetId") + ): + target_sheet = sheet + break + else: + target_sheet = _select_sheet(sheets, sheet_name) + + if target_sheet is None: + raise UserInputError( + "Target sheet not found while updating conditional formatting." + ) + + sheet_props = target_sheet.get("properties", {}) + sheet_id = sheet_props.get("sheetId") + sheet_title = sheet_props.get("title", f"Sheet {sheet_id}") + + rules = target_sheet.get("conditionalFormats", []) or [] + if rule_index >= len(rules): + raise UserInputError( + f"rule_index {rule_index} is out of range for sheet " + f"'{sheet_title}' (current count: {len(rules)})." + ) + + existing_rule = rules[rule_index] + ranges_to_use = existing_rule.get("ranges", []) + if range_name: + ranges_to_use = [grid_range] + if not ranges_to_use: + ranges_to_use = [{"sheetId": sheet_id}] + + new_rule = None + rule_desc = "" + values_desc = "" + format_desc = "" + + if gradient_points_list is not None: + new_rule = _build_gradient_rule(ranges_to_use, gradient_points_list) + rule_desc = "gradient" + format_desc = f"gradient points {len(gradient_points_list)}" + elif "gradientRule" in existing_rule: + if any( + [ + background_color, + text_color, + condition_type, + condition_values_list, + ] + ): + raise UserInputError( + "Existing rule is a gradient rule. Provide gradient_points " + "to update it, or omit formatting/condition parameters to " + "keep it unchanged." + ) + new_rule = { + "ranges": ranges_to_use, + "gradientRule": existing_rule.get("gradientRule", {}), + } + rule_desc = "gradient" + format_desc = "gradient (unchanged)" + else: + existing_boolean = existing_rule.get("booleanRule", {}) + existing_condition = existing_boolean.get("condition", {}) + existing_format = copy.deepcopy( + existing_boolean.get("format", {}) + ) + + cond_type = ( + condition_type or existing_condition.get("type", "") + ).upper() + if not cond_type: + raise UserInputError( + "condition_type is required for boolean rules." + ) + if cond_type not in CONDITION_TYPES: + raise UserInputError( + f"condition_type must be one of {sorted(CONDITION_TYPES)}." + ) + + if condition_values_list is not None: + cond_values = [ + {"userEnteredValue": str(val)} + for val in condition_values_list + ] + else: + cond_values = existing_condition.get("values") + + new_format = ( + copy.deepcopy(existing_format) if existing_format else {} + ) + if background_color is not None: + bg_color_parsed = _parse_hex_color(background_color) + if bg_color_parsed: + new_format["backgroundColor"] = bg_color_parsed + elif "backgroundColor" in new_format: + del new_format["backgroundColor"] + if text_color is not None: + text_color_parsed = _parse_hex_color(text_color) + text_format = copy.deepcopy( + new_format.get("textFormat", {}) + ) + if text_color_parsed: + text_format["foregroundColor"] = text_color_parsed + elif "foregroundColor" in text_format: + del text_format["foregroundColor"] + if text_format: + new_format["textFormat"] = text_format + elif "textFormat" in new_format: + del new_format["textFormat"] + + if not new_format: + raise UserInputError( + "At least one format option must remain on the rule." + ) + + new_rule = { + "ranges": ranges_to_use, + "booleanRule": { + "condition": {"type": cond_type}, + "format": new_format, + }, + } + if cond_values: + new_rule["booleanRule"]["condition"]["values"] = cond_values + + rule_desc = cond_type + if condition_values_list: + values_desc = f" with values {condition_values_list}" + format_parts = [] + if "backgroundColor" in new_format: + format_parts.append("background updated") + if "textFormat" in new_format and new_format["textFormat"].get( + "foregroundColor" + ): + format_parts.append("text color updated") + format_desc = ( + ", ".join(format_parts) if format_parts else "format preserved" + ) + + new_rules_state = copy.deepcopy(rules) + new_rules_state[rule_index] = new_rule + + request_body = { + "requests": [ + { + "updateConditionalFormatRule": { + "index": rule_index, + "sheetId": sheet_id, + "rule": new_rule, + } + } + ] + } + + await asyncio.to_thread( + service.spreadsheets() + .batchUpdate(spreadsheetId=spreadsheet_id, body=request_body) + .execute + ) + + state_text = _format_conditional_rules_section( + sheet_title, new_rules_state, sheet_titles, indent="" + ) + + return "\n".join( + [ + f"Updated conditional format at index {rule_index} on sheet " + f"'{sheet_title}' in spreadsheet {spreadsheet_id} " + f"for {user_google_email}: " + f"{rule_desc}{values_desc}; format: {format_desc}.", + state_text, + ] + ) + + else: # action_normalized == "delete" + if rule_index is None: + raise UserInputError("rule_index is required for action 'delete'.") + if not isinstance(rule_index, int) or rule_index < 0: + raise UserInputError("rule_index must be a non-negative integer.") + + sheets, sheet_titles = await _fetch_sheets_with_rules( + service, spreadsheet_id + ) target_sheet = _select_sheet(sheets, sheet_name) - if target_sheet is None: - raise UserInputError( - "Target sheet not found while updating conditional formatting." - ) - - sheet_props = target_sheet.get("properties", {}) - sheet_id = sheet_props.get("sheetId") - sheet_title = sheet_props.get("title", f"Sheet {sheet_id}") - - rules = target_sheet.get("conditionalFormats", []) or [] - if rule_index >= len(rules): - raise UserInputError( - f"rule_index {rule_index} is out of range for sheet '{sheet_title}' (current count: {len(rules)})." - ) - - existing_rule = rules[rule_index] - ranges_to_use = existing_rule.get("ranges", []) - if range_name: - ranges_to_use = [grid_range] - if not ranges_to_use: - ranges_to_use = [{"sheetId": sheet_id}] - - new_rule = None - rule_desc = "" - values_desc = "" - format_desc = "" - - if gradient_points_list is not None: - new_rule = _build_gradient_rule(ranges_to_use, gradient_points_list) - rule_desc = "gradient" - format_desc = f"gradient points {len(gradient_points_list)}" - elif "gradientRule" in existing_rule: - if any([background_color, text_color, condition_type, condition_values_list]): + sheet_props = target_sheet.get("properties", {}) + sheet_id = sheet_props.get("sheetId") + target_sheet_name = sheet_props.get("title", f"Sheet {sheet_id}") + rules = target_sheet.get("conditionalFormats", []) or [] + if rule_index >= len(rules): raise UserInputError( - "Existing rule is a gradient rule. Provide gradient_points to update it, or omit formatting/condition parameters to keep it unchanged." - ) - new_rule = { - "ranges": ranges_to_use, - "gradientRule": existing_rule.get("gradientRule", {}), - } - rule_desc = "gradient" - format_desc = "gradient (unchanged)" - else: - existing_boolean = existing_rule.get("booleanRule", {}) - existing_condition = existing_boolean.get("condition", {}) - existing_format = copy.deepcopy(existing_boolean.get("format", {})) - - cond_type = (condition_type or existing_condition.get("type", "")).upper() - if not cond_type: - raise UserInputError("condition_type is required for boolean rules.") - if cond_type not in CONDITION_TYPES: - raise UserInputError( - f"condition_type must be one of {sorted(CONDITION_TYPES)}." + f"rule_index {rule_index} is out of range for sheet " + f"'{target_sheet_name}' (current count: {len(rules)})." ) - if condition_values_list is not None: - cond_values = [ - {"userEnteredValue": str(val)} for val in condition_values_list + new_rules_state = copy.deepcopy(rules) + del new_rules_state[rule_index] + + request_body = { + "requests": [ + { + "deleteConditionalFormatRule": { + "index": rule_index, + "sheetId": sheet_id, + } + } ] - else: - cond_values = existing_condition.get("values") - - new_format = copy.deepcopy(existing_format) if existing_format else {} - if background_color is not None: - bg_color_parsed = _parse_hex_color(background_color) - if bg_color_parsed: - new_format["backgroundColor"] = bg_color_parsed - elif "backgroundColor" in new_format: - del new_format["backgroundColor"] - if text_color is not None: - text_color_parsed = _parse_hex_color(text_color) - text_format = copy.deepcopy(new_format.get("textFormat", {})) - if text_color_parsed: - text_format["foregroundColor"] = text_color_parsed - elif "foregroundColor" in text_format: - del text_format["foregroundColor"] - if text_format: - new_format["textFormat"] = text_format - elif "textFormat" in new_format: - del new_format["textFormat"] - - if not new_format: - raise UserInputError("At least one format option must remain on the rule.") - - new_rule = { - "ranges": ranges_to_use, - "booleanRule": { - "condition": {"type": cond_type}, - "format": new_format, - }, } - if cond_values: - new_rule["booleanRule"]["condition"]["values"] = cond_values - rule_desc = cond_type - if condition_values_list: - values_desc = f" with values {condition_values_list}" - format_parts = [] - if "backgroundColor" in new_format: - format_parts.append("background updated") - if "textFormat" in new_format and new_format["textFormat"].get( - "foregroundColor" - ): - format_parts.append("text color updated") - format_desc = ", ".join(format_parts) if format_parts else "format preserved" - - new_rules_state = copy.deepcopy(rules) - new_rules_state[rule_index] = new_rule - - request_body = { - "requests": [ - { - "updateConditionalFormatRule": { - "index": rule_index, - "sheetId": sheet_id, - "rule": new_rule, - } - } - ] - } - - await asyncio.to_thread( - service.spreadsheets() - .batchUpdate(spreadsheetId=spreadsheet_id, body=request_body) - .execute - ) - - state_text = _format_conditional_rules_section( - sheet_title, new_rules_state, sheet_titles, indent="" - ) - - return "\n".join( - [ - f"Updated conditional format at index {rule_index} on sheet '{sheet_title}' in spreadsheet {spreadsheet_id} " - f"for {user_google_email}: {rule_desc}{values_desc}; format: {format_desc}.", - state_text, - ] - ) - - -@server.tool() -@handle_http_errors("delete_conditional_formatting", service_type="sheets") -@require_google_service("sheets", "sheets_write") -async def delete_conditional_formatting( - service, - user_google_email: str, - spreadsheet_id: str, - rule_index: int, - sheet_name: Optional[str] = None, -) -> str: - """ - Deletes an existing conditional formatting rule by index on a sheet. - - Args: - user_google_email (str): The user's Google email address. Required. - spreadsheet_id (str): The ID of the spreadsheet. Required. - rule_index (int): Index of the rule to delete (0-based). - sheet_name (Optional[str]): Name of the sheet that contains the rule. Defaults to the first sheet if not provided. - - Returns: - str: Confirmation of the deletion and the current rule state. - """ - logger.info( - "[delete_conditional_formatting] Invoked. Email: '%s', Spreadsheet: %s, Sheet: %s, Rule Index: %s", - user_google_email, - spreadsheet_id, - sheet_name, - rule_index, - ) - - if not isinstance(rule_index, int) or rule_index < 0: - raise UserInputError("rule_index must be a non-negative integer.") - - sheets, sheet_titles = await _fetch_sheets_with_rules(service, spreadsheet_id) - target_sheet = _select_sheet(sheets, sheet_name) - - sheet_props = target_sheet.get("properties", {}) - sheet_id = sheet_props.get("sheetId") - target_sheet_name = sheet_props.get("title", f"Sheet {sheet_id}") - rules = target_sheet.get("conditionalFormats", []) or [] - if rule_index >= len(rules): - raise UserInputError( - f"rule_index {rule_index} is out of range for sheet '{target_sheet_name}' (current count: {len(rules)})." + await asyncio.to_thread( + service.spreadsheets() + .batchUpdate(spreadsheetId=spreadsheet_id, body=request_body) + .execute ) - new_rules_state = copy.deepcopy(rules) - del new_rules_state[rule_index] + state_text = _format_conditional_rules_section( + target_sheet_name, new_rules_state, sheet_titles, indent="" + ) - request_body = { - "requests": [ - { - "deleteConditionalFormatRule": { - "index": rule_index, - "sheetId": sheet_id, - } - } - ] - } - - await asyncio.to_thread( - service.spreadsheets() - .batchUpdate(spreadsheetId=spreadsheet_id, body=request_body) - .execute - ) - - state_text = _format_conditional_rules_section( - target_sheet_name, new_rules_state, sheet_titles, indent="" - ) - - return "\n".join( - [ - f"Deleted conditional format at index {rule_index} on sheet '{target_sheet_name}' in spreadsheet {spreadsheet_id} for {user_google_email}.", - state_text, - ] - ) + return "\n".join( + [ + f"Deleted conditional format at index {rule_index} on sheet " + f"'{target_sheet_name}' in spreadsheet {spreadsheet_id} " + f"for {user_google_email}.", + state_text, + ] + ) @server.tool() diff --git a/gtasks/tasks_tools.py b/gtasks/tasks_tools.py index 1e5b7ab..ea353ce 100644 --- a/gtasks/tasks_tools.py +++ b/gtasks/tasks_tools.py @@ -193,22 +193,13 @@ async def get_task_list( raise Exception(message) -@server.tool() # type: ignore -@require_google_service("tasks", "tasks") # type: ignore -@handle_http_errors("create_task_list", service_type="tasks") # type: ignore -async def create_task_list( +# --- Task list _impl functions --- + + +async def _create_task_list_impl( service: Resource, user_google_email: str, title: str ) -> str: - """ - Create a new task list. - - Args: - user_google_email (str): The user's Google email address. Required. - title (str): The title of the new task list. - - Returns: - str: Confirmation message with the new task list ID and details. - """ + """Implementation for creating a new task list.""" logger.info( f"[create_task_list] Invoked. Email: '{user_google_email}', Title: '{title}'" ) @@ -239,23 +230,10 @@ async def create_task_list( raise Exception(message) -@server.tool() # type: ignore -@require_google_service("tasks", "tasks") # type: ignore -@handle_http_errors("update_task_list", service_type="tasks") # type: ignore -async def update_task_list( +async def _update_task_list_impl( service: Resource, user_google_email: str, task_list_id: str, title: str ) -> str: - """ - Update an existing task list. - - Args: - user_google_email (str): The user's Google email address. Required. - task_list_id (str): The ID of the task list to update. - title (str): The new title for the task list. - - Returns: - str: Confirmation message with updated task list details. - """ + """Implementation for updating an existing task list.""" logger.info( f"[update_task_list] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}, New Title: '{title}'" ) @@ -287,22 +265,10 @@ async def update_task_list( raise Exception(message) -@server.tool() # type: ignore -@require_google_service("tasks", "tasks") # type: ignore -@handle_http_errors("delete_task_list", service_type="tasks") # type: ignore -async def delete_task_list( +async def _delete_task_list_impl( service: Resource, user_google_email: str, task_list_id: str ) -> str: - """ - Delete a task list. Note: This will also delete all tasks in the list. - - Args: - user_google_email (str): The user's Google email address. Required. - task_list_id (str): The ID of the task list to delete. - - Returns: - str: Confirmation message. - """ + """Implementation for deleting a task list.""" logger.info( f"[delete_task_list] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}" ) @@ -327,6 +293,105 @@ async def delete_task_list( raise Exception(message) +async def _clear_completed_tasks_impl( + service: Resource, user_google_email: str, task_list_id: str +) -> str: + """Implementation for clearing completed tasks from a task list.""" + logger.info( + f"[clear_completed_tasks] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}" + ) + + try: + await asyncio.to_thread(service.tasks().clear(tasklist=task_list_id).execute) + + response = f"All completed tasks have been cleared from task list {task_list_id} for {user_google_email}. The tasks are now hidden and won't appear in default task list views." + + logger.info( + f"Cleared completed tasks from list {task_list_id} for {user_google_email}" + ) + return response + + except HttpError as error: + message = _format_reauth_message(error, user_google_email) + logger.error(message, exc_info=True) + raise Exception(message) + except Exception as e: + message = f"Unexpected error: {e}." + logger.exception(message) + raise Exception(message) + + +# --- Consolidated manage_task_list tool --- + + +@server.tool() # type: ignore +@require_google_service("tasks", "tasks") # type: ignore +@handle_http_errors("manage_task_list", service_type="tasks") # type: ignore +async def manage_task_list( + service: Resource, + user_google_email: str, + action: str, + task_list_id: Optional[str] = None, + title: Optional[str] = None, +) -> str: + """ + Manage task lists: create, update, delete, or clear completed tasks. + + Args: + user_google_email (str): The user's Google email address. Required. + action (str): The action to perform. Must be one of: "create", "update", "delete", "clear_completed". + task_list_id (Optional[str]): The ID of the task list. Required for "update", "delete", and "clear_completed" actions. + title (Optional[str]): The title for the task list. Required for "create" and "update" actions. + + Returns: + str: Result of the requested action. + """ + logger.info( + f"[manage_task_list] Invoked. Email: '{user_google_email}', Action: '{action}'" + ) + + valid_actions = ("create", "update", "delete", "clear_completed") + if action not in valid_actions: + raise ValueError( + f"Invalid action '{action}'. Must be one of: {', '.join(valid_actions)}" + ) + + if action == "create": + if not title: + raise ValueError("'title' is required for the 'create' action.") + return await _create_task_list_impl(service, user_google_email, title) + + if action == "update": + if not task_list_id: + raise ValueError("'task_list_id' is required for the 'update' action.") + if not title: + raise ValueError("'title' is required for the 'update' action.") + return await _update_task_list_impl( + service, user_google_email, task_list_id, title + ) + + if action == "delete": + if not task_list_id: + raise ValueError("'task_list_id' is required for the 'delete' action.") + return await _delete_task_list_impl(service, user_google_email, task_list_id) + + # action == "clear_completed" + if not task_list_id: + raise ValueError( + "'task_list_id' is required for the 'clear_completed' action." + ) + return await _clear_completed_tasks_impl(service, user_google_email, task_list_id) + + +# --- Legacy task list tools (wrappers around _impl functions) --- + + + + + + + + @server.tool() # type: ignore @require_google_service("tasks", "tasks_read") # type: ignore @handle_http_errors("list_tasks", service_type="tasks") # type: ignore @@ -633,10 +698,10 @@ async def get_task( raise Exception(message) -@server.tool() # type: ignore -@require_google_service("tasks", "tasks") # type: ignore -@handle_http_errors("create_task", service_type="tasks") # type: ignore -async def create_task( +# --- Task _impl functions --- + + +async def _create_task_impl( service: Resource, user_google_email: str, task_list_id: str, @@ -646,21 +711,7 @@ async def create_task( parent: Optional[str] = None, previous: Optional[str] = None, ) -> str: - """ - Create a new task in a task list. - - Args: - user_google_email (str): The user's Google email address. Required. - task_list_id (str): The ID of the task list to create the task in. - title (str): The title of the task. - notes (Optional[str]): Notes/description for the task. - due (Optional[str]): Due date in RFC 3339 format (e.g., "2024-12-31T23:59:59Z"). - parent (Optional[str]): Parent task ID (for subtasks). - previous (Optional[str]): Previous sibling task ID (for positioning). - - Returns: - str: Confirmation message with the new task ID and details. - """ + """Implementation for creating a new task in a task list.""" logger.info( f"[create_task] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}, Title: '{title}'" ) @@ -708,10 +759,7 @@ async def create_task( raise Exception(message) -@server.tool() # type: ignore -@require_google_service("tasks", "tasks") # type: ignore -@handle_http_errors("update_task", service_type="tasks") # type: ignore -async def update_task( +async def _update_task_impl( service: Resource, user_google_email: str, task_list_id: str, @@ -721,21 +769,7 @@ async def update_task( status: Optional[str] = None, due: Optional[str] = None, ) -> str: - """ - Update an existing task. - - Args: - user_google_email (str): The user's Google email address. Required. - task_list_id (str): The ID of the task list containing the task. - task_id (str): The ID of the task to update. - title (Optional[str]): New title for the task. - notes (Optional[str]): New notes/description for the task. - status (Optional[str]): New status ("needsAction" or "completed"). - due (Optional[str]): New due date in RFC 3339 format. - - Returns: - str: Confirmation message with updated task details. - """ + """Implementation for updating an existing task.""" logger.info( f"[update_task] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}, Task ID: {task_id}" ) @@ -796,23 +830,10 @@ async def update_task( raise Exception(message) -@server.tool() # type: ignore -@require_google_service("tasks", "tasks") # type: ignore -@handle_http_errors("delete_task", service_type="tasks") # type: ignore -async def delete_task( +async def _delete_task_impl( service: Resource, user_google_email: str, task_list_id: str, task_id: str ) -> str: - """ - Delete a task from a task list. - - Args: - user_google_email (str): The user's Google email address. Required. - task_list_id (str): The ID of the task list containing the task. - task_id (str): The ID of the task to delete. - - Returns: - str: Confirmation message. - """ + """Implementation for deleting a task from a task list.""" logger.info( f"[delete_task] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}, Task ID: {task_id}" ) @@ -837,10 +858,7 @@ async def delete_task( raise Exception(message) -@server.tool() # type: ignore -@require_google_service("tasks", "tasks") # type: ignore -@handle_http_errors("move_task", service_type="tasks") # type: ignore -async def move_task( +async def _move_task_impl( service: Resource, user_google_email: str, task_list_id: str, @@ -849,20 +867,7 @@ async def move_task( previous: Optional[str] = None, destination_task_list: Optional[str] = None, ) -> str: - """ - Move a task to a different position or parent within the same list, or to a different list. - - Args: - user_google_email (str): The user's Google email address. Required. - task_list_id (str): The ID of the current task list containing the task. - task_id (str): The ID of the task to move. - parent (Optional[str]): New parent task ID (for making it a subtask). - previous (Optional[str]): Previous sibling task ID (for positioning). - destination_task_list (Optional[str]): Destination task list ID (for moving between lists). - - Returns: - str: Confirmation message with updated task details. - """ + """Implementation for moving a task to a different position, parent, or list.""" logger.info( f"[move_task] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}, Task ID: {task_id}" ) @@ -913,41 +918,112 @@ async def move_task( raise Exception(message) +# --- Consolidated manage_task tool --- + + @server.tool() # type: ignore @require_google_service("tasks", "tasks") # type: ignore -@handle_http_errors("clear_completed_tasks", service_type="tasks") # type: ignore -async def clear_completed_tasks( - service: Resource, user_google_email: str, task_list_id: str +@handle_http_errors("manage_task", service_type="tasks") # type: ignore +async def manage_task( + service: Resource, + user_google_email: str, + action: str, + task_list_id: str, + task_id: Optional[str] = None, + title: Optional[str] = None, + notes: Optional[str] = None, + status: Optional[str] = None, + due: Optional[str] = None, + parent: Optional[str] = None, + previous: Optional[str] = None, + destination_task_list: Optional[str] = None, ) -> str: """ - Clear all completed tasks from a task list. The tasks will be marked as hidden. + Manage tasks: create, update, delete, or move tasks within task lists. Args: user_google_email (str): The user's Google email address. Required. - task_list_id (str): The ID of the task list to clear completed tasks from. + action (str): The action to perform. Must be one of: "create", "update", "delete", "move". + task_list_id (str): The ID of the task list. Required for all actions. + task_id (Optional[str]): The ID of the task. Required for "update", "delete", and "move" actions. + title (Optional[str]): The title of the task. Required for "create", optional for "update". + notes (Optional[str]): Notes/description for the task. Used by "create" and "update" actions. + status (Optional[str]): Task status ("needsAction" or "completed"). Used by "update" action. + due (Optional[str]): Due date in RFC 3339 format (e.g., "2024-12-31T23:59:59Z"). Used by "create" and "update" actions. + parent (Optional[str]): Parent task ID (for subtasks). Used by "create" and "move" actions. + previous (Optional[str]): Previous sibling task ID (for positioning). Used by "create" and "move" actions. + destination_task_list (Optional[str]): Destination task list ID (for moving between lists). Used by "move" action. Returns: - str: Confirmation message. + str: Result of the requested action. """ logger.info( - f"[clear_completed_tasks] Invoked. Email: '{user_google_email}', Task List ID: {task_list_id}" + f"[manage_task] Invoked. Email: '{user_google_email}', Action: '{action}', Task List ID: {task_list_id}" ) - try: - await asyncio.to_thread(service.tasks().clear(tasklist=task_list_id).execute) - - response = f"All completed tasks have been cleared from task list {task_list_id} for {user_google_email}. The tasks are now hidden and won't appear in default task list views." - - logger.info( - f"Cleared completed tasks from list {task_list_id} for {user_google_email}" + valid_actions = ("create", "update", "delete", "move") + if action not in valid_actions: + raise ValueError( + f"Invalid action '{action}'. Must be one of: {', '.join(valid_actions)}" ) - return response - except HttpError as error: - message = _format_reauth_message(error, user_google_email) - logger.error(message, exc_info=True) - raise Exception(message) - except Exception as e: - message = f"Unexpected error: {e}." - logger.exception(message) - raise Exception(message) + if action == "create": + if not title: + raise ValueError("'title' is required for the 'create' action.") + return await _create_task_impl( + service, + user_google_email, + task_list_id, + title, + notes=notes, + due=due, + parent=parent, + previous=previous, + ) + + if action == "update": + if not task_id: + raise ValueError("'task_id' is required for the 'update' action.") + return await _update_task_impl( + service, + user_google_email, + task_list_id, + task_id, + title=title, + notes=notes, + status=status, + due=due, + ) + + if action == "delete": + if not task_id: + raise ValueError("'task_id' is required for the 'delete' action.") + return await _delete_task_impl( + service, user_google_email, task_list_id, task_id + ) + + # action == "move" + if not task_id: + raise ValueError("'task_id' is required for the 'move' action.") + return await _move_task_impl( + service, + user_google_email, + task_list_id, + task_id, + parent=parent, + previous=previous, + destination_task_list=destination_task_list, + ) + + +# --- Legacy task tools (wrappers around _impl functions) --- + + + + + + + + + +