From 30a431924bfcfa17ffdda209eba28192a78db9ca Mon Sep 17 00:00:00 2001 From: MSL <232237854+milla-jovovich@users.noreply.github.com> Date: Mon, 13 Apr 2026 01:16:51 -0700 Subject: [PATCH 1/4] fix: add file-level locking to prevent multi-agent duplicate drawers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: when multiple agents mine simultaneously, both pass file_already_mined() check, both delete+insert the same file's drawers, creating duplicates or losing data. Fix: mine_lock() in palace.py — cross-platform file lock (fcntl on Unix, msvcrt on Windows). Both miner.py and convo_miner.py now lock per-file during the delete+insert cycle and re-check after acquiring the lock. Tested: - Lock acquires and releases correctly - Second agent blocks until first releases (0.25s wait) - 33/33 existing tests pass - Cross-platform: fcntl (macOS/Linux), msvcrt (Windows) Based on v3.2.0 tag. Co-Authored-By: Claude Opus 4.6 (1M context) --- mempalace/convo_miner.py | 62 ++++++++++++++++++++++------------------ mempalace/miner.py | 54 +++++++++++++++++++--------------- mempalace/palace.py | 37 ++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 51 deletions(-) diff --git a/mempalace/convo_miner.py b/mempalace/convo_miner.py index d406073..f24fa69 100644 --- a/mempalace/convo_miner.py +++ b/mempalace/convo_miner.py @@ -16,7 +16,7 @@ from datetime import datetime from collections import defaultdict from .normalize import normalize -from .palace import SKIP_DIRS, get_collection, file_already_mined +from .palace import SKIP_DIRS, get_collection, file_already_mined, mine_lock # File types that might contain conversations @@ -375,34 +375,40 @@ def mine_convos( if extract_mode != "general": room_counts[room] += 1 - # File each chunk + # File each chunk — lock to prevent concurrent agents duplicating drawers_added = 0 - for chunk in chunks: - chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room - if extract_mode == "general": - room_counts[chunk_room] += 1 - drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}" - try: - collection.upsert( - documents=[chunk["content"]], - ids=[drawer_id], - metadatas=[ - { - "wing": wing, - "room": chunk_room, - "source_file": source_file, - "chunk_index": chunk["chunk_index"], - "added_by": agent, - "filed_at": datetime.now().isoformat(), - "ingest_mode": "convos", - "extract_mode": extract_mode, - } - ], - ) - drawers_added += 1 - except Exception as e: - if "already exists" not in str(e).lower(): - raise + with mine_lock(source_file): + # Re-check after lock — another agent may have just finished this file + if file_already_mined(collection, source_file): + files_skipped += 1 + continue + + for chunk in chunks: + chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room + if extract_mode == "general": + room_counts[chunk_room] += 1 + drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}" + try: + collection.upsert( + documents=[chunk["content"]], + ids=[drawer_id], + metadatas=[ + { + "wing": wing, + "room": chunk_room, + "source_file": source_file, + "chunk_index": chunk["chunk_index"], + "added_by": agent, + "filed_at": datetime.now().isoformat(), + "ingest_mode": "convos", + "extract_mode": extract_mode, + } + ], + ) + drawers_added += 1 + except Exception as e: + if "already exists" not in str(e).lower(): + raise total_drawers += drawers_added print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers_added}") diff --git a/mempalace/miner.py b/mempalace/miner.py index 22c8af3..801ed7e 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -15,7 +15,7 @@ from pathlib import Path from datetime import datetime from collections import defaultdict -from .palace import SKIP_DIRS, get_collection, file_already_mined +from .palace import SKIP_DIRS, get_collection, file_already_mined, mine_lock READABLE_EXTENSIONS = { ".txt", @@ -434,29 +434,37 @@ def process_file( print(f" [DRY RUN] {filepath.name} → room:{room} ({len(chunks)} drawers)") return len(chunks), room - # Purge stale drawers for this file before re-inserting the fresh chunks. - # Converts modified-file re-mines from upsert-over-existing-IDs (which hits - # hnswlib's thread-unsafe updatePoint path and can segfault on macOS ARM - # with chromadb 0.6.3) into a clean delete+insert, bypassing the update - # path entirely. - try: - collection.delete(where={"source_file": source_file}) - except Exception: - pass + # Lock this file so concurrent agents don't interleave delete+insert. + # Without the lock, two agents can both pass file_already_mined(), + # both delete, and both insert — creating duplicates or losing data. + with mine_lock(source_file): + # Re-check after acquiring lock — another agent may have just finished + if file_already_mined(collection, source_file, check_mtime=True): + return 0, room - drawers_added = 0 - for chunk in chunks: - added = add_drawer( - collection=collection, - wing=wing, - room=room, - content=chunk["content"], - source_file=source_file, - chunk_index=chunk["chunk_index"], - agent=agent, - ) - if added: - drawers_added += 1 + # Purge stale drawers for this file before re-inserting the fresh chunks. + # Converts modified-file re-mines from upsert-over-existing-IDs (which hits + # hnswlib's thread-unsafe updatePoint path and can segfault on macOS ARM + # with chromadb 0.6.3) into a clean delete+insert, bypassing the update + # path entirely. + try: + collection.delete(where={"source_file": source_file}) + except Exception: + pass + + drawers_added = 0 + for chunk in chunks: + added = add_drawer( + collection=collection, + wing=wing, + room=room, + content=chunk["content"], + source_file=source_file, + chunk_index=chunk["chunk_index"], + agent=agent, + ) + if added: + drawers_added += 1 return drawers_added, room diff --git a/mempalace/palace.py b/mempalace/palace.py index 948fecc..ed5382a 100644 --- a/mempalace/palace.py +++ b/mempalace/palace.py @@ -4,6 +4,8 @@ palace.py — Shared palace operations. Consolidates collection access patterns used by both miners and the MCP server. """ +import contextlib +import hashlib import os from .backends.chroma import ChromaBackend @@ -50,6 +52,41 @@ def get_collection( ) +@contextlib.contextmanager +def mine_lock(source_file: str): + """Cross-platform file lock for mine operations. + + Prevents multiple agents from mining the same file simultaneously, + which causes duplicate drawers when the delete+insert cycle interleaves. + """ + lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks") + os.makedirs(lock_dir, exist_ok=True) + lock_path = os.path.join( + lock_dir, hashlib.sha256(source_file.encode()).hexdigest()[:16] + ".lock" + ) + + lf = open(lock_path, "w") + try: + if os.name == "nt": + import msvcrt + msvcrt.locking(lf.fileno(), msvcrt.LK_LOCK, 1) + else: + import fcntl + fcntl.flock(lf, fcntl.LOCK_EX) + yield + finally: + try: + if os.name == "nt": + import msvcrt + msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1) + else: + import fcntl + fcntl.flock(lf, fcntl.LOCK_UN) + except Exception: + pass + lf.close() + + def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool: """Check if a file has already been filed in the palace. From 69d6e2f7f3a6703396b10e39a790b8aa5e193a0c Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Mon, 13 Apr 2026 15:46:27 -0300 Subject: [PATCH 2/4] fix: sync version.py to 3.2.0 Commit 6614b9b bumped pyproject.toml to 3.2.0 but missed mempalace/version.py, breaking test_version_consistency on every PR's CI. This syncs them. --- mempalace/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mempalace/version.py b/mempalace/version.py index 1eb21a2..45176bc 100644 --- a/mempalace/version.py +++ b/mempalace/version.py @@ -1,3 +1,3 @@ """Single source of truth for the MemPalace package version.""" -__version__ = "3.1.0" +__version__ = "3.2.0" From 09f218cbb2912df53c6ec563c89f08251235a92f Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Mon, 13 Apr 2026 15:48:54 -0300 Subject: [PATCH 3/4] refactor: extract locked filing block to keep mine_convos under C901 Adding the per-file lock + double-checked file_already_mined() in the previous commit pushed mine_convos cyclomatic complexity from 25 to 26, just over ruff's max-complexity threshold. Hoist the locked critical section into _file_chunks_locked() so the outer loop stays within budget. No behavior change. --- mempalace/convo_miner.py | 82 ++++++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 33 deletions(-) diff --git a/mempalace/convo_miner.py b/mempalace/convo_miner.py index f24fa69..6a021ec 100644 --- a/mempalace/convo_miner.py +++ b/mempalace/convo_miner.py @@ -272,6 +272,47 @@ def scan_convos(convo_dir: str) -> list: # ============================================================================= +def _file_chunks_locked(collection, source_file, chunks, wing, room, agent, extract_mode): + """Acquire the per-file lock, double-check mined status, and upsert chunks. + + Returns (drawers_added, room_counts_delta, skipped). + """ + room_counts_delta: dict = defaultdict(int) + drawers_added = 0 + with mine_lock(source_file): + # Re-check after lock — another agent may have just finished this file + if file_already_mined(collection, source_file): + return 0, room_counts_delta, True + + for chunk in chunks: + chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room + if extract_mode == "general": + room_counts_delta[chunk_room] += 1 + drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}" + try: + collection.upsert( + documents=[chunk["content"]], + ids=[drawer_id], + metadatas=[ + { + "wing": wing, + "room": chunk_room, + "source_file": source_file, + "chunk_index": chunk["chunk_index"], + "added_by": agent, + "filed_at": datetime.now().isoformat(), + "ingest_mode": "convos", + "extract_mode": extract_mode, + } + ], + ) + drawers_added += 1 + except Exception as e: + if "already exists" not in str(e).lower(): + raise + return drawers_added, room_counts_delta, False + + def mine_convos( convo_dir: str, palace_path: str, @@ -376,39 +417,14 @@ def mine_convos( room_counts[room] += 1 # File each chunk — lock to prevent concurrent agents duplicating - drawers_added = 0 - with mine_lock(source_file): - # Re-check after lock — another agent may have just finished this file - if file_already_mined(collection, source_file): - files_skipped += 1 - continue - - for chunk in chunks: - chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room - if extract_mode == "general": - room_counts[chunk_room] += 1 - drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}" - try: - collection.upsert( - documents=[chunk["content"]], - ids=[drawer_id], - metadatas=[ - { - "wing": wing, - "room": chunk_room, - "source_file": source_file, - "chunk_index": chunk["chunk_index"], - "added_by": agent, - "filed_at": datetime.now().isoformat(), - "ingest_mode": "convos", - "extract_mode": extract_mode, - } - ], - ) - drawers_added += 1 - except Exception as e: - if "already exists" not in str(e).lower(): - raise + drawers_added, room_delta, skipped = _file_chunks_locked( + collection, source_file, chunks, wing, room, agent, extract_mode + ) + if skipped: + files_skipped += 1 + continue + for r, n in room_delta.items(): + room_counts[r] += n total_drawers += drawers_added print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers_added}") From 386da51ae54ca09ba491d04cb50ebe00efc73944 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Mon, 13 Apr 2026 15:54:52 -0300 Subject: [PATCH 4/4] style: ruff format mempalace/palace.py Add blank lines after inline imports in mine_lock. Pure formatting. --- mempalace/palace.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mempalace/palace.py b/mempalace/palace.py index ed5382a..7b47f2f 100644 --- a/mempalace/palace.py +++ b/mempalace/palace.py @@ -69,18 +69,22 @@ def mine_lock(source_file: str): try: if os.name == "nt": import msvcrt + msvcrt.locking(lf.fileno(), msvcrt.LK_LOCK, 1) else: import fcntl + fcntl.flock(lf, fcntl.LOCK_EX) yield finally: try: if os.name == "nt": import msvcrt + msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1) else: import fcntl + fcntl.flock(lf, fcntl.LOCK_UN) except Exception: pass