Skip to content

Commit

Permalink
Fix flakiness with OpenTelemetry integration test (#684)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Jan 5, 2022
1 parent 638b0ed commit 4d67625
Showing 1 changed file with 32 additions and 23 deletions.
55 changes: 32 additions & 23 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1549,7 +1550,8 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withSignalAndQueryHeade
}

// Confirm expected
actual := interceptortest.Span("root-span", ts.openTelemetrySpanChildren(spans, rootSpan.SpanContext().SpanID())...)
actual := interceptortest.Span("root-span")
ts.addOpenTelemetryChildren(rootSpan.SpanContext().SpanID(), actual, spans)
expected := span("root-span",
span("SignalWithStartWorkflow:SignalsAndQueries",
span("HandleSignal:start-signal"),
Expand Down Expand Up @@ -1610,29 +1612,36 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withSignalAndQueryHeade
ts.Equal(expected, actual)
}

func (ts *IntegrationTestSuite) openTelemetrySpanChildren(
spans []sdktrace.ReadOnlySpan,
func (ts *IntegrationTestSuite) addOpenTelemetryChildren(
parentID trace.SpanID,
) (ret []*interceptortest.SpanInfo) {
seenSpans := map[string]*interceptortest.SpanInfo{}
parentSpan *interceptortest.SpanInfo,
spans []sdktrace.ReadOnlySpan,
) {
// Add any children that are not already present. We have to dedupe children
// recursively like this because, in cases where we have disabled the cache,
// the same interceptor may be called many times in duplicated ways but we
// only want the unique set based on name.
for _, s := range spans {
if s.Parent().SpanID() == parentID {
// In cases where we have disabled the cache, the same interceptors get
// called many times with replayed values which create replayed spans. We
// can't disable spans during replay because they are sometimes the ones
// that code paths continue on so they need spans. Since the tests don't
// expect any spans of the same name at the same level, we will just reuse
// spans we find with the same name to prevent this.
span := seenSpans[s.Name()]
if span == nil {
span = interceptortest.Span(s.Name())
ret = append(ret, span)
seenSpans[s.Name()] = span
// Must be same parent
if s.Parent().SpanID() != parentID {
continue
}
// Try to find child that already exists by name
var child *interceptortest.SpanInfo
for _, maybeChild := range parentSpan.Children {
if maybeChild.Name == s.Name() {
child = maybeChild
break
}
span.Children = append(span.Children, ts.openTelemetrySpanChildren(spans, s.SpanContext().SpanID())...)
}
// Add child if not there
if child == nil {
child = interceptortest.Span(s.Name())
parentSpan.Children = append(parentSpan.Children, child)
}
// Collect grandchildren
ts.addOpenTelemetryChildren(s.SpanContext().SpanID(), child, spans)
}
return
}

func (ts *IntegrationTestSuite) TestAdvancedPostCancellation() {
Expand Down Expand Up @@ -1979,8 +1988,8 @@ var _ interceptor.WorkflowInboundInterceptor = (*signalWorkflowInboundIntercepto

type signalInterceptor struct {
interceptor.WorkerInterceptorBase
ReturnErrorTimes int
TimesInvoked int
ReturnErrorTimes uint32
TimesInvoked uint32
}

func newSignalInterceptor() *signalInterceptor {
Expand All @@ -1993,8 +2002,8 @@ type signalWorkflowInboundInterceptor struct {
}

func (t *signalWorkflowInboundInterceptor) HandleSignal(ctx workflow.Context, in *interceptor.HandleSignalInput) error {
t.control.TimesInvoked++
if t.control.TimesInvoked <= t.control.ReturnErrorTimes {
timesInvoked := atomic.AddUint32(&t.control.TimesInvoked, 1)
if timesInvoked <= t.control.ReturnErrorTimes {
return fmt.Errorf("interceptor induced failure while processing signal %v", in.SignalName)
}
return t.Next.HandleSignal(ctx, in)
Expand Down

0 comments on commit 4d67625

Please sign in to comment.