Skip to content

Commit

Permalink
YQ-3561 add features for FQ-run (#14826)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Feb 20, 2025
1 parent fe62452 commit 3ee5135
Show file tree
Hide file tree
Showing 20 changed files with 351 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct TDummyTopic {
TString TopicName;
TMaybe<TString> Path;
size_t PartitionsCount;
bool CancelOnFileFinish = false;
};

// Dummy Pq gateway for tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ class TFileTopicReadSession : public NYdb::NTopic::IReadSession {
constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);

public:
TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = "")
TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = "", bool cancelOnFileFinish = false)
: File_(std::move(file))
, Session_(std::move(session))
, ProducerId_(producerId)
, FilePoller_([this] () {
PollFileForChanges();
})
, Counters_()
, CancelOnFileFinish_(cancelOnFileFinish)
{
Pool_.Start(1);
}
Expand Down Expand Up @@ -123,7 +124,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
size_t size = 0;
ui64 maxBatchRowSize = 100;

while (size_t read = fi.ReadLine(rawMsg)) {
while (fi.ReadLine(rawMsg)) {
msgs.emplace_back(MakeNextMessage(rawMsg));
MsgOffset_++;
if (!maxBatchRowSize--) {
Expand All @@ -133,6 +134,8 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
}
if (!msgs.empty()) {
EventsQ_.Push(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent(msgs, {}, Session_), size);
} else if (CancelOnFileFinish_) {
EventsQ_.Push(NYdb::NTopic::TSessionClosedEvent(NYdb::EStatus::CANCELLED, {NYdb::NIssue::TIssue("PQ file topic was finished")}), size);
}

Sleep(FILE_POLL_PERIOD);
Expand All @@ -145,6 +148,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
TString ProducerId_;
std::thread FilePoller_;
NYdb::NTopic::TReaderCounters::TPtr Counters_;
bool CancelOnFileFinish_ = false;

TThreadPool Pool_;
size_t MsgOffset_ = 0;
Expand Down Expand Up @@ -336,6 +340,10 @@ struct TDummyPartitionSession: public NYdb::NTopic::TPartitionSession {
}
};

// Constructs the client from a map of dummy topic descriptions.
// The map is taken by value and moved into Topics_, so callers may pass an rvalue to avoid a copy.
TFileTopicClient::TFileTopicClient(THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> topics)
: Topics_(std::move(topics))
{}

std::shared_ptr<NYdb::NTopic::IReadSession> TFileTopicClient::CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings) {
Y_ENSURE(!settings.Topics_.empty());
const auto& topic = settings.Topics_.front();
Expand All @@ -358,7 +366,8 @@ std::shared_ptr<NYdb::NTopic::IReadSession> TFileTopicClient::CreateReadSession(
ui64 sessionId = 0;
return std::make_shared<TFileTopicReadSession>(
TFile(*filePath, EOpenMode::TEnum::RdOnly),
MakeIntrusive<TDummyPartitionSession>(sessionId, TString{topicPath}, partitionId)
MakeIntrusive<TDummyPartitionSession>(sessionId, TString{topicPath}, partitionId),
"", topicsIt->second.CancelOnFileFinish
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace NYql {
struct TFileTopicClient : public ITopicClient {
TFileTopicClient(THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> topics): Topics_(topics) {}
explicit TFileTopicClient(THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> topics);

NYdb::TAsyncStatus CreateTopic(const TString& path, const NYdb::NTopic::TCreateTopicSettings& settings = {}) override;

Expand Down Expand Up @@ -32,5 +32,7 @@ struct TFileTopicClient : public ITopicClient {

private:
THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> Topics_;
bool CancelOnFileFinish_ = false;
};
}

}
2 changes: 2 additions & 0 deletions ydb/tests/tools/fqrun/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ sync_dir
*.conf
*.parquet
*.json
*.svg
*.txt
11 changes: 10 additions & 1 deletion ydb/tests/tools/fqrun/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

The tool can be used to execute streaming queries using the FQ proxy infrastructure.

To profile memory allocations, build fqrun with the ya make flags `-D PROFILE_MEMORY_ALLOCATIONS -D CXXFLAGS=-DPROFILE_MEMORY_ALLOCATIONS`.

## Scripts

* `flame_graph.sh` - script for collecting flame graphs in svg format, usage:
```(bash)
./flame_graph.sh [graph collection time in seconds]
```
## Examples
### Queries
Expand All @@ -25,7 +34,7 @@ Tool can be used to execute streaming queries by using FQ proxy infrastructure.
./fqrun -M 32000
```
Monitoring endpoint: https://localhost:32000
Monitoring endpoint: http://localhost:32000
* gRPC endpoint:
```(bash)
Expand Down
46 changes: 46 additions & 0 deletions ydb/tests/tools/fqrun/configuration/as_config.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
Executor {
Type: BASIC
Threads: 1
SpinThreshold: 10
Name: "System"
}
Executor {
Type: BASIC
Threads: 6
SpinThreshold: 1
Name: "User"
}
Executor {
Type: BASIC
Threads: 1
SpinThreshold: 1
Name: "Batch"
}
Executor {
Type: IO
Threads: 1
Name: "IO"
}
Executor {
Type: BASIC
Threads: 2
SpinThreshold: 10
Name: "IC"
TimePerMailboxMicroSecs: 100
}

Scheduler {
Resolution: 64
SpinThreshold: 0
ProgressThreshold: 10000
}

SysExecutor: 0
UserExecutor: 1
IoExecutor: 3
BatchExecutor: 2

ServiceExecutor {
ServiceName: "Interconnect"
ExecutorId: 4
}
25 changes: 25 additions & 0 deletions ydb/tests/tools/fqrun/flame_graph.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env bash
#
# Collects a CPU flame graph from the running fqrun process(es) of the current user
# and writes it to ./profdata.svg.
#
# Usage: ./flame_graph.sh [graph collection time in seconds]   (default: 30)
#
# Uses `sudo perf` for sampling and the stackcollapse-perf.pl / flamegraph.pl scripts
# resolved relative to this script's location.

# pipefail: a failure of stackcollapse-perf.pl must not be masked by flamegraph.pl succeeding.
set -euxo pipefail

if [ $# -gt 1 ]; then
    echo "Too many arguments" >&2
    exit 1
fi

# Join all matching PIDs with commas: `perf record --pid` accepts a comma separated list,
# whereas a newline separated list would be word-split into stray arguments.
if ! fqrun_pid=$(pgrep -d , -u "${USER:-$(id -un)}" fqrun); then
    echo "No running fqrun process found" >&2
    exit 1
fi

# Intermediate files are removed on any exit. The trap is installed only after validation so that
# a usage error does not trigger a pointless sudo call; -f keeps cleanup from failing when a file
# was never created.
function cleanup {
    sudo rm -f ./profdata
    rm -f ./profdata.txt
}
trap cleanup EXIT

sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid "$fqrun_pid" -v -o profdata -- sleep "${1:-30}"
sudo perf script -i profdata > profdata.txt

SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)

flame_graph_tool="$SCRIPT_DIR/../../../../contrib/tools/flame-graph"

"${flame_graph_tool}/stackcollapse-perf.pl" profdata.txt | "${flame_graph_tool}/flamegraph.pl" > profdata.svg
65 changes: 63 additions & 2 deletions ydb/tests/tools/fqrun/fqrun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@

#include <util/datetime/base.h>

#include <ydb/core/blob_depot/mon_main.h>
#include <ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h>
#include <ydb/tests/tools/fqrun/src/fq_runner.h>
#include <ydb/tests/tools/kqprun/runlib/application.h>
#include <ydb/tests/tools/kqprun/runlib/utils.h>

#ifdef PROFILE_MEMORY_ALLOCATIONS
#include <library/cpp/lfalloc/alloc_profiler/profiler.h>
#endif

using namespace NKikimrRun;

namespace NFqRun {
Expand Down Expand Up @@ -156,6 +161,21 @@ class TMain : public TMainBase {
}
});

options.AddLongOption("as-cfg", "File with actor system config (TActorSystemConfig), use '-' for default")
.RequiredArgument("file")
.DefaultValue("./configuration/as_config.conf")
.Handler1([this](const NLastGetopt::TOptsParser* option) {
const TString file(option->CurValOrDef());
if (file == "-") {
return;
}

RunnerOptions.FqSettings.ActorSystemConfig = NKikimrConfig::TActorSystemConfig();
if (!google::protobuf::TextFormat::ParseFromString(LoadFile(file), &(*RunnerOptions.FqSettings.ActorSystemConfig))) {
ythrow yexception() << "Bad format of actor system configuration";
}
});

options.AddLongOption("emulate-s3", "Enable readings by s3 provider from files, `bucket` value in connection - path to folder with files")
.NoArgument()
.SetFlag(&RunnerOptions.FqSettings.EmulateS3);
Expand All @@ -165,17 +185,27 @@ class TMain : public TMainBase {
.Handler1([this](const NLastGetopt::TOptsParser* option) {
TStringBuf topicName, others;
TStringBuf(option->CurVal()).Split('@', topicName, others);

TStringBuf path, partitionCountStr;
TStringBuf(others).Split(':', path, partitionCountStr);
size_t partitionCount = !partitionCountStr.empty() ? FromString<size_t>(partitionCountStr) : 1;
if (!partitionCount) {
ythrow yexception() << "Topic partition count should be at least one";
}
if (topicName.empty() || path.empty()) {
ythrow yexception() << "Incorrect PQ file mapping, expected form topic@path[:partitions_count]" << Endl;
ythrow yexception() << "Incorrect PQ file mapping, expected form topic@path[:partitions_count]";
}
if (!PqFilesMapping.emplace(topicName, NYql::TDummyTopic("pq", TString(topicName), TString(path), partitionCount)).second) {
ythrow yexception() << "Got duplicated topic name: " << topicName;
}
});

options.AddLongOption("cancel-on-file-finish", "Cancel emulate YDS topics when topic file finished")
.RequiredArgument("topic")
.Handler1([this](const NLastGetopt::TOptsParser* option) {
TopicsSettings[option->CurVal()].CancelOnFileFinish = true;
});

// Outputs

options.AddLongOption("result-file", "File with query results (use '-' to write in stdout)")
Expand Down Expand Up @@ -210,6 +240,7 @@ class TMain : public TMainBase {
ExecutionOptions.Validate(RunnerOptions);

RunnerOptions.FqSettings.YqlToken = GetEnv(YQL_TOKEN_VARIABLE);
RunnerOptions.FqSettings.FunctionRegistry = CreateFunctionRegistry().Get();

auto& gatewayConfig = *RunnerOptions.FqSettings.FqConfig.mutable_gateways();
FillTokens(gatewayConfig.mutable_pq());
Expand All @@ -224,14 +255,39 @@ class TMain : public TMainBase {

if (!PqFilesMapping.empty()) {
auto fileGateway = MakeIntrusive<NYql::TDummyPqGateway>();
for (const auto& [_, topic] : PqFilesMapping) {
for (auto [_, topic] : PqFilesMapping) {
if (const auto it = TopicsSettings.find(topic.TopicName); it != TopicsSettings.end()) {
topic.CancelOnFileFinish = it->second.CancelOnFileFinish;
TopicsSettings.erase(it);
}
fileGateway->AddDummyTopic(topic);
}
RunnerOptions.FqSettings.PqGateway = std::move(fileGateway);
}
if (!TopicsSettings.empty()) {
ythrow yexception() << "Found topic settings for not existing topic: '" << TopicsSettings.begin()->first << "'";
}

#ifdef PROFILE_MEMORY_ALLOCATIONS
if (RunnerOptions.FqSettings.VerboseLevel >= EVerbose::Info) {
Cout << CoutColors.Cyan() << "Starting profile memory allocations" << CoutColors.Default() << Endl;
}
NAllocProfiler::StartAllocationSampling(true);
#else
if (ProfileAllocationsOutput) {
ythrow yexception() << "Profile memory allocations disabled, please rebuild fqrun with flag `-D PROFILE_MEMORY_ALLOCATIONS`";
}
#endif

RunScript(ExecutionOptions, RunnerOptions);

#ifdef PROFILE_MEMORY_ALLOCATIONS
if (RunnerOptions.FqSettings.VerboseLevel >= EVerbose::Info) {
Cout << CoutColors.Cyan() << "Finishing profile memory allocations" << CoutColors.Default() << Endl;
}
FinishProfileMemoryAllocations();
#endif

return 0;
}

Expand All @@ -249,6 +305,11 @@ class TMain : public TMainBase {
TExecutionOptions ExecutionOptions;
TRunnerOptions RunnerOptions;
std::unordered_map<TString, NYql::TDummyTopic> PqFilesMapping;

// Per-topic options collected from the command line before being merged into PqFilesMapping.
struct TTopicSettings {
// Set by the `cancel-on-file-finish` option for the named topic.
bool CancelOnFileFinish = false;
};
// Keyed by topic name; entries are erased as they are applied to a mapped topic,
// so anything left over refers to a topic that was never declared.
std::unordered_map<TString, TTopicSettings> TopicsSettings;
};

} // anonymous namespace
Expand Down
4 changes: 4 additions & 0 deletions ydb/tests/tools/fqrun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/tests/tools/kqprun/runlib/settings.h>

#include <yql/essentials/minikql/mkql_function_registry.h>

namespace NFqRun {

constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN";
Expand All @@ -27,8 +29,10 @@ struct TFqSetupSettings : public NKikimrRun::TServerSettings {

TString YqlToken;
NYql::IPqGateway::TPtr PqGateway;
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
NFq::NConfig::TConfig FqConfig;
NKikimrConfig::TLogConfig LogConfig;
std::optional<NKikimrConfig::TActorSystemConfig> ActorSystemConfig;
};

struct TRunnerOptions {
Expand Down
12 changes: 12 additions & 0 deletions ydb/tests/tools/fqrun/src/fq_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ class TFqSetup::TImpl {
serverSettings.SetLoggerInitializer(loggerInitializer);
}

// Installs a function registry factory on the server settings when a registry was supplied
// in Settings; otherwise leaves the server settings untouched.
// The factory captures `this` and reads Settings.FunctionRegistry on each call.
void SetFunctionRegistry(NKikimr::Tests::TServerSettings& serverSettings) const {
    if (!Settings.FunctionRegistry) {
        return;
    }

    auto registryFactory = [this](const NKikimr::NScheme::TTypeRegistry&) {
        return Settings.FunctionRegistry.Get();
    };
    serverSettings.SetFrFactory(registryFactory);
}

NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) {
NKikimr::Tests::TServerSettings serverSettings(PortManager.GetPort());

Expand All @@ -52,9 +60,13 @@ class TFqSetup::TImpl {

NKikimrConfig::TAppConfig config;
*config.MutableLogConfig() = Settings.LogConfig;
if (Settings.ActorSystemConfig) {
*config.MutableActorSystemConfig() = *Settings.ActorSystemConfig;
}
serverSettings.SetAppConfig(config);

SetLoggerSettings(serverSettings);
SetFunctionRegistry(serverSettings);

if (Settings.MonitoringEnabled) {
serverSettings.InitKikimrRunConfig();
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/tools/fqrun/src/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ SRCS(
PEERDIR(
library/cpp/colorizer
library/cpp/testing/unittest
util
ydb/core/fq/libs/config/protos
ydb/core/fq/libs/control_plane_proxy/events
ydb/core/fq/libs/init
Expand All @@ -20,6 +19,7 @@ PEERDIR(
ydb/library/security
ydb/library/yql/providers/pq/provider
ydb/tests/tools/kqprun/runlib
yql/essentials/minikql
)

YQL_LAST_ABI_VERSION()
Expand Down
Loading

0 comments on commit 3ee5135

Please sign in to comment.