Skip to content

Commit

Permalink
Test for describe a transfer with errors (#15404)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Mar 6, 2025
1 parent 06876fe commit f5739aa
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 30 deletions.
43 changes: 19 additions & 24 deletions ydb/core/tx/replication/service/transfer_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ class TTransferWriter
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);

hFunc(TEvWorker::TEvHandshake, HoldHandle);
hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvData, HoldHandle);
sFunc(TEvents::TEvWakeup, GetTableScheme);
sFunc(TEvents::TEvPoison, PassAway);
Expand Down Expand Up @@ -501,7 +501,7 @@ class TTransferWriter
switch (ev->GetTypeRewrite()) {
hFunc(NFq::TEvRowDispatcher::TEvPurecalcCompileResponse, Handle);

hFunc(TEvWorker::TEvHandshake, HoldHandle);
hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvData, HoldHandle);
//sFunc(TEvents::TEvWakeup, SendS3Request);
sFunc(TEvents::TEvPoison, PassAway);
Expand Down Expand Up @@ -545,11 +545,6 @@ class TTransferWriter
void StartWork() {
Become(&TThis::StateWork);

if (PendingWorker) {
ProcessWorker(*PendingWorker);
PendingWorker.reset();
}

if (PendingRecords) {
ProcessData(PendingPartitionId, *PendingRecords);
PendingRecords.reset();
Expand All @@ -565,21 +560,16 @@ class TTransferWriter
}
}

void HoldHandle(TEvWorker::TEvHandshake::TPtr& ev) {
Y_ABORT_UNLESS(!PendingWorker);
PendingWorker = ev->Sender;
}

void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
ProcessWorker(ev->Sender);
}

void ProcessWorker(const TActorId& worker) {
Worker = worker;
Worker = ev->Sender;
LOG_D("Handshake"
<< ": worker# " << Worker);

Send(Worker, new TEvWorker::TEvHandshake());
if (ProcessingError) {
Leave(ProcessingErrorStatus, *ProcessingError);
} else {
Send(Worker, new TEvWorker::TEvHandshake());
}
}

void HoldHandle(TEvWorker::TEvData::TPtr& ev) {
Expand Down Expand Up @@ -632,7 +622,7 @@ class TTransferWriter
STFUNC(StateWrite) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvCompleted, Handle);
hFunc(TEvWorker::TEvHandshake, HoldHandle);
hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvData, HoldHandle);

sFunc(TEvents::TEvPoison, PassAway);
Expand Down Expand Up @@ -688,12 +678,16 @@ class TTransferWriter
this->Schedule(Delay + random, new TEvents::TEvWakeup());
}

template <typename... Args>
void Leave(Args&&... args) {
void Leave(TEvWorker::TEvGone::EStatus status, const TString& message) {
LOG_I("Leave");

Send(Worker, new TEvWorker::TEvGone(std::forward<Args>(args)...));
PassAway();
if (Worker) {
Send(Worker, new TEvWorker::TEvGone(status, message));
PassAway();
} else {
ProcessingErrorStatus = status;
ProcessingError = message;
}
}

void PassAway() override {
Expand Down Expand Up @@ -726,9 +720,10 @@ class TTransferWriter
TProgramHolder::TPtr ProgramHolder;

mutable TMaybe<TString> LogPrefix;

mutable TEvWorker::TEvGone::EStatus ProcessingErrorStatus;
mutable TMaybe<TString> ProcessingError;

std::optional<TActorId> PendingWorker;
ui32 PendingPartitionId = 0;
std::optional<TVector<TTopicMessage>> PendingRecords;

Expand Down
84 changes: 78 additions & 6 deletions ydb/tests/functional/transfer/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,13 @@ struct MainTestCase {
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}

void CreateTopic() {
void CreateTopic(size_t partitionCount = 10) {
auto res = Session.ExecuteQuery(Sprintf(R"(
CREATE TOPIC `%s`
WITH (
min_active_partitions = 10
min_active_partitions = %d
);
)", TopicName.data()), TTxControl::NoTx()).GetValueSync();
)", TopicName.data(), partitionCount), TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}

Expand Down Expand Up @@ -194,7 +194,7 @@ struct MainTestCase {
TDescribeReplicationSettings settings;
settings.IncludeStats(true);

return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings);
return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings).ExtractValueSync();
}

auto DescribeTopic() {
Expand Down Expand Up @@ -828,14 +828,14 @@ Y_UNIT_TEST_SUITE(Transfer)
});

{
auto result = testCase.DescribeTransfer().ExtractValueSync();
auto result = testCase.DescribeTransfer();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString());
}

testCase.DropTransfer();

{
auto result = testCase.DescribeTransfer().ExtractValueSync();
auto result = testCase.DescribeTransfer();
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToOneLineString());
UNIT_ASSERT_VALUES_EQUAL(EStatus::SCHEME_ERROR, result.GetStatus());
}
Expand Down Expand Up @@ -897,5 +897,77 @@ Y_UNIT_TEST_SUITE(Transfer)
}
}

Y_UNIT_TEST(DescribeError_OnLambdaCompilation)
{
MainTestCase testCase;
testCase.CreateTable(R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
Message Utf8 NOT NULL,
PRIMARY KEY (Key)
) WITH (
STORE = COLUMN
);
)");

testCase.CreateTopic(1);
testCase.CreateTransfer(R"(
$l = ($x) -> {
return $x._unknown_field_for_lambda_compilation_error;
};
)");

for (size_t i = 20; i--;) {
auto result = testCase.DescribeTransfer().GetReplicationDescription();
if (TReplicationDescription::EState::Error == result.GetState()) {
Cerr << ">>>>> " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush;
UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains("_unknown_field_for_lambda_compilation_error"));
break;
}

UNIT_ASSERT_C(i, "Unable to wait transfer error");
Sleep(TDuration::Seconds(1));
}
}

Y_UNIT_TEST(DescribeError_OnWriteToShard)
{
MainTestCase testCase;
testCase.CreateTable(R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
Message Utf8,
PRIMARY KEY (Key)
) WITH (
STORE = COLUMN
);
)");

testCase.CreateTopic(1);
testCase.CreateTransfer(R"(
$l = ($x) -> {
return [
<|
Key:null,
Message:CAST($x._data AS Utf8)
|>
];
};
)");

testCase.Write({"message-1"});

for (size_t i = 20; i--;) {
auto result = testCase.DescribeTransfer().GetReplicationDescription();
if (TReplicationDescription::EState::Error == result.GetState()) {
Cerr << ">>>>> " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush;
UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains("Cannot write data into shard"));
break;
}

UNIT_ASSERT_C(i, "Unable to wait transfer error");
Sleep(TDuration::Seconds(1));
}
}
}

0 comments on commit f5739aa

Please sign in to comment.