Skip to content

Commit

Permalink
Disable timeout retries during session creation (#746)
Browse files Browse the repository at this point in the history
Fixes #722
  • Loading branch information
cretz authored Mar 7, 2022
1 parent 2c5ed0f commit 8cb1963
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 40 deletions.
3 changes: 3 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ func newSessionWorker(service workflowservice.WorkflowServiceClient, params work

params.MaxConcurrentActivityTaskQueuePollers = 1
params.TaskQueue = creationTaskqueue
// Although we have a session token bucket to limit the number of sessions across
// creation and recreation, we also limit it here for creation only
params.ConcurrentActivityExecutionSize = maxConcurrentSessionExecutionSize
creationWorker := newActivityWorker(service, params, overrides, env, sessionEnvironment.GetTokenBucket())

return &sessionWorker{
Expand Down
17 changes: 11 additions & 6 deletions internal/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,16 @@ func createSession(ctx Context, creationTaskqueue string, options *SessionOption
}

taskqueueChan := GetSignalChannel(ctx, sessionID) // use sessionID as channel name
// Retry is only needed when creating new session and the error returned is NewApplicationError(errTooManySessionsMsg)
// Retry is only needed when creating new session and the error returned is
// NewApplicationError(errTooManySessionsMsg). Therefore we make sure to
// disable retrying for start-to-close and heartbeat timeouts which can occur
// when attempting to retry a create-session on a different worker.
retryPolicy := &RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 1.1,
MaximumInterval: time.Second * 10,
MaximumAttempts: 0,
InitialInterval: time.Second,
BackoffCoefficient: 1.1,
MaximumInterval: time.Second * 10,
MaximumAttempts: 0,
NonRetryableErrorTypes: []string{"TemporalTimeout:StartToClose", "TemporalTimeout:Heartbeat"},
}

heartbeatTimeout := defaultSessionHeartbeatTimeout
Expand Down Expand Up @@ -523,7 +527,8 @@ func newSessionEnvironment(resourceID string, concurrentSessionExecutionSize int

func (env *sessionEnvironmentImpl) CreateSession(_ context.Context, sessionID string) (<-chan struct{}, error) {
if !env.sessionTokenBucket.getToken() {
return nil, NewApplicationError(errTooManySessionsMsg, "", true, nil)
// This error must be retryable so sessions can keep trying to be created
return nil, NewApplicationError(errTooManySessionsMsg, "", false, nil)
}

env.Lock()
Expand Down
94 changes: 81 additions & 13 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ func (ts *IntegrationTestSuite) SetupTest() {
}
}

if strings.Contains(ts.T().Name(), "TestSessionOnWorkerFailure") {
// We disable sticky execution here since we kill the worker and restart it
// and sticky execution adds a 5s penalty
worker.SetStickyWorkflowCacheSize(0)
}

if strings.Contains(ts.T().Name(), "LocalActivityWorkerOnly") {
options.LocalActivityWorkerOnly = true
}
Expand Down Expand Up @@ -1736,11 +1742,11 @@ func (ts *IntegrationTestSuite) TestAdvancedPostCancellationChildWithDone() {
ts.NoError(run.Get(ctx, nil))
}

func (ts *IntegrationTestSuite) waitForQueryTrue(run client.WorkflowRun, query string) {
func (ts *IntegrationTestSuite) waitForQueryTrue(run client.WorkflowRun, query string, args ...interface{}) {
var result bool
for i := 0; !result && i < 30; i++ {
time.Sleep(50 * time.Millisecond)
val, err := ts.client.QueryWorkflow(context.Background(), run.GetID(), run.GetRunID(), query)
val, err := ts.client.QueryWorkflow(context.Background(), run.GetID(), run.GetRunID(), query, args...)
// Ignore query failed because it means query may not be registered yet
var queryFailed *serviceerror.QueryFailed
if errors.As(err, &queryFailed) {
Expand Down Expand Up @@ -1967,7 +1973,7 @@ func (ts *IntegrationTestSuite) TestLocalActivityStringNameReplay() {
ts.NoError(replayer.ReplayWorkflowHistory(nil, &history))
}

func (ts *IntegrationTestSuite) TestMaxConcurrentSessionExecutionSize() {
func (ts *IntegrationTestSuite) TestMaxConcurrentSessionExecutionSizeNoWait() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
ts.activities.manualStopContext = ctx
Expand All @@ -1976,26 +1982,88 @@ func (ts *IntegrationTestSuite) TestMaxConcurrentSessionExecutionSize() {
// schedule-to-start of the session creation worker)
err := ts.executeWorkflow("test-max-concurrent-session-execution-size", ts.workflows.AdvancedSession, nil,
&AdvancedSessionParams{SessionCount: 4, SessionCreationTimeout: 2 * time.Second})
// Confirm it failed on the 4th session
// Confirm it failed on the 4th session because it took too long to create
ts.Error(err)
ts.Truef(strings.Contains(err.Error(), "failed creating session #4"), "wrong error, got: %v", err)
ts.Truef(strings.Contains(err.Error(), "activity ScheduleToStart timeout"), "wrong error, got: %v", err)
}

func (ts *IntegrationTestSuite) TestMaxConcurrentSessionExecutionSizeForRecreation() {
// TestMaxConcurrentSessionExecutionSizeWithRecreationAndWait starts two
// AdvancedSession workflows that each ask for two sessions (the second one via
// recreation) and checks that both workflows finish successfully once the
// blocking activities are released.
func (ts *IntegrationTestSuite) TestMaxConcurrentSessionExecutionSizeWithRecreationAndWait() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// manualStopContext is derived from ctx with its own cancel func so the test
// can decide when to call manualCancel (see the end of this test).
var manualCancel context.CancelFunc
ts.activities.manualStopContext, manualCancel = context.WithCancel(ctx)
// Create 2 workflows each wanting to create 2 sessions (second session on
// each is recreation to ensure counter works). This will hang with one
// workflow holding 2 sessions and the other holding 1 while waiting on its
// second. Once we cancel the manual-stop context, the first workflow's
// activities complete, which frees sessions and allows the other pending
// creation to complete.
run1, err := ts.client.ExecuteWorkflow(ctx,
ts.startWorkflowOptions("test-max-concurrent-session-execution-size-recreate-1"),
ts.workflows.AdvancedSession, &AdvancedSessionParams{
SessionCount: 2,
SessionCreationTimeout: 40 * time.Second,
RecreateAtIndex: 1,
})
ts.NoError(err)
// Wait until run1 reports that both of its sessions have been created
ts.waitForQueryTrue(run1, "sessions-created-equals", 2)

// Now start the second workflow
run2, err := ts.client.ExecuteWorkflow(ctx,
ts.startWorkflowOptions("test-max-concurrent-session-execution-size-recreate-2"),
ts.workflows.AdvancedSession, &AdvancedSessionParams{
SessionCount: 2,
SessionCreationTimeout: 40 * time.Second,
RecreateAtIndex: 1,
})
ts.NoError(err)
// Wait until run2 reports exactly 1 session created with another creation
// still pending
ts.waitForQueryTrue(run2, "sessions-created-equals-and-pending", 1)

// Now let the activities complete which lets run1 complete and free up
// sessions for run2
manualCancel()
ts.NoError(run1.Get(ctx, nil))
ts.NoError(run2.Get(ctx, nil))
}

func (ts *IntegrationTestSuite) TestSessionOnWorkerFailure() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
ts.activities.manualStopContext = ctx
// Same as TestMaxConcurrentSessionExecutionSize above, but we want to start
// recreating at session 2 (index 1)
err := ts.executeWorkflow("test-max-concurrent-session-execution-size-recreate", ts.workflows.AdvancedSession, nil,
// We want to start a single long-running activity in a session
run, err := ts.client.ExecuteWorkflow(ctx,
ts.startWorkflowOptions("test-session-worker-failure"),
ts.workflows.AdvancedSession,
&AdvancedSessionParams{
SessionCount: 4,
SessionCreationTimeout: 2 * time.Second,
UseRecreationFrom: 1,
SessionCount: 1,
SessionCreationTimeout: 10 * time.Second,
})
// Confirm it failed on the 4th session
ts.NoError(err)

// Wait until sessions started
ts.waitForQueryTrue(run, "sessions-created-equals", 1)

// Kill the worker
ts.worker.Stop()
ts.workerStopped = true

// Now create a new worker on that same task queue to resume the work of the
// workflow
nextWorker := worker.New(ts.client, ts.taskQueueName, worker.Options{DisableStickyExecution: true})
ts.registerWorkflowsAndActivities(nextWorker)
ts.NoError(nextWorker.Start())
defer nextWorker.Stop()

// Get the result of the workflow run now
err = run.Get(ctx, nil)
// We expect the activity to time out (which shows as canceled in Go) since
// the original worker that was running the activity is no longer present.
// Before the fix that accompanied this test, this would hang because
// sessions would inadvertently retry.
ts.Error(err)
ts.Truef(strings.Contains(err.Error(), "failed recreating session #4"), "wrong error, got: %v", err)
ts.Truef(strings.HasSuffix(err.Error(), "activity on session #1 failed: canceled"), "wrong error, got: %v", err)
}

func (ts *IntegrationTestSuite) registerNamespace() {
Expand Down
50 changes: 29 additions & 21 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1162,30 +1162,38 @@ func (w *Workflows) BasicSession(ctx workflow.Context) ([]string, error) {
type AdvancedSessionParams struct {
SessionCount int
SessionCreationTimeout time.Duration
// What session index at which to start using as recreation sessions of the
// last one instead of regular creation. Ignored if 0.
UseRecreationFrom int
// Just a single index to do recreation. Ignored if 0.
RecreateAtIndex int
}

func (w Workflows) AdvancedSession(ctx workflow.Context, params *AdvancedSessionParams) error {
// Create a query to know sessions started
sessionsStarted := false
err := workflow.SetQueryHandler(ctx, "sessions-started", func() (bool, error) { return sessionsStarted, nil })
if err != nil {
return err
}

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
// No retry on activities
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
})

// Create the sessions and their activities
// Create a query to know sessions pending or started
var sessionCreatePending bool
var sessionsCreated int
err := workflow.SetQueryHandler(ctx, "sessions-created-equals", func(expected int) (bool, error) {
return sessionsCreated == expected, nil
})
if err != nil {
return err
}
err = workflow.SetQueryHandler(ctx, "sessions-created-equals-and-pending", func(expected int) (bool, error) {
return sessionsCreated == expected && sessionCreatePending, nil
})
if err != nil {
return err
}

// Create the sessions
sel := workflow.NewSelector(ctx)
var actErr error
var act Activities
var lastCreatedSessionInfo *workflow.SessionInfo
var lastSessionCtx workflow.Context
for i := 0; i < params.SessionCount; i++ {
i := i
var sessionCtx workflow.Context
Expand All @@ -1197,22 +1205,23 @@ func (w Workflows) AdvancedSession(ctx workflow.Context, params *AdvancedSession
}
// Do a create unless at recreate index
verb := "creating"
if params.UseRecreationFrom == 0 || i < params.UseRecreationFrom {
sessionCtx, err = workflow.CreateSession(ctx, opts)
if err == nil {
lastCreatedSessionInfo = workflow.GetSessionInfo(sessionCtx)
}
} else {
sessionCtx, err = workflow.RecreateSession(ctx, lastCreatedSessionInfo.GetRecreateToken(), opts)
sessionCreatePending = true
if params.RecreateAtIndex > 0 && i == params.RecreateAtIndex {
sessionCtx, err = workflow.RecreateSession(ctx, workflow.GetSessionInfo(lastSessionCtx).GetRecreateToken(), opts)
verb = "recreating"
} else {
sessionCtx, err = workflow.CreateSession(ctx, opts)
}
sessionCreatePending = false
if err != nil {
// We use the error message instead of wrapping the error itself
// because unfortunately the error converter unwraps some like
// cancellation
return fmt.Errorf("failed %v session #%v: %v", verb, i+1, err.Error())
}
sessionsCreated++
defer workflow.CompleteSession(sessionCtx)
lastSessionCtx = sessionCtx

// Run activity in session
sel.AddFuture(workflow.ExecuteActivity(sessionCtx, act.WaitForManualStop), func(f workflow.Future) {
Expand All @@ -1224,9 +1233,8 @@ func (w Workflows) AdvancedSession(ctx workflow.Context, params *AdvancedSession
}
})
}
sessionsStarted = true

// Wait for all
// Wait for all activities
for i := 0; i < params.SessionCount; i++ {
sel.Select(ctx)
if actErr != nil {
Expand Down
9 changes: 9 additions & 0 deletions workflow/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ var ErrSessionFailed = internal.ErrSessionFailed
// // Handle activity error
// }
// ... // execute more activities using sessionCtx
//
// NOTE: Session recreation via RecreateSession may not work properly across worker fail/crash before Temporal server
// version v1.15.1.
func CreateSession(ctx Context, sessionOptions *SessionOptions) (Context, error) {
// Public entry point only: forwards both arguments unchanged to the
// internal package and returns its result as-is.
return internal.CreateSession(ctx, sessionOptions)
}
Expand All @@ -111,6 +114,9 @@ func CreateSession(ctx Context, sessionOptions *SessionOptions) (Context, error)
// The main usage of RecreateSession is for long sessions that are split into multiple runs. At the end of
// one run, complete the current session, get recreateToken from sessionInfo by calling SessionInfo.GetRecreateToken()
// and pass the token to the next run. In the new run, session can be recreated using that token.
//
// NOTE: Session recreation via RecreateSession may not work properly across worker fail/crash before Temporal server
// version v1.15.1.
func RecreateSession(ctx Context, recreateToken []byte, sessionOptions *SessionOptions) (Context, error) {
// Public entry point only: forwards the context, token and options
// unchanged to the internal package and returns its result as-is.
return internal.RecreateSession(ctx, recreateToken, sessionOptions)
}
Expand All @@ -122,6 +128,9 @@ func RecreateSession(ctx Context, recreateToken []byte, sessionOptions *SessionO
// After a session has completed, user can continue to use the context, but the activities will be scheduled
// on the normal taskQueue (as user specified in ActivityOptions) and may be picked up by another worker since
// it's not in a session.
//
// Due to internal logic, this call must be made in the same coroutine CreateSession/RecreateSession were
// called in.
func CompleteSession(ctx Context) {
// Public entry point only: forwards the context to the internal package.
internal.CompleteSession(ctx)
}
Expand Down

0 comments on commit 8cb1963

Please sign in to comment.