Update stdio startup and credentails path for easier claude desktop autoinstall

This commit is contained in:
Taylor Wilsdon
2025-07-04 16:10:01 -04:00
parent f4b8e0452b
commit 6f48a04545
3 changed files with 316 additions and 149 deletions

View File

@@ -19,27 +19,48 @@ from auth.scopes import OAUTH_STATE_TO_SESSION_ID_MAP, SCOPES
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Constants # Constants
DEFAULT_CREDENTIALS_DIR = ".credentials" def get_default_credentials_dir():
"""Get the default credentials directory path, preferring user-specific locations."""
# Check for explicit environment variable override
if os.getenv("GOOGLE_MCP_CREDENTIALS_DIR"):
return os.getenv("GOOGLE_MCP_CREDENTIALS_DIR")
# Use user home directory for credentials storage
home_dir = os.path.expanduser("~")
if home_dir and home_dir != "~": # Valid home directory found
return os.path.join(home_dir, ".google_workspace_mcp", "credentials")
# Fallback to current working directory if home directory is not accessible
return os.path.join(os.getcwd(), ".credentials")
DEFAULT_CREDENTIALS_DIR = get_default_credentials_dir()
# In-memory cache for session credentials, maps session_id to Credentials object # In-memory cache for session credentials, maps session_id to Credentials object
# This is brittle and bad, but our options are limited with Claude in present state. # This is brittle and bad, but our options are limited with Claude in present state.
# This should be more robust in a production system once OAuth2.1 is implemented in client. # This should be more robust in a production system once OAuth2.1 is implemented in client.
_SESSION_CREDENTIALS_CACHE: Dict[str, Credentials] = {} _SESSION_CREDENTIALS_CACHE: Dict[str, Credentials] = {}
# Centralized Client Secrets Path Logic # Centralized Client Secrets Path Logic
_client_secrets_env = os.getenv("GOOGLE_CLIENT_SECRET_PATH") or os.getenv("GOOGLE_CLIENT_SECRETS") _client_secrets_env = os.getenv("GOOGLE_CLIENT_SECRET_PATH") or os.getenv(
"GOOGLE_CLIENT_SECRETS"
)
if _client_secrets_env: if _client_secrets_env:
CONFIG_CLIENT_SECRETS_PATH = _client_secrets_env CONFIG_CLIENT_SECRETS_PATH = _client_secrets_env
else: else:
# Assumes this file is in auth/ and client_secret.json is in the root # Assumes this file is in auth/ and client_secret.json is in the root
CONFIG_CLIENT_SECRETS_PATH = os.path.join( CONFIG_CLIENT_SECRETS_PATH = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))), os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'client_secret.json' "client_secret.json",
) )
# --- Helper Functions --- # --- Helper Functions ---
def _find_any_credentials(base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[Credentials]:
def _find_any_credentials(
base_dir: str = DEFAULT_CREDENTIALS_DIR,
) -> Optional[Credentials]:
""" """
Find and load any valid credentials from the credentials directory. Find and load any valid credentials from the credentials directory.
Used in single-user mode to bypass session-to-OAuth mapping. Used in single-user mode to bypass session-to-OAuth mapping.
@@ -53,104 +74,135 @@ def _find_any_credentials(base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[C
# Scan for any .json credential files # Scan for any .json credential files
for filename in os.listdir(base_dir): for filename in os.listdir(base_dir):
if filename.endswith('.json'): if filename.endswith(".json"):
filepath = os.path.join(base_dir, filename) filepath = os.path.join(base_dir, filename)
try: try:
with open(filepath, 'r') as f: with open(filepath, "r") as f:
creds_data = json.load(f) creds_data = json.load(f)
credentials = Credentials( credentials = Credentials(
token=creds_data.get('token'), token=creds_data.get("token"),
refresh_token=creds_data.get('refresh_token'), refresh_token=creds_data.get("refresh_token"),
token_uri=creds_data.get('token_uri'), token_uri=creds_data.get("token_uri"),
client_id=creds_data.get('client_id'), client_id=creds_data.get("client_id"),
client_secret=creds_data.get('client_secret'), client_secret=creds_data.get("client_secret"),
scopes=creds_data.get('scopes') scopes=creds_data.get("scopes"),
) )
logger.info(f"[single-user] Found credentials in {filepath}") logger.info(f"[single-user] Found credentials in {filepath}")
return credentials return credentials
except (IOError, json.JSONDecodeError, KeyError) as e: except (IOError, json.JSONDecodeError, KeyError) as e:
logger.warning(f"[single-user] Error loading credentials from {filepath}: {e}") logger.warning(
f"[single-user] Error loading credentials from {filepath}: {e}"
)
continue continue
logger.info(f"[single-user] No valid credentials found in {base_dir}") logger.info(f"[single-user] No valid credentials found in {base_dir}")
return None return None
def _get_user_credential_path(user_google_email: str, base_dir: str = DEFAULT_CREDENTIALS_DIR) -> str:
def _get_user_credential_path(
user_google_email: str, base_dir: str = DEFAULT_CREDENTIALS_DIR
) -> str:
"""Constructs the path to a user's credential file.""" """Constructs the path to a user's credential file."""
if not os.path.exists(base_dir): if not os.path.exists(base_dir):
os.makedirs(base_dir) os.makedirs(base_dir)
logger.info(f"Created credentials directory: {base_dir}") logger.info(f"Created credentials directory: {base_dir}")
return os.path.join(base_dir, f"{user_google_email}.json") return os.path.join(base_dir, f"{user_google_email}.json")
def save_credentials_to_file(user_google_email: str, credentials: Credentials, base_dir: str = DEFAULT_CREDENTIALS_DIR):
def save_credentials_to_file(
user_google_email: str,
credentials: Credentials,
base_dir: str = DEFAULT_CREDENTIALS_DIR,
):
"""Saves user credentials to a file.""" """Saves user credentials to a file."""
creds_path = _get_user_credential_path(user_google_email, base_dir) creds_path = _get_user_credential_path(user_google_email, base_dir)
creds_data = { creds_data = {
'token': credentials.token, "token": credentials.token,
'refresh_token': credentials.refresh_token, "refresh_token": credentials.refresh_token,
'token_uri': credentials.token_uri, "token_uri": credentials.token_uri,
'client_id': credentials.client_id, "client_id": credentials.client_id,
'client_secret': credentials.client_secret, "client_secret": credentials.client_secret,
'scopes': credentials.scopes, "scopes": credentials.scopes,
'expiry': credentials.expiry.isoformat() if credentials.expiry else None "expiry": credentials.expiry.isoformat() if credentials.expiry else None,
} }
try: try:
with open(creds_path, 'w') as f: with open(creds_path, "w") as f:
json.dump(creds_data, f) json.dump(creds_data, f)
logger.info(f"Credentials saved for user {user_google_email} to {creds_path}") logger.info(f"Credentials saved for user {user_google_email} to {creds_path}")
except IOError as e: except IOError as e:
logger.error(f"Error saving credentials for user {user_google_email} to {creds_path}: {e}") logger.error(
f"Error saving credentials for user {user_google_email} to {creds_path}: {e}"
)
raise raise
def save_credentials_to_session(session_id: str, credentials: Credentials): def save_credentials_to_session(session_id: str, credentials: Credentials):
"""Saves user credentials to the in-memory session cache.""" """Saves user credentials to the in-memory session cache."""
_SESSION_CREDENTIALS_CACHE[session_id] = credentials _SESSION_CREDENTIALS_CACHE[session_id] = credentials
logger.debug(f"Credentials saved to session cache for session_id: {session_id}") logger.debug(f"Credentials saved to session cache for session_id: {session_id}")
def load_credentials_from_file(user_google_email: str, base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[Credentials]:
def load_credentials_from_file(
user_google_email: str, base_dir: str = DEFAULT_CREDENTIALS_DIR
) -> Optional[Credentials]:
"""Loads user credentials from a file.""" """Loads user credentials from a file."""
creds_path = _get_user_credential_path(user_google_email, base_dir) creds_path = _get_user_credential_path(user_google_email, base_dir)
if not os.path.exists(creds_path): if not os.path.exists(creds_path):
logger.info(f"No credentials file found for user {user_google_email} at {creds_path}") logger.info(
f"No credentials file found for user {user_google_email} at {creds_path}"
)
return None return None
try: try:
with open(creds_path, 'r') as f: with open(creds_path, "r") as f:
creds_data = json.load(f) creds_data = json.load(f)
# Parse expiry if present # Parse expiry if present
expiry = None expiry = None
if creds_data.get('expiry'): if creds_data.get("expiry"):
try: try:
from datetime import datetime from datetime import datetime
expiry = datetime.fromisoformat(creds_data['expiry'])
expiry = datetime.fromisoformat(creds_data["expiry"])
except (ValueError, TypeError) as e: except (ValueError, TypeError) as e:
logger.warning(f"Could not parse expiry time for {user_google_email}: {e}") logger.warning(
f"Could not parse expiry time for {user_google_email}: {e}"
)
credentials = Credentials( credentials = Credentials(
token=creds_data.get('token'), token=creds_data.get("token"),
refresh_token=creds_data.get('refresh_token'), refresh_token=creds_data.get("refresh_token"),
token_uri=creds_data.get('token_uri'), token_uri=creds_data.get("token_uri"),
client_id=creds_data.get('client_id'), client_id=creds_data.get("client_id"),
client_secret=creds_data.get('client_secret'), client_secret=creds_data.get("client_secret"),
scopes=creds_data.get('scopes'), scopes=creds_data.get("scopes"),
expiry=expiry expiry=expiry,
)
logger.debug(
f"Credentials loaded for user {user_google_email} from {creds_path}"
) )
logger.debug(f"Credentials loaded for user {user_google_email} from {creds_path}")
return credentials return credentials
except (IOError, json.JSONDecodeError, KeyError) as e: except (IOError, json.JSONDecodeError, KeyError) as e:
logger.error(f"Error loading or parsing credentials for user {user_google_email} from {creds_path}: {e}") logger.error(
f"Error loading or parsing credentials for user {user_google_email} from {creds_path}: {e}"
)
return None return None
def load_credentials_from_session(session_id: str) -> Optional[Credentials]: def load_credentials_from_session(session_id: str) -> Optional[Credentials]:
"""Loads user credentials from the in-memory session cache.""" """Loads user credentials from the in-memory session cache."""
credentials = _SESSION_CREDENTIALS_CACHE.get(session_id) credentials = _SESSION_CREDENTIALS_CACHE.get(session_id)
if credentials: if credentials:
logger.debug(f"Credentials loaded from session cache for session_id: {session_id}") logger.debug(
f"Credentials loaded from session cache for session_id: {session_id}"
)
else: else:
logger.debug(f"No credentials found in session cache for session_id: {session_id}") logger.debug(
f"No credentials found in session cache for session_id: {session_id}"
)
return credentials return credentials
def load_client_secrets_from_env() -> Optional[Dict[str, Any]]: def load_client_secrets_from_env() -> Optional[Dict[str, Any]]:
""" """
Loads the client secrets from environment variables. Loads the client secrets from environment variables.
@@ -175,7 +227,7 @@ def load_client_secrets_from_env() -> Optional[Dict[str, Any]]:
"client_secret": client_secret, "client_secret": client_secret,
"auth_uri": "https://accounts.google.com/o/oauth2/auth", "auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token", "token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs" "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
} }
# Add redirect_uri if provided via environment variable # Add redirect_uri if provided via environment variable
@@ -191,6 +243,7 @@ def load_client_secrets_from_env() -> Optional[Dict[str, Any]]:
logger.debug("OAuth client credentials not found in environment variables") logger.debug("OAuth client credentials not found in environment variables")
return None return None
def load_client_secrets(client_secrets_path: str) -> Dict[str, Any]: def load_client_secrets(client_secrets_path: str) -> Dict[str, Any]:
""" """
Loads the client secrets from environment variables (preferred) or from the client secrets file. Loads the client secrets from environment variables (preferred) or from the client secrets file.
@@ -217,21 +270,29 @@ def load_client_secrets(client_secrets_path: str) -> Dict[str, Any]:
# Fall back to loading from file # Fall back to loading from file
try: try:
with open(client_secrets_path, 'r') as f: with open(client_secrets_path, "r") as f:
client_config = json.load(f) client_config = json.load(f)
# The file usually contains a top-level key like "web" or "installed" # The file usually contains a top-level key like "web" or "installed"
if "web" in client_config: if "web" in client_config:
logger.info(f"Loaded OAuth client credentials from file: {client_secrets_path}") logger.info(
f"Loaded OAuth client credentials from file: {client_secrets_path}"
)
return client_config["web"] return client_config["web"]
elif "installed" in client_config: elif "installed" in client_config:
logger.info(f"Loaded OAuth client credentials from file: {client_secrets_path}") logger.info(
f"Loaded OAuth client credentials from file: {client_secrets_path}"
)
return client_config["installed"] return client_config["installed"]
else: else:
logger.error(f"Client secrets file {client_secrets_path} has unexpected format.") logger.error(
f"Client secrets file {client_secrets_path} has unexpected format."
)
raise ValueError("Invalid client secrets file format") raise ValueError("Invalid client secrets file format")
except (IOError, json.JSONDecodeError) as e: except (IOError, json.JSONDecodeError) as e:
logger.error(f"Error loading client secrets file {client_secrets_path}: {e}") logger.error(f"Error loading client secrets file {client_secrets_path}: {e}")
raise raise
def check_client_secrets() -> Optional[str]: def check_client_secrets() -> Optional[str]:
""" """
Checks for the presence of OAuth client secrets, either as environment Checks for the presence of OAuth client secrets, either as environment
@@ -242,40 +303,48 @@ def check_client_secrets() -> Optional[str]:
""" """
env_config = load_client_secrets_from_env() env_config = load_client_secrets_from_env()
if not env_config and not os.path.exists(CONFIG_CLIENT_SECRETS_PATH): if not env_config and not os.path.exists(CONFIG_CLIENT_SECRETS_PATH):
logger.error(f"OAuth client credentials not found. No environment variables set and no file at {CONFIG_CLIENT_SECRETS_PATH}") logger.error(
f"OAuth client credentials not found. No environment variables set and no file at {CONFIG_CLIENT_SECRETS_PATH}"
)
return f"OAuth client credentials not found. Please set GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET environment variables or provide a client secrets file at {CONFIG_CLIENT_SECRETS_PATH}." return f"OAuth client credentials not found. Please set GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET environment variables or provide a client secrets file at {CONFIG_CLIENT_SECRETS_PATH}."
return None return None
def create_oauth_flow(scopes: List[str], redirect_uri: str, state: Optional[str] = None) -> Flow:
def create_oauth_flow(
scopes: List[str], redirect_uri: str, state: Optional[str] = None
) -> Flow:
"""Creates an OAuth flow using environment variables or client secrets file.""" """Creates an OAuth flow using environment variables or client secrets file."""
# Try environment variables first # Try environment variables first
env_config = load_client_secrets_from_env() env_config = load_client_secrets_from_env()
if env_config: if env_config:
# Use client config directly # Use client config directly
flow = Flow.from_client_config( flow = Flow.from_client_config(
env_config, env_config, scopes=scopes, redirect_uri=redirect_uri, state=state
scopes=scopes,
redirect_uri=redirect_uri,
state=state
) )
logger.debug("Created OAuth flow from environment variables") logger.debug("Created OAuth flow from environment variables")
return flow return flow
# Fall back to file-based config # Fall back to file-based config
if not os.path.exists(CONFIG_CLIENT_SECRETS_PATH): if not os.path.exists(CONFIG_CLIENT_SECRETS_PATH):
raise FileNotFoundError(f"OAuth client secrets file not found at {CONFIG_CLIENT_SECRETS_PATH} and no environment variables set") raise FileNotFoundError(
f"OAuth client secrets file not found at {CONFIG_CLIENT_SECRETS_PATH} and no environment variables set"
)
flow = Flow.from_client_secrets_file( flow = Flow.from_client_secrets_file(
CONFIG_CLIENT_SECRETS_PATH, CONFIG_CLIENT_SECRETS_PATH,
scopes=scopes, scopes=scopes,
redirect_uri=redirect_uri, redirect_uri=redirect_uri,
state=state state=state,
)
logger.debug(
f"Created OAuth flow from client secrets file: {CONFIG_CLIENT_SECRETS_PATH}"
) )
logger.debug(f"Created OAuth flow from client secrets file: {CONFIG_CLIENT_SECRETS_PATH}")
return flow return flow
# --- Core OAuth Logic --- # --- Core OAuth Logic ---
async def start_auth_flow( async def start_auth_flow(
mcp_session_id: Optional[str], mcp_session_id: Optional[str],
user_google_email: Optional[str], user_google_email: Optional[str],
@@ -297,29 +366,47 @@ async def start_auth_flow(
Raises: Raises:
Exception: If the OAuth flow cannot be initiated. Exception: If the OAuth flow cannot be initiated.
""" """
initial_email_provided = bool(user_google_email and user_google_email.strip() and user_google_email.lower() != 'default') initial_email_provided = bool(
user_display_name = f"{service_name} for '{user_google_email}'" if initial_email_provided else service_name user_google_email
and user_google_email.strip()
and user_google_email.lower() != "default"
)
user_display_name = (
f"{service_name} for '{user_google_email}'"
if initial_email_provided
else service_name
)
logger.info(f"[start_auth_flow] Initiating auth for {user_display_name} (session: {mcp_session_id}) with global SCOPES.") logger.info(
f"[start_auth_flow] Initiating auth for {user_display_name} (session: {mcp_session_id}) with global SCOPES."
)
try: try:
if 'OAUTHLIB_INSECURE_TRANSPORT' not in os.environ and ("localhost" in redirect_uri or "127.0.0.1" in redirect_uri): # Use passed redirect_uri if "OAUTHLIB_INSECURE_TRANSPORT" not in os.environ and (
logger.warning("OAUTHLIB_INSECURE_TRANSPORT not set. Setting it for localhost/local development.") "localhost" in redirect_uri or "127.0.0.1" in redirect_uri
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' ): # Use passed redirect_uri
logger.warning(
"OAUTHLIB_INSECURE_TRANSPORT not set. Setting it for localhost/local development."
)
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
oauth_state = os.urandom(16).hex() oauth_state = os.urandom(16).hex()
if mcp_session_id: if mcp_session_id:
OAUTH_STATE_TO_SESSION_ID_MAP[oauth_state] = mcp_session_id OAUTH_STATE_TO_SESSION_ID_MAP[oauth_state] = mcp_session_id
logger.info(f"[start_auth_flow] Stored mcp_session_id '{mcp_session_id}' for oauth_state '{oauth_state}'.") logger.info(
f"[start_auth_flow] Stored mcp_session_id '{mcp_session_id}' for oauth_state '{oauth_state}'."
)
flow = create_oauth_flow( flow = create_oauth_flow(
scopes=SCOPES, # Use global SCOPES scopes=SCOPES, # Use global SCOPES
redirect_uri=redirect_uri, # Use passed redirect_uri redirect_uri=redirect_uri, # Use passed redirect_uri
state=oauth_state state=oauth_state,
) )
auth_url, _ = flow.authorization_url(access_type='offline', prompt='consent') auth_url, _ = flow.authorization_url(access_type="offline", prompt="consent")
logger.info(f"Auth flow started for {user_display_name}. State: {oauth_state}. Advise user to visit: {auth_url}") logger.info(
f"Auth flow started for {user_display_name}. State: {oauth_state}. Advise user to visit: {auth_url}"
)
message_lines = [ message_lines = [
f"**ACTION REQUIRED: Google Authentication Needed for {user_display_name}**\n", f"**ACTION REQUIRED: Google Authentication Needed for {user_display_name}**\n",
@@ -330,18 +417,28 @@ async def start_auth_flow(
"**LLM, after presenting the link, instruct the user as follows:**", "**LLM, after presenting the link, instruct the user as follows:**",
"1. Click the link and complete the authorization in their browser.", "1. Click the link and complete the authorization in their browser.",
] ]
session_info_for_llm = f" (this will link to your current session {mcp_session_id})" if mcp_session_id else "" session_info_for_llm = (
f" (this will link to your current session {mcp_session_id})"
if mcp_session_id
else ""
)
if not initial_email_provided: if not initial_email_provided:
message_lines.extend([ message_lines.extend(
[
f"2. After successful authorization{session_info_for_llm}, the browser page will display the authenticated email address.", f"2. After successful authorization{session_info_for_llm}, the browser page will display the authenticated email address.",
" **LLM: Instruct the user to provide you with this email address.**", " **LLM: Instruct the user to provide you with this email address.**",
"3. Once you have the email, **retry their original command, ensuring you include this `user_google_email`.**" "3. Once you have the email, **retry their original command, ensuring you include this `user_google_email`.**",
]) ]
)
else: else:
message_lines.append(f"2. After successful authorization{session_info_for_llm}, **retry their original command**.") message_lines.append(
f"2. After successful authorization{session_info_for_llm}, **retry their original command**."
)
message_lines.append(f"\nThe application will use the new credentials. If '{user_google_email}' was provided, it must match the authenticated account.") message_lines.append(
f"\nThe application will use the new credentials. If '{user_google_email}' was provided, it must match the authenticated account."
)
return "\n".join(message_lines) return "\n".join(message_lines)
except FileNotFoundError as e: except FileNotFoundError as e:
@@ -350,16 +447,22 @@ async def start_auth_flow(
raise Exception(error_text) raise Exception(error_text)
except Exception as e: except Exception as e:
error_text = f"Could not initiate authentication for {user_display_name} due to an unexpected error: {str(e)}" error_text = f"Could not initiate authentication for {user_display_name} due to an unexpected error: {str(e)}"
logger.error(f"Failed to start the OAuth flow for {user_display_name}: {e}", exc_info=True) logger.error(
f"Failed to start the OAuth flow for {user_display_name}: {e}",
exc_info=True,
)
raise Exception(error_text) raise Exception(error_text)
def handle_auth_callback( def handle_auth_callback(
scopes: List[str], scopes: List[str],
authorization_response: str, authorization_response: str,
redirect_uri: str, redirect_uri: str,
credentials_base_dir: str = DEFAULT_CREDENTIALS_DIR, credentials_base_dir: str = DEFAULT_CREDENTIALS_DIR,
session_id: Optional[str] = None, session_id: Optional[str] = None,
client_secrets_path: Optional[str] = None # Deprecated: kept for backward compatibility client_secrets_path: Optional[
str
] = None, # Deprecated: kept for backward compatibility
) -> Tuple[str, Credentials]: ) -> Tuple[str, Credentials]:
""" """
Handles the callback from Google, exchanges the code for credentials, Handles the callback from Google, exchanges the code for credentials,
@@ -385,17 +488,18 @@ def handle_auth_callback(
try: try:
# Log deprecation warning if old parameter is used # Log deprecation warning if old parameter is used
if client_secrets_path: if client_secrets_path:
logger.warning("The 'client_secrets_path' parameter is deprecated. Use GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET environment variables instead.") logger.warning(
"The 'client_secrets_path' parameter is deprecated. Use GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET environment variables instead."
)
# Allow HTTP for localhost in development # Allow HTTP for localhost in development
if 'OAUTHLIB_INSECURE_TRANSPORT' not in os.environ: if "OAUTHLIB_INSECURE_TRANSPORT" not in os.environ:
logger.warning("OAUTHLIB_INSECURE_TRANSPORT not set. Setting it for localhost development.") logger.warning(
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' "OAUTHLIB_INSECURE_TRANSPORT not set. Setting it for localhost development."
flow = create_oauth_flow(
scopes=scopes,
redirect_uri=redirect_uri
) )
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
flow = create_oauth_flow(scopes=scopes, redirect_uri=redirect_uri)
# Exchange the authorization code for credentials # Exchange the authorization code for credentials
# Note: fetch_token will use the redirect_uri configured in the flow # Note: fetch_token will use the redirect_uri configured in the flow
@@ -405,11 +509,11 @@ def handle_auth_callback(
# Get user info to determine user_id (using email here) # Get user info to determine user_id (using email here)
user_info = get_user_info(credentials) user_info = get_user_info(credentials)
if not user_info or 'email' not in user_info: if not user_info or "email" not in user_info:
logger.error("Could not retrieve user email from Google.") logger.error("Could not retrieve user email from Google.")
raise ValueError("Failed to get user email for identification.") raise ValueError("Failed to get user email for identification.")
user_google_email = user_info['email'] user_google_email = user_info["email"]
logger.info(f"Identified user_google_email: {user_google_email}") logger.info(f"Identified user_google_email: {user_google_email}")
# Save the credentials to file # Save the credentials to file
@@ -425,12 +529,13 @@ def handle_auth_callback(
logger.error(f"Error handling auth callback: {e}") logger.error(f"Error handling auth callback: {e}")
raise # Re-raise for the caller raise # Re-raise for the caller
def get_credentials( def get_credentials(
user_google_email: Optional[str], # Can be None if relying on session_id user_google_email: Optional[str], # Can be None if relying on session_id
required_scopes: List[str], required_scopes: List[str],
client_secrets_path: Optional[str] = None, client_secrets_path: Optional[str] = None,
credentials_base_dir: str = DEFAULT_CREDENTIALS_DIR, credentials_base_dir: str = DEFAULT_CREDENTIALS_DIR,
session_id: Optional[str] = None session_id: Optional[str] = None,
) -> Optional[Credentials]: ) -> Optional[Credentials]:
""" """
Retrieves stored credentials, prioritizing session, then file. Refreshes if necessary. Retrieves stored credentials, prioritizing session, then file. Refreshes if necessary.
@@ -448,11 +553,15 @@ def get_credentials(
Valid Credentials object or None. Valid Credentials object or None.
""" """
# Check for single-user mode # Check for single-user mode
if os.getenv('MCP_SINGLE_USER_MODE') == '1': if os.getenv("MCP_SINGLE_USER_MODE") == "1":
logger.info(f"[get_credentials] Single-user mode: bypassing session mapping, finding any credentials") logger.info(
f"[get_credentials] Single-user mode: bypassing session mapping, finding any credentials"
)
credentials = _find_any_credentials(credentials_base_dir) credentials = _find_any_credentials(credentials_base_dir)
if not credentials: if not credentials:
logger.info(f"[get_credentials] Single-user mode: No credentials found in {credentials_base_dir}") logger.info(
f"[get_credentials] Single-user mode: No credentials found in {credentials_base_dir}"
)
return None return None
# In single-user mode, if user_google_email wasn't provided, try to get it from user info # In single-user mode, if user_google_email wasn't provided, try to get it from user info
@@ -460,11 +569,15 @@ def get_credentials(
if not user_google_email and credentials.valid: if not user_google_email and credentials.valid:
try: try:
user_info = get_user_info(credentials) user_info = get_user_info(credentials)
if user_info and 'email' in user_info: if user_info and "email" in user_info:
user_google_email = user_info['email'] user_google_email = user_info["email"]
logger.debug(f"[get_credentials] Single-user mode: extracted user email {user_google_email} from credentials") logger.debug(
f"[get_credentials] Single-user mode: extracted user email {user_google_email} from credentials"
)
except Exception as e: except Exception as e:
logger.debug(f"[get_credentials] Single-user mode: could not extract user email: {e}") logger.debug(
f"[get_credentials] Single-user mode: could not extract user email: {e}"
)
else: else:
credentials: Optional[Credentials] = None credentials: Optional[Credentials] = None
@@ -472,61 +585,100 @@ def get_credentials(
if not session_id: if not session_id:
logger.debug("[get_credentials] No session_id provided") logger.debug("[get_credentials] No session_id provided")
logger.debug(f"[get_credentials] Called for user_google_email: '{user_google_email}', session_id: '{session_id}', required_scopes: {required_scopes}") logger.debug(
f"[get_credentials] Called for user_google_email: '{user_google_email}', session_id: '{session_id}', required_scopes: {required_scopes}"
)
if session_id: if session_id:
credentials = load_credentials_from_session(session_id) credentials = load_credentials_from_session(session_id)
if credentials: if credentials:
logger.debug(f"[get_credentials] Loaded credentials from session for session_id '{session_id}'.") logger.debug(
f"[get_credentials] Loaded credentials from session for session_id '{session_id}'."
)
if not credentials and user_google_email: if not credentials and user_google_email:
logger.debug(f"[get_credentials] No session credentials, trying file for user_google_email '{user_google_email}'.") logger.debug(
credentials = load_credentials_from_file(user_google_email, credentials_base_dir) f"[get_credentials] No session credentials, trying file for user_google_email '{user_google_email}'."
)
credentials = load_credentials_from_file(
user_google_email, credentials_base_dir
)
if credentials and session_id: if credentials and session_id:
logger.debug(f"[get_credentials] Loaded from file for user '{user_google_email}', caching to session '{session_id}'.") logger.debug(
save_credentials_to_session(session_id, credentials) # Cache for current session f"[get_credentials] Loaded from file for user '{user_google_email}', caching to session '{session_id}'."
)
save_credentials_to_session(
session_id, credentials
) # Cache for current session
if not credentials: if not credentials:
logger.info(f"[get_credentials] No credentials found for user '{user_google_email}' or session '{session_id}'.") logger.info(
f"[get_credentials] No credentials found for user '{user_google_email}' or session '{session_id}'."
)
return None return None
logger.debug(f"[get_credentials] Credentials found. Scopes: {credentials.scopes}, Valid: {credentials.valid}, Expired: {credentials.expired}") logger.debug(
f"[get_credentials] Credentials found. Scopes: {credentials.scopes}, Valid: {credentials.valid}, Expired: {credentials.expired}"
)
if not all(scope in credentials.scopes for scope in required_scopes): if not all(scope in credentials.scopes for scope in required_scopes):
logger.warning(f"[get_credentials] Credentials lack required scopes. Need: {required_scopes}, Have: {credentials.scopes}. User: '{user_google_email}', Session: '{session_id}'") logger.warning(
f"[get_credentials] Credentials lack required scopes. Need: {required_scopes}, Have: {credentials.scopes}. User: '{user_google_email}', Session: '{session_id}'"
)
return None # Re-authentication needed for scopes return None # Re-authentication needed for scopes
logger.debug(f"[get_credentials] Credentials have sufficient scopes. User: '{user_google_email}', Session: '{session_id}'") logger.debug(
f"[get_credentials] Credentials have sufficient scopes. User: '{user_google_email}', Session: '{session_id}'"
)
if credentials.valid: if credentials.valid:
logger.debug(f"[get_credentials] Credentials are valid. User: '{user_google_email}', Session: '{session_id}'") logger.debug(
f"[get_credentials] Credentials are valid. User: '{user_google_email}', Session: '{session_id}'"
)
return credentials return credentials
elif credentials.expired and credentials.refresh_token: elif credentials.expired and credentials.refresh_token:
logger.info(f"[get_credentials] Credentials expired. Attempting refresh. User: '{user_google_email}', Session: '{session_id}'") logger.info(
f"[get_credentials] Credentials expired. Attempting refresh. User: '{user_google_email}', Session: '{session_id}'"
)
if not client_secrets_path: if not client_secrets_path:
logger.error("[get_credentials] Client secrets path required for refresh but not provided.") logger.error(
"[get_credentials] Client secrets path required for refresh but not provided."
)
return None return None
try: try:
logger.debug(f"[get_credentials] Refreshing token using client_secrets_path: {client_secrets_path}") logger.debug(
f"[get_credentials] Refreshing token using client_secrets_path: {client_secrets_path}"
)
# client_config = load_client_secrets(client_secrets_path) # Not strictly needed if creds have client_id/secret # client_config = load_client_secrets(client_secrets_path) # Not strictly needed if creds have client_id/secret
credentials.refresh(Request()) credentials.refresh(Request())
logger.info(f"[get_credentials] Credentials refreshed successfully. User: '{user_google_email}', Session: '{session_id}'") logger.info(
f"[get_credentials] Credentials refreshed successfully. User: '{user_google_email}', Session: '{session_id}'"
)
# Save refreshed credentials # Save refreshed credentials
if user_google_email: # Always save to file if email is known if user_google_email: # Always save to file if email is known
save_credentials_to_file(user_google_email, credentials, credentials_base_dir) save_credentials_to_file(
user_google_email, credentials, credentials_base_dir
)
if session_id: # Update session cache if it was the source or is active if session_id: # Update session cache if it was the source or is active
save_credentials_to_session(session_id, credentials) save_credentials_to_session(session_id, credentials)
return credentials return credentials
except RefreshError as e: except RefreshError as e:
logger.warning(f"[get_credentials] RefreshError - token expired/revoked: {e}. User: '{user_google_email}', Session: '{session_id}'") logger.warning(
f"[get_credentials] RefreshError - token expired/revoked: {e}. User: '{user_google_email}', Session: '{session_id}'"
)
# For RefreshError, we should return None to trigger reauthentication # For RefreshError, we should return None to trigger reauthentication
return None return None
except Exception as e: except Exception as e:
logger.error(f"[get_credentials] Error refreshing credentials: {e}. User: '{user_google_email}', Session: '{session_id}'", exc_info=True) logger.error(
f"[get_credentials] Error refreshing credentials: {e}. User: '{user_google_email}', Session: '{session_id}'",
exc_info=True,
)
return None # Failed to refresh return None # Failed to refresh
else: else:
logger.warning(f"[get_credentials] Credentials invalid/cannot refresh. Valid: {credentials.valid}, Refresh Token: {credentials.refresh_token is not None}. User: '{user_google_email}', Session: '{session_id}'") logger.warning(
f"[get_credentials] Credentials invalid/cannot refresh. Valid: {credentials.valid}, Refresh Token: {credentials.refresh_token is not None}. User: '{user_google_email}', Session: '{session_id}'"
)
return None return None
@@ -538,7 +690,7 @@ def get_user_info(credentials: Credentials) -> Optional[Dict[str, Any]]:
try: try:
# Using googleapiclient discovery to get user info # Using googleapiclient discovery to get user info
# Requires 'google-api-python-client' library # Requires 'google-api-python-client' library
service = build('oauth2', 'v2', credentials=credentials) service = build("oauth2", "v2", credentials=credentials)
user_info = service.userinfo().get().execute() user_info = service.userinfo().get().execute()
logger.info(f"Successfully fetched user info: {user_info.get('email')}") logger.info(f"Successfully fetched user info: {user_info.get('email')}")
return user_info return user_info
@@ -553,8 +705,10 @@ def get_user_info(credentials: Credentials) -> Optional[Dict[str, Any]]:
# --- Centralized Google Service Authentication --- # --- Centralized Google Service Authentication ---
class GoogleAuthenticationError(Exception): class GoogleAuthenticationError(Exception):
"""Exception raised when Google authentication is required or fails.""" """Exception raised when Google authentication is required or fails."""
def __init__(self, message: str, auth_url: Optional[str] = None): def __init__(self, message: str, auth_url: Optional[str] = None):
super().__init__(message) super().__init__(message)
self.auth_url = auth_url self.auth_url = auth_url
@@ -602,7 +756,6 @@ async def get_authenticated_google_service(
session_id=None, # Session ID not available in service layer session_id=None, # Session ID not available in service layer
) )
if not credentials or not credentials.valid: if not credentials or not credentials.valid:
logger.warning( logger.warning(
f"[{tool_name}] No valid credentials. Email: '{user_google_email}'." f"[{tool_name}] No valid credentials. Email: '{user_google_email}'."
@@ -637,8 +790,11 @@ async def get_authenticated_google_service(
if credentials and credentials.id_token: if credentials and credentials.id_token:
try: try:
import jwt import jwt
# Decode without verification (just to get email for logging) # Decode without verification (just to get email for logging)
decoded_token = jwt.decode(credentials.id_token, options={"verify_signature": False}) decoded_token = jwt.decode(
credentials.id_token, options={"verify_signature": False}
)
token_email = decoded_token.get("email") token_email = decoded_token.get("email")
if token_email: if token_email:
log_user_email = token_email log_user_email = token_email
@@ -646,12 +802,12 @@ async def get_authenticated_google_service(
except Exception as e: except Exception as e:
logger.debug(f"[{tool_name}] Could not decode id_token: {e}") logger.debug(f"[{tool_name}] Could not decode id_token: {e}")
logger.info(f"[{tool_name}] Successfully authenticated {service_name} service for user: {log_user_email}") logger.info(
f"[{tool_name}] Successfully authenticated {service_name} service for user: {log_user_email}"
)
return service, log_user_email return service, log_user_email
except Exception as e: except Exception as e:
error_msg = f"[{tool_name}] Failed to build {service_name} service: {str(e)}" error_msg = f"[{tool_name}] Failed to build {service_name} service: {str(e)}"
logger.error(error_msg, exc_info=True) logger.error(error_msg, exc_info=True)
raise GoogleAuthenticationError(error_msg) raise GoogleAuthenticationError(error_msg)

View File

@@ -8,17 +8,21 @@ from typing import List, Optional
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def check_credentials_directory_permissions(credentials_dir: str = ".credentials") -> None: def check_credentials_directory_permissions(credentials_dir: str = None) -> None:
""" """
Check if the service has appropriate permissions to create and write to the .credentials directory. Check if the service has appropriate permissions to create and write to the .credentials directory.
Args: Args:
credentials_dir: Path to the credentials directory (default: ".credentials") credentials_dir: Path to the credentials directory (default: uses get_default_credentials_dir())
Raises: Raises:
PermissionError: If the service lacks necessary permissions PermissionError: If the service lacks necessary permissions
OSError: If there are other file system issues OSError: If there are other file system issues
""" """
if credentials_dir is None:
from auth.google_auth import get_default_credentials_dir
credentials_dir = get_default_credentials_dir()
try: try:
# Check if directory exists # Check if directory exists
if os.path.exists(credentials_dir): if os.path.exists(credentials_dir):

27
main.py
View File

@@ -34,6 +34,13 @@ except Exception as e:
sys.stderr.write(f"CRITICAL: Failed to set up file logging to '{log_file_path}': {e}\n") sys.stderr.write(f"CRITICAL: Failed to set up file logging to '{log_file_path}': {e}\n")
def safe_print(text): def safe_print(text):
# Don't print to stderr when running as MCP server via uvx to avoid JSON parsing errors
# Check if we're running as MCP server (no TTY and uvx in process name)
if not sys.stderr.isatty():
# Running as MCP server, suppress output to avoid JSON parsing errors
logger.debug(f"[MCP Server] {text}")
return
try: try:
print(text, file=sys.stderr) print(text, file=sys.stderr)
except UnicodeEncodeError: except UnicodeEncodeError:
@@ -47,7 +54,7 @@ def main():
# Parse command line arguments # Parse command line arguments
parser = argparse.ArgumentParser(description='Google Workspace MCP Server') parser = argparse.ArgumentParser(description='Google Workspace MCP Server')
parser.add_argument('--single-user', action='store_true', parser.add_argument('--single-user', action='store_true',
help='Run in single-user mode - bypass session mapping and use any credentials from ./credentials directory') help='Run in single-user mode - bypass session mapping and use any credentials from the credentials directory')
parser.add_argument('--tools', nargs='*', parser.add_argument('--tools', nargs='*',
choices=['gmail', 'drive', 'calendar', 'docs', 'sheets', 'chat', 'forms', 'slides'], choices=['gmail', 'drive', 'calendar', 'docs', 'sheets', 'chat', 'forms', 'slides'],
help='Specify which tools to register. If not provided, all tools are registered.') help='Specify which tools to register. If not provided, all tools are registered.')
@@ -73,7 +80,7 @@ def main():
safe_print(f" 🔐 OAuth Callback: {base_uri}:{port}/oauth2callback") safe_print(f" 🔐 OAuth Callback: {base_uri}:{port}/oauth2callback")
safe_print(f" 👤 Mode: {'Single-user' if args.single_user else 'Multi-user'}") safe_print(f" 👤 Mode: {'Single-user' if args.single_user else 'Multi-user'}")
safe_print(f" 🐍 Python: {sys.version.split()[0]}") safe_print(f" 🐍 Python: {sys.version.split()[0]}")
print(file=sys.stderr) safe_print("")
# Import tool modules to register them with the MCP server via decorators # Import tool modules to register them with the MCP server via decorators
tool_imports = { tool_imports = {
@@ -104,29 +111,29 @@ def main():
for tool in tools_to_import: for tool in tools_to_import:
tool_imports[tool]() tool_imports[tool]()
safe_print(f" {tool_icons[tool]} {tool.title()} - Google {tool.title()} API integration") safe_print(f" {tool_icons[tool]} {tool.title()} - Google {tool.title()} API integration")
print(file=sys.stderr) safe_print("")
safe_print(f"📊 Configuration Summary:") safe_print(f"📊 Configuration Summary:")
safe_print(f" 🔧 Tools Enabled: {len(tools_to_import)}/{len(tool_imports)}") safe_print(f" 🔧 Tools Enabled: {len(tools_to_import)}/{len(tool_imports)}")
safe_print(f" 🔑 Auth Method: OAuth 2.0 with PKCE") safe_print(f" 🔑 Auth Method: OAuth 2.0 with PKCE")
safe_print(f" 📝 Log Level: {logging.getLogger().getEffectiveLevel()}") safe_print(f" 📝 Log Level: {logging.getLogger().getEffectiveLevel()}")
print(file=sys.stderr) safe_print("")
# Set global single-user mode flag # Set global single-user mode flag
if args.single_user: if args.single_user:
os.environ['MCP_SINGLE_USER_MODE'] = '1' os.environ['MCP_SINGLE_USER_MODE'] = '1'
safe_print("🔐 Single-user mode enabled") safe_print("🔐 Single-user mode enabled")
print(file=sys.stderr) safe_print("")
# Check credentials directory permissions before starting # Check credentials directory permissions before starting
try: try:
safe_print("🔍 Checking credentials directory permissions...") safe_print("🔍 Checking credentials directory permissions...")
check_credentials_directory_permissions() check_credentials_directory_permissions()
safe_print("✅ Credentials directory permissions verified") safe_print("✅ Credentials directory permissions verified")
print(file=sys.stderr) safe_print("")
except (PermissionError, OSError) as e: except (PermissionError, OSError) as e:
safe_print(f"❌ Credentials directory permission check failed: {e}") safe_print(f"❌ Credentials directory permission check failed: {e}")
print(" Please ensure the service has write permissions to create/access the .credentials directory", file=sys.stderr) safe_print(" Please ensure the service has write permissions to create/access the credentials directory")
logger.error(f"Failed credentials directory permission check: {e}") logger.error(f"Failed credentials directory permission check: {e}")
sys.exit(1) sys.exit(1)
@@ -141,12 +148,12 @@ def main():
# Start minimal OAuth callback server for stdio mode # Start minimal OAuth callback server for stdio mode
from auth.oauth_callback_server import ensure_oauth_callback_available from auth.oauth_callback_server import ensure_oauth_callback_available
if ensure_oauth_callback_available('stdio', port, base_uri): if ensure_oauth_callback_available('stdio', port, base_uri):
print(f" OAuth callback server started on {base_uri}:{port}/oauth2callback", file=sys.stderr) safe_print(f" OAuth callback server started on {base_uri}:{port}/oauth2callback")
else: else:
safe_print(" ⚠️ Warning: Failed to start OAuth callback server") safe_print(" ⚠️ Warning: Failed to start OAuth callback server")
print(" Ready for MCP connections!", file=sys.stderr) safe_print(" Ready for MCP connections!")
print(file=sys.stderr) safe_print("")
if args.transport == 'streamable-http': if args.transport == 'streamable-http':
# The server is already configured with port and server_url in core/server.py # The server is already configured with port and server_url in core/server.py