203 lines
6.5 KiB
Python
203 lines
6.5 KiB
Python
"""Tests for destructive-operation safety in mempalace.migrate."""
|
|
|
|
import os
|
|
from types import SimpleNamespace
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from mempalace.migrate import collection_write_roundtrip_works, _restore_stale_palace, migrate
|
|
|
|
|
|
def test_migrate_requires_palace_database(tmp_path, capsys):
    """migrate() refuses to run on a directory that holds no palace database.

    The palace directory is created empty, so the call is expected to return
    ``False`` and print a "No palace database found" message.
    """
    palace_dir = tmp_path / "palace"
    palace_dir.mkdir()

    result = migrate(str(palace_dir))

    out = capsys.readouterr().out
    assert result is False
    assert "No palace database found" in out
|
|
|
|
|
|
def test_migrate_aborts_without_confirmation(tmp_path, capsys):
    """Answering "n" at the prompt aborts migrate() before any copy or delete.

    The chromadb module is replaced by a stub whose ``PersistentClient``
    raises, version detection and SQLite extraction are patched, and
    ``input`` returns ``"n"``. The test then checks that migrate() returns
    ``False``, prints "Aborted.", and never calls ``shutil.copytree`` or
    ``shutil.rmtree``.
    """
    palace_dir = tmp_path / "palace"
    palace_dir.mkdir()
    # Presence of chroma.sqlite3 is the safety gate; validity is mocked below.
    (palace_dir / "chroma.sqlite3").write_text("db")

    # Stub chromadb module: any attempt to open the palace raises.
    mock_chromadb = SimpleNamespace(
        __version__="0.6.0",
        PersistentClient=MagicMock(side_effect=Exception("unreadable")),
    )

    with (
        patch.dict("sys.modules", {"chromadb": mock_chromadb}),
        patch("mempalace.migrate.detect_chromadb_version", return_value="0.5.x"),
        patch(
            "mempalace.migrate.extract_drawers_from_sqlite",
            return_value=[{"id": "id1", "document": "doc", "metadata": {"wing": "w", "room": "r"}}],
        ),
        # The user declines the confirmation prompt.
        patch("builtins.input", return_value="n"),
        patch("mempalace.migrate.shutil.copytree") as mock_copytree,
        patch("mempalace.migrate.shutil.rmtree") as mock_rmtree,
    ):
        result = migrate(str(palace_dir))

    out = capsys.readouterr().out
    assert result is False
    assert "Aborted." in out
    # Neither destructive filesystem operation may have been attempted.
    mock_copytree.assert_not_called()
    mock_rmtree.assert_not_called()
|
|
|
|
|
|
def test_restore_stale_palace_with_clean_destination(tmp_path):
    """Rollback when no partial copy exists at palace_path.

    Only the stale directory exists beforehand. After the restore, its
    contents must be found at ``palace_path`` and the stale directory must
    be gone.
    """
    palace_path = tmp_path / "palace"
    stale_path = tmp_path / "palace.old"
    stale_path.mkdir()
    (stale_path / "chroma.sqlite3").write_bytes(b"original")

    _restore_stale_palace(str(palace_path), str(stale_path))

    assert palace_path.is_dir()
    assert (palace_path / "chroma.sqlite3").read_bytes() == b"original"
    assert not stale_path.exists()
|
|
|
|
|
|
def test_restore_stale_palace_clears_partial_copy(tmp_path):
    """Rollback must remove a partially-copied palace_path before restoring.

    Simulates the Qodo-reported hazard: shutil.move() began creating
    palace_path, then failed. A bare os.replace(stale, palace_path) would
    trip on the existing destination; _restore_stale_palace must clear it.
    """
    palace_path = tmp_path / "palace"
    stale_path = tmp_path / "palace.old"

    # The original palace, parked at the stale location.
    stale_path.mkdir()
    (stale_path / "chroma.sqlite3").write_bytes(b"original")

    # Leftover from the interrupted move: a destination with junk in it.
    palace_path.mkdir()
    (palace_path / "half-copied.bin").write_bytes(b"garbage")

    _restore_stale_palace(str(palace_path), str(stale_path))

    assert palace_path.is_dir()
    assert (palace_path / "chroma.sqlite3").read_bytes() == b"original"
    assert not (palace_path / "half-copied.bin").exists()
    assert not stale_path.exists()
|
|
|
|
|
|
def test_restore_stale_palace_logs_and_swallows_on_failure(tmp_path, capsys):
    """If restore itself fails, log both paths — don't raise from rollback.

    ``os.replace`` is patched to raise ``OSError``. The call must still
    return normally, and stdout must contain "CRITICAL" together with both
    the palace path and the stale path.
    """
    palace_path = tmp_path / "palace"
    stale_path = tmp_path / "palace.old"
    stale_path.mkdir()

    # Force os.replace to fail deterministically.
    with patch("mempalace.migrate.os.replace", side_effect=OSError("boom")):
        _restore_stale_palace(str(palace_path), str(stale_path))

    out = capsys.readouterr().out
    assert "CRITICAL" in out
    assert os.fspath(palace_path) in out
    assert os.fspath(stale_path) in out
