Skip to content

Commit

Permalink
fix(work pools): fix work pool base job template "inconsistent values" (
Browse files Browse the repository at this point in the history
#249)

Addresses the "inconsistent values" error the provider would sometimes return when working with work pools that specify a base job template (context: #244 (comment))

- Inconsistent result after apply bug with prefect_work_pool #244 (comment))
- Adds base_job_template to the Work Pool tests
- Conforms to the newer, simpler method of unmarshaling the base job template that we use in our Block data logic

Closes #244

Co-authored-by: Edward Park <[email protected]>

---------

Co-authored-by: Edward Park <[email protected]>
  • Loading branch information
mitchnielsen and parkedwards authored Aug 23, 2024
1 parent 7d75c2f commit d6d8355
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 74 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ toolchain go1.22.1

require (
github.com/avast/retry-go/v4 v4.6.0
github.com/go-test/deep v1.1.1
github.com/google/uuid v1.6.0
github.com/hashicorp/terraform-plugin-docs v0.19.4
github.com/hashicorp/terraform-plugin-framework v1.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ github.com/go-git/go-billy/v5 v5.5.0 h1:yEY4yhzCDuMGSv83oGxiBotRzhwhNr8VZyphhiu+
github.com/go-git/go-billy/v5 v5.5.0/go.mod h1:hmexnoNsr2SJU1Ju67OaNz5ASJY3+sHgFRpCtpDCKow=
github.com/go-git/go-git/v5 v5.12.0 h1:7Md+ndsjrzZxbddRDZjF14qK+NN56sy6wkqaVrjZtys=
github.com/go-git/go-git/v5 v5.12.0/go.mod h1:FTM9VKtnI2m65hNI/TenDDDnUf2Q9FHnXYjuz9i5OEY=
github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68=
github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/go-test/deep v1.1.1 h1:0r/53hagsehfO4bzD2Pgr/+RgHqhmf+k1Bpse2cTu1U=
github.com/go-test/deep v1.1.1/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
14 changes: 14 additions & 0 deletions internal/provider/helpers/compare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package helpers

import "github.com/go-test/deep"

// ObjectsEqual checks to see if two objects are equivalent, accounting for
// differences in the order of the contents.
func ObjectsEqual(obj1, obj2 interface{}) (bool, []string) {
differences := deep.Equal(obj1, obj2)
if len(differences) != 0 {
return false, differences
}

return true, nil
}
88 changes: 30 additions & 58 deletions internal/provider/resources/work_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package resources

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes"
"github.com/hashicorp/terraform-plugin-framework/diag"
"github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/resource"
"github.com/hashicorp/terraform-plugin-framework/resource/schema"
Expand Down Expand Up @@ -177,9 +175,7 @@ func (r *WorkPoolResource) Schema(_ context.Context, _ resource.SchemaRequest, r

// copyWorkPoolToModel maps an API response to a model that is saved in Terraform state.
// A model can be a Terraform Plan, State, or Config object.
func copyWorkPoolToModel(_ context.Context, pool *api.WorkPool, tfModel *WorkPoolResourceModel) diag.Diagnostics {
var diags diag.Diagnostics

func copyWorkPoolToModel(pool *api.WorkPool, tfModel *WorkPoolResourceModel) {
tfModel.ID = types.StringValue(pool.ID.String())
tfModel.Created = customtypes.NewTimestampPointerValue(pool.Created)
tfModel.Updated = customtypes.NewTimestampPointerValue(pool.Updated)
Expand All @@ -190,26 +186,6 @@ func copyWorkPoolToModel(_ context.Context, pool *api.WorkPool, tfModel *WorkPoo
tfModel.Name = types.StringValue(pool.Name)
tfModel.Paused = types.BoolValue(pool.IsPaused)
tfModel.Type = types.StringValue(pool.Type)

if pool.BaseJobTemplate != nil {
var builder strings.Builder
encoder := json.NewEncoder(&builder)
encoder.SetIndent("", " ")
err := encoder.Encode(pool.BaseJobTemplate)
if err != nil {
diags.AddAttributeError(
path.Root("base_job_template"),
"Failed to serialize Base Job Template",
fmt.Sprintf("Failed to serialize Base Job Template as JSON string: %s", err),
)

return diags
}

tfModel.BaseJobTemplate = jsontypes.NewNormalizedValue(strings.TrimSuffix(builder.String(), "\n"))
}

return nil
}

// Create creates the resource and sets the initial Terraform state.
Expand All @@ -223,19 +199,9 @@ func (r *WorkPoolResource) Create(ctx context.Context, req resource.CreateReques
}

baseJobTemplate := map[string]interface{}{}
if !plan.BaseJobTemplate.IsNull() {
reader := strings.NewReader(plan.BaseJobTemplate.ValueString())
decoder := json.NewDecoder(reader)
err := decoder.Decode(&baseJobTemplate)
if err != nil {
resp.Diagnostics.AddAttributeError(
path.Root("base_job_template"),
"Failed to deserialize Base Job Template",
fmt.Sprintf("Failed to deserialize Base Job Template as JSON object: %s", err),
)

return
}
resp.Diagnostics.Append(plan.BaseJobTemplate.Unmarshal(&baseJobTemplate)...)
if resp.Diagnostics.HasError() {
return
}

client, err := r.client.WorkPools(plan.AccountID.ValueUUID(), plan.WorkspaceID.ValueUUID())
Expand All @@ -259,7 +225,7 @@ func (r *WorkPoolResource) Create(ctx context.Context, req resource.CreateReques
return
}

resp.Diagnostics.Append(copyWorkPoolToModel(ctx, pool, &plan)...)
copyWorkPoolToModel(pool, &plan)
if resp.Diagnostics.HasError() {
return
}
Expand Down Expand Up @@ -294,10 +260,7 @@ func (r *WorkPoolResource) Read(ctx context.Context, req resource.ReadRequest, r
return
}

resp.Diagnostics.Append(copyWorkPoolToModel(ctx, pool, &state)...)
if resp.Diagnostics.HasError() {
return
}
copyWorkPoolToModel(pool, &state)

resp.Diagnostics.Append(resp.State.Set(ctx, &state)...)
if resp.Diagnostics.HasError() {
Expand All @@ -315,19 +278,9 @@ func (r *WorkPoolResource) Update(ctx context.Context, req resource.UpdateReques
}

baseJobTemplate := map[string]interface{}{}
if !plan.BaseJobTemplate.IsNull() {
reader := strings.NewReader(plan.BaseJobTemplate.ValueString())
decoder := json.NewDecoder(reader)
err := decoder.Decode(&baseJobTemplate)
if err != nil {
resp.Diagnostics.AddAttributeError(
path.Root("base_job_template"),
"Failed to deserialize Base Job Template",
fmt.Sprintf("Failed to deserialize Base Job Template as JSON object: %s", err),
)

return
}
resp.Diagnostics.Append(plan.BaseJobTemplate.Unmarshal(&baseJobTemplate)...)
if resp.Diagnostics.HasError() {
return
}

client, err := r.client.WorkPools(plan.AccountID.ValueUUID(), plan.WorkspaceID.ValueUUID())
Expand Down Expand Up @@ -356,11 +309,30 @@ func (r *WorkPoolResource) Update(ctx context.Context, req resource.UpdateReques
return
}

resp.Diagnostics.Append(copyWorkPoolToModel(ctx, pool, &plan)...)
if resp.Diagnostics.HasError() {
// If the base job template from the retrieved work pool is equal
// to the base job template from the plan (taking into account that the
// fields could be in a different order), then we will use the plan's definition
// of the base job template in the state. This avoids "inconsistent value"
// errors.
//
// If the two are not equal, then something has gone wrong so we should
// exit and alert the user of the differences.
equal, diffs := helpers.ObjectsEqual(baseJobTemplate, pool.BaseJobTemplate)
if !equal {
resp.Diagnostics.AddAttributeError(
path.Root("base_job_template"),
"Unexpected difference in base_job_templates",
fmt.Sprintf(
"Expected the provided base_job_template to be equal to the one retrieved from the API, differences: %s",
strings.Join(diffs, "\n"),
),
)

return
}

copyWorkPoolToModel(pool, &plan)

resp.Diagnostics.Append(resp.State.Set(ctx, &plan)...)
if resp.Diagnostics.HasError() {
return
Expand Down
114 changes: 100 additions & 14 deletions internal/provider/resources/work_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,31 @@ package resources_test

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"

"github.com/google/uuid"
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/terraform"
"github.com/prefecthq/terraform-provider-prefect/internal/api"
"github.com/prefecthq/terraform-provider-prefect/internal/provider/helpers"
"github.com/prefecthq/terraform-provider-prefect/internal/testutils"
)

func fixtureAccWorkPoolCreate(workspace, workspaceName, name, poolType string, paused bool) string {
func fixtureAccWorkPoolCreate(workspace, workspaceName, name, poolType, baseJobTemplate string, paused bool) string {
return fmt.Sprintf(`
%s
resource "prefect_work_pool" "%s" {
name = "%s"
type = "%s"
paused = %t
base_job_template = jsonencode(%s)
workspace_id = prefect_workspace.%s.id
depends_on = [prefect_workspace.%s]
}
`, workspace, name, name, poolType, paused, workspaceName, workspaceName)
`, workspace, name, name, poolType, paused, baseJobTemplate, workspaceName, workspaceName)
}

//nolint:paralleltest // we use the resource.ParallelTest helper instead
Expand All @@ -39,6 +43,14 @@ func TestAccResource_work_pool(t *testing.T) {
poolType := "kubernetes"
poolType2 := "ecs"

baseJobTemplate := fmt.Sprintf(baseJobTemplateTpl, "The name given to infrastructure created by a worker.")
var baseJobTemplateMap map[string]interface{}
_ = json.Unmarshal([]byte(baseJobTemplate), &baseJobTemplateMap)

baseJobTemplate2 := fmt.Sprintf(baseJobTemplateTpl, "The name given to infrastructure created by a worker!")
var baseJobTemplateMap2 map[string]interface{}
_ = json.Unmarshal([]byte(baseJobTemplate2), &baseJobTemplateMap2)

// We use this variable to store the fetched resource from the API
// and it will be shared between TestSteps via a pointer.
var workPool api.WorkPool
Expand All @@ -49,51 +61,64 @@ func TestAccResource_work_pool(t *testing.T) {
Steps: []resource.TestStep{
{
// Check creation + existence of the work pool resource
Config: fixtureAccWorkPoolCreate(workspace, workspaceName, randomName, poolType, true),
Config: fixtureAccWorkPoolCreate(workspace, workspaceName, randomName, poolType, baseJobTemplate, true),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckWorkPoolExists(workPoolResourceName, workspaceResourceName, &workPool),
testAccCheckWorkPoolValues(&workPool, &api.WorkPool{Name: randomName, Type: poolType, IsPaused: true}),
testAccCheckWorkPoolValues(&workPool, &api.WorkPool{Name: randomName, Type: poolType, BaseJobTemplate: baseJobTemplateMap, IsPaused: true}),
resource.TestCheckResourceAttr(workPoolResourceName, "name", randomName),
resource.TestCheckResourceAttr(workPoolResourceName, "type", poolType),
resource.TestCheckResourceAttr(workPoolResourceName, "paused", "true"),
),
},
{
// Check that changing the paused state will update the resource in place
Config: fixtureAccWorkPoolCreate(workspace, workspaceName, randomName, poolType, false),
Config: fixtureAccWorkPoolCreate(workspace, workspaceName, randomName, poolType, baseJobTemplate, false),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckIDAreEqual(workPoolResourceName, &workPool),
testAccCheckWorkPoolExists(workPoolResourceName, workspaceResourceName, &workPool),
testAccCheckWorkPoolValues(&workPool, &api.WorkPool{Name: randomName, Type: poolType, IsPaused: false}),
testAccCheckWorkPoolValues(&workPool, &api.WorkPool{Name: randomName, Type: poolType, BaseJobTemplate: baseJobTemplateMap, IsPaused: false}),
resource.TestCheckResourceAttr(workPoolResourceName, "name", randomName),
resource.TestCheckResourceAttr(workPoolResourceName, "type", poolType),
resource.TestCheckResourceAttr(workPoolResourceName, "paused", "false"),
),
},
{
// Check that changing the baseJobTemplate will update the resource in place
Config: fixtureAccWorkPoolCreate(workspace, workspaceName, randomName, poolType, baseJobTemplate2, false),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckIDAreEqual(workPoolResourceName, &workPool),
testAccCheckWorkPoolExists(workPoolResourceName, workspaceResourceName, &workPool),
testAccCheckWorkPoolValues(&workPool, &api.WorkPool{Name: randomName, Type: poolType, BaseJobTemplate: baseJobTemplateMap2, IsPaused: false}),
resource.TestCheckResourceAttr(workPoolResourceName, "name", randomName),
resource.TestCheckResourceAttr(workPoolResourceName, "type", poolType),
resource.TestCheckResourceAttr(workPoolResourceName, "paused", "false"),
),
},
{
// Check that changing the name will re-create the resource
Config: fixtureAccWorkPoolCreate(workspace, workspaceName, randomName2, poolType, false),
Config: fixtureAccWorkPoolCreate(workspace, workspaceName, randomName2, poolType, baseJobTemplate2, false),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckIDsNotEqual(workPoolResourceName2, &workPool),
testAccCheckWorkPoolExists(workPoolResourceName2, workspaceResourceName, &workPool),
testAccCheckWorkPoolValues(&workPool, &api.WorkPool{Name: randomName2, Type: poolType, IsPaused: false}),
testAccCheckWorkPoolValues(&workPool, &api.WorkPool{Name: randomName2, Type: poolType, BaseJobTemplate: baseJobTemplateMap2, IsPaused: false}),
),
},
{
// Check that changing the poolType will re-create the resource
Config: fixtureAccWorkPoolCreate(workspace, workspaceName, randomName2, poolType2, false),
Config: fixtureAccWorkPoolCreate(workspace, workspaceName, randomName2, poolType2, baseJobTemplate2, false),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckIDsNotEqual(workPoolResourceName2, &workPool),
testAccCheckWorkPoolExists(workPoolResourceName2, workspaceResourceName, &workPool),
testAccCheckWorkPoolValues(&workPool, &api.WorkPool{Name: randomName2, Type: poolType2, IsPaused: false}),
testAccCheckWorkPoolValues(&workPool, &api.WorkPool{Name: randomName2, Type: poolType2, BaseJobTemplate: baseJobTemplateMap2, IsPaused: false}),
),
},
// Import State checks - import by workspace_id,name (dynamic)
{
ImportState: true,
ResourceName: workPoolResourceName2,
ImportStateIdFunc: getWorkPoolImportStateID(workPoolResourceName2, workspaceResourceName),
ImportStateVerify: true,
ImportState: true,
ResourceName: workPoolResourceName2,
ImportStateIdFunc: getWorkPoolImportStateID(workPoolResourceName2, workspaceResourceName),
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"base_job_template"}, // we've already tested this, and we can't provide our unique equality check here
},
},
})
Expand Down Expand Up @@ -146,6 +171,11 @@ func testAccCheckWorkPoolValues(fetchedWorkPool *api.WorkPool, valuesToCheck *ap
return fmt.Errorf("Expected work pool paused to be %t, got %t", valuesToCheck.IsPaused, fetchedWorkPool.IsPaused)
}

equal, diffs := helpers.ObjectsEqual(valuesToCheck.BaseJobTemplate, fetchedWorkPool.BaseJobTemplate)
if !equal {
return fmt.Errorf("Found unexpected differences in work pool base_job_template: %s", strings.Join(diffs, "\n"))
}

return nil
}
}
Expand Down Expand Up @@ -201,3 +231,59 @@ func getWorkPoolImportStateID(workPoolResourceName string, workspaceResourceName
return fmt.Sprintf("%s,%s", workspaceID, workPoolName), nil
}
}

var baseJobTemplateTpl = `
{
"job_configuration": {
"command": "{{ command }}",
"env": "{{ env }}",
"labels": "{{ labels }}",
"name": "{{ name }}",
"stream_output": "{{ stream_output }}",
"working_dir": "{{ working_dir }}"
},
"variables": {
"type": "object",
"properties": {
"name": {
"title": "Name",
"description": "%s",
"type": "string"
},
"env": {
"title": "Environment Variables",
"description": "Environment variables to set when starting a flow run.",
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"labels": {
"title": "Labels",
"description": "Labels applied to infrastructure created by a worker.",
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"command": {
"title": "Command",
"description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker.",
"type": "string"
},
"stream_output": {
"title": "Stream Output",
"description": "If enabled, workers will stream output from flow run processes to local standard output.",
"default": true,
"type": "boolean"
},
"working_dir": {
"title": "Working Directory",
"description": "If provided, workers will open flow run processes within the specified path as the working directory. Otherwise, a temporary directory will be created.",
"type": "string",
"format": "path"
}
}
}
}
`

0 comments on commit d6d8355

Please sign in to comment.