diff --git a/tests/test_closets.py b/tests/test_closets.py new file mode 100644 index 0000000..b365102 --- /dev/null +++ b/tests/test_closets.py @@ -0,0 +1,201 @@ +"""Tests for the closet layer, mine_lock, entity metadata, BM25 hybrid search, +and diary ingest. + +Content derived from Milla's omnibus test file; trimmed to only the features +present in this PR stack (#784 lock, #788 closets, this PR's entity/BM25/diary). +Strip-noise tests live with #785; tunnel tests live with the tunnels PR. +""" + +import os +import tempfile +import threading +import time + +from mempalace.palace import ( + CLOSET_CHAR_LIMIT, + build_closet_lines, + get_closets_collection, + get_collection, + mine_lock, + upsert_closet_lines, +) +from mempalace.miner import _extract_entities_for_metadata +from mempalace.searcher import _bm25_score, _hybrid_rank + + +# ── mine_lock ──────────────────────────────────────────────────────────── + + +class TestMineLock: + def test_lock_acquires_and_releases(self): + with mine_lock("/tmp/test_lock_file.txt"): + lock_dir = os.path.expanduser("~/.mempalace/locks") + assert os.path.isdir(lock_dir) + + def test_lock_blocks_concurrent_access(self): + results = [] + + def worker(name): + start = time.time() + with mine_lock("/tmp/same_file_lock_test.txt"): + results.append((name, time.time() - start)) + time.sleep(0.2) + + t1 = threading.Thread(target=worker, args=("a",)) + t2 = threading.Thread(target=worker, args=("b",)) + t1.start() + time.sleep(0.05) + t2.start() + t1.join() + t2.join() + + # Second thread should have waited + wait_times = sorted(results, key=lambda x: x[1]) + assert wait_times[1][1] > 0.1, "Second thread should block" + + +# ── closet lines ───────────────────────────────────────────────────────── + + +class TestBuildClosetLines: + def test_returns_list_of_lines(self): + lines = build_closet_lines( + "/tmp/test.py", ["drawer_001"], "We built the auth system", "code", "general" + ) + assert isinstance(lines, list) + assert len(lines) >= 1 + + def test_each_line_has_pointer(self): + lines = build_closet_lines( + "/tmp/test.py", + ["drawer_001", "drawer_002"], + "We built the auth system and tested the login flow", + "code", + "general", + ) + for line in lines: + assert "→" in line, f"Line missing pointer: {line}" + + def test_fallback_when_no_topics(self): + lines = build_closet_lines( + "/tmp/test.py", ["drawer_001"], "short text", "wing", "room" + ) + assert len(lines) >= 1 + assert "→" in lines[0] + + +# ── upsert_closet_lines ───────────────────────────────────────────────── + + +class TestUpsertClosetLines: + def test_writes_closets(self): + with tempfile.TemporaryDirectory() as tmpdir: + col = get_closets_collection(tmpdir) + lines = [ + "topic one|Entity1|→drawer_001", + "topic two|Entity2|→drawer_002", + ] + n = upsert_closet_lines(col, "test_closet", lines, {"wing": "test"}) + assert n >= 1 + assert col.count() >= 1 + + def test_never_splits_mid_topic(self): + with tempfile.TemporaryDirectory() as tmpdir: + col = get_closets_collection(tmpdir) + # Create lines that together exceed CLOSET_CHAR_LIMIT + lines = [f"topic_{i}|{'x' * 200}|→drawer_{i}" for i in range(20)] + n = upsert_closet_lines(col, "test_closet", lines, {"wing": "test"}) + assert n >= 2, "Should create multiple closets" + + # Verify each closet has complete lines + all_data = col.get(include=["documents"]) + for doc in all_data["documents"]: + for line in doc.strip().split("\n"): + assert "→" in line, f"Split topic found: {line}" + + def test_respects_char_limit(self): + with tempfile.TemporaryDirectory() as tmpdir: + col = get_closets_collection(tmpdir) + lines = [f"topic_{i}|entities|→drawer_{i}" for i in range(50)] + upsert_closet_lines(col, "test_closet", lines, {"wing": "test"}) + + all_data = col.get(include=["documents"]) + for doc in all_data["documents"]: + assert len(doc) <= CLOSET_CHAR_LIMIT + 100 # small buffer for existing content + + +# ── entity metadata ────────────────────────────────────────────────────── + + +class TestEntityMetadata: + def test_extracts_capitalized_names(self): + text = "Ben reviewed the code. Ben approved it. Igor flagged two issues. Igor fixed them." + entities = _extract_entities_for_metadata(text) + assert "Ben" in entities + assert "Igor" in entities + + def test_empty_for_no_entities(self): + text = "this is all lowercase with no proper nouns at all" + entities = _extract_entities_for_metadata(text) + assert entities == "" + + def test_semicolon_separated(self): + text = "Alice and Bob met Charlie. Alice said hello. Bob agreed. Charlie laughed." + entities = _extract_entities_for_metadata(text) + assert ";" in entities + + +# ── BM25 hybrid search ────────────────────────────────────────────────── + + +class TestBM25: + def test_bm25_score_positive_for_match(self): + score = _bm25_score("database migration", "We migrated the database to Postgres") + assert score > 0 + + def test_bm25_score_zero_for_no_match(self): + score = _bm25_score("quantum physics", "We built a web application in React") + assert score == 0.0 + + def test_hybrid_rank_reorders(self): + results = [ + {"text": "database schema design for Postgres", "distance": 0.5}, + {"text": "unrelated topic about cooking", "distance": 0.3}, + ] + ranked = _hybrid_rank(results, "database Postgres schema") + # The database result should rank higher despite worse vector distance + assert "database" in ranked[0]["text"] + + +# ── diary ingest ───────────────────────────────────────────────────────── + + +class TestDiaryIngest: + def test_ingest_creates_drawers_and_closets(self): + with tempfile.TemporaryDirectory() as palace_dir: + diary_dir = tempfile.mkdtemp() + # Write a test diary + with open(os.path.join(diary_dir, "2026-04-13.md"), "w") as f: + f.write("# 2026-04-13\n\n## 10:00 PDT — Test\n\nBuilt the auth system.\n") + + from mempalace.diary_ingest import ingest_diaries + + result = ingest_diaries(diary_dir, palace_dir, force=True) + assert result["days_updated"] >= 1 + + # Check drawer exists + drawers = get_collection(palace_dir) + count = drawers.count() + assert count >= 1 + + def test_ingest_skips_unchanged(self): + with tempfile.TemporaryDirectory() as palace_dir: + diary_dir = tempfile.mkdtemp() + with open(os.path.join(diary_dir, "2026-04-13.md"), "w") as f: + f.write("# 2026-04-13\n\n## 10:00 — Test\n\nContent.\n") + + from mempalace.diary_ingest import ingest_diaries + + ingest_diaries(diary_dir, palace_dir, force=True) + result = ingest_diaries(diary_dir, palace_dir) # second run, no force + assert result["days_updated"] == 0