test: verify mine_lock via disjoint critical-section intervals

The previous revision used multiprocessing but still relied on timing
("second process waited at least N seconds"), which flakes on CI where
spawn overhead eats into the hold window. Linux CI observed the second
process report a 0.088s wait — below the 0.1s threshold — even though
the lock behavior was correct; spawn was just slow enough that the
first process had nearly finished holding the lock by the time the
second process finished spawning.

Switch to effect-based verification: each worker logs its
[enter_time, exit_time] inside the critical section, and the test
asserts the two intervals are disjoint after sorting. A working lock
can never produce overlapping intervals, so slow spawn cannot cause a
false failure; a broken lock produces overlapping intervals whenever
the two workers' hold windows coincide.

Also remove the mp.Queue, since we no longer pass timing data back.
This commit is contained in:
Igor Lins e Silva
2026-04-13 19:08:57 -03:00
parent e052074624
commit 1dc20e307b
+50 -28
View File
@@ -64,16 +64,22 @@ from mempalace.searcher import (
# ── mine_lock ──────────────────────────────────────────────────────────── # ── mine_lock ────────────────────────────────────────────────────────────
def _lock_worker(target: str, name: str, hold_seconds: float, queue) -> None: def _lock_worker(target: str, name: str, hold_seconds: float, log_path: str) -> None:
"""Module-level worker for multiprocessing spawn; must be pickle-able.""" """Worker for multiprocessing-spawn concurrency test. Writes its
critical-section enter/exit timestamps to ``log_path`` so the test
can verify the sections did not overlap in time."""
import time as _time
from mempalace.palace import mine_lock as _mine_lock from mempalace.palace import mine_lock as _mine_lock
start = time.time()
with _mine_lock(target): with _mine_lock(target):
elapsed = time.time() - start t_enter = _time.time()
queue.put((name, elapsed)) _time.sleep(hold_seconds)
if hold_seconds > 0: t_exit = _time.time()
time.sleep(hold_seconds) # Append atomically so concurrent writers don't stomp each other.
with open(log_path, "a") as f:
f.write(f"{name} {t_enter} {t_exit}\n")
f.flush()
class TestMineLock: class TestMineLock:
@@ -91,35 +97,51 @@ class TestMineLock:
def test_lock_blocks_concurrent_access(self, tmp_path): def test_lock_blocks_concurrent_access(self, tmp_path):
"""The lock's contract is inter-*process* (multi-agent), not """The lock's contract is inter-*process* (multi-agent), not
inter-thread. Use multiprocessing so the test reflects the real inter-thread. Use multiprocessing so the test reflects the real
use case and is portable: on macOS/BSD, ``fcntl.flock`` is use case and is portable: on macOS/BSD ``fcntl.flock`` is
per-process, so two threads in one process would both acquire — per-process, so two threads would both acquire — a thread-based
a threading-based test would flake there even when the lock is test would flake there even when the lock is correct.
behaving correctly for its intended users."""
Verify mutual exclusion by the effect the critical section
actually has — each worker records its enter/exit timestamps
under the lock, and the test asserts the two intervals do not
overlap. This is robust to spawn-overhead timing, unlike
"second worker waited at least N seconds" which flakes when CI
spawn latency eats into the hold window.
"""
target = str(tmp_path / "concurrent_lock.txt") target = str(tmp_path / "concurrent_lock.txt")
# Use multiprocessing so each worker has its own process. log_path = str(tmp_path / "critical_section.log")
# Use "spawn" to stay consistent across platforms (macOS defaults # Spawn so the same code path runs on every OS (macOS 3.8+ and
# to spawn on 3.8+; Linux defaults to fork). Both work here. # Windows already default to spawn; Linux is fork by default).
ctx = multiprocessing.get_context("spawn") ctx = multiprocessing.get_context("spawn")
queue = ctx.Queue()
p1 = ctx.Process(target=_lock_worker, args=(target, "a", 0.3, queue)) # Each worker holds the lock for HOLD seconds. With real mutual
p2 = ctx.Process(target=_lock_worker, args=(target, "b", 0.0, queue)) # exclusion, the two [enter, exit] intervals must be disjoint.
HOLD = 0.3
p1 = ctx.Process(target=_lock_worker, args=(target, "a", HOLD, log_path))
p2 = ctx.Process(target=_lock_worker, args=(target, "b", HOLD, log_path))
p1.start() p1.start()
time.sleep(0.2) # ensure p1 acquires first
p2.start() p2.start()
p1.join(timeout=10) p1.join(timeout=30)
p2.join(timeout=10) p2.join(timeout=30)
results = [] assert p1.exitcode == 0, f"p1 exited non-zero: {p1.exitcode}"
while not queue.empty(): assert p2.exitcode == 0, f"p2 exited non-zero: {p2.exitcode}"
results.append(queue.get())
assert len(results) == 2, f"both workers should report, got {results}"
# The second worker must have waited until p1 released the lock. # Parse the log: "<name> <enter_ts> <exit_ts>".
wait_times = sorted(r[1] for r in results) intervals = []
with open(log_path) as f:
for line in f:
parts = line.strip().split()
if len(parts) == 3:
intervals.append((parts[0], float(parts[1]), float(parts[2])))
assert len(intervals) == 2, f"expected two critical sections, got {intervals}"
# Sort by entry time and verify the second entry is after the first exit.
intervals.sort(key=lambda iv: iv[1])
(_, enter_a, exit_a), (_, enter_b, exit_b) = intervals
assert ( assert (
wait_times[1] > 0.1 enter_a < exit_a <= enter_b < exit_b
), f"second process should block on mine_lock, waited only {wait_times[1]:.3f}s" ), f"critical sections overlapped — lock failed to serialize: {intervals}"
# ── build_closet_lines ───────────────────────────────────────────────── # ── build_closet_lines ─────────────────────────────────────────────────