From f6abe06c3c91781c380f4256a17d0a6f39dffc01 Mon Sep 17 00:00:00 2001 From: Srikanth Patchava Date: Fri, 24 Apr 2026 19:41:35 -0700 Subject: [PATCH 1/2] chore: add .editorconfig for consistent code formatting --- .editorconfig | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000000..094729efd1 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,21 @@ +root = true + +[*] +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true +charset = utf-8 + +[*.py] +indent_style = space +indent_size = 4 + +[*.{yml,yaml}] +indent_style = space +indent_size = 2 + +[*.md] +trim_trailing_whitespace = false + +[Makefile] +indent_style = tab From eaba3666ca3aa5091b6d07b5d970cb7b1770b22d Mon Sep 17 00:00:00 2001 From: Srikanth Patchava Date: Sat, 25 Apr 2026 01:36:19 -0700 Subject: [PATCH 2/2] feat: add validate workflow step type with JSON schema validation Add a new 'validate' step type for workflows that performs JSON schema validation with a custom rule engine: - Full JSON schema validation (type, enum, const, minLength, maxLength, pattern, minimum, maximum, exclusiveMinimum, exclusiveMaximum, multipleOf, minItems, maxItems, uniqueItems, required, properties, additionalProperties, nested items) - Custom rule engine for expression-based validation predicates - Detailed error reporting with JSON paths and rule identifiers - Configurable fail_on_error behavior - Step configuration validation - Registration in STEP_REGISTRY as 'validate' type Also includes: - Comprehensive pytest test suite (test_validate_step.py) - Bug fix: handle json.JSONDecodeError in RunState.load() for corrupted state.json and inputs.json files Signed-off-by: Srikanth Patchava --- src/specify_cli/workflows/__init__.py | 2 + src/specify_cli/workflows/engine.py | 12 +- .../workflows/steps/validate/__init__.py | 347 ++++++++++++++++++ tests/test_validate_step.py | 295 +++++++++++++++ 4 files changed, 654 insertions(+), 2 deletions(-) create mode 100644 src/specify_cli/workflows/steps/validate/__init__.py create mode 100644 tests/test_validate_step.py diff --git a/src/specify_cli/workflows/__init__.py b/src/specify_cli/workflows/__init__.py index 13782f620b..65ce6c4e8e 100644 --- a/src/specify_cli/workflows/__init__.py +++ b/src/specify_cli/workflows/__init__.py @@ -51,6 +51,7 @@ def _register_builtin_steps() -> None: from .steps.prompt import PromptStep from .steps.shell import ShellStep from .steps.switch import SwitchStep + from .steps.validate import ValidateStep from .steps.while_loop import WhileStep _register_step(CommandStep()) @@ -62,6 +63,7 @@ def _register_builtin_steps() -> None: _register_step(PromptStep()) _register_step(ShellStep()) _register_step(SwitchStep()) + _register_step(ValidateStep()) _register_step(WhileStep()) diff --git a/src/specify_cli/workflows/engine.py b/src/specify_cli/workflows/engine.py index d6a73bbeb0..f8e1648d90 100644 --- a/src/specify_cli/workflows/engine.py +++ b/src/specify_cli/workflows/engine.py @@ -289,7 +289,11 @@ def load(cls, run_id: str, project_root: Path) -> RunState: raise FileNotFoundError(msg) with open(state_path, encoding="utf-8") as f: - state_data = json.load(f) + try: + state_data = json.load(f) + except json.JSONDecodeError as exc: + msg = f"Corrupted run state file: {state_path}: {exc}" + raise ValueError(msg) from exc state = cls( run_id=state_data["run_id"], @@ -306,7 +310,11 @@ def load(cls, run_id: str, project_root: Path) -> RunState: inputs_path = runs_dir / "inputs.json" if inputs_path.exists(): with open(inputs_path, encoding="utf-8") as f: - inputs_data = json.load(f) + try: + inputs_data = json.load(f) + except json.JSONDecodeError as exc: + msg = f"Corrupted inputs file: {inputs_path}: {exc}" + raise ValueError(msg) from exc state.inputs = inputs_data.get("inputs", {}) return state diff --git a/src/specify_cli/workflows/steps/validate/__init__.py b/src/specify_cli/workflows/steps/validate/__init__.py new file mode 100644 index 0000000000..08cd039b78 --- /dev/null +++ b/src/specify_cli/workflows/steps/validate/__init__.py @@ -0,0 +1,347 @@ +"""Validate step — JSON schema validation with custom rules. + +Validates step context data (inputs, step outputs, or arbitrary JSON) +against a JSON-Schema-like rule set, then aggregates errors into a +detailed report stored in ``output``. +""" + +from __future__ import annotations + +import re +from typing import Any + +from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus +from specify_cli.workflows.expressions import evaluate_expression + + +# ── Schema types ──────────────────────────────────────────────────── + +_VALID_TYPES = {"string", "integer", "number", "boolean", "array", "object", "null"} + + +def _python_type_name(value: Any) -> str: + """Map a Python value to its JSON-schema type name.""" + if value is None: + return "null" + if isinstance(value, bool): + return "boolean" + if isinstance(value, int): + return "integer" + if isinstance(value, float): + return "number" + if isinstance(value, str): + return "string" + if isinstance(value, list): + return "array" + if isinstance(value, dict): + return "object" + return type(value).__name__ + + +# ── Schema validator ──────────────────────────────────────────────── + +class ValidationError: + """A single validation error with path and message.""" + + __slots__ = ("path", "message", "rule") + + def __init__(self, path: str, message: str, rule: str = "") -> None: + self.path = path + self.message = message + self.rule = rule + + def to_dict(self) -> dict[str, str]: + d: dict[str, str] = {"path": self.path, "message": self.message} + if self.rule: + d["rule"] = self.rule + return d + + def __repr__(self) -> str: + return f"ValidationError(path={self.path!r}, message={self.message!r})" + + +class SchemaValidator: + """Validate a value against a JSON-schema-like definition.""" + + def __init__(self) -> None: + self.errors: list[ValidationError] = [] + + def validate(self, value: Any, schema: dict[str, Any], path: str = "$") -> list[ValidationError]: + """Validate *value* against *schema* and return all errors.""" + self.errors = [] + self._validate_node(value, schema, path) + return list(self.errors) + + def _add_error(self, path: str, message: str, rule: str = "") -> None: + self.errors.append(ValidationError(path, message, rule)) + + def _validate_node(self, value: Any, schema: dict[str, Any], path: str) -> None: + # ── required check (handled at object level) ── + # ── type check ── + expected_type = schema.get("type") + if expected_type is not None: + if expected_type not in _VALID_TYPES: + self._add_error(path, f"Unknown schema type: {expected_type!r}", "type") + return + actual = _python_type_name(value) + # Allow integer where number is expected + if expected_type == "number" and actual == "integer": + actual = "number" + if actual != expected_type: + self._add_error( + path, + f"Expected type {expected_type!r} but got {actual!r}", + "type", + ) + return # skip further checks if type is wrong + + # ── enum ── + if "enum" in schema: + if value not in schema["enum"]: + self._add_error( + path, + f"Value {value!r} is not one of {schema['enum']!r}", + "enum", + ) + + # ── const ── + if "const" in schema: + if value != schema["const"]: + self._add_error( + path, + f"Value must be {schema['const']!r}", + "const", + ) + + # ── string constraints ── + if isinstance(value, str): + if "minLength" in schema and len(value) < schema["minLength"]: + self._add_error( + path, + f"String length {len(value)} is less than minimum {schema['minLength']}", + "minLength", + ) + if "maxLength" in schema and len(value) > schema["maxLength"]: + self._add_error( + path, + f"String length {len(value)} exceeds maximum {schema['maxLength']}", + "maxLength", + ) + if "pattern" in schema: + if not re.search(schema["pattern"], value): + self._add_error( + path, + f"String does not match pattern {schema['pattern']!r}", + "pattern", + ) + + # ── numeric constraints ── + if isinstance(value, (int, float)) and not isinstance(value, bool): + if "minimum" in schema and value < schema["minimum"]: + self._add_error( + path, + f"Value {value} is less than minimum {schema['minimum']}", + "minimum", + ) + if "maximum" in schema and value > schema["maximum"]: + self._add_error( + path, + f"Value {value} exceeds maximum {schema['maximum']}", + "maximum", + ) + if "exclusiveMinimum" in schema and value <= schema["exclusiveMinimum"]: + self._add_error( + path, + f"Value {value} must be > {schema['exclusiveMinimum']}", + "exclusiveMinimum", + ) + if "exclusiveMaximum" in schema and value >= schema["exclusiveMaximum"]: + self._add_error( + path, + f"Value {value} must be < {schema['exclusiveMaximum']}", + "exclusiveMaximum", + ) + if "multipleOf" in schema and value % schema["multipleOf"] != 0: + self._add_error( + path, + f"Value {value} is not a multiple of {schema['multipleOf']}", + "multipleOf", + ) + + # ── array constraints ── + if isinstance(value, list): + if "minItems" in schema and len(value) < schema["minItems"]: + self._add_error( + path, + f"Array length {len(value)} is less than minimum {schema['minItems']}", + "minItems", + ) + if "maxItems" in schema and len(value) > schema["maxItems"]: + self._add_error( + path, + f"Array length {len(value)} exceeds maximum {schema['maxItems']}", + "maxItems", + ) + if schema.get("uniqueItems") and len(value) != len(set(repr(v) for v in value)): + self._add_error(path, "Array items are not unique", "uniqueItems") + items_schema = schema.get("items") + if items_schema: + for i, item in enumerate(value): + self._validate_node(item, items_schema, f"{path}[{i}]") + + # ── object constraints ── + if isinstance(value, dict): + required_keys = schema.get("required", []) + for rk in required_keys: + if rk not in value: + self._add_error( + f"{path}.{rk}", + f"Missing required property {rk!r}", + "required", + ) + properties = schema.get("properties", {}) + for prop_name, prop_schema in properties.items(): + if prop_name in value: + self._validate_node( + value[prop_name], prop_schema, f"{path}.{prop_name}" + ) + additional = schema.get("additionalProperties") + if additional is False: + allowed = set(properties.keys()) + for k in value: + if k not in allowed: + self._add_error( + f"{path}.{k}", + f"Additional property {k!r} is not allowed", + "additionalProperties", + ) + + +# ── Custom rule engine ────────────────────────────────────────────── + +class CustomRuleEngine: + """Evaluate custom validation rules expressed as simple predicates.""" + + @staticmethod + def evaluate_rules( + rules: list[dict[str, Any]], data: Any, context: StepContext, + ) -> list[ValidationError]: + """Evaluate a list of custom rules against *data*. + + Each rule is a dict with: + - ``expr``: a ``{{ }}``-style expression that should resolve truthy. + - ``message``: error message if the rule fails. + - ``path`` (optional): JSON path for the error. + - ``severity`` (optional): ``error`` | ``warning`` (default ``error``). + """ + errors: list[ValidationError] = [] + for rule in rules: + expr = rule.get("expr", "") + if not expr: + continue + try: + result = evaluate_expression(expr, context) + except Exception: + result = None + + if not result: + errors.append( + ValidationError( + path=rule.get("path", "$"), + message=rule.get("message", f"Custom rule failed: {expr}"), + rule="custom", + ) + ) + return errors + + +# ── Step implementation ───────────────────────────────────────────── + +class ValidateStep(StepBase): + """Workflow step that validates data against a JSON schema. + + YAML configuration:: + + - id: check-inputs + type: validate + target: "{{ inputs }}" # expression resolving to data + schema: + type: object + required: [name, version] + properties: + name: { type: string, minLength: 1 } + version: { type: string, pattern: "^\\\\d+\\\\.\\\\d+\\\\.\\\\d+$" } + custom_rules: + - expr: "{{ inputs.name != inputs.version }}" + message: "name and version must differ" + fail_on_error: true # default true + """ + + type_key = "validate" + + def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: + # ── Resolve the target data ── + target_expr = config.get("target", "{{ inputs }}") + if isinstance(target_expr, str) and "{{" in target_expr: + try: + target_data = evaluate_expression(target_expr, context) + except Exception as exc: + return StepResult( + status=StepStatus.FAILED, + error=f"Failed to resolve target expression: {exc}", + output={"valid": False, "errors": [], "error_count": 0}, + ) + else: + target_data = target_expr + + all_errors: list[ValidationError] = [] + + # ── JSON schema validation ── + schema = config.get("schema") + if schema: + validator = SchemaValidator() + all_errors.extend(validator.validate(target_data, schema)) + + # ── Custom rules ── + custom_rules = config.get("custom_rules", []) + if custom_rules: + all_errors.extend( + CustomRuleEngine.evaluate_rules(custom_rules, target_data, context) + ) + + # ── Build result ── + error_dicts = [e.to_dict() for e in all_errors] + is_valid = len(all_errors) == 0 + fail_on_error = config.get("fail_on_error", True) + + output = { + "valid": is_valid, + "errors": error_dicts, + "error_count": len(error_dicts), + } + + if not is_valid and fail_on_error: + summary = "; ".join(f"{e.path}: {e.message}" for e in all_errors[:5]) + if len(all_errors) > 5: + summary += f" ... and {len(all_errors) - 5} more" + return StepResult( + status=StepStatus.FAILED, + error=f"Validation failed ({len(all_errors)} errors): {summary}", + output=output, + ) + + return StepResult(status=StepStatus.COMPLETED, output=output) + + def validate(self, config: dict[str, Any]) -> list[str]: + errors = super().validate(config) + schema = config.get("schema") + if schema is not None and not isinstance(schema, dict): + errors.append( + f"Validate step {config.get('id', '?')!r}: 'schema' must be a dict." + ) + custom_rules = config.get("custom_rules") + if custom_rules is not None and not isinstance(custom_rules, list): + errors.append( + f"Validate step {config.get('id', '?')!r}: 'custom_rules' must be a list." + ) + return errors diff --git a/tests/test_validate_step.py b/tests/test_validate_step.py new file mode 100644 index 0000000000..555bd722f9 --- /dev/null +++ b/tests/test_validate_step.py @@ -0,0 +1,295 @@ +"""Tests for the validate workflow step.""" + +from __future__ import annotations + +import pytest + +from specify_cli.workflows.base import StepContext, StepStatus +from specify_cli.workflows.steps.validate import ( + CustomRuleEngine, + SchemaValidator, + ValidateStep, + ValidationError, +) + + +# ── SchemaValidator tests ─────────────────────────────────────────── + + +class TestSchemaValidator: + def setup_method(self): + self.v = SchemaValidator() + + # -- type checks -- + + def test_type_string_valid(self): + errors = self.v.validate("hello", {"type": "string"}) + assert errors == [] + + def test_type_string_invalid(self): + errors = self.v.validate(42, {"type": "string"}) + assert len(errors) == 1 + assert errors[0].rule == "type" + + def test_type_integer_valid(self): + assert self.v.validate(10, {"type": "integer"}) == [] + + def test_type_number_accepts_int(self): + assert self.v.validate(10, {"type": "number"}) == [] + + def test_type_boolean(self): + assert self.v.validate(True, {"type": "boolean"}) == [] + errors = self.v.validate("yes", {"type": "boolean"}) + assert len(errors) == 1 + + def test_type_null(self): + assert self.v.validate(None, {"type": "null"}) == [] + + def test_type_array(self): + assert self.v.validate([1, 2], {"type": "array"}) == [] + + def test_type_object(self): + assert self.v.validate({"a": 1}, {"type": "object"}) == [] + + def test_unknown_type(self): + errors = self.v.validate("x", {"type": "foobar"}) + assert len(errors) == 1 + assert "Unknown schema type" in errors[0].message + + # -- enum / const -- + + def test_enum_valid(self): + assert self.v.validate("a", {"enum": ["a", "b", "c"]}) == [] + + def test_enum_invalid(self): + errors = self.v.validate("z", {"enum": ["a", "b"]}) + assert len(errors) == 1 + assert errors[0].rule == "enum" + + def test_const_valid(self): + assert self.v.validate(42, {"const": 42}) == [] + + def test_const_invalid(self): + errors = self.v.validate(43, {"const": 42}) + assert len(errors) == 1 + + # -- string constraints -- + + def test_minLength(self): + assert self.v.validate("ab", {"type": "string", "minLength": 2}) == [] + errors = self.v.validate("a", {"type": "string", "minLength": 2}) + assert len(errors) == 1 + + def test_maxLength(self): + errors = self.v.validate("abcdef", {"type": "string", "maxLength": 3}) + assert len(errors) == 1 + + def test_pattern_match(self): + assert self.v.validate("v1.2.3", {"type": "string", "pattern": r"^\d+\.\d+"}) == [] + + def test_pattern_no_match(self): + errors = self.v.validate("abc", {"type": "string", "pattern": r"^\d+$"}) + assert len(errors) == 1 + + # -- numeric constraints -- + + def test_minimum(self): + assert self.v.validate(5, {"type": "integer", "minimum": 5}) == [] + errors = self.v.validate(4, {"type": "integer", "minimum": 5}) + assert len(errors) == 1 + + def test_maximum(self): + errors = self.v.validate(11, {"type": "integer", "maximum": 10}) + assert len(errors) == 1 + + def test_exclusive_minimum(self): + errors = self.v.validate(5, {"type": "integer", "exclusiveMinimum": 5}) + assert len(errors) == 1 + assert self.v.validate(6, {"type": "integer", "exclusiveMinimum": 5}) == [] + + def test_exclusive_maximum(self): + errors = self.v.validate(10, {"type": "integer", "exclusiveMaximum": 10}) + assert len(errors) == 1 + + def test_multiple_of(self): + assert self.v.validate(9, {"type": "integer", "multipleOf": 3}) == [] + errors = self.v.validate(10, {"type": "integer", "multipleOf": 3}) + assert len(errors) == 1 + + # -- array constraints -- + + def test_min_items(self): + errors = self.v.validate([], {"type": "array", "minItems": 1}) + assert len(errors) == 1 + + def test_max_items(self): + errors = self.v.validate([1, 2, 3], {"type": "array", "maxItems": 2}) + assert len(errors) == 1 + + def test_unique_items(self): + errors = self.v.validate([1, 2, 1], {"type": "array", "uniqueItems": True}) + assert len(errors) == 1 + + def test_items_schema(self): + schema = {"type": "array", "items": {"type": "integer"}} + assert self.v.validate([1, 2, 3], schema) == [] + errors = self.v.validate([1, "x", 3], schema) + assert len(errors) == 1 + assert "[1]" in errors[0].path + + # -- object constraints -- + + def test_required_properties(self): + schema = {"type": "object", "required": ["name"]} + assert self.v.validate({"name": "foo"}, schema) == [] + errors = self.v.validate({}, schema) + assert len(errors) == 1 + assert errors[0].rule == "required" + + def test_nested_properties(self): + schema = { + "type": "object", + "properties": { + "age": {"type": "integer", "minimum": 0}, + }, + } + assert self.v.validate({"age": 25}, schema) == [] + errors = self.v.validate({"age": -1}, schema) + assert len(errors) == 1 + assert "$.age" in errors[0].path + + def test_additional_properties_false(self): + schema = { + "type": "object", + "properties": {"a": {"type": "string"}}, + "additionalProperties": False, + } + errors = self.v.validate({"a": "ok", "b": "nope"}, schema) + assert len(errors) == 1 + assert errors[0].rule == "additionalProperties" + + # -- complex nested -- + + def test_deeply_nested(self): + schema = { + "type": "object", + "properties": { + "users": { + "type": "array", + "items": { + "type": "object", + "required": ["id"], + "properties": { + "id": {"type": "integer"}, + }, + }, + } + }, + } + data = {"users": [{"id": 1}, {"id": "bad"}]} + errors = self.v.validate(data, schema) + assert len(errors) == 1 + assert "$.users[1].id" in errors[0].path + + +# ── ValidationError tests ────────────────────────────────────────── + + +class TestValidationError: + def test_to_dict(self): + e = ValidationError("$.a", "oops", "required") + d = e.to_dict() + assert d == {"path": "$.a", "message": "oops", "rule": "required"} + + def test_to_dict_no_rule(self): + e = ValidationError("$", "bad") + d = e.to_dict() + assert "rule" not in d + + def test_repr(self): + e = ValidationError("$.x", "err") + assert "$.x" in repr(e) + + +# ── ValidateStep tests ───────────────────────────────────────────── + + +class TestValidateStep: + def setup_method(self): + self.step = ValidateStep() + + def test_type_key(self): + assert self.step.type_key == "validate" + + def test_valid_data_passes(self): + ctx = StepContext(inputs={"name": "foo", "version": "1.0.0"}) + config = { + "id": "check", + "target": "{{ inputs }}", + "schema": { + "type": "object", + "required": ["name", "version"], + "properties": { + "name": {"type": "string", "minLength": 1}, + "version": {"type": "string"}, + }, + }, + } + result = self.step.execute(config, ctx) + assert result.status == StepStatus.COMPLETED + assert result.output["valid"] is True + assert result.output["error_count"] == 0 + + def test_invalid_data_fails(self): + ctx = StepContext(inputs={"name": ""}) + config = { + "id": "check", + "target": "{{ inputs }}", + "schema": { + "type": "object", + "required": ["name", "version"], + "properties": { + "name": {"type": "string", "minLength": 1}, + }, + }, + } + result = self.step.execute(config, ctx) + assert result.status == StepStatus.FAILED + assert result.output["valid"] is False + assert result.output["error_count"] >= 1 + + def test_fail_on_error_false_returns_completed(self): + ctx = StepContext(inputs={"x": 1}) + config = { + "id": "check", + "target": "{{ inputs }}", + "schema": {"type": "object", "required": ["missing"]}, + "fail_on_error": False, + } + result = self.step.execute(config, ctx) + assert result.status == StepStatus.COMPLETED + assert result.output["valid"] is False + + def test_validate_config_bad_schema(self): + errors = self.step.validate({"id": "x", "schema": "not-a-dict"}) + assert any("schema" in e for e in errors) + + def test_validate_config_bad_custom_rules(self): + errors = self.step.validate({"id": "x", "custom_rules": "not-a-list"}) + assert any("custom_rules" in e for e in errors) + + def test_validate_config_missing_id(self): + errors = self.step.validate({}) + assert any("id" in e for e in errors) + + def test_literal_target(self): + """When target is not an expression, use it directly.""" + ctx = StepContext() + config = { + "id": "check", + "target": {"a": 1}, + "schema": {"type": "object", "required": ["a"]}, + } + result = self.step.execute(config, ctx) + assert result.status == StepStatus.COMPLETED + assert result.output["valid"] is True