Skip to content

Index

wrknv.workspace

Multi-Repo Workspace Management Package

Manage configurations across multiple independent Git repositories.

Classes

RepoConfig

Configuration for a single repository in the workspace.

Functions
from_dict classmethod
from_dict(data: dict[str, Any]) -> RepoConfig

Create RepoConfig from dictionary.

Source code in wrknv/workspace/schema.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> RepoConfig:
    """Create RepoConfig from dictionary."""
    return cls(
        path=Path(data["path"]),
        name=data["name"],
        type=data["type"],
        template_profile=data["template_profile"],
        features=data.get("features", []),
        custom_values=data.get("custom_values", {}),
        last_sync=data.get("last_sync"),
        template_version=data.get("template_version"),
    )
to_dict
to_dict() -> dict[str, Any]

Convert to dictionary, excluding None values for TOML compatibility.

Source code in wrknv/workspace/schema.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary, excluding None values for TOML compatibility."""
    data = {
        "path": str(self.path),
        "name": self.name,
        "type": self.type,
        "template_profile": self.template_profile,
        "features": self.features,
        "custom_values": self.custom_values,
        "last_sync": self.last_sync,
        "template_version": self.template_version,
    }
    # Filter out None values - TOML cannot serialize None
    return {k: v for k, v in data.items() if v is not None}

WorkspaceConfig

Bases: BaseConfig

Root workspace configuration.

Functions
add_repo
add_repo(repo: RepoConfig) -> WorkspaceConfig

Add repository to workspace.

Source code in wrknv/workspace/schema.py
def add_repo(self, repo: RepoConfig) -> WorkspaceConfig:
    """Add repository to workspace."""
    # Remove existing repo with same name
    repos = [r for r in self.repos if r.name != repo.name]
    repos.append(repo)

    return self.__class__(
        version=self.version,
        root=self.root,
        repos=repos,
        template_source=self.template_source,
        global_standards=self.global_standards,
        sync_strategy=self.sync_strategy,
    )
find_repo
find_repo(name: str) -> RepoConfig | None

Find repository by name.

Source code in wrknv/workspace/schema.py
def find_repo(self, name: str) -> RepoConfig | None:
    """Find repository by name."""
    for repo in self.repos:
        if repo.name == name:
            return repo
    return None
from_dict classmethod
from_dict(data: dict[str, Any]) -> WorkspaceConfig

Create WorkspaceConfig from dictionary.

Source code in wrknv/workspace/schema.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> WorkspaceConfig:
    """Create WorkspaceConfig from dictionary."""
    repos = [RepoConfig.from_dict(repo) for repo in data.get("repos", [])]

    template_source = None
    if "template_source" in data:
        template_source = TemplateSource.from_dict(data["template_source"])

    return cls(
        version=data.get("version", "1.0"),
        root=Path(data["root"]),
        repos=repos,
        template_source=template_source,
        global_standards=data.get("global_standards", {}),
        sync_strategy=data.get("sync_strategy", "manual"),
    )
get_outdated_repos
get_outdated_repos(
    current_version: str,
) -> list[RepoConfig]

Get repositories that need template updates.

Source code in wrknv/workspace/schema.py
def get_outdated_repos(self, current_version: str) -> list[RepoConfig]:
    """Get repositories that need template updates."""
    return [repo for repo in self.repos if repo.template_version != current_version]
get_repos_by_type
get_repos_by_type(repo_type: str) -> list[RepoConfig]

Get all repositories of a specific type.

Source code in wrknv/workspace/schema.py
def get_repos_by_type(self, repo_type: str) -> list[RepoConfig]:
    """Get all repositories of a specific type."""
    return [repo for repo in self.repos if repo.type == repo_type]
remove_repo
remove_repo(name: str) -> WorkspaceConfig

Remove repository from workspace.

Source code in wrknv/workspace/schema.py
def remove_repo(self, name: str) -> WorkspaceConfig:
    """Remove repository from workspace."""
    repos = [r for r in self.repos if r.name != name]

    return self.__class__(
        version=self.version,
        root=self.root,
        repos=repos,
        template_source=self.template_source,
        global_standards=self.global_standards,
        sync_strategy=self.sync_strategy,
    )
to_dict
to_dict() -> dict[str, Any]

Convert to dictionary, excluding None values for TOML compatibility.

Source code in wrknv/workspace/schema.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary, excluding None values for TOML compatibility."""
    data = {
        "version": self.version,
        "root": str(self.root),
        "repos": [repo.to_dict() for repo in self.repos],
        "template_source": self.template_source.to_dict() if self.template_source else None,
        "global_standards": self.global_standards,
        "sync_strategy": self.sync_strategy,
    }
    # Filter out None values - TOML cannot serialize None
    return {k: v for k, v in data.items() if v is not None}

