diff --git a/mempalace/convo_miner.py b/mempalace/convo_miner.py index d406073..f24fa69 100644 --- a/mempalace/convo_miner.py +++ b/mempalace/convo_miner.py @@ -16,7 +16,7 @@ from datetime import datetime from collections import defaultdict from .normalize import normalize -from .palace import SKIP_DIRS, get_collection, file_already_mined +from .palace import SKIP_DIRS, get_collection, file_already_mined, mine_lock # File types that might contain conversations @@ -375,34 +375,40 @@ def mine_convos( if extract_mode != "general": room_counts[room] += 1 - # File each chunk + # File each chunk — lock to prevent concurrent agents duplicating drawers_added = 0 - for chunk in chunks: - chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room - if extract_mode == "general": - room_counts[chunk_room] += 1 - drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}" - try: - collection.upsert( - documents=[chunk["content"]], - ids=[drawer_id], - metadatas=[ - { - "wing": wing, - "room": chunk_room, - "source_file": source_file, - "chunk_index": chunk["chunk_index"], - "added_by": agent, - "filed_at": datetime.now().isoformat(), - "ingest_mode": "convos", - "extract_mode": extract_mode, - } - ], - ) - drawers_added += 1 - except Exception as e: - if "already exists" not in str(e).lower(): - raise + with mine_lock(source_file): + # Re-check after lock — another agent may have just finished this file + if file_already_mined(collection, source_file): + files_skipped += 1 + continue + + for chunk in chunks: + chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room + if extract_mode == "general": + room_counts[chunk_room] += 1 + drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}" + try: + collection.upsert( + documents=[chunk["content"]], + ids=[drawer_id], + metadatas=[ + { + "wing": wing, + "room": chunk_room, + "source_file": source_file, + "chunk_index": chunk["chunk_index"], + "added_by": agent, + "filed_at": datetime.now().isoformat(), + "ingest_mode": "convos", + "extract_mode": extract_mode, + } + ], + ) + drawers_added += 1 + except Exception as e: + if "already exists" not in str(e).lower(): + raise total_drawers += drawers_added print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers_added}") diff --git a/mempalace/miner.py b/mempalace/miner.py index 22c8af3..801ed7e 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -15,7 +15,7 @@ from pathlib import Path from datetime import datetime from collections import defaultdict -from .palace import SKIP_DIRS, get_collection, file_already_mined +from .palace import SKIP_DIRS, get_collection, file_already_mined, mine_lock READABLE_EXTENSIONS = { ".txt", @@ -434,29 +434,37 @@ def process_file( print(f" [DRY RUN] {filepath.name} → room:{room} ({len(chunks)} drawers)") return len(chunks), room - # Purge stale drawers for this file before re-inserting the fresh chunks. - # Converts modified-file re-mines from upsert-over-existing-IDs (which hits - # hnswlib's thread-unsafe updatePoint path and can segfault on macOS ARM - # with chromadb 0.6.3) into a clean delete+insert, bypassing the update - # path entirely. - try: - collection.delete(where={"source_file": source_file}) - except Exception: - pass + # Lock this file so concurrent agents don't interleave delete+insert. + # Without the lock, two agents can both pass file_already_mined(), + # both delete, and both insert — creating duplicates or losing data. + with mine_lock(source_file): + # Re-check after acquiring lock — another agent may have just finished + if file_already_mined(collection, source_file, check_mtime=True): + return 0, room - drawers_added = 0 - for chunk in chunks: - added = add_drawer( - collection=collection, - wing=wing, - room=room, - content=chunk["content"], - source_file=source_file, - chunk_index=chunk["chunk_index"], - agent=agent, - ) - if added: - drawers_added += 1 + # Purge stale drawers for this file before re-inserting the fresh chunks. + # Converts modified-file re-mines from upsert-over-existing-IDs (which hits + # hnswlib's thread-unsafe updatePoint path and can segfault on macOS ARM + # with chromadb 0.6.3) into a clean delete+insert, bypassing the update + # path entirely. + try: + collection.delete(where={"source_file": source_file}) + except Exception: + pass + + drawers_added = 0 + for chunk in chunks: + added = add_drawer( + collection=collection, + wing=wing, + room=room, + content=chunk["content"], + source_file=source_file, + chunk_index=chunk["chunk_index"], + agent=agent, + ) + if added: + drawers_added += 1 return drawers_added, room diff --git a/mempalace/palace.py b/mempalace/palace.py index 948fecc..ed5382a 100644 --- a/mempalace/palace.py +++ b/mempalace/palace.py @@ -4,6 +4,8 @@ palace.py — Shared palace operations. Consolidates collection access patterns used by both miners and the MCP server. """ +import contextlib +import hashlib import os from .backends.chroma import ChromaBackend @@ -50,6 +52,41 @@ def get_collection( ) +@contextlib.contextmanager +def mine_lock(source_file: str): + """Cross-platform file lock for mine operations. + + Prevents multiple agents from mining the same file simultaneously, + which causes duplicate drawers when the delete+insert cycle interleaves. + """ + lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks") + os.makedirs(lock_dir, exist_ok=True) + lock_path = os.path.join( + lock_dir, hashlib.sha256(source_file.encode()).hexdigest()[:16] + ".lock" + ) + + lf = open(lock_path, "w") + try: + if os.name == "nt": + import msvcrt + msvcrt.locking(lf.fileno(), msvcrt.LK_LOCK, 1) + else: + import fcntl + fcntl.flock(lf, fcntl.LOCK_EX) + yield + finally: + try: + if os.name == "nt": + import msvcrt + msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1) + else: + import fcntl + fcntl.flock(lf, fcntl.LOCK_UN) + except Exception: + pass + lf.close() + + def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool: """Check if a file has already been filed in the palace.