Merge pull request #1227 from MemPalace/fix/hnsw-capacity-divergence-1222

fix(repair): detect HNSW capacity divergence and fall back to BM25 (#1222)
This commit is contained in:
Igor Lins e Silva
2026-04-26 21:58:57 -03:00
committed by GitHub
6 changed files with 1118 additions and 8 deletions
+266
View File
@@ -211,6 +211,272 @@ def quarantine_stale_hnsw(palace_path: str, stale_seconds: float = 300.0) -> lis
return moved
def _vector_segment_id(palace_path: str, collection_name: str) -> Optional[str]:
"""Return the VECTOR segment UUID for ``collection_name`` or ``None``.
Reads ``chroma.sqlite3`` directly so we never have to load a segment
that may segfault on open (#1222 is exactly this case).
"""
db_path = os.path.join(palace_path, "chroma.sqlite3")
if not os.path.isfile(db_path):
return None
try:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
try:
row = conn.execute(
"""
SELECT s.id
FROM segments s
JOIN collections c ON s.collection = c.id
WHERE c.name = ? AND s.scope = 'VECTOR'
LIMIT 1
""",
(collection_name,),
).fetchone()
return row[0] if row else None
finally:
conn.close()
except sqlite3.Error:
return None
class _PersistentDataStub:
"""Minimal stand-in for chromadb's ``PersistentData`` during safe unpickling.
Accepts any constructor args so pickle's REDUCE opcode succeeds,
captures ``__setstate__`` into ``__dict__``. Only used by
:func:`_hnsw_element_count` — never persisted, never re-pickled.
"""
def __init__(self, *args, **kwargs):
# Some chromadb versions pickle PersistentData by passing init args
# positionally via REDUCE. We don't care about reconstructing the
# object faithfully — we only need the id_to_label dict — so swallow
# all positional args and re-expose the relevant attributes via
# __setstate__ or __dict__ population further down.
pass
def __setstate__(self, state):
if isinstance(state, dict):
self.__dict__.update(state)
elif isinstance(state, tuple) and len(state) == 2 and isinstance(state[1], dict):
# (slot_state, dict_state) two-tuple form — only the dict part has
# the named attributes we care about.
self.__dict__.update(state[1])
class _SafePersistentDataUnpickler:
"""Whitelist-only unpickler for ``index_metadata.pickle``.
Allows only ``PersistentData`` from chromadb's HNSW module; everything
else raises ``UnpicklingError``. Standard container types (dict, list,
tuple, str, int, float) are handled by the pickle machinery itself
and don't need allowlisting via ``find_class`` — only constructed
classes do.
This is the same trust model chromadb uses (it pickles its own files),
but with a tight class allowlist so a tampered file can't instantiate
arbitrary classes during deserialization.
"""
_ALLOWED = frozenset(
{
(
"chromadb.segment.impl.vector.local_persistent_hnsw",
"PersistentData",
),
}
)
@classmethod
def load(cls, path: str):
import pickle
class _Restricted(pickle.Unpickler):
def find_class(self, module: str, name: str):
if (module, name) in cls._ALLOWED:
return _PersistentDataStub
raise pickle.UnpicklingError(f"disallowed class: {module}.{name}")
with open(path, "rb") as f:
return _Restricted(f).load()
def _hnsw_element_count(palace_path: str, segment_id: str) -> Optional[int]:
"""Return the element count chromadb thinks the HNSW segment holds.
Reads ``index_metadata.pickle`` via a tight-allowlist unpickler and
counts ``id_to_label`` entries. This is the count chromadb consults
when sizing/loading the HNSW index on next open — distinct from
hnswlib's internal ``cur_element_count`` in the binary files. For
#1222's divergence check this is the number that matters, because it
is what gets compared against ``count() * resize_factor`` when
chromadb decides whether to resize HNSW on load.
Uses :class:`_SafePersistentDataUnpickler` rather than chromadb's own
``PersistentData.load_from_file`` so the probe works even when
``hnswlib`` is not installed (chromadb's persistent_hnsw module
imports hnswlib at module load — a probe that requires hnswlib would
refuse to run in environments where the segfault risk is moot
anyway). The allowlisted unpickler is also the safer default: the
pickle file is owned by the same user, but a tighter trust boundary
costs us nothing.
Returns ``None`` when the file is absent (fresh / never-flushed
segment) or the unpickle fails. Callers treat ``None`` as "unknown".
"""
pickle_path = os.path.join(palace_path, segment_id, "index_metadata.pickle")
if not os.path.isfile(pickle_path):
return None
try:
pd = _SafePersistentDataUnpickler.load(pickle_path)
# ChromaDB serializes PersistentData differently across versions:
# 1.5.x writes a plain dict via ``__reduce_ex__``; older versions
# pickled the class instance and rely on ``__setstate__`` to
# populate ``__dict__``. Handle both shapes.
if isinstance(pd, dict):
id_to_label = pd.get("id_to_label")
else:
id_to_label = getattr(pd, "id_to_label", None)
if isinstance(id_to_label, dict):
return len(id_to_label)
return None
except Exception:
logger.debug("_hnsw_element_count failed for %s", pickle_path, exc_info=True)
return None
# Divergence threshold: chromadb's HNSW flushes asynchronously, so HNSW
# typically lags sqlite by up to ``sync_threshold`` (default 1000) records
# under active write load — that's the *brute-force batch* that hasn't
# been compacted into HNSW yet, plus the un-persisted tail beyond the
# last sync. Two synchronization windows worth (2 × sync_threshold = 2000)
# is a safe steady-state ceiling; anything past that is real divergence,
# not flush-lag.
#
# The #1222 case was 176 613 missing out of 192 997 (91% gone) — orders
# of magnitude past 2000. A typical post-mine palace shows a few hundred
# to ~1000 missing, well under threshold.
_HNSW_DIVERGENCE_ABSOLUTE = 2000
# Relative tolerance. ``hnsw_capacity_status`` uses
# ``max(_HNSW_DIVERGENCE_ABSOLUTE, int(sqlite_count * _HNSW_DIVERGENCE_FRACTION))``
# as its threshold, so once sqlite holds more than 20 000 embeddings the
# tolerated gap becomes 10% of the sqlite count rather than a flat 2000.
_HNSW_DIVERGENCE_FRACTION = 0.10
def hnsw_capacity_status(palace_path: str, collection_name: str = "mempalace_drawers") -> dict:
    """Report how far the HNSW element count trails the sqlite embedding count.

    The #1222 failure mode: ``max_elements`` froze at 16 384 while sqlite
    accumulated 192 997 embeddings. Every subsequent tool call segfaulted
    when chromadb tried to load the undersized HNSW. This probe runs
    *before* anything touches the segment so we can warn (or fall back to
    BM25) instead of crashing.

    Returns a dict with:

    * ``segment_id`` — VECTOR segment UUID, or ``None`` if no palace
    * ``sqlite_count`` — embeddings present in chroma.sqlite3
    * ``hnsw_count`` — elements chromadb's pickle knows about
    * ``divergence`` — ``sqlite_count - hnsw_count`` when both known
      (the whole ``sqlite_count`` when the pickle is unreadable and sqlite
      exceeds the absolute threshold)
    * ``diverged`` — True when divergence exceeds the threshold
    * ``status`` — ``"ok"`` | ``"diverged"`` | ``"unknown"``
    * ``message`` — human-readable summary

    Never raises — a probe that throws would defeat the point.
    """
    report: dict[str, Any] = {
        "segment_id": None,
        "sqlite_count": None,
        "hnsw_count": None,
        "divergence": None,
        "diverged": False,
        "status": "unknown",
        "message": "",
    }
    try:
        segment = _vector_segment_id(palace_path, collection_name)
        report["segment_id"] = segment
        in_sqlite = _sqlite_embedding_count(palace_path, collection_name)
        report["sqlite_count"] = in_sqlite
        if segment is None or in_sqlite is None:
            report["message"] = "palace state unreadable; skipping HNSW capacity check"
            return report

        in_hnsw = _hnsw_element_count(palace_path, segment)
        report["hnsw_count"] = in_hnsw

        if in_hnsw is None:
            # No pickle count — the segment hasn't persisted metadata. That
            # may be fresh-but-unflushed (normal) or interrupted-mid-flush
            # (bad); without the pickle the two look alike, so divergence
            # is flagged only when sqlite holds clearly more than two
            # flush windows worth — the same absolute threshold as below.
            if in_sqlite <= _HNSW_DIVERGENCE_ABSOLUTE:
                report["message"] = "HNSW segment metadata not yet flushed; skipping"
                return report
            report["status"] = "diverged"
            report["diverged"] = True
            report["divergence"] = in_sqlite
            report["message"] = (
                f"sqlite holds {in_sqlite:,} embeddings but the HNSW segment "
                "has never flushed metadata — vector search will return nothing "
                "until the segment is rebuilt. Run `mempalace repair`."
            )
            return report

        gap = in_sqlite - in_hnsw
        report["divergence"] = gap
        tolerance = max(_HNSW_DIVERGENCE_ABSOLUTE, int(in_sqlite * _HNSW_DIVERGENCE_FRACTION))
        if gap <= tolerance:
            report["status"] = "ok"
            report["message"] = (
                f"HNSW {in_hnsw:,} / sqlite {in_sqlite:,} (within flush-lag tolerance)"
            )
        else:
            report["status"] = "diverged"
            report["diverged"] = True
            share = 100.0 * gap / max(in_sqlite, 1)
            report["message"] = (
                f"HNSW index holds {in_hnsw:,} elements but sqlite has "
                f"{in_sqlite:,} embeddings — {gap:,} drawers ({share:.0f}%) "
                "are invisible to vector search. Run `mempalace repair` to rebuild."
            )
    except Exception:
        logger.debug("hnsw_capacity_status failed", exc_info=True)
        report["message"] = "HNSW capacity probe raised; skipping"
    return report
def _sqlite_embedding_count(palace_path: str, collection_name: str) -> Optional[int]:
"""Count rows in chroma.sqlite3.embeddings for ``collection_name``.
Mirrors :func:`mempalace.repair.sqlite_drawer_count` but kept in this
module so the backend probe doesn't pull in the repair CLI module.
"""
db_path = os.path.join(palace_path, "chroma.sqlite3")
if not os.path.isfile(db_path):
return None
try:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
try:
row = conn.execute(
"""
SELECT COUNT(*)
FROM embeddings e
JOIN segments s ON e.segment_id = s.id
JOIN collections c ON s.collection = c.id
WHERE c.name = ?
""",
(collection_name,),
).fetchone()
return int(row[0]) if row and row[0] is not None else None
finally:
conn.close()
except sqlite3.Error:
return None
def _pin_hnsw_threads(collection) -> None:
"""Best-effort retrofit: pin ``hnsw:num_threads=1`` on an existing collection.
+15
View File
@@ -607,6 +607,14 @@ def cmd_status(args):
status(palace_path=palace_path)
def cmd_repair_status(args):
    """CLI handler: run the read-only HNSW capacity health check (#1222).

    Uses ``args.palace`` (with ``~`` expanded) when it is set, otherwise the
    palace path from ``MempalaceConfig``.
    """
    from .repair import status as repair_status

    if args.palace:
        target = os.path.expanduser(args.palace)
    else:
        target = MempalaceConfig().palace_path
    repair_status(palace_path=target)
def cmd_repair(args):
"""Rebuild palace vector index from SQLite metadata."""
import shutil
@@ -1125,6 +1133,12 @@ def main():
),
)
# repair-status — read-only HNSW capacity health check (#1222)
sub.add_parser(
"repair-status",
help="Compare sqlite vs HNSW element counts (read-only; never opens a chromadb client)",
)
# mcp
sub.add_parser(
"mcp",
@@ -1181,6 +1195,7 @@ def main():
"compress": cmd_compress,
"wake-up": cmd_wakeup,
"repair": cmd_repair,
"repair-status": cmd_repair_status,
"migrate": cmd_migrate,
"status": cmd_status,
}
+189 -4
View File
@@ -57,7 +57,12 @@ from .config import ( # noqa: E402
sanitize_content,
)
from .version import __version__ # noqa: E402
from .backends.chroma import ChromaBackend, ChromaCollection, _pin_hnsw_threads # noqa: E402
from .backends.chroma import ( # noqa: E402
ChromaBackend,
ChromaCollection,
_pin_hnsw_threads,
hnsw_capacity_status,
)
from .query_sanitizer import sanitize_query # noqa: E402
from .searcher import search_memories # noqa: E402
from .palace_graph import ( # noqa: E402
@@ -108,6 +113,55 @@ _collection_cache = None
_palace_db_inode = 0 # inode of chroma.sqlite3 at cache time
_palace_db_mtime = 0.0 # mtime of chroma.sqlite3 at cache time
# ── Vector-search disabled flag (#1222) ──────────────────────────────────
# Set when ``hnsw_capacity_status`` reports a divergence between sqlite
# and the HNSW segment large enough that chromadb would segfault on
# segment load. While this is set, vector-shaped tools (``search``,
# ``check_duplicate``) route to the sqlite-only BM25 fallback in
# :func:`mempalace.searcher._bm25_only_via_sqlite`. Cleared after a
# successful repair via :func:`tool_reconnect` (which re-runs the probe).
_vector_disabled = False
# Human-readable ``message`` from the probe result that set the flag;
# empty while vector search is enabled.
_vector_disabled_reason = ""
# Most recent probe result dict, or None before the first probe has run.
# Typed with a ``# type:`` comment using ``Optional[dict]`` rather than a
# ``dict | None`` annotation so the module still imports on Python 3.9:
# module-level annotations are evaluated at import time, and the PEP 604
# ``X | Y`` union syntax only works at runtime from Python 3.10 onward.
_vector_capacity_status = None  # type: Optional[dict]
def _refresh_vector_disabled_flag() -> None:
    """Run the HNSW capacity probe again and sync the module-level flag to it.

    Called from :func:`_get_client` whenever the client cache is rebuilt
    (first open or palace replacement). Cheap — pure sqlite + pickle
    read, no chromadb interaction. Never raises: a probe that crashes
    would defeat the point. When the probe itself raises, the previous
    flag, reason and cached status are left untouched.
    """
    global _vector_disabled, _vector_disabled_reason, _vector_capacity_status
    try:
        probe = hnsw_capacity_status(_config.palace_path, "mempalace_drawers")
    except Exception:
        logger.debug("HNSW capacity probe raised", exc_info=True)
        return

    _vector_capacity_status = probe
    was_disabled = _vector_disabled

    if not probe.get("diverged"):
        # Healthy again: log only on the disabled -> enabled transition.
        if was_disabled:
            logger.info(
                "HNSW capacity within tolerance (%s) — vector search re-enabled",
                probe.get("message", ""),
            )
        _vector_disabled = False
        _vector_disabled_reason = ""
        return

    # Diverged: warn only on the enabled -> disabled transition so repeated
    # probes do not spam the log.
    if not was_disabled:
        logger.warning(
            "HNSW capacity divergence detected (%s) — routing search to "
            "BM25-only sqlite fallback. Run `mempalace repair` to restore "
            "vector search.",
            probe.get("message", "unknown"),
        )
    _vector_disabled = True
    _vector_disabled_reason = probe.get("message", "")
# ==================== WRITE-AHEAD LOG ====================
# Every write operation is logged to a JSONL file before execution.
@@ -202,6 +256,11 @@ def _get_client():
mtime_changed = current_mtime != 0.0 and abs(current_mtime - _palace_db_mtime) > 0.01
if _client_cache is None or inode_changed or mtime_changed:
# Run the HNSW capacity probe BEFORE chromadb opens the segment —
# if the index is severely undersized, segment load can segfault
# the whole MCP server (#1222). The probe is pure sqlite +
# metadata-pickle read; never touches the HNSW binary files.
_refresh_vector_disabled_flag()
_client_cache = ChromaBackend.make_client(_config.palace_path)
_collection_cache = None
_metadata_cache = None
@@ -303,11 +362,91 @@ def _sanitize_optional_name(value: str = None, field_name: str = "name") -> str:
# ==================== READ TOOLS ====================
def _tool_status_via_sqlite() -> dict:
    """Pure-sqlite status reader for the #1222 fallback path.

    When the HNSW capacity probe detects divergence, opening the chromadb
    persistent client can segfault. This reader pulls the same wing/room
    breakdown directly from ``embedding_metadata`` so the operator still
    gets a working status response — and crucially the
    ``vector_disabled`` flag — without us touching the vector segment.

    Returns:
        ``_no_palace()`` when ``chroma.sqlite3`` is absent. Otherwise a dict
        with ``total_drawers``, per-value ``wings`` / ``rooms`` counts, the
        palace path, protocol text, ``vector_disabled=True`` and its reason,
        plus an ``hnsw_capacity`` summary when a probe result is cached.
        A sqlite error is logged and whatever was read before it is still
        returned.
    """
    import sqlite3 as _sqlite3
    from contextlib import closing
    from urllib.parse import quote

    db_path = os.path.join(_config.palace_path, "chroma.sqlite3")
    if not os.path.isfile(db_path):
        return _no_palace()

    wings: dict = {}
    rooms: dict = {}
    total = 0
    # Percent-encode the path before embedding it in the URI. With a raw
    # path, a ``#`` or ``?`` in any directory name is parsed by sqlite as the
    # start of the fragment / query string and the open fails.
    db_uri = f"file:{quote(db_path)}?mode=ro"
    try:
        with closing(_sqlite3.connect(db_uri, uri=True)) as conn:
            row = conn.execute(
                """
                SELECT COUNT(*)
                FROM embeddings e
                JOIN segments s ON e.segment_id = s.id
                JOIN collections c ON s.collection = c.id
                WHERE c.name = 'mempalace_drawers'
                """
            ).fetchone()
            total = int(row[0]) if row and row[0] is not None else 0
            # Same grouped query twice: once per metadata key of interest.
            for key, target in (("wing", wings), ("room", rooms)):
                for value, count in conn.execute(
                    """
                    SELECT em.string_value, COUNT(*)
                    FROM embedding_metadata em
                    JOIN embeddings e ON em.id = e.id
                    JOIN segments s ON e.segment_id = s.id
                    JOIN collections c ON s.collection = c.id
                    WHERE c.name = 'mempalace_drawers'
                      AND em.key = ?
                      AND em.string_value IS NOT NULL
                    GROUP BY em.string_value
                    """,
                    (key,),
                ):
                    target[value] = count
    except _sqlite3.Error:
        # Best effort: report what was read rather than failing status.
        logger.exception("tool_status sqlite fallback read failed")

    result = {
        "total_drawers": total,
        "wings": wings,
        "rooms": rooms,
        "palace_path": _config.palace_path,
        "protocol": PALACE_PROTOCOL,
        "aaak_dialect": AAAK_SPEC,
        "vector_disabled": True,
        "vector_disabled_reason": _vector_disabled_reason,
    }
    if _vector_capacity_status:
        result["hnsw_capacity"] = {
            "sqlite_count": _vector_capacity_status.get("sqlite_count"),
            "hnsw_count": _vector_capacity_status.get("hnsw_count"),
            "divergence": _vector_capacity_status.get("divergence"),
        }
    return result
def tool_status():
# Run the safe sqlite/pickle probe before we touch chromadb. In the
# #1222 failure mode, opening the persistent client to call .count()
# can segfault — short-circuit to a pure-sqlite path when divergence
# is detected so status stays reachable.
db_exists = os.path.isfile(os.path.join(_config.palace_path, "chroma.sqlite3"))
_refresh_vector_disabled_flag()
if _vector_disabled:
return _tool_status_via_sqlite()
# Use create=True only when a palace DB already exists on disk -- this
# bootstraps the ChromaDB collection on a valid-but-empty palace without
# accidentally creating a palace in a non-existent directory (#830).
db_exists = os.path.isfile(os.path.join(_config.palace_path, "chroma.sqlite3"))
col = _get_collection(create=db_exists)
if not col:
return _no_palace()
@@ -456,6 +595,11 @@ def tool_search(
dist = (1.0 - min_similarity) if min_similarity is not None else max_distance
# Mitigate system prompt contamination (Issue #333)
sanitized = sanitize_query(query)
# Ensure the vector-disabled probe has been run via the safe
# sqlite/pickle path before we touch chromadb. Calling _get_client()
# here would defeat the fallback — it constructs a PersistentClient
# which can segfault on segment load in the #1222 failure mode.
_refresh_vector_disabled_flag()
result = search_memories(
sanitized["clean_query"],
palace_path=_config.palace_path,
@@ -463,7 +607,11 @@ def tool_search(
room=room,
n_results=limit,
max_distance=dist,
vector_disabled=_vector_disabled,
)
if _vector_disabled:
result["vector_disabled"] = True
result["vector_disabled_reason"] = _vector_disabled_reason
# Attach sanitizer metadata for transparency
if sanitized["was_sanitized"]:
result["query_sanitized"] = True
@@ -482,6 +630,20 @@ def tool_check_duplicate(content: str, threshold: float = 0.9):
col = _get_collection()
if not col:
return _no_palace()
if _vector_disabled:
# Without a usable HNSW we can't compute cosine similarity for
# near-duplicate detection. Report the limitation rather than
# silently returning "not a duplicate" — false negatives here
# would let the AI re-file content the palace already holds.
return {
"is_duplicate": False,
"matches": [],
"vector_disabled": True,
"vector_disabled_reason": _vector_disabled_reason,
"hint": (
"duplicate detection requires vector search; run " "`mempalace repair` to restore"
),
}
try:
results = col.query(
query_texts=[content],
@@ -1150,10 +1312,22 @@ def tool_reconnect():
Use after external scripts or CLI commands modify the palace database
directly, which can leave the in-memory HNSW index stale.
"""
global _collection_cache, _palace_db_inode, _palace_db_mtime
global \
_client_cache, \
_collection_cache, \
_palace_db_inode, \
_palace_db_mtime, \
_vector_disabled, \
_vector_disabled_reason
_client_cache = None
_collection_cache = None
_palace_db_inode = 0
_palace_db_mtime = 0.0
# Force probe re-run on next _get_client by clearing the flag now;
# _refresh_vector_disabled_flag will re-set it if the divergence
# still applies after the reconnect.
_vector_disabled = False
_vector_disabled_reason = ""
try:
col = _get_collection()
if col is None:
@@ -1161,8 +1335,15 @@ def tool_reconnect():
"success": False,
"message": "No palace found after reconnect",
"drawers": 0,
"vector_disabled": _vector_disabled,
}
return {"success": True, "message": "Reconnected to palace", "drawers": col.count()}
return {
"success": True,
"message": "Reconnected to palace",
"drawers": col.count(),
"vector_disabled": _vector_disabled,
"vector_disabled_reason": _vector_disabled_reason,
}
except Exception as e:
return {"success": False, "error": str(e)}
@@ -1726,6 +1907,10 @@ def _restore_stdout():
def main():
_restore_stdout()
logger.info("MemPalace MCP Server starting...")
# Pre-flight: probe HNSW capacity before any tool call so the warning
# is visible at startup rather than on first use (#1222). Pure
# filesystem read; never opens a chromadb client.
_refresh_vector_disabled_flag()
while True:
try:
line = sys.stdin.readline()
+61 -4
View File
@@ -6,8 +6,9 @@ When ChromaDB's HNSW index accumulates duplicate entries (from repeated
add() calls with the same ID), link_lists.bin can grow unbounded —
terabytes on large palaces — eventually causing segfaults.
This module provides three operations:
This module provides four operations:
status — compare sqlite vs HNSW element counts (read-only health check)
scan — find every corrupt/unfetchable ID in the palace
prune — delete only the corrupt IDs (surgical)
rebuild — extract all drawers, delete the collection, recreate with
@@ -17,6 +18,7 @@ The rebuild backs up ONLY chroma.sqlite3 (the source of truth), not the
full palace directory — so it works even when link_lists.bin is bloated.
Usage (standalone):
python -m mempalace.repair status
python -m mempalace.repair scan [--wing X]
python -m mempalace.repair prune --confirm
python -m mempalace.repair rebuild
@@ -32,7 +34,7 @@ import os
import shutil
import time
from .backends.chroma import ChromaBackend
from .backends.chroma import ChromaBackend, hnsw_capacity_status
COLLECTION_NAME = "mempalace_drawers"
@@ -431,9 +433,62 @@ def rebuild_index(palace_path=None, confirm_truncation_ok: bool = False):
print(f"\n{'=' * 55}\n")
def status(palace_path=None) -> dict:
    """Print and return a read-only sqlite-vs-HNSW element count comparison.

    Catches the #1222 failure mode where chromadb's HNSW segment freezes
    at a stale ``max_elements`` while sqlite keeps accumulating rows.
    Once the divergence is large enough, every tool call segfaults when
    chromadb tries to load the undersized HNSW. Running ``mempalace
    repair-status`` *before* opening the segment lets the operator
    discover the problem without crashing the MCP server.

    The check itself never opens a chromadb client and never imports
    hnswlib — it reads ``chroma.sqlite3`` and ``index_metadata.pickle``
    directly via :func:`mempalace.backends.chroma.hnsw_capacity_status`.

    Returns ``{"drawers": ..., "closets": ...}`` holding one capacity-status
    dict per collection (also printed). When ``palace_path`` is not a
    directory, returns a flat dict with ``status="unknown"`` instead.
    """
    palace_path = palace_path or _get_palace_path()
    rule = "=" * 55
    print(f"\n{rule}")
    print(" MemPalace Repair — Status")
    print(f"{rule}\n")
    print(f" Palace: {palace_path}")

    if not os.path.isdir(palace_path):
        print(" No palace found.\n")
        return {"status": "unknown", "message": "no palace at path"}

    reports = {
        "drawers": hnsw_capacity_status(palace_path, "mempalace_drawers"),
        "closets": hnsw_capacity_status(palace_path, "mempalace_closets"),
    }

    for label, info in reports.items():
        print(f"\n [{label}]")
        in_sqlite = info["sqlite_count"]
        in_hnsw = info["hnsw_count"]
        sqlite_text = "(unreadable)" if in_sqlite is None else f"{in_sqlite:,}"
        hnsw_text = "(no flushed metadata yet)" if in_hnsw is None else f"{in_hnsw:,}"
        print(f" sqlite count: {sqlite_text}")
        print(f" hnsw count: {hnsw_text}")
        if info["divergence"] is not None:
            print(f" divergence: {info['divergence']:,}")
        marker = "DIVERGED" if info["diverged"] else info["status"].upper()
        print(f" status: {marker}")
        if info["message"]:
            print(f" note: {info['message']}")

    if any(info["diverged"] for info in reports.values()):
        print("\n Recommended: run `mempalace repair` to rebuild the index.")
    print()
    return reports
