Skip to content

Commit

Permalink
drop metrics outside of the aggregators period
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Oct 12, 2016
1 parent 8ca4a50 commit a84ce5d
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 15 deletions.
22 changes: 11 additions & 11 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,17 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}
}()

wg.Add(len(a.Config.Aggregators))
for _, aggregator := range a.Config.Aggregators {
go func(agg *models.RunningAggregator) {
defer wg.Done()
acc := NewAccumulator(agg, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
agg.Run(acc, shutdown)
}(aggregator)
}

wg.Add(len(a.Config.Inputs))
for _, input := range a.Config.Inputs {
interval := a.Config.Agent.Interval.Duration
Expand All @@ -371,17 +382,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}(input, interval)
}

wg.Add(len(a.Config.Aggregators))
for _, aggregator := range a.Config.Aggregators {
go func(agg *models.RunningAggregator) {
defer wg.Done()
acc := NewAccumulator(agg, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
agg.Run(acc, shutdown)
}(aggregator)
}

wg.Wait()
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ There are no generic configuration options available for all outputs.

The following config parameters are available for all aggregators:

* **period**: The period on which to flush & clear each aggregator.
* **period**: The period on which to flush & clear each aggregator. All metrics
that are sent with timestamps outside of this period will be ignored by the
aggregator.
* **delay**: The delay before each aggregator is flushed. This is to control
how long for aggregators to wait before receiving metrics from input plugins,
in the case that aggregators are flushing and inputs are gathering on the
Expand Down
33 changes: 33 additions & 0 deletions internal/models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type RunningAggregator struct {
Config *AggregatorConfig

metrics chan telegraf.Metric

periodStart time.Time
periodEnd time.Time
}

func NewRunningAggregator(
Expand Down Expand Up @@ -105,10 +108,32 @@ func (r *RunningAggregator) reset() {
r.a.Reset()
}

// Run runs the running aggregator, listens for incoming metrics, and waits
// for period ticks to tell it when to push and reset the aggregator.
func (r *RunningAggregator) Run(
acc telegraf.Accumulator,
shutdown chan struct{},
) {
// The start of the period is truncated to the nearest second.
//
// Every metric then gets it's timestamp checked and is dropped if it
// is not within:
//
// start < t < end + truncation + delay
//
// So if we start at now = 00:00.2 with a 10s period and 0.3s delay:
// now = 00:00.2
// start = 00:00
// truncation = 00:00.2
// end = 00:10
// 1st interval: 00:00 - 00:10.5
// 2nd interval: 00:10 - 00:20.5
// etc.
//
now := time.Now()
r.periodStart = now.Truncate(time.Second)
truncation := now.Sub(r.periodStart)
r.periodEnd = r.periodStart.Add(r.Config.Period)
time.Sleep(r.Config.Delay)
periodT := time.NewTicker(r.Config.Period)
defer periodT.Stop()
Expand All @@ -122,8 +147,16 @@ func (r *RunningAggregator) Run(
}
return
case m := <-r.metrics:
if m.Time().Before(r.periodStart) ||
m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) {
// the metric is outside the current aggregation period, so
// skip it.
continue
}
r.add(m)
case <-periodT.C:
r.periodStart = r.periodEnd
r.periodEnd = r.periodStart.Add(r.Config.Period)
r.push(acc)
r.reset()
}
Expand Down
62 changes: 59 additions & 3 deletions internal/models/running_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,64 @@ func TestAdd(t *testing.T) {
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now(),
time.Now().Add(time.Millisecond*150),
)
assert.False(t, ra.Add(m))

for {
time.Sleep(time.Millisecond)
if atomic.LoadInt64(&a.sum) > 0 {
break
}
}
assert.Equal(t, int64(101), atomic.LoadInt64(&a.sum))
}

func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
Period: time.Millisecond * 500,
})
assert.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
go ra.Run(&acc, make(chan struct{}))

// metric before current period
m := ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now().Add(-time.Hour),
)
assert.False(t, ra.Add(m))

// metric after current period
m = ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now().Add(time.Hour),
)
assert.False(t, ra.Add(m))

// "now" metric
m = ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now().Add(time.Millisecond*50),
)
assert.False(t, ra.Add(m))

for {
time.Sleep(time.Millisecond)
if atomic.LoadInt64(&a.sum) > 0 {
break
}
Expand Down Expand Up @@ -68,11 +121,12 @@ func TestAddAndPushOnePeriod(t *testing.T) {
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now(),
time.Now().Add(time.Millisecond*100),
)
assert.False(t, ra.Add(m))

for {
time.Sleep(time.Millisecond)
if acc.NMetrics() > 0 {
break
}
Expand Down Expand Up @@ -182,7 +236,9 @@ type TestAggregator struct {

func (t *TestAggregator) Description() string { return "" }
func (t *TestAggregator) SampleConfig() string { return "" }
func (t *TestAggregator) Reset() {}
func (t *TestAggregator) Reset() {
atomic.StoreInt64(&t.sum, 0)
}

func (t *TestAggregator) Push(acc telegraf.Accumulator) {
acc.AddFields("TestMetric",
Expand Down

0 comments on commit a84ce5d

Please sign in to comment.