Merge branch 'main' of https://github.com/taylorwilsdon/google_workspace_mcp into fix/email-threading-headers

This commit is contained in:
Taylor Wilsdon
2026-03-15 18:19:09 -04:00
16 changed files with 1575 additions and 1052 deletions

View File

@@ -13,7 +13,7 @@ RUN pip install --no-cache-dir uv
COPY . . COPY . .
# Install Python dependencies using uv sync # Install Python dependencies using uv sync
RUN uv sync --frozen --no-dev RUN uv sync --frozen --no-dev --extra disk
# Create non-root user for security # Create non-root user for security
RUN useradd --create-home --shell /bin/bash app \ RUN useradd --create-home --shell /bin/bash app \

View File

@@ -10,10 +10,9 @@
[![PyPI Downloads](https://static.pepy.tech/personalized-badge/workspace-mcp?period=total&units=INTERNATIONAL_SYSTEM&left_color=BLACK&right_color=BLUE&left_text=downloads)](https://pepy.tech/projects/workspace-mcp) [![PyPI Downloads](https://static.pepy.tech/personalized-badge/workspace-mcp?period=total&units=INTERNATIONAL_SYSTEM&left_color=BLACK&right_color=BLUE&left_text=downloads)](https://pepy.tech/projects/workspace-mcp)
[![Website](https://img.shields.io/badge/Website-workspacemcp.com-green.svg)](https://workspacemcp.com) [![Website](https://img.shields.io/badge/Website-workspacemcp.com-green.svg)](https://workspacemcp.com)
*Full natural language control over Google Calendar, Drive, Gmail, Docs, Sheets, Slides, Forms, Tasks, Contacts, and Chat through all MCP clients, AI assistants and developer tools. Now includes a full featured CLI for use with tools like Claude Code and Codex!* *Full natural language control over Google Calendar, Drive, Gmail, Docs, Sheets, Slides, Forms, Tasks, Contacts, and Chat through all MCP clients, AI assistants and developer tools. Includes a full featured CLI for use with tools like Claude Code and Codex!*
**The most feature-complete Google Workspace MCP server**, with Remote OAuth2.1 multi-user support and 1-click Claude installation.
**The most feature-complete Google Workspace MCP server**, with Remote OAuth2.1 multi-user support and 1-click Claude installation. With native OAuth 2.1, stateless mode and external auth server support, it's the only Workspace MCP you can host for your whole organization centrally & securely!
###### Support for all free Google accounts (Gmail, Docs, Drive etc) & Google Workspace plans (Starter, Standard, Plus, Enterprise, Non Profit) with expanded app options like Chat & Spaces. <br/><br /> Interested in a private, managed cloud instance? [That can be arranged.](https://workspacemcp.com/workspace-mcp-cloud) ###### Support for all free Google accounts (Gmail, Docs, Drive etc) & Google Workspace plans (Starter, Standard, Plus, Enterprise, Non Profit) with expanded app options like Chat & Spaces. <br/><br /> Interested in a private, managed cloud instance? [That can be arranged.](https://workspacemcp.com/workspace-mcp-cloud)
@@ -1308,6 +1307,8 @@ export WORKSPACE_MCP_OAUTH_PROXY_VALKEY_HOST=redis.example.com
export WORKSPACE_MCP_OAUTH_PROXY_VALKEY_PORT=6379 export WORKSPACE_MCP_OAUTH_PROXY_VALKEY_PORT=6379
``` ```
> Disk support requires `workspace-mcp[disk]` (or `py-key-value-aio[disk]`) when installing from source.
> The official Docker image includes the `disk` extra by default.
> Valkey support is optional. Install `workspace-mcp[valkey]` (or `py-key-value-aio[valkey]`) only if you enable the Valkey backend. > Valkey support is optional. Install `workspace-mcp[valkey]` (or `py-key-value-aio[valkey]`) only if you enable the Valkey backend.
> Windows: building `valkey-glide` from source requires MSVC C++ build tools with C11 support. If you see `aws-lc-sys` C11 errors, set `CFLAGS=/std:c11`. > Windows: building `valkey-glide` from source requires MSVC C++ build tools with C11 support. If you see `aws-lc-sys` C11 errors, set `CFLAGS=/std:c11`.

View File

@@ -641,8 +641,8 @@ def get_credentials(
f"[get_credentials] Found OAuth 2.1 credentials for MCP session {session_id}" f"[get_credentials] Found OAuth 2.1 credentials for MCP session {session_id}"
) )
# Refresh expired credentials before checking scopes # Refresh invalid credentials before checking scopes
if credentials.expired and credentials.refresh_token: if (not credentials.valid) and credentials.refresh_token:
try: try:
credentials.refresh(Request()) credentials.refresh(Request())
logger.info( logger.info(
@@ -772,9 +772,9 @@ def get_credentials(
logger.debug( logger.debug(
f"[get_credentials] Credentials are valid. User: '{user_google_email}', Session: '{session_id}'" f"[get_credentials] Credentials are valid. User: '{user_google_email}', Session: '{session_id}'"
) )
elif credentials.expired and credentials.refresh_token: elif credentials.refresh_token:
logger.info( logger.info(
f"[get_credentials] Credentials expired. Attempting refresh. User: '{user_google_email}', Session: '{session_id}'" f"[get_credentials] Credentials not valid. Attempting refresh. User: '{user_google_email}', Session: '{session_id}'"
) )
try: try:
logger.debug( logger.debug(

View File

@@ -14,7 +14,7 @@ Other services: readonly, full (extensible by adding entries to SERVICE_PERMISSI
""" """
import logging import logging
from typing import Dict, FrozenSet, List, Optional, Set, Tuple from typing import Dict, FrozenSet, List, Optional, Tuple
from auth.scopes import ( from auth.scopes import (
GMAIL_READONLY_SCOPE, GMAIL_READONLY_SCOPE,

View File

@@ -113,6 +113,7 @@ def build_paragraph_style(
indent_end: float = None, indent_end: float = None,
space_above: float = None, space_above: float = None,
space_below: float = None, space_below: float = None,
named_style_type: str = None,
) -> tuple[Dict[str, Any], list[str]]: ) -> tuple[Dict[str, Any], list[str]]:
""" """
Build paragraph style object for Google Docs API requests. Build paragraph style object for Google Docs API requests.
@@ -126,6 +127,8 @@ def build_paragraph_style(
indent_end: Right/end indent in points indent_end: Right/end indent in points
space_above: Space above paragraph in points space_above: Space above paragraph in points
space_below: Space below paragraph in points space_below: Space below paragraph in points
named_style_type: Direct named style (TITLE, SUBTITLE, HEADING_1..6, NORMAL_TEXT).
Takes precedence over heading_level when both are provided.
Returns: Returns:
Tuple of (paragraph_style_dict, list_of_field_names) Tuple of (paragraph_style_dict, list_of_field_names)
@@ -133,7 +136,26 @@ def build_paragraph_style(
paragraph_style = {} paragraph_style = {}
fields = [] fields = []
if heading_level is not None: if named_style_type is not None:
valid_styles = [
"NORMAL_TEXT",
"TITLE",
"SUBTITLE",
"HEADING_1",
"HEADING_2",
"HEADING_3",
"HEADING_4",
"HEADING_5",
"HEADING_6",
]
if named_style_type not in valid_styles:
raise ValueError(
f"Invalid named_style_type '{named_style_type}'. "
f"Must be one of: {', '.join(valid_styles)}"
)
paragraph_style["namedStyleType"] = named_style_type
fields.append("namedStyleType")
elif heading_level is not None:
if heading_level < 0 or heading_level > 6: if heading_level < 0 or heading_level > 6:
raise ValueError("heading_level must be between 0 (normal text) and 6") raise ValueError("heading_level must be between 0 (normal text) and 6")
if heading_level == 0: if heading_level == 0:
@@ -321,6 +343,7 @@ def create_update_paragraph_style_request(
space_above: float = None, space_above: float = None,
space_below: float = None, space_below: float = None,
tab_id: Optional[str] = None, tab_id: Optional[str] = None,
named_style_type: str = None,
) -> Optional[Dict[str, Any]]: ) -> Optional[Dict[str, Any]]:
""" """
Create an updateParagraphStyle request for Google Docs API. Create an updateParagraphStyle request for Google Docs API.
@@ -337,6 +360,7 @@ def create_update_paragraph_style_request(
space_above: Space above paragraph in points space_above: Space above paragraph in points
space_below: Space below paragraph in points space_below: Space below paragraph in points
tab_id: Optional ID of the tab to target tab_id: Optional ID of the tab to target
named_style_type: Direct named style (TITLE, SUBTITLE, HEADING_1..6, NORMAL_TEXT)
Returns: Returns:
Dictionary representing the updateParagraphStyle request, or None if no styles provided Dictionary representing the updateParagraphStyle request, or None if no styles provided
@@ -350,6 +374,7 @@ def create_update_paragraph_style_request(
indent_end, indent_end,
space_above, space_above,
space_below, space_below,
named_style_type,
) )
if not paragraph_style: if not paragraph_style:
@@ -628,6 +653,33 @@ def create_bullet_list_request(
return requests return requests
def create_delete_bullet_list_request(
    start_index: int,
    end_index: int,
    doc_tab_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Create a deleteParagraphBullets request to remove bullet/list formatting.

    Args:
        start_index: Start of the paragraph range
        end_index: End of the paragraph range
        doc_tab_id: Optional ID of the tab to target

    Returns:
        Dictionary representing the deleteParagraphBullets request
    """
    target_range: Dict[str, Any] = {
        "startIndex": start_index,
        "endIndex": end_index,
    }
    # Only attach a tab reference when the caller targets a specific tab.
    if doc_tab_id:
        target_range["tabId"] = doc_tab_id
    return {"deleteParagraphBullets": {"range": target_range}}
def validate_operation(operation: Dict[str, Any]) -> tuple[bool, str]: def validate_operation(operation: Dict[str, Any]) -> tuple[bool, str]:
""" """
Validate a batch operation dictionary. Validate a batch operation dictionary.
@@ -652,6 +704,7 @@ def validate_operation(operation: Dict[str, Any]) -> tuple[bool, str]:
"insert_table": ["index", "rows", "columns"], "insert_table": ["index", "rows", "columns"],
"insert_page_break": ["index"], "insert_page_break": ["index"],
"find_replace": ["find_text", "replace_text"], "find_replace": ["find_text", "replace_text"],
"create_bullet_list": ["start_index", "end_index"],
"insert_doc_tab": ["title", "index"], "insert_doc_tab": ["title", "index"],
"delete_doc_tab": ["tab_id"], "delete_doc_tab": ["tab_id"],
"update_doc_tab": ["tab_id", "title"], "update_doc_tab": ["tab_id", "title"],

View File

@@ -872,6 +872,10 @@ async def batch_update_doc(
insert_page_break- required: index (int) insert_page_break- required: index (int)
find_replace - required: find_text (str), replace_text (str) find_replace - required: find_text (str), replace_text (str)
optional: match_case (bool, default false) optional: match_case (bool, default false)
create_bullet_list - required: start_index (int), end_index (int)
optional: list_type ('UNORDERED'|'ORDERED'|'NONE', default UNORDERED),
nesting_level (0-8), paragraph_start_indices (list[int])
Use list_type='NONE' to remove existing bullet/list formatting
insert_doc_tab - required: title (str), index (int) insert_doc_tab - required: title (str), index (int)
optional: parent_tab_id (str) optional: parent_tab_id (str)
delete_doc_tab - required: tab_id (str) delete_doc_tab - required: tab_id (str)

View File

@@ -17,6 +17,8 @@ from gdocs.docs_helpers import (
create_find_replace_request, create_find_replace_request,
create_insert_table_request, create_insert_table_request,
create_insert_page_break_request, create_insert_page_break_request,
create_bullet_list_request,
create_delete_bullet_list_request,
create_insert_doc_tab_request, create_insert_doc_tab_request,
create_delete_doc_tab_request, create_delete_doc_tab_request,
create_update_doc_tab_request, create_update_doc_tab_request,
@@ -244,6 +246,7 @@ class BatchOperationManager:
op.get("space_above"), op.get("space_above"),
op.get("space_below"), op.get("space_below"),
tab_id, tab_id,
op.get("named_style_type"),
) )
if not request: if not request:
@@ -301,6 +304,33 @@ class BatchOperationManager:
) )
description = f"find/replace '{op['find_text']}''{op['replace_text']}'" description = f"find/replace '{op['find_text']}''{op['replace_text']}'"
elif op_type == "create_bullet_list":
list_type = op.get("list_type", "UNORDERED")
if list_type not in ("UNORDERED", "ORDERED", "NONE"):
raise ValueError(
f"Invalid list_type '{list_type}'. Must be 'UNORDERED', 'ORDERED', or 'NONE'"
)
if list_type == "NONE":
request = create_delete_bullet_list_request(
op["start_index"], op["end_index"], tab_id
)
description = f"remove bullets {op['start_index']}-{op['end_index']}"
else:
request = create_bullet_list_request(
op["start_index"],
op["end_index"],
list_type,
op.get("nesting_level"),
op.get("paragraph_start_indices"),
tab_id,
)
style = "bulleted" if list_type == "UNORDERED" else "numbered"
description = (
f"create {style} list {op['start_index']}-{op['end_index']}"
)
if op.get("nesting_level"):
description += f" (nesting level {op['nesting_level']})"
elif op_type == "insert_doc_tab": elif op_type == "insert_doc_tab":
request = create_insert_doc_tab_request( request = create_insert_doc_tab_request(
op["title"], op["index"], op.get("parent_tab_id") op["title"], op["index"], op.get("parent_tab_id")
@@ -327,6 +357,7 @@ class BatchOperationManager:
"insert_table", "insert_table",
"insert_page_break", "insert_page_break",
"find_replace", "find_replace",
"create_bullet_list",
"insert_doc_tab", "insert_doc_tab",
"delete_doc_tab", "delete_doc_tab",
"update_doc_tab", "update_doc_tab",
@@ -460,6 +491,15 @@ class BatchOperationManager:
"optional": ["match_case"], "optional": ["match_case"],
"description": "Find and replace text throughout document", "description": "Find and replace text throughout document",
}, },
"create_bullet_list": {
"required": ["start_index", "end_index"],
"optional": [
"list_type",
"nesting_level",
"paragraph_start_indices",
],
"description": "Apply or remove native bullet/numbered list formatting (list_type: UNORDERED, ORDERED, or NONE to remove; nesting_level: 0-8)",
},
"insert_doc_tab": { "insert_doc_tab": {
"required": ["title", "index"], "required": ["title", "index"],
"description": "Insert a new document tab with given title at specified index", "description": "Insert a new document tab with given title at specified index",

View File

@@ -280,6 +280,7 @@ class ValidationManager:
indent_end: Optional[float] = None, indent_end: Optional[float] = None,
space_above: Optional[float] = None, space_above: Optional[float] = None,
space_below: Optional[float] = None, space_below: Optional[float] = None,
named_style_type: Optional[str] = None,
) -> Tuple[bool, str]: ) -> Tuple[bool, str]:
""" """
Validate paragraph style parameters. Validate paragraph style parameters.
@@ -293,6 +294,7 @@ class ValidationManager:
indent_end: Right/end indent in points indent_end: Right/end indent in points
space_above: Space above paragraph in points space_above: Space above paragraph in points
space_below: Space below paragraph in points space_below: Space below paragraph in points
named_style_type: Direct named style (TITLE, SUBTITLE, HEADING_1..6, NORMAL_TEXT)
Returns: Returns:
Tuple of (is_valid, error_message) Tuple of (is_valid, error_message)
@@ -306,13 +308,32 @@ class ValidationManager:
indent_end, indent_end,
space_above, space_above,
space_below, space_below,
named_style_type,
] ]
if all(param is None for param in style_params): if all(param is None for param in style_params):
return ( return (
False, False,
"At least one paragraph style parameter must be provided (heading_level, alignment, line_spacing, indent_first_line, indent_start, indent_end, space_above, or space_below)", "At least one paragraph style parameter must be provided (heading_level, alignment, line_spacing, indent_first_line, indent_start, indent_end, space_above, space_below, or named_style_type)",
) )
if named_style_type is not None:
valid_styles = [
"NORMAL_TEXT",
"TITLE",
"SUBTITLE",
"HEADING_1",
"HEADING_2",
"HEADING_3",
"HEADING_4",
"HEADING_5",
"HEADING_6",
]
if named_style_type not in valid_styles:
return (
False,
f"Invalid named_style_type '{named_style_type}'. Must be one of: {', '.join(valid_styles)}",
)
if heading_level is not None: if heading_level is not None:
if not isinstance(heading_level, int): if not isinstance(heading_level, int):
return ( return (
@@ -627,6 +648,7 @@ class ValidationManager:
op.get("indent_end"), op.get("indent_end"),
op.get("space_above"), op.get("space_above"),
op.get("space_below"), op.get("space_below"),
op.get("named_style_type"),
) )
if not is_valid: if not is_valid:
return ( return (

View File

@@ -6,6 +6,7 @@ This module provides MCP tools for interacting with Google Forms API.
import logging import logging
import asyncio import asyncio
import json
from typing import List, Optional, Dict, Any from typing import List, Optional, Dict, Any
@@ -16,6 +17,105 @@ from core.utils import handle_http_errors
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _extract_option_values(options: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Extract valid option objects from Forms choice option objects.
Returns the full option dicts (preserving fields like ``isOther``,
``image``, ``goToAction``, and ``goToSectionId``) while filtering
out entries that lack a truthy ``value``.
"""
return [option for option in options if option.get("value")]
def _get_question_type(question: Dict[str, Any]) -> str:
"""Infer a stable question/item type label from a Forms question payload."""
choice_question = question.get("choiceQuestion")
if choice_question:
return choice_question.get("type", "CHOICE")
text_question = question.get("textQuestion")
if text_question:
return "PARAGRAPH" if text_question.get("paragraph") else "TEXT"
if "rowQuestion" in question:
return "GRID_ROW"
if "scaleQuestion" in question:
return "SCALE"
if "dateQuestion" in question:
return "DATE"
if "timeQuestion" in question:
return "TIME"
if "fileUploadQuestion" in question:
return "FILE_UPLOAD"
if "ratingQuestion" in question:
return "RATING"
return "QUESTION"
def _serialize_form_item(item: Dict[str, Any], index: int) -> Dict[str, Any]:
    """Serialize a Forms item with the key metadata agents need for edits."""
    result: Dict[str, Any] = {
        "index": index,
        "itemId": item.get("itemId"),
        "title": item.get("title", f"Question {index}"),
    }
    if item.get("description"):
        result["description"] = item["description"]

    # Single question: surface type, required flag, id, and choice options.
    if "questionItem" in item:
        question = item.get("questionItem", {}).get("question", {})
        result["type"] = _get_question_type(question)
        result["required"] = question.get("required", False)
        qid = question.get("questionId")
        if qid:
            result["questionId"] = qid
        choice = question.get("choiceQuestion")
        if choice:
            result["options"] = _extract_option_values(choice.get("options", []))
        return result

    # Question group: flatten the grid into rows and shared columns.
    if "questionGroupItem" in item:
        group = item.get("questionGroupItem", {})
        columns = _extract_option_values(
            group.get("grid", {}).get("columns", {}).get("options", [])
        )
        rows: List[Dict[str, Any]] = []
        for row_question in group.get("questions", []):
            row: Dict[str, Any] = {
                "title": row_question.get("rowQuestion", {}).get("title", "")
            }
            row_qid = row_question.get("questionId")
            if row_qid:
                row["questionId"] = row_qid
            row["required"] = row_question.get("required", False)
            rows.append(row)
        result["type"] = "GRID"
        result["grid"] = {"rows": rows, "columns": columns}
        return result

    # Non-question items are labeled by which marker key is present.
    for marker_key, type_label in (
        ("pageBreakItem", "PAGE_BREAK"),
        ("textItem", "TEXT_ITEM"),
        ("imageItem", "IMAGE"),
        ("videoItem", "VIDEO"),
    ):
        if marker_key in item:
            result["type"] = type_label
            break
    else:
        result["type"] = "UNKNOWN"
    return result
@server.tool() @server.tool()
@handle_http_errors("create_form", service_type="forms") @handle_http_errors("create_form", service_type="forms")
@require_google_service("forms", "forms") @require_google_service("forms", "forms")
@@ -92,18 +192,24 @@ async def get_form(service, user_google_email: str, form_id: str) -> str:
) )
items = form.get("items", []) items = form.get("items", [])
questions_summary = [] serialized_items = [
for i, item in enumerate(items, 1): _serialize_form_item(item, i) for i, item in enumerate(items, 1)
item_title = item.get("title", f"Question {i}") ]
item_type = (
item.get("questionItem", {}).get("question", {}).get("required", False)
)
required_text = " (Required)" if item_type else ""
questions_summary.append(f" {i}. {item_title}{required_text}")
questions_text = ( items_summary = []
"\n".join(questions_summary) if questions_summary else " No questions found" for serialized_item in serialized_items:
item_index = serialized_item["index"]
item_title = serialized_item.get("title", f"Item {item_index}")
item_type = serialized_item.get("type", "UNKNOWN")
required_text = " (Required)" if serialized_item.get("required") else ""
items_summary.append(
f" {item_index}. {item_title} [{item_type}]{required_text}"
)
items_summary_text = (
"\n".join(items_summary) if items_summary else " No items found"
) )
items_text = json.dumps(serialized_items, indent=2) if serialized_items else "[]"
result = f"""Form Details for {user_google_email}: result = f"""Form Details for {user_google_email}:
- Title: "{title}" - Title: "{title}"
@@ -112,8 +218,10 @@ async def get_form(service, user_google_email: str, form_id: str) -> str:
- Form ID: {form_id} - Form ID: {form_id}
- Edit URL: {edit_url} - Edit URL: {edit_url}
- Responder URL: {responder_url} - Responder URL: {responder_url}
- Questions ({len(items)} total): - Items ({len(items)} total):
{questions_text}""" {items_summary_text}
- Items (structured):
{items_text}"""
logger.info(f"Successfully retrieved form for {user_google_email}. ID: {form_id}") logger.info(f"Successfully retrieved form for {user_google_email}. ID: {form_id}")
return result return result

View File

@@ -7,11 +7,15 @@ conditional formatting helpers.
import asyncio import asyncio
import json import json
import logging
import re import re
from typing import List, Optional, Union from typing import List, Optional, Union
from core.utils import UserInputError from core.utils import UserInputError
logger = logging.getLogger(__name__)
MAX_GRID_METADATA_CELLS = 5000
A1_PART_REGEX = re.compile(r"^([A-Za-z]*)(\d*)$") A1_PART_REGEX = re.compile(r"^([A-Za-z]*)(\d*)$")
SHEET_TITLE_SAFE_RE = re.compile(r"^[A-Za-z0-9_]+$") SHEET_TITLE_SAFE_RE = re.compile(r"^[A-Za-z0-9_]+$")
@@ -877,3 +881,170 @@ def _build_gradient_rule(
rule_body["gradientRule"]["midpoint"] = gradient_points[1] rule_body["gradientRule"]["midpoint"] = gradient_points[1]
rule_body["gradientRule"]["maxpoint"] = gradient_points[2] rule_body["gradientRule"]["maxpoint"] = gradient_points[2]
return rule_body return rule_body
def _extract_cell_notes_from_grid(spreadsheet: dict) -> list[dict[str, str]]:
    """
    Extract cell notes from spreadsheet grid data.

    Returns a list of dictionaries with:
        - "cell": cell A1 reference
        - "note": the note text
    """
    collected: list[dict[str, str]] = []
    for sheet in spreadsheet.get("sheets", []) or []:
        title = sheet.get("properties", {}).get("title") or "Unknown"
        for grid in sheet.get("data", []) or []:
            # Grid data may start at an offset; anchor A1 math on it.
            row_origin = _coerce_int(grid.get("startRow"), default=0)
            col_origin = _coerce_int(grid.get("startColumn"), default=0)
            for row_idx, row_data in enumerate(grid.get("rowData", []) or []):
                if not row_data:
                    continue
                for col_idx, cell in enumerate(row_data.get("values", []) or []):
                    # Skip empty cell payloads and cells without a note.
                    note = cell.get("note") if cell else None
                    if not note:
                        continue
                    collected.append(
                        {
                            "cell": _format_a1_cell(
                                title,
                                row_origin + row_idx,
                                col_origin + col_idx,
                            ),
                            "note": note,
                        }
                    )
    return collected
async def _fetch_sheet_notes(
    service, spreadsheet_id: str, a1_range: str
) -> list[dict[str, str]]:
    """Fetch cell notes for the given range via spreadsheets.get with includeGridData."""
    # Build the request synchronously; only the blocking execute() call is
    # pushed to a worker thread.
    request = service.spreadsheets().get(
        spreadsheetId=spreadsheet_id,
        ranges=[a1_range],
        includeGridData=True,
        fields="sheets(properties(title),data(startRow,startColumn,rowData(values(note))))",
    )
    spreadsheet = await asyncio.to_thread(request.execute)
    return _extract_cell_notes_from_grid(spreadsheet)
def _format_sheet_notes_section(
*, notes: list[dict[str, str]], range_label: str, max_details: int = 25
) -> str:
"""
Format a list of cell notes into a human-readable section.
"""
if not notes:
return ""
lines = []
for item in notes[:max_details]:
cell = item.get("cell") or "(unknown cell)"
note = item.get("note") or "(empty note)"
lines.append(f"- {cell}: {note}")
suffix = (
f"\n... and {len(notes) - max_details} more notes"
if len(notes) > max_details
else ""
)
return f"\n\nCell notes in range '{range_label}':\n" + "\n".join(lines) + suffix
async def _fetch_grid_metadata(
    service,
    spreadsheet_id: str,
    resolved_range: str,
    values: List[List[object]],
    include_hyperlinks: bool = False,
    include_notes: bool = False,
) -> tuple[str, str]:
    """Fetch hyperlinks and/or notes for a range via a single spreadsheets.get call.

    Computes tight range bounds, enforces the cell-count cap, builds a combined
    ``fields`` selector so only one API round-trip is needed when both flags are
    ``True``, then parses the response into formatted output sections.

    Args:
        service: Sheets API service object (presumably a googleapiclient
            resource — its ``spreadsheets().get(...).execute`` is called).
        spreadsheet_id: ID of the spreadsheet to query.
        resolved_range: A1 range the values were read from.
        values: Row-major cell values already read for the range; used to
            tighten the fetch bounds and as a cell-count fallback.
        include_hyperlinks: If True, include hyperlink metadata in the fetch.
        include_notes: If True, include cell notes in the fetch.

    Returns:
        (hyperlink_section, notes_section) — each is an empty string when the
        corresponding flag is ``False`` or no data was found.
    """
    # Fast path: nothing requested, no API call.
    if not include_hyperlinks and not include_notes:
        return "", ""
    # Shrink open-ended ranges (e.g. A:Z) to just the populated cells so the
    # includeGridData request stays cheap.
    tight_range = _a1_range_for_values(resolved_range, values)
    if not tight_range:
        logger.info(
            "[read_sheet_values] Skipping grid metadata fetch for range '%s': "
            "unable to determine tight bounds",
            resolved_range,
        )
        return "", ""
    # Fall back to counting the values themselves if the A1 math fails.
    cell_count = _a1_range_cell_count(tight_range) or sum(len(row) for row in values)
    if cell_count > MAX_GRID_METADATA_CELLS:
        logger.info(
            "[read_sheet_values] Skipping grid metadata fetch for large range "
            "'%s' (%d cells > %d limit)",
            tight_range,
            cell_count,
            MAX_GRID_METADATA_CELLS,
        )
        return "", ""
    # Build a combined fields selector so we hit the API at most once.
    value_fields: list[str] = []
    if include_hyperlinks:
        value_fields.extend(["hyperlink", "textFormatRuns(format(link(uri)))"])
    if include_notes:
        value_fields.append("note")
    fields = (
        "sheets(properties(title),data(startRow,startColumn,"
        f"rowData(values({','.join(value_fields)}))))"
    )
    try:
        # Only the blocking execute() runs on a worker thread.
        response = await asyncio.to_thread(
            service.spreadsheets()
            .get(
                spreadsheetId=spreadsheet_id,
                ranges=[tight_range],
                includeGridData=True,
                fields=fields,
            )
            .execute
        )
    except Exception as exc:
        # Metadata is best-effort: a fetch failure degrades to empty sections
        # rather than failing the caller's read.
        logger.warning(
            "[read_sheet_values] Failed fetching grid metadata for range '%s': %s",
            tight_range,
            exc,
        )
        return "", ""
    # Parse each requested kind of metadata out of the single response.
    hyperlink_section = ""
    if include_hyperlinks:
        hyperlinks = _extract_cell_hyperlinks_from_grid(response)
        hyperlink_section = _format_sheet_hyperlink_section(
            hyperlinks=hyperlinks, range_label=tight_range
        )
    notes_section = ""
    if include_notes:
        notes = _extract_cell_notes_from_grid(response)
        notes_section = _format_sheet_notes_section(
            notes=notes, range_label=tight_range
        )
    return hyperlink_section, notes_section

View File

@@ -15,16 +15,14 @@ from core.server import server
from core.utils import handle_http_errors, UserInputError from core.utils import handle_http_errors, UserInputError
from core.comments import create_comment_tools from core.comments import create_comment_tools
from gsheets.sheets_helpers import ( from gsheets.sheets_helpers import (
_a1_range_cell_count,
CONDITION_TYPES, CONDITION_TYPES,
_a1_range_for_values, _a1_range_for_values,
_build_boolean_rule, _build_boolean_rule,
_build_gradient_rule, _build_gradient_rule,
_fetch_detailed_sheet_errors, _fetch_detailed_sheet_errors,
_fetch_sheet_hyperlinks, _fetch_grid_metadata,
_fetch_sheets_with_rules, _fetch_sheets_with_rules,
_format_conditional_rules_section, _format_conditional_rules_section,
_format_sheet_hyperlink_section,
_format_sheet_error_section, _format_sheet_error_section,
_parse_a1_range, _parse_a1_range,
_parse_condition_values, _parse_condition_values,
@@ -36,7 +34,6 @@ from gsheets.sheets_helpers import (
# Configure module logger # Configure module logger
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
MAX_HYPERLINK_FETCH_CELLS = 5000
@server.tool() @server.tool()
@@ -179,6 +176,7 @@ async def read_sheet_values(
spreadsheet_id: str, spreadsheet_id: str,
range_name: str = "A1:Z1000", range_name: str = "A1:Z1000",
include_hyperlinks: bool = False, include_hyperlinks: bool = False,
include_notes: bool = False,
) -> str: ) -> str:
""" """
Reads values from a specific range in a Google Sheet. Reads values from a specific range in a Google Sheet.
@@ -189,6 +187,8 @@ async def read_sheet_values(
range_name (str): The range to read (e.g., "Sheet1!A1:D10", "A1:D10"). Defaults to "A1:Z1000". range_name (str): The range to read (e.g., "Sheet1!A1:D10", "A1:D10"). Defaults to "A1:Z1000".
include_hyperlinks (bool): If True, also fetch hyperlink metadata for the range. include_hyperlinks (bool): If True, also fetch hyperlink metadata for the range.
Defaults to False to avoid expensive includeGridData requests. Defaults to False to avoid expensive includeGridData requests.
include_notes (bool): If True, also fetch cell notes for the range.
Defaults to False to avoid expensive includeGridData requests.
Returns: Returns:
str: The formatted values from the specified range. str: The formatted values from the specified range.
@@ -211,41 +211,14 @@ async def read_sheet_values(
resolved_range = result.get("range", range_name) resolved_range = result.get("range", range_name)
detailed_range = _a1_range_for_values(resolved_range, values) or resolved_range detailed_range = _a1_range_for_values(resolved_range, values) or resolved_range
hyperlink_section = "" hyperlink_section, notes_section = await _fetch_grid_metadata(
if include_hyperlinks: service,
# Use a tight A1 range for includeGridData fetches to avoid expensive spreadsheet_id,
# open-ended requests (e.g., A:Z). resolved_range,
hyperlink_range = _a1_range_for_values(resolved_range, values) values,
if not hyperlink_range: include_hyperlinks=include_hyperlinks,
logger.info( include_notes=include_notes,
"[read_sheet_values] Skipping hyperlink fetch for range '%s': unable to determine tight bounds", )
resolved_range,
)
else:
cell_count = _a1_range_cell_count(hyperlink_range) or sum(
len(row) for row in values
)
if cell_count <= MAX_HYPERLINK_FETCH_CELLS:
try:
hyperlinks = await _fetch_sheet_hyperlinks(
service, spreadsheet_id, hyperlink_range
)
hyperlink_section = _format_sheet_hyperlink_section(
hyperlinks=hyperlinks, range_label=hyperlink_range
)
except Exception as exc:
logger.warning(
"[read_sheet_values] Failed fetching hyperlinks for range '%s': %s",
hyperlink_range,
exc,
)
else:
logger.info(
"[read_sheet_values] Skipping hyperlink fetch for large range '%s' (%d cells > %d limit)",
hyperlink_range,
cell_count,
MAX_HYPERLINK_FETCH_CELLS,
)
detailed_errors_section = "" detailed_errors_section = ""
if _values_contain_sheets_errors(values): if _values_contain_sheets_errors(values):
@@ -277,7 +250,7 @@ async def read_sheet_values(
) )
logger.info(f"Successfully read {len(values)} rows for {user_google_email}.") logger.info(f"Successfully read {len(values)} rows for {user_google_email}.")
return text_output + hyperlink_section + detailed_errors_section return text_output + hyperlink_section + notes_section + detailed_errors_section
@server.tool() @server.tool()

View File

@@ -2,7 +2,7 @@
"dxt_version": "0.1", "dxt_version": "0.1",
"name": "workspace-mcp", "name": "workspace-mcp",
"display_name": "Google Workspace MCP", "display_name": "Google Workspace MCP",
"version": "1.14.2", "version": "1.14.3",
"description": "Full natural language control over Google Calendar, Drive, Gmail, Docs, Sheets, Slides, Forms, Tasks, Chat and Custom Search through all MCP clients, AI assistants and developer tools", "description": "Full natural language control over Google Calendar, Drive, Gmail, Docs, Sheets, Slides, Forms, Tasks, Chat and Custom Search through all MCP clients, AI assistants and developer tools",
"long_description": "A production-ready MCP server that integrates all major Google Workspace services with AI assistants. Includes Google PSE integration for custom web searches.", "long_description": "A production-ready MCP server that integrates all major Google Workspace services with AI assistants. Includes Google PSE integration for custom web searches.",
"author": { "author": {

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "workspace-mcp" name = "workspace-mcp"
version = "1.14.2" version = "1.14.3"
description = "Comprehensive, highly performant Google Workspace Streamable HTTP & SSE MCP Server for Calendar, Gmail, Docs, Sheets, Slides & Drive" description = "Comprehensive, highly performant Google Workspace Streamable HTTP & SSE MCP Server for Calendar, Gmail, Docs, Sheets, Slides & Drive"
readme = "README.md" readme = "README.md"
keywords = [ "mcp", "google", "workspace", "llm", "ai", "claude", "model", "context", "protocol", "server"] keywords = [ "mcp", "google", "workspace", "llm", "ai", "claude", "model", "context", "protocol", "server"]
@@ -58,6 +58,9 @@ Changelog = "https://github.com/taylorwilsdon/google_workspace_mcp/releases"
workspace-mcp = "main:main" workspace-mcp = "main:main"
[project.optional-dependencies] [project.optional-dependencies]
disk = [
"py-key-value-aio[disk]>=0.3.0",
]
valkey = [ valkey = [
"py-key-value-aio[valkey]>=0.3.0", "py-key-value-aio[valkey]>=0.3.0",
] ]
@@ -80,6 +83,9 @@ dev = [
] ]
[dependency-groups] [dependency-groups]
disk = [
"py-key-value-aio[disk]>=0.3.0",
]
valkey = [ valkey = [
"py-key-value-aio[valkey]>=0.3.0", "py-key-value-aio[valkey]>=0.3.0",
] ]

View File

@@ -3,7 +3,7 @@
"name": "io.github.taylorwilsdon/workspace-mcp", "name": "io.github.taylorwilsdon/workspace-mcp",
"description": "Google Workspace MCP server for Gmail, Drive, Calendar, Docs, Sheets, Slides, Forms, Tasks, Chat.", "description": "Google Workspace MCP server for Gmail, Drive, Calendar, Docs, Sheets, Slides, Forms, Tasks, Chat.",
"status": "active", "status": "active",
"version": "1.14.2", "version": "1.14.3",
"packages": [ "packages": [
{ {
"registryType": "pypi", "registryType": "pypi",
@@ -11,7 +11,7 @@
"transport": { "transport": {
"type": "stdio" "type": "stdio"
}, },
"version": "1.14.2" "version": "1.14.3"
} }
] ]
} }

View File

@@ -11,8 +11,8 @@ import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
# Import the internal implementation function (not the decorated one) # Import internal implementation functions (not decorated tool wrappers)
from gforms.forms_tools import _batch_update_form_impl from gforms.forms_tools import _batch_update_form_impl, _serialize_form_item, get_form
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -229,3 +229,116 @@ async def test_batch_update_form_mixed_reply_types():
assert "Replies Received: 3" in result assert "Replies Received: 3" in result
assert "item_a" in result assert "item_a" in result
assert "item_c" in result assert "item_c" in result
def test_serialize_form_item_choice_question_includes_ids_and_options():
    """Choice question items should expose questionId/options/type metadata."""
    # Minimal Forms API payload for a single RADIO choice question.
    choice_item = {
        "itemId": "item_123",
        "title": "Favorite color?",
        "questionItem": {
            "question": {
                "questionId": "q_123",
                "required": True,
                "choiceQuestion": {
                    "type": "RADIO",
                    "options": [{"value": "Red"}, {"value": "Blue"}],
                },
            }
        },
    }

    result = _serialize_form_item(choice_item, 1)

    # `required` must be the boolean True, not merely truthy.
    assert result["required"] is True
    for key, expected in [
        ("index", 1),
        ("itemId", "item_123"),
        ("type", "RADIO"),
        ("questionId", "q_123"),
        ("options", [{"value": "Red"}, {"value": "Blue"}]),
    ]:
        assert result[key] == expected
def test_serialize_form_item_grid_includes_row_and_column_structure():
    """Grid question groups should expose row labels/IDs and column options."""
    # (questionId, required, row title) triples used to build the fixture
    # and, later, the expected serialized rows.
    row_specs = [
        ("row_q1", True, "Laundry"),
        ("row_q2", False, "Dishes"),
    ]
    grid_item = {
        "itemId": "grid_item_1",
        "title": "Weekly chores",
        "questionGroupItem": {
            "questions": [
                {
                    "questionId": qid,
                    "required": req,
                    "rowQuestion": {"title": title},
                }
                for qid, req, title in row_specs
            ],
            "grid": {"columns": {"options": [{"value": "Never"}, {"value": "Often"}]}},
        },
    }

    result = _serialize_form_item(grid_item, 2)

    assert result["index"] == 2
    assert result["type"] == "GRID"
    # Column options are flattened out of the nested `columns.options` shape.
    assert result["grid"]["columns"] == [{"value": "Never"}, {"value": "Often"}]
    assert result["grid"]["rows"] == [
        {"title": title, "questionId": qid, "required": req}
        for qid, req, title in row_specs
    ]
@pytest.mark.asyncio
async def test_get_form_returns_structured_item_metadata():
    """get_form should include question IDs, options, and grid structure."""
    # Fake Forms API response containing one choice question and one grid.
    fake_form = {
        "formId": "form_1",
        "info": {"title": "Survey", "description": "Test survey"},
        "items": [
            {
                "itemId": "item_1",
                "title": "Favorite fruit?",
                "questionItem": {
                    "question": {
                        "questionId": "q_1",
                        "required": True,
                        "choiceQuestion": {
                            "type": "RADIO",
                            "options": [{"value": "Apple"}, {"value": "Banana"}],
                        },
                    }
                },
            },
            {
                "itemId": "item_2",
                "title": "Household chores",
                "questionGroupItem": {
                    "questions": [
                        {
                            "questionId": "row_1",
                            "required": True,
                            "rowQuestion": {"title": "Laundry"},
                        }
                    ],
                    "grid": {"columns": {"options": [{"value": "Never"}]}},
                },
            },
        ],
    }
    service = Mock()
    service.forms().get().execute.return_value = fake_form

    # Bypass decorators and call the core implementation directly.
    output = await get_form.__wrapped__.__wrapped__(
        service, "user@example.com", "form_1"
    )

    # The rendered output must surface the structured JSON metadata.
    for fragment in (
        "- Items (structured):",
        '"questionId": "q_1"',
        '"options": [',
        '"Apple"',
        '"type": "GRID"',
        '"columns": [',
        '"rows": [',
    ):
        assert fragment in output

1998
uv.lock generated

File diff suppressed because it is too large Load Diff