From ab1c356129ffaf8cb10e8f3e4c00eb68dfaa930e Mon Sep 17 00:00:00 2001 From: Rodrigo Zhou <2068124+rodrigozhou@users.noreply.github.com> Date: Tue, 25 Feb 2025 12:42:49 -0600 Subject: [PATCH] Block using conflict policy UseExisting for Nexus WorkflowRunOperation (#1845) * Block using conflict policy UseExisting for Nexus WorkflowRunOperation * address comments --- temporalnexus/operation.go | 9 ++++++ test/nexus_test.go | 60 +++++++++++++++++++++++++------------- 2 files changed, 49 insertions(+), 20 deletions(-) diff --git a/temporalnexus/operation.go b/temporalnexus/operation.go index 285771149..f25de2381 100644 --- a/temporalnexus/operation.go +++ b/temporalnexus/operation.go @@ -389,6 +389,15 @@ func ExecuteUntypedWorkflow[R any]( internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links) internal.SetOnConflictOptionsOnStartWorkflowOptions(&startWorkflowOptions) + // TODO(rodrigozhou): temporarily blocking conflict policy UseExisting. + if startWorkflowOptions.WorkflowIDConflictPolicy == enums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING { + return nil, &nexus.HandlerError{ + Type: nexus.HandlerErrorTypeInternal, + RetryBehavior: nexus.HandlerErrorRetryBehaviorNonRetryable, + Cause: errors.New("workflow ID conflict policy UseExisting is not supported for Nexus WorkflowRunOperation"), + } + } + // This makes sure that ExecuteWorkflow will respect the WorkflowIDConflictPolicy, ie., if the // conflict policy is to fail (default value), then ExecuteWorkflow will return an error if the // workflow already running. For Nexus, this ensures that operation has only started successfully diff --git a/test/nexus_test.go b/test/nexus_test.go index 9faa782f5..5c0c06db6 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -994,7 +994,6 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) { fut := client.ExecuteOperation(ctx, op, input, workflow.NexusOperationOptions{}) var exec workflow.NexusOperationExecution err := fut.GetNexusOperationExecution().Get(ctx, &exec) - execOpCh.Send(ctx, nil) if err != nil { output.CntErr++ var handlerErr *nexus.HandlerError @@ -1006,9 +1005,13 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) { } else if appErr.Type() != "WorkflowExecutionAlreadyStarted" { retError = err } + } else { + output.CntOk++ + } + execOpCh.Send(ctx, nil) + if err != nil { return } - output.CntOk++ var res string err = fut.Get(ctx, &res) if err != nil { @@ -1023,8 +1026,10 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) { execOpCh.Receive(ctx, nil) } - // signal handler workflow so it will complete - workflow.SignalExternalWorkflow(ctx, handlerWorkflowID, "", "terminate", nil).Get(ctx, nil) + if output.CntOk > 0 { + // signal handler workflow so it will complete + workflow.SignalExternalWorkflow(ctx, handlerWorkflowID, "", "terminate", nil).Get(ctx, nil) + } wg.Wait(ctx) return output, retError } @@ -1040,19 +1045,24 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) { testCases := []struct { input string - checkOutput func(t *testing.T, numCalls int, res CallerWfOutput) + checkOutput func(t *testing.T, numCalls int, res CallerWfOutput, err error) }{ { input: "conflict-policy-fail", - checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput) { + checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput, err error) { + require.NoError(t, err) require.EqualValues(t, 1, res.CntOk) require.EqualValues(t, numCalls-1, res.CntErr) }, }, { input: "conflict-policy-use-existing", - checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput) { - require.EqualValues(t, numCalls, res.CntOk) + checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput, err error) { + // TODO(rodrigozhou): assert NotError and remove the comments below after UseExisting is + // unblocked. + require.ErrorContains(t, err, "workflow ID conflict policy UseExisting is not supported for Nexus WorkflowRunOperation") + // require.EqualValues(t, numCalls, res.CntOk) + // require.EqualValues(t, 0, res.CntErr) }, }, } @@ -1075,8 +1085,8 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) { ) require.NoError(t, err) var res CallerWfOutput - require.NoError(t, run.Get(ctx, &res)) - tc.checkOutput(t, numCalls, res) + err = run.Get(ctx, &res) + tc.checkOutput(t, numCalls, res, err) }) } } @@ -1687,7 +1697,6 @@ func TestWorkflowTestSuite_WorkflowRunOperation_MultipleCallers(t *testing.T) { fut := client.ExecuteOperation(ctx, op, input, workflow.NexusOperationOptions{}) var exec workflow.NexusOperationExecution err := fut.GetNexusOperationExecution().Get(ctx, &exec) - execOpCh.Send(ctx, nil) if err != nil { output.CntErr++ var handlerErr *nexus.HandlerError @@ -1699,9 +1708,13 @@ func TestWorkflowTestSuite_WorkflowRunOperation_MultipleCallers(t *testing.T) { } else if appErr.Type() != "WorkflowExecutionAlreadyStarted" { retError = err } + } else { + output.CntOk++ + } + execOpCh.Send(ctx, nil) + if err != nil { return } - output.CntOk++ var res string err = fut.Get(ctx, &res) if err != nil { @@ -1716,7 +1729,10 @@ func TestWorkflowTestSuite_WorkflowRunOperation_MultipleCallers(t *testing.T) { execOpCh.Receive(ctx, nil) } - workflow.SignalExternalWorkflow(ctx, handlerWorkflowID, "", "terminate", nil).Get(ctx, nil) + if output.CntOk > 0 { + // signal handler workflow so it will complete + workflow.SignalExternalWorkflow(ctx, handlerWorkflowID, "", "terminate", nil).Get(ctx, nil) + } wg.Wait(ctx) return output, retError } @@ -1726,19 +1742,24 @@ func TestWorkflowTestSuite_WorkflowRunOperation_MultipleCallers(t *testing.T) { testCases := []struct { input string - checkOutput func(t *testing.T, numCalls int, res CallerWfOutput) + checkOutput func(t *testing.T, numCalls int, res CallerWfOutput, err error) }{ { input: "conflict-policy-fail", - checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput) { + checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput, err error) { + require.NoError(t, err) require.EqualValues(t, 1, res.CntOk) require.EqualValues(t, numCalls-1, res.CntErr) }, }, { input: "conflict-policy-use-existing", - checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput) { - require.EqualValues(t, numCalls, res.CntOk) + checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput, err error) { + // TODO(rodrigozhou): assert NotError and remove the comments below after UseExisting is + // unblocked. + require.ErrorContains(t, err, "workflow ID conflict policy UseExisting is not supported for Nexus WorkflowRunOperation") + // require.EqualValues(t, numCalls, res.CntOk) + // require.EqualValues(t, 0, res.CntErr) }, }, } @@ -1754,10 +1775,9 @@ func TestWorkflowTestSuite_WorkflowRunOperation_MultipleCallers(t *testing.T) { env.ExecuteWorkflow(callerWf, tc.input, numCalls) require.True(t, env.IsWorkflowCompleted()) - require.NoError(t, env.GetWorkflowError()) var res CallerWfOutput - require.NoError(t, env.GetWorkflowResult(&res)) - tc.checkOutput(t, numCalls, res) + err := env.GetWorkflowResult(&res) + tc.checkOutput(t, numCalls, res, err) }) } }