Merge pull request #510 from taylorwilsdon/issues/503

enh: Partial Scope Grants & Granular Permissions
This commit is contained in:
Taylor Wilsdon
2026-02-28 16:58:44 -04:00
committed by GitHub
13 changed files with 845 additions and 10 deletions

View File

@@ -560,6 +560,22 @@ Read-only mode provides secure, restricted access by:
- Automatically filtering out tools that require write permissions at startup - Automatically filtering out tools that require write permissions at startup
- Allowing read operations: list, get, search, and export across all services - Allowing read operations: list, get, search, and export across all services
**🔐 Granular Permissions**
```bash
# Per-service permission levels
uv run main.py --permissions gmail:organize drive:readonly
# Combine permissions with tier filtering
uv run main.py --permissions gmail:send drive:full --tool-tier core
```
Granular permissions mode provides service-by-service scope control:
- Format: `service:level` (one entry per service)
- Gmail levels: `readonly`, `organize`, `drafts`, `send`, `full` (cumulative)
- Other services currently support: `readonly`, `full`
- `--permissions` and `--read-only` are mutually exclusive
- `--permissions` cannot be combined with `--tools`; enabled services are determined by the `--permissions` entries (optionally filtered by `--tool-tier`)
- With `--tool-tier`, only tier-matched tools are enabled and only services that have tools in the selected tier are imported
**★ Tool Tiers** **★ Tool Tiers**
```bash ```bash
uv run main.py --tool-tier core # ● Essential tools only uv run main.py --tool-tier core # ● Essential tools only
@@ -738,6 +754,9 @@ uv run main.py --tool-tier complete # Enable all availabl
uv run main.py --tools gmail drive --tool-tier core # Core tools for specific services uv run main.py --tools gmail drive --tool-tier core # Core tools for specific services
uv run main.py --tools gmail --tool-tier extended # Extended Gmail functionality only uv run main.py --tools gmail --tool-tier extended # Extended Gmail functionality only
uv run main.py --tools docs sheets --tool-tier complete # Full access to Docs and Sheets uv run main.py --tools docs sheets --tool-tier complete # Full access to Docs and Sheets
# Combine tier selection with granular permission levels
uv run main.py --permissions gmail:organize drive:full --tool-tier core
``` ```
## 📋 Credential Configuration ## 📋 Credential Configuration

View File

