fix(repair): rebuild collections through temp staging
This commit is contained in:
+129
-30
@@ -37,10 +37,13 @@ import time
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from chromadb.errors import NotFoundError as ChromaNotFoundError
|
||||
|
||||
from .backends.chroma import ChromaBackend, hnsw_capacity_status
|
||||
|
||||
|
||||
COLLECTION_NAME = "mempalace_drawers"
|
||||
REPAIR_TEMP_COLLECTION = f"{COLLECTION_NAME}__repair_tmp"
|
||||
|
||||
|
||||
def _get_palace_path():
|
||||
@@ -83,6 +86,108 @@ def _paginate_ids(col, where=None):
|
||||
return ids
|
||||
|
||||
|
||||
def _extract_drawers(col, total: int, batch_size: int):
|
||||
all_ids = []
|
||||
all_docs = []
|
||||
all_metas = []
|
||||
offset = 0
|
||||
while offset < total:
|
||||
batch = col.get(limit=batch_size, offset=offset, include=["documents", "metadatas"])
|
||||
if not batch["ids"]:
|
||||
break
|
||||
all_ids.extend(batch["ids"])
|
||||
all_docs.extend(batch["documents"])
|
||||
all_metas.extend(batch["metadatas"])
|
||||
offset += len(batch["ids"])
|
||||
return all_ids, all_docs, all_metas
|
||||
|
||||
|
||||
def _verify_collection_count(col, expected: int, label: str) -> None:
|
||||
actual = col.count()
|
||||
if actual != expected:
|
||||
raise RuntimeError(f"{label} count mismatch: expected {expected}, got {actual}")
|
||||
|
||||
|
||||
def _is_missing_collection_value_error(exc: ValueError) -> bool:
|
||||
message = str(exc).lower()
|
||||
return "does not exist" in message or "not found" in message
|
||||
|
||||
|
||||
def _delete_collection_if_exists(backend, palace_path: str, collection_name: str) -> None:
|
||||
try:
|
||||
backend.delete_collection(palace_path, collection_name)
|
||||
except ValueError as exc:
|
||||
if _is_missing_collection_value_error(exc):
|
||||
return
|
||||
raise
|
||||
except (FileNotFoundError, ChromaNotFoundError):
|
||||
return
|
||||
|
||||
|
||||
class RebuildCollectionError(RuntimeError):
|
||||
"""Raised when temp rebuild fails, carrying whether the live swap happened."""
|
||||
|
||||
def __init__(self, message: str, *, live_replaced: bool):
|
||||
super().__init__(message)
|
||||
self.live_replaced = live_replaced
|
||||
|
||||
|
||||
def _rebuild_collection_via_temp(
|
||||
backend,
|
||||
palace_path: str,
|
||||
all_ids,
|
||||
all_docs,
|
||||
all_metas,
|
||||
batch_size: int,
|
||||
progress=print,
|
||||
) -> int:
|
||||
expected = len(all_ids)
|
||||
temp_name = REPAIR_TEMP_COLLECTION
|
||||
live_replaced = False
|
||||
|
||||
try:
|
||||
_delete_collection_if_exists(backend, palace_path, temp_name)
|
||||
|
||||
progress(f" Building temporary collection: {temp_name}")
|
||||
temp_col = backend.create_collection(palace_path, temp_name)
|
||||
staged = 0
|
||||
for i in range(0, expected, batch_size):
|
||||
batch_ids = all_ids[i : i + batch_size]
|
||||
batch_docs = all_docs[i : i + batch_size]
|
||||
batch_metas = all_metas[i : i + batch_size]
|
||||
temp_col.upsert(documents=batch_docs, ids=batch_ids, metadatas=batch_metas)
|
||||
staged += len(batch_ids)
|
||||
progress(f" Staged {staged}/{expected} drawers...")
|
||||
_verify_collection_count(temp_col, expected, "temporary rebuild")
|
||||
|
||||
progress(" Rebuilding live collection...")
|
||||
backend.delete_collection(palace_path, COLLECTION_NAME)
|
||||
live_replaced = True
|
||||
new_col = backend.create_collection(palace_path, COLLECTION_NAME)
|
||||
|
||||
rebuilt = 0
|
||||
for i in range(0, expected, batch_size):
|
||||
batch_ids = all_ids[i : i + batch_size]
|
||||
batch_docs = all_docs[i : i + batch_size]
|
||||
batch_metas = all_metas[i : i + batch_size]
|
||||
new_col.upsert(documents=batch_docs, ids=batch_ids, metadatas=batch_metas)
|
||||
rebuilt += len(batch_ids)
|
||||
progress(f" Re-filed {rebuilt}/{expected} drawers...")
|
||||
_verify_collection_count(new_col, expected, "rebuilt live collection")
|
||||
|
||||
try:
|
||||
_delete_collection_if_exists(backend, palace_path, temp_name)
|
||||
except Exception:
|
||||
pass
|
||||
return rebuilt
|
||||
except Exception as exc:
|
||||
try:
|
||||
_delete_collection_if_exists(backend, palace_path, temp_name)
|
||||
except Exception:
|
||||
pass
|
||||
raise RebuildCollectionError(str(exc), live_replaced=live_replaced) from exc
|
||||
|
||||
|
||||
def scan_palace(palace_path=None, only_wing=None):
|
||||
"""Scan the palace for corrupt/unfetchable IDs.
|
||||
|
||||
@@ -373,18 +478,7 @@ def rebuild_index(palace_path=None, confirm_truncation_ok: bool = False):
|
||||
# Extract all drawers in batches
|
||||
print("\n Extracting drawers...")
|
||||
batch_size = 5000
|
||||
all_ids = []
|
||||
all_docs = []
|
||||
all_metas = []
|
||||
offset = 0
|
||||
while offset < total:
|
||||
batch = col.get(limit=batch_size, offset=offset, include=["documents", "metadatas"])
|
||||
if not batch["ids"]:
|
||||
break
|
||||
all_ids.extend(batch["ids"])
|
||||
all_docs.extend(batch["documents"])
|
||||
all_metas.extend(batch["metadatas"])
|
||||
offset += len(batch["ids"])
|
||||
all_ids, all_docs, all_metas = _extract_drawers(col, total, batch_size)
|
||||
print(f" Extracted {len(all_ids)} drawers")
|
||||
|
||||
# ── #1208 guard ──────────────────────────────────────────────────
|
||||
@@ -407,28 +501,33 @@ def rebuild_index(palace_path=None, confirm_truncation_ok: bool = False):
|
||||
|
||||
# Rebuild with correct HNSW settings
|
||||
print(" Rebuilding collection with hnsw:space=cosine...")
|
||||
backend.delete_collection(palace_path, COLLECTION_NAME)
|
||||
new_col = backend.create_collection(palace_path, COLLECTION_NAME)
|
||||
|
||||
filed = 0
|
||||
try:
|
||||
for i in range(0, len(all_ids), batch_size):
|
||||
batch_ids = all_ids[i : i + batch_size]
|
||||
batch_docs = all_docs[i : i + batch_size]
|
||||
batch_metas = all_metas[i : i + batch_size]
|
||||
new_col.upsert(documents=batch_docs, ids=batch_ids, metadatas=batch_metas)
|
||||
filed += len(batch_ids)
|
||||
print(f" Re-filed {filed}/{len(all_ids)} drawers...")
|
||||
except Exception as e:
|
||||
filed = _rebuild_collection_via_temp(
|
||||
backend,
|
||||
palace_path,
|
||||
all_ids,
|
||||
all_docs,
|
||||
all_metas,
|
||||
batch_size,
|
||||
progress=print,
|
||||
)
|
||||
except RebuildCollectionError as e:
|
||||
print(f"\n ERROR during rebuild: {e}")
|
||||
print(f" Only {filed}/{len(all_ids)} drawers were re-filed.")
|
||||
if os.path.exists(backup_path):
|
||||
print(" Rebuild aborted before completion.")
|
||||
if e.live_replaced and os.path.exists(backup_path):
|
||||
print(f" Restoring from backup: {backup_path}")
|
||||
backend.delete_collection(palace_path, COLLECTION_NAME)
|
||||
shutil.copy2(backup_path, sqlite_path)
|
||||
print(" Backup restored. Palace is back to pre-repair state.")
|
||||
else:
|
||||
try:
|
||||
_close_chroma_handles(palace_path)
|
||||
_delete_collection_if_exists(backend, palace_path, COLLECTION_NAME)
|
||||
shutil.copy2(backup_path, sqlite_path)
|
||||
print(" Backup restored. Palace is back to pre-repair state.")
|
||||
except Exception as restore_error:
|
||||
print(f" Backup restore failed: {restore_error}")
|
||||
print(f" Manual restore required from: {backup_path}")
|
||||
elif e.live_replaced:
|
||||
print(" No backup available. Re-mine from source files to recover.")
|
||||
else:
|
||||
print(" Live collection was not replaced; leaving the original palace untouched.")
|
||||
raise
|
||||
|
||||
print(f"\n Repair complete. {filed} drawers rebuilt.")
|
||||
|
||||
Reference in New Issue
Block a user