merge: pr/cross-wing-tunnels + rebuild drawer-grep on hardened closet path

Merges the full hardened stack (#788 closets, #789 entity/BM25/diary,
#790 tunnels) and reimplements the drawer-grep feature in a way that
composes with the chunk-level closet-first search instead of fighting it.

## Background

The original PR added "drawer-grep" on top of the pre-hardening closet
code that returned whole-file blobs. My #788 hardening changed that
path to return *chunk-level* hits by parsing each closet's
``→drawer_id`` pointers and hydrating exactly those drawers. That made
the original drawer-grep grep-over-all-drawers logic redundant — the
closet already points at the relevant chunk.

What remained valuable from the original PR was the *context expansion*
idea: a chunk boundary can clip a thought mid-stride (matched chunk
says "here's a breakdown:" and the breakdown lives in the next chunk),
so callers want ±1 neighbor chunks for free rather than a follow-up
get_drawer call.

## Change

New ``_expand_with_neighbors(drawers_col, doc, meta, radius=1)`` helper
in searcher.py:

* Reads ``source_file`` + ``chunk_index`` from the matched drawer's
  metadata.
* Fetches the ±radius sibling chunks in a SINGLE ChromaDB query using
  ``$and + $in`` — no "fetch all drawers for source" blowup.
* Sorts retrieved chunks by chunk_index, joins with ``\n\n``.
* Does a cheap metadata-only second query to compute ``total_drawers``
  so callers know where in the file they landed.
* Graceful fallback to the matched doc alone on any ChromaDB failure or
  missing metadata — search never breaks because expansion failed.

``_closet_first_hits`` now calls this helper and tags each hit with
``drawer_index`` + ``total_drawers``. Hit shape stays consistent with
the direct-search path (both still carry ``matched_via``) so callers
can't tell which path produced a given hit except via that field.

## Tests

6 new cases in TestDrawerGrepExpansion:
* neighbors returned in chunk_index order (not hash order)
* edge case: matched chunk at index 0 — only next neighbor surfaces
* edge case: matched chunk at last index — only prev neighbor surfaces
* edge case: 1-drawer file — returns just the matched doc
* missing/non-int chunk_index metadata — graceful fallback
* end-to-end via ``search_memories`` — closet-first hit carries
  drawer_index, total_drawers, and includes ±1 neighbors

761/761 suite pass; ruff + format clean on CI-pinned 0.4.x.

Merge resolutions: miner.py kept develop's purge+NORMALIZE_VERSION;
searcher.py dropped the old whole-file-blob block entirely in favor of
rebuilding context expansion on top of ``_closet_first_hits``;
test_closets.py took develop's 47-test baseline and appended
TestDrawerGrepExpansion.
This commit is contained in:
Igor Lins e Silva
2026-04-13 18:08:01 -03:00
18 changed files with 2088 additions and 506 deletions
+305 -134
View File
@@ -12,7 +12,11 @@ import math
import re
from pathlib import Path
from .palace import get_collection, get_closets_collection
from .palace import get_closets_collection, get_collection
# Closet pointer line format: "topic|entities|→drawer_id_a,drawer_id_b"
# Multiple lines may join with newlines inside one closet document.
_CLOSET_DRAWER_REF_RE = re.compile(r"→([\w,]+)")
logger = logging.getLogger("mempalace_mcp")
@@ -21,57 +25,109 @@ class SearchError(Exception):
"""Raised when search cannot proceed (e.g. no palace found)."""
def _bm25_score(query: str, document: str, k1: float = 1.5, b: float = 0.75, avg_dl: float = 500) -> float:
"""Simple BM25 score for a single document against a query.
_TOKEN_RE = re.compile(r"\w{2,}", re.UNICODE)
This is a lightweight keyword-matching signal that complements vector
similarity. It catches exact matches that embeddings might miss
(e.g., specific names, project codes, error messages).
def _tokenize(text: str) -> list:
"""Lowercase + strip to alphanumeric tokens of length ≥ 2."""
return _TOKEN_RE.findall(text.lower())
def _bm25_scores(
query: str,
documents: list,
k1: float = 1.5,
b: float = 0.75,
) -> list:
"""Compute Okapi-BM25 scores for ``query`` against each document.
IDF is computed over the *provided corpus* using the Lucene/BM25+
smoothed formula ``log((N - df + 0.5) / (df + 0.5) + 1)``, which is
always non-negative. This is well-defined for re-ranking a small
candidate set returned by vector retrieval — IDF then reflects how
discriminative each query term is *within the candidates*, exactly
what's needed to reorder them.
Parameters mirror Okapi-BM25 conventions:
k1 — term-frequency saturation (1.2-2.0 typical, 1.5 default)
b — length normalization (0.0 = none, 1.0 = full, 0.75 default)
Returns a list of scores in the same order as ``documents``.
"""
query_terms = set(re.findall(r'\w{2,}', query.lower()))
doc_terms = re.findall(r'\w{2,}', document.lower())
if not query_terms or not doc_terms:
return 0.0
doc_len = len(doc_terms)
term_freq = {}
for t in doc_terms:
term_freq[t] = term_freq.get(t, 0) + 1
n_docs = len(documents)
query_terms = set(_tokenize(query))
if not query_terms or n_docs == 0:
return [0.0] * n_docs
score = 0.0
for term in query_terms:
tf = term_freq.get(term, 0)
if tf > 0:
# Simplified IDF — treat each query term as moderately rare
idf = math.log(2.0)
numerator = tf * (k1 + 1)
denominator = tf + k1 * (1 - b + b * doc_len / avg_dl)
score += idf * numerator / denominator
return score
tokenized = [_tokenize(d) for d in documents]
doc_lens = [len(toks) for toks in tokenized]
if not any(doc_lens):
return [0.0] * n_docs
avgdl = sum(doc_lens) / n_docs or 1.0
# Document frequency: how many docs contain each query term?
df = {term: 0 for term in query_terms}
for toks in tokenized:
seen = set(toks) & query_terms
for term in seen:
df[term] += 1
idf = {term: math.log((n_docs - df[term] + 0.5) / (df[term] + 0.5) + 1) for term in query_terms}
scores = []
for toks, dl in zip(tokenized, doc_lens):
if dl == 0:
scores.append(0.0)
continue
tf: dict = {}
for t in toks:
if t in query_terms:
tf[t] = tf.get(t, 0) + 1
score = 0.0
for term, freq in tf.items():
num = freq * (k1 + 1)
den = freq + k1 * (1 - b + b * dl / avgdl)
score += idf[term] * num / den
scores.append(score)
return scores
def _hybrid_rank(vector_results, query: str, vector_weight: float = 0.6, bm25_weight: float = 0.4):
"""Re-rank results using both vector distance and BM25 keyword score.
def _hybrid_rank(
results: list,
query: str,
vector_weight: float = 0.6,
bm25_weight: float = 0.4,
) -> list:
"""Re-rank ``results`` by a convex combination of vector similarity and BM25.
Returns results sorted by combined score (higher = better).
* Vector similarity uses absolute cosine sim ``max(0, 1 - distance)`` —
ChromaDB's hnsw cosine distance lives in ``[0, 2]`` (0 = identical).
Absolute (not relative-to-max) means adding/removing a candidate
can't reshuffle the others.
* BM25 is real Okapi-BM25 with corpus-relative IDF over the candidates
themselves. Since the absolute scale is unbounded, BM25 is min-max
normalized within the candidate set so weights are commensurable.
Mutates each result dict to add ``bm25_score`` and reorders the list
in place. Returns the same list for convenience.
"""
if not vector_results:
return vector_results
if not results:
return results
# Normalize vector distances to 0-1 similarity
max_dist = max(r.get("distance", 1.0) for r in vector_results) or 1.0
for r in vector_results:
vec_sim = max(0.0, 1 - r.get("distance", 1.0) / max(max_dist, 0.001))
bm25 = _bm25_score(query, r.get("text", ""))
# Normalize BM25 to roughly 0-1 range
bm25_norm = min(bm25 / 3.0, 1.0)
r["_hybrid_score"] = vector_weight * vec_sim + bm25_weight * bm25_norm
r["bm25_score"] = round(bm25, 3)
docs = [r.get("text", "") for r in results]
bm25_raw = _bm25_scores(query, docs)
max_bm25 = max(bm25_raw) if bm25_raw else 0.0
bm25_norm = [s / max_bm25 for s in bm25_raw] if max_bm25 > 0 else [0.0] * len(bm25_raw)
vector_results.sort(key=lambda r: r["_hybrid_score"], reverse=True)
# Clean up internal field
for r in vector_results:
del r["_hybrid_score"]
return vector_results
scored = []
for r, raw, norm in zip(results, bm25_raw, bm25_norm):
vec_sim = max(0.0, 1.0 - r.get("distance", 1.0))
r["bm25_score"] = round(raw, 3)
scored.append((vector_weight * vec_sim + bm25_weight * norm, r))
scored.sort(key=lambda pair: pair[0], reverse=True)
results[:] = [r for _, r in scored]
return results
def build_where_filter(wing: str = None, room: str = None) -> dict:
@@ -85,6 +141,187 @@ def build_where_filter(wing: str = None, room: str = None) -> dict:
return {}
def _extract_drawer_ids_from_closet(closet_doc: str) -> list:
"""Parse all `→drawer_id_a,drawer_id_b` pointers out of a closet document.
Preserves order and dedupes.
"""
seen: dict = {}
for match in _CLOSET_DRAWER_REF_RE.findall(closet_doc):
for did in match.split(","):
did = did.strip()
if did and did not in seen:
seen[did] = None
return list(seen.keys())
def _expand_with_neighbors(drawers_col, matched_doc: str, matched_meta: dict, radius: int = 1):
"""Expand a matched drawer with its ±radius sibling chunks in the same source file.
Motivation — "drawer-grep context" feature: a closet hit returns one
drawer, but the chunk boundary may clip mid-thought (e.g., the matched
chunk says "here's a breakdown:" and the actual breakdown lives in the
next chunk). Fetching the small neighborhood around the match gives
callers enough context without forcing a follow-up ``get_drawer`` call.
Returns a dict with:
``text`` combined chunks in chunk_index order
``drawer_index`` the matched chunk's index in the source file
``total_drawers`` total drawer count for the source file (or None)
On any ChromaDB failure or missing metadata, falls back to returning the
matched drawer alone so search never breaks because neighbor expansion
failed.
"""
src = matched_meta.get("source_file")
chunk_idx = matched_meta.get("chunk_index")
if not src or not isinstance(chunk_idx, int):
return {"text": matched_doc, "drawer_index": chunk_idx, "total_drawers": None}
target_indexes = [chunk_idx + offset for offset in range(-radius, radius + 1)]
try:
neighbors = drawers_col.get(
where={
"$and": [
{"source_file": src},
{"chunk_index": {"$in": target_indexes}},
]
},
include=["documents", "metadatas"],
)
except Exception:
return {"text": matched_doc, "drawer_index": chunk_idx, "total_drawers": None}
indexed_docs = []
for doc, meta in zip(neighbors.get("documents") or [], neighbors.get("metadatas") or []):
ci = meta.get("chunk_index")
if isinstance(ci, int):
indexed_docs.append((ci, doc))
indexed_docs.sort(key=lambda pair: pair[0])
if not indexed_docs:
combined_text = matched_doc
else:
combined_text = "\n\n".join(doc for _, doc in indexed_docs)
# Cheap total_drawers lookup: metadata-only scan of the source file.
total_drawers = None
try:
all_meta = drawers_col.get(where={"source_file": src}, include=["metadatas"])
ids = all_meta.get("ids") or []
total_drawers = len(ids) if ids else None
except Exception:
pass
return {
"text": combined_text,
"drawer_index": chunk_idx,
"total_drawers": total_drawers,
}
def _closet_first_hits(
palace_path: str,
query: str,
where: dict,
drawers_col,
n_results: int,
max_distance: float,
):
"""Run a closet-first search and return chunk-level drawer hits.
Returns:
non-empty list of hits when the closet path produced usable matches.
``None`` when the closet collection is empty/missing OR when every
candidate drawer was filtered out (e.g. by max_distance); the
caller should fall back to direct drawer search.
"""
try:
closets_col = get_closets_collection(palace_path, create=False)
except Exception:
return None
try:
ckwargs = {
"query_texts": [query],
"n_results": max(n_results * 2, 5),
"include": ["documents", "metadatas", "distances"],
}
if where:
ckwargs["where"] = where
closet_results = closets_col.query(**ckwargs)
except Exception:
return None
closet_docs = closet_results["documents"][0] if closet_results["documents"] else []
if not closet_docs:
return None
closet_metas = closet_results["metadatas"][0]
closet_dists = closet_results["distances"][0]
# Collect candidate drawer IDs in closet-rank order, dedupe, remember
# which closet (and its distance/preview) introduced each one.
drawer_id_order: list = []
drawer_provenance: dict = {}
for cdoc, cmeta, cdist in zip(closet_docs, closet_metas, closet_dists):
for did in _extract_drawer_ids_from_closet(cdoc):
if did in drawer_provenance:
continue
drawer_provenance[did] = (cdist, cdoc, cmeta)
drawer_id_order.append(did)
if not drawer_id_order:
return None
# Hydrate exactly those drawers — chunk-level, not whole-file.
try:
fetched = drawers_col.get(
ids=drawer_id_order,
include=["documents", "metadatas"],
)
except Exception:
return None
fetched_ids = fetched.get("ids") or []
fetched_docs = fetched.get("documents") or []
fetched_metas = fetched.get("metadatas") or []
fetched_map = {
did: (doc, meta) for did, doc, meta in zip(fetched_ids, fetched_docs, fetched_metas)
}
hits: list = []
for did in drawer_id_order:
if did not in fetched_map:
continue # closet pointed to a drawer that no longer exists
doc, meta = fetched_map[did]
cdist, cdoc, _ = drawer_provenance[did]
if max_distance > 0.0 and cdist > max_distance:
continue
# Expand with ±1 neighbor chunks from the same source file so a
# closet hit that lands mid-thought still returns enough context to
# be useful without a follow-up get_drawer call.
expansion = _expand_with_neighbors(drawers_col, doc, meta, radius=1)
hits.append(
{
"text": expansion["text"],
"wing": meta.get("wing", "unknown"),
"room": meta.get("room", "unknown"),
"source_file": Path(meta.get("source_file", "?")).name,
"similarity": round(max(0.0, 1 - cdist), 3),
"distance": round(cdist, 4),
"matched_via": "closet",
"closet_preview": cdoc[:200],
"drawer_index": expansion["drawer_index"],
"total_drawers": expansion["total_drawers"],
}
)
if len(hits) >= n_results:
break
return hits if hits else None
def search(query: str, palace_path: str, wing: str = None, room: str = None, n_results: int = 5):
"""
Search the palace. Returns verbatim drawer content.
@@ -183,98 +420,31 @@ def search_memories(
where = build_where_filter(wing, room)
# Try closet-first search: search the compact index, then hydrate drawers
closet_hits = []
try:
closets_col = get_closets_collection(palace_path, create=False)
ckwargs = {
"query_texts": [query],
"n_results": n_results * 2, # over-fetch closets to find best drawers
"include": ["documents", "metadatas", "distances"],
# Closet-first search: scan the compact index, parse drawer pointers
# from each matching line, then hydrate exactly those drawers. This
# keeps the result shape chunk-level (consistent with direct search)
# and applies the same max_distance filter.
closet_hits = _closet_first_hits(
palace_path=palace_path,
query=query,
where=where,
drawers_col=drawers_col,
n_results=n_results,
max_distance=max_distance,
)
if closet_hits is not None:
# Re-rank chunk-level closet hits with the same hybrid scoring as
# the direct path. The vector half here uses the closet's distance
# (query↔topic-line) — that's intentional: closets are *meant* to
# be the semantic-narrowing signal, and BM25 then enforces actual
# keyword presence in the hydrated drawer text.
closet_hits = _hybrid_rank(closet_hits, query)
return {
"query": query,
"filters": {"wing": wing, "room": room},
"total_before_filter": len(closet_hits),
"results": closet_hits,
}
if where:
ckwargs["where"] = where
closet_results = closets_col.query(**ckwargs)
if closet_results["documents"][0]:
closet_hits = list(zip(
closet_results["documents"][0],
closet_results["metadatas"][0],
closet_results["distances"][0],
))
except Exception:
pass # no closets yet — fall through to direct drawer search
# If closets found results, hydrate the referenced drawers
MAX_HYDRATION_CHARS = 10000 # cap to prevent blowup on large source files
if closet_hits:
import re
seen_sources = set()
hits = []
for closet_doc, closet_meta, closet_dist in closet_hits:
source = closet_meta.get("source_file", "")
if source in seen_sources:
continue
seen_sources.add(source)
# Find drawers for this source file, grep for most relevant chunk
try:
drawer_results = drawers_col.get(
where={"source_file": source},
include=["documents", "metadatas"],
)
if drawer_results.get("ids"):
# Drawer-grep: score each chunk against the query,
# return the best-matching chunk first + surrounding context
query_terms = set(re.findall(r'\w{2,}', query.lower()))
best_idx = 0
best_score = -1
for idx, doc in enumerate(drawer_results["documents"]):
doc_lower = doc.lower()
score = sum(1 for t in query_terms if t in doc_lower)
if score > best_score:
best_score = score
best_idx = idx
# Build result: best chunk first, then neighbors
docs = drawer_results["documents"]
n_docs = len(docs)
# Include best chunk + 1 before + 1 after for context
start = max(0, best_idx - 1)
end = min(n_docs, best_idx + 2)
relevant_text = "\n\n".join(docs[start:end])
if len(relevant_text) > MAX_HYDRATION_CHARS:
relevant_text = relevant_text[:MAX_HYDRATION_CHARS] + f"\n\n[...truncated. {n_docs} total drawers. Use mempalace_get_drawer for full content.]"
meta = drawer_results["metadatas"][best_idx]
hits.append({
"text": relevant_text,
"wing": meta.get("wing", "unknown"),
"room": meta.get("room", "unknown"),
"source_file": Path(source).name,
"similarity": round(max(0.0, 1 - closet_dist), 3),
"distance": round(closet_dist, 4),
"matched_via": "closet",
"closet_preview": closet_doc[:200],
"drawer_index": best_idx,
"total_drawers": n_docs,
})
except Exception:
pass
if len(hits) >= n_results:
break
if hits:
# Re-rank with BM25 hybrid scoring
hits = _hybrid_rank(hits, query)
return {
"query": query,
"filters": {"wing": wing, "room": room},
"total_before_filter": len(closet_hits),
"results": hits,
}
# Fallback: direct drawer search (no closets yet, or closets empty)
try:
@@ -307,6 +477,7 @@ def search_memories(
"source_file": Path(meta.get("source_file", "?")).name,
"similarity": round(max(0.0, 1 - dist), 3),
"distance": round(dist, 4),
"matched_via": "drawer",
}
)