merge conflicts

This commit is contained in:
Taylor Wilsdon
2026-02-28 17:53:39 -04:00
20 changed files with 1591 additions and 127 deletions

View File

@@ -0,0 +1,69 @@
import pytest
from starlette.requests import Request
from starlette.responses import FileResponse, JSONResponse
from core.server import serve_attachment
def _build_request(file_id: str) -> Request:
scope = {
"type": "http",
"asgi": {"version": "3.0"},
"http_version": "1.1",
"method": "GET",
"scheme": "http",
"path": f"/attachments/{file_id}",
"raw_path": f"/attachments/{file_id}".encode(),
"query_string": b"",
"headers": [],
"client": ("127.0.0.1", 12345),
"server": ("localhost", 8000),
"path_params": {"file_id": file_id},
}
async def receive():
return {"type": "http.request", "body": b"", "more_body": False}
return Request(scope, receive)
@pytest.mark.asyncio
async def test_serve_attachment_uses_path_param_file_id(monkeypatch, tmp_path):
file_path = tmp_path / "sample.pdf"
file_path.write_bytes(b"%PDF-1.3\n")
captured = {}
class DummyStorage:
def get_attachment_metadata(self, file_id):
captured["file_id"] = file_id
return {"filename": "sample.pdf", "mime_type": "application/pdf"}
def get_attachment_path(self, _file_id):
return file_path
monkeypatch.setattr(
"core.attachment_storage.get_attachment_storage", lambda: DummyStorage()
)
response = await serve_attachment(_build_request("abc123"))
assert captured["file_id"] == "abc123"
assert isinstance(response, FileResponse)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_serve_attachment_404_when_metadata_missing(monkeypatch):
class DummyStorage:
def get_attachment_metadata(self, _file_id):
return None
monkeypatch.setattr(
"core.attachment_storage.get_attachment_storage", lambda: DummyStorage()
)
response = await serve_attachment(_build_request("missing"))
assert isinstance(response, JSONResponse)
assert response.status_code == 404
assert b"Attachment not found or expired" in response.body

View File

