Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3926 RD added parallel purecalc compilation #12505

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ydb/core/fq/libs/config/protos/row_dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ message TJsonParserConfig {
uint64 BufferCellCount = 3; // (number rows) * (number columns) limit, default 10^6
}

message TCompileServiceConfig {
uint64 ParallelCompilationLimit = 1; // 1 by default
}

message TRowDispatcherConfig {
bool Enabled = 1;
uint64 TimeoutBeforeStartSessionSec = 2;
uint64 SendStatusPeriodSec = 3;
uint64 MaxSessionUsedMemory = 4;
bool WithoutConsumer = 5;
TJsonParserConfig JsonParser = 7;
TCompileServiceConfig CompileService = 8;
TRowDispatcherCoordinatorConfig Coordinator = 6;
}
3 changes: 3 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/events/data_plane.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct TEvRowDispatcher {
EvGetInternalStateResponse,
EvPurecalcCompileRequest,
EvPurecalcCompileResponse,
EvPurecalcCompileAbort,
EvEnd,
};

Expand Down Expand Up @@ -197,6 +198,8 @@ struct TEvRowDispatcher {
NYql::NDqProto::StatusIds::StatusCode Status;
NYql::TIssues Issues;
};

struct TEvPurecalcCompileAbort : public NActors::TEventLocal<TEvPurecalcCompileAbort, EEv::EvPurecalcCompileAbort> {};
};

} // namespace NFq
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class TTopicFilters : public ITopicFilters {
NMonitoring::TDynamicCounters::TCounterPtr InFlightCompileRequests;
NMonitoring::TDynamicCounters::TCounterPtr CompileErrors;

TCounters(NMonitoring::TDynamicCounterPtr counters)
explicit TCounters(NMonitoring::TDynamicCounterPtr counters)
: Counters(counters)
{
Register();
Expand Down Expand Up @@ -83,6 +83,18 @@ class TTopicFilters : public ITopicFilters {
NActors::TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(Self.Config.CompileServiceId, Self.Owner, PurecalcFilter->GetCompileRequest().release(), 0, InFlightCompilationId));
}

void AbortCompilation() {
if (!InFlightCompilationId) {
return;
}

LOG_ROW_DISPATCHER_TRACE("Send abort compile request with id " << InFlightCompilationId);
NActors::TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(Self.Config.CompileServiceId, Self.Owner, new TEvRowDispatcher::TEvPurecalcCompileAbort(), 0, InFlightCompilationId));

InFlightCompilationId = 0;
Self.Counters.InFlightCompileRequests->Dec();
}

