From ba396780a64146a9d4092e4363bc9df5d851f711 Mon Sep 17 00:00:00 2001 From: taylanisikdemir Date: Fri, 12 Apr 2024 13:03:30 -0700 Subject: [PATCH] Switch async workflow request encoding from json to thrift (#5907) --- .../queue/consumer/default_consumer.go | 28 +++--- .../queue/consumer/default_consumer_test.go | 98 ++++++++++--------- ...rsion0Thriftrw.go => version0_thriftrw.go} | 0 ...ftrw_test.go => version0_thriftrw_test.go} | 28 ++++++ common/rpc.go | 30 ------ service/frontend/api/handler.go | 34 ++++--- service/frontend/api/handler_test.go | 12 +-- .../async_workflow_consumer_manager.go | 2 +- 8 files changed, 125 insertions(+), 107 deletions(-) rename common/codec/{version0Thriftrw.go => version0_thriftrw.go} (100%) rename common/codec/{version0Thriftrw_test.go => version0_thriftrw_test.go} (87%) diff --git a/common/asyncworkflow/queue/consumer/default_consumer.go b/common/asyncworkflow/queue/consumer/default_consumer.go index 6a58affc623..4c6fcbdc792 100644 --- a/common/asyncworkflow/queue/consumer/default_consumer.go +++ b/common/asyncworkflow/queue/consumer/default_consumer.go @@ -24,7 +24,6 @@ package consumer import ( "context" - "encoding/json" "fmt" "sort" "sync" @@ -43,6 +42,7 @@ import ( "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/types" + "github.com/uber/cadence/common/types/mapper/thrift" ) const ( @@ -185,7 +185,7 @@ func (c *DefaultConsumer) processRequest(logger log.Logger, request *sqlblobs.As scope := c.scope.Tagged(metrics.AsyncWFRequestTypeTag(request.GetType().String())) switch request.GetType() { case sqlblobs.AsyncRequestTypeStartWorkflowExecutionAsyncRequest: - startWFReq, err := decodeStartWorkflowRequest(request.GetPayload(), request.GetEncoding()) + startWFReq, err := c.decodeStartWorkflowRequest(request.GetPayload(), request.GetEncoding()) if err != nil { scope.IncCounter(metrics.AsyncWorkflowFailureCorruptMsgCount) return err @@ -210,7 +210,7 @@ func (c *DefaultConsumer) processRequest(logger log.Logger, request *sqlblobs.As scope.IncCounter(metrics.AsyncWorkflowSuccessCount) logger.Info("StartWorkflowExecution succeeded", tag.WorkflowID(startWFReq.GetWorkflowID()), tag.WorkflowRunID(resp.GetRunID())) case sqlblobs.AsyncRequestTypeSignalWithStartWorkflowExecutionAsyncRequest: - startWFReq, err := decodeSignalWithStartWorkflowRequest(request.GetPayload(), request.GetEncoding()) + startWFReq, err := c.decodeSignalWithStartWorkflowRequest(request.GetPayload(), request.GetEncoding()) if err != nil { c.scope.IncCounter(metrics.AsyncWorkflowFailureCorruptMsgCount) return err @@ -270,26 +270,30 @@ func getYARPCOptions(header *shared.Header) []yarpc.CallOption { return opts } -func decodeStartWorkflowRequest(payload []byte, encoding string) (*types.StartWorkflowExecutionRequest, error) { - if encoding != string(common.EncodingTypeJSON) { +func (c *DefaultConsumer) decodeStartWorkflowRequest(payload []byte, encoding string) (*types.StartWorkflowExecutionRequest, error) { + if encoding != string(common.EncodingTypeThriftRW) { return nil, &UnsupportedEncoding{EncodingType: encoding} } - var startRequest types.StartWorkflowExecutionAsyncRequest - if err := json.Unmarshal(payload, &startRequest); err != nil { + var thriftObj shared.StartWorkflowExecutionAsyncRequest + if err := c.msgDecoder.Decode(payload, &thriftObj); err != nil { return nil, err } + + startRequest := thrift.ToStartWorkflowExecutionAsyncRequest(&thriftObj) return startRequest.StartWorkflowExecutionRequest, nil } -func decodeSignalWithStartWorkflowRequest(payload []byte, encoding string) (*types.SignalWithStartWorkflowExecutionRequest, error) { - if encoding != string(common.EncodingTypeJSON) { +func (c *DefaultConsumer) decodeSignalWithStartWorkflowRequest(payload []byte, encoding string) (*types.SignalWithStartWorkflowExecutionRequest, error) { + if encoding != string(common.EncodingTypeThriftRW) { return nil, &UnsupportedEncoding{EncodingType: encoding} } - var startRequest types.SignalWithStartWorkflowExecutionAsyncRequest - if err := json.Unmarshal(payload, &startRequest); err != nil { + var thriftObj shared.SignalWithStartWorkflowExecutionAsyncRequest + if err := c.msgDecoder.Decode(payload, &thriftObj); err != nil { return nil, err } - return startRequest.SignalWithStartWorkflowExecutionRequest, nil + + signalWithStartRequest := thrift.ToSignalWithStartWorkflowExecutionAsyncRequest(&thriftObj) + return signalWithStartRequest.SignalWithStartWorkflowExecutionRequest, nil } diff --git a/common/asyncworkflow/queue/consumer/default_consumer_test.go b/common/asyncworkflow/queue/consumer/default_consumer_test.go index 57fdd170b78..daadcfd8cd0 100644 --- a/common/asyncworkflow/queue/consumer/default_consumer_test.go +++ b/common/asyncworkflow/queue/consumer/default_consumer_test.go @@ -23,11 +23,12 @@ package consumer import ( - "encoding/json" "errors" "testing" "github.com/golang/mock/gomock" + "github.com/google/go-cmp/cmp" + "go.uber.org/yarpc" "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/.gen/go/sqlblobs" @@ -38,6 +39,28 @@ import ( "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/types" + "github.com/uber/cadence/common/types/mapper/thrift" +) + +var ( + testSignalWithStartAsyncReq = &types.SignalWithStartWorkflowExecutionAsyncRequest{ + SignalWithStartWorkflowExecutionRequest: &types.SignalWithStartWorkflowExecutionRequest{ + Domain: "test-domain", + WorkflowID: "test-workflow-id", + WorkflowType: &types.WorkflowType{Name: "test-workflow-type"}, + Input: []byte("test-input"), + SignalName: "test-signal-name", + }, + } + + testStartReq = &types.StartWorkflowExecutionAsyncRequest{ + StartWorkflowExecutionRequest: &types.StartWorkflowExecutionRequest{ + Domain: "test-domain", + WorkflowID: "test-workflow-id", + WorkflowType: &types.WorkflowType{Name: "test-workflow-type"}, + Input: []byte("test-input"), + }, + } ) type fakeMessageConsumer struct { @@ -127,33 +150,33 @@ func TestDefaultConsumer(t *testing.T) { name: "startworkflow request with invalid payload content", frontendFails: true, msgs: []*fakeMessage{ - {val: mustGenerateStartWorkflowExecutionRequestMsg(t, common.EncodingTypeJSON, false), wantAck: false}, + {val: mustGenerateStartWorkflowExecutionRequestMsg(t, common.EncodingTypeThriftRW, false), wantAck: false}, }, }, { name: "startworkflowfrontend fails to respond", frontendFails: true, msgs: []*fakeMessage{ - {val: mustGenerateStartWorkflowExecutionRequestMsg(t, common.EncodingTypeJSON, true), wantAck: false}, + {val: mustGenerateStartWorkflowExecutionRequestMsg(t, common.EncodingTypeThriftRW, true), wantAck: false}, }, }, { - name: "startworkflow unsupported encoding type", + name: "startworkflow unsupported encoding type. json encoding of requests are lossy due to PII masking so it shouldn't be used for async requests", msgs: []*fakeMessage{ - {val: mustGenerateStartWorkflowExecutionRequestMsg(t, common.EncodingTypeProto, true), wantAck: false}, + {val: mustGenerateStartWorkflowExecutionRequestMsg(t, common.EncodingTypeJSON, true), wantAck: false}, }, }, { name: "startworkflow ok", msgs: []*fakeMessage{ - {val: mustGenerateStartWorkflowExecutionRequestMsg(t, common.EncodingTypeJSON, true), wantAck: true}, + {val: mustGenerateStartWorkflowExecutionRequestMsg(t, common.EncodingTypeThriftRW, true), wantAck: true}, }, }, { name: "startworkflow ok with chan closed before stopping", closeChanBeforeStop: true, msgs: []*fakeMessage{ - {val: mustGenerateStartWorkflowExecutionRequestMsg(t, common.EncodingTypeJSON, true), wantAck: true}, + {val: mustGenerateStartWorkflowExecutionRequestMsg(t, common.EncodingTypeThriftRW, true), wantAck: true}, }, }, // signal with start test cases @@ -161,26 +184,26 @@ func TestDefaultConsumer(t *testing.T) { name: "signalwithstartworkflow request with invalid payload content", frontendFails: true, msgs: []*fakeMessage{ - {val: mustGenerateSignalWithStartWorkflowExecutionRequestMsg(t, common.EncodingTypeJSON, false), wantAck: false}, + {val: mustGenerateSignalWithStartWorkflowExecutionRequestMsg(t, common.EncodingTypeThriftRW, false), wantAck: false}, }, }, { name: "signalwithstartworkflow frontend fails to respond", frontendFails: true, msgs: []*fakeMessage{ - {val: mustGenerateSignalWithStartWorkflowExecutionRequestMsg(t, common.EncodingTypeJSON, true), wantAck: false}, + {val: mustGenerateSignalWithStartWorkflowExecutionRequestMsg(t, common.EncodingTypeThriftRW, true), wantAck: false}, }, }, { - name: "signalwithstartworkflow unsupported encoding type", + name: "signalwithstartworkflow unsupported encoding type. json encoding of requests are lossy due to PII masking so it shouldn't be used for async requests", msgs: []*fakeMessage{ - {val: mustGenerateSignalWithStartWorkflowExecutionRequestMsg(t, common.EncodingTypeProto, true), wantAck: false}, + {val: mustGenerateSignalWithStartWorkflowExecutionRequestMsg(t, common.EncodingTypeJSON, true), wantAck: false}, }, }, { name: "signalwithstartworkflow ok", msgs: []*fakeMessage{ - {val: mustGenerateSignalWithStartWorkflowExecutionRequestMsg(t, common.EncodingTypeJSON, true), wantAck: true}, + {val: mustGenerateSignalWithStartWorkflowExecutionRequestMsg(t, common.EncodingTypeThriftRW, true), wantAck: true}, }, }, } @@ -206,10 +229,20 @@ func TestDefaultConsumer(t *testing.T) { resp := &types.StartWorkflowExecutionResponse{RunID: "test-run-id"} mockFrontend.EXPECT(). StartWorkflowExecution(gomock.Any(), gomock.Any(), opts[0], opts[1]). - Return(resp, nil).AnyTimes() + DoAndReturn(func(ctx interface{}, req *types.StartWorkflowExecutionRequest, opts ...yarpc.CallOption) (*types.StartWorkflowExecutionResponse, error) { + if diff := cmp.Diff(testStartReq.StartWorkflowExecutionRequest, req); diff != "" { + t.Fatalf("Request mismatch (-want +got):\n%s", diff) + } + return resp, nil + }).AnyTimes() mockFrontend.EXPECT(). SignalWithStartWorkflowExecution(gomock.Any(), gomock.Any(), opts[0], opts[1]). - Return(resp, nil).AnyTimes() + DoAndReturn(func(ctx interface{}, req *types.SignalWithStartWorkflowExecutionRequest, opts ...yarpc.CallOption) (*types.StartWorkflowExecutionResponse, error) { + if diff := cmp.Diff(testSignalWithStartAsyncReq.SignalWithStartWorkflowExecutionRequest, req); diff != "" { + t.Fatalf("Request mismatch (-want +got):\n%s", diff) + } + return resp, nil + }).AnyTimes() } c := New("queueid1", fakeConsumer, testlogger.New(t), metrics.NewNoopMetricsClient(), mockFrontend, WithConcurrency(2)) @@ -247,16 +280,8 @@ func TestDefaultConsumer(t *testing.T) { } func mustGenerateStartWorkflowExecutionRequestMsg(t *testing.T, encodingType common.EncodingType, validPayload bool) []byte { - startRequest := &types.StartWorkflowExecutionAsyncRequest{ - StartWorkflowExecutionRequest: &types.StartWorkflowExecutionRequest{ - Domain: "test-domain", - WorkflowID: "test-workflow-id", - WorkflowType: &types.WorkflowType{Name: "test-workflow-type"}, - Input: []byte("test-input"), - }, - } - - payload, err := json.Marshal(startRequest) + encoder := codec.NewThriftRWEncoder() + payload, err := encoder.Encode(thrift.FromStartWorkflowExecutionAsyncRequest(testStartReq)) if err != nil { t.Fatal(err) } @@ -281,17 +306,8 @@ func mustGenerateStartWorkflowExecutionRequestMsg(t *testing.T, encodingType com } func mustGenerateSignalWithStartWorkflowExecutionRequestMsg(t *testing.T, encodingType common.EncodingType, validPayload bool) []byte { - signalWithStartRequest := &types.SignalWithStartWorkflowExecutionAsyncRequest{ - SignalWithStartWorkflowExecutionRequest: &types.SignalWithStartWorkflowExecutionRequest{ - Domain: "test-domain", - WorkflowID: "test-workflow-id", - WorkflowType: &types.WorkflowType{Name: "test-workflow-type"}, - Input: []byte("test-input"), - SignalName: "test-signal-name", - }, - } - - payload, err := json.Marshal(signalWithStartRequest) + encoder := codec.NewThriftRWEncoder() + payload, err := encoder.Encode(thrift.FromSignalWithStartWorkflowExecutionAsyncRequest(testSignalWithStartAsyncReq)) if err != nil { t.Fatal(err) } @@ -316,16 +332,8 @@ func mustGenerateSignalWithStartWorkflowExecutionRequestMsg(t *testing.T, encodi } func mustGenerateUnsupportedRequestMsg(t *testing.T) []byte { - startRequest := &types.StartWorkflowExecutionAsyncRequest{ - StartWorkflowExecutionRequest: &types.StartWorkflowExecutionRequest{ - Domain: "test-domain", - WorkflowID: "test-workflow-id", - WorkflowType: &types.WorkflowType{Name: "test-workflow-type"}, - Input: []byte("test-input"), - }, - } - - payload, err := json.Marshal(startRequest) + encoder := codec.NewThriftRWEncoder() + payload, err := encoder.Encode(thrift.FromStartWorkflowExecutionAsyncRequest(testStartReq)) if err != nil { t.Fatal(err) } diff --git a/common/codec/version0Thriftrw.go b/common/codec/version0_thriftrw.go similarity index 100% rename from common/codec/version0Thriftrw.go rename to common/codec/version0_thriftrw.go diff --git a/common/codec/version0Thriftrw_test.go b/common/codec/version0_thriftrw_test.go similarity index 87% rename from common/codec/version0Thriftrw_test.go rename to common/codec/version0_thriftrw_test.go index dc5aff182f0..06877af8dcc 100644 --- a/common/codec/version0Thriftrw_test.go +++ b/common/codec/version0_thriftrw_test.go @@ -21,11 +21,15 @@ package codec import ( + "fmt" "log" "os" + "sync" "testing" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/suite" + "go.uber.org/multierr" workflow "github.com/uber/cadence/.gen/go/shared" ) @@ -84,6 +88,30 @@ func (s *thriftRWEncoderSuite) TestEncode() { s.Equal(thriftEncodedBinary, binary) } +func (s *thriftRWEncoderSuite) TestEncodeConcurrent() { + var wg sync.WaitGroup + count := 200 + errs := make([]error, count) + wg.Add(count) + for i := 0; i < count; i++ { + go func(idx int) { + defer wg.Done() + binary, err := s.encoder.Encode(thriftObject) + if err != nil { + errs[idx] = err + return + } + + if diff := cmp.Diff(thriftEncodedBinary, binary); diff != "" { + errs[idx] = fmt.Errorf("Mismatch (-want +got):\n%s", diff) + return + } + }(i) + } + wg.Wait() + s.NoError(multierr.Combine(errs...)) +} + func (s *thriftRWEncoderSuite) TestDecode() { var val workflow.HistoryEvent err := s.encoder.Decode(thriftEncodedBinary, &val) diff --git a/common/rpc.go b/common/rpc.go index be6454a68d2..4d48eeac8e4 100644 --- a/common/rpc.go +++ b/common/rpc.go @@ -21,8 +21,6 @@ package common import ( - "context" - "go.uber.org/yarpc" ) @@ -69,31 +67,3 @@ type ( GetMaxMessageSize() int } ) - -// GetClientHeaders returns the headers that should be sent from client to server -func GetClientHeaders(ctx context.Context) map[string]string { - call := yarpc.CallFromContext(ctx) - headerNames := call.HeaderNames() - headerExists := map[string]struct{}{} - for _, h := range headerNames { - headerExists[h] = struct{}{} - } - - headers := make(map[string]string) - if _, ok := headerExists[LibraryVersionHeaderName]; !ok { - headers[LibraryVersionHeaderName] = call.Header(LibraryVersionHeaderName) - } - if _, ok := headerExists[FeatureVersionHeaderName]; !ok { - headers[FeatureVersionHeaderName] = call.Header(FeatureVersionHeaderName) - } - if _, ok := headerExists[ClientImplHeaderName]; !ok { - headers[ClientImplHeaderName] = call.Header(ClientImplHeaderName) - } - if _, ok := headerExists[ClientFeatureFlagsHeaderName]; !ok { - headers[ClientFeatureFlagsHeaderName] = call.Header(ClientFeatureFlagsHeaderName) - } - if _, ok := headerExists[ClientIsolationGroupHeaderName]; !ok { - headers[ClientIsolationGroupHeaderName] = call.Header(ClientIsolationGroupHeaderName) - } - return headers -} diff --git a/service/frontend/api/handler.go b/service/frontend/api/handler.go index b67b4f25c9c..4dce894af7b 100644 --- a/service/frontend/api/handler.go +++ b/service/frontend/api/handler.go @@ -38,6 +38,7 @@ import ( "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/client" + "github.com/uber/cadence/common/codec" "github.com/uber/cadence/common/domain" "github.com/uber/cadence/common/elasticsearch/validator" "github.com/uber/cadence/common/log" @@ -49,6 +50,7 @@ import ( "github.com/uber/cadence/common/resource" "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" + "github.com/uber/cadence/common/types/mapper/thrift" "github.com/uber/cadence/service/frontend/config" "github.com/uber/cadence/service/frontend/validate" ) @@ -79,6 +81,7 @@ type ( searchAttributesValidator *validator.SearchAttributesValidator throttleRetry *backoff.ThrottleRetry producerManager ProducerManager + thriftrwEncoder codec.BinaryEncoder } getHistoryContinuationToken struct { @@ -139,6 +142,7 @@ func NewWorkflowHandler( resource.GetLogger(), resource.GetMetricsClient(), ), + thriftrwEncoder: codec.NewThriftRWEncoder(), } } @@ -1760,17 +1764,19 @@ func (wh *WorkflowHandler) StartWorkflowExecutionAsync( if err != nil { return nil, err } - // serialize the message to be sent to the queue - payload, err := json.Marshal(startRequest) + + // Serialize the message to be sent to the queue. + // Avoid JSON because json encoding of requests excludes PII fields such as input. JSON encoded request are logged by acccess controlled api layer for audit purposes. + payload, err := wh.thriftrwEncoder.Encode(thrift.FromStartWorkflowExecutionAsyncRequest(startRequest)) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to encode StartWorkflowExecutionAsyncRequest: %v", err) } + // propagate the headers from the context to the message - clientHeaders := common.GetClientHeaders(ctx) header := &shared.Header{ - Fields: map[string][]byte{}, + Fields: make(map[string][]byte), } - for k, v := range clientHeaders { + for k, v := range yarpc.CallFromContext(ctx).OriginalHeaders() { header.Fields[k] = []byte(v) } messageType := sqlblobs.AsyncRequestTypeStartWorkflowExecutionAsyncRequest @@ -1778,7 +1784,7 @@ func (wh *WorkflowHandler) StartWorkflowExecutionAsync( PartitionKey: common.StringPtr(startRequest.GetWorkflowID()), Type: &messageType, Header: header, - Encoding: common.StringPtr(string(common.EncodingTypeJSON)), + Encoding: common.StringPtr(string(common.EncodingTypeThriftRW)), Payload: payload, } err = producer.Publish(ctx, message) @@ -2338,17 +2344,19 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecutionAsync( if err != nil { return nil, err } - // serialize the message to be sent to the queue - payload, err := json.Marshal(signalWithStartRequest) + + // Serialize the message to be sent to the queue. + // Avoid JSON because json encoding of requests excludes PII fields such as input. JSON encoded request are logged by acccess controlled api layer for audit purposes. + payload, err := wh.thriftrwEncoder.Encode(thrift.FromSignalWithStartWorkflowExecutionAsyncRequest(signalWithStartRequest)) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to encode SignalWithStartWorkflowExecutionAsyncRequest: %v", err) } + // propagate the headers from the context to the message - clientHeaders := common.GetClientHeaders(ctx) header := &shared.Header{ Fields: map[string][]byte{}, } - for k, v := range clientHeaders { + for k, v := range yarpc.CallFromContext(ctx).OriginalHeaders() { header.Fields[k] = []byte(v) } messageType := sqlblobs.AsyncRequestTypeSignalWithStartWorkflowExecutionAsyncRequest @@ -2356,7 +2364,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecutionAsync( PartitionKey: common.StringPtr(signalWithStartRequest.GetWorkflowID()), Type: &messageType, Header: header, - Encoding: common.StringPtr(string(common.EncodingTypeJSON)), + Encoding: common.StringPtr(string(common.EncodingTypeThriftRW)), Payload: payload, } err = producer.Publish(ctx, message) diff --git a/service/frontend/api/handler_test.go b/service/frontend/api/handler_test.go index 014b94a127d..212146ca776 100644 --- a/service/frontend/api/handler_test.go +++ b/service/frontend/api/handler_test.go @@ -2047,7 +2047,7 @@ func TestStartWorkflowExecutionAsync(t *testing.T) { TaskList: &types.TaskList{ Name: "test-task-list", }, - Input: nil, + Input: []byte("test-input"), ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(60), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10), Identity: "test-identity", @@ -2071,7 +2071,7 @@ func TestStartWorkflowExecutionAsync(t *testing.T) { TaskList: &types.TaskList{ Name: "test-task-list", }, - Input: nil, + Input: []byte("test-input"), ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(60), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10), Identity: "test-identity", @@ -2097,7 +2097,7 @@ func TestStartWorkflowExecutionAsync(t *testing.T) { TaskList: &types.TaskList{ Name: "test-task-list", }, - Input: nil, + Input: []byte("test-input"), ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(60), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10), Identity: "test-identity", @@ -2165,7 +2165,7 @@ func TestSignalWithStartWorkflowExecutionAsync(t *testing.T) { TaskList: &types.TaskList{ Name: "test-task-list", }, - Input: nil, + Input: []byte("test-input"), ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(60), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10), Identity: "test-identity", @@ -2190,7 +2190,7 @@ func TestSignalWithStartWorkflowExecutionAsync(t *testing.T) { TaskList: &types.TaskList{ Name: "test-task-list", }, - Input: nil, + Input: []byte("test-input"), ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(60), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10), Identity: "test-identity", @@ -2217,7 +2217,7 @@ func TestSignalWithStartWorkflowExecutionAsync(t *testing.T) { TaskList: &types.TaskList{ Name: "test-task-list", }, - Input: nil, + Input: []byte("test-input"), ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(60), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10), Identity: "test-identity", diff --git a/service/worker/asyncworkflow/async_workflow_consumer_manager.go b/service/worker/asyncworkflow/async_workflow_consumer_manager.go index 9f0617486b5..1a0379c30ad 100644 --- a/service/worker/asyncworkflow/async_workflow_consumer_manager.go +++ b/service/worker/asyncworkflow/async_workflow_consumer_manager.go @@ -40,7 +40,7 @@ import ( ) const ( - defaultRefreshInterval = 5 * time.Minute + defaultRefreshInterval = 1 * time.Minute defaultShutdownTimeout = 5 * time.Second )