Skip to content

Commit

Permalink
Store sticky pages separately (#12198)
Browse files Browse the repository at this point in the history
  • Loading branch information
kunga authored Dec 4, 2024
1 parent 4f34c1a commit fa3d243
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 178 deletions.
86 changes: 61 additions & 25 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ void TExecutor::Broken() {
void TExecutor::RecreatePageCollectionsCache() noexcept
{
PrivatePageCache = MakeHolder<TPrivatePageCache>();
StickyPagesMemory = 0;

Stats->PacksMetaBytes = 0;

Expand Down Expand Up @@ -611,6 +612,10 @@ void TExecutor::AddSingleCache(const TIntrusivePtr<TPrivatePageCache::TInfo> &in
{
PrivatePageCache->RegisterPageCollection(info);
Send(MakeSharedPageCacheId(), new NSharedCache::TEvAttach(info->PageCollection, SelfId()));

StickyPagesMemory += info->GetStickySize();

Counters->Simple()[TExecutorCounters::CACHE_TOTAL_STICKY] = StickyPagesMemory;
}

void TExecutor::DropCachesOfBundle(const NTable::TPart &part) noexcept
Expand All @@ -633,13 +638,20 @@ void TExecutor::DropCachesOfBundle(const NTable::TPart &part) noexcept

void TExecutor::DropSingleCache(const TLogoBlobID &label) noexcept
{
auto toActivate = PrivatePageCache->ForgetPageCollection(label);
auto pageCollection = PrivatePageCache->GetPageCollection(label);

ui64 stickySize = pageCollection->GetStickySize();
Y_ABORT_UNLESS(StickyPagesMemory >= stickySize);
StickyPagesMemory -= stickySize;

auto toActivate = PrivatePageCache->ForgetPageCollection(pageCollection);
ActivateWaitingTransactions(toActivate);
if (!PrivatePageCache->Info(label))
Send(MakeSharedPageCacheId(), new NSharedCache::TEvInvalidate(label));

Counters->Simple()[TExecutorCounters::CACHE_PINNED_SET] = PrivatePageCache->GetStats().PinnedSetSize;
Counters->Simple()[TExecutorCounters::CACHE_PINNED_LOAD] = PrivatePageCache->GetStats().PinnedLoadSize;
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_STICKY] = StickyPagesMemory;
}

void TExecutor::TranslateCacheTouchesToSharedCache() {
Expand Down Expand Up @@ -667,6 +679,28 @@ void TExecutor::RequestInMemPagesForDatabase(bool pendingOnly) {
}
}

void TExecutor::StickInMemPages(NSharedCache::TEvResult *msg) {
const auto& scheme = Scheme();
for (auto& pr : scheme.Tables) {
const ui32 tid = pr.first;
auto subset = Database->Subset(tid, NTable::TEpoch::Max(), { } , { });
for (auto &partView : subset->Flatten) {
auto partStore = partView.As<NTable::TPartStore>();
for (auto &pageCollection : partStore->PageCollections) {
// Note: page collection search optimization seems useless
if (pageCollection->PageCollection == msg->Origin) {
ui64 stickySizeBefore = pageCollection->GetStickySize();
for (auto& loaded : msg->Loaded) {
pageCollection->AddSticky(loaded.PageId, loaded.Page);
}
StickyPagesMemory += pageCollection->GetStickySize() - stickySizeBefore;
}
}
}
}
// Note: the next call of ProvideBlock will also fill pages bodies
}

TExecutorCaches TExecutor::CleanupState() {
TExecutorCaches caches;

Expand Down Expand Up @@ -1347,14 +1381,8 @@ void TExecutor::RequestInMemPagesForPartStore(ui32 tableId, const NTable::TPartV

if (stickyGroup) {
auto req = partView.As<NTable::TPartStore>()->GetPages(groupIndex);

TPrivatePageCache::TInfo *info = PrivatePageCache->Info(req->PageCollection->Label());
Y_ABORT_UNLESS(info);
for (ui32 pageId : req->Pages)
PrivatePageCache->MarkSticky(pageId, info);

// TODO: only request missing pages
RequestFromSharedCache(req, NBlockIO::EPriority::Bkgr, EPageCollectionRequest::CacheSync);
RequestFromSharedCache(req, NBlockIO::EPriority::Bkgr, EPageCollectionRequest::InMemPages);
}
}
}
Expand Down Expand Up @@ -1792,13 +1820,17 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
}

void TExecutor::UnpinTransactionPages(TSeat &seat) {
Y_ABORT_UNLESS(TransactionPagesMemory >= seat.MemoryTouched);
TransactionPagesMemory -= seat.MemoryTouched;

size_t unpinnedPages = 0;
PrivatePageCache->UnpinPages(seat.Pinned, unpinnedPages);
seat.Pinned.clear();
seat.MemoryTouched = 0;

Counters->Simple()[TExecutorCounters::CACHE_PINNED_SET] = PrivatePageCache->GetStats().PinnedSetSize;
Counters->Simple()[TExecutorCounters::CACHE_PINNED_LOAD] = PrivatePageCache->GetStats().PinnedLoadSize;
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_USED] = TransactionPagesMemory;
}

void TExecutor::ReleaseTxData(TSeat &seat, ui64 requested, const TActorContext &ctx)
Expand Down Expand Up @@ -1830,12 +1862,14 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &
ui64 prevTouched = seat->MemoryTouched;

PrivatePageCache->PinTouches(seat->Pinned, touchedPages, newPinnedPages, seat->MemoryTouched);
TransactionPagesMemory += seat->MemoryTouched - prevTouched;

ui32 newTouchedPages = newPinnedPages;
ui64 newTouchedBytes = seat->MemoryTouched - prevTouched;
prevTouched = seat->MemoryTouched;

PrivatePageCache->PinToLoad(seat->Pinned, newPinnedPages, seat->MemoryTouched);
TransactionPagesMemory += seat->MemoryTouched - prevTouched;

if (seat->AttachedMemory)
Memory->AttachMemory(*seat);
Expand Down Expand Up @@ -1978,6 +2012,7 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &

Counters->Simple()[TExecutorCounters::CACHE_PINNED_SET] = PrivatePageCache->GetStats().PinnedSetSize;
Counters->Simple()[TExecutorCounters::CACHE_PINNED_LOAD] = PrivatePageCache->GetStats().PinnedLoadSize;
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_USED] = TransactionPagesMemory;
}

