From f30fdf2672f340c773d4f0554b0017abb48ed4b9 Mon Sep 17 00:00:00 2001 From: imtylervo Date: Mon, 27 Apr 2026 14:16:20 +1000 Subject: [PATCH 1/2] fix: serialize ChromaCollection writes through palace lock #976 protects `mempalace mine`, but MCP/direct backend writers still call ChromaCollection.add/upsert/update/delete without the palace lock. This moves the lock boundary to the Chroma backend seam so all Chroma writes share the same palace-level serialization, with a re-entrant guard for miner paths that already hold the lock. mine_palace_lock(palace_path) gains a per-thread re-entrant guard (threading.local + pid-tag against fork inheritance) so ChromaCollection write methods can take the lock without self-deadlocking when called from inside miner.mine()'s outer hold. ChromaCollection.__init__ accepts an optional palace_path; when set, add/upsert/update/delete wrap their underlying chromadb call with mine_palace_lock(palace_path). palace_path=None preserves the legacy no-lock behaviour for direct callers and tests. ChromaBackend's get_collection/create_collection pass palace_path through; mcp_server._get_collection forwards _config.palace_path so all MCP write tools inherit the wrapping. Tests: 5 new in tests/test_chroma_collection_lock.py covering opt-in, writer-blocks-during-mine, re-entrant-inside-mine, two-process serialization, and a source-level read-path-not-locked pin. Plus 1 new + 1 rewritten in tests/test_palace_locks.py for the re-entrant semantics. 52 passed in 1.01s including the existing test_backends.py regression suite. Refs #1161. --- mempalace/backends/chroma.py | 54 ++++- mempalace/mcp_server.py | 4 +- mempalace/palace.py | 59 ++++- tests/test_chroma_collection_lock.py | 327 +++++++++++++++++++++++++++ tests/test_palace_locks.py | 70 +++++- 5 files changed, 497 insertions(+), 17 deletions(-) create mode 100644 tests/test_chroma_collection_lock.py diff --git a/mempalace/backends/chroma.py b/mempalace/backends/chroma.py index ad7748f..d438236 100644 --- a/mempalace/backends/chroma.py +++ b/mempalace/backends/chroma.py @@ -1,5 +1,6 @@ """ChromaDB-backed MemPalace storage backend (RFC 001 reference implementation).""" +import contextlib import datetime as _dt import logging import os @@ -573,10 +574,43 @@ def _as_list(v: Any) -> list: class ChromaCollection(BaseCollection): - """Thin adapter translating ChromaDB dict returns into typed results.""" + """Thin adapter translating ChromaDB dict returns into typed results. - def __init__(self, collection): + When ``palace_path`` is set, all write methods (``add``, ``upsert``, + ``update``, ``delete``) acquire ``mine_palace_lock(palace_path)`` for the + duration of the underlying chromadb call. This serializes MCP and other + direct-backend writers against ``mempalace mine`` and against each other, + closing the race between concurrent writers that triggers ChromaDB's + multi-threaded HNSW corruption (#974/#965). + + The lock is the same primitive used by ``miner.mine()`` so re-entrant + acquisition from inside the mine pipeline (mine -> _mine_body -> + collection.upsert) is short-circuited by the per-thread guard inside + ``mine_palace_lock`` — no self-deadlock. + + ``palace_path=None`` disables the wrapping, preserving the legacy + no-lock behaviour for callers that construct a ``ChromaCollection`` + directly without going through ``ChromaBackend``. + """ + + def __init__(self, collection, palace_path: Optional[str] = None): self._collection = collection + self._palace_path = palace_path + + @contextlib.contextmanager + def _write_lock(self): + """Acquire ``mine_palace_lock`` for the configured palace, if any. + + No-op (yields immediately) when ``self._palace_path`` is None. + """ + if self._palace_path is None: + yield + return + # Late import — palace.py imports ChromaBackend from this module. + from ..palace import mine_palace_lock + + with mine_palace_lock(self._palace_path): + yield # ------------------------------------------------------------------ # Writes @@ -588,7 +622,8 @@ class ChromaCollection(BaseCollection): kwargs["metadatas"] = metadatas if embeddings is not None: kwargs["embeddings"] = embeddings - self._collection.add(**kwargs) + with self._write_lock(): + self._collection.add(**kwargs) def upsert(self, *, documents, ids, metadatas=None, embeddings=None): kwargs: dict[str, Any] = {"documents": documents, "ids": ids} @@ -596,7 +631,8 @@ class ChromaCollection(BaseCollection): kwargs["metadatas"] = metadatas if embeddings is not None: kwargs["embeddings"] = embeddings - self._collection.upsert(**kwargs) + with self._write_lock(): + self._collection.upsert(**kwargs) def update( self, @@ -615,7 +651,8 @@ class ChromaCollection(BaseCollection): kwargs["metadatas"] = metadatas if embeddings is not None: kwargs["embeddings"] = embeddings - self._collection.update(**kwargs) + with self._write_lock(): + self._collection.update(**kwargs) # ------------------------------------------------------------------ # Reads @@ -759,7 +796,8 @@ class ChromaCollection(BaseCollection): kwargs["ids"] = ids if where is not None: kwargs["where"] = where - self._collection.delete(**kwargs) + with self._write_lock(): + self._collection.delete(**kwargs) def count(self): return self._collection.count() @@ -998,7 +1036,7 @@ class ChromaBackend(BaseBackend): else: collection = client.get_collection(collection_name, **ef_kwargs) _pin_hnsw_threads(collection) - return ChromaCollection(collection) + return ChromaCollection(collection, palace_path=palace_path) def close_palace(self, palace) -> None: """Drop cached handles for ``palace``. Accepts ``PalaceRef`` or legacy path str.""" @@ -1045,7 +1083,7 @@ class ChromaBackend(BaseBackend): metadata={"hnsw:space": hnsw_space, "hnsw:num_threads": 1}, **ef_kwargs, ) - return ChromaCollection(collection) + return ChromaCollection(collection, palace_path=palace_path) def _normalize_get_collection_args(args, kwargs): diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index 9cc454e..7e22227 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -288,13 +288,13 @@ def _get_collection(create=False): metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1}, ) _pin_hnsw_threads(raw) - _collection_cache = ChromaCollection(raw) + _collection_cache = ChromaCollection(raw, palace_path=_config.palace_path) _metadata_cache = None _metadata_cache_time = 0 elif _collection_cache is None: raw = client.get_collection(_config.collection_name) _pin_hnsw_threads(raw) - _collection_cache = ChromaCollection(raw) + _collection_cache = ChromaCollection(raw, palace_path=_config.palace_path) _metadata_cache = None _metadata_cache_time = 0 return _collection_cache diff --git a/mempalace/palace.py b/mempalace/palace.py index 07efb6a..97f67ff 100644 --- a/mempalace/palace.py +++ b/mempalace/palace.py @@ -8,6 +8,7 @@ import contextlib import hashlib import os import re +import threading from .backends.chroma import ChromaBackend @@ -314,6 +315,47 @@ class MineAlreadyRunning(RuntimeError): """Raised when another `mempalace mine` already holds the per-palace lock.""" +# Per-thread record of palaces this thread already holds the lock for. Used by +# `mine_palace_lock` to short-circuit re-entrant acquisition from the same +# thread (e.g. miner.mine() acquires the outer lock then calls +# ChromaCollection.upsert which now also tries to acquire). Without this guard +# the inner call would block on its own outer flock (Linux fcntl locks are per +# open file description, so a same-thread second open of the lock file is a +# distinct lock and self-deadlocks). +# +# The holder set is tagged with ``pid`` so that a forked child does NOT +# inherit re-entrant credit from its parent: the OS-level flock IS NOT +# inherited as a "we hold it" semantically — the child must reacquire — but +# Python's ``threading.local`` IS inherited across fork. The pid check +# clears stale state so a forked child correctly hits the fcntl path. +_palace_lock_holders = threading.local() + + +def _holder_state(): + """Return the per-thread (pid, keys) record, refreshing after fork.""" + keys = getattr(_palace_lock_holders, "keys", None) + pid = getattr(_palace_lock_holders, "pid", None) + current_pid = os.getpid() + if keys is None or pid != current_pid: + keys = set() + _palace_lock_holders.keys = keys + _palace_lock_holders.pid = current_pid + return keys + + +def _held_by_this_thread(lock_key: str) -> bool: + """Return True if this thread already holds ``mine_palace_lock`` for ``lock_key``.""" + return lock_key in _holder_state() + + +def _mark_held(lock_key: str) -> None: + _holder_state().add(lock_key) + + +def _mark_released(lock_key: str) -> None: + _holder_state().discard(lock_key) + + @contextlib.contextmanager def mine_palace_lock(palace_path: str): """Per-palace non-blocking lock around the full `mine` pipeline. @@ -338,6 +380,12 @@ def mine_palace_lock(palace_path: str): Non-blocking: if another `mine` is already writing to this palace, raise MineAlreadyRunning so the caller can exit cleanly instead of piling up as a waiting worker. + + Re-entrant: if the current thread already holds the lock for the same + palace, the context manager passes through without re-acquiring. This + lets ChromaCollection write methods (which acquire the lock themselves + to protect MCP/direct callers) compose with miner.mine() (which holds + the outer lock for the entire mine pipeline) without self-deadlock. """ lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks") os.makedirs(lock_dir, exist_ok=True) @@ -346,6 +394,11 @@ def mine_palace_lock(palace_path: str): palace_key = hashlib.sha256(lock_key_source.encode()).hexdigest()[:16] lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock") + if _held_by_this_thread(palace_key): + # Same thread already holds the lock for this palace — pass through. + yield + return + lf = open(lock_path, "w") acquired = False try: @@ -369,7 +422,11 @@ def mine_palace_lock(palace_path: str): raise MineAlreadyRunning( f"another `mempalace mine` is already running against {resolved}" ) from exc - yield + _mark_held(palace_key) + try: + yield + finally: + _mark_released(palace_key) finally: if acquired: try: diff --git a/tests/test_chroma_collection_lock.py b/tests/test_chroma_collection_lock.py new file mode 100644 index 0000000..b5d30fb --- /dev/null +++ b/tests/test_chroma_collection_lock.py @@ -0,0 +1,327 @@ +"""Tests for ChromaCollection's palace-write-lock integration. + +Closes the gap left by ``mine_palace_lock`` only protecting the +``mempalace mine`` pipeline: MCP/direct writers that call +``ChromaCollection.add/upsert/update/delete`` must also serialize against +mine and against each other to avoid the multi-threaded HNSW corruption +documented in #974/#965. + +Property tested: + +* ``ChromaCollection(c, palace_path=p)`` wraps every write with + ``mine_palace_lock(p)``. +* Writes raise ``MineAlreadyRunning`` when another holder owns the lock + (instead of silently racing into the underlying chromadb call). +* Re-entrant composition with ``miner.mine()`` does not self-deadlock: + ``with mine_palace_lock(p): col.upsert(...)`` runs to completion. +* ``ChromaCollection(c)`` (no palace_path) preserves legacy no-lock + behaviour for tests/callers that build the adapter directly without + going through ``ChromaBackend``. + +POSIX-only: ``mine_palace_lock`` uses ``fcntl`` on Unix and ``msvcrt`` on +Windows; the contention semantics differ enough that the cross-process +tests are skipped on Windows runners. +""" + +from __future__ import annotations + +import multiprocessing +import os +import time + +import pytest + +from mempalace.backends.chroma import ChromaCollection +from mempalace.palace import MineAlreadyRunning, mine_palace_lock + + +def _get_mp_context(): + """Same start-method picker as test_palace_locks.py.""" + start_method = "spawn" if os.name == "nt" else "fork" + return multiprocessing.get_context(start_method) + + +# --------------------------------------------------------------------------- +# Fakes +# --------------------------------------------------------------------------- + + +class _FakeChromaCollection: + """Records calls; never blocks. Stand-in for chromadb.Collection.""" + + def __init__(self): + self.adds: list[dict] = [] + self.upserts: list[dict] = [] + self.updates: list[dict] = [] + self.deletes: list[dict] = [] + + def add(self, **kwargs): + self.adds.append(kwargs) + + def upsert(self, **kwargs): + self.upserts.append(kwargs) + + def update(self, **kwargs): + self.updates.append(kwargs) + + def delete(self, **kwargs): + self.deletes.append(kwargs) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _hold_lock(palace_path: str, ready_flag: str, release_flag: str) -> int: + """Acquire ``mine_palace_lock``, signal readiness, wait for release. + + Mirrors the helper in ``test_palace_locks.py`` so the contention + semantics match across both test files. + """ + try: + with mine_palace_lock(palace_path): + open(ready_flag, "w").close() + for _ in range(500): + if os.path.exists(release_flag): + return 0 + time.sleep(0.01) + return 0 + except MineAlreadyRunning: + return 1 + + +# --------------------------------------------------------------------------- +# Tests — opt-in lock wiring +# --------------------------------------------------------------------------- + + +def test_palace_path_none_skips_lock(tmp_path, monkeypatch): + """Legacy callers (``ChromaCollection(c)``) keep no-lock behaviour. + + A ``ChromaCollection`` built without ``palace_path`` must not touch the + lock infrastructure at all. This guards against regressions where a + test or third-party caller relies on the historical bare-write path. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + fake = _FakeChromaCollection() + col = ChromaCollection(fake) # no palace_path -> no lock + + # Hold the lock in a child process. Without palace_path, the parent + # write must still succeed (the lock does not gate this caller). + palace = str(tmp_path / "palace") + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") + ctx = _get_mp_context() + holder = ctx.Process(target=_hold_lock, args=(palace, ready, release)) + holder.start() + try: + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock" + + col.upsert(documents=["doc"], ids=["id-1"]) + assert fake.upserts == [{"documents": ["doc"], "ids": ["id-1"]}] + finally: + open(release, "w").close() + holder.join(timeout=5) + + +def test_writer_blocks_during_mine(tmp_path, monkeypatch): + """A held ``mine_palace_lock`` causes ``ChromaCollection`` writes to raise. + + This is the property that closes the MCP-bypass gap: when a mine is in + flight, MCP/direct writes raise ``MineAlreadyRunning`` rather than + silently entering chromadb's write path concurrent with mine. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") + + ctx = _get_mp_context() + holder = ctx.Process(target=_hold_lock, args=(palace, ready, release)) + holder.start() + try: + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock" + + fake = _FakeChromaCollection() + col = ChromaCollection(fake, palace_path=palace) + + with pytest.raises(MineAlreadyRunning): + col.upsert(documents=["doc"], ids=["id-1"]) + with pytest.raises(MineAlreadyRunning): + col.add(documents=["doc"], ids=["id-2"]) + with pytest.raises(MineAlreadyRunning): + col.update(ids=["id-3"], documents=["doc"]) + with pytest.raises(MineAlreadyRunning): + col.delete(ids=["id-4"]) + + # The fake must have received NO calls — the lock must gate + # before reaching the underlying chromadb layer. + assert fake.upserts == [] + assert fake.adds == [] + assert fake.updates == [] + assert fake.deletes == [] + finally: + open(release, "w").close() + holder.join(timeout=5) + + +def test_reentrant_inside_mine_passes_through(tmp_path, monkeypatch): + """``ChromaCollection.upsert`` inside ``mine_palace_lock`` does not deadlock. + + ``miner.mine()`` already holds ``mine_palace_lock(palace_path)`` for the + full mine pipeline; ``_mine_body`` then calls + ``collection.upsert(...)``. With the per-thread re-entrant guard in + ``mine_palace_lock``, the inner acquire is a pass-through and the + underlying chromadb call runs immediately. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + fake = _FakeChromaCollection() + col = ChromaCollection(fake, palace_path=palace) + + with mine_palace_lock(palace): + # If the re-entrant guard were missing, this would self-deadlock on + # the underlying flock. We rely on pytest-timeout (configured in + # pyproject.toml) to enforce this in CI; the assertion just confirms + # the call landed. + col.upsert(documents=["d"], ids=["i"], metadatas=[{"k": "v"}]) + col.add(documents=["d2"], ids=["i2"]) + col.update(ids=["i"], documents=["d-updated"]) + col.delete(ids=["i2"]) + + assert len(fake.upserts) == 1 + assert len(fake.adds) == 1 + assert len(fake.updates) == 1 + assert len(fake.deletes) == 1 + + +class _SlowFakeChromaCollection(_FakeChromaCollection): + """Fake whose write methods hold the caller for ``hold_seconds``. + + Used to keep ``mine_palace_lock`` acquired long enough for a sibling + process to contend deterministically. + """ + + def __init__(self, hold_seconds: float = 0.3): + super().__init__() + self._hold = hold_seconds + + def upsert(self, **kwargs): + time.sleep(self._hold) + super().upsert(**kwargs) + + +def _slow_writer_target(palace_path, tmp_path_str, pid, result_q): + """Subprocess target: try a slow upsert, report ok/busy.""" + os.environ["HOME"] = tmp_path_str + # Fresh import inside child so HOME monkeypatch routes the lock dir. + from mempalace.backends.chroma import ChromaCollection as _CC + from mempalace.palace import MineAlreadyRunning as _MAR + + fake = _SlowFakeChromaCollection(hold_seconds=0.3) + col = _CC(fake, palace_path=palace_path) + try: + col.upsert(documents=[f"d{pid}"], ids=[f"i{pid}"]) + result_q.put(("ok", pid)) + except _MAR: + result_q.put(("busy", pid)) + + +def test_concurrent_writers_serialize(tmp_path, monkeypatch): + """Two processes calling ``ChromaCollection.upsert`` against the same + palace must be serialized: at most one enters chromadb at a time, the + other raises ``MineAlreadyRunning``. + + This is the property that prevents the parallel HNSW insert race that + drives #974/#965 — under concurrent MCP write fan-out, exactly one + writer reaches chromadb and the rest fail loudly instead of corrupting + the index. + + The slow fake holds the lock for 0.3s per writer, large enough for the + second process to contend even on slow CI runners. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + + ctx = _get_mp_context() + result_q = ctx.Queue() + + p1 = ctx.Process( + target=_slow_writer_target, args=(palace, str(tmp_path), 1, result_q) + ) + p2 = ctx.Process( + target=_slow_writer_target, args=(palace, str(tmp_path), 2, result_q) + ) + p1.start() + # Tiny stagger so p1 wins the race deterministically; without it the + # OS scheduler can pick either, which is also a valid outcome but + # makes the assertion brittle on slow CI. + time.sleep(0.05) + p2.start() + p1.join(timeout=5) + p2.join(timeout=5) + + outcomes = [result_q.get(timeout=1) for _ in range(2)] + statuses = sorted(o[0] for o in outcomes) + assert statuses == ["busy", "ok"], ( + f"expected one ok + one busy, got {outcomes}" + ) + + +def test_read_path_does_not_acquire_lock(tmp_path, monkeypatch): + """``query`` / ``get`` / ``count`` must not be gated by the write lock. + + Read traffic is the dominant workload (semantic search, MCP get, etc.) + and serializing it against mine would tank latency for no correctness + benefit. This test pins that property: with another process holding + the write lock, reads must still complete instantly. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") + + ctx = _get_mp_context() + holder = ctx.Process(target=_hold_lock, args=(palace, ready, release)) + holder.start() + try: + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock" + + # _FakeChromaCollection doesn't implement query/get/count; we only + # need to confirm the wrapper does not call into mine_palace_lock + # for reads, which we assert by observing the wrapped methods are + # NOT in ChromaCollection's _write_lock path. A direct check via + # source inspection is more honest than mocking the entire chroma + # surface here. + import inspect + + from mempalace.backends.chroma import ChromaCollection as _CC + + for write_attr in ("add", "upsert", "update", "delete"): + src = inspect.getsource(getattr(_CC, write_attr)) + assert "_write_lock" in src, f"{write_attr} should acquire write lock" + + for read_attr in ("query", "get", "count"): + method = getattr(_CC, read_attr, None) + if method is None: + continue + src = inspect.getsource(method) + assert "_write_lock" not in src, ( + f"{read_attr} must NOT acquire the write lock (read path)" + ) + finally: + open(release, "w").close() + holder.join(timeout=5) diff --git a/tests/test_palace_locks.py b/tests/test_palace_locks.py index 601c894..39aa50c 100644 --- a/tests/test_palace_locks.py +++ b/tests/test_palace_locks.py @@ -135,19 +135,77 @@ def test_different_palaces_dont_conflict(tmp_path, monkeypatch): def test_palace_path_is_normalized(tmp_path, monkeypatch): - """Relative and absolute forms of the same path must use the same lock.""" + """Relative and absolute forms of the same path must use the same lock. + + Cross-process variant: a child holds the absolute form, a relative form + in the parent must hash to the same lock key and raise + ``MineAlreadyRunning``. (The same-thread case is now a re-entrant + pass-through by design — see ``test_reentrant_same_thread_passes_through`` + — so we exercise the normalization invariant across a process boundary + where re-entrance does not apply.) + """ monkeypatch.setenv("HOME", str(tmp_path)) monkeypatch.chdir(tmp_path) os.makedirs(tmp_path / "palace", exist_ok=True) absolute = str(tmp_path / "palace") - relative = "palace" + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") - # Hold the lock with the absolute form; attempting to re-acquire with - # the relative form (which resolves to the same absolute path) must fail. - with mine_palace_lock(absolute): + ctx = _get_mp_context() + holder = ctx.Process(target=_hold_lock, args=(absolute, ready, release)) + holder.start() + try: + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock in time" + + # Parent holds CWD = tmp_path so "palace" is the same on-disk dir as + # the absolute form. The lock key is sha256(realpath+normcase) so the + # two forms must collide. with pytest.raises(MineAlreadyRunning): - with mine_palace_lock(relative): + with mine_palace_lock("palace"): pytest.fail("normalized path collision should have raised") + finally: + open(release, "w").close() + holder.join(timeout=5) + + +def test_reentrant_same_thread_passes_through(tmp_path, monkeypatch): + """Same thread re-acquiring the same palace lock must not deadlock or raise. + + This is the invariant that makes ``ChromaCollection`` write methods (which + take ``mine_palace_lock`` for MCP/direct-writer protection) compose with + ``miner.mine()`` (which already holds the lock for the entire mine + pipeline). Without the per-thread re-entrant guard the inner acquire + would self-deadlock on the outer flock. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + with mine_palace_lock(palace): + # Re-enter from the same thread — must yield without raising or hanging. + with mine_palace_lock(palace): + pass + # After the inner exits, the outer is still held: confirm via a + # subprocess that tries to acquire and reports back. + ctx = _get_mp_context() + result_q = ctx.Queue() + child = ctx.Process(target=_try_acquire_expect_busy, args=(palace, result_q)) + child.start() + child.join(timeout=5) + assert result_q.get(timeout=1) == "busy", ( + "outer lock should still be held by parent after inner re-entrant exit" + ) + + +def _try_acquire_expect_busy(palace_path, result_q): + """Helper: try to acquire, push 'busy' (raised) or 'free' (acquired) into queue.""" + try: + with mine_palace_lock(palace_path): + result_q.put("free") + except MineAlreadyRunning: + result_q.put("busy") def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch): From d1e27b8c42f3892046bb20f950d7b2e04726e230 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Wed, 6 May 2026 01:47:46 -0300 Subject: [PATCH 2/2] style: ruff format new test files (CI lint) --- tests/test_chroma_collection_lock.py | 18 ++++++------------ tests/test_palace_locks.py | 6 +++--- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/tests/test_chroma_collection_lock.py b/tests/test_chroma_collection_lock.py index b5d30fb..536b5e8 100644 --- a/tests/test_chroma_collection_lock.py +++ b/tests/test_chroma_collection_lock.py @@ -255,12 +255,8 @@ def test_concurrent_writers_serialize(tmp_path, monkeypatch): ctx = _get_mp_context() result_q = ctx.Queue() - p1 = ctx.Process( - target=_slow_writer_target, args=(palace, str(tmp_path), 1, result_q) - ) - p2 = ctx.Process( - target=_slow_writer_target, args=(palace, str(tmp_path), 2, result_q) - ) + p1 = ctx.Process(target=_slow_writer_target, args=(palace, str(tmp_path), 1, result_q)) + p2 = ctx.Process(target=_slow_writer_target, args=(palace, str(tmp_path), 2, result_q)) p1.start() # Tiny stagger so p1 wins the race deterministically; without it the # OS scheduler can pick either, which is also a valid outcome but @@ -272,9 +268,7 @@ def test_concurrent_writers_serialize(tmp_path, monkeypatch): outcomes = [result_q.get(timeout=1) for _ in range(2)] statuses = sorted(o[0] for o in outcomes) - assert statuses == ["busy", "ok"], ( - f"expected one ok + one busy, got {outcomes}" - ) + assert statuses == ["busy", "ok"], f"expected one ok + one busy, got {outcomes}" def test_read_path_does_not_acquire_lock(tmp_path, monkeypatch): @@ -319,9 +313,9 @@ def test_read_path_does_not_acquire_lock(tmp_path, monkeypatch): if method is None: continue src = inspect.getsource(method) - assert "_write_lock" not in src, ( - f"{read_attr} must NOT acquire the write lock (read path)" - ) + assert ( + "_write_lock" not in src + ), f"{read_attr} must NOT acquire the write lock (read path)" finally: open(release, "w").close() holder.join(timeout=5) diff --git a/tests/test_palace_locks.py b/tests/test_palace_locks.py index 39aa50c..d239757 100644 --- a/tests/test_palace_locks.py +++ b/tests/test_palace_locks.py @@ -194,9 +194,9 @@ def test_reentrant_same_thread_passes_through(tmp_path, monkeypatch): child = ctx.Process(target=_try_acquire_expect_busy, args=(palace, result_q)) child.start() child.join(timeout=5) - assert result_q.get(timeout=1) == "busy", ( - "outer lock should still be held by parent after inner re-entrant exit" - ) + assert ( + result_q.get(timeout=1) == "busy" + ), "outer lock should still be held by parent after inner re-entrant exit" def _try_acquire_expect_busy(palace_path, result_q):