diff --git a/build.gradle b/build.gradle index 6d74360481..144a0709d1 100644 --- a/build.gradle +++ b/build.gradle @@ -818,9 +818,13 @@ task product(type: Zip) { dependsOn ":snappy-spark:snappy-spark-assembly_${scalaBinaryVersion}:sparkProduct" dependsOn ':snappy-launcher:jar' dependsOn ':jdbcJar' + // Commented "v2connector" in order to exclude from build and test process. + // uncomment if we decide to include it. +// dependsOn ":snappy-v2connector_${scalaBinaryVersion}:jar" def clusterProject = project(":snappy-cluster_${scalaBinaryVersion}") def launcherProject = project(':snappy-launcher') +// def v2connectorProject = project(":snappy-v2connector_${scalaBinaryVersion}") def targetProject = clusterProject if (isEnterpriseProduct) { @@ -885,6 +889,13 @@ task product(type: Zip) { include launcherProject.jar.archiveName } + //Copying the V2Connector jar +// copy { +// from v2connectorProject.jar.destinationDir +// into "${snappyProductDir}/jars" +// include v2connectorProject.jar.archiveName +// } + // create the RELEASE file def releaseFile = file("${snappyProductDir}/RELEASE") String buildFlags = '' @@ -1162,8 +1173,26 @@ task jdbcJar { } } +// Uncomment for v2Connector project build. 
+//task v2ConnectorJar { +// dependsOn ":snappy-v2connector_${scalaBinaryVersion}:shadowJar" +// +// doLast { +// def v2ConnectorProject = project(":snappy-v2connector_${scalaBinaryVersion}") +// String v2ConnectorName = "snappydata-v2connector_${scalaBinaryVersion}-${version}.jar" +// // copy the snappy-v2connector shadow jar into distributions +// copy { +// from v2ConnectorProject.shadowJar.destinationDir +// into "${rootProject.buildDir}/distributions" +// include v2ConnectorProject.shadowJar.archiveName +// rename { filename -> v2ConnectorName } +// } +// } +//} + task copyShadowJars { dependsOn jdbcJar +// dependsOn v2ConnectorJar dependsOn ":snappy-core_${scalaBinaryVersion}:shadowJar" doLast { diff --git a/cluster/src/dunit/scala/org/apache/spark/memory/SnappyUnifiedMemoryManagerDUnitTest.scala b/cluster/src/dunit/scala/org/apache/spark/memory/SnappyUnifiedMemoryManagerDUnitTest.scala index abf126018c..188ed44fee 100644 --- a/cluster/src/dunit/scala/org/apache/spark/memory/SnappyUnifiedMemoryManagerDUnitTest.scala +++ b/cluster/src/dunit/scala/org/apache/spark/memory/SnappyUnifiedMemoryManagerDUnitTest.scala @@ -310,7 +310,7 @@ class SnappyUnifiedMemoryManagerDUnitTest(s: String) extends ClusterManagerTestB stmt.execute(s"CALL SYS.SET_BUCKETS_FOR_LOCAL_EXECUTION('$columnTable', " + s"'${(0 until numBuckets).mkString(",")}', -1)") val rs = stmt.executeQuery(s"CALL SYS.COLUMN_TABLE_SCAN('$columnTable', " + - s"'${(1 to numColumns).mkString(",")}', null)") + s"'${(1 to numColumns).mkString(",")}', null, 1)") var n = 0 while (rs.next()) { n += 1 diff --git a/core/src/main/scala/io/snappydata/impl/SmartConnectorRDDHelper.scala b/core/src/main/scala/io/snappydata/impl/SmartConnectorRDDHelper.scala index 95045f7643..0b00d052ea 100644 --- a/core/src/main/scala/io/snappydata/impl/SmartConnectorRDDHelper.scala +++ b/core/src/main/scala/io/snappydata/impl/SmartConnectorRDDHelper.scala @@ -37,7 +37,8 @@ final class SmartConnectorRDDHelper { def prepareScan(conn: 
Connection, txId: String, columnTable: String, projection: Array[Int], serializedFilters: Array[Byte], partition: SmartExecutorBucketPartition, catalogVersion: Long): (PreparedStatement, ResultSet) = { - val pstmt = conn.prepareStatement("call sys.COLUMN_TABLE_SCAN(?, ?, ?)") + + val pstmt = conn.prepareStatement("call sys.COLUMN_TABLE_SCAN(?, ?, ?, 1)") pstmt.setString(1, columnTable) pstmt.setString(2, projection.mkString(",")) // serialize the filters diff --git a/core/src/main/scala/org/apache/spark/sql/SnappyContext.scala b/core/src/main/scala/org/apache/spark/sql/SnappyContext.scala index ddb3f4b94c..ddcf27e456 100644 --- a/core/src/main/scala/org/apache/spark/sql/SnappyContext.scala +++ b/core/src/main/scala/org/apache/spark/sql/SnappyContext.scala @@ -1325,10 +1325,3 @@ case class LocalMode(override val sc: SparkContext, override val url: String) extends ClusterMode { override val description: String = "Local mode" } - -class TableNotFoundException(schema: String, table: String, cause: Option[Throwable] = None) - extends AnalysisException(s"Table or view '$table' not found in schema '$schema'", - cause = cause) - -class PolicyNotFoundException(schema: String, name: String, cause: Option[Throwable] = None) - extends AnalysisException(s"Policy '$name' not found in schema '$schema'", cause = cause) diff --git a/core/src/main/scala/org/apache/spark/sql/collection/Utils.scala b/core/src/main/scala/org/apache/spark/sql/collection/Utils.scala index e0c4bd9c8d..27cd4894f3 100644 --- a/core/src/main/scala/org/apache/spark/sql/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/sql/collection/Utils.scala @@ -839,11 +839,6 @@ object Utils { TASKCONTEXT_FUNCTION } - def executorsListener(sc: SparkContext): Option[ExecutorsListener] = sc.ui match { - case Some(ui) => Some(ui.executorsListener) - case _ => None - } - def getActiveSession: Option[SparkSession] = SparkSession.getActiveSession def sqlInternal(snappy: SnappySession, sqlText: String): 
CachedDataFrame = diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBatchIterator.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBatchIterator.scala index 09b9f13fc8..3870ac5b58 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBatchIterator.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBatchIterator.scala @@ -16,30 +16,21 @@ */ package org.apache.spark.sql.execution.columnar -import java.nio.{ByteBuffer, ByteOrder} -import java.sql.{Connection, ResultSet, Statement} +import java.nio.ByteBuffer import java.util.function.BiFunction -import scala.collection.mutable.ArrayBuffer -import scala.language.implicitConversions -import scala.util.control.NonFatal - import com.gemstone.gemfire.cache.EntryDestroyedException -import com.gemstone.gemfire.internal.cache.{BucketRegion, GemFireCacheImpl, LocalRegion, NonLocalRegionEntry, PartitionedRegion, RegionEntry, TXStateInterface} -import com.gemstone.gemfire.internal.shared.{BufferAllocator, FetchRequest} +import com.gemstone.gemfire.internal.cache._ +import com.gemstone.gemfire.internal.shared.FetchRequest import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer -import com.pivotal.gemfirexd.internal.impl.jdbc.EmbedConnection -import io.snappydata.thrift.common.BufferedBlob -import org.eclipse.collections.api.block.procedure.Procedure -import org.eclipse.collections.impl.map.mutable.primitive.IntObjectHashMap - -import org.apache.spark.memory.MemoryManagerCallback.releaseExecutionMemory -import org.apache.spark.sql.execution.columnar.encoding.{ColumnDecoder, ColumnDeleteDecoder, ColumnEncoding, UpdatedColumnDecoder, UpdatedColumnDecoderBase} +import org.apache.spark.TaskContext +import org.apache.spark.sql.execution.columnar.encoding._ import org.apache.spark.sql.execution.columnar.impl._ import org.apache.spark.sql.execution.row.PRValuesIterator -import org.apache.spark.sql.store.CompressionUtils 
import org.apache.spark.sql.types.StructField -import org.apache.spark.{Logging, TaskContext, TaskContextImpl, TaskKilledException} + +import scala.collection.mutable.ArrayBuffer +import scala.language.implicitConversions case class ColumnBatch(numRows: Int, buffers: Array[ByteBuffer], statsData: Array[Byte], deltaIndexes: Array[Int]) diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/StoreCallbacksImpl.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/StoreCallbacksImpl.scala index d1ac23c1e1..8888c6cd9f 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/StoreCallbacksImpl.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/StoreCallbacksImpl.scala @@ -22,7 +22,6 @@ import java.util.Collections import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer - import com.gemstone.gemfire.cache.{EntryDestroyedException, RegionDestroyedException} import com.gemstone.gemfire.internal.cache.lru.LRUEntry import com.gemstone.gemfire.internal.cache.persistence.query.CloseableIterator @@ -43,8 +42,7 @@ import com.pivotal.gemfirexd.internal.impl.jdbc.{EmbedConnection, Util} import com.pivotal.gemfirexd.internal.impl.sql.execute.PrivilegeInfo import com.pivotal.gemfirexd.internal.shared.common.reference.SQLState import io.snappydata.SnappyTableStatsProviderService -import io.snappydata.sql.catalog.{CatalogObjectType, SnappyExternalCatalog} - +import io.snappydata.sql.catalog.{CatalogObjectType, SmartConnectorHelper, SnappyExternalCatalog} import org.apache.spark.Logging import org.apache.spark.memory.{MemoryManagerCallback, MemoryMode} import org.apache.spark.serializer.KryoSerializerPool @@ -52,7 +50,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodeGenerator, CodegenContext} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal, 
TokenLiteral, UnsafeRow} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, expressions} -import org.apache.spark.sql.collection.{ToolsCallbackInit, Utils} +import org.apache.spark.sql.collection.{SharedUtils, ToolsCallbackInit, Utils} import org.apache.spark.sql.execution.ConnectionPool import org.apache.spark.sql.execution.columnar.encoding.ColumnStatsSchema import org.apache.spark.sql.execution.columnar.{ColumnBatchCreator, ColumnBatchIterator, ColumnTableScan, ExternalStore, ExternalStoreUtils} @@ -129,7 +127,7 @@ object StoreCallbacksImpl extends StoreCallbacks with Logging with Serializable // add weightage column for sample tables if required var schema = catalogEntry.schema.asInstanceOf[StructType] if (catalogEntry.tableType == CatalogObjectType.Sample.toString && - schema(schema.length - 1).name != Utils.WEIGHTAGE_COLUMN_NAME) { + schema(schema.length - 1).name != Utils.WEIGHTAGE_COLUMN_NAME) { schema = schema.add(Utils.WEIGHTAGE_COLUMN_NAME, LongType, nullable = false) } @@ -197,11 +195,18 @@ object StoreCallbacksImpl extends StoreCallbacks with Logging with Serializable @throws(classOf[SQLException]) override def columnTableScan(columnTable: String, projection: Array[Int], serializedFilters: Array[Byte], - bucketIds: java.util.Set[Integer]): CloseableIterator[ColumnTableEntry] = { + bucketIds: java.util.Set[Integer], + useKryoSerializer: Boolean): CloseableIterator[ColumnTableEntry] = { // deserialize the filters val batchFilters = if ((serializedFilters ne null) && serializedFilters.length > 0) { - KryoSerializerPool.deserialize(serializedFilters, 0, serializedFilters.length, - (kryo, in) => kryo.readObject(in, classOf[Array[Filter]])).toSeq + if (useKryoSerializer) { + KryoSerializerPool.deserialize(serializedFilters, 0, serializedFilters.length, + (kryo, in) => kryo.readObject(in, classOf[Array[Filter]])).toSeq + } else { + // java serializer + val v = SharedUtils.deserialize(serializedFilters).asInstanceOf[Array[Filter]] + v.toSeq + } } 
else null val (region, schemaAttrs, batchFilterExprs) = try { val lr = Misc.getRegionForTable(columnTable, true).asInstanceOf[LocalRegion] @@ -372,7 +377,8 @@ object StoreCallbacksImpl extends StoreCallbacks with Logging with Serializable private def attr(a: String, schema: Seq[AttributeReference]): AttributeReference = { // filter passed should have same case as in schema and not be qualified which // should be true since these have been created from resolved Expression by sender - schema.find(_.name == a) match { + // TODO: [shirish] converted to uppercase to make v2 connector work + schema.find(x => x.name == a || x.name == a.toUpperCase) match { case Some(attr) => attr case _ => throw Utils.analysisException(s"Could not find $a in ${schema.mkString(", ")}") } diff --git a/core/src/main/scala/org/apache/spark/sql/internal/SnappySessionState.scala b/core/src/main/scala/org/apache/spark/sql/internal/SnappySessionState.scala index d3e31d21af..936a84cb80 100644 --- a/core/src/main/scala/org/apache/spark/sql/internal/SnappySessionState.scala +++ b/core/src/main/scala/org/apache/spark/sql/internal/SnappySessionState.scala @@ -925,206 +925,6 @@ class SnappyConf(@transient val session: SnappySession) } } -class SQLConfigEntry private(private[sql] val entry: ConfigEntry[_]) { - - def key: String = entry.key - - def doc: String = entry.doc - - def isPublic: Boolean = entry.isPublic - - def defaultValue[T]: Option[T] = entry.defaultValue.asInstanceOf[Option[T]] - - def defaultValueString: String = entry.defaultValueString - - def valueConverter[T]: String => T = - entry.asInstanceOf[ConfigEntry[T]].valueConverter - - def stringConverter[T]: T => String = - entry.asInstanceOf[ConfigEntry[T]].stringConverter - - override def toString: String = entry.toString -} - -object SQLConfigEntry { - - private def handleDefault[T](entry: TypedConfigBuilder[T], - defaultValue: Option[T]): SQLConfigEntry = defaultValue match { - case Some(v) => new 
SQLConfigEntry(entry.createWithDefault(v)) - case None => new SQLConfigEntry(entry.createOptional) - } - - def sparkConf[T: ClassTag](key: String, doc: String, defaultValue: Option[T], - isPublic: Boolean = true): SQLConfigEntry = { - classTag[T] match { - case ClassTag.Int => handleDefault[Int](ConfigBuilder(key) - .doc(doc).intConf, defaultValue.asInstanceOf[Option[Int]]) - case ClassTag.Long => handleDefault[Long](ConfigBuilder(key) - .doc(doc).longConf, defaultValue.asInstanceOf[Option[Long]]) - case ClassTag.Double => handleDefault[Double](ConfigBuilder(key) - .doc(doc).doubleConf, defaultValue.asInstanceOf[Option[Double]]) - case ClassTag.Boolean => handleDefault[Boolean](ConfigBuilder(key) - .doc(doc).booleanConf, defaultValue.asInstanceOf[Option[Boolean]]) - case c if c.runtimeClass == classOf[String] => - handleDefault[String](ConfigBuilder(key).doc(doc).stringConf, - defaultValue.asInstanceOf[Option[String]]) - case c => throw new IllegalArgumentException( - s"Unknown type of configuration key: $c") - } - } - - def apply[T: ClassTag](key: String, doc: String, defaultValue: Option[T], - isPublic: Boolean = true): SQLConfigEntry = { - classTag[T] match { - case ClassTag.Int => handleDefault[Int](SQLConfigBuilder(key) - .doc(doc).intConf, defaultValue.asInstanceOf[Option[Int]]) - case ClassTag.Long => handleDefault[Long](SQLConfigBuilder(key) - .doc(doc).longConf, defaultValue.asInstanceOf[Option[Long]]) - case ClassTag.Double => handleDefault[Double](SQLConfigBuilder(key) - .doc(doc).doubleConf, defaultValue.asInstanceOf[Option[Double]]) - case ClassTag.Boolean => handleDefault[Boolean](SQLConfigBuilder(key) - .doc(doc).booleanConf, defaultValue.asInstanceOf[Option[Boolean]]) - case c if c.runtimeClass == classOf[String] => - handleDefault[String](SQLConfigBuilder(key).doc(doc).stringConf, - defaultValue.asInstanceOf[Option[String]]) - case c => throw new IllegalArgumentException( - s"Unknown type of configuration key: $c") - } - } -} - -trait AltName[T] { 
- - def name: String - - def altName: String - - def configEntry: SQLConfigEntry - - def defaultValue: Option[T] = configEntry.defaultValue[T] - - def getOption(conf: SparkConf): Option[String] = if (altName == null) { - conf.getOption(name) - } else { - conf.getOption(name) match { - case s: Some[String] => // check if altName also present and fail if so - if (conf.contains(altName)) { - throw new IllegalArgumentException( - s"Both $name and $altName configured. Only one should be set.") - } else s - case None => conf.getOption(altName) - } - } - - private def get(conf: SparkConf, name: String, - defaultValue: String): T = { - configEntry.entry.defaultValue match { - case Some(_) => configEntry.valueConverter[T]( - conf.get(name, defaultValue)) - case None => configEntry.valueConverter[Option[T]]( - conf.get(name, defaultValue)).get - } - } - - def get(conf: SparkConf): T = if (altName == null) { - get(conf, name, configEntry.defaultValueString) - } else { - if (conf.contains(name)) { - if (!conf.contains(altName)) get(conf, name, configEntry.defaultValueString) - else { - throw new IllegalArgumentException( - s"Both $name and $altName configured. Only one should be set.") - } - } else { - get(conf, altName, configEntry.defaultValueString) - } - } - - def get(properties: Properties): T = { - val propertyValue = getProperty(properties) - if (propertyValue ne null) configEntry.valueConverter[T](propertyValue) - else defaultValue.get - } - - def getProperty(properties: Properties): String = if (altName == null) { - properties.getProperty(name) - } else { - val v = properties.getProperty(name) - if (v != null) { - // check if altName also present and fail if so - if (properties.getProperty(altName) != null) { - throw new IllegalArgumentException( - s"Both $name and $altName specified. 
Only one should be set.") - } - v - } else properties.getProperty(altName) - } - - def unapply(key: String): Boolean = name.equals(key) || - (altName != null && altName.equals(key)) -} - -trait SQLAltName[T] extends AltName[T] { - - private def get(conf: SQLConf, entry: SQLConfigEntry): T = { - entry.defaultValue match { - case Some(_) => conf.getConf(entry.entry.asInstanceOf[ConfigEntry[T]]) - case None => conf.getConf(entry.entry.asInstanceOf[ConfigEntry[Option[T]]]).get - } - } - - private def get(conf: SQLConf, name: String, - defaultValue: String): T = { - configEntry.entry.defaultValue match { - case Some(_) => configEntry.valueConverter[T]( - conf.getConfString(name, defaultValue)) - case None => configEntry.valueConverter[Option[T]]( - conf.getConfString(name, defaultValue)).get - } - } - - def get(conf: SQLConf): T = if (altName == null) { - get(conf, configEntry) - } else { - if (conf.contains(name)) { - if (!conf.contains(altName)) get(conf, configEntry) - else { - throw new IllegalArgumentException( - s"Both $name and $altName configured. Only one should be set.") - } - } else { - get(conf, altName, configEntry.defaultValueString) - } - } - - def getOption(conf: SQLConf): Option[T] = if (altName == null) { - if (conf.contains(name)) Some(get(conf, name, "")) - else defaultValue - } else { - if (conf.contains(name)) { - if (!conf.contains(altName)) Some(get(conf, name, "")) - else { - throw new IllegalArgumentException( - s"Both $name and $altName configured. 
Only one should be set.") - } - } else if (conf.contains(altName)) { - Some(get(conf, altName, "")) - } else defaultValue - } - - def set(conf: SQLConf, value: T, useAltName: Boolean = false): Unit = { - if (useAltName) { - conf.setConfString(altName, configEntry.stringConverter(value)) - } else { - conf.setConf[T](configEntry.entry.asInstanceOf[ConfigEntry[T]], value) - } - } - - def remove(conf: SQLConf, useAltName: Boolean = false): Unit = { - conf.unsetConf(if (useAltName) altName else name) - } -} - private[sql] final class PreprocessTable(state: SnappySessionState) extends Rule[LogicalPlan] { private def conf: SQLConf = state.conf diff --git a/encoders/build.gradle b/encoders/build.gradle index 232b1bd3d7..d3ede73b64 100644 --- a/encoders/build.gradle +++ b/encoders/build.gradle @@ -45,9 +45,10 @@ dependencies { } else { compile group: 'io.snappydata', name: 'snappydata-store-core', version: snappyStoreVersion } - + compile "org.eclipse.collections:eclipse-collections-api:${eclipseCollectionsVersion}" compile "org.eclipse.collections:eclipse-collections:${eclipseCollectionsVersion}" + compileOnly "org.eclipse.jetty:jetty-servlet:${jettyVersion}" compile "org.apache.tomcat:tomcat-jdbc:${tomcatJdbcVersion}" compile "com.zaxxer:HikariCP:${hikariCPVersion}" @@ -71,4 +72,3 @@ scalaTest { test.dependsOn ':cleanJUnit' -archivesBaseName = 'snappydata-encoders_' + scalaBinaryVersion diff --git a/core/src/main/scala/io/snappydata/Literals.scala b/encoders/src/main/scala/io/snappydata/Literals.scala similarity index 98% rename from core/src/main/scala/io/snappydata/Literals.scala rename to encoders/src/main/scala/io/snappydata/Literals.scala index 8625565b9f..f6a828137e 100644 --- a/core/src/main/scala/io/snappydata/Literals.scala +++ b/encoders/src/main/scala/io/snappydata/Literals.scala @@ -16,11 +16,11 @@ */ package io.snappydata -import scala.reflect.ClassTag - -import org.apache.spark.sql.execution.columnar.ExternalStoreUtils +import 
org.apache.spark.sql.execution.columnar.SharedExternalStoreUtils import org.apache.spark.sql.internal.{AltName, SQLAltName, SQLConfigEntry} +import scala.reflect.ClassTag + object StreamingConstants { val EVENT_TYPE_COLUMN = "_eventType" val SINK_STATE_TABLE = s"SNAPPYSYS_INTERNAL____SINK_STATE_TABLE" @@ -140,7 +140,7 @@ object Property extends Enumeration { "store. When inserting data into the column storage this is the unit " + "(in bytes or k/m/g suffixes for unit) that will be used to split the data " + "into chunks for efficient storage and retrieval. It can also be set for each " + - s"table using the ${ExternalStoreUtils.COLUMN_BATCH_SIZE} option in " + + s"table using the ${SharedExternalStoreUtils.COLUMN_BATCH_SIZE} option in " + "create table DDL. Maximum allowed size is 2GB.", Some("24m")) val ColumnMaxDeltaRows: SQLValue[Int] = SQLVal[Int]( @@ -150,7 +150,7 @@ object Property extends Enumeration { "this allows a lower limit on number of rows for better scan performance. " + "So the delta buffer will be rolled into the column store whichever of " + s"$ColumnBatchSize and this property is hit first. 
It can also be set for " + - s"each table using the ${ExternalStoreUtils.COLUMN_MAX_DELTA_ROWS} option in " + + s"each table using the ${SharedExternalStoreUtils.COLUMN_MAX_DELTA_ROWS} option in " + s"create table DDL else this setting is used for the create table.", Some(10000)) val DisableHashJoin: SQLValue[Boolean] = SQLVal[Boolean]( diff --git a/core/src/main/scala/io/snappydata/sql/catalog/ConnectorExternalCatalog.scala b/encoders/src/main/scala/io/snappydata/sql/catalog/ConnectorExternalCatalog.scala similarity index 93% rename from core/src/main/scala/io/snappydata/sql/catalog/ConnectorExternalCatalog.scala rename to encoders/src/main/scala/io/snappydata/sql/catalog/ConnectorExternalCatalog.scala index ce0705fb47..9265d9176e 100644 --- a/core/src/main/scala/io/snappydata/sql/catalog/ConnectorExternalCatalog.scala +++ b/encoders/src/main/scala/io/snappydata/sql/catalog/ConnectorExternalCatalog.scala @@ -18,24 +18,22 @@ package io.snappydata.sql.catalog import java.sql.SQLException import java.util.Collections -import javax.annotation.concurrent.GuardedBy - -import scala.collection.JavaConverters._ import com.google.common.cache.{Cache, CacheBuilder} import com.pivotal.gemfirexd.internal.shared.common.reference.SQLState import io.snappydata.Property import io.snappydata.thrift._ - +import javax.annotation.concurrent.GuardedBy import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.collection.Utils -import org.apache.spark.sql.collection.Utils.EMPTY_STRING_ARRAY -import org.apache.spark.sql.execution.columnar.ExternalStoreUtils +import org.apache.spark.sql.collection.SharedUtils +import org.apache.spark.sql.execution.columnar.SharedExternalStoreUtils import org.apache.spark.sql.{SparkSession, TableNotFoundException} import org.apache.spark.{Logging, Partition, SparkEnv} +import 
scala.collection.JavaConverters._ + /** * Base class for catalog implementations for connector modes. This is either used as basis * for ExternalCatalog implementation (in smart connector) or as a helper class for catalog @@ -85,6 +83,8 @@ trait ConnectorExternalCatalog { object ConnectorExternalCatalog extends Logging { + final val EMPTY_STRING_ARRAY = Array.empty[String] + def cacheSize: Int = { SparkEnv.get match { case null => Property.CatalogCacheSize.defaultValue.get @@ -119,7 +119,7 @@ object ConnectorExternalCatalog extends Logging { val tableProps = tableObj.getProperties.asScala.toMap val storage = tableObj.getStorage val storageProps = storage.properties.asScala.toMap - val schema = ExternalStoreUtils.getTableSchema(tableObj.getTableSchema) + val schema = SharedExternalStoreUtils.getTableSchema(tableObj.getTableSchema) // SnappyData tables have bucketOwners while hive managed tables have bucketColumns // The bucketSpec below is only for hive managed tables. val bucketSpec = if (tableObj.getBucketColumns.isEmpty) None @@ -144,7 +144,7 @@ object ConnectorExternalCatalog extends Logging { } else None val bucketOwners = tableObj.getBucketOwners // remove partitioning columns from CatalogTable for row/column tables - val partitionCols = if (bucketOwners.isEmpty) Utils.EMPTY_STRING_ARRAY + val partitionCols = if (bucketOwners.isEmpty) SharedUtils.EMPTY_STRING_ARRAY else { val cols = tableObj.getPartitionColumns tableObj.setPartitionColumns(Collections.emptyList()) @@ -172,15 +172,20 @@ object ConnectorExternalCatalog extends Logging { val bucketCount = tableObj.getNumBuckets val indexCols = toArray(tableObj.getIndexColumns) val pkCols = toArray(tableObj.getPrimaryKeyColumns) + val preferHost = SmartConnectorHelper.preferHostName(session) + val preferPrimaries = session.conf.getOption(Property.PreferPrimariesInQuery.name) match { + case None => Property.PreferPrimariesInQuery.defaultValue.get + case Some(p) => p.toBoolean + } if (bucketCount > 0) { val 
allNetUrls = SmartConnectorHelper.setBucketToServerMappingInfo( - bucketCount, bucketOwners, session) + bucketCount, bucketOwners, preferHost, preferPrimaries) val partitions = SmartConnectorHelper.getPartitions(allNetUrls) table -> Some(RelationInfo(bucketCount, isPartitioned = true, partitionCols, indexCols, pkCols, partitions, catalogSchemaVersion)) } else { val allNetUrls = SmartConnectorHelper.setReplicasToServerMappingInfo( - tableObj.getBucketOwners.get(0).getSecondaries, session) + tableObj.getBucketOwners.get(0).getSecondaries, preferHost) val partitions = SmartConnectorHelper.getPartitions(allNetUrls) table -> Some(RelationInfo(1, isPartitioned = false, EMPTY_STRING_ARRAY, indexCols, pkCols, partitions, catalogSchemaVersion)) @@ -319,9 +324,9 @@ object ConnectorExternalCatalog extends Logging { case class RelationInfo(numBuckets: Int, isPartitioned: Boolean, - partitioningCols: Array[String] = Utils.EMPTY_STRING_ARRAY, - indexCols: Array[String] = Utils.EMPTY_STRING_ARRAY, - pkCols: Array[String] = Utils.EMPTY_STRING_ARRAY, + partitioningCols: Array[String] = SharedUtils.EMPTY_STRING_ARRAY, + indexCols: Array[String] = SharedUtils.EMPTY_STRING_ARRAY, + pkCols: Array[String] = SharedUtils.EMPTY_STRING_ARRAY, partitions: Array[org.apache.spark.Partition] = Array.empty, catalogSchemaVersion: Long = -1) { diff --git a/core/src/main/scala/io/snappydata/sql/catalog/SmartConnectorHelper.scala b/encoders/src/main/scala/io/snappydata/sql/catalog/SmartConnectorHelper.scala similarity index 92% rename from core/src/main/scala/io/snappydata/sql/catalog/SmartConnectorHelper.scala rename to encoders/src/main/scala/io/snappydata/sql/catalog/SmartConnectorHelper.scala index f2a2faa217..3946e07798 100644 --- a/core/src/main/scala/io/snappydata/sql/catalog/SmartConnectorHelper.scala +++ b/encoders/src/main/scala/io/snappydata/sql/catalog/SmartConnectorHelper.scala @@ -21,23 +21,21 @@ import java.nio.file.{Files, Paths} import java.sql.{CallableStatement, Connection, 
SQLException} import java.util.Collections -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - import com.pivotal.gemfirexd.Attribute import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils import com.pivotal.gemfirexd.internal.iapi.types.HarmonySerialBlob import com.pivotal.gemfirexd.jdbc.ClientAttribute -import io.snappydata.thrift.{BucketOwners, CatalogMetadataDetails, CatalogMetadataRequest} import io.snappydata.{Constant, Property} -import org.eclipse.collections.impl.map.mutable.UnifiedMap - +import io.snappydata.thrift.{BucketOwners, CatalogMetadataDetails, CatalogMetadataRequest} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.collection.{SmartExecutorBucketPartition, Utils} +import org.apache.spark.sql.collection.{SharedUtils, SmartExecutorBucketPartition} import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, JDBCOptions, JdbcUtils} -import org.apache.spark.sql.store.StoreUtils import org.apache.spark.{Logging, Partition, SparkContext} +import org.eclipse.collections.impl.map.mutable.UnifiedMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer class SmartConnectorHelper(session: SparkSession, jdbcUrl: String) extends Logging { @@ -106,7 +104,7 @@ class SmartConnectorHelper(session: SparkSession, jdbcUrl: String) extends Loggi logWarning(s"could not add path $jarpath to SparkContext as the file is not readable") } }) - val newClassLoader = Utils.newMutableURLClassLoader(mutableList.toArray) + val newClassLoader = SharedUtils.newMutableURLClassLoader(mutableList.toArray) Thread.currentThread().setContextClassLoader(newClassLoader) } } @@ -182,7 +180,7 @@ object SmartConnectorHelper { val numServers = bucketToServerList(0).length val chosenServerIndex = if (numServers > 1) scala.util.Random.nextInt(numServers) else 0 for (p <- 0 until numPartitions) { - if 
(StoreUtils.TEST_RANDOM_BUCKETID_ASSIGNMENT) { + if (SharedUtils.TEST_RANDOM_BUCKETID_ASSIGNMENT) { partitions(p) = new SmartExecutorBucketPartition(p, p, bucketToServerList(scala.util.Random.nextInt(numPartitions))) } else { @@ -198,7 +196,7 @@ object SmartConnectorHelper { def preferHostName(session: SparkSession): Boolean = { // check if Spark executors are using IP addresses or host names - Utils.executorsListener(session.sparkContext) match { + SharedUtils.executorsListener(session.sparkContext) match { case Some(l) => val preferHost = l.activeStorageStatusList.collectFirst { case status if status.blockManagerId.executorId != "driver" => @@ -223,14 +221,8 @@ object SmartConnectorHelper { } def setBucketToServerMappingInfo(numBuckets: Int, buckets: java.util.List[BucketOwners], - session: SparkSession): Array[ArrayBuffer[(String, String)]] = { + preferHost: Boolean, preferPrimaries: Boolean): Array[ArrayBuffer[(String, String)]] = { if (!buckets.isEmpty) { - // check if Spark executors are using IP addresses or host names - val preferHost = preferHostName(session) - val preferPrimaries = session.conf.getOption(Property.PreferPrimariesInQuery.name) match { - case None => Property.PreferPrimariesInQuery.defaultValue.get - case Some(p) => p.toBoolean - } var orphanBuckets: ArrayBuffer[Int] = null val allNetUrls = new Array[ArrayBuffer[(String, String)]](numBuckets) val availableNetUrls = new UnifiedMap[String, String](4) @@ -276,10 +268,8 @@ object SmartConnectorHelper { } def setReplicasToServerMappingInfo(replicaNodes: java.util.List[String], - session: SparkSession): Array[ArrayBuffer[(String, String)]] = { - // check if Spark executors are using IP addresses or host names - val preferHost = preferHostName(session) - val urlPrefix = Constant.DEFAULT_THIN_CLIENT_URL + preferHost: Boolean): Array[ArrayBuffer[(String, String)]] = { + val urlPrefix = Constant.DEFAULT_THIN_CLIENT_URL // no query routing or load-balancing val urlSuffix = "/" + 
ClientAttribute.ROUTE_QUERY + "=false;" + ClientAttribute.LOAD_BALANCE + "=false" diff --git a/encoders/src/main/scala/org/apache/spark/sql/TableNotFoundException.scala b/encoders/src/main/scala/org/apache/spark/sql/TableNotFoundException.scala new file mode 100644 index 0000000000..597d253d30 --- /dev/null +++ b/encoders/src/main/scala/org/apache/spark/sql/TableNotFoundException.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
See accompanying + */ +package org.apache.spark.sql + +class TableNotFoundException(schema: String, table: String, cause: Option[Throwable] = None) + extends AnalysisException(s"Table or view '$table' not found in schema '$schema'", + cause = cause) + + +class PolicyNotFoundException(schema: String, name: String, cause: Option[Throwable] = None) + extends AnalysisException(s"Policy '$name' not found in schema '$schema'", cause = cause) \ No newline at end of file diff --git a/encoders/src/main/scala/org/apache/spark/sql/collection/SharedUtils.scala b/encoders/src/main/scala/org/apache/spark/sql/collection/SharedUtils.scala index b997694768..17acfa49d1 100644 --- a/encoders/src/main/scala/org/apache/spark/sql/collection/SharedUtils.scala +++ b/encoders/src/main/scala/org/apache/spark/sql/collection/SharedUtils.scala @@ -22,19 +22,18 @@ import java.nio.ByteBuffer import scala.collection.mutable import scala.language.existentials - import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} import com.gemstone.gemfire.internal.shared.BufferAllocator import com.gemstone.gemfire.internal.shared.unsafe.UnsafeHolder import com.gemstone.gemfire.internal.snappy.UMMMemoryTracker - import org.apache.spark._ import org.apache.spark.memory.{MemoryManagerCallback, MemoryMode, TaskMemoryManager} import org.apache.spark.scheduler.TaskLocation import org.apache.spark.scheduler.local.LocalSchedulerBackend import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.storage.BlockManagerId +import org.apache.spark.ui.exec.ExecutorsListener import org.apache.spark.unsafe.Platform import org.apache.spark.util.MutableURLClassLoader @@ -68,6 +67,12 @@ object SharedUtils { def taskMemoryManager(context: TaskContext): TaskMemoryManager = context.taskMemoryManager() + + def executorsListener(sc: SparkContext): Option[ExecutorsListener] = sc.ui match { + case Some(ui) => Some(ui.executorsListener) + case _ => None + } + 
def toUnsafeRow(buffer: ByteBuffer, numColumns: Int): UnsafeRow = { if (buffer eq null) return null val row = new UnsafeRow(numColumns) @@ -133,11 +138,9 @@ object SharedUtils { * @return */ def deserialize(value: Array[Byte]): Any = { - val bais: ByteArrayInputStream = new ByteArrayInputStream(value) - val os: ObjectInputStream = new ObjectInputStream(bais) - val filters = os.read() - os.close() - filters + val baip = new ByteArrayInputStream(value) + val ois = new ObjectInputStream(baip) + ois.readObject() } } diff --git a/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/SharedExternalStoreUtils.scala b/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/SharedExternalStoreUtils.scala index bad207b674..e633c74d7c 100644 --- a/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/SharedExternalStoreUtils.scala +++ b/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/SharedExternalStoreUtils.scala @@ -154,8 +154,3 @@ object SharedExternalStoreUtils { } } } - -class TableNotFoundException(schema: String, table: String, cause: Option[Throwable] = None) - extends AnalysisException(s"Table or view '$table' not found in schema '$schema'", - cause = cause) - diff --git a/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/ColumnEncoding.scala b/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/ColumnEncoding.scala index 63b649edd9..549ceaa7a9 100644 --- a/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/ColumnEncoding.scala +++ b/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/ColumnEncoding.scala @@ -22,9 +22,9 @@ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl import com.gemstone.gemfire.internal.shared.unsafe.DirectBufferAllocator import com.gemstone.gemfire.internal.shared.{BufferAllocator, ClientSharedUtils, HeapBufferAllocator} import io.snappydata.util.StringUtils - import 
org.apache.spark.memory.MemoryManagerCallback.memoryManager import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.dsl.expressions import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.expressions.UnsafeRow.calculateBitSetWidthInBytes import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} @@ -1013,13 +1013,16 @@ object ColumnEncoding { * Full stats row has "nullCount" as non-nullable while delta stats row has it as nullable. */ case class ColumnStatsSchema(fieldName: String, - dataType: DataType, nullCountNullable: Boolean) { - val lowerBound: AttributeReference = AttributeReference( - fieldName + ".lowerBound", dataType)() - val upperBound: AttributeReference = AttributeReference( - fieldName + ".upperBound", dataType)() - val nullCount: AttributeReference = AttributeReference( - fieldName + ".nullCount", IntegerType, nullCountNullable)() + dataType: DataType, nullCountNullable: Boolean) { + + // TODO: verify nullable = false value + val lowerBound: AttributeReference = ColumnStatsSchema.newAttributeReference( + fieldName + ".lowerBound", dataType, nullable = false) + // TODO: verify nullable = false value + val upperBound: AttributeReference = ColumnStatsSchema.newAttributeReference( + fieldName + ".upperBound", dataType, nullable = false) + val nullCount: AttributeReference = ColumnStatsSchema.newAttributeReference( + fieldName + ".nullCount", IntegerType, nullCountNullable) val schema = Seq(lowerBound, upperBound, nullCount) @@ -1030,10 +1033,34 @@ object ColumnStatsSchema { val NUM_STATS_PER_COLUMN = 3 val COUNT_INDEX_IN_SCHEMA = 0 - val COUNT_ATTRIBUTE: AttributeReference = AttributeReference( - "batchCount", IntegerType, nullable = false)() + val COUNT_ATTRIBUTE: AttributeReference = newAttributeReference("batchCount", + IntegerType, nullable = false) def numStatsColumns(schemaSize: Int): Int = schemaSize * NUM_STATS_PER_COLUMN + 1 + + def newAttributeReference(name: 
String, dataType: DataType, nullable: Boolean): + AttributeReference = { (dataType match { + + case booleanType: BooleanType => new expressions.DslSymbol(Symbol(name)).boolean + case byteType: ByteType => new expressions.DslSymbol(Symbol(name)).byte + case shortType: ShortType => new expressions.DslSymbol(Symbol(name)).short + case integerType: IntegerType => new expressions.DslSymbol(Symbol(name)).int + case longType: LongType => new expressions.DslSymbol(Symbol(name)).long + case doubleType: DoubleType => new expressions.DslSymbol(Symbol(name)).float + case floatType: FloatType => new expressions.DslSymbol(Symbol(name)).double + case stringType: StringType => new expressions.DslSymbol(Symbol(name)).string + case dateType: DateType => new expressions.DslSymbol(Symbol(name)).date + case decimalType: DecimalType => new expressions.DslSymbol(Symbol(name)) + .decimal(decimalType.precision, decimalType.scale) + // case DecimalType => new expressions.DslSymbol(Symbol(name)).decimal + case timestampType: TimestampType => new expressions.DslSymbol(Symbol(name)).timestamp + case binaryType: BinaryType => new expressions.DslSymbol(Symbol(name)).binary + case arrayType: ArrayType => new expressions.DslSymbol(Symbol(name)).array(arrayType) + case mapType: MapType => new expressions.DslSymbol(Symbol(name)).map(mapType) + case structType: StructType => new expressions.DslSymbol(Symbol(name)).struct(structType) + + }).withNullability(nullable) + } } trait NotNullDecoder extends ColumnDecoder { diff --git a/encoders/src/main/scala/org/apache/spark/sql/execution/row/ResultSetDecoder.scala b/encoders/src/main/scala/org/apache/spark/sql/execution/row/ResultSetDecoder.scala deleted file mode 100644 index 980809cda3..0000000000 --- a/encoders/src/main/scala/org/apache/spark/sql/execution/row/ResultSetDecoder.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright (c) 2018 SnappyData, Inc. All rights reserved. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. - */ -package org.apache.spark.sql.execution.row - -import com.gemstone.gemfire.internal.shared.ClientSharedData -import io.snappydata.ResultSetWithNull - -import org.apache.spark.sql.catalyst.util.{DateTimeUtils, SerializedArray, SerializedMap, SerializedRow} -import org.apache.spark.sql.execution.columnar.encoding.ColumnDecoder -import org.apache.spark.sql.types.{DataType, Decimal, StructField} -import org.apache.spark.unsafe.Platform -import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} - -/** - * An adapter for a ResultSet to pose as ColumnEncoding so that the same - * generated code can be used for both row buffer and column data access. 
- */ -final class ResultSetDecoder(rs: ResultSetWithNull, columnPosition: Int) - extends ColumnDecoder(null, 0L, null) { - - private[this] val defaultCal = ClientSharedData.getDefaultCleanCalendar - - override def typeId: Int = -1 - - override def supports(dataType: DataType): Boolean = true - - // nulls can be present so always return true - override protected[sql] def hasNulls: Boolean = true - - override protected[sql] def initializeNulls(columnBytes: AnyRef, - startCursor: Long, field: StructField): Long = 0L - - override protected[sql] def initializeCursor(columnBytes: AnyRef, cursor: Long, - dataType: DataType): Long = 0L - - override def getNextNullPosition: Int = - if (rs.isNull(columnPosition)) 0 else 1 /* 1 will never match */ - - override def findNextNullPosition(columnBytes: AnyRef, nextNullPosition: Int, num: Int): Int = - 1 /* batch size is always 1 */ - - override def numNulls(columnBytes: AnyRef, ordinal: Int, num: Int): Int = - if (rs.isNull(columnPosition)) 1 else 0 - - override def isNullAt(columnBytes: AnyRef, position: Int): Boolean = - rs.isNull(columnPosition) - - override def readBoolean(columnBytes: AnyRef, nonNullPosition: Int): Boolean = - rs.getBoolean(columnPosition) - - override def readByte(columnBytes: AnyRef, nonNullPosition: Int): Byte = - rs.getByte(columnPosition) - - override def readShort(columnBytes: AnyRef, nonNullPosition: Int): Short = - rs.getShort(columnPosition) - - override def readInt(columnBytes: AnyRef, nonNullPosition: Int): Int = - rs.getInt(columnPosition) - - override def readLong(columnBytes: AnyRef, nonNullPosition: Int): Long = - rs.getLong(columnPosition) - - override def readFloat(columnBytes: AnyRef, nonNullPosition: Int): Float = - rs.getFloat(columnPosition) - - override def readDouble(columnBytes: AnyRef, nonNullPosition: Int): Double = - rs.getDouble(columnPosition) - - override def readLongDecimal(columnBytes: AnyRef, precision: Int, - scale: Int, nonNullPosition: Int): Decimal = { - val dec = 
rs.getBigDecimal(columnPosition) - if (dec != null) { - Decimal.apply(dec, precision, scale) - } else { - null - } - } - - override def readDecimal(columnBytes: AnyRef, precision: Int, scale: Int, - nonNullPosition: Int): Decimal = - readLongDecimal(columnBytes, precision, scale, nonNullPosition) - - override def readUTF8String(columnBytes: AnyRef, nonNullPosition: Int): UTF8String = - UTF8String.fromString(rs.getString(columnPosition)) - - override def readDate(columnBytes: AnyRef, nonNullPosition: Int): Int = { - defaultCal.clear() - val date = rs.getDate(columnPosition, defaultCal) - if (date ne null) DateTimeUtils.fromJavaDate(date) else -1 - } - - override def readTimestamp(columnBytes: AnyRef, nonNullPosition: Int): Long = { - defaultCal.clear() - val timestamp = rs.getTimestamp(columnPosition, defaultCal) - if (timestamp ne null) DateTimeUtils.fromJavaTimestamp(timestamp) else -1L - } - - override def readBinary(columnBytes: AnyRef, nonNullPosition: Int): Array[Byte] = - rs.getBytes(columnPosition) - - override def readInterval(columnBytes: AnyRef, - nonNullPosition: Int): CalendarInterval = { - val micros = rs.getLong(columnPosition) - if (rs.wasNull()) null else new CalendarInterval(0, micros) - } - - override def readArray(columnBytes: AnyRef, nonNullPosition: Int): SerializedArray = { - val b = rs.getBytes(columnPosition) - if (b != null) { - val result = new SerializedArray(8) // includes size - result.pointTo(b, Platform.BYTE_ARRAY_OFFSET, b.length) - result - } else null - } - - override def readMap(columnBytes: AnyRef, nonNullPosition: Int): SerializedMap = { - val b = rs.getBytes(columnPosition) - if (b != null) { - val result = new SerializedMap - result.pointTo(b, Platform.BYTE_ARRAY_OFFSET) - result - } else null - } - - override def readStruct(columnBytes: AnyRef, numFields: Int, - nonNullPosition: Int): SerializedRow = { - val b = rs.getBytes(columnPosition) - if (b != null) { - val result = new SerializedRow(4, numFields) // includes size - 
result.pointTo(b, Platform.BYTE_ARRAY_OFFSET, b.length) - result - } else null - } -} diff --git a/encoders/src/main/scala/org/apache/spark/sql/execution/row/UnsafeRowDecoder.scala b/encoders/src/main/scala/org/apache/spark/sql/execution/row/UnsafeRowDecoder.scala deleted file mode 100644 index 9ba3cd6cba..0000000000 --- a/encoders/src/main/scala/org/apache/spark/sql/execution/row/UnsafeRowDecoder.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright (c) 2018 SnappyData, Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. 
- */ -package org.apache.spark.sql.execution.row - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} -import org.apache.spark.sql.execution.columnar.encoding.ColumnDecoder -import org.apache.spark.sql.types.{DataType, Decimal, StructField} -import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} - -// TODO: SW: change this to use SerializedRow/Array/Map (for sampler reservoir) -final class UnsafeRowDecoder(holder: UnsafeRowHolder, columnIndex: Int) - extends ColumnDecoder(null, 0L, null) { - - override def typeId: Int = -2 - - override def supports(dataType: DataType): Boolean = true - - // nulls can be present so always return true - override protected[sql] def hasNulls: Boolean = true - - override protected[sql] def initializeNulls(columnBytes: AnyRef, - startCursor: Long, field: StructField): Long = 0L - - override protected[sql] def initializeCursor(columnBytes: AnyRef, cursor: Long, - dataType: DataType): Long = 0L - - override def getNextNullPosition: Int = - if (holder.row.isNullAt(columnIndex)) 0 else 1 /* 1 will never match */ - - override def findNextNullPosition(columnBytes: AnyRef, nextNullPosition: Int, num: Int): Int = - 1 /* batch size is always 1 */ - - override def numNulls(columnBytes: AnyRef, ordinal: Int, num: Int): Int = - if (holder.row.isNullAt(columnIndex)) 1 else 0 - - override def isNullAt(columnBytes: AnyRef, position: Int): Boolean = - holder.row.isNullAt(columnIndex) - - override def readBoolean(columnBytes: AnyRef, nonNullPosition: Int): Boolean = - holder.row.getBoolean(columnIndex) - - override def readByte(columnBytes: AnyRef, nonNullPosition: Int): Byte = - holder.row.getByte(columnIndex) - - override def readShort(columnBytes: AnyRef, nonNullPosition: Int): Short = - holder.row.getShort(columnIndex) - - override def readInt(columnBytes: AnyRef, nonNullPosition: Int): Int = - 
holder.row.getInt(columnIndex) - - override def readLong(columnBytes: AnyRef, nonNullPosition: Int): Long = - holder.row.getLong(columnIndex) - - override def readFloat(columnBytes: AnyRef, nonNullPosition: Int): Float = - holder.row.getFloat(columnIndex) - - override def readDouble(columnBytes: AnyRef, nonNullPosition: Int): Double = - holder.row.getDouble(columnIndex) - - override def readLongDecimal(columnBytes: AnyRef, precision: Int, scale: Int, - nonNullPosition: Int): Decimal = - holder.row.getDecimal(columnIndex, precision, scale) - - override def readDecimal(columnBytes: AnyRef, precision: Int, scale: Int, - nonNullPosition: Int): Decimal = - holder.row.getDecimal(columnIndex, precision, scale) - - override def readUTF8String(columnBytes: AnyRef, nonNullPosition: Int): UTF8String = - holder.row.getUTF8String(columnIndex) - - override def readBinary(columnBytes: AnyRef, nonNullPosition: Int): Array[Byte] = - holder.row.getBinary(columnIndex) - - override def readInterval(columnBytes: AnyRef, nonNullPosition: Int): CalendarInterval = - holder.row.getInterval(columnIndex) - - override def readArray(columnBytes: AnyRef, nonNullPosition: Int): ArrayData = - holder.row.getArray(columnIndex) - - override def readMap(columnBytes: AnyRef, nonNullPosition: Int): MapData = - holder.row.getMap(columnIndex) - - override def readStruct(columnBytes: AnyRef, numFields: Int, - nonNullPosition: Int): InternalRow = - holder.row.getStruct(columnIndex, numFields) -} - -final class UnsafeRowHolder { - private[row] var row: UnsafeRow = _ - - def setRow(row: UnsafeRow): Unit = this.row = row -} diff --git a/encoders/src/main/scala/org/apache/spark/sql/internal/SnappySessionSQLConf.scala b/encoders/src/main/scala/org/apache/spark/sql/internal/SnappySessionSQLConf.scala new file mode 100644 index 0000000000..49028f65c8 --- /dev/null +++ b/encoders/src/main/scala/org/apache/spark/sql/internal/SnappySessionSQLConf.scala @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. 
All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package org.apache.spark.sql.internal + +import java.util.Properties + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry, TypedConfigBuilder} +import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder + +import scala.reflect.{ClassTag, classTag} + +class SQLConfigEntry private(private[sql] val entry: ConfigEntry[_]) { + + def key: String = entry.key + + def doc: String = entry.doc + + def isPublic: Boolean = entry.isPublic + + def defaultValue[T]: Option[T] = entry.defaultValue.asInstanceOf[Option[T]] + + def defaultValueString: String = entry.defaultValueString + + def valueConverter[T]: String => T = + entry.asInstanceOf[ConfigEntry[T]].valueConverter + + def stringConverter[T]: T => String = + entry.asInstanceOf[ConfigEntry[T]].stringConverter + + override def toString: String = entry.toString +} + +object SQLConfigEntry { + + private def handleDefault[T](entry: TypedConfigBuilder[T], + defaultValue: Option[T]): SQLConfigEntry = defaultValue match { + case Some(v) => new SQLConfigEntry(entry.createWithDefault(v)) + case None => new SQLConfigEntry(entry.createOptional) + } + + def sparkConf[T: ClassTag](key: String, doc: String, defaultValue: Option[T], + isPublic: Boolean = true): SQLConfigEntry = { + classTag[T] match { + case ClassTag.Int => handleDefault[Int](ConfigBuilder(key) + 
.doc(doc).intConf, defaultValue.asInstanceOf[Option[Int]]) + case ClassTag.Long => handleDefault[Long](ConfigBuilder(key) + .doc(doc).longConf, defaultValue.asInstanceOf[Option[Long]]) + case ClassTag.Double => handleDefault[Double](ConfigBuilder(key) + .doc(doc).doubleConf, defaultValue.asInstanceOf[Option[Double]]) + case ClassTag.Boolean => handleDefault[Boolean](ConfigBuilder(key) + .doc(doc).booleanConf, defaultValue.asInstanceOf[Option[Boolean]]) + case c if c.runtimeClass == classOf[String] => + handleDefault[String](ConfigBuilder(key).doc(doc).stringConf, + defaultValue.asInstanceOf[Option[String]]) + case c => throw new IllegalArgumentException( + s"Unknown type of configuration key: $c") + } + } + + def apply[T: ClassTag](key: String, doc: String, defaultValue: Option[T], + isPublic: Boolean = true): SQLConfigEntry = { + classTag[T] match { + case ClassTag.Int => handleDefault[Int](SQLConfigBuilder(key) + .doc(doc).intConf, defaultValue.asInstanceOf[Option[Int]]) + case ClassTag.Long => handleDefault[Long](SQLConfigBuilder(key) + .doc(doc).longConf, defaultValue.asInstanceOf[Option[Long]]) + case ClassTag.Double => handleDefault[Double](SQLConfigBuilder(key) + .doc(doc).doubleConf, defaultValue.asInstanceOf[Option[Double]]) + case ClassTag.Boolean => handleDefault[Boolean](SQLConfigBuilder(key) + .doc(doc).booleanConf, defaultValue.asInstanceOf[Option[Boolean]]) + case c if c.runtimeClass == classOf[String] => + handleDefault[String](SQLConfigBuilder(key).doc(doc).stringConf, + defaultValue.asInstanceOf[Option[String]]) + case c => throw new IllegalArgumentException( + s"Unknown type of configuration key: $c") + } + } +} + +trait AltName[T] { + + def name: String + + def altName: String + + def configEntry: SQLConfigEntry + + def defaultValue: Option[T] = configEntry.defaultValue[T] + + def getOption(conf: SparkConf): Option[String] = if (altName == null) { + conf.getOption(name) + } else { + conf.getOption(name) match { + case s: Some[String] => // check 
if altName also present and fail if so + if (conf.contains(altName)) { + throw new IllegalArgumentException( + s"Both $name and $altName configured. Only one should be set.") + } else s + case None => conf.getOption(altName) + } + } + + private def get(conf: SparkConf, name: String, + defaultValue: String): T = { + configEntry.entry.defaultValue match { + case Some(_) => configEntry.valueConverter[T]( + conf.get(name, defaultValue)) + case None => configEntry.valueConverter[Option[T]]( + conf.get(name, defaultValue)).get + } + } + + def get(conf: SparkConf): T = if (altName == null) { + get(conf, name, configEntry.defaultValueString) + } else { + if (conf.contains(name)) { + if (!conf.contains(altName)) get(conf, name, configEntry.defaultValueString) + else { + throw new IllegalArgumentException( + s"Both $name and $altName configured. Only one should be set.") + } + } else { + get(conf, altName, configEntry.defaultValueString) + } + } + + def get(properties: Properties): T = { + val propertyValue = getProperty(properties) + if (propertyValue ne null) configEntry.valueConverter[T](propertyValue) + else defaultValue.get + } + + def getProperty(properties: Properties): String = if (altName == null) { + properties.getProperty(name) + } else { + val v = properties.getProperty(name) + if (v != null) { + // check if altName also present and fail if so + if (properties.getProperty(altName) != null) { + throw new IllegalArgumentException( + s"Both $name and $altName specified. 
Only one should be set.") + } + v + } else properties.getProperty(altName) + } + + def unapply(key: String): Boolean = name.equals(key) || + (altName != null && altName.equals(key)) +} + +trait SQLAltName[T] extends AltName[T] { + + private def get(conf: SQLConf, entry: SQLConfigEntry): T = { + entry.defaultValue match { + case Some(_) => conf.getConf(entry.entry.asInstanceOf[ConfigEntry[T]]) + case None => conf.getConf(entry.entry.asInstanceOf[ConfigEntry[Option[T]]]).get + } + } + + private def get(conf: SQLConf, name: String, + defaultValue: String): T = { + configEntry.entry.defaultValue match { + case Some(_) => configEntry.valueConverter[T]( + conf.getConfString(name, defaultValue)) + case None => configEntry.valueConverter[Option[T]]( + conf.getConfString(name, defaultValue)).get + } + } + + def get(conf: SQLConf): T = if (altName == null) { + get(conf, configEntry) + } else { + if (conf.contains(name)) { + if (!conf.contains(altName)) get(conf, configEntry) + else { + throw new IllegalArgumentException( + s"Both $name and $altName configured. Only one should be set.") + } + } else { + get(conf, altName, configEntry.defaultValueString) + } + } + + def getOption(conf: SQLConf): Option[T] = if (altName == null) { + if (conf.contains(name)) Some(get(conf, name, "")) + else defaultValue + } else { + if (conf.contains(name)) { + if (!conf.contains(altName)) Some(get(conf, name, "")) + else { + throw new IllegalArgumentException( + s"Both $name and $altName configured. 
Only one should be set.") + } + } else if (conf.contains(altName)) { + Some(get(conf, altName, "")) + } else defaultValue + } + + def set(conf: SQLConf, value: T, useAltName: Boolean = false): Unit = { + if (useAltName) { + conf.setConfString(altName, configEntry.stringConverter(value)) + } else { + conf.setConf[T](configEntry.entry.asInstanceOf[ConfigEntry[T]], value) + } + } + + def remove(conf: SQLConf, useAltName: Boolean = false): Unit = { + conf.unsetConf(if (useAltName) altName else name) + } +} + diff --git a/settings.gradle b/settings.gradle index ba5c9e8e8a..3af9d0bc43 100644 --- a/settings.gradle +++ b/settings.gradle @@ -27,6 +27,9 @@ include 'dunit' include ':snappy-dtests_' + scalaBinaryVersion include ':snappy-compatibility-tests_' + scalaBinaryVersion include ':snappy-encoders_' + scalaBinaryVersion +// Commented "v2connector" in order to exclude from build and test process. +// uncomment if we decide to include it. +// include ':snappy-v2connector_' + scalaBinaryVersion project(':snappy-jdbc_' + scalaBinaryVersion).projectDir = "$rootDir/jdbc" as File project(':snappy-core_' + scalaBinaryVersion).projectDir = "$rootDir/core" as File @@ -36,6 +39,7 @@ project(':snappy-examples_' + scalaBinaryVersion).projectDir = "$rootDir/example project(':snappy-dtests_' + scalaBinaryVersion).projectDir = "$rootDir/dtests" as File project(':snappy-compatibility-tests_' + scalaBinaryVersion).projectDir = "$rootDir/compatibilityTests" as File project(':snappy-encoders_' + scalaBinaryVersion).projectDir = "$rootDir/encoders" as File +// project(':snappy-v2connector_' + scalaBinaryVersion).projectDir = "$rootDir/v2connector" as File if (new File(rootDir, 'spark/build.gradle').exists()) { include ':snappy-spark' diff --git a/v2connector/build.gradle b/v2connector/build.gradle new file mode 100644 index 0000000000..6708b77c24 --- /dev/null +++ b/v2connector/build.gradle @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +apply plugin: 'scala' + +compileScala.options.encoding = 'UTF-8' +// fix scala+java mix to all use compileScala which uses correct dependency order +sourceSets.main.scala.srcDir 'src/main/java' +sourceSets.test.scala.srcDirs = [ 'src/test/java', 'src/test/scala', + 'src/dunit/java', 'src/dunit/scala' ] +sourceSets.main.java.srcDirs = [] +sourceSets.test.java.srcDirs = [] + +def osName = org.gradle.internal.os.OperatingSystem.current() + +dependencies { + compileOnly 'org.scala-lang:scala-library:' + scalaVersion + compileOnly 'org.scala-lang:scala-reflect:' + scalaVersion + + compileOnly("org.apache.spark:spark-core_${scalaBinaryVersion}:2.3.2") + compileOnly("org.apache.spark:spark-sql_${scalaBinaryVersion}:2.3.2") + + testCompile("org.apache.spark:spark-core_${scalaBinaryVersion}:2.3.2") + testCompile("org.apache.spark:spark-sql_${scalaBinaryVersion}:2.3.2") + + compile project(":snappy-jdbc_${scalaBinaryVersion}") + compile project(":snappy-encoders_${scalaBinaryVersion}") + + testCompile project(':dunit') + testCompile "org.scalatest:scalatest_${scalaBinaryVersion}:${scalatestVersion}" + + testCompile("org.apache.spark:spark-core_${scalaBinaryVersion}:2.3.2:tests") + testCompile("org.apache.spark:spark-sql_${scalaBinaryVersion}:2.3.2:tests") + + testRuntime files("${projectDir}/../tests/common/src/main/resources") +} + +task packageScalaDocs(type: Jar, dependsOn: 
scaladoc) { + classifier = 'javadoc' + from scaladoc +} +if (rootProject.hasProperty('enablePublish')) { + artifacts { + archives packageScalaDocs, packageSources + } +} + +scalaTest { + dependsOn ':cleanScalaTest' + doFirst { + // cleanup files since scalatest plugin does not honour workingDir yet + cleanIntermediateFiles(project.path) + } + doLast { + // cleanup files since scalatest plugin does not honour workingDir yet + cleanIntermediateFiles(project.path) + } +} + +//def downloadApacheSparkDist(String ver, String distName, String prodDir) { +// return tasks.create("downloadApache${ver}SparkDist", Download) { +// outputs.files "${prodDir}.tgz" +// +// src "http://archive.apache.org/dist/spark/spark-${ver}/${distName}.tgz" +// dest sparkDistDir +// onlyIfNewer true +// +// doFirst { +// mkdir(sparkDistDir) +// } +// } +//} +// +//def taskGetApacheSparkDist(String ver, String distName, String prodDir) { +// return tasks.create("getApacheSpark${ver}Dist") { +// dependsOn downloadApacheSparkDist(ver, distName, prodDir) +// +// outputs.files "${prodDir}.tgz", "${prodDir}/README.md" +// +// doLast { +// if (osName.isWindows()) { +// copy { +// from tarTree(resources.gzip("${sparkDistDir}/${distName}.tgz")) +// into sparkDistDir +// } +// } else { +// // gradle tarTree does not preserve symlinks (GRADLE-2844) +// exec { +// executable 'tar' +// args 'xzf', "${distName}.tgz" +// workingDir = sparkDistDir +// } +// } +// } +// } +//} + +/*task getApacheSparkDist { + dependsOn taskGetApacheSparkDist(sparkCurrentVersion, sparkCurrentDistName, sparkCurrentProductDir) +}*/ + +test.dependsOn ':cleanJUnit' +// dunitTest.dependsOn getApacheSparkDist +check.dependsOn test, scalaTest, dunitTest + +archivesBaseName = 'snappydata-v2connector_' + scalaBinaryVersion +shadowJar { + zip64 = true + // avoid conflict with the 0.9.2 version in stock Spark + relocate 'org.apache.thrift', 'io.snappydata.org.apache.thrift' + // relocate koloboke for possible conflicts with user 
dependencies + relocate 'com.koloboke', 'io.snappydata.com.koloboke' + // relocate the guava's com.google packages + relocate 'com.google.common', 'io.snappydata.com.google.common' + + mergeServiceFiles() + exclude 'log4j.properties' + + if (rootProject.hasProperty('enablePublish')) { + createdBy = 'SnappyData Build Team' + } else { + createdBy = System.getProperty('user.name') + } + manifest { + attributes( + 'Manifest-Version' : '1.0', + 'Created-By' : createdBy, + 'Title' : "snappydata-v2connector_${scalaBinaryVersion}", + 'Version' : version, + 'Vendor' : vendorName + ) + } +} + +// write the POM for spark-package +String sparkPackageName = "snappydata-${version}-s_${scalaBinaryVersion}" + +task sparkPackagePom(dependsOn: shadowJar) { doLast { + file("${rootProject.buildDir}/distributions").mkdirs() + pom { + project { + groupId 'SnappyDataInc' + artifactId 'snappydata' + version "${version}-s_${scalaBinaryVersion}" + licenses { + license { + name 'The Apache Software License, Version 2.0' + url 'http://www.apache.org/licenses/LICENSE-2.0.txt' + distribution 'repo' + } + } + } + whenConfigured { p -> p.dependencies.clear() } + }.writeTo("${rootProject.buildDir}/distributions/${sparkPackageName}.pom") + copy { + from "${buildDir}/libs" + into "${rootProject.buildDir}/distributions" + include "${shadowJar.archiveName}" + rename { filename -> "${sparkPackageName}.jar" } + } +} } +task sparkPackage(type: Zip, dependsOn: sparkPackagePom) { + archiveName "${sparkPackageName}.zip" + destinationDir = file("${rootProject.buildDir}/distributions") + outputs.upToDateWhen { false } + + from ("${rootProject.buildDir}/distributions") { + include "${sparkPackageName}.jar" + include "${sparkPackageName}.pom" + } +} diff --git a/v2connector/src/main/scala/io/snappydata/datasource/v2/ConnectorUtils.scala b/v2connector/src/main/scala/io/snappydata/datasource/v2/ConnectorUtils.scala new file mode 100644 index 0000000000..c8a816e7ea --- /dev/null +++ 
b/v2connector/src/main/scala/io/snappydata/datasource/v2/ConnectorUtils.scala @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package io.snappydata.datasource.v2 + +import scala.collection.mutable.ArrayBuffer + +import io.snappydata.datasource.v2.driver.SnappyTableMetaData + +/** + * Contains utility methods required by connectors + */ +object ConnectorUtils { + + def preferredLocations(tableMetaData: SnappyTableMetaData, bucketId: Int): Array[String] = { + if (tableMetaData.bucketToServerMapping.isEmpty) return new Array[String](0) + + val preferredServers: ArrayBuffer[(String, String)] = if (tableMetaData.bucketCount > 0) { + // from bucketToServerMapping get the collection of hosts where the bucket exists + // (each element in preferredServers ArrayBuffer is in the form of a tuple (host, jdbcURL)) + tableMetaData.bucketToServerMapping.get(bucketId) + } else { // replicated tables + tableMetaData.bucketToServerMapping.get(0) + } + + if (preferredServers.isEmpty) return new Array[String](0) + + val locations = Array.ofDim[String](preferredServers.length) + var index: Int = 0 + preferredServers.foreach( + h => { + locations(index) = h._1 + index = index + 1 + } + ) + locations + } + +} diff --git a/v2connector/src/main/scala/io/snappydata/datasource/v2/EvaluateFilter.scala 
b/v2connector/src/main/scala/io/snappydata/datasource/v2/EvaluateFilter.scala new file mode 100644 index 0000000000..a73f7c6ea2 --- /dev/null +++ b/v2connector/src/main/scala/io/snappydata/datasource/v2/EvaluateFilter.scala @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package io.snappydata.datasource.v2 + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.sources._ + +object EvaluateFilter { + + def evaluateWhereClause(filters: Array[Filter]): (String, ArrayBuffer[Any]) = { + val numFilters = filters.length + val filterWhereArgs = new ArrayBuffer[Any](numFilters) + + // TODO: return pushed filters + val pushedFilters = Array.empty[Filter] + val filtersNotPushed = Array.empty[Filter] + + val filterWhereClause = if (numFilters > 0) { + val sb = new StringBuilder().append(" WHERE ") + val initLen = sb.length + filters.foreach(f => compileFilter(f, sb, filterWhereArgs, sb.length > initLen)) + if (filterWhereArgs.nonEmpty) { + sb.toString() + } else "" + } else "" + (filterWhereClause, filterWhereArgs) + } + + // below should exactly match ExternalStoreUtils.handledFilter + private def compileFilter(f: Filter, sb: StringBuilder, + args: ArrayBuffer[Any], addAnd: Boolean): Unit = f match { + case EqualTo(col, value) => + if (addAnd) { + sb.append(" AND ") + } + sb.append(col).append(" = ?") + args += value + 
case LessThan(col, value) => + if (addAnd) { + sb.append(" AND ") + } + sb.append(col).append(" < ?") + args += value + case GreaterThan(col, value) => + if (addAnd) { + sb.append(" AND ") + } + sb.append(col).append(" > ?") + args += value + case LessThanOrEqual(col, value) => + if (addAnd) { + sb.append(" AND ") + } + sb.append(col).append(" <= ?") + args += value + case GreaterThanOrEqual(col, value) => + if (addAnd) { + sb.append(" AND ") + } + sb.append(col).append(" >= ?") + args += value + case StringStartsWith(col, value) => + if (addAnd) { + sb.append(" AND ") + } + sb.append(col).append(s" LIKE $value%") + case In(col, values) => + if (addAnd) { + sb.append(" AND ") + } + sb.append(col).append(" IN (") + (1 until values.length).foreach(_ => sb.append("?,")) + sb.append("?)") + args ++= values + case And(left, right) => + if (addAnd) { + sb.append(" AND ") + } + sb.append('(') + compileFilter(left, sb, args, addAnd = false) + sb.append(") AND (") + compileFilter(right, sb, args, addAnd = false) + sb.append(')') + case Or(left, right) => + if (addAnd) { + sb.append(" AND ") + } + sb.append('(') + compileFilter(left, sb, args, addAnd = false) + sb.append(") OR (") + compileFilter(right, sb, args, addAnd = false) + sb.append(')') + case _ => // no filter pushdown + } + +} diff --git a/v2connector/src/main/scala/io/snappydata/datasource/v2/SnappyDataPartitioning.scala b/v2connector/src/main/scala/io/snappydata/datasource/v2/SnappyDataPartitioning.scala new file mode 100644 index 0000000000..07d0b178b0 --- /dev/null +++ b/v2connector/src/main/scala/io/snappydata/datasource/v2/SnappyDataPartitioning.scala @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package io.snappydata.datasource.v2 + +import io.snappydata.datasource.v2.driver.SnappyTableMetaData + +import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Distribution, Partitioning} + +class SnappyDataPartitioning(tableMetaData: SnappyTableMetaData) extends Partitioning { + + override def numPartitions(): Int = { + if (tableMetaData.bucketCount > 0) { + tableMetaData.bucketCount + } else { + 1 // returning 1 for replicated table + } + } + + override def satisfy(distribution: Distribution): Boolean = { + if (tableMetaData.bucketCount > 0) { + distribution match { + case c: ClusteredDistribution => + c.clusteredColumns.sameElements(tableMetaData.partitioningCols) + case _ => false + } + } else { // replicated table + false + } + } + +} \ No newline at end of file diff --git a/v2connector/src/main/scala/io/snappydata/datasource/v2/SnappyDataSource.scala b/v2connector/src/main/scala/io/snappydata/datasource/v2/SnappyDataSource.scala new file mode 100644 index 0000000000..c71c2b39ea --- /dev/null +++ b/v2connector/src/main/scala/io/snappydata/datasource/v2/SnappyDataSource.scala @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package io.snappydata.datasource.v2 + +import java.util.function.Supplier + +import io.snappydata.datasource.v2.driver.{ColumnTableDataSourceReader, RowTableDataSourceReader, SnappyTableMetaData, SnappyTableMetaDataReader} + +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.reader.DataSourceReader +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, SessionConfigSupport} + +/** + * DataSource V2 implementation for SnappyData + */ +class SnappyDataSource extends DataSourceV2 with + ReadSupport with + DataSourceRegister with + SessionConfigSupport { + + /** + * Creates a {@link DataSourceReader} to scan the data from this data source. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. + * + * @param options the options for the returned data source reader, which is an immutable + * case-insensitive string-to-string map. 
+ */ + override def createReader(options: DataSourceOptions): DataSourceReader = { + validateOptions(options) + val tableMetaData: SnappyTableMetaData = + new SnappyTableMetaDataReader().getTableMetaData(options) + populateUserStats(options) + val dataSourceReader = tableMetaData.tableStorageType match { + case "row" => new RowTableDataSourceReader(options, tableMetaData) + case "column" => new ColumnTableDataSourceReader(options, tableMetaData) + case _ => throw new UnsupportedOperationException(s"Operations on tables of type" + + s" ${tableMetaData.tableStorageType} are not supported from V2 connector") + } + dataSourceReader + } + + override def shortName(): String = { + V2Constants.DATASOURCE_SHORT_NAME + } + + override def keyPrefix(): String = { + V2Constants.KEY_PREFIX + } + + private def validateOptions(options: DataSourceOptions): Unit = { + options.get(V2Constants.SnappyConnection). + orElseThrow(new Supplier[Throwable] { + override def get(): Throwable = + new IllegalArgumentException( + s"Required configuration ${V2Constants.SnappyConnection} not specified") + }) + + options.get(V2Constants.TABLE_NAME). 
+ orElseThrow(new Supplier[Throwable] { + override def get(): Throwable = + new IllegalArgumentException( + s"Required configuration ${V2Constants.TABLE_NAME} not specified") + }) + + } + + private def populateUserStats(options: DataSourceOptions): Unit = { + import scala.collection.JavaConverters._ + val optionsMap : java.util.Map[String, String] = options.asMap() + optionsMap.asScala.foreach(e => + if (e._1.endsWith("_size")) { + UserProvidedStats.statsMap.+=(e._1 -> e._2.toLong) + } + ) + + } +} diff --git a/v2connector/src/main/scala/io/snappydata/datasource/v2/SnappyStatistics.scala b/v2connector/src/main/scala/io/snappydata/datasource/v2/SnappyStatistics.scala new file mode 100644 index 0000000000..26fc231eb2 --- /dev/null +++ b/v2connector/src/main/scala/io/snappydata/datasource/v2/SnappyStatistics.scala @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ +package io.snappydata.datasource.v2 + +import java.util.OptionalLong + +import org.apache.spark.sql.sources.v2.reader.Statistics + +class SnappyStatistics(val tableName: String) extends Statistics { + + // TODO: currently returning user provided stats, + // fix this to return stats fetched from Snappy cluster + private lazy val sizeBytes = UserProvidedStats.statsMap.get(tableName.toLowerCase() + "_size") + + override def sizeInBytes(): OptionalLong = { + if (sizeBytes.isDefined) { + OptionalLong.of(sizeBytes.get) + } else { + OptionalLong.empty() + } + + } + + override def numRows(): OptionalLong = OptionalLong.empty() +} diff --git a/v2connector/src/main/scala/io/snappydata/datasource/v2/UserProvidedStats.scala b/v2connector/src/main/scala/io/snappydata/datasource/v2/UserProvidedStats.scala new file mode 100644 index 0000000000..61f149965a --- /dev/null +++ b/v2connector/src/main/scala/io/snappydata/datasource/v2/UserProvidedStats.scala @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ +package io.snappydata.datasource.v2 + +import scala.collection.mutable + +/** + * User provided stats + */ +object UserProvidedStats { + + val statsMap: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]() +} diff --git a/v2connector/src/main/scala/io/snappydata/datasource/v2/V2Constants.scala b/v2connector/src/main/scala/io/snappydata/datasource/v2/V2Constants.scala new file mode 100644 index 0000000000..859f101aed --- /dev/null +++ b/v2connector/src/main/scala/io/snappydata/datasource/v2/V2Constants.scala @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ +package io.snappydata.datasource.v2 + +object V2Constants { + + val DATASOURCE_SHORT_NAME = "snappydata" + + val KEY_PREFIX = "snappydata" + + /* + TODO: Same property defined in io.snappydata.Property.SnappyConnection + Move Literals.scala to a shared jar accessible here + */ + val SnappyConnection = "snappydata.connection" + + val TABLE_NAME = "table" + + val USER = "user" + + val PASSWORD = "password" +} diff --git a/v2connector/src/main/scala/io/snappydata/datasource/v2/driver/ColumnTableDataSourceReader.scala b/v2connector/src/main/scala/io/snappydata/datasource/v2/driver/ColumnTableDataSourceReader.scala new file mode 100644 index 0000000000..26f43ae09f --- /dev/null +++ b/v2connector/src/main/scala/io/snappydata/datasource/v2/driver/ColumnTableDataSourceReader.scala @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ +package io.snappydata.datasource.v2.driver + +import java.util +import java.util.{List => JList} + +import io.snappydata.datasource.v2.partition.SnappyColumnBatchReaderFactory + +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, SupportsScanColumnarBatch} +import org.apache.spark.sql.vectorized.ColumnarBatch + +// created on driver +class ColumnTableDataSourceReader(options: DataSourceOptions, tableMetaData: SnappyTableMetaData) + extends SnappyDataSourceReader(options, tableMetaData) with SupportsScanColumnarBatch { + + /** + * Similar to {@link DataSourceReader#createDataReaderFactories()}, but returns columnar data + * in batches. + */ + override def createBatchDataReaderFactories: JList[DataReaderFactory[ColumnarBatch]] = { + val factories = new util.ArrayList[DataReaderFactory[ + ColumnarBatch]](tableMetaData.bucketToServerMapping.get.length) + var bucketId = 0 + val queryConstructs = QueryConstructs(readSchema(), filtersPushedToSnappy, + whereClause, whereClauseArgs) + tableMetaData.bucketToServerMapping.foreach(b => b.foreach(_ => { + factories.add(new SnappyColumnBatchReaderFactory(bucketId, tableMetaData, queryConstructs)) + bucketId = bucketId + 1 + } )) + factories + } + +} diff --git a/v2connector/src/main/scala/io/snappydata/datasource/v2/driver/RowTableDataSourceReader.scala b/v2connector/src/main/scala/io/snappydata/datasource/v2/driver/RowTableDataSourceReader.scala new file mode 100644 index 0000000000..ec49eb7c85 --- /dev/null +++ b/v2connector/src/main/scala/io/snappydata/datasource/v2/driver/RowTableDataSourceReader.scala @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package io.snappydata.datasource.v2.driver + +import java.util +import java.util.{List => JList} + +import io.snappydata.datasource.v2.partition.SnappyRowTableReaderFactory + +import org.apache.spark.sql.Row +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.DataReaderFactory + +// created on driver +class RowTableDataSourceReader(options: DataSourceOptions, tableMetaData: SnappyTableMetaData) + extends SnappyDataSourceReader(options, tableMetaData) { + + /** + * Returns a list of reader factories. Each factory is responsible for creating a data reader to + * output data for one RDD partition. That means the number of factories returned here is same as + * the number of RDD partitions this scan outputs. + * + * Note that, this may not be a full scan if the data source reader mixes in other optimization + * interfaces like column pruning, filter push-down, etc. These optimizations are applied before + * Spark issues the scan request. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. + */ + override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = { + // This will be called in the DataSourceV2ScanExec for creating + // org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.readerFactories. 
+ // Each factory object will correspond to one bucket/partition + // This will be called on the driver + // We will know the name of the table from datasource options + // We will fire a system procedure to get the bucket to host mapping + // For each partition, we will create a factory with partition id and bucket id + // PERF: To start with, we will consider one partition per bucket, later we can + // think batching multiple buckets for one partition based on no of cores + // Each factory object will be constructed with table name and bucket ids + // call readSchema() and pushFilters() pass to the factory object so that + // Will return all the filters as unhandled filters initially using pushedFilters() + + val factories = + new util.ArrayList[DataReaderFactory[Row]](tableMetaData.bucketToServerMapping.get.length) + var bucketId = 0 + val queryConstructs = QueryConstructs(readSchema(), + filtersPushedToSnappy, whereClause, whereClauseArgs) + tableMetaData.bucketToServerMapping.foreach(b => b.foreach(_ => { + factories.add(new SnappyRowTableReaderFactory(bucketId, tableMetaData, queryConstructs)) + bucketId = bucketId + 1 + } )) + factories + } +} diff --git a/v2connector/src/main/scala/io/snappydata/datasource/v2/driver/SnappyDataSourceReader.scala b/v2connector/src/main/scala/io/snappydata/datasource/v2/driver/SnappyDataSourceReader.scala new file mode 100644 index 0000000000..488d1d47f5 --- /dev/null +++ b/v2connector/src/main/scala/io/snappydata/datasource/v2/driver/SnappyDataSourceReader.scala @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package io.snappydata.datasource.v2.driver + +import scala.collection.mutable.ArrayBuffer + +import io.snappydata.datasource.v2.{EvaluateFilter, SnappyDataPartitioning, SnappyStatistics, V2Constants} + +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, Statistics, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportPartitioning, SupportsReportStatistics} +import org.apache.spark.sql.types.StructType + +// created on driver +abstract class SnappyDataSourceReader(options: DataSourceOptions, + tableMetaData: SnappyTableMetaData) + extends DataSourceReader with + SupportsReportPartitioning with + SupportsPushDownRequiredColumns with + SupportsPushDownFilters with + SupportsReportStatistics { + + // projected columns + var projectedColumns: Option[StructType] = None + var filtersPushedToSnappy: Option[Array[Filter]] = None + var whereClause: String = _ + var whereClauseArgs: ArrayBuffer[Any] = _ + + /** + * Returns the actual schema of this data source reader, which may be different from the physical + * schema of the underlying storage, as column pruning or other optimizations may happen. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. 
+ */ + override def readSchema(): StructType = { + projectedColumns.getOrElse(tableMetaData.schema) + } + + + /** + * Applies column pruning w.r.t. the given requiredSchema. + * + * Implementation should try its best to prune the unnecessary columns or nested fields, but it's + * also OK to do the pruning partially, e.g., a data source may not be able to prune nested + * fields, and only prune top-level columns. + * + * Note that, data source readers should update {@link DataSourceReader#readSchema()} after + * applying column pruning. + */ + override def pruneColumns(requiredSchema: StructType): Unit = { + // called by the engine to set projected columns so that our implementation can use those. + // Implementation should return these in readSchema() + if (requiredSchema.length > 0) projectedColumns = Option(requiredSchema) + } + + /** + * Pushes down filters, and returns filters that need to be evaluated after scanning. + */ + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + // This method is passed all filters and is supposed to return unhandled filters + // We will return all the filters as unhandled filters initially using pushFilters() + // TODO: update the filters that can be pushed to Snappy + val (predicateClause, predicateArgs) = EvaluateFilter.evaluateWhereClause(filters) + whereClause = predicateClause + whereClauseArgs = predicateArgs + filtersPushedToSnappy = Option(filters) + filters + } + + /** + * Returns the filters that are pushed in {@link #pushFilters(Filter[])}. + * It's possible that there is no filters in the query and {@link #pushFilters(Filter[])} + * is never called, empty array should be returned for this case. + */ + override def pushedFilters(): Array[Filter] = { + // looks like not much of use +// filtersPushedToSnappy.getOrElse(Array.empty[Filter]) + Array.empty[Filter] + } + + /** + * Returns the output data partitioning that this reader guarantees. 
+ */ + override def outputPartitioning(): Partitioning = { + new SnappyDataPartitioning(tableMetaData) + } + + override def getStatistics(): Statistics = { + new SnappyStatistics(options.get(V2Constants.TABLE_NAME).get()) + } +} + +case class QueryConstructs(projections: StructType, filters: Option[Array[Filter]] = None, + whereClause: String = "", whereClauseArgs: ArrayBuffer[Any]) \ No newline at end of file diff --git a/v2connector/src/main/scala/io/snappydata/datasource/v2/driver/SnappyTableMetaDataReader.scala b/v2connector/src/main/scala/io/snappydata/datasource/v2/driver/SnappyTableMetaDataReader.scala new file mode 100644 index 0000000000..a2969b5fbf --- /dev/null +++ b/v2connector/src/main/scala/io/snappydata/datasource/v2/driver/SnappyTableMetaDataReader.scala @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ +package io.snappydata.datasource.v2.driver + +import java.sql.{CallableStatement, DriverManager} +import java.util + +import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils +import com.pivotal.gemfirexd.internal.iapi.types.HarmonySerialBlob +import io.snappydata.Constant +import io.snappydata.datasource.v2.V2Constants +import io.snappydata.sql.catalog.SmartConnectorHelper +import io.snappydata.thrift.{CatalogMetadataDetails, CatalogMetadataRequest, snappydataConstants} +import org.apache.spark.sql.execution.columnar.TableNotFoundException +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.types.{DataType, StructType} + +import scala.collection.mutable.ArrayBuffer + +final class SnappyTableMetaDataReader { + + var conn: java.sql.Connection = _ + + // scalastyle:off classforname + Class.forName("io.snappydata.jdbc.ClientDriver").newInstance + // scalastyle:on classforname + + private val getV2MetaDataStmtString = "call sys.GET_CATALOG_METADATA(?, ?, ?)" + + private var getCatalogMetaDataStmt: CallableStatement = _ + + def initializeConnection(hostPort: String, user: String, password: String): Unit = { + val connectionURL = s"${Constant.DEFAULT_THIN_CLIENT_URL}$hostPort/" + + ";route-query=false;" + conn = DriverManager.getConnection(connectionURL) + getCatalogMetaDataStmt = conn.prepareCall(getV2MetaDataStmtString) + } + + def closeConnection(): Unit = { + if (conn != null) { + conn.close() + conn = null + } + } + + def getTableMetaData(options: DataSourceOptions): SnappyTableMetaData = { + + try { + val hostString = options.get(V2Constants.SnappyConnection).get() + val tableName = options.get(V2Constants.TABLE_NAME).get() + val user = options.get(V2Constants.USER).get() + val password = options.get(V2Constants.PASSWORD).get() + val schemaAndTableName = tableName.split("\\.") + + initializeConnection(hostString, user, password) + + val request = new CatalogMetadataRequest() + 
request.setSchemaName(schemaAndTableName(0)).setNameOrPattern(schemaAndTableName(1)) + val result = getCatalogInformation(request) + + if (result == null){ + throw new TableNotFoundException(schemaAndTableName(0), schemaAndTableName(1)) + } + + val tblSchema = result.catalogTable.tableSchema + val tblType = result.catalogTable.provider + val tblBucketCount = result.catalogTable.numBuckets + val tblBucketOwner = result.catalogTable.bucketOwners + + val primaryKeyColumns = result.catalogTable.primaryKeyColumns + val schema1 = DataType.fromJson(tblSchema).asInstanceOf[StructType] + + val partitioningCols1 = Option(primaryKeyColumns.toString) match { + case Some(str) => str.split(":") + case None => Array.empty[String] + } + + // even though the name below is bucketToServerMapping; for replicated tables + // this returns list of all servers on which replicated table exists + val bucketToServerMapping = if (tblBucketCount > 0) { + Option(SmartConnectorHelper.setBucketToServerMappingInfo(tblBucketCount, tblBucketOwner, + true, true)) + } else { + Option(SmartConnectorHelper.setReplicasToServerMappingInfo( + tblBucketOwner.get(0).getSecondaries, true)) + } + SnappyTableMetaData(tableName, schema1, tblType, tblBucketCount, + partitioningCols1, bucketToServerMapping) + } finally { + closeConnection() + } + } + + def getCatalogInformation(request: CatalogMetadataRequest): CatalogMetadataDetails = { + getCatalogMetaDataStmt.setInt(1, snappydataConstants.CATALOG_GET_TABLE) + val requestBytes = GemFireXDUtils.writeThriftObject(request) + getCatalogMetaDataStmt.setBlob(2, new HarmonySerialBlob(requestBytes)) + getCatalogMetaDataStmt.registerOutParameter(3, java.sql.Types.BLOB) + assert(!getCatalogMetaDataStmt.execute()) + val resultBlob = getCatalogMetaDataStmt.getBlob(3) + val resultLen = resultBlob.length().toInt + val result = new CatalogMetadataDetails() + assert(GemFireXDUtils.readThriftObject(result, resultBlob.getBytes(1, resultLen)) == 0) + resultBlob.free() + result + } 
+ + def getSecurePart(user: String, password: String): String = { + var securePart = "" + if (user != null && !user.isEmpty && password != null && !password.isEmpty) { + securePart = s";user=$user;password=$password" + } + securePart + } +} + +/** + * Metadata for tables + * + * @param tableName table for which metadata is needed + * @param schema table schema (columns) + * @param tableStorageType table type that is ROW/COLUMN etc. + * @param bucketCount 0 for replicated tables otherwise the actual count + * @param partitioningCols partitioning columns + * @param bucketToServerMapping For a partitioned table, this is an array where each entry + * is an ArrayBuffer of tuples and corresponds to a bucket(0th + * entry for bucket#0 and so on). + * Each entry in the ArrayBuffer is in the form of + * (host, jdbcURL) for hosts where bucket exists + * For replicated table the array contains a single ArrayBuffer + * of tuples((host, jdbcURL)) for all hosts where the table exists + */ +case class SnappyTableMetaData(tableName: String, + schema: StructType, tableStorageType: String, + bucketCount: Int, partitioningCols: Seq[String] = Nil, + bucketToServerMapping: Option[Array[ArrayBuffer[(String, String)]]] + = None) diff --git a/v2connector/src/main/scala/io/snappydata/datasource/v2/partition/JDBCResultSetColumnVector.scala b/v2connector/src/main/scala/io/snappydata/datasource/v2/partition/JDBCResultSetColumnVector.scala new file mode 100644 index 0000000000..7f4bb8f2fe --- /dev/null +++ b/v2connector/src/main/scala/io/snappydata/datasource/v2/partition/JDBCResultSetColumnVector.scala @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
/*
 * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License. You
 * may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License. See accompanying
 * LICENSE file.
 */
package io.snappydata.datasource.v2.partition

import java.sql.ResultSet

import org.apache.spark.sql.types.{DataType, Decimal}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarArray, ColumnarMap}
import org.apache.spark.unsafe.types.UTF8String

/**
 * A [[ColumnVector]] facade over one column of a JDBC [[ResultSet]].
 *
 * The vector always reflects the result set's CURRENT row: the `rowId`
 * argument of every getter is ignored and the corresponding `rs.getXxx`
 * accessor is invoked for `columnIndex` instead. The caller (see
 * SnappyRowTableReader.getAsColumnarBatch) advances the cursor externally
 * and exposes one row per batch.
 *
 * @param dataType    Spark SQL data type of the column
 * @param rs          result set positioned on the row to expose
 * @param columnIndex 1-based JDBC column index to read
 */
class JDBCResultSetColumnVector(dataType: DataType, rs: ResultSet,
    columnIndex: Int) extends ColumnVector(dataType) {

  // Nothing to release here: the ResultSet is owned and closed by the reader.
  override def close(): Unit = {}

  // NOTE(review): the JDBC-idiomatic null test would be rs.getXxx followed by
  // rs.wasNull(); getObject re-fetches the value but does not move the cursor.
  override def hasNull: Boolean = rs.getObject(columnIndex) == null

  // Only one row is visible at a time, so the null count is 0 or 1.
  override def numNulls(): Int = {
    if (rs.getObject(columnIndex) == null) 1 else 0
  }

  override def isNullAt(rowId: Int): Boolean = {
    rs.getObject(columnIndex) == null
  }

  override def getBoolean(rowId: Int): Boolean = rs.getBoolean(columnIndex)

  override def getByte(rowId: Int): Byte = rs.getByte(columnIndex)

  // Fixed: previously called rs.getByte here, which truncated SMALLINT
  // values outside the byte range (-128..127).
  override def getShort(rowId: Int): Short = rs.getShort(columnIndex)

  override def getInt(rowId: Int): Int = rs.getInt(columnIndex)

  override def getLong(rowId: Int): Long = rs.getLong(columnIndex)

  override def getFloat(rowId: Int): Float = rs.getFloat(columnIndex)

  override def getDouble(rowId: Int): Double = rs.getDouble(columnIndex)

  override def getArray(rowId: Int): ColumnarArray =
    throw new IllegalStateException("Not implemented")

  override def getMap(ordinal: Int): ColumnarMap =
    throw new IllegalStateException("Not implemented")

  override def getDecimal(rowId: Int, precision: Int, scale: Int): Decimal = {
    // getBigDecimal returns null for SQL NULL; propagate null as Spark expects.
    val dec = rs.getBigDecimal(columnIndex)
    if (dec ne null) Decimal.apply(dec, precision, scale) else null
  }

  override def getUTF8String(rowId: Int): UTF8String =
    UTF8String.fromString(rs.getString(columnIndex))

  override def getBinary(rowId: Int): Array[Byte] =
    throw new IllegalStateException("Not implemented")

  override def getChild(ordinal: Int): ColumnVector =
    throw new IllegalStateException("Not implemented")
}
/*
 * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License. You
 * may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License. See accompanying
 * LICENSE file.
 */
package io.snappydata.datasource.v2.partition

import scala.collection.mutable.ArrayBuffer
import io.snappydata.datasource.v2.driver.{QueryConstructs, SnappyTableMetaData}
import org.apache.spark.sql.sources.v2.reader.DataReader
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
 * DataReader serving one bucket's data as [[ColumnarBatch]]es. The row
 * buffer (recently inserted rows not yet columnized) is drained first,
 * one row per batch, after which the column-store batches are streamed.
 *
 * @param bucketId        bucket this reader scans
 * @param tableMetaData   metadata of the table being scanned
 * @param queryConstructs projections and filters to push down
 */
class SnappyColumnBatchReader (val bucketId: Int,
    tableMetaData: SnappyTableMetaData, queryConstructs: QueryConstructs)
    extends DataReader[ColumnarBatch] {

  // Candidate servers hosting this bucket as (host, jdbcURL) pairs.
  val hostsAndURLs: ArrayBuffer[(String, String)] =
    tableMetaData.bucketToServerMapping.get(bucketId)

  // Reader over the column store of this bucket; initialized eagerly so a
  // connection failure surfaces at construction time.
  val colBufferReader = new SnappyColumnTableReader(
    tableMetaData.tableName, queryConstructs.projections, tableMetaData.schema,
    queryConstructs.filters, bucketId, hostsAndURLs)
  colBufferReader.initialize

  // Reader over the row buffer; consumed before any column batches.
  val rowBufferReader = new SnappyRowTableReader(bucketId, tableMetaData, queryConstructs)

  // True while get() must serve the record just fetched from the row buffer.
  var hasDataInRowBuffer = false

  override def next(): Boolean = {
    hasDataInRowBuffer = rowBufferReader.next()
    // Once the row buffer is exhausted, fall through to the column store.
    hasDataInRowBuffer || colBufferReader.hasNext
  }

  override def get(): ColumnarBatch =
    if (hasDataInRowBuffer) rowBufferReader.getAsColumnarBatch()
    else colBufferReader.next

  override def close(): Unit = {
    rowBufferReader.close()
    colBufferReader.close
  }
}
/*
 * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License. You
 * may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License. See accompanying
 * LICENSE file.
 */

package io.snappydata.datasource.v2.partition

import io.snappydata.datasource.v2.ConnectorUtils
import io.snappydata.datasource.v2.driver.{QueryConstructs, SnappyTableMetaData}

import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
 * Factory producing a [[SnappyColumnBatchReader]] for one bucket of a
 * column table, advertising the servers hosting that bucket as preferred
 * task locations.
 *
 * @param bucketId        bucket for which readers are created
 * @param tableMetaData   metadata of the table being scanned
 * @param queryConstructs contains projections and filters
 */
class SnappyColumnBatchReaderFactory(val bucketId: Int,
    tableMetaData: SnappyTableMetaData, queryConstructs: QueryConstructs)
    extends DataReaderFactory[ColumnarBatch] {

  /**
   * Hosts where the reader created by this factory can run faster. Spark
   * treats these purely as a scheduling hint (unrecognized host names are
   * ignored), so the reader must work from any location.
   */
  override def preferredLocations(): Array[String] =
    ConnectorUtils.preferredLocations(tableMetaData, bucketId)

  /**
   * Creates the per-task reader that does the actual reading work. A
   * failure here fails the Spark task, which is retried up to the
   * configured maximum.
   */
  override def createDataReader(): DataReader[ColumnarBatch] =
    new SnappyColumnBatchReader(bucketId, tableMetaData, queryConstructs)
}
/*
 * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License. You
 * may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License. See accompanying
 * LICENSE file.
 */
package io.snappydata.datasource.v2.partition

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import java.sql.{Connection, PreparedStatement, ResultSet}
import java.util.{Collections, Properties}

import scala.collection.immutable.HashMap
import scala.collection.mutable.ArrayBuffer

import com.pivotal.gemfirexd.internal.iapi.types.HarmonySerialBlob
import io.snappydata.Constant
import io.snappydata.thrift.internal.ClientPreparedStatement

import org.apache.spark.sql.execution.columnar.encoding.{ColumnEncoding, ColumnStatsSchema}
import org.apache.spark.sql.execution.columnar.{ColumnBatchIteratorOnRS, SharedExternalStoreUtils}
import org.apache.spark.sql.sources.{ConnectionProperties, Filter}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.sql.{SnappyColumnVector, SnappyStoreClientDialect}

/**
 * Streams the column-store batches of a single bucket over JDBC, pushing
 * projections and serialized filters down to the store via the
 * SYS.COLUMN_TABLE_SCAN procedure.
 *
 * @param tableName         fully qualified user table name ("schema.table")
 * @param projection        schema of the projected (requested) columns
 * @param schema            full table schema
 * @param filters           filters to push down, if any
 * @param bucketId          bucket to scan
 * @param hostList          (host, jdbcURL) pairs hosting the bucket
 * @param relDestroyVersion expected catalog version (-1 to skip the check)
 */
class SnappyColumnTableReader(tableName: String, projection: StructType,
    schema: StructType, filters: Option[Array[Filter]], bucketId: Int,
    hostList: ArrayBuffer[(String, String)],
    relDestroyVersion: Int = -1) {

  private val identityLong: (AnyRef, Long) => Long = (_: AnyRef, l: Long) => l

  private var columnBatchIterator: ColumnBatchIteratorOnRS = null
  private var scan_batchNumRows = 0
  private var batchBuffer: ByteBuffer = null

  // 1-based store ordinals of the projected columns, filled by initialize.
  private val columnOrdinals: Array[Int] = new Array[Int](projection.length)

  private var conn: Connection = _

  /**
   * Opens the JDBC connection and starts the pushed-down column scan.
   * Must be called exactly once before next/hasNext.
   */
  def initialize: Unit = {
    setProjectedColumnOrdinals
    val connProperties = connectionProperties(hostList)
    conn = SharedExternalStoreUtils.getConnection(connProperties, hostList)
    val txId = null
    // fetch all the column blobs pushing down the filters
    val helper = new ColumnBatchScanHelper
    val (statement, rs) = helper.prepareScan(conn, txId,
      getTableName, columnOrdinals, serializeFilters, bucketId, relDestroyVersion, false)
    columnBatchIterator = new ColumnBatchIteratorOnRS(conn, columnOrdinals, statement, rs,
      null, bucketId)
  }

  /**
   * Decodes the current column batch (base data plus delta/delete decoders)
   * into a [[ColumnarBatch]] and advances the underlying iterator.
   */
  def next: ColumnarBatch = {

    // Initialize next columnBatch
    val scan_colNextBytes = columnBatchIterator.next()

    // Row count comes from the stats row (and delta stats row, if present),
    // minus any rows marked deleted.
    val numStatsColumns = ColumnStatsSchema.numStatsColumns(schema.length)
    val scan_statsRow = org.apache.spark.sql.collection.SharedUtils
        .toUnsafeRow(scan_colNextBytes, numStatsColumns)

    val deltaStatsDecoder = columnBatchIterator.getCurrentDeltaStats
    val scan_deltaStatsRow = org.apache.spark.sql.collection.SharedUtils
        .toUnsafeRow(deltaStatsDecoder, numStatsColumns)

    val scan_batchNumFullRows = scan_statsRow.getInt(0)
    val scan_batchNumDeltaRows = if (scan_deltaStatsRow != null) {
      scan_deltaStatsRow.getInt(0)
    } else 0
    scan_batchNumRows = scan_batchNumFullRows + scan_batchNumDeltaRows -
        columnBatchIterator.getDeletedRowCount

    // Build one SnappyColumnVector per projected column.
    val columnVectors = new Array[ColumnVector](projection.length)

    var vectorIndex = 0
    for (columnOrdinal <- columnOrdinals) {
      batchBuffer = columnBatchIterator.getColumnLob(columnOrdinal - 1)
      val field = schema.fields(columnOrdinal - 1)

      val columnDecoder = ColumnEncoding.getColumnDecoder(batchBuffer, field,
        identityLong)

      val columnUpdatedDecoder = columnBatchIterator
          .getUpdatedColumnDecoder(columnDecoder, field, columnOrdinal - 1)

      columnVectors(vectorIndex) = new SnappyColumnVector(field.dataType, field,
        batchBuffer, scan_batchNumRows,
        columnOrdinal, columnDecoder,
        columnBatchIterator.getDeletedColumnDecoder, columnUpdatedDecoder)
      vectorIndex += 1
    }

    val columBatch = new ColumnarBatch(columnVectors)
    columBatch.setNumRows(scan_batchNumRows)
    columBatch
  }

  /** True while more column batches remain for this bucket. */
  def hasNext: Boolean = columnBatchIterator.hasNext

  /** Releases the batch iterator (and with it the statement/result set). */
  def close: Unit = columnBatchIterator.close()

  /**
   * Returns the internal shadow (column store) table name for the user
   * table, e.g. "APP.SNAPPYSYS_INTERNAL____T_COLUMN_STORE_".
   *
   * Fixed: the schema part was previously extracted with
   * substring(0, dotIndex) BEFORE checking dotIndex > 0, which threw
   * StringIndexOutOfBoundsException for an unqualified table name.
   */
  private def getTableName: String = {
    val dotIndex = tableName.indexOf('.')
    val table = if (dotIndex > 0) tableName.substring(dotIndex + 1) else tableName
    val schemaPrefix = if (dotIndex > 0) tableName.substring(0, dotIndex) + '.' else ""
    schemaPrefix + Constant.SHADOW_SCHEMA_NAME_WITH_SEPARATOR +
        table + Constant.SHADOW_TABLE_SUFFIX
  }

  /**
   * Computes the 1-based ordinal (position in the full table schema) of
   * each projected column into columnOrdinals.
   */
  private def setProjectedColumnOrdinals: Unit = {
    var ordinal = 0
    for (field <- projection.fields) {
      columnOrdinals(ordinal) = schema.fieldIndex(field.name) + 1
      ordinal += 1
    }
  }

  /** Wraps a Java-serialized value into a JDBC Blob on the given connection. */
  def getBlob(value: Any, conn: Connection): java.sql.Blob = {
    val serializedValue: Array[Byte] = serialize(value)
    val blob = conn.createBlob()
    blob.setBytes(1, serializedValue)
    blob
  }

  /** Java-serializes a value to a byte array. */
  def serialize(value: Any): Array[Byte] = {
    val baos: ByteArrayOutputStream = new ByteArrayOutputStream()
    val os: ObjectOutputStream = new ObjectOutputStream(baos)
    os.writeObject(value)
    os.close()
    baos.toByteArray
  }

  /**
   * Java-serializes the pushed-down Spark filters for transmission to the
   * store; returns null when there are none.
   */
  private def serializeFilters: Array[Byte] = {
    if (filters.isDefined) serialize(filters.get) else null
  }

  /**
   * Builds the connection/pool properties used to reach the store over the
   * thrift client driver.
   */
  private def connectionProperties(hostList: ArrayBuffer[(String, String)]):
      ConnectionProperties = {

    // TODO: Check how to make properties Dynamic [Pradeep]
    // Hard-coded properties should be made dynamic. It should be
    // passed as a property bag to this method which will be obtained
    // from the original create statement options.
    val map: Map[String, String] = HashMap[String, String](("maxActive", "256"),
      ("testOnBorrow", "true"), ("maxIdle", "256"), ("validationInterval", "10000"),
      ("initialSize", "4"), ("driverClassName", "io.snappydata.jdbc.ClientDriver"))

    val poolProperties = new Properties
    poolProperties.setProperty("driver", "io.snappydata.jdbc.ClientDriver")
    poolProperties.setProperty("route-query", "false")

    val executorConnProps = new Properties
    executorConnProps.setProperty("lob-chunk-size", "33554432")
    executorConnProps.setProperty("driver", "io.snappydata.jdbc.ClientDriver")
    executorConnProps.setProperty("route-query", "false")
    executorConnProps.setProperty("lob-direct-buffers", "true")

    ConnectionProperties(hostList(0)._2,
      "io.snappydata.jdbc.ClientDriver", SnappyStoreClientDialect, map,
      poolProperties, executorConnProps, false)
  }
}

// TODO [Pradeep] possibly this code can be reused from the SmartConnectorRDDHelper.prepareScan()
/**
 * Prepares and executes the SYS.COLUMN_TABLE_SCAN procedure call that
 * returns the column batch blobs for one bucket.
 */
final class ColumnBatchScanHelper {

  /**
   * @param conn              open connection routed to a server hosting the bucket
   * @param txId              snapshot transaction id, or null
   * @param columnTable       internal shadow table name
   * @param projection        1-based store ordinals of projected columns
   * @param serializedFilters Java-serialized Spark filters, or null
   * @param bucketId          bucket to restrict the scan to
   * @param catalogVersion    expected catalog version
   * @param useKryoSerializer true to request kryo (1) vs java (0) serialization
   * @return the prepared statement (caller closes) and its result set
   */
  def prepareScan(conn: Connection, txId: String, columnTable: String, projection: Array[Int],
      serializedFilters: Array[Byte], bucketId: Int,
      catalogVersion: Int, useKryoSerializer: Boolean): (PreparedStatement, ResultSet) = {
    val pstmt = if (useKryoSerializer) {
      conn.prepareStatement("call sys.COLUMN_TABLE_SCAN(?, ?, ?, 1)")
    } else {
      conn.prepareStatement("call sys.COLUMN_TABLE_SCAN(?, ?, ?, 0)")
    }
    pstmt.setString(1, columnTable)
    pstmt.setString(2, projection.mkString(","))
    // serialize the filters
    if ((serializedFilters ne null) && serializedFilters.length > 0) {
      pstmt.setBlob(3, new HarmonySerialBlob(serializedFilters))
    } else {
      pstmt.setNull(3, java.sql.Types.BLOB)
    }
    pstmt match {
      case clientStmt: ClientPreparedStatement =>
        // Thrift driver: attach bucket/catalog/tx attributes directly.
        val bucketSet = Collections.singleton(Int.box(bucketId))
        clientStmt.setLocalExecutionBucketIds(bucketSet, columnTable, true)
        clientStmt.setCatalogVersion(catalogVersion)
        clientStmt.setSnapshotTransactionId(txId)
      case _ =>
        // Generic JDBC: set the same attributes via system procedures.
        pstmt.execute("call sys.SET_BUCKETS_FOR_LOCAL_EXECUTION(" +
            s"'$columnTable', '${bucketId}', $catalogVersion)")
        if (txId ne null) {
          pstmt.execute(s"call sys.USE_SNAPSHOT_TXID('$txId')")
        }
    }

    val rs = pstmt.executeQuery()
    (pstmt, rs)
  }
}
/*
 * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License. You
 * may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License. See accompanying
 * LICENSE file.
 */
package io.snappydata.datasource.v2.partition

import java.io.IOException
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import java.util.Collections

import scala.collection.mutable.ArrayBuffer

import io.snappydata.datasource.v2.driver.{QueryConstructs, SnappyTableMetaData}
import io.snappydata.thrift.StatementAttrs
import io.snappydata.thrift.internal.ClientStatement

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.execution.columnar.SharedExternalStoreUtils
import org.apache.spark.sql.sources.JdbcExtendedUtils.quotedName
import org.apache.spark.sql.sources.v2.reader.DataReader
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

/**
 * Actually fetches the data on executors: scans the row buffer of one
 * bucket (or a replicated table) over JDBC, with projections and the
 * pushed-down where-clause applied server side.
 *
 * @param bucketId bucketId for which this factory is created
 * @param tableMetaData metadata of the table being scanned
 * @param queryConstructs contains projections and filters
 */
class SnappyRowTableReader(val bucketId: Int,
    tableMetaData: SnappyTableMetaData, queryConstructs: QueryConstructs)
    extends DataReader[Row] {

  private lazy val conn = jdbcConnection()
  private var preparedStatement: PreparedStatement = _
  private var resultSet: ResultSet = _
  private lazy val resultColumnCount = resultSet.getMetaData.getColumnCount

  initiateScan()

  /** Restricts the connection to this bucket and executes the scan query. */
  def initiateScan(): Unit = {
    setLocalBucketScan()
    prepareScanStatement()
  }

  /**
   * Opens a connection to the first server hosting this bucket (for a
   * replicated table, any server hosting the table).
   * TODO(review): no failover — if hostsAndURLs(0) is down this fails even
   * when other replicas are available.
   */
  private def jdbcConnection(): Connection = {
    // Each element of the mapping is an ArrayBuffer of (host, jdbcURL) tuples.
    val hostsAndURLs: ArrayBuffer[(String, String)] = if (tableMetaData.bucketCount == 0) {
      // replicated table: single entry covering all hosting servers
      tableMetaData.bucketToServerMapping.head.apply(0)
    } else {
      tableMetaData.bucketToServerMapping.get(bucketId)
    }
    DriverManager.getConnection(hostsAndURLs(0)._2)
  }

  /**
   * For the thrift driver, pins statement execution to this reader's bucket
   * by setting common statement attributes on the connection.
   */
  private def setLocalBucketScan(): Unit = {
    val statement = conn.createStatement()
    try {
      statement match {
        case clientStmt: ClientStatement =>
          val clientConn = clientStmt.getConnection
          if (tableMetaData.bucketCount > 0) { // partitioned table
            clientConn.setCommonStatementAttributes(ClientStatement.setLocalExecutionBucketIds(
              new StatementAttrs(), Collections.singleton(Int.box(bucketId)),
              tableMetaData.tableName, true))
          }
        case _ =>
          // TODO: handle case of the DRDA driver (non-thrift connection)
      }
    } finally {
      // Fixed: this helper statement was previously never closed (leak);
      // the attributes above live on the connection, not the statement.
      statement.close()
    }
  }

  /** Builds and executes the SELECT with projections and pushed filters. */
  private def prepareScanStatement(): Unit = {
    val columnList = queryConstructs.projections.fieldNames.mkString(",")

    val filterWhereClause = if (queryConstructs.whereClause ne null) {
      queryConstructs.whereClause
    } else {
      ""
    }

    val sqlText = s"SELECT $columnList FROM" +
        s" ${quotedName(tableMetaData.tableName)}$filterWhereClause"

    preparedStatement = conn.prepareStatement(sqlText)
    if (queryConstructs.whereClauseArgs ne null) {
      SharedExternalStoreUtils.setStatementParameters(preparedStatement,
        queryConstructs.whereClauseArgs)
    }
    resultSet = preparedStatement.executeQuery()
  }

  /**
   * Proceed to next record, returns false if there is no more records.
   *
   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
   * get retried until hitting the maximum retry times.
   *
   * @throws IOException if failure happens during disk/network IO like reading files.
   */
  override def next(): Boolean = {
    // For the first cut we are assuming that we will get entire data
    // in the first call of this method; chunked fetches (mirroring the
    // smart connector's row buffer handling) are a future improvement.
    resultSet.next()
  }

  /**
   * Return the current record. This method should return same value until `next` is called.
   */
  override def get(): Row = {
    val values = new Array[Any](resultColumnCount)
    for (index <- 0 until resultColumnCount) {
      values(index) = resultSet.getObject(index + 1)
    }
    new GenericRowWithSchema(values, queryConstructs.projections)
  }

  /**
   * Returns the current record in the result set as a ColumnarBatch
   * @return ColumnarBatch of one row
   */
  def getAsColumnarBatch(): ColumnarBatch = {
    val columnVectors = new Array[ColumnVector](resultColumnCount)
    for (index <- 0 until resultColumnCount) {
      columnVectors(index) = new JDBCResultSetColumnVector(
        queryConstructs.projections.fields(index).dataType, resultSet, index + 1)
    }
    val columnarBatch = new ColumnarBatch(columnVectors)
    columnarBatch.setNumRows(1)
    columnarBatch
  }

  /** Releases the result set, statement and connection, in that order. */
  override def close(): Unit = {
    if (resultSet != null) resultSet.close()
    if (preparedStatement != null) preparedStatement.close()
    if (conn != null) conn.close()
  }
}
/*
 * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License. You
 * may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License. See accompanying
 * LICENSE file.
 */
package io.snappydata.datasource.v2.partition

import io.snappydata.datasource.v2.ConnectorUtils
import io.snappydata.datasource.v2.driver.{QueryConstructs, SnappyTableMetaData}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}

/**
 * Creates a [[SnappyRowTableReader]] that fetches one bucket's row data on
 * executors, advertising the servers hosting that bucket as preferred task
 * locations.
 *
 * @param bucketId bucketId for which this factory is created
 * @param tableMetaData metadata of the table being scanned
 * @param queryConstructs contains projections and filters
 */
class SnappyRowTableReaderFactory(val bucketId: Int,
    tableMetaData: SnappyTableMetaData, queryConstructs: QueryConstructs)
    extends DataReaderFactory[Row] {

  /**
   * Hosts where the reader created by this factory can run faster. Spark
   * treats these purely as a scheduling hint (unrecognized host names are
   * ignored), so the reader must work from any location.
   */
  override def preferredLocations(): Array[String] =
    ConnectorUtils.preferredLocations(tableMetaData, bucketId)

  /**
   * Creates the per-task reader that does the actual reading work. A
   * failure here fails the Spark task, which is retried up to the
   * configured maximum.
   */
  override def createDataReader(): DataReader[Row] =
    new SnappyRowTableReader(bucketId, tableMetaData, queryConstructs)
}
/*
 * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License. You
 * may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License. See accompanying
 * LICENSE file.
 */
package org.apache.spark.sql

import java.nio.ByteBuffer

import org.apache.spark.sql.execution.columnar.encoding.{ColumnDecoder, ColumnDeleteDecoder, UpdatedColumnDecoderBase}
import org.apache.spark.sql.types.{DataType, Decimal, StructField}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarArray, ColumnarMap}
import org.apache.spark.unsafe.types.UTF8String

/**
 * A [[ColumnVector]] over one encoded SnappyData column batch, merging the
 * base column data with its delete bitmap and update (delta) buffer.
 *
 * State is cursor-like: callers are expected to read rowIds in ascending
 * order, as the null/delete counters only advance forward.
 *
 * @param dataType             Spark SQL type of the column
 * @param structField          field definition of the column
 * @param byteBuffer           encoded column data (heap or direct)
 * @param numOfRow             number of visible rows in the batch
 * @param ordinal              1-based store ordinal of the column
 * @param columnDecoder        decoder for the base column data
 * @param deletedColumnDecoder decoder for deleted rows, or null
 * @param updatedColumnDecoder decoder for updated values, or null
 */
class SnappyColumnVector(dataType: DataType, structField: StructField,
    byteBuffer: ByteBuffer, numOfRow: Int, ordinal: Int,
    columnDecoder: ColumnDecoder,
    deletedColumnDecoder: ColumnDeleteDecoder,
    updatedColumnDecoder: UpdatedColumnDecoderBase)
    extends ColumnVector(dataType) {

  // Running counters maintained as rowIds are consumed in ascending order.
  var currentNullCount = 0
  var currentDeletedCount = 0
  var nextNullPosition = 0

  // Heap-backed data is accessed via its backing array; for direct buffers
  // the decoders read from the buffer itself (null array).
  private val arrayOfBytes = if (byteBuffer == null || byteBuffer.isDirect) {
    null
  } else {
    byteBuffer.array
  }

  override def close(): Unit = {
    // TODO Check for the close operation on the ColumnVector, whenever the
    // current columnVector is finished being read by upstream Spark.
  }

  override def hasNull: Boolean = columnDecoder.hasNulls

  /** Advances the deleted-row counter past rows deleted at/before rowId. */
  @inline def skipDeletedRows(rowId: Int): Unit = {
    if (deletedColumnDecoder != null) {
      while (deletedColumnDecoder.deleted(rowId + currentDeletedCount - currentNullCount)) {
        currentDeletedCount += 1
      }
    }
  }

  // Maps a caller-visible rowId to the physical position in the encoded
  // column data, adjusting for deleted rows and consumed nulls.
  @inline private def adjustedRowId(rowId: Int): Int =
    rowId + currentDeletedCount - currentNullCount

  // True when the value at rowId must come from the update (delta) buffer
  // rather than the base column data. NOTE(review): unchanged() and
  // readNotNull appear to advance the delta decoder's cursor, so this is
  // evaluated exactly once per getter, mirroring the original inline
  // condition — do not call it twice for the same rowId.
  @inline private def useDeltaValue(rowId: Int): Boolean = {
    updatedColumnDecoder != null &&
        !updatedColumnDecoder.unchanged(adjustedRowId(rowId)) &&
        updatedColumnDecoder.readNotNull
  }

  @inline private def incrementAndGetNextNullPosition: Int = {
    currentNullCount += 1
    nextNullPosition = columnDecoder.findNextNullPosition(
      arrayOfBytes, nextNullPosition, currentNullCount)
    nextNullPosition
  }

  @inline private def setAndGetCurrentNullCount(rowId: Int): Int = {
    // Intentionally rowId + currentDeletedCount (no null adjustment) as in
    // the decoder's contract for numNulls.
    currentNullCount = columnDecoder.numNulls(arrayOfBytes,
      (rowId + currentDeletedCount),
      currentNullCount)
    currentNullCount
  }

  override def numNulls(): Int = currentNullCount

  override def isNullAt(rowId: Int): Boolean = {
    var hasNull = true
    if (updatedColumnDecoder == null ||
        updatedColumnDecoder.unchanged(adjustedRowId(rowId))) {
      // Unchanged value: consult the base decoder's null positions.
      nextNullPosition = columnDecoder.getNextNullPosition
      if (rowId < nextNullPosition ||
          (rowId == nextNullPosition + 1 &&
              rowId < incrementAndGetNextNullPosition) ||
          (rowId != nextNullPosition && (setAndGetCurrentNullCount(rowId) == 0 ||
              rowId != columnDecoder.getNextNullPosition))) {
        hasNull = false
      }
    } else if (updatedColumnDecoder.readNotNull) {
      // Updated to a non-null value.
      hasNull = false
    }
    hasNull
  }

  override def getInt(rowId: Int): Int = {
    skipDeletedRows(rowId)
    if (useDeltaValue(rowId)) updatedColumnDecoder.getCurrentDeltaBuffer.readInt
    else columnDecoder.readInt(arrayOfBytes, adjustedRowId(rowId))
  }

  override def getBoolean(rowId: Int): Boolean = {
    skipDeletedRows(rowId)
    if (useDeltaValue(rowId)) updatedColumnDecoder.getCurrentDeltaBuffer.readBoolean
    else columnDecoder.readBoolean(arrayOfBytes, adjustedRowId(rowId))
  }

  override def getByte(rowId: Int): Byte = {
    skipDeletedRows(rowId)
    if (useDeltaValue(rowId)) updatedColumnDecoder.getCurrentDeltaBuffer.readByte
    else columnDecoder.readByte(arrayOfBytes, adjustedRowId(rowId))
  }

  override def getShort(rowId: Int): Short = {
    skipDeletedRows(rowId)
    if (useDeltaValue(rowId)) updatedColumnDecoder.getCurrentDeltaBuffer.readShort
    else columnDecoder.readShort(arrayOfBytes, adjustedRowId(rowId))
  }

  override def getLong(rowId: Int): Long = {
    skipDeletedRows(rowId)
    if (useDeltaValue(rowId)) updatedColumnDecoder.getCurrentDeltaBuffer.readLong
    else columnDecoder.readLong(arrayOfBytes, adjustedRowId(rowId))
  }

  override def getFloat(rowId: Int): Float = {
    skipDeletedRows(rowId)
    if (useDeltaValue(rowId)) updatedColumnDecoder.getCurrentDeltaBuffer.readFloat
    else columnDecoder.readFloat(arrayOfBytes, adjustedRowId(rowId))
  }

  override def getDouble(rowId: Int): Double = {
    skipDeletedRows(rowId)
    if (useDeltaValue(rowId)) updatedColumnDecoder.getCurrentDeltaBuffer.readDouble
    else columnDecoder.readDouble(arrayOfBytes, adjustedRowId(rowId))
  }

  override def getDecimal(rowId: Int, precision: Int, scale: Int): Decimal = {
    skipDeletedRows(rowId)
    if (useDeltaValue(rowId)) {
      updatedColumnDecoder.getCurrentDeltaBuffer.readDecimal(precision, scale)
    } else {
      columnDecoder.readDecimal(arrayOfBytes, adjustedRowId(rowId), precision, scale)
    }
  }

  override def getUTF8String(rowId: Int): UTF8String = {
    skipDeletedRows(rowId)
    if (useDeltaValue(rowId)) updatedColumnDecoder.getCurrentDeltaBuffer.readUTF8String
    else columnDecoder.readUTF8String(arrayOfBytes, adjustedRowId(rowId))
  }

  override def getBinary(rowId: Int): Array[Byte] = {
    skipDeletedRows(rowId)
    if (useDeltaValue(rowId)) updatedColumnDecoder.getCurrentDeltaBuffer.readBinary
    else columnDecoder.readBinary(arrayOfBytes, adjustedRowId(rowId))
  }

  override def getArray(rowId: Int): ColumnarArray = {
    // TODO Handle the Array conversion: columnDecoder.readArray yields
    // catalyst ArrayData, not the required ColumnarArray.
    null
  }

  override def getMap(ordinal: Int): ColumnarMap = {
    // TODO Handle the Map conversion: columnDecoder.readMap yields
    // catalyst MapData, not the required ColumnarMap.
    null
  }

  override def getChild(ordinal: Int): ColumnVector = {
    // TODO: struct children not supported yet.
    null
  }
}
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootCategory=INFO, file + +# RollingFile appender +log4j.appender.file=org.apache.log4j.RollingFileAppender +log4j.appender.file.append=true +log4j.appender.file.file=snappydata.log +log4j.appender.file.MaxFileSize=100MB +log4j.appender.file.MaxBackupIndex=10000 +log4j.appender.file.layout=io.snappydata.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS zzz} %t %p %c{1}: %m%n + +# Console appender +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=io.snappydata.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS zzz} %t %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark-project.jetty=WARN +org.spark-project.jetty.LEVEL=WARN +log4j.logger.org.mortbay.jetty=WARN +log4j.logger.org.eclipse.jetty=WARN + +# Some packages are noisy for no good reason. 
+log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false +log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF + +log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false +log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF + +log4j.additivity.hive.log=false +log4j.logger.hive.log=OFF + +log4j.additivity.parquet.hadoop.ParquetRecordReader=false +log4j.logger.parquet.hadoop.ParquetRecordReader=OFF + +log4j.additivity.org.apache.parquet.hadoop.ParquetRecordReader=false +log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=OFF + +log4j.additivity.org.apache.parquet.hadoop.ParquetOutputCommitter=false +log4j.logger.org.apache.parquet.hadoop.ParquetOutputCommitter=OFF + +log4j.additivity.hive.ql.metadata.Hive=false +log4j.logger.hive.ql.metadata.Hive=OFF + +log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false +log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR + +# Other Spark classes that generate unnecessary logs at INFO level +log4j.logger.org.apache.spark.broadcast.TorrentBroadcast=WARN +log4j.logger.org.apache.spark.ContextCleaner=WARN +log4j.logger.org.apache.spark.MapOutputTracker=WARN +log4j.logger.org.apache.spark.scheduler.TaskSchedulerImpl=WARN +log4j.logger.org.apache.spark.storage.ShuffleBlockFetcherIterator=WARN +log4j.logger.org.apache.spark.scheduler.DAGScheduler=WARN +log4j.logger.org.apache.spark.scheduler.TaskSetManager=WARN +log4j.logger.org.apache.spark.scheduler.FairSchedulableBuilder=WARN +log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint=WARN +log4j.logger.org.apache.spark.storage.BlockManagerInfo=WARN +log4j.logger.org.apache.hadoop.hive=WARN +# for all Spark generated code (including ad-hoc UnsafeProjection calls etc) +log4j.logger.org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator=WARN +log4j.logger.org.apache.spark.sql.execution.datasources=WARN +log4j.logger.org.apache.spark.scheduler.SnappyTaskSchedulerImpl=WARN 
+log4j.logger.org.apache.spark.MapOutputTrackerMasterEndpoint=WARN +log4j.logger.org.apache.spark.MapOutputTrackerMaster=WARN +log4j.logger.org.apache.spark.storage.memory.MemoryStore=WARN +log4j.logger.org.apache.spark.MapOutputTrackerWorker=WARN +log4j.logger.org.apache.parquet=ERROR +log4j.logger.parquet=ERROR +log4j.logger.org.apache.hadoop.io.compress=WARN +log4j.logger.spark.jobserver.LocalContextSupervisorActor=WARN +log4j.logger.spark.jobserver.JarManager=WARN +log4j.logger.org.apache.spark.sql.hive.HiveClientUtil=WARN +log4j.logger.org.datanucleus=ERROR +# Task logger created in SparkEnv +log4j.logger.org.apache.spark.Task=WARN +log4j.logger.org.apache.spark.sql.catalyst.parser.CatalystSqlParser=WARN + +# for generated code of plans +# log4j.logger.org.apache.spark.sql.execution.WholeStageCodegenExec=DEBUG +# for SnappyData generated code used on store (ComplexTypeSerializer, JDBC inserts ...) +# log4j.logger.org.apache.spark.sql.store.CodeGeneration=DEBUG diff --git a/v2connector/src/test/scala/io/snappydata/BasicDataSourceV2Suite.scala b/v2connector/src/test/scala/io/snappydata/BasicDataSourceV2Suite.scala new file mode 100644 index 0000000000..83c70f8559 --- /dev/null +++ b/v2connector/src/test/scala/io/snappydata/BasicDataSourceV2Suite.scala @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ + +package io.snappydata + +import io.snappydata.datasource.v2.SnappyDataSource +import org.scalatest.ConfigMap + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.SparkSession + +class BasicDataSourceV2Suite extends SparkFunSuite { + + var spark: SparkSession = _ + + override def beforeAll(configMap: ConfigMap): Unit = { + spark = SparkSession.builder() + .appName("BasicDataSourceV2Suite") + .master("local[*]") + .getOrCreate() + } + + override def afterAll(configMap: ConfigMap): Unit = { + spark.stop() + } + + test("initialize a datasource") { + // val df = spark.read.format("snappydata") + // .option("snappydata.connection", "localhost:1527" ) + // .option("table", "app.t1") + + val df = spark.read.format(classOf[SnappyDataSource].getName) + .option("snappydata.connection", "localhost:1527" ) + .option("table", "APP.TEST_TABLE") + .option("user", "APP") + .option("password", "APP") + // df.load().select("COL1").collect() + // df.load().select("COL1").collect().foreach(println) + // df.load().count() + + df.load().createOrReplaceTempView("v1") + // val df2 =spark.sql("select avg(COL1) from v1 group by COL1") + // df2.explain() + // df2.collect().foreach(println) + + + val df3 = spark.sql("select id, rank, designation from v1 ")/* where id is null */ + df3.explain(true) + // scalastyle:off + println("numrows = " + df3.count()) + df3.collect().foreach(println) + + } + +} diff --git a/v2connector/src/test/scala/io/snappydata/ColumnBatchDecoderTest.scala b/v2connector/src/test/scala/io/snappydata/ColumnBatchDecoderTest.scala new file mode 100644 index 0000000000..7761f7df01 --- /dev/null +++ b/v2connector/src/test/scala/io/snappydata/ColumnBatchDecoderTest.scala @@ -0,0 +1,66 @@ +/* + * Comment here + */ +package io.snappydata + +import java.sql.DriverManager + +import org.apache.spark.sql.SparkSession + +object ColumnBatchDecoderTest { + + def main(args: Array[String]): Unit = { + + val builder = SparkSession + .builder + 
.appName("DecoderExample") + .master("local[4]") + + builder.config("spark.snappydata.connection", "localhost:1527") + + args.foreach(prop => { + val params = prop.split("=") + builder.config(params(0), params(1)) + }) + + val spark: SparkSession = builder.getOrCreate + + val conn = DriverManager.getConnection("jdbc:snappydata://localhost[1527]") + val stmt = conn.createStatement() + // stmt.execute("set snappydata.column.maxDeltaRows=1") + stmt.execute("DROP TABLE IF EXISTS TEST_TABLE") + stmt.execute("create table TEST_TABLE (ID long, rank int, designation String NULL ) " + + "using column options (buckets '4', COLUMN_MAX_DELTA_ROWS '1') as select id, 101, " + + " 'somerank' || id from range(20)") + stmt.close() + conn.close() + + /* + val field1 = StructField("ID", LongType, true) + val schema = new StructType(Array[StructField](field1, field2)) + val projection = new StructType(Array[StructField](field1, field2)) + + + for (bucketId <- 0 until 2) { + // scan_colInput = (ColumnBatchIteratorOnRS) + val columnBatchDecoderHelper = new V2ColumnBatchDecoderHelper( + "APP.TEST_TABLE", projection, schema, null, bucketId, + ArrayBuffer("127.0.0.1" -> + "jdbc:snappydata://localhost[1528]/;route-query=false;load-balance=false")) + + columnBatchDecoderHelper.initialize + while (columnBatchDecoderHelper.hasNext) { + val columnBatchSpark = columnBatchDecoderHelper.next + val iterator = columnBatchSpark.rowIterator() + while (iterator.hasNext) { + // scalastyle:off + val row = iterator.next() + println("Row " + row.getLong(0)) + println("Row " + row.getInt(1)) + // println("Row " + row.getInt(2)) + } + } + } + */ + } +}