Skip to content

Commit

Permalink
[paypal#247] [spark] Bump Spark Version to 2.4.7 along with other stack to align with gcp_dataproc_1.5.x
Browse files Browse the repository at this point in the history
  • Loading branch information
Dee-Pac committed Nov 6, 2020
1 parent d78326b commit bb696af
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 117 deletions.
27 changes: 9 additions & 18 deletions gimel-dataapi/gimel-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ under the License.
<scope>${packaging.scope}</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-xml</artifactId>
<version>2.11.0-M4</version>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-xml_${scala.binary.version}</artifactId>
<version>${scala.xml.version}</version>
<scope>${scala.packaging.scope}</scope>
</dependency>
<dependency>
Expand All @@ -82,12 +82,6 @@ under the License.
<groupId>com.paypal.gimel</groupId>
<artifactId>gimel-logger</artifactId>
<version>${gimel.version}-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Expand All @@ -97,7 +91,7 @@ under the License.
<dependency>
<groupId>com.paypal.gimel</groupId>
<artifactId>serde-common</artifactId>
<version>1.0-SNAPSHOT</version>
<version>2.4.7-SNAPSHOT</version>
<scope>${packaging.scope}</scope>
</dependency>
<dependency>
Expand All @@ -106,12 +100,6 @@ under the License.
<version>${kafka.version}</version>
<scope>${packaging.scope}</scope>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>4.0.0</version>
<scope>${packaging.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
Expand Down Expand Up @@ -197,7 +185,7 @@ under the License.
<version>${confluent.version}</version>
<scope>test</scope>
</dependency>
<!-- Kafka local testing utility needs Netty 3.x at test scope for the minicluster -->
<!-- Kafka local testing utility needs Netty 3.x at test scope for the minicluster -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
Expand All @@ -211,7 +199,7 @@ under the License.
<scope>test</scope>
</dependency>
<dependency>
<groupId> net.jpountz.lz4</groupId>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
<scope>test</scope>
Expand All @@ -220,16 +208,19 @@ under the License.
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

Expand Down
18 changes: 16 additions & 2 deletions gimel-dataapi/gimel-connectors/gimel-elasticsearch-6.2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,23 @@ under the License.
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_${scala.binary.version}</artifactId>
<artifactId>elasticsearch-spark-20_${elastic.scala.binary.version}</artifactId>
<version>${elasticsearch.version}</version>
<scope>${packaging.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
Expand Down Expand Up @@ -78,7 +92,7 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<version>${maven.shade.plugin.version}</version>
<configuration>
<relocations>
<relocation>
Expand Down
2 changes: 1 addition & 1 deletion gimel-dataapi/gimel-connectors/gimel-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<version>${maven.shade.plugin.version}</version>
<configuration>
<relocations>
<relocation>
Expand Down
15 changes: 12 additions & 3 deletions gimel-dataapi/gimel-connectors/gimel-sftp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,19 @@ under the License.
</dependency>
<dependency>
<groupId>com.springml</groupId>
<artifactId>spark-sftp_${scala.binary.version}</artifactId>
<artifactId>spark-sftp_${springml.scala.binary.version}</artifactId>
<version>${spark.sftp.version}</version>
<scope>${packaging.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
Expand All @@ -53,12 +63,11 @@ under the License.

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<version>${maven.shade.plugin.version}</version>
<configuration>
<relocations>
<relocation>
Expand Down
27 changes: 0 additions & 27 deletions gimel-dataapi/gimel-logger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,33 +50,6 @@ under the License.
<version>${scala.version}</version>
<scope>${scala.packaging.scope}</scope>
</dependency>
<dependency>
<groupId>com.paypal.gimel</groupId>
<artifactId>gimel-logging_${gimel.logging.spark.binary.version}</artifactId>
<version>${gimel.logging.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
</exclusion>
<exclusion>
<groupId>com.googlecode.protobuf-java-format</groupId>
<artifactId>protobuf-java-format</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
Expand Down
6 changes: 3 additions & 3 deletions gimel-parser/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
<dependencies>
<!--Provided dependencies-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-xml</artifactId>
<version>2.11.0-M4</version>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-xml_${scala.binary.version}</artifactId>
<version>${scala.xml.version}</version>
<scope>${scala.packaging.scope}</scope>
</dependency>
<dependency>
Expand Down
4 changes: 2 additions & 2 deletions gimel-serde/gimel-deserializers/generic-deserializers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<dependency>
<groupId>com.paypal.gimel</groupId>
<artifactId>serde-common</artifactId>
<version>1.0-SNAPSHOT</version>
<version>2.4.7-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
Expand All @@ -43,7 +43,7 @@
<dependency>
<groupId>com.paypal.gimel</groupId>
<artifactId>serde-common</artifactId>
<version>1.0-SNAPSHOT</version>
<version>2.4.7-SNAPSHOT</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
Expand Down
2 changes: 1 addition & 1 deletion gimel-serde/gimel-serializers/generic-serializers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<dependency>
<groupId>com.paypal.gimel</groupId>
<artifactId>serde-common</artifactId>
<version>1.0-SNAPSHOT</version>
<version>2.4.7-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
Expand Down
18 changes: 9 additions & 9 deletions gimel-serde/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<configuration>
<!-- Avro string compiled to java.lang.String instead of CharSequence -->
<stringType>String</stringType>
</configuration>
</plugin>
<!-- <plugin>-->
<!-- <groupId>org.apache.avro</groupId>-->
<!-- <artifactId>avro-maven-plugin</artifactId>-->
<!-- <configuration>-->
<!-- &lt;!&ndash; Avro string compiled to java.lang.String instead of CharSequence &ndash;&gt;-->
<!-- <stringType>String</stringType>-->
<!-- </configuration>-->
<!-- </plugin>-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down Expand Up @@ -164,7 +164,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<version>${maven.shade.plugin.version}</version>
<configuration>
<relocations>
<!--<relocation>-->
Expand Down
18 changes: 12 additions & 6 deletions gimel-serde/serde-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,18 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>4.0.0</version>
<scope>${packaging.scope}</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.databricks</groupId>-->
<!-- <artifactId>spark-avro_2.11</artifactId>-->
<!-- <version>4.0.0</version>-->
<!-- <scope>${packaging.scope}</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.12</artifactId>
<version>${spark.version}</version>
<scope>${packaging.scope}</scope>
</dependency>
<dependency>
<groupId>io.spray</groupId>
<artifactId>spray-json_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.Map
import scala.collection.mutable

import com.databricks.spark.avro.SchemaConverters._
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions._
import spray.json._
import spray.json.DefaultJsonProtocol._
import spray.json.JsValue
Expand Down Expand Up @@ -109,6 +108,26 @@ object AvroUtils extends Serializable {
newGenericRec
}

/**
 * Takes an Avro schema string and returns the names of the fields declared in its
 * top-level "fields" array.
 *
 * @param schemaString Avro record schema as a JSON string
 * @return sequence of top-level field names, in schema declaration order
 * @throws IllegalArgumentException if the schema has no "fields" array
 */
def getTopLevelFieldNamesFromAvro(schemaString: String): Seq[String] = {
  // Parse the schema JSON and treat the root as a JSON object
  val schemaAsJsObject = schemaString.parseJson.asJsObject
  // Fail fast with a clear message instead of an opaque NoSuchElementException
  // when the schema is not a record schema (no "fields" array).
  val schemaFields: Seq[JsValue] = schemaAsJsObject
    .getFields("fields")
    .headOption
    .getOrElse(throw new IllegalArgumentException(
      s"Avro schema has no top-level 'fields' array: ${schemaString.take(100)}"))
    .convertTo[Seq[JsValue]]
  // Look up the "name" attribute explicitly. The previous implementation took
  // `fields.head._2`, assuming "name" was the first key of the JSON object —
  // spray-json's `fields` is a plain Map, so that ordering is not guaranteed
  // once a field object carries extra keys (type, doc, default, aliases, ...).
  // convertTo[String] also unquotes properly, replacing the manual
  // toString().replace("\"", "") hack.
  schemaFields.map(_.asJsObject.fields("name").convertTo[String])
}

/**
* Adds additional fields to the Avro Schema
*
Expand Down Expand Up @@ -189,39 +208,24 @@ object AvroUtils extends Serializable {
*/
def getDeserializedDataFrame(dataframe: DataFrame, columnToDeserialize: String, avroSchemaString: String): DataFrame = {
val originalFields: Array[String] = dataframe.columns.filter(field => field != columnToDeserialize)
val newAvroSchemaString = addAdditionalFieldsToSchema(originalFields.toList, avroSchemaString)

try {
dataframe.map { eachRow =>
val recordToDeserialize: Array[Byte] = eachRow.getAs(columnToDeserialize).asInstanceOf[Array[Byte]]
val originalColumnsMap = originalFields.map {
field => {
val index = eachRow.fieldIndex(field)
if (eachRow.isNullAt(index)) {
(field -> "null")
} else {
(field -> eachRow.getAs(field).toString)
}
}
}
val deserializedGenericRecord: GenericRecord = bytesToGenericRecordWithSchemaRecon(recordToDeserialize, avroSchemaString, avroSchemaString)
val newDeserializedGenericRecord: GenericRecord = copyToGenericRecord(deserializedGenericRecord, avroSchemaString, newAvroSchemaString)
originalColumnsMap.foreach { kv => newDeserializedGenericRecord.put(kv._1, kv._2) }
val avroSchemaObj: Schema = (new Schema.Parser).parse(newAvroSchemaString)
val converter = AvroToSQLSchemaConverter.createConverterToSQL(avroSchemaObj)
converter(newDeserializedGenericRecord).asInstanceOf[Row]
} {
val avroSchema: Schema = (new Schema.Parser).parse(newAvroSchemaString)
val schemaType: SchemaType = toSqlType(avroSchema)
val encoder = RowEncoder(schemaType.dataType.asInstanceOf[StructType])
encoder
}.toDF
} catch {
case ex: Throwable => {
ex.printStackTrace()
throw ex
}
}
logger.debug(s"Original Fields \n${originalFields}")
logger.debug(s"schema \n${avroSchemaString}")
val fieldsInAvro = getTopLevelFieldNamesFromAvro(avroSchemaString )
logger.debug(s"Avro Fields \n${fieldsInAvro}")
logger.debug(s"**************** schema before deserialize ************************")
dataframe.printSchema()
val op = dataframe.withColumn("avro", from_avro(col(columnToDeserialize), avroSchemaString) )
logger.debug(s"**************** schema after deserialize ************************")
op.printSchema()
op.show(2)
logger.debug(s"**************** Fields in avro that will be projected in dataFrame ************************")
logger.debug(fieldsInAvro.mkString(","))
val colsToSelect: Seq[String] = fieldsInAvro.map{ x => s"avro.${x}"}
logger.debug(colsToSelect.mkString(","))
val k = op.select(colsToSelect.head, colsToSelect.tail: _*)
k

}

/**
Expand Down
Loading

0 comments on commit bb696af

Please sign in to comment.