Reuse fetched events across watcher sync iterations.

syncWatchersLoop keeps the event slice returned by syncWatchers and passes it
back in on the next iteration. syncWatchers hands that slice to
rangeEventsWithReuse, which extends or trims it to cover [minRev, curRev+1)
rather than always reading the whole range through rangeEvents. rangeEvents and
kvsToEvents lose their `contains` parameter, so kvsToEvents no longer skips
any key.

diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go
index 7d57f50ae58..ee47c2c6d72 100644
--- a/server/storage/mvcc/watchable_store.go
+++ b/server/storage/mvcc/watchable_store.go
@@ -221,6 +221,7 @@ func (s *watchableStore) syncWatchersLoop() {
 	waitDuration := 100 * time.Millisecond
 	delayTicker := time.NewTicker(waitDuration)
 	defer delayTicker.Stop()
+	var evs []mvccpb.Event
 
 	for {
 		s.mu.RLock()
@@ -230,7 +231,7 @@ func (s *watchableStore) syncWatchersLoop() {
 
 		unsyncedWatchers := 0
 		if lastUnsyncedWatchers > 0 {
-			unsyncedWatchers = s.syncWatchers()
+			unsyncedWatchers, evs = s.syncWatchers(evs)
 		}
 		syncDuration := time.Since(st)
 
@@ -339,12 +340,12 @@ func (s *watchableStore) moveVictims() (moved int) {
 //  2. iterate over the set to get the minimum revision and remove compacted watchers
 //  3. use minimum revision to get all key-value pairs and send those events to watchers
 //  4. remove synced watchers in set from unsynced group and move to synced group
-func (s *watchableStore) syncWatchers() int {
+func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
 	if s.unsynced.size() == 0 {
-		return 0
+		return 0, nil
 	}
 
 	s.store.revMu.RLock()
@@ -357,7 +358,7 @@ func (s *watchableStore) syncWatchers() int {
 	compactionRev := s.store.compactMainRev
 
 	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
-	evs := rangeEvents(s.store.lg, s.store.b, minRev, curRev+1, wg)
+	evs = rangeEventsWithReuse(s.store.lg, s.store.b, evs, minRev, curRev+1)
 
 	victims := make(watcherBatch)
 	wb := newWatcherBatch(wg, evs)
@@ -406,11 +407,50 @@ func (s *watchableStore) syncWatchers() int {
 	}
 	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
 
-	return s.unsynced.size()
+	return s.unsynced.size(), evs
+}
+
+// rangeEventsWithReuse returns events in range [minRev, maxRev), reusing the
+// events already held in evs instead of reading the whole range again.
+//
+// Only the first and last entries of evs are inspected to decide what to fetch,
+// so evs is assumed to be ordered by Kv.ModRevision without gaps, e.g. the
+// result of a previous call. The returned slice may share its backing array
+// with evs.
+func rangeEventsWithReuse(lg *zap.Logger, b backend.Backend, evs []mvccpb.Event, minRev, maxRev int64) []mvccpb.Event {
+	if len(evs) == 0 {
+		return rangeEvents(lg, b, minRev, maxRev)
+	}
+	// append from left: fetch revisions older than the first reused event
+	if evs[0].Kv.ModRevision > minRev {
+		evs = append(rangeEvents(lg, b, minRev, evs[0].Kv.ModRevision), evs...)
+	}
+	// cut from left: drop reused events older than minRev
+	prefixIndex := 0
+	for prefixIndex < len(evs) && evs[prefixIndex].Kv.ModRevision < minRev {
+		prefixIndex++
+	}
+	evs = evs[prefixIndex:]
+
+	// Nothing reusable is left, so read the whole range.
+	if len(evs) == 0 {
+		return rangeEvents(lg, b, minRev, maxRev)
+	}
+	// append from right: fetch revisions newer than the last reused event
+	if evs[len(evs)-1].Kv.ModRevision+1 < maxRev {
+		evs = append(evs, rangeEvents(lg, b, evs[len(evs)-1].Kv.ModRevision+1, maxRev)...)
+	}
+	// cut from right: drop reused events at or past maxRev
+	suffixIndex := len(evs) - 1
+	for suffixIndex >= 0 && evs[suffixIndex].Kv.ModRevision >= maxRev {
+		suffixIndex--
+	}
+	evs = evs[:suffixIndex+1]
+	return evs
 }
 
 // rangeEvents returns events in range [minRev, maxRev).
-func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64, c contains) []mvccpb.Event {
+func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64) []mvccpb.Event {
 	minBytes, maxBytes := NewRevBytes(), NewRevBytes()
 	minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
 	maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes)
@@ -420,7 +460,7 @@ func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64, c cont
 	tx := b.ReadTx()
 	tx.RLock()
 	revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
-	evs := kvsToEvents(lg, c, revs, vs)
+	evs := kvsToEvents(lg, revs, vs)
 	// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
 	// We can only unlock after Unmarshal, which will do deep copy.
 	// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
@@ -428,22 +468,14 @@ func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64, c cont
 	return evs
 }
 
-type contains interface {
-	contains(string) bool
-}
-
 // kvsToEvents gets all events for the watchers from all key-value pairs
-func kvsToEvents(lg *zap.Logger, c contains, revs, vals [][]byte) (evs []mvccpb.Event) {
+func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event) {
 	for i, v := range vals {
 		var kv mvccpb.KeyValue
 		if err := kv.Unmarshal(v); err != nil {
 			lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
 		}
 
-		if !c.contains(string(kv.Key)) {
-			continue
-		}
-
 		ty := mvccpb.PUT
 		if isTombstone(revs[i]) {
 			ty = mvccpb.DELETE
diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go
index c2030616180..a418c6c78fe 100644
--- a/server/storage/mvcc/watchable_store_test.go
+++ b/server/storage/mvcc/watchable_store_test.go
@@ -141,7 +141,7 @@ func TestSyncWatchers(t *testing.T) {
 
 	assert.Empty(t, s.synced.watcherSetByKey(string(testKey)))
 	assert.Len(t, s.unsynced.watcherSetByKey(string(testKey)), watcherN)
-	s.syncWatchers()
+	s.syncWatchers(nil)
 	assert.Len(t, s.synced.watcherSetByKey(string(testKey)), watcherN)
 	assert.Empty(t, s.unsynced.watcherSetByKey(string(testKey)))
 
@@ -257,19 +257,17 @@ func TestRangeEvents(t *testing.T) {
 		{minRev: 5, maxRev: 6, expectEvents: expectEvents[3:5]},
 		{minRev: 6, maxRev: 6, expectEvents: expectEvents[5:5]},
 	}
+	// reuse the evs to test rangeEventsWithReuse
+	var evs []mvccpb.Event
 	for i, tc := range tcs {
 		t.Run(fmt.Sprintf("%d rangeEvents(%d, %d)", i, tc.minRev, tc.maxRev), func(t *testing.T) {
-			assert.ElementsMatch(t, tc.expectEvents, rangeEvents(lg, b, tc.minRev, tc.maxRev, fakeContains{}))
+			assert.ElementsMatch(t, tc.expectEvents, rangeEvents(lg, b, tc.minRev, tc.maxRev))
+			evs = rangeEventsWithReuse(lg, b, evs, tc.minRev, tc.maxRev)
+			assert.ElementsMatch(t, tc.expectEvents, evs)
 		})
 	}
 }
 
-type fakeContains struct{}
-
-func (f fakeContains) contains(string) bool {
-	return true
-}
-
 // TestWatchCompacted tests a watcher that watches on a compacted revision.
 func TestWatchCompacted(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
@@ -342,7 +340,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {
 		require.NoError(t, err)
 	}
 	// fill up w.Chan() with 1 buf via 2 compacted watch response
-	s.syncWatchers()
+	s.syncWatchers(nil)
 
 	for len(watchers) > 0 {
 		resp := <-w.Chan()
diff --git a/server/storage/mvcc/watcher_test.go b/server/storage/mvcc/watcher_test.go
index 0c1fa521267..e774c70cfac 100644
--- a/server/storage/mvcc/watcher_test.go
+++ b/server/storage/mvcc/watcher_test.go
@@ -320,7 +320,7 @@ func TestWatcherRequestProgress(t *testing.T) {
 	default:
 	}
 
-	s.syncWatchers()
+	s.syncWatchers(nil)
 
 	w.RequestProgress(id)
 	wrs := WatchResponse{WatchID: id, Revision: 2}
@@ -359,7 +359,7 @@ func TestWatcherRequestProgressAll(t *testing.T) {
 	default:
 	}
 
-	s.syncWatchers()
+	s.syncWatchers(nil)
 
 	w.RequestProgressAll()
 	wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}