test: expand coverage to 70% (threshold raised to 60%), fix mcp_server CI crash

Add/expand tests for normalize (39%→97%), searcher (39%→100%),
layers (28%→97%), split_mega_files (34%→72%).

Fix mcp_server.py parse_args→parse_known_args to prevent SystemExit
when imported during pytest (CI was crashing on all test jobs).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Tal Muskal
2026-04-08 21:07:03 +03:00
parent ab66c0e945
commit e24d8ca733
6 changed files with 1458 additions and 42 deletions
+2 -1
View File
@@ -44,7 +44,8 @@ def _parse_args():
metavar="PATH",
help="Path to the palace directory (overrides config file and env var)",
)
return parser.parse_args()
args, _ = parser.parse_known_args()
return args
_args = _parse_args()
+1 -1
View File
@@ -69,7 +69,7 @@ testpaths = ["tests"]
source = ["mempalace"]
[tool.coverage.report]
fail_under = 50
fail_under = 60
show_missing = true
exclude_lines = [
"if __name__",
+614 -17
View File
@@ -1,9 +1,9 @@
"""Tests for mempalace.layers — focused on Layer0."""
"""Tests for mempalace.layers — Layer0, Layer1, Layer2, Layer3, MemoryStack."""
import os
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from mempalace.layers import Layer0
from mempalace.layers import Layer0, Layer1, Layer2, Layer3, MemoryStack
# ── Layer0 — with identity file ─────────────────────────────────────────
@@ -23,10 +23,8 @@ def test_layer0_caches_text(tmp_path):
identity_file.write_text("Hello world")
layer = Layer0(identity_path=str(identity_file))
first = layer.render()
# Modify file after first read
identity_file.write_text("Changed content")
second = layer.render()
# Should return cached version
assert first == second
assert second == "Hello world"
@@ -41,7 +39,7 @@ def test_layer0_missing_file_returns_default(tmp_path):
def test_layer0_token_estimate(tmp_path):
identity_file = tmp_path / "identity.txt"
content = "A" * 400 # 400 chars ~ 100 tokens
content = "A" * 400
identity_file.write_text(content)
layer = Layer0(identity_path=str(identity_file))
estimate = layer.token_estimate()
@@ -72,51 +70,650 @@ def test_layer0_default_path():
# ── Layer1 — mocked chromadb ────────────────────────────────────────────
def _mock_chromadb_for_layer(docs, metas, monkeypatch=None):
"""Return a mock PersistentClient whose collection.get returns docs/metas."""
mock_col = MagicMock()
# First batch returns data, second batch returns empty (end of pagination)
mock_col.get.side_effect = [
{"documents": docs, "metadatas": metas},
{"documents": [], "metadatas": []},
]
mock_client = MagicMock()
mock_client.get_collection.return_value = mock_col
return mock_client
def test_layer1_no_palace():
    """Layer1.generate reports a missing or empty palace for a nonexistent path."""
    missing = "/nonexistent/palace"
    with patch("mempalace.layers.MempalaceConfig") as config_cls:
        config_cls.return_value.palace_path = missing
        from mempalace.layers import Layer1

        story = Layer1(palace_path=missing).generate()
    assert "No palace found" in story or "No memories" in story
def test_layer1_generates_essential_story():
    """Layer1.generate output carries the ESSENTIAL STORY header and drawer text."""
    client = _mock_chromadb_for_layer(
        [
            "Important memory about project decisions",
            "Key architectural choice for the backend",
        ],
        [
            {"room": "decisions", "source_file": "meeting.txt", "importance": 5},
            {"room": "architecture", "source_file": "design.txt", "importance": 4},
        ],
    )
    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        story = Layer1(palace_path="/fake").generate()

    assert "ESSENTIAL STORY" in story
    assert "project decisions" in story
def test_layer1_empty_palace():
    """Layer1.generate says 'No memories' when the collection returns nothing."""
    collection = MagicMock()
    collection.get.return_value = {"documents": [], "metadatas": []}
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        story = Layer1(palace_path="/fake").generate()

    assert "No memories" in story
def test_layer1_with_wing_filter():
    """Layer1 built with a wing passes ``where={"wing": ...}`` on the first get()."""
    client = _mock_chromadb_for_layer(
        ["Memory about project X"],
        [{"room": "general", "source_file": "x.txt", "importance": 3}],
    )
    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        story = Layer1(palace_path="/fake", wing="project_x").generate()

    assert "ESSENTIAL STORY" in story
    # Inspect the keyword arguments of the first paginated get() call.
    first_get = client.get_collection.return_value.get.call_args_list[0]
    assert first_get.kwargs.get("where") == {"wing": "project_x"}
def test_layer1_truncates_long_snippets():
    """A 300-character document is expected to appear with an ellipsis."""
    long_doc = "A" * 300
    client = _mock_chromadb_for_layer(
        [long_doc], [{"room": "general", "source_file": "long.txt"}]
    )
    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        story = Layer1(palace_path="/fake").generate()

    assert "..." in story
def test_layer1_respects_max_chars():
    """With MAX_CHARS lowered to 200, the output points to 'more in L3 search'."""
    count = 30
    documents = [
        f"Memory number {i} with substantial content padding here" for i in range(count)
    ]
    metadatas = [
        {"room": "general", "source_file": f"f{i}.txt", "importance": 5}
        for i in range(count)
    ]
    client = _mock_chromadb_for_layer(documents, metadatas)

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        essential = Layer1(palace_path="/fake")
        essential.MAX_CHARS = 200  # Very low cap to trigger truncation
        story = essential.generate()

    assert "more in L3 search" in story
def test_layer1_importance_from_various_keys():
    """Metadata using emotional_weight, weight, or no weight key still renders."""
    metadatas = [
        {"room": "r", "emotional_weight": 5},
        {"room": "r", "weight": 1},
        {"room": "r"},  # no weight key, defaults to 3
    ]
    client = _mock_chromadb_for_layer(["mem1", "mem2", "mem3"], metadatas)

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        story = Layer1(palace_path="/fake").generate()

    assert "ESSENTIAL STORY" in story
def test_layer1_batch_exception_breaks():
    """A RuntimeError on the second get() still yields an ESSENTIAL STORY."""
    good_batch = {"documents": ["doc1"], "metadatas": [{"room": "r"}]}
    collection = MagicMock()
    collection.get.side_effect = [good_batch, RuntimeError("batch error")]
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        story = Layer1(palace_path="/fake").generate()

    assert "ESSENTIAL STORY" in story
# ── Layer2 — mocked chromadb ────────────────────────────────────────────
def test_layer2_no_palace():
    """Layer2.retrieve reports 'No palace found' for a nonexistent path."""
    missing = "/nonexistent/palace"
    with patch("mempalace.layers.MempalaceConfig") as config_cls:
        config_cls.return_value.palace_path = missing
        from mempalace.layers import Layer2

        outcome = Layer2(palace_path=missing).retrieve(wing="test")
    assert "No palace found" in outcome
def test_layer2_retrieve_with_wing():
    """Retrieving by wing returns an ON-DEMAND section containing the drawer text."""
    collection = MagicMock()
    collection.get.return_value = {
        "documents": ["Some memory about the project"],
        "metadatas": [{"room": "backend", "source_file": "notes.txt"}],
    }
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        outcome = Layer2(palace_path="/fake").retrieve(wing="project")

    assert "ON-DEMAND" in outcome
    assert "memory about the project" in outcome
def test_layer2_retrieve_with_room():
    """Retrieving by room alone returns an ON-DEMAND section."""
    collection = MagicMock()
    collection.get.return_value = {
        "documents": ["Backend architecture notes"],
        "metadatas": [{"room": "architecture", "source_file": "arch.txt"}],
    }
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        outcome = Layer2(palace_path="/fake").retrieve(room="architecture")

    assert "ON-DEMAND" in outcome
def test_layer2_retrieve_wing_and_room():
    """Passing both wing and room produces a ``$and`` filter on collection.get."""
    collection = MagicMock()
    collection.get.return_value = {
        "documents": ["Filtered result"],
        "metadatas": [{"room": "backend", "source_file": "x.txt"}],
    }
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        outcome = Layer2(palace_path="/fake").retrieve(wing="proj", room="backend")

    assert "ON-DEMAND" in outcome
    where_clause = collection.get.call_args.kwargs.get("where", {})
    assert "$and" in where_clause
def test_layer2_retrieve_empty():
    """An empty collection result yields the 'No drawers found' message."""
    collection = MagicMock()
    collection.get.return_value = {"documents": [], "metadatas": []}
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        outcome = Layer2(palace_path="/fake").retrieve(wing="missing")

    assert "No drawers found" in outcome
def test_layer2_retrieve_no_filter():
    """retrieve() without wing or room must not pass a ``where`` keyword."""
    collection = MagicMock()
    collection.get.return_value = {"documents": [], "metadatas": []}
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        Layer2(palace_path="/fake").retrieve()

    # No where filter should be passed
    passed_kwargs = collection.get.call_args.kwargs
    assert "where" not in passed_kwargs
def test_layer2_retrieve_error():
    """A RuntimeError from collection.get surfaces as a 'Retrieval error' message."""
    collection = MagicMock()
    collection.get.side_effect = RuntimeError("db error")
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        outcome = Layer2(palace_path="/fake").retrieve(wing="test")

    assert "Retrieval error" in outcome
def test_layer2_truncates_long_snippets():
    """A 400-character document is expected to appear with an ellipsis."""
    collection = MagicMock()
    collection.get.return_value = {
        "documents": ["B" * 400],
        "metadatas": [{"room": "r", "source_file": "s.txt"}],
    }
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        outcome = Layer2(palace_path="/fake").retrieve(wing="test")

    assert "..." in outcome
# ── Layer3 — mocked chromadb ────────────────────────────────────────────
def _mock_query_results(docs, metas, dists):
return {
"documents": [docs],
"metadatas": [metas],
"distances": [dists],
}
def test_layer3_no_palace():
    """Layer3.search reports 'No palace found' for a nonexistent path."""
    missing = "/nonexistent/palace"
    with patch("mempalace.layers.MempalaceConfig") as config_cls:
        config_cls.return_value.palace_path = missing
        from mempalace.layers import Layer3

        outcome = Layer3(palace_path=missing).search("test query")
    assert "No palace found" in outcome
def test_layer3_search_raw_no_palace():
    """Layer3.search_raw returns an empty list for a nonexistent path."""
    missing = "/nonexistent/palace"
    with patch("mempalace.layers.MempalaceConfig") as config_cls:
        config_cls.return_value.palace_path = missing
        from mempalace.layers import Layer3

        hits = Layer3(palace_path=missing).search_raw("test query")
    assert hits == []
def test_layer3_search_with_results():
    """A hit at distance 0.2 is rendered under SEARCH RESULTS with sim=0.8."""
    collection = MagicMock()
    collection.query.return_value = _mock_query_results(
        ["Found this important memory"],
        [{"wing": "project", "room": "backend", "source_file": "notes.txt"}],
        [0.2],
    )
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        outcome = Layer3(palace_path="/fake").search("important")

    for expected in ("SEARCH RESULTS", "important memory", "sim=0.8"):
        assert expected in outcome
def test_layer3_search_no_results():
    """An empty query result yields the 'No results found' message."""
    collection = MagicMock()
    collection.query.return_value = _mock_query_results([], [], [])
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        outcome = Layer3(palace_path="/fake").search("nothing")

    assert "No results found" in outcome
def test_layer3_search_with_wing_filter():
    """search(wing=...) passes ``where={"wing": ...}`` to collection.query."""
    collection = MagicMock()
    collection.query.return_value = _mock_query_results(
        ["result"], [{"wing": "proj", "room": "r"}], [0.1]
    )
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        Layer3(palace_path="/fake").search("q", wing="proj")

    assert collection.query.call_args.kwargs["where"] == {"wing": "proj"}
def test_layer3_search_with_room_filter():
    """search(room=...) passes ``where={"room": ...}`` to collection.query."""
    collection = MagicMock()
    collection.query.return_value = _mock_query_results(
        ["result"], [{"wing": "w", "room": "backend"}], [0.1]
    )
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        Layer3(palace_path="/fake").search("q", room="backend")

    assert collection.query.call_args.kwargs["where"] == {"room": "backend"}
def test_layer3_search_with_wing_and_room():
    """search with both wing and room passes a ``$and`` filter to collection.query."""
    collection = MagicMock()
    collection.query.return_value = _mock_query_results(
        ["result"], [{"wing": "proj", "room": "backend"}], [0.1]
    )
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        Layer3(palace_path="/fake").search("q", wing="proj", room="backend")

    assert "$and" in collection.query.call_args.kwargs["where"]
def test_layer3_search_error():
    """A RuntimeError from collection.query surfaces as a 'Search error' message."""
    collection = MagicMock()
    collection.query.side_effect = RuntimeError("search failed")
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        outcome = Layer3(palace_path="/fake").search("q")

    assert "Search error" in outcome
def test_layer3_search_truncates_long_docs():
    """A 400-character hit is expected to appear with an ellipsis."""
    collection = MagicMock()
    collection.query.return_value = _mock_query_results(
        ["C" * 400], [{"wing": "w", "room": "r", "source_file": "s.txt"}], [0.1]
    )
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        outcome = Layer3(palace_path="/fake").search("q")

    assert "..." in outcome
def test_layer3_search_raw_returns_dicts():
    """search_raw returns one dict per hit with text, wing, similarity and metadata."""
    collection = MagicMock()
    collection.query.return_value = _mock_query_results(
        ["doc text"],
        [{"wing": "proj", "room": "backend", "source_file": "f.txt"}],
        [0.3],
    )
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        found = Layer3(palace_path="/fake").search_raw("q")

    assert len(found) == 1
    top = found[0]
    assert top["text"] == "doc text"
    assert top["wing"] == "proj"
    assert top["similarity"] == 0.7
    assert "metadata" in top
def test_layer3_search_raw_with_filters():
    """search_raw with both wing and room passes a ``$and`` filter to query."""
    collection = MagicMock()
    collection.query.return_value = _mock_query_results(
        ["doc"], [{"wing": "w", "room": "r"}], [0.1]
    )
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        Layer3(palace_path="/fake").search_raw("q", wing="w", room="r")

    assert "$and" in collection.query.call_args.kwargs["where"]
def test_layer3_search_raw_error():
    """A RuntimeError from collection.query makes search_raw return an empty list."""
    collection = MagicMock()
    collection.query.side_effect = RuntimeError("fail")
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        found = Layer3(palace_path="/fake").search_raw("q")

    assert found == []
# ── MemoryStack ─────────────────────────────────────────────────────────
def test_memory_stack_wake_up(tmp_path):
    """wake_up includes the identity text plus a no-palace/no-memories notice."""
    identity = tmp_path / "identity.txt"
    identity.write_text("I am Atlas.")

    with patch("mempalace.layers.MempalaceConfig") as config_cls:
        config_cls.return_value.palace_path = "/nonexistent"
        stack = MemoryStack(palace_path="/nonexistent", identity_path=str(identity))
        briefing = stack.wake_up()

    assert "Atlas" in briefing
    # L1 will say no palace found
    assert "No palace" in briefing or "No memories" in briefing
def test_memory_stack_wake_up_with_wing(tmp_path):
    """wake_up(wing=...) records the wing on the stack's L1 layer."""
    identity = tmp_path / "identity.txt"
    identity.write_text("I am Atlas.")

    with patch("mempalace.layers.MempalaceConfig") as config_cls:
        config_cls.return_value.palace_path = "/nonexistent"
        stack = MemoryStack(palace_path="/nonexistent", identity_path=str(identity))
        briefing = stack.wake_up(wing="my_project")

    assert stack.l1.wing == "my_project"
    assert "Atlas" in briefing
def test_memory_stack_recall(tmp_path):
    """recall on a nonexistent palace reports 'No palace found'."""
    identity = tmp_path / "identity.txt"
    identity.write_text("I am Atlas.")

    with patch("mempalace.layers.MempalaceConfig") as config_cls:
        config_cls.return_value.palace_path = "/nonexistent"
        stack = MemoryStack(palace_path="/nonexistent", identity_path=str(identity))
        recalled = stack.recall(wing="test")

    assert "No palace found" in recalled
def test_memory_stack_search(tmp_path):
    """search on a nonexistent palace reports 'No palace found'."""
    identity = tmp_path / "identity.txt"
    identity.write_text("I am Atlas.")

    with patch("mempalace.layers.MempalaceConfig") as config_cls:
        config_cls.return_value.palace_path = "/nonexistent"
        stack = MemoryStack(palace_path="/nonexistent", identity_path=str(identity))
        outcome = stack.search("test query")

    assert "No palace found" in outcome
def test_memory_stack_status(tmp_path):
    """status on a nonexistent palace reports zero drawers and all four layer keys."""
    identity = tmp_path / "identity.txt"
    identity.write_text("I am Atlas.")

    with patch("mempalace.layers.MempalaceConfig") as config_cls:
        config_cls.return_value.palace_path = "/nonexistent"
        stack = MemoryStack(palace_path="/nonexistent", identity_path=str(identity))
        report = stack.status()

    assert report["palace_path"] == "/nonexistent"
    assert report["total_drawers"] == 0
    for layer_key in ("L0_identity", "L1_essential", "L2_on_demand", "L3_deep_search"):
        assert layer_key in report
def test_memory_stack_status_with_palace(tmp_path):
    """status reports the collection's count and that the identity file exists."""
    identity = tmp_path / "identity.txt"
    identity.write_text("I am Atlas.")

    collection = MagicMock()
    collection.count.return_value = 42
    client = MagicMock()
    client.get_collection.return_value = collection

    with (
        patch("mempalace.layers.MempalaceConfig") as config_cls,
        patch("mempalace.layers.chromadb.PersistentClient", return_value=client),
    ):
        config_cls.return_value.palace_path = "/fake"
        stack = MemoryStack(palace_path="/fake", identity_path=str(identity))
        report = stack.status()

    assert report["total_drawers"] == 42
    assert report["L0_identity"]["exists"] is True
+514 -20
View File
@@ -1,31 +1,525 @@
import os
import json
import tempfile
from mempalace.normalize import normalize
from unittest.mock import patch
from mempalace.normalize import (
_extract_content,
_messages_to_transcript,
_try_chatgpt_json,
_try_claude_ai_json,
_try_claude_code_jsonl,
_try_codex_jsonl,
_try_normalize_json,
_try_slack_json,
normalize,
)
def test_plain_text():
f = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False)
f.write("Hello world\nSecond line\n")
f.close()
result = normalize(f.name)
# ── normalize() top-level ──────────────────────────────────────────────
def test_plain_text(tmp_path):
f = tmp_path / "plain.txt"
f.write_text("Hello world\nSecond line\n")
result = normalize(str(f))
assert "Hello world" in result
os.unlink(f.name)
def test_claude_json():
def test_claude_json(tmp_path):
data = [{"role": "user", "content": "Hi"}, {"role": "assistant", "content": "Hello"}]
f = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
json.dump(data, f)
f.close()
result = normalize(f.name)
f = tmp_path / "claude.json"
f.write_text(json.dumps(data))
result = normalize(str(f))
assert "Hi" in result
os.unlink(f.name)
def test_empty():
f = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False)
f.close()
result = normalize(f.name)
def test_empty(tmp_path):
f = tmp_path / "empty.txt"
f.write_text("")
result = normalize(str(f))
assert result.strip() == ""
os.unlink(f.name)
def test_normalize_io_error():
    """normalize raises IOError mentioning 'Could not read' for an unreadable path.

    The "did not raise" failure lives in the ``else`` branch as an explicit
    ``AssertionError``, so it does not rely on ``assert False`` and stays
    outside the guarded ``try`` body.
    """
    try:
        normalize("/nonexistent/path/file.txt")
    except IOError as exc:
        assert "Could not read" in str(exc)
    else:
        raise AssertionError("Should have raised")