if __name__ == "__main__":
p = argparse.ArgumentParser(description="MemPalace repair tools")
p.add_argument("command", choices=["scan", "prune", "rebuild"])
p.add_argument("command", choices=["status", "scan", "prune", "rebuild"])
p.add_argument("--palace", default=None, help="Palace directory path")
p.add_argument("--wing", default=None, help="Scan only this wing")
p.add_argument("--confirm", action="store_true", help="Actually delete corrupt IDs")
@@ -441,7 +496,9 @@ if __name__ == "__main__":
path = os.path.expanduser(args.palace) if args.palace else None
if args.command == "scan":
if args.command == "status":
status(palace_path=path)
elif args.command == "scan":
scan_palace(palace_path=path, only_wing=args.wing)
elif args.command == "prune":
prune_corrupt(palace_path=path, confirm=args.confirm)
+196
View File
@@ -11,7 +11,9 @@ hide drawers the direct path would have found.
import logging
import math
import os
import re
import sqlite3
from pathlib import Path
from .palace import get_closets_collection, get_collection
@@ -363,6 +365,186 @@ def search(query: str, palace_path: str, wing: str = None, room: str = None, n_r
print()
def _bm25_only_via_sqlite(
query: str,
palace_path: str,
wing: str = None,
room: str = None,
n_results: int = 5,
max_candidates: int = 500,
) -> dict:
"""BM25-only search reading drawers directly from chroma.sqlite3.
Used when HNSW is diverged or unloadable (#1222). Bypasses chromadb's
Python client entirely so a corrupt vector segment can't segfault the
MCP server. Routes through chromadb's own FTS5 trigram index
(``embedding_fulltext_search``) for candidate selection, then re-ranks
with the same Okapi-BM25 used in :func:`_hybrid_rank` so the result
shape matches the vector path.
The query is split into ≥3-char trigram-tokens and OR-joined for the
FTS5 MATCH — chromadb writes the index with ``tokenize='trigram'``,
so single-character tokens never match. When no usable token survives
(e.g. "is a"), candidate selection falls back to the most-recent
``max_candidates`` rows so we still return *something* rather than
nothing.
"""
db_path = os.path.join(palace_path, "chroma.sqlite3")
if not os.path.isfile(db_path):
return {
"error": "No palace found",
"hint": "Run: mempalace init <dir> && mempalace mine <dir>",
}
try:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
except sqlite3.Error as e:
return {"error": f"sqlite open failed: {e}"}
try:
# FTS5 MATCH expects whitespace-separated tokens. Drop tokens
# shorter than 3 chars (trigram tokenizer can't match them).
tokens = [t for t in _tokenize(query) if len(t) >= 3]
candidate_ids: list[int] = []
if tokens:
fts_query = " OR ".join(tokens)
try:
rows = conn.execute(
"""
SELECT rowid
FROM embedding_fulltext_search
WHERE embedding_fulltext_search MATCH ?
LIMIT ?
""",
(fts_query, max_candidates),
).fetchall()
candidate_ids = [r[0] for r in rows]
except sqlite3.Error:
# FTS5 tokenizer mismatch or syntax error — fall through
# to the recency-window selector below.
logger.debug("FTS5 MATCH failed; using recency fallback", exc_info=True)
if not candidate_ids:
# No FTS hits (or no usable tokens) — pull the most recent
# rows for the drawers segment so we can BM25-rank something
# rather than return empty-handed. Wrapped in try/except
# because the schema may differ on legacy palaces (older
# chromadb without ``created_at``, missing ``segments``
# rows after partial restore, etc.); on schema mismatch we
# fall back to ordering by primary-key id and finally to an
# empty result rather than letting search raise.
try:
rows = conn.execute(
"""
SELECT e.id
FROM embeddings e
JOIN segments s ON e.segment_id = s.id
JOIN collections c ON s.collection = c.id
WHERE c.name = 'mempalace_drawers'
ORDER BY e.created_at DESC
LIMIT ?
""",
(max_candidates,),
).fetchall()
candidate_ids = [r[0] for r in rows]
except sqlite3.Error:
logger.debug(
"recency-window query failed; trying id-ordered fallback",
exc_info=True,
)
try:
rows = conn.execute(
"""
SELECT e.id
FROM embeddings e
JOIN segments s ON e.segment_id = s.id
JOIN collections c ON s.collection = c.id
WHERE c.name = 'mempalace_drawers'
ORDER BY e.id DESC
LIMIT ?
""",
(max_candidates,),
).fetchall()
candidate_ids = [r[0] for r in rows]
except sqlite3.Error:
logger.debug("id-ordered fallback also failed", exc_info=True)
candidate_ids = []
if not candidate_ids:
return {
"query": query,
"filters": {"wing": wing, "room": room},
"total_before_filter": 0,
"results": [],
"fallback": "bm25_only_via_sqlite",
}
placeholders = ",".join(["?"] * len(candidate_ids))
meta_rows = conn.execute(
f"""
SELECT id, key, string_value, int_value
FROM embedding_metadata
WHERE id IN ({placeholders})
""",
candidate_ids,
).fetchall()
finally:
conn.close()
# Group metadata rows into per-drawer dicts.
drawers: dict[int, dict] = {}
for emb_id, key, sval, ival in meta_rows:
d = drawers.setdefault(emb_id, {"_id": emb_id, "metadata": {}, "text": ""})
if key == "chroma:document":
d["text"] = sval or ""
else:
d["metadata"][key] = sval if sval is not None else ival
# Apply wing/room filters in Python (FTS5 candidates may include
# entries from other wings).
candidates = []
for d in drawers.values():
meta = d["metadata"]
if wing and meta.get("wing") != wing:
continue
if room and meta.get("room") != room:
continue
candidates.append(
{
"text": d["text"],
"wing": meta.get("wing", "unknown"),
"room": meta.get("room", "unknown"),
"source_file": Path(meta.get("source_file", "?") or "?").name,
"created_at": meta.get("filed_at", "unknown"),
# No vector distance available in BM25-only mode.
"similarity": None,
"distance": None,
"matched_via": "bm25_sqlite",
}
)
# Local BM25 over the candidate set.
docs = [c["text"] for c in candidates]
bm25_raw = _bm25_scores(query, docs)
max_bm25 = max(bm25_raw) if bm25_raw else 0.0
for c, raw in zip(candidates, bm25_raw):
c["bm25_score"] = round(raw, 3)
c["_score"] = (raw / max_bm25) if max_bm25 > 0 else 0.0
candidates.sort(key=lambda c: c["_score"], reverse=True)
hits = candidates[:n_results]
for h in hits:
h.pop("_score", None)
return {
"query": query,
"filters": {"wing": wing, "room": room},
"total_before_filter": len(candidates),
"results": hits,
"fallback": "bm25_only_via_sqlite",
"fallback_reason": "vector_search_disabled",
}
def search_memories(
query: str,
palace_path: str,
@@ -370,6 +552,7 @@ def search_memories(
room: str = None,
n_results: int = 5,
max_distance: float = 0.0,
vector_disabled: bool = False,
) -> dict:
"""Programmatic search — returns a dict instead of printing.
@@ -385,7 +568,20 @@ def search_memories(
cosine distance (hnsw:space=cosine) — 0 = identical, 2 = opposite.
Results with distance > this value are filtered out. A value of
0.0 disables filtering. Typical useful range: 0.3–1.0.
vector_disabled: When True, route to the sqlite-only BM25 fallback
(#1222). Set by the MCP server when the HNSW capacity probe
detects a divergence that would segfault chromadb on segment
load.
"""
if vector_disabled:
return _bm25_only_via_sqlite(
query,
palace_path,
wing=wing,
room=room,
n_results=n_results,
)
try:
drawers_col = get_collection(palace_path, create=False)
except Exception as e:
+391
View File
@@ -0,0 +1,391 @@
"""Tests for the #1222 HNSW capacity probe and BM25-only fallback.
The probe and fallback never load chromadb's HNSW segment, so all of
these tests synthesize the on-disk shape directly: a chroma.sqlite3 with
the relevant schema rows and an ``index_metadata.pickle`` matching what
chromadb 1.5.x writes (``{"id_to_label": {...}, ...}``).
"""
from __future__ import annotations
import os
import pickle
import sqlite3
import pytest
from mempalace.backends.chroma import (
_hnsw_element_count,
_vector_segment_id,
hnsw_capacity_status,
)
from mempalace.searcher import _bm25_only_via_sqlite
COLLECTION = "mempalace_drawers"
# ── Fixtures ──────────────────────────────────────────────────────────
def _seed_chroma_db(palace: str, sqlite_count: int, segment_id: str) -> None:
    """Build a minimal ``chroma.sqlite3`` holding one collection and its segments.

    Creates the tables the probe queries — ``segments``, ``collections``,
    ``embeddings``, ``embedding_metadata`` — plus the trigram FTS table,
    then inserts one collection named ``COLLECTION``, a VECTOR segment with
    id ``segment_id``, a METADATA segment, and ``sqlite_count`` embedding
    rows attached to the VECTOR segment. Schema matches chromadb 1.5.x;
    column types are kept loose because we read with COUNT(*) and
    SELECT key, *_value rather than driver-specific casts.
    """
    collection_id = "col-test"
    metadata_segment = "seg-meta"
    embedding_rows = [
        (n + 1, segment_id, f"d-{n}", b"\x00\x00\x00\x00\x00\x00\x00\x01")
        for n in range(sqlite_count)
    ]

    conn = sqlite3.connect(os.path.join(palace, "chroma.sqlite3"))
    try:
        conn.executescript(
            """
            CREATE TABLE collections (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL
            );
            CREATE TABLE segments (
                id TEXT PRIMARY KEY,
                collection TEXT NOT NULL,
                scope TEXT NOT NULL
            );
            CREATE TABLE embeddings (
                id INTEGER PRIMARY KEY,
                segment_id TEXT NOT NULL,
                embedding_id TEXT NOT NULL,
                seq_id BLOB NOT NULL,
                created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE embedding_metadata (
                id INTEGER REFERENCES embeddings(id),
                key TEXT NOT NULL,
                string_value TEXT,
                int_value INTEGER,
                float_value REAL,
                bool_value INTEGER,
                PRIMARY KEY (id, key)
            );
            CREATE VIRTUAL TABLE embedding_fulltext_search
            USING fts5(string_value, tokenize='trigram');
            """
        )
        conn.execute(
            "INSERT INTO collections (id, name) VALUES (?, ?)", (collection_id, COLLECTION)
        )
        conn.execute(
            "INSERT INTO segments (id, collection, scope) VALUES (?, ?, 'VECTOR')",
            (segment_id, collection_id),
        )
        conn.execute(
            "INSERT INTO segments (id, collection, scope) VALUES (?, ?, 'METADATA')",
            (metadata_segment, collection_id),
        )
        # One bulk insert instead of a per-row loop; ids start at 1.
        conn.executemany(
            """INSERT INTO embeddings (id, segment_id, embedding_id, seq_id)
            VALUES (?, ?, ?, ?)""",
            embedding_rows,
        )
        conn.commit()
    finally:
        conn.close()
def _write_pickle(palace: str, segment_id: str, hnsw_count: int) -> None:
    """Drop a fake ``index_metadata.pickle`` into ``<palace>/<segment_id>/``.

    The payload is a plain dict with ``hnsw_count`` entries in both
    ``id_to_label`` and ``label_to_id``.  This is intended to mimic how
    chromadb 1.5.x serializes its ``PersistentData`` (a plain dict via
    ``__reduce_ex__``), so ``_hnsw_element_count`` sees the same shape it
    would meet in production.  The segment directory is created on demand.
    """
    segment_dir = os.path.join(palace, segment_id)
    os.makedirs(segment_dir, exist_ok=True)

    id_to_label = {}
    label_to_id = {}
    for label in range(hnsw_count):
        id_to_label[f"d-{label}"] = label
        label_to_id[label] = f"d-{label}"

    payload = {
        "dimensionality": 384,
        "total_elements_added": hnsw_count,
        "max_seq_id": None,
        "id_to_label": id_to_label,
        "label_to_id": label_to_id,
        "id_to_seq_id": {},
    }

    target = os.path.join(segment_dir, "index_metadata.pickle")
    with open(target, "wb") as sink:
        pickle.dump(payload, sink, protocol=pickle.HIGHEST_PROTOCOL)
# ── _vector_segment_id ────────────────────────────────────────────────
def test_vector_segment_id_returns_uuid(tmp_path):
    """The VECTOR segment id stored in sqlite comes back unchanged."""
    palace = str(tmp_path)
    expected = "11111111-2222-3333-4444-555555555555"
    _seed_chroma_db(palace, sqlite_count=10, segment_id=expected)

    found = _vector_segment_id(palace, COLLECTION)

    assert found == expected
def test_vector_segment_id_no_palace(tmp_path):
    """A directory with no ``chroma.sqlite3`` in it yields ``None``."""
    empty_dir = str(tmp_path)
    found = _vector_segment_id(empty_dir, COLLECTION)
    assert found is None
def test_vector_segment_id_unknown_collection(tmp_path):
    """Looking up a collection name that was never seeded yields ``None``."""
    palace = str(tmp_path)
    _seed_chroma_db(
        palace,
        sqlite_count=10,
        segment_id="11111111-2222-3333-4444-555555555555",
    )

    found = _vector_segment_id(palace, "nope")

    assert found is None
# ── _hnsw_element_count ───────────────────────────────────────────────
def test_hnsw_element_count_reads_pickle(tmp_path):
    """The element count comes from the pickle (42), not from sqlite (100)."""
    palace = str(tmp_path)
    segment = "seg-001"
    _seed_chroma_db(palace, sqlite_count=100, segment_id=segment)
    _write_pickle(palace, segment, hnsw_count=42)

    count = _hnsw_element_count(palace, segment)

    assert count == 42
def test_hnsw_element_count_missing_pickle(tmp_path):
    """With no segment directory on disk the count is ``None``."""
    palace = str(tmp_path)
    segment = "seg-001"
    _seed_chroma_db(palace, sqlite_count=100, segment_id=segment)

    # No _write_pickle call: the segment dir was never created, which models
    # an index that has not been flushed to disk.
    count = _hnsw_element_count(palace, segment)

    assert count is None
def test_hnsw_element_count_rejects_arbitrary_class(tmp_path):
    """A pickle that names a non-allowlisted global must not be loaded.

    This models a tampered ``index_metadata.pickle`` trying to reach
    arbitrary code.  The bytes are written by hand instead of going
    through ``pickle.dump`` because pickle cannot serialize a
    locally-defined class; a raw GLOBAL reference to a stdlib callable
    stands in for the tampered-file threat.
    """
    import pickle as _pickle

    segment = "seg-evil"
    segment_dir = tmp_path / segment
    segment_dir.mkdir()

    # GLOBAL opcode -> "os" / "system", then STOP.  Without an allowlist,
    # find_class would resolve os.system; the safe unpickler is expected to
    # refuse before anything is returned.
    malicious = b"".join([_pickle.GLOBAL, b"os\nsystem\n", _pickle.STOP])
    (segment_dir / "index_metadata.pickle").write_bytes(malicious)

    count = _hnsw_element_count(str(tmp_path), segment)

    assert count is None
# ── hnsw_capacity_status ──────────────────────────────────────────────
def test_capacity_status_ok_when_balanced(tmp_path):
    """1000 sqlite rows against 950 HNSW elements is reported as healthy."""
    palace = str(tmp_path)
    segment = "seg-001"
    _seed_chroma_db(palace, sqlite_count=1000, segment_id=segment)
    _write_pickle(palace, segment, hnsw_count=950)

    report = hnsw_capacity_status(palace, COLLECTION)

    assert report["status"] == "ok"
    assert report["diverged"] is False
    assert report["sqlite_count"] == 1000
    assert report["hnsw_count"] == 950
def test_capacity_status_flags_severe_divergence(tmp_path):
    """Scaled-down reproduction of #1222.

    The report had roughly 192k sqlite rows with HNSW frozen near 16k;
    this fixture uses 20,000 rows against 2,000 elements so the test stays
    fast while keeping a large gap.
    """
    palace = str(tmp_path)
    segment = "seg-1222"
    _seed_chroma_db(palace, sqlite_count=20_000, segment_id=segment)
    _write_pickle(palace, segment, hnsw_count=2_000)

    report = hnsw_capacity_status(palace, COLLECTION)

    assert report["status"] == "diverged"
    assert report["diverged"] is True
    assert report["divergence"] == 18_000
    # The message should point the user at the repair command.
    assert "repair" in report["message"].lower()
def test_capacity_status_tolerates_flush_lag(tmp_path):
    """HNSW trailing sqlite by 500 of 5,000 rows is not flagged as diverged."""
    palace = str(tmp_path)
    segment = "seg-lag"
    _seed_chroma_db(palace, sqlite_count=5_000, segment_id=segment)
    _write_pickle(palace, segment, hnsw_count=4_500)

    report = hnsw_capacity_status(palace, COLLECTION)

    assert report["diverged"] is False
    assert report["status"] == "ok"
def test_capacity_status_flags_unflushed_with_large_sqlite(tmp_path):
    """10,000 sqlite rows with no pickle on disk counts as divergence."""
    palace = str(tmp_path)
    # Only sqlite is seeded; no index_metadata.pickle is written.
    _seed_chroma_db(palace, sqlite_count=10_000, segment_id="seg-noflush")

    report = hnsw_capacity_status(palace, COLLECTION)

    assert report["diverged"] is True
    assert report["hnsw_count"] is None
    assert "never flushed" in report["message"]
def test_capacity_status_quiet_for_empty_palace(tmp_path):
    """An unseeded directory reports ``unknown`` and no divergence."""
    empty_palace = str(tmp_path)

    report = hnsw_capacity_status(empty_palace, COLLECTION)

    assert report["diverged"] is False
    assert report["status"] == "unknown"
# ── BM25-only sqlite fallback ─────────────────────────────────────────
def _seed_drawers(palace: str, segment_id: str, drawers: list[tuple[str, dict, str]]) -> None:
    """Replace the palace's drawer rows with ``drawers``.

    Args:
        palace: Directory that already contains a seeded ``chroma.sqlite3``.
        segment_id: Segment id written on every ``embeddings`` row.
        drawers: ``(text, metadata, embedding_id)`` tuples.  Row ids are
            assigned 1..N in input order.

    For each drawer this writes one ``embeddings`` row, a
    ``chroma:document`` metadata row holding the text, a matching
    ``embedding_fulltext_search`` row (``rowid`` equals the embedding id),
    and one metadata row per metadata key.

    All three tables are cleared first, so the sqlite counts match exactly
    what is inserted here and the helper can be called more than once on
    the same palace without colliding on the ``(id, key)`` primary key.
    """
    db_path = os.path.join(palace, "chroma.sqlite3")
    conn = sqlite3.connect(db_path)
    try:
        # Clear dependent tables as well as ``embeddings``; leftover metadata
        # or FTS rows would otherwise shadow or collide with the new rows.
        conn.execute("DELETE FROM embedding_fulltext_search")
        conn.execute("DELETE FROM embedding_metadata")
        conn.execute("DELETE FROM embeddings")

        for row_id, (text, meta, embedding_id) in enumerate(drawers, start=1):
            conn.execute(
                """INSERT INTO embeddings (id, segment_id, embedding_id, seq_id)
                VALUES (?, ?, ?, ?)""",
                (row_id, segment_id, embedding_id, b"\x00" * 8),
            )
            conn.execute(
                """INSERT INTO embedding_metadata (id, key, string_value)
                VALUES (?, 'chroma:document', ?)""",
                (row_id, text),
            )
            conn.execute(
                "INSERT INTO embedding_fulltext_search (rowid, string_value) VALUES (?, ?)",
                (row_id, text),
            )
            for key, value in meta.items():
                # bool must be tested before int: bool is an int subclass and
                # would otherwise be stored in int_value.
                if isinstance(value, bool):
                    column, stored = "bool_value", int(value)
                elif isinstance(value, int):
                    column, stored = "int_value", value
                elif isinstance(value, float):
                    column, stored = "float_value", value
                else:
                    column, stored = "string_value", str(value)
                # ``column`` is one of the four fixed names above, never
                # caller-supplied text, so interpolating it is safe.
                conn.execute(
                    f"INSERT INTO embedding_metadata (id, key, {column}) VALUES (?, ?, ?)",
                    (row_id, key, stored),
                )
        conn.commit()
    finally:
        conn.close()
@pytest.fixture
def palace_with_drawers(tmp_path):
    """Palace directory seeded with three drawers: two in ``ops``, one in ``design``.

    Returns ``tmp_path`` itself so tests can ``str()`` it when a string
    path is required.
    """
    palace = str(tmp_path)
    segment = "seg-bm25"
    # Start with zero placeholder rows; the drawers below provide all rows.
    _seed_chroma_db(palace, sqlite_count=0, segment_id=segment)

    incident = (
        "ChromaDB segfault on every tool call after HNSW divergence",
        {"wing": "ops", "room": "incidents", "source_file": "/x/incident.md"},
        "d-1",
    )
    metaphor = (
        "Memory palace technique using rooms and drawers for recall",
        {"wing": "design", "room": "metaphor", "source_file": "/x/design.md"},
        "d-2",
    )
    runbook = (
        "Repair rebuild backs up only the sqlite database",
        {"wing": "ops", "room": "runbook", "source_file": "/x/repair.md"},
        "d-3",
    )
    _seed_drawers(palace, segment, [incident, metaphor, runbook])
    return tmp_path
def test_bm25_fallback_returns_matches(palace_with_drawers):
    """A matching query returns hits tagged as coming from the sqlite BM25 path."""
    palace = str(palace_with_drawers)

    response = _bm25_only_via_sqlite("segfault chromadb", palace, n_results=5)

    assert response["fallback"] == "bm25_only_via_sqlite"
    hits = response["results"]
    assert len(hits) >= 1

    best = hits[0]
    # The incident drawer is the only seeded text containing "segfault".
    assert "segfault" in best["text"].lower()
    assert best["matched_via"] == "bm25_sqlite"
    # No vector search ran, so the vector-derived fields carry no value.
    assert best["similarity"] is None
    assert best["distance"] is None
def test_bm25_fallback_filters_by_wing(palace_with_drawers):
    """Every hit returned with ``wing="design"`` belongs to that wing."""
    response = _bm25_only_via_sqlite(
        "memory palace recall",
        str(palace_with_drawers),
        wing="design",
        n_results=5,
    )

    for hit in response["results"]:
        assert hit["wing"] == "design"
def test_bm25_fallback_no_palace(tmp_path):
    """Searching an unseeded directory reports an ``error`` key instead of raising."""
    empty_palace = str(tmp_path)
    response = _bm25_only_via_sqlite("anything", empty_palace)
    assert "error" in response
def test_bm25_fallback_handles_short_query(palace_with_drawers):
    """A one-character query must not crash the fallback.

    Tokens shorter than three characters cannot match in a trigram FTS5
    index, so the search is expected to fall back to its recency window.
    """
    palace = str(palace_with_drawers)

    response = _bm25_only_via_sqlite("a", palace, n_results=5)

    # Only the response shape is pinned; which rows come back is not.
    assert response["fallback"] == "bm25_only_via_sqlite"
    assert isinstance(response["results"], list)
# ── repair.status CLI command ─────────────────────────────────────────
def test_repair_status_reports_diverged(tmp_path, capsys):
    """``repair.status`` prints DIVERGED and mentions the repair command."""
    from mempalace.repair import status as repair_status

    palace = str(tmp_path)
    segment = "seg-status"
    _seed_chroma_db(palace, sqlite_count=20_000, segment_id=segment)
    _write_pickle(palace, segment, hnsw_count=2_000)

    result = repair_status(palace_path=palace)
    printed = capsys.readouterr().out

    assert "DIVERGED" in printed
    assert "mempalace repair`" in printed
    assert result["drawers"]["diverged"] is True
def test_repair_status_quiet_on_healthy_palace(tmp_path, capsys):
    """With 500 sqlite rows and 480 HNSW elements no warning is printed."""
    from mempalace.repair import status as repair_status

    palace = str(tmp_path)
    segment = "seg-status-ok"
    _seed_chroma_db(palace, sqlite_count=500, segment_id=segment)
    _write_pickle(palace, segment, hnsw_count=480)

    repair_status(palace_path=palace)
    printed = capsys.readouterr().out

    assert "DIVERGED" not in printed
    assert "Recommended" not in printed
# ── tool_status sqlite fallback (#1222 short-circuit) ─────────────────
def test_tool_status_via_sqlite_returns_breakdown(palace_with_drawers, monkeypatch):
    """With vector search disabled, status counts are read from sqlite.

    Pins the payload of ``_tool_status_via_sqlite``: the disabled flag and
    reason are echoed back, and the totals match the three seeded drawers.
    """
    from mempalace import mcp_server

    palace = str(palace_with_drawers)

    # ``_config.palace_path`` is a read-only property, so replace the whole
    # config object with a minimal stand-in rather than patching the real
    # MempalaceConfig.
    class _StubConfig:
        palace_path = palace

    monkeypatch.setattr(mcp_server, "_config", _StubConfig())
    monkeypatch.setattr(mcp_server, "_vector_disabled", True)
    monkeypatch.setattr(mcp_server, "_vector_disabled_reason", "test divergence")

    status = mcp_server._tool_status_via_sqlite()

    assert status["vector_disabled"] is True
    assert status["vector_disabled_reason"] == "test divergence"
    assert status["total_drawers"] == 3
    # Fixture contents: two ``ops`` drawers (incident, repair runbook) and
    # one ``design`` drawer (metaphor).
    wings = status["wings"]
    assert wings.get("ops") == 2
    assert wings.get("design") == 1