WorkspaceDiscovery

WorkspaceDiscovery(root: Path | None = None)

Discovers and analyzes repositories in workspace.

Source code in wrknv/workspace/discovery.py
def __init__(self, root: Path | None = None) -> None:
    self.root = root or Path.cwd()
Functions
analyze_repo
analyze_repo(path: Path) -> RepoInfo

Analyze a single repository.

Source code in wrknv/workspace/discovery.py
def analyze_repo(self, path: Path) -> RepoInfo:
    """Analyze a single repository."""
    has_git = (path / ".git").exists()
    has_pyproject = (path / "pyproject.toml").exists()

    name = None
    detected_type = None
    current_config = None

    if has_pyproject:
        try:
            with (path / "pyproject.toml").open("rb") as f:
                pyproject = tomllib.load(f)

            current_config = pyproject
            name = pyproject.get("project", {}).get("name")
            detected_type = self.detect_repo_type(pyproject, path)

        except Exception as e:
            logger.warning("⚠️ Failed to parse pyproject.toml", path=str(path), error=str(e))

    if not name:
        name = path.name

    return RepoInfo(
        path=path,
        name=name,
        has_git=has_git,
        has_pyproject=has_pyproject,
        detected_type=detected_type,
        current_config=current_config,
    )
detect_repo_type
detect_repo_type(
    pyproject: dict[str, Any], path: Path
) -> str | None

Determine repository type from pyproject.toml and path.

Source code in wrknv/workspace/discovery.py
def detect_repo_type(self, pyproject: dict[str, Any], path: Path) -> str | None:
    """Determine repository type from pyproject.toml and path."""
    project = pyproject.get("project", {})
    name = project.get("name", "")
    dependencies = project.get("dependencies", [])

    # Check name patterns
    if "provide-foundation" in name:
        return "foundation"
    elif "provide-testkit" in name:
        return "testkit"
    elif name.startswith("pyvider-"):
        return "pyvider-plugin"
    elif name == "pyvider":
        return "provider"
    elif "flavor" in name:
        return "packaging"

    # Check dependencies
    dep_str = " ".join(dependencies)
    if "provide-foundation" in dep_str:
        return "foundation-based"
    elif "pyvider" in dep_str:
        return "pyvider-plugin"

    # Check for specific files
    if (path / "src" / "pyvider").exists():
        return "provider"
    elif (path / "src" / "provide").exists():
        return "foundation-based"

    # Check classifiers
    classifiers = project.get("classifiers", [])
    for classifier in classifiers:
        if "Topic :: System :: Logging" in classifier:
            return "foundation-based"
        elif "Topic :: Software Development :: Build Tools" in classifier:
            return "packaging"

    logger.debug("🤷 Could not detect repo type", name=name)
    return "unknown"
discover_repos
discover_repos(
    patterns: list[str] | None = None,
) -> list[RepoInfo]

Find all Git repos with pyproject.toml.

Source code in wrknv/workspace/discovery.py
def discover_repos(self, patterns: list[str] | None = None) -> list[RepoInfo]:
    """Find all Git repos with pyproject.toml."""
    if patterns is None:
        patterns = ["*"]

    repos = []
    logger.info("🔍 Discovering repositories", root=str(self.root), patterns=patterns)

    for pattern in patterns:
        for path in self.root.glob(pattern):
            if path.is_dir() and path != self.root:
                repo_info = self.analyze_repo(path)
                if repo_info.has_git and repo_info.has_pyproject:
                    repos.append(repo_info)

    logger.info("📋 Discovered repositories", count=len(repos))
    return repos
