feat: entity metadata + diary ingest + BM25 hybrid search

Three features that close the gap between the architecture docs
and the actual codebase:

1. Entity metadata on drawers and closets
   - _extract_entities_for_metadata() pulls names from known_entities.json
     + proper nouns appearing 2+ times
   - Stamped as "entities" field in ChromaDB metadata
   - Enables filterable search by person/project name

2. Day-based diary ingest (diary_ingest.py)
   - ONE drawer per day, upserted as the day grows
   - Closets pack topics atomically, never split mid-topic
   - Tracks entry count in state file, only processes new entries
   - Usage: python -m mempalace.diary_ingest --dir ~/summaries

3. BM25 hybrid search in searcher.py
   - _bm25_score() keyword matching complements vector similarity
   - _hybrid_rank() combines both signals (60% vector, 40% BM25)
   - Catches exact name/term matches that embeddings miss
   - Applied to both closet-first and direct drawer search paths

689/689 tests pass.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
MSL
2026-04-13 01:47:19 -07:00
committed by Igor Lins e Silva
parent ee60cad652
commit f935e85ead
3 changed files with 282 additions and 4 deletions
+173
View File
@@ -0,0 +1,173 @@
"""
diary_ingest.py — Ingest daily summary files into the palace.
Architecture:
- ONE drawer per day — full verbatim content, upserted as the day grows
- Closets pack topics up to 1500 chars, never split mid-topic
- Only new entries are processed (tracks entry count in state file)
- Entities extracted and stamped on metadata for filterable search
Usage:
python -m mempalace.diary_ingest --dir ~/daily_summaries --palace ~/.mempalace/palace
python -m mempalace.diary_ingest --dir ~/daily_summaries --palace ~/.mempalace/palace --force
"""
import hashlib
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from .palace import (
get_collection,
get_closets_collection,
build_closet_lines,
upsert_closet_lines,
CLOSET_CHAR_LIMIT,
)
from .miner import _extract_entities_for_metadata
DIARY_ENTRY_RE = re.compile(r"^## .+", re.MULTILINE)
def _split_entries(text):
"""Split diary text into (header, body) pairs per ## entry."""
parts = DIARY_ENTRY_RE.split(text)
headers = DIARY_ENTRY_RE.findall(text)
entries = []
for i, header in enumerate(headers):
body = parts[i + 1] if i + 1 < len(parts) else ""
entries.append((header.strip(), body.strip()))
return entries
def _load_ingest_state(state_file, force):
    """Return the per-file ingest state saved by a previous run.

    An empty dict is returned when ``force`` is set, when no state file
    exists yet, or when the file cannot be read or does not hold a JSON
    object.
    """
    if force or not state_file.exists():
        return {}
    try:
        loaded = json.loads(state_file.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        # A damaged state file should cost a re-ingest, not a crash.
        print(f"Ignoring unreadable state file: {state_file}")
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _is_unchanged(prev, text, digest):
    """Tell whether ``text`` is what the state entry ``prev`` recorded."""
    recorded = prev.get("sha256")
    if recorded is not None:
        return recorded == digest
    # State written before digests were recorded only carries the length.
    return prev.get("size", 0) == len(text)


def ingest_diaries(
    diary_dir,
    palace_path,
    wing="diary",
    force=False,
):
    """Ingest daily summary files into the palace.

    Every ``*.md`` file in ``diary_dir`` whose stem starts with a
    ``YYYY-MM-DD`` date and whose stripped text is at least 50 characters
    long is handled as one day:

    * the whole file is upserted as a single drawer with the id
      ``drawer_diary_<date>``, so it is replaced as the day grows;
    * the file is split into ``## `` entries, and the entries beyond the
      count recorded by the previous run are turned into closet lines with
      ``build_closet_lines`` and stored with ``upsert_closet_lines``;
    * entity names extracted from the full text are stamped on both the
      drawer and the closet metadata under ``"entities"`` when non-empty.

    Progress is kept in ``<diary_dir>/.diary_ingest_state.json``. A file is
    skipped when its content digest (or, for state written by older runs,
    its length) matches the recorded one.

    Args:
        diary_dir: Directory holding the daily ``.md`` files; ``~`` is
            expanded.
        palace_path: Palace location handed to ``get_collection`` and
            ``get_closets_collection``.
        wing: Value stored in the ``"wing"`` metadata field.
        force: Ignore saved state and re-ingest every file and every entry.

    Returns:
        ``{"days_updated": int, "closets_created": int}``, or ``None`` when
        the directory does not exist or contains no ``.md`` files.
    """
    diary_dir = Path(diary_dir).expanduser().resolve()
    if not diary_dir.exists():
        print(f"Diary directory not found: {diary_dir}")
        return
    diary_files = sorted(diary_dir.glob("*.md"))
    if not diary_files:
        print(f"No .md files in {diary_dir}")
        return

    # State tracks what has already been ingested per file.
    state_file = diary_dir / ".diary_ingest_state.json"
    state = _load_ingest_state(state_file, force)

    drawers_col = get_collection(palace_path)
    closets_col = get_closets_collection(palace_path)
    days_updated = 0
    closets_created = 0

    for diary_path in diary_files:
        text = diary_path.read_text(encoding="utf-8", errors="replace")
        if len(text.strip()) < 50:
            continue
        date_match = re.match(r"(\d{4}-\d{2}-\d{2})", diary_path.stem)
        if not date_match:
            continue
        date_str = date_match.group(1)

        prev = state.get(diary_path.name)
        if not isinstance(prev, dict):
            prev = {}

        # Skip if content hasn't changed. A digest is compared rather than
        # the length alone so that same-length edits are still picked up.
        curr_size = len(text)
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if not force and _is_unchanged(prev, text, digest):
            continue

        now_iso = datetime.now(timezone.utc).isoformat()
        drawer_id = f"drawer_diary_{date_str}"

        # Extract entities from full day text
        entities = _extract_entities_for_metadata(text)

        # UPSERT the day's drawer (full verbatim, replaces as day grows)
        drawer_meta = {
            "date": date_str,
            "wing": wing,
            "room": "daily",
            "source_file": str(diary_path),
            "source_session": "daily_diary",
            "filed_at": now_iso,
        }
        if entities:
            drawer_meta["entities"] = entities
        drawers_col.upsert(
            documents=[text],
            ids=[drawer_id],
            metadatas=[drawer_meta],
        )

        # Split into entries and find new ones
        entries = _split_entries(text)
        prev_entry_count = prev.get("entry_count", 0)
        if not isinstance(prev_entry_count, int) or prev_entry_count < 0:
            # A hand-edited count must not turn into a wrong slice.
            prev_entry_count = 0
        new_entries = entries if force else entries[prev_entry_count:]

        if new_entries:
            # Build closet lines from new entries
            all_lines = []
            for header, body in new_entries:
                entry_text = f"{header}\n{body}"
                all_lines.extend(
                    build_closet_lines(str(diary_path), [drawer_id], entry_text, wing, "daily")
                )
            if all_lines:
                # NOTE(review): the closet id base depends on the date only,
                # while all_lines holds just the new entries. Confirm that
                # upsert_closet_lines does not overwrite closets written for
                # the same day by an earlier run.
                closet_id_base = f"closet_diary_{date_str}"
                closet_meta = {
                    "date": date_str,
                    "wing": wing,
                    "room": "daily",
                    "source_file": str(diary_path),
                    "filed_at": now_iso,
                }
                if entities:
                    closet_meta["entities"] = entities
                n = upsert_closet_lines(closets_col, closet_id_base, all_lines, closet_meta)
                closets_created += n

        state[diary_path.name] = {
            "size": curr_size,
            "sha256": digest,
            "entry_count": len(entries),
            "ingested_at": now_iso,
        }
        days_updated += 1

    state_file.write_text(json.dumps(state, indent=2), encoding="utf-8")
    if days_updated:
        print(f"Diary: {days_updated} days updated, {closets_created} new closets")
    return {"days_updated": days_updated, "closets_created": closets_created}
if __name__ == "__main__":
    import argparse

    # Command-line entry point: parse the flags and run one ingest pass.
    cli = argparse.ArgumentParser(description="Ingest daily summaries into the palace")
    # (flag, add_argument keyword options), registered in this order.
    option_specs = (
        ("--dir", {"required": True, "help": "Path to daily_summaries directory"}),
        ("--palace", {"default": os.path.expanduser("~/.mempalace/palace")}),
        ("--wing", {"default": "diary"}),
        ("--force", {"action": "store_true"}),
    )
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    opts = cli.parse_args()
    ingest_diaries(opts.dir, opts.palace, wing=opts.wing, force=opts.force)
+47 -2
View File
@@ -371,6 +371,43 @@ def chunk_text(content: str, source_file: str) -> list:
# =============================================================================
def _extract_entities_for_metadata(content: str) -> str:
"""Extract entity names from content for metadata tagging.
Returns semicolon-separated string of entity names found in the text,
suitable for ChromaDB metadata filtering.
"""
import re
# Load known entities from registry if available
known_names = set()
registry_path = os.path.join(os.path.expanduser("~"), ".mempalace", "known_entities.json")
if os.path.exists(registry_path):
try:
import json
kd = json.loads(open(registry_path).read())
for cat in kd.values():
if isinstance(cat, list):
known_names.update(cat)
except Exception:
pass
matched = set()
# Match known entities
for name in known_names:
if re.search(r'(?<!\w)' + re.escape(name) + r'(?!\w)', content):
matched.add(name)
# Also catch capitalized words appearing 2+ times (likely proper nouns)
words = re.findall(r"\b[A-Z][a-z]{2,}\b", content[:5000])
freq = {}
for w in words:
freq[w] = freq.get(w, 0) + 1
for w, c in freq.items():
if c >= 2 and len(w) > 2:
matched.add(w)
return ";".join(sorted(matched))[:500] if matched else ""
def add_drawer(
collection, wing: str, room: str, content: str, source_file: str, chunk_index: int, agent: str
):
@@ -390,6 +427,10 @@ def add_drawer(
metadata["source_mtime"] = os.path.getmtime(source_file)
except OSError:
pass
# Tag with entity names for filterable search
entities = _extract_entities_for_metadata(content)
if entities:
metadata["entities"] = entities
collection.upsert(
documents=[content],
ids=[drawer_id],
@@ -479,13 +520,17 @@ def process_file(
]
closet_lines = build_closet_lines(source_file, drawer_ids, content, wing, room)
closet_id_base = f"closet_{wing}_{room}_{hashlib.sha256(source_file.encode()).hexdigest()[:24]}"
upsert_closet_lines(closets_col, closet_id_base, closet_lines, {
entities = _extract_entities_for_metadata(content)
closet_meta = {
"wing": wing,
"room": room,
"source_file": source_file,
"drawer_count": drawers_added,
"filed_at": datetime.now().isoformat(),
})
}
if entities:
closet_meta["entities"] = entities
upsert_closet_lines(closets_col, closet_id_base, closet_lines, closet_meta)
return drawers_added, room
+62 -2
View File
@@ -2,11 +2,14 @@
"""
searcher.py — Find anything. Exact words.
Semantic search against the palace.
Returns verbatim text — the actual words, never summaries.
Hybrid search: BM25 keyword matching + vector semantic similarity.
Searches closets first (fast index), then hydrates full drawer content.
Falls back to direct drawer search for palaces without closets.
"""
import logging
import math
import re
from pathlib import Path
from .palace import get_collection, get_closets_collection
@@ -18,6 +21,59 @@ class SearchError(Exception):
"""Raised when search cannot proceed (e.g. no palace found)."""
def _bm25_score(query: str, document: str, k1: float = 1.5, b: float = 0.75, avg_dl: float = 500) -> float:
"""Simple BM25 score for a single document against a query.
This is a lightweight keyword-matching signal that complements vector
similarity. It catches exact matches that embeddings might miss
(e.g., specific names, project codes, error messages).
"""
query_terms = set(re.findall(r'\w{2,}', query.lower()))
doc_terms = re.findall(r'\w{2,}', document.lower())
if not query_terms or not doc_terms:
return 0.0
doc_len = len(doc_terms)
term_freq = {}
for t in doc_terms:
term_freq[t] = term_freq.get(t, 0) + 1
score = 0.0
for term in query_terms:
tf = term_freq.get(term, 0)
if tf > 0:
# Simplified IDF — treat each query term as moderately rare
idf = math.log(2.0)
numerator = tf * (k1 + 1)
denominator = tf + k1 * (1 - b + b * doc_len / avg_dl)
score += idf * numerator / denominator
return score
def _hybrid_rank(vector_results, query: str, vector_weight: float = 0.6, bm25_weight: float = 0.4):
"""Re-rank results using both vector distance and BM25 keyword score.
Returns results sorted by combined score (higher = better).
"""
if not vector_results:
return vector_results
# Normalize vector distances to 0-1 similarity
max_dist = max(r.get("distance", 1.0) for r in vector_results) or 1.0
for r in vector_results:
vec_sim = max(0.0, 1 - r.get("distance", 1.0) / max(max_dist, 0.001))
bm25 = _bm25_score(query, r.get("text", ""))
# Normalize BM25 to roughly 0-1 range
bm25_norm = min(bm25 / 3.0, 1.0)
r["_hybrid_score"] = vector_weight * vec_sim + bm25_weight * bm25_norm
r["bm25_score"] = round(bm25, 3)
vector_results.sort(key=lambda r: r["_hybrid_score"], reverse=True)
# Clean up internal field
for r in vector_results:
del r["_hybrid_score"]
return vector_results
def build_where_filter(wing: str = None, room: str = None) -> dict:
"""Build ChromaDB where filter for wing/room filtering."""
if wing and room:
@@ -186,6 +242,8 @@ def search_memories(
break
if hits:
# Re-rank with BM25 hybrid scoring
hits = _hybrid_rank(hits, query)
return {
"query": query,
"filters": {"wing": wing, "room": room},
@@ -227,6 +285,8 @@ def search_memories(
}
)
# Re-rank with BM25 hybrid scoring
hits = _hybrid_rank(hits, query)
return {
"query": query,
"filters": {"wing": wing, "room": room},