feat: initial commit from workspace-mcp
Some checks failed
Check Maintainer Edits Enabled / check-maintainer-edits (pull_request) Has been cancelled
Check Maintainer Edits Enabled / check-maintainer-edits-internal (pull_request) Has been cancelled
Docker Build and Push to GHCR / build-and-push (pull_request) Has been cancelled
Ruff / ruff (pull_request) Has been cancelled
Some checks failed
Check Maintainer Edits Enabled / check-maintainer-edits (pull_request) Has been cancelled
Check Maintainer Edits Enabled / check-maintainer-edits-internal (pull_request) Has been cancelled
Docker Build and Push to GHCR / build-and-push (pull_request) Has been cancelled
Ruff / ruff (pull_request) Has been cancelled
This commit is contained in:
0
gdocs/__init__.py
Normal file
0
gdocs/__init__.py
Normal file
720
gdocs/docs_helpers.py
Normal file
720
gdocs/docs_helpers.py
Normal file
@@ -0,0 +1,720 @@
|
||||
"""
|
||||
Google Docs Helper Functions
|
||||
|
||||
This module provides utility functions for common Google Docs operations
|
||||
to simplify the implementation of document editing tools.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _normalize_color(
|
||||
color: Optional[str], param_name: str
|
||||
) -> Optional[Dict[str, float]]:
|
||||
"""
|
||||
Normalize a user-supplied color into Docs API rgbColor format.
|
||||
|
||||
Supports only hex strings in the form "#RRGGBB".
|
||||
"""
|
||||
if color is None:
|
||||
return None
|
||||
|
||||
if not isinstance(color, str):
|
||||
raise ValueError(f"{param_name} must be a hex string like '#RRGGBB'")
|
||||
|
||||
if len(color) != 7 or not color.startswith("#"):
|
||||
raise ValueError(f"{param_name} must be a hex string like '#RRGGBB'")
|
||||
|
||||
hex_color = color[1:]
|
||||
if any(c not in "0123456789abcdefABCDEF" for c in hex_color):
|
||||
raise ValueError(f"{param_name} must be a hex string like '#RRGGBB'")
|
||||
|
||||
r = int(hex_color[0:2], 16) / 255
|
||||
g = int(hex_color[2:4], 16) / 255
|
||||
b = int(hex_color[4:6], 16) / 255
|
||||
return {"red": r, "green": g, "blue": b}
|
||||
|
||||
|
||||
def build_text_style(
    bold: bool = None,
    italic: bool = None,
    underline: bool = None,
    font_size: int = None,
    font_family: str = None,
    text_color: str = None,
    background_color: str = None,
    link_url: str = None,
) -> tuple[Dict[str, Any], list[str]]:
    """
    Assemble a Docs API textStyle object plus the matching field names.

    Only arguments that are not None contribute to the result, so callers
    can update a subset of style properties.

    Args:
        bold: Whether text should be bold
        italic: Whether text should be italic
        underline: Whether text should be underlined
        font_size: Font size in points
        font_family: Font family name
        text_color: Text color as hex string "#RRGGBB"
        background_color: Background (highlight) color as hex string "#RRGGBB"
        link_url: Hyperlink URL (stored as given; not validated here)

    Returns:
        Tuple of (text_style_dict, list_of_field_names), both in the order
        bold, italic, underline, fontSize, weightedFontFamily,
        foregroundColor, backgroundColor, link.

    Raises:
        ValueError: If text_color or background_color is not "#RRGGBB".
    """
    style: Dict[str, Any] = {}
    mask: list[str] = []

    def _record(field_name, value):
        # Keep the style dict and the field list in lockstep.
        style[field_name] = value
        mask.append(field_name)

    for flag_name, flag_value in (
        ("bold", bold),
        ("italic", italic),
        ("underline", underline),
    ):
        if flag_value is not None:
            _record(flag_name, flag_value)

    if font_size is not None:
        _record("fontSize", {"magnitude": font_size, "unit": "PT"})

    if font_family is not None:
        _record("weightedFontFamily", {"fontFamily": font_family})

    for field_name, raw_color, param_name in (
        ("foregroundColor", text_color, "text_color"),
        ("backgroundColor", background_color, "background_color"),
    ):
        if raw_color is not None:
            rgb = _normalize_color(raw_color, param_name)
            _record(field_name, {"color": {"rgbColor": rgb}})

    if link_url is not None:
        _record("link", {"url": link_url})

    return style, mask
|
||||
|
||||
|
||||
def build_paragraph_style(
    heading_level: int = None,
    alignment: str = None,
    line_spacing: float = None,
    indent_first_line: float = None,
    indent_start: float = None,
    indent_end: float = None,
    space_above: float = None,
    space_below: float = None,
    named_style_type: str = None,
) -> tuple[Dict[str, Any], list[str]]:
    """
    Assemble a Docs API paragraphStyle object plus the matching field names.

    Only arguments that are not None contribute to the result.

    Args:
        heading_level: Heading level 0-6 (0 = NORMAL_TEXT, 1-6 = HEADING_N)
        alignment: Text alignment - 'START', 'CENTER', 'END', or 'JUSTIFIED'
            (case-insensitive; stored upper-cased)
        line_spacing: Line spacing multiplier (1.0 = single, 2.0 = double);
            stored multiplied by 100
        indent_first_line: First line indent in points
        indent_start: Left/start indent in points
        indent_end: Right/end indent in points
        space_above: Space above paragraph in points
        space_below: Space below paragraph in points
        named_style_type: Direct named style (TITLE, SUBTITLE, HEADING_1..6,
            NORMAL_TEXT). Takes precedence over heading_level when both are
            provided.

    Returns:
        Tuple of (paragraph_style_dict, list_of_field_names)

    Raises:
        ValueError: If named_style_type, heading_level, alignment or
            line_spacing is outside its accepted values.
    """
    style: Dict[str, Any] = {}
    mask: list[str] = []

    # Named style: an explicit name wins; heading_level is only a fallback.
    if named_style_type is not None:
        valid_styles = ["NORMAL_TEXT", "TITLE", "SUBTITLE"] + [
            f"HEADING_{level}" for level in range(1, 7)
        ]
        if named_style_type not in valid_styles:
            raise ValueError(
                f"Invalid named_style_type '{named_style_type}'. "
                f"Must be one of: {', '.join(valid_styles)}"
            )
        style["namedStyleType"] = named_style_type
        mask.append("namedStyleType")
    elif heading_level is not None:
        if heading_level < 0 or heading_level > 6:
            raise ValueError("heading_level must be between 0 (normal text) and 6")
        style["namedStyleType"] = (
            "NORMAL_TEXT" if heading_level == 0 else f"HEADING_{heading_level}"
        )
        mask.append("namedStyleType")

    if alignment is not None:
        valid_alignments = ["START", "CENTER", "END", "JUSTIFIED"]
        normalized = alignment.upper()
        if normalized not in valid_alignments:
            raise ValueError(
                f"Invalid alignment '{alignment}'. Must be one of: {valid_alignments}"
            )
        style["alignment"] = normalized
        mask.append("alignment")

    if line_spacing is not None:
        if line_spacing <= 0:
            raise ValueError("line_spacing must be positive")
        # The multiplier is stored as a percentage (1.0 -> 100).
        style["lineSpacing"] = line_spacing * 100
        mask.append("lineSpacing")

    # The remaining properties all share the {"magnitude", "unit": "PT"} shape.
    for field_name, points in (
        ("indentFirstLine", indent_first_line),
        ("indentStart", indent_start),
        ("indentEnd", indent_end),
        ("spaceAbove", space_above),
        ("spaceBelow", space_below),
    ):
        if points is not None:
            style[field_name] = {"magnitude": points, "unit": "PT"}
            mask.append(field_name)

    return style, mask
|
||||
|
||||
|
||||
def create_insert_text_request(
    index: int, text: str, tab_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Build an insertText request for the Google Docs API.

    Args:
        index: Position to insert text
        text: Text to insert
        tab_id: Optional ID of the tab to target; omitted from the location
            when falsy

    Returns:
        Dictionary representing the insertText request
    """
    target = {"index": index, "tabId": tab_id} if tab_id else {"index": index}
    return {"insertText": {"location": target, "text": text}}