get_repo_status
get_repo_status(repo_path: Path) -> dict[str, Any]

Get current status of repository.

Source code in wrknv/workspace/discovery.py
def get_repo_status(self, repo_path: Path) -> dict[str, Any]:
    """Get current status of repository."""
    status = {
        "path": str(repo_path),
        "exists": repo_path.exists(),
        "has_git": False,
        "has_pyproject": False,
        "has_workenv": False,
        "has_claude_md": False,
        "git_status": None,
    }

    if not repo_path.exists():
        return status

    status["has_git"] = (repo_path / ".git").exists()
    status["has_pyproject"] = (repo_path / "pyproject.toml").exists()
    status["has_workenv"] = (repo_path / "workenv").exists()
    status["has_claude_md"] = (repo_path / "CLAUDE.md").exists()

    # Get git status if available
    if status["has_git"]:
        status["git_status"] = self._get_git_status(repo_path)

    return status
get_workspace_summary
get_workspace_summary() -> dict[str, Any]

Get summary of workspace state.

Source code in wrknv/workspace/discovery.py
def get_workspace_summary(self) -> dict[str, Any]:
    """Get summary of workspace state."""
    repos = self.discover_repos()

    type_counts = {}
    for repo in repos:
        repo_type = repo.detected_type or "unknown"
        type_counts[repo_type] = type_counts.get(repo_type, 0) + 1

    return {
        "root": str(self.root),
        "total_repos": len(repos),
        "type_distribution": type_counts,
        "repos": [
            {
                "name": repo.name,
                "type": repo.detected_type,
                "path": str(repo.path.relative_to(self.root)),
            }
            for repo in repos
        ],
    }
validate_workspace_structure
validate_workspace_structure(root: Path) -> list[str]

Validate workspace structure and return issues.

Source code in wrknv/workspace/discovery.py
def validate_workspace_structure(self, root: Path) -> list[str]:
    """Validate workspace structure and return issues."""
    issues = []

    if not root.exists():
        issues.append(f"Workspace root does not exist: {root}")
        return issues

    # Check for workspace config
    workspace_config = root / ".wrknv" / "workspace.toml"
    if not workspace_config.exists():
        issues.append("No workspace.toml found in .wrknv/")

    # Check for common problems
    repos = self.discover_repos()

    for repo in repos:
        if not repo.has_git:
            issues.append(f"Repository {repo.name} is not a git repository")

        if not repo.has_pyproject:
            issues.append(f"Repository {repo.name} has no pyproject.toml")

        if repo.detected_type == "unknown":
            issues.append(f"Could not detect type for repository {repo.name}")

    return issues

WorkspaceManager

WorkspaceManager(root: Path | None = None)

Manage multi-repo workspaces.

Source code in wrknv/workspace/manager.py
def __init__(self, root: Path | None = None) -> None:
    self.root = root or Path.cwd()
    self.config_dir = self.root / ".wrknv"
    self.config_path = self.config_dir / "workspace.toml"
    self.discovery = WorkspaceDiscovery(self.root)
Functions
add_repo
add_repo(
    repo_path: Path | str,
    name: str | None = None,
    repo_type: str | None = None,
    template_profile: str | None = None,
) -> WorkspaceConfig

Add repository to workspace.

Source code in wrknv/workspace/manager.py
def add_repo(
    self,
    repo_path: Path | str,
    name: str | None = None,
    repo_type: str | None = None,
    template_profile: str | None = None,
) -> WorkspaceConfig:
    """Add repository to workspace."""
    repo_path = Path(repo_path)
    if not repo_path.exists():
        raise FileNotFoundError(f"Repository path does not exist: {repo_path}")

    # Analyze repository
    repo_info = self.discovery.analyze_repo(repo_path)

    # Create repo config
    repo_config = RepoConfig(
        path=repo_path,
        name=name or repo_info.name or repo_path.name,
        type=repo_type or repo_info.detected_type or "unknown",
        template_profile=template_profile or self._get_default_profile(repo_info.detected_type),
        features=self._get_default_features(repo_info.detected_type),
    )

    # Load current config and add repo
    config = self.load_config()
    if config is None:
        config = self.init_workspace(auto_discover=False)

    updated_config = config.add_repo(repo_config)
    self.save_config(updated_config)

    return updated_config