@@ -495,6 +495,12 @@ def handle_auth_callback(
) )
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1" os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
# Allow partial scope grants without raising an exception.
# When users decline some scopes on Google's consent screen,
# oauthlib raises because the granted scopes differ from requested.
if "OAUTHLIB_RELAX_TOKEN_SCOPE" not in os.environ:
os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1"
store = get_oauth21_session_store() store = get_oauth21_session_store()
parsed_response = urlparse(authorization_response) parsed_response = urlparse(authorization_response)
state_values = parse_qs(parsed_response.query).get("state") state_values = parse_qs(parsed_response.query).get("state")
@@ -522,6 +528,29 @@ def handle_auth_callback(
credentials = flow.credentials credentials = flow.credentials
logger.info("Successfully exchanged authorization code for tokens.") logger.info("Successfully exchanged authorization code for tokens.")
# Handle partial OAuth grants: if the user declined some scopes on
# Google's consent screen, credentials.granted_scopes contains only
# what was actually authorized. Store those instead of the inflated
# requested scopes so that refresh() sends the correct scope set.
granted = getattr(credentials, "granted_scopes", None)
if granted and set(granted) != set(credentials.scopes or []):
logger.warning(
"Partial OAuth grant detected. Requested: %s, Granted: %s",
credentials.scopes,
granted,
)
credentials = Credentials(
token=credentials.token,
refresh_token=credentials.refresh_token,
id_token=getattr(credentials, "id_token", None),
token_uri=credentials.token_uri,
client_id=credentials.client_id,
client_secret=credentials.client_secret,
scopes=list(granted),
expiry=credentials.expiry,
quota_project_id=getattr(credentials, "quota_project_id", None),
)
# Get user info to determine user_id (using email here) # Get user info to determine user_id (using email here)
user_info = get_user_info(credentials) user_info = get_user_info(credentials)
if not user_info or "email" not in user_info: if not user_info or "email" not in user_info:

248
auth/permissions.py Normal file
View File

@@ -0,0 +1,248 @@
"""
Granular per-service permission levels.
Each service has named permission levels (cumulative), mapping to a list of
OAuth scopes. The levels for a service are ordered from least to most
permissive — requesting level N implicitly includes all scopes from levels < N.
Usage:
--permissions gmail:organize drive:readonly
Gmail levels: readonly, organize, drafts, send, full
Other services: readonly, full (extensible by adding entries to SERVICE_PERMISSION_LEVELS)
"""
import logging
from typing import Dict, List, Optional, Tuple
from auth.scopes import (
GMAIL_READONLY_SCOPE,
GMAIL_LABELS_SCOPE,
GMAIL_MODIFY_SCOPE,
GMAIL_COMPOSE_SCOPE,
GMAIL_SEND_SCOPE,
GMAIL_SETTINGS_BASIC_SCOPE,
DRIVE_READONLY_SCOPE,
DRIVE_FILE_SCOPE,
DRIVE_SCOPE,
CALENDAR_READONLY_SCOPE,
CALENDAR_EVENTS_SCOPE,
CALENDAR_SCOPE,
DOCS_READONLY_SCOPE,
DOCS_WRITE_SCOPE,
SHEETS_READONLY_SCOPE,
SHEETS_WRITE_SCOPE,
CHAT_READONLY_SCOPE,
CHAT_WRITE_SCOPE,
CHAT_SPACES_SCOPE,
CHAT_SPACES_READONLY_SCOPE,
FORMS_BODY_SCOPE,
FORMS_BODY_READONLY_SCOPE,
FORMS_RESPONSES_READONLY_SCOPE,
SLIDES_SCOPE,
SLIDES_READONLY_SCOPE,
TASKS_SCOPE,
TASKS_READONLY_SCOPE,
CONTACTS_SCOPE,
CONTACTS_READONLY_SCOPE,
CUSTOM_SEARCH_SCOPE,
SCRIPT_PROJECTS_SCOPE,
SCRIPT_PROJECTS_READONLY_SCOPE,
SCRIPT_DEPLOYMENTS_SCOPE,
SCRIPT_DEPLOYMENTS_READONLY_SCOPE,
SCRIPT_PROCESSES_READONLY_SCOPE,
SCRIPT_METRICS_SCOPE,
)
logger = logging.getLogger(__name__)
# Ordered permission levels per service.
# Each entry is (level_name, [additional_scopes_at_this_level]).
# Scopes are CUMULATIVE: level N includes all scopes from levels 0..N.
SERVICE_PERMISSION_LEVELS: Dict[str, List[Tuple[str, List[str]]]] = {
"gmail": [
("readonly", [GMAIL_READONLY_SCOPE]),
("organize", [GMAIL_LABELS_SCOPE, GMAIL_MODIFY_SCOPE]),
("drafts", [GMAIL_COMPOSE_SCOPE]),
("send", [GMAIL_SEND_SCOPE]),
("full", [GMAIL_SETTINGS_BASIC_SCOPE]),
],
"drive": [
("readonly", [DRIVE_READONLY_SCOPE]),
("full", [DRIVE_SCOPE, DRIVE_FILE_SCOPE]),
],
"calendar": [
("readonly", [CALENDAR_READONLY_SCOPE]),
("full", [CALENDAR_SCOPE, CALENDAR_EVENTS_SCOPE]),
],
"docs": [
("readonly", [DOCS_READONLY_SCOPE, DRIVE_READONLY_SCOPE]),
("full", [DOCS_WRITE_SCOPE, DRIVE_READONLY_SCOPE, DRIVE_FILE_SCOPE]),
],
"sheets": [
("readonly", [SHEETS_READONLY_SCOPE, DRIVE_READONLY_SCOPE]),
("full", [SHEETS_WRITE_SCOPE, DRIVE_READONLY_SCOPE]),
],
"chat": [
("readonly", [CHAT_READONLY_SCOPE, CHAT_SPACES_READONLY_SCOPE]),
("full", [CHAT_WRITE_SCOPE, CHAT_SPACES_SCOPE]),
],
"forms": [
("readonly", [FORMS_BODY_READONLY_SCOPE, FORMS_RESPONSES_READONLY_SCOPE]),
("full", [FORMS_BODY_SCOPE, FORMS_RESPONSES_READONLY_SCOPE]),
],
"slides": [
("readonly", [SLIDES_READONLY_SCOPE]),
("full", [SLIDES_SCOPE]),
],
"tasks": [
("readonly", [TASKS_READONLY_SCOPE]),
("full", [TASKS_SCOPE]),
],
"contacts": [
("readonly", [CONTACTS_READONLY_SCOPE]),
("full", [CONTACTS_SCOPE]),
],
"search": [
("readonly", [CUSTOM_SEARCH_SCOPE]),
("full", [CUSTOM_SEARCH_SCOPE]),
],
"appscript": [
(
"readonly",
[
SCRIPT_PROJECTS_READONLY_SCOPE,
SCRIPT_DEPLOYMENTS_READONLY_SCOPE,
SCRIPT_PROCESSES_READONLY_SCOPE,
SCRIPT_METRICS_SCOPE,
DRIVE_READONLY_SCOPE,
],
),
(
"full",
[
SCRIPT_PROJECTS_SCOPE,
SCRIPT_DEPLOYMENTS_SCOPE,
SCRIPT_PROCESSES_READONLY_SCOPE,
SCRIPT_METRICS_SCOPE,
DRIVE_FILE_SCOPE,
],
),
],
}
# Module-level state: parsed --permissions config
# Dict mapping service_name -> level_name, e.g. {"gmail": "organize"}
_PERMISSIONS: Optional[Dict[str, str]] = None
def set_permissions(permissions: Dict[str, str]) -> None:
"""Set granular permissions from parsed --permissions argument."""
global _PERMISSIONS
_PERMISSIONS = permissions
logger.info("Granular permissions set: %s", permissions)
def get_permissions() -> Optional[Dict[str, str]]:
"""Return current permissions dict, or None if not using granular mode."""
return _PERMISSIONS
def is_permissions_mode() -> bool:
"""Check if granular permissions mode is active."""
return _PERMISSIONS is not None
def get_scopes_for_permission(service: str, level: str) -> List[str]:
"""
Get cumulative scopes for a service at a given permission level.
Returns all scopes up to and including the named level.
Raises ValueError if service or level is unknown.
"""
levels = SERVICE_PERMISSION_LEVELS.get(service)
if levels is None:
raise ValueError(f"Unknown service: '{service}'")
cumulative: List[str] = []
found = False
for level_name, level_scopes in levels:
cumulative.extend(level_scopes)
if level_name == level:
found = True
break
if not found:
valid = [name for name, _ in levels]
raise ValueError(
f"Unknown permission level '{level}' for service '{service}'. "
f"Valid levels: {valid}"
)
return sorted(set(cumulative))
def get_all_permission_scopes() -> List[str]:
"""
Get the combined scopes for all services at their configured permission levels.
Only meaningful when is_permissions_mode() is True.
"""
if _PERMISSIONS is None:
return []
all_scopes: set = set()
for service, level in _PERMISSIONS.items():
all_scopes.update(get_scopes_for_permission(service, level))
return list(all_scopes)
def get_allowed_scopes_set() -> Optional[set]:
"""
Get the set of allowed scopes under permissions mode (for tool filtering).
Returns None if permissions mode is not active.
"""
if _PERMISSIONS is None:
return None
return set(get_all_permission_scopes())
def get_valid_levels(service: str) -> List[str]:
"""Get valid permission level names for a service."""
levels = SERVICE_PERMISSION_LEVELS.get(service)
if levels is None:
return []
return [name for name, _ in levels]
def parse_permissions_arg(permissions_list: List[str]) -> Dict[str, str]:
"""
Parse --permissions arguments like ["gmail:organize", "drive:full"].
Returns dict mapping service -> level.
Raises ValueError on parse errors (unknown service, invalid level, bad format).
"""
result: Dict[str, str] = {}
for entry in permissions_list:
if ":" not in entry:
raise ValueError(
f"Invalid permission format: '{entry}'. "
f"Expected 'service:level' (e.g., 'gmail:organize', 'drive:readonly')"
)
service, level = entry.split(":", 1)
if service in result:
raise ValueError(f"Duplicate service in permissions: '{service}'")
if service not in SERVICE_PERMISSION_LEVELS:
raise ValueError(
f"Unknown service: '{service}'. "
f"Valid services: {sorted(SERVICE_PERMISSION_LEVELS.keys())}"
)
valid = get_valid_levels(service)
if level not in valid:
raise ValueError(
f"Unknown level '{level}' for service '{service}'. "
f"Valid levels: {valid}"
)
result[service] = level
return result

View File

@@ -291,6 +291,24 @@ def get_scopes_for_tools(enabled_tools=None):
Returns: Returns:
List of unique scopes for the enabled tools plus base scopes. List of unique scopes for the enabled tools plus base scopes.
""" """
# Granular permissions mode overrides both full and read-only scope maps.
# Lazy import with guard to avoid circular dependency during module init
# (SCOPES = get_scopes_for_tools() runs at import time before auth.permissions
# is fully loaded, but permissions mode is never active at that point).
try:
from auth.permissions import is_permissions_mode, get_all_permission_scopes
if is_permissions_mode():
scopes = BASE_SCOPES.copy()
scopes.extend(get_all_permission_scopes())
logger.debug(
"Generated scopes from granular permissions: %d unique scopes",
len(set(scopes)),
)
return list(set(scopes))
except ImportError:
pass
if enabled_tools is None: if enabled_tools is None:
# Default behavior - return all scopes # Default behavior - return all scopes
enabled_tools = TOOL_SCOPES_MAP.keys() enabled_tools = TOOL_SCOPES_MAP.keys()

View File

@@ -1,3 +1,4 @@
import hashlib
import logging import logging
import os import os
from typing import List, Optional from typing import List, Optional
@@ -6,6 +7,7 @@ from importlib import metadata
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from starlette.applications import Starlette from starlette.applications import Starlette
from starlette.requests import Request from starlette.requests import Request
from starlette.responses import Response
from starlette.middleware import Middleware from starlette.middleware import Middleware
from fastmcp import FastMCP from fastmcp import FastMCP
@@ -38,6 +40,34 @@ _legacy_callback_registered = False
session_middleware = Middleware(MCPSessionMiddleware) session_middleware = Middleware(MCPSessionMiddleware)
def _compute_scope_fingerprint() -> str:
"""Compute a short hash of the current scope configuration for cache-busting."""
scopes_str = ",".join(sorted(get_current_scopes()))
return hashlib.sha256(scopes_str.encode()).hexdigest()[:12]
def _wrap_well_known_endpoint(endpoint, etag: str):
"""Wrap a well-known metadata endpoint to prevent browser caching.
The MCP SDK hardcodes ``Cache-Control: public, max-age=3600`` on discovery
responses. When the server restarts with different ``--permissions`` or
``--read-only`` flags, browsers / MCP clients serve stale metadata that
advertises the wrong scopes, causing OAuth to silently fail.
The wrapper overrides the header to ``no-store`` and adds an ``ETag``
derived from the current scope set so intermediary caches that ignore
``no-store`` still see a fingerprint change.
"""
async def _no_cache_endpoint(request: Request) -> Response:
response = await endpoint(request)
response.headers["Cache-Control"] = "no-store, must-revalidate"
response.headers["ETag"] = etag
return response
return _no_cache_endpoint
# Custom FastMCP that adds secure middleware stack for OAuth 2.1 # Custom FastMCP that adds secure middleware stack for OAuth 2.1
class SecureFastMCP(FastMCP): class SecureFastMCP(FastMCP):
def http_app(self, **kwargs) -> "Starlette": def http_app(self, **kwargs) -> "Starlette":
@@ -387,14 +417,18 @@ def configure_server_for_http():
"OAuth 2.1 enabled using FastMCP GoogleProvider with protocol-level auth" "OAuth 2.1 enabled using FastMCP GoogleProvider with protocol-level auth"
) )
# Explicitly mount well-known routes from the OAuth provider # Mount well-known routes with cache-busting headers.
# These should be auto-mounted but we ensure they're available # The MCP SDK hardcodes Cache-Control: public, max-age=3600
# on discovery responses which causes stale-scope bugs when
# the server is restarted with a different --permissions config.
try: try:
scope_etag = f'"{_compute_scope_fingerprint()}"'
well_known_routes = provider.get_well_known_routes() well_known_routes = provider.get_well_known_routes()
for route in well_known_routes: for route in well_known_routes:
logger.info(f"Mounting OAuth well-known route: {route.path}") logger.info(f"Mounting OAuth well-known route: {route.path}")
wrapped = _wrap_well_known_endpoint(route.endpoint, scope_etag)
server.custom_route(route.path, methods=list(route.methods))( server.custom_route(route.path, methods=list(route.methods))(
route.endpoint wrapped
) )
except Exception as e: except Exception as e:
logger.warning(f"Could not mount well-known routes: {e}") logger.warning(f"Could not mount well-known routes: {e}")

