feat: abstract credential store
This commit is contained in:
246
auth/credential_store.py
Normal file
246
auth/credential_store.py
Normal file
@@ -0,0 +1,246 @@
|
||||
"""
|
||||
Credential Store API for Google Workspace MCP
|
||||
|
||||
This module provides a standardized interface for credential storage and retrieval,
|
||||
supporting multiple backends configurable via environment variables.
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional, List
|
||||
from datetime import datetime
|
||||
from google.oauth2.credentials import Credentials
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CredentialStore(ABC):
|
||||
"""Abstract base class for credential storage."""
|
||||
|
||||
@abstractmethod
|
||||
def get_credential(self, user_email: str) -> Optional[Credentials]:
|
||||
"""
|
||||
Get credentials for a user by email.
|
||||
|
||||
Args:
|
||||
user_email: User's email address
|
||||
|
||||
Returns:
|
||||
Google Credentials object or None if not found
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def store_credential(self, user_email: str, credentials: Credentials) -> bool:
|
||||
"""
|
||||
Store credentials for a user.
|
||||
|
||||
Args:
|
||||
user_email: User's email address
|
||||
credentials: Google Credentials object to store
|
||||
|
||||
Returns:
|
||||
True if successfully stored, False otherwise
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def delete_credential(self, user_email: str) -> bool:
|
||||
"""
|
||||
Delete credentials for a user.
|
||||
|
||||
Args:
|
||||
user_email: User's email address
|
||||
|
||||
Returns:
|
||||
True if successfully deleted, False otherwise
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def list_users(self) -> List[str]:
|
||||
"""
|
||||
List all users with stored credentials.
|
||||
|
||||
Returns:
|
||||
List of user email addresses
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class LocalDirectoryCredentialStore(CredentialStore):
|
||||
"""Credential store that uses local JSON files for storage."""
|
||||
|
||||
def __init__(self, base_dir: Optional[str] = None):
|
||||
"""
|
||||
Initialize the local JSON credential store.
|
||||
|
||||
Args:
|
||||
base_dir: Base directory for credential files. If None, uses the directory
|
||||
configured by the GOOGLE_MCP_CREDENTIALS_DIR environment variable,
|
||||
or defaults to ~/.google_workspace_mcp/credentials if the environment
|
||||
variable is not set.
|
||||
"""
|
||||
if base_dir is None:
|
||||
if os.getenv("GOOGLE_MCP_CREDENTIALS_DIR"):
|
||||
base_dir = os.getenv("GOOGLE_MCP_CREDENTIALS_DIR")
|
||||
else:
|
||||
home_dir = os.path.expanduser("~")
|
||||
if home_dir and home_dir != "~":
|
||||
base_dir = os.path.join(
|
||||
home_dir, ".google_workspace_mcp", "credentials"
|
||||
)
|
||||
else:
|
||||
base_dir = os.path.join(os.getcwd(), ".credentials")
|
||||
|
||||
self.base_dir = base_dir
|
||||
logger.info(f"LocalJsonCredentialStore initialized with base_dir: {base_dir}")
|
||||
|
||||
def _get_credential_path(self, user_email: str) -> str:
|
||||
"""Get the file path for a user's credentials."""
|
||||
if not os.path.exists(self.base_dir):
|
||||
os.makedirs(self.base_dir)
|
||||
logger.info(f"Created credentials directory: {self.base_dir}")
|
||||
return os.path.join(self.base_dir, f"{user_email}.json")
|
||||
|
||||
def get_credential(self, user_email: str) -> Optional[Credentials]:
|
||||
"""Get credentials from local JSON file."""
|
||||
creds_path = self._get_credential_path(user_email)
|
||||
|
||||
if not os.path.exists(creds_path):
|
||||
logger.debug(f"No credential file found for {user_email} at {creds_path}")
|
||||
return None
|
||||
|
||||
try:
|
||||
with open(creds_path, "r") as f:
|
||||
creds_data = json.load(f)
|
||||
|
||||
# Parse expiry if present
|
||||
expiry = None
|
||||
if creds_data.get("expiry"):
|
||||
try:
|
||||
expiry = datetime.fromisoformat(creds_data["expiry"])
|
||||
# Ensure timezone-naive datetime for Google auth library compatibility
|
||||
if expiry.tzinfo is not None:
|
||||
expiry = expiry.replace(tzinfo=None)
|
||||
except (ValueError, TypeError) as e:
|
||||
logger.warning(f"Could not parse expiry time for {user_email}: {e}")
|
||||
|
||||
credentials = Credentials(
|
||||
token=creds_data.get("token"),
|
||||
refresh_token=creds_data.get("refresh_token"),
|
||||
token_uri=creds_data.get("token_uri"),
|
||||
client_id=creds_data.get("client_id"),
|
||||
client_secret=creds_data.get("client_secret"),
|
||||
scopes=creds_data.get("scopes"),
|
||||
expiry=expiry,
|
||||
)
|
||||
|
||||
logger.debug(f"Loaded credentials for {user_email} from {creds_path}")
|
||||
return credentials
|
||||
|
||||
except (IOError, json.JSONDecodeError, KeyError) as e:
|
||||
logger.error(
|
||||
f"Error loading credentials for {user_email} from {creds_path}: {e}"
|
||||
)
|
||||
return None
|
||||
|
||||
def store_credential(self, user_email: str, credentials: Credentials) -> bool:
|
||||
"""Store credentials to local JSON file."""
|
||||
creds_path = self._get_credential_path(user_email)
|
||||
|
||||
creds_data = {
|
||||
"token": credentials.token,
|
||||
"refresh_token": credentials.refresh_token,
|
||||
"token_uri": credentials.token_uri,
|
||||
"client_id": credentials.client_id,
|
||||
"client_secret": credentials.client_secret,
|
||||
"scopes": credentials.scopes,
|
||||
"expiry": credentials.expiry.isoformat() if credentials.expiry else None,
|
||||
}
|
||||
|
||||
try:
|
||||
with open(creds_path, "w") as f:
|
||||
json.dump(creds_data, f, indent=2)
|
||||
logger.info(f"Stored credentials for {user_email} to {creds_path}")
|
||||
return True
|
||||
except IOError as e:
|
||||
logger.error(
|
||||
f"Error storing credentials for {user_email} to {creds_path}: {e}"
|
||||
)
|
||||
return False
|
||||
|
||||
def delete_credential(self, user_email: str) -> bool:
|
||||
"""Delete credential file for a user."""
|
||||
creds_path = self._get_credential_path(user_email)
|
||||
|
||||
try:
|
||||
if os.path.exists(creds_path):
|
||||
os.remove(creds_path)
|
||||
logger.info(f"Deleted credentials for {user_email} from {creds_path}")
|
||||
return True
|
||||
else:
|
||||
logger.debug(
|
||||
f"No credential file to delete for {user_email} at {creds_path}"
|
||||
)
|
||||
return True # Consider it a success if file doesn't exist
|
||||
except IOError as e:
|
||||
logger.error(
|
||||
f"Error deleting credentials for {user_email} from {creds_path}: {e}"
|
||||
)
|
||||
return False
|
||||
|
||||
def list_users(self) -> List[str]:
|
||||
"""List all users with credential files."""
|
||||
if not os.path.exists(self.base_dir):
|
||||
return []
|
||||
|
||||
users = []
|
||||
try:
|
||||
for filename in os.listdir(self.base_dir):
|
||||
if filename.endswith(".json"):
|
||||
user_email = filename[:-5] # Remove .json extension
|
||||
users.append(user_email)
|
||||
logger.debug(
|
||||
f"Found {len(users)} users with credentials in {self.base_dir}"
|
||||
)
|
||||
except OSError as e:
|
||||
logger.error(f"Error listing credential files in {self.base_dir}: {e}")
|
||||
|
||||
return sorted(users)
|
||||
|
||||
|
||||
# Global credential store instance
|
||||
_credential_store: Optional[CredentialStore] = None
|
||||
|
||||
|
||||
def get_credential_store() -> CredentialStore:
|
||||
"""
|
||||
Get the global credential store instance.
|
||||
|
||||
Returns:
|
||||
Configured credential store instance
|
||||
"""
|
||||
global _credential_store
|
||||
|
||||
if _credential_store is None:
|
||||
# always use LocalJsonCredentialStore as the default
|
||||
# Future enhancement: support other backends via environment variables
|
||||
_credential_store = LocalDirectoryCredentialStore()
|
||||
logger.info(f"Initialized credential store: {type(_credential_store).__name__}")
|
||||
|
||||
return _credential_store
|
||||
|
||||
|
||||
def set_credential_store(store: CredentialStore):
|
||||
"""
|
||||
Set the global credential store instance.
|
||||
|
||||
Args:
|
||||
store: Credential store instance to use
|
||||
"""
|
||||
global _credential_store
|
||||
_credential_store = store
|
||||
logger.info(f"Set credential store: {type(store).__name__}")
|
||||
Reference in New Issue
Block a user