From 7e18a707963a0c96653bfdb04fd56eff56964549 Mon Sep 17 00:00:00 2001 From: Felipe Truman Date: Fri, 17 Apr 2026 17:14:22 -0300 Subject: [PATCH 1/6] fix: resolve hooks_cli.py merge conflict + add mine_global_lock tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Resolve UU conflict in hooks_cli.py: take develop/HEAD approach (mine synchronously via _mine_sync, then pass through unconditionally). _mine_sync already catches subprocess.TimeoutExpired — fixes Copilot #1. - Add tests/test_palace_locks.py: 4 tests covering mine_global_lock non-blocking semantics (acquire, second-acquire raises MineAlreadyRunning, reusable after release, release on exception) — fixes Copilot #4. Co-Authored-By: Claude Sonnet 4.6 --- mempalace/backends/chroma.py | 8 +++-- mempalace/hooks_cli.py | 3 ++ mempalace/miner.py | 44 +++++++++++++++++++++++++ mempalace/palace.py | 63 ++++++++++++++++++++++++++++++++++++ tests/test_palace_locks.py | 59 +++++++++++++++++++++++++++++++++ 5 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 tests/test_palace_locks.py diff --git a/mempalace/backends/chroma.py b/mempalace/backends/chroma.py index 14ae9cd..c36e2c7 100644 --- a/mempalace/backends/chroma.py +++ b/mempalace/backends/chroma.py @@ -566,7 +566,9 @@ class ChromaBackend(BaseBackend): if create: collection = client.get_or_create_collection( - collection_name, metadata={"hnsw:space": hnsw_space}, **ef_kwargs + collection_name, + metadata={"hnsw:space": hnsw_space, "hnsw:num_threads": 1}, + **ef_kwargs, ) else: collection = client.get_collection(collection_name, **ef_kwargs) @@ -613,7 +615,9 @@ class ChromaBackend(BaseBackend): ef = self._resolve_embedding_function() ef_kwargs = {"embedding_function": ef} if ef is not None else {} collection = self._client(palace_path).create_collection( - collection_name, metadata={"hnsw:space": hnsw_space}, **ef_kwargs + collection_name, + metadata={"hnsw:space": hnsw_space, "hnsw:num_threads": 1}, 
+ **ef_kwargs, ) return ChromaCollection(collection) diff --git a/mempalace/hooks_cli.py b/mempalace/hooks_cli.py index 01eca3f..bdcc97d 100644 --- a/mempalace/hooks_cli.py +++ b/mempalace/hooks_cli.py @@ -643,6 +643,9 @@ def hook_session_start(data: dict, harness: str): _output({}) +MAX_PRECOMPACT_BLOCK_ATTEMPTS = 2 + + def hook_precompact(data: dict, harness: str): """Precompact hook: mine transcript synchronously, then allow compaction.""" parsed = _parse_harness_input(data, harness) diff --git a/mempalace/miner.py b/mempalace/miner.py index b593797..2fde777 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -20,10 +20,12 @@ from typing import Optional from .palace import ( NORMALIZE_VERSION, SKIP_DIRS, + MineAlreadyRunning, build_closet_lines, file_already_mined, get_closets_collection, get_collection, + mine_global_lock, mine_lock, purge_file_closets, upsert_closet_lines, @@ -993,6 +995,48 @@ def mine( ``mine`` walks the tree itself just like before. """ + if dry_run: + return _mine_impl( + project_dir, + palace_path, + wing_override=wing_override, + agent=agent, + limit=limit, + dry_run=dry_run, + respect_gitignore=respect_gitignore, + include_ignored=include_ignored, + ) + + try: + with mine_global_lock(): + return _mine_impl( + project_dir, + palace_path, + wing_override=wing_override, + agent=agent, + limit=limit, + dry_run=dry_run, + respect_gitignore=respect_gitignore, + include_ignored=include_ignored, + ) + except MineAlreadyRunning: + print( + "mempalace: another `mine` is already running — exiting cleanly.", + file=sys.stderr, + ) + return + + +def _mine_impl( + project_dir: str, + palace_path: str, + wing_override: str = None, + agent: str = "mempalace", + limit: int = 0, + dry_run: bool = False, + respect_gitignore: bool = True, + include_ignored: list = None, +): project_path = Path(project_dir).expanduser().resolve() config = load_config(project_dir) diff --git a/mempalace/palace.py b/mempalace/palace.py index a2a4a8e..76a037e 100644 --- 
a/mempalace/palace.py +++ b/mempalace/palace.py @@ -310,6 +310,69 @@ def mine_lock(source_file: str): lf.close() +class MineAlreadyRunning(RuntimeError): + """Raised when another `mempalace mine` process already holds the global lock.""" + + +@contextlib.contextmanager +def mine_global_lock(): + """Process-wide non-blocking lock around the full `mine` pipeline. + + The per-file `mine_lock` only protects delete+insert interleave for a + single source; it does not prevent N copies of `mempalace mine ` + from being spawned concurrently by hooks. When that happens, each copy + drives ChromaDB HNSW inserts in parallel, which (combined with + chromadb's multi-threaded ParallelFor) can corrupt the HNSW graph and + produce sparse link_lists.bin blowups. + + This lock is non-blocking: if another `mine` is already running, we + raise MineAlreadyRunning so the caller can exit cleanly instead of + piling up waiting workers. + """ + lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks") + os.makedirs(lock_dir, exist_ok=True) + lock_path = os.path.join(lock_dir, "mine_global.lock") + + lf = open(lock_path, "w") + acquired = False + try: + if os.name == "nt": + import msvcrt + + try: + msvcrt.locking(lf.fileno(), msvcrt.LK_NBLCK, 1) + acquired = True + except OSError as exc: + raise MineAlreadyRunning( + "another `mempalace mine` is already running" + ) from exc + else: + import fcntl + + try: + fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB) + acquired = True + except BlockingIOError as exc: + raise MineAlreadyRunning( + "another `mempalace mine` is already running" + ) from exc + yield + finally: + if acquired: + try: + if os.name == "nt": + import msvcrt + + msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1) + else: + import fcntl + + fcntl.flock(lf, fcntl.LOCK_UN) + except Exception: + pass + lf.close() + + def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool: """Check if a file has already been filed in the palace. 
diff --git a/tests/test_palace_locks.py b/tests/test_palace_locks.py new file mode 100644 index 0000000..5b94a5d --- /dev/null +++ b/tests/test_palace_locks.py @@ -0,0 +1,59 @@ +"""Tests for mine_global_lock: non-blocking cross-process lock semantics.""" +import threading + +import pytest + +from mempalace.palace import MineAlreadyRunning, mine_global_lock + + +def test_mine_global_lock_acquired(tmp_path, monkeypatch): + """Lock is acquired and released without error.""" + monkeypatch.setenv("HOME", str(tmp_path)) + with mine_global_lock(): + pass # should not raise + + +def test_mine_global_lock_second_acquire_raises(tmp_path, monkeypatch): + """Concurrent second acquire raises MineAlreadyRunning.""" + monkeypatch.setenv("HOME", str(tmp_path)) + results: list[str] = [] + + with mine_global_lock(): + # While this lock is held, spawn a thread that tries to acquire. + def try_acquire(): + try: + with mine_global_lock(): + results.append("acquired") + except MineAlreadyRunning: + results.append("blocked") + + t = threading.Thread(target=try_acquire) + t.start() + t.join(timeout=5) + + assert results == ["blocked"] + + +def test_mine_global_lock_reusable_after_release(tmp_path, monkeypatch): + """Lock can be re-acquired after the context manager exits.""" + monkeypatch.setenv("HOME", str(tmp_path)) + + with mine_global_lock(): + pass # first acquire + release + + # Second acquire must succeed; MineAlreadyRunning would propagate as failure. + with mine_global_lock(): + pass + + +def test_mine_global_lock_exception_still_releases(tmp_path, monkeypatch): + """Lock is released even when the body raises.""" + monkeypatch.setenv("HOME", str(tmp_path)) + + with pytest.raises(ValueError): + with mine_global_lock(): + raise ValueError("boom") + + # Must be acquirable again after the exception. 
+ with mine_global_lock(): + pass From 99b820cb42d9488e6c08dea2186d87f6b3621ef5 Mon Sep 17 00:00:00 2001 From: Felipe Truman Date: Fri, 17 Apr 2026 16:03:07 -0300 Subject: [PATCH 2/6] =?UTF-8?q?fix:=20address=20PR=20review=20=E2=80=94=20?= =?UTF-8?q?per-palace=20lock,=20MCP=20server=20path,=20hook=20timeout,=20t?= =?UTF-8?q?ests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the six Copilot review comments on the initial commit. 1) #6 (critical) — mcp_server.py `_get_collection` bypassed ChromaBackend The MCP server creates its palace collection directly via `chromadb.PersistentClient.get_or_create_collection` in `_get_collection`, not through `ChromaBackend.get_collection`. That path was missing the `hnsw:num_threads=1` metadata, so the primary crash surface for #974 and #965 was untouched by the original patch. Fixed by passing `hnsw:num_threads=1` at the mcp_server create site too. Documented in a code comment that the setting is only honored at creation time — existing palaces created before this fix still need a `mempalace nuke` + re-mine to gain the protection. 2) #3 — mine_global_lock over-serialized mines across unrelated palaces Replaced the single global lock file `mine_global.lock` with a per-palace lock keyed by `sha256(os.path.abspath(palace_path))` (`mine_palace_.lock`). Mines against the same palace still collapse to a single runner (the correctness boundary), but mines against *different* palaces are now free to run in parallel. `mine_global_lock` is kept as a backward-compatible alias for `mine_palace_lock` so any external callers that imported the previous name keep working. 3) #1 — hook_precompact swallowed OSError but not subprocess.TimeoutExpired `subprocess.run(..., timeout=60)` raises `TimeoutExpired` on slow palaces. The previous `except OSError` clause didn't catch it, so the hook could raise and fail to emit any JSON decision — leaving the harness without a block/passthrough signal. 
Fixed by catching `(OSError, subprocess.TimeoutExpired)` together and always falling through to the block decision so the hook reliably emits a response. 4) #2 + #4 — tests - tests/test_hooks_cli.py: added `test_precompact_first_two_attempts_block`, `test_precompact_passes_through_after_cap`, and `test_precompact_counter_is_per_session` to lock in the #955 deadlock fix. - tests/test_palace_locks.py (new): covers `mine_palace_lock` single-acquire, reuse-after-release, cross-process serialization on the same palace, non-interference across different palaces, path normalization, and the `mine_global_lock` back-compat alias. 5) #5 — known limitation, documented but not auto-fixed Copilot suggested detecting collections missing `hnsw:num_threads=1` and calling `collection.modify(metadata=...)` to retrofit existing palaces. Verified against chromadb 1.5.7: `modify(metadata=...)` replaces metadata rather than merging, and re-passing `hnsw:space="cosine"` then raises `ValueError: Changing the distance function of a collection once it is created is not supported currently.` The HNSW runtime configuration (`configuration_json`) also does not expose `num_threads` in chromadb 1.5.x, so the flag appears to be read only at creation time. Rather than paper over the limitation with a best-effort `modify` that silently drops `hnsw:space`, documented in the mcp_server comment that pre-existing palaces need a `mempalace nuke` + re-mine to gain the protection. Fresh palaces are always protected. Testing - pytest tests/test_palace_locks.py tests/test_hooks_cli.py tests/test_backends.py tests/test_cli.py → **98 passed, 0 failed**. - Runtime validation with two concurrent `mempalace mine` calls: - Different palaces → both complete in parallel ✓ - Same palace → one completes, the other exits with "another `mine` is already running against <palace_path> — exiting cleanly."
✓ --- mempalace/mcp_server.py | 9 +- mempalace/miner.py | 7 +- mempalace/palace.py | 34 +++++--- tests/test_hooks_cli.py | 80 +++++++++++++++++ tests/test_palace_locks.py | 173 ++++++++++++++++++++++++++++--------- 5 files changed, 245 insertions(+), 58 deletions(-) diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index 2650e30..beb870d 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -217,9 +217,16 @@ def _get_collection(create=False): try: client = _get_client() if create: + # hnsw:num_threads=1 disables ChromaDB's multi-threaded ParallelFor + # HNSW insert path, which has a race in repairConnectionsForUpdate / + # addPoint (see issues #974, #965). The setting is only honored at + # collection creation time — pre-existing palaces created before + # this fix keep the unsafe default; users must `mempalace nuke` + + # re-mine to get the protection on legacy palaces. _collection_cache = ChromaCollection( client.get_or_create_collection( - _config.collection_name, metadata={"hnsw:space": "cosine"} + _config.collection_name, + metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1}, ) ) _metadata_cache = None diff --git a/mempalace/miner.py b/mempalace/miner.py index 2fde777..a4219ae 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -25,8 +25,8 @@ from .palace import ( file_already_mined, get_closets_collection, get_collection, - mine_global_lock, mine_lock, + mine_palace_lock, purge_file_closets, upsert_closet_lines, ) @@ -1008,7 +1008,7 @@ def mine( ) try: - with mine_global_lock(): + with mine_palace_lock(palace_path): return _mine_impl( project_dir, palace_path, @@ -1021,7 +1021,8 @@ def mine( ) except MineAlreadyRunning: print( - "mempalace: another `mine` is already running — exiting cleanly.", + f"mempalace: another `mine` is already running against " + f"{palace_path} — exiting cleanly.", file=sys.stderr, ) return diff --git a/mempalace/palace.py b/mempalace/palace.py index 76a037e..917c76d 100644 --- 
a/mempalace/palace.py +++ b/mempalace/palace.py @@ -311,27 +311,33 @@ def mine_lock(source_file: str): class MineAlreadyRunning(RuntimeError): - """Raised when another `mempalace mine` process already holds the global lock.""" + """Raised when another `mempalace mine` already holds the per-palace lock.""" @contextlib.contextmanager -def mine_global_lock(): - """Process-wide non-blocking lock around the full `mine` pipeline. +def mine_palace_lock(palace_path: str): + """Per-palace non-blocking lock around the full `mine` pipeline. The per-file `mine_lock` only protects delete+insert interleave for a single source; it does not prevent N copies of `mempalace mine ` from being spawned concurrently by hooks. When that happens, each copy - drives ChromaDB HNSW inserts in parallel, which (combined with - chromadb's multi-threaded ParallelFor) can corrupt the HNSW graph and - produce sparse link_lists.bin blowups. + drives ChromaDB HNSW inserts in parallel against the same palace, + which (combined with chromadb's multi-threaded ParallelFor) can + corrupt the HNSW graph and produce sparse link_lists.bin blowups. - This lock is non-blocking: if another `mine` is already running, we + The lock file is keyed by sha256(palace_path) so mines against + *different* palaces can still run in parallel — we only serialize + writes into the same palace, which is the correctness boundary. + + Non-blocking: if another `mine` is already writing to this palace, raise MineAlreadyRunning so the caller can exit cleanly instead of - piling up waiting workers. + piling up as a waiting worker. 
""" lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks") os.makedirs(lock_dir, exist_ok=True) - lock_path = os.path.join(lock_dir, "mine_global.lock") + resolved = os.path.abspath(os.path.expanduser(palace_path)) + palace_key = hashlib.sha256(resolved.encode()).hexdigest()[:16] + lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock") lf = open(lock_path, "w") acquired = False @@ -344,7 +350,7 @@ def mine_global_lock(): acquired = True except OSError as exc: raise MineAlreadyRunning( - "another `mempalace mine` is already running" + f"another `mempalace mine` is already running against {resolved}" ) from exc else: import fcntl @@ -354,7 +360,7 @@ def mine_global_lock(): acquired = True except BlockingIOError as exc: raise MineAlreadyRunning( - "another `mempalace mine` is already running" + f"another `mempalace mine` is already running against {resolved}" ) from exc yield finally: @@ -373,6 +379,12 @@ def mine_global_lock(): lf.close() +# Backward-compatible alias (previous patch iteration used a single global +# lock). Kept so third-party callers that imported it continue to work; new +# code should use `mine_palace_lock(palace_path)` for per-palace scoping. +mine_global_lock = mine_palace_lock + + def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool: """Check if a file has already been filed in the palace. 
diff --git a/tests/test_hooks_cli.py b/tests/test_hooks_cli.py index c9a0022..a406164 100644 --- a/tests/test_hooks_cli.py +++ b/tests/test_hooks_cli.py @@ -9,6 +9,7 @@ from unittest.mock import MagicMock, patch import pytest from mempalace.hooks_cli import ( + MAX_PRECOMPACT_BLOCK_ATTEMPTS, SAVE_INTERVAL, _count_human_messages, _extract_recent_messages, @@ -59,6 +60,85 @@ def test_sanitize_empty_returns_unknown(): assert _sanitize_session_id("!!!") == "unknown" +# --- hook_precompact attempt cap (regression for #955 deadlock fix) --- + + +def _call_precompact(session_id: str) -> dict: + """Invoke hook_precompact with a deterministic session_id, capture stdout. + + Returns the parsed JSON decision emitted by the hook. + """ + stdout = io.StringIO() + with contextlib.redirect_stdout(stdout): + hook_precompact({"session_id": session_id}, "claude-code") + raw = stdout.getvalue().strip() + return json.loads(raw) if raw else {} + + +def test_precompact_first_two_attempts_block(tmp_path, monkeypatch): + """First MAX_PRECOMPACT_BLOCK_ATTEMPTS calls must block with a reason.""" + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.delenv("MEMPAL_DIR", raising=False) + + import mempalace.hooks_cli as hooks_cli + monkeypatch.setattr( + hooks_cli, "STATE_DIR", tmp_path / "hook_state", raising=False + ) + + sid = "test-session-block" + for i in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS): + decision = _call_precompact(sid) + assert decision.get("decision") == "block", ( + f"attempt {i + 1}/{MAX_PRECOMPACT_BLOCK_ATTEMPTS}: expected block, " + f"got {decision}" + ) + assert decision.get("reason") == PRECOMPACT_BLOCK_REASON + + +def test_precompact_passes_through_after_cap(tmp_path, monkeypatch): + """After the cap is reached, the hook must stop blocking (fix for #955).""" + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.delenv("MEMPAL_DIR", raising=False) + + import mempalace.hooks_cli as hooks_cli + monkeypatch.setattr( + hooks_cli, "STATE_DIR", tmp_path / "hook_state", 
raising=False + ) + + sid = "test-session-passthrough" + for _ in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS): + _call_precompact(sid) # exhaust the budget + + # Next call must pass through (empty JSON decision) + decision = _call_precompact(sid) + assert decision == {}, ( + f"after {MAX_PRECOMPACT_BLOCK_ATTEMPTS} attempts, hook must pass " + f"through to avoid deadlock; got {decision}" + ) + + +def test_precompact_counter_is_per_session(tmp_path, monkeypatch): + """A fresh session_id must get a fresh attempt budget.""" + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.delenv("MEMPAL_DIR", raising=False) + + import mempalace.hooks_cli as hooks_cli + monkeypatch.setattr( + hooks_cli, "STATE_DIR", tmp_path / "hook_state", raising=False + ) + + sid_a = "session-a" + sid_b = "session-b" + + # Exhaust session A + for _ in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS): + _call_precompact(sid_a) + assert _call_precompact(sid_a) == {} # A is done blocking + + # Session B must still block on its first call — isolation between sessions + assert _call_precompact(sid_b).get("decision") == "block" + + # --- _count_human_messages --- diff --git a/tests/test_palace_locks.py b/tests/test_palace_locks.py index 5b94a5d..a7596b9 100644 --- a/tests/test_palace_locks.py +++ b/tests/test_palace_locks.py @@ -1,59 +1,146 @@ -"""Tests for mine_global_lock: non-blocking cross-process lock semantics.""" -import threading +"""Tests for mine_palace_lock — the per-palace non-blocking mine guard. + +Covers the fix for the runaway mine fan-out described alongside issues +#974 and #965: if N copies of `mempalace mine` are spawned concurrently +against the same palace, they must collapse to a single runner rather +than queue as waiters that will drive parallel HNSW inserts. Mines +against *different* palaces must still be free to run in parallel. 
+""" + +from __future__ import annotations + +import multiprocessing +import os +import time import pytest -from mempalace.palace import MineAlreadyRunning, mine_global_lock +from mempalace.palace import ( + MineAlreadyRunning, + mine_global_lock, + mine_palace_lock, +) -def test_mine_global_lock_acquired(tmp_path, monkeypatch): - """Lock is acquired and released without error.""" +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _hold_lock(palace_path: str, ready_flag: str, release_flag: str) -> int: + """Acquire mine_palace_lock, signal readiness, wait for release flag. + + Returns 0 if we acquired the lock, 1 if MineAlreadyRunning was raised. + Runs in a child process for true cross-process locking semantics. + """ + try: + with mine_palace_lock(palace_path): + # Tell the parent we hold the lock + open(ready_flag, "w").close() + # Wait until parent tells us to release + for _ in range(500): + if os.path.exists(release_flag): + return 0 + time.sleep(0.01) + return 0 + except MineAlreadyRunning: + return 1 + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_single_acquire_succeeds(tmp_path, monkeypatch): monkeypatch.setenv("HOME", str(tmp_path)) - with mine_global_lock(): + with mine_palace_lock(str(tmp_path / "palace")): pass # should not raise -def test_mine_global_lock_second_acquire_raises(tmp_path, monkeypatch): - """Concurrent second acquire raises MineAlreadyRunning.""" +def test_lock_reusable_after_release(tmp_path, monkeypatch): monkeypatch.setenv("HOME", str(tmp_path)) - results: list[str] = [] - - with mine_global_lock(): - # While this lock is held, spawn a thread that tries to acquire. 
- def try_acquire(): - try: - with mine_global_lock(): - results.append("acquired") - except MineAlreadyRunning: - results.append("blocked") - - t = threading.Thread(target=try_acquire) - t.start() - t.join(timeout=5) - - assert results == ["blocked"] - - -def test_mine_global_lock_reusable_after_release(tmp_path, monkeypatch): - """Lock can be re-acquired after the context manager exits.""" - monkeypatch.setenv("HOME", str(tmp_path)) - - with mine_global_lock(): - pass # first acquire + release - - # Second acquire must succeed; MineAlreadyRunning would propagate as failure. - with mine_global_lock(): + palace = str(tmp_path / "palace") + with mine_palace_lock(palace): + pass + # Re-acquire must succeed now that the previous holder released + with mine_palace_lock(palace): pass -def test_mine_global_lock_exception_still_releases(tmp_path, monkeypatch): - """Lock is released even when the body raises.""" +def test_same_palace_serializes_across_processes(tmp_path, monkeypatch): + """Two processes contending for the same palace: second must be rejected.""" monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") - with pytest.raises(ValueError): - with mine_global_lock(): - raise ValueError("boom") + ctx = multiprocessing.get_context("fork") + holder = ctx.Process(target=_hold_lock, args=(palace, ready, release)) + holder.start() + try: + # Wait for the holder to acquire + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock in time" - # Must be acquirable again after the exception. 
- with mine_global_lock(): - pass + # From the parent, we must not be able to acquire the same palace lock + with pytest.raises(MineAlreadyRunning): + with mine_palace_lock(palace): + pytest.fail("second acquire of same palace should have raised") + finally: + open(release, "w").close() + holder.join(timeout=5) + assert holder.exitcode == 0 + + +def test_different_palaces_dont_conflict(tmp_path, monkeypatch): + """Mines against different palaces must NOT block each other.""" + monkeypatch.setenv("HOME", str(tmp_path)) + palace_a = str(tmp_path / "palace_a") + palace_b = str(tmp_path / "palace_b") + ready = str(tmp_path / "ready_a") + release = str(tmp_path / "release_a") + + ctx = multiprocessing.get_context("fork") + holder = ctx.Process(target=_hold_lock, args=(palace_a, ready, release)) + holder.start() + try: + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock in time" + + # Different palace — must succeed even while palace_a is held + with mine_palace_lock(palace_b): + pass # no exception expected + finally: + open(release, "w").close() + holder.join(timeout=5) + + +def test_palace_path_is_normalized(tmp_path, monkeypatch): + """Relative and absolute forms of the same path must use the same lock.""" + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.chdir(tmp_path) + os.makedirs(tmp_path / "palace", exist_ok=True) + absolute = str(tmp_path / "palace") + relative = "palace" + + # Hold the lock with the absolute form; attempting to re-acquire with + # the relative form (which resolves to the same absolute path) must fail. 
+ with mine_palace_lock(absolute): + with pytest.raises(MineAlreadyRunning): + with mine_palace_lock(relative): + pytest.fail("normalized path collision should have raised") + + +def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch): + """Old callers of `mine_global_lock` should still work.""" + monkeypatch.setenv("HOME", str(tmp_path)) + assert mine_global_lock is mine_palace_lock + with mine_global_lock(str(tmp_path / "palace")): + pass # the alias accepts the same palace_path argument From 1998aede66053a9d54421f0c5c533ee89a617e89 Mon Sep 17 00:00:00 2001 From: Felipe Truman Date: Fri, 17 Apr 2026 16:26:11 -0300 Subject: [PATCH 3/6] fix: Windows CI compat for palace lock tests and path normalization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the two actionable Copilot comments from the 2nd review pass. tests/test_palace_locks.py (#7, #8) multiprocessing.get_context("fork") is unavailable on Windows, so the cross-process tests would crash the Windows CI runner. Added `_get_mp_context()` that picks "spawn" on Windows and "fork" elsewhere. Spawn re-imports the module in the child; it inherits os.environ (including the monkeypatched HOME), which is all these tests need. mempalace/palace.py (#10) The per-palace lock key was computed from os.path.abspath(palace_path). On Windows the filesystem is case-insensitive, so `C:\\Palace` and `c:\\palace` would hash to different keys and two concurrent mines could touch the same on-disk palace. Switched to `os.path.normcase(os.path.realpath(...))` so: * realpath resolves symlinks and `..` segments * normcase folds case on Windows (no-op on POSIX) Testing pytest tests/test_palace_locks.py tests/test_hooks_cli.py tests/test_backends.py tests/test_cli.py → 98 passed, 0 failed. 
--- mempalace/palace.py | 11 +++++++++-- tests/test_palace_locks.py | 16 ++++++++++++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/mempalace/palace.py b/mempalace/palace.py index 917c76d..07efb6a 100644 --- a/mempalace/palace.py +++ b/mempalace/palace.py @@ -329,14 +329,21 @@ def mine_palace_lock(palace_path: str): *different* palaces can still run in parallel — we only serialize writes into the same palace, which is the correctness boundary. + The key is derived from a fully normalized form of the path: + `realpath` resolves symlinks and `..` segments, and `normcase` folds + case on Windows (which has a case-insensitive filesystem). Without + normcase, `C:\\Palace` and `c:\\palace` would hash to different keys + on Windows and let two concurrent mines touch the same on-disk palace. + Non-blocking: if another `mine` is already writing to this palace, raise MineAlreadyRunning so the caller can exit cleanly instead of piling up as a waiting worker. """ lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks") os.makedirs(lock_dir, exist_ok=True) - resolved = os.path.abspath(os.path.expanduser(palace_path)) - palace_key = hashlib.sha256(resolved.encode()).hexdigest()[:16] + resolved = os.path.realpath(os.path.expanduser(palace_path)) + lock_key_source = os.path.normcase(resolved) + palace_key = hashlib.sha256(lock_key_source.encode()).hexdigest()[:16] lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock") lf = open(lock_path, "w") diff --git a/tests/test_palace_locks.py b/tests/test_palace_locks.py index a7596b9..601c894 100644 --- a/tests/test_palace_locks.py +++ b/tests/test_palace_locks.py @@ -22,6 +22,18 @@ from mempalace.palace import ( ) +def _get_mp_context(): + """Pick a start method that works on every CI runner. + + `fork` is cheaper (no re-import) but is unavailable on Windows, so we fall + back to `spawn` there. 
`spawn` inherits ``os.environ`` (including the + monkeypatched ``HOME``) and re-imports the ``mempalace`` package in the + child, which is sufficient for the lock-file semantics exercised here. + """ + start_method = "spawn" if os.name == "nt" else "fork" + return multiprocessing.get_context(start_method) + + # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -75,7 +87,7 @@ def test_same_palace_serializes_across_processes(tmp_path, monkeypatch): ready = str(tmp_path / "ready") release = str(tmp_path / "release") - ctx = multiprocessing.get_context("fork") + ctx = _get_mp_context() holder = ctx.Process(target=_hold_lock, args=(palace, ready, release)) holder.start() try: @@ -104,7 +116,7 @@ def test_different_palaces_dont_conflict(tmp_path, monkeypatch): ready = str(tmp_path / "ready_a") release = str(tmp_path / "release_a") - ctx = multiprocessing.get_context("fork") + ctx = _get_mp_context() holder = ctx.Process(target=_hold_lock, args=(palace_a, ready, release)) holder.start() try: From 40d7958ca1a0a67eceb580e3f0010b9eb64414a6 Mon Sep 17 00:00:00 2001 From: Felipe Truman Date: Fri, 17 Apr 2026 17:17:36 -0300 Subject: [PATCH 4/6] test: remove attempt-cap tests obsoleted by develop's pass-through approach PR #863 on develop eliminated precompact blocking entirely. After rebasing, the attempt-cap tests (test_precompact_first_two_attempts_block, test_precompact_passes_through_after_cap, test_precompact_counter_is_per_session) would always fail because hook_precompact now mines synchronously and passes through unconditionally. Remove them to keep the suite green. 
Co-Authored-By: Claude Sonnet 4.6 --- tests/test_hooks_cli.py | 80 ----------------------------------------- 1 file changed, 80 deletions(-) diff --git a/tests/test_hooks_cli.py b/tests/test_hooks_cli.py index a406164..c9a0022 100644 --- a/tests/test_hooks_cli.py +++ b/tests/test_hooks_cli.py @@ -9,7 +9,6 @@ from unittest.mock import MagicMock, patch import pytest from mempalace.hooks_cli import ( - MAX_PRECOMPACT_BLOCK_ATTEMPTS, SAVE_INTERVAL, _count_human_messages, _extract_recent_messages, @@ -60,85 +59,6 @@ def test_sanitize_empty_returns_unknown(): assert _sanitize_session_id("!!!") == "unknown" -# --- hook_precompact attempt cap (regression for #955 deadlock fix) --- - - -def _call_precompact(session_id: str) -> dict: - """Invoke hook_precompact with a deterministic session_id, capture stdout. - - Returns the parsed JSON decision emitted by the hook. - """ - stdout = io.StringIO() - with contextlib.redirect_stdout(stdout): - hook_precompact({"session_id": session_id}, "claude-code") - raw = stdout.getvalue().strip() - return json.loads(raw) if raw else {} - - -def test_precompact_first_two_attempts_block(tmp_path, monkeypatch): - """First MAX_PRECOMPACT_BLOCK_ATTEMPTS calls must block with a reason.""" - monkeypatch.setenv("HOME", str(tmp_path)) - monkeypatch.delenv("MEMPAL_DIR", raising=False) - - import mempalace.hooks_cli as hooks_cli - monkeypatch.setattr( - hooks_cli, "STATE_DIR", tmp_path / "hook_state", raising=False - ) - - sid = "test-session-block" - for i in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS): - decision = _call_precompact(sid) - assert decision.get("decision") == "block", ( - f"attempt {i + 1}/{MAX_PRECOMPACT_BLOCK_ATTEMPTS}: expected block, " - f"got {decision}" - ) - assert decision.get("reason") == PRECOMPACT_BLOCK_REASON - - -def test_precompact_passes_through_after_cap(tmp_path, monkeypatch): - """After the cap is reached, the hook must stop blocking (fix for #955).""" - monkeypatch.setenv("HOME", str(tmp_path)) - 
monkeypatch.delenv("MEMPAL_DIR", raising=False) - - import mempalace.hooks_cli as hooks_cli - monkeypatch.setattr( - hooks_cli, "STATE_DIR", tmp_path / "hook_state", raising=False - ) - - sid = "test-session-passthrough" - for _ in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS): - _call_precompact(sid) # exhaust the budget - - # Next call must pass through (empty JSON decision) - decision = _call_precompact(sid) - assert decision == {}, ( - f"after {MAX_PRECOMPACT_BLOCK_ATTEMPTS} attempts, hook must pass " - f"through to avoid deadlock; got {decision}" - ) - - -def test_precompact_counter_is_per_session(tmp_path, monkeypatch): - """A fresh session_id must get a fresh attempt budget.""" - monkeypatch.setenv("HOME", str(tmp_path)) - monkeypatch.delenv("MEMPAL_DIR", raising=False) - - import mempalace.hooks_cli as hooks_cli - monkeypatch.setattr( - hooks_cli, "STATE_DIR", tmp_path / "hook_state", raising=False - ) - - sid_a = "session-a" - sid_b = "session-b" - - # Exhaust session A - for _ in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS): - _call_precompact(sid_a) - assert _call_precompact(sid_a) == {} # A is done blocking - - # Session B must still block on its first call — isolation between sessions - assert _call_precompact(sid_b).get("decision") == "block" - - # --- _count_human_messages --- From 8df944a54d9ff452f8926d5e1b647f20daa348fe Mon Sep 17 00:00:00 2001 From: Felipe Truman Date: Fri, 17 Apr 2026 20:04:37 -0300 Subject: [PATCH 5/6] fix: best-effort HNSW thread-pin retrofit + drop dead attempt-cap constant MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses remaining PR #976 review items after rebase on develop. `get_collection(create=False)` previously returned existing collections without re-applying `hnsw:num_threads=1`, so palaces created before the fix kept the unsafe parallel-insert path. 
Add `_pin_hnsw_threads()` helper that calls `collection.modify(configuration=UpdateCollectionConfiguration( hnsw=UpdateHNSWConfiguration(num_threads=1)))` best-effort on every `get_collection` call (including the MCP server's `_get_collection`). In chromadb 1.5.x the runtime config does not persist to disk across `PersistentClient` reopens, so the retrofit is re-applied each process start rather than being a one-shot migration. Fresh palaces keep the metadata-based pin as primary defense; legacy palaces now also get per-session protection without requiring `mempalace nuke` + re-mine. After the rebase on develop, `hook_precompact` delegates to `_mine_sync` and no longer emits `decision: block`, so the attempt-cap constant was orphaned. Grep confirms 0 usages in the repo — remove it. - `_pin_hnsw_threads` retrofits legacy collection (num_threads None -> 1) - `_pin_hnsw_threads` swallows all errors (never raises) - `ChromaBackend.get_collection(create=False)` applies retrofit on legacy palace - 62 tests pass (10 backends + 6 palace locks + 46 hooks_cli) --- mempalace/backends/chroma.py | 32 +++++++++++++++++++++ mempalace/hooks_cli.py | 3 -- mempalace/mcp_server.py | 25 +++++++++-------- tests/test_backends.py | 54 ++++++++++++++++++++++++++++++++++++ 4 files changed, 100 insertions(+), 14 deletions(-) diff --git a/mempalace/backends/chroma.py b/mempalace/backends/chroma.py index c36e2c7..f701b17 100644 --- a/mempalace/backends/chroma.py +++ b/mempalace/backends/chroma.py @@ -130,6 +130,37 @@ def quarantine_stale_hnsw(palace_path: str, stale_seconds: float = 3600.0) -> li return moved +def _pin_hnsw_threads(collection) -> None: + """Best-effort retrofit: pin ``hnsw:num_threads=1`` on an existing collection. + + Fresh collections set this via ``metadata=`` at creation. Legacy palaces + built before that change keep the default (parallel insert) and can hit + the HNSW race described in #974/#965. 
ChromaDB's + ``collection.modify(configuration=...)`` lets us re-apply ``num_threads=1`` + in memory at load time so every new process is protected. + + Note: in chromadb 1.5.x the modified ``configuration_json["hnsw"]`` does + not persist to disk across ``PersistentClient`` reopens, so this must + run on every ``get_collection`` call, not just once. + """ + try: + from chromadb.api.collection_configuration import ( + UpdateCollectionConfiguration, + UpdateHNSWConfiguration, + ) + except ImportError: + logger.debug("_pin_hnsw_threads skipped: chromadb too old", exc_info=True) + return + try: + collection.modify( + configuration=UpdateCollectionConfiguration( + hnsw=UpdateHNSWConfiguration(num_threads=1) + ) + ) + except Exception: + logger.debug("_pin_hnsw_threads modify failed", exc_info=True) + + def _fix_blob_seq_ids(palace_path: str) -> None: """Fix ChromaDB 0.6.x -> 1.5.x migration bug: BLOB seq_ids -> INTEGER. @@ -572,6 +603,7 @@ class ChromaBackend(BaseBackend): ) else: collection = client.get_collection(collection_name, **ef_kwargs) + _pin_hnsw_threads(collection) return ChromaCollection(collection) def close_palace(self, palace) -> None: diff --git a/mempalace/hooks_cli.py b/mempalace/hooks_cli.py index bdcc97d..01eca3f 100644 --- a/mempalace/hooks_cli.py +++ b/mempalace/hooks_cli.py @@ -643,9 +643,6 @@ def hook_session_start(data: dict, harness: str): _output({}) -MAX_PRECOMPACT_BLOCK_ATTEMPTS = 2 - - def hook_precompact(data: dict, harness: str): """Precompact hook: mine transcript synchronously, then allow compaction.""" parsed = _parse_harness_input(data, harness) diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index beb870d..485bbe5 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -57,7 +57,7 @@ from .config import ( # noqa: E402 sanitize_content, ) from .version import __version__ # noqa: E402 -from .backends.chroma import ChromaBackend, ChromaCollection # noqa: E402 +from .backends.chroma import ChromaBackend, 
ChromaCollection, _pin_hnsw_threads # noqa: E402 from .query_sanitizer import sanitize_query # noqa: E402 from .searcher import search_memories # noqa: E402 from .palace_graph import ( # noqa: E402 @@ -219,20 +219,23 @@ def _get_collection(create=False): if create: # hnsw:num_threads=1 disables ChromaDB's multi-threaded ParallelFor # HNSW insert path, which has a race in repairConnectionsForUpdate / - # addPoint (see issues #974, #965). The setting is only honored at - # collection creation time — pre-existing palaces created before - # this fix keep the unsafe default; users must `mempalace nuke` + - # re-mine to get the protection on legacy palaces. - _collection_cache = ChromaCollection( - client.get_or_create_collection( - _config.collection_name, - metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1}, - ) + # addPoint (see issues #974, #965). Set via metadata on fresh + # collections and re-applied via _pin_hnsw_threads() for legacy + # palaces whose collections were created before this fix (the + # runtime config does not persist cross-process in chromadb 1.5.x, + # so the retrofit runs every time _get_collection opens a cache). 
+ raw = client.get_or_create_collection( + _config.collection_name, + metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1}, ) + _pin_hnsw_threads(raw) + _collection_cache = ChromaCollection(raw) _metadata_cache = None _metadata_cache_time = 0 elif _collection_cache is None: - _collection_cache = ChromaCollection(client.get_collection(_config.collection_name)) + raw = client.get_collection(_config.collection_name) + _pin_hnsw_threads(raw) + _collection_cache = ChromaCollection(raw) _metadata_cache = None _metadata_cache_time = 0 return _collection_cache diff --git a/tests/test_backends.py b/tests/test_backends.py index b3f009a..780671b 100644 --- a/tests/test_backends.py +++ b/tests/test_backends.py @@ -16,6 +16,7 @@ from mempalace.backends.chroma import ( ChromaBackend, ChromaCollection, _fix_blob_seq_ids, + _pin_hnsw_threads, quarantine_stale_hnsw, ) @@ -443,3 +444,56 @@ def test_quarantine_stale_hnsw_skips_already_quarantined(tmp_path): moved = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0) assert moved == [] assert drift.exists() + + +# ── _pin_hnsw_threads ───────────────────────────────────────────────────── + + +def test_pin_hnsw_threads_retrofits_legacy_collection(tmp_path): + """Legacy collections (created without num_threads) get the retrofit applied.""" + palace_path = tmp_path / "legacy-palace" + palace_path.mkdir() + + client = chromadb.PersistentClient(path=str(palace_path)) + col = client.create_collection( + "mempalace_drawers", + metadata={"hnsw:space": "cosine"}, # no num_threads — legacy + ) + assert col.configuration_json.get("hnsw", {}).get("num_threads") is None + + _pin_hnsw_threads(col) + + assert col.configuration_json["hnsw"]["num_threads"] == 1 + + +def test_pin_hnsw_threads_swallows_all_errors(): + """Retrofit never raises even when collection.modify explodes.""" + + class _ExplodingCollection: + def modify(self, *args, **kwargs): + raise RuntimeError("boom") + + _pin_hnsw_threads(_ExplodingCollection()) # must not raise + + 
+def test_get_collection_applies_retrofit_on_existing_palace(tmp_path): + """ChromaBackend.get_collection(create=False) applies the retrofit.""" + palace_path = tmp_path / "palace" + palace_path.mkdir() + + # Simulate a legacy palace: create collection without num_threads + bootstrap_client = chromadb.PersistentClient(path=str(palace_path)) + bootstrap_client.create_collection( + "mempalace_drawers", metadata={"hnsw:space": "cosine"} + ) + del bootstrap_client # drop reference so a fresh client reopens cleanly + + wrapper = ChromaBackend().get_collection( + str(palace_path), + collection_name="mempalace_drawers", + create=False, + ) + + assert ( + wrapper._collection.configuration_json["hnsw"]["num_threads"] == 1 + ) From 7773432bcac3c1472e17d3c62d0e94d344fd6617 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Sat, 25 Apr 2026 04:39:31 -0300 Subject: [PATCH 6/6] chore(rebase): reconcile with develop and apply ruff format After rebasing onto current develop: - chroma.py: keep develop's quarantine_stale_hnsw + UnsupportedFilterError validation alongside this PR's _pin_hnsw_threads retrofit. - tests/test_backends.py: combine quarantine_stale_hnsw and _pin_hnsw_threads test sections; ruff format. - miner.py: propagate the new `files=` kwarg (added on develop in #1183 for the init -> mine flow) through _mine_impl so the caller can pass a pre-scanned file list under the global lock. 
--- mempalace/backends/chroma.py | 4 +--- mempalace/miner.py | 3 +++ tests/test_backends.py | 8 ++------ 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/mempalace/backends/chroma.py b/mempalace/backends/chroma.py index f701b17..c8d2f46 100644 --- a/mempalace/backends/chroma.py +++ b/mempalace/backends/chroma.py @@ -153,9 +153,7 @@ def _pin_hnsw_threads(collection) -> None: return try: collection.modify( - configuration=UpdateCollectionConfiguration( - hnsw=UpdateHNSWConfiguration(num_threads=1) - ) + configuration=UpdateCollectionConfiguration(hnsw=UpdateHNSWConfiguration(num_threads=1)) ) except Exception: logger.debug("_pin_hnsw_threads modify failed", exc_info=True) diff --git a/mempalace/miner.py b/mempalace/miner.py index a4219ae..2d610ea 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -1005,6 +1005,7 @@ def mine( dry_run=dry_run, respect_gitignore=respect_gitignore, include_ignored=include_ignored, + files=files, ) try: @@ -1018,6 +1019,7 @@ def mine( dry_run=dry_run, respect_gitignore=respect_gitignore, include_ignored=include_ignored, + files=files, ) except MineAlreadyRunning: print( @@ -1037,6 +1039,7 @@ def _mine_impl( dry_run: bool = False, respect_gitignore: bool = True, include_ignored: list = None, + files: list = None, ): project_path = Path(project_dir).expanduser().resolve() config = load_config(project_dir) diff --git a/tests/test_backends.py b/tests/test_backends.py index 780671b..e47eb6f 100644 --- a/tests/test_backends.py +++ b/tests/test_backends.py @@ -483,9 +483,7 @@ def test_get_collection_applies_retrofit_on_existing_palace(tmp_path): # Simulate a legacy palace: create collection without num_threads bootstrap_client = chromadb.PersistentClient(path=str(palace_path)) - bootstrap_client.create_collection( - "mempalace_drawers", metadata={"hnsw:space": "cosine"} - ) + bootstrap_client.create_collection("mempalace_drawers", metadata={"hnsw:space": "cosine"}) del bootstrap_client # drop reference so a fresh client 
reopens cleanly wrapper = ChromaBackend().get_collection( @@ -494,6 +492,4 @@ def test_get_collection_applies_retrofit_on_existing_palace(tmp_path): create=False, ) - assert ( - wrapper._collection.configuration_json["hnsw"]["num_threads"] == 1 - ) + assert wrapper._collection.configuration_json["hnsw"]["num_threads"] == 1