Merge pull request #1162 from imtylervo/fix/palace-write-lock-queue-pattern
fix: serialize ChromaCollection writes through palace lock
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
"""ChromaDB-backed MemPalace storage backend (RFC 001 reference implementation)."""
|
"""ChromaDB-backed MemPalace storage backend (RFC 001 reference implementation)."""
|
||||||
|
|
||||||
|
import contextlib
|
||||||
import datetime as _dt
|
import datetime as _dt
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
@@ -691,10 +692,43 @@ def _close_client(client) -> None:
|
|||||||
|
|
||||||
|
|
||||||
class ChromaCollection(BaseCollection):
|
class ChromaCollection(BaseCollection):
|
||||||
"""Thin adapter translating ChromaDB dict returns into typed results."""
|
"""Thin adapter translating ChromaDB dict returns into typed results.
|
||||||
|
|
||||||
def __init__(self, collection):
|
When ``palace_path`` is set, all write methods (``add``, ``upsert``,
|
||||||
|
``update``, ``delete``) acquire ``mine_palace_lock(palace_path)`` for the
|
||||||
|
duration of the underlying chromadb call. This serializes MCP and other
|
||||||
|
direct-backend writers against ``mempalace mine`` and against each other,
|
||||||
|
closing the race between concurrent writers that triggers ChromaDB's
|
||||||
|
multi-threaded HNSW corruption (#974/#965).
|
||||||
|
|
||||||
|
The lock is the same primitive used by ``miner.mine()`` so re-entrant
|
||||||
|
acquisition from inside the mine pipeline (mine -> _mine_body ->
|
||||||
|
collection.upsert) is short-circuited by the per-thread guard inside
|
||||||
|
``mine_palace_lock`` — no self-deadlock.
|
||||||
|
|
||||||
|
``palace_path=None`` disables the wrapping, preserving the legacy
|
||||||
|
no-lock behaviour for callers that construct a ``ChromaCollection``
|
||||||
|
directly without going through ``ChromaBackend``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, collection, palace_path: Optional[str] = None):
|
||||||
self._collection = collection
|
self._collection = collection
|
||||||
|
self._palace_path = palace_path
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def _write_lock(self):
|
||||||
|
"""Acquire ``mine_palace_lock`` for the configured palace, if any.
|
||||||
|
|
||||||
|
No-op (yields immediately) when ``self._palace_path`` is None.
|
||||||
|
"""
|
||||||
|
if self._palace_path is None:
|
||||||
|
yield
|
||||||
|
return
|
||||||
|
# Late import — palace.py imports ChromaBackend from this module.
|
||||||
|
from ..palace import mine_palace_lock
|
||||||
|
|
||||||
|
with mine_palace_lock(self._palace_path):
|
||||||
|
yield
|
||||||
|
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
# Writes
|
# Writes
|
||||||
@@ -706,7 +740,8 @@ class ChromaCollection(BaseCollection):
|
|||||||
kwargs["metadatas"] = metadatas
|
kwargs["metadatas"] = metadatas
|
||||||
if embeddings is not None:
|
if embeddings is not None:
|
||||||
kwargs["embeddings"] = embeddings
|
kwargs["embeddings"] = embeddings
|
||||||
self._collection.add(**kwargs)
|
with self._write_lock():
|
||||||
|
self._collection.add(**kwargs)
|
||||||
|
|
||||||
def upsert(self, *, documents, ids, metadatas=None, embeddings=None):
|
def upsert(self, *, documents, ids, metadatas=None, embeddings=None):
|
||||||
kwargs: dict[str, Any] = {"documents": documents, "ids": ids}
|
kwargs: dict[str, Any] = {"documents": documents, "ids": ids}
|
||||||
@@ -714,7 +749,8 @@ class ChromaCollection(BaseCollection):
|
|||||||
kwargs["metadatas"] = metadatas
|
kwargs["metadatas"] = metadatas
|
||||||
if embeddings is not None:
|
if embeddings is not None:
|
||||||
kwargs["embeddings"] = embeddings
|
kwargs["embeddings"] = embeddings
|
||||||
self._collection.upsert(**kwargs)
|
with self._write_lock():
|
||||||
|
self._collection.upsert(**kwargs)
|
||||||
|
|
||||||
def update(
|
def update(
|
||||||
self,
|
self,
|
||||||
@@ -733,7 +769,8 @@ class ChromaCollection(BaseCollection):
|
|||||||
kwargs["metadatas"] = metadatas
|
kwargs["metadatas"] = metadatas
|
||||||
if embeddings is not None:
|
if embeddings is not None:
|
||||||
kwargs["embeddings"] = embeddings
|
kwargs["embeddings"] = embeddings
|
||||||
self._collection.update(**kwargs)
|
with self._write_lock():
|
||||||
|
self._collection.update(**kwargs)
|
||||||
|
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
# Reads
|
# Reads
|
||||||
@@ -877,7 +914,8 @@ class ChromaCollection(BaseCollection):
|
|||||||
kwargs["ids"] = ids
|
kwargs["ids"] = ids
|
||||||
if where is not None:
|
if where is not None:
|
||||||
kwargs["where"] = where
|
kwargs["where"] = where
|
||||||
self._collection.delete(**kwargs)
|
with self._write_lock():
|
||||||
|
self._collection.delete(**kwargs)
|
||||||
|
|
||||||
def count(self):
|
def count(self):
|
||||||
return self._collection.count()
|
return self._collection.count()
|
||||||
@@ -1145,7 +1183,7 @@ class ChromaBackend(BaseBackend):
|
|||||||
else:
|
else:
|
||||||
collection = client.get_collection(collection_name, **ef_kwargs)
|
collection = client.get_collection(collection_name, **ef_kwargs)
|
||||||
_pin_hnsw_threads(collection)
|
_pin_hnsw_threads(collection)
|
||||||
return ChromaCollection(collection)
|
return ChromaCollection(collection, palace_path=palace_path)
|
||||||
|
|
||||||
def close_palace(self, palace) -> None:
|
def close_palace(self, palace) -> None:
|
||||||
"""Drop cached handles for ``palace`` and release its SQLite file lock.
|
"""Drop cached handles for ``palace`` and release its SQLite file lock.
|
||||||
@@ -1204,7 +1242,7 @@ class ChromaBackend(BaseBackend):
|
|||||||
},
|
},
|
||||||
**ef_kwargs,
|
**ef_kwargs,
|
||||||
)
|
)
|
||||||
return ChromaCollection(collection)
|
return ChromaCollection(collection, palace_path=palace_path)
|
||||||
|
|
||||||
|
|
||||||
def _normalize_get_collection_args(args, kwargs):
|
def _normalize_get_collection_args(args, kwargs):
|
||||||
|
|||||||
@@ -374,7 +374,7 @@ def _get_collection(create=False):
|
|||||||
**ef_kwargs,
|
**ef_kwargs,
|
||||||
)
|
)
|
||||||
_pin_hnsw_threads(raw)
|
_pin_hnsw_threads(raw)
|
||||||
_collection_cache = ChromaCollection(raw)
|
_collection_cache = ChromaCollection(raw, palace_path=_config.palace_path)
|
||||||
_metadata_cache = None
|
_metadata_cache = None
|
||||||
_metadata_cache_time = 0
|
_metadata_cache_time = 0
|
||||||
elif _collection_cache is None:
|
elif _collection_cache is None:
|
||||||
@@ -382,7 +382,7 @@ def _get_collection(create=False):
|
|||||||
ef_kwargs = {"embedding_function": ef} if ef is not None else {}
|
ef_kwargs = {"embedding_function": ef} if ef is not None else {}
|
||||||
raw = client.get_collection(_config.collection_name, **ef_kwargs)
|
raw = client.get_collection(_config.collection_name, **ef_kwargs)
|
||||||
_pin_hnsw_threads(raw)
|
_pin_hnsw_threads(raw)
|
||||||
_collection_cache = ChromaCollection(raw)
|
_collection_cache = ChromaCollection(raw, palace_path=_config.palace_path)
|
||||||
_metadata_cache = None
|
_metadata_cache = None
|
||||||
_metadata_cache_time = 0
|
_metadata_cache_time = 0
|
||||||
return _collection_cache
|
return _collection_cache
|
||||||
|
|||||||
+58
-1
@@ -8,6 +8,7 @@ import contextlib
|
|||||||
import hashlib
|
import hashlib
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import threading
|
||||||
|
|
||||||
from .backends.chroma import ChromaBackend
|
from .backends.chroma import ChromaBackend
|
||||||
|
|
||||||
@@ -314,6 +315,47 @@ class MineAlreadyRunning(RuntimeError):
|
|||||||
"""Raised when another `mempalace mine` already holds the per-palace lock."""
|
"""Raised when another `mempalace mine` already holds the per-palace lock."""
|
||||||
|
|
||||||
|
|
||||||
|
# Per-thread record of palaces this thread already holds the lock for. Used by
|
||||||
|
# `mine_palace_lock` to short-circuit re-entrant acquisition from the same
|
||||||
|
# thread (e.g. miner.mine() acquires the outer lock then calls
|
||||||
|
# ChromaCollection.upsert which now also tries to acquire). Without this guard
|
||||||
|
# the inner call would block on its own outer flock (Linux fcntl locks are per
|
||||||
|
# open file description, so a same-thread second open of the lock file is a
|
||||||
|
# distinct lock and self-deadlocks).
|
||||||
|
#
|
||||||
|
# The holder set is tagged with ``pid`` so that a forked child does NOT
|
||||||
|
# inherit re-entrant credit from its parent: the OS-level flock IS NOT
|
||||||
|
# inherited as a "we hold it" semantically — the child must reacquire — but
|
||||||
|
# Python's ``threading.local`` IS inherited across fork. The pid check
|
||||||
|
# clears stale state so a forked child correctly hits the fcntl path.
|
||||||
|
_palace_lock_holders = threading.local()
|
||||||
|
|
||||||
|
|
||||||
|
def _holder_state():
|
||||||
|
"""Return the per-thread (pid, keys) record, refreshing after fork."""
|
||||||
|
keys = getattr(_palace_lock_holders, "keys", None)
|
||||||
|
pid = getattr(_palace_lock_holders, "pid", None)
|
||||||
|
current_pid = os.getpid()
|
||||||
|
if keys is None or pid != current_pid:
|
||||||
|
keys = set()
|
||||||
|
_palace_lock_holders.keys = keys
|
||||||
|
_palace_lock_holders.pid = current_pid
|
||||||
|
return keys
|
||||||
|
|
||||||
|
|
||||||
|
def _held_by_this_thread(lock_key: str) -> bool:
|
||||||
|
"""Return True if this thread already holds ``mine_palace_lock`` for ``lock_key``."""
|
||||||
|
return lock_key in _holder_state()
|
||||||
|
|
||||||
|
|
||||||
|
def _mark_held(lock_key: str) -> None:
|
||||||
|
_holder_state().add(lock_key)
|
||||||
|
|
||||||
|
|
||||||
|
def _mark_released(lock_key: str) -> None:
|
||||||
|
_holder_state().discard(lock_key)
|
||||||
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def mine_palace_lock(palace_path: str):
|
def mine_palace_lock(palace_path: str):
|
||||||
"""Per-palace non-blocking lock around the full `mine` pipeline.
|
"""Per-palace non-blocking lock around the full `mine` pipeline.
|
||||||
@@ -338,6 +380,12 @@ def mine_palace_lock(palace_path: str):
|
|||||||
Non-blocking: if another `mine` is already writing to this palace,
|
Non-blocking: if another `mine` is already writing to this palace,
|
||||||
raise MineAlreadyRunning so the caller can exit cleanly instead of
|
raise MineAlreadyRunning so the caller can exit cleanly instead of
|
||||||
piling up as a waiting worker.
|
piling up as a waiting worker.
|
||||||
|
|
||||||
|
Re-entrant: if the current thread already holds the lock for the same
|
||||||
|
palace, the context manager passes through without re-acquiring. This
|
||||||
|
lets ChromaCollection write methods (which acquire the lock themselves
|
||||||
|
to protect MCP/direct callers) compose with miner.mine() (which holds
|
||||||
|
the outer lock for the entire mine pipeline) without self-deadlock.
|
||||||
"""
|
"""
|
||||||
lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks")
|
lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks")
|
||||||
os.makedirs(lock_dir, exist_ok=True)
|
os.makedirs(lock_dir, exist_ok=True)
|
||||||
@@ -346,6 +394,11 @@ def mine_palace_lock(palace_path: str):
|
|||||||
palace_key = hashlib.sha256(lock_key_source.encode()).hexdigest()[:16]
|
palace_key = hashlib.sha256(lock_key_source.encode()).hexdigest()[:16]
|
||||||
lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock")
|
lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock")
|
||||||
|
|
||||||
|
if _held_by_this_thread(palace_key):
|
||||||
|
# Same thread already holds the lock for this palace — pass through.
|
||||||
|
yield
|
||||||
|
return
|
||||||
|
|
||||||
lf = open(lock_path, "w")
|
lf = open(lock_path, "w")
|
||||||
acquired = False
|
acquired = False
|
||||||
try:
|
try:
|
||||||
@@ -369,7 +422,11 @@ def mine_palace_lock(palace_path: str):
|
|||||||
raise MineAlreadyRunning(
|
raise MineAlreadyRunning(
|
||||||
f"another `mempalace mine` is already running against {resolved}"
|
f"another `mempalace mine` is already running against {resolved}"
|
||||||
) from exc
|
) from exc
|
||||||
yield
|
_mark_held(palace_key)
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
_mark_released(palace_key)
|
||||||
finally:
|
finally:
|
||||||
if acquired:
|
if acquired:
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -0,0 +1,321 @@
|
|||||||
|
"""Tests for ChromaCollection's palace-write-lock integration.
|
||||||
|
|
||||||
|
Closes the gap left by ``mine_palace_lock`` only protecting the
|
||||||
|
``mempalace mine`` pipeline: MCP/direct writers that call
|
||||||
|
``ChromaCollection.add/upsert/update/delete`` must also serialize against
|
||||||
|
mine and against each other to avoid the multi-threaded HNSW corruption
|
||||||
|
documented in #974/#965.
|
||||||
|
|
||||||
|
Property tested:
|
||||||
|
|
||||||
|
* ``ChromaCollection(c, palace_path=p)`` wraps every write with
|
||||||
|
``mine_palace_lock(p)``.
|
||||||
|
* Writes raise ``MineAlreadyRunning`` when another holder owns the lock
|
||||||
|
(instead of silently racing into the underlying chromadb call).
|
||||||
|
* Re-entrant composition with ``miner.mine()`` does not self-deadlock:
|
||||||
|
``with mine_palace_lock(p): col.upsert(...)`` runs to completion.
|
||||||
|
* ``ChromaCollection(c)`` (no palace_path) preserves legacy no-lock
|
||||||
|
behaviour for tests/callers that build the adapter directly without
|
||||||
|
going through ``ChromaBackend``.
|
||||||
|
|
||||||
|
POSIX-only: ``mine_palace_lock`` uses ``fcntl`` on Unix and ``msvcrt`` on
|
||||||
|
Windows; the contention semantics differ enough that the cross-process
|
||||||
|
tests are skipped on Windows runners.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import multiprocessing
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from mempalace.backends.chroma import ChromaCollection
|
||||||
|
from mempalace.palace import MineAlreadyRunning, mine_palace_lock
|
||||||
|
|
||||||
|
|
||||||
|
def _get_mp_context():
|
||||||
|
"""Same start-method picker as test_palace_locks.py."""
|
||||||
|
start_method = "spawn" if os.name == "nt" else "fork"
|
||||||
|
return multiprocessing.get_context(start_method)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Fakes
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeChromaCollection:
|
||||||
|
"""Records calls; never blocks. Stand-in for chromadb.Collection."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.adds: list[dict] = []
|
||||||
|
self.upserts: list[dict] = []
|
||||||
|
self.updates: list[dict] = []
|
||||||
|
self.deletes: list[dict] = []
|
||||||
|
|
||||||
|
def add(self, **kwargs):
|
||||||
|
self.adds.append(kwargs)
|
||||||
|
|
||||||
|
def upsert(self, **kwargs):
|
||||||
|
self.upserts.append(kwargs)
|
||||||
|
|
||||||
|
def update(self, **kwargs):
|
||||||
|
self.updates.append(kwargs)
|
||||||
|
|
||||||
|
def delete(self, **kwargs):
|
||||||
|
self.deletes.append(kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _hold_lock(palace_path: str, ready_flag: str, release_flag: str) -> int:
|
||||||
|
"""Acquire ``mine_palace_lock``, signal readiness, wait for release.
|
||||||
|
|
||||||
|
Mirrors the helper in ``test_palace_locks.py`` so the contention
|
||||||
|
semantics match across both test files.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
with mine_palace_lock(palace_path):
|
||||||
|
open(ready_flag, "w").close()
|
||||||
|
for _ in range(500):
|
||||||
|
if os.path.exists(release_flag):
|
||||||
|
return 0
|
||||||
|
time.sleep(0.01)
|
||||||
|
return 0
|
||||||
|
except MineAlreadyRunning:
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Tests — opt-in lock wiring
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_palace_path_none_skips_lock(tmp_path, monkeypatch):
|
||||||
|
"""Legacy callers (``ChromaCollection(c)``) keep no-lock behaviour.
|
||||||
|
|
||||||
|
A ``ChromaCollection`` built without ``palace_path`` must not touch the
|
||||||
|
lock infrastructure at all. This guards against regressions where a
|
||||||
|
test or third-party caller relies on the historical bare-write path.
|
||||||
|
"""
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
fake = _FakeChromaCollection()
|
||||||
|
col = ChromaCollection(fake) # no palace_path -> no lock
|
||||||
|
|
||||||
|
# Hold the lock in a child process. Without palace_path, the parent
|
||||||
|
# write must still succeed (the lock does not gate this caller).
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
ready = str(tmp_path / "ready")
|
||||||
|
release = str(tmp_path / "release")
|
||||||
|
ctx = _get_mp_context()
|
||||||
|
holder = ctx.Process(target=_hold_lock, args=(palace, ready, release))
|
||||||
|
holder.start()
|
||||||
|
try:
|
||||||
|
for _ in range(500):
|
||||||
|
if os.path.exists(ready):
|
||||||
|
break
|
||||||
|
time.sleep(0.01)
|
||||||
|
assert os.path.exists(ready), "holder failed to acquire lock"
|
||||||
|
|
||||||
|
col.upsert(documents=["doc"], ids=["id-1"])
|
||||||
|
assert fake.upserts == [{"documents": ["doc"], "ids": ["id-1"]}]
|
||||||
|
finally:
|
||||||
|
open(release, "w").close()
|
||||||
|
holder.join(timeout=5)
|
||||||
|
|
||||||
|
|
||||||
|
def test_writer_blocks_during_mine(tmp_path, monkeypatch):
|
||||||
|
"""A held ``mine_palace_lock`` causes ``ChromaCollection`` writes to raise.
|
||||||
|
|
||||||
|
This is the property that closes the MCP-bypass gap: when a mine is in
|
||||||
|
flight, MCP/direct writes raise ``MineAlreadyRunning`` rather than
|
||||||
|
silently entering chromadb's write path concurrent with mine.
|
||||||
|
"""
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
ready = str(tmp_path / "ready")
|
||||||
|
release = str(tmp_path / "release")
|
||||||
|
|
||||||
|
ctx = _get_mp_context()
|
||||||
|
holder = ctx.Process(target=_hold_lock, args=(palace, ready, release))
|
||||||
|
holder.start()
|
||||||
|
try:
|
||||||
|
for _ in range(500):
|
||||||
|
if os.path.exists(ready):
|
||||||
|
break
|
||||||
|
time.sleep(0.01)
|
||||||
|
assert os.path.exists(ready), "holder failed to acquire lock"
|
||||||
|
|
||||||
|
fake = _FakeChromaCollection()
|
||||||
|
col = ChromaCollection(fake, palace_path=palace)
|
||||||
|
|
||||||
|
with pytest.raises(MineAlreadyRunning):
|
||||||
|
col.upsert(documents=["doc"], ids=["id-1"])
|
||||||
|
with pytest.raises(MineAlreadyRunning):
|
||||||
|
col.add(documents=["doc"], ids=["id-2"])
|
||||||
|
with pytest.raises(MineAlreadyRunning):
|
||||||
|
col.update(ids=["id-3"], documents=["doc"])
|
||||||
|
with pytest.raises(MineAlreadyRunning):
|
||||||
|
col.delete(ids=["id-4"])
|
||||||
|
|
||||||
|
# The fake must have received NO calls — the lock must gate
|
||||||
|
# before reaching the underlying chromadb layer.
|
||||||
|
assert fake.upserts == []
|
||||||
|
assert fake.adds == []
|
||||||
|
assert fake.updates == []
|
||||||
|
assert fake.deletes == []
|
||||||
|
finally:
|
||||||
|
open(release, "w").close()
|
||||||
|
holder.join(timeout=5)
|
||||||
|
|
||||||
|
|
||||||
|
def test_reentrant_inside_mine_passes_through(tmp_path, monkeypatch):
|
||||||
|
"""``ChromaCollection.upsert`` inside ``mine_palace_lock`` does not deadlock.
|
||||||
|
|
||||||
|
``miner.mine()`` already holds ``mine_palace_lock(palace_path)`` for the
|
||||||
|
full mine pipeline; ``_mine_body`` then calls
|
||||||
|
``collection.upsert(...)``. With the per-thread re-entrant guard in
|
||||||
|
``mine_palace_lock``, the inner acquire is a pass-through and the
|
||||||
|
underlying chromadb call runs immediately.
|
||||||
|
"""
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
fake = _FakeChromaCollection()
|
||||||
|
col = ChromaCollection(fake, palace_path=palace)
|
||||||
|
|
||||||
|
with mine_palace_lock(palace):
|
||||||
|
# If the re-entrant guard were missing, this would self-deadlock on
|
||||||
|
# the underlying flock. We rely on pytest-timeout (configured in
|
||||||
|
# pyproject.toml) to enforce this in CI; the assertion just confirms
|
||||||
|
# the call landed.
|
||||||
|
col.upsert(documents=["d"], ids=["i"], metadatas=[{"k": "v"}])
|
||||||
|
col.add(documents=["d2"], ids=["i2"])
|
||||||
|
col.update(ids=["i"], documents=["d-updated"])
|
||||||
|
col.delete(ids=["i2"])
|
||||||
|
|
||||||
|
assert len(fake.upserts) == 1
|
||||||
|
assert len(fake.adds) == 1
|
||||||
|
assert len(fake.updates) == 1
|
||||||
|
assert len(fake.deletes) == 1
|
||||||
|
|
||||||
|
|
||||||
|
class _SlowFakeChromaCollection(_FakeChromaCollection):
|
||||||
|
"""Fake whose write methods hold the caller for ``hold_seconds``.
|
||||||
|
|
||||||
|
Used to keep ``mine_palace_lock`` acquired long enough for a sibling
|
||||||
|
process to contend deterministically.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, hold_seconds: float = 0.3):
|
||||||
|
super().__init__()
|
||||||
|
self._hold = hold_seconds
|
||||||
|
|
||||||
|
def upsert(self, **kwargs):
|
||||||
|
time.sleep(self._hold)
|
||||||
|
super().upsert(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def _slow_writer_target(palace_path, tmp_path_str, pid, result_q):
|
||||||
|
"""Subprocess target: try a slow upsert, report ok/busy."""
|
||||||
|
os.environ["HOME"] = tmp_path_str
|
||||||
|
# Fresh import inside child so HOME monkeypatch routes the lock dir.
|
||||||
|
from mempalace.backends.chroma import ChromaCollection as _CC
|
||||||
|
from mempalace.palace import MineAlreadyRunning as _MAR
|
||||||
|
|
||||||
|
fake = _SlowFakeChromaCollection(hold_seconds=0.3)
|
||||||
|
col = _CC(fake, palace_path=palace_path)
|
||||||
|
try:
|
||||||
|
col.upsert(documents=[f"d{pid}"], ids=[f"i{pid}"])
|
||||||
|
result_q.put(("ok", pid))
|
||||||
|
except _MAR:
|
||||||
|
result_q.put(("busy", pid))
|
||||||
|
|
||||||
|
|
||||||
|
def test_concurrent_writers_serialize(tmp_path, monkeypatch):
|
||||||
|
"""Two processes calling ``ChromaCollection.upsert`` against the same
|
||||||
|
palace must be serialized: at most one enters chromadb at a time, the
|
||||||
|
other raises ``MineAlreadyRunning``.
|
||||||
|
|
||||||
|
This is the property that prevents the parallel HNSW insert race that
|
||||||
|
drives #974/#965 — under concurrent MCP write fan-out, exactly one
|
||||||
|
writer reaches chromadb and the rest fail loudly instead of corrupting
|
||||||
|
the index.
|
||||||
|
|
||||||
|
The slow fake holds the lock for 0.3s per writer, large enough for the
|
||||||
|
second process to contend even on slow CI runners.
|
||||||
|
"""
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
|
||||||
|
ctx = _get_mp_context()
|
||||||
|
result_q = ctx.Queue()
|
||||||
|
|
||||||
|
p1 = ctx.Process(target=_slow_writer_target, args=(palace, str(tmp_path), 1, result_q))
|
||||||
|
p2 = ctx.Process(target=_slow_writer_target, args=(palace, str(tmp_path), 2, result_q))
|
||||||
|
p1.start()
|
||||||
|
# Tiny stagger so p1 wins the race deterministically; without it the
|
||||||
|
# OS scheduler can pick either, which is also a valid outcome but
|
||||||
|
# makes the assertion brittle on slow CI.
|
||||||
|
time.sleep(0.05)
|
||||||
|
p2.start()
|
||||||
|
p1.join(timeout=5)
|
||||||
|
p2.join(timeout=5)
|
||||||
|
|
||||||
|
outcomes = [result_q.get(timeout=1) for _ in range(2)]
|
||||||
|
statuses = sorted(o[0] for o in outcomes)
|
||||||
|
assert statuses == ["busy", "ok"], f"expected one ok + one busy, got {outcomes}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_read_path_does_not_acquire_lock(tmp_path, monkeypatch):
|
||||||
|
"""``query`` / ``get`` / ``count`` must not be gated by the write lock.
|
||||||
|
|
||||||
|
Read traffic is the dominant workload (semantic search, MCP get, etc.)
|
||||||
|
and serializing it against mine would tank latency for no correctness
|
||||||
|
benefit. This test pins that property: with another process holding
|
||||||
|
the write lock, reads must still complete instantly.
|
||||||
|
"""
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
ready = str(tmp_path / "ready")
|
||||||
|
release = str(tmp_path / "release")
|
||||||
|
|
||||||
|
ctx = _get_mp_context()
|
||||||
|
holder = ctx.Process(target=_hold_lock, args=(palace, ready, release))
|
||||||
|
holder.start()
|
||||||
|
try:
|
||||||
|
for _ in range(500):
|
||||||
|
if os.path.exists(ready):
|
||||||
|
break
|
||||||
|
time.sleep(0.01)
|
||||||
|
assert os.path.exists(ready), "holder failed to acquire lock"
|
||||||
|
|
||||||
|
# _FakeChromaCollection doesn't implement query/get/count; we only
|
||||||
|
# need to confirm the wrapper does not call into mine_palace_lock
|
||||||
|
# for reads, which we assert by observing the wrapped methods are
|
||||||
|
# NOT in ChromaCollection's _write_lock path. A direct check via
|
||||||
|
# source inspection is more honest than mocking the entire chroma
|
||||||
|
# surface here.
|
||||||
|
import inspect
|
||||||
|
|
||||||
|
from mempalace.backends.chroma import ChromaCollection as _CC
|
||||||
|
|
||||||
|
for write_attr in ("add", "upsert", "update", "delete"):
|
||||||
|
src = inspect.getsource(getattr(_CC, write_attr))
|
||||||
|
assert "_write_lock" in src, f"{write_attr} should acquire write lock"
|
||||||
|
|
||||||
|
for read_attr in ("query", "get", "count"):
|
||||||
|
method = getattr(_CC, read_attr, None)
|
||||||
|
if method is None:
|
||||||
|
continue
|
||||||
|
src = inspect.getsource(method)
|
||||||
|
assert (
|
||||||
|
"_write_lock" not in src
|
||||||
|
), f"{read_attr} must NOT acquire the write lock (read path)"
|
||||||
|
finally:
|
||||||
|
open(release, "w").close()
|
||||||
|
holder.join(timeout=5)
|
||||||
@@ -135,19 +135,77 @@ def test_different_palaces_dont_conflict(tmp_path, monkeypatch):
|
|||||||
|
|
||||||
|
|
||||||
def test_palace_path_is_normalized(tmp_path, monkeypatch):
|
def test_palace_path_is_normalized(tmp_path, monkeypatch):
|
||||||
"""Relative and absolute forms of the same path must use the same lock."""
|
"""Relative and absolute forms of the same path must use the same lock.
|
||||||
|
|
||||||
|
Cross-process variant: a child holds the absolute form, a relative form
|
||||||
|
in the parent must hash to the same lock key and raise
|
||||||
|
``MineAlreadyRunning``. (The same-thread case is now a re-entrant
|
||||||
|
pass-through by design — see ``test_reentrant_same_thread_passes_through``
|
||||||
|
— so we exercise the normalization invariant across a process boundary
|
||||||
|
where re-entrance does not apply.)
|
||||||
|
"""
|
||||||
monkeypatch.setenv("HOME", str(tmp_path))
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
monkeypatch.chdir(tmp_path)
|
monkeypatch.chdir(tmp_path)
|
||||||
os.makedirs(tmp_path / "palace", exist_ok=True)
|
os.makedirs(tmp_path / "palace", exist_ok=True)
|
||||||
absolute = str(tmp_path / "palace")
|
absolute = str(tmp_path / "palace")
|
||||||
relative = "palace"
|
ready = str(tmp_path / "ready")
|
||||||
|
release = str(tmp_path / "release")
|
||||||
|
|
||||||
# Hold the lock with the absolute form; attempting to re-acquire with
|
ctx = _get_mp_context()
|
||||||
# the relative form (which resolves to the same absolute path) must fail.
|
holder = ctx.Process(target=_hold_lock, args=(absolute, ready, release))
|
||||||
with mine_palace_lock(absolute):
|
holder.start()
|
||||||
|
try:
|
||||||
|
for _ in range(500):
|
||||||
|
if os.path.exists(ready):
|
||||||
|
break
|
||||||
|
time.sleep(0.01)
|
||||||
|
assert os.path.exists(ready), "holder failed to acquire lock in time"
|
||||||
|
|
||||||
|
# Parent holds CWD = tmp_path so "palace" is the same on-disk dir as
|
||||||
|
# the absolute form. The lock key is sha256(realpath+normcase) so the
|
||||||
|
# two forms must collide.
|
||||||
with pytest.raises(MineAlreadyRunning):
|
with pytest.raises(MineAlreadyRunning):
|
||||||
with mine_palace_lock(relative):
|
with mine_palace_lock("palace"):
|
||||||
pytest.fail("normalized path collision should have raised")
|
pytest.fail("normalized path collision should have raised")
|
||||||
|
finally:
|
||||||
|
open(release, "w").close()
|
||||||
|
holder.join(timeout=5)
|
||||||
|
|
||||||
|
|
||||||
|
def test_reentrant_same_thread_passes_through(tmp_path, monkeypatch):
|
||||||
|
"""Same thread re-acquiring the same palace lock must not deadlock or raise.
|
||||||
|
|
||||||
|
This is the invariant that makes ``ChromaCollection`` write methods (which
|
||||||
|
take ``mine_palace_lock`` for MCP/direct-writer protection) compose with
|
||||||
|
``miner.mine()`` (which already holds the lock for the entire mine
|
||||||
|
pipeline). Without the per-thread re-entrant guard the inner acquire
|
||||||
|
would self-deadlock on the outer flock.
|
||||||
|
"""
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
with mine_palace_lock(palace):
|
||||||
|
# Re-enter from the same thread — must yield without raising or hanging.
|
||||||
|
with mine_palace_lock(palace):
|
||||||
|
pass
|
||||||
|
# After the inner exits, the outer is still held: confirm via a
|
||||||
|
# subprocess that tries to acquire and reports back.
|
||||||
|
ctx = _get_mp_context()
|
||||||
|
result_q = ctx.Queue()
|
||||||
|
child = ctx.Process(target=_try_acquire_expect_busy, args=(palace, result_q))
|
||||||
|
child.start()
|
||||||
|
child.join(timeout=5)
|
||||||
|
assert (
|
||||||
|
result_q.get(timeout=1) == "busy"
|
||||||
|
), "outer lock should still be held by parent after inner re-entrant exit"
|
||||||
|
|
||||||
|
|
||||||
|
def _try_acquire_expect_busy(palace_path, result_q):
|
||||||
|
"""Helper: try to acquire, push 'busy' (raised) or 'free' (acquired) into queue."""
|
||||||
|
try:
|
||||||
|
with mine_palace_lock(palace_path):
|
||||||
|
result_q.put("free")
|
||||||
|
except MineAlreadyRunning:
|
||||||
|
result_q.put("busy")
|
||||||
|
|
||||||
|
|
||||||
def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch):
|
def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch):
|
||||||
|
|||||||
Reference in New Issue
Block a user