fix(migrate): verify write roundtrip before bailout
This commit is contained in:
+63
-2
@@ -22,6 +22,7 @@ import errno
|
||||
import os
|
||||
import shutil
|
||||
import sqlite3
|
||||
import uuid
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
|
||||
@@ -155,6 +156,55 @@ def confirm_destructive_action(
|
||||
return True
|
||||
|
||||
|
||||
def _result_ids(result) -> list:
|
||||
"""Return ids from either the backend typed result or raw Chroma dict."""
|
||||
|
||||
if isinstance(result, dict):
|
||||
return list(result.get("ids") or [])
|
||||
|
||||
return list(getattr(result, "ids", []) or [])
|
||||
|
||||
|
||||
def collection_write_roundtrip_works(col) -> bool:
|
||||
"""Return True only if the collection can upsert, read, and delete.
|
||||
|
||||
Some ChromaDB 0.6.x -> 1.5.x migrated collections remain readable while
|
||||
writes and deletes silently no-op. A plain ``count()`` probe misses that
|
||||
failure mode, so migrate must verify an actual write round-trip before
|
||||
deciding that no rebuild is needed.
|
||||
"""
|
||||
|
||||
probe_id = f"_mempalace_migrate_probe_{uuid.uuid4().hex}"
|
||||
probe_doc = "mempalace migrate write round-trip probe"
|
||||
probe_meta = {
|
||||
"wing": "_mempalace_probe",
|
||||
"room": "_mempalace_probe",
|
||||
"source_file": "mempalace_migrate_probe",
|
||||
"chunk_index": 0,
|
||||
}
|
||||
|
||||
try:
|
||||
col.upsert(
|
||||
ids=[probe_id],
|
||||
documents=[probe_doc],
|
||||
metadatas=[probe_meta],
|
||||
)
|
||||
|
||||
after_upsert = col.get(ids=[probe_id], include=[])
|
||||
if probe_id not in _result_ids(after_upsert):
|
||||
return False
|
||||
|
||||
col.delete(ids=[probe_id])
|
||||
|
||||
after_delete = col.get(ids=[probe_id], include=[])
|
||||
if probe_id in _result_ids(after_delete):
|
||||
return False
|
||||
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def migrate(palace_path: str, dry_run: bool = False, confirm: bool = False):
|
||||
"""Migrate a palace to the currently installed ChromaDB version."""
|
||||
from .backends.chroma import ChromaBackend
|
||||
@@ -179,13 +229,24 @@ def migrate(palace_path: str, dry_run: bool = False, confirm: bool = False):
|
||||
print(f" Source: ChromaDB {source_version}")
|
||||
print(f" Target: ChromaDB {target_version}")
|
||||
|
||||
# Try reading with current chromadb first
|
||||
# Try reading and writing with current chromadb first.
|
||||
#
|
||||
# A plain count() is not enough: some 0.6.x -> 1.5.x migrated collections
|
||||
# are readable but silently drop upsert/delete operations. In that state,
|
||||
# migrate must rebuild from SQLite instead of returning "No migration needed."
|
||||
try:
|
||||
col = ChromaBackend().get_collection(palace_path, "mempalace_drawers")
|
||||
count = col.count()
|
||||
print(f"\n Palace is already readable by chromadb {target_version}.")
|
||||
|
||||
if collection_write_roundtrip_works(col):
|
||||
print(f"\n Palace is already readable and writable by chromadb {target_version}.")
|
||||
print(f" {count} drawers found. No migration needed.")
|
||||
return True
|
||||
|
||||
print(
|
||||
f"\n Palace is readable by chromadb {target_version}, but write/delete verification failed."
|
||||
)
|
||||
print(" Rebuilding from SQLite to restore native write/delete behavior...")
|
||||
except Exception:
|
||||
print(f"\n Palace is NOT readable by chromadb {target_version}.")
|
||||
print(" Extracting from SQLite directly...")
|
||||
|
||||
+100
-1
@@ -4,7 +4,7 @@ import os
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from mempalace.migrate import _restore_stale_palace, migrate
|
||||
from mempalace.migrate import collection_write_roundtrip_works, _restore_stale_palace, migrate
|
||||
|
||||
|
||||
def test_migrate_requires_palace_database(tmp_path, capsys):
|
||||
@@ -101,3 +101,102 @@ def test_restore_stale_palace_logs_and_swallows_on_failure(tmp_path, capsys):
|
||||
assert "CRITICAL" in out
|
||||
assert os.fspath(palace_path) in out
|
||||
assert os.fspath(stale_path) in out
|
||||
|
||||
|
||||
class _FakeGetResult:
|
||||
def __init__(self, ids):
|
||||
self.ids = ids
|
||||
|
||||
|
||||
class _WritableFakeCollection:
|
||||
def __init__(self):
|
||||
self.ids = set()
|
||||
self.deleted = []
|
||||
|
||||
def upsert(self, *, ids, documents, metadatas):
|
||||
self.ids.update(ids)
|
||||
|
||||
def get(self, *, ids, include=None):
|
||||
return _FakeGetResult([drawer_id for drawer_id in ids if drawer_id in self.ids])
|
||||
|
||||
def delete(self, *, ids=None, where=None):
|
||||
for drawer_id in ids or []:
|
||||
self.ids.discard(drawer_id)
|
||||
self.deleted.append(drawer_id)
|
||||
|
||||
|
||||
class _SilentWriteDropCollection(_WritableFakeCollection):
|
||||
def upsert(self, *, ids, documents, metadatas):
|
||||
return None
|
||||
|
||||
|
||||
class _SilentDeleteDropCollection(_WritableFakeCollection):
|
||||
def delete(self, *, ids=None, where=None):
|
||||
self.deleted.extend(ids or [])
|
||||
|
||||
|
||||
def test_collection_write_roundtrip_works_when_probe_persists_and_deletes():
|
||||
col = _WritableFakeCollection()
|
||||
|
||||
assert collection_write_roundtrip_works(col) is True
|
||||
assert col.ids == set()
|
||||
assert len(col.deleted) == 1
|
||||
|
||||
|
||||
def test_collection_write_roundtrip_fails_when_upsert_silently_drops():
|
||||
col = _SilentWriteDropCollection()
|
||||
|
||||
assert collection_write_roundtrip_works(col) is False
|
||||
assert col.ids == set()
|
||||
|
||||
|
||||
def test_collection_write_roundtrip_fails_when_delete_silently_drops():
|
||||
col = _SilentDeleteDropCollection()
|
||||
|
||||
assert collection_write_roundtrip_works(col) is False
|
||||
assert len(col.ids) == 1
|
||||
|
||||
|
||||
def test_migrate_dry_run_rebuilds_when_collection_is_readable_but_not_writable(tmp_path, capsys):
|
||||
palace_dir = tmp_path / "palace"
|
||||
palace_dir.mkdir()
|
||||
(palace_dir / "chroma.sqlite3").write_text("db")
|
||||
|
||||
fake_col = MagicMock()
|
||||
fake_col.count.return_value = 102
|
||||
|
||||
drawers = [
|
||||
{
|
||||
"id": "id1",
|
||||
"document": "hello",
|
||||
"metadata": {"wing": "test-wing", "room": "general"},
|
||||
}
|
||||
]
|
||||
|
||||
with (
|
||||
patch("mempalace.migrate.detect_chromadb_version", return_value="1.x"),
|
||||
patch("mempalace.backends.chroma.ChromaBackend") as mock_backend,
|
||||
patch(
|
||||
"mempalace.migrate.collection_write_roundtrip_works", return_value=False
|
||||
) as mock_probe,
|
||||
patch(
|
||||
"mempalace.migrate.extract_drawers_from_sqlite", return_value=drawers
|
||||
) as mock_extract,
|
||||
):
|
||||
mock_backend.backend_version.return_value = "1.5.8"
|
||||
mock_backend.return_value.get_collection.return_value = fake_col
|
||||
|
||||
result = migrate(str(palace_dir), dry_run=True)
|
||||
|
||||
out = capsys.readouterr().out
|
||||
|
||||
assert result is True
|
||||
mock_probe.assert_called_once_with(fake_col)
|
||||
mock_extract.assert_called_once_with(
|
||||
os.path.join(os.path.abspath(os.fspath(palace_dir)), "chroma.sqlite3")
|
||||
)
|
||||
|
||||
assert "readable by chromadb 1.5.8, but write/delete verification failed" in out
|
||||
assert "Rebuilding from SQLite" in out
|
||||
assert "Extracted 1 drawers from SQLite" in out
|
||||
assert "DRY RUN" in out
|
||||
|
||||
Reference in New Issue
Block a user