diff --git a/auth/google_auth.py b/auth/google_auth.py index 81680a5..b656a8f 100644 --- a/auth/google_auth.py +++ b/auth/google_auth.py @@ -19,27 +19,48 @@ from auth.scopes import OAUTH_STATE_TO_SESSION_ID_MAP, SCOPES logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) + # Constants -DEFAULT_CREDENTIALS_DIR = ".credentials" +def get_default_credentials_dir(): + """Get the default credentials directory path, preferring user-specific locations.""" + # Check for explicit environment variable override + if os.getenv("GOOGLE_MCP_CREDENTIALS_DIR"): + return os.getenv("GOOGLE_MCP_CREDENTIALS_DIR") + + # Use user home directory for credentials storage + home_dir = os.path.expanduser("~") + if home_dir and home_dir != "~": # Valid home directory found + return os.path.join(home_dir, ".google_workspace_mcp", "credentials") + + # Fallback to current working directory if home directory is not accessible + return os.path.join(os.getcwd(), ".credentials") + + +DEFAULT_CREDENTIALS_DIR = get_default_credentials_dir() # In-memory cache for session credentials, maps session_id to Credentials object # This is brittle and bad, but our options are limited with Claude in present state. # This should be more robust in a production system once OAuth2.1 is implemented in client. _SESSION_CREDENTIALS_CACHE: Dict[str, Credentials] = {} # Centralized Client Secrets Path Logic -_client_secrets_env = os.getenv("GOOGLE_CLIENT_SECRET_PATH") or os.getenv("GOOGLE_CLIENT_SECRETS") +_client_secrets_env = os.getenv("GOOGLE_CLIENT_SECRET_PATH") or os.getenv( + "GOOGLE_CLIENT_SECRETS" +) if _client_secrets_env: CONFIG_CLIENT_SECRETS_PATH = _client_secrets_env else: # Assumes this file is in auth/ and client_secret.json is in the root CONFIG_CLIENT_SECRETS_PATH = os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), - 'client_secret.json' + "client_secret.json", ) # --- Helper Functions --- -def _find_any_credentials(base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[Credentials]: + +def _find_any_credentials( + base_dir: str = DEFAULT_CREDENTIALS_DIR, +) -> Optional[Credentials]: """ Find and load any valid credentials from the credentials directory. Used in single-user mode to bypass session-to-OAuth mapping. @@ -53,104 +74,135 @@ def _find_any_credentials(base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[C # Scan for any .json credential files for filename in os.listdir(base_dir): - if filename.endswith('.json'): + if filename.endswith(".json"): filepath = os.path.join(base_dir, filename) try: - with open(filepath, 'r') as f: + with open(filepath, "r") as f: creds_data = json.load(f) credentials = Credentials( - token=creds_data.get('token'), - refresh_token=creds_data.get('refresh_token'), - token_uri=creds_data.get('token_uri'), - client_id=creds_data.get('client_id'), - client_secret=creds_data.get('client_secret'), - scopes=creds_data.get('scopes') + token=creds_data.get("token"), + refresh_token=creds_data.get("refresh_token"), + token_uri=creds_data.get("token_uri"), + client_id=creds_data.get("client_id"), + client_secret=creds_data.get("client_secret"), + scopes=creds_data.get("scopes"), ) logger.info(f"[single-user] Found credentials in {filepath}") return credentials except (IOError, json.JSONDecodeError, KeyError) as e: - logger.warning(f"[single-user] Error loading credentials from {filepath}: {e}") + logger.warning( + f"[single-user] Error loading credentials from {filepath}: {e}" + ) continue logger.info(f"[single-user] No valid credentials found in {base_dir}") return None -def _get_user_credential_path(user_google_email: str, base_dir: str = DEFAULT_CREDENTIALS_DIR) -> str: + +def _get_user_credential_path( + user_google_email: str, base_dir: str = DEFAULT_CREDENTIALS_DIR +) -> str: """Constructs the path to a user's credential file.""" if not os.path.exists(base_dir): os.makedirs(base_dir) logger.info(f"Created credentials directory: {base_dir}") return os.path.join(base_dir, f"{user_google_email}.json") -def save_credentials_to_file(user_google_email: str, credentials: Credentials, base_dir: str = DEFAULT_CREDENTIALS_DIR): + +def save_credentials_to_file( + user_google_email: str, + credentials: Credentials, + base_dir: str = DEFAULT_CREDENTIALS_DIR, +): """Saves user credentials to a file.""" creds_path = _get_user_credential_path(user_google_email, base_dir) creds_data = { - 'token': credentials.token, - 'refresh_token': credentials.refresh_token, - 'token_uri': credentials.token_uri, - 'client_id': credentials.client_id, - 'client_secret': credentials.client_secret, - 'scopes': credentials.scopes, - 'expiry': credentials.expiry.isoformat() if credentials.expiry else None + "token": credentials.token, + "refresh_token": credentials.refresh_token, + "token_uri": credentials.token_uri, + "client_id": credentials.client_id, + "client_secret": credentials.client_secret, + "scopes": credentials.scopes, + "expiry": credentials.expiry.isoformat() if credentials.expiry else None, } try: - with open(creds_path, 'w') as f: + with open(creds_path, "w") as f: json.dump(creds_data, f) logger.info(f"Credentials saved for user {user_google_email} to {creds_path}") except IOError as e: - logger.error(f"Error saving credentials for user {user_google_email} to {creds_path}: {e}") + logger.error( + f"Error saving credentials for user {user_google_email} to {creds_path}: {e}" + ) raise + def save_credentials_to_session(session_id: str, credentials: Credentials): """Saves user credentials to the in-memory session cache.""" _SESSION_CREDENTIALS_CACHE[session_id] = credentials logger.debug(f"Credentials saved to session cache for session_id: {session_id}") -def load_credentials_from_file(user_google_email: str, base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[Credentials]: + +def load_credentials_from_file( + user_google_email: str, base_dir: str = DEFAULT_CREDENTIALS_DIR +) -> Optional[Credentials]: """Loads user credentials from a file.""" creds_path = _get_user_credential_path(user_google_email, base_dir) if not os.path.exists(creds_path): - logger.info(f"No credentials file found for user {user_google_email} at {creds_path}") + logger.info( + f"No credentials file found for user {user_google_email} at {creds_path}" + ) return None try: - with open(creds_path, 'r') as f: + with open(creds_path, "r") as f: creds_data = json.load(f) # Parse expiry if present expiry = None - if creds_data.get('expiry'): + if creds_data.get("expiry"): try: from datetime import datetime - expiry = datetime.fromisoformat(creds_data['expiry']) + + expiry = datetime.fromisoformat(creds_data["expiry"]) except (ValueError, TypeError) as e: - logger.warning(f"Could not parse expiry time for {user_google_email}: {e}") + logger.warning( + f"Could not parse expiry time for {user_google_email}: {e}" + ) credentials = Credentials( - token=creds_data.get('token'), - refresh_token=creds_data.get('refresh_token'), - token_uri=creds_data.get('token_uri'), - client_id=creds_data.get('client_id'), - client_secret=creds_data.get('client_secret'), - scopes=creds_data.get('scopes'), - expiry=expiry + token=creds_data.get("token"), + refresh_token=creds_data.get("refresh_token"), + token_uri=creds_data.get("token_uri"), + client_id=creds_data.get("client_id"), + client_secret=creds_data.get("client_secret"), + scopes=creds_data.get("scopes"), + expiry=expiry, + ) + logger.debug( + f"Credentials loaded for user {user_google_email} from {creds_path}" ) - logger.debug(f"Credentials loaded for user {user_google_email} from {creds_path}") return credentials except (IOError, json.JSONDecodeError, KeyError) as e: - logger.error(f"Error loading or parsing credentials for user {user_google_email} from {creds_path}: {e}") + logger.error( + f"Error loading or parsing credentials for user {user_google_email} from {creds_path}: {e}" + ) return None + def load_credentials_from_session(session_id: str) -> Optional[Credentials]: """Loads user credentials from the in-memory session cache.""" credentials = _SESSION_CREDENTIALS_CACHE.get(session_id) if credentials: - logger.debug(f"Credentials loaded from session cache for session_id: {session_id}") + logger.debug( + f"Credentials loaded from session cache for session_id: {session_id}" + ) else: - logger.debug(f"No credentials found in session cache for session_id: {session_id}") + logger.debug( + f"No credentials found in session cache for session_id: {session_id}" + ) return credentials + def load_client_secrets_from_env() -> Optional[Dict[str, Any]]: """ Loads the client secrets from environment variables. @@ -175,7 +227,7 @@ def load_client_secrets_from_env() -> Optional[Dict[str, Any]]: "client_secret": client_secret, "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", - "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs" + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", } # Add redirect_uri if provided via environment variable @@ -191,6 +243,7 @@ def load_client_secrets_from_env() -> Optional[Dict[str, Any]]: logger.debug("OAuth client credentials not found in environment variables") return None + def load_client_secrets(client_secrets_path: str) -> Dict[str, Any]: """ Loads the client secrets from environment variables (preferred) or from the client secrets file. @@ -217,21 +270,29 @@ def load_client_secrets(client_secrets_path: str) -> Dict[str, Any]: # Fall back to loading from file try: - with open(client_secrets_path, 'r') as f: + with open(client_secrets_path, "r") as f: client_config = json.load(f) # The file usually contains a top-level key like "web" or "installed" if "web" in client_config: - logger.info(f"Loaded OAuth client credentials from file: {client_secrets_path}") + logger.info( + f"Loaded OAuth client credentials from file: {client_secrets_path}" + ) return client_config["web"] elif "installed" in client_config: - logger.info(f"Loaded OAuth client credentials from file: {client_secrets_path}") + logger.info( + f"Loaded OAuth client credentials from file: {client_secrets_path}" + ) return client_config["installed"] else: - logger.error(f"Client secrets file {client_secrets_path} has unexpected format.") - raise ValueError("Invalid client secrets file format") + logger.error( + f"Client secrets file {client_secrets_path} has unexpected format." + ) + raise ValueError("Invalid client secrets file format") except (IOError, json.JSONDecodeError) as e: logger.error(f"Error loading client secrets file {client_secrets_path}: {e}") raise + + def check_client_secrets() -> Optional[str]: """ Checks for the presence of OAuth client secrets, either as environment @@ -242,45 +303,53 @@ def check_client_secrets() -> Optional[str]: """ env_config = load_client_secrets_from_env() if not env_config and not os.path.exists(CONFIG_CLIENT_SECRETS_PATH): - logger.error(f"OAuth client credentials not found. No environment variables set and no file at {CONFIG_CLIENT_SECRETS_PATH}") + logger.error( + f"OAuth client credentials not found. No environment variables set and no file at {CONFIG_CLIENT_SECRETS_PATH}" + ) return f"OAuth client credentials not found. Please set GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET environment variables or provide a client secrets file at {CONFIG_CLIENT_SECRETS_PATH}." return None -def create_oauth_flow(scopes: List[str], redirect_uri: str, state: Optional[str] = None) -> Flow: + +def create_oauth_flow( + scopes: List[str], redirect_uri: str, state: Optional[str] = None +) -> Flow: """Creates an OAuth flow using environment variables or client secrets file.""" # Try environment variables first env_config = load_client_secrets_from_env() if env_config: # Use client config directly flow = Flow.from_client_config( - env_config, - scopes=scopes, - redirect_uri=redirect_uri, - state=state + env_config, scopes=scopes, redirect_uri=redirect_uri, state=state ) logger.debug("Created OAuth flow from environment variables") return flow # Fall back to file-based config if not os.path.exists(CONFIG_CLIENT_SECRETS_PATH): - raise FileNotFoundError(f"OAuth client secrets file not found at {CONFIG_CLIENT_SECRETS_PATH} and no environment variables set") + raise FileNotFoundError( + f"OAuth client secrets file not found at {CONFIG_CLIENT_SECRETS_PATH} and no environment variables set" + ) flow = Flow.from_client_secrets_file( CONFIG_CLIENT_SECRETS_PATH, scopes=scopes, redirect_uri=redirect_uri, - state=state + state=state, + ) + logger.debug( + f"Created OAuth flow from client secrets file: {CONFIG_CLIENT_SECRETS_PATH}" ) - logger.debug(f"Created OAuth flow from client secrets file: {CONFIG_CLIENT_SECRETS_PATH}") return flow + # --- Core OAuth Logic --- + async def start_auth_flow( mcp_session_id: Optional[str], user_google_email: Optional[str], - service_name: str, # e.g., "Google Calendar", "Gmail" for user messages - redirect_uri: str, # Added redirect_uri as a required parameter + service_name: str, # e.g., "Google Calendar", "Gmail" for user messages + redirect_uri: str, # Added redirect_uri as a required parameter ) -> str: """ Initiates the Google OAuth flow and returns an actionable message for the user. @@ -297,29 +366,47 @@ async def start_auth_flow( Raises: Exception: If the OAuth flow cannot be initiated. """ - initial_email_provided = bool(user_google_email and user_google_email.strip() and user_google_email.lower() != 'default') - user_display_name = f"{service_name} for '{user_google_email}'" if initial_email_provided else service_name + initial_email_provided = bool( + user_google_email + and user_google_email.strip() + and user_google_email.lower() != "default" + ) + user_display_name = ( + f"{service_name} for '{user_google_email}'" + if initial_email_provided + else service_name + ) - logger.info(f"[start_auth_flow] Initiating auth for {user_display_name} (session: {mcp_session_id}) with global SCOPES.") + logger.info( + f"[start_auth_flow] Initiating auth for {user_display_name} (session: {mcp_session_id}) with global SCOPES." + ) try: - if 'OAUTHLIB_INSECURE_TRANSPORT' not in os.environ and ("localhost" in redirect_uri or "127.0.0.1" in redirect_uri): # Use passed redirect_uri - logger.warning("OAUTHLIB_INSECURE_TRANSPORT not set. Setting it for localhost/local development.") - os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' + if "OAUTHLIB_INSECURE_TRANSPORT" not in os.environ and ( + "localhost" in redirect_uri or "127.0.0.1" in redirect_uri + ): # Use passed redirect_uri + logger.warning( + "OAUTHLIB_INSECURE_TRANSPORT not set. Setting it for localhost/local development." + ) + os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1" oauth_state = os.urandom(16).hex() if mcp_session_id: OAUTH_STATE_TO_SESSION_ID_MAP[oauth_state] = mcp_session_id - logger.info(f"[start_auth_flow] Stored mcp_session_id '{mcp_session_id}' for oauth_state '{oauth_state}'.") + logger.info( + f"[start_auth_flow] Stored mcp_session_id '{mcp_session_id}' for oauth_state '{oauth_state}'." + ) flow = create_oauth_flow( - scopes=SCOPES, # Use global SCOPES - redirect_uri=redirect_uri, # Use passed redirect_uri - state=oauth_state + scopes=SCOPES, # Use global SCOPES + redirect_uri=redirect_uri, # Use passed redirect_uri + state=oauth_state, ) - auth_url, _ = flow.authorization_url(access_type='offline', prompt='consent') - logger.info(f"Auth flow started for {user_display_name}. State: {oauth_state}. Advise user to visit: {auth_url}") + auth_url, _ = flow.authorization_url(access_type="offline", prompt="consent") + logger.info( + f"Auth flow started for {user_display_name}. State: {oauth_state}. Advise user to visit: {auth_url}" + ) message_lines = [ f"**ACTION REQUIRED: Google Authentication Needed for {user_display_name}**\n", @@ -330,18 +417,28 @@ async def start_auth_flow( "**LLM, after presenting the link, instruct the user as follows:**", "1. Click the link and complete the authorization in their browser.", ] - session_info_for_llm = f" (this will link to your current session {mcp_session_id})" if mcp_session_id else "" + session_info_for_llm = ( + f" (this will link to your current session {mcp_session_id})" + if mcp_session_id + else "" + ) if not initial_email_provided: - message_lines.extend([ - f"2. After successful authorization{session_info_for_llm}, the browser page will display the authenticated email address.", - " **LLM: Instruct the user to provide you with this email address.**", - "3. Once you have the email, **retry their original command, ensuring you include this `user_google_email`.**" - ]) + message_lines.extend( + [ + f"2. After successful authorization{session_info_for_llm}, the browser page will display the authenticated email address.", + " **LLM: Instruct the user to provide you with this email address.**", + "3. Once you have the email, **retry their original command, ensuring you include this `user_google_email`.**", + ] + ) else: - message_lines.append(f"2. After successful authorization{session_info_for_llm}, **retry their original command**.") + message_lines.append( + f"2. After successful authorization{session_info_for_llm}, **retry their original command**." + ) - message_lines.append(f"\nThe application will use the new credentials. If '{user_google_email}' was provided, it must match the authenticated account.") + message_lines.append( + f"\nThe application will use the new credentials. If '{user_google_email}' was provided, it must match the authenticated account." + ) return "\n".join(message_lines) except FileNotFoundError as e: @@ -350,16 +447,22 @@ async def start_auth_flow( raise Exception(error_text) except Exception as e: error_text = f"Could not initiate authentication for {user_display_name} due to an unexpected error: {str(e)}" - logger.error(f"Failed to start the OAuth flow for {user_display_name}: {e}", exc_info=True) + logger.error( + f"Failed to start the OAuth flow for {user_display_name}: {e}", + exc_info=True, + ) raise Exception(error_text) + def handle_auth_callback( scopes: List[str], authorization_response: str, redirect_uri: str, credentials_base_dir: str = DEFAULT_CREDENTIALS_DIR, session_id: Optional[str] = None, - client_secrets_path: Optional[str] = None # Deprecated: kept for backward compatibility + client_secrets_path: Optional[ + str + ] = None, # Deprecated: kept for backward compatibility ) -> Tuple[str, Credentials]: """ Handles the callback from Google, exchanges the code for credentials, @@ -385,17 +488,18 @@ def handle_auth_callback( try: # Log deprecation warning if old parameter is used if client_secrets_path: - logger.warning("The 'client_secrets_path' parameter is deprecated. Use GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET environment variables instead.") + logger.warning( + "The 'client_secrets_path' parameter is deprecated. Use GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET environment variables instead." + ) # Allow HTTP for localhost in development - if 'OAUTHLIB_INSECURE_TRANSPORT' not in os.environ: - logger.warning("OAUTHLIB_INSECURE_TRANSPORT not set. Setting it for localhost development.") - os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' + if "OAUTHLIB_INSECURE_TRANSPORT" not in os.environ: + logger.warning( + "OAUTHLIB_INSECURE_TRANSPORT not set. Setting it for localhost development." + ) + os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1" - flow = create_oauth_flow( - scopes=scopes, - redirect_uri=redirect_uri - ) + flow = create_oauth_flow(scopes=scopes, redirect_uri=redirect_uri) # Exchange the authorization code for credentials # Note: fetch_token will use the redirect_uri configured in the flow @@ -405,11 +509,11 @@ def handle_auth_callback( # Get user info to determine user_id (using email here) user_info = get_user_info(credentials) - if not user_info or 'email' not in user_info: - logger.error("Could not retrieve user email from Google.") - raise ValueError("Failed to get user email for identification.") + if not user_info or "email" not in user_info: + logger.error("Could not retrieve user email from Google.") + raise ValueError("Failed to get user email for identification.") - user_google_email = user_info['email'] + user_google_email = user_info["email"] logger.info(f"Identified user_google_email: {user_google_email}") # Save the credentials to file @@ -421,16 +525,17 @@ def handle_auth_callback( return user_google_email, credentials - except Exception as e: # Catch specific exceptions like FlowExchangeError if needed + except Exception as e: # Catch specific exceptions like FlowExchangeError if needed logger.error(f"Error handling auth callback: {e}") - raise # Re-raise for the caller + raise # Re-raise for the caller + def get_credentials( - user_google_email: Optional[str], # Can be None if relying on session_id + user_google_email: Optional[str], # Can be None if relying on session_id required_scopes: List[str], client_secrets_path: Optional[str] = None, credentials_base_dir: str = DEFAULT_CREDENTIALS_DIR, - session_id: Optional[str] = None + session_id: Optional[str] = None, ) -> Optional[Credentials]: """ Retrieves stored credentials, prioritizing session, then file. Refreshes if necessary. @@ -448,11 +553,15 @@ def get_credentials( Valid Credentials object or None. """ # Check for single-user mode - if os.getenv('MCP_SINGLE_USER_MODE') == '1': - logger.info(f"[get_credentials] Single-user mode: bypassing session mapping, finding any credentials") + if os.getenv("MCP_SINGLE_USER_MODE") == "1": + logger.info( + f"[get_credentials] Single-user mode: bypassing session mapping, finding any credentials" + ) credentials = _find_any_credentials(credentials_base_dir) if not credentials: - logger.info(f"[get_credentials] Single-user mode: No credentials found in {credentials_base_dir}") + logger.info( + f"[get_credentials] Single-user mode: No credentials found in {credentials_base_dir}" + ) return None # In single-user mode, if user_google_email wasn't provided, try to get it from user info @@ -460,11 +569,15 @@ def get_credentials( if not user_google_email and credentials.valid: try: user_info = get_user_info(credentials) - if user_info and 'email' in user_info: - user_google_email = user_info['email'] - logger.debug(f"[get_credentials] Single-user mode: extracted user email {user_google_email} from credentials") + if user_info and "email" in user_info: + user_google_email = user_info["email"] + logger.debug( + f"[get_credentials] Single-user mode: extracted user email {user_google_email} from credentials" + ) except Exception as e: - logger.debug(f"[get_credentials] Single-user mode: could not extract user email: {e}") + logger.debug( + f"[get_credentials] Single-user mode: could not extract user email: {e}" + ) else: credentials: Optional[Credentials] = None @@ -472,61 +585,100 @@ def get_credentials( if not session_id: logger.debug("[get_credentials] No session_id provided") - logger.debug(f"[get_credentials] Called for user_google_email: '{user_google_email}', session_id: '{session_id}', required_scopes: {required_scopes}") + logger.debug( + f"[get_credentials] Called for user_google_email: '{user_google_email}', session_id: '{session_id}', required_scopes: {required_scopes}" + ) if session_id: credentials = load_credentials_from_session(session_id) if credentials: - logger.debug(f"[get_credentials] Loaded credentials from session for session_id '{session_id}'.") + logger.debug( + f"[get_credentials] Loaded credentials from session for session_id '{session_id}'." + ) if not credentials and user_google_email: - logger.debug(f"[get_credentials] No session credentials, trying file for user_google_email '{user_google_email}'.") - credentials = load_credentials_from_file(user_google_email, credentials_base_dir) + logger.debug( + f"[get_credentials] No session credentials, trying file for user_google_email '{user_google_email}'." + ) + credentials = load_credentials_from_file( + user_google_email, credentials_base_dir + ) if credentials and session_id: - logger.debug(f"[get_credentials] Loaded from file for user '{user_google_email}', caching to session '{session_id}'.") - save_credentials_to_session(session_id, credentials) # Cache for current session + logger.debug( + f"[get_credentials] Loaded from file for user '{user_google_email}', caching to session '{session_id}'." + ) + save_credentials_to_session( + session_id, credentials + ) # Cache for current session if not credentials: - logger.info(f"[get_credentials] No credentials found for user '{user_google_email}' or session '{session_id}'.") + logger.info( + f"[get_credentials] No credentials found for user '{user_google_email}' or session '{session_id}'." + ) return None - logger.debug(f"[get_credentials] Credentials found. Scopes: {credentials.scopes}, Valid: {credentials.valid}, Expired: {credentials.expired}") + logger.debug( + f"[get_credentials] Credentials found. Scopes: {credentials.scopes}, Valid: {credentials.valid}, Expired: {credentials.expired}" + ) if not all(scope in credentials.scopes for scope in required_scopes): - logger.warning(f"[get_credentials] Credentials lack required scopes. Need: {required_scopes}, Have: {credentials.scopes}. User: '{user_google_email}', Session: '{session_id}'") - return None # Re-authentication needed for scopes + logger.warning( + f"[get_credentials] Credentials lack required scopes. Need: {required_scopes}, Have: {credentials.scopes}. User: '{user_google_email}', Session: '{session_id}'" + ) + return None # Re-authentication needed for scopes - logger.debug(f"[get_credentials] Credentials have sufficient scopes. User: '{user_google_email}', Session: '{session_id}'") + logger.debug( + f"[get_credentials] Credentials have sufficient scopes. User: '{user_google_email}', Session: '{session_id}'" + ) if credentials.valid: - logger.debug(f"[get_credentials] Credentials are valid. User: '{user_google_email}', Session: '{session_id}'") + logger.debug( + f"[get_credentials] Credentials are valid. User: '{user_google_email}', Session: '{session_id}'" + ) return credentials elif credentials.expired and credentials.refresh_token: - logger.info(f"[get_credentials] Credentials expired. Attempting refresh. User: '{user_google_email}', Session: '{session_id}'") + logger.info( + f"[get_credentials] Credentials expired. Attempting refresh. User: '{user_google_email}', Session: '{session_id}'" + ) if not client_secrets_path: - logger.error("[get_credentials] Client secrets path required for refresh but not provided.") - return None + logger.error( + "[get_credentials] Client secrets path required for refresh but not provided." + ) + return None try: - logger.debug(f"[get_credentials] Refreshing token using client_secrets_path: {client_secrets_path}") + logger.debug( + f"[get_credentials] Refreshing token using client_secrets_path: {client_secrets_path}" + ) # client_config = load_client_secrets(client_secrets_path) # Not strictly needed if creds have client_id/secret credentials.refresh(Request()) - logger.info(f"[get_credentials] Credentials refreshed successfully. User: '{user_google_email}', Session: '{session_id}'") + logger.info( + f"[get_credentials] Credentials refreshed successfully. User: '{user_google_email}', Session: '{session_id}'" + ) # Save refreshed credentials - if user_google_email: # Always save to file if email is known - save_credentials_to_file(user_google_email, credentials, credentials_base_dir) - if session_id: # Update session cache if it was the source or is active + if user_google_email: # Always save to file if email is known + save_credentials_to_file( + user_google_email, credentials, credentials_base_dir + ) + if session_id: # Update session cache if it was the source or is active save_credentials_to_session(session_id, credentials) return credentials except RefreshError as e: - logger.warning(f"[get_credentials] RefreshError - token expired/revoked: {e}. User: '{user_google_email}', Session: '{session_id}'") + logger.warning( + f"[get_credentials] RefreshError - token expired/revoked: {e}. User: '{user_google_email}', Session: '{session_id}'" + ) # For RefreshError, we should return None to trigger reauthentication return None except Exception as e: - logger.error(f"[get_credentials] Error refreshing credentials: {e}. User: '{user_google_email}', Session: '{session_id}'", exc_info=True) - return None # Failed to refresh + logger.error( + f"[get_credentials] Error refreshing credentials: {e}. User: '{user_google_email}', Session: '{session_id}'", + exc_info=True, + ) + return None # Failed to refresh else: - logger.warning(f"[get_credentials] Credentials invalid/cannot refresh. Valid: {credentials.valid}, Refresh Token: {credentials.refresh_token is not None}. User: '{user_google_email}', Session: '{session_id}'") + logger.warning( + f"[get_credentials] Credentials invalid/cannot refresh. Valid: {credentials.valid}, Refresh Token: {credentials.refresh_token is not None}. User: '{user_google_email}', Session: '{session_id}'" + ) return None @@ -538,7 +690,7 @@ def get_user_info(credentials: Credentials) -> Optional[Dict[str, Any]]: try: # Using googleapiclient discovery to get user info # Requires 'google-api-python-client' library - service = build('oauth2', 'v2', credentials=credentials) + service = build("oauth2", "v2", credentials=credentials) user_info = service.userinfo().get().execute() logger.info(f"Successfully fetched user info: {user_info.get('email')}") return user_info @@ -553,18 +705,20 @@ def get_user_info(credentials: Credentials) -> Optional[Dict[str, Any]]: # --- Centralized Google Service Authentication --- + class GoogleAuthenticationError(Exception): """Exception raised when Google authentication is required or fails.""" + def __init__(self, message: str, auth_url: Optional[str] = None): super().__init__(message) self.auth_url = auth_url async def get_authenticated_google_service( - service_name: str, # "gmail", "calendar", "drive", "docs" - version: str, # "v1", "v3" - tool_name: str, # For logging/debugging - user_google_email: str, # Required - no more Optional + service_name: str, # "gmail", "calendar", "drive", "docs" + version: str, # "v1", "v3" + tool_name: str, # For logging/debugging + user_google_email: str, # Required - no more Optional required_scopes: List[str], ) -> tuple[Any, str]: """ @@ -602,7 +756,6 @@ async def get_authenticated_google_service( session_id=None, # Session ID not available in service layer ) - if not credentials or not credentials.valid: logger.warning( f"[{tool_name}] No valid credentials. Email: '{user_google_email}'." @@ -637,8 +790,11 @@ async def get_authenticated_google_service( if credentials and credentials.id_token: try: import jwt + # Decode without verification (just to get email for logging) - decoded_token = jwt.decode(credentials.id_token, options={"verify_signature": False}) + decoded_token = jwt.decode( + credentials.id_token, options={"verify_signature": False} + ) token_email = decoded_token.get("email") if token_email: log_user_email = token_email @@ -646,12 +802,12 @@ async def get_authenticated_google_service( except Exception as e: logger.debug(f"[{tool_name}] Could not decode id_token: {e}") - logger.info(f"[{tool_name}] Successfully authenticated {service_name} service for user: {log_user_email}") + logger.info( + f"[{tool_name}] Successfully authenticated {service_name} service for user: {log_user_email}" + ) return service, log_user_email except Exception as e: error_msg = f"[{tool_name}] Failed to build {service_name} service: {str(e)}" logger.error(error_msg, exc_info=True) raise GoogleAuthenticationError(error_msg) - - diff --git a/core/utils.py b/core/utils.py index fe1f7b4..0928278 100644 --- a/core/utils.py +++ b/core/utils.py @@ -8,17 +8,21 @@ from typing import List, Optional logger = logging.getLogger(__name__) -def check_credentials_directory_permissions(credentials_dir: str = ".credentials") -> None: +def check_credentials_directory_permissions(credentials_dir: str = None) -> None: """ Check if the service has appropriate permissions to create and write to the .credentials directory. Args: - credentials_dir: Path to the credentials directory (default: ".credentials") + credentials_dir: Path to the credentials directory (default: uses get_default_credentials_dir()) Raises: PermissionError: If the service lacks necessary permissions OSError: If there are other file system issues """ + if credentials_dir is None: + from auth.google_auth import get_default_credentials_dir + credentials_dir = get_default_credentials_dir() + try: # Check if directory exists if os.path.exists(credentials_dir): diff --git a/main.py b/main.py index adc8f10..d2e56e3 100644 --- a/main.py +++ b/main.py @@ -34,6 +34,13 @@ except Exception as e: sys.stderr.write(f"CRITICAL: Failed to set up file logging to '{log_file_path}': {e}\n") def safe_print(text): + # Don't print to stderr when running as MCP server via uvx to avoid JSON parsing errors + # Check if we're running as MCP server (no TTY and uvx in process name) + if not sys.stderr.isatty(): + # Running as MCP server, suppress output to avoid JSON parsing errors + logger.debug(f"[MCP Server] {text}") + return + try: print(text, file=sys.stderr) except UnicodeEncodeError: @@ -47,7 +54,7 @@ def main(): # Parse command line arguments parser = argparse.ArgumentParser(description='Google Workspace MCP Server') parser.add_argument('--single-user', action='store_true', - help='Run in single-user mode - bypass session mapping and use any credentials from ./credentials directory') + help='Run in single-user mode - bypass session mapping and use any credentials from the credentials directory') parser.add_argument('--tools', nargs='*', choices=['gmail', 'drive', 'calendar', 'docs', 'sheets', 'chat', 'forms', 'slides'], help='Specify which tools to register. If not provided, all tools are registered.') @@ -73,7 +80,7 @@ def main(): safe_print(f" 🔐 OAuth Callback: {base_uri}:{port}/oauth2callback") safe_print(f" 👤 Mode: {'Single-user' if args.single_user else 'Multi-user'}") safe_print(f" 🐍 Python: {sys.version.split()[0]}") - print(file=sys.stderr) + safe_print("") # Import tool modules to register them with the MCP server via decorators tool_imports = { @@ -104,29 +111,29 @@ def main(): for tool in tools_to_import: tool_imports[tool]() safe_print(f" {tool_icons[tool]} {tool.title()} - Google {tool.title()} API integration") - print(file=sys.stderr) + safe_print("") safe_print(f"📊 Configuration Summary:") safe_print(f" 🔧 Tools Enabled: {len(tools_to_import)}/{len(tool_imports)}") safe_print(f" 🔑 Auth Method: OAuth 2.0 with PKCE") safe_print(f" 📝 Log Level: {logging.getLogger().getEffectiveLevel()}") - print(file=sys.stderr) + safe_print("") # Set global single-user mode flag if args.single_user: os.environ['MCP_SINGLE_USER_MODE'] = '1' safe_print("🔐 Single-user mode enabled") - print(file=sys.stderr) + safe_print("") # Check credentials directory permissions before starting try: safe_print("🔍 Checking credentials directory permissions...") check_credentials_directory_permissions() safe_print("✅ Credentials directory permissions verified") - print(file=sys.stderr) + safe_print("") except (PermissionError, OSError) as e: safe_print(f"❌ Credentials directory permission check failed: {e}") - print(" Please ensure the service has write permissions to create/access the .credentials directory", file=sys.stderr) + safe_print(" Please ensure the service has write permissions to create/access the credentials directory") logger.error(f"Failed credentials directory permission check: {e}") sys.exit(1) @@ -141,12 +148,12 @@ def main(): # Start minimal OAuth callback server for stdio mode from auth.oauth_callback_server import ensure_oauth_callback_available if ensure_oauth_callback_available('stdio', port, base_uri): - print(f" OAuth callback server started on {base_uri}:{port}/oauth2callback", file=sys.stderr) + safe_print(f" OAuth callback server started on {base_uri}:{port}/oauth2callback") else: safe_print(" ⚠️ Warning: Failed to start OAuth callback server") - print(" Ready for MCP connections!", file=sys.stderr) - print(file=sys.stderr) + safe_print(" Ready for MCP connections!") + safe_print("") if args.transport == 'streamable-http': # The server is already configured with port and server_url in core/server.py