|
||||
|
||||
|
||||
def create_insert_text_segment_request(
    index: int, text: str, segment_id: str, tab_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Build an insertText request whose location carries a segmentId.

    Args:
        index: Position to insert text
        text: Text to insert
        segment_id: Segment ID (for targeting headers/footers)
        tab_id: Optional ID of the tab to target; omitted when falsy

    Returns:
        Dictionary representing the insertText request with segmentId and
        optional tabId
    """
    target = {"segmentId": segment_id, "index": index}
    if tab_id:
        target["tabId"] = tab_id

    payload = {"location": target, "text": text}
    return {"insertText": payload}
|
||||
|
||||
|
||||
def create_delete_range_request(
    start_index: int, end_index: int, tab_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Build a deleteContentRange request for the Google Docs API.

    Args:
        start_index: Start position of content to delete
        end_index: End position of content to delete
        tab_id: Optional ID of the tab to target; omitted when falsy

    Returns:
        Dictionary representing the deleteContentRange request
    """
    span = {"startIndex": start_index, "endIndex": end_index}
    if tab_id:
        span["tabId"] = tab_id
    return {"deleteContentRange": {"range": span}}
|
||||
|
||||
|
||||
def create_format_text_request(
    start_index: int,
    end_index: int,
    bold: bool = None,
    italic: bool = None,
    underline: bool = None,
    font_size: int = None,
    font_family: str = None,
    text_color: str = None,
    background_color: str = None,
    link_url: str = None,
    tab_id: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """
    Build an updateTextStyle request for the Google Docs API.

    The style object and field mask are produced by build_text_style; this
    function only attaches the target range.

    Args:
        start_index: Start position of text to format
        end_index: End position of text to format
        bold: Whether text should be bold
        italic: Whether text should be italic
        underline: Whether text should be underlined
        font_size: Font size in points
        font_family: Font family name
        text_color: Text color as hex string "#RRGGBB"
        background_color: Background (highlight) color as hex string "#RRGGBB"
        link_url: Hyperlink URL
        tab_id: Optional ID of the tab to target; omitted when falsy

    Returns:
        Dictionary representing the updateTextStyle request, or None if no
        styles provided
    """
    style, mask = build_text_style(
        bold=bold,
        italic=italic,
        underline=underline,
        font_size=font_size,
        font_family=font_family,
        text_color=text_color,
        background_color=background_color,
        link_url=link_url,
    )

    # Nothing to change: signal the caller to skip this request.
    if not style:
        return None

    span = {"startIndex": start_index, "endIndex": end_index}
    if tab_id:
        span["tabId"] = tab_id

    update = {"range": span, "textStyle": style, "fields": ",".join(mask)}
    return {"updateTextStyle": update}
|
||||
|
||||
|
||||
def create_update_paragraph_style_request(
    start_index: int,
    end_index: int,
    heading_level: int = None,
    alignment: str = None,
    line_spacing: float = None,
    indent_first_line: float = None,
    indent_start: float = None,
    indent_end: float = None,
    space_above: float = None,
    space_below: float = None,
    tab_id: Optional[str] = None,
    named_style_type: str = None,
) -> Optional[Dict[str, Any]]:
    """
    Build an updateParagraphStyle request for the Google Docs API.

    The style object and field mask are produced by build_paragraph_style;
    this function only attaches the target range.

    Args:
        start_index: Start position of paragraph range
        end_index: End position of paragraph range
        heading_level: Heading level 0-6 (0 = NORMAL_TEXT, 1-6 = HEADING_N)
        alignment: Text alignment - 'START', 'CENTER', 'END', or 'JUSTIFIED'
        line_spacing: Line spacing multiplier (1.0 = single, 2.0 = double)
        indent_first_line: First line indent in points
        indent_start: Left/start indent in points
        indent_end: Right/end indent in points
        space_above: Space above paragraph in points
        space_below: Space below paragraph in points
        tab_id: Optional ID of the tab to target; omitted when falsy
        named_style_type: Direct named style (TITLE, SUBTITLE, HEADING_1..6,
            NORMAL_TEXT)

    Returns:
        Dictionary representing the updateParagraphStyle request, or None if
        no styles provided
    """
    style, mask = build_paragraph_style(
        heading_level=heading_level,
        alignment=alignment,
        line_spacing=line_spacing,
        indent_first_line=indent_first_line,
        indent_start=indent_start,
        indent_end=indent_end,
        space_above=space_above,
        space_below=space_below,
        named_style_type=named_style_type,
    )

    # Nothing to change: signal the caller to skip this request.
    if not style:
        return None

    span = {"startIndex": start_index, "endIndex": end_index}
    if tab_id:
        span["tabId"] = tab_id

    update = {"range": span, "paragraphStyle": style, "fields": ",".join(mask)}
    return {"updateParagraphStyle": update}
|
||||
|
||||
|
||||
def create_find_replace_request(
    find_text: str,
    replace_text: str,
    match_case: bool = False,
    tab_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build a replaceAllText request for the Google Docs API.

    Args:
        find_text: Text to find
        replace_text: Text to replace with
        match_case: Whether to match case exactly
        tab_id: Optional ID of the tab to target; when given, the request
            carries a tabsCriteria entry listing only that tab

    Returns:
        Dictionary representing the replaceAllText request
    """
    replace_all = {
        "containsText": {"text": find_text, "matchCase": match_case},
        "replaceText": replace_text,
    }
    if tab_id:
        replace_all["tabsCriteria"] = {"tabIds": [tab_id]}
    return {"replaceAllText": replace_all}
|
||||
|
||||
|
||||
def create_insert_table_request(
    index: int, rows: int, columns: int, tab_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Build an insertTable request for the Google Docs API.

    Args:
        index: Position to insert table
        rows: Number of rows
        columns: Number of columns
        tab_id: Optional ID of the tab to target; omitted when falsy

    Returns:
        Dictionary representing the insertTable request
    """
    target = {"index": index, "tabId": tab_id} if tab_id else {"index": index}
    table_spec = {"location": target, "rows": rows, "columns": columns}
    return {"insertTable": table_spec}
|
||||
|
||||
|
||||
def create_insert_page_break_request(
    index: int, tab_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Build an insertPageBreak request for the Google Docs API.

    Args:
        index: Position to insert page break
        tab_id: Optional ID of the tab to target; omitted when falsy

    Returns:
        Dictionary representing the insertPageBreak request
    """
    target = {"index": index, "tabId": tab_id} if tab_id else {"index": index}
    return {"insertPageBreak": {"location": target}}
|
||||
|
||||
|
||||
def create_insert_doc_tab_request(
    title: str, index: int, parent_tab_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Build an addDocumentTab request for the Google Docs API.

    Args:
        title: Title of the new tab
        index: Position to insert the tab
        parent_tab_id: Optional ID of the parent tab to nest under; omitted
            from the tab properties when falsy

    Returns:
        Dictionary representing the addDocumentTab request
    """
    properties: Dict[str, Any] = {"title": title, "index": index}
    if parent_tab_id:
        properties["parentTabId"] = parent_tab_id
    return {"addDocumentTab": {"tabProperties": properties}}
|
||||
|
||||
|
||||
def create_delete_doc_tab_request(tab_id: str) -> Dict[str, Any]:
    """
    Build a request that deletes a document tab.

    The request is keyed "deleteTab" and carries only the tab's ID.

    Args:
        tab_id: ID of the tab to delete

    Returns:
        Dictionary representing the deleteTab request
    """
    target = {"tabId": tab_id}
    return {"deleteTab": target}
|
||||
|
||||
|
||||
def create_update_doc_tab_request(tab_id: str, title: str) -> Dict[str, Any]:
    """
    Build an updateDocumentTabProperties request that renames a tab.

    The field mask is fixed to "title", so only the title is updated.

    Args:
        tab_id: ID of the tab to update
        title: New title for the tab

    Returns:
        Dictionary representing the updateDocumentTabProperties request
    """
    properties = {"tabId": tab_id, "title": title}
    update = {"tabProperties": properties, "fields": "title"}
    return {"updateDocumentTabProperties": update}
|
||||
|
||||
|
||||
def create_insert_image_request(
    index: int,
    image_uri: str,
    width: int = None,
    height: int = None,
    tab_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build an insertInlineImage request for the Google Docs API.

    Args:
        index: Position to insert image
        image_uri: URI of the image (Drive URL or public URL)
        width: Image width in points
        height: Image height in points
        tab_id: Optional ID of the tab to target; omitted when falsy

    Returns:
        Dictionary representing the insertInlineImage request. An
        "objectSize" entry is present only when width and/or height is given.
    """
    target = {"index": index, "tabId": tab_id} if tab_id else {"index": index}
    payload = {"location": target, "uri": image_uri}

    # Only the dimensions the caller actually supplied are sent.
    dimensions = {
        axis: {"magnitude": points, "unit": "PT"}
        for axis, points in (("width", width), ("height", height))
        if points is not None
    }
    if dimensions:
        payload["objectSize"] = dimensions

    return {"insertInlineImage": payload}
|
||||
|
||||
|
||||
def create_bullet_list_request(
    start_index: int,
    end_index: int,
    list_type: str = "UNORDERED",
    nesting_level: int = None,
    paragraph_start_indices: Optional[list[int]] = None,
    doc_tab_id: Optional[str] = None,
) -> list[Dict[str, Any]]:
    """
    Create requests to apply bullet list formatting with optional nesting.

    Google Docs infers list nesting from leading tab characters. To set a nested
    level, this helper inserts literal tab characters before each targeted
    paragraph, then calls createParagraphBullets. This is a Docs API workaround
    and does temporarily mutate content/index positions while the batch executes.

    Args:
        start_index: Start of text range to convert to list
        end_index: End of text range to convert to list
        list_type: "UNORDERED" selects the disc/circle/square preset; any
            other value selects the numbered decimal/alpha/roman preset
        nesting_level: Nesting level (0-8, where 0 is top level). If None or 0, no tabs added.
        paragraph_start_indices: Optional paragraph start positions for ranges with
            multiple paragraphs. If omitted, only start_index is tab-prefixed.
            Duplicates are ignored and positions are processed in ascending order.
        doc_tab_id: Optional ID of the tab to target

    Returns:
        List of request dictionaries (insertText for nesting tabs if needed,
        then createParagraphBullets)

    Raises:
        ValueError: If nesting_level is not an integer in 0-8, or if any
            paragraph start position is not an integer.
    """
    bullet_preset = (
        "BULLET_DISC_CIRCLE_SQUARE"
        if list_type == "UNORDERED"
        else "NUMBERED_DECIMAL_ALPHA_ROMAN"
    )

    # Validate nesting level
    if nesting_level is not None:
        if not isinstance(nesting_level, int):
            raise ValueError("nesting_level must be an integer between 0 and 8")
        if nesting_level < 0 or nesting_level > 8:
            raise ValueError("nesting_level must be between 0 and 8")

    requests = []

    # Insert tabs for nesting if needed (nesting_level > 0).
    # For multi-paragraph ranges, callers should provide paragraph_start_indices.
    if nesting_level:
        tabs = "\t" * nesting_level
        raw_starts = paragraph_start_indices or [start_index]

        # Validate BEFORE sorting/deduplicating: sorted() on mixed types and
        # set() on unhashable entries would otherwise raise TypeError and hide
        # the intended ValueError from callers.
        if any(not isinstance(idx, int) for idx in raw_starts):
            raise ValueError("paragraph_start_indices must contain only integers")

        paragraph_starts = sorted(set(raw_starts))

        # Each earlier insertion shifts every later position by nesting_level
        # characters, so the k-th insertion lands k * nesting_level further on.
        for position, paragraph_start in enumerate(paragraph_starts):
            requests.append(
                create_insert_text_request(
                    paragraph_start + position * nesting_level, tabs, doc_tab_id
                )
            )

        # Keep createParagraphBullets range aligned to the same logical content:
        # shift each bound by the tabs inserted strictly before it. Both counts
        # are taken against the original bounds before either is updated.
        tabs_before_start = sum(1 for idx in paragraph_starts if idx < start_index)
        tabs_before_end = sum(1 for idx in paragraph_starts if idx < end_index)
        start_index += tabs_before_start * nesting_level
        end_index += tabs_before_end * nesting_level

    # Create the bullet list
    range_obj = {"startIndex": start_index, "endIndex": end_index}
    if doc_tab_id:
        range_obj["tabId"] = doc_tab_id

    requests.append(
        {
            "createParagraphBullets": {
                "range": range_obj,
                "bulletPreset": bullet_preset,
            }
        }
    )

    return requests
|
||||
|
||||
|
||||
def create_delete_bullet_list_request(
    start_index: int,
    end_index: int,
    doc_tab_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build a deleteParagraphBullets request to remove bullet/list formatting.

    Args:
        start_index: Start of the paragraph range
        end_index: End of the paragraph range
        doc_tab_id: Optional ID of the tab to target; omitted when falsy

    Returns:
        Dictionary representing the deleteParagraphBullets request
    """
    span = {"startIndex": start_index, "endIndex": end_index}
    if doc_tab_id:
        span["tabId"] = doc_tab_id
    return {"deleteParagraphBullets": {"range": span}}
|
||||
|
||||
|
||||
def validate_operation(operation: Dict[str, Any]) -> tuple[bool, str]:
    """
    Check that a batch operation names a supported type and its required keys.

    Only key presence is checked; values are not inspected.

    Args:
        operation: Operation dictionary to validate

    Returns:
        Tuple of (is_valid, error_message); error_message is "" when valid
    """
    op_type = operation.get("type")
    if not op_type:
        return False, "Missing 'type' field"

    # Required keys per operation type.
    required_by_type = {
        "insert_text": ["index", "text"],
        "delete_text": ["start_index", "end_index"],
        "replace_text": ["start_index", "end_index", "text"],
        "format_text": ["start_index", "end_index"],
        "update_paragraph_style": ["start_index", "end_index"],
        "insert_table": ["index", "rows", "columns"],
        "insert_page_break": ["index"],
        "find_replace": ["find_text", "replace_text"],
        "create_bullet_list": ["start_index", "end_index"],
        "insert_doc_tab": ["title", "index"],
        "delete_doc_tab": ["tab_id"],
        "update_doc_tab": ["tab_id", "title"],
    }

    expected_keys = required_by_type.get(op_type)
    if expected_keys is None:
        return False, f"Unsupported operation type: {op_type or 'None'}"

    # Report the first missing key, in declaration order.
    first_missing = next((key for key in expected_keys if key not in operation), None)
    if first_missing is not None:
        return False, f"Missing required field: {first_missing}"

    return True, ""
|
||||
344
gdocs/docs_markdown.py
Normal file
344
gdocs/docs_markdown.py
Normal file
@@ -0,0 +1,344 @@
|
||||
"""
|
||||
Google Docs to Markdown Converter
|
||||
|
||||
Converts Google Docs API JSON responses to clean Markdown, preserving:
|
||||
- Headings (H1-H6, Title, Subtitle)
|
||||
- Bold, italic, strikethrough, code, links
|
||||
- Ordered and unordered lists with nesting
|
||||
- Checklists with checked/unchecked state
|
||||
- Tables with header row separators
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MONO_FONTS = {"Courier New", "Consolas", "Roboto Mono", "Source Code Pro"}
|
||||
|
||||
HEADING_MAP = {
|
||||
"TITLE": "#",
|
||||
"SUBTITLE": "##",
|
||||
"HEADING_1": "#",
|
||||
"HEADING_2": "##",
|
||||
"HEADING_3": "###",
|
||||
"HEADING_4": "####",
|
||||
"HEADING_5": "#####",
|
||||
"HEADING_6": "######",
|
||||
}
|
||||
|
||||
|
||||
def convert_doc_to_markdown(doc: dict[str, Any]) -> str:
    """Convert a Google Docs API document response to markdown.

    Walks the body content in order. Paragraphs become headings, list items
    (checklist, ordered or bulleted) or plain text; tables become markdown
    tables. Any other structural element is ignored.

    Args:
        doc: The document JSON from docs.documents.get()

    Returns:
        Markdown string ending in exactly one newline
    """
    content = doc.get("body", {}).get("content", [])
    lists_meta = doc.get("lists", {})

    lines: list[str] = []
    # Running item number per (list id, nesting level) for ordered lists.
    ordered_counters: dict[tuple[str, int], int] = {}
    prev_was_list = False

    for element in content:
        if "paragraph" in element:
            para = element["paragraph"]
            text = _convert_paragraph_text(para)

            if not text.strip():
                if prev_was_list:
                    # An empty paragraph ends the list. Emit the blank
                    # separator here; otherwise the next paragraph or table
                    # would directly follow the last item and be parsed as a
                    # lazy continuation of it.
                    lines.append("")
                    prev_was_list = False
                continue

            bullet = para.get("bullet")
            if bullet:
                list_id = bullet["listId"]
                nesting = bullet.get("nestingLevel", 0)
                indent = " " * nesting

                # An item at this level starts a fresh sub-tree: numbering of
                # deeper levels of the same list must restart at 1 instead of
                # continuing from the previous parent's sub-list.
                stale_keys = [
                    key
                    for key in ordered_counters
                    if key[0] == list_id and key[1] > nesting
                ]
                for key in stale_keys:
                    del ordered_counters[key]

                if _is_checklist(lists_meta, list_id, nesting):
                    checked = _is_checked(para)
                    checkbox = "[x]" if checked else "[ ]"
                    # Re-render text without strikethrough for checked items
                    # to avoid redundant ~~text~~ alongside [x]
                    cb_text = (
                        _convert_paragraph_text(para, skip_strikethrough=True)
                        if checked
                        else text
                    )
                    lines.append(f"{indent}- {checkbox} {cb_text}")
                elif _is_ordered_list(lists_meta, list_id, nesting):
                    key = (list_id, nesting)
                    ordered_counters[key] = ordered_counters.get(key, 0) + 1
                    counter = ordered_counters[key]
                    lines.append(f"{indent}{counter}. {text}")
                else:
                    lines.append(f"{indent}- {text}")
                prev_was_list = True
            else:
                if prev_was_list:
                    # Leaving a list: forget numbering and separate the list
                    # from the following block with a blank line.
                    ordered_counters.clear()
                    lines.append("")
                    prev_was_list = False

                style = para.get("paragraphStyle", {})
                named_style = style.get("namedStyleType", "NORMAL_TEXT")
                prefix = HEADING_MAP.get(named_style, "")

                if prefix:
                    lines.append(f"{prefix} {text}")
                else:
                    lines.append(text)
                lines.append("")

        elif "table" in element:
            if prev_was_list:
                ordered_counters.clear()
                lines.append("")
                prev_was_list = False
            table_md = _convert_table(element["table"])
            lines.append(table_md)
            lines.append("")

    # Collapse any trailing blank lines into a single terminating newline.
    result = "\n".join(lines).rstrip("\n") + "\n"
    return result
|
||||
|
||||
|
||||
def _convert_paragraph_text(
|
||||
para: dict[str, Any], skip_strikethrough: bool = False
|
||||
) -> str:
|
||||
"""Convert paragraph elements to inline markdown text."""
|
||||
parts: list[str] = []
|
||||
for elem in para.get("elements", []):
|
||||
if "textRun" in elem:
|
||||
parts.append(_convert_text_run(elem["textRun"], skip_strikethrough))
|
||||
return "".join(parts).strip()
|
||||
|
||||
|
||||
def _convert_text_run(
|
||||
text_run: dict[str, Any], skip_strikethrough: bool = False
|
||||
) -> str:
|
||||
"""Convert a single text run to markdown."""
|
||||
content = text_run.get("content", "")
|
||||
style = text_run.get("textStyle", {})
|
||||
|
||||
text = content.rstrip("\n")
|
||||
if not text:
|
||||
return ""
|
||||
|
||||
return _apply_text_style(text, style, skip_strikethrough)
|
||||
|
||||
|
||||
def _apply_text_style(
|
||||
text: str, style: dict[str, Any], skip_strikethrough: bool = False
|
||||
) -> str:
|
||||
"""Apply markdown formatting based on text style."""
|
||||
link = style.get("link", {})
|
||||
url = link.get("url")
|
||||
|
||||
font_family = style.get("weightedFontFamily", {}).get("fontFamily", "")
|
||||
if font_family in MONO_FONTS:
|
||||
return f"`{text}`"
|
||||
|
||||
bold = style.get("bold", False)
|
||||
italic = style.get("italic", False)
|
||||
strikethrough = style.get("strikethrough", False)
|
||||
|
||||
if bold and italic:
|
||||
text = f"***{text}***"
|
||||
elif bold:
|
||||
text = f"**{text}**"
|
||||
elif italic:
|
||||
text = f"*{text}*"
|
||||
|
||||
if strikethrough and not skip_strikethrough:
|
||||
text = f"~~{text}~~"
|
||||
|
||||
if url:
|
||||
text = f"[{text}]({url})"
|
||||
|
||||
return text
|
||||
|
||||
|
||||
def _is_ordered_list(lists_meta: dict[str, Any], list_id: str, nesting: int) -> bool:
|
||||
"""Check if a list at a given nesting level is ordered."""
|
||||
list_info = lists_meta.get(list_id, {})
|
||||
nesting_levels = list_info.get("listProperties", {}).get("nestingLevels", [])
|
||||
if nesting < len(nesting_levels):
|
||||
level = nesting_levels[nesting]
|
||||
glyph = level.get("glyphType", "")
|
||||
return glyph not in ("", "GLYPH_TYPE_UNSPECIFIED")
|
||||
return False
|
||||
|
||||
|
||||
def _is_checklist(lists_meta: dict[str, Any], list_id: str, nesting: int) -> bool:
|
||||
"""Check if a list at a given nesting level is a checklist.
|
||||
|
||||
Google Docs checklists are distinguished from regular bullet lists by having
|
||||
GLYPH_TYPE_UNSPECIFIED with no glyphSymbol — the Docs UI renders interactive
|
||||
checkboxes rather than a static glyph character.
|
||||
"""
|
||||
list_info = lists_meta.get(list_id, {})
|
||||
nesting_levels = list_info.get("listProperties", {}).get("nestingLevels", [])
|
||||
if nesting < len(nesting_levels):
|
||||
level = nesting_levels[nesting]
|
||||
glyph_type = level.get("glyphType", "")
|
||||
has_glyph_symbol = "glyphSymbol" in level
|
||||
return glyph_type in ("", "GLYPH_TYPE_UNSPECIFIED") and not has_glyph_symbol
|
||||
return False
|
||||
|
||||
|
||||
def _is_checked(para: dict[str, Any]) -> bool:
|
||||
"""Check if a checklist item is checked.
|
||||
|
||||
Google Docs marks checked checklist items by applying strikethrough
|
||||
formatting to the paragraph text.
|
||||
"""
|
||||
for elem in para.get("elements", []):
|
||||
if "textRun" in elem:
|
||||
content = elem["textRun"].get("content", "").strip()
|
||||
if content:
|
||||
return elem["textRun"].get("textStyle", {}).get("strikethrough", False)
|
||||
return False
|
||||
|
||||
|
||||
def _convert_table(table: dict[str, Any]) -> str:
|
||||
"""Convert a table element to markdown."""
|
||||
rows = table.get("tableRows", [])
|
||||
if not rows:
|
||||
return ""
|
||||
|
||||
md_rows: list[str] = []
|
||||
for i, row in enumerate(rows):
|
||||
cells: list[str] = []
|
||||
for cell in row.get("tableCells", []):
|
||||
cell_text = _extract_cell_text(cell)
|
||||
cells.append(cell_text)
|
||||
md_rows.append("| " + " | ".join(cells) + " |")
|
||||
|
||||
if i == 0:
|
||||
sep = "| " + " | ".join("---" for _ in cells) + " |"
|
||||
md_rows.append(sep)
|
||||
|
||||
return "\n".join(md_rows)
|
||||
|
||||
|
||||
def _extract_cell_text(cell: dict[str, Any]) -> str:
    """Flatten a table cell into a single line of markdown-safe text.

    Each paragraph in the cell is converted with ``_convert_paragraph_text``,
    trimmed, and the non-empty results are joined with single spaces. Pipe
    characters are backslash-escaped so they cannot end a table column.

    Args:
        cell: Table cell dict with an optional ``content`` list.

    Returns:
        The cell text on one line with ``|`` escaped.
    """
    paragraphs = (
        item["paragraph"] for item in cell.get("content", []) if "paragraph" in item
    )
    fragments: list[str] = []
    for paragraph in paragraphs:
        stripped = _convert_paragraph_text(paragraph).strip()
        if stripped:
            fragments.append(stripped)
    return " ".join(fragments).replace("|", "\\|")
|
||||
|
||||
|
||||
def format_comments_inline(markdown: str, comments: list[dict[str, Any]]) -> str:
    """Annotate markdown with footnote references for document comments.

    A comment whose ``anchor_text`` occurs in the markdown gets a ``[^cN]``
    marker appended to the first occurrence, where N is the comment's 1-based
    position in ``comments``; its footnote body is appended after the text.
    Comments without a matching anchor are rendered through
    ``format_comments_appendix`` at the very end.

    Args:
        markdown: Markdown text to annotate.
        comments: Comment dicts (``anchor_text``, ``author``, ``content``, ...).

    Returns:
        The annotated markdown; the input unchanged when there are no comments.
    """
    if not comments:
        return markdown

    matched_notes: list[str] = []
    orphaned: list[dict[str, Any]] = []

    for number, entry in enumerate(comments, start=1):
        quoted = entry.get("anchor_text", "")
        if not quoted or quoted not in markdown:
            orphaned.append(entry)
            continue
        marker = f"[^c{number}]"
        # Only the first occurrence of the anchor is tagged.
        markdown = markdown.replace(quoted, quoted + marker, 1)
        matched_notes.append(_format_footnote(number, entry))

    if matched_notes:
        markdown = markdown.rstrip("\n") + "\n\n" + "\n".join(matched_notes) + "\n"

    if orphaned:
        appendix_text = format_comments_appendix(orphaned)
        if appendix_text.strip():
            markdown = markdown.rstrip("\n") + "\n\n" + appendix_text

    return markdown
|
||||
|
||||
|
||||
def _format_footnote(num: int, comment: dict[str, Any]) -> str:
    """Build the footnote definition for one comment.

    The first line is the ``[^cN]:`` definition with author and content; each
    reply follows on its own list-item line.

    Args:
        num: Footnote number used in the ``[^cN]`` label.
        comment: Comment dict with ``author``, ``content`` and optional
            ``replies`` (each reply having ``author`` and ``content``).

    Returns:
        The footnote text, lines joined with newlines.
    """
    head = f"[^c{num}]: **{comment['author']}**: {comment['content']}"
    reply_lines = [
        f" - **{reply['author']}**: {reply['content']}"
        for reply in comment.get("replies", [])
    ]
    return "\n".join([head, *reply_lines])
|
||||
|
||||
|
||||
def format_comments_appendix(comments: list[dict[str, Any]]) -> str:
    """Format comments as a "## Comments" appendix with blockquoted anchors.

    For every comment the anchor text (if any) is emitted as a blockquote,
    followed by a list item with author and content (tagged when resolved)
    and one list item per reply.

    Args:
        comments: Comment dicts with ``author``, ``content`` and optional
            ``anchor_text``, ``resolved`` and ``replies``.

    Returns:
        The appendix markdown, or an empty string when there are no comments.
    """
    if not comments:
        return ""

    lines = ["## Comments", ""]
    for comment in comments:
        resolved_tag = " *(Resolved)*" if comment.get("resolved") else ""
        anchor = comment.get("anchor_text", "")
        if anchor:
            # Quote every line of the anchor: a single "> " prefix would leave
            # the remaining lines of a multi-line anchor outside the blockquote.
            lines.extend(f"> {part}" for part in str(anchor).splitlines())
            lines.append("")
        lines.append(f"- **{comment['author']}**: {comment['content']}{resolved_tag}")
        for reply in comment.get("replies", []):
            lines.append(f" - **{reply['author']}**: {reply['content']}")
        lines.append("")

    return "\n".join(lines)
|
||||
|
||||
|
||||
def parse_drive_comments(
    response: dict[str, Any], include_resolved: bool = False
) -> list[dict[str, Any]]:
    """Turn a Drive comments listing into plain comment dicts.

    Args:
        response: Response dict whose ``comments`` list holds the raw comments.
        include_resolved: When False, comments flagged ``resolved`` are dropped.

    Returns:
        One dict per kept comment with keys ``author``, ``content``,
        ``anchor_text``, ``replies`` (list of ``author``/``content`` dicts)
        and ``resolved``. Missing authors become ``"Unknown"``; missing text
        fields become empty strings.
    """

    def author_name(item: dict[str, Any]) -> str:
        return item.get("author", {}).get("displayName", "Unknown")

    parsed: list[dict[str, Any]] = []
    for raw in response.get("comments", []):
        is_resolved = raw.get("resolved", False)
        if is_resolved and not include_resolved:
            continue

        reply_entries = [
            {"author": author_name(reply), "content": reply.get("content", "")}
            for reply in raw.get("replies", [])
        ]
        parsed.append(
            {
                "author": author_name(raw),
                "content": raw.get("content", ""),
                "anchor_text": raw.get("quotedFileContent", {}).get("value", ""),
                "replies": reply_entries,
                "resolved": is_resolved,
            }
        )
    return parsed
|
||||
357
gdocs/docs_structure.py
Normal file
357
gdocs/docs_structure.py
Normal file
@@ -0,0 +1,357 @@
|
||||
"""
|
||||
Google Docs Document Structure Parsing and Analysis
|
||||
|
||||
This module provides utilities for parsing and analyzing the structure
|
||||
of Google Docs documents, including finding tables, cells, and other elements.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def parse_document_structure(doc_data: dict[str, Any]) -> dict[str, Any]:
    """
    Build a navigable summary of a document.

    Args:
        doc_data: Raw document data from Google Docs API

    Returns:
        Dictionary with keys ``title``, ``body`` (parsed elements), ``tables``
        (the parsed elements of type "table"), ``headers`` and ``footers``
        (parsed segments keyed by id) and ``total_length`` (the ``end_index``
        of the last parsed body element, 0 when the body is empty).
    """
    parsed_body: list[dict[str, Any]] = []
    parsed_tables: list[dict[str, Any]] = []

    for raw_element in doc_data.get("body", {}).get("content", []):
        info = _parse_element(raw_element)
        if not info:
            continue  # element kinds the parser does not recognise
        parsed_body.append(info)
        if info["type"] == "table":
            parsed_tables.append(info)

    total = parsed_body[-1].get("end_index", 0) if parsed_body else 0

    return {
        "title": doc_data.get("title", ""),
        "body": parsed_body,
        "tables": parsed_tables,
        "headers": {
            segment_id: _parse_segment(segment)
            for segment_id, segment in doc_data.get("headers", {}).items()
        },
        "footers": {
            segment_id: _parse_segment(segment)
            for segment_id, segment in doc_data.get("footers", {}).items()
        },
        "total_length": total,
    }
|
||||
|
||||
|
||||
def _parse_element(element: dict[str, Any]) -> Optional[dict[str, Any]]:
    """
    Parse a single document element.

    Recognised kinds are paragraphs, tables, section breaks and tables of
    contents; each result carries ``start_index``, ``end_index`` and ``type``
    plus kind-specific keys.

    Args:
        element: Element data from document

    Returns:
        Parsed element information, or None for an unrecognised element kind
    """
    element_info: dict[str, Any] = {
        "start_index": element.get("startIndex", 0),
        "end_index": element.get("endIndex", 0),
    }

    if "paragraph" in element:
        paragraph = element["paragraph"]
        element_info["type"] = "paragraph"
        element_info["text"] = _extract_paragraph_text(paragraph)
        element_info["style"] = paragraph.get("paragraphStyle", {})

    elif "table" in element:
        table = element["table"]
        table_rows = table.get("tableRows", [])
        element_info["type"] = "table"
        element_info["rows"] = len(table_rows)
        # Column count comes from the first row. Guard the lookup: a present
        # but empty "tableRows" list would otherwise raise IndexError.
        element_info["columns"] = (
            len(table_rows[0].get("tableCells", [])) if table_rows else 0
        )
        element_info["cells"] = _parse_table_cells(table)
        element_info["table_style"] = table.get("tableStyle", {})

    elif "sectionBreak" in element:
        element_info["type"] = "section_break"
        element_info["section_style"] = element["sectionBreak"].get("sectionStyle", {})

    elif "tableOfContents" in element:
        element_info["type"] = "table_of_contents"

    else:
        return None

    return element_info
|
||||
|
||||
|
||||
def _parse_table_cells(table: dict[str, Any]) -> list[list[dict[str, Any]]]:
    """
    Describe every cell of a table: position, index range and text.

    Args:
        table: Table element data

    Returns:
        2D list (rows of cells). Each cell dict has ``row``, ``column``,
        ``start_index``, ``end_index``, ``insertion_index``, ``content``
        (text from ``_extract_cell_text``) and ``content_elements`` (the raw
        cell content list).
    """
    grid: list[list[dict[str, Any]]] = []
    for row_number, table_row in enumerate(table.get("tableRows", [])):
        parsed_row: list[dict[str, Any]] = []
        for col_number, raw_cell in enumerate(table_row.get("tableCells", [])):
            cell_start = raw_cell.get("startIndex", 0)
            cell_content = raw_cell.get("content", [])

            # Fallback insertion point is one past the cell start; prefer the
            # start of the first paragraph element that reports an index.
            insert_at = cell_start + 1
            for item in cell_content:
                if "paragraph" not in item:
                    continue
                para_items = item["paragraph"].get("elements", [])
                if para_items and "startIndex" in para_items[0]:
                    insert_at = para_items[0]["startIndex"]
                    break

            parsed_row.append(
                {
                    "row": row_number,
                    "column": col_number,
                    "start_index": cell_start,
                    "end_index": raw_cell.get("endIndex", 0),
                    "insertion_index": insert_at,
                    "content": _extract_cell_text(raw_cell),
                    "content_elements": cell_content,
                }
            )
        grid.append(parsed_row)
    return grid
|
||||
|
||||
|
||||
def _extract_paragraph_text(paragraph: dict[str, Any]) -> str:
    """Concatenate the content of every text run in a paragraph, in order."""
    return "".join(
        piece["textRun"].get("content", "")
        for piece in paragraph.get("elements", [])
        if "textRun" in piece
    )
|
||||
|
||||
|
||||
def _extract_cell_text(cell: dict[str, Any]) -> str:
    """Concatenate the text of every paragraph in a table cell, in order."""
    return "".join(
        _extract_paragraph_text(item["paragraph"])
        for item in cell.get("content", [])
        if "paragraph" in item
    )
|
||||
|
||||
|
||||
def _parse_segment(segment_data: dict[str, Any]) -> dict[str, Any]:
    """Summarise a header/footer segment.

    Returns:
        Dict with the segment's ``content`` list, the ``startIndex`` of its
        first item as ``start_index`` and the ``endIndex`` of its last item as
        ``end_index``; both indices are 0 when there is no content.
    """
    items = segment_data.get("content", [])
    if items:
        first_index = items[0].get("startIndex", 0)
        last_index = items[-1].get("endIndex", 0)
    else:
        first_index = last_index = 0
    return {"content": items, "start_index": first_index, "end_index": last_index}
|
||||
|
||||
|
||||
def find_tables(doc_data: dict[str, Any]) -> list[dict[str, Any]]:
    """
    List the tables of a document in body order.

    Args:
        doc_data: Raw document data from Google Docs API

    Returns:
        One dict per table with ``index`` (0-based position among the tables),
        ``start_index``, ``end_index``, ``rows``, ``columns`` and ``cells``
    """
    copied_keys = ("start_index", "end_index", "rows", "columns", "cells")
    parsed_tables = parse_document_structure(doc_data)["tables"]
    return [
        {"index": position, **{key: parsed[key] for key in copied_keys}}
        for position, parsed in enumerate(parsed_tables)
    ]
|
||||
|
||||
|
||||
def get_table_cell_indices(
    doc_data: dict[str, Any], table_index: int = 0
) -> Optional[list[list[tuple[int, int]]]]:
    """
    Get content indices for all cells in a specific table.

    For each cell, the range of the first element of its first paragraph is
    used when that element is a text run; otherwise the cell boundaries
    shrunk by one on each side are used.

    Args:
        doc_data: Raw document data from Google Docs API
        table_index: Index of the table (0-based)

    Returns:
        2D list of (start_index, end_index) tuples for each cell, or None
        (after logging a warning) if table_index is past the last table
    """
    all_tables = find_tables(doc_data)

    if table_index >= len(all_tables):
        logger.warning(
            f"Table index {table_index} not found. Document has {len(all_tables)} tables."
        )
        return None

    def span_for(cell: dict[str, Any]) -> tuple[int, int]:
        # Locate the first paragraph among the raw content elements, if any.
        first_para = next(
            (
                item["paragraph"]
                for item in (cell.get("content_elements", []) or ())
                if "paragraph" in item
            ),
            None,
        )
        if first_para and "elements" in first_para and first_para["elements"]:
            leading = first_para["elements"][0]
            if "textRun" in leading:
                begin = leading.get("startIndex", cell["start_index"] + 1)
                return begin, leading.get("endIndex", begin + 1)
        # Fallback: stay inside the cell boundaries.
        return cell["start_index"] + 1, cell["end_index"] - 1

    return [[span_for(cell) for cell in row] for row in all_tables[table_index]["cells"]]
|
||||
|
||||
|
||||
def find_element_at_index(
    doc_data: dict[str, Any], index: int
) -> Optional[dict[str, Any]]:
    """
    Look up the body element whose index range contains a position.

    Ranges are half-open: an element matches when
    ``start_index <= index < end_index``.

    Args:
        doc_data: Raw document data from Google Docs API
        index: Position in the document

    Returns:
        A shallow copy of the first matching parsed element; for tables it
        also gets a ``containing_cell`` entry (row, column, cell_start,
        cell_end) when a cell contains the index. None if nothing matches.
    """
    for candidate in parse_document_structure(doc_data)["body"]:
        if not (candidate["start_index"] <= index < candidate["end_index"]):
            continue

        found = candidate.copy()

        if candidate["type"] == "table" and "cells" in candidate:
            for row_number, cells_in_row in enumerate(candidate["cells"]):
                for col_number, cell in enumerate(cells_in_row):
                    if cell["start_index"] <= index < cell["end_index"]:
                        found["containing_cell"] = {
                            "row": row_number,
                            "column": col_number,
                            "cell_start": cell["start_index"],
                            "cell_end": cell["end_index"],
                        }
                        break  # leaves this row only; later rows are still scanned

        return found

    return None
|
||||
|
||||
|
||||
def get_next_paragraph_index(doc_data: dict[str, Any], after_index: int = 0) -> int:
    """
    Pick an insertion position located after a given index.

    Args:
        doc_data: Raw document data from Google Docs API
        after_index: Index after which to find insertion point

    Returns:
        The start index of the first paragraph that begins after
        ``after_index``; if there is none, the document's total length minus
        one, or 1 when the total length is 0.
    """
    parsed = parse_document_structure(doc_data)

    later_paragraph_starts = (
        item["start_index"]
        for item in parsed["body"]
        if item["type"] == "paragraph" and item["start_index"] > after_index
    )

    doc_length = parsed["total_length"]
    end_of_document = doc_length - 1 if doc_length > 0 else 1
    return next(later_paragraph_starts, end_of_document)
|
||||
|
||||
|
||||
def analyze_document_complexity(doc_data: dict[str, Any]) -> dict[str, Any]:
    """
    Collect simple size statistics about a document.

    Args:
        doc_data: Raw document data from Google Docs API

    Returns:
        Dictionary with ``total_elements``, ``tables``, ``paragraphs``,
        ``section_breaks``, ``total_length``, ``has_headers`` and
        ``has_footers``; when the document has tables, also
        ``total_table_cells`` and ``largest_table`` (rows x columns).
    """
    parsed = parse_document_structure(doc_data)
    body, tables = parsed["body"], parsed["tables"]
    kinds = [item.get("type") for item in body]

    report: dict[str, Any] = {
        "total_elements": len(body),
        "tables": len(tables),
        "paragraphs": kinds.count("paragraph"),
        "section_breaks": kinds.count("section_break"),
        "total_length": parsed["total_length"],
        "has_headers": bool(parsed["headers"]),
        "has_footers": bool(parsed["footers"]),
    }

    if tables:
        cell_counts = [entry["rows"] * entry["columns"] for entry in tables]
        report["total_table_cells"] = sum(cell_counts)
        report["largest_table"] = max(cell_counts, default=0)

    return report
|
||||
464
gdocs/docs_tables.py
Normal file
464
gdocs/docs_tables.py
Normal file
@@ -0,0 +1,464 @@
|
||||
"""
|
||||
Google Docs Table Operations
|
||||
|
||||
This module provides utilities for creating and manipulating tables
|
||||
in Google Docs, including population with data and formatting.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, Any, List, Optional, Union, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def build_table_population_requests(
    table_info: Dict[str, Any], data: List[List[str]], bold_headers: bool = True
) -> List[Dict[str, Any]]:
    """
    Build batch requests to populate a table with data.

    Text is only inserted, never deleted. For a cell whose current content is
    blank the text goes to the cell's ``insertion_index`` (falling back to
    ``start_index + 1``); otherwise it is appended just before the cell's end
    marker (``end_index - 1``). Empty data values produce no request.

    NOTE(review): every index is taken from the ``table_info`` snapshot. If
    the returned requests are applied in order within one batch, earlier
    insertions presumably shift the later positions - confirm how callers
    execute them.

    Args:
        table_info: Table information from document structure including cell indices
        data: 2D array of data to insert into table
        bold_headers: Whether to make the first row bold

    Returns:
        List of request dictionaries for batch update
    """
    requests: List[Dict[str, Any]] = []
    cells = table_info.get("cells", [])

    if not cells:
        logger.warning("No cell information found in table_info")
        return requests

    for row_idx, row_data in enumerate(data):
        if row_idx >= len(cells):
            logger.warning(
                f"Data has more rows ({len(data)}) than table ({len(cells)})"
            )
            break

        for col_idx, cell_text in enumerate(row_data):
            if col_idx >= len(cells[row_idx]):
                logger.warning(
                    f"Data has more columns ({len(row_data)}) than table row {row_idx} ({len(cells[row_idx])})"
                )
                break

            # Only insert if we have text to insert
            if not cell_text:
                continue

            cell = cells[row_idx][col_idx]

            if cell.get("content", "").strip():
                # Cell has content: append before the cell end marker
                target_index = cell["end_index"] - 1
            else:
                # Cell is blank (at most whitespace/newline)
                target_index = cell.get("insertion_index", cell["start_index"] + 1)

            requests.append(
                {
                    "insertText": {
                        "location": {"index": target_index},
                        "text": cell_text,
                    }
                }
            )

            # Apply bold formatting to first row if requested
            if bold_headers and row_idx == 0:
                # Docs indexes count UTF-16 code units, so characters outside
                # the BMP (e.g. emoji) occupy two positions; len() would make
                # the styled range too short.
                utf16_units = len(cell_text.encode("utf-16-le", "surrogatepass")) // 2
                requests.append(
                    {
                        "updateTextStyle": {
                            "range": {
                                "startIndex": target_index,
                                "endIndex": target_index + utf16_units,
                            },
                            "textStyle": {"bold": True},
                            "fields": "bold",
                        }
                    }
                )

    return requests
|
||||
|
||||
|
||||
def calculate_cell_positions(
    table_start_index: int,
    rows: int,
    cols: int,
    existing_table_data: Optional[Dict[str, Any]] = None,
) -> List[List[Dict[str, int]]]:
    """
    Produce position estimates for each cell of a table.

    When ``existing_table_data`` has a ``cells`` entry it is returned as-is.
    Otherwise positions are estimated: the first cell starts two past the
    table start, every cell spans two indexes and consecutive cells (in
    row-major order) start three indexes apart.

    Args:
        table_start_index: Starting index of the table
        rows: Number of rows
        cols: Number of columns
        existing_table_data: Optional existing table data with actual positions

    Returns:
        2D list of cell position dictionaries
    """
    if existing_table_data and "cells" in existing_table_data:
        return existing_table_data["cells"]

    first_cell_start = table_start_index + 2  # skip the table start
    cell_span = 2  # minimum cell size
    stride = cell_span + 1  # distance between consecutive cell starts

    grid: List[List[Dict[str, int]]] = []
    for r in range(rows):
        row_positions = []
        for c in range(cols):
            begin = first_cell_start + stride * (r * cols + c)
            row_positions.append(
                {
                    "row": r,
                    "column": c,
                    "start_index": begin,
                    "end_index": begin + cell_span,
                }
            )
        grid.append(row_positions)
    return grid
|
||||
|
||||
|
||||
def format_table_data(
    raw_data: Union[List[List[str]], List[str], str],
) -> List[List[str]]:
    """
    Coerce table input of several shapes into a 2D list of strings.

    Strings are split into rows on newlines and into cells on tabs if the
    text contains any tab, else on commas if it contains any comma, else on
    whitespace. A list whose first item is a list is treated as 2D; any other
    list becomes a single-column table. Anything else becomes a 1x1 table.

    Args:
        raw_data: Data in various formats (2D list, 1D list, or delimited string)

    Returns:
        Normalized 2D list of strings (``[[]]`` for an empty list)
    """
    if isinstance(raw_data, str):
        text_lines = raw_data.strip().split("\n")
        # Delimiter detection looks at the whole text, not at each line.
        if "\t" in raw_data:
            return [line.split("\t") for line in text_lines]
        if "," in raw_data:
            return [line.split(",") for line in text_lines]
        return [line.split() for line in text_lines]

    if not isinstance(raw_data, list):
        return [[str(raw_data)]]

    if not raw_data:
        return [[]]

    if isinstance(raw_data[0], list):
        return [[str(value) for value in row] for row in raw_data]
    return [[str(value)] for value in raw_data]
|
||||
|
||||
|
||||
def create_table_with_data(
    index: int,
    data: List[List[str]],
    headers: Optional[List[str]] = None,
    bold_headers: bool = True,
) -> List[Dict[str, Any]]:
    """
    Build the request that inserts a table sized for the given data.

    Only the ``insertTable`` request is produced; the cell text itself is not
    inserted here because accurate cell indices are only known after the
    table exists. ``bold_headers`` is accepted but not used by this function.

    Args:
        index: Position to insert the table
        data: 2D array of table data
        headers: Optional header row (will be prepended to data)
        bold_headers: Whether to make headers bold

    Returns:
        List of request dictionaries for batch update

    Raises:
        ValueError: If the normalized data is empty or its first row is empty
    """
    combined = [headers] + data if headers else data
    combined = format_table_data(combined)

    if not combined or not combined[0]:
        raise ValueError("Cannot create table with empty data")

    row_count = len(combined)
    col_count = len(combined[0])  # table width follows the first row

    # Pad short rows up to the table width (longer rows are left alone).
    for row in combined:
        row.extend([""] * (col_count - len(row)))

    return [
        {
            "insertTable": {
                "location": {"index": index},
                "rows": row_count,
                "columns": col_count,
            }
        }
    ]
|
||||
|
||||
|
||||
def build_table_style_requests(
    table_start_index: int, style_options: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """
    Translate style options into ``updateTableCellStyle`` requests.

    Args:
        table_start_index: Starting index of the table
        style_options: Dictionary of style options
            - border_width: Width of borders in points
            - border_color: RGB color for borders (only applied together
              with border_width)
            - background_color: RGB color for cell backgrounds
            - header_background: RGB color for header row background

    Returns:
        List of request dictionaries for styling: at most one table-wide
        request for borders/background, then at most one request for the
        header row.
    """
    batch: List[Dict[str, Any]] = []
    border_sides = ("borderTop", "borderBottom", "borderLeft", "borderRight")

    cell_style: Dict[str, Any] = {}
    field_mask: List[str] = []

    if "border_width" in style_options:
        width = {"magnitude": style_options["border_width"], "unit": "PT"}
        for side in border_sides:
            cell_style[side] = {"width": width}
        field_mask.extend(border_sides)

    # A border colour is only attached to borders created by border_width.
    if "border_color" in style_options and "borderTop" in cell_style:
        rgb = {"rgbColor": style_options["border_color"]}
        for side in border_sides:
            cell_style[side]["color"] = rgb

    if "background_color" in style_options:
        cell_style["backgroundColor"] = {
            "color": {"rgbColor": style_options["background_color"]}
        }
        field_mask.append("backgroundColor")

    if cell_style and field_mask:
        batch.append(
            {
                "updateTableCellStyle": {
                    "tableStartLocation": {"index": table_start_index},
                    "tableCellStyle": cell_style,
                    "fields": ",".join(field_mask),
                }
            }
        )

    if "header_background" in style_options:
        header_range = {
            "tableCellLocation": {
                "tableStartLocation": {"index": table_start_index},
                "rowIndex": 0,
                "columnIndex": 0,
            },
            "rowSpan": 1,
            "columnSpan": 100,  # Large number to cover all columns
        }
        header_style = {
            "backgroundColor": {
                "color": {"rgbColor": style_options["header_background"]}
            }
        }
        batch.append(
            {
                "updateTableCellStyle": {
                    "tableRange": header_range,
                    "tableCellStyle": header_style,
                    "fields": "backgroundColor",
                }
            }
        )

    return batch
|
||||
|
||||
|
||||
def extract_table_as_data(table_info: Dict[str, Any]) -> List[List[str]]:
    """
    Read a table's cell text into a 2D list.

    Args:
        table_info: Table information from document structure

    Returns:
        2D list of cell contents, each stripped of surrounding whitespace
        (empty string for cells without a ``content`` entry)
    """
    return [
        [cell.get("content", "").strip() for cell in row]
        for row in table_info.get("cells", [])
    ]
|
||||
|
||||
|
||||
def find_table_by_content(
    tables: List[Dict[str, Any]], search_text: str, case_sensitive: bool = False
) -> Optional[int]:
    """
    Locate the first table that has a cell containing the given text.

    Args:
        tables: List of table information from document
        search_text: Text to search for in table cells
        case_sensitive: Whether to do case-sensitive search

    Returns:
        Index of the first matching table, or None
    """

    def normalize(text: str) -> str:
        return text if case_sensitive else text.lower()

    needle = normalize(search_text)

    for position, table in enumerate(tables):
        cell_texts = (
            normalize(cell.get("content", ""))
            for row in table.get("cells", [])
            for cell in row
        )
        if any(needle in text for text in cell_texts):
            return position

    return None
|
||||
|
||||
|
||||
def validate_table_data(data: List[List[str]]) -> Tuple[bool, str]:
    """
    Validates table data format and provides specific error messages for LLMs.

    WHAT THIS CHECKS:
    - Data is a non-empty 2D list (list of lists)
    - All rows have consistent, non-zero column counts
    - Dimensions are within Google Docs limits (1000 rows, 20 columns)
    - No None values

    VALID FORMAT EXAMPLE:
    [
        ["Header1", "Header2"],  # Row 0 - 2 columns
        ["Data1", "Data2"],      # Row 1 - 2 columns
        ["Data3", "Data4"]       # Row 2 - 2 columns
    ]

    INVALID FORMATS:
    - [["col1"], ["col1", "col2"]]  # Inconsistent column counts
    - ["col1", "col2"]              # Not 2D (missing inner lists)
    - [["col1", None]]              # Contains None values
    - [] or [[]]                    # Empty data

    Args:
        data: 2D array of data to validate

    Returns:
        Tuple of (is_valid, error_message_with_examples)
    """
    if not data:
        return (
            False,
            "Data is empty. Use format: [['col1', 'col2'], ['row1col1', 'row1col2']]",
        )

    if not isinstance(data, list):
        return (
            False,
            f"Data must be a list, got {type(data).__name__}. Use format: [['col1', 'col2'], ['row1col1', 'row1col2']]",
        )

    if not all(isinstance(row, list) for row in data):
        return (
            False,
            f"Data must be a 2D list (list of lists). Each row must be a list. Check your format: {data}",
        )

    # Check for consistent column count
    col_counts = [len(row) for row in data]
    if len(set(col_counts)) > 1:
        return (
            False,
            f"All rows must have same number of columns. Found: {col_counts}. Fix your data format.",
        )

    rows = len(data)
    cols = col_counts[0] if col_counts else 0

    # Rows without any cells (e.g. [[]]) are empty data, as documented above.
    if cols == 0:
        return (
            False,
            "Data has no columns. Use format: [['col1', 'col2'], ['row1col1', 'row1col2']]",
        )

    # Check for reasonable size
    if rows > 1000:
        return False, f"Too many rows ({rows}). Google Docs limit is 1000 rows."

    if cols > 20:
        return False, f"Too many columns ({cols}). Google Docs limit is 20 columns."

    # Reject None cells; done after the size checks so oversized input is not scanned.
    for row_idx, row in enumerate(data):
        for col_idx, cell in enumerate(row):
            if cell is None:
                return (
                    False,
                    f"Cell at row {row_idx}, column {col_idx} is None. Use an empty string '' for blank cells.",
                )

    return True, f"Valid table data: {rows}x{cols} table format"
|
||||
1918
gdocs/docs_tools.py
Normal file
1918
gdocs/docs_tools.py
Normal file
File diff suppressed because it is too large
Load Diff
18
gdocs/managers/__init__.py
Normal file
18
gdocs/managers/__init__.py
Normal file
@@ -0,0 +1,18 @@
|
||||
"""
|
||||
Google Docs Operation Managers
|
||||
|
||||
This package provides high-level manager classes for complex Google Docs operations,
|
||||
extracting business logic from the main tools module to improve maintainability.
|
||||
"""
|
||||
|
||||
from .table_operation_manager import TableOperationManager
|
||||
from .header_footer_manager import HeaderFooterManager
|
||||
from .validation_manager import ValidationManager
|
||||
from .batch_operation_manager import BatchOperationManager
|
||||
|
||||
__all__ = [
|
||||
"TableOperationManager",
|
||||
"HeaderFooterManager",
|
||||
"ValidationManager",
|
||||
"BatchOperationManager",
|
||||
]
|
||||
534
gdocs/managers/batch_operation_manager.py
Normal file
534
gdocs/managers/batch_operation_manager.py
Normal file
@@ -0,0 +1,534 @@
|
||||
"""
|
||||
Batch Operation Manager
|
||||
|
||||
This module provides high-level batch operation management for Google Docs,
|
||||
extracting complex validation and request building logic.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import asyncio
|
||||
from typing import Any, Union, Dict, List, Tuple
|
||||
|
||||
from gdocs.docs_helpers import (
|
||||
create_insert_text_request,
|
||||
create_delete_range_request,
|
||||
create_format_text_request,
|
||||
create_update_paragraph_style_request,
|
||||
create_find_replace_request,
|
||||
create_insert_table_request,
|
||||
create_insert_page_break_request,
|
||||
create_bullet_list_request,
|
||||
create_delete_bullet_list_request,
|
||||
create_insert_doc_tab_request,
|
||||
create_delete_doc_tab_request,
|
||||
create_update_doc_tab_request,
|
||||
validate_operation,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BatchOperationManager:
    """
    High-level manager for Google Docs batch operations.

    Handles complex multi-operation requests including:
    - Operation validation and request building
    - Batch execution with proper error handling
    - Operation result processing and reporting
    """

    # Operation types handled by _build_operation_request, in the order they
    # are listed in the "unsupported operation" error message.
    _SUPPORTED_OPERATION_TYPES = (
        "insert_text",
        "delete_text",
        "replace_text",
        "format_text",
        "update_paragraph_style",
        "insert_table",
        "insert_page_break",
        "find_replace",
        "create_bullet_list",
        "insert_doc_tab",
        "delete_doc_tab",
        "update_doc_tab",
    )

    # (operation key, human-readable label) pairs used to describe format_text.
    _FORMAT_LABELS = (
        ("bold", "bold"),
        ("italic", "italic"),
        ("underline", "underline"),
        ("font_size", "font size"),
        ("font_family", "font family"),
        ("text_color", "text color"),
        ("background_color", "background color"),
        ("link_url", "link"),
    )

    # (operation key, human-readable label) pairs used to describe
    # update_paragraph_style.
    _PARAGRAPH_STYLE_LABELS = (
        ("heading_level", "heading"),
        ("alignment", "alignment"),
        ("line_spacing", "line spacing"),
        ("indent_first_line", "first line indent"),
        ("indent_start", "start indent"),
        ("indent_end", "end indent"),
        ("space_above", "space above"),
        ("space_below", "space below"),
        ("named_style_type", "named style"),
    )

    # Paragraph style keys whose values are rendered with a "pt" suffix.
    _POINT_VALUED_STYLE_KEYS = frozenset(
        {
            "indent_first_line",
            "indent_start",
            "indent_end",
            "space_above",
            "space_below",
        }
    )

    def __init__(self, service):
        """
        Initialize the batch operation manager.

        Args:
            service: Google Docs API service instance
        """
        self.service = service

    async def execute_batch_operations(
        self, document_id: str, operations: list[dict[str, Any]]
    ) -> tuple[bool, str, dict[str, Any]]:
        """
        Execute multiple document operations in a single batchUpdate call.

        Every operation is validated and converted to API requests before
        anything is sent; any failure (validation or API) is reported through
        the returned tuple rather than raised.

        Args:
            document_id: ID of the document to update
            operations: List of operation dictionaries

        Returns:
            Tuple of (success, message, metadata). metadata is empty on
            failure; on success it holds operation/request/reply counts, the
            first five operation descriptions and, when tabs were created,
            a "created_tabs" list.
        """
        logger.info(f"Executing batch operations on document {document_id}")
        logger.info(f"Operations count: {len(operations)}")

        if not operations:
            return (
                False,
                "No operations provided. Please provide at least one operation.",
                {},
            )

        try:
            # Validate and build requests
            requests, operation_descriptions = await self._validate_and_build_requests(
                operations
            )

            if not requests:
                return False, "No valid requests could be built from operations", {}

            # Execute the batch
            result = await self._execute_batch_requests(document_id, requests)

            # Process results
            metadata = {
                "operations_count": len(operations),
                "requests_count": len(requests),
                "replies_count": len(result.get("replies", [])),
                "operation_summary": operation_descriptions[:5],  # First 5 operations
            }

            # Extract new tab IDs from insert_doc_tab replies
            created_tabs = self._extract_created_tabs(result)
            if created_tabs:
                metadata["created_tabs"] = created_tabs

            summary = self._build_operation_summary(operation_descriptions)
            msg = f"Successfully executed {len(operations)} operations ({summary})"
            if created_tabs:
                tab_info = ", ".join(
                    f"'{t['title']}' (tab_id: {t['tab_id']})" for t in created_tabs
                )
                msg += f". Created tabs: {tab_info}"

            return True, msg, metadata

        except Exception as e:
            # Top-level boundary: callers receive the failure as a tuple.
            logger.error(f"Failed to execute batch operations: {str(e)}")
            return False, f"Batch operation failed: {str(e)}", {}

    async def _validate_and_build_requests(
        self, operations: list[dict[str, Any]]
    ) -> tuple[list[dict[str, Any]], list[str]]:
        """
        Validate operations and build API requests.

        Args:
            operations: List of operation dictionaries

        Returns:
            Tuple of (requests, operation_descriptions). One operation may
            contribute several requests (e.g. replace_text) but always
            contributes a single description.

        Raises:
            ValueError: If an operation fails validation, lacks a required
                field, or cannot be converted to a request. The message is
                prefixed with the 1-based operation position and the
                original exception is chained as the cause.
        """
        requests: list[dict[str, Any]] = []
        operation_descriptions: list[str] = []

        for position, op in enumerate(operations, start=1):
            # Validate operation structure
            is_valid, error_msg = validate_operation(op)
            if not is_valid:
                raise ValueError(f"Operation {position}: {error_msg}")

            op_type = op.get("type")

            try:
                request, description = self._build_operation_request(op, op_type)
            except KeyError as e:
                raise ValueError(
                    f"Operation {position} ({op_type}) missing required field: {e}"
                ) from e
            except Exception as e:
                raise ValueError(
                    f"Operation {position} ({op_type}) failed validation: {str(e)}"
                ) from e

            if isinstance(request, list):
                # Multiple requests (e.g., replace_text)
                requests.extend(request)
                operation_descriptions.append(description)
            elif request:
                # Single request; a falsy request is silently skipped
                requests.append(request)
                operation_descriptions.append(description)

        return requests, operation_descriptions

    @classmethod
    def _describe_format_changes(cls, op: dict[str, Any]) -> str:
        """Return a comma-separated summary of the format_text options set in op."""
        changes = []
        for key, label in cls._FORMAT_LABELS:
            if op.get(key) is None:
                continue
            value = f"{op[key]}pt" if key == "font_size" else op[key]
            changes.append(f"{label}: {value}")
        return ", ".join(changes)

    @classmethod
    def _describe_paragraph_style_changes(cls, op: dict[str, Any]) -> str:
        """Return a comma-separated summary of the paragraph style options set in op."""
        changes = []
        for key, label in cls._PARAGRAPH_STYLE_LABELS:
            raw = op.get(key)
            if raw is None:
                continue
            if key == "heading_level":
                value = f"H{raw}"
            elif key == "line_spacing":
                value = f"{raw}x"
            elif key in cls._POINT_VALUED_STYLE_KEYS:
                value = f"{raw}pt"
            else:
                value = raw
            changes.append(f"{label}: {value}")
        return ", ".join(changes)

    def _build_operation_request(
        self, op: dict[str, Any], op_type: str
    ) -> Tuple[Union[Dict[str, Any], List[Dict[str, Any]]], str]:
        """
        Build the API request(s) and a human-readable description for one operation.

        Args:
            op: Operation dictionary
            op_type: Operation type

        Returns:
            Tuple of (request, description). request is a list of requests
            for replace_text and a single request otherwise.

        Raises:
            KeyError: If op lacks a field required by op_type.
            ValueError: If op_type is unsupported, list_type is invalid, or a
                format/paragraph-style operation yields no request.
        """
        tab_id = op.get("tab_id")

        if op_type == "insert_text":
            request = create_insert_text_request(op["index"], op["text"], tab_id)
            description = f"insert text at {op['index']}"

        elif op_type == "delete_text":
            request = create_delete_range_request(
                op["start_index"], op["end_index"], tab_id
            )
            description = f"delete text {op['start_index']}-{op['end_index']}"

        elif op_type == "replace_text":
            # Replace is delete + insert (must be done in this order)
            delete_request = create_delete_range_request(
                op["start_index"], op["end_index"], tab_id
            )
            insert_request = create_insert_text_request(
                op["start_index"], op["text"], tab_id
            )
            # Return both requests as a list
            request = [delete_request, insert_request]
            text = op["text"]
            preview = text[:20] + ("..." if len(text) > 20 else "")
            description = (
                f"replace text {op['start_index']}-{op['end_index']} with '{preview}'"
            )

        elif op_type == "format_text":
            request = create_format_text_request(
                op["start_index"],
                op["end_index"],
                op.get("bold"),
                op.get("italic"),
                op.get("underline"),
                op.get("font_size"),
                op.get("font_family"),
                op.get("text_color"),
                op.get("background_color"),
                op.get("link_url"),
                tab_id,
            )

            if not request:
                raise ValueError("No formatting options provided")

            description = (
                f"format text {op['start_index']}-{op['end_index']} "
                f"({self._describe_format_changes(op)})"
            )

        elif op_type == "update_paragraph_style":
            request = create_update_paragraph_style_request(
                op["start_index"],
                op["end_index"],
                op.get("heading_level"),
                op.get("alignment"),
                op.get("line_spacing"),
                op.get("indent_first_line"),
                op.get("indent_start"),
                op.get("indent_end"),
                op.get("space_above"),
                op.get("space_below"),
                tab_id,
                op.get("named_style_type"),
            )

            if not request:
                raise ValueError("No paragraph style options provided")

            description = (
                f"paragraph style {op['start_index']}-{op['end_index']} "
                f"({self._describe_paragraph_style_changes(op)})"
            )

        elif op_type == "insert_table":
            request = create_insert_table_request(
                op["index"], op["rows"], op["columns"], tab_id
            )
            description = f"insert {op['rows']}x{op['columns']} table at {op['index']}"

        elif op_type == "insert_page_break":
            request = create_insert_page_break_request(op["index"], tab_id)
            description = f"insert page break at {op['index']}"

        elif op_type == "find_replace":
            request = create_find_replace_request(
                op["find_text"], op["replace_text"], op.get("match_case", False), tab_id
            )
            description = f"find/replace '{op['find_text']}' → '{op['replace_text']}'"

        elif op_type == "create_bullet_list":
            list_type = op.get("list_type", "UNORDERED")
            if list_type not in ("UNORDERED", "ORDERED", "NONE"):
                raise ValueError(
                    f"Invalid list_type '{list_type}'. Must be 'UNORDERED', 'ORDERED', or 'NONE'"
                )
            if list_type == "NONE":
                # NONE means "remove list formatting" rather than create it
                request = create_delete_bullet_list_request(
                    op["start_index"], op["end_index"], tab_id
                )
                description = f"remove bullets {op['start_index']}-{op['end_index']}"
            else:
                request = create_bullet_list_request(
                    op["start_index"],
                    op["end_index"],
                    list_type,
                    op.get("nesting_level"),
                    op.get("paragraph_start_indices"),
                    tab_id,
                )
                style = "bulleted" if list_type == "UNORDERED" else "numbered"
                description = (
                    f"create {style} list {op['start_index']}-{op['end_index']}"
                )
                # A nesting level of 0 (or missing) is not mentioned
                if op.get("nesting_level"):
                    description += f" (nesting level {op['nesting_level']})"

        elif op_type == "insert_doc_tab":
            request = create_insert_doc_tab_request(
                op["title"], op["index"], op.get("parent_tab_id")
            )
            description = f"insert tab '{op['title']}' at {op['index']}"
            if op.get("parent_tab_id"):
                description += f" under parent tab {op['parent_tab_id']}"

        elif op_type == "delete_doc_tab":
            request = create_delete_doc_tab_request(op["tab_id"])
            description = f"delete tab '{op['tab_id']}'"

        elif op_type == "update_doc_tab":
            request = create_update_doc_tab_request(op["tab_id"], op["title"])
            description = f"rename tab '{op['tab_id']}' to '{op['title']}'"

        else:
            raise ValueError(
                f"Unsupported operation type '{op_type}'. Supported: {', '.join(self._SUPPORTED_OPERATION_TYPES)}"
            )

        return request, description

    async def _execute_batch_requests(
        self, document_id: str, requests: list[dict[str, Any]]
    ) -> dict[str, Any]:
        """
        Execute the batch requests against the Google Docs API.

        The blocking client call is run in a worker thread via
        asyncio.to_thread.

        Args:
            document_id: Document ID
            requests: List of API requests

        Returns:
            API response
        """
        return await asyncio.to_thread(
            self.service.documents()
            .batchUpdate(documentId=document_id, body={"requests": requests})
            .execute
        )

    def _extract_created_tabs(self, result: dict[str, Any]) -> list[dict[str, str]]:
        """
        Extract tab IDs from "createDocumentTab" replies in the batchUpdate response.

        Args:
            result: The batchUpdate API response

        Returns:
            List of dicts with tab_id and title for each reply that carries a
            tab ID; replies without one are ignored.
        """
        created_tabs = []
        for reply in result.get("replies", []):
            if "createDocumentTab" not in reply:
                continue
            props = reply["createDocumentTab"].get("tabProperties", {})
            tab_id = props.get("tabId")
            if tab_id:
                created_tabs.append({"tab_id": tab_id, "title": props.get("title", "")})
        return created_tabs

    def _build_operation_summary(self, operation_descriptions: list[str]) -> str:
        """
        Build a concise summary of operations performed.

        Args:
            operation_descriptions: List of operation descriptions

        Returns:
            The first three descriptions joined by ", ", followed by
            " and N more operation(s)" when there are more; "no operations"
            for an empty list.
        """
        if not operation_descriptions:
            return "no operations"

        summary = ", ".join(operation_descriptions[:3])  # Show first 3 operations

        remaining = len(operation_descriptions) - 3
        if remaining > 0:
            summary += f" and {remaining} more operation{'s' if remaining > 1 else ''}"

        return summary

    def get_supported_operations(self) -> dict[str, Any]:
        """
        Get information about supported batch operations.

        Returns:
            Dictionary with supported operation types, their required and
            optional parameters, and a few example operations
        """
        return {
            "supported_operations": {
                "insert_text": {
                    "required": ["index", "text"],
                    "description": "Insert text at specified index",
                },
                "delete_text": {
                    "required": ["start_index", "end_index"],
                    "description": "Delete text in specified range",
                },
                "replace_text": {
                    "required": ["start_index", "end_index", "text"],
                    "description": "Replace text in range with new text",
                },
                "format_text": {
                    "required": ["start_index", "end_index"],
                    "optional": [
                        "bold",
                        "italic",
                        "underline",
                        "font_size",
                        "font_family",
                        "text_color",
                        "background_color",
                        "link_url",
                    ],
                    "description": "Apply formatting to text range",
                },
                "update_paragraph_style": {
                    "required": ["start_index", "end_index"],
                    "optional": [
                        "heading_level",
                        "alignment",
                        "line_spacing",
                        "indent_first_line",
                        "indent_start",
                        "indent_end",
                        "space_above",
                        "space_below",
                        "named_style_type",
                    ],
                    "description": "Apply paragraph-level styling (headings, alignment, spacing, indentation)",
                },
                "insert_table": {
                    "required": ["index", "rows", "columns"],
                    "description": "Insert table at specified index",
                },
                "insert_page_break": {
                    "required": ["index"],
                    "description": "Insert page break at specified index",
                },
                "find_replace": {
                    "required": ["find_text", "replace_text"],
                    "optional": ["match_case"],
                    "description": "Find and replace text throughout document",
                },
                "create_bullet_list": {
                    "required": ["start_index", "end_index"],
                    "optional": [
                        "list_type",
                        "nesting_level",
                        "paragraph_start_indices",
                    ],
                    "description": "Apply or remove native bullet/numbered list formatting (list_type: UNORDERED, ORDERED, or NONE to remove; nesting_level: 0-8)",
                },
                "insert_doc_tab": {
                    "required": ["title", "index"],
                    "description": "Insert a new document tab with given title at specified index",
                },
                "delete_doc_tab": {
                    "required": ["tab_id"],
                    "description": "Delete a document tab by its ID",
                },
                "update_doc_tab": {
                    "required": ["tab_id", "title"],
                    "description": "Rename a document tab",
                },
            },
            "example_operations": [
                {"type": "insert_text", "index": 1, "text": "Hello World"},
                {
                    "type": "format_text",
                    "start_index": 1,
                    "end_index": 12,
                    "bold": True,
                },
                {"type": "insert_table", "index": 20, "rows": 2, "columns": 3},
                {
                    "type": "update_paragraph_style",
                    "start_index": 1,
                    "end_index": 20,
                    "heading_level": 1,
                    "alignment": "CENTER",
                },
            ],
        }
|
||||
339
gdocs/managers/header_footer_manager.py
Normal file
339
gdocs/managers/header_footer_manager.py
Normal file
@@ -0,0 +1,339 @@
|
||||
"""
|
||||
Header Footer Manager
|
||||
|
||||
This module provides high-level operations for managing headers and footers
|
||||
in Google Docs, extracting complex logic from the main tools module.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import asyncio
|
||||
from typing import Any, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HeaderFooterManager:
    """
    High-level manager for Google Docs header and footer operations.

    Handles complex header/footer operations including:
    - Finding and updating existing headers/footers
    - Content replacement with proper range calculation
    - Section type management
    """

    # Header/footer types accepted when updating content. "FIRST_PAGE_ONLY"
    # is the legacy spelling of "FIRST_PAGE".
    _UPDATE_TYPES = ("DEFAULT", "FIRST_PAGE", "FIRST_PAGE_ONLY", "EVEN_PAGE")

    # Prefix of the documentStyle field that holds the section ID for each
    # type, e.g. "default" -> "defaultHeaderId" / "defaultFooterId".
    _STYLE_ID_PREFIXES = {
        "DEFAULT": "default",
        "FIRST_PAGE": "firstPage",
        "FIRST_PAGE_ONLY": "firstPage",
        "EVEN_PAGE": "evenPage",
    }

    # Heuristic substrings looked for in section IDs when the document style
    # does not identify the section.
    _ID_PATTERNS = {
        "DEFAULT": ["default", "kix"],
        "FIRST_PAGE": ["first", "firstpage"],
        "EVEN_PAGE": ["even", "evenpage"],
        "FIRST_PAGE_ONLY": ["first", "firstpage"],  # Legacy support
    }

    def __init__(self, service):
        """
        Initialize the header footer manager.

        Args:
            service: Google Docs API service instance
        """
        self.service = service

    async def update_header_footer_content(
        self,
        document_id: str,
        section_type: str,
        content: str,
        header_footer_type: str = "DEFAULT",
    ) -> tuple[bool, str]:
        """
        Updates header or footer content in a document.

        Only the first paragraph of the located section is replaced. Failures
        are reported through the returned tuple rather than raised.

        Args:
            document_id: ID of the document to update
            section_type: Type of section ("header" or "footer")
            content: New content for the section
            header_footer_type: Type of header/footer ("DEFAULT", "FIRST_PAGE",
                "EVEN_PAGE", or the legacy "FIRST_PAGE_ONLY")

        Returns:
            Tuple of (success, message)
        """
        logger.info(f"Updating {section_type} in document {document_id}")

        # Validate section type
        if section_type not in ["header", "footer"]:
            return False, "section_type must be 'header' or 'footer'"

        # Validate header/footer type
        if header_footer_type not in self._UPDATE_TYPES:
            return (
                False,
                "header_footer_type must be 'DEFAULT', 'FIRST_PAGE', 'FIRST_PAGE_ONLY', or 'EVEN_PAGE'",
            )

        try:
            # Get document structure
            doc = await self._get_document(document_id)

            # Find the target section
            target_section, section_id = await self._find_target_section(
                doc, section_type, header_footer_type
            )

            if not target_section:
                return (
                    False,
                    f"No {section_type} found in document. Please create a {section_type} first in Google Docs.",
                )

            # Update the content; the section ID is passed on so the edit is
            # addressed to the header/footer segment, not the document body.
            success = await self._replace_section_content(
                document_id, target_section, content, section_id
            )

            if success:
                return True, f"Updated {section_type} content in document {document_id}"
            return (
                False,
                f"Could not find content structure in {section_type} to update",
            )

        except Exception as e:
            logger.error(f"Failed to update {section_type}: {str(e)}")
            return False, f"Failed to update {section_type}: {str(e)}"

    async def _get_document(self, document_id: str) -> dict[str, Any]:
        """Get the full document data (blocking call run in a worker thread)."""
        return await asyncio.to_thread(
            self.service.documents().get(documentId=document_id).execute
        )

    async def _find_target_section(
        self, doc: dict[str, Any], section_type: str, header_footer_type: str
    ) -> tuple[Optional[dict[str, Any]], Optional[str]]:
        """
        Find the target header or footer section.

        Lookup order:
        1. a section whose data carries a matching "type" value;
        2. the section ID recorded in the document style for this type
           (e.g. "defaultHeaderId", "firstPageFooterId");
        3. a section whose ID contains one of the heuristic patterns;
        4. the first available section, for backward compatibility.

        Args:
            doc: Document data
            section_type: "header" or "footer"
            header_footer_type: Type of header/footer

        Returns:
            Tuple of (section_data, section_id) or (None, None) if not found
        """
        is_header = section_type == "header"
        sections = doc.get("headers" if is_header else "footers", {})

        # 1. Explicit type information on the section itself
        for section_id, section_data in sections.items():
            if "type" in section_data and section_data["type"] == header_footer_type:
                return section_data, section_id

        # 2. Section ID recorded in the document style
        prefix = self._STYLE_ID_PREFIXES.get(header_footer_type)
        if prefix:
            style_key = f"{prefix}{'Header' if is_header else 'Footer'}Id"
            styled_id = doc.get("documentStyle", {}).get(style_key)
            if styled_id and styled_id in sections:
                return sections[styled_id], styled_id

        # 3. Pattern matching on section ID
        for pattern in self._ID_PATTERNS.get(header_footer_type, []):
            for section_id, section_data in sections.items():
                if pattern.lower() in section_id.lower():
                    return section_data, section_id

        # 4. Fall back to the first available section, if any
        first = next(iter(sections.items()), None)
        if first is None:
            return None, None
        section_id, section_data = first
        return section_data, section_id

    def _build_replace_requests(
        self,
        section: dict[str, Any],
        new_content: str,
        segment_id: Optional[str] = None,
    ) -> list[dict[str, Any]]:
        """
        Build the batchUpdate requests that replace a section's first paragraph.

        Args:
            section: Section data containing content elements
            new_content: New content to insert
            segment_id: ID of the header/footer segment; when omitted the
                section's own "headerId" / "footerId" is used if present

        Returns:
            List of requests (optional delete followed by an insert), or an
            empty list when the section has no paragraph to replace
        """
        content_elements = section.get("content", [])
        if not content_elements:
            return []

        # Find the first paragraph to replace content
        first_para = self._find_first_paragraph(content_elements)
        if not first_para:
            return []

        # Calculate content range (the API omits startIndex when it is 0)
        start_index = first_para.get("startIndex", 0)
        end_index = first_para.get("endIndex", 0)

        if segment_id is None:
            segment_id = section.get("headerId") or section.get("footerId")

        def in_segment(target: dict[str, Any]) -> dict[str, Any]:
            # Without a segmentId the API addresses the document body.
            if segment_id:
                target["segmentId"] = segment_id
            return target

        requests: list[dict[str, Any]] = []

        # Delete existing content if any, keeping the paragraph end marker.
        # An empty paragraph (only the end marker) must not produce a delete:
        # its range would be empty.
        delete_end = end_index - 1
        if delete_end > start_index:
            requests.append(
                {
                    "deleteContentRange": {
                        "range": in_segment(
                            {"startIndex": start_index, "endIndex": delete_end}
                        )
                    }
                }
            )

        # Insert new content
        requests.append(
            {
                "insertText": {
                    "location": in_segment({"index": start_index}),
                    "text": new_content,
                }
            }
        )
        return requests

    async def _replace_section_content(
        self,
        document_id: str,
        section: dict[str, Any],
        new_content: str,
        segment_id: Optional[str] = None,
    ) -> bool:
        """
        Replace the content in a header or footer section.

        Args:
            document_id: Document ID
            section: Section data containing content elements
            new_content: New content to insert
            segment_id: ID of the header/footer being edited; when omitted the
                section's own "headerId" / "footerId" is used if present

        Returns:
            True if successful, False if the section has no paragraph or the
            API call fails
        """
        requests = self._build_replace_requests(section, new_content, segment_id)
        if not requests:
            return False

        try:
            await asyncio.to_thread(
                self.service.documents()
                .batchUpdate(documentId=document_id, body={"requests": requests})
                .execute
            )
            return True

        except Exception as e:
            logger.error(f"Failed to replace section content: {str(e)}")
            return False

    def _find_first_paragraph(
        self, content_elements: list[dict[str, Any]]
    ) -> Optional[dict[str, Any]]:
        """Find the first paragraph element in content."""
        return next(
            (element for element in content_elements if "paragraph" in element), None
        )

    async def get_header_footer_info(self, document_id: str) -> dict[str, Any]:
        """
        Get information about all headers and footers in the document.

        Args:
            document_id: Document ID

        Returns:
            Dictionary with "headers", "footers", "has_headers" and
            "has_footers"; on failure, a dictionary with a single "error" key
        """
        try:
            doc = await self._get_document(document_id)

            headers_info = {
                header_id: self._extract_section_info(header_data)
                for header_id, header_data in doc.get("headers", {}).items()
            }
            footers_info = {
                footer_id: self._extract_section_info(footer_data)
                for footer_id, footer_data in doc.get("footers", {}).items()
            }

            return {
                "headers": headers_info,
                "footers": footers_info,
                "has_headers": bool(headers_info),
                "has_footers": bool(footers_info),
            }

        except Exception as e:
            logger.error(f"Failed to get header/footer info: {str(e)}")
            return {"error": str(e)}

    def _extract_section_info(self, section_data: dict[str, Any]) -> dict[str, Any]:
        """
        Extract useful information from a header/footer section.

        Returns:
            Dictionary with a text preview (first 100 characters, or
            "(empty)"), the element count, and the start/end index of the
            section's content
        """
        content_elements = section_data.get("content", [])

        # Extract text content from every text run of every paragraph
        text_content = "".join(
            para_element["textRun"].get("content", "")
            for element in content_elements
            if "paragraph" in element
            for para_element in element["paragraph"].get("elements", [])
            if "textRun" in para_element
        )

        return {
            "content_preview": text_content[:100] if text_content else "(empty)",
            "element_count": len(content_elements),
            "start_index": content_elements[0].get("startIndex", 0)
            if content_elements
            else 0,
            "end_index": content_elements[-1].get("endIndex", 0)
            if content_elements
            else 0,
        }

    async def create_header_footer(
        self, document_id: str, section_type: str, header_footer_type: str = "DEFAULT"
    ) -> tuple[bool, str]:
        """
        Create a new header or footer section.

        Args:
            document_id: Document ID
            section_type: "header" or "footer"
            header_footer_type: Type of header/footer ("DEFAULT", "FIRST_PAGE", or "EVEN_PAGE")

        Returns:
            Tuple of (success, message)
        """
        if section_type not in ["header", "footer"]:
            return False, "section_type must be 'header' or 'footer'"

        # Map our type names to API type names
        type_mapping = {
            "DEFAULT": "DEFAULT",
            "FIRST_PAGE": "FIRST_PAGE",
            "EVEN_PAGE": "EVEN_PAGE",
            "FIRST_PAGE_ONLY": "FIRST_PAGE",  # Support legacy name
        }

        api_type = type_mapping.get(header_footer_type, header_footer_type)
        if api_type not in ["DEFAULT", "FIRST_PAGE", "EVEN_PAGE"]:
            return (
                False,
                "header_footer_type must be 'DEFAULT', 'FIRST_PAGE', or 'EVEN_PAGE'",
            )

        try:
            # Build the request
            request = {"type": api_type}

            # Create the appropriate request type
            if section_type == "header":
                batch_request = {"createHeader": request}
            else:
                batch_request = {"createFooter": request}

            # Execute the request
            await asyncio.to_thread(
                self.service.documents()
                .batchUpdate(documentId=document_id, body={"requests": [batch_request]})
                .execute
            )

            return True, f"Successfully created {section_type} with type {api_type}"

        except Exception as e:
            error_msg = str(e)
            if "already exists" in error_msg.lower():
                return (
                    False,
                    f"A {section_type} of type {api_type} already exists in the document",
                )
            return False, f"Failed to create {section_type}: {error_msg}"
|
||||
405
gdocs/managers/table_operation_manager.py
Normal file
405
gdocs/managers/table_operation_manager.py
Normal file
@@ -0,0 +1,405 @@
|
||||
"""
|
||||
Table Operation Manager
|
||||
|
||||
This module provides high-level table operations that orchestrate
|
||||
multiple Google Docs API calls for complex table manipulations.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import asyncio
|
||||
from typing import List, Dict, Any, Tuple
|
||||
|
||||
from gdocs.docs_helpers import create_insert_table_request
|
||||
from gdocs.docs_structure import find_tables
|
||||
from gdocs.docs_tables import validate_table_data
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TableOperationManager:
|
||||
"""
|
||||
High-level manager for Google Docs table operations.
|
||||
|
||||
Handles complex multi-step table operations including:
|
||||
- Creating tables with data population
|
||||
- Populating existing tables
|
||||
- Managing cell-by-cell operations with proper index refreshing
|
||||
"""
|
||||
|
||||
def __init__(self, service):
|
||||
"""
|
||||
Initialize the table operation manager.
|
||||
|
||||
Args:
|
||||
service: Google Docs API service instance
|
||||
"""
|
||||
self.service = service
|
||||
|
||||
async def create_and_populate_table(
    self,
    document_id: str,
    table_data: List[List[str]],
    index: int,
    bold_headers: bool = True,
    tab_id: str = None,
) -> Tuple[bool, str, Dict[str, Any]]:
    """
    Insert an empty table, re-read the document, then fill the table's cells.

    The input is validated first; any exception raised by the later steps is
    logged and reported through the returned tuple instead of propagating.

    Args:
        document_id: ID of the document to update
        table_data: 2D list of strings for table content
        index: Position to insert the table
        bold_headers: Whether to make the first row bold
        tab_id: Optional tab ID for targeting a specific tab

    Returns:
        Tuple of (success, message, metadata). metadata is empty on failure;
        on success it holds "rows", "columns", "populated_cells" and
        "table_index" (the last table found after insertion).
    """
    row_total = len(table_data)
    first_row_width = len(table_data[0]) if table_data and len(table_data) > 0 else 0
    logger.debug(
        f"Creating table at index {index}, dimensions: {row_total}x{first_row_width}"
    )

    # Reject malformed input before touching the document
    is_valid, error_msg = validate_table_data(table_data)
    if not is_valid:
        return False, f"Invalid table data: {error_msg}", {}

    rows, cols = len(table_data), len(table_data[0])

    try:
        # Step 1: insert the empty grid
        await self._create_empty_table(document_id, index, rows, cols, tab_id)

        # Step 2: re-read the document so the new table can be located
        fresh_tables = await self._get_document_tables(document_id, tab_id)
        if not fresh_tables:
            return False, "Could not find table after creation", {}

        # Step 3: fill the cells
        population_count = await self._populate_table_cells(
            document_id, table_data, bold_headers, tab_id
        )

        message = f"Successfully created {rows}x{cols} table and populated {population_count} cells"
        metadata = {
            "rows": rows,
            "columns": cols,
            "populated_cells": population_count,
            "table_index": len(fresh_tables) - 1,
        }
        return True, message, metadata

    except Exception as e:
        logger.error(f"Failed to create and populate table: {str(e)}")
        return False, f"Table creation failed: {str(e)}", {}
|
||||
|
||||
async def _create_empty_table(
    self, document_id: str, index: int, rows: int, cols: int, tab_id: str = None
) -> None:
    """
    Insert an empty ``rows`` x ``cols`` table into the document.

    A single ``batchUpdate`` carrying the request built by
    ``create_insert_table_request`` is sent. The blocking ``execute`` call
    is pushed to a worker thread so the event loop stays responsive.

    Args:
        document_id: ID of the document to update
        index: Position at which the table is inserted
        rows: Number of table rows
        cols: Number of table columns
        tab_id: Optional tab ID, forwarded to the request builder
    """
    logger.debug(f"Creating {rows}x{cols} table at index {index}")

    documents = self.service.documents()
    insert_table = create_insert_table_request(index, rows, cols, tab_id)
    pending_update = documents.batchUpdate(
        documentId=document_id,
        body={"requests": [insert_table]},
    )
    # Only the network round trip runs in the worker thread.
    await asyncio.to_thread(pending_update.execute)
|
||||
|
||||
async def _get_document_tables(
    self, document_id: str, tab_id: str = None
) -> List[Dict[str, Any]]:
    """
    Fetch the document again and return the tables ``find_tables`` reports.

    When ``tab_id`` is given and a matching tab with a ``documentTab`` entry
    exists, a shallow copy of the document is made whose ``body`` is that
    tab's body, so the tables of that tab are the ones returned. If the tab
    is not found the document is passed to ``find_tables`` unchanged.

    Args:
        document_id: ID of the document to read
        tab_id: Optional tab ID to restrict the lookup to

    Returns:
        Whatever ``find_tables`` returns for the (possibly re-rooted) document
    """
    # NOTE(review): the document is always requested with
    # includeTabsContent=True; confirm find_tables still locates the body
    # when no tab_id is supplied.
    fetch = self.service.documents().get(
        documentId=document_id, includeTabsContent=True
    )
    document = await asyncio.to_thread(fetch.execute)

    if not tab_id:
        return find_tables(document)

    matched_tab = self._find_tab(document.get("tabs", []), tab_id)
    if matched_tab and "documentTab" in matched_tab:
        # Copy first so the fetched response dict itself is left untouched.
        document = document.copy()
        document["body"] = matched_tab["documentTab"].get("body", {})

    return find_tables(document)
|
||||
|
||||
@staticmethod
|
||||
def _find_tab(tabs: list, target_id: str):
|
||||
"""Recursively find a tab by ID."""
|
||||
for tab in tabs:
|
||||
if tab.get("tabProperties", {}).get("tabId") == target_id:
|
||||
return tab
|
||||
if "childTabs" in tab:
|
||||
found = TableOperationManager._find_tab(tab["childTabs"], target_id)
|
||||
if found:
|
||||
return found
|
||||
return None
|
||||
|
||||
async def _populate_table_cells(
    self,
    document_id: str,
    table_data: List[List[str]],
    bold_headers: bool,
    tab_id: str = None,
) -> int:
    """
    Write every non-empty value of ``table_data`` into its table cell.

    Cells are filled one at a time through ``_populate_single_cell``, which
    re-reads the document before each insertion so that indices shifted by
    earlier insertions are never reused. A failure on one cell is logged and
    does not stop the remaining cells from being processed.

    Args:
        document_id: ID of the document containing the table
        table_data: 2D list of cell texts
        bold_headers: Whether cells of the first row are made bold
        tab_id: Optional tab ID, forwarded to ``_populate_single_cell``

    Returns:
        Number of cells that were populated successfully
    """
    filled = 0

    for r, row_values in enumerate(table_data):
        logger.debug(f"Processing row {r}: {len(row_values)} cells")
        # Bold applies to the header row only.
        make_bold = bold_headers and r == 0

        for c, text in enumerate(row_values):
            if text:
                try:
                    # The helper refreshes the document structure itself.
                    cell_ok = await self._populate_single_cell(
                        document_id, r, c, text, make_bold, tab_id
                    )

                    if not cell_ok:
                        logger.warning(f"Failed to populate cell ({r},{c})")
                    else:
                        filled += 1
                        logger.debug(f"Populated cell ({r},{c})")

                except Exception as exc:
                    logger.error(f"Error populating cell ({r},{c}): {str(exc)}")

    return filled
|
||||
|
||||
async def _populate_single_cell(
    self,
    document_id: str,
    row_idx: int,
    col_idx: int,
    cell_text: str,
    apply_bold: bool = False,
    tab_id: str = None,
) -> bool:
    """
    Populate a single cell with text, with optional bold formatting.

    The document is re-read first so the insertion index reflects all
    earlier insertions. When ``tab_id`` is given, both the text insertion
    and the bold range are addressed to that tab; without it the requests
    carry no ``tabId``, exactly as before.

    Args:
        document_id: ID of the document containing the table
        row_idx: Zero-based row of the target cell
        col_idx: Zero-based column of the target cell
        cell_text: Text to insert
        apply_bold: Whether the inserted text is made bold afterwards
        tab_id: Optional tab ID the table lives in

    Returns:
        True if successful, False otherwise (errors are logged, not raised).
    """
    try:
        # Get fresh table structure to avoid index shifting issues
        tables = await self._get_document_tables(document_id, tab_id)
        if not tables:
            return False

        # NOTE(review): assumes the newly created table is the last one
        # reported - confirm for tables inserted before existing ones.
        table = tables[-1]
        cells = table.get("cells", [])

        # Bounds checking
        if row_idx >= len(cells) or col_idx >= len(cells[row_idx]):
            logger.error(f"Cell ({row_idx},{col_idx}) out of bounds")
            return False

        insertion_index = cells[row_idx][col_idx].get("insertion_index")
        if not insertion_index:
            logger.warning(f"No insertion_index for cell ({row_idx},{col_idx})")
            return False

        # The index was read from the requested tab, so the write must
        # target the same tab; otherwise it would land in the first tab.
        location = {"index": insertion_index}
        if tab_id:
            location["tabId"] = tab_id

        await asyncio.to_thread(
            self.service.documents()
            .batchUpdate(
                documentId=document_id,
                body={
                    "requests": [
                        {"insertText": {"location": location, "text": cell_text}}
                    ]
                },
            )
            .execute
        )

        # Apply bold formatting if requested
        if apply_bold:
            # Document indices count UTF-16 code units, not code points, so
            # characters outside the BMP occupy two index positions.
            utf16_units = len(cell_text.encode("utf-16-le", "surrogatepass")) // 2
            await self._apply_bold_formatting(
                document_id,
                insertion_index,
                insertion_index + utf16_units,
                tab_id,
            )

        return True

    except Exception as e:
        logger.error(f"Failed to populate single cell: {str(e)}")
        return False


async def _apply_bold_formatting(
    self, document_id: str, start_index: int, end_index: int, tab_id: str = None
) -> None:
    """
    Apply bold formatting to a text range.

    Args:
        document_id: ID of the document to update
        start_index: Start of the range (inclusive)
        end_index: End of the range (exclusive)
        tab_id: Optional tab ID the range belongs to; when omitted the
            request carries no ``tabId``
    """
    text_range = {"startIndex": start_index, "endIndex": end_index}
    if tab_id:
        text_range["tabId"] = tab_id

    await asyncio.to_thread(
        self.service.documents()
        .batchUpdate(
            documentId=document_id,
            body={
                "requests": [
                    {
                        "updateTextStyle": {
                            "range": text_range,
                            "textStyle": {"bold": True},
                            # Field mask: touch only the bold attribute.
                            "fields": "bold",
                        }
                    }
                ]
            },
        )
        .execute
    )
|
||||
|
||||
async def populate_existing_table(
    self,
    document_id: str,
    table_index: int,
    table_data: List[List[str]],
    clear_existing: bool = False,
) -> Tuple[bool, str, Dict[str, Any]]:
    """
    Populate an existing table with data.

    Args:
        document_id: ID of the document
        table_index: Index of the table to populate (0-based)
        table_data: 2D list of data to insert
        clear_existing: Whether to clear existing content first.
            NOTE: this flag is currently not acted upon; new text is
            appended to whatever the cells already contain.

    Returns:
        Tuple of (success, message, metadata)
    """
    try:
        tables = await self._get_document_tables(document_id)
        # Reject indices outside the list in either direction so an
        # out-of-range negative index yields the same clear message instead
        # of an IndexError.
        if not -len(tables) <= table_index < len(tables):
            return (
                False,
                f"Table index {table_index} not found. Document has {len(tables)} tables",
                {},
            )

        table_info = tables[table_index]

        # Validate dimensions
        table_rows = table_info["rows"]
        table_cols = table_info["columns"]
        data_rows = len(table_data)
        # Use the widest row: checking only the first row would let wider
        # later rows pass validation and then be dropped silently.
        data_cols = max((len(row) for row in table_data), default=0)

        if data_rows > table_rows or data_cols > table_cols:
            return (
                False,
                f"Data ({data_rows}x{data_cols}) exceeds table dimensions ({table_rows}x{table_cols})",
                {},
            )

        # Populate cells
        population_count = await self._populate_existing_table_cells(
            document_id, table_index, table_data
        )

        metadata = {
            "table_index": table_index,
            "populated_cells": population_count,
            "table_dimensions": f"{table_rows}x{table_cols}",
            "data_dimensions": f"{data_rows}x{data_cols}",
        }

        return (
            True,
            f"Successfully populated {population_count} cells in existing table",
            metadata,
        )

    except Exception as e:
        # Log before converting to a failure tuple so the cause is not lost.
        logger.error(f"Failed to populate existing table: {str(e)}")
        return False, f"Failed to populate existing table: {str(e)}", {}
|
||||
|
||||
async def _populate_existing_table_cells(
    self, document_id: str, table_index: int, table_data: List[List[str]]
) -> int:
    """
    Populate cells in an existing table.

    Empty values are skipped. For every other value the document is re-read
    so the cell's end index is current, and the text is inserted just before
    the cell end, i.e. appended to any existing content. A failed insertion
    is logged and the remaining cells are still processed.

    Args:
        document_id: ID of the document
        table_index: Index of the table to populate
        table_data: 2D list of data to insert

    Returns:
        Number of cells that received text
    """
    population_count = 0

    for row_idx, row_data in enumerate(table_data):
        for col_idx, cell_text in enumerate(row_data):
            if not cell_text:
                continue

            # Get fresh table structure for each cell
            tables = await self._get_document_tables(document_id)
            if table_index >= len(tables):
                # The table is gone: stop entirely rather than re-fetching
                # the document once more for every remaining row.
                logger.warning(
                    f"Table index {table_index} no longer present; stopping population"
                )
                return population_count

            cells = tables[table_index].get("cells", [])

            # Data outside the table's actual grid is skipped.
            if row_idx >= len(cells) or col_idx >= len(cells[row_idx]):
                continue

            cell = cells[row_idx][col_idx]

            # For existing tables, append to existing content
            cell_end = cell["end_index"] - 1  # Don't include cell end marker

            try:
                await asyncio.to_thread(
                    self.service.documents()
                    .batchUpdate(
                        documentId=document_id,
                        body={
                            "requests": [
                                {
                                    "insertText": {
                                        "location": {"index": cell_end},
                                        "text": cell_text,
                                    }
                                }
                            ]
                        },
                    )
                    .execute
                )
                population_count += 1

            except Exception as e:
                logger.error(
                    f"Failed to populate existing cell ({row_idx},{col_idx}): {str(e)}"
                )

    return population_count
|
||||
727
gdocs/managers/validation_manager.py
Normal file
727
gdocs/managers/validation_manager.py
Normal file
@@ -0,0 +1,727 @@
|
||||
"""
|
||||
Validation Manager
|
||||
|
||||
This module provides centralized validation logic for Google Docs operations,
|
||||
extracting validation patterns from individual tool functions.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, Any, List, Tuple, Optional
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from gdocs.docs_helpers import validate_operation
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ValidationManager:
    """
    Centralized validation manager for Google Docs operations.

    Provides consistent validation patterns and error messages across
    all document operations, reducing code duplication and improving
    error message quality.

    Every ``validate_*`` method reports its verdict as an
    ``(is_valid, message)`` tuple rather than raising.
    """

    def __init__(self):
        """Initialize the validation manager."""
        self.validation_rules = self._setup_validation_rules()

    @staticmethod
    def _is_int(value: Any) -> bool:
        """Return True for integers, excluding bool (a subclass of int)."""
        return isinstance(value, int) and not isinstance(value, bool)

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values, excluding bool."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def _setup_validation_rules(self) -> Dict[str, Any]:
        """Setup validation rules and constraints."""
        return {
            "table_max_rows": 1000,
            "table_max_columns": 20,
            # NOTE: defined for reference; validate_document_id does not
            # currently enforce this pattern.
            "document_id_pattern": r"^[a-zA-Z0-9-_]+$",
            "max_text_length": 1000000,  # maximum characters per text value
            "font_size_range": (1, 400),  # inclusive bounds, in points
            "valid_header_footer_types": ["DEFAULT", "FIRST_PAGE_ONLY", "EVEN_PAGE"],
            "valid_section_types": ["header", "footer"],
            "valid_list_types": ["UNORDERED", "ORDERED"],
            "valid_element_types": ["table", "list", "page_break"],
            "valid_alignments": ["START", "CENTER", "END", "JUSTIFIED"],
            "heading_level_range": (0, 6),  # inclusive bounds
        }

    def validate_document_id(self, document_id: str) -> Tuple[bool, str]:
        """
        Validate Google Docs document ID format.

        Args:
            document_id: Document ID to validate

        Returns:
            Tuple of (is_valid, error_message)
        """
        if not document_id:
            return False, "Document ID cannot be empty"

        if not isinstance(document_id, str):
            return (
                False,
                f"Document ID must be a string, got {type(document_id).__name__}",
            )

        # Basic length sanity check; shorter values are rejected outright.
        if len(document_id) < 20:
            return False, "Document ID appears too short to be valid"

        return True, ""

    def validate_table_data(self, table_data: List[List[str]]) -> Tuple[bool, str]:
        """
        Comprehensive validation for table data format.

        Checks, in order: non-empty, is a list, every row is a list, no
        empty rows, equal row lengths, dimension limits, and that every cell
        is a string.

        Args:
            table_data: 2D array of data to validate

        Returns:
            Tuple of (is_valid, detailed_error_message). On success the
            message describes the validated dimensions.
        """
        if not table_data:
            return (
                False,
                "Table data cannot be empty. Required format: [['col1', 'col2'], ['row1col1', 'row1col2']]",
            )

        if not isinstance(table_data, list):
            return (
                False,
                f"Table data must be a list, got {type(table_data).__name__}. Required format: [['col1', 'col2'], ['row1col1', 'row1col2']]",
            )

        # Check if it's a 2D list
        non_list_rows = [
            i for i, row in enumerate(table_data) if not isinstance(row, list)
        ]
        if non_list_rows:
            return (
                False,
                f"All rows must be lists. Rows {non_list_rows} are not lists. Required format: [['col1', 'col2'], ['row1col1', 'row1col2']]",
            )

        # Check for empty rows
        empty_rows = [i for i, row in enumerate(table_data) if not row]
        if empty_rows:
            return (
                False,
                f"Rows cannot be empty. Empty rows found at indices: {empty_rows}",
            )

        # Check column consistency
        col_counts = [len(row) for row in table_data]
        if len(set(col_counts)) > 1:
            return (
                False,
                f"All rows must have the same number of columns. Found column counts: {col_counts}. Fix your data structure.",
            )

        rows = len(table_data)
        cols = col_counts[0]

        # Check dimension limits
        if rows > self.validation_rules["table_max_rows"]:
            return (
                False,
                f"Too many rows ({rows}). Maximum allowed: {self.validation_rules['table_max_rows']}",
            )

        if cols > self.validation_rules["table_max_columns"]:
            return (
                False,
                f"Too many columns ({cols}). Maximum allowed: {self.validation_rules['table_max_columns']}",
            )

        # Check cell content types
        for row_idx, row in enumerate(table_data):
            for col_idx, cell in enumerate(row):
                if cell is None:
                    return (
                        False,
                        f"Cell ({row_idx},{col_idx}) is None. All cells must be strings, use empty string '' for empty cells.",
                    )

                if not isinstance(cell, str):
                    return (
                        False,
                        f"Cell ({row_idx},{col_idx}) is {type(cell).__name__}, not string. All cells must be strings. Value: {repr(cell)}",
                    )

        return True, f"Valid table data: {rows}×{cols} table format"

    def validate_text_formatting_params(
        self,
        bold: Optional[bool] = None,
        italic: Optional[bool] = None,
        underline: Optional[bool] = None,
        font_size: Optional[int] = None,
        font_family: Optional[str] = None,
        text_color: Optional[str] = None,
        background_color: Optional[str] = None,
        link_url: Optional[str] = None,
    ) -> Tuple[bool, str]:
        """
        Validate text formatting parameters.

        Args:
            bold: Bold setting
            italic: Italic setting
            underline: Underline setting
            font_size: Font size in points (bool values are rejected)
            font_family: Font family name
            text_color: Text color in "#RRGGBB" format
            background_color: Background color in "#RRGGBB" format
            link_url: Hyperlink URL (http/https)

        Returns:
            Tuple of (is_valid, error_message)
        """
        # Check if at least one formatting option is provided
        formatting_params = [
            bold,
            italic,
            underline,
            font_size,
            font_family,
            text_color,
            background_color,
            link_url,
        ]
        if all(param is None for param in formatting_params):
            return (
                False,
                "At least one formatting parameter must be provided (bold, italic, underline, font_size, font_family, text_color, background_color, or link_url)",
            )

        # Validate boolean parameters
        for param, name in [
            (bold, "bold"),
            (italic, "italic"),
            (underline, "underline"),
        ]:
            if param is not None and not isinstance(param, bool):
                return (
                    False,
                    f"{name} parameter must be boolean (True/False), got {type(param).__name__}",
                )

        # Validate font size
        if font_size is not None:
            # bool subclasses int, so True/False must be excluded explicitly.
            if not self._is_int(font_size):
                return (
                    False,
                    f"font_size must be an integer, got {type(font_size).__name__}",
                )

            min_size, max_size = self.validation_rules["font_size_range"]
            if not (min_size <= font_size <= max_size):
                return (
                    False,
                    f"font_size must be between {min_size} and {max_size} points, got {font_size}",
                )

        # Validate font family
        if font_family is not None:
            if not isinstance(font_family, str):
                return (
                    False,
                    f"font_family must be a string, got {type(font_family).__name__}",
                )

            if not font_family.strip():
                return False, "font_family cannot be empty"

        # Validate colors
        is_valid, error_msg = self.validate_color_param(text_color, "text_color")
        if not is_valid:
            return False, error_msg

        is_valid, error_msg = self.validate_color_param(
            background_color, "background_color"
        )
        if not is_valid:
            return False, error_msg

        is_valid, error_msg = self.validate_link_url(link_url)
        if not is_valid:
            return False, error_msg

        return True, ""

    def validate_link_url(self, link_url: Optional[str]) -> Tuple[bool, str]:
        """
        Validate hyperlink URL parameters.

        ``None`` is accepted (no link requested). Otherwise the value must
        be a non-blank string with an http/https scheme and a host.

        Args:
            link_url: URL to validate, or None

        Returns:
            Tuple of (is_valid, error_message)
        """
        if link_url is None:
            return True, ""

        if not isinstance(link_url, str):
            return False, f"link_url must be a string, got {type(link_url).__name__}"

        if not link_url.strip():
            return False, "link_url cannot be empty"

        try:
            parsed = urlparse(link_url)
        except ValueError as exc:
            # urlparse raises for malformed hosts such as an unbalanced
            # '[' in the netloc; report that instead of propagating.
            return False, f"link_url is not a valid URL: {exc}"

        if parsed.scheme not in ("http", "https"):
            return False, "link_url must start with http:// or https://"

        if not parsed.netloc:
            return False, "link_url must include a valid host"

        return True, ""

    def validate_paragraph_style_params(
        self,
        heading_level: Optional[int] = None,
        alignment: Optional[str] = None,
        line_spacing: Optional[float] = None,
        indent_first_line: Optional[float] = None,
        indent_start: Optional[float] = None,
        indent_end: Optional[float] = None,
        space_above: Optional[float] = None,
        space_below: Optional[float] = None,
        named_style_type: Optional[str] = None,
    ) -> Tuple[bool, str]:
        """
        Validate paragraph style parameters.

        Args:
            heading_level: Heading level 0-6 (0 = NORMAL_TEXT, 1-6 = HEADING_N)
            alignment: Text alignment - 'START', 'CENTER', 'END', or 'JUSTIFIED'
                (compared case-insensitively)
            line_spacing: Line spacing multiplier (must be positive)
            indent_first_line: First line indent in points (may be negative)
            indent_start: Left/start indent in points
            indent_end: Right/end indent in points
            space_above: Space above paragraph in points
            space_below: Space below paragraph in points
            named_style_type: Direct named style (TITLE, SUBTITLE, HEADING_1..6, NORMAL_TEXT)

        Returns:
            Tuple of (is_valid, error_message)
        """
        style_params = [
            heading_level,
            alignment,
            line_spacing,
            indent_first_line,
            indent_start,
            indent_end,
            space_above,
            space_below,
            named_style_type,
        ]
        if all(param is None for param in style_params):
            return (
                False,
                "At least one paragraph style parameter must be provided (heading_level, alignment, line_spacing, indent_first_line, indent_start, indent_end, space_above, space_below, or named_style_type)",
            )

        if heading_level is not None and named_style_type is not None:
            return (
                False,
                "heading_level and named_style_type are mutually exclusive; provide only one",
            )

        if named_style_type is not None:
            valid_styles = [
                "NORMAL_TEXT",
                "TITLE",
                "SUBTITLE",
                "HEADING_1",
                "HEADING_2",
                "HEADING_3",
                "HEADING_4",
                "HEADING_5",
                "HEADING_6",
            ]
            if named_style_type not in valid_styles:
                return (
                    False,
                    f"Invalid named_style_type '{named_style_type}'. Must be one of: {', '.join(valid_styles)}",
                )

        if heading_level is not None:
            if not self._is_int(heading_level):
                return (
                    False,
                    f"heading_level must be an integer, got {type(heading_level).__name__}",
                )
            min_level, max_level = self.validation_rules["heading_level_range"]
            if not (min_level <= heading_level <= max_level):
                return (
                    False,
                    f"heading_level must be between {min_level} and {max_level}, got {heading_level}",
                )

        if alignment is not None:
            if not isinstance(alignment, str):
                return (
                    False,
                    f"alignment must be a string, got {type(alignment).__name__}",
                )
            valid = self.validation_rules["valid_alignments"]
            if alignment.upper() not in valid:
                return (
                    False,
                    f"alignment must be one of: {', '.join(valid)}, got '{alignment}'",
                )

        if line_spacing is not None:
            if not self._is_number(line_spacing):
                return (
                    False,
                    f"line_spacing must be a number, got {type(line_spacing).__name__}",
                )
            if line_spacing <= 0:
                return False, "line_spacing must be positive"

        for param, name in [
            (indent_first_line, "indent_first_line"),
            (indent_start, "indent_start"),
            (indent_end, "indent_end"),
            (space_above, "space_above"),
            (space_below, "space_below"),
        ]:
            if param is None:
                continue
            if not self._is_number(param):
                return (
                    False,
                    f"{name} must be a number, got {type(param).__name__}",
                )
            # indent_first_line may be negative (hanging indent)
            if name != "indent_first_line" and param < 0:
                return False, f"{name} must be non-negative, got {param}"

        return True, ""

    def validate_color_param(
        self, color: Optional[str], param_name: str
    ) -> Tuple[bool, str]:
        """
        Validate color parameters (hex string "#RRGGBB").

        Args:
            color: Color value to validate, or None (accepted)
            param_name: Parameter name used in the error message

        Returns:
            Tuple of (is_valid, error_message)
        """
        if color is None:
            return True, ""

        if not isinstance(color, str):
            return False, f"{param_name} must be a hex string like '#RRGGBB'"

        if len(color) != 7 or not color.startswith("#"):
            return False, f"{param_name} must be a hex string like '#RRGGBB'"

        hex_color = color[1:]
        if any(c not in "0123456789abcdefABCDEF" for c in hex_color):
            return False, f"{param_name} must be a hex string like '#RRGGBB'"

        return True, ""

    def validate_index(self, index: int, context: str = "Index") -> Tuple[bool, str]:
        """
        Validate a single document index.

        Args:
            index: Index to validate (must be a non-negative integer; bool
                values are rejected)
            context: Context description for error messages

        Returns:
            Tuple of (is_valid, error_message)
        """
        if not self._is_int(index):
            return False, f"{context} must be an integer, got {type(index).__name__}"

        if index < 0:
            return (
                False,
                f"{context} {index} is negative. You MUST call inspect_doc_structure first to get the proper insertion index.",
            )

        return True, ""

    def validate_index_range(
        self,
        start_index: int,
        end_index: Optional[int] = None,
        document_length: Optional[int] = None,
    ) -> Tuple[bool, str]:
        """
        Validate document index ranges.

        Args:
            start_index: Starting index
            end_index: Ending index (optional); must be greater than start_index
            document_length: Total document length for bounds checking

        Returns:
            Tuple of (is_valid, error_message)
        """
        # Validate start_index
        if not self._is_int(start_index):
            return (
                False,
                f"start_index must be an integer, got {type(start_index).__name__}",
            )

        if start_index < 0:
            return False, f"start_index cannot be negative, got {start_index}"

        # Validate end_index if provided
        if end_index is not None:
            if not self._is_int(end_index):
                return (
                    False,
                    f"end_index must be an integer, got {type(end_index).__name__}",
                )

            if end_index <= start_index:
                return (
                    False,
                    f"end_index ({end_index}) must be greater than start_index ({start_index})",
                )

        # Validate against document length if provided
        if document_length is not None:
            if start_index >= document_length:
                return (
                    False,
                    f"start_index ({start_index}) exceeds document length ({document_length})",
                )

            if end_index is not None and end_index > document_length:
                return (
                    False,
                    f"end_index ({end_index}) exceeds document length ({document_length})",
                )

        return True, ""

    def validate_element_insertion_params(
        self, element_type: str, index: int, **kwargs
    ) -> Tuple[bool, str]:
        """
        Validate parameters for element insertion.

        Args:
            element_type: Type of element to insert
            index: Insertion index
            **kwargs: Additional parameters specific to element type
                ('rows' and 'columns' for tables, 'list_type' for lists)

        Returns:
            Tuple of (is_valid, error_message)
        """
        # Validate element type
        if element_type not in self.validation_rules["valid_element_types"]:
            valid_types = ", ".join(self.validation_rules["valid_element_types"])
            return (
                False,
                f"Invalid element_type '{element_type}'. Must be one of: {valid_types}",
            )

        # Validate index
        if not self._is_int(index) or index < 0:
            return False, f"index must be a non-negative integer, got {index}"

        # Validate element-specific parameters
        if element_type == "table":
            rows = kwargs.get("rows")
            columns = kwargs.get("columns")

            # Only a missing value counts as "not provided"; zero falls
            # through to the positive-integer check below.
            if rows is None or columns is None:
                return False, "Table insertion requires 'rows' and 'columns' parameters"

            if not self._is_int(rows) or not self._is_int(columns):
                return False, "Table rows and columns must be integers"

            if rows <= 0 or columns <= 0:
                return False, "Table rows and columns must be positive integers"

            if rows > self.validation_rules["table_max_rows"]:
                return (
                    False,
                    f"Too many rows ({rows}). Maximum: {self.validation_rules['table_max_rows']}",
                )

            if columns > self.validation_rules["table_max_columns"]:
                return (
                    False,
                    f"Too many columns ({columns}). Maximum: {self.validation_rules['table_max_columns']}",
                )

        elif element_type == "list":
            list_type = kwargs.get("list_type")

            if not list_type:
                return False, "List insertion requires 'list_type' parameter"

            if list_type not in self.validation_rules["valid_list_types"]:
                valid_types = ", ".join(self.validation_rules["valid_list_types"])
                return (
                    False,
                    f"Invalid list_type '{list_type}'. Must be one of: {valid_types}",
                )

        return True, ""

    def validate_header_footer_params(
        self, section_type: str, header_footer_type: str = "DEFAULT"
    ) -> Tuple[bool, str]:
        """
        Validate header/footer operation parameters.

        Args:
            section_type: Type of section ("header" or "footer")
            header_footer_type: Specific header/footer type

        Returns:
            Tuple of (is_valid, error_message)
        """
        if section_type not in self.validation_rules["valid_section_types"]:
            valid_types = ", ".join(self.validation_rules["valid_section_types"])
            return (
                False,
                f"section_type must be one of: {valid_types}, got '{section_type}'",
            )

        if header_footer_type not in self.validation_rules["valid_header_footer_types"]:
            valid_types = ", ".join(self.validation_rules["valid_header_footer_types"])
            return (
                False,
                f"header_footer_type must be one of: {valid_types}, got '{header_footer_type}'",
            )

        return True, ""

    def validate_batch_operations(
        self, operations: List[Dict[str, Any]]
    ) -> Tuple[bool, str]:
        """
        Validate a list of batch operations.

        Each operation must be a dict with a 'type' key and must pass
        ``validate_operation``. 'format_text' and 'update_paragraph_style'
        operations additionally have their style parameters and index range
        checked. The first failure is reported with its 1-based position.

        Args:
            operations: List of operation dictionaries

        Returns:
            Tuple of (is_valid, error_message)
        """
        if not operations:
            return False, "Operations list cannot be empty"

        if not isinstance(operations, list):
            return False, f"Operations must be a list, got {type(operations).__name__}"

        # Validate each operation
        for i, op in enumerate(operations):
            if not isinstance(op, dict):
                return (
                    False,
                    f"Operation {i + 1} must be a dictionary, got {type(op).__name__}",
                )

            if "type" not in op:
                return False, f"Operation {i + 1} missing required 'type' field"

            # Validate required fields for the operation type
            is_valid, error_msg = validate_operation(op)
            if not is_valid:
                return False, f"Operation {i + 1}: {error_msg}"

            op_type = op["type"]

            if op_type == "format_text":
                is_valid, error_msg = self.validate_text_formatting_params(
                    op.get("bold"),
                    op.get("italic"),
                    op.get("underline"),
                    op.get("font_size"),
                    op.get("font_family"),
                    op.get("text_color"),
                    op.get("background_color"),
                    op.get("link_url"),
                )
                if not is_valid:
                    return False, f"Operation {i + 1} (format_text): {error_msg}"

                # presumably validate_operation has already ensured these
                # keys exist; verify against gdocs.docs_helpers
                is_valid, error_msg = self.validate_index_range(
                    op["start_index"], op["end_index"]
                )
                if not is_valid:
                    return False, f"Operation {i + 1} (format_text): {error_msg}"

            elif op_type == "update_paragraph_style":
                is_valid, error_msg = self.validate_paragraph_style_params(
                    op.get("heading_level"),
                    op.get("alignment"),
                    op.get("line_spacing"),
                    op.get("indent_first_line"),
                    op.get("indent_start"),
                    op.get("indent_end"),
                    op.get("space_above"),
                    op.get("space_below"),
                    op.get("named_style_type"),
                )
                if not is_valid:
                    return (
                        False,
                        f"Operation {i + 1} (update_paragraph_style): {error_msg}",
                    )

                is_valid, error_msg = self.validate_index_range(
                    op["start_index"], op["end_index"]
                )
                if not is_valid:
                    return (
                        False,
                        f"Operation {i + 1} (update_paragraph_style): {error_msg}",
                    )

        return True, ""

    def validate_text_content(
        self, text: str, max_length: Optional[int] = None
    ) -> Tuple[bool, str]:
        """
        Validate text content for insertion.

        Args:
            text: Text to validate
            max_length: Maximum allowed length; None selects the configured
                ``max_text_length`` rule. An explicit 0 is honoured.

        Returns:
            Tuple of (is_valid, error_message)
        """
        if not isinstance(text, str):
            return False, f"Text must be a string, got {type(text).__name__}"

        # Compare against None so an explicit limit of 0 is not mistaken
        # for "no limit given".
        if max_length is None:
            max_len = self.validation_rules["max_text_length"]
        else:
            max_len = max_length

        if len(text) > max_len:
            return False, f"Text too long ({len(text)} characters). Maximum: {max_len}"

        return True, ""

    def get_validation_summary(self) -> Dict[str, Any]:
        """
        Get a summary of all validation rules and constraints.

        Returns:
            Dictionary containing validation rules. List-valued rules are
            copied so callers cannot alter the manager's own rules.
        """
        constraints = {
            key: list(value) if isinstance(value, list) else value
            for key, value in self.validation_rules.items()
        }
        return {
            "constraints": constraints,
            "supported_operations": {
                "table_operations": ["create_table", "populate_table"],
                "text_operations": [
                    "insert_text",
                    "format_text",
                    "find_replace",
                    "update_paragraph_style",
                ],
                "element_operations": [
                    "insert_table",
                    "insert_list",
                    "insert_page_break",
                ],
                "header_footer_operations": ["update_header", "update_footer"],
            },
            "data_formats": {
                "table_data": "2D list of strings: [['col1', 'col2'], ['row1col1', 'row1col2']]",
                "text_formatting": "Optional boolean/integer parameters for styling",
                "document_indices": "Non-negative integers for position specification",
            },
        }
|
||||
Reference in New Issue
Block a user