Skip to content

Commit 1c7f71a

Browse files
authored
Add WorkflowIdConflictPolicy (temporalio#5507)
## What changed? <!-- Describe what has changed in this PR --> This is the implementation for the API change in temporalio/api#359: - added support for new WorkflowIDConflictPolicy - `FAIL` - `USE_EXISTING` - `TERMINATE_EXISTING` - set new field `Started` in response accordingly - covering Start-Workflow and Signal-With-Start - migrating WorkflowIDReusePolicy Terminate-If-Running to WorkflowIDConflictPolicy Terminate-Existing - disallowing using WorkflowIDReusePolicy Terminate-If-Running with any WorkflowIDReusePolicy ## Why? <!-- Tell your future self why have you made these changes --> See temporalio/api#359 ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> Existing + new tests for requirements mentioned above. ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> Breaking existing usage - but the tests should have uncovered any issues. ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) -->
1 parent 3fd2b60 commit 1c7f71a

17 files changed

Lines changed: 2140 additions & 1797 deletions

File tree

api/historyservice/v1/request_response.pb.go

Lines changed: 1617 additions & 1598 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/dynamicconfig/constants.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,9 @@ const (
395395
// lifecycle stage. Default value is `false`.
396396
FrontendEnableUpdateWorkflowExecutionAsyncAccepted = "frontend.enableUpdateWorkflowExecutionAsyncAccepted"
397397

398+
// EnableWorkflowIdConflictPolicy enables the `WorkflowIdConflictPolicy` option for Start and Signal-with-Start
399+
EnableWorkflowIdConflictPolicy = "frontend.enableWorkflowIdConflictPolicy"
400+
398401
// FrontendEnableWorkerVersioningDataAPIs enables worker versioning data read / write APIs.
399402
FrontendEnableWorkerVersioningDataAPIs = "frontend.workerVersioningDataAPIs"
400403
// FrontendEnableWorkerVersioningWorkflowAPIs enables worker versioning in workflow progress APIs.

common/enums/defaults.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ func SetDefaultWorkflowIdReusePolicy(f *enumspb.WorkflowIdReusePolicy) {
3434
}
3535
}
3636

37+
func SetDefaultWorkflowIdConflictPolicy(
38+
conflictPolicy *enumspb.WorkflowIdConflictPolicy,
39+
defaultPolicy enumspb.WorkflowIdConflictPolicy,
40+
) {
41+
if *conflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED {
42+
*conflictPolicy = defaultPolicy
43+
}
44+
}
45+
3746
func SetDefaultHistoryEventFilterType(f *enumspb.HistoryEventFilterType) {
3847
if *f == enumspb.HISTORY_EVENT_FILTER_TYPE_UNSPECIFIED {
3948
*f = enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ message StartWorkflowExecutionResponse {
7373
temporal.server.api.clock.v1.VectorClock clock = 2;
7474
// Set if request_eager_execution is set on the start request
7575
temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse eager_workflow_task = 3;
76+
bool started = 4;
7677
}
7778

7879
message GetMutableStateRequest {
@@ -302,6 +303,7 @@ message SignalWithStartWorkflowExecutionRequest {
302303

303304
message SignalWithStartWorkflowExecutionResponse {
304305
string run_id = 1;
306+
bool started = 2;
305307
}
306308

307309
message RemoveSignalMutableStateRequest {

service/frontend/errors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ var (
8383
errUseVersioningWithoutBuildId = serviceerror.NewInvalidArgument("WorkerVersionStamp must be present if UseVersioning is true.")
8484
errUseVersioningWithoutNormalName = serviceerror.NewInvalidArgument("NormalName must be set on sticky queue if UseVersioning is true.")
8585
errBuildIdTooLong = serviceerror.NewInvalidArgument("Build ID exceeds configured limit.workerBuildIdSize, use a shorter build ID.")
86+
errIncompatibleIDReusePolicy = serviceerror.NewInvalidArgument("Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.")
8687

8788
errUpdateMetaNotSet = serviceerror.NewInvalidArgument("Update meta is not set on request.")
8889
errUpdateInputNotSet = serviceerror.NewInvalidArgument("Update input is not set on request.")
@@ -112,6 +113,7 @@ var (
112113
errUnableToGetNamespaceInfoMessage = "Unable to get namespace %v info with error: %v"
113114
errUnableToCreateFrontendClientMessage = "Unable to create frontend client with error: %v."
114115
errTooManySearchAttributesMessage = "Unable to create search attributes: cannot have more than %d search attribute of type %s."
116+
errUnsupportedIDConflictPolicy = "Invalid WorkflowIDConflictPolicy: %v is not supported for this operation."
115117

116118
errListNotAllowed = serviceerror.NewPermissionDenied("List is disabled on this namespace.", "")
117119
errSchedulesNotAllowed = serviceerror.NewPermissionDenied("Schedules are disabled on this namespace.", "")
@@ -121,6 +123,7 @@ var (
121123
errBatchOpsWorkflowFiltersNotAllowed = serviceerror.NewInvalidArgument("Workflow executions and visibility filter are both set on request. Only one of them is allowed.")
122124
errBatchOpsMaxWorkflowExecutionCount = serviceerror.NewInvalidArgument("Workflow executions count exceeded.")
123125

126+
errWorkflowIdConflictPolicyNotAllowed = serviceerror.NewPermissionDenied("WorkflowIdConflictPolicy option is disabled on this namespace.", "")
124127
errUpdateWorkflowExecutionAPINotAllowed = serviceerror.NewPermissionDenied("UpdateWorkflowExecution operation is disabled on this namespace.", "")
125128
errUpdateWorkflowExecutionAsyncAcceptedNotAllowed = serviceerror.NewPermissionDenied("UpdateWorkflowExecution issued asynchronously and waiting on update accepted is disabled on this namespace", "")
126129
errUpdateWorkflowExecutionAsyncAdmittedNotAllowed = serviceerror.NewPermissionDenied("UpdateWorkflowExecution issued asynchronously and waiting on update admitted is disabled on this namespace", "")

service/frontend/service.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ type Config struct {
179179
MaxConcurrentBatchOperation dynamicconfig.IntPropertyFnWithNamespaceFilter
180180
MaxExecutionCountBatchOperation dynamicconfig.IntPropertyFnWithNamespaceFilter
181181

182+
EnableWorkflowIdConflictPolicy dynamicconfig.BoolPropertyFnWithNamespaceFilter
183+
182184
EnableUpdateWorkflowExecution dynamicconfig.BoolPropertyFnWithNamespaceFilter
183185
EnableUpdateWorkflowExecutionAsyncAccepted dynamicconfig.BoolPropertyFnWithNamespaceFilter
184186

@@ -285,6 +287,8 @@ func NewConfig(
285287
MaxConcurrentBatchOperation: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxConcurrentBatchOperationPerNamespace, 1),
286288
MaxExecutionCountBatchOperation: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxExecutionCountBatchOperationPerNamespace, 1000),
287289

290+
EnableWorkflowIdConflictPolicy: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableWorkflowIdConflictPolicy, false),
291+
288292
EnableUpdateWorkflowExecution: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.FrontendEnableUpdateWorkflowExecution, false),
289293
EnableUpdateWorkflowExecutionAsyncAccepted: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.FrontendEnableUpdateWorkflowExecutionAsyncAccepted, false),
290294

service/frontend/workflow_handler.go

Lines changed: 64 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,17 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request *
394394
return nil, err
395395
}
396396

397+
if err := wh.validateWorkflowIdReusePolicy(
398+
namespaceName,
399+
request.WorkflowIdReusePolicy,
400+
request.WorkflowIdConflictPolicy,
401+
); err != nil {
402+
return nil, err
403+
}
404+
405+
enums.SetDefaultWorkflowIdReusePolicy(&request.WorkflowIdReusePolicy)
406+
enums.SetDefaultWorkflowIdConflictPolicy(&request.WorkflowIdConflictPolicy, enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL)
407+
397408
sa, err := wh.unaliasedSearchAttributesFrom(request.GetSearchAttributes(), namespaceName)
398409
if err != nil {
399410
return nil, err
@@ -421,8 +432,6 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request *
421432
}
422433
}
423434

424-
enums.SetDefaultWorkflowIdReusePolicy(&request.WorkflowIdReusePolicy)
425-
426435
wh.logger.Debug("Start workflow execution request namespace.", tag.WorkflowNamespace(namespaceName.String()))
427436
namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespaceName)
428437
if err != nil {
@@ -435,7 +444,11 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request *
435444
if err != nil {
436445
return nil, err
437446
}
438-
return &workflowservice.StartWorkflowExecutionResponse{RunId: resp.GetRunId(), EagerWorkflowTask: resp.GetEagerWorkflowTask()}, nil
447+
return &workflowservice.StartWorkflowExecutionResponse{
448+
RunId: resp.GetRunId(),
449+
Started: resp.Started,
450+
EagerWorkflowTask: resp.GetEagerWorkflowTask(),
451+
}, nil
439452
}
440453

441454
func (wh *WorkflowHandler) unaliasedSearchAttributesFrom(
@@ -1727,6 +1740,23 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
17271740
return nil, err
17281741
}
17291742

1743+
if err := wh.validateWorkflowIdReusePolicy(
1744+
namespaceName,
1745+
request.WorkflowIdReusePolicy,
1746+
request.WorkflowIdConflictPolicy,
1747+
); err != nil {
1748+
return nil, err
1749+
}
1750+
1751+
if request.WorkflowIdConflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL {
1752+
// Signal-with-*Required*-Start is not supported
1753+
name := enumspb.WorkflowIdConflictPolicy_name[int32(request.WorkflowIdConflictPolicy.Number())]
1754+
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf(errUnsupportedIDConflictPolicy, name))
1755+
}
1756+
1757+
enums.SetDefaultWorkflowIdReusePolicy(&request.WorkflowIdReusePolicy)
1758+
enums.SetDefaultWorkflowIdConflictPolicy(&request.WorkflowIdConflictPolicy, enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING)
1759+
17301760
if err := backoff.ValidateSchedule(request.GetCronSchedule()); err != nil {
17311761
return nil, err
17321762
}
@@ -1740,8 +1770,6 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
17401770
request.SearchAttributes = sa
17411771
}
17421772

1743-
enums.SetDefaultWorkflowIdReusePolicy(&request.WorkflowIdReusePolicy)
1744-
17451773
namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespaceName)
17461774
if err != nil {
17471775
return nil, err
@@ -1756,7 +1784,10 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
17561784
return nil, err
17571785
}
17581786

