Skip to content

Commit

Permalink
Fix heartbeats recording on activity failure (#578)
Browse files Browse the repository at this point in the history
Fixes #521
  • Loading branch information
cretz authored Oct 8, 2021
1 parent 2c0af69 commit 662d4c6
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 2 deletions.
6 changes: 4 additions & 2 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,10 +1771,12 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
activityMetricsScope := metrics.GetMetricsScopeForActivity(ath.metricsScope, workflowType, activityType, ath.taskQueueName)
ctx := WithActivityTask(canCtx, t, taskQueue, invoker, ath.logger, activityMetricsScope, ath.dataConverter, ath.workerStopCh, ath.contextPropagators, ath.tracer)

defer func() {
// We must capture the context here because it is changed later to one that is
// cancelled when the activity is done
defer func(ctx context.Context) {
_, activityCompleted := result.(*workflowservice.RespondActivityTaskCompletedRequest)
invoker.Close(ctx, !activityCompleted) // flush buffered heartbeat if activity was not successfully completed.
}()
}(ctx)

activityImplementation := ath.getActivity(activityType)
if activityImplementation == nil {
Expand Down
26 changes: 26 additions & 0 deletions test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,32 @@ func (a *Activities) LongRunningHeartbeat(ctx context.Context, delay time.Durati
return nil
}

// HeartbeatTwiceAndFailNTimes is a test activity that records two heartbeats
// per invocation and then fails on purpose during its early invocations.
//
// The heartbeat payload is a running counter. If the context carries heartbeat
// details, the counter is loaded from them before being incremented; otherwise
// it starts at zero. After heartbeating, the invocation is noted under id, and
// errFailOnPurpose is returned for as long as the invocation count reported
// for id does not exceed times. The current counter value is returned in every
// case, including alongside an error.
func (a *Activities) HeartbeatTwiceAndFailNTimes(
	ctx context.Context,
	times int,
	id string,
) (heartbeatCounts int, err error) {
	// Pick up where the previous attempt left off, if it left anything behind.
	if activity.HasHeartbeatDetails(ctx) {
		err = activity.GetHeartbeatDetails(ctx, &heartbeatCounts)
		if err != nil {
			return heartbeatCounts, err
		}
	}

	// Bump the counter and report it, two times over.
	for beat := 0; beat < 2; beat++ {
		heartbeatCounts++
		activity.RecordHeartbeat(ctx, heartbeatCounts)
	}

	// Note this invocation, then succeed only once more than `times`
	// invocations have been seen for this id.
	a.append(id)
	if a.invokedCount(id) > times {
		return heartbeatCounts, nil
	}
	return heartbeatCounts, errFailOnPurpose
}

func (a *Activities) fail(_ context.Context) error {
a.append("fail")
return errFailOnPurpose
Expand Down
12 changes: 12 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,18 @@ func (ts *IntegrationTestSuite) TestLongRunningActivityWithHBAndGrpcRetries() {
ts.assertReportedOperationCount("temporal_request_attempt", "RecordActivityTaskHeartbeat", int(totalHeartbeats+3))
}

// TestHeartbeatOnActivityFailure runs the ActivityHeartbeatWithRetry workflow
// and checks that heartbeats recorded by failing activity attempts are not
// lost.
func (ts *IntegrationTestSuite) TestHeartbeatOnActivityFailure() {
	// The activity runs 3 times (the first 2 attempts fail) and heartbeats
	// twice per attempt, so the final counter must be 6. Prior to the fix for
	// the missing gRPC call on activity failure, the second heartbeat of each
	// failing attempt was dropped and the result was 4.
	const (
		attempts             = 3
		heartbeatsPerAttempt = 2
	)

	var finalCount int
	ts.NoError(ts.executeWorkflow("test-heartbeat-on-activity-failure",
		ts.workflows.ActivityHeartbeatWithRetry, &finalCount))
	ts.Equal(attempts*heartbeatsPerAttempt, finalCount)
}

func (ts *IntegrationTestSuite) TestContinueAsNew() {
var result int
err := ts.executeWorkflow("test-continueasnew", ts.workflows.ContinueAsNew, &result, 4, ts.taskQueueName)
Expand Down
13 changes: 13 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,18 @@ func (w *Workflows) ActivityRetryOnHBTimeout(ctx workflow.Context) ([]string, er
return []string{"heartbeatAndSleep", "heartbeatAndSleep", "heartbeatAndSleep"}, nil
}

// ActivityHeartbeatWithRetry executes the "HeartbeatTwiceAndFailNTimes"
// activity with a fast, non-growing retry interval and returns the heartbeat
// counter that the activity produces, along with any error from the activity.
func (w *Workflows) ActivityHeartbeatWithRetry(ctx workflow.Context) (heartbeatCounts int, err error) {
	// Retry every 5ms with no backoff growth so the test finishes quickly.
	activityOpts := w.defaultActivityOptions()
	activityOpts.RetryPolicy = &temporal.RetryPolicy{
		InitialInterval:    5 * time.Millisecond,
		BackoffCoefficient: 1,
	}
	ctx = workflow.WithActivityOptions(ctx, activityOpts)

	// The activity is asked to fail twice; the id is derived from the workflow
	// execution ID so the activity can track invocations for this run.
	invocationID := "activity-heartbeat-" + workflow.GetInfo(ctx).WorkflowExecution.ID
	pending := workflow.ExecuteActivity(ctx, "HeartbeatTwiceAndFailNTimes", 2, invocationID)
	err = pending.Get(ctx, &heartbeatCounts)
	return heartbeatCounts, err
}

func (w *Workflows) ContinueAsNew(ctx workflow.Context, count int, taskQueue string) (int, error) {
tq := workflow.GetInfo(ctx).TaskQueueName
if tq != taskQueue {
Expand Down Expand Up @@ -1265,6 +1277,7 @@ func (w *Workflows) WaitSignalReturnParam(ctx workflow.Context, v interface{}) (
func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ActivityCancelRepro)
worker.RegisterWorkflow(w.ActivityCompletionUsingID)
worker.RegisterWorkflow(w.ActivityHeartbeatWithRetry)
worker.RegisterWorkflow(w.ActivityRetryOnError)
worker.RegisterWorkflow(w.CallUnregisteredActivityRetry)
worker.RegisterWorkflow(w.ActivityRetryOnHBTimeout)
Expand Down

0 comments on commit 662d4c6

Please sign in to comment.