refactor: extract locked filing block to keep mine_convos under C901
Adding the per-file lock + double-checked file_already_mined() in the previous commit pushed mine_convos cyclomatic complexity from 25 to 26, just over ruff's max-complexity threshold. Hoist the locked critical section into _file_chunks_locked() so the outer loop stays within budget. No behavior change.
This commit is contained in:
+49
-33
@@ -272,6 +272,47 @@ def scan_convos(convo_dir: str) -> list:
|
|||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def _file_chunks_locked(collection, source_file, chunks, wing, room, agent, extract_mode):
|
||||||
|
"""Acquire the per-file lock, double-check mined status, and upsert chunks.
|
||||||
|
|
||||||
|
Returns (drawers_added, room_counts_delta, skipped).
|
||||||
|
"""
|
||||||
|
room_counts_delta: dict = defaultdict(int)
|
||||||
|
drawers_added = 0
|
||||||
|
with mine_lock(source_file):
|
||||||
|
# Re-check after lock — another agent may have just finished this file
|
||||||
|
if file_already_mined(collection, source_file):
|
||||||
|
return 0, room_counts_delta, True
|
||||||
|
|
||||||
|
for chunk in chunks:
|
||||||
|
chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
|
||||||
|
if extract_mode == "general":
|
||||||
|
room_counts_delta[chunk_room] += 1
|
||||||
|
drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
|
||||||
|
try:
|
||||||
|
collection.upsert(
|
||||||
|
documents=[chunk["content"]],
|
||||||
|
ids=[drawer_id],
|
||||||
|
metadatas=[
|
||||||
|
{
|
||||||
|
"wing": wing,
|
||||||
|
"room": chunk_room,
|
||||||
|
"source_file": source_file,
|
||||||
|
"chunk_index": chunk["chunk_index"],
|
||||||
|
"added_by": agent,
|
||||||
|
"filed_at": datetime.now().isoformat(),
|
||||||
|
"ingest_mode": "convos",
|
||||||
|
"extract_mode": extract_mode,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
)
|
||||||
|
drawers_added += 1
|
||||||
|
except Exception as e:
|
||||||
|
if "already exists" not in str(e).lower():
|
||||||
|
raise
|
||||||
|
return drawers_added, room_counts_delta, False
|
||||||
|
|
||||||
|
|
||||||
def mine_convos(
|
def mine_convos(
|
||||||
convo_dir: str,
|
convo_dir: str,
|
||||||
palace_path: str,
|
palace_path: str,
|
||||||
@@ -376,39 +417,14 @@ def mine_convos(
|
|||||||
room_counts[room] += 1
|
room_counts[room] += 1
|
||||||
|
|
||||||
# File each chunk — lock to prevent concurrent agents duplicating
|
# File each chunk — lock to prevent concurrent agents duplicating
|
||||||
drawers_added = 0
|
drawers_added, room_delta, skipped = _file_chunks_locked(
|
||||||
with mine_lock(source_file):
|
collection, source_file, chunks, wing, room, agent, extract_mode
|
||||||
# Re-check after lock — another agent may have just finished this file
|
)
|
||||||
if file_already_mined(collection, source_file):
|
if skipped:
|
||||||
files_skipped += 1
|
files_skipped += 1
|
||||||
continue
|
continue
|
||||||
|
for r, n in room_delta.items():
|
||||||
for chunk in chunks:
|
room_counts[r] += n
|
||||||
chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
|
|
||||||
if extract_mode == "general":
|
|
||||||
room_counts[chunk_room] += 1
|
|
||||||
drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
|
|
||||||
try:
|
|
||||||
collection.upsert(
|
|
||||||
documents=[chunk["content"]],
|
|
||||||
ids=[drawer_id],
|
|
||||||
metadatas=[
|
|
||||||
{
|
|
||||||
"wing": wing,
|
|
||||||
"room": chunk_room,
|
|
||||||
"source_file": source_file,
|
|
||||||
"chunk_index": chunk["chunk_index"],
|
|
||||||
"added_by": agent,
|
|
||||||
"filed_at": datetime.now().isoformat(),
|
|
||||||
"ingest_mode": "convos",
|
|
||||||
"extract_mode": extract_mode,
|
|
||||||
}
|
|
||||||
],
|
|
||||||
)
|
|
||||||
drawers_added += 1
|
|
||||||
except Exception as e:
|
|
||||||
if "already exists" not in str(e).lower():
|
|
||||||
raise
|
|
||||||
|
|
||||||
total_drawers += drawers_added
|
total_drawers += drawers_added
|
||||||
print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers_added}")
|
print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers_added}")
|
||||||
|
|||||||
Reference in New Issue
Block a user