apply ruff formatting

This commit is contained in:
Taylor Wilsdon
2025-12-13 13:49:28 -08:00
parent 1d80a24ca4
commit 6b8352a354
50 changed files with 4010 additions and 2842 deletions

View File

@@ -4,11 +4,13 @@ Google Docs Helper Functions
This module provides utility functions for common Google Docs operations
to simplify the implementation of document editing tools.
"""
import logging
from typing import Dict, Any, Optional, Tuple
logger = logging.getLogger(__name__)
def _normalize_color(color: Any, param_name: str) -> Optional[Dict[str, float]]:
"""
Normalize a user-supplied color into Docs API rgbColor format.
@@ -25,28 +27,34 @@ def _normalize_color(color: Any, param_name: str) -> Optional[Dict[str, float]]:
raise ValueError(f"{param_name} components cannot be boolean values")
if isinstance(value, int):
if value < 0 or value > 255:
raise ValueError(f"{param_name} components must be 0-255 when using integers")
raise ValueError(
f"{param_name} components must be 0-255 when using integers"
)
return value / 255
if isinstance(value, float):
if value < 0 or value > 1:
raise ValueError(f"{param_name} components must be between 0 and 1 when using floats")
raise ValueError(
f"{param_name} components must be between 0 and 1 when using floats"
)
return value
raise ValueError(f"{param_name} components must be int (0-255) or float (0-1)")
if isinstance(color, str):
hex_color = color.lstrip('#')
if len(hex_color) != 6 or any(c not in '0123456789abcdefABCDEF' for c in hex_color):
hex_color = color.lstrip("#")
if len(hex_color) != 6 or any(
c not in "0123456789abcdefABCDEF" for c in hex_color
):
raise ValueError(f"{param_name} must be a hex string like '#RRGGBB'")
r = int(hex_color[0:2], 16) / 255
g = int(hex_color[2:4], 16) / 255
b = int(hex_color[4:6], 16) / 255
return {'red': r, 'green': g, 'blue': b}
return {"red": r, "green": g, "blue": b}
if isinstance(color, (list, tuple)) and len(color) == 3:
r = _to_component(color[0])
g = _to_component(color[1])
b = _to_component(color[2])
return {'red': r, 'green': g, 'blue': b}
return {"red": r, "green": g, "blue": b}
raise ValueError(f"{param_name} must be a hex string or RGB tuple/list")
@@ -58,11 +66,11 @@ def build_text_style(
font_size: int = None,
font_family: str = None,
text_color: Any = None,
background_color: Any = None
background_color: Any = None,
) -> tuple[Dict[str, Any], list[str]]:
"""
Build text style object for Google Docs API requests.
Args:
bold: Whether text should be bold
italic: Whether text should be italic
@@ -71,107 +79,102 @@ def build_text_style(
font_family: Font family name
text_color: Text color as hex string or RGB tuple/list
background_color: Background (highlight) color as hex string or RGB tuple/list
Returns:
Tuple of (text_style_dict, list_of_field_names)
"""
text_style = {}
fields = []
if bold is not None:
text_style['bold'] = bold
fields.append('bold')
text_style["bold"] = bold
fields.append("bold")
if italic is not None:
text_style['italic'] = italic
fields.append('italic')
text_style["italic"] = italic
fields.append("italic")
if underline is not None:
text_style['underline'] = underline
fields.append('underline')
text_style["underline"] = underline
fields.append("underline")
if font_size is not None:
text_style['fontSize'] = {'magnitude': font_size, 'unit': 'PT'}
fields.append('fontSize')
text_style["fontSize"] = {"magnitude": font_size, "unit": "PT"}
fields.append("fontSize")
if font_family is not None:
text_style['weightedFontFamily'] = {'fontFamily': font_family}
fields.append('weightedFontFamily')
text_style["weightedFontFamily"] = {"fontFamily": font_family}
fields.append("weightedFontFamily")
if text_color is not None:
rgb = _normalize_color(text_color, "text_color")
text_style['foregroundColor'] = {'color': {'rgbColor': rgb}}
fields.append('foregroundColor')
text_style["foregroundColor"] = {"color": {"rgbColor": rgb}}
fields.append("foregroundColor")
if background_color is not None:
rgb = _normalize_color(background_color, "background_color")
text_style['backgroundColor'] = {'color': {'rgbColor': rgb}}
fields.append('backgroundColor')
text_style["backgroundColor"] = {"color": {"rgbColor": rgb}}
fields.append("backgroundColor")
return text_style, fields
def create_insert_text_request(index: int, text: str) -> Dict[str, Any]:
    """
    Create an insertText request for Google Docs API.

    Args:
        index: Position to insert text
        text: Text to insert

    Returns:
        Dictionary representing the insertText request
    """
    return {"insertText": {"location": {"index": index}, "text": text}}
def create_insert_text_segment_request(
    index: int, text: str, segment_id: str
) -> Dict[str, Any]:
    """
    Create an insertText request for Google Docs API with segmentId (for headers/footers).

    Args:
        index: Position to insert text
        text: Text to insert
        segment_id: Segment ID (for targeting headers/footers)

    Returns:
        Dictionary representing the insertText request with segmentId
    """
    # The segment ID travels inside the location, next to the index.
    location = {"segmentId": segment_id, "index": index}
    return {"insertText": {"location": location, "text": text}}
def create_delete_range_request(start_index: int, end_index: int) -> Dict[str, Any]:
    """
    Create a deleteContentRange request for Google Docs API.

    Args:
        start_index: Start position of content to delete
        end_index: End position of content to delete

    Returns:
        Dictionary representing the deleteContentRange request
    """
    return {
        "deleteContentRange": {
            "range": {"startIndex": start_index, "endIndex": end_index}
        }
    }
