Skip to content

Commit

Permalink
merge to 25-1: Fix describe_volume and alter_volume (#15056)
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored Feb 26, 2025
1 parent d408be7 commit 79eee97
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 66 deletions.
151 changes: 93 additions & 58 deletions ydb/core/grpc_services/rpc_keyvalue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,58 +432,6 @@ class TDropVolumeRequest : public TRpcSchemeRequestActor<TDropVolumeRequest, TEv
}
};

class TAlterVolumeRequest : public TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest> {
public:
using TBase = TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>;
using TBase::TBase;

void Bootstrap(const TActorContext& ctx) {
TBase::Bootstrap(ctx);
Become(&TAlterVolumeRequest::StateFunc);
SendProposeRequest(ctx);
}

void SendProposeRequest(const TActorContext &ctx) {
const auto req = this->GetProtoRequest();

std::pair<TString, TString> pathPair;
try {
pathPair = SplitPath(req->path());
} catch (const std::exception& ex) {
Request_->RaiseIssue(NYql::ExceptionToIssue(ex));
return Reply(StatusIds::BAD_REQUEST, ctx);
}
const auto& workingDir = pathPair.first;
const auto& name = pathPair.second;

std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction();
NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
modifyScheme->SetWorkingDir(workingDir);
NKikimrSchemeOp::TAlterSolomonVolume* tableDesc = nullptr;

modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSolomonVolume);
tableDesc = modifyScheme->MutableAlterSolomonVolume();
tableDesc->SetName(name);
tableDesc->SetPartitionCount(req->alter_partition_count());

if (GetProtoRequest()->has_storage_config()) {
tableDesc->SetUpdateChannelsBinding(true);
auto &storageConfig = GetProtoRequest()->storage_config();
auto *internalStorageConfig = tableDesc->MutableStorageConfig();
AssignPoolKinds(storageConfig, internalStorageConfig);
} else {
tableDesc->SetUpdateChannelsBinding(false);
tableDesc->SetChannelProfileId(0);
}

ctx.Send(MakeTxProxyID(), proposeRequest.release());
}

STFUNC(StateFunc) {
return TBase::StateWork(ev);
}
};

template <typename TDerived>
class TBaseKeyValueRequest {
Expand Down Expand Up @@ -621,12 +569,10 @@ class TDescribeVolumeRequest
Ydb::KeyValue::DescribeVolumeResult result;
result.set_path(this->GetProtoRequest()->path());
result.set_partition_count(desc.PartitionsSize());
if (desc.PartitionsSize() > 0) {
auto *storageConfig = result.mutable_storage_config();
for (auto &channel : desc.GetPartitions(0).GetBoundChannels()) {
auto *channelBind = storageConfig->add_channel();
channelBind->set_media(channel.GetStoragePoolName());
}
auto *storageConfig = result.mutable_storage_config();
for (auto &channel : desc.GetBoundChannels()) {
auto *channelBind = storageConfig->add_channel();
channelBind->set_media(channel.GetStoragePoolName());
}
this->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, TActivationContext::AsActorContext());
}
Expand All @@ -640,6 +586,95 @@ class TDescribeVolumeRequest
};


class TAlterVolumeRequest
: public TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>
, public TBaseKeyValueRequest<TAlterVolumeRequest>
{
public:
using TBase = TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>;
using TBase::TBase;
using TBaseKeyValueRequest::TBaseKeyValueRequest;
friend class TBaseKeyValueRequest<TAlterVolumeRequest>;

void Bootstrap(const TActorContext& ctx) {
TBase::Bootstrap(ctx);
Become(&TAlterVolumeRequest::StateWork);
if (GetProtoRequest()->has_storage_config()) {
StorageConfig = GetProtoRequest()->storage_config();
SendProposeRequest(ctx);
} else {
OnBootstrap();
}
}

bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) {
return true;
}

void SendProposeRequest(const TActorContext &ctx) {
const auto req = this->GetProtoRequest();

std::pair<TString, TString> pathPair;
try {
pathPair = SplitPath(req->path());
} catch (const std::exception& ex) {
Request_->RaiseIssue(NYql::ExceptionToIssue(ex));
return Reply(StatusIds::BAD_REQUEST, ctx);
}
const auto& workingDir = pathPair.first;
const auto& name = pathPair.second;

std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction();
NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
modifyScheme->SetWorkingDir(workingDir);
NKikimrSchemeOp::TAlterSolomonVolume* tableDesc = nullptr;

modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSolomonVolume);
tableDesc = modifyScheme->MutableAlterSolomonVolume();
tableDesc->SetName(name);
tableDesc->SetPartitionCount(req->alter_partition_count());

if (GetProtoRequest()->has_storage_config()) {
tableDesc->SetUpdateChannelsBinding(true);
}
auto *internalStorageConfig = tableDesc->MutableStorageConfig();
AssignPoolKinds(StorageConfig, internalStorageConfig);

ctx.Send(MakeTxProxyID(), proposeRequest.release());
}