def test_normalize_already_has_markers(tmp_path):
    """A file that already has three '>' lines is returned unchanged."""
    original = "> question 1\nanswer 1\n> question 2\nanswer 2\n> question 3\nanswer 3\n"
    source = tmp_path / "markers.txt"
    source.write_text(original)
    assert normalize(str(source)) == original
def test_normalize_json_content_detected_by_brace(tmp_path):
    """JSON chat content in a .txt file (text starts with '[') is still normalized."""
    conversation = [
        {"role": "user", "content": "Hey"},
        {"role": "assistant", "content": "Hi there"},
    ]
    source = tmp_path / "chat.txt"
    source.write_text(json.dumps(conversation))
    normalized = normalize(str(source))
    assert "Hey" in normalized
def test_normalize_whitespace_only(tmp_path):
    """A whitespace-only file normalizes to text that strips to empty."""
    source = tmp_path / "ws.txt"
    source.write_text(" \n \n ")
    normalized = normalize(str(source))
    assert normalized.strip() == ""
# ── _extract_content ───────────────────────────────────────────────────
def test_extract_content_string():
    """A plain string is returned as-is."""
    extracted = _extract_content("hello")
    assert extracted == "hello"
def test_extract_content_list_of_strings():
    """A list of strings is joined with single spaces."""
    extracted = _extract_content(["hello", "world"])
    assert extracted == "hello world"
def test_extract_content_list_of_blocks():
    """Only the text block contributes; the image block is left out."""
    text_block = {"type": "text", "text": "hello"}
    image_block = {"type": "image", "url": "x"}
    assert _extract_content([text_block, image_block]) == "hello"
def test_extract_content_dict():
    """A dict with a 'text' key yields that text."""
    extracted = _extract_content({"text": "hello"})
    assert extracted == "hello"
def test_extract_content_none():
    """None yields the empty string."""
    extracted = _extract_content(None)
    assert extracted == ""
def test_extract_content_mixed_list():
    """A list mixing a plain string and a text block joins both with a space."""
    mixed = ["plain", {"type": "text", "text": "block"}]
    extracted = _extract_content(mixed)
    assert extracted == "plain block"
# ── _try_claude_code_jsonl ─────────────────────────────────────────────
def test_claude_code_jsonl_valid():
    """A human/assistant JSONL pair converts, with the human turn prefixed by '> '."""
    entries = [
        {"type": "human", "message": {"content": "What is X?"}},
        {"type": "assistant", "message": {"content": "X is Y."}},
    ]
    payload = "\n".join(json.dumps(entry) for entry in entries)
    transcript = _try_claude_code_jsonl(payload)
    assert transcript is not None
    assert "> What is X?" in transcript
    assert "X is Y." in transcript
def test_claude_code_jsonl_user_type():
    """Entries typed 'user' are accepted and quoted with '> '."""
    entries = [
        {"type": "user", "message": {"content": "Q"}},
        {"type": "assistant", "message": {"content": "A"}},
    ]
    payload = "\n".join(json.dumps(entry) for entry in entries)
    transcript = _try_claude_code_jsonl(payload)
    assert transcript is not None
    assert "> Q" in transcript
def test_claude_code_jsonl_too_few_messages():
    """A single message is not enough; the parser returns None."""
    lone_entry = {"type": "human", "message": {"content": "only one"}}
    payload = "\n".join([json.dumps(lone_entry)])
    assert _try_claude_code_jsonl(payload) is None
def test_claude_code_jsonl_invalid_json_lines():
    """A non-JSON line ahead of a valid pair does not prevent parsing."""
    valid_entries = [
        {"type": "human", "message": {"content": "Q"}},
        {"type": "assistant", "message": {"content": "A"}},
    ]
    payload = "\n".join(["not json", *(json.dumps(entry) for entry in valid_entries)])
    transcript = _try_claude_code_jsonl(payload)
    assert transcript is not None
def test_claude_code_jsonl_non_dict_entries():
    """A JSON line that is a list rather than a dict does not prevent parsing."""
    entries = [
        [1, 2, 3],
        {"type": "human", "message": {"content": "Q"}},
        {"type": "assistant", "message": {"content": "A"}},
    ]
    payload = "\n".join(json.dumps(entry) for entry in entries)
    transcript = _try_claude_code_jsonl(payload)
    assert transcript is not None