@@ -43,7 +43,7 @@ def _make_attachment(
def _unwrap(tool):
"""Unwrap a FunctionTool + decorator chain to the original async function."""
fn = tool.fn # FunctionTool stores the wrapped callable in .fn
fn = getattr(tool, "fn", tool)
while hasattr(fn, "__wrapped__"):
fn = fn.__wrapped__
return fn

View File

@@ -0,0 +1,147 @@
"""
Unit tests for create_drive_folder tool.
"""
import os
import sys
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from gdrive.drive_tools import _create_drive_folder_impl as _raw_create_drive_folder
def _make_service(created_response):
"""Build a mock Drive service whose files().create().execute returns *created_response*."""
execute = MagicMock(return_value=created_response)
create = MagicMock()
create.return_value.execute = execute
files = MagicMock()
files.return_value.create = create
service = MagicMock()
service.files = files
return service
@pytest.mark.asyncio
async def test_create_folder_root_skips_resolve():
"""Parent 'root' should pass through resolve_folder_id and produce correct output."""
api_response = {
"id": "new-folder-id",
"name": "My Folder",
"webViewLink": "https://drive.google.com/drive/folders/new-folder-id",
}
service = _make_service(api_response)
with patch(
"gdrive.drive_tools.resolve_folder_id",
new_callable=AsyncMock,
return_value="root",
):
result = await _raw_create_drive_folder(
service,
user_google_email="user@example.com",
folder_name="My Folder",
parent_folder_id="root",
)
assert "new-folder-id" in result
assert "My Folder" in result
assert "https://drive.google.com/drive/folders/new-folder-id" in result
@pytest.mark.asyncio
async def test_create_folder_custom_parent_resolves():
"""A non-root parent_folder_id should go through resolve_folder_id."""
api_response = {
"id": "new-folder-id",
"name": "Sub Folder",
"webViewLink": "https://drive.google.com/drive/folders/new-folder-id",
}
service = _make_service(api_response)
with patch(
"gdrive.drive_tools.resolve_folder_id",
new_callable=AsyncMock,
return_value="resolved-parent-id",
) as mock_resolve:
result = await _raw_create_drive_folder(
service,
user_google_email="user@example.com",
folder_name="Sub Folder",
parent_folder_id="shortcut-id",
)
mock_resolve.assert_awaited_once_with(service, "shortcut-id")
# The output message uses the original parent_folder_id, not the resolved one
assert "shortcut-id" in result
# But the API call should use the resolved ID
service.files().create.assert_called_once_with(
body={
"name": "Sub Folder",
"mimeType": "application/vnd.google-apps.folder",
"parents": ["resolved-parent-id"],
},
fields="id, name, webViewLink",
supportsAllDrives=True,
)
@pytest.mark.asyncio
async def test_create_folder_passes_correct_metadata():
"""Verify the metadata dict sent to the Drive API is correct."""
api_response = {
"id": "abc123",
"name": "Test",
"webViewLink": "https://drive.google.com/drive/folders/abc123",
}
service = _make_service(api_response)
with patch(
"gdrive.drive_tools.resolve_folder_id",
new_callable=AsyncMock,
return_value="resolved-id",
):
await _raw_create_drive_folder(
service,
user_google_email="user@example.com",
folder_name="Test",
parent_folder_id="some-parent",
)
service.files().create.assert_called_once_with(
body={
"name": "Test",
"mimeType": "application/vnd.google-apps.folder",
"parents": ["resolved-id"],
},
fields="id, name, webViewLink",
supportsAllDrives=True,
)
@pytest.mark.asyncio
async def test_create_folder_missing_webviewlink():
"""When the API omits webViewLink, the result should have an empty link."""
api_response = {
"id": "abc123",
"name": "NoLink",
}
service = _make_service(api_response)
with patch(
"gdrive.drive_tools.resolve_folder_id",
new_callable=AsyncMock,
return_value="root",
):
result = await _raw_create_drive_folder(
service,
user_google_email="user@example.com",
folder_name="NoLink",
parent_folder_id="root",
)
assert "abc123" in result
assert "NoLink" in result

View File

@@ -1,8 +1,9 @@
"""
Unit tests for Google Drive MCP tools.
Tests create_drive_folder with mocked API responses,
and the list_drive_items and search_drive_files tools
Tests create_drive_folder with mocked API responses, plus coverage for
`search_drive_files` and `list_drive_items` pagination, `detailed` output,
and `file_type` filtering behaviors.
"""
import pytest
@@ -12,6 +13,221 @@ import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from gdrive.drive_helpers import build_drive_list_params
from gdrive.drive_tools import list_drive_items, search_drive_files
def _unwrap(tool):
"""Unwrap a FunctionTool + decorator chain to the original async function.
Handles both older FastMCP (FunctionTool with .fn) and newer FastMCP
(server.tool() returns the function directly).
"""
fn = tool.fn if hasattr(tool, "fn") else tool
while hasattr(fn, "__wrapped__"):
fn = fn.__wrapped__
return fn
# ---------------------------------------------------------------------------
# search_drive_files — page_token
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_search_drive_files_page_token_passed_to_api():
"""page_token is forwarded to the Drive API as pageToken."""
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
{
"id": "f1",
"name": "Report.pdf",
"mimeType": "application/pdf",
"webViewLink": "https://drive.google.com/file/f1",
"modifiedTime": "2024-01-01T00:00:00Z",
}
]
}
await _unwrap(search_drive_files)(
service=mock_service,
user_google_email="user@example.com",
query="budget",
page_token="tok_abc123",
)
call_kwargs = mock_service.files.return_value.list.call_args.kwargs
assert call_kwargs.get("pageToken") == "tok_abc123"
@pytest.mark.asyncio
async def test_search_drive_files_next_page_token_in_output():
"""nextPageToken from the API response is appended at the end of the output."""
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
{
"id": "f2",
"name": "Notes.docx",
"mimeType": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"webViewLink": "https://drive.google.com/file/f2",
"modifiedTime": "2024-02-01T00:00:00Z",
}
],
"nextPageToken": "next_tok_xyz",
}
result = await _unwrap(search_drive_files)(
service=mock_service,
user_google_email="user@example.com",
query="notes",
)
assert result.endswith("nextPageToken: next_tok_xyz")
@pytest.mark.asyncio
async def test_search_drive_files_no_next_page_token_when_absent():
"""nextPageToken does not appear in output when the API has no more pages."""
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
{
"id": "f3",
"name": "Summary.txt",
"mimeType": "text/plain",
"webViewLink": "https://drive.google.com/file/f3",
"modifiedTime": "2024-03-01T00:00:00Z",
}
]
# no nextPageToken key
}
result = await _unwrap(search_drive_files)(
service=mock_service,
user_google_email="user@example.com",
query="summary",
)
assert "nextPageToken" not in result
# ---------------------------------------------------------------------------
# list_drive_items — page_token
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_drive_items_page_token_passed_to_api(mock_resolve_folder):
"""page_token is forwarded to the Drive API as pageToken."""
mock_resolve_folder.return_value = "root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
{
"id": "folder1",
"name": "Archive",
"mimeType": "application/vnd.google-apps.folder",
"webViewLink": "https://drive.google.com/drive/folders/folder1",
"modifiedTime": "2024-01-15T00:00:00Z",
}
]
}
await _unwrap(list_drive_items)(
service=mock_service,
user_google_email="user@example.com",
page_token="tok_page2",
)
call_kwargs = mock_service.files.return_value.list.call_args.kwargs
assert call_kwargs.get("pageToken") == "tok_page2"
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_drive_items_next_page_token_in_output(mock_resolve_folder):
"""nextPageToken from the API response is appended at the end of the output."""
mock_resolve_folder.return_value = "root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
{
"id": "file99",
"name": "data.csv",
"mimeType": "text/csv",
"webViewLink": "https://drive.google.com/file/file99",
"modifiedTime": "2024-04-01T00:00:00Z",
}
],
"nextPageToken": "next_list_tok",
}
result = await _unwrap(list_drive_items)(
service=mock_service,
user_google_email="user@example.com",
)
assert result.endswith("nextPageToken: next_list_tok")
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_drive_items_no_next_page_token_when_absent(mock_resolve_folder):
"""nextPageToken does not appear in output when the API has no more pages."""
mock_resolve_folder.return_value = "root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
{
"id": "file100",
"name": "readme.txt",
"mimeType": "text/plain",
"webViewLink": "https://drive.google.com/file/file100",
"modifiedTime": "2024-05-01T00:00:00Z",
}
]
# no nextPageToken key
}
result = await _unwrap(list_drive_items)(
service=mock_service,
user_google_email="user@example.com",
)
assert "nextPageToken" not in result
# Helpers
# ---------------------------------------------------------------------------
def _make_file(
file_id: str,
name: str,
mime_type: str,
link: str = "http://link",
modified: str = "2024-01-01T00:00:00Z",
size: str | None = None,
) -> dict:
item = {
"id": file_id,
"name": name,
"mimeType": mime_type,
"webViewLink": link,
"modifiedTime": modified,
}
if size is not None:
item["size"] = size
return item
# ---------------------------------------------------------------------------
# create_drive_folder
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_create_drive_folder():
@@ -47,40 +263,324 @@ async def test_create_drive_folder():
assert "https://drive.google.com/drive/folders/folder123" in result
# ---------------------------------------------------------------------------
# build_drive_list_params — detailed flag (pure unit tests, no I/O)
# ---------------------------------------------------------------------------
from gdrive.drive_tools import list_drive_items, search_drive_files
def test_build_params_detailed_true_includes_extra_fields():
"""detailed=True requests modifiedTime, webViewLink, and size from the API."""
params = build_drive_list_params(query="name='x'", page_size=10, detailed=True)
assert "modifiedTime" in params["fields"]
assert "webViewLink" in params["fields"]
assert "size" in params["fields"]
def test_build_params_detailed_false_omits_extra_fields():
"""detailed=False omits modifiedTime, webViewLink, and size from the API request."""
params = build_drive_list_params(query="name='x'", page_size=10, detailed=False)
assert "modifiedTime" not in params["fields"]
assert "webViewLink" not in params["fields"]
assert "size" not in params["fields"]
def test_build_params_detailed_false_keeps_core_fields():
"""detailed=False still requests id, name, and mimeType."""
params = build_drive_list_params(query="name='x'", page_size=10, detailed=False)
assert "id" in params["fields"]
assert "name" in params["fields"]
assert "mimeType" in params["fields"]
def test_build_params_default_is_detailed():
"""Omitting detailed behaves identically to detailed=True."""
params_default = build_drive_list_params(query="q", page_size=5)
params_true = build_drive_list_params(query="q", page_size=5, detailed=True)
assert params_default["fields"] == params_true["fields"]
# ---------------------------------------------------------------------------
# Helpers
# search_drive_files — detailed flag
# ---------------------------------------------------------------------------
def _unwrap(fn):
"""Unwrap a FunctionTool or plain-function decorator chain to the original async function."""
if hasattr(fn, "fn"):
fn = fn.fn # FunctionTool wrapper (some server versions)
while hasattr(fn, "__wrapped__"):
fn = fn.__wrapped__
return fn
def _make_file(
file_id: str,
name: str,
mime_type: str,
link: str = "http://link",
modified: str = "2024-01-01T00:00:00Z",
) -> dict:
return {
"id": file_id,
"name": name,
"mimeType": mime_type,
"webViewLink": link,
"modifiedTime": modified,
@pytest.mark.asyncio
async def test_search_detailed_true_output_includes_metadata():
"""detailed=True (default) includes modified time and link in output."""
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
_make_file(
"f1",
"My Doc",
"application/vnd.google-apps.document",
modified="2024-06-01T12:00:00Z",
link="http://link/f1",
)
]
}
result = await _unwrap(search_drive_files)(
service=mock_service,
user_google_email="user@example.com",
query="my doc",
detailed=True,
)
assert "My Doc" in result
assert "2024-06-01T12:00:00Z" in result
assert "http://link/f1" in result
@pytest.mark.asyncio
async def test_search_detailed_false_output_excludes_metadata():
"""detailed=False omits modified time and link from output."""
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
_make_file(
"f1",
"My Doc",
"application/vnd.google-apps.document",
modified="2024-06-01T12:00:00Z",
link="http://link/f1",
)
]
}
result = await _unwrap(search_drive_files)(
service=mock_service,
user_google_email="user@example.com",
query="my doc",
detailed=False,
)
assert "My Doc" in result
assert "f1" in result
assert "2024-06-01T12:00:00Z" not in result
assert "http://link/f1" not in result
@pytest.mark.asyncio
async def test_search_detailed_true_with_size():
"""When the item has a size field, detailed=True includes it in output."""
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
_make_file("f2", "Big File", "application/pdf", size="102400"),
]
}
result = await _unwrap(search_drive_files)(
service=mock_service,
user_google_email="user@example.com",
query="big",
detailed=True,
)
assert "102400" in result
@pytest.mark.asyncio
async def test_search_detailed_true_requests_extra_api_fields():
"""detailed=True passes full fields string to the Drive API."""
mock_service = Mock()
mock_service.files().list().execute.return_value = {"files": []}
await _unwrap(search_drive_files)(
service=mock_service,
user_google_email="user@example.com",
query="anything",
detailed=True,
)
call_kwargs = mock_service.files.return_value.list.call_args.kwargs
assert "modifiedTime" in call_kwargs["fields"]
assert "webViewLink" in call_kwargs["fields"]
assert "size" in call_kwargs["fields"]
@pytest.mark.asyncio
async def test_search_detailed_false_requests_compact_api_fields():
"""detailed=False passes compact fields string to the Drive API."""
mock_service = Mock()
mock_service.files().list().execute.return_value = {"files": []}
await _unwrap(search_drive_files)(
service=mock_service,
user_google_email="user@example.com",
query="anything",
detailed=False,
)
call_kwargs = mock_service.files.return_value.list.call_args.kwargs
assert "modifiedTime" not in call_kwargs["fields"]
assert "webViewLink" not in call_kwargs["fields"]
assert "size" not in call_kwargs["fields"]
@pytest.mark.asyncio
async def test_search_default_detailed_matches_detailed_true():
"""Omitting detailed produces the same output as detailed=True."""
file = _make_file(
"f1",
"Doc",
"application/vnd.google-apps.document",
modified="2024-01-01T00:00:00Z",
link="http://l",
)
mock_service = Mock()
mock_service.files().list().execute.return_value = {"files": [file]}
result_default = await _unwrap(search_drive_files)(
service=mock_service,
user_google_email="user@example.com",
query="doc",
)
mock_service.files().list().execute.return_value = {"files": [file]}
result_true = await _unwrap(search_drive_files)(
service=mock_service,
user_google_email="user@example.com",
query="doc",
detailed=True,
)
assert result_default == result_true
# ---------------------------------------------------------------------------
# search_drive_files
# list_drive_items — detailed flag
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_detailed_true_output_includes_metadata(mock_resolve_folder):
"""detailed=True (default) includes modified time and link in output."""
mock_resolve_folder.return_value = "resolved_root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
_make_file(
"id1",
"Report",
"application/vnd.google-apps.document",
modified="2024-03-15T08:00:00Z",
link="http://link/id1",
)
]
}
result = await _unwrap(list_drive_items)(
service=mock_service,
user_google_email="user@example.com",
folder_id="root",
detailed=True,
)
assert "Report" in result
assert "2024-03-15T08:00:00Z" in result
assert "http://link/id1" in result
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_detailed_false_output_excludes_metadata(mock_resolve_folder):
"""detailed=False omits modified time and link from output."""
mock_resolve_folder.return_value = "resolved_root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
_make_file(
"id1",
"Report",
"application/vnd.google-apps.document",
modified="2024-03-15T08:00:00Z",
link="http://link/id1",
)
]
}
result = await _unwrap(list_drive_items)(
service=mock_service,
user_google_email="user@example.com",
folder_id="root",
detailed=False,
)
assert "Report" in result
assert "id1" in result
assert "2024-03-15T08:00:00Z" not in result
assert "http://link/id1" not in result
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_detailed_true_with_size(mock_resolve_folder):
"""When item has a size field, detailed=True includes it in output."""
mock_resolve_folder.return_value = "resolved_root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
_make_file("id2", "Big File", "application/pdf", size="204800"),
]
}
result = await _unwrap(list_drive_items)(
service=mock_service,
user_google_email="user@example.com",
folder_id="root",
detailed=True,
)
assert "204800" in result
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_detailed_true_requests_extra_api_fields(mock_resolve_folder):
"""detailed=True passes full fields string to the Drive API."""
mock_resolve_folder.return_value = "resolved_root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {"files": []}
await _unwrap(list_drive_items)(
service=mock_service,
user_google_email="user@example.com",
folder_id="root",
detailed=True,
)
call_kwargs = mock_service.files.return_value.list.call_args.kwargs
assert "modifiedTime" in call_kwargs["fields"]
assert "webViewLink" in call_kwargs["fields"]
assert "size" in call_kwargs["fields"]
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_detailed_false_requests_compact_api_fields(mock_resolve_folder):
"""detailed=False passes compact fields string to the Drive API."""
mock_resolve_folder.return_value = "resolved_root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {"files": []}
await _unwrap(list_drive_items)(
service=mock_service,
user_google_email="user@example.com",
folder_id="root",
detailed=False,
)
call_kwargs = mock_service.files.return_value.list.call_args.kwargs
assert "modifiedTime" not in call_kwargs["fields"]
assert "webViewLink" not in call_kwargs["fields"]
assert "size" not in call_kwargs["fields"]
# ---------------------------------------------------------------------------
# Existing behavior coverage
# ---------------------------------------------------------------------------
@@ -120,15 +620,59 @@ async def test_search_no_results():
assert "No files found" in result
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_items_basic(mock_resolve_folder):
"""Basic listing without filters returns all items."""
mock_resolve_folder.return_value = "resolved_root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
_make_file("id1", "Folder A", "application/vnd.google-apps.folder"),
_make_file("id2", "Doc B", "application/vnd.google-apps.document"),
]
}
result = await _unwrap(list_drive_items)(
service=mock_service,
user_google_email="user@example.com",
folder_id="root",
)
assert "Found 2 items" in result
assert "Folder A" in result
assert "Doc B" in result
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_items_no_results(mock_resolve_folder):
"""Empty folder returns a clear message."""
mock_resolve_folder.return_value = "resolved_root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {"files": []}
result = await _unwrap(list_drive_items)(
service=mock_service,
user_google_email="user@example.com",
folder_id="root",
)
assert "No items found" in result
# ---------------------------------------------------------------------------
# file_type filtering
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_search_file_type_folder_adds_mime_filter():
"""file_type='folder' appends the folder MIME type to the query."""
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
_make_file(
"fold1", "My Folder", "application/vnd.google-apps.folder"
)
_make_file("fold1", "My Folder", "application/vnd.google-apps.folder")
]
}
@@ -142,7 +686,6 @@ async def test_search_file_type_folder_adds_mime_filter():
assert "Found 1 files" in result
assert "My Folder" in result
# Verify the API was called with the mimeType filter in the query
call_kwargs = mock_service.files.return_value.list.call_args.kwargs
assert "mimeType = 'application/vnd.google-apps.folder'" in call_kwargs["q"]
@@ -227,7 +770,7 @@ async def test_search_file_type_structured_query_combined():
await _unwrap(search_drive_files)(
service=mock_service,
user_google_email="user@example.com",
query="name contains 'budget'", # structured query
query="name contains 'budget'",
file_type="spreadsheet",
)
@@ -251,52 +794,6 @@ async def test_search_file_type_unknown_raises_value_error():
)
# ---------------------------------------------------------------------------
# list_drive_items
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_items_basic(mock_resolve_folder):
"""Basic listing without filters returns all items."""
mock_resolve_folder.return_value = "resolved_root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
_make_file("id1", "Folder A", "application/vnd.google-apps.folder"),
_make_file("id2", "Doc B", "application/vnd.google-apps.document"),
]
}
result = await _unwrap(list_drive_items)(
service=mock_service,
user_google_email="user@example.com",
folder_id="root",
)
assert "Found 2 items" in result
assert "Folder A" in result
assert "Doc B" in result
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_items_no_results(mock_resolve_folder):
"""Empty folder returns a clear message."""
mock_resolve_folder.return_value = "resolved_root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {"files": []}
result = await _unwrap(list_drive_items)(
service=mock_service,
user_google_email="user@example.com",
folder_id="root",
)
assert "No items found" in result
@pytest.mark.asyncio
@patch("gdrive.drive_tools.resolve_folder_id", new_callable=AsyncMock)
async def test_list_items_file_type_folder_adds_mime_filter(mock_resolve_folder):
@@ -304,9 +801,7 @@ async def test_list_items_file_type_folder_adds_mime_filter(mock_resolve_folder)
mock_resolve_folder.return_value = "resolved_root"
mock_service = Mock()
mock_service.files().list().execute.return_value = {
"files": [
_make_file("sub1", "SubFolder", "application/vnd.google-apps.folder")
]
"files": [_make_file("sub1", "SubFolder", "application/vnd.google-apps.folder")]
}
result = await _unwrap(list_drive_items)(
@@ -406,7 +901,7 @@ async def test_list_items_file_type_unknown_raises(mock_resolve_folder):
@pytest.mark.asyncio
async def test_search_or_query_is_grouped_before_mime_filter():
"""An OR structured query is wrapped in parentheses so the MIME filter binds correctly."""
"""An OR structured query is wrapped in parentheses so MIME filter precedence is correct."""
mock_service = Mock()
mock_service.files().list().execute.return_value = {"files": []}
@@ -418,8 +913,6 @@ async def test_search_or_query_is_grouped_before_mime_filter():
)
q = mock_service.files.return_value.list.call_args.kwargs["q"]
# Without grouping this would be: name contains 'a' or name contains 'b' and mimeType = ...
# The 'and' would only bind to the second term, leaking the first term through unfiltered.
assert q.startswith("(")
assert "name contains 'a' or name contains 'b'" in q
assert ") and mimeType = 'application/vnd.google-apps.document'" in q

