"""Tests for mempalace.dedup — near-duplicate drawer detection and removal.""" from unittest.mock import MagicMock, patch from mempalace import dedup # ── get_source_groups ───────────────────────────────────────────────── def test_get_source_groups_basic(): col = MagicMock() col.count.return_value = 5 col.get.side_effect = [ { "ids": ["d1", "d2", "d3", "d4", "d5"], "metadatas": [ {"source_file": "a.txt"}, {"source_file": "a.txt"}, {"source_file": "a.txt"}, {"source_file": "a.txt"}, {"source_file": "a.txt"}, ], }, {"ids": []}, ] groups = dedup.get_source_groups(col, min_count=5) assert "a.txt" in groups assert len(groups["a.txt"]) == 5 def test_get_source_groups_below_min(): col = MagicMock() col.count.return_value = 2 col.get.side_effect = [ { "ids": ["d1", "d2"], "metadatas": [ {"source_file": "a.txt"}, {"source_file": "a.txt"}, ], }, {"ids": []}, ] groups = dedup.get_source_groups(col, min_count=5) assert len(groups) == 0 def test_get_source_groups_source_filter(): col = MagicMock() col.count.return_value = 6 col.get.side_effect = [ { "ids": ["d1", "d2", "d3", "d4", "d5", "d6"], "metadatas": [ {"source_file": "project_a.txt"}, {"source_file": "project_a.txt"}, {"source_file": "project_a.txt"}, {"source_file": "project_a.txt"}, {"source_file": "project_a.txt"}, {"source_file": "other.txt"}, ], }, {"ids": []}, ] groups = dedup.get_source_groups(col, min_count=5, source_pattern="project_a") assert "project_a.txt" in groups assert "other.txt" not in groups def test_get_source_groups_wing_filter(): col = MagicMock() col.count.return_value = 5 col.get.side_effect = [ { "ids": ["d1", "d2", "d3", "d4", "d5"], "metadatas": [ {"source_file": "a.txt"}, {"source_file": "a.txt"}, {"source_file": "a.txt"}, {"source_file": "a.txt"}, {"source_file": "a.txt"}, ], }, {"ids": []}, ] dedup.get_source_groups(col, min_count=5, wing="my_wing") # Verify where filter was passed first_call = col.get.call_args_list[0] assert first_call.kwargs.get("where") == {"wing": "my_wing"} def test_get_source_groups_missing_source_file(): col = MagicMock() col.count.return_value = 5 col.get.side_effect = [ { "ids": ["d1", "d2", "d3", "d4", "d5"], "metadatas": [{}, {}, {}, {}, {}], }, {"ids": []}, ] groups = dedup.get_source_groups(col, min_count=5) assert "unknown" in groups # ── dedup_source_group ──────────────────────────────────────────────── def test_dedup_source_group_all_unique(): col = MagicMock() col.get.return_value = { "ids": ["d1", "d2"], "documents": ["long document one content here", "different document two here"], "metadatas": [{"wing": "a"}, {"wing": "a"}], } col.query.return_value = { "ids": [["d1"]], "distances": [[0.8]], # far apart = unique } kept, deleted = dedup.dedup_source_group(col, ["d1", "d2"], threshold=0.15, dry_run=True) assert len(kept) == 2 assert len(deleted) == 0 def test_dedup_source_group_with_duplicate(): col = MagicMock() col.get.return_value = { "ids": ["d1", "d2"], "documents": [ "long document content that is fairly long", "long document content that is fairly long", ], "metadatas": [{"wing": "a"}, {"wing": "a"}], } col.query.return_value = { "ids": [["d1"]], "distances": [[0.05]], # very close = duplicate } kept, deleted = dedup.dedup_source_group(col, ["d1", "d2"], threshold=0.15, dry_run=True) assert len(kept) == 1 assert len(deleted) == 1 def test_dedup_source_group_short_docs_deleted(): col = MagicMock() col.get.return_value = { "ids": ["d1", "d2"], "documents": ["long enough document to keep in the palace", "tiny"], "metadatas": [{"wing": "a"}, {"wing": "a"}], } kept, deleted = dedup.dedup_source_group(col, ["d1", "d2"], threshold=0.15, dry_run=True) assert "d2" in deleted # too short def test_dedup_source_group_empty_doc_deleted(): col = MagicMock() col.get.return_value = { "ids": ["d1", "d2"], "documents": ["real document content here that is long enough", None], "metadatas": [{"wing": "a"}, {"wing": "a"}], } kept, deleted = dedup.dedup_source_group(col, ["d1", "d2"], threshold=0.15, dry_run=True) assert "d2" in deleted def test_dedup_source_group_live_deletes(): col = MagicMock() col.get.return_value = { "ids": ["d1", "d2"], "documents": ["long document content here enough", "long document content here enough"], "metadatas": [{"wing": "a"}, {"wing": "a"}], } col.query.return_value = { "ids": [["d1"]], "distances": [[0.05]], } kept, deleted = dedup.dedup_source_group(col, ["d1", "d2"], threshold=0.15, dry_run=False) col.delete.assert_called_once() def test_dedup_source_group_query_failure_keeps(): col = MagicMock() col.get.return_value = { "ids": ["d1", "d2"], "documents": [ "long document one content here enough", "long document two content here enough", ], "metadatas": [{"wing": "a"}, {"wing": "a"}], } col.query.side_effect = Exception("query failed") kept, deleted = dedup.dedup_source_group(col, ["d1", "d2"], threshold=0.15, dry_run=True) assert len(kept) == 2 # both kept on error # ── show_stats ──────────────────────────────────────────────────────── @patch("mempalace.dedup.chromadb") def test_show_stats(mock_chromadb, tmp_path): mock_col = MagicMock() mock_col.count.return_value = 5 mock_col.get.side_effect = [ { "ids": ["d1", "d2", "d3", "d4", "d5"], "metadatas": [ {"source_file": "a.txt"}, {"source_file": "a.txt"}, {"source_file": "a.txt"}, {"source_file": "a.txt"}, {"source_file": "a.txt"}, ], }, {"ids": []}, ] mock_client = MagicMock() mock_client.get_collection.return_value = mock_col mock_chromadb.PersistentClient.return_value = mock_client dedup.show_stats(palace_path=str(tmp_path)) # should not raise # ── dedup_palace ────────────────────────────────────────────────────── @patch("mempalace.dedup.dedup_source_group") @patch("mempalace.dedup.get_source_groups") @patch("mempalace.dedup.chromadb") def test_dedup_palace_dry_run(mock_chromadb, mock_groups, mock_dedup_group, tmp_path): mock_col = MagicMock() mock_col.count.return_value = 10 mock_client = MagicMock() mock_client.get_collection.return_value = mock_col mock_chromadb.PersistentClient.return_value = mock_client mock_groups.return_value = {"a.txt": ["d1", "d2", "d3", "d4", "d5"]} mock_dedup_group.return_value = (["d1", "d2", "d3"], ["d4", "d5"]) dedup.dedup_palace(palace_path=str(tmp_path), dry_run=True) mock_dedup_group.assert_called_once() @patch("mempalace.dedup.dedup_source_group") @patch("mempalace.dedup.get_source_groups") @patch("mempalace.dedup.chromadb") def test_dedup_palace_with_wing(mock_chromadb, mock_groups, mock_dedup_group, tmp_path): mock_col = MagicMock() mock_col.count.return_value = 10 mock_client = MagicMock() mock_client.get_collection.return_value = mock_col mock_chromadb.PersistentClient.return_value = mock_client mock_groups.return_value = {} dedup.dedup_palace(palace_path=str(tmp_path), wing="test_wing", dry_run=True) mock_groups.assert_called_once_with(mock_col, 5, None, wing="test_wing") @patch("mempalace.dedup.dedup_source_group") @patch("mempalace.dedup.get_source_groups") @patch("mempalace.dedup.chromadb") def test_dedup_palace_no_groups(mock_chromadb, mock_groups, mock_dedup_group, tmp_path): mock_col = MagicMock() mock_col.count.return_value = 3 mock_client = MagicMock() mock_client.get_collection.return_value = mock_col mock_chromadb.PersistentClient.return_value = mock_client mock_groups.return_value = {} dedup.dedup_palace(palace_path=str(tmp_path), dry_run=True) mock_dedup_group.assert_not_called()