Skip to content

Commit

Permalink
Added bloom filter preallocation in GJ
Browse files Browse the repository at this point in the history
As mentioned here: #13349
GraceJoin fails with a memory-limit error when trying to allocate memory for Bloom filters. This PR adds these filters to the pre-allocation step so that spilling can be enabled when memory is low.
commit_hash:107ea78fdfeee8f3422818ada96af8d3763e6849
  • Loading branch information
lll-phill-lll committed Feb 20, 2025
1 parent eaa19c6 commit c66255b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
8 changes: 7 additions & 1 deletion yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,22 +353,28 @@ bool TTable::TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind /
if (!tableForPreallocation.TableBucketsStats[bucket].TuplesNum || tableForPreallocation.TableBuckets[bucket].NSlots) continue;

TTableBucket& bucketForPreallocation = tableForPreallocation.TableBuckets[bucket];
const TTableBucketStats& bucketForPreallocationStats = tableForPreallocation.TableBucketsStats[bucket];
TTableBucketStats& bucketForPreallocationStats = tableForPreallocation.TableBucketsStats[bucket];

const auto nSlots = ComputeJoinSlotsSizeForBucket(bucketForPreallocation, bucketForPreallocationStats, tableForPreallocation.HeaderSize,
tableForPreallocation.NumberOfKeyStringColumns != 0, tableForPreallocation.NumberOfKeyIColumns != 0);
const auto slotSize = ComputeNumberOfSlots(tableForPreallocation.TableBucketsStats[bucket].TuplesNum);

try {
bucketForPreallocation.JoinSlots.reserve(nSlots*slotSize);
bucketForPreallocationStats.BloomFilter.Reserve(bucketForPreallocationStats.TuplesNum);
} catch (TMemoryLimitExceededException) {
for (ui64 i = 0; i < bucket; ++i) {
auto& b1 = t1.TableBuckets[i];
b1.JoinSlots.resize(0);
b1.JoinSlots.shrink_to_fit();
auto& s1 = t1.TableBucketsStats[i];
s1.BloomFilter.Shrink();

auto& b2 = t2.TableBuckets[i];
b2.JoinSlots.resize(0);
b2.JoinSlots.shrink_to_fit();
auto& s2 = t2.TableBucketsStats[i];
s2.BloomFilter.Shrink();
}
return false;
}
Expand Down
19 changes: 15 additions & 4 deletions yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,23 @@ class TBloomfilter {
Resize(size);
}

void Resize(ui64 size) {
void Reserve(ui64 size) {
size = std::max(size, CachelineSize);
Bits_ = 6;

for (; (ui64(1)<<Bits_) < size; ++Bits_)
for (; (ui64(1) << Bits_) < size; ++Bits_)
;

Bits_ += 3; // -> multiply by 8
size = 1u<<(Bits_ - 6);

Storage_.reserve(ComputeStorageSize());
}

void Resize(ui64 size) {
Storage_.clear();
Storage_.resize(size + CachelineSize/sizeof(ui64) - 1);

Reserve(size);
Storage_.resize(ComputeStorageSize());

// align Ptr_ up to BlockSize
Ptr_ = (ui64 *)((uintptr_t(Storage_.data()) + BlockSize - 1) & ~(BlockSize - 1));
Expand Down Expand Up @@ -104,6 +109,12 @@ class TBloomfilter {
Storage_.shrink_to_fit();
Ptr_ = Storage_.data();
}

private:
// Returns the number of ui64 words that Storage_ has to hold for the current
// value of Bits_: 2^(Bits_ - 6) words plus CachelineSize / sizeof(ui64) - 1
// additional words (presumably slack that lets Resize() align Ptr_ upwards
// inside the buffer -- confirm against the alignment code there).
//
// Fails via MKQL_ENSURE when Bits_ < 6, since the shift amount would
// otherwise be invalid.
ui64 ComputeStorageSize() const {
    MKQL_ENSURE(Bits_ >= 6, "Internal logic error");
    // Shift a 64-bit one: `1u << k` is evaluated in 32 bits and is undefined
    // for k >= 32, whereas the result of this function is a ui64 word count.
    const ui64 filterWords = ui64(1) << (Bits_ - 6);
    return filterWords + CachelineSize / sizeof(ui64) - 1;
}
};

/*
Expand Down

0 comments on commit c66255b

Please sign in to comment.