diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 9274684de4bf..8c29637f1d3f 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -36,12 +36,14 @@ using namespace NResourceBroker; #define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) #define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) -#define LOG_AS_C(stream) LOG_CRIT_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream) -#define LOG_AS_D(stream) LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream) -#define LOG_AS_I(stream) LOG_INFO_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream) -#define LOG_AS_E(stream) LOG_ERROR_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream) -#define LOG_AS_W(stream) LOG_WARN_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream) -#define LOG_AS_N(stream) LOG_NOTICE_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream) +#define LOG_AS_SAFE(log) {if (ActorSystem) { log; }} + +#define LOG_AS_C(stream) LOG_AS_SAFE(LOG_CRIT_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)) +#define LOG_AS_D(stream) LOG_AS_SAFE(LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)) +#define LOG_AS_I(stream) LOG_AS_SAFE(LOG_INFO_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)) +#define LOG_AS_E(stream) LOG_AS_SAFE(LOG_ERROR_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)) +#define LOG_AS_W(stream) LOG_AS_SAFE(LOG_WARN_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)) +#define LOG_AS_N(stream) LOG_AS_SAFE(LOG_NOTICE_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)) namespace { @@ -165,6 +167,7 @@ class TKqpResourceManager : public IKqpResourceManager { , TotalMemoryResource(MakeIntrusive(config.GetQueryMemoryLimit(), (double)100, config.GetSpillingPercent())) , ResourceSnapshotState(std::make_shared()) { + PublishAfterBootstrap.clear(); SetConfigValues(config); } @@ -179,6 +182,11 @@ class TKqpResourceManager : public IKqpResourceManager { config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile()); CreateResourceInfoExchanger(config.GetInfoExchangerSettings()); + + if (PublishAfterBootstrap.test()) { + FireResourcesPublishing(); + PublishAfterBootstrap.clear(); + } } const TIntrusivePtr& GetCounters() const override { @@ -489,7 +497,11 @@ class TKqpResourceManager : public IKqpResourceManager { void FireResourcesPublishing() { bool prev = PublishScheduled.test_and_set(); if (!prev) { - ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources); + if (Y_LIKELY(ActorSystem)) { + ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources); + } else { + PublishAfterBootstrap.test_and_set(); + } } } @@ -535,6 +547,7 @@ class TKqpResourceManager : public IKqpResourceManager { // current state std::atomic LastResourceBrokerTaskId = 0; + std::atomic_flag PublishAfterBootstrap; std::atomic_flag PublishScheduled; // pattern cache for different actors std::shared_ptr PatternCache; diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index de923393b0e1..8dcb892766a8 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -1,7 +1,7 @@ ActorSystemConfig { Executor { Type: BASIC - Threads: 1 + Threads: 2 SpinThreshold: 10 Name: "System" }