-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Closes #1426: Run IIS experiments by relying on spark 3.4 version
WIP. Replacing the Scala source code in the iis-common module with Java-based counterparts. Simplifying the code and aligning other classes with the changes in the Avro read/write code.
- Loading branch information
1 parent
138b203
commit 3b6a565
Showing
21 changed files
with
198 additions
and
453 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
41 changes: 41 additions & 0 deletions
41
iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package eu.dnetlib.iis.common.spark.avro; | ||
|
||
import java.io.Serializable; | ||
|
||
import org.apache.avro.Schema; | ||
import org.apache.spark.sql.Dataset; | ||
import org.apache.spark.sql.Row; | ||
import org.apache.spark.sql.SparkSession; | ||
import org.apache.spark.sql.avro.SchemaConverters; | ||
import org.apache.spark.sql.types.StructType; | ||
|
||
/** | ||
* Support for reading avro datastores as dataframes. | ||
* | ||
* @author mhorst | ||
* | ||
*/ | ||
public class AvroDataFrameReader implements Serializable { | ||
|
||
private static final long serialVersionUID = 4858427693578954728L; | ||
|
||
private final SparkSession sparkSession; | ||
|
||
/** | ||
* Default constructor accepting spark session as parameter. | ||
* @param sparkSession spark session | ||
*/ | ||
public AvroDataFrameReader(SparkSession sparkSession) { | ||
this.sparkSession = sparkSession; | ||
} | ||
|
||
/** | ||
* @param path Path to the data store | ||
* @param avroSchema Avro schema of the records | ||
* @return DataFrame with data read from given path | ||
*/ | ||
public Dataset<Row> read(String path, Schema avroSchema) { | ||
Dataset<Row> in = sparkSession.read().format("avro").option("avroSchema", avroSchema.toString()).load(path); | ||
return sparkSession.createDataFrame(in.rdd(), (StructType) SchemaConverters.toSqlType(avroSchema).dataType()); | ||
} | ||
} |
38 changes: 38 additions & 0 deletions
38
iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package eu.dnetlib.iis.common.spark.avro; | ||
|
||
import java.io.Serializable; | ||
|
||
import org.apache.avro.specific.SpecificRecordBase; | ||
import org.apache.spark.api.java.function.MapFunction; | ||
import org.apache.spark.sql.Dataset; | ||
import org.apache.spark.sql.Encoders; | ||
import org.apache.spark.sql.Row; | ||
|
||
import com.fasterxml.jackson.databind.DeserializationFeature; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
||
/** | ||
* Support for dataframes of avro types. | ||
* | ||
* @author mhorst | ||
*/ | ||
public class AvroDataFrameSupport implements Serializable { | ||
|
||
private static final long serialVersionUID = -3980871922050483460L; | ||
|
||
private AvroDataFrameSupport() { | ||
} | ||
|
||
/** | ||
* @param <T> type of elements | ||
* @param dataFrame seq with elements for the dataframe | ||
* @param clazz class of objects in the dataset | ||
* @return Dataset of objects corresponding to records in the given dataframe | ||
*/ | ||
public static <T extends SpecificRecordBase> Dataset<T> toDS(final Dataset<Row> dataFrame, final Class<T> clazz) { | ||
final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, | ||
false); | ||
return (Dataset<T>) dataFrame.toJSON().map((MapFunction<String, T>) json -> (T) mapper.readValue(json, clazz), | ||
Encoders.kryo((Class<T>) clazz)); | ||
} | ||
} |
38 changes: 38 additions & 0 deletions
38
iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package eu.dnetlib.iis.common.spark.avro; | ||
|
||
import java.io.Serializable; | ||
|
||
import org.apache.avro.Schema; | ||
import org.apache.spark.sql.Dataset; | ||
import org.apache.spark.sql.Row; | ||
|
||
/** | ||
* Support for writing dataframes of avro types. | ||
* | ||
* @author mhorst | ||
*/ | ||
public class AvroDataFrameWriter implements Serializable { | ||
|
||
private static final long serialVersionUID = 7842491849433906246L; | ||
|
||
private final Dataset<Row> dataFrame; | ||
|
||
/** | ||
* Default constructor accepting DataFrame. | ||
* | ||
* @param dataFrame DataFrame of avro type | ||
*/ | ||
public AvroDataFrameWriter(Dataset<Row> dataFrame) { | ||
this.dataFrame = dataFrame; | ||
} | ||
|
||
/** | ||
* Writes a dataframe as avro datastore using avro schema. | ||
* @param path path to the data store | ||
* @param avroSchema Avro schema of the records | ||
*/ | ||
public void write(String path, Schema avroSchema) { | ||
dataFrame.write().format("avro").option("avroSchema", avroSchema.toString()) | ||
.option("compression", "uncompressed").save(path); | ||
} | ||
} |
43 changes: 43 additions & 0 deletions
43
iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package eu.dnetlib.iis.common.spark.avro; | ||
|
||
import java.io.Serializable; | ||
import org.apache.avro.Schema; | ||
import org.apache.avro.specific.SpecificRecordBase; | ||
import org.apache.spark.sql.Dataset; | ||
import org.apache.spark.sql.SparkSession; | ||
|
||
/** | ||
* Support for reading avro datastores as datasets. | ||
* | ||
* @author mhorst | ||
*/ | ||
public class AvroDatasetReader implements Serializable { | ||
|
||
private static final long serialVersionUID = 4858427693578954728L; | ||
|
||
private final SparkSession sparkSession; | ||
|
||
/** | ||
* Default constructor accepting spark session as parameter. | ||
* @param sparkSession spark session | ||
*/ | ||
public AvroDatasetReader(SparkSession sparkSession) { | ||
this.sparkSession = sparkSession; | ||
} | ||
|
||
/** | ||
* Reads avro datastore as Spark dataset using avro schema and kryo encoder. | ||
* | ||
* NOTE: due to inability to use bean-based encoder for avro types this method uses kryo encoder; | ||
* for this reason this method creates objects by mapping rows to jsons and jsons to instances of objects. | ||
* | ||
* @param <T> type of objects in the dataset | ||
* @param path path to the data store | ||
* @param avroSchema Avro schema of the records | ||
* @param clazz class of objects in the dataset | ||
* @return Dataset with data read from given path | ||
*/ | ||
public <T extends SpecificRecordBase> Dataset<T> read(String path, Schema avroSchema, Class<T> clazz) { | ||
return AvroDataFrameSupport.toDS(new AvroDataFrameReader(sparkSession).read(path, avroSchema), clazz); | ||
} | ||
} |
40 changes: 0 additions & 40 deletions
40
iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.scala
This file was deleted.
Oops, something went wrong.
67 changes: 0 additions & 67 deletions
67
iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala
This file was deleted.
Oops, something went wrong.
38 changes: 0 additions & 38 deletions
38
iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.scala
This file was deleted.
Oops, something went wrong.
29 changes: 0 additions & 29 deletions
29
iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.scala
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.