Skip to content

Commit

Permalink
Merge 997aee1 into 4c3f422
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Mar 9, 2025
2 parents 4c3f422 + 997aee1 commit bbb3b08
Show file tree
Hide file tree
Showing 58 changed files with 2,438 additions and 1,100 deletions.
15 changes: 15 additions & 0 deletions ydb/core/formats/arrow/accessor/abstract/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ class IChunkedArray {
}
virtual std::optional<ui64> DoGetRawSize() const = 0;
virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const = 0;
virtual std::optional<bool> DoCheckOneValueAccessor(std::shared_ptr<arrow::Scalar>& /*value*/) const {
return std::nullopt;
}

virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray(
const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const {
Expand Down Expand Up @@ -328,6 +331,18 @@ class IChunkedArray {
public:
std::shared_ptr<IChunkedArray> ApplyFilter(const TColumnFilter& filter, const std::shared_ptr<IChunkedArray>& selfPtr) const;

virtual bool HasWholeDataVolume() const {
return true;
}

std::optional<bool> CheckOneValueAccessor(std::shared_ptr<arrow::Scalar>& value) const {
return DoCheckOneValueAccessor(value);
}

virtual bool HasSubColumnData(const TString& /*subColumnName*/) const {
return true;
}

virtual void Reallocate() {

}
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/formats/arrow/accessor/sparsed/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ class TSparsedArray: public IChunkedArray {
Record.VisitValues(visitor);
}

virtual std::optional<bool> DoCheckOneValueAccessor(std::shared_ptr<arrow::Scalar>& value) const override {
if (Record.GetNotDefaultRecordsCount()) {
return false;
} else {
value = DefaultValue;
return true;
}
}

protected:
virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override;

Expand Down
8 changes: 8 additions & 0 deletions ydb/core/formats/arrow/accessor/sub_columns/partial.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ class TSubColumnsPartialArray: public IChunkedArray {
, Header(std::move(header)) {
}

virtual bool HasWholeDataVolume() const override {
return false;
}

virtual bool HasSubColumnData(const TString& subColumnName) const override {
return !NeedFetch(std::string_view(subColumnName.data(), subColumnName.size()));
}

static std::shared_ptr<TSubColumnsPartialArray> BuildEmpty(const std::shared_ptr<arrow::DataType>& dataType, const ui32 recordsCount) {
return std::make_shared<TSubColumnsPartialArray>(TSubColumnsHeader::BuildEmpty(), recordsCount, dataType);
}
Expand Down
56 changes: 56 additions & 0 deletions ydb/core/formats/arrow/arrow_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,4 +785,60 @@ TString TColumnFilter::DebugString() const {
return sb;
}

TColumnFilter TColumnFilter::Cut(const ui32 recordsCount, const ui32 limit, const bool reverse) const {
if (IsTotalDenyFilter()) {
return TColumnFilter::BuildDenyFilter();
}
TColumnFilter result = TColumnFilter::BuildAllowFilter();
if (IsTotalAllowFilter()) {
if (recordsCount <= limit) {
return result;
}
if (reverse) {
result.Add(false, recordsCount - limit);
result.Add(true, limit);
} else {
result.Add(true, limit);
result.Add(false, recordsCount - limit);
}
} else {
AFL_VERIFY(GetRecordsCountVerified() == recordsCount)("filter", GetRecordsCountVerified())("ext", recordsCount);
ui32 cutCount = 0;
bool currentValue = reverse ? LastValue : GetStartValue();
const auto scan = [&](auto begin, auto end) {
for (auto it = begin; it != end; ++it) {
AFL_VERIFY(cutCount <= limit);
if (currentValue) {
if (cutCount < limit) {
if (limit <= cutCount + *it) {
result.Add(true, limit - cutCount);
result.Add(false, cutCount + *it - limit);
cutCount = limit;
} else {
result.Add(true, *it);
cutCount += *it;
}
} else {
result.Add(false, *it);
}
}
if (!currentValue) {
result.Add(false, *it);
}
currentValue = !currentValue;
}
};
if (reverse) {
scan(Filter.rbegin(), Filter.rend());
} else {
scan(Filter.begin(), Filter.end());
}
if (reverse) {
std::reverse(result.Filter.begin(), result.Filter.end());
result.LastValue = result.GetStartValue();
}
}
return result;
}

} // namespace NKikimr::NArrow
2 changes: 2 additions & 0 deletions ydb/core/formats/arrow/arrow_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class TColumnFilter {
bool Next();
};

TColumnFilter Cut(const ui32 recordsCount, const ui32 limit, const bool reverse) const;

TSlicesIterator BuildSlicesIterator(const std::optional<ui32> startIndex, const std::optional<ui32> count) const {
return TSlicesIterator(*this, startIndex, count);
}
Expand Down
35 changes: 9 additions & 26 deletions ydb/core/formats/arrow/program/abstract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,21 @@ namespace NKikimr::NArrow::NSSA {
NJson::TJsonValue IResourceProcessor::DebugJson() const {
NJson::TJsonValue result = NJson::JSON_MAP;
if (Input.size()) {
result.InsertValue("input", JoinSeq(",", Input));
result.InsertValue("i", JoinSeq(",", Input));
}
if (Output.size()) {
result.InsertValue("output", JoinSeq(",", Output));
result.InsertValue("o", JoinSeq(",", Output));
}
result.InsertValue("type", ::ToString(ProcessorType));
result.InsertValue("internal", DoDebugJson());
return result;
}

TConclusionStatus IResourceProcessor::Execute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const {
for (auto&& i : Output) {
if (resources->HasColumn(i.GetColumnId())) {
return TConclusionStatus::Fail("column " + ::ToString(i.GetColumnId()) + " has already");
}
result.InsertValue("t", ::ToString(ProcessorType));
auto internalJson = DoDebugJson();
if (!internalJson.IsMap() || internalJson.GetMapSafe().size()) {
result.InsertValue("p", std::move(internalJson));
}
return DoExecute(resources, context);
return result;
}

std::optional<TFetchingInfo> IResourceProcessor::BuildFetchTask(
const ui32 columnId, const NAccessor::IChunkedArray::EType /*arrType*/, const std::shared_ptr<TAccessorsCollection>& resources) const {
auto acc = resources->GetAccessorOptional(columnId);
if (!acc) {
return TFetchingInfo::BuildFullRestore(false);
}
return NAccessor::TCompositeChunkedArray::VisitDataOwners<TFetchingInfo>(acc, [](const std::shared_ptr<NAccessor::IChunkedArray>& arr) {
if (arr->GetType() == NAccessor::IChunkedArray::EType::SubColumnsPartialArray) {
return std::optional<TFetchingInfo>(TFetchingInfo::BuildFullRestore(true));
} else {
return std::optional<TFetchingInfo>();
}
});
TConclusion<IResourceProcessor::EExecutionResult> IResourceProcessor::Execute(const TProcessorContext& context, const TExecutionNodeContext& nodeContext) const {
return DoExecute(context, nodeContext);
}

NJson::TJsonValue TResourceProcessorStep::DebugJson() const {
Expand Down
113 changes: 82 additions & 31 deletions ydb/core/formats/arrow/program/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ namespace NKikimr::NArrow::NSSA {
using IChunkedArray = NAccessor::IChunkedArray;
using TAccessorsCollection = NAccessor::TAccessorsCollection;

class TExecutionNodeContext {
private:
YDB_ACCESSOR_DEF(THashSet<ui32>, RemoveResourceIds);

public:
};

class TColumnInfo {
private:
bool GeneratedFlag = false;
Expand Down Expand Up @@ -165,7 +172,11 @@ enum class EProcessorType {
Projection,
Filter,
Aggregation,
Original
FetchOriginalData,
AssembleOriginalData,
FetchIndexData,
CheckIndexData,
StreamLogic
};

class TFetchingInfo {
Expand All @@ -188,38 +199,21 @@ class TFetchingInfo {
}
};

class TProcessorContext {
protected:
std::vector<TColumnChainInfo> ColumnsToFetch;
std::vector<TColumnChainInfo> OriginalColumnsToUse;
std::vector<TColumnChainInfo> ColumnsToDrop;
class TProcessorContext;

class IResourceProcessor {
public:
const std::vector<TColumnChainInfo>& GetColumnsToFetch() const {
return ColumnsToFetch;
}
const std::vector<TColumnChainInfo>& GetOriginalColumnsToUse() const {
return OriginalColumnsToUse;
}
const std::vector<TColumnChainInfo>& GetColumnsToDrop() const {
return ColumnsToDrop;
}

TProcessorContext(
std::vector<TColumnChainInfo>&& toFetch, std::vector<TColumnChainInfo>&& originalToUse, std::vector<TColumnChainInfo>&& toDrop)
: ColumnsToFetch(std::move(toFetch))
, OriginalColumnsToUse(std::move(originalToUse))
, ColumnsToDrop(std::move(toDrop)) {
}
};
enum class EExecutionResult {
Success,
InBackground
};

class IResourceProcessor {
private:
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, Input);
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, Output);
YDB_READONLY(EProcessorType, ProcessorType, EProcessorType::Unknown);

virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const = 0;
virtual TConclusion<EExecutionResult> DoExecute(const TProcessorContext& context, const TExecutionNodeContext& nodeContext) const = 0;

virtual NJson::TJsonValue DoDebugJson() const {
return NJson::JSON_MAP;
Expand All @@ -233,15 +227,57 @@ class IResourceProcessor {
return DoGetWeight();
}

virtual std::optional<TFetchingInfo> BuildFetchTask(
const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, const std::shared_ptr<TAccessorsCollection>& resources) const;

virtual bool IsAggregation() const = 0;

virtual ~IResourceProcessor() = default;

NJson::TJsonValue DebugJson() const;

void ExchangeInput(const ui32 resourceIdFrom, const ui32 resourceIdTo) {
bool found = false;
for (auto&& i : Input) {
AFL_VERIFY(i.GetColumnId() != resourceIdTo);
if (i.GetColumnId() == resourceIdFrom) {
AFL_VERIFY(!found);
found = true;
i = TColumnChainInfo(resourceIdTo);
}
}
AFL_VERIFY(found);
}

void AddInput(const ui32 resourceId) {
for (auto&& i : Input) {
AFL_VERIFY(i.GetColumnId() != resourceId);
}
Input.emplace_back(TColumnChainInfo(resourceId));
}

void RemoveInput(const ui32 resourceId) {
for (ui32 idx = 0; idx < Input.size(); ++idx) {
if (Input[idx].GetColumnId() == resourceId) {
std::swap(Input[idx], Input.back());
Input.pop_back();
return;
}
}
AFL_VERIFY(false);
}

bool HasInput(const ui32 resourceId) {
for (ui32 idx = 0; idx < Input.size(); ++idx) {
if (Input[idx].GetColumnId() == resourceId) {
return true;
}
}
return false;
}

void SetOutputResourceIdOnce(const ui32 resourceId) {
AFL_VERIFY(Output.size() == 1)("size", Output.size());
Output.front() = TColumnChainInfo(resourceId);
}

ui32 GetOutputColumnIdOnce() const {
AFL_VERIFY(Output.size() == 1)("size", Output.size());
return Output.front().GetColumnId();
Expand All @@ -258,20 +294,35 @@ class IResourceProcessor {
, ProcessorType(type) {
}

[[nodiscard]] TConclusionStatus Execute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const;
[[nodiscard]] TConclusion<EExecutionResult> Execute(const TProcessorContext& context, const TExecutionNodeContext& nodeContext) const;
};

class TResourceProcessorStep: public TProcessorContext {
class TResourceProcessorStep {
private:
using TBase = TProcessorContext;
std::vector<TColumnChainInfo> ColumnsToFetch;
std::vector<TColumnChainInfo> OriginalColumnsToUse;
std::vector<TColumnChainInfo> ColumnsToDrop;
YDB_READONLY_DEF(std::shared_ptr<IResourceProcessor>, Processor);

public:
const std::vector<TColumnChainInfo>& GetColumnsToFetch() const {
return ColumnsToFetch;
}
const std::vector<TColumnChainInfo>& GetOriginalColumnsToUse() const {
return OriginalColumnsToUse;
}
const std::vector<TColumnChainInfo>& GetColumnsToDrop() const {
return ColumnsToDrop;
}

NJson::TJsonValue DebugJson() const;

TResourceProcessorStep(std::vector<TColumnChainInfo>&& toFetch, std::vector<TColumnChainInfo>&& originalToUse,
std::shared_ptr<IResourceProcessor>&& processor, std::vector<TColumnChainInfo>&& toDrop)
: TBase(std::move(toFetch), std::move(originalToUse), std::move(toDrop))
: ColumnsToFetch(std::move(toFetch))
, OriginalColumnsToUse(std::move(originalToUse))
, ColumnsToDrop(std::move(toDrop))
, Processor(std::move(processor)) {
AFL_VERIFY(Processor);
}
Expand Down
Loading

0 comments on commit bbb3b08

Please sign in to comment.