fix: narrow _fix_blob_seq_ids shim + add repair --mode max-seq-id
The BLOB-seq_id migration shim (PR #664) ran int.from_bytes(..., 'big') over every BLOB in max_seq_id, including chromadb 1.5.x's own native format (b'\x11\x11' + 6 ASCII digits). That conversion yields a ~1.23e18 integer that silently suppresses every subsequent embeddings_queue write for the affected segment (queue filter is seq_id > start), causing silent drawer-write drops after a 1.5.x upgrade. Two-part fix: 1. Shim narrowing (mempalace/backends/chroma.py) - Drop max_seq_id from the shim loop. chromadb owns that column's format; we don't reinterpret it. - Defense-in-depth: skip rows in embeddings whose seq_id BLOB has the sysdb-10 b'\x11\x11' prefix rather than misconvert. 2. Recovery command (mempalace/repair.py, mempalace/cli.py) - mempalace repair --mode max-seq-id [--segment <uuid>] [--from-sidecar <path>] [--dry-run] [--yes] [--no-backup] - Detects poisoned rows via threshold (seq_id > 2**53). - Default heuristic: MAX(embeddings.seq_id) over the collection owning the poisoned segment. Matches METADATA max exactly; VECTOR segments get a few seq_ids ahead (queue skips an already-indexed window — an acceptable loss vs. resetting to 0 and re-processing everything). - --from-sidecar copies clean values from a pre-corruption sqlite db. - Backs up chroma.sqlite3, closes chroma handles, atomic UPDATEs, post-repair verification that raises MaxSeqIdVerificationError if any row is still above threshold. Tests: 8 new in tests/test_repair.py (detection, heuristic, sidecar, dry-run, segment filter, no-op, backup, rollback-on-verify-failure). 3 new in tests/test_backends.py (max_seq_id untouched by shim, sysdb-10 prefix skipped in embeddings, legacy big-endian u64 BLOBs still convert). Full suite: 1103 passed.
This commit is contained in:
@@ -32,7 +32,10 @@ Usage (from CLI):
|
||||
import argparse
|
||||
import os
|
||||
import shutil
|
||||
import sqlite3
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from .backends.chroma import ChromaBackend, hnsw_capacity_status
|
||||
|
||||
@@ -486,6 +489,254 @@ def status(palace_path=None) -> dict:
|
||||
return {"drawers": drawers, "closets": closets}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# max-seq-id mode: un-poison max_seq_id rows corrupted by the old shim
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _close_chroma_handles(palace_path: str) -> None:
|
||||
"""Drop ChromaBackend + chromadb singleton caches so OS mmap handles release."""
|
||||
import gc
|
||||
|
||||
try:
|
||||
ChromaBackend().close_palace(palace_path)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
from chromadb.api.client import SharedSystemClient
|
||||
|
||||
SharedSystemClient.clear_system_cache()
|
||||
except Exception:
|
||||
pass
|
||||
gc.collect()
|
||||
|
||||
|
||||
class MaxSeqIdVerificationError(RuntimeError):
|
||||
"""Raised when post-repair detection still sees poisoned rows."""
|
||||
|
||||
|
||||
#: Any ``max_seq_id.seq_id`` above this is unreachable by a real palace.
|
||||
#: Clean values are bounded by the embeddings_queue's monotonic counter (<1e10
|
||||
#: in practice), and 2**53 is the float64 exact-integer ceiling. Poisoned
|
||||
#: values from the 0.6.x shim misinterpreting chromadb 1.5.x's
|
||||
#: ``b'\x11\x11' + 6 ASCII digits`` format start at ~1.23e18, so anything
|
||||
#: above the threshold is confidently a shim-poisoning artefact.
|
||||
MAX_SEQ_ID_SANITY_THRESHOLD = 1 << 53
|
||||
|
||||
|
||||
def _detect_poisoned_max_seq_ids(
|
||||
db_path: str,
|
||||
*,
|
||||
segment: Optional[str] = None,
|
||||
threshold: int = MAX_SEQ_ID_SANITY_THRESHOLD,
|
||||
) -> list[tuple[str, int]]:
|
||||
"""Return ``[(segment_id, poisoned_seq_id), ...]`` for rows above threshold.
|
||||
|
||||
If ``segment`` is given, the detection is restricted to that segment id
|
||||
(still only returning it if it actually exceeds the threshold).
|
||||
"""
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
if segment is not None:
|
||||
rows = conn.execute(
|
||||
"SELECT segment_id, seq_id FROM max_seq_id WHERE segment_id = ? AND seq_id > ?",
|
||||
(segment, threshold),
|
||||
).fetchall()
|
||||
else:
|
||||
rows = conn.execute(
|
||||
"SELECT segment_id, seq_id FROM max_seq_id WHERE seq_id > ?",
|
||||
(threshold,),
|
||||
).fetchall()
|
||||
return [(str(sid), int(val)) for sid, val in rows]
|
||||
|
||||
|
||||
def _compute_heuristic_seq_id(cur: sqlite3.Cursor, segment_id: str) -> int:
|
||||
"""Return ``MAX(embeddings.seq_id)`` over the collection owning ``segment_id``.
|
||||
|
||||
Matches the METADATA segment's pre-poison value exactly (its max equals
|
||||
the collection-wide embeddings max). For the sibling VECTOR segment the
|
||||
value is a few seq_ids ahead of its own pre-poison max; the queue
|
||||
treats that as "already consumed", skipping a small window of
|
||||
already-indexed embeddings on next subscribe. That is an acceptable
|
||||
loss vs. resetting to 0 (which would re-process the entire queue and
|
||||
risk HNSW bloat from issue #1046).
|
||||
"""
|
||||
row = cur.execute(
|
||||
"""
|
||||
SELECT MAX(e.seq_id)
|
||||
FROM embeddings e
|
||||
JOIN segments s ON e.segment_id = s.id
|
||||
WHERE s.collection = (
|
||||
SELECT collection FROM segments WHERE id = ?
|
||||
)
|
||||
""",
|
||||
(segment_id,),
|
||||
).fetchone()
|
||||
if row is None or row[0] is None:
|
||||
return 0
|
||||
return int(row[0])
|
||||
|
||||
|
||||
def _read_sidecar_seq_ids(sidecar_path: str) -> dict[str, int]:
|
||||
"""Load ``{segment_id: seq_id}`` from a sidecar DB's ``max_seq_id`` table.
|
||||
|
||||
Rejects sidecar files whose ``max_seq_id.seq_id`` is itself BLOB-typed
|
||||
— a sidecar that old predates chromadb's type normalisation and is not
|
||||
a trustworthy restoration source.
|
||||
"""
|
||||
if not os.path.isfile(sidecar_path):
|
||||
raise FileNotFoundError(f"Sidecar database not found: {sidecar_path}")
|
||||
out: dict[str, int] = {}
|
||||
with sqlite3.connect(sidecar_path) as conn:
|
||||
rows = conn.execute("SELECT segment_id, seq_id, typeof(seq_id) FROM max_seq_id").fetchall()
|
||||
for segment_id, seq_id, kind in rows:
|
||||
if kind == "blob":
|
||||
raise ValueError(
|
||||
f"Sidecar has BLOB-typed seq_id for {segment_id}; refusing to use it. "
|
||||
"Pass a sidecar that was already migrated to INTEGER rows."
|
||||
)
|
||||
out[str(segment_id)] = int(seq_id)
|
||||
return out
|
||||
|
||||
|
||||
def repair_max_seq_id(
|
||||
palace_path: str,
|
||||
*,
|
||||
segment: Optional[str] = None,
|
||||
from_sidecar: Optional[str] = None,
|
||||
threshold: int = MAX_SEQ_ID_SANITY_THRESHOLD,
|
||||
backup: bool = True,
|
||||
dry_run: bool = False,
|
||||
assume_yes: bool = False,
|
||||
) -> dict:
|
||||
"""Un-poison ``max_seq_id`` rows corrupted by ``_fix_blob_seq_ids`` misfire.
|
||||
|
||||
The old shim ran ``int.from_bytes(blob, 'big')`` across every BLOB
|
||||
``max_seq_id.seq_id`` row, including chromadb 1.5.x's native
|
||||
``b'\\x11\\x11' + ASCII digits`` format. That conversion yields a
|
||||
~1.23e18 integer that silently suppresses every subsequent
|
||||
``embeddings_queue`` write for the affected segment. This command
|
||||
restores clean values either from a pre-corruption sidecar DB
|
||||
(exact) or heuristically (``MAX(embeddings.seq_id)`` over the owning
|
||||
collection).
|
||||
"""
|
||||
from .migrate import confirm_destructive_action, contains_palace_database
|
||||
|
||||
palace_path = os.path.abspath(os.path.expanduser(palace_path))
|
||||
db_path = os.path.join(palace_path, "chroma.sqlite3")
|
||||
|
||||
result: dict = {
|
||||
"palace_path": palace_path,
|
||||
"dry_run": dry_run,
|
||||
"aborted": False,
|
||||
"segment_repaired": [],
|
||||
"before": {},
|
||||
"after": {},
|
||||
"backup": None,
|
||||
}
|
||||
|
||||
print(f"\n{'=' * 55}")
|
||||
print(" MemPalace Repair — max_seq_id Un-poison")
|
||||
print(f"{'=' * 55}\n")
|
||||
print(f" Palace: {palace_path}")
|
||||
if segment:
|
||||
print(f" Segment: {segment}")
|
||||
if from_sidecar:
|
||||
print(f" Sidecar: {from_sidecar}")
|
||||
|
||||
if not os.path.isdir(palace_path):
|
||||
print(f" No palace found at {palace_path}")
|
||||
result["aborted"] = True
|
||||
result["reason"] = "palace-missing"
|
||||
return result
|
||||
if not contains_palace_database(palace_path):
|
||||
print(f" No palace database at {palace_path}")
|
||||
result["aborted"] = True
|
||||
result["reason"] = "db-missing"
|
||||
return result
|
||||
|
||||
poisoned = _detect_poisoned_max_seq_ids(db_path, segment=segment, threshold=threshold)
|
||||
if not poisoned:
|
||||
print(" No poisoned max_seq_id rows detected. Nothing to do.")
|
||||
print(f"\n{'=' * 55}\n")
|
||||
return result
|
||||
|
||||
sidecar_map: dict[str, int] = {}
|
||||
if from_sidecar:
|
||||
sidecar_map = _read_sidecar_seq_ids(from_sidecar)
|
||||
|
||||
plan: list[tuple[str, int, int]] = []
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
cur = conn.cursor()
|
||||
for seg_id, old_val in poisoned:
|
||||
if from_sidecar:
|
||||
if seg_id not in sidecar_map:
|
||||
print(f" Skipped segment {seg_id}: no sidecar entry")
|
||||
continue
|
||||
new_val = sidecar_map[seg_id]
|
||||
else:
|
||||
new_val = _compute_heuristic_seq_id(cur, seg_id)
|
||||
plan.append((seg_id, old_val, new_val))
|
||||
result["before"][seg_id] = old_val
|
||||
result["after"][seg_id] = new_val
|
||||
|
||||
print()
|
||||
print(" Report")
|
||||
print(f" poisoned rows {len(poisoned):>6}")
|
||||
print(f" planned repairs {len(plan):>6}")
|
||||
source = "sidecar" if from_sidecar else "heuristic (collection MAX)"
|
||||
print(f" clean-value source {source}")
|
||||
for seg_id, old_val, new_val in plan:
|
||||
print(f" {seg_id} {old_val} → {new_val}")
|
||||
|
||||
if dry_run:
|
||||
print("\n DRY RUN — no rows modified.\n" + "=" * 55 + "\n")
|
||||
return result
|
||||
|
||||
if not plan:
|
||||
print(" No actionable repairs.")
|
||||
print(f"\n{'=' * 55}\n")
|
||||
return result
|
||||
|
||||
if not confirm_destructive_action("Repair max_seq_id", palace_path, assume_yes=assume_yes):
|
||||
result["aborted"] = True
|
||||
result["reason"] = "user-aborted"
|
||||
return result
|
||||
|
||||
if backup:
|
||||
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
backup_path = os.path.join(palace_path, f"chroma.sqlite3.max-seq-id-backup-{timestamp}")
|
||||
shutil.copy2(db_path, backup_path)
|
||||
result["backup"] = backup_path
|
||||
print(f" Backup: {backup_path}")
|
||||
|
||||
_close_chroma_handles(palace_path)
|
||||
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
conn.execute("BEGIN")
|
||||
try:
|
||||
conn.executemany(
|
||||
"UPDATE max_seq_id SET seq_id = ? WHERE segment_id = ?",
|
||||
[(new_val, seg_id) for seg_id, _old, new_val in plan],
|
||||
)
|
||||
conn.commit()
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
raise
|
||||
|
||||
remaining = _detect_poisoned_max_seq_ids(db_path, segment=segment, threshold=threshold)
|
||||
if remaining:
|
||||
raise MaxSeqIdVerificationError(
|
||||
f"Post-repair detection still found {len(remaining)} poisoned row(s): "
|
||||
f"{[sid for sid, _ in remaining]}. Backup at {result['backup']}."
|
||||
)
|
||||
|
||||
result["segment_repaired"] = [seg_id for seg_id, _old, _new in plan]
|
||||
print(f"\n Repair complete. {len(plan)} row(s) restored.")
|
||||
print(f" Backup: {result['backup'] or '(skipped)'}")
|
||||
print(f"\n{'=' * 55}\n")
|
||||
return result
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
p = argparse.ArgumentParser(description="MemPalace repair tools")
|
||||
p.add_argument("command", choices=["status", "scan", "prune", "rebuild"])
|
||||
|
||||
Reference in New Issue
Block a user