From 7b892913345882a6475cf39377cfc4b952f19956 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Tue, 7 Apr 2026 19:39:06 -0300 Subject: [PATCH 1/5] bench: add scale benchmark suite (94 tests) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Benchmark mempalace at configurable scale (1K–100K drawers) to find real-world performance limits. Tests cover MCP tool OOM thresholds, ChromaDB query degradation, search recall@k, mining throughput, knowledge graph concurrency, memory leak detection, palace boost quantification, and Layer1 unbounded fetch behavior. - tests/benchmarks/ with 8 test modules + data generator + report system - Deterministic data factory with planted needles for recall measurement - JSON report output with regression detection (--bench-report flag) - CI benchmark job on PRs at small scale - psutil added as dev dependency for RSS tracking --- .github/workflows/ci.yml | 18 +- pyproject.toml | 10 +- tests/benchmarks/README.md | 136 ++++++ tests/benchmarks/__init__.py | 1 + tests/benchmarks/conftest.py | 146 +++++++ tests/benchmarks/data_generator.py | 395 ++++++++++++++++++ tests/benchmarks/report.py | 91 ++++ tests/benchmarks/test_chromadb_stress.py | 203 +++++++++ tests/benchmarks/test_ingest_bench.py | 165 ++++++++ .../benchmarks/test_knowledge_graph_bench.py | 284 +++++++++++++ tests/benchmarks/test_layers_bench.py | 206 +++++++++ tests/benchmarks/test_mcp_bench.py | 226 ++++++++++ tests/benchmarks/test_memory_profile.py | 178 ++++++++ tests/benchmarks/test_palace_boost.py | 172 ++++++++ tests/benchmarks/test_search_bench.py | 225 ++++++++++ 15 files changed, 2453 insertions(+), 3 deletions(-) create mode 100644 tests/benchmarks/README.md create mode 100644 tests/benchmarks/__init__.py create mode 100644 tests/benchmarks/conftest.py create mode 100644 tests/benchmarks/data_generator.py create mode 100644 tests/benchmarks/report.py create mode 100644 
tests/benchmarks/test_chromadb_stress.py create mode 100644 tests/benchmarks/test_ingest_bench.py create mode 100644 tests/benchmarks/test_knowledge_graph_bench.py create mode 100644 tests/benchmarks/test_layers_bench.py create mode 100644 tests/benchmarks/test_mcp_bench.py create mode 100644 tests/benchmarks/test_memory_profile.py create mode 100644 tests/benchmarks/test_palace_boost.py create mode 100644 tests/benchmarks/test_search_bench.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8ccb15e..09ddda2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,23 @@ jobs: with: python-version: ${{ matrix.python-version }} - run: pip install -e ".[dev]" - - run: python -m pytest tests/ -v + - run: python -m pytest tests/ -v --ignore=tests/benchmarks + + benchmark: + runs-on: ubuntu-latest + if: github.event_name == 'pull_request' + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-python@v6 + with: + python-version: "3.11" + - run: pip install -e ".[dev]" + - run: python -m pytest tests/benchmarks/ -v -m "benchmark and not stress and not slow" --bench-scale=small --bench-report=bench-results.json + - uses: actions/upload-artifact@v6 + if: always() + with: + name: benchmark-results + path: bench-results.json lint: runs-on: ubuntu-latest diff --git a/pyproject.toml b/pyproject.toml index 4862873..92e6732 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,11 +38,11 @@ Repository = "https://github.com/milla-jovovich/mempalace" mempalace = "mempalace:main" [project.optional-dependencies] -dev = ["pytest>=7.0", "ruff>=0.4.0"] +dev = ["pytest>=7.0", "ruff>=0.4.0", "psutil>=5.9"] spellcheck = ["autocorrect>=2.0"] [dependency-groups] -dev = ["pytest>=7.0", "ruff>=0.4.0"] +dev = ["pytest>=7.0", "ruff>=0.4.0", "psutil>=5.9"] [build-system] requires = ["hatchling"] @@ -64,3 +64,9 @@ quote-style = "double" [tool.pytest.ini_options] testpaths = ["tests"] +pythonpath = ["."] +markers = [ + "benchmark: 
scale/performance benchmark tests", + "slow: tests that take more than 30 seconds", + "stress: destructive scale tests (100K+ drawers)", +] diff --git a/tests/benchmarks/README.md b/tests/benchmarks/README.md new file mode 100644 index 0000000..bfa5963 --- /dev/null +++ b/tests/benchmarks/README.md @@ -0,0 +1,136 @@ +# MemPalace Scale Benchmark Suite + +94 tests that benchmark mempalace at scale to validate real-world performance limits. + +## Why + +MemPalace has strong academic scores (96.6% R@5 on LongMemEval) but no empirical data on how it behaves at scale. Key unknowns: + +- `tool_status()` loads ALL metadata into memory — at what palace size does this OOM? +- `PersistentClient` is re-instantiated on every MCP call — what's the overhead? +- Modified files are never re-ingested — what's the skip-check cost at scale? +- How does query latency degrade as the palace grows from 1K to 100K drawers? +- Does wing/room filtering actually improve retrieval, and by how much? + +This suite finds those answers. 
+ +## Quick Start + +```bash +# Fast smoke test (~2 min) +uv run pytest tests/benchmarks/ -v --bench-scale=small -m "benchmark and not slow" + +# Full small scale (~30 min) +uv run pytest tests/benchmarks/ -v --bench-scale=small + +# Medium scale with JSON report +uv run pytest tests/benchmarks/ -v --bench-scale=medium --bench-report=results.json + +# Stress test (local only, very slow) +uv run pytest tests/benchmarks/ -v --bench-scale=stress -m stress +``` + +## Scale Levels + +| Level | Drawers | Wings | Rooms/Wing | KG Triples | Use case | +|---------|---------|-------|------------|------------|---------------------| +| small | 1,000 | 3 | 5 | 200 | CI, quick checks | +| medium | 10,000 | 8 | 12 | 2,000 | Pre-release testing | +| large | 50,000 | 15 | 20 | 10,000 | Scale limit finding | +| stress | 100,000 | 25 | 30 | 50,000 | Breaking point | + +## Test Modules + +### Critical Path + +| File | What it tests | +|------|--------------| +| `test_mcp_bench.py` | MCP tool response times, unbounded metadata fetch, client re-instantiation overhead | +| `test_chromadb_stress.py` | ChromaDB breaking point, query degradation curve, batch vs sequential insert | +| `test_memory_profile.py` | RSS/heap growth over repeated operations, leak detection | + +### Performance Baselines + +| File | What it tests | +|------|--------------| +| `test_ingest_bench.py` | Mining throughput (files/sec, drawers/sec), peak RSS, chunking speed, re-ingest skip overhead | +| `test_search_bench.py` | Query latency vs palace size, recall@k with planted needles, concurrent queries, n_results scaling | + +### Architectural Validation + +| File | What it tests | +|------|--------------| +| `test_palace_boost.py` | Retrieval improvement from wing/room filtering at different scales | +| `test_knowledge_graph_bench.py` | Triple insertion rate, temporal query accuracy, SQLite concurrent access | +| `test_layers_bench.py` | MemoryStack wake-up cost, Layer1 unbounded fetch, token budget compliance | + 
+## Architecture + +``` +tests/benchmarks/ + conftest.py # --bench-scale / --bench-report CLI options, fixtures, markers + data_generator.py # Deterministic data factory (seeded RNG, planted needles) + report.py # JSON report writer + regression checker + test_*.py # 8 test modules (94 tests total) +``` + +### Data Generator + +`PalaceDataGenerator(seed=42, scale="small")` produces deterministic, realistic test data: + +- **`generate_project_tree()`** — writes real files + `mempalace.yaml` for `mine()` to ingest +- **`populate_palace_directly()`** — bypasses mining, inserts directly into ChromaDB (10-100x faster for search/MCP benchmarks) +- **`generate_kg_triples()`** — entity-relationship triples with temporal validity +- **`generate_search_queries()`** — queries with known-good answers for recall measurement + +**Planted needles**: Unique identifiable content (e.g., `NEEDLE_0042: PostgreSQL vacuum autovacuum threshold...`) seeded into specific wings/rooms. Search queries target these needles, enabling recall@k measurement without an LLM judge. 
+ +### JSON Reports + +When run with `--bench-report=path.json`, produces machine-readable output: + +```json +{ + "timestamp": "2026-04-07T...", + "git_sha": "abc123", + "scale": "small", + "system": {"os": "linux", "cpu_count": 8}, + "results": { + "mcp_status": {"latency_ms_at_1000": 45.2, "rss_delta_mb_at_5000": 12.3}, + "search": {"avg_latency_ms_at_5000": 23.1, "recall_at_5": 0.92}, + "chromadb_insert": {"sequential_ms": 8500, "batched_ms": 1200, "speedup_ratio": 7.1} + } +} +``` + +### Regression Detection + +```python +from tests.benchmarks.report import check_regression + +regressions = check_regression("current.json", "baseline.json", threshold=0.2) +# Returns list of metric descriptions that degraded beyond 20% +``` + +## CI Integration + +The GitHub Actions workflow runs benchmarks on PRs at small scale: + +```yaml +benchmark: + runs-on: ubuntu-latest + if: github.event_name == 'pull_request' + # Runs: pytest tests/benchmarks/ -m "benchmark and not stress and not slow" --bench-scale=small +``` + +Existing unit tests are isolated with `--ignore=tests/benchmarks`. + +## Markers + +- `@pytest.mark.benchmark` — all benchmark tests +- `@pytest.mark.slow` — tests taking >30s even at small scale +- `@pytest.mark.stress` — tests that should only run at large/stress scale + +## Dependencies + +Only one new dependency beyond the existing dev stack: `psutil` (for cross-platform RSS measurement). `tracemalloc` and `resource` are stdlib. 
"""Benchmark-specific pytest configuration, fixtures, and CLI options."""

import json
import os
import shutil
import tempfile

import pytest


SCALE_OPTIONS = ["small", "medium", "large", "stress"]

# Scratch file the benchmark tests append their metrics to (same location as
# RESULTS_FILE in tests/benchmarks/report.py). It lives in the system temp
# directory, so it survives between pytest runs unless it is removed here.
_RESULTS_FILE = os.path.join(tempfile.gettempdir(), "mempalace_bench_results.json")


def _remove_results_file():
    """Delete the scratch metrics file; a missing file is not an error."""
    try:
        os.unlink(_RESULTS_FILE)
    except FileNotFoundError:
        pass


def pytest_addoption(parser):
    """Register the ``--bench-scale`` and ``--bench-report`` command-line options."""
    parser.addoption(
        "--bench-scale",
        default="small",
        choices=SCALE_OPTIONS,
        help="Scale level for benchmark tests: small (1K), medium (10K), large (50K), stress (100K)",
    )
    parser.addoption(
        "--bench-report",
        default=None,
        help="Path for JSON benchmark report output",
    )


def pytest_sessionstart(session):
    """Start every session with an empty metrics file.

    The metrics file is only consumed when a report is written, so a previous
    run without ``--bench-report`` (or one that was interrupted) would
    otherwise leak its metrics into the next report.
    """
    _remove_results_file()


@pytest.fixture(scope="session")
def bench_scale(request):
    """The configured benchmark scale level."""
    return request.config.getoption("--bench-scale")


@pytest.fixture(scope="session")
def bench_report_path(request):
    """Path for JSON report output, or None."""
    return request.config.getoption("--bench-report")


@pytest.fixture
def palace_dir(tmp_path):
    """Isolated palace directory for a single test (created, returned as str)."""
    p = tmp_path / "palace"
    p.mkdir()
    return str(p)


@pytest.fixture
def kg_db(tmp_path):
    """Isolated KG SQLite path for a single test (the file itself is not created)."""
    return str(tmp_path / "test_kg.sqlite3")


@pytest.fixture
def config_dir(tmp_path):
    """Isolated config directory for monkeypatching MempalaceConfig.

    Writes a ``config.json`` pointing at ``<tmp_path>/palace`` and returns the
    directory path as a string. The palace directory itself is not created.
    """
    d = tmp_path / "config"
    d.mkdir()
    config = {"palace_path": str(tmp_path / "palace"), "collection_name": "mempalace_drawers"}
    with open(d / "config.json", "w", encoding="utf-8") as f:
        json.dump(config, f)
    return str(d)


@pytest.fixture
def project_dir(tmp_path):
    """Temporary project directory for mining tests (returned as a Path)."""
    d = tmp_path / "project"
    d.mkdir()
    return d


# ── Session-scoped result collector ──────────────────────────────────────


class BenchmarkResults:
    """Collect benchmark metrics in memory as ``{category: {metric: value}}``.

    NOTE: the JSON report written by pytest_terminal_summary is built from the
    on-disk metrics file, not from this object.
    """

    def __init__(self):
        self.results = {}

    def record(self, category: str, metric: str, value):
        """Store ``value`` under ``results[category][metric]``, overwriting any previous value."""
        self.results.setdefault(category, {})[metric] = value


@pytest.fixture(scope="session")
def bench_results():
    """Session-scoped results collector shared by all benchmark tests."""
    return BenchmarkResults()


def pytest_terminal_summary(terminalreporter, config):
    """Write the JSON benchmark report after all tests complete.

    Does nothing unless ``--bench-report`` was given. The report combines
    environment metadata (git sha, Python/chromadb versions, platform) with
    the metrics found in the scratch metrics file, which is removed afterwards.
    """
    report_path = config.getoption("--bench-report", default=None)
    if not report_path:
        return

    import datetime
    import platform
    import subprocess

    # Environment metadata is best-effort: a missing git binary or chromadb
    # install must never prevent the report from being written.
    try:
        git_sha = subprocess.check_output(
            ["git", "rev-parse", "--short", "HEAD"], text=True, stderr=subprocess.DEVNULL
        ).strip()
    except Exception:
        git_sha = "unknown"

    try:
        import chromadb

        chromadb_version = chromadb.__version__
    except Exception:
        chromadb_version = "unknown"

    # Metrics are handed over through a JSON file in the temp directory.
    results = {}
    if os.path.exists(_RESULTS_FILE):
        try:
            with open(_RESULTS_FILE, encoding="utf-8") as f:
                results = json.load(f)
        except (OSError, ValueError) as exc:
            # Surface the problem instead of silently emitting an empty report.
            terminalreporter.write_line(
                f"WARNING: could not read benchmark metrics from {_RESULTS_FILE}: {exc}"
            )
        # Consume the file even when it was unreadable so it cannot pollute
        # the next run.
        try:
            _remove_results_file()
        except OSError:
            pass

    report = {
        "timestamp": datetime.datetime.now().isoformat(),
        "git_sha": git_sha,
        "python_version": platform.python_version(),
        "chromadb_version": chromadb_version,
        "scale": config.getoption("--bench-scale", default="small"),
        "system": {
            "os": platform.system().lower(),
            "cpu_count": os.cpu_count(),
            "platform": platform.platform(),
        },
        "results": results,
    }

    os.makedirs(os.path.dirname(os.path.abspath(report_path)), exist_ok=True)
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2)
    terminalreporter.write_line(f"\nBenchmark report written to: {report_path}")
