test: serialize mine_lock concurrency test with multiprocessing

The macOS CI job failed ``test_lock_blocks_concurrent_access`` because
``fcntl.flock`` on BSD/macOS is per-*process*, not per-FD: two threads
in the same process both acquire even when they open their own file
descriptors. The test passed on Linux (per-FD flock) and Windows
(per-FD ``msvcrt.locking``) but was never actually exercising the
lock's real contract.

``mine_lock`` is designed to serialize multi-*agent* access — i.e.,
separate processes, not threads. Switch the test to
``multiprocessing.get_context('spawn')`` with a module-level worker
(so the spawn pickles cleanly) so it:

  1. reflects the actual use case (one lock per mining process);
  2. passes on all three OSes without flock-semantics branching;
  3. catches real regressions (a broken lock would now let both
     processes through, exactly what we care about).

Hold time bumped to 0.3s and the "wait until p1 acquires" delay to
0.2s to tolerate spawn's higher startup latency on macOS/Windows.
This commit is contained in:
Igor Lins e Silva
2026-04-13 19:02:51 -03:00
parent 7192552624
commit e052074624
+38 -16
View File
@@ -24,6 +24,7 @@ Coverage map:
"""
import json
import multiprocessing
import os
import tempfile
import threading
@@ -63,6 +64,18 @@ from mempalace.searcher import (
# ── mine_lock ────────────────────────────────────────────────────────────
def _lock_worker(target: str, name: str, hold_seconds: float, queue) -> None:
"""Module-level worker for multiprocessing spawn; must be pickle-able."""
from mempalace.palace import mine_lock as _mine_lock
start = time.time()
with _mine_lock(target):
elapsed = time.time() - start
queue.put((name, elapsed))
if hold_seconds > 0:
time.sleep(hold_seconds)
class TestMineLock:
def test_lock_acquires_and_releases(self, tmp_path):
target = str(tmp_path / "lock_target.txt")
@@ -76,28 +89,37 @@ class TestMineLock:
assert time.time() - start < 1.0
def test_lock_blocks_concurrent_access(self, tmp_path):
"""The lock's contract is inter-*process* (multi-agent), not
inter-thread. Use multiprocessing so the test reflects the real
use case and is portable: on macOS/BSD, ``fcntl.flock`` is
per-process, so two threads in one process would both acquire —
a threading-based test would flake there even when the lock is
behaving correctly for its intended users."""
target = str(tmp_path / "concurrent_lock.txt")
# Use multiprocessing so each worker has its own process.
# Use "spawn" to stay consistent across platforms (macOS defaults
# to spawn on 3.8+; Linux defaults to fork). Both work here.
ctx = multiprocessing.get_context("spawn")
queue = ctx.Queue()
p1 = ctx.Process(target=_lock_worker, args=(target, "a", 0.3, queue))
p2 = ctx.Process(target=_lock_worker, args=(target, "b", 0.0, queue))
p1.start()
time.sleep(0.2) # ensure p1 acquires first
p2.start()
p1.join(timeout=10)
p2.join(timeout=10)
results = []
while not queue.empty():
results.append(queue.get())
assert len(results) == 2, f"both workers should report, got {results}"
def worker(name):
start = time.time()
with mine_lock(target):
results.append((name, time.time() - start))
time.sleep(0.2)
t1 = threading.Thread(target=worker, args=("a",))
t2 = threading.Thread(target=worker, args=("b",))
t1.start()
time.sleep(0.05) # ensure t1 acquires first
t2.start()
t1.join()
t2.join()
# The second worker must have waited at least most of t1's hold time.
# The second worker must have waited until p1 released the lock.
wait_times = sorted(r[1] for r in results)
assert (
wait_times[1] > 0.1
), f"second thread should block on mine_lock, waited only {wait_times[1]:.3f}s"
), f"second process should block on mine_lock, waited only {wait_times[1]:.3f}s"
# ── build_closet_lines ─────────────────────────────────────────────────