refactor tools to consolidate all modify actions

This commit is contained in:
Taylor Wilsdon
2026-03-01 12:36:09 -05:00
parent 7cd46d72c7
commit 6753531e9d
12 changed files with 1712 additions and 1924 deletions

View File

@@ -1774,367 +1774,372 @@ async def get_drive_shareable_link(
@server.tool()
@handle_http_errors("share_drive_file", is_read_only=False, service_type="drive")
@handle_http_errors("manage_drive_access", is_read_only=False, service_type="drive")
@require_google_service("drive", "drive_file")
async def share_drive_file(
async def manage_drive_access(
service,
user_google_email: str,
file_id: str,
action: str,
share_with: Optional[str] = None,
role: str = "reader",
role: Optional[str] = None,
share_type: str = "user",
permission_id: Optional[str] = None,
recipients: Optional[List[Dict[str, Any]]] = None,
send_notification: bool = True,
email_message: Optional[str] = None,
expiration_time: Optional[str] = None,
allow_file_discovery: Optional[bool] = None,
new_owner_email: Optional[str] = None,
move_to_new_owners_root: bool = False,
) -> str:
"""
Shares a Google Drive file or folder with a user, group, domain, or anyone with the link.
Consolidated tool for managing Google Drive file and folder access permissions.
When sharing a folder, all files inside inherit the permission.
Supports granting, batch-granting, updating, revoking permissions, and
transferring file ownership -- all through a single entry point.
Args:
user_google_email (str): The user's Google email address. Required.
file_id (str): The ID of the file or folder to share. Required.
share_with (Optional[str]): Email address (for user/group), domain name (for domain), or omit for 'anyone'.
role (str): Permission role - 'reader', 'commenter', or 'writer'. Defaults to 'reader'.
share_type (str): Type of sharing - 'user', 'group', 'domain', or 'anyone'. Defaults to 'user'.
send_notification (bool): Whether to send a notification email. Defaults to True.
email_message (Optional[str]): Custom message for the notification email.
expiration_time (Optional[str]): Expiration time in RFC 3339 format (e.g., "2025-01-15T00:00:00Z"). Permission auto-revokes after this time.
allow_file_discovery (Optional[bool]): For 'domain' or 'anyone' shares - whether the file can be found via search. Defaults to None (API default).
Returns:
str: Confirmation with permission details and shareable link.
"""
logger.info(
f"[share_drive_file] Invoked. Email: '{user_google_email}', File ID: '{file_id}', Share with: '{share_with}', Role: '{role}', Type: '{share_type}'"
)
validate_share_role(role)
validate_share_type(share_type)
if share_type in ("user", "group") and not share_with:
raise ValueError(f"share_with is required for share_type '{share_type}'")
if share_type == "domain" and not share_with:
raise ValueError("share_with (domain name) is required for share_type 'domain'")
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name, webViewLink"
)
file_id = resolved_file_id
permission_body = {
"type": share_type,
"role": role,
}
if share_type in ("user", "group"):
permission_body["emailAddress"] = share_with
elif share_type == "domain":
permission_body["domain"] = share_with
if expiration_time:
validate_expiration_time(expiration_time)
permission_body["expirationTime"] = expiration_time
if share_type in ("domain", "anyone") and allow_file_discovery is not None:
permission_body["allowFileDiscovery"] = allow_file_discovery
create_params = {
"fileId": file_id,
"body": permission_body,
"supportsAllDrives": True,
"fields": "id, type, role, emailAddress, domain, expirationTime",
}
if share_type in ("user", "group"):
create_params["sendNotificationEmail"] = send_notification
if email_message:
create_params["emailMessage"] = email_message
created_permission = await asyncio.to_thread(
service.permissions().create(**create_params).execute
)
output_parts = [
f"Successfully shared '{file_metadata.get('name', 'Unknown')}'",
"",
"Permission created:",
f" - {format_permission_info(created_permission)}",
"",
f"View link: {file_metadata.get('webViewLink', 'N/A')}",
]
return "\n".join(output_parts)
@server.tool()
@handle_http_errors("batch_share_drive_file", is_read_only=False, service_type="drive")
@require_google_service("drive", "drive_file")
async def batch_share_drive_file(
service,
user_google_email: str,
file_id: str,
recipients: List[Dict[str, Any]],
send_notification: bool = True,
email_message: Optional[str] = None,
) -> str:
"""
Shares a Google Drive file or folder with multiple users or groups in a single operation.
Each recipient can have a different role and optional expiration time.
Note: Each recipient is processed sequentially. For very large recipient lists,
consider splitting into multiple calls.
Args:
user_google_email (str): The user's Google email address. Required.
file_id (str): The ID of the file or folder to share. Required.
recipients (List[Dict]): List of recipient objects. Each should have:
- email (str): Recipient email address. Required for 'user' or 'group' share_type.
- role (str): Permission role - 'reader', 'commenter', or 'writer'. Defaults to 'reader'.
- share_type (str, optional): 'user', 'group', or 'domain'. Defaults to 'user'.
- expiration_time (str, optional): Expiration in RFC 3339 format (e.g., "2025-01-15T00:00:00Z").
For domain shares, use 'domain' field instead of 'email':
- domain (str): Domain name. Required when share_type is 'domain'.
file_id (str): The ID of the file or folder. Required.
action (str): The access management action to perform. Required. One of:
- "grant": Share with a single user, group, domain, or anyone.
- "grant_batch": Share with multiple recipients in one call.
- "update": Modify an existing permission (role or expiration).
- "revoke": Remove an existing permission.
- "transfer_owner": Transfer file ownership to another user.
share_with (Optional[str]): Email address (user/group), domain name (domain),
or omit for 'anyone'. Used by "grant".
role (Optional[str]): Permission role -- 'reader', 'commenter', or 'writer'.
Used by "grant" (defaults to 'reader') and "update".
share_type (str): Type of sharing -- 'user', 'group', 'domain', or 'anyone'.
Used by "grant". Defaults to 'user'.
permission_id (Optional[str]): The permission ID to modify or remove.
Required for "update" and "revoke" actions.
recipients (Optional[List[Dict[str, Any]]]): List of recipient objects for
"grant_batch". Each should have: email (str), role (str, optional),
share_type (str, optional), expiration_time (str, optional). For domain
shares use 'domain' field instead of 'email'.
send_notification (bool): Whether to send notification emails. Defaults to True.
email_message (Optional[str]): Custom message for notification emails.
Used by "grant" and "grant_batch".
email_message (Optional[str]): Custom notification email message.
Used by "grant" and "grant_batch".
expiration_time (Optional[str]): Expiration in RFC 3339 format
(e.g., "2025-01-15T00:00:00Z"). Used by "grant" and "update".
allow_file_discovery (Optional[bool]): For 'domain'/'anyone' shares, whether
the file appears in search. Used by "grant".
new_owner_email (Optional[str]): Email of the new owner.
Required for "transfer_owner".
move_to_new_owners_root (bool): Move file to the new owner's My Drive root.
Defaults to False. Used by "transfer_owner".
Returns:
str: Summary of created permissions with success/failure for each recipient.
str: Confirmation with details of the permission change applied.
"""
valid_actions = ("grant", "grant_batch", "update", "revoke", "transfer_owner")
if action not in valid_actions:
raise ValueError(
f"Invalid action '{action}'. Must be one of: {', '.join(valid_actions)}"
)
logger.info(
f"[batch_share_drive_file] Invoked. Email: '{user_google_email}', File ID: '{file_id}', Recipients: {len(recipients)}"
f"[manage_drive_access] Invoked. Email: '{user_google_email}', "
f"File ID: '{file_id}', Action: '{action}'"
)
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name, webViewLink"
)
file_id = resolved_file_id
# --- grant: share with a single recipient ---
if action == "grant":
effective_role = role or "reader"
validate_share_role(effective_role)
validate_share_type(share_type)
if not recipients:
raise ValueError("recipients list cannot be empty")
if share_type in ("user", "group") and not share_with:
raise ValueError(
f"share_with is required for share_type '{share_type}'"
)
if share_type == "domain" and not share_with:
raise ValueError(
"share_with (domain name) is required for share_type 'domain'"
)
results = []
success_count = 0
failure_count = 0
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name, webViewLink"
)
file_id = resolved_file_id
for recipient in recipients:
share_type = recipient.get("share_type", "user")
if share_type == "domain":
domain = recipient.get("domain")
if not domain:
results.append(" - Skipped: missing domain for domain share")
failure_count += 1
continue
identifier = domain
else:
email = recipient.get("email")
if not email:
results.append(" - Skipped: missing email address")
failure_count += 1
continue
identifier = email
role = recipient.get("role", "reader")
try:
validate_share_role(role)
except ValueError as e:
results.append(f" - {identifier}: Failed - {e}")
failure_count += 1
continue
try:
validate_share_type(share_type)
except ValueError as e:
results.append(f" - {identifier}: Failed - {e}")
failure_count += 1
continue
permission_body = {
permission_body: Dict[str, Any] = {
"type": share_type,
"role": role,
"role": effective_role,
}
if share_type in ("user", "group"):
permission_body["emailAddress"] = share_with
elif share_type == "domain":
permission_body["domain"] = share_with
if share_type == "domain":
permission_body["domain"] = identifier
else:
permission_body["emailAddress"] = identifier
if expiration_time:
validate_expiration_time(expiration_time)
permission_body["expirationTime"] = expiration_time
if recipient.get("expiration_time"):
try:
validate_expiration_time(recipient["expiration_time"])
permission_body["expirationTime"] = recipient["expiration_time"]
except ValueError as e:
results.append(f" - {identifier}: Failed - {e}")
failure_count += 1
continue
if share_type in ("domain", "anyone") and allow_file_discovery is not None:
permission_body["allowFileDiscovery"] = allow_file_discovery
create_params = {
create_params: Dict[str, Any] = {
"fileId": file_id,
"body": permission_body,
"supportsAllDrives": True,
"fields": "id, type, role, emailAddress, domain, expirationTime",
}
if share_type in ("user", "group"):
create_params["sendNotificationEmail"] = send_notification
if email_message:
create_params["emailMessage"] = email_message
try:
created_permission = await asyncio.to_thread(
service.permissions().create(**create_params).execute
)
results.append(f" - {format_permission_info(created_permission)}")
success_count += 1
except HttpError as e:
results.append(f" - {identifier}: Failed - {str(e)}")
failure_count += 1
created_permission = await asyncio.to_thread(
service.permissions().create(**create_params).execute
)
output_parts = [
f"Batch share results for '{file_metadata.get('name', 'Unknown')}'",
"",
f"Summary: {success_count} succeeded, {failure_count} failed",
"",
"Results:",
]
output_parts.extend(results)
output_parts.extend(
[
return "\n".join([
f"Successfully shared '{file_metadata.get('name', 'Unknown')}'",
"",
"Permission created:",
f" - {format_permission_info(created_permission)}",
"",
f"View link: {file_metadata.get('webViewLink', 'N/A')}",
])
# --- grant_batch: share with multiple recipients ---
if action == "grant_batch":
if not recipients:
raise ValueError("recipients list is required for 'grant_batch' action")
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name, webViewLink"
)
file_id = resolved_file_id
results: List[str] = []
success_count = 0
failure_count = 0
for recipient in recipients:
r_share_type = recipient.get("share_type", "user")
if r_share_type == "domain":
domain = recipient.get("domain")
if not domain:
results.append(" - Skipped: missing domain for domain share")
failure_count += 1
continue
identifier = domain
else:
r_email = recipient.get("email")
if not r_email:
results.append(" - Skipped: missing email address")
failure_count += 1
continue
identifier = r_email
r_role = recipient.get("role", "reader")
try:
validate_share_role(r_role)
except ValueError as e:
results.append(f" - {identifier}: Failed - {e}")
failure_count += 1
continue
try:
validate_share_type(r_share_type)
except ValueError as e:
results.append(f" - {identifier}: Failed - {e}")
failure_count += 1
continue
r_perm_body: Dict[str, Any] = {
"type": r_share_type,
"role": r_role,
}
if r_share_type == "domain":
r_perm_body["domain"] = identifier
else:
r_perm_body["emailAddress"] = identifier
if recipient.get("expiration_time"):
try:
validate_expiration_time(recipient["expiration_time"])
r_perm_body["expirationTime"] = recipient["expiration_time"]
except ValueError as e:
results.append(f" - {identifier}: Failed - {e}")
failure_count += 1
continue
r_create_params: Dict[str, Any] = {
"fileId": file_id,
"body": r_perm_body,
"supportsAllDrives": True,
"fields": "id, type, role, emailAddress, domain, expirationTime",
}
if r_share_type in ("user", "group"):
r_create_params["sendNotificationEmail"] = send_notification
if email_message:
r_create_params["emailMessage"] = email_message
try:
created_perm = await asyncio.to_thread(
service.permissions().create(**r_create_params).execute
)
results.append(f" - {format_permission_info(created_perm)}")
success_count += 1
except HttpError as e:
results.append(f" - {identifier}: Failed - {str(e)}")
failure_count += 1
output_parts = [
f"Batch share results for '{file_metadata.get('name', 'Unknown')}'",
"",
f"Summary: {success_count} succeeded, {failure_count} failed",
"",
"Results:",
]
)
output_parts.extend(results)
output_parts.extend([
"",
f"View link: {file_metadata.get('webViewLink', 'N/A')}",
])
return "\n".join(output_parts)
return "\n".join(output_parts)
# --- update: modify an existing permission ---
if action == "update":
if not permission_id:
raise ValueError("permission_id is required for 'update' action")
if not role and not expiration_time:
raise ValueError(
"Must provide at least one of: role, expiration_time for 'update' action"
)
if role:
validate_share_role(role)
if expiration_time:
validate_expiration_time(expiration_time)
@server.tool()
@handle_http_errors("update_drive_permission", is_read_only=False, service_type="drive")
@require_google_service("drive", "drive_file")
async def update_drive_permission(
service,
user_google_email: str,
file_id: str,
permission_id: str,
role: Optional[str] = None,
expiration_time: Optional[str] = None,
) -> str:
"""
Updates an existing permission on a Google Drive file or folder.
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name"
)
file_id = resolved_file_id
Args:
user_google_email (str): The user's Google email address. Required.
file_id (str): The ID of the file or folder. Required.
permission_id (str): The ID of the permission to update (from get_drive_file_permissions). Required.
role (Optional[str]): New role - 'reader', 'commenter', or 'writer'. If not provided, role unchanged.
expiration_time (Optional[str]): Expiration time in RFC 3339 format (e.g., "2025-01-15T00:00:00Z"). Set or update when permission expires.
effective_role = role
if not effective_role:
current_permission = await asyncio.to_thread(
service.permissions()
.get(
fileId=file_id,
permissionId=permission_id,
supportsAllDrives=True,
fields="role",
)
.execute
)
effective_role = current_permission.get("role")
Returns:
str: Confirmation with updated permission details.
"""
logger.info(
f"[update_drive_permission] Invoked. Email: '{user_google_email}', File ID: '{file_id}', Permission ID: '{permission_id}', Role: '{role}'"
)
update_body: Dict[str, Any] = {"role": effective_role}
if expiration_time:
update_body["expirationTime"] = expiration_time
if not role and not expiration_time:
raise ValueError("Must provide at least one of: role, expiration_time")
if role:
validate_share_role(role)
if expiration_time:
validate_expiration_time(expiration_time)
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name"
)
file_id = resolved_file_id
# Google API requires role in update body, so fetch current if not provided
if not role:
current_permission = await asyncio.to_thread(
updated_permission = await asyncio.to_thread(
service.permissions()
.get(
.update(
fileId=file_id,
permissionId=permission_id,
body=update_body,
supportsAllDrives=True,
fields="role",
fields="id, type, role, emailAddress, domain, expirationTime",
)
.execute
)
role = current_permission.get("role")
update_body = {"role": role}
if expiration_time:
update_body["expirationTime"] = expiration_time
return "\n".join([
f"Successfully updated permission on '{file_metadata.get('name', 'Unknown')}'",
"",
"Updated permission:",
f" - {format_permission_info(updated_permission)}",
])
updated_permission = await asyncio.to_thread(
# --- revoke: remove an existing permission ---
if action == "revoke":
if not permission_id:
raise ValueError("permission_id is required for 'revoke' action")
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name"
)
file_id = resolved_file_id
await asyncio.to_thread(
service.permissions()
.delete(
fileId=file_id,
permissionId=permission_id,
supportsAllDrives=True,
)
.execute
)
return "\n".join([
f"Successfully removed permission from '{file_metadata.get('name', 'Unknown')}'",
"",
f"Permission ID '{permission_id}' has been revoked.",
])
# --- transfer_owner: transfer file ownership ---
# action == "transfer_owner"
if not new_owner_email:
raise ValueError("new_owner_email is required for 'transfer_owner' action")
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name, owners"
)
file_id = resolved_file_id
current_owners = file_metadata.get("owners", [])
current_owner_emails = [o.get("emailAddress", "") for o in current_owners]
transfer_body: Dict[str, Any] = {
"type": "user",
"role": "owner",
"emailAddress": new_owner_email,
}
await asyncio.to_thread(
service.permissions()
.update(
.create(
fileId=file_id,
permissionId=permission_id,
body=update_body,
body=transfer_body,
transferOwnership=True,
moveToNewOwnersRoot=move_to_new_owners_root,
supportsAllDrives=True,
fields="id, type, role, emailAddress, domain, expirationTime",
fields="id, type, role, emailAddress",
)
.execute
)
output_parts = [
f"Successfully updated permission on '{file_metadata.get('name', 'Unknown')}'",
f"Successfully transferred ownership of '{file_metadata.get('name', 'Unknown')}'",
"",
"Updated permission:",
f" - {format_permission_info(updated_permission)}",
f"New owner: {new_owner_email}",
f"Previous owner(s): {', '.join(current_owner_emails) or 'Unknown'}",
]
if move_to_new_owners_root:
output_parts.append(f"File moved to {new_owner_email}'s My Drive root.")
output_parts.extend(["", "Note: Previous owner now has editor access."])
return "\n".join(output_parts)
@server.tool()
@handle_http_errors("remove_drive_permission", is_read_only=False, service_type="drive")
@require_google_service("drive", "drive_file")
async def remove_drive_permission(
service,
user_google_email: str,
file_id: str,
permission_id: str,
) -> str:
"""
Removes a permission from a Google Drive file or folder, revoking access.
Args:
user_google_email (str): The user's Google email address. Required.
file_id (str): The ID of the file or folder. Required.
permission_id (str): The ID of the permission to remove (from get_drive_file_permissions). Required.
Returns:
str: Confirmation of the removed permission.
"""
logger.info(
f"[remove_drive_permission] Invoked. Email: '{user_google_email}', File ID: '{file_id}', Permission ID: '{permission_id}'"
)
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name"
)
file_id = resolved_file_id
await asyncio.to_thread(
service.permissions()
.delete(fileId=file_id, permissionId=permission_id, supportsAllDrives=True)
.execute
)
output_parts = [
f"Successfully removed permission from '{file_metadata.get('name', 'Unknown')}'",
"",
f"Permission ID '{permission_id}' has been revoked.",
]
return "\n".join(output_parts)
@server.tool()
@@ -2209,77 +2214,6 @@ async def copy_drive_file(
return "\n".join(output_parts)
@server.tool()
@handle_http_errors(
"transfer_drive_ownership", is_read_only=False, service_type="drive"
)
@require_google_service("drive", "drive_file")
async def transfer_drive_ownership(
service,
user_google_email: str,
file_id: str,
new_owner_email: str,
move_to_new_owners_root: bool = False,
) -> str:
"""
Transfers ownership of a Google Drive file or folder to another user.
This is an irreversible operation. The current owner will become an editor.
Only works within the same Google Workspace domain or for personal accounts.
Args:
user_google_email (str): The user's Google email address. Required.
file_id (str): The ID of the file or folder to transfer. Required.
new_owner_email (str): Email address of the new owner. Required.
move_to_new_owners_root (bool): If True, moves the file to the new owner's My Drive root. Defaults to False.
Returns:
str: Confirmation of the ownership transfer.
"""
logger.info(
f"[transfer_drive_ownership] Invoked. Email: '{user_google_email}', File ID: '{file_id}', New owner: '{new_owner_email}'"
)
resolved_file_id, file_metadata = await resolve_drive_item(
service, file_id, extra_fields="name, owners"
)
file_id = resolved_file_id
current_owners = file_metadata.get("owners", [])
current_owner_emails = [o.get("emailAddress", "") for o in current_owners]
permission_body = {
"type": "user",
"role": "owner",
"emailAddress": new_owner_email,
}
await asyncio.to_thread(
service.permissions()
.create(
fileId=file_id,
body=permission_body,
transferOwnership=True,
moveToNewOwnersRoot=move_to_new_owners_root,
supportsAllDrives=True,
fields="id, type, role, emailAddress",
)
.execute
)
output_parts = [
f"Successfully transferred ownership of '{file_metadata.get('name', 'Unknown')}'",
"",
f"New owner: {new_owner_email}",
f"Previous owner(s): {', '.join(current_owner_emails) or 'Unknown'}",
]
if move_to_new_owners_root:
output_parts.append(f"File moved to {new_owner_email}'s My Drive root.")
output_parts.extend(["", "Note: Previous owner now has editor access."])
return "\n".join(output_parts)
@server.tool()