1420 lines
50 KiB
Python
1420 lines
50 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MemPalace MCP Server — read/write palace access for Claude Code
|
|
================================================================
|
|
Install: claude mcp add mempalace -- python -m mempalace.mcp_server [--palace /path/to/palace]
|
|
|
|
Tools (read):
|
|
mempalace_status — total drawers, wing/room breakdown
|
|
mempalace_list_wings — all wings with drawer counts
|
|
mempalace_list_rooms — rooms within a wing
|
|
mempalace_get_taxonomy — full wing → room → count tree
|
|
mempalace_search — semantic search, optional wing/room filter
|
|
mempalace_check_duplicate — check if content already exists before filing
|
|
|
|
Tools (write):
|
|
mempalace_add_drawer — file verbatim content into a wing/room
|
|
mempalace_delete_drawer — remove a drawer by ID
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import json
|
|
import logging
|
|
import hashlib
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from .config import MempalaceConfig, sanitize_name, sanitize_content
|
|
from .version import __version__
|
|
import chromadb
|
|
from .query_sanitizer import sanitize_query
|
|
from .searcher import search_memories
|
|
from .palace_graph import traverse, find_tunnels, graph_stats
|
|
|
|
from .knowledge_graph import KnowledgeGraph
|
|
|
|
# Route log records to stderr using a message-only format.
logging.basicConfig(
    stream=sys.stderr,
    level=logging.INFO,
    format="%(message)s",
)
logger = logging.getLogger("mempalace_mcp")
|
|
|
|
|
|
def _parse_args():
|
|
parser = argparse.ArgumentParser(description="MemPalace MCP Server")
|
|
parser.add_argument(
|
|
"--palace",
|
|
metavar="PATH",
|
|
help="Path to the palace directory (overrides config file and env var)",
|
|
)
|
|
args, unknown = parser.parse_known_args()
|
|
if unknown:
|
|
logger.debug("Ignoring unknown args: %s", unknown)
|
|
return args
|
|
|
|
|
|
_args = _parse_args()

# An explicit --palace is exported (as an absolute path) through the
# environment before the config object is built; presumably MempalaceConfig
# reads MEMPALACE_PALACE_PATH — confirm in .config.
if _args.palace:
    os.environ["MEMPALACE_PALACE_PATH"] = os.path.abspath(_args.palace)

_config = MempalaceConfig()

# Only override the KG path when --palace is explicitly provided; otherwise
# use KnowledgeGraph's default (~/.mempalace/knowledge_graph.sqlite3).
_kg = (
    KnowledgeGraph(db_path=os.path.join(_config.palace_path, "knowledge_graph.sqlite3"))
    if _args.palace
    else KnowledgeGraph()
)

# Lazily populated by _get_client() / _get_collection().
_client_cache = None
_collection_cache = None
_palace_db_inode = 0  # inode of chroma.sqlite3 at cache time
|
|
|
|
|
|
# ==================== WRITE-AHEAD LOG ====================
|
|
# Every write operation is logged to a JSONL file before execution.
|
|
# This provides an audit trail for detecting memory poisoning and
|
|
# enables review/rollback of writes from external or untrusted sources.
|
|
|
|
_WAL_DIR = Path(os.path.expanduser("~/.mempalace/wal"))
|
|
_WAL_DIR.mkdir(parents=True, exist_ok=True)
|
|
try:
|
|
_WAL_DIR.chmod(0o700)
|
|
except (OSError, NotImplementedError):
|
|
pass
|
|
_WAL_FILE = _WAL_DIR / "write_log.jsonl"
|
|
# Pre-create WAL file with restricted permissions to avoid race condition
|
|
if not _WAL_FILE.exists():
|
|
_WAL_FILE.touch(mode=0o600)
|
|
else:
|
|
try:
|
|
_WAL_FILE.chmod(0o600)
|
|
except (OSError, NotImplementedError):
|
|
pass
|
|
|
|
# Keys whose values should be redacted in WAL entries to avoid logging sensitive content
|
|
_WAL_REDACT_KEYS = frozenset({"content_preview", "entry_preview"})
|
|
|
|
|
|
def _wal_log(operation: str, params: dict, result: dict = None):
    """Append one JSON line describing a write operation to the WAL file.

    Args:
        operation: Name of the operation being recorded.
        params: Parameters of the operation.  Values stored under a key in
            ``_WAL_REDACT_KEYS`` are replaced by a placeholder (including the
            character count when the value is a string).
        result: Optional outcome stored alongside the parameters.

    Failures while writing are reported through the module logger and never
    raised to the caller.
    """

    def _scrub(key, value):
        # Pass non-sensitive values through untouched.
        if key not in _WAL_REDACT_KEYS:
            return value
        if isinstance(value, str):
            return f"[REDACTED {len(value)} chars]"
        return "[REDACTED]"

    record = {
        "timestamp": datetime.now().isoformat(),
        "operation": operation,
        "params": {key: _scrub(key, value) for key, value in params.items()},
        "result": result,
    }
    try:
        # Open in append mode; 0o600 applies if the file has to be created here.
        descriptor = os.open(str(_WAL_FILE), os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600)
        with os.fdopen(descriptor, "a", encoding="utf-8") as wal:
            wal.write(json.dumps(record, default=str) + "\n")
    except Exception as e:
        logger.error(f"WAL write failed: {e}")
|
|
|
|
|
|
def _get_client():
    """Return a ChromaDB PersistentClient, reconnecting if the database changed on disk.

    Detects palace rebuilds (repair/nuke/purge) by checking the inode of
    chroma.sqlite3. A full rebuild replaces the file, changing the inode.
    Reconnecting also drops the cached collection and metadata.

    Returns:
        The cached (or freshly created) client for ``_config.palace_path``.
    """
    global _client_cache, _collection_cache, _palace_db_inode, _metadata_cache, _metadata_cache_time
    db_path = os.path.join(_config.palace_path, "chroma.sqlite3")

    def _inode():
        # 0 stands for "file missing or unreadable".
        try:
            return os.stat(db_path).st_ino
        except OSError:
            return 0

    current_inode = _inode()
    if _client_cache is None or current_inode != _palace_db_inode:
        _client_cache = chromadb.PersistentClient(path=_config.palace_path)
        _collection_cache = None
        _metadata_cache = None
        _metadata_cache_time = 0
        # If the database file was absent before connecting, the client may
        # have just created it.  Re-stat in that case so the real inode is
        # remembered; otherwise the very next call would see "0 != inode" and
        # reconnect for no reason.
        _palace_db_inode = current_inode or _inode()
    return _client_cache
|
|
|
|
|
|
def _get_collection(create=False):
    """Return the ChromaDB collection, caching the client between calls.

    Args:
        create: When true, call ``get_or_create_collection`` (cosine space)
            on every invocation; otherwise look the collection up only if it
            is not cached yet.

    Returns:
        The collection, or ``None`` if it could not be opened.  Callers treat
        ``None`` as "no palace"; the underlying exception is logged at debug
        level so real failures are not completely invisible.
    """
    global _collection_cache, _metadata_cache, _metadata_cache_time
    try:
        client = _get_client()
        if create:
            _collection_cache = client.get_or_create_collection(
                _config.collection_name, metadata={"hnsw:space": "cosine"}
            )
            # A (re)fetched collection invalidates any cached metadata.
            _metadata_cache = None
            _metadata_cache_time = 0
        elif _collection_cache is None:
            _collection_cache = client.get_collection(_config.collection_name)
            _metadata_cache = None
            _metadata_cache_time = 0
        return _collection_cache
    except Exception:
        logger.debug("Palace collection unavailable", exc_info=True)
        return None