View File

@@ -0,0 +1,60 @@
import os
import sys
import pytest
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import main
def test_resolve_permissions_mode_selection_without_tier():
services = ["gmail", "drive"]
resolved_services, tier_tool_filter = main.resolve_permissions_mode_selection(
services, None
)
assert resolved_services == services
assert tier_tool_filter is None
def test_resolve_permissions_mode_selection_with_tier_filters_services(monkeypatch):
def fake_resolve_tools_from_tier(tier, services):
assert tier == "core"
assert services == ["gmail", "drive", "slides"]
return ["search_gmail_messages"], ["gmail"]
monkeypatch.setattr(main, "resolve_tools_from_tier", fake_resolve_tools_from_tier)
resolved_services, tier_tool_filter = main.resolve_permissions_mode_selection(
["gmail", "drive", "slides"], "core"
)
assert resolved_services == ["gmail"]
assert tier_tool_filter == {"search_gmail_messages"}
def test_narrow_permissions_to_services_keeps_selected_order():
permissions = {"drive": "full", "gmail": "readonly", "calendar": "readonly"}
narrowed = main.narrow_permissions_to_services(permissions, ["gmail", "drive"])
assert narrowed == {"gmail": "readonly", "drive": "full"}
def test_narrow_permissions_to_services_drops_non_selected_services():
permissions = {"gmail": "send", "drive": "full"}
narrowed = main.narrow_permissions_to_services(permissions, ["gmail"])
assert narrowed == {"gmail": "send"}
def test_permissions_and_tools_flags_are_rejected(monkeypatch, capsys):
monkeypatch.setattr(main, "configure_safe_logging", lambda: None)
monkeypatch.setattr(
sys,
"argv",
["main.py", "--permissions", "gmail:readonly", "--tools", "gmail"],
)
with pytest.raises(SystemExit) as exc:
main.main()
assert exc.value.code == 1
captured = capsys.readouterr()
assert "--permissions and --tools cannot be combined" in captured.err

