"""Tests for mempalace.repair — scan, prune, and rebuild HNSW index.""" import os import sqlite3 from unittest.mock import MagicMock, call, patch import pytest from mempalace import repair # ── _get_palace_path ────────────────────────────────────────────────── @patch("mempalace.repair.MempalaceConfig", create=True) def test_get_palace_path_from_config(mock_config_cls): mock_config_cls.return_value.palace_path = "/configured/palace" with patch.dict("sys.modules", {}): # Force reimport to pick up the mock result = repair._get_palace_path() assert isinstance(result, str) def test_get_palace_path_fallback(): with patch("mempalace.repair._get_palace_path") as mock_get: mock_get.return_value = os.path.join(os.path.expanduser("~"), ".mempalace", "palace") result = mock_get() assert ".mempalace" in result def test_get_collection_name_from_config(): from mempalace.config import get_configured_collection_name get_configured_collection_name.cache_clear() with patch("mempalace.config.MempalaceConfig") as mock_config_cls: mock_config_cls.return_value.collection_name = "custom_drawers" assert repair._drawers_collection_name() == "custom_drawers" get_configured_collection_name.cache_clear() # ── _paginate_ids ───────────────────────────────────────────────────── def test_paginate_ids_single_batch(): col = MagicMock() col.get.return_value = {"ids": ["id1", "id2", "id3"]} ids = repair._paginate_ids(col) assert ids == ["id1", "id2", "id3"] def test_paginate_ids_empty(): col = MagicMock() col.get.return_value = {"ids": []} ids = repair._paginate_ids(col) assert ids == [] def test_paginate_ids_with_where(): col = MagicMock() col.get.return_value = {"ids": ["id1"]} repair._paginate_ids(col, where={"wing": "test"}) col.get.assert_called_with(where={"wing": "test"}, include=[], limit=1000, offset=0) def test_paginate_ids_offset_exception_fallback(): col = MagicMock() # First call raises, fallback returns ids, second fallback returns empty col.get.side_effect = [ Exception("offset bug"), {"ids": ["id1", "id2"]}, Exception("offset bug"), {"ids": ["id1", "id2"]}, # same ids = no new = break ] ids = repair._paginate_ids(col) assert "id1" in ids # ── scan_palace ─────────────────────────────────────────────────────── def _install_mock_backend(mock_backend_cls, collection): """Wire mock_backend_cls so ChromaBackend().get_collection(...) returns *collection*.""" mock_backend = MagicMock() mock_backend.get_collection.return_value = collection mock_backend_cls.return_value = mock_backend return mock_backend @patch("mempalace.repair.ChromaBackend") def test_scan_palace_no_ids(mock_backend_cls, tmp_path): mock_col = MagicMock() mock_col.count.return_value = 0 mock_col.get.return_value = {"ids": []} _install_mock_backend(mock_backend_cls, mock_col) good, bad = repair.scan_palace(palace_path=str(tmp_path)) assert good == set() assert bad == set() @patch("mempalace.repair.ChromaBackend") def test_scan_palace_all_good(mock_backend_cls, tmp_path): mock_col = MagicMock() mock_col.count.return_value = 2 # _paginate_ids call mock_col.get.side_effect = [ {"ids": ["id1", "id2"]}, # paginate {"ids": ["id1", "id2"]}, # probe batch — both returned ] _install_mock_backend(mock_backend_cls, mock_col) good, bad = repair.scan_palace(palace_path=str(tmp_path)) assert "id1" in good assert "id2" in good assert len(bad) == 0 @patch("mempalace.repair.ChromaBackend") def test_scan_palace_with_bad_ids(mock_backend_cls, tmp_path): mock_col = MagicMock() mock_col.count.return_value = 2 def get_side_effect(**kwargs): ids = kwargs.get("ids", None) if ids is None: # paginate call return {"ids": ["good1", "bad1"]} if "bad1" in ids and len(ids) == 1: raise Exception("corrupt") if "good1" in ids and len(ids) == 1: return {"ids": ["good1"]} # batch probe — raise to force per-id raise Exception("batch fail") mock_col.get.side_effect = get_side_effect _install_mock_backend(mock_backend_cls, mock_col) good, bad = repair.scan_palace(palace_path=str(tmp_path)) assert "good1" in good assert "bad1" in bad @patch("mempalace.repair.ChromaBackend") def test_scan_palace_with_wing_filter(mock_backend_cls, tmp_path): mock_col = MagicMock() mock_col.count.return_value = 1 mock_col.get.side_effect = [ {"ids": ["id1"]}, # paginate {"ids": ["id1"]}, # probe ] _install_mock_backend(mock_backend_cls, mock_col) repair.scan_palace(palace_path=str(tmp_path), only_wing="test_wing") # Verify where filter was passed first_call = mock_col.get.call_args_list[0] assert first_call.kwargs.get("where") == {"wing": "test_wing"} # ── prune_corrupt ───────────────────────────────────────────────────── @patch("mempalace.repair.ChromaBackend") def test_prune_corrupt_no_file(mock_backend_cls, tmp_path): # Should print message and return without error repair.prune_corrupt(palace_path=str(tmp_path)) @patch("mempalace.repair.ChromaBackend") def test_prune_corrupt_dry_run(mock_backend_cls, tmp_path): bad_file = tmp_path / "corrupt_ids.txt" bad_file.write_text("bad1\nbad2\n") repair.prune_corrupt(palace_path=str(tmp_path), confirm=False) # No backend calls in dry run mock_backend_cls.assert_not_called() @patch("mempalace.repair.ChromaBackend") def test_prune_corrupt_confirmed(mock_backend_cls, tmp_path): bad_file = tmp_path / "corrupt_ids.txt" bad_file.write_text("bad1\nbad2\n") mock_col = MagicMock() mock_col.count.side_effect = [10, 8] _install_mock_backend(mock_backend_cls, mock_col) repair.prune_corrupt(palace_path=str(tmp_path), confirm=True) mock_col.delete.assert_called_once() @patch("mempalace.repair.ChromaBackend") def test_prune_corrupt_delete_failure_fallback(mock_backend_cls, tmp_path): bad_file = tmp_path / "corrupt_ids.txt" bad_file.write_text("bad1\nbad2\n") mock_col = MagicMock() mock_col.count.side_effect = [10, 8] # Batch delete fails, per-id succeeds mock_col.delete.side_effect = [Exception("batch fail"), None, None] _install_mock_backend(mock_backend_cls, mock_col) repair.prune_corrupt(palace_path=str(tmp_path), confirm=True) assert mock_col.delete.call_count == 3 # 1 batch + 2 individual # ── rebuild_index ───────────────────────────────────────────────────── @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_no_palace(mock_backend_cls, tmp_path): nonexistent = str(tmp_path / "nope") repair.rebuild_index(palace_path=nonexistent) mock_backend_cls.assert_not_called() @patch("mempalace.repair.shutil") @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_empty_palace(mock_backend_cls, mock_shutil, tmp_path): mock_col = MagicMock() mock_col.count.return_value = 0 mock_backend = _install_mock_backend(mock_backend_cls, mock_col) repair.rebuild_index(palace_path=str(tmp_path)) mock_backend.delete_collection.assert_not_called() @patch("mempalace.repair.shutil") @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_success(mock_backend_cls, mock_shutil, tmp_path): # Create a fake sqlite file sqlite_path = tmp_path / "chroma.sqlite3" sqlite_path.write_text("fake") mock_col = MagicMock() mock_col.count.return_value = 2 mock_col.get.return_value = { "ids": ["id1", "id2"], "documents": ["doc1", "doc2"], "metadatas": [{"wing": "a"}, {"wing": "b"}], } mock_new_col = MagicMock() mock_new_col.count.return_value = 2 mock_temp_col = MagicMock() mock_temp_col.count.return_value = 2 mock_backend = _install_mock_backend(mock_backend_cls, mock_col) mock_backend.create_collection.side_effect = [mock_temp_col, mock_new_col] repair.rebuild_index(palace_path=str(tmp_path)) # Verify: backed up sqlite only (not copytree) mock_shutil.copy2.assert_called_once() assert "chroma.sqlite3" in str(mock_shutil.copy2.call_args) # Verify: deleted and recreated (cosine is the backend default) assert mock_backend.create_collection.call_args_list == [ call(str(tmp_path), "mempalace_drawers__repair_tmp"), call(str(tmp_path), "mempalace_drawers"), ] assert mock_backend.delete_collection.call_args_list == [ call(str(tmp_path), "mempalace_drawers__repair_tmp"), call(str(tmp_path), "mempalace_drawers"), call(str(tmp_path), "mempalace_drawers__repair_tmp"), ] # Verify: used upsert not add mock_temp_col.upsert.assert_called_once() mock_new_col.upsert.assert_called_once() mock_new_col.add.assert_not_called() @patch("mempalace.repair.shutil") @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_ignores_missing_temp_collection_at_start( mock_backend_cls, mock_shutil, tmp_path ): sqlite_path = tmp_path / "chroma.sqlite3" sqlite_path.write_text("fake") def _fake_copy2(src, dst): with open(dst, "w") as handle: handle.write("backup") mock_shutil.copy2.side_effect = _fake_copy2 mock_col = MagicMock() mock_col.count.return_value = 2 mock_col.get.return_value = { "ids": ["id1", "id2"], "documents": ["doc1", "doc2"], "metadatas": [{"wing": "a"}, {"wing": "b"}], } mock_new_col = MagicMock() mock_new_col.count.return_value = 2 mock_temp_col = MagicMock() mock_temp_col.count.return_value = 2 mock_backend = _install_mock_backend(mock_backend_cls, mock_col) mock_backend.create_collection.side_effect = [mock_temp_col, mock_new_col] mock_backend.delete_collection.side_effect = [ ValueError("Collection [mempalace_drawers__repair_tmp] does not exist"), None, None, ] repair.rebuild_index(palace_path=str(tmp_path)) assert mock_shutil.copy2.call_count == 1 assert mock_backend.delete_collection.call_args_list == [ call(str(tmp_path), "mempalace_drawers__repair_tmp"), call(str(tmp_path), "mempalace_drawers"), call(str(tmp_path), "mempalace_drawers__repair_tmp"), ] def test_delete_collection_if_exists_reraises_unexpected_value_error(): mock_backend = MagicMock() mock_backend.delete_collection.side_effect = ValueError("invalid collection name") with pytest.raises(ValueError, match="invalid collection name"): repair._delete_collection_if_exists(mock_backend, "/palace", "bad/name") @patch("mempalace.repair.shutil") @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_error_reading(mock_backend_cls, mock_shutil, tmp_path): mock_backend = MagicMock() mock_backend.get_collection.side_effect = Exception("corrupt") mock_backend_cls.return_value = mock_backend repair.rebuild_index(palace_path=str(tmp_path)) mock_backend.delete_collection.assert_not_called() # ── #1208 truncation safety ─────────────────────────────────────────── def test_check_extraction_safety_passes_when_counts_match(tmp_path): """SQLite reports same count as extracted → no exception.""" with patch("mempalace.repair.sqlite_drawer_count", return_value=500): repair.check_extraction_safety(str(tmp_path), 500) def test_check_extraction_safety_uses_configured_collection(tmp_path): with patch("mempalace.repair.sqlite_drawer_count", return_value=500) as count: repair.check_extraction_safety(str(tmp_path), 500, collection_name="custom_drawers") count.assert_called_once_with(str(tmp_path), "custom_drawers") def test_check_extraction_safety_default_uses_configured_collection(tmp_path): with ( patch("mempalace.repair._drawers_collection_name", return_value="custom_drawers"), patch("mempalace.repair.sqlite_drawer_count", return_value=500) as count, ): repair.check_extraction_safety(str(tmp_path), 500) count.assert_called_once_with(str(tmp_path), "custom_drawers") def test_check_extraction_safety_passes_when_sqlite_unreadable_and_under_cap(tmp_path): """SQLite check fails (None) but extraction is well under the cap → safe.""" with patch("mempalace.repair.sqlite_drawer_count", return_value=None): repair.check_extraction_safety(str(tmp_path), 5_000) def test_check_extraction_safety_aborts_when_sqlite_higher(tmp_path): """SQLite reports more than extracted — the user-reported #1208 case.""" with patch("mempalace.repair.sqlite_drawer_count", return_value=67_580): try: repair.check_extraction_safety(str(tmp_path), 10_000) except repair.TruncationDetected as e: assert e.sqlite_count == 67_580 assert e.extracted == 10_000 assert "67,580" in e.message assert "10,000" in e.message assert "57,580" in e.message # the loss number else: raise AssertionError("expected TruncationDetected") def test_check_extraction_safety_aborts_when_unreadable_and_at_cap(tmp_path): """SQLite unreadable but extraction == default get() cap → suspicious.""" with patch("mempalace.repair.sqlite_drawer_count", return_value=None): try: repair.check_extraction_safety(str(tmp_path), repair.CHROMADB_DEFAULT_GET_LIMIT) except repair.TruncationDetected as e: assert e.sqlite_count is None assert e.extracted == repair.CHROMADB_DEFAULT_GET_LIMIT assert "10,000" in e.message else: raise AssertionError("expected TruncationDetected") def test_check_extraction_safety_override_skips_check(tmp_path): """``confirm_truncation_ok=True`` short-circuits both signals.""" with patch("mempalace.repair.sqlite_drawer_count", return_value=99_999): # Would normally abort — override allows through repair.check_extraction_safety(str(tmp_path), 10_000, confirm_truncation_ok=True) def test_sqlite_drawer_count_returns_none_on_missing_file(tmp_path): """Palace dir exists but no chroma.sqlite3 → None, not crash.""" assert repair.sqlite_drawer_count(str(tmp_path)) is None def test_sqlite_drawer_count_returns_none_on_unreadable_schema(tmp_path): """File exists but isn't a chromadb sqlite → None, not crash.""" sqlite_path = os.path.join(str(tmp_path), "chroma.sqlite3") with open(sqlite_path, "wb") as f: f.write(b"not a sqlite file at all") assert repair.sqlite_drawer_count(str(tmp_path)) is None @patch("mempalace.repair.shutil") @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_default_uses_configured_collection(mock_backend_cls, mock_shutil, tmp_path): sqlite_path = tmp_path / "chroma.sqlite3" sqlite_path.write_text("fake") mock_col = MagicMock() mock_col.count.return_value = 2 mock_col.get.return_value = { "ids": ["id1", "id2"], "documents": ["doc1", "doc2"], "metadatas": [{"wing": "a"}, {"wing": "b"}], } mock_temp_col = MagicMock() mock_temp_col.count.return_value = 2 mock_new_col = MagicMock() mock_new_col.count.return_value = 2 mock_backend = _install_mock_backend(mock_backend_cls, mock_col) mock_backend.create_collection.side_effect = [mock_temp_col, mock_new_col] with ( patch("mempalace.repair._drawers_collection_name", return_value="custom_drawers"), patch("mempalace.repair.sqlite_drawer_count", return_value=2) as count, ): repair.rebuild_index(palace_path=str(tmp_path)) mock_backend.get_collection.assert_called_once_with(str(tmp_path), "custom_drawers") count.assert_called_once_with(str(tmp_path), "custom_drawers") assert mock_backend.create_collection.call_args_list == [ call(str(tmp_path), "custom_drawers__repair_tmp"), call(str(tmp_path), "custom_drawers"), ] assert mock_backend.delete_collection.call_args_list == [ call(str(tmp_path), "custom_drawers__repair_tmp"), call(str(tmp_path), "custom_drawers"), call(str(tmp_path), "custom_drawers__repair_tmp"), ] def test_status_default_uses_configured_drawer_collection(tmp_path): with ( patch("mempalace.repair._drawers_collection_name", return_value="custom_drawers"), patch("mempalace.repair.hnsw_capacity_status") as capacity_status, ): capacity_status.side_effect = [ { "sqlite_count": 1, "hnsw_count": 1, "divergence": 0, "diverged": False, "status": "ok", "message": "", }, { "sqlite_count": 0, "hnsw_count": 0, "divergence": 0, "diverged": False, "status": "ok", "message": "", }, ] repair.status(palace_path=str(tmp_path)) assert capacity_status.call_args_list[0].args == (str(tmp_path), "custom_drawers") assert capacity_status.call_args_list[1].args == (str(tmp_path), "mempalace_closets") @patch("mempalace.repair.shutil") @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_aborts_on_truncation_signal(mock_backend_cls, mock_shutil, tmp_path): """rebuild_index honors the safety guard: SQLite says 67k, get() returns 10k → no delete_collection, no upsert, no backup.""" mock_backend = MagicMock() mock_col = MagicMock() mock_col.count.return_value = 10_000 # Single page comes back with 10_000 ids mock_col.get.side_effect = [ { "ids": [f"id{i}" for i in range(10_000)], "documents": ["x"] * 10_000, "metadatas": [{}] * 10_000, }, {"ids": [], "documents": [], "metadatas": []}, ] mock_backend.get_collection.return_value = mock_col mock_backend_cls.return_value = mock_backend with patch("mempalace.repair.sqlite_drawer_count", return_value=67_580): repair.rebuild_index(palace_path=str(tmp_path)) # Guard fired: nothing destructive happened mock_backend.delete_collection.assert_not_called() mock_backend.create_collection.assert_not_called() mock_shutil.copy2.assert_not_called() @patch("mempalace.repair.shutil") @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_proceeds_with_override(mock_backend_cls, mock_shutil, tmp_path): """Override flag lets repair proceed even when the guard would fire.""" mock_backend = MagicMock() mock_col = MagicMock() mock_col.count.return_value = 10_000 mock_col.get.side_effect = [ { "ids": [f"id{i}" for i in range(10_000)], "documents": ["x"] * 10_000, "metadatas": [{}] * 10_000, }, {"ids": [], "documents": [], "metadatas": []}, ] mock_temp_col = MagicMock() mock_temp_col.count.return_value = 10_000 mock_new_col = MagicMock() mock_new_col.count.return_value = 10_000 mock_backend.get_collection.return_value = mock_col mock_backend.create_collection.side_effect = [mock_temp_col, mock_new_col] mock_backend_cls.return_value = mock_backend with patch("mempalace.repair.sqlite_drawer_count", return_value=67_580): repair.rebuild_index(palace_path=str(tmp_path), confirm_truncation_ok=True) assert mock_backend.delete_collection.call_count == 3 assert mock_backend.create_collection.call_count == 2 mock_temp_col.upsert.assert_called() mock_new_col.upsert.assert_called() @patch("mempalace.repair.shutil") @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_stage_failure_leaves_live_collection_untouched( mock_backend_cls, mock_shutil, tmp_path ): sqlite_path = tmp_path / "chroma.sqlite3" sqlite_path.write_text("fake") mock_col = MagicMock() mock_col.count.return_value = 2 mock_col.get.return_value = { "ids": ["id1", "id2"], "documents": ["doc1", "doc2"], "metadatas": [{"wing": "a"}, {"wing": "b"}], } mock_temp_col = MagicMock() mock_temp_col.count.return_value = 1 mock_backend = _install_mock_backend(mock_backend_cls, mock_col) mock_backend.create_collection.return_value = mock_temp_col with pytest.raises(repair.RebuildCollectionError) as excinfo: repair.rebuild_index(palace_path=str(tmp_path)) assert excinfo.value.live_replaced is False assert mock_shutil.copy2.call_count == 1 assert mock_backend.delete_collection.call_args_list == [ call(str(tmp_path), "mempalace_drawers__repair_tmp"), call(str(tmp_path), "mempalace_drawers__repair_tmp"), ] @patch("mempalace.repair.shutil") @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_live_failure_restores_backup(mock_backend_cls, mock_shutil, tmp_path): sqlite_path = tmp_path / "chroma.sqlite3" sqlite_path.write_text("fake") def _fake_copy2(src, dst): with open(dst, "w") as handle: handle.write("backup") mock_shutil.copy2.side_effect = _fake_copy2 mock_col = MagicMock() mock_col.count.return_value = 2 mock_col.get.return_value = { "ids": ["id1", "id2"], "documents": ["doc1", "doc2"], "metadatas": [{"wing": "a"}, {"wing": "b"}], } mock_temp_col = MagicMock() mock_temp_col.count.return_value = 2 mock_new_col = MagicMock() mock_new_col.upsert.side_effect = RuntimeError("live upsert failed") active_backend = MagicMock() active_backend.get_collection.return_value = mock_col active_backend.create_collection.side_effect = [mock_temp_col, mock_new_col] helper_backend = MagicMock() mock_backend_cls.side_effect = [active_backend, helper_backend] with pytest.raises(repair.RebuildCollectionError) as excinfo: repair.rebuild_index(palace_path=str(tmp_path)) assert excinfo.value.live_replaced is True assert mock_shutil.copy2.call_count == 2 assert active_backend.delete_collection.call_args_list == [ call(str(tmp_path), "mempalace_drawers__repair_tmp"), call(str(tmp_path), "mempalace_drawers"), call(str(tmp_path), "mempalace_drawers__repair_tmp"), call(str(tmp_path), "mempalace_drawers"), ] active_backend.close_palace.assert_called_once_with(str(tmp_path)) helper_backend.close_palace.assert_not_called() @patch("mempalace.repair.shutil") @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_live_delete_missing_still_restores_backup( mock_backend_cls, mock_shutil, tmp_path ): sqlite_path = tmp_path / "chroma.sqlite3" sqlite_path.write_text("fake") def _fake_copy2(src, dst): with open(dst, "w") as handle: handle.write("backup") mock_shutil.copy2.side_effect = _fake_copy2 mock_col = MagicMock() mock_col.count.return_value = 2 mock_col.get.return_value = { "ids": ["id1", "id2"], "documents": ["doc1", "doc2"], "metadatas": [{"wing": "a"}, {"wing": "b"}], } mock_temp_col = MagicMock() mock_temp_col.count.return_value = 2 mock_backend = _install_mock_backend(mock_backend_cls, mock_col) mock_backend.create_collection.side_effect = [mock_temp_col, RuntimeError("create failed")] mock_backend.delete_collection.side_effect = [ None, None, None, repair.ChromaNotFoundError("missing"), ] with pytest.raises(repair.RebuildCollectionError) as excinfo: repair.rebuild_index(palace_path=str(tmp_path)) assert excinfo.value.live_replaced is True assert mock_shutil.copy2.call_count == 2 assert mock_backend.delete_collection.call_args_list == [ call(str(tmp_path), "mempalace_drawers__repair_tmp"), call(str(tmp_path), "mempalace_drawers"), call(str(tmp_path), "mempalace_drawers__repair_tmp"), call(str(tmp_path), "mempalace_drawers"), ] @patch("mempalace.repair.shutil") @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_restore_failure_preserves_original_error( mock_backend_cls, mock_shutil, tmp_path, capsys ): sqlite_path = tmp_path / "chroma.sqlite3" sqlite_path.write_text("fake") def _copy2_side_effect(src, dst): if str(src).endswith(".backup"): raise PermissionError("locked sqlite") with open(dst, "w") as handle: handle.write("backup") mock_shutil.copy2.side_effect = _copy2_side_effect mock_col = MagicMock() mock_col.count.return_value = 2 mock_col.get.return_value = { "ids": ["id1", "id2"], "documents": ["doc1", "doc2"], "metadatas": [{"wing": "a"}, {"wing": "b"}], } mock_temp_col = MagicMock() mock_temp_col.count.return_value = 2 mock_new_col = MagicMock() mock_new_col.upsert.side_effect = RuntimeError("live upsert failed") mock_backend = _install_mock_backend(mock_backend_cls, mock_col) mock_backend.create_collection.side_effect = [mock_temp_col, mock_new_col] with pytest.raises(repair.RebuildCollectionError) as excinfo: repair.rebuild_index(palace_path=str(tmp_path)) out = capsys.readouterr().out assert "locked sqlite" in out assert "Manual restore required" in out assert "live upsert failed" in str(excinfo.value) @patch("mempalace.repair.ChromaBackend") def test_rebuild_collection_via_temp_keeps_original_error_when_cleanup_fails( mock_backend_cls, ): mock_col = MagicMock() mock_col.count.return_value = 2 mock_temp_col = MagicMock() mock_temp_col.count.return_value = 2 mock_backend = _install_mock_backend(mock_backend_cls, mock_col) mock_backend.create_collection.side_effect = [mock_temp_col, RuntimeError("live build failed")] mock_backend.delete_collection.side_effect = [ None, None, RuntimeError("cleanup failed"), ] with pytest.raises(repair.RebuildCollectionError) as excinfo: repair._rebuild_collection_via_temp( mock_backend, "/palace", ["id1", "id2"], ["doc1", "doc2"], [{"wing": "a"}, {"wing": "b"}], batch_size=5000, progress=lambda *args, **kwargs: None, ) assert "live build failed" in str(excinfo.value) assert excinfo.value.live_replaced is True assert mock_backend.delete_collection.call_args_list == [ call("/palace", "mempalace_drawers__repair_tmp"), call("/palace", "mempalace_drawers"), call("/palace", "mempalace_drawers__repair_tmp"), ] @patch("mempalace.repair.shutil") @patch("mempalace.repair.ChromaBackend") def test_rebuild_index_ignores_temp_cleanup_failure_after_success( mock_backend_cls, mock_shutil, tmp_path ): sqlite_path = tmp_path / "chroma.sqlite3" sqlite_path.write_text("fake") def _fake_copy2(src, dst): with open(dst, "w") as handle: handle.write("backup") mock_shutil.copy2.side_effect = _fake_copy2 mock_col = MagicMock() mock_col.count.return_value = 2 mock_col.get.return_value = { "ids": ["id1", "id2"], "documents": ["doc1", "doc2"], "metadatas": [{"wing": "a"}, {"wing": "b"}], } mock_temp_col = MagicMock() mock_temp_col.count.return_value = 2 mock_new_col = MagicMock() mock_new_col.count.return_value = 2 mock_backend = _install_mock_backend(mock_backend_cls, mock_col) mock_backend.create_collection.side_effect = [mock_temp_col, mock_new_col] mock_backend.delete_collection.side_effect = [ None, None, RuntimeError("cleanup failed"), ] repair.rebuild_index(palace_path=str(tmp_path)) assert mock_shutil.copy2.call_count == 1 assert mock_backend.delete_collection.call_args_list == [ call(str(tmp_path), "mempalace_drawers__repair_tmp"), call(str(tmp_path), "mempalace_drawers"), call(str(tmp_path), "mempalace_drawers__repair_tmp"), ] # ── repair_max_seq_id ───────────────────────────────────────────────── # Realistic poisoned values from the 2026-04-20 incident — from the sysdb-10 # b'\x11\x11' + 6 ASCII digit format being misread as big-endian u64. _POISON_VAL = 1_229_822_654_365_970_487 def _seed_poisoned_max_seq_id( palace_path: str, *, drawers_meta_max: int = 502607, closets_meta_max: int = 501418, drawers_vec_poison: int = _POISON_VAL, drawers_meta_poison: int = _POISON_VAL + 1, closets_vec_poison: int = _POISON_VAL + 2, closets_meta_poison: int = _POISON_VAL + 3, ): """Build a minimal palace with poisoned max_seq_id rows. Returns a dict with segment UUIDs and the expected clean values. """ os.makedirs(palace_path, exist_ok=True) db_path = os.path.join(palace_path, "chroma.sqlite3") drawers_coll = "coll-drawers-0000-1111-2222-333344445555" closets_coll = "coll-closets-0000-1111-2222-333344445555" drawers_vec = "seg-drawers-vec-0000-1111-2222-333344445555" drawers_meta = "seg-drawers-meta-0000-1111-2222-33334444555" closets_vec = "seg-closets-vec-0000-1111-2222-333344445555" closets_meta = "seg-closets-meta-0000-1111-2222-33334444555" conn = sqlite3.connect(db_path) conn.executescript( """ CREATE TABLE segments( id TEXT PRIMARY KEY, type TEXT, scope TEXT, collection TEXT ); CREATE TABLE max_seq_id(segment_id TEXT PRIMARY KEY, seq_id); CREATE TABLE embeddings( id INTEGER PRIMARY KEY AUTOINCREMENT, segment_id TEXT, embedding_id TEXT, seq_id ); CREATE TABLE embeddings_queue(seq_id INTEGER PRIMARY KEY, topic TEXT, id TEXT); CREATE TABLE collection_metadata(collection_id TEXT, key TEXT, str_value TEXT); """ ) conn.executemany( "INSERT INTO segments VALUES (?, ?, ?, ?)", [ (drawers_vec, "urn:vector", "VECTOR", drawers_coll), (drawers_meta, "urn:metadata", "METADATA", drawers_coll), (closets_vec, "urn:vector", "VECTOR", closets_coll), (closets_meta, "urn:metadata", "METADATA", closets_coll), ], ) conn.executemany( "INSERT INTO max_seq_id(segment_id, seq_id) VALUES (?, ?)", [ (drawers_vec, drawers_vec_poison), (drawers_meta, drawers_meta_poison), (closets_vec, closets_vec_poison), (closets_meta, closets_meta_poison), ], ) # Populate embeddings so the collection-MAX heuristic has data to work with. # drawers METADATA owns the max at drawers_meta_max; closets likewise. for i in range(1, drawers_meta_max + 1, max(drawers_meta_max // 5, 1)): conn.execute( "INSERT INTO embeddings(segment_id, embedding_id, seq_id) VALUES (?, ?, ?)", (drawers_meta, f"d-{i}", i), ) conn.execute( "INSERT INTO embeddings(segment_id, embedding_id, seq_id) VALUES (?, ?, ?)", (drawers_meta, "d-max", drawers_meta_max), ) for i in range(1, closets_meta_max + 1, max(closets_meta_max // 5, 1)): conn.execute( "INSERT INTO embeddings(segment_id, embedding_id, seq_id) VALUES (?, ?, ?)", (closets_meta, f"c-{i}", i), ) conn.execute( "INSERT INTO embeddings(segment_id, embedding_id, seq_id) VALUES (?, ?, ?)", (closets_meta, "c-max", closets_meta_max), ) conn.commit() conn.close() return { "drawers_vec": drawers_vec, "drawers_meta": drawers_meta, "closets_vec": closets_vec, "closets_meta": closets_meta, "drawers_meta_max": drawers_meta_max, "closets_meta_max": closets_meta_max, "poisoned_values": { drawers_vec: drawers_vec_poison, drawers_meta: drawers_meta_poison, closets_vec: closets_vec_poison, closets_meta: closets_meta_poison, }, } def test_max_seq_id_detects_poison_rows(tmp_path): palace = str(tmp_path / "palace") seg = _seed_poisoned_max_seq_id(palace) db_path = os.path.join(palace, "chroma.sqlite3") # Add one clean row to confirm the threshold actually filters. with sqlite3.connect(db_path) as conn: conn.execute( "INSERT INTO segments VALUES ('seg-clean', 'urn:vector', 'VECTOR', 'coll-clean')" ) conn.execute("INSERT INTO max_seq_id VALUES ('seg-clean', 1234)") conn.commit() found = repair._detect_poisoned_max_seq_ids(db_path) ids = {sid for sid, _ in found} assert ids == { seg["drawers_vec"], seg["drawers_meta"], seg["closets_vec"], seg["closets_meta"], } for sid, val in found: assert val > repair.MAX_SEQ_ID_SANITY_THRESHOLD assert "seg-clean" not in ids def test_max_seq_id_heuristic_uses_collection_max(tmp_path): palace = str(tmp_path / "palace") seg = _seed_poisoned_max_seq_id(palace) result = repair.repair_max_seq_id(palace, dry_run=True) # Both drawers segments (VECTOR + METADATA) get the drawers collection max. assert result["after"][seg["drawers_vec"]] == seg["drawers_meta_max"] assert result["after"][seg["drawers_meta"]] == seg["drawers_meta_max"] # Both closets segments get the closets collection max. assert result["after"][seg["closets_vec"]] == seg["closets_meta_max"] assert result["after"][seg["closets_meta"]] == seg["closets_meta_max"] def test_max_seq_id_from_sidecar_exact_restore(tmp_path): palace = str(tmp_path / "palace") seg = _seed_poisoned_max_seq_id(palace) # Craft a sidecar with known clean values that differ from the heuristic's # collection-max, so we can prove the sidecar path is preferred. sidecar_path = str(tmp_path / "chroma.sqlite3.sidecar") clean = { seg["drawers_vec"]: 499001, seg["drawers_meta"]: 499002, seg["closets_vec"]: 498001, seg["closets_meta"]: 498002, } with sqlite3.connect(sidecar_path) as conn: conn.execute("CREATE TABLE max_seq_id(segment_id TEXT PRIMARY KEY, seq_id INTEGER)") conn.executemany( "INSERT INTO max_seq_id VALUES (?, ?)", list(clean.items()), ) conn.commit() result = repair.repair_max_seq_id(palace, from_sidecar=sidecar_path, assume_yes=True) assert result["segment_repaired"] db_path = os.path.join(palace, "chroma.sqlite3") with sqlite3.connect(db_path) as conn: rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall()) for sid, val in clean.items(): assert rows[sid] == val def test_max_seq_id_dry_run_no_mutation(tmp_path): palace = str(tmp_path / "palace") seg = _seed_poisoned_max_seq_id(palace) db_path = os.path.join(palace, "chroma.sqlite3") with sqlite3.connect(db_path) as conn: before = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall()) result = repair.repair_max_seq_id(palace, dry_run=True) assert result["dry_run"] is True assert result["segment_repaired"] == [] with sqlite3.connect(db_path) as conn: after = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall()) assert before == after # Nothing dropped into the palace dir either (no backup on dry-run). assert not any(fn.startswith("chroma.sqlite3.max-seq-id-backup-") for fn in os.listdir(palace)) assert seg["drawers_vec"] in before # sanity def test_max_seq_id_segment_filter(tmp_path): palace = str(tmp_path / "palace") seg = _seed_poisoned_max_seq_id(palace) result = repair.repair_max_seq_id(palace, segment=seg["drawers_meta"], assume_yes=True) assert result["segment_repaired"] == [seg["drawers_meta"]] db_path = os.path.join(palace, "chroma.sqlite3") with sqlite3.connect(db_path) as conn: rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall()) # Filtered segment is fixed; the other three remain poisoned. assert rows[seg["drawers_meta"]] == seg["drawers_meta_max"] for other in (seg["drawers_vec"], seg["closets_vec"], seg["closets_meta"]): assert rows[other] > repair.MAX_SEQ_ID_SANITY_THRESHOLD def test_max_seq_id_heuristic_decodes_blob_embeddings_seq_id(tmp_path): """`embeddings.seq_id` rows can be BLOB-typed on palaces where chromadb 1.5.x has been writing seq_ids natively (8-byte big-endian uint64). `_compute_heuristic_seq_id` must decode those rather than crashing on `int(bytes)` — the recovery feature is meaningless if it can't read the storage format it was designed to repair. """ palace = str(tmp_path / "palace") seg = _seed_poisoned_max_seq_id(palace) db_path = os.path.join(palace, "chroma.sqlite3") drawers_meta_max = seg["drawers_meta_max"] blob_max = drawers_meta_max + 7 blob_value = blob_max.to_bytes(8, "big") with sqlite3.connect(db_path) as conn: conn.execute( "INSERT INTO embeddings(segment_id, embedding_id, seq_id) VALUES (?, ?, ?)", (seg["drawers_meta"], "d-blob-max", blob_value), ) conn.commit() result = repair.repair_max_seq_id(palace, dry_run=True) assert result["after"][seg["drawers_vec"]] == blob_max assert result["after"][seg["drawers_meta"]] == blob_max def test_max_seq_id_no_poison_is_noop(tmp_path): palace = str(tmp_path / "palace") os.makedirs(palace) db_path = os.path.join(palace, "chroma.sqlite3") with sqlite3.connect(db_path) as conn: conn.executescript( """ CREATE TABLE segments( id TEXT PRIMARY KEY, type TEXT, scope TEXT, collection TEXT ); CREATE TABLE max_seq_id(segment_id TEXT PRIMARY KEY, seq_id); CREATE TABLE embeddings( id INTEGER PRIMARY KEY AUTOINCREMENT, segment_id TEXT, embedding_id TEXT, seq_id ); INSERT INTO segments VALUES ('s1', 'urn:vector', 'VECTOR', 'coll'); INSERT INTO max_seq_id VALUES ('s1', 12345); """ ) conn.commit() result = repair.repair_max_seq_id(palace, assume_yes=True) assert result["segment_repaired"] == [] assert result["backup"] is None with sqlite3.connect(db_path) as conn: rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall()) assert rows == {"s1": 12345} def test_max_seq_id_backup_created(tmp_path): palace = str(tmp_path / "palace") seg = _seed_poisoned_max_seq_id(palace) result = repair.repair_max_seq_id(palace, assume_yes=True) assert result["backup"] is not None assert os.path.isfile(result["backup"]) with sqlite3.connect(result["backup"]) as conn: rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall()) # Backup preserves the poisoned values from before the repair. assert rows[seg["drawers_vec"]] == seg["poisoned_values"][seg["drawers_vec"]] assert rows[seg["drawers_meta"]] == seg["poisoned_values"][seg["drawers_meta"]] def test_max_seq_id_rollback_on_verification_failure(tmp_path, monkeypatch): """If the post-update detector still sees poison, raise and leave a backup.""" palace = str(tmp_path / "palace") _seed_poisoned_max_seq_id(palace) real_detect = repair._detect_poisoned_max_seq_ids calls = {"n": 0} def flaky_detect(*args, **kwargs): calls["n"] += 1 # First call (pre-repair) returns the real set so the repair proceeds. if calls["n"] == 1: return real_detect(*args, **kwargs) # Second call (post-repair verification) claims poison still exists. return [("seg-fake-still-poisoned", repair.MAX_SEQ_ID_SANITY_THRESHOLD + 1)] monkeypatch.setattr(repair, "_detect_poisoned_max_seq_ids", flaky_detect) with pytest.raises(repair.MaxSeqIdVerificationError): repair.repair_max_seq_id(palace, assume_yes=True) # A backup file is still present — caller can roll back from it. leftover = [fn for fn in os.listdir(palace) if "max-seq-id-backup-" in fn] assert leftover def test_max_seq_id_preflight_preserves_embeddings_queue(tmp_path): """#1295: default repair preflight must not drop queued writes.""" palace = str(tmp_path / "palace") seg = _seed_poisoned_max_seq_id( palace, drawers_meta_max=102, closets_meta_max=11, ) db_path = os.path.join(palace, "chroma.sqlite3") with sqlite3.connect(db_path) as conn: conn.executemany( "INSERT INTO embeddings_queue(seq_id, topic, id) VALUES (?, ?, ?)", [ (seq_id, "persistent://default/default/mempalace_drawers", f"queued-{seq_id}") for seq_id in range(103, 123) ], ) conn.commit() result = repair.maybe_repair_poisoned_max_seq_id_before_rebuild( palace, assume_yes=True, ) assert result is not None assert result["segment_repaired"] with sqlite3.connect(db_path) as conn: max_seq_rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id")) queue_count = conn.execute("SELECT COUNT(*) FROM embeddings_queue").fetchone()[0] assert max_seq_rows[seg["drawers_vec"]] == seg["drawers_meta_max"] assert max_seq_rows[seg["drawers_meta"]] == seg["drawers_meta_max"] assert max_seq_rows[seg["closets_vec"]] == seg["closets_meta_max"] assert max_seq_rows[seg["closets_meta"]] == seg["closets_meta_max"] # The old legacy rebuild path can discard queued writes. The preflight # repair must leave them on disk for Chroma to drain after the bookmark is # unpoisoned. assert queue_count == 20 def test_rebuild_index_repairs_poisoned_max_seq_id_before_collection_rebuild(tmp_path, capsys): """A poisoned bookmark should short-circuit before the legacy rebuild path.""" palace = str(tmp_path / "palace") _seed_poisoned_max_seq_id(palace) with patch("mempalace.repair.ChromaBackend") as mock_backend: repair.rebuild_index(palace) out = capsys.readouterr().out backend = mock_backend.return_value # repair_max_seq_id may instantiate ChromaBackend to close cached clients # after editing sqlite directly. That is safe. The important thing is that # rebuild_index must not continue into the legacy Chroma collection read / # count / rebuild path after the max_seq_id preflight handles the issue. backend.get_collection.assert_not_called() assert "Detected poisoned max_seq_id rows" in out assert "non-destructive max_seq_id repair" in out # ── extract_via_sqlite + rebuild_from_sqlite (#1308) ────────────────── # # These tests build real chromadb palaces in tmp_path rather than mocking # the SQLite layer. The bug class they guard against is "extraction sees # different rows than chromadb stored" — the only honest check is to let # chromadb actually write rows and then read them back via the SQLite # bypass. Mocking the SQLite cursor would defeat the test. def _seed_palace(palace_path, collection_name, rows): """Build a real chromadb palace at ``palace_path`` and add ``rows``. ``rows`` is a list of ``(id, document, metadata)`` tuples. """ from mempalace.backends.chroma import ChromaBackend backend = ChromaBackend() try: col = backend.create_collection(str(palace_path), collection_name) col.upsert( ids=[r[0] for r in rows], documents=[r[1] for r in rows], metadatas=[r[2] for r in rows], ) finally: # Release chromadb's rust-side SQLite/HNSW file locks before the # caller proceeds. Without this, an in-place rebuild on Windows # fails with WinError 32 on data_level0.bin during the archive # rename (cf. PR #1310 test-windows job). backend.close() def test_extract_via_sqlite_returns_all_rows_with_metadata(tmp_path): """Round-trip: a chromadb palace with N upserted rows returns those same N rows when read via the SQLite bypass. Catches: anyone who breaks the segments/embeddings/embedding_metadata JOIN, swaps the metadata vs vector segment, or changes how the document is stored under the ``chroma:document`` key. Also asserts every embedding row underlying the extraction lives in a ``segments.scope = 'METADATA'`` segment. Document + metadata rows are stored under METADATA in Chroma's segment layout while HNSW files live under ``VECTOR``; locking that assumption in here means a future refactor that accidentally points the JOIN at ``VECTOR`` fails this test instead of silently regressing the recovery path. """ rows = [ (f"drawer_{i:03d}", f"document body {i}", {"wing": "test_wing", "room": f"r{i % 3}"}) for i in range(25) ] _seed_palace(tmp_path, "mempalace_drawers", rows) extracted = list(repair.extract_via_sqlite(str(tmp_path), "mempalace_drawers")) assert len(extracted) == 25 by_id = {emb_id: (doc, meta) for emb_id, doc, meta in extracted} assert set(by_id) == {r[0] for r in rows} for emb_id, doc, meta in rows: got_doc, got_meta = by_id[emb_id] assert got_doc == doc, f"document mangled for {emb_id}" assert got_meta == meta, f"metadata mangled for {emb_id}: {got_meta!r}" # Lock the segment-scope assumption directly against Chroma's on-disk # layout so a future change that points the extraction JOIN at the # VECTOR segment cannot pass this test. Query each extracted row's # backing segment scope via the same SQLite tables ``extract_via_sqlite`` # reads from. sqlite_path = os.path.join(str(tmp_path), "chroma.sqlite3") conn = sqlite3.connect(f"file:{sqlite_path}?mode=ro", uri=True) try: scopes = { scope for (scope,) in conn.execute( """ SELECT DISTINCT s.scope FROM embeddings e JOIN segments s ON e.segment_id = s.id JOIN collections c ON s.collection = c.id WHERE c.name = ? AND e.embedding_id IN ({}) """.format(",".join("?" * len(extracted))), ("mempalace_drawers", *(emb_id for emb_id, _, _ in extracted)), ) } finally: conn.close() assert scopes == {"METADATA"}, ( f"extraction is reading from segments scoped {scopes!r}; only " "'METADATA' should back the document/metadata rows. If Chroma's " "segment layout changed, update extract_via_sqlite's WHERE clause." ) def test_extract_via_sqlite_preserves_typed_metadata(tmp_path): """Chromadb stores int / float / bool / string in distinct typed columns. Extraction must round-trip the original type, not coerce everything to string. Catches: a regression where the SELECT order changes and ints come back as None, or where the column-resolution rule prefers the wrong column. """ rows = [ ( "drawer_typed", "doc", { "wing": "w", "chunk_index": 7, # int "score": 0.42, # float "is_active": True, # bool }, ), ] _seed_palace(tmp_path, "mempalace_drawers", rows) extracted = list(repair.extract_via_sqlite(str(tmp_path), "mempalace_drawers")) assert len(extracted) == 1 _, _, meta = extracted[0] assert meta["chunk_index"] == 7 and isinstance(meta["chunk_index"], int) assert meta["score"] == 0.42 and isinstance(meta["score"], float) assert meta["is_active"] is True assert meta["wing"] == "w" def test_extract_via_sqlite_unknown_collection_yields_nothing(tmp_path): """Asking for a collection that isn't in the palace must return an empty iterator, not silently fall back to another collection's metadata segment. Seeds two real collections and queries for a third name so a regression that drops the WHERE c.name=? filter would leak rows from the seeded collections rather than passing. """ _seed_palace(tmp_path, "mempalace_drawers", [("d1", "doc", {"wing": "w"})]) _seed_palace(tmp_path, "mempalace_closets", [("c1", "abbrev", {"wing": "w"})]) assert list(repair.extract_via_sqlite(str(tmp_path), "not_a_real_collection")) == [] def test_extract_via_sqlite_missing_palace_yields_nothing(tmp_path): """No chroma.sqlite3 → empty iterator, no exception. Callers depend on this when probing speculatively.""" empty = tmp_path / "no_palace_here" empty.mkdir() assert list(repair.extract_via_sqlite(str(empty), "mempalace_drawers")) == [] def test_rebuild_from_sqlite_roundtrips_via_real_chromadb(tmp_path): """End-to-end: seed source palace, rebuild into a fresh dest, then open dest with a fresh ChromaBackend and verify ``count()`` and metadata filters return the original rows. Also asserts a closet document round-trips so a future regression that re-embeds with the wrong EF or swaps drawer/closet content would fail here. This is the single most important regression guard. If ``rebuild_from_sqlite`` silently drops rows or mangles metadata, no other test in this file would catch it because they all stop at the extraction layer. """ from mempalace.backends.chroma import ChromaBackend source = tmp_path / "source" dest = tmp_path / "dest" rows = [ (f"drawer_{i:03d}", f"body {i}", {"wing": "alpha" if i % 2 else "beta", "room": "r0"}) for i in range(40) ] _seed_palace(source, "mempalace_drawers", rows) _seed_palace( source, "mempalace_closets", [("closet_x", "abbrev pointer →drawer_001", {"wing": "alpha"})], ) counts = repair.rebuild_from_sqlite(str(source), str(dest)) assert counts == {"mempalace_drawers": 40, "mempalace_closets": 1} backend = ChromaBackend() drawers = backend.get_collection(str(dest), "mempalace_drawers") assert drawers.count() == 40 alpha = drawers.get(where={"wing": "alpha"}) assert len(alpha["ids"]) == 20 # Spot-check that document text round-trips for one specific drawer # — protects against a regression where extraction or upsert order # silently swaps document bodies between IDs. one = drawers.get(ids=["drawer_007"], include=["documents", "metadatas"]) assert one["documents"] == ["body 7"] assert one["metadatas"][0]["wing"] == "alpha" # Closets: the AAAK index layer. Re-embedded with the same EF so a # known closet ID and its document body must come back intact. closets = backend.get_collection(str(dest), "mempalace_closets") assert closets.count() == 1 closet_row = closets.get(ids=["closet_x"], include=["documents", "metadatas"]) assert closet_row["documents"] == ["abbrev pointer →drawer_001"] assert closet_row["metadatas"][0] == {"wing": "alpha"} def test_rebuild_from_sqlite_refuses_existing_dest(tmp_path): """Refuse to write into a directory that already exists when source and dest differ. Without this, an unattended re-run would silently interleave a partial rebuild with whatever's already at dest. """ source = tmp_path / "source" dest = tmp_path / "dest" _seed_palace(source, "mempalace_drawers", [("d1", "doc", {"wing": "w"})]) dest.mkdir() # Drop a marker file so we can prove the dir wasn't touched. (dest / "marker.txt").write_text("preexisting") counts = repair.rebuild_from_sqlite(str(source), str(dest)) assert counts == {} assert (dest / "marker.txt").read_text() == "preexisting" assert not (dest / "chroma.sqlite3").exists() def test_rebuild_from_sqlite_in_place_archives_when_opted_in(tmp_path): """In-place rebuild (source == dest) with ``archive_existing_dest=True`` must move the original aside to ``.pre-rebuild-`` and read from the archive — the original drawer rows must survive in the new palace, AND the archive itself must still contain the original rows. Catches: a refactor that moves the original out but then reads from the now-empty original location, producing an empty rebuild; also catches a swap that empties the archive after reading. """ palace = tmp_path / "palace" rows = [(f"d{i}", f"body {i}", {"wing": "w", "room": "r"}) for i in range(15)] _seed_palace(palace, "mempalace_drawers", rows) counts = repair.rebuild_from_sqlite(str(palace), str(palace), archive_existing_dest=True) assert counts["mempalace_drawers"] == 15 archives = [p for p in tmp_path.iterdir() if p.name.startswith("palace.pre-rebuild-")] assert len(archives) == 1 assert (archives[0] / "chroma.sqlite3").exists() # Archive must still hold the same row count via the SQLite bypass — # proves the archive wasn't silently truncated as a side effect. archived_rows = list(repair.extract_via_sqlite(str(archives[0]), "mempalace_drawers")) assert len(archived_rows) == 15 from mempalace.backends.chroma import ChromaBackend rebuilt = ChromaBackend().get_collection(str(palace), "mempalace_drawers") assert rebuilt.count() == 15 def test_rebuild_from_sqlite_in_place_refuses_without_archive_flag(tmp_path): """Source == dest without archive flag must abort untouched. The most catastrophic possible regression of this code path is silently deleting the only copy of the user's data.""" palace = tmp_path / "palace" _seed_palace(palace, "mempalace_drawers", [("d1", "doc", {"wing": "w"})]) sqlite_before = (palace / "chroma.sqlite3").stat().st_size counts = repair.rebuild_from_sqlite(str(palace), str(palace)) assert counts == {} # Same file, untouched. assert (palace / "chroma.sqlite3").stat().st_size == sqlite_before archives = [p for p in tmp_path.iterdir() if "pre-rebuild" in p.name] assert archives == [] def test_rebuild_from_sqlite_source_missing_chroma_db(tmp_path): """Source dir exists but has no chroma.sqlite3 → returns empty, leaves dest untouched.""" source = tmp_path / "source" source.mkdir() (source / "stray_file").write_text("not a palace") dest = tmp_path / "dest" counts = repair.rebuild_from_sqlite(str(source), str(dest)) assert counts == {} assert not dest.exists() def test_rebuild_from_sqlite_in_place_validates_source_before_archiving(tmp_path): """In-place + archive_existing_dest=True with a dir that lacks chroma.sqlite3 must NOT rename the dir before bailing. An earlier revision archived first and validated second, leaving the user with a renamed empty dir to manually undo. Catches that ordering bug. """ palace = tmp_path / "palace" palace.mkdir() (palace / "marker.txt").write_text("not a real palace") counts = repair.rebuild_from_sqlite(str(palace), str(palace), archive_existing_dest=True) assert counts == {} # No archive created — original dir still in place with its marker. assert palace.exists() assert (palace / "marker.txt").read_text() == "not a real palace" archives = [p for p in tmp_path.iterdir() if "pre-rebuild" in p.name] assert archives == [] def test_rebuild_from_sqlite_raises_on_upsert_failure(tmp_path, monkeypatch): """Mid-batch upsert failure must raise ``RebuildPartialError`` and surface the failed collection + archive path so the user can recover. Without this, an unattended script gets exit-code-zero on a partial rebuild and the user discovers the data loss only when search starts returning fewer hits. """ palace = tmp_path / "palace" rows = [(f"d{i}", f"body {i}", {"wing": "w", "room": "r"}) for i in range(5)] _seed_palace(palace, "mempalace_drawers", rows) # Make the very first upsert raise so we don't depend on batch # boundary behavior. Patching ChromaCollection.upsert (the wrapper # mempalace's backend returns) keeps the failure path realistic. # ``monkeypatch`` is pytest's built-in fixture that auto-restores # the original attribute when the test exits, so we don't need to # undo this manually. from mempalace.backends.chroma import ChromaCollection def boom(self, **kwargs): raise RuntimeError("simulated chromadb upsert failure") monkeypatch.setattr(ChromaCollection, "upsert", boom) with pytest.raises(repair.RebuildPartialError) as excinfo: repair.rebuild_from_sqlite(str(palace), str(palace), archive_existing_dest=True) err = excinfo.value assert err.failed_collection == "mempalace_drawers" assert err.partial_counts.get("mempalace_drawers") == 0 assert err.archive_path is not None assert os.path.isfile(os.path.join(err.archive_path, "chroma.sqlite3")) assert err.dest_palace == os.path.abspath(str(palace)) def test_rebuild_from_sqlite_honors_configured_drawer_collection_name(tmp_path, monkeypatch): """A user with a non-default drawers collection name (set via ``MempalaceConfig().collection_name``) must have THAT collection rebuilt — not the hardcoded ``mempalace_drawers``. Catches: a regression where the recovery path silently rebuilds the default-name collection on a custom-named palace, leaving the user's actual data unrebuilt while reporting "rebuild complete." This is the failure mode reviewer mjc flagged on PR #1310 as needing to line up with the configured-collection-name work in #1312. Closets stay fixed (``mempalace_closets``) by design — the AAAK index references drawer IDs by string and is not per-deployment configurable. Strategy: monkeypatch the lazy resolver so the test is hermetic and does not depend on the global config file or env state. """ from mempalace.backends.chroma import ChromaBackend custom_drawers = "custom_drawers_xyz" monkeypatch.setattr(repair, "_drawers_collection_name", lambda: custom_drawers) source = tmp_path / "source" dest = tmp_path / "dest" drawer_rows = [(f"d{i}", f"body {i}", {"wing": "alpha"}) for i in range(3)] closet_rows = [("closet_a", "abbrev →d0", {"wing": "alpha"})] _seed_palace(source, custom_drawers, drawer_rows) _seed_palace(source, "mempalace_closets", closet_rows) counts = repair.rebuild_from_sqlite(str(source), str(dest)) # Rebuilt under the custom name, not under the default "mempalace_drawers". assert counts == {custom_drawers: 3, "mempalace_closets": 1} backend = ChromaBackend() rebuilt_drawers = backend.get_collection(str(dest), custom_drawers) assert rebuilt_drawers.count() == 3 # Default-name collection must NOT exist in dest — proves we did not # silently fall back to the hardcoded name during rebuild. try: rebuilt_default = backend.get_collection(str(dest), "mempalace_drawers") # If get_collection returns without raising, count() should be 0 # (chromadb may auto-create on get with some EFs); a non-zero # count would mean we wrote rows to the wrong collection. assert rebuilt_default.count() == 0, ( "rebuild leaked rows into the default-name collection on a " "custom-name palace — recovery wrote to the wrong collection." ) except Exception: pass # Expected: collection wasn't created.