diff --git a/mempalace/backends/chroma.py b/mempalace/backends/chroma.py index c36e2c7..f701b17 100644 --- a/mempalace/backends/chroma.py +++ b/mempalace/backends/chroma.py @@ -130,6 +130,37 @@ def quarantine_stale_hnsw(palace_path: str, stale_seconds: float = 3600.0) -> li return moved +def _pin_hnsw_threads(collection) -> None: + """Best-effort retrofit: pin ``hnsw:num_threads=1`` on an existing collection. + + Fresh collections set this via ``metadata=`` at creation. Legacy palaces + built before that change keep the default (parallel insert) and can hit + the HNSW race described in #974/#965. ChromaDB's + ``collection.modify(configuration=...)`` lets us re-apply ``num_threads=1`` + in memory at load time so every new process is protected. + + Note: in chromadb 1.5.x the modified ``configuration_json["hnsw"]`` does + not persist to disk across ``PersistentClient`` reopens, so this must + run on every ``get_collection`` call, not just once. + """ + try: + from chromadb.api.collection_configuration import ( + UpdateCollectionConfiguration, + UpdateHNSWConfiguration, + ) + except ImportError: + logger.debug("_pin_hnsw_threads skipped: chromadb too old", exc_info=True) + return + try: + collection.modify( + configuration=UpdateCollectionConfiguration( + hnsw=UpdateHNSWConfiguration(num_threads=1) + ) + ) + except Exception: + logger.debug("_pin_hnsw_threads modify failed", exc_info=True) + + def _fix_blob_seq_ids(palace_path: str) -> None: """Fix ChromaDB 0.6.x -> 1.5.x migration bug: BLOB seq_ids -> INTEGER. @@ -572,6 +603,7 @@ class ChromaBackend(BaseBackend): ) else: collection = client.get_collection(collection_name, **ef_kwargs) + _pin_hnsw_threads(collection) return ChromaCollection(collection) def close_palace(self, palace) -> None: diff --git a/mempalace/hooks_cli.py b/mempalace/hooks_cli.py index bdcc97d..01eca3f 100644 --- a/mempalace/hooks_cli.py +++ b/mempalace/hooks_cli.py @@ -643,9 +643,6 @@ def hook_session_start(data: dict, harness: str): _output({}) -MAX_PRECOMPACT_BLOCK_ATTEMPTS = 2 - - def hook_precompact(data: dict, harness: str): """Precompact hook: mine transcript synchronously, then allow compaction.""" parsed = _parse_harness_input(data, harness) diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index beb870d..485bbe5 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -57,7 +57,7 @@ from .config import ( # noqa: E402 sanitize_content, ) from .version import __version__ # noqa: E402 -from .backends.chroma import ChromaBackend, ChromaCollection # noqa: E402 +from .backends.chroma import ChromaBackend, ChromaCollection, _pin_hnsw_threads # noqa: E402 from .query_sanitizer import sanitize_query # noqa: E402 from .searcher import search_memories # noqa: E402 from .palace_graph import ( # noqa: E402 @@ -219,20 +219,23 @@ def _get_collection(create=False): if create: # hnsw:num_threads=1 disables ChromaDB's multi-threaded ParallelFor # HNSW insert path, which has a race in repairConnectionsForUpdate / - # addPoint (see issues #974, #965). The setting is only honored at - # collection creation time — pre-existing palaces created before - # this fix keep the unsafe default; users must `mempalace nuke` + - # re-mine to get the protection on legacy palaces. - _collection_cache = ChromaCollection( - client.get_or_create_collection( - _config.collection_name, - metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1}, - ) + # addPoint (see issues #974, #965). Set via metadata on fresh + # collections and re-applied via _pin_hnsw_threads() for legacy + # palaces whose collections were created before this fix (the + # runtime config does not persist cross-process in chromadb 1.5.x, + # so the retrofit runs every time _get_collection opens a cache). + raw = client.get_or_create_collection( + _config.collection_name, + metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1}, ) + _pin_hnsw_threads(raw) + _collection_cache = ChromaCollection(raw) _metadata_cache = None _metadata_cache_time = 0 elif _collection_cache is None: - _collection_cache = ChromaCollection(client.get_collection(_config.collection_name)) + raw = client.get_collection(_config.collection_name) + _pin_hnsw_threads(raw) + _collection_cache = ChromaCollection(raw) _metadata_cache = None _metadata_cache_time = 0 return _collection_cache diff --git a/tests/test_backends.py b/tests/test_backends.py index b3f009a..780671b 100644 --- a/tests/test_backends.py +++ b/tests/test_backends.py @@ -16,6 +16,7 @@ from mempalace.backends.chroma import ( ChromaBackend, ChromaCollection, _fix_blob_seq_ids, + _pin_hnsw_threads, quarantine_stale_hnsw, ) @@ -443,3 +444,56 @@ def test_quarantine_stale_hnsw_skips_already_quarantined(tmp_path): moved = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0) assert moved == [] assert drift.exists() + + +# ── _pin_hnsw_threads ───────────────────────────────────────────────────── + + +def test_pin_hnsw_threads_retrofits_legacy_collection(tmp_path): + """Legacy collections (created without num_threads) get the retrofit applied.""" + palace_path = tmp_path / "legacy-palace" + palace_path.mkdir() + + client = chromadb.PersistentClient(path=str(palace_path)) + col = client.create_collection( + "mempalace_drawers", + metadata={"hnsw:space": "cosine"}, # no num_threads — legacy + ) + assert col.configuration_json.get("hnsw", {}).get("num_threads") is None + + _pin_hnsw_threads(col) + + assert col.configuration_json["hnsw"]["num_threads"] == 1 + + +def test_pin_hnsw_threads_swallows_all_errors(): + """Retrofit never raises even when collection.modify explodes.""" + + class _ExplodingCollection: + def modify(self, *args, **kwargs): + raise RuntimeError("boom") + + _pin_hnsw_threads(_ExplodingCollection()) # must not raise + + +def test_get_collection_applies_retrofit_on_existing_palace(tmp_path): + """ChromaBackend.get_collection(create=False) applies the retrofit.""" + palace_path = tmp_path / "palace" + palace_path.mkdir() + + # Simulate a legacy palace: create collection without num_threads + bootstrap_client = chromadb.PersistentClient(path=str(palace_path)) + bootstrap_client.create_collection( + "mempalace_drawers", metadata={"hnsw:space": "cosine"} + ) + del bootstrap_client # drop reference so a fresh client reopens cleanly + + wrapper = ChromaBackend().get_collection( + str(palace_path), + collection_name="mempalace_drawers", + create=False, + ) + + assert ( + wrapper._collection.configuration_json["hnsw"]["num_threads"] == 1 + )