From f5c8b095dd9e057d804ed23daa4e1b07a1c9d63b Mon Sep 17 00:00:00 2001 From: eblander Date: Thu, 23 Apr 2026 14:45:38 -0400 Subject: [PATCH] fix: narrow _fix_blob_seq_ids shim + add repair --mode max-seq-id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The BLOB-seq_id migration shim (PR #664) ran int.from_bytes(..., 'big') over every BLOB in max_seq_id, including chromadb 1.5.x's own native format (b'\x11\x11' + 6 ASCII digits). That conversion yields a ~1.23e18 integer that silently suppresses every subsequent embeddings_queue write for the affected segment (queue filter is seq_id > start), causing silent drawer-write drops after a 1.5.x upgrade. Two-part fix: 1. Shim narrowing (mempalace/backends/chroma.py) - Drop max_seq_id from the shim loop. chromadb owns that column's format; we don't reinterpret it. - Defense-in-depth: skip rows in embeddings whose seq_id BLOB has the sysdb-10 b'\x11\x11' prefix rather than misconvert. 2. Recovery command (mempalace/repair.py, mempalace/cli.py) - mempalace repair --mode max-seq-id [--segment ] [--from-sidecar ] [--dry-run] [--yes] [--no-backup] - Detects poisoned rows via threshold (seq_id > 2**53). - Default heuristic: MAX(embeddings.seq_id) over the collection owning the poisoned segment. Matches METADATA max exactly; VECTOR segments get a few seq_ids ahead (queue skips an already-indexed window — an acceptable loss vs. resetting to 0 and re-processing everything). - --from-sidecar copies clean values from a pre-corruption sqlite db. - Backs up chroma.sqlite3, closes chroma handles, atomic UPDATEs, post-repair verification that raises MaxSeqIdVerificationError if any row is still above threshold. Tests: 8 new in tests/test_repair.py (detection, heuristic, sidecar, dry-run, segment filter, no-op, backup, rollback-on-verify-failure). 3 new in tests/test_backends.py (max_seq_id untouched by shim, sysdb-10 prefix skipped in embeddings, legacy big-endian u64 BLOBs still convert). Full suite: 1103 passed. --- CHANGELOG.md | 1 + mempalace/backends/chroma.py | 45 ++++-- mempalace/cli.py | 52 ++++++- mempalace/repair.py | 251 +++++++++++++++++++++++++++++++ tests/test_backends.py | 73 ++++++++- tests/test_repair.py | 282 +++++++++++++++++++++++++++++++++++ 6 files changed, 684 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25b7853..0aa833d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -177,6 +177,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), - Hall detection — routes drawer content to `emotions` / `technical` / `family` / `memory` / `identity` / `consciousness` / `creative` halls, enabling hall-based graph connectivity within wings (#835) ### Bug Fixes +- Repair `max_seq_id` corruption caused by `_fix_blob_seq_ids` misinterpreting chromadb 1.5.x's sysdb-10 BLOB format (`b'\x11\x11'` + ASCII digits) as legacy 0.6.x big-endian BLOBs. The shim now skips the `max_seq_id` table entirely and guards the `embeddings` branch with a prefix check. New subcommand `mempalace repair --mode max-seq-id [--from-sidecar ]` restores affected palaces. Fixes silent drawer-write drops that began after chromadb 1.5.x upgrades on palaces that still had BLOB-typed `max_seq_id` rows at migration time. - Set `hnsw:space=cosine` metadata on all collection creation sites — fixes broken similarity scoring under ChromaDB's default L2 distance (#807, #218) - File-level locking prevents duplicate drawers when agents mine the same file concurrently (#784, #826) - Hybrid closet+drawer retrieval — closets boost ranking, never gate results (#795) diff --git a/mempalace/backends/chroma.py b/mempalace/backends/chroma.py index ad7748f..5f3540e 100644 --- a/mempalace/backends/chroma.py +++ b/mempalace/backends/chroma.py @@ -517,6 +517,21 @@ def _fix_blob_seq_ids(palace_path: str) -> None: the Rust compactor to crash with "mismatched types; Rust type u64 (as SQL type INTEGER) is not compatible with SQL type BLOB". + Scoped to the ``embeddings`` table only. The ``max_seq_id`` table used + to be included in this loop, but chromadb 1.5.x writes its own BLOB + format there (``b'\\x11\\x11'`` + 6 ASCII digits). Misinterpreting that + format via ``int.from_bytes(..., 'big')`` yields a ~1.23e18 integer + that silently suppresses every subsequent write for the affected + segment (``embeddings_queue`` filters on ``seq_id > start``). chromadb + owns the ``max_seq_id`` column — we leave it alone. Palaces already + poisoned by the old behaviour can be repaired via + ``mempalace repair --mode max-seq-id``. + + Defense-in-depth: rows with the sysdb-10 ``b'\\x11\\x11'`` prefix in + ``embeddings`` are skipped rather than converted. Real 0.6.x BLOBs are + pure big-endian u64 with no text prefix, so the prefix check is a + no-op for genuine legacy data. + Must run BEFORE PersistentClient is created (the compactor fires on init). Opening a Python sqlite3 connection against a ChromaDB 1.5.x WAL-mode @@ -533,18 +548,24 @@ def _fix_blob_seq_ids(palace_path: str) -> None: return try: with sqlite3.connect(db_path) as conn: - for table in ("embeddings", "max_seq_id"): - try: - rows = conn.execute( - f"SELECT rowid, seq_id FROM {table} WHERE typeof(seq_id) = 'blob'" - ).fetchall() - except sqlite3.OperationalError: - continue - if not rows: - continue - updates = [(int.from_bytes(blob, byteorder="big"), rowid) for rowid, blob in rows] - conn.executemany(f"UPDATE {table} SET seq_id = ? WHERE rowid = ?", updates) - logger.info("Fixed %d BLOB seq_ids in %s", len(updates), table) + try: + rows = conn.execute( + "SELECT rowid, seq_id FROM embeddings WHERE typeof(seq_id) = 'blob'" + ).fetchall() + except sqlite3.OperationalError: + return + safe_rows = [(rowid, blob) for rowid, blob in rows if not blob.startswith(b"\x11\x11")] + skipped = len(rows) - len(safe_rows) + if skipped: + logger.warning( + "Skipped %d sysdb-10-format BLOB seq_id(s) in embeddings (not converting)", + skipped, + ) + if not safe_rows: + return + updates = [(int.from_bytes(blob, byteorder="big"), rowid) for rowid, blob in safe_rows] + conn.executemany("UPDATE embeddings SET seq_id = ? WHERE rowid = ?", updates) + logger.info("Fixed %d BLOB seq_ids in embeddings", len(updates)) conn.commit() except Exception: logger.exception("Could not fix BLOB seq_ids in %s", db_path) diff --git a/mempalace/cli.py b/mempalace/cli.py index 80ac9b0..850283a 100644 --- a/mempalace/cli.py +++ b/mempalace/cli.py @@ -625,6 +625,20 @@ def cmd_repair(args): palace_path = os.path.abspath( os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path ) + + if getattr(args, "mode", "legacy") == "max-seq-id": + from .repair import repair_max_seq_id + + repair_max_seq_id( + palace_path, + segment=getattr(args, "segment", None), + from_sidecar=getattr(args, "from_sidecar", None), + backup=getattr(args, "backup", True), + dry_run=getattr(args, "dry_run", False), + assume_yes=getattr(args, "yes", False), + ) + return + db_path = os.path.join(palace_path, "chroma.sqlite3") if not os.path.isdir(palace_path): @@ -1117,7 +1131,10 @@ def main(): # repair p_repair = sub.add_parser( "repair", - help="Rebuild palace vector index from stored data (fixes segfaults after corruption)", + help=( + "Rebuild palace vector index (legacy mode) or un-poison max_seq_id rows " + "(--mode max-seq-id)" + ), ) p_repair.add_argument( "--yes", action="store_true", help="Skip confirmation for destructive changes" @@ -1132,6 +1149,39 @@ def main(): "the palace really contains that count." ), ) + p_repair.add_argument( + "--mode", + choices=["legacy", "max-seq-id"], + default="legacy", + help=( + "legacy: full-palace rebuild (default). " + "max-seq-id: un-poison max_seq_id rows corrupted by the legacy 0.6.x shim." + ), + ) + p_repair.add_argument( + "--segment", + default=None, + help="Segment UUID filter for --mode max-seq-id (repairs only that segment).", + ) + p_repair.add_argument( + "--from-sidecar", + default=None, + help=( + "Path to a pre-corruption chroma.sqlite3 sidecar (for --mode max-seq-id); " + "clean values are copied from its max_seq_id table verbatim." + ), + ) + p_repair.add_argument( + "--backup", + action=argparse.BooleanOptionalAction, + default=True, + help="Back up SQLite before mutation (default: on)", + ) + p_repair.add_argument( + "--dry-run", + action="store_true", + help="Print detected poisoned rows and exit without mutation (--mode max-seq-id only)", + ) # repair-status — read-only HNSW capacity health check (#1222) sub.add_parser( diff --git a/mempalace/repair.py b/mempalace/repair.py index c6e99e7..fe2ba15 100644 --- a/mempalace/repair.py +++ b/mempalace/repair.py @@ -32,7 +32,10 @@ Usage (from CLI): import argparse import os import shutil +import sqlite3 import time +from datetime import datetime +from typing import Optional from .backends.chroma import ChromaBackend, hnsw_capacity_status @@ -486,6 +489,254 @@ def status(palace_path=None) -> dict: return {"drawers": drawers, "closets": closets} +# --------------------------------------------------------------------------- +# max-seq-id mode: un-poison max_seq_id rows corrupted by the old shim +# --------------------------------------------------------------------------- + + +def _close_chroma_handles(palace_path: str) -> None: + """Drop ChromaBackend + chromadb singleton caches so OS mmap handles release.""" + import gc + + try: + ChromaBackend().close_palace(palace_path) + except Exception: + pass + try: + from chromadb.api.client import SharedSystemClient + + SharedSystemClient.clear_system_cache() + except Exception: + pass + gc.collect() + + +class MaxSeqIdVerificationError(RuntimeError): + """Raised when post-repair detection still sees poisoned rows.""" + + +#: Any ``max_seq_id.seq_id`` above this is unreachable by a real palace. +#: Clean values are bounded by the embeddings_queue's monotonic counter (<1e10 +#: in practice), and 2**53 is the float64 exact-integer ceiling. Poisoned +#: values from the 0.6.x shim misinterpreting chromadb 1.5.x's +#: ``b'\x11\x11' + 6 ASCII digits`` format start at ~1.23e18, so anything +#: above the threshold is confidently a shim-poisoning artefact. +MAX_SEQ_ID_SANITY_THRESHOLD = 1 << 53 + + +def _detect_poisoned_max_seq_ids( + db_path: str, + *, + segment: Optional[str] = None, + threshold: int = MAX_SEQ_ID_SANITY_THRESHOLD, +) -> list[tuple[str, int]]: + """Return ``[(segment_id, poisoned_seq_id), ...]`` for rows above threshold. + + If ``segment`` is given, the detection is restricted to that segment id + (still only returning it if it actually exceeds the threshold). + """ + with sqlite3.connect(db_path) as conn: + if segment is not None: + rows = conn.execute( + "SELECT segment_id, seq_id FROM max_seq_id WHERE segment_id = ? AND seq_id > ?", + (segment, threshold), + ).fetchall() + else: + rows = conn.execute( + "SELECT segment_id, seq_id FROM max_seq_id WHERE seq_id > ?", + (threshold,), + ).fetchall() + return [(str(sid), int(val)) for sid, val in rows] + + +def _compute_heuristic_seq_id(cur: sqlite3.Cursor, segment_id: str) -> int: + """Return ``MAX(embeddings.seq_id)`` over the collection owning ``segment_id``. + + Matches the METADATA segment's pre-poison value exactly (its max equals + the collection-wide embeddings max). For the sibling VECTOR segment the + value is a few seq_ids ahead of its own pre-poison max; the queue + treats that as "already consumed", skipping a small window of + already-indexed embeddings on next subscribe. That is an acceptable + loss vs. resetting to 0 (which would re-process the entire queue and + risk HNSW bloat from issue #1046). + """ + row = cur.execute( + """ + SELECT MAX(e.seq_id) + FROM embeddings e + JOIN segments s ON e.segment_id = s.id + WHERE s.collection = ( + SELECT collection FROM segments WHERE id = ? + ) + """, + (segment_id,), + ).fetchone() + if row is None or row[0] is None: + return 0 + return int(row[0]) + + +def _read_sidecar_seq_ids(sidecar_path: str) -> dict[str, int]: + """Load ``{segment_id: seq_id}`` from a sidecar DB's ``max_seq_id`` table. + + Rejects sidecar files whose ``max_seq_id.seq_id`` is itself BLOB-typed + — a sidecar that old predates chromadb's type normalisation and is not + a trustworthy restoration source. + """ + if not os.path.isfile(sidecar_path): + raise FileNotFoundError(f"Sidecar database not found: {sidecar_path}") + out: dict[str, int] = {} + with sqlite3.connect(sidecar_path) as conn: + rows = conn.execute("SELECT segment_id, seq_id, typeof(seq_id) FROM max_seq_id").fetchall() + for segment_id, seq_id, kind in rows: + if kind == "blob": + raise ValueError( + f"Sidecar has BLOB-typed seq_id for {segment_id}; refusing to use it. " + "Pass a sidecar that was already migrated to INTEGER rows." + ) + out[str(segment_id)] = int(seq_id) + return out + + +def repair_max_seq_id( + palace_path: str, + *, + segment: Optional[str] = None, + from_sidecar: Optional[str] = None, + threshold: int = MAX_SEQ_ID_SANITY_THRESHOLD, + backup: bool = True, + dry_run: bool = False, + assume_yes: bool = False, +) -> dict: + """Un-poison ``max_seq_id`` rows corrupted by ``_fix_blob_seq_ids`` misfire. + + The old shim ran ``int.from_bytes(blob, 'big')`` across every BLOB + ``max_seq_id.seq_id`` row, including chromadb 1.5.x's native + ``b'\\x11\\x11' + ASCII digits`` format. That conversion yields a + ~1.23e18 integer that silently suppresses every subsequent + ``embeddings_queue`` write for the affected segment. This command + restores clean values either from a pre-corruption sidecar DB + (exact) or heuristically (``MAX(embeddings.seq_id)`` over the owning + collection). + """ + from .migrate import confirm_destructive_action, contains_palace_database + + palace_path = os.path.abspath(os.path.expanduser(palace_path)) + db_path = os.path.join(palace_path, "chroma.sqlite3") + + result: dict = { + "palace_path": palace_path, + "dry_run": dry_run, + "aborted": False, + "segment_repaired": [], + "before": {}, + "after": {}, + "backup": None, + } + + print(f"\n{'=' * 55}") + print(" MemPalace Repair — max_seq_id Un-poison") + print(f"{'=' * 55}\n") + print(f" Palace: {palace_path}") + if segment: + print(f" Segment: {segment}") + if from_sidecar: + print(f" Sidecar: {from_sidecar}") + + if not os.path.isdir(palace_path): + print(f" No palace found at {palace_path}") + result["aborted"] = True + result["reason"] = "palace-missing" + return result + if not contains_palace_database(palace_path): + print(f" No palace database at {palace_path}") + result["aborted"] = True + result["reason"] = "db-missing" + return result + + poisoned = _detect_poisoned_max_seq_ids(db_path, segment=segment, threshold=threshold) + if not poisoned: + print(" No poisoned max_seq_id rows detected. Nothing to do.") + print(f"\n{'=' * 55}\n") + return result + + sidecar_map: dict[str, int] = {} + if from_sidecar: + sidecar_map = _read_sidecar_seq_ids(from_sidecar) + + plan: list[tuple[str, int, int]] = [] + with sqlite3.connect(db_path) as conn: + cur = conn.cursor() + for seg_id, old_val in poisoned: + if from_sidecar: + if seg_id not in sidecar_map: + print(f" Skipped segment {seg_id}: no sidecar entry") + continue + new_val = sidecar_map[seg_id] + else: + new_val = _compute_heuristic_seq_id(cur, seg_id) + plan.append((seg_id, old_val, new_val)) + result["before"][seg_id] = old_val + result["after"][seg_id] = new_val + + print() + print(" Report") + print(f" poisoned rows {len(poisoned):>6}") + print(f" planned repairs {len(plan):>6}") + source = "sidecar" if from_sidecar else "heuristic (collection MAX)" + print(f" clean-value source {source}") + for seg_id, old_val, new_val in plan: + print(f" {seg_id} {old_val} → {new_val}") + + if dry_run: + print("\n DRY RUN — no rows modified.\n" + "=" * 55 + "\n") + return result + + if not plan: + print(" No actionable repairs.") + print(f"\n{'=' * 55}\n") + return result + + if not confirm_destructive_action("Repair max_seq_id", palace_path, assume_yes=assume_yes): + result["aborted"] = True + result["reason"] = "user-aborted" + return result + + if backup: + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + backup_path = os.path.join(palace_path, f"chroma.sqlite3.max-seq-id-backup-{timestamp}") + shutil.copy2(db_path, backup_path) + result["backup"] = backup_path + print(f" Backup: {backup_path}") + + _close_chroma_handles(palace_path) + + with sqlite3.connect(db_path) as conn: + conn.execute("BEGIN") + try: + conn.executemany( + "UPDATE max_seq_id SET seq_id = ? WHERE segment_id = ?", + [(new_val, seg_id) for seg_id, _old, new_val in plan], + ) + conn.commit() + except Exception: + conn.rollback() + raise + + remaining = _detect_poisoned_max_seq_ids(db_path, segment=segment, threshold=threshold) + if remaining: + raise MaxSeqIdVerificationError( + f"Post-repair detection still found {len(remaining)} poisoned row(s): " + f"{[sid for sid, _ in remaining]}. Backup at {result['backup']}." + ) + + result["segment_repaired"] = [seg_id for seg_id, _old, _new in plan] + print(f"\n Repair complete. {len(plan)} row(s) restored.") + print(f" Backup: {result['backup'] or '(skipped)'}") + print(f"\n{'=' * 55}\n") + return result + + if __name__ == "__main__": p = argparse.ArgumentParser(description="MemPalace repair tools") p.add_argument("command", choices=["status", "scan", "prune", "rebuild"]) diff --git a/tests/test_backends.py b/tests/test_backends.py index 9fe5ca1..9a609d1 100644 --- a/tests/test_backends.py +++ b/tests/test_backends.py @@ -341,12 +341,9 @@ def test_fix_blob_seq_ids_converts_blobs_to_integers(tmp_path): db_path = tmp_path / "chroma.sqlite3" conn = sqlite3.connect(str(db_path)) conn.execute("CREATE TABLE embeddings (rowid INTEGER PRIMARY KEY, seq_id)") - conn.execute("CREATE TABLE max_seq_id (rowid INTEGER PRIMARY KEY, seq_id)") - # Insert BLOB seq_ids like ChromaDB 0.6.x would + # Insert BLOB seq_id like ChromaDB 0.6.x would blob_42 = (42).to_bytes(8, byteorder="big") - blob_99 = (99).to_bytes(8, byteorder="big") conn.execute("INSERT INTO embeddings (seq_id) VALUES (?)", (blob_42,)) - conn.execute("INSERT INTO max_seq_id (seq_id) VALUES (?)", (blob_99,)) conn.commit() conn.close() @@ -355,8 +352,6 @@ def test_fix_blob_seq_ids_converts_blobs_to_integers(tmp_path): conn = sqlite3.connect(str(db_path)) row = conn.execute("SELECT seq_id, typeof(seq_id) FROM embeddings").fetchone() assert row == (42, "integer") - row = conn.execute("SELECT seq_id, typeof(seq_id) FROM max_seq_id").fetchone() - assert row == (99, "integer") conn.close() @@ -382,6 +377,71 @@ def test_fix_blob_seq_ids_noop_without_database(tmp_path): _fix_blob_seq_ids(str(tmp_path)) # should not raise +def test_fix_blob_seq_ids_does_not_touch_max_seq_id(tmp_path): + """chromadb 1.5.x owns max_seq_id; the shim must not interpret its BLOBs. + + Regression guard for the 2026-04-20 incident: the old shim ran + int.from_bytes(..., 'big') over chromadb 1.5.x's native + b'\\x11\\x11' + ASCII-digit BLOB, producing a ~1.23e18 integer that + silently suppressed every subsequent embeddings_queue write. + """ + db_path = tmp_path / "chroma.sqlite3" + conn = sqlite3.connect(str(db_path)) + conn.execute("CREATE TABLE embeddings (rowid INTEGER PRIMARY KEY, seq_id)") + conn.execute("CREATE TABLE max_seq_id (rowid INTEGER PRIMARY KEY, seq_id)") + sysdb10_blob = b"\x11\x11502607" + conn.execute("INSERT INTO max_seq_id (seq_id) VALUES (?)", (sysdb10_blob,)) + conn.commit() + conn.close() + + _fix_blob_seq_ids(str(tmp_path)) + + conn = sqlite3.connect(str(db_path)) + row = conn.execute("SELECT seq_id, typeof(seq_id) FROM max_seq_id").fetchone() + assert row == (sysdb10_blob, "blob") + conn.close() + + +def test_fix_blob_seq_ids_skips_sysdb10_prefix_in_embeddings(tmp_path): + """Defense-in-depth: sysdb-10 prefix in embeddings.seq_id is skipped.""" + db_path = tmp_path / "chroma.sqlite3" + conn = sqlite3.connect(str(db_path)) + conn.execute("CREATE TABLE embeddings (rowid INTEGER PRIMARY KEY, seq_id)") + sysdb10_blob = b"\x11\x11502607" + conn.execute("INSERT INTO embeddings (seq_id) VALUES (?)", (sysdb10_blob,)) + conn.commit() + conn.close() + + _fix_blob_seq_ids(str(tmp_path)) + + conn = sqlite3.connect(str(db_path)) + row = conn.execute("SELECT seq_id, typeof(seq_id) FROM embeddings").fetchone() + # Still a BLOB — not converted to 1.23e18. + assert row == (sysdb10_blob, "blob") + conn.close() + + +def test_fix_blob_seq_ids_still_converts_legacy_blobs_in_embeddings(tmp_path): + """Regression guard: pure big-endian u64 BLOBs still convert for genuine 0.6.x.""" + db_path = tmp_path / "chroma.sqlite3" + conn = sqlite3.connect(str(db_path)) + conn.execute("CREATE TABLE embeddings (rowid INTEGER PRIMARY KEY, seq_id)") + conn.execute("INSERT INTO embeddings (seq_id) VALUES (?)", ((42).to_bytes(8, "big"),)) + conn.execute("INSERT INTO embeddings (seq_id) VALUES (?)", (b"\x11\x11502607",)) + conn.execute("INSERT INTO embeddings (seq_id) VALUES (?)", ((7).to_bytes(8, "big"),)) + conn.commit() + conn.close() + + _fix_blob_seq_ids(str(tmp_path)) + + conn = sqlite3.connect(str(db_path)) + rows = conn.execute("SELECT seq_id, typeof(seq_id) FROM embeddings ORDER BY rowid").fetchall() + assert rows[0] == (42, "integer") + assert rows[1] == (b"\x11\x11502607", "blob") # sysdb-10 row left alone + assert rows[2] == (7, "integer") + conn.close() + + def test_fix_blob_seq_ids_writes_marker_after_blob_path(tmp_path): """The .blob_seq_ids_migrated marker is written after a successful BLOB → INTEGER conversion.""" from mempalace.backends.chroma import _BLOB_FIX_MARKER @@ -447,7 +507,6 @@ def test_fix_blob_seq_ids_skips_sqlite_when_marker_present(tmp_path): mock_connect.assert_not_called() - # ── quarantine_stale_hnsw ───────────────────────────────────────────────── diff --git a/tests/test_repair.py b/tests/test_repair.py index 00bcb02..18dd9c4 100644 --- a/tests/test_repair.py +++ b/tests/test_repair.py @@ -1,8 +1,10 @@ """Tests for mempalace.repair — scan, prune, and rebuild HNSW index.""" import os +import sqlite3 from unittest.mock import MagicMock, patch +import pytest from mempalace import repair @@ -374,3 +376,283 @@ def test_rebuild_index_proceeds_with_override(mock_backend_cls, mock_shutil, tmp mock_backend.delete_collection.assert_called_once() mock_backend.create_collection.assert_called_once() mock_new_col.upsert.assert_called() + + +# ── repair_max_seq_id ───────────────────────────────────────────────── + + +# Realistic poisoned values from the 2026-04-20 incident — from the sysdb-10 +# b'\x11\x11' + 6 ASCII digit format being misread as big-endian u64. +_POISON_VAL = 1_229_822_654_365_970_487 + + +def _seed_poisoned_max_seq_id( + palace_path: str, + *, + drawers_meta_max: int = 502607, + closets_meta_max: int = 501418, + drawers_vec_poison: int = _POISON_VAL, + drawers_meta_poison: int = _POISON_VAL + 1, + closets_vec_poison: int = _POISON_VAL + 2, + closets_meta_poison: int = _POISON_VAL + 3, +): + """Build a minimal palace with poisoned max_seq_id rows. + + Returns a dict with segment UUIDs and the expected clean values. + """ + os.makedirs(palace_path, exist_ok=True) + db_path = os.path.join(palace_path, "chroma.sqlite3") + + drawers_coll = "coll-drawers-0000-1111-2222-333344445555" + closets_coll = "coll-closets-0000-1111-2222-333344445555" + drawers_vec = "seg-drawers-vec-0000-1111-2222-333344445555" + drawers_meta = "seg-drawers-meta-0000-1111-2222-33334444555" + closets_vec = "seg-closets-vec-0000-1111-2222-333344445555" + closets_meta = "seg-closets-meta-0000-1111-2222-33334444555" + + conn = sqlite3.connect(db_path) + conn.executescript( + """ + CREATE TABLE segments( + id TEXT PRIMARY KEY, type TEXT, scope TEXT, collection TEXT + ); + CREATE TABLE max_seq_id(segment_id TEXT PRIMARY KEY, seq_id); + CREATE TABLE embeddings( + id INTEGER PRIMARY KEY AUTOINCREMENT, + segment_id TEXT, + embedding_id TEXT, + seq_id + ); + CREATE TABLE embeddings_queue(seq_id INTEGER PRIMARY KEY, topic TEXT, id TEXT); + CREATE TABLE collection_metadata(collection_id TEXT, key TEXT, str_value TEXT); + """ + ) + conn.executemany( + "INSERT INTO segments VALUES (?, ?, ?, ?)", + [ + (drawers_vec, "urn:vector", "VECTOR", drawers_coll), + (drawers_meta, "urn:metadata", "METADATA", drawers_coll), + (closets_vec, "urn:vector", "VECTOR", closets_coll), + (closets_meta, "urn:metadata", "METADATA", closets_coll), + ], + ) + conn.executemany( + "INSERT INTO max_seq_id(segment_id, seq_id) VALUES (?, ?)", + [ + (drawers_vec, drawers_vec_poison), + (drawers_meta, drawers_meta_poison), + (closets_vec, closets_vec_poison), + (closets_meta, closets_meta_poison), + ], + ) + # Populate embeddings so the collection-MAX heuristic has data to work with. + # drawers METADATA owns the max at drawers_meta_max; closets likewise. + for i in range(1, drawers_meta_max + 1, max(drawers_meta_max // 5, 1)): + conn.execute( + "INSERT INTO embeddings(segment_id, embedding_id, seq_id) VALUES (?, ?, ?)", + (drawers_meta, f"d-{i}", i), + ) + conn.execute( + "INSERT INTO embeddings(segment_id, embedding_id, seq_id) VALUES (?, ?, ?)", + (drawers_meta, "d-max", drawers_meta_max), + ) + for i in range(1, closets_meta_max + 1, max(closets_meta_max // 5, 1)): + conn.execute( + "INSERT INTO embeddings(segment_id, embedding_id, seq_id) VALUES (?, ?, ?)", + (closets_meta, f"c-{i}", i), + ) + conn.execute( + "INSERT INTO embeddings(segment_id, embedding_id, seq_id) VALUES (?, ?, ?)", + (closets_meta, "c-max", closets_meta_max), + ) + conn.commit() + conn.close() + return { + "drawers_vec": drawers_vec, + "drawers_meta": drawers_meta, + "closets_vec": closets_vec, + "closets_meta": closets_meta, + "drawers_meta_max": drawers_meta_max, + "closets_meta_max": closets_meta_max, + "poisoned_values": { + drawers_vec: drawers_vec_poison, + drawers_meta: drawers_meta_poison, + closets_vec: closets_vec_poison, + closets_meta: closets_meta_poison, + }, + } + + +def test_max_seq_id_detects_poison_rows(tmp_path): + palace = str(tmp_path / "palace") + seg = _seed_poisoned_max_seq_id(palace) + db_path = os.path.join(palace, "chroma.sqlite3") + + # Add one clean row to confirm the threshold actually filters. + with sqlite3.connect(db_path) as conn: + conn.execute( + "INSERT INTO segments VALUES ('seg-clean', 'urn:vector', 'VECTOR', 'coll-clean')" + ) + conn.execute("INSERT INTO max_seq_id VALUES ('seg-clean', 1234)") + conn.commit() + + found = repair._detect_poisoned_max_seq_ids(db_path) + ids = {sid for sid, _ in found} + assert ids == { + seg["drawers_vec"], + seg["drawers_meta"], + seg["closets_vec"], + seg["closets_meta"], + } + for sid, val in found: + assert val > repair.MAX_SEQ_ID_SANITY_THRESHOLD + assert "seg-clean" not in ids + + +def test_max_seq_id_heuristic_uses_collection_max(tmp_path): + palace = str(tmp_path / "palace") + seg = _seed_poisoned_max_seq_id(palace) + + result = repair.repair_max_seq_id(palace, dry_run=True) + # Both drawers segments (VECTOR + METADATA) get the drawers collection max. + assert result["after"][seg["drawers_vec"]] == seg["drawers_meta_max"] + assert result["after"][seg["drawers_meta"]] == seg["drawers_meta_max"] + # Both closets segments get the closets collection max. + assert result["after"][seg["closets_vec"]] == seg["closets_meta_max"] + assert result["after"][seg["closets_meta"]] == seg["closets_meta_max"] + + +def test_max_seq_id_from_sidecar_exact_restore(tmp_path): + palace = str(tmp_path / "palace") + seg = _seed_poisoned_max_seq_id(palace) + + # Craft a sidecar with known clean values that differ from the heuristic's + # collection-max, so we can prove the sidecar path is preferred. + sidecar_path = str(tmp_path / "chroma.sqlite3.sidecar") + clean = { + seg["drawers_vec"]: 499001, + seg["drawers_meta"]: 499002, + seg["closets_vec"]: 498001, + seg["closets_meta"]: 498002, + } + with sqlite3.connect(sidecar_path) as conn: + conn.execute("CREATE TABLE max_seq_id(segment_id TEXT PRIMARY KEY, seq_id INTEGER)") + conn.executemany( + "INSERT INTO max_seq_id VALUES (?, ?)", + list(clean.items()), + ) + conn.commit() + + result = repair.repair_max_seq_id(palace, from_sidecar=sidecar_path, assume_yes=True) + assert result["segment_repaired"] + db_path = os.path.join(palace, "chroma.sqlite3") + with sqlite3.connect(db_path) as conn: + rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall()) + for sid, val in clean.items(): + assert rows[sid] == val + + +def test_max_seq_id_dry_run_no_mutation(tmp_path): + palace = str(tmp_path / "palace") + seg = _seed_poisoned_max_seq_id(palace) + db_path = os.path.join(palace, "chroma.sqlite3") + + with sqlite3.connect(db_path) as conn: + before = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall()) + + result = repair.repair_max_seq_id(palace, dry_run=True) + assert result["dry_run"] is True + assert result["segment_repaired"] == [] + + with sqlite3.connect(db_path) as conn: + after = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall()) + assert before == after + # Nothing dropped into the palace dir either (no backup on dry-run). + assert not any(fn.startswith("chroma.sqlite3.max-seq-id-backup-") for fn in os.listdir(palace)) + assert seg["drawers_vec"] in before # sanity + + +def test_max_seq_id_segment_filter(tmp_path): + palace = str(tmp_path / "palace") + seg = _seed_poisoned_max_seq_id(palace) + + result = repair.repair_max_seq_id(palace, segment=seg["drawers_meta"], assume_yes=True) + assert result["segment_repaired"] == [seg["drawers_meta"]] + + db_path = os.path.join(palace, "chroma.sqlite3") + with sqlite3.connect(db_path) as conn: + rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall()) + # Filtered segment is fixed; the other three remain poisoned. + assert rows[seg["drawers_meta"]] == seg["drawers_meta_max"] + for other in (seg["drawers_vec"], seg["closets_vec"], seg["closets_meta"]): + assert rows[other] > repair.MAX_SEQ_ID_SANITY_THRESHOLD + + +def test_max_seq_id_no_poison_is_noop(tmp_path): + palace = str(tmp_path / "palace") + os.makedirs(palace) + db_path = os.path.join(palace, "chroma.sqlite3") + with sqlite3.connect(db_path) as conn: + conn.executescript( + """ + CREATE TABLE segments( + id TEXT PRIMARY KEY, type TEXT, scope TEXT, collection TEXT + ); + CREATE TABLE max_seq_id(segment_id TEXT PRIMARY KEY, seq_id); + CREATE TABLE embeddings( + id INTEGER PRIMARY KEY AUTOINCREMENT, + segment_id TEXT, embedding_id TEXT, seq_id + ); + INSERT INTO segments VALUES ('s1', 'urn:vector', 'VECTOR', 'coll'); + INSERT INTO max_seq_id VALUES ('s1', 12345); + """ + ) + conn.commit() + + result = repair.repair_max_seq_id(palace, assume_yes=True) + assert result["segment_repaired"] == [] + assert result["backup"] is None + with sqlite3.connect(db_path) as conn: + rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall()) + assert rows == {"s1": 12345} + + +def test_max_seq_id_backup_created(tmp_path): + palace = str(tmp_path / "palace") + seg = _seed_poisoned_max_seq_id(palace) + + result = repair.repair_max_seq_id(palace, assume_yes=True) + assert result["backup"] is not None + assert os.path.isfile(result["backup"]) + + with sqlite3.connect(result["backup"]) as conn: + rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall()) + # Backup preserves the poisoned values from before the repair. + assert rows[seg["drawers_vec"]] == seg["poisoned_values"][seg["drawers_vec"]] + assert rows[seg["drawers_meta"]] == seg["poisoned_values"][seg["drawers_meta"]] + + +def test_max_seq_id_rollback_on_verification_failure(tmp_path, monkeypatch): + """If the post-update detector still sees poison, raise and leave a backup.""" + palace = str(tmp_path / "palace") + _seed_poisoned_max_seq_id(palace) + + real_detect = repair._detect_poisoned_max_seq_ids + calls = {"n": 0} + + def flaky_detect(*args, **kwargs): + calls["n"] += 1 + # First call (pre-repair) returns the real set so the repair proceeds. + if calls["n"] == 1: + return real_detect(*args, **kwargs) + # Second call (post-repair verification) claims poison still exists. + return [("seg-fake-still-poisoned", repair.MAX_SEQ_ID_SANITY_THRESHOLD + 1)] + + monkeypatch.setattr(repair, "_detect_poisoned_max_seq_ids", flaky_detect) + + with pytest.raises(repair.MaxSeqIdVerificationError): + repair.repair_max_seq_id(palace, assume_yes=True) + + # A backup file is still present — caller can roll back from it. + leftover = [fn for fn in os.listdir(palace) if "max-seq-id-backup-" in fn] + assert leftover