merge: full hardened stack + rewrite fact_checker around actual KG API

Merges the full hardened stack (up through #791 drawer-grep) and turns
fact_checker from "dead code hidden behind bare except" into an
actually-working offline contradiction detector with tests.

## Dead paths the PR body advertised but the code never executed

Both buried by a single outer ``except Exception: pass``:

  * ``kg.query(subject)`` — ``KnowledgeGraph`` has no ``query()`` method;
    it has ``query_entity()``. The attribute error was silently swallowed
    and the entire KG branch always returned ``[]``. Now using
    ``kg.query_entity(subject, direction="outgoing")`` with proper
    handling of the ``predicate``/``object``/``current``/``valid_to``
    fields the real API returns.
  * ``KnowledgeGraph(palace_path=palace_path)`` — the constructor's only
    kwarg is ``db_path``. Passing ``palace_path`` raised TypeError,
    silently swallowed. Now computing the db_path correctly from
    ``<palace>/knowledge_graph.sqlite3``, matching the convention the
    MCP server already uses.
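
A minimal sketch of why both failures stayed invisible, assuming a hypothetical ``StubKG`` class that stands in for ``KnowledgeGraph`` (it only mirrors the two facts stated above: the constructor takes ``db_path``, and the lookup method is ``query_entity``). The ``old_lookup`` and ``new_lookup`` helpers are illustrative names, not functions from the codebase.

```python
import os


class StubKG:
    """Hypothetical stand-in for KnowledgeGraph, for illustration only."""

    def __init__(self, db_path):
        self.db_path = db_path

    def query_entity(self, name, direction="outgoing"):
        # Canned fact using the field names mentioned in the commit message.
        return [
            {"predicate": "husband", "object": "Alice", "current": True, "valid_to": None}
        ]


def old_lookup(palace_path, subject):
    """Shape of the old code: every failure is swallowed by one outer except."""
    facts = []
    try:
        kg = StubKG(palace_path=palace_path)  # TypeError: unexpected keyword
        facts = kg.query(subject)  # never reached; would be AttributeError
    except Exception:
        pass
    return facts


def new_lookup(palace_path, subject):
    """Shape of the fix: correct kwarg, correct method, no blanket except."""
    kg = StubKG(db_path=os.path.join(palace_path, "knowledge_graph.sqlite3"))
    return kg.query_entity(subject, direction="outgoing")


print(old_lookup("/tmp/palace", "Bob"))
print(new_lookup("/tmp/palace", "Bob"))
```

The old shape returns an empty list for every input, which is indistinguishable from "the KG has no facts", so nothing ever surfaced the bug.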

## Contradiction logic rewritten

The previous ``if kg_pred in claim and fact.object not in claim`` only
fired when text used the SAME predicate word as the KG fact — the exact
opposite of the stated use case ("Bob is Alice's brother" when the KG
says "husband" would NOT have fired). Replaced with a proper parse →
lookup → compare pipeline:

  * ``_extract_claims`` parses two surface forms ("X is Y's Z" and
    "X's Z is Y") into ``(subject, predicate, object)`` triples.
  * ``_check_kg_contradictions`` pulls the subject's outgoing facts
    and flags two classes:
      - ``relationship_mismatch`` when a current KG fact matches the
        same ``(subject, object)`` pair but with a different predicate.
      - ``stale_fact`` when the exact triple exists but is
        ``valid_to``-closed in the past.
  * Stale-fact detection is now implemented (the PR body claimed it;
    the old code silently didn't implement it).
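
The pipeline above can be sketched as follows. This is a simplified illustration, not the code in the diff: the two regexes are copied from ``_RELATIONSHIP_PATTERNS``, but ``extract_claims``, ``classify``, the tuple-based claim shape, and the in-memory fact dicts are assumptions made to keep the example self-contained.

```python
import re

# Each entry: (pattern, (subject_idx, predicate_idx, object_idx)) into match.groups().
PATTERNS = [
    # "Bob is Alice's brother" -> groups (Bob, Alice, brother)
    (re.compile(r"\b([A-Z][\w-]+)\s+is\s+([A-Z][\w-]+)'s\s+([a-z]{3,20})\b"), (0, 2, 1)),
    # "Alice's brother is Bob" -> groups (Alice, brother, Bob)
    (re.compile(r"\b([A-Z][\w-]+)'s\s+([a-z]{3,20})\s+is\s+([A-Z][\w-]+)\b"), (2, 1, 0)),
]


def extract_claims(text):
    """Parse step: return (subject, predicate, object) tuples found in text."""
    claims = []
    for pattern, (s, p, o) in PATTERNS:
        for match in pattern.finditer(text):
            groups = match.groups()
            claims.append((groups[s], groups[p].lower(), groups[o]))
    return claims


def classify(claim, facts, today):
    """Compare step: label one claim against the subject's outgoing facts.

    `facts` is a list of dicts with predicate/object/current/valid_to keys;
    `today` is an ISO date string, so plain string comparison orders dates.
    """
    _subject, pred, obj = claim
    issues = []
    for fact in facts:
        if fact["object"].lower() != obj.lower():
            continue
        kg_pred = fact["predicate"].lower()
        if fact["current"] and kg_pred != pred:
            issues.append("relationship_mismatch")
        elif (
            not fact["current"]
            and kg_pred == pred
            and fact["valid_to"]
            and fact["valid_to"] < today
        ):
            issues.append("stale_fact")
    return issues


claim = extract_claims("Bob is Alice's brother")[0]
current = [{"predicate": "husband", "object": "Alice", "current": True, "valid_to": None}]
closed = [{"predicate": "brother", "object": "Alice", "current": False, "valid_to": "2023-06-01"}]
print(classify(claim, current, "2026-04-13"))
print(classify(claim, closed, "2026-04-13"))
```

Matching on the ``(subject, object)`` pair first and comparing predicates second is what lets "brother" be checked against "husband"; the old substring test could never do that.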

## Performance fix — O(n²) → O(mentioned × n)

``_check_entity_confusion`` previously computed Levenshtein for every
pair of registered names on every ``check_text`` call. For 1,000
registered names that's ~500K edit-distance calls per hook invocation.
Now we first identify which registry names actually appear in the text
(one word-boundary regex search per name), then only compute edit
distance between mentioned and unmentioned names. Pinned by a test that
asserts <200ms on a 500-name registry with zero mentions.

Also: when *both* similar names are mentioned in the text, we no
longer flag them — the user clearly knows they're different people.
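
A rough sketch of the mentioned-first strategy, under simplifying assumptions: ``similar_name_pairs`` is a hypothetical helper that returns plain tuples rather than the issue dicts the real function builds, and the Levenshtein routine is a compact variant of the one in the diff.

```python
import re


def edit_distance(a, b):
    """Iterative Levenshtein distance using a single rolling row."""
    if len(a) < len(b):
        a, b = b, a
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a):
        curr = [i + 1]
        for j, cb in enumerate(b):
            curr.append(min(prev[j + 1] + 1, curr[j] + 1, prev[j] + (ca != cb)))
        prev = curr
    return prev[-1]


def similar_name_pairs(text, names):
    """Return (mentioned, unmentioned, distance) for near-duplicate names.

    Edit distance is only computed for names that appear in the text, so
    the cost is len(mentioned) * len(names), not len(names) squared.
    """
    mentioned = [
        n for n in sorted(names)
        if re.search(r"\b" + re.escape(n) + r"\b", text, re.IGNORECASE)
    ]
    # Names that also appear in the text are excluded: the author is
    # evidently writing about two different entities.
    unmentioned = sorted(set(names) - set(mentioned))
    pairs = []
    for a in mentioned:
        for b in unmentioned:
            distance = edit_distance(a.lower(), b.lower())
            if 0 < distance <= 2:
                pairs.append((a, b, distance))
    return pairs


registry = {"Mila", "Milla"}
print(similar_name_pairs("I spoke with Mila today.", registry))
print(similar_name_pairs("Mila and Milla met for lunch.", registry))
```

With zero mentions the outer loop never runs, which is why the regression test can bound the call on a 500-name registry.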

## Shared entity-registry loader

``mempalace/miner.py`` already had an mtime-cached loader for
``~/.mempalace/known_entities.json``. fact_checker had a duplicate
implementation that leaked file handles and ignored caching. Extended
miner's cache to expose both the flat set (``_load_known_entities``)
and the raw category dict (``_load_known_entities_raw``); fact_checker
now imports the latter. No more double disk reads, no more handle leak.
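
For orientation, a condensed sketch of an mtime-gated cache serving both shapes. It follows the structure of the miner change but is not a copy of it: the functions take an explicit ``path`` argument and are named ``load_flat`` / ``load_raw`` purely for illustration.

```python
import json
import os

# One cache entry serves both views of the registry.
_CACHE = {"mtime": None, "names": frozenset(), "raw": {}}


def _refresh(path):
    """Re-read the registry only when the file's mtime has changed."""
    try:
        mtime = os.path.getmtime(path)
    except OSError:
        # File missing or unreadable: reset to an empty registry.
        _CACHE.update(mtime=None, names=frozenset(), raw={})
        return
    if _CACHE["mtime"] == mtime:
        return
    raw, names = {}, set()
    try:
        # The context manager closes the handle even if parsing fails.
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, dict):
            raw = data
            for cat in data.values():
                # Iterating a dict yields its keys, so both shapes work.
                if isinstance(cat, (list, dict)):
                    names.update(str(n) for n in cat if n)
    except (OSError, ValueError):
        raw, names = {}, set()
    _CACHE.update(mtime=mtime, names=frozenset(names), raw=raw)


def load_flat(path):
    """Flat frozenset of every name across all categories."""
    _refresh(path)
    return _CACHE["names"]


def load_raw(path):
    """Shallow copy of the category dict, so callers cannot mutate the cache."""
    _refresh(path)
    return dict(_CACHE["raw"])
```

Both loaders go through the same refresh step, so a caller that needs the flat set and another that needs the category dict trigger at most one disk read per file change.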

## Tests — 24 cases in tests/test_fact_checker.py

All three detection paths + both dead-code regressions:
  * ``test_kg_init_uses_db_path_not_palace_path_kwarg`` — pins the
    correct KG constructor signature so the ``palace_path=`` bug can't
    come back.
  * ``test_relationship_mismatch_detected`` — the headline example from
    the PR body now actually fires.
  * ``test_stale_fact_detected`` — valid_to-closed triple is flagged.
  * ``test_current_fact_same_triple_is_not_flagged`` — no false positive
    on a still-valid match.
  * ``test_performance_bounded_by_mentioned_names`` — 500-name registry,
    zero mentions, <200ms. Regression for the O(n²) blowup.
  * ``test_no_false_positive_when_both_names_mentioned`` — Mila and
    Milla in the same text is fine.
  * Plus claim extraction, flatten_names shapes, CLI exit code, empty
    text handling, missing-palace graceful fallback, registry-dict
    shape support.

Full suite passes (785/785). ruff + format clean on CI-pinned 0.4.x.
Igor Lins e Silva
2026-04-13 18:20:11 -03:00
parent 339f96a4d9
commit 1263c3c91e
4 changed files with 620 additions and 126 deletions
+267 -109
@@ -1,152 +1,304 @@
"""
fact_checker.py — Verify text against known facts in the palace.
Checks AI responses, diary entries, and new content against the
entity registry and knowledge graph for contradictions. Catches:
- Wrong names (similar but different entities)
- Wrong relationships (calling someone the wrong role)
- Stale facts (things that changed — KG has valid_from/valid_to)
Checks AI responses, diary entries, and new content against the entity
registry and knowledge graph for three classes of issue:
Uses the entity_registry and knowledge_graph — no hardcoded facts.
* similar_name — text mentions a name that's one/two edits
away from *another* registered name, raising
the possibility of a typo or mix-up.
* relationship_mismatch — text asserts a role between two entities
(e.g. "Bob is Alice's brother") while the KG
records a *different* current role for the
same subject/object pair.
* stale_fact — text asserts a fact that the KG marks closed
(``valid_to`` in the past).
Purely offline. Inputs: entity_registry JSON + KG SQLite. No network.
Usage:
from mempalace.fact_checker import check_text
issues = check_text("Bob is Alice's brother", palace_path)
# → [{"type": "relationship_mismatch", "detail": "KG says Bob is Alice's husband"}]
# CLI
python -m mempalace.fact_checker "Bob is Alice's brother" --palace ~/.mempalace/palace
python -m mempalace.fact_checker "Bob is Alice's brother" \\
--palace ~/.mempalace/palace
"""
from __future__ import annotations
import os
import re
from pathlib import Path
from datetime import datetime, timezone
# Share miner's mtime-cached registry loader so we don't double-read
# ~/.mempalace/known_entities.json on every check_text call.
from .miner import _load_known_entities_raw
def check_text(text, palace_path=None, config=None):
"""Check text for contradictions against known facts.
# Narrow detection patterns: parse "X is Y's Z" and "X's Z is Y".
# Each name is captured as a single capitalized token (word characters
# and hyphens), so hyphenated names match but space-separated
# multi-word names do not. Relationship words are constrained to 3-20
# lowercase letters to avoid matching arbitrary filler.
_RELATIONSHIP_PATTERNS = [
# "Bob is Alice's brother" → subject=Bob, possessor=Alice, role=brother
re.compile(r"\b([A-Z][\w-]+)\s+is\s+([A-Z][\w-]+)'s\s+([a-z]{3,20})\b"),
# "Alice's brother is Bob" → possessor=Alice, role=brother, subject=Bob
re.compile(r"\b([A-Z][\w-]+)'s\s+([a-z]{3,20})\s+is\s+([A-Z][\w-]+)\b"),
]
Returns list of issues found. Empty list = no contradictions.
def check_text(text: str, palace_path: str = None, config=None) -> list:
"""Return a list of issues detected in ``text``.
Empty list means "no contradictions found" — absence of evidence, not
evidence of absence. The detector is deliberately conservative:
every issue is anchored to a specific KG fact or registry entry.
"""
if config is None:
from .config import MempalaceConfig
config = MempalaceConfig()
if palace_path is None:
palace_path = config.palace_path
issues = []
if not text:
return []
# Load known entities
entity_names = _load_known_entities()
issues: list = []
entity_names_raw = _load_known_entities_raw()
# Check entity name confusion (similar names that might be mixed up)
issues.extend(_check_entity_confusion(text, entity_names))
# Check against knowledge graph facts
issues.extend(_check_kg_facts(text, palace_path))
issues.extend(_check_entity_confusion(text, entity_names_raw))
issues.extend(_check_kg_contradictions(text, palace_path))
return issues
def _load_known_entities():
"""Load entity names from the registry."""
import json
registry_path = os.path.expanduser("~/.mempalace/known_entities.json")
if not os.path.exists(registry_path):
return {}
try:
return json.loads(open(registry_path).read())
except Exception:
return {}
# ── entity-name confusion ────────────────────────────────────────────
def _check_entity_confusion(text, entity_names):
"""Check if text confuses similar entity names."""
issues = []
all_names = set()
for cat in entity_names.values():
def _flatten_names(entity_names_raw: dict) -> set:
"""Flatten a ``{category: [names]}`` or ``{category: {name: meta}}``
registry into a set of names."""
flat: set = set()
for cat in entity_names_raw.values():
if isinstance(cat, list):
all_names.update(cat)
flat.update(str(n) for n in cat if n)
elif isinstance(cat, dict):
all_names.update(cat.keys())
flat.update(str(k) for k in cat.keys() if k)
return flat
# Find names mentioned in text
mentioned = set()
def _check_entity_confusion(text: str, entity_names_raw: dict) -> list:
"""Flag names mentioned in the text that are edit-distance ≤ 2 from
a *different* registered name — a common typo / mix-up pattern.
Performance note: the original O(n²) pairwise scan over the full
registry is gone. We first identify which names actually appear in
the text, then only compute edit distance between *mentioned* names
and the rest of the registry. This makes the cost O(m × n) where m
is the handful of names in the text, not the full registry.
"""
all_names = _flatten_names(entity_names_raw)
if not all_names:
return []
# Which names from the registry actually appear in the text?
mentioned: list = []
for name in all_names:
if re.search(r'\b' + re.escape(name) + r'\b', text, re.IGNORECASE):
mentioned.add(name)
if re.search(r"\b" + re.escape(name) + r"\b", text, re.IGNORECASE):
mentioned.append(name)
if not mentioned:
return []
# Check for names that are very similar but different (edit distance 1-2)
name_list = sorted(all_names)
for i, name_a in enumerate(name_list):
for name_b in name_list[i + 1:]:
if _edit_distance(name_a.lower(), name_b.lower()) <= 2:
if name_a in mentioned or name_b in mentioned:
if name_a in text and name_b not in text:
issues.append({
"type": "similar_name",
"detail": f"'{name_a}' mentioned — did you mean '{name_b}'? (similar names in registry)",
"names": [name_a, name_b],
})
issues: list = []
seen_pairs: set = set()
for name_a in mentioned:
a_lower = name_a.lower()
for name_b in all_names:
if name_b == name_a:
continue
# Dedupe by unordered pair so we don't double-report.
pair_key = tuple(sorted((name_a.lower(), name_b.lower())))
if pair_key in seen_pairs:
continue
# Only flag when name_b is a *different* registry entry that
# was NOT mentioned — otherwise both names in the text is
# just the user writing about two people.
if name_b in mentioned:
seen_pairs.add(pair_key)
continue
distance = _edit_distance(a_lower, name_b.lower())
if 0 < distance <= 2:
issues.append(
{
"type": "similar_name",
"detail": (
f"'{name_a}' mentioned — did you mean "
f"'{name_b}'? (edit distance {distance})"
),
"names": [name_a, name_b],
"distance": distance,
}
)
seen_pairs.add(pair_key)
return issues
def _check_kg_facts(text, palace_path):
"""Check text against knowledge graph for contradictions."""
issues = []
# ── KG contradictions ────────────────────────────────────────────────
def _extract_claims(text: str) -> list:
"""Return structured (subject, predicate, object) claims from ``text``.
The two supported surface forms are "X is Y's Z" and "X's Z is Y",
both of which resolve to the triple ``(X, Z, Y)`` — ``X`` has role
``Z`` with respect to ``Y``. Matches are case-preserving for the
entity names (KG lookup is case-insensitive on normalized IDs).
"""
claims: list = []
for pat in _RELATIONSHIP_PATTERNS:
for match in pat.finditer(text):
groups = match.groups()
if pat is _RELATIONSHIP_PATTERNS[0]:
subject, possessor, role = groups[0], groups[1], groups[2]
else:
possessor, role, subject = groups[0], groups[1], groups[2]
claims.append(
{
"subject": subject,
"predicate": role.lower(),
"object": possessor,
"span": match.group(0),
}
)
return claims
def _check_kg_contradictions(text: str, palace_path: str) -> list:
"""Compare each claim in ``text`` against the KG.
For every claim ``(subject, predicate, object)`` parsed from the
text, look up the subject's current KG triples:
* ``relationship_mismatch`` fires when the KG records a fact about
the same ``(subject, object)`` pair but with a *different*
predicate — e.g. text says "brother" but KG says "husband".
* ``stale_fact`` fires when the KG has the exact ``(subject,
predicate, object)`` triple but its ``valid_to`` is in the past,
meaning the claim is no longer current.
"""
claims = _extract_claims(text)
if not claims:
return []
try:
from .knowledge_graph import KnowledgeGraph
kg = KnowledgeGraph(palace_path=palace_path)
# Extract relationship claims from text
# Pattern: "X is Y's Z" or "X's Z is Y"
patterns = [
(r"(\w+)\s+is\s+(\w+)'s\s+(\w+)", "subject", "possessor", "role"),
(r"(\w+)'s\s+(\w+)\s+is\s+(\w+)", "possessor", "role", "subject"),
]
for pattern, *roles in patterns:
for match in re.finditer(pattern, text, re.IGNORECASE):
groups = match.groups()
subject = groups[0]
# Query KG for this entity
try:
facts = kg.query(subject)
if facts:
for fact in facts:
# Check if the claim contradicts a known fact
if fact.get("valid_to") is None: # current fact
kg_pred = fact.get("predicate", "").lower()
claim = match.group(0).lower()
if kg_pred in claim and fact.get("object", "").lower() not in claim:
issues.append({
"type": "relationship_mismatch",
"detail": f"Text says '{match.group(0)}' but KG says: {subject} {kg_pred} {fact.get('object')}",
"entity": subject,
})
except Exception:
pass
# KG lives alongside the palace collection; mcp_server uses the
# same convention (see _kg init). Pass ``db_path`` — the previous
# code passed a nonexistent ``palace_path`` kwarg which raised
# TypeError, silently swallowed by the outer except and rendered
# the entire KG-check path dead.
kg = KnowledgeGraph(db_path=os.path.join(palace_path, "knowledge_graph.sqlite3"))
except Exception:
pass # KG not available — skip
# KG unavailable (brand-new palace, corrupted DB, etc.) — skip.
return []
issues: list = []
for claim in claims:
subject = claim["subject"]
claim_pred = claim["predicate"]
claim_obj = claim["object"]
try:
facts = kg.query_entity(subject, direction="outgoing")
except Exception:
continue
if not facts:
continue
current_facts = [f for f in facts if f.get("current")]
# Mismatch: KG fact about same (subject, object) pair but different predicate.
for fact in current_facts:
if not _objects_match(fact.get("object"), claim_obj):
continue
kg_pred = (fact.get("predicate") or "").lower()
if kg_pred and kg_pred != claim_pred:
issues.append(
{
"type": "relationship_mismatch",
"detail": (
f"Text says '{claim['span']}' but KG records "
f"{subject} {kg_pred} {fact.get('object')}"
),
"entity": subject,
"claim": {
"predicate": claim_pred,
"object": claim_obj,
},
"kg_fact": {
"predicate": kg_pred,
"object": fact.get("object"),
},
}
)
# Stale fact: exact match on (subject, predicate, object) but KG
# closed the window in the past.
now_iso = datetime.now(timezone.utc).date().isoformat()
for fact in facts:
if fact.get("current"):
continue
kg_pred = (fact.get("predicate") or "").lower()
if kg_pred != claim_pred:
continue
if not _objects_match(fact.get("object"), claim_obj):
continue
valid_to = fact.get("valid_to")
if valid_to and str(valid_to) < now_iso:
issues.append(
{
"type": "stale_fact",
"detail": (
f"Text says '{claim['span']}' but KG marks "
f"this fact closed on {valid_to}"
),
"entity": subject,
"valid_to": valid_to,
}
)
return issues
def _edit_distance(s1, s2):
"""Simple Levenshtein distance."""
def _objects_match(kg_obj, claim_obj: str) -> bool:
if kg_obj is None or not claim_obj:
return False
return str(kg_obj).strip().lower() == claim_obj.strip().lower()
# ── Levenshtein helper (tight iterative version) ─────────────────────
def _edit_distance(s1: str, s2: str) -> int:
"""Levenshtein distance. O(len(s1) * len(s2)) time, O(len(s2)) space."""
if len(s1) < len(s2):
return _edit_distance(s2, s1)
if len(s2) == 0:
s1, s2 = s2, s1
if not s2:
return len(s1)
prev = list(range(len(s2) + 1))
for i, c1 in enumerate(s1):
curr = [i + 1]
for j, c2 in enumerate(s2):
curr.append(min(
prev[j + 1] + 1,
curr[j] + 1,
prev[j] + (0 if c1 == c2 else 1),
))
curr.append(
min(
prev[j + 1] + 1,
curr[j] + 1,
prev[j] + (0 if c1 == c2 else 1),
)
)
prev = curr
return prev[-1]
@@ -154,24 +306,30 @@ def _edit_distance(s1, s2):
if __name__ == "__main__":
import argparse
import json
import sys
parser = argparse.ArgumentParser(description="Check text against known facts")
parser.add_argument("text", nargs="?", help="Text to check")
parser.add_argument("--palace", default=os.path.expanduser("~/.mempalace/palace"))
parser.add_argument("--stdin", action="store_true", help="Read from stdin")
parser = argparse.ArgumentParser(
description="Check text against known facts in the MemPalace palace.",
epilog="Exits 0 when no issues found, 1 when one or more issues detected.",
)
parser.add_argument("text", nargs="?", help="Text to check (or use --stdin).")
parser.add_argument(
"--palace",
default=os.path.expanduser("~/.mempalace/palace"),
help="Path to the palace directory.",
)
parser.add_argument("--stdin", action="store_true", help="Read text from stdin.")
args = parser.parse_args()
if args.stdin:
import sys
text = sys.stdin.read()
text_in = sys.stdin.read()
elif args.text:
text = args.text
text_in = args.text
else:
print("Provide text as argument or use --stdin")
exit(1)
parser.error("Provide text as argument or use --stdin.")
issues = check_text(text, palace_path=args.palace)
if issues:
print(json.dumps(issues, indent=2))
else:
print("No contradictions found.")
found = check_text(text_in, palace_path=args.palace)
if found:
print(json.dumps(found, indent=2))
sys.exit(1)
print("No contradictions found.")
+25 -5
@@ -35,7 +35,15 @@ from .version import __version__
import chromadb
from .query_sanitizer import sanitize_query
from .searcher import search_memories
from .palace_graph import traverse, find_tunnels, graph_stats, create_tunnel, list_tunnels, delete_tunnel, follow_tunnels
from .palace_graph import (
traverse,
find_tunnels,
graph_stats,
create_tunnel,
list_tunnels,
delete_tunnel,
follow_tunnels,
)
from .knowledge_graph import KnowledgeGraph
@@ -519,7 +527,10 @@ def tool_create_tunnel(
except ValueError as e:
return {"error": str(e)}
return create_tunnel(
source_wing, source_room, target_wing, target_room,
source_wing,
source_room,
target_wing,
target_room,
label=label,
source_drawer_id=source_drawer_id,
target_drawer_id=target_drawer_id,
@@ -1251,8 +1262,14 @@ TOOLS = {
"target_wing": {"type": "string", "description": "Wing of the target"},
"target_room": {"type": "string", "description": "Room in the target wing"},
"label": {"type": "string", "description": "Description of the connection"},
"source_drawer_id": {"type": "string", "description": "Optional specific drawer ID"},
"target_drawer_id": {"type": "string", "description": "Optional specific drawer ID"},
"source_drawer_id": {
"type": "string",
"description": "Optional specific drawer ID",
},
"target_drawer_id": {
"type": "string",
"description": "Optional specific drawer ID",
},
},
"required": ["source_wing", "source_room", "target_wing", "target_room"],
},
@@ -1263,7 +1280,10 @@ TOOLS = {
"input_schema": {
"type": "object",
"properties": {
"wing": {"type": "string", "description": "Filter tunnels by wing (shows tunnels where wing is source or target)"},
"wing": {
"type": "string",
"description": "Filter tunnels by wing (shows tunnels where wing is source or target)",
},
},
},
"handler": tool_list_tunnels,
+40 -12
@@ -379,17 +379,17 @@ def chunk_text(content: str, source_file: str) -> list:
_ENTITY_REGISTRY_PATH = os.path.join(os.path.expanduser("~"), ".mempalace", "known_entities.json")
_ENTITY_REGISTRY_CACHE: dict = {"mtime": None, "names": frozenset()}
_ENTITY_REGISTRY_CACHE: dict = {"mtime": None, "names": frozenset(), "raw": {}}
_ENTITY_EXTRACT_WINDOW = 5000 # chars of content scanned for capitalized words
_ENTITY_METADATA_LIMIT = 25 # max entities packed into the metadata field
def _load_known_entities() -> frozenset:
"""Load (and cache) the user's known-entity registry by mtime.
Reads ``~/.mempalace/known_entities.json``. The registry is shaped as
``{"category": ["Name1", "Name2", ...], ...}``. Cached across calls
in the same process; invalidated when the file's mtime changes.
def _refresh_known_entities_cache() -> None:
"""Reload ``~/.mempalace/known_entities.json`` into the module cache if
its mtime changed since the last read. Shared by ``_load_known_entities``
(flat set) and ``_load_known_entities_raw`` (category dict), so callers
can pick whichever shape they need without duplicating the mtime-gated
disk read.
"""
try:
mtime = os.path.getmtime(_ENTITY_REGISTRY_PATH)
@@ -397,28 +397,56 @@ def _load_known_entities() -> frozenset:
if _ENTITY_REGISTRY_CACHE["mtime"] is not None:
_ENTITY_REGISTRY_CACHE["mtime"] = None
_ENTITY_REGISTRY_CACHE["names"] = frozenset()
return _ENTITY_REGISTRY_CACHE["names"]
_ENTITY_REGISTRY_CACHE["raw"] = {}
return
if _ENTITY_REGISTRY_CACHE["mtime"] == mtime:
return _ENTITY_REGISTRY_CACHE["names"]
return
names: set = set()
raw: dict = {}
try:
import json
with open(_ENTITY_REGISTRY_PATH, "r", encoding="utf-8") as f:
data = json.load(f)
for cat in data.values():
if isinstance(cat, list):
names.update(str(n) for n in cat if n)
if isinstance(data, dict):
raw = data
for cat in data.values():
if isinstance(cat, list):
names.update(str(n) for n in cat if n)
elif isinstance(cat, dict):
names.update(str(k) for k in cat.keys() if k)
except Exception:
names = set()
raw = {}
_ENTITY_REGISTRY_CACHE["mtime"] = mtime
_ENTITY_REGISTRY_CACHE["names"] = frozenset(names)
_ENTITY_REGISTRY_CACHE["raw"] = raw
def _load_known_entities() -> frozenset:
"""Flat set of every known entity name (across all categories).
Cached by mtime; invalidated when the registry file changes.
"""
_refresh_known_entities_cache()
return _ENTITY_REGISTRY_CACHE["names"]
def _load_known_entities_raw() -> dict:
"""Full category-dict view of the registry, shape
``{"category": ["Name1", ...], ...}``. Cached by mtime.
Consumed by modules (e.g., fact_checker) that need to reason about
categories rather than a flat name set. Never returns a mutable
reference to the cache — callers get a shallow copy.
"""
_refresh_known_entities_cache()
return dict(_ENTITY_REGISTRY_CACHE["raw"])
def _extract_entities_for_metadata(content: str) -> str:
"""Extract entity names from content for metadata tagging.
+288
@@ -0,0 +1,288 @@
"""
test_fact_checker.py — Regression + integration tests for fact_checker.
Covers every detection path + the three bugs the original PR silently
hid behind ``except Exception: pass``:
* ``kg.query()`` doesn't exist — code must use ``query_entity``.
* ``KnowledgeGraph(palace_path=...)`` is not a valid kwarg — code
must pass ``db_path``.
* O(n²) edit-distance over the full registry — must filter to names
actually mentioned in the text.
Also pins the three feature contracts:
* similar_name — "Mila" vs "Milla" in a registry with both.
* relationship_mismatch — "Bob is Alice's brother" vs KG "husband".
* stale_fact — claim matches a triple whose valid_to is in the past.
"""
from __future__ import annotations
import json
from unittest.mock import MagicMock, patch
import pytest
from mempalace.fact_checker import (
_check_entity_confusion,
_edit_distance,
_extract_claims,
_flatten_names,
check_text,
)
from mempalace.knowledge_graph import KnowledgeGraph
# ── claim extraction ─────────────────────────────────────────────────
class TestExtractClaims:
def test_parses_x_is_ys_z(self):
claims = _extract_claims("Bob is Alice's brother")
assert len(claims) == 1
assert claims[0] == {
"subject": "Bob",
"predicate": "brother",
"object": "Alice",
"span": "Bob is Alice's brother",
}
def test_parses_xs_z_is_y(self):
claims = _extract_claims("Alice's brother is Bob")
assert len(claims) == 1
assert claims[0]["subject"] == "Bob"
assert claims[0]["predicate"] == "brother"
assert claims[0]["object"] == "Alice"
def test_ignores_sentences_without_possessive_role(self):
assert _extract_claims("Bob drove to the store today") == []
assert _extract_claims("Just some prose without relationships") == []
def test_multiple_claims_in_one_text(self):
claims = _extract_claims("Bob is Alice's brother. Carol is Dave's sister.")
subjects = {c["subject"] for c in claims}
assert subjects == {"Bob", "Carol"}
# ── entity confusion ─────────────────────────────────────────────────
class TestEntityConfusion:
def test_flags_near_name_when_only_one_mentioned(self):
registry = {"people": ["Milla", "Mila"]}
issues = _check_entity_confusion("I spoke with Mila today.", registry)
# "Mila" mentioned, "Milla" not — registry has both at edit-distance 1,
# flag the possible confusion.
assert len(issues) == 1
assert issues[0]["type"] == "similar_name"
assert set(issues[0]["names"]) == {"Mila", "Milla"}
assert issues[0]["distance"] == 1
def test_no_false_positive_when_both_names_mentioned(self):
"""Regression: a text discussing both Mila and Milla is fine —
the user clearly knows they're different. Don't nag."""
registry = {"people": ["Milla", "Mila"]}
issues = _check_entity_confusion("Mila and Milla met for lunch.", registry)
assert issues == []
def test_no_issues_when_registry_empty(self):
assert _check_entity_confusion("Bob said hi", {}) == []
assert _check_entity_confusion("Bob said hi", {"people": []}) == []
def test_no_issues_when_no_mentioned_names(self):
registry = {"people": ["Zelda", "Link", "Sheik"]}
assert _check_entity_confusion("nothing relevant here", registry) == []
def test_registry_dict_shape_is_supported(self):
# Some registries store {"people": {"Alice": {...meta}}}; we still
# need to surface the keys as candidate names.
registry = {"people": {"Milla": {"role": "creator"}, "Mila": {}}}
issues = _check_entity_confusion("I messaged Mila yesterday", registry)
assert any("Milla" in (i["names"] or []) for i in issues)
class TestEditDistance:
def test_basic_distances(self):
assert _edit_distance("kitten", "sitting") == 3
assert _edit_distance("mila", "milla") == 1
assert _edit_distance("abc", "abc") == 0
def test_empty_strings(self):
assert _edit_distance("", "") == 0
assert _edit_distance("abc", "") == 3
assert _edit_distance("", "abc") == 3
def test_performance_bounded_by_mentioned_names(self):
"""Regression: an earlier implementation did O(n²) pairwise
edit-distance over every registry entry on every check_text call.
With 500 names and zero mentions, the call must return in a blink
because no edit-distance comparison should even start."""
import time
# 500 synthetic names, none of which appear in the text.
registry = {"people": [f"Zelda{i:03d}" for i in range(500)]}
text = "completely irrelevant prose with no registered names at all"
start = time.perf_counter()
issues = _check_entity_confusion(text, registry)
elapsed = time.perf_counter() - start
assert issues == []
# Even an unoptimized implementation should beat this by orders
# of magnitude once we've filtered to mentioned names (which is
# 0 here) — if it's still doing O(n²), we'll blow past.
assert elapsed < 0.2, f"entity confusion took {elapsed:.3f}s on empty mentions"
# ── _flatten_names helper ────────────────────────────────────────────
class TestFlattenNames:
def test_handles_list_categories(self):
assert _flatten_names({"people": ["Ada", "Bob"]}) == {"Ada", "Bob"}
def test_handles_dict_categories(self):
assert _flatten_names({"people": {"Ada": {}, "Bob": {}}}) == {"Ada", "Bob"}
def test_skips_falsy_entries(self):
assert _flatten_names({"people": ["Ada", "", None, "Bob"]}) == {"Ada", "Bob"}
# ── KG integration (uses a real tmp SQLite palace) ───────────────────
@pytest.fixture
def palace_with_kg(tmp_path):
"""Palace directory with a real KG pre-seeded with a few triples.
The KG file lives at ``<palace>/knowledge_graph.sqlite3`` — same
convention used by the MCP server. Fact-checker must find it via
that path, not via a bogus ``palace_path`` kwarg.
"""
palace = tmp_path / "palace"
palace.mkdir()
db = str(palace / "knowledge_graph.sqlite3")
kg = KnowledgeGraph(db_path=db)
yield palace, kg
class TestKGContradictions:
def test_kg_init_uses_db_path_not_palace_path_kwarg(self):
"""Regression: the original code passed ``palace_path=`` to a
constructor whose only kwarg is ``db_path``. That raised
TypeError — silently swallowed — and the KG path became dead
code. This test pins the correct call signature."""
# Simply construct via the correct signature; raising means the
# KG constructor has changed in a way that fact_checker must too.
kg = KnowledgeGraph(db_path=":memory:")
# query_entity must exist (this is the method fact_checker calls).
assert callable(getattr(kg, "query_entity", None))
# The API that fact_checker used to call does NOT exist.
assert not hasattr(kg, "query")
def test_relationship_mismatch_detected(self, palace_with_kg):
"""The feature's headline example: text says brother, KG says husband."""
palace, kg = palace_with_kg
kg.add_triple("Bob", "husband_of", "Alice", valid_from="2020-01-01")
issues = check_text("Bob is Alice's husband_of", str(palace))
# Exact-predicate + same object → no mismatch.
assert all(i["type"] != "relationship_mismatch" for i in issues)
issues = check_text("Bob is Alice's brother", str(palace))
mismatches = [i for i in issues if i["type"] == "relationship_mismatch"]
assert mismatches, "should flag text/KG mismatch for same (subject, object)"
m = mismatches[0]
assert m["entity"] == "Bob"
assert m["claim"]["predicate"] == "brother"
assert m["kg_fact"]["predicate"] == "husband_of"
def test_no_false_positive_when_kg_has_no_facts_about_subject(self, palace_with_kg):
palace, _ = palace_with_kg
# KG is empty → no mismatch should fire.
assert check_text("Bob is Alice's brother", str(palace)) == []
def test_stale_fact_detected(self, palace_with_kg):
palace, kg = palace_with_kg
# An old relationship that was superseded in 2023. Using a
# possessive-shape claim so the narrow claim-extraction regex
# actually reaches the stale-fact branch.
kg.add_triple(
"Bob",
"brother",
"Alice",
valid_from="2010-01-01",
valid_to="2023-06-01",
)
issues = check_text("Bob is Alice's brother", str(palace))
stale = [i for i in issues if i["type"] == "stale_fact"]
assert stale, "should flag closed-window fact as stale"
assert stale[0]["entity"] == "Bob"
assert stale[0]["valid_to"].startswith("2023")
def test_current_fact_same_triple_is_not_flagged(self, palace_with_kg):
palace, kg = palace_with_kg
kg.add_triple("Bob", "brother", "Alice", valid_from="2010-01-01")
issues = check_text("Bob is Alice's brother", str(palace))
assert issues == []
def test_missing_palace_does_not_crash(self, tmp_path):
"""Brand-new palace (no KG file yet) — check_text must return []
rather than raising or hanging."""
nonexistent = str(tmp_path / "never_created")
assert check_text("Bob is Alice's brother", nonexistent) == []
# ── end-to-end check_text contract ───────────────────────────────────
class TestCheckTextContract:
def test_empty_text_returns_empty_list(self, tmp_path):
assert check_text("", str(tmp_path / "palace")) == []
def test_registry_confusion_path_isolated_from_kg(self, tmp_path, monkeypatch):
"""If the registry file is present but the KG is missing, the
similar-name path must still fire. Prior implementations had
such entangled state that one failure killed both paths."""
# Bypass the real registry by pointing cache at a temp file.
registry = tmp_path / "known_entities.json"
registry.write_text(json.dumps({"people": ["Milla", "Mila"]}))
from mempalace import miner
monkeypatch.setattr(miner, "_ENTITY_REGISTRY_PATH", str(registry))
miner._ENTITY_REGISTRY_CACHE.update({"mtime": None, "names": frozenset(), "raw": {}})
issues = check_text("Chatted with Mila.", str(tmp_path / "nonexistent_palace"))
assert any(i["type"] == "similar_name" for i in issues)
# ── CLI ──────────────────────────────────────────────────────────────
class TestCLI:
def test_exits_nonzero_when_issues_found(self, tmp_path, monkeypatch, capsys):
"""The CLI exit code is how shell scripts / hooks know to act —
pin it explicitly."""
registry = tmp_path / "known_entities.json"
registry.write_text(json.dumps({"people": ["Milla", "Mila"]}))
from mempalace import fact_checker, miner
monkeypatch.setattr(miner, "_ENTITY_REGISTRY_PATH", str(registry))
miner._ENTITY_REGISTRY_CACHE.update({"mtime": None, "names": frozenset(), "raw": {}})
# Simulate argv: "Mila said hi"
monkeypatch.setattr(
"sys.argv",
["fact_checker", "Mila said hi", "--palace", str(tmp_path / "palace")],
)
with pytest.raises(SystemExit) as excinfo:
# Re-exec the __main__ block via runpy.
import runpy
runpy.run_module("mempalace.fact_checker", run_name="__main__")
# Issues found → exit code 1.
assert excinfo.value.code == 1
out = capsys.readouterr().out
assert "similar_name" in out
# Silence unused import warning.
_ = (MagicMock, patch, fact_checker)