fix(mcp): retry KG handlers once on concurrent close race
Race scenario: a KG tool handler calls _get_kg() and gets a live KnowledgeGraph; another thread fires tool_reconnect() between that return and the handler's kg.add_triple()/kg.query_entity()/etc call. tool_reconnect drains _kg_by_path and closes the underlying sqlite3.Connection; the handler then raises sqlite3.ProgrammingError: 'Cannot operate on a closed database', which surfaces as a -32000 to the MCP client even though the user just asked for a reconnect. New _call_kg(op) helper wraps each handler's kg call in a one-shot retry: catch exactly sqlite3.ProgrammingError, evict the stale entry (only if the cache slot still points at the closed instance — another thread may have already replaced it), and rerun op against a fresh _get_kg(). Beyond one retry give up so a sustained close-stream surfaces clearly instead of looping. All five KG handlers (tool_kg_query, tool_kg_add, tool_kg_invalidate, tool_kg_timeline, tool_kg_stats) now route through _call_kg. Tests pin the contract: * retries with a fresh KG and returns the second result * non-ProgrammingError exceptions propagate without retry * gives up after exactly one retry on sustained close
This commit is contained in:
+40
-5
@@ -46,6 +46,7 @@ import argparse # noqa: E402 (deferred until after stdio protection above)
|
|||||||
import json # noqa: E402
|
import json # noqa: E402
|
||||||
import logging # noqa: E402
|
import logging # noqa: E402
|
||||||
import hashlib # noqa: E402
|
import hashlib # noqa: E402
|
||||||
|
import sqlite3 # noqa: E402
|
||||||
import threading # noqa: E402
|
import threading # noqa: E402
|
||||||
import time # noqa: E402
|
import time # noqa: E402
|
||||||
from datetime import date, datetime # noqa: E402
|
from datetime import date, datetime # noqa: E402
|
||||||
@@ -129,6 +130,38 @@ def _get_kg() -> KnowledgeGraph:
|
|||||||
return kg
|
return kg
|
||||||
|
|
||||||
|
|
||||||
|
def _call_kg(op):
|
||||||
|
"""Run ``op(kg)`` against the cached KG with one-shot retry on close.
|
||||||
|
|
||||||
|
Race we're guarding against: a handler grabs ``kg = _get_kg()`` and is
|
||||||
|
about to call ``kg.add_triple(...)`` when ``tool_reconnect`` fires on
|
||||||
|
another thread, drains ``_kg_by_path``, and closes the underlying
|
||||||
|
sqlite3.Connection. The handler's call then raises
|
||||||
|
``sqlite3.ProgrammingError: Cannot operate on a closed database`` and
|
||||||
|
bubbles up as a -32000 to the MCP client even though the user just
|
||||||
|
asked for a reconnect.
|
||||||
|
|
||||||
|
Catch that single class of error, evict the stale entry from the
|
||||||
|
cache (only if it still points at the closed instance — another
|
||||||
|
thread may have already replaced it), and try once more with a fresh
|
||||||
|
KG. Beyond one retry give up: a second close means we're losing a
|
||||||
|
sustained race we won't win in this loop, and a hung loop is worse
|
||||||
|
than a clear failure surface.
|
||||||
|
"""
|
||||||
|
for attempt in range(2):
|
||||||
|
kg = _get_kg()
|
||||||
|
try:
|
||||||
|
return op(kg)
|
||||||
|
except sqlite3.ProgrammingError:
|
||||||
|
if attempt == 0:
|
||||||
|
path = os.path.abspath(_resolve_kg_path())
|
||||||
|
with _kg_cache_lock:
|
||||||
|
if _kg_by_path.get(path) is kg:
|
||||||
|
_kg_by_path.pop(path, None)
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
_client_cache = None
|
_client_cache = None
|
||||||
_collection_cache = None
|
_collection_cache = None
|
||||||
_palace_db_inode = 0 # inode of chroma.sqlite3 at cache time
|
_palace_db_inode = 0 # inode of chroma.sqlite3 at cache time
|
||||||
@@ -1081,7 +1114,7 @@ def tool_kg_query(entity: str, as_of: str = None, direction: str = "both"):
|
|||||||
return {"error": str(e)}
|
return {"error": str(e)}
|
||||||
if direction not in ("outgoing", "incoming", "both"):
|
if direction not in ("outgoing", "incoming", "both"):
|
||||||
return {"error": "direction must be 'outgoing', 'incoming', or 'both'"}
|
return {"error": "direction must be 'outgoing', 'incoming', or 'both'"}
|
||||||
results = _get_kg().query_entity(entity, as_of=as_of, direction=direction)
|
results = _call_kg(lambda kg: kg.query_entity(entity, as_of=as_of, direction=direction))
|
||||||
return {"entity": entity, "as_of": as_of, "facts": results, "count": len(results)}
|
return {"entity": entity, "as_of": as_of, "facts": results, "count": len(results)}
|
||||||
|
|
||||||
|
|
||||||
@@ -1126,7 +1159,8 @@ def tool_kg_add(
|
|||||||
"source_drawer_id": source_drawer_id,
|
"source_drawer_id": source_drawer_id,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
triple_id = _get_kg().add_triple(
|
triple_id = _call_kg(
|
||||||
|
lambda kg: kg.add_triple(
|
||||||
subject,
|
subject,
|
||||||
predicate,
|
predicate,
|
||||||
object,
|
object,
|
||||||
@@ -1136,6 +1170,7 @@ def tool_kg_add(
|
|||||||
source_file=source_file,
|
source_file=source_file,
|
||||||
source_drawer_id=source_drawer_id,
|
source_drawer_id=source_drawer_id,
|
||||||
)
|
)
|
||||||
|
)
|
||||||
return {"success": True, "triple_id": triple_id, "fact": f"{subject} → {predicate} → {object}"}
|
return {"success": True, "triple_id": triple_id, "fact": f"{subject} → {predicate} → {object}"}
|
||||||
|
|
||||||
|
|
||||||
@@ -1165,7 +1200,7 @@ def tool_kg_invalidate(subject: str, predicate: str, object: str, ended: str = N
|
|||||||
"ended": resolved_ended,
|
"ended": resolved_ended,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
_get_kg().invalidate(subject, predicate, object, ended=resolved_ended)
|
_call_kg(lambda kg: kg.invalidate(subject, predicate, object, ended=resolved_ended))
|
||||||
return {
|
return {
|
||||||
"success": True,
|
"success": True,
|
||||||
"fact": f"{subject} → {predicate} → {object}",
|
"fact": f"{subject} → {predicate} → {object}",
|
||||||
@@ -1180,13 +1215,13 @@ def tool_kg_timeline(entity: str = None):
|
|||||||
entity = sanitize_kg_value(entity, "entity")
|
entity = sanitize_kg_value(entity, "entity")
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
return {"error": str(e)}
|
return {"error": str(e)}
|
||||||
results = _get_kg().timeline(entity)
|
results = _call_kg(lambda kg: kg.timeline(entity))
|
||||||
return {"entity": entity or "all", "timeline": results, "count": len(results)}
|
return {"entity": entity or "all", "timeline": results, "count": len(results)}
|
||||||
|
|
||||||
|
|
||||||
def tool_kg_stats():
|
def tool_kg_stats():
|
||||||
"""Knowledge graph overview: entities, triples, relationship types."""
|
"""Knowledge graph overview: entities, triples, relationship types."""
|
||||||
return _get_kg().stats()
|
return _call_kg(lambda kg: kg.stats())
|
||||||
|
|
||||||
|
|
||||||
# ==================== AGENT DIARY ====================
|
# ==================== AGENT DIARY ====================
|
||||||
|
|||||||
@@ -1295,3 +1295,80 @@ class TestKGLazyCache:
|
|||||||
mcp_server.tool_reconnect()
|
mcp_server.tool_reconnect()
|
||||||
|
|
||||||
assert mcp_server._kg_by_path == {}
|
assert mcp_server._kg_by_path == {}
|
||||||
|
|
||||||
|
def test_call_kg_retries_after_concurrent_close(self, monkeypatch):
|
||||||
|
"""A KG closed mid-handler must trigger a one-shot retry with a fresh
|
||||||
|
instance — not surface a -32000 to the MCP client."""
|
||||||
|
import sqlite3 as _sqlite3
|
||||||
|
|
||||||
|
from mempalace import mcp_server
|
||||||
|
|
||||||
|
path = "/fake/palace/knowledge_graph.sqlite3"
|
||||||
|
monkeypatch.setattr(mcp_server, "_resolve_kg_path", lambda: path)
|
||||||
|
|
||||||
|
class _ClosedKG:
|
||||||
|
def query_entity(self, entity, **kwargs):
|
||||||
|
raise _sqlite3.ProgrammingError("Cannot operate on a closed database")
|
||||||
|
|
||||||
|
class _FreshKG:
|
||||||
|
def query_entity(self, entity, **kwargs):
|
||||||
|
return [{"entity": entity}]
|
||||||
|
|
||||||
|
cache = {os.path.abspath(path): _ClosedKG()}
|
||||||
|
monkeypatch.setattr(mcp_server, "_kg_by_path", cache)
|
||||||
|
|
||||||
|
# Second _get_kg() call (after the cache eviction) constructs a new
|
||||||
|
# KG. Patch the constructor so we don't open a real sqlite file.
|
||||||
|
monkeypatch.setattr(mcp_server, "KnowledgeGraph", lambda **_: _FreshKG())
|
||||||
|
|
||||||
|
result = mcp_server._call_kg(lambda kg: kg.query_entity("Alice"))
|
||||||
|
assert result == [{"entity": "Alice"}]
|
||||||
|
# The closed instance must be evicted; the fresh one must be cached.
|
||||||
|
assert isinstance(cache[os.path.abspath(path)], _FreshKG)
|
||||||
|
|
||||||
|
def test_call_kg_does_not_retry_on_other_errors(self, monkeypatch):
|
||||||
|
"""Non-ProgrammingError exceptions must propagate without retry —
|
||||||
|
we don't want the retry guard masking real bugs."""
|
||||||
|
from mempalace import mcp_server
|
||||||
|
|
||||||
|
path = "/fake/palace/knowledge_graph.sqlite3"
|
||||||
|
monkeypatch.setattr(mcp_server, "_resolve_kg_path", lambda: path)
|
||||||
|
|
||||||
|
calls = {"count": 0}
|
||||||
|
|
||||||
|
class _FailingKG:
|
||||||
|
def query_entity(self, entity, **kwargs):
|
||||||
|
calls["count"] += 1
|
||||||
|
raise ValueError("bad input")
|
||||||
|
|
||||||
|
monkeypatch.setattr(mcp_server, "_kg_by_path", {os.path.abspath(path): _FailingKG()})
|
||||||
|
monkeypatch.setattr(mcp_server, "KnowledgeGraph", lambda **_: _FailingKG())
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="bad input"):
|
||||||
|
mcp_server._call_kg(lambda kg: kg.query_entity("Alice"))
|
||||||
|
assert calls["count"] == 1, "non-ProgrammingError must not trigger retry"
|
||||||
|
|
||||||
|
def test_call_kg_gives_up_after_one_retry(self, monkeypatch):
|
||||||
|
"""If the second attempt also hits a closed DB, give up rather than
|
||||||
|
loop forever — a sustained close-stream is a different bug."""
|
||||||
|
import sqlite3 as _sqlite3
|
||||||
|
|
||||||
|
from mempalace import mcp_server
|
||||||
|
|
||||||
|
path = "/fake/palace/knowledge_graph.sqlite3"
|
||||||
|
monkeypatch.setattr(mcp_server, "_resolve_kg_path", lambda: path)
|
||||||
|
|
||||||
|
calls = {"count": 0}
|
||||||
|
|
||||||
|
class _AlwaysClosedKG:
|
||||||
|
def query_entity(self, entity, **kwargs):
|
||||||
|
calls["count"] += 1
|
||||||
|
raise _sqlite3.ProgrammingError("closed again")
|
||||||
|
|
||||||
|
cache = {}
|
||||||
|
monkeypatch.setattr(mcp_server, "_kg_by_path", cache)
|
||||||
|
monkeypatch.setattr(mcp_server, "KnowledgeGraph", lambda **_: _AlwaysClosedKG())
|
||||||
|
|
||||||
|
with pytest.raises(_sqlite3.ProgrammingError):
|
||||||
|
mcp_server._call_kg(lambda kg: kg.query_entity("Alice"))
|
||||||
|
assert calls["count"] == 2, "expected exactly one retry beyond the initial attempt"
|
||||||
|
|||||||
Reference in New Issue
Block a user