check_drift
check_drift() -> dict[str, Any]

Check for configuration drift.

Source code in wrknv/workspace/manager.py
def check_drift(self) -> dict[str, Any]:
    """Check for configuration drift."""
    config = self.load_config()
    if config is None:
        raise RuntimeError("No workspace configuration found")

    sync = WorkspaceSync(config)
    return sync.check_drift()
get_workspace_status
get_workspace_status() -> dict[str, Any]

Get comprehensive workspace status.

Source code in wrknv/workspace/manager.py
def get_workspace_status(self) -> dict[str, Any]:
    """Get comprehensive workspace status."""
    config = self.load_config()
    if config is None:
        return {"error": "No workspace configuration found"}

    summary = self.discovery.get_workspace_summary()
    issues = self.discovery.validate_workspace_structure(self.root)

    return {
        "root": str(self.root),
        "config_path": str(self.config_path),
        "repos_configured": len(config.repos),
        "repos_discovered": summary["total_repos"],
        "type_distribution": summary["type_distribution"],
        "issues": issues,
        "sync_strategy": config.sync_strategy,
        "template_source": config.template_source.to_dict() if config.template_source else None,
    }
init_workspace
init_workspace(
    template_source: str | None = None,
    auto_discover: bool = True,
) -> WorkspaceConfig

Initialize workspace in current directory.

Source code in wrknv/workspace/manager.py
def init_workspace(
    self, template_source: str | None = None, auto_discover: bool = True
) -> WorkspaceConfig:
    """Initialize workspace in current directory."""
    logger.info("🚀 Initializing workspace", root=str(self.root))

    # Create .wrknv directory
    self.config_dir.mkdir(exist_ok=True)

    # Auto-discover repositories if requested
    repos = []
    if auto_discover:
        discovered = self.discovery.discover_repos()
        repos = [
            RepoConfig(
                path=repo.path,
                name=repo.name or repo.path.name,
                type=repo.detected_type or "unknown",
                template_profile=self._get_default_profile(repo.detected_type),
                features=self._get_default_features(repo.detected_type),
            )
            for repo in discovered
        ]
        logger.info("📋 Auto-discovered repositories", count=len(repos))

    # Set up template source
    source = None
    if template_source:
        source = TemplateSource(
            type="local" if Path(template_source).exists() else "git",
            location=template_source,
        )

    # Create workspace config
    config = WorkspaceConfig(
        root=self.root,
        repos=repos,
        template_source=source,
        global_standards=self._get_default_standards(),
    )

    # Save configuration
    self.save_config(config)

    return config
load_config
load_config() -> WorkspaceConfig | None

Load workspace configuration.

Source code in wrknv/workspace/manager.py
def load_config(self) -> WorkspaceConfig | None:
    """Load workspace configuration."""
    if not self.config_path.exists():
        return None

    try:
        data = read_toml(self.config_path, default={})
        if not data:
            return None
        return WorkspaceConfig.from_dict(data)

    except Exception as e:
        logger.error("❌ Failed to load workspace config", error=str(e))
        return None
remove_repo
remove_repo(name: str) -> WorkspaceConfig

Remove repository from workspace.

Source code in wrknv/workspace/manager.py
def remove_repo(self, name: str) -> WorkspaceConfig:
    """Remove repository from workspace."""
    config = self.load_config()
    if config is None:
        raise RuntimeError("No workspace configuration found")

    updated_config = config.remove_repo(name)
    self.save_config(updated_config)

    return updated_config
save_config
save_config(config: WorkspaceConfig) -> None

Save workspace configuration.

Source code in wrknv/workspace/manager.py
def save_config(self, config: WorkspaceConfig) -> None:
    """Save workspace configuration."""
    try:
        # Convert to dictionary and write with foundation's atomic TOML writer
        write_toml(self.config_path, config.to_dict(), atomic=True)
        logger.debug("💾 Workspace config saved", path=str(self.config_path))

    except Exception as e:
        logger.error("❌ Failed to save workspace config", error=str(e))
        raise
sync_all async
sync_all(dry_run: bool = False) -> dict[str, Any]

