Merge pull request #539 from slowpoison/feature_doc_tab

Add support for GDoc Tabs
This commit is contained in:
Taylor Wilsdon
2026-03-03 17:47:45 -05:00
committed by GitHub
4 changed files with 500 additions and 71 deletions

View File

@@ -184,22 +184,28 @@ def build_paragraph_style(
return paragraph_style, fields
def create_insert_text_request(index: int, text: str) -> Dict[str, Any]:
def create_insert_text_request(
index: int, text: str, tab_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Create an insertText request for Google Docs API.
Args:
index: Position to insert text
text: Text to insert
tab_id: Optional ID of the tab to target
Returns:
Dictionary representing the insertText request
"""
return {"insertText": {"location": {"index": index}, "text": text}}
location = {"index": index}
if tab_id:
location["tabId"] = tab_id
return {"insertText": {"location": location, "text": text}}
def create_insert_text_segment_request(
index: int, text: str, segment_id: str
index: int, text: str, segment_id: str, tab_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Create an insertText request for Google Docs API with segmentId (for headers/footers).
@@ -208,34 +214,40 @@ def create_insert_text_segment_request(
index: Position to insert text
text: Text to insert
segment_id: Segment ID (for targeting headers/footers)
tab_id: Optional ID of the tab to target
Returns:
Dictionary representing the insertText request with segmentId
Dictionary representing the insertText request with segmentId and optional tabId
"""
location = {"segmentId": segment_id, "index": index}
if tab_id:
location["tabId"] = tab_id
return {
"insertText": {
"location": {"segmentId": segment_id, "index": index},
"location": location,
"text": text,
}
}
def create_delete_range_request(start_index: int, end_index: int) -> Dict[str, Any]:
def create_delete_range_request(
start_index: int, end_index: int, tab_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Create a deleteContentRange request for Google Docs API.
Args:
start_index: Start position of content to delete
end_index: End position of content to delete
tab_id: Optional ID of the tab to target
Returns:
Dictionary representing the deleteContentRange request
"""
return {
"deleteContentRange": {
"range": {"startIndex": start_index, "endIndex": end_index}
}
}
range_obj = {"startIndex": start_index, "endIndex": end_index}
if tab_id:
range_obj["tabId"] = tab_id
return {"deleteContentRange": {"range": range_obj}}
def create_format_text_request(
@@ -249,6 +261,7 @@ def create_format_text_request(
text_color: str = None,
background_color: str = None,
link_url: str = None,
tab_id: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""
Create an updateTextStyle request for Google Docs API.
@@ -264,6 +277,7 @@ def create_format_text_request(
text_color: Text color as hex string "#RRGGBB"
background_color: Background (highlight) color as hex string "#RRGGBB"
link_url: Hyperlink URL (http/https)
tab_id: Optional ID of the tab to target
Returns:
Dictionary representing the updateTextStyle request, or None if no styles provided
@@ -282,9 +296,13 @@ def create_format_text_request(
if not text_style:
return None
range_obj = {"startIndex": start_index, "endIndex": end_index}
if tab_id:
range_obj["tabId"] = tab_id
return {
"updateTextStyle": {
"range": {"startIndex": start_index, "endIndex": end_index},
"range": range_obj,
"textStyle": text_style,
"fields": ",".join(fields),
}
@@ -302,6 +320,7 @@ def create_update_paragraph_style_request(
indent_end: float = None,
space_above: float = None,
space_below: float = None,
tab_id: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""
Create an updateParagraphStyle request for Google Docs API.
@@ -317,6 +336,7 @@ def create_update_paragraph_style_request(
indent_end: Right/end indent in points
space_above: Space above paragraph in points
space_below: Space below paragraph in points
tab_id: Optional ID of the tab to target
Returns:
Dictionary representing the updateParagraphStyle request, or None if no styles provided
@@ -335,9 +355,13 @@ def create_update_paragraph_style_request(
if not paragraph_style:
return None
range_obj = {"startIndex": start_index, "endIndex": end_index}
if tab_id:
range_obj["tabId"] = tab_id
return {
"updateParagraphStyle": {
"range": {"startIndex": start_index, "endIndex": end_index},
"range": range_obj,
"paragraphStyle": paragraph_style,
"fields": ",".join(fields),
}
@@ -345,7 +369,10 @@ def create_update_paragraph_style_request(
def create_find_replace_request(
find_text: str, replace_text: str, match_case: bool = False
find_text: str,
replace_text: str,
match_case: bool = False,
tab_id: Optional[str] = None,
) -> Dict[str, Any]:
"""
Create a replaceAllText request for Google Docs API.
@@ -354,19 +381,25 @@ def create_find_replace_request(
find_text: Text to find
replace_text: Text to replace with
match_case: Whether to match case exactly
tab_id: Optional ID of the tab to target
Returns:
Dictionary representing the replaceAllText request
"""
return {
request = {
"replaceAllText": {
"containsText": {"text": find_text, "matchCase": match_case},
"replaceText": replace_text,
}
}
if tab_id:
request["replaceAllText"]["tabsCriteria"] = {"tabIds": [tab_id]}
return request
def create_insert_table_request(index: int, rows: int, columns: int) -> Dict[str, Any]:
def create_insert_table_request(
index: int, rows: int, columns: int, tab_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Create an insertTable request for Google Docs API.
@@ -374,30 +407,104 @@ def create_insert_table_request(index: int, rows: int, columns: int) -> Dict[str
index: Position to insert table
rows: Number of rows
columns: Number of columns
tab_id: Optional ID of the tab to target
Returns:
Dictionary representing the insertTable request
"""
return {
"insertTable": {"location": {"index": index}, "rows": rows, "columns": columns}
}
location = {"index": index}
if tab_id:
location["tabId"] = tab_id
return {"insertTable": {"location": location, "rows": rows, "columns": columns}}
def create_insert_page_break_request(index: int) -> Dict[str, Any]:
def create_insert_page_break_request(
index: int, tab_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Create an insertPageBreak request for Google Docs API.
Args:
index: Position to insert page break
tab_id: Optional ID of the tab to target
Returns:
Dictionary representing the insertPageBreak request
"""
return {"insertPageBreak": {"location": {"index": index}}}
location = {"index": index}
if tab_id:
location["tabId"] = tab_id
return {"insertPageBreak": {"location": location}}
def create_insert_doc_tab_request(
title: str, index: int, parent_tab_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Create an addDocumentTab request for Google Docs API.
Args:
title: Title of the new tab
index: Position to insert the tab
parent_tab_id: Optional ID of the parent tab to nest under
Returns:
Dictionary representing the addDocumentTab request
"""
tab_properties: Dict[str, Any] = {
"title": title,
"index": index,
}
if parent_tab_id:
tab_properties["parentTabId"] = parent_tab_id
return {
"addDocumentTab": {
"tabProperties": tab_properties,
}
}
def create_delete_doc_tab_request(tab_id: str) -> Dict[str, Any]:
"""
Create a deleteDocumentTab request for Google Docs API.
Args:
tab_id: ID of the tab to delete
Returns:
Dictionary representing the deleteDocumentTab request
"""
return {"deleteTab": {"tabId": tab_id}}
def create_update_doc_tab_request(tab_id: str, title: str) -> Dict[str, Any]:
"""
Create an updateDocumentTab request for Google Docs API.
Args:
tab_id: ID of the tab to update
title: New title for the tab
Returns:
Dictionary representing the updateDocumentTab request
"""
return {
"updateDocumentTabProperties": {
"tabProperties": {
"tabId": tab_id,
"title": title,
},
"fields": "title",
}
}
def create_insert_image_request(
index: int, image_uri: str, width: int = None, height: int = None
index: int,
image_uri: str,
width: int = None,
height: int = None,
tab_id: Optional[str] = None,
) -> Dict[str, Any]:
"""
Create an insertInlineImage request for Google Docs API.
@@ -407,11 +514,16 @@ def create_insert_image_request(
image_uri: URI of the image (Drive URL or public URL)
width: Image width in points
height: Image height in points
tab_id: Optional ID of the tab to target
Returns:
Dictionary representing the insertInlineImage request
"""
request = {"insertInlineImage": {"location": {"index": index}, "uri": image_uri}}
location = {"index": index}
if tab_id:
location["tabId"] = tab_id
request = {"insertInlineImage": {"location": location, "uri": image_uri}}
# Add size properties if specified
object_size = {}
@@ -432,6 +544,7 @@ def create_bullet_list_request(
list_type: str = "UNORDERED",
nesting_level: int = None,
paragraph_start_indices: Optional[list[int]] = None,
doc_tab_id: Optional[str] = None,
) -> list[Dict[str, Any]]:
"""
Create requests to apply bullet list formatting with optional nesting.
@@ -448,6 +561,7 @@ def create_bullet_list_request(
nesting_level: Nesting level (0-8, where 0 is top level). If None or 0, no tabs added.
paragraph_start_indices: Optional paragraph start positions for ranges with
multiple paragraphs. If omitted, only start_index is tab-prefixed.
doc_tab_id: Optional ID of the tab to target
Returns:
List of request dictionaries (insertText for nesting tabs if needed,
@@ -485,12 +599,7 @@ def create_bullet_list_request(
for paragraph_start in paragraph_starts:
adjusted_start = paragraph_start + inserted_char_count
requests.append(
{
"insertText": {
"location": {"index": adjusted_start},
"text": tabs,
}
}
create_insert_text_request(adjusted_start, tabs, doc_tab_id)
)
inserted_char_count += nesting_level
@@ -503,10 +612,14 @@ def create_bullet_list_request(
)
# Create the bullet list
range_obj = {"startIndex": start_index, "endIndex": end_index}
if doc_tab_id:
range_obj["tabId"] = doc_tab_id
requests.append(
{
"createParagraphBullets": {
"range": {"startIndex": start_index, "endIndex": end_index},
"range": range_obj,
"bulletPreset": bullet_preset,
}
}
@@ -539,6 +652,9 @@ def validate_operation(operation: Dict[str, Any]) -> tuple[bool, str]:
"insert_table": ["index", "rows", "columns"],
"insert_page_break": ["index"],
"find_replace": ["find_text", "replace_text"],
"insert_doc_tab": ["title", "index"],
"delete_doc_tab": ["tab_id"],
"update_doc_tab": ["tab_id", "title"],
}
if op_type not in required_fields:

View File

@@ -8,7 +8,7 @@ import logging
import asyncio
import io
import re
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional
from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload
@@ -28,6 +28,9 @@ from gdocs.docs_helpers import (
create_insert_page_break_request,
create_insert_image_request,
create_bullet_list_request,
create_insert_doc_tab_request,
create_update_doc_tab_request,
create_delete_doc_tab_request,
)
# Import document structure and table utilities
@@ -157,16 +160,18 @@ async def get_doc_content(
.execute
)
# Tab header format constant
TAB_HEADER_FORMAT = "\n--- TAB: {tab_name} ---\n"
TAB_HEADER_FORMAT = "\n--- TAB: {tab_name} (ID: {tab_id}) ---\n"
def extract_text_from_elements(elements, tab_name=None, depth=0):
def extract_text_from_elements(elements, tab_name=None, tab_id=None, depth=0):
"""Extract text from document elements (paragraphs, tables, etc.)"""
# Prevent infinite recursion by limiting depth
if depth > 5:
return ""
text_lines = []
if tab_name:
text_lines.append(TAB_HEADER_FORMAT.format(tab_name=tab_name))
text_lines.append(
TAB_HEADER_FORMAT.format(tab_name=tab_name, tab_id=tab_id)
)
for element in elements:
if "paragraph" in element:
@@ -204,9 +209,9 @@ async def get_doc_content(
tab_id = props.get("tabId", "Unknown ID")
# Add indentation for nested tabs to show hierarchy
if level > 0:
tab_title = " " * level + f"{tab_title} ( ID: {tab_id})"
tab_title = " " * level + f"{tab_title}"
tab_body = tab.get("documentTab", {}).get("body", {}).get("content", [])
tab_text += extract_text_from_elements(tab_body, tab_title)
tab_text += extract_text_from_elements(tab_body, tab_title, tab_id)
# Process child tabs (nested tabs)
child_tabs = tab.get("childTabs", [])
@@ -559,6 +564,7 @@ async def find_and_replace_doc(
find_text: str,
replace_text: str,
match_case: bool = False,
tab_id: Optional[str] = None,
) -> str:
"""
Finds and replaces text throughout a Google Doc.
@@ -569,15 +575,18 @@ async def find_and_replace_doc(
find_text: Text to search for
replace_text: Text to replace with
match_case: Whether to match case exactly
tab_id: Optional ID of the tab to target
Returns:
str: Confirmation message with replacement count
"""
logger.info(
f"[find_and_replace_doc] Doc={document_id}, find='{find_text}', replace='{replace_text}'"
f"[find_and_replace_doc] Doc={document_id}, find='{find_text}', replace='{replace_text}', tab='{tab_id}'"
)
requests = [create_find_replace_request(find_text, replace_text, match_case)]
requests = [
create_find_replace_request(find_text, replace_text, match_case, tab_id)
]
result = await asyncio.to_thread(
service.documents()
@@ -842,15 +851,41 @@ async def batch_update_doc(
Args:
user_google_email: User's Google email address
document_id: ID of the document to update
operations: List of operation dictionaries. Each operation should contain:
- type: Operation type ('insert_text', 'delete_text', 'replace_text', 'format_text', 'insert_table', 'insert_page_break')
- Additional parameters specific to each operation type
operations: List of operation dicts. Each operation MUST have a 'type' field.
All operations accept an optional 'tab_id' to target a specific tab.
Supported operation types and their parameters:
insert_text - required: index (int), text (str)
delete_text - required: start_index (int), end_index (int)
replace_text - required: start_index (int), end_index (int), text (str)
format_text - required: start_index (int), end_index (int)
optional: bold, italic, underline, font_size, font_family,
text_color, background_color, link_url
update_paragraph_style
- required: start_index (int), end_index (int)
optional: heading_level (0-6, 0=normal), alignment
(START/CENTER/END/JUSTIFIED), line_spacing,
indent_first_line, indent_start, indent_end,
space_above, space_below
insert_table - required: index (int), rows (int), columns (int)
insert_page_break- required: index (int)
find_replace - required: find_text (str), replace_text (str)
optional: match_case (bool, default false)
insert_doc_tab - required: title (str), index (int)
optional: parent_tab_id (str)
delete_doc_tab - required: tab_id (str)
update_doc_tab - required: tab_id (str), title (str)
Example operations:
[
{"type": "insert_text", "index": 1, "text": "Hello World"},
{"type": "format_text", "start_index": 1, "end_index": 12, "bold": true},
{"type": "insert_table", "index": 20, "rows": 2, "columns": 3}
{"type": "update_paragraph_style", "start_index": 1, "end_index": 12,
"heading_level": 1, "alignment": "CENTER"},
{"type": "find_replace", "find_text": "foo", "replace_text": "bar"},
{"type": "insert_table", "index": 20, "rows": 2, "columns": 3},
{"type": "insert_doc_tab", "title": "Appendix", "index": 1}
]
Returns:
@@ -892,6 +927,7 @@ async def inspect_doc_structure(
user_google_email: str,
document_id: str,
detailed: bool = False,
tab_id: str = None,
) -> str:
"""
Essential tool for finding safe insertion points and understanding document structure.
@@ -901,6 +937,7 @@ async def inspect_doc_structure(
- Understanding document layout before making changes
- Locating existing tables and their positions
- Getting document statistics and complexity info
- Inspecting structure of specific tabs
CRITICAL FOR TABLE OPERATIONS:
ALWAYS call this BEFORE creating tables to get a safe insertion index.
@@ -910,6 +947,7 @@ async def inspect_doc_structure(
- total_length: Maximum safe index for insertion
- tables: Number of existing tables
- table_details: Position and dimensions of each table
- tabs: List of available tabs in the document (if no tab_id specified)
WORKFLOW:
Step 1: Call this function
@@ -921,20 +959,49 @@ async def inspect_doc_structure(
user_google_email: User's Google email address
document_id: ID of the document to inspect
detailed: Whether to return detailed structure information
tab_id: Optional ID of the tab to inspect. If not provided, inspects main document.
Returns:
str: JSON string containing document structure and safe insertion indices
"""
logger.debug(f"[inspect_doc_structure] Doc={document_id}, detailed={detailed}")
logger.debug(
f"[inspect_doc_structure] Doc={document_id}, detailed={detailed}, tab_id={tab_id}"
)
# Get the document
doc = await asyncio.to_thread(
service.documents().get(documentId=document_id).execute
service.documents().get(documentId=document_id, includeTabsContent=True).execute
)
# If tab_id is specified, find the tab and use its content
target_content = doc.get("body", {})
def find_tab(tabs, target_id):
for tab in tabs:
if tab.get("tabProperties", {}).get("tabId") == target_id:
return tab
if "childTabs" in tab:
found = find_tab(tab["childTabs"], target_id)
if found:
return found
return None
if tab_id:
tab = find_tab(doc.get("tabs", []), tab_id)
if tab and "documentTab" in tab:
target_content = tab["documentTab"].get("body", {})
elif tab:
return f"Error: Tab {tab_id} is not a document tab and has no body content."
else:
return f"Error: Tab {tab_id} not found in document."
# Create a dummy doc object for analysis tools that expect a full doc
analysis_doc = doc.copy()
analysis_doc["body"] = target_content
if detailed:
# Return full parsed structure
structure = parse_document_structure(doc)
structure = parse_document_structure(analysis_doc)
# Simplify for JSON serialization
result = {
@@ -991,10 +1058,10 @@ async def inspect_doc_structure(
else:
# Return basic analysis
result = analyze_document_complexity(doc)
result = analyze_document_complexity(analysis_doc)
# Add table information
tables = find_tables(doc)
tables = find_tables(analysis_doc)
if tables:
result["table_details"] = []
for i, table in enumerate(tables):
@@ -1008,6 +1075,27 @@ async def inspect_doc_structure(
}
)
# Always include available tabs if no tab_id was specified
if not tab_id:
def get_tabs_summary(tabs):
summary = []
for tab in tabs:
props = tab.get("tabProperties", {})
tab_info = {
"title": props.get("title"),
"tab_id": props.get("tabId"),
}
if "childTabs" in tab:
tab_info["child_tabs"] = get_tabs_summary(tab["childTabs"])
summary.append(tab_info)
return summary
result["tabs"] = get_tabs_summary(doc.get("tabs", []))
if tab_id:
result["inspected_tab_id"] = tab_id
link = f"https://docs.google.com/document/d/{document_id}/edit"
return f"Document structure analysis for {document_id}:\n\n{json.dumps(result, indent=2)}\n\nLink: {link}"
@@ -1022,6 +1110,7 @@ async def create_table_with_data(
table_data: List[List[str]],
index: int,
bold_headers: bool = True,
tab_id: Optional[str] = None,
) -> str:
"""
Creates a table and populates it with data in one reliable operation.
@@ -1060,6 +1149,7 @@ async def create_table_with_data(
table_data: 2D list of strings - EXACT format: [["col1", "col2"], ["row1col1", "row1col2"]]
index: Document position (MANDATORY: get from inspect_doc_structure 'total_length')
bold_headers: Whether to make first row bold (default: true)
tab_id: Optional tab ID to create the table in a specific tab
Returns:
str: Confirmation with table details and link
@@ -1086,7 +1176,7 @@ async def create_table_with_data(
# Try to create the table, and if it fails due to index being at document end, retry with index-1
success, message, metadata = await table_manager.create_and_populate_table(
document_id, table_data, index, bold_headers
document_id, table_data, index, bold_headers, tab_id
)
# If it failed due to index being at or beyond document end, retry with adjusted index
@@ -1095,7 +1185,7 @@ async def create_table_with_data(
f"Index {index} is at document boundary, retrying with index {index - 1}"
)
success, message, metadata = await table_manager.create_and_populate_table(
document_id, table_data, index - 1, bold_headers
document_id, table_data, index - 1, bold_headers, tab_id
)
if success:
@@ -1674,6 +1764,127 @@ async def get_doc_as_markdown(
return markdown.rstrip("\n") + "\n\n" + appendix
@server.tool()
@handle_http_errors("insert_doc_tab", service_type="docs")
@require_google_service("docs", "docs_write")
async def insert_doc_tab(
service: Any,
user_google_email: str,
document_id: str,
title: str,
index: int,
parent_tab_id: Optional[str] = None,
) -> str:
"""
Inserts a new tab into a Google Doc.
Args:
user_google_email: User's Google email address
document_id: ID of the document to update
title: Title of the new tab
index: Position index for the new tab (0-based among sibling tabs)
parent_tab_id: Optional ID of a parent tab to nest the new tab under
Returns:
str: Confirmation message with document link
"""
logger.info(f"[insert_doc_tab] Doc={document_id}, title='{title}', index={index}")
request = create_insert_doc_tab_request(title, index, parent_tab_id)
result = await asyncio.to_thread(
service.documents()
.batchUpdate(documentId=document_id, body={"requests": [request]})
.execute
)
# Extract the new tab ID from the batchUpdate response
tab_id = None
if "replies" in result and result["replies"]:
reply = result["replies"][0]
if "createDocumentTab" in reply:
tab_id = reply["createDocumentTab"].get("tabProperties", {}).get("tabId")
link = f"https://docs.google.com/document/d/{document_id}/edit"
msg = f"Inserted tab '{title}' at index {index} in document {document_id}."
if tab_id:
msg += f" Tab ID: {tab_id}."
if parent_tab_id:
msg += f" Nested under parent tab {parent_tab_id}."
return f"{msg} Link: {link}"
@server.tool()
@handle_http_errors("delete_doc_tab", service_type="docs")
@require_google_service("docs", "docs_write")
async def delete_doc_tab(
service: Any,
user_google_email: str,
document_id: str,
tab_id: str,
) -> str:
"""
Deletes a tab from a Google Doc by its tab ID.
Args:
user_google_email: User's Google email address
document_id: ID of the document to update
tab_id: ID of the tab to delete (use inspect_doc_structure to find tab IDs)
Returns:
str: Confirmation message with document link
"""
logger.info(f"[delete_doc_tab] Doc={document_id}, tab_id='{tab_id}'")
request = create_delete_doc_tab_request(tab_id)
await asyncio.to_thread(
service.documents()
.batchUpdate(documentId=document_id, body={"requests": [request]})
.execute
)
link = f"https://docs.google.com/document/d/{document_id}/edit"
return f"Deleted tab '{tab_id}' from document {document_id}. Link: {link}"
@server.tool()
@handle_http_errors("update_doc_tab", service_type="docs")
@require_google_service("docs", "docs_write")
async def update_doc_tab(
service: Any,
user_google_email: str,
document_id: str,
tab_id: str,
title: str,
) -> str:
"""
Renames a tab in a Google Doc.
Args:
user_google_email: User's Google email address
document_id: ID of the document to update
tab_id: ID of the tab to rename (use inspect_doc_structure to find tab IDs)
title: New title for the tab
Returns:
str: Confirmation message with document link
"""
logger.info(
f"[update_doc_tab] Doc={document_id}, tab_id='{tab_id}', title='{title}'"
)
request = create_update_doc_tab_request(tab_id, title)
await asyncio.to_thread(
service.documents()
.batchUpdate(documentId=document_id, body={"requests": [request]})
.execute
)
link = f"https://docs.google.com/document/d/{document_id}/edit"
return (
f"Renamed tab '{tab_id}' to '{title}' in document {document_id}. Link: {link}"
)
# Create comment management tools for documents
_comment_tools = create_comment_tools("document", "document_id")

View File

@@ -17,6 +17,9 @@ from gdocs.docs_helpers import (
create_find_replace_request,
create_insert_table_request,
create_insert_page_break_request,
create_insert_doc_tab_request,
create_delete_doc_tab_request,
create_update_doc_tab_request,
validate_operation,
)
@@ -87,13 +90,20 @@ class BatchOperationManager:
"operation_summary": operation_descriptions[:5], # First 5 operations
}
summary = self._build_operation_summary(operation_descriptions)
# Extract new tab IDs from insert_doc_tab replies
created_tabs = self._extract_created_tabs(result)
if created_tabs:
metadata["created_tabs"] = created_tabs
return (
True,
f"Successfully executed {len(operations)} operations ({summary})",
metadata,
)
summary = self._build_operation_summary(operation_descriptions)
msg = f"Successfully executed {len(operations)} operations ({summary})"
if created_tabs:
tab_info = ", ".join(
f"'{t['title']}' (tab_id: {t['tab_id']})" for t in created_tabs
)
msg += f". Created tabs: {tab_info}"
return True, msg, metadata
except Exception as e:
logger.error(f"Failed to execute batch operations: {str(e)}")
@@ -161,20 +171,26 @@ class BatchOperationManager:
Returns:
Tuple of (request, description)
"""
tab_id = op.get("tab_id")
if op_type == "insert_text":
request = create_insert_text_request(op["index"], op["text"])
request = create_insert_text_request(op["index"], op["text"], tab_id)
description = f"insert text at {op['index']}"
elif op_type == "delete_text":
request = create_delete_range_request(op["start_index"], op["end_index"])
request = create_delete_range_request(
op["start_index"], op["end_index"], tab_id
)
description = f"delete text {op['start_index']}-{op['end_index']}"
elif op_type == "replace_text":
# Replace is delete + insert (must be done in this order)
delete_request = create_delete_range_request(
op["start_index"], op["end_index"]
op["start_index"], op["end_index"], tab_id
)
insert_request = create_insert_text_request(
op["start_index"], op["text"], tab_id
)
insert_request = create_insert_text_request(op["start_index"], op["text"])
# Return both requests as a list
request = [delete_request, insert_request]
description = f"replace text {op['start_index']}-{op['end_index']} with '{op['text'][:20]}{'...' if len(op['text']) > 20 else ''}'"
@@ -191,6 +207,7 @@ class BatchOperationManager:
op.get("text_color"),
op.get("background_color"),
op.get("link_url"),
tab_id,
)
if not request:
@@ -226,6 +243,7 @@ class BatchOperationManager:
op.get("indent_end"),
op.get("space_above"),
op.get("space_below"),
tab_id,
)
if not request:
@@ -269,20 +287,36 @@ class BatchOperationManager:
elif op_type == "insert_table":
request = create_insert_table_request(
op["index"], op["rows"], op["columns"]
op["index"], op["rows"], op["columns"], tab_id
)
description = f"insert {op['rows']}x{op['columns']} table at {op['index']}"
elif op_type == "insert_page_break":
request = create_insert_page_break_request(op["index"])
request = create_insert_page_break_request(op["index"], tab_id)
description = f"insert page break at {op['index']}"
elif op_type == "find_replace":
request = create_find_replace_request(
op["find_text"], op["replace_text"], op.get("match_case", False)
op["find_text"], op["replace_text"], op.get("match_case", False), tab_id
)
description = f"find/replace '{op['find_text']}''{op['replace_text']}'"
elif op_type == "insert_doc_tab":
request = create_insert_doc_tab_request(
op["title"], op["index"], op.get("parent_tab_id")
)
description = f"insert tab '{op['title']}' at {op['index']}"
if op.get("parent_tab_id"):
description += f" under parent tab {op['parent_tab_id']}"
elif op_type == "delete_doc_tab":
request = create_delete_doc_tab_request(op["tab_id"])
description = f"delete tab '{op['tab_id']}'"
elif op_type == "update_doc_tab":
request = create_update_doc_tab_request(op["tab_id"], op["title"])
description = f"rename tab '{op['tab_id']}' to '{op['title']}'"
else:
supported_types = [
"insert_text",
@@ -293,6 +327,9 @@ class BatchOperationManager:
"insert_table",
"insert_page_break",
"find_replace",
"insert_doc_tab",
"delete_doc_tab",
"update_doc_tab",
]
raise ValueError(
f"Unsupported operation type '{op_type}'. Supported: {', '.join(supported_types)}"
@@ -319,6 +356,26 @@ class BatchOperationManager:
.execute
)
def _extract_created_tabs(self, result: dict[str, Any]) -> list[dict[str, str]]:
"""
Extract tab IDs from insert_doc_tab replies in the batchUpdate response.
Args:
result: The batchUpdate API response
Returns:
List of dicts with tab_id and title for each created tab
"""
created_tabs = []
for reply in result.get("replies", []):
if "createDocumentTab" in reply:
props = reply["createDocumentTab"].get("tabProperties", {})
tab_id = props.get("tabId")
title = props.get("title", "")
if tab_id:
created_tabs.append({"tab_id": tab_id, "title": title})
return created_tabs
def _build_operation_summary(self, operation_descriptions: list[str]) -> str:
"""
Build a concise summary of operations performed.
@@ -403,6 +460,18 @@ class BatchOperationManager:
"optional": ["match_case"],
"description": "Find and replace text throughout document",
},
"insert_doc_tab": {
"required": ["title", "index"],
"description": "Insert a new document tab with given title at specified index",
},
"delete_doc_tab": {
"required": ["tab_id"],
"description": "Delete a document tab by its ID",
},
"update_doc_tab": {
"required": ["tab_id", "title"],
"description": "Rename a document tab",
},
},
"example_operations": [
{"type": "insert_text", "index": 1, "text": "Hello World"},

View File

@@ -41,6 +41,7 @@ class TableOperationManager:
table_data: List[List[str]],
index: int,
bold_headers: bool = True,
tab_id: str = None,
) -> Tuple[bool, str, Dict[str, Any]]:
"""
Creates a table and populates it with data in a reliable multi-step process.
@@ -52,6 +53,7 @@ class TableOperationManager:
table_data: 2D list of strings for table content
index: Position to insert the table
bold_headers: Whether to make the first row bold
tab_id: Optional tab ID for targeting a specific tab
Returns:
Tuple of (success, message, metadata)
@@ -70,16 +72,16 @@ class TableOperationManager:
try:
# Step 1: Create empty table
await self._create_empty_table(document_id, index, rows, cols)
await self._create_empty_table(document_id, index, rows, cols, tab_id)
# Step 2: Get fresh document structure to find actual cell positions
fresh_tables = await self._get_document_tables(document_id)
fresh_tables = await self._get_document_tables(document_id, tab_id)
if not fresh_tables:
return False, "Could not find table after creation", {}
# Step 3: Populate each cell with proper index refreshing
population_count = await self._populate_table_cells(
document_id, table_data, bold_headers
document_id, table_data, bold_headers, tab_id
)
metadata = {
@@ -100,7 +102,7 @@ class TableOperationManager:
return False, f"Table creation failed: {str(e)}", {}
async def _create_empty_table(
self, document_id: str, index: int, rows: int, cols: int
self, document_id: str, index: int, rows: int, cols: int, tab_id: str = None
) -> None:
"""Create an empty table at the specified index."""
logger.debug(f"Creating {rows}x{cols} table at index {index}")
@@ -109,20 +111,49 @@ class TableOperationManager:
self.service.documents()
.batchUpdate(
documentId=document_id,
body={"requests": [create_insert_table_request(index, rows, cols)]},
body={
"requests": [create_insert_table_request(index, rows, cols, tab_id)]
},
)
.execute
)
async def _get_document_tables(self, document_id: str) -> List[Dict[str, Any]]:
async def _get_document_tables(
self, document_id: str, tab_id: str = None
) -> List[Dict[str, Any]]:
"""Get fresh document structure and extract table information."""
doc = await asyncio.to_thread(
self.service.documents().get(documentId=document_id).execute
self.service.documents()
.get(documentId=document_id, includeTabsContent=True)
.execute
)
if tab_id:
tab = self._find_tab(doc.get("tabs", []), tab_id)
if tab and "documentTab" in tab:
doc = doc.copy()
doc["body"] = tab["documentTab"].get("body", {})
return find_tables(doc)
@staticmethod
def _find_tab(tabs: list, target_id: str):
"""Recursively find a tab by ID."""
for tab in tabs:
if tab.get("tabProperties", {}).get("tabId") == target_id:
return tab
if "childTabs" in tab:
found = TableOperationManager._find_tab(tab["childTabs"], target_id)
if found:
return found
return None
async def _populate_table_cells(
self, document_id: str, table_data: List[List[str]], bold_headers: bool
self,
document_id: str,
table_data: List[List[str]],
bold_headers: bool,
tab_id: str = None,
) -> int:
"""
Populate table cells with data, refreshing structure after each insertion.
@@ -147,6 +178,7 @@ class TableOperationManager:
col_idx,
cell_text,
bold_headers and row_idx == 0,
tab_id,
)
if success:
@@ -169,6 +201,7 @@ class TableOperationManager:
col_idx: int,
cell_text: str,
apply_bold: bool = False,
tab_id: str = None,
) -> bool:
"""
Populate a single cell with text, with optional bold formatting.
@@ -177,7 +210,7 @@ class TableOperationManager:
"""
try:
# Get fresh table structure to avoid index shifting issues
tables = await self._get_document_tables(document_id)
tables = await self._get_document_tables(document_id, tab_id)
if not tables:
return False