Merge pull request #1014 from MemPalace/refactor/rfc-002-sources-scaffolding

refactor(sources): RFC 002 §9 scaffolding — BaseSourceAdapter, registry, PalaceContext
This commit is contained in:
Igor Lins e Silva
2026-04-18 18:44:52 -03:00
committed by GitHub
8 changed files with 1325 additions and 2 deletions
+33 -2
View File
@@ -83,6 +83,8 @@ class KnowledgeGraph:
confidence REAL DEFAULT 1.0,
source_closet TEXT,
source_file TEXT,
source_drawer_id TEXT,
adapter_name TEXT,
extracted_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (subject) REFERENCES entities(id),
FOREIGN KEY (object) REFERENCES entities(id)
@@ -93,8 +95,25 @@ class KnowledgeGraph:
CREATE INDEX IF NOT EXISTS idx_triples_predicate ON triples(predicate);
CREATE INDEX IF NOT EXISTS idx_triples_valid ON triples(valid_from, valid_to);
""")
self._migrate_schema(conn)
conn.commit()
def _migrate_schema(self, conn):
    """Add the RFC 002 §5.5 provenance columns to an older ``triples`` table.

    New palaces already get ``source_drawer_id`` and ``adapter_name`` from
    the canonical ``CREATE TABLE``, so nothing is altered for them. Palaces
    created earlier lack the columns, and because SQLite offers no
    ``ADD COLUMN IF NOT EXISTS`` the current column set is read from
    ``PRAGMA table_info`` and an ``ALTER`` is issued only for what is absent.

    Args:
        conn: Open connection to the knowledge-graph database. Rows are
            addressed by the ``"name"`` key, so the connection must yield
            name-addressable rows.
    """
    additions = (
        ("source_drawer_id", "ALTER TABLE triples ADD COLUMN source_drawer_id TEXT"),
        ("adapter_name", "ALTER TABLE triples ADD COLUMN adapter_name TEXT"),
    )
    present = set()
    for info in conn.execute("PRAGMA table_info(triples)"):
        present.add(info["name"])
    for column, statement in additions:
        if column not in present:
            conn.execute(statement)
def _conn(self):
if self._connection is None:
self._connection = sqlite3.connect(self.db_path, timeout=10, check_same_thread=False)
@@ -137,10 +156,16 @@ class KnowledgeGraph:
confidence: float = 1.0,
source_closet: str = None,
source_file: str = None,
source_drawer_id: str = None,
adapter_name: str = None,
):
"""
Add a relationship triple: subject → predicate → object.
``source_drawer_id`` and ``adapter_name`` are RFC 002 §5.5 provenance
fields populated by adapters that advertise ``supports_kg_triples``;
they default to ``None`` so every existing caller stays source-compatible.
Examples:
add_triple("Max", "child_of", "Alice", valid_from="2015-04-01")
add_triple("Max", "does", "swimming", valid_from="2025-01-01")
@@ -173,8 +198,12 @@ class KnowledgeGraph:
triple_id = f"t_{sub_id}_{pred}_{obj_id}_{hashlib.sha256(f'{valid_from}{datetime.now().isoformat()}'.encode()).hexdigest()[:12]}"
conn.execute(
"""INSERT INTO triples (id, subject, predicate, object, valid_from, valid_to, confidence, source_closet, source_file)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
"""INSERT INTO triples (
id, subject, predicate, object,
valid_from, valid_to, confidence,
source_closet, source_file,
source_drawer_id, adapter_name
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
triple_id,
sub_id,
@@ -185,6 +214,8 @@ class KnowledgeGraph:
confidence,
source_closet,
source_file,
source_drawer_id,
adapter_name,
),
)
return triple_id
+74
View File
@@ -0,0 +1,74 @@
"""Source adapter subsystem (RFC 002).
Public surface:
* :class:`BaseSourceAdapter` — per-source read-side contract.
* Typed records: :class:`SourceRef`, :class:`SourceItemMetadata`,
:class:`DrawerRecord`, :class:`RouteHint`, :class:`SourceSummary`,
:class:`AdapterSchema`, :class:`FieldSpec`.
* Error classes: :class:`SourceNotFoundError`, :class:`AuthRequiredError`,
:class:`AdapterClosedError`, :class:`TransformationViolationError`,
:class:`SchemaConformanceError`.
* Registry: :func:`register`, :func:`get_adapter`, :func:`available_adapters`,
:func:`resolve_adapter_for_source`.
* :class:`PalaceContext` — facade core passes to adapters during ``ingest``.
* :mod:`transforms` — reference implementations of the reserved §1.4
transformations + :func:`get_transformation` resolver.
"""
from .base import (
AdapterClosedError,
AdapterSchema,
AuthRequiredError,
BaseSourceAdapter,
DrawerRecord,
FieldSpec,
IngestMode,
IngestResult,
RouteHint,
SchemaConformanceError,
SourceAdapterError,
SourceItemMetadata,
SourceNotFoundError,
SourceRef,
SourceSummary,
TransformationViolationError,
)
from .context import PalaceContext, ProgressHook
from .registry import (
available_adapters,
get_adapter,
get_adapter_class,
register,
reset_adapters,
resolve_adapter_for_source,
unregister,
)
# Explicit public API of the package. Every name below is imported above from
# ``.base``, ``.context`` or ``.registry``; the list is kept alphabetical with
# classes/aliases first and the registry functions last.
__all__ = [
    "AdapterClosedError",
    "AdapterSchema",
    "AuthRequiredError",
    "BaseSourceAdapter",
    "DrawerRecord",
    "FieldSpec",
    "IngestMode",
    "IngestResult",
    "PalaceContext",
    "ProgressHook",
    "RouteHint",
    "SchemaConformanceError",
    "SourceAdapterError",
    "SourceItemMetadata",
    "SourceNotFoundError",
    "SourceRef",
    "SourceSummary",
    "TransformationViolationError",
    "available_adapters",
    "get_adapter",
    "get_adapter_class",
    "register",
    "reset_adapters",
    "resolve_adapter_for_source",
    "unregister",
]
+245
View File
@@ -0,0 +1,245 @@
"""Source adapter contract for MemPalace (RFC 002).
Mirrors what ``mempalace/backends/base.py`` does for the write side: it defines
the read-side surface every source adapter must implement. A source adapter
extracts content from a specific origin (filesystem, git, Slack, Cursor …) and
yields typed records (``SourceItemMetadata`` / ``DrawerRecord``) that core
routes into the palace.
This module is spec scaffolding. The first-party miners (``mempalace/miner.py``
and ``mempalace/convo_miner.py``) are migrated onto it in a follow-up PR;
in this PR we publish the contract so third-party adapters can begin building
against a stable surface.
See ``docs/rfcs/002-source-adapter-plugin-spec.md`` for the authoritative
spec text.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, ClassVar, Iterator, Literal, Optional
if TYPE_CHECKING:
from .context import PalaceContext # noqa: F401 (used in string annotation)
# ---------------------------------------------------------------------------
# Errors
# ---------------------------------------------------------------------------
class SourceAdapterError(Exception):
    """Root of the source-adapter exception hierarchy.

    Every error core raises on behalf of a source adapter derives from this
    class, so callers can catch the whole family with one ``except``.
    """


class SourceNotFoundError(SourceAdapterError):
    """A ``SourceRef`` could not be resolved to a readable source."""


class AuthRequiredError(SourceAdapterError):
    """The adapter needs credentials that were not supplied.

    The message MUST tell the operator which env vars (or other supported
    mechanism) to set.
    """


class AdapterClosedError(SourceAdapterError):
    """An adapter method was invoked after ``close()`` had been called."""


class TransformationViolationError(SourceAdapterError):
    """Round-tripping a drawer needed a transformation the adapter did not declare.

    Raised by the conformance suite (see RFC 002 §7).
    """


class SchemaConformanceError(SourceAdapterError):
    """A ``DrawerRecord.metadata`` does not satisfy the adapter's schema.

    The schema in question is the one returned by
    :meth:`BaseSourceAdapter.describe_schema`.
    """
# ---------------------------------------------------------------------------
# Value objects
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class SourceRef:
    """Immutable handle naming the source a user wants ingested.

    Attributes:
        local_path: Set for filesystem-rooted sources (project dir, mbox file).
        uri: Set for URL-like references (``github.com/org/repo``,
            ``slack://workspace/channel``).
        options: Adapter-specific, non-secret configuration. Secrets MUST NOT
            be placed here; see §4.2. Each instance gets its own dict.
    """

    local_path: Optional[str] = None
    uri: Optional[str] = None
    # ``default_factory`` so instances never share one mutable dict.
    options: dict = field(default_factory=dict)
@dataclass(frozen=True)
class RouteHint:
    """Routing hint an adapter may attach to its output (RFC 002 §2.5).

    All three components are optional and default to ``None``.
    """

    wing: Optional[str] = None
    room: Optional[str] = None
    hall: Optional[str] = None
@dataclass(frozen=True)
class SourceItemMetadata:
    """Cheap pointer to a source item, yielded ahead of its drawers.

    Lazy-fetch adapters emit one of these first; core hands ``version`` to
    :meth:`BaseSourceAdapter.is_current` to decide whether extraction can be
    skipped. When the answer is positive the adapter stops yielding drawers
    for this item and moves on to the next one.

    Attributes:
        source_file: Identifier of the item within the source.
        version: Version marker compared by ``is_current``.
        size_hint: Optional size hint for the item.
        route_hint: Optional routing hint for the item.
    """

    source_file: str
    version: str
    size_hint: Optional[int] = None
    route_hint: Optional[RouteHint] = None
@dataclass(frozen=True)
class DrawerRecord:
    """Extracted content for a single drawer, together with flat metadata.

    ``metadata`` values MUST be flat scalars (``str``/``int``/``float``/``bool``)
    per RFC 001 §1.4 — the chroma constraint. Anything nested belongs on the
    knowledge graph (§5.5) or in a declared ``json_string`` field (§5.4).

    Attributes:
        content: The drawer text.
        source_file: Identifier of the item the content came from.
        chunk_index: Position of this drawer within its item; defaults to 0.
        metadata: Flat metadata mapping; a fresh dict per instance.
        route_hint: Optional routing hint for this drawer.
    """

    content: str
    source_file: str
    chunk_index: int = 0
    # ``default_factory`` so instances never share one mutable dict.
    metadata: dict = field(default_factory=dict)
    route_hint: Optional[RouteHint] = None
@dataclass(frozen=True)
class SourceSummary:
    """Overview of a source, as returned by :meth:`source_summary`.

    Attributes:
        description: Human-readable description of the source.
        item_count: Number of items in the source, or ``None`` when not given.
    """

    description: str
    item_count: Optional[int] = None
# The closed set of ingest-mode names; ``BaseSourceAdapter.supported_modes``
# is documented as a subset of these three values.
IngestMode = Literal["chunked_content", "whole_record", "metadata_only"]
@dataclass(frozen=True)
class FieldSpec:
    """Shape declaration for one per-adapter metadata field (§5.2).

    Attributes:
        type: One of the six declared field type names.
        required: Whether the field is required.
        description: Human-readable description of the field.
        indexed: Defaults to ``False``.
        delimiter: Defaults to ``";"``; presumably the separator used by
            ``delimiter_joined_string`` fields — confirm against §5.2.
        json_schema: Optional schema dict; presumably applies to
            ``json_string`` fields — confirm against §5.4.
    """

    type: Literal["string", "int", "float", "bool", "delimiter_joined_string", "json_string"]
    required: bool
    description: str
    indexed: bool = False
    delimiter: str = ";"
    json_schema: Optional[dict] = None
@dataclass(frozen=True)
class AdapterSchema:
    """Per-adapter metadata schema, as returned by :meth:`describe_schema`.

    Attributes:
        fields: Mapping of metadata field name to its :class:`FieldSpec`.
        version: Version string of the schema.
    """

    fields: dict[str, FieldSpec]
    version: str
# The union type adapters yield from ``ingest``.
IngestResult = object # intentionally broad; runtime checks in core
# ---------------------------------------------------------------------------
# Adapter contract
# ---------------------------------------------------------------------------
class BaseSourceAdapter(ABC):
    """Abstract read-side contract for a source adapter (RFC 002 §2).

    One instance is long-lived and serves many ``SourceRef`` invocations.
    Construction is expected to be cheap — no I/O, no network, no credential
    fetch — with all real work deferred to :meth:`ingest`. Instances are
    expected to be thread-safe for concurrent ``ingest`` calls across
    different ``SourceRef`` values (v1 serializes within a single
    ``SourceRef``).

    Identity contract (class attributes):

    * ``name`` — stable adapter name used for registration and drawer
      metadata. It has no default; subclasses must set it.
    * ``spec_version`` — version of the adapter spec targeted.
    * ``adapter_version`` — the adapter's own version, independent of
      ``spec_version``. Recorded on every drawer so re-extract workflows can
      target drawers produced by a known-buggy adapter version.
    * ``capabilities`` — free-form tokens; core inspects a documented subset.
    * ``supported_modes`` — subset of ``chunked_content``, ``whole_record``,
      ``metadata_only``.
    * ``declared_transformations`` — names of the transformations the adapter
      applies to source bytes. An empty set marks a byte-preserving adapter.
    * ``default_privacy_class`` — privacy class level (§6) used unless the
      palace config overrides it.
    """

    name: ClassVar[str]
    spec_version: ClassVar[str] = "1.0"
    adapter_version: ClassVar[str] = "0.0.0"
    capabilities: ClassVar[frozenset[str]] = frozenset()
    supported_modes: ClassVar[frozenset[str]] = frozenset({"chunked_content"})
    declared_transformations: ClassVar[frozenset[str]] = frozenset()
    default_privacy_class: ClassVar[str] = "pii_potential"

    # ------------------------------------------------------------------
    # Methods every adapter must implement
    # ------------------------------------------------------------------

    @abstractmethod
    def ingest(
        self,
        *,
        source: SourceRef,
        palace: "PalaceContext",
    ) -> Iterator[IngestResult]:
        """Enumerate a source and extract its content.

        Implementations yield a stream of ``SourceItemMetadata`` and
        ``DrawerRecord`` values. Lazy adapters yield the
        ``SourceItemMetadata`` for an item before that item's drawers so core
        can consult :meth:`is_current` before committing to the fetch; eager
        adapters MAY interleave freely.
        """

    @abstractmethod
    def describe_schema(self) -> AdapterSchema:
        """Declare the structured metadata this adapter attaches.

        The schema MUST stay stable for a given ``adapter_version``:
        enterprises index on it and core validates adapter output against it.
        """

    # ------------------------------------------------------------------
    # Overridable hooks with working defaults
    # ------------------------------------------------------------------

    def is_current(
        self,
        *,
        item: SourceItemMetadata,
        existing_metadata: Optional[dict],
    ) -> bool:
        """Report whether the palace already holds an up-to-date copy of ``item``.

        The base implementation ignores both arguments and answers ``False``,
        which means "re-extract every time". Adapters advertising
        ``supports_incremental`` MUST override it.
        """
        return False

    def source_summary(self, *, source: SourceRef) -> SourceSummary:
        """Describe ``source`` without extracting it.

        The base implementation returns a summary whose description is simply
        the adapter's ``name``.
        """
        summary = SourceSummary(description=self.name)
        return summary

    def close(self) -> None:
        """Release resources held by the adapter; the base version does nothing."""
+142
View File
@@ -0,0 +1,142 @@
"""``PalaceContext`` facade passed to source adapters (RFC 002 §9).
Bundles the palace-side surface an adapter needs during :meth:`ingest`:
drawer collection, closet collection, knowledge graph, palace config, and
progress hooks. Adapters receive a ``PalaceContext`` instance and MUST NOT
import ``mempalace.palace`` directly — that coupling is what the facade
exists to prevent.
This module publishes the shape third-party adapters target. Core's mine
loop will construct a concrete ``PalaceContext`` and pass it to adapters
when the filesystem/conversations miners are migrated onto ``BaseSourceAdapter``
in a follow-up PR; until then, no in-tree code constructs one, but the
contract is stable.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Optional, Protocol
from .base import DrawerRecord
class _CollectionLike(Protocol):
    """Structural type for the collection methods adapters rely on.

    This is the minimum of :class:`mempalace.backends.BaseCollection` that the
    facade needs. Using a Protocol lets tests and third-party adapters pass
    any object with compatible method signatures without importing the
    concrete backend; the full surface lives in ``mempalace/backends/base.py``.
    """

    def add(self, **kwargs: Any) -> None: ...

    def upsert(self, **kwargs: Any) -> None: ...

    def query(self, **kwargs: Any) -> Any: ...

    def get(self, **kwargs: Any) -> Any: ...

    def delete(self, **kwargs: Any) -> None: ...

    def count(self) -> int: ...
class _KnowledgeGraphLike(Protocol):
    """Structural type for the one knowledge-graph method the facade exposes."""

    def add_triple(self, subject: str, predicate: str, obj: str, **kwargs: Any) -> Any: ...
# Progress hook signature: ``fn(event_name, **details) -> None``.
ProgressHook = Callable[..., None]
@dataclass
class PalaceContext:
    """Facade handed to :meth:`BaseSourceAdapter.ingest` for one mine invocation.

    Attributes:
        drawer_collection: The palace's drawer collection (via RFC 001 backend).
        knowledge_graph: The palace's SQLite knowledge graph. Adapters
            advertising ``supports_kg_triples`` call ``add_triple`` on it.
        palace_path: Filesystem root of the palace (convenience; same as
            ``backend.PalaceRef.local_path``).
        closet_collection: The palace's closet collection, or ``None`` if the
            palace has no closets yet. Adapters should not write to it
            directly; core builds closets post-step (RFC 002 §1.7).
        config: Palace config object (hall keywords, rooms list, privacy
            floor, etc.). Shape is the existing :class:`MempalaceConfig`.
        adapter_name: Name of the adapter currently ingesting; populated by
            core so drawers can carry ``metadata["adapter_name"]``.
        adapter_version: Version of the adapter currently ingesting.
        progress_hooks: Optional callables core invokes on progress events.

    The methods are deliberately thin so the concrete mine loop in core can
    swap implementations without changing adapter code.
    """

    drawer_collection: _CollectionLike
    knowledge_graph: _KnowledgeGraphLike
    palace_path: str
    closet_collection: Optional[_CollectionLike] = None
    config: Optional[Any] = None
    adapter_name: str = ""
    adapter_version: str = ""
    progress_hooks: list[ProgressHook] = field(default_factory=list)

    # Internal flag: raised by :meth:`skip_current_item` and inspected by the
    # core mine loop between yields. It is not part of the adapter-facing
    # contract; adapters only need to know that calling
    # :meth:`skip_current_item` stops drawer emission for the current
    # ``SourceItemMetadata``.
    _skip_requested: bool = False

    # ------------------------------------------------------------------
    # Surface used by adapters
    # ------------------------------------------------------------------

    def upsert_drawer(self, record: DrawerRecord) -> None:
        """Write ``record`` to the drawer collection.

        The record's metadata is copied, then ``source_file`` and
        ``chunk_index`` are filled in when absent. The spec-mandated
        ``adapter_name`` / ``adapter_version`` stamps (§5.1) are likewise
        filled in when absent, provided the context carries a non-empty
        value, so adapters never need to populate them. Keys already present
        in the record's metadata are left untouched.
        """
        meta = dict(record.metadata)
        stamps = [
            ("source_file", record.source_file),
            ("chunk_index", record.chunk_index),
        ]
        if self.adapter_name:
            stamps.append(("adapter_name", self.adapter_name))
        if self.adapter_version:
            stamps.append(("adapter_version", self.adapter_version))
        for key, value in stamps:
            if key not in meta:
                meta[key] = value

        drawer_id = _build_drawer_id(record)
        self.drawer_collection.upsert(
            documents=[record.content],
            ids=[drawer_id],
            metadatas=[meta],
        )

    def skip_current_item(self) -> None:
        """Tell core the current ``SourceItemMetadata`` is already up-to-date.

        No drawers should be emitted for that item; core resets the flag
        after advancing past it.
        """
        self._skip_requested = True

    def emit(self, event: str, **details: Any) -> None:
        """Call every registered progress hook as ``hook(event, **details)``.

        A hook that raises is logged and skipped, so the remaining hooks
        still run and the mine itself never fails because of a hook.
        """
        for callback in self.progress_hooks:
            try:
                callback(event, **details)
            except Exception:  # pragma: no cover - hook errors never fail mine
                import logging

                logging.getLogger(__name__).exception("progress hook failed on %r", event)
def _build_drawer_id(record: DrawerRecord) -> str:
    """Return the deterministic drawer id for ``record``.

    The id is the first 24 hex characters of ``sha256(source_file)`` followed
    by ``_`` and the chunk index. That keeps the ``source_file`` + chunk-index
    pairing existing miners rely on while staying chroma-safe (no separators
    that collide with existing metadata values). A 96-bit SHA-256 prefix keeps
    collision risk negligible across corpora the size of a palace (sha1@64
    bits was too close to the birthday bound for large ingests). Adapters
    needing a different scheme can bypass :meth:`PalaceContext.upsert_drawer`
    and write through ``drawer_collection.upsert`` directly.
    """
    import hashlib

    source_bytes = record.source_file.encode("utf-8")
    prefix = hashlib.sha256(source_bytes).hexdigest()[:24]
    return f"{prefix}_{record.chunk_index}"
+162
View File
@@ -0,0 +1,162 @@
"""Source adapter registry + entry-point discovery (RFC 002 §3).
Third-party adapters ship as installable packages that declare a
``mempalace.sources`` entry point::
# pyproject.toml of mempalace-source-cursor
[project.entry-points."mempalace.sources"]
cursor = "mempalace_source_cursor:CursorAdapter"
MemPalace discovers them at process start. In-tree tests and local
development can register manually via :func:`register`. Explicit
registration wins on name conflict (RFC 002 §3.2).
Unlike storage backends (RFC 001 §3.3), source adapters are never auto-
detected — the user selects the adapter explicitly via ``--source NAME``
or config (§3.3). The default when no adapter is named is ``filesystem``
(to preserve current ``mempalace mine <path>`` behavior).
"""
from __future__ import annotations
import logging
from importlib import metadata
from threading import Lock
from typing import Type
from .base import BaseSourceAdapter
logger = logging.getLogger(__name__)
_ENTRY_POINT_GROUP = "mempalace.sources"
_DEFAULT_ADAPTER = "filesystem"
_registry: dict[str, Type[BaseSourceAdapter]] = {}
_instances: dict[str, BaseSourceAdapter] = {}
_explicit: set[str] = set()
_discovered = False
_lock = Lock()
def register(name: str, adapter_cls: Type[BaseSourceAdapter]) -> None:
    """Bind ``name`` to ``adapter_cls`` as an explicit registration.

    The name is recorded as explicit so entry-point discovery will not
    replace it on conflict (§3.2), and any instance cached under the same
    name is dropped so the next lookup instantiates the new class.
    """
    with _lock:
        _instances.pop(name, None)
        _explicit.add(name)
        _registry[name] = adapter_cls
def unregister(name: str) -> None:
    """Forget the adapter registered under ``name`` (primarily for tests).

    Removes the class, the explicit-registration marker and any cached
    instance; unknown names are ignored.
    """
    with _lock:
        _instances.pop(name, None)
        _explicit.discard(name)
        _registry.pop(name, None)
def _discover_entry_points() -> None:
    """Populate ``_registry`` from the ``mempalace.sources`` entry-point group.

    The scan runs at most once: ``_discovered`` is checked both before and
    after taking ``_lock`` so concurrent first callers do not repeat it.
    Names that were registered explicitly are skipped, entry points that fail
    to load are logged and ignored, and loaded objects that are not
    ``BaseSourceAdapter`` subclasses are rejected with a warning. A failure
    of the entry-point lookup itself is logged and treated as "no entry
    points"; the scan is still marked as done.
    """
    global _discovered
    # Fast path: skip the lock entirely once discovery has completed.
    if _discovered:
        return
    with _lock:
        # Re-check under the lock in case another thread finished meanwhile.
        if _discovered:
            return
        try:
            eps = metadata.entry_points()
            # Results exposing ``select`` are filtered by group; otherwise the
            # result is treated as a mapping keyed by group name.
            group = (
                eps.select(group=_ENTRY_POINT_GROUP)
                if hasattr(eps, "select")
                else eps.get(_ENTRY_POINT_GROUP, [])
            )
        except Exception:
            logger.exception("entry-point discovery for %s failed", _ENTRY_POINT_GROUP)
            group = []
        # NOTE(review): entry points are loaded while ``_lock`` is held, and
        # ``_lock`` is a plain (non-reentrant) ``Lock``. An adapter module that
        # calls ``register()`` at import time would presumably block here —
        # confirm whether that case needs handling.
        for ep in group:
            if ep.name in _explicit:
                continue  # explicit registration wins
            try:
                cls = ep.load()
            except Exception:
                logger.exception("failed to load adapter entry point %r", ep.name)
                continue
            if not isinstance(cls, type) or not issubclass(cls, BaseSourceAdapter):
                logger.warning(
                    "entry point %r did not resolve to a BaseSourceAdapter subclass (got %r)",
                    ep.name,
                    cls,
                )
                continue
            # ``setdefault`` keeps whatever is already registered under the name.
            _registry.setdefault(ep.name, cls)
        _discovered = True
def available_adapters() -> list[str]:
    """Return the names of all registered adapters, sorted, as a new list.

    Entry-point discovery is triggered first so discovered adapters are
    included alongside explicit registrations.
    """
    _discover_entry_points()
    names = sorted(_registry)
    return names
def get_adapter_class(name: str) -> Type[BaseSourceAdapter]:
    """Look up the adapter class registered under ``name``.

    Raises:
        KeyError: If no adapter is registered under ``name``. The message
            lists the available adapter names and the original lookup error
            is chained as the cause.
    """
    _discover_entry_points()
    try:
        adapter_cls = _registry[name]
    except KeyError as e:
        raise KeyError(f"unknown source adapter {name!r}; available: {available_adapters()}") from e
    return adapter_cls
def get_adapter(name: str) -> BaseSourceAdapter:
    """Return the shared, long-lived instance of the adapter named ``name``.

    The instance is created on first request (by calling the registered class
    with no arguments) and cached per name, so repeated calls return the same
    object. Tests that need isolation should call :func:`reset_adapters`.

    Raises:
        KeyError: If no adapter is registered under ``name``.
    """
    _discover_entry_points()
    with _lock:
        cached = _instances.get(name)
        if cached is None:
            adapter_cls = _registry.get(name)
            if adapter_cls is None:
                raise KeyError(
                    f"unknown source adapter {name!r}; available: {sorted(_registry.keys())}"
                )
            cached = adapter_cls()
            _instances[name] = cached
        return cached
def reset_adapters() -> None:
    """Close every cached adapter instance and empty the cache (primarily for tests).

    An adapter whose ``close()`` raises is logged and does not prevent the
    remaining adapters from being closed. Registrations themselves are kept.
    """
    with _lock:
        for adapter in _instances.values():
            try:
                adapter.close()
            except Exception:
                logger.exception("error closing adapter during reset")
        _instances.clear()
def resolve_adapter_for_source(
    *,
    explicit: str | None = None,
    config_value: str | None = None,
    default: str = _DEFAULT_ADAPTER,
) -> str:
    """Pick the adapter name following the RFC 002 §3.3 priority order.

    1. Explicit ``--source`` flag or kwarg
    2. Per-source config value
    3. Default (``filesystem``)

    A candidate counts only when it is truthy, so ``None`` and ``""`` both
    fall through to the next level. Auto-detection is *intentionally* absent
    on the read side (§3.3); a directory containing ``.git`` +
    ``workspaceStorage/`` + an ``mbox`` file is not a signal of user intent.
    """
    if explicit:
        return explicit
    if config_value:
        return config_value
    return default
+196
View File
@@ -0,0 +1,196 @@
"""Reference implementations of the reserved content transformations (RFC 002 §1.4).
Every source adapter declares the set of transformations it applies to source
bytes via ``declared_transformations``. The conformance suite then verifies
that the adapter's output can be reproduced from the source bytes by applying
*only* the declared transformations in declaration order, using these
reference implementations.
Each transformation is a pure function on strings (text content after UTF-8
decoding). ``utf8_replace_invalid`` is the one that operates on bytes.
The invariant the spec enforces: **no transformation is applied that is not
declared in the adapter's set**. Adapters with an empty set are byte-preserving
end-to-end (modulo the initial UTF-8 decode itself, which is captured by
``utf8_replace_invalid`` when applicable).
Adapters MAY add custom transformations beyond the reserved set; third-party
names SHOULD be prefixed with the adapter name (``cursor.composer_ordering``).
Custom transformations MUST expose a reference implementation under
``mempalace.sources.transforms.<adapter_name>_<transform_name>`` so the
conformance suite can locate and apply them.
"""
from __future__ import annotations
import re
from typing import Protocol, Union
class Transformation(Protocol):
    """Call signature shared by every reserved transformation.

    A transformation receives the current pipeline stage — ``bytes`` for the
    initial input (``utf8_replace_invalid``) or ``str`` once decoded — and
    returns ``str``. Adapters chain them in declaration order: the first step
    sees the original source bytes, each later step the previous step's
    output.
    """

    def __call__(self, data: Union[bytes, str], /) -> str: ...
# ---------------------------------------------------------------------------
# Reserved transformations
# ---------------------------------------------------------------------------
def utf8_replace_invalid(raw: bytes) -> str:
    """Decode ``raw`` as UTF-8, substituting U+FFFD for invalid sequences.

    Same result as ``raw.decode("utf-8", errors="replace")``. Among the
    reserved transformations this is the only one that takes bytes rather
    than already-decoded text.
    """
    decoded = raw.decode("utf-8", "replace")
    return decoded
def newline_normalize(text: str) -> str:
    """Rewrite CRLF and lone-CR line endings as LF."""
    # CRLF pairs must go first, otherwise each pair would become two LFs.
    without_crlf = text.replace("\r\n", "\n")
    return without_crlf.replace("\r", "\n")
def whitespace_trim(text: str) -> str:
    """Remove whitespace from the start and end of the record only.

    Whitespace inside the text is left alone.
    """
    return text.lstrip().rstrip()
_RUN_OF_THREE_OR_MORE_BLANK = re.compile(r"(?:\n[ \t]*){3,}\n")


def whitespace_collapse_internal(text: str) -> str:
    """Collapse runs of three or more blank lines to exactly two blank lines.

    A "blank line" here is a line containing only spaces or tabs (or nothing
    at all). Single and double blank-line runs keep their length. Blank lines
    are emitted as empty lines, i.e. their spaces/tabs are dropped.

    Lines holding any other character — including a stray ``\\r`` left over
    from CRLF input, a form feed, or a non-breaking space — are *not* blank
    and pass through untouched. Adapters that want CRLF input collapsed
    should declare ``newline_normalize`` ahead of this transformation.

    Args:
        text: Decoded text using ``\\n`` as the line separator.

    Returns:
        The text with every run of three or more blank lines reduced to two.
    """
    # Empty out space/tab-only lines first so the regex sees runs of bare
    # "\n". Only spaces and tabs are stripped: a bare ``strip()`` would also
    # blank lines made of other whitespace and silently rewrite content this
    # transformation does not declare.
    lines = text.split("\n")
    normalised = "\n".join(line if line.strip(" \t") else "" for line in lines)
    # Four consecutive newlines bound three blank lines; three newlines
    # (two blank lines) is what every longer run is reduced to.
    return _RUN_OF_THREE_OR_MORE_BLANK.sub("\n\n\n", normalised)
def line_trim(text: str) -> str:
    """Remove leading and trailing whitespace from every line separately.

    Lines are delimited by ``\\n``; the number of lines is unchanged.
    """
    return "\n".join(map(str.strip, text.split("\n")))
def line_join_spaces(text: str) -> str:
    """Merge each run of non-blank lines into one space-separated line.

    The text is split into paragraphs at every blank line (a line holding
    only spaces or tabs). Within a paragraph the lines are trimmed, empty
    ones are discarded and the rest are joined with a single space. The
    paragraphs are then re-joined with one empty line between them.
    """
    merged = []
    for paragraph in re.split(r"\n[ \t]*\n", text):
        pieces = []
        for raw_line in paragraph.split("\n"):
            trimmed = raw_line.strip()
            if trimmed:
                pieces.append(trimmed)
        merged.append(" ".join(pieces))
    return "\n\n".join(merged)
def blank_line_drop(text: str) -> str:
    """Remove every blank line, leaving only the non-blank lines.

    A line counts as blank when nothing is left after stripping whitespace.
    Kept lines are returned unmodified and joined with ``\\n``.
    """
    kept = []
    for candidate in text.split("\n"):
        if candidate.strip():
            kept.append(candidate)
    return "\n".join(kept)
# The following reserved transformations are declared in the spec but are
# deeply adapter-specific. Rather than guess a single reference implementation
# now, we provide identity shims that leave the input unchanged when no
# adapter-specific implementation is available. Adapters that declare these
# MUST either override with a concrete implementation or provide a namespaced
# reference under
# ``mempalace.sources.transforms.<adapter_name>_<transform_name>`` (per the
# module docstring). The conformance suite looks up the adapter-specific
# implementation first, falling back to these identity shims only when none
# exists.
def strip_tool_chrome(text: str) -> str:
    """Adapter-supplied: strip system tags, hook output and tool UI chrome.

    This reference is deliberately the identity function: the noise patterns
    differ per transcript format (Claude Code, Codex, ChatGPT, Slack). Once
    migrated, the conversations adapter will register a concrete reference
    implementation under
    ``mempalace.sources.transforms.conversations_strip_tool_chrome``.
    """
    return text


def tool_result_truncate(text: str) -> str:
    """Adapter-supplied: keep a head/tail window of tool output with a middle marker.

    Identity shim; returns ``text`` unchanged.
    """
    return text


def tool_result_omitted(text: str) -> str:
    """Adapter-supplied: drop some tool outputs entirely (e.g., Read/Edit/Write).

    Identity shim; returns ``text`` unchanged.
    """
    return text


def spellcheck_user(text: str) -> str:
    """Adapter-supplied: run autocorrect over user turns.

    Needs the optional ``spellcheck`` extra and a tokenizer. The spec does not
    mandate a specific language model, so the reference is adapter-owned and
    this shim returns ``text`` unchanged.
    """
    return text


def synthesized_marker(text: str) -> str:
    """Adapter-supplied: the adapter inserts its own strings (e.g., '[N lines omitted]').

    Identity shim; returns ``text`` unchanged.
    """
    return text


def speaker_role_assignment(text: str) -> str:
    """Adapter-supplied: assign multi-party speakers alternately to user/assistant.

    Identity shim; returns ``text`` unchanged.
    """
    return text
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
# Reserved transformation name → reference implementation.
# Adapters look up by name to compose a round-trip pipeline during testing.
# The value conforms to the :class:`Transformation` protocol above; we type
# it as that Protocol rather than a concrete ``Callable`` so static checkers
# accept both the bytes→str (``utf8_replace_invalid``) and str→str shapes.
RESERVED_TRANSFORMATIONS: dict[str, Transformation] = {
    # Concrete reference implementations.
    "utf8_replace_invalid": utf8_replace_invalid,
    "newline_normalize": newline_normalize,
    "whitespace_trim": whitespace_trim,
    "whitespace_collapse_internal": whitespace_collapse_internal,
    "line_trim": line_trim,
    "line_join_spaces": line_join_spaces,
    "blank_line_drop": blank_line_drop,
    # Adapter-specific transformations; the functions registered here are
    # identity shims that return their input unchanged.
    "strip_tool_chrome": strip_tool_chrome,
    "tool_result_truncate": tool_result_truncate,
    "tool_result_omitted": tool_result_omitted,
    "spellcheck_user": spellcheck_user,
    "synthesized_marker": synthesized_marker,
    "speaker_role_assignment": speaker_role_assignment,
}
def get_transformation(name: str) -> Transformation:
    """Return the reference implementation of a reserved transformation.

    Only the names in ``RESERVED_TRANSFORMATIONS`` are resolved here.
    Adapter-namespaced references (module attributes named after the adapter
    and the transform) are not consulted; callers that want one SHOULD
    ``getattr`` on this module first and fall back to this helper.

    Raises:
        KeyError: If ``name`` is not a reserved transformation name. The
            message lists the reserved names and the original lookup error
            is chained as the cause.
    """
    try:
        transform = RESERVED_TRANSFORMATIONS[name]
    except KeyError as e:
        raise KeyError(
            f"unknown transformation {name!r}; reserved names: {sorted(RESERVED_TRANSFORMATIONS)}"
        ) from e
    return transform
+6
View File
@@ -42,6 +42,12 @@ mempalace = "mempalace.cli:main"
[project.entry-points."mempalace.backends"]
chroma = "mempalace.backends.chroma:ChromaBackend"
# RFC 002 source-adapter entry-point group. Core publishes no first-party
# adapters under this group yet; ``miner.py`` and ``convo_miner.py`` migrate
# onto ``BaseSourceAdapter`` in a follow-up PR. Third-party adapter packages
# (``mempalace-source-cursor``, ``mempalace-source-git``, …) register here.
[project.entry-points."mempalace.sources"]
[project.optional-dependencies]
dev = ["pytest>=7.0", "pytest-cov>=4.0", "ruff>=0.4.0", "psutil>=5.9"]
spellcheck = ["autocorrect>=2.0"]
+467
View File
@@ -0,0 +1,467 @@
"""Tests for the RFC 002 source-adapter scaffolding."""
import pytest
from mempalace.sources import (
AdapterSchema,
BaseSourceAdapter,
DrawerRecord,
FieldSpec,
PalaceContext,
RouteHint,
SourceItemMetadata,
SourceRef,
SourceSummary,
available_adapters,
get_adapter,
get_adapter_class,
register,
reset_adapters,
resolve_adapter_for_source,
unregister,
)
from mempalace.sources.transforms import (
RESERVED_TRANSFORMATIONS,
blank_line_drop,
get_transformation,
line_join_spaces,
line_trim,
newline_normalize,
utf8_replace_invalid,
whitespace_collapse_internal,
whitespace_trim,
)
# ---------------------------------------------------------------------------
# Minimal conforming adapter used as a fixture across tests
# ---------------------------------------------------------------------------
class _TrivialAdapter(BaseSourceAdapter):
    """Smallest adapter that satisfies the contract; shared by the tests below."""

    name = "_trivial"
    adapter_version = "0.1.0"
    capabilities = frozenset({"byte_preserving"})
    supported_modes = frozenset({"whole_record"})
    declared_transformations = frozenset()
    default_privacy_class = "public"

    def ingest(self, *, source, palace):
        """Yield one metadata pointer followed by one drawer for the source."""
        origin = source.uri or "x"
        yield SourceItemMetadata(source_file=origin, version="v1")
        yield DrawerRecord(content="hello", source_file=origin, chunk_index=0)

    def describe_schema(self):
        """Return a one-field schema."""
        example_field = FieldSpec(type="string", required=False, description="x")
        return AdapterSchema(version="1.0", fields={"example": example_field})
@pytest.fixture(autouse=True)
def _isolate_registry():
    """Clean up the adapter registry after every test in this module.

    Teardown only: the test body runs first, then ``reset_adapters()`` is
    called and every name still reported by ``available_adapters()`` is
    unregistered, so registrations made by one test do not reach the next.
    """
    yield
    reset_adapters()
    # Snapshot the names first; unregistering while iterating a live view
    # of the registry could skip entries.
    remaining = list(available_adapters())
    for adapter_name in remaining:
        unregister(adapter_name)
# ---------------------------------------------------------------------------
# base.py — ABC + typed records
# ---------------------------------------------------------------------------
def test_base_adapter_is_abstract_without_required_methods():
    """A subclass that only sets ``name`` must fail with ``TypeError``."""
    with pytest.raises(TypeError):

        # Both the class definition and the instantiation sit inside the
        # ``raises`` block, so a TypeError from either one satisfies the test.
        class Incomplete(BaseSourceAdapter):
            name = "incomplete"

        Incomplete()
def test_conforming_adapter_instantiates_and_yields_typed_records():
    """``_TrivialAdapter.ingest`` yields metadata first, then one drawer."""
    emitted = list(_TrivialAdapter().ingest(source=SourceRef(uri="foo"), palace=None))
    assert len(emitted) == 2

    metadata_item, drawer = emitted
    assert isinstance(metadata_item, SourceItemMetadata)
    assert isinstance(drawer, DrawerRecord)
    assert drawer.content == "hello"
def test_is_current_default_is_false_always_reextracts():
    """The inherited ``is_current`` returns ``False`` with or without prior metadata."""
    adapter = _TrivialAdapter()
    item = SourceItemMetadata(source_file="f", version="v1")

    # First no stored metadata at all, then stored metadata whose version matches.
    for stored in (None, {"version": "v1"}):
        assert adapter.is_current(item=item, existing_metadata=stored) is False
def test_source_summary_default_uses_adapter_name():
    """The inherited ``source_summary`` describes the source by the adapter's name."""
    summary = _TrivialAdapter().source_summary(source=SourceRef(uri="x"))

    assert isinstance(summary, SourceSummary)
    assert summary.description == "_trivial"
def test_source_ref_options_default_is_empty_dict():
    """Each ``SourceRef`` must own its default ``options`` dict."""
    first, second = SourceRef(uri="a"), SourceRef(uri="b")

    # If the default dict were shared between instances, this mutation
    # would be visible through ``second`` as well.
    first.options["touched"] = True
    assert "touched" not in second.options
# ---------------------------------------------------------------------------
# transforms.py
# ---------------------------------------------------------------------------
def test_reserved_transformations_registry_has_all_13():
    """``RESERVED_TRANSFORMATIONS`` must contain exactly the 13 names listed here."""
    reserved_names = (
        "utf8_replace_invalid",
        "newline_normalize",
        "whitespace_trim",
        "whitespace_collapse_internal",
        "line_trim",
        "line_join_spaces",
        "blank_line_drop",
        "strip_tool_chrome",
        "tool_result_truncate",
        "tool_result_omitted",
        "spellcheck_user",
        "synthesized_marker",
        "speaker_role_assignment",
    )
    assert set(RESERVED_TRANSFORMATIONS) == set(reserved_names)
def test_utf8_replace_invalid_handles_bad_bytes():
    """An undecodable byte is replaced by U+FFFD rather than raising."""
    # 0xff can never appear in well-formed UTF-8.
    raw = b"ok \xff end"
    decoded = utf8_replace_invalid(raw)
    assert decoded == "ok \ufffd end"
def test_newline_normalize_converts_crlf_and_cr():
    """CRLF and bare CR both become LF; existing LF is left alone."""
    mixed_endings = "a\r\nb\rc\nd"
    normalized = newline_normalize(mixed_endings)
    assert normalized == "a\nb\nc\nd"
def test_whitespace_trim_strips_boundaries():
    """Leading spaces and trailing newlines are removed from the text."""
    padded = " hello\n\n"
    trimmed = whitespace_trim(padded)
    assert trimmed == "hello"
def test_whitespace_collapse_internal_caps_at_two_blanks():
    """A long run of blank lines is reduced to at most two blank lines."""
    # Five consecutive newlines (four blank lines) must collapse to exactly
    # three newlines (two blank lines).
    sparse = "a\n\n\n\n\nb"
    collapsed = whitespace_collapse_internal(sparse)
    assert collapsed == "a\n\n\nb"
def test_line_trim_strips_each_line():
    """Spaces and tabs around every individual line are removed."""
    ragged = " a \n\t b \n c"
    tidy = line_trim(ragged)
    assert tidy == "a\nb\nc"
def test_line_join_spaces_preserves_paragraph_breaks():
    """Single newlines become spaces; the blank-line paragraph break survives."""
    wrapped = "foo\nbar\nbaz\n\nqux\nquux"
    joined = line_join_spaces(wrapped)
    assert joined == "foo bar baz\n\nqux quux"
def test_blank_line_drop_removes_blanks_only():
    """Blank lines disappear while the non-blank lines keep their order."""
    gappy = "a\n\nb\n\n\nc"
    compact = blank_line_drop(gappy)
    assert compact == "a\nb\nc"
def test_get_transformation_resolves_reserved_and_rejects_unknown():
    """A reserved name maps to its function; an unknown name raises ``KeyError``."""
    resolved = get_transformation("newline_normalize")
    assert resolved is newline_normalize

    with pytest.raises(KeyError):
        get_transformation("not_a_real_transformation")
# ---------------------------------------------------------------------------
# registry.py
# ---------------------------------------------------------------------------
def test_register_and_get_adapter_roundtrip():
    """A registered class is listed, instantiated on demand, and cached."""
    register("_trivial", _TrivialAdapter)
    assert "_trivial" in available_adapters()

    first_lookup = get_adapter("_trivial")
    assert isinstance(first_lookup, _TrivialAdapter)

    # A second lookup must hand back the very same object, not a new one.
    second_lookup = get_adapter("_trivial")
    assert second_lookup is first_lookup
def test_get_adapter_class_returns_class_not_instance():
    """``get_adapter_class`` returns the registered class object itself."""
    register("_trivial", _TrivialAdapter)
    looked_up = get_adapter_class("_trivial")
    assert looked_up is _TrivialAdapter
def test_get_adapter_unknown_raises_key_error():
    """Looking up a name that was never registered raises ``KeyError``."""
    missing_name = "does-not-exist"
    with pytest.raises(KeyError):
        get_adapter(missing_name)
def test_unregister_drops_registration_and_cached_instance():
    """After ``unregister`` the name is neither listed nor resolvable."""
    adapter_name = "_trivial"
    register(adapter_name, _TrivialAdapter)
    # Resolve once before unregistering so an instance has already been
    # handed out for this name.
    get_adapter(adapter_name)

    unregister(adapter_name)

    assert adapter_name not in available_adapters()
    with pytest.raises(KeyError):
        get_adapter(adapter_name)
def test_resolve_adapter_priority_order():
    """Resolution order is: explicit argument, then config value, then default."""
    # An explicit choice beats a configured one.
    explicit_choice = resolve_adapter_for_source(explicit="cursor", config_value="git")
    assert explicit_choice == "cursor"

    # With no explicit choice, the configured value is used.
    configured_choice = resolve_adapter_for_source(config_value="git")
    assert configured_choice == "git"

    # With nothing supplied, the default is "filesystem", which keeps the
    # existing ``mempalace mine <path>`` behavior.
    fallback_choice = resolve_adapter_for_source()
    assert fallback_choice == "filesystem"
# ---------------------------------------------------------------------------
# PalaceContext
# ---------------------------------------------------------------------------
class _FakeCollection:
    """In-memory stand-in for a drawer collection.

    Only ``upsert`` records anything: each call's keyword arguments are
    appended to ``self.upserts``. The other methods are inert and return
    fixed values.
    """

    def __init__(self):
        # One dict of keyword arguments per ``upsert`` call, in call order.
        self.upserts = []

    def add(self, **kwargs):
        """Accept and ignore any keyword arguments."""
        return None

    def upsert(self, **kwargs):
        """Remember the keyword arguments of this call."""
        self.upserts.append(kwargs)

    def query(self, **kwargs):
        """Always report an empty result."""
        return {}

    def get(self, **kwargs):
        """Always report an empty result."""
        return {}

    def delete(self, **kwargs):
        """Accept and ignore any keyword arguments."""
        return None

    def count(self):
        """Always report zero stored items."""
        return 0
class _FakeKG:
    """In-memory stand-in for a knowledge graph that only records triples."""

    def __init__(self):
        # One ``(subject, predicate, obj, kwargs)`` tuple per ``add_triple`` call.
        self.triples = []

    def add_triple(self, subject, predicate, obj, **kwargs):
        """Store the triple together with any extra keyword arguments."""
        entry = (subject, predicate, obj, kwargs)
        self.triples.append(entry)
def test_palace_context_upsert_drawer_stamps_adapter_metadata():
    """``upsert_drawer`` forwards one document and stamps adapter provenance.

    The metadata sent to the collection must keep the record's own keys
    (``wing``) and also carry the context's adapter name and version plus
    the record's ``source_file`` and ``chunk_index``.
    """
    drawers = _FakeCollection()
    ctx = PalaceContext(
        drawer_collection=drawers,
        knowledge_graph=_FakeKG(),
        palace_path="/tmp/palace",
        adapter_name="test-adapter",
        adapter_version="0.1.0",
    )
    record = DrawerRecord(
        content="hello",
        source_file="/abs/path/file.txt",
        chunk_index=2,
        metadata={"wing": "proj"},
    )

    ctx.upsert_drawer(record)

    assert len(drawers.upserts) == 1
    sent = drawers.upserts[0]
    assert sent["documents"] == ["hello"]
    assert len(sent["ids"]) == 1

    stamped = sent["metadatas"][0]
    expected_metadata = {
        "wing": "proj",
        "adapter_name": "test-adapter",
        "adapter_version": "0.1.0",
        "source_file": "/abs/path/file.txt",
        "chunk_index": 2,
    }
    for key, value in expected_metadata.items():
        assert stamped[key] == value
def test_palace_context_drawer_id_is_sha256_prefix_not_sha1():
    """Pin the drawer id format to ``sha256(source_file)[:24] + "_" + chunk_index``.

    This guards against a return to the earlier sha1[:16] scheme. Sixteen
    hex characters are only 64 bits, which sits close to the birthday bound
    for palace-sized corpora; 24 hex characters give 96 bits, so collision
    risk stays negligible.
    """
    import hashlib

    from mempalace.sources.context import _build_drawer_id

    src = "/an/absolute/path/to/a/file.txt"
    encoded_src = src.encode("utf-8")
    drawer_id = _build_drawer_id(DrawerRecord(content="x", source_file=src, chunk_index=3))

    sha256_prefix = hashlib.sha256(encoded_src).hexdigest()[:24]
    assert drawer_id == f"{sha256_prefix}_3"

    # Negative check: the legacy sha1-based id must not match.
    legacy_prefix = hashlib.sha1(encoded_src).hexdigest()[:16]
    assert drawer_id != f"{legacy_prefix}_3"
def test_palace_context_skip_current_item_sets_flag():
    """``skip_current_item`` flips ``_skip_requested`` from ``False`` to ``True``."""
    ctx = PalaceContext(
        drawer_collection=_FakeCollection(),
        knowledge_graph=_FakeKG(),
        palace_path="/tmp/p",
    )

    flag_before = ctx._skip_requested
    assert flag_before is False

    ctx.skip_current_item()

    flag_after = ctx._skip_requested
    assert flag_after is True
def test_palace_context_emit_dispatches_to_hooks_and_swallows_errors():
    """``emit`` calls every progress hook and does not propagate a hook's error."""
    delivered = []
    failing_hook_events = []

    def recording_hook(event, **details):
        delivered.append((event, details))

    def exploding_hook(event, **details):
        # Record the call before failing so the test can prove the hook ran.
        failing_hook_events.append(event)
        raise RuntimeError("hook exploded")

    ctx = PalaceContext(
        drawer_collection=_FakeCollection(),
        knowledge_graph=_FakeKG(),
        palace_path="/tmp/p",
        progress_hooks=[recording_hook, exploding_hook],
    )

    # If emit() let the RuntimeError escape, the test would fail right here.
    ctx.emit("mined_file", path="a.txt", bytes=42)

    expected_delivery = ("mined_file", {"path": "a.txt", "bytes": 42})
    assert delivered == [expected_delivery]
    assert failing_hook_events == ["mined_file"]
def test_palace_context_uses_route_hint_when_present():
    """A ``RouteHint`` exposes the ``wing`` and ``room`` it was built with.

    NOTE(review): despite the name, this test never touches ``PalaceContext``;
    it only checks the fields of the hint object itself.
    """
    # Route hints are frozen dataclasses that the adapter passes through.
    hint = RouteHint(wing="proj", room="backend", hall="general")
    observed = (hint.wing, hint.room)
    assert observed[0] == "proj"
    assert observed[1] == "backend"
# ---------------------------------------------------------------------------
# KnowledgeGraph new provenance params (RFC 002 §5.5)
# ---------------------------------------------------------------------------
def test_knowledge_graph_add_triple_accepts_source_drawer_id_and_adapter_name(tmp_path):
    """``add_triple`` persists the RFC 002 §5.5 provenance kwargs.

    A triple is written with ``source_drawer_id`` and ``adapter_name``; the
    row is then read back over a separate sqlite3 connection to confirm
    both values landed in their columns.

    Args:
        tmp_path: pytest-provided temporary directory that holds the database.
    """
    import sqlite3
    from contextlib import closing

    from mempalace.knowledge_graph import KnowledgeGraph

    db_file = str(tmp_path / "kg.sqlite3")
    kg = KnowledgeGraph(db_path=db_file)
    try:
        triple_id = kg.add_triple(
            "Ben",
            "committed",
            "PR-567",
            valid_from="2026-03-12",
            source_file="github.com/org/repo#pr=567",
            source_drawer_id="abc123_0",
            adapter_name="git",
        )
        assert triple_id is not None

        # closing() releases the verification connection even when one of
        # the assertions below fails; previously a failure skipped close()
        # and leaked the open handle on the temp database.
        with closing(sqlite3.connect(db_file)) as conn:
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                "SELECT source_drawer_id, adapter_name FROM triples WHERE id=?", (triple_id,)
            ).fetchone()
            # Fail with a clear assertion rather than a TypeError on None.
            assert row is not None
            assert row["source_drawer_id"] == "abc123_0"
            assert row["adapter_name"] == "git"
    finally:
        kg.close()
