fix(sync): symmetric source_file resolve + perf optimizations
CI fix: `_classify_drawer` now resolves `source_file` symmetric to
`project_roots` (which `_normalize_project_dirs` and
`_auto_detect_project_roots` already `.resolve()`). Without this, on
platforms where the temp directory is a symlink (macOS `/var/folders` ->
`/private/var/folders`, Windows 8.3 short-name normalization), every
drawer mis-bucketed as `out_of_scope` and survived prune.
Perf:
- `_resolve_project_root`: early-return on first match (sorted-desc
precondition).
- `_normalize_project_dirs`: sort `(-len(str(p)), str(p))` desc for
early-return + deterministic tie-break on equal-length paths.
- `_auto_detect_project_roots`: `seen_sources` dedupe so a 200-chunk
file costs one disk walk, not 200.
- `sync_palace` main loop: per-file classification cache; registry
sentinels (`_reg_*`, `room=_registry`, `ingest_mode=registry`) routed
to "kept" before cache lookup so a sentinel sharing a `source_file`
with a pruned drawer cannot inherit a stale "gitignored" verdict.
- Closet purge: collapse O(N) per-file purge into one
`where={"source_file": {"$in": [...]}}` get + one bulk delete.
Tests (5 new in `TestSyncPalace`, 38 total):
- `test_symlinked_project_root_resolves`: pins symmetric resolve via
real `os.symlink` (skipped on Windows).
- `test_classification_cache_avoids_redundant_disk_hits`: monkeypatch
counter on `_classify_drawer` asserts `call_count == 1` for 5 chunks
sharing one source_file.
- `test_closet_batch_purge_single_call`: wraps closets collection with
`CallCountingCol` (forwards `.get`/`.delete`); asserts
`delete_calls == 1` and `get_calls == 1`; expected `removed_closets`
derived from `report["by_source"]` to stay robust to fixture changes.
- `test_registry_check_runs_before_cache_lookup`: a regular drawer
caches "gitignored" first; a sentinel with the same source_file must
still be kept.
- `test_normalize_project_dirs_sort_stable_on_equal_length`: pins the
alphabetical secondary key when paths share length.
This commit is contained in:
+47
-24
@@ -22,7 +22,6 @@ from .palace import (
|
||||
get_closets_collection,
|
||||
get_collection,
|
||||
mine_palace_lock,
|
||||
purge_file_closets,
|
||||
)
|
||||
|
||||
|
||||
@@ -44,16 +43,18 @@ class SyncReport(TypedDict):
|
||||
|
||||
|
||||
def _resolve_project_root(source_file: Path, project_roots: list) -> Optional[Path]:
|
||||
"""Return the longest project_root that source_file lives under."""
|
||||
best: Optional[Path] = None
|
||||
"""Return the longest project_root that source_file lives under.
|
||||
|
||||
Assumes ``project_roots`` is sorted by path-length descending so the
|
||||
first match is the longest (deepest) prefix.
|
||||
"""
|
||||
for root in project_roots:
|
||||
try:
|
||||
source_file.relative_to(root)
|
||||
return root
|
||||
except ValueError:
|
||||
continue
|
||||
if best is None or len(str(root)) > len(str(best)):
|
||||
best = root
|
||||
return best
|
||||
return None
|
||||
|
||||
|
||||
def _ancestor_matchers(source_file: Path, root: Path, matcher_cache: dict) -> list:
|
||||
@@ -102,6 +103,7 @@ def _classify_drawer(
|
||||
|
||||
Returns one of: kept, gitignored, missing, no_source, out_of_scope.
|
||||
"""
|
||||
# Defensive: main loop filters registry rows; this guards direct callers.
|
||||
if _is_registry_row(meta, drawer_id):
|
||||
return "kept"
|
||||
|
||||
@@ -112,6 +114,7 @@ def _classify_drawer(
|
||||
src = Path(source_file)
|
||||
if not src.is_absolute():
|
||||
return "no_source"
|
||||
src = src.resolve(strict=False)
|
||||
|
||||
root = _resolve_project_root(src, project_roots)
|
||||
if root is None:
|
||||
@@ -154,12 +157,17 @@ def _auto_detect_project_roots(col, wing: Optional[str]) -> list:
|
||||
a `.git` directory or a `.gitignore` file. The deepest such ancestor
|
||||
wins, so nested-but-still-tracked subprojects are honoured.
|
||||
`Path.parents` iterates deepest-first, so the first hit IS deepest.
|
||||
|
||||
Dedupes on ``source_file`` string so a 200-chunk file costs one disk
|
||||
walk, not 200.
|
||||
"""
|
||||
roots = set()
|
||||
roots: set = set()
|
||||
seen_sources: set = set()
|
||||
for _, meta in _iter_drawer_metadata(col, wing):
|
||||
source_file = (meta or {}).get("source_file")
|
||||
if not source_file:
|
||||
if not source_file or source_file in seen_sources:
|
||||
continue
|
||||
seen_sources.add(source_file)
|
||||
src = Path(source_file)
|
||||
if not src.is_absolute():
|
||||
continue
|
||||
@@ -167,13 +175,13 @@ def _auto_detect_project_roots(col, wing: Optional[str]) -> list:
|
||||
if (parent / ".git").exists() or (parent / ".gitignore").is_file():
|
||||
roots.add(parent.resolve(strict=False))
|
||||
break
|
||||
# Sort by depth (deepest first) with secondary lexicographic key for
|
||||
# deterministic order when two roots share string length.
|
||||
return sorted(roots, key=lambda p: (-len(str(p)), str(p)))
|
||||
|
||||
|
||||
def _normalize_project_dirs(project_dirs) -> list:
|
||||
return [Path(p).resolve(strict=False) for p in project_dirs]
|
||||
"""Resolve and sort project dirs so deepest-prefix wins on first match."""
|
||||
resolved = [Path(p).resolve(strict=False) for p in project_dirs]
|
||||
return sorted(resolved, key=lambda p: (-len(str(p)), str(p)))
|
||||
|
||||
|
||||
def _delete_in_batches(col, ids: list, batch_size: int, wal_log: Optional[Callable]):
|
||||
@@ -246,17 +254,30 @@ def sync_palace(
|
||||
roots = _auto_detect_project_roots(col, wing)
|
||||
|
||||
matcher_cache: dict = {}
|
||||
# Same source_file → same verdict holds because mine_palace_lock
|
||||
# blocks concurrent writers and the loop is synchronous.
|
||||
classification_cache: dict = {}
|
||||
|
||||
for drawer_id, meta in _iter_drawer_metadata(col, wing):
|
||||
counts["scanned"] += 1
|
||||
bucket = _classify_drawer(meta or {}, matcher_cache, roots, drawer_id)
|
||||
meta = meta or {}
|
||||
source_file = meta.get("source_file")
|
||||
|
||||
if _is_registry_row(meta, drawer_id):
|
||||
bucket = "kept"
|
||||
elif source_file and source_file in classification_cache:
|
||||
bucket = classification_cache[source_file]
|
||||
else:
|
||||
bucket = _classify_drawer(meta, matcher_cache, roots, drawer_id)
|
||||
if source_file:
|
||||
classification_cache[source_file] = bucket
|
||||
|
||||
counts[bucket] += 1
|
||||
if bucket in ("gitignored", "missing"):
|
||||
removable_ids.append(drawer_id)
|
||||
src = (meta or {}).get("source_file")
|
||||
if src:
|
||||
removable_sources.add(src)
|
||||
by_source[src] += 1
|
||||
if source_file:
|
||||
removable_sources.add(source_file)
|
||||
by_source[source_file] += 1
|
||||
|
||||
report: SyncReport = {
|
||||
**counts,
|
||||
@@ -278,15 +299,17 @@ def sync_palace(
|
||||
logger.warning("Closet purge skipped (collection unavailable): %s", exc)
|
||||
|
||||
closets_removed = 0
|
||||
if closets_col is not None:
|
||||
for source_file in removable_sources:
|
||||
before = (
|
||||
closets_col.get(where={"source_file": source_file}, include=[]).get("ids") or []
|
||||
if closets_col is not None and removable_sources:
|
||||
closet_ids = (
|
||||
closets_col.get(
|
||||
where={"source_file": {"$in": list(removable_sources)}},
|
||||
include=[],
|
||||
).get("ids")
|
||||
or []
|
||||
)
|
||||
if not before:
|
||||
continue
|
||||
purge_file_closets(closets_col, source_file)
|
||||
closets_removed += len(before)
|
||||
if closet_ids:
|
||||
closets_col.delete(ids=closet_ids)
|
||||
closets_removed = len(closet_ids)
|
||||
report["removed_closets"] = closets_removed
|
||||
return report
|
||||
|
||||
|
||||
+280
-12
@@ -441,9 +441,9 @@ class TestSyncPalace:
|
||||
# Allow-list — params must be exactly the documented audit shape so
|
||||
# any future leak (source_file, content, ID lists, etc.) trips a
|
||||
# test failure rather than slipping through a deny-list.
|
||||
assert set(params.keys()) <= {
|
||||
"first_id"
|
||||
}, f"WAL params drifted from the audit allow-list: {params.keys()}"
|
||||
assert set(params.keys()) <= {"first_id"}, (
|
||||
f"WAL params drifted from the audit allow-list: {params.keys()}"
|
||||
)
|
||||
|
||||
def test_registry_sentinels_preserved_on_apply(self, tmp_dir, palace_path):
|
||||
"""F2 regression: convo miner `_reg_*` sentinels must survive sync apply.
|
||||
@@ -564,9 +564,9 @@ class TestSyncPalace:
|
||||
inner_resolved = inner.resolve(strict=False)
|
||||
outer_resolved = outer.resolve(strict=False)
|
||||
assert inner_resolved in roots, f"expected inner in roots, got {roots}"
|
||||
assert (
|
||||
outer_resolved not in roots
|
||||
), f"deepest should win exclusively: roots={roots}, outer leaked"
|
||||
assert outer_resolved not in roots, (
|
||||
f"deepest should win exclusively: roots={roots}, outer leaked"
|
||||
)
|
||||
|
||||
def test_apply_with_empty_project_dirs_raises(self, palace_path):
|
||||
"""Round-2 P1: `project_dirs=[]` (empty list) with apply must raise,
|
||||
@@ -605,9 +605,9 @@ class TestSyncPalace:
|
||||
project_dirs=[synced_world["repo_path"]],
|
||||
dry_run=False,
|
||||
)
|
||||
assert any(
|
||||
"Closet purge skipped" in record.getMessage() for record in caplog.records
|
||||
), f"expected closet-skip warning, got: {[r.getMessage() for r in caplog.records]}"
|
||||
assert any("Closet purge skipped" in record.getMessage() for record in caplog.records), (
|
||||
f"expected closet-skip warning, got: {[r.getMessage() for r in caplog.records]}"
|
||||
)
|
||||
|
||||
def test_metadata_cache_cleared_on_exception(self, monkeypatch, config, synced_world, kg):
|
||||
"""F9 regression: tool_sync's try/finally must clear `_metadata_cache`
|
||||
@@ -648,9 +648,9 @@ class TestSyncPalace:
|
||||
assert result.get("success") is False
|
||||
assert "simulated" in result.get("error", "")
|
||||
|
||||
assert (
|
||||
mcp_server._metadata_cache is None
|
||||
), "F9: cache must be cleared even when sync_palace raises"
|
||||
assert mcp_server._metadata_cache is None, (
|
||||
"F9: cache must be cleared even when sync_palace raises"
|
||||
)
|
||||
|
||||
def test_sync_report_keys_stable(self, synced_world):
|
||||
"""Regression: SyncReport schema must not silently drop a field."""
|
||||
@@ -897,6 +897,274 @@ class TestSyncPalace:
|
||||
dry_run=True,
|
||||
)
|
||||
|
||||
@pytest.mark.skipif(os.name == "nt", reason="os.symlink needs admin on Windows")
|
||||
def test_symlinked_project_root_resolves(self, tmp_dir, palace_path):
|
||||
"""source_file may be written through a symlinked tmp directory
|
||||
(real macOS behaviour: /var/folders/... is a symlink to
|
||||
/private/var/folders/...). project_dirs goes through .resolve()
|
||||
which follows the symlink. Without matching .resolve() on the
|
||||
source side, _resolve_project_root would mis-bucket every drawer
|
||||
as out_of_scope. This test pins symmetric resolution.
|
||||
"""
|
||||
from mempalace.sync import sync_palace
|
||||
|
||||
real_root = Path(tmp_dir) / "real"
|
||||
(real_root / "build").mkdir(parents=True)
|
||||
(real_root / ".gitignore").write_text("build/\n")
|
||||
(real_root / "build" / "x.py").write_text("# ignored\n")
|
||||
|
||||
link_root = Path(tmp_dir) / "link"
|
||||
os.symlink(str(real_root), str(link_root))
|
||||
|
||||
client = chromadb.PersistentClient(path=palace_path)
|
||||
col = client.get_or_create_collection(
|
||||
"mempalace_drawers", metadata={"hnsw:space": "cosine"}
|
||||
)
|
||||
col.add(
|
||||
ids=["d_via_link"],
|
||||
documents=["x"],
|
||||
embeddings=[[1.0, 0.0, 0.0]],
|
||||
metadatas=[
|
||||
{
|
||||
"wing": "demo",
|
||||
"room": "build",
|
||||
"source_file": str(link_root / "build" / "x.py"),
|
||||
"chunk_index": 0,
|
||||
"added_by": "miner",
|
||||
"filed_at": "2026-05-09T00:00:00",
|
||||
}
|
||||
],
|
||||
)
|
||||
del client
|
||||
|
||||
report = sync_palace(
|
||||
palace_path=palace_path,
|
||||
project_dirs=[str(real_root)],
|
||||
wing="demo",
|
||||
dry_run=True,
|
||||
)
|
||||
assert report["gitignored"] == 1, (
|
||||
f"symmetric resolve broken: drawer mis-bucketed; report={report}"
|
||||
)
|
||||
assert report["out_of_scope"] == 0
|
||||
|
||||
def test_classification_cache_avoids_redundant_disk_hits(
|
||||
self, tmp_dir, palace_path, monkeypatch
|
||||
):
|
||||
"""Per-file classification cache: N chunks of the same source_file
|
||||
cost one _classify_drawer invocation, not N. Verifies the perf
|
||||
optimisation actually short-circuits without changing behaviour.
|
||||
"""
|
||||
from mempalace import sync as sync_mod
|
||||
from mempalace.sync import sync_palace
|
||||
|
||||
repo_path = Path(tmp_dir) / "repo"
|
||||
(repo_path / "build").mkdir(parents=True)
|
||||
(repo_path / ".gitignore").write_text("build/\n")
|
||||
(repo_path / "build" / "shared.py").write_text("# ignored\n")
|
||||
|
||||
client = chromadb.PersistentClient(path=palace_path)
|
||||
col = client.get_or_create_collection(
|
||||
"mempalace_drawers", metadata={"hnsw:space": "cosine"}
|
||||
)
|
||||
col.add(
|
||||
ids=[f"d_chunk_{i}" for i in range(5)],
|
||||
documents=[f"chunk{i}" for i in range(5)],
|
||||
embeddings=[[float(i + 1), 0.0, 0.0] for i in range(5)],
|
||||
metadatas=[
|
||||
{
|
||||
"wing": "demo",
|
||||
"room": "build",
|
||||
"source_file": str(repo_path / "build" / "shared.py"),
|
||||
"chunk_index": i,
|
||||
"added_by": "miner",
|
||||
"filed_at": "2026-05-09T00:00:00",
|
||||
}
|
||||
for i in range(5)
|
||||
],
|
||||
)
|
||||
del client
|
||||
|
||||
call_count = {"n": 0}
|
||||
real_classify = sync_mod._classify_drawer
|
||||
|
||||
def counting_classify(*args, **kwargs):
|
||||
call_count["n"] += 1
|
||||
return real_classify(*args, **kwargs)
|
||||
|
||||
monkeypatch.setattr(sync_mod, "_classify_drawer", counting_classify)
|
||||
|
||||
report = sync_palace(
|
||||
palace_path=palace_path,
|
||||
project_dirs=[str(repo_path)],
|
||||
wing="demo",
|
||||
dry_run=True,
|
||||
)
|
||||
assert report["scanned"] == 5
|
||||
assert report["gitignored"] == 5
|
||||
assert call_count["n"] == 1, (
|
||||
f"cache miss: expected 1 _classify_drawer call (4 cache hits), got {call_count['n']}"
|
||||
)
|
||||
|
||||
def test_closet_batch_purge_single_call(self, synced_world, monkeypatch):
|
||||
"""Batched $in closet purge: one delete() call across all removable
|
||||
source files, not N. Wraps the real collection so chromadb still
|
||||
does the work; only the call count is intercepted.
|
||||
"""
|
||||
from mempalace import sync as sync_mod
|
||||
|
||||
repo_path = Path(synced_world["repo_path"])
|
||||
palace_path = synced_world["palace_path"]
|
||||
|
||||
client = chromadb.PersistentClient(path=palace_path)
|
||||
closets_col = client.get_or_create_collection(
|
||||
"mempalace_closets", metadata={"hnsw:space": "cosine"}
|
||||
)
|
||||
closets_col.add(
|
||||
ids=["c1", "c2", "c3"],
|
||||
documents=["c1", "c2", "c3"],
|
||||
embeddings=[[1.0, 0.0, 0.0], [2.0, 0.0, 0.0], [3.0, 0.0, 0.0]],
|
||||
metadatas=[
|
||||
{"source_file": str(repo_path / "build" / "ignored.py")},
|
||||
{"source_file": str(repo_path / "app.log")},
|
||||
{"source_file": str(repo_path / "deleted.py")},
|
||||
],
|
||||
)
|
||||
del client
|
||||
|
||||
class CallCountingCol:
|
||||
def __init__(self, real):
|
||||
self._real = real
|
||||
self.delete_calls = 0
|
||||
self.get_calls = 0
|
||||
|
||||
def get(self, *args, **kwargs):
|
||||
self.get_calls += 1
|
||||
return self._real.get(*args, **kwargs)
|
||||
|
||||
def delete(self, *args, **kwargs):
|
||||
self.delete_calls += 1
|
||||
return self._real.delete(*args, **kwargs)
|
||||
|
||||
captured: dict = {}
|
||||
real_get_closets = sync_mod.get_closets_collection
|
||||
|
||||
def wrapped_get_closets(p, create=False):
|
||||
real = real_get_closets(p, create=create)
|
||||
wrapper = CallCountingCol(real)
|
||||
captured["wrapper"] = wrapper
|
||||
return wrapper
|
||||
|
||||
monkeypatch.setattr(sync_mod, "get_closets_collection", wrapped_get_closets)
|
||||
|
||||
from mempalace.sync import sync_palace
|
||||
|
||||
report = sync_palace(
|
||||
palace_path=palace_path,
|
||||
project_dirs=[synced_world["repo_path"]],
|
||||
dry_run=False,
|
||||
)
|
||||
|
||||
seeded_sources = {
|
||||
str(repo_path / "build" / "ignored.py"),
|
||||
str(repo_path / "app.log"),
|
||||
str(repo_path / "deleted.py"),
|
||||
}
|
||||
expected = len(seeded_sources & set(report["by_source"].keys()))
|
||||
assert report["removed_closets"] == expected, (
|
||||
f"removed_closets ({report['removed_closets']}) != |seeded ∩ removable| ({expected})"
|
||||
)
|
||||
assert "wrapper" in captured, "get_closets_collection patch not invoked"
|
||||
assert captured["wrapper"].delete_calls == 1, (
|
||||
f"expected one batch delete call, got {captured['wrapper'].delete_calls}"
|
||||
)
|
||||
assert captured["wrapper"].get_calls == 1, (
|
||||
f"expected one batch get call, got {captured['wrapper'].get_calls}"
|
||||
)
|
||||
|
||||
def test_registry_check_runs_before_cache_lookup(self, tmp_dir, palace_path):
|
||||
"""A non-registry drawer with the same source_file must NOT poison
|
||||
the bucket of a subsequent _reg_* drawer via the classification
|
||||
cache. Order matters for chromadb iteration: seed the regular
|
||||
drawer FIRST so it caches `gitignored`, then a registry sentinel
|
||||
with the same source_file. Without the registry-bypass at the
|
||||
top of the main loop, the cache lookup would route the sentinel
|
||||
to gitignored and delete it.
|
||||
"""
|
||||
from mempalace.sync import sync_palace
|
||||
|
||||
repo_path = Path(tmp_dir) / "repo"
|
||||
(repo_path / "build").mkdir(parents=True)
|
||||
(repo_path / ".gitignore").write_text("build/\n")
|
||||
(repo_path / "build" / "shared.py").write_text("# ignored\n")
|
||||
|
||||
client = chromadb.PersistentClient(path=palace_path)
|
||||
col = client.get_or_create_collection(
|
||||
"mempalace_drawers", metadata={"hnsw:space": "cosine"}
|
||||
)
|
||||
shared_source = str(repo_path / "build" / "shared.py")
|
||||
col.add(
|
||||
ids=["a_regular", "_reg_zzz_sentinel"],
|
||||
documents=["regular chunk", "registry sentinel"],
|
||||
embeddings=[[1.0, 0.0, 0.0], [2.0, 0.0, 0.0]],
|
||||
metadatas=[
|
||||
{
|
||||
"wing": "demo",
|
||||
"room": "build",
|
||||
"source_file": shared_source,
|
||||
"chunk_index": 0,
|
||||
"added_by": "miner",
|
||||
"filed_at": "2026-05-09T00:00:00",
|
||||
},
|
||||
{
|
||||
"wing": "demo",
|
||||
"room": "_registry",
|
||||
"source_file": shared_source,
|
||||
"chunk_index": 0,
|
||||
"ingest_mode": "registry",
|
||||
"added_by": "convo_miner",
|
||||
"filed_at": "2026-05-09T00:00:00",
|
||||
},
|
||||
],
|
||||
)
|
||||
del client
|
||||
|
||||
report = sync_palace(
|
||||
palace_path=palace_path,
|
||||
project_dirs=[str(repo_path)],
|
||||
wing="demo",
|
||||
dry_run=False,
|
||||
)
|
||||
assert report["gitignored"] == 1
|
||||
assert report["kept"] == 1
|
||||
assert report["removed_drawers"] == 1
|
||||
|
||||
client, col = _open_drawers(palace_path)
|
||||
try:
|
||||
survivors = _drawer_ids(col)
|
||||
finally:
|
||||
del client
|
||||
assert "a_regular" not in survivors
|
||||
assert "_reg_zzz_sentinel" in survivors, (
|
||||
"registry sentinel was incorrectly pruned via cached non-registry verdict"
|
||||
)
|
||||
|
||||
def test_normalize_project_dirs_sort_stable_on_equal_length(self):
|
||||
"""`_normalize_project_dirs` must sort by `(-len, str)` so equal-length
|
||||
roots are alphabetically deterministic; otherwise overlapping nested
|
||||
scope choice depends on argv order.
|
||||
"""
|
||||
from mempalace.sync import _normalize_project_dirs
|
||||
|
||||
result = _normalize_project_dirs(["/tmp/zzz", "/tmp/aaa"])
|
||||
names = [p.name for p in result]
|
||||
assert names == ["aaa", "zzz"], f"equal-length sort not deterministic: got {names}"
|
||||
|
||||
# Different lengths: deepest first.
|
||||
deep = _normalize_project_dirs(["/tmp/short", "/tmp/much/deeper/path"])
|
||||
assert str(deep[0]).endswith("path")
|
||||
assert str(deep[1]).endswith("short")
|
||||
|
||||
|
||||
class TestSyncMcpTool:
|
||||
"""T2: `mempalace_sync` MCP entry point must keep apply polarity stable."""
|
||||
|
||||
Reference in New Issue
Block a user