From 934c7d288b3b849cca4f5daec4d0df7eede09255 Mon Sep 17 00:00:00 2001
From: Zeyu Chen
Date: Tue, 4 Mar 2025 13:46:16 -0800
Subject: [PATCH 1/3] SPARK-51097 Split SparkPlan metrics and instance metrics

---
 .../org/apache/spark/sql/execution/SparkPlan.scala  | 11 +++++++++++
 .../sql/execution/streaming/statefulOperators.scala | 13 ++++++++-----
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index d9bb057282dff..07cc019a4983d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -140,11 +140,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    */
   def metrics: Map[String, SQLMetric] = Map.empty
 
+  /**
+   * @return All instance metrics of this SparkPlan.
+   */
+  def instanceMetrics: Map[String, SQLMetric] = Map.empty
+
   /**
    * Resets all the metrics.
    */
   def resetMetrics(): Unit = {
     metrics.valuesIterator.foreach(_.reset())
+    instanceMetrics.valuesIterator.foreach(_.reset())
     children.foreach(_.resetMetrics())
   }
 
@@ -153,6 +159,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    */
   def longMetric(name: String): SQLMetric = metrics(name)
 
+  /**
+   * @return The instance-specific [[SQLMetric]] for the given `name`.
+   */
+  def longInstanceMetric(name: String): SQLMetric = instanceMetrics(name)
+
   // TODO: Move to `DistributedPlan`
   /**
    * Specifies how data is partitioned across different nodes in the cluster.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index af47229dfa88c..d6da67eddd0b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -216,7 +216,9 @@ trait StateStoreWriter
     "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state"),
     "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext,
       "number of state store instances")
-  ) ++ stateStoreCustomMetrics ++ pythonMetrics ++ stateStoreInstanceMetrics
+  ) ++ stateStoreCustomMetrics ++ pythonMetrics
+
+  override lazy val instanceMetrics = stateStoreInstanceMetrics
 
   val stateStoreNames: Seq[String] = Seq(StateStoreId.DEFAULT_STORE_NAME)
 
@@ -332,7 +334,7 @@ trait StateStoreWriter
         case (name, metricConfig) =>
           // Keep instance metrics that are updated or aren't marked to be ignored,
           // as their initial value could still be important.
-          !metricConfig.ignoreIfUnchanged || !longMetric(name).isZero
+          !metricConfig.ignoreIfUnchanged || !longInstanceMetric(name).isZero
       }
       .groupBy {
         // Group all instance metrics underneath their common metric prefix
@@ -347,8 +349,8 @@ trait StateStoreWriter
           metrics
             .map {
               case (_, metric) =>
-                metric.name -> (if (longMetric(metric.name).isZero) metricConf.initValue
-                  else longMetric(metric.name).value)
+                metric.name -> (if (longInstanceMetric(metric.name).isZero) metricConf.initValue
+                  else longInstanceMetric(metric.name).value)
             }
             .toSeq
             .sortBy(_._2)(metricConf.ordering)
@@ -439,7 +441,8 @@ trait StateStoreWriter
       case (metric, value) =>
         val metricConfig = instanceMetricConfiguration(metric.name)
         // Update the metric's value based on the defined combine method
-        longMetric(metric.name).set(metricConfig.combine(longMetric(metric.name), value))
+        longInstanceMetric(metric.name)
+          .set(metricConfig.combine(longInstanceMetric(metric.name), value))
     }
   }
 

From ef4aedf90102eff8a2c12eb63882044c6c0a02c0 Mon Sep 17 00:00:00 2001
From: Zeyu Chen
Date: Wed, 5 Mar 2025 10:56:22 -0800
Subject: [PATCH 2/3] SPARK-51097 Lazy initialize instance metrics alongside custom metrics

---
 .../execution/streaming/statefulOperators.scala | 34 +++++++++++--------
 1 file changed, 20 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index d6da67eddd0b5..e9c4757f55687 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -203,20 +203,26 @@ trait StateStoreWriter
 
   def operatorStateMetadataVersion: Int = 1
 
-  override lazy val metrics = statefulOperatorCustomMetrics ++ Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
-      "number of rows which are dropped by watermark"),
-    "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
-    "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
-    "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
-    "numRemovedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of removed state rows"),
-    "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to remove"),
-    "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes"),
-    "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state"),
-    "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext,
-      "number of state store instances")
-  ) ++ stateStoreCustomMetrics ++ pythonMetrics
+  override lazy val metrics = {
+    // Lazy initialize instance metrics
+    instanceMetrics
+    statefulOperatorCustomMetrics ++ Map(
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "numRowsDroppedByWatermark" -> SQLMetrics
+        .createMetric(sparkContext, "number of rows which are dropped by watermark"),
+      "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
+      "numUpdatedStateRows" -> SQLMetrics
+        .createMetric(sparkContext, "number of updated state rows"),
+      "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
+      "numRemovedStateRows" -> SQLMetrics
+        .createMetric(sparkContext, "number of removed state rows"),
+      "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to remove"),
+      "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes"),
+      "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state"),
+      "numStateStoreInstances" -> SQLMetrics
+        .createMetric(sparkContext, "number of state store instances")
+    ) ++ stateStoreCustomMetrics ++ pythonMetrics
+  }
 
   override lazy val instanceMetrics = stateStoreInstanceMetrics
 

From efd8dac9dc590955d9de2620b3487ded8568906d Mon Sep 17 00:00:00 2001
From: Zeyu Chen
Date: Wed, 5 Mar 2025 12:58:02 -0800
Subject: [PATCH 3/3] SPARK-51097 Reduce scope of instance metrics

---
 .../spark/sql/execution/SparkPlan.scala  | 11 ----------
 .../streaming/statefulOperators.scala    | 22 +++++++++++--------
 2 files changed, 13 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 07cc019a4983d..d9bb057282dff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -140,17 +140,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    */
   def metrics: Map[String, SQLMetric] = Map.empty
 
-  /**
-   * @return All instance metrics of this SparkPlan.
-   */
-  def instanceMetrics: Map[String, SQLMetric] = Map.empty
-
   /**
    * Resets all the metrics.
    */
   def resetMetrics(): Unit = {
     metrics.valuesIterator.foreach(_.reset())
-    instanceMetrics.valuesIterator.foreach(_.reset())
     children.foreach(_.resetMetrics())
   }
 
@@ -159,11 +153,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    */
   def longMetric(name: String): SQLMetric = metrics(name)
 
-  /**
-   * @return The instance-specific [[SQLMetric]] for the given `name`.
-   */
-  def longInstanceMetric(name: String): SQLMetric = instanceMetrics(name)
-
   // TODO: Move to `DistributedPlan`
   /**
    * Specifies how data is partitioned across different nodes in the cluster.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index e9c4757f55687..102de83cd37f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -204,7 +204,7 @@ trait StateStoreWriter
   def operatorStateMetadataVersion: Int = 1
 
   override lazy val metrics = {
-    // Lazy initialize instance metrics
+    // Lazy initialize instance metrics, but do not include these with regular metrics
     instanceMetrics
     statefulOperatorCustomMetrics ++ Map(
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -224,7 +224,12 @@ trait StateStoreWriter
     ) ++ stateStoreCustomMetrics ++ pythonMetrics
   }
 
-  override lazy val instanceMetrics = stateStoreInstanceMetrics
+  lazy val instanceMetrics = stateStoreInstanceMetrics
+
+  override def resetMetrics(): Unit = {
+    super.resetMetrics()
+    instanceMetrics.valuesIterator.foreach(_.reset())
+  }
 
   val stateStoreNames: Seq[String] = Seq(StateStoreId.DEFAULT_STORE_NAME)
 
@@ -340,7 +345,7 @@ trait StateStoreWriter
         case (name, metricConfig) =>
           // Keep instance metrics that are updated or aren't marked to be ignored,
           // as their initial value could still be important.
-          !metricConfig.ignoreIfUnchanged || !longInstanceMetric(name).isZero
+          !metricConfig.ignoreIfUnchanged || !instanceMetrics(name).isZero
       }
       .groupBy {
         // Group all instance metrics underneath their common metric prefix
@@ -355,8 +360,8 @@ trait StateStoreWriter
           metrics
            .map {
              case (_, metric) =>
-                metric.name -> (if (longInstanceMetric(metric.name).isZero) metricConf.initValue
-                  else longInstanceMetric(metric.name).value)
+                metric.name -> (if (instanceMetrics(metric.name).isZero) metricConf.initValue
+                  else instanceMetrics(metric.name).value)
            }
            .toSeq
            .sortBy(_._2)(metricConf.ordering)
@@ -442,13 +447,12 @@ trait StateStoreWriter
   }
 
   protected def setStoreInstanceMetrics(
-      instanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
-    instanceMetrics.foreach {
+      newInstanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
+    newInstanceMetrics.foreach {
       case (metric, value) =>
         val metricConfig = instanceMetricConfiguration(metric.name)
         // Update the metric's value based on the defined combine method
-        longInstanceMetric(metric.name)
-          .set(metricConfig.combine(longInstanceMetric(metric.name), value))
+        instanceMetrics(metric.name).set(metricConfig.combine(instanceMetrics(metric.name), value))
     }
   }
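
Note for reviewers: the sketch below condenses the pattern this series converges on, namely instance metrics kept in their own map instead of being folded into `metrics`, lazily materialized when `metrics` is first built, reset together with the regular metrics, and merged with a per-metric combine function. It is a minimal, self-contained Scala illustration only: SimpleMetric, Operator, StatefulOperator, the metric names, and the use of max as the combine function are assumptions made for the example, not Spark's actual SQLMetric, SparkPlan, or StateStoreWriter code.

// Illustrative stand-ins for SQLMetric / SparkPlan / StateStoreWriter, not the real Spark classes.
object InstanceMetricsSketch {

  final class SimpleMetric(initValue: Long = 0L) {
    private var current: Long = initValue
    def value: Long = current
    def isZero: Boolean = current == initValue
    def set(v: Long): Unit = current = v
    def reset(): Unit = current = initValue
  }

  trait Operator {
    // Regular metrics, analogous to SparkPlan.metrics.
    def metrics: Map[String, SimpleMetric] = Map.empty
    def resetMetrics(): Unit = metrics.valuesIterator.foreach(_.reset())
  }

  trait StatefulOperator extends Operator {
    // Instance metrics live in their own map and are not folded into `metrics` (patch 3).
    lazy val instanceMetrics: Map[String, SimpleMetric] =
      Map("snapshotLastUploaded.store-0" -> new SimpleMetric())

    override lazy val metrics: Map[String, SimpleMetric] = {
      // Touch the lazy val so instance metrics exist as soon as regular metrics do (patch 2).
      instanceMetrics
      Map("numOutputRows" -> new SimpleMetric())
    }

    // Instance metrics are reset together with the regular metrics.
    override def resetMetrics(): Unit = {
      super.resetMetrics()
      instanceMetrics.valuesIterator.foreach(_.reset())
    }

    // Fold newly reported per-store values into the existing instance metrics,
    // using max as the (assumed) combine function.
    def setStoreInstanceMetrics(newValues: Map[String, Long]): Unit = {
      newValues.foreach { case (name, v) =>
        val metric = instanceMetrics(name)
        metric.set(math.max(metric.value, v))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val op = new StatefulOperator {}
    op.setStoreInstanceMetrics(Map("snapshotLastUploaded.store-0" -> 42L))
    println(op.instanceMetrics("snapshotLastUploaded.store-0").value) // 42
    op.resetMetrics()
    println(op.instanceMetrics("snapshotLastUploaded.store-0").value) // 0
  }
}

Keeping the instance metrics out of `metrics`, as patch 3 does, avoids registering the potentially large set of per-store-instance entries alongside the regular SQL metrics, while the `resetMetrics()` override preserves the reset behavior that patch 1 originally added to SparkPlan.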