convert to streamable http

This commit is contained in:
Taylor Wilsdon
2025-05-11 10:07:37 -04:00
parent e8c05af5d8
commit 6e94512ca3
5 changed files with 353 additions and 972 deletions

View File

@@ -8,20 +8,23 @@ import logging
import asyncio
import os
import sys
from typing import List, Optional, Dict
from typing import List, Optional, Dict, Any
# Import MCP types for proper response formatting
from mcp import types
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# Use functions directly from google_auth
from auth.google_auth import get_credentials, start_auth_flow, handle_auth_callback
from auth.google_auth import get_credentials, handle_auth_callback
# Configure module logger
logger = logging.getLogger(__name__)
# Import the server directly (will be initialized before this module is imported)
from core.server import server
from core.server import server, OAUTH_REDIRECT_URI
# Define Google Calendar API Scopes
CALENDAR_READONLY_SCOPE = "https://www.googleapis.com/auth/calendar.readonly"
@@ -34,101 +37,79 @@ _client_secrets_env = os.getenv("GOOGLE_CLIENT_SECRETS")
if _client_secrets_env:
CONFIG_CLIENT_SECRETS_PATH = _client_secrets_env # User provided, assume correct path
else:
# Default to client_secret.json in the same directory as this script (gcalendar_tools.py)
_current_dir = os.path.dirname(os.path.abspath(__file__))
CONFIG_CLIENT_SECRETS_PATH = os.path.join(_current_dir, "client_secret.json")
# Default to client_secret.json in the root directory
CONFIG_CLIENT_SECRETS_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'client_secret.json')
CONFIG_PORT = int(os.getenv("OAUTH_CALLBACK_PORT", 8080))
CONFIG_REDIRECT_URI = os.getenv("OAUTH_REDIRECT_URI", f"http://localhost:{CONFIG_PORT}/callback")
# Use MCP server's OAuth callback endpoint
CONFIG_REDIRECT_URI = OAUTH_REDIRECT_URI
# ---
async def _initiate_auth_and_get_message(user_id: str, scopes: List[str]) -> Dict:
async def _initiate_auth_and_get_message(user_id: str, scopes: List[str]) -> types.CallToolResult:
"""
Initiates the Google OAuth flow and returns a message for the user.
Handles the callback internally to exchange the code for tokens.
Returns a standard envelope with status and data/error.
Uses the MCP server's OAuth resource endpoint for callback handling.
Returns a CallToolResult with appropriate content types.
"""
logger.info(f"Initiating auth for user '{user_id}' with scopes: {scopes}")
# This inner function is called by OAuthCallbackServer (via start_auth_flow) with code and state
def _handle_redirect_for_token_exchange(received_code: str, received_state: str):
# This function runs in the OAuthCallbackServer's thread.
# It needs access to user_id, scopes, CONFIG_CLIENT_SECRETS_PATH, CONFIG_REDIRECT_URI
# These are available via closure from the _initiate_auth_and_get_message call.
current_user_id_for_flow = user_id # Capture user_id for this specific flow instance
flow_scopes = scopes # Capture scopes for this specific flow instance
logger.info(f"OAuth callback received for user '{current_user_id_for_flow}', state '{received_state}'. Exchanging code.")
try:
full_auth_response_url = f"{CONFIG_REDIRECT_URI}?code={received_code}&state={received_state}"
authenticated_user_email, credentials = handle_auth_callback(
client_secrets_path=CONFIG_CLIENT_SECRETS_PATH,
scopes=flow_scopes, # Crucial: these must be the scopes used for auth_url generation
authorization_response=full_auth_response_url,
redirect_uri=CONFIG_REDIRECT_URI
)
# Credentials are saved by handle_auth_callback under the email
# Update our reference to use the email as user_id for consistency
if current_user_id_for_flow == 'default':
current_user_id_for_flow = authenticated_user_email
logger.info(f"Updated user_id from 'default' to {authenticated_user_email}")
logger.info(f"Successfully exchanged token and saved credentials for {authenticated_user_email} (flow initiated for '{current_user_id_for_flow}').")
except Exception as e:
logger.error(f"Error during token exchange for user '{current_user_id_for_flow}', state '{received_state}': {e}", exc_info=True)
# Optionally, could signal error if a wait mechanism was in place.
try:
# Ensure the callback function uses the specific user_id and scopes for *this* auth attempt
# by defining it within this scope or ensuring it has access to them.
# The current closure approach for _handle_redirect_for_token_exchange handles this.
auth_url, state = await asyncio.to_thread(
start_auth_flow, # This is now the function from auth.google_auth
client_secrets_path=CONFIG_CLIENT_SECRETS_PATH,
logger.info(f"[_initiate_auth_and_get_message] For user '{user_id}', initiating auth flow with scopes: {scopes}")
# Ensure OAUTHLIB_INSECURE_TRANSPORT is set
if 'OAUTHLIB_INSECURE_TRANSPORT' not in os.environ:
logger.warning("OAUTHLIB_INSECURE_TRANSPORT not set. Setting it for localhost development.")
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
# Set up the OAuth flow using the MCP server's callback endpoint
from google_auth_oauthlib.flow import Flow
flow = Flow.from_client_secrets_file(
CONFIG_CLIENT_SECRETS_PATH,
scopes=scopes,
redirect_uri=CONFIG_REDIRECT_URI,
auto_handle_callback=True, # Server starts, browser opens
callback_function=_handle_redirect_for_token_exchange, # Receives (code, state)
port=CONFIG_PORT
)
logger.info(f"Auth flow started for user '{user_id}'. State: {state}. Advise user to visit: {auth_url}")
message = (
f"ACTION REQUIRED for user '{user_id}':\n"
f"1. You must instruct them to visit this URL to authorize access: {auth_url}\n"
f"2. If you have the ability to trigger browser use, open the window automatically. Complete the authorization.\n"
f"3. After successful authorization, please **RETRY** your original command.\n\n"
f"(OAuth callback server is listening on port {CONFIG_PORT} for the redirect)."
redirect_uri=CONFIG_REDIRECT_URI
)
return {
"status": "ok",
"data": {
"auth_required": True,
"auth_url": auth_url,
"message": message
},
"error": None
}
# Generate the authorization URL with offline access and consent prompt
auth_url, state = flow.authorization_url(
access_type='offline',
prompt='consent'
)
logger.info(f"Auth flow started for user '{user_id}'. State: {state}. Advise user to visit: {auth_url}")
# Return MCP-formatted response with auth URL
return types.CallToolResult(
content=[
types.TextContent(
type="text",
text=f"ACTION REQUIRED for user '{user_id}':"
),
types.LinkContent(
type="link",
url=auth_url,
display_text="Click here to authorize Google Calendar access"
),
types.TextContent(
type="text",
text="After successful authorization, please RETRY your original command."
)
]
)
except Exception as e:
error_message = f"Could not initiate authentication for user '{user_id}'. {str(e)}"
logger.error(f"Failed to start the OAuth flow: {e}", exc_info=True)
return {
"status": "error",
"data": None,
"error": {
"type": "auth_initialization_error",
"message": error_message
}
}
return types.CallToolResult(
content=[
types.ErrorContent(
type="error",
error_type="auth_initialization_error",
message=error_message
)
]
)
# --- Tool Implementations ---
@server.tool()
async def start_auth(user_id: str) -> Dict:
async def start_auth(user_id: str) -> types.CallToolResult:
"""
Starts the Google OAuth authentication process.
The user will be prompted to visit a URL and then retry their command.
@@ -138,10 +119,7 @@ async def start_auth(user_id: str) -> Dict:
user_id (str): The user identifier to authenticate
Returns:
A JSON envelope with:
- status: "ok" or "error"
- data: Contains auth_required (bool), auth_url, and message if status is "ok"
- error: Error details if status is "error"
A CallToolResult with LinkContent for authentication URL and TextContent for instructions
"""
logger.info(f"Tool 'start_auth' invoked for user: {user_id}")
# Define desired scopes for general authentication, including userinfo
@@ -154,7 +132,7 @@ async def start_auth(user_id: str) -> Dict:
@server.tool()
async def list_calendars(user_id: str) -> Dict:
async def list_calendars(user_id: str) -> types.CallToolResult:
"""
Lists the Google Calendars the user has access to.
If not authenticated, prompts the user to authenticate and retry.
@@ -163,12 +141,9 @@ async def list_calendars(user_id: str) -> Dict:
user_id (str): The user identifier to list calendars for
Returns:
A JSON envelope with:
- status: "ok" or "error"
- data: Contains calendars list if status is "ok"
- error: Error details if status is "error"
A CallToolResult with either JsonContent containing calendars or ErrorContent
"""
logger.info(f"Attempting to list calendars for user: {user_id}")
logger.info(f"[list_calendars] Tool invoked for user: {user_id}") # ADDED LOG
required_scopes = [CALENDAR_READONLY_SCOPE]
# If user_id is 'default', try to find existing credentials
@@ -184,6 +159,7 @@ async def list_calendars(user_id: str) -> Dict:
break
try:
logger.info(f"[list_calendars] Attempting to get_credentials for user_id: '{user_id}' with scopes: {required_scopes}") # ADDED LOG
credentials = await asyncio.to_thread(
get_credentials,
user_id,
@@ -193,17 +169,18 @@ async def list_calendars(user_id: str) -> Dict:
logger.debug(f"get_credentials returned: {credentials}")
except Exception as e:
logger.error(f"Error getting credentials for {user_id}: {e}", exc_info=True)
return {
"status": "error",
"data": None,
"error": {
"type": "credential_error",
"message": f"Failed to get credentials: {e}. You might need to authenticate using the 'start_auth' tool."
}
}
return types.CallToolResult(
content=[
types.ErrorContent(
type="error",
error_type="credential_error",
message=f"Failed to get credentials: {e}. You might need to authenticate using the 'start_auth' tool."
)
]
)
if not credentials or not credentials.valid:
logger.warning(f"Missing or invalid credentials for user '{user_id}' for list_calendars. Initiating auth.")
logger.warning(f"[list_calendars] Missing or invalid credentials for user '{user_id}'. Initiating auth with scopes: {required_scopes}") # MODIFIED LOG
return await _initiate_auth_and_get_message(user_id, required_scopes)
try:
@@ -225,34 +202,37 @@ async def list_calendars(user_id: str) -> Dict:
logger.info(f"Successfully listed {len(items)} calendars for user: {user_id}")
return {
"status": "ok",
"data": {
"calendars": calendars
},
"error": None
}
return types.CallToolResult(
content=[
types.JsonContent(
type="json",
json={"calendars": calendars}
)
]
)
except HttpError as error:
logger.error(f"An API error occurred for user {user_id} listing calendars: {error}", exc_info=True)
return {
"status": "error",
"data": None,
"error": {
"type": "api_error",
"message": f"An API error occurred: {error}. You might need to re-authenticate using 'start_auth'."
}
}
return types.CallToolResult(
content=[
types.ErrorContent(
type="error",
error_type="api_error",
message=f"An API error occurred: {error}. You might need to re-authenticate using 'start_auth'."
)
]
)
except Exception as e:
logger.exception(f"An unexpected error occurred while listing calendars for {user_id}: {e}")
return {
"status": "error",
"data": None,
"error": {
"type": "unexpected_error",
"message": f"An unexpected error occurred: {e}"
}
}
return types.CallToolResult(
content=[
types.ErrorContent(
type="error",
error_type="unexpected_error",
message=f"An unexpected error occurred: {e}"
)
]
)
@server.tool()
async def get_events(
@@ -261,7 +241,7 @@ async def get_events(
time_min: Optional[str] = None,
time_max: Optional[str] = None,
max_results: int = 25,
) -> Dict:
) -> types.CallToolResult:
"""
Lists events from a specified Google Calendar within a given time range.
If not authenticated, prompts the user to authenticate and retry.
@@ -274,15 +254,13 @@ async def get_events(
max_results (int): Maximum number of events to return (default: 25)
Returns:
A JSON envelope with:
- status: "ok" or "error"
- data: Contains events list if status is "ok"
- error: Error details if status is "error"
A CallToolResult with either JsonContent containing events or ErrorContent
"""
logger.info(f"Attempting to get events for user: {user_id}, calendar: {calendar_id}")
logger.info(f"[get_events] Tool invoked for user: {user_id}, calendar: {calendar_id}") # ADDED LOG
required_scopes = [CALENDAR_READONLY_SCOPE]
try:
logger.info(f"[get_events] Attempting to get_credentials for user_id: '{user_id}' with scopes: {required_scopes}") # ADDED LOG
credentials = await asyncio.to_thread(
get_credentials,
user_id,
@@ -292,17 +270,18 @@ async def get_events(
logger.debug(f"get_credentials returned: {credentials}")
except Exception as e:
logger.error(f"Error getting credentials for {user_id}: {e}", exc_info=True)
return {
"status": "error",
"data": None,
"error": {
"type": "credential_error",
"message": f"Failed to get credentials: {e}. You might need to authenticate using the 'start_auth' tool."
}
}
return types.CallToolResult(
content=[
types.ErrorContent(
type="error",
error_type="credential_error",
message=f"Failed to get credentials: {e}. You might need to authenticate using the 'start_auth' tool."
)
]
)
if not credentials or not credentials.valid:
logger.warning(f"Missing or invalid credentials for user '{user_id}' for get_events. Initiating auth.")
logger.warning(f"[get_events] Missing or invalid credentials for user '{user_id}'. Initiating auth with scopes: {required_scopes}") # MODIFIED LOG
return await _initiate_auth_and_get_message(user_id, required_scopes)
try:
@@ -340,36 +319,41 @@ async def get_events(
logger.info(f"Successfully retrieved {len(events)} events for user: {user_id}, calendar: {calendar_id}")
return {
"status": "ok",
"data": {
"calendar_id": calendar_id,
"events": parsed_events,
"event_count": len(parsed_events)
},
"error": None
}
return types.CallToolResult(
content=[
types.JsonContent(
type="json",
json={
"calendar_id": calendar_id,
"events": parsed_events,
"event_count": len(parsed_events)
}
)
]
)
except HttpError as error:
logger.error(f"An API error occurred for user {user_id} getting events: {error}", exc_info=True)
return {
"status": "error",
"data": None,
"error": {
"type": "api_error",
"message": f"An API error occurred while fetching events: {error}. You might need to re-authenticate using 'start_auth'."
}
}
return types.CallToolResult(
content=[
types.ErrorContent(
type="error",
error_type="api_error",
message=f"An API error occurred while fetching events: {error}. You might need to re-authenticate using 'start_auth'."
)
]
)
except Exception as e:
logger.exception(f"An unexpected error occurred while getting events for {user_id}: {e}")
return {
"status": "error",
"data": None,
"error": {
"type": "unexpected_error",
"message": f"An unexpected error occurred: {e}"
}
}
return types.CallToolResult(
content=[
types.ErrorContent(
type="error",
error_type="unexpected_error",
message=f"An unexpected error occurred: {e}"
)
]
)
@server.tool()
@@ -383,7 +367,7 @@ async def create_event(
location: Optional[str] = None,
attendees: Optional[List[str]] = None,
timezone: Optional[str] = None,
) -> Dict:
) -> types.CallToolResult:
"""
Creates a new event in a specified Google Calendar.
If not authenticated, prompts the user to authenticate and retry.
@@ -400,10 +384,7 @@ async def create_event(
timezone (Optional[str]): Timezone for the event
Returns:
A JSON envelope with:
- status: "ok" or "error"
- data: Contains created event details if status is "ok"
- error: Error details if status is "error"
A CallToolResult with either JsonContent containing created event details or ErrorContent
"""
logger.info(f"Attempting to create event for user: {user_id}, calendar: {calendar_id}")
required_scopes = [CALENDAR_EVENTS_SCOPE] # Write scope needed
@@ -418,14 +399,15 @@ async def create_event(
logger.debug(f"get_credentials returned: {credentials}")
except Exception as e:
logger.error(f"Error getting credentials for {user_id}: {e}", exc_info=True)
return {
"status": "error",
"data": None,
"error": {
"type": "credential_error",
"message": f"Failed to get credentials: {e}. You might need to authenticate using the 'start_auth' tool."
}
}
return types.CallToolResult(
content=[
types.ErrorContent(
type="error",
error_type="credential_error",
message=f"Failed to get credentials: {e}. You might need to authenticate using the 'start_auth' tool."
)
]
)
if not credentials or not credentials.valid:
logger.warning(f"Missing or invalid credentials for user '{user_id}' for create_event. Initiating auth.")
@@ -461,35 +443,44 @@ async def create_event(
logger.info(f"Successfully created event for user: {user_id}, event ID: {created_event['id']}")
return {
"status": "ok",
"data": {
"event_id": created_event['id'],
"html_link": created_event.get('htmlLink', ''),
"summary": created_event.get('summary', ''),
"calendar_id": calendar_id,
"created": created_event.get('created', '')
},
"error": None
}
return types.CallToolResult(
content=[
types.TextContent(
type="text",
text=f"Successfully created event '{created_event.get('summary', '')}'"
),
types.JsonContent(
type="json",
json={
"event_id": created_event['id'],
"html_link": created_event.get('htmlLink', ''),
"summary": created_event.get('summary', ''),
"calendar_id": calendar_id,
"created": created_event.get('created', '')
}
)
]
)
except HttpError as error:
logger.error(f"An API error occurred for user {user_id} creating event: {error}", exc_info=True)
return {
"status": "error",
"data": None,
"error": {
"type": "api_error",
"message": f"An API error occurred while creating the event: {error}. You might need to re-authenticate using 'start_auth'."
}
}
return types.CallToolResult(
content=[
types.ErrorContent(
type="error",
error_type="api_error",
message=f"An API error occurred while creating the event: {error}. You might need to re-authenticate using 'start_auth'."
)
]
)
except Exception as e:
logger.exception(f"An unexpected error occurred while creating event for {user_id}: {e}")
return {
"status": "error",
"data": None,
"error": {
"type": "unexpected_error",
"message": f"An unexpected error occurred: {e}"
}
}
return types.CallToolResult(
content=[
types.ErrorContent(
type="error",
error_type="unexpected_error",
message=f"An unexpected error occurred: {e}"
)
]
)