6d8c462219
Fix E402 import ordering, F841 unused variable, F541 unnecessary f-strings, F401 unused import, and auto-format 6 files.
228 lines
7.0 KiB
Python
228 lines
7.0 KiB
Python
"""
|
|
palace_graph.py — Graph traversal layer for MemPalace
|
|
======================================================
|
|
|
|
Builds a navigable graph from the palace structure:
|
|
- Nodes = rooms (named ideas)
|
|
- Edges = shared rooms across wings (tunnels)
|
|
- Edge types = halls (the corridors)
|
|
|
|
Enables queries like:
|
|
"Start at chromadb-setup in wing_code, walk to wing_myproject"
|
|
"Find all rooms connected to riley-college-apps"
|
|
"What topics bridge wing_hardware and wing_myproject?"
|
|
|
|
No external graph DB needed — built from ChromaDB metadata.
|
|
"""
|
|
|
|
from collections import defaultdict, Counter
|
|
from .config import MempalaceConfig
|
|
|
|
import chromadb
|
|
|
|
|
|
def _get_collection(config=None):
|
|
config = config or MempalaceConfig()
|
|
try:
|
|
client = chromadb.PersistentClient(path=config.palace_path)
|
|
return client.get_collection(config.collection_name)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def build_graph(col=None, config=None):
|
|
"""
|
|
Build the palace graph from ChromaDB metadata.
|
|
|
|
Returns:
|
|
nodes: dict of {room: {wings: set, halls: set, count: int}}
|
|
edges: list of {room, wing_a, wing_b, hall} — one per tunnel crossing
|
|
"""
|
|
if col is None:
|
|
col = _get_collection(config)
|
|
if not col:
|
|
return {}, []
|
|
|
|
total = col.count()
|
|
room_data = defaultdict(lambda: {"wings": set(), "halls": set(), "count": 0, "dates": set()})
|
|
|
|
offset = 0
|
|
while offset < total:
|
|
batch = col.get(limit=1000, offset=offset, include=["metadatas"])
|
|
for meta in batch["metadatas"]:
|
|
room = meta.get("room", "")
|
|
wing = meta.get("wing", "")
|
|
hall = meta.get("hall", "")
|
|
date = meta.get("date", "")
|
|
if room and room != "general" and wing:
|
|
room_data[room]["wings"].add(wing)
|
|
if hall:
|
|
room_data[room]["halls"].add(hall)
|
|
if date:
|
|
room_data[room]["dates"].add(date)
|
|
room_data[room]["count"] += 1
|
|
if not batch["ids"]:
|
|
break
|
|
offset += len(batch["ids"])
|
|
|
|
# Build edges from rooms that span multiple wings
|
|
edges = []
|
|
for room, data in room_data.items():
|
|
wings = sorted(data["wings"])
|
|
if len(wings) >= 2:
|
|
for i, wa in enumerate(wings):
|
|
for wb in wings[i + 1 :]:
|
|
for hall in data["halls"]:
|
|
edges.append(
|
|
{
|
|
"room": room,
|
|
"wing_a": wa,
|
|
"wing_b": wb,
|
|
"hall": hall,
|
|
"count": data["count"],
|
|
}
|
|
)
|
|
|
|
# Convert sets to lists for JSON serialization
|
|
nodes = {}
|
|
for room, data in room_data.items():
|
|
nodes[room] = {
|
|
"wings": sorted(data["wings"]),
|
|
"halls": sorted(data["halls"]),
|
|
"count": data["count"],
|
|
"dates": sorted(data["dates"])[-5:] if data["dates"] else [],
|
|
}
|
|
|
|
return nodes, edges
|
|
|
|
|
|
def traverse(start_room: str, col=None, config=None, max_hops: int = 2):
|
|
"""
|
|
Walk the graph from a starting room. Find connected rooms
|
|
through shared wings.
|
|
|
|
Returns list of paths: [{room, wing, hall, hop_distance}]
|
|
"""
|
|
nodes, edges = build_graph(col, config)
|
|
|
|
if start_room not in nodes:
|
|
return {
|
|
"error": f"Room '{start_room}' not found",
|
|
"suggestions": _fuzzy_match(start_room, nodes),
|
|
}
|
|
|
|
start = nodes[start_room]
|
|
visited = {start_room}
|
|
results = [
|
|
{
|
|
"room": start_room,
|
|
"wings": start["wings"],
|
|
"halls": start["halls"],
|
|
"count": start["count"],
|
|
"hop": 0,
|
|
}
|
|
]
|
|
|
|
# BFS traversal
|
|
frontier = [(start_room, 0)]
|
|
while frontier:
|
|
current_room, depth = frontier.pop(0)
|
|
if depth >= max_hops:
|
|
continue
|
|
|
|
current = nodes.get(current_room, {})
|
|
current_wings = set(current.get("wings", []))
|
|
|
|
# Find all rooms that share a wing with current room
|
|
for room, data in nodes.items():
|
|
if room in visited:
|
|
continue
|
|
shared_wings = current_wings & set(data["wings"])
|
|
if shared_wings:
|
|
visited.add(room)
|
|
results.append(
|
|
{
|
|
"room": room,
|
|
"wings": data["wings"],
|
|
"halls": data["halls"],
|
|
"count": data["count"],
|
|
"hop": depth + 1,
|
|
"connected_via": sorted(shared_wings),
|
|
}
|
|
)
|
|
if depth + 1 < max_hops:
|
|
frontier.append((room, depth + 1))
|
|
|
|
# Sort by relevance (hop distance, then count)
|
|
results.sort(key=lambda x: (x["hop"], -x["count"]))
|
|
return results[:50] # cap results
|
|
|
|
|
|
def find_tunnels(wing_a: str = None, wing_b: str = None, col=None, config=None):
|
|
"""
|
|
Find rooms that connect two wings (or all tunnel rooms if no wings specified).
|
|
These are the "hallways" — same named idea appearing in multiple domains.
|
|
"""
|
|
nodes, edges = build_graph(col, config)
|
|
|
|
tunnels = []
|
|
for room, data in nodes.items():
|
|
wings = data["wings"]
|
|
if len(wings) < 2:
|
|
continue
|
|
|
|
if wing_a and wing_a not in wings:
|
|
continue
|
|
if wing_b and wing_b not in wings:
|
|
continue
|
|
|
|
tunnels.append(
|
|
{
|
|
"room": room,
|
|
"wings": wings,
|
|
"halls": data["halls"],
|
|
"count": data["count"],
|
|
"recent": data["dates"][-1] if data["dates"] else "",
|
|
}
|
|
)
|
|
|
|
tunnels.sort(key=lambda x: -x["count"])
|
|
return tunnels[:50]
|
|
|
|
|
|
def graph_stats(col=None, config=None):
|
|
"""Summary statistics about the palace graph."""
|
|
nodes, edges = build_graph(col, config)
|
|
|
|
tunnel_rooms = sum(1 for n in nodes.values() if len(n["wings"]) >= 2)
|
|
wing_counts = Counter()
|
|
for data in nodes.values():
|
|
for w in data["wings"]:
|
|
wing_counts[w] += 1
|
|
|
|
return {
|
|
"total_rooms": len(nodes),
|
|
"tunnel_rooms": tunnel_rooms,
|
|
"total_edges": len(edges),
|
|
"rooms_per_wing": dict(wing_counts.most_common()),
|
|
"top_tunnels": [
|
|
{"room": r, "wings": d["wings"], "count": d["count"]}
|
|
for r, d in sorted(nodes.items(), key=lambda x: -len(x[1]["wings"]))[:10]
|
|
if len(d["wings"]) >= 2
|
|
],
|
|
}
|
|
|
|
|
|
def _fuzzy_match(query: str, nodes: dict, n: int = 5):
|
|
"""Find rooms that approximately match a query string."""
|
|
query_lower = query.lower()
|
|
scored = []
|
|
for room in nodes:
|
|
# Simple substring matching
|
|
if query_lower in room:
|
|
scored.append((room, 1.0))
|
|
elif any(word in room for word in query_lower.split("-")):
|
|
scored.append((room, 0.5))
|
|
scored.sort(key=lambda x: -x[1])
|
|
return [r for r, _ in scored[:n]]
|