diff --git a/mempalace/fact_checker.py b/mempalace/fact_checker.py index 281f117..50e8842 100644 --- a/mempalace/fact_checker.py +++ b/mempalace/fact_checker.py @@ -1,152 +1,304 @@ """ fact_checker.py — Verify text against known facts in the palace. -Checks AI responses, diary entries, and new content against the -entity registry and knowledge graph for contradictions. Catches: - - Wrong names (similar but different entities) - - Wrong relationships (calling someone the wrong role) - - Stale facts (things that changed — KG has valid_from/valid_to) +Checks AI responses, diary entries, and new content against the entity +registry and knowledge graph for three classes of issue: -Uses the entity_registry and knowledge_graph — no hardcoded facts. + * similar_name — text mentions a name that's one/two edits + away from *another* registered name, raising + the possibility of a typo or mix-up. + * relationship_mismatch — text asserts a role between two entities + (e.g. "Bob is Alice's brother") while the KG + records a *different* current role for the + same subject/object pair. + * stale_fact — text asserts a fact that the KG marks closed + (``valid_to`` in the past). + +Purely offline. Inputs: entity_registry JSON + KG SQLite. No network. Usage: from mempalace.fact_checker import check_text issues = check_text("Bob is Alice's brother", palace_path) - # → [{"type": "relationship_mismatch", "detail": "KG says Bob is Alice's husband"}] # CLI - python -m mempalace.fact_checker "Bob is Alice's brother" --palace ~/.mempalace/palace + python -m mempalace.fact_checker "Bob is Alice's brother" \\ + --palace ~/.mempalace/palace """ +from __future__ import annotations + import os import re -from pathlib import Path +from datetime import datetime, timezone + +# Share miner's mtime-cached registry loader so we don't double-read +# ~/.mempalace/known_entities.json on every check_text call. +from .miner import _load_known_entities_raw -def check_text(text, palace_path=None, config=None): - """Check text for contradictions against known facts. +# Narrow detection patterns — parse "X is Y's Z" and "X's Z is Y". +# Names are captured greedily as word sequences (letters + optional +# capitalized follow-ons) so simple multi-token names still work. +# Relationship words are constrained to sane lengths to avoid matching +# arbitrary filler. +_RELATIONSHIP_PATTERNS = [ + # "Bob is Alice's brother" → subject=Bob, possessor=Alice, role=brother + re.compile(r"\b([A-Z][\w-]+)\s+is\s+([A-Z][\w-]+)'s\s+([a-z]{3,20})\b"), + # "Alice's brother is Bob" → possessor=Alice, role=brother, subject=Bob + re.compile(r"\b([A-Z][\w-]+)'s\s+([a-z]{3,20})\s+is\s+([A-Z][\w-]+)\b"), +] - Returns list of issues found. Empty list = no contradictions. + +def check_text(text: str, palace_path: str = None, config=None) -> list: + """Return a list of issues detected in ``text``. + + Empty list means "no contradictions found" — absence of evidence, not + evidence of absence. The detector is deliberately conservative: + every issue is anchored to a specific KG fact or registry entry. """ if config is None: from .config import MempalaceConfig + config = MempalaceConfig() if palace_path is None: palace_path = config.palace_path - issues = [] + if not text: + return [] - # Load known entities - entity_names = _load_known_entities() + issues: list = [] + entity_names_raw = _load_known_entities_raw() - # Check entity name confusion (similar names that might be mixed up) - issues.extend(_check_entity_confusion(text, entity_names)) - - # Check against knowledge graph facts - issues.extend(_check_kg_facts(text, palace_path)) + issues.extend(_check_entity_confusion(text, entity_names_raw)) + issues.extend(_check_kg_contradictions(text, palace_path)) return issues -def _load_known_entities(): - """Load entity names from the registry.""" - import json - registry_path = os.path.expanduser("~/.mempalace/known_entities.json") - if not os.path.exists(registry_path): - return {} - try: - return json.loads(open(registry_path).read()) - except Exception: - return {} +# ── entity-name confusion ──────────────────────────────────────────── -def _check_entity_confusion(text, entity_names): - """Check if text confuses similar entity names.""" - issues = [] - all_names = set() - for cat in entity_names.values(): +def _flatten_names(entity_names_raw: dict) -> set: + """Flatten a ``{category: [names]}`` or ``{category: {name: meta}}`` + registry into a set of names.""" + flat: set = set() + for cat in entity_names_raw.values(): if isinstance(cat, list): - all_names.update(cat) + flat.update(str(n) for n in cat if n) elif isinstance(cat, dict): - all_names.update(cat.keys()) + flat.update(str(k) for k in cat.keys() if k) + return flat - # Find names mentioned in text - mentioned = set() + +def _check_entity_confusion(text: str, entity_names_raw: dict) -> list: + """Flag names mentioned in the text that are edit-distance ≤ 2 from + a *different* registered name — a common typo / mix-up pattern. + + Performance note: the original O(n²) pairwise scan over the full + registry is gone. We first identify which names actually appear in + the text, then only compute edit distance between *mentioned* names + and the rest of the registry. This makes the cost O(m × n) where m + is the handful of names in the text, not the full registry. + """ + all_names = _flatten_names(entity_names_raw) + if not all_names: + return [] + + # Which names from the registry actually appear in the text? + mentioned: list = [] for name in all_names: - if re.search(r'\b' + re.escape(name) + r'\b', text, re.IGNORECASE): - mentioned.add(name) + if re.search(r"\b" + re.escape(name) + r"\b", text, re.IGNORECASE): + mentioned.append(name) + if not mentioned: + return [] - # Check for names that are very similar but different (edit distance 1-2) - name_list = sorted(all_names) - for i, name_a in enumerate(name_list): - for name_b in name_list[i + 1:]: - if _edit_distance(name_a.lower(), name_b.lower()) <= 2: - if name_a in mentioned or name_b in mentioned: - if name_a in text and name_b not in text: - issues.append({ - "type": "similar_name", - "detail": f"'{name_a}' mentioned — did you mean '{name_b}'? (similar names in registry)", - "names": [name_a, name_b], - }) + issues: list = [] + seen_pairs: set = set() + for name_a in mentioned: + a_lower = name_a.lower() + for name_b in all_names: + if name_b == name_a: + continue + # Dedupe by unordered pair so we don't double-report. + pair_key = tuple(sorted((name_a.lower(), name_b.lower()))) + if pair_key in seen_pairs: + continue + # Only flag when name_b is a *different* registry entry that + # was NOT mentioned — otherwise both names in the text is + # just the user writing about two people. + if name_b in mentioned: + seen_pairs.add(pair_key) + continue + distance = _edit_distance(a_lower, name_b.lower()) + if 0 < distance <= 2: + issues.append( + { + "type": "similar_name", + "detail": ( + f"'{name_a}' mentioned — did you mean " + f"'{name_b}'? (edit distance {distance})" + ), + "names": [name_a, name_b], + "distance": distance, + } + ) + seen_pairs.add(pair_key) return issues -def _check_kg_facts(text, palace_path): - """Check text against knowledge graph for contradictions.""" - issues = [] +# ── KG contradictions ──────────────────────────────────────────────── + + +def _extract_claims(text: str) -> list: + """Yield structured (subject, predicate, object) claims from ``text``. + + The two supported surface forms are "X is Y's Z" and "X's Z is Y", + both of which resolve to the triple ``(X, Z, Y)`` — ``X`` has role + ``Z`` with respect to ``Y``. Matches are case-preserving for the + entity names (KG lookup is case-insensitive on normalized IDs). + """ + claims: list = [] + for pat in _RELATIONSHIP_PATTERNS: + for match in pat.finditer(text): + groups = match.groups() + if pat is _RELATIONSHIP_PATTERNS[0]: + subject, possessor, role = groups[0], groups[1], groups[2] + else: + possessor, role, subject = groups[0], groups[1], groups[2] + claims.append( + { + "subject": subject, + "predicate": role.lower(), + "object": possessor, + "span": match.group(0), + } + ) + return claims + + +def _check_kg_contradictions(text: str, palace_path: str) -> list: + """Compare each claim in ``text`` against the KG. + + For every claim ``(subject, predicate, object)`` parsed from the + text, look up the subject's current KG triples: + + * ``relationship_mismatch`` fires when the KG records a fact about + the same ``(subject, object)`` pair but with a *different* + predicate — e.g. text says "brother" but KG says "husband". + * ``stale_fact`` fires when the KG has the exact ``(subject, + predicate, object)`` triple but its ``valid_to`` is in the past, + meaning the claim is no longer current. + """ + claims = _extract_claims(text) + if not claims: + return [] + try: from .knowledge_graph import KnowledgeGraph - kg = KnowledgeGraph(palace_path=palace_path) - # Extract relationship claims from text - # Pattern: "X is Y's Z" or "X's Z is Y" - patterns = [ - (r"(\w+)\s+is\s+(\w+)'s\s+(\w+)", "subject", "possessor", "role"), - (r"(\w+)'s\s+(\w+)\s+is\s+(\w+)", "possessor", "role", "subject"), - ] - - for pattern, *roles in patterns: - for match in re.finditer(pattern, text, re.IGNORECASE): - groups = match.groups() - subject = groups[0] - # Query KG for this entity - try: - facts = kg.query(subject) - if facts: - for fact in facts: - # Check if the claim contradicts a known fact - if fact.get("valid_to") is None: # current fact - kg_pred = fact.get("predicate", "").lower() - claim = match.group(0).lower() - if kg_pred in claim and fact.get("object", "").lower() not in claim: - issues.append({ - "type": "relationship_mismatch", - "detail": f"Text says '{match.group(0)}' but KG says: {subject} {kg_pred} {fact.get('object')}", - "entity": subject, - }) - except Exception: - pass + # KG lives alongside the palace collection; mcp_server uses the + # same convention (see _kg init). Pass ``db_path`` — the previous + # code passed a nonexistent ``palace_path`` kwarg which raised + # TypeError, silently swallowed by the outer except and rendered + # the entire KG-check path dead. + kg = KnowledgeGraph(db_path=os.path.join(palace_path, "knowledge_graph.sqlite3")) except Exception: - pass # KG not available — skip + # KG unavailable (brand-new palace, corrupted DB, etc.) — skip. + return [] + + issues: list = [] + for claim in claims: + subject = claim["subject"] + claim_pred = claim["predicate"] + claim_obj = claim["object"] + try: + facts = kg.query_entity(subject, direction="outgoing") + except Exception: + continue + if not facts: + continue + + current_facts = [f for f in facts if f.get("current")] + + # Mismatch: KG fact about same (subject, object) pair but different predicate. + for fact in current_facts: + if not _objects_match(fact.get("object"), claim_obj): + continue + kg_pred = (fact.get("predicate") or "").lower() + if kg_pred and kg_pred != claim_pred: + issues.append( + { + "type": "relationship_mismatch", + "detail": ( + f"Text says '{claim['span']}' but KG records " + f"{subject} {kg_pred} {fact.get('object')}" + ), + "entity": subject, + "claim": { + "predicate": claim_pred, + "object": claim_obj, + }, + "kg_fact": { + "predicate": kg_pred, + "object": fact.get("object"), + }, + } + ) + + # Stale fact: exact match on (subject, predicate, object) but KG + # closed the window in the past. + now_iso = datetime.now(timezone.utc).date().isoformat() + for fact in facts: + if fact.get("current"): + continue + kg_pred = (fact.get("predicate") or "").lower() + if kg_pred != claim_pred: + continue + if not _objects_match(fact.get("object"), claim_obj): + continue + valid_to = fact.get("valid_to") + if valid_to and str(valid_to) < now_iso: + issues.append( + { + "type": "stale_fact", + "detail": ( + f"Text says '{claim['span']}' but KG marks " + f"this fact closed on {valid_to}" + ), + "entity": subject, + "valid_to": valid_to, + } + ) return issues -def _edit_distance(s1, s2): - """Simple Levenshtein distance.""" +def _objects_match(kg_obj, claim_obj: str) -> bool: + if kg_obj is None or not claim_obj: + return False + return str(kg_obj).strip().lower() == claim_obj.strip().lower() + + +# ── Levenshtein helper (tight iterative version) ───────────────────── + + +def _edit_distance(s1: str, s2: str) -> int: + """Levenshtein distance. O(len(s1) * len(s2)) time, O(len(s2)) space.""" if len(s1) < len(s2): - return _edit_distance(s2, s1) - if len(s2) == 0: + s1, s2 = s2, s1 + if not s2: return len(s1) prev = list(range(len(s2) + 1)) for i, c1 in enumerate(s1): curr = [i + 1] for j, c2 in enumerate(s2): - curr.append(min( - prev[j + 1] + 1, - curr[j] + 1, - prev[j] + (0 if c1 == c2 else 1), - )) + curr.append( + min( + prev[j + 1] + 1, + curr[j] + 1, + prev[j] + (0 if c1 == c2 else 1), + ) + ) prev = curr return prev[-1] @@ -154,24 +306,30 @@ def _edit_distance(s1, s2): if __name__ == "__main__": import argparse import json + import sys - parser = argparse.ArgumentParser(description="Check text against known facts") - parser.add_argument("text", nargs="?", help="Text to check") - parser.add_argument("--palace", default=os.path.expanduser("~/.mempalace/palace")) - parser.add_argument("--stdin", action="store_true", help="Read from stdin") + parser = argparse.ArgumentParser( + description="Check text against known facts in the MemPalace palace.", + epilog="Exits 0 when no issues found, 1 when one or more issues detected.", + ) + parser.add_argument("text", nargs="?", help="Text to check (or use --stdin).") + parser.add_argument( + "--palace", + default=os.path.expanduser("~/.mempalace/palace"), + help="Path to the palace directory.", + ) + parser.add_argument("--stdin", action="store_true", help="Read text from stdin.") args = parser.parse_args() if args.stdin: - import sys - text = sys.stdin.read() + text_in = sys.stdin.read() elif args.text: - text = args.text + text_in = args.text else: - print("Provide text as argument or use --stdin") - exit(1) + parser.error("Provide text as argument or use --stdin.") - issues = check_text(text, palace_path=args.palace) - if issues: - print(json.dumps(issues, indent=2)) - else: - print("No contradictions found.") + found = check_text(text_in, palace_path=args.palace) + if found: + print(json.dumps(found, indent=2)) + sys.exit(1) + print("No contradictions found.") diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index 08226a9..31be8a4 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -35,7 +35,15 @@ from .version import __version__ import chromadb from .query_sanitizer import sanitize_query from .searcher import search_memories -from .palace_graph import traverse, find_tunnels, graph_stats, create_tunnel, list_tunnels, delete_tunnel, follow_tunnels +from .palace_graph import ( + traverse, + find_tunnels, + graph_stats, + create_tunnel, + list_tunnels, + delete_tunnel, + follow_tunnels, +) from .knowledge_graph import KnowledgeGraph @@ -519,7 +527,10 @@ def tool_create_tunnel( except ValueError as e: return {"error": str(e)} return create_tunnel( - source_wing, source_room, target_wing, target_room, + source_wing, + source_room, + target_wing, + target_room, label=label, source_drawer_id=source_drawer_id, target_drawer_id=target_drawer_id, @@ -1251,8 +1262,14 @@ TOOLS = { "target_wing": {"type": "string", "description": "Wing of the target"}, "target_room": {"type": "string", "description": "Room in the target wing"}, "label": {"type": "string", "description": "Description of the connection"}, - "source_drawer_id": {"type": "string", "description": "Optional specific drawer ID"}, - "target_drawer_id": {"type": "string", "description": "Optional specific drawer ID"}, + "source_drawer_id": { + "type": "string", + "description": "Optional specific drawer ID", + }, + "target_drawer_id": { + "type": "string", + "description": "Optional specific drawer ID", + }, }, "required": ["source_wing", "source_room", "target_wing", "target_room"], }, @@ -1263,7 +1280,10 @@ TOOLS = { "input_schema": { "type": "object", "properties": { - "wing": {"type": "string", "description": "Filter tunnels by wing (shows tunnels where wing is source or target)"}, + "wing": { + "type": "string", + "description": "Filter tunnels by wing (shows tunnels where wing is source or target)", + }, }, }, "handler": tool_list_tunnels, diff --git a/mempalace/miner.py b/mempalace/miner.py index 04bcf61..3d8e29e 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -379,17 +379,17 @@ def chunk_text(content: str, source_file: str) -> list: _ENTITY_REGISTRY_PATH = os.path.join(os.path.expanduser("~"), ".mempalace", "known_entities.json") -_ENTITY_REGISTRY_CACHE: dict = {"mtime": None, "names": frozenset()} +_ENTITY_REGISTRY_CACHE: dict = {"mtime": None, "names": frozenset(), "raw": {}} _ENTITY_EXTRACT_WINDOW = 5000 # chars of content scanned for capitalized words _ENTITY_METADATA_LIMIT = 25 # max entities packed into the metadata field -def _load_known_entities() -> frozenset: - """Load (and cache) the user's known-entity registry by mtime. - - Reads ``~/.mempalace/known_entities.json``. The registry is shaped as - ``{"category": ["Name1", "Name2", ...], ...}``. Cached across calls - in the same process; invalidated when the file's mtime changes. +def _refresh_known_entities_cache() -> None: + """Reload ``~/.mempalace/known_entities.json`` into the module cache if + its mtime changed since the last read. Shared by ``_load_known_entities`` + (flat set) and ``_load_known_entities_raw`` (category dict), so callers + can pick whichever shape they need without duplicating the mtime-gated + disk read. """ try: mtime = os.path.getmtime(_ENTITY_REGISTRY_PATH) @@ -397,28 +397,56 @@ def _load_known_entities() -> frozenset: if _ENTITY_REGISTRY_CACHE["mtime"] is not None: _ENTITY_REGISTRY_CACHE["mtime"] = None _ENTITY_REGISTRY_CACHE["names"] = frozenset() - return _ENTITY_REGISTRY_CACHE["names"] + _ENTITY_REGISTRY_CACHE["raw"] = {} + return if _ENTITY_REGISTRY_CACHE["mtime"] == mtime: - return _ENTITY_REGISTRY_CACHE["names"] + return names: set = set() + raw: dict = {} try: import json with open(_ENTITY_REGISTRY_PATH, "r", encoding="utf-8") as f: data = json.load(f) - for cat in data.values(): - if isinstance(cat, list): - names.update(str(n) for n in cat if n) + if isinstance(data, dict): + raw = data + for cat in data.values(): + if isinstance(cat, list): + names.update(str(n) for n in cat if n) + elif isinstance(cat, dict): + names.update(str(k) for k in cat.keys() if k) except Exception: names = set() + raw = {} _ENTITY_REGISTRY_CACHE["mtime"] = mtime _ENTITY_REGISTRY_CACHE["names"] = frozenset(names) + _ENTITY_REGISTRY_CACHE["raw"] = raw + + +def _load_known_entities() -> frozenset: + """Flat set of every known entity name (across all categories). + + Cached by mtime; invalidated when the registry file changes. + """ + _refresh_known_entities_cache() return _ENTITY_REGISTRY_CACHE["names"] +def _load_known_entities_raw() -> dict: + """Full category-dict view of the registry, shape + ``{"category": ["Name1", ...], ...}``. Cached by mtime. + + Consumed by modules (e.g., fact_checker) that need to reason about + categories rather than a flat name set. Never returns a mutable + reference to the cache — callers get a shallow copy. + """ + _refresh_known_entities_cache() + return dict(_ENTITY_REGISTRY_CACHE["raw"]) + + def _extract_entities_for_metadata(content: str) -> str: """Extract entity names from content for metadata tagging. diff --git a/tests/test_fact_checker.py b/tests/test_fact_checker.py new file mode 100644 index 0000000..5b34a40 --- /dev/null +++ b/tests/test_fact_checker.py @@ -0,0 +1,288 @@ +""" +test_fact_checker.py — Regression + integration tests for fact_checker. + +Covers every detection path + the three bugs the original PR silently +hid behind ``except Exception: pass``: + + * ``kg.query()`` doesn't exist — code must use ``query_entity``. + * ``KnowledgeGraph(palace_path=...)`` is not a valid kwarg — code + must pass ``db_path``. + * O(n²) edit-distance over the full registry — must filter to names + actually mentioned in the text. + +Also pins the three feature contracts: + * similar_name — "Mila" vs "Milla" in a registry with both. + * relationship_mismatch — "Bob is Alice's brother" vs KG "husband". + * stale_fact — claim matches a triple whose valid_to is in the past. +""" + +from __future__ import annotations + +import json +from unittest.mock import MagicMock, patch + +import pytest + +from mempalace.fact_checker import ( + _check_entity_confusion, + _edit_distance, + _extract_claims, + _flatten_names, + check_text, +) +from mempalace.knowledge_graph import KnowledgeGraph + + +# ── claim extraction ───────────────────────────────────────────────── + + +class TestExtractClaims: + def test_parses_x_is_ys_z(self): + claims = _extract_claims("Bob is Alice's brother") + assert len(claims) == 1 + assert claims[0] == { + "subject": "Bob", + "predicate": "brother", + "object": "Alice", + "span": "Bob is Alice's brother", + } + + def test_parses_xs_z_is_y(self): + claims = _extract_claims("Alice's brother is Bob") + assert len(claims) == 1 + assert claims[0]["subject"] == "Bob" + assert claims[0]["predicate"] == "brother" + assert claims[0]["object"] == "Alice" + + def test_ignores_sentences_without_possessive_role(self): + assert _extract_claims("Bob drove to the store today") == [] + assert _extract_claims("Just some prose without relationships") == [] + + def test_multiple_claims_in_one_text(self): + claims = _extract_claims("Bob is Alice's brother. Carol is Dave's sister.") + subjects = {c["subject"] for c in claims} + assert subjects == {"Bob", "Carol"} + + +# ── entity confusion ───────────────────────────────────────────────── + + +class TestEntityConfusion: + def test_flags_near_name_when_only_one_mentioned(self): + registry = {"people": ["Milla", "Mila"]} + issues = _check_entity_confusion("I spoke with Mila today.", registry) + # "Mila" mentioned, "Milla" not — registry has both at edit-distance 1, + # flag the possible confusion. + assert len(issues) == 1 + assert issues[0]["type"] == "similar_name" + assert set(issues[0]["names"]) == {"Mila", "Milla"} + assert issues[0]["distance"] == 1 + + def test_no_false_positive_when_both_names_mentioned(self): + """Regression: a text discussing both Mila and Milla is fine — + the user clearly knows they're different. Don't nag.""" + registry = {"people": ["Milla", "Mila"]} + issues = _check_entity_confusion("Mila and Milla met for lunch.", registry) + assert issues == [] + + def test_no_issues_when_registry_empty(self): + assert _check_entity_confusion("Bob said hi", {}) == [] + assert _check_entity_confusion("Bob said hi", {"people": []}) == [] + + def test_no_issues_when_no_mentioned_names(self): + registry = {"people": ["Zelda", "Link", "Sheik"]} + assert _check_entity_confusion("nothing relevant here", registry) == [] + + def test_registry_dict_shape_is_supported(self): + # Some registries store {"people": {"Alice": {...meta}}}; we still + # need to surface the keys as candidate names. + registry = {"people": {"Milla": {"role": "creator"}, "Mila": {}}} + issues = _check_entity_confusion("I messaged Mila yesterday", registry) + assert any("Milla" in (i["names"] or []) for i in issues) + + +class TestEditDistance: + def test_basic_distances(self): + assert _edit_distance("kitten", "sitting") == 3 + assert _edit_distance("mila", "milla") == 1 + assert _edit_distance("abc", "abc") == 0 + + def test_empty_strings(self): + assert _edit_distance("", "") == 0 + assert _edit_distance("abc", "") == 3 + assert _edit_distance("", "abc") == 3 + + def test_performance_bounded_by_mentioned_names(self): + """Regression: an earlier implementation did O(n²) pairwise + edit-distance over every registry entry on every check_text call. + With 100 names and zero mentions, the call must return in a blink + because no edit-distance comparison should even start.""" + import time + + # 500 random names, none of which appear in the text. + registry = {"people": [f"Zelda{i:03d}" for i in range(500)]} + text = "completely irrelevant prose with no registered names at all" + + start = time.perf_counter() + issues = _check_entity_confusion(text, registry) + elapsed = time.perf_counter() - start + + assert issues == [] + # Even an unoptimized implementation should beat this by orders + # of magnitude once we've filtered to mentioned names (which is + # 0 here) — if it's still doing O(n²), we'll blow past. + assert elapsed < 0.2, f"entity confusion took {elapsed:.3f}s on empty mentions" + + +# ── _flatten_names helper ──────────────────────────────────────────── + + +class TestFlattenNames: + def test_handles_list_categories(self): + assert _flatten_names({"people": ["Ada", "Bob"]}) == {"Ada", "Bob"} + + def test_handles_dict_categories(self): + assert _flatten_names({"people": {"Ada": {}, "Bob": {}}}) == {"Ada", "Bob"} + + def test_skips_falsy_entries(self): + assert _flatten_names({"people": ["Ada", "", None, "Bob"]}) == {"Ada", "Bob"} + + +# ── KG integration (uses a real tmp SQLite palace) ─────────────────── + + +@pytest.fixture +def palace_with_kg(tmp_path): + """Palace directory with a real KG pre-seeded with a few triples. + + The KG file lives at ``/knowledge_graph.sqlite3`` — same + convention used by the MCP server. Fact-checker must find it via + that path, not via a bogus ``palace_path`` kwarg. + """ + palace = tmp_path / "palace" + palace.mkdir() + db = str(palace / "knowledge_graph.sqlite3") + kg = KnowledgeGraph(db_path=db) + yield palace, kg + + +class TestKGContradictions: + def test_kg_init_uses_db_path_not_palace_path_kwarg(self): + """Regression: the original code passed ``palace_path=`` to a + constructor whose only kwarg is ``db_path``. That raised + TypeError — silently swallowed — and the KG path became dead + code. This test pins the correct call signature.""" + # Simply construct via the correct signature; raising means the + # KG constructor has changed in a way that fact_checker must too. + kg = KnowledgeGraph(db_path=":memory:") + # query_entity must exist (this is the method fact_checker calls). + assert callable(getattr(kg, "query_entity", None)) + # The API that fact_checker used to call does NOT exist. + assert not hasattr(kg, "query") + + def test_relationship_mismatch_detected(self, palace_with_kg): + """The feature's headline example: text says brother, KG says husband.""" + palace, kg = palace_with_kg + kg.add_triple("Bob", "husband_of", "Alice", valid_from="2020-01-01") + + issues = check_text("Bob is Alice's husband_of", str(palace)) + # Exact-predicate + same object → no mismatch. + assert all(i["type"] != "relationship_mismatch" for i in issues) + + issues = check_text("Bob is Alice's brother", str(palace)) + mismatches = [i for i in issues if i["type"] == "relationship_mismatch"] + assert mismatches, "should flag text/KG mismatch for same (subject, object)" + m = mismatches[0] + assert m["entity"] == "Bob" + assert m["claim"]["predicate"] == "brother" + assert m["kg_fact"]["predicate"] == "husband_of" + + def test_no_false_positive_when_kg_has_no_facts_about_subject(self, palace_with_kg): + palace, _ = palace_with_kg + # KG is empty → no mismatch should fire. + assert check_text("Bob is Alice's brother", str(palace)) == [] + + def test_stale_fact_detected(self, palace_with_kg): + palace, kg = palace_with_kg + # An old relationship that was superseded in 2023. Using a + # possessive-shape claim so the narrow claim-extraction regex + # actually reaches the stale-fact branch. + kg.add_triple( + "Bob", + "brother", + "Alice", + valid_from="2010-01-01", + valid_to="2023-06-01", + ) + issues = check_text("Bob is Alice's brother", str(palace)) + stale = [i for i in issues if i["type"] == "stale_fact"] + assert stale, "should flag closed-window fact as stale" + assert stale[0]["entity"] == "Bob" + assert stale[0]["valid_to"].startswith("2023") + + def test_current_fact_same_triple_is_not_flagged(self, palace_with_kg): + palace, kg = palace_with_kg + kg.add_triple("Bob", "brother", "Alice", valid_from="2010-01-01") + issues = check_text("Bob is Alice's brother", str(palace)) + assert issues == [] + + def test_missing_palace_does_not_crash(self, tmp_path): + """Brand-new palace (no KG file yet) — check_text must return [] + rather than raising or hanging.""" + nonexistent = str(tmp_path / "never_created") + assert check_text("Bob is Alice's brother", nonexistent) == [] + + +# ── end-to-end check_text contract ─────────────────────────────────── + + +class TestCheckTextContract: + def test_empty_text_returns_empty_list(self, tmp_path): + assert check_text("", str(tmp_path / "palace")) == [] + + def test_registry_confusion_path_isolated_from_kg(self, tmp_path, monkeypatch): + """If the registry file is present but the KG is missing, the + similar-name path must still fire. Prior implementations had + such entangled state that one failure killed both paths.""" + # Bypass the real registry by pointing cache at a temp file. + registry = tmp_path / "known_entities.json" + registry.write_text(json.dumps({"people": ["Milla", "Mila"]})) + from mempalace import miner + + monkeypatch.setattr(miner, "_ENTITY_REGISTRY_PATH", str(registry)) + miner._ENTITY_REGISTRY_CACHE.update({"mtime": None, "names": frozenset(), "raw": {}}) + + issues = check_text("Chatted with Mila.", str(tmp_path / "nonexistent_palace")) + assert any(i["type"] == "similar_name" for i in issues) + + +# ── CLI ────────────────────────────────────────────────────────────── + + +class TestCLI: + def test_exits_nonzero_when_issues_found(self, tmp_path, monkeypatch, capsys): + """The CLI exit code is how shell scripts / hooks know to act — + pin it explicitly.""" + registry = tmp_path / "known_entities.json" + registry.write_text(json.dumps({"people": ["Milla", "Mila"]})) + from mempalace import fact_checker, miner + + monkeypatch.setattr(miner, "_ENTITY_REGISTRY_PATH", str(registry)) + miner._ENTITY_REGISTRY_CACHE.update({"mtime": None, "names": frozenset(), "raw": {}}) + + # Simulate argv: "Mila said hi" + monkeypatch.setattr( + "sys.argv", + ["fact_checker", "Mila said hi", "--palace", str(tmp_path / "palace")], + ) + with pytest.raises(SystemExit) as excinfo: + # Re-exec the __main__ block via runpy. + import runpy + + runpy.run_module("mempalace.fact_checker", run_name="__main__") + # Issues found → exit code 1. + assert excinfo.value.code == 1 + out = capsys.readouterr().out + assert "similar_name" in out + # Silence unused import warning. + _ = (MagicMock, patch, fact_checker)