implement single user mode
This commit is contained in:
@@ -4,9 +4,7 @@ import os
|
||||
import json
|
||||
import logging
|
||||
from typing import List, Optional, Tuple, Dict, Any, Callable
|
||||
import os # Ensure os is imported
|
||||
|
||||
from oauthlib.oauth2.rfc6749.errors import InsecureTransportError
|
||||
import os
|
||||
|
||||
from google.oauth2.credentials import Credentials
|
||||
from google_auth_oauthlib.flow import Flow, InstalledAppFlow
|
||||
@@ -28,9 +26,9 @@ logger = logging.getLogger(__name__)
|
||||
# Constants
|
||||
DEFAULT_CREDENTIALS_DIR = ".credentials"
|
||||
|
||||
# In-memory cache for session credentials
|
||||
# Maps session_id to Credentials object
|
||||
# This should be a more robust cache in a production system (e.g., Redis)
|
||||
# In-memory cache for session credentials, maps session_id to Credentials object
|
||||
# This is brittle and bad, but our options are limited with Claude in present state.
|
||||
# This should be more robust in a production system once OAuth2.1 is implemented in client.
|
||||
_SESSION_CREDENTIALS_CACHE: Dict[str, Credentials] = {}
|
||||
# Centralized Client Secrets Path Logic
|
||||
_client_secrets_env = os.getenv("GOOGLE_CLIENT_SECRETS")
|
||||
@@ -43,9 +41,44 @@ else:
|
||||
'client_secret.json'
|
||||
)
|
||||
|
||||
|
||||
# --- Helper Functions ---
|
||||
|
||||
def _find_any_credentials(base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[Credentials]:
|
||||
"""
|
||||
Find and load any valid credentials from the credentials directory.
|
||||
Used in single-user mode to bypass session-to-OAuth mapping.
|
||||
|
||||
Returns:
|
||||
First valid Credentials object found, or None if none exist.
|
||||
"""
|
||||
if not os.path.exists(base_dir):
|
||||
logger.info(f"[single-user] Credentials directory not found: {base_dir}")
|
||||
return None
|
||||
|
||||
# Scan for any .json credential files
|
||||
for filename in os.listdir(base_dir):
|
||||
if filename.endswith('.json'):
|
||||
filepath = os.path.join(base_dir, filename)
|
||||
try:
|
||||
with open(filepath, 'r') as f:
|
||||
creds_data = json.load(f)
|
||||
credentials = Credentials(
|
||||
token=creds_data.get('token'),
|
||||
refresh_token=creds_data.get('refresh_token'),
|
||||
token_uri=creds_data.get('token_uri'),
|
||||
client_id=creds_data.get('client_id'),
|
||||
client_secret=creds_data.get('client_secret'),
|
||||
scopes=creds_data.get('scopes')
|
||||
)
|
||||
logger.info(f"[single-user] Found credentials in {filepath}")
|
||||
return credentials
|
||||
except (IOError, json.JSONDecodeError, KeyError) as e:
|
||||
logger.warning(f"[single-user] Error loading credentials from {filepath}: {e}")
|
||||
continue
|
||||
|
||||
logger.info(f"[single-user] No valid credentials found in {base_dir}")
|
||||
return None
|
||||
|
||||
def _get_user_credential_path(user_google_email: str, base_dir: str = DEFAULT_CREDENTIALS_DIR) -> str:
|
||||
"""Constructs the path to a user's credential file."""
|
||||
if not os.path.exists(base_dir):
|
||||
@@ -287,6 +320,7 @@ def get_credentials(
|
||||
"""
|
||||
Retrieves stored credentials, prioritizing session, then file. Refreshes if necessary.
|
||||
If credentials are loaded from file and a session_id is present, they are cached in the session.
|
||||
In single-user mode, bypasses session mapping and uses any available credentials.
|
||||
|
||||
Args:
|
||||
user_google_email: Optional user's Google email.
|
||||
@@ -298,34 +332,53 @@ def get_credentials(
|
||||
Returns:
|
||||
Valid Credentials object or None.
|
||||
"""
|
||||
credentials: Optional[Credentials] = None
|
||||
loaded_from_session = False
|
||||
|
||||
# Try to get the current session ID if not explicitly provided
|
||||
if not session_id:
|
||||
current_session_id = get_current_session_id()
|
||||
if current_session_id:
|
||||
session_id = current_session_id
|
||||
logger.info(f"[get_credentials] No session_id provided, using current session ID: '{session_id}'")
|
||||
# Check for single-user mode
|
||||
if os.getenv('MCP_SINGLE_USER_MODE') == '1':
|
||||
logger.info(f"[get_credentials] Single-user mode: bypassing session mapping, finding any credentials")
|
||||
credentials = _find_any_credentials(credentials_base_dir)
|
||||
if not credentials:
|
||||
logger.info(f"[get_credentials] Single-user mode: No credentials found in {credentials_base_dir}")
|
||||
return None
|
||||
|
||||
logger.debug(f"[get_credentials] Called for user_google_email: '{user_google_email}', session_id: '{session_id}', required_scopes: {required_scopes}")
|
||||
# In single-user mode, if user_google_email wasn't provided, try to get it from user info
|
||||
# This is needed for proper credential saving after refresh
|
||||
if not user_google_email and credentials.valid:
|
||||
try:
|
||||
user_info = get_user_info(credentials)
|
||||
if user_info and 'email' in user_info:
|
||||
user_google_email = user_info['email']
|
||||
logger.debug(f"[get_credentials] Single-user mode: extracted user email {user_google_email} from credentials")
|
||||
except Exception as e:
|
||||
logger.debug(f"[get_credentials] Single-user mode: could not extract user email: {e}")
|
||||
else:
|
||||
credentials: Optional[Credentials] = None
|
||||
loaded_from_session = False
|
||||
|
||||
if session_id:
|
||||
credentials = load_credentials_from_session(session_id)
|
||||
if credentials:
|
||||
logger.debug(f"[get_credentials] Loaded credentials from session for session_id '{session_id}'.")
|
||||
loaded_from_session = True
|
||||
# Try to get the current session ID if not explicitly provided
|
||||
if not session_id:
|
||||
current_session_id = get_current_session_id()
|
||||
if current_session_id:
|
||||
session_id = current_session_id
|
||||
logger.info(f"[get_credentials] No session_id provided, using current session ID: '{session_id}'")
|
||||
|
||||
if not credentials and user_google_email:
|
||||
logger.debug(f"[get_credentials] No session credentials, trying file for user_google_email '{user_google_email}'.")
|
||||
credentials = load_credentials_from_file(user_google_email, credentials_base_dir)
|
||||
if credentials and session_id:
|
||||
logger.debug(f"[get_credentials] Loaded from file for user '{user_google_email}', caching to session '{session_id}'.")
|
||||
save_credentials_to_session(session_id, credentials) # Cache for current session
|
||||
logger.debug(f"[get_credentials] Called for user_google_email: '{user_google_email}', session_id: '{session_id}', required_scopes: {required_scopes}")
|
||||
|
||||
if not credentials:
|
||||
logger.info(f"[get_credentials] No credentials found for user '{user_google_email}' or session '{session_id}'.")
|
||||
return None
|
||||
if session_id:
|
||||
credentials = load_credentials_from_session(session_id)
|
||||
if credentials:
|
||||
logger.debug(f"[get_credentials] Loaded credentials from session for session_id '{session_id}'.")
|
||||
loaded_from_session = True
|
||||
|
||||
if not credentials and user_google_email:
|
||||
logger.debug(f"[get_credentials] No session credentials, trying file for user_google_email '{user_google_email}'.")
|
||||
credentials = load_credentials_from_file(user_google_email, credentials_base_dir)
|
||||
if credentials and session_id:
|
||||
logger.debug(f"[get_credentials] Loaded from file for user '{user_google_email}', caching to session '{session_id}'.")
|
||||
save_credentials_to_session(session_id, credentials) # Cache for current session
|
||||
|
||||
if not credentials:
|
||||
logger.info(f"[get_credentials] No credentials found for user '{user_google_email}' or session '{session_id}'.")
|
||||
return None
|
||||
|
||||
logger.debug(f"[get_credentials] Credentials found. Scopes: {credentials.scopes}, Valid: {credentials.valid}, Expired: {credentials.expired}")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user