void OnCompileResponse(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr ev) {
if (ev->Cookie != InFlightCompilationId) {
LOG_ROW_DISPATCHER_DEBUG("Outdated compiler response ignored for id " << ev->Cookie << ", current compile id " << InFlightCompilationId);
Expand Down Expand Up @@ -205,7 +217,14 @@ class TTopicFilters : public ITopicFilters {

void RemoveFilter(NActors::TActorId filterId) override {
LOG_ROW_DISPATCHER_TRACE("Remove filter with id " << filterId);
Filters.erase(filterId);

const auto it = Filters.find(filterId);
if (it == Filters.end()) {
return;
}

it->second.AbortCompilation();
Filters.erase(it);
}

TFiltersStatistic GetStatistics() override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,11 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

void Handle(NActors::TEvents::TEvPoison::TPtr&) {
with_lock(Alloc) {
Clients.clear();
if (Filters) {
for (const auto& [clientId, _] : Clients) {
Filters->RemoveFilter(clientId);
}
Filters.Reset();
}
PassAway();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class TFiterFixture : public TBaseFixture {
virtual void SetUp(NUnitTest::TTestContext& ctx) override {
TBase::SetUp(ctx);

CompileServiceActorId = Runtime.Register(CreatePurecalcCompileService());
CompileServiceActorId = Runtime.Register(CreatePurecalcCompileService({}, MakeIntrusive<NMonitoring::TDynamicCounters>()));
}

virtual void TearDown(NUnitTest::TTestContext& ctx) override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>

#include <yql/essentials/public/purecalc/common/interface.h>
Expand All @@ -12,26 +13,49 @@ namespace NFq::NRowDispatcher {

namespace {

class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService> {
using TBase = NActors::TActor<TPurecalcCompileService>;
struct TEvPrivate {
// Event ids
enum EEv : ui32 {
EvCompileFinished = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvEnd
};

static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");

// Events
struct TEvCompileFinished : public NActors::TEventLocal<TEvCompileFinished, EvCompileFinished> {
TEvCompileFinished(NActors::TActorId requestActor, ui64 requestId)
: RequestActor(requestActor)
, RequestId(requestId)
{}

const NActors::TActorId RequestActor;
const ui64 RequestId;
};
};

class TPurecalcCompileActor : public NActors::TActorBootstrapped<TPurecalcCompileActor> {
public:
TPurecalcCompileService()
: TBase(&TPurecalcCompileService::StateFunc)
, LogPrefix("TPurecalcCompileService: ")
TPurecalcCompileActor(NActors::TActorId owner, NYql::NPureCalc::IProgramFactoryPtr factory, TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr request)
: Owner(owner)
, Factory(factory)
, LogPrefix(TStringBuilder() << "TPurecalcCompileActor " << request->Sender << " [id " << request->Cookie << "]: ")
, Request(std::move(request))
{}

STRICT_STFUNC(StateFunc,
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle);
)
static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_COMPILE_ACTOR";

void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("Got compile request with id: " << ev->Cookie);
IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder);
void Bootstrap() {
Y_DEFER {
Finish();
};

LOG_ROW_DISPATCHER_TRACE("Started compile request");
IProgramHolder::TPtr programHolder = std::move(Request->Get()->ProgramHolder);

TStatus status = TStatus::Success();
try {
programHolder->CreateProgram(GetOrCreateFactory(ev->Get()->Settings));
programHolder->CreateProgram(Factory);
} catch (const NYql::NPureCalc::TCompileError& error) {
status = TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Compile issues: " << error.GetIssues())
.AddIssue(TStringBuilder() << "Final yql: " << error.GetYql())
Expand All @@ -41,15 +65,122 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService>
}

if (status.IsFail()) {
LOG_ROW_DISPATCHER_ERROR("Compilation failed for request with id: " << ev->Cookie);
Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorDescription()), 0, ev->Cookie);
LOG_ROW_DISPATCHER_ERROR("Compilation failed for request");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А куда id потерялся? Как понять какая именно компилияция зафейлилась?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

id переехало в LogPrefix, чтобы точно не забыть поставить его везде:

, LogPrefix(TStringBuilder() << "TPurecalcCompileActor " << request->Sender << " [id " << request->Cookie << "]: ")

Send(Request->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorDescription()), 0, Request->Cookie);
} else {
LOG_ROW_DISPATCHER_TRACE("Compilation completed for request with id: " << ev->Cookie);
Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie);
LOG_ROW_DISPATCHER_TRACE("Compilation completed for request");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Сюда тоже нужен какой-то идентификатор, иначе спам будет бесполезный

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут тоже, id переехало в LogPrefix:

, LogPrefix(TStringBuilder() << "TPurecalcCompileActor " << request->Sender << " [id " << request->Cookie << "]: ")

Send(Request->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, Request->Cookie);
}
}

private:
void Finish() {
Send(Owner, new TEvPrivate::TEvCompileFinished(Request->Sender, Request->Cookie));
PassAway();
}

private:
const NActors::TActorId Owner;
const NYql::NPureCalc::IProgramFactoryPtr Factory;
const TString LogPrefix;

TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr Request;
};

class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService> {
using TBase = NActors::TActor<TPurecalcCompileService>;

struct TCounters {
const NMonitoring::TDynamicCounterPtr Counters;

NMonitoring::TDynamicCounters::TCounterPtr ActiveCompileActors;
NMonitoring::TDynamicCounters::TCounterPtr CompileQueueSize;

explicit TCounters(NMonitoring::TDynamicCounterPtr counters)
: Counters(counters)
{
Register();
}

private:
void Register() {
ActiveCompileActors = Counters->GetCounter("ActiveCompileActors", false);
CompileQueueSize = Counters->GetCounter("CompileQueueSize", false);
}
};

public:
TPurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters)
: TBase(&TPurecalcCompileService::StateFunc)
, Config(config)
, InFlightLimit(Config.GetParallelCompilationLimit() ? Config.GetParallelCompilationLimit() : 1)
, LogPrefix("TPurecalcCompileService: ")
, Counters(counters)
{}

static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_COMPILE_SERVICE";

STRICT_STFUNC(StateFunc,
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle);
hFunc(TEvRowDispatcher::TEvPurecalcCompileAbort, Handle)
hFunc(TEvPrivate::TEvCompileFinished, Handle);
)

void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) {
const auto requestActor = ev->Sender;
const ui64 requestId = ev->Cookie;
LOG_ROW_DISPATCHER_TRACE("Add to compile queue request with id " << requestId << " from " << requestActor);

// Remove old compile request
RemoveRequest(requestActor, requestId);

// Add new request
RequestsQueue.emplace_back(std::move(ev));
Y_ENSURE(RequestsIndex.emplace(std::make_pair(requestActor, requestId), --RequestsQueue.end()).second);
Counters.CompileQueueSize->Inc();

StartCompilation();
}

void Handle(TEvRowDispatcher::TEvPurecalcCompileAbort::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("Abort compile request with id " << ev->Cookie << " from " << ev->Sender);

RemoveRequest(ev->Sender, ev->Cookie);
}

void Handle(TEvPrivate::TEvCompileFinished::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("Compile finished for request with id " << ev->Get()->RequestId << " from " << ev->Get()->RequestActor);

InFlightCompilations.erase(ev->Sender);
Counters.ActiveCompileActors->Dec();

StartCompilation();
}

private:
void RemoveRequest(NActors::TActorId requestActor, ui64 requestId) {
const auto it = RequestsIndex.find(std::make_pair(requestActor, requestId));
if (it == RequestsIndex.end()) {
return;
}

RequestsQueue.erase(it->second);
RequestsIndex.erase(it);
Counters.CompileQueueSize->Dec();
}

void StartCompilation() {
while (!RequestsQueue.empty() && InFlightCompilations.size() < InFlightLimit) {
auto request = std::move(RequestsQueue.front());
RemoveRequest(request->Sender, request->Cookie);

const auto factory = GetOrCreateFactory(request->Get()->Settings);
const auto compileActor = Register(new TPurecalcCompileActor(SelfId(), factory, std::move(request)));
Y_ENSURE(InFlightCompilations.emplace(compileActor).second);
Counters.ActiveCompileActors->Inc();
}
}

NYql::NPureCalc::IProgramFactoryPtr GetOrCreateFactory(const TPurecalcCompileSettings& settings) {
const auto it = ProgramFactories.find(settings);
if (it != ProgramFactories.end()) {
Expand All @@ -62,15 +193,23 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService>
}

private:
const NConfig::TCompileServiceConfig Config;
const ui64 InFlightLimit;
const TString LogPrefix;

std::list<TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr> RequestsQueue;
THashMap<std::pair<NActors::TActorId, ui64>, std::list<TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr>::iterator> RequestsIndex;
std::unordered_set<NActors::TActorId> InFlightCompilations;

std::map<TPurecalcCompileSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;

const TCounters Counters;
};

} // namespace {
} // anonymous namespace