def test_knowledge_graph_fresh_schema_includes_new_columns(tmp_path):
    """A brand-new database already has ``source_drawer_id`` / ``adapter_name``.

    Fresh palaces are meant to get these columns straight from CREATE TABLE,
    with ``_migrate_schema`` existing only for legacy palaces. This test
    checks the observable outcome: both columns are present after opening.
    """
    import sqlite3

    from mempalace.knowledge_graph import KnowledgeGraph

    db_file = str(tmp_path / "fresh.sqlite3")
    kg = KnowledgeGraph(db_path=db_file)
    try:
        probe = sqlite3.connect(db_file)
        # Index 1 of each PRAGMA table_info row is the column name.
        column_names = set()
        for column in probe.execute("PRAGMA table_info(triples)"):
            column_names.add(column[1])
        probe.close()

        assert "source_drawer_id" in column_names
        assert "adapter_name" in column_names
    finally:
        kg.close()
def test_knowledge_graph_migration_adds_missing_columns_to_old_schema(tmp_path):
    """Opening a pre-RFC 002 database adds the two provenance columns.

    The test first builds a database by hand whose ``triples`` table lacks
    ``source_drawer_id`` and ``adapter_name``. It then opens it through
    ``KnowledgeGraph`` and checks that both columns exist and that an insert
    using them succeeds.
    """
    import sqlite3

    from mempalace.knowledge_graph import KnowledgeGraph

    # Legacy schema script. It is deliberately flush-left so the script text
    # handed to SQLite contains exactly these characters.
    legacy_schema = """
CREATE TABLE entities (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type TEXT DEFAULT 'unknown',
properties TEXT DEFAULT '{}',
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE triples (
id TEXT PRIMARY KEY,
subject TEXT NOT NULL,
predicate TEXT NOT NULL,
object TEXT NOT NULL,
valid_from TEXT,
valid_to TEXT,
confidence REAL DEFAULT 1.0,
source_closet TEXT,
source_file TEXT,
extracted_at TEXT DEFAULT CURRENT_TIMESTAMP
);
"""

    db_path = tmp_path / "legacy.sqlite3"
    db_file = str(db_path)

    seed = sqlite3.connect(db_file)
    seed.executescript(legacy_schema)
    seed.commit()
    seed.close()

    kg = KnowledgeGraph(db_path=db_file)
    try:
        # Opening the graph must have added the missing columns.
        probe = sqlite3.connect(db_file)
        column_names = set()
        for column in probe.execute("PRAGMA table_info(triples)"):
            column_names.add(column[1])
        probe.close()

        assert "source_drawer_id" in column_names
        assert "adapter_name" in column_names

        # An insert that populates the new columns must also succeed.
        kg.add_triple("a", "rel", "b", source_drawer_id="d0", adapter_name="x")
    finally:
        kg.close()
def test_knowledge_graph_add_triple_backwards_compatible_without_new_kwargs(tmp_path):
    """Callers that omit the RFC 002 kwargs still get a triple id back."""
    from mempalace.knowledge_graph import KnowledgeGraph

    db_file = tmp_path / "kg.sqlite3"
    kg = KnowledgeGraph(db_path=str(db_file))
    try:
        assert kg.add_triple("Max", "likes", "trains") is not None
    finally:
        kg.close()
# ---------------------------------------------------------------------------
# pyproject entry-point group is discoverable even when empty
# ---------------------------------------------------------------------------
def test_entry_point_group_exists_and_returns_zero_or_more_adapters():
    """``available_adapters`` returns a list and does not raise.

    There are no in-tree first-party adapters yet (the miners migrate in a
    follow-up PR). The ``mempalace.sources`` entry-point group is still
    declared so third-party packages can register, so the list may be empty.
    """
    assert isinstance(available_adapters(), list)