""" test_mcp_server.py — Tests for the MCP server tool handlers and dispatch. Tests each tool handler directly (unit-level) and the handle_request dispatch layer (integration-level). Uses isolated palace + KG fixtures via monkeypatch to avoid touching real data. """ from datetime import datetime import json import sys import pytest def _patch_mcp_server(monkeypatch, config, kg): """Patch the mcp_server module globals to use test fixtures.""" from mempalace import mcp_server monkeypatch.setattr(mcp_server, "_config", config) monkeypatch.setattr(mcp_server, "_kg", kg) def _get_collection(palace_path, create=False): """Helper to get collection from test palace. Returns (client, collection) so callers can clean up the client when they are done. """ import chromadb client = chromadb.PersistentClient(path=palace_path) if create: return ( client, client.get_or_create_collection("mempalace_drawers", metadata={"hnsw:space": "cosine"}), ) return client, client.get_collection("mempalace_drawers") # ── Protocol Layer ────────────────────────────────────────────────────── class TestHandleRequest: def test_initialize(self): from mempalace.mcp_server import handle_request resp = handle_request({"method": "initialize", "id": 1, "params": {}}) assert resp["result"]["serverInfo"]["name"] == "mempalace" assert resp["id"] == 1 def test_initialize_negotiates_client_version(self): from mempalace.mcp_server import handle_request resp = handle_request( { "method": "initialize", "id": 1, "params": {"protocolVersion": "2025-11-25"}, } ) assert resp["result"]["protocolVersion"] == "2025-11-25" def test_initialize_negotiates_older_supported_version(self): from mempalace.mcp_server import handle_request resp = handle_request( { "method": "initialize", "id": 1, "params": {"protocolVersion": "2025-03-26"}, } ) assert resp["result"]["protocolVersion"] == "2025-03-26" def test_initialize_unknown_version_falls_back_to_latest(self): from mempalace.mcp_server import handle_request resp = 
handle_request( { "method": "initialize", "id": 1, "params": {"protocolVersion": "9999-12-31"}, } ) from mempalace.mcp_server import SUPPORTED_PROTOCOL_VERSIONS assert resp["result"]["protocolVersion"] == SUPPORTED_PROTOCOL_VERSIONS[0] def test_initialize_missing_version_uses_oldest(self): from mempalace.mcp_server import handle_request, SUPPORTED_PROTOCOL_VERSIONS resp = handle_request({"method": "initialize", "id": 1, "params": {}}) assert resp["result"]["protocolVersion"] == SUPPORTED_PROTOCOL_VERSIONS[-1] def test_notifications_initialized_returns_none(self): from mempalace.mcp_server import handle_request resp = handle_request({"method": "notifications/initialized", "id": None, "params": {}}) assert resp is None def test_ping_returns_empty_result(self): from mempalace.mcp_server import handle_request resp = handle_request({"method": "ping", "id": 11, "params": {}}) assert resp["id"] == 11 assert resp["result"] == {} def test_tools_list(self): from mempalace.mcp_server import handle_request resp = handle_request({"method": "tools/list", "id": 2, "params": {}}) tools = resp["result"]["tools"] names = {t["name"] for t in tools} assert "mempalace_status" in names assert "mempalace_search" in names assert "mempalace_add_drawer" in names assert "mempalace_kg_add" in names def test_null_arguments_does_not_hang(self, monkeypatch, config, palace_path, seeded_kg): """Sending arguments: null should return a result, not hang (#394).""" _patch_mcp_server(monkeypatch, config, seeded_kg) from mempalace.mcp_server import handle_request _client, _col = _get_collection(palace_path, create=True) del _client resp = handle_request( { "method": "tools/call", "id": 10, "params": {"name": "mempalace_status", "arguments": None}, } ) assert "error" not in resp assert resp["result"] is not None def test_unknown_tool(self): from mempalace.mcp_server import handle_request resp = handle_request( { "method": "tools/call", "id": 3, "params": {"name": "nonexistent_tool", "arguments": {}}, } ) 
assert resp["error"]["code"] == -32601 def test_unknown_method(self): from mempalace.mcp_server import handle_request resp = handle_request({"method": "unknown/method", "id": 4, "params": {}}) assert resp["error"]["code"] == -32601 def test_any_notification_returns_none(self): """All notifications/* methods should return None (no response).""" from mempalace.mcp_server import handle_request for method in [ "notifications/initialized", "notifications/cancelled", "notifications/progress", "notifications/roots/list_changed", ]: resp = handle_request({"method": method, "params": {}}) assert resp is None, f"{method} should return None" def test_unknown_method_no_id_returns_none(self): """Messages without id (notifications) must never get a response.""" from mempalace.mcp_server import handle_request resp = handle_request({"method": "unknown/thing", "params": {}}) assert resp is None def test_malformed_method_none(self): """method=None or missing should not crash.""" from mempalace.mcp_server import handle_request # Explicit None resp = handle_request({"method": None, "params": {}}) assert resp is None # no id → no response # Missing method entirely resp = handle_request({"params": {}}) assert resp is None # method=None with id → should return error, not crash resp = handle_request({"method": None, "id": 99, "params": {}}) assert resp["error"]["code"] == -32601 def test_tools_call_dispatches(self, monkeypatch, config, palace_path, seeded_kg): _patch_mcp_server(monkeypatch, config, seeded_kg) from mempalace.mcp_server import handle_request # Create a collection so status works _client, _col = _get_collection(palace_path, create=True) del _client resp = handle_request( { "method": "tools/call", "id": 5, "params": {"name": "mempalace_status", "arguments": {}}, } ) assert "result" in resp content = json.loads(resp["result"]["content"][0]["text"]) assert "total_drawers" in content # ── Read Tools ────────────────────────────────────────────────────────── class TestReadTools: 
def test_status_cold_start_no_collection(self, monkeypatch, config, palace_path, kg): """Status on a valid palace with no ChromaDB collection yet (#830). After `mempalace init`, chroma.sqlite3 exists but the mempalace_drawers collection has not been created (no mine or add_drawer yet). Status should return total_drawers: 0, not 'No palace found'. """ import chromadb _patch_mcp_server(monkeypatch, config, kg) # Create the DB file (init does this) but NOT the collection client = chromadb.PersistentClient(path=palace_path) del client from mempalace.mcp_server import tool_status result = tool_status() assert "error" not in result, f"cold-start should not error: {result}" assert result["total_drawers"] == 0 def test_status_empty_palace(self, monkeypatch, config, palace_path, kg): _patch_mcp_server(monkeypatch, config, kg) _client, _col = _get_collection(palace_path, create=True) del _client from mempalace.mcp_server import tool_status result = tool_status() assert result["total_drawers"] == 0 assert result["wings"] == {} def test_status_with_data(self, monkeypatch, config, palace_path, seeded_collection, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace.mcp_server import tool_status result = tool_status() assert result["total_drawers"] == 4 assert "project" in result["wings"] assert "notes" in result["wings"] def test_status_handles_none_metadata_without_partial( self, monkeypatch, config, palace_path, kg ): """tool_status must not crash or go partial when the metadata cache returns a ``None`` entry — palaces can contain drawers with no metadata (older mining paths, third-party writes). 
Before the guard, ``m.get("wing")`` raised AttributeError mid-tally and the result carried ``"error"`` + ``"partial": True`` even though the data was perfectly fetchable.""" from unittest.mock import patch as _patch _patch_mcp_server(monkeypatch, config, kg) from mempalace.mcp_server import tool_status # Inject a metadata cache where one entry is None with _patch("mempalace.mcp_server._get_collection") as mock_get_col: fake_col = type("C", (), {"count": lambda self: 2})() mock_get_col.return_value = fake_col with _patch( "mempalace.mcp_server._get_cached_metadata", return_value=[{"wing": "proj", "room": "r"}, None], ): result = tool_status() # The None-metadata drawer falls under 'unknown/unknown' — no crash, # no partial flag. assert "error" not in result assert result.get("partial") is not True assert result["total_drawers"] == 2 assert result["wings"].get("proj") == 1 assert result["wings"].get("unknown") == 1 def test_list_wings(self, monkeypatch, config, palace_path, seeded_collection, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace.mcp_server import tool_list_wings result = tool_list_wings() assert result["wings"]["project"] == 3 assert result["wings"]["notes"] == 1 def test_list_rooms_all(self, monkeypatch, config, palace_path, seeded_collection, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace.mcp_server import tool_list_rooms result = tool_list_rooms() assert "backend" in result["rooms"] assert "frontend" in result["rooms"] assert "planning" in result["rooms"] def test_list_rooms_filtered(self, monkeypatch, config, palace_path, seeded_collection, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace.mcp_server import tool_list_rooms result = tool_list_rooms(wing="project") assert "backend" in result["rooms"] assert "planning" not in result["rooms"] def test_get_taxonomy(self, monkeypatch, config, palace_path, seeded_collection, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace.mcp_server import 
tool_get_taxonomy result = tool_get_taxonomy() assert result["taxonomy"]["project"]["backend"] == 2 assert result["taxonomy"]["project"]["frontend"] == 1 assert result["taxonomy"]["notes"]["planning"] == 1 def test_no_palace_returns_error(self, monkeypatch, config, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace.mcp_server import tool_status result = tool_status() assert "error" in result # ── Search Tool ───────────────────────────────────────────────────────── class TestSearchTool: def test_search_basic(self, monkeypatch, config, palace_path, seeded_collection, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace.mcp_server import tool_search result = tool_search(query="JWT authentication tokens") assert "results" in result assert len(result["results"]) > 0 # Top result should be the auth drawer top = result["results"][0] assert "JWT" in top["text"] or "authentication" in top["text"].lower() def test_search_with_wing_filter(self, monkeypatch, config, palace_path, seeded_collection, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace.mcp_server import tool_search result = tool_search(query="planning", wing="notes") assert all(r["wing"] == "notes" for r in result["results"]) def test_search_with_room_filter(self, monkeypatch, config, palace_path, seeded_collection, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace.mcp_server import tool_search result = tool_search(query="database", room="backend") assert all(r["room"] == "backend" for r in result["results"]) def test_search_min_similarity_backwards_compat( self, monkeypatch, config, palace_path, seeded_collection, kg ): """Old min_similarity param still works via backwards-compat shim.""" _patch_mcp_server(monkeypatch, config, kg) from mempalace.mcp_server import tool_search # Old name should work result = tool_search(query="JWT", min_similarity=1.5) assert "results" in result # Old name takes precedence when both provided result_strict = tool_search(query="JWT", 
max_distance=999.0, min_similarity=0.01) result_loose = tool_search(query="JWT", max_distance=0.01, min_similarity=999.0) assert len(result_strict["results"]) <= len(result_loose["results"]) def test_list_rooms_rejects_invalid_wing(self, monkeypatch, config, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace import mcp_server monkeypatch.setattr(mcp_server, "_get_collection", lambda: pytest.fail()) result = mcp_server.tool_list_rooms(wing="../etc/passwd") assert "error" in result def test_search_rejects_invalid_room(self, monkeypatch, config, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace import mcp_server monkeypatch.setattr(mcp_server, "search_memories", lambda: pytest.fail()) result = mcp_server.tool_search(query="JWT", room="../backend") assert "error" in result def test_list_drawers_rejects_invalid_wing(self, monkeypatch, config, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace import mcp_server monkeypatch.setattr(mcp_server, "_get_collection", lambda: pytest.fail()) result = mcp_server.tool_list_drawers(wing="../notes") assert "error" in result def test_find_tunnels_rejects_invalid_wing(self, monkeypatch, config, kg): _patch_mcp_server(monkeypatch, config, kg) from mempalace import mcp_server monkeypatch.setattr(mcp_server, "_get_collection", lambda: pytest.fail()) result = mcp_server.tool_find_tunnels(wing_a="../project") assert "error" in result def test_wal_redacts_sensitive_fields(self, monkeypatch, config, kg, tmp_path): _patch_mcp_server(monkeypatch, config, kg) from mempalace import mcp_server wal_file = tmp_path / "write_log.jsonl" monkeypatch.setattr(mcp_server, "_WAL_FILE", wal_file) mcp_server._wal_log( "test", {"content": "secret note", "query": "private search", "safe": "ok"}, ) entry = json.loads(wal_file.read_text().strip()) assert entry["params"]["content"].startswith("[REDACTED") assert entry["params"]["query"].startswith("[REDACTED") assert entry["params"]["safe"] == "ok" # ── Write Tools 
# ── Write Tools ─────────────────────────────────────────────────────────


class TestWriteTools:
    """Drawer handlers: add, delete, duplicate check, get, list and update."""

    def test_add_drawer(self, monkeypatch, config, palace_path, kg):
        """A new drawer is stored and the reply echoes wing, room and a prefixed id."""
        _patch_mcp_server(monkeypatch, config, kg)
        _chroma, _drawers = _get_collection(palace_path, create=True)
        del _chroma
        from mempalace.mcp_server import tool_add_drawer

        added = tool_add_drawer(
            wing="test_wing",
            room="test_room",
            content="This is a test memory about Python decorators and metaclasses.",
        )

        assert added["success"] is True
        assert added["wing"] == "test_wing"
        assert added["room"] == "test_room"
        assert added["drawer_id"].startswith("drawer_test_wing_test_room_")

    def test_add_drawer_duplicate_detection(self, monkeypatch, config, palace_path, kg):
        """Re-adding identical content succeeds but is reported as already_exists."""
        _patch_mcp_server(monkeypatch, config, kg)
        _chroma, _drawers = _get_collection(palace_path, create=True)
        del _chroma
        from mempalace.mcp_server import tool_add_drawer

        memory = "This is a unique test memory about Rust ownership and borrowing."

        first = tool_add_drawer(wing="w", room="r", content=memory)
        assert first["success"] is True

        second = tool_add_drawer(wing="w", room="r", content=memory)
        assert second["success"] is True
        assert second["reason"] == "already_exists"

    def test_add_drawer_shared_header_no_collision(self, monkeypatch, config, palace_path, kg):
        """Documents sharing a >100-char header must get distinct IDs (full-content hash)."""
        _patch_mcp_server(monkeypatch, config, kg)
        _chroma, _drawers = _get_collection(palace_path, create=True)
        del _chroma
        from mempalace.mcp_server import tool_add_drawer

        # Both documents start with the same long preamble and differ only after it.
        preamble = "# ACME Corp Knowledge Base\n**Project:** Alpha | **Team:** Backend | **Status:** Active\n\n"
        postgres_doc = (
            preamble
            + "Decision: Use PostgreSQL for primary storage. Rationale: ACID compliance required."
        )
        redis_doc = (
            preamble + "Decision: Use Redis for session caching. Rationale: sub-ms latency needed."
        )

        first = tool_add_drawer(wing="work", room="decisions", content=postgres_doc)
        second = tool_add_drawer(wing="work", room="decisions", content=redis_doc)

        assert first["success"] is True
        assert second["success"] is True
        assert (
            first["drawer_id"] != second["drawer_id"]
        ), "Documents with shared header but different content must have distinct drawer IDs"

    def test_delete_drawer(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Deleting a seeded drawer succeeds and leaves three drawers behind."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_delete_drawer

        outcome = tool_delete_drawer("drawer_proj_backend_aaa")

        assert outcome["success"] is True
        assert seeded_collection.count() == 3

    def test_delete_drawer_not_found(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Deleting an unknown drawer id reports success=False."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_delete_drawer

        outcome = tool_delete_drawer("nonexistent_drawer")

        assert outcome["success"] is False

    def test_check_duplicate(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Seeded text is flagged as a duplicate; unrelated text is not."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_check_duplicate

        # Exact match text from seeded_collection should be flagged
        verdict = tool_check_duplicate(
            "The authentication module uses JWT tokens for session management. "
            "Tokens expire after 24 hours. Refresh tokens are stored in HttpOnly cookies.",
            threshold=0.5,
        )
        assert verdict["is_duplicate"] is True

        # Unrelated content should not be flagged
        verdict = tool_check_duplicate(
            "Black holes emit Hawking radiation at the event horizon.",
            threshold=0.99,
        )
        assert verdict["is_duplicate"] is False

    def test_get_drawer(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Fetching a seeded drawer returns its id, wing, room and content."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_get_drawer

        drawer = tool_get_drawer("drawer_proj_backend_aaa")

        assert drawer["drawer_id"] == "drawer_proj_backend_aaa"
        assert drawer["wing"] == "project"
        assert drawer["room"] == "backend"
        assert "JWT tokens" in drawer["content"]

    def test_get_drawer_not_found(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Fetching an unknown drawer id yields an error entry."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_get_drawer

        drawer = tool_get_drawer("nonexistent_drawer")

        assert "error" in drawer

    def test_list_drawers(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """An unfiltered listing returns all four seeded drawers."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_drawers

        listing = tool_list_drawers()

        assert listing["count"] == 4
        assert len(listing["drawers"]) == 4

    def test_list_drawers_with_wing_filter(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """Filtering by wing='project' returns exactly that wing's three drawers."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_drawers

        listing = tool_list_drawers(wing="project")

        assert listing["count"] == 3
        assert all(item["wing"] == "project" for item in listing["drawers"])

    def test_list_drawers_with_room_filter(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """Filtering by wing and room returns the two backend drawers."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_drawers

        listing = tool_list_drawers(wing="project", room="backend")

        assert listing["count"] == 2
        assert all(item["room"] == "backend" for item in listing["drawers"])

    def test_list_drawers_pagination(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """limit/offset are honoured and echoed back in the reply."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_drawers

        page = tool_list_drawers(limit=2, offset=0)

        assert page["count"] == 2
        assert page["limit"] == 2
        assert page["offset"] == 0

    def test_list_drawers_negative_offset_clamped(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """A negative offset is reported back as 0."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_drawers

        page = tool_list_drawers(offset=-5)

        assert page["offset"] == 0

    def test_update_drawer_content(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Updated content is what a subsequent get returns."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_update_drawer, tool_get_drawer

        updated = tool_update_drawer(
            "drawer_proj_backend_aaa", content="Updated content about auth."
        )
        assert updated["success"] is True

        reread = tool_get_drawer("drawer_proj_backend_aaa")
        assert reread["content"] == "Updated content about auth."

    def test_update_drawer_wing_and_room(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """Moving a drawer reports the new wing and room."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_update_drawer

        updated = tool_update_drawer("drawer_proj_backend_aaa", wing="new_wing", room="new_room")

        assert updated["success"] is True
        assert updated["wing"] == "new_wing"
        assert updated["room"] == "new_room"

    def test_update_drawer_not_found(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Updating an unknown drawer id reports success=False."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_update_drawer

        updated = tool_update_drawer("nonexistent_drawer", content="hello")

        assert updated["success"] is False

    def test_update_drawer_noop(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """An update with no fields succeeds and is flagged as a no-op."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_update_drawer

        updated = tool_update_drawer("drawer_proj_backend_aaa")

        assert updated["success"] is True
        assert updated.get("noop") is True
# ── KG Tools ────────────────────────────────────────────────────────────


class TestKGTools:
    """Knowledge-graph handlers: add, query, invalidate, timeline and stats."""

    def test_kg_add(self, monkeypatch, config, palace_path, kg):
        """Adding a dated triple to an empty graph succeeds."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_kg_add

        reply = tool_kg_add(
            subject="Alice",
            predicate="likes",
            object="coffee",
            valid_from="2025-01-01",
        )

        assert reply["success"] is True

    def test_kg_query(self, monkeypatch, config, palace_path, seeded_kg):
        """Querying the seeded graph for 'Max' returns at least one fact."""
        _patch_mcp_server(monkeypatch, config, seeded_kg)
        from mempalace.mcp_server import tool_kg_query

        reply = tool_kg_query(entity="Max")

        assert reply["count"] > 0

    def test_kg_invalidate(self, monkeypatch, config, palace_path, seeded_kg):
        """Ending the Max/does/chess fact on a given date succeeds."""
        _patch_mcp_server(monkeypatch, config, seeded_kg)
        from mempalace.mcp_server import tool_kg_invalidate

        reply = tool_kg_invalidate(
            subject="Max",
            predicate="does",
            object="chess",
            ended="2026-03-01",
        )

        assert reply["success"] is True

    def test_kg_timeline(self, monkeypatch, config, palace_path, seeded_kg):
        """The timeline for 'Alice' in the seeded graph is non-empty."""
        _patch_mcp_server(monkeypatch, config, seeded_kg)
        from mempalace.mcp_server import tool_kg_timeline

        reply = tool_kg_timeline(entity="Alice")

        assert reply["count"] > 0

    def test_kg_stats(self, monkeypatch, config, palace_path, seeded_kg):
        """The seeded graph reports at least four entities."""
        _patch_mcp_server(monkeypatch, config, seeded_kg)
        from mempalace.mcp_server import tool_kg_stats

        reply = tool_kg_stats()

        assert reply["entities"] >= 4


# ── Diary Tools ─────────────────────────────────────────────────────────


class TestDiaryTools:
    """Agent diary handlers: write and read."""

    def test_diary_write_and_read(self, monkeypatch, config, palace_path, kg):
        """A written entry can be read back with its topic and content."""
        _patch_mcp_server(monkeypatch, config, kg)
        _chroma, _drawers = _get_collection(palace_path, create=True)
        del _chroma
        from mempalace.mcp_server import tool_diary_write, tool_diary_read

        written = tool_diary_write(
            agent_name="TestAgent",
            entry="Today we discussed authentication patterns.",
            topic="architecture",
        )
        assert written["success"] is True
        assert written["agent"] == "TestAgent"

        diary = tool_diary_read(agent_name="TestAgent")
        assert diary["total"] == 1
        newest = diary["entries"][0]
        assert newest["topic"] == "architecture"
        assert "authentication" in newest["content"]

    def test_diary_read_empty(self, monkeypatch, config, palace_path, kg):
        """Reading the diary of an agent that never wrote returns no entries."""
        _patch_mcp_server(monkeypatch, config, kg)
        _chroma, _drawers = _get_collection(palace_path, create=True)
        del _chroma
        from mempalace.mcp_server import tool_diary_read

        diary = tool_diary_read(agent_name="Nobody")

        assert diary["entries"] == []

    def test_diary_write_same_second_shared_prefix_no_collision(
        self, monkeypatch, config, palace_path, kg
    ):
        """Two entries written in the same second with a shared prefix get distinct ids."""
        _patch_mcp_server(monkeypatch, config, kg)
        _chroma, _drawers = _get_collection(palace_path, create=True)
        del _chroma
        from mempalace import mcp_server

        class FrozenDateTime:
            # Scripted clock: the first two now() calls differ by one
            # microsecond; every later call repeats the fallback value.
            calls = [
                datetime(2026, 4, 13, 22, 15, 30, 123456),
                datetime(2026, 4, 13, 22, 15, 30, 123457),
            ]
            fallback = datetime(2026, 4, 13, 22, 15, 30, 123457)

            @classmethod
            def now(cls):
                return cls.calls.pop(0) if cls.calls else cls.fallback

        monkeypatch.setattr(mcp_server, "datetime", FrozenDateTime)
        from mempalace.mcp_server import tool_diary_read, tool_diary_write

        # Same 50-character prefix, different tails.
        first_text = "A" * 50 + " entry one"
        second_text = "A" * 50 + " entry two"

        first = tool_diary_write(agent_name="TestAgent", entry=first_text, topic="status")
        second = tool_diary_write(agent_name="TestAgent", entry=second_text, topic="status")

        assert first["success"] is True
        assert second["success"] is True
        assert first["entry_id"] != second["entry_id"]

        diary = tool_diary_read(agent_name="TestAgent")
        stored = [item["content"] for item in diary["entries"]]
        assert diary["total"] == 2
        assert first_text in stored
        assert second_text in stored

    def test_diary_read_empty_wing_spans_all_wings(self, monkeypatch, config, palace_path, kg):
        """diary_read(wing='') must return entries from every wing this agent
        wrote to. Hooks write to project-derived wings (#659); a reader that
        silos by default wing would never see those entries."""
        _patch_mcp_server(monkeypatch, config, kg)
        _chroma, _drawers = _get_collection(palace_path, create=True)
        del _chroma
        from mempalace.mcp_server import tool_diary_read, tool_diary_write

        default_write = tool_diary_write(
            agent_name="TestAgent",
            entry="default-wing entry",
            topic="general",
        )
        project_write = tool_diary_write(
            agent_name="TestAgent",
            entry="project-wing entry",
            topic="general",
            wing="wing_someproject",
        )
        assert default_write["success"] and project_write["success"]

        # Empty wing → return both entries
        everything = tool_diary_read(agent_name="TestAgent", wing="")
        assert everything["total"] == 2
        seen = {item["content"] for item in everything["entries"]}
        assert "default-wing entry" in seen
        assert "project-wing entry" in seen

        # Explicit wing → return only that wing's entries
        scoped = tool_diary_read(agent_name="TestAgent", wing="wing_someproject")
        assert scoped["total"] == 1
        assert scoped["entries"][0]["content"] == "project-wing entry"


# ── Cache Invalidation (inode/mtime) ──────────────────────────────────


class TestCacheInvalidation:
    """Tests for _get_collection inode/mtime cache invalidation logic."""

    def test_mtime_change_invalidates_cache(self, monkeypatch, config, palace_path, kg):
        """When mtime changes, the cached collection should be replaced."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace import mcp_server

        # Create a real collection so _get_collection succeeds
        _chroma, _drawers = _get_collection(palace_path, create=True)
        del _chroma

        # Prime the cache
        primed = mcp_server._get_collection()
        assert primed is not None

        # Simulate an external write changing the mtime
        recorded_mtime = mcp_server._palace_db_mtime
        monkeypatch.setattr(mcp_server, "_palace_db_mtime", recorded_mtime - 10.0)

        # _get_collection should detect the mtime drift and reconnect
        reopened = mcp_server._get_collection()
        assert reopened is not None

    def test_inode_change_invalidates_cache(self, monkeypatch, config, palace_path, kg):
        """When inode changes (file replaced), the cached collection should be replaced."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace import mcp_server

        _chroma, _drawers = _get_collection(palace_path, create=True)
        del _chroma

        # Prime the cache
        primed = mcp_server._get_collection()
        assert primed is not None

        # Simulate a rebuild that changes the inode
        monkeypatch.setattr(mcp_server, "_palace_db_inode", 99999)

        reopened = mcp_server._get_collection()
        assert reopened is not None

    @pytest.mark.skipif(
        sys.platform == "win32",
        reason="Windows holds chroma.sqlite3 open while the client is cached, blocking os.remove",
    )
    def test_missing_db_invalidates_cache(self, monkeypatch, config, palace_path, kg):
        """When chroma.sqlite3 disappears, a cached collection should be invalidated."""
        _patch_mcp_server(monkeypatch, config, kg)
        import os

        from mempalace import mcp_server

        _chroma, _drawers = _get_collection(palace_path, create=True)
        del _chroma

        # Prime the cache
        primed = mcp_server._get_collection()
        assert primed is not None
        assert mcp_server._collection_cache is not None

        # Delete the DB file to simulate a rebuild in progress
        sqlite_path = os.path.join(palace_path, "chroma.sqlite3")
        if os.path.isfile(sqlite_path):
            os.remove(sqlite_path)

        # Cache should be invalidated; _get_collection returns None
        # because the backend can't open a missing DB without create=True
        mcp_server._get_collection()

        # The key assertion: the old cached collection was dropped
        assert mcp_server._palace_db_inode == 0
        assert mcp_server._palace_db_mtime == 0.0

    def test_reconnect_reports_failure_when_no_palace(self, monkeypatch, config, kg):
        """tool_reconnect should report failure when no collection is available."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace import mcp_server

        # Make _get_collection always return None
        monkeypatch.setattr(mcp_server, "_get_collection", lambda create=False: None)

        reply = mcp_server.tool_reconnect()

        assert reply["success"] is False
        assert "No palace found" in reply["message"]
        assert reply["drawers"] == 0

    def test_reconnect_reports_success(self, monkeypatch, config, palace_path, kg):
        """tool_reconnect should report success with drawer count."""
        _patch_mcp_server(monkeypatch, config, kg)
        _chroma, _drawers = _get_collection(palace_path, create=True)
        del _chroma
        from mempalace import mcp_server

        reply = mcp_server.tool_reconnect()

        assert reply["success"] is True
        assert "Reconnected" in reply["message"]
        assert isinstance(reply["drawers"], int)

    def test_get_collection_create_true_avoids_get_or_create_on_reopen(
        self, monkeypatch, config, palace_path, kg
    ):
        """Regression for the MCP-server half of #1262.

        ChromaDB 1.5.x's Rust bindings SIGSEGV when
        ``client.get_or_create_collection`` is called with metadata that
        differs from the collection's stored metadata. The Stop hook path
        (``tool_diary_write`` -> ``_get_collection(create=True)``) was
        reaching that codepath on every session-end; #1262 fixed the
        equivalent crash class in ``ChromaBackend`` but left this site
        untouched.

        ``_get_collection(create=True)`` must call ``client.get_collection``
        first and only fall back to ``client.create_collection`` when the
        collection does not yet exist on disk.
        """
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace import mcp_server

        created = mcp_server._get_collection(create=True)
        assert created is not None

        client = mcp_server._client_cache
        assert client is not None

        # Patch at the class level — chromadb's mtime-change detection
        # may rebuild the client between calls, so an instance-level
        # spy would not survive.
        client_cls = type(client)
        calls: list[tuple] = []

        def _spy(self, *args, **kwargs):
            calls.append((args, kwargs))
            raise AssertionError(
                "get_or_create_collection must not be called on reopen "
                "(SIGSEGV path on metadata mismatch)"
            )

        monkeypatch.setattr(client_cls, "get_or_create_collection", _spy)

        # Drop the cached collection so the next call has to reopen it.
        mcp_server._collection_cache = None
        reopened = mcp_server._get_collection(create=True)
        assert reopened is not None
        assert calls == [], f"get_or_create_collection was called: {calls}"

    def test_get_collection_passes_embedding_function(self, monkeypatch, config, palace_path, kg):
        """Regression for #1299.

        ``mcp_server._get_collection`` must pass ``embedding_function=`` into
        both ``client.get_collection`` and ``client.create_collection``,
        mirroring ``ChromaBackend.get_collection``. Without it, ChromaDB 1.x
        falls back to its built-in ``DefaultEmbeddingFunction`` (whose lazy
        ONNX provider selection has SIGSEGV'd on python 3.14 + Apple
        Silicon), and writers/readers can disagree with the miner about
        which EF is bound to the collection.

        The miner / Stop hook ingest path routes through
        ``ChromaBackend.get_collection`` which does this correctly; the MCP
        server must match.
        """
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace import mcp_server

        client = mcp_server._get_client()
        client_cls = type(client)

        # Keyword arguments seen by each spied client method, in call order.
        captured: dict[str, list[dict]] = {"get": [], "create": []}
        real_get = client_cls.get_collection
        real_create = client_cls.create_collection

        def _spy_get(self, name, **kwargs):
            captured["get"].append(dict(kwargs))
            return real_get(self, name, **kwargs)

        def _spy_create(self, name, **kwargs):
            captured["create"].append(dict(kwargs))
            return real_create(self, name, **kwargs)

        monkeypatch.setattr(client_cls, "get_collection", _spy_get)
        monkeypatch.setattr(client_cls, "create_collection", _spy_create)

        mcp_server._collection_cache = None
        opened = mcp_server._get_collection(create=True)
        assert opened is not None

        all_calls = captured["get"] + captured["create"]
        assert all_calls, "expected get_collection or create_collection to be called"
        for kwargs in all_calls:
            assert (
                "embedding_function" in kwargs
            ), f"missing embedding_function= in chromadb call: {kwargs}"
            assert kwargs["embedding_function"] is not None

        # Same expectation on the create=False (cache-miss) reopen path.
        mcp_server._collection_cache = None
        captured["get"].clear()
        captured["create"].clear()
        reopened = mcp_server._get_collection()
        assert reopened is not None
        assert captured["get"], "expected get_collection on cache-miss reopen"
        for kwargs in captured["get"]:
            assert "embedding_function" in kwargs
            assert kwargs["embedding_function"] is not None