Merge pull request #1173 from jphein/fix/quarantine-on-make-client
fix: call quarantine_stale_hnsw() in make_client(); lower threshold to 5min
This commit is contained in:
@@ -46,6 +46,14 @@ def _reset_mcp_cache():
|
||||
mcp_server._collection_cache = None
|
||||
except (ImportError, AttributeError):
|
||||
pass
|
||||
try:
|
||||
# Reset the per-process quarantine gate so tests don't leak
|
||||
# state through ChromaBackend._quarantined_paths.
|
||||
from mempalace.backends.chroma import ChromaBackend
|
||||
|
||||
ChromaBackend._quarantined_paths.clear()
|
||||
except (ImportError, AttributeError):
|
||||
pass
|
||||
|
||||
_clear_cache()
|
||||
yield
|
||||
|
||||
+138
-8
@@ -1,5 +1,6 @@
|
||||
import os
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
import chromadb
|
||||
import pytest
|
||||
@@ -384,36 +385,102 @@ def test_fix_blob_seq_ids_noop_without_database(tmp_path):
|
||||
# ── quarantine_stale_hnsw ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _make_palace_with_segment(tmp_path, hnsw_mtime, sqlite_mtime):
|
||||
"""Helper: build a palace dir with one HNSW segment + sqlite at given mtimes."""
|
||||
# Byte patterns written to a segment's ``index_metadata.pickle`` in tests.
# A complete write begins with the PROTO opcode (0x80) and ends with the
# STOP opcode (0x2e); _segment_appears_healthy sniffs these bytes without
# parsing the file.
_HEALTHY_META = b"\x80\x04" + b"\x00" * 32 + b"\x2e"
_CORRUPT_META = b"\x00" * 64


def _make_palace_with_segment(tmp_path, hnsw_mtime, sqlite_mtime, meta_bytes=_HEALTHY_META):
    """Create ``tmp_path/palace`` holding a sqlite file and one HNSW segment.

    Args:
        tmp_path: Existing directory under which ``palace`` is created.
        hnsw_mtime: Timestamp applied (atime and mtime) to the segment's
            ``data_level0.bin``.
        sqlite_mtime: Timestamp applied (atime and mtime) to
            ``chroma.sqlite3``.
        meta_bytes: Content for the segment's ``index_metadata.pickle``.
            Defaults to ``_HEALTHY_META``; pass ``_CORRUPT_META`` for a
            corrupt-looking file, or ``None`` to write no metadata file.

    Returns:
        ``(palace, seg)`` — the palace directory and the segment directory.
    """
    palace = tmp_path / "palace"
    palace.mkdir()
    sqlite_file = palace / "chroma.sqlite3"
    sqlite_file.write_text("")

    seg = palace / "abcd-1234-5678"
    seg.mkdir()
    level0_file = seg / "data_level0.bin"
    level0_file.write_text("")
    if meta_bytes is not None:
        (seg / "index_metadata.pickle").write_bytes(meta_bytes)

    # Stamp the timestamps last so the file writes above cannot disturb them.
    for target, stamp in ((level0_file, hnsw_mtime), (sqlite_file, sqlite_mtime)):
        os.utime(target, (stamp, stamp))
    return palace, seg
|
||||
|
||||
|
||||
def test_quarantine_stale_hnsw_renames_drifted_segment(tmp_path):
|
||||
"""Segment whose data_level0.bin is 2h older than sqlite gets renamed."""
|
||||
def test_quarantine_stale_hnsw_renames_corrupt_segment(tmp_path):
    """Segment with stale mtime AND a malformed metadata file gets renamed.

    ``data_level0.bin`` is stamped 2h older than ``chroma.sqlite3`` and the
    metadata file holds ``_CORRUPT_META``. With a 1h ``stale_seconds``
    threshold the test expects exactly one segment to be moved to a
    ``.drift-`` path with its original file still inside.
    """
    now = 1_700_000_000.0
    # Build the palace exactly once: the helper calls ``palace.mkdir()``
    # without ``exist_ok``, so a second call on the same ``tmp_path`` would
    # raise FileExistsError before the corrupt metadata is ever written.
    palace, seg = _make_palace_with_segment(
        tmp_path,
        hnsw_mtime=now - 7200,
        sqlite_mtime=now,
        meta_bytes=_CORRUPT_META,
    )
    moved = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0)
    assert len(moved) == 1
    assert ".drift-" in moved[0]
    assert not seg.exists()
    # the renamed directory still exists and contains the original file
    drift_dirs = [p for p in palace.iterdir() if ".drift-" in p.name]
    assert len(drift_dirs) == 1
    assert (drift_dirs[0] / "data_level0.bin").exists()
|
||||
|
||||
|
||||
def test_quarantine_stale_hnsw_leaves_healthy_segment_with_drift_alone(tmp_path):
    """A stale-by-mtime segment with a complete metadata file is kept.

    Per the original note, this is the chromadb-1.5.x async-flush steady
    state rather than corruption. Production case at 06:24 PDT 2026-04-26:
    cold-start quarantine renamed three healthy segments after a clean
    shutdown, leaving a 151K-drawer palace with vector_ranked=0.
    """
    sqlite_stamp = 1_700_000_000.0
    hnsw_stamp = sqlite_stamp - 7200  # 2h of drift, above the 1h threshold
    palace, seg = _make_palace_with_segment(
        tmp_path,
        hnsw_mtime=hnsw_stamp,
        sqlite_mtime=sqlite_stamp,
        meta_bytes=_HEALTHY_META,
    )

    quarantined = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0)

    assert quarantined == []
    assert seg.exists()