def create_format_text_request(
start_index: int,
start_index: int,
end_index: int,
bold: bool = None,
italic: bool = None,
@@ -179,11 +182,11 @@ def create_format_text_request(
font_size: int = None,
font_family: str = None,
text_color: Any = None,
background_color: Any = None
background_color: Any = None,
) -> Optional[Dict[str, Any]]:
"""
Create an updateTextStyle request for Google Docs API.
Args:
start_index: Start position of text to format
end_index: End position of text to format
@@ -194,189 +197,166 @@ def create_format_text_request(
font_family: Font family name
text_color: Text color as hex string or RGB tuple/list
background_color: Background (highlight) color as hex string or RGB tuple/list
Returns:
Dictionary representing the updateTextStyle request, or None if no styles provided
"""
text_style, fields = build_text_style(
bold, italic, underline, font_size, font_family, text_color, background_color
)
if not text_style:
return None
return {
'updateTextStyle': {
'range': {
'startIndex': start_index,
'endIndex': end_index
},
'textStyle': text_style,
'fields': ','.join(fields)
"updateTextStyle": {
"range": {"startIndex": start_index, "endIndex": end_index},
"textStyle": text_style,
"fields": ",".join(fields),
}
}
def create_find_replace_request(
    find_text: str, replace_text: str, match_case: bool = False
) -> Dict[str, Any]:
    """
    Create a replaceAllText request for Google Docs API.

    Args:
        find_text: Text to find
        replace_text: Text to replace with
        match_case: Whether to match case exactly

    Returns:
        Dictionary representing the replaceAllText request
    """
    return {
        "replaceAllText": {
            "containsText": {"text": find_text, "matchCase": match_case},
            "replaceText": replace_text,
        }
    }
def create_insert_table_request(index: int, rows: int, columns: int) -> Dict[str, Any]:
    """
    Create an insertTable request for Google Docs API.

    Args:
        index: Position to insert table
        rows: Number of rows
        columns: Number of columns

    Returns:
        Dictionary representing the insertTable request
    """
    return {
        "insertTable": {"location": {"index": index}, "rows": rows, "columns": columns}
    }
def create_insert_page_break_request(index: int) -> Dict[str, Any]:
    """
    Create an insertPageBreak request for Google Docs API.

    Args:
        index: Position to insert page break

    Returns:
        Dictionary representing the insertPageBreak request
    """
    return {"insertPageBreak": {"location": {"index": index}}}
def create_insert_image_request(
    index: int,
    image_uri: str,
    width: Optional[int] = None,
    height: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Create an insertInlineImage request for Google Docs API.

    Args:
        index: Position to insert image
        image_uri: URI of the image (Drive URL or public URL)
        width: Image width in points
        height: Image height in points

    Returns:
        Dictionary representing the insertInlineImage request
    """
    request = {"insertInlineImage": {"location": {"index": index}, "uri": image_uri}}

    # Collect only the dimensions that were actually supplied; objectSize is
    # omitted from the request entirely when neither is given.
    object_size = {}
    if width is not None:
        object_size["width"] = {"magnitude": width, "unit": "PT"}
    if height is not None:
        object_size["height"] = {"magnitude": height, "unit": "PT"}

    if object_size:
        request["insertInlineImage"]["objectSize"] = object_size

    return request
def create_bullet_list_request(
    start_index: int, end_index: int, list_type: str = "UNORDERED"
) -> Dict[str, Any]:
    """
    Create a createParagraphBullets request for Google Docs API.

    Args:
        start_index: Start of text range to convert to list
        end_index: End of text range to convert to list
        list_type: Type of list ("UNORDERED" or "ORDERED")

    Returns:
        Dictionary representing the createParagraphBullets request
    """
    # Any value other than "UNORDERED" selects the numbered preset.
    bullet_preset = (
        "BULLET_DISC_CIRCLE_SQUARE"
        if list_type == "UNORDERED"
        else "NUMBERED_DECIMAL_ALPHA_ROMAN"
    )

    return {
        "createParagraphBullets": {
            "range": {"startIndex": start_index, "endIndex": end_index},
            "bulletPreset": bullet_preset,
        }
    }
def validate_operation(operation: Dict[str, Any]) -> Tuple[bool, str]:
    """
    Validate a batch operation dictionary.

    Args:
        operation: Operation dictionary to validate

    Returns:
        Tuple of (is_valid, error_message); error_message is empty when valid
    """
    op_type = operation.get("type")
    if not op_type:
        return False, "Missing 'type' field"

    # Validate required fields for each operation type
    required_fields = {
        "insert_text": ["index", "text"],
        "delete_text": ["start_index", "end_index"],
        "replace_text": ["start_index", "end_index", "text"],
        "format_text": ["start_index", "end_index"],
        "insert_table": ["index", "rows", "columns"],
        "insert_page_break": ["index"],
        "find_replace": ["find_text", "replace_text"],
    }

    if op_type not in required_fields:
        # op_type is known to be truthy here, so no fallback label is needed.
        return False, f"Unsupported operation type: {op_type}"

    # Report the first missing field, in declaration order.
    for field in required_fields[op_type]:
        if field not in operation:
            return False, f"Missing required field: {field}"

    return True, ""

View File

@@ -4,6 +4,7 @@ Google Docs Document Structure Parsing and Analysis
This module provides utilities for parsing and analyzing the structure
of Google Docs documents, including finding tables, cells, and other elements.
"""
import logging
from typing import Any, Optional
@@ -13,127 +14,129 @@ logger = logging.getLogger(__name__)
def parse_document_structure(doc_data: dict[str, Any]) -> dict[str, Any]:
    """
    Parse the full document structure into a navigable format.

    Args:
        doc_data: Raw document data from Google Docs API

    Returns:
        Dictionary containing parsed structure with elements and their positions
    """
    structure = {
        "title": doc_data.get("title", ""),
        "body": [],
        "tables": [],
        "headers": {},
        "footers": {},
        "total_length": 0,
    }

    content = doc_data.get("body", {}).get("content", [])

    for element in content:
        element_info = _parse_element(element)
        if element_info:
            structure["body"].append(element_info)
            # Tables are also indexed separately; the same dict object is
            # shared between "body" and "tables".
            if element_info["type"] == "table":
                structure["tables"].append(element_info)

    # Calculate total document length from the last recognised body element
    if structure["body"]:
        last_element = structure["body"][-1]
        structure["total_length"] = last_element.get("end_index", 0)

    # Parse headers and footers
    for header_id, header_data in doc_data.get("headers", {}).items():
        structure["headers"][header_id] = _parse_segment(header_data)

    for footer_id, footer_data in doc_data.get("footers", {}).items():
        structure["footers"][footer_id] = _parse_segment(footer_data)

    return structure
def _parse_element(element: dict[str, Any]) -> Optional[dict[str, Any]]:
"""
Parse a single document element.
Args:
element: Element data from document
Returns:
Parsed element information or None
"""
element_info = {
'start_index': element.get('startIndex', 0),
'end_index': element.get('endIndex', 0)
"start_index": element.get("startIndex", 0),
"end_index": element.get("endIndex", 0),
}
if 'paragraph' in element:
paragraph = element['paragraph']
element_info['type'] = 'paragraph'
element_info['text'] = _extract_paragraph_text(paragraph)
element_info['style'] = paragraph.get('paragraphStyle', {})
elif 'table' in element:
table = element['table']
element_info['type'] = 'table'
element_info['rows'] = len(table.get('tableRows', []))
element_info['columns'] = len(table.get('tableRows', [{}])[0].get('tableCells', []))
element_info['cells'] = _parse_table_cells(table)
element_info['table_style'] = table.get('tableStyle', {})
elif 'sectionBreak' in element:
element_info['type'] = 'section_break'
element_info['section_style'] = element['sectionBreak'].get('sectionStyle', {})
elif 'tableOfContents' in element:
element_info['type'] = 'table_of_contents'
if "paragraph" in element:
paragraph = element["paragraph"]
element_info["type"] = "paragraph"
element_info["text"] = _extract_paragraph_text(paragraph)
element_info["style"] = paragraph.get("paragraphStyle", {})
elif "table" in element:
table = element["table"]
element_info["type"] = "table"
element_info["rows"] = len(table.get("tableRows", []))
element_info["columns"] = len(
table.get("tableRows", [{}])[0].get("tableCells", [])
)
element_info["cells"] = _parse_table_cells(table)
element_info["table_style"] = table.get("tableStyle", {})
elif "sectionBreak" in element:
element_info["type"] = "section_break"
element_info["section_style"] = element["sectionBreak"].get("sectionStyle", {})
elif "tableOfContents" in element:
element_info["type"] = "table_of_contents"
else:
return None
return element_info
def _parse_table_cells(table: dict[str, Any]) -> list[list[dict[str, Any]]]:
"""
Parse table cells with their positions and content.
Args:
table: Table element data
Returns:
2D list of cell information
"""
cells = []
for row_idx, row in enumerate(table.get('tableRows', [])):
for row_idx, row in enumerate(table.get("tableRows", [])):
row_cells = []
for col_idx, cell in enumerate(row.get('tableCells', [])):
for col_idx, cell in enumerate(row.get("tableCells", [])):
# Find the first paragraph in the cell for insertion
insertion_index = cell.get('startIndex', 0) + 1 # Default fallback
insertion_index = cell.get("startIndex", 0) + 1 # Default fallback
# Look for the first paragraph in cell content
content_elements = cell.get('content', [])
content_elements = cell.get("content", [])
for element in content_elements:
if 'paragraph' in element:
paragraph = element['paragraph']
if "paragraph" in element:
paragraph = element["paragraph"]
# Get the first element in the paragraph
para_elements = paragraph.get('elements', [])
para_elements = paragraph.get("elements", [])
if para_elements:
first_element = para_elements[0]
if 'startIndex' in first_element:
insertion_index = first_element['startIndex']
if "startIndex" in first_element:
insertion_index = first_element["startIndex"]
break
cell_info = {
'row': row_idx,
'column': col_idx,
'start_index': cell.get('startIndex', 0),
'end_index': cell.get('endIndex', 0),
'insertion_index': insertion_index, # Where to insert text in this cell
'content': _extract_cell_text(cell),
'content_elements': content_elements
"row": row_idx,
"column": col_idx,
"start_index": cell.get("startIndex", 0),
"end_index": cell.get("endIndex", 0),
"insertion_index": insertion_index, # Where to insert text in this cell
"content": _extract_cell_text(cell),
"content_elements": content_elements,
}
row_cells.append(cell_info)
cells.append(row_cells)
@@ -143,198 +146,212 @@ def _parse_table_cells(table: dict[str, Any]) -> list[list[dict[str, Any]]]:
def _extract_paragraph_text(paragraph: dict[str, Any]) -> str:
"""Extract text from a paragraph element."""
text_parts = []
for element in paragraph.get('elements', []):
if 'textRun' in element:
text_parts.append(element['textRun'].get('content', ''))
return ''.join(text_parts)
for element in paragraph.get("elements", []):
if "textRun" in element:
text_parts.append(element["textRun"].get("content", ""))
return "".join(text_parts)
def _extract_cell_text(cell: dict[str, Any]) -> str:
"""Extract text content from a table cell."""
text_parts = []
for element in cell.get('content', []):
if 'paragraph' in element:
text_parts.append(_extract_paragraph_text(element['paragraph']))
return ''.join(text_parts)
for element in cell.get("content", []):
if "paragraph" in element:
text_parts.append(_extract_paragraph_text(element["paragraph"]))
return "".join(text_parts)
def _parse_segment(segment_data: dict[str, Any]) -> dict[str, Any]:
"""Parse a document segment (header/footer)."""
return {
'content': segment_data.get('content', []),
'start_index': segment_data.get('content', [{}])[0].get('startIndex', 0) if segment_data.get('content') else 0,
'end_index': segment_data.get('content', [{}])[-1].get('endIndex', 0) if segment_data.get('content') else 0
"content": segment_data.get("content", []),
"start_index": segment_data.get("content", [{}])[0].get("startIndex", 0)
if segment_data.get("content")
else 0,
"end_index": segment_data.get("content", [{}])[-1].get("endIndex", 0)
if segment_data.get("content")
else 0,
}
def find_tables(doc_data: dict[str, Any]) -> list[dict[str, Any]]:
    """
    Find all tables in the document with their positions and dimensions.

    Args:
        doc_data: Raw document data from Google Docs API

    Returns:
        List of table information dictionaries; "index" is the table's
        0-based position among the document's tables
    """
    structure = parse_document_structure(doc_data)

    return [
        {
            "index": idx,
            "start_index": table_info["start_index"],
            "end_index": table_info["end_index"],
            "rows": table_info["rows"],
            "columns": table_info["columns"],
            "cells": table_info["cells"],
        }
        for idx, table_info in enumerate(structure["tables"])
    ]
def get_table_cell_indices(
    doc_data: dict[str, Any], table_index: int = 0
) -> Optional[list[list[tuple[int, int]]]]:
    """
    Get content indices for all cells in a specific table.

    Args:
        doc_data: Raw document data from Google Docs API
        table_index: Index of the table (0-based)

    Returns:
        2D list of (start_index, end_index) tuples for each cell, or None if table not found
    """
    tables = find_tables(doc_data)

    # Reject negative indices too: Python would otherwise silently wrap
    # around to a table counted from the end.
    if not 0 <= table_index < len(tables):
        logger.warning(
            f"Table index {table_index} not found. Document has {len(tables)} tables."
        )
        return None

    cell_indices = []
    for row in tables[table_index]["cells"]:
        row_indices = []
        for cell in row:
            # Find the first paragraph in the cell for content insertion
            first_para = next(
                (
                    element["paragraph"]
                    for element in cell.get("content_elements") or []
                    if "paragraph" in element
                ),
                None,
            )
            para_elements = first_para.get("elements") if first_para else None

            if para_elements and "textRun" in para_elements[0]:
                # Use the bounds of the first text run in the paragraph
                first_text_element = para_elements[0]
                start_idx = first_text_element.get(
                    "startIndex", cell["start_index"] + 1
                )
                end_idx = first_text_element.get("endIndex", start_idx + 1)
                row_indices.append((start_idx, end_idx))
            else:
                # Fallback: use cell boundaries with safe margins
                row_indices.append((cell["start_index"] + 1, cell["end_index"] - 1))
        cell_indices.append(row_indices)

    return cell_indices
def find_element_at_index(
    doc_data: dict[str, Any], index: int
) -> Optional[dict[str, Any]]:
    """
    Find what element exists at a given index in the document.

    Args:
        doc_data: Raw document data from Google Docs API
        index: Position in the document

    Returns:
        A shallow copy of the element covering that position (with a
        "containing_cell" entry when the position falls inside a table cell),
        or None
    """
    structure = parse_document_structure(doc_data)

    for element in structure["body"]:
        # Ranges are half-open: start inclusive, end exclusive.
        if not (element["start_index"] <= index < element["end_index"]):
            continue

        element_copy = element.copy()

        # If it's a table, find which cell contains the index. The search
        # stops at the first matching cell instead of scanning later rows.
        if element["type"] == "table" and "cells" in element:
            containing_cell = next(
                (
                    {
                        "row": row_idx,
                        "column": col_idx,
                        "cell_start": cell["start_index"],
                        "cell_end": cell["end_index"],
                    }
                    for row_idx, row in enumerate(element["cells"])
                    for col_idx, cell in enumerate(row)
                    if cell["start_index"] <= index < cell["end_index"]
                ),
                None,
            )
            if containing_cell is not None:
                element_copy["containing_cell"] = containing_cell

        return element_copy

    return None
def get_next_paragraph_index(doc_data: dict[str, Any], after_index: int = 0) -> int:
    """
    Find the next safe position to insert content after a given index.

    Args:
        doc_data: Raw document data from Google Docs API
        after_index: Index after which to find insertion point

    Returns:
        Start index of the first paragraph beginning after after_index, or
        total_length - 1 when there is none (1 for an empty document)
    """
    structure = parse_document_structure(doc_data)

    # Find the first paragraph element after the given index
    for element in structure["body"]:
        if element["type"] == "paragraph" and element["start_index"] > after_index:
            return element["start_index"]

    # If no paragraph found, fall back to just before the end of the document
    total_length = structure["total_length"]
    return total_length - 1 if total_length > 0 else 1
def analyze_document_complexity(doc_data: dict[str, Any]) -> dict[str, Any]:
    """
    Analyze document complexity and provide statistics.

    Args:
        doc_data: Raw document data from Google Docs API

    Returns:
        Dictionary with document statistics; "total_table_cells" and
        "largest_table" are present only when the document has tables
    """
    structure = parse_document_structure(doc_data)
    body = structure["body"]
    tables = structure["tables"]

    stats = {
        "total_elements": len(body),
        "tables": len(tables),
        "paragraphs": sum(1 for e in body if e.get("type") == "paragraph"),
        "section_breaks": sum(1 for e in body if e.get("type") == "section_break"),
        "total_length": structure["total_length"],
        "has_headers": bool(structure["headers"]),
        "has_footers": bool(structure["footers"]),
    }

    # Add table statistics
    if tables:
        # Compute each table's cell count once and reuse it for both figures.
        cell_counts = [table["rows"] * table["columns"] for table in tables]
        stats["total_table_cells"] = sum(cell_counts)
        stats["largest_table"] = max(cell_counts)

    return stats

View File

@@ -4,6 +4,7 @@ Google Docs Table Operations
This module provides utilities for creating and manipulating tables
in Google Docs, including population with data and formatting.
"""
import logging
from typing import Dict, Any, List, Optional, Union, Tuple
@@ -11,174 +12,188 @@ logger = logging.getLogger(__name__)
def build_table_population_requests(
    table_info: Dict[str, Any], data: List[List[str]], bold_headers: bool = True
) -> List[Dict[str, Any]]:
    """
    Build batch requests to populate a table with data.

    Text is only ever inserted; existing cell content is never deleted.
    Cells with no non-whitespace content receive the text at their insertion
    index; cells that already have content receive it just before the cell
    end marker.

    Args:
        table_info: Table information from document structure including cell indices
        data: 2D array of data to insert into table
        bold_headers: Whether to make the first row bold

    Returns:
        List of request dictionaries for batch update
    """
    # NOTE(review): indices are used exactly as given in table_info and are not
    # adjusted for text inserted by earlier requests in this list - confirm the
    # caller applies them in a way that keeps the indices valid.
    requests: List[Dict[str, Any]] = []
    cells = table_info.get("cells", [])

    if not cells:
        logger.warning("No cell information found in table_info")
        return requests

    for row_idx, row_data in enumerate(data):
        if row_idx >= len(cells):
            logger.warning(
                f"Data has more rows ({len(data)}) than table ({len(cells)})"
            )
            break

        table_row = cells[row_idx]
        for col_idx, cell_text in enumerate(row_data):
            if col_idx >= len(table_row):
                logger.warning(
                    f"Data has more columns ({len(row_data)}) than table row {row_idx} ({len(table_row)})"
                )
                break

            # Only insert if we have text to insert
            if not cell_text:
                continue

            cell = table_row[col_idx]

            if cell.get("content", "").strip():
                # Cell has content: append just before the cell end marker
                target_index = cell["end_index"] - 1
            elif "insertion_index" in cell:
                # Empty cell (whitespace/newline only): use its insertion index
                target_index = cell["insertion_index"]
            else:
                target_index = cell["start_index"] + 1

            requests.append(
                {
                    "insertText": {
                        "location": {"index": target_index},
                        "text": cell_text,
                    }
                }
            )

            # Apply bold formatting to first row if requested
            if bold_headers and row_idx == 0:
                requests.append(
                    {
                        "updateTextStyle": {
                            "range": {
                                "startIndex": target_index,
                                "endIndex": target_index + len(cell_text),
                            },
                            "textStyle": {"bold": True},
                            "fields": "bold",
                        }
                    }
                )

    return requests
def calculate_cell_positions(
    table_start_index: int,
    rows: int,
    cols: int,
    existing_table_data: Optional[Dict[str, Any]] = None,
) -> List[List[Dict[str, int]]]:
    """
    Calculate estimated positions for each cell in a table.

    Args:
        table_start_index: Starting index of the table
        rows: Number of rows
        cols: Number of columns
        existing_table_data: Optional existing table data with actual positions

    Returns:
        2D list of cell position dictionaries; when existing_table_data has a
        "cells" entry, that entry is returned as-is
    """
    if existing_table_data and "cells" in existing_table_data:
        # Use actual positions from existing table
        return existing_table_data["cells"]

    # Estimate positions for a new table
    # Note: These are estimates; actual positions depend on content
    cells = []
    current_index = table_start_index + 2  # Account for table start

    for row_idx in range(rows):
        row_cells = []
        for col_idx in range(cols):
            # Each cell is estimated at the minimum size of 2 indices,
            # followed by a 1-index gap before the next cell.
            cell_start = current_index
            cell_end = current_index + 2

            row_cells.append(
                {
                    "row": row_idx,
                    "column": col_idx,
                    "start_index": cell_start,
                    "end_index": cell_end,
                }
            )

            current_index = cell_end + 1

        cells.append(row_cells)

    return cells
def format_table_data(raw_data: Union[List[List[str]], List[str], str]) -> List[List[str]]:
def format_table_data(
raw_data: Union[List[List[str]], List[str], str],
) -> List[List[str]]:
"""
Normalize various data formats into a 2D array for table insertion.
Args:
raw_data: Data in various formats (2D list, 1D list, or delimited string)
Returns:
Normalized 2D list of strings
"""
if isinstance(raw_data, str):
# Parse delimited string (detect delimiter)
lines = raw_data.strip().split('\n')
if '\t' in raw_data:
lines = raw_data.strip().split("\n")
if "\t" in raw_data:
# Tab-delimited
return [line.split('\t') for line in lines]
elif ',' in raw_data:
return [line.split("\t") for line in lines]
elif "," in raw_data:
# Comma-delimited (simple CSV)
return [line.split(',') for line in lines]
return [line.split(",") for line in lines]
else:
# Space-delimited or single column
return [[cell.strip() for cell in line.split()] for line in lines]
elif isinstance(raw_data, list):
if not raw_data:
return [[]]
# Check if it's already a 2D list
if isinstance(raw_data[0], list):
# Ensure all cells are strings
@@ -186,7 +201,7 @@ def format_table_data(raw_data: Union[List[List[str]], List[str], str]) -> List[
else:
# Convert 1D list to single-column table
return [[str(cell)] for cell in raw_data]
else:
# Convert single value to 1x1 table
return [[str(raw_data)]]
def create_table_with_data(
    index: int,
    data: List[List[str]],
    headers: Optional[List[str]] = None,
    bold_headers: bool = True,
) -> List[Dict[str, Any]]:
    """
    Build the request that creates a table sized to fit the given data.

    Only the ``insertTable`` request is produced here. Cell text cannot be
    addressed until the table exists and the document structure has been
    re-read, so populating the cells is left to the caller.

    Args:
        index: Position to insert the table
        data: 2D array of table data
        headers: Optional header row (will be prepended to data)
        bold_headers: Whether to make headers bold. Accepted for interface
            compatibility; this function emits no text-style requests, so the
            flag is not consulted here.

    Returns:
        List of request dictionaries for batch update

    Raises:
        ValueError: If the normalized data is empty or its first row is empty.
    """
    requests = []

    # Prepare data with headers if provided
    full_data = [headers] + data if headers else data

    # Normalize the data
    full_data = format_table_data(full_data)

    if not full_data or not full_data[0]:
        raise ValueError("Cannot create table with empty data")

    rows = len(full_data)
    # Size the table by the widest row so a row longer than the first one
    # still fits; sizing by the first row alone would leave such rows wider
    # than the table.
    cols = max(len(row) for row in full_data)

    # Ensure all rows have the same number of columns
    for row in full_data:
        while len(row) < cols:
            row.append("")

    # Create the table
    requests.append(
        {"insertTable": {"location": {"index": index}, "rows": rows, "columns": cols}}
    )

    # Note: In practice, we'd need to get the actual document structure
    # after table creation to get accurate indices for text insertion.
    return requests
def build_table_style_requests(
    table_start_index: int, style_options: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """
    Build requests to style a table.

    Args:
        table_start_index: Starting index of the table
        style_options: Dictionary of style options. Recognized keys:
            - border_width: Border width magnitude, sent with unit ``PT``
            - border_color: RGB color for borders. Only applied when
              ``border_width`` is also given; on its own it is ignored.
            - background_color: RGB color for cell backgrounds
            - header_background: RGB color for header row background

    Returns:
        List of request dictionaries for styling. Contains at most one
        table-wide ``updateTableCellStyle`` request, followed by at most one
        request targeting row 0 for the header background.
    """
    requests = []
    border_sides = ["borderTop", "borderBottom", "borderLeft", "borderRight"]

    # Table cell style update
    table_cell_style = {}
    fields = []

    if "border_width" in style_options:
        for side in border_sides:
            table_cell_style[side] = {
                "width": {"magnitude": style_options["border_width"], "unit": "PT"}
            }
        fields.extend(border_sides)

    # Colors are attached to border entries created above; without a width
    # there is no border entry, so the color is intentionally not emitted.
    if "border_color" in style_options and "borderTop" in table_cell_style:
        for side in border_sides:
            table_cell_style[side]["color"] = {
                "rgbColor": style_options["border_color"]
            }

    if "background_color" in style_options:
        table_cell_style["backgroundColor"] = {
            "color": {"rgbColor": style_options["background_color"]}
        }
        fields.append("backgroundColor")

    if table_cell_style and fields:
        requests.append(
            {
                "updateTableCellStyle": {
                    "tableStartLocation": {"index": table_start_index},
                    "tableCellStyle": table_cell_style,
                    "fields": ",".join(fields),
                }
            }
        )

    # Header row specific styling
    if "header_background" in style_options:
        requests.append(
            {
                "updateTableCellStyle": {
                    "tableRange": {
                        "tableCellLocation": {
                            "tableStartLocation": {"index": table_start_index},
                            "rowIndex": 0,
                            "columnIndex": 0,
                        },
                        "rowSpan": 1,
                        # Large number to cover all columns.
                        # NOTE(review): confirm the API accepts a span wider
                        # than the actual table.
                        "columnSpan": 100,
                    },
                    "tableCellStyle": {
                        "backgroundColor": {
                            "color": {"rgbColor": style_options["header_background"]}
                        }
                    },
                    "fields": "backgroundColor",
                }
            }
        )

    return requests
def extract_table_as_data(table_info: Dict[str, Any]) -> List[List[str]]:
    """
    Extract table content as a 2D array of strings.

    Args:
        table_info: Table information from document structure. Rows are read
            from its ``"cells"`` entry; each cell's text is read from its
            ``"content"`` entry.

    Returns:
        2D list of cell contents with surrounding whitespace stripped. A cell
        whose content is missing or ``None`` yields an empty string; a table
        without ``"cells"`` yields an empty list.
    """
    return [
        # `or ""` also covers an explicit None, which .get()'s default does not.
        [(cell.get("content") or "").strip() for cell in row]
        for row in table_info.get("cells", [])
    ]
def find_table_by_content(
    tables: List[Dict[str, Any]], search_text: str, case_sensitive: bool = False
) -> Optional[int]:
    """
    Find a table index by searching for content within it.

    Args:
        tables: List of table information from document
        search_text: Text to search for in table cells (substring match)
        case_sensitive: Whether to do case-sensitive search

    Returns:
        Index of the first matching table, or None
    """
    # Lower-case the needle once instead of once per cell.
    needle = search_text if case_sensitive else search_text.lower()

    for idx, table in enumerate(tables):
        for row in table.get("cells", []):
            for cell in row:
                # `or ""` also covers an explicit None content value.
                cell_content = cell.get("content") or ""
                if not case_sensitive:
                    cell_content = cell_content.lower()
                if needle in cell_content:
                    return idx

    return None
def validate_table_data(data: List[List[str]]) -> Tuple[bool, str]:
    """
    Validates table data format and provides specific error messages for LLMs.

    WHAT THIS CHECKS:
    - Data is a 2D list (list of lists)
    - All rows have consistent column counts
    - Rows contain at least one column
    - Dimensions are within Google Docs limits
    - No None values

    VALID FORMAT EXAMPLE:
    [
        ["Header1", "Header2"],  # Row 0 - 2 columns
        ["Data1", "Data2"],      # Row 1 - 2 columns
        ["Data3", "Data4"]       # Row 2 - 2 columns
    ]

    INVALID FORMATS:
    - [["col1"], ["col1", "col2"]]  # Inconsistent column counts
    - ["col1", "col2"]              # Not 2D (missing inner lists)
    - [["col1", None]]              # Contains None values
    - [] or [[]]                    # Empty data

    Args:
        data: 2D array of data to validate

    Returns:
        Tuple of (is_valid, error_message_with_examples)
    """
    if not data:
        return (
            False,
            "Data is empty. Use format: [['col1', 'col2'], ['row1col1', 'row1col2']]",
        )

    if not isinstance(data, list):
        return (
            False,
            f"Data must be a list, got {type(data).__name__}. Use format: [['col1', 'col2'], ['row1col1', 'row1col2']]",
        )

    if not all(isinstance(row, list) for row in data):
        return (
            False,
            f"Data must be a 2D list (list of lists). Each row must be a list. Check your format: {data}",
        )

    # Check for consistent column count
    col_counts = [len(row) for row in data]
    if len(set(col_counts)) > 1:
        return (
            False,
            f"All rows must have same number of columns. Found: {col_counts}. Fix your data format.",
        )

    rows = len(data)
    cols = col_counts[0]  # data is non-empty here, so col_counts is too

    # [[]] passes the emptiness check above but still describes no table.
    if cols == 0:
        return (
            False,
            "Data rows are empty (0 columns). Use format: [['col1', 'col2'], ['row1col1', 'row1col2']]",
        )

    # Check for reasonable size
    if rows > 1000:
        return False, f"Too many rows ({rows}). Google Docs limit is 1000 rows."

    if cols > 20:
        return False, f"Too many columns ({cols}). Google Docs limit is 20 columns."

    # Reject None cells, as documented under INVALID FORMATS.
    for row_idx, row in enumerate(data):
        for col_idx, cell in enumerate(row):
            if cell is None:
                return (
                    False,
                    f"Cell at row {row_idx}, column {col_idx} is None. Use an empty string '' for blank cells.",
                )

    return True, f"Valid table data: {rows}x{cols} table format"

View File

@@ -3,6 +3,7 @@ Google Docs MCP Tools
This module provides MCP tools for interacting with Google Docs API and managing Google Docs via Drive.
"""
import logging
import asyncio
import io
@@ -25,30 +26,29 @@ from gdocs.docs_helpers import (
create_insert_table_request,
create_insert_page_break_request,
create_insert_image_request,
create_bullet_list_request
create_bullet_list_request,
)
# Import document structure and table utilities
from gdocs.docs_structure import (
parse_document_structure,
find_tables,
analyze_document_complexity
)
from gdocs.docs_tables import (
extract_table_as_data
analyze_document_complexity,
)
from gdocs.docs_tables import extract_table_as_data
# Import operation managers for complex business logic
from gdocs.managers import (
TableOperationManager,
HeaderFooterManager,
ValidationManager,
BatchOperationManager
BatchOperationManager,
)
import json
logger = logging.getLogger(__name__)
@server.tool()
@handle_http_errors("search_docs", is_read_only=True, service_type="docs")
@require_google_service("drive", "drive_read")
@@ -69,15 +69,17 @@ async def search_docs(
escaped_query = query.replace("'", "\\'")
response = await asyncio.to_thread(
service.files().list(
service.files()
.list(
q=f"name contains '{escaped_query}' and mimeType='application/vnd.google-apps.document' and trashed=false",
pageSize=page_size,
fields="files(id, name, createdTime, modifiedTime, webViewLink)",
supportsAllDrives=True,
includeItemsFromAllDrives=True
).execute
includeItemsFromAllDrives=True,
)
.execute
)
files = response.get('files', [])
files = response.get("files", [])
if not files:
return f"No Google Docs found matching '{query}'."
@@ -88,12 +90,19 @@ async def search_docs(
)
return "\n".join(output)
@server.tool()
@handle_http_errors("get_doc_content", is_read_only=True, service_type="docs")
@require_multiple_services([
{"service_type": "drive", "scopes": "drive_read", "param_name": "drive_service"},
{"service_type": "docs", "scopes": "docs_read", "param_name": "docs_service"}
])
@require_multiple_services(
[
{
"service_type": "drive",
"scopes": "drive_read",
"param_name": "drive_service",
},
{"service_type": "docs", "scopes": "docs_read", "param_name": "docs_service"},
]
)
async def get_doc_content(
drive_service: Any,
docs_service: Any,
@@ -108,31 +117,37 @@ async def get_doc_content(
Returns:
str: The document content with metadata header.
"""
logger.info(f"[get_doc_content] Invoked. Document/File ID: '{document_id}' for user '{user_google_email}'")
logger.info(
f"[get_doc_content] Invoked. Document/File ID: '{document_id}' for user '{user_google_email}'"
)
# Step 2: Get file metadata from Drive
file_metadata = await asyncio.to_thread(
drive_service.files().get(
fileId=document_id, fields="id, name, mimeType, webViewLink",
supportsAllDrives=True
).execute
drive_service.files()
.get(
fileId=document_id,
fields="id, name, mimeType, webViewLink",
supportsAllDrives=True,
)
.execute
)
mime_type = file_metadata.get("mimeType", "")
file_name = file_metadata.get("name", "Unknown File")
web_view_link = file_metadata.get("webViewLink", "#")
logger.info(f"[get_doc_content] File '{file_name}' (ID: {document_id}) has mimeType: '{mime_type}'")
logger.info(
f"[get_doc_content] File '{file_name}' (ID: {document_id}) has mimeType: '{mime_type}'"
)
body_text = "" # Initialize body_text
body_text = "" # Initialize body_text
# Step 3: Process based on mimeType
if mime_type == "application/vnd.google-apps.document":
logger.info("[get_doc_content] Processing as native Google Doc.")
doc_data = await asyncio.to_thread(
docs_service.documents().get(
documentId=document_id,
includeTabsContent=True
).execute
docs_service.documents()
.get(documentId=document_id, includeTabsContent=True)
.execute
)
# Tab header format constant
TAB_HEADER_FORMAT = "\n--- TAB: {tab_name} ---\n"
@@ -147,25 +162,27 @@ async def get_doc_content(
text_lines.append(TAB_HEADER_FORMAT.format(tab_name=tab_name))
for element in elements:
if 'paragraph' in element:
paragraph = element.get('paragraph', {})
para_elements = paragraph.get('elements', [])
if "paragraph" in element:
paragraph = element.get("paragraph", {})
para_elements = paragraph.get("elements", [])
current_line_text = ""
for pe in para_elements:
text_run = pe.get('textRun', {})
if text_run and 'content' in text_run:
current_line_text += text_run['content']
text_run = pe.get("textRun", {})
if text_run and "content" in text_run:
current_line_text += text_run["content"]
if current_line_text.strip():
text_lines.append(current_line_text)
elif 'table' in element:
elif "table" in element:
# Handle table content
table = element.get('table', {})
table_rows = table.get('tableRows', [])
table = element.get("table", {})
table_rows = table.get("tableRows", [])
for row in table_rows:
row_cells = row.get('tableCells', [])
row_cells = row.get("tableCells", [])
for cell in row_cells:
cell_content = cell.get('content', [])
cell_text = extract_text_from_elements(cell_content, depth=depth + 1)
cell_content = cell.get("content", [])
cell_text = extract_text_from_elements(
cell_content, depth=depth + 1
)
if cell_text.strip():
text_lines.append(cell_text)
return "".join(text_lines)
@@ -174,18 +191,18 @@ async def get_doc_content(
"""Process a tab and its nested child tabs recursively"""
tab_text = ""
if 'documentTab' in tab:
props = tab.get('tabProperties', {})
tab_title = props.get('title', 'Untitled Tab')
tab_id = props.get('tabId', 'Unknown ID')
if "documentTab" in tab:
props = tab.get("tabProperties", {})
tab_title = props.get("title", "Untitled Tab")
tab_id = props.get("tabId", "Unknown ID")
# Add indentation for nested tabs to show hierarchy
if level > 0:
tab_title = " " * level + f"{tab_title} ( ID: {tab_id})"
tab_body = tab.get('documentTab', {}).get('body', {}).get('content', [])
tab_body = tab.get("documentTab", {}).get("body", {}).get("content", [])
tab_text += extract_text_from_elements(tab_body, tab_title)
# Process child tabs (nested tabs)
child_tabs = tab.get('childTabs', [])
child_tabs = tab.get("childTabs", [])
for child_tab in child_tabs:
tab_text += process_tab_hierarchy(child_tab, level + 1)
@@ -194,13 +211,13 @@ async def get_doc_content(
processed_text_lines = []
# Process main document body
body_elements = doc_data.get('body', {}).get('content', [])
body_elements = doc_data.get("body", {}).get("content", [])
main_content = extract_text_from_elements(body_elements)
if main_content.strip():
processed_text_lines.append(main_content)
# Process all tabs
tabs = doc_data.get('tabs', [])
tabs = doc_data.get("tabs", [])
for tab in tabs:
tab_content = process_tab_hierarchy(tab)
if tab_content.strip():
@@ -208,19 +225,27 @@ async def get_doc_content(
body_text = "".join(processed_text_lines)
else:
logger.info(f"[get_doc_content] Processing as Drive file (e.g., .docx, other). MimeType: {mime_type}")
logger.info(
f"[get_doc_content] Processing as Drive file (e.g., .docx, other). MimeType: {mime_type}"
)
export_mime_type_map = {
# Example: "application/vnd.google-apps.spreadsheet": "text/csv",
# Native GSuite types that are not Docs would go here if this function
# was intended to export them. For .docx, direct download is used.
# Example: "application/vnd.google-apps.spreadsheet": "text/csv",
# Native GSuite types that are not Docs would go here if this function
# was intended to export them. For .docx, direct download is used.
}
effective_export_mime = export_mime_type_map.get(mime_type)
request_obj = (
drive_service.files().export_media(fileId=document_id, mimeType=effective_export_mime, supportsAllDrives=True)
drive_service.files().export_media(
fileId=document_id,
mimeType=effective_export_mime,
supportsAllDrives=True,
)
if effective_export_mime
else drive_service.files().get_media(fileId=document_id, supportsAllDrives=True)
else drive_service.files().get_media(
fileId=document_id, supportsAllDrives=True
)
)
fh = io.BytesIO()
@@ -246,18 +271,16 @@ async def get_doc_content(
header = (
f'File: "{file_name}" (ID: {document_id}, Type: {mime_type})\n'
f'Link: {web_view_link}\n\n--- CONTENT ---\n'
f"Link: {web_view_link}\n\n--- CONTENT ---\n"
)
return header + body_text
@server.tool()
@handle_http_errors("list_docs_in_folder", is_read_only=True, service_type="docs")
@require_google_service("drive", "drive_read")
async def list_docs_in_folder(
service: Any,
user_google_email: str,
folder_id: str = 'root',
page_size: int = 100
service: Any, user_google_email: str, folder_id: str = "root", page_size: int = 100
) -> str:
"""
Lists Google Docs within a specific Drive folder.
@@ -265,25 +288,32 @@ async def list_docs_in_folder(
Returns:
str: A formatted list of Google Docs in the specified folder.
"""
logger.info(f"[list_docs_in_folder] Invoked. Email: '{user_google_email}', Folder ID: '{folder_id}'")
logger.info(
f"[list_docs_in_folder] Invoked. Email: '{user_google_email}', Folder ID: '{folder_id}'"
)
rsp = await asyncio.to_thread(
service.files().list(
service.files()
.list(
q=f"'{folder_id}' in parents and mimeType='application/vnd.google-apps.document' and trashed=false",
pageSize=page_size,
fields="files(id, name, modifiedTime, webViewLink)",
supportsAllDrives=True,
includeItemsFromAllDrives=True
).execute
includeItemsFromAllDrives=True,
)
.execute
)
items = rsp.get('files', [])
items = rsp.get("files", [])
if not items:
return f"No Google Docs found in folder '{folder_id}'."
out = [f"Found {len(items)} Docs in folder '{folder_id}':"]
for f in items:
out.append(f"- {f['name']} (ID: {f['id']}) Modified: {f.get('modifiedTime')} Link: {f.get('webViewLink')}")
out.append(
f"- {f['name']} (ID: {f['id']}) Modified: {f.get('modifiedTime')} Link: {f.get('webViewLink')}"
)
return "\n".join(out)
@server.tool()
@handle_http_errors("create_doc", service_type="docs")
@require_google_service("docs", "docs_write")
@@ -291,7 +321,7 @@ async def create_doc(
service: Any,
user_google_email: str,
title: str,
content: str = '',
content: str = "",
) -> str:
"""
Creates a new Google Doc and optionally inserts initial content.
@@ -301,14 +331,22 @@ async def create_doc(
"""
logger.info(f"[create_doc] Invoked. Email: '{user_google_email}', Title='{title}'")
doc = await asyncio.to_thread(service.documents().create(body={'title': title}).execute)
doc_id = doc.get('documentId')
doc = await asyncio.to_thread(
service.documents().create(body={"title": title}).execute
)
doc_id = doc.get("documentId")
if content:
requests = [{'insertText': {'location': {'index': 1}, 'text': content}}]
await asyncio.to_thread(service.documents().batchUpdate(documentId=doc_id, body={'requests': requests}).execute)
requests = [{"insertText": {"location": {"index": 1}, "text": content}}]
await asyncio.to_thread(
service.documents()
.batchUpdate(documentId=doc_id, body={"requests": requests})
.execute
)
link = f"https://docs.google.com/document/d/{doc_id}/edit"
msg = f"Created Google Doc '{title}' (ID: {doc_id}) for {user_google_email}. Link: {link}"
logger.info(f"Successfully created Google Doc '{title}' (ID: {doc_id}) for {user_google_email}. Link: {link}")
logger.info(
f"Successfully created Google Doc '{title}' (ID: {doc_id}) for {user_google_email}. Link: {link}"
)
return msg
@@ -363,16 +401,39 @@ async def modify_doc_text(
return f"Error: {error_msg}"
# Validate that we have something to do
if text is None and not any([
bold is not None, italic is not None, underline is not None,
font_size, font_family, text_color, background_color
]):
if text is None and not any(
[
bold is not None,
italic is not None,
underline is not None,
font_size,
font_family,
text_color,
background_color,
]
):
return "Error: Must provide either 'text' to insert/replace, or formatting parameters (bold, italic, underline, font_size, font_family, text_color, background_color)."
# Validate text formatting params if provided
if any([bold is not None, italic is not None, underline is not None, font_size, font_family, text_color, background_color]):
if any(
[
bold is not None,
italic is not None,
underline is not None,
font_size,
font_family,
text_color,
background_color,
]
):
is_valid, error_msg = validator.validate_text_formatting_params(
bold, italic, underline, font_size, font_family, text_color, background_color
bold,
italic,
underline,
font_size,
font_family,
text_color,
background_color,
)
if not is_valid:
return f"Error: {error_msg}"
@@ -397,15 +458,23 @@ async def modify_doc_text(
# Instead, we insert new text at index 1 and then delete the old text
requests.append(create_insert_text_request(1, text))
adjusted_end = end_index + len(text)
requests.append(create_delete_range_request(1 + len(text), adjusted_end))
operations.append(f"Replaced text from index {start_index} to {end_index}")
requests.append(
create_delete_range_request(1 + len(text), adjusted_end)
)
operations.append(
f"Replaced text from index {start_index} to {end_index}"
)
else:
# Normal replacement: delete old text, then insert new text
requests.extend([
create_delete_range_request(start_index, end_index),
create_insert_text_request(start_index, text)
])
operations.append(f"Replaced text from index {start_index} to {end_index}")
requests.extend(
[
create_delete_range_request(start_index, end_index),
create_insert_text_request(start_index, text),
]
)
operations.append(
f"Replaced text from index {start_index} to {end_index}"
)
else:
# Text insertion
actual_index = 1 if start_index == 0 else start_index
@@ -413,7 +482,17 @@ async def modify_doc_text(
operations.append(f"Inserted text at index {start_index}")
# Handle formatting
if any([bold is not None, italic is not None, underline is not None, font_size, font_family, text_color, background_color]):
if any(
[
bold is not None,
italic is not None,
underline is not None,
font_size,
font_family,
text_color,
background_color,
]
):
# Adjust range for formatting based on text operations
format_start = start_index
format_end = end_index
@@ -444,7 +523,7 @@ async def modify_doc_text(
font_size,
font_family,
text_color,
background_color
background_color,
)
)
@@ -464,13 +543,14 @@ async def modify_doc_text(
if background_color:
format_details.append(f"background_color={background_color}")
operations.append(f"Applied formatting ({', '.join(format_details)}) to range {format_start}-{format_end}")
operations.append(
f"Applied formatting ({', '.join(format_details)}) to range {format_start}-{format_end}"
)
await asyncio.to_thread(
service.documents().batchUpdate(
documentId=document_id,
body={'requests': requests}
).execute
service.documents()
.batchUpdate(documentId=document_id, body={"requests": requests})
.execute
)
link = f"https://docs.google.com/document/d/{document_id}/edit"
@@ -478,6 +558,7 @@ async def modify_doc_text(
text_info = f" Text length: {len(text)} characters." if text else ""
return f"{operation_summary} in document {document_id}.{text_info} Link: {link}"
@server.tool()
@handle_http_errors("find_and_replace_doc", service_type="docs")
@require_google_service("docs", "docs_write")
@@ -502,23 +583,24 @@ async def find_and_replace_doc(
Returns:
str: Confirmation message with replacement count
"""
logger.info(f"[find_and_replace_doc] Doc={document_id}, find='{find_text}', replace='{replace_text}'")
logger.info(
f"[find_and_replace_doc] Doc={document_id}, find='{find_text}', replace='{replace_text}'"
)
requests = [create_find_replace_request(find_text, replace_text, match_case)]
result = await asyncio.to_thread(
service.documents().batchUpdate(
documentId=document_id,
body={'requests': requests}
).execute
service.documents()
.batchUpdate(documentId=document_id, body={"requests": requests})
.execute
)
# Extract number of replacements from response
replacements = 0
if 'replies' in result and result['replies']:
reply = result['replies'][0]
if 'replaceAllText' in reply:
replacements = reply['replaceAllText'].get('occurrencesChanged', 0)
if "replies" in result and result["replies"]:
reply = result["replies"][0]
if "replaceAllText" in reply:
replacements = reply["replaceAllText"].get("occurrencesChanged", 0)
link = f"https://docs.google.com/document/d/{document_id}/edit"
return f"Replaced {replacements} occurrence(s) of '{find_text}' with '{replace_text}' in document {document_id}. Link: {link}"
@@ -554,7 +636,9 @@ async def insert_doc_elements(
Returns:
str: Confirmation message with insertion details
"""
logger.info(f"[insert_doc_elements] Doc={document_id}, type={element_type}, index={index}")
logger.info(
f"[insert_doc_elements] Doc={document_id}, type={element_type}, index={index}"
)
# Handle the special case where we can't insert at the first section break
# If index is 0, bump it to 1 to avoid the section break
@@ -579,10 +663,12 @@ async def insert_doc_elements(
text = "List item"
# Insert text first, then create list
requests.extend([
create_insert_text_request(index, text + '\n'),
create_bullet_list_request(index, index + len(text), list_type)
])
requests.extend(
[
create_insert_text_request(index, text + "\n"),
create_bullet_list_request(index, index + len(text), list_type),
]
)
description = f"{list_type.lower()} list"
elif element_type == "page_break":
@@ -593,21 +679,27 @@ async def insert_doc_elements(
return f"Error: Unsupported element type '{element_type}'. Supported types: 'table', 'list', 'page_break'."
await asyncio.to_thread(
service.documents().batchUpdate(
documentId=document_id,
body={'requests': requests}
).execute
service.documents()
.batchUpdate(documentId=document_id, body={"requests": requests})
.execute
)
link = f"https://docs.google.com/document/d/{document_id}/edit"
return f"Inserted {description} at index {index} in document {document_id}. Link: {link}"
@server.tool()
@handle_http_errors("insert_doc_image", service_type="docs")
@require_multiple_services([
{"service_type": "docs", "scopes": "docs_write", "param_name": "docs_service"},
{"service_type": "drive", "scopes": "drive_read", "param_name": "drive_service"}
])
@require_multiple_services(
[
{"service_type": "docs", "scopes": "docs_write", "param_name": "docs_service"},
{
"service_type": "drive",
"scopes": "drive_read",
"param_name": "drive_service",
},
]
)
async def insert_doc_image(
docs_service: Any,
drive_service: Any,
@@ -632,7 +724,9 @@ async def insert_doc_image(
Returns:
str: Confirmation message with insertion details
"""
logger.info(f"[insert_doc_image] Doc={document_id}, source={image_source}, index={index}")
logger.info(
f"[insert_doc_image] Doc={document_id}, source={image_source}, index={index}"
)
# Handle the special case where we can't insert at the first section break
# If index is 0, bump it to 1 to avoid the section break
@@ -641,20 +735,24 @@ async def insert_doc_image(
index = 1
# Determine if source is a Drive file ID or URL
is_drive_file = not (image_source.startswith('http://') or image_source.startswith('https://'))
is_drive_file = not (
image_source.startswith("http://") or image_source.startswith("https://")
)
if is_drive_file:
# Verify Drive file exists and get metadata
try:
file_metadata = await asyncio.to_thread(
drive_service.files().get(
drive_service.files()
.get(
fileId=image_source,
fields="id, name, mimeType",
supportsAllDrives=True
).execute
supportsAllDrives=True,
)
.execute
)
mime_type = file_metadata.get('mimeType', '')
if not mime_type.startswith('image/'):
mime_type = file_metadata.get("mimeType", "")
if not mime_type.startswith("image/"):
return f"Error: File {image_source} is not an image (MIME type: {mime_type})."
image_uri = f"https://drive.google.com/uc?id={image_source}"
@@ -669,10 +767,9 @@ async def insert_doc_image(
requests = [create_insert_image_request(index, image_uri, width, height)]
await asyncio.to_thread(
docs_service.documents().batchUpdate(
documentId=document_id,
body={'requests': requests}
).execute
docs_service.documents()
.batchUpdate(documentId=document_id, body={"requests": requests})
.execute
)
size_info = ""
@@ -682,6 +779,7 @@ async def insert_doc_image(
link = f"https://docs.google.com/document/d/{document_id}/edit"
return f"Inserted {source_description}{size_info} at index {index} in document {document_id}. Link: {link}"
@server.tool()
@handle_http_errors("update_doc_headers_footers", service_type="docs")
@require_google_service("docs", "docs_write")
@@ -715,7 +813,9 @@ async def update_doc_headers_footers(
if not is_valid:
return f"Error: {error_msg}"
is_valid, error_msg = validator.validate_header_footer_params(section_type, header_footer_type)
is_valid, error_msg = validator.validate_header_footer_params(
section_type, header_footer_type
)
if not is_valid:
return f"Error: {error_msg}"
@@ -736,6 +836,7 @@ async def update_doc_headers_footers(
else:
return f"Error: {message}"
@server.tool()
@handle_http_errors("batch_update_doc", service_type="docs")
@require_google_service("docs", "docs_write")
@@ -787,11 +888,12 @@ async def batch_update_doc(
if success:
link = f"https://docs.google.com/document/d/{document_id}/edit"
replies_count = metadata.get('replies_count', 0)
replies_count = metadata.get("replies_count", 0)
return f"{message} on document {document_id}. API replies: {replies_count}. Link: {link}"
else:
return f"Error: {message}"
@server.tool()
@handle_http_errors("inspect_doc_structure", is_read_only=True, service_type="docs")
@require_google_service("docs", "docs_read")
@@ -846,46 +948,56 @@ async def inspect_doc_structure(
# Simplify for JSON serialization
result = {
'title': structure['title'],
'total_length': structure['total_length'],
'statistics': {
'elements': len(structure['body']),
'tables': len(structure['tables']),
'paragraphs': sum(1 for e in structure['body'] if e.get('type') == 'paragraph'),
'has_headers': bool(structure['headers']),
'has_footers': bool(structure['footers'])
"title": structure["title"],
"total_length": structure["total_length"],
"statistics": {
"elements": len(structure["body"]),
"tables": len(structure["tables"]),
"paragraphs": sum(
1 for e in structure["body"] if e.get("type") == "paragraph"
),
"has_headers": bool(structure["headers"]),
"has_footers": bool(structure["footers"]),
},
'elements': []
"elements": [],
}
# Add element summaries
for element in structure['body']:
for element in structure["body"]:
elem_summary = {
'type': element['type'],
'start_index': element['start_index'],
'end_index': element['end_index']
"type": element["type"],
"start_index": element["start_index"],
"end_index": element["end_index"],
}
if element['type'] == 'table':
elem_summary['rows'] = element['rows']
elem_summary['columns'] = element['columns']
elem_summary['cell_count'] = len(element.get('cells', []))
elif element['type'] == 'paragraph':
elem_summary['text_preview'] = element.get('text', '')[:100]
if element["type"] == "table":
elem_summary["rows"] = element["rows"]
elem_summary["columns"] = element["columns"]
elem_summary["cell_count"] = len(element.get("cells", []))
elif element["type"] == "paragraph":
elem_summary["text_preview"] = element.get("text", "")[:100]
result['elements'].append(elem_summary)
result["elements"].append(elem_summary)
# Add table details
if structure['tables']:
result['tables'] = []
for i, table in enumerate(structure['tables']):
if structure["tables"]:
result["tables"] = []
for i, table in enumerate(structure["tables"]):
table_data = extract_table_as_data(table)
result['tables'].append({
'index': i,
'position': {'start': table['start_index'], 'end': table['end_index']},
'dimensions': {'rows': table['rows'], 'columns': table['columns']},
'preview': table_data[:3] if table_data else [] # First 3 rows
})
result["tables"].append(
{
"index": i,
"position": {
"start": table["start_index"],
"end": table["end_index"],
},
"dimensions": {
"rows": table["rows"],
"columns": table["columns"],
},
"preview": table_data[:3] if table_data else [], # First 3 rows
}
)
else:
# Return basic analysis
@@ -894,19 +1006,22 @@ async def inspect_doc_structure(
# Add table information
tables = find_tables(doc)
if tables:
result['table_details'] = []
result["table_details"] = []
for i, table in enumerate(tables):
result['table_details'].append({
'index': i,
'rows': table['rows'],
'columns': table['columns'],
'start_index': table['start_index'],
'end_index': table['end_index']
})
result["table_details"].append(
{
"index": i,
"rows": table["rows"],
"columns": table["columns"],
"start_index": table["start_index"],
"end_index": table["end_index"],
}
)
link = f"https://docs.google.com/document/d/{document_id}/edit"
return f"Document structure analysis for {document_id}:\n\n{json.dumps(result, indent=2)}\n\nLink: {link}"
@server.tool()
@handle_http_errors("create_table_with_data", service_type="docs")
@require_google_service("docs", "docs_write")
@@ -986,17 +1101,21 @@ async def create_table_with_data(
# If it failed due to index being at or beyond document end, retry with adjusted index
if not success and "must be less than the end index" in message:
logger.debug(f"Index {index} is at document boundary, retrying with index {index - 1}")
logger.debug(
f"Index {index} is at document boundary, retrying with index {index - 1}"
)
success, message, metadata = await table_manager.create_and_populate_table(
document_id, table_data, index - 1, bold_headers
)
if success:
link = f"https://docs.google.com/document/d/{document_id}/edit"
rows = metadata.get('rows', 0)
columns = metadata.get('columns', 0)
rows = metadata.get("rows", 0)
columns = metadata.get("columns", 0)
return f"SUCCESS: {message}. Table: {rows}x{columns}, Index: {index}. Link: {link}"
return (
f"SUCCESS: {message}. Table: {rows}x{columns}, Index: {index}. Link: {link}"
)
else:
return f"ERROR: {message}"
@@ -1047,7 +1166,9 @@ async def debug_table_structure(
Returns:
str: Detailed JSON structure showing table layout, cell positions, and current content
"""
logger.debug(f"[debug_table_structure] Doc={document_id}, table_index={table_index}")
logger.debug(
f"[debug_table_structure] Doc={document_id}, table_index={table_index}"
)
# Get the document
doc = await asyncio.to_thread(
@@ -1063,28 +1184,29 @@ async def debug_table_structure(
# Extract detailed cell information
debug_info = {
'table_index': table_index,
'dimensions': f"{table_info['rows']}x{table_info['columns']}",
'table_range': f"[{table_info['start_index']}-{table_info['end_index']}]",
'cells': []
"table_index": table_index,
"dimensions": f"{table_info['rows']}x{table_info['columns']}",
"table_range": f"[{table_info['start_index']}-{table_info['end_index']}]",
"cells": [],
}
for row_idx, row in enumerate(table_info['cells']):
for row_idx, row in enumerate(table_info["cells"]):
row_info = []
for col_idx, cell in enumerate(row):
cell_debug = {
'position': f"({row_idx},{col_idx})",
'range': f"[{cell['start_index']}-{cell['end_index']}]",
'insertion_index': cell.get('insertion_index', 'N/A'),
'current_content': repr(cell.get('content', '')),
'content_elements_count': len(cell.get('content_elements', []))
"position": f"({row_idx},{col_idx})",
"range": f"[{cell['start_index']}-{cell['end_index']}]",
"insertion_index": cell.get("insertion_index", "N/A"),
"current_content": repr(cell.get("content", "")),
"content_elements_count": len(cell.get("content_elements", [])),
}
row_info.append(cell_debug)
debug_info['cells'].append(row_info)
debug_info["cells"].append(row_info)
link = f"https://docs.google.com/document/d/{document_id}/edit"
return f"Table structure debug for table {table_index}:\n\n{json.dumps(debug_info, indent=2)}\n\nLink: {link}"
@server.tool()
@handle_http_errors("export_doc_to_pdf", service_type="drive")
@require_google_service("drive", "drive_file")
@@ -1107,16 +1229,20 @@ async def export_doc_to_pdf(
Returns:
str: Confirmation message with PDF file details and links
"""
logger.info(f"[export_doc_to_pdf] Email={user_google_email}, Doc={document_id}, pdf_filename={pdf_filename}, folder_id={folder_id}")
logger.info(
f"[export_doc_to_pdf] Email={user_google_email}, Doc={document_id}, pdf_filename={pdf_filename}, folder_id={folder_id}"
)
# Get file metadata first to validate it's a Google Doc
try:
file_metadata = await asyncio.to_thread(
service.files().get(
fileId=document_id,
service.files()
.get(
fileId=document_id,
fields="id, name, mimeType, webViewLink",
supportsAllDrives=True
).execute
supportsAllDrives=True,
)
.execute
)
except Exception as e:
return f"Error: Could not access document {document_id}: {str(e)}"
@@ -1134,75 +1260,70 @@ async def export_doc_to_pdf(
# Export the document as PDF
try:
request_obj = service.files().export_media(
fileId=document_id,
mimeType='application/pdf',
supportsAllDrives=True
fileId=document_id, mimeType="application/pdf", supportsAllDrives=True
)
fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request_obj)
done = False
while not done:
_, done = await asyncio.to_thread(downloader.next_chunk)
pdf_content = fh.getvalue()
pdf_size = len(pdf_content)
except Exception as e:
return f"Error: Failed to export document to PDF: {str(e)}"
# Determine PDF filename
if not pdf_filename:
pdf_filename = f"{original_name}_PDF.pdf"
elif not pdf_filename.endswith('.pdf'):
pdf_filename += '.pdf'
elif not pdf_filename.endswith(".pdf"):
pdf_filename += ".pdf"
# Upload PDF to Drive
try:
# Reuse the existing BytesIO object by resetting to the beginning
fh.seek(0)
# Create media upload object
media = MediaIoBaseUpload(
fh,
mimetype='application/pdf',
resumable=True
)
media = MediaIoBaseUpload(fh, mimetype="application/pdf", resumable=True)
# Prepare file metadata for upload
file_metadata = {
'name': pdf_filename,
'mimeType': 'application/pdf'
}
file_metadata = {"name": pdf_filename, "mimeType": "application/pdf"}
# Add parent folder if specified
if folder_id:
file_metadata['parents'] = [folder_id]
file_metadata["parents"] = [folder_id]
# Upload the file
uploaded_file = await asyncio.to_thread(
service.files().create(
service.files()
.create(
body=file_metadata,
media_body=media,
fields='id, name, webViewLink, parents',
supportsAllDrives=True
).execute
fields="id, name, webViewLink, parents",
supportsAllDrives=True,
)
.execute
)
pdf_file_id = uploaded_file.get('id')
pdf_web_link = uploaded_file.get('webViewLink', '#')
pdf_parents = uploaded_file.get('parents', [])
logger.info(f"[export_doc_to_pdf] Successfully uploaded PDF to Drive: {pdf_file_id}")
pdf_file_id = uploaded_file.get("id")
pdf_web_link = uploaded_file.get("webViewLink", "#")
pdf_parents = uploaded_file.get("parents", [])
logger.info(
f"[export_doc_to_pdf] Successfully uploaded PDF to Drive: {pdf_file_id}"
)
folder_info = ""
if folder_id:
folder_info = f" in folder {folder_id}"
elif pdf_parents:
folder_info = f" in folder {pdf_parents[0]}"
return f"Successfully exported '{original_name}' to PDF and saved to Drive as '{pdf_filename}' (ID: {pdf_file_id}, {pdf_size:,} bytes){folder_info}. PDF: {pdf_web_link} | Original: {web_view_link}"
except Exception as e:
return f"Error: Failed to upload PDF to Drive: {str(e)}. PDF was generated successfully ({pdf_size:,} bytes) but could not be saved to Drive."
@@ -1211,7 +1332,7 @@ async def export_doc_to_pdf(
# Build the comment tool callables for documents, keyed by tool name.
_comment_tools = create_comment_tools("document", "document_id")

# Bind each generated tool to a module-level name (each assigned exactly once).
read_doc_comments = _comment_tools["read_comments"]
create_doc_comment = _comment_tools["create_comment"]
reply_to_comment = _comment_tools["reply_to_comment"]
resolve_comment = _comment_tools["resolve_comment"]

View File

@@ -11,8 +11,8 @@ from .validation_manager import ValidationManager
from .batch_operation_manager import BatchOperationManager
# Public names re-exported by this package.
__all__ = [
    "TableOperationManager",
    "HeaderFooterManager",
    "ValidationManager",
    "BatchOperationManager",
]

View File

@@ -4,6 +4,7 @@ Batch Operation Manager
This module provides high-level batch operation management for Google Docs,
extracting complex validation and request building logic.
"""
import logging
import asyncio
from typing import Any, Union, Dict, List, Tuple
@@ -15,7 +16,7 @@ from gdocs.docs_helpers import (
create_find_replace_request,
create_insert_table_request,
create_insert_page_break_request,
validate_operation
validate_operation,
)
logger = logging.getLogger(__name__)
@@ -24,99 +25,106 @@ logger = logging.getLogger(__name__)
class BatchOperationManager:
"""
High-level manager for Google Docs batch operations.
Handles complex multi-operation requests including:
- Operation validation and request building
- Batch execution with proper error handling
- Operation result processing and reporting
"""
def __init__(self, service):
    """
    Initialize the batch operation manager.

    Args:
        service: Google Docs API service instance
    """
    # Kept on the instance; _execute_batch_requests calls service.documents().
    self.service = service
async def execute_batch_operations(
    self, document_id: str, operations: list[dict[str, Any]]
) -> tuple[bool, str, dict[str, Any]]:
    """
    Execute multiple document operations in a single batchUpdate call.

    The operations are validated and converted to API requests, sent in one
    batch, and summarized. Failures are reported through the returned tuple
    rather than raised.

    Args:
        document_id: ID of the document to update
        operations: List of operation dictionaries

    Returns:
        Tuple of (success, message, metadata). metadata is empty on failure;
        on success it holds operations_count, requests_count, replies_count
        and operation_summary (the first five operation descriptions).
    """
    logger.info(f"Executing batch operations on document {document_id}")
    logger.info(f"Operations count: {len(operations)}")

    if not operations:
        return (
            False,
            "No operations provided. Please provide at least one operation.",
            {},
        )

    try:
        # Validate and build requests
        requests, operation_descriptions = await self._validate_and_build_requests(
            operations
        )

        if not requests:
            return False, "No valid requests could be built from operations", {}

        # Execute the batch
        result = await self._execute_batch_requests(document_id, requests)

        # Process results
        metadata = {
            "operations_count": len(operations),
            "requests_count": len(requests),
            "replies_count": len(result.get("replies", [])),
            "operation_summary": operation_descriptions[:5],  # First 5 operations
        }

        summary = self._build_operation_summary(operation_descriptions)

        return (
            True,
            f"Successfully executed {len(operations)} operations ({summary})",
            metadata,
        )

    except Exception as e:
        # Any validation or API error becomes a failure tuple for the caller.
        logger.error(f"Failed to execute batch operations: {str(e)}")
        return False, f"Batch operation failed: {str(e)}", {}
async def _validate_and_build_requests(
self,
operations: list[dict[str, Any]]
self, operations: list[dict[str, Any]]
) -> tuple[list[dict[str, Any]], list[str]]:
"""
Validate operations and build API requests.
Args:
operations: List of operation dictionaries
Returns:
Tuple of (requests, operation_descriptions)
"""
requests = []
operation_descriptions = []
for i, op in enumerate(operations):
# Validate operation structure
is_valid, error_msg = validate_operation(op)
if not is_valid:
raise ValueError(f"Operation {i+1}: {error_msg}")
op_type = op.get('type')
raise ValueError(f"Operation {i + 1}: {error_msg}")
op_type = op.get("type")
try:
# Build request based on operation type
result = self._build_operation_request(op, op_type)
# Handle both single request and list of requests
if isinstance(result[0], list):
# Multiple requests (e.g., replace_text)
@@ -127,179 +135,211 @@ class BatchOperationManager:
# Single request
requests.append(result[0])
operation_descriptions.append(result[1])
except KeyError as e:
raise ValueError(f"Operation {i+1} ({op_type}) missing required field: {e}")
raise ValueError(
f"Operation {i + 1} ({op_type}) missing required field: {e}"
)
except Exception as e:
raise ValueError(f"Operation {i+1} ({op_type}) failed validation: {str(e)}")
raise ValueError(
f"Operation {i + 1} ({op_type}) failed validation: {str(e)}"
)
return requests, operation_descriptions
def _build_operation_request(
    self, op: dict[str, Any], op_type: str
) -> Tuple[Union[Dict[str, Any], List[Dict[str, Any]]], str]:
    """
    Build the API request(s) and a human-readable description for one operation.

    Args:
        op: Operation dictionary
        op_type: Operation type

    Returns:
        Tuple of (request, description). request is a single request dict,
        except for "replace_text", where it is a list of two requests
        (delete, then insert).

    Raises:
        KeyError: If a field required by the operation type is missing from op.
        ValueError: If op_type is not supported, or a "format_text" operation
            yields no request (no formatting options provided).
    """
    if op_type == "insert_text":
        request = create_insert_text_request(op["index"], op["text"])
        description = f"insert text at {op['index']}"

    elif op_type == "delete_text":
        request = create_delete_range_request(op["start_index"], op["end_index"])
        description = f"delete text {op['start_index']}-{op['end_index']}"

    elif op_type == "replace_text":
        # Replace is delete + insert (must be done in this order)
        delete_request = create_delete_range_request(
            op["start_index"], op["end_index"]
        )
        insert_request = create_insert_text_request(op["start_index"], op["text"])
        # Return both requests as a list
        request = [delete_request, insert_request]
        # Only the first 20 characters of the new text appear in the description.
        description = f"replace text {op['start_index']}-{op['end_index']} with '{op['text'][:20]}{'...' if len(op['text']) > 20 else ''}'"

    elif op_type == "format_text":
        request = create_format_text_request(
            op["start_index"],
            op["end_index"],
            op.get("bold"),
            op.get("italic"),
            op.get("underline"),
            op.get("font_size"),
            op.get("font_family"),
            op.get("text_color"),
            op.get("background_color"),
        )

        if not request:
            raise ValueError("No formatting options provided")

        # Build format description from the options that were actually supplied
        format_changes = []
        for param, name in [
            ("bold", "bold"),
            ("italic", "italic"),
            ("underline", "underline"),
            ("font_size", "font size"),
            ("font_family", "font family"),
            ("text_color", "text color"),
            ("background_color", "background color"),
        ]:
            if op.get(param) is not None:
                value = f"{op[param]}pt" if param == "font_size" else op[param]
                format_changes.append(f"{name}: {value}")

        description = f"format text {op['start_index']}-{op['end_index']} ({', '.join(format_changes)})"

    elif op_type == "insert_table":
        request = create_insert_table_request(
            op["index"], op["rows"], op["columns"]
        )
        description = f"insert {op['rows']}x{op['columns']} table at {op['index']}"

    elif op_type == "insert_page_break":
        request = create_insert_page_break_request(op["index"])
        description = f"insert page break at {op['index']}"

    elif op_type == "find_replace":
        request = create_find_replace_request(
            op["find_text"], op["replace_text"], op.get("match_case", False)
        )
        # Separator keeps the search and replacement strings visually distinct.
        description = f"find/replace '{op['find_text']}' -> '{op['replace_text']}'"

    else:
        supported_types = [
            "insert_text",
            "delete_text",
            "replace_text",
            "format_text",
            "insert_table",
            "insert_page_break",
            "find_replace",
        ]
        raise ValueError(
            f"Unsupported operation type '{op_type}'. Supported: {', '.join(supported_types)}"
        )

    return request, description
async def _execute_batch_requests(
    self, document_id: str, requests: list[dict[str, Any]]
) -> dict[str, Any]:
    """
    Execute the batch requests against the Google Docs API.

    Args:
        document_id: Document ID
        requests: List of API requests

    Returns:
        API response
    """
    batch_call = self.service.documents().batchUpdate(
        documentId=document_id, body={"requests": requests}
    )
    # execute is passed uncalled so asyncio.to_thread runs it in a worker thread.
    return await asyncio.to_thread(batch_call.execute)
def _build_operation_summary(self, operation_descriptions: list[str]) -> str:
    """
    Build a concise summary of operations performed.

    Args:
        operation_descriptions: List of operation descriptions

    Returns:
        "no operations" for an empty list; otherwise the first three
        descriptions joined by ", ", followed by " and N more operation(s)"
        when there are more than three.
    """
    if not operation_descriptions:
        return "no operations"

    summary = ", ".join(operation_descriptions[:3])  # Show first 3 operations

    remaining = len(operation_descriptions) - 3
    if remaining > 0:
        summary += f" and {remaining} more operation{'s' if remaining > 1 else ''}"

    return summary
def get_supported_operations(self) -> dict[str, Any]:
    """
    Get information about supported batch operations.

    Returns:
        Dictionary with two keys: "supported_operations", mapping each
        operation type to its required (and, where present, optional) fields
        plus a description, and "example_operations", a list of sample
        operation dictionaries.
    """
    return {
        "supported_operations": {
            "insert_text": {
                "required": ["index", "text"],
                "description": "Insert text at specified index",
            },
            "delete_text": {
                "required": ["start_index", "end_index"],
                "description": "Delete text in specified range",
            },
            "replace_text": {
                "required": ["start_index", "end_index", "text"],
                "description": "Replace text in range with new text",
            },
            "format_text": {
                "required": ["start_index", "end_index"],
                "optional": [
                    "bold",
                    "italic",
                    "underline",
                    "font_size",
                    "font_family",
                    "text_color",
                    "background_color",
                ],
                "description": "Apply formatting to text range",
            },
            "insert_table": {
                "required": ["index", "rows", "columns"],
                "description": "Insert table at specified index",
            },
            "insert_page_break": {
                "required": ["index"],
                "description": "Insert page break at specified index",
            },
            "find_replace": {
                "required": ["find_text", "replace_text"],
                "optional": ["match_case"],
                "description": "Find and replace text throughout document",
            },
        },
        "example_operations": [
            {"type": "insert_text", "index": 1, "text": "Hello World"},
            {
                "type": "format_text",
                "start_index": 1,
                "end_index": 12,
                "bold": True,
            },
            {"type": "insert_table", "index": 20, "rows": 2, "columns": 3},
        ],
    }

View File

@@ -4,6 +4,7 @@ Header Footer Manager
This module provides high-level operations for managing headers and footers
in Google Docs, extracting complex logic from the main tools module.
"""
import logging
import asyncio
from typing import Any, Optional
@@ -14,319 +15,325 @@ logger = logging.getLogger(__name__)
class HeaderFooterManager:
"""
High-level manager for Google Docs header and footer operations.
Handles complex header/footer operations including:
- Finding and updating existing headers/footers
- Content replacement with proper range calculation
- Section type management
"""
def __init__(self, service):
    """
    Initialize the header footer manager.

    Args:
        service: Google Docs API service instance
    """
    # Kept on the instance; the document fetch/update methods call service.documents().
    self.service = service
async def update_header_footer_content(
    self,
    document_id: str,
    section_type: str,
    content: str,
    header_footer_type: str = "DEFAULT",
) -> tuple[bool, str]:
    """
    Updates header or footer content in a document.

    Failures (invalid arguments, missing section, API errors) are reported
    through the returned tuple rather than raised.

    Args:
        document_id: ID of the document to update
        section_type: Type of section ("header" or "footer")
        content: New content for the section
        header_footer_type: Type of header/footer ("DEFAULT", "FIRST_PAGE",
            "EVEN_PAGE", or the legacy name "FIRST_PAGE_ONLY")

    Returns:
        Tuple of (success, message)
    """
    logger.info(f"Updating {section_type} in document {document_id}")

    # Validate section type
    if section_type not in ["header", "footer"]:
        return False, "section_type must be 'header' or 'footer'"

    # Validate header/footer type. "FIRST_PAGE" is accepted alongside the legacy
    # "FIRST_PAGE_ONLY" because _find_target_section already handles both.
    if header_footer_type not in [
        "DEFAULT",
        "FIRST_PAGE",
        "FIRST_PAGE_ONLY",
        "EVEN_PAGE",
    ]:
        return (
            False,
            "header_footer_type must be 'DEFAULT', 'FIRST_PAGE', 'FIRST_PAGE_ONLY', or 'EVEN_PAGE'",
        )

    try:
        # Get document structure
        doc = await self._get_document(document_id)

        # Find the target section
        target_section, section_id = await self._find_target_section(
            doc, section_type, header_footer_type
        )

        if not target_section:
            return (
                False,
                f"No {section_type} found in document. Please create a {section_type} first in Google Docs.",
            )

        # Update the content
        success = await self._replace_section_content(
            document_id, target_section, content
        )

        if success:
            return True, f"Updated {section_type} content in document {document_id}"
        else:
            return (
                False,
                f"Could not find content structure in {section_type} to update",
            )

    except Exception as e:
        logger.error(f"Failed to update {section_type}: {str(e)}")
        return False, f"Failed to update {section_type}: {str(e)}"
async def _get_document(self, document_id: str) -> dict[str, Any]:
    """Get the full document data."""
    fetch_call = self.service.documents().get(documentId=document_id)
    # execute is passed uncalled so asyncio.to_thread runs it in a worker thread.
    return await asyncio.to_thread(fetch_call.execute)
async def _find_target_section(
    self, doc: dict[str, Any], section_type: str, header_footer_type: str
) -> tuple[Optional[dict[str, Any]], Optional[str]]:
    """
    Find the target header or footer section.

    Lookup order: a section whose "type" equals header_footer_type, then a
    section whose ID contains one of the known patterns for that type, then
    the first section available.

    Args:
        doc: Document data
        section_type: "header" selects doc["headers"]; any other value
            selects doc["footers"]
        header_footer_type: Type of header/footer

    Returns:
        Tuple of (section_data, section_id) or (None, None) if not found
    """
    sections = doc.get("headers" if section_type == "header" else "footers", {})

    # First, look for a section that carries explicit type information
    for section_id, section_data in sections.items():
        if "type" in section_data and section_data["type"] == header_footer_type:
            return section_data, section_id

    # If no exact match, try pattern matching on section ID
    # NOTE(review): assumes section IDs embed these substrings - confirm
    # against real documents.
    target_patterns = {
        "DEFAULT": ["default", "kix"],
        "FIRST_PAGE": ["first", "firstpage"],
        "EVEN_PAGE": ["even", "evenpage"],
        "FIRST_PAGE_ONLY": ["first", "firstpage"],  # Legacy support
    }

    # Patterns are tried in order; each one is checked against every section ID.
    for pattern in target_patterns.get(header_footer_type, []):
        for section_id, section_data in sections.items():
            if pattern.lower() in section_id.lower():
                return section_data, section_id

    # If still no match, return the first available section as fallback
    # This maintains backward compatibility
    first_entry = next(iter(sections.items()), None)
    if first_entry is not None:
        section_id, section_data = first_entry
        return section_data, section_id

    return None, None
async def _replace_section_content(
    self, document_id: str, section: dict[str, Any], new_content: str
) -> bool:
    """
    Replace the content in a header or footer section.

    Only the first paragraph of the section is rewritten: its existing text
    (all but the final character) is deleted and new_content is inserted at
    the paragraph start.

    Args:
        document_id: Document ID
        section: Section data containing content elements
        new_content: New content to insert

    Returns:
        True if the batch update succeeded; False if the section has no
        content, no paragraph, or the API call raised.
    """
    content_elements = section.get("content", [])
    if not content_elements:
        return False

    # Find the first paragraph to replace content
    first_para = self._find_first_paragraph(content_elements)
    if not first_para:
        return False

    # Calculate content range
    start_index = first_para.get("startIndex", 0)
    end_index = first_para.get("endIndex", 0)
    # Stop one short of endIndex to keep the paragraph end marker.
    delete_end = end_index - 1

    # Header/footer indices are relative to their own segment; without a
    # segmentId the API addresses the document body instead. The ID is read
    # from the section resource when present.
    # NOTE(review): relies on the section dict carrying "headerId"/"footerId"
    # as returned by documents.get - confirm.
    segment_id = section.get("headerId") or section.get("footerId")
    segment = {"segmentId": segment_id} if segment_id else {}

    # Build requests to replace content
    requests = []

    # Delete existing content if any. A paragraph holding only its end marker
    # would give an empty range, so the delete is skipped in that case.
    if delete_end > start_index:
        requests.append(
            {
                "deleteContentRange": {
                    "range": {
                        **segment,
                        "startIndex": start_index,
                        "endIndex": delete_end,
                    }
                }
            }
        )

    # Insert new content
    requests.append(
        {
            "insertText": {
                "location": {**segment, "index": start_index},
                "text": new_content,
            }
        }
    )

    try:
        await asyncio.to_thread(
            self.service.documents()
            .batchUpdate(documentId=document_id, body={"requests": requests})
            .execute
        )
        return True

    except Exception as e:
        logger.error(f"Failed to replace section content: {str(e)}")
        return False
def _find_first_paragraph(
    self, content_elements: list[dict[str, Any]]
) -> Optional[dict[str, Any]]:
    """Find the first paragraph element in content, or None if there is none."""
    # An element counts as a paragraph when it has a "paragraph" key.
    return next(
        (element for element in content_elements if "paragraph" in element),
        None,
    )
async def get_header_footer_info(self, document_id: str) -> dict[str, Any]:
    """
    Get information about all headers and footers in the document.

    Args:
        document_id: Document ID

    Returns:
        Dictionary with "headers" and "footers" (section ID mapped to its
        extracted info) plus "has_headers" / "has_footers" flags. If anything
        fails, a dictionary with a single "error" key is returned instead.
    """
    try:
        doc = await self._get_document(document_id)

        headers_info = {
            header_id: self._extract_section_info(header_data)
            for header_id, header_data in doc.get("headers", {}).items()
        }
        footers_info = {
            footer_id: self._extract_section_info(footer_data)
            for footer_id, footer_data in doc.get("footers", {}).items()
        }

        return {
            "headers": headers_info,
            "footers": footers_info,
            "has_headers": bool(headers_info),
            "has_footers": bool(footers_info),
        }

    except Exception as e:
        logger.error(f"Failed to get header/footer info: {str(e)}")
        return {"error": str(e)}
def _extract_section_info(self, section_data: dict[str, Any]) -> dict[str, Any]:
    """
    Extract useful information from a header/footer section.

    Args:
        section_data: Section data; its "content" list is inspected

    Returns:
        Dictionary with "content_preview" (first 100 characters of the
        concatenated text runs, or "(empty)"), "element_count", and the
        "start_index" / "end_index" taken from the first and last content
        elements (0 when there is no content).
    """
    content_elements = section_data.get("content", [])

    # Collect the text of every textRun inside every paragraph, in order.
    text_content = "".join(
        para_element["textRun"].get("content", "")
        for element in content_elements
        if "paragraph" in element
        for para_element in element["paragraph"].get("elements", [])
        if "textRun" in para_element
    )

    if content_elements:
        start_index = content_elements[0].get("startIndex", 0)
        end_index = content_elements[-1].get("endIndex", 0)
    else:
        start_index = 0
        end_index = 0

    return {
        "content_preview": text_content[:100] if text_content else "(empty)",
        "element_count": len(content_elements),
        "start_index": start_index,
        "end_index": end_index,
    }
async def create_header_footer(
    self, document_id: str, section_type: str, header_footer_type: str = "DEFAULT"
) -> tuple[bool, str]:
    """
    Create a new header or footer section.

    Args:
        document_id: Document ID
        section_type: "header" or "footer"
        header_footer_type: Type of header/footer ("DEFAULT", "FIRST_PAGE",
            or "EVEN_PAGE"); the legacy name "FIRST_PAGE_ONLY" is accepted
            and sent as "FIRST_PAGE"

    Returns:
        Tuple of (success, message)
    """
    if section_type not in ("header", "footer"):
        return False, "section_type must be 'header' or 'footer'"

    # Canonical names map to themselves; the legacy alias maps to its
    # canonical replacement. Unknown names fall through unchanged and are
    # rejected by the membership check below.
    supported_types = ("DEFAULT", "FIRST_PAGE", "EVEN_PAGE")
    type_aliases = {name: name for name in supported_types}
    type_aliases["FIRST_PAGE_ONLY"] = "FIRST_PAGE"  # Support legacy name
    api_type = type_aliases.get(header_footer_type, header_footer_type)
    if api_type not in supported_types:
        return (
            False,
            "header_footer_type must be 'DEFAULT', 'FIRST_PAGE', or 'EVEN_PAGE'",
        )

    # NOTE(review): confirm against the Docs API HeaderFooterType enum that
    # the service accepts values other than "DEFAULT".
    request_name = "createHeader" if section_type == "header" else "createFooter"
    body = {"requests": [{request_name: {"type": api_type}}]}

    try:
        update_call = self.service.documents().batchUpdate(
            documentId=document_id, body=body
        )
        # The client call is blocking, so run it in a worker thread.
        await asyncio.to_thread(update_call.execute)
    except Exception as e:
        error_msg = str(e)
        if "already exists" in error_msg.lower():
            return (
                False,
                f"A {section_type} of type {api_type} already exists in the document",
            )
        return False, f"Failed to create {section_type}: {error_msg}"

    return True, f"Successfully created {section_type} with type {api_type}"

View File

@@ -4,6 +4,7 @@ Table Operation Manager
This module provides high-level table operations that orchestrate
multiple Google Docs API calls for complex table manipulations.
"""
import logging
import asyncio
from typing import List, Dict, Any, Tuple
@@ -18,153 +19,160 @@ logger = logging.getLogger(__name__)
class TableOperationManager:
"""
High-level manager for Google Docs table operations.
Handles complex multi-step table operations including:
- Creating tables with data population
- Populating existing tables
- Managing cell-by-cell operations with proper index refreshing
"""
def __init__(self, service):
    """
    Initialize the table operation manager.

    Args:
        service: Google Docs API service instance. It is stored as
            ``self.service`` without any validation or network call.
    """
    self.service = service
async def create_and_populate_table(
    self,
    document_id: str,
    table_data: List[List[str]],
    index: int,
    bold_headers: bool = True,
) -> Tuple[bool, str, Dict[str, Any]]:
    """
    Creates a table and populates it with data in a reliable multi-step process.

    This method extracts the complex logic from create_table_with_data tool function.

    Args:
        document_id: ID of the document to update
        table_data: 2D list of strings for table content
        index: Position to insert the table
        bold_headers: Whether to make the first row bold

    Returns:
        Tuple of (success, message, metadata). On failure the metadata is an
        empty dict; on success it holds "rows", "columns", "populated_cells"
        and "table_index".
    """
    # Validate first: measuring table_data for the debug log before
    # validation raised TypeError on None/non-sequence input instead of
    # returning the validation error to the caller.
    is_valid, error_msg = validate_table_data(table_data)
    if not is_valid:
        return False, f"Invalid table data: {error_msg}", {}

    rows = len(table_data)
    cols = len(table_data[0])
    logger.debug(f"Creating table at index {index}, dimensions: {rows}x{cols}")

    try:
        # Step 1: Create empty table
        await self._create_empty_table(document_id, index, rows, cols)

        # Step 2: Get fresh document structure to find actual cell positions
        fresh_tables = await self._get_document_tables(document_id)
        if not fresh_tables:
            return False, "Could not find table after creation", {}

        # Step 3: Populate each cell with proper index refreshing
        population_count = await self._populate_table_cells(
            document_id, table_data, bold_headers
        )

        metadata = {
            "rows": rows,
            "columns": cols,
            "populated_cells": population_count,
            # NOTE(review): assumes the newly created table is the last
            # table in the document; confirm for insertions that land
            # before an existing table.
            "table_index": len(fresh_tables) - 1,
        }
        return (
            True,
            f"Successfully created {rows}x{cols} table and populated {population_count} cells",
            metadata,
        )
    except Exception as e:
        logger.error(f"Failed to create and populate table: {str(e)}")
        return False, f"Table creation failed: {str(e)}", {}
async def _create_empty_table(
    self, document_id: str, index: int, rows: int, cols: int
) -> None:
    """
    Insert an empty table into the document with a single batchUpdate call.

    Args:
        document_id: ID of the document to update
        index: Position passed to create_insert_table_request
        rows: Number of table rows
        cols: Number of table columns
    """
    logger.debug(f"Creating {rows}x{cols} table at index {index}")

    docs_api = self.service.documents()
    insert_request = create_insert_table_request(index, rows, cols)
    update_call = docs_api.batchUpdate(
        documentId=document_id,
        body={"requests": [insert_request]},
    )
    # The client call is blocking, so run it in a worker thread.
    await asyncio.to_thread(update_call.execute)
async def _get_document_tables(self, document_id: str) -> List[Dict[str, Any]]:
    """
    Fetch the current document and return what find_tables() extracts from it.

    Args:
        document_id: ID of the document to read

    Returns:
        The result of find_tables() for the freshly fetched document.
    """
    get_call = self.service.documents().get(documentId=document_id)
    # The client call is blocking, so run it in a worker thread.
    document = await asyncio.to_thread(get_call.execute)
    return find_tables(document)
async def _populate_table_cells(
    self, document_id: str, table_data: List[List[str]], bold_headers: bool
) -> int:
    """
    Fill the table cell by cell and count the successful insertions.

    Each non-empty cell is delegated to _populate_single_cell. Empty
    cells are skipped. A cell that fails or raises is logged and does not
    stop the remaining cells from being processed.

    Args:
        document_id: ID of the document to update
        table_data: 2D list of cell texts
        bold_headers: When true, cells of the first row are requested bold

    Returns:
        Number of cells for which _populate_single_cell reported success.
    """
    filled = 0

    for r, row in enumerate(table_data):
        logger.debug(f"Processing row {r}: {len(row)} cells")
        # Only the first row is treated as the header row.
        make_bold = bold_headers and r == 0

        for c, text in enumerate(row):
            if text:
                try:
                    # CRITICAL: the single-cell helper is called once per
                    # cell rather than batching the insertions.
                    inserted = await self._populate_single_cell(
                        document_id, r, c, text, make_bold
                    )
                    if inserted:
                        filled += 1
                        logger.debug(f"Populated cell ({r},{c})")
                    else:
                        logger.warning(f"Failed to populate cell ({r},{c})")
                except Exception as e:
                    logger.error(f"Error populating cell ({r},{c}): {str(e)}")

    return filled
async def _populate_single_cell(
self,
document_id: str,
row_idx: int,
col_idx: int,
cell_text: str,
apply_bold: bool = False
apply_bold: bool = False,
) -> bool:
"""
Populate a single cell with text, with optional bold formatting.
Returns True if successful, False otherwise.
"""
try:
@@ -172,167 +180,193 @@ class TableOperationManager:
tables = await self._get_document_tables(document_id)
if not tables:
return False
table = tables[-1] # Use the last table (newly created one)
cells = table.get('cells', [])
cells = table.get("cells", [])
# Bounds checking
if row_idx >= len(cells) or col_idx >= len(cells[row_idx]):
logger.error(f"Cell ({row_idx},{col_idx}) out of bounds")
return False
cell = cells[row_idx][col_idx]
insertion_index = cell.get('insertion_index')
insertion_index = cell.get("insertion_index")
if not insertion_index:
logger.warning(f"No insertion_index for cell ({row_idx},{col_idx})")
return False
# Insert text
await asyncio.to_thread(
self.service.documents().batchUpdate(
self.service.documents()
.batchUpdate(
documentId=document_id,
body={'requests': [{
'insertText': {
'location': {'index': insertion_index},
'text': cell_text
}
}]}
).execute
body={
"requests": [
{
"insertText": {
"location": {"index": insertion_index},
"text": cell_text,
}
}
]
},
)
.execute
)
# Apply bold formatting if requested
if apply_bold:
await self._apply_bold_formatting(
document_id, insertion_index, insertion_index + len(cell_text)
)
return True
except Exception as e:
logger.error(f"Failed to populate single cell: {str(e)}")
return False
async def _apply_bold_formatting(
    self, document_id: str, start_index: int, end_index: int
) -> None:
    """
    Make a text range bold with a single updateTextStyle request.

    Args:
        document_id: ID of the document to update
        start_index: "startIndex" of the range sent to the API
        end_index: "endIndex" of the range sent to the API
    """
    bold_request = {
        "updateTextStyle": {
            "range": {"startIndex": start_index, "endIndex": end_index},
            "textStyle": {"bold": True},
            # Field mask: only the bold attribute is touched.
            "fields": "bold",
        }
    }
    update_call = self.service.documents().batchUpdate(
        documentId=document_id,
        body={"requests": [bold_request]},
    )
    # The client call is blocking, so run it in a worker thread.
    await asyncio.to_thread(update_call.execute)
async def populate_existing_table(
    self,
    document_id: str,
    table_index: int,
    table_data: List[List[str]],
    clear_existing: bool = False,
) -> Tuple[bool, str, Dict[str, Any]]:
    """
    Populate an existing table with data.

    Args:
        document_id: ID of the document
        table_index: Index of the table to populate (0-based)
        table_data: 2D list of data to insert
        clear_existing: Whether to clear existing content first.
            Clearing is not implemented by this method: new text is
            appended to whatever the cells already contain, and a warning
            is logged when the flag is set.

    Returns:
        Tuple of (success, message, metadata). Metadata is empty on failure.
    """
    try:
        if clear_existing:
            # Make the unsupported option visible instead of ignoring it.
            logger.warning(
                "clear_existing=True is not supported; existing cell content is kept"
            )

        tables = await self._get_document_tables(document_id)
        # Reject negative indices too: tables[-1] would silently address
        # a table counted from the end of the document.
        if table_index < 0 or table_index >= len(tables):
            return (
                False,
                f"Table index {table_index} not found. Document has {len(tables)} tables",
                {},
            )

        table_info = tables[table_index]

        # Validate dimensions
        table_rows = table_info["rows"]
        table_cols = table_info["columns"]
        data_rows = len(table_data)
        # Use the widest row so ragged data cannot slip past the check and
        # have its extra cells dropped without notice.
        data_cols = max((len(row) for row in table_data), default=0)

        if data_rows > table_rows or data_cols > table_cols:
            return (
                False,
                f"Data ({data_rows}x{data_cols}) exceeds table dimensions ({table_rows}x{table_cols})",
                {},
            )

        # Populate cells
        population_count = await self._populate_existing_table_cells(
            document_id, table_index, table_data
        )

        metadata = {
            "table_index": table_index,
            "populated_cells": population_count,
            "table_dimensions": f"{table_rows}x{table_cols}",
            "data_dimensions": f"{data_rows}x{data_cols}",
        }
        return (
            True,
            f"Successfully populated {population_count} cells in existing table",
            metadata,
        )
    except Exception as e:
        logger.error(f"Failed to populate existing table: {str(e)}")
        return False, f"Failed to populate existing table: {str(e)}", {}
async def _populate_existing_table_cells(
    self, document_id: str, table_index: int, table_data: List[List[str]]
) -> int:
    """
    Populate cells in an existing table.

    Empty cell texts are skipped. For every other cell the table structure
    is fetched again and the text is inserted just before the cell's end
    index, so existing content is kept and new text is appended.

    Args:
        document_id: ID of the document
        table_index: Index of the table to populate (0-based)
        table_data: 2D list of data to insert

    Returns:
        Number of cells whose insertText request succeeded.
    """
    population_count = 0

    for row_idx, row_data in enumerate(table_data):
        for col_idx, cell_text in enumerate(row_data):
            if not cell_text:
                continue

            # Get fresh table structure for each cell
            tables = await self._get_document_tables(document_id)
            if table_index >= len(tables):
                # The table is gone: stop completely. A bare `break` only
                # left the inner loop and re-fetched the document once per
                # remaining row for nothing.
                return population_count

            table = tables[table_index]
            cells = table.get("cells", [])

            # Data outside the table's actual grid is skipped.
            if row_idx >= len(cells) or col_idx >= len(cells[row_idx]):
                continue

            cell = cells[row_idx][col_idx]

            # For existing tables, append to existing content
            cell_end = cell["end_index"] - 1  # Don't include cell end marker

            try:
                await asyncio.to_thread(
                    self.service.documents()
                    .batchUpdate(
                        documentId=document_id,
                        body={
                            "requests": [
                                {
                                    "insertText": {
                                        "location": {"index": cell_end},
                                        "text": cell_text,
                                    }
                                }
                            ]
                        },
                    )
                    .execute
                )
                population_count += 1
            except Exception as e:
                # Best effort: log and carry on with the next cell.
                logger.error(
                    f"Failed to populate existing cell ({row_idx},{col_idx}): {str(e)}"
                )

    return population_count

View File

@@ -4,6 +4,7 @@ Validation Manager
This module provides centralized validation logic for Google Docs operations,
extracting validation patterns from individual tool functions.
"""
import logging
from typing import Dict, Any, List, Tuple, Optional
@@ -15,106 +16,138 @@ logger = logging.getLogger(__name__)
class ValidationManager:
"""
Centralized validation manager for Google Docs operations.
Provides consistent validation patterns and error messages across
all document operations, reducing code duplication and improving
error message quality.
"""
def __init__(self):
    """
    Initialize the validation manager.

    Stores the dictionary returned by _setup_validation_rules() as
    ``self.validation_rules``.
    """
    self.validation_rules = self._setup_validation_rules()
def _setup_validation_rules(self) -> Dict[str, Any]:
    """
    Setup validation rules and constraints.

    Returns:
        Dictionary of limits and allowed values used by the validators.
    """
    return {
        "table_max_rows": 1000,
        "table_max_columns": 20,
        "document_id_pattern": r"^[a-zA-Z0-9-_]+$",
        "max_text_length": 1000000,  # limit counted in characters, not bytes
        "font_size_range": (1, 400),  # Google Docs font size limits
        # "FIRST_PAGE" is the name create_header_footer treats as canonical;
        # "FIRST_PAGE_ONLY" is kept because it is accepted there as a legacy
        # alias. Listing only the alias rejected the canonical name.
        "valid_header_footer_types": [
            "DEFAULT",
            "FIRST_PAGE",
            "FIRST_PAGE_ONLY",
            "EVEN_PAGE",
        ],
        "valid_section_types": ["header", "footer"],
        "valid_list_types": ["UNORDERED", "ORDERED"],
        "valid_element_types": ["table", "list", "page_break"],
    }
def validate_document_id(self, document_id: str) -> Tuple[bool, str]:
    """
    Validate Google Docs document ID format.

    Checks, in order: non-empty, string type, minimum length of 20
    characters, and the character set given by the "document_id_pattern"
    validation rule (when that rule is present).

    Args:
        document_id: Document ID to validate

    Returns:
        Tuple of (is_valid, error_message)
    """
    import re

    if not document_id:
        return False, "Document ID cannot be empty"

    if not isinstance(document_id, str):
        return (
            False,
            f"Document ID must be a string, got {type(document_id).__name__}",
        )

    # Basic length check (Google Docs IDs are typically 40+ characters)
    if len(document_id) < 20:
        return False, "Document ID appears too short to be valid"

    # Enforce the declared character-set rule; it was defined in the rules
    # but never applied, so IDs with spaces or slashes passed validation.
    # fullmatch also rejects a trailing newline that `$` alone would allow.
    pattern = self.validation_rules.get("document_id_pattern")
    if pattern and not re.fullmatch(pattern, document_id):
        return (
            False,
            "Document ID contains invalid characters; only letters, digits, '-' and '_' are allowed",
        )

    return True, ""
def validate_table_data(self, table_data: List[List[str]]) -> Tuple[bool, str]:
    """
    Check that table data is a non-empty, rectangular 2D list of strings.

    The checks run in this order and the first failure is reported:
    emptiness, outer type, row types, empty rows, equal row widths,
    row/column limits from the validation rules, and cell types.

    Args:
        table_data: 2D array of data to validate

    Returns:
        Tuple of (is_valid, detailed_error_message). On success the
        message describes the accepted table dimensions.
    """
    format_hint = "Required format: [['col1', 'col2'], ['row1col1', 'row1col2']]"

    if not table_data:
        return False, f"Table data cannot be empty. {format_hint}"

    if not isinstance(table_data, list):
        return (
            False,
            f"Table data must be a list, got {type(table_data).__name__}. {format_hint}",
        )

    # Every row must itself be a list.
    bad_rows = [i for i, row in enumerate(table_data) if not isinstance(row, list)]
    if bad_rows:
        return (
            False,
            f"All rows must be lists. Rows {bad_rows} are not lists. {format_hint}",
        )

    # No row may be empty.
    blank_rows = [i for i, row in enumerate(table_data) if len(row) == 0]
    if blank_rows:
        return (
            False,
            f"Rows cannot be empty. Empty rows found at indices: {blank_rows}",
        )

    # All rows must have the same width.
    widths = [len(row) for row in table_data]
    if len(set(widths)) > 1:
        return (
            False,
            f"All rows must have the same number of columns. Found column counts: {widths}. Fix your data structure.",
        )

    rows = len(table_data)
    cols = widths[0]

    # Dimension limits come from the configured validation rules.
    max_rows = self.validation_rules["table_max_rows"]
    if rows > max_rows:
        return False, f"Too many rows ({rows}). Maximum allowed: {max_rows}"

    max_cols = self.validation_rules["table_max_columns"]
    if cols > max_cols:
        return False, f"Too many columns ({cols}). Maximum allowed: {max_cols}"

    # Every cell must be a string; None gets its own, more helpful message.
    for r, row in enumerate(table_data):
        for c, cell in enumerate(row):
            if cell is None:
                return (
                    False,
                    f"Cell ({r},{c}) is None. All cells must be strings, use empty string '' for empty cells.",
                )
            if not isinstance(cell, str):
                return (
                    False,
                    f"Cell ({r},{c}) is {type(cell).__name__}, not string. All cells must be strings. Value: {repr(cell)}",
                )

    return True, f"Valid table data: {rows}×{cols} table format"
def validate_text_formatting_params(
self,
bold: Optional[bool] = None,
@@ -123,11 +156,11 @@ class ValidationManager:
font_size: Optional[int] = None,
font_family: Optional[str] = None,
text_color: Optional[Any] = None,
background_color: Optional[Any] = None
background_color: Optional[Any] = None,
) -> Tuple[bool, str]:
"""
Validate text formatting parameters.
Args:
bold: Bold setting
italic: Italic setting
@@ -136,36 +169,61 @@ class ValidationManager:
font_family: Font family name
text_color: Text color in hex or RGB tuple/list
background_color: Background color in hex or RGB tuple/list
Returns:
Tuple of (is_valid, error_message)
"""
# Check if at least one formatting option is provided
formatting_params = [
bold, italic, underline, font_size, font_family, text_color, background_color
bold,
italic,
underline,
font_size,
font_family,
text_color,
background_color,
]
if all(param is None for param in formatting_params):
return False, "At least one formatting parameter must be provided (bold, italic, underline, font_size, font_family, text_color, or background_color)"
return (
False,
"At least one formatting parameter must be provided (bold, italic, underline, font_size, font_family, text_color, or background_color)",
)
# Validate boolean parameters
for param, name in [(bold, 'bold'), (italic, 'italic'), (underline, 'underline')]:
for param, name in [
(bold, "bold"),
(italic, "italic"),
(underline, "underline"),
]:
if param is not None and not isinstance(param, bool):
return False, f"{name} parameter must be boolean (True/False), got {type(param).__name__}"
return (
False,
f"{name} parameter must be boolean (True/False), got {type(param).__name__}",
)
# Validate font size
if font_size is not None:
if not isinstance(font_size, int):
return False, f"font_size must be an integer, got {type(font_size).__name__}"
min_size, max_size = self.validation_rules['font_size_range']
return (
False,
f"font_size must be an integer, got {type(font_size).__name__}",
)
min_size, max_size = self.validation_rules["font_size_range"]
if not (min_size <= font_size <= max_size):
return False, f"font_size must be between {min_size} and {max_size} points, got {font_size}"
return (
False,
f"font_size must be between {min_size} and {max_size} points, got {font_size}",
)
# Validate font family
if font_family is not None:
if not isinstance(font_family, str):
return False, f"font_family must be a string, got {type(font_family).__name__}"
return (
False,
f"font_family must be a string, got {type(font_family).__name__}",
)
if not font_family.strip():
return False, "font_family cannot be empty"
@@ -174,10 +232,12 @@ class ValidationManager:
if not is_valid:
return False, error_msg
is_valid, error_msg = self.validate_color_param(background_color, "background_color")
is_valid, error_msg = self.validate_color_param(
background_color, "background_color"
)
if not is_valid:
return False, error_msg
return True, ""
def validate_color_param(self, color: Any, param_name: str) -> Tuple[bool, str]:
@@ -188,9 +248,14 @@ class ValidationManager:
return True, ""
if isinstance(color, str):
hex_color = color.lstrip('#')
if len(hex_color) != 6 or any(c not in "0123456789abcdefABCDEF" for c in hex_color):
return False, f"{param_name} must be a hex string like '#RRGGBB' or 'RRGGBB'"
hex_color = color.lstrip("#")
if len(hex_color) != 6 or any(
c not in "0123456789abcdefABCDEF" for c in hex_color
):
return (
False,
f"{param_name} must be a hex string like '#RRGGBB' or 'RRGGBB'",
)
return True, ""
if isinstance(color, (list, tuple)):
@@ -204,252 +269,305 @@ class ValidationManager:
if isinstance(component, int):
if component < 0 or component > 255:
return False, f"{comp_name} integer values must be between 0 and 255"
return (
False,
f"{comp_name} integer values must be between 0 and 255",
)
elif isinstance(component, float):
if component < 0 or component > 1:
return False, f"{comp_name} float values must be between 0 and 1"
return (
False,
f"{comp_name} float values must be between 0 and 1",
)
else:
return False, f"{comp_name} must be an int (0-255) or float (0-1), got {type(component).__name__}"
return (
False,
f"{comp_name} must be an int (0-255) or float (0-1), got {type(component).__name__}",
)
return True, ""
return False, f"{param_name} must be a hex string or RGB tuple/list like [255, 0, 0] or [1, 0, 0]"
return (
False,
f"{param_name} must be a hex string or RGB tuple/list like [255, 0, 0] or [1, 0, 0]",
)
def validate_index(self, index: int, context: str = "Index") -> Tuple[bool, str]:
    """
    Validate a single document index.

    Args:
        index: Index to validate; must be a non-negative integer
            (booleans are rejected)
        context: Context description for error messages

    Returns:
        Tuple of (is_valid, error_message)
    """
    # bool is a subclass of int, so True/False would otherwise be accepted
    # as the indices 1/0.
    if isinstance(index, bool) or not isinstance(index, int):
        return False, f"{context} must be an integer, got {type(index).__name__}"

    if index < 0:
        return (
            False,
            f"{context} {index} is negative. You MUST call inspect_doc_structure first to get the proper insertion index.",
        )

    return True, ""
def validate_index_range(
    self,
    start_index: int,
    end_index: Optional[int] = None,
    document_length: Optional[int] = None,
) -> Tuple[bool, str]:
    """
    Validate document index ranges.

    Args:
        start_index: Starting index; non-negative integer (booleans rejected)
        end_index: Ending index (optional); when given it must be an
            integer strictly greater than start_index
        document_length: Total document length for bounds checking
            (optional); start_index must be below it and end_index must
            not exceed it

    Returns:
        Tuple of (is_valid, error_message)
    """
    # bool is a subclass of int; reject it explicitly so True/False are
    # not silently treated as the indices 1/0.
    if isinstance(start_index, bool) or not isinstance(start_index, int):
        return (
            False,
            f"start_index must be an integer, got {type(start_index).__name__}",
        )

    if start_index < 0:
        return False, f"start_index cannot be negative, got {start_index}"

    # Validate end_index if provided
    if end_index is not None:
        if isinstance(end_index, bool) or not isinstance(end_index, int):
            return (
                False,
                f"end_index must be an integer, got {type(end_index).__name__}",
            )

        if end_index <= start_index:
            return (
                False,
                f"end_index ({end_index}) must be greater than start_index ({start_index})",
            )

    # Validate against document length if provided
    if document_length is not None:
        if start_index >= document_length:
            return (
                False,
                f"start_index ({start_index}) exceeds document length ({document_length})",
            )

        if end_index is not None and end_index > document_length:
            return (
                False,
                f"end_index ({end_index}) exceeds document length ({document_length})",
            )

    return True, ""
def validate_element_insertion_params(
    self, element_type: str, index: int, **kwargs
) -> Tuple[bool, str]:
    """
    Validate parameters for element insertion.

    Args:
        element_type: Type of element to insert; must be listed in the
            "valid_element_types" rule
        index: Insertion index; non-negative integer
        **kwargs: Additional parameters specific to element type:
            "rows" and "columns" for tables, "list_type" for lists

    Returns:
        Tuple of (is_valid, error_message)
    """
    # Validate element type
    if element_type not in self.validation_rules["valid_element_types"]:
        valid_types = ", ".join(self.validation_rules["valid_element_types"])
        return (
            False,
            f"Invalid element_type '{element_type}'. Must be one of: {valid_types}",
        )

    # Validate index
    if not isinstance(index, int) or index < 0:
        return False, f"index must be a non-negative integer, got {index}"

    # Validate element-specific parameters
    if element_type == "table":
        rows = kwargs.get("rows")
        columns = kwargs.get("columns")

        # Test for absence explicitly: a truthiness test reported rows=0
        # as a missing parameter and kept the positive-integer check
        # below from ever seeing a zero.
        if rows is None or columns is None:
            return False, "Table insertion requires 'rows' and 'columns' parameters"

        if not isinstance(rows, int) or not isinstance(columns, int):
            return False, "Table rows and columns must be integers"

        if rows <= 0 or columns <= 0:
            return False, "Table rows and columns must be positive integers"

        if rows > self.validation_rules["table_max_rows"]:
            return (
                False,
                f"Too many rows ({rows}). Maximum: {self.validation_rules['table_max_rows']}",
            )

        if columns > self.validation_rules["table_max_columns"]:
            return (
                False,
                f"Too many columns ({columns}). Maximum: {self.validation_rules['table_max_columns']}",
            )

    elif element_type == "list":
        list_type = kwargs.get("list_type")

        if not list_type:
            return False, "List insertion requires 'list_type' parameter"

        if list_type not in self.validation_rules["valid_list_types"]:
            valid_types = ", ".join(self.validation_rules["valid_list_types"])
            return (
                False,
                f"Invalid list_type '{list_type}'. Must be one of: {valid_types}",
            )

    return True, ""
def validate_header_footer_params(
    self, section_type: str, header_footer_type: str = "DEFAULT"
) -> Tuple[bool, str]:
    """
    Check header/footer parameters against the configured allowed values.

    Args:
        section_type: Type of section; must appear in the
            "valid_section_types" rule
        header_footer_type: Specific header/footer type; must appear in
            the "valid_header_footer_types" rule

    Returns:
        Tuple of (is_valid, error_message)
    """
    # (parameter label, supplied value, rule holding the allowed values),
    # checked in this order so section_type errors are reported first.
    checks = (
        ("section_type", section_type, "valid_section_types"),
        ("header_footer_type", header_footer_type, "valid_header_footer_types"),
    )
    for label, value, rule_key in checks:
        allowed = self.validation_rules[rule_key]
        if value not in allowed:
            valid_types = ", ".join(allowed)
            return False, f"{label} must be one of: {valid_types}, got '{value}'"

    return True, ""
def validate_batch_operations(
    self, operations: List[Dict[str, Any]]
) -> Tuple[bool, str]:
    """
    Validate a list of batch operations and report the first problem found.

    Each operation must be a dictionary with a "type" key and must pass
    validate_operation(). Operations of type "format_text" are additionally
    checked with validate_text_formatting_params() and
    validate_index_range(). Error messages number operations from 1.

    Args:
        operations: List of operation dictionaries

    Returns:
        Tuple of (is_valid, error_message)
    """
    if not operations:
        return False, "Operations list cannot be empty"

    if not isinstance(operations, list):
        return False, f"Operations must be a list, got {type(operations).__name__}"

    # Formatting keys, in the positional order expected by
    # validate_text_formatting_params.
    formatting_fields = (
        "bold",
        "italic",
        "underline",
        "font_size",
        "font_family",
        "text_color",
        "background_color",
    )

    for position, op in enumerate(operations, start=1):
        if not isinstance(op, dict):
            return (
                False,
                f"Operation {position} must be a dictionary, got {type(op).__name__}",
            )

        if "type" not in op:
            return False, f"Operation {position} missing required 'type' field"

        # Validate required fields for the operation type
        is_valid, error_msg = validate_operation(op)
        if not is_valid:
            return False, f"Operation {position}: {error_msg}"

        if op["type"] != "format_text":
            continue

        formatting_values = [op.get(field) for field in formatting_fields]
        is_valid, error_msg = self.validate_text_formatting_params(*formatting_values)
        if not is_valid:
            return False, f"Operation {position} (format_text): {error_msg}"

        is_valid, error_msg = self.validate_index_range(
            op["start_index"], op["end_index"]
        )
        if not is_valid:
            return False, f"Operation {position} (format_text): {error_msg}"

    return True, ""
def validate_text_content(
    self, text: str, max_length: Optional[int] = None
) -> Tuple[bool, str]:
    """
    Validate text content for insertion.

    Args:
        text: Text to validate
        max_length: Maximum allowed length in characters. When None, the
            "max_text_length" validation rule is used. An explicit 0 is
            honoured and allows only the empty string.

    Returns:
        Tuple of (is_valid, error_message)
    """
    if not isinstance(text, str):
        return False, f"Text must be a string, got {type(text).__name__}"

    # Compare against None rather than using `or`: `max_length or default`
    # discarded an explicit limit of 0 and fell back to the default.
    if max_length is None:
        max_len = self.validation_rules["max_text_length"]
    else:
        max_len = max_length

    if len(text) > max_len:
        return False, f"Text too long ({len(text)} characters). Maximum: {max_len}"

    return True, ""
def get_validation_summary(self) -> Dict[str, Any]:
    """
    Get a summary of all validation rules and constraints.

    Returns:
        Dictionary with three keys: "constraints" (an independent copy of
        the validation rules), "supported_operations" and "data_formats".
    """
    import copy

    return {
        # Deep copy: the rules contain lists, and a shallow copy would
        # share them, letting a caller that edits the summary change the
        # rules the validators actually use.
        "constraints": copy.deepcopy(self.validation_rules),
        "supported_operations": {
            "table_operations": ["create_table", "populate_table"],
            "text_operations": ["insert_text", "format_text", "find_replace"],
            "element_operations": [
                "insert_table",
                "insert_list",
                "insert_page_break",
            ],
            "header_footer_operations": ["update_header", "update_footer"],
        },
        "data_formats": {
            "table_data": "2D list of strings: [['col1', 'col2'], ['row1col1', 'row1col2']]",
            "text_formatting": "Optional boolean/integer parameters for styling",
            "document_indices": "Non-negative integers for position specification",
        },
    }