|
|
|
|
|
|
def _no_palace():
|
|
return {
|
|
"error": "No palace found",
|
|
"hint": "Run: mempalace init <dir> && mempalace mine <dir>",
|
|
}
|
|
|
|
|
|
# ==================== HELPERS ====================
|
|
|
|
|
|
def _fetch_all_metadata(col, where=None):
|
|
"""Paginate col.get() to avoid the 10K silent truncation limit."""
|
|
total = col.count()
|
|
all_meta = []
|
|
offset = 0
|
|
while offset < total:
|
|
kwargs = {"include": ["metadatas"], "limit": 1000, "offset": offset}
|
|
if where:
|
|
kwargs["where"] = where
|
|
batch = col.get(**kwargs)
|
|
if not batch["metadatas"]:
|
|
break
|
|
all_meta.extend(batch["metadatas"])
|
|
offset += len(batch["metadatas"])
|
|
return all_meta
|
|
|
|
|
|
_metadata_cache = None
|
|
_metadata_cache_time = 0
|
|
_METADATA_CACHE_TTL = 5.0 # seconds
|
|
_MAX_RESULTS = 100 # upper bound for search/list limit params
|
|
|
|
|
|
def _get_cached_metadata(col, where=None):
    """Return cached metadata if fresh, else fetch and cache.

    Only unfiltered reads (``where is None``) are served from and stored in
    the cache; filtered reads always go to the collection.

    Freshness is measured with ``time.monotonic()`` rather than the wall
    clock, so a system clock adjustment can neither expire the cache early
    nor keep stale data alive past ``_METADATA_CACHE_TTL``.

    Args:
        col: Collection handed to ``_fetch_all_metadata``.
        where: Optional filter; a non-``None`` value bypasses the cache.

    Returns:
        List of metadata dicts.  For cached reads the same list object is
        returned to every caller, so it must be treated as read-only.
    """
    global _metadata_cache, _metadata_cache_time
    now = time.monotonic()
    if (
        where is None
        and _metadata_cache is not None
        and (now - _metadata_cache_time) < _METADATA_CACHE_TTL
    ):
        return _metadata_cache
    result = _fetch_all_metadata(col, where=where)
    if where is None:
        _metadata_cache = result
        _metadata_cache_time = now
    return result
|
|
|
|
|
|
# ==================== READ TOOLS ====================
|
|
|
|
|
|
def tool_status():
    """Summarise the palace: drawer total plus per-wing and per-room counts.

    Returns:
        dict with ``total_drawers``, ``wings``, ``rooms``, ``palace_path``,
        ``protocol`` and ``aaak_dialect``.  If reading the metadata fails, the
        counts gathered so far are kept and ``error`` / ``partial`` are added.
        The "no palace" payload is returned when no collection is available.
    """
    col = _get_collection()
    if not col:
        return _no_palace()

    per_wing, per_room = {}, {}
    summary = {
        "total_drawers": col.count(),
        "wings": per_wing,
        "rooms": per_room,
        "palace_path": _config.palace_path,
        "protocol": PALACE_PROTOCOL,
        "aaak_dialect": AAAK_SPEC,
    }
    try:
        for meta in _get_cached_metadata(col):
            wing_name = meta.get("wing", "unknown")
            room_name = meta.get("room", "unknown")
            per_wing[wing_name] = per_wing.get(wing_name, 0) + 1
            per_room[room_name] = per_room.get(room_name, 0) + 1
    except Exception as e:
        logger.exception("tool_status metadata fetch failed")
        summary["error"] = str(e)
        summary["partial"] = True
    return summary
|
|
|
|
|
|
# ── AAAK Dialect Spec ─────────────────────────────────────────────────────────
|
|
# Included in status response so the AI learns it on first wake-up call.
|
|
# Also available via mempalace_get_aaak_spec tool.
|
|
|
|
# Usage protocol text; tool_status() returns it under the "protocol" key.
PALACE_PROTOCOL = """IMPORTANT — MemPalace Memory Protocol:
|
|
1. ON WAKE-UP: Call mempalace_status to load palace overview + AAAK spec.
|
|
2. BEFORE RESPONDING about any person, project, or past event: call mempalace_kg_query or mempalace_search FIRST. Never guess — verify.
|
|
3. IF UNSURE about a fact (name, gender, age, relationship): say "let me check" and query the palace. Wrong is worse than slow.
|
|
4. AFTER EACH SESSION: call mempalace_diary_write to record what happened, what you learned, what matters.
|
|
5. WHEN FACTS CHANGE: call mempalace_kg_invalidate on the old fact, mempalace_kg_add for the new one.
|
|

|
This protocol ensures the AI KNOWS before it speaks. Storage is not memory — but storage + this protocol = memory."""
|
|

|
# AAAK dialect description; returned by tool_status() ("aaak_dialect") and tool_get_aaak_spec().
AAAK_SPEC = """AAAK is a compressed memory dialect that MemPalace uses for efficient storage.
|
|
It is designed to be readable by both humans and LLMs without decoding.
|
|

|
FORMAT:
|
|
ENTITIES: 3-letter uppercase codes. ALC=Alice, JOR=Jordan, RIL=Riley, MAX=Max, BEN=Ben.
|
|
EMOTIONS: *action markers* before/during text. *warm*=joy, *fierce*=determined, *raw*=vulnerable, *bloom*=tenderness.
|
|
STRUCTURE: Pipe-separated fields. FAM: family | PROJ: projects | ⚠: warnings/reminders.
|
|
DATES: ISO format (2026-03-31). COUNTS: Nx = N mentions (e.g., 570x).
|
|
IMPORTANCE: ★ to ★★★★★ (1-5 scale).
|
|
HALLS: hall_facts, hall_events, hall_discoveries, hall_preferences, hall_advice.
|
|
WINGS: wing_user, wing_agent, wing_team, wing_code, wing_myproject, wing_hardware, wing_ue5, wing_ai_research.
|
|
ROOMS: Hyphenated slugs representing named ideas (e.g., chromadb-setup, gpu-pricing).
|
|

|
EXAMPLE:
|
|
FAM: ALC→♡JOR | 2D(kids): RIL(18,sports) MAX(11,chess+swimming) | BEN(contributor)
|
|

|
Read AAAK naturally — expand codes mentally, treat *markers* as emotional context.
|
|
When WRITING AAAK: use entity codes, mark emotions, keep structure tight."""
|
|
|
|
|
|
def tool_list_wings():
    """Count drawers per wing.

    Returns:
        ``{"wings": {wing: count}}``; records without a ``wing`` field are
        counted under ``"unknown"``.  On a metadata failure the partial counts
        are returned together with ``error`` and ``partial``.
    """
    col = _get_collection()
    if not col:
        return _no_palace()

    tally = {}
    response = {"wings": tally}
    try:
        for meta in _get_cached_metadata(col):
            name = meta.get("wing", "unknown")
            tally[name] = tally.get(name, 0) + 1
    except Exception as e:
        logger.exception("tool_list_wings metadata fetch failed")
        response["error"] = str(e)
        response["partial"] = True
    return response
|
|
|
|
|
|
def tool_list_rooms(wing: str = None):
    """Count drawers per room, optionally restricted to one wing.

    Args:
        wing: Wing to restrict the listing to; all wings when omitted/empty.

    Returns:
        ``{"wing": <wing or "all">, "rooms": {room: count}}``; records without
        a ``room`` field are counted under ``"unknown"``.  On a metadata
        failure the partial counts are returned with ``error`` and ``partial``.
    """
    col = _get_collection()
    if not col:
        return _no_palace()
    rooms = {}
    result = {"wing": wing or "all", "rooms": rooms}
    try:
        where = {"wing": wing} if wing else None
        # Go through the shared metadata cache like the other read tools do:
        # an unfiltered listing reuses a recent full scan, while a filtered
        # one (where is not None) still queries the collection directly.
        all_meta = _get_cached_metadata(col, where=where)
        for m in all_meta:
            r = m.get("room", "unknown")
            rooms[r] = rooms.get(r, 0) + 1
    except Exception as e:
        logger.exception("tool_list_rooms metadata fetch failed")
        result["error"] = str(e)
        result["partial"] = True
    return result
|
|
|
|
|
|
def tool_get_taxonomy():
    """Build the wing → room → drawer-count tree.

    Returns:
        ``{"taxonomy": {wing: {room: count}}}``; missing ``wing`` / ``room``
        fields are filed under ``"unknown"``.  On a metadata failure the
        partial tree is returned together with ``error`` and ``partial``.
    """
    col = _get_collection()
    if not col:
        return _no_palace()

    tree = {}
    response = {"taxonomy": tree}
    try:
        for meta in _get_cached_metadata(col):
            wing_name = meta.get("wing", "unknown")
            room_name = meta.get("room", "unknown")
            rooms = tree.setdefault(wing_name, {})
            rooms[room_name] = rooms.get(room_name, 0) + 1
    except Exception as e:
        logger.exception("tool_get_taxonomy metadata fetch failed")
        response["error"] = str(e)
        response["partial"] = True
    return response
|
|
|
|
|
|
def tool_search(
    query: str,
    limit: int = 5,
    wing: str = None,
    room: str = None,
    max_distance: float = 1.5,
    min_similarity: float = None,
    context: str = None,
):
    """Run a search over the palace via ``search_memories``.

    Args:
        query: Search text; passed through ``sanitize_query`` first.
        limit: Number of results, clamped to 1.._MAX_RESULTS.
        wing: Optional wing filter.
        room: Optional room filter.
        max_distance: Distance cut-off (lower = stricter).
        min_similarity: Legacy similarity cut-off (higher = stricter).  When
            given it replaces ``max_distance`` as ``1.0 - min_similarity``.
        context: Not used for searching; its presence is only acknowledged
            through ``context_received`` in the result.

    Returns:
        The dict produced by ``search_memories``, extended with sanitizer
        details when the query was altered.
    """
    limit = max(1, min(limit, _MAX_RESULTS))

    # Backwards compat: convert old similarity scale (higher=stricter) to
    # distance scale (lower=stricter). Similarity 0.8 → distance 0.2.
    if min_similarity is None:
        dist = max_distance
    else:
        dist = 1.0 - min_similarity

    # Mitigate system prompt contamination (Issue #333)
    cleaned = sanitize_query(query)
    result = search_memories(
        cleaned["clean_query"],
        palace_path=_config.palace_path,
        wing=wing,
        room=room,
        n_results=limit,
        max_distance=dist,
    )

    # Attach sanitizer metadata for transparency
    if cleaned["was_sanitized"]:
        result["query_sanitized"] = True
        result["sanitizer"] = {
            field: cleaned[field]
            for field in ("method", "original_length", "clean_length", "clean_query")
        }
    if context:
        result["context_received"] = True
    return result
|
|
|
|
|
|
def tool_check_duplicate(content: str, threshold: float = 0.9):
    """Look for stored drawers that closely match ``content``.

    The five nearest records are queried; each one whose similarity
    (``1 - distance``, rounded to three decimals) reaches ``threshold`` is
    reported.

    Returns:
        ``{"is_duplicate": bool, "matches": [...]}`` where each match carries
        id, wing, room, similarity and the content cut to 200 characters, or
        ``{"error": ...}`` if the query fails.
    """
    col = _get_collection()
    if not col:
        return _no_palace()
    try:
        hits = col.query(
            query_texts=[content],
            n_results=5,
            include=["metadatas", "documents", "distances"],
        )
        matches = []
        ranked_ids = (hits["ids"] and hits["ids"][0]) or []
        for pos, drawer_id in enumerate(ranked_ids):
            similarity = round(1 - hits["distances"][0][pos], 3)
            if not similarity >= threshold:
                continue
            meta = hits["metadatas"][0][pos]
            doc = hits["documents"][0][pos]
            matches.append(
                {
                    "id": drawer_id,
                    "wing": meta.get("wing", "?"),
                    "room": meta.get("room", "?"),
                    "similarity": similarity,
                    "content": doc[:200] + "..." if len(doc) > 200 else doc,
                }
            )
        return {"is_duplicate": len(matches) > 0, "matches": matches}
    except Exception:
        logger.exception("check_duplicate failed")
        return {"error": "Duplicate check failed"}
|
|
|
|
|
|
def tool_get_aaak_spec():
    """Return the AAAK dialect text wrapped under the ``aaak_spec`` key."""
    payload = {"aaak_spec": AAAK_SPEC}
    return payload
|
|
|
|
|
|
def tool_traverse_graph(start_room: str, max_hops: int = 2):
    """Walk the palace graph outward from ``start_room``.

    ``max_hops`` is clamped to the range 1..10 before being handed to
    ``traverse``.  Returns the "no palace" payload when no collection exists.
    """
    hops = min(max_hops, 10)
    if hops < 1:
        hops = 1
    col = _get_collection()
    if not col:
        return _no_palace()
    return traverse(start_room, col=col, max_hops=hops)
|
|
|
|
|
|
def tool_find_tunnels(wing_a: str = None, wing_b: str = None):
    """Find rooms that bridge wings by delegating to ``find_tunnels``.

    Both wing arguments are optional and forwarded unchanged.  Returns the
    "no palace" payload when no collection exists.
    """
    col = _get_collection()
    return find_tunnels(wing_a, wing_b, col=col) if col else _no_palace()
|
|
|
|
|
|
def tool_graph_stats():
    """Return the palace graph overview computed by ``graph_stats``.

    Returns the "no palace" payload when no collection exists.
    """
    col = _get_collection()
    return graph_stats(col=col) if col else _no_palace()
|
|
|
|
|
|
# ==================== WRITE TOOLS ====================
|
|
|
|
|
|
def _drawer_id(wing: str, room: str, basis: str) -> str:
    """Build a deterministic drawer ID from the wing, the room and a content basis."""
    digest = hashlib.sha256((wing + room + basis).encode()).hexdigest()[:24]
    return f"drawer_{wing}_{room}_{digest}"


def tool_add_drawer(
    wing: str, room: str, content: str, source_file: str = None, added_by: str = "mcp"
):
    """File verbatim content into a wing/room.

    The drawer ID is deterministic, so filing the same content twice is a
    no-op.  The primary ID keeps the historical scheme (hash of the first 100
    characters) so drawers stored earlier are still recognised.  If that ID is
    already taken by a drawer with *different* content, a second ID derived
    from the full content is used instead of reporting ``already_exists`` and
    silently dropping the new text.

    Args:
        wing: Target wing; validated with ``sanitize_name``.
        room: Target room; validated with ``sanitize_name``.
        content: Text to store; validated with ``sanitize_content``.
        source_file: Optional origin, stored as ``""`` when omitted.
        added_by: Label recorded in the drawer metadata and the WAL.

    Returns:
        ``{"success": True, "drawer_id", "wing", "room"}`` after a write,
        ``{"success": True, "reason": "already_exists", "drawer_id"}`` for a
        no-op, ``{"success": False, "error"}`` on validation or storage
        failure, or the "no palace" payload.
    """
    global _metadata_cache
    try:
        wing = sanitize_name(wing, "wing")
        room = sanitize_name(room, "room")
        content = sanitize_content(content)
    except ValueError as e:
        return {"success": False, "error": str(e)}

    col = _get_collection(create=True)
    if not col:
        return _no_palace()

    # Candidate IDs, in order of preference.  The length prefix in the second
    # basis guarantees it differs from the first even for short content.
    candidates = (
        _drawer_id(wing, room, content[:100]),
        _drawer_id(wing, room, f"{len(content)}:{content}"),
    )
    drawer_id = candidates[0]
    already_exists = False
    for candidate in candidates:
        drawer_id = candidate
        try:
            existing = col.get(ids=[candidate], include=["documents"])
        except Exception:
            # Same fallback as before (treat the ID as free), but no longer silent.
            logger.debug("add_drawer existence check failed", exc_info=True)
            break
        if not (existing and existing["ids"]):
            break  # ID is free
        stored = (existing.get("documents") or [None])[0]
        if stored is None or stored == content:
            already_exists = True
            break
        # Same ID but different text: prefix collision, try the next candidate.

    # Logged before any write happens; no-op attempts are recorded as well.
    _wal_log(
        "add_drawer",
        {
            "drawer_id": drawer_id,
            "wing": wing,
            "room": room,
            "added_by": added_by,
            "content_length": len(content),
            "content_preview": content[:200],
        },
    )

    if already_exists:
        return {"success": True, "reason": "already_exists", "drawer_id": drawer_id}

    try:
        col.upsert(
            ids=[drawer_id],
            documents=[content],
            metadatas=[
                {
                    "wing": wing,
                    "room": room,
                    "source_file": source_file or "",
                    "chunk_index": 0,
                    "added_by": added_by,
                    "filed_at": datetime.now().isoformat(),
                }
            ],
        )
        _metadata_cache = None
        logger.info(f"Filed drawer: {drawer_id} → {wing}/{room}")
        return {"success": True, "drawer_id": drawer_id, "wing": wing, "room": room}
    except Exception as e:
        logger.exception("add_drawer failed")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
def tool_delete_drawer(drawer_id: str):
    """Delete a single drawer by ID.

    The drawer is looked up first so the deletion can be recorded in the WAL
    together with its metadata and a (redacted) content preview.

    Returns:
        ``{"success": True, "drawer_id"}`` on success,
        ``{"success": False, "error"}`` if the drawer is missing or the
        lookup/deletion fails, or the "no palace" payload.
    """
    global _metadata_cache
    col = _get_collection()
    if not col:
        return _no_palace()

    # A failing lookup is reported like any other storage error instead of
    # escaping as an exception, matching the other drawer tools.
    try:
        existing = col.get(ids=[drawer_id])
    except Exception as e:
        return {"success": False, "error": str(e)}
    if not existing["ids"]:
        return {"success": False, "error": f"Drawer not found: {drawer_id}"}

    # Log the deletion with the content being removed for audit trail
    documents = existing.get("documents") or [""]
    metadatas = existing.get("metadatas") or [{}]
    deleted_content = documents[0] or ""  # a stored None document must not break slicing
    deleted_meta = metadatas[0]
    _wal_log(
        "delete_drawer",
        {
            "drawer_id": drawer_id,
            "deleted_meta": deleted_meta,
            "content_preview": deleted_content[:200],
        },
    )

    try:
        col.delete(ids=[drawer_id])
        _metadata_cache = None
        logger.info(f"Deleted drawer: {drawer_id}")
        return {"success": True, "drawer_id": drawer_id}
    except Exception as e:
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
def tool_get_drawer(drawer_id: str):
    """Fetch one drawer by ID.

    Returns:
        dict with ``drawer_id``, ``content``, ``wing``, ``room`` and the full
        ``metadata``; ``{"error": ...}`` if the drawer is missing or the
        lookup fails; or the "no palace" payload.
    """
    col = _get_collection()
    if not col:
        return _no_palace()
    try:
        found = col.get(ids=[drawer_id], include=["documents", "metadatas"])
        if not found["ids"]:
            return {"error": f"Drawer not found: {drawer_id}"}
        meta = found["metadatas"][0]
        text = found["documents"][0]
        drawer = {"drawer_id": drawer_id, "content": text}
        drawer["wing"] = meta.get("wing", "")
        drawer["room"] = meta.get("room", "")
        drawer["metadata"] = meta
        return drawer
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
def tool_list_drawers(wing: str = None, room: str = None, limit: int = 20, offset: int = 0):
    """List drawers one page at a time, optionally filtered by wing and/or room.

    Args:
        wing: Optional wing filter.
        room: Optional room filter.
        limit: Page size, clamped to 1.._MAX_RESULTS.
        offset: Number of records to skip; negative values become 0.

    Returns:
        ``{"drawers": [...], "count", "offset", "limit"}`` where each drawer
        has its id, wing, room and a content preview cut to 200 characters;
        ``{"error": ...}`` on failure; or the "no palace" payload.
    """
    limit = max(1, min(limit, _MAX_RESULTS))
    offset = max(0, offset)
    col = _get_collection()
    if not col:
        return _no_palace()
    try:
        filters = [
            {field: value} for field, value in (("wing", wing), ("room", room)) if value
        ]
        if not filters:
            where = None
        elif len(filters) == 1:
            where = filters[0]
        else:
            where = {"$and": filters}

        query = {"include": ["documents", "metadatas"], "limit": limit, "offset": offset}
        if where:
            query["where"] = where
        page = col.get(**query)

        def _summarise(pos, did):
            meta = page["metadatas"][pos]
            doc = page["documents"][pos]
            return {
                "drawer_id": did,
                "wing": meta.get("wing", ""),
                "room": meta.get("room", ""),
                "content_preview": doc[:200] + "..." if len(doc) > 200 else doc,
            }

        drawers = [_summarise(pos, did) for pos, did in enumerate(page["ids"])]
        return {
            "drawers": drawers,
            "count": len(drawers),
            "offset": offset,
            "limit": limit,
        }
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
def tool_update_drawer(drawer_id: str, content: str = None, wing: str = None, room: str = None):
    """Change an existing drawer's content and/or its wing/room metadata.

    Args:
        drawer_id: ID of the drawer to modify.
        content: New text (validated with ``sanitize_content``) or ``None``
            to keep the stored document.
        wing: New wing (validated with ``sanitize_name``) or ``None``.
        room: New room (validated with ``sanitize_name``) or ``None``.

    Returns:
        ``{"success": True, "drawer_id", "noop": True}`` when nothing was
        requested; ``{"success": True, "drawer_id", "wing", "room"}`` after an
        update; ``{"success": False, "error"}`` if the drawer is missing, a
        value is rejected or storage fails; or the "no palace" payload.
    """
    global _metadata_cache

    nothing_requested = content is None and wing is None and room is None
    if nothing_requested:
        return {"success": True, "drawer_id": drawer_id, "noop": True}

    col = _get_collection()
    if not col:
        return _no_palace()

    try:
        current = col.get(ids=[drawer_id], include=["documents", "metadatas"])
        if not current["ids"]:
            return {"success": False, "error": f"Drawer not found: {drawer_id}"}

        old_meta = current["metadatas"][0]
        old_doc = current["documents"][0]
        content_changed = content is not None

        # A ValueError raised by a sanitizer is turned into the failure
        # payload by the handler at the bottom of this function.
        new_doc = sanitize_content(content) if content_changed else old_doc
        new_meta = dict(old_meta)
        if wing is not None:
            new_meta["wing"] = sanitize_name(wing, "wing")
        if room is not None:
            new_meta["room"] = sanitize_name(room, "room")

        _wal_log(
            "update_drawer",
            {
                "drawer_id": drawer_id,
                "old_wing": old_meta.get("wing", ""),
                "old_room": old_meta.get("room", ""),
                "new_wing": new_meta.get("wing", ""),
                "new_room": new_meta.get("room", ""),
                "content_changed": content_changed,
                "content_preview": new_doc[:200] if content_changed else None,
            },
        )

        # The document is only sent when it actually changed.
        changes = {"ids": [drawer_id]}
        if content_changed:
            changes["documents"] = [new_doc]
        changes["metadatas"] = [new_meta]
        col.update(**changes)

        _metadata_cache = None

        logger.info(f"Updated drawer: {drawer_id}")
        return {
            "success": True,
            "drawer_id": drawer_id,
            "wing": new_meta.get("wing", ""),
            "room": new_meta.get("room", ""),
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
# ==================== KNOWLEDGE GRAPH ====================
|
|
|
|
|
|
def tool_kg_query(entity: str, as_of: str = None, direction: str = "both"):
    """Look up an entity's relationships in the knowledge graph.

    Args:
        entity: Entity name; validated with ``sanitize_name``.
        as_of: Optional point in time, forwarded unchanged to the graph.
        direction: One of ``"outgoing"``, ``"incoming"`` or ``"both"``.

    Returns:
        ``{"entity", "as_of", "facts", "count"}``, or ``{"error": ...}`` when
        the entity name or the direction is rejected.
    """
    try:
        entity = sanitize_name(entity, "entity")
    except ValueError as e:
        return {"error": str(e)}

    allowed_directions = ("outgoing", "incoming", "both")
    if direction not in allowed_directions:
        return {"error": "direction must be 'outgoing', 'incoming', or 'both'"}

    facts = _kg.query_entity(entity, as_of=as_of, direction=direction)
    return {"entity": entity, "as_of": as_of, "facts": facts, "count": len(facts)}
|
|
|
|
|
|
def tool_kg_add(
    subject: str, predicate: str, object: str, valid_from: str = None, source_closet: str = None
):
    """Record a subject → predicate → object fact in the knowledge graph.

    The three parts are validated with ``sanitize_name``; the operation is
    written to the WAL before the triple is added.

    Returns:
        ``{"success": True, "triple_id", "fact"}`` or
        ``{"success": False, "error"}`` when a name is rejected.
    """
    try:
        subject = sanitize_name(subject, "subject")
        predicate = sanitize_name(predicate, "predicate")
        object = sanitize_name(object, "object")
    except ValueError as e:
        return {"success": False, "error": str(e)}

    wal_params = {
        "subject": subject,
        "predicate": predicate,
        "object": object,
        "valid_from": valid_from,
        "source_closet": source_closet,
    }
    _wal_log("kg_add", wal_params)

    triple_id = _kg.add_triple(
        subject, predicate, object, valid_from=valid_from, source_closet=source_closet
    )
    fact = f"{subject} → {predicate} → {object}"
    return {"success": True, "triple_id": triple_id, "fact": fact}
|
|
|
|
|
|
def tool_kg_invalidate(subject: str, predicate: str, object: str, ended: str = None):
    """Mark a knowledge-graph fact as no longer true.

    The three parts are validated with ``sanitize_name``; the operation is
    written to the WAL before the graph is updated.

    Returns:
        ``{"success": True, "fact", "ended"}`` where ``ended`` is reported as
        ``"today"`` when no end date was supplied, or
        ``{"success": False, "error"}`` when a name is rejected.
    """
    try:
        subject = sanitize_name(subject, "subject")
        predicate = sanitize_name(predicate, "predicate")
        object = sanitize_name(object, "object")
    except ValueError as e:
        return {"success": False, "error": str(e)}

    wal_params = {"subject": subject, "predicate": predicate, "object": object, "ended": ended}
    _wal_log("kg_invalidate", wal_params)
    _kg.invalidate(subject, predicate, object, ended=ended)

    outcome = {"success": True}
    outcome["fact"] = f"{subject} → {predicate} → {object}"
    outcome["ended"] = ended or "today"
    return outcome
|
|
|
|
|
|
def tool_kg_timeline(entity: str = None):
    """Return the knowledge-graph timeline, optionally for a single entity.

    Returns:
        ``{"entity": <name or "all">, "timeline", "count"}``, or
        ``{"error": ...}`` when the entity name is rejected.
    """
    if entity is not None:
        try:
            entity = sanitize_name(entity, "entity")
        except ValueError as e:
            return {"error": str(e)}

    events = _kg.timeline(entity)
    label = entity or "all"
    return {"entity": label, "timeline": events, "count": len(events)}
|
|
|
|
|
|
def tool_kg_stats():
    """Return the knowledge graph's own statistics, as produced by ``_kg.stats()``."""
    stats = _kg.stats()
    return stats
|
|
|
|
|
|
# ==================== AGENT DIARY ====================
|
|
|
|
|
|
def tool_diary_write(agent_name: str, entry: str, topic: str = "general"):
    """
    Write a diary entry for this agent. Each agent gets its own wing
    with a diary room. Entries are timestamped and accumulate over time.

    This is the agent's personal journal — observations, thoughts,
    what it worked on, what it noticed, what it thinks matters.

    Args:
        agent_name: Agent name (validated with ``sanitize_name``); the wing is
            ``wing_<name>``, lower-cased with spaces turned into underscores.
        entry: Diary text (validated with ``sanitize_content``).
        topic: Label stored in the entry's metadata.

    Returns:
        ``{"success": True, "entry_id", "agent", "topic", "timestamp"}``,
        ``{"success": False, "error"}`` on validation or storage failure, or
        the "no palace" payload.
    """
    try:
        agent_name = sanitize_name(agent_name, "agent_name")
        entry = sanitize_content(entry)
    except ValueError as e:
        return {"success": False, "error": str(e)}

    wing = f"wing_{agent_name.lower().replace(' ', '_')}"
    room = "diary"
    col = _get_collection(create=True)
    if not col:
        return _no_palace()

    now = datetime.now()
    # Hash the whole entry, not just its opening characters: two different
    # entries written within the same second that start with the same text
    # must not end up with the same ID.
    entry_digest = hashlib.sha256(entry.encode()).hexdigest()[:12]
    entry_id = f"diary_{wing}_{now.strftime('%Y%m%d_%H%M%S')}_{entry_digest}"

    _wal_log(
        "diary_write",
        {
            "agent_name": agent_name,
            "topic": topic,
            "entry_id": entry_id,
            "entry_preview": entry[:200],
        },
    )

    try:
        # TODO: Future versions should expand AAAK before embedding to improve
        # semantic search quality. For now the entry is stored and embedded
        # as-is (even though compressed AAAK degrades embedding quality).
        col.add(
            ids=[entry_id],
            documents=[entry],
            metadatas=[
                {
                    "wing": wing,
                    "room": room,
                    "hall": "hall_diary",
                    "topic": topic,
                    "type": "diary_entry",
                    "agent": agent_name,
                    "filed_at": now.isoformat(),
                    "date": now.strftime("%Y-%m-%d"),
                }
            ],
        )
        logger.info(f"Diary entry: {entry_id} → {wing}/diary/{topic}")
        return {
            "success": True,
            "entry_id": entry_id,
            "agent": agent_name,
            "topic": topic,
            "timestamp": now.isoformat(),
        }
    except Exception as e:
        logger.exception("diary_write failed")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
def tool_diary_read(agent_name: str, last_n: int = 10):
    """
    Read an agent's recent diary entries — the agent's personal journal.

    Returns the ``last_n`` most recent entries, newest first (sorted by
    their ``filed_at`` timestamp in descending order).

    Args:
        agent_name: Agent name (validated with ``sanitize_name``); the wing is
            derived from it the same way as in ``tool_diary_write``.
        last_n: Number of entries to return, clamped to 1.._MAX_RESULTS.

    Returns:
        ``{"agent", "entries", "total", "showing"}``; a payload with an empty
        ``entries`` list and a message when the diary is empty;
        ``{"error": ...}`` on failure; or the "no palace" payload.
    """
    try:
        agent_name = sanitize_name(agent_name, "agent_name")
    except ValueError as e:
        return {"error": str(e)}
    # Same upper bound as the other list/search tools.
    last_n = max(1, min(last_n, _MAX_RESULTS))
    wing = f"wing_{agent_name.lower().replace(' ', '_')}"
    col = _get_collection()
    if not col:
        return _no_palace()

    try:
        # NOTE(review): a single get() capped at 10000 records; an agent with
        # more diary entries than that may not see its newest ones — confirm
        # whether paging is needed here.
        results = col.get(
            where={"$and": [{"wing": wing}, {"room": "diary"}]},
            include=["documents", "metadatas"],
            limit=10000,
        )

        if not results["ids"]:
            return {"agent": agent_name, "entries": [], "message": "No diary entries yet."}

        entries = [
            {
                "date": meta.get("date", ""),
                "timestamp": meta.get("filed_at", ""),
                "topic": meta.get("topic", ""),
                "content": doc,
            }
            for doc, meta in zip(results["documents"], results["metadatas"])
        ]

        # Newest first, then keep only the requested number.
        entries.sort(key=lambda x: x["timestamp"], reverse=True)
        entries = entries[:last_n]

        return {
            "agent": agent_name,
            "entries": entries,
            "total": len(results["ids"]),
            "showing": len(entries),
        }
    except Exception:
        logger.exception("diary_read failed")
        return {"error": "Failed to read diary entries"}
|
|
|
|
|
|
def tool_hook_settings(silent_save: bool = None, desktop_toast: bool = None):
    """
    Get or set hook behavior settings.

    - silent_save: True = stop hook saves directly (no MCP clutter),
      False = legacy blocking MCP calls. Default: True.
    - desktop_toast: True = show notify-send desktop toast on save,
      False = terminal-only notification. Default: False.

    Call with no arguments to see current settings.

    Returns:
        ``{"success": True, "settings": {...}}`` plus an ``updated`` list when
        something was changed, or ``{"success": False, "error"}`` if the
        configuration cannot be loaded.
    """
    from .config import MempalaceConfig

    try:
        config = MempalaceConfig()
    except Exception as e:
        return {"success": False, "error": str(e)}

    changed = []
    requested = (("silent_save", silent_save), ("desktop_toast", desktop_toast))
    for key, value in requested:
        if value is None:
            continue
        config.set_hook_setting(key, value)
        changed.append(f"{key} → {value}")

    # Re-read to return current state; on failure the earlier object is reported.
    try:
        config = MempalaceConfig()
    except Exception:
        pass

    result = {
        "success": True,
        "settings": {
            "silent_save": config.hook_silent_save,
            "desktop_toast": config.hook_desktop_toast,
        },
    }
    if changed:
        result["updated"] = changed
    return result
|
|
|
|
|
|
def tool_memories_filed_away():
    """Acknowledge the latest silent checkpoint. Returns a short summary.

    Reads ``~/.mempalace/hook_state/last_checkpoint`` (a JSON object with
    optional ``msgs`` and ``ts`` keys), deletes it so the same checkpoint is
    acknowledged only once, and returns a dict with ``status``, ``message``,
    ``count`` and ``timestamp``:

    - ``status == "quiet"``: no checkpoint file exists.
    - ``status == "ok"``: checkpoint read; ``count``/``timestamp`` come from it.
    - ``status == "error"``: the file was unreadable, not valid UTF-8/JSON, or
      not a JSON object; it is removed (best effort) and a generic message is
      returned.
    """
    state_dir = Path.home() / ".mempalace" / "hook_state"
    ack_file = state_dir / "last_checkpoint"
    if not ack_file.is_file():
        return {
            "status": "quiet",
            "message": "No recent journal entry",
            "count": 0,
            "timestamp": None,
        }
    try:
        data = json.loads(ack_file.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            # Valid JSON but the wrong shape (list, number, ...): treat as corrupt
            # rather than failing later on data.get().
            raise ValueError("checkpoint payload is not a JSON object")
        ack_file.unlink(missing_ok=True)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError as well
        # as the shape check above.
        try:
            ack_file.unlink(missing_ok=True)
        except OSError:
            # Cleanup is best effort; the caller still gets the error summary.
            logger.warning("Could not remove unreadable checkpoint file %s", ack_file)
        return {
            "status": "error",
            "message": "\u2726 Journal entry filed in the palace",
            "count": 0,
            "timestamp": None,
        }

    msgs = data.get("msgs", 0)
    return {
        "status": "ok",
        "message": f"\u2726 {msgs} messages tucked into drawers",
        "count": msgs,
        "timestamp": data.get("ts", None),
    }
|
|
|
|
|
|
# ==================== MCP PROTOCOL ====================
|
|
|
|
# Registry of every MCP tool this server exposes, keyed by the public tool name.
# Each entry holds:
#   "description"  - text returned to the client by tools/list
#   "input_schema" - JSON Schema returned by tools/list as "inputSchema";
#                    handle_request also uses its "properties" to drop undeclared
#                    arguments and to coerce "integer"/"number" values
#   "handler"      - the function invoked as handler(**arguments) on tools/call
TOOLS = {
    # --- Palace overview / taxonomy ---
    "mempalace_status": {
        "description": "Palace overview — total drawers, wing and room counts",
        "input_schema": {"type": "object", "properties": {}},
        "handler": tool_status,
    },
    "mempalace_list_wings": {
        "description": "List all wings with drawer counts",
        "input_schema": {"type": "object", "properties": {}},
        "handler": tool_list_wings,
    },
    "mempalace_list_rooms": {
        "description": "List rooms within a wing (or all rooms if no wing given)",
        "input_schema": {
            "type": "object",
            "properties": {
                "wing": {"type": "string", "description": "Wing to list rooms for (optional)"},
            },
        },
        "handler": tool_list_rooms,
    },
    "mempalace_get_taxonomy": {
        "description": "Full taxonomy: wing → room → drawer count",
        "input_schema": {"type": "object", "properties": {}},
        "handler": tool_get_taxonomy,
    },
    "mempalace_get_aaak_spec": {
        "description": "Get the AAAK dialect specification — the compressed memory format MemPalace uses. Call this if you need to read or write AAAK-compressed memories.",
        "input_schema": {"type": "object", "properties": {}},
        "handler": tool_get_aaak_spec,
    },
    # --- Knowledge graph ---
    "mempalace_kg_query": {
        "description": "Query the knowledge graph for an entity's relationships. Returns typed facts with temporal validity. E.g. 'Max' → child_of Alice, loves chess, does swimming. Filter by date with as_of to see what was true at a point in time.",
        "input_schema": {
            "type": "object",
            "properties": {
                "entity": {
                    "type": "string",
                    "description": "Entity to query (e.g. 'Max', 'MyProject', 'Alice')",
                },
                "as_of": {
                    "type": "string",
                    "description": "Date filter — only facts valid at this date (YYYY-MM-DD, optional)",
                },
                "direction": {
                    "type": "string",
                    "description": "outgoing (entity→?), incoming (?→entity), or both (default: both)",
                },
            },
            "required": ["entity"],
        },
        "handler": tool_kg_query,
    },
    "mempalace_kg_add": {
        "description": "Add a fact to the knowledge graph. Subject → predicate → object with optional time window. E.g. ('Max', 'started_school', 'Year 7', valid_from='2026-09-01').",
        "input_schema": {
            "type": "object",
            "properties": {
                "subject": {"type": "string", "description": "The entity doing/being something"},
                "predicate": {
                    "type": "string",
                    "description": "The relationship type (e.g. 'loves', 'works_on', 'daughter_of')",
                },
                "object": {"type": "string", "description": "The entity being connected to"},
                "valid_from": {
                    "type": "string",
                    "description": "When this became true (YYYY-MM-DD, optional)",
                },
                "source_closet": {
                    "type": "string",
                    "description": "Closet ID where this fact appears (optional)",
                },
            },
            "required": ["subject", "predicate", "object"],
        },
        "handler": tool_kg_add,
    },
    "mempalace_kg_invalidate": {
        "description": "Mark a fact as no longer true. E.g. ankle injury resolved, job ended, moved house.",
        "input_schema": {
            "type": "object",
            "properties": {
                "subject": {"type": "string", "description": "Entity"},
                "predicate": {"type": "string", "description": "Relationship"},
                "object": {"type": "string", "description": "Connected entity"},
                "ended": {
                    "type": "string",
                    "description": "When it stopped being true (YYYY-MM-DD, default: today)",
                },
            },
            "required": ["subject", "predicate", "object"],
        },
        "handler": tool_kg_invalidate,
    },
    "mempalace_kg_timeline": {
        "description": "Chronological timeline of facts. Shows the story of an entity (or everything) in order.",
        "input_schema": {
            "type": "object",
            "properties": {
                "entity": {
                    "type": "string",
                    "description": "Entity to get timeline for (optional — omit for full timeline)",
                },
            },
        },
        "handler": tool_kg_timeline,
    },
    "mempalace_kg_stats": {
        "description": "Knowledge graph overview: entities, triples, current vs expired facts, relationship types.",
        "input_schema": {"type": "object", "properties": {}},
        "handler": tool_kg_stats,
    },
    # --- Palace graph navigation ---
    "mempalace_traverse": {
        "description": "Walk the palace graph from a room. Shows connected ideas across wings — the tunnels. Like following a thread through the palace: start at 'chromadb-setup' in wing_code, discover it connects to wing_myproject (planning) and wing_user (feelings about it).",
        "input_schema": {
            "type": "object",
            "properties": {
                "start_room": {
                    "type": "string",
                    "description": "Room to start from (e.g. 'chromadb-setup', 'riley-school')",
                },
                "max_hops": {
                    "type": "integer",
                    "description": "How many connections to follow (default: 2)",
                },
            },
            "required": ["start_room"],
        },
        "handler": tool_traverse_graph,
    },
    "mempalace_find_tunnels": {
        "description": "Find rooms that bridge two wings — the hallways connecting different domains. E.g. what topics connect wing_code to wing_team?",
        "input_schema": {
            "type": "object",
            "properties": {
                "wing_a": {"type": "string", "description": "First wing (optional)"},
                "wing_b": {"type": "string", "description": "Second wing (optional)"},
            },
        },
        "handler": tool_find_tunnels,
    },
    "mempalace_graph_stats": {
        "description": "Palace graph overview: total rooms, tunnel connections, edges between wings.",
        "input_schema": {"type": "object", "properties": {}},
        "handler": tool_graph_stats,
    },
    # --- Search and duplicate detection ---
    "mempalace_search": {
        "description": "Semantic search. Returns verbatim drawer content with similarity scores. IMPORTANT: 'query' must contain ONLY search keywords. Use 'context' for background. Results with cosine distance > max_distance are filtered out.",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Short search query ONLY — keywords or a question. Max 200 chars recommended.",
                    "maxLength": 500,
                },
                "limit": {
                    "type": "integer",
                    "description": "Max results (default 5)",
                    "minimum": 1,
                    "maximum": 100,
                },
                "wing": {"type": "string", "description": "Filter by wing (optional)"},
                "room": {"type": "string", "description": "Filter by room (optional)"},
                "max_distance": {
                    "type": "number",
                    "description": "Max cosine distance threshold (0=identical, 2=opposite). Results further than this are dropped. Lower = stricter. Default 1.5. Set to 0 to disable.",
                },
                "context": {
                    "type": "string",
                    "description": "Background context for the search (optional). NOT used for embedding — only for future re-ranking.",
                },
            },
            "required": ["query"],
        },
        "handler": tool_search,
    },
    "mempalace_check_duplicate": {
        "description": "Check if content already exists in the palace before filing",
        "input_schema": {
            "type": "object",
            "properties": {
                "content": {"type": "string", "description": "Content to check"},
                "threshold": {
                    "type": "number",
                    "description": "Similarity threshold 0-1 (default 0.9)",
                },
            },
            "required": ["content"],
        },
        "handler": tool_check_duplicate,
    },
    # --- Drawer create / read / update / delete ---
    "mempalace_add_drawer": {
        "description": "File verbatim content into the palace. Checks for duplicates first.",
        "input_schema": {
            "type": "object",
            "properties": {
                "wing": {"type": "string", "description": "Wing (project name)"},
                "room": {
                    "type": "string",
                    "description": "Room (aspect: backend, decisions, meetings...)",
                },
                "content": {
                    "type": "string",
                    "description": "Verbatim content to store — exact words, never summarized",
                },
                "source_file": {"type": "string", "description": "Where this came from (optional)"},
                "added_by": {"type": "string", "description": "Who is filing this (default: mcp)"},
            },
            "required": ["wing", "room", "content"],
        },
        "handler": tool_add_drawer,
    },
    "mempalace_delete_drawer": {
        "description": "Delete a drawer by ID. Irreversible.",
        "input_schema": {
            "type": "object",
            "properties": {
                "drawer_id": {"type": "string", "description": "ID of the drawer to delete"},
            },
            "required": ["drawer_id"],
        },
        "handler": tool_delete_drawer,
    },
    "mempalace_get_drawer": {
        "description": "Fetch a single drawer by ID — returns full content and metadata.",
        "input_schema": {
            "type": "object",
            "properties": {
                "drawer_id": {"type": "string", "description": "ID of the drawer to fetch"},
            },
            "required": ["drawer_id"],
        },
        "handler": tool_get_drawer,
    },
    "mempalace_list_drawers": {
        "description": "List drawers with pagination. Optional wing/room filter. Returns IDs, wings, rooms, and content previews.",
        "input_schema": {
            "type": "object",
            "properties": {
                "wing": {"type": "string", "description": "Filter by wing (optional)"},
                "room": {"type": "string", "description": "Filter by room (optional)"},
                "limit": {
                    "type": "integer",
                    "description": "Max results per page (default 20, max 100)",
                    "minimum": 1,
                    "maximum": 100,
                },
                "offset": {
                    "type": "integer",
                    "description": "Offset for pagination (default 0)",
                    "minimum": 0,
                },
            },
        },
        "handler": tool_list_drawers,
    },
    "mempalace_update_drawer": {
        "description": "Update an existing drawer's content and/or metadata (wing, room). Fetches existing drawer first; returns error if not found.",
        "input_schema": {
            "type": "object",
            "properties": {
                "drawer_id": {"type": "string", "description": "ID of the drawer to update"},
                "content": {
                    "type": "string",
                    "description": "New content (optional — omit to keep existing)",
                },
                "wing": {
                    "type": "string",
                    "description": "New wing (optional — omit to keep existing)",
                },
                "room": {
                    "type": "string",
                    "description": "New room (optional — omit to keep existing)",
                },
            },
            "required": ["drawer_id"],
        },
        "handler": tool_update_drawer,
    },
    # --- Agent diary ---
    "mempalace_diary_write": {
        "description": "Write to your personal agent diary in AAAK format. Your observations, thoughts, what you worked on, what matters. Each agent has their own diary with full history. Write in AAAK for compression — e.g. 'SESSION:2026-04-04|built.palace.graph+diary.tools|ALC.req:agent.diaries.in.aaak|★★★'. Use entity codes from the AAAK spec.",
        "input_schema": {
            "type": "object",
            "properties": {
                "agent_name": {
                    "type": "string",
                    "description": "Your name — each agent gets their own diary wing",
                },
                "entry": {
                    "type": "string",
                    "description": "Your diary entry in AAAK format — compressed, entity-coded, emotion-marked",
                },
                "topic": {
                    "type": "string",
                    "description": "Topic tag (optional, default: general)",
                },
            },
            "required": ["agent_name", "entry"],
        },
        "handler": tool_diary_write,
    },
    "mempalace_diary_read": {
        "description": "Read your recent diary entries (in AAAK). See what past versions of yourself recorded — your journal across sessions.",
        "input_schema": {
            "type": "object",
            "properties": {
                "agent_name": {
                    "type": "string",
                    "description": "Your name — each agent gets their own diary wing",
                },
                "last_n": {
                    "type": "integer",
                    "description": "Number of recent entries to read (default: 10)",
                },
            },
            "required": ["agent_name"],
        },
        "handler": tool_diary_read,
    },
    # --- Hook settings and checkpoint acknowledgement ---
    "mempalace_hook_settings": {
        "description": (
            "Get or set hook behavior. silent_save: True = save directly "
            "(no MCP clutter), False = legacy blocking. desktop_toast: "
            "True = show desktop notification. Call with no args to view."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "silent_save": {
                    "type": "boolean",
                    "description": "True = silent direct save, False = blocking MCP calls",
                },
                "desktop_toast": {
                    "type": "boolean",
                    "description": "True = show desktop toast via notify-send",
                },
            },
        },
        "handler": tool_hook_settings,
    },
    "mempalace_memories_filed_away": {
        "description": "Check if a recent palace checkpoint was saved. Returns message count and timestamp.",
        "input_schema": {"type": "object", "properties": {}},
        "handler": tool_memories_filed_away,
    },
}
|
|
|
|
|
|
# MCP protocol revisions this server can speak, newest first. The ordering is
# significant: handle_request answers with the first entry when a client asks
# for an unsupported revision, and assumes the last entry when the client does
# not state one.
SUPPORTED_PROTOCOL_VERSIONS = ["2025-11-25", "2025-06-18", "2025-03-26", "2024-11-05"]
|
|
|
|
|
|
def _jsonrpc_error_response(req_id, code, message):
    """Build a JSON-RPC 2.0 error response envelope for *req_id*."""
    return {"jsonrpc": "2.0", "id": req_id, "error": {"code": code, "message": message}}


def _coerce_tool_arguments(tool_args, schema_props):
    """Convert arguments declared as ``integer``/``number`` to native Python numbers.

    MCP JSON transport may deliver integers as floats or strings; ChromaDB and
    Python slicing require native int.

    Returns ``(coerced_args, None)`` on success, or ``(None, key)`` where *key*
    is the first argument whose value could not be converted.
    """
    coerced = dict(tool_args)
    for key, value in tool_args.items():
        declared_type = schema_props.get(key, {}).get("type")
        try:
            if declared_type == "integer" and not isinstance(value, int):
                coerced[key] = int(value)
            elif declared_type == "number" and not isinstance(value, (int, float)):
                coerced[key] = float(value)
        except (ValueError, TypeError, OverflowError):
            # OverflowError: int(float("inf")) - json.loads accepts "Infinity".
            return None, key
    return coerced, None


def handle_request(request):
    """Dispatch one decoded JSON-RPC request and build its response.

    Supported methods: ``initialize``, ``ping``, ``tools/list``, ``tools/call``
    and any ``notifications/*`` method.

    Args:
        request: the decoded JSON-RPC message (a dict with ``method`` and
            optionally ``params`` and ``id``).

    Returns:
        A JSON-RPC response dict, or ``None`` when no response must be sent
        (``notifications/*`` methods, and unknown methods without an ``id``).
        Malformed ``params``/``arguments`` produce an error response instead of
        raising, so the client is never left waiting for a reply.
    """
    method = request.get("method") or ""
    raw_params = request.get("params")
    # Anything other than an object (e.g. positional-array params) is treated
    # as "no params" rather than crashing on .get() below.
    params = raw_params if isinstance(raw_params, dict) else {}
    req_id = request.get("id")

    if not isinstance(method, str):
        if req_id is None:
            return None
        return _jsonrpc_error_response(req_id, -32600, "Invalid Request")

    if method == "initialize":
        client_version = params.get("protocolVersion", SUPPORTED_PROTOCOL_VERSIONS[-1])
        # Echo the client's revision when we support it, otherwise offer our newest.
        negotiated = (
            client_version
            if client_version in SUPPORTED_PROTOCOL_VERSIONS
            else SUPPORTED_PROTOCOL_VERSIONS[0]
        )
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "protocolVersion": negotiated,
                "capabilities": {"tools": {}},
                "serverInfo": {"name": "mempalace", "version": __version__},
            },
        }

    if method == "ping":
        return {"jsonrpc": "2.0", "id": req_id, "result": {}}

    if method.startswith("notifications/"):
        # Notifications (no id) never get a response per JSON-RPC spec
        return None

    if method == "tools/list":
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "tools": [
                    {"name": n, "description": t["description"], "inputSchema": t["input_schema"]}
                    for n, t in TOOLS.items()
                ]
            },
        }

    if method == "tools/call":
        tool_name = params.get("name")
        # The isinstance check keeps an unhashable name (list/dict) from raising
        # TypeError in the dict membership test.
        if not isinstance(tool_name, str) or tool_name not in TOOLS:
            return _jsonrpc_error_response(req_id, -32601, f"Unknown tool: {tool_name}")

        tool_args = params.get("arguments") or {}
        if not isinstance(tool_args, dict):
            return _jsonrpc_error_response(req_id, -32602, "Tool arguments must be an object")

        # Whitelist arguments to declared schema properties only, so callers
        # cannot reach handler keyword parameters that the tool's schema does
        # not expose.
        schema_props = TOOLS[tool_name]["input_schema"].get("properties", {})
        tool_args = {k: v for k, v in tool_args.items() if k in schema_props}

        tool_args, bad_key = _coerce_tool_arguments(tool_args, schema_props)
        if bad_key is not None:
            return _jsonrpc_error_response(
                req_id, -32602, f"Invalid value for parameter '{bad_key}'"
            )

        try:
            result = TOOLS[tool_name]["handler"](**tool_args)
            return {
                "jsonrpc": "2.0",
                "id": req_id,
                "result": {"content": [{"type": "text", "text": json.dumps(result, indent=2)}]},
            }
        except Exception:
            # Details go to the log only; the client gets a generic message.
            logger.exception("Tool error in %s", tool_name)
            return _jsonrpc_error_response(req_id, -32000, "Internal tool error")

    # Notifications (missing id) must never get a response
    if req_id is None:
        return None
    return _jsonrpc_error_response(req_id, -32601, f"Unknown method: {method}")
|
|
|
|
|
|
def main():
    """Run the stdio server loop.

    Reads newline-delimited JSON requests from stdin, passes each one to
    handle_request, and writes any non-None response to stdout as a single
    JSON line. Stops on end of input or KeyboardInterrupt; any other error is
    logged and the loop moves on to the next line.
    """
    logger.info("MemPalace MCP Server starting...")
    while True:
        try:
            raw = sys.stdin.readline()
            if raw == "":
                # readline() yields an empty string only at end of input.
                break
            payload = raw.strip()
            if payload:
                reply = handle_request(json.loads(payload))
                if reply is not None:
                    sys.stdout.write(json.dumps(reply) + "\n")
                    sys.stdout.flush()
        except KeyboardInterrupt:
            break
        except Exception as e:
            logger.error(f"Server error: {e}")
|
|
|
|
|
|
# Start the stdio server loop only when this module is executed directly
# (e.g. `python -m mempalace.mcp_server`), not when it is imported.
if __name__ == "__main__":
    main()
|