1759-
return &workflowservice.SignalWithStartWorkflowExecutionResponse{RunId: resp.GetRunId()}, nil
1787+
return &workflowservice.SignalWithStartWorkflowExecutionResponse{
1788+
RunId: resp.GetRunId(),
1789+
Started: resp.Started,
1790+
}, nil
17601791
}
17611792

17621793
// ResetWorkflowExecution reset an existing workflow execution to WorkflowTaskCompleted event(exclusive).
@@ -2641,16 +2672,17 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow
26412672
wh.addInitialScheduleMemo(request, input)
26422673
// Create StartWorkflowExecutionRequest
26432674
startReq := &workflowservice.StartWorkflowExecutionRequest{
2644-
Namespace: request.Namespace,
2645-
WorkflowId: workflowID,
2646-
WorkflowType: &commonpb.WorkflowType{Name: scheduler.WorkflowType},
2647-
TaskQueue: &taskqueuepb.TaskQueue{Name: primitives.PerNSWorkerTaskQueue},
2648-
Input: inputPayloads,
2649-
Identity: request.Identity,
2650-
RequestId: request.RequestId,
2651-
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
2652-
Memo: request.Memo,
2653-
SearchAttributes: sa,
2675+
Namespace: request.Namespace,
2676+
WorkflowId: workflowID,
2677+
WorkflowType: &commonpb.WorkflowType{Name: scheduler.WorkflowType},
2678+
TaskQueue: &taskqueuepb.TaskQueue{Name: primitives.PerNSWorkerTaskQueue},
2679+
Input: inputPayloads,
2680+
Identity: request.Identity,
2681+
RequestId: request.RequestId,
2682+
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
2683+
WorkflowIdConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
2684+
Memo: request.Memo,
2685+
SearchAttributes: sa,
26542686
}
26552687
_, err = wh.historyClient.StartWorkflowExecution(ctx, common.CreateHistoryStartWorkflowRequest(namespaceID.String(), startReq, nil, time.Now().UTC()))
26562688

