diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 77611128b5..e6cc0cd1e4 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4669,6 +4669,15 @@ def workflow_run( engine = WorkflowEngine(project_root) engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026") + # Check if workflow is disabled in registry before loading + from .workflows.catalog import WorkflowRegistry + wf_registry = WorkflowRegistry(project_root) + wf_meta = wf_registry.get(source) + if isinstance(wf_meta, dict) and not wf_meta.get("enabled", True): + console.print(f"[red]Error:[/red] Workflow '{source}' is disabled") + console.print(f"\nTo re-enable: specify workflow enable {source}") + raise typer.Exit(1) + try: definition = engine.load_workflow(source) except FileNotFoundError: @@ -4678,6 +4687,14 @@ def workflow_run( console.print(f"[red]Error:[/red] Invalid workflow: {exc}") raise typer.Exit(1) + # Also check by loaded definition ID (covers file-path invocations) + if definition.id and definition.id != source: + wf_meta_by_id = wf_registry.get(definition.id) + if isinstance(wf_meta_by_id, dict) and not wf_meta_by_id.get("enabled", True): + console.print(f"[red]Error:[/red] Workflow '{definition.id}' is disabled") + console.print(f"\nTo re-enable: specify workflow enable {definition.id}") + raise typer.Exit(1) + # Validate errors = engine.validate(definition) if errors: @@ -4844,21 +4861,109 @@ def workflow_list(): console.print("\n[bold cyan]Installed Workflows:[/bold cyan]\n") for wf_id, wf_data in installed.items(): - console.print(f" [bold]{wf_data.get('name', wf_id)}[/bold] ({wf_id}) v{wf_data.get('version', '?')}") + if not isinstance(wf_data, dict): + console.print( + f" [yellow]Warning:[/yellow] Skipping malformed workflow metadata for '{wf_id}' " + f"(expected object, got {type(wf_data).__name__})" + ) + continue + enabled = wf_data.get("enabled", True) + status = "" if enabled else " 
[yellow](disabled)[/yellow]" + console.print(f" [bold]{wf_data.get('name', wf_id)}[/bold] ({wf_id}) v{wf_data.get('version', '?')}{status}") desc = wf_data.get("description", "") if desc: console.print(f" {desc}") console.print() +def _is_loopback_host(hostname: str) -> bool: + """Check if a hostname is a loopback address.""" + from ipaddress import ip_address + + if hostname == "localhost": + return True + try: + return ip_address(hostname).is_loopback + except ValueError: + return False + + +def _validate_url_scheme(url: str, *, allow_http_loopback: bool = True) -> None: + """Raise ValueError if URL is not HTTPS, optionally allowing HTTP loopback URLs.""" + from urllib.parse import urlparse + + parsed = urlparse(url) + if not parsed.hostname: + raise ValueError(f"URL has no hostname: {url}") + host = parsed.hostname + if parsed.scheme == "https": + return + if allow_http_loopback and parsed.scheme == "http" and _is_loopback_host(host): + return + raise ValueError(f"Non-HTTPS URL not allowed: {url}") + + +def _download_validated(source_url: str, destination: Path) -> None: + """Download a URL to *destination*, rejecting insecure redirects before following them.""" + import shutil as _shutil + from urllib.parse import urlparse + from urllib.request import build_opener, HTTPRedirectHandler # noqa: S310 + + _validate_url_scheme(source_url) + + # Only allow HTTP loopback redirects if the source was already HTTP loopback + source_parsed = urlparse(source_url) + source_allows_http_loopback = ( + source_parsed.scheme == "http" + and source_parsed.hostname is not None + and _is_loopback_host(source_parsed.hostname) + ) + + class _SafeRedirectHandler(HTTPRedirectHandler): + def redirect_request(self, req, fp, code, msg, headers, newurl): + redirect_req = super().redirect_request(req, fp, code, msg, headers, newurl) + if redirect_req is None: + return None + redirect_url = redirect_req.full_url + try: + _validate_url_scheme( + redirect_url, + 
allow_http_loopback=source_allows_http_loopback, + ) + except ValueError: + raise ValueError(f"Redirected to non-HTTPS: {redirect_url}") + return redirect_req + + opener = build_opener(_SafeRedirectHandler) + with opener.open(source_url, timeout=30) as resp: + with destination.open("wb") as dest_file: + _shutil.copyfileobj(resp, dest_file) + + @workflow_app.command("add") def workflow_add( - source: str = typer.Argument(..., help="Workflow ID, URL, or local path"), + source: Optional[str] = typer.Argument(None, help="Workflow ID, URL, or local path"), + dev: bool = typer.Option(False, "--dev", help="Install from local directory"), + from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL"), ): """Install a workflow from catalog, URL, or local path.""" from .workflows.catalog import WorkflowCatalog, WorkflowRegistry, WorkflowCatalogError from .workflows.engine import WorkflowDefinition + # Validate source selection and disallow incompatible options + if dev and from_url: + console.print("[red]Error:[/red] --dev cannot be combined with --from") + raise typer.Exit(1) + if source and from_url: + console.print("[red]Error:[/red] Cannot combine a positional source with --from") + raise typer.Exit(1) + if dev and not source: + console.print("[red]Error:[/red] --dev requires a local workflow path") + raise typer.Exit(1) + if not source and not from_url: + console.print("[red]Error:[/red] Provide a workflow ID/URL/path, or use --from ") + raise typer.Exit(1) + project_root = Path.cwd() specify_dir = project_root / ".specify" if not specify_dir.exists(): @@ -4899,53 +5004,78 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: }) console.print(f"[green]✓[/green] Workflow '{definition.name}' ({definition.id}) installed") - # Try as URL (http/https) - if source.startswith("http://") or source.startswith("https://"): - from ipaddress import ip_address - from urllib.parse import urlparse - from urllib.request import urlopen 
# noqa: S310 - - parsed_src = urlparse(source) - src_host = parsed_src.hostname or "" - src_loopback = src_host == "localhost" - if not src_loopback: - try: - src_loopback = ip_address(src_host).is_loopback - except ValueError: - # Host is not an IP literal (e.g., a DNS name); keep default non-loopback. - pass - if parsed_src.scheme != "https" and not (parsed_src.scheme == "http" and src_loopback): - console.print("[red]Error:[/red] Only HTTPS URLs are allowed, except HTTP for localhost.") + # --dev: install from local directory + if dev: + source_path = Path(source).expanduser().resolve() + if not source_path.exists(): + console.print(f"[red]Error:[/red] Path not found: {source_path}") + raise typer.Exit(1) + + if source_path.is_file() and source_path.suffix in (".yml", ".yaml"): + _validate_and_install_local(source_path, str(source_path)) + elif source_path.is_dir(): + wf_file = source_path / "workflow.yml" + if not wf_file.exists(): + console.print(f"[red]Error:[/red] No workflow.yml found in {source_path}") + raise typer.Exit(1) + _validate_and_install_local(wf_file, str(source_path)) + else: + console.print(f"[red]Error:[/red] Path is not a YAML file or directory: {source_path}") raise typer.Exit(1) + return + + # --from: install from custom URL + if from_url: + try: + _validate_url_scheme(from_url) + except ValueError as exc: + console.print(f"[red]Error:[/red] Invalid URL: {exc}") + console.print("URL must use HTTPS for security. 
HTTP is only allowed for localhost/loopback URLs.") + raise typer.Exit(1) + + console.print("[yellow]Warning:[/yellow] Installing from external URL.") + console.print("Only install workflows from sources you trust.\n") import tempfile + tmp_path = None try: - with urlopen(source, timeout=30) as resp: # noqa: S310 - final_url = resp.geturl() - final_parsed = urlparse(final_url) - final_host = final_parsed.hostname or "" - final_lb = final_host == "localhost" - if not final_lb: - try: - final_lb = ip_address(final_host).is_loopback - except ValueError: - # Redirect host is not an IP literal; keep loopback as determined above. - pass - if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb): - console.print(f"[red]Error:[/red] URL redirected to non-HTTPS: {final_url}") - raise typer.Exit(1) - with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: - tmp.write(resp.read()) - tmp_path = Path(tmp.name) + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + tmp_path = Path(tmp.name) + _download_validated(from_url, tmp_path) + _validate_and_install_local(tmp_path, from_url) except typer.Exit: raise except Exception as exc: console.print(f"[red]Error:[/red] Failed to download workflow: {exc}") raise typer.Exit(1) + finally: + if tmp_path: + tmp_path.unlink(missing_ok=True) + return + + # Try as URL (http/https) + if source.startswith("http://") or source.startswith("https://"): + try: + _validate_url_scheme(source) + except ValueError: + console.print("[red]Error:[/red] Only HTTPS URLs are allowed, except HTTP for localhost/loopback.") + raise typer.Exit(1) + + import tempfile + tmp_path = None try: + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + tmp_path = Path(tmp.name) + _download_validated(source, tmp_path) _validate_and_install_local(tmp_path, source) + except typer.Exit: + raise + except Exception as exc: + console.print(f"[red]Error:[/red] Failed to download workflow: {exc}") + 
raise typer.Exit(1) finally: - tmp_path.unlink(missing_ok=True) + if tmp_path: + tmp_path.unlink(missing_ok=True) return # Try as a local file/directory @@ -4985,21 +5115,9 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: raise typer.Exit(1) # Validate URL scheme (HTTPS required, HTTP allowed for localhost only) - from ipaddress import ip_address - from urllib.parse import urlparse - - parsed_url = urlparse(workflow_url) - url_host = parsed_url.hostname or "" - is_loopback = False - if url_host == "localhost": - is_loopback = True - else: - try: - is_loopback = ip_address(url_host).is_loopback - except ValueError: - # Host is not an IP literal (e.g., a regular hostname); treat as non-loopback. - pass - if parsed_url.scheme != "https" and not (parsed_url.scheme == "http" and is_loopback): + try: + _validate_url_scheme(workflow_url) + except ValueError: console.print( f"[red]Error:[/red] Workflow '{source}' has an invalid install URL. " "Only HTTPS URLs are allowed, except HTTP for localhost/loopback." 
@@ -5007,7 +5125,11 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: raise typer.Exit(1) workflow_dir = workflows_dir / source - # Validate that source is a safe directory name (no path traversal) + # Validate workflow ID format and path safety + from .workflows.engine import is_valid_workflow_id + if not is_valid_workflow_id(source): + console.print(f"[red]Error:[/red] Invalid workflow ID: {source!r}") + raise typer.Exit(1) try: workflow_dir.resolve().relative_to(workflows_dir.resolve()) except ValueError: @@ -5016,30 +5138,8 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: workflow_file = workflow_dir / "workflow.yml" try: - from urllib.request import urlopen # noqa: S310 — URL comes from catalog - workflow_dir.mkdir(parents=True, exist_ok=True) - with urlopen(workflow_url, timeout=30) as response: # noqa: S310 - # Validate final URL after redirects - final_url = response.geturl() - final_parsed = urlparse(final_url) - final_host = final_parsed.hostname or "" - final_loopback = final_host == "localhost" - if not final_loopback: - try: - final_loopback = ip_address(final_host).is_loopback - except ValueError: - # Host is not an IP literal (e.g., a regular hostname); treat as non-loopback. 
- pass - if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_loopback): - if workflow_dir.exists(): - import shutil - shutil.rmtree(workflow_dir, ignore_errors=True) - console.print( - f"[red]Error:[/red] Workflow '{source}' redirected to non-HTTPS URL: {final_url}" - ) - raise typer.Exit(1) - workflow_file.write_bytes(response.read()) + _download_validated(workflow_url, workflow_file) except Exception as exc: if workflow_dir.exists(): import shutil @@ -5121,6 +5221,7 @@ def workflow_remove( def workflow_search( query: str | None = typer.Argument(None, help="Search query"), tag: str | None = typer.Option(None, "--tag", help="Filter by tag"), + author: str | None = typer.Option(None, "--author", help="Filter by author"), ): """Search workflow catalogs.""" from .workflows.catalog import WorkflowCatalog, WorkflowCatalogError @@ -5132,7 +5233,7 @@ def workflow_search( catalog = WorkflowCatalog(project_root) try: - results = catalog.search(query=query, tag=tag) + results = catalog.search(query=query, tag=tag, author=author) except WorkflowCatalogError as exc: console.print(f"[red]Error:[/red] {exc}") raise typer.Exit(1) @@ -5226,6 +5327,328 @@ def workflow_info( raise typer.Exit(1) +@workflow_app.command("update") +def workflow_update( + workflow_id: Optional[str] = typer.Argument(None, help="Workflow ID to update (or all)"), +): + """Update workflow(s) to latest version.""" + from .workflows.catalog import WorkflowCatalog, WorkflowRegistry, WorkflowCatalogError + from .workflows.engine import WorkflowDefinition, validate_workflow + from packaging import version as pkg_version + import shutil + import tempfile + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + registry = WorkflowRegistry(project_root) + catalog = WorkflowCatalog(project_root) + workflows_dir = project_root / 
".specify" / "workflows" + + installed = registry.list() + if workflow_id: + if not registry.is_installed(workflow_id): + console.print(f"[red]Error:[/red] Workflow '{workflow_id}' is not installed") + raise typer.Exit(1) + workflows_to_update = [workflow_id] + else: + workflows_to_update = list(installed.keys()) + + if not workflows_to_update: + console.print("[yellow]No workflows installed[/yellow]") + raise typer.Exit(0) + + console.print("🔄 Checking for updates...\n") + + updates_available = [] + catalog_errors = 0 + skipped = 0 + for wf_id in workflows_to_update: + metadata = installed.get(wf_id, {}) + if not isinstance(metadata, dict): + console.print(f"⚠ {wf_id}: Malformed workflow registry entry (skipping)") + skipped += 1 + continue + installed_ver_str = str(metadata.get("version", "0.0.0")) + try: + installed_version = pkg_version.Version(installed_ver_str) + except (pkg_version.InvalidVersion, TypeError): + console.print(f"⚠ {wf_id}: Invalid installed version '{installed_ver_str}' (skipping)") + skipped += 1 + continue + + try: + cat_info = catalog.get_workflow_info(wf_id) + except WorkflowCatalogError as exc: + console.print(f"⚠ {wf_id}: Catalog error: {exc}") + catalog_errors += 1 + cat_info = None + + if not cat_info: + console.print(f"⚠ {wf_id}: Not found in catalog (skipping)") + skipped += 1 + continue + + if not cat_info.get("_install_allowed", True): + console.print(f"⚠ {wf_id}: Updates not allowed from '{cat_info.get('_catalog_name', 'catalog')}' (skipping)") + skipped += 1 + continue + + cat_ver_str = str(cat_info.get("version", "0.0.0")) + try: + catalog_version = pkg_version.Version(cat_ver_str) + except (pkg_version.InvalidVersion, TypeError): + console.print(f"⚠ {wf_id}: Invalid catalog version '{cat_ver_str}' (skipping)") + skipped += 1 + continue + + if catalog_version > installed_version: + updates_available.append({ + "id": wf_id, + "name": cat_info.get("name", wf_id), + "installed": str(installed_version), + "available": 
str(catalog_version), + "url": cat_info.get("url"), + "catalog_name": cat_info.get("_catalog_name", ""), + "description": cat_info.get("description", ""), + }) + else: + console.print(f"✓ {wf_id}: Up to date (v{installed_version})") + + if not updates_available: + if catalog_errors: + console.print(f"\n[yellow]Could not check {catalog_errors} workflow(s) due to catalog errors[/yellow]") + raise typer.Exit(1) + if skipped: + console.print(f"\n[green]No updates found[/green] ({skipped} workflow(s) skipped)") + else: + console.print("\n[green]All workflows are up to date![/green]") + raise typer.Exit(0) + + console.print("\n[bold]Updates available:[/bold]\n") + for update in updates_available: + console.print(f" • {update['id']}: {update['installed']} → {update['available']}") + + console.print() + confirm = typer.confirm("Update these workflows?") + if not confirm: + console.print("Cancelled") + raise typer.Exit(0) + + console.print() + had_failures = False + for update in updates_available: + wf_id = update["id"] + wf_url = update.get("url") + if not wf_url: + console.print(f"⚠ {wf_id}: No download URL in catalog (skipping)") + continue + + console.print(f"📦 Updating {update['name']}...") + + try: + _validate_url_scheme(wf_url) + except ValueError: + console.print(f"⚠ {wf_id}: Invalid URL scheme (skipping)") + continue + + wf_dir = workflows_dir / wf_id + # Validate workflow ID format and path safety + from .workflows.engine import is_valid_workflow_id + if not is_valid_workflow_id(wf_id): + console.print(f"⚠ {wf_id}: Invalid workflow ID format (skipping)") + continue + try: + wf_dir.resolve().relative_to(workflows_dir.resolve()) + except ValueError: + console.print(f"⚠ {wf_id}: Invalid workflow ID (skipping)") + continue + try: + # Ensure workflows directory exists before creating temp dir + workflows_dir.mkdir(parents=True, exist_ok=True) + # Download and validate in a temporary directory so failures do not + # leave a partially-created workflow directory behind. 
+ with tempfile.TemporaryDirectory(prefix="wf-update-", dir=workflows_dir) as tmp_dir: + staged_dir = Path(tmp_dir) / wf_id + staged_dir.mkdir(parents=True, exist_ok=True) + wf_file = staged_dir / "workflow.yml" + _download_validated(wf_url, wf_file) + + # Validate staged contents before replacing the live workflow + definition = WorkflowDefinition.from_yaml(wf_file) + if definition.id != wf_id: + raise ValueError( + f"Workflow ID mismatch: expected '{wf_id}', got '{definition.id}'" + ) + errors = validate_workflow(definition) + if errors: + raise ValueError(f"Validation failed: {'; '.join(errors)}") + + # Atomic swap: move old to hidden backup, move staged in + backup_dir = None + if wf_dir.exists(): + backup_parent = workflows_dir / ".backup" + # Guard against symlink attacks + if backup_parent.exists() and backup_parent.is_symlink(): + raise ValueError(".backup is a symlink; refusing to use it") + backup_parent.mkdir(parents=True, exist_ok=True) + backup_dir = backup_parent / wf_id + try: + backup_dir.resolve().relative_to(workflows_dir.resolve()) + except ValueError: + raise ValueError(f"Backup path escapes workflows directory: {backup_dir}") + if backup_dir.exists(): + shutil.rmtree(backup_dir) + wf_dir.rename(backup_dir) + try: + shutil.move(str(staged_dir), str(wf_dir)) + except BaseException: + # Restore from backup if swap fails (including KeyboardInterrupt) + if backup_dir and backup_dir.exists(): + if wf_dir.exists(): + shutil.rmtree(wf_dir) + backup_dir.rename(wf_dir) + raise + + # Snapshot registry file, then update; restore both on failure + import os + registry_file = registry.registry_path + registry_snapshot = None + try: + if registry_file.exists(): + fd, snap = tempfile.mkstemp( + prefix="registry-", suffix=".json", dir=workflows_dir + ) + os.close(fd) + registry_snapshot = Path(snap) + shutil.copy2(registry_file, registry_snapshot) + + existing_entry = installed.get(wf_id, {}) + description = ( + definition.description + or 
update.get("description") + or (existing_entry.get("description", "") if isinstance(existing_entry, dict) else "") + ) + + registry.update(wf_id, { + "version": update["available"], + "name": definition.name or update["name"], + "description": description, + }) + except BaseException: + # Restore registry snapshot + if registry_snapshot and registry_snapshot.exists(): + try: + os.replace(registry_snapshot, registry_file) + except OSError: + pass + registry_snapshot = None + # Reload in-memory state from restored file + registry.data = registry._load() + # Restore workflow from backup, or remove new dir if no backup existed + if backup_dir and backup_dir.exists(): + if wf_dir.exists(): + shutil.rmtree(wf_dir) + backup_dir.rename(wf_dir) + elif wf_dir.exists() and not backup_dir: + # No prior directory existed; remove the newly placed one + shutil.rmtree(wf_dir, ignore_errors=True) + raise + finally: + if registry_snapshot and registry_snapshot.exists(): + try: + registry_snapshot.unlink() + except OSError: + pass + # Clean up backup after fully successful update + if backup_dir and backup_dir.exists(): + try: + shutil.rmtree(backup_dir) + except OSError as cleanup_exc: + console.print(f" [yellow]⚠[/yellow] {wf_id}: Updated successfully, but failed to remove backup: {cleanup_exc}") + # Clean up empty .backup directory + backup_parent = workflows_dir / ".backup" + if backup_parent.exists(): + try: + backup_parent.rmdir() # only succeeds if empty + except OSError: + pass + console.print(f" [green]✓[/green] {wf_id}: {update['installed']} → {update['available']}") + + except Exception as exc: + console.print(f" [red]✗[/red] {wf_id}: {exc}") + had_failures = True + + if had_failures: + raise typer.Exit(1) + + +@workflow_app.command("enable") +def workflow_enable( + workflow_id: str = typer.Argument(..., help="Workflow ID to enable"), +): + """Enable a disabled workflow.""" + from .workflows.catalog import WorkflowRegistry + + project_root = Path.cwd() + specify_dir = 
project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + registry = WorkflowRegistry(project_root) + + if not registry.is_installed(workflow_id): + console.print(f"[red]Error:[/red] Workflow '{workflow_id}' is not installed") + raise typer.Exit(1) + + metadata = registry.get(workflow_id) + if not isinstance(metadata, dict): + console.print(f"[red]Error:[/red] Workflow '{workflow_id}' has corrupted registry metadata") + raise typer.Exit(1) + if metadata.get("enabled", True): + console.print(f"[yellow]Workflow '{workflow_id}' is already enabled[/yellow]") + raise typer.Exit(0) + + registry.update(workflow_id, {"enabled": True}) + console.print(f"[green]✓[/green] Workflow '{workflow_id}' enabled") + + +@workflow_app.command("disable") +def workflow_disable( + workflow_id: str = typer.Argument(..., help="Workflow ID to disable"), +): + """Disable a workflow without removing it.""" + from .workflows.catalog import WorkflowRegistry + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + registry = WorkflowRegistry(project_root) + + if not registry.is_installed(workflow_id): + console.print(f"[red]Error:[/red] Workflow '{workflow_id}' is not installed") + raise typer.Exit(1) + + metadata = registry.get(workflow_id) + if not isinstance(metadata, dict): + console.print(f"[red]Error:[/red] Workflow '{workflow_id}' has corrupted registry metadata") + raise typer.Exit(1) + if not metadata.get("enabled", True): + console.print(f"[yellow]Workflow '{workflow_id}' is already disabled[/yellow]") + raise typer.Exit(0) + + registry.update(workflow_id, {"enabled": False}) + console.print(f"[green]✓[/green] Workflow '{workflow_id}' disabled") + console.print(f"\nTo re-enable: specify workflow enable {workflow_id}") + + 
@workflow_catalog_app.command("list") def workflow_catalog_list(): """List configured workflow catalog sources.""" diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index da5c60b5c8..ae88e8e5c3 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -12,6 +12,7 @@ import hashlib import json import os +import tempfile import time from dataclasses import dataclass from pathlib import Path @@ -72,20 +73,51 @@ def __init__(self, project_root: Path) -> None: def _load(self) -> dict[str, Any]: """Load registry from disk or create default.""" + default = {"schema_version": self.SCHEMA_VERSION, "workflows": {}} if self.registry_path.exists(): try: with open(self.registry_path, encoding="utf-8") as f: - return json.load(f) + data = json.load(f) except (json.JSONDecodeError, ValueError): - # Corrupted registry file — reset to default - return {"schema_version": self.SCHEMA_VERSION, "workflows": {}} - return {"schema_version": self.SCHEMA_VERSION, "workflows": {}} + return default + if not isinstance(data, dict): + return default + # Normalize 'workflows' to a dict in case the file is corrupted + if not isinstance(data.get("workflows"), dict): + data["workflows"] = {} + return data + return default def save(self) -> None: - """Persist registry to disk.""" + """Persist registry to disk atomically (write to temp, then rename).""" self.workflows_dir.mkdir(parents=True, exist_ok=True) - with open(self.registry_path, "w", encoding="utf-8") as f: - json.dump(self.data, f, indent=2) + + # Capture existing file permissions to preserve after replace + existing_stat = None + if self.registry_path.exists(): + try: + existing_stat = self.registry_path.stat() + except OSError: + pass + + fd, tmp_path = tempfile.mkstemp( + suffix=".json", dir=self.workflows_dir + ) + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + json.dump(self.data, f, indent=2) + if existing_stat is not None: + try: + 
os.chmod(tmp_path, existing_stat.st_mode & 0o7777) + except OSError: + pass + os.replace(tmp_path, self.registry_path) + except BaseException: + try: + os.unlink(tmp_path) + except OSError: + pass + raise def add(self, workflow_id: str, metadata: dict[str, Any]) -> None: """Add or update an installed workflow entry.""" @@ -107,6 +139,42 @@ def remove(self, workflow_id: str) -> bool: return True return False + def update(self, workflow_id: str, fields: dict[str, Any]) -> None: + """Update specific fields on an installed workflow entry. + + Raises: + KeyError: If the workflow is not installed. + """ + from datetime import datetime, timezone + + if workflow_id not in self.data["workflows"]: + raise KeyError(f"Workflow '{workflow_id}' is not installed") + + existing = self.data["workflows"][workflow_id] + if not isinstance(existing, dict): + now = datetime.now(timezone.utc).isoformat() + repaired = dict(fields) + repaired.pop("installed_at", None) + repaired.setdefault("installed_at", now) + repaired["updated_at"] = now + self.data["workflows"][workflow_id] = repaired + self.save() + return + installed_at = existing.get("installed_at") + + import copy + safe_fields = copy.deepcopy(fields) + safe_fields.pop("installed_at", None) # always preserve original + now = datetime.now(timezone.utc).isoformat() + existing.update(safe_fields) + # Preserve installed_at only if it's a valid string; otherwise reset + if isinstance(installed_at, str) and installed_at: + existing["installed_at"] = installed_at + else: + existing["installed_at"] = now + existing["updated_at"] = now + self.save() + def get(self, workflow_id: str) -> dict[str, Any] | None: """Get metadata for an installed workflow.""" return self.data["workflows"].get(workflow_id) @@ -412,6 +480,7 @@ def search( self, query: str | None = None, tag: str | None = None, + author: str | None = None, ) -> list[dict[str, Any]]: """Search workflows across all configured catalogs.""" merged = self._get_merged_workflows() @@ 
-419,6 +488,11 @@ def search( for wf_id, wf_data in merged.items(): wf_data.setdefault("id", wf_id) + if author: + raw_author = wf_data.get("author", "") + normalized_author = raw_author if isinstance(raw_author, str) else "" + if normalized_author.lower() != author.lower(): + continue if query: q = query.lower() searchable = " ".join( diff --git a/src/specify_cli/workflows/engine.py b/src/specify_cli/workflows/engine.py index d6a73bbeb0..dad9f167cc 100644 --- a/src/specify_cli/workflows/engine.py +++ b/src/specify_cli/workflows/engine.py @@ -82,6 +82,11 @@ def from_string(cls, content: str) -> WorkflowDefinition: # ID format: lowercase alphanumeric with hyphens _ID_PATTERN = re.compile(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$|^[a-z0-9]$") + +def is_valid_workflow_id(workflow_id: str) -> bool: + """Return True if *workflow_id* matches the allowed ID format.""" + return bool(_ID_PATTERN.fullmatch(workflow_id)) + # Valid step types (matching STEP_REGISTRY keys) def _get_valid_step_types() -> set[str]: """Return valid step types from the registry, with a built-in fallback.""" diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 4c042fc7d5..22a9fb5fc3 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1639,6 +1639,51 @@ def test_persistence(self, project_dir): registry2 = WorkflowRegistry(project_dir) assert registry2.is_installed("test-wf") + def test_update(self, project_dir): + from specify_cli.workflows.catalog import WorkflowRegistry + + registry = WorkflowRegistry(project_dir) + registry.add("test-wf", {"name": "Test", "version": "1.0.0"}) + + original_entry = registry.get("test-wf") + original_installed_at = original_entry["installed_at"] + + registry.update("test-wf", {"version": "2.0.0", "enabled": False}) + + entry = registry.get("test-wf") + assert entry["version"] == "2.0.0" + assert entry["enabled"] is False + # Original fields preserved + assert entry["name"] == "Test" + # installed_at preserved, updated_at set + assert 
entry["installed_at"] == original_installed_at + assert "updated_at" in entry + + def test_update_nonexistent(self, project_dir): + from specify_cli.workflows.catalog import WorkflowRegistry + import pytest + + registry = WorkflowRegistry(project_dir) + with pytest.raises(KeyError, match="not installed"): + registry.update("missing", {"version": "2.0.0"}) + assert registry.get("missing") is None + + def test_enable_disable_via_update(self, project_dir): + from specify_cli.workflows.catalog import WorkflowRegistry + + registry = WorkflowRegistry(project_dir) + registry.add("test-wf", {"name": "Test", "version": "1.0.0"}) + + # Disable + registry.update("test-wf", {"enabled": False}) + entry = registry.get("test-wf") + assert entry["enabled"] is False + + # Enable + registry.update("test-wf", {"enabled": True}) + entry = registry.get("test-wf") + assert entry["enabled"] is True + # ===== Workflow Catalog Tests ===== @@ -1749,6 +1794,39 @@ def test_get_catalog_configs(self, project_dir): assert configs[0]["name"] == "default" assert isinstance(configs[0]["install_allowed"], bool) + def test_search_author_filter(self, project_dir, monkeypatch): + from specify_cli.workflows.catalog import WorkflowCatalog + + catalog = WorkflowCatalog(project_dir) + + # Mock _get_merged_workflows to return test data + def mock_merged(force_refresh=False): + return { + "wf-a": {"id": "wf-a", "name": "Alpha", "author": "Alice", "tags": []}, + "wf-b": {"id": "wf-b", "name": "Beta", "author": "Bob", "tags": ["sdd"]}, + "wf-c": {"id": "wf-c", "name": "Charlie", "author": "Alice", "tags": ["deploy"]}, + } + + monkeypatch.setattr(catalog, "_get_merged_workflows", mock_merged) + + # Filter by author + results = catalog.search(author="Alice") + assert len(results) == 2 + assert all(r["author"] == "Alice" for r in results) + + # Filter by author (case-insensitive) + results = catalog.search(author="alice") + assert len(results) == 2 + + # Filter by author + tag + results = 
catalog.search(author="Alice", tag="deploy") + assert len(results) == 1 + assert results[0]["id"] == "wf-c" + + # No match + results = catalog.search(author="Nobody") + assert len(results) == 0 + # ===== Integration Test ===== @@ -1843,3 +1921,208 @@ def test_switch_workflow(self, project_dir): assert state.status == RunStatus.COMPLETED assert "do-plan" in state.step_results assert "do-specify" not in state.step_results + + +# ===== CLI Integration Tests ===== + + +class TestWorkflowCLI: + """CLI integration tests for workflow commands via CliRunner.""" + + def test_enable_not_installed(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + + runner = CliRunner() + monkeypatch.chdir(project_dir) + result = runner.invoke(app, ["workflow", "enable", "nonexistent"]) + assert result.exit_code != 0 + assert "not installed" in result.output + + def test_disable_not_installed(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + + runner = CliRunner() + monkeypatch.chdir(project_dir) + result = runner.invoke(app, ["workflow", "disable", "nonexistent"]) + assert result.exit_code != 0 + assert "not installed" in result.output + + def test_enable_disable_roundtrip(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + from specify_cli.workflows.catalog import WorkflowRegistry + + runner = CliRunner() + monkeypatch.chdir(project_dir) + + # Pre-install a workflow in the registry + registry = WorkflowRegistry(project_dir) + registry.add("test-wf", {"name": "Test", "version": "1.0.0"}) + + # Disable + result = runner.invoke(app, ["workflow", "disable", "test-wf"]) + assert result.exit_code == 0 + assert "disabled" in result.output + + # Already disabled + result = runner.invoke(app, ["workflow", "disable", "test-wf"]) + assert result.exit_code == 0 + assert "already disabled" in result.output + + # Enable + result = runner.invoke(app, 
["workflow", "enable", "test-wf"])
+        assert result.exit_code == 0
+        assert "enabled" in result.output
+
+        # Enabling again is a no-op that still exits 0 ("already enabled").
+        result = runner.invoke(app, ["workflow", "enable", "test-wf"])
+        assert result.exit_code == 0
+        assert "already enabled" in result.output
+
+    def test_update_no_project(self, tmp_path, monkeypatch):
+        from typer.testing import CliRunner
+        from specify_cli import app
+
+        runner = CliRunner()
+        monkeypatch.chdir(tmp_path)
+        result = runner.invoke(app, ["workflow", "update"])
+        assert result.exit_code != 0
+        assert "Not a spec-kit project" in result.output
+
+    def test_update_not_installed(self, project_dir, monkeypatch):
+        from typer.testing import CliRunner
+        from specify_cli import app
+
+        runner = CliRunner()
+        monkeypatch.chdir(project_dir)
+        result = runner.invoke(app, ["workflow", "update", "nonexistent"])
+        assert result.exit_code != 0
+        assert "not installed" in result.output
+
+    def test_add_no_source(self, project_dir, monkeypatch):
+        from typer.testing import CliRunner
+        from specify_cli import app
+
+        runner = CliRunner()
+        monkeypatch.chdir(project_dir)
+        result = runner.invoke(app, ["workflow", "add"])
+        assert result.exit_code != 0
+
+    def test_add_from_url_without_source(self, project_dir, monkeypatch):
+        from typer.testing import CliRunner
+        from specify_cli import app
+
+        runner = CliRunner()
+        monkeypatch.chdir(project_dir)
+        # A non-loopback http:// URL must fail the HTTPS scheme check, not the missing-source check.
+        result = runner.invoke(app, ["workflow", "add", "--from", "http://evil.example.com/wf.yml"])
+        assert result.exit_code != 0
+        assert "HTTPS" in result.output
+
+    def test_add_dev_missing_path(self, project_dir, monkeypatch):
+        from typer.testing import CliRunner
+        from specify_cli import app
+
+        runner = CliRunner()
+        monkeypatch.chdir(project_dir)
+        result = runner.invoke(app, ["workflow", "add", "--dev", "/nonexistent/path"])
+        assert result.exit_code != 0
+        assert "not found" in result.output.lower()
+
+    def 
test_list_shows_disabled(self, project_dir, monkeypatch):
+        from typer.testing import CliRunner
+        from specify_cli import app
+        from specify_cli.workflows.catalog import WorkflowRegistry
+
+        runner = CliRunner()
+        monkeypatch.chdir(project_dir)
+
+        registry = WorkflowRegistry(project_dir)
+        registry.add("test-wf", {"name": "Test WF", "version": "1.0.0", "enabled": False})
+
+        result = runner.invoke(app, ["workflow", "list"])
+        assert result.exit_code == 0
+        assert "disabled" in result.output
+
+    def test_add_dev_and_from_mutually_exclusive(self, project_dir, monkeypatch):
+        from typer.testing import CliRunner
+        from specify_cli import app
+
+        runner = CliRunner()
+        monkeypatch.chdir(project_dir)
+        result = runner.invoke(app, ["workflow", "add", "--dev", "--from", "https://example.com/wf.yml", "./local"])
+        assert result.exit_code != 0
+        assert "cannot be combined" in result.output
+
+    def test_add_dev_requires_source(self, project_dir, monkeypatch):
+        from typer.testing import CliRunner
+        from specify_cli import app
+
+        runner = CliRunner()
+        monkeypatch.chdir(project_dir)
+        result = runner.invoke(app, ["workflow", "add", "--dev"])
+        assert result.exit_code != 0
+        assert "--dev requires" in result.output or "path" in result.output.lower()
+
+    def test_update_success_path(self, project_dir, monkeypatch):
+        """Happy-path update: mocked download delivers v2.0.0, file is swapped, registry updated."""
+        from typer.testing import CliRunner
+        from unittest.mock import patch, MagicMock
+        from specify_cli import app
+        from specify_cli.workflows.catalog import WorkflowRegistry
+
+        runner = CliRunner()
+        monkeypatch.chdir(project_dir)
+
+        # Pre-install the workflow at v1.0.0 so there is something to update.
+        registry = WorkflowRegistry(project_dir)
+        registry.add("test-wf", {"name": "Test WF", "version": "1.0.0", "source": "catalog"})
+
+        # Create the on-disk workflow directory the update command will swap.
+        wf_dir = project_dir / ".specify" / "workflows" / "test-wf"
+        wf_dir.mkdir(parents=True, exist_ok=True)
+        (wf_dir / "workflow.yml").write_text(
+            
'schema_version: "1.0"\nworkflow:\n  id: test-wf\n  name: Test WF\n  version: "1.0.0"\nsteps:\n  - id: step1\n    type: shell\n    run: "echo hello"\n'
+        )
+
+        # Updated workflow definition (v2.0.0) that the mocked download will deliver.
+        updated_yaml = (
+            'schema_version: "1.0"\nworkflow:\n  id: test-wf\n  name: Test WF Updated\n  version: "2.0.0"\n'
+            'steps:\n  - id: step1\n    type: shell\n    run: "echo hello"\n'
+        )
+
+        # Catalog entry claiming v2.0.0 is available from an install-allowed catalog.
+        mock_cat_info = {
+            "name": "Test WF",
+            "version": "2.0.0",
+            "url": "https://example.com/test-wf/workflow.yml",
+            "_install_allowed": True,
+            "_catalog_name": "default",
+        }
+
+        with patch("specify_cli.workflows.catalog.WorkflowCatalog") as MockCatalog, \
+             patch("specify_cli._download_validated") as mock_download:
+
+            mock_instance = MagicMock()
+            mock_instance.get_workflow_info.return_value = mock_cat_info
+            MockCatalog.return_value = mock_instance
+
+            def fake_download(url, dest):
+                dest.write_text(updated_yaml)
+            mock_download.side_effect = fake_download
+
+            result = runner.invoke(app, ["workflow", "update", "test-wf"], input="y\n")
+
+            assert result.exit_code == 0, result.output
+            assert "2.0.0" in result.output
+
+            # Registry must now record the new version for test-wf.
+            registry2 = WorkflowRegistry(project_dir)
+            entry = registry2.get("test-wf")
+            assert entry["version"] == "2.0.0"
+
+            # The on-disk workflow.yml must have been swapped for the v2.0.0 file.
+            new_content = (wf_dir / "workflow.yml").read_text()
+            assert "2.0.0" in new_content