diff --git a/mempalace/cli.py b/mempalace/cli.py index f1bc919..bec6d2b 100644 --- a/mempalace/cli.py +++ b/mempalace/cli.py @@ -170,6 +170,13 @@ def cmd_sweep(args): f"+{result['drawers_added']} drawers, " f"{result['drawers_skipped']} already present." ) + failures = result.get("failures") or [] + if failures: + print( + f" ⚠ {len(failures)} file(s) failed to sweep — see stderr / logs for details.", + file=sys.stderr, + ) + sys.exit(2) else: print(f" ✗ Not a file or directory: {target}", file=sys.stderr) sys.exit(1) @@ -580,7 +587,7 @@ def main(): p_sweep = sub.add_parser( "sweep", help="Tandem miner: catch anything the primary miner missed " - "(message-level, timestamp-coordinated, idempotent)", + "(message-level, timestamp-coordinated, idempotent)", ) p_sweep.add_argument( "target", diff --git a/mempalace/sweeper.py b/mempalace/sweeper.py index a46876b..c4d9092 100644 --- a/mempalace/sweeper.py +++ b/mempalace/sweeper.py @@ -26,6 +26,7 @@ Usage: from __future__ import annotations import json +import logging import sys from datetime import datetime from pathlib import Path @@ -33,16 +34,20 @@ from typing import Iterator, Optional from .palace import get_collection +logger = logging.getLogger(__name__) + # ── JSONL parsing ──────────────────────────────────────────────────── + def _flatten_content(content) -> str: """Normalize Claude Code's message content to a plain string. User messages are strings already; assistant messages are a list of content blocks like [{"type": "text", "text": "..."}, {"type": - "tool_use", ...}]. We keep text blocks verbatim and describe non-text - blocks as a marker so the drawer carries a faithful record. + "tool_use", ...}]. All blocks are preserved verbatim — the design + principle is "verbatim always", so tool inputs and results are + serialized in full, never truncated. """ if isinstance(content, str): return content @@ -57,14 +62,12 @@ def _flatten_content(content) -> str: elif btype == "tool_use": parts.append( f"[tool_use: {block.get('name', '?')} " - f"input={json.dumps(block.get('input', {}), default=str)[:500]}]" + f"input={json.dumps(block.get('input', {}), default=str)}]" ) elif btype == "tool_result": - parts.append( - f"[tool_result: {json.dumps(block.get('content', ''), default=str)[:500]}]" - ) + parts.append(f"[tool_result: {json.dumps(block.get('content', ''), default=str)}]") else: - parts.append(f"[{btype}]") + parts.append(f"[{btype}: {json.dumps(block, default=str)}]") return "\n".join(p for p in parts if p) return str(content) @@ -127,19 +130,32 @@ def parse_claude_jsonl(path: str) -> Iterator[dict]: # ── Cursor resolution ──────────────────────────────────────────────── + def get_palace_cursor(collection, session_id: str) -> Optional[str]: """Return the max timestamp of drawers for this session_id, or None. ISO-8601 strings compare lexically in the right order, so we don't - need to parse them. Query scans metadatas for the session (ChromaDB - where-filter), then reduces. + need to parse them. Query scans metadatas for the session via the + backend's where-filter, then reduces. + + Backend errors are logged at WARNING and surface as a `None` cursor — + which makes the caller treat the session as empty and ingest every + message. That's intentional: a no-cursor sweep is recovered from on + the next run by deterministic drawer IDs, so a degraded cursor never + causes silent data loss. """ try: data = collection.get( where={"session_id": session_id}, include=["metadatas"], ) - except Exception: + except Exception as exc: + logger.warning( + "sweeper: cursor lookup failed for session_id=%s (%s); " + "treating as empty — drawers will be re-upserted idempotently.", + session_id, + exc, + ) return None metas = data.get("metadatas") or [] timestamps = [m.get("timestamp") for m in metas if m and m.get("timestamp")] @@ -150,13 +166,18 @@ def get_palace_cursor(collection, session_id: str) -> Optional[str]: # ── Sweep ──────────────────────────────────────────────────────────── + def _drawer_id_for_message(session_id: str, message_uuid: str) -> str: - """Deterministic drawer ID so upserts at the same message are no-ops.""" - return f"sweep_{session_id[:12]}_{message_uuid}" + """Deterministic drawer ID so upserts at the same message are no-ops. + + Uses the full session_id (not a prefix) to avoid any cross-session + collision risk if a transcript source ever uses non-UUID session + identifiers or shares prefixes across sessions. + """ + return f"sweep_{session_id}_{message_uuid}" -def sweep(jsonl_path: str, palace_path: str, - source_label: Optional[str] = None) -> dict: +def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None) -> dict: """Ingest every user/assistant message not already represented. For each message in the jsonl: @@ -241,23 +262,29 @@ def sweep_directory(dir_path: str, palace_path: str) -> dict: total_skipped = 0 per_file = [] + failures: list[dict] = [] for f in files: try: result = sweep(str(f), palace_path, source_label=str(f)) except Exception as exc: - print(f" ⚠ sweep failed on {f}: {exc}", file=sys.stderr) + logger.error("sweeper: sweep failed on %s: %s", f, exc) + print(f" \u26a0 sweep failed on {f}: {exc}", file=sys.stderr) + failures.append({"file": str(f), "error": str(exc)}) continue total_added += result["drawers_added"] total_skipped += result["drawers_skipped"] - per_file.append({ - "file": str(f), - "added": result["drawers_added"], - "skipped": result["drawers_skipped"], - }) + per_file.append( + { + "file": str(f), + "added": result["drawers_added"], + "skipped": result["drawers_skipped"], + } + ) return { "files_processed": len(per_file), "drawers_added": total_added, "drawers_skipped": total_skipped, "per_file": per_file, + "failures": failures, } diff --git a/tests/test_miner_jsonl_visibility.py b/tests/test_miner_jsonl_visibility.py index 2ef3e18..9db7fb5 100644 --- a/tests/test_miner_jsonl_visibility.py +++ b/tests/test_miner_jsonl_visibility.py @@ -108,9 +108,11 @@ class TestJsonlNotSilentlySkipped: def fake_stat(self, *args, **kwargs): result = real_stat(self, *args, **kwargs) if self.name == "big_transcript.jsonl": + class _FakeStat: st_size = fake_size st_mode = result.st_mode + return _FakeStat() return result diff --git a/tests/test_sweeper.py b/tests/test_sweeper.py index 247125a..4ac8325 100644 --- a/tests/test_sweeper.py +++ b/tests/test_sweeper.py @@ -16,8 +16,6 @@ This test file is TDD — written BEFORE mempalace/sweeper.py exists. """ import json -import tempfile -from pathlib import Path import pytest @@ -28,27 +26,45 @@ def mock_claude_jsonl(tmp_path): path = tmp_path / "session_abc.jsonl" lines = [ # Noise: progress event, no message - {"type": "progress", "timestamp": "2026-04-18T10:00:00Z", - "sessionId": "abc", "uuid": "p-1"}, + { + "type": "progress", + "timestamp": "2026-04-18T10:00:00Z", + "sessionId": "abc", + "uuid": "p-1", + }, # User message - {"type": "user", "timestamp": "2026-04-18T10:00:05Z", - "sessionId": "abc", "uuid": "u-1", - "message": {"role": "user", "content": "What's the capital of France?"}}, + { + "type": "user", + "timestamp": "2026-04-18T10:00:05Z", + "sessionId": "abc", + "uuid": "u-1", + "message": {"role": "user", "content": "What's the capital of France?"}, + }, # Assistant reply - {"type": "assistant", "timestamp": "2026-04-18T10:00:06Z", - "sessionId": "abc", "uuid": "a-1", - "message": {"role": "assistant", - "content": [{"type": "text", "text": "Paris."}]}}, + { + "type": "assistant", + "timestamp": "2026-04-18T10:00:06Z", + "sessionId": "abc", + "uuid": "a-1", + "message": {"role": "assistant", "content": [{"type": "text", "text": "Paris."}]}, + }, # Noise: file-history-snapshot {"type": "file-history-snapshot", "messageId": "abc-snap"}, # Second user/assistant exchange - {"type": "user", "timestamp": "2026-04-18T10:01:00Z", - "sessionId": "abc", "uuid": "u-2", - "message": {"role": "user", "content": "And of Germany?"}}, - {"type": "assistant", "timestamp": "2026-04-18T10:01:01Z", - "sessionId": "abc", "uuid": "a-2", - "message": {"role": "assistant", - "content": [{"type": "text", "text": "Berlin."}]}}, + { + "type": "user", + "timestamp": "2026-04-18T10:01:00Z", + "sessionId": "abc", + "uuid": "u-2", + "message": {"role": "user", "content": "And of Germany?"}, + }, + { + "type": "assistant", + "timestamp": "2026-04-18T10:01:01Z", + "sessionId": "abc", + "uuid": "a-2", + "message": {"role": "assistant", "content": [{"type": "text", "text": "Berlin."}]}, + }, ] path.write_text("\n".join(json.dumps(x) for x in lines) + "\n") return path @@ -57,6 +73,7 @@ def mock_claude_jsonl(tmp_path): class TestSweeperParsing: def test_parse_yields_only_user_and_assistant(self, mock_claude_jsonl): from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) roles = [r["role"] for r in records] assert roles == ["user", "assistant", "user", "assistant"], ( @@ -67,6 +84,7 @@ class TestSweeperParsing: def test_parse_extracts_session_id_and_timestamp(self, mock_claude_jsonl): from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) first = records[0] assert first["session_id"] == "abc" @@ -75,12 +93,52 @@ class TestSweeperParsing: def test_parse_normalizes_assistant_content_list_to_text(self, mock_claude_jsonl): from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) assistant_rec = records[1] assert assistant_rec["role"] == "assistant" - assert "Paris" in assistant_rec["content"], ( - f"Assistant content blocks must be flattened to text; " - f"got: {assistant_rec['content']!r}" + assert ( + "Paris" in assistant_rec["content"] + ), f"Assistant content blocks must be flattened to text; got: {assistant_rec['content']!r}" + + def test_parse_preserves_tool_blocks_verbatim(self, tmp_path): + """Per the design principle "verbatim always", tool_use and + tool_result blocks must NOT be truncated. A long tool input + (e.g. a large diff handed to a code-edit tool) must round-trip + in full, otherwise we silently lose user-adjacent data. + """ + import json as _json + + from mempalace.sweeper import parse_claude_jsonl + + big_input = {"diff": "x" * 5000} # well past the old 500-char cap + path = tmp_path / "session_tools.jsonl" + path.write_text( + _json.dumps( + { + "type": "assistant", + "timestamp": "2026-04-18T10:00:00Z", + "sessionId": "tools-1", + "uuid": "a-tool", + "message": { + "role": "assistant", + "content": [ + {"type": "tool_use", "name": "Edit", "input": big_input}, + ], + }, + } + ) + + "\n" + ) + + records = list(parse_claude_jsonl(str(path))) + assert len(records) == 1 + content = records[0]["content"] + # The full 5000-char value must be present — no truncation marker, + # no [:500] slice. Look for the raw string in the serialized form. + assert big_input["diff"] in content, ( + "tool_use input was truncated. The verbatim guarantee requires " + f"the full payload to round-trip. Got len={len(content)}." ) @@ -89,6 +147,7 @@ class TestSweeperTandem: def test_sweep_empty_palace_ingests_all_messages(self, mock_claude_jsonl, tmp_path): from mempalace.sweeper import sweep + palace_path = str(tmp_path / "palace") result = sweep(str(mock_claude_jsonl), palace_path) assert result["drawers_added"] == 4, ( @@ -99,6 +158,7 @@ class TestSweeperTandem: def test_sweep_is_idempotent(self, mock_claude_jsonl, tmp_path): """Running the sweep twice must not duplicate drawers.""" from mempalace.sweeper import sweep + palace_path = str(tmp_path / "palace") first = sweep(str(mock_claude_jsonl), palace_path) second = sweep(str(mock_claude_jsonl), palace_path) @@ -116,13 +176,20 @@ class TestSweeperTandem: jsonl_path = tmp_path / "session.jsonl" lines = [ - {"type": "user", "timestamp": "2026-04-18T09:00:00Z", - "sessionId": "s1", "uuid": "u1", - "message": {"role": "user", "content": "first"}}, - {"type": "assistant", "timestamp": "2026-04-18T09:00:01Z", - "sessionId": "s1", "uuid": "a1", - "message": {"role": "assistant", - "content": [{"type": "text", "text": "one"}]}}, + { + "type": "user", + "timestamp": "2026-04-18T09:00:00Z", + "sessionId": "s1", + "uuid": "u1", + "message": {"role": "user", "content": "first"}, + }, + { + "type": "assistant", + "timestamp": "2026-04-18T09:00:01Z", + "sessionId": "s1", + "uuid": "a1", + "message": {"role": "assistant", "content": [{"type": "text", "text": "one"}]}, + }, ] jsonl_path.write_text("\n".join(json.dumps(x) for x in lines) + "\n") @@ -132,13 +199,20 @@ class TestSweeperTandem: # Append two more exchanges simulating live session growth. more_lines = [ - {"type": "user", "timestamp": "2026-04-18T09:05:00Z", - "sessionId": "s1", "uuid": "u2", - "message": {"role": "user", "content": "second"}}, - {"type": "assistant", "timestamp": "2026-04-18T09:05:01Z", - "sessionId": "s1", "uuid": "a2", - "message": {"role": "assistant", - "content": [{"type": "text", "text": "two"}]}}, + { + "type": "user", + "timestamp": "2026-04-18T09:05:00Z", + "sessionId": "s1", + "uuid": "u2", + "message": {"role": "user", "content": "second"}, + }, + { + "type": "assistant", + "timestamp": "2026-04-18T09:05:01Z", + "sessionId": "s1", + "uuid": "a2", + "message": {"role": "assistant", "content": [{"type": "text", "text": "two"}]}, + }, ] with open(jsonl_path, "a") as f: for x in more_lines: @@ -156,8 +230,7 @@ class TestSweeperDrawerMetadata: """Each drawer must carry the metadata the tandem-miner coordination depends on: session_id, timestamp, uuid, role.""" - def test_drawer_has_session_id_and_timestamp_metadata( - self, mock_claude_jsonl, tmp_path): + def test_drawer_has_session_id_and_timestamp_metadata(self, mock_claude_jsonl, tmp_path): from mempalace.sweeper import sweep from mempalace.palace import get_collection @@ -170,15 +243,10 @@ class TestSweeperDrawerMetadata: assert metas, "No drawers written" for m in metas: - assert m.get("session_id") == "abc", ( - f"Drawer missing session_id metadata: {m}" - ) - assert m.get("timestamp"), ( - f"Drawer missing timestamp metadata: {m}" - ) - assert m.get("message_uuid"), ( - f"Drawer missing message_uuid metadata: {m}" - ) - assert m.get("role") in ("user", "assistant"), ( - f"Drawer missing or wrong role metadata: {m}" - ) + assert m.get("session_id") == "abc", f"Drawer missing session_id metadata: {m}" + assert m.get("timestamp"), f"Drawer missing timestamp metadata: {m}" + assert m.get("message_uuid"), f"Drawer missing message_uuid metadata: {m}" + assert m.get("role") in ( + "user", + "assistant", + ), f"Drawer missing or wrong role metadata: {m}"