Merge pull request #998 from MemPalace/fix/silent-transcript-drop

Fix silent transcript drop: .jsonl ingestion + 500 MB cap + tandem sweeper
This commit is contained in:
Igor Lins e Silva
2026-04-18 13:38:02 -03:00
committed by GitHub
7 changed files with 893 additions and 2 deletions
+54
View File
@@ -146,6 +146,48 @@ def cmd_mine(args):
)
def cmd_sweep(args):
"""Sweep a transcript file or directory.
The sweeper deduplicates against its own prior writes via
deterministic drawer IDs + a timestamp cursor. It does NOT currently
coordinate with the file-level miners (miner.py / convo_miner.py) —
those produce char-chunked drawers without compatible message
metadata, so running both miners may store overlapping content under
different IDs.
"""
from .sweeper import sweep, sweep_directory
palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path
target = os.path.expanduser(args.target)
if os.path.isfile(target):
result = sweep(target, palace_path)
print(
f" Swept {target}: +{result['drawers_added']} new, "
f"{result['drawers_already_present']} already present, "
f"{result['drawers_skipped']} skipped (< cursor)."
)
elif os.path.isdir(target):
result = sweep_directory(target, palace_path)
print(
f" Swept {result['files_succeeded']}/{result['files_attempted']} "
f"files from {target}: +{result['drawers_added']} new, "
f"{result['drawers_already_present']} already present, "
f"{result['drawers_skipped']} skipped (< cursor)."
)
failures = result.get("failures") or []
if failures:
print(
f"{len(failures)} file(s) failed to sweep — see stderr / logs for details.",
file=sys.stderr,
)
sys.exit(2)
else:
print(f" ✗ Not a file or directory: {target}", file=sys.stderr)
sys.exit(1)
def cmd_search(args):
from .searcher import search, SearchError
@@ -547,6 +589,17 @@ def main():
help="Extraction strategy for convos mode: 'exchange' (default) or 'general' (5 memory types)",
)
# sweep
p_sweep = sub.add_parser(
"sweep",
help="Tandem miner: catch anything the primary miner missed "
"(message-level, timestamp-coordinated, idempotent)",
)
p_sweep.add_argument(
"target",
help="A .jsonl transcript file, or a directory to scan recursively",
)
# search
p_search = sub.add_parser("search", help="Find anything, exact words")
p_search.add_argument("query", help="What to search for")
@@ -679,6 +732,7 @@ def main():
"mine": cmd_mine,
"split": cmd_split,
"search": cmd_search,
"sweep": cmd_sweep,
"mcp": cmd_mcp,
"compress": cmd_compress,
"wake-up": cmd_wakeup,
+8 -1
View File
@@ -55,7 +55,14 @@ CONVO_EXTENSIONS = {
MIN_CHUNK_SIZE = 30
CHUNK_SIZE = 800 # chars per drawer — align with miner.py
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB — skip files larger than this
MAX_FILE_SIZE = 500 * 1024 * 1024 # 500 MB — skip files larger than this.
# Matches miner.py at 500 MB. Long Claude Code sessions, multi-year
# ChatGPT exports, and lifetime Slack dumps routinely exceed 10 MB; the
# cap at that level silently dropped them with `continue`. Per-drawer
# size is bounded by CHUNK_SIZE, but larger source files still produce
# more drawers and therefore more embedding/storage work — and content
# is normalized and loaded fully into memory before chunking, so memory
# use also scales with source size.
def _register_file(collection, source_file: str, wing: str, agent: str):
+9 -1
View File
@@ -36,6 +36,7 @@ READABLE_EXTENSIONS = {
".jsx",
".tsx",
".json",
".jsonl",
".yaml",
".yml",
".html",
@@ -62,7 +63,14 @@ SKIP_FILENAMES = {
CHUNK_SIZE = 800 # chars per drawer
CHUNK_OVERLAP = 100 # overlap between chunks
MIN_CHUNK_SIZE = 50 # skip tiny chunks
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB — skip files larger than this
MAX_FILE_SIZE = 500 * 1024 * 1024 # 500 MB — skip files larger than this.
# Long Claude Code sessions and large transcript exports routinely exceed
# 10 MB. The cap exists as a defensive rail against pathological binary
# files, not as a limit on legitimate text. Per-drawer size is bounded
# by CHUNK_SIZE, but larger sources still produce proportionally more
# drawers and therefore more storage, embedding, and processing work —
# and file reads are not streamed (the whole content is loaded into
# memory before chunking), so memory use scales with source size too.
# =============================================================================
+347
View File
@@ -0,0 +1,347 @@
#!/usr/bin/env python3
"""
sweeper.py — Message-granular miner that catches what the file-level
primary miners dropped.
Algorithm, per session:
cursor = max(timestamp of sweeper-written drawers for this session_id)
For each user/assistant message in the jsonl:
if cursor is not None and message.timestamp < cursor: skip
else: upsert a drawer keyed by (session_id, message_uuid)
Properties:
- Idempotent on its own writes: rerunning is a no-op because drawer
IDs are deterministic and existence is pre-checked before counting.
- Resume-safe: a crash mid-sweep is recovered on the next run — the
cursor advances to the last ingested timestamp and re-attempts at
that boundary are de-duped by the deterministic ID.
- Tie-break safe: uses ``< cursor`` (not ``<=``), so if multiple
messages share the max timestamp and only some were ingested, the
rest are still picked up on re-run.
- No size caps: each drawer holds one exchange, ~1-5 KB.
Coordination with the primary file-level miners (``miner.py`` /
``convo_miner.py``) is limited: those miners chunk at a fixed char size
and do not currently stamp ``session_id``/``timestamp`` metadata that
the sweeper can key off. In practice the sweeper coordinates with its
own prior runs, and may ingest content that also got chunked into
primary-miner drawers (under different IDs). Follow-up: add uniform
``ingest_mode`` + message metadata to the primary miners so dedup spans
both paths.
Usage:
from mempalace.sweeper import sweep
result = sweep("/path/to/session.jsonl", "/path/to/palace")
"""
from __future__ import annotations
import json
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Iterator, Optional
from .palace import get_collection
logger = logging.getLogger(__name__)
# ── JSONL parsing ────────────────────────────────────────────────────
def _flatten_content(content) -> str:
"""Normalize Claude Code's message content to a plain string.
User messages are strings already; assistant messages are a list of
content blocks like [{"type": "text", "text": "..."}, {"type":
"tool_use", ...}]. All blocks are preserved verbatim — the design
principle is "verbatim always", so tool inputs and results are
serialized in full, never truncated.
"""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for block in content:
if not isinstance(block, dict):
continue
btype = block.get("type", "")
if btype == "text":
parts.append(block.get("text", ""))
elif btype == "tool_use":
parts.append(
f"[tool_use: {block.get('name', '?')} "
f"input={json.dumps(block.get('input', {}), default=str)}]"
)
elif btype == "tool_result":
parts.append(f"[tool_result: {json.dumps(block.get('content', ''), default=str)}]")
else:
parts.append(f"[{btype}: {json.dumps(block, default=str)}]")
return "\n".join(p for p in parts if p)
return str(content)
def parse_claude_jsonl(path: str) -> Iterator[dict]:
"""Yield user/assistant records from a Claude Code .jsonl file.
Each yield is:
{
"session_id": str,
"uuid": str, # per-message UUID
"timestamp": str, # ISO 8601
"role": "user" | "assistant",
"content": str, # flattened text
}
Non-message records (progress, file-history-snapshot, system,
queue-operation, last-prompt) are filtered out. Malformed lines are
skipped silently — data quality is the transcript writer's problem,
not ours.
"""
with open(path, "r", encoding="utf-8", errors="replace") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
except json.JSONDecodeError:
continue
rtype = record.get("type")
if rtype not in ("user", "assistant"):
continue
msg = record.get("message") or {}
if not isinstance(msg, dict):
continue
role = msg.get("role")
if role not in ("user", "assistant"):
continue
timestamp = record.get("timestamp")
if not timestamp:
continue
uuid = record.get("uuid")
if not uuid:
continue
session_id = record.get("sessionId") or record.get("session_id")
if not session_id:
continue
content = _flatten_content(msg.get("content", ""))
if not content.strip():
continue
yield {
"session_id": session_id,
"uuid": uuid,
"timestamp": timestamp,
"role": role,
"content": content,
}
# ── Cursor resolution ────────────────────────────────────────────────
def get_palace_cursor(collection, session_id: str) -> Optional[str]:
"""Return the max timestamp of drawers for this session_id, or None.
ISO-8601 strings compare lexically in the right order, so we don't
need to parse them. Query scans metadatas for the session via the
backend's where-filter, then reduces.
Backend errors are logged at WARNING and surface as a `None` cursor —
which makes the caller treat the session as empty and ingest every
message. That's intentional: a no-cursor sweep is recovered from on
the next run by deterministic drawer IDs, so a degraded cursor never
causes silent data loss.
"""
try:
data = collection.get(
where={"session_id": session_id},
include=["metadatas"],
)
except Exception as exc:
logger.warning(
"sweeper: cursor lookup failed for session_id=%s (%s); "
"treating as empty — drawers will be re-upserted idempotently.",
session_id,
exc,
)
return None
metas = data.get("metadatas") or []
timestamps = [m.get("timestamp") for m in metas if m and m.get("timestamp")]
if not timestamps:
return None
return max(timestamps)
# ── Sweep ────────────────────────────────────────────────────────────
def _drawer_id_for_message(session_id: str, message_uuid: str) -> str:
"""Deterministic drawer ID so upserts at the same message are no-ops.
Uses the full session_id (not a prefix) to avoid any cross-session
collision risk if a transcript source ever uses non-UUID session
identifiers or shares prefixes across sessions.
"""
return f"sweep_{session_id}_{message_uuid}"
def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None) -> dict:
"""Ingest every user/assistant message not already represented.
For each message in the jsonl:
- If timestamp < cursor for that session, skip (strictly earlier
than anything already in the palace — already covered).
- At timestamp == cursor we do NOT skip, because multiple messages
can share the same ISO-8601 timestamp; if only some of them were
ingested before a crash, a `<= cursor` skip would lose the rest
forever. Deterministic drawer IDs make re-attempting at the
cursor boundary safe (existing rows are found via a pre-flight
`get(ids=...)` and counted as "already present", not "added").
- Else, upsert a drawer with deterministic ID so reruns dedupe.
Returns ``{drawers_added, drawers_already_present, drawers_skipped,
drawers_upserted, cursor_by_session}``:
* ``drawers_added`` — rows that did not exist before this sweep.
* ``drawers_already_present`` — rows whose deterministic ID was
already in the palace and got rewritten idempotently.
* ``drawers_skipped`` — records skipped by the cursor (strictly
earlier than what's already stored).
* ``drawers_upserted`` — total writes = added + already_present.
"""
collection = get_collection(palace_path, create=True)
cursors: dict = {}
drawers_added = 0
drawers_already_present = 0
drawers_skipped = 0
batch_ids: list[str] = []
batch_docs: list[str] = []
batch_metas: list[dict] = []
BATCH_SIZE = 64
def _flush():
nonlocal drawers_added, drawers_already_present
if not batch_ids:
return
# Pre-flight: which IDs in this batch are already present?
# Upsert is idempotent on data but counts as "added" would lie;
# this pre-query makes the metric honest (Copilot PR 998 review).
try:
existing = collection.get(ids=list(batch_ids), include=[])
# Chroma returns a dict; typed backends return GetResult — the
# compat shim makes ``.get("ids")`` work on both.
present = set(existing.get("ids") or [])
except Exception as exc:
logger.warning(
"sweeper: existence pre-check failed (%s); "
"counting all batch rows as new (metric may over-count on reruns).",
exc,
)
present = set()
new_count = sum(1 for rid in batch_ids if rid not in present)
already_count = len(batch_ids) - new_count
collection.upsert(
ids=batch_ids,
documents=batch_docs,
metadatas=batch_metas,
)
drawers_added += new_count
drawers_already_present += already_count
batch_ids.clear()
batch_docs.clear()
batch_metas.clear()
for rec in parse_claude_jsonl(jsonl_path):
sid = rec["session_id"]
if sid not in cursors:
cursors[sid] = get_palace_cursor(collection, sid)
cursor = cursors[sid]
if cursor is not None and rec["timestamp"] < cursor:
drawers_skipped += 1
continue
drawer_id = _drawer_id_for_message(sid, rec["uuid"])
document = f"{rec['role'].upper()}: {rec['content']}"
metadata = {
"session_id": sid,
"timestamp": rec["timestamp"],
"message_uuid": rec["uuid"],
"role": rec["role"],
"source_file": source_label or jsonl_path,
"filed_at": datetime.now().isoformat(),
"ingest_mode": "sweep",
}
batch_ids.append(drawer_id)
batch_docs.append(document)
batch_metas.append(metadata)
if len(batch_ids) >= BATCH_SIZE:
_flush()
_flush()
return {
"drawers_added": drawers_added,
"drawers_already_present": drawers_already_present,
"drawers_upserted": drawers_added + drawers_already_present,
"drawers_skipped": drawers_skipped,
"cursor_by_session": cursors,
}
def sweep_directory(dir_path: str, palace_path: str) -> dict:
"""Sweep every .jsonl file in a directory (recursive).
Returns aggregated summary across all files. ``files_attempted``
includes files that raised, so the count reflects discovery rather
than only successes; ``files_succeeded`` is the subset that
completed without error.
"""
dir_p = Path(dir_path).expanduser().resolve()
files = sorted(dir_p.rglob("*.jsonl"))
total_added = 0
total_already_present = 0
total_skipped = 0
per_file = []
failures: list[dict] = []
for f in files:
try:
result = sweep(str(f), palace_path, source_label=str(f))
except Exception as exc:
logger.error("sweeper: sweep failed on %s: %s", f, exc)
print(f" \u26a0 sweep failed on {f}: {exc}", file=sys.stderr)
failures.append({"file": str(f), "error": str(exc)})
continue
total_added += result["drawers_added"]
total_already_present += result.get("drawers_already_present", 0)
total_skipped += result["drawers_skipped"]
per_file.append(
{
"file": str(f),
"added": result["drawers_added"],
"already_present": result.get("drawers_already_present", 0),
"skipped": result["drawers_skipped"],
}
)
return {
"files_attempted": len(files),
"files_succeeded": len(per_file),
"drawers_added": total_added,
"drawers_already_present": total_already_present,
"drawers_skipped": total_skipped,
"per_file": per_file,
"failures": failures,
}
+31
View File
@@ -0,0 +1,31 @@
"""TDD: convo_miner.py must not silently drop transcripts larger than 10 MB.
Mirrors the miner.py fix shipped in the same PR family (see
test_miner_jsonl_visibility.py). Long Claude Code sessions, ChatGPT
exports, and multi-year Slack dumps routinely exceed 10 MB. The cap
silently `continue`s past them at convo_miner.py:~289, same silent-drop
pattern as the project miner's.
Written BEFORE the fix.
"""
from mempalace.convo_miner import MAX_FILE_SIZE
class TestConvoMinerSizeCap:
def test_max_file_size_accommodates_long_transcripts(self):
"""The cap must be well above any realistic transcript.
Long sessions and lifetime exports exceed 10 MB. The cap exists
as a sanity rail against pathological binaries, not as a limit
on legitimate text — downstream chunking means source size does
not matter for storage or embedding cost.
"""
assert MAX_FILE_SIZE >= 100 * 1024 * 1024, (
f"convo_miner.MAX_FILE_SIZE is {MAX_FILE_SIZE} bytes "
f"({MAX_FILE_SIZE / 1024 / 1024:.0f} MB). Same silent-drop "
"bug as miner.py's old 10 MB cap — long transcripts get "
"filtered out at convo_miner.py:~289 with `continue`. "
"Raise to at least 100 MB (match miner.py at 500 MB for "
"consistency across both miners)."
)
+126
View File
@@ -0,0 +1,126 @@
"""TDD: miner.py must not silently drop .jsonl files.
The project miner (mempalace.miner.scan_project) walks a directory and
keeps only files whose suffix is in READABLE_EXTENSIONS. The whitelist
contains `.json` but NOT `.jsonl`. Every ChatGPT export, Claude Code
transcript, or any other jsonl transcript dumped into a project
directory is silently dropped with no user-visible output.
Two paths to fix this, both tested here:
1. READABLE_EXTENSIONS must include `.jsonl` so the file is at least
readable as text (jsonl is line-delimited JSON — each line is
already valid text for embedding).
2. OR scan_project must surface skipped .jsonl files to the user so
they know to use `--mode convos`.
We test (1) — include .jsonl in READABLE_EXTENSIONS. This matches how
`.json` is already handled: the miner doesn't care what the structure
is, it chunks the text.
Written BEFORE the fix.
"""
import tempfile
from pathlib import Path
from unittest.mock import patch
from mempalace.miner import MAX_FILE_SIZE, READABLE_EXTENSIONS, scan_project
class TestJsonlNotSilentlySkipped:
def test_jsonl_in_readable_extensions(self):
"""`.jsonl` must be in the readable-extensions whitelist.
`.json` is already there (see mempalace/miner.py:30). `.jsonl`
is conceptually the same thing — line-delimited JSON — and all
of Claude Code's transcripts, ChatGPT exports, and similar
tooling writes `.jsonl`. Excluding it silently drops user data.
"""
assert ".jsonl" in READABLE_EXTENSIONS, (
"mempalace/miner.py:READABLE_EXTENSIONS contains `.json` "
"but NOT `.jsonl`. Every jsonl file in a mined project is "
"silently skipped at miner.py:722 "
"(`if filepath.suffix.lower() not in READABLE_EXTENSIONS: "
"continue`). This causes the 'convos not being saved' bug "
"reported by users — the hook fires `mempalace mine`, the "
"miner walks the directory, skips every .jsonl file, exits "
"cleanly. No warning, no log line, user sees nothing wrong. "
"Add `.jsonl` to READABLE_EXTENSIONS."
)
def test_scan_project_picks_up_jsonl_file(self):
"""scan_project should find .jsonl files in the target dir."""
with tempfile.TemporaryDirectory() as tmp:
tmpdir = Path(tmp)
jsonl_path = tmpdir / "transcript.jsonl"
jsonl_path.write_text(
'{"role": "user", "content": "hello"}\n'
'{"role": "assistant", "content": "hi there"}\n'
'{"role": "user", "content": "how do I install this"}\n'
'{"role": "assistant", "content": "pip install mempalace"}\n'
)
found = scan_project(str(tmpdir))
found_names = [p.name for p in found]
assert "transcript.jsonl" in found_names, (
"scan_project silently dropped transcript.jsonl. "
f"Returned: {found_names}. Users placing transcript "
"exports in a project directory expect them to be mined."
)
def test_large_jsonl_not_silently_dropped_by_size_cap(self):
"""Long sessions produce >10 MB transcripts. They must still mine.
The legacy cap was 10 MB, which is smaller than a long Claude Code
session's transcript. Users hitting the cap lost their entire
conversation to a silent `if size > MAX: continue` at miner.py:732.
Raise the cap well above any realistic transcript size.
"""
# 10 MB cap was silent failure — real Claude Code long sessions
# exceed this. The cap must accommodate them.
assert MAX_FILE_SIZE >= 100 * 1024 * 1024, (
f"MAX_FILE_SIZE is {MAX_FILE_SIZE} bytes "
f"({MAX_FILE_SIZE / 1024 / 1024:.0f} MB). Long Claude Code "
"sessions produce transcripts larger than 10 MB and get "
"silently dropped. Raise to at least 100 MB — chunking "
"at 800 chars per drawer means source file size doesn't "
"matter for downstream storage."
)
def test_scan_project_picks_up_50mb_jsonl(self):
"""A 50 MB .jsonl must not be filtered out by the size cap.
We don't actually write 50 MB (slow test). Instead, we mock
stat().st_size to report a 50 MB file and confirm scan_project
still includes it.
"""
with tempfile.TemporaryDirectory() as tmp:
tmpdir = Path(tmp)
big_jsonl = tmpdir / "big_transcript.jsonl"
# Write a small real file so the existence / extension / text
# checks pass; then mock its reported size.
big_jsonl.write_text('{"role": "user", "content": "hi"}\n')
fake_size = 50 * 1024 * 1024 # 50 MB
real_stat = Path.stat
def fake_stat(self, *args, **kwargs):
result = real_stat(self, *args, **kwargs)
if self.name == "big_transcript.jsonl":
class _FakeStat:
st_size = fake_size
st_mode = result.st_mode
return _FakeStat()
return result
with patch.object(Path, "stat", fake_stat):
found = scan_project(str(tmpdir))
found_names = [p.name for p in found]
assert "big_transcript.jsonl" in found_names, (
f"50 MB .jsonl was dropped by size cap (MAX_FILE_SIZE="
f"{MAX_FILE_SIZE}). Returned: {found_names}."
)
+318
View File
@@ -0,0 +1,318 @@
"""TDD: tandem sweeper that catches what the primary miner missed.
The primary miner (miner.py / convo_miner.py) runs at file granularity
and can drop data (size caps, silent OSError, dedup false-positives).
The sweeper is a second miner that works at MESSAGE granularity,
using timestamp as the coordination cursor.
For each session in the transcript directory:
1. Look up max(timestamp) across all drawers with matching session_id
2. Stream the jsonl, yielding only user/assistant messages after the cursor
3. Write one small drawer per message with:
session_id, uuid, timestamp, role, content
4. Idempotent: re-running sweeps should find nothing new on a complete palace.
This test file is TDD — written BEFORE mempalace/sweeper.py exists.
"""
import json
import pytest
@pytest.fixture
def mock_claude_jsonl(tmp_path):
"""Real Claude Code jsonl shape: user/assistant records among progress noise."""
path = tmp_path / "session_abc.jsonl"
lines = [
# Noise: progress event, no message
{
"type": "progress",
"timestamp": "2026-04-18T10:00:00Z",
"sessionId": "abc",
"uuid": "p-1",
},
# User message
{
"type": "user",
"timestamp": "2026-04-18T10:00:05Z",
"sessionId": "abc",
"uuid": "u-1",
"message": {"role": "user", "content": "What's the capital of France?"},
},
# Assistant reply
{
"type": "assistant",
"timestamp": "2026-04-18T10:00:06Z",
"sessionId": "abc",
"uuid": "a-1",
"message": {"role": "assistant", "content": [{"type": "text", "text": "Paris."}]},
},
# Noise: file-history-snapshot
{"type": "file-history-snapshot", "messageId": "abc-snap"},
# Second user/assistant exchange
{
"type": "user",
"timestamp": "2026-04-18T10:01:00Z",
"sessionId": "abc",
"uuid": "u-2",
"message": {"role": "user", "content": "And of Germany?"},
},
{
"type": "assistant",
"timestamp": "2026-04-18T10:01:01Z",
"sessionId": "abc",
"uuid": "a-2",
"message": {"role": "assistant", "content": [{"type": "text", "text": "Berlin."}]},
},
]
path.write_text("\n".join(json.dumps(x) for x in lines) + "\n")
return path
class TestSweeperParsing:
def test_parse_yields_only_user_and_assistant(self, mock_claude_jsonl):
from mempalace.sweeper import parse_claude_jsonl
records = list(parse_claude_jsonl(str(mock_claude_jsonl)))
roles = [r["role"] for r in records]
assert roles == ["user", "assistant", "user", "assistant"], (
f"Expected 4 user/assistant in order, got {roles}. "
"Noise records (progress, file-history-snapshot) must be "
"filtered out."
)
def test_parse_extracts_session_id_and_timestamp(self, mock_claude_jsonl):
from mempalace.sweeper import parse_claude_jsonl
records = list(parse_claude_jsonl(str(mock_claude_jsonl)))
first = records[0]
assert first["session_id"] == "abc"
assert first["timestamp"] == "2026-04-18T10:00:05Z"
assert first["uuid"] == "u-1"
def test_parse_normalizes_assistant_content_list_to_text(self, mock_claude_jsonl):
from mempalace.sweeper import parse_claude_jsonl
records = list(parse_claude_jsonl(str(mock_claude_jsonl)))
assistant_rec = records[1]
assert assistant_rec["role"] == "assistant"
assert (
"Paris" in assistant_rec["content"]
), f"Assistant content blocks must be flattened to text; got: {assistant_rec['content']!r}"
def test_parse_preserves_tool_blocks_verbatim(self, tmp_path):
"""Per the design principle "verbatim always", tool_use and
tool_result blocks must NOT be truncated. A long tool input
(e.g. a large diff handed to a code-edit tool) must round-trip
in full, otherwise we silently lose user-adjacent data.
"""
import json as _json
from mempalace.sweeper import parse_claude_jsonl
big_input = {"diff": "x" * 5000} # well past the old 500-char cap
path = tmp_path / "session_tools.jsonl"
path.write_text(
_json.dumps(
{
"type": "assistant",
"timestamp": "2026-04-18T10:00:00Z",
"sessionId": "tools-1",
"uuid": "a-tool",
"message": {
"role": "assistant",
"content": [
{"type": "tool_use", "name": "Edit", "input": big_input},
],
},
}
)
+ "\n"
)
records = list(parse_claude_jsonl(str(path)))
assert len(records) == 1
content = records[0]["content"]
# The full 5000-char value must be present — no truncation marker,
# no [:500] slice. Look for the raw string in the serialized form.
assert big_input["diff"] in content, (
"tool_use input was truncated. The verbatim guarantee requires "
f"the full payload to round-trip. Got len={len(content)}."
)
class TestSweeperTandem:
"""The sweeper coordinates with other miners via max(timestamp)."""
def test_sweep_empty_palace_ingests_all_messages(self, mock_claude_jsonl, tmp_path):
from mempalace.sweeper import sweep
palace_path = str(tmp_path / "palace")
result = sweep(str(mock_claude_jsonl), palace_path)
assert result["drawers_added"] == 4, (
f"Empty palace: all 4 user/assistant messages should ingest. "
f"Got drawers_added={result['drawers_added']}."
)
def test_sweep_is_idempotent(self, mock_claude_jsonl, tmp_path):
"""Running the sweep twice must not duplicate drawers."""
from mempalace.sweeper import sweep
palace_path = str(tmp_path / "palace")
first = sweep(str(mock_claude_jsonl), palace_path)
second = sweep(str(mock_claude_jsonl), palace_path)
assert first["drawers_added"] == 4
assert second["drawers_added"] == 0, (
f"Second sweep must be a no-op on unchanged data. "
f"Got drawers_added={second['drawers_added']}"
"cursor logic is broken."
)
def test_sweep_resumes_from_cursor(self, tmp_path):
"""If half the messages are already in the palace, sweep picks up
only the later half."""
from mempalace.sweeper import sweep
jsonl_path = tmp_path / "session.jsonl"
lines = [
{
"type": "user",
"timestamp": "2026-04-18T09:00:00Z",
"sessionId": "s1",
"uuid": "u1",
"message": {"role": "user", "content": "first"},
},
{
"type": "assistant",
"timestamp": "2026-04-18T09:00:01Z",
"sessionId": "s1",
"uuid": "a1",
"message": {"role": "assistant", "content": [{"type": "text", "text": "one"}]},
},
]
jsonl_path.write_text("\n".join(json.dumps(x) for x in lines) + "\n")
palace_path = str(tmp_path / "palace")
first = sweep(str(jsonl_path), palace_path)
assert first["drawers_added"] == 2
# Append two more exchanges simulating live session growth.
more_lines = [
{
"type": "user",
"timestamp": "2026-04-18T09:05:00Z",
"sessionId": "s1",
"uuid": "u2",
"message": {"role": "user", "content": "second"},
},
{
"type": "assistant",
"timestamp": "2026-04-18T09:05:01Z",
"sessionId": "s1",
"uuid": "a2",
"message": {"role": "assistant", "content": [{"type": "text", "text": "two"}]},
},
]
with open(jsonl_path, "a") as f:
for x in more_lines:
f.write(json.dumps(x) + "\n")
second = sweep(str(jsonl_path), palace_path)
assert second["drawers_added"] == 2, (
f"Second sweep should pick up only the 2 new exchanges, "
f"got {second['drawers_added']}. Cursor (max-timestamp) "
"coordination is broken."
)
def test_sweep_recovers_untaken_message_at_cursor_timestamp(self, tmp_path):
"""Regression for Copilot PR #998 review: with a `<= cursor` skip,
any message sharing the max timestamp but not yet ingested (e.g.
crash mid-batch) would be lost forever. The skip must be `<` and
tie-break via deterministic drawer ID.
Scenario: three messages share timestamp T. First sweep ingests
two of them and the process dies before the third. Second sweep
must pick up the third — not skip it because cursor == T.
"""
from mempalace.palace import get_collection
from mempalace.sweeper import (
_drawer_id_for_message,
parse_claude_jsonl,
sweep,
)
shared_ts = "2026-04-18T11:00:00Z"
lines = [
{
"type": "user",
"timestamp": shared_ts,
"sessionId": "s-tie",
"uuid": f"u-{i}",
"message": {"role": "user", "content": f"msg {i}"},
}
for i in range(3)
]
jsonl_path = tmp_path / "tied.jsonl"
jsonl_path.write_text("\n".join(json.dumps(x) for x in lines) + "\n")
palace_path = str(tmp_path / "palace")
# Simulate a partial ingest: write 2 of 3 directly via the backend
# with the same drawer IDs the sweeper would use.
col = get_collection(palace_path, create=True)
recs = list(parse_claude_jsonl(str(jsonl_path)))
partial_ids = [_drawer_id_for_message(r["session_id"], r["uuid"]) for r in recs[:2]]
col.upsert(
ids=partial_ids,
documents=[f"USER: {r['content']}" for r in recs[:2]],
metadatas=[
{
"session_id": r["session_id"],
"timestamp": r["timestamp"],
"message_uuid": r["uuid"],
"role": r["role"],
"ingest_mode": "sweep",
}
for r in recs[:2]
],
)
# Now run the sweeper. It must pick up the 3rd message, not skip
# it because cursor == its timestamp.
result = sweep(str(jsonl_path), palace_path)
assert result["drawers_added"] == 1, (
f"Sweeper lost the untaken message at cursor timestamp. "
f"Expected drawers_added=1 (the 3rd record), got "
f"{result['drawers_added']}. Cursor skip is still `<=` "
"instead of `<`, or tie-break via drawer-id is broken."
)
assert result["drawers_already_present"] == 2, (
f"Expected 2 drawers already present (the partial ingest), "
f"got {result['drawers_already_present']}."
)
class TestSweeperDrawerMetadata:
"""Each drawer must carry the metadata the tandem-miner coordination
depends on: session_id, timestamp, uuid, role."""
def test_drawer_has_session_id_and_timestamp_metadata(self, mock_claude_jsonl, tmp_path):
from mempalace.sweeper import sweep
from mempalace.palace import get_collection
palace_path = str(tmp_path / "palace")
sweep(str(mock_claude_jsonl), palace_path)
col = get_collection(palace_path, create=False)
data = col.get(include=["metadatas"])
metas = data["metadatas"]
assert metas, "No drawers written"
for m in metas:
assert m.get("session_id") == "abc", f"Drawer missing session_id metadata: {m}"
assert m.get("timestamp"), f"Drawer missing timestamp metadata: {m}"
assert m.get("message_uuid"), f"Drawer missing message_uuid metadata: {m}"
assert m.get("role") in (
"user",
"assistant",
), f"Drawer missing or wrong role metadata: {m}"