feat(sync): add gitignore-aware drawer prune (#1252)

Add `mempalace sync` CLI command and `mempalace_sync` MCP tool that
prune drawers whose source files are gitignored, deleted, or moved
out of the project. Reuses the existing GitignoreMatcher
infrastructure in mempalace/miner.py so the same gitignore rules
that block ingest also drive the corresponding cleanup.

Closes #1252.
This commit is contained in:
mvalentsev
2026-05-09 03:16:03 +05:00
parent 02dd6dc19e
commit 1d3eecbf9d
5 changed files with 1604 additions and 1 deletions
+111
View File
@@ -579,6 +579,84 @@ def cmd_sweep(args):
sys.exit(1) sys.exit(1)
def cmd_sync(args):
    """Run the ``sync`` subcommand: report, and optionally delete, stale drawers (#1252).

    Reads ``palace``, ``dir``, ``root``, ``wing`` and ``dry_run`` from *args*,
    prints a banner, hands the work to ``sync_palace`` and prints the bucket
    counts it returns. Returns without doing anything else when the palace
    directory does not exist.

    Exit codes: 1 when ``sync_palace`` raises ``MineAlreadyRunning`` or any
    unexpected exception, 2 when it raises ``ValueError``.
    """
    from .mcp_server import _wal_log
    from .palace import MineAlreadyRunning
    from .sync import sync_palace

    if args.palace:
        palace_path = os.path.expanduser(args.palace)
    else:
        palace_path = MempalaceConfig().palace_path
    if not os.path.isdir(palace_path):
        print(f"\n No palace found at {palace_path}")
        return

    # Positional directory first, then every --root, each with "~" expanded.
    # An empty list collapses to None before being handed to sync_palace.
    expanded_roots = [os.path.expanduser(args.dir)] if args.dir else []
    expanded_roots += [os.path.expanduser(extra) for extra in args.root]
    project_dirs = expanded_roots if expanded_roots else None

    heavy_rule = "=" * 55
    light_rule = "-" * 55

    print(f"\n{heavy_rule}")
    print(" MemPalace Sync — Gitignore-aware drawer prune")
    print(f"{heavy_rule}")
    print(f" Palace: {palace_path}")
    if args.wing:
        print(f" Wing: {args.wing}")
    for root_path in expanded_roots:
        print(f" Project: {root_path}")
    mode_line = (
        " Mode: DRY RUN (no deletions)" if args.dry_run else " Mode: APPLY (deleting drawers)"
    )
    print(mode_line)
    print(f"{light_rule}\n")

    try:
        report = sync_palace(
            palace_path=palace_path,
            project_dirs=project_dirs,
            wing=args.wing,
            dry_run=args.dry_run,
            wal_log=_wal_log,
        )
    except MineAlreadyRunning as exc:
        print(f"mempalace: {exc}", file=sys.stderr)
        sys.exit(1)
    except ValueError as exc:
        print(f"mempalace: {exc}", file=sys.stderr)
        sys.exit(2)
    except Exception as exc:
        # Top-level CLI boundary: report the failure instead of a traceback.
        print(f"mempalace: sync failed: {exc}", file=sys.stderr)
        sys.exit(1)

    outcome = "(would remove)" if args.dry_run else "(removed)"
    # (label, report key, trailing annotation) for each summary line.
    summary_rows = (
        (" Scanned: ", "scanned", ""),
        (" Kept: ", "kept", ""),
        (" Gitignored: ", "gitignored", f" {outcome}"),
        (" Missing: ", "missing", f" {outcome}"),
        (" No source: ", "no_source", " (kept)"),
        (" Out of scope: ", "out_of_scope", " (kept)"),
    )
    for label, key, tail in summary_rows:
        print(f"{label}{report[key]}{tail}")

    per_source = report.get("by_source") or {}
    if per_source:
        # Highest drawer count first; the stable sort keeps ties in report order.
        ranked = sorted(per_source.items(), key=lambda item: item[1], reverse=True)
        heading = "Top sources to remove" if args.dry_run else "Top sources removed"
        print(f"\n {heading}:")
        for source, count in ranked[:5]:
            print(f" {source} ({count})")

    if not args.dry_run:
        print(
            f"\n Removed {report['removed_drawers']} drawers, {report['removed_closets']} closets."
        )
    elif report["gitignored"] + report["missing"] > 0:
        print("\n Re-run with --apply to commit these deletions.")

    print(f"\n{heavy_rule}\n")
def cmd_search(args): def cmd_search(args):
from .searcher import search, SearchError from .searcher import search, SearchError
@@ -1214,6 +1292,38 @@ def main():
help="A .jsonl transcript file, or a directory to scan recursively", help="A .jsonl transcript file, or a directory to scan recursively",
) )
# sync
p_sync = sub.add_parser(
"sync",
help="Prune drawers whose source files are gitignored, deleted, or moved (#1252)",
)
p_sync.add_argument(
"dir",
nargs="?",
default=None,
help="Project root to sync (optional; auto-detects from drawer metadata)",
)
p_sync.add_argument("--wing", default=None, help="Limit to one wing")
p_sync.add_argument(
"--root",
action="append",
default=[],
help="Additional project root (repeatable)",
)
p_sync.add_argument(
"--dry-run",
dest="dry_run",
action="store_true",
default=True,
help="Preview only (default)",
)
p_sync.add_argument(
"--apply",
dest="dry_run",
action="store_false",
help="Actually delete drawers (overrides --dry-run; requires --wing or a project root)",
)
# search # search
p_search = sub.add_parser("search", help="Find anything, exact words") p_search = sub.add_parser("search", help="Find anything, exact words")
p_search.add_argument("query", help="What to search for") p_search.add_argument("query", help="What to search for")
@@ -1422,6 +1532,7 @@ def main():
"split": cmd_split, "split": cmd_split,
"search": cmd_search, "search": cmd_search,
"sweep": cmd_sweep, "sweep": cmd_sweep,
"sync": cmd_sync,
"mcp": cmd_mcp, "mcp": cmd_mcp,
"compress": cmd_compress, "compress": cmd_compress,
"wake-up": cmd_wakeup, "wake-up": cmd_wakeup,
+52
View File
@@ -990,6 +990,40 @@ def tool_delete_drawer(drawer_id: str):
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
def tool_sync(project_dir: str = None, wing: str = None, apply: bool = False):
    """MCP handler: prune drawers whose source files are gitignored, missing, or moved (#1252).

    Runs ``sync_palace`` against the configured palace. ``apply=False`` (the
    default) maps to a dry run. ``project_dir``, when given, becomes the single
    project root; ``wing`` is passed through unchanged.

    Returns ``{"success": True, **report}`` on success, otherwise a dict with
    ``"success": False`` and an ``"error"`` message (plus ``"hint"`` when no
    palace path is configured). Exceptions from ``sync_palace`` are converted
    to error dicts rather than raised.
    """
    global _metadata_cache
    from .palace import MineAlreadyRunning
    from .sync import sync_palace

    if not _config.palace_path:
        missing = _no_palace()
        return {
            "success": False,
            "error": missing.get("error", "no palace"),
            "hint": missing.get("hint"),
        }

    roots = None
    if project_dir:
        roots = [project_dir]

    try:
        report = sync_palace(
            palace_path=_config.palace_path,
            project_dirs=roots,
            wing=wing,
            dry_run=not apply,
            wal_log=_wal_log,
        )
        return {"success": True, **report}
    # The typed handlers come before the generic one so MineAlreadyRunning and
    # ValueError keep their own error messages instead of "sync failed".
    except MineAlreadyRunning as exc:
        return {"success": False, "error": f"another mine is in progress: {exc}"}
    except ValueError as exc:
        return {"success": False, "error": str(exc)}
    except Exception as exc:
        return {"success": False, "error": f"sync failed: {exc}"}
    finally:
        # An apply attempt resets the module-level metadata cache whether or
        # not the sync succeeded.
        if apply:
            _metadata_cache = None
def tool_get_drawer(drawer_id: str): def tool_get_drawer(drawer_id: str):
"""Fetch a single drawer by ID. Returns full content and metadata.""" """Fetch a single drawer by ID. Returns full content and metadata."""
col = _get_collection() col = _get_collection()
@@ -1886,6 +1920,24 @@ TOOLS = {
}, },
"handler": tool_delete_drawer, "handler": tool_delete_drawer,
}, },
"mempalace_sync": {
"description": "Prune drawers whose source files are gitignored, deleted, or moved. Returns dry-run report by default; pass apply=true to commit deletions.",
"input_schema": {
"type": "object",
"properties": {
"project_dir": {
"type": "string",
"description": "Project root to scope the sync (optional; auto-detected from drawer metadata if omitted)",
},
"wing": {"type": "string", "description": "Limit to one wing (optional)"},
"apply": {
"type": "boolean",
"description": "Actually delete drawers; default is dry-run preview",
},
},
},
"handler": tool_sync,
},
"mempalace_get_drawer": { "mempalace_get_drawer": {
"description": "Fetch a single drawer by ID — returns full content and metadata.", "description": "Fetch a single drawer by ID — returns full content and metadata.",
"input_schema": { "input_schema": {
+298
View File
@@ -0,0 +1,298 @@
"""
sync.py — Gitignore-aware drawer prune (#1252).
Removes drawers whose source files are now gitignored, deleted, or moved
out of the project. Reuses the same GitignoreMatcher infrastructure that
the miner uses on the way in, so the same rules that block ingest also
drive the corresponding cleanup.
Usage:
from mempalace.sync import sync_palace
report = sync_palace(palace_path, project_dirs=["/repo"], dry_run=True)
"""
import logging
from collections import defaultdict
from pathlib import Path
from typing import Callable, Optional, TypedDict
from .miner import is_gitignored, load_gitignore_matcher
from .palace import (
MineAlreadyRunning,
get_closets_collection,
get_collection,
mine_palace_lock,
purge_file_closets,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Page size for paginated drawer reads; also the default deletion batch size
# of sync_palace.
_BATCH = 1000
class SyncReport(TypedDict):
    """Result of one ``sync_palace`` run: per-bucket drawer counts plus removal totals."""

    # Number of drawers examined.
    scanned: int
    # Drawers classified as "kept" (includes registry rows).
    kept: int
    # Drawers whose source file matches a gitignore rule (removable).
    gitignored: int
    # Drawers whose source file no longer exists on disk (removable).
    missing: int
    # Drawers with no source_file metadata, or a non-absolute one (left in place).
    no_source: int
    # Drawers whose source file is not under any project root (left in place).
    out_of_scope: int
    # Drawer IDs actually deleted; stays 0 on a dry run.
    removed_drawers: int
    # Closet rows found for the removed sources before purging; stays 0 on a dry run.
    removed_closets: int
    # Echo of the dry_run argument the report was produced with.
    dry_run: bool
    # source_file -> number of removable drawers that reference it.
    by_source: dict[str, int]
def _resolve_project_root(source_file: Path, project_roots: list) -> Optional[Path]:
"""Return the longest project_root that source_file lives under."""
best: Optional[Path] = None
for root in project_roots:
try:
source_file.relative_to(root)
except ValueError:
continue
if best is None or len(str(root)) > len(str(best)):
best = root
return best
def _ancestor_matchers(source_file: Path, root: Path, matcher_cache: dict) -> list:
"""Build the ancestor-chain matcher list, root → file's parent.
Callers are expected to invoke this only after `_resolve_project_root`
confirms `source_file` lives under `root`. The defensive try/except
keeps the function safe if a future caller skips that check.
"""
matchers: list = []
try:
parts = source_file.relative_to(root).parts
except ValueError:
return matchers
cursor = root
matcher = load_gitignore_matcher(cursor, matcher_cache)
if matcher is not None:
matchers.append(matcher)
for part in parts[:-1]:
cursor = cursor / part
matcher = load_gitignore_matcher(cursor, matcher_cache)
if matcher is not None:
matchers.append(matcher)
return matchers
def _is_registry_row(meta: dict, drawer_id: str) -> bool:
"""Convo miner sentinels track 'have I seen this transcript' — preserve them.
Deleting a `_reg_*` sentinel makes the next mine pass re-chunk and re-embed
the entire transcript even though its content has not changed.
"""
if (meta or {}).get("room") == "_registry":
return True
if (meta or {}).get("ingest_mode") == "registry":
return True
if drawer_id and drawer_id.startswith("_reg_"):
return True
return False
def _classify_drawer(
    meta: dict, matcher_cache: dict, project_roots: list, drawer_id: str = ""
) -> str:
    """Assign a drawer to a bucket based on its ``source_file`` metadata.

    Checks run in this order and the first match wins:

    * registry sentinel rows             -> ``"kept"``
    * no ``source_file``, or not absolute -> ``"no_source"``
    * not under any project root          -> ``"out_of_scope"``
    * file no longer exists               -> ``"missing"``
    * file matches a gitignore rule       -> ``"gitignored"``
    * anything else                       -> ``"kept"``
    """
    if _is_registry_row(meta, drawer_id):
        return "kept"

    raw_source = (meta or {}).get("source_file")
    if not raw_source or not Path(raw_source).is_absolute():
        return "no_source"

    path = Path(raw_source)
    owning_root = _resolve_project_root(path, project_roots)
    if owning_root is None:
        return "out_of_scope"
    if not path.exists():
        return "missing"

    # With no matchers loaded there is nothing to match against.
    matchers = _ancestor_matchers(path, owning_root, matcher_cache)
    ignored = bool(matchers) and is_gitignored(path, matchers, is_dir=False)
    return "gitignored" if ignored else "kept"
def _iter_drawer_metadata(col, wing: Optional[str]):
    """Page through the drawers collection, yielding ``(id, metadata)`` pairs.

    Reads ``_BATCH`` rows per ``col.get`` call, restricted to ``wing`` when
    one is given. Iteration ends on an empty page or on a page shorter than
    ``_BATCH``.
    """
    query = {"include": ["metadatas"], "limit": _BATCH}
    if wing:
        query["where"] = {"wing": wing}

    position = 0
    while True:
        page = col.get(**query, offset=position)
        page_ids = page.get("ids") or []
        page_metas = page.get("metadatas") or []
        if not page_ids:
            return
        yield from zip(page_ids, page_metas)
        if len(page_ids) < _BATCH:
            # Short page: nothing left to fetch.
            return
        position += len(page_ids)
def _auto_detect_project_roots(col, wing: Optional[str]) -> list:
    """Collect candidate project roots from drawer ``source_file`` metadata.

    For every drawer with an absolute ``source_file``, the path's ancestors
    are walked from the closest directory outwards, and the first one that
    holds a ``.git`` entry or a ``.gitignore`` file is recorded as a root.
    ``Path.parents`` iterates nearest-first, so the first hit is the deepest
    qualifying ancestor; nested subprojects therefore get their own root.
    Drawers without a usable ``source_file`` are ignored.

    Returns the distinct roots (resolved, non-strict) sorted longest path
    string first, with ties broken lexicographically for a deterministic
    order.
    """
    roots = set()
    probed_dirs = set()
    for _, meta in _iter_drawer_metadata(col, wing):
        source_file = (meta or {}).get("source_file")
        if not source_file:
            continue
        src = Path(source_file)
        if not src.is_absolute():
            continue
        # The ancestor walk depends only on the containing directory, so
        # drawers that share a directory need a single round of filesystem
        # probes rather than one per drawer.
        directory = src.parent
        if directory in probed_dirs:
            continue
        probed_dirs.add(directory)
        for parent in src.parents:
            if (parent / ".git").exists() or (parent / ".gitignore").is_file():
                roots.add(parent.resolve(strict=False))
                break
    # Longest path string first; the string itself is the tie-breaker.
    return sorted(roots, key=lambda p: (-len(str(p)), str(p)))
def _normalize_project_dirs(project_dirs) -> list:
return [Path(p).resolve(strict=False) for p in project_dirs]
def _delete_in_batches(col, ids: list, batch_size: int, wal_log: Optional[Callable]):
"""Delete drawer IDs in batches, optionally logging each batch to WAL."""
deleted = 0
for i in range(0, len(ids), batch_size):
chunk = ids[i : i + batch_size]
col.delete(ids=chunk)
deleted += len(chunk)
if wal_log is not None:
wal_log(
"sync_prune",
{"first_id": chunk[0]},
{"removed_count": len(chunk)},
)
return deleted
def sync_palace(
    palace_path: str,
    project_dirs: Optional[list] = None,
    wing: Optional[str] = None,
    dry_run: bool = True,
    batch_size: int = _BATCH,
    wal_log: Optional[Callable] = None,
) -> SyncReport:
    """Classify every drawer and, unless ``dry_run``, delete the stale ones.

    A drawer is removable when its source file is gitignored or missing.
    With ``dry_run=True`` (the default) only the counts are reported; with
    ``dry_run=False`` the removable drawers are deleted in batches and the
    closets recorded for their source files are purged.

    ``mine_palace_lock`` is held for the entire call, so classification and
    deletion operate on the same drawer snapshot.

    Project roots come from ``project_dirs`` when given, otherwise they are
    auto-detected from drawer metadata. ``wing`` restricts the scan to one
    wing.

    Raises:
        ValueError: when applying without ``wing`` or ``project_dirs`` (a
            guard against pruning every wing of a multi-project palace via
            auto-detected roots), or when ``project_dirs`` is given but empty.
        MineAlreadyRunning: when another mine is in progress on this palace.
    """
    applying = not dry_run
    if applying and not wing and not project_dirs:
        raise ValueError(
            "sync apply requires explicit wing= or project_dirs= so it cannot "
            "auto-prune every wing in a multi-project palace; pass --wing or "
            "a project directory"
        )
    if project_dirs is not None and not project_dirs:
        raise ValueError(
            "project_dirs was provided but is empty; pass at least one project "
            "root or pass project_dirs=None to auto-detect from drawer metadata"
        )

    tally = dict.fromkeys(
        ("scanned", "kept", "gitignored", "missing", "no_source", "out_of_scope"), 0
    )
    per_source: dict = {}
    doomed_ids: list = []
    doomed_sources: set = set()

    with mine_palace_lock(palace_path):
        col = get_collection(palace_path, create=False)
        roots = (
            _auto_detect_project_roots(col, wing)
            if project_dirs is None
            else _normalize_project_dirs(project_dirs)
        )

        # Classification pass: count every drawer, remember the removable ones.
        matcher_cache: dict = {}
        for drawer_id, meta in _iter_drawer_metadata(col, wing):
            meta = meta or {}
            tally["scanned"] += 1
            verdict = _classify_drawer(meta, matcher_cache, roots, drawer_id)
            tally[verdict] += 1
            if verdict not in ("gitignored", "missing"):
                continue
            doomed_ids.append(drawer_id)
            source = meta.get("source_file")
            if source:
                doomed_sources.add(source)
                per_source[source] = per_source.get(source, 0) + 1

        report: SyncReport = {
            **tally,
            "removed_drawers": 0,
            "removed_closets": 0,
            "dry_run": dry_run,
            "by_source": dict(per_source),
        }
        if dry_run or not doomed_ids:
            return report

        report["removed_drawers"] = _delete_in_batches(col, doomed_ids, batch_size, wal_log)

        # Closet cleanup is best-effort: an unavailable closets collection is
        # logged and skipped rather than failing the sync.
        try:
            closets_col = get_closets_collection(palace_path, create=False)
        except Exception as exc:
            logger.warning("Closet purge skipped (collection unavailable): %s", exc)
            closets_col = None

        purged = 0
        if closets_col is not None:
            for source in doomed_sources:
                # Count matching closet rows first; the total reported is the
                # number found before purging.
                matching = (
                    closets_col.get(where={"source_file": source}, include=[]).get("ids") or []
                )
                if matching:
                    purge_file_closets(closets_col, source)
                    purged += len(matching)
        report["removed_closets"] = purged
        return report
# Public names of this module. MineAlreadyRunning is imported from .palace
# and re-exported here next to the names defined in this file.
__all__ = [
    "MineAlreadyRunning",
    "SyncReport",
    "sync_palace",
]
+1128
View File
File diff suppressed because it is too large Load Diff
+15 -1
View File
@@ -1,6 +1,6 @@
# MCP Tools Reference # MCP Tools Reference
Detailed parameter schemas for all 29 MCP tools. Detailed parameter schemas for all 30 MCP tools.
## Palace — Read Tools ## Palace — Read Tools
@@ -114,6 +114,20 @@ Delete a drawer by ID. Irreversible.
--- ---
### `mempalace_sync`
Prune drawers whose source files are gitignored, deleted, or moved. Returns a dry-run report by default; pass `apply=true` to commit deletions.
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `project_dir` | string | No | Project root to scope the sync (auto-detected from drawer metadata if omitted) |
| `wing` | string | No | Limit to one wing |
| `apply` | boolean | No | Actually delete drawers; default is dry-run preview. Applying requires `project_dir` or `wing` to be set |
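**Returns:** `{ success, scanned, kept, gitignored, missing, no_source, out_of_scope, removed_drawers, removed_closets, dry_run, by_source }` on success; `{ success: false, error }` on failure (plus `hint` when no palace is configured).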
---
### `mempalace_get_drawer` ### `mempalace_get_drawer`
Fetch a single drawer by ID — returns full content and metadata. Fetch a single drawer by ID — returns full content and metadata.