Skip to content

Commit

Permalink
Make mutable side effects work during replay (#809)
Browse files Browse the repository at this point in the history
Fixes #803
  • Loading branch information
cretz authored May 25, 2022
1 parent 41cd853 commit 54f4148
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 16 deletions.
3 changes: 3 additions & 0 deletions internal/internal_decision_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,9 @@ func (h *commandsHelper) recordLocalActivityMarker(activityID string, details ma
}

func (h *commandsHelper) recordMutableSideEffectMarker(mutableSideEffectID string, data *commonpb.Payloads, dc converter.DataConverter) commandStateMachine {
// In order to avoid duplicate marker IDs, we must append the counter to the
// user-provided ID
mutableSideEffectID = fmt.Sprintf("%v_%v", mutableSideEffectID, h.nextCommandEventID)
markerID := fmt.Sprintf("%v_%v", mutableSideEffectMarkerName, mutableSideEffectID)

mutableSideEffectIDPayload, err := dc.ToPayloads(mutableSideEffectID)
Expand Down
43 changes: 36 additions & 7 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ type (
unstartedLaTasks map[string]struct{}
openSessions map[string]*SessionInfo

// Set of mutable side effect IDs that are recorded on the next task for use
// during replay to determine whether a command should be created. The keys
// are the user-provided IDs + "_" + the command counter.
mutableSideEffectsRecorded map[string]bool

// LocalActivities have a separate, individual counter instead of relying on actual commandEventIDs.
// This is because command IDs are only incremented on activity completion, which breaks
// local activities that are spawned in parallel as they would all share the same command ID
Expand Down Expand Up @@ -690,6 +695,12 @@ func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interfa
if result, ok := wc.mutableSideEffect[id]; ok {
encodedResult := newEncodedValue(result, wc.GetDataConverter())
if wc.isReplay {
// During replay, we only generate a command if there was a known marker
// recorded on the next task. We have to append the current command
// counter to the user-provided ID to avoid duplicates.
if wc.mutableSideEffectsRecorded[fmt.Sprintf("%v_%v", id, wc.commandsHelper.nextCommandEventID)] {
return wc.recordMutableSideEffect(id, result)
}
return encodedResult
}

Expand Down Expand Up @@ -1132,17 +1143,35 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded(
case localActivityMarkerName:
err = weh.handleLocalActivityMarker(attributes.GetDetails(), attributes.GetFailure())
case mutableSideEffectMarkerName:
if sideEffectIDPayload, ok := attributes.GetDetails()[sideEffectMarkerIDName]; !ok {
var sideEffectIDWithCounterPayload, sideEffectDataPayload *commonpb.Payloads
if sideEffectIDWithCounterPayload = attributes.GetDetails()[sideEffectMarkerIDName]; sideEffectIDWithCounterPayload == nil {
err = fmt.Errorf("key %q: %w", sideEffectMarkerIDName, ErrMissingMarkerDataKey)
} else {
if sideEffectData, ok := attributes.GetDetails()[sideEffectMarkerDataName]; !ok {
}
if err == nil {
if sideEffectDataPayload = attributes.GetDetails()[sideEffectMarkerDataName]; sideEffectDataPayload == nil {
err = fmt.Errorf("key %q: %w", sideEffectMarkerDataName, ErrMissingMarkerDataKey)
} else {
var sideEffectID string
_ = weh.dataConverter.FromPayloads(sideEffectIDPayload, &sideEffectID)
weh.mutableSideEffect[sideEffectID] = sideEffectData
}
}
var sideEffectIDWithCounter, sideEffectDataID string
var sideEffectDataContents commonpb.Payloads
if err == nil {
err = weh.dataConverter.FromPayloads(sideEffectIDWithCounterPayload, &sideEffectIDWithCounter)
}
// Side effect data is actually a wrapper of ID + data, so we need to
// extract the second value as the actual data
if err == nil {
err = weh.dataConverter.FromPayloads(sideEffectDataPayload, &sideEffectDataID, &sideEffectDataContents)
}
if err == nil {
weh.mutableSideEffect[sideEffectDataID] = &sideEffectDataContents
// We must mark that it is recorded so we can know whether a command
// needs to be generated during replay
if weh.mutableSideEffectsRecorded == nil {
weh.mutableSideEffectsRecorded = map[string]bool{}
}
// This must be stored with the counter
weh.mutableSideEffectsRecorded[sideEffectIDWithCounter] = true
}
default:
err = ErrUnknownMarkerName
}
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,8 @@ ProcessEvents:
} else {
w.workflowInfo.BinaryChecksum = binaryChecksum
}
// Reset the mutable side effect markers recorded
eventHandler.mutableSideEffectsRecorded = nil
// Markers are from the events that are produced from the current workflow task.
for _, m := range markers {
if m.GetMarkerRecordedEventAttributes().GetMarkerName() != localActivityMarkerName {
Expand Down
1 change: 1 addition & 0 deletions test/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
go.temporal.io/sdk/contrib/opentracing v0.0.0-00010101000000-000000000000
go.temporal.io/sdk/contrib/tally v0.1.0
go.uber.org/goleak v1.1.11
google.golang.org/grpc v1.46.0
)

replace (
Expand Down
9 changes: 0 additions & 9 deletions test/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
Expand All @@ -29,7 +28,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a h1:yDWHCSQ40h88yih2JAcL6Ls/kVkSE8GFACTGVnMPruw=
Expand Down Expand Up @@ -168,8 +166,6 @@ go.opentelemetry.io/otel/sdk v1.2.0/go.mod h1:jNN8QtpvbsKhgaC6V5lHiejMoKD+V8uado
go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuNjej0=
go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.7.1-0.20220429205751-8a73b1f896d0 h1:TCljuP7nzCO0a9fMHcYsKAtacMTt92FxS1EkFzB/9NY=
go.temporal.io/api v1.7.1-0.20220429205751-8a73b1f896d0/go.mod h1:QXFU+pt4JL280LYD40YrvLelG1jfei1TZ0GD7X3DLSg=
go.temporal.io/api v1.7.1-0.20220510183009-449d18444c9a h1:9ZcXhoat+9nw8iqGxQviHWK7FkuKz8fm4aOz8pX2RzI=
go.temporal.io/api v1.7.1-0.20220510183009-449d18444c9a/go.mod h1:YU5EQaONkIr0ZRju0NqdqYNH/hCkBuwqRMDA0iaj7JM=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down Expand Up @@ -247,8 +243,6 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220429121018-84afa8d3f7b3 h1:kBsBifDikLCf5sUMbcD8p73OinDtAQWQp8+n7FiyzlA=
golang.org/x/sys v0.0.0-20220429121018-84afa8d3f7b3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 h1:nonptSpoQ4vQjyraW20DXPAglgQfVnM9ZC6MmNLMR60=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down Expand Up @@ -287,8 +281,6 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e h1:gMjH4zLGs9m+dGzR7qHCHaXMOwsJHJKKkHtyXhtOrJk=
google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3 h1:q1kiSVscqoDeqTF27eQ2NnLLDmqF0I373qQNXYMy0fo=
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
Expand All @@ -299,7 +291,6 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
google.golang.org/grpc v1.46.0 h1:oCjezcn6g6A75TGoKYBPgKmVBLexhYLM6MebdrPApP8=
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
19 changes: 19 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2293,6 +2293,25 @@ func (ts *IntegrationTestSuite) TestClientGetNotFollowingRuns() {
ts.Equal(ts.taskQueueName, contErr.TaskQueueName)
}

func (ts *IntegrationTestSuite) TestMutableSideEffects() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Run workflow that does side effects to add 1 to our number
run, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("test-mutable-side-effects"),
ts.workflows.MutableSideEffect, 42)
ts.NoError(err)
var val int
ts.NoError(run.Get(ctx, &val))
ts.Equal(45, val)

// Now replay it
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(ts.workflows.MutableSideEffect)
ts.NoError(replayer.ReplayWorkflowExecution(ctx, ts.client.WorkflowService(), nil, ts.config.Namespace,
workflow.Execution{ID: run.GetID(), RunID: run.GetRunID()}))
}

func (ts *IntegrationTestSuite) registerNamespace() {
client, err := client.NewNamespaceClient(client.Options{
HostPort: ts.config.ServiceAddr,
Expand Down
42 changes: 42 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,47 @@ func (w *Workflows) ForcedNonDeterminism(ctx workflow.Context, sameCommandButDif
return
}

func (w *Workflows) MutableSideEffect(ctx workflow.Context, startVal int) (currVal int, err error) {
// Make some mutable side effect calls with timers in between
sideEffector := func(retVal int) (newVal int, err error) {
err = workflow.MutableSideEffect(
ctx,
"side-effect-1",
func(ctx workflow.Context) interface{} { return retVal },
func(a, b interface{}) bool { return a.(int) == b.(int) },
).Get(&newVal)
return
}
// Make several mutable side effect calls, some that change the data, some
// that don't. And then sleep and do again. This checks that multiple
// mutable side effects of the same ID can happen at the same time, and that
// replay properly distinguishes between which ones were recorded and which
// weren't for command counting purposes
if currVal, err = sideEffector(startVal); err != nil {
panic(err)
} else if currVal, err = sideEffector(currVal); err != nil {
panic(err)
} else if currVal, err = sideEffector(currVal + 1); err != nil {
panic(err)
} else if currVal, err = sideEffector(currVal); err != nil {
panic(err)
} else if err = workflow.Sleep(ctx, 1*time.Millisecond); err != nil {
panic(err)
} else if currVal, err = sideEffector(currVal); err != nil {
panic(err)
} else if currVal, err = sideEffector(currVal); err != nil {
panic(err)
} else if currVal, err = sideEffector(currVal + 1); err != nil {
panic(err)
} else if err = workflow.Sleep(ctx, 1*time.Millisecond); err != nil {
panic(err)
} else if currVal, err = sideEffector(currVal + 1); err != nil {
panic(err)
}
err = workflow.Sleep(ctx, 1*time.Millisecond)
return
}

func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ActivityCancelRepro)
worker.RegisterWorkflow(w.ActivityCompletionUsingID)
Expand Down Expand Up @@ -1856,6 +1897,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.SignalCounter)
worker.RegisterWorkflow(w.PanicOnSignal)
worker.RegisterWorkflow(w.ForcedNonDeterminism)
worker.RegisterWorkflow(w.MutableSideEffect)

worker.RegisterWorkflow(w.child)
worker.RegisterWorkflow(w.childForMemoAndSearchAttr)
Expand Down

0 comments on commit 54f4148

Please sign in to comment.