Skip to content

Commit

Permalink
Make Worker.Stop reentrant (#906)
Browse files Browse the repository at this point in the history
Fixes #903
  • Loading branch information
cretz authored Sep 2, 2022
1 parent f4f143c commit e7905ec
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
8 changes: 7 additions & 1 deletion internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,13 @@ func (aw *AggregatedWorker) Run(interruptCh <-chan interface{}) error {

// Stop the worker.
func (aw *AggregatedWorker) Stop() {
close(aw.stopC)
// Only attempt stop if we haven't attempted before
select {
case <-aw.stopC:
return
default:
close(aw.stopC)
}

if !util.IsInterfaceNil(aw.workflowWorker) {
aw.workflowWorker.Stop()
Expand Down
13 changes: 13 additions & 0 deletions internal/internal_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,19 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() {
s.Equal(2, localActivityCalledCount)
}

func (s *WorkersTestSuite) TestWorkerMultipleStop() {
s.service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).
Return(&workflowservice.PollWorkflowTaskQueueResponse{}, nil).AnyTimes()
s.service.EXPECT().PollActivityTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).
Return(&workflowservice.PollActivityTaskQueueResponse{}, nil).AnyTimes()
client := NewServiceClient(s.service, nil, ClientOptions{Identity: "multi-stop-identity"})
worker := NewAggregatedWorker(client, "multi-stop-tq", WorkerOptions{})
s.NoError(worker.Start())
worker.Stop()
worker.Stop()
}

func (s *WorkersTestSuite) createLocalActivityMarkerDataForTest(activityID string) map[string]*commonpb.Payloads {
lamd := localActivityMarkerData{
ActivityID: activityID,
Expand Down

0 comments on commit e7905ec

Please sign in to comment.