diff --git a/cluster/src/main/scala/org/apache/spark/status/api/v1/AllGlobalTemporaryViewsResource.scala b/cluster/src/main/scala/org/apache/spark/status/api/v1/AllGlobalTemporaryViewsResource.scala new file mode 100644 index 0000000000..1313bba011 --- /dev/null +++ b/cluster/src/main/scala/org/apache/spark/status/api/v1/AllGlobalTemporaryViewsResource.scala @@ -0,0 +1,31 @@ +/* + * Changes for SnappyData data platform. + * + * Portions Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package org.apache.spark.status.api.v1 + +import javax.ws.rs.{GET, Produces} +import javax.ws.rs.core.MediaType + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class AllGlobalTemporaryViewsResource { + @GET + def viewsList(): Seq[GlobalTemporaryViewSummary] = { + // get all view stats details + TableDetails.getAllGlobalTempViewsInfo + } +} diff --git a/cluster/src/main/scala/org/apache/spark/status/api/v1/ClusterDetails.scala b/cluster/src/main/scala/org/apache/spark/status/api/v1/ClusterDetails.scala index 07183236f6..5fda2eaf3d 100644 --- a/cluster/src/main/scala/org/apache/spark/status/api/v1/ClusterDetails.scala +++ b/cluster/src/main/scala/org/apache/spark/status/api/v1/ClusterDetails.scala @@ -61,8 +61,10 @@ object ClusterDetails { val membersInfo = MemberDetails.getAllMembersInfo val tablesInfo = TableDetails.getAllTablesInfo val extTablesInfo = TableDetails.getAllExternalTablesInfo + val gblTempViewsInfo = TableDetails.getAllGlobalTempViewsInfo - clusterBuff += new ClusterSummary(clusterInfo, membersInfo, tablesInfo, extTablesInfo) + clusterBuff += new ClusterSummary(clusterInfo, membersInfo, tablesInfo, + extTablesInfo, gblTempViewsInfo) clusterBuff.toList } } diff --git a/cluster/src/main/scala/org/apache/spark/status/api/v1/SnappyApiRootResource.scala b/cluster/src/main/scala/org/apache/spark/status/api/v1/SnappyApiRootResource.scala index e7502c41b5..0514592d31 100644 --- a/cluster/src/main/scala/org/apache/spark/status/api/v1/SnappyApiRootResource.scala +++ b/cluster/src/main/scala/org/apache/spark/status/api/v1/SnappyApiRootResource.scala @@ -67,6 +67,11 @@ private[v1] class SnappyApiRootResource extends ApiRequestContext { new AllExternalTablesResource } + @Path("allglobaltempviews") + def getAllGlobalTempViews(): AllGlobalTemporaryViewsResource = { + new AllGlobalTemporaryViewsResource + } + } private[spark] object SnappyApiRootResource { diff --git a/cluster/src/main/scala/org/apache/spark/status/api/v1/TableDetails.scala b/cluster/src/main/scala/org/apache/spark/status/api/v1/TableDetails.scala index 272d26e42d..810c20ef74 100644 --- a/cluster/src/main/scala/org/apache/spark/status/api/v1/TableDetails.scala +++ b/cluster/src/main/scala/org/apache/spark/status/api/v1/TableDetails.scala @@ -18,8 +18,12 @@ */ package org.apache.spark.status.api.v1 +import scala.collection.mutable + import io.snappydata.SnappyTableStatsProviderService +import org.apache.spark.sql.types.StructType + object TableDetails { def getAllTablesInfo: Seq[TableSummary] = { @@ -60,4 +64,27 @@ object TableDetails { table.getDataSourcePath) }).values.toList } + + def getAllGlobalTempViewsInfo: Seq[GlobalTemporaryViewSummary] = { + + val gblTempViewBuff = + SnappyTableStatsProviderService.getService.getAllGlobalTempViewStatsFromService + + gblTempViewBuff.mapValues(view => { + val colCount = view.getSchema.asInstanceOf[StructType].size + val schemaFields = view.getSchema.asInstanceOf[StructType].fields + val schemaStringBuilder = new StringBuilder + schemaFields.foreach(field => { + schemaStringBuilder.append("(" + field.name + ":" + field.dataType + ", " + + "nullable=" + { if (field.nullable) "Yes" else "No" } + ")\n") + }) + + val columnsInfo = mutable.HashMap.empty[String, Any] + columnsInfo += ("numColumns" -> colCount); + columnsInfo += ("fieldsString" -> schemaStringBuilder.toString()); + + new GlobalTemporaryViewSummary(view.getFullyQualifiedName, view.getTableName, + view.getTableType, columnsInfo) + }).values.toList + } } diff --git a/cluster/src/main/scala/org/apache/spark/status/api/v1/snappyapi.scala b/cluster/src/main/scala/org/apache/spark/status/api/v1/snappyapi.scala index d78220ad72..3f6343e780 100644 --- a/cluster/src/main/scala/org/apache/spark/status/api/v1/snappyapi.scala +++ b/cluster/src/main/scala/org/apache/spark/status/api/v1/snappyapi.scala @@ -28,7 +28,8 @@ class ClusterSummary private[spark]( val clusterInfo: mutable.HashMap[String, Any], val membersInfo: Seq[MemberSummary], val tablesInfo: Seq[TableSummary], - val externalTablesInfo: Seq[ExternalTableSummary] + val externalTablesInfo: Seq[ExternalTableSummary], + val globalTempViewsInfo: Seq[GlobalTemporaryViewSummary] ) class MemberSummary private[spark]( @@ -98,3 +99,10 @@ class ExternalTableSummary private[spark]( val provider: String, val source: String ) + +class GlobalTemporaryViewSummary private[spark]( + val tableFQName: String, + val tableName: String, + val tableType: String, + val columnsInfo: mutable.HashMap[String, Any] +) diff --git a/cluster/src/main/scala/org/apache/spark/ui/SnappyDashboardPage.scala b/cluster/src/main/scala/org/apache/spark/ui/SnappyDashboardPage.scala index e7027c2fc5..568fb80ea3 100644 --- a/cluster/src/main/scala/org/apache/spark/ui/SnappyDashboardPage.scala +++ b/cluster/src/main/scala/org/apache/spark/ui/SnappyDashboardPage.scala @@ -81,12 +81,22 @@ private[ui] class SnappyDashboardPage (parent: SnappyDashboardTab) extTablesStatsTitle ++ extTablesStatsTable } + val gblTempViewsStatsDetails = { + val gblTempViewsStatsTitle = createTitleNode(SnappyDashboardPage.gblTempViewsStatsTitle, + SnappyDashboardPage.gblTempViewsStatsTitleTooltip, + "gblViewsStatsTitle", + false) + val gblTempViewsStatsTable = gblTempViewStats + + gblTempViewsStatsTitle ++ gblTempViewsStatsTable + } + val jsScripts = val pageContent = jsScripts ++ pageTitleNode ++ clusterStatsDetails ++ membersStatsDetails ++ - tablesStatsDetails ++ extTablesStatsDetails + tablesStatsDetails ++ extTablesStatsDetails ++ gblTempViewsStatsDetails UIUtils.headerSparkPage(pageHeaderText, pageContent, parent, Some(500), useDataTables = true, isSnappyPage = true) @@ -339,6 +349,42 @@ private[ui] class SnappyDashboardPage (parent: SnappyDashboardTab) } + private def gblTempViewStats(): Seq[Node] = { + + + } + } object SnappyDashboardPage { @@ -460,4 +506,14 @@ object SnappyDashboardPage { extTableStatsColumn += ("externalSource" -> "Source") extTableStatsColumn += ("externalSourceTooltip" -> "External Source of Tables ") + val gblTempViewsStatsTitle = "Global Temporary Views" + val gblTempViewsStatsTitleTooltip = "SnappyData Global Temporary Views Summary" + val gblTempViewStatsColumn = scala.collection.mutable.HashMap.empty[String, String] + gblTempViewStatsColumn += ("name" -> "Name") + gblTempViewStatsColumn += ("nameTooltip" -> "Global Temporary View's Name") + gblTempViewStatsColumn += ("type" -> "Type") + gblTempViewStatsColumn += ("typeTooltip" -> "Type") + gblTempViewStatsColumn += ("columnsCount" -> "Fields") + gblTempViewStatsColumn += ("columnsCountTooltip" -> "Number of Fields/Columns in the View") + } diff --git a/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala b/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala index 1649477e0a..ec8b1d1c88 100644 --- a/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala +++ b/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala @@ -100,7 +100,7 @@ object SnappyEmbeddedTableStatsProviderService extends TableStatsProviderService override def run2(): Unit = { try { if (doRun) { - aggregateStats() + aggregateStats(Some(sc)) } } catch { case _: CancelException => // ignore @@ -182,7 +182,7 @@ object SnappyEmbeddedTableStatsProviderService extends TableStatsProviderService } override def getStatsFromAllServers(sc: Option[SparkContext] = None): (Seq[SnappyRegionStats], - Seq[SnappyIndexStats], Seq[SnappyExternalTableStats]) = { + Seq[SnappyIndexStats], Seq[SnappyExternalTableStats], Seq[SnappyGlobalTemporaryViewStats]) = { var result = new java.util.ArrayList[SnappyRegionStatsCollectorResult]().asScala val dataServers = GfxdMessage.getAllDataStores var resultObtained: Boolean = false @@ -221,6 +221,32 @@ object SnappyEmbeddedTableStatsProviderService extends TableStatsProviderService } } + val globalTempViews: mutable.Buffer[SnappyGlobalTemporaryViewStats] = { + + val snc = SnappyContext.apply(this.sparkContext) + val snappySessionCatalog = snc.snappySession.sessionCatalog + val globalTempSchema: String = "global_temp" + val gblTempViewList = snappySessionCatalog.listTables(globalTempSchema) + + if (gblTempViewList.isEmpty) { + mutable.Buffer.empty[SnappyGlobalTemporaryViewStats] + } else { + val gtv_buf = mutable.Buffer.empty[SnappyGlobalTemporaryViewStats] + + gblTempViewList.foreach(tmpView => { + try { + val gt = snappySessionCatalog.getTempViewOrPermanentTableMetadata(tmpView) + gtv_buf += new SnappyGlobalTemporaryViewStats(tmpView.table, gt.qualifiedName, + gt.tableType.name, gt.comment.getOrElse(""), gt.schema, gt.properties.asJava) + } catch { + case e: Exception => log.debug("Exception while getting details of global " + + "temporary view [" + tmpView + "] : " + e.getMessage, e) + } + }) + gtv_buf + } + } + if (resultObtained) { // Return updated tableSizeInfo // Map to hold hive table type against table names as keys @@ -245,12 +271,14 @@ object SnappyEmbeddedTableStatsProviderService extends TableStatsProviderService // Return updated details (regionStats, result.flatMap(_.getIndexStats.asScala), - externalTables) + externalTables, + globalTempViews) } else { // Return last successfully updated tableSizeInfo (tableSizeInfo.values.toSeq, result.flatMap(_.getIndexStats.asScala), - externalTables) + externalTables, + globalTempViews) } } diff --git a/core/src/main/scala/io/snappydata/SnappyThinConnectorTableStatsProvider.scala b/core/src/main/scala/io/snappydata/SnappyThinConnectorTableStatsProvider.scala index f38a6b72c7..29b030ef1a 100644 --- a/core/src/main/scala/io/snappydata/SnappyThinConnectorTableStatsProvider.scala +++ b/core/src/main/scala/io/snappydata/SnappyThinConnectorTableStatsProvider.scala @@ -27,7 +27,7 @@ import scala.util.control.NonFatal import com.gemstone.gemfire.CancelException import com.pivotal.gemfirexd.Attribute -import com.pivotal.gemfirexd.internal.engine.ui.{SnappyExternalTableStats, SnappyIndexStats, SnappyRegionStats} +import com.pivotal.gemfirexd.internal.engine.ui.{SnappyExternalTableStats, SnappyGlobalTemporaryViewStats, SnappyIndexStats, SnappyRegionStats} import io.snappydata.Constant._ import org.apache.spark.SparkContext @@ -71,7 +71,7 @@ object SnappyThinConnectorTableStatsProvider extends TableStatsProviderService { override def run(): Unit = { try { if (doRun) { - aggregateStats() + aggregateStats(Some(sc)) } } catch { case _: CancelException => // ignore @@ -111,7 +111,8 @@ object SnappyThinConnectorTableStatsProvider extends TableStatsProviderService { } override def getStatsFromAllServers(sc: Option[SparkContext] = None): (Seq[SnappyRegionStats], - Seq[SnappyIndexStats], Seq[SnappyExternalTableStats]) = synchronized { + Seq[SnappyIndexStats], Seq[SnappyExternalTableStats], + Seq[SnappyGlobalTemporaryViewStats]) = synchronized { try { val resultSet = executeStatsStmt(sc) val regionStats = new ArrayBuffer[SnappyRegionStats] @@ -126,14 +127,14 @@ object SnappyThinConnectorTableStatsProvider extends TableStatsProviderService { regionStats += new SnappyRegionStats(tableName, totalSize, sizeInMemory, rowCount, isColumnTable, isReplicatedTable, bucketCount) } - (regionStats, Nil, Nil) + (regionStats, Nil, Nil, Nil) } catch { case NonFatal(e) => logWarning("Warning: unable to retrieve table stats " + "from SnappyData cluster due to " + e.toString) logDebug("Exception stack trace: ", e) closeConnection() - (Nil, Nil, Nil) + (Nil, Nil, Nil, Nil) } } diff --git a/core/src/main/scala/io/snappydata/TableStatsProviderService.scala b/core/src/main/scala/io/snappydata/TableStatsProviderService.scala index c4db2171fb..75253f36cf 100644 --- a/core/src/main/scala/io/snappydata/TableStatsProviderService.scala +++ b/core/src/main/scala/io/snappydata/TableStatsProviderService.scala @@ -29,7 +29,7 @@ import scala.language.implicitConversions import scala.util.control.NonFatal import com.gemstone.gemfire.CancelException -import com.pivotal.gemfirexd.internal.engine.ui.{MemberStatistics, SnappyExternalTableStats, SnappyIndexStats, SnappyRegionStats} +import com.pivotal.gemfirexd.internal.engine.ui.{MemberStatistics, SnappyExternalTableStats, SnappyGlobalTemporaryViewStats, SnappyIndexStats, SnappyRegionStats} import org.apache.spark.sql.SnappySession import org.apache.spark.sql.collection.Utils @@ -40,6 +40,7 @@ trait TableStatsProviderService extends Logging { @volatile protected var tableSizeInfo = Map.empty[String, SnappyRegionStats] private var externalTableSizeInfo = Map.empty[String, SnappyExternalTableStats] + private var globalTemporaryViewSizeInfo = Map.empty[String, SnappyGlobalTemporaryViewStats] @volatile private var indexesInfo = Map.empty[String, SnappyIndexStats] protected val membersInfo: mutable.Map[String, MemberStatistics] = @@ -52,18 +53,22 @@ trait TableStatsProviderService extends Logging { @volatile protected var doRun: Boolean = false @volatile private var running: Boolean = false + protected var sparkContext: SparkContext = null + def start(sc: SparkContext, url: String): Unit - protected def aggregateStats(): Unit = synchronized { + protected def aggregateStats(sc: Option[SparkContext] = None): Unit = synchronized { try { if (doRun) { val prevTableSizeInfo = tableSizeInfo running = true try { - val (tableStats, indexStats, extTableStats) = getAggregatedStatsOnDemand + this.sparkContext = sc.get + val (tableStats, indexStats, extTableStats, gblTempViewStats) = getAggregatedStatsOnDemand tableSizeInfo = tableStats indexesInfo = indexStats // populating indexes stats externalTableSizeInfo = extTableStats + globalTemporaryViewSizeInfo = gblTempViewStats // Commenting this call to avoid periodic refresh of members stats // get members details @@ -182,15 +187,32 @@ trait TableStatsProviderService extends Logging { this.externalTableSizeInfo } + def getGlobalTempViewStatsFromService( + fullyQualifiedViewName: String): Option[SnappyGlobalTemporaryViewStats] = { + if (!this.globalTemporaryViewSizeInfo.contains(fullyQualifiedViewName)) { + // force run + aggregateStats() + } + this.globalTemporaryViewSizeInfo.get(fullyQualifiedViewName) + } + + def getAllGlobalTempViewStatsFromService: Map[String, SnappyGlobalTemporaryViewStats] = { + this.globalTemporaryViewSizeInfo + } + def getAggregatedStatsOnDemand: (Map[String, SnappyRegionStats], - Map[String, SnappyIndexStats], Map[String, SnappyExternalTableStats]) = { - if (!doRun) return (Map.empty, Map.empty, Map.empty) - val (tableStats, indexStats, externalTableStats) = getStatsFromAllServers() + Map[String, SnappyIndexStats], Map[String, SnappyExternalTableStats], + Map[String, SnappyGlobalTemporaryViewStats]) = { + if (!doRun) return (Map.empty, Map.empty, Map.empty, Map.empty) + val (tableStats, indexStats, externalTableStats, + globalTemporaryViewStats) = getStatsFromAllServers() val aggregatedStats = scala.collection.mutable.Map[String, SnappyRegionStats]() val aggregatedExtTableStats = scala.collection.mutable.Map[String, SnappyExternalTableStats]() + val aggregatedGlobalTempViewStats = + scala.collection.mutable.Map[String, SnappyGlobalTemporaryViewStats]() val aggregatedStatsIndex = scala.collection.mutable.Map[String, SnappyIndexStats]() - if (!doRun) return (Map.empty, Map.empty, Map.empty) + if (!doRun) return (Map.empty, Map.empty, Map.empty, Map.empty) // val samples = getSampleTableList(snc) tableStats.foreach { stat => aggregatedStats.get(stat.getTableName) match { @@ -204,12 +226,19 @@ trait TableStatsProviderService extends Logging { indexStats.foreach { stat => aggregatedStatsIndex.put(stat.getIndexName, stat) } + externalTableStats.foreach { stat => aggregatedExtTableStats.put(stat.getTableName, stat) } + + globalTemporaryViewStats.foreach { stat => + aggregatedGlobalTempViewStats.put(stat.getFullyQualifiedName, stat) + } + (Utils.immutableMap(aggregatedStats), Utils.immutableMap(aggregatedStatsIndex), - Utils.immutableMap(aggregatedExtTableStats)) + Utils.immutableMap(aggregatedExtTableStats), + Utils.immutableMap(aggregatedGlobalTempViewStats)) } /* @@ -225,5 +254,5 @@ trait TableStatsProviderService extends Logging { */ def getStatsFromAllServers(sc: Option[SparkContext] = None): (Seq[SnappyRegionStats], - Seq[SnappyIndexStats], Seq[SnappyExternalTableStats]) + Seq[SnappyIndexStats], Seq[SnappyExternalTableStats], Seq[SnappyGlobalTemporaryViewStats]) }