Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(concurrency limits): rename and provide examples of task run concurrency limits #362

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions docs/resources/concurrency_limit.md

This file was deleted.

65 changes: 65 additions & 0 deletions docs/resources/task_run_concurrency_limit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "prefect_task_run_concurrency_limit Resource - prefect"
subcategory: ""
description: |-
The resource task_run_concurrency_limit represents a task run concurrency limit. Task Run concurrency limits allow you to control how many tasks with specific tags can run simultaneously.
---

# prefect_task_run_concurrency_limit (Resource)

The resource `task_run_concurrency_limit` represents a task run concurrency limit. Task Run concurrency limits allow you to control how many tasks with specific tags can run simultaneously.

## Example Usage

```terraform
provider "prefect" {}

data "prefect_workspace" "test" {
handle = "my-workspace"
}

resource "prefect_task_run_concurrency_limit" "test" {
workspace_id = data.prefect_workspace.test.id
concurrency_limit = 1
tag = "test-tag"
}

# Example of a task that will be limited to 1 concurrent run:
/*
from prefect import flow, task

# This task will be limited to 1 concurrent run
@task(tags=["test-tag"])
def my_task():
print("Hello, I'm a task")


@flow
def my_flow():
my_task()


if __name__ == "__main__":
my_flow()
*/
```

<!-- schema generated by tfplugindocs -->
## Schema

### Required

- `concurrency_limit` (Number) The task run concurrency limit.
- `tag` (String) A tag the task run concurrency limit is applied to.

### Optional

- `account_id` (String) Account ID (UUID)
- `workspace_id` (String) Workspace ID (UUID)

### Read-Only

- `created` (String) Timestamp of when the resource was created (RFC3339)
- `id` (String) Task run concurrency limit ID (UUID)
- `updated` (String) Timestamp of when the resource was updated (RFC3339)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
provider "prefect" {}

data "prefect_workspace" "test" {
handle = "my-workspace"
}

resource "prefect_task_run_concurrency_limit" "test" {
workspace_id = data.prefect_workspace.test.id
concurrency_limit = 1
tag = "test-tag"
}