@@ -4032,6 +4064,22 @@ func (wh *WorkflowHandler) validateTaskQueue(t *taskqueuepb.TaskQueue, namespace
40324064
return nil
40334065
}
40344066

4067+
func (wh *WorkflowHandler) validateWorkflowIdReusePolicy(
4068+
ns namespace.Name,
4069+
reusePolicy enumspb.WorkflowIdReusePolicy,
4070+
conflictPolicy enumspb.WorkflowIdConflictPolicy,
4071+
) error {
4072+
if conflictPolicy != enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED {
4073+
if !wh.config.EnableWorkflowIdConflictPolicy(ns.String()) {
4074+
return errWorkflowIdConflictPolicyNotAllowed
4075+
}
4076+
if reusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING {
4077+
return errIncompatibleIDReusePolicy
4078+
}
4079+
}
4080+
return nil
4081+
}
4082+
40354083
type buildIdAndFlag interface {
40364084
GetBuildId() string
40374085
GetUseVersioning() bool

service/frontend/workflow_handler_test.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/golang/mock/gomock"
3535
"github.com/pborman/uuid"
3636
"github.com/stretchr/testify/assert"
37+
"github.com/stretchr/testify/mock"
3738
"github.com/stretchr/testify/require"
3839
"github.com/stretchr/testify/suite"
3940
batchpb "go.temporal.io/api/batch/v1"
@@ -611,10 +612,117 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_InvalidStartDel
611612
RequestId: uuid.New(),
612613
WorkflowStartDelay: durationpb.New(-10 * time.Second),
613614
}
615+
614616
_, err := wh.StartWorkflowExecution(context.Background(), startWorkflowExecutionRequest)
617+
615618
s.ErrorIs(err, errInvalidWorkflowStartDelaySeconds)
616619
}
617620