STFUNC(StateWork) {
Cerr << "TAlterVolumeRequest::StateWork; received event: " << ev->GetTypeName() << Endl;
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
default:
return TBase::StateWork(ev);
}
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) {
TEvTxProxySchemeCache::TEvNavigateKeySetResult* res = ev->Get();
NSchemeCache::TSchemeCacheNavigate *request = res->Request.Get();

if (!OnNavigateKeySetResult(ev, NACLib::DescribeSchema)) {
return;
}

const NKikimrSchemeOp::TSolomonVolumeDescription &desc = request->ResultSet[0].SolomonVolumeInfo->Description;
for (auto &channel : desc.GetBoundChannels()) {
auto *channelBind = StorageConfig.add_channel();
channelBind->set_media(channel.GetStoragePoolName());
}
SendProposeRequest(TActivationContext::AsActorContext());
}

private:
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
Ydb::KeyValue::StorageConfig StorageConfig;
};


class TListLocalPartitionsRequest
: public TRpcOperationRequestActor<TListLocalPartitionsRequest, TEvListLocalPartitionsKeyValueRequest>
, public TBaseKeyValueRequest<TListLocalPartitionsRequest>
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1415,6 +1415,7 @@ message TSolomonVolumeDescription {
optional uint64 PathId = 2;
optional uint64 PartitionCount = 3;
repeated TPartition Partitions = 4;
repeated NKikimrStoragePool.TChannelBind BoundChannels = 5;
}

message TCreateSolomonVolume {
Expand Down
11 changes: 5 additions & 6 deletions ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,17 +254,16 @@ class TAlterSolomon: public TSubOperation {
return result;
}

if (!alter.HasChannelProfileId()) {
result->SetError(TEvSchemeShard::EStatus::StatusInvalidParameter, "set channel profile id, please");
return result;
}

TChannelsBindings channelsBinding;
bool isResolved = false;
if (alter.HasStorageConfig()) {
isResolved = context.SS->ResolveSolomonChannels(alter.GetStorageConfig(), path.GetPathIdForDomain(), channelsBinding);
} else {
isResolved = context.SS->ResolveSolomonChannels(channelProfileId, path.GetPathIdForDomain(), channelsBinding);
if (!alter.HasChannelProfileId()) {
result->SetError(TEvSchemeShard::EStatus::StatusInvalidParameter, "set channel profile id, please");
return result;
}
isResolved = context.SS->ResolveSolomonChannels(alter.GetChannelProfileId(), path.GetPathIdForDomain(), channelsBinding);
}
if (!isResolved) {
result->SetError(NKikimrScheme::StatusInvalidParameter, "Unable to construct channel binding with the storage pool");
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,15 @@ void TPathDescriber::DescribeSolomonVolume(TPathId pathId, TPathElement::TPtr pa
}
}

if (solomonVolumeInfo->Partitions.size() > 0) {
auto shardId = solomonVolumeInfo->Partitions.begin()->first;
auto shardInfo = Self->ShardInfos.FindPtr(shardId);
Y_ABORT_UNLESS(shardInfo);
for (const auto& channel : shardInfo->BindedChannels) {
entry->AddBoundChannels()->CopyFrom(channel);
}
}

Sort(entry->MutablePartitions()->begin(),
entry->MutablePartitions()->end(),
[] (auto& part1, auto& part2) {
Expand Down
150 changes: 148 additions & 2 deletions ydb/core/tx/schemeshard/ut_base/ut_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9216,7 +9216,16 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
"PartitionCount: 40 ");
env.TestWaitNotification(runtime, txId);
TestDescribeResult(DescribePath(runtime, "/MyRoot/Solomon"),
{NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(40)});
{NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(40),
[](const NKikimrScheme::TEvDescribeSchemeResult& result){
const auto& desc = result.GetPathDescription().GetSolomonDescription();
const auto& boundChannels = desc.GetBoundChannels();
UNIT_ASSERT_VALUES_EQUAL(boundChannels.size(), 4);
UNIT_ASSERT_VALUES_EQUAL(boundChannels[0].GetStoragePoolName(), "pool-1");
UNIT_ASSERT_VALUES_EQUAL(boundChannels[1].GetStoragePoolName(), "pool-1");
UNIT_ASSERT_VALUES_EQUAL(boundChannels[2].GetStoragePoolName(), "pool-1");
UNIT_ASSERT_VALUES_EQUAL(boundChannels[3].GetStoragePoolName(), "pool-1");
}});

// Already exists
TestCreateSolomon(runtime, ++txId, "/MyRoot", "Name: \"Solomon\" "
Expand Down Expand Up @@ -9317,7 +9326,16 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
env.TestWaitNotification(runtime, txId);

TestDescribeResult(DescribePath(runtime, "/MyRoot/Solomon"),
{NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(4)});
{NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(4),
[](const NKikimrScheme::TEvDescribeSchemeResult& result){
const auto& desc = result.GetPathDescription().GetSolomonDescription();
const auto& boundChannels = desc.GetBoundChannels();
UNIT_ASSERT_VALUES_EQUAL(boundChannels.size(), 4);
UNIT_ASSERT_VALUES_EQUAL(boundChannels[0].GetStoragePoolName(), "pool-1");
UNIT_ASSERT_VALUES_EQUAL(boundChannels[1].GetStoragePoolName(), "pool-1");
UNIT_ASSERT_VALUES_EQUAL(boundChannels[2].GetStoragePoolName(), "pool-1");
UNIT_ASSERT_VALUES_EQUAL(boundChannels[3].GetStoragePoolName(), "pool-1");
}});

TestDropSolomon(runtime, ++txId, "/MyRoot", "Solomon");
env.TestWaitNotification(runtime, txId);
Expand Down Expand Up @@ -9430,6 +9448,134 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
UpdateChannelsBindingSolomon(true);
}

void UpdateChannelsBindingSolomonStorageConfig() {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().AllowUpdateChannelsBindingOfSolomonPartitions(true));
ui64 txId = 100;

auto check = [&](const TString& path, ui64 shards, const THashMap<TString, ui32>& expectedChannels) {
NKikimrSchemeOp::TDescribeOptions opts;
opts.SetReturnChannelsBinding(true);

auto makeChannels = [](const auto& boundsChannels) {
THashMap<TString, ui32> channels;
for (const auto& channel : boundsChannels) {
channels[channel.GetStoragePoolName()]++;
}
return channels;
};

TestDescribeResult(DescribePath(runtime, path, opts), {
NLs::Finished,
NLs::ShardsInsideDomain(shards),
[&expectedChannels, &makeChannels, &shards] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
const auto& desc = record.GetPathDescription().GetSolomonDescription();

UNIT_ASSERT_VALUES_EQUAL(shards, desc.PartitionsSize());

for (size_t i = 0; i < desc.PartitionsSize(); ++i) {
const auto& partition = desc.GetPartitions(i);

THashMap<TString, ui32> channels = makeChannels(partition.GetBoundChannels());
UNIT_ASSERT_VALUES_EQUAL(expectedChannels.size(), channels.size());

for (const auto& [name, count] : expectedChannels) {
UNIT_ASSERT_C(channels.contains(name), "Cannot find channel: " << name);
UNIT_ASSERT_VALUES_EQUAL(channels.at(name), count);
}
}

THashMap<TString, ui32> volumeChannels = makeChannels(desc.GetBoundChannels());
UNIT_ASSERT_VALUES_EQUAL(expectedChannels.size(), volumeChannels.size());

for (const auto& [name, count] : expectedChannels) {
UNIT_ASSERT_C(volumeChannels.contains(name), "Cannot find channel: " << name);
UNIT_ASSERT_VALUES_EQUAL(volumeChannels.at(name), count);
}
}
});
};

TestCreateSolomon(runtime, ++txId, "/MyRoot", R"(
Name: "Solomon"
PartitionCount: 1
StorageConfig {
Channel {
PreferredPoolKind: "pool-kind-1"
}
Channel {
PreferredPoolKind: "pool-kind-1"
}
Channel {
PreferredPoolKind: "pool-kind-1"
}
}
)");

env.TestWaitNotification(runtime, txId);
check("/MyRoot/Solomon", 1, {{{"pool-1", 3}}});
// case 1: empty alter
TestAlterSolomon(runtime, ++txId, "/MyRoot", R"(
Name: "Solomon"
StorageConfig {
Channel {
PreferredPoolKind: "pool-kind-2"
}
Channel {
PreferredPoolKind: "pool-kind-2"
}
Channel {
PreferredPoolKind: "pool-kind-2"
}
}
)", {NKikimrScheme::StatusInvalidParameter});

// case 2: add partition
TestAlterSolomon(runtime, ++txId, "/MyRoot", R"(
Name: "Solomon"
PartitionCount: 2
StorageConfig {
Channel {
PreferredPoolKind: "pool-kind-1"
}
Channel {
PreferredPoolKind: "pool-kind-1"
}
Channel {
PreferredPoolKind: "pool-kind-1"
}
}
)");

env.TestWaitNotification(runtime, txId);
check("/MyRoot/Solomon", 2, {{"pool-1", 3}});

// case 3: add partition & update channels binding
TestAlterSolomon(runtime, ++txId, "/MyRoot", R"(
Name: "Solomon"
PartitionCount: 3
UpdateChannelsBinding: true
StorageConfig {
Channel {
PreferredPoolKind: "pool-kind-2"
}
Channel {
PreferredPoolKind: "pool-kind-2"
}
Channel {
PreferredPoolKind: "pool-kind-2"
}
}
)");

env.TestWaitNotification(runtime, txId);
check("/MyRoot/Solomon", 3, {{"pool-2", 3}});
}

Y_UNIT_TEST(UpdateChannelsBindingSolomonStorageConfig) {
UpdateChannelsBindingSolomonStorageConfig();
}

Y_UNIT_TEST(RejectAlterSolomon) { //+
TTestBasicRuntime runtime;
TTestEnv env(runtime);
Expand Down
Loading

0 comments on commit 79eee97

Please sign in to comment.