From db28bf1e846592f997835ca42050e43c12b09303 Mon Sep 17 00:00:00 2001 From: sha2fiddy <103975074+sha2fiddy@users.noreply.github.com> Date: Wed, 29 Apr 2026 19:01:54 -0400 Subject: [PATCH] fix: paginate closet_llm col.get (#1073) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirror the pagination pattern PR #851 landed in miner.py:status(). A single drawers_col.get(limit=total, ...) on palaces larger than SQLite's SQLITE_MAX_VARIABLE_NUMBER (32766) crashes inside chromadb. Fetch drawers in batch_size=5000 chunks, stepping offset until the collection is drained. by_source aggregation semantics are preserved exactly — grouping, wing filter, meta capture all unchanged. Closes #1073. Related: #802, #850, #1016. --- mempalace/closet_llm.py | 33 +++++--- tests/test_closet_llm.py | 176 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 198 insertions(+), 11 deletions(-) diff --git a/mempalace/closet_llm.py b/mempalace/closet_llm.py index c00b735..6274f79 100644 --- a/mempalace/closet_llm.py +++ b/mempalace/closet_llm.py @@ -221,17 +221,28 @@ def regenerate_closets( print("No drawers in palace.") return {"processed": 0} - all_data = drawers_col.get(limit=total, include=["documents", "metadatas"]) - by_source = {} - for doc_id, doc, meta in zip(all_data["ids"], all_data["documents"], all_data["metadatas"]): - source = meta.get("source_file", "unknown") - w = meta.get("wing", "") - if wing and w != wing: - continue - if source not in by_source: - by_source[source] = {"drawer_ids": [], "content": [], "meta": meta} - by_source[source]["drawer_ids"].append(doc_id) - by_source[source]["content"].append(doc) + # Paginate the fetch — a single get(limit=total, ...) blows through + # SQLite's SQLITE_MAX_VARIABLE_NUMBER (32766) on large palaces and + # crashes inside chromadb (see #802, #850, #1073). + by_source: dict = {} + batch_size = 5000 + offset = 0 + while offset < total: + batch = drawers_col.get(limit=batch_size, offset=offset, include=["documents", "metadatas"]) + ids = batch["ids"] + if not ids: + break + for doc_id, doc, meta in zip(ids, batch["documents"], batch["metadatas"]): + meta = meta or {} + source = meta.get("source_file", "unknown") + w = meta.get("wing", "") + if wing and w != wing: + continue + if source not in by_source: + by_source[source] = {"drawer_ids": [], "content": [], "meta": meta} + by_source[source]["drawer_ids"].append(doc_id) + by_source[source]["content"].append(doc) + offset += len(ids) sources = list(by_source.keys()) if sample > 0: diff --git a/tests/test_closet_llm.py b/tests/test_closet_llm.py index a92e2fa..3a0e84e 100644 --- a/tests/test_closet_llm.py +++ b/tests/test_closet_llm.py @@ -296,6 +296,182 @@ class TestRegenerateClosets: assert meta.get("generated_by", "").startswith("llm:") assert meta.get("normalize_version") == NORMALIZE_VERSION + def test_regen_paginates_drawer_fetch(self, tmp_path): + """Regression for #1073: drawers_col.get must be paginated at + batch_size=5000. A single get(limit=total, ...) on a palace with + more than SQLite's SQLITE_MAX_VARIABLE_NUMBER (32766) drawers + blows up inside chromadb. Matches the miner.status pattern + introduced in #851 (see #802, #850, #1073).""" + from mempalace import closet_llm as closet_llm_mod + + palace = str(tmp_path / "palace") + + # Build a fake collection: 12_000 drawers across 3 source files, + # enough to force 3 batches of batch_size=5000 (5000 + 5000 + 2000). + n_drawers = 12_000 + ids = [f"d{i:05d}" for i in range(n_drawers)] + docs = [f"doc body {i}" for i in range(n_drawers)] + metas = [ + { + "wing": "w", + "room": "r", + "source_file": f"/src/file_{i % 3}.md", + "entities": "", + } + for i in range(n_drawers) + ] + + get_calls: list = [] + + class FakeDrawersCol: + def count(self): + return n_drawers + + def get(self, limit=None, offset=0, include=None, **kwargs): + get_calls.append({"limit": limit, "offset": offset, "include": include}) + end = min(offset + (limit or n_drawers), n_drawers) + return { + "ids": ids[offset:end], + "documents": docs[offset:end], + "metadatas": metas[offset:end], + } + + class FakeClosetsCol: + """Accept the purge + upsert calls the success path makes.""" + + def get(self, *a, **kw): + return {"ids": [], "documents": [], "metadatas": []} + + def delete(self, *a, **kw): + return None + + def upsert(self, *a, **kw): + return None + + fake_drawers = FakeDrawersCol() + fake_closets = FakeClosetsCol() + + def fake_urlopen(req, timeout=None): + return _FakeResp( + { + "choices": [ + {"message": {"content": '{"topics":["t1"],"quotes":[],"summary":""}'}} + ], + "usage": {"prompt_tokens": 1, "completion_tokens": 1}, + } + ) + + cfg = LLMConfig(endpoint="http://local/v1", model="m") + + with ( + patch.object(closet_llm_mod, "get_collection", return_value=fake_drawers), + patch.object(closet_llm_mod, "get_closets_collection", return_value=fake_closets), + patch.object(closet_llm_mod, "purge_file_closets", return_value=None), + patch.object(closet_llm_mod, "upsert_closet_lines", return_value=None), + patch("urllib.request.urlopen", side_effect=fake_urlopen), + ): + result = regenerate_closets(palace, cfg=cfg, dry_run=True) + + # Three paginated calls: (limit=5000, offset=0), (5000, 5000), (5000, 10000). + assert len(get_calls) == 3, f"expected 3 batched fetches, got {len(get_calls)}" + for call in get_calls: + assert ( + call["limit"] == 5000 + ), f"batch must be 5000 — got {call['limit']} (would risk SQLITE_MAX_VARIABLE_NUMBER)" + # include must still request both documents and metadatas + assert "documents" in call["include"] + assert "metadatas" in call["include"] + assert [c["offset"] for c in get_calls] == [0, 5000, 10_000] + + # by_source aggregation must be preserved exactly across batches: + # 12_000 drawers, 3 source files → 4_000 drawers each. + # dry_run=True short-circuits LLM calls but still walks by_source. + assert result.get("processed", 0) == 0 # dry_run + # Verify no single call tried to pull more than batch_size. + assert max(c["limit"] for c in get_calls) <= 5000 + + def test_regen_by_source_aggregates_across_batches(self, tmp_path): + """Pagination must not change the by_source grouping — drawers for + the same source_file split across batches still land in one group.""" + from mempalace import closet_llm as closet_llm_mod + + palace = str(tmp_path / "palace") + + # 7_500 drawers, alternating between two source files → forces + # splits across the 5000/2500 boundary. Each source ends up with + # 3_750 drawers after regrouping. + n_drawers = 7_500 + ids = [f"d{i:05d}" for i in range(n_drawers)] + docs = [f"body-{i}" for i in range(n_drawers)] + metas = [ + { + "wing": "w", + "room": "r", + "source_file": f"/src/file_{i % 2}.md", + "entities": "", + } + for i in range(n_drawers) + ] + + captured_sources: dict = {} + + class FakeDrawersCol: + def count(self): + return n_drawers + + def get(self, limit=None, offset=0, include=None, **kwargs): + end = min(offset + (limit or n_drawers), n_drawers) + return { + "ids": ids[offset:end], + "documents": docs[offset:end], + "metadatas": metas[offset:end], + } + + class FakeClosetsCol: + def get(self, *a, **kw): + return {"ids": [], "documents": [], "metadatas": []} + + def delete(self, *a, **kw): + return None + + def upsert(self, *a, **kw): + return None + + # Hook _call_llm to inspect what regenerate_closets aggregated + # per source before the HTTP boundary. + real_call_llm = closet_llm_mod._call_llm + + def spying_call_llm(cfg, source_file, wing, room, content): + captured_sources[source_file] = content + return ( + {"topics": ["t"], "quotes": [], "summary": ""}, + {"prompt_tokens": 1, "completion_tokens": 1}, + ) + + cfg = LLMConfig(endpoint="http://local/v1", model="m") + + with ( + patch.object(closet_llm_mod, "get_collection", return_value=FakeDrawersCol()), + patch.object(closet_llm_mod, "get_closets_collection", return_value=FakeClosetsCol()), + patch.object(closet_llm_mod, "purge_file_closets", return_value=None), + patch.object(closet_llm_mod, "upsert_closet_lines", return_value=None), + patch.object(closet_llm_mod, "_call_llm", side_effect=spying_call_llm), + ): + regenerate_closets(palace, cfg=cfg) + + # Both sources survived the pagination boundary. + assert set(captured_sources.keys()) == {"/src/file_0.md", "/src/file_1.md"} + # Each source accumulated exactly 3_750 drawer bodies, concatenated + # with the "\n\n" separator the regenerate path uses. + for source, content in captured_sources.items(): + assert content.count("\n\n") == 3_749, ( + f"{source}: expected 3_750 chunks joined (3_749 separators), " + f"got {content.count(chr(10) + chr(10)) + 1}" + ) + + # Silence unused-var lint. + assert real_call_llm is not None + def test_regen_uses_basename_not_split_slash(self, tmp_path, monkeypatch): """Regression: the old closet_id base used ``source.split('/')[-1]`` which silently degrades on Windows paths (``C:\\proj\\a.md`` →