Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions bundle/config/validate/job_cluster_key_defined.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,36 @@ func (v *jobClusterKeyDefined) Apply(ctx context.Context, b *bundle.Bundle) diag
}

for index, task := range job.Tasks {
if task.JobClusterKey != "" {
if _, ok := jobClusterKeys[task.JobClusterKey]; !ok {
path := fmt.Sprintf("resources.jobs.%s.tasks[%d].job_cluster_key", k, index)

diags = diags.Append(diag.Diagnostic{
Severity: diag.Warning,
Summary: fmt.Sprintf("job_cluster_key %s is not defined", task.JobClusterKey),
// Show only the location where the job_cluster_key is defined.
// Other associated locations are not relevant since they are
// overridden during merging.
Locations: b.Config.GetLocations(path),
Paths: []dyn.Path{dyn.MustPathFromString(path)},
})
}
diags = diags.Extend(checkJobClusterKey(b, jobClusterKeys, task.JobClusterKey,
fmt.Sprintf("resources.jobs.%s.tasks[%d].job_cluster_key", k, index)))

// The Jobs API rejects nested for_each_task, so one level is sufficient.
if task.ForEachTask != nil {
diags = diags.Extend(checkJobClusterKey(b, jobClusterKeys, task.ForEachTask.Task.JobClusterKey,
fmt.Sprintf("resources.jobs.%s.tasks[%d].for_each_task.task.job_cluster_key", k, index)))
}
}
}

return diags
}

// checkJobClusterKey warns if jobClusterKey is set but not defined in the job's job_clusters.
func checkJobClusterKey(b *bundle.Bundle, jobClusterKeys map[string]bool, jobClusterKey, path string) diag.Diagnostics {
if jobClusterKey == "" {
return nil
}
if _, ok := jobClusterKeys[jobClusterKey]; ok {
return nil
}

return diag.Diagnostics{{
Severity: diag.Warning,
Summary: fmt.Sprintf("job_cluster_key %s is not defined", jobClusterKey),
// Show only the location where the job_cluster_key is defined.
// Other associated locations are not relevant since they are
// overridden during merging.
Locations: b.Config.GetLocations(path),
Paths: []dyn.Path{dyn.MustPathFromString(path)},
}}
}
61 changes: 61 additions & 0 deletions bundle/config/validate/job_cluster_key_defined_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,67 @@ func TestJobClusterKeyNotDefined(t *testing.T) {
require.Equal(t, "job_cluster_key do-not-exist is not defined", diags[0].Summary)
}

func TestJobClusterKeyDefinedInForEachTask(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {
JobSettings: jobs.JobSettings{
Name: "job1",
JobClusters: []jobs.JobCluster{
{JobClusterKey: "do-not-exist"},
},
Tasks: []jobs.Task{
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{JobClusterKey: "do-not-exist"},
},
},
},
},
},
},
},
},
}

diags := JobClusterKeyDefined().Apply(t.Context(), b)
require.Empty(t, diags)
require.NoError(t, diags.Error())
}

func TestJobClusterKeyNotDefinedInForEachTask(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {
JobSettings: jobs.JobSettings{
Name: "job1",
Tasks: []jobs.Task{
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{JobClusterKey: "do-not-exist"},
},
},
},
},
},
},
},
},
}

diags := JobClusterKeyDefined().Apply(t.Context(), b)
require.Len(t, diags, 1)
require.NoError(t, diags.Error())
require.Equal(t, diag.Warning, diags[0].Severity)
require.Equal(t, "job_cluster_key do-not-exist is not defined", diags[0].Summary)
require.Len(t, diags[0].Paths, 1)
require.Equal(t, "resources.jobs.job1.tasks[0].for_each_task.task.job_cluster_key", diags[0].Paths[0].String())
}

func TestJobClusterKeyDefinedInDifferentJob(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Expand Down
Loading