Skip to content

Commit

Permalink
YQ-4133 fix race on RM initialization (#14931)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Feb 24, 2025
1 parent 50d5375 commit 388cf79
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
27 changes: 20 additions & 7 deletions ydb/core/kqp/rm_service/kqp_rm_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ using namespace NResourceBroker;
#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)

#define LOG_AS_C(stream) LOG_CRIT_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
#define LOG_AS_D(stream) LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
#define LOG_AS_I(stream) LOG_INFO_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
#define LOG_AS_E(stream) LOG_ERROR_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
#define LOG_AS_W(stream) LOG_WARN_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
#define LOG_AS_N(stream) LOG_NOTICE_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
#define LOG_AS_SAFE(log) {if (ActorSystem) { log; }}

#define LOG_AS_C(stream) LOG_AS_SAFE(LOG_CRIT_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream))
#define LOG_AS_D(stream) LOG_AS_SAFE(LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream))
#define LOG_AS_I(stream) LOG_AS_SAFE(LOG_INFO_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream))
#define LOG_AS_E(stream) LOG_AS_SAFE(LOG_ERROR_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream))
#define LOG_AS_W(stream) LOG_AS_SAFE(LOG_WARN_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream))
#define LOG_AS_N(stream) LOG_AS_SAFE(LOG_NOTICE_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream))

namespace {

Expand Down Expand Up @@ -165,6 +167,7 @@ class TKqpResourceManager : public IKqpResourceManager {
, TotalMemoryResource(MakeIntrusive<TMemoryResource>(config.GetQueryMemoryLimit(), (double)100, config.GetSpillingPercent()))
, ResourceSnapshotState(std::make_shared<TResourceSnapshotState>())
{
PublishAfterBootstrap.clear();
SetConfigValues(config);
}

Expand All @@ -179,6 +182,11 @@ class TKqpResourceManager : public IKqpResourceManager {
config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());

CreateResourceInfoExchanger(config.GetInfoExchangerSettings());

if (PublishAfterBootstrap.test()) {
FireResourcesPublishing();
PublishAfterBootstrap.clear();
}
}

const TIntrusivePtr<TKqpCounters>& GetCounters() const override {
Expand Down Expand Up @@ -489,7 +497,11 @@ class TKqpResourceManager : public IKqpResourceManager {
void FireResourcesPublishing() {
bool prev = PublishScheduled.test_and_set();
if (!prev) {
ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources);
if (Y_LIKELY(ActorSystem)) {
ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources);
} else {
PublishAfterBootstrap.test_and_set();
}
}
}

Expand Down Expand Up @@ -535,6 +547,7 @@ class TKqpResourceManager : public IKqpResourceManager {
// current state
std::atomic<ui64> LastResourceBrokerTaskId = 0;

std::atomic_flag PublishAfterBootstrap;
std::atomic_flag PublishScheduled;
// pattern cache for different actors
std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> PatternCache;
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
ActorSystemConfig {
Executor {
Type: BASIC
Threads: 1
Threads: 2
SpinThreshold: 10
Name: "System"
}
Expand Down

0 comments on commit 388cf79

Please sign in to comment.