Merge pull request #784 from MemPalace/pr/multi-agent-lock
fix: file-level locking to prevent multi-agent duplicate drawers
This commit is contained in:
+50
-28
@@ -16,7 +16,7 @@ from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
from .normalize import normalize
|
||||
from .palace import SKIP_DIRS, get_collection, file_already_mined
|
||||
from .palace import SKIP_DIRS, get_collection, file_already_mined, mine_lock
|
||||
|
||||
|
||||
# File types that might contain conversations
|
||||
@@ -272,6 +272,47 @@ def scan_convos(convo_dir: str) -> list:
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def _file_chunks_locked(collection, source_file, chunks, wing, room, agent, extract_mode):
    """File every chunk of one source file while holding that file's mine lock.

    The mined-status check is repeated after the lock is acquired, because
    another agent may have filed this file while we were waiting for it.

    Returns a tuple ``(drawers_added, room_counts_delta, skipped)``:
    ``drawers_added`` is the number of successful upserts,
    ``room_counts_delta`` maps room name -> chunk count (only populated when
    ``extract_mode == "general"``), and ``skipped`` is True when the file was
    already mined and nothing was written.
    """
    per_room = defaultdict(int)
    filed = 0
    general = extract_mode == "general"

    with mine_lock(source_file):
        if file_already_mined(collection, source_file):
            # Someone else finished this file first: report it as skipped.
            return 0, per_room, True

        for piece in chunks:
            if general:
                # In general mode each chunk may carry its own room.
                target_room = piece.get("memory_type", room)
                per_room[target_room] += 1
            else:
                target_room = room

            # Deterministic id derived from the source path and chunk position.
            index = piece["chunk_index"]
            digest = hashlib.sha256((source_file + str(index)).encode()).hexdigest()[:24]
            drawer_id = f"drawer_{wing}_{target_room}_{digest}"

            try:
                collection.upsert(
                    documents=[piece["content"]],
                    ids=[drawer_id],
                    metadatas=[
                        {
                            "wing": wing,
                            "room": target_room,
                            "source_file": source_file,
                            "chunk_index": index,
                            "added_by": agent,
                            "filed_at": datetime.now().isoformat(),
                            "ingest_mode": "convos",
                            "extract_mode": extract_mode,
                        }
                    ],
                )
            except Exception as err:
                # An "already exists" failure is tolerated; anything else propagates.
                if "already exists" not in str(err).lower():
                    raise
            else:
                filed += 1

    return filed, per_room, False