void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &env,
Expand Down Expand Up @@ -2665,20 +2700,19 @@ void TExecutor::Handle(TEvents::TEvFlushLog::TPtr &ev) {
}

void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
const bool failed = (ev->Get()->Status != NKikimrProto::OK);
NSharedCache::TEvResult *msg = ev->Get();
const bool failed = (msg->Status != NKikimrProto::OK);

if (auto logl = Logger->Log(failed ? ELnLev::Info : ELnLev::Debug)) {
logl
<< NFmt::Do(*this) << " got result " << NFmt::Do(*ev->Get())
<< ", category " << ev->Cookie;
}

switch (EPageCollectionRequest(ev->Cookie)) {
switch (auto requestType = EPageCollectionRequest(ev->Cookie)) {
case EPageCollectionRequest::Cache:
case EPageCollectionRequest::CacheSync:
case EPageCollectionRequest::InMemPages:
{
auto *msg = ev->CastAsLocal<NSharedCache::TEvResult>();

TPrivatePageCache::TInfo *collectionInfo = PrivatePageCache->Info(msg->Origin->Label());
if (!collectionInfo) // collection could be outdated
return;
Expand All @@ -2695,6 +2729,9 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
return Broken();
}

if (requestType == EPageCollectionRequest::InMemPages) {
StickInMemPages(msg);
}
for (auto& loaded : msg->Loaded) {
TPrivatePageCache::TPage::TWaitQueuePtr transactionsToActivate = PrivatePageCache->ProvideBlock(std::move(loaded), collectionInfo);
ActivateWaitingTransactions(transactionsToActivate);
Expand All @@ -2704,8 +2741,6 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {

case EPageCollectionRequest::PendingInit:
{
auto *msg = ev->CastAsLocal<NSharedCache::TEvResult>();

const auto *pageCollection = msg->Origin.Get();
TPendingPartSwitch *foundSwitch = nullptr;
TPendingPartSwitch::TNewBundle *foundBundle = nullptr;
Expand Down Expand Up @@ -2756,6 +2791,7 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
return;

default:
Y_DEBUG_ABORT_S("Unexpected request " << ev->Cookie);
break;
}
}
Expand Down Expand Up @@ -3460,8 +3496,8 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)

CompactionLogic->UpdateLogUsage(LogicRedo->GrabLogUsage());

const ui64 partSwitchCpuuS = ui64(1000000. * partSwitchCpuTimer.Passed());
Counters->Percentile()[TExecutorCounters::TX_PERCENTILE_PARTSWITCH_CPUTIME].IncrementFor(partSwitchCpuuS);
const ui64 partSwitchCpuUs = ui64(1000000. * partSwitchCpuTimer.Passed());
Counters->Percentile()[TExecutorCounters::TX_PERCENTILE_PARTSWITCH_CPUTIME].IncrementFor(partSwitchCpuUs);

if (msg->YellowMoveChannels || msg->YellowStopChannels) {
CheckYellow(std::move(msg->YellowMoveChannels), std::move(msg->YellowStopChannels));
Expand Down Expand Up @@ -3490,13 +3526,11 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)

void TExecutor::UpdateUsedTabletMemory() {
// Estimate memory usage for internal executor structures:
UsedTabletMemory = 50 << 10; // 50kb
UsedTabletMemory = 50_KB;

// Count the number of bytes kept in private cache (can't be offloaded right now):
if (PrivatePageCache) {
UsedTabletMemory += PrivatePageCache->GetStats().TotalPinnedBody;
UsedTabletMemory += PrivatePageCache->GetStats().PinnedLoadSize;
}
// Count the number of bytes that can't be offloaded right now:
UsedTabletMemory += StickyPagesMemory;
UsedTabletMemory += TransactionPagesMemory;

// Estimate memory used by internal database structures:
auto &counters = Database->Counters();
Expand Down Expand Up @@ -3589,8 +3623,9 @@ void TExecutor::UpdateCounters(const TActorContext &ctx) {
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_SHARED_BODY].Set(stats.TotalSharedBody);
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_PINNED_BODY].Set(stats.TotalPinnedBody);
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_EXCLUSIVE].Set(stats.TotalExclusive);
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_STICKY].Set(stats.TotalSticky);
}
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_STICKY].Set(StickyPagesMemory);
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_USED].Set(TransactionPagesMemory);

