fix(project-scanner): address review feedback
Agent-Logs-Url: https://github.com/MemPalace/mempalace/sessions/3c277c46-20b3-4a43-8eb7-8ee2eb3cb55a Co-authored-by: igorls <4753812+igorls@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
14d7444abe
commit
70d4c5471e
@@ -170,6 +170,12 @@ MANIFEST_PARSERS = {
|
|||||||
"Cargo.toml": _parse_cargo,
|
"Cargo.toml": _parse_cargo,
|
||||||
"go.mod": _parse_gomod,
|
"go.mod": _parse_gomod,
|
||||||
}
|
}
|
||||||
|
MANIFEST_PRIORITY = {
|
||||||
|
"pyproject.toml": 0,
|
||||||
|
"package.json": 1,
|
||||||
|
"Cargo.toml": 2,
|
||||||
|
"go.mod": 3,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
# ==================== GIT HELPERS ====================
|
# ==================== GIT HELPERS ====================
|
||||||
@@ -290,7 +296,6 @@ def _looks_like_real_name(name: str) -> bool:
|
|||||||
def _walk(root: Path, max_depth: int = MAX_DEPTH):
|
def _walk(root: Path, max_depth: int = MAX_DEPTH):
|
||||||
for dirpath, dirs, files in os.walk(root):
|
for dirpath, dirs, files in os.walk(root):
|
||||||
dirs[:] = [d for d in dirs if d not in SKIP_DIRS and not d.startswith(".")]
|
dirs[:] = [d for d in dirs if d not in SKIP_DIRS and not d.startswith(".")]
|
||||||
rel = Path(dirpath).relative_to(root) if dirpath != str(root) else Path(".")
|
|
||||||
try:
|
try:
|
||||||
rel = Path(dirpath).relative_to(root)
|
rel = Path(dirpath).relative_to(root)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
@@ -302,17 +307,34 @@ def _walk(root: Path, max_depth: int = MAX_DEPTH):
|
|||||||
yield Path(dirpath), dirs, files
|
yield Path(dirpath), dirs, files
|
||||||
|
|
||||||
|
|
||||||
|
def _has_git_marker(path: Path) -> bool:
|
||||||
|
git_path = path / ".git"
|
||||||
|
return git_path.is_dir() or git_path.is_file()
|
||||||
|
|
||||||
|
|
||||||
|
def _manifest_sort_key(entry: tuple[str, str, Path], repo_root: Path) -> tuple[int, int, str]:
|
||||||
|
manifest_file, _project_name, manifest_dir = entry
|
||||||
|
try:
|
||||||
|
rel = manifest_dir.relative_to(repo_root)
|
||||||
|
depth = len(rel.parts)
|
||||||
|
rel_str = rel.as_posix()
|
||||||
|
except ValueError:
|
||||||
|
depth = MAX_DEPTH + 1
|
||||||
|
rel_str = manifest_dir.as_posix()
|
||||||
|
return (depth, MANIFEST_PRIORITY.get(manifest_file, len(MANIFEST_PRIORITY)), rel_str)
|
||||||
|
|
||||||
|
|
||||||
def find_git_repos(root: Path, max_depth: int = MAX_DEPTH) -> list[Path]:
|
def find_git_repos(root: Path, max_depth: int = MAX_DEPTH) -> list[Path]:
|
||||||
"""Return git repo roots under `root` (including root itself if it's a repo)."""
|
"""Return git repo roots under `root` (including root itself if it's a repo)."""
|
||||||
root = root.resolve()
|
root = root.resolve()
|
||||||
repos: list[Path] = []
|
repos: list[Path] = []
|
||||||
if (root / ".git").is_dir():
|
if _has_git_marker(root):
|
||||||
# Root is a repo — still walk for nested repos (submodules, etc.)
|
# Root is a repo — still walk for nested repos (submodules, etc.)
|
||||||
repos.append(root)
|
repos.append(root)
|
||||||
for dirpath, dirs, _ in _walk(root, max_depth):
|
for dirpath, dirs, _ in _walk(root, max_depth):
|
||||||
if dirpath == root:
|
if dirpath == root:
|
||||||
continue
|
continue
|
||||||
if (dirpath / ".git").is_dir():
|
if _has_git_marker(dirpath):
|
||||||
repos.append(dirpath)
|
repos.append(dirpath)
|
||||||
dirs.clear() # don't descend into this repo's contents from here
|
dirs.clear() # don't descend into this repo's contents from here
|
||||||
return repos
|
return repos
|
||||||
@@ -325,7 +347,7 @@ def _collect_manifest_names(repo_root: Path) -> list[tuple[str, str, Path]]:
|
|||||||
"""
|
"""
|
||||||
found: list[tuple[str, str, Path]] = []
|
found: list[tuple[str, str, Path]] = []
|
||||||
for dirpath, dirs, files in _walk(repo_root):
|
for dirpath, dirs, files in _walk(repo_root):
|
||||||
if dirpath != repo_root and (dirpath / ".git").is_dir():
|
if dirpath != repo_root and _has_git_marker(dirpath):
|
||||||
dirs.clear()
|
dirs.clear()
|
||||||
continue
|
continue
|
||||||
for fname in files:
|
for fname in files:
|
||||||
@@ -335,7 +357,7 @@ def _collect_manifest_names(repo_root: Path) -> list[tuple[str, str, Path]]:
|
|||||||
name = parser(dirpath / fname)
|
name = parser(dirpath / fname)
|
||||||
if name:
|
if name:
|
||||||
found.append((fname, name, dirpath))
|
found.append((fname, name, dirpath))
|
||||||
return found
|
return sorted(found, key=lambda entry: _manifest_sort_key(entry, repo_root))
|
||||||
|
|
||||||
|
|
||||||
# ==================== MAIN SCAN ====================
|
# ==================== MAIN SCAN ====================
|
||||||
@@ -437,21 +459,17 @@ def scan(root: str | os.PathLike) -> tuple[list[ProjectInfo], list[PersonInfo]]:
|
|||||||
|
|
||||||
for repo in repos:
|
for repo in repos:
|
||||||
manifests = _collect_manifest_names(repo)
|
manifests = _collect_manifest_names(repo)
|
||||||
root_level = [m for m in manifests if m[2] == repo]
|
if manifests:
|
||||||
if root_level:
|
|
||||||
manifest_file, proj_name, _ = root_level[0]
|
|
||||||
elif manifests:
|
|
||||||
manifest_file, proj_name, _ = manifests[0]
|
manifest_file, proj_name, _ = manifests[0]
|
||||||
else:
|
else:
|
||||||
manifest_file, proj_name = None, repo.name
|
manifest_file, proj_name = None, repo.name
|
||||||
|
|
||||||
authors = _git_authors(repo)
|
authors = _git_authors(repo)
|
||||||
total_commits = len(authors)
|
non_bot_authors = [(name, email) for name, email in authors if not _is_bot(name, email)]
|
||||||
|
total_commits = len(non_bot_authors)
|
||||||
user_commits = 0
|
user_commits = 0
|
||||||
author_counts: dict[str, int] = {}
|
author_counts: dict[str, int] = {}
|
||||||
for name, email in authors:
|
for name, email in non_bot_authors:
|
||||||
if _is_bot(name, email):
|
|
||||||
continue
|
|
||||||
author_counts[name] = author_counts.get(name, 0) + 1
|
author_counts[name] = author_counts.get(name, 0) + 1
|
||||||
all_commits.append((name, email, str(repo)))
|
all_commits.append((name, email, str(repo)))
|
||||||
if (me_name and name == me_name) or (me_email and email == me_email):
|
if (me_name and name == me_name) or (me_email and email == me_email):
|
||||||
|
|||||||
@@ -1,15 +1,20 @@
|
|||||||
"""Tests for mempalace.project_scanner."""
|
"""Tests for mempalace.project_scanner."""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
from mempalace.project_scanner import (
|
from mempalace.project_scanner import (
|
||||||
PersonInfo,
|
PersonInfo,
|
||||||
ProjectInfo,
|
ProjectInfo,
|
||||||
_dedupe_people,
|
_dedupe_people,
|
||||||
_is_bot,
|
_is_bot,
|
||||||
_looks_like_real_name,
|
_looks_like_real_name,
|
||||||
|
_collect_manifest_names,
|
||||||
_merge_detected,
|
_merge_detected,
|
||||||
_parse_cargo,
|
_parse_cargo,
|
||||||
_parse_gomod,
|
_parse_gomod,
|
||||||
@@ -184,15 +189,24 @@ def test_find_git_repos_detects_nested(tmp_path):
|
|||||||
|
|
||||||
|
|
||||||
def test_find_git_repos_skips_nested_inside_repo(tmp_path):
|
def test_find_git_repos_skips_nested_inside_repo(tmp_path):
|
||||||
"""If root is a repo and there's another repo inside it, the inner repo is
|
"""If root is a repo, nested repos are still discovered as separate roots."""
|
||||||
NOT walked into (we stop at the first repo boundary when descending)."""
|
|
||||||
(tmp_path / ".git").mkdir()
|
(tmp_path / ".git").mkdir()
|
||||||
deep = tmp_path / "a" / "b" / "nested-repo"
|
deep = tmp_path / "a" / "b" / "nested-repo"
|
||||||
deep.mkdir(parents=True)
|
deep.mkdir(parents=True)
|
||||||
(deep / ".git").mkdir()
|
(deep / ".git").mkdir()
|
||||||
repos = find_git_repos(tmp_path)
|
repos = find_git_repos(tmp_path)
|
||||||
# Root IS found; nested still discovered on its own branch (not inside root's .git)
|
|
||||||
assert tmp_path in repos
|
assert tmp_path in repos
|
||||||
|
assert deep in repos
|
||||||
|
|
||||||
|
|
||||||
|
def test_find_git_repos_detects_git_file_markers(tmp_path):
|
||||||
|
(tmp_path / ".git").write_text("gitdir: /tmp/root.git\n")
|
||||||
|
sub = tmp_path / "subproject"
|
||||||
|
sub.mkdir()
|
||||||
|
(sub / ".git").write_text("gitdir: /tmp/sub.git\n")
|
||||||
|
repos = find_git_repos(tmp_path)
|
||||||
|
assert tmp_path in repos
|
||||||
|
assert sub in repos
|
||||||
|
|
||||||
|
|
||||||
def test_find_git_repos_empty_dir(tmp_path):
|
def test_find_git_repos_empty_dir(tmp_path):
|
||||||
@@ -202,20 +216,35 @@ def test_find_git_repos_empty_dir(tmp_path):
|
|||||||
# ── scan ────────────────────────────────────────────────────────────────
|
# ── scan ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _require_git() -> None:
|
||||||
|
if shutil.which("git") is None:
|
||||||
|
pytest.skip("git executable not available")
|
||||||
|
|
||||||
|
|
||||||
|
def _git_commit(
|
||||||
|
path: Path, filename: str, content: str, message: str, name: str, email: str
|
||||||
|
) -> None:
|
||||||
|
_require_git()
|
||||||
|
env = {
|
||||||
|
**os.environ,
|
||||||
|
"GIT_AUTHOR_NAME": name,
|
||||||
|
"GIT_AUTHOR_EMAIL": email,
|
||||||
|
"GIT_COMMITTER_NAME": name,
|
||||||
|
"GIT_COMMITTER_EMAIL": email,
|
||||||
|
}
|
||||||
|
(path / filename).write_text(content)
|
||||||
|
subprocess.run(["git", "add", filename], cwd=path, check=True, env=env)
|
||||||
|
subprocess.run(["git", "commit", "-q", "-m", message], cwd=path, check=True, env=env)
|
||||||
|
|
||||||
|
|
||||||
def _init_git_repo(path: Path, name: str = "Jane Doe", email: str = "jane@example.com"):
|
def _init_git_repo(path: Path, name: str = "Jane Doe", email: str = "jane@example.com"):
|
||||||
"""Helper: init a git repo with one commit."""
|
"""Helper: init a git repo with one commit."""
|
||||||
|
_require_git()
|
||||||
subprocess.run(["git", "init", "-q"], cwd=path, check=True)
|
subprocess.run(["git", "init", "-q"], cwd=path, check=True)
|
||||||
subprocess.run(["git", "config", "user.name", name], cwd=path, check=True)
|
subprocess.run(["git", "config", "user.name", name], cwd=path, check=True)
|
||||||
subprocess.run(["git", "config", "user.email", email], cwd=path, check=True)
|
subprocess.run(["git", "config", "user.email", email], cwd=path, check=True)
|
||||||
subprocess.run(["git", "config", "commit.gpgsign", "false"], cwd=path, check=True)
|
subprocess.run(["git", "config", "commit.gpgsign", "false"], cwd=path, check=True)
|
||||||
(path / "README.md").write_text("hello")
|
_git_commit(path, "README.md", "hello", "initial", name, email)
|
||||||
subprocess.run(["git", "add", "README.md"], cwd=path, check=True)
|
|
||||||
subprocess.run(
|
|
||||||
["git", "commit", "-q", "-m", "initial"],
|
|
||||||
cwd=path,
|
|
||||||
check=True,
|
|
||||||
env={"GIT_COMMITTER_NAME": name, "GIT_COMMITTER_EMAIL": email, "PATH": "/usr/bin:/bin"},
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_scan_project_from_package_json(tmp_path):
|
def test_scan_project_from_package_json(tmp_path):
|
||||||
@@ -234,6 +263,17 @@ def test_scan_project_from_pyproject(tmp_path):
|
|||||||
assert any(p.name == "pyproj" for p in projects)
|
assert any(p.name == "pyproj" for p in projects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_scan_prefers_root_manifest_with_explicit_priority(tmp_path):
|
||||||
|
(tmp_path / "package.json").write_text(json.dumps({"name": "package-name"}))
|
||||||
|
(tmp_path / "pyproject.toml").write_text('[project]\nname = "pyproject-name"\n')
|
||||||
|
nested = tmp_path / "nested"
|
||||||
|
nested.mkdir()
|
||||||
|
(nested / "package.json").write_text(json.dumps({"name": "nested-name"}))
|
||||||
|
_init_git_repo(tmp_path)
|
||||||
|
projects, _ = scan(tmp_path)
|
||||||
|
assert projects[0].name == "pyproject-name"
|
||||||
|
|
||||||
|
|
||||||
def test_scan_fallback_to_dir_name_when_no_manifest(tmp_path):
|
def test_scan_fallback_to_dir_name_when_no_manifest(tmp_path):
|
||||||
repo = tmp_path / "my-repo-name"
|
repo = tmp_path / "my-repo-name"
|
||||||
repo.mkdir()
|
repo.mkdir()
|
||||||
@@ -252,6 +292,34 @@ def test_scan_manifest_only_no_git(tmp_path):
|
|||||||
assert people == []
|
assert people == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_collect_manifest_names_stops_at_git_file_boundary(tmp_path):
|
||||||
|
(tmp_path / ".git").write_text("gitdir: /tmp/root.git\n")
|
||||||
|
(tmp_path / "package.json").write_text(json.dumps({"name": "root-name"}))
|
||||||
|
nested = tmp_path / "nested"
|
||||||
|
nested.mkdir()
|
||||||
|
(nested / ".git").write_text("gitdir: /tmp/nested.git\n")
|
||||||
|
(nested / "package.json").write_text(json.dumps({"name": "nested-name"}))
|
||||||
|
manifests = _collect_manifest_names(tmp_path)
|
||||||
|
assert [name for _file, name, _dir in manifests] == ["root-name"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_scan_excludes_bot_commits_from_totals(tmp_path):
|
||||||
|
(tmp_path / "package.json").write_text(json.dumps({"name": "my-app"}))
|
||||||
|
_init_git_repo(tmp_path, name="Jane Doe", email="jane@example.com")
|
||||||
|
_git_commit(
|
||||||
|
tmp_path,
|
||||||
|
"bot.txt",
|
||||||
|
"generated",
|
||||||
|
"bot update",
|
||||||
|
"github-actions[bot]",
|
||||||
|
"41898282+github-actions[bot]@users.noreply.github.com",
|
||||||
|
)
|
||||||
|
projects, people = scan(tmp_path)
|
||||||
|
assert projects[0].total_commits == 1
|
||||||
|
assert projects[0].user_commits == 1
|
||||||
|
assert [person.name for person in people] == ["Jane Doe"]
|
||||||
|
|
||||||
|
|
||||||
def test_scan_empty_dir(tmp_path):
|
def test_scan_empty_dir(tmp_path):
|
||||||
projects, people = scan(tmp_path)
|
projects, people = scan(tmp_path)
|
||||||
assert projects == []
|
assert projects == []
|
||||||
|
|||||||
Reference in New Issue
Block a user