|
||||
|
||||
|
||||
def mine_convos(
|
||||
convo_dir: str,
|
||||
palace_path: str,
|
||||
@@ -375,34 +416,15 @@ def mine_convos(
|
||||
if extract_mode != "general":
|
||||
room_counts[room] += 1
|
||||
|
||||
# File each chunk
|
||||
drawers_added = 0
|
||||
for chunk in chunks:
|
||||
chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
|
||||
if extract_mode == "general":
|
||||
room_counts[chunk_room] += 1
|
||||
drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
|
||||
try:
|
||||
collection.upsert(
|
||||
documents=[chunk["content"]],
|
||||
ids=[drawer_id],
|
||||
metadatas=[
|
||||
{
|
||||
"wing": wing,
|
||||
"room": chunk_room,
|
||||
"source_file": source_file,
|
||||
"chunk_index": chunk["chunk_index"],
|
||||
"added_by": agent,
|
||||
"filed_at": datetime.now().isoformat(),
|
||||
"ingest_mode": "convos",
|
||||
"extract_mode": extract_mode,
|
||||
}
|
||||
],
|
||||
# File each chunk — lock to prevent concurrent agents duplicating
|
||||
drawers_added, room_delta, skipped = _file_chunks_locked(
|
||||
collection, source_file, chunks, wing, room, agent, extract_mode
|
||||
)
|
||||
drawers_added += 1
|
||||
except Exception as e:
|
||||
if "already exists" not in str(e).lower():
|
||||
raise
|
||||
if skipped:
|
||||
files_skipped += 1
|
||||
continue
|
||||
for r, n in room_delta.items():
|
||||
room_counts[r] += n
|
||||
|
||||
total_drawers += drawers_added
|
||||
print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers_added}")
|
||||
|
||||
+9
-1
@@ -15,7 +15,7 @@ from pathlib import Path
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
from .palace import SKIP_DIRS, get_collection, file_already_mined
|
||||
from .palace import SKIP_DIRS, get_collection, file_already_mined, mine_lock
|
||||
|
||||
READABLE_EXTENSIONS = {
|
||||
".txt",
|
||||
@@ -434,6 +434,14 @@ def process_file(
|
||||
print(f" [DRY RUN] {filepath.name} → room:{room} ({len(chunks)} drawers)")
|
||||
return len(chunks), room
|
||||
|
||||
# Lock this file so concurrent agents don't interleave delete+insert.
|
||||
# Without the lock, two agents can both pass file_already_mined(),
|
||||
# both delete, and both insert — creating duplicates or losing data.
|
||||
with mine_lock(source_file):
|
||||
# Re-check after acquiring lock — another agent may have just finished
|
||||
if file_already_mined(collection, source_file, check_mtime=True):
|
||||
return 0, room
|
||||
|
||||
# Purge stale drawers for this file before re-inserting the fresh chunks.
|
||||
# Converts modified-file re-mines from upsert-over-existing-IDs (which hits
|
||||
# hnswlib's thread-unsafe updatePoint path and can segfault on macOS ARM
|
||||
|
||||
@@ -4,6 +4,8 @@ palace.py — Shared palace operations.
|
||||
Consolidates collection access patterns used by both miners and the MCP server.
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
from .backends.chroma import ChromaBackend
|
||||
@@ -50,6 +52,45 @@ def get_collection(
|
||||
)
|
||||
|
||||
|
||||
@contextlib.contextmanager
def mine_lock(source_file: str):
    """Hold an exclusive cross-process lock for one source file.

    Prevents multiple agents from mining the same file simultaneously,
    which causes duplicate drawers when the delete+insert cycle interleaves.

    The lock is a file under ``~/.mempalace/locks`` named after the first 16
    hex characters of the SHA-256 of ``source_file``. Acquisition blocks until
    the lock is free on every platform. The context manager yields ``None``.

    Raises:
        OSError: if the lock directory or file cannot be created, or if the
            platform lock call fails for a reason other than contention.
    """
    lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks")
    os.makedirs(lock_dir, exist_ok=True)
    lock_name = hashlib.sha256(source_file.encode()).hexdigest()[:16] + ".lock"
    lock_path = os.path.join(lock_dir, lock_name)

    # ``with`` guarantees the handle is closed on every exit path; closing the
    # handle also drops the OS-level lock if the explicit unlock below fails.
    with open(lock_path, "w") as lf:
        if os.name == "nt":
            import errno
            import msvcrt

            # msvcrt.locking(LK_LOCK) only retries about ten times (one second
            # apart) before raising OSError. Keep retrying on that timeout so
            # Windows blocks like flock() does instead of crashing while
            # another agent is still mining the file.
            timed_out = getattr(errno, "EDEADLOCK", errno.EDEADLK)
            while True:
                try:
                    msvcrt.locking(lf.fileno(), msvcrt.LK_LOCK, 1)
                    break
                except OSError as exc:
                    if exc.errno != timed_out:
                        raise

            def release():
                msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1)

        else:
            import fcntl

            fcntl.flock(lf, fcntl.LOCK_EX)

            def release():
                fcntl.flock(lf, fcntl.LOCK_UN)

        # Only reached once the lock is actually held, so the unlock in
        # ``finally`` never runs against a lock we failed to acquire.
        try:
            yield
        finally:
            try:
                release()
            except OSError:
                # Best effort: closing the file right after releases the lock.
                pass
|
||||
|
||||
|
||||
def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool:
|
||||
"""Check if a file has already been filed in the palace.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user