Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
import io.github.spark_redshift_community.spark.redshift.Parameters.MergedParameters
import io.github.spark_redshift_community.spark.redshift.TimestampNTZTypeExtractor
import io.github.spark_redshift_community.spark.redshift.pushdown.{BooleanVariable, ByteVariable, ConstantString, DoubleVariable, FloatVariable, IntVariable, LongVariable, RedshiftSQLStatement, ShortVariable, StatementElement, StringVariable}
import io.github.spark_redshift_community.spark.redshift.pushdown.{BinaryVariable, BooleanVariable, ByteVariable, ConstantString, DoubleVariable, FloatVariable, IntVariable, LongVariable, RedshiftSQLStatement, ShortVariable, StatementElement, StringVariable}
import org.slf4j.LoggerFactory

import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory}
Expand Down Expand Up @@ -365,6 +365,7 @@ private[redshift] class DataApiWrapper extends RedshiftWrapper with Serializable
case "float8" => DoubleType
case "char" => StringType
case "varchar" => StringType
case "varbyte" => BinaryType
case "bool" => BooleanType
case "date" => DateType
case "time" => TimestampType
Expand Down Expand Up @@ -473,6 +474,7 @@ private[redshift] class DataApiWrapper extends RedshiftWrapper with Serializable
case ele: DoubleVariable => QueryParameter(name, ele.variable, java.sql.Types.DOUBLE)
case ele: BooleanVariable => QueryParameter(name, ele.variable, java.sql.Types.BOOLEAN)
case ele: ByteVariable => QueryParameter(name, ele.variable, java.sql.Types.TINYINT)
case ele: BinaryVariable => QueryParameter(name, ele.variable, java.sql.Types.LONGVARBINARY)
case _ =>
throw new IllegalArgumentException(
"Unexpected Element Type: " + element.getClass.getName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ private[redshift] class JDBCWrapper extends RedshiftWrapper with Serializable {
case java.sql.Types.VARCHAR => StringType
case java.sql.Types.LONGVARCHAR => StringType

// Binary types
case java.sql.Types.LONGVARBINARY => BinaryType

// Datetime Types
case java.sql.Types.DATE => DateType
case java.sql.Types.TIME => TimestampType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ private[redshift] abstract class RedshiftResults() {
def getInt(columnIndex: Int): Int
def getLong(columnIndex: Int): Long
def getString(columnIndex: Int): String
def getBinary(columnIndex: Int): Array[Byte]

def getInt(columnLabel: String): Int
def getLong(columnLabel: String): Long
def getString(columnLabel: String): String
def getBinary(columnLabel: String): Array[Byte]

}

Expand Down Expand Up @@ -61,6 +63,10 @@ private[redshift] case class DataApiResults(results: GetStatementResultResult)
curr.get(columnIndex - 1).getStringValue
}

override def getBinary(columnIndex: Int): Array[Byte] = {
curr.get(columnIndex - 1).getBlobValue.array()
}

override def getInt(columnLabel: String): Int = {
curr.get(getIndex(columnLabel)).getLongValue.asInstanceOf[Int]
}
Expand All @@ -73,6 +79,10 @@ private[redshift] case class DataApiResults(results: GetStatementResultResult)
curr.get(getIndex(columnLabel)).getStringValue
}

def getBinary(columnLabel: String): Array[Byte] = {
curr.get(getIndex(columnLabel)).getBlobValue.array()
}

private def getIndex(columnLabel: String): Int = {
results.getColumnMetadata.asScala.indexWhere(col => col.getLabel == columnLabel)
}
Expand All @@ -95,6 +105,10 @@ private[redshift] case class JDBCResults(results: ResultSet) extends RedshiftRes
results.getString(columnIndex)
}

override def getBinary(columnIndex: Int): Array[Byte] = {
results.getBytes(columnIndex)
}

override def getInt(columnLabel: String): Int = {
results.getInt(columnLabel)
}
Expand All @@ -106,4 +120,8 @@ private[redshift] case class JDBCResults(results: ResultSet) extends RedshiftRes
override def getString(columnLabel: String): String = {
results.getString(columnLabel)
}

override def getBinary(columnLabel: String): Array[Byte] = {
results.getBytes(columnLabel)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ private[redshift] class RedshiftWrapper extends Serializable {
} else {
s"VARCHAR(MAX)"
}
case BinaryType => "VARBYTE"
case TimestampType => if (redshift.legacyTimestampHandling) "TIMESTAMP" else "TIMESTAMPTZ"
case TimestampNTZTypeExtractor(_) if !redshift.legacyTimestampHandling => "TIMESTAMP"
case DateType => "DATE"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,6 @@ private[redshift] case class BooleanVariable(override val variable: Option[Boole

private[redshift] case class ByteVariable(override val variable: Option[Byte])
extends VariableElement[Byte]

private[redshift] case class BinaryVariable(override val variable: Option[Array[Byte]])
extends VariableElement[Array[Byte]]