ebc26f3960
- Run ruff format on all benchmark files (fixes CI lint job) - Fix check_regression() substring ambiguity: ordered keyword matching so "latency_improvement_pct" is correctly classified as higher-is-better - Update stale comments in conftest.py referencing wrong fixture - Add pytest addopts to skip benchmark/slow/stress markers by default
210 lines
7.4 KiB
Python
210 lines
7.4 KiB
Python
"""
|
|
Memory stack (layers.py) benchmarks.
|
|
|
|
Tests MemoryStack.wake_up(), Layer1.generate(), and Layer2/L3
|
|
at scale. Layer1 has the same unbounded col.get() as tool_status.
|
|
"""
|
|
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from tests.benchmarks.data_generator import PalaceDataGenerator
|
|
from tests.benchmarks.report import record_metric
|
|
|
|
|
|
def _get_rss_mb():
|
|
try:
|
|
import psutil
|
|
|
|
return psutil.Process().memory_info().rss / (1024 * 1024)
|
|
except ImportError:
|
|
import resource
|
|
import platform
|
|
|
|
usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
|
|
if platform.system() == "Darwin":
|
|
return usage / (1024 * 1024)
|
|
return usage / 1024
|
|
|
|
|
|
@pytest.mark.benchmark
|
|
class TestWakeUpCost:
|
|
"""Measure wake_up() time (L0 + L1) at different palace sizes."""
|
|
|
|
SIZES = [500, 1_000, 2_500, 5_000]
|
|
|
|
@pytest.mark.parametrize("n_drawers", SIZES)
|
|
def test_wakeup_latency(self, n_drawers, tmp_path, bench_scale):
|
|
"""L0+L1 generation time grows with palace size because L1 fetches all."""
|
|
gen = PalaceDataGenerator(seed=42, scale=bench_scale)
|
|
palace_path = str(tmp_path / "palace")
|
|
gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)
|
|
|
|
# Create identity file
|
|
identity_path = str(tmp_path / "identity.txt")
|
|
with open(identity_path, "w") as f:
|
|
f.write("I am a test AI. Traits: precise, fast.\n")
|
|
|
|
from mempalace.layers import MemoryStack
|
|
|
|
stack = MemoryStack(palace_path=palace_path, identity_path=identity_path)
|
|
|
|
latencies = []
|
|
for _ in range(5):
|
|
start = time.perf_counter()
|
|
text = stack.wake_up()
|
|
elapsed_ms = (time.perf_counter() - start) * 1000
|
|
latencies.append(elapsed_ms)
|
|
assert "L0" in text or "L1" in text or "IDENTITY" in text or "ESSENTIAL" in text
|
|
|
|
avg_ms = sum(latencies) / len(latencies)
|
|
record_metric("layers_wakeup", f"avg_ms_at_{n_drawers}", round(avg_ms, 1))
|
|
|
|
|
|
@pytest.mark.benchmark
|
|
class TestLayer1UnboundedFetch:
|
|
"""Layer1.generate() fetches ALL drawers — same pattern as tool_status."""
|
|
|
|
SIZES = [500, 1_000, 2_500, 5_000]
|
|
|
|
@pytest.mark.parametrize("n_drawers", SIZES)
|
|
def test_layer1_rss_growth(self, n_drawers, tmp_path):
|
|
"""Track RSS from Layer1 fetching all drawers at different sizes."""
|
|
gen = PalaceDataGenerator(seed=42, scale="small")
|
|
palace_path = str(tmp_path / "palace")
|
|
gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)
|
|
|
|
from mempalace.layers import Layer1
|
|
|
|
layer = Layer1(palace_path=palace_path)
|
|
|
|
rss_before = _get_rss_mb()
|
|
start = time.perf_counter()
|
|
text = layer.generate()
|
|
elapsed_ms = (time.perf_counter() - start) * 1000
|
|
rss_after = _get_rss_mb()
|
|
|
|
rss_delta = rss_after - rss_before
|
|
assert "L1" in text
|
|
|
|
record_metric("layer1", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1))
|
|
record_metric("layer1", f"rss_delta_mb_at_{n_drawers}", round(rss_delta, 2))
|
|
|
|
def test_layer1_wing_filtered(self, tmp_path):
|
|
"""Wing-filtered Layer1 should fetch fewer drawers."""
|
|
gen = PalaceDataGenerator(seed=42, scale="small")
|
|
palace_path = str(tmp_path / "palace")
|
|
gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)
|
|
|
|
from mempalace.layers import Layer1
|
|
|
|
wing = gen.wings[0]
|
|
|
|
# Unfiltered
|
|
layer_all = Layer1(palace_path=palace_path)
|
|
start = time.perf_counter()
|
|
layer_all.generate()
|
|
unfiltered_ms = (time.perf_counter() - start) * 1000
|
|
|
|
# Wing-filtered
|
|
layer_wing = Layer1(palace_path=palace_path, wing=wing)
|
|
start = time.perf_counter()
|
|
layer_wing.generate()
|
|
filtered_ms = (time.perf_counter() - start) * 1000
|
|
|
|
record_metric("layer1_filter", "unfiltered_ms", round(unfiltered_ms, 1))
|
|
record_metric("layer1_filter", "filtered_ms", round(filtered_ms, 1))
|
|
if unfiltered_ms > 0:
|
|
record_metric(
|
|
"layer1_filter", "speedup_pct", round((1 - filtered_ms / unfiltered_ms) * 100, 1)
|
|
)
|
|
|
|
|
|
@pytest.mark.benchmark
|
|
class TestWakeUpTokenBudget:
|
|
"""Verify L0+L1 stays within token budget even at large palace sizes."""
|
|
|
|
SIZES = [500, 1_000, 2_500, 5_000]
|
|
|
|
@pytest.mark.parametrize("n_drawers", SIZES)
|
|
def test_token_budget(self, n_drawers, tmp_path):
|
|
"""L1 has MAX_CHARS=3200 cap. Verify it holds at scale."""
|
|
gen = PalaceDataGenerator(seed=42, scale="small")
|
|
palace_path = str(tmp_path / "palace")
|
|
gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)
|
|
|
|
identity_path = str(tmp_path / "identity.txt")
|
|
with open(identity_path, "w") as f:
|
|
f.write("I am a benchmark AI.\n")
|
|
|
|
from mempalace.layers import MemoryStack
|
|
|
|
stack = MemoryStack(palace_path=palace_path, identity_path=identity_path)
|
|
text = stack.wake_up()
|
|
token_estimate = len(text) // 4
|
|
|
|
# Budget is ~600-900 tokens. Allow up to 1200 for safety margin.
|
|
record_metric("wakeup_budget", f"tokens_at_{n_drawers}", token_estimate)
|
|
record_metric("wakeup_budget", f"chars_at_{n_drawers}", len(text))
|
|
|
|
assert token_estimate < 1200, (
|
|
f"Wake-up exceeded budget: ~{token_estimate} tokens at {n_drawers} drawers"
|
|
)
|
|
|
|
|
|
@pytest.mark.benchmark
|
|
class TestLayer2Retrieval:
|
|
"""Layer2 on-demand retrieval with filters."""
|
|
|
|
def test_layer2_latency(self, tmp_path, bench_scale):
|
|
"""L2 retrieval with wing filter at scale."""
|
|
gen = PalaceDataGenerator(seed=42, scale=bench_scale)
|
|
palace_path = str(tmp_path / "palace")
|
|
gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)
|
|
|
|
from mempalace.layers import Layer2
|
|
|
|
layer = Layer2(palace_path=palace_path)
|
|
wing = gen.wings[0]
|
|
|
|
latencies = []
|
|
for _ in range(10):
|
|
start = time.perf_counter()
|
|
layer.retrieve(wing=wing, n_results=10)
|
|
elapsed_ms = (time.perf_counter() - start) * 1000
|
|
latencies.append(elapsed_ms)
|
|
|
|
avg_ms = sum(latencies) / len(latencies)
|
|
record_metric("layer2", "avg_retrieval_ms", round(avg_ms, 1))
|
|
|
|
|
|
@pytest.mark.benchmark
|
|
class TestLayer3Search:
|
|
"""Layer3 semantic search through the MemoryStack interface."""
|
|
|
|
def test_layer3_latency(self, tmp_path, bench_scale):
|
|
"""L3 search latency through MemoryStack."""
|
|
gen = PalaceDataGenerator(seed=42, scale=bench_scale)
|
|
palace_path = str(tmp_path / "palace")
|
|
gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)
|
|
|
|
identity_path = str(tmp_path / "identity.txt")
|
|
with open(identity_path, "w") as f:
|
|
f.write("I am a benchmark AI.\n")
|
|
|
|
from mempalace.layers import MemoryStack
|
|
|
|
stack = MemoryStack(palace_path=palace_path, identity_path=identity_path)
|
|
|
|
queries = ["authentication", "database", "deployment", "testing", "monitoring"]
|
|
latencies = []
|
|
for q in queries:
|
|
start = time.perf_counter()
|
|
stack.search(q, n_results=5)
|
|
elapsed_ms = (time.perf_counter() - start) * 1000
|
|
latencies.append(elapsed_ms)
|
|
|
|
avg_ms = sum(latencies) / len(latencies)
|
|
record_metric("layer3", "avg_search_ms", round(avg_ms, 1))
|