Files

203 lines
6.5 KiB
Python
Raw Permalink Normal View History

2026-04-12 22:21:42 +00:00
"""Tests for destructive-operation safety in mempalace.migrate."""
import os
2026-04-12 22:21:42 +00:00
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from mempalace.migrate import collection_write_roundtrip_works, _restore_stale_palace, migrate
2026-04-12 22:21:42 +00:00
def test_migrate_requires_palace_database(tmp_path, capsys):
palace_dir = tmp_path / "palace"
palace_dir.mkdir()
result = migrate(str(palace_dir))
out = capsys.readouterr().out
assert result is False
assert "No palace database found" in out
def test_migrate_aborts_without_confirmation(tmp_path, capsys):
palace_dir = tmp_path / "palace"
palace_dir.mkdir()
2026-04-12 22:27:40 +00:00
# Presence of chroma.sqlite3 is the safety gate; validity is mocked below.
2026-04-12 22:21:42 +00:00
(palace_dir / "chroma.sqlite3").write_text("db")
2026-04-12 22:24:41 +00:00
mock_chromadb = SimpleNamespace(
__version__="0.6.0",
PersistentClient=MagicMock(side_effect=Exception("unreadable")),
)
2026-04-12 22:21:42 +00:00
with (
patch.dict("sys.modules", {"chromadb": mock_chromadb}),
patch("mempalace.migrate.detect_chromadb_version", return_value="0.5.x"),
patch(
"mempalace.migrate.extract_drawers_from_sqlite",
return_value=[{"id": "id1", "document": "doc", "metadata": {"wing": "w", "room": "r"}}],
),
patch("builtins.input", return_value="n"),
patch("mempalace.migrate.shutil.copytree") as mock_copytree,
patch("mempalace.migrate.shutil.rmtree") as mock_rmtree,
):
result = migrate(str(palace_dir))
out = capsys.readouterr().out
assert result is False
assert "Aborted." in out
mock_copytree.assert_not_called()
mock_rmtree.assert_not_called()
def test_restore_stale_palace_with_clean_destination(tmp_path):
"""Rollback when no partial copy exists at palace_path."""
palace_path = tmp_path / "palace"
stale_path = tmp_path / "palace.old"
stale_path.mkdir()
(stale_path / "chroma.sqlite3").write_bytes(b"original")
_restore_stale_palace(str(palace_path), str(stale_path))
assert palace_path.is_dir()
assert (palace_path / "chroma.sqlite3").read_bytes() == b"original"
assert not stale_path.exists()
def test_restore_stale_palace_clears_partial_copy(tmp_path):
"""Rollback must remove a partially-copied palace_path before restoring.
Simulates the Qodo-reported hazard: shutil.move() began creating
palace_path, then failed. A bare os.replace(stale, palace_path) would
trip on the existing destination; _restore_stale_palace must clear it.
"""
palace_path = tmp_path / "palace"
stale_path = tmp_path / "palace.old"
stale_path.mkdir()
(stale_path / "chroma.sqlite3").write_bytes(b"original")
palace_path.mkdir()
(palace_path / "half-copied.bin").write_bytes(b"garbage")
_restore_stale_palace(str(palace_path), str(stale_path))
assert palace_path.is_dir()
assert (palace_path / "chroma.sqlite3").read_bytes() == b"original"
assert not (palace_path / "half-copied.bin").exists()
assert not stale_path.exists()
def test_restore_stale_palace_logs_and_swallows_on_failure(tmp_path, capsys):
"""If restore itself fails, log both paths — don't raise from rollback."""
palace_path = tmp_path / "palace"
stale_path = tmp_path / "palace.old"
stale_path.mkdir()
# Force os.replace to fail deterministically.
with patch("mempalace.migrate.os.replace", side_effect=OSError("boom")):
_restore_stale_palace(str(palace_path), str(stale_path))
out = capsys.readouterr().out
assert "CRITICAL" in out
assert os.fspath(palace_path) in out
assert os.fspath(stale_path) in out
class _FakeGetResult:
def __init__(self, ids):
self.ids = ids
class _WritableFakeCollection:
def __init__(self):
self.ids = set()
self.deleted = []
def upsert(self, *, ids, documents, metadatas):
self.ids.update(ids)
def get(self, *, ids, include=None):
return _FakeGetResult([drawer_id for drawer_id in ids if drawer_id in self.ids])
def delete(self, *, ids=None, where=None):
for drawer_id in ids or []:
self.ids.discard(drawer_id)
self.deleted.append(drawer_id)
class _SilentWriteDropCollection(_WritableFakeCollection):
def upsert(self, *, ids, documents, metadatas):
return None
class _SilentDeleteDropCollection(_WritableFakeCollection):
def delete(self, *, ids=None, where=None):
self.deleted.extend(ids or [])
def test_collection_write_roundtrip_works_when_probe_persists_and_deletes():
col = _WritableFakeCollection()
assert collection_write_roundtrip_works(col) is True
assert col.ids == set()
assert len(col.deleted) == 1
def test_collection_write_roundtrip_fails_when_upsert_silently_drops():
col = _SilentWriteDropCollection()
assert collection_write_roundtrip_works(col) is False
assert col.ids == set()
def test_collection_write_roundtrip_fails_when_delete_silently_drops():
col = _SilentDeleteDropCollection()
assert collection_write_roundtrip_works(col) is False
assert len(col.ids) == 1
def test_migrate_dry_run_rebuilds_when_collection_is_readable_but_not_writable(tmp_path, capsys):
palace_dir = tmp_path / "palace"
palace_dir.mkdir()
(palace_dir / "chroma.sqlite3").write_text("db")
fake_col = MagicMock()
fake_col.count.return_value = 102
drawers = [
{
"id": "id1",
"document": "hello",
"metadata": {"wing": "test-wing", "room": "general"},
}
]
with (
patch("mempalace.migrate.detect_chromadb_version", return_value="1.x"),
patch("mempalace.backends.chroma.ChromaBackend") as mock_backend,
patch(
"mempalace.migrate.collection_write_roundtrip_works", return_value=False
) as mock_probe,
patch(
"mempalace.migrate.extract_drawers_from_sqlite", return_value=drawers
) as mock_extract,
):
mock_backend.backend_version.return_value = "1.5.8"
mock_backend.return_value.get_collection.return_value = fake_col
result = migrate(str(palace_dir), dry_run=True)
out = capsys.readouterr().out
assert result is True
mock_probe.assert_called_once_with(fake_col)
mock_extract.assert_called_once_with(
os.path.join(os.path.abspath(os.fspath(palace_dir)), "chroma.sqlite3")
)
assert "readable by chromadb 1.5.8, but write/delete verification failed" in out
assert "Rebuilding from SQLite" in out
assert "Extracted 1 drawers from SQLite" in out
assert "DRY RUN" in out