diff --git a/agent/agent.go b/agent/agent.go index 47d5336e48c58..1a205e218b203 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -358,6 +358,17 @@ func (a *Agent) Run(shutdown chan struct{}) error { } }() + wg.Add(len(a.Config.Aggregators)) + for _, aggregator := range a.Config.Aggregators { + go func(agg *models.RunningAggregator) { + defer wg.Done() + acc := NewAccumulator(agg, metricC) + acc.SetPrecision(a.Config.Agent.Precision.Duration, + a.Config.Agent.Interval.Duration) + agg.Run(acc, shutdown) + }(aggregator) + } + wg.Add(len(a.Config.Inputs)) for _, input := range a.Config.Inputs { interval := a.Config.Agent.Interval.Duration @@ -371,17 +382,6 @@ func (a *Agent) Run(shutdown chan struct{}) error { }(input, interval) } - wg.Add(len(a.Config.Aggregators)) - for _, aggregator := range a.Config.Aggregators { - go func(agg *models.RunningAggregator) { - defer wg.Done() - acc := NewAccumulator(agg, metricC) - acc.SetPrecision(a.Config.Agent.Precision.Duration, - a.Config.Agent.Interval.Duration) - agg.Run(acc, shutdown) - }(aggregator) - } - wg.Wait() return nil } diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 5cf43f5322dbc..d05fc987ec834 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -87,7 +87,9 @@ There are no generic configuration options available for all outputs. The following config parameters are available for all aggregators: -* **period**: The period on which to flush & clear each aggregator. +* **period**: The period on which to flush & clear each aggregator. All metrics +that are sent with timestamps outside of this period will be ignored by the +aggregator. * **delay**: The delay before each aggregator is flushed. This is to control how long for aggregators to wait before receiving metrics from input plugins, in the case that aggregators are flushing and inputs are gathering on the diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 9c8403f5fec11..5c7640ba68258 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -11,6 +11,9 @@ type RunningAggregator struct { Config *AggregatorConfig metrics chan telegraf.Metric + + periodStart time.Time + periodEnd time.Time } func NewRunningAggregator( @@ -105,10 +108,32 @@ func (r *RunningAggregator) reset() { r.a.Reset() } +// Run runs the running aggregator, listens for incoming metrics, and waits +// for period ticks to tell it when to push and reset the aggregator. func (r *RunningAggregator) Run( acc telegraf.Accumulator, shutdown chan struct{}, ) { + // The start of the period is truncated to the nearest second. + // + // Every metric then gets it's timestamp checked and is dropped if it + // is not within: + // + // start < t < end + truncation + delay + // + // So if we start at now = 00:00.2 with a 10s period and 0.3s delay: + // now = 00:00.2 + // start = 00:00 + // truncation = 00:00.2 + // end = 00:10 + // 1st interval: 00:00 - 00:10.5 + // 2nd interval: 00:10 - 00:20.5 + // etc. + // + now := time.Now() + r.periodStart = now.Truncate(time.Second) + truncation := now.Sub(r.periodStart) + r.periodEnd = r.periodStart.Add(r.Config.Period) time.Sleep(r.Config.Delay) periodT := time.NewTicker(r.Config.Period) defer periodT.Stop() @@ -122,8 +147,16 @@ func (r *RunningAggregator) Run( } return case m := <-r.metrics: + if m.Time().Before(r.periodStart) || + m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) { + // the metric is outside the current aggregation period, so + // skip it. + continue + } r.add(m) case <-periodT.C: + r.periodStart = r.periodEnd + r.periodEnd = r.periodStart.Add(r.Config.Period) r.push(acc) r.reset() } diff --git a/internal/models/running_aggregator_test.go b/internal/models/running_aggregator_test.go index 495b8ddda1e42..834f7d1e0118f 100644 --- a/internal/models/running_aggregator_test.go +++ b/internal/models/running_aggregator_test.go @@ -31,11 +31,64 @@ func TestAdd(t *testing.T) { map[string]interface{}{"value": int(101)}, map[string]string{}, telegraf.Untyped, - time.Now(), + time.Now().Add(time.Millisecond*150), ) assert.False(t, ra.Add(m)) for { + time.Sleep(time.Millisecond) + if atomic.LoadInt64(&a.sum) > 0 { + break + } + } + assert.Equal(t, int64(101), atomic.LoadInt64(&a.sum)) +} + +func TestAddMetricsOutsideCurrentPeriod(t *testing.T) { + a := &TestAggregator{} + ra := NewRunningAggregator(a, &AggregatorConfig{ + Name: "TestRunningAggregator", + Filter: Filter{ + NamePass: []string{"*"}, + }, + Period: time.Millisecond * 500, + }) + assert.NoError(t, ra.Config.Filter.Compile()) + acc := testutil.Accumulator{} + go ra.Run(&acc, make(chan struct{})) + + // metric before current period + m := ra.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + time.Now().Add(-time.Hour), + ) + assert.False(t, ra.Add(m)) + + // metric after current period + m = ra.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + time.Now().Add(time.Hour), + ) + assert.False(t, ra.Add(m)) + + // "now" metric + m = ra.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + time.Now().Add(time.Millisecond*50), + ) + assert.False(t, ra.Add(m)) + + for { + time.Sleep(time.Millisecond) if atomic.LoadInt64(&a.sum) > 0 { break } @@ -68,11 +121,12 @@ func TestAddAndPushOnePeriod(t *testing.T) { map[string]interface{}{"value": int(101)}, map[string]string{}, telegraf.Untyped, - time.Now(), + time.Now().Add(time.Millisecond*100), ) assert.False(t, ra.Add(m)) for { + time.Sleep(time.Millisecond) if acc.NMetrics() > 0 { break } @@ -182,7 +236,9 @@ type TestAggregator struct { func (t *TestAggregator) Description() string { return "" } func (t *TestAggregator) SampleConfig() string { return "" } -func (t *TestAggregator) Reset() {} +func (t *TestAggregator) Reset() { + atomic.StoreInt64(&t.sum, 0) +} func (t *TestAggregator) Push(acc telegraf.Accumulator) { acc.AddFields("TestMetric",