diff --git a/auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala b/auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala
index b0dd2fdda..ee3c9a532 100644
--- a/auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala
+++ b/auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala
@@ -23,3 +23,11 @@ import org.apache.spark.scheduler.SparkListenerEvent
 sealed trait AuronEvent extends SparkListenerEvent {}
 
 case class AuronBuildInfoEvent(info: mutable.LinkedHashMap[String, String]) extends AuronEvent {}
+
+case class AuronPlanFallbackEvent(
+    executionId: Long,
+    numAuronNodes: Int,
+    numFallbackNodes: Int,
+    physicalPlanDescription: String,
+    fallbackNodeToReason: Map[String, String])
+  extends AuronEvent {}
diff --git a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala
index c237557ff..41b26eab7 100644
--- a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala
+++ b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala
@@ -16,12 +16,18 @@
  */
 package org.apache.spark.sql.execution.ui
 
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets.UTF_8
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{Node, NodeSeq}
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.collection.mutable
+import scala.xml.{Node, NodeSeq, Unparsed}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
+import org.apache.spark.util.Utils
 
 private[ui] class AuronAllExecutionsPage(parent: AuronSQLTab) extends WebUIPage("") with Logging {
 
@@ -29,6 +35,38 @@ private[ui] class AuronAllExecutionsPage(
   override def render(request: HttpServletRequest): Seq[Node] = {
     val buildInfo = sqlStore.buildInfo()
+    val data = sqlStore.executionsList()
+
+    val content = {
+      val _content = mutable.ListBuffer[Node]()
+
+      val auronPageTable =
+        executionsTable(request, "auron", data)
+
+      _content ++=
+        <span>
+          <h4>
+            Queries:
+            {data.size}
+          </h4>
+        </span> ++
+        <div>
+          {auronPageTable}
+        </div>
+ + _content + } + content ++= + + val infos = UIUtils.listingTable(propertyHeader, propertyRow, buildInfo.info, fixedWidth = true) val summary: NodeSeq = @@ -48,7 +86,7 @@ private[ui] class AuronAllExecutionsPage(parent: AuronSQLTab) extends WebUIPage(
- UIUtils.headerSparkPage(request, "Auron", summary, parent) + UIUtils.headerSparkPage(request, "Auron", summary ++ content, parent) } private def propertyHeader = Seq("Name", "Value") @@ -61,4 +99,297 @@ private[ui] class AuronAllExecutionsPage(parent: AuronSQLTab) extends WebUIPage( + private def executionsTable( + request: HttpServletRequest, + executionTag: String, + executionData: Seq[AuronSQLExecutionUIData]): Seq[Node] = { + + val executionPage = + Option(request.getParameter(s"$executionTag.page")).map(_.toInt).getOrElse(1) + + val tableHeaderId = executionTag + + try { + new AuronExecutionPagedTable( + request, + parent, + executionData, + tableHeaderId, + executionTag, + UIUtils.prependBaseUri(request, parent.basePath), + "auron").table(executionPage) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => +
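+        // Render paging/sort-parameter errors inline instead of failing the whole page.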
+        <div class="alert alert-error">
+          <p>Error while rendering execution table:</p>
+          <pre>
+            {Utils.exceptionString(e)}
+          </pre>
+        </div>
+ } + } +} + +private[ui] class AuronExecutionPagedTable( + request: HttpServletRequest, + parent: AuronSQLTab, + data: Seq[AuronSQLExecutionUIData], + tableHeaderId: String, + executionTag: String, + basePath: String, + subPath: String) + extends PagedTable[AuronExecutionTableRowData] { + + private val (sortColumn, desc, pageSize) = getAuronTableParameters(request, executionTag, "ID") + + private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + + override val dataSource = new AuronExecutionDataSource(data, pageSize, sortColumn, desc) + + private val parameterPath = + s"$basePath/$subPath/?${getAuronParameterOtherTable(request, executionTag)}" + + override def tableId: String = s"$executionTag-table" + + override def tableCssClass: String = + "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" + + override def pageLink(page: Int): String = { + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$executionTag.sort=$encodedSortColumn" + + s"&$executionTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + + s"#$tableHeaderId" + } + + /** + * Returns parameters of other tables in the page. + */ + def getAuronParameterOtherTable(request: HttpServletRequest, tableTag: String): String = { + request.getParameterMap.asScala + .filterNot(_._1.startsWith(tableTag)) + .map(parameter => parameter._1 + "=" + parameter._2(0)) + .mkString("&") + } + + /** + * Returns parameter of this table. + */ + def getAuronTableParameters( + request: HttpServletRequest, + tableTag: String, + defaultSortColumn: String): (String, Boolean, Int) = { + val parameterSortColumn = request.getParameter(s"$tableTag.sort") + val parameterSortDesc = request.getParameter(s"$tableTag.desc") + val parameterPageSize = request.getParameter(s"$tableTag.pageSize") + val sortColumn = Option(parameterSortColumn) + .map { sortColumn => + UIUtils.decodeURLParameter(sortColumn) + } + .getOrElse(defaultSortColumn) + val desc = + Option(parameterSortDesc).map(_.toBoolean).getOrElse(sortColumn == defaultSortColumn) + val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100) + + (sortColumn, desc, pageSize) + } + + override def pageSizeFormField: String = s"$executionTag.pageSize" + + override def pageNumberFormField: String = s"$executionTag.page" + + override def goButtonFormPath: String = + s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId" + + // Information for each header: title, sortable, tooltip + private val headerInfo: Seq[(String, Boolean, Option[String])] = { + Seq( + ("ID", true, None), + ("Description", true, None), + ("Num Auron Nodes", true, None), + ("Num Fallback Nodes", true, None)) + } + + override def headers: Seq[Node] = { + isAuronSortColumnValid(headerInfo, sortColumn) + + headerAuronRow( + headerInfo, + desc, + pageSize, + sortColumn, + parameterPath, + executionTag, + tableHeaderId) + } + + def headerAuronRow( + headerInfo: Seq[(String, Boolean, Option[String])], + desc: Boolean, + pageSize: Int, + sortColumn: String, + parameterPath: String, + tableTag: String, + headerId: String): Seq[Node] = { + val row: Seq[Node] = { + headerInfo.map { case (header, sortable, tooltip) => + if (header == sortColumn) { + val headerLink = Unparsed( + parameterPath + + s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + + s"&$tableTag.desc=${!desc}" + + s"&$tableTag.pageSize=$pageSize" + + s"#$headerId") + val arrow = if (desc) "▾" else "▴" // UP or DOWN + + + + + {header} {Unparsed(arrow)} + + + + } else 
{ + if (sortable) { + val headerLink = Unparsed( + parameterPath + + s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + + s"&$tableTag.pageSize=$pageSize" + + s"#$headerId") + + + + + {header} + + + + } else { + + + {header} + + + } + } + } + } + + + {row} + + + } + + def isAuronSortColumnValid( + headerInfo: Seq[(String, Boolean, Option[String])], + sortColumn: String): Unit = { + if (!headerInfo.filter(_._2).map(_._1).contains(sortColumn)) { + throw new IllegalArgumentException(s"Unknown column: $sortColumn") + } + } + + override def row(executionTableRow: AuronExecutionTableRowData): Seq[Node] = { + val executionUIData = executionTableRow.executionUIData + + + + {executionUIData.executionId.toString} + + + {descriptionCell(executionUIData)} + + + {executionUIData.numAuronNodes.toString} + + + {executionUIData.numFallbackNodes.toString} + + + } + + private def descriptionCell(execution: AuronSQLExecutionUIData): Seq[Node] = { + val details = if (execution.description != null && execution.description.nonEmpty) { + val concat = new PlanStringConcat() + concat.append("== Fallback Summary ==\n") + val fallbackSummary = execution.fallbackNodeToReason + .map { case (name, reason) => + val id = name.substring(0, 3) + val nodeName = name.substring(4) + s"(${id.toInt}) $nodeName: $reason" + } + .mkString("\n") + concat.append(fallbackSummary) + if (execution.fallbackNodeToReason.isEmpty) { + concat.append("No fallback nodes") + } + concat.append("\n\n") + concat.append(execution.fallbackDescription) + + + +details + ++ + + } else { + Nil + } + + val desc = if (execution.description != null && execution.description.nonEmpty) { + + {execution.description} + } else { + {execution.executionId} + } + +
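+    // Combine the linked execution description with the expandable fallback-summary details.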
+    <div>{desc}{details}</div>
+ } + + private def executionURL(executionID: Long): String = + s"${UIUtils.prependBaseUri(request, parent.basePath)}/SQL/execution/?id=$executionID" +} + +private[ui] class AuronExecutionTableRowData(val executionUIData: AuronSQLExecutionUIData) + +private[ui] class AuronExecutionDataSource( + executionData: Seq[AuronSQLExecutionUIData], + pageSize: Int, + sortColumn: String, + desc: Boolean) + extends PagedDataSource[AuronExecutionTableRowData](pageSize) { + + // Convert ExecutionData to ExecutionTableRowData which contains the final contents to show + // in the table so that we can avoid creating duplicate contents during sorting the data + private val data = executionData.map(executionRow).sorted(ordering(sortColumn, desc)) + + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[AuronExecutionTableRowData] = + data.slice(from, to) + + private def executionRow( + executionUIData: AuronSQLExecutionUIData): AuronExecutionTableRowData = { + new AuronExecutionTableRowData(executionUIData) + } + + /** Return Ordering according to sortColumn and desc. */ + private def ordering( + sortColumn: String, + desc: Boolean): Ordering[AuronExecutionTableRowData] = { + val ordering: Ordering[AuronExecutionTableRowData] = sortColumn match { + case "ID" => Ordering.by(_.executionUIData.executionId) + case "Description" => Ordering.by(_.executionUIData.fallbackDescription) + case "Num Auron Nodes" => Ordering.by(_.executionUIData.numAuronNodes) + case "Num Fallback Nodes" => Ordering.by(_.executionUIData.numFallbackNodes) + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering + } + } } diff --git a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronEventUtils.scala b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronEventUtils.scala index e4a359ceb..5fdc669e1 100644 --- a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronEventUtils.scala +++ b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronEventUtils.scala @@ -21,6 +21,7 @@ import org.apache.spark.SparkContext import org.apache.auron.spark.ui.AuronEvent object AuronEventUtils { + def post(sc: SparkContext, event: AuronEvent): Unit = { sc.listenerBus.post(event) } diff --git a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala index 0da16d4fd..b817ad903 100644 --- a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala +++ b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala @@ -16,17 +16,27 @@ */ package org.apache.spark.sql.execution.ui +import scala.collection.mutable + import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} -import org.apache.spark.status.ElementTrackingStore +import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS +import org.apache.spark.status.{ElementTrackingStore, KVUtils} -import org.apache.auron.spark.ui.AuronBuildInfoEvent +import org.apache.auron.spark.ui.{AuronBuildInfoEvent, AuronPlanFallbackEvent} class AuronSQLAppStatusListener(conf: SparkConf, kvstore: ElementTrackingStore) extends SparkListener with Logging { + private val executionIdToDescription = new 
mutable.HashMap[Long, String] + private val executionIdToFallbackEvent = new mutable.HashMap[Long, AuronPlanFallbackEvent] + + kvstore.addTrigger(classOf[SQLExecutionUIData], conf.get[Int](UI_RETAINED_EXECUTIONS)) { + count => cleanupExecutions(count) + } + def getAuronBuildInfo(): Long = { kvstore.count(classOf[AuronBuildInfoUIData]) } @@ -36,11 +46,61 @@ class AuronSQLAppStatusListener(conf: SparkConf, kvstore: ElementTrackingStore) kvstore.write(uiData) } + private def onAuronPlanFallback(event: AuronPlanFallbackEvent): Unit = { + val description = executionIdToDescription.get(event.executionId) + if (description.isDefined) { + val uiData = new AuronSQLExecutionUIData( + event.executionId, + description.get, + event.numAuronNodes, + event.numFallbackNodes, + event.physicalPlanDescription, + event.fallbackNodeToReason.toSeq.sortBy(_._1)) + kvstore.write(uiData) + } else { + executionIdToFallbackEvent.put(event.executionId, event.copy()) + } + } + + private def onSQLExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { + val fallbackEvent = executionIdToFallbackEvent.get(event.executionId) + if (fallbackEvent.isDefined) { + val uiData = new AuronSQLExecutionUIData( + fallbackEvent.get.executionId, + event.description, + fallbackEvent.get.numAuronNodes, + fallbackEvent.get.numFallbackNodes, + fallbackEvent.get.physicalPlanDescription, + fallbackEvent.get.fallbackNodeToReason.toSeq.sortBy(_._1)) + kvstore.write(uiData) + executionIdToFallbackEvent.remove(event.executionId) + } + executionIdToDescription.put(event.executionId, event.description) + } + + private def onSQLExtensionEnd(event: SparkListenerSQLExecutionEnd): Unit = { + executionIdToDescription.remove(event.executionId) + executionIdToFallbackEvent.remove(event.executionId) + } + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case e: SparkListenerSQLExecutionStart => onSQLExecutionStart(e) + case e: SparkListenerSQLExecutionEnd => onSQLExtensionEnd(e) case e: AuronBuildInfoEvent => onAuronBuildInfo(e) + case e: AuronPlanFallbackEvent => onAuronPlanFallback(e) case _ => // Ignore } + private def cleanupExecutions(count: Long): Unit = { + val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS) + if (countToDelete <= 0) { + return + } + + val view = kvstore.view(classOf[AuronSQLExecutionUIData]).first(0L) + val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true) + toDelete.foreach(e => kvstore.delete(e.getClass(), e.executionId)) + } } object AuronSQLAppStatusListener { def register(sc: SparkContext): Unit = { diff --git a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala index 3fc4beb69..fd4721505 100644 --- a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala +++ b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala @@ -16,17 +16,54 @@ */ package org.apache.spark.sql.execution.ui +import scala.collection.JavaConverters.asScalaIteratorConverter + import com.fasterxml.jackson.annotation.JsonIgnore -import org.apache.spark.util.kvstore.{KVIndex, KVStore} +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.{KVIndex, KVStore, KVStoreView} class AuronSQLAppStatusStore(store: KVStore) { + private def viewToSeq[T](view: KVStoreView[T]): Seq[T] = { + 
Utils.tryWithResource(view.closeableIterator())(iter => iter.asScala.toList) + } + + def executionsList(): Seq[AuronSQLExecutionUIData] = { + viewToSeq(store.view(classOf[AuronSQLExecutionUIData])) + } + def buildInfo(): AuronBuildInfoUIData = { val kClass = classOf[AuronBuildInfoUIData] store.read(kClass, kClass.getName) } + + def executionsList(offset: Int, length: Int): Seq[AuronSQLExecutionUIData] = { + viewToSeq(store.view(classOf[AuronSQLExecutionUIData]).skip(offset).max(length)) + } + + def execution(executionId: Long): Option[AuronSQLExecutionUIData] = { + try { + Some(store.read(classOf[AuronSQLExecutionUIData], executionId)) + } catch { + case _: NoSuchElementException => None + } + } + + def executionsCount(): Long = { + store.count(classOf[AuronSQLExecutionUIData]) + } } +@KVIndex("executionId") +class AuronSQLExecutionUIData( + @KVIndexParam val executionId: Long, + val description: String, + val numAuronNodes: Int, + val numFallbackNodes: Int, + val fallbackDescription: String, + val fallbackNodeToReason: Seq[(String, String)]) {} + class AuronBuildInfoUIData(val info: Seq[(String, String)]) { @JsonIgnore @KVIndex diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala index 833ea7193..95d68158a 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala @@ -16,8 +16,16 @@ */ package org.apache.spark.sql.auron +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.ExplainUtils +import org.apache.spark.sql.execution.auron.plan.NativeParquetScanExec import org.apache.spark.sql.execution.ui.AuronSQLAppStatusListener +import org.apache.auron.spark.ui.AuronPlanFallbackEvent + class BuildinfoInSparkUISuite extends org.apache.spark.sql.QueryTest with BuildInfoAuronSQLSuite diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronExplainUtils.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronExplainUtils.scala new file mode 100644 index 000000000..c01222c6b --- /dev/null +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronExplainUtils.scala @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.auron + +import java.util.Collections.newSetFromMap + +import scala.collection.mutable +import scala.collection.mutable.{ArrayBuffer, BitSet} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.auron.AuronConvertStrategy.neverConvertReasonTag +import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.trees.TreeNodeTag +import org.apache.spark.sql.execution.{BaseSubqueryExec, InputAdapter, ReusedSubqueryExec, SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.execution.ExplainUtils.getOpId +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} +import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec} + +import org.apache.auron.sparkver + +object AuronExplainUtils { + private def generateOperatorIDs( + plan: QueryPlan[_], + startOperatorID: Int, + visited: java.util.Set[QueryPlan[_]], + reusedExchanges: ArrayBuffer[ReusedExchangeExec], + addReusedExchanges: Boolean): Int = { + var currentOperationID = startOperatorID + if (plan.isInstanceOf[BaseSubqueryExec]) { + return currentOperationID + } + + def setOpId(plan: QueryPlan[_]): Unit = if (!visited.contains(plan)) { + plan match { + case r: ReusedExchangeExec if addReusedExchanges => + reusedExchanges.append(r) + case _ => + } + visited.add(plan) + currentOperationID += 1 + plan.setTagValue(TreeNodeTag[Int]("operatorId"), currentOperationID) + } + + plan.foreachUp { + case _: WholeStageCodegenExec => + case _: InputAdapter => + case p: AdaptiveSparkPlanExec => + currentOperationID = generateOperatorIDs( + p.executedPlan, + currentOperationID, + visited, + reusedExchanges, + addReusedExchanges) + setOpId(p) + case p: QueryStageExec => + currentOperationID = generateOperatorIDs( + p.plan, + currentOperationID, + visited, + reusedExchanges, + addReusedExchanges) + setOpId(p) + case other: QueryPlan[_] => + setOpId(other) + currentOperationID = other.innerChildren.foldLeft(currentOperationID) { (curId, plan) => + generateOperatorIDs(plan, curId, visited, reusedExchanges, addReusedExchanges) + } + } + currentOperationID + } + + private def getSubqueries( + plan: => QueryPlan[_], + subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = { + plan.foreach { + case a: AdaptiveSparkPlanExec => + getSubqueries(a.executedPlan, subqueries) + case q: QueryStageExec => + getSubqueries(q.plan, subqueries) + case p: SparkPlan => + p.expressions.foreach(_.collect { case e: PlanExpression[_] => + e.plan match { + case s: BaseSubqueryExec => + subqueries += ((p, e, s)) + getSubqueries(s, subqueries) + case _ => + } + }) + } + } + + private def processPlanSkippingSubqueries[T <: QueryPlan[T]]( + plan: T, + append: String => Unit, + collectedOperators: BitSet): Unit = { + try { + + QueryPlan.append(plan, append, verbose = false, addSuffix = false, printOperatorId = true) + + append("\n") + } catch { + case e: AnalysisException => append(e.toString) + } + } + + private def collectFallbackNodes(plan: QueryPlan[_]): (Int, Map[String, String]) = { + var numAuronNodes = 0 + val fallbackNodeToReason = new mutable.HashMap[String, String] + + def collect(tmp: QueryPlan[_]): Unit = { + tmp.foreachUp { + case p: ExecutedCommandExec => + handleVanillaSparkPlan(p, fallbackNodeToReason) + case p: AdaptiveSparkPlanExec => + handleVanillaSparkPlan(p, 
fallbackNodeToReason) + collect(p.executedPlan) + case p: QueryStageExec => + handleVanillaSparkPlan(p, fallbackNodeToReason) + collect(p.plan) + case p: NativeSupports => + numAuronNodes += 1 + p.innerChildren.foreach(collect) + case p: SparkPlan => + handleVanillaSparkPlan(p, fallbackNodeToReason) + p.innerChildren.foreach(collect) + case _ => + } + } + + collect(plan) + (numAuronNodes, fallbackNodeToReason.toMap) + } + + def handleVanillaSparkPlan( + p: SparkPlan, + fallbackNodeToReason: mutable.HashMap[String, String]): Unit = { + if (p.getTagValue(neverConvertReasonTag).isDefined) { + addFallbackNodeWithReason(p, p.getTagValue(neverConvertReasonTag).get, fallbackNodeToReason) + } + } + + def addFallbackNodeWithReason( + p: SparkPlan, + reason: String, + fallbackNodeToReason: mutable.HashMap[String, String]): Unit = { + p.getTagValue(TreeNodeTag[Int]("operatorId")).foreach { opId => + // e.g., 002 project, it is used to help analysis by `substring(4)` + val formattedNodeName = f"$opId%03d ${p.nodeName}" + fallbackNodeToReason.put(formattedNodeName, reason) + } + } + + def processPlan[T <: QueryPlan[T]]( + plan: T, + append: String => Unit, + collectFallbackFunc: Option[QueryPlan[_] => (Int, Map[String, String])] = None) + : (Int, Map[String, String]) = synchronized { + try { + val operators = newSetFromMap[QueryPlan[_]](new java.util.IdentityHashMap()) + val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec] + + var currentOperatorID = 0 + currentOperatorID = + generateOperatorIDs(plan, currentOperatorID, operators, reusedExchanges, true) + + val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)] + getSubqueries(plan, subqueries) + + currentOperatorID = subqueries.foldLeft(currentOperatorID) { (curId, plan) => + generateOperatorIDs(plan._3.child, curId, operators, reusedExchanges, true) + } + + val optimizedOutExchanges = ArrayBuffer.empty[Exchange] + reusedExchanges.foreach { reused => + val child = reused.child + if (!operators.contains(child)) { + optimizedOutExchanges.append(child) + currentOperatorID = + generateOperatorIDs(child, currentOperatorID, operators, reusedExchanges, false) + } + } + + val collectedOperators = mutable.BitSet.empty + processPlanSkippingSubqueries(plan, append, collectedOperators) + + var i = 0 + for (sub <- subqueries) { + if (i == 0) { + append("\n===== Subqueries =====\n\n") + } + i = i + 1 + append( + s"Subquery:$i Hosting operator id = " + + s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n") + + if (!sub._3.isInstanceOf[ReusedSubqueryExec]) { + processPlanSkippingSubqueries(sub._3.child, append, collectedOperators) + } + append("\n") + } + + i = 0 + optimizedOutExchanges.foreach { exchange => + if (i == 0) { + append("\n===== Adaptively Optimized Out Exchanges =====\n\n") + } + i = i + 1 + append(s"Subplan:$i\n") + processPlanSkippingSubqueries[SparkPlan](exchange, append, collectedOperators) + append("\n") + } + + (subqueries.filter(!_._3.isInstanceOf[ReusedSubqueryExec]).map(_._3.child) :+ plan) + .map { plan => + if (collectFallbackFunc.isEmpty) { + collectFallbackNodes(plan) + } else { + collectFallbackFunc.get.apply(plan) + } + } + .reduce((a, b) => (a._1 + b._1, a._2 ++ b._2)) + } finally { + removeTags(plan) + } + } + + @sparkver("3.1/ 3.2 / 3.3/ 3.4/ 3.5") + private def removeTags(plan: QueryPlan[_]): Unit = { + def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = { + p.unsetTagValue(TreeNodeTag[Int]("operatorId")) + children.foreach(removeTags) + } + + plan.foreach { + case p: AdaptiveSparkPlanExec 
=> remove(p, Seq(p.executedPlan)) + case p: QueryStageExec => remove(p, Seq(p.plan)) + case plan: QueryPlan[_] => remove(plan, plan.innerChildren) + } + } + + @sparkver("3.0") + private def removeTags(plan: QueryPlan[_]): Unit = { + def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = { + p.unsetTagValue(TreeNodeTag[Int]("operatorId")) + children.foreach(removeTags) + } + + plan.foreach { + case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan, p.initialPlan)) + case p: QueryStageExec => remove(p, Seq(p.plan)) + case plan: QueryPlan[_] => remove(plan, plan.innerChildren) + } + } +} diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala index 2e46a88f0..5253f8308 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala @@ -22,11 +22,13 @@ import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSessionExtensions import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.ColumnarRule -import org.apache.spark.sql.execution.LocalTableScanExec -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat +import org.apache.spark.sql.execution.{ColumnarRule, LocalTableScanExec, SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.ui.AuronEventUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.auron.spark.ui.AuronPlanFallbackEvent + class AuronSparkSessionExtension extends (SparkSessionExtensions => Unit) with Logging { Shims.get.initExtension() @@ -94,4 +96,33 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu } } } + + override def postColumnarTransitions: Rule[SparkPlan] = { + new Rule[SparkPlan] { + override def apply(sparkPlan: SparkPlan): SparkPlan = { + if (SparkEnv.get.conf.get(AuronConf.UI_ENABLED.key, "true").equals("true")) { + val sc = sparkSession.sparkContext + val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + if (executionId == null) { + logDebug(s"Unknown execution id for plan: $sparkPlan") + return sparkPlan + } + val concat = new PlanStringConcat() + concat.append("== Physical Plan ==\n") + + val (numAuronNodes, fallbackNodeToReason) = + AuronExplainUtils.processPlan(sparkPlan, concat.append) + + val event = AuronPlanFallbackEvent( + executionId.toLong, + numAuronNodes, + fallbackNodeToReason.size, + concat.toString(), + fallbackNodeToReason) + AuronEventUtils.post(sc, event) + } + sparkPlan + } + } + } } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala index b2ffa53a0..d43f7d17d 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala @@ -69,7 +69,8 @@ abstract class NativeParquetInsertIntoHiveTableBase( .filterKeys(Set("stage_id", "output_rows", "elapsed_compute")) .toSeq :+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time")) - :+ 
("bytes_written", SQLMetrics + :+ ("bytes_written", + SQLMetrics .createSizeMetric(sparkContext, "Native.bytes_written")): _*) def check(): Unit = {