Merge pull request #1413 from MemPalace/fix/1264-mine-lock-holder-diagnostics
fix(mine): identify lock holder + exit non-zero on contention (#1264)
This commit is contained in:
+33
-23
@@ -500,31 +500,41 @@ def cmd_mine(args):
|
|||||||
llm_provider=None,
|
llm_provider=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
if args.mode == "convos":
|
from .palace import MineAlreadyRunning
|
||||||
from .convo_miner import mine_convos
|
|
||||||
|
|
||||||
mine_convos(
|
try:
|
||||||
convo_dir=args.dir,
|
if args.mode == "convos":
|
||||||
palace_path=palace_path,
|
from .convo_miner import mine_convos
|
||||||
wing=args.wing,
|
|
||||||
agent=args.agent,
|
|
||||||
limit=args.limit,
|
|
||||||
dry_run=args.dry_run,
|
|
||||||
extract_mode=args.extract,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
from .miner import mine
|
|
||||||
|
|
||||||
mine(
|
mine_convos(
|
||||||
project_dir=args.dir,
|
convo_dir=args.dir,
|
||||||
palace_path=palace_path,
|
palace_path=palace_path,
|
||||||
wing_override=args.wing,
|
wing=args.wing,
|
||||||
agent=args.agent,
|
agent=args.agent,
|
||||||
limit=args.limit,
|
limit=args.limit,
|
||||||
dry_run=args.dry_run,
|
dry_run=args.dry_run,
|
||||||
respect_gitignore=not args.no_gitignore,
|
extract_mode=args.extract,
|
||||||
include_ignored=include_ignored,
|
)
|
||||||
)
|
else:
|
||||||
|
from .miner import mine
|
||||||
|
|
||||||
|
mine(
|
||||||
|
project_dir=args.dir,
|
||||||
|
palace_path=palace_path,
|
||||||
|
wing_override=args.wing,
|
||||||
|
agent=args.agent,
|
||||||
|
limit=args.limit,
|
||||||
|
dry_run=args.dry_run,
|
||||||
|
respect_gitignore=not args.no_gitignore,
|
||||||
|
include_ignored=include_ignored,
|
||||||
|
)
|
||||||
|
except MineAlreadyRunning as exc:
|
||||||
|
# A live MCP server or another mine is already writing to this
|
||||||
|
# palace. Surface the holder identity so the operator knows what
|
||||||
|
# to wait for (or stop), and exit non-zero so wrappers like
|
||||||
|
# nohup / scripts can detect the contention.
|
||||||
|
print(f"mempalace: {exc}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
def cmd_sweep(args):
|
def cmd_sweep(args):
|
||||||
|
|||||||
+14
-20
@@ -21,7 +21,6 @@ from typing import Optional
|
|||||||
from .palace import (
|
from .palace import (
|
||||||
NORMALIZE_VERSION,
|
NORMALIZE_VERSION,
|
||||||
SKIP_DIRS,
|
SKIP_DIRS,
|
||||||
MineAlreadyRunning,
|
|
||||||
build_closet_lines,
|
build_closet_lines,
|
||||||
file_already_mined,
|
file_already_mined,
|
||||||
get_closets_collection,
|
get_closets_collection,
|
||||||
@@ -1035,26 +1034,21 @@ def mine(
|
|||||||
files=files,
|
files=files,
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
# MineAlreadyRunning propagates so the CLI can render a clear holder-aware
|
||||||
with mine_palace_lock(palace_path):
|
# message and exit non-zero. In-process callers (tests, library users) that
|
||||||
return _mine_impl(
|
# expect to coexist with another writer should handle the exception.
|
||||||
project_dir,
|
with mine_palace_lock(palace_path):
|
||||||
palace_path,
|
return _mine_impl(
|
||||||
wing_override=wing_override,
|
project_dir,
|
||||||
agent=agent,
|
palace_path,
|
||||||
limit=limit,
|
wing_override=wing_override,
|
||||||
dry_run=dry_run,
|
agent=agent,
|
||||||
respect_gitignore=respect_gitignore,
|
limit=limit,
|
||||||
include_ignored=include_ignored,
|
dry_run=dry_run,
|
||||||
files=files,
|
respect_gitignore=respect_gitignore,
|
||||||
)
|
include_ignored=include_ignored,
|
||||||
except MineAlreadyRunning:
|
files=files,
|
||||||
print(
|
|
||||||
f"mempalace: another `mine` is already running against "
|
|
||||||
f"{palace_path} — exiting cleanly.",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
)
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
def _mine_impl(
|
def _mine_impl(
|
||||||
|
|||||||
+77
-3
@@ -9,6 +9,7 @@ import hashlib
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import sys
|
||||||
import threading
|
import threading
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
@@ -364,6 +365,53 @@ def _mark_released(lock_key: str) -> None:
|
|||||||
_holder_state().discard(lock_key)
|
_holder_state().discard(lock_key)
|
||||||
|
|
||||||
|
|
||||||
|
def _format_lock_holder(content: str) -> str:
|
||||||
|
"""Render a lock-file body as 'PID N (cmdline)' for diagnostic messages."""
|
||||||
|
parts = content.split(maxsplit=1)
|
||||||
|
if not parts or not parts[0].isdigit():
|
||||||
|
return "another writer (identity not recorded)"
|
||||||
|
pid = parts[0]
|
||||||
|
if len(parts) > 1 and parts[1].strip():
|
||||||
|
return f"PID {pid} ({parts[1].strip()})"
|
||||||
|
return f"PID {pid}"
|
||||||
|
|
||||||
|
|
||||||
|
# Byte 0 of the lock file is reserved as the OS lock sentinel.
|
||||||
|
# Holder identity is written from byte 1 onward so contenders can read
|
||||||
|
# the identity without colliding with byte 0 (Windows msvcrt.locking
|
||||||
|
# blocks both reads and writes on the locked byte).
|
||||||
|
_LOCK_SENTINEL_BYTES = 1
|
||||||
|
|
||||||
|
|
||||||
|
def _read_lock_holder(lock_file) -> str:
|
||||||
|
"""Read the prior holder's identity from the lock-file body, best-effort."""
|
||||||
|
try:
|
||||||
|
lock_file.seek(_LOCK_SENTINEL_BYTES)
|
||||||
|
content = lock_file.read().strip()
|
||||||
|
except OSError:
|
||||||
|
return "another writer (identity not recorded)"
|
||||||
|
if not content:
|
||||||
|
return "another writer (identity not recorded)"
|
||||||
|
return _format_lock_holder(content)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_lock_holder(lock_file) -> None:
|
||||||
|
"""Record this process's identity in the lock-file body. Best-effort.
|
||||||
|
|
||||||
|
Writes from byte 1 onward; byte 0 is the lock sentinel and must not
|
||||||
|
be touched after acquire (truncating it on Windows can interact
|
||||||
|
badly with the active byte-range lock).
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
ident = f"{os.getpid()} {' '.join(sys.argv[:3])}".strip()
|
||||||
|
lock_file.seek(_LOCK_SENTINEL_BYTES)
|
||||||
|
lock_file.truncate(_LOCK_SENTINEL_BYTES + len(ident.encode("utf-8")))
|
||||||
|
lock_file.write(ident)
|
||||||
|
lock_file.flush()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def mine_palace_lock(palace_path: str):
|
def mine_palace_lock(palace_path: str):
|
||||||
"""Per-palace non-blocking lock around the full `mine` pipeline.
|
"""Per-palace non-blocking lock around the full `mine` pipeline.
|
||||||
@@ -407,9 +455,27 @@ def mine_palace_lock(palace_path: str):
|
|||||||
yield
|
yield
|
||||||
return
|
return
|
||||||
|
|
||||||
lf = open(lock_path, "w")
|
# Ensure the file exists, then open r+ so we can both read the prior
|
||||||
|
# holder's identity (for failure diagnostics) and write our own. "w"
|
||||||
|
# truncates and erases the prior holder. "a+" puts the position at EOF,
|
||||||
|
# which on Windows breaks ``msvcrt.locking`` (it locks 1 byte at the
|
||||||
|
# *current* position, so two contenders end up locking different bytes
|
||||||
|
# and silently both acquire — observed as Windows-CI lock test
|
||||||
|
# failures during #1264 development).
|
||||||
|
if not os.path.exists(lock_path):
|
||||||
|
# Touch atomically: O_CREAT|O_EXCL would fail if a concurrent
|
||||||
|
# contender just created it, which is fine — we proceed to open.
|
||||||
|
try:
|
||||||
|
fd = os.open(lock_path, os.O_CREAT | os.O_WRONLY, 0o600)
|
||||||
|
os.close(fd)
|
||||||
|
except FileExistsError:
|
||||||
|
pass
|
||||||
|
lf = open(lock_path, "r+")
|
||||||
acquired = False
|
acquired = False
|
||||||
try:
|
try:
|
||||||
|
# Lock byte 0 explicitly. msvcrt.locking is byte-position dependent;
|
||||||
|
# fcntl.flock is whole-file but the seek is harmless there.
|
||||||
|
lf.seek(0)
|
||||||
if os.name == "nt":
|
if os.name == "nt":
|
||||||
import msvcrt
|
import msvcrt
|
||||||
|
|
||||||
@@ -417,8 +483,10 @@ def mine_palace_lock(palace_path: str):
|
|||||||
msvcrt.locking(lf.fileno(), msvcrt.LK_NBLCK, 1)
|
msvcrt.locking(lf.fileno(), msvcrt.LK_NBLCK, 1)
|
||||||
acquired = True
|
acquired = True
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
|
holder = _read_lock_holder(lf)
|
||||||
raise MineAlreadyRunning(
|
raise MineAlreadyRunning(
|
||||||
f"another `mempalace mine` is already running against {resolved}"
|
f"palace {resolved} is held by {holder}; "
|
||||||
|
"wait for it to finish or stop the holder before retrying"
|
||||||
) from exc
|
) from exc
|
||||||
else:
|
else:
|
||||||
import fcntl
|
import fcntl
|
||||||
@@ -427,9 +495,13 @@ def mine_palace_lock(palace_path: str):
|
|||||||
fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||||
acquired = True
|
acquired = True
|
||||||
except BlockingIOError as exc:
|
except BlockingIOError as exc:
|
||||||
|
holder = _read_lock_holder(lf)
|
||||||
raise MineAlreadyRunning(
|
raise MineAlreadyRunning(
|
||||||
f"another `mempalace mine` is already running against {resolved}"
|
f"palace {resolved} is held by {holder}; "
|
||||||
|
"wait for it to finish or stop the holder before retrying"
|
||||||
) from exc
|
) from exc
|
||||||
|
# Record our own identity for any later contender's diagnostic message.
|
||||||
|
_write_lock_holder(lf)
|
||||||
_mark_held(palace_key)
|
_mark_held(palace_key)
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
@@ -441,6 +513,8 @@ def mine_palace_lock(palace_path: str):
|
|||||||
if os.name == "nt":
|
if os.name == "nt":
|
||||||
import msvcrt
|
import msvcrt
|
||||||
|
|
||||||
|
# Match the lock region: byte 0.
|
||||||
|
lf.seek(0)
|
||||||
msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1)
|
msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1)
|
||||||
else:
|
else:
|
||||||
import fcntl
|
import fcntl
|
||||||
|
|||||||
@@ -555,6 +555,45 @@ def test_cmd_mine_include_ignored_comma_split(mock_config_cls):
|
|||||||
assert call_kwargs["include_ignored"] == ["a.txt", "b.txt", "c.txt"]
|
assert call_kwargs["include_ignored"] == ["a.txt", "b.txt", "c.txt"]
|
||||||
|
|
||||||
|
|
||||||
|
@patch("mempalace.cli.MempalaceConfig")
|
||||||
|
def test_cmd_mine_exits_nonzero_on_lock_holder(mock_config_cls, capsys):
|
||||||
|
"""Regression #1264: lock contention must exit non-zero with a clear message.
|
||||||
|
|
||||||
|
Before this fix the CLI silently returned 0 when another writer held
|
||||||
|
the palace lock — operators using nohup/scripts had no way to detect
|
||||||
|
the contention. The new behavior raises MineAlreadyRunning out of
|
||||||
|
miner.mine() and cmd_mine catches it, printing the holder identity
|
||||||
|
to stderr and exiting non-zero.
|
||||||
|
"""
|
||||||
|
from mempalace.palace import MineAlreadyRunning
|
||||||
|
|
||||||
|
mock_config_cls.return_value.palace_path = "/fake/palace"
|
||||||
|
args = argparse.Namespace(
|
||||||
|
dir="/src",
|
||||||
|
palace=None,
|
||||||
|
mode="projects",
|
||||||
|
wing=None,
|
||||||
|
agent="mempalace",
|
||||||
|
limit=0,
|
||||||
|
dry_run=False,
|
||||||
|
no_gitignore=False,
|
||||||
|
include_ignored=[],
|
||||||
|
extract="exchange",
|
||||||
|
)
|
||||||
|
with patch(
|
||||||
|
"mempalace.miner.mine",
|
||||||
|
side_effect=MineAlreadyRunning(
|
||||||
|
"palace /fake/palace is held by PID 12345 (mempalace mcp_server); wait for it to finish"
|
||||||
|
),
|
||||||
|
):
|
||||||
|
with pytest.raises(SystemExit) as excinfo:
|
||||||
|
cmd_mine(args)
|
||||||
|
assert excinfo.value.code == 1
|
||||||
|
captured = capsys.readouterr()
|
||||||
|
assert "PID 12345" in captured.err
|
||||||
|
assert "mcp_server" in captured.err
|
||||||
|
|
||||||
|
|
||||||
# ── cmd_wakeup ─────────────────────────────────────────────────────────
|
# ── cmd_wakeup ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -208,6 +208,93 @@ def _try_acquire_expect_busy(palace_path, result_q):
|
|||||||
result_q.put("busy")
|
result_q.put("busy")
|
||||||
|
|
||||||
|
|
||||||
|
def _hold_lock_send_pid(palace_path: str, ready_flag: str, release_flag: str, pid_q) -> None:
|
||||||
|
"""Acquire the lock, push our PID + cmdline through the queue, then wait."""
|
||||||
|
import sys as _sys
|
||||||
|
|
||||||
|
try:
|
||||||
|
with mine_palace_lock(palace_path):
|
||||||
|
pid_q.put((os.getpid(), list(_sys.argv[:3])))
|
||||||
|
open(ready_flag, "w").close()
|
||||||
|
for _ in range(500):
|
||||||
|
if os.path.exists(release_flag):
|
||||||
|
return
|
||||||
|
time.sleep(0.01)
|
||||||
|
except MineAlreadyRunning:
|
||||||
|
pid_q.put(("error", "raised"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_lock_failure_message_names_holder(tmp_path, monkeypatch):
|
||||||
|
"""Regression #1264: failed acquire must identify the holder by PID.
|
||||||
|
|
||||||
|
Before this fix, a `mempalace mine` colliding with another writer
|
||||||
|
(mine, MCP server, anything taking mine_palace_lock) saw a generic
|
||||||
|
"another `mempalace mine` is already running" message and exited
|
||||||
|
silently. The operator had no signal of which process to wait for
|
||||||
|
or stop. The new message includes ``PID N`` so the holder can be
|
||||||
|
identified directly.
|
||||||
|
"""
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
ready = str(tmp_path / "ready")
|
||||||
|
release = str(tmp_path / "release")
|
||||||
|
|
||||||
|
ctx = _get_mp_context()
|
||||||
|
pid_q = ctx.Queue()
|
||||||
|
holder = ctx.Process(target=_hold_lock_send_pid, args=(palace, ready, release, pid_q))
|
||||||
|
holder.start()
|
||||||
|
try:
|
||||||
|
for _ in range(500):
|
||||||
|
if os.path.exists(ready):
|
||||||
|
break
|
||||||
|
time.sleep(0.01)
|
||||||
|
assert os.path.exists(ready), "holder failed to acquire lock in time"
|
||||||
|
holder_pid, _holder_argv = pid_q.get(timeout=2)
|
||||||
|
|
||||||
|
with pytest.raises(MineAlreadyRunning) as excinfo:
|
||||||
|
with mine_palace_lock(palace):
|
||||||
|
pytest.fail("second acquire of same palace should have raised")
|
||||||
|
|
||||||
|
msg = str(excinfo.value)
|
||||||
|
assert (
|
||||||
|
f"PID {holder_pid}" in msg
|
||||||
|
), f"lock-failure message must name the holder PID; got: {msg!r}"
|
||||||
|
finally:
|
||||||
|
open(release, "w").close()
|
||||||
|
holder.join(timeout=5)
|
||||||
|
|
||||||
|
|
||||||
|
def test_lock_holder_identity_persists_across_release(tmp_path, monkeypatch):
|
||||||
|
"""The holder line is overwritten by each new acquirer, not appended.
|
||||||
|
|
||||||
|
Without explicit truncate the lock file would accumulate lines across
|
||||||
|
runs and grow without bound. Verify that re-acquire keeps the body
|
||||||
|
bounded.
|
||||||
|
"""
|
||||||
|
# ``os.path.expanduser("~")`` reads HOME on POSIX but USERPROFILE on
|
||||||
|
# Windows; setting both makes the ``~/.mempalace/locks`` lookup land
|
||||||
|
# under ``tmp_path`` regardless of platform.
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
monkeypatch.setenv("USERPROFILE", str(tmp_path))
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
for _ in range(5):
|
||||||
|
with mine_palace_lock(palace):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Locate the lock file. The key derivation is internal but we can find
|
||||||
|
# it by scanning the mempalace locks dir for mine_palace_*.lock entries.
|
||||||
|
lock_dir = tmp_path / ".mempalace" / "locks"
|
||||||
|
lock_files = list(lock_dir.glob("mine_palace_*.lock"))
|
||||||
|
assert lock_files, "expected the palace lock file to exist after acquire/release"
|
||||||
|
# Read as bytes so the byte-0 sentinel (\x00) is preserved without
|
||||||
|
# decode quirks; the bound is on the file size, not its line count.
|
||||||
|
body = lock_files[0].read_bytes()
|
||||||
|
# Body is byte-0 sentinel + identity (no trailing accumulation).
|
||||||
|
# Identity is ``f"{pid} {sys.argv[:3]}"``; cap at a generous bound that
|
||||||
|
# still rules out unbounded growth across the 5 re-acquires.
|
||||||
|
assert len(body) < 1024, f"lock body must not grow across re-acquires; got {len(body)} bytes"
|
||||||
|
|
||||||
|
|
||||||
def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch):
|
def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch):
|
||||||
"""Old callers of `mine_global_lock` should still work."""
|
"""Old callers of `mine_global_lock` should still work."""
|
||||||
monkeypatch.setenv("HOME", str(tmp_path))
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
|||||||
Reference in New Issue
Block a user