diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index 2650e30..beb870d 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -217,9 +217,16 @@ def _get_collection(create=False): try: client = _get_client() if create: + # hnsw:num_threads=1 disables ChromaDB's multi-threaded ParallelFor + # HNSW insert path, which has a race in repairConnectionsForUpdate / + # addPoint (see issues #974, #965). The setting is only honored at + # collection creation time — pre-existing palaces created before + # this fix keep the unsafe default; users must `mempalace nuke` + + # re-mine to get the protection on legacy palaces. _collection_cache = ChromaCollection( client.get_or_create_collection( - _config.collection_name, metadata={"hnsw:space": "cosine"} + _config.collection_name, + metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1}, ) ) _metadata_cache = None diff --git a/mempalace/miner.py b/mempalace/miner.py index 2fde777..a4219ae 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -25,8 +25,8 @@ from .palace import ( file_already_mined, get_closets_collection, get_collection, - mine_global_lock, mine_lock, + mine_palace_lock, purge_file_closets, upsert_closet_lines, ) @@ -1008,7 +1008,7 @@ def mine( ) try: - with mine_global_lock(): + with mine_palace_lock(palace_path): return _mine_impl( project_dir, palace_path, @@ -1021,7 +1021,8 @@ def mine( ) except MineAlreadyRunning: print( - "mempalace: another `mine` is already running — exiting cleanly.", + f"mempalace: another `mine` is already running against " + f"{palace_path} — exiting cleanly.", file=sys.stderr, ) return diff --git a/mempalace/palace.py b/mempalace/palace.py index 76a037e..917c76d 100644 --- a/mempalace/palace.py +++ b/mempalace/palace.py @@ -311,27 +311,33 @@ def mine_lock(source_file: str): class MineAlreadyRunning(RuntimeError): - """Raised when another `mempalace mine` process already holds the global lock.""" + """Raised when another `mempalace mine` already 
holds the per-palace lock.""" @contextlib.contextmanager -def mine_global_lock(): - """Process-wide non-blocking lock around the full `mine` pipeline. +def mine_palace_lock(palace_path: str): + """Per-palace non-blocking lock around the full `mine` pipeline. The per-file `mine_lock` only protects delete+insert interleave for a single source; it does not prevent N copies of `mempalace mine ` from being spawned concurrently by hooks. When that happens, each copy - drives ChromaDB HNSW inserts in parallel, which (combined with - chromadb's multi-threaded ParallelFor) can corrupt the HNSW graph and - produce sparse link_lists.bin blowups. + drives ChromaDB HNSW inserts in parallel against the same palace, + which (combined with chromadb's multi-threaded ParallelFor) can + corrupt the HNSW graph and produce sparse link_lists.bin blowups. - This lock is non-blocking: if another `mine` is already running, we + The lock file is keyed by sha256(palace_path) so mines against + *different* palaces can still run in parallel — we only serialize + writes into the same palace, which is the correctness boundary. + + Non-blocking: if another `mine` is already writing to this palace, raise MineAlreadyRunning so the caller can exit cleanly instead of - piling up waiting workers. + piling up as a waiting worker. 
""" lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks") os.makedirs(lock_dir, exist_ok=True) - lock_path = os.path.join(lock_dir, "mine_global.lock") + resolved = os.path.abspath(os.path.expanduser(palace_path)) + palace_key = hashlib.sha256(resolved.encode()).hexdigest()[:16] + lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock") lf = open(lock_path, "w") acquired = False @@ -344,7 +350,7 @@ def mine_global_lock(): acquired = True except OSError as exc: raise MineAlreadyRunning( - "another `mempalace mine` is already running" + f"another `mempalace mine` is already running against {resolved}" ) from exc else: import fcntl @@ -354,7 +360,7 @@ def mine_global_lock(): acquired = True except BlockingIOError as exc: raise MineAlreadyRunning( - "another `mempalace mine` is already running" + f"another `mempalace mine` is already running against {resolved}" ) from exc yield finally: @@ -373,6 +379,12 @@ def mine_global_lock(): lf.close() +# Import-compatible alias for the previous global lock name. Unlike the old +# zero-argument `mine_global_lock()`, it now requires `palace_path`, so +# existing call sites must pass one (or switch to `mine_palace_lock`). +mine_global_lock = mine_palace_lock + + def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool: """Check if a file has already been filed in the palace. 
diff --git a/tests/test_hooks_cli.py b/tests/test_hooks_cli.py index c9a0022..a406164 100644 --- a/tests/test_hooks_cli.py +++ b/tests/test_hooks_cli.py @@ -9,6 +9,7 @@ from unittest.mock import MagicMock, patch import pytest from mempalace.hooks_cli import ( + MAX_PRECOMPACT_BLOCK_ATTEMPTS, SAVE_INTERVAL, _count_human_messages, _extract_recent_messages, @@ -59,6 +60,85 @@ def test_sanitize_empty_returns_unknown(): assert _sanitize_session_id("!!!") == "unknown" +# --- hook_precompact attempt cap (regression for #955 deadlock fix) --- + + +def _call_precompact(session_id: str) -> dict: + """Invoke hook_precompact with a deterministic session_id, capture stdout. + + Returns the parsed JSON decision emitted by the hook. + """ + stdout = io.StringIO() + with contextlib.redirect_stdout(stdout): + hook_precompact({"session_id": session_id}, "claude-code") + raw = stdout.getvalue().strip() + return json.loads(raw) if raw else {} + + +def test_precompact_first_two_attempts_block(tmp_path, monkeypatch): + """First MAX_PRECOMPACT_BLOCK_ATTEMPTS calls must block with a reason.""" + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.delenv("MEMPAL_DIR", raising=False) + + import mempalace.hooks_cli as hooks_cli + monkeypatch.setattr( + hooks_cli, "STATE_DIR", tmp_path / "hook_state", raising=False + ) + + sid = "test-session-block" + for i in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS): + decision = _call_precompact(sid) + assert decision.get("decision") == "block", ( + f"attempt {i + 1}/{MAX_PRECOMPACT_BLOCK_ATTEMPTS}: expected block, " + f"got {decision}" + ) + assert decision.get("reason") == PRECOMPACT_BLOCK_REASON + + +def test_precompact_passes_through_after_cap(tmp_path, monkeypatch): + """After the cap is reached, the hook must stop blocking (fix for #955).""" + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.delenv("MEMPAL_DIR", raising=False) + + import mempalace.hooks_cli as hooks_cli + monkeypatch.setattr( + hooks_cli, "STATE_DIR", tmp_path / "hook_state", 
raising=False + ) + + sid = "test-session-passthrough" + for _ in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS): + _call_precompact(sid) # exhaust the budget + + # Next call must pass through (empty JSON decision) + decision = _call_precompact(sid) + assert decision == {}, ( + f"after {MAX_PRECOMPACT_BLOCK_ATTEMPTS} attempts, hook must pass " + f"through to avoid deadlock; got {decision}" + ) + + +def test_precompact_counter_is_per_session(tmp_path, monkeypatch): + """A fresh session_id must get a fresh attempt budget.""" + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.delenv("MEMPAL_DIR", raising=False) + + import mempalace.hooks_cli as hooks_cli + monkeypatch.setattr( + hooks_cli, "STATE_DIR", tmp_path / "hook_state", raising=False + ) + + sid_a = "session-a" + sid_b = "session-b" + + # Exhaust session A + for _ in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS): + _call_precompact(sid_a) + assert _call_precompact(sid_a) == {} # A is done blocking + + # Session B must still block on its first call — isolation between sessions + assert _call_precompact(sid_b).get("decision") == "block" + + # --- _count_human_messages --- diff --git a/tests/test_palace_locks.py b/tests/test_palace_locks.py index 5b94a5d..a7596b9 100644 --- a/tests/test_palace_locks.py +++ b/tests/test_palace_locks.py @@ -1,59 +1,146 @@ -"""Tests for mine_global_lock: non-blocking cross-process lock semantics.""" -import threading +"""Tests for mine_palace_lock — the per-palace non-blocking mine guard. + +Covers the fix for the runaway mine fan-out described alongside issues +#974 and #965: if N copies of `mempalace mine` are spawned concurrently +against the same palace, they must collapse to a single runner rather +than queue as waiters that will drive parallel HNSW inserts. Mines +against *different* palaces must still be free to run in parallel. 
+""" + +from __future__ import annotations + +import multiprocessing +import os +import time import pytest -from mempalace.palace import MineAlreadyRunning, mine_global_lock +from mempalace.palace import ( + MineAlreadyRunning, + mine_global_lock, + mine_palace_lock, +) -def test_mine_global_lock_acquired(tmp_path, monkeypatch): - """Lock is acquired and released without error.""" +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _hold_lock(palace_path: str, ready_flag: str, release_flag: str) -> int: + """Acquire mine_palace_lock, signal readiness, wait for release flag. + + Returns 0 if we acquired the lock, 1 if MineAlreadyRunning was raised. + Runs in a child process for true cross-process locking semantics. + """ + try: + with mine_palace_lock(palace_path): + # Tell the parent we hold the lock + open(ready_flag, "w").close() + # Wait until parent tells us to release + for _ in range(500): + if os.path.exists(release_flag): + return 0 + time.sleep(0.01) + return 0 + except MineAlreadyRunning: + return 1 + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_single_acquire_succeeds(tmp_path, monkeypatch): monkeypatch.setenv("HOME", str(tmp_path)) - with mine_global_lock(): + with mine_palace_lock(str(tmp_path / "palace")): pass # should not raise -def test_mine_global_lock_second_acquire_raises(tmp_path, monkeypatch): - """Concurrent second acquire raises MineAlreadyRunning.""" +def test_lock_reusable_after_release(tmp_path, monkeypatch): monkeypatch.setenv("HOME", str(tmp_path)) - results: list[str] = [] - - with mine_global_lock(): - # While this lock is held, spawn a thread that tries to acquire. 
- def try_acquire(): - try: - with mine_global_lock(): - results.append("acquired") - except MineAlreadyRunning: - results.append("blocked") - - t = threading.Thread(target=try_acquire) - t.start() - t.join(timeout=5) - - assert results == ["blocked"] - - -def test_mine_global_lock_reusable_after_release(tmp_path, monkeypatch): - """Lock can be re-acquired after the context manager exits.""" - monkeypatch.setenv("HOME", str(tmp_path)) - - with mine_global_lock(): - pass # first acquire + release - - # Second acquire must succeed; MineAlreadyRunning would propagate as failure. - with mine_global_lock(): + palace = str(tmp_path / "palace") + with mine_palace_lock(palace): + pass + # Re-acquire must succeed now that the previous holder released + with mine_palace_lock(palace): pass -def test_mine_global_lock_exception_still_releases(tmp_path, monkeypatch): - """Lock is released even when the body raises.""" +def test_same_palace_serializes_across_processes(tmp_path, monkeypatch): + """Two processes contending for the same palace: second must be rejected.""" monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") - with pytest.raises(ValueError): - with mine_global_lock(): - raise ValueError("boom") + ctx = multiprocessing.get_context("fork") + holder = ctx.Process(target=_hold_lock, args=(palace, ready, release)) + holder.start() + try: + # Wait for the holder to acquire + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock in time" - # Must be acquirable again after the exception. 
- with mine_global_lock(): - pass + # From the parent, we must not be able to acquire the same palace lock + with pytest.raises(MineAlreadyRunning): + with mine_palace_lock(palace): + pytest.fail("second acquire of same palace should have raised") + finally: + open(release, "w").close() + holder.join(timeout=5) + assert holder.exitcode == 0 + + +def test_different_palaces_dont_conflict(tmp_path, monkeypatch): + """Mines against different palaces must NOT block each other.""" + monkeypatch.setenv("HOME", str(tmp_path)) + palace_a = str(tmp_path / "palace_a") + palace_b = str(tmp_path / "palace_b") + ready = str(tmp_path / "ready_a") + release = str(tmp_path / "release_a") + + ctx = multiprocessing.get_context("fork") + holder = ctx.Process(target=_hold_lock, args=(palace_a, ready, release)) + holder.start() + try: + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock in time" + + # Different palace — must succeed even while palace_a is held + with mine_palace_lock(palace_b): + pass # no exception expected + finally: + open(release, "w").close() + holder.join(timeout=5) + + +def test_palace_path_is_normalized(tmp_path, monkeypatch): + """Relative and absolute forms of the same path must use the same lock.""" + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.chdir(tmp_path) + os.makedirs(tmp_path / "palace", exist_ok=True) + absolute = str(tmp_path / "palace") + relative = "palace" + + # Hold the lock with the absolute form; attempting to re-acquire with + # the relative form (which resolves to the same absolute path) must fail. 
+ with mine_palace_lock(absolute): + with pytest.raises(MineAlreadyRunning): + with mine_palace_lock(relative): + pytest.fail("normalized path collision should have raised") + + +def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch): + """Old callers of `mine_global_lock` should still work.""" + monkeypatch.setenv("HOME", str(tmp_path)) + assert mine_global_lock is mine_palace_lock + with mine_global_lock(str(tmp_path / "palace")): + pass # the alias accepts the same palace_path argument