abd52534bb
- Add tests for config, convo_miner, spellcheck, knowledge_graph - Fix Windows PermissionError in test cleanup (chromadb file locks) - Add UTF-8 encoding to split_mega_files, entity_registry, hooks_cli - Fix mcp_server parse_known_args logging for unknown args - Set coverage threshold to 85 in pyproject.toml and CI - Reset all version files to 3.0.11 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
310 lines
10 KiB
Python
310 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
split_mega_files.py — Split concatenated transcript files into per-session files
|
|
=================================================================================
|
|
|
|
Scans a directory for .txt files that contain multiple Claude Code sessions
|
|
(identified by "Claude Code v" headers). Splits each into individual files
|
|
named with: date, time, people detected, and subject from first prompt.
|
|
|
|
Distinguishes true session starts from mid-session context restores
|
|
(which show "Ctrl+E to show X previous messages").
|
|
|
|
Output files are written to --output-dir (default: same dir as source).
|
|
Original files are renamed with .mega_backup extension (not deleted).
|
|
|
|
Usage:
|
|
python3 split_mega_files.py # scan ~/Desktop/transcripts
|
|
python3 split_mega_files.py --source ~/Desktop/transcripts # explicit source
|
|
python3 split_mega_files.py --dry-run # show what would happen
|
|
python3 split_mega_files.py --min-sessions 2 # only files with 2+ sessions
|
|
|
|
By: Ben, 2026-03-30
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
|
|
HOME = Path.home()
|
|
LUMI_DIR = Path(os.environ.get("MEMPALACE_SOURCE_DIR", str(HOME / "Desktop/transcripts")))
|
|
|
|
# People we know about (for name detection in content)
|
|
# Loaded from ~/.mempalace/known_names.json if it exists, otherwise generic fallback.
|
|
_KNOWN_NAMES_PATH = HOME / ".mempalace" / "known_names.json"
|
|
_FALLBACK_KNOWN_PEOPLE = ["Alice", "Ben", "Riley", "Max", "Sam", "Devon", "Jordan"]
|
|
_KNOWN_NAMES_CACHE = None
|
|
|
|
|
|
def _load_known_names_config(force_reload: bool = False):
|
|
"""Load and cache the optional known-names config file."""
|
|
global _KNOWN_NAMES_CACHE
|
|
|
|
if force_reload:
|
|
_KNOWN_NAMES_CACHE = None
|
|
|
|
if _KNOWN_NAMES_CACHE is not None:
|
|
return _KNOWN_NAMES_CACHE
|
|
|
|
if _KNOWN_NAMES_PATH.exists():
|
|
try:
|
|
_KNOWN_NAMES_CACHE = json.loads(_KNOWN_NAMES_PATH.read_text())
|
|
return _KNOWN_NAMES_CACHE
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
|
|
_KNOWN_NAMES_CACHE = None
|
|
return None
|
|
|
|
|
|
def _load_known_people() -> list:
|
|
"""Load known names from config file, falling back to a generic list."""
|
|
data = _load_known_names_config()
|
|
if isinstance(data, list):
|
|
return data
|
|
if isinstance(data, dict):
|
|
return data.get("names", [])
|
|
return list(_FALLBACK_KNOWN_PEOPLE)
|
|
|
|
|
|
KNOWN_PEOPLE = _load_known_people()
|
|
|
|
|
|
def _load_username_map() -> dict:
|
|
"""Load username-to-name mapping from config file."""
|
|
data = _load_known_names_config()
|
|
if isinstance(data, dict):
|
|
return data.get("username_map", {})
|
|
return {}
|
|
|
|
|
|
def is_true_session_start(lines, idx):
|
|
"""
|
|
True session start: 'Claude Code v' header NOT followed by 'Ctrl+E'/'previous messages'
|
|
within the next 6 lines (those are context restores, not new sessions).
|
|
"""
|
|
nearby = "".join(lines[idx : idx + 6])
|
|
return "Ctrl+E" not in nearby and "previous messages" not in nearby
|
|
|
|
|
|
def find_session_boundaries(lines):
|
|
"""Return list of line indices where true new sessions begin."""
|
|
boundaries = []
|
|
for i, line in enumerate(lines):
|
|
if "Claude Code v" in line and is_true_session_start(lines, i):
|
|
boundaries.append(i)
|
|
return boundaries
|
|
|
|
|
|
def extract_timestamp(lines):
|
|
"""
|
|
Find the first timestamp line: ⏺ H:MM AM/PM Weekday, Month DD, YYYY
|
|
Returns (datetime_str, iso_str) or (None, None).
|
|
"""
|
|
ts_pattern = re.compile(r"⏺\s+(\d{1,2}:\d{2}\s+[AP]M)\s+\w+,\s+(\w+)\s+(\d{1,2}),\s+(\d{4})")
|
|
months = {
|
|
"January": "01",
|
|
"February": "02",
|
|
"March": "03",
|
|
"April": "04",
|
|
"May": "05",
|
|
"June": "06",
|
|
"July": "07",
|
|
"August": "08",
|
|
"September": "09",
|
|
"October": "10",
|
|
"November": "11",
|
|
"December": "12",
|
|
}
|
|
for line in lines[:50]:
|
|
m = ts_pattern.search(line)
|
|
if m:
|
|
time_str, month, day, year = m.groups()
|
|
mon = months.get(month, "00")
|
|
day_z = day.zfill(2)
|
|
time_safe = time_str.replace(":", "").replace(" ", "")
|
|
iso = f"{year}-{mon}-{day_z}"
|
|
human = f"{year}-{mon}-{day_z}_{time_safe}"
|
|
return human, iso
|
|
return None, None
|
|
|
|
|
|
def extract_people(lines):
|
|
"""
|
|
Detect people mentioned as speakers or by name in first 100 lines.
|
|
Returns sorted list of detected names.
|
|
"""
|
|
found = set()
|
|
text = "".join(lines[:100])
|
|
|
|
# Speaker tags: "Alice:", "Ben:", etc.
|
|
for person in KNOWN_PEOPLE:
|
|
if re.search(rf"\b{person}\b", text, re.IGNORECASE):
|
|
found.add(person)
|
|
|
|
# Working directory username hint — map to known people if configured
|
|
dir_match = re.search(r"/Users/(\w+)/", text)
|
|
if dir_match:
|
|
username = dir_match.group(1)
|
|
# User can map usernames to names in ~/.mempalace/known_names.json
|
|
# under a "username_map" key, e.g. {"username_map": {"jdoe": "John"}}
|
|
username_map = _load_username_map()
|
|
if username in username_map:
|
|
found.add(username_map[username])
|
|
|
|
return sorted(found)
|
|
|
|
|
|
def extract_subject(lines):
|
|
"""
|
|
Find the first meaningful user prompt (> line that isn't a shell command).
|
|
Returns cleaned, filename-safe subject string.
|
|
"""
|
|
skip_patterns = re.compile(
|
|
r"^(\.\/|cd |ls |python|bash|git |cat |source |export |claude|./activate)"
|
|
)
|
|
for line in lines:
|
|
if line.startswith("> "):
|
|
prompt = line[2:].strip()
|
|
if prompt and not skip_patterns.match(prompt) and len(prompt) > 5:
|
|
# Clean for filename
|
|
subject = re.sub(r"[^\w\s-]", "", prompt)
|
|
subject = re.sub(r"\s+", "-", subject.strip())
|
|
return subject[:60]
|
|
return "session"
|
|
|
|
|
|
def split_file(filepath, output_dir, dry_run=False):
|
|
"""
|
|
Split a single mega-file into per-session files.
|
|
Returns list of output paths written (or would be written if dry_run).
|
|
"""
|
|
path = Path(filepath)
|
|
lines = path.read_text(errors="replace").splitlines(keepends=True)
|
|
|
|
boundaries = find_session_boundaries(lines)
|
|
if len(boundaries) < 2:
|
|
return [] # Not a mega-file
|
|
|
|
# Add sentinel at end
|
|
boundaries.append(len(lines))
|
|
|
|
out_dir = Path(output_dir) if output_dir else path.parent
|
|
written = []
|
|
|
|
for i, (start, end) in enumerate(zip(boundaries, boundaries[1:])):
|
|
chunk = lines[start:end]
|
|
if len(chunk) < 10:
|
|
continue # Skip tiny fragments
|
|
|
|
ts_human, ts_iso = extract_timestamp(chunk)
|
|
people = extract_people(chunk)
|
|
subject = extract_subject(chunk)
|
|
|
|
# Build filename: SOURCESTEM__DATE_TIME_People_subject.txt
|
|
# Source stem prefix prevents collisions when multiple mega-files
|
|
# produce sessions with the same timestamp/people/subject.
|
|
ts_part = ts_human or f"part{i + 1:02d}"
|
|
people_part = "-".join(people[:3]) if people else "unknown"
|
|
src_stem = re.sub(r"[^\w-]", "_", path.stem)[:40]
|
|
name = f"{src_stem}__{ts_part}_{people_part}_{subject}.txt"
|
|
# Sanitize
|
|
name = re.sub(r"[^\w\.\-]", "_", name)
|
|
name = re.sub(r"_+", "_", name)
|
|
|
|
out_path = out_dir / name
|
|
|
|
if dry_run:
|
|
print(f" [{i + 1}/{len(boundaries) - 1}] {name} ({len(chunk)} lines)")
|
|
else:
|
|
out_path.write_text("".join(chunk), encoding="utf-8")
|
|
print(f" ✓ {name} ({len(chunk)} lines)")
|
|
|
|
written.append(out_path)
|
|
|
|
return written
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Split concatenated transcript mega-files into per-session files"
|
|
)
|
|
parser.add_argument(
|
|
"--source",
|
|
type=str,
|
|
default=None,
|
|
help="Source directory (default: MEMPALACE_SOURCE_DIR or ~/Desktop/transcripts)",
|
|
)
|
|
parser.add_argument(
|
|
"--output-dir", type=str, default=None, help="Output directory (default: same as source)"
|
|
)
|
|
parser.add_argument(
|
|
"--min-sessions",
|
|
type=int,
|
|
default=2,
|
|
help="Only split files with at least N sessions (default: 2)",
|
|
)
|
|
parser.add_argument(
|
|
"--dry-run", action="store_true", help="Show what would happen without writing files"
|
|
)
|
|
parser.add_argument(
|
|
"--file",
|
|
type=str,
|
|
default=None,
|
|
help="Split a single specific file instead of scanning dir",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
src_dir = Path(args.source) if args.source else LUMI_DIR
|
|
output_dir = args.output_dir or None # None = same dir as file
|
|
|
|
if args.file:
|
|
files = [Path(args.file)]
|
|
else:
|
|
files = sorted(src_dir.glob("*.txt"))
|
|
|
|
mega_files = []
|
|
for f in files:
|
|
lines = f.read_text(errors="replace").splitlines(keepends=True)
|
|
boundaries = find_session_boundaries(lines)
|
|
if len(boundaries) >= args.min_sessions:
|
|
mega_files.append((f, len(boundaries)))
|
|
|
|
if not mega_files:
|
|
print(f"No mega-files found in {src_dir} (min {args.min_sessions} sessions).")
|
|
return
|
|
|
|
print(f"\n{'=' * 60}")
|
|
print(f" Mega-file splitter — {'DRY RUN' if args.dry_run else 'SPLITTING'}")
|
|
print(f"{'=' * 60}")
|
|
print(f" Source: {src_dir}")
|
|
print(f" Output: {output_dir or 'same dir as source'}")
|
|
print(f" Mega-files: {len(mega_files)}")
|
|
print(f"{'─' * 60}\n")
|
|
|
|
total_written = 0
|
|
for f, n_sessions in mega_files:
|
|
print(f" {f.name} ({n_sessions} sessions, {f.stat().st_size // 1024}KB)")
|
|
written = split_file(f, output_dir, dry_run=args.dry_run)
|
|
total_written += len(written)
|
|
|
|
if not args.dry_run and written:
|
|
backup = f.with_suffix(".mega_backup")
|
|
f.rename(backup)
|
|
print(f" → Original renamed to {backup.name}\n")
|
|
else:
|
|
print()
|
|
|
|
print(f"{'─' * 60}")
|
|
if args.dry_run:
|
|
print(f" DRY RUN — would create {total_written} files from {len(mega_files)} mega-files")
|
|
else:
|
|
print(f" Done — created {total_written} files from {len(mega_files)} mega-files")
|
|
print()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|