The hook PID guard used a single global ``~/.mempalace/hook_state/mine.pid`` file, which failed two ways:

1. ``_mine_already_running`` read-then-spawn was a TOCTOU race. Two near-simultaneous Stop hook fires both passed the existence/liveness check before either wrote, so both ended up calling ``_spawn_mine``.
2. ``_spawn_mine`` unconditionally overwrote the global PID file with the new child's PID. The first PID was lost, orphaning the first child.

The user-visible result in #1212 was two concurrent ``mempalace mine`` processes running against the same source, both driving HNSW inserts in parallel: exactly the corruption pattern the guard was meant to prevent. #1206 reported the same shape from the perspective of the user (two mines hung on a 350MB folder).

Replace the global file with per-target slots under ``~/.mempalace/hook_state/mine_pids/``, keyed by sha256 of the mine sub-arguments (everything after ``mine``). The slot is claimed via ``O_CREAT | O_EXCL`` so the claim is atomic: two simultaneous fires can never both pass. Stale slots (PID exists but is dead) are reclaimed transparently. Different targets (e.g. project mine vs transcript ingest, or two different MEMPAL_DIRs) get independent slots and run in parallel.

The mine subprocess receives its slot path via ``MEMPALACE_MINE_PID_FILE`` env var; ``miner._cleanup_mine_pid_file`` reads that var on exit and removes the slot if it points at our PID, so orphaned slots from crashed mines don't accumulate.

Also routes ``_ingest_transcript`` through ``_spawn_mine`` so the transcript ingest path now participates in the same dedup: repeated Stop fires for the same transcript no longer stack parallel mines.

Closes #1212
Closes #1206
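The atomic claim described above can be illustrated in isolation. The following is a minimal sketch, not the code from this commit: ``claim_slot`` and ``race`` are hypothetical helper names, the slot lives in a temporary directory, and threads stand in for near-simultaneous hook fires. It only shows that an ``O_CREAT | O_EXCL`` create lets exactly one caller win.

```python
import os
import tempfile
import threading
from pathlib import Path


def claim_slot(slot: Path) -> bool:
    """Try to create ``slot`` exclusively; True means this caller owns it."""
    try:
        # O_EXCL makes the create fail if the file already exists, so the
        # existence check and the create are a single atomic step.
        fd = os.open(str(slot), os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
    except FileExistsError:
        return False
    os.close(fd)
    return True


def race(slot: Path, workers: int = 8) -> int:
    """Have ``workers`` threads claim the same slot at once; count winners."""
    slot.parent.mkdir(parents=True, exist_ok=True)
    barrier = threading.Barrier(workers)
    results: list[bool] = []
    lock = threading.Lock()

    def fire() -> None:
        barrier.wait()  # line the threads up so the claims overlap
        won = claim_slot(slot)
        with lock:
            results.append(won)

    threads = [threading.Thread(target=fire) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(results)


with tempfile.TemporaryDirectory() as tmp:
    winners = race(Path(tmp) / "mine_pids" / "mine_example.pid")
    print(winners)  # 1
```

A read-then-write guard has no equivalent guarantee, because several callers can all finish the read before any of them writes.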
+143
-31
@@ -6,6 +6,7 @@ Supported hooks: session-start, stop, precompact
 Supported harnesses: claude-code, codex (extensible to cursor, gemini, etc.)
 """
 
+import hashlib
 import json
 import os
 import re
@@ -256,7 +257,45 @@ def _get_mine_targets() -> list[tuple[str, str]]:
     return targets
 
 
-_MINE_PID_FILE = STATE_DIR / "mine.pid"
+# Per-target PID guard.
+#
+# Hook fires ingest mines in the background. If a previous fire's child is
+# still running for the *same* target (same source dir, mode, wing), the new
+# fire should skip rather than pile up — multiple concurrent mines against the
+# same source corrupt the HNSW index and exhaust disk via duplicate upserts
+# (#1212, #1206). But mines targeting *different* sources / modes must remain
+# independent so the user can have e.g. project-mining and transcript-ingest
+# running in parallel.
+#
+# The single ``mine.pid`` global file used previously failed both ways: the
+# guard was rebuilt every spawn (so two near-simultaneous fires both passed
+# the check before either wrote), and the file was unconditionally overwritten
+# (so the second spawn lost the first PID, orphaning it). The replacement is
+# a directory of per-target slots, claimed via ``O_CREAT | O_EXCL`` so the
+# claim is atomic and per-target.
+_MINE_PID_DIR = STATE_DIR / "mine_pids"
+
+# The per-process PID file path is communicated to the mine subprocess via
+# this env var so the child's cleanup hook (in miner.py) can remove its
+# own slot on exit without scanning the whole directory.
+_MINE_PID_FILE_ENV = "MEMPALACE_MINE_PID_FILE"
+
+
+def _pid_file_for_cmd(cmd: list[str]) -> Path:
+    """Return the per-target PID file path for a mine subcommand.
+
+    The key is derived from the mine arguments (everything after ``mine``)
+    so different (dir, mode, wing) combinations get independent slots.
+    Two fires with the same arguments collapse to the same slot — which is
+    exactly the dedup we want.
+    """
+    try:
+        idx = cmd.index("mine")
+        key = " ".join(cmd[idx:])
+    except ValueError:
+        key = " ".join(cmd)
+    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]
+    return _MINE_PID_DIR / f"mine_{digest}.pid"
 
 
 def _pid_alive(pid: int) -> bool:
@@ -292,22 +331,96 @@ def _pid_alive(pid: int) -> bool:
     return False
 
 
-def _mine_already_running() -> bool:
-    """Return True if a background mine process from a previous hook fire is still alive."""
+def _mine_already_running(cmd: list[str]) -> bool:
+    """Return True if a previous mine for ``cmd``'s target is still alive."""
+    pid_file = _pid_file_for_cmd(cmd)
     try:
-        pid = int(_MINE_PID_FILE.read_text().strip())
-    except (OSError, ValueError):
+        recorded = pid_file.read_text().strip()
+    except OSError:
         return False
-    return _pid_alive(pid)
+    if not recorded.isdigit():
+        return False
+    return _pid_alive(int(recorded))
+
+
+def _claim_mine_slot(cmd: list[str]) -> Path | None:
+    """Atomically reserve the per-target PID slot for ``cmd``.
+
+    Returns the slot path on success, or ``None`` if the target is
+    already being mined by a live process. The reservation is done via
+    ``O_CREAT | O_EXCL`` so two simultaneous hook fires can never both
+    pass the check; one wins, the other returns None.
+
+    A stale slot (file exists but the recorded PID is dead) is reclaimed
+    transparently — orphan miners that crashed without cleanup do not
+    block future hook fires forever.
+    """
+    pid_file = _pid_file_for_cmd(cmd)
+    pid_file.parent.mkdir(parents=True, exist_ok=True)
+    try:
+        fd = os.open(str(pid_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
+        os.close(fd)
+        return pid_file
+    except FileExistsError:
+        pass
+    # Slot exists. If the holder is alive, defer.
+    if _mine_already_running(cmd):
+        return None
+    # Stale entry; reclaim. The unlink+create is racy against another hook
+    # firing right now, but the second create's O_EXCL will fail and that
+    # caller will see the live PID via the next round.
+    try:
+        pid_file.unlink()
+    except FileNotFoundError:
+        pass
+    except OSError:
+        return None
+    try:
+        fd = os.open(str(pid_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
+        os.close(fd)
+        return pid_file
+    except FileExistsError:
+        return None
 
 
 def _spawn_mine(cmd: list) -> None:
-    """Spawn a mine subprocess, write its PID to the lock file, log to hook.log."""
+    """Spawn a mine subprocess if no live mine is already targeting it.
+
+    The PID slot is claimed atomically *before* the spawn, so two near-
+    simultaneous hook fires can't both proceed — the second sees the
+    claimed slot and silently skips. The spawned process inherits a
+    ``MEMPALACE_MINE_PID_FILE`` env var so its cleanup hook can remove
+    the slot on exit without scanning the directory.
+    """
     STATE_DIR.mkdir(parents=True, exist_ok=True)
     log_path = STATE_DIR / "hook.log"
+    pid_file = _claim_mine_slot(cmd)
+    if pid_file is None:
+        _log(f"Skipping mine: target already running ({' '.join(cmd[-3:])})")
+        return
+    child_env = os.environ.copy()
+    child_env[_MINE_PID_FILE_ENV] = str(pid_file)
     with open(log_path, "a") as log_f:
-        proc = subprocess.Popen(cmd, stdout=log_f, stderr=log_f, **_detached_popen_kwargs())
-    _MINE_PID_FILE.write_text(str(proc.pid))
+        try:
+            proc = subprocess.Popen(
+                cmd,
+                stdout=log_f,
+                stderr=log_f,
+                env=child_env,
+                **_detached_popen_kwargs(),
+            )
+        except OSError:
+            # Spawn failed; release the slot we just claimed so the next
+            # hook fire can try again rather than skipping forever.
+            try:
+                pid_file.unlink()
+            except OSError:
+                pass
+            raise
+    try:
+        pid_file.write_text(str(proc.pid))
+    except OSError:
+        pass
 
 
 def _maybe_auto_ingest():
@@ -317,13 +430,15 @@ def _maybe_auto_ingest():
     in the hook handlers — this function does not handle them, to avoid
     asymmetric interpreter handling and PID-file overwrite when both
     targets fire from a single hook call (#1231 review).
+
+    Per-target dedup is done by ``_spawn_mine`` itself: each (dir, mode)
+    target gets its own PID slot, so distinct targets never block each
+    other but a re-fire of the same target while the previous one is
+    still running is silently skipped.
     """
     targets = _get_mine_targets()
     if not targets:
         return
-    if _mine_already_running():
-        _log("Skipping auto-ingest: mine already running")
-        return
     for mine_dir, mode in targets:
         try:
             _spawn_mine([_mempalace_python(), "-m", "mempalace", "mine", mine_dir, "--mode", mode])
@@ -518,25 +633,22 @@ def _ingest_transcript(transcript_path: str):
         return
 
     try:
-        log_path = STATE_DIR / "hook.log"
-        STATE_DIR.mkdir(parents=True, exist_ok=True)
-        with open(log_path, "a") as log_f:
-            subprocess.Popen(
-                [
-                    _mempalace_python(),
-                    "-m",
-                    "mempalace",
-                    "mine",
-                    str(path.parent),
-                    "--mode",
-                    "convos",
-                    "--wing",
-                    "sessions",
-                ],
-                stdout=log_f,
-                stderr=log_f,
-                **_detached_popen_kwargs(),
-            )
+        # Route through ``_spawn_mine`` so the per-target PID guard kicks
+        # in here too — repeated Stop/PreCompact fires for the same
+        # transcript should not stack up parallel ingest mines.
+        _spawn_mine(
+            [
+                _mempalace_python(),
+                "-m",
+                "mempalace",
+                "mine",
+                str(path.parent),
+                "--mode",
+                "convos",
+                "--wing",
+                "sessions",
+            ]
+        )
         _log(f"Transcript ingest started: {path.name}")
     except OSError:
         pass
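To make the per-target keying in this file easier to follow, here is a small sketch under stated assumptions. ``slot_name`` is a hypothetical stand-in that mirrors the hashing in ``_pid_file_for_cmd`` but returns only the file name; the interpreter paths and directories are made-up placeholders. It shows that the interpreter prefix does not affect the slot, while a different source directory does.

```python
import hashlib


def slot_name(cmd: list[str]) -> str:
    """Hypothetical stand-in mirroring the keying in ``_pid_file_for_cmd``."""
    try:
        # Key on the argument list starting at "mine", ignoring whatever
        # interpreter and "-m mempalace" prefix came before it.
        key = " ".join(cmd[cmd.index("mine"):])
    except ValueError:
        key = " ".join(cmd)
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]
    return f"mine_{digest}.pid"


# Placeholder commands; paths are illustrative only.
first = ["python", "-m", "mempalace", "mine", "/tmp/a", "--mode", "convos"]
same_target = ["/venv/bin/python", "-m", "mempalace", "mine", "/tmp/a", "--mode", "convos"]
other_target = ["python", "-m", "mempalace", "mine", "/tmp/b", "--mode", "convos"]

print(slot_name(first) == slot_name(same_target))   # True
print(slot_name(first) == slot_name(other_target))  # False
```

Truncating the digest to 16 hex characters keeps file names short; whether that width is the best trade-off is a design choice this sketch does not evaluate.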
+16
-17
@@ -1206,30 +1206,29 @@ def _mine_impl(
 
 
 def _cleanup_mine_pid_file() -> None:
-    """Remove the global mine PID file if it currently points at us.
+    """Remove this process's per-target PID slot on exit.
 
-    The PID file (``~/.mempalace/hook_state/mine.pid``, written by the
-    hook in :func:`mempalace.hooks_cli._spawn_mine`) tracks the PID of
-    the most recently spawned mine subprocess so the hook can dedup
-    concurrent auto-ingest fires. When that subprocess exits — cleanly,
-    on error, or via Ctrl-C — it should remove its own entry so the
-    next hook fire isn't briefly fooled by a stale PID before
-    ``_pid_alive`` returns False.
+    Hook-spawned mines receive ``MEMPALACE_MINE_PID_FILE`` in their env
+    pointing at the slot the hook claimed for them
+    (``~/.mempalace/hook_state/mine_pids/mine_<sha>.pid``). When the
+    subprocess exits — cleanly, on error, or via Ctrl-C — it removes its
+    own slot so the next hook fire isn't briefly fooled by a stale PID
+    before ``_pid_alive`` returns False.
 
-    We only delete the file if it claims our own PID; any other PID is
-    left alone (could be an unrelated mine running concurrently from
-    a different worktree / session).
+    Only delete the slot if it claims our own PID; any other PID is left
+    alone (it could belong to an unrelated mine that just claimed the
+    same slot via a stale-reclaim race).
     """
-    try:
-        from .hooks_cli import _MINE_PID_FILE
-    except Exception:
+    pid_file_env = os.environ.get("MEMPALACE_MINE_PID_FILE", "")
+    if not pid_file_env:
         return
     try:
-        if not _MINE_PID_FILE.exists():
+        pid_file = Path(pid_file_env)
+        if not pid_file.exists():
             return
-        recorded = _MINE_PID_FILE.read_text().strip()
+        recorded = pid_file.read_text().strip()
         if recorded and recorded.isdigit() and int(recorded) == os.getpid():
-            _MINE_PID_FILE.unlink()
+            pid_file.unlink()
     except OSError:
         # Best-effort cleanup; never fail the mine over PID bookkeeping.
         pass
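The ownership check in the cleanup path can be exercised on its own. The sketch below is an illustration rather than the project's code: ``cleanup_slot`` is a hypothetical helper that takes the environment as a parameter so it can be tested without mutating ``os.environ``, and the slot file lives in a temporary directory. It shows that a slot recording our own PID is removed, while a slot recording any other PID is left in place.

```python
import os
import tempfile
from collections.abc import Mapping
from pathlib import Path

ENV_VAR = "MEMPALACE_MINE_PID_FILE"


def cleanup_slot(environ: Mapping[str, str] = os.environ) -> bool:
    """Remove the slot named in ``ENV_VAR`` only if it records our PID."""
    slot_path = environ.get(ENV_VAR, "")
    if not slot_path:
        return False  # not spawned by the hook; nothing to clean up
    slot = Path(slot_path)
    try:
        recorded = slot.read_text().strip()
        if recorded.isdigit() and int(recorded) == os.getpid():
            slot.unlink()
            return True
    except OSError:
        pass  # best-effort: bookkeeping must never fail the caller
    return False


with tempfile.TemporaryDirectory() as tmp:
    slot = Path(tmp) / "mine_example.pid"

    slot.write_text(str(os.getpid()))
    removed_own = cleanup_slot({ENV_VAR: str(slot)})
    own_slot_gone = not slot.exists()

    slot.write_text(str(os.getpid() + 1))  # a PID that is not ours
    removed_other = cleanup_slot({ENV_VAR: str(slot)})
    other_slot_kept = slot.exists()

print(removed_own, own_slot_gone)        # True True
print(removed_other, other_slot_kept)    # False True
```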