621+
func (s *workflowHandlerSuite) TestStartWorkflowExecution_InvalidWorkflowIdReusePolicy_TerminateIfRunning() {
622+
config := s.newConfig()
623+
config.EnableWorkflowIdConflictPolicy = func(string) bool { return true }
624+
wh := s.getWorkflowHandler(config)
625+
req := &workflowservice.StartWorkflowExecutionRequest{
626+
WorkflowId: testWorkflowID,
627+
WorkflowType: &commonpb.WorkflowType{Name: "WORKFLOW"},
628+
TaskQueue: &taskqueuepb.TaskQueue{Name: "TASK_QUEUE", Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
629+
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
630+
WorkflowIdConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
631+
}
632+
633+
resp, err := wh.StartWorkflowExecution(context.Background(), req)
634+
635+
s.Nil(resp)
636+
s.Equal(err, serviceerror.NewInvalidArgument(
637+
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy."))
638+
}
639+
640+
func (s *workflowHandlerSuite) TestStartWorkflowExecution_DefaultWorkflowIdDuplicationPolicies() {
641+
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(gomock.Any()).Return(nil, nil)
642+
s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespace.NewID(), nil)
643+
s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), mock.MatchedBy(
644+
func(request *historyservice.StartWorkflowExecutionRequest) bool {
645+
return request.StartRequest.WorkflowIdReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE &&
646+
request.StartRequest.WorkflowIdConflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL
647+
},
648+
)).Return(&historyservice.StartWorkflowExecutionResponse{Started: true}, nil)
649+
650+
wh := s.getWorkflowHandler(s.newConfig())
651+
req := &workflowservice.StartWorkflowExecutionRequest{
652+
WorkflowId: testWorkflowID,
653+
WorkflowType: &commonpb.WorkflowType{Name: "WORKFLOW"},
654+
TaskQueue: &taskqueuepb.TaskQueue{Name: "TASK_QUEUE", Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
655+
// both policies are not specified
656+
}
657+
658+
resp, err := wh.StartWorkflowExecution(context.Background(), req)
659+
s.NoError(err)
660+
s.True(resp.Started)
661+
}
662+
663+
func (s *workflowHandlerSuite) TestSignalWithStartWorkflowExecution_InvalidWorkflowIdConflictPolicy() {
664+
config := s.newConfig()
665+
config.EnableWorkflowIdConflictPolicy = func(string) bool { return true }
666+
wh := s.getWorkflowHandler(config)
667+
req := &workflowservice.SignalWithStartWorkflowExecutionRequest{
668+
WorkflowId: testWorkflowID,
669+
WorkflowType: &commonpb.WorkflowType{Name: "WORKFLOW"},
670+
TaskQueue: &taskqueuepb.TaskQueue{Name: "TASK_QUEUE", Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
671+
SignalName: "SIGNAL",
672+
WorkflowIdConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
673+
}
674+
675+
resp, err := wh.SignalWithStartWorkflowExecution(context.Background(), req)
676+
677+
s.Nil(resp)
678+
s.Equal(err, serviceerror.NewInvalidArgument(
679+
"Invalid WorkflowIDConflictPolicy: WORKFLOW_ID_CONFLICT_POLICY_FAIL is not supported for this operation."))
680+
}
681+
682+
func (s *workflowHandlerSuite) TestSignalWithStartWorkflowExecution_InvalidWorkflowIdReusePolicy_TerminateIfRunning() {
683+
config := s.newConfig()
684+
config.EnableWorkflowIdConflictPolicy = func(string) bool { return true }
685+
wh := s.getWorkflowHandler(config)
686+
req := &workflowservice.SignalWithStartWorkflowExecutionRequest{
687+
WorkflowId: testWorkflowID,
688+
WorkflowType: &commonpb.WorkflowType{Name: "WORKFLOW"},
689+
TaskQueue: &taskqueuepb.TaskQueue{Name: "TASK_QUEUE", Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
690+
SignalName: "SIGNAL",
691+
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
692+
WorkflowIdConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
693+
}
694+
695+
resp, err := wh.SignalWithStartWorkflowExecution(context.Background(), req)
696+
697+
s.Nil(resp)
698+
s.Equal(err, serviceerror.NewInvalidArgument(
699+
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy."))
700+
}
701+
702+
func (s *workflowHandlerSuite) TestSignalWithStartWorkflowExecution_DefaultWorkflowIdDuplicationPolicies() {
703+
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(gomock.Any()).Return(nil, nil)
704+
s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespace.NewID(), nil)
705+
s.mockHistoryClient.EXPECT().SignalWithStartWorkflowExecution(gomock.Any(), mock.MatchedBy(
706+
func(request *historyservice.SignalWithStartWorkflowExecutionRequest) bool {
707+
return request.SignalWithStartRequest.WorkflowIdReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE &&
708+
request.SignalWithStartRequest.WorkflowIdConflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
709+
},
710+
)).Return(&historyservice.SignalWithStartWorkflowExecutionResponse{Started: true}, nil)
711+
712+
wh := s.getWorkflowHandler(s.newConfig())
713+
req := &workflowservice.SignalWithStartWorkflowExecutionRequest{
714+
WorkflowId: testWorkflowID,
715+
WorkflowType: &commonpb.WorkflowType{Name: "WORKFLOW"},
716+
TaskQueue: &taskqueuepb.TaskQueue{Name: "TASK_QUEUE", Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
717+
SignalName: "SIGNAL",
718+
// both policies are not specified
719+
}
720+
721+
resp, err := wh.SignalWithStartWorkflowExecution(context.Background(), req)
722+
s.NoError(err)
723+
s.True(resp.Started)
724+
}
725+
618726
func (s *workflowHandlerSuite) TestRegisterNamespace_Failure_InvalidArchivalURI() {
619727
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(false)
620728
s.mockArchivalMetadata.EXPECT().GetHistoryConfig().Return(archiver.NewArchivalConfig("enabled", dc.GetStringPropertyFn("enabled"), dc.GetBoolPropertyFn(true), "disabled", "random URI"))

service/history/api/signalwithstartworkflow/api.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,25 @@ func Invoke(
7171
return nil, err
7272
}
7373

74-
// Start workflow and signal
74+
api.MigrateWorkflowIdReusePolicyForRunningWorkflow(
75+
&signalWithStartRequest.SignalWithStartRequest.WorkflowIdReusePolicy,
76+
&signalWithStartRequest.SignalWithStartRequest.WorkflowIdConflictPolicy)
77+
7578
startRequest := ConvertToStartRequest(
7679
namespaceID,
7780
signalWithStartRequest.SignalWithStartRequest,
7881
shard.GetTimeSource().Now(),
7982
)
8083
request := startRequest.StartRequest
84+
8185
api.OverrideStartWorkflowExecutionRequest(request, metrics.HistorySignalWithStartWorkflowExecutionScope, shard, shard.GetMetricsHandler())
86+
8287
err = api.ValidateStartWorkflowExecutionRequest(ctx, request, shard, namespaceEntry, "SignalWithStartWorkflowExecution")
8388
if err != nil {
8489
return nil, err
8590
}
86-
runID, err := SignalWithStartWorkflow(
91+
92+
runID, started, err := SignalWithStartWorkflow(
8793
ctx,
8894
shard,
8995
namespaceEntry,
@@ -95,6 +101,7 @@ func Invoke(
95101
return nil, err
96102
}
97103
return &historyservice.SignalWithStartWorkflowExecutionResponse{
98-
RunId: runID,
104+
RunId: runID,
105+
Started: started,
99106
}, nil
100107
}

service/history/api/signalwithstartworkflow/convert.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func ConvertToStartRequest(
5151
Identity: request.GetIdentity(),
5252
RequestId: request.GetRequestId(),
5353
WorkflowIdReusePolicy: request.GetWorkflowIdReusePolicy(),
54+
WorkflowIdConflictPolicy: request.GetWorkflowIdConflictPolicy(),
5455
RetryPolicy: request.GetRetryPolicy(),
5556
CronSchedule: request.GetCronSchedule(),
5657
Memo: request.GetMemo(),

0 commit comments

Comments
 (0)