Files
mempalace/mempalace/cli.py
T

971 lines
34 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
MemPalace — Give your AI a memory. No API key required.
Two ways to ingest:
Projects: mempalace mine ~/projects/my_app (code, docs, notes)
Conversations: mempalace mine <convo-dir> --mode convos (Claude Code, Claude.ai, ChatGPT, Slack exports)
Same palace. Same search. Different ingest strategies.
Commands:
mempalace init <dir> Detect rooms from folder structure
mempalace split <dir> Split concatenated mega-files into per-session files
mempalace mine <dir> Mine project files (default)
mempalace mine <dir> --mode convos Mine conversation exports
mempalace search "query" Find anything, exact words
mempalace mcp Show MCP setup command
mempalace wake-up Show L0 + L1 wake-up context
mempalace wake-up --wing my_app Wake-up for a specific project
mempalace status Show what's been filed
Examples:
mempalace init ~/projects/my_app
mempalace mine ~/projects/my_app
mempalace mine ~/.claude/projects/-Users-you-Projects-my_app --mode convos --wing my_app
mempalace search "why did we switch to GraphQL"
mempalace search "pricing discussion" --wing my_app --room costs
"""
import os
import sys
import shlex
import argparse
from pathlib import Path
from .config import MempalaceConfig
from .version import __version__
_MEMPALACE_PROJECT_FILES = ("mempalace.yaml", "entities.json")
def _ensure_mempalace_files_gitignored(project_dir) -> bool:
"""If project_dir is a git repo, ensure MemPalace's per-project files
are listed in .gitignore so they don't get committed by accident.
Returns True if .gitignore was updated, False otherwise. Issue #185:
`mempalace init` writes mempalace.yaml + entities.json into the
project root, where they previously had no protection against being
staged into git.
"""
from pathlib import Path
project_path = Path(project_dir).expanduser().resolve()
if not (project_path / ".git").exists():
return False
gitignore = project_path / ".gitignore"
existing = gitignore.read_text() if gitignore.exists() else ""
existing_lines = {line.strip() for line in existing.splitlines()}
missing = [p for p in _MEMPALACE_PROJECT_FILES if p not in existing_lines]
if not missing:
return False
prefix = "" if not existing or existing.endswith("\n") else "\n"
block = prefix + "\n# MemPalace per-project files (issue #185)\n" + "\n".join(missing) + "\n"
with open(gitignore, "a") as f:
f.write(block)
print(f" Added {', '.join(missing)} to {gitignore.name}")
return True
def cmd_init(args):
import json
from pathlib import Path
from .entity_detector import confirm_entities
from .project_scanner import discover_entities
from .room_detector_local import detect_rooms_local
cfg = MempalaceConfig()
# Resolve entity-detection languages: --lang overrides config.
lang_arg = getattr(args, "lang", None)
if lang_arg:
languages = [s.strip() for s in lang_arg.split(",") if s.strip()] or ["en"]
cfg.set_entity_languages(languages)
else:
languages = cfg.entity_languages
languages_tuple = tuple(languages)
# Optional phase-2 LLM provider (opt-in via --llm).
llm_provider = None
if getattr(args, "llm", False):
from .llm_client import LLMError, get_provider
try:
llm_provider = get_provider(
name=args.llm_provider,
model=args.llm_model,
endpoint=args.llm_endpoint,
api_key=args.llm_api_key,
)
except LLMError as e:
print(f" ERROR: {e}", file=sys.stderr)
sys.exit(2)
ok, msg = llm_provider.check_available()
if not ok:
print(
f" ERROR: LLM provider '{args.llm_provider}' unavailable: {msg}",
file=sys.stderr,
)
sys.exit(2)
print(f" LLM refinement enabled: {args.llm_provider}/{args.llm_model}")
# Pass 1: discover entities — manifests + git authors first, prose detection
# as supplement for names mentioned only in docs/notes. Optional phase-2
# LLM refinement runs inside discover_entities when llm_provider is given.
print(f"\n Scanning for entities in: {args.dir}")
if languages_tuple != ("en",):
print(f" Languages: {', '.join(languages_tuple)}")
detected = discover_entities(args.dir, languages=languages_tuple, llm_provider=llm_provider)
total = (
len(detected["people"])
+ len(detected["projects"])
+ len(detected.get("topics", []))
+ len(detected["uncertain"])
)
if total > 0:
confirmed = confirm_entities(detected, yes=getattr(args, "yes", False))
# Save confirmed entities to <project>/entities.json (per-project
# audit trail — user can inspect or hand-edit) AND merge into the
# global registry the miner reads at mine time. Topics are kept
# separately so the miner can later compute cross-wing tunnels
# from shared topics (see palace_graph.compute_topic_tunnels).
if confirmed["people"] or confirmed["projects"] or confirmed.get("topics"):
project_path = Path(args.dir).expanduser().resolve()
entities_path = project_path / "entities.json"
2026-04-24 05:25:34 +00:00
with open(entities_path, "w", encoding="utf-8") as f:
json.dump(confirmed, f, indent=2, ensure_ascii=False)
print(f" Entities saved: {entities_path}")
from .miner import add_to_known_entities
# Wing matches the default produced by ``room_detector_local``
# (folder basename) and the miner fallback in ``load_config``.
# Used by the topics_by_wing map so cross-wing tunnels can be
# computed at mine time.
wing = project_path.name
registry_path = add_to_known_entities(confirmed, wing=wing)
print(f" Registry updated: {registry_path}")
else:
print(" No entities detected — proceeding with directory-based rooms.")
# Pass 2: detect rooms from folder structure
detect_rooms_local(project_dir=args.dir, yes=getattr(args, "yes", False))
cfg.init()
# Pass 3: protect git repos from accidentally committing per-project files
_ensure_mempalace_files_gitignored(args.dir)
# Pass 4: offer to run mine immediately. The directory just had its
# rooms + entities set up, so 99% of users will mine next anyway —
# asking here removes the "remember to type the next command" friction.
# `--auto-mine` skips the prompt and mines automatically; `--yes` is
# SCOPED to entity auto-accept and does NOT imply mining.
_maybe_run_mine_after_init(args, cfg)
def _format_size_mb(num_bytes: int) -> str:
"""Render a byte count as a human-readable size for the mine estimate.
< 1 MB rounds up to ``<1 MB`` so users never see a misleading ``0 MB``
on small projects. Otherwise reports an integer megabyte count.
"""
if num_bytes <= 0:
return "<1 MB"
mb = num_bytes / (1024 * 1024)
if mb < 1:
return "<1 MB"
return f"{mb:.0f} MB"
def _maybe_run_mine_after_init(args, cfg) -> None:
"""Prompt the user to mine the directory just initialised, or auto-mine
when ``--auto-mine`` was passed. Extracted so the prompt path is
unit-testable.
Behaviour matrix:
- default (no flags) — prompt, default Yes, mine in-process if accepted
- ``--yes`` — entity auto-accept only; STILL prompts for the mine step
- ``--auto-mine`` — skip the mine prompt and mine directly
- ``--yes --auto-mine`` — fully non-interactive
Mine errors are surfaced (not swallowed): a failing mine exits with a
non-zero status via :func:`sys.exit` so downstream scripts can see it.
The pre-scan that produces the file-count estimate is reused as the
mine input so we never walk the corpus twice.
"""
from .miner import mine, scan_project
project_dir = args.dir
auto_mine = bool(getattr(args, "auto_mine", False))
# Single corpus walk: this scan feeds BOTH the "what would be mined"
# estimate the user sees in the prompt AND the file list mine() will
# process. We pass the result into mine() via the `files` kwarg so it
# doesn't re-walk the tree.
try:
scanned_files = scan_project(project_dir)
file_count = len(scanned_files)
total_bytes = 0
for fp in scanned_files:
try:
total_bytes += fp.stat().st_size
except OSError:
# Skip files that vanished between scan and stat — mine()
# will skip them too.
continue
size_str = _format_size_mb(total_bytes)
except Exception:
scanned_files = None
file_count = None
size_str = None
# Show the scope estimate BEFORE the prompt so the user knows what
# they are agreeing to. On a real corpus mine takes minutes; hitting
# Enter on a default-Y prompt with no size cue is a footgun.
if isinstance(file_count, int):
if size_str:
print(f" ~{file_count} files (~{size_str}) would be mined into this palace.\n")
else:
print(f" ~{file_count} files would be mined into this palace.\n")
if not auto_mine:
try:
answer = input(" Mine this directory now? [Y/n] ").strip().lower()
except EOFError:
# Non-interactive stdin (e.g. piped) — treat like decline so
# we don't block. User can re-run with --auto-mine to opt in.
answer = "n"
if answer not in ("", "y", "yes"):
print(f"\n Skipped. Run `mempalace mine {shlex.quote(project_dir)}` when ready.")
return
palace_path = cfg.palace_path
try:
mine(
project_dir=project_dir,
palace_path=palace_path,
files=scanned_files,
)
except KeyboardInterrupt:
# mine() handles its own SIGINT summary + sys.exit(130); re-raise
# any KeyboardInterrupt that escapes (shouldn't happen) so the
# shell still sees a clean interrupt rather than a swallowed one.
raise
except Exception as e:
print(f"\n ERROR: mine failed: {e}", file=sys.stderr)
sys.exit(1)
def cmd_mine(args):
palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path
include_ignored = []
for raw in args.include_ignored or []:
include_ignored.extend(part.strip() for part in raw.split(",") if part.strip())
if args.mode == "convos":
from .convo_miner import mine_convos
mine_convos(
convo_dir=args.dir,
palace_path=palace_path,
wing=args.wing,
agent=args.agent,
limit=args.limit,
dry_run=args.dry_run,
extract_mode=args.extract,
)
else:
from .miner import mine
mine(
project_dir=args.dir,
palace_path=palace_path,
wing_override=args.wing,
agent=args.agent,
limit=args.limit,
dry_run=args.dry_run,
respect_gitignore=not args.no_gitignore,
include_ignored=include_ignored,
)
def cmd_sweep(args):
"""Sweep a transcript file or directory.
The sweeper deduplicates against its own prior writes via
deterministic drawer IDs + a timestamp cursor. It does NOT currently
coordinate with the file-level miners (miner.py / convo_miner.py) —
those produce char-chunked drawers without compatible message
metadata, so running both miners may store overlapping content under
different IDs.
"""
from .sweeper import sweep, sweep_directory
palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path
target = os.path.expanduser(args.target)
if os.path.isfile(target):
result = sweep(target, palace_path)
print(
f" Swept {target}: +{result['drawers_added']} new, "
f"{result['drawers_already_present']} already present, "
f"{result['drawers_skipped']} skipped (< cursor)."
)
elif os.path.isdir(target):
result = sweep_directory(target, palace_path)
print(
f" Swept {result['files_succeeded']}/{result['files_attempted']} "
f"files from {target}: +{result['drawers_added']} new, "
f"{result['drawers_already_present']} already present, "
f"{result['drawers_skipped']} skipped (< cursor)."
)
failures = result.get("failures") or []
if failures:
print(
f" WARNING: {len(failures)} file(s) failed to sweep - see stderr / logs for details.",
file=sys.stderr,
)
sys.exit(2)
else:
print(f" ERROR: Not a file or directory: {target}", file=sys.stderr)
sys.exit(1)
def cmd_search(args):
from .searcher import search, SearchError
palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path
try:
search(
query=args.query,
palace_path=palace_path,
wing=args.wing,
room=args.room,
n_results=args.results,
)
except SearchError:
sys.exit(1)
def cmd_wakeup(args):
"""Show L0 (identity) + L1 (essential story) — the wake-up context."""
from .layers import MemoryStack
palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path
stack = MemoryStack(palace_path=palace_path)
text = stack.wake_up(wing=args.wing)
tokens = len(text) // 4
print(f"Wake-up text (~{tokens} tokens):")
print("=" * 50)
print(text)
def cmd_split(args):
"""Split concatenated transcript mega-files into per-session files."""
from .split_mega_files import main as split_main
import sys
# Rebuild argv for split_mega_files argparse
# Expand ~ and resolve to absolute path so split_mega_files sees a real path
argv = ["--source", str(Path(args.dir).expanduser().resolve())]
if args.output_dir:
argv += ["--output-dir", args.output_dir]
if args.dry_run:
argv.append("--dry-run")
if args.min_sessions != 2:
argv += ["--min-sessions", str(args.min_sessions)]
old_argv = sys.argv
sys.argv = ["mempalace split"] + argv
try:
split_main()
finally:
sys.argv = old_argv
def cmd_migrate(args):
"""Migrate palace from a different ChromaDB version."""
from .migrate import migrate
palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path
migrate(
palace_path=palace_path,
dry_run=args.dry_run,
confirm=getattr(args, "yes", False),
)
def cmd_status(args):
from .miner import status
palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path
status(palace_path=palace_path)
def cmd_repair(args):
"""Rebuild palace vector index from SQLite metadata."""
import shutil
from .backends.chroma import ChromaBackend
2026-04-12 22:23:13 +00:00
from .migrate import confirm_destructive_action, contains_palace_database
from .repair import TruncationDetected, check_extraction_safety
2026-04-12 22:21:42 +00:00
palace_path = os.path.abspath(
os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path
2026-04-12 22:21:42 +00:00
)
db_path = os.path.join(palace_path, "chroma.sqlite3")
if not os.path.isdir(palace_path):
print(f"\n No palace found at {palace_path}")
return
2026-04-12 22:23:13 +00:00
if not contains_palace_database(palace_path):
2026-04-12 22:21:42 +00:00
print(f"\n No palace database found at {db_path}")
return
print(f"\n{'=' * 55}")
print(" MemPalace Repair")
print(f"{'=' * 55}\n")
print(f" Palace: {palace_path}")
backend = ChromaBackend()
# Try to read existing drawers
try:
col = backend.get_collection(palace_path, "mempalace_drawers")
total = col.count()
print(f" Drawers found: {total}")
except Exception as e:
print(f" Error reading palace: {e}")
print(" Cannot recover — palace may need to be re-mined from source files.")
return
if total == 0:
print(" Nothing to repair.")
return
2026-04-12 22:20:27 -03:00
if not confirm_destructive_action(
"Repair", palace_path, assume_yes=getattr(args, "yes", False)
):
2026-04-12 22:21:42 +00:00
return
# Extract all drawers in batches
print("\n Extracting drawers...")
batch_size = 5000
all_ids = []
all_docs = []
all_metas = []
offset = 0
while offset < total:
batch = col.get(limit=batch_size, offset=offset, include=["documents", "metadatas"])
if not batch["ids"]:
break
all_ids.extend(batch["ids"])
all_docs.extend(batch["documents"])
all_metas.extend(batch["metadatas"])
offset += len(batch["ids"])
print(f" Extracted {len(all_ids)} drawers")
# ── #1208 guard ──────────────────────────────────────────────────
# Cross-check against the SQLite ground truth before doing anything
# destructive. Catches the user-reported case where chromadb's
# collection-layer get() silently caps at 10,000 rows even on much
# larger palaces (e.g. after manual HNSW quarantine). Override with
# --confirm-truncation-ok only after independently verifying the
# extraction count is real.
try:
check_extraction_safety(
palace_path,
len(all_ids),
confirm_truncation_ok=getattr(args, "confirm_truncation_ok", False),
)
except TruncationDetected as e:
print(e.message)
return
# Backup and rebuild
palace_path = os.path.normpath(palace_path)
backup_path = palace_path + ".backup"
if os.path.exists(backup_path):
2026-04-12 22:23:13 +00:00
if not contains_palace_database(backup_path):
print(
2026-04-12 22:27:40 +00:00
" Backup validation failed: backup path exists but does not contain chroma.sqlite3. "
2026-04-12 22:23:13 +00:00
f"Please remove or rename: {backup_path}"
)
2026-04-12 22:21:42 +00:00
return
shutil.rmtree(backup_path)
print(f" Backing up to {backup_path}...")
shutil.copytree(palace_path, backup_path)
print(" Rebuilding collection...")
backend.delete_collection(palace_path, "mempalace_drawers")
new_col = backend.create_collection(palace_path, "mempalace_drawers")
filed = 0
for i in range(0, len(all_ids), batch_size):
batch_ids = all_ids[i : i + batch_size]
batch_docs = all_docs[i : i + batch_size]
batch_metas = all_metas[i : i + batch_size]
new_col.add(documents=batch_docs, ids=batch_ids, metadatas=batch_metas)
filed += len(batch_ids)
print(f" Re-filed {filed}/{len(all_ids)} drawers...")
print(f"\n Repair complete. {filed} drawers rebuilt.")
print(f" Backup saved at {backup_path}")
print(f"\n{'=' * 55}\n")
def cmd_hook(args):
"""Run hook logic: reads JSON from stdin, outputs JSON to stdout."""
from .hooks_cli import run_hook
run_hook(hook_name=args.hook, harness=args.harness)
def cmd_instructions(args):
"""Output skill instructions to stdout."""
from .instructions_cli import run_instructions
run_instructions(name=args.name)
def cmd_mcp(args):
"""Show how to wire MemPalace into MCP-capable hosts."""
base_server_cmd = "mempalace-mcp"
if args.palace:
resolved_palace = str(Path(args.palace).expanduser())
server_cmd = f"{base_server_cmd} --palace {shlex.quote(resolved_palace)}"
else:
server_cmd = base_server_cmd
print("MemPalace MCP quick setup:")
print(f" claude mcp add mempalace -- {server_cmd}")
print("\nRun the server directly:")
print(f" {server_cmd}")
if not args.palace:
print("\nOptional custom palace:")
print(f" claude mcp add mempalace -- {base_server_cmd} --palace /path/to/palace")
print(f" {base_server_cmd} --palace /path/to/palace")
def cmd_compress(args):
"""Compress drawers in a wing using AAAK Dialect."""
from .backends.chroma import ChromaBackend
from .dialect import Dialect
palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path
# Load dialect (with optional entity config)
config_path = args.config
if not config_path:
for candidate in ["entities.json", os.path.join(palace_path, "entities.json")]:
if os.path.exists(candidate):
config_path = candidate
break
if config_path and os.path.exists(config_path):
dialect = Dialect.from_config(config_path)
print(f" Loaded entity config: {config_path}")
else:
dialect = Dialect()
# Connect to palace
backend = ChromaBackend()
try:
col = backend.get_collection(palace_path, "mempalace_drawers")
except Exception:
print(f"\n No palace found at {palace_path}")
print(" Run: mempalace init <dir> then mempalace mine <dir>")
sys.exit(1)
# Query drawers in batches to avoid SQLite variable limit (~999)
where = {"wing": args.wing} if args.wing else None
_BATCH = 500
docs, metas, ids = [], [], []
offset = 0
while True:
try:
kwargs = {
"include": ["documents", "metadatas"],
"limit": _BATCH,
"offset": offset,
}
if where:
kwargs["where"] = where
batch = col.get(**kwargs)
except Exception as e:
if not docs:
print(f"\n Error reading drawers: {e}")
sys.exit(1)
break
batch_docs = batch.get("documents", [])
if not batch_docs:
break
docs.extend(batch_docs)
metas.extend(batch.get("metadatas", []))
ids.extend(batch.get("ids", []))
offset += len(batch_docs)
if len(batch_docs) < _BATCH:
break
if not docs:
wing_label = f" in wing '{args.wing}'" if args.wing else ""
print(f"\n No drawers found{wing_label}.")
return
print(
f"\n Compressing {len(docs)} drawers"
+ (f" in wing '{args.wing}'" if args.wing else "")
+ "..."
)
print()
total_original = 0
total_compressed = 0
compressed_entries = []
for doc, meta, doc_id in zip(docs, metas, ids):
compressed = dialect.compress(doc, metadata=meta)
stats = dialect.compression_stats(doc, compressed)
total_original += stats["original_chars"]
total_compressed += stats["summary_chars"]
compressed_entries.append((doc_id, compressed, meta, stats))
if args.dry_run:
wing_name = meta.get("wing", "?")
room_name = meta.get("room", "?")
source = Path(meta.get("source_file", "?")).name
print(f" [{wing_name}/{room_name}] {source}")
print(
f" {stats['original_tokens_est']}t -> {stats['summary_tokens_est']}t ({stats['size_ratio']:.1f}x)"
)
print(f" {compressed}")
print()
# Store compressed versions (unless dry-run)
if not args.dry_run:
try:
comp_col = backend.get_or_create_collection(palace_path, "mempalace_compressed")
for doc_id, compressed, meta, stats in compressed_entries:
comp_meta = dict(meta)
comp_meta["compression_ratio"] = round(stats["size_ratio"], 1)
comp_meta["original_tokens"] = stats["original_tokens_est"]
comp_col.upsert(
ids=[doc_id],
documents=[compressed],
metadatas=[comp_meta],
)
print(
f" Stored {len(compressed_entries)} compressed drawers in 'mempalace_compressed' collection."
)
except Exception as e:
print(f" Error storing compressed drawers: {e}")
sys.exit(1)
# Summary
ratio = total_original / max(total_compressed, 1)
# Estimate tokens from char count (~3.8 chars/token for English text)
orig_tokens = max(1, int(total_original / 3.8))
comp_tokens = max(1, int(total_compressed / 3.8))
print(f" Total: {orig_tokens:,}t -> {comp_tokens:,}t ({ratio:.1f}x compression)")
if args.dry_run:
print(" (dry run -- nothing stored)")
def main():
version_label = f"MemPalace {__version__}"
parser = argparse.ArgumentParser(
description="MemPalace — Give your AI a memory. No API key required.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=f"{version_label}\n\n{__doc__}",
)
parser.add_argument(
"--version",
action="version",
version=version_label,
help="Show version and exit",
)
parser.add_argument(
"--palace",
default=None,
help="Where the palace lives (default: from ~/.mempalace/config.json or ~/.mempalace/palace)",
)
sub = parser.add_subparsers(dest="command")
# init
p_init = sub.add_parser("init", help="Detect rooms from your folder structure")
p_init.add_argument("dir", help="Project directory to set up")
p_init.add_argument(
"--yes",
action="store_true",
help="Auto-accept all detected entities (non-interactive)",
)
p_init.add_argument(
"--auto-mine",
action="store_true",
help=(
"Skip the post-init mine prompt and run mine automatically. "
"Combine with --yes for a fully non-interactive setup."
),
)
p_init.add_argument(
"--lang",
default=None,
help=(
"Comma-separated language codes for entity detection "
"(e.g. 'en' or 'en,pt-br'). Defaults to value from config "
"(MEMPALACE_ENTITY_LANGUAGES env var or config.json), or 'en'. "
"When given, the value is also persisted to config.json."
),
)
p_init.add_argument(
"--llm",
action="store_true",
help=(
"Enable LLM-assisted entity refinement (opt-in, local-first). "
"Runs after manifest/git/regex detection, asking the configured "
"provider to reclassify ambiguous candidates. "
"Ctrl-C during refinement returns partial results."
),
)
p_init.add_argument(
"--llm-provider",
default="ollama",
choices=["ollama", "openai-compat", "anthropic"],
help="LLM provider (default: ollama). Use --llm to enable.",
)
p_init.add_argument(
"--llm-model",
default="gemma4:e4b",
help="Model name for the chosen provider (default: gemma4:e4b for Ollama).",
)
p_init.add_argument(
"--llm-endpoint",
default=None,
help=(
"Provider endpoint URL. Default for Ollama: http://localhost:11434. "
"Required for openai-compat."
),
)
p_init.add_argument(
"--llm-api-key",
default=None,
help=(
"API key for the provider. For anthropic, defaults to $ANTHROPIC_API_KEY; "
"for openai-compat, defaults to $OPENAI_API_KEY."
),
)
# mine
p_mine = sub.add_parser("mine", help="Mine files into the palace")
p_mine.add_argument("dir", help="Directory to mine")
p_mine.add_argument(
"--mode",
choices=["projects", "convos"],
default="projects",
help="Ingest mode: 'projects' for code/docs (default), 'convos' for chat exports",
)
p_mine.add_argument("--wing", default=None, help="Wing name (default: directory name)")
p_mine.add_argument(
"--no-gitignore",
action="store_true",
help="Don't respect .gitignore files when scanning project files",
)
p_mine.add_argument(
"--include-ignored",
action="append",
default=[],
help="Always scan these project-relative paths even if ignored; repeat or pass comma-separated paths",
)
p_mine.add_argument(
"--agent",
default="mempalace",
help="Your name — recorded on every drawer (default: mempalace)",
)
p_mine.add_argument("--limit", type=int, default=0, help="Max files to process (0 = all)")
p_mine.add_argument(
"--dry-run", action="store_true", help="Show what would be filed without filing"
)
p_mine.add_argument(
"--extract",
choices=["exchange", "general"],
default="exchange",
help="Extraction strategy for convos mode: 'exchange' (default) or 'general' (5 memory types)",
)
# sweep
p_sweep = sub.add_parser(
"sweep",
help="Tandem miner: catch anything the primary miner missed "
"(message-level, timestamp-coordinated, idempotent)",
)
p_sweep.add_argument(
"target",
help="A .jsonl transcript file, or a directory to scan recursively",
)
# search
p_search = sub.add_parser("search", help="Find anything, exact words")
p_search.add_argument("query", help="What to search for")
p_search.add_argument("--wing", default=None, help="Limit to one project")
p_search.add_argument("--room", default=None, help="Limit to one room")
p_search.add_argument("--results", type=int, default=5, help="Number of results")
# compress
p_compress = sub.add_parser(
"compress", help="Compress drawers using AAAK Dialect (~30x reduction)"
)
p_compress.add_argument("--wing", default=None, help="Wing to compress (default: all wings)")
p_compress.add_argument(
"--dry-run", action="store_true", help="Preview compression without storing"
)
p_compress.add_argument(
"--config", default=None, help="Entity config JSON (e.g. entities.json)"
)
# wake-up
p_wakeup = sub.add_parser("wake-up", help="Show L0 + L1 wake-up context (~600-900 tokens)")
p_wakeup.add_argument("--wing", default=None, help="Wake-up for a specific project/wing")
# split
p_split = sub.add_parser(
"split",
help="Split concatenated transcript mega-files into per-session files (run before mine)",
)
p_split.add_argument("dir", help="Directory containing transcript files")
p_split.add_argument(
"--output-dir",
default=None,
help="Write split files here (default: same directory as source files)",
)
p_split.add_argument(
"--dry-run",
action="store_true",
help="Show what would be split without writing files",
)
p_split.add_argument(
"--min-sessions",
type=int,
default=2,
help="Only split files containing at least N sessions (default: 2)",
)
# hook
p_hook = sub.add_parser(
"hook",
help="Run hook logic (reads JSON from stdin, outputs JSON to stdout)",
)
hook_sub = p_hook.add_subparsers(dest="hook_action")
p_hook_run = hook_sub.add_parser("run", help="Execute a hook")
p_hook_run.add_argument(
"--hook",
required=True,
choices=["session-start", "stop", "precompact"],
help="Hook name to run",
)
p_hook_run.add_argument(
"--harness",
required=True,
choices=["claude-code", "codex"],
help="Harness type (determines stdin JSON format)",
)
# instructions
p_instructions = sub.add_parser(
"instructions",
help="Output skill instructions to stdout",
)
instructions_sub = p_instructions.add_subparsers(dest="instructions_name")
for instr_name in ["init", "search", "mine", "help", "status"]:
instructions_sub.add_parser(instr_name, help=f"Output {instr_name} instructions")
# repair
p_repair = sub.add_parser(
"repair",
help="Rebuild palace vector index from stored data (fixes segfaults after corruption)",
)
p_repair.add_argument(
"--yes", action="store_true", help="Skip confirmation for destructive changes"
)
p_repair.add_argument(
"--confirm-truncation-ok",
action="store_true",
help=(
"Override the #1208 safety guard. Required when chromadb's collection-layer "
"extraction returns exactly 10,000 drawers and the SQLite ground-truth check "
"either matches or can't be read. Use only after independently confirming "
"the palace really contains that count."
),
)
# mcp
sub.add_parser(
"mcp",
help="Show MCP setup command for connecting MemPalace to your AI client",
)
# status
# migrate
p_migrate = sub.add_parser(
"migrate",
help="Migrate palace from a different ChromaDB version (fixes 3.0.0 → 3.1.0 upgrade)",
)
p_migrate.add_argument(
"--dry-run",
action="store_true",
help="Show what would be migrated without changing anything",
)
2026-04-12 22:20:27 -03:00
p_migrate.add_argument(
"--yes", action="store_true", help="Skip confirmation for destructive changes"
)
sub.add_parser("status", help="Show what's been filed")
args = parser.parse_args()
if not args.command:
parser.print_help()
return
# Handle two-level subcommands
if args.command == "hook":
if not getattr(args, "hook_action", None):
p_hook.print_help()
return
cmd_hook(args)
return
if args.command == "instructions":
name = getattr(args, "instructions_name", None)
if not name:
p_instructions.print_help()
return
args.name = name
cmd_instructions(args)
return
dispatch = {
"init": cmd_init,
"mine": cmd_mine,
"split": cmd_split,
"search": cmd_search,
"sweep": cmd_sweep,
"mcp": cmd_mcp,
"compress": cmd_compress,
"wake-up": cmd_wakeup,
"repair": cmd_repair,
"migrate": cmd_migrate,
"status": cmd_status,
}
dispatch[args.command](args)
if __name__ == "__main__":
main()