View File

@@ -9,6 +9,7 @@ import logging
from typing import Set, Optional, Callable from typing import Set, Optional, Callable
from auth.oauth_config import is_oauth21_enabled from auth.oauth_config import is_oauth21_enabled
from auth.permissions import is_permissions_mode, get_allowed_scopes_set
from auth.scopes import is_read_only_mode, get_all_read_only_scopes from auth.scopes import is_read_only_mode, get_all_read_only_scopes
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -104,7 +105,13 @@ def filter_server_tools(server):
"""Remove disabled tools from the server after registration.""" """Remove disabled tools from the server after registration."""
enabled_tools = get_enabled_tools() enabled_tools = get_enabled_tools()
oauth21_enabled = is_oauth21_enabled() oauth21_enabled = is_oauth21_enabled()
if enabled_tools is None and not oauth21_enabled and not is_read_only_mode(): permissions_mode = is_permissions_mode()
if (
enabled_tools is None
and not oauth21_enabled
and not is_read_only_mode()
and not permissions_mode
):
return return
tools_removed = 0 tools_removed = 0
@@ -126,8 +133,8 @@ def filter_server_tools(server):
tools_to_remove.add("start_google_auth") tools_to_remove.add("start_google_auth")
logger.info("OAuth 2.1 enabled: disabling start_google_auth tool") logger.info("OAuth 2.1 enabled: disabling start_google_auth tool")
# 3. Read-only mode filtering # 3. Read-only mode filtering (skipped when granular permissions are active)
if read_only_mode: if read_only_mode and not permissions_mode:
for tool_name, tool_obj in tool_components.items(): for tool_name, tool_obj in tool_components.items():
if tool_name in tools_to_remove: if tool_name in tools_to_remove:
continue continue
@@ -147,6 +154,32 @@ def filter_server_tools(server):
) )
tools_to_remove.add(tool_name) tools_to_remove.add(tool_name)
# 4. Granular permissions filtering
# No scope hierarchy expansion here — permission levels are already cumulative
# and explicitly define allowed scopes. Hierarchy expansion would defeat the
# purpose (e.g. gmail.modify in the hierarchy covers gmail.send, but the
# "organize" permission level intentionally excludes gmail.send).
if permissions_mode:
perm_allowed = get_allowed_scopes_set() or set()
for tool_name, tool_obj in tool_components.items():
if tool_name in tools_to_remove:
continue
func_to_check = tool_obj
if hasattr(tool_obj, "fn"):
func_to_check = tool_obj.fn
required_scopes = getattr(func_to_check, "_required_google_scopes", [])
if required_scopes:
if not all(scope in perm_allowed for scope in required_scopes):
logger.info(
"Permissions mode: Disabling tool '%s' (requires: %s)",
tool_name,
required_scopes,
)
tools_to_remove.add(tool_name)
for tool_name in tools_to_remove: for tool_name in tools_to_remove:
try: try:
server.local_provider.remove_tool(tool_name) server.local_provider.remove_tool(tool_name)
@@ -167,7 +200,12 @@ def filter_server_tools(server):
if tools_removed > 0: if tools_removed > 0:
enabled_count = len(enabled_tools) if enabled_tools is not None else "all" enabled_count = len(enabled_tools) if enabled_tools is not None else "all"
mode = "Read-Only" if is_read_only_mode() else "Full" if permissions_mode:
mode = "Permissions"
elif is_read_only_mode():
mode = "Read-Only"
else:
mode = "Full"
logger.info( logger.info(
f"Tool filtering: removed {tools_removed} tools, {enabled_count} enabled. Mode: {mode}" f"Tool filtering: removed {tools_removed} tools, {enabled_count} enabled. Mode: {mode}"
) )

