Skip to content

Commit

Permalink
Add ability to disable workflow worker (#689)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Jan 18, 2022
1 parent 147569c commit 0a00199
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 11 deletions.
36 changes: 25 additions & 11 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,11 +849,17 @@ type AggregatedWorker struct {

// RegisterWorkflow registers workflow implementation with the AggregatedWorker
func (aw *AggregatedWorker) RegisterWorkflow(w interface{}) {
if aw.workflowWorker == nil {
panic("workflow worker disabled, cannot register workflow")
}
aw.registry.RegisterWorkflow(w)
}

// RegisterWorkflowWithOptions registers workflow implementation with the AggregatedWorker
func (aw *AggregatedWorker) RegisterWorkflowWithOptions(w interface{}, options RegisterWorkflowOptions) {
if aw.workflowWorker == nil {
panic("workflow worker disabled, cannot register workflow")
}
aw.registry.RegisterWorkflowWithOptions(w, options)
}

Expand Down Expand Up @@ -882,8 +888,10 @@ func (aw *AggregatedWorker) Start() error {
if !util.IsInterfaceNil(aw.activityWorker) {
if err := aw.activityWorker.Start(); err != nil {
// stop workflow worker.
if aw.workflowWorker.worker.isWorkerStarted {
aw.workflowWorker.Stop()
if !util.IsInterfaceNil(aw.workflowWorker) {
if aw.workflowWorker.worker.isWorkerStarted {
aw.workflowWorker.Stop()
}
}
return err
}
Expand All @@ -893,11 +901,15 @@ func (aw *AggregatedWorker) Start() error {
aw.logger.Info("Starting session worker")
if err := aw.sessionWorker.Start(); err != nil {
// stop workflow worker and activity worker.
if aw.workflowWorker.worker.isWorkerStarted {
aw.workflowWorker.Stop()
if !util.IsInterfaceNil(aw.workflowWorker) {
if aw.workflowWorker.worker.isWorkerStarted {
aw.workflowWorker.Stop()
}
}
if aw.activityWorker.worker.isWorkerStarted {
aw.activityWorker.Stop()
if !util.IsInterfaceNil(aw.activityWorker) {
if aw.activityWorker.worker.isWorkerStarted {
aw.activityWorker.Stop()
}
}
return err
}
Expand Down Expand Up @@ -1312,11 +1324,13 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke

// workflow factory.
var workflowWorker *workflowWorker
testTags := getTestTags(options.BackgroundActivityContext)
if len(testTags) > 0 {
workflowWorker = newWorkflowWorkerWithPressurePoints(client.workflowService, workerParams, testTags, registry)
} else {
workflowWorker = newWorkflowWorker(client.workflowService, workerParams, nil, registry)
if !options.DisableWorkflowWorker {
testTags := getTestTags(options.BackgroundActivityContext)
if len(testTags) > 0 {
workflowWorker = newWorkflowWorkerWithPressurePoints(client.workflowService, workerParams, testTags, registry)
} else {
workflowWorker = newWorkflowWorker(client.workflowService, workerParams, nil, registry)
}
}

// activity types.
Expand Down
11 changes: 11 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2632,3 +2632,14 @@ func TestIsNonRetriableError(t *testing.T) {
require.Equal(t, test.expected, isNonRetriableError(test.err))
}
}

func TestWorkerRegisterDisabledWorkflow(t *testing.T) {
// Expect panic
var recovered interface{}
func() {
defer func() { recovered = recover() }()
worker := NewAggregatedWorker(&WorkflowClient{}, "some-task-queue", WorkerOptions{DisableWorkflowWorker: true})
worker.RegisterWorkflow(testReplayWorkflow)
}()
require.Equal(t, "workflow worker disabled, cannot register workflow", recovered)
}
6 changes: 6 additions & 0 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ type (
// default: 1000
MaxConcurrentSessionExecutionSize int

// Optional: If set to true, a workflow worker is not started for this
// worker and workflows cannot be registered with this worker. Use this if
// you only want your worker to execute activities.
// default: false
DisableWorkflowWorker bool

// Optional: If set to true worker would only handle workflow tasks and local activities.
// Non-local activities will not be executed by this worker.
// default: false
Expand Down
18 changes: 18 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,24 @@ func (ts *IntegrationTestSuite) TestTallyScopeAccess() {
assertHistDuration("some_histogram", 5*time.Second, 2)
}

func (ts *IntegrationTestSuite) TestActivityOnlyWorker() {
// Start worker
taskQueue := "test-activity-only-queue-" + uuid.New()
activityOnlyWorker := worker.New(ts.client, taskQueue, worker.Options{DisableWorkflowWorker: true})
a := newActivities()
activityOnlyWorker.RegisterActivity(a.activities2.ToUpper)
ts.NoError(activityOnlyWorker.Start())
defer activityOnlyWorker.Stop()

// Exec workflow on primary worker, confirm activity executed
var result string
err := ts.executeWorkflow("test-activity-only-worker", ts.workflows.ExecuteRemoteActivityToUpper, &result,
taskQueue, "fOobAr")
ts.NoError(err)
ts.Equal("FOOBAR", result)
ts.Equal(1, a.invokedCount("toUpper"))
}

func (ts *IntegrationTestSuite) TestReturnCancelError() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand Down
11 changes: 11 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,6 +1592,16 @@ func (w *Workflows) TooFewParams(
return ret, workflow.ExecuteActivity(ctx, a.TooFewParams, param1).Get(ctx, &ret.Child)
}

func (w *Workflows) ExecuteRemoteActivityToUpper(ctx workflow.Context, taskQueue, str string) (string, error) {
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
TaskQueue: taskQueue,
ScheduleToCloseTimeout: 5 * time.Second,
})
var resp string
err := workflow.ExecuteActivity(ctx, (*Activities2).ToUpper, str).Get(ctx, &resp)
return resp, err
}

func (w *Workflows) ReturnCancelError(
ctx workflow.Context,
fromActivity bool,
Expand Down Expand Up @@ -1701,6 +1711,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.AdvancedPostCancellation)
worker.RegisterWorkflow(w.AdvancedPostCancellationChildWithDone)
worker.RegisterWorkflow(w.TooFewParams)
worker.RegisterWorkflow(w.ExecuteRemoteActivityToUpper)
worker.RegisterWorkflow(w.ReturnCancelError)

worker.RegisterWorkflow(w.child)
Expand Down

0 comments on commit 0a00199

Please sign in to comment.