Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract rangeEvents function #18981

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,20 +357,7 @@ func (s *watchableStore) syncWatchers() int {
compactionRev := s.store.compactMainRev

wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
minBytes, maxBytes := NewRevBytes(), NewRevBytes()
minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
maxBytes = RevToBytes(Revision{Main: curRev + 1}, maxBytes)

// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := s.store.b.ReadTx()
tx.RLock()
revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
evs := kvsToEvents(s.store.lg, wg, revs, vs)
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
// We can only unlock after Unmarshal, which will do deep copy.
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
tx.RUnlock()
evs := rangeEvents(s.store.lg, s.store.b, minRev, curRev+1, wg)

victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
Expand Down Expand Up @@ -422,15 +409,38 @@ func (s *watchableStore) syncWatchers() int {
return s.unsynced.size()
}

// rangeEvents returns events in range [minRev, maxRev).
func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64, c contains) []mvccpb.Event {
minBytes, maxBytes := NewRevBytes(), NewRevBytes()
minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes)

// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := b.ReadTx()
tx.RLock()
revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
evs := kvsToEvents(lg, c, revs, vs)
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
// We can only unlock after Unmarshal, which will do deep copy.
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
tx.RUnlock()
return evs
}

type contains interface {
contains(string) bool
}
Comment on lines +431 to +433
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't make much sense to add an interface here. I didn't raise this comment in #17563 is because you removed it in your second commit.


// kvsToEvents gets all events for the watchers from all key-value pairs
func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
func kvsToEvents(lg *zap.Logger, c contains, revs, vals [][]byte) (evs []mvccpb.Event) {
for i, v := range vals {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(v); err != nil {
lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
}

if !wg.contains(string(kv.Key)) {
if !c.contains(string(kv.Key)) {
continue
}

Expand Down
106 changes: 106 additions & 0 deletions server/storage/mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,112 @@ func TestSyncWatchers(t *testing.T) {
}
}

func TestRangeEvents(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
lg := zaptest.NewLogger(t)
s := NewStore(lg, b, &lease.FakeLessor{}, StoreConfig{})

defer cleanup(s, b)

foo1 := []byte("foo1")
foo2 := []byte("foo2")
foo3 := []byte("foo3")
value := []byte("bar")
s.Put(foo1, value, lease.NoLease)
s.Put(foo2, value, lease.NoLease)
s.Put(foo3, value, lease.NoLease)
s.DeleteRange(foo1, foo3) // Deletes "foo1" and "foo2" generating 2 events

expectEvents := []mvccpb.Event{
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Key: foo1,
CreateRevision: 2,
ModRevision: 2,
Version: 1,
Value: value,
},
},
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Key: foo2,
CreateRevision: 3,
ModRevision: 3,
Version: 1,
Value: value,
},
},
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Key: foo3,
CreateRevision: 4,
ModRevision: 4,
Version: 1,
Value: value,
},
},
{
Type: mvccpb.DELETE,
Kv: &mvccpb.KeyValue{
Key: foo1,
ModRevision: 5,
},
},
{
Type: mvccpb.DELETE,
Kv: &mvccpb.KeyValue{
Key: foo2,
ModRevision: 5,
},
},
}

tcs := []struct {
minRev int64
maxRev int64
expectEvents []mvccpb.Event
}{
// maxRev, top to bottom
{minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]},
{minRev: 2, maxRev: 5, expectEvents: expectEvents[0:3]},
{minRev: 2, maxRev: 4, expectEvents: expectEvents[0:2]},
{minRev: 2, maxRev: 3, expectEvents: expectEvents[0:1]},
{minRev: 2, maxRev: 2, expectEvents: expectEvents[0:0]},

// minRev, bottom to top
{minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]},
{minRev: 3, maxRev: 6, expectEvents: expectEvents[1:5]},
{minRev: 4, maxRev: 6, expectEvents: expectEvents[2:5]},
{minRev: 5, maxRev: 6, expectEvents: expectEvents[3:5]},
{minRev: 6, maxRev: 6, expectEvents: expectEvents[0:0]},

// Moving window algorithm, first increase maxRev, then increase minRev, repeat.
{minRev: 2, maxRev: 2, expectEvents: expectEvents[0:0]},
{minRev: 2, maxRev: 3, expectEvents: expectEvents[0:1]},
{minRev: 2, maxRev: 4, expectEvents: expectEvents[0:2]},
{minRev: 3, maxRev: 4, expectEvents: expectEvents[1:2]},
{minRev: 3, maxRev: 5, expectEvents: expectEvents[1:3]},
{minRev: 4, maxRev: 5, expectEvents: expectEvents[2:3]},
{minRev: 4, maxRev: 6, expectEvents: expectEvents[2:5]},
{minRev: 5, maxRev: 6, expectEvents: expectEvents[3:5]},
{minRev: 6, maxRev: 6, expectEvents: expectEvents[5:5]},
}
for i, tc := range tcs {
t.Run(fmt.Sprintf("%d rangeEvents(%d, %d)", i, tc.minRev, tc.maxRev), func(t *testing.T) {
assert.ElementsMatch(t, tc.expectEvents, rangeEvents(lg, b, tc.minRev, tc.maxRev, fakeContains{}))
})
}
}

type fakeContains struct{}

func (f fakeContains) contains(string) bool {
return true
}

// TestWatchCompacted tests a watcher that watches on a compacted revision.
func TestWatchCompacted(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
Expand Down
Loading