Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:

- name: Start containerized server and dependencies
env:
TEMPORAL_CLI_VERSION: 1.4.0
TEMPORAL_CLI_VERSION: 1.4.1-cloud-v1-29-0-139-2.0
run: |
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
tar -xzf temporal_cli.tar.gz
Expand Down
81 changes: 76 additions & 5 deletions temporal-sdk/src/main/java/io/temporal/common/Priority.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ public static Priority getDefaultInstance() {

public static final class Builder {
private int priorityKey;
private String fairnessKey;
private float fairnessWeight;

private Builder(Priority options) {
if (options == null) {
return;
}
this.priorityKey = options.getPriorityKey();
this.fairnessKey = options.getFairnessKey();
this.fairnessWeight = options.getFairnessWeight();
}

/**
Expand All @@ -55,16 +59,55 @@ public Builder setPriorityKey(int priorityKey) {
return this;
}

/**
* FairnessKey is a short string that's used as a key for a fairness balancing mechanism. It may
* correspond to a tenant id, or to a fixed string like "high" or "low". The default is the
* empty string.
*
* <p>>The fairness mechanism attempts to dispatch tasks for a given key in proportion to its
* weight. For example, using a thousand distinct tenant ids, each with a weight of 1.0 (the
* default) will result in each tenant getting a roughly equal share of task dispatch
* throughput.
*
* <p>Fairness keys are limited to 64 bytes.
*/
public Builder setFairnessKey(String fairnessKey) {
this.fairnessKey = fairnessKey;
return this;
}

/**
* FairnessWeight for a task can come from multiple sources for flexibility. From highest to
* lowest precedence:
*
* <ul>
* <li>Weights for a small set of keys can be overridden in task queue configuration with an
* API.
* <li>It can be attached to the workflow/activity in this field.
* <li>The default weight of 1.0 will be used.
* </ul>
*
* <p>Weight values are clamped to the range [0.001, 1000].
*/
public Builder setFairnessWeight(float fairnessWeight) {
this.fairnessWeight = fairnessWeight;
return this;
}

public Priority build() {
return new Priority(priorityKey);
return new Priority(priorityKey, fairnessKey, fairnessWeight);
}
}

private Priority(int priorityKey) {
private Priority(int priorityKey, String fairnessKey, float fairnessWeight) {
this.priorityKey = priorityKey;
this.fairnessKey = fairnessKey;
this.fairnessWeight = fairnessWeight;
}

private final int priorityKey;
private final String fairnessKey;
private final float fairnessWeight;

/**
* See {@link Builder#setPriorityKey(int)}
Expand All @@ -75,20 +118,48 @@ public int getPriorityKey() {
return priorityKey;
}

/**
* See {@link Builder#setFairnessKey(String)}
*
* @return The fairness key
*/
public String getFairnessKey() {
return fairnessKey;
}

/**
* See {@link Builder#setFairnessWeight(float)}
*
* @return The fairness weight
*/
public float getFairnessWeight() {
return fairnessWeight;
}

@Override
public String toString() {
return "Priority{" + "priorityKey=" + priorityKey + '}';
return "Priority{"
+ "priorityKey="
+ priorityKey
+ ", fairnessKey='"
+ fairnessKey
+ '\''
+ ", fairnessWeight="
+ fairnessWeight
+ '}';
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Priority priority = (Priority) o;
return priorityKey == priority.priorityKey;
return priorityKey == priority.priorityKey
&& Float.compare(priority.fairnessWeight, fairnessWeight) == 0
&& Objects.equals(fairnessKey, priority.fairnessKey);
}

@Override
public int hashCode() {
return Objects.hashCode(priorityKey);
return Objects.hash(priorityKey, fairnessKey, fairnessWeight);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,19 @@ private RuntimeException failureToExceptionImpl(Failure failure, DataConverter d
case NEXUS_OPERATION_EXECUTION_FAILURE_INFO:
{
NexusOperationFailureInfo info = failure.getNexusOperationExecutionFailureInfo();
return new NexusOperationFailure(
failure.getMessage(),
info.getScheduledEventId(),
info.getEndpoint(),
info.getService(),
info.getOperation(),
info.getOperationToken().isEmpty() ? info.getOperationId() : info.getOperationToken(),
cause);
@SuppressWarnings("deprecation")
NexusOperationFailure f =
new NexusOperationFailure(
failure.getMessage(),
info.getScheduledEventId(),
info.getEndpoint(),
info.getService(),
info.getOperation(),
info.getOperationToken().isEmpty()
? info.getOperationId()
: info.getOperationToken(),
cause);
return f;
}
case NEXUS_HANDLER_FAILURE_INFO:
{
Expand Down Expand Up @@ -307,6 +312,7 @@ private Failure exceptionToFailure(Throwable throwable) {
failure.setCanceledFailureInfo(info);
} else if (throwable instanceof NexusOperationFailure) {
NexusOperationFailure no = (NexusOperationFailure) throwable;
@SuppressWarnings("deprecation")
NexusOperationFailureInfo.Builder op =
NexusOperationFailureInfo.newBuilder()
.setScheduledEventId(no.getScheduledEventId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,27 @@

public class ProtoConverters {
public static Priority toProto(io.temporal.common.Priority priority) {
return Priority.newBuilder().setPriorityKey(priority.getPriorityKey()).build();
Priority.Builder builder = Priority.newBuilder().setPriorityKey(priority.getPriorityKey());
if (priority.getFairnessKey() != null) {
builder.setFairnessKey(priority.getFairnessKey());
}
if (priority.getFairnessWeight() != 0.0f) {
builder.setFairnessWeight(priority.getFairnessWeight());
}
return builder.build();
}

@Nonnull
public static io.temporal.common.Priority fromProto(@Nonnull Priority priority) {
return io.temporal.common.Priority.newBuilder()
.setPriorityKey(priority.getPriorityKey())
.build();
io.temporal.common.Priority.Builder builder =
io.temporal.common.Priority.newBuilder().setPriorityKey(priority.getPriorityKey());
if (!priority.getFairnessKey().isEmpty()) {
builder.setFairnessKey(priority.getFairnessKey());
}
if (priority.getFairnessWeight() != 0.0f) {
builder.setFairnessWeight(priority.getFairnessWeight());
}
return builder.build();
}

public static io.temporal.api.deployment.v1.WorkerDeploymentVersion toProto(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ private CancelOperationResponse handleCancelledOperation(
OperationContext.Builder ctx, CancelOperationRequest task) {
ctx.setService(task.getService()).setOperation(task.getOperation());

@SuppressWarnings("deprecation") // getOperationId kept to support old server for a while
OperationCancelDetails operationCancelDetails =
OperationCancelDetails.newBuilder()
.setOperationToken(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ public void testPriority() {
TestWorkflow1.class,
WorkflowOptions.newBuilder()
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setPriority(Priority.newBuilder().setPriorityKey(5).build())
.setPriority(
Priority.newBuilder()
.setPriorityKey(5)
.setFairnessKey("tenant-123")
.setFairnessWeight(2.5f)
.build())
.build());
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
assertEquals("5", result);
assertEquals("5:tenant-123:2.5", result);
}

@ActivityInterface
Expand All @@ -47,15 +52,18 @@ public interface PriorityActivities {
public static class PriorityActivitiesImpl implements PriorityActivities {
@Override
public String activity1(String a1) {
return String.valueOf(
Activity.getExecutionContext().getInfo().getPriority().getPriorityKey());
Priority priority = Activity.getExecutionContext().getInfo().getPriority();
String key = priority.getFairnessKey() != null ? priority.getFairnessKey() : "null";
return priority.getPriorityKey() + ":" + key + ":" + priority.getFairnessWeight();
}
}

public static class TestPriorityChildWorkflow implements TestWorkflows.TestWorkflowReturnString {
@Override
public String execute() {
return String.valueOf(Workflow.getInfo().getPriority().getPriorityKey());
Priority priority = Workflow.getInfo().getPriority();
String key = priority.getFairnessKey() != null ? priority.getFairnessKey() : "null";
return priority.getPriorityKey() + ":" + key + ":" + priority.getFairnessWeight();
}
}

Expand All @@ -70,12 +78,17 @@ public String execute(String taskQueue) {
ActivityOptions.newBuilder()
.setTaskQueue(taskQueue)
.setStartToCloseTimeout(Duration.ofSeconds(10))
.setPriority(Priority.newBuilder().setPriorityKey(3).build())
.setPriority(
Priority.newBuilder()
.setPriorityKey(3)
.setFairnessKey("override")
.setFairnessWeight(1.5f)
.build())
.setDisableEagerExecution(true)
.build())
.activity1("1");
Assert.assertEquals("3", priority);
// Test that of if no priority is set the workflows priority is used
Assert.assertEquals("3:override:1.5", priority);
// Test that if no priority is set the workflow's priority is used
priority =
Workflow.newActivityStub(
PriorityActivities.class,
Expand All @@ -85,46 +98,37 @@ public String execute(String taskQueue) {
.setDisableEagerExecution(true)
.build())
.activity1("2");
Assert.assertEquals("5", priority);
// Test that of if a default priority is set the workflows priority is used
priority =
Workflow.newActivityStub(
PriorityActivities.class,
ActivityOptions.newBuilder()
.setTaskQueue(taskQueue)
.setStartToCloseTimeout(Duration.ofSeconds(10))
.setPriority(Priority.newBuilder().build())
.setDisableEagerExecution(true)
.build())
.activity1("2");
Assert.assertEquals("5", priority);
Assert.assertEquals("5:tenant-123:2.5", priority);
// Test that the priority is passed to child workflows
priority =
Workflow.newChildWorkflowStub(
TestWorkflows.TestWorkflowReturnString.class,
ChildWorkflowOptions.newBuilder()
.setPriority(Priority.newBuilder().setPriorityKey(1).build())
.setPriority(
Priority.newBuilder()
.setPriorityKey(1)
.setFairnessKey("child")
.setFairnessWeight(0.5f)
.build())
.build())
.execute();
Assert.assertEquals("1", priority);
// Test that of no priority is set the workflows priority is used
Assert.assertEquals("1:child:0.5", priority);
// Test that if no priority is set the workflow's priority is used
priority =
Workflow.newChildWorkflowStub(
TestWorkflows.TestWorkflowReturnString.class,
ChildWorkflowOptions.newBuilder().build())
.execute();
Assert.assertEquals("5", priority);
// Test that if a default priority is set the workflows priority is used
priority =
Workflow.newChildWorkflowStub(
TestWorkflows.TestWorkflowReturnString.class,
ChildWorkflowOptions.newBuilder()
.setPriority(Priority.newBuilder().build())
.build())
.execute();
Assert.assertEquals("5", priority);
// Return the workflows priority
return String.valueOf(Workflow.getInfo().getPriority().getPriorityKey());
Assert.assertEquals("5:tenant-123:2.5", priority);
// Return the workflow's priority
Priority workflowPriority = Workflow.getInfo().getPriority();
String key =
workflowPriority.getFairnessKey() != null ? workflowPriority.getFairnessKey() : "null";
return workflowPriority.getPriorityKey()
+ ":"
+ key
+ ":"
+ workflowPriority.getFairnessWeight();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2563,9 +2563,17 @@ static Priority mergePriorities(Priority parent, Priority child) {
}
Priority.Builder result = Priority.newBuilder();
result.setPriorityKey(parent.getPriorityKey());
result.setFairnessKey(child.getFairnessKey());
result.setFairnessWeight(child.getFairnessWeight());
if (child.getPriorityKey() != 0) {
result.setPriorityKey(child.getPriorityKey());
}
if (!child.getFairnessKey().isEmpty()) {
result.setFairnessKey(child.getFairnessKey());
}
if (child.getFairnessWeight() != 0) {
result.setFairnessWeight(child.getFairnessWeight());
}
return result.build();
}
}
Loading