Files

322 lines
12 KiB
Python
Raw Permalink Normal View History

"""Tests for ChromaCollection's palace-write-lock integration.
Closes the gap left by ``mine_palace_lock`` only protecting the
``mempalace mine`` pipeline: MCP/direct writers that call
``ChromaCollection.add/upsert/update/delete`` must also serialize against
mine and against each other to avoid the multi-threaded HNSW corruption
documented in #974/#965.
Property tested:
* ``ChromaCollection(c, palace_path=p)`` wraps every write with
``mine_palace_lock(p)``.
* Writes raise ``MineAlreadyRunning`` when another holder owns the lock
(instead of silently racing into the underlying chromadb call).
* Re-entrant composition with ``miner.mine()`` does not self-deadlock:
``with mine_palace_lock(p): col.upsert(...)`` runs to completion.
* ``ChromaCollection(c)`` (no palace_path) preserves legacy no-lock
behaviour for tests/callers that build the adapter directly without
going through ``ChromaBackend``.
POSIX-only: ``mine_palace_lock`` uses ``fcntl`` on Unix and ``msvcrt`` on
Windows; the contention semantics differ enough that the cross-process
tests are skipped on Windows runners.
"""
from __future__ import annotations
import multiprocessing
import os
import time
import pytest
from mempalace.backends.chroma import ChromaCollection
from mempalace.palace import MineAlreadyRunning, mine_palace_lock
def _get_mp_context():
"""Same start-method picker as test_palace_locks.py."""
start_method = "spawn" if os.name == "nt" else "fork"
return multiprocessing.get_context(start_method)
# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------
class _FakeChromaCollection:
"""Records calls; never blocks. Stand-in for chromadb.Collection."""
def __init__(self):
self.adds: list[dict] = []
self.upserts: list[dict] = []
self.updates: list[dict] = []
self.deletes: list[dict] = []
def add(self, **kwargs):
self.adds.append(kwargs)
def upsert(self, **kwargs):
self.upserts.append(kwargs)
def update(self, **kwargs):
self.updates.append(kwargs)
def delete(self, **kwargs):
self.deletes.append(kwargs)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _hold_lock(palace_path: str, ready_flag: str, release_flag: str) -> int:
"""Acquire ``mine_palace_lock``, signal readiness, wait for release.
Mirrors the helper in ``test_palace_locks.py`` so the contention
semantics match across both test files.
"""
try:
with mine_palace_lock(palace_path):
open(ready_flag, "w").close()
for _ in range(500):
if os.path.exists(release_flag):
return 0
time.sleep(0.01)
return 0
except MineAlreadyRunning:
return 1
# ---------------------------------------------------------------------------
# Tests — opt-in lock wiring
# ---------------------------------------------------------------------------
def test_palace_path_none_skips_lock(tmp_path, monkeypatch):
"""Legacy callers (``ChromaCollection(c)``) keep no-lock behaviour.
A ``ChromaCollection`` built without ``palace_path`` must not touch the
lock infrastructure at all. This guards against regressions where a
test or third-party caller relies on the historical bare-write path.
"""
monkeypatch.setenv("HOME", str(tmp_path))
fake = _FakeChromaCollection()
col = ChromaCollection(fake) # no palace_path -> no lock
# Hold the lock in a child process. Without palace_path, the parent
# write must still succeed (the lock does not gate this caller).
palace = str(tmp_path / "palace")
ready = str(tmp_path / "ready")
release = str(tmp_path / "release")
ctx = _get_mp_context()
holder = ctx.Process(target=_hold_lock, args=(palace, ready, release))
holder.start()
try:
for _ in range(500):
if os.path.exists(ready):
break
time.sleep(0.01)
assert os.path.exists(ready), "holder failed to acquire lock"
col.upsert(documents=["doc"], ids=["id-1"])
assert fake.upserts == [{"documents": ["doc"], "ids": ["id-1"]}]
finally:
open(release, "w").close()
holder.join(timeout=5)
def test_writer_blocks_during_mine(tmp_path, monkeypatch):
"""A held ``mine_palace_lock`` causes ``ChromaCollection`` writes to raise.
This is the property that closes the MCP-bypass gap: when a mine is in
flight, MCP/direct writes raise ``MineAlreadyRunning`` rather than
silently entering chromadb's write path concurrent with mine.
"""
monkeypatch.setenv("HOME", str(tmp_path))
palace = str(tmp_path / "palace")
ready = str(tmp_path / "ready")
release = str(tmp_path / "release")
ctx = _get_mp_context()
holder = ctx.Process(target=_hold_lock, args=(palace, ready, release))
holder.start()
try:
for _ in range(500):
if os.path.exists(ready):
break
time.sleep(0.01)
assert os.path.exists(ready), "holder failed to acquire lock"
fake = _FakeChromaCollection()
col = ChromaCollection(fake, palace_path=palace)
with pytest.raises(MineAlreadyRunning):
col.upsert(documents=["doc"], ids=["id-1"])
with pytest.raises(MineAlreadyRunning):
col.add(documents=["doc"], ids=["id-2"])
with pytest.raises(MineAlreadyRunning):
col.update(ids=["id-3"], documents=["doc"])
with pytest.raises(MineAlreadyRunning):
col.delete(ids=["id-4"])
# The fake must have received NO calls — the lock must gate
# before reaching the underlying chromadb layer.
assert fake.upserts == []
assert fake.adds == []
assert fake.updates == []
assert fake.deletes == []
finally:
open(release, "w").close()
holder.join(timeout=5)
def test_reentrant_inside_mine_passes_through(tmp_path, monkeypatch):
"""``ChromaCollection.upsert`` inside ``mine_palace_lock`` does not deadlock.
``miner.mine()`` already holds ``mine_palace_lock(palace_path)`` for the
full mine pipeline; ``_mine_body`` then calls
``collection.upsert(...)``. With the per-thread re-entrant guard in
``mine_palace_lock``, the inner acquire is a pass-through and the
underlying chromadb call runs immediately.
"""
monkeypatch.setenv("HOME", str(tmp_path))
palace = str(tmp_path / "palace")
fake = _FakeChromaCollection()
col = ChromaCollection(fake, palace_path=palace)
with mine_palace_lock(palace):
# If the re-entrant guard were missing, this would self-deadlock on
# the underlying flock. We rely on pytest-timeout (configured in
# pyproject.toml) to enforce this in CI; the assertion just confirms
# the call landed.
col.upsert(documents=["d"], ids=["i"], metadatas=[{"k": "v"}])
col.add(documents=["d2"], ids=["i2"])
col.update(ids=["i"], documents=["d-updated"])
col.delete(ids=["i2"])
assert len(fake.upserts) == 1
assert len(fake.adds) == 1
assert len(fake.updates) == 1
assert len(fake.deletes) == 1
class _SlowFakeChromaCollection(_FakeChromaCollection):
"""Fake whose write methods hold the caller for ``hold_seconds``.
Used to keep ``mine_palace_lock`` acquired long enough for a sibling
process to contend deterministically.
"""
def __init__(self, hold_seconds: float = 0.3):
super().__init__()
self._hold = hold_seconds
def upsert(self, **kwargs):
time.sleep(self._hold)
super().upsert(**kwargs)
def _slow_writer_target(palace_path, tmp_path_str, pid, result_q):
"""Subprocess target: try a slow upsert, report ok/busy."""
os.environ["HOME"] = tmp_path_str
# Fresh import inside child so HOME monkeypatch routes the lock dir.
from mempalace.backends.chroma import ChromaCollection as _CC
from mempalace.palace import MineAlreadyRunning as _MAR
fake = _SlowFakeChromaCollection(hold_seconds=0.3)
col = _CC(fake, palace_path=palace_path)
try:
col.upsert(documents=[f"d{pid}"], ids=[f"i{pid}"])
result_q.put(("ok", pid))
except _MAR:
result_q.put(("busy", pid))
def test_concurrent_writers_serialize(tmp_path, monkeypatch):
"""Two processes calling ``ChromaCollection.upsert`` against the same
palace must be serialized: at most one enters chromadb at a time, the
other raises ``MineAlreadyRunning``.
This is the property that prevents the parallel HNSW insert race that
drives #974/#965 — under concurrent MCP write fan-out, exactly one
writer reaches chromadb and the rest fail loudly instead of corrupting
the index.
The slow fake holds the lock for 0.3s per writer, large enough for the
second process to contend even on slow CI runners.
"""
monkeypatch.setenv("HOME", str(tmp_path))
palace = str(tmp_path / "palace")
ctx = _get_mp_context()
result_q = ctx.Queue()
2026-05-06 01:47:46 -03:00
p1 = ctx.Process(target=_slow_writer_target, args=(palace, str(tmp_path), 1, result_q))
p2 = ctx.Process(target=_slow_writer_target, args=(palace, str(tmp_path), 2, result_q))
p1.start()
# Tiny stagger so p1 wins the race deterministically; without it the
# OS scheduler can pick either, which is also a valid outcome but
# makes the assertion brittle on slow CI.
time.sleep(0.05)
p2.start()
p1.join(timeout=5)
p2.join(timeout=5)
outcomes = [result_q.get(timeout=1) for _ in range(2)]
statuses = sorted(o[0] for o in outcomes)
2026-05-06 01:47:46 -03:00
assert statuses == ["busy", "ok"], f"expected one ok + one busy, got {outcomes}"
def test_read_path_does_not_acquire_lock(tmp_path, monkeypatch):
"""``query`` / ``get`` / ``count`` must not be gated by the write lock.
Read traffic is the dominant workload (semantic search, MCP get, etc.)
and serializing it against mine would tank latency for no correctness
benefit. This test pins that property: with another process holding
the write lock, reads must still complete instantly.
"""
monkeypatch.setenv("HOME", str(tmp_path))
palace = str(tmp_path / "palace")
ready = str(tmp_path / "ready")
release = str(tmp_path / "release")
ctx = _get_mp_context()
holder = ctx.Process(target=_hold_lock, args=(palace, ready, release))
holder.start()
try:
for _ in range(500):
if os.path.exists(ready):
break
time.sleep(0.01)
assert os.path.exists(ready), "holder failed to acquire lock"
# _FakeChromaCollection doesn't implement query/get/count; we only
# need to confirm the wrapper does not call into mine_palace_lock
# for reads, which we assert by observing the wrapped methods are
# NOT in ChromaCollection's _write_lock path. A direct check via
# source inspection is more honest than mocking the entire chroma
# surface here.
import inspect
from mempalace.backends.chroma import ChromaCollection as _CC
for write_attr in ("add", "upsert", "update", "delete"):
src = inspect.getsource(getattr(_CC, write_attr))
assert "_write_lock" in src, f"{write_attr} should acquire write lock"
for read_attr in ("query", "get", "count"):
method = getattr(_CC, read_attr, None)
if method is None:
continue
src = inspect.getsource(method)
2026-05-06 01:47:46 -03:00
assert (
"_write_lock" not in src
), f"{read_attr} must NOT acquire the write lock (read path)"
finally:
open(release, "w").close()
holder.join(timeout=5)