-
Notifications
You must be signed in to change notification settings - Fork 615
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
YQ-3926 RD added parallel purecalc compilation #12505
Merged
GrigoriyPA
merged 3 commits into
ydb-platform:main
from
GrigoriyPA:YQ-3926-RD-add-parallel-purecalc-compilation
Dec 23, 2024
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -4,6 +4,7 @@ | |||
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h> | ||||
#include <ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h> | ||||
|
||||
#include <ydb/library/actors/core/actor_bootstrapped.h> | ||||
#include <ydb/library/actors/core/hfunc.h> | ||||
|
||||
#include <yql/essentials/public/purecalc/common/interface.h> | ||||
|
@@ -12,26 +13,49 @@ namespace NFq::NRowDispatcher { | |||
|
||||
namespace { | ||||
|
||||
class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService> { | ||||
using TBase = NActors::TActor<TPurecalcCompileService>; | ||||
struct TEvPrivate { | ||||
// Event ids | ||||
enum EEv : ui32 { | ||||
EvCompileFinished = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), | ||||
EvEnd | ||||
}; | ||||
|
||||
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); | ||||
|
||||
// Events | ||||
struct TEvCompileFinished : public NActors::TEventLocal<TEvCompileFinished, EvCompileFinished> { | ||||
TEvCompileFinished(NActors::TActorId requestActor, ui64 requestId) | ||||
: RequestActor(requestActor) | ||||
, RequestId(requestId) | ||||
{} | ||||
|
||||
const NActors::TActorId RequestActor; | ||||
const ui64 RequestId; | ||||
}; | ||||
}; | ||||
|
||||
class TPurecalcCompileActor : public NActors::TActorBootstrapped<TPurecalcCompileActor> { | ||||
public: | ||||
TPurecalcCompileService() | ||||
: TBase(&TPurecalcCompileService::StateFunc) | ||||
, LogPrefix("TPurecalcCompileService: ") | ||||
TPurecalcCompileActor(NActors::TActorId owner, NYql::NPureCalc::IProgramFactoryPtr factory, TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr request) | ||||
: Owner(owner) | ||||
, Factory(factory) | ||||
, LogPrefix(TStringBuilder() << "TPurecalcCompileActor " << request->Sender << " [id " << request->Cookie << "]: ") | ||||
, Request(std::move(request)) | ||||
{} | ||||
|
||||
STRICT_STFUNC(StateFunc, | ||||
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle); | ||||
) | ||||
static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_COMPILE_ACTOR"; | ||||
|
||||
void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) { | ||||
LOG_ROW_DISPATCHER_TRACE("Got compile request with id: " << ev->Cookie); | ||||
IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder); | ||||
void Bootstrap() { | ||||
Y_DEFER { | ||||
Finish(); | ||||
}; | ||||
|
||||
LOG_ROW_DISPATCHER_TRACE("Started compile request"); | ||||
IProgramHolder::TPtr programHolder = std::move(Request->Get()->ProgramHolder); | ||||
|
||||
TStatus status = TStatus::Success(); | ||||
try { | ||||
programHolder->CreateProgram(GetOrCreateFactory(ev->Get()->Settings)); | ||||
programHolder->CreateProgram(Factory); | ||||
} catch (const NYql::NPureCalc::TCompileError& error) { | ||||
status = TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Compile issues: " << error.GetIssues()) | ||||
.AddIssue(TStringBuilder() << "Final yql: " << error.GetYql()) | ||||
|
@@ -41,15 +65,122 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService> | |||
} | ||||
|
||||
if (status.IsFail()) { | ||||
LOG_ROW_DISPATCHER_ERROR("Compilation failed for request with id: " << ev->Cookie); | ||||
Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorDescription()), 0, ev->Cookie); | ||||
LOG_ROW_DISPATCHER_ERROR("Compilation failed for request"); | ||||
Send(Request->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorDescription()), 0, Request->Cookie); | ||||
} else { | ||||
LOG_ROW_DISPATCHER_TRACE("Compilation completed for request with id: " << ev->Cookie); | ||||
Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie); | ||||
LOG_ROW_DISPATCHER_TRACE("Compilation completed for request"); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Сюда тоже нужен какой-то идентификатор, иначе спам будет бесполезный There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Тут тоже, id переехало в LogPrefix:
|
||||
Send(Request->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, Request->Cookie); | ||||
} | ||||
} | ||||
|
||||
private: | ||||
void Finish() { | ||||
Send(Owner, new TEvPrivate::TEvCompileFinished(Request->Sender, Request->Cookie)); | ||||
PassAway(); | ||||
} | ||||
|
||||
private: | ||||
const NActors::TActorId Owner; | ||||
const NYql::NPureCalc::IProgramFactoryPtr Factory; | ||||
const TString LogPrefix; | ||||
|
||||
TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr Request; | ||||
}; | ||||
|
||||
class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService> { | ||||
using TBase = NActors::TActor<TPurecalcCompileService>; | ||||
|
||||
struct TCounters { | ||||
const NMonitoring::TDynamicCounterPtr Counters; | ||||
|
||||
NMonitoring::TDynamicCounters::TCounterPtr ActiveCompileActors; | ||||
NMonitoring::TDynamicCounters::TCounterPtr CompileQueueSize; | ||||
|
||||
explicit TCounters(NMonitoring::TDynamicCounterPtr counters) | ||||
: Counters(counters) | ||||
{ | ||||
Register(); | ||||
} | ||||
|
||||
private: | ||||
void Register() { | ||||
ActiveCompileActors = Counters->GetCounter("ActiveCompileActors", false); | ||||
CompileQueueSize = Counters->GetCounter("CompileQueueSize", false); | ||||
} | ||||
}; | ||||
|
||||
public: | ||||
TPurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters) | ||||
: TBase(&TPurecalcCompileService::StateFunc) | ||||
, Config(config) | ||||
, InFlightLimit(Config.GetParallelCompilationLimit() ? Config.GetParallelCompilationLimit() : 1) | ||||
, LogPrefix("TPurecalcCompileService: ") | ||||
, Counters(counters) | ||||
{} | ||||
|
||||
static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_COMPILE_SERVICE"; | ||||
|
||||
STRICT_STFUNC(StateFunc, | ||||
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle); | ||||
hFunc(TEvRowDispatcher::TEvPurecalcCompileAbort, Handle) | ||||
hFunc(TEvPrivate::TEvCompileFinished, Handle); | ||||
) | ||||
|
||||
void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) { | ||||
const auto requestActor = ev->Sender; | ||||
const ui64 requestId = ev->Cookie; | ||||
LOG_ROW_DISPATCHER_TRACE("Add to compile queue request with id " << requestId << " from " << requestActor); | ||||
|
||||
// Remove old compile request | ||||
RemoveRequest(requestActor, requestId); | ||||
|
||||
// Add new request | ||||
RequestsQueue.emplace_back(std::move(ev)); | ||||
Y_ENSURE(RequestsIndex.emplace(std::make_pair(requestActor, requestId), --RequestsQueue.end()).second); | ||||
Counters.CompileQueueSize->Inc(); | ||||
|
||||
StartCompilation(); | ||||
} | ||||
|
||||
void Handle(TEvRowDispatcher::TEvPurecalcCompileAbort::TPtr& ev) { | ||||
LOG_ROW_DISPATCHER_TRACE("Abort compile request with id " << ev->Cookie << " from " << ev->Sender); | ||||
|
||||
RemoveRequest(ev->Sender, ev->Cookie); | ||||
} | ||||
|
||||
void Handle(TEvPrivate::TEvCompileFinished::TPtr& ev) { | ||||
LOG_ROW_DISPATCHER_TRACE("Compile finished for request with id " << ev->Get()->RequestId << " from " << ev->Get()->RequestActor); | ||||
|
||||
InFlightCompilations.erase(ev->Sender); | ||||
Counters.ActiveCompileActors->Dec(); | ||||
|
||||
StartCompilation(); | ||||
} | ||||
|
||||
private: | ||||
void RemoveRequest(NActors::TActorId requestActor, ui64 requestId) { | ||||
const auto it = RequestsIndex.find(std::make_pair(requestActor, requestId)); | ||||
if (it == RequestsIndex.end()) { | ||||
return; | ||||
} | ||||
|
||||
RequestsQueue.erase(it->second); | ||||
RequestsIndex.erase(it); | ||||
Counters.CompileQueueSize->Dec(); | ||||
} | ||||
|
||||
void StartCompilation() { | ||||
while (!RequestsQueue.empty() && InFlightCompilations.size() < InFlightLimit) { | ||||
auto request = std::move(RequestsQueue.front()); | ||||
RemoveRequest(request->Sender, request->Cookie); | ||||
|
||||
const auto factory = GetOrCreateFactory(request->Get()->Settings); | ||||
const auto compileActor = Register(new TPurecalcCompileActor(SelfId(), factory, std::move(request))); | ||||
Y_ENSURE(InFlightCompilations.emplace(compileActor).second); | ||||
Counters.ActiveCompileActors->Inc(); | ||||
} | ||||
} | ||||
|
||||
NYql::NPureCalc::IProgramFactoryPtr GetOrCreateFactory(const TPurecalcCompileSettings& settings) { | ||||
const auto it = ProgramFactories.find(settings); | ||||
if (it != ProgramFactories.end()) { | ||||
|
@@ -62,15 +193,23 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService> | |||
} | ||||
|
||||
private: | ||||
const NConfig::TCompileServiceConfig Config; | ||||
const ui64 InFlightLimit; | ||||
const TString LogPrefix; | ||||
|
||||
std::list<TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr> RequestsQueue; | ||||
THashMap<std::pair<NActors::TActorId, ui64>, std::list<TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr>::iterator> RequestsIndex; | ||||
std::unordered_set<NActors::TActorId> InFlightCompilations; | ||||
|
||||
std::map<TPurecalcCompileSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories; | ||||
|
||||
const TCounters Counters; | ||||
}; | ||||
|
||||
} // namespace { | ||||
} // anonymous namespace | ||||
|
||||
NActors::IActor* CreatePurecalcCompileService() { | ||||
return new TPurecalcCompileService(); | ||||
NActors::IActor* CreatePurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters) { | ||||
return new TPurecalcCompileService(config, counters); | ||||
} | ||||
|
||||
} // namespace NFq::NRowDispatcher |
4 changes: 3 additions & 1 deletion
4
ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,11 @@ | ||
#pragma once | ||
|
||
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h> | ||
|
||
#include <ydb/library/actors/core/actor.h> | ||
|
||
namespace NFq::NRowDispatcher { | ||
|
||
NActors::IActor* CreatePurecalcCompileService(); | ||
NActors::IActor* CreatePurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters); | ||
|
||
} // namespace NFq::NRowDispatcher |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
А куда id потерялся? Как понять какая именно компилияция зафейлилась?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
id переехало в LogPrefix, чтобы точно не забыть поставить его везде:
ydb/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp
Line 42 in 6c8320c