merge: develop (#784 file-locking, #820 version sync)

Non-trivial merge in convo_miner.py: this branch's _file_convo_chunks
(purge stale + upsert with normalize_version) and develop's
_file_chunks_locked (mine_lock + double-checked file_already_mined)
both touched the same critical section. Combined into a single
_file_chunks_locked helper that does lock → double-check → purge →
upsert, preserving both the multi-agent safety guarantee from #784
and the schema-rebuild contract from this PR.

Also folds develop's mine_lock import into both miner.py and
convo_miner.py alongside NORMALIZE_VERSION.

707/707 tests pass, ruff + format clean under CI-pinned 0.4.x.
This commit is contained in:
Igor Lins e Silva
2026-04-13 16:29:50 -03:00
4 changed files with 141 additions and 66 deletions
+62 -42
View File
@@ -16,7 +16,13 @@ from datetime import datetime
from collections import defaultdict
from .normalize import normalize
from .palace import NORMALIZE_VERSION, SKIP_DIRS, file_already_mined, get_collection
from .palace import (
NORMALIZE_VERSION,
SKIP_DIRS,
file_already_mined,
get_collection,
mine_lock,
)
# File types that might contain conversations
@@ -273,50 +279,60 @@ def scan_convos(convo_dir: str) -> list:
# =============================================================================
def _file_convo_chunks(collection, source_file, chunks, wing, room, agent, extract_mode):
"""Purge stale drawers for ``source_file`` then upsert fresh chunks.
def _file_chunks_locked(collection, source_file, chunks, wing, room, agent, extract_mode):
"""Lock the source file, purge stale drawers, and upsert fresh chunks.
Returns (drawers_added, room_counts_delta).
Combines the per-file serialization that prevents concurrent agents from
duplicating work (via mine_lock) with the normalize-version rebuild
contract (purge-before-insert so pre-v2 drawers don't survive).
Returns (drawers_added, room_counts_delta, skipped).
"""
# Purge stale drawers first. When the normalize schema bumps,
# file_already_mined() returns False for pre-v2 drawers and we land
# here — clean them out so the source doesn't end up with a mix of
# old-noise and new-clean drawers.
try:
collection.delete(where={"source_file": source_file})
except Exception:
pass
room_counts_delta: dict = defaultdict(int)
drawers_added = 0
for chunk in chunks:
chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
if extract_mode == "general":
room_counts_delta[chunk_room] += 1
drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
with mine_lock(source_file):
# Re-check after lock — another agent may have just finished this file
# at the current schema. A stale-version hit here returns False, so we
# still fall through to the purge+rebuild path below.
if file_already_mined(collection, source_file):
return 0, room_counts_delta, True
# Purge stale drawers first. When the normalize schema bumps,
# file_already_mined() returned False for pre-v2 drawers — clean
# them out so the source doesn't end up with mixed old/new drawers.
try:
collection.upsert(
documents=[chunk["content"]],
ids=[drawer_id],
metadatas=[
{
"wing": wing,
"room": chunk_room,
"source_file": source_file,
"chunk_index": chunk["chunk_index"],
"added_by": agent,
"filed_at": datetime.now().isoformat(),
"ingest_mode": "convos",
"extract_mode": extract_mode,
"normalize_version": NORMALIZE_VERSION,
}
],
)
drawers_added += 1
except Exception as e:
if "already exists" not in str(e).lower():
raise
return drawers_added, room_counts_delta
collection.delete(where={"source_file": source_file})
except Exception:
pass
for chunk in chunks:
chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
if extract_mode == "general":
room_counts_delta[chunk_room] += 1
drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
try:
collection.upsert(
documents=[chunk["content"]],
ids=[drawer_id],
metadatas=[
{
"wing": wing,
"room": chunk_room,
"source_file": source_file,
"chunk_index": chunk["chunk_index"],
"added_by": agent,
"filed_at": datetime.now().isoformat(),
"ingest_mode": "convos",
"extract_mode": extract_mode,
"normalize_version": NORMALIZE_VERSION,
}
],
)
drawers_added += 1
except Exception as e:
if "already exists" not in str(e).lower():
raise
return drawers_added, room_counts_delta, False
def mine_convos(
@@ -422,10 +438,14 @@ def mine_convos(
if extract_mode != "general":
room_counts[room] += 1
# Purge stale drawers + file fresh chunks.
drawers_added, room_delta = _file_convo_chunks(
# Lock + purge stale + file fresh chunks. Lock serializes concurrent
# agents; purge removes pre-v2 drawers so the schema bump applies.
drawers_added, room_delta, skipped = _file_chunks_locked(
collection, source_file, chunks, wing, room, agent, extract_mode
)
if skipped:
files_skipped += 1
continue
for r, n in room_delta.items():
room_counts[r] += n