118
tests/test_permissions.py Normal file
View File

@@ -0,0 +1,118 @@
"""
Unit tests for granular per-service permission parsing and scope resolution.
Covers parse_permissions_arg() validation (format, duplicates, unknown
service/level) and cumulative scope expansion in get_scopes_for_permission().
"""
import sys
import os
import pytest
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from auth.permissions import (
get_scopes_for_permission,
parse_permissions_arg,
SERVICE_PERMISSION_LEVELS,
)
from auth.scopes import (
GMAIL_READONLY_SCOPE,
GMAIL_LABELS_SCOPE,
GMAIL_MODIFY_SCOPE,
GMAIL_COMPOSE_SCOPE,
DRIVE_READONLY_SCOPE,
DRIVE_SCOPE,
DRIVE_FILE_SCOPE,
)
class TestParsePermissionsArg:
"""Tests for parse_permissions_arg()."""
def test_single_valid_entry(self):
result = parse_permissions_arg(["gmail:readonly"])
assert result == {"gmail": "readonly"}
def test_multiple_valid_entries(self):
result = parse_permissions_arg(["gmail:organize", "drive:full"])
assert result == {"gmail": "organize", "drive": "full"}
def test_all_services_at_readonly(self):
entries = [f"{svc}:readonly" for svc in SERVICE_PERMISSION_LEVELS]
result = parse_permissions_arg(entries)
assert set(result.keys()) == set(SERVICE_PERMISSION_LEVELS.keys())
def test_missing_colon_raises(self):
with pytest.raises(ValueError, match="Invalid permission format"):
parse_permissions_arg(["gmail_readonly"])
def test_duplicate_service_raises(self):
with pytest.raises(ValueError, match="Duplicate service"):
parse_permissions_arg(["gmail:readonly", "gmail:full"])
def test_unknown_service_raises(self):
with pytest.raises(ValueError, match="Unknown service"):
parse_permissions_arg(["fakesvc:readonly"])
def test_unknown_level_raises(self):
with pytest.raises(ValueError, match="Unknown level"):
parse_permissions_arg(["gmail:superadmin"])
def test_empty_list_returns_empty(self):
assert parse_permissions_arg([]) == {}
def test_extra_colon_in_value(self):
"""A level containing a colon should fail as unknown level."""
with pytest.raises(ValueError, match="Unknown level"):
parse_permissions_arg(["gmail:read:only"])
class TestGetScopesForPermission:
"""Tests for get_scopes_for_permission() cumulative scope expansion."""
def test_gmail_readonly_returns_readonly_scope(self):
scopes = get_scopes_for_permission("gmail", "readonly")
assert GMAIL_READONLY_SCOPE in scopes
def test_gmail_organize_includes_readonly(self):
"""Organize level should cumulatively include readonly scopes."""
scopes = get_scopes_for_permission("gmail", "organize")
assert GMAIL_READONLY_SCOPE in scopes
assert GMAIL_LABELS_SCOPE in scopes
assert GMAIL_MODIFY_SCOPE in scopes
def test_gmail_drafts_includes_organize_and_readonly(self):
scopes = get_scopes_for_permission("gmail", "drafts")
assert GMAIL_READONLY_SCOPE in scopes
assert GMAIL_LABELS_SCOPE in scopes
assert GMAIL_COMPOSE_SCOPE in scopes
def test_drive_readonly_excludes_full(self):
scopes = get_scopes_for_permission("drive", "readonly")
assert DRIVE_READONLY_SCOPE in scopes
assert DRIVE_SCOPE not in scopes
assert DRIVE_FILE_SCOPE not in scopes
def test_drive_full_includes_readonly(self):
scopes = get_scopes_for_permission("drive", "full")
assert DRIVE_READONLY_SCOPE in scopes
assert DRIVE_SCOPE in scopes
def test_unknown_service_raises(self):
with pytest.raises(ValueError, match="Unknown service"):
get_scopes_for_permission("nonexistent", "readonly")
def test_unknown_level_raises(self):
with pytest.raises(ValueError, match="Unknown permission level"):
get_scopes_for_permission("gmail", "nonexistent")
def test_no_duplicate_scopes(self):
"""Cumulative expansion should deduplicate scopes."""
for service, levels in SERVICE_PERMISSION_LEVELS.items():
for level_name, _ in levels:
scopes = get_scopes_for_permission(service, level_name)
assert len(scopes) == len(set(scopes)), (
f"Duplicate scopes for {service}:{level_name}"
)

