From 89a62b2687c55a658c2602f00c7041a3e92928d7 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 07:40:27 -0500 Subject: [PATCH 01/26] feat(workflows): align CLI commands with extension/preset pattern Add missing workflow CLI commands and flags to match the capabilities already available in `specify extension` and `specify preset`: - `add --dev`: install from local directory for development - `add --from `: install from custom URL - `search --author`: filter search results by author - `update`: update installed workflow(s) to latest version - `enable` / `disable`: toggle workflows without removing them Also adds `WorkflowRegistry.update()` method and shows disabled status in `workflow list` output. Closes #2342 --- src/specify_cli/__init__.py | 313 ++++++++++++++++++++++++++- src/specify_cli/workflows/catalog.py | 11 + tests/test_workflows.py | 71 ++++++ 3 files changed, 393 insertions(+), 2 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 77611128b5..218b09f469 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4844,7 +4844,9 @@ def workflow_list(): console.print("\n[bold cyan]Installed Workflows:[/bold cyan]\n") for wf_id, wf_data in installed.items(): - console.print(f" [bold]{wf_data.get('name', wf_id)}[/bold] ({wf_id}) v{wf_data.get('version', '?')}") + enabled = wf_data.get("enabled", True) + status = "" if enabled else " [yellow](disabled)[/yellow]" + console.print(f" [bold]{wf_data.get('name', wf_id)}[/bold] ({wf_id}) v{wf_data.get('version', '?')}{status}") desc = wf_data.get("description", "") if desc: console.print(f" {desc}") @@ -4854,6 +4856,8 @@ def workflow_list(): @workflow_app.command("add") def workflow_add( source: str = typer.Argument(..., help="Workflow ID, URL, or local path"), + dev: bool = typer.Option(False, "--dev", help="Install from local directory"), + from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL"), ): """Install a workflow from catalog, URL, or local path.""" from .workflows.catalog import WorkflowCatalog, WorkflowRegistry, WorkflowCatalogError @@ -4899,6 +4903,77 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: }) console.print(f"[green]✓[/green] Workflow '{definition.name}' ({definition.id}) installed") + # --dev: install from local directory + if dev: + source_path = Path(source).expanduser().resolve() + if not source_path.exists(): + console.print(f"[red]Error:[/red] Directory not found: {source_path}") + raise typer.Exit(1) + + if source_path.is_file() and source_path.suffix in (".yml", ".yaml"): + _validate_and_install_local(source_path, str(source_path)) + elif source_path.is_dir(): + wf_file = source_path / "workflow.yml" + if not wf_file.exists(): + console.print(f"[red]Error:[/red] No workflow.yml found in {source_path}") + raise typer.Exit(1) + _validate_and_install_local(wf_file, str(source_path)) + else: + console.print(f"[red]Error:[/red] Path is not a YAML file or directory: {source_path}") + raise typer.Exit(1) + return + + # --from: install from custom URL + if from_url: + from ipaddress import ip_address + from urllib.parse import urlparse + from urllib.request import urlopen # noqa: S310 + + parsed_from = urlparse(from_url) + from_host = parsed_from.hostname or "" + is_lb = from_host == "localhost" + if not is_lb: + try: + is_lb = ip_address(from_host).is_loopback + except ValueError: + pass + if parsed_from.scheme != "https" and not (parsed_from.scheme == "http" and is_lb): + console.print("[red]Error:[/red] URL must use HTTPS for security.") + console.print("HTTP is only allowed for localhost URLs.") + raise typer.Exit(1) + + console.print("[yellow]Warning:[/yellow] Installing from external URL.") + console.print("Only install workflows from sources you trust.\n") + + import tempfile + try: + with urlopen(from_url, timeout=30) as resp: # noqa: S310 + final_url = resp.geturl() + final_parsed = urlparse(final_url) + final_host = final_parsed.hostname or "" + final_lb = final_host == "localhost" + if not final_lb: + try: + final_lb = ip_address(final_host).is_loopback + except ValueError: + pass + if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb): + console.print(f"[red]Error:[/red] URL redirected to non-HTTPS: {final_url}") + raise typer.Exit(1) + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + tmp.write(resp.read()) + tmp_path = Path(tmp.name) + except typer.Exit: + raise + except Exception as exc: + console.print(f"[red]Error:[/red] Failed to download workflow: {exc}") + raise typer.Exit(1) + try: + _validate_and_install_local(tmp_path, from_url) + finally: + tmp_path.unlink(missing_ok=True) + return + # Try as URL (http/https) if source.startswith("http://") or source.startswith("https://"): from ipaddress import ip_address @@ -5121,6 +5196,7 @@ def workflow_remove( def workflow_search( query: str | None = typer.Argument(None, help="Search query"), tag: str | None = typer.Option(None, "--tag", help="Filter by tag"), + author: str | None = typer.Option(None, "--author", help="Filter by author"), ): """Search workflow catalogs.""" from .workflows.catalog import WorkflowCatalog, WorkflowCatalogError @@ -5132,7 +5208,7 @@ def workflow_search( catalog = WorkflowCatalog(project_root) try: - results = catalog.search(query=query, tag=tag) + results = catalog.search(query=query, tag=tag, author=author) except WorkflowCatalogError as exc: console.print(f"[red]Error:[/red] {exc}") raise typer.Exit(1) @@ -5226,6 +5302,239 @@ def workflow_info( raise typer.Exit(1) +@workflow_app.command("update") +def workflow_update( + workflow_id: str = typer.Argument(None, help="Workflow ID to update (or all)"), +): + """Update workflow(s) to latest version.""" + from .workflows.catalog import WorkflowCatalog, WorkflowRegistry, WorkflowCatalogError + from .workflows.engine import WorkflowDefinition, validate_workflow + from packaging import version as pkg_version + import shutil + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + registry = WorkflowRegistry(project_root) + catalog = WorkflowCatalog(project_root) + workflows_dir = project_root / ".specify" / "workflows" + + installed = registry.list() + if workflow_id: + if not registry.is_installed(workflow_id): + console.print(f"[red]Error:[/red] Workflow '{workflow_id}' is not installed") + raise typer.Exit(1) + workflows_to_update = [workflow_id] + else: + workflows_to_update = list(installed.keys()) + + if not workflows_to_update: + console.print("[yellow]No workflows installed[/yellow]") + raise typer.Exit(0) + + console.print("🔄 Checking for updates...\n") + + updates_available = [] + for wf_id in workflows_to_update: + metadata = installed.get(wf_id, {}) + installed_ver_str = metadata.get("version", "0.0.0") + try: + installed_version = pkg_version.Version(installed_ver_str) + except pkg_version.InvalidVersion: + console.print(f"⚠ {wf_id}: Invalid installed version '{installed_ver_str}' (skipping)") + continue + + try: + cat_info = catalog.get_workflow_info(wf_id) + except WorkflowCatalogError: + cat_info = None + + if not cat_info: + console.print(f"⚠ {wf_id}: Not found in catalog (skipping)") + continue + + if not cat_info.get("_install_allowed", True): + console.print(f"⚠ {wf_id}: Updates not allowed from '{cat_info.get('_catalog_name', 'catalog')}' (skipping)") + continue + + cat_ver_str = cat_info.get("version", "0.0.0") + try: + catalog_version = pkg_version.Version(cat_ver_str) + except pkg_version.InvalidVersion: + console.print(f"⚠ {wf_id}: Invalid catalog version '{cat_ver_str}' (skipping)") + continue + + if catalog_version > installed_version: + updates_available.append({ + "id": wf_id, + "name": cat_info.get("name", wf_id), + "installed": str(installed_version), + "available": str(catalog_version), + "url": cat_info.get("url"), + "catalog_name": cat_info.get("_catalog_name", ""), + }) + else: + console.print(f"✓ {wf_id}: Up to date (v{installed_version})") + + if not updates_available: + console.print("\n[green]All workflows are up to date![/green]") + raise typer.Exit(0) + + console.print("\n[bold]Updates available:[/bold]\n") + for update in updates_available: + console.print(f" • {update['id']}: {update['installed']} → {update['available']}") + + console.print() + confirm = typer.confirm("Update these workflows?") + if not confirm: + console.print("Cancelled") + raise typer.Exit(0) + + console.print() + for update in updates_available: + wf_id = update["id"] + wf_url = update.get("url") + if not wf_url: + console.print(f"⚠ {wf_id}: No download URL in catalog (skipping)") + continue + + console.print(f"📦 Updating {update['name']}...") + + from ipaddress import ip_address + from urllib.parse import urlparse + from urllib.request import urlopen # noqa: S310 + + parsed_url = urlparse(wf_url) + url_host = parsed_url.hostname or "" + is_loopback = url_host == "localhost" + if not is_loopback: + try: + is_loopback = ip_address(url_host).is_loopback + except ValueError: + pass + if parsed_url.scheme != "https" and not (parsed_url.scheme == "http" and is_loopback): + console.print(f"⚠ {wf_id}: Invalid URL scheme (skipping)") + continue + + wf_dir = workflows_dir / wf_id + backup_dir = workflows_dir / ".backup" / wf_id + try: + # Backup existing + if wf_dir.exists(): + backup_dir.parent.mkdir(parents=True, exist_ok=True) + if backup_dir.exists(): + shutil.rmtree(backup_dir) + shutil.copytree(wf_dir, backup_dir) + + # Download new version + wf_dir.mkdir(parents=True, exist_ok=True) + with urlopen(wf_url, timeout=30) as resp: # noqa: S310 + final_url = resp.geturl() + final_parsed = urlparse(final_url) + final_host = final_parsed.hostname or "" + final_lb = final_host == "localhost" + if not final_lb: + try: + final_lb = ip_address(final_host).is_loopback + except ValueError: + pass + if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb): + raise ValueError(f"Redirected to non-HTTPS: {final_url}") + wf_file = wf_dir / "workflow.yml" + wf_file.write_bytes(resp.read()) + + # Validate + definition = WorkflowDefinition.from_yaml(wf_file) + errors = validate_workflow(definition) + if errors: + raise ValueError(f"Validation failed: {'; '.join(errors)}") + + # Update registry + registry.update(wf_id, { + "version": update["available"], + "name": definition.name or update["name"], + "description": definition.description or "", + }) + console.print(f" [green]✓[/green] {wf_id}: {update['installed']} → {update['available']}") + + # Clean up backup + if backup_dir.exists(): + shutil.rmtree(backup_dir) + + except Exception as exc: + console.print(f" [red]✗[/red] {wf_id}: {exc}") + # Restore from backup + if backup_dir.exists(): + if wf_dir.exists(): + shutil.rmtree(wf_dir) + shutil.move(str(backup_dir), str(wf_dir)) + + # Clean up backup directory if empty + backup_parent = workflows_dir / ".backup" + if backup_parent.exists() and not any(backup_parent.iterdir()): + backup_parent.rmdir() + + +@workflow_app.command("enable") +def workflow_enable( + workflow_id: str = typer.Argument(..., help="Workflow ID to enable"), +): + """Enable a disabled workflow.""" + from .workflows.catalog import WorkflowRegistry + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + registry = WorkflowRegistry(project_root) + + if not registry.is_installed(workflow_id): + console.print(f"[red]Error:[/red] Workflow '{workflow_id}' is not installed") + raise typer.Exit(1) + + metadata = registry.get(workflow_id) + if metadata.get("enabled", True): + console.print(f"[yellow]Workflow '{workflow_id}' is already enabled[/yellow]") + raise typer.Exit(0) + + registry.update(workflow_id, {"enabled": True}) + console.print(f"[green]✓[/green] Workflow '{workflow_id}' enabled") + + +@workflow_app.command("disable") +def workflow_disable( + workflow_id: str = typer.Argument(..., help="Workflow ID to disable"), +): + """Disable a workflow without removing it.""" + from .workflows.catalog import WorkflowRegistry + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + registry = WorkflowRegistry(project_root) + + if not registry.is_installed(workflow_id): + console.print(f"[red]Error:[/red] Workflow '{workflow_id}' is not installed") + raise typer.Exit(1) + + metadata = registry.get(workflow_id) + if not metadata.get("enabled", True): + console.print(f"[yellow]Workflow '{workflow_id}' is already disabled[/yellow]") + raise typer.Exit(0) + + registry.update(workflow_id, {"enabled": False}) + console.print(f"[green]✓[/green] Workflow '{workflow_id}' disabled") + console.print(f"\nTo re-enable: specify workflow enable {workflow_id}") + + @workflow_catalog_app.command("list") def workflow_catalog_list(): """List configured workflow catalog sources.""" diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index da5c60b5c8..beb27c90a0 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -107,6 +107,13 @@ def remove(self, workflow_id: str) -> bool: return True return False + def update(self, workflow_id: str, fields: dict[str, Any]) -> None: + """Update specific fields on an installed workflow entry.""" + if workflow_id not in self.data["workflows"]: + return + self.data["workflows"][workflow_id].update(fields) + self.save() + def get(self, workflow_id: str) -> dict[str, Any] | None: """Get metadata for an installed workflow.""" return self.data["workflows"].get(workflow_id) @@ -412,6 +419,7 @@ def search( self, query: str | None = None, tag: str | None = None, + author: str | None = None, ) -> list[dict[str, Any]]: """Search workflows across all configured catalogs.""" merged = self._get_merged_workflows() @@ -419,6 +427,9 @@ def search( for wf_id, wf_data in merged.items(): wf_data.setdefault("id", wf_id) + if author: + if wf_data.get("author", "").lower() != author.lower(): + continue if query: q = query.lower() searchable = " ".join( diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 4c042fc7d5..6eee624d9d 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1639,6 +1639,44 @@ def test_persistence(self, project_dir): registry2 = WorkflowRegistry(project_dir) assert registry2.is_installed("test-wf") + def test_update(self, project_dir): + from specify_cli.workflows.catalog import WorkflowRegistry + + registry = WorkflowRegistry(project_dir) + registry.add("test-wf", {"name": "Test", "version": "1.0.0"}) + + registry.update("test-wf", {"version": "2.0.0", "enabled": False}) + + entry = registry.get("test-wf") + assert entry["version"] == "2.0.0" + assert entry["enabled"] is False + # Original fields preserved + assert entry["name"] == "Test" + + def test_update_nonexistent(self, project_dir): + from specify_cli.workflows.catalog import WorkflowRegistry + + registry = WorkflowRegistry(project_dir) + # Should not raise, just no-op + registry.update("missing", {"version": "2.0.0"}) + assert registry.get("missing") is None + + def test_enable_disable_via_update(self, project_dir): + from specify_cli.workflows.catalog import WorkflowRegistry + + registry = WorkflowRegistry(project_dir) + registry.add("test-wf", {"name": "Test", "version": "1.0.0"}) + + # Disable + registry.update("test-wf", {"enabled": False}) + entry = registry.get("test-wf") + assert entry["enabled"] is False + + # Enable + registry.update("test-wf", {"enabled": True}) + entry = registry.get("test-wf") + assert entry["enabled"] is True + # ===== Workflow Catalog Tests ===== @@ -1749,6 +1787,39 @@ def test_get_catalog_configs(self, project_dir): assert configs[0]["name"] == "default" assert isinstance(configs[0]["install_allowed"], bool) + def test_search_author_filter(self, project_dir, monkeypatch): + from specify_cli.workflows.catalog import WorkflowCatalog + + catalog = WorkflowCatalog(project_dir) + + # Mock _get_merged_workflows to return test data + def mock_merged(force_refresh=False): + return { + "wf-a": {"id": "wf-a", "name": "Alpha", "author": "Alice", "tags": []}, + "wf-b": {"id": "wf-b", "name": "Beta", "author": "Bob", "tags": ["sdd"]}, + "wf-c": {"id": "wf-c", "name": "Charlie", "author": "Alice", "tags": ["deploy"]}, + } + + monkeypatch.setattr(catalog, "_get_merged_workflows", mock_merged) + + # Filter by author + results = catalog.search(author="Alice") + assert len(results) == 2 + assert all(r["author"] == "Alice" for r in results) + + # Filter by author (case-insensitive) + results = catalog.search(author="alice") + assert len(results) == 2 + + # Filter by author + tag + results = catalog.search(author="Alice", tag="deploy") + assert len(results) == 1 + assert results[0]["id"] == "wf-c" + + # No match + results = catalog.search(author="Nobody") + assert len(results) == 0 + # ===== Integration Test ===== From 317830c3b8c19abff52dc4f2192311c8356f36e4 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 09:08:37 -0500 Subject: [PATCH 02/26] fix: address PR review feedback from Copilot - Defensive author field handling in WorkflowCatalog.search(): coerce non-string author values to empty string before comparison - Make 'source' argument optional in workflow add when --from is used - Fix type hint: workflow_id in update command is Optional[str] - Add workflow ID mismatch check after download in workflow update - Use temp dir for atomic update: download+validate in staging dir, then swap into place (no partial state on failure) - WorkflowRegistry.update() now sets updated_at timestamp and preserves installed_at - Add 9 CLI integration tests via CliRunner (enable/disable roundtrip, update error paths, add validation, list disabled status) --- src/specify_cli/__init__.py | 86 +++++++++--------- src/specify_cli/workflows/catalog.py | 15 +++- tests/test_workflows.py | 130 +++++++++++++++++++++++++++ 3 files changed, 184 insertions(+), 47 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 218b09f469..46647a850b 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4855,7 +4855,7 @@ def workflow_list(): @workflow_app.command("add") def workflow_add( - source: str = typer.Argument(..., help="Workflow ID, URL, or local path"), + source: Optional[str] = typer.Argument(None, help="Workflow ID, URL, or local path"), dev: bool = typer.Option(False, "--dev", help="Install from local directory"), from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL"), ): @@ -4863,6 +4863,11 @@ def workflow_add( from .workflows.catalog import WorkflowCatalog, WorkflowRegistry, WorkflowCatalogError from .workflows.engine import WorkflowDefinition + # Validate that at least one source is provided + if not source and not from_url: + console.print("[red]Error:[/red] Provide a workflow ID/URL/path, or use --from ") + raise typer.Exit(1) + project_root = Path.cwd() specify_dir = project_root / ".specify" if not specify_dir.exists(): @@ -5304,13 +5309,14 @@ def workflow_info( @workflow_app.command("update") def workflow_update( - workflow_id: str = typer.Argument(None, help="Workflow ID to update (or all)"), + workflow_id: Optional[str] = typer.Argument(None, help="Workflow ID to update (or all)"), ): """Update workflow(s) to latest version.""" from .workflows.catalog import WorkflowCatalog, WorkflowRegistry, WorkflowCatalogError from .workflows.engine import WorkflowDefinition, validate_workflow from packaging import version as pkg_version import shutil + import tempfile project_root = Path.cwd() specify_dir = project_root / ".specify" @@ -5420,37 +5426,41 @@ def workflow_update( continue wf_dir = workflows_dir / wf_id - backup_dir = workflows_dir / ".backup" / wf_id try: - # Backup existing - if wf_dir.exists(): - backup_dir.parent.mkdir(parents=True, exist_ok=True) - if backup_dir.exists(): - shutil.rmtree(backup_dir) - shutil.copytree(wf_dir, backup_dir) - - # Download new version - wf_dir.mkdir(parents=True, exist_ok=True) - with urlopen(wf_url, timeout=30) as resp: # noqa: S310 - final_url = resp.geturl() - final_parsed = urlparse(final_url) - final_host = final_parsed.hostname or "" - final_lb = final_host == "localhost" - if not final_lb: - try: - final_lb = ip_address(final_host).is_loopback - except ValueError: - pass - if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb): - raise ValueError(f"Redirected to non-HTTPS: {final_url}") - wf_file = wf_dir / "workflow.yml" - wf_file.write_bytes(resp.read()) + # Download and validate in a temporary directory so failures do not + # leave a partially-created workflow directory behind. + with tempfile.TemporaryDirectory(prefix=f"{wf_id}-", dir=workflows_dir) as tmp_dir: + staged_dir = Path(tmp_dir) / wf_id + staged_dir.mkdir(parents=True, exist_ok=True) + with urlopen(wf_url, timeout=30) as resp: # noqa: S310 + final_url = resp.geturl() + final_parsed = urlparse(final_url) + final_host = final_parsed.hostname or "" + final_lb = final_host == "localhost" + if not final_lb: + try: + final_lb = ip_address(final_host).is_loopback + except ValueError: + pass + if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb): + raise ValueError(f"Redirected to non-HTTPS: {final_url}") + wf_file = staged_dir / "workflow.yml" + wf_file.write_bytes(resp.read()) + + # Validate staged contents before replacing the live workflow + definition = WorkflowDefinition.from_yaml(wf_file) + if definition.id != wf_id: + raise ValueError( + f"Workflow ID mismatch: expected '{wf_id}', got '{definition.id}'" + ) + errors = validate_workflow(definition) + if errors: + raise ValueError(f"Validation failed: {'; '.join(errors)}") - # Validate - definition = WorkflowDefinition.from_yaml(wf_file) - errors = validate_workflow(definition) - if errors: - raise ValueError(f"Validation failed: {'; '.join(errors)}") + # Swap: remove old, move staged into place + if wf_dir.exists(): + shutil.rmtree(wf_dir) + shutil.move(str(staged_dir), str(wf_dir)) # Update registry registry.update(wf_id, { @@ -5460,22 +5470,8 @@ def workflow_update( }) console.print(f" [green]✓[/green] {wf_id}: {update['installed']} → {update['available']}") - # Clean up backup - if backup_dir.exists(): - shutil.rmtree(backup_dir) - except Exception as exc: console.print(f" [red]✗[/red] {wf_id}: {exc}") - # Restore from backup - if backup_dir.exists(): - if wf_dir.exists(): - shutil.rmtree(wf_dir) - shutil.move(str(backup_dir), str(wf_dir)) - - # Clean up backup directory if empty - backup_parent = workflows_dir / ".backup" - if backup_parent.exists() and not any(backup_parent.iterdir()): - backup_parent.rmdir() @workflow_app.command("enable") diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index beb27c90a0..de2b35785b 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -109,9 +109,18 @@ def remove(self, workflow_id: str) -> bool: def update(self, workflow_id: str, fields: dict[str, Any]) -> None: """Update specific fields on an installed workflow entry.""" + from datetime import datetime, timezone + if workflow_id not in self.data["workflows"]: return - self.data["workflows"][workflow_id].update(fields) + + existing = self.data["workflows"][workflow_id] + installed_at = existing.get("installed_at") + + existing.update(fields) + if installed_at is not None and "installed_at" not in fields: + existing["installed_at"] = installed_at + existing["updated_at"] = datetime.now(timezone.utc).isoformat() self.save() def get(self, workflow_id: str) -> dict[str, Any] | None: @@ -428,7 +437,9 @@ def search( for wf_id, wf_data in merged.items(): wf_data.setdefault("id", wf_id) if author: - if wf_data.get("author", "").lower() != author.lower(): + raw_author = wf_data.get("author", "") + normalized_author = raw_author if isinstance(raw_author, str) else "" + if normalized_author.lower() != author.lower(): continue if query: q = query.lower() diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 6eee624d9d..78857e7e3e 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1645,6 +1645,9 @@ def test_update(self, project_dir): registry = WorkflowRegistry(project_dir) registry.add("test-wf", {"name": "Test", "version": "1.0.0"}) + original_entry = registry.get("test-wf") + original_installed_at = original_entry["installed_at"] + registry.update("test-wf", {"version": "2.0.0", "enabled": False}) entry = registry.get("test-wf") @@ -1652,6 +1655,9 @@ def test_update(self, project_dir): assert entry["enabled"] is False # Original fields preserved assert entry["name"] == "Test" + # installed_at preserved, updated_at set + assert entry["installed_at"] == original_installed_at + assert "updated_at" in entry def test_update_nonexistent(self, project_dir): from specify_cli.workflows.catalog import WorkflowRegistry @@ -1914,3 +1920,127 @@ def test_switch_workflow(self, project_dir): assert state.status == RunStatus.COMPLETED assert "do-plan" in state.step_results assert "do-specify" not in state.step_results + + +# ===== CLI Integration Tests ===== + + +class TestWorkflowCLI: + """CLI integration tests for workflow commands via CliRunner.""" + + def test_enable_not_installed(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + + runner = CliRunner() + monkeypatch.chdir(project_dir) + result = runner.invoke(app, ["workflow", "enable", "nonexistent"]) + assert result.exit_code != 0 + assert "not installed" in result.output + + def test_disable_not_installed(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + + runner = CliRunner() + monkeypatch.chdir(project_dir) + result = runner.invoke(app, ["workflow", "disable", "nonexistent"]) + assert result.exit_code != 0 + assert "not installed" in result.output + + def test_enable_disable_roundtrip(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + from specify_cli.workflows.catalog import WorkflowRegistry + + runner = CliRunner() + monkeypatch.chdir(project_dir) + + # Pre-install a workflow in the registry + registry = WorkflowRegistry(project_dir) + registry.add("test-wf", {"name": "Test", "version": "1.0.0"}) + + # Disable + result = runner.invoke(app, ["workflow", "disable", "test-wf"]) + assert result.exit_code == 0 + assert "disabled" in result.output + + # Already disabled + result = runner.invoke(app, ["workflow", "disable", "test-wf"]) + assert result.exit_code == 0 + assert "already disabled" in result.output + + # Enable + result = runner.invoke(app, ["workflow", "enable", "test-wf"]) + assert result.exit_code == 0 + assert "enabled" in result.output + + # Already enabled + result = runner.invoke(app, ["workflow", "enable", "test-wf"]) + assert result.exit_code == 0 + assert "already enabled" in result.output + + def test_update_no_project(self, tmp_path, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + + runner = CliRunner() + monkeypatch.chdir(tmp_path) + result = runner.invoke(app, ["workflow", "update"]) + assert result.exit_code != 0 + assert "Not a spec-kit project" in result.output + + def test_update_not_installed(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + + runner = CliRunner() + monkeypatch.chdir(project_dir) + result = runner.invoke(app, ["workflow", "update", "nonexistent"]) + assert result.exit_code != 0 + assert "not installed" in result.output + + def test_add_no_source(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + + runner = CliRunner() + monkeypatch.chdir(project_dir) + result = runner.invoke(app, ["workflow", "add"]) + assert result.exit_code != 0 + + def test_add_from_url_without_source(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + + runner = CliRunner() + monkeypatch.chdir(project_dir) + # --from with non-HTTPS should be rejected with URL scheme error, not missing source + result = runner.invoke(app, ["workflow", "add", "--from", "http://evil.example.com/wf.yml"]) + assert result.exit_code != 0 + assert "HTTPS" in result.output + + def test_add_dev_missing_path(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + + runner = CliRunner() + monkeypatch.chdir(project_dir) + result = runner.invoke(app, ["workflow", "add", "--dev", "/nonexistent/path"]) + assert result.exit_code != 0 + assert "not found" in result.output.lower() or "Error" in result.output + + def test_list_shows_disabled(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + from specify_cli.workflows.catalog import WorkflowRegistry + + runner = CliRunner() + monkeypatch.chdir(project_dir) + + registry = WorkflowRegistry(project_dir) + registry.add("test-wf", {"name": "Test WF", "version": "1.0.0", "enabled": False}) + + result = runner.invoke(app, ["workflow", "list"]) + assert result.exit_code == 0 + assert "disabled" in result.output From 9d07773863fd98c344837ec517cef0a40f621c56 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 10:17:30 -0500 Subject: [PATCH 03/26] fix: address second round of PR review feedback - Add mutual exclusion: --dev cannot be combined with --from, and --dev requires a positional source path - Atomic swap with backup/rollback in workflow update: rename old dir to .bak before moving staged dir in; restore on failure - Exit 1 if any workflow update fails (track had_failures) - Add CLI tests for --dev/--from mutual exclusion --- src/specify_cli/__init__.py | 34 ++++++++++++++++++++++++++++++---- tests/test_workflows.py | 19 +++++++++++++++++++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 46647a850b..9093ce70da 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4863,10 +4863,16 @@ def workflow_add( from .workflows.catalog import WorkflowCatalog, WorkflowRegistry, WorkflowCatalogError from .workflows.engine import WorkflowDefinition - # Validate that at least one source is provided + # Validate source selection and disallow incompatible options if not source and not from_url: console.print("[red]Error:[/red] Provide a workflow ID/URL/path, or use --from ") raise typer.Exit(1) + if dev and from_url: + console.print("[red]Error:[/red] --dev cannot be combined with --from") + raise typer.Exit(1) + if dev and not source: + console.print("[red]Error:[/red] --dev requires a local workflow path") + raise typer.Exit(1) project_root = Path.cwd() specify_dir = project_root / ".specify" @@ -5400,6 +5406,7 @@ def workflow_update( raise typer.Exit(0) console.print() + had_failures = False for update in updates_available: wf_id = update["id"] wf_url = update.get("url") @@ -5457,10 +5464,25 @@ def workflow_update( if errors: raise ValueError(f"Validation failed: {'; '.join(errors)}") - # Swap: remove old, move staged into place + # Atomic swap: move old to backup, move staged in, remove backup on success + backup_dir = None if wf_dir.exists(): - shutil.rmtree(wf_dir) - shutil.move(str(staged_dir), str(wf_dir)) + backup_dir = wf_dir.with_name(f"{wf_id}.bak") + if backup_dir.exists(): + shutil.rmtree(backup_dir) + wf_dir.rename(backup_dir) + try: + shutil.move(str(staged_dir), str(wf_dir)) + except Exception: + # Restore from backup if swap fails + if backup_dir and backup_dir.exists(): + if wf_dir.exists(): + shutil.rmtree(wf_dir) + backup_dir.rename(wf_dir) + raise + # Clean up backup after successful swap + if backup_dir and backup_dir.exists(): + shutil.rmtree(backup_dir) # Update registry registry.update(wf_id, { @@ -5472,6 +5494,10 @@ def workflow_update( except Exception as exc: console.print(f" [red]✗[/red] {wf_id}: {exc}") + had_failures = True + + if had_failures: + raise typer.Exit(1) @workflow_app.command("enable") diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 78857e7e3e..8ee63690b0 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -2044,3 +2044,22 @@ def test_list_shows_disabled(self, project_dir, monkeypatch): result = runner.invoke(app, ["workflow", "list"]) assert result.exit_code == 0 assert "disabled" in result.output + + def test_add_dev_and_from_mutually_exclusive(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + + runner = CliRunner() + monkeypatch.chdir(project_dir) + result = runner.invoke(app, ["workflow", "add", "--dev", "--from", "https://example.com/wf.yml", "./local"]) + assert result.exit_code != 0 + assert "cannot be combined" in result.output + + def test_add_dev_requires_source(self, project_dir, monkeypatch): + from typer.testing import CliRunner + from specify_cli import app + + runner = CliRunner() + monkeypatch.chdir(project_dir) + result = runner.invoke(app, ["workflow", "add", "--dev", "--from", "https://example.com/wf.yml"]) + assert result.exit_code != 0 From 605f71058a92e1a2df9c537d234f1b6bd916021e Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 10:35:36 -0500 Subject: [PATCH 04/26] fix: address third round of PR review feedback - Validate registry entries are dicts before accessing fields (skip malformed entries gracefully) - Add path traversal check in workflow update (resolve + relative_to) - Extract shared URL helpers (_is_loopback_host, _validate_url_scheme, _download_validated) to DRY duplicated HTTPS/redirect validation across workflow add (--from, direct URL, catalog) and workflow update - Add success-path CLI test for workflow update with mocked catalog and download (verifies directory swap + registry metadata update) --- src/specify_cli/__init__.py | 190 ++++++++++++------------------------ tests/test_workflows.py | 61 ++++++++++++ 2 files changed, 126 insertions(+), 125 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 9093ce70da..26a5a007f0 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4853,6 +4853,41 @@ def workflow_list(): console.print() +def _is_loopback_host(hostname: str) -> bool: + """Check if a hostname is a loopback address.""" + from ipaddress import ip_address + + if hostname == "localhost": + return True + try: + return ip_address(hostname).is_loopback + except ValueError: + return False + + +def _validate_url_scheme(url: str) -> None: + """Raise ValueError if URL is not HTTPS (or HTTP for localhost).""" + from urllib.parse import urlparse + + parsed = urlparse(url) + host = parsed.hostname or "" + if parsed.scheme != "https" and not (parsed.scheme == "http" and _is_loopback_host(host)): + raise ValueError(f"Non-HTTPS URL not allowed: {url}") + + +def _download_validated(source_url: str, destination: Path) -> None: + """Download a URL to *destination*, validating HTTPS on redirects.""" + from urllib.request import urlopen # noqa: S310 + + with urlopen(source_url, timeout=30) as resp: # noqa: S310 + final_url = resp.geturl() + try: + _validate_url_scheme(final_url) + except ValueError: + raise ValueError(f"Redirected to non-HTTPS: {final_url}") + destination.write_bytes(resp.read()) + + @workflow_app.command("add") def workflow_add( source: Optional[str] = typer.Argument(None, help="Workflow ID, URL, or local path"), @@ -4936,19 +4971,9 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: # --from: install from custom URL if from_url: - from ipaddress import ip_address - from urllib.parse import urlparse - from urllib.request import urlopen # noqa: S310 - - parsed_from = urlparse(from_url) - from_host = parsed_from.hostname or "" - is_lb = from_host == "localhost" - if not is_lb: - try: - is_lb = ip_address(from_host).is_loopback - except ValueError: - pass - if parsed_from.scheme != "https" and not (parsed_from.scheme == "http" and is_lb): + try: + _validate_url_scheme(from_url) + except ValueError: console.print("[red]Error:[/red] URL must use HTTPS for security.") console.print("HTTP is only allowed for localhost URLs.") raise typer.Exit(1) @@ -4958,22 +4983,9 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: import tempfile try: - with urlopen(from_url, timeout=30) as resp: # noqa: S310 - final_url = resp.geturl() - final_parsed = urlparse(final_url) - final_host = final_parsed.hostname or "" - final_lb = final_host == "localhost" - if not final_lb: - try: - final_lb = ip_address(final_host).is_loopback - except ValueError: - pass - if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb): - console.print(f"[red]Error:[/red] URL redirected to non-HTTPS: {final_url}") - raise typer.Exit(1) - with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: - tmp.write(resp.read()) - tmp_path = Path(tmp.name) + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + tmp_path = Path(tmp.name) + _download_validated(from_url, tmp_path) except typer.Exit: raise except Exception as exc: @@ -4987,42 +4999,17 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: # Try as URL (http/https) if source.startswith("http://") or source.startswith("https://"): - from ipaddress import ip_address - from urllib.parse import urlparse - from urllib.request import urlopen # noqa: S310 - - parsed_src = urlparse(source) - src_host = parsed_src.hostname or "" - src_loopback = src_host == "localhost" - if not src_loopback: - try: - src_loopback = ip_address(src_host).is_loopback - except ValueError: - # Host is not an IP literal (e.g., a DNS name); keep default non-loopback. - pass - if parsed_src.scheme != "https" and not (parsed_src.scheme == "http" and src_loopback): + try: + _validate_url_scheme(source) + except ValueError: console.print("[red]Error:[/red] Only HTTPS URLs are allowed, except HTTP for localhost.") raise typer.Exit(1) import tempfile try: - with urlopen(source, timeout=30) as resp: # noqa: S310 - final_url = resp.geturl() - final_parsed = urlparse(final_url) - final_host = final_parsed.hostname or "" - final_lb = final_host == "localhost" - if not final_lb: - try: - final_lb = ip_address(final_host).is_loopback - except ValueError: - # Redirect host is not an IP literal; keep loopback as determined above. - pass - if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb): - console.print(f"[red]Error:[/red] URL redirected to non-HTTPS: {final_url}") - raise typer.Exit(1) - with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: - tmp.write(resp.read()) - tmp_path = Path(tmp.name) + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + tmp_path = Path(tmp.name) + _download_validated(source, tmp_path) except typer.Exit: raise except Exception as exc: @@ -5071,21 +5058,9 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: raise typer.Exit(1) # Validate URL scheme (HTTPS required, HTTP allowed for localhost only) - from ipaddress import ip_address - from urllib.parse import urlparse - - parsed_url = urlparse(workflow_url) - url_host = parsed_url.hostname or "" - is_loopback = False - if url_host == "localhost": - is_loopback = True - else: - try: - is_loopback = ip_address(url_host).is_loopback - except ValueError: - # Host is not an IP literal (e.g., a regular hostname); treat as non-loopback. - pass - if parsed_url.scheme != "https" and not (parsed_url.scheme == "http" and is_loopback): + try: + _validate_url_scheme(workflow_url) + except ValueError: console.print( f"[red]Error:[/red] Workflow '{source}' has an invalid install URL. " "Only HTTPS URLs are allowed, except HTTP for localhost/loopback." @@ -5102,30 +5077,8 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: workflow_file = workflow_dir / "workflow.yml" try: - from urllib.request import urlopen # noqa: S310 — URL comes from catalog - workflow_dir.mkdir(parents=True, exist_ok=True) - with urlopen(workflow_url, timeout=30) as response: # noqa: S310 - # Validate final URL after redirects - final_url = response.geturl() - final_parsed = urlparse(final_url) - final_host = final_parsed.hostname or "" - final_loopback = final_host == "localhost" - if not final_loopback: - try: - final_loopback = ip_address(final_host).is_loopback - except ValueError: - # Host is not an IP literal (e.g., a regular hostname); treat as non-loopback. - pass - if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_loopback): - if workflow_dir.exists(): - import shutil - shutil.rmtree(workflow_dir, ignore_errors=True) - console.print( - f"[red]Error:[/red] Workflow '{source}' redirected to non-HTTPS URL: {final_url}" - ) - raise typer.Exit(1) - workflow_file.write_bytes(response.read()) + _download_validated(workflow_url, workflow_file) except Exception as exc: if workflow_dir.exists(): import shutil @@ -5352,6 +5305,9 @@ def workflow_update( updates_available = [] for wf_id in workflows_to_update: metadata = installed.get(wf_id, {}) + if not isinstance(metadata, dict): + console.print(f"⚠ {wf_id}: Malformed workflow registry entry (skipping)") + continue installed_ver_str = metadata.get("version", "0.0.0") try: installed_version = pkg_version.Version(installed_ver_str) @@ -5416,43 +5372,27 @@ def workflow_update( console.print(f"📦 Updating {update['name']}...") - from ipaddress import ip_address - from urllib.parse import urlparse - from urllib.request import urlopen # noqa: S310 - - parsed_url = urlparse(wf_url) - url_host = parsed_url.hostname or "" - is_loopback = url_host == "localhost" - if not is_loopback: - try: - is_loopback = ip_address(url_host).is_loopback - except ValueError: - pass - if parsed_url.scheme != "https" and not (parsed_url.scheme == "http" and is_loopback): + try: + _validate_url_scheme(wf_url) + except ValueError: console.print(f"⚠ {wf_id}: Invalid URL scheme (skipping)") continue wf_dir = workflows_dir / wf_id + # Validate that wf_id is a safe directory name (no path traversal) + try: + wf_dir.resolve().relative_to(workflows_dir.resolve()) + except ValueError: + console.print(f"⚠ {wf_id}: Invalid workflow ID (skipping)") + continue try: # Download and validate in a temporary directory so failures do not # leave a partially-created workflow directory behind. with tempfile.TemporaryDirectory(prefix=f"{wf_id}-", dir=workflows_dir) as tmp_dir: staged_dir = Path(tmp_dir) / wf_id staged_dir.mkdir(parents=True, exist_ok=True) - with urlopen(wf_url, timeout=30) as resp: # noqa: S310 - final_url = resp.geturl() - final_parsed = urlparse(final_url) - final_host = final_parsed.hostname or "" - final_lb = final_host == "localhost" - if not final_lb: - try: - final_lb = ip_address(final_host).is_loopback - except ValueError: - pass - if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb): - raise ValueError(f"Redirected to non-HTTPS: {final_url}") - wf_file = staged_dir / "workflow.yml" - wf_file.write_bytes(resp.read()) + wf_file = staged_dir / "workflow.yml" + _download_validated(wf_url, wf_file) # Validate staged contents before replacing the live workflow definition = WorkflowDefinition.from_yaml(wf_file) diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 8ee63690b0..5cdd42c657 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -2063,3 +2063,64 @@ def test_add_dev_requires_source(self, project_dir, monkeypatch): monkeypatch.chdir(project_dir) result = runner.invoke(app, ["workflow", "add", "--dev", "--from", "https://example.com/wf.yml"]) assert result.exit_code != 0 + + def test_update_success_path(self, project_dir, monkeypatch): + """Test workflow update happy path: download, validate, swap, registry update.""" + from typer.testing import CliRunner + from unittest.mock import patch, MagicMock + from specify_cli import app + from specify_cli.workflows.catalog import WorkflowRegistry + + runner = CliRunner() + monkeypatch.chdir(project_dir) + + # Pre-install a workflow + registry = WorkflowRegistry(project_dir) + registry.add("test-wf", {"name": "Test WF", "version": "1.0.0", "source": "catalog"}) + + # Create the existing workflow directory + wf_dir = project_dir / ".specify" / "workflows" / "test-wf" + wf_dir.mkdir(parents=True, exist_ok=True) + (wf_dir / "workflow.yml").write_text( + 'schema_version: "1.0"\nworkflow:\n id: test-wf\n name: Test WF\n version: "1.0.0"\nsteps:\n - id: step1\n type: shell\n run: "echo hello"\n' + ) + + # Prepare updated workflow YAML + updated_yaml = ( + 'schema_version: "1.0"\nworkflow:\n id: test-wf\n name: Test WF Updated\n version: "2.0.0"\n' + 'steps:\n - id: step1\n type: shell\n run: "echo hello"\n' + ) + + # Mock catalog to report v2.0.0 available + mock_cat_info = { + "name": "Test WF", + "version": "2.0.0", + "url": "https://example.com/test-wf/workflow.yml", + "_install_allowed": True, + "_catalog_name": "default", + } + + with patch("specify_cli.workflows.catalog.WorkflowCatalog") as MockCatalog, \ + patch("specify_cli._download_validated") as mock_download: + + mock_instance = MagicMock() + mock_instance.get_workflow_info.return_value = mock_cat_info + MockCatalog.return_value = mock_instance + + def fake_download(url, dest): + dest.write_text(updated_yaml) + mock_download.side_effect = fake_download + + result = runner.invoke(app, ["workflow", "update", "test-wf"], input="y\n") + + assert result.exit_code == 0, result.output + assert "2.0.0" in result.output + + # Verify registry was updated + registry2 = WorkflowRegistry(project_dir) + entry = registry2.get("test-wf") + assert entry["version"] == "2.0.0" + + # Verify workflow file was swapped + new_content = (wf_dir / "workflow.yml").read_text() + assert "2.0.0" in new_content From 71c5171d9c1aa2b9c3ef9692db124dda79efd793 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 11:07:20 -0500 Subject: [PATCH 05/26] fix: address fourth round of PR review feedback - Guard workflow_list against malformed (non-dict) registry entries - Validate URL has hostname in _validate_url_scheme (reject hostless URLs) - Fix --dev error message: 'Path not found' instead of 'Directory not found' - Ensure workflows_dir exists before creating temp dir in update - Keep backup until after registry.update() succeeds; restore on failure - Guard Registry.update() against non-dict entries (reset to fresh dict) --- src/specify_cli/__init__.py | 40 ++++++++++++++++++++-------- src/specify_cli/workflows/catalog.py | 5 ++++ 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 26a5a007f0..8e23e9f92b 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4844,6 +4844,12 @@ def workflow_list(): console.print("\n[bold cyan]Installed Workflows:[/bold cyan]\n") for wf_id, wf_data in installed.items(): + if not isinstance(wf_data, dict): + console.print( + f" [yellow]Warning:[/yellow] Skipping malformed workflow metadata for '{wf_id}' " + f"(expected object, got {type(wf_data).__name__})" + ) + continue enabled = wf_data.get("enabled", True) status = "" if enabled else " [yellow](disabled)[/yellow]" console.print(f" [bold]{wf_data.get('name', wf_id)}[/bold] ({wf_id}) v{wf_data.get('version', '?')}{status}") @@ -4870,7 +4876,9 @@ def _validate_url_scheme(url: str) -> None: from urllib.parse import urlparse parsed = urlparse(url) - host = parsed.hostname or "" + if not parsed.hostname: + raise ValueError(f"URL has no hostname: {url}") + host = parsed.hostname if parsed.scheme != "https" and not (parsed.scheme == "http" and _is_loopback_host(host)): raise ValueError(f"Non-HTTPS URL not allowed: {url}") @@ -4953,7 +4961,7 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: if dev: source_path = Path(source).expanduser().resolve() if not source_path.exists(): - console.print(f"[red]Error:[/red] Directory not found: {source_path}") + console.print(f"[red]Error:[/red] Path not found: {source_path}") raise typer.Exit(1) if source_path.is_file() and source_path.suffix in (".yml", ".yaml"): @@ -5386,6 +5394,8 @@ def workflow_update( console.print(f"⚠ {wf_id}: Invalid workflow ID (skipping)") continue try: + # Ensure workflows directory exists before creating temp dir + workflows_dir.mkdir(parents=True, exist_ok=True) # Download and validate in a temporary directory so failures do not # leave a partially-created workflow directory behind. with tempfile.TemporaryDirectory(prefix=f"{wf_id}-", dir=workflows_dir) as tmp_dir: @@ -5420,16 +5430,24 @@ def workflow_update( shutil.rmtree(wf_dir) backup_dir.rename(wf_dir) raise - # Clean up backup after successful swap - if backup_dir and backup_dir.exists(): - shutil.rmtree(backup_dir) - # Update registry - registry.update(wf_id, { - "version": update["available"], - "name": definition.name or update["name"], - "description": definition.description or "", - }) + # Update registry (backup still exists until this succeeds) + try: + registry.update(wf_id, { + "version": update["available"], + "name": definition.name or update["name"], + "description": definition.description or "", + }) + except Exception: + # Restore from backup if registry update fails + if backup_dir and backup_dir.exists(): + if wf_dir.exists(): + shutil.rmtree(wf_dir) + backup_dir.rename(wf_dir) + raise + # Clean up backup after fully successful update + if backup_dir and backup_dir.exists(): + shutil.rmtree(backup_dir) console.print(f" [green]✓[/green] {wf_id}: {update['installed']} → {update['available']}") except Exception as exc: diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index de2b35785b..e25ec0ade5 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -115,6 +115,11 @@ def update(self, workflow_id: str, fields: dict[str, Any]) -> None: return existing = self.data["workflows"][workflow_id] + if not isinstance(existing, dict): + self.data["workflows"][workflow_id] = dict(fields) + self.data["workflows"][workflow_id]["updated_at"] = datetime.now(timezone.utc).isoformat() + self.save() + return installed_at = existing.get("installed_at") existing.update(fields) From d2588e750bae2829ca4b155e5f08c00cfb5401cb Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 11:22:28 -0500 Subject: [PATCH 06/26] fix: harden WorkflowRegistry against corrupted JSON - Normalize 'workflows' field to dict in _load() (handles corrupted registry where 'workflows' is a list/string/null) - Validate top-level loaded data is a dict before using it - Guard enable/disable commands against non-dict metadata from get() --- src/specify_cli/__init__.py | 6 ++++++ src/specify_cli/workflows/catalog.py | 14 ++++++++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 8e23e9f92b..07fa1c5b8d 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5478,6 +5478,9 @@ def workflow_enable( raise typer.Exit(1) metadata = registry.get(workflow_id) + if not isinstance(metadata, dict): + console.print(f"[red]Error:[/red] Workflow '{workflow_id}' has corrupted registry metadata") + raise typer.Exit(1) if metadata.get("enabled", True): console.print(f"[yellow]Workflow '{workflow_id}' is already enabled[/yellow]") raise typer.Exit(0) @@ -5506,6 +5509,9 @@ def workflow_disable( raise typer.Exit(1) metadata = registry.get(workflow_id) + if not isinstance(metadata, dict): + console.print(f"[red]Error:[/red] Workflow '{workflow_id}' has corrupted registry metadata") + raise typer.Exit(1) if not metadata.get("enabled", True): console.print(f"[yellow]Workflow '{workflow_id}' is already disabled[/yellow]") raise typer.Exit(0) diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index e25ec0ade5..6823bea156 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -72,14 +72,20 @@ def __init__(self, project_root: Path) -> None: def _load(self) -> dict[str, Any]: """Load registry from disk or create default.""" + default = {"schema_version": self.SCHEMA_VERSION, "workflows": {}} if self.registry_path.exists(): try: with open(self.registry_path, encoding="utf-8") as f: - return json.load(f) + data = json.load(f) except (json.JSONDecodeError, ValueError): - # Corrupted registry file — reset to default - return {"schema_version": self.SCHEMA_VERSION, "workflows": {}} - return {"schema_version": self.SCHEMA_VERSION, "workflows": {}} + return default + if not isinstance(data, dict): + return default + # Normalize 'workflows' to a dict in case the file is corrupted + if not isinstance(data.get("workflows"), dict): + data["workflows"] = {} + return data + return default def save(self) -> None: """Persist registry to disk.""" From 4a7cf529c077bf233c5b350bea4c230d7dc4f045 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 11:31:59 -0500 Subject: [PATCH 07/26] fix: validate source URL scheme in _download_validated before opening Defense-in-depth: _download_validated() now calls _validate_url_scheme() on the source URL before urlopen, not just on the final redirected URL. --- src/specify_cli/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 07fa1c5b8d..9ba89ac7be 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4884,9 +4884,10 @@ def _validate_url_scheme(url: str) -> None: def _download_validated(source_url: str, destination: Path) -> None: - """Download a URL to *destination*, validating HTTPS on redirects.""" + """Download a URL to *destination*, validating HTTPS before and after redirects.""" from urllib.request import urlopen # noqa: S310 + _validate_url_scheme(source_url) with urlopen(source_url, timeout=30) as resp: # noqa: S310 final_url = resp.geturl() try: From 390691831301186519ddda03d4f3cba96f00127f Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 11:37:55 -0500 Subject: [PATCH 08/26] fix: reorder validation, safe tempdir prefix, fix test coverage - Reorder --dev/--from/source validation so --dev-without-source produces its own error instead of falling through to generic message - Use constant 'wf-update-' tempdir prefix instead of wf_id (avoids issues with path separators in corrupted IDs) - Fix test_add_dev_requires_source to invoke 'add --dev' without source (previously tested mutual exclusion instead) --- src/specify_cli/__init__.py | 8 ++++---- tests/test_workflows.py | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 9ba89ac7be..248913fa71 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4908,15 +4908,15 @@ def workflow_add( from .workflows.engine import WorkflowDefinition # Validate source selection and disallow incompatible options - if not source and not from_url: - console.print("[red]Error:[/red] Provide a workflow ID/URL/path, or use --from ") - raise typer.Exit(1) if dev and from_url: console.print("[red]Error:[/red] --dev cannot be combined with --from") raise typer.Exit(1) if dev and not source: console.print("[red]Error:[/red] --dev requires a local workflow path") raise typer.Exit(1) + if not source and not from_url: + console.print("[red]Error:[/red] Provide a workflow ID/URL/path, or use --from ") + raise typer.Exit(1) project_root = Path.cwd() specify_dir = project_root / ".specify" @@ -5399,7 +5399,7 @@ def workflow_update( workflows_dir.mkdir(parents=True, exist_ok=True) # Download and validate in a temporary directory so failures do not # leave a partially-created workflow directory behind. - with tempfile.TemporaryDirectory(prefix=f"{wf_id}-", dir=workflows_dir) as tmp_dir: + with tempfile.TemporaryDirectory(prefix="wf-update-", dir=workflows_dir) as tmp_dir: staged_dir = Path(tmp_dir) / wf_id staged_dir.mkdir(parents=True, exist_ok=True) wf_file = staged_dir / "workflow.yml" diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 5cdd42c657..105a46c13d 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -2061,8 +2061,9 @@ def test_add_dev_requires_source(self, project_dir, monkeypatch): runner = CliRunner() monkeypatch.chdir(project_dir) - result = runner.invoke(app, ["workflow", "add", "--dev", "--from", "https://example.com/wf.yml"]) + result = runner.invoke(app, ["workflow", "add", "--dev"]) assert result.exit_code != 0 + assert "--dev requires" in result.output or "path" in result.output.lower() def test_update_success_path(self, project_dir, monkeypatch): """Test workflow update happy path: download, validate, swap, registry update.""" From cca84b7a40533ee1da75d0b1f446ae08f312f751 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 11:51:21 -0500 Subject: [PATCH 09/26] fix: KeyboardInterrupt safety, corrupted entry repair, disabled run guard - Catch BaseException (not just Exception) in update swap/registry to handle KeyboardInterrupt and restore backup on Ctrl+C - Preserve installed_at (or set fresh one) when repairing corrupted non-dict registry entries in WorkflowRegistry.update() - Block 'workflow run' for disabled workflows with clear error message and re-enable hint --- src/specify_cli/__init__.py | 17 +++++++++++++---- src/specify_cli/workflows/catalog.py | 7 +++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 248913fa71..c0271a6ad5 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4678,6 +4678,15 @@ def workflow_run( console.print(f"[red]Error:[/red] Invalid workflow: {exc}") raise typer.Exit(1) + # Check if workflow is disabled in registry + from .workflows.catalog import WorkflowRegistry + wf_registry = WorkflowRegistry(project_root) + wf_meta = wf_registry.get(source) + if isinstance(wf_meta, dict) and not wf_meta.get("enabled", True): + console.print(f"[red]Error:[/red] Workflow '{source}' is disabled") + console.print(f"\nTo re-enable: specify workflow enable {source}") + raise typer.Exit(1) + # Validate errors = engine.validate(definition) if errors: @@ -5424,8 +5433,8 @@ def workflow_update( wf_dir.rename(backup_dir) try: shutil.move(str(staged_dir), str(wf_dir)) - except Exception: - # Restore from backup if swap fails + except BaseException: + # Restore from backup if swap fails (including KeyboardInterrupt) if backup_dir and backup_dir.exists(): if wf_dir.exists(): shutil.rmtree(wf_dir) @@ -5439,8 +5448,8 @@ def workflow_update( "name": definition.name or update["name"], "description": definition.description or "", }) - except Exception: - # Restore from backup if registry update fails + except BaseException: + # Restore from backup if registry update fails (including KeyboardInterrupt) if backup_dir and backup_dir.exists(): if wf_dir.exists(): shutil.rmtree(wf_dir) diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index 6823bea156..2e442a0c4c 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -122,8 +122,11 @@ def update(self, workflow_id: str, fields: dict[str, Any]) -> None: existing = self.data["workflows"][workflow_id] if not isinstance(existing, dict): - self.data["workflows"][workflow_id] = dict(fields) - self.data["workflows"][workflow_id]["updated_at"] = datetime.now(timezone.utc).isoformat() + now = datetime.now(timezone.utc).isoformat() + repaired = dict(fields) + repaired.setdefault("installed_at", now) + repaired["updated_at"] = now + self.data["workflows"][workflow_id] = repaired self.save() return installed_at = existing.get("installed_at") From 7414b5f03947fab229359f296e081af56b3e1215 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 11:59:56 -0500 Subject: [PATCH 10/26] fix: check disabled before load, streaming download, tolerate cleanup - Move disabled-workflow check before engine.load_workflow() so disabled workflows get the 'disabled' error, not a load/parse error - Use shutil.copyfileobj for streaming download instead of read() into memory (consistent with shared helper pattern) - Wrap backup cleanup in try/except OSError so cleanup failures after a successful update don't flip the command to exit 1 --- src/specify_cli/__init__.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index c0271a6ad5..6ff579bb41 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4669,6 +4669,15 @@ def workflow_run( engine = WorkflowEngine(project_root) engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026") + # Check if workflow is disabled in registry before loading + from .workflows.catalog import WorkflowRegistry + wf_registry = WorkflowRegistry(project_root) + wf_meta = wf_registry.get(source) + if isinstance(wf_meta, dict) and not wf_meta.get("enabled", True): + console.print(f"[red]Error:[/red] Workflow '{source}' is disabled") + console.print(f"\nTo re-enable: specify workflow enable {source}") + raise typer.Exit(1) + try: definition = engine.load_workflow(source) except FileNotFoundError: @@ -4678,15 +4687,6 @@ def workflow_run( console.print(f"[red]Error:[/red] Invalid workflow: {exc}") raise typer.Exit(1) - # Check if workflow is disabled in registry - from .workflows.catalog import WorkflowRegistry - wf_registry = WorkflowRegistry(project_root) - wf_meta = wf_registry.get(source) - if isinstance(wf_meta, dict) and not wf_meta.get("enabled", True): - console.print(f"[red]Error:[/red] Workflow '{source}' is disabled") - console.print(f"\nTo re-enable: specify workflow enable {source}") - raise typer.Exit(1) - # Validate errors = engine.validate(definition) if errors: @@ -4894,6 +4894,7 @@ def _validate_url_scheme(url: str) -> None: def _download_validated(source_url: str, destination: Path) -> None: """Download a URL to *destination*, validating HTTPS before and after redirects.""" + import shutil as _shutil from urllib.request import urlopen # noqa: S310 _validate_url_scheme(source_url) @@ -4903,7 +4904,8 @@ def _download_validated(source_url: str, destination: Path) -> None: _validate_url_scheme(final_url) except ValueError: raise ValueError(f"Redirected to non-HTTPS: {final_url}") - destination.write_bytes(resp.read()) + with destination.open("wb") as dest_file: + _shutil.copyfileobj(resp, dest_file) @workflow_app.command("add") @@ -5457,7 +5459,10 @@ def workflow_update( raise # Clean up backup after fully successful update if backup_dir and backup_dir.exists(): - shutil.rmtree(backup_dir) + try: + shutil.rmtree(backup_dir) + except OSError as cleanup_exc: + console.print(f" [yellow]⚠[/yellow] {wf_id}: Updated successfully, but failed to remove backup: {cleanup_exc}") console.print(f" [green]✓[/green] {wf_id}: {update['installed']} → {update['available']}") except Exception as exc: From bc6ee95e5f99637782846573d5eb2b5cec063532 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 12:44:44 -0500 Subject: [PATCH 11/26] fix: reject non-HTTPS redirects before following, validate workflow ID format - Replace urlopen with custom HTTPRedirectHandler that validates each redirect URL scheme before following it (prevents non-HTTPS requests) - Validate workflow ID against _ID_PATTERN (lowercase alnum + hyphens) before any filesystem operations in both add and update commands, preventing accidental deletion of reserved directories --- src/specify_cli/__init__.py | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 6ff579bb41..f3dba79743 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4893,17 +4893,22 @@ def _validate_url_scheme(url: str) -> None: def _download_validated(source_url: str, destination: Path) -> None: - """Download a URL to *destination*, validating HTTPS before and after redirects.""" + """Download a URL to *destination*, rejecting non-HTTPS redirects before following them.""" import shutil as _shutil - from urllib.request import urlopen # noqa: S310 + from urllib.request import build_opener, HTTPRedirectHandler, Request # noqa: S310 _validate_url_scheme(source_url) - with urlopen(source_url, timeout=30) as resp: # noqa: S310 - final_url = resp.geturl() - try: - _validate_url_scheme(final_url) - except ValueError: - raise ValueError(f"Redirected to non-HTTPS: {final_url}") + + class _SafeRedirectHandler(HTTPRedirectHandler): + def redirect_request(self, req, fp, code, msg, headers, newurl): + try: + _validate_url_scheme(newurl) + except ValueError: + raise ValueError(f"Redirected to non-HTTPS: {newurl}") + return super().redirect_request(req, fp, code, msg, headers, newurl) + + opener = build_opener(_SafeRedirectHandler) + with opener.open(source_url, timeout=30) as resp: with destination.open("wb") as dest_file: _shutil.copyfileobj(resp, dest_file) @@ -5088,7 +5093,11 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: raise typer.Exit(1) workflow_dir = workflows_dir / source - # Validate that source is a safe directory name (no path traversal) + # Validate workflow ID format and path safety + import re as _re + if not _re.match(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$|^[a-z0-9]$", source): + console.print(f"[red]Error:[/red] Invalid workflow ID: {source!r}") + raise typer.Exit(1) try: workflow_dir.resolve().relative_to(workflows_dir.resolve()) except ValueError: @@ -5399,7 +5408,11 @@ def workflow_update( continue wf_dir = workflows_dir / wf_id - # Validate that wf_id is a safe directory name (no path traversal) + # Validate workflow ID format and path safety + import re as _re + if not _re.match(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$|^[a-z0-9]$", wf_id): + console.print(f"⚠ {wf_id}: Invalid workflow ID format (skipping)") + continue try: wf_dir.resolve().relative_to(workflows_dir.resolve()) except ValueError: From ea2c732c435298a9e5a2be1b9215bf9a691dd12f Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 12:55:18 -0500 Subject: [PATCH 12/26] fix: remove unused import, reject source+--from, broaden version parsing - Remove unused Request import from _download_validated - Reject ambiguous 'source + --from' combination in workflow add - Coerce version values to str and catch TypeError alongside InvalidVersion for resilience against non-string registry/catalog version fields --- src/specify_cli/__init__.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index f3dba79743..5de805f633 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4895,7 +4895,7 @@ def _validate_url_scheme(url: str) -> None: def _download_validated(source_url: str, destination: Path) -> None: """Download a URL to *destination*, rejecting non-HTTPS redirects before following them.""" import shutil as _shutil - from urllib.request import build_opener, HTTPRedirectHandler, Request # noqa: S310 + from urllib.request import build_opener, HTTPRedirectHandler # noqa: S310 _validate_url_scheme(source_url) @@ -4927,6 +4927,9 @@ def workflow_add( if dev and from_url: console.print("[red]Error:[/red] --dev cannot be combined with --from") raise typer.Exit(1) + if source and from_url: + console.print("[red]Error:[/red] Cannot combine a positional source with --from") + raise typer.Exit(1) if dev and not source: console.print("[red]Error:[/red] --dev requires a local workflow path") raise typer.Exit(1) @@ -5337,10 +5340,10 @@ def workflow_update( if not isinstance(metadata, dict): console.print(f"⚠ {wf_id}: Malformed workflow registry entry (skipping)") continue - installed_ver_str = metadata.get("version", "0.0.0") + installed_ver_str = str(metadata.get("version", "0.0.0")) try: installed_version = pkg_version.Version(installed_ver_str) - except pkg_version.InvalidVersion: + except (pkg_version.InvalidVersion, TypeError): console.print(f"⚠ {wf_id}: Invalid installed version '{installed_ver_str}' (skipping)") continue @@ -5357,10 +5360,10 @@ def workflow_update( console.print(f"⚠ {wf_id}: Updates not allowed from '{cat_info.get('_catalog_name', 'catalog')}' (skipping)") continue - cat_ver_str = cat_info.get("version", "0.0.0") + cat_ver_str = str(cat_info.get("version", "0.0.0")) try: catalog_version = pkg_version.Version(cat_ver_str) - except pkg_version.InvalidVersion: + except (pkg_version.InvalidVersion, TypeError): console.print(f"⚠ {wf_id}: Invalid catalog version '{cat_ver_str}' (skipping)") continue From 6dcb3fb5d0ae9307804d3310be2296ecadbb2958 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 13:04:53 -0500 Subject: [PATCH 13/26] fix: atomic registry save, registry snapshot rollback, hidden backup dir - Make WorkflowRegistry.save() atomic: write to temp file then os.replace into final path (prevents corruption on partial writes) - Snapshot registry file before update; restore snapshot on failure alongside workflow directory rollback - Move update backups to hidden .backup/ directory instead of .bak to avoid cluttering workflows dir and accidental user data deletion - Clean up empty .backup/ directory after successful update --- src/specify_cli/__init__.py | 41 +++++++++++++++++++++++++--- src/specify_cli/workflows/catalog.py | 20 ++++++++++++-- 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 5de805f633..79e677dbc4 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5442,10 +5442,12 @@ def workflow_update( if errors: raise ValueError(f"Validation failed: {'; '.join(errors)}") - # Atomic swap: move old to backup, move staged in, remove backup on success + # Atomic swap: move old to hidden backup, move staged in backup_dir = None if wf_dir.exists(): - backup_dir = wf_dir.with_name(f"{wf_id}.bak") + backup_parent = workflows_dir / ".backup" + backup_parent.mkdir(parents=True, exist_ok=True) + backup_dir = backup_parent / wf_id if backup_dir.exists(): shutil.rmtree(backup_dir) wf_dir.rename(backup_dir) @@ -5459,26 +5461,57 @@ def workflow_update( backup_dir.rename(wf_dir) raise - # Update registry (backup still exists until this succeeds) + # Snapshot registry file, then update; restore both on failure + import os + registry_file = registry.registry_path + registry_snapshot = None try: + if registry_file.exists(): + fd, snap = tempfile.mkstemp( + prefix="registry-", suffix=".json", dir=workflows_dir + ) + os.close(fd) + registry_snapshot = Path(snap) + shutil.copy2(registry_file, registry_snapshot) + registry.update(wf_id, { "version": update["available"], "name": definition.name or update["name"], "description": definition.description or "", }) except BaseException: - # Restore from backup if registry update fails (including KeyboardInterrupt) + # Restore registry snapshot + if registry_snapshot and registry_snapshot.exists(): + try: + os.replace(registry_snapshot, registry_file) + except OSError: + pass + registry_snapshot = None + # Restore workflow from backup if backup_dir and backup_dir.exists(): if wf_dir.exists(): shutil.rmtree(wf_dir) backup_dir.rename(wf_dir) raise + finally: + if registry_snapshot and registry_snapshot.exists(): + try: + registry_snapshot.unlink() + except OSError: + pass # Clean up backup after fully successful update if backup_dir and backup_dir.exists(): try: shutil.rmtree(backup_dir) except OSError as cleanup_exc: console.print(f" [yellow]⚠[/yellow] {wf_id}: Updated successfully, but failed to remove backup: {cleanup_exc}") + # Clean up empty .backup directory + backup_parent = workflows_dir / ".backup" + if backup_parent.exists(): + try: + backup_parent.rmdir() # only succeeds if empty + except OSError: + pass console.print(f" [green]✓[/green] {wf_id}: {update['installed']} → {update['available']}") except Exception as exc: diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index 2e442a0c4c..87feea0026 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -88,10 +88,24 @@ def _load(self) -> dict[str, Any]: return default def save(self) -> None: - """Persist registry to disk.""" + """Persist registry to disk atomically (write to temp, then rename).""" + import os + import tempfile as _tempfile + self.workflows_dir.mkdir(parents=True, exist_ok=True) - with open(self.registry_path, "w", encoding="utf-8") as f: - json.dump(self.data, f, indent=2) + fd, tmp_path = _tempfile.mkstemp( + suffix=".json", dir=self.workflows_dir + ) + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + json.dump(self.data, f, indent=2) + os.replace(tmp_path, self.registry_path) + except BaseException: + try: + os.unlink(tmp_path) + except OSError: + pass + raise def add(self, workflow_id: str, metadata: dict[str, Any]) -> None: """Add or update an installed workflow entry.""" From f18e5a3bcb5430603cf10d2ed8e5d4a875186eac Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 13:13:22 -0500 Subject: [PATCH 14/26] fix: tighten test assertion for --dev missing path Remove overly permissive 'or Error in output' fallback so the test validates the specific 'not found' message. --- tests/test_workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 105a46c13d..763baaf477 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -2028,7 +2028,7 @@ def test_add_dev_missing_path(self, project_dir, monkeypatch): monkeypatch.chdir(project_dir) result = runner.invoke(app, ["workflow", "add", "--dev", "/nonexistent/path"]) assert result.exit_code != 0 - assert "not found" in result.output.lower() or "Error" in result.output + assert "not found" in result.output.lower() def test_list_shows_disabled(self, project_dir, monkeypatch): from typer.testing import CliRunner From 8a4dbafd8e6e20c951f58331650fbae620176dc8 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 13:30:40 -0500 Subject: [PATCH 15/26] fix: check disabled by definition.id, reload registry after rollback, reuse _ID_PATTERN - Check disabled status by both source arg and loaded definition.id so file-path invocations of disabled workflows are also blocked - Reload registry in-memory state from disk after snapshot restore so subsequent updates in the same run don't write back stale data - Replace duplicated ID regex with shared _ID_PATTERN from engine --- src/specify_cli/__init__.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 79e677dbc4..02064ab754 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4687,6 +4687,14 @@ def workflow_run( console.print(f"[red]Error:[/red] Invalid workflow: {exc}") raise typer.Exit(1) + # Also check by loaded definition ID (covers file-path invocations) + if definition.id and definition.id != source: + wf_meta_by_id = wf_registry.get(definition.id) + if isinstance(wf_meta_by_id, dict) and not wf_meta_by_id.get("enabled", True): + console.print(f"[red]Error:[/red] Workflow '{definition.id}' is disabled") + console.print(f"\nTo re-enable: specify workflow enable {definition.id}") + raise typer.Exit(1) + # Validate errors = engine.validate(definition) if errors: @@ -5097,8 +5105,8 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: workflow_dir = workflows_dir / source # Validate workflow ID format and path safety - import re as _re - if not _re.match(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$|^[a-z0-9]$", source): + from .workflows.engine import _ID_PATTERN as _WORKFLOW_ID_PATTERN + if not _WORKFLOW_ID_PATTERN.fullmatch(source): console.print(f"[red]Error:[/red] Invalid workflow ID: {source!r}") raise typer.Exit(1) try: @@ -5412,8 +5420,8 @@ def workflow_update( wf_dir = workflows_dir / wf_id # Validate workflow ID format and path safety - import re as _re - if not _re.match(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$|^[a-z0-9]$", wf_id): + from .workflows.engine import _ID_PATTERN as _WORKFLOW_ID_PATTERN + if not _WORKFLOW_ID_PATTERN.fullmatch(wf_id): console.print(f"⚠ {wf_id}: Invalid workflow ID format (skipping)") continue try: @@ -5487,6 +5495,8 @@ def workflow_update( except OSError: pass registry_snapshot = None + # Reload in-memory state from restored file + registry.data = registry._load() # Restore workflow from backup if backup_dir and backup_dir.exists(): if wf_dir.exists(): From fbc60018fa6e0ce758dee4d02fa7ec6cad98c714 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 13:41:00 -0500 Subject: [PATCH 16/26] fix: temp file cleanup on download failure, rollback without backup - Consolidate download+install into single try/finally so temp files are always cleaned up even if download fails - On registry update failure when no backup existed (dir was missing), remove the newly placed workflow directory to restore pre-update state --- src/specify_cli/__init__.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 02064ab754..ea703105c3 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5018,19 +5018,20 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: console.print("Only install workflows from sources you trust.\n") import tempfile + tmp_path = None try: with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: tmp_path = Path(tmp.name) _download_validated(from_url, tmp_path) + _validate_and_install_local(tmp_path, from_url) except typer.Exit: raise except Exception as exc: console.print(f"[red]Error:[/red] Failed to download workflow: {exc}") raise typer.Exit(1) - try: - _validate_and_install_local(tmp_path, from_url) finally: - tmp_path.unlink(missing_ok=True) + if tmp_path: + tmp_path.unlink(missing_ok=True) return # Try as URL (http/https) @@ -5042,19 +5043,20 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: raise typer.Exit(1) import tempfile + tmp_path = None try: with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: tmp_path = Path(tmp.name) _download_validated(source, tmp_path) + _validate_and_install_local(tmp_path, source) except typer.Exit: raise except Exception as exc: console.print(f"[red]Error:[/red] Failed to download workflow: {exc}") raise typer.Exit(1) - try: - _validate_and_install_local(tmp_path, source) finally: - tmp_path.unlink(missing_ok=True) + if tmp_path: + tmp_path.unlink(missing_ok=True) return # Try as a local file/directory @@ -5497,11 +5499,14 @@ def workflow_update( registry_snapshot = None # Reload in-memory state from restored file registry.data = registry._load() - # Restore workflow from backup + # Restore workflow from backup, or remove new dir if no backup existed if backup_dir and backup_dir.exists(): if wf_dir.exists(): shutil.rmtree(wf_dir) backup_dir.rename(wf_dir) + elif wf_dir.exists() and not backup_dir: + # No prior directory existed; remove the newly placed one + shutil.rmtree(wf_dir, ignore_errors=True) raise finally: if registry_snapshot and registry_snapshot.exists(): From 77b6db00f339b5020eacf6e1b2e68d09ea321e95 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 13:52:12 -0500 Subject: [PATCH 17/26] fix: expose public is_valid_workflow_id() instead of importing private _ID_PATTERN Add is_valid_workflow_id() to workflows.engine as the public API for ID validation. CLI code now uses this instead of the private _ID_PATTERN constant. --- src/specify_cli/__init__.py | 8 ++++---- src/specify_cli/workflows/engine.py | 5 +++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index ea703105c3..5682a65659 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5107,8 +5107,8 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: workflow_dir = workflows_dir / source # Validate workflow ID format and path safety - from .workflows.engine import _ID_PATTERN as _WORKFLOW_ID_PATTERN - if not _WORKFLOW_ID_PATTERN.fullmatch(source): + from .workflows.engine import is_valid_workflow_id + if not is_valid_workflow_id(source): console.print(f"[red]Error:[/red] Invalid workflow ID: {source!r}") raise typer.Exit(1) try: @@ -5422,8 +5422,8 @@ def workflow_update( wf_dir = workflows_dir / wf_id # Validate workflow ID format and path safety - from .workflows.engine import _ID_PATTERN as _WORKFLOW_ID_PATTERN - if not _WORKFLOW_ID_PATTERN.fullmatch(wf_id): + from .workflows.engine import is_valid_workflow_id + if not is_valid_workflow_id(wf_id): console.print(f"⚠ {wf_id}: Invalid workflow ID format (skipping)") continue try: diff --git a/src/specify_cli/workflows/engine.py b/src/specify_cli/workflows/engine.py index d6a73bbeb0..dad9f167cc 100644 --- a/src/specify_cli/workflows/engine.py +++ b/src/specify_cli/workflows/engine.py @@ -82,6 +82,11 @@ def from_string(cls, content: str) -> WorkflowDefinition: # ID format: lowercase alphanumeric with hyphens _ID_PATTERN = re.compile(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$|^[a-z0-9]$") + +def is_valid_workflow_id(workflow_id: str) -> bool: + """Return True if *workflow_id* matches the allowed ID format.""" + return bool(_ID_PATTERN.fullmatch(workflow_id)) + # Valid step types (matching STEP_REGISTRY keys) def _get_valid_step_types() -> set[str]: """Return valid step types from the registry, with a built-in fallback.""" From 7c232c26f756b8188cb6d6418e4fa60b61c9a0b6 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 14:01:33 -0500 Subject: [PATCH 18/26] fix: prevent SSRF via redirect to loopback, preserve description on update - Redirect validation now only allows HTTP loopback when the original URL was already HTTP loopback (prevents HTTPS->http://localhost SSRF) - Description fallback chain on update: definition > update info > existing registry entry (prevents clearing catalog descriptions) --- src/specify_cli/__init__.py | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 5682a65659..59c8b7bdbe 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4888,29 +4888,44 @@ def _is_loopback_host(hostname: str) -> bool: return False -def _validate_url_scheme(url: str) -> None: - """Raise ValueError if URL is not HTTPS (or HTTP for localhost).""" +def _validate_url_scheme(url: str, *, allow_http_loopback: bool = True) -> None: + """Raise ValueError if URL is not HTTPS, optionally allowing HTTP loopback URLs.""" from urllib.parse import urlparse parsed = urlparse(url) if not parsed.hostname: raise ValueError(f"URL has no hostname: {url}") host = parsed.hostname - if parsed.scheme != "https" and not (parsed.scheme == "http" and _is_loopback_host(host)): - raise ValueError(f"Non-HTTPS URL not allowed: {url}") + if parsed.scheme == "https": + return + if allow_http_loopback and parsed.scheme == "http" and _is_loopback_host(host): + return + raise ValueError(f"Non-HTTPS URL not allowed: {url}") def _download_validated(source_url: str, destination: Path) -> None: - """Download a URL to *destination*, rejecting non-HTTPS redirects before following them.""" + """Download a URL to *destination*, rejecting insecure redirects before following them.""" import shutil as _shutil + from urllib.parse import urlparse from urllib.request import build_opener, HTTPRedirectHandler # noqa: S310 _validate_url_scheme(source_url) + # Only allow HTTP loopback redirects if the source was already HTTP loopback + source_parsed = urlparse(source_url) + source_allows_http_loopback = ( + source_parsed.scheme == "http" + and source_parsed.hostname is not None + and _is_loopback_host(source_parsed.hostname) + ) + class _SafeRedirectHandler(HTTPRedirectHandler): def redirect_request(self, req, fp, code, msg, headers, newurl): try: - _validate_url_scheme(newurl) + _validate_url_scheme( + newurl, + allow_http_loopback=source_allows_http_loopback, + ) except ValueError: raise ValueError(f"Redirected to non-HTTPS: {newurl}") return super().redirect_request(req, fp, code, msg, headers, newurl) @@ -5484,10 +5499,17 @@ def workflow_update( registry_snapshot = Path(snap) shutil.copy2(registry_file, registry_snapshot) + existing_entry = installed.get(wf_id, {}) + description = ( + definition.description + or update.get("description") + or (existing_entry.get("description", "") if isinstance(existing_entry, dict) else "") + ) + registry.update(wf_id, { "version": update["available"], "name": definition.name or update["name"], - "description": definition.description or "", + "description": description, }) except BaseException: # Restore registry snapshot From 15beca8b53637394f2c531060e86f76acbea4697 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 14:10:19 -0500 Subject: [PATCH 19/26] fix: surface catalog errors in update, clean up save() imports - Track catalog errors during update; exit 1 with warning instead of misleading 'all up to date' when catalog resolution fails - Move tempfile import to module scope in catalog.py; remove redundant local imports in save() --- src/specify_cli/__init__.py | 8 +++++++- src/specify_cli/workflows/catalog.py | 6 ++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 59c8b7bdbe..eb8a1c3149 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5360,6 +5360,7 @@ def workflow_update( console.print("🔄 Checking for updates...\n") updates_available = [] + catalog_errors = 0 for wf_id in workflows_to_update: metadata = installed.get(wf_id, {}) if not isinstance(metadata, dict): @@ -5374,7 +5375,9 @@ def workflow_update( try: cat_info = catalog.get_workflow_info(wf_id) - except WorkflowCatalogError: + except WorkflowCatalogError as exc: + console.print(f"⚠ {wf_id}: Catalog error: {exc}") + catalog_errors += 1 cat_info = None if not cat_info: @@ -5405,6 +5408,9 @@ def workflow_update( console.print(f"✓ {wf_id}: Up to date (v{installed_version})") if not updates_available: + if catalog_errors: + console.print(f"\n[yellow]Could not check {catalog_errors} workflow(s) due to catalog errors[/yellow]") + raise typer.Exit(1) console.print("\n[green]All workflows are up to date![/green]") raise typer.Exit(0) diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index 87feea0026..59ce92ab66 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -12,6 +12,7 @@ import hashlib import json import os +import tempfile import time from dataclasses import dataclass from pathlib import Path @@ -89,11 +90,8 @@ def _load(self) -> dict[str, Any]: def save(self) -> None: """Persist registry to disk atomically (write to temp, then rename).""" - import os - import tempfile as _tempfile - self.workflows_dir.mkdir(parents=True, exist_ok=True) - fd, tmp_path = _tempfile.mkstemp( + fd, tmp_path = tempfile.mkstemp( suffix=".json", dir=self.workflows_dir ) try: From 16509cb8e44293f4763690cf652df04d3032bed1 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 14:25:55 -0500 Subject: [PATCH 20/26] fix: preserve file permissions on atomic registry save Capture existing registry file mode before replace and apply it to the temp file via os.chmod, so mkstemp's 0o600 default doesn't change on-disk permissions. --- src/specify_cli/workflows/catalog.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index 59ce92ab66..aa973feb11 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -91,12 +91,26 @@ def _load(self) -> dict[str, Any]: def save(self) -> None: """Persist registry to disk atomically (write to temp, then rename).""" self.workflows_dir.mkdir(parents=True, exist_ok=True) + + # Capture existing file permissions to preserve after replace + existing_stat = None + if self.registry_path.exists(): + try: + existing_stat = self.registry_path.stat() + except OSError: + pass + fd, tmp_path = tempfile.mkstemp( suffix=".json", dir=self.workflows_dir ) try: with os.fdopen(fd, "w", encoding="utf-8") as f: json.dump(self.data, f, indent=2) + if existing_stat is not None: + try: + os.chmod(tmp_path, existing_stat.st_mode & 0o7777) + except OSError: + pass os.replace(tmp_path, self.registry_path) except BaseException: try: From 5da3a397fcf2068ca59faaf355fd6423564f9145 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 14:48:06 -0500 Subject: [PATCH 21/26] fix: deepcopy fields in Registry.update(), ensure installed_at exists - Deepcopy incoming fields to prevent callers from mutating registry state, matching ExtensionRegistry/PresetRegistry pattern - Set installed_at when missing from existing entries so the registry converges to the expected shape over time --- src/specify_cli/workflows/catalog.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index aa973feb11..72e7ae8708 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -157,9 +157,12 @@ def update(self, workflow_id: str, fields: dict[str, Any]) -> None: return installed_at = existing.get("installed_at") - existing.update(fields) + import copy + existing.update(copy.deepcopy(fields)) if installed_at is not None and "installed_at" not in fields: existing["installed_at"] = installed_at + if "installed_at" not in existing: + existing["installed_at"] = datetime.now(timezone.utc).isoformat() existing["updated_at"] = datetime.now(timezone.utc).isoformat() self.save() From 28309a417f39aab6ce12d916571c0432c64773ad Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 15:35:36 -0500 Subject: [PATCH 22/26] fix: validate absolute redirect URL, raise KeyError on missing workflow - Validate redirect URL after super().redirect_request() resolves relative Location headers to absolute URLs - Registry.update() now raises KeyError on missing workflow ID, matching ExtensionRegistry/PresetRegistry pattern --- src/specify_cli/__init__.py | 10 +++++++--- src/specify_cli/workflows/catalog.py | 8 ++++++-- tests/test_workflows.py | 5 +++-- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index eb8a1c3149..07b755cb64 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4921,14 +4921,18 @@ def _download_validated(source_url: str, destination: Path) -> None: class _SafeRedirectHandler(HTTPRedirectHandler): def redirect_request(self, req, fp, code, msg, headers, newurl): + redirect_req = super().redirect_request(req, fp, code, msg, headers, newurl) + if redirect_req is None: + return None + redirect_url = redirect_req.full_url try: _validate_url_scheme( - newurl, + redirect_url, allow_http_loopback=source_allows_http_loopback, ) except ValueError: - raise ValueError(f"Redirected to non-HTTPS: {newurl}") - return super().redirect_request(req, fp, code, msg, headers, newurl) + raise ValueError(f"Redirected to non-HTTPS: {redirect_url}") + return redirect_req opener = build_opener(_SafeRedirectHandler) with opener.open(source_url, timeout=30) as resp: diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index 72e7ae8708..5ab77f684c 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -140,11 +140,15 @@ def remove(self, workflow_id: str) -> bool: return False def update(self, workflow_id: str, fields: dict[str, Any]) -> None: - """Update specific fields on an installed workflow entry.""" + """Update specific fields on an installed workflow entry. + + Raises: + KeyError: If the workflow is not installed. + """ from datetime import datetime, timezone if workflow_id not in self.data["workflows"]: - return + raise KeyError(workflow_id) existing = self.data["workflows"][workflow_id] if not isinstance(existing, dict): diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 763baaf477..c4c63b4602 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1661,10 +1661,11 @@ def test_update(self, project_dir): def test_update_nonexistent(self, project_dir): from specify_cli.workflows.catalog import WorkflowRegistry + import pytest registry = WorkflowRegistry(project_dir) - # Should not raise, just no-op - registry.update("missing", {"version": "2.0.0"}) + with pytest.raises(KeyError): + registry.update("missing", {"version": "2.0.0"}) assert registry.get("missing") is None def test_enable_disable_via_update(self, project_dir): From 9d8de6e197c4f10d2c6008063df6722b995d580f Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 15:54:39 -0500 Subject: [PATCH 23/26] fix: descriptive KeyError, preserve installed_at unconditionally, carry catalog description - KeyError message now says "Workflow '' is not installed" - installed_at is always preserved from existing entry (callers cannot override it via fields), matching extension/preset behavior - Catalog description carried in updates_available so fallback chain works: definition > catalog > existing registry entry --- src/specify_cli/__init__.py | 1 + src/specify_cli/workflows/catalog.py | 9 ++++++--- tests/test_workflows.py | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 07b755cb64..689741e4ba 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5407,6 +5407,7 @@ def workflow_update( "available": str(catalog_version), "url": cat_info.get("url"), "catalog_name": cat_info.get("_catalog_name", ""), + "description": cat_info.get("description", ""), }) else: console.print(f"✓ {wf_id}: Up to date (v{installed_version})") diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index 5ab77f684c..d030c48a4f 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -148,12 +148,13 @@ def update(self, workflow_id: str, fields: dict[str, Any]) -> None: from datetime import datetime, timezone if workflow_id not in self.data["workflows"]: - raise KeyError(workflow_id) + raise KeyError(f"Workflow '{workflow_id}' is not installed") existing = self.data["workflows"][workflow_id] if not isinstance(existing, dict): now = datetime.now(timezone.utc).isoformat() repaired = dict(fields) + repaired.pop("installed_at", None) repaired.setdefault("installed_at", now) repaired["updated_at"] = now self.data["workflows"][workflow_id] = repaired @@ -162,8 +163,10 @@ def update(self, workflow_id: str, fields: dict[str, Any]) -> None: installed_at = existing.get("installed_at") import copy - existing.update(copy.deepcopy(fields)) - if installed_at is not None and "installed_at" not in fields: + safe_fields = copy.deepcopy(fields) + safe_fields.pop("installed_at", None) # always preserve original + existing.update(safe_fields) + if installed_at is not None: existing["installed_at"] = installed_at if "installed_at" not in existing: existing["installed_at"] = datetime.now(timezone.utc).isoformat() diff --git a/tests/test_workflows.py b/tests/test_workflows.py index c4c63b4602..22a9fb5fc3 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1664,7 +1664,7 @@ def test_update_nonexistent(self, project_dir): import pytest registry = WorkflowRegistry(project_dir) - with pytest.raises(KeyError): + with pytest.raises(KeyError, match="not installed"): registry.update("missing", {"version": "2.0.0"}) assert registry.get("missing") is None From 94415a0776e03557cb506175e320710ab6bb4bee Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 16:13:50 -0500 Subject: [PATCH 24/26] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20sin?= =?UTF-8?q?gle=20timestamp,=20descriptive=20URL=20error,=20skipped=20count?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Capture single datetime.now() in Registry.update() and reuse for both installed_at and updated_at to avoid tiny time skews - Include ValueError message in --from URL validation error so users see the actual failure reason (e.g., missing hostname) - Track skipped workflow count in 'workflow update' and show 'No updates found (N skipped)' instead of misleading 'All workflows are up to date!' when workflows were skipped --- src/specify_cli/__init__.py | 17 +++++++++++++---- src/specify_cli/workflows/catalog.py | 5 +++-- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 689741e4ba..2dd459026c 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5028,9 +5028,9 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: if from_url: try: _validate_url_scheme(from_url) - except ValueError: - console.print("[red]Error:[/red] URL must use HTTPS for security.") - console.print("HTTP is only allowed for localhost URLs.") + except ValueError as exc: + console.print(f"[red]Error:[/red] Invalid URL: {exc}") + console.print("URL must use HTTPS for security. HTTP is only allowed for localhost URLs.") raise typer.Exit(1) console.print("[yellow]Warning:[/yellow] Installing from external URL.") @@ -5365,16 +5365,19 @@ def workflow_update( updates_available = [] catalog_errors = 0 + skipped = 0 for wf_id in workflows_to_update: metadata = installed.get(wf_id, {}) if not isinstance(metadata, dict): console.print(f"⚠ {wf_id}: Malformed workflow registry entry (skipping)") + skipped += 1 continue installed_ver_str = str(metadata.get("version", "0.0.0")) try: installed_version = pkg_version.Version(installed_ver_str) except (pkg_version.InvalidVersion, TypeError): console.print(f"⚠ {wf_id}: Invalid installed version '{installed_ver_str}' (skipping)") + skipped += 1 continue try: @@ -5386,10 +5389,12 @@ def workflow_update( if not cat_info: console.print(f"⚠ {wf_id}: Not found in catalog (skipping)") + skipped += 1 continue if not cat_info.get("_install_allowed", True): console.print(f"⚠ {wf_id}: Updates not allowed from '{cat_info.get('_catalog_name', 'catalog')}' (skipping)") + skipped += 1 continue cat_ver_str = str(cat_info.get("version", "0.0.0")) @@ -5397,6 +5402,7 @@ def workflow_update( catalog_version = pkg_version.Version(cat_ver_str) except (pkg_version.InvalidVersion, TypeError): console.print(f"⚠ {wf_id}: Invalid catalog version '{cat_ver_str}' (skipping)") + skipped += 1 continue if catalog_version > installed_version: @@ -5416,7 +5422,10 @@ def workflow_update( if catalog_errors: console.print(f"\n[yellow]Could not check {catalog_errors} workflow(s) due to catalog errors[/yellow]") raise typer.Exit(1) - console.print("\n[green]All workflows are up to date![/green]") + if skipped: + console.print(f"\n[green]No updates found[/green] ({skipped} workflow(s) skipped)") + else: + console.print("\n[green]All workflows are up to date![/green]") raise typer.Exit(0) console.print("\n[bold]Updates available:[/bold]\n") diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index d030c48a4f..91d15e3139 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -165,12 +165,13 @@ def update(self, workflow_id: str, fields: dict[str, Any]) -> None: import copy safe_fields = copy.deepcopy(fields) safe_fields.pop("installed_at", None) # always preserve original + now = datetime.now(timezone.utc).isoformat() existing.update(safe_fields) if installed_at is not None: existing["installed_at"] = installed_at if "installed_at" not in existing: - existing["installed_at"] = datetime.now(timezone.utc).isoformat() - existing["updated_at"] = datetime.now(timezone.utc).isoformat() + existing["installed_at"] = now + existing["updated_at"] = now self.save() def get(self, workflow_id: str) -> dict[str, Any] | None: From d925baf174ec5e7ed917cd208159d7f5e9eb4046 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 16:24:43 -0500 Subject: [PATCH 25/26] fix: normalize installed_at timestamp, guard .backup against symlinks - Preserve installed_at only when it's a valid non-empty string; reset to now() if null/non-string/empty (prevents corrupted values) - Validate .backup is not a symlink before use; verify backup_dir resolves within workflows_dir to prevent path escape attacks --- src/specify_cli/__init__.py | 7 +++++++ src/specify_cli/workflows/catalog.py | 5 +++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 2dd459026c..d7eabc51ec 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5491,8 +5491,15 @@ def workflow_update( backup_dir = None if wf_dir.exists(): backup_parent = workflows_dir / ".backup" + # Guard against symlink attacks + if backup_parent.exists() and backup_parent.is_symlink(): + raise ValueError(".backup is a symlink; refusing to use it") backup_parent.mkdir(parents=True, exist_ok=True) backup_dir = backup_parent / wf_id + try: + backup_dir.resolve().relative_to(workflows_dir.resolve()) + except ValueError: + raise ValueError(f"Backup path escapes workflows directory: {backup_dir}") if backup_dir.exists(): shutil.rmtree(backup_dir) wf_dir.rename(backup_dir) diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index 91d15e3139..ae88e8e5c3 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -167,9 +167,10 @@ def update(self, workflow_id: str, fields: dict[str, Any]) -> None: safe_fields.pop("installed_at", None) # always preserve original now = datetime.now(timezone.utc).isoformat() existing.update(safe_fields) - if installed_at is not None: + # Preserve installed_at only if it's a valid string; otherwise reset + if isinstance(installed_at, str) and installed_at: existing["installed_at"] = installed_at - if "installed_at" not in existing: + else: existing["installed_at"] = now existing["updated_at"] = now self.save() From 6c0351d2d8a37ce8f8da8ee17b47de6adbbb4977 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Fri, 24 Apr 2026 16:40:15 -0500 Subject: [PATCH 26/26] fix: update error messages to say localhost/loopback Error messages for HTTP URL rejection now correctly say 'localhost/loopback' instead of just 'localhost', matching the actual _validate_url_scheme behavior that allows any loopback host. --- src/specify_cli/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index d7eabc51ec..e6cc0cd1e4 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5030,7 +5030,7 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: _validate_url_scheme(from_url) except ValueError as exc: console.print(f"[red]Error:[/red] Invalid URL: {exc}") - console.print("URL must use HTTPS for security. HTTP is only allowed for localhost URLs.") + console.print("URL must use HTTPS for security. HTTP is only allowed for localhost/loopback URLs.") raise typer.Exit(1) console.print("[yellow]Warning:[/yellow] Installing from external URL.") @@ -5058,7 +5058,7 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: try: _validate_url_scheme(source) except ValueError: - console.print("[red]Error:[/red] Only HTTPS URLs are allowed, except HTTP for localhost.") + console.print("[red]Error:[/red] Only HTTPS URLs are allowed, except HTTP for localhost/loopback.") raise typer.Exit(1) import tempfile