Merge pull request #785 from MemPalace/pr/strip-noise-from-transcripts
fix: strip system tags, hook output, and Claude UI chrome from drawers
This commit is contained in:
@@ -16,7 +16,13 @@ from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
from .normalize import normalize
|
||||
from .palace import SKIP_DIRS, get_collection, file_already_mined, mine_lock
|
||||
from .palace import (
|
||||
NORMALIZE_VERSION,
|
||||
SKIP_DIRS,
|
||||
file_already_mined,
|
||||
get_collection,
|
||||
mine_lock,
|
||||
)
|
||||
|
||||
|
||||
# File types that might contain conversations
|
||||
@@ -51,6 +57,7 @@ def _register_file(collection, source_file: str, wing: str, agent: str):
|
||||
"added_by": agent,
|
||||
"filed_at": datetime.now().isoformat(),
|
||||
"ingest_mode": "registry",
|
||||
"normalize_version": NORMALIZE_VERSION,
|
||||
}
|
||||
],
|
||||
)
|
||||
@@ -273,7 +280,11 @@ def scan_convos(convo_dir: str) -> list:
|
||||
|
||||
|
||||
def _file_chunks_locked(collection, source_file, chunks, wing, room, agent, extract_mode):
|
||||
"""Acquire the per-file lock, double-check mined status, and upsert chunks.
|
||||
"""Lock the source file, purge stale drawers, and upsert fresh chunks.
|
||||
|
||||
Combines the per-file serialization that prevents concurrent agents from
|
||||
duplicating work (via mine_lock) with the normalize-version rebuild
|
||||
contract (purge-before-insert so pre-v2 drawers don't survive).
|
||||
|
||||
Returns (drawers_added, room_counts_delta, skipped).
|
||||
"""
|
||||
@@ -281,9 +292,19 @@ def _file_chunks_locked(collection, source_file, chunks, wing, room, agent, extr
|
||||
drawers_added = 0
|
||||
with mine_lock(source_file):
|
||||
# Re-check after lock — another agent may have just finished this file
|
||||
# at the current schema. A stale-version hit here returns False, so we
|
||||
# still fall through to the purge+rebuild path below.
|
||||
if file_already_mined(collection, source_file):
|
||||
return 0, room_counts_delta, True
|
||||
|
||||
# Purge stale drawers first. When the normalize schema bumps,
|
||||
# file_already_mined() returned False for pre-v2 drawers — clean
|
||||
# them out so the source doesn't end up with mixed old/new drawers.
|
||||
try:
|
||||
collection.delete(where={"source_file": source_file})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for chunk in chunks:
|
||||
chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
|
||||
if extract_mode == "general":
|
||||
@@ -303,6 +324,7 @@ def _file_chunks_locked(collection, source_file, chunks, wing, room, agent, extr
|
||||
"filed_at": datetime.now().isoformat(),
|
||||
"ingest_mode": "convos",
|
||||
"extract_mode": extract_mode,
|
||||
"normalize_version": NORMALIZE_VERSION,
|
||||
}
|
||||
],
|
||||
)
|
||||
@@ -416,7 +438,8 @@ def mine_convos(
|
||||
if extract_mode != "general":
|
||||
room_counts[room] += 1
|
||||
|
||||
# File each chunk — lock to prevent concurrent agents duplicating
|
||||
# Lock + purge stale + file fresh chunks. Lock serializes concurrent
|
||||
# agents; purge removes pre-v2 drawers so the schema bump applies.
|
||||
drawers_added, room_delta, skipped = _file_chunks_locked(
|
||||
collection, source_file, chunks, wing, room, agent, extract_mode
|
||||
)
|
||||
|
||||
+8
-1
@@ -15,7 +15,13 @@ from pathlib import Path
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
from .palace import SKIP_DIRS, get_collection, file_already_mined, mine_lock
|
||||
from .palace import (
|
||||
NORMALIZE_VERSION,
|
||||
SKIP_DIRS,
|
||||
file_already_mined,
|
||||
get_collection,
|
||||
mine_lock,
|
||||
)
|
||||
|
||||
READABLE_EXTENSIONS = {
|
||||
".txt",
|
||||
@@ -381,6 +387,7 @@ def add_drawer(
|
||||
"chunk_index": chunk_index,
|
||||
"added_by": agent,
|
||||
"filed_at": datetime.now().isoformat(),
|
||||
"normalize_version": NORMALIZE_VERSION,
|
||||
}
|
||||
# Store file mtime so we can detect modifications later.
|
||||
try:
|
||||
|
||||
+93
-2
@@ -16,10 +16,93 @@ No API key. No internet. Everything local.
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
# ─── Noise stripping ─────────────────────────────────────────────────────
|
||||
# Claude Code and other tools inject system tags, hook output, and UI chrome
|
||||
# into transcripts. These waste drawer space and pollute search results.
|
||||
#
|
||||
# Verbatim is sacred — every pattern here is anchored to line boundaries and
|
||||
# refuses to cross blank lines, so a stray unclosed tag in one message can
|
||||
# never eat content from neighboring messages. When in doubt, leave text
|
||||
# alone.
|
||||
|
||||
_NOISE_TAGS = (
|
||||
"system-reminder",
|
||||
"command-message",
|
||||
"command-name",
|
||||
"task-notification",
|
||||
"user-prompt-submit-hook",
|
||||
"hook_output",
|
||||
)
|
||||
|
||||
|
||||
def _tag_pattern(name: str) -> "re.Pattern[str]":
|
||||
# Opening tag must begin a line (optionally after a `> ` blockquote marker,
|
||||
# since _messages_to_transcript prefixes lines with `> `). Body is lazy but
|
||||
# forbidden from crossing a blank line, so a dangling open tag can't span
|
||||
# multiple messages. Closing tag eats optional trailing whitespace + newline.
|
||||
return re.compile(
|
||||
rf"(?m)^(?:> )?<{name}(?:\s[^>]*)?>" rf"(?:(?!\n\s*\n)[\s\S])*?" rf"</{name}>[ \t]*\n?"
|
||||
)
|
||||
|
||||
|
||||
_NOISE_TAG_PATTERNS = [_tag_pattern(t) for t in _NOISE_TAGS]
|
||||
|
||||
# Strings that identify an entire noise line when found at its start.
|
||||
# Matched case-sensitively and anchored to line-start so user prose mentioning
|
||||
# e.g. "current time:" in a sentence is untouched.
|
||||
_NOISE_LINE_PREFIXES = (
|
||||
"CURRENT TIME:",
|
||||
"VERIFIED FACTS (do not contradict)",
|
||||
"AGENT SPECIALIZATION:",
|
||||
"Checking verified facts...",
|
||||
"Injecting timestamp...",
|
||||
"Starting background pipeline...",
|
||||
"Checking emotional weights...",
|
||||
"Auto-save reminder...",
|
||||
"Checking pipeline...",
|
||||
"MemPalace auto-save checkpoint.",
|
||||
)
|
||||
|
||||
_NOISE_LINE_PATTERNS = [
|
||||
re.compile(rf"(?m)^(?:> )?{re.escape(p)}.*\n?") for p in _NOISE_LINE_PREFIXES
|
||||
]
|
||||
|
||||
# Claude Code TUI hook-run chrome, e.g. "Ran 2 Stop hook", "Ran 1 PreCompact hook".
|
||||
# Line-anchored, case-sensitive, explicit hook names — prose like
|
||||
# "our CI has a stop hook" stays intact.
|
||||
_HOOK_LINE_RE = re.compile(
|
||||
r"(?m)^(?:> )?Ran \d+ (?:Stop|PreCompact|PreToolUse|PostToolUse|UserPromptSubmit|Notification|SessionStart|SessionEnd) hook[s]?.*\n?"
|
||||
)
|
||||
|
||||
# "… +N lines" collapsed-output marker, line-anchored.
|
||||
_COLLAPSED_LINES_RE = re.compile(r"(?m)^(?:> )?…\s*\+\d+ lines.*\n?")
|
||||
|
||||
|
||||
def strip_noise(text: str) -> str:
|
||||
"""Remove system tags, hook output, and Claude Code UI chrome from text.
|
||||
|
||||
All patterns are line-anchored. User prose that happens to mention these
|
||||
strings inline (e.g., documenting them) is preserved verbatim.
|
||||
"""
|
||||
for pat in _NOISE_TAG_PATTERNS:
|
||||
text = pat.sub("", text)
|
||||
for pat in _NOISE_LINE_PATTERNS:
|
||||
text = pat.sub("", text)
|
||||
text = _HOOK_LINE_RE.sub("", text)
|
||||
text = _COLLAPSED_LINES_RE.sub("", text)
|
||||
# Strip the Claude Code collapsed-output chrome "[N tokens] (ctrl+o to expand)".
|
||||
# Narrow shape — a bare "(ctrl+o to expand)" in user prose stays intact.
|
||||
text = re.sub(r"\s*\[\d+\s+tokens?\]\s*\(ctrl\+o to expand\)", "", text)
|
||||
# Collapse runs of blank lines created by the removals
|
||||
text = re.sub(r"\n{4,}", "\n\n\n", text)
|
||||
return text.strip()
|
||||
|
||||
|
||||
def normalize(filepath: str) -> str:
|
||||
"""
|
||||
Load a file and normalize to transcript format if it's a chat export.
|
||||
@@ -40,12 +123,14 @@ def normalize(filepath: str) -> str:
|
||||
if not content.strip():
|
||||
return content
|
||||
|
||||
# Already has > markers — pass through
|
||||
# Already has > markers — pass through unchanged.
|
||||
lines = content.split("\n")
|
||||
if sum(1 for line in lines if line.strip().startswith(">")) >= 3:
|
||||
return content
|
||||
|
||||
# Try JSON normalization
|
||||
# Try JSON normalization. strip_noise is applied inside the Claude Code
|
||||
# JSONL parser (the only format that injects system tags/hook chrome);
|
||||
# other formats pass through verbatim.
|
||||
ext = Path(filepath).suffix.lower()
|
||||
if ext in (".json", ".jsonl") or content.strip()[:1] in ("{", "["):
|
||||
normalized = _try_normalize_json(content)
|
||||
@@ -112,6 +197,10 @@ def _try_claude_code_jsonl(content: str) -> Optional[str]:
|
||||
isinstance(b, dict) and b.get("type") == "tool_result" for b in msg_content
|
||||
)
|
||||
text = _extract_content(msg_content, tool_use_map=tool_use_map)
|
||||
# Strip Claude Code system-injected noise per message, never across
|
||||
# message boundaries — prevents span-eating.
|
||||
if text:
|
||||
text = strip_noise(text)
|
||||
if text:
|
||||
if is_tool_only and messages and messages[-1][0] == "assistant":
|
||||
# Append tool results to the previous assistant message
|
||||
@@ -121,6 +210,8 @@ def _try_claude_code_jsonl(content: str) -> Optional[str]:
|
||||
messages.append(("user", text))
|
||||
elif msg_type == "assistant":
|
||||
text = _extract_content(msg_content, tool_use_map=tool_use_map)
|
||||
if text:
|
||||
text = strip_noise(text)
|
||||
if text:
|
||||
# If previous message is also assistant (multi-turn tool loop),
|
||||
# merge into the same assistant turn
|
||||
|
||||
+24
-4
@@ -38,6 +38,16 @@ SKIP_DIRS = {
|
||||
|
||||
_DEFAULT_BACKEND = ChromaBackend()
|
||||
|
||||
# Schema version for drawer normalization. Bump when the normalization
|
||||
# pipeline changes in a way that existing drawers should be rebuilt to pick up
|
||||
# (e.g., new noise-stripping rules). `file_already_mined` treats drawers with
|
||||
# a missing or stale `normalize_version` as "not mined", so the next mine pass
|
||||
# silently rebuilds them — users don't need to manually erase + re-mine.
|
||||
#
|
||||
# v2 (2026-04): introduced strip_noise() for Claude Code JSONL; previous
|
||||
# drawers stored system tags / hook chrome verbatim.
|
||||
NORMALIZE_VERSION = 2
|
||||
|
||||
|
||||
def get_collection(
|
||||
palace_path: str,
|
||||
@@ -94,16 +104,26 @@ def mine_lock(source_file: str):
|
||||
def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool:
|
||||
"""Check if a file has already been filed in the palace.
|
||||
|
||||
When check_mtime=True (used by project miner), returns False if the file
|
||||
has been modified since it was last mined, so it gets re-mined.
|
||||
When check_mtime=False (used by convo miner), just checks existence.
|
||||
Returns False (so the file gets re-mined) when:
|
||||
- no drawers exist for this source_file
|
||||
- the stored `normalize_version` is missing or older than the current
|
||||
schema (triggers silent rebuild after a normalization upgrade)
|
||||
- `check_mtime=True` and the file's mtime differs from the stored one
|
||||
|
||||
When check_mtime=True (used by project miner), also re-mines on content
|
||||
change. When check_mtime=False (used by convo miner), transcripts are
|
||||
assumed immutable, so only the version gate triggers a rebuild.
|
||||
"""
|
||||
try:
|
||||
results = collection.get(where={"source_file": source_file}, limit=1)
|
||||
if not results.get("ids"):
|
||||
return False
|
||||
stored_meta = results.get("metadatas", [{}])[0] or {}
|
||||
# Pre-v2 drawers have no version field — treat them as stale.
|
||||
stored_version = stored_meta.get("normalize_version", 1)
|
||||
if stored_version < NORMALIZE_VERSION:
|
||||
return False
|
||||
if check_mtime:
|
||||
stored_meta = results.get("metadatas", [{}])[0]
|
||||
stored_mtime = stored_meta.get("source_mtime")
|
||||
if stored_mtime is None:
|
||||
return False
|
||||
|
||||
@@ -75,3 +75,86 @@ def test_mine_convos_does_not_reprocess_empty_chunk_files(capsys):
|
||||
assert "Files skipped (already filed): 1" in out2
|
||||
finally:
|
||||
shutil.rmtree(tmpdir, ignore_errors=True)
|
||||
|
||||
|
||||
def test_mine_convos_rebuilds_stale_drawers_after_schema_bump(capsys):
|
||||
"""When stored drawers have an older normalize_version, the next mine
|
||||
silently purges them and refiles — no manual erase required.
|
||||
|
||||
This is what makes the strip_noise upgrade apply to existing corpora:
|
||||
users just run `mempalace mine` again and old noise-filled drawers get
|
||||
replaced with clean ones."""
|
||||
from mempalace.palace import NORMALIZE_VERSION
|
||||
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
try:
|
||||
convo_path = Path(tmpdir) / "chat.txt"
|
||||
convo_path.write_text(
|
||||
"> What is memory?\nMemory is persistence.\n\n"
|
||||
"> Why does it matter?\nIt enables continuity.\n\n"
|
||||
"> How do we build it?\nWith structured storage.\n"
|
||||
)
|
||||
palace_path = os.path.join(tmpdir, "palace")
|
||||
|
||||
# First mine — stamps drawers with NORMALIZE_VERSION
|
||||
mine_convos(tmpdir, palace_path, wing="test")
|
||||
capsys.readouterr()
|
||||
|
||||
client = chromadb.PersistentClient(path=palace_path)
|
||||
col = client.get_collection("mempalace_drawers")
|
||||
resolved = str(Path(tmpdir).resolve() / "chat.txt")
|
||||
first_pass = col.get(where={"source_file": resolved})
|
||||
first_ids = set(first_pass["ids"])
|
||||
assert first_ids, "first mine should produce drawers"
|
||||
for meta in first_pass["metadatas"]:
|
||||
assert meta.get("normalize_version") == NORMALIZE_VERSION
|
||||
|
||||
# Simulate pre-v2 drawers: rewrite metadata to an older version,
|
||||
# and replace content with "noise" so we can see it get cleaned up.
|
||||
stale_metas = []
|
||||
for meta in first_pass["metadatas"]:
|
||||
stale = dict(meta)
|
||||
stale["normalize_version"] = 1
|
||||
stale_metas.append(stale)
|
||||
col.update(
|
||||
ids=list(first_pass["ids"]),
|
||||
documents=["STALE NOISE"] * len(first_pass["ids"]),
|
||||
metadatas=stale_metas,
|
||||
)
|
||||
# Add an extra orphan drawer that should also be purged.
|
||||
col.add(
|
||||
ids=["orphan_drawer"],
|
||||
documents=["OLD ORPHAN"],
|
||||
metadatas=[
|
||||
{
|
||||
"wing": "test",
|
||||
"room": "default",
|
||||
"source_file": resolved,
|
||||
"chunk_index": 999,
|
||||
"normalize_version": 1,
|
||||
}
|
||||
],
|
||||
)
|
||||
del col, client
|
||||
|
||||
# Second mine — version gate should trigger rebuild
|
||||
mine_convos(tmpdir, palace_path, wing="test")
|
||||
out = capsys.readouterr().out
|
||||
assert (
|
||||
"Files skipped (already filed): 0" in out
|
||||
), "stale drawers should force a rebuild, not a skip"
|
||||
|
||||
client = chromadb.PersistentClient(path=palace_path)
|
||||
col = client.get_collection("mempalace_drawers")
|
||||
rebuilt = col.get(where={"source_file": resolved})
|
||||
# Orphan is gone
|
||||
assert "orphan_drawer" not in rebuilt["ids"]
|
||||
# No stale content survived
|
||||
assert all("STALE NOISE" not in d for d in rebuilt["documents"])
|
||||
assert all("OLD ORPHAN" not in d for d in rebuilt["documents"])
|
||||
# All rebuilt drawers carry the current version
|
||||
for meta in rebuilt["metadatas"]:
|
||||
assert meta.get("normalize_version") == NORMALIZE_VERSION
|
||||
del col, client
|
||||
finally:
|
||||
shutil.rmtree(tmpdir, ignore_errors=True)
|
||||
|
||||
+90
-4
@@ -7,7 +7,7 @@ import chromadb
|
||||
import yaml
|
||||
|
||||
from mempalace.miner import mine, scan_project, status
|
||||
from mempalace.palace import file_already_mined
|
||||
from mempalace.palace import NORMALIZE_VERSION, file_already_mined
|
||||
|
||||
|
||||
def write_file(path: Path, content: str):
|
||||
@@ -227,11 +227,17 @@ def test_file_already_mined_check_mtime():
|
||||
assert file_already_mined(col, test_file) is False
|
||||
assert file_already_mined(col, test_file, check_mtime=True) is False
|
||||
|
||||
# Add it with mtime
|
||||
# Add it with mtime + current normalize_version
|
||||
col.add(
|
||||
ids=["d1"],
|
||||
documents=["hello world"],
|
||||
metadatas=[{"source_file": test_file, "source_mtime": str(mtime)}],
|
||||
metadatas=[
|
||||
{
|
||||
"source_file": test_file,
|
||||
"source_mtime": str(mtime),
|
||||
"normalize_version": NORMALIZE_VERSION,
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
# Already mined (no mtime check)
|
||||
@@ -253,7 +259,12 @@ def test_file_already_mined_check_mtime():
|
||||
col.add(
|
||||
ids=["d2"],
|
||||
documents=["other"],
|
||||
metadatas=[{"source_file": "/fake/no_mtime.txt"}],
|
||||
metadatas=[
|
||||
{
|
||||
"source_file": "/fake/no_mtime.txt",
|
||||
"normalize_version": NORMALIZE_VERSION,
|
||||
}
|
||||
],
|
||||
)
|
||||
assert file_already_mined(col, "/fake/no_mtime.txt", check_mtime=True) is False
|
||||
finally:
|
||||
@@ -296,3 +307,78 @@ def test_status_missing_palace_does_not_create_empty_collection(tmp_path, capsys
|
||||
out = capsys.readouterr().out
|
||||
assert "No palace found" in out
|
||||
assert not palace_path.exists()
|
||||
|
||||
|
||||
# ── normalize_version schema gate ───────────────────────────────────────
|
||||
#
|
||||
# When the normalization pipeline changes shape (e.g., strip_noise lands),
|
||||
# `NORMALIZE_VERSION` is bumped so pre-existing drawers can be silently
|
||||
# rebuilt on the next mine. These tests pin that contract.
|
||||
|
||||
|
||||
def test_file_already_mined_returns_false_for_stale_normalize_version():
|
||||
"""Pre-v2 drawers (no field, or older integer) must not short-circuit."""
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
try:
|
||||
palace_path = os.path.join(tmpdir, "palace")
|
||||
os.makedirs(palace_path)
|
||||
client = chromadb.PersistentClient(path=palace_path)
|
||||
col = client.get_or_create_collection("mempalace_drawers")
|
||||
|
||||
# Pre-v2 drawer: no normalize_version field at all
|
||||
col.add(
|
||||
ids=["d_old"],
|
||||
documents=["old"],
|
||||
metadatas=[{"source_file": "/fake/old.jsonl"}],
|
||||
)
|
||||
assert file_already_mined(col, "/fake/old.jsonl") is False
|
||||
|
||||
# Explicitly older version
|
||||
col.add(
|
||||
ids=["d_v1"],
|
||||
documents=["v1"],
|
||||
metadatas=[{"source_file": "/fake/v1.jsonl", "normalize_version": 1}],
|
||||
)
|
||||
assert file_already_mined(col, "/fake/v1.jsonl") is False
|
||||
|
||||
# Current version — short-circuits
|
||||
col.add(
|
||||
ids=["d_current"],
|
||||
documents=["cur"],
|
||||
metadatas=[
|
||||
{
|
||||
"source_file": "/fake/current.jsonl",
|
||||
"normalize_version": NORMALIZE_VERSION,
|
||||
}
|
||||
],
|
||||
)
|
||||
assert file_already_mined(col, "/fake/current.jsonl") is True
|
||||
finally:
|
||||
del col, client
|
||||
shutil.rmtree(tmpdir, ignore_errors=True)
|
||||
|
||||
|
||||
def test_add_drawer_stamps_normalize_version(tmp_path):
|
||||
"""Fresh drawers carry the current schema version so future upgrades work."""
|
||||
from mempalace.miner import add_drawer
|
||||
|
||||
palace_path = tmp_path / "palace"
|
||||
palace_path.mkdir()
|
||||
client = chromadb.PersistentClient(path=str(palace_path))
|
||||
col = client.get_or_create_collection("mempalace_drawers")
|
||||
try:
|
||||
added = add_drawer(
|
||||
collection=col,
|
||||
wing="test",
|
||||
room="notes",
|
||||
content="hello",
|
||||
source_file=str(tmp_path / "src.md"),
|
||||
chunk_index=0,
|
||||
agent="unit",
|
||||
)
|
||||
assert added is True
|
||||
stored = col.get(limit=1)
|
||||
meta = stored["metadatas"][0]
|
||||
assert meta["normalize_version"] == NORMALIZE_VERSION
|
||||
finally:
|
||||
del col, client
|
||||
|
||||
@@ -13,6 +13,7 @@ from mempalace.normalize import (
|
||||
_try_normalize_json,
|
||||
_try_slack_json,
|
||||
normalize,
|
||||
strip_noise,
|
||||
)
|
||||
|
||||
|
||||
@@ -1048,3 +1049,148 @@ def test_normalize_rejects_large_file():
|
||||
assert False, "Should have raised IOError"
|
||||
except IOError as e:
|
||||
assert "too large" in str(e).lower()
|
||||
|
||||
|
||||
# ── strip_noise() — verbatim-safety boundary tests ─────────────────────
|
||||
#
|
||||
# The "Verbatim always" design principle requires that we never delete
|
||||
# user-authored text. These tests pin down the boundary between system
|
||||
# noise (which we strip) and user prose that happens to mention the same
|
||||
# strings (which must survive untouched).
|
||||
|
||||
|
||||
class TestStripNoisePreservesUserContent:
|
||||
"""User prose that mentions noise strings inline must be preserved."""
|
||||
|
||||
def test_user_discusses_stop_hook_in_prose(self):
|
||||
# Regression: original regex with IGNORECASE + `.*\n?` ate the second
|
||||
# sentence from real user commentary.
|
||||
text = (
|
||||
"> User:\n"
|
||||
"> Our CI has a stop hook that rejects merges after 5pm. "
|
||||
"Ran 2 stop hooks last week.\n"
|
||||
"> Assistant:\n"
|
||||
"> Got it."
|
||||
)
|
||||
assert strip_noise(text) == text.strip()
|
||||
|
||||
def test_user_mentions_system_reminder_inline(self):
|
||||
# Inline <system-reminder> tags inside user prose (e.g. documenting
|
||||
# Claude Code behavior) must not be stripped.
|
||||
text = (
|
||||
"> User:\n"
|
||||
"> Here is what Claude Code emits: "
|
||||
"<system-reminder>Auto-save reminder...</system-reminder>"
|
||||
" — I want to ignore it."
|
||||
)
|
||||
assert strip_noise(text) == text.strip()
|
||||
|
||||
def test_ctrl_o_hint_in_prose_preserved(self):
|
||||
# Regression: original `.*\(ctrl\+o to expand\).*\n?` nuked the whole
|
||||
# line whenever a user documented the TUI shortcut.
|
||||
text = (
|
||||
"> User:\n"
|
||||
"> In the TUI you hit (ctrl+o to expand) to see more. "
|
||||
"That is the shortcut I want to document."
|
||||
)
|
||||
assert strip_noise(text) == text.strip()
|
||||
|
||||
def test_current_time_inline_in_prose(self):
|
||||
text = "> User:\n> At CURRENT TIME: the meeting starts, not before."
|
||||
assert strip_noise(text) == text.strip()
|
||||
|
||||
def test_plus_n_lines_marker_inline(self):
|
||||
text = "> User:\n> The log showed … +50 lines of stack trace, useful."
|
||||
assert strip_noise(text) == text.strip()
|
||||
|
||||
def test_dangling_open_tag_does_not_span_messages(self):
|
||||
# THE span-eating bug: a stray unclosed <system-reminder> in one
|
||||
# message must NOT merge with a closing tag in another message and
|
||||
# silently delete everything in between.
|
||||
text = (
|
||||
"> User 1: normal content <system-reminder>A\n"
|
||||
"> Assistant: reply\n"
|
||||
"> User 2: more content</system-reminder> tail"
|
||||
)
|
||||
out = strip_noise(text)
|
||||
assert "Assistant: reply" in out
|
||||
assert "User 2: more content" in out
|
||||
assert "User 1: normal content" in out
|
||||
|
||||
|
||||
class TestStripNoiseRemovesSystemChrome:
|
||||
"""System-injected noise with standalone/line-anchored shape must be stripped."""
|
||||
|
||||
def test_strips_line_anchored_system_reminder_block(self):
|
||||
text = (
|
||||
"> User:\n"
|
||||
"<system-reminder>\n"
|
||||
"Auto-save reminder...\n"
|
||||
"</system-reminder>\n"
|
||||
"> Real message."
|
||||
)
|
||||
out = strip_noise(text)
|
||||
assert "system-reminder" not in out
|
||||
assert "Auto-save reminder" not in out
|
||||
assert "Real message." in out
|
||||
|
||||
def test_strips_system_reminder_with_blockquote_prefix(self):
|
||||
# _messages_to_transcript prefixes lines with "> ", so the line
|
||||
# anchor must also accept that shape.
|
||||
text = "> User:\n" "> <system-reminder>Injected noise</system-reminder>\n" "> Real message."
|
||||
out = strip_noise(text)
|
||||
assert "Injected noise" not in out
|
||||
assert "Real message." in out
|
||||
|
||||
def test_strips_standalone_ran_hook_line(self):
|
||||
text = "Ran 2 Stop hook\n> User: real content"
|
||||
out = strip_noise(text)
|
||||
assert "Ran 2 Stop hook" not in out
|
||||
assert "real content" in out
|
||||
|
||||
def test_strips_known_hook_names(self):
|
||||
for hook in ("Stop", "PreCompact", "PreToolUse", "PostToolUse", "UserPromptSubmit"):
|
||||
text = f"Ran 1 {hook} hook\n> User: content"
|
||||
assert hook not in strip_noise(text)
|
||||
|
||||
def test_strips_current_time_standalone(self):
|
||||
text = "CURRENT TIME: 2026-04-13 10:00 UTC\n> User: Hello"
|
||||
out = strip_noise(text)
|
||||
assert "CURRENT TIME" not in out
|
||||
assert "Hello" in out
|
||||
|
||||
def test_strips_collapsed_lines_marker(self):
|
||||
text = "… +42 lines\n> User: Hello"
|
||||
out = strip_noise(text)
|
||||
assert "+42 lines" not in out
|
||||
assert "Hello" in out
|
||||
|
||||
def test_strips_token_count_ctrl_o_chrome(self):
|
||||
# Claude Code's actual collapsed-output chrome: "[N tokens] (ctrl+o to expand)"
|
||||
text = "> Assistant: some output [5 tokens] (ctrl+o to expand)\n> User: ok"
|
||||
out = strip_noise(text)
|
||||
assert "(ctrl+o to expand)" not in out
|
||||
assert "[5 tokens]" not in out
|
||||
assert "some output" in out
|
||||
|
||||
def test_strips_each_known_noise_tag(self):
|
||||
for tag in (
|
||||
"system-reminder",
|
||||
"command-message",
|
||||
"command-name",
|
||||
"task-notification",
|
||||
"user-prompt-submit-hook",
|
||||
"hook_output",
|
||||
):
|
||||
text = f"> User:\n<{tag}>junk</{tag}>\n> Real."
|
||||
out = strip_noise(text)
|
||||
assert tag not in out, f"{tag} leaked into output"
|
||||
assert "Real." in out
|
||||
|
||||
def test_collapses_excessive_blank_lines(self):
|
||||
text = "line one\n\n\n\n\n\nline two"
|
||||
out = strip_noise(text)
|
||||
assert "line one" in out
|
||||
assert "line two" in out
|
||||
# Should collapse to no more than 3 newlines
|
||||
assert "\n\n\n\n" not in out
|
||||
|
||||
Reference in New Issue
Block a user