Merge branch 'main' of https://github.com/taylorwilsdon/google_workspace_mcp into drive_files_pagination_fix
This commit is contained in:
69
tests/core/test_attachment_route.py
Normal file
69
tests/core/test_attachment_route.py
Normal file
@@ -0,0 +1,69 @@
|
||||
import pytest
|
||||
from starlette.requests import Request
|
||||
from starlette.responses import FileResponse, JSONResponse
|
||||
|
||||
from core.server import serve_attachment
|
||||
|
||||
|
||||
def _build_request(file_id: str) -> Request:
|
||||
scope = {
|
||||
"type": "http",
|
||||
"asgi": {"version": "3.0"},
|
||||
"http_version": "1.1",
|
||||
"method": "GET",
|
||||
"scheme": "http",
|
||||
"path": f"/attachments/{file_id}",
|
||||
"raw_path": f"/attachments/{file_id}".encode(),
|
||||
"query_string": b"",
|
||||
"headers": [],
|
||||
"client": ("127.0.0.1", 12345),
|
||||
"server": ("localhost", 8000),
|
||||
"path_params": {"file_id": file_id},
|
||||
}
|
||||
|
||||
async def receive():
|
||||
return {"type": "http.request", "body": b"", "more_body": False}
|
||||
|
||||
return Request(scope, receive)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_serve_attachment_uses_path_param_file_id(monkeypatch, tmp_path):
|
||||
file_path = tmp_path / "sample.pdf"
|
||||
file_path.write_bytes(b"%PDF-1.3\n")
|
||||
captured = {}
|
||||
|
||||
class DummyStorage:
|
||||
def get_attachment_metadata(self, file_id):
|
||||
captured["file_id"] = file_id
|
||||
return {"filename": "sample.pdf", "mime_type": "application/pdf"}
|
||||
|
||||
def get_attachment_path(self, _file_id):
|
||||
return file_path
|
||||
|
||||
monkeypatch.setattr(
|
||||
"core.attachment_storage.get_attachment_storage", lambda: DummyStorage()
|
||||
)
|
||||
|
||||
response = await serve_attachment(_build_request("abc123"))
|
||||
|
||||
assert captured["file_id"] == "abc123"
|
||||
assert isinstance(response, FileResponse)
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_serve_attachment_404_when_metadata_missing(monkeypatch):
|
||||
class DummyStorage:
|
||||
def get_attachment_metadata(self, _file_id):
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(
|
||||
"core.attachment_storage.get_attachment_storage", lambda: DummyStorage()
|
||||
)
|
||||
|
||||
response = await serve_attachment(_build_request("missing"))
|
||||
|
||||
assert isinstance(response, JSONResponse)
|
||||
assert response.status_code == 404
|
||||
assert b"Attachment not found or expired" in response.body
|
||||
@@ -43,7 +43,7 @@ def _make_attachment(
|
||||
|
||||
def _unwrap(tool):
|
||||
"""Unwrap a FunctionTool + decorator chain to the original async function."""
|
||||
fn = tool.fn # FunctionTool stores the wrapped callable in .fn
|
||||
fn = getattr(tool, "fn", tool)
|
||||
while hasattr(fn, "__wrapped__"):
|
||||
fn = fn.__wrapped__
|
||||
return fn
|
||||
|
||||
147
tests/gdrive/test_create_drive_folder.py
Normal file
147
tests/gdrive/test_create_drive_folder.py
Normal file
@@ -0,0 +1,147 @@
|
||||
"""
|
||||
Unit tests for create_drive_folder tool.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
|
||||
|
||||
from gdrive.drive_tools import _create_drive_folder_impl as _raw_create_drive_folder
|
||||
|
||||
|
||||
def _make_service(created_response):
|
||||
"""Build a mock Drive service whose files().create().execute returns *created_response*."""
|
||||
execute = MagicMock(return_value=created_response)
|
||||
create = MagicMock()
|
||||
create.return_value.execute = execute
|
||||
files = MagicMock()
|
||||
files.return_value.create = create
|
||||
service = MagicMock()
|
||||
service.files = files
|
||||
return service
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_folder_root_skips_resolve():
|
||||
"""Parent 'root' should pass through resolve_folder_id and produce correct output."""
|
||||
api_response = {
|
||||
"id": "new-folder-id",
|
||||
"name": "My Folder",
|
||||
"webViewLink": "https://drive.google.com/drive/folders/new-folder-id",
|
||||
}
|
||||
service = _make_service(api_response)
|
||||
|
||||
with patch(
|
||||
"gdrive.drive_tools.resolve_folder_id",
|
||||
new_callable=AsyncMock,
|
||||
return_value="root",
|
||||
):
|
||||
result = await _raw_create_drive_folder(
|
||||
service,
|
||||
user_google_email="user@example.com",
|
||||
folder_name="My Folder",
|
||||
parent_folder_id="root",
|
||||
)
|
||||
|
||||
assert "new-folder-id" in result
|
||||
assert "My Folder" in result
|
||||
assert "https://drive.google.com/drive/folders/new-folder-id" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_folder_custom_parent_resolves():
|
||||
"""A non-root parent_folder_id should go through resolve_folder_id."""
|
||||
api_response = {
|
||||
"id": "new-folder-id",
|
||||
"name": "Sub Folder",
|
||||
"webViewLink": "https://drive.google.com/drive/folders/new-folder-id",
|
||||
}
|
||||
service = _make_service(api_response)
|
||||
|
||||
with patch(
|
||||
"gdrive.drive_tools.resolve_folder_id",
|
||||
new_callable=AsyncMock,
|
||||
return_value="resolved-parent-id",
|
||||
) as mock_resolve:
|
||||
result = await _raw_create_drive_folder(
|
||||
service,
|
||||
user_google_email="user@example.com",
|
||||
folder_name="Sub Folder",
|
||||
parent_folder_id="shortcut-id",
|
||||
)
|
||||
|
||||
mock_resolve.assert_awaited_once_with(service, "shortcut-id")
|
||||
# The output message uses the original parent_folder_id, not the resolved one
|
||||
assert "shortcut-id" in result
|
||||
# But the API call should use the resolved ID
|
||||
service.files().create.assert_called_once_with(
|
||||
body={
|
||||
"name": "Sub Folder",
|
||||
"mimeType": "application/vnd.google-apps.folder",
|
||||
"parents": ["resolved-parent-id"],
|
||||
},
|
||||
fields="id, name, webViewLink",
|
||||
supportsAllDrives=True,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_folder_passes_correct_metadata():
|
||||
"""Verify the metadata dict sent to the Drive API is correct."""
|
||||
api_response = {
|
||||
"id": "abc123",
|
||||
"name": "Test",
|
||||
"webViewLink": "https://drive.google.com/drive/folders/abc123",
|
||||
}
|
||||
service = _make_service(api_response)
|
||||
|
||||
with patch(
|
||||
"gdrive.drive_tools.resolve_folder_id",
|
||||
new_callable=AsyncMock,
|
||||
return_value="resolved-id",
|
||||
):
|
||||
await _raw_create_drive_folder(
|
||||
service,
|
||||
user_google_email="user@example.com",
|
||||
folder_name="Test",
|
||||
parent_folder_id="some-parent",
|
||||
)
|
||||
|
||||
service.files().create.assert_called_once_with(
|
||||
body={
|
||||
"name": "Test",
|
||||
"mimeType": "application/vnd.google-apps.folder",
|
||||
"parents": ["resolved-id"],
|
||||
},
|
||||
fields="id, name, webViewLink",
|
||||
supportsAllDrives=True,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_folder_missing_webviewlink():
|
||||
"""When the API omits webViewLink, the result should have an empty link."""
|
||||
api_response = {
|
||||
"id": "abc123",
|
||||
"name": "NoLink",
|
||||
}
|
||||
service = _make_service(api_response)
|
||||
|
||||
with patch(
|
||||
"gdrive.drive_tools.resolve_folder_id",
|
||||
new_callable=AsyncMock,
|
||||
return_value="root",
|
||||
):
|
||||
result = await _raw_create_drive_folder(
|
||||
service,
|
||||
user_google_email="user@example.com",
|
||||
folder_name="NoLink",
|
||||
parent_folder_id="root",
|
||||
)
|
||||
|
||||
assert "abc123" in result
|
||||
assert "NoLink" in result
|
||||
60
tests/test_main_permissions_tier.py
Normal file
60
tests/test_main_permissions_tier.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
|
||||
import main
|
||||
|
||||
|
||||
def test_resolve_permissions_mode_selection_without_tier():
|
||||
services = ["gmail", "drive"]
|
||||
resolved_services, tier_tool_filter = main.resolve_permissions_mode_selection(
|
||||
services, None
|
||||
)
|
||||
assert resolved_services == services
|
||||
assert tier_tool_filter is None
|
||||
|
||||
|
||||
def test_resolve_permissions_mode_selection_with_tier_filters_services(monkeypatch):
|
||||
def fake_resolve_tools_from_tier(tier, services):
|
||||
assert tier == "core"
|
||||
assert services == ["gmail", "drive", "slides"]
|
||||
return ["search_gmail_messages"], ["gmail"]
|
||||
|
||||
monkeypatch.setattr(main, "resolve_tools_from_tier", fake_resolve_tools_from_tier)
|
||||
|
||||
resolved_services, tier_tool_filter = main.resolve_permissions_mode_selection(
|
||||
["gmail", "drive", "slides"], "core"
|
||||
)
|
||||
assert resolved_services == ["gmail"]
|
||||
assert tier_tool_filter == {"search_gmail_messages"}
|
||||
|
||||
|
||||
def test_narrow_permissions_to_services_keeps_selected_order():
|
||||
permissions = {"drive": "full", "gmail": "readonly", "calendar": "readonly"}
|
||||
narrowed = main.narrow_permissions_to_services(permissions, ["gmail", "drive"])
|
||||
assert narrowed == {"gmail": "readonly", "drive": "full"}
|
||||
|
||||
|
||||
def test_narrow_permissions_to_services_drops_non_selected_services():
|
||||
permissions = {"gmail": "send", "drive": "full"}
|
||||
narrowed = main.narrow_permissions_to_services(permissions, ["gmail"])
|
||||
assert narrowed == {"gmail": "send"}
|
||||
|
||||
|
||||
def test_permissions_and_tools_flags_are_rejected(monkeypatch, capsys):
|
||||
monkeypatch.setattr(main, "configure_safe_logging", lambda: None)
|
||||
monkeypatch.setattr(
|
||||
sys,
|
||||
"argv",
|
||||
["main.py", "--permissions", "gmail:readonly", "--tools", "gmail"],
|
||||
)
|
||||
|
||||
with pytest.raises(SystemExit) as exc:
|
||||
main.main()
|
||||
|
||||
assert exc.value.code == 1
|
||||
captured = capsys.readouterr()
|
||||
assert "--permissions and --tools cannot be combined" in captured.err
|
||||
118
tests/test_permissions.py
Normal file
118
tests/test_permissions.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""
|
||||
Unit tests for granular per-service permission parsing and scope resolution.
|
||||
|
||||
Covers parse_permissions_arg() validation (format, duplicates, unknown
|
||||
service/level) and cumulative scope expansion in get_scopes_for_permission().
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
|
||||
from auth.permissions import (
|
||||
get_scopes_for_permission,
|
||||
parse_permissions_arg,
|
||||
SERVICE_PERMISSION_LEVELS,
|
||||
)
|
||||
from auth.scopes import (
|
||||
GMAIL_READONLY_SCOPE,
|
||||
GMAIL_LABELS_SCOPE,
|
||||
GMAIL_MODIFY_SCOPE,
|
||||
GMAIL_COMPOSE_SCOPE,
|
||||
DRIVE_READONLY_SCOPE,
|
||||
DRIVE_SCOPE,
|
||||
DRIVE_FILE_SCOPE,
|
||||
)
|
||||
|
||||
|
||||
class TestParsePermissionsArg:
|
||||
"""Tests for parse_permissions_arg()."""
|
||||
|
||||
def test_single_valid_entry(self):
|
||||
result = parse_permissions_arg(["gmail:readonly"])
|
||||
assert result == {"gmail": "readonly"}
|
||||
|
||||
def test_multiple_valid_entries(self):
|
||||
result = parse_permissions_arg(["gmail:organize", "drive:full"])
|
||||
assert result == {"gmail": "organize", "drive": "full"}
|
||||
|
||||
def test_all_services_at_readonly(self):
|
||||
entries = [f"{svc}:readonly" for svc in SERVICE_PERMISSION_LEVELS]
|
||||
result = parse_permissions_arg(entries)
|
||||
assert set(result.keys()) == set(SERVICE_PERMISSION_LEVELS.keys())
|
||||
|
||||
def test_missing_colon_raises(self):
|
||||
with pytest.raises(ValueError, match="Invalid permission format"):
|
||||
parse_permissions_arg(["gmail_readonly"])
|
||||
|
||||
def test_duplicate_service_raises(self):
|
||||
with pytest.raises(ValueError, match="Duplicate service"):
|
||||
parse_permissions_arg(["gmail:readonly", "gmail:full"])
|
||||
|
||||
def test_unknown_service_raises(self):
|
||||
with pytest.raises(ValueError, match="Unknown service"):
|
||||
parse_permissions_arg(["fakesvc:readonly"])
|
||||
|
||||
def test_unknown_level_raises(self):
|
||||
with pytest.raises(ValueError, match="Unknown level"):
|
||||
parse_permissions_arg(["gmail:superadmin"])
|
||||
|
||||
def test_empty_list_returns_empty(self):
|
||||
assert parse_permissions_arg([]) == {}
|
||||
|
||||
def test_extra_colon_in_value(self):
|
||||
"""A level containing a colon should fail as unknown level."""
|
||||
with pytest.raises(ValueError, match="Unknown level"):
|
||||
parse_permissions_arg(["gmail:read:only"])
|
||||
|
||||
|
||||
class TestGetScopesForPermission:
|
||||
"""Tests for get_scopes_for_permission() cumulative scope expansion."""
|
||||
|
||||
def test_gmail_readonly_returns_readonly_scope(self):
|
||||
scopes = get_scopes_for_permission("gmail", "readonly")
|
||||
assert GMAIL_READONLY_SCOPE in scopes
|
||||
|
||||
def test_gmail_organize_includes_readonly(self):
|
||||
"""Organize level should cumulatively include readonly scopes."""
|
||||
scopes = get_scopes_for_permission("gmail", "organize")
|
||||
assert GMAIL_READONLY_SCOPE in scopes
|
||||
assert GMAIL_LABELS_SCOPE in scopes
|
||||
assert GMAIL_MODIFY_SCOPE in scopes
|
||||
|
||||
def test_gmail_drafts_includes_organize_and_readonly(self):
|
||||
scopes = get_scopes_for_permission("gmail", "drafts")
|
||||
assert GMAIL_READONLY_SCOPE in scopes
|
||||
assert GMAIL_LABELS_SCOPE in scopes
|
||||
assert GMAIL_COMPOSE_SCOPE in scopes
|
||||
|
||||
def test_drive_readonly_excludes_full(self):
|
||||
scopes = get_scopes_for_permission("drive", "readonly")
|
||||
assert DRIVE_READONLY_SCOPE in scopes
|
||||
assert DRIVE_SCOPE not in scopes
|
||||
assert DRIVE_FILE_SCOPE not in scopes
|
||||
|
||||
def test_drive_full_includes_readonly(self):
|
||||
scopes = get_scopes_for_permission("drive", "full")
|
||||
assert DRIVE_READONLY_SCOPE in scopes
|
||||
assert DRIVE_SCOPE in scopes
|
||||
|
||||
def test_unknown_service_raises(self):
|
||||
with pytest.raises(ValueError, match="Unknown service"):
|
||||
get_scopes_for_permission("nonexistent", "readonly")
|
||||
|
||||
def test_unknown_level_raises(self):
|
||||
with pytest.raises(ValueError, match="Unknown permission level"):
|
||||
get_scopes_for_permission("gmail", "nonexistent")
|
||||
|
||||
def test_no_duplicate_scopes(self):
|
||||
"""Cumulative expansion should deduplicate scopes."""
|
||||
for service, levels in SERVICE_PERMISSION_LEVELS.items():
|
||||
for level_name, _ in levels:
|
||||
scopes = get_scopes_for_permission(service, level_name)
|
||||
assert len(scopes) == len(set(scopes)), (
|
||||
f"Duplicate scopes for {service}:{level_name}"
|
||||
)
|
||||
@@ -12,6 +12,7 @@ import os
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
|
||||
from auth.scopes import (
|
||||
BASE_SCOPES,
|
||||
CALENDAR_READONLY_SCOPE,
|
||||
CALENDAR_SCOPE,
|
||||
CONTACTS_READONLY_SCOPE,
|
||||
@@ -31,6 +32,8 @@ from auth.scopes import (
|
||||
has_required_scopes,
|
||||
set_read_only,
|
||||
)
|
||||
from auth.permissions import get_scopes_for_permission, set_permissions
|
||||
import auth.permissions as permissions_module
|
||||
|
||||
|
||||
class TestDocsScopes:
|
||||
@@ -195,3 +198,34 @@ class TestHasRequiredScopes:
|
||||
available = [GMAIL_MODIFY_SCOPE]
|
||||
required = [GMAIL_READONLY_SCOPE, DRIVE_READONLY_SCOPE]
|
||||
assert not has_required_scopes(available, required)
|
||||
|
||||
|
||||
class TestGranularPermissionsScopes:
|
||||
"""Tests for granular permissions scope generation path."""
|
||||
|
||||
def setup_method(self):
|
||||
set_read_only(False)
|
||||
permissions_module._PERMISSIONS = None
|
||||
|
||||
def teardown_method(self):
|
||||
set_read_only(False)
|
||||
permissions_module._PERMISSIONS = None
|
||||
|
||||
def test_permissions_mode_returns_base_plus_permission_scopes(self):
|
||||
set_permissions({"gmail": "send", "drive": "readonly"})
|
||||
scopes = get_scopes_for_tools(["calendar"]) # ignored in permissions mode
|
||||
|
||||
expected = set(BASE_SCOPES)
|
||||
expected.update(get_scopes_for_permission("gmail", "send"))
|
||||
expected.update(get_scopes_for_permission("drive", "readonly"))
|
||||
assert set(scopes) == expected
|
||||
|
||||
def test_permissions_mode_overrides_read_only_and_full_maps(self):
|
||||
set_read_only(True)
|
||||
without_permissions = get_scopes_for_tools(["drive"])
|
||||
assert DRIVE_READONLY_SCOPE in without_permissions
|
||||
|
||||
set_permissions({"gmail": "readonly"})
|
||||
with_permissions = get_scopes_for_tools(["drive"])
|
||||
assert GMAIL_READONLY_SCOPE in with_permissions
|
||||
assert DRIVE_READONLY_SCOPE not in with_permissions
|
||||
|
||||
Reference in New Issue
Block a user