Handle Drive shortcuts transparently

This commit is contained in:
Kevin Littlejohn
2025-11-11 16:43:00 +08:00
parent 973f921c32
commit 55dfba3d83
3 changed files with 367 additions and 22 deletions

View File

@@ -20,7 +20,12 @@ from auth.oauth_config import is_stateless_mode
from core.utils import extract_office_xml_text, handle_http_errors
from core.server import server
from core.config import get_transport_mode
from gdrive.drive_helpers import DRIVE_QUERY_PATTERNS, build_drive_list_params
from gdrive.drive_helpers import (
DRIVE_QUERY_PATTERNS,
build_drive_list_params,
resolve_drive_item,
resolve_folder_id,
)
logger = logging.getLogger(__name__)
@@ -119,11 +124,12 @@ async def get_drive_file_content(
"""
logger.info(f"[get_drive_file_content] Invoked. File ID: '{file_id}'")
file_metadata = await asyncio.to_thread(
service.files().get(
fileId=file_id, fields="id, name, mimeType, webViewLink", supportsAllDrives=True
).execute
resolved_file_id, file_metadata = await resolve_drive_item(
service,
file_id,
extra_fields="name, webViewLink",
)
file_id = resolved_file_id
mime_type = file_metadata.get("mimeType", "")
file_name = file_metadata.get("name", "Unknown File")
export_mime_type = {
@@ -133,9 +139,9 @@ async def get_drive_file_content(
}.get(mime_type)
request_obj = (
service.files().export_media(fileId=file_id, mimeType=export_mime_type, supportsAllDrives=True)
service.files().export_media(fileId=file_id, mimeType=export_mime_type)
if export_mime_type
else service.files().get_media(fileId=file_id, supportsAllDrives=True)
else service.files().get_media(fileId=file_id)
)
fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request_obj)
@@ -214,7 +220,8 @@ async def list_drive_items(
"""
logger.info(f"[list_drive_items] Invoked. Email: '{user_google_email}', Folder ID: '{folder_id}'")
final_query = f"'{folder_id}' in parents and trashed=false"
resolved_folder_id = await resolve_folder_id(service, folder_id)
final_query = f"'{resolved_folder_id}' in parents and trashed=false"
list_params = build_drive_list_params(
query=final_query,
@@ -273,10 +280,11 @@ async def create_drive_file(
raise Exception("You must provide either 'content' or 'fileUrl'.")
file_data = None
resolved_folder_id = await resolve_folder_id(service, folder_id)
file_metadata = {
'name': file_name,
'parents': [folder_id],
'parents': [resolved_folder_id],
'mimeType': mime_type
}
@@ -449,6 +457,9 @@ async def get_drive_file_permissions(
str: Detailed file metadata including sharing status and URLs.
"""
logger.info(f"[get_drive_file_permissions] Checking file {file_id} for {user_google_email}")
resolved_file_id, _ = await resolve_drive_item(service, file_id)
file_id = resolved_file_id
try:
# Get comprehensive file metadata including permissions
@@ -589,6 +600,8 @@ async def check_drive_file_public_access(
# Check permissions for the first file
file_id = files[0]['id']
resolved_file_id, _ = await resolve_drive_item(service, file_id)
file_id = resolved_file_id
# Get detailed permissions
file_metadata = await asyncio.to_thread(
@@ -674,14 +687,16 @@ async def update_drive_file(
"""
logger.info(f"[update_drive_file] Updating file {file_id} for {user_google_email}")
# First, get current file info for reference
current_file = await asyncio.to_thread(
service.files().get(
fileId=file_id,
fields="id, name, description, mimeType, parents, starred, trashed, webViewLink",
supportsAllDrives=True
).execute
current_file_fields = (
"name, description, mimeType, parents, starred, trashed, webViewLink, "
"writersCanShare, copyRequiresWriterPermission, properties"
)
resolved_file_id, current_file = await resolve_drive_item(
service,
file_id,
extra_fields=current_file_fields,
)
file_id = resolved_file_id
# Build the update body with only specified fields
update_body = {}
@@ -702,6 +717,22 @@ async def update_drive_file(
if properties is not None:
update_body['properties'] = properties
async def _resolve_parent_arguments(parent_arg: Optional[str]) -> Optional[str]:
if not parent_arg:
return None
parent_ids = [part.strip() for part in parent_arg.split(",") if part.strip()]
if not parent_ids:
return None
resolved_ids = []
for parent in parent_ids:
resolved_parent = await resolve_folder_id(service, parent)
resolved_ids.append(resolved_parent)
return ",".join(resolved_ids)
resolved_add_parents = await _resolve_parent_arguments(add_parents)
resolved_remove_parents = await _resolve_parent_arguments(remove_parents)
# Build query parameters for parent changes
query_params = {
'fileId': file_id,
@@ -709,10 +740,10 @@ async def update_drive_file(
'fields': 'id, name, description, mimeType, parents, starred, trashed, webViewLink, writersCanShare, copyRequiresWriterPermission, properties'
}
if add_parents:
query_params['addParents'] = add_parents
if remove_parents:
query_params['removeParents'] = remove_parents
if resolved_add_parents:
query_params['addParents'] = resolved_add_parents
if resolved_remove_parents:
query_params['removeParents'] = resolved_remove_parents
# Only include body if there are updates
if update_body: