merge: develop into hnsw-repair (resolve chroma.py + test_backends.py conflicts)

Develop (post-#1162 lock-plumbing era) refactored the per-open quarantine
pass into ChromaBackend._prepare_palace_for_open. This branch's
inline-expansion form added quarantine_invalid_hnsw_metadata as a third
check, plus a "discard from _quarantined_paths on inode swap" guard so
re-opens against a different physical DB re-run quarantine.

Resolution merges both:

- _prepare_palace_for_open now also calls quarantine_invalid_hnsw_metadata,
  gated by the same _quarantined_paths set.
- _client keeps the inode_changed -> _quarantined_paths.discard() guard
  before calling the helper, so a fresh inode triggers a fresh pass.
- make_client collapses to a single _prepare_palace_for_open() call.
- test_backends.py keeps both the pickle (#1285) and shutil (develop)
  imports — both are used.
This commit is contained in:
Igor Lins e Silva
2026-05-07 07:48:45 -03:00
45 changed files with 3380 additions and 165 deletions
+71
View File
@@ -0,0 +1,71 @@
"""Stdio UTF-8 reconfiguration helper for Windows entry points.
Python on Windows defaults stdio to the system ANSI codepage
(cp1252/cp1251/cp950 depending on locale), which mojibakes UTF-8 input
or output the moment a non-Latin character shows up. Every console
entry point that touches stdio needs to fix this on Windows -- the MCP
server, the CLI, the fact_checker `--stdin` mode -- so the
reconfigure code lives here in one place to keep the per-stream
errors policies aligned across them.
Per-stream errors policy is caller-chosen:
* MCP server uses ``strict`` on stdout/stderr because everything written
there is server-controlled JSON-RPC; any encode failure is a real bug
the operator wants loud.
* CLI / fact_checker use ``replace`` on stdout/stderr because they print
verbatim drawer text that may contain surrogate halves round-tripped
from filenames -- ``strict`` would crash mid-print.
* All callers use ``surrogateescape`` on stdin so a malformed byte from
a redirected file or a misbehaving client survives as a lone surrogate
the consumer's parser surfaces, instead of ``UnicodeDecodeError``
killing the read loop on the first bad byte.
"""
from __future__ import annotations
import sys
from typing import Callable, Optional
def reconfigure_stdio_utf8_on_windows(
*,
stdin_errors: str = "surrogateescape",
stdout_errors: str = "strict",
stderr_errors: str = "strict",
on_failure: Optional[Callable[[str, BaseException], None]] = None,
) -> None:
"""Reconfigure stdio to UTF-8 on Windows. No-op elsewhere.
Args:
stdin_errors: errors= policy for stdin.reconfigure().
stdout_errors: errors= policy for stdout.reconfigure().
stderr_errors: errors= policy for stderr.reconfigure().
on_failure: optional ``(stream_name, exc) -> None`` callback for
streams whose ``reconfigure`` raises (e.g. Jupyter-replaced
streams that lack the method-shape we expect). Defaults to a
``WARNING:`` line on the original sys.stderr.
"""
if sys.platform != "win32":
return
policies = (
("stdin", stdin_errors),
("stdout", stdout_errors),
("stderr", stderr_errors),
)
for name, errors in policies:
stream = getattr(sys, name, None)
reconfigure = getattr(stream, "reconfigure", None)
if reconfigure is None:
continue
try:
reconfigure(encoding="utf-8", errors=errors)
except Exception as exc: # noqa: BLE001 -- last-resort guard
if on_failure is not None:
on_failure(name, exc)
else:
print(
f"WARNING: Could not reconfigure {name} to UTF-8: {exc}",
file=sys.stderr,
)
+109 -22
View File
@@ -1,5 +1,6 @@
"""ChromaDB-backed MemPalace storage backend (RFC 001 reference implementation)."""
import contextlib
import datetime as _dt
import logging
import os
@@ -764,11 +765,58 @@ def _as_list(v: Any) -> list:
return [v]
class ChromaCollection(BaseCollection):
"""Thin adapter translating ChromaDB dict returns into typed results."""
def _close_client(client) -> None:
"""Call ``PersistentClient.close()`` if available, swallow otherwise.
def __init__(self, collection):
chromadb 1.5.x exposes ``Client.close()`` to release rust-side SQLite
file locks; older versions relied on GC. Try/except keeps forward-compat.
"""
if client is None:
return
try:
client.close()
except Exception:
logger.debug("client.close() unavailable or failed", exc_info=True)
class ChromaCollection(BaseCollection):
"""Thin adapter translating ChromaDB dict returns into typed results.
When ``palace_path`` is set, all write methods (``add``, ``upsert``,
``update``, ``delete``) acquire ``mine_palace_lock(palace_path)`` for the
duration of the underlying chromadb call. This serializes MCP and other
direct-backend writers against ``mempalace mine`` and against each other,
closing the race between concurrent writers that triggers ChromaDB's
multi-threaded HNSW corruption (#974/#965).
The lock is the same primitive used by ``miner.mine()`` so re-entrant
acquisition from inside the mine pipeline (mine -> _mine_body ->
collection.upsert) is short-circuited by the per-thread guard inside
``mine_palace_lock`` — no self-deadlock.
``palace_path=None`` disables the wrapping, preserving the legacy
no-lock behaviour for callers that construct a ``ChromaCollection``
directly without going through ``ChromaBackend``.
"""
def __init__(self, collection, palace_path: Optional[str] = None):
self._collection = collection
self._palace_path = palace_path
@contextlib.contextmanager
def _write_lock(self):
"""Acquire ``mine_palace_lock`` for the configured palace, if any.
No-op (yields immediately) when ``self._palace_path`` is None.
"""
if self._palace_path is None:
yield
return
# Late import — palace.py imports ChromaBackend from this module.
from ..palace import mine_palace_lock
with mine_palace_lock(self._palace_path):
yield
# ------------------------------------------------------------------
# Writes
@@ -780,7 +828,8 @@ class ChromaCollection(BaseCollection):
kwargs["metadatas"] = metadatas
if embeddings is not None:
kwargs["embeddings"] = embeddings
self._collection.add(**kwargs)
with self._write_lock():
self._collection.add(**kwargs)
def upsert(self, *, documents, ids, metadatas=None, embeddings=None):
kwargs: dict[str, Any] = {"documents": documents, "ids": ids}
@@ -788,7 +837,8 @@ class ChromaCollection(BaseCollection):
kwargs["metadatas"] = metadatas
if embeddings is not None:
kwargs["embeddings"] = embeddings
self._collection.upsert(**kwargs)
with self._write_lock():
self._collection.upsert(**kwargs)
def update(
self,
@@ -807,7 +857,8 @@ class ChromaCollection(BaseCollection):
kwargs["metadatas"] = metadatas
if embeddings is not None:
kwargs["embeddings"] = embeddings
self._collection.update(**kwargs)
with self._write_lock():
self._collection.update(**kwargs)
# ------------------------------------------------------------------
# Reads
@@ -951,7 +1002,8 @@ class ChromaCollection(BaseCollection):
kwargs["ids"] = ids
if where is not None:
kwargs["where"] = where
self._collection.delete(**kwargs)
with self._write_lock():
self._collection.delete(**kwargs)
def count(self):
return self._collection.count()
@@ -1065,7 +1117,7 @@ class ChromaBackend(BaseBackend):
db_path = os.path.join(palace_path, "chroma.sqlite3")
# DB was present when cache was built but is now missing → invalidate.
if cached is not None and not os.path.isfile(db_path):
self._clients.pop(palace_path, None)
_close_client(self._clients.pop(palace_path, None))
self._freshness.pop(palace_path, None)
cached = None
cached_inode, cached_mtime = 0, 0.0
@@ -1081,13 +1133,14 @@ class ChromaBackend(BaseBackend):
)
if cached is None or inode_changed or mtime_changed or mtime_appeared:
_fix_blob_seq_ids(palace_path)
# An inode swap means we are reopening a different physical DB
# (post-restore, fresh palace at the same path, etc.); drop the
# per-process gate so the quarantine pre-checks run again
# against the new disk state instead of trusting cached "we
# already cleaned this path" credit from the prior inode.
if inode_changed:
ChromaBackend._quarantined_paths.discard(palace_path)
if palace_path not in ChromaBackend._quarantined_paths:
quarantine_invalid_hnsw_metadata(palace_path)
quarantine_stale_hnsw(palace_path)
ChromaBackend._quarantined_paths.add(palace_path)
ChromaBackend._prepare_palace_for_open(palace_path)
cached = chromadb.PersistentClient(path=palace_path)
self._clients[palace_path] = cached
# Re-stat after the client constructor runs: chromadb creates
@@ -1123,6 +1176,36 @@ class ChromaBackend(BaseBackend):
# property; locking would add cost without correctness gain.
_quarantined_paths: set[str] = set()
@staticmethod
def _prepare_palace_for_open(palace_path: str) -> None:
"""Run the pre-open safety pass shared by :meth:`make_client` and
:meth:`_client`.
Three steps, all required before constructing a ``PersistentClient``:
1. ``_fix_blob_seq_ids`` — repairs the BLOB seq_id quirk that bites
certain chromadb migrations.
2. ``quarantine_invalid_hnsw_metadata`` — renames aside any HNSW
``index_metadata.pickle`` that fails to load, so chromadb opens
against an empty index instead of crashing on the unloadable
pickle (#1266 / PR #1285).
3. ``quarantine_stale_hnsw`` — also gated by :attr:`_quarantined_paths`
so it fires once per palace per process. This is the SIGSEGV
prevention path for stale HNSW segments (see #1121, #1132, #1263);
wiring it through this helper means CLI mining, search, repair,
and status all benefit, not just the legacy ``make_client``
callers.
Idempotent: safe to call from any code path that is about to open or
re-open a palace. The ``_quarantined_paths`` gate prevents thrash on
hot paths (e.g. ``_client()`` is called on every backend operation).
"""
_fix_blob_seq_ids(palace_path)
if palace_path not in ChromaBackend._quarantined_paths:
quarantine_invalid_hnsw_metadata(palace_path)
quarantine_stale_hnsw(palace_path)
ChromaBackend._quarantined_paths.add(palace_path)
@staticmethod
def make_client(palace_path: str):
"""Create a fresh ``PersistentClient`` (fixes BLOB seq_ids first).
@@ -1135,11 +1218,7 @@ class ChromaBackend(BaseBackend):
:attr:`_quarantined_paths` for the rationale (cold-start protection
vs. runtime thrash on steady-write daemons).
"""
_fix_blob_seq_ids(palace_path)
if palace_path not in ChromaBackend._quarantined_paths:
quarantine_invalid_hnsw_metadata(palace_path)
quarantine_stale_hnsw(palace_path)
ChromaBackend._quarantined_paths.add(palace_path)
ChromaBackend._prepare_palace_for_open(palace_path)
return chromadb.PersistentClient(path=palace_path)
@staticmethod
@@ -1205,17 +1284,25 @@ class ChromaBackend(BaseBackend):
else:
collection = client.get_collection(collection_name, **ef_kwargs)
_pin_hnsw_threads(collection)
return ChromaCollection(collection)
return ChromaCollection(collection, palace_path=palace_path)
def close_palace(self, palace) -> None:
"""Drop cached handles for ``palace``. Accepts ``PalaceRef`` or legacy path str."""
"""Drop cached handles for ``palace`` and release its SQLite file lock.
Accepts ``PalaceRef`` or legacy path str. chromadb's rust-side file
lock is held until ``PersistentClient.close()`` is called, so plain
dict eviction would leave the palace path unreopenable and
unremovable in the same process.
"""
path = palace.local_path if isinstance(palace, PalaceRef) else palace
if path is None:
return
self._clients.pop(path, None)
_close_client(self._clients.pop(path, None))
self._freshness.pop(path, None)
def close(self) -> None:
for client in self._clients.values():
_close_client(client)
self._clients.clear()
self._freshness.clear()
self._closed = True
@@ -1256,7 +1343,7 @@ class ChromaBackend(BaseBackend):
},
**ef_kwargs,
)
return ChromaCollection(collection)
return ChromaCollection(collection, palace_path=palace_path)
def _normalize_get_collection_args(args, kwargs):
+28 -4
View File
@@ -232,6 +232,13 @@ def cmd_init(args):
from .project_scanner import discover_entities
from .room_detector_local import detect_rooms_local
# Honor --palace (issue #1313): without this, init silently ignored the
# flag and always used ~/.mempalace. Mirror the env-var pattern used by
# mcp_server.py so every downstream read of ``cfg.palace_path`` (Pass 0,
# cfg.init(), the post-init mine) routes to the user-specified location.
if getattr(args, "palace", None):
os.environ["MEMPALACE_PALACE_PATH"] = os.path.abspath(os.path.expanduser(args.palace))
cfg = MempalaceConfig()
# Resolve entity-detection languages: --lang overrides config.
@@ -310,8 +317,7 @@ def cmd_init(args):
)
except LLMError as e:
print(
f" LLM init failed ({e}). "
f"Running heuristics-only — pass --no-llm to silence this."
f" LLM init failed ({e}). Running heuristics-only — pass --no-llm to silence this."
)
# Pass 0: detect whether the corpus is AI-dialogue. Writes
@@ -912,7 +918,7 @@ def cmd_compress(args):
# Store compressed versions (unless dry-run)
if not args.dry_run:
try:
comp_col = backend.get_or_create_collection(palace_path, "mempalace_compressed")
comp_col = backend.get_or_create_collection(palace_path, "mempalace_closets")
for doc_id, compressed, meta, stats in compressed_entries:
comp_meta = dict(meta)
comp_meta["compression_ratio"] = round(stats["size_ratio"], 1)
@@ -923,7 +929,7 @@ def cmd_compress(args):
metadatas=[comp_meta],
)
print(
f" Stored {len(compressed_entries)} compressed drawers in 'mempalace_compressed' collection."
f" Stored {len(compressed_entries)} compressed drawers in 'mempalace_closets' collection."
)
except Exception as e:
print(f" Error storing compressed drawers: {e}")
@@ -939,7 +945,25 @@ def cmd_compress(args):
print(" (dry run -- nothing stored)")
def _reconfigure_stdio_utf8_on_windows():
"""Decode stdio as UTF-8 on Windows for the primary `mempalace` CLI.
Thin wrapper around the shared helper in ``mempalace._stdio``. The CLI
overrides stdout/stderr to ``replace`` because ``mempalace search``
prints verbatim drawer text that may carry surrogate halves
round-tripped from filenames -- ``strict`` would crash mid-print and
lose the rest of the search result block. stdin keeps the default
``surrogateescape`` so a redirected non-UTF-8 file does not kill the
read on the first bad byte.
"""
from ._stdio import reconfigure_stdio_utf8_on_windows
reconfigure_stdio_utf8_on_windows(stdout_errors="replace", stderr_errors="replace")
def main():
_reconfigure_stdio_utf8_on_windows()
version_label = f"MemPalace {__version__}"
parser = argparse.ArgumentParser(
description="MemPalace — Give your AI a memory. No API key required.",
+31 -11
View File
@@ -40,6 +40,7 @@ import json
import os
import re
import time
import urllib.parse
import urllib.request
import urllib.error
from datetime import datetime
@@ -101,6 +102,14 @@ class LLMConfig:
self.endpoint = (endpoint or os.environ.get("LLM_ENDPOINT", "")).rstrip("/")
self.key = key or os.environ.get("LLM_KEY", "")
self.model = model or os.environ.get("LLM_MODEL", "")
if self.endpoint:
# Privacy-by-architecture: reject file:// and other non-HTTP schemes
# so a misconfigured endpoint cannot exfiltrate local files.
scheme = urllib.parse.urlparse(self.endpoint).scheme.lower()
if scheme not in ("http", "https"):
raise ValueError(
f"LLM_ENDPOINT must use http:// or https:// (got scheme {scheme!r})"
)
def missing(self) -> list:
missing = []
@@ -221,17 +230,28 @@ def regenerate_closets(
print("No drawers in palace.")
return {"processed": 0}
all_data = drawers_col.get(limit=total, include=["documents", "metadatas"])
by_source = {}
for doc_id, doc, meta in zip(all_data["ids"], all_data["documents"], all_data["metadatas"]):
source = meta.get("source_file", "unknown")
w = meta.get("wing", "")
if wing and w != wing:
continue
if source not in by_source:
by_source[source] = {"drawer_ids": [], "content": [], "meta": meta}
by_source[source]["drawer_ids"].append(doc_id)
by_source[source]["content"].append(doc)
# Paginate the fetch — a single get(limit=total, ...) blows through
# SQLite's SQLITE_MAX_VARIABLE_NUMBER (32766) on large palaces and
# crashes inside chromadb (see #802, #850, #1073).
by_source: dict = {}
batch_size = 5000
offset = 0
while offset < total:
batch = drawers_col.get(limit=batch_size, offset=offset, include=["documents", "metadatas"])
ids = batch["ids"]
if not ids:
break
for doc_id, doc, meta in zip(ids, batch["documents"], batch["metadatas"]):
meta = meta or {}
source = meta.get("source_file", "unknown")
w = meta.get("wing", "")
if wing and w != wing:
continue
if source not in by_source:
by_source[source] = {"drawer_ids": [], "content": [], "meta": meta}
by_source[source]["drawer_ids"].append(doc_id)
by_source[source]["content"].append(doc)
offset += len(ids)
sources = list(by_source.keys())
if sample > 0:
+32
View File
@@ -81,6 +81,38 @@ def sanitize_kg_value(value: str, field_name: str = "value") -> str:
return value
# ISO-8601 date validator for knowledge-graph temporal parameters
# (as_of, valid_from, valid_to, ended). Parameterized queries already
# prevent SQL injection, but unvalidated date strings silently miss
# every row — callers cannot distinguish "no fact at this time" from
# "your date format was unrecognized." Require full YYYY-MM-DD: KG
# queries compare TEXT dates lexicographically, so partials like "2026"
# would re-introduce silent empty results (e.g. "2026-01-01" <= "2026"
# is False), defeating the purpose of validation.
_ISO_DATE_RE = re.compile(r"^\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12]\d|3[01])$")
def sanitize_iso_date(value, field_name: str = "date"):
"""Validate an ISO-8601 date string, accepting None or empty as-is.
Accepts only ``YYYY-MM-DD``. Raises ValueError on any other
non-empty input so the MCP layer can surface a clear error to the
caller instead of silently returning empty results. Partial dates
(``YYYY``, ``YYYY-MM``) are rejected because KG queries compare
TEXT dates lexicographically and would silently exclude valid facts.
"""
if value is None or value == "":
return value
if not isinstance(value, str):
raise ValueError(f"{field_name} must be a string")
value = value.strip()
if not _ISO_DATE_RE.match(value):
raise ValueError(
f"{field_name}={value!r} is not a valid ISO-8601 date " f"(expected YYYY-MM-DD)"
)
return value
def sanitize_content(value: str, max_length: int = 100_000) -> str:
"""Validate drawer/diary content length."""
if not isinstance(value, str) or not value.strip():
+4 -1
View File
@@ -11,6 +11,7 @@ Same palace as project mining. Different ingest strategy.
import os
import sys
import hashlib
import logging
from pathlib import Path
from datetime import datetime
from collections import defaultdict
@@ -24,6 +25,8 @@ from .palace import (
mine_lock,
)
logger = logging.getLogger("mempalace_mcp")
# Cached hall keywords — avoids re-reading config per drawer
_HALL_KEYWORDS_CACHE = None
@@ -331,7 +334,7 @@ def _file_chunks_locked(collection, source_file, chunks, wing, room, agent, extr
try:
collection.delete(where={"source_file": source_file})
except Exception:
pass
logger.debug("Stale-drawer purge failed for %s", source_file, exc_info=True)
# Batch chunks into bounded upserts so large transcripts keep most of
# the embedding speedup without one huge Chroma/SQLite request. Keep
+1 -1
View File
@@ -89,7 +89,7 @@ def dedup_source_group(col, drawer_ids, threshold=DEFAULT_THRESHOLD, dry_run=Tru
kept = []
to_delete = []
for did, doc, meta in items:
for did, doc, _meta in items:
if not doc or len(doc) < 20:
to_delete.append(did)
continue
+1 -1
View File
@@ -873,7 +873,7 @@ class Dialect:
for date_key in sorted(by_date.keys()):
lines.append(f"=MOMENTS[{date_key}]=")
for z, fnum in by_date[date_key]:
for z, _fnum in by_date[date_key]:
entities = []
for p in z.get("people", []):
code = self.encode_entity(p)
+27 -2
View File
@@ -16,6 +16,7 @@ Usage:
"""
import json
import os
import re
import urllib.request
import urllib.parse
@@ -320,11 +321,35 @@ class EntityRegistry:
self._path.parent.chmod(0o700)
except (OSError, NotImplementedError):
pass
self._path.write_text(json.dumps(self._data, indent=2), encoding="utf-8")
# Atomic write: serialize to a sibling temp file in the same dir
# (so os.replace stays on one filesystem), fsync, then rename over
# the target. A crash mid-write leaves the previous registry intact
# instead of a half-written file or an empty file from the truncate.
payload = json.dumps(self._data, indent=2)
tmp_path = self._path.with_name(self._path.name + ".tmp")
with open(tmp_path, "w", encoding="utf-8") as f:
f.write(payload)
f.flush()
os.fsync(f.fileno())
try:
self._path.chmod(0o600)
tmp_path.chmod(0o600)
except (OSError, NotImplementedError):
pass
os.replace(tmp_path, self._path)
# On ext4 (and similar) the rename's durability across power loss
# requires an additional fsync on the parent directory. Without it,
# the kernel can ack the rename and a crash reverts to the state
# where the temp file is present and the target is at the old version.
try:
dir_fd = os.open(str(self._path.parent), os.O_RDONLY)
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
except OSError:
# Windows and some special filesystems reject directory fds — they
# have different durability semantics on rename anyway.
pass
@staticmethod
def _empty() -> dict:
+20
View File
@@ -27,6 +27,7 @@ Usage:
from __future__ import annotations
import logging
import os
import re
from datetime import datetime, timezone
@@ -35,6 +36,8 @@ from datetime import datetime, timezone
# ~/.mempalace/known_entities.json on every check_text call.
from .miner import _load_known_entities_raw
logger = logging.getLogger("mempalace_mcp")
# Narrow detection patterns — parse "X is Y's Z" and "X's Z is Y".
# Names are captured greedily as word sequences (letters + optional
@@ -214,6 +217,7 @@ def _check_kg_contradictions(text: str, palace_path: str) -> list:
try:
facts = kg.query_entity(subject, direction="outgoing")
except Exception:
logger.debug("KG lookup failed for subject %r", subject, exc_info=True)
continue
if not facts:
continue
@@ -303,11 +307,27 @@ def _edit_distance(s1: str, s2: str) -> int:
return prev[-1]
def _reconfigure_stdio_utf8_on_windows():
"""Decode --stdin payload as UTF-8 on Windows.
Thin wrapper around the shared helper in ``mempalace._stdio``. Mirrors
the primary CLI policy: stdout/stderr use ``replace`` because
extracted fact text can include surrogate halves round-tripped from
filenames -- ``strict`` would raise UnicodeEncodeError mid-print.
stdin keeps the default ``surrogateescape``.
"""
from ._stdio import reconfigure_stdio_utf8_on_windows
reconfigure_stdio_utf8_on_windows(stdout_errors="replace", stderr_errors="replace")
if __name__ == "__main__":
import argparse
import json
import sys
_reconfigure_stdio_utf8_on_windows()
parser = argparse.ArgumentParser(
description="Check text against known facts in the MemPalace palace.",
epilog="Exits 0 when no issues found, 1 when one or more issues detected.",
+28
View File
@@ -16,6 +16,23 @@ from pathlib import Path
SAVE_INTERVAL = 15
STATE_DIR = Path.home() / ".mempalace" / "hook_state"
PALACE_ROOT = Path.home() / ".mempalace"
def _palace_root_exists() -> bool:
"""User-removable kill-switch.
If ~/.mempalace/ does not exist, the user has explicitly cleared it.
All hook side effects (logging, state dir creation, mining, ingestion)
must respect this and short-circuit BEFORE touching disk — including
before logging the short-circuit itself.
Uses ``is_dir()`` rather than ``exists()`` so a stray regular file at
``~/.mempalace`` (or a broken symlink) is treated as absent — otherwise
the kill-switch would be bypassed and ``STATE_DIR.mkdir()`` would later
crash on ``NotADirectoryError``.
"""
return PALACE_ROOT.is_dir()
def _mempalace_python() -> str:
@@ -142,6 +159,8 @@ _state_dir_initialized = False
def _log(message: str):
"""Append to hook state log file."""
if not _palace_root_exists():
return # User removed the palace; do not recreate by logging
global _state_dir_initialized
try:
if not _state_dir_initialized:
@@ -550,6 +569,9 @@ def _wing_from_transcript_path(transcript_path: str) -> str:
def hook_stop(data: dict, harness: str):
"""Stop hook: block every N messages for auto-save."""
if not _palace_root_exists():
_output({})
return
parsed = _parse_harness_input(data, harness)
session_id = parsed["session_id"]
stop_hook_active = parsed["stop_hook_active"]
@@ -659,6 +681,9 @@ def hook_stop(data: dict, harness: str):
def hook_session_start(data: dict, harness: str):
"""Session start hook: initialize session tracking state."""
if not _palace_root_exists():
_output({})
return
parsed = _parse_harness_input(data, harness)
session_id = parsed["session_id"]
@@ -673,6 +698,9 @@ def hook_session_start(data: dict, harness: str):
def hook_precompact(data: dict, harness: str):
"""Precompact hook: mine transcript synchronously, then allow compaction."""
if not _palace_root_exists():
_output({})
return
parsed = _parse_harness_input(data, harness)
session_id = parsed["session_id"]
transcript_path = parsed["transcript_path"]
+9
View File
@@ -171,6 +171,15 @@ class KnowledgeGraph:
add_triple("Max", "does", "swimming", valid_from="2025-01-01")
add_triple("Alice", "worried_about", "Max injury", valid_from="2026-01", valid_to="2026-02")
"""
# Reject inverted intervals: a triple with valid_to < valid_from
# would never satisfy `valid_from <= as_of AND valid_to >= as_of`,
# so it would be invisible to every query — silently corrupt.
if valid_from is not None and valid_to is not None and valid_to < valid_from:
raise ValueError(
f"valid_to={valid_to!r} is before valid_from={valid_from!r}; "
"an inverted interval would be invisible to every KG query"
)
sub_id = self._entity_id(subject)
obj_id = self._entity_id(obj)
pred = predicate.lower().replace(" ", "_")
+6 -2
View File
@@ -124,6 +124,8 @@ class Layer1:
# Score each drawer: prefer high importance, recent filing
scored = []
for doc, meta in zip(docs, metas):
meta = meta or {}
doc = doc or ""
importance = 3
# Try multiple metadata keys that might carry weight info
for key in ("importance", "emotional_weight", "weight"):
@@ -155,7 +157,7 @@ class Layer1:
lines.append(room_line)
total_len += len(room_line)
for imp, meta, doc in entries:
for _imp, meta, doc in entries:
source = Path(meta.get("source_file", "")).name if meta.get("source_file") else ""
# Truncate doc to keep L1 compact
@@ -222,6 +224,8 @@ class Layer2:
lines = [f"## L2 — ON-DEMAND ({len(docs)} drawers)"]
for doc, meta in zip(docs[:n_results], metas[:n_results]):
meta = meta or {}
doc = doc or ""
room_name = meta.get("room", "?")
source = Path(meta.get("source_file", "")).name if meta.get("source_file") else ""
snippet = doc.strip().replace("\n", " ")
@@ -283,7 +287,7 @@ class Layer3:
for i, (doc, meta, dist) in enumerate(zip(docs, metas, dists), 1):
meta = meta or {}
doc = doc or ""
similarity = round(1 - dist, 3)
similarity = round(max(0.0, 1 - dist), 3)
wing_name = meta.get("wing", "?")
room_name = meta.get("room", "?")
source = Path(meta.get("source_file", "")).name if meta.get("source_file") else ""
+302 -80
View File
@@ -46,8 +46,10 @@ import argparse # noqa: E402 (deferred until after stdio protection above)
import json # noqa: E402
import logging # noqa: E402
import hashlib # noqa: E402
import sqlite3 # noqa: E402
import threading # noqa: E402
import time # noqa: E402
from datetime import datetime # noqa: E402
from datetime import date, datetime # noqa: E402
from pathlib import Path # noqa: E402
from .config import ( # noqa: E402
@@ -55,6 +57,7 @@ from .config import ( # noqa: E402
sanitize_kg_value,
sanitize_name,
sanitize_content,
sanitize_iso_date,
)
from .version import __version__ # noqa: E402
from chromadb.errors import NotFoundError as _ChromaNotFoundError # noqa: E402
@@ -78,7 +81,7 @@ from .palace_graph import ( # noqa: E402
follow_tunnels,
)
from .knowledge_graph import KnowledgeGraph # noqa: E402
from .knowledge_graph import KnowledgeGraph, DEFAULT_KG_PATH # noqa: E402
logging.basicConfig(level=logging.INFO, format="%(message)s", stream=sys.stderr)
logger = logging.getLogger("mempalace_mcp")
@@ -103,12 +106,61 @@ if _args.palace:
os.environ["MEMPALACE_PALACE_PATH"] = os.path.abspath(_args.palace)
_config = MempalaceConfig()
# Only override KG path when --palace is explicitly provided; otherwise use
# KnowledgeGraph's default (~/.mempalace/knowledge_graph.sqlite3).
if _args.palace:
_kg = KnowledgeGraph(db_path=os.path.join(_config.palace_path, "knowledge_graph.sqlite3"))
else:
_kg = KnowledgeGraph()
_kg_by_path: dict[str, KnowledgeGraph] = {}
_kg_cache_lock = threading.Lock()
_palace_flag_given: bool = bool(_args.palace)
def _resolve_kg_path() -> str:
if _palace_flag_given:
return os.path.join(_config.palace_path, "knowledge_graph.sqlite3")
return DEFAULT_KG_PATH
def _get_kg() -> KnowledgeGraph:
path = os.path.abspath(_resolve_kg_path())
kg = _kg_by_path.get(path)
if kg is not None:
return kg
with _kg_cache_lock:
kg = _kg_by_path.get(path)
if kg is None:
kg = KnowledgeGraph(db_path=path)
_kg_by_path[path] = kg
return kg
def _call_kg(op):
"""Run ``op(kg)`` against the cached KG with one-shot retry on close.
Race we're guarding against: a handler grabs ``kg = _get_kg()`` and is
about to call ``kg.add_triple(...)`` when ``tool_reconnect`` fires on
another thread, drains ``_kg_by_path``, and closes the underlying
sqlite3.Connection. The handler's call then raises
``sqlite3.ProgrammingError: Cannot operate on a closed database`` and
bubbles up as a -32000 to the MCP client even though the user just
asked for a reconnect.
Catch that single class of error, evict the stale entry from the
cache (only if it still points at the closed instance — another
thread may have already replaced it), and try once more with a fresh
KG. Beyond one retry give up: a second close means we're losing a
sustained race we won't win in this loop, and a hung loop is worse
than a clear failure surface.
"""
for attempt in range(2):
kg = _get_kg()
try:
return op(kg)
except sqlite3.ProgrammingError:
if attempt == 0:
path = os.path.abspath(_resolve_kg_path())
with _kg_cache_lock:
if _kg_by_path.get(path) is kg:
_kg_by_path.pop(path, None)
continue
raise
_client_cache = None
@@ -274,47 +326,94 @@ def _get_client():
def _get_collection(create=False):
"""Return the ChromaDB collection, caching the client between calls."""
global _collection_cache, _metadata_cache, _metadata_cache_time
try:
client = _get_client()
if create:
# hnsw:num_threads=1 disables ChromaDB's multi-threaded ParallelFor
# HNSW insert path, which has a race in repairConnectionsForUpdate /
# addPoint (see issues #974, #965). Set via metadata on fresh
# collections and re-applied via _pin_hnsw_threads() for legacy
# palaces whose collections were created before this fix (the
# runtime config does not persist cross-process in chromadb 1.5.x,
# so the retrofit runs every time _get_collection opens a cache).
#
# ChromaDB 1.5.x's Rust binding SIGSEGVs when get_or_create_collection
# is called with metadata that differs from what's stored. The split
# below skips the metadata-comparison codepath for existing
# collections, mirroring the backend-layer fix from #1262.
try:
raw = client.get_collection(_config.collection_name)
except _ChromaNotFoundError:
raw = client.create_collection(
_config.collection_name,
metadata={
"hnsw:space": "cosine",
"hnsw:num_threads": 1,
**_HNSW_BLOAT_GUARD,
},
)
_pin_hnsw_threads(raw)
_collection_cache = ChromaCollection(raw)
_metadata_cache = None
_metadata_cache_time = 0
elif _collection_cache is None:
raw = client.get_collection(_config.collection_name)
_pin_hnsw_threads(raw)
_collection_cache = ChromaCollection(raw)
_metadata_cache = None
_metadata_cache_time = 0
return _collection_cache
except Exception:
return None
"""Return the ChromaDB collection, caching the client between calls.
On failure, log the exception and retry once after clearing the client
and collection caches. Tools were silently returning ``None`` when a
cached client/collection went stale — typically after the chromadb
rust bindings invalidated a handle following an out-of-band write —
leaving the LLM with no diagnostic and no recovery path. The retry
forces ``_get_client()`` to rebuild from scratch (which re-runs
``quarantine_stale_hnsw`` per #1322), so the second attempt heals the
common stale-handle / stale-HNSW case automatically.
"""
global _client_cache, _collection_cache, _metadata_cache, _metadata_cache_time
for attempt in range(2):
try:
client = _get_client()
# ChromaDB 1.x persists the EF *identity* (its ``name()``) with the
# collection but not the EF *instance/configuration*. So a reader or
# writer that omits ``embedding_function=`` silently gets chromadb's
# built-in ``DefaultEmbeddingFunction`` — its ``name()`` matches the
# one we spoof in ``mempalace.embedding`` (both report ``"default"``,
# the identity check passes), but the *provider list* is chromadb's
# default rather than the user's resolved device. On bleeding-edge
# interpreters (#1299: python 3.14 + chromadb 1.5.x on Apple Silicon)
# that default provider selection can SIGSEGV the host process on
# first ``col.add()``. The miner / Stop hook ingest path avoids this
# because it routes through ``ChromaBackend.get_collection``, which
# resolves the EF via ``ChromaBackend._resolve_embedding_function``;
# the MCP server bypassed that abstraction. Resolve the EF inside the
# branches that actually open a collection so warm-cache reads stay
# zero-cost. Reuse the backend helper so the two call sites can't
# drift on logging or fallback semantics.
if create:
ef = ChromaBackend._resolve_embedding_function()
ef_kwargs = {"embedding_function": ef} if ef is not None else {}
# hnsw:num_threads=1 disables ChromaDB's multi-threaded ParallelFor
# HNSW insert path, which has a race in repairConnectionsForUpdate /
# addPoint (see issues #974, #965). Set via metadata on fresh
# collections and re-applied via _pin_hnsw_threads() for legacy
# palaces whose collections were created before this fix (the
# runtime config does not persist cross-process in chromadb 1.5.x,
# so the retrofit runs every time _get_collection opens a cache).
#
# ChromaDB 1.5.x's Rust binding SIGSEGVs when get_or_create_collection
# is called with metadata that differs from what's stored. The split
# below skips the metadata-comparison codepath for existing
# collections, mirroring the backend-layer fix from #1262.
try:
raw = client.get_collection(_config.collection_name, **ef_kwargs)
except _ChromaNotFoundError:
raw = client.create_collection(
_config.collection_name,
metadata={
"hnsw:space": "cosine",
"hnsw:num_threads": 1,
**_HNSW_BLOAT_GUARD,
},
**ef_kwargs,
)
_pin_hnsw_threads(raw)
_collection_cache = ChromaCollection(raw, palace_path=_config.palace_path)
_metadata_cache = None
_metadata_cache_time = 0
elif _collection_cache is None:
ef = ChromaBackend._resolve_embedding_function()
ef_kwargs = {"embedding_function": ef} if ef is not None else {}
raw = client.get_collection(_config.collection_name, **ef_kwargs)
_pin_hnsw_threads(raw)
_collection_cache = ChromaCollection(raw, palace_path=_config.palace_path)
_metadata_cache = None
_metadata_cache_time = 0
return _collection_cache
except Exception:
logger.exception(
"_get_collection attempt %d/2 failed (palace=%s, create=%s)",
attempt + 1,
_config.palace_path,
create,
)
if attempt == 0:
# Reset all caches so the next attempt forces _get_client()
# to rebuild the chromadb client from scratch — that path
# re-runs quarantine_stale_hnsw (#1322) and reopens the
# collection cleanly, healing the common stale-handle case.
_client_cache = None
_collection_cache = None
_metadata_cache = None
_metadata_cache_time = 0
return None
def _no_palace():
@@ -433,7 +532,6 @@ def _tool_status_via_sqlite() -> dict:
"total_drawers": total,
"wings": wings,
"rooms": rooms,
"palace_path": _config.palace_path,
"protocol": PALACE_PROTOCOL,
"aaak_dialect": AAAK_SPEC,
"vector_disabled": True,
@@ -472,7 +570,6 @@ def tool_status():
"total_drawers": count,
"wings": wings,
"rooms": rooms,
"palace_path": _config.palace_path,
"protocol": PALACE_PROTOCOL,
"aaak_dialect": AAAK_SPEC,
}
@@ -656,7 +753,7 @@ def tool_check_duplicate(content: str, threshold: float = 0.9):
"vector_disabled": True,
"vector_disabled_reason": _vector_disabled_reason,
"hint": (
"duplicate detection requires vector search; run " "`mempalace repair` to restore"
"duplicate detection requires vector search; run `mempalace repair` to restore"
),
}
try:
@@ -669,10 +766,12 @@ def tool_check_duplicate(content: str, threshold: float = 0.9):
if results["ids"] and results["ids"][0]:
for i, drawer_id in enumerate(results["ids"][0]):
dist = results["distances"][0][i]
similarity = round(1 - dist, 3)
similarity = round(max(0.0, 1 - dist), 3)
if similarity >= threshold:
meta = results["metadatas"][0][i]
doc = results["documents"][0][i]
# Chroma 1.5.x can return None for partially-flushed rows;
# coerce to empty sentinels so downstream .get() is safe.
meta = results["metadatas"][0][i] or {}
doc = results["documents"][0][i] or ""
duplicates.append(
{
"id": drawer_id,
@@ -827,7 +926,7 @@ def tool_add_drawer(
if existing and existing["ids"]:
return {"success": True, "reason": "already_exists", "drawer_id": drawer_id}
except Exception:
pass
logger.debug("Idempotency pre-check failed for %s", drawer_id, exc_info=True)
try:
col.upsert(
@@ -893,12 +992,21 @@ def tool_get_drawer(drawer_id: str):
return {"error": f"Drawer not found: {drawer_id}"}
meta = result["metadatas"][0]
doc = result["documents"][0]
# source_file is the absolute filesystem path written by the
# miners. Reduce to its basename before handing it to the MCP
# client — same threat model as the palace_path leak fix:
# nested-agent / multi-server topologies treat the client as a
# separate trust domain. Basename preserves citation utility.
# Mirrors the searcher.search_memories() return shape.
safe_meta = dict(meta) if meta else {}
if safe_meta.get("source_file"):
safe_meta["source_file"] = Path(safe_meta["source_file"]).name
return {
"drawer_id": drawer_id,
"content": doc,
"wing": meta.get("wing", ""),
"room": meta.get("room", ""),
"metadata": meta,
"wing": safe_meta.get("wing", ""),
"room": safe_meta.get("room", ""),
"metadata": safe_meta,
}
except Exception as e:
return {"error": str(e)}
@@ -933,6 +1041,13 @@ def tool_list_drawers(wing: str = None, room: str = None, limit: int = 20, offse
kwargs["where"] = where
result = col.get(**kwargs)
# Compute total matching drawers for pagination.
if where:
total_result = col.get(where=where, include=[])
total = len(total_result["ids"])
else:
total = col.count()
drawers = []
for i, did in enumerate(result["ids"]):
meta = result["metadatas"][i]
@@ -947,6 +1062,7 @@ def tool_list_drawers(wing: str = None, room: str = None, limit: int = 20, offse
)
return {
"drawers": drawers,
"total": total,
"count": len(drawers),
"offset": offset,
"limit": limit,
@@ -1031,22 +1147,41 @@ def tool_kg_query(entity: str, as_of: str = None, direction: str = "both"):
"""Query the knowledge graph for an entity's relationships."""
try:
entity = sanitize_kg_value(entity, "entity")
as_of = sanitize_iso_date(as_of, "as_of")
except ValueError as e:
return {"error": str(e)}
if direction not in ("outgoing", "incoming", "both"):
return {"error": "direction must be 'outgoing', 'incoming', or 'both'"}
results = _kg.query_entity(entity, as_of=as_of, direction=direction)
results = _call_kg(lambda kg: kg.query_entity(entity, as_of=as_of, direction=direction))
return {"entity": entity, "as_of": as_of, "facts": results, "count": len(results)}
def tool_kg_add(
subject: str, predicate: str, object: str, valid_from: str = None, source_closet: str = None
subject: str,
predicate: str,
object: str,
valid_from: str = None,
valid_to: str = None,
source_closet: str = None,
source_file: str = None,
source_drawer_id: str = None,
):
"""Add a relationship to the knowledge graph."""
"""Add a relationship to the knowledge graph.
All temporal and provenance fields are optional. ``valid_to`` lets callers
backfill historical facts with a known end date in a single call (instead
of a separate ``kg_invalidate``). ``source_file`` and ``source_drawer_id``
are RFC 002 provenance fields populated by adapters / bulk importers.
TODO(#1283): once the ISO-8601 validation PR lands, wire ``validate_iso_date``
over ``valid_from`` / ``valid_to`` here so malformed dates fail fast at the
MCP boundary instead of silently producing empty query results.
"""
try:
subject = sanitize_kg_value(subject, "subject")
predicate = sanitize_name(predicate, "predicate")
object = sanitize_kg_value(object, "object")
valid_from = sanitize_iso_date(valid_from, "valid_from")
except ValueError as e:
return {"success": False, "error": str(e)}
@@ -1057,32 +1192,59 @@ def tool_kg_add(
"predicate": predicate,
"object": object,
"valid_from": valid_from,
"valid_to": valid_to,
"source_closet": source_closet,
"source_file": source_file,
"source_drawer_id": source_drawer_id,
},
)
triple_id = _kg.add_triple(
subject, predicate, object, valid_from=valid_from, source_closet=source_closet
triple_id = _call_kg(
lambda kg: kg.add_triple(
subject,
predicate,
object,
valid_from=valid_from,
valid_to=valid_to,
source_closet=source_closet,
source_file=source_file,
source_drawer_id=source_drawer_id,
)
)
return {"success": True, "triple_id": triple_id, "fact": f"{subject}{predicate}{object}"}
def tool_kg_invalidate(subject: str, predicate: str, object: str, ended: str = None):
"""Mark a fact as no longer true (set end date)."""
"""Mark a fact as no longer true (set end date).
Returns the actual ``ended`` date that was stored — when the caller omits
``ended``, the underlying graph stamps ``date.today()``, and the response
reflects that resolved value (instead of the literal string ``"today"``)
so callers can verify what was persisted.
TODO(#1283): apply ``validate_iso_date`` to ``ended`` once that PR lands.
"""
try:
subject = sanitize_kg_value(subject, "subject")
predicate = sanitize_name(predicate, "predicate")
object = sanitize_kg_value(object, "object")
ended = sanitize_iso_date(ended, "ended")
except ValueError as e:
return {"success": False, "error": str(e)}
resolved_ended = ended or date.today().isoformat()
_wal_log(
"kg_invalidate",
{"subject": subject, "predicate": predicate, "object": object, "ended": ended},
{
"subject": subject,
"predicate": predicate,
"object": object,
"ended": resolved_ended,
},
)
_kg.invalidate(subject, predicate, object, ended=ended)
_call_kg(lambda kg: kg.invalidate(subject, predicate, object, ended=resolved_ended))
return {
"success": True,
"fact": f"{subject}{predicate}{object}",
"ended": ended or "today",
"ended": resolved_ended,
}
@@ -1093,13 +1255,13 @@ def tool_kg_timeline(entity: str = None):
entity = sanitize_kg_value(entity, "entity")
except ValueError as e:
return {"error": str(e)}
results = _kg.timeline(entity)
results = _call_kg(lambda kg: kg.timeline(entity))
return {"entity": entity or "all", "timeline": results, "count": len(results)}
def tool_kg_stats():
"""Knowledge graph overview: entities, triples, relationship types."""
return _kg.stats()
return _call_kg(lambda kg: kg.stats())
# ==================== AGENT DIARY ====================
@@ -1112,9 +1274,13 @@ def tool_diary_write(agent_name: str, entry: str, topic: str = "general", wing:
This is the agent's personal journal — observations, thoughts,
what it worked on, what it noticed, what it thinks matters.
Note: ``agent_name`` is normalized to lowercase before storage so
that diary reads are case-insensitive (see #1243). "Claude",
"claude", and "CLAUDE" all resolve to the same agent.
"""
try:
agent_name = sanitize_name(agent_name, "agent_name")
agent_name = sanitize_name(agent_name, "agent_name").lower()
entry = sanitize_content(entry)
topic = sanitize_name(topic, "topic")
except ValueError as e:
@@ -1123,7 +1289,7 @@ def tool_diary_write(agent_name: str, entry: str, topic: str = "general", wing:
if wing:
wing = sanitize_name(wing)
else:
wing = f"wing_{agent_name.lower().replace(' ', '_')}"
wing = f"wing_{agent_name.replace(' ', '_')}"
room = "diary"
col = _get_collection(create=True)
if not col:
@@ -1188,9 +1354,14 @@ def tool_diary_read(agent_name: str, last_n: int = 10, wing: str = ""):
written to. Diary writes from hooks land in project-derived wings
(``wing_<project>``), so requiring a specific wing on read would
silo those entries from agent-initiated reads.
Note: ``agent_name`` is normalized to lowercase before filtering so
that reads are case-insensitive (see #1243). Entries written under
pre-fix mixed-case agent names will not match the lowercase filter;
use ``mempalace repair`` to migrate legacy data if needed.
"""
try:
agent_name = sanitize_name(agent_name, "agent_name")
agent_name = sanitize_name(agent_name, "agent_name").lower()
if wing:
wing = sanitize_name(wing)
except ValueError as e:
@@ -1273,7 +1444,7 @@ def tool_hook_settings(silent_save: bool = None, desktop_toast: bool = None):
try:
config = MempalaceConfig()
except Exception:
pass
logger.debug("Could not re-read config after update", exc_info=True)
result = {
"success": True,
@@ -1322,10 +1493,11 @@ def tool_memories_filed_away():
def tool_reconnect():
"""Force the MCP server to drop the cached ChromaDB collection and reconnect.
"""Force the MCP server to drop cached ChromaDB + KnowledgeGraph state.
Use after external scripts or CLI commands modify the palace database
directly, which can leave the in-memory HNSW index stale.
or replace ``knowledge_graph.sqlite3`` directly, which can leave the
in-memory HNSW index stale or pin a closed-on-disk SQLite connection.
"""
global \
_client_cache, \
@@ -1343,6 +1515,15 @@ def tool_reconnect():
# still applies after the reconnect.
_vector_disabled = False
_vector_disabled_reason = ""
# Drain the per-path KnowledgeGraph cache so a replaced sqlite file is
# reopened on the next tool call rather than served from a stale handle.
with _kg_cache_lock:
for kg in _kg_by_path.values():
try:
kg.close()
except Exception:
pass
_kg_by_path.clear()
try:
col = _get_collection()
if col is None:
@@ -1419,7 +1600,7 @@ TOOLS = {
"handler": tool_kg_query,
},
"mempalace_kg_add": {
"description": "Add a fact to the knowledge graph. Subject → predicate → object with optional time window. E.g. ('Max', 'started_school', 'Year 7', valid_from='2026-09-01').",
"description": "Add a fact to the knowledge graph. Subject → predicate → object with optional time window. E.g. ('Max', 'started_school', 'Year 7', valid_from='2026-09-01'). Pass valid_to to backfill an already-ended historical fact in a single call.",
"input_schema": {
"type": "object",
"properties": {
@@ -1433,10 +1614,22 @@ TOOLS = {
"type": "string",
"description": "When this became true (YYYY-MM-DD, optional)",
},
"valid_to": {
"type": "string",
"description": "When this stopped being true (YYYY-MM-DD, optional). Use for backfilling already-ended historical facts.",
},
"source_closet": {
"type": "string",
"description": "Closet ID where this fact appears (optional)",
},
"source_file": {
"type": "string",
"description": "Source file path the fact was extracted from (optional)",
},
"source_drawer_id": {
"type": "string",
"description": "Drawer ID the fact was extracted from (optional, RFC 002 provenance)",
},
},
"required": ["subject", "predicate", "object"],
},
@@ -1660,7 +1853,7 @@ TOOLS = {
"handler": tool_get_drawer,
},
"mempalace_list_drawers": {
"description": "List drawers with pagination. Optional wing/room filter. Returns IDs, wings, rooms, and content previews.",
"description": "List drawers with pagination. Optional wing/room filter. Returns IDs, wings, rooms, content previews, and total matching count for pagination.",
"input_schema": {
"type": "object",
"properties": {
@@ -1801,6 +1994,12 @@ SUPPORTED_PROTOCOL_VERSIONS = [
def handle_request(request):
if not isinstance(request, dict):
return {
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32600, "message": "Invalid Request"},
}
method = request.get("method") or ""
params = request.get("params") or {}
req_id = request.get("id")
@@ -1838,6 +2037,15 @@ def handle_request(request):
},
}
elif method == "tools/call":
if not isinstance(params, dict) or "name" not in params:
return {
"jsonrpc": "2.0",
"id": req_id,
"error": {
"code": -32602,
"message": "Invalid params: 'name' is required for tools/call",
},
}
tool_name = params.get("name")
tool_args = params.get("arguments") or {}
if tool_name not in TOOLS:
@@ -1886,7 +2094,11 @@ def handle_request(request):
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {"content": [{"type": "text", "text": json.dumps(result, indent=2)}]},
"result": {
"content": [
{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}
]
},
}
except Exception:
logger.exception(f"Tool error in {tool_name}")
@@ -1921,6 +2133,16 @@ def _restore_stdout():
def main():
_restore_stdout()
# Force UTF-8 on stdio. MCP JSON-RPC is UTF-8, but Python on Windows
# defaults stdin/stdout to the system codepage (e.g. cp1251), which
# corrupts non-ASCII payloads and surfaces as generic -32000 errors on
# Cyrillic/CJK content. See PEP 540.
for stream in (sys.stdin, sys.stdout):
if hasattr(stream, "reconfigure"):
try:
stream.reconfigure(encoding="utf-8", errors="replace")
except (AttributeError, OSError):
pass
logger.info("MemPalace MCP Server starting...")
# Pre-flight: probe HNSW capacity before any tool call so the warning
# is visible at startup rather than on first use (#1222). Pure
@@ -1937,7 +2159,7 @@ def main():
request = json.loads(line)
response = handle_request(request)
if response is not None:
sys.stdout.write(json.dumps(response) + "\n")
sys.stdout.write(json.dumps(response, ensure_ascii=False) + "\n")
sys.stdout.flush()
except KeyboardInterrupt:
break
+4 -1
View File
@@ -12,6 +12,7 @@ import sys
import shlex
import hashlib
import fnmatch
import logging
from pathlib import Path
from datetime import datetime
from collections import defaultdict
@@ -31,6 +32,8 @@ from .palace import (
upsert_closet_lines,
)
logger = logging.getLogger("mempalace_mcp")
READABLE_EXTENSIONS = {
".txt",
".md",
@@ -842,7 +845,7 @@ def process_file(
try:
collection.delete(where={"source_file": source_file})
except Exception:
pass
logger.debug("Stale-drawer purge failed for %s", source_file, exc_info=True)
# Batch chunks into bounded upserts so the embedding model sees many
# chunks per forward pass without building one huge Chroma/SQLite
+2 -2
View File
@@ -118,14 +118,14 @@ def normalize(filepath: str) -> str:
try:
file_size = os.path.getsize(filepath)
except OSError as e:
raise IOError(f"Could not read {filepath}: {e}")
raise IOError(f"Could not read {filepath}: {e}") from e
if file_size > 500 * 1024 * 1024: # 500 MB safety limit
raise IOError(f"File too large ({file_size // (1024 * 1024)} MB): {filepath}")
try:
with open(filepath, "r", encoding="utf-8", errors="replace") as f:
content = f.read()
except OSError as e:
raise IOError(f"Could not read {filepath}: {e}")
raise IOError(f"Could not read {filepath}: {e}") from e
if not content.strip():
return content
+63 -3
View File
@@ -6,11 +6,15 @@ Consolidates collection access patterns used by both miners and the MCP server.
import contextlib
import hashlib
import logging
import os
import re
import threading
from .backends.chroma import ChromaBackend
logger = logging.getLogger("mempalace_mcp")
SKIP_DIRS = {
".git",
"node_modules",
@@ -228,7 +232,7 @@ def purge_file_closets(closets_col, source_file: str) -> None:
try:
closets_col.delete(where={"source_file": source_file})
except Exception:
pass
logger.debug("Closet purge failed for %s", source_file, exc_info=True)
def upsert_closet_lines(closets_col, closet_id_base, lines, metadata):
@@ -306,7 +310,7 @@ def mine_lock(source_file: str):
fcntl.flock(lf, fcntl.LOCK_UN)
except Exception:
pass
logger.debug("Mine-lock release failed", exc_info=True)
lf.close()
@@ -314,6 +318,47 @@ class MineAlreadyRunning(RuntimeError):
"""Raised when another `mempalace mine` already holds the per-palace lock."""
# Per-thread record of palaces this thread already holds the lock for. Used by
# `mine_palace_lock` to short-circuit re-entrant acquisition from the same
# thread (e.g. miner.mine() acquires the outer lock then calls
# ChromaCollection.upsert which now also tries to acquire). Without this guard
# the inner call would block on its own outer flock (Linux fcntl locks are per
# open file description, so a same-thread second open of the lock file is a
# distinct lock and self-deadlocks).
#
# The holder set is tagged with ``pid`` so that a forked child does NOT
# inherit re-entrant credit from its parent: the OS-level flock IS NOT
# inherited as a "we hold it" semantically — the child must reacquire — but
# Python's ``threading.local`` IS inherited across fork. The pid check
# clears stale state so a forked child correctly hits the fcntl path.
_palace_lock_holders = threading.local()
def _holder_state():
"""Return the per-thread (pid, keys) record, refreshing after fork."""
keys = getattr(_palace_lock_holders, "keys", None)
pid = getattr(_palace_lock_holders, "pid", None)
current_pid = os.getpid()
if keys is None or pid != current_pid:
keys = set()
_palace_lock_holders.keys = keys
_palace_lock_holders.pid = current_pid
return keys
def _held_by_this_thread(lock_key: str) -> bool:
"""Return True if this thread already holds ``mine_palace_lock`` for ``lock_key``."""
return lock_key in _holder_state()
def _mark_held(lock_key: str) -> None:
_holder_state().add(lock_key)
def _mark_released(lock_key: str) -> None:
_holder_state().discard(lock_key)
@contextlib.contextmanager
def mine_palace_lock(palace_path: str):
"""Per-palace non-blocking lock around the full `mine` pipeline.
@@ -338,6 +383,12 @@ def mine_palace_lock(palace_path: str):
Non-blocking: if another `mine` is already writing to this palace,
raise MineAlreadyRunning so the caller can exit cleanly instead of
piling up as a waiting worker.
Re-entrant: if the current thread already holds the lock for the same
palace, the context manager passes through without re-acquiring. This
lets ChromaCollection write methods (which acquire the lock themselves
to protect MCP/direct callers) compose with miner.mine() (which holds
the outer lock for the entire mine pipeline) without self-deadlock.
"""
lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks")
os.makedirs(lock_dir, exist_ok=True)
@@ -346,6 +397,11 @@ def mine_palace_lock(palace_path: str):
palace_key = hashlib.sha256(lock_key_source.encode()).hexdigest()[:16]
lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock")
if _held_by_this_thread(palace_key):
# Same thread already holds the lock for this palace — pass through.
yield
return
lf = open(lock_path, "w")
acquired = False
try:
@@ -369,7 +425,11 @@ def mine_palace_lock(palace_path: str):
raise MineAlreadyRunning(
f"another `mempalace mine` is already running against {resolved}"
) from exc
yield
_mark_held(palace_key)
try:
yield
finally:
_mark_released(palace_key)
finally:
if acquired:
try:
+1 -1
View File
@@ -575,7 +575,7 @@ def follow_tunnels(wing: str, room: str, col=None, config=None):
if did and did in drawer_map:
c["drawer_preview"] = drawer_map[did][:300]
except Exception:
pass
logger.debug("Drawer preview hydration failed", exc_info=True)
return connections
+1 -1
View File
@@ -202,7 +202,7 @@ def detect_rooms_from_files(project_dir: str) -> list:
SKIP_DIRS = {".git", "node_modules", "__pycache__", ".venv", "venv", "dist", "build"}
for root, dirs, filenames in os.walk(project_path):
for _root, dirs, filenames in os.walk(project_path):
dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
for filename in filenames:
name_lower = filename.lower().replace("-", "_").replace(" ", "_")
+199 -10
View File
@@ -134,6 +134,11 @@ def _hybrid_rank(
themselves. Since the absolute scale is unbounded, BM25 is min-max
normalized within the candidate set so weights are commensurable.
Candidates with ``distance=None`` are treated as vector-unknown
(no vector signal available) and scored on BM25 contribution alone.
Used by candidate-union mode to merge BM25-only candidates that the
vector index didn't surface.
Mutates each result dict to add ``bm25_score`` and reorders the list
in place. Returns the same list for convenience.
"""
@@ -147,7 +152,11 @@ def _hybrid_rank(
scored = []
for r, raw, norm in zip(results, bm25_raw, bm25_norm):
vec_sim = max(0.0, 1.0 - r.get("distance", 1.0))
distance = r.get("distance")
if distance is None:
vec_sim = 0.0
else:
vec_sim = max(0.0, 1.0 - distance)
r["bm25_score"] = round(raw, 3)
scored.append((vector_weight * vec_sim + bm25_weight * norm, r))
@@ -236,7 +245,7 @@ def _expand_with_neighbors(drawers_col, matched_doc: str, matched_meta: dict, ra
all_meta = drawers_col.get(where={"source_file": src}, include=["metadatas"])
total_drawers = len(all_meta.ids) if all_meta.ids else None
except Exception:
pass
logger.debug("total_drawers lookup failed for %s", src, exc_info=True)
return {
"text": combined_text,
@@ -288,10 +297,10 @@ def search(query: str, palace_path: str, wing: str = None, room: str = None, n_r
"""
try:
col = get_collection(palace_path, create=False)
except Exception:
except Exception as e:
print(f"\n No palace found at {palace_path}")
print(" Run: mempalace init <dir> then mempalace mine <dir>")
raise SearchError(f"No palace found at {palace_path}")
raise SearchError(f"No palace found at {palace_path}") from e
# Alert the user if this palace predates hnsw:space=cosine being set on
# creation — their similarity scores will be junk until they run repair.
@@ -331,7 +340,7 @@ def search(query: str, palace_path: str, wing: str = None, room: str = None, n_r
# `_hybrid_rank`; do the same here so CLI results match what agents
# see via `mempalace_search`.
hits = [
{"text": doc, "distance": float(dist), "metadata": meta or {}}
{"text": doc or "", "distance": float(dist), "metadata": meta or {}}
for doc, meta, dist in zip(docs, metas, dists)
]
hits = _hybrid_rank(hits, query)
@@ -372,6 +381,7 @@ def _bm25_only_via_sqlite(
room: str = None,
n_results: int = 5,
max_candidates: int = 500,
_include_internal: bool = False,
) -> dict:
"""BM25-only search reading drawers directly from chroma.sqlite3.
@@ -540,17 +550,25 @@ def _bm25_only_via_sqlite(
continue
if room and meta.get("room") != room:
continue
full_source = meta.get("source_file", "") or ""
candidates.append(
{
"text": d["text"],
"wing": meta.get("wing", "unknown"),
"room": meta.get("room", "unknown"),
"source_file": Path(meta.get("source_file", "?") or "?").name,
"source_file": Path(full_source).name if full_source else "?",
"created_at": meta.get("filed_at", "unknown"),
# No vector distance available in BM25-only mode.
"similarity": None,
"distance": None,
"matched_via": "bm25_sqlite",
# Internal: full path + chunk_index let callers (notably
# candidate_strategy="union") dedupe at chunk granularity
# rather than basename — two files in different directories
# may share a basename, and one source_file is split across
# multiple chunks. Stripped before this helper returns.
"_source_file_full": full_source,
"_chunk_index": meta.get("chunk_index"),
}
)
@@ -565,6 +583,12 @@ def _bm25_only_via_sqlite(
hits = candidates[:n_results]
for h in hits:
h.pop("_score", None)
# Strip internal fields by default so the public BM25-only fallback
# response stays clean. Callers that need chunk-precise dedup
# (notably the union-merge path) opt in via _include_internal.
if not _include_internal:
h.pop("_source_file_full", None)
h.pop("_chunk_index", None)
return {
"query": query,
@@ -576,6 +600,117 @@ def _bm25_only_via_sqlite(
}
def _merge_bm25_union_candidates(
hits: list,
query: str,
palace_path: str,
wing: str,
room: str,
n_results: int,
max_distance: float = 0.0,
) -> None:
"""Append top-K BM25-only candidates from sqlite into ``hits`` in place.
Used by ``search_memories(..., candidate_strategy="union")`` to widen
the rerank pool's *source* (not just its size) — vector-only candidate
selection skips docs whose embeddings are far from the query even when
BM25 signal is strong.
Dedup is chunk-precise: the key is ``(_source_file_full, _chunk_index)``
so two files sharing a basename in different directories don't collide,
and a vector hit on chunk N of a file doesn't block BM25 from
contributing chunk M of the same file. Falls back to ``source_file``
only when full-path/chunk metadata is absent.
BM25-only additions carry ``distance=None`` so ``_hybrid_rank`` scores
them on BM25 contribution alone.
When ``max_distance > 0.0`` (a strict vector-distance threshold is
set), BM25-only candidates are skipped entirely — they have no vector
distance to satisfy the threshold, and silently injecting them would
break the existing ``max_distance`` guarantee that hybrid results lie
within the requested vector-distance bound.
"""
if max_distance > 0.0:
return
try:
bm25_extra = _bm25_only_via_sqlite(
query,
palace_path,
wing=wing,
room=room,
n_results=n_results * 3,
_include_internal=True,
).get("results", [])
except Exception:
logger.debug("candidate_strategy=union: BM25 fetch failed", exc_info=True)
return
def _dedup_key(entry: dict):
full = entry.get("_source_file_full")
ci = entry.get("_chunk_index")
if full and ci is not None:
return (full, ci)
# Fall back to basename only when richer metadata is missing —
# avoids silently dropping candidates on legacy data while still
# giving chunk-precise dedup whenever the metadata is present.
return entry.get("source_file")
seen = {_dedup_key(h) for h in hits}
for bh in bm25_extra:
key = _dedup_key(bh)
if not key or key == "?" or key in seen:
continue
bh["distance"] = None
bh["effective_distance"] = None
bh["closet_boost"] = 0.0
hits.append(bh)
seen.add(key)
# Strategy dispatch — keeps search_memories' branch count under the
# project's complexity ceiling (C901 max-complexity=25). New strategies
# register here.
_CANDIDATE_MERGERS = {
"vector": None, # default no-op
"union": _merge_bm25_union_candidates,
}
def _validate_candidate_strategy(strategy: str) -> None:
"""Raise ``ValueError`` for unknown strategies.
Called eagerly at the top of ``search_memories`` so invalid values
fail consistently regardless of whether the call routes through the
vector path, the BM25-only fallback, or returns an early error dict.
"""
if strategy not in _CANDIDATE_MERGERS:
raise ValueError(
f"candidate_strategy must be one of {tuple(_CANDIDATE_MERGERS)}, got {strategy!r}"
)
def _apply_candidate_strategy(
strategy: str,
hits: list,
query: str,
palace_path: str,
wing: str,
room: str,
n_results: int,
max_distance: float = 0.0,
) -> None:
"""Dispatch to the registered merger for ``strategy``.
Strategy validity is assumed (``_validate_candidate_strategy`` runs
earlier); ``"vector"`` is a no-op.
"""
merger = _CANDIDATE_MERGERS[strategy]
if merger is not None:
merger(hits, query, palace_path, wing, room, n_results, max_distance=max_distance)
def search_memories(
query: str,
palace_path: str,
@@ -584,6 +719,7 @@ def search_memories(
n_results: int = 5,
max_distance: float = 0.0,
vector_disabled: bool = False,
candidate_strategy: str = "vector",
) -> dict:
"""Programmatic search — returns a dict instead of printing.
@@ -603,7 +739,30 @@ def search_memories(
(#1222). Set by the MCP server when the HNSW capacity probe
detects a divergence that would segfault chromadb on segment
load.
candidate_strategy: How candidates for the hybrid re-rank are gathered.
* ``"vector"`` (default) — preserves historical behavior: top
``n_results * 3`` rows from the vector index are the rerank pool.
Cheap; works well when query and target docs agree in the
embedding space.
* ``"union"`` — also pull top ``n_results * 3`` BM25 candidates
from the sqlite FTS5 index and merge them into the rerank pool
(deduped by source_file). Catches docs with strong BM25 signal
that are vector-distant from the query (e.g. terminology guides
looked up by narrative-shaped queries; policy clauses surfaced
by scenario descriptions). Adds one sqlite open + FTS5 MATCH
per query; perf cost is small but unmeasured at corpus scale.
Opt in until the cost is characterized.
When ``max_distance > 0.0`` is also set, BM25-only candidates
are skipped — they have no vector distance and would silently
violate the requested distance threshold.
"""
# Validate the strategy eagerly so invalid values fail the same way
# regardless of whether the call routes through the vector path or
# the BM25-only fallback below.
_validate_candidate_strategy(candidate_strategy)
if vector_disabled:
return _bm25_only_via_sqlite(
query,
@@ -667,7 +826,8 @@ def search_memories(
if source and source not in closet_boost_by_source:
closet_boost_by_source[source] = (rank, cdist, cdoc[:200])
except Exception:
pass # no closets yet — hybrid degrades to pure drawer search
# No closets yet — hybrid degrades to pure drawer search.
logger.debug("Closet collection unavailable; using drawer-only search", exc_info=True)
# Rank-based boost. The ordinal signal ("which closet matched best") is
# more reliable than absolute distance on narrative content, where
@@ -681,6 +841,8 @@ def search_memories(
_first_or_empty(drawer_results, "metadatas"),
_first_or_empty(drawer_results, "distances"),
):
meta = meta or {}
doc = doc or ""
# Filter on raw distance before rounding to avoid precision loss.
if max_distance > 0.0 and dist > max_distance:
continue
@@ -697,7 +859,12 @@ def search_memories(
matched_via = "drawer+closet"
closet_preview = c_preview
effective_dist = dist - boost
# Clamp to the valid cosine-distance range [0, 2]. When a strong
# closet boost (up to 0.40) exceeds the raw distance, the subtraction
# can go negative — which (a) yields ``similarity > 1.0`` downstream
# and (b) makes the sort key land *below* ordinary positive distances,
# inverting the ranking so the best hybrid matches sort last.
effective_dist = max(0.0, min(2.0, dist - boost))
entry = {
"text": doc,
"wing": meta.get("wing", "unknown"),
@@ -742,6 +909,7 @@ def search_memories(
include=["documents", "metadatas"],
)
except Exception:
logger.debug("Neighbor fetch failed for %s", full_source, exc_info=True)
continue
docs = source_drawers.documents
metas_ = source_drawers.metadatas
@@ -779,8 +947,29 @@ def search_memories(
h["drawer_index"] = best_idx
h["total_drawers"] = len(ordered_docs)
# BM25 hybrid re-rank within the final candidate set.
hits = _hybrid_rank(hits, query)
# Candidate strategy hook: optionally widen the rerank pool's *source*
# before ranking. Default ("vector") is a no-op; "union" merges top-K
# BM25 candidates from sqlite. See `_apply_candidate_strategy`.
# ``max_distance`` is forwarded so union mode can refuse to inject
# BM25-only (distance=None) candidates that would silently bypass the
# caller's strict distance threshold.
_apply_candidate_strategy(
candidate_strategy,
hits,
query,
palace_path,
wing,
room,
n_results,
max_distance=max_distance,
)
# BM25 hybrid re-rank within the final candidate set, then trim back
# to the requested size. Without the trim, ``candidate_strategy="union"``
# would return up to 4× ``n_results`` (vector hits + BM25 union pool),
# breaking the existing ``search_memories`` size contract that the MCP
# ``limit`` parameter is built on.
hits = _hybrid_rank(hits, query)[:n_results]
for h in hits:
h.pop("_sort_key", None)
h.pop("_source_file_full", None)