|
|
|
|
|
|
class _FakeGetResult:
|
|
def __init__(self, ids):
|
|
self.ids = ids
|
|
|
|
|
|
class _WritableFakeCollection:
|
|
def __init__(self):
|
|
self.ids = set()
|
|
self.deleted = []
|
|
|
|
def upsert(self, *, ids, documents, metadatas):
|
|
self.ids.update(ids)
|
|
|
|
def get(self, *, ids, include=None):
|
|
return _FakeGetResult([drawer_id for drawer_id in ids if drawer_id in self.ids])
|
|
|
|
def delete(self, *, ids=None, where=None):
|
|
for drawer_id in ids or []:
|
|
self.ids.discard(drawer_id)
|
|
self.deleted.append(drawer_id)
|
|
|
|
|
|
class _SilentWriteDropCollection(_WritableFakeCollection):
    """Fake collection whose ``upsert`` succeeds but stores nothing."""

    def upsert(self, *, ids, documents, metadatas):
        """Accept the write without raising and without recording any id."""
        return None
|
|
|
|
|
|
class _SilentDeleteDropCollection(_WritableFakeCollection):
    """Fake collection whose ``delete`` is logged but removes nothing."""

    def delete(self, *, ids=None, where=None):
        """Append the requested ids to ``deleted`` and leave ``ids`` untouched."""
        self.deleted.extend(ids or [])
|
|
|
|
|
|
def test_collection_write_roundtrip_works_when_probe_persists_and_deletes():
    """A fully working collection passes the write round-trip probe.

    The probe must return ``True``, leave no id behind in the collection,
    and have issued exactly one delete.
    """
    col = _WritableFakeCollection()

    assert collection_write_roundtrip_works(col) is True
    assert col.ids == set()
    assert len(col.deleted) == 1
|
|
|
|
|
|
def test_collection_write_roundtrip_fails_when_upsert_silently_drops():
    """The probe reports failure when upsert stores nothing.

    With a collection that discards every write, the probe must return
    ``False`` and the collection must still be empty afterwards.
    """
    col = _SilentWriteDropCollection()

    assert collection_write_roundtrip_works(col) is False
    assert col.ids == set()
|
|
|
|
|
|
def test_collection_write_roundtrip_fails_when_delete_silently_drops():
    """The probe reports failure when delete removes nothing.

    With a collection that ignores deletes, the probe must return ``False``
    and exactly one id must remain stored afterwards.
    """
    col = _SilentDeleteDropCollection()

    assert collection_write_roundtrip_works(col) is False
    assert len(col.ids) == 1
|
|
|
|
|
|
def test_migrate_dry_run_rebuilds_when_collection_is_readable_but_not_writable(tmp_path, capsys):
    """A readable collection that fails the write probe triggers a dry-run rebuild.

    The backend is mocked to return a collection, and the write round-trip
    probe is patched to return ``False``. migrate(..., dry_run=True) must
    then return ``True``, call the probe once with that collection, call the
    SQLite extraction once with the absolute path to ``chroma.sqlite3``, and
    print the verification-failure, rebuild, extraction-count and "DRY RUN"
    messages.
    """
    palace_dir = tmp_path / "palace"
    palace_dir.mkdir()
    (palace_dir / "chroma.sqlite3").write_text("db")

    # Collection handed back by the mocked backend.
    fake_col = MagicMock()
    fake_col.count.return_value = 102

    drawers = [
        {
            "id": "id1",
            "document": "hello",
            "metadata": {"wing": "test-wing", "room": "general"},
        }
    ]

    with (
        patch("mempalace.migrate.detect_chromadb_version", return_value="1.x"),
        patch("mempalace.backends.chroma.ChromaBackend") as mock_backend,
        patch(
            "mempalace.migrate.collection_write_roundtrip_works", return_value=False
        ) as mock_probe,
        patch(
            "mempalace.migrate.extract_drawers_from_sqlite", return_value=drawers
        ) as mock_extract,
    ):
        mock_backend.backend_version.return_value = "1.5.8"
        mock_backend.return_value.get_collection.return_value = fake_col

        result = migrate(str(palace_dir), dry_run=True)

    out = capsys.readouterr().out

    assert result is True
    mock_probe.assert_called_once_with(fake_col)
    mock_extract.assert_called_once_with(
        os.path.join(os.path.abspath(os.fspath(palace_dir)), "chroma.sqlite3")
    )

    assert "readable by chromadb 1.5.8, but write/delete verification failed" in out
    assert "Rebuilding from SQLite" in out
    assert "Extracted 1 drawers from SQLite" in out
    assert "DRY RUN" in out
|