"scheduler", "gateway", + "marketplace", +] + +ROOM_NAMES = [ + "backend", "frontend", "api", "database", "auth", "tests", "docs", + "config", "deployment", "models", "views", "controllers", "middleware", + "utils", "schemas", "migrations", "fixtures", "scripts", "styles", + "components", "hooks", "services", "routes", "templates", "static", + "media", "logging", "cache", "queue", "workers", +] + +TECH_TERMS = [ + "authentication", "authorization", "middleware", "endpoint", "REST API", + "GraphQL", "WebSocket", "database migration", "ORM", "query optimization", + "caching strategy", "load balancer", "rate limiting", "pagination", + "serialization", "validation", "error handling", "logging framework", + "monitoring", "deployment pipeline", "CI/CD", "containerization", + "microservice", "event sourcing", "message queue", "pub/sub", + "connection pooling", "session management", "token refresh", "CORS", + "SSL termination", "health check", "circuit breaker", "retry logic", + "batch processing", "stream processing", "data pipeline", "ETL", + "feature flag", "A/B testing", "blue-green deployment", "canary release", +] + +CODE_SNIPPETS = [ + "def process_request(data):\n validated = schema.validate(data)\n result = handler.execute(validated)\n return Response(result, status=200)\n", + "class UserRepository:\n def __init__(self, db):\n self.db = db\n def find_by_id(self, user_id):\n return self.db.query(User).filter(User.id == user_id).first()\n", + "async def fetch_data(url, timeout=30):\n async with aiohttp.ClientSession() as session:\n async with session.get(url, timeout=timeout) as resp:\n return await resp.json()\n", + "const handleSubmit = async (formData) => {\n try {\n const response = await api.post('/users', formData);\n dispatch({ type: 'USER_CREATED', payload: response.data });\n } catch (error) {\n setError(error.message);\n }\n};\n", + "SELECT u.name, COUNT(o.id) as order_count\nFROM users u\nLEFT JOIN orders o ON u.id = o.user_id\nWHERE u.created_at > 
class PalaceDataGenerator:
    """Generate deterministic, realistic test data at configurable scale.

    All randomness flows through one ``random.Random(seed)`` instance, so the
    same seed, scale and call sequence reproduce the same data. The order of
    RNG calls is therefore part of the behaviour: reordering them changes the
    generated data.

    Attributes set by ``__init__``:
        rng: the seeded ``random.Random``.
        scale / cfg: the scale name and its ``SCALE_CONFIGS`` entry.
        wings: the first ``cfg["wings"]`` names from ``WING_NAMES``.
        rooms_by_wing: wing name -> list of room names sampled from ``ROOM_NAMES``.
        needles: planted needle dicts (``id``, ``content``, ``wing``, ``room``, ``query``).
    """

    def __init__(self, seed=42, scale="small"):
        self.rng = random.Random(seed)
        self.scale = scale
        self.cfg = SCALE_CONFIGS[scale]
        self.wings = WING_NAMES[: self.cfg["wings"]]
        self.rooms_by_wing = {}
        for wing in self.wings:
            n = self.cfg["rooms_per_wing"]
            # Capped at the vocabulary size: a wing never has more rooms than
            # there are names in ROOM_NAMES.
            rooms = self.rng.sample(ROOM_NAMES, min(n, len(ROOM_NAMES)))
            self.rooms_by_wing[wing] = rooms
        # Planted needles for recall measurement
        self.needles = []
        self._generate_needles()

    @staticmethod
    def _needle_query(topic):
        """Derive the search query for a needle topic.

        Uses the text before " uses " if present, otherwise the text before
        " set to " if present, otherwise the first 60 characters of the topic.
        """
        if " uses " in topic:
            return topic.split(" uses ")[0]
        if " set to " in topic:
            return topic.split(" set to ")[0]
        return topic[:60]

    def _generate_needles(self):
        """Create unique needle content for recall testing.

        NOTE: topics are reused cyclically once ``cfg["needles"]`` exceeds the
        number of topics below, so at larger scales several needles share the
        same topic and query text and differ only in their ``NEEDLE_xxxx`` id.
        """
        topics = [
            "Fibonacci sequence optimization uses memoization with O(n) space complexity",
            "PostgreSQL vacuum autovacuum threshold set to 50 percent for table users",
            "Redis cluster failover timeout configured at 30 seconds with sentinel monitoring",
            "Kubernetes horizontal pod autoscaler targets 70 percent CPU utilization",
            "GraphQL subscription uses WebSocket transport with heartbeat interval 25 seconds",
            "JWT token rotation policy requires refresh every 15 minutes with sliding window",
            "Elasticsearch index sharding strategy uses 5 primary shards with 1 replica each",
            "Docker multi-stage build reduces image size from 1.2GB to 180MB for production",
            "Apache Kafka consumer group rebalance timeout set to 45 seconds",
            "MongoDB change streams resume token persisted every 100 operations",
            "gRPC streaming uses bidirectional flow control with 64KB window size",
            "Prometheus alerting rule fires when p99 latency exceeds 500ms for 5 minutes",
            "Terraform state locking uses DynamoDB with consistent reads enabled",
            "Nginx rate limiting configured at 100 requests per second with burst of 50",
            "SQLAlchemy connection pool size set to 20 with max overflow of 10 connections",
            "React concurrent mode uses startTransition for non-urgent state updates",
            "AWS Lambda cold start mitigation uses provisioned concurrency of 10 instances",
            "Git bisect automated with custom test script for regression hunting",
            "OpenTelemetry trace sampling rate set to 10 percent in production environment",
            "Celery worker prefetch multiplier set to 1 for fair task distribution",
        ]
        for i in range(self.cfg["needles"]):
            topic = topics[i % len(topics)]
            wing = self.rng.choice(self.wings)
            room = self.rng.choice(self.rooms_by_wing[wing])
            needle_id = f"NEEDLE_{i:04d}"
            content = f"{needle_id}: {topic}. This is a unique planted needle for recall benchmarking at scale."
            self.needles.append({
                "id": needle_id,
                "content": content,
                "wing": wing,
                "room": room,
                "query": self._needle_query(topic),
            })

    def _random_text(self, min_chars=600, max_chars=900):
        """Generate a random text block of realistic content.

        Picks a target length in ``[min_chars, max_chars]`` and keeps appending
        pieces (30% code snippet, 40% filled-in prose template, 30% list of
        tech terms) until the target is reached. The joined result is cut to
        ``max_chars`` characters.
        """
        parts = []
        total = 0
        target = self.rng.randint(min_chars, max_chars)
        while total < target:
            choice = self.rng.random()
            if choice < 0.3:
                text = self.rng.choice(CODE_SNIPPETS)
            elif choice < 0.7:
                template = self.rng.choice(PROSE_TEMPLATES)
                # Every placeholder value is drawn on each pass, whether or not
                # the chosen template uses it; this keeps the RNG stream
                # independent of which template was selected.
                text = template.format(
                    component=self.rng.choice(ROOM_NAMES),
                    task=self.rng.choice(TECH_TERMS),
                    month=self.rng.choice(["January", "February", "March", "April", "May"]),
                    quality=self.rng.choice(["performance", "readability", "test coverage", "latency"]),
                    decision=self.rng.choice(TECH_TERMS),
                    condition=self.rng.choice(TECH_TERMS) + " is null",
                    cause=self.rng.choice(["race condition", "null pointer", "timeout", "OOM"]),
                    fix="adding " + self.rng.choice(TECH_TERMS),
                    test_file=f"test_{self.rng.choice(ROOM_NAMES)}.py",
                    old_tech=self.rng.choice(["MySQL", "Flask", "REST", "Jenkins"]),
                    new_tech=self.rng.choice(["PostgreSQL", "FastAPI", "GraphQL", "GitHub Actions"]),
                    reason=self.rng.choice(TECH_TERMS),
                    date=f"2025-{self.rng.randint(1,12):02d}-{self.rng.randint(1,28):02d}",
                    percent=self.rng.randint(10, 80),
                    topic=self.rng.choice(TECH_TERMS),
                    person=self.rng.choice(ENTITY_NAMES),
                    action=self.rng.choice(["refactor", "migrate", "optimize", "test"]),
                    deadline=f"2025-{self.rng.randint(1,12):02d}-{self.rng.randint(1,28):02d}",
                    followup=self.rng.choice(TECH_TERMS),
                    feature_name=self.rng.choice(TECH_TERMS),
                    capability=self.rng.choice(TECH_TERMS),
                    deps=", ".join(self.rng.sample(TECH_TERMS, 2)),
                    effort=self.rng.randint(1, 15),
                )
            else:
                words = self.rng.sample(TECH_TERMS, min(5, len(TECH_TERMS)))
                text = " ".join(words) + ". " + self.rng.choice(TECH_TERMS) + " implementation details follow.\n"
            parts.append(text)
            total += len(text)
        return "\n".join(parts)[:max_chars]

    # ── Project tree generation (for mine() tests) ───────────────────────

    def generate_project_tree(self, base_path, wing=None, rooms=None, n_files=50):
        """
        Write realistic project files + mempalace.yaml to base_path.

        Files are distributed round-robin over one sub-directory per room.
        ``wing`` defaults to a random wing of this generator; ``rooms``
        defaults to that wing's rooms (or ``["general"]`` for an unknown wing).

        Returns ``(project_path, wing, rooms, files_written)``; the project
        path is suitable for passing to mine().
        """
        base = Path(base_path)
        base.mkdir(parents=True, exist_ok=True)
        wing = wing or self.rng.choice(self.wings)
        rooms = rooms or self.rooms_by_wing.get(wing, ["general"])

        # Write mempalace.yaml
        room_defs = [{"name": r, "description": f"{r} code and docs"} for r in rooms]
        with open(base / "mempalace.yaml", "w", encoding="utf-8") as f:
            yaml.dump({"wing": wing, "rooms": room_defs}, f)

        # Write files distributed across room directories
        files_written = 0
        for i in range(n_files):
            room = rooms[i % len(rooms)]
            room_dir = base / room
            room_dir.mkdir(parents=True, exist_ok=True)

            ext = self.rng.choice([".py", ".js", ".md", ".ts", ".yaml"])
            filename = f"file_{i:04d}{ext}"
            content = self._random_text(400, 2000)
            (room_dir / filename).write_text(content, encoding="utf-8")
            files_written += 1

        return str(base), wing, rooms, files_written

    # ── Conversation file generation (for mine_convos() tests) ───────────

    def generate_conversation_files(self, base_path, wing=None, n_files=20):
        """Write conversation transcript files for convo_miner tests.

        Each ``convo_XXXX.txt`` holds 5-20 exchanges of a ``> User: ...`` line,
        a generated reply and a blank line. Returns ``(base_path, wing)``.
        """
        base = Path(base_path)
        base.mkdir(parents=True, exist_ok=True)
        wing = wing or self.rng.choice(self.wings)

        for i in range(n_files):
            lines = []
            n_exchanges = self.rng.randint(5, 20)
            for _ in range(n_exchanges):
                user_msg = f"> User: {self.rng.choice(TECH_TERMS)}? How does {self.rng.choice(TECH_TERMS)} work with {self.rng.choice(TECH_TERMS)}?"
                ai_msg = self._random_text(200, 600)
                lines.append(user_msg)
                lines.append(ai_msg)
                lines.append("")

            (base / f"convo_{i:04d}.txt").write_text("\n".join(lines), encoding="utf-8")

        return str(base), wing

    # ── Direct palace population (bypasses mining for speed) ─────────────

    def populate_palace_directly(self, palace_path, n_drawers=None, include_needles=True):
        """
        Insert drawers directly into ChromaDB, bypassing the mining pipeline.

        Much faster than mining for benchmarks that only care about
        search/MCP behavior on a pre-populated palace.

        ``n_drawers`` is the total drawer count including needles; ``None``
        (or 0) falls back to the scale's configured count. If it is smaller
        than the number of needles, all needles are still inserted and no
        filler drawers are added.

        Returns (client, collection, needle_info).
        """
        n_drawers = n_drawers or self.cfg["drawers"]
        os.makedirs(palace_path, exist_ok=True)
        client = chromadb.PersistentClient(path=palace_path)
        col = client.get_or_create_collection("mempalace_drawers")

        batch_size = 500
        docs = []
        ids = []
        metas = []

        # Insert needles first
        needle_info = []
        if include_needles:
            for needle in self.needles:
                needle_id = f"drawer_{needle['wing']}_{needle['room']}_{hashlib.md5(needle['id'].encode()).hexdigest()[:16]}"
                docs.append(needle["content"])
                ids.append(needle_id)
                metas.append({
                    "wing": needle["wing"],
                    "room": needle["room"],
                    "source_file": f"needle_{needle['id']}.txt",
                    "chunk_index": 0,
                    "added_by": "benchmark",
                    "filed_at": datetime.now().isoformat(),
                })
                needle_info.append({"id": needle_id, "query": needle["query"], "wing": needle["wing"], "room": needle["room"]})

        # Fill remaining drawers with realistic content
        remaining = n_drawers - len(docs)
        for i in range(remaining):
            wing = self.wings[i % len(self.wings)]
            rooms = self.rooms_by_wing[wing]
            room = rooms[i % len(rooms)]
            content = self._random_text(400, 800)
            drawer_id = f"drawer_{wing}_{room}_{hashlib.md5(f'gen_{i}'.encode()).hexdigest()[:16]}"

            docs.append(content)
            ids.append(drawer_id)
            metas.append({
                "wing": wing,
                "room": room,
                "source_file": f"generated_{i:06d}.txt",
                "chunk_index": i % 10,
                "added_by": "benchmark",
                "filed_at": datetime.now().isoformat(),
            })

            # Flush in batches (the pending needles go out with the first batch)
            if len(docs) >= batch_size:
                col.add(documents=docs, ids=ids, metadatas=metas)
                docs, ids, metas = [], [], []

        # Flush remainder
        if docs:
            col.add(documents=docs, ids=ids, metadatas=metas)

        return client, col, needle_info

    # ── KG triple generation ─────────────────────────────────────────────

    def generate_kg_triples(self, n_entities=None, n_triples=None):
        """
        Generate realistic entity-relationship triples.

        ``None`` (or 0) for either argument falls back to the scale's
        configured count.

        Returns (entities, triples) where:
            entities = [(name, type), ...]
            triples = [(subject, predicate, object, valid_from, valid_to), ...]

        ``valid_from``/``valid_to`` are ``YYYY-MM-DD`` strings; ``valid_to`` is
        ``None`` for roughly 70% of the triples.

        Raises:
            ValueError: if fewer than two entities are requested. Every triple
                needs a subject and a different object, so a single entity
                would make the object search loop forever.
        """
        n_entities = n_entities or self.cfg["kg_entities"]
        n_triples = n_triples or self.cfg["kg_triples"]

        # Validate before touching the RNG so a rejected call leaves the
        # generator's random stream untouched.
        if n_entities < 2:
            raise ValueError(
                f"generate_kg_triples needs at least 2 entities to build triples, got {n_entities}"
            )

        # Generate entities
        entities = []
        entity_names = []
        for i in range(n_entities):
            if i < len(ENTITY_NAMES):
                name = ENTITY_NAMES[i]
            else:
                name = f"Entity_{i:04d}"
            etype = self.rng.choice(ENTITY_TYPES)
            entities.append((name, etype))
            entity_names.append(name)

        # Generate triples
        triples = []
        base_date = datetime(2024, 1, 1)
        for i in range(n_triples):
            subject = self.rng.choice(entity_names)
            obj = self.rng.choice(entity_names)
            # Re-draw until the object differs from the subject (terminates
            # because entity names are unique and there are at least two).
            while obj == subject:
                obj = self.rng.choice(entity_names)
            predicate = self.rng.choice(PREDICATES)
            days_offset = self.rng.randint(0, 730)
            valid_from = (base_date + timedelta(days=days_offset)).strftime("%Y-%m-%d")
            # 30% chance of having a valid_to
            valid_to = None
            if self.rng.random() < 0.3:
                end_offset = self.rng.randint(30, 365)
                valid_to = (base_date + timedelta(days=days_offset + end_offset)).strftime("%Y-%m-%d")
            triples.append((subject, predicate, obj, valid_from, valid_to))

        return entities, triples

    # ── Search query generation ──────────────────────────────────────────

    def generate_search_queries(self, n_queries=None):
        """
        Generate search queries with expected results.

        Returns a shuffled list of
        {"query": str, "expected_wing": str|None, "expected_room": str|None,
         "needle_id": str|None, "is_needle": bool}.
        Needle queries have known-good answers for recall measurement.
        """
        n_queries = n_queries or self.cfg["search_queries"]
        queries = []

        # Half are needle queries (known-good answers)
        n_needle = min(n_queries // 2, len(self.needles))
        for needle in self.needles[:n_needle]:
            queries.append({
                "query": needle["query"],
                "expected_wing": needle["wing"],
                "expected_room": needle["room"],
                "needle_id": needle["id"],
                "is_needle": True,
            })

        # Other half are generic queries (measure latency, not recall)
        n_generic = n_queries - n_needle
        for _ in range(n_generic):
            queries.append({
                "query": self.rng.choice(TECH_TERMS) + " " + self.rng.choice(TECH_TERMS),
                "expected_wing": None,
                "expected_room": None,
                "needle_id": None,
                "is_needle": False,
            })

        self.rng.shuffle(queries)
        return queries
import json
import os
import tempfile
from datetime import datetime


RESULTS_FILE = os.path.join(tempfile.gettempdir(), "mempalace_bench_results.json")


def record_metric(category: str, metric: str, value):
    """Store ``value`` under ``results[category][metric]`` in the session results file.

    The results file (JSON on disk, see RESULTS_FILE) is read, updated and
    written back on every call. An unreadable or corrupt file is treated as
    empty.

    Raises:
        TypeError: if ``value`` is not JSON-serialisable. The results file is
            left untouched in that case.
    """
    results = {}
    if os.path.exists(RESULTS_FILE):
        try:
            with open(RESULTS_FILE, encoding="utf-8") as f:
                results = json.load(f)
        except (json.JSONDecodeError, OSError):
            results = {}
    if not isinstance(results, dict):
        results = {}

    results.setdefault(category, {})[metric] = value

    # Serialise before touching the file: dumping straight into a file opened
    # with "w" would leave truncated JSON behind on failure, and the next call
    # would then silently discard every metric recorded so far.
    payload = json.dumps(results, indent=2)

    # Write to a sibling file and swap it in so readers never see a partial file.
    tmp_path = f"{RESULTS_FILE}.{os.getpid()}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(payload)
    os.replace(tmp_path, RESULTS_FILE)


def _metric_has_keyword(metric: str, keywords) -> bool:
    """Return True if any keyword occurs in ``metric`` starting at a word boundary.

    A boundary is the start of the name or the position after an underscore,
    so "oom" matches "oom_count" but not "room_filter".
    """
    padded = "_" + metric.lower()
    return any("_" + kw in padded for kw in keywords)


def check_regression(current_report: str, baseline_report: str, threshold: float = 0.2):
    """
    Compare current benchmark results against a baseline.

    Both arguments are paths to JSON reports with a top-level "results"
    mapping of ``{category: {metric: value}}``. Only metrics present in both
    reports, numeric in both, with a non-zero baseline, and whose name
    contains a known direction keyword are compared.

    Returns a list of regression descriptions. Empty list = no regressions.

    threshold: fractional degradation allowed (0.2 = 20% worse is OK),
        measured relative to the magnitude of the baseline value.
    """
    with open(current_report, encoding="utf-8") as f:
        current = json.load(f)
    with open(baseline_report, encoding="utf-8") as f:
        baseline = json.load(f)

    regressions = []
    # Metrics where HIGHER is worse (latency, memory, etc.)
    higher_is_worse = {
        "latency", "rss", "memory", "oom", "lock_failures", "elapsed",
        "p50_ms", "p95_ms", "p99_ms", "rss_delta_mb", "peak_rss_mb",
    }
    # Metrics where LOWER is worse (throughput, recall, etc.)
    lower_is_worse = {
        "recall", "throughput", "per_sec", "files_per_sec", "drawers_per_sec",
        "triples_per_sec", "improvement",
    }

    current_results = current.get("results", {})
    for category, base_metrics in baseline.get("results", {}).items():
        if category not in current_results:
            continue
        for metric, base_val in base_metrics.items():
            if metric not in current_results[category]:
                continue
            curr_val = current_results[category][metric]
            # bool is a subclass of int but is not a measurement.
            if isinstance(base_val, bool) or isinstance(curr_val, bool):
                continue
            if not isinstance(base_val, (int, float)) or not isinstance(curr_val, (int, float)):
                continue
            if base_val == 0:
                continue

            # The tolerance band is relative to the baseline's magnitude so
            # that negative baselines (e.g. an RSS delta that shrank) are
            # judged in the right direction.
            allowed = abs(base_val) * threshold
            pct = ((curr_val - base_val) / abs(base_val)) * 100

            # Determine direction (higher-is-worse wins if both match)
            if _metric_has_keyword(metric, higher_is_worse):
                # Higher is worse — check if current exceeds baseline by threshold
                regressed = curr_val > base_val + allowed
            elif _metric_has_keyword(metric, lower_is_worse):
                # Lower is worse — check if current is below baseline by threshold
                regressed = curr_val < base_val - allowed
            else:
                regressed = False

            if regressed:
                regressions.append(
                    f"{category}/{metric}: {base_val:.2f} -> {curr_val:.2f} ({pct:+.1f}%, threshold {threshold*100:.0f}%)"
                )

    return regressions
"""
ChromaDB stress tests — find the breaking point.

Tests the raw ChromaDB patterns used by mempalace to determine:
  - At what collection size does col.get(include=["metadatas"]) become dangerous?
  - How does query latency degrade as collection grows?
  - How much faster is batched insertion vs sequential?
"""

import os
import time

import chromadb
import pytest

from tests.benchmarks.data_generator import PalaceDataGenerator
from tests.benchmarks.report import record_metric


def _get_rss_mb() -> float:
    """Return this process's resident set size in MB.

    Uses psutil's current RSS when psutil is importable. Otherwise falls back
    to ``resource.getrusage(...).ru_maxrss``.

    NOTE(review): ``ru_maxrss`` is the *maximum* RSS reached so far, so
    before/after deltas computed through the fallback only show growth beyond
    the previous peak and can never be negative.
    """
    try:
        import psutil

        return psutil.Process().memory_info().rss / (1024 * 1024)
    except ImportError:
        import resource
        import platform

        usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        # The unit of ru_maxrss differs by platform: the Darwin value is
        # scaled as bytes, everything else as kilobytes.
        if platform.system() == "Darwin":
            return usage / (1024 * 1024)
        return usage / 1024


@pytest.mark.benchmark
class TestGetAllMetadatasOOM:
    """
    The specific pattern causing finding #3:
    col.get(include=["metadatas"]) with NO limit.

    Measures RSS growth to find when this becomes dangerous.
    """

    # Collection sizes (number of drawers) exercised by the parametrized test.
    SIZES = [1_000, 2_500, 5_000, 10_000]

    @pytest.mark.parametrize("n_drawers", SIZES)
    def test_get_all_metadatas_rss(self, n_drawers, tmp_path, bench_scale):
        """RSS growth from fetching all metadata at once."""
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)

        client = chromadb.PersistentClient(path=palace_path)
        col = client.get_collection("mempalace_drawers")

        # Only the unbounded get() sits between the RSS / clock readings.
        rss_before = _get_rss_mb()
        start = time.perf_counter()
        all_meta = col.get(include=["metadatas"])["metadatas"]
        elapsed_ms = (time.perf_counter() - start) * 1000
        rss_after = _get_rss_mb()

        assert len(all_meta) == n_drawers
        rss_delta = rss_after - rss_before

        record_metric("chromadb_get_all", f"rss_delta_mb_at_{n_drawers}", round(rss_delta, 2))
        record_metric("chromadb_get_all", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1))


@pytest.mark.benchmark
class TestQueryDegradation:
    """Measure query latency as collection grows."""

    # Collection sizes (number of drawers) exercised by the parametrized test.
    SIZES = [1_000, 2_500, 5_000, 10_000]

    @pytest.mark.parametrize("n_drawers", SIZES)
    def test_query_latency_at_size(self, n_drawers, tmp_path, bench_scale):
        """Run five fixed queries against a palace of ``n_drawers`` and record latency."""
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)

        client = chromadb.PersistentClient(path=palace_path)
        col = client.get_collection("mempalace_drawers")

        queries = [
            "authentication middleware optimization",
            "database connection pooling strategy",
            "error handling retry logic",
            "deployment pipeline configuration",
            "load balancer health check",
        ]

        latencies = []
        for q in queries:
            start = time.perf_counter()
            results = col.query(query_texts=[q], n_results=5, include=["documents", "distances"])
            elapsed_ms = (time.perf_counter() - start) * 1000
            latencies.append(elapsed_ms)
            assert results["documents"][0]  # got results

        avg_ms = sum(latencies) / len(latencies)
        # With five samples int(5 * 0.95) == 4, so "p95" is the slowest query.
        p95_ms = sorted(latencies)[int(len(latencies) * 0.95)]

        record_metric("chromadb_query", f"avg_latency_ms_at_{n_drawers}", round(avg_ms, 1))
        record_metric("chromadb_query", f"p95_latency_ms_at_{n_drawers}", round(p95_ms, 1))


@pytest.mark.benchmark
class TestBulkInsertPerformance:
    """Compare batch insertion vs sequential add_drawer pattern."""

    def test_sequential_vs_batched(self, tmp_path):
        """The current miner uses single-document add(). How much faster is batching?"""
        n_docs = 500
        gen = PalaceDataGenerator(seed=42)

        # Generate content
        contents = [gen._random_text(400, 800) for _ in range(n_docs)]

        # Sequential insertion (mimics add_drawer pattern)
        palace_seq = str(tmp_path / "seq")
        os.makedirs(palace_seq)
        client_seq = chromadb.PersistentClient(path=palace_seq)
        col_seq = client_seq.get_or_create_collection("mempalace_drawers")

        start = time.perf_counter()
        for i, content in enumerate(contents):
            col_seq.add(
                documents=[content],
                ids=[f"seq_{i}"],
                metadatas=[{"wing": "test", "room": "bench", "chunk_index": i}],
            )
        sequential_ms = (time.perf_counter() - start) * 1000

        # Batched insertion
        # A separate palace directory keeps the two runs from sharing state.
        palace_batch = str(tmp_path / "batch")
        os.makedirs(palace_batch)
        client_batch = chromadb.PersistentClient(path=palace_batch)
        col_batch = client_batch.get_or_create_collection("mempalace_drawers")

        batch_size = 100
        start = time.perf_counter()
        for batch_start in range(0, n_docs, batch_size):
            batch_end = min(batch_start + batch_size, n_docs)
            batch_docs = contents[batch_start:batch_end]
            batch_ids = [f"batch_{i}" for i in range(batch_start, batch_end)]
            batch_metas = [{"wing": "test", "room": "bench", "chunk_index": i} for i in range(batch_start, batch_end)]
            col_batch.add(documents=batch_docs, ids=batch_ids, metadatas=batch_metas)
        batched_ms = (time.perf_counter() - start) * 1000

        # Floor the denominator so a near-zero timing cannot divide by zero.
        speedup = sequential_ms / max(batched_ms, 0.01)

        assert col_seq.count() == n_docs
        assert col_batch.count() == n_docs

        record_metric("chromadb_insert", "sequential_ms", round(sequential_ms, 1))
        record_metric("chromadb_insert", "batched_ms", round(batched_ms, 1))
        record_metric("chromadb_insert", "speedup_ratio", round(speedup, 2))
        record_metric("chromadb_insert", "n_docs", n_docs)
        record_metric("chromadb_insert", "batch_size", batch_size)


@pytest.mark.benchmark
@pytest.mark.slow
class TestMaxCollectionSize:
    """Incrementally grow collection to find practical limits."""

    def test_incremental_growth(self, tmp_path, bench_scale):
        """Add drawers in batches, measure latency per batch."""
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        cfg = gen.cfg
        target = min(cfg["drawers"], 10_000)  # cap at 10K for this test

        palace_path = str(tmp_path / "palace")
        os.makedirs(palace_path)
        client = chromadb.PersistentClient(path=palace_path)
        col = client.get_or_create_collection("mempalace_drawers")

        batch_size = 500
        batch_times = []
        total_inserted = 0

        # batch_num is the absolute offset of the batch's first drawer.
        for batch_num in range(0, target, batch_size):
            n = min(batch_size, target - batch_num)
            # Document generation stays outside the timed region below.
            docs = [gen._random_text(400, 800) for _ in range(n)]
            ids = [f"growth_{batch_num + i}" for i in range(n)]
            metas = [
                {"wing": gen.wings[i % len(gen.wings)], "room": "bench", "chunk_index": i}
                for i in range(batch_num, batch_num + n)
            ]

            start = time.perf_counter()
            col.add(documents=docs, ids=ids, metadatas=metas)
            batch_ms = (time.perf_counter() - start) * 1000
            total_inserted += n
            batch_times.append({"at_size": total_inserted, "batch_ms": round(batch_ms, 1)})

        assert col.count() == total_inserted

        # Record first and last batch times to show degradation
        # NOTE(review): indexing batch_times assumes target > 0, i.e. that
        # cfg["drawers"] is positive for every scale — confirm.
        record_metric("chromadb_growth", "first_batch_ms", batch_times[0]["batch_ms"])
        record_metric("chromadb_growth", "last_batch_ms", batch_times[-1]["batch_ms"])
        record_metric("chromadb_growth", "total_inserted", total_inserted)
        record_metric("chromadb_growth", "batch_times", batch_times)
"""
Ingestion throughput benchmarks.

Measures mining performance at scale:
  - Files/sec and drawers/sec through the full mine() pipeline
  - Peak RSS during mining
  - Chunking throughput isolated from ChromaDB
  - Re-ingest skip overhead (finding #11: file_already_mined check)
"""

import os
import time

import chromadb
import pytest
import yaml

from tests.benchmarks.data_generator import PalaceDataGenerator
from tests.benchmarks.report import record_metric


def _get_rss_mb():
    """Return this process's resident set size in MB.

    Uses psutil's current RSS when psutil is importable. Otherwise falls back
    to ``resource.getrusage(...).ru_maxrss``, which is the *maximum* RSS
    reached so far rather than the current value.
    """
    try:
        import psutil

        return psutil.Process().memory_info().rss / (1024 * 1024)
    except ImportError:
        import resource
        import platform

        usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        # The unit of ru_maxrss differs by platform: the Darwin value is
        # scaled as bytes, everything else as kilobytes.
        if platform.system() == "Darwin":
            return usage / (1024 * 1024)
        return usage / 1024


@pytest.mark.benchmark
class TestMineThroughput:
    """Measure the full mine() pipeline throughput."""

    @pytest.mark.parametrize("n_files", [20, 50, 100])
    def test_mine_files_per_second(self, n_files, tmp_path, bench_scale):
        """End-to-end mining throughput: generate files, mine, count drawers."""
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        project_path, wing, rooms, files_written = gen.generate_project_tree(
            tmp_path / "project", n_files=n_files
        )
        palace_path = str(tmp_path / "palace")

        from mempalace.miner import mine

        start = time.perf_counter()
        mine(project_path, palace_path)
        elapsed = time.perf_counter() - start

        client = chromadb.PersistentClient(path=palace_path)
        col = client.get_collection("mempalace_drawers")
        drawer_count = col.count()

        # Floor the denominator so an extremely fast run cannot divide by zero.
        files_per_sec = files_written / max(elapsed, 0.001)
        drawers_per_sec = drawer_count / max(elapsed, 0.001)

        record_metric("ingest", f"files_per_sec_at_{n_files}", round(files_per_sec, 1))
        record_metric("ingest", f"drawers_per_sec_at_{n_files}", round(drawers_per_sec, 1))
        record_metric("ingest", f"elapsed_sec_at_{n_files}", round(elapsed, 2))
        record_metric("ingest", f"drawers_created_at_{n_files}", drawer_count)

    def test_mine_peak_rss(self, tmp_path, bench_scale):
        """Track peak RSS during a mining run."""
        import threading

        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        project_path, wing, rooms, files_written = gen.generate_project_tree(
            tmp_path / "project", n_files=100
        )
        palace_path = str(tmp_path / "palace")

        from mempalace.miner import mine

        rss_samples = []
        stop_sampling = threading.Event()

        def sample_rss():
            # Poll RSS roughly every 100 ms until asked to stop.
            while not stop_sampling.is_set():
                rss_samples.append(_get_rss_mb())
                stop_sampling.wait(0.1)

        sampler = threading.Thread(target=sample_rss, daemon=True)
        sampler.start()

        rss_before = _get_rss_mb()
        try:
            mine(project_path, palace_path)
        finally:
            # Always stop the sampler: if mine() raised, the thread would
            # otherwise keep polling for the rest of the test session.
            stop_sampling.set()
            sampler.join(timeout=1)

        peak_rss = max(rss_samples) if rss_samples else _get_rss_mb()
        rss_delta = peak_rss - rss_before

        record_metric("ingest", "peak_rss_mb", round(peak_rss, 1))
        record_metric("ingest", "rss_delta_mb", round(rss_delta, 1))


@pytest.mark.benchmark
class TestChunkThroughput:
    """Isolate chunking performance from ChromaDB insertion."""

    @pytest.mark.parametrize("content_size_kb", [1, 10, 100])
    def test_chunk_text_throughput(self, content_size_kb):
        """Measure chunk_text speed for different content sizes."""
        from mempalace.miner import chunk_text

        gen = PalaceDataGenerator(seed=42)
        # Generate content of target size
        content = gen._random_text(content_size_kb * 500, content_size_kb * 1200)
        # Pad to approximate target KB
        while len(content) < content_size_kb * 1024:
            content += "\n" + gen._random_text(200, 500)

        n_iterations = 50
        start = time.perf_counter()
        total_chunks = 0
        for _ in range(n_iterations):
            chunks = chunk_text(content, "bench_file.py")
            total_chunks += len(chunks)
        elapsed = time.perf_counter() - start

        chunks_per_sec = total_chunks / max(elapsed, 0.001)
        # len(content) counts characters, used here as an approximation of bytes.
        kb_per_sec = (len(content) * n_iterations / 1024) / max(elapsed, 0.001)

        record_metric("chunking", f"chunks_per_sec_at_{content_size_kb}kb", round(chunks_per_sec, 1))
        record_metric("chunking", f"kb_per_sec_at_{content_size_kb}kb", round(kb_per_sec, 1))


@pytest.mark.benchmark
class TestReingestSkipOverhead:
    """Finding #11: file_already_mined() check overhead at scale."""

    def test_skip_check_cost(self, tmp_path):
        """Mine files, then re-mine — measure cost of skip checks."""
        gen = PalaceDataGenerator(seed=42, scale="small")
        project_path, wing, rooms, files_written = gen.generate_project_tree(
            tmp_path / "project", n_files=50
        )
        palace_path = str(tmp_path / "palace")

        from mempalace.miner import mine

        # First mine
        mine(project_path, palace_path)
        client = chromadb.PersistentClient(path=palace_path)
        col = client.get_collection("mempalace_drawers")
        initial_count = col.count()

        # Re-mine (all files should be skipped)
        start = time.perf_counter()
        mine(project_path, palace_path)
        skip_elapsed = time.perf_counter() - start

        # Verify no new drawers added
        final_count = col.count()
        assert final_count == initial_count, "Re-mine should not add new drawers"

        record_metric("reingest", "skip_check_elapsed_sec", round(skip_elapsed, 2))
        record_metric("reingest", "files_checked", files_written)
        record_metric("reingest", "skip_check_per_file_ms", round(skip_elapsed * 1000 / max(files_written, 1), 1))
"""
Knowledge graph benchmarks — SQLite temporal KG at scale.

Tests triple insertion throughput, query latency, temporal accuracy,
and SQLite concurrent access behavior.
"""

import threading
import time

import pytest

from tests.benchmarks.data_generator import PalaceDataGenerator
from tests.benchmarks.report import record_metric


@pytest.mark.benchmark
class TestTripleInsertionRate:
    """Measure triples/sec at different scales."""

    @pytest.mark.parametrize("n_triples", [200, 1_000, 5_000])
    def test_insertion_throughput(self, n_triples, tmp_path):
        """Time add_triple() over a generated triple set; entities are inserted untimed."""
        gen = PalaceDataGenerator(seed=42, scale="small")
        entities, triples = gen.generate_kg_triples(
            n_entities=min(n_triples // 2, 200), n_triples=n_triples
        )

        from mempalace.knowledge_graph import KnowledgeGraph

        kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))

        # Insert entities first
        for name, etype in entities:
            kg.add_entity(name, etype)

        # Measure triple insertion
        start = time.perf_counter()
        for subject, predicate, obj, valid_from, valid_to in triples:
            kg.add_triple(
                subject, predicate, obj, valid_from=valid_from, valid_to=valid_to
            )
        elapsed = time.perf_counter() - start

        # NOTE(review): divides the requested n_triples, not len(triples);
        # assumes the generator returns exactly n_triples — confirm.
        triples_per_sec = n_triples / max(elapsed, 0.001)

        record_metric("kg_insert", f"triples_per_sec_at_{n_triples}", round(triples_per_sec, 1))
        record_metric("kg_insert", f"elapsed_sec_at_{n_triples}", round(elapsed, 3))


@pytest.mark.benchmark
class TestQueryEntityLatency:
    """Query latency for entities with varying relationship counts."""

    def test_query_latency_vs_relationships(self, tmp_path):
        """Attach 10 + 50 + 100 relationships to one hub entity and time query_entity() on it."""
        from mempalace.knowledge_graph import KnowledgeGraph

        kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))

        # Create a hub entity connected to many others
        kg.add_entity("Hub", "person")
        target_counts = [10, 50, 100]

        for target in target_counts:
            for i in range(target):
                entity_name = f"Node_{target}_{i}"
                kg.add_entity(entity_name, "project")
                kg.add_triple("Hub", "works_on", entity_name, valid_from="2025-01-01")

        # Measure query for Hub (which has sum(target_counts) relationships)
        latencies = []
        for _ in range(20):
            start = time.perf_counter()
            kg.query_entity("Hub")
            elapsed_ms = (time.perf_counter() - start) * 1000
            latencies.append(elapsed_ms)

        avg_ms = sum(latencies) / len(latencies)
        total_rels = sum(target_counts)

        record_metric("kg_query", f"avg_ms_with_{total_rels}_rels", round(avg_ms, 2))
        record_metric("kg_query", "total_relationships", total_rels)


@pytest.mark.benchmark
class TestTimelinePerformance:
    """timeline() with no entity filter does a full table scan."""

    @pytest.mark.parametrize("n_triples", [200, 1_000, 5_000])
    def test_timeline_latency(self, n_triples, tmp_path):
        """Average latency of ten unfiltered timeline() calls at each graph size."""
        from mempalace.knowledge_graph import KnowledgeGraph

        gen = PalaceDataGenerator(seed=42)
        entities, triples = gen.generate_kg_triples(
            n_entities=min(n_triples // 2, 200), n_triples=n_triples
        )

        kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))
        for name, etype in entities:
            kg.add_entity(name, etype)
        for subject, predicate, obj, valid_from, valid_to in triples:
            kg.add_triple(subject, predicate, obj, valid_from=valid_from, valid_to=valid_to)

        # Measure timeline (no filter = full scan with LIMIT 100)
        latencies = []
        for _ in range(10):
            start = time.perf_counter()
            kg.timeline()
            elapsed_ms = (time.perf_counter() - start) * 1000
            latencies.append(elapsed_ms)

        avg_ms = sum(latencies) / len(latencies)
        record_metric("kg_timeline", f"avg_ms_at_{n_triples}", round(avg_ms, 2))


@pytest.mark.benchmark
class TestTemporalQueryAccuracy:
    """Verify temporal filtering correctness at scale."""

    def test_as_of_filtering(self, tmp_path):
        """Insert triples with known temporal ranges, then record as_of query result counts.

        NOTE(review): this test records result counts but asserts nothing
        about which projects come back. Add assertions once the return shape
        of query_entity() is confirmed.
        """
        from mempalace.knowledge_graph import KnowledgeGraph

        kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))

        kg.add_entity("Alice", "person")
        kg.add_entity("ProjectA", "project")
        kg.add_entity("ProjectB", "project")

        # Alice worked on ProjectA from 2024-01 to 2024-06
        kg.add_triple("Alice", "works_on", "ProjectA", valid_from="2024-01-01", valid_to="2024-06-30")
        # Alice worked on ProjectB from 2024-07 onwards
        kg.add_triple("Alice", "works_on", "ProjectB", valid_from="2024-07-01")

        # Add noise triples
        gen = PalaceDataGenerator(seed=42)
        entities, triples = gen.generate_kg_triples(n_entities=50, n_triples=500)
        for name, etype in entities:
            kg.add_entity(name, etype)
        for subject, predicate, obj, valid_from, valid_to in triples:
            kg.add_triple(subject, predicate, obj, valid_from=valid_from, valid_to=valid_to)

        # Query Alice as of March 2024 — should find ProjectA
        result_march = kg.query_entity("Alice", as_of="2024-03-15")

        # Query Alice as of September 2024 — should find ProjectB
        result_sept = kg.query_entity("Alice", as_of="2024-09-15")

        # A non-list result is recorded as zero rows.
        record_metric("kg_temporal", "march_query_results", len(result_march) if isinstance(result_march, list) else 0)
        record_metric("kg_temporal", "sept_query_results", len(result_sept) if isinstance(result_sept, list) else 0)


@pytest.mark.benchmark
class TestSQLiteConcurrentAccess:
    """Test concurrent read/write behavior with SQLite (finding #8)."""

    def test_concurrent_writers(self, tmp_path):
        """N threads writing triples simultaneously — count lock failures."""
        from mempalace.knowledge_graph import KnowledgeGraph

        kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))

        # Pre-create entities
        for i in range(100):
            kg.add_entity(f"Entity_{i}", "concept")

        n_threads = 4
        triples_per_thread = 50
        lock_failures = []
        successes = []

        def writer(thread_id):
            # Any exception from add_triple counts as a failed write.
            fails = 0
            ok = 0
            for i in range(triples_per_thread):
                try:
                    kg.add_triple(
                        f"Entity_{thread_id * 10}",
                        "relates_to",
                        f"Entity_{(thread_id * 10 + i) % 100}",
                        valid_from="2025-01-01",
                    )
                    ok += 1
                except Exception:
                    fails += 1
            lock_failures.append(fails)
            successes.append(ok)

        threads = [threading.Thread(target=writer, args=(t,)) for t in range(n_threads)]
        start = time.perf_counter()
        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=30)
        elapsed = time.perf_counter() - start

        # join(timeout=...) returns silently on timeout; a still-running
        # writer has not reported its counts, so the totals would be wrong.
        assert not any(t.is_alive() for t in threads), "writer thread did not finish within 30s"

        total_failures = sum(lock_failures)
        total_successes = sum(successes)

        record_metric("kg_concurrent", "total_failures", total_failures)
        record_metric("kg_concurrent", "total_successes", total_successes)
        record_metric("kg_concurrent", "elapsed_sec", round(elapsed, 2))
        record_metric("kg_concurrent", "threads", n_threads)
        record_metric("kg_concurrent", "triples_per_thread", triples_per_thread)

    def test_concurrent_read_write(self, tmp_path):
        """Readers and writers running simultaneously."""
        from mempalace.knowledge_graph import KnowledgeGraph

        kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))

        # Seed some data
        for i in range(50):
            kg.add_entity(f"E_{i}", "concept")
        for i in range(200):
            kg.add_triple(f"E_{i % 50}", "links", f"E_{(i + 1) % 50}", valid_from="2025-01-01")

        read_errors = []
        write_errors = []

        def reader():
            # Each reader issues 50 entity queries and reports how many raised.
            fails = 0
            for i in range(50):
                try:
                    kg.query_entity(f"E_{i % 50}")
                except Exception:
                    fails += 1
            read_errors.append(fails)

        def writer():
            # Each writer inserts 50 triples and reports how many raised.
            fails = 0
            for i in range(50):
                try:
                    kg.add_triple(f"E_{i % 50}", "new_rel", f"E_{(i + 7) % 50}", valid_from="2025-06-01")
                except Exception:
                    fails += 1
            write_errors.append(fails)

        threads = [
            threading.Thread(target=reader),
            threading.Thread(target=reader),
            threading.Thread(target=writer),
            threading.Thread(target=writer),
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=30)

        # A worker that outlived its join timeout has not reported its error
        # count yet, which would make the sums below too low.
        assert not any(t.is_alive() for t in threads), "worker thread did not finish within 30s"

        record_metric("kg_concurrent_rw", "read_errors", sum(read_errors))
        record_metric("kg_concurrent_rw", "write_errors", sum(write_errors))


@pytest.mark.benchmark
class TestKGStats:
    """Measure stats() performance as graph grows."""

    @pytest.mark.parametrize("n_triples", [200, 1_000, 5_000])
    def test_stats_latency(self, n_triples, tmp_path):
        """Average latency of ten stats() calls at each graph size."""
        from mempalace.knowledge_graph import KnowledgeGraph

        gen = PalaceDataGenerator(seed=42)
        entities, triples = gen.generate_kg_triples(
            n_entities=min(n_triples // 2, 200), n_triples=n_triples
        )

        kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))
        for name, etype in entities:
            kg.add_entity(name, etype)
        for subject, predicate, obj, valid_from, valid_to in triples:
            kg.add_triple(subject, predicate, obj, valid_from=valid_from, valid_to=valid_to)

        latencies = []
        for _ in range(10):
            start = time.perf_counter()
            kg.stats()
            elapsed_ms = (time.perf_counter() - start) * 1000
            latencies.append(elapsed_ms)

        avg_ms = sum(latencies) / len(latencies)
        record_metric("kg_stats", f"avg_ms_at_{n_triples}", round(avg_ms, 2))
"""
Memory stack (layers.py) benchmarks.

Tests MemoryStack.wake_up(), Layer1.generate(), and Layer2/L3
at scale. Layer1 has the same unbounded col.get() as tool_status.
"""

import os
import time

import pytest

from tests.benchmarks.data_generator import PalaceDataGenerator
from tests.benchmarks.report import record_metric


def _get_rss_mb():
    """Return this process's resident set size in MB.

    Uses psutil's current RSS when psutil is importable. Otherwise falls back
    to ``resource.getrusage(...).ru_maxrss``, which is the *maximum* RSS
    reached so far rather than the current value.
    """
    try:
        import psutil

        return psutil.Process().memory_info().rss / (1024 * 1024)
    except ImportError:
        import resource
        import platform

        usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if platform.system() == "Darwin":
            return usage / (1024 * 1024)
        return usage / 1024


@pytest.mark.benchmark
class TestWakeUpCost:
    """Measure wake_up() time (L0 + L1) at different palace sizes."""

    SIZES = [500, 1_000, 2_500, 5_000]

    @pytest.mark.parametrize("n_drawers", SIZES)
    def test_wakeup_latency(self, n_drawers, tmp_path, bench_scale):
        """L0+L1 generation time grows with palace size because L1 fetches all."""
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)

        # Create identity file
        identity_path = str(tmp_path / "identity.txt")
        with open(identity_path, "w") as f:
            f.write("I am a test AI. Traits: precise, fast.\n")

        from mempalace.layers import MemoryStack

        stack = MemoryStack(palace_path=palace_path, identity_path=identity_path)

        latencies = []
        for _ in range(5):
            start = time.perf_counter()
            text = stack.wake_up()
            elapsed_ms = (time.perf_counter() - start) * 1000
            latencies.append(elapsed_ms)
            # Sanity check that something recognisable was generated.
            assert "L0" in text or "L1" in text or "IDENTITY" in text or "ESSENTIAL" in text

        avg_ms = sum(latencies) / len(latencies)
        record_metric("layers_wakeup", f"avg_ms_at_{n_drawers}", round(avg_ms, 1))


@pytest.mark.benchmark
class TestLayer1UnboundedFetch:
    """Layer1.generate() fetches ALL drawers — same pattern as tool_status."""

    SIZES = [500, 1_000, 2_500, 5_000]

    @pytest.mark.parametrize("n_drawers", SIZES)
    def test_layer1_rss_growth(self, n_drawers, tmp_path):
        """Track RSS from Layer1 fetching all drawers at different sizes."""
        gen = PalaceDataGenerator(seed=42, scale="small")
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)

        from mempalace.layers import Layer1

        layer = Layer1(palace_path=palace_path)

        rss_before = _get_rss_mb()
        start = time.perf_counter()
        text = layer.generate()
        elapsed_ms = (time.perf_counter() - start) * 1000
        rss_after = _get_rss_mb()

        rss_delta = rss_after - rss_before
        assert "L1" in text

        record_metric("layer1", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1))
        record_metric("layer1", f"rss_delta_mb_at_{n_drawers}", round(rss_delta, 2))

    def test_layer1_wing_filtered(self, tmp_path):
        """Wing-filtered Layer1 should fetch fewer drawers."""
        gen = PalaceDataGenerator(seed=42, scale="small")
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)

        from mempalace.layers import Layer1

        wing = gen.wings[0]

        # Unfiltered
        layer_all = Layer1(palace_path=palace_path)
        start = time.perf_counter()
        layer_all.generate()
        unfiltered_ms = (time.perf_counter() - start) * 1000

        # Wing-filtered
        layer_wing = Layer1(palace_path=palace_path, wing=wing)
        start = time.perf_counter()
        layer_wing.generate()
        filtered_ms = (time.perf_counter() - start) * 1000

        record_metric("layer1_filter", "unfiltered_ms", round(unfiltered_ms, 1))
        record_metric("layer1_filter", "filtered_ms", round(filtered_ms, 1))
        if unfiltered_ms > 0:
            record_metric("layer1_filter", "speedup_pct", round((1 - filtered_ms / unfiltered_ms) * 100, 1))


@pytest.mark.benchmark
class TestWakeUpTokenBudget:
    """Verify L0+L1 stays within token budget even at large palace sizes."""

    SIZES = [500, 1_000, 2_500, 5_000]

    @pytest.mark.parametrize("n_drawers", SIZES)
    def test_token_budget(self, n_drawers, tmp_path):
        """L1 has MAX_CHARS=3200 cap. Verify it holds at scale."""
        gen = PalaceDataGenerator(seed=42, scale="small")
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)

        identity_path = str(tmp_path / "identity.txt")
        with open(identity_path, "w") as f:
            f.write("I am a benchmark AI.\n")

        from mempalace.layers import MemoryStack

        stack = MemoryStack(palace_path=palace_path, identity_path=identity_path)
        text = stack.wake_up()
        # Rough heuristic: about four characters per token.
        token_estimate = len(text) // 4

        # Budget is ~600-900 tokens. Allow up to 1200 for safety margin.
        record_metric("wakeup_budget", f"tokens_at_{n_drawers}", token_estimate)
        record_metric("wakeup_budget", f"chars_at_{n_drawers}", len(text))

        assert token_estimate < 1200, f"Wake-up exceeded budget: ~{token_estimate} tokens at {n_drawers} drawers"


@pytest.mark.benchmark
class TestLayer2Retrieval:
    """Layer2 on-demand retrieval with filters."""

    def test_layer2_latency(self, tmp_path, bench_scale):
        """L2 retrieval with wing filter at scale."""
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)

        from mempalace.layers import Layer2

        layer = Layer2(palace_path=palace_path)
        wing = gen.wings[0]

        latencies = []
        for _ in range(10):
            start = time.perf_counter()
            layer.retrieve(wing=wing, n_results=10)
            elapsed_ms = (time.perf_counter() - start) * 1000
            latencies.append(elapsed_ms)

        avg_ms = sum(latencies) / len(latencies)
        record_metric("layer2", "avg_retrieval_ms", round(avg_ms, 1))


@pytest.mark.benchmark
class TestLayer3Search:
    """Layer3 semantic search through the MemoryStack interface."""

    def test_layer3_latency(self, tmp_path, bench_scale):
        """L3 search latency through MemoryStack."""
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)

        identity_path = str(tmp_path / "identity.txt")
        with open(identity_path, "w") as f:
            f.write("I am a benchmark AI.\n")

        from mempalace.layers import MemoryStack

        stack = MemoryStack(palace_path=palace_path, identity_path=identity_path)

        queries = ["authentication", "database", "deployment", "testing", "monitoring"]
        latencies = []
        for q in queries:
            start = time.perf_counter()
            stack.search(q, n_results=5)
            elapsed_ms = (time.perf_counter() - start) * 1000
            latencies.append(elapsed_ms)

        avg_ms = sum(latencies) / len(latencies)
        record_metric("layer3", "avg_search_ms", round(avg_ms, 1))


# ── tests/benchmarks/test_mcp_bench.py ───────────────────────────────────
"""
MCP server tool performance benchmarks.

Validates production readiness findings:
  - Finding #3: tool_status() unbounded col.get(include=["metadatas"]) → OOM
  - Finding #7: _get_collection() re-instantiates PersistentClient every call
  - Finding #3 variants: tool_list_wings(), tool_get_taxonomy() same pattern

Calls MCP tool handler functions directly with monkeypatched _config.
"""

import time

import chromadb
import pytest

from tests.benchmarks.data_generator import PalaceDataGenerator, SCALE_CONFIGS
from tests.benchmarks.report import record_metric


# ── Helpers ──────────────────────────────────────────────────────────────


def _make_palace(tmp_path, n_drawers, scale="small"):
    """Create a palace with exactly n_drawers, return palace_path."""
    gen = PalaceDataGenerator(seed=42, scale=scale)
    palace_path = str(tmp_path / "palace")
    gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)
    return palace_path


def _patch_mcp_config(monkeypatch, palace_path, tmp_path):
    """Monkeypatch mcp_server._config and _kg to point at test dirs."""
    from mempalace.config import MempalaceConfig
    from mempalace.knowledge_graph import KnowledgeGraph

    cfg = MempalaceConfig(config_dir=str(tmp_path / "cfg"))
    # Override palace_path directly on the object
    monkeypatch.setattr(cfg, "_file_config", {"palace_path": palace_path})

    import mempalace.mcp_server as mcp_mod

    monkeypatch.setattr(mcp_mod, "_config", cfg)
    monkeypatch.setattr(mcp_mod, "_kg", KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3")))


def _get_rss_mb():
    """Get current process RSS in MB."""
    try:
        import psutil

        return psutil.Process().memory_info().rss / (1024 * 1024)
    except ImportError:
        import resource

        # ru_maxrss is in KB on Linux, bytes on macOS
        import platform

        usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if platform.system() == "Darwin":
            return usage / (1024 * 1024)
        return usage / 1024


# ── Tests ────────────────────────────────────────────────────────────────


@pytest.mark.benchmark
class TestToolStatusOOM:
    """Finding #3: tool_status loads ALL metadata into memory."""

    SIZES = [500, 1_000, 2_500, 5_000]

    @pytest.mark.parametrize("n_drawers", SIZES)
    def test_tool_status_rss_growth(self, n_drawers, tmp_path, monkeypatch):
        """Measure RSS growth from tool_status at different palace sizes."""
        palace_path = _make_palace(tmp_path, n_drawers)
        _patch_mcp_config(monkeypatch, palace_path, tmp_path)

        from mempalace.mcp_server import tool_status

        rss_before = _get_rss_mb()
        result = tool_status()
        rss_after = _get_rss_mb()

        rss_delta = rss_after - rss_before
        assert "error" not in result, f"tool_status failed: {result}"
        assert result["total_drawers"] == n_drawers

        record_metric("mcp_status", f"rss_delta_mb_at_{n_drawers}", round(rss_delta, 2))

    @pytest.mark.parametrize("n_drawers", SIZES)
    def test_tool_status_latency(self, n_drawers, tmp_path, monkeypatch):
        """Measure tool_status response time at different palace sizes."""
        palace_path = _make_palace(tmp_path, n_drawers)
        _patch_mcp_config(monkeypatch, palace_path, tmp_path)

        from mempalace.mcp_server import tool_status

        # Warm up
        tool_status()

        start = time.perf_counter()
        result = tool_status()
        elapsed_ms = (time.perf_counter() - start) * 1000

        assert "error" not in result
        record_metric("mcp_status", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1))


@pytest.mark.benchmark
class TestToolListWingsUnbounded:
    """Finding #3 variant: tool_list_wings also fetches ALL metadata."""

    @pytest.mark.parametrize("n_drawers", [500, 1_000, 2_500, 5_000])
    def test_list_wings_latency(self, n_drawers, tmp_path, monkeypatch):
        """Time a single (cold) tool_list_wings() call at each palace size."""
        palace_path = _make_palace(tmp_path, n_drawers)
        _patch_mcp_config(monkeypatch, palace_path, tmp_path)

        from mempalace.mcp_server import tool_list_wings

        start = time.perf_counter()
        result = tool_list_wings()
        elapsed_ms = (time.perf_counter() - start) * 1000

        assert "wings" in result
        record_metric("mcp_list_wings", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1))


@pytest.mark.benchmark
class TestToolGetTaxonomyUnbounded:
    """Finding #3 variant: tool_get_taxonomy also fetches ALL metadata."""

    @pytest.mark.parametrize("n_drawers", [500, 1_000, 2_500, 5_000])
    def test_get_taxonomy_latency(self, n_drawers, tmp_path, monkeypatch):
        """Time a single (cold) tool_get_taxonomy() call at each palace size."""
        palace_path = _make_palace(tmp_path, n_drawers)
        _patch_mcp_config(monkeypatch, palace_path, tmp_path)

        from mempalace.mcp_server import tool_get_taxonomy

        start = time.perf_counter()
        result = tool_get_taxonomy()
        elapsed_ms = (time.perf_counter() - start) * 1000

        assert "taxonomy" in result
        record_metric("mcp_taxonomy", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1))


@pytest.mark.benchmark
class TestClientReinstantiation:
    """Finding #7: _get_collection() creates new PersistentClient every call."""

    def test_reinstantiation_overhead(self, tmp_path, monkeypatch):
        """Measure cost of 50 _get_collection() calls vs a cached client.

        Both timed loops run the same ``count()`` operation, so the ratio
        isolates the cost of obtaining the collection on every call.
        """
        palace_path = _make_palace(tmp_path, 500)
        _patch_mcp_config(monkeypatch, palace_path, tmp_path)

        from mempalace.mcp_server import _get_collection

        n_calls = 50

        # Measure re-instantiation (current behavior)
        start = time.perf_counter()
        for _ in range(n_calls):
            col = _get_collection()
            assert col is not None
            col.count()
        uncached_ms = (time.perf_counter() - start) * 1000

        # Measure cached client (what it should be)
        client = chromadb.PersistentClient(path=palace_path)
        cached_col = client.get_collection("mempalace_drawers")
        start = time.perf_counter()
        for _ in range(n_calls):
            cached_col.count()
        cached_ms = (time.perf_counter() - start) * 1000

        # Floor the denominator so a near-zero timing cannot divide by zero.
        overhead_ratio = uncached_ms / max(cached_ms, 0.01)

        record_metric("client_reinstantiation", "uncached_total_ms", round(uncached_ms, 1))
        record_metric("client_reinstantiation", "cached_total_ms", round(cached_ms, 1))
        record_metric("client_reinstantiation", "overhead_ratio", round(overhead_ratio, 2))
        record_metric("client_reinstantiation", "n_calls", n_calls)


@pytest.mark.benchmark
class TestToolSearchLatency:
    """tool_search uses query() not get(), should scale better."""

    @pytest.mark.parametrize("n_drawers", [500, 1_000, 2_500, 5_000])
    def test_search_latency(self, n_drawers, tmp_path, monkeypatch):
        """Average tool_search() latency over three fixed queries at each palace size."""
        palace_path = _make_palace(tmp_path, n_drawers)
        _patch_mcp_config(monkeypatch, palace_path, tmp_path)

        from mempalace.mcp_server import tool_search

        queries = ["authentication middleware", "database migration", "error handling"]
        latencies = []
        for q in queries:
            start = time.perf_counter()
            result = tool_search(query=q, limit=5)
            elapsed_ms = (time.perf_counter() - start) * 1000
            latencies.append(elapsed_ms)
            assert "error" not in result

        avg_ms = sum(latencies) / len(latencies)
        record_metric("mcp_search", f"avg_latency_ms_at_{n_drawers}", round(avg_ms, 1))


@pytest.mark.benchmark
class TestDuplicateCheckCost:
    """tool_add_drawer calls tool_check_duplicate first — measure overhead."""

    @pytest.mark.parametrize("n_drawers", [500, 1_000, 2_500])
    def test_duplicate_check_latency(self, n_drawers, tmp_path, monkeypatch):
        palace_path = _make_palace(tmp_path, n_drawers)
        _patch_mcp_config(monkeypatch, palace_path, tmp_path)

        from mempalace.mcp_server import tool_check_duplicate

        test_content = "This is unique test content for duplicate checking benchmark."
