Skip to content

Commit

Permalink
Introduce ClusteringTableFeature and CLUSTERED_BY file tag
Browse files Browse the repository at this point in the history
#1874 requests Liquid clustering, and this PR starts the first step to introduce ClusteringTableFeature and CLUSTERED_BY tags.

When creating a clustered table, the feature `clustering` must exist in the table protocol's writerFeatures.

When a clustering implementation clusters files, writers must incorporate a tag with CLUSTERED_BY as the key and the name of the clustering implementation as the corresponding value in the add action.

More detail can be found in the Delta protocol change PR #2264

The next step is to pave the way to integrate the table feature and CLUSTERED_BY tags when defining and clustering a clustered table.
Closes #2281

GitOrigin-RevId: e210b491a324a0794ec9f3a9236bb1932a6677e3
  • Loading branch information
dabao521 authored and allisonport-db committed Nov 16, 2023
1 parent eafb36c commit 8b768b6
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ object TableFeature {
AppendOnlyTableFeature,
ChangeDataFeedTableFeature,
CheckConstraintsTableFeature,
ClusteringTableFeature,
DomainMetadataTableFeature,
GeneratedColumnsTableFeature,
InvariantsTableFeature,
Expand Down Expand Up @@ -520,6 +521,13 @@ object IcebergCompatV1TableFeature extends WriterFeature(name = "icebergCompatV1
override def requiredFeatures: Set[TableFeature] = Set(ColumnMappingTableFeature)
}

/**
 * Writer table feature for Liquid clustering. It is enabled on a table when the
 * table is created with a CLUSTER BY clause.
 *
 * Requires [[DomainMetadataTableFeature]], since clustering metadata is tracked
 * via domain metadata.
 */
object ClusteringTableFeature extends WriterFeature(name = "clustering") {
  override val requiredFeatures: Set[TableFeature] = Set(DomainMetadataTableFeature)
}


/**
* V2 Checkpoint table feature is for checkpoints with sidecars and the new format and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,14 @@ object AddFile {
/** [[OPTIMIZE_TARGET_SIZE]]: target file size the file was optimized to. */
object OPTIMIZE_TARGET_SIZE extends AddFile.Tags.KeyType("OPTIMIZE_TARGET_SIZE")

/**
 * [[CLUSTERED_BY]]: tag key whose value is the name of the clustering
 * implementation that clustered the file.
 *
 * A clustering implementation should only cluster files that belong to the
 * implementation or files that do not have the [[CLUSTERED_BY]] tag
 * (i.e., unclustered files).
 */
object CLUSTERED_BY extends AddFile.Tags.KeyType("CLUSTERED_BY")

}

/** Convert a [[Tags.KeyType]] to a string to be used in the AddMap.tags Map[String, String]. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.skipping.clustering

import org.apache.spark.sql.delta.{ClusteringTableFeature, DeltaConfigs}
import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import org.apache.spark.sql.internal.SQLConf

/**
* Clustered table utility functions.
*/
/**
 * Utility helpers for clustered (Liquid) tables.
 */
trait ClusteredTableUtilsBase extends DeltaLogging {

  /** Whether the given table [[Protocol]] supports the Liquid clustering table feature. */
  def isSupported(protocol: Protocol): Boolean =
    protocol.isFeatureSupported(ClusteringTableFeature)

  /**
   * Whether clustered tables are exposed; currently they are only enabled for testing.
   *
   * Note: this method will be removed once clustered table support is fully developed.
   */
  def exposeClusteringTableForTesting: Boolean =
    SQLConf.get.getConf(DeltaSQLConf.EXPOSE_CLUSTERING_TABLE_FOR_TESTING)
}

object ClusteredTableUtils extends ClusteredTableUtilsBase
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
* [[SQLConf]] entries for Delta features.
Expand Down Expand Up @@ -1387,6 +1388,26 @@ trait DeltaSQLConfBase {
.doc("Bin size for the adaptive shuffle in optimized writes in megabytes.")
.bytesConf(ByteUnit.MiB)
.createWithDefault(512)

//////////////////
// Clustered Table
//////////////////

// Temporary conf gating clustered tables to devs only, as the feature is not fully
// ready. The checkValue below prevents enabling it outside of test environments
// (Utils.isTesting); the conf will be removed once the feature is complete.
val EXPOSE_CLUSTERING_TABLE_FOR_TESTING =
buildConf("clusteringTable.exposeClusteringTableForTesting")
.internal()
.doc(
"""
|This conf controls whether clustering table is exposed or not. Note that
| clustering table is in development and this config should be used only for
| testing/benchmarking.
|""".stripMargin)
.booleanConf
.checkValue(v => !v || Utils.isTesting,
"Exposing clustering table is only allowed in testing.")
.createWithDefault(false)
}

object DeltaSQLConf extends DeltaSQLConfBase

0 comments on commit 8b768b6

Please sign in to comment.