const auto &memory = Memory->Stats();

Expand Down Expand Up @@ -4108,7 +4143,8 @@ void TExecutor::RenderHtmlPage(NMon::TEvRemoteHttpInfo::TPtr &ev) const {
DIV_CLASS("row") {str << "Total bytes in shared cache: " << PrivatePageCache->GetStats().TotalSharedBody; }
DIV_CLASS("row") {str << "Total bytes in local cache: " << PrivatePageCache->GetStats().TotalPinnedBody; }
DIV_CLASS("row") {str << "Total bytes exclusive to local cache: " << PrivatePageCache->GetStats().TotalExclusive; }
DIV_CLASS("row") {str << "Total bytes marked as sticky: " << PrivatePageCache->GetStats().TotalSticky; }
DIV_CLASS("row") {str << "Total bytes marked as sticky: " << StickyPagesMemory; }
DIV_CLASS("row") {str << "Total bytes currently in use: " << TransactionPagesMemory; }

if (GcLogic) {
TAG(TH3) {str << "Gc logic:";}
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ struct TPendingPartSwitch {
enum class EPageCollectionRequest : ui64 {
Undefined = 0,
Cache = 1,
CacheSync,
InMemPages,
PendingInit,
BootLogic,
};
Expand Down Expand Up @@ -465,6 +465,8 @@ class TExecutor
size_t ReadyPartSwitches = 0;

ui64 UsedTabletMemory = 0;
ui64 StickyPagesMemory = 0;
ui64 TransactionPagesMemory = 0;

TActorContext OwnerCtx() const;

Expand Down Expand Up @@ -515,6 +517,7 @@ class TExecutor
void TranslateCacheTouchesToSharedCache();
void RequestInMemPagesForDatabase(bool pendingOnly = false);
void RequestInMemPagesForPartStore(ui32 tableId, const NTable::TPartView &partView, const THashSet<NTable::TTag> &stickyColumns);
void StickInMemPages(NSharedCache::TEvResult *msg);
THashSet<NTable::TTag> GetStickyColumns(ui32 tableId);
void RequestFromSharedCache(TAutoPtr<NPageCollection::TFetch> fetch,
NBlockIO::EPriority way, EPageCollectionRequest requestCategory);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/flat_executor_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ namespace NTabletFlatExecutor {
XX(COMPACTION_READ_IN_FLY, "CompactionReadInFly") \
XX(DB_FLAT_INDEX_BYTES, "DbFlatIndexBytes") \
XX(DB_B_TREE_INDEX_BYTES, "DbBTreeIndexBytes") \
XX(CACHE_TOTAL_USED, "CacheTotalUsed") \

// don't change order!
#define FLAT_EXECUTOR_CUMULATIVE_COUNTERS_MAP(XX) \
Expand Down
Loading

0 comments on commit fa3d243

Please sign in to comment.