Skip to content

Commit

Permalink
feat(global concurrency limit): Resource Support (#363)
Browse files Browse the repository at this point in the history
* Rename and refactor the refs

* Fix provider name

* Docs

* Add example task run concurrency limit

* Correct the example naming

* Give ref to python example

* Include example in doc

* Global concurrency limit

* Current state

* Current state

* Update and import state fixes

* Add images

* Add example

* Add back images again

* Fix example

* Update internal/provider/resources/global_concurrency_limit.go

Co-authored-by: Mitchell Nielsen <[email protected]>

* Update internal/provider/resources/global_concurrency_limit.go

Co-authored-by: Mitchell Nielsen <[email protected]>

* Add back denied slots

* Re-remove denied_slots

* Images

* Generate Terraform Docs

---------

Co-authored-by: Mitchell Nielsen <[email protected]>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 31, 2025
1 parent 70d957a commit d1e724b
Show file tree
Hide file tree
Showing 9 changed files with 665 additions and 22 deletions.
52 changes: 52 additions & 0 deletions docs/resources/global_concurrency_limit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "prefect_global_concurrency_limit Resource - prefect"
subcategory: ""
description: |-
The resource global_concurrency_limit represents a global concurrency limit. Global concurrency limits allow you to control how many tasks can run simultaneously across all workspaces. For more information, see https://docs.prefect.io/v3/develop/global-concurrency-limits.
---

# prefect_global_concurrency_limit (Resource)

The resource `global_concurrency_limit` represents a global concurrency limit. Global concurrency limits allow you to control how many tasks can run simultaneously across all workspaces. For more information, see https://docs.prefect.io/v3/develop/global-concurrency-limits.

## Example Usage

```terraform
provider "prefect" {}
data "prefect_workspace" "test" {
handle = "my-workspace"
}
resource "prefect_global_concurrency_limit" "test" {
workspace_id = data.prefect_workspace.test.id
name = "test-global-concurrency-limit"
limit = 1
active = true
active_slots = 0
slot_decay_per_second = 1.5
}
```

<!-- schema generated by tfplugindocs -->
## Schema

### Required

- `limit` (Number) The maximum number of tasks that can run simultaneously.
- `name` (String) The name of the global concurrency limit.

### Optional

- `account_id` (String) Account ID (UUID)
- `active` (Boolean) Whether the global concurrency limit is active.
- `active_slots` (Number) The number of active slots.
- `slot_decay_per_second` (Number) Slot Decay Per Second (number or null)
- `workspace_id` (String) Workspace ID (UUID)

### Read-Only

- `created` (String) Timestamp of when the resource was created (RFC3339)
- `id` (String) Global concurrency limit ID (UUID)
- `updated` (String) Timestamp of when the resource was updated (RFC3339)
14 changes: 14 additions & 0 deletions examples/resources/prefect_global_concurrency_limit/resource.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
provider "prefect" {}

data "prefect_workspace" "test" {
handle = "my-workspace"
}

resource "prefect_global_concurrency_limit" "test" {
workspace_id = data.prefect_workspace.test.id
name = "test-global-concurrency-limit"
limit = 1
active = true
active_slots = 0
slot_decay_per_second = 1.5
}
1 change: 1 addition & 0 deletions internal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type PrefectClient interface {
Deployments(accountID uuid.UUID, workspaceID uuid.UUID) (DeploymentsClient, error)
DeploymentAccess(accountID uuid.UUID, workspaceID uuid.UUID) (DeploymentAccessClient, error)
DeploymentSchedule(accountID uuid.UUID, workspaceID uuid.UUID) (DeploymentScheduleClient, error)
GlobalConcurrencyLimits(accountID uuid.UUID, workspaceID uuid.UUID) (GlobalConcurrencyLimitsClient, error)
Teams(accountID uuid.UUID) (TeamsClient, error)
Flows(accountID uuid.UUID, workspaceID uuid.UUID) (FlowsClient, error)
TaskRunConcurrencyLimits(accountID uuid.UUID, workspaceID uuid.UUID) (TaskRunConcurrencyLimitsClient, error)
Expand Down
44 changes: 22 additions & 22 deletions internal/api/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,26 @@ type Deployment struct {
AccountID uuid.UUID `json:"account_id"`
WorkspaceID uuid.UUID `json:"workspace_id"`

ConcurrencyLimit *int64 `json:"concurrency_limit"`
ConcurrencyOptions *ConcurrencyOptions `json:"concurrency_options"`
Description string `json:"description"`
EnforceParameterSchema bool `json:"enforce_parameter_schema"`
Entrypoint string `json:"entrypoint"`
FlowID uuid.UUID `json:"flow_id"`
GlobalConcurrencyLimit *GlobalConcurrencyLimit `json:"global_concurrency_limit"`
JobVariables map[string]interface{} `json:"job_variables"`
ManifestPath string `json:"manifest_path"`
Name string `json:"name"`
ParameterOpenAPISchema map[string]interface{} `json:"parameter_openapi_schema"`
Parameters map[string]interface{} `json:"parameters"`
Path string `json:"path"`
Paused bool `json:"paused"`
PullSteps []PullStep `json:"pull_steps"`
StorageDocumentID uuid.UUID `json:"storage_document_id"`
Tags []string `json:"tags"`
Version string `json:"version"`
WorkPoolName string `json:"work_pool_name"`
WorkQueueName string `json:"work_queue_name"`
ConcurrencyLimit *int64 `json:"concurrency_limit"`
ConcurrencyOptions *ConcurrencyOptions `json:"concurrency_options"`
Description string `json:"description"`
EnforceParameterSchema bool `json:"enforce_parameter_schema"`
Entrypoint string `json:"entrypoint"`
FlowID uuid.UUID `json:"flow_id"`
GlobalConcurrencyLimit *CurrentGlobalConcurrencyLimit `json:"global_concurrency_limit"`
JobVariables map[string]interface{} `json:"job_variables"`
ManifestPath string `json:"manifest_path"`
Name string `json:"name"`
ParameterOpenAPISchema map[string]interface{} `json:"parameter_openapi_schema"`
Parameters map[string]interface{} `json:"parameters"`
Path string `json:"path"`
Paused bool `json:"paused"`
PullSteps []PullStep `json:"pull_steps"`
StorageDocumentID uuid.UUID `json:"storage_document_id"`
Tags []string `json:"tags"`
Version string `json:"version"`
WorkPoolName string `json:"work_pool_name"`
WorkQueueName string `json:"work_queue_name"`
}

// DeploymentCreate is a subset of Deployment used when creating deployments.
Expand Down Expand Up @@ -91,8 +91,8 @@ type ConcurrencyOptions struct {
CollisionStrategy string `json:"collision_strategy"`
}

// GlobalConcurrencyLimit is a representation of the deployment global concurrency limit.
type GlobalConcurrencyLimit struct {
// CurrentGlobalConcurrencyLimit is a representation of the deployment global concurrency limit.
type CurrentGlobalConcurrencyLimit struct {
Limit int64 `json:"limit"`

// These other fields exist in the response payload, but we don't make use of them at the
Expand Down
48 changes: 48 additions & 0 deletions internal/api/global_concurrency_limits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package api

import (
"context"

"github.com/google/uuid"
)

// GlobalConcurrencyLimitsClient is a client for working with global concurrency limits.
type GlobalConcurrencyLimitsClient interface {
Create(ctx context.Context, globalConcurrencyLimit GlobalConcurrencyLimitCreate) (*GlobalConcurrencyLimit, error)
Read(ctx context.Context, globalConcurrencyLimitID string) (*GlobalConcurrencyLimit, error)
Update(ctx context.Context, globalConcurrencyLimitID string, globalConcurrencyLimit GlobalConcurrencyLimitUpdate) error
Delete(ctx context.Context, globalConcurrencyLimitID string) error
}

// GlobalConcurrencyLimit is a representation of a global concurrency limit.
type GlobalConcurrencyLimit struct {
BaseModel
Active bool `json:"active"`
Name string `json:"name"`
Limit int64 `json:"limit"`
ActiveSlots int64 `json:"active_slots"`
SlotDecayPerSecond float64 `json:"slot_decay_per_second"`
}

// GlobalConcurrencyLimitCreate is a subset of GlobalConcurrencyLimit used when creating global concurrency limits.
type GlobalConcurrencyLimitCreate struct {
Active bool `json:"active"`
Name string `json:"name"`
Limit int64 `json:"limit"`
ActiveSlots int64 `json:"active_slots"`
SlotDecayPerSecond float64 `json:"slot_decay_per_second"`
}

// GlobalConcurrencyLimitUpdate is a subset of GlobalConcurrencyLimit used when updating global concurrency limits.
type GlobalConcurrencyLimitUpdate struct {
Active bool `json:"active"`
Name string `json:"name"`
Limit int64 `json:"limit"`
ActiveSlots int64 `json:"active_slots"`
SlotDecayPerSecond float64 `json:"slot_decay_per_second"`
}

// GlobalConcurrencyLimitFilter is a filter for global concurrency limits.
type GlobalConcurrencyLimitFilter struct {
Any []uuid.UUID `json:"any_"`
}
114 changes: 114 additions & 0 deletions internal/client/global_concurrency_limits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package client

import (
"context"
"fmt"
"net/http"

"github.com/google/uuid"
"github.com/prefecthq/terraform-provider-prefect/internal/api"
)

var _ = api.GlobalConcurrencyLimitsClient(&GlobalConcurrencyLimitsClient{})

// GlobalConcurrencyLimitsClient is a client for working with global concurrency limits.
type GlobalConcurrencyLimitsClient struct {
hc *http.Client
routePrefix string
apiKey string
}

// GlobalConcurrencyLimits returns a GlobalConcurrencyLimitsClient.
//
//nolint:ireturn // required to support PrefectClient mocking
func (c *Client) GlobalConcurrencyLimits(accountID uuid.UUID, workspaceID uuid.UUID) (api.GlobalConcurrencyLimitsClient, error) {
if accountID == uuid.Nil {
accountID = c.defaultAccountID
}

if workspaceID == uuid.Nil {
workspaceID = c.defaultWorkspaceID
}

if err := validateCloudEndpoint(c.endpoint, accountID, workspaceID); err != nil {
return nil, err
}

return &GlobalConcurrencyLimitsClient{
hc: c.hc,
routePrefix: getWorkspaceScopedURL(c.endpoint, accountID, workspaceID, "v2/concurrency_limits"),
apiKey: c.apiKey,
}, nil
}

// Create creates a new global concurrency limit.
func (c *GlobalConcurrencyLimitsClient) Create(ctx context.Context, data api.GlobalConcurrencyLimitCreate) (*api.GlobalConcurrencyLimit, error) {
cfg := requestConfig{
method: http.MethodPost,
url: c.routePrefix + "/",
body: &data,
apiKey: c.apiKey,
successCodes: successCodesStatusCreated,
}

var globalConcurrencyLimit api.GlobalConcurrencyLimit
if err := requestWithDecodeResponse(ctx, c.hc, cfg, &globalConcurrencyLimit); err != nil {
return nil, fmt.Errorf("failed to create global concurrency limit: %w", err)
}

return &globalConcurrencyLimit, nil
}

// Read returns a global concurrency limit.
func (c *GlobalConcurrencyLimitsClient) Read(ctx context.Context, globalConcurrencyLimitID string) (*api.GlobalConcurrencyLimit, error) {
cfg := requestConfig{
method: http.MethodGet,
url: fmt.Sprintf("%s/%s", c.routePrefix, globalConcurrencyLimitID),
apiKey: c.apiKey,
successCodes: successCodesStatusOK,
}

var globalConcurrencyLimit api.GlobalConcurrencyLimit
if err := requestWithDecodeResponse(ctx, c.hc, cfg, &globalConcurrencyLimit); err != nil {
return nil, fmt.Errorf("failed to get global concurrency limit: %w", err)
}

return &globalConcurrencyLimit, nil
}

// Update updates a global concurrency limit.
func (c *GlobalConcurrencyLimitsClient) Update(ctx context.Context, globalConcurrencyLimitID string, data api.GlobalConcurrencyLimitUpdate) error {
cfg := requestConfig{
method: http.MethodPatch,
url: fmt.Sprintf("%s/%s", c.routePrefix, globalConcurrencyLimitID),
body: &data,
apiKey: c.apiKey,
successCodes: successCodesStatusNoContent,
}

resp, err := request(ctx, c.hc, cfg)
if err != nil {
return fmt.Errorf("failed to update global concurrency limit: %w", err)
}
defer resp.Body.Close()

return nil
}

// Delete deletes a global concurrency limit.
func (c *GlobalConcurrencyLimitsClient) Delete(ctx context.Context, globalConcurrencyLimitID string) error {
cfg := requestConfig{
method: http.MethodDelete,
url: fmt.Sprintf("%s/%s", c.routePrefix, globalConcurrencyLimitID),
apiKey: c.apiKey,
successCodes: successCodesStatusNoContent,
}

resp, err := request(ctx, c.hc, cfg)
if err != nil {
return fmt.Errorf("failed to delete global concurrency limit: %w", err)
}
defer resp.Body.Close()

return nil
}
1 change: 1 addition & 0 deletions internal/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func (p *PrefectProvider) Resources(_ context.Context) []func() resource.Resourc
resources.NewDeploymentResource,
resources.NewDeploymentScheduleResource,
resources.NewFlowResource,
resources.NewGlobalConcurrencyLimitResource,
resources.NewServiceAccountResource,
resources.NewTaskRunConcurrencyLimitResource,
resources.NewVariableResource,
Expand Down
Loading

0 comments on commit d1e724b

Please sign in to comment.