92
main.py
View File

@@ -91,6 +91,32 @@ def configure_safe_logging():
handler.setFormatter(safe_formatter) handler.setFormatter(safe_formatter)
def resolve_permissions_mode_selection(
permission_services: list[str], tool_tier: str | None
) -> tuple[list[str], set[str] | None]:
"""
Resolve service imports and optional tool-name filtering for --permissions mode.
When a tier is specified, both:
- imported services are narrowed to services with tier-matched tools
- registered tools are narrowed to the resolved tool names
"""
if tool_tier is None:
return permission_services, None
tier_tools, tier_services = resolve_tools_from_tier(tool_tier, permission_services)
return tier_services, set(tier_tools)
def narrow_permissions_to_services(
permissions: dict[str, str], services: list[str]
) -> dict[str, str]:
"""Restrict permission entries to the provided service list order."""
return {
service: permissions[service] for service in services if service in permissions
}
def main(): def main():
""" """
Main entry point for the Google Workspace MCP server. Main entry point for the Google Workspace MCP server.
@@ -155,6 +181,18 @@ def main():
action="store_true", action="store_true",
help="Run in read-only mode - requests only read-only scopes and disables tools requiring write permissions", help="Run in read-only mode - requests only read-only scopes and disables tools requiring write permissions",
) )
parser.add_argument(
"--permissions",
nargs="+",
metavar="SERVICE:LEVEL",
help=(
"Granular per-service permission levels. Format: service:level. "
"Example: --permissions gmail:organize drive:readonly. "
"Gmail levels: readonly, organize, drafts, send, full (cumulative). "
"Other services: readonly, full. "
"Mutually exclusive with --read-only and --tools."
),
)
args = parser.parse_args() args = parser.parse_args()
# Clean up CLI args - argparse.REMAINDER may include leading dashes from first arg # Clean up CLI args - argparse.REMAINDER may include leading dashes from first arg
@@ -162,6 +200,22 @@ def main():
# Filter out empty strings that might appear # Filter out empty strings that might appear
args.cli = [a for a in args.cli if a] args.cli = [a for a in args.cli if a]
# Validate mutually exclusive flags
if args.permissions and args.read_only:
print(
"Error: --permissions and --read-only are mutually exclusive. "
"Use service:readonly within --permissions instead.",
file=sys.stderr,
)
sys.exit(1)
if args.permissions and args.tools is not None:
print(
"Error: --permissions and --tools cannot be combined. "
"Select services via --permissions (optionally with --tool-tier).",
file=sys.stderr,
)
sys.exit(1)
# Set port and base URI once for reuse throughout the function # Set port and base URI once for reuse throughout the function
port = int(os.getenv("PORT", os.getenv("WORKSPACE_MCP_PORT", 8000))) port = int(os.getenv("PORT", os.getenv("WORKSPACE_MCP_PORT", 8000)))
base_uri = os.getenv("WORKSPACE_MCP_BASE_URI", "http://localhost") base_uri = os.getenv("WORKSPACE_MCP_BASE_URI", "http://localhost")
@@ -184,6 +238,8 @@ def main():
safe_print(f" 👤 Mode: {'Single-user' if args.single_user else 'Multi-user'}") safe_print(f" 👤 Mode: {'Single-user' if args.single_user else 'Multi-user'}")
if args.read_only: if args.read_only:
safe_print(" 🔒 Read-Only: Enabled") safe_print(" 🔒 Read-Only: Enabled")
if args.permissions:
safe_print(" 🔒 Permissions: Granular mode")
safe_print(f" 🐍 Python: {sys.version.split()[0]}") safe_print(f" 🐍 Python: {sys.version.split()[0]}")
safe_print("") safe_print("")
@@ -265,7 +321,36 @@ def main():
} }
# Determine which tools to import based on arguments # Determine which tools to import based on arguments
if args.tool_tier is not None: perms = None
if args.permissions:
# Granular permissions mode — parse and activate before tool selection
from auth.permissions import parse_permissions_arg, set_permissions
try:
perms = parse_permissions_arg(args.permissions)
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# Permissions implicitly defines which services to load
tools_to_import = list(perms.keys())
set_enabled_tool_names(None)
if args.tool_tier is not None:
# Combine with tier filtering within the permission-selected services
try:
tools_to_import, tier_tool_filter = resolve_permissions_mode_selection(
tools_to_import, args.tool_tier
)
set_enabled_tool_names(tier_tool_filter)
perms = narrow_permissions_to_services(perms, tools_to_import)
except Exception as e:
print(
f"Error loading tools for tier '{args.tool_tier}': {e}",
file=sys.stderr,
)
sys.exit(1)
set_permissions(perms)
elif args.tool_tier is not None:
# Use tier-based tool selection, optionally filtered by services # Use tier-based tool selection, optionally filtered by services
try: try:
tier_tools, suggested_services = resolve_tools_from_tier( tier_tools, suggested_services = resolve_tools_from_tier(
@@ -314,6 +399,11 @@ def main():
except ModuleNotFoundError as exc: except ModuleNotFoundError as exc:
logger.error("Failed to import tool '%s': %s", tool, exc, exc_info=True) logger.error("Failed to import tool '%s': %s", tool, exc, exc_info=True)
safe_print(f" ⚠️ Failed to load {tool.title()} tool module ({exc}).") safe_print(f" ⚠️ Failed to load {tool.title()} tool module ({exc}).")
if perms:
safe_print("🔒 Permission Levels:")
for svc, lvl in sorted(perms.items()):
safe_print(f" {tool_icons.get(svc, ' ')} {svc}: {lvl}")
safe_print("") safe_print("")
# Filter tools based on tier configuration (if tier-based loading is enabled) # Filter tools based on tier configuration (if tier-based loading is enabled)

View File

@@ -108,7 +108,7 @@ where = ["."]
exclude = ["tests*", "docs*", "build", "dist"] exclude = ["tests*", "docs*", "build", "dist"]
[tool.pytest.ini_options] [tool.pytest.ini_options]
collect_ignore_glob = ["**/manual_test.py"] addopts = "--ignore=tests/gappsscript/manual_test.py"
[tool.setuptools.package-data] [tool.setuptools.package-data]
core = ["tool_tiers.yaml"] core = ["tool_tiers.yaml"]

View File

@@ -43,7 +43,7 @@ def _make_attachment(
def _unwrap(tool): def _unwrap(tool):
"""Unwrap a FunctionTool + decorator chain to the original async function.""" """Unwrap a FunctionTool + decorator chain to the original async function."""
fn = tool.fn # FunctionTool stores the wrapped callable in .fn fn = getattr(tool, "fn", tool)
while hasattr(fn, "__wrapped__"): while hasattr(fn, "__wrapped__"):
fn = fn.__wrapped__ fn = fn.__wrapped__
return fn return fn

View File

@@ -0,0 +1,147 @@
"""
Unit tests for create_drive_folder tool.
"""
import os
import sys
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from gdrive.drive_tools import _create_drive_folder_impl as _raw_create_drive_folder
def _make_service(created_response):
"""Build a mock Drive service whose files().create().execute returns *created_response*."""
execute = MagicMock(return_value=created_response)
create = MagicMock()
create.return_value.execute = execute
files = MagicMock()
files.return_value.create = create
service = MagicMock()
service.files = files
return service
@pytest.mark.asyncio
async def test_create_folder_root_skips_resolve():
"""Parent 'root' should pass through resolve_folder_id and produce correct output."""
api_response = {
"id": "new-folder-id",
"name": "My Folder",
"webViewLink": "https://drive.google.com/drive/folders/new-folder-id",
}
service = _make_service(api_response)
with patch(
"gdrive.drive_tools.resolve_folder_id",
new_callable=AsyncMock,
return_value="root",
):
result = await _raw_create_drive_folder(
service,
user_google_email="user@example.com",
folder_name="My Folder",
parent_folder_id="root",
)
assert "new-folder-id" in result
assert "My Folder" in result
assert "https://drive.google.com/drive/folders/new-folder-id" in result
@pytest.mark.asyncio
async def test_create_folder_custom_parent_resolves():
"""A non-root parent_folder_id should go through resolve_folder_id."""
api_response = {
"id": "new-folder-id",
"name": "Sub Folder",
"webViewLink": "https://drive.google.com/drive/folders/new-folder-id",
}
service = _make_service(api_response)
with patch(
"gdrive.drive_tools.resolve_folder_id",
new_callable=AsyncMock,
return_value="resolved-parent-id",
) as mock_resolve:
result = await _raw_create_drive_folder(
service,
user_google_email="user@example.com",
folder_name="Sub Folder",
parent_folder_id="shortcut-id",
)
mock_resolve.assert_awaited_once_with(service, "shortcut-id")
# The output message uses the original parent_folder_id, not the resolved one
assert "shortcut-id" in result
# But the API call should use the resolved ID
service.files().create.assert_called_once_with(
body={
"name": "Sub Folder",
"mimeType": "application/vnd.google-apps.folder",
"parents": ["resolved-parent-id"],
},
fields="id, name, webViewLink",
supportsAllDrives=True,
)
@pytest.mark.asyncio
async def test_create_folder_passes_correct_metadata():
"""Verify the metadata dict sent to the Drive API is correct."""
api_response = {
"id": "abc123",
"name": "Test",
"webViewLink": "https://drive.google.com/drive/folders/abc123",
}
service = _make_service(api_response)
with patch(
"gdrive.drive_tools.resolve_folder_id",
new_callable=AsyncMock,
return_value="resolved-id",
):
await _raw_create_drive_folder(
service,
user_google_email="user@example.com",
folder_name="Test",
parent_folder_id="some-parent",
)
service.files().create.assert_called_once_with(
body={
"name": "Test",
"mimeType": "application/vnd.google-apps.folder",
"parents": ["resolved-id"],
},
fields="id, name, webViewLink",
supportsAllDrives=True,
)
@pytest.mark.asyncio
async def test_create_folder_missing_webviewlink():
"""When the API omits webViewLink, the result should have an empty link."""
api_response = {
"id": "abc123",
"name": "NoLink",
}
service = _make_service(api_response)
with patch(
"gdrive.drive_tools.resolve_folder_id",
new_callable=AsyncMock,
return_value="root",
):
result = await _raw_create_drive_folder(
service,
user_google_email="user@example.com",
folder_name="NoLink",
parent_folder_id="root",
)
assert "abc123" in result
assert "NoLink" in result

View File

@@ -0,0 +1,60 @@
import os
import sys
import pytest
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import main
def test_resolve_permissions_mode_selection_without_tier():
services = ["gmail", "drive"]
resolved_services, tier_tool_filter = main.resolve_permissions_mode_selection(
services, None
)
assert resolved_services == services
assert tier_tool_filter is None
def test_resolve_permissions_mode_selection_with_tier_filters_services(monkeypatch):
def fake_resolve_tools_from_tier(tier, services):
assert tier == "core"
assert services == ["gmail", "drive", "slides"]
return ["search_gmail_messages"], ["gmail"]
monkeypatch.setattr(main, "resolve_tools_from_tier", fake_resolve_tools_from_tier)
resolved_services, tier_tool_filter = main.resolve_permissions_mode_selection(
["gmail", "drive", "slides"], "core"
)
assert resolved_services == ["gmail"]
assert tier_tool_filter == {"search_gmail_messages"}
def test_narrow_permissions_to_services_keeps_selected_order():
permissions = {"drive": "full", "gmail": "readonly", "calendar": "readonly"}
narrowed = main.narrow_permissions_to_services(permissions, ["gmail", "drive"])
assert narrowed == {"gmail": "readonly", "drive": "full"}
def test_narrow_permissions_to_services_drops_non_selected_services():
permissions = {"gmail": "send", "drive": "full"}
narrowed = main.narrow_permissions_to_services(permissions, ["gmail"])
assert narrowed == {"gmail": "send"}
def test_permissions_and_tools_flags_are_rejected(monkeypatch, capsys):
monkeypatch.setattr(main, "configure_safe_logging", lambda: None)
monkeypatch.setattr(
sys,
"argv",
["main.py", "--permissions", "gmail:readonly", "--tools", "gmail"],
)
with pytest.raises(SystemExit) as exc:
main.main()
assert exc.value.code == 1
captured = capsys.readouterr()
assert "--permissions and --tools cannot be combined" in captured.err

118
tests/test_permissions.py Normal file
View File

@@ -0,0 +1,118 @@
"""
Unit tests for granular per-service permission parsing and scope resolution.
Covers parse_permissions_arg() validation (format, duplicates, unknown
service/level) and cumulative scope expansion in get_scopes_for_permission().
"""
import sys
import os
import pytest
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from auth.permissions import (
get_scopes_for_permission,
parse_permissions_arg,
SERVICE_PERMISSION_LEVELS,
)
from auth.scopes import (
GMAIL_READONLY_SCOPE,
GMAIL_LABELS_SCOPE,
GMAIL_MODIFY_SCOPE,
GMAIL_COMPOSE_SCOPE,
DRIVE_READONLY_SCOPE,
DRIVE_SCOPE,
DRIVE_FILE_SCOPE,
)
class TestParsePermissionsArg:
"""Tests for parse_permissions_arg()."""
def test_single_valid_entry(self):
result = parse_permissions_arg(["gmail:readonly"])
assert result == {"gmail": "readonly"}
def test_multiple_valid_entries(self):
result = parse_permissions_arg(["gmail:organize", "drive:full"])
assert result == {"gmail": "organize", "drive": "full"}
def test_all_services_at_readonly(self):
entries = [f"{svc}:readonly" for svc in SERVICE_PERMISSION_LEVELS]
result = parse_permissions_arg(entries)
assert set(result.keys()) == set(SERVICE_PERMISSION_LEVELS.keys())
def test_missing_colon_raises(self):
with pytest.raises(ValueError, match="Invalid permission format"):
parse_permissions_arg(["gmail_readonly"])
def test_duplicate_service_raises(self):
with pytest.raises(ValueError, match="Duplicate service"):
parse_permissions_arg(["gmail:readonly", "gmail:full"])
def test_unknown_service_raises(self):
with pytest.raises(ValueError, match="Unknown service"):
parse_permissions_arg(["fakesvc:readonly"])
def test_unknown_level_raises(self):
with pytest.raises(ValueError, match="Unknown level"):
parse_permissions_arg(["gmail:superadmin"])
def test_empty_list_returns_empty(self):
assert parse_permissions_arg([]) == {}
def test_extra_colon_in_value(self):
"""A level containing a colon should fail as unknown level."""
with pytest.raises(ValueError, match="Unknown level"):
parse_permissions_arg(["gmail:read:only"])
class TestGetScopesForPermission:
"""Tests for get_scopes_for_permission() cumulative scope expansion."""
def test_gmail_readonly_returns_readonly_scope(self):
scopes = get_scopes_for_permission("gmail", "readonly")
assert GMAIL_READONLY_SCOPE in scopes
def test_gmail_organize_includes_readonly(self):
"""Organize level should cumulatively include readonly scopes."""
scopes = get_scopes_for_permission("gmail", "organize")
assert GMAIL_READONLY_SCOPE in scopes
assert GMAIL_LABELS_SCOPE in scopes
assert GMAIL_MODIFY_SCOPE in scopes
def test_gmail_drafts_includes_organize_and_readonly(self):
scopes = get_scopes_for_permission("gmail", "drafts")
assert GMAIL_READONLY_SCOPE in scopes
assert GMAIL_LABELS_SCOPE in scopes
assert GMAIL_COMPOSE_SCOPE in scopes
def test_drive_readonly_excludes_full(self):
scopes = get_scopes_for_permission("drive", "readonly")
assert DRIVE_READONLY_SCOPE in scopes
assert DRIVE_SCOPE not in scopes
assert DRIVE_FILE_SCOPE not in scopes
def test_drive_full_includes_readonly(self):
scopes = get_scopes_for_permission("drive", "full")
assert DRIVE_READONLY_SCOPE in scopes
assert DRIVE_SCOPE in scopes
def test_unknown_service_raises(self):
with pytest.raises(ValueError, match="Unknown service"):
get_scopes_for_permission("nonexistent", "readonly")
def test_unknown_level_raises(self):
with pytest.raises(ValueError, match="Unknown permission level"):
get_scopes_for_permission("gmail", "nonexistent")
def test_no_duplicate_scopes(self):
"""Cumulative expansion should deduplicate scopes."""
for service, levels in SERVICE_PERMISSION_LEVELS.items():
for level_name, _ in levels:
scopes = get_scopes_for_permission(service, level_name)
assert len(scopes) == len(set(scopes)), (
f"Duplicate scopes for {service}:{level_name}"
)

View File

@@ -12,6 +12,7 @@ import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from auth.scopes import ( from auth.scopes import (
BASE_SCOPES,
CALENDAR_READONLY_SCOPE, CALENDAR_READONLY_SCOPE,
CALENDAR_SCOPE, CALENDAR_SCOPE,
CONTACTS_READONLY_SCOPE, CONTACTS_READONLY_SCOPE,
@@ -31,6 +32,8 @@ from auth.scopes import (
has_required_scopes, has_required_scopes,
set_read_only, set_read_only,
) )
from auth.permissions import get_scopes_for_permission, set_permissions
import auth.permissions as permissions_module
class TestDocsScopes: class TestDocsScopes:
@@ -195,3 +198,34 @@ class TestHasRequiredScopes:
available = [GMAIL_MODIFY_SCOPE] available = [GMAIL_MODIFY_SCOPE]
required = [GMAIL_READONLY_SCOPE, DRIVE_READONLY_SCOPE] required = [GMAIL_READONLY_SCOPE, DRIVE_READONLY_SCOPE]
assert not has_required_scopes(available, required) assert not has_required_scopes(available, required)
class TestGranularPermissionsScopes:
"""Tests for granular permissions scope generation path."""
def setup_method(self):
set_read_only(False)
permissions_module._PERMISSIONS = None
def teardown_method(self):
set_read_only(False)
permissions_module._PERMISSIONS = None
def test_permissions_mode_returns_base_plus_permission_scopes(self):
set_permissions({"gmail": "send", "drive": "readonly"})
scopes = get_scopes_for_tools(["calendar"]) # ignored in permissions mode
expected = set(BASE_SCOPES)
expected.update(get_scopes_for_permission("gmail", "send"))
expected.update(get_scopes_for_permission("drive", "readonly"))
assert set(scopes) == expected
def test_permissions_mode_overrides_read_only_and_full_maps(self):
set_read_only(True)
without_permissions = get_scopes_for_tools(["drive"])
assert DRIVE_READONLY_SCOPE in without_permissions
set_permissions({"gmail": "readonly"})
with_permissions = get_scopes_for_tools(["drive"])
assert GMAIL_READONLY_SCOPE in with_permissions
assert DRIVE_READONLY_SCOPE not in with_permissions