From b8816e0fe2fa857efb984e0aff9e52739fb2af34 Mon Sep 17 00:00:00 2001 From: mvalentsev Date: Sun, 3 May 2026 21:43:51 +0500 Subject: [PATCH] fix(mcp): retry KG handlers once on concurrent close race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Race scenario: a KG tool handler calls _get_kg() and gets a live KnowledgeGraph; another thread fires tool_reconnect() between that return and the handler's kg.add_triple()/kg.query_entity()/etc call. tool_reconnect drains _kg_by_path and closes the underlying sqlite3.Connection; the handler then raises sqlite3.ProgrammingError: 'Cannot operate on a closed database', which surfaces as a -32000 to the MCP client even though the user just asked for a reconnect. New _call_kg(op) helper wraps each handler's kg call in a one-shot retry: catch exactly sqlite3.ProgrammingError, evict the stale entry (only if the cache slot still points at the closed instance — another thread may have already replaced it), and rerun op against a fresh _get_kg(). Beyond one retry give up so a sustained close-stream surfaces clearly instead of looping. All five KG handlers (tool_kg_query, tool_kg_add, tool_kg_invalidate, tool_kg_timeline, tool_kg_stats) now route through _call_kg. Tests pin the contract: * retries with a fresh KG and returns the second result * non-ProgrammingError exceptions propagate without retry * gives up after exactly one retry on sustained close --- mempalace/mcp_server.py | 61 ++++++++++++++++++++++++------- tests/test_mcp_server.py | 77 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 13 deletions(-) diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index 3b3b4e2..c67619e 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -46,6 +46,7 @@ import argparse # noqa: E402 (deferred until after stdio protection above) import json # noqa: E402 import logging # noqa: E402 import hashlib # noqa: E402 +import sqlite3 # noqa: E402 import threading # noqa: E402 import time # noqa: E402 from datetime import date, datetime # noqa: E402 @@ -129,6 +130,38 @@ def _get_kg() -> KnowledgeGraph: return kg +def _call_kg(op): + """Run ``op(kg)`` against the cached KG with one-shot retry on close. + + Race we're guarding against: a handler grabs ``kg = _get_kg()`` and is + about to call ``kg.add_triple(...)`` when ``tool_reconnect`` fires on + another thread, drains ``_kg_by_path``, and closes the underlying + sqlite3.Connection. The handler's call then raises + ``sqlite3.ProgrammingError: Cannot operate on a closed database`` and + bubbles up as a -32000 to the MCP client even though the user just + asked for a reconnect. + + Catch that single class of error, evict the stale entry from the + cache (only if it still points at the closed instance — another + thread may have already replaced it), and try once more with a fresh + KG. Beyond one retry give up: a second close means we're losing a + sustained race we won't win in this loop, and a hung loop is worse + than a clear failure surface. + """ + for attempt in range(2): + kg = _get_kg() + try: + return op(kg) + except sqlite3.ProgrammingError: + if attempt == 0: + path = os.path.abspath(_resolve_kg_path()) + with _kg_cache_lock: + if _kg_by_path.get(path) is kg: + _kg_by_path.pop(path, None) + continue + raise + + _client_cache = None _collection_cache = None _palace_db_inode = 0 # inode of chroma.sqlite3 at cache time @@ -1081,7 +1114,7 @@ def tool_kg_query(entity: str, as_of: str = None, direction: str = "both"): return {"error": str(e)} if direction not in ("outgoing", "incoming", "both"): return {"error": "direction must be 'outgoing', 'incoming', or 'both'"} - results = _get_kg().query_entity(entity, as_of=as_of, direction=direction) + results = _call_kg(lambda kg: kg.query_entity(entity, as_of=as_of, direction=direction)) return {"entity": entity, "as_of": as_of, "facts": results, "count": len(results)} @@ -1126,15 +1159,17 @@ def tool_kg_add( "source_drawer_id": source_drawer_id, }, ) - triple_id = _get_kg().add_triple( - subject, - predicate, - object, - valid_from=valid_from, - valid_to=valid_to, - source_closet=source_closet, - source_file=source_file, - source_drawer_id=source_drawer_id, + triple_id = _call_kg( + lambda kg: kg.add_triple( + subject, + predicate, + object, + valid_from=valid_from, + valid_to=valid_to, + source_closet=source_closet, + source_file=source_file, + source_drawer_id=source_drawer_id, + ) ) return {"success": True, "triple_id": triple_id, "fact": f"{subject} → {predicate} → {object}"} @@ -1165,7 +1200,7 @@ def tool_kg_invalidate(subject: str, predicate: str, object: str, ended: str = N "ended": resolved_ended, }, ) - _get_kg().invalidate(subject, predicate, object, ended=resolved_ended) + _call_kg(lambda kg: kg.invalidate(subject, predicate, object, ended=resolved_ended)) return { "success": True, "fact": f"{subject} → {predicate} → {object}", @@ -1180,13 +1215,13 @@ def tool_kg_timeline(entity: str = None): entity = sanitize_kg_value(entity, "entity") except ValueError as e: return {"error": str(e)} - results = _get_kg().timeline(entity) + results = _call_kg(lambda kg: kg.timeline(entity)) return {"entity": entity or "all", "timeline": results, "count": len(results)} def tool_kg_stats(): """Knowledge graph overview: entities, triples, relationship types.""" - return _get_kg().stats() + return _call_kg(lambda kg: kg.stats()) # ==================== AGENT DIARY ==================== diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 092b707..e365afc 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -1295,3 +1295,80 @@ class TestKGLazyCache: mcp_server.tool_reconnect() assert mcp_server._kg_by_path == {} + + def test_call_kg_retries_after_concurrent_close(self, monkeypatch): + """A KG closed mid-handler must trigger a one-shot retry with a fresh + instance — not surface a -32000 to the MCP client.""" + import sqlite3 as _sqlite3 + + from mempalace import mcp_server + + path = "/fake/palace/knowledge_graph.sqlite3" + monkeypatch.setattr(mcp_server, "_resolve_kg_path", lambda: path) + + class _ClosedKG: + def query_entity(self, entity, **kwargs): + raise _sqlite3.ProgrammingError("Cannot operate on a closed database") + + class _FreshKG: + def query_entity(self, entity, **kwargs): + return [{"entity": entity}] + + cache = {os.path.abspath(path): _ClosedKG()} + monkeypatch.setattr(mcp_server, "_kg_by_path", cache) + + # Second _get_kg() call (after the cache eviction) constructs a new + # KG. Patch the constructor so we don't open a real sqlite file. + monkeypatch.setattr(mcp_server, "KnowledgeGraph", lambda **_: _FreshKG()) + + result = mcp_server._call_kg(lambda kg: kg.query_entity("Alice")) + assert result == [{"entity": "Alice"}] + # The closed instance must be evicted; the fresh one must be cached. + assert isinstance(cache[os.path.abspath(path)], _FreshKG) + + def test_call_kg_does_not_retry_on_other_errors(self, monkeypatch): + """Non-ProgrammingError exceptions must propagate without retry — + we don't want the retry guard masking real bugs.""" + from mempalace import mcp_server + + path = "/fake/palace/knowledge_graph.sqlite3" + monkeypatch.setattr(mcp_server, "_resolve_kg_path", lambda: path) + + calls = {"count": 0} + + class _FailingKG: + def query_entity(self, entity, **kwargs): + calls["count"] += 1 + raise ValueError("bad input") + + monkeypatch.setattr(mcp_server, "_kg_by_path", {os.path.abspath(path): _FailingKG()}) + monkeypatch.setattr(mcp_server, "KnowledgeGraph", lambda **_: _FailingKG()) + + with pytest.raises(ValueError, match="bad input"): + mcp_server._call_kg(lambda kg: kg.query_entity("Alice")) + assert calls["count"] == 1, "non-ProgrammingError must not trigger retry" + + def test_call_kg_gives_up_after_one_retry(self, monkeypatch): + """If the second attempt also hits a closed DB, give up rather than + loop forever — a sustained close-stream is a different bug.""" + import sqlite3 as _sqlite3 + + from mempalace import mcp_server + + path = "/fake/palace/knowledge_graph.sqlite3" + monkeypatch.setattr(mcp_server, "_resolve_kg_path", lambda: path) + + calls = {"count": 0} + + class _AlwaysClosedKG: + def query_entity(self, entity, **kwargs): + calls["count"] += 1 + raise _sqlite3.ProgrammingError("closed again") + + cache = {} + monkeypatch.setattr(mcp_server, "_kg_by_path", cache) + monkeypatch.setattr(mcp_server, "KnowledgeGraph", lambda **_: _AlwaysClosedKG()) + + with pytest.raises(_sqlite3.ProgrammingError): + mcp_server._call_kg(lambda kg: kg.query_entity("Alice")) + assert calls["count"] == 2, "expected exactly one retry beyond the initial attempt"