Files
google-mcp/core/server.py
T

612 lines
24 KiB
Python
Raw Normal View History

import hashlib
import logging
import os
2026-02-28 11:50:09 -04:00
from typing import List, Optional
2025-06-14 13:15:36 -04:00
from importlib import metadata
2025-05-06 09:36:48 -04:00
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
2025-08-02 18:55:35 -04:00
from starlette.applications import Starlette
2026-03-01 17:59:45 -05:00
from starlette.datastructures import MutableHeaders
from starlette.types import Scope, Receive, Send
2025-05-22 17:02:00 -04:00
from starlette.requests import Request
from starlette.middleware import Middleware
2025-05-06 09:36:48 -04:00
from fastmcp import FastMCP
2025-10-05 18:00:10 -04:00
from fastmcp.server.auth.providers.google import GoogleProvider
from auth.oauth21_session_store import get_oauth21_session_store, set_auth_provider
from auth.google_auth import handle_auth_callback, start_auth_flow, check_client_secrets
2026-01-08 15:26:35 +01:00
from auth.oauth_config import is_oauth21_enabled, is_external_oauth21_provider
2025-08-02 18:55:35 -04:00
from auth.mcp_session_middleware import MCPSessionMiddleware
2025-12-13 13:49:28 -08:00
from auth.oauth_responses import (
create_error_response,
create_success_response,
create_server_error_response,
)
2025-08-05 10:22:01 -04:00
from auth.auth_info_middleware import AuthInfoMiddleware
2025-12-13 13:49:28 -08:00
from auth.scopes import SCOPES, get_current_scopes # noqa
2025-08-03 10:30:04 -04:00
from core.config import (
USER_GOOGLE_EMAIL,
get_transport_mode,
set_transport_mode as _set_transport_mode,
get_oauth_redirect_uri as get_oauth_redirect_uri_for_current_mode,
2025-05-13 12:36:53 -04:00
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
2025-10-05 18:00:10 -04:00
_auth_provider: Optional[GoogleProvider] = None
_legacy_callback_registered = False
session_middleware = Middleware(MCPSessionMiddleware)
2025-12-13 13:49:28 -08:00
2026-03-01 17:59:45 -05:00
class WellKnownCacheControlMiddleware:
"""Force no-cache headers for OAuth well-known discovery endpoints."""
def __init__(self, app):
self.app = app
2026-03-01 17:59:45 -05:00
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return
2026-03-01 17:59:45 -05:00
path = scope.get("path", "")
2026-03-01 18:21:12 -05:00
is_oauth_well_known = (
path == "/.well-known/oauth-authorization-server"
or path.startswith("/.well-known/oauth-authorization-server/")
or path == "/.well-known/oauth-protected-resource"
or path.startswith("/.well-known/oauth-protected-resource/")
)
2026-03-01 17:59:45 -05:00
if not is_oauth_well_known:
await self.app(scope, receive, send)
return
2026-03-01 17:59:45 -05:00
async def send_with_no_cache_headers(message):
if message["type"] == "http.response.start":
headers = MutableHeaders(raw=message.setdefault("headers", []))
headers["Cache-Control"] = "no-store, must-revalidate"
headers["ETag"] = f'"{_compute_scope_fingerprint()}"'
await send(message)
2026-03-01 17:59:45 -05:00
await self.app(scope, receive, send_with_no_cache_headers)
2026-03-01 17:59:45 -05:00
well_known_cache_control_middleware = Middleware(WellKnownCacheControlMiddleware)
def _compute_scope_fingerprint() -> str:
"""Compute a short hash of the current scope configuration for cache-busting."""
scopes_str = ",".join(sorted(get_current_scopes()))
return hashlib.sha256(scopes_str.encode()).hexdigest()[:12]
2026-02-28 11:50:09 -04:00
# Custom FastMCP that adds secure middleware stack for OAuth 2.1
class SecureFastMCP(FastMCP):
2026-02-13 10:20:39 -05:00
def http_app(self, **kwargs) -> "Starlette":
"""Override to add secure middleware stack for OAuth 2.1."""
2026-02-13 10:20:39 -05:00
app = super().http_app(**kwargs)
# Add middleware in order (first added = outermost layer)
2026-03-01 17:59:45 -05:00
app.user_middleware.insert(0, well_known_cache_control_middleware)
2025-08-09 11:44:12 -04:00
# Session Management - extracts session info for MCP context
2026-03-01 17:59:45 -05:00
app.user_middleware.insert(1, session_middleware)
# Rebuild middleware stack
app.middleware_stack = app.build_middleware_stack()
2026-03-03 15:12:33 +00:00
logger.info("Added middleware stack: WellKnownCacheControl, Session Management")
return app
2025-12-13 13:49:28 -08:00
server = SecureFastMCP(
name="google_workspace",
auth=None,
2025-05-11 10:07:37 -04:00
)
2025-08-05 10:22:01 -04:00
# Add the AuthInfo middleware to inject authentication into FastMCP context
auth_info_middleware = AuthInfoMiddleware()
server.add_middleware(auth_info_middleware)
2025-08-03 12:00:41 -04:00
2025-12-27 12:23:49 -08:00
def _parse_bool_env(value: str) -> bool:
"""Parse environment variable string to boolean."""
return value.lower() in ("1", "true", "yes", "on")
def set_transport_mode(mode: str):
"""Sets the transport mode for the server."""
2025-08-03 10:30:04 -04:00
_set_transport_mode(mode)
2025-09-09 11:09:58 -04:00
logger.info(f"Transport: {mode}")
2025-10-05 18:00:10 -04:00
def _ensure_legacy_callback_route() -> None:
global _legacy_callback_registered
if _legacy_callback_registered:
return
server.custom_route("/oauth2callback", methods=["GET"])(legacy_oauth2_callback)
_legacy_callback_registered = True
2025-12-13 13:49:28 -08:00
def configure_server_for_http():
"""
Configures the authentication provider for HTTP transport.
This must be called BEFORE server.run().
"""
2025-08-02 14:32:42 -04:00
global _auth_provider
transport_mode = get_transport_mode()
2025-08-02 14:32:42 -04:00
if transport_mode != "streamable-http":
return
# Use centralized OAuth configuration
from auth.oauth_config import get_oauth_config
2025-12-13 13:49:28 -08:00
config = get_oauth_config()
2025-08-12 08:45:24 -04:00
# Check if OAuth 2.1 is enabled via centralized config
oauth21_enabled = config.is_oauth21_enabled()
if oauth21_enabled:
if not config.is_configured():
2025-09-09 11:13:30 -04:00
logger.warning("OAuth 2.1 enabled but OAuth credentials not configured")
return
2025-08-03 12:15:44 -04:00
2025-12-27 12:20:26 -08:00
def validate_and_derive_jwt_key(
jwt_signing_key_override: str | None, client_secret: str
) -> bytes:
2025-12-27 12:16:58 -08:00
"""Validate JWT signing key override and derive the final JWT key."""
if jwt_signing_key_override:
if len(jwt_signing_key_override) < 12:
logger.warning(
"OAuth 2.1: FASTMCP_SERVER_AUTH_GOOGLE_JWT_SIGNING_KEY is less than 12 characters; "
"use a longer secret to improve key derivation strength."
)
return derive_jwt_key(
low_entropy_material=jwt_signing_key_override,
salt="fastmcp-jwt-signing-key",
)
else:
return derive_jwt_key(
high_entropy_material=client_secret,
salt="fastmcp-jwt-signing-key",
)
2025-08-12 08:45:24 -04:00
try:
2025-12-27 12:23:49 -08:00
# Import common dependencies for storage backends
from key_value.aio.wrappers.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
from fastmcp.server.auth.jwt_issuer import derive_jwt_key
2025-10-05 18:00:10 -04:00
required_scopes: List[str] = sorted(get_current_scopes())
client_storage = None
jwt_signing_key_override = (
os.getenv("FASTMCP_SERVER_AUTH_GOOGLE_JWT_SIGNING_KEY", "").strip()
or None
)
2025-12-27 12:08:09 -08:00
storage_backend = (
os.getenv("WORKSPACE_MCP_OAUTH_PROXY_STORAGE_BACKEND", "")
.strip()
.lower()
)
valkey_host = os.getenv("WORKSPACE_MCP_OAUTH_PROXY_VALKEY_HOST", "").strip()
# Determine storage backend: valkey, disk, memory (default)
use_valkey = storage_backend == "valkey" or bool(valkey_host)
use_disk = storage_backend == "disk"
if use_valkey:
try:
from key_value.aio.stores.valkey import ValkeyStore
valkey_port_raw = os.getenv(
"WORKSPACE_MCP_OAUTH_PROXY_VALKEY_PORT", "6379"
).strip()
valkey_db_raw = os.getenv(
"WORKSPACE_MCP_OAUTH_PROXY_VALKEY_DB", "0"
).strip()
valkey_port = int(valkey_port_raw)
valkey_db = int(valkey_db_raw)
valkey_use_tls_raw = os.getenv(
"WORKSPACE_MCP_OAUTH_PROXY_VALKEY_USE_TLS", ""
).strip()
2025-12-27 12:23:49 -08:00
valkey_use_tls = (
2025-12-27 12:24:39 -08:00
_parse_bool_env(valkey_use_tls_raw)
if valkey_use_tls_raw
2025-12-27 12:23:49 -08:00
else valkey_port == 6380
)
valkey_request_timeout_ms_raw = os.getenv(
"WORKSPACE_MCP_OAUTH_PROXY_VALKEY_REQUEST_TIMEOUT_MS", ""
).strip()
valkey_connection_timeout_ms_raw = os.getenv(
"WORKSPACE_MCP_OAUTH_PROXY_VALKEY_CONNECTION_TIMEOUT_MS", ""
).strip()
valkey_request_timeout_ms = (
int(valkey_request_timeout_ms_raw)
if valkey_request_timeout_ms_raw
else None
)
valkey_connection_timeout_ms = (
int(valkey_connection_timeout_ms_raw)
if valkey_connection_timeout_ms_raw
else None
)
valkey_username = (
os.getenv(
"WORKSPACE_MCP_OAUTH_PROXY_VALKEY_USERNAME", ""
).strip()
or None
)
valkey_password = (
os.getenv(
"WORKSPACE_MCP_OAUTH_PROXY_VALKEY_PASSWORD", ""
).strip()
or None
)
if not valkey_host:
valkey_host = "localhost"
client_storage = ValkeyStore(
host=valkey_host,
port=valkey_port,
db=valkey_db,
username=valkey_username,
password=valkey_password,
)
# Configure TLS and timeouts on the underlying Glide client config.
# ValkeyStore currently doesn't expose these settings directly.
glide_config = getattr(client_storage, "_client_config", None)
if glide_config is not None:
glide_config.use_tls = valkey_use_tls
is_remote_host = valkey_host not in {"localhost", "127.0.0.1"}
if valkey_request_timeout_ms is None and (
valkey_use_tls or is_remote_host
):
# Glide defaults to 250ms if unset; increase for remote/TLS endpoints.
valkey_request_timeout_ms = 5000
if valkey_request_timeout_ms is not None:
glide_config.request_timeout = valkey_request_timeout_ms
if valkey_connection_timeout_ms is None and (
valkey_use_tls or is_remote_host
):
valkey_connection_timeout_ms = 10000
if valkey_connection_timeout_ms is not None:
from glide_shared.config import (
AdvancedGlideClientConfiguration,
)
glide_config.advanced_config = (
AdvancedGlideClientConfiguration(
connection_timeout=valkey_connection_timeout_ms
)
)
2025-12-27 12:20:26 -08:00
jwt_signing_key = validate_and_derive_jwt_key(
jwt_signing_key_override, config.client_secret
)
storage_encryption_key = derive_jwt_key(
high_entropy_material=jwt_signing_key.decode(),
salt="fastmcp-storage-encryption-key",
)
client_storage = FernetEncryptionWrapper(
key_value=client_storage,
fernet=Fernet(key=storage_encryption_key),
)
logger.info(
"OAuth 2.1: Using ValkeyStore for FastMCP OAuth proxy client_storage (host=%s, port=%s, db=%s, tls=%s)",
valkey_host,
valkey_port,
valkey_db,
valkey_use_tls,
)
if valkey_request_timeout_ms is not None:
logger.info(
"OAuth 2.1: Valkey request timeout set to %sms",
valkey_request_timeout_ms,
)
if valkey_connection_timeout_ms is not None:
logger.info(
"OAuth 2.1: Valkey connection timeout set to %sms",
valkey_connection_timeout_ms,
)
logger.info(
"OAuth 2.1: Applied Fernet encryption wrapper to Valkey client_storage (key derived from FASTMCP_SERVER_AUTH_GOOGLE_JWT_SIGNING_KEY or GOOGLE_OAUTH_CLIENT_SECRET)."
)
except ImportError as exc:
logger.warning(
"OAuth 2.1: Valkey client_storage requested but Valkey dependencies are not installed (%s). "
2025-12-28 12:52:40 -08:00
"Install 'workspace-mcp[valkey]' (or 'py-key-value-aio[valkey]', which includes 'valkey-glide') "
"or unset WORKSPACE_MCP_OAUTH_PROXY_STORAGE_BACKEND/WORKSPACE_MCP_OAUTH_PROXY_VALKEY_HOST.",
exc,
)
except ValueError as exc:
logger.warning(
"OAuth 2.1: Invalid Valkey configuration; falling back to default storage (%s).",
exc,
)
elif use_disk:
try:
from key_value.aio.stores.disk import DiskStore
disk_directory = os.getenv(
"WORKSPACE_MCP_OAUTH_PROXY_DISK_DIRECTORY", ""
).strip()
if not disk_directory:
# Default to FASTMCP_HOME/oauth-proxy or ~/.fastmcp/oauth-proxy
fastmcp_home = os.getenv("FASTMCP_HOME", "").strip()
if fastmcp_home:
disk_directory = os.path.join(fastmcp_home, "oauth-proxy")
else:
2025-12-27 12:08:09 -08:00
disk_directory = os.path.expanduser(
"~/.fastmcp/oauth-proxy"
)
client_storage = DiskStore(directory=disk_directory)
2025-12-27 12:20:26 -08:00
jwt_signing_key = validate_and_derive_jwt_key(
jwt_signing_key_override, config.client_secret
)
storage_encryption_key = derive_jwt_key(
high_entropy_material=jwt_signing_key.decode(),
salt="fastmcp-storage-encryption-key",
)
client_storage = FernetEncryptionWrapper(
key_value=client_storage,
fernet=Fernet(key=storage_encryption_key),
)
logger.info(
"OAuth 2.1: Using DiskStore for FastMCP OAuth proxy client_storage (directory=%s)",
disk_directory,
)
except ImportError as exc:
logger.warning(
"OAuth 2.1: Disk storage requested but dependencies not available (%s). "
"Falling back to default storage.",
exc,
)
elif storage_backend == "memory":
from key_value.aio.stores.memory import MemoryStore
2025-12-27 12:08:09 -08:00
client_storage = MemoryStore()
2025-12-27 12:08:09 -08:00
logger.info(
"OAuth 2.1: Using MemoryStore for FastMCP OAuth proxy client_storage"
)
# else: client_storage remains None, FastMCP uses its default
2025-12-27 12:23:49 -08:00
# Ensure JWT signing key is always derived for all storage backends
2025-12-27 12:24:39 -08:00
if "jwt_signing_key" not in locals():
2025-12-27 12:23:49 -08:00
jwt_signing_key = validate_and_derive_jwt_key(
jwt_signing_key_override, config.client_secret
)
# Check if external OAuth provider is configured
if config.is_external_oauth21_provider():
# External OAuth mode: use custom provider that handles ya29.* access tokens
from auth.external_oauth_provider import ExternalOAuthProvider
provider = ExternalOAuthProvider(
client_id=config.client_id,
client_secret=config.client_secret,
base_url=config.get_oauth_base_url(),
redirect_path=config.redirect_path,
required_scopes=required_scopes,
resource_server_url=config.get_oauth_base_url(),
)
server.auth = provider
2026-01-28 16:39:53 -05:00
logger.info("OAuth 2.1 enabled with EXTERNAL provider mode")
2025-12-13 13:49:28 -08:00
logger.info(
"Expecting Authorization bearer tokens in tool call headers"
)
logger.info(
"Protected resource metadata points to Google's authorization server"
)
else:
# Standard OAuth 2.1 mode: use FastMCP's GoogleProvider
provider = GoogleProvider(
client_id=config.client_id,
client_secret=config.client_secret,
base_url=config.get_oauth_base_url(),
redirect_path=config.redirect_path,
required_scopes=required_scopes,
client_storage=client_storage,
2025-12-27 12:23:49 -08:00
jwt_signing_key=jwt_signing_key,
)
# Enable protocol-level auth
server.auth = provider
2025-12-13 13:49:28 -08:00
logger.info(
"OAuth 2.1 enabled using FastMCP GoogleProvider with protocol-level auth"
)
2026-01-28 13:07:03 -05:00
# Always set auth provider for token validation in middleware
2025-10-05 18:00:10 -04:00
set_auth_provider(provider)
_auth_provider = provider
except Exception as exc:
2025-12-13 13:49:28 -08:00
logger.error(
"Failed to initialize FastMCP GoogleProvider: %s", exc, exc_info=True
)
2025-08-12 08:45:24 -04:00
raise
else:
logger.info("OAuth 2.0 mode - Server will use legacy authentication.")
server.auth = None
2025-10-05 18:00:10 -04:00
_auth_provider = None
set_auth_provider(None)
_ensure_legacy_callback_route()
2025-07-19 19:33:55 -04:00
2025-10-05 18:00:10 -04:00
def get_auth_provider() -> Optional[GoogleProvider]:
"""Gets the global authentication provider instance."""
2025-08-02 14:32:42 -04:00
return _auth_provider
2025-07-19 19:33:55 -04:00
2025-12-13 13:49:28 -08:00
2026-02-08 16:09:37 -05:00
@server.custom_route("/", methods=["GET"])
2025-06-07 18:49:54 -04:00
@server.custom_route("/health", methods=["GET"])
async def health_check(request: Request):
2025-06-14 13:15:36 -04:00
try:
2025-06-14 13:32:30 -04:00
version = metadata.version("workspace-mcp")
2025-06-14 13:15:36 -04:00
except metadata.PackageNotFoundError:
version = "dev"
2025-12-13 13:49:28 -08:00
return JSONResponse(
{
"status": "healthy",
"service": "workspace-mcp",
"version": version,
"transport": get_transport_mode(),
}
)
2025-06-07 18:49:54 -04:00
@server.custom_route("/attachments/{file_id}", methods=["GET"])
2026-02-28 17:12:30 -04:00
async def serve_attachment(request: Request):
"""Serve a stored attachment file."""
from core.attachment_storage import get_attachment_storage
2025-12-13 13:49:28 -08:00
2026-02-28 17:12:30 -04:00
file_id = request.path_params["file_id"]
storage = get_attachment_storage()
metadata = storage.get_attachment_metadata(file_id)
2025-12-13 13:49:28 -08:00
if not metadata:
return JSONResponse(
2025-12-13 13:49:28 -08:00
{"error": "Attachment not found or expired"}, status_code=404
)
2025-12-13 13:49:28 -08:00
file_path = storage.get_attachment_path(file_id)
if not file_path:
2025-12-13 13:49:28 -08:00
return JSONResponse({"error": "Attachment file not found"}, status_code=404)
return FileResponse(
path=str(file_path),
filename=metadata["filename"],
2025-12-13 13:49:28 -08:00
media_type=metadata["mime_type"],
)
2025-12-13 13:49:28 -08:00
2025-10-05 18:00:10 -04:00
async def legacy_oauth2_callback(request: Request) -> HTMLResponse:
state = request.query_params.get("state")
code = request.query_params.get("code")
error = request.query_params.get("error")
if error:
2025-12-13 13:49:28 -08:00
msg = (
f"Authentication failed: Google returned an error: {error}. State: {state}."
)
logger.error(msg)
return create_error_response(msg)
if not code:
msg = "Authentication failed: No authorization code received from Google."
logger.error(msg)
return create_error_response(msg)
2025-05-11 10:07:37 -04:00
try:
error_message = check_client_secrets()
if error_message:
return create_server_error_response(error_message)
2026-02-10 17:00:41 -05:00
logger.info("OAuth callback: Received authorization code.")
2025-05-13 12:36:53 -04:00
2025-09-28 16:08:41 -04:00
mcp_session_id = None
2025-12-13 13:49:28 -08:00
if hasattr(request, "state") and hasattr(request.state, "session_id"):
2025-09-28 16:08:41 -04:00
mcp_session_id = request.state.session_id
verified_user_id, credentials = handle_auth_callback(
scopes=get_current_scopes(),
authorization_response=str(request.url),
redirect_uri=get_oauth_redirect_uri_for_current_mode(),
2025-12-13 13:49:28 -08:00
session_id=mcp_session_id,
2025-05-11 10:07:37 -04:00
)
2025-05-13 12:36:53 -04:00
2025-12-13 13:49:28 -08:00
logger.info(
f"OAuth callback: Successfully authenticated user: {verified_user_id}."
)
2025-08-02 14:32:42 -04:00
try:
store = get_oauth21_session_store()
store.store_session(
user_email=verified_user_id,
access_token=credentials.token,
refresh_token=credentials.refresh_token,
token_uri=credentials.token_uri,
client_id=credentials.client_id,
client_secret=credentials.client_secret,
scopes=credentials.scopes,
expiry=credentials.expiry,
session_id=f"google-{state}",
mcp_session_id=mcp_session_id,
)
2025-12-13 13:49:28 -08:00
logger.info(
f"Stored Google credentials in OAuth 2.1 session store for {verified_user_id}"
)
except Exception as e:
logger.error(f"Failed to store credentials in OAuth 2.1 store: {e}")
2025-05-13 12:36:53 -04:00
2025-06-07 16:16:48 -04:00
return create_success_response(verified_user_id)
2025-05-11 10:07:37 -04:00
except Exception as e:
logger.error(f"Error processing OAuth callback: {str(e)}", exc_info=True)
2025-06-07 16:16:48 -04:00
return create_server_error_response(str(e))
2025-12-13 13:49:28 -08:00
@server.tool()
2025-12-13 13:49:28 -08:00
async def start_google_auth(
service_name: str, user_google_email: str = USER_GOOGLE_EMAIL
) -> str:
2025-08-10 17:40:27 -04:00
"""
Manually initiate Google OAuth authentication flow.
2025-08-12 08:45:24 -04:00
2026-01-08 15:26:35 +01:00
NOTE: This is a legacy OAuth 2.0 tool and is disabled when OAuth 2.1 is enabled.
The authentication system automatically handles credential checks and prompts for
authentication when needed. Only use this tool if:
2025-08-10 17:40:27 -04:00
1. You need to re-authenticate with different credentials
2. You want to proactively authenticate before using other tools
3. The automatic authentication flow failed and you need to retry
2025-08-12 08:45:24 -04:00
In most cases, simply try calling the Google Workspace tool you need - it will
2025-08-10 17:40:27 -04:00
automatically handle authentication if required.
"""
2026-01-08 15:26:35 +01:00
if is_oauth21_enabled():
if is_external_oauth21_provider():
return (
"start_google_auth is disabled when OAuth 2.1 is enabled. "
"Provide a valid OAuth 2.1 bearer token in the Authorization header "
"and retry the original tool."
)
return (
"start_google_auth is disabled when OAuth 2.1 is enabled. "
"Authenticate through your MCP client's OAuth 2.1 flow and retry the "
"original tool."
)
if not user_google_email:
raise ValueError("user_google_email must be provided.")
2025-08-03 12:15:44 -04:00
error_message = check_client_secrets()
if error_message:
return f"**Authentication Error:** {error_message}"
try:
auth_message = await start_auth_flow(
user_google_email=user_google_email,
service_name=service_name,
2025-12-13 13:49:28 -08:00
redirect_uri=get_oauth_redirect_uri_for_current_mode(),
)
return auth_message
except Exception as e:
logger.error(f"Failed to start Google authentication flow: {e}", exc_info=True)
return f"**Error:** An unexpected error occurred: {e}"