NActors::IActor* CreatePurecalcCompileService() {
return new TPurecalcCompileService();
NActors::IActor* CreatePurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters) {
return new TPurecalcCompileService(config, counters);
}

} // namespace NFq::NRowDispatcher
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#pragma once

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>

#include <ydb/library/actors/core/actor.h>

namespace NFq::NRowDispatcher {

NActors::IActor* CreatePurecalcCompileService();
NActors::IActor* CreatePurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters);

} // namespace NFq::NRowDispatcher
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SRCS(

PEERDIR(
ydb/core/fq/libs/actors/logging
ydb/core/fq/libs/config/protos
ydb/core/fq/libs/row_dispatcher/events
ydb/core/fq/libs/row_dispatcher/format_handler/common
ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ void TRowDispatcher::Bootstrap() {
auto coordinatorId = Register(NewCoordinator(SelfId(), config, YqSharedResources, Tenant, Counters).release());
Register(NewLeaderElection(SelfId(), coordinatorId, config, CredentialsProviderFactory, YqSharedResources, Tenant, Counters).release());

CompileServiceActorId = Register(NRowDispatcher::CreatePurecalcCompileService());
CompileServiceActorId = Register(NRowDispatcher::CreatePurecalcCompileService(Config.GetCompileService(), Counters));

Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
Expand Down
Loading