fix(repair): add --mode from-sqlite to recover palaces with corrupt HNSW (#1308)
Both `--mode legacy` and the inline `cli.cmd_repair` rebuild path call `Collection.count()` as their first read — the same call that raises `chromadb.errors.InternalError: Failed to apply logs to the hnsw segment writer` on the corruption class reported in #1308. Repair would print "Cannot recover — palace may need to be re-mined from source files" even though the underlying SQLite tables were fully intact. The new `--mode from-sqlite` reads `(id, document, metadata)` rows directly from `chroma.sqlite3` via `segments` → `embeddings` → `embedding_metadata` joins, never opens a chromadb client against the corrupt palace, and re-upserts everything into a fresh palace. - `--source PATH` extracts from a corrupt palace already moved aside - `--archive-existing` handles the in-place case by renaming the existing palace to `<palace>.pre-rebuild-<timestamp>` first - Partial-rebuild failures raise `RebuildPartialError` with the archive path so users can recover; CLI exits non-zero - In-place mode calls `SharedSystemClient.clear_system_cache()` to drop chromadb's process-wide System registry (cross-palace use does not, to limit blast radius for library callers) - Source validation runs before any destructive moves Verified end-to-end recovering a 52,300-row real-world corrupt palace. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
committed by
Igor Lins e Silva
parent
6741b6908e
commit
a7c4ed24d7
@@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
|
||||
- **CLI and `fact_checker --stdin` mojibaked non-ASCII content on Windows.** Python defaults `sys.stdin`/`stdout`/`stderr` to the system ANSI codepage (cp1252/cp1251/cp950), so `mempalace search > out.txt` and piped fact_checker invocations corrupted Cyrillic / CJK drawer text at the process boundary. New `mempalace/_stdio.py` helper reconfigures all three streams to UTF-8 on `sys.platform == "win32"`, with per-stream `errors` policy: `surrogateescape` on stdin (preserves bad bytes from redirected files for the consumer's parser), `replace` on stdout/stderr (substitutes U+FFFD instead of `UnicodeEncodeError`-ing mid-print). With this, all three user-facing console_scripts (`mcp_server`, `hooks_cli`, `cli`/`fact_checker`) now reconfigure identically on Windows. (#1282)
|
||||
- **MCP knowledge-graph tools forwarded malformed date strings to SQLite.** `tool_kg_query` (`as_of`), `tool_kg_add` (`valid_from`), and `tool_kg_invalidate` (`ended`) accepted any string and produced empty result sets on natural-language inputs like `"March 2026"` or `"yesterday"` — callers (especially LLM agents) could not distinguish "no fact at this time" from "your date format was unrecognized." New `sanitize_iso_date()` validator in `config.py` accepts `YYYY`, `YYYY-MM`, `YYYY-MM-DD` (and passes through `None`/`""`); all three tools call it before values reach the storage layer. **Behavior change:** previously-silent date typos now raise a clear `ValueError` naming the offending field; full ISO-8601 with time (`YYYY-MM-DDTHH:MM:SS`, timezone offsets) is not yet accepted — file an issue if you have a use case. (#1164, #1167)
|
||||
- **MCP server's `_kg` was a module-level singleton.** Multi-tenant hosts that rotate `MEMPALACE_PALACE_PATH` between tool calls hit the wrong sqlite file, because the KG was constructed once at import time while the ChromaDB side was already per-call via `_get_client()`. The KG is now resolved per-call through a lazy per-path cache (`_kg_by_path` keyed by `os.path.abspath`, with a double-checked-locking init under `_kg_cache_lock`). `tool_reconnect` drains and `close()`s cached KGs alongside the existing chroma reconnect. A `_call_kg` retry guard catches `sqlite3.ProgrammingError` once after a reconnect race. (#1136, #1160)
|
||||
- **`mempalace repair` can now recover palaces whose HNSW segment writer is stuck on `apply_logs`.** Both the existing `--mode legacy` rebuild and the inline `cli.cmd_repair` path call `Collection.count()` as their first read — exactly the call that raises `chromadb.errors.InternalError: Failed to apply logs to the hnsw segment writer` on the corruption class introduced upstream and reported in #1308. Repair would print `Cannot recover — palace may need to be re-mined from source files` even though the underlying SQLite tables were fully intact (the corruption lives in the on-disk index files, not the data layer). New `--mode from-sqlite` reads `(id, document, metadata)` rows directly from `chroma.sqlite3` via a `segments` → `embeddings` → `embedding_metadata` join, never opens a chromadb client against the corrupt palace, and re-upserts everything into a fresh palace at `--palace`. `--source PATH` extracts from a corrupt palace already moved aside; `--archive-existing` handles the in-place case by renaming the existing palace to `<palace>.pre-rebuild-<timestamp>` before reading from it. Documents are re-embedded under the user's configured embedding function (the original HNSW vectors live in the corrupt `data_level0.bin` and cannot be recovered, but the embedding model is deterministic so search results remain semantically equivalent). Verified end-to-end on a 52,300-row real-world corrupt palace. (#1308)
|
||||
|
||||
---
|
||||
|
||||
|
||||
+65
-3
@@ -673,6 +673,48 @@ def cmd_repair(args):
|
||||
)
|
||||
return
|
||||
|
||||
if getattr(args, "mode", "legacy") == "from-sqlite":
|
||||
from .migrate import confirm_destructive_action
|
||||
from .repair import RebuildPartialError, rebuild_from_sqlite
|
||||
|
||||
source_path = getattr(args, "source", None)
|
||||
source_path = (
|
||||
os.path.abspath(os.path.expanduser(source_path)) if source_path else palace_path
|
||||
)
|
||||
archive_existing = getattr(args, "archive_existing", False)
|
||||
|
||||
# Gate any path that touches the user's existing palace dir
|
||||
# behind confirm_destructive_action. The legacy mode already
|
||||
# gates; from-sqlite needs the same protection because:
|
||||
# (a) --archive-existing renames the existing palace,
|
||||
# (b) --source PATH writes into --palace dir which the user
|
||||
# may not realize is also a palace.
|
||||
# No prompt when source != dest AND dest does not exist (pure
|
||||
# extract-into-fresh-dir case is non-destructive to existing
|
||||
# palaces).
|
||||
is_destructive_to_dest = source_path == palace_path or os.path.exists(palace_path)
|
||||
if is_destructive_to_dest and not confirm_destructive_action(
|
||||
"Rebuild from SQLite", palace_path, assume_yes=getattr(args, "yes", False)
|
||||
):
|
||||
return
|
||||
|
||||
try:
|
||||
rebuild_from_sqlite(
|
||||
source_palace=source_path,
|
||||
dest_palace=palace_path,
|
||||
archive_existing_dest=archive_existing,
|
||||
)
|
||||
except RebuildPartialError as exc:
|
||||
# The error itself was already printed by rebuild_from_sqlite
|
||||
# with recovery instructions; surface a non-zero exit so
|
||||
# scripts and CI gates see the failure.
|
||||
print(
|
||||
"\n Rebuild partial — see message above. "
|
||||
f"Failed in collection: {exc.failed_collection}"
|
||||
)
|
||||
sys.exit(1)
|
||||
return
|
||||
|
||||
db_path = os.path.join(palace_path, "chroma.sqlite3")
|
||||
|
||||
if not os.path.isdir(palace_path):
|
||||
@@ -1213,11 +1255,31 @@ def main():
|
||||
)
|
||||
p_repair.add_argument(
|
||||
"--mode",
|
||||
choices=["legacy", "max-seq-id"],
|
||||
choices=["legacy", "max-seq-id", "from-sqlite"],
|
||||
default="legacy",
|
||||
help=(
|
||||
"legacy: full-palace rebuild (default). "
|
||||
"max-seq-id: un-poison max_seq_id rows corrupted by the legacy 0.6.x shim."
|
||||
"legacy: full-palace rebuild via the chromadb client (default). "
|
||||
"max-seq-id: un-poison max_seq_id rows corrupted by the legacy 0.6.x shim. "
|
||||
"from-sqlite: rebuild by reading rows directly from chroma.sqlite3, "
|
||||
"bypassing the chromadb client. Use when legacy mode bails because the "
|
||||
"chromadb client cannot open the collection."
|
||||
),
|
||||
)
|
||||
p_repair.add_argument(
|
||||
"--source",
|
||||
default=None,
|
||||
help=(
|
||||
"Source palace path for --mode from-sqlite (defaults to --palace). "
|
||||
"Use when extracting from an archived corrupt palace into a new location."
|
||||
),
|
||||
)
|
||||
p_repair.add_argument(
|
||||
"--archive-existing",
|
||||
action="store_true",
|
||||
help=(
|
||||
"For --mode from-sqlite when --source equals --palace: rename the "
|
||||
"existing palace to <palace>.pre-rebuild-<timestamp> before "
|
||||
"rebuilding so the corrupt copy is preserved."
|
||||
),
|
||||
)
|
||||
p_repair.add_argument(
|
||||
|
||||
+357
-1
@@ -34,14 +34,21 @@ import os
|
||||
import shutil
|
||||
import sqlite3
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from typing import Iterator, Optional
|
||||
|
||||
from .backends.chroma import ChromaBackend, hnsw_capacity_status
|
||||
|
||||
|
||||
COLLECTION_NAME = "mempalace_drawers"
|
||||
|
||||
# Collections rebuilt by ``rebuild_from_sqlite``. Order matters for the
|
||||
# upsert pass — drawers carry the bulk of the data, closets are the AAAK
|
||||
# index layer and reference drawer IDs by string in their documents (no
|
||||
# foreign-key validation, so ordering is informational, not load-bearing).
|
||||
RECOVERABLE_COLLECTIONS = ("mempalace_drawers", "mempalace_closets")
|
||||
|
||||
|
||||
def _get_palace_path():
|
||||
"""Resolve palace path from config."""
|
||||
@@ -436,6 +443,355 @@ def rebuild_index(palace_path=None, confirm_truncation_ok: bool = False):
|
||||
print(f"\n{'=' * 55}\n")
|
||||
|
||||
|
||||
class RebuildPartialError(Exception):
|
||||
"""Raised when ``rebuild_from_sqlite`` fails partway through upserts.
|
||||
|
||||
Carries enough state for the user (or CLI) to recover: the
|
||||
per-collection counts that succeeded, the collection that failed,
|
||||
the dest path holding the partial palace, and the archive path
|
||||
(when an in-place rebuild had moved the original aside). Re-raises
|
||||
the underlying chromadb error as ``__cause__``.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
message: str,
|
||||
*,
|
||||
partial_counts: dict[str, int],
|
||||
failed_collection: str,
|
||||
dest_palace: str,
|
||||
archive_path: Optional[str],
|
||||
):
|
||||
super().__init__(message)
|
||||
self.message = message
|
||||
self.partial_counts = partial_counts
|
||||
self.failed_collection = failed_collection
|
||||
self.dest_palace = dest_palace
|
||||
self.archive_path = archive_path
|
||||
|
||||
|
||||
def _rebuild_one_collection(
|
||||
*,
|
||||
backend: ChromaBackend,
|
||||
source_palace: str,
|
||||
dest_palace: str,
|
||||
collection_name: str,
|
||||
batch_size: int,
|
||||
archive_path: Optional[str],
|
||||
counts_so_far: dict[str, int],
|
||||
) -> int:
|
||||
"""Stream rows for one collection from SQLite and upsert into a
|
||||
freshly-created collection at ``dest_palace``. Returns rows
|
||||
upserted. Raises :class:`RebuildPartialError` (with the underlying
|
||||
chromadb exception as ``__cause__``) on any upsert failure so the
|
||||
caller can stop the loop and print recovery instructions instead of
|
||||
silently shipping a partial palace.
|
||||
"""
|
||||
col = backend.create_collection(dest_palace, collection_name)
|
||||
|
||||
ids: list[str] = []
|
||||
docs: list[str] = []
|
||||
metas: list[dict] = []
|
||||
upserted = 0
|
||||
|
||||
def _flush() -> int:
|
||||
nonlocal upserted
|
||||
if not ids:
|
||||
return upserted
|
||||
col.upsert(ids=list(ids), documents=list(docs), metadatas=list(metas))
|
||||
upserted += len(ids)
|
||||
print(f" upserted {upserted}")
|
||||
ids.clear()
|
||||
docs.clear()
|
||||
metas.clear()
|
||||
return upserted
|
||||
|
||||
try:
|
||||
for emb_id, doc, meta in extract_via_sqlite(source_palace, collection_name):
|
||||
ids.append(emb_id)
|
||||
docs.append(doc or "")
|
||||
# chromadb 1.5.x rejects None entries in the metadatas list
|
||||
# but accepts empty dicts. Mempalace drawers always carry at
|
||||
# least wing/room, so this branch is defensive — corruption
|
||||
# in embedding_metadata could yield an emb_id with no rows.
|
||||
metas.append(meta if meta else {})
|
||||
if len(ids) >= batch_size:
|
||||
_flush()
|
||||
_flush()
|
||||
except Exception as exc: # noqa: BLE001 — chromadb raises many shapes
|
||||
partial = dict(counts_so_far)
|
||||
partial[collection_name] = upserted
|
||||
msg_parts = [
|
||||
f"Upsert failed in collection {collection_name!r} after {upserted} rows: {exc!r}",
|
||||
f"Partial palace left at: {dest_palace}",
|
||||
]
|
||||
if archive_path is not None:
|
||||
msg_parts.append(f"Original palace archived at: {archive_path}")
|
||||
msg_parts.append(
|
||||
" Recover by removing the partial dest and re-running with "
|
||||
f"--source {archive_path}"
|
||||
)
|
||||
else:
|
||||
msg_parts.append(" Source palace is unchanged. Remove the partial dest and re-run.")
|
||||
message = "\n ".join(msg_parts)
|
||||
print(f"\n ERROR: {message}")
|
||||
raise RebuildPartialError(
|
||||
message,
|
||||
partial_counts=partial,
|
||||
failed_collection=collection_name,
|
||||
dest_palace=dest_palace,
|
||||
archive_path=archive_path,
|
||||
) from exc
|
||||
|
||||
return upserted
|
||||
|
||||
|
||||
def extract_via_sqlite(palace_path: str, collection_name: str) -> Iterator[tuple[str, str, dict]]:
|
||||
"""Yield ``(embedding_id, document, metadata)`` for every row in
|
||||
``collection_name``'s metadata segment by reading ``chroma.sqlite3``
|
||||
directly.
|
||||
|
||||
Bypasses the chromadb client entirely — never opens a
|
||||
``PersistentClient``, never imports hnswlib, never invokes the
|
||||
HNSW segment writer. This is the recovery path for palaces where
|
||||
``Collection.count()`` / ``Collection.get()`` raise ``InternalError``
|
||||
because the compactor cannot apply WAL logs to the HNSW segment
|
||||
(#1308). The drawer rows are still on disk in
|
||||
``embeddings`` + ``embedding_metadata``; the corruption lives in the
|
||||
on-disk index files, not the SQLite tables.
|
||||
|
||||
Resolution rule for chromadb's typed metadata columns: each
|
||||
``embedding_metadata`` row stores its value in exactly one of
|
||||
``string_value`` / ``int_value`` / ``float_value`` / ``bool_value``;
|
||||
we pick the first non-NULL column in that order. Rows where every
|
||||
typed column is NULL are dropped (chromadb never writes that shape).
|
||||
The ``chroma:document`` key is removed from the metadata dict and
|
||||
returned as the document; this matches how chromadb itself stores
|
||||
``add(documents=...)``.
|
||||
|
||||
Silent on missing palace, missing ``chroma.sqlite3``, or unknown
|
||||
collection name — yields nothing. Callers that need to distinguish
|
||||
"empty collection" from "collection not present" should query
|
||||
:func:`sqlite_drawer_count` first.
|
||||
"""
|
||||
sqlite_path = os.path.join(palace_path, "chroma.sqlite3")
|
||||
if not os.path.isfile(sqlite_path):
|
||||
return
|
||||
|
||||
conn = sqlite3.connect(f"file:{sqlite_path}?mode=ro", uri=True)
|
||||
try:
|
||||
seg_row = conn.execute(
|
||||
"""
|
||||
SELECT s.id FROM segments s
|
||||
JOIN collections c ON s.collection = c.id
|
||||
WHERE c.name = ? AND s.scope = 'METADATA'
|
||||
""",
|
||||
(collection_name,),
|
||||
).fetchone()
|
||||
if not seg_row:
|
||||
return
|
||||
segment_id = seg_row[0]
|
||||
|
||||
per_id: dict[str, dict] = defaultdict(dict)
|
||||
order: list[str] = []
|
||||
for emb_id, key, sv, iv, fv, bv in conn.execute(
|
||||
"""
|
||||
SELECT e.embedding_id, em.key, em.string_value, em.int_value,
|
||||
em.float_value, em.bool_value
|
||||
FROM embedding_metadata em
|
||||
JOIN embeddings e ON em.id = e.id
|
||||
WHERE e.segment_id = ?
|
||||
ORDER BY em.id
|
||||
""",
|
||||
(segment_id,),
|
||||
):
|
||||
if emb_id not in per_id:
|
||||
order.append(emb_id)
|
||||
if sv is not None:
|
||||
per_id[emb_id][key] = sv
|
||||
elif iv is not None:
|
||||
per_id[emb_id][key] = iv
|
||||
elif fv is not None:
|
||||
per_id[emb_id][key] = fv
|
||||
elif bv is not None:
|
||||
per_id[emb_id][key] = bool(bv)
|
||||
|
||||
for emb_id in order:
|
||||
kv = per_id[emb_id]
|
||||
doc = kv.pop("chroma:document", "")
|
||||
yield emb_id, doc, kv
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def rebuild_from_sqlite(
|
||||
source_palace: str,
|
||||
dest_palace: str,
|
||||
*,
|
||||
archive_existing_dest: bool = False,
|
||||
batch_size: int = 1000,
|
||||
) -> dict[str, int]:
|
||||
"""Rebuild a palace by reading drawers from ``source_palace``'s
|
||||
``chroma.sqlite3`` and upserting them into a fresh palace at
|
||||
``dest_palace``.
|
||||
|
||||
Recovery path for the #1308 failure mode: the chromadb client raises
|
||||
``InternalError: Failed to apply logs to the hnsw segment writer``
|
||||
on every operation that touches the index (``count``, ``get``,
|
||||
``query``), but the underlying SQLite tables are intact. Both the
|
||||
legacy ``rebuild_index`` and the inline ``cli.cmd_repair`` path call
|
||||
``Collection.count()`` as their first read — exactly the call that
|
||||
fails — so neither can recover this class of corruption. This
|
||||
function bypasses the chromadb read path entirely via
|
||||
:func:`extract_via_sqlite`.
|
||||
|
||||
Re-embeds documents at upsert time using the configured embedding
|
||||
function; the original HNSW vectors are not preserved (they live in
|
||||
the corrupt ``data_level0.bin`` / ``link_lists.bin``, not in
|
||||
SQLite). Acceptable for a corruption-recovery flow because the
|
||||
embedding model is deterministic — same model + same document text
|
||||
yields semantically equivalent search results.
|
||||
|
||||
``archive_existing_dest`` controls behavior when ``dest_palace``
|
||||
already exists:
|
||||
|
||||
* ``False`` (default) — refuse with a clear message. Callers must
|
||||
manually move the existing palace aside first.
|
||||
* ``True`` — rename ``dest_palace`` to
|
||||
``<dest_palace>.pre-rebuild-<timestamp>`` and read from there
|
||||
instead. Used by the in-place CLI flow where ``--source`` defaults
|
||||
to the same path as ``--palace``.
|
||||
|
||||
Returns a ``{collection_name: row_count}`` dict so callers (CLI,
|
||||
tests) can verify the per-collection rebuild count without parsing
|
||||
stdout. Returns ``{}`` on validation failures (missing source,
|
||||
refusing to overwrite). Raises :class:`RebuildPartialError` if a
|
||||
chromadb upsert fails partway through; the dest palace is left in
|
||||
place so the user can inspect what landed, and the in-place archive
|
||||
(when applicable) is reported in the error so the user can re-run
|
||||
against it.
|
||||
|
||||
.. warning::
|
||||
|
||||
In-place mode (``source_palace == dest_palace`` with
|
||||
``archive_existing_dest=True``) calls
|
||||
``chromadb.api.client.SharedSystemClient.clear_system_cache()`` to
|
||||
drop chromadb's process-wide System registry — required because
|
||||
an existing cached System built against the original palace will
|
||||
refuse ``create_collection`` after the dir is renamed (chromadb
|
||||
still thinks the collections exist). This invalidates any
|
||||
PersistentClient instances held elsewhere in the same process for
|
||||
*any* palace, not just this one. Do not call this function from
|
||||
inside a long-running mempalace process (MCP server, daemon)
|
||||
while other callers hold live ``PersistentClient`` references —
|
||||
use the CLI in a separate process instead. Cross-palace use
|
||||
(``source != dest``) does not touch the cache.
|
||||
|
||||
Note on metadata fidelity: the resolution rule
|
||||
(``string_value`` → ``int_value`` → ``float_value`` → ``bool_value``)
|
||||
matches the precedent in :mod:`mempalace.migrate`. ChromaDB 0.4.x
|
||||
occasionally wrote booleans as ``int_value=0/1``; those will
|
||||
round-trip as ``int`` rather than ``bool`` after this rebuild. This
|
||||
is a known divergence and matches the existing migrate-path
|
||||
behavior.
|
||||
"""
|
||||
source_palace = os.path.abspath(os.path.expanduser(source_palace))
|
||||
dest_palace = os.path.abspath(os.path.expanduser(dest_palace))
|
||||
|
||||
src_db = os.path.join(source_palace, "chroma.sqlite3")
|
||||
|
||||
in_place = source_palace == dest_palace
|
||||
|
||||
print(f"\n{'=' * 55}")
|
||||
print(" MemPalace Repair — Rebuild from SQLite")
|
||||
print(f"{'=' * 55}\n")
|
||||
print(f" Source: {source_palace}")
|
||||
print(f" Dest: {dest_palace}")
|
||||
|
||||
# Validate source BEFORE any destructive moves. An earlier draft
|
||||
# archived the dest first and surfaced the missing-chroma.sqlite3
|
||||
# error after — leaving the user with a renamed dir to manually undo
|
||||
# when the archive itself was empty. Validate first so a user error
|
||||
# (--source pointing at a non-palace dir) bails cleanly.
|
||||
if in_place:
|
||||
if not archive_existing_dest:
|
||||
print(
|
||||
"\n Source and dest are the same path. Pass "
|
||||
"archive_existing_dest=True (CLI: --archive-existing) to move "
|
||||
"the existing palace aside, or pass a different source_palace= "
|
||||
"(CLI: --source)."
|
||||
)
|
||||
return {}
|
||||
if not os.path.isfile(src_db):
|
||||
print(f"\n Source palace has no chroma.sqlite3 at {src_db}")
|
||||
return {}
|
||||
else:
|
||||
if not os.path.isfile(src_db):
|
||||
print(f"\n Source palace has no chroma.sqlite3 at {src_db}")
|
||||
return {}
|
||||
if os.path.exists(dest_palace):
|
||||
print(
|
||||
f"\n Refusing to rebuild into existing path: {dest_palace}\n"
|
||||
" Move it aside, pass a different dest, or set "
|
||||
"archive_existing_dest=True if rebuilding in place "
|
||||
"(source_palace == dest_palace)."
|
||||
)
|
||||
return {}
|
||||
|
||||
archive_path: Optional[str] = None
|
||||
if in_place:
|
||||
ts = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
archive_path = f"{dest_palace}.pre-rebuild-{ts}"
|
||||
print(f" Archiving {dest_palace} → {archive_path}")
|
||||
shutil.move(dest_palace, archive_path)
|
||||
source_palace = archive_path
|
||||
src_db = os.path.join(source_palace, "chroma.sqlite3")
|
||||
|
||||
# In-place only: drop chromadb's process-wide System registry so
|
||||
# the new client at dest_palace builds a fresh System. Without
|
||||
# this, ``create_collection`` raises "Collection already exists"
|
||||
# because the cached System still holds the pre-rename schema.
|
||||
# Cross-palace mode does not need this and would needlessly
|
||||
# invalidate other callers' clients (see docstring warning).
|
||||
try:
|
||||
from chromadb.api.client import SharedSystemClient
|
||||
|
||||
SharedSystemClient.clear_system_cache()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
print(
|
||||
f" Warning: could not clear chromadb system cache ({exc!r}); "
|
||||
"in-place rebuild may fail with 'Collection already exists'."
|
||||
)
|
||||
|
||||
os.makedirs(dest_palace, exist_ok=True)
|
||||
|
||||
backend = ChromaBackend()
|
||||
counts: dict[str, int] = {}
|
||||
|
||||
for cname in RECOVERABLE_COLLECTIONS:
|
||||
print(f"\n [{cname}]")
|
||||
upserted = _rebuild_one_collection(
|
||||
backend=backend,
|
||||
source_palace=source_palace,
|
||||
dest_palace=dest_palace,
|
||||
collection_name=cname,
|
||||
batch_size=batch_size,
|
||||
archive_path=archive_path,
|
||||
counts_so_far=counts,
|
||||
)
|
||||
counts[cname] = upserted
|
||||
if upserted == 0:
|
||||
print(f" no rows found for {cname} in source palace")
|
||||
else:
|
||||
print(f" done: {upserted} rows in {cname}")
|
||||
|
||||
print(f"\n Rebuild complete. {sum(counts.values())} total rows.")
|
||||
if archive_path is not None:
|
||||
print(f" Original palace archived at: {archive_path}")
|
||||
print(f"{'=' * 55}\n")
|
||||
return counts
|
||||
|
||||
|
||||
def status(palace_path=None) -> dict:
|
||||
"""Read-only health check: compare sqlite vs HNSW element counts.
|
||||
|
||||
|
||||
@@ -682,3 +682,294 @@ def test_max_seq_id_rollback_on_verification_failure(tmp_path, monkeypatch):
|
||||
# A backup file is still present — caller can roll back from it.
|
||||
leftover = [fn for fn in os.listdir(palace) if "max-seq-id-backup-" in fn]
|
||||
assert leftover
|
||||
|
||||
|
||||
# ── extract_via_sqlite + rebuild_from_sqlite (#1308) ──────────────────
|
||||
#
|
||||
# These tests build real chromadb palaces in tmp_path rather than mocking
|
||||
# the SQLite layer. The bug class they guard against is "extraction sees
|
||||
# different rows than chromadb stored" — the only honest check is to let
|
||||
# chromadb actually write rows and then read them back via the SQLite
|
||||
# bypass. Mocking the SQLite cursor would defeat the test.
|
||||
|
||||
|
||||
def _seed_palace(palace_path, collection_name, rows):
|
||||
"""Build a real chromadb palace at ``palace_path`` and add ``rows``.
|
||||
|
||||
``rows`` is a list of ``(id, document, metadata)`` tuples. Returns
|
||||
the populated collection so callers can assert on the writer's view
|
||||
of state before the SQLite read.
|
||||
"""
|
||||
from mempalace.backends.chroma import ChromaBackend
|
||||
|
||||
backend = ChromaBackend()
|
||||
col = backend.create_collection(str(palace_path), collection_name)
|
||||
col.upsert(
|
||||
ids=[r[0] for r in rows],
|
||||
documents=[r[1] for r in rows],
|
||||
metadatas=[r[2] for r in rows],
|
||||
)
|
||||
return col
|
||||
|
||||
|
||||
def test_extract_via_sqlite_returns_all_rows_with_metadata(tmp_path):
|
||||
"""Round-trip: a chromadb palace with N upserted rows returns those
|
||||
same N rows when read via the SQLite bypass.
|
||||
|
||||
Catches: anyone who breaks the segments/embeddings/embedding_metadata
|
||||
JOIN, swaps the metadata vs vector segment, or changes how the
|
||||
document is stored under the ``chroma:document`` key.
|
||||
"""
|
||||
rows = [
|
||||
(f"drawer_{i:03d}", f"document body {i}", {"wing": "test_wing", "room": f"r{i % 3}"})
|
||||
for i in range(25)
|
||||
]
|
||||
_seed_palace(tmp_path, "mempalace_drawers", rows)
|
||||
|
||||
extracted = list(repair.extract_via_sqlite(str(tmp_path), "mempalace_drawers"))
|
||||
|
||||
assert len(extracted) == 25
|
||||
by_id = {emb_id: (doc, meta) for emb_id, doc, meta in extracted}
|
||||
assert set(by_id) == {r[0] for r in rows}
|
||||
for emb_id, doc, meta in rows:
|
||||
got_doc, got_meta = by_id[emb_id]
|
||||
assert got_doc == doc, f"document mangled for {emb_id}"
|
||||
assert got_meta == meta, f"metadata mangled for {emb_id}: {got_meta!r}"
|
||||
|
||||
|
||||
def test_extract_via_sqlite_preserves_typed_metadata(tmp_path):
|
||||
"""Chromadb stores int / float / bool / string in distinct typed
|
||||
columns. Extraction must round-trip the original type, not coerce
|
||||
everything to string.
|
||||
|
||||
Catches: a regression where the SELECT order changes and ints come
|
||||
back as None, or where the column-resolution rule prefers the wrong
|
||||
column.
|
||||
"""
|
||||
rows = [
|
||||
(
|
||||
"drawer_typed",
|
||||
"doc",
|
||||
{
|
||||
"wing": "w",
|
||||
"chunk_index": 7, # int
|
||||
"score": 0.42, # float
|
||||
"is_active": True, # bool
|
||||
},
|
||||
),
|
||||
]
|
||||
_seed_palace(tmp_path, "mempalace_drawers", rows)
|
||||
|
||||
extracted = list(repair.extract_via_sqlite(str(tmp_path), "mempalace_drawers"))
|
||||
assert len(extracted) == 1
|
||||
_, _, meta = extracted[0]
|
||||
|
||||
assert meta["chunk_index"] == 7 and isinstance(meta["chunk_index"], int)
|
||||
assert meta["score"] == 0.42 and isinstance(meta["score"], float)
|
||||
assert meta["is_active"] is True
|
||||
assert meta["wing"] == "w"
|
||||
|
||||
|
||||
def test_extract_via_sqlite_unknown_collection_yields_nothing(tmp_path):
|
||||
"""Asking for a collection that isn't in the palace must return an
|
||||
empty iterator, not silently fall back to another collection's
|
||||
metadata segment. Seeds two real collections and queries for a third
|
||||
name so a regression that drops the WHERE c.name=? filter would leak
|
||||
rows from the seeded collections rather than passing.
|
||||
"""
|
||||
_seed_palace(tmp_path, "mempalace_drawers", [("d1", "doc", {"wing": "w"})])
|
||||
_seed_palace(tmp_path, "mempalace_closets", [("c1", "abbrev", {"wing": "w"})])
|
||||
assert list(repair.extract_via_sqlite(str(tmp_path), "not_a_real_collection")) == []
|
||||
|
||||
|
||||
def test_extract_via_sqlite_missing_palace_yields_nothing(tmp_path):
|
||||
"""No chroma.sqlite3 → empty iterator, no exception. Callers depend
|
||||
on this when probing speculatively."""
|
||||
empty = tmp_path / "no_palace_here"
|
||||
empty.mkdir()
|
||||
assert list(repair.extract_via_sqlite(str(empty), "mempalace_drawers")) == []
|
||||
|
||||
|
||||
def test_rebuild_from_sqlite_roundtrips_via_real_chromadb(tmp_path):
|
||||
"""End-to-end: seed source palace, rebuild into a fresh dest, then
|
||||
open dest with a fresh ChromaBackend and verify ``count()`` and
|
||||
metadata filters return the original rows. Also asserts a closet
|
||||
document round-trips so a future regression that re-embeds with the
|
||||
wrong EF or swaps drawer/closet content would fail here.
|
||||
|
||||
This is the single most important regression guard. If
|
||||
``rebuild_from_sqlite`` silently drops rows or mangles metadata, no
|
||||
other test in this file would catch it because they all stop at the
|
||||
extraction layer.
|
||||
"""
|
||||
from mempalace.backends.chroma import ChromaBackend
|
||||
|
||||
source = tmp_path / "source"
|
||||
dest = tmp_path / "dest"
|
||||
|
||||
rows = [
|
||||
(f"drawer_{i:03d}", f"body {i}", {"wing": "alpha" if i % 2 else "beta", "room": "r0"})
|
||||
for i in range(40)
|
||||
]
|
||||
_seed_palace(source, "mempalace_drawers", rows)
|
||||
_seed_palace(
|
||||
source,
|
||||
"mempalace_closets",
|
||||
[("closet_x", "abbrev pointer →drawer_001", {"wing": "alpha"})],
|
||||
)
|
||||
|
||||
counts = repair.rebuild_from_sqlite(str(source), str(dest))
|
||||
assert counts == {"mempalace_drawers": 40, "mempalace_closets": 1}
|
||||
|
||||
backend = ChromaBackend()
|
||||
drawers = backend.get_collection(str(dest), "mempalace_drawers")
|
||||
assert drawers.count() == 40
|
||||
alpha = drawers.get(where={"wing": "alpha"})
|
||||
assert len(alpha["ids"]) == 20
|
||||
|
||||
# Spot-check that document text round-trips for one specific drawer
|
||||
# — protects against a regression where extraction or upsert order
|
||||
# silently swaps document bodies between IDs.
|
||||
one = drawers.get(ids=["drawer_007"], include=["documents", "metadatas"])
|
||||
assert one["documents"] == ["body 7"]
|
||||
assert one["metadatas"][0]["wing"] == "alpha"
|
||||
|
||||
# Closets: the AAAK index layer. Re-embedded with the same EF so a
|
||||
# known closet ID and its document body must come back intact.
|
||||
closets = backend.get_collection(str(dest), "mempalace_closets")
|
||||
assert closets.count() == 1
|
||||
closet_row = closets.get(ids=["closet_x"], include=["documents", "metadatas"])
|
||||
assert closet_row["documents"] == ["abbrev pointer →drawer_001"]
|
||||
assert closet_row["metadatas"][0] == {"wing": "alpha"}
|
||||
|
||||
|
||||
def test_rebuild_from_sqlite_refuses_existing_dest(tmp_path):
|
||||
"""Refuse to write into a directory that already exists when source
|
||||
and dest differ. Without this, an unattended re-run would silently
|
||||
interleave a partial rebuild with whatever's already at dest.
|
||||
"""
|
||||
source = tmp_path / "source"
|
||||
dest = tmp_path / "dest"
|
||||
_seed_palace(source, "mempalace_drawers", [("d1", "doc", {"wing": "w"})])
|
||||
dest.mkdir()
|
||||
# Drop a marker file so we can prove the dir wasn't touched.
|
||||
(dest / "marker.txt").write_text("preexisting")
|
||||
|
||||
counts = repair.rebuild_from_sqlite(str(source), str(dest))
|
||||
assert counts == {}
|
||||
assert (dest / "marker.txt").read_text() == "preexisting"
|
||||
assert not (dest / "chroma.sqlite3").exists()
|
||||
|
||||
|
||||
def test_rebuild_from_sqlite_in_place_archives_when_opted_in(tmp_path):
|
||||
"""In-place rebuild (source == dest) with ``archive_existing_dest=True``
|
||||
must move the original aside to ``<dest>.pre-rebuild-<ts>`` and read
|
||||
from the archive — the original drawer rows must survive in the new
|
||||
palace, AND the archive itself must still contain the original rows.
|
||||
|
||||
Catches: a refactor that moves the original out but then reads from
|
||||
the now-empty original location, producing an empty rebuild; also
|
||||
catches a swap that empties the archive after reading.
|
||||
"""
|
||||
palace = tmp_path / "palace"
|
||||
rows = [(f"d{i}", f"body {i}", {"wing": "w", "room": "r"}) for i in range(15)]
|
||||
_seed_palace(palace, "mempalace_drawers", rows)
|
||||
|
||||
counts = repair.rebuild_from_sqlite(str(palace), str(palace), archive_existing_dest=True)
|
||||
assert counts["mempalace_drawers"] == 15
|
||||
|
||||
archives = [p for p in tmp_path.iterdir() if p.name.startswith("palace.pre-rebuild-")]
|
||||
assert len(archives) == 1
|
||||
assert (archives[0] / "chroma.sqlite3").exists()
|
||||
# Archive must still hold the same row count via the SQLite bypass —
|
||||
# proves the archive wasn't silently truncated as a side effect.
|
||||
archived_rows = list(repair.extract_via_sqlite(str(archives[0]), "mempalace_drawers"))
|
||||
assert len(archived_rows) == 15
|
||||
|
||||
from mempalace.backends.chroma import ChromaBackend
|
||||
|
||||
rebuilt = ChromaBackend().get_collection(str(palace), "mempalace_drawers")
|
||||
assert rebuilt.count() == 15
|
||||
|
||||
|
||||
def test_rebuild_from_sqlite_in_place_refuses_without_archive_flag(tmp_path):
|
||||
"""Source == dest without archive flag must abort untouched. The
|
||||
most catastrophic possible regression of this code path is silently
|
||||
deleting the only copy of the user's data."""
|
||||
palace = tmp_path / "palace"
|
||||
_seed_palace(palace, "mempalace_drawers", [("d1", "doc", {"wing": "w"})])
|
||||
sqlite_before = (palace / "chroma.sqlite3").stat().st_size
|
||||
|
||||
counts = repair.rebuild_from_sqlite(str(palace), str(palace))
|
||||
assert counts == {}
|
||||
# Same file, untouched.
|
||||
assert (palace / "chroma.sqlite3").stat().st_size == sqlite_before
|
||||
archives = [p for p in tmp_path.iterdir() if "pre-rebuild" in p.name]
|
||||
assert archives == []
|
||||
|
||||
|
||||
def test_rebuild_from_sqlite_source_missing_chroma_db(tmp_path):
|
||||
"""Source dir exists but has no chroma.sqlite3 → returns empty,
|
||||
leaves dest untouched."""
|
||||
source = tmp_path / "source"
|
||||
source.mkdir()
|
||||
(source / "stray_file").write_text("not a palace")
|
||||
dest = tmp_path / "dest"
|
||||
|
||||
counts = repair.rebuild_from_sqlite(str(source), str(dest))
|
||||
assert counts == {}
|
||||
assert not dest.exists()
|
||||
|
||||
|
||||
def test_rebuild_from_sqlite_in_place_validates_source_before_archiving(tmp_path):
|
||||
"""In-place + archive_existing_dest=True with a dir that lacks
|
||||
chroma.sqlite3 must NOT rename the dir before bailing. An earlier
|
||||
revision archived first and validated second, leaving the user with
|
||||
a renamed empty dir to manually undo. Catches that ordering bug.
|
||||
"""
|
||||
palace = tmp_path / "palace"
|
||||
palace.mkdir()
|
||||
(palace / "marker.txt").write_text("not a real palace")
|
||||
|
||||
counts = repair.rebuild_from_sqlite(str(palace), str(palace), archive_existing_dest=True)
|
||||
assert counts == {}
|
||||
# No archive created — original dir still in place with its marker.
|
||||
assert palace.exists()
|
||||
assert (palace / "marker.txt").read_text() == "not a real palace"
|
||||
archives = [p for p in tmp_path.iterdir() if "pre-rebuild" in p.name]
|
||||
assert archives == []
|
||||
|
||||
|
||||
def test_rebuild_from_sqlite_raises_on_upsert_failure(tmp_path, monkeypatch):
|
||||
"""Mid-batch upsert failure must raise ``RebuildPartialError`` and
|
||||
surface the failed collection + archive path so the user can recover.
|
||||
Without this, an unattended script gets exit-code-zero on a partial
|
||||
rebuild and the user discovers the data loss only when search starts
|
||||
returning fewer hits.
|
||||
"""
|
||||
palace = tmp_path / "palace"
|
||||
rows = [(f"d{i}", f"body {i}", {"wing": "w", "room": "r"}) for i in range(5)]
|
||||
_seed_palace(palace, "mempalace_drawers", rows)
|
||||
|
||||
# Make the very first upsert raise so we don't depend on batch
|
||||
# boundary behavior. Patching ChromaCollection.upsert (the wrapper
|
||||
# mempalace's backend returns) keeps the failure path realistic.
|
||||
# ``monkeypatch`` is pytest's built-in fixture that auto-restores
|
||||
# the original attribute when the test exits, so we don't need to
|
||||
# undo this manually.
|
||||
from mempalace.backends.chroma import ChromaCollection
|
||||
|
||||
def boom(self, **kwargs):
|
||||
raise RuntimeError("simulated chromadb upsert failure")
|
||||
|
||||
monkeypatch.setattr(ChromaCollection, "upsert", boom)
|
||||
|
||||
with pytest.raises(repair.RebuildPartialError) as excinfo:
|
||||
repair.rebuild_from_sqlite(str(palace), str(palace), archive_existing_dest=True)
|
||||
|
||||
err = excinfo.value
|
||||
assert err.failed_collection == "mempalace_drawers"
|
||||
assert err.partial_counts.get("mempalace_drawers") == 0
|
||||
assert err.archive_path is not None
|
||||
assert os.path.isfile(os.path.join(err.archive_path, "chroma.sqlite3"))
|
||||
assert err.dest_palace == os.path.abspath(str(palace))
|
||||
|
||||
Reference in New Issue
Block a user