4949aab68b
Chroma 1.5.x can return ``None`` inside the ``metadatas`` / ``documents`` lists of a query/get result for partially-flushed rows. The codebase already has a systemic None-guard pattern (merged #999, #1013, #1019) but three call sites were still unguarded:

* ``mcp_server.tool_check_duplicate`` (``mcp_server.py:487-488``) — ``meta = results["metadatas"][0][i]`` followed by ``meta.get(...)`` raises ``AttributeError: 'NoneType' object has no attribute 'get'``. The broad ``except Exception`` wrapper (line 504) swallows it and returns an uninformative ``"Duplicate check failed"``.
* ``layers.Layer1.generate`` (``layers.py:126``) — iterates ``zip(docs, metas)`` and calls ``meta.get(key)`` in the importance loop. A single None metadata blows up the entire wake-up render.
* ``layers.Layer2.retrieve`` (``layers.py:224``) — same pattern, same crash path for the on-demand render.

Apply the same ``meta = meta or {}`` / ``doc = doc or ""`` idiom used by the merged guards in the search path. Three-line additions, no behaviour change on well-formed results.

Tests added:

* ``test_check_duplicate_handles_none_metadata`` — mocks the collection query to return ``None`` for one metadata and document, asserts the call does not crash and the sentinel-rendered entry has wing/room "?" and empty content.
* ``test_layer1_handles_none_metadata`` / ``_handles_none_document``
* ``test_layer2_handles_none_metadata``

Relationship to other open PRs:

* **#1019** guarded ``searcher.py`` loops. This PR extends the same guard to the three call sites #1019 did not touch.
* **#979** fixed ``tool_check_duplicate`` negative similarity but left the None-metadata path unguarded.
* Does not overlap **#1013** (``Layer3.search_raw``) or **#999**.
507 lines
17 KiB
Python
507 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
layers.py — 4-Layer Memory Stack for mempalace
|
|
===================================================
|
|
|
|
Load only what you need, when you need it.
|
|
|
|
Layer 0: Identity (~100 tokens) — Always loaded. "Who am I?"
|
|
Layer 1: Essential Story (~500-800) — Always loaded. Top moments from the palace.
|
|
Layer 2: On-Demand (~200-500 each) — Loaded when a topic/wing comes up.
|
|
Layer 3: Deep Search (unlimited) — Full ChromaDB semantic search.
|
|
|
|
Wake-up cost: ~600-900 tokens (L0+L1). Leaves 95%+ of context free.
|
|
|
|
Reads directly from ChromaDB (mempalace_drawers)
|
|
and ~/.mempalace/identity.txt.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
|
|
from .config import MempalaceConfig
|
|
from .palace import get_collection as _get_collection
|
|
from .searcher import _first_or_empty, build_where_filter
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Layer 0 — Identity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class Layer0:
    """
    ~100 tokens. Always loaded.
    Reads from ~/.mempalace/identity.txt — a plain-text file the user writes.

    Example identity.txt:
        I am Atlas, a personal AI assistant for Alice.
        Traits: warm, direct, remembers everything.
        People: Alice (creator), Bob (Alice's partner).
        Project: A journaling app that helps people process emotions.
    """

    def __init__(self, identity_path: str = None):
        """
        Args:
            identity_path: Path of the identity file. When None, falls back
                to ~/.mempalace/identity.txt (expanded for the current user).
        """
        if identity_path is None:
            identity_path = os.path.expanduser("~/.mempalace/identity.txt")
        self.path = identity_path
        # Cache for the rendered text; None means "not read yet".
        self._text = None

    def render(self) -> str:
        """
        Return the identity text, or a placeholder when no file exists.

        The result is cached on the instance after the first call, so later
        edits to the file are not picked up by the same Layer0 object.
        """
        if self._text is not None:
            return self._text

        if os.path.exists(self.path):
            # Decode explicitly as UTF-8 instead of the platform default so
            # the same file reads identically on every OS; undecodable bytes
            # become U+FFFD rather than aborting the wake-up.
            with open(self.path, "r", encoding="utf-8", errors="replace") as f:
                self._text = f.read().strip()
        else:
            self._text = (
                "## L0 — IDENTITY\nNo identity configured. Create ~/.mempalace/identity.txt"
            )

        return self._text

    def token_estimate(self) -> int:
        """Rough token count of the rendered text (characters // 4)."""
        return len(self.render()) // 4
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Layer 1 — Essential Story (auto-generated from palace)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class Layer1:
    """
    ~500-800 tokens. Always loaded.

    Wake-up summary built from the palace collection: drawers are scanned in
    pages, ranked by their importance metadata, trimmed to the top
    MAX_DRAWERS, and rendered grouped by room.
    """

    MAX_DRAWERS = 15  # how many top-ranked drawers make it into the summary
    MAX_CHARS = 3200  # character budget for room headers + entries (~800 tokens)
    MAX_SCAN = 2000  # stop paging once this many drawers have been read

    def __init__(self, palace_path: str = None, wing: str = None):
        """
        Args:
            palace_path: Palace location; falls back to the configured path
                when empty or None.
            wing: Optional wing name; when set, only that wing is scanned.
        """
        config = MempalaceConfig()
        self.palace_path = palace_path if palace_path else config.palace_path
        self.wing = wing

    def _fetch_drawers(self, col):
        """Page through the collection; return parallel (documents, metadatas) lists."""
        page_size = 500  # paging keeps each request under SQLite's variable limit (~999)
        documents, metadatas = [], []
        while True:
            request = {
                "include": ["documents", "metadatas"],
                "limit": page_size,
                "offset": len(documents),
            }
            if self.wing:
                request["where"] = {"wing": self.wing}
            try:
                page = col.get(**request)
            except Exception:
                # Best effort: keep whatever earlier pages already returned.
                break
            page_docs = page.get("documents", [])
            page_metas = page.get("metadatas", [])
            if not page_docs:
                break
            documents.extend(page_docs)
            metadatas.extend(page_metas)
            # A short page means the collection is exhausted.
            if len(page_docs) < page_size or len(documents) >= self.MAX_SCAN:
                break
        return documents, metadatas

    @staticmethod
    def _importance(meta):
        """Weight from the first populated weight-like key; 3 when absent or not numeric."""
        for field in ("importance", "emotional_weight", "weight"):
            raw = meta.get(field)
            if raw is None:
                continue
            # The first populated key decides, even if it fails to parse.
            try:
                return float(raw)
            except (ValueError, TypeError):
                return 3
        return 3

    def generate(self) -> str:
        """Render the L1 text from the highest-importance drawers."""
        try:
            collection = _get_collection(self.palace_path, create=False)
        except Exception:
            return "## L1 — No palace found. Run: mempalace mine <dir>"

        documents, metadatas = self._fetch_drawers(collection)
        if not documents:
            return "## L1 — No memories yet."

        # Rows can carry None for metadata/document; normalise before scoring.
        scored = []
        for raw_doc, raw_meta in zip(documents, metadatas):
            meta = raw_meta or {}
            scored.append((self._importance(meta), meta, raw_doc or ""))

        # Highest importance first; ties keep their scan order (stable sort).
        ranked = sorted(scored, key=lambda item: item[0], reverse=True)
        top = ranked[: self.MAX_DRAWERS]

        # Bucket the survivors by room so the output reads as sections.
        rooms = {}
        for entry in top:
            rooms.setdefault(entry[1].get("room", "general"), []).append(entry)

        out = ["## L1 — ESSENTIAL STORY"]
        used = 0
        for room, entries in sorted(rooms.items()):
            header = f"\n[{room}]"
            out.append(header)
            used += len(header)

            for _weight, meta, doc in entries:
                origin = meta.get("source_file")
                source = Path(origin).name if origin else ""

                # One-line excerpt, at most 200 characters.
                excerpt = doc.strip().replace("\n", " ")
                if len(excerpt) > 200:
                    excerpt = f"{excerpt[:197]}..."

                bullet = f" - {excerpt}"
                if source:
                    bullet = f"{bullet} ({source})"

                # Out of budget: point at L3 and stop rendering.
                if used + len(bullet) > self.MAX_CHARS:
                    out.append(" ... (more in L3 search)")
                    return "\n".join(out)

                out.append(bullet)
                used += len(bullet)

        return "\n".join(out)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Layer 2 — On-Demand (wing/room filtered retrieval)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class Layer2:
    """
    ~200-500 tokens per retrieval.

    Pulled in when a particular wing or room comes up; fetches drawers from
    the palace collection using a wing/room filter.
    """

    def __init__(self, palace_path: str = None):
        """
        Args:
            palace_path: Palace location; falls back to the configured path
                when empty or None.
        """
        config = MempalaceConfig()
        self.palace_path = palace_path if palace_path else config.palace_path

    def retrieve(self, wing: str = None, room: str = None, n_results: int = 10) -> str:
        """
        Return a formatted listing of drawers matching wing and/or room.

        Failures are reported as plain strings ("No palace found.",
        "Retrieval error: ...") rather than raised.
        """
        try:
            collection = _get_collection(self.palace_path, create=False)
        except Exception:
            return "No palace found."

        request = {"include": ["documents", "metadatas"], "limit": n_results}
        where = build_where_filter(wing, room)
        if where:
            request["where"] = where

        try:
            found = collection.get(**request)
        except Exception as e:
            return f"Retrieval error: {e}"

        docs = found.get("documents", [])
        metas = found.get("metadatas", [])

        if not docs:
            # Describe whichever filters were actually supplied.
            criteria = []
            if wing:
                criteria.append(f"wing={wing}")
            if room:
                criteria.append(f"room={room}")
            label = " ".join(criteria)
            return f"No drawers found for {label}."

        out = [f"## L2 — ON-DEMAND ({len(docs)} drawers)"]
        for raw_doc, raw_meta in zip(docs[:n_results], metas[:n_results]):
            # Rows can carry None for metadata/document; fall back to empties.
            meta = raw_meta or {}
            text = raw_doc or ""

            origin = meta.get("source_file")
            source = Path(origin).name if origin else ""

            # One-line excerpt, at most 300 characters.
            excerpt = text.strip().replace("\n", " ")
            if len(excerpt) > 300:
                excerpt = f"{excerpt[:297]}..."

            line = f" [{meta.get('room', '?')}] {excerpt}"
            if source:
                line = f"{line} ({source})"
            out.append(line)

        return "\n".join(out)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Layer 3 — Deep Search (full semantic search via ChromaDB)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class Layer3:
    """
    Unlimited depth. Semantic search against the full palace.

    Both entry points issue the same collection query; `search` renders text,
    `search_raw` returns dicts.
    """

    def __init__(self, palace_path: str = None):
        """
        Args:
            palace_path: Palace location; falls back to the configured path
                when empty or None.
        """
        config = MempalaceConfig()
        self.palace_path = palace_path if palace_path else config.palace_path

    @staticmethod
    def _query_kwargs(query, wing, room, n_results):
        """Assemble the keyword arguments for `col.query`, adding `where` only when non-empty."""
        params = {
            "query_texts": [query],
            "n_results": n_results,
            "include": ["documents", "metadatas", "distances"],
        }
        where = build_where_filter(wing, room)
        if where:
            params["where"] = where
        return params

    def search(self, query: str, wing: str = None, room: str = None, n_results: int = 5) -> str:
        """
        Semantic search rendered as compact text.

        Failures come back as strings ("No palace found.", "Search error: ...")
        rather than exceptions.
        """
        try:
            collection = _get_collection(self.palace_path, create=False)
        except Exception:
            return "No palace found."

        try:
            results = collection.query(**self._query_kwargs(query, wing, room, n_results))
        except Exception as e:
            return f"Search error: {e}"

        docs = _first_or_empty(results, "documents")
        metas = _first_or_empty(results, "metadatas")
        dists = _first_or_empty(results, "distances")

        if not docs:
            return "No results found."

        out = [f'## L3 — SEARCH RESULTS for "{query}"']
        for rank, (raw_doc, raw_meta, distance) in enumerate(zip(docs, metas, dists), 1):
            # Rows can carry None for metadata/document; fall back to empties.
            meta = raw_meta or {}
            text = raw_doc or ""
            similarity = round(1 - distance, 3)

            origin = meta.get("source_file")
            source = Path(origin).name if origin else ""

            # One-line excerpt, at most 300 characters.
            excerpt = text.strip().replace("\n", " ")
            if len(excerpt) > 300:
                excerpt = f"{excerpt[:297]}..."

            out.append(
                f" [{rank}] {meta.get('wing', '?')}/{meta.get('room', '?')} (sim={similarity})"
            )
            out.append(f" {excerpt}")
            if source:
                out.append(f" src: {source}")

        return "\n".join(out)

    def search_raw(
        self, query: str, wing: str = None, room: str = None, n_results: int = 5
    ) -> list:
        """
        Semantic search returning one dict per hit instead of text.

        Returns an empty list when the palace cannot be opened or the query
        fails.
        """
        try:
            collection = _get_collection(self.palace_path, create=False)
        except Exception:
            return []

        try:
            results = collection.query(**self._query_kwargs(query, wing, room, n_results))
        except Exception:
            return []

        rows = zip(
            _first_or_empty(results, "documents"),
            _first_or_empty(results, "metadatas"),
            _first_or_empty(results, "distances"),
        )

        hits = []
        for raw_doc, raw_meta, distance in rows:
            # ChromaDB may hand back None for a document or metadata entry
            # (partial flush, mid-delete, schema upgrade boundaries). Keep the
            # hit with its real distance and let the stored fields fall back.
            metadata = raw_meta or {}
            hits.append(
                {
                    "text": raw_doc or "",
                    "wing": metadata.get("wing", "unknown"),
                    "room": metadata.get("room", "unknown"),
                    "source_file": Path(metadata.get("source_file", "?")).name,
                    "similarity": round(1 - distance, 3),
                    "metadata": metadata,
                }
            )
        return hits
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MemoryStack — unified interface
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class MemoryStack:
    """
    The full 4-layer stack. One class, one palace, everything works.

        stack = MemoryStack()
        print(stack.wake_up())                   # L0 + L1 (~600-900 tokens)
        print(stack.recall(wing="my_app"))       # L2 on-demand
        print(stack.search("pricing change"))    # L3 deep search
    """

    def __init__(self, palace_path: str = None, identity_path: str = None):
        """
        Args:
            palace_path: Palace location; falls back to the configured path
                when empty or None.
            identity_path: Identity file for L0; falls back to
                ~/.mempalace/identity.txt when empty or None.
        """
        cfg = MempalaceConfig()
        self.palace_path = palace_path or cfg.palace_path
        self.identity_path = identity_path or os.path.expanduser("~/.mempalace/identity.txt")

        self.l0 = Layer0(self.identity_path)
        self.l1 = Layer1(self.palace_path)
        self.l2 = Layer2(self.palace_path)
        self.l3 = Layer3(self.palace_path)

    def wake_up(self, wing: str = None) -> str:
        """
        Generate wake-up text: L0 (identity) + L1 (essential story).
        Typically ~600-900 tokens. Inject into system prompt or first message.

        Args:
            wing: Optional wing filter for L1 (project-specific wake-up).
                Applies to this call only; the L1 layer's own wing setting is
                restored afterwards.

        Returns:
            The L0 text, a blank line, then the L1 text.
        """
        # L0: Identity, followed by a blank separator line.
        parts = [self.l0.render(), ""]

        # L1: Essential Story. Apply the wing override for this call only;
        # leaving it set would silently filter every later wake_up() that
        # passes no wing.
        configured_wing = self.l1.wing
        if wing:
            self.l1.wing = wing
        try:
            parts.append(self.l1.generate())
        finally:
            self.l1.wing = configured_wing

        return "\n".join(parts)

    def recall(self, wing: str = None, room: str = None, n_results: int = 10) -> str:
        """On-demand L2 retrieval filtered by wing/room."""
        return self.l2.retrieve(wing=wing, room=room, n_results=n_results)

    def search(self, query: str, wing: str = None, room: str = None, n_results: int = 5) -> str:
        """Deep L3 semantic search."""
        return self.l3.search(query, wing=wing, room=room, n_results=n_results)

    def status(self) -> dict:
        """
        Status of all layers.

        Returns:
            A dict with the palace path, one entry per layer, and
            "total_drawers" (0 when the collection cannot be opened or
            counted).
        """
        result = {
            "palace_path": self.palace_path,
            "L0_identity": {
                "path": self.identity_path,
                "exists": os.path.exists(self.identity_path),
                "tokens": self.l0.token_estimate(),
            },
            "L1_essential": {
                "description": "Auto-generated from top palace drawers",
            },
            "L2_on_demand": {
                "description": "Wing/room filtered retrieval",
            },
            "L3_deep_search": {
                "description": "Full semantic search via ChromaDB",
            },
        }

        # Count drawers; a missing or unreadable palace reports zero.
        try:
            col = _get_collection(self.palace_path, create=False)
            count = col.count()
            result["total_drawers"] = count
        except Exception:
            result["total_drawers"] = 0

        return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI (standalone)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
    import json

    def usage():
        """Print the command summary and exit with status 0."""
        for line in (
            "layers.py — 4-Layer Memory Stack",
            "",
            "Usage:",
            " python layers.py wake-up Show L0 + L1",
            " python layers.py wake-up --wing=NAME Wake-up for a specific project",
            " python layers.py recall --wing=NAME On-demand L2 retrieval",
            " python layers.py search <query> Deep L3 search",
            " python layers.py status Show layer status",
        ):
            print(line)
        sys.exit(0)

    if len(sys.argv) < 2:
        usage()

    command = sys.argv[1]

    # Split the remaining arguments into --key=value options and bare words.
    # A "--flag" without "=" matches neither branch and is dropped.
    options = {}
    words = []
    for token in sys.argv[2:]:
        is_flag = token.startswith("--")
        if is_flag and "=" in token:
            name, _, value = token.partition("=")
            options[name.lstrip("-")] = value
        elif not is_flag:
            words.append(token)

    # The stack is built before dispatch, whatever the command turns out to be.
    stack = MemoryStack(palace_path=options.get("palace"))

    if command in ("wake-up", "wakeup"):
        text = stack.wake_up(wing=options.get("wing"))
        print(f"Wake-up text (~{len(text) // 4} tokens):")
        print("=" * 50)
        print(text)

    elif command == "recall":
        print(stack.recall(wing=options.get("wing"), room=options.get("room")))

    elif command == "search":
        query = " ".join(words)
        if not query:
            print("Usage: python layers.py search <query>")
            sys.exit(1)
        print(stack.search(query, wing=options.get("wing"), room=options.get("room")))

    elif command == "status":
        print(json.dumps(stack.status(), indent=2))

    else:
        usage()
|