add
This commit is contained in:
@@ -6,6 +6,7 @@ This module provides MCP tools for interacting with Google Forms API.
|
||||
|
||||
import logging
|
||||
import asyncio
|
||||
import json
|
||||
from typing import List, Optional, Dict, Any
|
||||
|
||||
|
||||
@@ -16,6 +17,105 @@ from core.utils import handle_http_errors
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _extract_option_values(options: List[Dict[str, Any]]) -> List[str]:
|
||||
"""Extract non-empty option values from Forms choice option objects."""
|
||||
values: List[str] = []
|
||||
for option in options:
|
||||
value = option.get("value")
|
||||
if value:
|
||||
values.append(value)
|
||||
return values
|
||||
|
||||
|
||||
def _get_question_type(question: Dict[str, Any]) -> str:
|
||||
"""Infer a stable question/item type label from a Forms question payload."""
|
||||
choice_question = question.get("choiceQuestion")
|
||||
if choice_question:
|
||||
return choice_question.get("type", "CHOICE")
|
||||
|
||||
text_question = question.get("textQuestion")
|
||||
if text_question:
|
||||
return "PARAGRAPH" if text_question.get("paragraph") else "TEXT"
|
||||
|
||||
if "rowQuestion" in question:
|
||||
return "GRID_ROW"
|
||||
if "scaleQuestion" in question:
|
||||
return "SCALE"
|
||||
if "dateQuestion" in question:
|
||||
return "DATE"
|
||||
if "timeQuestion" in question:
|
||||
return "TIME"
|
||||
if "fileUploadQuestion" in question:
|
||||
return "FILE_UPLOAD"
|
||||
if "ratingQuestion" in question:
|
||||
return "RATING"
|
||||
|
||||
return "QUESTION"
|
||||
|
||||
|
||||
def _serialize_form_item(item: Dict[str, Any], index: int) -> Dict[str, Any]:
|
||||
"""Serialize a Forms item with the key metadata agents need for edits."""
|
||||
serialized_item: Dict[str, Any] = {
|
||||
"index": index,
|
||||
"itemId": item.get("itemId"),
|
||||
"title": item.get("title", f"Question {index}"),
|
||||
}
|
||||
|
||||
if item.get("description"):
|
||||
serialized_item["description"] = item["description"]
|
||||
|
||||
if "questionItem" in item:
|
||||
question = item.get("questionItem", {}).get("question", {})
|
||||
serialized_item["type"] = _get_question_type(question)
|
||||
serialized_item["required"] = question.get("required", False)
|
||||
|
||||
question_id = question.get("questionId")
|
||||
if question_id:
|
||||
serialized_item["questionId"] = question_id
|
||||
|
||||
choice_question = question.get("choiceQuestion")
|
||||
if choice_question:
|
||||
serialized_item["options"] = _extract_option_values(
|
||||
choice_question.get("options", [])
|
||||
)
|
||||
|
||||
return serialized_item
|
||||
|
||||
if "questionGroupItem" in item:
|
||||
question_group = item.get("questionGroupItem", {})
|
||||
columns = _extract_option_values(
|
||||
question_group.get("grid", {}).get("columns", {}).get("options", [])
|
||||
)
|
||||
|
||||
rows = []
|
||||
for question in question_group.get("questions", []):
|
||||
row: Dict[str, Any] = {
|
||||
"title": question.get("rowQuestion", {}).get("title", "")
|
||||
}
|
||||
row_question_id = question.get("questionId")
|
||||
if row_question_id:
|
||||
row["questionId"] = row_question_id
|
||||
row["required"] = question.get("required", False)
|
||||
rows.append(row)
|
||||
|
||||
serialized_item["type"] = "GRID"
|
||||
serialized_item["grid"] = {"rows": rows, "columns": columns}
|
||||
return serialized_item
|
||||
|
||||
if "pageBreakItem" in item:
|
||||
serialized_item["type"] = "PAGE_BREAK"
|
||||
elif "textItem" in item:
|
||||
serialized_item["type"] = "TEXT_ITEM"
|
||||
elif "imageItem" in item:
|
||||
serialized_item["type"] = "IMAGE"
|
||||
elif "videoItem" in item:
|
||||
serialized_item["type"] = "VIDEO"
|
||||
else:
|
||||
serialized_item["type"] = "UNKNOWN"
|
||||
|
||||
return serialized_item
|
||||
|
||||
|
||||
@server.tool()
|
||||
@handle_http_errors("create_form", service_type="forms")
|
||||
@require_google_service("forms", "forms")
|
||||
@@ -92,17 +192,23 @@ async def get_form(service, user_google_email: str, form_id: str) -> str:
|
||||
)
|
||||
|
||||
items = form.get("items", [])
|
||||
questions_summary = []
|
||||
for i, item in enumerate(items, 1):
|
||||
item_title = item.get("title", f"Question {i}")
|
||||
item_type = (
|
||||
item.get("questionItem", {}).get("question", {}).get("required", False)
|
||||
)
|
||||
required_text = " (Required)" if item_type else ""
|
||||
questions_summary.append(f" {i}. {item_title}{required_text}")
|
||||
serialized_items = [_serialize_form_item(item, i) for i, item in enumerate(items, 1)]
|
||||
|
||||
questions_text = (
|
||||
"\n".join(questions_summary) if questions_summary else " No questions found"
|
||||
questions_summary = []
|
||||
for serialized_item in serialized_items:
|
||||
item_index = serialized_item["index"]
|
||||
item_title = serialized_item.get("title", f"Question {item_index}")
|
||||
item_type = serialized_item.get("type", "UNKNOWN")
|
||||
required_text = " (Required)" if serialized_item.get("required") else ""
|
||||
questions_summary.append(
|
||||
f" {item_index}. {item_title} [{item_type}]{required_text}"
|
||||
)
|
||||
|
||||
questions_text = "\n".join(questions_summary) if questions_summary else " No questions found"
|
||||
items_text = (
|
||||
json.dumps(serialized_items, indent=2)
|
||||
if serialized_items
|
||||
else "[]"
|
||||
)
|
||||
|
||||
result = f"""Form Details for {user_google_email}:
|
||||
@@ -113,7 +219,9 @@ async def get_form(service, user_google_email: str, form_id: str) -> str:
|
||||
- Edit URL: {edit_url}
|
||||
- Responder URL: {responder_url}
|
||||
- Questions ({len(items)} total):
|
||||
{questions_text}"""
|
||||
{questions_text}
|
||||
- Items (structured):
|
||||
{items_text}"""
|
||||
|
||||
logger.info(f"Successfully retrieved form for {user_google_email}. ID: {form_id}")
|
||||
return result
|
||||
|
||||
Reference in New Issue
Block a user