View File

@@ -12,6 +12,7 @@ import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from auth.scopes import (
BASE_SCOPES,
CALENDAR_READONLY_SCOPE,
CALENDAR_SCOPE,
CONTACTS_READONLY_SCOPE,
@@ -31,6 +32,8 @@ from auth.scopes import (
has_required_scopes,
set_read_only,
)
from auth.permissions import get_scopes_for_permission, set_permissions
import auth.permissions as permissions_module
class TestDocsScopes:
@@ -195,3 +198,34 @@ class TestHasRequiredScopes:
available = [GMAIL_MODIFY_SCOPE]
required = [GMAIL_READONLY_SCOPE, DRIVE_READONLY_SCOPE]
assert not has_required_scopes(available, required)
class TestGranularPermissionsScopes:
"""Tests for granular permissions scope generation path."""
def setup_method(self):
set_read_only(False)
permissions_module._PERMISSIONS = None
def teardown_method(self):
set_read_only(False)
permissions_module._PERMISSIONS = None
def test_permissions_mode_returns_base_plus_permission_scopes(self):
set_permissions({"gmail": "send", "drive": "readonly"})
scopes = get_scopes_for_tools(["calendar"]) # ignored in permissions mode
expected = set(BASE_SCOPES)
expected.update(get_scopes_for_permission("gmail", "send"))
expected.update(get_scopes_for_permission("drive", "readonly"))
assert set(scopes) == expected
def test_permissions_mode_overrides_read_only_and_full_maps(self):
set_read_only(True)
without_permissions = get_scopes_for_tools(["drive"])
assert DRIVE_READONLY_SCOPE in without_permissions
set_permissions({"gmail": "readonly"})
with_permissions = get_scopes_for_tools(["drive"])
assert GMAIL_READONLY_SCOPE in with_permissions
assert DRIVE_READONLY_SCOPE not in with_permissions