Sync configurations across all repos.

Source code in wrknv/workspace/manager.py
async def sync_all(self, dry_run: bool = False) -> dict[str, Any]:
    """Sync configurations across all repos."""
    config = self.load_config()
    if config is None:
        raise RuntimeError("No workspace configuration found")

    sync = WorkspaceSync(config)
    return await sync.sync_all(dry_run=dry_run)
sync_repo async
sync_repo(
    name: str, dry_run: bool = False
) -> dict[str, Any]

Sync configuration for specific repository.

Source code in wrknv/workspace/manager.py
async def sync_repo(self, name: str, dry_run: bool = False) -> dict[str, Any]:
    """Sync configuration for specific repository."""
    config = self.load_config()
    if config is None:
        raise RuntimeError("No workspace configuration found")

    repo = config.find_repo(name)
    if repo is None:
        raise ValueError(f"Repository not found in workspace: {name}")

    sync = WorkspaceSync(config)
    return await sync.sync_repo(repo, dry_run=dry_run)

WorkspaceSync

WorkspaceSync(workspace_config: WorkspaceConfig)

Synchronize configurations across repositories.

Source code in wrknv/workspace/sync.py
def __init__(self, workspace_config: WorkspaceConfig) -> None:
    self.config = workspace_config
Functions
check_drift
check_drift() -> dict[str, Any]

Check for configuration drift across repositories.

Source code in wrknv/workspace/sync.py
def check_drift(self) -> dict[str, Any]:
    """Check for configuration drift across repositories."""
    logger.info("🔍 Checking configuration drift")

    drift_report = {
        "repos_checked": len(self.config.repos),
        "drift_detected": False,
        "repo_drifts": {},
    }

    for repo in self.config.repos:
        repo_drift = self._check_repo_drift(repo)
        if repo_drift["has_drift"]:
            drift_report["drift_detected"] = True
            drift_report["repo_drifts"][repo.name] = repo_drift

    return drift_report
sync_all async
sync_all(dry_run: bool = False) -> dict[str, Any]

Sync all repositories in workspace.

Source code in wrknv/workspace/sync.py
async def sync_all(self, dry_run: bool = False) -> dict[str, Any]:
    """Sync all repositories in workspace."""
    logger.info("🔄 Starting workspace sync", repos=len(self.config.repos), dry_run=dry_run)

    results = {}
    for repo in self.config.repos:
        try:
            result = await self.sync_repo(repo, dry_run)
            results[repo.name] = result
        except Exception as e:
            logger.error("❌ Failed to sync repo", repo=repo.name, error=str(e))
            results[repo.name] = {"error": str(e)}

    return results
sync_repo async
sync_repo(
    repo: RepoConfig, dry_run: bool = False
) -> dict[str, Any]

Sync single repository configuration.

Source code in wrknv/workspace/sync.py
async def sync_repo(self, repo: RepoConfig, dry_run: bool = False) -> dict[str, Any]:
    """Sync single repository configuration."""

    # Build template context
    context = self._build_template_context(repo)

    # Generate configurations
    configs = self._generate_configs(repo, context)

    # Apply changes
    changes = {}
    for filename, new_content in configs.items():
        file_path = repo.path / filename
        change_info = await self._apply_config_change(file_path, new_content, dry_run)
        changes[filename] = change_info

    result = {
        "repo": repo.name,
        "changes": changes,
        "files_updated": sum(1 for c in changes.values() if c.get("changed", False)),
    }

    logger.debug("📋 Repo sync result", **result)
    return result
validate_templates
validate_templates() -> bool

Validate that all templates are available and functional.

Source code in wrknv/workspace/sync.py
def validate_templates(self) -> bool:
    """Validate that all templates are available and functional."""

    try:
        # Test template generation with minimal context
        test_context = {
            "project_name": "test-project",
            "version": "1.0.0",
            "description": "Test project",
            "python_version": "3.11",
        }

        # Test each template
        self.template_generator.generate_pyproject(test_context)
        self.template_generator.generate_claude_md(test_context)
        self.template_generator.generate_gitignore(test_context)

        return True

    except Exception as e:
        logger.error("❌ Template validation failed", error=str(e))
        return False