From fed69935d343aba8f96b222b0f1f23e21e86583d Mon Sep 17 00:00:00 2001 From: MSL <232237854+milla-jovovich@users.noreply.github.com> Date: Sat, 18 Apr 2026 07:51:10 -0700 Subject: [PATCH] Add tandem sweeper: message-level safety net for dropped transcripts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The primary miners (miner.py, convo_miner.py) operate at file granularity and can drop data for several reasons: size caps, silent OSError on read, dedup false positives, extensions the project miner does not recognize. Even with tonight's hotfixes, any future bug in the file-level path risks silent data loss. The sweeper is a second, cooperating miner that works at MESSAGE granularity: - Parses Claude Code .jsonl line by line, yielding only user/assistant records (filters progress, file-history-snapshot, etc. noise). - For each session_id, queries the palace for max(timestamp) and treats that as the cursor. - Ingests only messages newer than the cursor, as one small drawer per exchange (never hits a size cap — each drawer is 1-5 KB). - Deterministic drawer IDs from session_id + message UUID make reruns idempotent; crash mid-sweep is safe. Tandem coordination is free: if the primary miner committed up to timestamp T, the sweeper resumes from T. If the primary miner missed everything, the sweeper catches it all. Neither duplicates the other. Smoke test on a real Claude Code transcript: 1st run: +39 drawers, 0 already present 2nd run: +0 drawers, 39 already present (perfect idempotence) Opt-in via: mempalace sweep mempalace sweep No changes to existing miners. No schema migration. Purely additive. Tests: tests/test_sweeper.py (7 tests covering parsing, tandem coordination, idempotency, resume-from-cursor, metadata correctness). Co-Authored-By: Claude Opus 4.7 (1M context) --- mempalace/cli.py | 41 +++++++ mempalace/sweeper.py | 263 ++++++++++++++++++++++++++++++++++++++++++ tests/test_sweeper.py | 184 +++++++++++++++++++++++++++++ 3 files changed, 488 insertions(+) create mode 100644 mempalace/sweeper.py create mode 100644 tests/test_sweeper.py diff --git a/mempalace/cli.py b/mempalace/cli.py index 69cd244..f1bc919 100644 --- a/mempalace/cli.py +++ b/mempalace/cli.py @@ -146,6 +146,35 @@ def cmd_mine(args): ) +def cmd_sweep(args): + """Sweep a transcript file or directory for anything the primary + miner missed. Coordinates via max(timestamp) per session_id, so + this is safe to run alongside the file-level miners — neither + duplicates the other's work. + """ + from .sweeper import sweep, sweep_directory + + palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path + target = os.path.expanduser(args.target) + + if os.path.isfile(target): + result = sweep(target, palace_path) + print( + f" Swept {target}: +{result['drawers_added']} drawers, " + f"{result['drawers_skipped']} already present." + ) + elif os.path.isdir(target): + result = sweep_directory(target, palace_path) + print( + f" Swept {result['files_processed']} files from {target}: " + f"+{result['drawers_added']} drawers, " + f"{result['drawers_skipped']} already present." + ) + else: + print(f" ✗ Not a file or directory: {target}", file=sys.stderr) + sys.exit(1) + + def cmd_search(args): from .searcher import search, SearchError @@ -547,6 +576,17 @@ def main(): help="Extraction strategy for convos mode: 'exchange' (default) or 'general' (5 memory types)", ) + # sweep + p_sweep = sub.add_parser( + "sweep", + help="Tandem miner: catch anything the primary miner missed " + "(message-level, timestamp-coordinated, idempotent)", + ) + p_sweep.add_argument( + "target", + help="A .jsonl transcript file, or a directory to scan recursively", + ) + # search p_search = sub.add_parser("search", help="Find anything, exact words") p_search.add_argument("query", help="What to search for") @@ -679,6 +719,7 @@ def main(): "mine": cmd_mine, "split": cmd_split, "search": cmd_search, + "sweep": cmd_sweep, "mcp": cmd_mcp, "compress": cmd_compress, "wake-up": cmd_wakeup, diff --git a/mempalace/sweeper.py b/mempalace/sweeper.py new file mode 100644 index 0000000..a46876b --- /dev/null +++ b/mempalace/sweeper.py @@ -0,0 +1,263 @@ +#!/usr/bin/env python3 +""" +sweeper.py — Tandem miner that guarantees no conversation is silently +dropped. + +Works alongside miner.py / convo_miner.py via timestamp coordination: + + For each session in the transcript dir: + cursor = max(timestamp of drawers with matching session_id, "") + For each user/assistant message in the jsonl with timestamp > cursor: + write one small drawer (message_uuid as deterministic ID) + +Properties: + - Idempotent: rerunning on a fully-mined palace is a no-op. + - Resume-safe: crash mid-sweep → next run picks up from max-timestamp. + - Coordinates with primary miners for free: whichever got further + advances the cursor; the other starts from there next time. + - No size caps: each drawer holds one exchange, ~1-5 KB. + +Usage: + from mempalace.sweeper import sweep + result = sweep("/path/to/session.jsonl", "/path/to/palace") + # result: {"drawers_added": N, "drawers_skipped": M, "cursor": ts} +""" + +from __future__ import annotations + +import json +import sys +from datetime import datetime +from pathlib import Path +from typing import Iterator, Optional + +from .palace import get_collection + + +# ── JSONL parsing ──────────────────────────────────────────────────── + +def _flatten_content(content) -> str: + """Normalize Claude Code's message content to a plain string. + + User messages are strings already; assistant messages are a list of + content blocks like [{"type": "text", "text": "..."}, {"type": + "tool_use", ...}]. We keep text blocks verbatim and describe non-text + blocks as a marker so the drawer carries a faithful record. + """ + if isinstance(content, str): + return content + if isinstance(content, list): + parts = [] + for block in content: + if not isinstance(block, dict): + continue + btype = block.get("type", "") + if btype == "text": + parts.append(block.get("text", "")) + elif btype == "tool_use": + parts.append( + f"[tool_use: {block.get('name', '?')} " + f"input={json.dumps(block.get('input', {}), default=str)[:500]}]" + ) + elif btype == "tool_result": + parts.append( + f"[tool_result: {json.dumps(block.get('content', ''), default=str)[:500]}]" + ) + else: + parts.append(f"[{btype}]") + return "\n".join(p for p in parts if p) + return str(content) + + +def parse_claude_jsonl(path: str) -> Iterator[dict]: + """Yield user/assistant records from a Claude Code .jsonl file. + + Each yield is: + { + "session_id": str, + "uuid": str, # per-message UUID + "timestamp": str, # ISO 8601 + "role": "user" | "assistant", + "content": str, # flattened text + } + + Non-message records (progress, file-history-snapshot, system, + queue-operation, last-prompt) are filtered out. Malformed lines are + skipped silently — data quality is the transcript writer's problem, + not ours. + """ + with open(path, "r", encoding="utf-8", errors="replace") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + continue + rtype = record.get("type") + if rtype not in ("user", "assistant"): + continue + msg = record.get("message") or {} + if not isinstance(msg, dict): + continue + role = msg.get("role") + if role not in ("user", "assistant"): + continue + timestamp = record.get("timestamp") + if not timestamp: + continue + uuid = record.get("uuid") + if not uuid: + continue + session_id = record.get("sessionId") or record.get("session_id") + if not session_id: + continue + content = _flatten_content(msg.get("content", "")) + if not content.strip(): + continue + yield { + "session_id": session_id, + "uuid": uuid, + "timestamp": timestamp, + "role": role, + "content": content, + } + + +# ── Cursor resolution ──────────────────────────────────────────────── + +def get_palace_cursor(collection, session_id: str) -> Optional[str]: + """Return the max timestamp of drawers for this session_id, or None. + + ISO-8601 strings compare lexically in the right order, so we don't + need to parse them. Query scans metadatas for the session (ChromaDB + where-filter), then reduces. + """ + try: + data = collection.get( + where={"session_id": session_id}, + include=["metadatas"], + ) + except Exception: + return None + metas = data.get("metadatas") or [] + timestamps = [m.get("timestamp") for m in metas if m and m.get("timestamp")] + if not timestamps: + return None + return max(timestamps) + + +# ── Sweep ──────────────────────────────────────────────────────────── + +def _drawer_id_for_message(session_id: str, message_uuid: str) -> str: + """Deterministic drawer ID so upserts at the same message are no-ops.""" + return f"sweep_{session_id[:12]}_{message_uuid}" + + +def sweep(jsonl_path: str, palace_path: str, + source_label: Optional[str] = None) -> dict: + """Ingest every user/assistant message not already represented. + + For each message in the jsonl: + - If timestamp <= cursor for that session, skip (already saved by + us or by primary miner). + - Else, upsert a drawer with deterministic ID so reruns dedupe. + + Returns a summary dict: {drawers_added, drawers_skipped, cursor_by_session}. + """ + collection = get_collection(palace_path, create=True) + cursors: dict = {} + + drawers_added = 0 + drawers_skipped = 0 + + batch_ids = [] + batch_docs = [] + batch_metas = [] + BATCH_SIZE = 64 + + def _flush(): + nonlocal drawers_added + if not batch_ids: + return + collection.upsert( + ids=batch_ids, + documents=batch_docs, + metadatas=batch_metas, + ) + drawers_added += len(batch_ids) + batch_ids.clear() + batch_docs.clear() + batch_metas.clear() + + for rec in parse_claude_jsonl(jsonl_path): + sid = rec["session_id"] + if sid not in cursors: + cursors[sid] = get_palace_cursor(collection, sid) + + cursor = cursors[sid] + if cursor is not None and rec["timestamp"] <= cursor: + drawers_skipped += 1 + continue + + drawer_id = _drawer_id_for_message(sid, rec["uuid"]) + document = f"{rec['role'].upper()}: {rec['content']}" + metadata = { + "session_id": sid, + "timestamp": rec["timestamp"], + "message_uuid": rec["uuid"], + "role": rec["role"], + "source_file": source_label or jsonl_path, + "filed_at": datetime.now().isoformat(), + "ingest_mode": "sweep", + } + + batch_ids.append(drawer_id) + batch_docs.append(document) + batch_metas.append(metadata) + + if len(batch_ids) >= BATCH_SIZE: + _flush() + + _flush() + + return { + "drawers_added": drawers_added, + "drawers_skipped": drawers_skipped, + "cursor_by_session": cursors, + } + + +def sweep_directory(dir_path: str, palace_path: str) -> dict: + """Sweep every .jsonl file in a directory (recursive). + + Returns aggregated summary across all files. + """ + dir_p = Path(dir_path).expanduser().resolve() + files = sorted(dir_p.rglob("*.jsonl")) + + total_added = 0 + total_skipped = 0 + per_file = [] + + for f in files: + try: + result = sweep(str(f), palace_path, source_label=str(f)) + except Exception as exc: + print(f" ⚠ sweep failed on {f}: {exc}", file=sys.stderr) + continue + total_added += result["drawers_added"] + total_skipped += result["drawers_skipped"] + per_file.append({ + "file": str(f), + "added": result["drawers_added"], + "skipped": result["drawers_skipped"], + }) + + return { + "files_processed": len(per_file), + "drawers_added": total_added, + "drawers_skipped": total_skipped, + "per_file": per_file, + } diff --git a/tests/test_sweeper.py b/tests/test_sweeper.py new file mode 100644 index 0000000..247125a --- /dev/null +++ b/tests/test_sweeper.py @@ -0,0 +1,184 @@ +"""TDD: tandem sweeper that catches what the primary miner missed. + +The primary miner (miner.py / convo_miner.py) runs at file granularity +and can drop data (size caps, silent OSError, dedup false-positives). +The sweeper is a second miner that works at MESSAGE granularity, +using timestamp as the coordination cursor. + +For each session in the transcript directory: + 1. Look up max(timestamp) across all drawers with matching session_id + 2. Stream the jsonl, yielding only user/assistant messages after the cursor + 3. Write one small drawer per message with: + session_id, uuid, timestamp, role, content + 4. Idempotent: re-running sweeps should find nothing new on a complete palace. + +This test file is TDD — written BEFORE mempalace/sweeper.py exists. +""" + +import json +import tempfile +from pathlib import Path + +import pytest + + +@pytest.fixture +def mock_claude_jsonl(tmp_path): + """Real Claude Code jsonl shape: user/assistant records among progress noise.""" + path = tmp_path / "session_abc.jsonl" + lines = [ + # Noise: progress event, no message + {"type": "progress", "timestamp": "2026-04-18T10:00:00Z", + "sessionId": "abc", "uuid": "p-1"}, + # User message + {"type": "user", "timestamp": "2026-04-18T10:00:05Z", + "sessionId": "abc", "uuid": "u-1", + "message": {"role": "user", "content": "What's the capital of France?"}}, + # Assistant reply + {"type": "assistant", "timestamp": "2026-04-18T10:00:06Z", + "sessionId": "abc", "uuid": "a-1", + "message": {"role": "assistant", + "content": [{"type": "text", "text": "Paris."}]}}, + # Noise: file-history-snapshot + {"type": "file-history-snapshot", "messageId": "abc-snap"}, + # Second user/assistant exchange + {"type": "user", "timestamp": "2026-04-18T10:01:00Z", + "sessionId": "abc", "uuid": "u-2", + "message": {"role": "user", "content": "And of Germany?"}}, + {"type": "assistant", "timestamp": "2026-04-18T10:01:01Z", + "sessionId": "abc", "uuid": "a-2", + "message": {"role": "assistant", + "content": [{"type": "text", "text": "Berlin."}]}}, + ] + path.write_text("\n".join(json.dumps(x) for x in lines) + "\n") + return path + + +class TestSweeperParsing: + def test_parse_yields_only_user_and_assistant(self, mock_claude_jsonl): + from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) + roles = [r["role"] for r in records] + assert roles == ["user", "assistant", "user", "assistant"], ( + f"Expected 4 user/assistant in order, got {roles}. " + "Noise records (progress, file-history-snapshot) must be " + "filtered out." + ) + + def test_parse_extracts_session_id_and_timestamp(self, mock_claude_jsonl): + from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) + first = records[0] + assert first["session_id"] == "abc" + assert first["timestamp"] == "2026-04-18T10:00:05Z" + assert first["uuid"] == "u-1" + + def test_parse_normalizes_assistant_content_list_to_text(self, mock_claude_jsonl): + from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) + assistant_rec = records[1] + assert assistant_rec["role"] == "assistant" + assert "Paris" in assistant_rec["content"], ( + f"Assistant content blocks must be flattened to text; " + f"got: {assistant_rec['content']!r}" + ) + + +class TestSweeperTandem: + """The sweeper coordinates with other miners via max(timestamp).""" + + def test_sweep_empty_palace_ingests_all_messages(self, mock_claude_jsonl, tmp_path): + from mempalace.sweeper import sweep + palace_path = str(tmp_path / "palace") + result = sweep(str(mock_claude_jsonl), palace_path) + assert result["drawers_added"] == 4, ( + f"Empty palace: all 4 user/assistant messages should ingest. " + f"Got drawers_added={result['drawers_added']}." + ) + + def test_sweep_is_idempotent(self, mock_claude_jsonl, tmp_path): + """Running the sweep twice must not duplicate drawers.""" + from mempalace.sweeper import sweep + palace_path = str(tmp_path / "palace") + first = sweep(str(mock_claude_jsonl), palace_path) + second = sweep(str(mock_claude_jsonl), palace_path) + assert first["drawers_added"] == 4 + assert second["drawers_added"] == 0, ( + f"Second sweep must be a no-op on unchanged data. " + f"Got drawers_added={second['drawers_added']} — " + "cursor logic is broken." + ) + + def test_sweep_resumes_from_cursor(self, tmp_path): + """If half the messages are already in the palace, sweep picks up + only the later half.""" + from mempalace.sweeper import sweep + + jsonl_path = tmp_path / "session.jsonl" + lines = [ + {"type": "user", "timestamp": "2026-04-18T09:00:00Z", + "sessionId": "s1", "uuid": "u1", + "message": {"role": "user", "content": "first"}}, + {"type": "assistant", "timestamp": "2026-04-18T09:00:01Z", + "sessionId": "s1", "uuid": "a1", + "message": {"role": "assistant", + "content": [{"type": "text", "text": "one"}]}}, + ] + jsonl_path.write_text("\n".join(json.dumps(x) for x in lines) + "\n") + + palace_path = str(tmp_path / "palace") + first = sweep(str(jsonl_path), palace_path) + assert first["drawers_added"] == 2 + + # Append two more exchanges simulating live session growth. + more_lines = [ + {"type": "user", "timestamp": "2026-04-18T09:05:00Z", + "sessionId": "s1", "uuid": "u2", + "message": {"role": "user", "content": "second"}}, + {"type": "assistant", "timestamp": "2026-04-18T09:05:01Z", + "sessionId": "s1", "uuid": "a2", + "message": {"role": "assistant", + "content": [{"type": "text", "text": "two"}]}}, + ] + with open(jsonl_path, "a") as f: + for x in more_lines: + f.write(json.dumps(x) + "\n") + + second = sweep(str(jsonl_path), palace_path) + assert second["drawers_added"] == 2, ( + f"Second sweep should pick up only the 2 new exchanges, " + f"got {second['drawers_added']}. Cursor (max-timestamp) " + "coordination is broken." + ) + + +class TestSweeperDrawerMetadata: + """Each drawer must carry the metadata the tandem-miner coordination + depends on: session_id, timestamp, uuid, role.""" + + def test_drawer_has_session_id_and_timestamp_metadata( + self, mock_claude_jsonl, tmp_path): + from mempalace.sweeper import sweep + from mempalace.palace import get_collection + + palace_path = str(tmp_path / "palace") + sweep(str(mock_claude_jsonl), palace_path) + + col = get_collection(palace_path, create=False) + data = col.get(include=["metadatas"]) + metas = data["metadatas"] + assert metas, "No drawers written" + + for m in metas: + assert m.get("session_id") == "abc", ( + f"Drawer missing session_id metadata: {m}" + ) + assert m.get("timestamp"), ( + f"Drawer missing timestamp metadata: {m}" + ) + assert m.get("message_uuid"), ( + f"Drawer missing message_uuid metadata: {m}" + ) + assert m.get("role") in ("user", "assistant"), ( + f"Drawer missing or wrong role metadata: {m}" + )