Skip to content

Commit

Permalink
Fix activity cancellation race when not cancelling workflow (#741)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Feb 28, 2022
1 parent a7d8c1a commit c82b73d
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 14 deletions.
24 changes: 14 additions & 10 deletions internal/internal_decision_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,16 +910,20 @@ func (h *commandsHelper) removeCancelOfResolvedCommand(commandID commandID) {
delete(h.commands, commandID)
command := h.orderedCommands.Remove(orderedCmdEl)
// Sometimes commandsCancelledDuringWFCancellation was incremented before
// it was reset and sometimes not. We use the reset counter to see if we're
// still on the same iteration where we may have incremented it before.
switch command := command.(type) {
case *cancelActivityStateMachine:
if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter {
h.commandsCancelledDuringWFCancellation--
}
case *cancelTimerCommandStateMachine:
if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter {
h.commandsCancelledDuringWFCancellation--
// it was reset and sometimes not. We make sure the workflow execution is
// actually cancelling since that's the only time we increment the counter
// in the first place. Also, we use the reset counter to see if we're still
// on the same iteration where we may have incremented it before.
if h.workflowExecutionIsCancelling {
switch command := command.(type) {
case *cancelActivityStateMachine:
if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter {
h.commandsCancelledDuringWFCancellation--
}
case *cancelTimerCommandStateMachine:
if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter {
h.commandsCancelledDuringWFCancellation--
}
}
}
}
Expand Down
17 changes: 13 additions & 4 deletions test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ import (
)

type Activities struct {
client client.Client
mu sync.Mutex
invocations []string
activities2 *Activities2
client client.Client
mu sync.Mutex
invocations []string
activities2 *Activities2
manualStopContext context.Context
}

type Activities2 struct {
Expand Down Expand Up @@ -188,6 +189,14 @@ func (a *Activities) WaitForWorkerStop(ctx context.Context, timeout time.Duratio
}
}

func (a *Activities) WaitForManualStop(context.Context) error {
if a.manualStopContext == nil {
return fmt.Errorf("no manual context set")
}
<-a.manualStopContext.Done()
return nil
}

func (a *Activities) HeartbeatUntilCanceled(ctx context.Context, heartbeatFreq time.Duration) error {
for {
select {
Expand Down
35 changes: 35 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ func (ts *IntegrationTestSuite) SetupTest() {

if strings.Contains(ts.T().Name(), "Session") {
options.EnableSessionWorker = true
// Limit the session execution size
if strings.Contains(ts.T().Name(), "TestMaxConcurrentSessionExecutionSize") {
options.MaxConcurrentSessionExecutionSize = 3
}
}

if strings.Contains(ts.T().Name(), "LocalActivityWorkerOnly") {
Expand Down Expand Up @@ -1936,6 +1940,37 @@ func (ts *IntegrationTestSuite) TestReturnCancelError() {
ts.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED, resp.GetWorkflowExecutionInfo().GetStatus())
}

func (ts *IntegrationTestSuite) TestMaxConcurrentSessionExecutionSize() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
ts.activities.manualStopContext = ctx
// Since the test setup set the max execution size to 3, we want to try to
// create 4 sessions with a creation timeout of 2s (which is basically
// schedule-to-start of the session creation worker)
err := ts.executeWorkflow("test-max-concurrent-session-execution-size", ts.workflows.AdvancedSession, nil,
&AdvancedSessionParams{SessionCount: 4, SessionCreationTimeout: 2 * time.Second})
// Confirm it failed on the 4th session
ts.Error(err)
ts.Truef(strings.Contains(err.Error(), "failed creating session #4"), "wrong error, got: %v", err)
}

func (ts *IntegrationTestSuite) TestMaxConcurrentSessionExecutionSizeForRecreation() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
ts.activities.manualStopContext = ctx
// Same as TestMaxConcurrentSessionExecutionSize above, but we want to start
// recreating at session 2 (index 1)
err := ts.executeWorkflow("test-max-concurrent-session-execution-size-recreate", ts.workflows.AdvancedSession, nil,
&AdvancedSessionParams{
SessionCount: 4,
SessionCreationTimeout: 2 * time.Second,
UseRecreationFrom: 1,
})
// Confirm it failed on the 4th session
ts.Error(err)
ts.Truef(strings.Contains(err.Error(), "failed recreating session #4"), "wrong error, got: %v", err)
}

func (ts *IntegrationTestSuite) registerNamespace() {
client, err := client.NewNamespaceClient(client.Options{HostPort: ts.config.ServiceAddr})
ts.NoError(err)
Expand Down
78 changes: 78 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,83 @@ func (w *Workflows) BasicSession(ctx workflow.Context) ([]string, error) {
return []string{"toUpper"}, nil
}

type AdvancedSessionParams struct {
SessionCount int
SessionCreationTimeout time.Duration
// What session index at which to start using as recreation sessions of the
// last one instead of regular creation. Ignored if 0.
UseRecreationFrom int
}

func (w Workflows) AdvancedSession(ctx workflow.Context, params *AdvancedSessionParams) error {
// Create a query to know sessions started
sessionsStarted := false
err := workflow.SetQueryHandler(ctx, "sessions-started", func() (bool, error) { return sessionsStarted, nil })
if err != nil {
return err
}

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
// No retry on activities
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
})

// Create the sessions and their activities
sel := workflow.NewSelector(ctx)
var actErr error
var act Activities
var lastCreatedSessionInfo *workflow.SessionInfo
for i := 0; i < params.SessionCount; i++ {
i := i
var sessionCtx workflow.Context
var err error
opts := &workflow.SessionOptions{
CreationTimeout: params.SessionCreationTimeout,
ExecutionTimeout: 20 * time.Second,
HeartbeatTimeout: 2 * time.Second,
}
// Do a create unless at recreate index
verb := "creating"
if params.UseRecreationFrom == 0 || i < params.UseRecreationFrom {
sessionCtx, err = workflow.CreateSession(ctx, opts)
if err == nil {
lastCreatedSessionInfo = workflow.GetSessionInfo(sessionCtx)
}
} else {
sessionCtx, err = workflow.RecreateSession(ctx, lastCreatedSessionInfo.GetRecreateToken(), opts)
verb = "recreating"
}
if err != nil {
// We use the error message instead of wrapping the error itself
// because unfortunately the error converter unwraps some like
// cancellation
return fmt.Errorf("failed %v session #%v: %v", verb, i+1, err.Error())
}
defer workflow.CompleteSession(sessionCtx)

// Run activity in session
sel.AddFuture(workflow.ExecuteActivity(sessionCtx, act.WaitForManualStop), func(f workflow.Future) {
if err := f.Get(sessionCtx, nil); err != nil {
// We use the error message instead of wrapping the error itself
// because unfortunately the error converter unwraps some like
// cancellation
actErr = fmt.Errorf("activity on session #%v failed: %v", i+1, err.Error())
}
})
}
sessionsStarted = true

// Wait for all
for i := 0; i < params.SessionCount; i++ {
sel.Select(ctx)
if actErr != nil {
return actErr
}
}
return nil
}

func (w *Workflows) ActivityCompletionUsingID(ctx workflow.Context) ([]string, error) {
activityAOptions := workflow.ActivityOptions{
ActivityID: "A",
Expand Down Expand Up @@ -1664,6 +1741,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.Panicked)
worker.RegisterWorkflow(w.PanickedActivity)
worker.RegisterWorkflow(w.BasicSession)
worker.RegisterWorkflow(w.AdvancedSession)
worker.RegisterWorkflow(w.CancelActivity)
worker.RegisterWorkflow(w.CancelActivityImmediately)
worker.RegisterWorkflow(w.CancelChildWorkflow)
Expand Down

0 comments on commit c82b73d

Please sign in to comment.