Skip to content

Commit d8f61d5

Browse files
committed
Add fairness keys/weights
1 parent 5b2856a commit d8f61d5

File tree

3 files changed

+122
-45
lines changed

3 files changed

+122
-45
lines changed

temporal-sdk/src/main/java/io/temporal/common/Priority.java

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,16 @@ public static Priority getDefaultInstance() {
3131

3232
public static final class Builder {
3333
private int priorityKey;
34+
private String fairnessKey;
35+
private float fairnessWeight;
3436

3537
private Builder(Priority options) {
3638
if (options == null) {
3739
return;
3840
}
3941
this.priorityKey = options.getPriorityKey();
42+
this.fairnessKey = options.getFairnessKey();
43+
this.fairnessWeight = options.getFairnessWeight();
4044
}
4145

4246
/**
@@ -55,16 +59,44 @@ public Builder setPriorityKey(int priorityKey) {
5559
return this;
5660
}
5761

62+
/**
63+
* A fairness key is a short string used for balancing task dispatch. Tasks with the same
64+
* fairness key will be processed proportionally according to their fairness weight.
65+
*
66+
* <p>If not set, inherits from the parent workflow or uses an empty string if there is no
67+
* parent.
68+
*/
69+
public Builder setFairnessKey(String fairnessKey) {
70+
this.fairnessKey = fairnessKey;
71+
return this;
72+
}
73+
74+
/**
75+
* A fairness weight determines the relative proportion of task processing for a given fairness
76+
* key. The weight should be a positive number. A higher weight means more tasks will be
77+
* processed for that fairness key.
78+
*
79+
* <p>If not set or 0, defaults to 1.0. If there is a parent workflow, inherits from the parent.
80+
*/
81+
public Builder setFairnessWeight(float fairnessWeight) {
82+
this.fairnessWeight = fairnessWeight;
83+
return this;
84+
}
85+
5886
public Priority build() {
59-
return new Priority(priorityKey);
87+
return new Priority(priorityKey, fairnessKey, fairnessWeight);
6088
}
6189
}
6290

63-
private Priority(int priorityKey) {
91+
private Priority(int priorityKey, String fairnessKey, float fairnessWeight) {
6492
this.priorityKey = priorityKey;
93+
this.fairnessKey = fairnessKey;
94+
this.fairnessWeight = fairnessWeight;
6595
}
6696

6797
private final int priorityKey;
98+
private final String fairnessKey;
99+
private final float fairnessWeight;
68100

69101
/**
70102
* See {@link Builder#setPriorityKey(int)}
@@ -75,20 +107,48 @@ public int getPriorityKey() {
75107
return priorityKey;
76108
}
77109

110+
/**
111+
* See {@link Builder#setFairnessKey(String)}
112+
*
113+
* @return The fairness key
114+
*/
115+
public String getFairnessKey() {
116+
return fairnessKey;
117+
}
118+
119+
/**
120+
* See {@link Builder#setFairnessWeight(float)}
121+
*
122+
* @return The fairness weight
123+
*/
124+
public float getFairnessWeight() {
125+
return fairnessWeight;
126+
}
127+
78128
@Override
79129
public String toString() {
80-
return "Priority{" + "priorityKey=" + priorityKey + '}';
130+
return "Priority{"
131+
+ "priorityKey="
132+
+ priorityKey
133+
+ ", fairnessKey='"
134+
+ fairnessKey
135+
+ '\''
136+
+ ", fairnessWeight="
137+
+ fairnessWeight
138+
+ '}';
81139
}
82140

83141
@Override
84142
public boolean equals(Object o) {
85143
if (o == null || getClass() != o.getClass()) return false;
86144
Priority priority = (Priority) o;
87-
return priorityKey == priority.priorityKey;
145+
return priorityKey == priority.priorityKey
146+
&& Float.compare(priority.fairnessWeight, fairnessWeight) == 0
147+
&& Objects.equals(fairnessKey, priority.fairnessKey);
88148
}
89149

90150
@Override
91151
public int hashCode() {
92-
return Objects.hashCode(priorityKey);
152+
return Objects.hash(priorityKey, fairnessKey, fairnessWeight);
93153
}
94154
}

temporal-sdk/src/main/java/io/temporal/internal/common/ProtoConverters.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,27 @@
88

99
public class ProtoConverters {
1010
public static Priority toProto(io.temporal.common.Priority priority) {
11-
return Priority.newBuilder().setPriorityKey(priority.getPriorityKey()).build();
11+
Priority.Builder builder = Priority.newBuilder().setPriorityKey(priority.getPriorityKey());
12+
if (priority.getFairnessKey() != null) {
13+
builder.setFairnessKey(priority.getFairnessKey());
14+
}
15+
if (priority.getFairnessWeight() != 0.0f) {
16+
builder.setFairnessWeight(priority.getFairnessWeight());
17+
}
18+
return builder.build();
1219
}
1320

1421
@Nonnull
1522
public static io.temporal.common.Priority fromProto(@Nonnull Priority priority) {
16-
return io.temporal.common.Priority.newBuilder()
17-
.setPriorityKey(priority.getPriorityKey())
18-
.build();
23+
io.temporal.common.Priority.Builder builder =
24+
io.temporal.common.Priority.newBuilder().setPriorityKey(priority.getPriorityKey());
25+
if (!priority.getFairnessKey().isEmpty()) {
26+
builder.setFairnessKey(priority.getFairnessKey());
27+
}
28+
if (priority.getFairnessWeight() != 0.0f) {
29+
builder.setFairnessWeight(priority.getFairnessWeight());
30+
}
31+
return builder.build();
1932
}
2033

2134
public static io.temporal.api.deployment.v1.WorkerDeploymentVersion toProto(

temporal-sdk/src/test/java/io/temporal/workflow/PriorityInfoTest.java

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,15 @@ public void testPriority() {
3333
TestWorkflow1.class,
3434
WorkflowOptions.newBuilder()
3535
.setTaskQueue(testWorkflowRule.getTaskQueue())
36-
.setPriority(Priority.newBuilder().setPriorityKey(5).build())
36+
.setPriority(
37+
Priority.newBuilder()
38+
.setPriorityKey(5)
39+
.setFairnessKey("tenant-123")
40+
.setFairnessWeight(2.5f)
41+
.build())
3742
.build());
3843
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
39-
assertEquals("5", result);
44+
assertEquals("5:tenant-123:2.5", result);
4045
}
4146

4247
@ActivityInterface
@@ -47,15 +52,18 @@ public interface PriorityActivities {
4752
public static class PriorityActivitiesImpl implements PriorityActivities {
4853
@Override
4954
public String activity1(String a1) {
50-
return String.valueOf(
51-
Activity.getExecutionContext().getInfo().getPriority().getPriorityKey());
55+
Priority priority = Activity.getExecutionContext().getInfo().getPriority();
56+
String key = priority.getFairnessKey() != null ? priority.getFairnessKey() : "null";
57+
return priority.getPriorityKey() + ":" + key + ":" + priority.getFairnessWeight();
5258
}
5359
}
5460

5561
public static class TestPriorityChildWorkflow implements TestWorkflows.TestWorkflowReturnString {
5662
@Override
5763
public String execute() {
58-
return String.valueOf(Workflow.getInfo().getPriority().getPriorityKey());
64+
Priority priority = Workflow.getInfo().getPriority();
65+
String key = priority.getFairnessKey() != null ? priority.getFairnessKey() : "null";
66+
return priority.getPriorityKey() + ":" + key + ":" + priority.getFairnessWeight();
5967
}
6068
}
6169

@@ -70,12 +78,17 @@ public String execute(String taskQueue) {
7078
ActivityOptions.newBuilder()
7179
.setTaskQueue(taskQueue)
7280
.setStartToCloseTimeout(Duration.ofSeconds(10))
73-
.setPriority(Priority.newBuilder().setPriorityKey(3).build())
81+
.setPriority(
82+
Priority.newBuilder()
83+
.setPriorityKey(3)
84+
.setFairnessKey("override")
85+
.setFairnessWeight(1.5f)
86+
.build())
7487
.setDisableEagerExecution(true)
7588
.build())
7689
.activity1("1");
77-
Assert.assertEquals("3", priority);
78-
// Test that of if no priority is set the workflows priority is used
90+
Assert.assertEquals("3:override:1.5", priority);
91+
// Test that if no priority is set the workflow's priority is used
7992
priority =
8093
Workflow.newActivityStub(
8194
PriorityActivities.class,
@@ -85,46 +98,37 @@ public String execute(String taskQueue) {
8598
.setDisableEagerExecution(true)
8699
.build())
87100
.activity1("2");
88-
Assert.assertEquals("5", priority);
89-
// Test that of if a default priority is set the workflows priority is used
90-
priority =
91-
Workflow.newActivityStub(
92-
PriorityActivities.class,
93-
ActivityOptions.newBuilder()
94-
.setTaskQueue(taskQueue)
95-
.setStartToCloseTimeout(Duration.ofSeconds(10))
96-
.setPriority(Priority.newBuilder().build())
97-
.setDisableEagerExecution(true)
98-
.build())
99-
.activity1("2");
100-
Assert.assertEquals("5", priority);
101+
Assert.assertEquals("5:tenant-123:2.5", priority);
101102
// Test that the priority is passed to child workflows
102103
priority =
103104
Workflow.newChildWorkflowStub(
104105
TestWorkflows.TestWorkflowReturnString.class,
105106
ChildWorkflowOptions.newBuilder()
106-
.setPriority(Priority.newBuilder().setPriorityKey(1).build())
107+
.setPriority(
108+
Priority.newBuilder()
109+
.setPriorityKey(1)
110+
.setFairnessKey("child")
111+
.setFairnessWeight(0.5f)
112+
.build())
107113
.build())
108114
.execute();
109-
Assert.assertEquals("1", priority);
110-
// Test that of no priority is set the workflows priority is used
115+
Assert.assertEquals("1:child:0.5", priority);
116+
// Test that if no priority is set the workflow's priority is used
111117
priority =
112118
Workflow.newChildWorkflowStub(
113119
TestWorkflows.TestWorkflowReturnString.class,
114120
ChildWorkflowOptions.newBuilder().build())
115121
.execute();
116-
Assert.assertEquals("5", priority);
117-
// Test that if a default priority is set the workflows priority is used
118-
priority =
119-
Workflow.newChildWorkflowStub(
120-
TestWorkflows.TestWorkflowReturnString.class,
121-
ChildWorkflowOptions.newBuilder()
122-
.setPriority(Priority.newBuilder().build())
123-
.build())
124-
.execute();
125-
Assert.assertEquals("5", priority);
126-
// Return the workflows priority
127-
return String.valueOf(Workflow.getInfo().getPriority().getPriorityKey());
122+
Assert.assertEquals("5:tenant-123:2.5", priority);
123+
// Return the workflow's priority
124+
Priority workflowPriority = Workflow.getInfo().getPriority();
125+
String key =
126+
workflowPriority.getFairnessKey() != null ? workflowPriority.getFairnessKey() : "null";
127+
return workflowPriority.getPriorityKey()
128+
+ ":"
129+
+ key
130+
+ ":"
131+
+ workflowPriority.getFairnessWeight();
128132
}
129133
}
130134
}

0 commit comments

Comments
 (0)