a2432a3245
* fix: parse Claude.ai privacy export with messages key and sender field (#677) The privacy-export branch in _try_claude_ai_json only checked for the "chat_messages" key, missing exports that use "messages" instead. It also only read the "role" field while real privacy exports use "sender". Both gaps caused the file to fall through to plain-text, producing a single giant drawer. Changes: - Accept "messages" alongside "chat_messages" in the conversation-object guard and inner extraction. - Accept "sender" alongside "role" as the author field. - Fall back to a top-level "text" key when content blocks are empty. - Produce one transcript per conversation instead of concatenating all conversations into a single blob. - Extract shared logic into _collect_claude_messages helper. - Add 6 regression tests covering each variant. * style: apply ruff format to normalize.py * fix: guard against null text field in Claude.ai export parsing item.get("text", "").strip() crashes when "text" is explicitly null in the JSON (legal and observed in some exports). Use (item.get("text") or "").strip() and add a regression test. --------- Co-authored-by: Igor Lins e Silva <4753812+igorls@users.noreply.github.com>
1051 lines
33 KiB
Python
1051 lines
33 KiB
Python
import json
|
|
from unittest.mock import patch
|
|
|
|
from mempalace.normalize import (
|
|
_extract_content,
|
|
_format_tool_result,
|
|
_format_tool_use,
|
|
_messages_to_transcript,
|
|
_try_chatgpt_json,
|
|
_try_claude_ai_json,
|
|
_try_claude_code_jsonl,
|
|
_try_codex_jsonl,
|
|
_try_normalize_json,
|
|
_try_slack_json,
|
|
normalize,
|
|
)
|
|
|
|
|
|
# ── normalize() top-level ──────────────────────────────────────────────
|
|
|
|
|
|
def test_plain_text(tmp_path):
|
|
f = tmp_path / "plain.txt"
|
|
f.write_text("Hello world\nSecond line\n")
|
|
result = normalize(str(f))
|
|
assert "Hello world" in result
|
|
|
|
|
|
def test_claude_json(tmp_path):
|
|
data = [{"role": "user", "content": "Hi"}, {"role": "assistant", "content": "Hello"}]
|
|
f = tmp_path / "claude.json"
|
|
f.write_text(json.dumps(data))
|
|
result = normalize(str(f))
|
|
assert "Hi" in result
|
|
|
|
|
|
def test_empty(tmp_path):
|
|
f = tmp_path / "empty.txt"
|
|
f.write_text("")
|
|
result = normalize(str(f))
|
|
assert result.strip() == ""
|
|
|
|
|
|
def test_normalize_io_error():
|
|
"""normalize raises IOError for unreadable file."""
|
|
try:
|
|
normalize("/nonexistent/path/file.txt")
|
|
assert False, "Should have raised"
|
|
except IOError as e:
|
|
assert "Could not read" in str(e)
|
|
|
|
|
|
def test_normalize_already_has_markers(tmp_path):
|
|
"""Files with >= 3 '>' lines pass through unchanged."""
|
|
content = "> question 1\nanswer 1\n> question 2\nanswer 2\n> question 3\nanswer 3\n"
|
|
f = tmp_path / "markers.txt"
|
|
f.write_text(content)
|
|
result = normalize(str(f))
|
|
assert result == content
|
|
|
|
|
|
def test_normalize_json_content_detected_by_brace(tmp_path):
|
|
"""A .txt file starting with [ triggers JSON parsing."""
|
|
data = [{"role": "user", "content": "Hey"}, {"role": "assistant", "content": "Hi there"}]
|
|
f = tmp_path / "chat.txt"
|
|
f.write_text(json.dumps(data))
|
|
result = normalize(str(f))
|
|
assert "Hey" in result
|
|
|
|
|
|
def test_normalize_whitespace_only(tmp_path):
|
|
f = tmp_path / "ws.txt"
|
|
f.write_text(" \n \n ")
|
|
result = normalize(str(f))
|
|
assert result.strip() == ""
|
|
|
|
|
|
# ── _extract_content ───────────────────────────────────────────────────
|
|
|
|
|
|
def test_extract_content_string():
|
|
assert _extract_content("hello") == "hello"
|
|
|
|
|
|
def test_extract_content_list_of_strings():
|
|
assert _extract_content(["hello", "world"]) == "hello\nworld"
|
|
|
|
|
|
def test_extract_content_list_of_blocks():
|
|
blocks = [{"type": "text", "text": "hello"}, {"type": "image", "url": "x"}]
|
|
assert _extract_content(blocks) == "hello"
|
|
|
|
|
|
def test_extract_content_dict():
|
|
assert _extract_content({"text": "hello"}) == "hello"
|
|
|
|
|
|
def test_extract_content_none():
|
|
assert _extract_content(None) == ""
|
|
|
|
|
|
def test_extract_content_mixed_list():
|
|
blocks = ["plain", {"type": "text", "text": "block"}]
|
|
assert _extract_content(blocks) == "plain\nblock"
|
|
|
|
|
|
# ── _format_tool_use ──────────────────────────────────────────────────
|
|
|
|
|
|
def test_format_tool_use_bash():
|
|
block = {
|
|
"type": "tool_use",
|
|
"id": "t1",
|
|
"name": "Bash",
|
|
"input": {"command": "lsusb | grep razer", "description": "Check USB"},
|
|
}
|
|
result = _format_tool_use(block)
|
|
assert result == "[Bash] lsusb | grep razer"
|
|
|
|
|
|
def test_format_tool_use_bash_truncates_long_command():
|
|
block = {"type": "tool_use", "id": "t1", "name": "Bash", "input": {"command": "x" * 300}}
|
|
result = _format_tool_use(block)
|
|
assert len(result) <= len("[Bash] ") + 200 + len("...")
|
|
assert result.endswith("...")
|
|
|
|
|
|
def test_format_tool_use_read():
|
|
block = {
|
|
"type": "tool_use",
|
|
"id": "t1",
|
|
"name": "Read",
|
|
"input": {"file_path": "/home/jp/file.py"},
|
|
}
|
|
result = _format_tool_use(block)
|
|
assert result == "[Read /home/jp/file.py]"
|
|
|
|
|
|
def test_format_tool_use_read_with_range():
|
|
block = {
|
|
"type": "tool_use",
|
|
"id": "t1",
|
|
"name": "Read",
|
|
"input": {"file_path": "/home/jp/file.py", "offset": 10, "limit": 50},
|
|
}
|
|
result = _format_tool_use(block)
|
|
assert result == "[Read /home/jp/file.py:10-60]"
|
|
|
|
|
|
def test_format_tool_use_grep():
|
|
block = {
|
|
"type": "tool_use",
|
|
"id": "t1",
|
|
"name": "Grep",
|
|
"input": {"pattern": "firmware", "path": "/home/jp/proj"},
|
|
}
|
|
result = _format_tool_use(block)
|
|
assert result == "[Grep] firmware in /home/jp/proj"
|
|
|
|
|
|
def test_format_tool_use_grep_with_glob():
|
|
block = {
|
|
"type": "tool_use",
|
|
"id": "t1",
|
|
"name": "Grep",
|
|
"input": {"pattern": "TODO", "glob": "*.py"},
|
|
}
|
|
result = _format_tool_use(block)
|
|
assert result == "[Grep] TODO in *.py"
|
|
|
|
|
|
def test_format_tool_use_glob():
|
|
block = {
|
|
"type": "tool_use",
|
|
"id": "t1",
|
|
"name": "Glob",
|
|
"input": {"pattern": "/home/jp/proj/**/*.py"},
|
|
}
|
|
result = _format_tool_use(block)
|
|
assert result == "[Glob] /home/jp/proj/**/*.py"
|
|
|
|
|
|
def test_format_tool_use_edit():
|
|
block = {
|
|
"type": "tool_use",
|
|
"id": "t1",
|
|
"name": "Edit",
|
|
"input": {"file_path": "/home/jp/file.py", "old_string": "x", "new_string": "y"},
|
|
}
|
|
result = _format_tool_use(block)
|
|
assert result == "[Edit /home/jp/file.py]"
|
|
|
|
|
|
def test_format_tool_use_write():
|
|
block = {
|
|
"type": "tool_use",
|
|
"id": "t1",
|
|
"name": "Write",
|
|
"input": {"file_path": "/home/jp/file.py", "content": "..."},
|
|
}
|
|
result = _format_tool_use(block)
|
|
assert result == "[Write /home/jp/file.py]"
|
|
|
|
|
|
def test_format_tool_use_unknown_tool():
|
|
block = {
|
|
"type": "tool_use",
|
|
"id": "t1",
|
|
"name": "mcp__mempalace__search",
|
|
"input": {"query": "firmware probe", "limit": 5},
|
|
}
|
|
result = _format_tool_use(block)
|
|
assert result.startswith("[mcp__mempalace__search]")
|
|
assert "firmware probe" in result
|
|
|
|
|
|
def test_format_tool_use_unknown_tool_truncates():
|
|
block = {"type": "tool_use", "id": "t1", "name": "SomeTool", "input": {"data": "x" * 300}}
|
|
result = _format_tool_use(block)
|
|
assert result.endswith("...")
|
|
assert len(result) <= len("[SomeTool] ") + 200 + len("...")
|
|
|
|
|
|
# ── _format_tool_result ──────────────────────────────────────────────
|
|
|
|
|
|
def test_format_tool_result_bash_short():
|
|
"""Short Bash output is preserved in full."""
|
|
content = "Bus 002 Device 005: ID 1532:0e05 Razer Kiyo Pro"
|
|
result = _format_tool_result(content, "Bash")
|
|
assert result == "→ Bus 002 Device 005: ID 1532:0e05 Razer Kiyo Pro"
|
|
|
|
|
|
def test_format_tool_result_bash_head_tail():
|
|
"""Long Bash output gets head+tail with gap marker."""
|
|
lines = [f"line {i}" for i in range(60)]
|
|
content = "\n".join(lines)
|
|
result = _format_tool_result(content, "Bash")
|
|
assert "line 0" in result
|
|
assert "line 19" in result
|
|
assert "line 40" in result
|
|
assert "line 59" in result
|
|
assert "20 lines omitted" in result
|
|
# Lines 20-39 should be gone
|
|
assert "line 20\n" not in result
|
|
|
|
|
|
def test_format_tool_result_bash_exactly_40_lines():
|
|
"""Bash output at exactly 40 lines is not truncated."""
|
|
lines = [f"line {i}" for i in range(40)]
|
|
content = "\n".join(lines)
|
|
result = _format_tool_result(content, "Bash")
|
|
assert "omitted" not in result
|
|
assert "line 0" in result
|
|
assert "line 39" in result
|
|
|
|
|
|
def test_format_tool_result_read_omitted():
|
|
"""Read results are omitted (content already in palace from project mining)."""
|
|
result = _format_tool_result("lots of file content here...", "Read")
|
|
assert result == ""
|
|
|
|
|
|
def test_format_tool_result_edit_omitted():
|
|
"""Edit results are omitted (diff is in git)."""
|
|
result = _format_tool_result("file updated", "Edit")
|
|
assert result == ""
|
|
|
|
|
|
def test_format_tool_result_write_omitted():
|
|
"""Write results are omitted."""
|
|
result = _format_tool_result("file created", "Write")
|
|
assert result == ""
|
|
|
|
|
|
def test_format_tool_result_grep_short():
|
|
"""Short Grep output is kept."""
|
|
content = "src/foo.py\nsrc/bar.py\nsrc/baz.py"
|
|
result = _format_tool_result(content, "Grep")
|
|
assert "→ src/foo.py" in result
|
|
assert "→ src/baz.py" in result
|
|
|
|
|
|
def test_format_tool_result_grep_caps_at_20():
|
|
"""Grep output beyond 20 lines is truncated."""
|
|
lines = [f"match_{i}.py" for i in range(30)]
|
|
content = "\n".join(lines)
|
|
result = _format_tool_result(content, "Grep")
|
|
assert "match_19.py" in result
|
|
assert "match_20.py" not in result
|
|
assert "10 more matches" in result
|
|
|
|
|
|
def test_format_tool_result_glob_caps_at_20():
|
|
"""Glob output beyond 20 lines is truncated."""
|
|
lines = [f"/path/file_{i}.py" for i in range(25)]
|
|
content = "\n".join(lines)
|
|
result = _format_tool_result(content, "Glob")
|
|
assert "file_19.py" in result
|
|
assert "file_20.py" not in result
|
|
assert "5 more matches" in result
|
|
|
|
|
|
def test_format_tool_result_unknown_short():
|
|
"""Unknown tool with short output is kept."""
|
|
result = _format_tool_result("some output", "mcp__mempalace__search")
|
|
assert result == "→ some output"
|
|
|
|
|
|
def test_format_tool_result_unknown_truncates():
|
|
"""Unknown tool output over 2KB is truncated."""
|
|
content = "x" * 3000
|
|
result = _format_tool_result(content, "SomeTool")
|
|
assert result.endswith("... [truncated, 3000 chars]")
|
|
assert len(result) < 2200
|
|
|
|
|
|
def test_format_tool_result_list_content():
|
|
"""tool_result content can be a list of text blocks."""
|
|
content = [{"type": "text", "text": "result line 1"}, {"type": "text", "text": "result line 2"}]
|
|
result = _format_tool_result(content, "Bash")
|
|
assert "result line 1" in result
|
|
assert "result line 2" in result
|
|
|
|
|
|
def test_format_tool_result_empty():
|
|
"""Empty result returns empty string."""
|
|
result = _format_tool_result("", "Bash")
|
|
assert result == ""
|
|
|
|
|
|
# ── _try_claude_code_jsonl ─────────────────────────────────────────────
|
|
|
|
|
|
def test_claude_code_jsonl_valid():
|
|
lines = [
|
|
json.dumps({"type": "human", "message": {"content": "What is X?"}}),
|
|
json.dumps({"type": "assistant", "message": {"content": "X is Y."}}),
|
|
]
|
|
result = _try_claude_code_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
assert "> What is X?" in result
|
|
assert "X is Y." in result
|
|
|
|
|
|
def test_claude_code_jsonl_user_type():
|
|
lines = [
|
|
json.dumps({"type": "user", "message": {"content": "Q"}}),
|
|
json.dumps({"type": "assistant", "message": {"content": "A"}}),
|
|
]
|
|
result = _try_claude_code_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
assert "> Q" in result
|
|
|
|
|
|
def test_claude_code_jsonl_too_few_messages():
|
|
lines = [json.dumps({"type": "human", "message": {"content": "only one"}})]
|
|
result = _try_claude_code_jsonl("\n".join(lines))
|
|
assert result is None
|
|
|
|
|
|
def test_claude_code_jsonl_invalid_json_lines():
|
|
lines = [
|
|
"not json",
|
|
json.dumps({"type": "human", "message": {"content": "Q"}}),
|
|
json.dumps({"type": "assistant", "message": {"content": "A"}}),
|
|
]
|
|
result = _try_claude_code_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
|
|
|
|
def test_claude_code_jsonl_non_dict_entries():
|
|
lines = [
|
|
json.dumps([1, 2, 3]),
|
|
json.dumps({"type": "human", "message": {"content": "Q"}}),
|
|
json.dumps({"type": "assistant", "message": {"content": "A"}}),
|
|
]
|
|
result = _try_claude_code_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
|
|
|
|
# ── _try_codex_jsonl ───────────────────────────────────────────────────
|
|
|
|
|
|
def test_codex_jsonl_valid():
|
|
lines = [
|
|
json.dumps({"type": "session_meta", "payload": {}}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}}),
|
|
]
|
|
result = _try_codex_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
assert "> Q" in result
|
|
|
|
|
|
def test_codex_jsonl_no_session_meta():
|
|
"""Without session_meta, codex parser returns None."""
|
|
lines = [
|
|
json.dumps({"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}}),
|
|
]
|
|
result = _try_codex_jsonl("\n".join(lines))
|
|
assert result is None
|
|
|
|
|
|
def test_codex_jsonl_skips_non_event_msg():
|
|
lines = [
|
|
json.dumps({"type": "session_meta"}),
|
|
json.dumps({"type": "response_item", "payload": {"type": "user_message", "message": "X"}}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}}),
|
|
]
|
|
result = _try_codex_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
assert "X" not in result.split("> Q")[0]
|
|
|
|
|
|
def test_codex_jsonl_non_string_message():
|
|
lines = [
|
|
json.dumps({"type": "session_meta"}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "user_message", "message": 123}}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}}),
|
|
]
|
|
result = _try_codex_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
|
|
|
|
def test_codex_jsonl_empty_text_skipped():
|
|
lines = [
|
|
json.dumps({"type": "session_meta"}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "user_message", "message": " "}}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}}),
|
|
]
|
|
result = _try_codex_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
|
|
|
|
def test_codex_jsonl_payload_not_dict():
|
|
lines = [
|
|
json.dumps({"type": "session_meta"}),
|
|
json.dumps({"type": "event_msg", "payload": "not a dict"}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}}),
|
|
json.dumps({"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}}),
|
|
]
|
|
result = _try_codex_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
|
|
|
|
# ── _try_claude_ai_json ───────────────────────────────────────────────
|
|
|
|
|
|
def test_claude_ai_flat_messages():
|
|
data = [
|
|
{"role": "user", "content": "Hello"},
|
|
{"role": "assistant", "content": "Hi there"},
|
|
]
|
|
result = _try_claude_ai_json(data)
|
|
assert result is not None
|
|
assert "> Hello" in result
|
|
|
|
|
|
def test_claude_ai_dict_with_messages_key():
|
|
data = {
|
|
"messages": [
|
|
{"role": "user", "content": "Hello"},
|
|
{"role": "assistant", "content": "Hi"},
|
|
]
|
|
}
|
|
result = _try_claude_ai_json(data)
|
|
assert result is not None
|
|
|
|
|
|
def test_claude_ai_privacy_export():
|
|
data = [
|
|
{
|
|
"chat_messages": [
|
|
{"role": "human", "content": "Q1"},
|
|
{"role": "ai", "content": "A1"},
|
|
]
|
|
}
|
|
]
|
|
result = _try_claude_ai_json(data)
|
|
assert result is not None
|
|
assert "> Q1" in result
|
|
|
|
|
|
def test_claude_ai_not_a_list():
|
|
result = _try_claude_ai_json("not a list")
|
|
assert result is None
|
|
|
|
|
|
def test_claude_ai_too_few_messages():
|
|
data = [{"role": "user", "content": "Hello"}]
|
|
result = _try_claude_ai_json(data)
|
|
assert result is None
|
|
|
|
|
|
def test_claude_ai_dict_with_chat_messages_key():
|
|
data = {
|
|
"chat_messages": [
|
|
{"role": "user", "content": "Hello"},
|
|
{"role": "assistant", "content": "World"},
|
|
]
|
|
}
|
|
result = _try_claude_ai_json(data)
|
|
assert result is not None
|
|
|
|
|
|
def test_claude_ai_privacy_export_non_dict_items():
|
|
"""Non-dict items in privacy export are skipped."""
|
|
data = [
|
|
{
|
|
"chat_messages": [
|
|
"not a dict",
|
|
{"role": "user", "content": "Q"},
|
|
{"role": "assistant", "content": "A"},
|
|
]
|
|
},
|
|
"not a convo",
|
|
]
|
|
result = _try_claude_ai_json(data)
|
|
assert result is not None
|
|
|
|
|
|
def test_claude_ai_privacy_export_messages_key():
|
|
"""Privacy export using 'messages' key instead of 'chat_messages'."""
|
|
data = [
|
|
{
|
|
"uuid": "abc-123",
|
|
"name": "Test convo",
|
|
"messages": [
|
|
{"role": "human", "content": "Q1"},
|
|
{"role": "ai", "content": "A1"},
|
|
],
|
|
}
|
|
]
|
|
result = _try_claude_ai_json(data)
|
|
assert result is not None
|
|
assert "> Q1" in result
|
|
|
|
|
|
def test_claude_ai_privacy_export_sender_field():
|
|
"""Privacy export using 'sender' instead of 'role'."""
|
|
data = [
|
|
{
|
|
"chat_messages": [
|
|
{"sender": "human", "content": "Q1"},
|
|
{"sender": "assistant", "content": "A1"},
|
|
]
|
|
}
|
|
]
|
|
result = _try_claude_ai_json(data)
|
|
assert result is not None
|
|
assert "> Q1" in result
|
|
|
|
|
|
def test_claude_ai_privacy_export_text_fallback():
|
|
"""Privacy export where content is empty but text field has the message."""
|
|
data = [
|
|
{
|
|
"chat_messages": [
|
|
{"sender": "human", "text": "Q1", "content": []},
|
|
{"sender": "assistant", "text": "A1", "content": []},
|
|
]
|
|
}
|
|
]
|
|
result = _try_claude_ai_json(data)
|
|
assert result is not None
|
|
assert "> Q1" in result
|
|
|
|
|
|
def test_claude_ai_privacy_export_null_text():
|
|
"""Privacy export where text field is explicitly null must not crash."""
|
|
data = [
|
|
{
|
|
"chat_messages": [
|
|
{"sender": "human", "text": None, "content": "Q1"},
|
|
{"sender": "assistant", "text": None, "content": "A1"},
|
|
]
|
|
}
|
|
]
|
|
result = _try_claude_ai_json(data)
|
|
assert result is not None
|
|
assert "> Q1" in result
|
|
|
|
|
|
def test_claude_ai_privacy_export_per_conversation():
|
|
"""Multiple conversations produce separate transcripts."""
|
|
data = [
|
|
{
|
|
"uuid": "convo-1",
|
|
"chat_messages": [
|
|
{"role": "human", "content": "Q1"},
|
|
{"role": "ai", "content": "A1"},
|
|
],
|
|
},
|
|
{
|
|
"uuid": "convo-2",
|
|
"chat_messages": [
|
|
{"role": "human", "content": "Q2"},
|
|
{"role": "ai", "content": "A2"},
|
|
],
|
|
},
|
|
]
|
|
result = _try_claude_ai_json(data)
|
|
assert result is not None
|
|
assert "> Q1" in result
|
|
assert "> Q2" in result
|
|
# each conversation is a separate transcript block
|
|
parts = result.split("\n\n")
|
|
q1_parts = [p for p in parts if "> Q1" in p]
|
|
q2_parts = [p for p in parts if "> Q2" in p]
|
|
assert len(q1_parts) >= 1
|
|
assert len(q2_parts) >= 1
|
|
|
|
|
|
def test_claude_ai_privacy_export_skips_empty_conversations():
|
|
"""Conversations with <2 messages are skipped."""
|
|
data = [
|
|
{
|
|
"chat_messages": [
|
|
{"role": "human", "content": "lonely message"},
|
|
],
|
|
},
|
|
{
|
|
"chat_messages": [
|
|
{"role": "human", "content": "Q1"},
|
|
{"role": "ai", "content": "A1"},
|
|
],
|
|
},
|
|
]
|
|
result = _try_claude_ai_json(data)
|
|
assert result is not None
|
|
assert "lonely message" not in result
|
|
assert "> Q1" in result
|
|
|
|
|
|
# ── _try_chatgpt_json ─────────────────────────────────────────────────
|
|
|
|
|
|
def test_chatgpt_json_valid():
|
|
data = {
|
|
"mapping": {
|
|
"root": {
|
|
"parent": None,
|
|
"message": None,
|
|
"children": ["msg1"],
|
|
},
|
|
"msg1": {
|
|
"parent": "root",
|
|
"message": {
|
|
"author": {"role": "user"},
|
|
"content": {"parts": ["Hello ChatGPT"]},
|
|
},
|
|
"children": ["msg2"],
|
|
},
|
|
"msg2": {
|
|
"parent": "msg1",
|
|
"message": {
|
|
"author": {"role": "assistant"},
|
|
"content": {"parts": ["Hello! How can I help?"]},
|
|
},
|
|
"children": [],
|
|
},
|
|
}
|
|
}
|
|
result = _try_chatgpt_json(data)
|
|
assert result is not None
|
|
assert "> Hello ChatGPT" in result
|
|
|
|
|
|
def test_chatgpt_json_no_mapping():
|
|
result = _try_chatgpt_json({"data": []})
|
|
assert result is None
|
|
|
|
|
|
def test_chatgpt_json_not_dict():
|
|
result = _try_chatgpt_json([1, 2, 3])
|
|
assert result is None
|
|
|
|
|
|
def test_chatgpt_json_fallback_root():
|
|
"""Root node has a message (no synthetic root), uses fallback."""
|
|
data = {
|
|
"mapping": {
|
|
"root": {
|
|
"parent": None,
|
|
"message": {
|
|
"author": {"role": "system"},
|
|
"content": {"parts": ["system prompt"]},
|
|
},
|
|
"children": ["msg1"],
|
|
},
|
|
"msg1": {
|
|
"parent": "root",
|
|
"message": {
|
|
"author": {"role": "user"},
|
|
"content": {"parts": ["Hello"]},
|
|
},
|
|
"children": ["msg2"],
|
|
},
|
|
"msg2": {
|
|
"parent": "msg1",
|
|
"message": {
|
|
"author": {"role": "assistant"},
|
|
"content": {"parts": ["Hi there"]},
|
|
},
|
|
"children": [],
|
|
},
|
|
}
|
|
}
|
|
result = _try_chatgpt_json(data)
|
|
assert result is not None
|
|
|
|
|
|
def test_chatgpt_json_too_few_messages():
|
|
data = {
|
|
"mapping": {
|
|
"root": {
|
|
"parent": None,
|
|
"message": None,
|
|
"children": ["msg1"],
|
|
},
|
|
"msg1": {
|
|
"parent": "root",
|
|
"message": {
|
|
"author": {"role": "user"},
|
|
"content": {"parts": ["Only one"]},
|
|
},
|
|
"children": [],
|
|
},
|
|
}
|
|
}
|
|
result = _try_chatgpt_json(data)
|
|
assert result is None
|
|
|
|
|
|
# ── _try_slack_json ────────────────────────────────────────────────────
|
|
|
|
|
|
def test_slack_json_valid():
|
|
data = [
|
|
{"type": "message", "user": "U1", "text": "Hello"},
|
|
{"type": "message", "user": "U2", "text": "Hi there"},
|
|
]
|
|
result = _try_slack_json(data)
|
|
assert result is not None
|
|
assert "Hello" in result
|
|
|
|
|
|
def test_slack_json_not_a_list():
|
|
result = _try_slack_json({"type": "message"})
|
|
assert result is None
|
|
|
|
|
|
def test_slack_json_too_few_messages():
|
|
data = [{"type": "message", "user": "U1", "text": "Hello"}]
|
|
result = _try_slack_json(data)
|
|
assert result is None
|
|
|
|
|
|
def test_slack_json_skips_non_message_types():
|
|
data = [
|
|
{"type": "channel_join", "user": "U1", "text": "joined"},
|
|
{"type": "message", "user": "U1", "text": "Hello"},
|
|
{"type": "message", "user": "U2", "text": "Hi"},
|
|
]
|
|
result = _try_slack_json(data)
|
|
assert result is not None
|
|
|
|
|
|
def test_slack_json_three_users():
|
|
"""Three speakers get alternating roles."""
|
|
data = [
|
|
{"type": "message", "user": "U1", "text": "Hello"},
|
|
{"type": "message", "user": "U2", "text": "Hi"},
|
|
{"type": "message", "user": "U3", "text": "Hey"},
|
|
]
|
|
result = _try_slack_json(data)
|
|
assert result is not None
|
|
|
|
|
|
def test_slack_json_empty_text_skipped():
|
|
data = [
|
|
{"type": "message", "user": "U1", "text": ""},
|
|
{"type": "message", "user": "U1", "text": "Hello"},
|
|
{"type": "message", "user": "U2", "text": "Hi"},
|
|
]
|
|
result = _try_slack_json(data)
|
|
assert result is not None
|
|
|
|
|
|
def test_slack_json_username_fallback():
|
|
data = [
|
|
{"type": "message", "username": "bot1", "text": "Hello"},
|
|
{"type": "message", "username": "bot2", "text": "Hi"},
|
|
]
|
|
result = _try_slack_json(data)
|
|
assert result is not None
|
|
|
|
|
|
# ── _try_normalize_json ────────────────────────────────────────────────
|
|
|
|
|
|
def test_try_normalize_json_invalid_json():
|
|
result = _try_normalize_json("not json at all {{{")
|
|
assert result is None
|
|
|
|
|
|
def test_try_normalize_json_valid_but_unknown_schema():
|
|
result = _try_normalize_json(json.dumps({"random": "data"}))
|
|
assert result is None
|
|
|
|
|
|
# ── _messages_to_transcript ────────────────────────────────────────────
|
|
|
|
|
|
def test_messages_to_transcript_basic():
|
|
msgs = [("user", "Q"), ("assistant", "A")]
|
|
with patch("mempalace.normalize.spellcheck_user_text", side_effect=lambda x: x, create=True):
|
|
result = _messages_to_transcript(msgs, spellcheck=False)
|
|
assert "> Q" in result
|
|
assert "A" in result
|
|
|
|
|
|
def test_messages_to_transcript_consecutive_users():
|
|
"""Two user messages in a row (no assistant between)."""
|
|
msgs = [("user", "Q1"), ("user", "Q2"), ("assistant", "A")]
|
|
result = _messages_to_transcript(msgs, spellcheck=False)
|
|
assert "> Q1" in result
|
|
assert "> Q2" in result
|
|
|
|
|
|
def test_messages_to_transcript_assistant_first():
|
|
"""Leading assistant message (no user before it)."""
|
|
msgs = [("assistant", "preamble"), ("user", "Q"), ("assistant", "A")]
|
|
result = _messages_to_transcript(msgs, spellcheck=False)
|
|
assert "preamble" in result
|
|
assert "> Q" in result
|
|
|
|
|
|
# ── Tool block integration (Task 3) ───────────────────────────────────
|
|
|
|
|
|
def test_extract_content_with_tool_use():
|
|
"""_extract_content includes formatted tool_use blocks."""
|
|
content = [
|
|
{"type": "text", "text": "Let me check."},
|
|
{"type": "tool_use", "id": "t1", "name": "Bash", "input": {"command": "lsusb"}},
|
|
]
|
|
result = _extract_content(content)
|
|
assert "Let me check." in result
|
|
assert "[Bash] lsusb" in result
|
|
|
|
|
|
def test_extract_content_with_tool_result():
|
|
"""_extract_content includes formatted tool_result blocks (needs tool_use_map)."""
|
|
content = [
|
|
{"type": "tool_result", "tool_use_id": "t1", "content": "some output"},
|
|
]
|
|
result = _extract_content(content, tool_use_map={"t1": "Bash"})
|
|
assert "→ some output" in result
|
|
|
|
|
|
def test_extract_content_tool_result_without_map_uses_fallback():
|
|
"""tool_result without a map entry uses fallback strategy."""
|
|
content = [
|
|
{"type": "tool_result", "tool_use_id": "t1", "content": "some output"},
|
|
]
|
|
result = _extract_content(content)
|
|
assert "→ some output" in result
|
|
|
|
|
|
def test_claude_code_jsonl_captures_tool_output():
|
|
"""Full integration: tool_use + tool_result appear in normalized transcript."""
|
|
lines = [
|
|
json.dumps({"type": "human", "message": {"content": "Check the camera"}}),
|
|
json.dumps(
|
|
{
|
|
"type": "assistant",
|
|
"message": {
|
|
"content": [
|
|
{"type": "text", "text": "Let me check."},
|
|
{
|
|
"type": "tool_use",
|
|
"id": "t1",
|
|
"name": "Bash",
|
|
"input": {"command": "lsusb | grep razer"},
|
|
},
|
|
]
|
|
},
|
|
}
|
|
),
|
|
json.dumps(
|
|
{
|
|
"type": "human",
|
|
"message": {
|
|
"content": [
|
|
{
|
|
"type": "tool_result",
|
|
"tool_use_id": "t1",
|
|
"content": "Bus 002 Device 005: ID 1532:0e05 Razer Kiyo Pro",
|
|
},
|
|
]
|
|
},
|
|
}
|
|
),
|
|
json.dumps({"type": "assistant", "message": {"content": "Found it."}}),
|
|
]
|
|
result = _try_claude_code_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
assert "> Check the camera" in result
|
|
assert "[Bash] lsusb | grep razer" in result
|
|
assert "→ Bus 002 Device 005" in result
|
|
assert "Found it." in result
|
|
|
|
|
|
def test_claude_code_jsonl_read_result_omitted():
|
|
"""Read tool results are omitted but the path breadcrumb is kept."""
|
|
lines = [
|
|
json.dumps({"type": "human", "message": {"content": "Show me the file"}}),
|
|
json.dumps(
|
|
{
|
|
"type": "assistant",
|
|
"message": {
|
|
"content": [
|
|
{"type": "text", "text": "Reading it."},
|
|
{
|
|
"type": "tool_use",
|
|
"id": "t1",
|
|
"name": "Read",
|
|
"input": {"file_path": "/home/jp/file.py"},
|
|
},
|
|
]
|
|
},
|
|
}
|
|
),
|
|
json.dumps(
|
|
{
|
|
"type": "human",
|
|
"message": {
|
|
"content": [
|
|
{
|
|
"type": "tool_result",
|
|
"tool_use_id": "t1",
|
|
"content": "entire file contents here that should not appear",
|
|
},
|
|
]
|
|
},
|
|
}
|
|
),
|
|
json.dumps({"type": "assistant", "message": {"content": "Here it is."}}),
|
|
]
|
|
result = _try_claude_code_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
assert "[Read /home/jp/file.py]" in result
|
|
assert "entire file contents here" not in result
|
|
|
|
|
|
def test_claude_code_jsonl_tool_only_user_message_not_counted():
|
|
"""A user message containing ONLY tool_results (no text) should not
|
|
be added as a separate user turn with '>'."""
|
|
lines = [
|
|
json.dumps({"type": "human", "message": {"content": "Do it"}}),
|
|
json.dumps(
|
|
{
|
|
"type": "assistant",
|
|
"message": {
|
|
"content": [
|
|
{"type": "text", "text": "Running."},
|
|
{
|
|
"type": "tool_use",
|
|
"id": "t1",
|
|
"name": "Bash",
|
|
"input": {"command": "echo hi"},
|
|
},
|
|
]
|
|
},
|
|
}
|
|
),
|
|
json.dumps(
|
|
{
|
|
"type": "human",
|
|
"message": {
|
|
"content": [
|
|
{"type": "tool_result", "tool_use_id": "t1", "content": "hi"},
|
|
]
|
|
},
|
|
}
|
|
),
|
|
json.dumps({"type": "assistant", "message": {"content": "Done."}}),
|
|
]
|
|
result = _try_claude_code_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
# Only one user turn marker — the original "Do it"
|
|
user_turns = [line for line in result.split("\n") if line.strip().startswith(">")]
|
|
assert len(user_turns) == 1
|
|
assert "> Do it" in result
|
|
|
|
|
|
def test_extract_content_text_only_backward_compat():
|
|
"""Text-only content blocks still work (backward compat)."""
|
|
content = [
|
|
{"type": "text", "text": "Hello"},
|
|
{"type": "text", "text": "World"},
|
|
]
|
|
result = _extract_content(content)
|
|
assert "Hello" in result
|
|
assert "World" in result
|
|
|
|
|
|
def test_extract_content_string_unchanged():
|
|
"""Plain string content still works."""
|
|
result = _extract_content("just a string")
|
|
assert result == "just a string"
|
|
|
|
|
|
def test_claude_code_jsonl_thinking_blocks_ignored():
|
|
"""Thinking blocks are still ignored."""
|
|
lines = [
|
|
json.dumps({"type": "human", "message": {"content": "Q"}}),
|
|
json.dumps(
|
|
{
|
|
"type": "assistant",
|
|
"message": {
|
|
"content": [
|
|
{"type": "thinking", "thinking": "", "signature": "abc"},
|
|
{"type": "text", "text": "A"},
|
|
]
|
|
},
|
|
}
|
|
),
|
|
]
|
|
result = _try_claude_code_jsonl("\n".join(lines))
|
|
assert result is not None
|
|
assert "thinking" not in result.lower()
|
|
assert "signature" not in result
|
|
assert "A" in result
|
|
|
|
|
|
def test_normalize_rejects_large_file():
|
|
"""Files over 500 MB should raise IOError before reading."""
|
|
with patch("mempalace.normalize.os.path.getsize", return_value=600 * 1024 * 1024):
|
|
try:
|
|
normalize("/fake/huge_file.txt")
|
|
assert False, "Should have raised IOError"
|
|
except IOError as e:
|
|
assert "too large" in str(e).lower()
|