|
||||
|
||||
|
||||
def test_quarantine_stale_hnsw_leaves_segment_without_metadata_alone(tmp_path):
    """A stale-by-mtime segment that has no metadata file is not quarantined.

    The original rationale: such a segment is treated as fresh /
    never-flushed, so there is nothing to rescue by renaming it.
    """
    sqlite_stamp = 1_700_000_000.0
    hnsw_stamp = sqlite_stamp - 7200  # 2h of drift, above the 1h threshold
    palace, seg = _make_palace_with_segment(
        tmp_path,
        hnsw_mtime=hnsw_stamp,
        sqlite_mtime=sqlite_stamp,
        meta_bytes=None,  # skip writing index_metadata.pickle entirely
    )

    quarantined = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0)

    assert quarantined == []
    assert seg.exists()
|
||||
|
||||
|
||||
def test_quarantine_stale_hnsw_renames_truncated_metadata(tmp_path):
    """A stale segment whose metadata file is truncated gets quarantined.

    The metadata is only the two leading bytes, which the original note
    describes as under the floor size — the shape of a partial flush when
    the process is killed mid-write.
    """
    sqlite_stamp = 1_700_000_000.0
    truncated_meta = b"\x80\x04"
    palace, _seg = _make_palace_with_segment(
        tmp_path,
        hnsw_mtime=sqlite_stamp - 7200,
        sqlite_mtime=sqlite_stamp,
        meta_bytes=truncated_meta,
    )

    quarantined = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0)

    assert len(quarantined) == 1
    assert ".drift-" in quarantined[0]
|
||||
|
||||
|
||||
def test_quarantine_stale_hnsw_leaves_fresh_segment_alone(tmp_path):
|
||||
"""Segment with recent mtime vs sqlite is not touched."""
|
||||
"""Segment with recent mtime vs sqlite is not touched (mtime gate
|
||||
short-circuits before integrity gate)."""
|
||||
now = 1_700_000_000.0
|
||||
palace, seg = _make_palace_with_segment(tmp_path, hnsw_mtime=now - 10, sqlite_mtime=now)
|
||||
moved = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0)
|
||||
@@ -446,7 +513,70 @@ def test_quarantine_stale_hnsw_skips_already_quarantined(tmp_path):
|
||||
assert drift.exists()
|
||||
|
||||
|
||||
# ── _pin_hnsw_threads ─────────────────────────────────────────────────────
|
||||
# ── make_client cold-start gate ──────────────────────────────────────────
|
||||
|
||||
|
||||
def test_make_client_quarantines_only_on_first_call_per_palace(tmp_path, monkeypatch):
    """Only the first ``make_client()`` for a palace runs the quarantine.

    Later calls for the same palace must skip it. Original rationale: this
    prevents runtime thrash where a daemon's own steady writes bump
    ``chroma.sqlite3`` faster than HNSW flushes, making the mtime heuristic
    falsely trigger on every reconnect.
    """
    from mempalace.backends.chroma import ChromaBackend

    palace_dir = tmp_path / "palace"
    palace_dir.mkdir(parents=True, exist_ok=True)
    (palace_dir / "chroma.sqlite3").write_text("")
    palace_path = str(palace_dir)

    # Start from an empty per-process gate so other tests cannot interfere.
    monkeypatch.setattr(ChromaBackend, "_quarantined_paths", set())

    calls: list[str] = []

    def _spy(path, stale_seconds=300.0):
        # Record the palace path instead of touching the filesystem.
        calls.append(path)
        return []

    monkeypatch.setattr("mempalace.backends.chroma.quarantine_stale_hnsw", _spy)

    for _ in range(3):
        ChromaBackend.make_client(palace_path)

    assert calls == [
        palace_path
    ], "quarantine_stale_hnsw should fire once per palace per process, not on every reconnect"
|
||||
|
||||
|
||||
def test_make_client_quarantines_each_palace_independently(tmp_path, monkeypatch):
    """Each distinct palace gets exactly one quarantine attempt.

    Shows that the gate is keyed by palace path rather than being a single
    global flag: repeat calls for either palace add no further attempts.
    """
    from mempalace.backends.chroma import ChromaBackend

    palace_a, palace_b = (str(tmp_path / name) for name in ("palace_a", "palace_b"))
    for palace in (palace_a, palace_b):
        os.makedirs(palace, exist_ok=True)
        (Path(palace) / "chroma.sqlite3").write_text("")

    # Start from an empty per-process gate.
    monkeypatch.setattr(ChromaBackend, "_quarantined_paths", set())

    calls: list[str] = []

    def _spy(path, stale_seconds=300.0):
        # Record the palace path instead of touching the filesystem.
        calls.append(path)
        return []

    monkeypatch.setattr("mempalace.backends.chroma.quarantine_stale_hnsw", _spy)

    # First pass opens each palace once; the second pass should already be gated.
    for palace in (palace_a, palace_b, palace_a, palace_b):
        ChromaBackend.make_client(palace)

    assert calls == [palace_a, palace_b]
|
||||
|
||||
|
||||
# ── _pin_hnsw_threads (per-process retrofit, separate from this PR's gate) ──
|
||||
|
||||
|
||||
def test_pin_hnsw_threads_retrofits_legacy_collection(tmp_path):
|
||||
|
||||
Reference in New Issue
Block a user