# Example of a task that will be limited to 1 concurrent run:
/*
from prefect import flow, task

# This task will be limited to 1 concurrent run
@task(tags=["test-tag"])
def my_task():
print("Hello, I'm a task")


@flow
def my_flow():
my_task()


if __name__ == "__main__":
my_flow()
*/
2 changes: 1 addition & 1 deletion internal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ type PrefectClient interface {
BlockSchemas(accountID uuid.UUID, workspaceID uuid.UUID) (BlockSchemaClient, error)
BlockTypes(accountID uuid.UUID, workspaceID uuid.UUID) (BlockTypeClient, error)
Collections(accountID uuid.UUID, workspaceID uuid.UUID) (CollectionsClient, error)
ConcurrencyLimits(accountID uuid.UUID, workspaceID uuid.UUID) (ConcurrencyLimitsClient, error)
Deployments(accountID uuid.UUID, workspaceID uuid.UUID) (DeploymentsClient, error)
DeploymentAccess(accountID uuid.UUID, workspaceID uuid.UUID) (DeploymentAccessClient, error)
DeploymentSchedule(accountID uuid.UUID, workspaceID uuid.UUID) (DeploymentScheduleClient, error)
Teams(accountID uuid.UUID) (TeamsClient, error)
Flows(accountID uuid.UUID, workspaceID uuid.UUID) (FlowsClient, error)
TaskRunConcurrencyLimits(accountID uuid.UUID, workspaceID uuid.UUID) (TaskRunConcurrencyLimitsClient, error)
Workspaces(accountID uuid.UUID) (WorkspacesClient, error)
WorkspaceAccess(accountID uuid.UUID, workspaceID uuid.UUID) (WorkspaceAccessClient, error)
WorkspaceRoles(accountID uuid.UUID) (WorkspaceRolesClient, error)
Expand Down
25 changes: 0 additions & 25 deletions internal/api/concurrency_limits.go

This file was deleted.

25 changes: 25 additions & 0 deletions internal/api/task_run_concurrency_limits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package api

import (
"context"
)

// TaskRunConcurrencyLimitsClient is a client for working with task run concurrency limits (in the api this is named "concurrency_limit").
type TaskRunConcurrencyLimitsClient interface {
Create(ctx context.Context, taskRunConcurrencyLimit TaskRunConcurrencyLimitCreate) (*TaskRunConcurrencyLimit, error)
Read(ctx context.Context, taskRunConcurrencyLimitID string) (*TaskRunConcurrencyLimit, error)
Delete(ctx context.Context, taskRunConcurrencyLimitID string) error
}

// TaskRunConcurrencyLimit is a representation of a task run concurrency limit.
type TaskRunConcurrencyLimit struct {
BaseModel
Tag string `json:"tag"`
ConcurrencyLimit int64 `json:"concurrency_limit"`
}

// TaskRunConcurrencyLimitCreate is a subset of TaskRunConcurrencyLimit used when creating task run concurrency limits.
type TaskRunConcurrencyLimitCreate struct {
Tag string `json:"tag"`
ConcurrencyLimit int64 `json:"concurrency_limit"`
}
95 changes: 0 additions & 95 deletions internal/client/concurrency_limits.go

This file was deleted.

95 changes: 95 additions & 0 deletions internal/client/task_run_concurrency_limits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package client

import (
"context"
"fmt"
"net/http"

"github.com/google/uuid"
"github.com/prefecthq/terraform-provider-prefect/internal/api"
)

var _ = api.TaskRunConcurrencyLimitsClient(&TaskRunConcurrencyLimitsClient{})

// TaskRunConcurrencyLimitsClient is a client for working with task run concurrency limits.
type TaskRunConcurrencyLimitsClient struct {
hc *http.Client
routePrefix string
apiKey string
}

// TaskRunConcurrencyLimits returns a TaskRunConcurrencyLimitsClient.
//
//nolint:ireturn // required to support PrefectClient mocking
func (c *Client) TaskRunConcurrencyLimits(accountID uuid.UUID, workspaceID uuid.UUID) (api.TaskRunConcurrencyLimitsClient, error) {
if accountID == uuid.Nil {
accountID = c.defaultAccountID
}

if workspaceID == uuid.Nil {
workspaceID = c.defaultWorkspaceID
}

if err := validateCloudEndpoint(c.endpoint, accountID, workspaceID); err != nil {
return nil, err
}

return &TaskRunConcurrencyLimitsClient{
hc: c.hc,
routePrefix: getWorkspaceScopedURL(c.endpoint, accountID, workspaceID, "concurrency_limits"),
apiKey: c.apiKey,
}, nil
}

// Create creates a new task run concurrency limit.
func (c *TaskRunConcurrencyLimitsClient) Create(ctx context.Context, data api.TaskRunConcurrencyLimitCreate) (*api.TaskRunConcurrencyLimit, error) {
cfg := requestConfig{
method: http.MethodPost,
url: c.routePrefix + "/",
body: &data,
apiKey: c.apiKey,
successCodes: successCodesStatusOK,
}

var taskRunConcurrencyLimit api.TaskRunConcurrencyLimit
if err := requestWithDecodeResponse(ctx, c.hc, cfg, &taskRunConcurrencyLimit); err != nil {
return nil, fmt.Errorf("failed to create task run concurrency limit: %w", err)
}

return &taskRunConcurrencyLimit, nil
}

// Read returns a task run concurrency limit.
func (c *TaskRunConcurrencyLimitsClient) Read(ctx context.Context, taskRunConcurrencyLimitID string) (*api.TaskRunConcurrencyLimit, error) {
cfg := requestConfig{
method: http.MethodGet,
url: fmt.Sprintf("%s/%s", c.routePrefix, taskRunConcurrencyLimitID),
apiKey: c.apiKey,
successCodes: successCodesStatusOK,
}

var taskRunConcurrencyLimit api.TaskRunConcurrencyLimit
if err := requestWithDecodeResponse(ctx, c.hc, cfg, &taskRunConcurrencyLimit); err != nil {
return nil, fmt.Errorf("failed to get task run concurrency limit: %w", err)
}

return &taskRunConcurrencyLimit, nil
}

// Delete deletes a task run concurrency limit.
func (c *TaskRunConcurrencyLimitsClient) Delete(ctx context.Context, taskRunConcurrencyLimitID string) error {
cfg := requestConfig{
method: http.MethodDelete,
url: fmt.Sprintf("%s/%s", c.routePrefix, taskRunConcurrencyLimitID),
apiKey: c.apiKey,
successCodes: successCodesStatusOK,
}

resp, err := request(ctx, c.hc, cfg)
if err != nil {
return fmt.Errorf("failed to delete task run concurrency limit: %w", err)
}
defer resp.Body.Close()

return nil
}
Loading
Loading