refac authentication handler

This commit is contained in:
Taylor Wilsdon
2025-06-06 18:51:34 -04:00
parent 100fb4399d
commit f143998ae7
7 changed files with 364 additions and 371 deletions

View File

@@ -13,7 +13,6 @@ from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from config.google_config import OAUTH_STATE_TO_SESSION_ID_MAP, SCOPES
from mcp import types
# Configure logging
logging.basicConfig(level=logging.INFO)
@@ -163,7 +162,7 @@ async def start_auth_flow(
user_google_email: Optional[str],
service_name: str, # e.g., "Google Calendar", "Gmail" for user messages
redirect_uri: str, # Added redirect_uri as a required parameter
) -> types.CallToolResult:
) -> str:
"""
Initiates the Google OAuth flow and returns an actionable message for the user.
@@ -174,7 +173,10 @@ async def start_auth_flow(
redirect_uri: The URI Google will redirect to after authorization.
Returns:
A CallToolResult with isError=True containing guidance for the LLM/user.
A formatted string containing guidance for the LLM/user.
Raises:
Exception: If the OAuth flow cannot be initiated.
"""
initial_email_provided = bool(user_google_email and user_google_email.strip() and user_google_email.lower() != 'default')
user_display_name = f"{service_name} for '{user_google_email}'" if initial_email_provided else service_name
@@ -222,20 +224,16 @@ async def start_auth_flow(
message_lines.append(f"2. After successful authorization{session_info_for_llm}, **retry their original command**.")
message_lines.append(f"\nThe application will use the new credentials. If '{user_google_email}' was provided, it must match the authenticated account.")
message = "\n".join(message_lines)
return "\n".join(message_lines)
return types.CallToolResult(
isError=True,
content=[types.TextContent(type="text", text=message)]
)
except FileNotFoundError as e:
error_text = f"OAuth client secrets file not found: {e}. Please ensure '{CONFIG_CLIENT_SECRETS_PATH}' is correctly configured."
logger.error(error_text, exc_info=True)
return types.CallToolResult(isError=True, content=[types.TextContent(type="text", text=error_text)])
raise Exception(error_text)
except Exception as e:
error_text = f"Could not initiate authentication for {user_display_name} due to an unexpected error: {str(e)}"
logger.error(f"Failed to start the OAuth flow for {user_display_name}: {e}", exc_info=True)
return types.CallToolResult(isError=True, content=[types.TextContent(type="text", text=error_text)])
raise Exception(error_text)
def handle_auth_callback(
client_secrets_path: str,
@@ -348,7 +346,6 @@ def get_credentials(
logger.debug(f"[get_credentials] Single-user mode: could not extract user email: {e}")
else:
credentials: Optional[Credentials] = None
loaded_from_session = False
# Session ID should be provided by the caller
if not session_id:
@@ -360,7 +357,6 @@ def get_credentials(
credentials = load_credentials_from_session(session_id)
if credentials:
logger.debug(f"[get_credentials] Loaded credentials from session for session_id '{session_id}'.")
loaded_from_session = True
if not credentials and user_google_email:
logger.debug(f"[get_credentials] No session credentials, trying file for user_google_email '{user_google_email}'.")
@@ -432,16 +428,23 @@ def get_user_info(credentials: Credentials) -> Optional[Dict[str, Any]]:
# --- Centralized Google Service Authentication ---
class GoogleAuthenticationError(Exception):
"""Exception raised when Google authentication is required or fails."""
def __init__(self, message: str, auth_url: Optional[str] = None):
super().__init__(message)
self.auth_url = auth_url
async def get_authenticated_google_service(
service_name: str, # "gmail", "calendar", "drive", "docs"
version: str, # "v1", "v3"
tool_name: str, # For logging/debugging
user_google_email: str, # Required - no more Optional
required_scopes: List[str],
) -> tuple[Any, str] | types.CallToolResult:
) -> tuple[Any, str]:
"""
Centralized Google service authentication for all MCP tools.
Returns (service, user_email) on success or CallToolResult on failure.
Returns (service, user_email) on success or raises GoogleAuthenticationError.
Args:
service_name: The Google service name ("gmail", "calendar", "drive", "docs")
@@ -451,7 +454,10 @@ async def get_authenticated_google_service(
required_scopes: List of required OAuth scopes
Returns:
tuple[service, user_email] on success, or CallToolResult on auth failure
tuple[service, user_email] on success
Raises:
GoogleAuthenticationError: When authentication is required or fails
"""
logger.info(
f"[{tool_name}] Attempting to get authenticated {service_name} service. Email: '{user_google_email}'"
@@ -461,9 +467,7 @@ async def get_authenticated_google_service(
if not user_google_email or "@" not in user_google_email:
error_msg = f"Authentication required for {tool_name}. No valid 'user_google_email' provided. Please provide a valid Google email address."
logger.info(f"[{tool_name}] {error_msg}")
return types.CallToolResult(
isError=True, content=[types.TextContent(type="text", text=error_msg)]
)
raise GoogleAuthenticationError(error_msg)
credentials = await asyncio.to_thread(
get_credentials,
@@ -484,13 +488,16 @@ async def get_authenticated_google_service(
# Import here to avoid circular import
from core.server import OAUTH_REDIRECT_URI
# This call will return a CallToolResult which should be propagated
return await start_auth_flow(
# Generate auth URL and raise exception with it
auth_response = await start_auth_flow(
mcp_session_id=None, # No longer using session-based auth
user_google_email=user_google_email,
service_name=f"Google {service_name.title()}",
redirect_uri=OAUTH_REDIRECT_URI,
)
# Extract the auth URL from the response and raise with it
raise GoogleAuthenticationError(auth_response)
try:
service = build(service_name, version, credentials=credentials)
@@ -515,19 +522,6 @@ async def get_authenticated_google_service(
except Exception as e:
error_msg = f"[{tool_name}] Failed to build {service_name} service: {str(e)}"
logger.error(error_msg, exc_info=True)
return types.CallToolResult(
isError=True, content=[types.TextContent(type="text", text=error_msg)]
)
raise GoogleAuthenticationError(error_msg)
# Example Usage (Illustrative - not meant to be run directly without context)
if __name__ == '__main__':
# This block is for demonstration/testing purposes only.
# Replace with actual paths and logic in your application.
_CLIENT_SECRETS_FILE = 'path/to/your/client_secrets.json' # IMPORTANT: Replace this
_SCOPES = ['https://www.googleapis.com/auth/userinfo.email', 'https://www.googleapis.com/auth/calendar.readonly']
_TEST_USER_ID = 'test.user@example.com' # Example user
# --- Flow Initiation Example ---
# In a real app, this URL would be presented to the user.
# try: