diff --git a/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/DataApiWrapper.scala b/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/DataApiWrapper.scala index 23894875..8fe1acbd 100644 --- a/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/DataApiWrapper.scala +++ b/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/DataApiWrapper.scala @@ -22,7 +22,7 @@ import scala.concurrent.duration.Duration import scala.util.control.NonFatal import io.github.spark_redshift_community.spark.redshift.Parameters.MergedParameters import io.github.spark_redshift_community.spark.redshift.TimestampNTZTypeExtractor -import io.github.spark_redshift_community.spark.redshift.pushdown.{BooleanVariable, ByteVariable, ConstantString, DoubleVariable, FloatVariable, IntVariable, LongVariable, RedshiftSQLStatement, ShortVariable, StatementElement, StringVariable} +import io.github.spark_redshift_community.spark.redshift.pushdown.{BinaryVariable, BooleanVariable, ByteVariable, ConstantString, DoubleVariable, FloatVariable, IntVariable, LongVariable, RedshiftSQLStatement, ShortVariable, StatementElement, StringVariable} import org.slf4j.LoggerFactory import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory} @@ -365,6 +365,7 @@ private[redshift] class DataApiWrapper extends RedshiftWrapper with Serializable case "float8" => DoubleType case "char" => StringType case "varchar" => StringType + case "varbyte" => BinaryType case "bool" => BooleanType case "date" => DateType case "time" => TimestampType @@ -473,6 +474,7 @@ private[redshift] class DataApiWrapper extends RedshiftWrapper with Serializable case ele: DoubleVariable => QueryParameter(name, ele.variable, java.sql.Types.DOUBLE) case ele: BooleanVariable => QueryParameter(name, ele.variable, java.sql.Types.BOOLEAN) case ele: ByteVariable => QueryParameter(name, ele.variable, java.sql.Types.TINYINT) + case ele: BinaryVariable => QueryParameter(name, ele.variable, java.sql.Types.LONGVARBINARY) case _ => throw new IllegalArgumentException( "Unexpected Element Type: " + element.getClass.getName diff --git a/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/JDBCWrapper.scala b/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/JDBCWrapper.scala index 9088e660..2ba4e623 100644 --- a/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/JDBCWrapper.scala +++ b/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/JDBCWrapper.scala @@ -414,6 +414,9 @@ private[redshift] class JDBCWrapper extends RedshiftWrapper with Serializable { case java.sql.Types.VARCHAR => StringType case java.sql.Types.LONGVARCHAR => StringType + // Binary types + case java.sql.Types.LONGVARBINARY => BinaryType + // Datetime Types case java.sql.Types.DATE => DateType case java.sql.Types.TIME => TimestampType diff --git a/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/RedshiftResults.scala b/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/RedshiftResults.scala index bbc8947f..c50f5076 100644 --- a/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/RedshiftResults.scala +++ b/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/RedshiftResults.scala @@ -27,10 +27,12 @@ private[redshift] abstract class RedshiftResults() { def getInt(columnIndex: Int): Int def getLong(columnIndex: Int): Long def getString(columnIndex: Int): String + def getBinary(columnIndex: Int): Array[Byte] def getInt(columnLabel: String): Int def getLong(columnLabel: String): Long def getString(columnLabel: String): String + def getBinary(columnLabel: String): Array[Byte] } @@ -61,6 +63,10 @@ private[redshift] case class DataApiResults(results: GetStatementResultResult) curr.get(columnIndex - 1).getStringValue } + override def getBinary(columnIndex: Int): Array[Byte] = { + curr.get(columnIndex - 1).getBlobValue.array() + } + override def getInt(columnLabel: String): Int = { curr.get(getIndex(columnLabel)).getLongValue.asInstanceOf[Int] } @@ -73,6 +79,10 @@ private[redshift] case class DataApiResults(results: GetStatementResultResult) curr.get(getIndex(columnLabel)).getStringValue } + def getBinary(columnLabel: String): Array[Byte] = { + curr.get(getIndex(columnLabel)).getBlobValue.array() + } + private def getIndex(columnLabel: String): Int = { results.getColumnMetadata.asScala.indexWhere(col => col.getLabel == columnLabel) } @@ -95,6 +105,10 @@ private[redshift] case class JDBCResults(results: ResultSet) extends RedshiftRes results.getString(columnIndex) } + override def getBinary(columnIndex: Int): Array[Byte] = { + results.getBytes(columnIndex) + } + override def getInt(columnLabel: String): Int = { results.getInt(columnLabel) } @@ -106,4 +120,8 @@ private[redshift] case class JDBCResults(results: ResultSet) extends RedshiftRes override def getString(columnLabel: String): String = { results.getString(columnLabel) } + + override def getBinary(columnLabel: String): Array[Byte] = { + results.getBytes(columnLabel) + } } diff --git a/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/RedshiftWrapper.scala b/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/RedshiftWrapper.scala index e6acb93c..cebcc6d9 100644 --- a/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/RedshiftWrapper.scala +++ b/src/main/scala/io/github/spark_redshift_community/spark/redshift/data/RedshiftWrapper.scala @@ -56,6 +56,7 @@ private[redshift] class RedshiftWrapper extends Serializable { } else { s"VARCHAR(MAX)" } + case BinaryType => "VARBYTE" case TimestampType => if (redshift.legacyTimestampHandling) "TIMESTAMP" else "TIMESTAMPTZ" case TimestampNTZTypeExtractor(_) if !redshift.legacyTimestampHandling => "TIMESTAMP" case DateType => "DATE" diff --git a/src/main/scala/io/github/spark_redshift_community/spark/redshift/pushdown/RedshiftSQLStatement.scala b/src/main/scala/io/github/spark_redshift_community/spark/redshift/pushdown/RedshiftSQLStatement.scala index d0b572d5..5de4affc 100644 --- a/src/main/scala/io/github/spark_redshift_community/spark/redshift/pushdown/RedshiftSQLStatement.scala +++ b/src/main/scala/io/github/spark_redshift_community/spark/redshift/pushdown/RedshiftSQLStatement.scala @@ -165,3 +165,6 @@ private[redshift] case class BooleanVariable(override val variable: Option[Boole private[redshift] case class ByteVariable(override val variable: Option[Byte]) extends VariableElement[Byte] + +private[redshift] case class BinaryVariable(override val variable: Option[Array[Byte]]) + extends VariableElement[Array[Byte]] \ No newline at end of file