diff --git a/mempalace/convo_miner.py b/mempalace/convo_miner.py
index 663f1a0..63b46f0 100644
--- a/mempalace/convo_miner.py
+++ b/mempalace/convo_miner.py
@@ -16,7 +16,13 @@ from datetime import datetime
 from collections import defaultdict
 
 from .normalize import normalize
-from .palace import NORMALIZE_VERSION, SKIP_DIRS, file_already_mined, get_collection
+from .palace import (
+    NORMALIZE_VERSION,
+    SKIP_DIRS,
+    file_already_mined,
+    get_collection,
+    mine_lock,
+)
 
 
 # File types that might contain conversations
@@ -273,50 +279,60 @@ def scan_convos(convo_dir: str) -> list:
 # =============================================================================
 
 
-def _file_convo_chunks(collection, source_file, chunks, wing, room, agent, extract_mode):
-    """Purge stale drawers for ``source_file`` then upsert fresh chunks.
+def _file_chunks_locked(collection, source_file, chunks, wing, room, agent, extract_mode):
+    """Lock the source file, purge stale drawers, and upsert fresh chunks.
 
-    Returns (drawers_added, room_counts_delta).
+    Combines the per-file serialization that prevents concurrent agents from
+    duplicating work (via mine_lock) with the normalize-version rebuild
+    contract (purge-before-insert so pre-v2 drawers don't survive).
+
+    Returns (drawers_added, room_counts_delta, skipped).
     """
-    # Purge stale drawers first. When the normalize schema bumps,
-    # file_already_mined() returns False for pre-v2 drawers and we land
-    # here — clean them out so the source doesn't end up with a mix of
-    # old-noise and new-clean drawers.
-    try:
-        collection.delete(where={"source_file": source_file})
-    except Exception:
-        pass
-
     room_counts_delta: dict = defaultdict(int)
     drawers_added = 0
-    for chunk in chunks:
-        chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
-        if extract_mode == "general":
-            room_counts_delta[chunk_room] += 1
-        drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
+    with mine_lock(source_file):
+        # Re-check after lock — another agent may have just finished this file
+        # at the current schema. A stale-version hit here returns False, so we
+        # still fall through to the purge+rebuild path below.
+        if file_already_mined(collection, source_file):
+            return 0, room_counts_delta, True
+
+        # Purge stale drawers first. When the normalize schema bumps,
+        # file_already_mined() returned False for pre-v2 drawers — clean
+        # them out so the source doesn't end up with mixed old/new drawers.
         try:
-            collection.upsert(
-                documents=[chunk["content"]],
-                ids=[drawer_id],
-                metadatas=[
-                    {
-                        "wing": wing,
-                        "room": chunk_room,
-                        "source_file": source_file,
-                        "chunk_index": chunk["chunk_index"],
-                        "added_by": agent,
-                        "filed_at": datetime.now().isoformat(),
-                        "ingest_mode": "convos",
-                        "extract_mode": extract_mode,
-                        "normalize_version": NORMALIZE_VERSION,
-                    }
-                ],
-            )
-            drawers_added += 1
-        except Exception as e:
-            if "already exists" not in str(e).lower():
-                raise
-    return drawers_added, room_counts_delta
+            collection.delete(where={"source_file": source_file})
+        except Exception:
+            pass
+
+        for chunk in chunks:
+            chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
+            if extract_mode == "general":
+                room_counts_delta[chunk_room] += 1
+            drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
+            try:
+                collection.upsert(
+                    documents=[chunk["content"]],
+                    ids=[drawer_id],
+                    metadatas=[
+                        {
+                            "wing": wing,
+                            "room": chunk_room,
+                            "source_file": source_file,
+                            "chunk_index": chunk["chunk_index"],
+                            "added_by": agent,
+                            "filed_at": datetime.now().isoformat(),
+                            "ingest_mode": "convos",
+                            "extract_mode": extract_mode,
+                            "normalize_version": NORMALIZE_VERSION,
+                        }
+                    ],
+                )
+                drawers_added += 1
+            except Exception as e:
+                if "already exists" not in str(e).lower():
+                    raise
+    return drawers_added, room_counts_delta, False
 
 
 def mine_convos(
@@ -422,10 +438,14 @@ def mine_convos(
         if extract_mode != "general":
             room_counts[room] += 1
 
-        # Purge stale drawers + file fresh chunks.
-        drawers_added, room_delta = _file_convo_chunks(
+        # Lock + purge stale + file fresh chunks. Lock serializes concurrent
+        # agents; purge removes pre-v2 drawers so the schema bump applies.
+        drawers_added, room_delta, skipped = _file_chunks_locked(
             collection, source_file, chunks, wing, room, agent, extract_mode
         )
+        if skipped:
+            files_skipped += 1
+            continue
         for r, n in room_delta.items():
             room_counts[r] += n
 
diff --git a/mempalace/miner.py b/mempalace/miner.py
index 49e0d25..522b33a 100644
--- a/mempalace/miner.py
+++ b/mempalace/miner.py
@@ -15,7 +15,13 @@ from pathlib import Path
 from datetime import datetime
 from collections import defaultdict
 
-from .palace import NORMALIZE_VERSION, SKIP_DIRS, file_already_mined, get_collection
+from .palace import (
+    NORMALIZE_VERSION,
+    SKIP_DIRS,
+    file_already_mined,
+    get_collection,
+    mine_lock,
+)
 
 READABLE_EXTENSIONS = {
     ".txt",
@@ -435,29 +441,37 @@ def process_file(
         print(f" [DRY RUN] {filepath.name} → room:{room} ({len(chunks)} drawers)")
         return len(chunks), room
 
-    # Purge stale drawers for this file before re-inserting the fresh chunks.
-    # Converts modified-file re-mines from upsert-over-existing-IDs (which hits
-    # hnswlib's thread-unsafe updatePoint path and can segfault on macOS ARM
-    # with chromadb 0.6.3) into a clean delete+insert, bypassing the update
-    # path entirely.
-    try:
-        collection.delete(where={"source_file": source_file})
-    except Exception:
-        pass
+    # Lock this file so concurrent agents don't interleave delete+insert.
+    # Without the lock, two agents can both pass file_already_mined(),
+    # both delete, and both insert — creating duplicates or losing data.
+    with mine_lock(source_file):
+        # Re-check after acquiring lock — another agent may have just finished
+        if file_already_mined(collection, source_file, check_mtime=True):
+            return 0, room
 
-    drawers_added = 0
-    for chunk in chunks:
-        added = add_drawer(
-            collection=collection,
-            wing=wing,
-            room=room,
-            content=chunk["content"],
-            source_file=source_file,
-            chunk_index=chunk["chunk_index"],
-            agent=agent,
-        )
-        if added:
-            drawers_added += 1
+        # Purge stale drawers for this file before re-inserting the fresh chunks.
+        # Converts modified-file re-mines from upsert-over-existing-IDs (which hits
+        # hnswlib's thread-unsafe updatePoint path and can segfault on macOS ARM
+        # with chromadb 0.6.3) into a clean delete+insert, bypassing the update
+        # path entirely.
+        try:
+            collection.delete(where={"source_file": source_file})
+        except Exception:
+            pass
+
+        drawers_added = 0
+        for chunk in chunks:
+            added = add_drawer(
+                collection=collection,
+                wing=wing,
+                room=room,
+                content=chunk["content"],
+                source_file=source_file,
+                chunk_index=chunk["chunk_index"],
+                agent=agent,
+            )
+            if added:
+                drawers_added += 1
 
     return drawers_added, room
 
diff --git a/mempalace/palace.py b/mempalace/palace.py
index 9cfb55e..bb7916e 100644
--- a/mempalace/palace.py
+++ b/mempalace/palace.py
@@ -4,6 +4,8 @@ palace.py — Shared palace operations.
 Consolidates collection access patterns used by both miners and the MCP server.
 """
 
+import contextlib
+import hashlib
 import os
 
 from .backends.chroma import ChromaBackend
@@ -60,6 +62,57 @@ def get_collection(
     )
 
 
+@contextlib.contextmanager
+def mine_lock(source_file: str):
+    """Serialize mining of ``source_file`` across processes via a lock file.
+
+    Prevents multiple agents from mining the same file simultaneously,
+    which causes duplicate drawers when the delete+insert cycle interleaves.
+
+    The lock file lives in ``~/.mempalace/locks`` and is named after a
+    SHA-256 prefix of ``source_file``. Blocks until the lock is acquired and
+    releases it when the ``with`` body exits, normally or by exception.
+    """
+    lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks")
+    os.makedirs(lock_dir, exist_ok=True)
+    # surrogateescape: paths with undecodable bytes must not raise here.
+    digest = hashlib.sha256(source_file.encode("utf-8", "surrogateescape")).hexdigest()
+    lock_path = os.path.join(lock_dir, digest[:16] + ".lock")
+
+    with open(lock_path, "w") as lf:
+        if os.name == "nt":
+            import errno
+            import msvcrt
+
+            # LK_LOCK only retries for ~10 seconds and then raises OSError.
+            # Keep waiting so a long mine by another agent blocks us (as
+            # flock does on POSIX) instead of crashing this one.
+            while True:
+                try:
+                    msvcrt.locking(lf.fileno(), msvcrt.LK_LOCK, 1)
+                    break
+                except OSError as exc:
+                    if exc.errno not in (errno.EACCES, errno.EDEADLOCK):
+                        raise
+        else:
+            import fcntl
+
+            fcntl.flock(lf, fcntl.LOCK_EX)
+        # Only reached once the lock is held, so the release below never
+        # runs against a lock we failed to take.
+        try:
+            yield
+        finally:
+            try:
+                if os.name == "nt":
+                    msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1)
+                else:
+                    fcntl.flock(lf, fcntl.LOCK_UN)
+            except OSError:
+                # Best effort: closing the file releases the lock too.
+                pass
+
+
 def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool:
     """Check if a file has already been filed in the palace.
 
diff --git a/mempalace/version.py b/mempalace/version.py
index 1eb21a2..45176bc 100644
--- a/mempalace/version.py
+++ b/mempalace/version.py
@@ -1,3 +1,3 @@
 """Single source of truth for the MemPalace package version."""
 
-__version__ = "3.1.0"
+__version__ = "3.2.0"