fix: add file-level locking to prevent multi-agent duplicate drawers

Root cause: when multiple agents mine simultaneously, both pass
file_already_mined() check, both delete+insert the same file's
drawers, creating duplicates or losing data.

Fix: mine_lock() in palace.py — cross-platform file lock (fcntl on
Unix, msvcrt on Windows). Both miner.py and convo_miner.py now lock
per-file during the delete+insert cycle and re-check after acquiring
the lock.

Tested:
- Lock acquires and releases correctly
- Second agent blocks until first releases (0.25s wait)
- 33/33 existing tests pass
- Cross-platform: fcntl (macOS/Linux), msvcrt (Windows)

Based on v3.2.0 tag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
MSL
2026-04-13 01:16:51 -07:00
committed by Igor Lins e Silva
parent 6614b9b4e7
commit 30a431924b
3 changed files with 102 additions and 51 deletions
+34 -28
View File
@@ -16,7 +16,7 @@ from datetime import datetime
from collections import defaultdict from collections import defaultdict
from .normalize import normalize from .normalize import normalize
from .palace import SKIP_DIRS, get_collection, file_already_mined from .palace import SKIP_DIRS, get_collection, file_already_mined, mine_lock
# File types that might contain conversations # File types that might contain conversations
@@ -375,34 +375,40 @@ def mine_convos(
if extract_mode != "general": if extract_mode != "general":
room_counts[room] += 1 room_counts[room] += 1
# File each chunk # File each chunk — lock to prevent concurrent agents duplicating
drawers_added = 0 drawers_added = 0
for chunk in chunks: with mine_lock(source_file):
chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room # Re-check after lock — another agent may have just finished this file
if extract_mode == "general": if file_already_mined(collection, source_file):
room_counts[chunk_room] += 1 files_skipped += 1
drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}" continue
try:
collection.upsert( for chunk in chunks:
documents=[chunk["content"]], chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
ids=[drawer_id], if extract_mode == "general":
metadatas=[ room_counts[chunk_room] += 1
{ drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
"wing": wing, try:
"room": chunk_room, collection.upsert(
"source_file": source_file, documents=[chunk["content"]],
"chunk_index": chunk["chunk_index"], ids=[drawer_id],
"added_by": agent, metadatas=[
"filed_at": datetime.now().isoformat(), {
"ingest_mode": "convos", "wing": wing,
"extract_mode": extract_mode, "room": chunk_room,
} "source_file": source_file,
], "chunk_index": chunk["chunk_index"],
) "added_by": agent,
drawers_added += 1 "filed_at": datetime.now().isoformat(),
except Exception as e: "ingest_mode": "convos",
if "already exists" not in str(e).lower(): "extract_mode": extract_mode,
raise }
],
)
drawers_added += 1
except Exception as e:
if "already exists" not in str(e).lower():
raise
total_drawers += drawers_added total_drawers += drawers_added
print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers_added}") print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers_added}")
+31 -23
View File
@@ -15,7 +15,7 @@ from pathlib import Path
from datetime import datetime from datetime import datetime
from collections import defaultdict from collections import defaultdict
from .palace import SKIP_DIRS, get_collection, file_already_mined from .palace import SKIP_DIRS, get_collection, file_already_mined, mine_lock
READABLE_EXTENSIONS = { READABLE_EXTENSIONS = {
".txt", ".txt",
@@ -434,29 +434,37 @@ def process_file(
print(f" [DRY RUN] {filepath.name} → room:{room} ({len(chunks)} drawers)") print(f" [DRY RUN] {filepath.name} → room:{room} ({len(chunks)} drawers)")
return len(chunks), room return len(chunks), room
# Purge stale drawers for this file before re-inserting the fresh chunks. # Lock this file so concurrent agents don't interleave delete+insert.
# Converts modified-file re-mines from upsert-over-existing-IDs (which hits # Without the lock, two agents can both pass file_already_mined(),
# hnswlib's thread-unsafe updatePoint path and can segfault on macOS ARM # both delete, and both insert — creating duplicates or losing data.
# with chromadb 0.6.3) into a clean delete+insert, bypassing the update with mine_lock(source_file):
# path entirely. # Re-check after acquiring lock — another agent may have just finished
try: if file_already_mined(collection, source_file, check_mtime=True):
collection.delete(where={"source_file": source_file}) return 0, room
except Exception:
pass
drawers_added = 0 # Purge stale drawers for this file before re-inserting the fresh chunks.
for chunk in chunks: # Converts modified-file re-mines from upsert-over-existing-IDs (which hits
added = add_drawer( # hnswlib's thread-unsafe updatePoint path and can segfault on macOS ARM
collection=collection, # with chromadb 0.6.3) into a clean delete+insert, bypassing the update
wing=wing, # path entirely.
room=room, try:
content=chunk["content"], collection.delete(where={"source_file": source_file})
source_file=source_file, except Exception:
chunk_index=chunk["chunk_index"], pass
agent=agent,
) drawers_added = 0
if added: for chunk in chunks:
drawers_added += 1 added = add_drawer(
collection=collection,
wing=wing,
room=room,
content=chunk["content"],
source_file=source_file,
chunk_index=chunk["chunk_index"],
agent=agent,
)
if added:
drawers_added += 1
return drawers_added, room return drawers_added, room
+37
View File
@@ -4,6 +4,8 @@ palace.py — Shared palace operations.
Consolidates collection access patterns used by both miners and the MCP server. Consolidates collection access patterns used by both miners and the MCP server.
""" """
import contextlib
import hashlib
import os import os
from .backends.chroma import ChromaBackend from .backends.chroma import ChromaBackend
@@ -50,6 +52,41 @@ def get_collection(
) )
@contextlib.contextmanager
def mine_lock(source_file: str):
"""Cross-platform file lock for mine operations.
Prevents multiple agents from mining the same file simultaneously,
which causes duplicate drawers when the delete+insert cycle interleaves.
"""
lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks")
os.makedirs(lock_dir, exist_ok=True)
lock_path = os.path.join(
lock_dir, hashlib.sha256(source_file.encode()).hexdigest()[:16] + ".lock"
)
lf = open(lock_path, "w")
try:
if os.name == "nt":
import msvcrt
msvcrt.locking(lf.fileno(), msvcrt.LK_LOCK, 1)
else:
import fcntl
fcntl.flock(lf, fcntl.LOCK_EX)
yield
finally:
try:
if os.name == "nt":
import msvcrt
msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1)
else:
import fcntl
fcntl.flock(lf, fcntl.LOCK_UN)
except Exception:
pass
lf.close()
def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool: def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool:
"""Check if a file has already been filed in the palace. """Check if a file has already been filed in the palace.