From d4fd5e2ae13fd6199fc4ffabee9e70d0ccdf644d Mon Sep 17 00:00:00 2001
From: Fred Storage Liu
Date: Sat, 16 Dec 2023 11:27:12 -0800
Subject: [PATCH] [Spark] Support write partition columns to data files for Delta

This adds support for writing partition columns to data files for Delta.
The feature is required by Uniform Iceberg, since the Iceberg spec requires
partition columns to be present in data files. The approach is to copy
FileFormatWriter from Spark as DeltaFileFormatWriter, and add the option and
logic there to support writing partition columns.

Closes delta-io/delta#2367

GitOrigin-RevId: 77657bb422ce93b924f3cb25548e845477f8632f
---
 .../apache/spark/sql/delta/DeltaOptions.scala |   7 +-
 .../delta/files/DeltaFileFormatWriter.scala   | 479 ++++++++++++++++++
 .../sql/delta/files/TransactionalWrite.scala  |   8 +-
 3 files changed, 490 insertions(+), 4 deletions(-)
 create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaFileFormatWriter.scala

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala
index 21180743a56..dd01fc0ba2f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala
@@ -289,6 +289,10 @@ object DeltaOptions extends DeltaLogging {
    */
   val STREAMING_SOURCE_TRACKING_ID = "streamingSourceTrackingId"
 
+  /**
+   * An option to control whether Delta will write partition columns to data files.
+   */
+  val WRITE_PARTITION_COLUMNS = "writePartitionColumns"
 
   val validOptionKeys : Set[String] = Set(
     REPLACE_WHERE_OPTION,
@@ -323,7 +327,8 @@ object DeltaOptions extends DeltaLogging {
     "checkpointLocation",
     "path",
     VERSION_AS_OF,
-    TIMESTAMP_AS_OF
+    TIMESTAMP_AS_OF,
+    WRITE_PARTITION_COLUMNS
   )
 
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaFileFormatWriter.scala b/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaFileFormatWriter.scala
new file mode 100644
index 00000000000..2a73b8b852a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaFileFormatWriter.scala
@@ -0,0 +1,479 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.files
+
+import java.util.{Date, UUID}
+
+import org.apache.spark.sql.delta.DeltaOptions
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
+import org.apache.spark.shuffle.FetchFailedException
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.connector.write.WriterCommitMessage
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.FileFormatWriter._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+/**
+ * A helper object for writing FileFormat data out to a location.
+ * Logic is copied from FileFormatWriter from Spark 3.5, with added functionality to write
+ * partition values to data files. Specifically, L123-126, L132, and L140 add the option
+ * WRITE_PARTITION_COLUMNS.
+ */
+object DeltaFileFormatWriter extends Logging {
+
+  /**
+   * A variable used in tests to check whether the output ordering of the query matches the
+   * required ordering of the write command.
+   */
+  private var outputOrderingMatched: Boolean = false
+
+  /**
+   * A variable used in tests to check the final executed plan.
+   */
+  private var executedPlan: Option[SparkPlan] = None
+
+  // scalastyle:off argcount
+  /**
+   * Basic work flow of this command is:
+   * 1. Driver side setup, including output committer initialization and data source specific
+   *    preparation work for the write job to be issued.
+   * 2. Issues a write job consists of one or more executor side tasks, each of which writes all
+   *    rows within an RDD partition.
+   * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any
+   *    exception is thrown during task commitment, also aborts that task.
+   * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is
+   *    thrown during job commitment, also aborts the job.
+   * 5. If the job is successfully committed, perform post-commit operations such as
+   *    processing statistics.
+   * @return The set of all partition paths that were updated during this write job.
+   */
+  def write(
+      sparkSession: SparkSession,
+      plan: SparkPlan,
+      fileFormat: FileFormat,
+      committer: FileCommitProtocol,
+      outputSpec: OutputSpec,
+      hadoopConf: Configuration,
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      statsTrackers: Seq[WriteJobStatsTracker],
+      options: Map[String, String],
+      numStaticPartitionCols: Int = 0): Set[String] = {
+    require(partitionColumns.size >= numStaticPartitionCols)
+
+    val job = Job.getInstance(hadoopConf)
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[InternalRow])
+    FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
+
+    val partitionSet = AttributeSet(partitionColumns)
+    // cleanup the internal metadata information of
+    // the file source metadata attribute if any before write out
+    val finalOutputSpec = outputSpec.copy(
+      outputColumns = outputSpec.outputColumns
+        .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation)
+    )
+    val dataColumns = finalOutputSpec.outputColumns.filterNot(partitionSet.contains)
+
+    val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options)
+    val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)
+
+    val caseInsensitiveOptions = CaseInsensitiveMap(options)
+
+    val dataSchema = dataColumns.toStructType
+    DataSourceUtils.verifySchema(fileFormat, dataSchema)
+    DataSourceUtils.checkFieldNames(fileFormat, dataSchema)
+    // Note: prepareWrite has side effect. It sets "job".
+
+    val outputDataColumns =
+      if (caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true")) {
+        dataColumns ++ partitionColumns
+      } else dataColumns
+
+    val outputWriterFactory =
+      fileFormat.prepareWrite(
+        sparkSession,
+        job,
+        caseInsensitiveOptions,
+        outputDataColumns.toStructType
+      )
+
+    val description = new WriteJobDescription(
+      uuid = UUID.randomUUID.toString,
+      serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
+      outputWriterFactory = outputWriterFactory,
+      allColumns = finalOutputSpec.outputColumns,
+      dataColumns = outputDataColumns,
+      partitionColumns = partitionColumns,
+      bucketSpec = writerBucketSpec,
+      path = finalOutputSpec.outputPath,
+      customPartitionLocations = finalOutputSpec.customPartitionLocations,
+      maxRecordsPerFile = caseInsensitiveOptions
+        .get("maxRecordsPerFile")
+        .map(_.toLong)
+        .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
+      timeZoneId = caseInsensitiveOptions
+        .get(DateTimeUtils.TIMEZONE_OPTION)
+        .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone),
+      statsTrackers = statsTrackers
+    )
+
+    // We should first sort by dynamic partition columns, then bucket id, and finally sorting
+    // columns.
+    val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
+      writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
+    val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)
+
+    // SPARK-40588: when planned writing is disabled and AQE is enabled,
+    // plan contains an AdaptiveSparkPlanExec, which does not know
+    // its final plan's ordering, so we have to materialize that plan first
+    // it is fine to use plan further down as the final plan is cached in that plan
+    def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+      case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+      case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
+    }
+
+    // the sort order doesn't matter
+    val actualOrdering = writeFilesOpt
+      .map(_.child)
+      .getOrElse(materializeAdaptiveSparkPlan(plan))
+      .outputOrdering
+    val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)
+
+    SQLExecution.checkSQLExecutionId(sparkSession)
+
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
+
+    // When `PLANNED_WRITE_ENABLED` is true, the optimizer rule V1Writes will add logical sort
+    // operator based on the required ordering of the V1 write command. So the output
+    // ordering of the physical plan should always match the required ordering. Here
+    // we set the variable to verify this behavior in tests.
+    // There are two cases where FileFormatWriter still needs to add physical sort:
+    // 1) When the planned write config is disabled.
+    // 2) When the concurrent writers are enabled (in this case the required ordering of a
+    //    V1 write command will be empty).
+    if (Utils.isTesting) outputOrderingMatched = orderingMatched
+
+    if (writeFilesOpt.isDefined) {
+      // build `WriteFilesSpec` for `WriteFiles`
+      val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => {
+        val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec)
+        createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)
+      }
+      val writeSpec = WriteFilesSpec(
+        description = description,
+        committer = committer,
+        concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc
+      )
+      executeWrite(sparkSession, plan, writeSpec, job)
+    } else {
+      executeWrite(
+        sparkSession,
+        plan,
+        job,
+        description,
+        committer,
+        outputSpec,
+        requiredOrdering,
+        partitionColumns,
+        sortColumns,
+        orderingMatched
+      )
+    }
+  }
+  // scalastyle:on argcount
+
+  private def executeWrite(
+      sparkSession: SparkSession,
+      plan: SparkPlan,
+      job: Job,
+      description: WriteJobDescription,
+      committer: FileCommitProtocol,
+      outputSpec: OutputSpec,
+      requiredOrdering: Seq[Expression],
+      partitionColumns: Seq[Attribute],
+      sortColumns: Seq[Attribute],
+      orderingMatched: Boolean): Set[String] = {
+    val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
+    val empty2NullPlan = if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan
+
+    writeAndCommit(job, description, committer) {
+      val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) {
+        (empty2NullPlan, None)
+      } else {
+        val sortPlan = createSortPlan(empty2NullPlan, requiredOrdering, outputSpec)
+        val concurrentOutputWriterSpec =
+          createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)
+        if (concurrentOutputWriterSpec.isDefined) {
+          (empty2NullPlan, concurrentOutputWriterSpec)
+        } else {
+          (sortPlan, concurrentOutputWriterSpec)
+        }
+      }
+
+      // In testing, this is the only way to get hold of the actually executed plan written to file
+      if (Utils.isTesting) executedPlan = Some(planToExecute)
+
+      val rdd = planToExecute.execute()
+
+      // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
+      // partition rdd to make sure we at least set up one write task to write the metadata.
+      val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) {
+        sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1)
+      } else {
+        rdd
+      }
+
+      val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
+      val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
+      sparkSession.sparkContext.runJob(
+        rddWithNonEmptyPartitions,
+        (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
+          executeTask(
+            description = description,
+            jobTrackerID = jobTrackerID,
+            sparkStageId = taskContext.stageId(),
+            sparkPartitionId = taskContext.partitionId(),
+            sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
+            committer,
+            iterator = iter,
+            concurrentOutputWriterSpec = concurrentOutputWriterSpec
+          )
+        },
+        rddWithNonEmptyPartitions.partitions.indices,
+        (index, res: WriteTaskResult) => {
+          committer.onTaskCommit(res.commitMsg)
+          ret(index) = res
+        }
+      )
+      ret
+    }
+  }
+
+  private def writeAndCommit(
+      job: Job,
+      description: WriteJobDescription,
+      committer: FileCommitProtocol)(f: => Array[WriteTaskResult]): Set[String] = {
+    // This call shouldn't be put into the `try` block below because it only initializes and
+    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
+    committer.setupJob(job)
+    try {
+      val ret = f
+      val commitMsgs = ret.map(_.commitMsg)
+
+      logInfo(s"Start to commit write Job ${description.uuid}.")
+      val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
+      logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")
+
+      processStats(description.statsTrackers, ret.map(_.summary.stats), duration)
+      logInfo(s"Finished processing stats for write job ${description.uuid}.")
+
+      // return a set of all the partition paths that were updated during this job
+      ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty)
+    } catch {
+      case cause: Throwable =>
+        logError(s"Aborting job ${description.uuid}.", cause)
+        committer.abortJob(job)
+        throw cause
+    }
+  }
+
+  /**
+   * Write files using [[SparkPlan.executeWrite]]
+   */
+  private def executeWrite(
+      session: SparkSession,
+      planForWrites: SparkPlan,
+      writeFilesSpec: WriteFilesSpec,
+      job: Job): Set[String] = {
+    val committer = writeFilesSpec.committer
+    val description = writeFilesSpec.description
+
+    // In testing, this is the only way to get hold of the actually executed plan written to file
+    if (Utils.isTesting) executedPlan = Some(planForWrites)
+
+    writeAndCommit(job, description, committer) {
+      val rdd = planForWrites.executeWrite(writeFilesSpec)
+      val ret = new Array[WriteTaskResult](rdd.partitions.length)
+      session.sparkContext.runJob(
+        rdd,
+        (context: TaskContext, iter: Iterator[WriterCommitMessage]) => {
+          assert(iter.hasNext)
+          val commitMessage = iter.next()
+          assert(!iter.hasNext)
+          commitMessage
+        },
+        rdd.partitions.indices,
+        (index, res: WriterCommitMessage) => {
+          assert(res.isInstanceOf[WriteTaskResult])
+          val writeTaskResult = res.asInstanceOf[WriteTaskResult]
+          committer.onTaskCommit(writeTaskResult.commitMsg)
+          ret(index) = writeTaskResult
+        }
+      )
+      ret
+    }
+  }
+
+  private def createSortPlan(
+      plan: SparkPlan,
+      requiredOrdering: Seq[Expression],
+      outputSpec: OutputSpec): SortExec = {
+    // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
+    // the physical plan may have different attribute ids due to optimizer removing some
+    // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
+    val orderingExpr =
+      bindReferences(requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns)
+    SortExec(orderingExpr, global = false, child = plan)
+  }
+
+  private def createConcurrentOutputWriterSpec(
+      sparkSession: SparkSession,
+      sortPlan: SortExec,
+      sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = {
+    val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
+    val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
+    if (concurrentWritersEnabled) {
+      Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter()))
+    } else {
+      None
+    }
+  }
+
+  /** Writes data out in a single Spark task. */
+  private def executeTask(
+      description: WriteJobDescription,
+      jobTrackerID: String,
+      sparkStageId: Int,
+      sparkPartitionId: Int,
+      sparkAttemptNumber: Int,
+      committer: FileCommitProtocol,
+      iterator: Iterator[InternalRow],
+      concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]): WriteTaskResult = {
+
+    val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId)
+    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
+
+    // Set up the attempt context required to use in the output committer.
+    val taskAttemptContext: TaskAttemptContext = {
+      // Set up the configuration object
+      val hadoopConf = description.serializableHadoopConf.value
+      hadoopConf.set("mapreduce.job.id", jobId.toString)
+      hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
+      hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
+      hadoopConf.setBoolean("mapreduce.task.ismap", true)
+      hadoopConf.setInt("mapreduce.task.partition", 0)
+
+      new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
+    }
+
+    committer.setupTask(taskAttemptContext)
+
+    val dataWriter =
+      if (sparkPartitionId != 0 && !iterator.hasNext) {
+        // In case of empty job, leave first partition to save meta for file format like parquet.
+        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
+      } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
+        new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
+      } else {
+        concurrentOutputWriterSpec match {
+          case Some(spec) =>
+            new DynamicPartitionDataConcurrentWriter(
+              description,
+              taskAttemptContext,
+              committer,
+              spec
+            )
+          case _ =>
+            new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
+        }
+      }
+
+    try {
+      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+        // Execute the task to write rows out and commit the task.
+        dataWriter.writeWithIterator(iterator)
+        dataWriter.commit()
+      })(catchBlock = {
+        // If there is an error, abort the task
+        dataWriter.abort()
+        logError(s"Job $jobId aborted.")
+      }, finallyBlock = {
+        dataWriter.close()
+      })
+    } catch {
+      case e: FetchFailedException =>
+        throw e
+      case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput =>
+        // If any output file to write already exists, it does not make sense to re-run this task.
+        // We throw the exception and let Executor throw ExceptionFailure to abort the job.
+        throw new TaskOutputFileAlreadyExistException(f)
+      case t: Throwable =>
+        throw QueryExecutionErrors.taskFailedWhileWritingRowsError(description.path, t)
+    }
+  }
+
+  /**
+   * For every registered [[WriteJobStatsTracker]], call `processStats()` on it, passing it
+   * the corresponding [[WriteTaskStats]] from all executors.
+   */
+  private def processStats(
+      statsTrackers: Seq[WriteJobStatsTracker],
+      statsPerTask: Seq[Seq[WriteTaskStats]],
+      jobCommitDuration: Long): Unit = {
+
+    val numStatsTrackers = statsTrackers.length
+    assert(
+      statsPerTask.forall(_.length == numStatsTrackers),
+      s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker.
+         |There are $numStatsTrackers statsTrackers, but some task returned
+         |${statsPerTask.find(_.length != numStatsTrackers).get.length} results instead.
+ """.stripMargin + ) + + val statsPerTracker = if (statsPerTask.nonEmpty) { + statsPerTask.transpose + } else { + statsTrackers.map(_ => Seq.empty) + } + + statsTrackers.zip(statsPerTracker).foreach { + case (statsTracker, stats) => statsTracker.processStats(stats, jobCommitDuration) + } + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala b/spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala index 5a5c6b5b68b..33506412c42 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala @@ -415,19 +415,21 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl statsTrackers.append(basicWriteJobStatsTracker) } + // Iceberg spec requires partition columns in data files + val writePartitionColumns = IcebergCompatV1.isEnabled(metadata) // Retain only a minimal selection of Spark writer options to avoid any potential // compatibility issues - val options = writeOptions match { + val options = (writeOptions match { case None => Map.empty[String, String] case Some(writeOptions) => writeOptions.options.filterKeys { key => key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) || key.equalsIgnoreCase(DeltaOptions.COMPRESSION) }.toMap - } + }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString) try { - FileFormatWriter.write( + DeltaFileFormatWriter.write( sparkSession = spark, plan = physicalPlan, fileFormat = deltaLog.fileFormat(protocol, metadata), // TODO support changing formats.