# ── _try_codex_jsonl ───────────────────────────────────────────────────
def test_codex_jsonl_valid():
    """session_meta plus a user/agent event pair converts; the user turn is quoted."""
    events = [
        {"type": "session_meta", "payload": {}},
        {"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}},
        {"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}},
    ]
    payload = "\n".join(json.dumps(event) for event in events)
    transcript = _try_codex_jsonl(payload)
    assert transcript is not None
    assert "> Q" in transcript
def test_codex_jsonl_no_session_meta():
    """Without a session_meta entry the codex parser returns None."""
    events = [
        {"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}},
        {"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}},
    ]
    payload = "\n".join(json.dumps(event) for event in events)
    assert _try_codex_jsonl(payload) is None
def test_codex_jsonl_skips_non_event_msg():
    """Text from a response_item entry must not appear ahead of the first user turn."""
    events = [
        {"type": "session_meta"},
        {"type": "response_item", "payload": {"type": "user_message", "message": "X"}},
        {"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}},
        {"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}},
    ]
    payload = "\n".join(json.dumps(event) for event in events)
    transcript = _try_codex_jsonl(payload)
    assert transcript is not None
    before_first_turn, _, _ = transcript.partition("> Q")
    assert "X" not in before_first_turn
def test_codex_jsonl_non_string_message():
    """An event whose message is an int does not prevent parsing the valid pair."""
    events = [
        {"type": "session_meta"},
        {"type": "event_msg", "payload": {"type": "user_message", "message": 123}},
        {"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}},
        {"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}},
    ]
    payload = "\n".join(json.dumps(event) for event in events)
    transcript = _try_codex_jsonl(payload)
    assert transcript is not None
def test_codex_jsonl_empty_text_skipped():
    """A whitespace-only user message does not prevent parsing the valid pair."""
    events = [
        {"type": "session_meta"},
        {"type": "event_msg", "payload": {"type": "user_message", "message": " "}},
        {"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}},
        {"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}},
    ]
    payload = "\n".join(json.dumps(event) for event in events)
    transcript = _try_codex_jsonl(payload)
    assert transcript is not None
def test_codex_jsonl_payload_not_dict():
    """An event whose payload is a string does not prevent parsing the valid pair."""
    events = [
        {"type": "session_meta"},
        {"type": "event_msg", "payload": "not a dict"},
        {"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}},
        {"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}},
    ]
    payload = "\n".join(json.dumps(event) for event in events)
    transcript = _try_codex_jsonl(payload)
    assert transcript is not None
# ── _try_claude_ai_json ───────────────────────────────────────────────
def test_claude_ai_flat_messages():
    """A flat list of role/content dicts converts; the user turn is quoted."""
    conversation = [
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there"},
    ]
    transcript = _try_claude_ai_json(conversation)
    assert transcript is not None
    assert "> Hello" in transcript
def test_claude_ai_dict_with_messages_key():
    """A dict wrapping the turns under 'messages' is accepted."""
    turns = [
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi"},
    ]
    transcript = _try_claude_ai_json({"messages": turns})
    assert transcript is not None
def test_claude_ai_privacy_export():
    """A list of conversations with 'chat_messages' converts; 'human' is quoted."""
    turns = [
        {"role": "human", "content": "Q1"},
        {"role": "ai", "content": "A1"},
    ]
    export = [{"chat_messages": turns}]
    transcript = _try_claude_ai_json(export)
    assert transcript is not None
    assert "> Q1" in transcript
def test_claude_ai_not_a_list():
    """A bare string is rejected with None."""
    assert _try_claude_ai_json("not a list") is None
def test_claude_ai_too_few_messages():
    """A single message is not enough; the parser returns None."""
    lone_turn = {"role": "user", "content": "Hello"}
    assert _try_claude_ai_json([lone_turn]) is None
def test_claude_ai_dict_with_chat_messages_key():
    """A dict wrapping the turns under 'chat_messages' is accepted."""
    turns = [
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "World"},
    ]
    transcript = _try_claude_ai_json({"chat_messages": turns})
    assert transcript is not None
def test_claude_ai_privacy_export_non_dict_items():
    """Non-dict messages and non-dict conversations do not prevent parsing."""
    conversation = {
        "chat_messages": [
            "not a dict",
            {"role": "user", "content": "Q"},
            {"role": "assistant", "content": "A"},
        ]
    }
    export = [conversation, "not a convo"]
    transcript = _try_claude_ai_json(export)
    assert transcript is not None
# ── _try_chatgpt_json ─────────────────────────────────────────────────
def test_chatgpt_json_valid():
    """A mapping with a message-less root and a user/assistant chain converts."""

    def _node(parent, role, text, children):
        # One mapping entry carrying a single-part message.
        return {
            "parent": parent,
            "message": {"author": {"role": role}, "content": {"parts": [text]}},
            "children": children,
        }

    mapping = {
        "root": {"parent": None, "message": None, "children": ["msg1"]},
        "msg1": _node("root", "user", "Hello ChatGPT", ["msg2"]),
        "msg2": _node("msg1", "assistant", "Hello! How can I help?", []),
    }
    transcript = _try_chatgpt_json({"mapping": mapping})
    assert transcript is not None
    assert "> Hello ChatGPT" in transcript
def test_chatgpt_json_no_mapping():
    """A dict without a 'mapping' key is rejected with None."""
    assert _try_chatgpt_json({"data": []}) is None
def test_chatgpt_json_not_dict():
    """A list input is rejected with None."""
    assert _try_chatgpt_json([1, 2, 3]) is None
def test_chatgpt_json_fallback_root():
    """Root node has a message (no synthetic root); parsing still succeeds."""

    def _node(parent, role, text, children):
        # One mapping entry carrying a single-part message.
        return {
            "parent": parent,
            "message": {"author": {"role": role}, "content": {"parts": [text]}},
            "children": children,
        }

    mapping = {
        "root": _node(None, "system", "system prompt", ["msg1"]),
        "msg1": _node("root", "user", "Hello", ["msg2"]),
        "msg2": _node("msg1", "assistant", "Hi there", []),
    }
    transcript = _try_chatgpt_json({"mapping": mapping})
    assert transcript is not None
def test_chatgpt_json_too_few_messages():
    """A mapping holding only one real message is rejected with None."""
    only_message = {
        "author": {"role": "user"},
        "content": {"parts": ["Only one"]},
    }
    mapping = {
        "root": {"parent": None, "message": None, "children": ["msg1"]},
        "msg1": {"parent": "root", "message": only_message, "children": []},
    }
    assert _try_chatgpt_json({"mapping": mapping}) is None
# ── _try_slack_json ────────────────────────────────────────────────────
def test_slack_json_valid():
    """Two Slack messages from different users convert and keep their text."""
    export = [
        {"type": "message", "user": "U1", "text": "Hello"},
        {"type": "message", "user": "U2", "text": "Hi there"},
    ]
    transcript = _try_slack_json(export)
    assert transcript is not None
    assert "Hello" in transcript
def test_slack_json_not_a_list():
    """A dict input is rejected with None."""
    assert _try_slack_json({"type": "message"}) is None
def test_slack_json_too_few_messages():
    """A single Slack message is not enough; the parser returns None."""
    lone_message = {"type": "message", "user": "U1", "text": "Hello"}
    assert _try_slack_json([lone_message]) is None
def test_slack_json_skips_non_message_types():
    """A leading channel_join event next to two messages still gives a result."""
    join_event = {"type": "channel_join", "user": "U1", "text": "joined"}
    messages = [
        {"type": "message", "user": "U1", "text": "Hello"},
        {"type": "message", "user": "U2", "text": "Hi"},
    ]
    assert _try_slack_json([join_event, *messages]) is not None
def test_slack_json_three_users():
    """Messages from three distinct users still give a non-None result."""
    export = [
        {"type": "message", "user": user, "text": text}
        for user, text in (("U1", "Hello"), ("U2", "Hi"), ("U3", "Hey"))
    ]
    assert _try_slack_json(export) is not None
def test_slack_json_empty_text_skipped():
    """An empty-text message ahead of two real ones still gives a result."""
    blank = {"type": "message", "user": "U1", "text": ""}
    first = {"type": "message", "user": "U1", "text": "Hello"}
    second = {"type": "message", "user": "U2", "text": "Hi"}
    assert _try_slack_json([blank, first, second]) is not None
def test_slack_json_username_fallback():
    """Messages keyed by "username" (no "user" field) still give a result."""
    export = [
        {"type": "message", "username": name, "text": text}
        for name, text in (("bot1", "Hello"), ("bot2", "Hi"))
    ]
    assert _try_slack_json(export) is not None
# ── _try_normalize_json ────────────────────────────────────────────────
def test_try_normalize_json_invalid_json():
    """Text that is not parseable JSON is expected to produce None."""
    garbage = "not json at all {{{"
    assert _try_normalize_json(garbage) is None
def test_try_normalize_json_valid_but_unknown_schema():
    """Well-formed JSON in an unrecognised shape is expected to produce None."""
    serialized = json.dumps({"random": "data"})
    assert _try_normalize_json(serialized) is None
# ── _messages_to_transcript ────────────────────────────────────────────
def test_messages_to_transcript_basic():
    """One user/assistant exchange: the user text shows up as "> Q", the reply as "A"."""
    exchange = [("user", "Q"), ("assistant", "A")]
    # The spellchecker is swapped for an identity function; create=True lets
    # the patch apply even if the attribute is absent from the module.
    spellcheck_stub = patch(
        "mempalace.normalize.spellcheck_user_text",
        side_effect=lambda x: x,
        create=True,
    )
    with spellcheck_stub:
        transcript = _messages_to_transcript(exchange, spellcheck=False)
    assert "> Q" in transcript
    assert "A" in transcript
def test_messages_to_transcript_consecutive_users():
    """Back-to-back user messages (no reply in between) both appear quoted."""
    turns = [("user", "Q1"), ("user", "Q2"), ("assistant", "A")]
    transcript = _messages_to_transcript(turns, spellcheck=False)
    for quoted in ("> Q1", "> Q2"):
        assert quoted in transcript
def test_messages_to_transcript_assistant_first():
    """An assistant message that precedes any user message is kept in the output."""
    turns = [("assistant", "preamble"), ("user", "Q"), ("assistant", "A")]
    transcript = _messages_to_transcript(turns, spellcheck=False)
    for expected in ("preamble", "> Q"):
        assert expected in transcript
+83 -3
View File
@@ -1,10 +1,18 @@
"""
test_searcher.py Tests for the programmatic search_memories API.
test_searcher.py -- Tests for both search() (CLI) and search_memories() (API).
Tests the library-facing search interface (not the CLI print variant).
Uses the real ChromaDB fixtures from conftest.py for integration tests,
plus mock-based tests for error paths.
"""
from mempalace.searcher import search_memories
from unittest.mock import MagicMock, patch
import pytest
from mempalace.searcher import SearchError, search, search_memories
# ── search_memories (API) ──────────────────────────────────────────────
class TestSearchMemories:
@@ -43,3 +51,75 @@ class TestSearchMemories:
assert "source_file" in hit
assert "similarity" in hit
assert isinstance(hit["similarity"], float)
def test_search_memories_query_error(self):
    """A failing collection query surfaces as an "error" entry in the result."""
    # Dotted keyword names configure nested mock attributes at construction.
    failing_collection = MagicMock(**{"query.side_effect": RuntimeError("query failed")})
    client = MagicMock(**{"get_collection.return_value": failing_collection})
    client_patch = patch("mempalace.searcher.chromadb.PersistentClient", return_value=client)
    with client_patch:
        outcome = search_memories("test", "/fake/path")
    assert "error" in outcome
    assert "query failed" in outcome["error"]
def test_search_memories_filters_in_result(self, palace_path, seeded_collection):
    """The wing and room passed in are echoed back under result["filters"]."""
    requested = {"wing": "project", "room": "backend"}
    filters = search_memories("test", palace_path, **requested)["filters"]
    assert filters["wing"] == "project"
    assert filters["room"] == "backend"
# ── search() (CLI print function) ─────────────────────────────────────
class TestSearchCLI:
    """Checks search(), the printing variant, mostly through captured stdout."""

    def test_search_prints_results(self, palace_path, seeded_collection, capsys):
        """Output for a seeded palace mentions at least one of the query terms."""
        search("JWT authentication", palace_path)
        printed = capsys.readouterr().out
        assert "JWT" in printed or "authentication" in printed

    def test_search_with_wing_filter(self, palace_path, seeded_collection, capsys):
        """A wing-filtered search still prints the results header."""
        search("planning", palace_path, wing="notes")
        printed = capsys.readouterr().out
        assert "Results for" in printed

    def test_search_with_room_filter(self, palace_path, seeded_collection, capsys):
        """A room-filtered search prints a "Room:" label."""
        search("database", palace_path, room="backend")
        printed = capsys.readouterr().out
        assert "Room:" in printed

    def test_search_with_wing_and_room(self, palace_path, seeded_collection, capsys):
        """Filtering on both wing and room prints both labels."""
        search("code", palace_path, wing="project", room="frontend")
        printed = capsys.readouterr().out
        for label in ("Wing:", "Room:"):
            assert label in printed

    def test_search_no_palace_raises(self, tmp_path):
        """A palace path that does not exist raises SearchError."""
        missing_palace = str(tmp_path / "missing")
        with pytest.raises(SearchError, match="No palace found"):
            search("anything", missing_palace)

    def test_search_no_results(self, palace_path, collection, capsys):
        """Searching a collection with no seeded data does not blow up."""
        # Uses the unseeded `collection` fixture, so there is nothing to match.
        outcome = search("xyzzy_nonexistent_query", palace_path, n_results=1)
        printed = capsys.readouterr().out
        # NOTE(review): this passes whenever search() returns None, so it may
        # not actually check for the "No results" message -- confirm intent.
        assert outcome is None or "No results" in printed

    def test_search_query_error_raises(self):
        """A failing collection query is re-raised as SearchError."""
        broken_collection = MagicMock()
        broken_collection.query.side_effect = RuntimeError("boom")
        client = MagicMock()
        client.get_collection.return_value = broken_collection
        client_patch = patch("mempalace.searcher.chromadb.PersistentClient", return_value=client)
        with client_patch, pytest.raises(SearchError, match="Search error"):
            search("test", "/fake/path")

    def test_search_n_results(self, palace_path, seeded_collection, capsys):
        """With n_results=1 the output contains the first result marker."""
        search("code", palace_path, n_results=1)
        printed = capsys.readouterr().out
        # At least one numbered result block must have been printed.
        assert "[1]" in printed
+244
View File
@@ -3,6 +3,9 @@ import json
from mempalace import split_mega_files as smf
# ── Config loading ─────────────────────────────────────────────────────
def test_load_known_people_falls_back_when_config_missing(monkeypatch, tmp_path):
monkeypatch.setattr(smf, "_KNOWN_NAMES_PATH", tmp_path / "missing.json")
smf._KNOWN_NAMES_CACHE = None
@@ -46,3 +49,244 @@ def test_extract_people_detects_names_from_content(monkeypatch):
monkeypatch.setattr(smf, "KNOWN_PEOPLE", ["Alice", "Ben"])
people = smf.extract_people(["> Alice reviewed the change with Ben\n"])
assert people == ["Alice", "Ben"]
# ── Config: force_reload and invalid JSON ──────────────────────────────
def test_load_known_names_force_reload(monkeypatch, tmp_path):
    """force_reload=True picks up new file content instead of the cached value."""
    config_path = tmp_path / "known_names.json"
    config_path.write_text(json.dumps(["Alice"]))
    monkeypatch.setattr(smf, "_KNOWN_NAMES_PATH", config_path)
    # Reset the cache through monkeypatch rather than by plain assignment:
    # pytest then restores the pre-test value on teardown, so the names
    # loaded from this temporary file cannot leak into later tests.
    monkeypatch.setattr(smf, "_KNOWN_NAMES_CACHE", None)

    smf._load_known_names_config()
    assert smf._KNOWN_NAMES_CACHE == ["Alice"]

    config_path.write_text(json.dumps(["Bob"]))
    smf._load_known_names_config(force_reload=True)
    assert smf._KNOWN_NAMES_CACHE == ["Bob"]
def test_load_known_names_invalid_json(monkeypatch, tmp_path):
    """A config file containing malformed JSON makes the loader return None."""
    config_path = tmp_path / "known_names.json"
    config_path.write_text("not json {{{")
    monkeypatch.setattr(smf, "_KNOWN_NAMES_PATH", config_path)
    # Reset via monkeypatch so whatever the module cache held before this
    # test is put back on teardown instead of being silently discarded.
    monkeypatch.setattr(smf, "_KNOWN_NAMES_CACHE", None)

    assert smf._load_known_names_config() is None
def test_load_known_names_caching(monkeypatch, tmp_path):
    """A second load without force_reload returns the first (cached) value."""
    config_path = tmp_path / "known_names.json"
    config_path.write_text(json.dumps(["Alice"]))
    monkeypatch.setattr(smf, "_KNOWN_NAMES_PATH", config_path)
    # Reset via monkeypatch so the cache populated below is rolled back on
    # teardown; a plain assignment would leave ["Alice"] behind for any
    # later test that reads the module-level cache.
    monkeypatch.setattr(smf, "_KNOWN_NAMES_CACHE", None)

    smf._load_known_names_config()
    # Change the file on disk; the second call must still serve the cache.
    config_path.write_text(json.dumps(["Changed"]))
    result = smf._load_known_names_config()
    assert result == ["Alice"]
# ── is_true_session_start ──────────────────────────────────────────────
def test_is_true_session_start_yes():
    """A header followed by ordinary content counts as a true session start."""
    transcript = ["Claude Code v1.0", "Some content", "More content"] + [""] * 3
    assert smf.is_true_session_start(transcript, 0) is True
def test_is_true_session_start_no_ctrl_e():
    """A header directly followed by the "Ctrl+E" hint is not a true start."""
    transcript = ["Claude Code v1.0", "Ctrl+E to show 5 previous messages"]
    transcript += [""] * 4
    assert smf.is_true_session_start(transcript, 0) is False
def test_is_true_session_start_no_previous_messages():
    """A "previous messages" line two rows below the header rules out a true start."""
    transcript = ["Claude Code v1.0", "Some text", "previous messages here"]
    transcript += [""] * 3
    assert smf.is_true_session_start(transcript, 0) is False
# ── find_session_boundaries ────────────────────────────────────────────
def test_find_session_boundaries_two_sessions():
    """Two seven-line sessions are reported at line indexes 0 and 7."""
    padding = [""] * 5
    first_session = ["Claude Code v1.0", "content 1", *padding]
    second_session = ["Claude Code v1.0", "content 2", *padding]
    found = smf.find_session_boundaries(first_session + second_session)
    assert found == [0, 7]
def test_find_session_boundaries_none():
    """Text without any session header yields an empty boundary list."""
    plain_text = ["Just some text", "No sessions here"]
    found = smf.find_session_boundaries(plain_text)
    assert found == []
def test_find_session_boundaries_context_restore_skipped():
    """A second header followed by the "Ctrl+E" hint is not counted as a boundary."""
    real_session = ["Claude Code v1.0", "content"] + [""] * 5
    restored_context = ["Claude Code v1.0", "Ctrl+E to show 5 previous messages"] + [""] * 4
    found = smf.find_session_boundaries(real_session + restored_context)
    assert len(found) == 1
# ── extract_timestamp ──────────────────────────────────────────────────
def test_extract_timestamp_found():
    """A timestamp line is returned as a filename-style stamp and an ISO date."""
    stamp_line = "⏺ 2:30 PM Wednesday, March 25, 2026"
    extracted = smf.extract_timestamp([stamp_line])
    human, iso = extracted
    assert (human, iso) == ("2026-03-25_230PM", "2026-03-25")
def test_extract_timestamp_not_found():
    """Without a timestamp line both returned values are None."""
    human, iso = smf.extract_timestamp(["No timestamp here"])
    for value in (human, iso):
        assert value is None
def test_extract_timestamp_only_checks_first_50():
    """A timestamp placed after 51 filler lines is expected not to be found."""
    filler = ["filler\n"] * 51
    late_stamp = "⏺ 1:00 AM Monday, January 01, 2026"
    human, _iso = smf.extract_timestamp(filler + [late_stamp])
    assert human is None
# ── extract_subject ────────────────────────────────────────────────────
def test_extract_subject_found():
    """The subject derived from a "> " prompt contains the prompt's key word."""
    prompt = "> How do we handle authentication?"
    subject = smf.extract_subject([prompt])
    assert "authentication" in subject.lower()
def test_extract_subject_skips_commands():
    """Shell-command prompts come first; the subject reflects the later question."""
    shell_prompts = ["> cd /some/dir", "> git status"]
    question = "> What is the plan?"
    subject = smf.extract_subject([*shell_prompts, question])
    assert "plan" in subject.lower()
def test_extract_subject_fallback():
    """With no "> " prompt lines the subject is the literal "session"."""
    no_prompts = ["No prompts at all", "Just text"]
    assert smf.extract_subject(no_prompts) == "session"
def test_extract_subject_short_prompt_skipped():
    """Very short prompts come first; the subject reflects the longer question."""
    terse_prompts = ["> ok", "> yes"]
    question = "> What about the deployment strategy?"
    subject = smf.extract_subject(terse_prompts + [question])
    assert "deployment" in subject.lower()
def test_extract_subject_truncated():
    """A 100-character prompt gives a subject of at most 60 characters."""
    long_prompt = "> " + "a" * 100
    subject = smf.extract_subject([long_prompt])
    assert len(subject) <= 60
# ── split_file ─────────────────────────────────────────────────────────
def _make_mega_file(tmp_path, n_sessions=3, lines_per_session=15):
    """Write ``mega.txt`` under *tmp_path* holding *n_sessions* sessions.

    Each session is a "Claude Code v1.<i>" header line, one "> " prompt line
    and ``lines_per_session - 2`` filler lines. Returns the written path.
    """
    pieces = []
    for session in range(n_sessions):
        pieces.append(f"Claude Code v1.{session}\n")
        pieces.append(f"> What about topic {session} and how it works?\n")
        pieces.extend(
            f"Line {row} of session {session}\n" for row in range(lines_per_session - 2)
        )
    mega_path = tmp_path / "mega.txt"
    mega_path.write_text("".join(pieces))
    return mega_path
def test_split_file_creates_output(tmp_path):
    """Splitting a three-session file reports at least two paths, all on disk."""
    destination = tmp_path / "output"
    destination.mkdir()
    source = _make_mega_file(tmp_path)
    written = smf.split_file(str(source), str(destination))
    assert len(written) >= 2
    assert all(piece.exists() for piece in written)
def test_split_file_dry_run(tmp_path):
    """With dry_run=True the paths are reported but none of them is created."""
    destination = tmp_path / "output"
    destination.mkdir()
    source = _make_mega_file(tmp_path)
    planned = smf.split_file(str(source), str(destination), dry_run=True)
    assert len(planned) >= 2
    assert not any(piece.exists() for piece in planned)
def test_split_file_not_mega(tmp_path):
    """A file containing a single session header produces no split files."""
    single = tmp_path / "single.txt"
    body = "Claude Code v1.0\nJust one session\n" + "line\n" * 20
    single.write_text(body)
    assert smf.split_file(str(single), str(tmp_path)) == []
def test_split_file_output_dir_none(tmp_path):
    """Passing None as output_dir puts the pieces next to the source file."""
    source = _make_mega_file(tmp_path)
    written = smf.split_file(str(source), None)
    assert len(written) >= 2
    expected_parent = str(tmp_path)
    assert all(str(piece.parent) == expected_parent for piece in written)
def test_split_file_tiny_fragments_skipped(tmp_path):
    """Two two-line sessions precede a long one; every written piece is non-empty."""
    tiny_session = "Claude Code v1.0\nline\n"
    long_session = "Claude Code v1.0\n" + "line\n" * 20
    source = tmp_path / "tiny.txt"
    source.write_text(tiny_session * 2 + long_session)
    written = smf.split_file(str(source), str(tmp_path))
    # The leading chunks are very small and are meant to be skipped.
    assert all(piece.stat().st_size > 0 for piece in written)