+ start = time.perf_counter() + result = tool_check_duplicate(content=test_content) + elapsed_ms = (time.perf_counter() - start) * 1000 + + assert "error" not in result + record_metric("mcp_duplicate_check", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1)) diff --git a/tests/benchmarks/test_memory_profile.py b/tests/benchmarks/test_memory_profile.py new file mode 100644 index 0000000..2b30688 --- /dev/null +++ b/tests/benchmarks/test_memory_profile.py @@ -0,0 +1,178 @@ +""" +Memory profiling benchmarks — detect leaks and measure RSS growth. + +Uses tracemalloc for heap snapshots and psutil/resource for RSS. +Targets the highest-risk code paths: + - Repeated search() calls (PersistentClient re-instantiation) + - Repeated tool_status() calls (unbounded metadata fetch) + - Layer1.generate() (fetches all drawers) +""" + +import time +import tracemalloc + +import pytest + +from tests.benchmarks.data_generator import PalaceDataGenerator +from tests.benchmarks.report import record_metric + + +def _get_rss_mb(): + try: + import psutil + + return psutil.Process().memory_info().rss / (1024 * 1024) + except ImportError: + import resource + import platform + + usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + if platform.system() == "Darwin": + return usage / (1024 * 1024) + return usage / 1024 + + +@pytest.mark.benchmark +class TestSearchMemoryProfile: + """Track RSS growth over repeated search_memories() calls.""" + + def test_search_rss_growth(self, tmp_path): + """Issue 200 searches and track RSS every 50 calls.""" + gen = PalaceDataGenerator(seed=42, scale="small") + palace_path = str(tmp_path / "palace") + gen.populate_palace_directly(palace_path, n_drawers=1_000, include_needles=False) + + from mempalace.searcher import search_memories + + n_calls = 200 + check_interval = 50 + queries = ["authentication", "database", "deployment", "error handling", "testing"] + rss_readings = [] + rss_readings.append(("start", _get_rss_mb())) + + for i in range(n_calls): + 
q = queries[i % len(queries)] + search_memories(q, palace_path=palace_path, n_results=5) + if (i + 1) % check_interval == 0: + rss_readings.append((f"after_{i + 1}", _get_rss_mb())) + + start_rss = rss_readings[0][1] + end_rss = rss_readings[-1][1] + growth = end_rss - start_rss + + record_metric("memory_search", "rss_start_mb", round(start_rss, 2)) + record_metric("memory_search", "rss_end_mb", round(end_rss, 2)) + record_metric("memory_search", "rss_growth_mb", round(growth, 2)) + record_metric("memory_search", "n_calls", n_calls) + record_metric("memory_search", "growth_per_100_calls_mb", round(growth / (n_calls / 100), 2)) + + +@pytest.mark.benchmark +class TestToolStatusMemoryProfile: + """Track RSS growth from repeated tool_status() calls.""" + + def test_tool_status_repeated_calls(self, tmp_path, monkeypatch): + """tool_status loads ALL metadata each call — does it leak?""" + gen = PalaceDataGenerator(seed=42, scale="small") + palace_path = str(tmp_path / "palace") + gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False) + + from mempalace.config import MempalaceConfig + from mempalace.knowledge_graph import KnowledgeGraph + import mempalace.mcp_server as mcp_mod + + cfg = MempalaceConfig(config_dir=str(tmp_path / "cfg")) + monkeypatch.setattr(cfg, "_file_config", {"palace_path": palace_path}) + monkeypatch.setattr(mcp_mod, "_config", cfg) + monkeypatch.setattr(mcp_mod, "_kg", KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))) + + from mempalace.mcp_server import tool_status + + n_calls = 50 + rss_readings = [] + rss_readings.append(("start", _get_rss_mb())) + + for i in range(n_calls): + result = tool_status() + assert result["total_drawers"] == 2_000 + if (i + 1) % 10 == 0: + rss_readings.append((f"after_{i + 1}", _get_rss_mb())) + + start_rss = rss_readings[0][1] + end_rss = rss_readings[-1][1] + growth = end_rss - start_rss + + record_metric("memory_tool_status", "rss_start_mb", round(start_rss, 2)) + 
record_metric("memory_tool_status", "rss_end_mb", round(end_rss, 2)) + record_metric("memory_tool_status", "rss_growth_mb", round(growth, 2)) + record_metric("memory_tool_status", "n_calls", n_calls) + record_metric("memory_tool_status", "palace_size", 2_000) + + +@pytest.mark.benchmark +class TestLayer1MemoryProfile: + """Layer1.generate() fetches ALL drawers — same risk as tool_status.""" + + def test_layer1_repeated_generate(self, tmp_path): + """Layer1 fetches all drawers for scoring. Track memory over repeats.""" + gen = PalaceDataGenerator(seed=42, scale="small") + palace_path = str(tmp_path / "palace") + gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False) + + from mempalace.layers import Layer1 + + layer = Layer1(palace_path=palace_path) + + n_calls = 30 + rss_readings = [] + rss_readings.append(("start", _get_rss_mb())) + + for i in range(n_calls): + text = layer.generate() + assert "L1" in text + if (i + 1) % 10 == 0: + rss_readings.append((f"after_{i + 1}", _get_rss_mb())) + + start_rss = rss_readings[0][1] + end_rss = rss_readings[-1][1] + growth = end_rss - start_rss + + record_metric("memory_layer1", "rss_start_mb", round(start_rss, 2)) + record_metric("memory_layer1", "rss_end_mb", round(end_rss, 2)) + record_metric("memory_layer1", "rss_growth_mb", round(growth, 2)) + record_metric("memory_layer1", "n_calls", n_calls) + + +@pytest.mark.benchmark +class TestHeapSnapshot: + """Use tracemalloc to identify top memory allocators during search.""" + + def test_search_heap_top_allocators(self, tmp_path): + """Identify which code paths allocate the most memory during search.""" + gen = PalaceDataGenerator(seed=42, scale="small") + palace_path = str(tmp_path / "palace") + gen.populate_palace_directly(palace_path, n_drawers=1_000, include_needles=False) + + from mempalace.searcher import search_memories + + tracemalloc.start() + snap_before = tracemalloc.take_snapshot() + + for i in range(100): + search_memories("test query", 
palace_path=palace_path, n_results=5) + + snap_after = tracemalloc.take_snapshot() + tracemalloc.stop() + + stats = snap_after.compare_to(snap_before, "lineno") + top_allocators = [] + for stat in stats[:10]: + top_allocators.append({ + "file": str(stat.traceback), + "size_kb": round(stat.size / 1024, 1), + "count": stat.count, + }) + + total_growth_kb = sum(s["size_kb"] for s in top_allocators) + record_metric("heap_search", "top_10_growth_kb", round(total_growth_kb, 1)) + record_metric("heap_search", "n_searches", 100) diff --git a/tests/benchmarks/test_palace_boost.py b/tests/benchmarks/test_palace_boost.py new file mode 100644 index 0000000..6994313 --- /dev/null +++ b/tests/benchmarks/test_palace_boost.py @@ -0,0 +1,172 @@ +""" +Palace boost validation — does wing/room filtering actually help? + +Quantifies the retrieval improvement from the palace spatial metaphor. +Uses planted needles to measure recall with and without filtering +at different scales. +""" + +import time + +import pytest + +from tests.benchmarks.data_generator import PalaceDataGenerator +from tests.benchmarks.report import record_metric + + +@pytest.mark.benchmark +class TestFilteredVsUnfilteredRecall: + """Quantify palace boost: recall improvement from wing/room filtering.""" + + SIZES = [1_000, 2_500, 5_000] + + @pytest.mark.parametrize("n_drawers", SIZES) + def test_palace_boost_recall(self, n_drawers, tmp_path, bench_scale): + """Compare recall@5 with/without wing filter at increasing scale.""" + gen = PalaceDataGenerator(seed=42, scale=bench_scale) + palace_path = str(tmp_path / "palace") + _, _, needle_info = gen.populate_palace_directly( + palace_path, n_drawers=n_drawers, include_needles=True + ) + + from mempalace.searcher import search_memories + + n_queries = min(10, len(needle_info)) + unfiltered_hits = 0 + wing_filtered_hits = 0 + room_filtered_hits = 0 + + for needle in needle_info[:n_queries]: + # Unfiltered search + result = search_memories(needle["query"], 
palace_path=palace_path, n_results=5) + texts = [h["text"] for h in result.get("results", [])] + if any("NEEDLE_" in t for t in texts[:5]): + unfiltered_hits += 1 + + # Wing-filtered search + result = search_memories( + needle["query"], palace_path=palace_path, wing=needle["wing"], n_results=5 + ) + texts = [h["text"] for h in result.get("results", [])] + if any("NEEDLE_" in t for t in texts[:5]): + wing_filtered_hits += 1 + + # Wing+room filtered search + result = search_memories( + needle["query"], + palace_path=palace_path, + wing=needle["wing"], + room=needle["room"], + n_results=5, + ) + texts = [h["text"] for h in result.get("results", [])] + if any("NEEDLE_" in t for t in texts[:5]): + room_filtered_hits += 1 + + recall_none = unfiltered_hits / max(n_queries, 1) + recall_wing = wing_filtered_hits / max(n_queries, 1) + recall_room = room_filtered_hits / max(n_queries, 1) + + boost_wing = recall_wing - recall_none + boost_room = recall_room - recall_none + + record_metric("palace_boost", f"recall_unfiltered_at_{n_drawers}", round(recall_none, 3)) + record_metric("palace_boost", f"recall_wing_filtered_at_{n_drawers}", round(recall_wing, 3)) + record_metric("palace_boost", f"recall_room_filtered_at_{n_drawers}", round(recall_room, 3)) + record_metric("palace_boost", f"wing_boost_at_{n_drawers}", round(boost_wing, 3)) + record_metric("palace_boost", f"room_boost_at_{n_drawers}", round(boost_room, 3)) + + +@pytest.mark.benchmark +class TestFilterLatencyBenefit: + """Does filtering reduce query latency by narrowing the search space?""" + + def test_filter_speedup(self, tmp_path, bench_scale): + """Compare latency: no filter vs wing vs wing+room.""" + gen = PalaceDataGenerator(seed=42, scale=bench_scale) + palace_path = str(tmp_path / "palace") + gen.populate_palace_directly(palace_path, n_drawers=5_000, include_needles=False) + + from mempalace.searcher import search_memories + + wing = gen.wings[0] + room = gen.rooms_by_wing[wing][0] + query = "authentication 
middleware optimization" + n_runs = 10 + + # No filter + latencies_none = [] + for _ in range(n_runs): + start = time.perf_counter() + search_memories(query, palace_path=palace_path, n_results=5) + latencies_none.append((time.perf_counter() - start) * 1000) + + # Wing filter + latencies_wing = [] + for _ in range(n_runs): + start = time.perf_counter() + search_memories(query, palace_path=palace_path, wing=wing, n_results=5) + latencies_wing.append((time.perf_counter() - start) * 1000) + + # Wing + room filter + latencies_room = [] + for _ in range(n_runs): + start = time.perf_counter() + search_memories(query, palace_path=palace_path, wing=wing, room=room, n_results=5) + latencies_room.append((time.perf_counter() - start) * 1000) + + avg_none = sum(latencies_none) / len(latencies_none) + avg_wing = sum(latencies_wing) / len(latencies_wing) + avg_room = sum(latencies_room) / len(latencies_room) + + record_metric("filter_latency", "avg_unfiltered_ms", round(avg_none, 1)) + record_metric("filter_latency", "avg_wing_filtered_ms", round(avg_wing, 1)) + record_metric("filter_latency", "avg_room_filtered_ms", round(avg_room, 1)) + if avg_none > 0: + record_metric("filter_latency", "wing_speedup_pct", round((1 - avg_wing / avg_none) * 100, 1)) + record_metric("filter_latency", "room_speedup_pct", round((1 - avg_room / avg_none) * 100, 1)) + + +@pytest.mark.benchmark +class TestBoostAtIncreasingScale: + """Does the palace boost increase as the palace grows?""" + + def test_boost_scaling(self, tmp_path, bench_scale): + """Measure wing-filtered recall improvement at multiple sizes.""" + sizes = [500, 1_000, 2_500] + boosts = [] + + for size in sizes: + gen = PalaceDataGenerator(seed=42, scale=bench_scale) + palace_path = str(tmp_path / f"palace_{size}") + _, _, needle_info = gen.populate_palace_directly( + palace_path, n_drawers=size, include_needles=True + ) + + from mempalace.searcher import search_memories + + n_queries = min(8, len(needle_info)) + unfiltered_hits = 0 + 
filtered_hits = 0 + + for needle in needle_info[:n_queries]: + result = search_memories(needle["query"], palace_path=palace_path, n_results=5) + if any("NEEDLE_" in h["text"] for h in result.get("results", [])[:5]): + unfiltered_hits += 1 + + result = search_memories( + needle["query"], palace_path=palace_path, wing=needle["wing"], n_results=5 + ) + if any("NEEDLE_" in h["text"] for h in result.get("results", [])[:5]): + filtered_hits += 1 + + recall_none = unfiltered_hits / max(n_queries, 1) + recall_filtered = filtered_hits / max(n_queries, 1) + boost = recall_filtered - recall_none + boosts.append({"size": size, "boost": boost}) + + record_metric("boost_scaling", "boosts_by_size", boosts) + # Check if boost increases with scale (the hypothesis) + if len(boosts) >= 2: + trend_positive = boosts[-1]["boost"] >= boosts[0]["boost"] + record_metric("boost_scaling", "trend_positive", trend_positive) diff --git a/tests/benchmarks/test_search_bench.py b/tests/benchmarks/test_search_bench.py new file mode 100644 index 0000000..274c693 --- /dev/null +++ b/tests/benchmarks/test_search_bench.py @@ -0,0 +1,225 @@ +""" +Search performance benchmarks. + +Measures query latency, recall@k, and concurrent search behavior +as palace size grows. Uses planted needles for recall measurement. 
+""" + +import time +from concurrent.futures import ThreadPoolExecutor, as_completed + +import pytest + +from tests.benchmarks.data_generator import PalaceDataGenerator +from tests.benchmarks.report import record_metric + + +@pytest.mark.benchmark +class TestSearchLatencyVsSize: + """Query latency scaling as palace grows.""" + + SIZES = [500, 1_000, 2_500, 5_000] + + @pytest.mark.parametrize("n_drawers", SIZES) + def test_search_latency_curve(self, n_drawers, tmp_path, bench_scale): + """Measure average search latency at different palace sizes.""" + gen = PalaceDataGenerator(seed=42, scale=bench_scale) + palace_path = str(tmp_path / "palace") + gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False) + + from mempalace.searcher import search_memories + + queries = [ + "authentication middleware", + "database optimization", + "error handling patterns", + "deployment configuration", + "testing strategy", + ] + + latencies = [] + for q in queries: + start = time.perf_counter() + result = search_memories(q, palace_path=palace_path, n_results=5) + elapsed_ms = (time.perf_counter() - start) * 1000 + latencies.append(elapsed_ms) + assert "error" not in result + + avg_ms = sum(latencies) / len(latencies) + sorted_lat = sorted(latencies) + p50_ms = sorted_lat[len(sorted_lat) // 2] + p95_ms = sorted_lat[int(len(sorted_lat) * 0.95)] + + record_metric("search", f"avg_latency_ms_at_{n_drawers}", round(avg_ms, 1)) + record_metric("search", f"p50_ms_at_{n_drawers}", round(p50_ms, 1)) + record_metric("search", f"p95_ms_at_{n_drawers}", round(p95_ms, 1)) + + +@pytest.mark.benchmark +class TestSearchRecallAtScale: + """Planted needle recall — does accuracy degrade as palace grows?""" + + SIZES = [500, 1_000, 2_500, 5_000] + + @pytest.mark.parametrize("n_drawers", SIZES) + def test_recall_at_k(self, n_drawers, tmp_path, bench_scale): + """Recall@5 and Recall@10 using planted needles.""" + gen = PalaceDataGenerator(seed=42, scale=bench_scale) + palace_path 
= str(tmp_path / "palace") + _, _, needle_info = gen.populate_palace_directly( + palace_path, n_drawers=n_drawers, include_needles=True + ) + + from mempalace.searcher import search_memories + + hits_at_5 = 0 + hits_at_10 = 0 + total_needle_queries = min(10, len(needle_info)) + + for needle in needle_info[:total_needle_queries]: + result = search_memories( + needle["query"], palace_path=palace_path, n_results=10 + ) + if "error" in result: + continue + + texts = [h["text"] for h in result.get("results", [])] + + # Check if needle content appears in top 5 + found_at_5 = any("NEEDLE_" in t for t in texts[:5]) + found_at_10 = any("NEEDLE_" in t for t in texts[:10]) + + if found_at_5: + hits_at_5 += 1 + if found_at_10: + hits_at_10 += 1 + + recall_at_5 = hits_at_5 / max(total_needle_queries, 1) + recall_at_10 = hits_at_10 / max(total_needle_queries, 1) + + record_metric("search_recall", f"recall_at_5_at_{n_drawers}", round(recall_at_5, 3)) + record_metric("search_recall", f"recall_at_10_at_{n_drawers}", round(recall_at_10, 3)) + + +@pytest.mark.benchmark +class TestSearchFilteredVsUnfiltered: + """Compare search performance with and without wing/room filters.""" + + def test_filter_impact(self, tmp_path, bench_scale): + """Measure latency and recall difference with wing filtering.""" + gen = PalaceDataGenerator(seed=42, scale=bench_scale) + palace_path = str(tmp_path / "palace") + _, _, needle_info = gen.populate_palace_directly( + palace_path, n_drawers=2_000, include_needles=True + ) + + from mempalace.searcher import search_memories + + filtered_latencies = [] + unfiltered_latencies = [] + filtered_hits = 0 + unfiltered_hits = 0 + n_queries = min(10, len(needle_info)) + + for needle in needle_info[:n_queries]: + # Unfiltered + start = time.perf_counter() + result_unfiltered = search_memories( + needle["query"], palace_path=palace_path, n_results=5 + ) + unfiltered_latencies.append((time.perf_counter() - start) * 1000) + if any("NEEDLE_" in h["text"] for h in 
result_unfiltered.get("results", [])[:5]): + unfiltered_hits += 1 + + # Filtered by wing + start = time.perf_counter() + result_filtered = search_memories( + needle["query"], + palace_path=palace_path, + wing=needle["wing"], + n_results=5, + ) + filtered_latencies.append((time.perf_counter() - start) * 1000) + if any("NEEDLE_" in h["text"] for h in result_filtered.get("results", [])[:5]): + filtered_hits += 1 + + avg_unfiltered = sum(unfiltered_latencies) / max(len(unfiltered_latencies), 1) + avg_filtered = sum(filtered_latencies) / max(len(filtered_latencies), 1) + latency_improvement = ((avg_unfiltered - avg_filtered) / max(avg_unfiltered, 0.01)) * 100 + + record_metric("search_filter", "avg_unfiltered_ms", round(avg_unfiltered, 1)) + record_metric("search_filter", "avg_filtered_ms", round(avg_filtered, 1)) + record_metric("search_filter", "latency_improvement_pct", round(latency_improvement, 1)) + record_metric("search_filter", "unfiltered_recall_at_5", round(unfiltered_hits / max(n_queries, 1), 3)) + record_metric("search_filter", "filtered_recall_at_5", round(filtered_hits / max(n_queries, 1), 3)) + + +@pytest.mark.benchmark +class TestConcurrentSearch: + """Concurrent query performance — tests PersistentClient contention.""" + + def test_concurrent_queries(self, tmp_path): + """Issue N simultaneous queries and measure p50/p95/p99.""" + gen = PalaceDataGenerator(seed=42, scale="small") + palace_path = str(tmp_path / "palace") + gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False) + + from mempalace.searcher import search_memories + + queries = [ + "authentication", "database", "deployment", "error handling", + "testing", "monitoring", "caching", "middleware", + "serialization", "validation", + ] * 3 # 30 total queries + + def run_search(query): + start = time.perf_counter() + result = search_memories(query, palace_path=palace_path, n_results=5) + elapsed = (time.perf_counter() - start) * 1000 + return elapsed, "error" not in result 
+ + # Concurrent execution + latencies = [] + errors = 0 + with ThreadPoolExecutor(max_workers=4) as executor: + futures = {executor.submit(run_search, q): q for q in queries} + for future in as_completed(futures): + elapsed, success = future.result() + latencies.append(elapsed) + if not success: + errors += 1 + + sorted_lat = sorted(latencies) + n = len(sorted_lat) + + record_metric("concurrent_search", "p50_ms", round(sorted_lat[n // 2], 1)) + record_metric("concurrent_search", "p95_ms", round(sorted_lat[int(n * 0.95)], 1)) + record_metric("concurrent_search", "p99_ms", round(sorted_lat[int(n * 0.99)], 1)) + record_metric("concurrent_search", "avg_ms", round(sum(sorted_lat) / n, 1)) + record_metric("concurrent_search", "error_count", errors) + record_metric("concurrent_search", "total_queries", len(queries)) + record_metric("concurrent_search", "workers", 4) + + +@pytest.mark.benchmark +class TestSearchNResultsScaling: + """How does n_results affect query latency?""" + + @pytest.mark.parametrize("n_results", [1, 5, 10, 25, 50]) + def test_n_results_latency(self, n_results, tmp_path): + gen = PalaceDataGenerator(seed=42, scale="small") + palace_path = str(tmp_path / "palace") + gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False) + + from mempalace.searcher import search_memories + + latencies = [] + for _ in range(5): + start = time.perf_counter() + result = search_memories( + "authentication middleware", palace_path=palace_path, n_results=n_results + ) + latencies.append((time.perf_counter() - start) * 1000) + + avg_ms = sum(latencies) / len(latencies) + record_metric("search_n_results", f"avg_ms_at_n_{n_results}", round(avg_ms, 1)) From e8017ca2ec07c37db9272fe49df84c5bc9d0805a Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Wed, 8 Apr 2026 05:01:51 -0300 Subject: [PATCH 2/5] bench: add per-room recall threshold test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit Concentrates all drawers into a single wing+room to isolate the embedding model's retrieval limit independent of palace filtering. Confirms recall degrades to ~0.4-0.5 at 5K drawers per room even with wing+room filters applied — the spatial structure helps by keeping buckets small, but can't fix the underlying embedding ceiling. --- tests/benchmarks/README.md | 14 +- tests/benchmarks/test_recall_threshold.py | 179 ++++++++++++++++++++++ 2 files changed, 187 insertions(+), 6 deletions(-) create mode 100644 tests/benchmarks/test_recall_threshold.py diff --git a/tests/benchmarks/README.md b/tests/benchmarks/README.md index bfa5963..965afee 100644 --- a/tests/benchmarks/README.md +++ b/tests/benchmarks/README.md @@ -1,6 +1,6 @@ # MemPalace Scale Benchmark Suite -94 tests that benchmark mempalace at scale to validate real-world performance limits. +106 tests that benchmark mempalace at scale to validate real-world performance limits. ## Why @@ -11,6 +11,7 @@ MemPalace has strong academic scores (96.6% R@5 on LongMemEval) but no empirical - Modified files are never re-ingested — what's the skip-check cost at scale? - How does query latency degrade as the palace grows from 1K to 100K drawers? - Does wing/room filtering actually improve retrieval, and by how much? +- At what per-room drawer count does recall break regardless of filtering? This suite finds those answers. @@ -20,7 +21,7 @@ This suite finds those answers. 
# Fast smoke test (~2 min) uv run pytest tests/benchmarks/ -v --bench-scale=small -m "benchmark and not slow" -# Full small scale (~30 min) +# Full small scale (~35 min) uv run pytest tests/benchmarks/ -v --bench-scale=small # Medium scale with JSON report @@ -61,6 +62,7 @@ uv run pytest tests/benchmarks/ -v --bench-scale=stress -m stress | File | What it tests | |------|--------------| | `test_palace_boost.py` | Retrieval improvement from wing/room filtering at different scales | +| `test_recall_threshold.py` | Per-room recall ceiling — isolates embedding model limit with all drawers in one bucket | | `test_knowledge_graph_bench.py` | Triple insertion rate, temporal query accuracy, SQLite concurrent access | | `test_layers_bench.py` | MemoryStack wake-up cost, Layer1 unbounded fetch, token budget compliance | @@ -68,10 +70,10 @@ uv run pytest tests/benchmarks/ -v --bench-scale=stress -m stress ``` tests/benchmarks/ - conftest.py # --bench-scale / --bench-report CLI options, fixtures, markers - data_generator.py # Deterministic data factory (seeded RNG, planted needles) - report.py # JSON report writer + regression checker - test_*.py # 8 test modules (94 tests total) + conftest.py # --bench-scale / --bench-report CLI options, fixtures, markers + data_generator.py # Deterministic data factory (seeded RNG, planted needles) + report.py # JSON report writer + regression checker + test_*.py # 9 test modules (106 tests total) ``` ### Data Generator diff --git a/tests/benchmarks/test_recall_threshold.py b/tests/benchmarks/test_recall_threshold.py new file mode 100644 index 0000000..48f590b --- /dev/null +++ b/tests/benchmarks/test_recall_threshold.py @@ -0,0 +1,179 @@ +""" +Recall threshold test — find the per-bucket size where retrieval breaks. + +The palace_boost tests showed room-filtered recall of 1.0, but only because +each room had ~333 drawers. This test concentrates ALL drawers into a single +wing+room to find the actual embedding model limit. 
+""" + +import hashlib +import os +import time +from datetime import datetime + +import chromadb +import pytest + +from tests.benchmarks.data_generator import PalaceDataGenerator +from tests.benchmarks.report import record_metric + + +NEEDLE_TOPICS = [ + "Fibonacci sequence optimization uses memoization with O(n) space complexity", + "PostgreSQL vacuum autovacuum threshold set to 50 percent for table users", + "Redis cluster failover timeout configured at 30 seconds with sentinel monitoring", + "Kubernetes horizontal pod autoscaler targets 70 percent CPU utilization", + "GraphQL subscription uses WebSocket transport with heartbeat interval 25 seconds", + "JWT token rotation policy requires refresh every 15 minutes with sliding window", + "Elasticsearch index sharding strategy uses 5 primary shards with 1 replica each", + "Docker multi-stage build reduces image size from 1.2GB to 180MB for production", + "Apache Kafka consumer group rebalance timeout set to 45 seconds", + "MongoDB change streams resume token persisted every 100 operations", +] + +NEEDLE_QUERIES = [ + "Fibonacci sequence optimization memoization", + "PostgreSQL vacuum autovacuum threshold", + "Redis cluster failover timeout sentinel", + "Kubernetes horizontal pod autoscaler CPU", + "GraphQL subscription WebSocket heartbeat", + "JWT token rotation policy refresh", + "Elasticsearch index sharding primary replica", + "Docker multi-stage build image size production", + "Apache Kafka consumer group rebalance", + "MongoDB change streams resume token", +] + + +def _populate_single_room(palace_path, n_drawers, n_needles=10): + """Pack all drawers into one wing+room, plant needles, return queries.""" + gen = PalaceDataGenerator(seed=42, scale="small") + os.makedirs(palace_path, exist_ok=True) + client = chromadb.PersistentClient(path=palace_path) + col = client.get_or_create_collection("mempalace_drawers") + + batch_size = 500 + docs, ids, metas = [], [], [] + + # Plant needles + for i in range(n_needles): + 
needle_id = f"NEEDLE_{i:04d}" + content = f"{needle_id}: {NEEDLE_TOPICS[i]}. Unique planted needle for threshold test." + drawer_id = f"drawer_single_room_{hashlib.md5(needle_id.encode()).hexdigest()[:16]}" + docs.append(content) + ids.append(drawer_id) + metas.append({ + "wing": "concentrated", + "room": "single_room", + "source_file": f"needle_{i}.txt", + "chunk_index": 0, + "added_by": "threshold_bench", + "filed_at": datetime.now().isoformat(), + }) + + # Fill with noise — all in the SAME room + remaining = n_drawers - len(docs) + for i in range(remaining): + content = gen._random_text(400, 800) + drawer_id = f"drawer_single_room_{hashlib.md5(f'noise_{i}'.encode()).hexdigest()[:16]}" + docs.append(content) + ids.append(drawer_id) + metas.append({ + "wing": "concentrated", + "room": "single_room", + "source_file": f"noise_{i:06d}.txt", + "chunk_index": i % 10, + "added_by": "threshold_bench", + "filed_at": datetime.now().isoformat(), + }) + + if len(docs) >= batch_size: + col.add(documents=docs, ids=ids, metadatas=metas) + docs, ids, metas = [], [], [] + + if docs: + col.add(documents=docs, ids=ids, metadatas=metas) + + return client, col + + +@pytest.mark.benchmark +class TestRecallThresholdSingleRoom: + """ + All drawers in one room — isolates the embedding model's retrieval limit. + + Room filtering can't help here. This is the true ceiling. 
+ """ + + SIZES = [250, 500, 1_000, 2_000, 3_000, 5_000] + + @pytest.mark.parametrize("n_drawers", SIZES) + def test_single_room_recall(self, n_drawers, tmp_path): + """Recall@5 and @10 with all drawers in one bucket.""" + palace_path = str(tmp_path / "palace") + _populate_single_room(palace_path, n_drawers, n_needles=10) + + from mempalace.searcher import search_memories + + hits_at_5 = 0 + hits_at_10 = 0 + n_queries = len(NEEDLE_QUERIES) + + for i, query in enumerate(NEEDLE_QUERIES): + result = search_memories( + query, + palace_path=palace_path, + wing="concentrated", + room="single_room", + n_results=10, + ) + if "error" in result: + continue + + texts = [h["text"] for h in result.get("results", [])] + needle_id = f"NEEDLE_{i:04d}" + + found_at_5 = any(needle_id in t for t in texts[:5]) + found_at_10 = any(needle_id in t for t in texts[:10]) + + if found_at_5: + hits_at_5 += 1 + if found_at_10: + hits_at_10 += 1 + + recall_5 = hits_at_5 / n_queries + recall_10 = hits_at_10 / n_queries + + record_metric("single_room_recall", f"recall_at_5_at_{n_drawers}", round(recall_5, 3)) + record_metric("single_room_recall", f"recall_at_10_at_{n_drawers}", round(recall_10, 3)) + + @pytest.mark.parametrize("n_drawers", SIZES) + def test_single_room_no_filter_recall(self, n_drawers, tmp_path): + """Same test but WITHOUT wing/room filter — pure unfiltered search.""" + palace_path = str(tmp_path / "palace") + _populate_single_room(palace_path, n_drawers, n_needles=10) + + from mempalace.searcher import search_memories + + hits_at_5 = 0 + hits_at_10 = 0 + n_queries = len(NEEDLE_QUERIES) + + for i, query in enumerate(NEEDLE_QUERIES): + result = search_memories(query, palace_path=palace_path, n_results=10) + if "error" in result: + continue + + texts = [h["text"] for h in result.get("results", [])] + needle_id = f"NEEDLE_{i:04d}" + + if any(needle_id in t for t in texts[:5]): + hits_at_5 += 1 + if any(needle_id in t for t in texts[:10]): + hits_at_10 += 1 + + recall_5 = hits_at_5 / 
n_queries + recall_10 = hits_at_10 / n_queries + + record_metric("single_room_unfiltered", f"recall_at_5_at_{n_drawers}", round(recall_5, 3)) + record_metric("single_room_unfiltered", f"recall_at_10_at_{n_drawers}", round(recall_10, 3)) From 7e4db3306195a8c4d07a5df7e982e062082c047b Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Wed, 8 Apr 2026 05:10:26 -0300 Subject: [PATCH 3/5] fix: resolve ruff lint errors in benchmark suite Remove unused imports (shutil, string, datetime, os, yaml, time, SCALE_CONFIGS) and unused variable assignments in timing-only calls. --- tests/benchmarks/conftest.py | 1 - tests/benchmarks/data_generator.py | 1 - tests/benchmarks/report.py | 1 - tests/benchmarks/test_ingest_bench.py | 2 -- tests/benchmarks/test_knowledge_graph_bench.py | 10 +++------- tests/benchmarks/test_layers_bench.py | 5 ++--- tests/benchmarks/test_mcp_bench.py | 2 +- tests/benchmarks/test_memory_profile.py | 1 - tests/benchmarks/test_recall_threshold.py | 1 - tests/benchmarks/test_search_bench.py | 2 +- 10 files changed, 7 insertions(+), 19 deletions(-) diff --git a/tests/benchmarks/conftest.py b/tests/benchmarks/conftest.py index 4f9fcfe..8852a3b 100644 --- a/tests/benchmarks/conftest.py +++ b/tests/benchmarks/conftest.py @@ -2,7 +2,6 @@ import json import os -import shutil import tempfile import pytest diff --git a/tests/benchmarks/data_generator.py b/tests/benchmarks/data_generator.py index 8d9a359..7dd093b 100644 --- a/tests/benchmarks/data_generator.py +++ b/tests/benchmarks/data_generator.py @@ -10,7 +10,6 @@ Planted "needle" drawers enable recall measurement without an LLM judge. 
import hashlib import os import random -import string from datetime import datetime, timedelta from pathlib import Path diff --git a/tests/benchmarks/report.py b/tests/benchmarks/report.py index 8defc5a..87009ca 100644 --- a/tests/benchmarks/report.py +++ b/tests/benchmarks/report.py @@ -8,7 +8,6 @@ conftest.py pytest_terminal_summary hook writes the collected results. import json import os import tempfile -from datetime import datetime RESULTS_FILE = os.path.join(tempfile.gettempdir(), "mempalace_bench_results.json") diff --git a/tests/benchmarks/test_ingest_bench.py b/tests/benchmarks/test_ingest_bench.py index c9179fd..6703d11 100644 --- a/tests/benchmarks/test_ingest_bench.py +++ b/tests/benchmarks/test_ingest_bench.py @@ -8,12 +8,10 @@ Measures mining performance at scale: - Re-ingest skip overhead (finding #11: file_already_mined check) """ -import os import time import chromadb import pytest -import yaml from tests.benchmarks.data_generator import PalaceDataGenerator from tests.benchmarks.report import record_metric diff --git a/tests/benchmarks/test_knowledge_graph_bench.py b/tests/benchmarks/test_knowledge_graph_bench.py index 8830d8b..c9897fb 100644 --- a/tests/benchmarks/test_knowledge_graph_bench.py +++ b/tests/benchmarks/test_knowledge_graph_bench.py @@ -56,7 +56,6 @@ class TestQueryEntityLatency: from mempalace.knowledge_graph import KnowledgeGraph kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3")) - gen = PalaceDataGenerator(seed=42) # Create a hub entity connected to many others kg.add_entity("Hub", "person") @@ -72,7 +71,7 @@ class TestQueryEntityLatency: latencies = [] for _ in range(20): start = time.perf_counter() - result = kg.query_entity("Hub") + kg.query_entity("Hub") elapsed_ms = (time.perf_counter() - start) * 1000 latencies.append(elapsed_ms) @@ -106,7 +105,7 @@ class TestTimelinePerformance: latencies = [] for _ in range(10): start = time.perf_counter() - result = kg.timeline() + kg.timeline() elapsed_ms = (time.perf_counter() - 
start) * 1000 latencies.append(elapsed_ms) @@ -143,8 +142,6 @@ class TestTemporalQueryAccuracy: # Query Alice as of March 2024 — should find ProjectA result_march = kg.query_entity("Alice", as_of="2024-03-15") - project_names = [r.get("object") or r.get("name", "") for r in result_march] if isinstance(result_march, list) else [] - # Query Alice as of September 2024 — should find ProjectB result_sept = kg.query_entity("Alice", as_of="2024-09-15") @@ -161,7 +158,6 @@ class TestSQLiteConcurrentAccess: from mempalace.knowledge_graph import KnowledgeGraph kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3")) - gen = PalaceDataGenerator(seed=42) # Pre-create entities for i in range(100): @@ -276,7 +272,7 @@ class TestKGStats: latencies = [] for _ in range(10): start = time.perf_counter() - result = kg.stats() + kg.stats() elapsed_ms = (time.perf_counter() - start) * 1000 latencies.append(elapsed_ms) diff --git a/tests/benchmarks/test_layers_bench.py b/tests/benchmarks/test_layers_bench.py index fd13072..2237209 100644 --- a/tests/benchmarks/test_layers_bench.py +++ b/tests/benchmarks/test_layers_bench.py @@ -5,7 +5,6 @@ Tests MemoryStack.wake_up(), Layer1.generate(), and Layer2/L3 at scale. Layer1 has the same unbounded col.get() as tool_status. 
""" -import os import time import pytest @@ -168,7 +167,7 @@ class TestLayer2Retrieval: latencies = [] for _ in range(10): start = time.perf_counter() - text = layer.retrieve(wing=wing, n_results=10) + layer.retrieve(wing=wing, n_results=10) elapsed_ms = (time.perf_counter() - start) * 1000 latencies.append(elapsed_ms) @@ -198,7 +197,7 @@ class TestLayer3Search: latencies = [] for q in queries: start = time.perf_counter() - text = stack.search(q, n_results=5) + stack.search(q, n_results=5) elapsed_ms = (time.perf_counter() - start) * 1000 latencies.append(elapsed_ms) diff --git a/tests/benchmarks/test_mcp_bench.py b/tests/benchmarks/test_mcp_bench.py index 8f2e006..4e8330b 100644 --- a/tests/benchmarks/test_mcp_bench.py +++ b/tests/benchmarks/test_mcp_bench.py @@ -14,7 +14,7 @@ import time import chromadb import pytest -from tests.benchmarks.data_generator import PalaceDataGenerator, SCALE_CONFIGS +from tests.benchmarks.data_generator import PalaceDataGenerator from tests.benchmarks.report import record_metric diff --git a/tests/benchmarks/test_memory_profile.py b/tests/benchmarks/test_memory_profile.py index 2b30688..769c501 100644 --- a/tests/benchmarks/test_memory_profile.py +++ b/tests/benchmarks/test_memory_profile.py @@ -8,7 +8,6 @@ Targets the highest-risk code paths: - Layer1.generate() (fetches all drawers) """ -import time import tracemalloc import pytest diff --git a/tests/benchmarks/test_recall_threshold.py b/tests/benchmarks/test_recall_threshold.py index 48f590b..e2c14ac 100644 --- a/tests/benchmarks/test_recall_threshold.py +++ b/tests/benchmarks/test_recall_threshold.py @@ -8,7 +8,6 @@ wing+room to find the actual embedding model limit. 
import hashlib import os -import time from datetime import datetime import chromadb diff --git a/tests/benchmarks/test_search_bench.py b/tests/benchmarks/test_search_bench.py index 274c693..5c2559e 100644 --- a/tests/benchmarks/test_search_bench.py +++ b/tests/benchmarks/test_search_bench.py @@ -216,7 +216,7 @@ class TestSearchNResultsScaling: latencies = [] for _ in range(5): start = time.perf_counter() - result = search_memories( + search_memories( "authentication middleware", palace_path=palace_path, n_results=n_results ) latencies.append((time.perf_counter() - start) * 1000) From ebc26f396052f91d12c04fa9cc6f90f278a27e8f Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Wed, 8 Apr 2026 10:56:39 -0300 Subject: [PATCH 4/5] fix: resolve formatting, regression logic, and pytest defaults - Run ruff format on all benchmark files (fixes CI lint job) - Fix check_regression() substring ambiguity: ordered keyword matching so "latency_improvement_pct" is correctly classified as higher-is-better - Update stale comments in conftest.py referencing wrong fixture - Add pytest addopts to skip benchmark/slow/stress markers by default --- pyproject.toml | 1 + tests/benchmarks/conftest.py | 5 +- tests/benchmarks/data_generator.py | 322 ++++++++++++++---- tests/benchmarks/report.py | 61 +++- tests/benchmarks/test_chromadb_stress.py | 5 +- tests/benchmarks/test_ingest_bench.py | 10 +- .../benchmarks/test_knowledge_graph_bench.py | 24 +- tests/benchmarks/test_layers_bench.py | 8 +- tests/benchmarks/test_memory_profile.py | 16 +- tests/benchmarks/test_palace_boost.py | 8 +- tests/benchmarks/test_recall_threshold.py | 36 +- tests/benchmarks/test_search_bench.py | 25 +- 12 files changed, 383 insertions(+), 138 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 92e6732..9166a43 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,7 @@ quote-style = "double" [tool.pytest.ini_options] testpaths = ["tests"] pythonpath = 
["."] +addopts = "-m 'not benchmark and not slow and not stress'" markers = [ "benchmark: scale/performance benchmark tests", "slow: tests that take more than 30 seconds", diff --git a/tests/benchmarks/conftest.py b/tests/benchmarks/conftest.py index 8852a3b..bd3f201 100644 --- a/tests/benchmarks/conftest.py +++ b/tests/benchmarks/conftest.py @@ -96,8 +96,7 @@ def pytest_terminal_summary(terminalreporter, config): if not report_path: return - # Collect results from the session fixture if available - # The results are written by individual tests via bench_results fixture + # Collect results written by individual tests via record_metric() import platform import subprocess @@ -129,7 +128,7 @@ def pytest_terminal_summary(terminalreporter, config): "results": {}, } - # Read results from a temp file written by the bench_results fixture + # Read results from the temp file written by record_metric() results_file = os.path.join(tempfile.gettempdir(), "mempalace_bench_results.json") if os.path.exists(results_file): try: diff --git a/tests/benchmarks/data_generator.py b/tests/benchmarks/data_generator.py index 7dd093b..0184239 100644 --- a/tests/benchmarks/data_generator.py +++ b/tests/benchmarks/data_generator.py @@ -20,42 +20,150 @@ import yaml # ── Scale configurations ───────────────────────────────────────────────── SCALE_CONFIGS = { - "small": {"drawers": 1_000, "wings": 3, "rooms_per_wing": 5, "kg_entities": 50, "kg_triples": 200, "needles": 20, "search_queries": 20}, - "medium": {"drawers": 10_000, "wings": 8, "rooms_per_wing": 12, "kg_entities": 200, "kg_triples": 2_000, "needles": 50, "search_queries": 50}, - "large": {"drawers": 50_000, "wings": 15, "rooms_per_wing": 20, "kg_entities": 500, "kg_triples": 10_000, "needles": 100, "search_queries": 100}, - "stress": {"drawers": 100_000, "wings": 25, "rooms_per_wing": 30, "kg_entities": 1_000, "kg_triples": 50_000, "needles": 200, "search_queries": 200}, + "small": { + "drawers": 1_000, + "wings": 3, + 
"rooms_per_wing": 5, + "kg_entities": 50, + "kg_triples": 200, + "needles": 20, + "search_queries": 20, + }, + "medium": { + "drawers": 10_000, + "wings": 8, + "rooms_per_wing": 12, + "kg_entities": 200, + "kg_triples": 2_000, + "needles": 50, + "search_queries": 50, + }, + "large": { + "drawers": 50_000, + "wings": 15, + "rooms_per_wing": 20, + "kg_entities": 500, + "kg_triples": 10_000, + "needles": 100, + "search_queries": 100, + }, + "stress": { + "drawers": 100_000, + "wings": 25, + "rooms_per_wing": 30, + "kg_entities": 1_000, + "kg_triples": 50_000, + "needles": 200, + "search_queries": 200, + }, } # ── Vocabulary banks for realistic content ─────────────────────────────── WING_NAMES = [ - "webapp", "backend_api", "mobile_app", "data_pipeline", "ml_platform", - "devops", "auth_service", "payments", "analytics", "docs_site", - "cli_tool", "dashboard", "notification_service", "search_engine", - "user_mgmt", "inventory", "reporting", "testing_infra", "monitoring", - "email_service", "chat_bot", "file_storage", "scheduler", "gateway", + "webapp", + "backend_api", + "mobile_app", + "data_pipeline", + "ml_platform", + "devops", + "auth_service", + "payments", + "analytics", + "docs_site", + "cli_tool", + "dashboard", + "notification_service", + "search_engine", + "user_mgmt", + "inventory", + "reporting", + "testing_infra", + "monitoring", + "email_service", + "chat_bot", + "file_storage", + "scheduler", + "gateway", "marketplace", ] ROOM_NAMES = [ - "backend", "frontend", "api", "database", "auth", "tests", "docs", - "config", "deployment", "models", "views", "controllers", "middleware", - "utils", "schemas", "migrations", "fixtures", "scripts", "styles", - "components", "hooks", "services", "routes", "templates", "static", - "media", "logging", "cache", "queue", "workers", + "backend", + "frontend", + "api", + "database", + "auth", + "tests", + "docs", + "config", + "deployment", + "models", + "views", + "controllers", + "middleware", + "utils", + "schemas", + 
"migrations", + "fixtures", + "scripts", + "styles", + "components", + "hooks", + "services", + "routes", + "templates", + "static", + "media", + "logging", + "cache", + "queue", + "workers", ] TECH_TERMS = [ - "authentication", "authorization", "middleware", "endpoint", "REST API", - "GraphQL", "WebSocket", "database migration", "ORM", "query optimization", - "caching strategy", "load balancer", "rate limiting", "pagination", - "serialization", "validation", "error handling", "logging framework", - "monitoring", "deployment pipeline", "CI/CD", "containerization", - "microservice", "event sourcing", "message queue", "pub/sub", - "connection pooling", "session management", "token refresh", "CORS", - "SSL termination", "health check", "circuit breaker", "retry logic", - "batch processing", "stream processing", "data pipeline", "ETL", - "feature flag", "A/B testing", "blue-green deployment", "canary release", + "authentication", + "authorization", + "middleware", + "endpoint", + "REST API", + "GraphQL", + "WebSocket", + "database migration", + "ORM", + "query optimization", + "caching strategy", + "load balancer", + "rate limiting", + "pagination", + "serialization", + "validation", + "error handling", + "logging framework", + "monitoring", + "deployment pipeline", + "CI/CD", + "containerization", + "microservice", + "event sourcing", + "message queue", + "pub/sub", + "connection pooling", + "session management", + "token refresh", + "CORS", + "SSL termination", + "health check", + "circuit breaker", + "retry logic", + "batch processing", + "stream processing", + "data pipeline", + "ETL", + "feature flag", + "A/B testing", + "blue-green deployment", + "canary release", ] CODE_SNIPPETS = [ @@ -75,17 +183,51 @@ PROSE_TEMPLATES = [ ] ENTITY_NAMES = [ - "Alice", "Bob", "Carol", "Dave", "Eve", "Frank", "Grace", "Heidi", - "Ivan", "Judy", "Karl", "Linda", "Mike", "Nina", "Oscar", "Pat", - "Quinn", "Rita", "Steve", "Tina", "Ursula", "Victor", "Wendy", "Xander", + "Alice", + 
"Bob", + "Carol", + "Dave", + "Eve", + "Frank", + "Grace", + "Heidi", + "Ivan", + "Judy", + "Karl", + "Linda", + "Mike", + "Nina", + "Oscar", + "Pat", + "Quinn", + "Rita", + "Steve", + "Tina", + "Ursula", + "Victor", + "Wendy", + "Xander", ] ENTITY_TYPES = ["person", "project", "tool", "concept", "team", "service"] PREDICATES = [ - "works_on", "manages", "reports_to", "collaborates_with", "created", - "maintains", "uses", "depends_on", "replaced", "reviewed", "deployed", - "tested", "documented", "mentors", "leads", "contributes_to", + "works_on", + "manages", + "reports_to", + "collaborates_with", + "created", + "maintains", + "uses", + "depends_on", + "replaced", + "reviewed", + "deployed", + "tested", + "documented", + "mentors", + "leads", + "contributes_to", ] @@ -136,13 +278,19 @@ class PalaceDataGenerator: room = self.rng.choice(self.rooms_by_wing[wing]) needle_id = f"NEEDLE_{i:04d}" content = f"{needle_id}: {topic}. This is a unique planted needle for recall benchmarking at scale." 
- self.needles.append({ - "id": needle_id, - "content": content, - "wing": wing, - "room": room, - "query": topic.split(" uses ")[0] if " uses " in topic else topic.split(" set to ")[0] if " set to " in topic else topic[:60], - }) + self.needles.append( + { + "id": needle_id, + "content": content, + "wing": wing, + "room": room, + "query": topic.split(" uses ")[0] + if " uses " in topic + else topic.split(" set to ")[0] + if " set to " in topic + else topic[:60], + } + ) def _random_text(self, min_chars=600, max_chars=900): """Generate a random text block of realistic content.""" @@ -159,21 +307,25 @@ class PalaceDataGenerator: component=self.rng.choice(ROOM_NAMES), task=self.rng.choice(TECH_TERMS), month=self.rng.choice(["January", "February", "March", "April", "May"]), - quality=self.rng.choice(["performance", "readability", "test coverage", "latency"]), + quality=self.rng.choice( + ["performance", "readability", "test coverage", "latency"] + ), decision=self.rng.choice(TECH_TERMS), condition=self.rng.choice(TECH_TERMS) + " is null", cause=self.rng.choice(["race condition", "null pointer", "timeout", "OOM"]), fix="adding " + self.rng.choice(TECH_TERMS), test_file=f"test_{self.rng.choice(ROOM_NAMES)}.py", old_tech=self.rng.choice(["MySQL", "Flask", "REST", "Jenkins"]), - new_tech=self.rng.choice(["PostgreSQL", "FastAPI", "GraphQL", "GitHub Actions"]), + new_tech=self.rng.choice( + ["PostgreSQL", "FastAPI", "GraphQL", "GitHub Actions"] + ), reason=self.rng.choice(TECH_TERMS), - date=f"2025-{self.rng.randint(1,12):02d}-{self.rng.randint(1,28):02d}", + date=f"2025-{self.rng.randint(1, 12):02d}-{self.rng.randint(1, 28):02d}", percent=self.rng.randint(10, 80), topic=self.rng.choice(TECH_TERMS), person=self.rng.choice(ENTITY_NAMES), action=self.rng.choice(["refactor", "migrate", "optimize", "test"]), - deadline=f"2025-{self.rng.randint(1,12):02d}-{self.rng.randint(1,28):02d}", + deadline=f"2025-{self.rng.randint(1, 12):02d}-{self.rng.randint(1, 28):02d}", 
followup=self.rng.choice(TECH_TERMS), feature_name=self.rng.choice(TECH_TERMS), capability=self.rng.choice(TECH_TERMS), @@ -182,7 +334,12 @@ class PalaceDataGenerator: ) else: words = self.rng.sample(TECH_TERMS, min(5, len(TECH_TERMS))) - text = " ".join(words) + ". " + self.rng.choice(TECH_TERMS) + " implementation details follow.\n" + text = ( + " ".join(words) + + ". " + + self.rng.choice(TECH_TERMS) + + " implementation details follow.\n" + ) parts.append(text) total += len(text) return "\n".join(parts)[:max_chars] @@ -270,15 +427,24 @@ class PalaceDataGenerator: needle_id = f"drawer_{needle['wing']}_{needle['room']}_{hashlib.md5(needle['id'].encode()).hexdigest()[:16]}" docs.append(needle["content"]) ids.append(needle_id) - metas.append({ - "wing": needle["wing"], - "room": needle["room"], - "source_file": f"needle_{needle['id']}.txt", - "chunk_index": 0, - "added_by": "benchmark", - "filed_at": datetime.now().isoformat(), - }) - needle_info.append({"id": needle_id, "query": needle["query"], "wing": needle["wing"], "room": needle["room"]}) + metas.append( + { + "wing": needle["wing"], + "room": needle["room"], + "source_file": f"needle_{needle['id']}.txt", + "chunk_index": 0, + "added_by": "benchmark", + "filed_at": datetime.now().isoformat(), + } + ) + needle_info.append( + { + "id": needle_id, + "query": needle["query"], + "wing": needle["wing"], + "room": needle["room"], + } + ) # Fill remaining drawers with realistic content remaining = n_drawers - len(docs) @@ -291,14 +457,16 @@ class PalaceDataGenerator: docs.append(content) ids.append(drawer_id) - metas.append({ - "wing": wing, - "room": room, - "source_file": f"generated_{i:06d}.txt", - "chunk_index": i % 10, - "added_by": "benchmark", - "filed_at": datetime.now().isoformat(), - }) + metas.append( + { + "wing": wing, + "room": room, + "source_file": f"generated_{i:06d}.txt", + "chunk_index": i % 10, + "added_by": "benchmark", + "filed_at": datetime.now().isoformat(), + } + ) # Flush in batches if 
len(docs) >= batch_size: @@ -351,7 +519,9 @@ class PalaceDataGenerator: valid_to = None if self.rng.random() < 0.3: end_offset = self.rng.randint(30, 365) - valid_to = (base_date + timedelta(days=days_offset + end_offset)).strftime("%Y-%m-%d") + valid_to = (base_date + timedelta(days=days_offset + end_offset)).strftime( + "%Y-%m-%d" + ) triples.append((subject, predicate, obj, valid_from, valid_to)) return entities, triples @@ -371,24 +541,28 @@ class PalaceDataGenerator: # Half are needle queries (known-good answers) n_needle = min(n_queries // 2, len(self.needles)) for needle in self.needles[:n_needle]: - queries.append({ - "query": needle["query"], - "expected_wing": needle["wing"], - "expected_room": needle["room"], - "needle_id": needle["id"], - "is_needle": True, - }) + queries.append( + { + "query": needle["query"], + "expected_wing": needle["wing"], + "expected_room": needle["room"], + "needle_id": needle["id"], + "is_needle": True, + } + ) # Other half are generic queries (measure latency, not recall) n_generic = n_queries - n_needle for _ in range(n_generic): - queries.append({ - "query": self.rng.choice(TECH_TERMS) + " " + self.rng.choice(TECH_TERMS), - "expected_wing": None, - "expected_room": None, - "needle_id": None, - "is_needle": False, - }) + queries.append( + { + "query": self.rng.choice(TECH_TERMS) + " " + self.rng.choice(TECH_TERMS), + "expected_wing": None, + "expected_room": None, + "needle_id": None, + "is_needle": False, + } + ) self.rng.shuffle(queries) return queries diff --git a/tests/benchmarks/report.py b/tests/benchmarks/report.py index 87009ca..61ac937 100644 --- a/tests/benchmarks/report.py +++ b/tests/benchmarks/report.py @@ -45,16 +45,45 @@ def check_regression(current_report: str, baseline_report: str, threshold: float baseline = json.load(f) regressions = [] - # Metrics where HIGHER is worse (latency, memory, etc.) 
- higher_is_worse = { - "latency", "rss", "memory", "oom", "lock_failures", "elapsed", - "p50_ms", "p95_ms", "p99_ms", "rss_delta_mb", "peak_rss_mb", - } - # Metrics where LOWER is worse (throughput, recall, etc.) - lower_is_worse = { - "recall", "throughput", "per_sec", "files_per_sec", "drawers_per_sec", - "triples_per_sec", "improvement", - } + # Keywords for metric direction — checked in order, first match wins. + # "improvement" is checked before "latency" so that composite names + # like "latency_improvement_pct" are classified correctly. + _higher_is_better_kw = [ + "improvement", + "recall", + "throughput", + "per_sec", + "files_per_sec", + "drawers_per_sec", + "triples_per_sec", + "speedup", + ] + _higher_is_worse_kw = [ + "latency", + "rss", + "memory", + "oom", + "lock_failures", + "elapsed", + "p50_ms", + "p95_ms", + "p99_ms", + "rss_delta_mb", + "peak_rss_mb", + "errors", + "failures", + ] + + def _metric_direction(name: str) -> str: + """Return 'higher_better', 'higher_worse', or 'unknown'.""" + low = name.lower() + for kw in _higher_is_better_kw: + if kw in low: + return "higher_better" + for kw in _higher_is_worse_kw: + if kw in low: + return "higher_worse" + return "unknown" for category in baseline.get("results", {}): if category not in current.get("results", {}): @@ -68,23 +97,21 @@ def check_regression(current_report: str, baseline_report: str, threshold: float if base_val == 0: continue - # Determine direction - is_latency_like = any(kw in metric.lower() for kw in higher_is_worse) - is_throughput_like = any(kw in metric.lower() for kw in lower_is_worse) + direction = _metric_direction(metric) - if is_latency_like: + if direction == "higher_worse": # Higher is worse — check if current exceeds baseline by threshold if curr_val > base_val * (1 + threshold): pct = ((curr_val - base_val) / base_val) * 100 regressions.append( - f"{category}/{metric}: {base_val:.2f} -> {curr_val:.2f} ({pct:+.1f}%, threshold {threshold*100:.0f}%)" + 
f"{category}/{metric}: {base_val:.2f} -> {curr_val:.2f} ({pct:+.1f}%, threshold {threshold * 100:.0f}%)" ) - elif is_throughput_like: + elif direction == "higher_better": # Lower is worse — check if current is below baseline by threshold if curr_val < base_val * (1 - threshold): pct = ((curr_val - base_val) / base_val) * 100 regressions.append( - f"{category}/{metric}: {base_val:.2f} -> {curr_val:.2f} ({pct:+.1f}%, threshold {threshold*100:.0f}%)" + f"{category}/{metric}: {base_val:.2f} -> {curr_val:.2f} ({pct:+.1f}%, threshold {threshold * 100:.0f}%)" ) return regressions diff --git a/tests/benchmarks/test_chromadb_stress.py b/tests/benchmarks/test_chromadb_stress.py index 4e998a0..1a77529 100644 --- a/tests/benchmarks/test_chromadb_stress.py +++ b/tests/benchmarks/test_chromadb_stress.py @@ -143,7 +143,10 @@ class TestBulkInsertPerformance: batch_end = min(batch_start + batch_size, n_docs) batch_docs = contents[batch_start:batch_end] batch_ids = [f"batch_{i}" for i in range(batch_start, batch_end)] - batch_metas = [{"wing": "test", "room": "bench", "chunk_index": i} for i in range(batch_start, batch_end)] + batch_metas = [ + {"wing": "test", "room": "bench", "chunk_index": i} + for i in range(batch_start, batch_end) + ] col_batch.add(documents=batch_docs, ids=batch_ids, metadatas=batch_metas) batched_ms = (time.perf_counter() - start) * 1000 diff --git a/tests/benchmarks/test_ingest_bench.py b/tests/benchmarks/test_ingest_bench.py index 6703d11..2b4ea5b 100644 --- a/tests/benchmarks/test_ingest_bench.py +++ b/tests/benchmarks/test_ingest_bench.py @@ -125,7 +125,9 @@ class TestChunkThroughput: chunks_per_sec = total_chunks / max(elapsed, 0.001) kb_per_sec = (len(content) * n_iterations / 1024) / max(elapsed, 0.001) - record_metric("chunking", f"chunks_per_sec_at_{content_size_kb}kb", round(chunks_per_sec, 1)) + record_metric( + "chunking", f"chunks_per_sec_at_{content_size_kb}kb", round(chunks_per_sec, 1) + ) record_metric("chunking", 
f"kb_per_sec_at_{content_size_kb}kb", round(kb_per_sec, 1)) @@ -160,4 +162,8 @@ class TestReingestSkipOverhead: record_metric("reingest", "skip_check_elapsed_sec", round(skip_elapsed, 2)) record_metric("reingest", "files_checked", files_written) - record_metric("reingest", "skip_check_per_file_ms", round(skip_elapsed * 1000 / max(files_written, 1), 1)) + record_metric( + "reingest", + "skip_check_per_file_ms", + round(skip_elapsed * 1000 / max(files_written, 1), 1), + ) diff --git a/tests/benchmarks/test_knowledge_graph_bench.py b/tests/benchmarks/test_knowledge_graph_bench.py index c9897fb..60236bc 100644 --- a/tests/benchmarks/test_knowledge_graph_bench.py +++ b/tests/benchmarks/test_knowledge_graph_bench.py @@ -36,9 +36,7 @@ class TestTripleInsertionRate: # Measure triple insertion start = time.perf_counter() for subject, predicate, obj, valid_from, valid_to in triples: - kg.add_triple( - subject, predicate, obj, valid_from=valid_from, valid_to=valid_to - ) + kg.add_triple(subject, predicate, obj, valid_from=valid_from, valid_to=valid_to) elapsed = time.perf_counter() - start triples_per_sec = n_triples / max(elapsed, 0.001) @@ -128,7 +126,9 @@ class TestTemporalQueryAccuracy: kg.add_entity("ProjectB", "project") # Alice worked on ProjectA from 2024-01 to 2024-06 - kg.add_triple("Alice", "works_on", "ProjectA", valid_from="2024-01-01", valid_to="2024-06-30") + kg.add_triple( + "Alice", "works_on", "ProjectA", valid_from="2024-01-01", valid_to="2024-06-30" + ) # Alice worked on ProjectB from 2024-07 onwards kg.add_triple("Alice", "works_on", "ProjectB", valid_from="2024-07-01") @@ -145,8 +145,16 @@ class TestTemporalQueryAccuracy: # Query Alice as of September 2024 — should find ProjectB result_sept = kg.query_entity("Alice", as_of="2024-09-15") - record_metric("kg_temporal", "march_query_results", len(result_march) if isinstance(result_march, list) else 0) - record_metric("kg_temporal", "sept_query_results", len(result_sept) if isinstance(result_sept, list) else 
0) + record_metric( + "kg_temporal", + "march_query_results", + len(result_march) if isinstance(result_march, list) else 0, + ) + record_metric( + "kg_temporal", + "sept_query_results", + len(result_sept) if isinstance(result_sept, list) else 0, + ) @pytest.mark.benchmark @@ -230,7 +238,9 @@ class TestSQLiteConcurrentAccess: fails = 0 for i in range(50): try: - kg.add_triple(f"E_{i % 50}", "new_rel", f"E_{(i + 7) % 50}", valid_from="2025-06-01") + kg.add_triple( + f"E_{i % 50}", "new_rel", f"E_{(i + 7) % 50}", valid_from="2025-06-01" + ) except Exception: fails += 1 write_errors.append(fails) diff --git a/tests/benchmarks/test_layers_bench.py b/tests/benchmarks/test_layers_bench.py index 2237209..b9604d7 100644 --- a/tests/benchmarks/test_layers_bench.py +++ b/tests/benchmarks/test_layers_bench.py @@ -116,7 +116,9 @@ class TestLayer1UnboundedFetch: record_metric("layer1_filter", "unfiltered_ms", round(unfiltered_ms, 1)) record_metric("layer1_filter", "filtered_ms", round(filtered_ms, 1)) if unfiltered_ms > 0: - record_metric("layer1_filter", "speedup_pct", round((1 - filtered_ms / unfiltered_ms) * 100, 1)) + record_metric( + "layer1_filter", "speedup_pct", round((1 - filtered_ms / unfiltered_ms) * 100, 1) + ) @pytest.mark.benchmark @@ -146,7 +148,9 @@ class TestWakeUpTokenBudget: record_metric("wakeup_budget", f"tokens_at_{n_drawers}", token_estimate) record_metric("wakeup_budget", f"chars_at_{n_drawers}", len(text)) - assert token_estimate < 1200, f"Wake-up exceeded budget: ~{token_estimate} tokens at {n_drawers} drawers" + assert token_estimate < 1200, ( + f"Wake-up exceeded budget: ~{token_estimate} tokens at {n_drawers} drawers" + ) @pytest.mark.benchmark diff --git a/tests/benchmarks/test_memory_profile.py b/tests/benchmarks/test_memory_profile.py index 769c501..b299b2d 100644 --- a/tests/benchmarks/test_memory_profile.py +++ b/tests/benchmarks/test_memory_profile.py @@ -63,7 +63,9 @@ class TestSearchMemoryProfile: record_metric("memory_search", "rss_end_mb", 
round(end_rss, 2)) record_metric("memory_search", "rss_growth_mb", round(growth, 2)) record_metric("memory_search", "n_calls", n_calls) - record_metric("memory_search", "growth_per_100_calls_mb", round(growth / (n_calls / 100), 2)) + record_metric( + "memory_search", "growth_per_100_calls_mb", round(growth / (n_calls / 100), 2) + ) @pytest.mark.benchmark @@ -166,11 +168,13 @@ class TestHeapSnapshot: stats = snap_after.compare_to(snap_before, "lineno") top_allocators = [] for stat in stats[:10]: - top_allocators.append({ - "file": str(stat.traceback), - "size_kb": round(stat.size / 1024, 1), - "count": stat.count, - }) + top_allocators.append( + { + "file": str(stat.traceback), + "size_kb": round(stat.size / 1024, 1), + "count": stat.count, + } + ) total_growth_kb = sum(s["size_kb"] for s in top_allocators) record_metric("heap_search", "top_10_growth_kb", round(total_growth_kb, 1)) diff --git a/tests/benchmarks/test_palace_boost.py b/tests/benchmarks/test_palace_boost.py index 6994313..ca90784 100644 --- a/tests/benchmarks/test_palace_boost.py +++ b/tests/benchmarks/test_palace_boost.py @@ -123,8 +123,12 @@ class TestFilterLatencyBenefit: record_metric("filter_latency", "avg_wing_filtered_ms", round(avg_wing, 1)) record_metric("filter_latency", "avg_room_filtered_ms", round(avg_room, 1)) if avg_none > 0: - record_metric("filter_latency", "wing_speedup_pct", round((1 - avg_wing / avg_none) * 100, 1)) - record_metric("filter_latency", "room_speedup_pct", round((1 - avg_room / avg_none) * 100, 1)) + record_metric( + "filter_latency", "wing_speedup_pct", round((1 - avg_wing / avg_none) * 100, 1) + ) + record_metric( + "filter_latency", "room_speedup_pct", round((1 - avg_room / avg_none) * 100, 1) + ) @pytest.mark.benchmark diff --git a/tests/benchmarks/test_recall_threshold.py b/tests/benchmarks/test_recall_threshold.py index e2c14ac..afe2323 100644 --- a/tests/benchmarks/test_recall_threshold.py +++ b/tests/benchmarks/test_recall_threshold.py @@ -61,14 +61,16 @@ def 
_populate_single_room(palace_path, n_drawers, n_needles=10): drawer_id = f"drawer_single_room_{hashlib.md5(needle_id.encode()).hexdigest()[:16]}" docs.append(content) ids.append(drawer_id) - metas.append({ - "wing": "concentrated", - "room": "single_room", - "source_file": f"needle_{i}.txt", - "chunk_index": 0, - "added_by": "threshold_bench", - "filed_at": datetime.now().isoformat(), - }) + metas.append( + { + "wing": "concentrated", + "room": "single_room", + "source_file": f"needle_{i}.txt", + "chunk_index": 0, + "added_by": "threshold_bench", + "filed_at": datetime.now().isoformat(), + } + ) # Fill with noise — all in the SAME room remaining = n_drawers - len(docs) @@ -77,14 +79,16 @@ def _populate_single_room(palace_path, n_drawers, n_needles=10): drawer_id = f"drawer_single_room_{hashlib.md5(f'noise_{i}'.encode()).hexdigest()[:16]}" docs.append(content) ids.append(drawer_id) - metas.append({ - "wing": "concentrated", - "room": "single_room", - "source_file": f"noise_{i:06d}.txt", - "chunk_index": i % 10, - "added_by": "threshold_bench", - "filed_at": datetime.now().isoformat(), - }) + metas.append( + { + "wing": "concentrated", + "room": "single_room", + "source_file": f"noise_{i:06d}.txt", + "chunk_index": i % 10, + "added_by": "threshold_bench", + "filed_at": datetime.now().isoformat(), + } + ) if len(docs) >= batch_size: col.add(documents=docs, ids=ids, metadatas=metas) diff --git a/tests/benchmarks/test_search_bench.py b/tests/benchmarks/test_search_bench.py index 5c2559e..3cb7785 100644 --- a/tests/benchmarks/test_search_bench.py +++ b/tests/benchmarks/test_search_bench.py @@ -77,9 +77,7 @@ class TestSearchRecallAtScale: total_needle_queries = min(10, len(needle_info)) for needle in needle_info[:total_needle_queries]: - result = search_memories( - needle["query"], palace_path=palace_path, n_results=10 - ) + result = search_memories(needle["query"], palace_path=palace_path, n_results=10) if "error" in result: continue @@ -150,8 +148,12 @@ class 
TestSearchFilteredVsUnfiltered: record_metric("search_filter", "avg_unfiltered_ms", round(avg_unfiltered, 1)) record_metric("search_filter", "avg_filtered_ms", round(avg_filtered, 1)) record_metric("search_filter", "latency_improvement_pct", round(latency_improvement, 1)) - record_metric("search_filter", "unfiltered_recall_at_5", round(unfiltered_hits / max(n_queries, 1), 3)) - record_metric("search_filter", "filtered_recall_at_5", round(filtered_hits / max(n_queries, 1), 3)) + record_metric( + "search_filter", "unfiltered_recall_at_5", round(unfiltered_hits / max(n_queries, 1), 3) + ) + record_metric( + "search_filter", "filtered_recall_at_5", round(filtered_hits / max(n_queries, 1), 3) + ) @pytest.mark.benchmark @@ -167,9 +169,16 @@ class TestConcurrentSearch: from mempalace.searcher import search_memories queries = [ - "authentication", "database", "deployment", "error handling", - "testing", "monitoring", "caching", "middleware", - "serialization", "validation", + "authentication", + "database", + "deployment", + "error handling", + "testing", + "monitoring", + "caching", + "middleware", + "serialization", + "validation", ] * 3 # 30 total queries def run_search(query): From 37e15767f050e6cb608e98bfdd8778ab06999b64 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Wed, 8 Apr 2026 11:11:15 -0300 Subject: [PATCH 5/5] ci: remove benchmark job from CI workflow Too heavy for CI (~2h per run). 
Benchmarks can be run locally with: pytest -m benchmark --bench-scale=small --bench-report=results.json --- .github/workflows/ci.yml | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 09ddda2..9fd8a0d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,22 +20,6 @@ jobs: - run: pip install -e ".[dev]" - run: python -m pytest tests/ -v --ignore=tests/benchmarks - benchmark: - runs-on: ubuntu-latest - if: github.event_name == 'pull_request' - steps: - - uses: actions/checkout@v6 - - uses: actions/setup-python@v6 - with: - python-version: "3.11" - - run: pip install -e ".[dev]" - - run: python -m pytest tests/benchmarks/ -v -m "benchmark and not stress and not slow" --bench-scale=small --bench-report=bench-results.json - - uses: actions/upload-artifact@v6 - if: always() - with: - name: benchmark-results - path: bench-results.json - lint: runs-on: ubuntu-latest steps: