diff --git a/mempalace/cli.py b/mempalace/cli.py index bec6d2b..fc69f24 100644 --- a/mempalace/cli.py +++ b/mempalace/cli.py @@ -147,10 +147,14 @@ def cmd_mine(args): def cmd_sweep(args): - """Sweep a transcript file or directory for anything the primary - miner missed. Coordinates via max(timestamp) per session_id, so - this is safe to run alongside the file-level miners — neither - duplicates the other's work. + """Sweep a transcript file or directory. + + The sweeper deduplicates against its own prior writes via + deterministic drawer IDs + a timestamp cursor. It does NOT currently + coordinate with the file-level miners (miner.py / convo_miner.py) — + those produce char-chunked drawers without compatible message + metadata, so running both miners may store overlapping content under + different IDs. """ from .sweeper import sweep, sweep_directory @@ -160,15 +164,17 @@ def cmd_sweep(args): if os.path.isfile(target): result = sweep(target, palace_path) print( - f" Swept {target}: +{result['drawers_added']} drawers, " - f"{result['drawers_skipped']} already present." + f" Swept {target}: +{result['drawers_added']} new, " + f"{result['drawers_already_present']} already present, " + f"{result['drawers_skipped']} skipped (< cursor)." ) elif os.path.isdir(target): result = sweep_directory(target, palace_path) print( - f" Swept {result['files_processed']} files from {target}: " - f"+{result['drawers_added']} drawers, " - f"{result['drawers_skipped']} already present." + f" Swept {result['files_succeeded']}/{result['files_attempted']} " + f"files from {target}: +{result['drawers_added']} new, " + f"{result['drawers_already_present']} already present, " + f"{result['drawers_skipped']} skipped (< cursor)." 
) failures = result.get("failures") or [] if failures: diff --git a/mempalace/convo_miner.py b/mempalace/convo_miner.py index 3e7b5a4..02c1797 100644 --- a/mempalace/convo_miner.py +++ b/mempalace/convo_miner.py @@ -58,8 +58,11 @@ CHUNK_SIZE = 800 # chars per drawer — align with miner.py MAX_FILE_SIZE = 500 * 1024 * 1024 # 500 MB — skip files larger than this. # Matches miner.py at 500 MB. Long Claude Code sessions, multi-year # ChatGPT exports, and lifetime Slack dumps routinely exceed 10 MB; the -# cap at that level silently dropped them with `continue`. Source size -# does not affect storage or embedding cost — chunking happens downstream. +# cap at that level silently dropped them with `continue`. Per-drawer +# size is bounded by CHUNK_SIZE, but larger source files still produce +# more drawers and therefore more embedding/storage work — and content +# is normalized and loaded fully into memory before chunking, so memory +# use also scales with source size. def _register_file(collection, source_file: str, wing: str, agent: str): diff --git a/mempalace/miner.py b/mempalace/miner.py index 4e809a8..18e748c 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -66,8 +66,11 @@ MIN_CHUNK_SIZE = 50 # skip tiny chunks MAX_FILE_SIZE = 500 * 1024 * 1024 # 500 MB — skip files larger than this. # Long Claude Code sessions and large transcript exports routinely exceed # 10 MB. The cap exists as a defensive rail against pathological binary -# files, not as a limit on legitimate text. Chunking at 800 chars per -# drawer means source size does not affect storage or embedding cost. +# files, not as a limit on legitimate text. Per-drawer size is bounded +# by CHUNK_SIZE, but larger sources still produce proportionally more +# drawers and therefore more storage, embedding, and processing work — +# and file reads are not streamed (the whole content is loaded into +# memory before chunking), so memory use scales with source size too. 
# ============================================================================= diff --git a/mempalace/sweeper.py b/mempalace/sweeper.py index c4d9092..ce87153 100644 --- a/mempalace/sweeper.py +++ b/mempalace/sweeper.py @@ -1,26 +1,39 @@ #!/usr/bin/env python3 """ -sweeper.py — Tandem miner that guarantees no conversation is silently -dropped. +sweeper.py — Message-granular miner that catches what the file-level +primary miners dropped. -Works alongside miner.py / convo_miner.py via timestamp coordination: +Algorithm, per session: - For each session in the transcript dir: - cursor = max(timestamp of drawers with matching session_id, "") - For each user/assistant message in the jsonl with timestamp > cursor: - write one small drawer (message_uuid as deterministic ID) + cursor = max(timestamp of sweeper-written drawers for this session_id) + For each user/assistant message in the jsonl: + if cursor is not None and message.timestamp < cursor: skip + else: upsert a drawer keyed by (session_id, message_uuid) Properties: - - Idempotent: rerunning on a fully-mined palace is a no-op. - - Resume-safe: crash mid-sweep → next run picks up from max-timestamp. - - Coordinates with primary miners for free: whichever got further - advances the cursor; the other starts from there next time. + + - Idempotent on its own writes: rerunning is a no-op because drawer + IDs are deterministic and existence is pre-checked before counting. + - Resume-safe: a crash mid-sweep is recovered on the next run — the + cursor advances to the last ingested timestamp and re-attempts at + that boundary are de-duped by the deterministic ID. + - Tie-break safe: uses ``< cursor`` (not ``<=``), so if multiple + messages share the max timestamp and only some were ingested, the + rest are still picked up on re-run. - No size caps: each drawer holds one exchange, ~1-5 KB. 
+Coordination with the primary file-level miners (``miner.py`` / +``convo_miner.py``) is limited: those miners chunk at a fixed char size +and do not currently stamp ``session_id``/``timestamp`` metadata that +the sweeper can key off. In practice the sweeper coordinates with its +own prior runs, and may ingest content that also got chunked into +primary-miner drawers (under different IDs). Follow-up: add uniform +``ingest_mode`` + message metadata to the primary miners so dedup spans +both paths. + Usage: from mempalace.sweeper import sweep result = sweep("/path/to/session.jsonl", "/path/to/palace") - # result: {"drawers_added": N, "drawers_skipped": M, "cursor": ts} """ from __future__ import annotations @@ -181,33 +194,67 @@ def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None) """Ingest every user/assistant message not already represented. For each message in the jsonl: - - If timestamp <= cursor for that session, skip (already saved by - us or by primary miner). + - If timestamp < cursor for that session, skip (strictly earlier + than the newest drawer already stored for that session — already covered). + - At timestamp == cursor we do NOT skip, because multiple messages + can share the same ISO-8601 timestamp; if only some of them were + ingested before a crash, a `<= cursor` skip would lose the rest + forever. Deterministic drawer IDs make re-attempting at the + cursor boundary safe (existing rows are found via a pre-flight + `get(ids=...)` and counted as "already present", not "added"). - Else, upsert a drawer with deterministic ID so reruns dedupe. - Returns a summary dict: {drawers_added, drawers_skipped, cursor_by_session}. + Returns ``{drawers_added, drawers_already_present, drawers_skipped, + drawers_upserted, cursor_by_session}``: + + * ``drawers_added`` — rows that did not exist before this sweep. + * ``drawers_already_present`` — rows whose deterministic ID was + already in the palace and got rewritten idempotently. 
+ * ``drawers_skipped`` — records skipped by the cursor (strictly + earlier than what's already stored). + * ``drawers_upserted`` — total writes = added + already_present. """ collection = get_collection(palace_path, create=True) cursors: dict = {} drawers_added = 0 + drawers_already_present = 0 drawers_skipped = 0 - batch_ids = [] - batch_docs = [] - batch_metas = [] + batch_ids: list[str] = [] + batch_docs: list[str] = [] + batch_metas: list[dict] = [] BATCH_SIZE = 64 def _flush(): - nonlocal drawers_added + nonlocal drawers_added, drawers_already_present if not batch_ids: return + # Pre-flight: which IDs in this batch are already present? + # Upsert is idempotent on data but counts as "added" would lie; + # this pre-query makes the metric honest (Copilot PR 998 review). + try: + existing = collection.get(ids=list(batch_ids), include=[]) + # Chroma returns a dict; typed backends return GetResult — the + # compat shim makes ``.get("ids")`` work on both. + present = set(existing.get("ids") or []) + except Exception as exc: + logger.warning( + "sweeper: existence pre-check failed (%s); " + "counting all batch rows as new (metric may over-count on reruns).", + exc, + ) + present = set() + new_count = sum(1 for rid in batch_ids if rid not in present) + already_count = len(batch_ids) - new_count + collection.upsert( ids=batch_ids, documents=batch_docs, metadatas=batch_metas, ) - drawers_added += len(batch_ids) + drawers_added += new_count + drawers_already_present += already_count batch_ids.clear() batch_docs.clear() batch_metas.clear() @@ -218,7 +265,7 @@ def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None) cursors[sid] = get_palace_cursor(collection, sid) cursor = cursors[sid] - if cursor is not None and rec["timestamp"] <= cursor: + if cursor is not None and rec["timestamp"] < cursor: drawers_skipped += 1 continue @@ -245,6 +292,8 @@ def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None) return { "drawers_added": 
drawers_added, + "drawers_already_present": drawers_already_present, + "drawers_upserted": drawers_added + drawers_already_present, "drawers_skipped": drawers_skipped, "cursor_by_session": cursors, } @@ -253,12 +302,16 @@ def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None) def sweep_directory(dir_path: str, palace_path: str) -> dict: """Sweep every .jsonl file in a directory (recursive). - Returns aggregated summary across all files. + Returns aggregated summary across all files. ``files_attempted`` + includes files that raised, so the count reflects discovery rather + than only successes; ``files_succeeded`` is the subset that + completed without error. """ dir_p = Path(dir_path).expanduser().resolve() files = sorted(dir_p.rglob("*.jsonl")) total_added = 0 + total_already_present = 0 total_skipped = 0 per_file = [] @@ -272,18 +325,22 @@ def sweep_directory(dir_path: str, palace_path: str) -> dict: failures.append({"file": str(f), "error": str(exc)}) continue total_added += result["drawers_added"] + total_already_present += result.get("drawers_already_present", 0) total_skipped += result["drawers_skipped"] per_file.append( { "file": str(f), "added": result["drawers_added"], + "already_present": result.get("drawers_already_present", 0), "skipped": result["drawers_skipped"], } ) return { - "files_processed": len(per_file), + "files_attempted": len(files), + "files_succeeded": len(per_file), "drawers_added": total_added, + "drawers_already_present": total_already_present, "drawers_skipped": total_skipped, "per_file": per_file, "failures": failures, diff --git a/tests/test_sweeper.py b/tests/test_sweeper.py index 4ac8325..983724a 100644 --- a/tests/test_sweeper.py +++ b/tests/test_sweeper.py @@ -225,6 +225,72 @@ class TestSweeperTandem: "coordination is broken." 
) + def test_sweep_recovers_untaken_message_at_cursor_timestamp(self, tmp_path): + """Regression for Copilot PR #998 review: with a `<= cursor` skip, + any message sharing the max timestamp but not yet ingested (e.g. + crash mid-batch) would be lost forever. The skip must be `<` and + tie-break via deterministic drawer ID. + + Scenario: three messages share timestamp T. First sweep ingests + two of them and the process dies before the third. Second sweep + must pick up the third — not skip it because cursor == T. + """ + from mempalace.palace import get_collection + from mempalace.sweeper import ( + _drawer_id_for_message, + parse_claude_jsonl, + sweep, + ) + + shared_ts = "2026-04-18T11:00:00Z" + lines = [ + { + "type": "user", + "timestamp": shared_ts, + "sessionId": "s-tie", + "uuid": f"u-{i}", + "message": {"role": "user", "content": f"msg {i}"}, + } + for i in range(3) + ] + jsonl_path = tmp_path / "tied.jsonl" + jsonl_path.write_text("\n".join(json.dumps(x) for x in lines) + "\n") + + palace_path = str(tmp_path / "palace") + # Simulate a partial ingest: write 2 of 3 directly via the backend + # with the same drawer IDs the sweeper would use. + col = get_collection(palace_path, create=True) + recs = list(parse_claude_jsonl(str(jsonl_path))) + partial_ids = [_drawer_id_for_message(r["session_id"], r["uuid"]) for r in recs[:2]] + col.upsert( + ids=partial_ids, + documents=[f"USER: {r['content']}" for r in recs[:2]], + metadatas=[ + { + "session_id": r["session_id"], + "timestamp": r["timestamp"], + "message_uuid": r["uuid"], + "role": r["role"], + "ingest_mode": "sweep", + } + for r in recs[:2] + ], + ) + + # Now run the sweeper. It must pick up the 3rd message, not skip + # it because cursor == its timestamp. + result = sweep(str(jsonl_path), palace_path) + assert result["drawers_added"] == 1, ( + f"Sweeper lost the untaken message at cursor timestamp. " + f"Expected drawers_added=1 (the 3rd record), got " + f"{result['drawers_added']}. 
Cursor skip is still `<=` " + "instead of `<`, or tie-break via drawer-id is broken." + ) + assert result["drawers_already_present"] == 2, ( + f"Expected 2 drawers already present (the partial ingest), " + f"got {result['drawers_already_present']}." + ) + class TestSweeperDrawerMetadata: """Each drawer must carry the metadata the tandem-miner coordination