fix: resolve hooks_cli.py merge conflict + add mine_global_lock tests
- Resolve UU conflict in hooks_cli.py: take develop/HEAD approach (mine synchronously via _mine_sync, then pass through unconditionally). _mine_sync already catches subprocess.TimeoutExpired — fixes Copilot #1.
- Add tests/test_palace_locks.py: 4 tests covering mine_global_lock non-blocking semantics (acquire, second-acquire raises MineAlreadyRunning, reusable after release, release on exception) — fixes Copilot #4.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
committed by
Igor Lins e Silva
parent
91a60263e3
commit
7e18a70796
@@ -566,7 +566,9 @@ class ChromaBackend(BaseBackend):
|
||||
|
||||
if create:
|
||||
collection = client.get_or_create_collection(
|
||||
collection_name, metadata={"hnsw:space": hnsw_space}, **ef_kwargs
|
||||
collection_name,
|
||||
metadata={"hnsw:space": hnsw_space, "hnsw:num_threads": 1},
|
||||
**ef_kwargs,
|
||||
)
|
||||
else:
|
||||
collection = client.get_collection(collection_name, **ef_kwargs)
|
||||
@@ -613,7 +615,9 @@ class ChromaBackend(BaseBackend):
|
||||
ef = self._resolve_embedding_function()
|
||||
ef_kwargs = {"embedding_function": ef} if ef is not None else {}
|
||||
collection = self._client(palace_path).create_collection(
|
||||
collection_name, metadata={"hnsw:space": hnsw_space}, **ef_kwargs
|
||||
collection_name,
|
||||
metadata={"hnsw:space": hnsw_space, "hnsw:num_threads": 1},
|
||||
**ef_kwargs,
|
||||
)
|
||||
return ChromaCollection(collection)
|
||||
|
||||
|
||||
@@ -643,6 +643,9 @@ def hook_session_start(data: dict, harness: str):
|
||||
_output({})
|
||||
|
||||
|
||||
# NOTE(review): presumably caps how many times the precompact hook may block
# compaction. No use of this constant is visible in this hunk, and
# hook_precompact's docstring says it always allows compaction — confirm the
# constant is still referenced after the merge-conflict resolution.
MAX_PRECOMPACT_BLOCK_ATTEMPTS = 2
|
||||
|
||||
|
||||
def hook_precompact(data: dict, harness: str):
|
||||
"""Precompact hook: mine transcript synchronously, then allow compaction."""
|
||||
parsed = _parse_harness_input(data, harness)
|
||||
|
||||
@@ -20,10 +20,12 @@ from typing import Optional
|
||||
from .palace import (
|
||||
NORMALIZE_VERSION,
|
||||
SKIP_DIRS,
|
||||
MineAlreadyRunning,
|
||||
build_closet_lines,
|
||||
file_already_mined,
|
||||
get_closets_collection,
|
||||
get_collection,
|
||||
mine_global_lock,
|
||||
mine_lock,
|
||||
purge_file_closets,
|
||||
upsert_closet_lines,
|
||||
@@ -993,6 +995,48 @@ def mine(
|
||||
``mine`` walks the tree itself just like before.
|
||||
"""
|
||||
|
||||
if dry_run:
|
||||
return _mine_impl(
|
||||
project_dir,
|
||||
palace_path,
|
||||
wing_override=wing_override,
|
||||
agent=agent,
|
||||
limit=limit,
|
||||
dry_run=dry_run,
|
||||
respect_gitignore=respect_gitignore,
|
||||
include_ignored=include_ignored,
|
||||
)
|
||||
|
||||
try:
|
||||
with mine_global_lock():
|
||||
return _mine_impl(
|
||||
project_dir,
|
||||
palace_path,
|
||||
wing_override=wing_override,
|
||||
agent=agent,
|
||||
limit=limit,
|
||||
dry_run=dry_run,
|
||||
respect_gitignore=respect_gitignore,
|
||||
include_ignored=include_ignored,
|
||||
)
|
||||
except MineAlreadyRunning:
|
||||
print(
|
||||
"mempalace: another `mine` is already running — exiting cleanly.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
def _mine_impl(
|
||||
project_dir: str,
|
||||
palace_path: str,
|
||||
wing_override: str = None,
|
||||
agent: str = "mempalace",
|
||||
limit: int = 0,
|
||||
dry_run: bool = False,
|
||||
respect_gitignore: bool = True,
|
||||
include_ignored: list = None,
|
||||
):
|
||||
project_path = Path(project_dir).expanduser().resolve()
|
||||
config = load_config(project_dir)
|
||||
|
||||
|
||||
@@ -310,6 +310,69 @@ def mine_lock(source_file: str):
|
||||
lf.close()
|
||||
|
||||
|
||||
class MineAlreadyRunning(RuntimeError):
    """Signal that the global mine lock is held by another `mempalace mine`.

    Raised by ``mine_global_lock`` when the non-blocking acquire fails, so
    callers can back off instead of queueing behind the running miner.
    """
|
||||
|
||||
|
||||
@contextlib.contextmanager
def mine_global_lock():
    """Hold a cross-process, non-blocking lock for one whole `mine` run.

    The per-file `mine_lock` only guards the delete+insert sequence for a
    single source file. It does nothing to stop hooks from launching several
    `mempalace mine <dir>` processes at once, and concurrent miners driving
    ChromaDB HNSW inserts in parallel (together with chromadb's
    multi-threaded ParallelFor) can corrupt the HNSW graph and blow up
    link_lists.bin.

    The lock is an OS-level advisory lock on
    ``~/.mempalace/locks/mine_global.lock`` (``msvcrt.locking`` on Windows,
    ``fcntl.flock`` elsewhere). It never waits: if the lock is already held,
    the attempt fails immediately.

    Raises:
        MineAlreadyRunning: another `mine` currently holds the lock, so the
            caller should exit cleanly rather than queue up behind it.
    """
    locks_root = os.path.join(os.path.expanduser("~"), ".mempalace", "locks")
    os.makedirs(locks_root, exist_ok=True)

    # Closing the handle (on every exit path) is delegated to the `with`.
    with open(os.path.join(locks_root, "mine_global.lock"), "w") as handle:
        # Pick the platform primitive once; both closures act on `handle`.
        if os.name == "nt":
            import msvcrt

            busy_error = OSError

            def acquire():
                msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)

            def release():
                msvcrt.locking(handle.fileno(), msvcrt.LK_UNLCK, 1)

        else:
            import fcntl

            busy_error = BlockingIOError

            def acquire():
                fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)

            def release():
                fcntl.flock(handle, fcntl.LOCK_UN)

        try:
            acquire()
        except busy_error as exc:
            raise MineAlreadyRunning(
                "another `mempalace mine` is already running"
            ) from exc

        # From here on the lock is ours; always attempt to unlock on the way
        # out, even if the caller's body raised.
        try:
            yield
        finally:
            try:
                release()
            except Exception:
                # Best-effort unlock: closing the handle below drops the
                # lock anyway, so a failed explicit unlock is ignored.
                pass
|
||||
|
||||
|
||||
def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool:
|
||||
"""Check if a file has already been filed in the palace.
|
||||
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
"""Tests for mine_global_lock: non-blocking cross-process lock semantics."""
|
||||
import threading
|
||||
|
||||
import pytest
|
||||
|
||||
from mempalace.palace import MineAlreadyRunning, mine_global_lock
|
||||
|
||||
|
||||
def test_mine_global_lock_acquired(tmp_path, monkeypatch):
    """Lock is acquired and released without error, inside a sandboxed home."""
    # os.path.expanduser("~") reads HOME on POSIX but USERPROFILE on Windows
    # (Python 3.8+); set both so the lock file never lands in the real home.
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("USERPROFILE", str(tmp_path))

    with mine_global_lock():
        # Proves the home redirection took effect and the lock file exists
        # while the lock is held.
        assert (tmp_path / ".mempalace" / "locks" / "mine_global.lock").exists()
|
||||
|
||||
|
||||
def test_mine_global_lock_second_acquire_raises(tmp_path, monkeypatch):
    """A second acquire attempted while the lock is held raises MineAlreadyRunning."""
    # os.path.expanduser("~") reads HOME on POSIX but USERPROFILE on Windows
    # (Python 3.8+); set both so the lock file never lands in the real home.
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("USERPROFILE", str(tmp_path))
    results: list[str] = []

    def try_acquire():
        # Runs in a worker thread: opens its own handle on the lock file, so
        # the OS lock held by the main thread must reject it immediately.
        try:
            with mine_global_lock():
                results.append("acquired")
        except MineAlreadyRunning:
            results.append("blocked")

    with mine_global_lock():
        # daemon=True so a (buggy) blocking acquire cannot hang the test run.
        worker = threading.Thread(target=try_acquire, daemon=True)
        worker.start()
        worker.join(timeout=5)
        # A live worker means the "non-blocking" acquire actually blocked;
        # report that directly instead of as an empty-results mismatch.
        assert not worker.is_alive(), "second acquire blocked instead of raising"

    assert results == ["blocked"]
|
||||
|
||||
|
||||
def test_mine_global_lock_reusable_after_release(tmp_path, monkeypatch):
    """Lock can be re-acquired after the context manager exits."""
    # os.path.expanduser("~") reads HOME on POSIX but USERPROFILE on Windows
    # (Python 3.8+); set both so the lock file never lands in the real home.
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("USERPROFILE", str(tmp_path))

    with mine_global_lock():
        pass  # first acquire + release

    # Second acquire must succeed; MineAlreadyRunning would propagate as failure.
    with mine_global_lock():
        pass
|
||||
|
||||
|
||||
def test_mine_global_lock_exception_still_releases(tmp_path, monkeypatch):
    """Lock is released even when the body raises."""
    # os.path.expanduser("~") reads HOME on POSIX but USERPROFILE on Windows
    # (Python 3.8+); set both so the lock file never lands in the real home.
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("USERPROFILE", str(tmp_path))

    with pytest.raises(ValueError):
        with mine_global_lock():
            raise ValueError("boom")

    # Must be acquirable again after the exception.
    with mine_global_lock():
        pass
|
||||
Reference in New Issue
Block a user