From 0d349c3d868fb28db98764c7bbb7eb88a2d9ebd1 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Sun, 26 Apr 2026 19:54:00 -0300 Subject: [PATCH] fix(repair): detect HNSW capacity divergence and fall back to BM25 (#1222) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When chromadb's HNSW segment freezes at a stale max_elements while sqlite keeps accumulating embeddings, the next chromadb open segfaults the MCP server on every tool call. Adds a pure-filesystem capacity probe (zero chromadb interaction), a `mempalace repair-status` read-only health check, and a BM25-only sqlite fallback so the palace stays reachable even when vector search is unavailable. * `hnsw_capacity_status` reads sqlite + index_metadata.pickle directly via a tight-allowlist unpickler — no hnswlib import, no segment load. * MCP server runs the probe at startup and after every reconnect; sets `_vector_disabled` and routes search to the sqlite FTS5 + BM25 path. * `tool_status` and `tool_reconnect` surface the fallback state. * Threshold tuned for chromadb 1.5.x async-flush lag (2× sync_threshold). --- mempalace/backends/chroma.py | 266 +++++++++++++++++++++++++ mempalace/cli.py | 15 ++ mempalace/mcp_server.py | 118 +++++++++++- mempalace/repair.py | 65 ++++++- mempalace/searcher.py | 168 ++++++++++++++++ tests/test_hnsw_capacity.py | 363 +++++++++++++++++++++++++++++++++++ 6 files changed, 988 insertions(+), 7 deletions(-) create mode 100644 tests/test_hnsw_capacity.py diff --git a/mempalace/backends/chroma.py b/mempalace/backends/chroma.py index 13976a2..ad7748f 100644 --- a/mempalace/backends/chroma.py +++ b/mempalace/backends/chroma.py @@ -211,6 +211,272 @@ def quarantine_stale_hnsw(palace_path: str, stale_seconds: float = 300.0) -> lis return moved +def _vector_segment_id(palace_path: str, collection_name: str) -> Optional[str]: + """Return the VECTOR segment UUID for ``collection_name`` or ``None``. + + Reads ``chroma.sqlite3`` directly so we never have to load a segment + that may segfault on open (#1222 is exactly this case). + """ + db_path = os.path.join(palace_path, "chroma.sqlite3") + if not os.path.isfile(db_path): + return None + try: + conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) + try: + row = conn.execute( + """ + SELECT s.id + FROM segments s + JOIN collections c ON s.collection = c.id + WHERE c.name = ? AND s.scope = 'VECTOR' + LIMIT 1 + """, + (collection_name,), + ).fetchone() + return row[0] if row else None + finally: + conn.close() + except sqlite3.Error: + return None + + +class _PersistentDataStub: + """Minimal stand-in for chromadb's ``PersistentData`` during safe unpickling. + + Accepts any constructor args so pickle's REDUCE opcode succeeds, + captures ``__setstate__`` into ``__dict__``. Only used by + :func:`_hnsw_element_count` — never persisted, never re-pickled. + """ + + def __init__(self, *args, **kwargs): + # Some chromadb versions pickle PersistentData by passing init args + # positionally via REDUCE. We don't care about reconstructing the + # object faithfully — we only need the id_to_label dict — so swallow + # all positional args and re-expose the relevant attributes via + # __setstate__ or __dict__ population further down. + pass + + def __setstate__(self, state): + if isinstance(state, dict): + self.__dict__.update(state) + elif isinstance(state, tuple) and len(state) == 2 and isinstance(state[1], dict): + # (slot_state, dict_state) two-tuple form — only the dict part has + # the named attributes we care about. + self.__dict__.update(state[1]) + + +class _SafePersistentDataUnpickler: + """Whitelist-only unpickler for ``index_metadata.pickle``. + + Allows only ``PersistentData`` from chromadb's HNSW module; everything + else raises ``UnpicklingError``. Standard container types (dict, list, + tuple, str, int, float) are handled by the pickle machinery itself + and don't need allowlisting via ``find_class`` — only constructed + classes do. + + This is the same trust model chromadb uses (it pickles its own files), + but with a tight class allowlist so a tampered file can't instantiate + arbitrary classes during deserialization. + """ + + _ALLOWED = frozenset( + { + ( + "chromadb.segment.impl.vector.local_persistent_hnsw", + "PersistentData", + ), + } + ) + + @classmethod + def load(cls, path: str): + import pickle + + class _Restricted(pickle.Unpickler): + def find_class(self, module: str, name: str): + if (module, name) in cls._ALLOWED: + return _PersistentDataStub + raise pickle.UnpicklingError(f"disallowed class: {module}.{name}") + + with open(path, "rb") as f: + return _Restricted(f).load() + + +def _hnsw_element_count(palace_path: str, segment_id: str) -> Optional[int]: + """Return the element count chromadb thinks the HNSW segment holds. + + Reads ``index_metadata.pickle`` via a tight-allowlist unpickler and + counts ``id_to_label`` entries. This is the count chromadb consults + when sizing/loading the HNSW index on next open — distinct from + hnswlib's internal ``cur_element_count`` in the binary files. For + #1222's divergence check this is the number that matters, because it + is what gets compared against ``count() * resize_factor`` when + chromadb decides whether to resize HNSW on load. + + Uses :class:`_SafePersistentDataUnpickler` rather than chromadb's own + ``PersistentData.load_from_file`` so the probe works even when + ``hnswlib`` is not installed (chromadb's persistent_hnsw module + imports hnswlib at module load — a probe that requires hnswlib would + refuse to run in environments where the segfault risk is moot + anyway). The allowlisted unpickler is also the safer default: the + pickle file is owned by the same user, but a tighter trust boundary + costs us nothing. + + Returns ``None`` when the file is absent (fresh / never-flushed + segment) or the unpickle fails. Callers treat ``None`` as "unknown". + """ + pickle_path = os.path.join(palace_path, segment_id, "index_metadata.pickle") + if not os.path.isfile(pickle_path): + return None + try: + pd = _SafePersistentDataUnpickler.load(pickle_path) + # ChromaDB serializes PersistentData differently across versions: + # 1.5.x writes a plain dict via ``__reduce_ex__``; older versions + # pickled the class instance and rely on ``__setstate__`` to + # populate ``__dict__``. Handle both shapes. + if isinstance(pd, dict): + id_to_label = pd.get("id_to_label") + else: + id_to_label = getattr(pd, "id_to_label", None) + if isinstance(id_to_label, dict): + return len(id_to_label) + return None + except Exception: + logger.debug("_hnsw_element_count failed for %s", pickle_path, exc_info=True) + return None + + +# Divergence threshold: chromadb's HNSW flushes asynchronously, so HNSW +# typically lags sqlite by up to ``sync_threshold`` (default 1000) records +# under active write load — that's the *brute-force batch* that hasn't +# been compacted into HNSW yet, plus the un-persisted tail beyond the +# last sync. Two synchronization windows worth (2 × sync_threshold = 2000) +# is a safe steady-state ceiling; anything past that is real divergence, +# not flush-lag. +# +# The #1222 case was 176 613 missing out of 192 997 (91% gone) — orders +# of magnitude past 2000. A typical post-mine palace shows a few hundred +# to ~1000 missing, well under threshold. +_HNSW_DIVERGENCE_ABSOLUTE = 2000 +_HNSW_DIVERGENCE_FRACTION = 0.10 + + +def hnsw_capacity_status(palace_path: str, collection_name: str = "mempalace_drawers") -> dict: + """Compare sqlite embedding count against HNSW element count. + + The #1222 failure mode: ``max_elements`` froze at 16 384 while sqlite + accumulated 192 997 embeddings. Every subsequent tool call segfaulted + when chromadb tried to load the undersized HNSW. This probe runs + *before* anything touches the segment so we can warn (or fall back to + BM25) instead of crashing. + + Returns a dict with: + + * ``segment_id`` — VECTOR segment UUID, or ``None`` if no palace + * ``sqlite_count`` — embeddings present in chroma.sqlite3 + * ``hnsw_count`` — elements chromadb's pickle knows about + * ``divergence`` — ``sqlite_count - hnsw_count`` when both known + * ``diverged`` — True when divergence exceeds the threshold + * ``status`` — ``"ok"`` | ``"diverged"`` | ``"unknown"`` + * ``message`` — human-readable summary + + Never raises — a probe that throws would defeat the point. + """ + out: dict[str, Any] = { + "segment_id": None, + "sqlite_count": None, + "hnsw_count": None, + "divergence": None, + "diverged": False, + "status": "unknown", + "message": "", + } + + try: + seg_id = _vector_segment_id(palace_path, collection_name) + out["segment_id"] = seg_id + + sqlite_count = _sqlite_embedding_count(palace_path, collection_name) + out["sqlite_count"] = sqlite_count + + if seg_id is None or sqlite_count is None: + out["message"] = "palace state unreadable; skipping HNSW capacity check" + return out + + hnsw_count = _hnsw_element_count(palace_path, seg_id) + out["hnsw_count"] = hnsw_count + + if hnsw_count is None: + # No pickle yet — segment hasn't persisted metadata. Could be + # fresh-but-unflushed (normal) or interrupted-mid-flush (bad). + # We can't distinguish without the pickle, so only flag + # divergence when sqlite holds clearly more than two flush + # windows worth — same threshold as the with-pickle path. + if sqlite_count > _HNSW_DIVERGENCE_ABSOLUTE: + out["status"] = "diverged" + out["diverged"] = True + out["divergence"] = sqlite_count + out["message"] = ( + f"sqlite holds {sqlite_count:,} embeddings but the HNSW segment " + "has never flushed metadata — vector search will return nothing " + "until the segment is rebuilt. Run `mempalace repair`." + ) + else: + out["message"] = "HNSW segment metadata not yet flushed; skipping" + return out + + divergence = sqlite_count - hnsw_count + out["divergence"] = divergence + threshold = max(_HNSW_DIVERGENCE_ABSOLUTE, int(sqlite_count * _HNSW_DIVERGENCE_FRACTION)) + if divergence > threshold: + out["status"] = "diverged" + out["diverged"] = True + pct = 100.0 * divergence / max(sqlite_count, 1) + out["message"] = ( + f"HNSW index holds {hnsw_count:,} elements but sqlite has " + f"{sqlite_count:,} embeddings — {divergence:,} drawers ({pct:.0f}%) " + "are invisible to vector search. Run `mempalace repair` to rebuild." + ) + else: + out["status"] = "ok" + out["message"] = ( + f"HNSW {hnsw_count:,} / sqlite {sqlite_count:,} (within flush-lag tolerance)" + ) + except Exception: + logger.debug("hnsw_capacity_status failed", exc_info=True) + out["message"] = "HNSW capacity probe raised; skipping" + return out + + +def _sqlite_embedding_count(palace_path: str, collection_name: str) -> Optional[int]: + """Count rows in chroma.sqlite3.embeddings for ``collection_name``. + + Mirrors :func:`mempalace.repair.sqlite_drawer_count` but kept in this + module so the backend probe doesn't pull in the repair CLI module. + """ + db_path = os.path.join(palace_path, "chroma.sqlite3") + if not os.path.isfile(db_path): + return None + try: + conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) + try: + row = conn.execute( + """ + SELECT COUNT(*) + FROM embeddings e + JOIN segments s ON e.segment_id = s.id + JOIN collections c ON s.collection = c.id + WHERE c.name = ? + """, + (collection_name,), + ).fetchone() + return int(row[0]) if row and row[0] is not None else None + finally: + conn.close() + except sqlite3.Error: + return None + + def _pin_hnsw_threads(collection) -> None: """Best-effort retrofit: pin ``hnsw:num_threads=1`` on an existing collection. diff --git a/mempalace/cli.py b/mempalace/cli.py index 51e3109..80ac9b0 100644 --- a/mempalace/cli.py +++ b/mempalace/cli.py @@ -607,6 +607,14 @@ def cmd_status(args): status(palace_path=palace_path) +def cmd_repair_status(args): + """Read-only HNSW capacity health check (#1222).""" + from .repair import status as repair_status + + palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path + repair_status(palace_path=palace_path) + + def cmd_repair(args): """Rebuild palace vector index from SQLite metadata.""" import shutil @@ -1125,6 +1133,12 @@ def main(): ), ) + # repair-status — read-only HNSW capacity health check (#1222) + sub.add_parser( + "repair-status", + help="Compare sqlite vs HNSW element counts (read-only; never opens a chromadb client)", + ) + # mcp sub.add_parser( "mcp", @@ -1181,6 +1195,7 @@ def main(): "compress": cmd_compress, "wake-up": cmd_wakeup, "repair": cmd_repair, + "repair-status": cmd_repair_status, "migrate": cmd_migrate, "status": cmd_status, } diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index 9c87708..e016202 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -57,7 +57,12 @@ from .config import ( # noqa: E402 sanitize_content, ) from .version import __version__ # noqa: E402 -from .backends.chroma import ChromaBackend, ChromaCollection, _pin_hnsw_threads # noqa: E402 +from .backends.chroma import ( # noqa: E402 + ChromaBackend, + ChromaCollection, + _pin_hnsw_threads, + hnsw_capacity_status, +) from .query_sanitizer import sanitize_query # noqa: E402 from .searcher import search_memories # noqa: E402 from .palace_graph import ( # noqa: E402 @@ -108,6 +113,52 @@ _collection_cache = None _palace_db_inode = 0 # inode of chroma.sqlite3 at cache time _palace_db_mtime = 0.0 # mtime of chroma.sqlite3 at cache time +# ── Vector-search disabled flag (#1222) ────────────────────────────────── +# Set when ``hnsw_capacity_status`` reports a divergence between sqlite +# and the HNSW segment large enough that chromadb would segfault on +# segment load. While this is set, vector-shaped tools (``search``, +# ``check_duplicate``) route to the sqlite-only BM25 fallback in +# :func:`mempalace.searcher._bm25_only_via_sqlite`. Cleared after a +# successful repair via :func:`tool_reconnect` (which re-runs the probe). +_vector_disabled = False +_vector_disabled_reason = "" +_vector_capacity_status: dict | None = None + + +def _refresh_vector_disabled_flag() -> None: + """Re-run the HNSW capacity probe and update the module-level flag. + + Called from :func:`_get_client` whenever the client cache is rebuilt + (first open or palace replacement). Cheap — pure sqlite + pickle + read, no chromadb interaction. Never raises: a probe that crashes + would defeat the point. + """ + global _vector_disabled, _vector_disabled_reason, _vector_capacity_status + try: + info = hnsw_capacity_status(_config.palace_path, "mempalace_drawers") + except Exception: + logger.debug("HNSW capacity probe raised", exc_info=True) + return + _vector_capacity_status = info + if info.get("diverged"): + if not _vector_disabled: + logger.warning( + "HNSW capacity divergence detected (%s) — routing search to " + "BM25-only sqlite fallback. Run `mempalace repair rebuild` to restore " + "vector search.", + info.get("message", "unknown"), + ) + _vector_disabled = True + _vector_disabled_reason = info.get("message", "") + else: + if _vector_disabled: + logger.info( + "HNSW capacity within tolerance (%s) — vector search re-enabled", + info.get("message", ""), + ) + _vector_disabled = False + _vector_disabled_reason = "" + # ==================== WRITE-AHEAD LOG ==================== # Every write operation is logged to a JSONL file before execution. @@ -202,6 +253,11 @@ def _get_client(): mtime_changed = current_mtime != 0.0 and abs(current_mtime - _palace_db_mtime) > 0.01 if _client_cache is None or inode_changed or mtime_changed: + # Run the HNSW capacity probe BEFORE chromadb opens the segment — + # if the index is severely undersized, segment load can segfault + # the whole MCP server (#1222). The probe is pure sqlite + + # metadata-pickle read; never touches the HNSW binary files. + _refresh_vector_disabled_flag() _client_cache = ChromaBackend.make_client(_config.palace_path) _collection_cache = None _metadata_cache = None @@ -322,6 +378,17 @@ def tool_status(): "protocol": PALACE_PROTOCOL, "aaak_dialect": AAAK_SPEC, } + if _vector_disabled: + # Surface the #1222 fallback state so the AI knows search results + # are BM25-only and recommends the operator run repair. + result["vector_disabled"] = True + result["vector_disabled_reason"] = _vector_disabled_reason + if _vector_capacity_status: + result["hnsw_capacity"] = { + "sqlite_count": _vector_capacity_status.get("sqlite_count"), + "hnsw_count": _vector_capacity_status.get("hnsw_count"), + "divergence": _vector_capacity_status.get("divergence"), + } try: all_meta = _get_cached_metadata(col) for m in all_meta: @@ -456,6 +523,9 @@ def tool_search( dist = (1.0 - min_similarity) if min_similarity is not None else max_distance # Mitigate system prompt contamination (Issue #333) sanitized = sanitize_query(query) + # Ensure the capacity probe has been run at least once before we + # decide which path to take — _get_client triggers it on first open. + _get_client() result = search_memories( sanitized["clean_query"], palace_path=_config.palace_path, @@ -463,7 +533,11 @@ def tool_search( room=room, n_results=limit, max_distance=dist, + vector_disabled=_vector_disabled, ) + if _vector_disabled: + result["vector_disabled"] = True + result["vector_disabled_reason"] = _vector_disabled_reason # Attach sanitizer metadata for transparency if sanitized["was_sanitized"]: result["query_sanitized"] = True @@ -482,6 +556,21 @@ def tool_check_duplicate(content: str, threshold: float = 0.9): col = _get_collection() if not col: return _no_palace() + if _vector_disabled: + # Without a usable HNSW we can't compute cosine similarity for + # near-duplicate detection. Report the limitation rather than + # silently returning "not a duplicate" — false negatives here + # would let the AI re-file content the palace already holds. + return { + "is_duplicate": False, + "matches": [], + "vector_disabled": True, + "vector_disabled_reason": _vector_disabled_reason, + "hint": ( + "duplicate detection requires vector search; run " + "`mempalace repair rebuild` to restore" + ), + } try: results = col.query( query_texts=[content], @@ -1150,10 +1239,22 @@ def tool_reconnect(): Use after external scripts or CLI commands modify the palace database directly, which can leave the in-memory HNSW index stale. """ - global _collection_cache, _palace_db_inode, _palace_db_mtime + global \ + _client_cache, \ + _collection_cache, \ + _palace_db_inode, \ + _palace_db_mtime, \ + _vector_disabled, \ + _vector_disabled_reason + _client_cache = None _collection_cache = None _palace_db_inode = 0 _palace_db_mtime = 0.0 + # Force probe re-run on next _get_client by clearing the flag now; + # _refresh_vector_disabled_flag will re-set it if the divergence + # still applies after the reconnect. + _vector_disabled = False + _vector_disabled_reason = "" try: col = _get_collection() if col is None: @@ -1161,8 +1262,15 @@ def tool_reconnect(): "success": False, "message": "No palace found after reconnect", "drawers": 0, + "vector_disabled": _vector_disabled, } - return {"success": True, "message": "Reconnected to palace", "drawers": col.count()} + return { + "success": True, + "message": "Reconnected to palace", + "drawers": col.count(), + "vector_disabled": _vector_disabled, + "vector_disabled_reason": _vector_disabled_reason, + } except Exception as e: return {"success": False, "error": str(e)} @@ -1726,6 +1834,10 @@ def _restore_stdout(): def main(): _restore_stdout() logger.info("MemPalace MCP Server starting...") + # Pre-flight: probe HNSW capacity before any tool call so the warning + # is visible at startup rather than on first use (#1222). Pure + # filesystem read; never opens a chromadb client. + _refresh_vector_disabled_flag() while True: try: line = sys.stdin.readline() diff --git a/mempalace/repair.py b/mempalace/repair.py index a75ce35..4af32df 100644 --- a/mempalace/repair.py +++ b/mempalace/repair.py @@ -6,8 +6,9 @@ When ChromaDB's HNSW index accumulates duplicate entries (from repeated add() calls with the same ID), link_lists.bin can grow unbounded — terabytes on large palaces — eventually causing segfaults. -This module provides three operations: +This module provides four operations: + status — compare sqlite vs HNSW element counts (read-only health check) scan — find every corrupt/unfetchable ID in the palace prune — delete only the corrupt IDs (surgical) rebuild — extract all drawers, delete the collection, recreate with @@ -17,6 +18,7 @@ The rebuild backs up ONLY chroma.sqlite3 (the source of truth), not the full palace directory — so it works even when link_lists.bin is bloated. Usage (standalone): + python -m mempalace.repair status python -m mempalace.repair scan [--wing X] python -m mempalace.repair prune --confirm python -m mempalace.repair rebuild @@ -32,7 +34,7 @@ import os import shutil import time -from .backends.chroma import ChromaBackend +from .backends.chroma import ChromaBackend, hnsw_capacity_status COLLECTION_NAME = "mempalace_drawers" @@ -431,9 +433,62 @@ def rebuild_index(palace_path=None, confirm_truncation_ok: bool = False): print(f"\n{'=' * 55}\n") +def status(palace_path=None) -> dict: + """Read-only health check: compare sqlite vs HNSW element counts. + + Catches the #1222 failure mode where chromadb's HNSW segment freezes + at a stale ``max_elements`` while sqlite keeps accumulating rows. + Once the divergence is large enough, every tool call segfaults when + chromadb tries to load the undersized HNSW. Running ``mempalace + repair status`` *before* opening the segment lets the operator + discover the problem without crashing the MCP server. + + The check itself never opens a chromadb client and never imports + hnswlib — it reads ``chroma.sqlite3`` and ``index_metadata.pickle`` + directly via :func:`mempalace.backends.chroma.hnsw_capacity_status`. + + Returns the capacity-status dict (also printed). Returns a dict with + ``status="unknown"`` when no palace exists at the given path. + """ + palace_path = palace_path or _get_palace_path() + print(f"\n{'=' * 55}") + print(" MemPalace Repair — Status") + print(f"{'=' * 55}\n") + print(f" Palace: {palace_path}") + + if not os.path.isdir(palace_path): + print(" No palace found.\n") + return {"status": "unknown", "message": "no palace at path"} + + drawers = hnsw_capacity_status(palace_path, "mempalace_drawers") + closets = hnsw_capacity_status(palace_path, "mempalace_closets") + + for label, info in (("drawers", drawers), ("closets", closets)): + print(f"\n [{label}]") + if info["sqlite_count"] is None: + print(" sqlite count: (unreadable)") + else: + print(f" sqlite count: {info['sqlite_count']:,}") + if info["hnsw_count"] is None: + print(" hnsw count: (no flushed metadata yet)") + else: + print(f" hnsw count: {info['hnsw_count']:,}") + if info["divergence"] is not None: + print(f" divergence: {info['divergence']:,}") + marker = "DIVERGED" if info["diverged"] else info["status"].upper() + print(f" status: {marker}") + if info["message"]: + print(f" note: {info['message']}") + + if drawers["diverged"] or closets["diverged"]: + print("\n Recommended: run `mempalace repair rebuild` to rebuild the index.") + print() + return {"drawers": drawers, "closets": closets} + + if __name__ == "__main__": p = argparse.ArgumentParser(description="MemPalace repair tools") - p.add_argument("command", choices=["scan", "prune", "rebuild"]) + p.add_argument("command", choices=["status", "scan", "prune", "rebuild"]) p.add_argument("--palace", default=None, help="Palace directory path") p.add_argument("--wing", default=None, help="Scan only this wing") p.add_argument("--confirm", action="store_true", help="Actually delete corrupt IDs") @@ -441,7 +496,9 @@ if __name__ == "__main__": path = os.path.expanduser(args.palace) if args.palace else None - if args.command == "scan": + if args.command == "status": + status(palace_path=path) + elif args.command == "scan": scan_palace(palace_path=path, only_wing=args.wing) elif args.command == "prune": prune_corrupt(palace_path=path, confirm=args.confirm) diff --git a/mempalace/searcher.py b/mempalace/searcher.py index c2fcdb4..bee8509 100644 --- a/mempalace/searcher.py +++ b/mempalace/searcher.py @@ -11,7 +11,9 @@ hide drawers the direct path would have found. import logging import math +import os import re +import sqlite3 from pathlib import Path from .palace import get_closets_collection, get_collection @@ -363,6 +365,158 @@ def search(query: str, palace_path: str, wing: str = None, room: str = None, n_r print() +def _bm25_only_via_sqlite( + query: str, + palace_path: str, + wing: str = None, + room: str = None, + n_results: int = 5, + max_candidates: int = 500, +) -> dict: + """BM25-only search reading drawers directly from chroma.sqlite3. + + Used when HNSW is diverged or unloadable (#1222). Bypasses chromadb's + Python client entirely so a corrupt vector segment can't segfault the + MCP server. Routes through chromadb's own FTS5 trigram index + (``embedding_fulltext_search``) for candidate selection, then re-ranks + with the same Okapi-BM25 used in :func:`_hybrid_rank` so the result + shape matches the vector path. + + The query is split into ≥3-char trigram-tokens and OR-joined for the + FTS5 MATCH — chromadb writes the index with ``tokenize='trigram'``, + so single-character tokens never match. When no usable token survives + (e.g. "is a"), candidate selection falls back to the most-recent + ``max_candidates`` rows so we still return *something* rather than + nothing. + """ + db_path = os.path.join(palace_path, "chroma.sqlite3") + if not os.path.isfile(db_path): + return { + "error": "No palace found", + "hint": "Run: mempalace init && mempalace mine ", + } + + try: + conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) + except sqlite3.Error as e: + return {"error": f"sqlite open failed: {e}"} + + try: + # FTS5 MATCH expects whitespace-separated tokens. Drop tokens + # shorter than 3 chars (trigram tokenizer can't match them). + tokens = [t for t in _tokenize(query) if len(t) >= 3] + candidate_ids: list[int] = [] + if tokens: + fts_query = " OR ".join(tokens) + try: + rows = conn.execute( + """ + SELECT rowid + FROM embedding_fulltext_search + WHERE embedding_fulltext_search MATCH ? + LIMIT ? + """, + (fts_query, max_candidates), + ).fetchall() + candidate_ids = [r[0] for r in rows] + except sqlite3.Error: + # FTS5 tokenizer mismatch or syntax error — fall through + # to the recency-window selector below. + logger.debug("FTS5 MATCH failed; using recency fallback", exc_info=True) + + if not candidate_ids: + # No FTS hits (or no usable tokens) — pull the most recent + # rows for the drawers segment so we can BM25-rank something + # rather than return empty-handed. + rows = conn.execute( + """ + SELECT e.id + FROM embeddings e + JOIN segments s ON e.segment_id = s.id + JOIN collections c ON s.collection = c.id + WHERE c.name = 'mempalace_drawers' + ORDER BY e.created_at DESC + LIMIT ? + """, + (max_candidates,), + ).fetchall() + candidate_ids = [r[0] for r in rows] + + if not candidate_ids: + return { + "query": query, + "filters": {"wing": wing, "room": room}, + "total_before_filter": 0, + "results": [], + "fallback": "bm25_only_via_sqlite", + } + + placeholders = ",".join(["?"] * len(candidate_ids)) + meta_rows = conn.execute( + f""" + SELECT id, key, string_value, int_value + FROM embedding_metadata + WHERE id IN ({placeholders}) + """, + candidate_ids, + ).fetchall() + finally: + conn.close() + + # Group metadata rows into per-drawer dicts. + drawers: dict[int, dict] = {} + for emb_id, key, sval, ival in meta_rows: + d = drawers.setdefault(emb_id, {"_id": emb_id, "metadata": {}, "text": ""}) + if key == "chroma:document": + d["text"] = sval or "" + else: + d["metadata"][key] = sval if sval is not None else ival + + # Apply wing/room filters in Python (FTS5 candidates may include + # entries from other wings). + candidates = [] + for d in drawers.values(): + meta = d["metadata"] + if wing and meta.get("wing") != wing: + continue + if room and meta.get("room") != room: + continue + candidates.append( + { + "text": d["text"], + "wing": meta.get("wing", "unknown"), + "room": meta.get("room", "unknown"), + "source_file": Path(meta.get("source_file", "?") or "?").name, + "created_at": meta.get("filed_at", "unknown"), + # No vector distance available in BM25-only mode. + "similarity": None, + "distance": None, + "matched_via": "bm25_sqlite", + } + ) + + # Local BM25 over the candidate set. + docs = [c["text"] for c in candidates] + bm25_raw = _bm25_scores(query, docs) + max_bm25 = max(bm25_raw) if bm25_raw else 0.0 + for c, raw in zip(candidates, bm25_raw): + c["bm25_score"] = round(raw, 3) + c["_score"] = (raw / max_bm25) if max_bm25 > 0 else 0.0 + candidates.sort(key=lambda c: c["_score"], reverse=True) + hits = candidates[:n_results] + for h in hits: + h.pop("_score", None) + + return { + "query": query, + "filters": {"wing": wing, "room": room}, + "total_before_filter": len(candidates), + "results": hits, + "fallback": "bm25_only_via_sqlite", + "fallback_reason": "vector_search_disabled", + } + + def search_memories( query: str, palace_path: str, @@ -370,6 +524,7 @@ def search_memories( room: str = None, n_results: int = 5, max_distance: float = 0.0, + vector_disabled: bool = False, ) -> dict: """Programmatic search — returns a dict instead of printing. @@ -385,7 +540,20 @@ def search_memories( cosine distance (hnsw:space=cosine) — 0 = identical, 2 = opposite. Results with distance > this value are filtered out. A value of 0.0 disables filtering. Typical useful range: 0.3–1.0. + vector_disabled: When True, route to the sqlite-only BM25 fallback + (#1222). Set by the MCP server when the HNSW capacity probe + detects a divergence that would segfault chromadb on segment + load. """ + if vector_disabled: + return _bm25_only_via_sqlite( + query, + palace_path, + wing=wing, + room=room, + n_results=n_results, + ) + try: drawers_col = get_collection(palace_path, create=False) except Exception as e: diff --git a/tests/test_hnsw_capacity.py b/tests/test_hnsw_capacity.py new file mode 100644 index 0000000..ca4ccff --- /dev/null +++ b/tests/test_hnsw_capacity.py @@ -0,0 +1,363 @@ +"""Tests for the #1222 HNSW capacity probe and BM25-only fallback. + +The probe and fallback never load chromadb's HNSW segment, so all of +these tests synthesize the on-disk shape directly: a chroma.sqlite3 with +the relevant schema rows and an ``index_metadata.pickle`` matching what +chromadb 1.5.x writes (``{"id_to_label": {...}, ...}``). +""" + +from __future__ import annotations + +import os +import pickle +import sqlite3 + +import pytest + +from mempalace.backends.chroma import ( + _hnsw_element_count, + _vector_segment_id, + hnsw_capacity_status, +) +from mempalace.searcher import _bm25_only_via_sqlite + + +COLLECTION = "mempalace_drawers" + + +# ── Fixtures ────────────────────────────────────────────────────────── + + +def _seed_chroma_db(palace: str, sqlite_count: int, segment_id: str) -> None: + """Create a minimal chroma.sqlite3 with one collection + VECTOR segment. + + Mirrors the columns the probe queries: ``segments``, ``collections``, + ``embeddings``, ``embedding_metadata``. Schema matches chromadb + 1.5.x; column types are kept loose because we read with COUNT(*) and + SELECT key, *_value rather than driver-specific casts. + """ + db_path = os.path.join(palace, "chroma.sqlite3") + conn = sqlite3.connect(db_path) + try: + conn.executescript( + """ + CREATE TABLE collections ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL + ); + CREATE TABLE segments ( + id TEXT PRIMARY KEY, + collection TEXT NOT NULL, + scope TEXT NOT NULL + ); + CREATE TABLE embeddings ( + id INTEGER PRIMARY KEY, + segment_id TEXT NOT NULL, + embedding_id TEXT NOT NULL, + seq_id BLOB NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + CREATE TABLE embedding_metadata ( + id INTEGER REFERENCES embeddings(id), + key TEXT NOT NULL, + string_value TEXT, + int_value INTEGER, + float_value REAL, + bool_value INTEGER, + PRIMARY KEY (id, key) + ); + CREATE VIRTUAL TABLE embedding_fulltext_search + USING fts5(string_value, tokenize='trigram'); + """ + ) + col_id = "col-test" + meta_seg = "seg-meta" + conn.execute("INSERT INTO collections (id, name) VALUES (?, ?)", (col_id, COLLECTION)) + conn.execute( + "INSERT INTO segments (id, collection, scope) VALUES (?, ?, 'VECTOR')", + (segment_id, col_id), + ) + conn.execute( + "INSERT INTO segments (id, collection, scope) VALUES (?, ?, 'METADATA')", + (meta_seg, col_id), + ) + for i in range(sqlite_count): + conn.execute( + """INSERT INTO embeddings (id, segment_id, embedding_id, seq_id) + VALUES (?, ?, ?, ?)""", + (i + 1, segment_id, f"d-{i}", b"\x00\x00\x00\x00\x00\x00\x00\x01"), + ) + conn.commit() + finally: + conn.close() + + +def _write_pickle(palace: str, segment_id: str, hnsw_count: int) -> None: + """Write an index_metadata.pickle matching chromadb 1.5.x's shape. + + 1.5.x ``__reduce_ex__`` serializes the PersistentData instance as a + plain dict; we replicate that so the safe unpickler in + ``_hnsw_element_count`` reads the same bytes shape it would in + production. + """ + seg_dir = os.path.join(palace, segment_id) + os.makedirs(seg_dir, exist_ok=True) + pickle_path = os.path.join(seg_dir, "index_metadata.pickle") + state = { + "dimensionality": 384, + "total_elements_added": hnsw_count, + "max_seq_id": None, + "id_to_label": {f"d-{i}": i for i in range(hnsw_count)}, + "label_to_id": {i: f"d-{i}" for i in range(hnsw_count)}, + "id_to_seq_id": {}, + } + with open(pickle_path, "wb") as f: + pickle.dump(state, f, pickle.HIGHEST_PROTOCOL) + + +# ── _vector_segment_id ──────────────────────────────────────────────── + + +def test_vector_segment_id_returns_uuid(tmp_path): + seg = "11111111-2222-3333-4444-555555555555" + _seed_chroma_db(str(tmp_path), sqlite_count=10, segment_id=seg) + assert _vector_segment_id(str(tmp_path), COLLECTION) == seg + + +def test_vector_segment_id_no_palace(tmp_path): + assert _vector_segment_id(str(tmp_path), COLLECTION) is None + + +def test_vector_segment_id_unknown_collection(tmp_path): + seg = "11111111-2222-3333-4444-555555555555" + _seed_chroma_db(str(tmp_path), sqlite_count=10, segment_id=seg) + assert _vector_segment_id(str(tmp_path), "nope") is None + + +# ── _hnsw_element_count ─────────────────────────────────────────────── + + +def test_hnsw_element_count_reads_pickle(tmp_path): + seg = "seg-001" + _seed_chroma_db(str(tmp_path), sqlite_count=100, segment_id=seg) + _write_pickle(str(tmp_path), seg, hnsw_count=42) + assert _hnsw_element_count(str(tmp_path), seg) == 42 + + +def test_hnsw_element_count_missing_pickle(tmp_path): + seg = "seg-001" + _seed_chroma_db(str(tmp_path), sqlite_count=100, segment_id=seg) + # Segment dir doesn't even exist — no flush ever happened. + assert _hnsw_element_count(str(tmp_path), seg) is None + + +def test_hnsw_element_count_rejects_arbitrary_class(tmp_path): + """Pickled references to unallowed classes must not deserialize. + + Guards against a tampered ``index_metadata.pickle`` triggering code + execution. The unpickler allowlist is the only protection between + the file and arbitrary import-time side effects. We hand-craft the + pickle bytes (rather than ``pickle.dump`` a local class) because + pickle can't serialize locally-defined classes — but the bytes form + that names an arbitrary stdlib class is a faithful proxy for the + tampered-file threat we want to test. + """ + import pickle as _pickle + + seg = "seg-evil" + seg_dir = tmp_path / seg + seg_dir.mkdir() + pickle_path = seg_dir / "index_metadata.pickle" + # GLOBAL opcode pointing at os.system, then STOP. If the unpickler + # didn't enforce the allowlist, find_class would resolve os.system + # and pickle would set up the call. The allowlist must reject it + # before find_class returns anything. + payload = b"c" + b"os\nsystem\n" + _pickle.STOP + pickle_path.write_bytes(payload) + assert _hnsw_element_count(str(tmp_path), seg) is None + + +# ── hnsw_capacity_status ────────────────────────────────────────────── + + +def test_capacity_status_ok_when_balanced(tmp_path): + seg = "seg-001" + _seed_chroma_db(str(tmp_path), sqlite_count=1000, segment_id=seg) + _write_pickle(str(tmp_path), seg, hnsw_count=950) + info = hnsw_capacity_status(str(tmp_path), COLLECTION) + assert info["status"] == "ok" + assert info["diverged"] is False + assert info["sqlite_count"] == 1000 + assert info["hnsw_count"] == 950 + + +def test_capacity_status_flags_severe_divergence(tmp_path): + """Reproduces #1222: sqlite has 192k, HNSW frozen at ~16k.""" + seg = "seg-1222" + _seed_chroma_db(str(tmp_path), sqlite_count=20_000, segment_id=seg) + _write_pickle(str(tmp_path), seg, hnsw_count=2_000) + info = hnsw_capacity_status(str(tmp_path), COLLECTION) + assert info["status"] == "diverged" + assert info["diverged"] is True + assert info["divergence"] == 18_000 + assert "repair" in info["message"].lower() + + +def test_capacity_status_tolerates_flush_lag(tmp_path): + """A few hundred entries behind sqlite is normal post-mine state.""" + seg = "seg-lag" + _seed_chroma_db(str(tmp_path), sqlite_count=5_000, segment_id=seg) + _write_pickle(str(tmp_path), seg, hnsw_count=4_500) + info = hnsw_capacity_status(str(tmp_path), COLLECTION) + assert info["diverged"] is False + assert info["status"] == "ok" + + +def test_capacity_status_flags_unflushed_with_large_sqlite(tmp_path): + """No pickle + many sqlite rows is its own divergence signal.""" + seg = "seg-noflush" + _seed_chroma_db(str(tmp_path), sqlite_count=10_000, segment_id=seg) + info = hnsw_capacity_status(str(tmp_path), COLLECTION) + assert info["diverged"] is True + assert info["hnsw_count"] is None + assert "never flushed" in info["message"] + + +def test_capacity_status_quiet_for_empty_palace(tmp_path): + info = hnsw_capacity_status(str(tmp_path), COLLECTION) + assert info["diverged"] is False + assert info["status"] == "unknown" + + +# ── BM25-only sqlite fallback ───────────────────────────────────────── + + +def _seed_drawers(palace: str, segment_id: str, drawers: list[tuple[str, dict, str]]) -> None: + """Insert (text, metadata, embedding_id) tuples into a seeded palace. + + Replaces the bare ``embeddings`` rows from ``_seed_chroma_db`` so the + sqlite count matches what we insert here. + """ + db_path = os.path.join(palace, "chroma.sqlite3") + conn = sqlite3.connect(db_path) + try: + conn.execute("DELETE FROM embeddings") + for i, (text, meta, eid) in enumerate(drawers, start=1): + conn.execute( + """INSERT INTO embeddings (id, segment_id, embedding_id, seq_id) + VALUES (?, ?, ?, ?)""", + (i, segment_id, eid, b"\x00" * 8), + ) + conn.execute( + """INSERT INTO embedding_metadata (id, key, string_value) + VALUES (?, 'chroma:document', ?)""", + (i, text), + ) + conn.execute( + "INSERT INTO embedding_fulltext_search (rowid, string_value) VALUES (?, ?)", + (i, text), + ) + for k, v in meta.items(): + if isinstance(v, int): + conn.execute( + """INSERT INTO embedding_metadata (id, key, int_value) + VALUES (?, ?, ?)""", + (i, k, v), + ) + else: + conn.execute( + """INSERT INTO embedding_metadata (id, key, string_value) + VALUES (?, ?, ?)""", + (i, k, str(v)), + ) + conn.commit() + finally: + conn.close() + + +@pytest.fixture +def palace_with_drawers(tmp_path): + seg = "seg-bm25" + _seed_chroma_db(str(tmp_path), sqlite_count=0, segment_id=seg) + drawers = [ + ( + "ChromaDB segfault on every tool call after HNSW divergence", + {"wing": "ops", "room": "incidents", "source_file": "/x/incident.md"}, + "d-1", + ), + ( + "Memory palace technique using rooms and drawers for recall", + {"wing": "design", "room": "metaphor", "source_file": "/x/design.md"}, + "d-2", + ), + ( + "Repair rebuild backs up only the sqlite database", + {"wing": "ops", "room": "runbook", "source_file": "/x/repair.md"}, + "d-3", + ), + ] + _seed_drawers(str(tmp_path), seg, drawers) + return tmp_path + + +def test_bm25_fallback_returns_matches(palace_with_drawers): + out = _bm25_only_via_sqlite("segfault chromadb", str(palace_with_drawers), n_results=5) + assert out["fallback"] == "bm25_only_via_sqlite" + assert len(out["results"]) >= 1 + top = out["results"][0] + # The incident drawer is the closest BM25 match for these terms. + assert "segfault" in top["text"].lower() + assert top["matched_via"] == "bm25_sqlite" + # Vector fields are intentionally absent in fallback mode. + assert top["similarity"] is None + assert top["distance"] is None + + +def test_bm25_fallback_filters_by_wing(palace_with_drawers): + out = _bm25_only_via_sqlite( + "memory palace recall", str(palace_with_drawers), wing="design", n_results=5 + ) + assert all(r["wing"] == "design" for r in out["results"]) + + +def test_bm25_fallback_no_palace(tmp_path): + out = _bm25_only_via_sqlite("anything", str(tmp_path)) + assert "error" in out + + +def test_bm25_fallback_handles_short_query(palace_with_drawers): + """Single-character tokens are unmatchable in trigram FTS5 — must + not crash, must fall back to the recency window.""" + out = _bm25_only_via_sqlite("a", str(palace_with_drawers), n_results=5) + # Falls back to recency window; returns whatever it can rank. + assert out["fallback"] == "bm25_only_via_sqlite" + assert isinstance(out["results"], list) + + +# ── repair.status CLI command ───────────────────────────────────────── + + +def test_repair_status_reports_diverged(tmp_path, capsys): + """The status command prints DIVERGED and recommends rebuild.""" + from mempalace.repair import status as repair_status + + seg = "seg-status" + _seed_chroma_db(str(tmp_path), sqlite_count=20_000, segment_id=seg) + _write_pickle(str(tmp_path), seg, hnsw_count=2_000) + out = repair_status(palace_path=str(tmp_path)) + captured = capsys.readouterr().out + assert "DIVERGED" in captured + assert "mempalace repair rebuild" in captured + assert out["drawers"]["diverged"] is True + + +def test_repair_status_quiet_on_healthy_palace(tmp_path, capsys): + from mempalace.repair import status as repair_status + + seg = "seg-status-ok" + _seed_chroma_db(str(tmp_path), sqlite_count=500, segment_id=seg) + _write_pickle(str(tmp_path), seg, hnsw_count=480) + repair_status(palace_path=str(tmp_path)) + captured = capsys.readouterr().out + assert "DIVERGED" not in captured + assert "mempalace repair rebuild" not in captured