* fix: register 0-chunk files to prevent re-processing on every mine (#654)

  mine_convos() has three early-exit paths (OSError, content too short, zero chunks) that skip writing anything to ChromaDB. Since file_already_mined() checks for the presence of a document with a matching source_file, these files are re-read and re-processed on every subsequent run.

  Add _register_file() that upserts a lightweight sentinel document (room="_registry", ingest_mode="registry") so file_already_mined() returns True on future runs.

  Note: Bug 2 from the issue (drawers_added counter always 0) was already resolved upstream via the switch from collection.add() to collection.upsert().

* fix: resolve macOS path symlink in test + remove unused variable
This commit is contained in:
committed by
GitHub
parent
9b60c6edd7
commit
87e8bafad8
@@ -32,6 +32,30 @@ CHUNK_SIZE = 800 # chars per drawer — align with miner.py
|
|||||||
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB — skip files larger than this
|
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB — skip files larger than this
|
||||||
|
|
||||||
|
|
||||||
|
def _register_file(collection, source_file: str, wing: str, agent: str):
    """Upsert a registry sentinel document for ``source_file``.

    Files that fail to read, are too short, or yield zero chunks leave no
    trace in the collection, so they would be picked up again on every mine
    run. Recording a placeholder keyed on the file path lets
    file_already_mined() report True for them on later runs.

    Args:
        collection: Target collection; only its ``upsert`` method is used.
        source_file: Path of the file being registered. It is stored in the
            metadata and hashed to derive the sentinel id.
        wing: Value stored under the ``wing`` metadata key.
        agent: Value stored under the ``added_by`` metadata key.
    """
    # The id is derived from the path alone, so registering the same file
    # again overwrites the existing sentinel instead of adding a second one.
    path_digest = hashlib.sha256(source_file.encode()).hexdigest()
    sentinel_meta = {
        "wing": wing,
        "room": "_registry",
        "source_file": source_file,
        "added_by": agent,
        "filed_at": datetime.now().isoformat(),
        "ingest_mode": "registry",
    }
    collection.upsert(
        documents=[f"[registry] {source_file}"],
        ids=[f"_reg_{path_digest[:24]}"],
        metadatas=[sentinel_meta],
    )
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# CHUNKING — exchange pairs for conversations
|
# CHUNKING — exchange pairs for conversations
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
@@ -305,9 +329,13 @@ def mine_convos(
|
|||||||
try:
|
try:
|
||||||
content = normalize(str(filepath))
|
content = normalize(str(filepath))
|
||||||
except (OSError, ValueError):
|
except (OSError, ValueError):
|
||||||
|
if not dry_run:
|
||||||
|
_register_file(collection, source_file, wing, agent)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if not content or len(content.strip()) < MIN_CHUNK_SIZE:
|
if not content or len(content.strip()) < MIN_CHUNK_SIZE:
|
||||||
|
if not dry_run:
|
||||||
|
_register_file(collection, source_file, wing, agent)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Chunk — either exchange pairs or general extraction
|
# Chunk — either exchange pairs or general extraction
|
||||||
@@ -320,6 +348,8 @@ def mine_convos(
|
|||||||
chunks = chunk_exchanges(content)
|
chunks = chunk_exchanges(content)
|
||||||
|
|
||||||
if not chunks:
|
if not chunks:
|
||||||
|
if not dry_run:
|
||||||
|
_register_file(collection, source_file, wing, agent)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Detect room from content (general mode uses memory_type instead)
|
# Detect room from content (general mode uses memory_type instead)
|
||||||
|
|||||||
@@ -1,8 +1,12 @@
|
|||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
import shutil
|
import shutil
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
import chromadb
|
import chromadb
|
||||||
|
|
||||||
from mempalace.convo_miner import mine_convos
|
from mempalace.convo_miner import mine_convos
|
||||||
|
from mempalace.palace import file_already_mined
|
||||||
|
|
||||||
|
|
||||||
def test_convo_mining():
|
def test_convo_mining():
|
||||||
@@ -24,3 +28,50 @@ def test_convo_mining():
|
|||||||
assert len(results["documents"][0]) > 0
|
assert len(results["documents"][0]) > 0
|
||||||
|
|
||||||
shutil.rmtree(tmpdir, ignore_errors=True)
|
shutil.rmtree(tmpdir, ignore_errors=True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_mine_convos_does_not_reprocess_short_files(capsys):
    """A too-short file is registered on the first run and skipped on the next."""
    tmpdir = tempfile.mkdtemp()
    try:
        # Two characters of content: too short to produce any chunks.
        tiny_path = os.path.join(tmpdir, "tiny.txt")
        with open(tiny_path, "w") as handle:
            handle.write("hi")

        palace_path = os.path.join(tmpdir, "palace")

        # Run 1: the file is visited and a sentinel is written for it.
        mine_convos(tmpdir, palace_path, wing="test")
        # Discard run-1 output so the later assertion sees run 2 only.
        capsys.readouterr()

        # The sentinel is looked up by resolved path; on macOS the temp dir
        # lives behind a symlink (/var -> /private/var).
        expected_source = str(Path(tmpdir).resolve().joinpath("tiny.txt"))
        drawers = chromadb.PersistentClient(path=palace_path).get_collection(
            "mempalace_drawers"
        )
        assert file_already_mined(drawers, expected_source)

        # Run 2: the same file must now be reported as already filed.
        mine_convos(tmpdir, palace_path, wing="test")
        second_run_output = capsys.readouterr().out
        assert "Files skipped (already filed): 1" in second_run_output
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_mine_convos_does_not_reprocess_empty_chunk_files(capsys):
    """Files that normalize but produce 0 exchange chunks get a sentinel.

    The file is mined twice against the same palace. The second run must
    report it as already filed, which shows that the first run registered
    it even though no chunks were stored.
    """
    tmpdir = tempfile.mkdtemp()
    try:
        # Content long enough to pass MIN_CHUNK_SIZE but with no exchange markers
        # (no "> " lines), so chunk_exchanges returns []
        with open(os.path.join(tmpdir, "no_exchanges.txt"), "w") as f:
            f.write("This is a plain paragraph without any exchange markers. " * 5)

        palace_path = os.path.join(tmpdir, "palace")

        # First run -- file is processed (sentinel written)
        mine_convos(tmpdir, palace_path, wing="test")
        # Drain first-run output so the assertion below can only be
        # satisfied by what the second run prints.
        capsys.readouterr()

        # Second run -- file should be skipped
        mine_convos(tmpdir, palace_path, wing="test")
        out2 = capsys.readouterr().out
        assert "Files skipped (already filed): 1" in out2
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
||||||
|
|||||||
Reference in New Issue
Block a user