diff --git a/iis-common/pom.xml b/iis-common/pom.xml
index 53c37a3a8..af4504cc2 100644
--- a/iis-common/pom.xml
+++ b/iis-common/pom.xml
@@ -70,13 +70,6 @@
<artifactId>hadoop-common</artifactId>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- <scope>provided</scope>
- </dependency>
-
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
@@ -169,29 +162,6 @@
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.2.2</version>
- <executions>
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>add-source</goal>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>scala-test-compile</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
<groupId>org.apache.avro</groupId>
@@ -257,8 +227,4 @@
- <properties>
- <scala.version>2.12.14</scala.version>
- </properties>
-
diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.java
new file mode 100644
index 000000000..36ec7ad9b
--- /dev/null
+++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.java
@@ -0,0 +1,41 @@
+package eu.dnetlib.iis.common.spark.avro;
+
+import java.io.Serializable;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.avro.SchemaConverters;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Support for reading avro datastores as dataframes.
+ *
+ * @author mhorst
+ *
+ */
+public class AvroDataFrameReader implements Serializable {
+
+ private static final long serialVersionUID = 4858427693578954728L;
+
+ private final SparkSession sparkSession;
+
+ /**
+ * Default constructor accepting spark session as parameter.
+ * @param sparkSession spark session
+ */
+ public AvroDataFrameReader(SparkSession sparkSession) {
+ this.sparkSession = sparkSession;
+ }
+
+ /**
+ * @param path Path to the data store
+ * @param avroSchema Avro schema of the records
+ * @return DataFrame with data read from given path
+ */
+ public Dataset<Row> read(String path, Schema avroSchema) {
+ Dataset<Row> in = sparkSession.read().format("avro").option("avroSchema", avroSchema.toString()).load(path);
+ return sparkSession.createDataFrame(in.rdd(), (StructType) SchemaConverters.toSqlType(avroSchema).dataType());
+ }
+}
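A minimal usage sketch for the new Java reader (not part of the patch): the local master, the input path and the Person avro record used in the tests further down are assumptions.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    import eu.dnetlib.iis.common.avro.Person;

    public class AvroDataFrameReaderSketch {
        public static void main(String[] args) {
            // hypothetical local session; real jobs receive their SparkSession from the workflow
            SparkSession spark = SparkSession.builder().master("local[*]").appName("avro-reader-sketch").getOrCreate();
            // load an avro datastore as a DataFrame, using the record's avro schema
            Dataset<Row> people = new AvroDataFrameReader(spark).read("/tmp/person-datastore", Person.SCHEMA$);
            people.show();
            spark.stop();
        }
    }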
diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.java
new file mode 100644
index 000000000..22156ac6f
--- /dev/null
+++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.java
@@ -0,0 +1,38 @@
+package eu.dnetlib.iis.common.spark.avro;
+
+import java.io.Serializable;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Support for dataframes of avro types.
+ *
+ * @author mhorst
+ */
+public class AvroDataFrameSupport implements Serializable {
+
+ private static final long serialVersionUID = -3980871922050483460L;
+
+ private AvroDataFrameSupport() {
+ }
+
+ /**
+ * @param <T> type of objects in the dataset
+ * @param dataFrame DataFrame to be converted to a dataset
+ * @param clazz class of objects in the dataset
+ * @return Dataset of objects corresponding to records in the given dataframe
+ */
+ public static <T extends SpecificRecordBase> Dataset<T> toDS(final Dataset<Row> dataFrame, final Class<T> clazz) {
+ final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
+ false);
+ return (Dataset<T>) dataFrame.toJSON().map((MapFunction<String, T>) json -> (T) mapper.readValue(json, clazz),
+ Encoders.kryo((Class<T>) clazz));
+ }
+}
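Continuing the sketch above, the now-static toDS helper turns an avro-shaped DataFrame into a typed, kryo-encoded Dataset; the people DataFrame and the Person record are the assumptions carried over from the previous sketch.

    // JSON round-trip plus kryo encoder, as described in the javadoc above
    Dataset<Person> persons = AvroDataFrameSupport.toDS(people, Person.class);
    persons.collectAsList().forEach(p -> System.out.println(p.getId() + ": " + p.getName()));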
diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.java
new file mode 100644
index 000000000..c90d62e86
--- /dev/null
+++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.java
@@ -0,0 +1,38 @@
+package eu.dnetlib.iis.common.spark.avro;
+
+import java.io.Serializable;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+/**
+ * Support for writing dataframes of avro types.
+ *
+ * @author mhorst
+ */
+public class AvroDataFrameWriter implements Serializable {
+
+ private static final long serialVersionUID = 7842491849433906246L;
+
+ private final Dataset<Row> dataFrame;
+
+ /**
+ * Default constructor accepting DataFrame.
+ *
+ * @param dataFrame DataFrame of avro type
+ */
+ public AvroDataFrameWriter(Dataset<Row> dataFrame) {
+ this.dataFrame = dataFrame;
+ }
+
+ /**
+ * Writes a dataframe as avro datastore using avro schema.
+ * @param path path to the data store
+ * @param avroSchema Avro schema of the records
+ */
+ public void write(String path, Schema avroSchema) {
+ dataFrame.write().format("avro").option("avroSchema", avroSchema.toString())
+ .option("compression", "uncompressed").save(path);
+ }
+}
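Continuing the same sketch, writing now always requires an explicit avro schema, since the Java class drops the SQL-schema write(path) overload of the removed Scala version; the output path is an assumption.

    // write the DataFrame back out as an uncompressed avro datastore
    new AvroDataFrameWriter(people).write("/tmp/person-datastore-out", Person.SCHEMA$);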
diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.java
new file mode 100644
index 000000000..72b0ba164
--- /dev/null
+++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.java
@@ -0,0 +1,43 @@
+package eu.dnetlib.iis.common.spark.avro;
+
+import java.io.Serializable;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Support for reading avro datastores as datasets.
+ *
+ * @author mhorst
+ */
+public class AvroDatasetReader implements Serializable {
+
+ private static final long serialVersionUID = 4858427693578954728L;
+
+ private final SparkSession sparkSession;
+
+ /**
+ * Default constructor accepting spark session as parameter.
+ * @param sparkSession spark session
+ */
+ public AvroDatasetReader(SparkSession sparkSession) {
+ this.sparkSession = sparkSession;
+ }
+
+ /**
+ * Reads avro datastore as Spark dataset using avro schema and kryo encoder.
+ *
+ * NOTE: due to inability to use bean-based encoder for avro types this method uses kryo encoder;
+ * for this reason this method creates objects by mapping rows to jsons and jsons to instances of objects.
+ *
+ * @param <T> type of objects in the dataset
+ * @param path path to the data store
+ * @param avroSchema Avro schema of the records
+ * @param clazz class of objects in the dataset
+ * @return Dataset with data read from given path
+ */
+ public <T extends SpecificRecordBase> Dataset<T> read(String path, Schema avroSchema, Class<T> clazz) {
+ return AvroDataFrameSupport.toDS(new AvroDataFrameReader(sparkSession).read(path, avroSchema), clazz);
+ }
+}
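For callers that want typed records directly, the dataset reader composes the two steps above; a sketch under the same assumptions:

    // read the avro datastore straight into a kryo-encoded Dataset<Person>
    Dataset<Person> typed = new AvroDatasetReader(spark).read("/tmp/person-datastore", Person.SCHEMA$, Person.class);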
diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.scala
deleted file mode 100644
index bc4540c73..000000000
--- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package eu.dnetlib.iis.common.spark.avro
-
-import org.apache.avro.Schema
-import org.apache.spark.sql.avro.SchemaConverters
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SparkSession}
-
-/**
- * Support for reading avro datastores as dataframes.
- *
- * @param spark SparkSession instance.
- */
-class AvroDataFrameReader(val spark: SparkSession) extends Serializable {
-
- /**
- * Reads avro datastore as Spark dataframe using SQL schema.
- *
- * @param path Path to the datastore.
- * @param schema SQL schema of the records.
- * @return DataFrame with data read from given path.
- */
- def read(path: String, schema: StructType): DataFrame = {
- read(path, SchemaConverters.toAvroType(schema))
- }
-
- /**
- * Reads avro datastore as Spark dataframe using avro schema.
- *
- * @param path Path to the data store.
- * @param avroSchema Avro schema of the records.
- * @return DataFrame with data read from given path.
- */
- def read(path: String, avroSchema: Schema): DataFrame = {
- val in = spark.read
- .format("avro")
- .option("avroSchema", avroSchema.toString)
- .load(path)
- spark.createDataFrame(in.rdd, SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType])
- }
-}
diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala
deleted file mode 100644
index 3ad0f7a1d..000000000
--- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-package eu.dnetlib.iis.common.spark.avro
-
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
-import org.apache.avro.Schema
-import org.apache.avro.specific.SpecificRecordBase
-import org.apache.spark.sql._
-import org.apache.spark.sql.avro._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.types.StructType
-
-import scala.collection.JavaConverters._
-
-/**
- * Support for dataframes of avro types.
- *
- * @param spark SparkSession instance.
- */
-class AvroDataFrameSupport(val spark: SparkSession) extends Serializable {
-
- /**
- * Creates a dataframe from a given collection.
- *
- * @param data List with elements for the dataframe.
- * @param avroSchema Avro schema of the elements.
- * @tparam T Type of elements.
- * @return DataFrame containing data from the given list.
- */
-// def createDataFrame[T](data: java.util.List[T], avroSchema: Schema): DataFrame = {
-// createDataFrame(data.asScala, avroSchema)
-// }
-
- /**
- * Creates a dataframe from a given collection.
- *
- * @param data Seq with elements for the dataframe.
- * @param avroSchema Avro schema of the elements.
- * @tparam T Type of elements.
- * @return DataFrame containing data from the given seq.
- */
-// def createDataFrame[T](data: Seq[T], avroSchema: Schema): DataFrame = {
-// val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
-// val encoder = RowEncoder.apply(rowSchema).resolveAndBind()
-// val deserializer = new AvroDeserializer(avroSchema, rowSchema)
-// val rows = data.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow]))
-// spark.createDataFrame(spark.sparkContext.parallelize(rows), rowSchema)
-// }
-
- /**
- * Creates a dataset from given dataframe using kryo encoder.
- *
- * NOTE: due to inability to use bean based encoder for avro types this method uses kryo encoder;
- * for this reason this method creates objects by mapping rows to jsons and jsons to instances of objects.
- *
- * @param df DataFrame to be converted to a dataset.
- * @param clazz Class of objects in the dataset.
- * @tparam T Type of objects in the dataset.
- * @return Dataset of objects corresponding to records in the given dataframe.
- */
- def toDS[T <: SpecificRecordBase](df: DataFrame, clazz: Class[T]): Dataset[T] = {
- implicit val encoder: Encoder[T] = Encoders.kryo(clazz)
- val mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
- df
- .toJSON
- .map(json => mapper.readValue(json, clazz))
- }
-}
diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.scala
deleted file mode 100644
index ca31204fd..000000000
--- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-package eu.dnetlib.iis.common.spark.avro
-
-import org.apache.avro.Schema
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.avro.SchemaConverters
-
-/**
- * Support for writing dataframes of avro types.
- *
- * @param df DataFrame of avro type.
- */
-class AvroDataFrameWriter(df: DataFrame) extends Serializable {
-
- /**
- * Writes a dataframe as avro datastore using avro schema generated from sql schema.
- *
- * @param path Path to the data store.
- * @return
- */
- def write(path: String): Unit = {
- write(path, SchemaConverters.toAvroType(df.schema))
- }
-
- /**
- * Writes a dataframe as avro datastore using avro schema.
- *
- * @param path Path to the data store.
- * @param avroSchema Avro schema of the records.
- */
- def write(path: String, avroSchema: Schema): Unit = {
- df
- .write
- .format("avro")
- .option("avroSchema", avroSchema.toString)
- .option("compression", "uncompressed")
- .save(path)
- }
-}
diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.scala
deleted file mode 100644
index cb17918b1..000000000
--- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-package eu.dnetlib.iis.common.spark.avro
-
-import org.apache.avro.Schema
-import org.apache.avro.specific.SpecificRecordBase
-import org.apache.spark.sql.{Dataset, SparkSession}
-
-/**
- * Support for reading avro datastores as datasets.
- *
- * @param spark SparkSession instance.
- */
-class AvroDatasetReader(val spark: SparkSession) extends Serializable {
-
- /**
- * Reads avro datastore as Spark dataset using avro schema and kryo encoder.
- *
- * NOTE: due to inability to use bean-based encoder for avro types this method uses kryo encoder;
- * for this reason this method creates objects by mapping rows to jsons and jsons to instances of objects.
- *
- * @param path Path to the data store.
- * @param avroSchema Avro schema of the records.
- * @param clazz Class of objects in the dataset.
- * @tparam T Type of objects in the dataset.
- * @return Dataset with data read from given path.
- */
- def read[T <: SpecificRecordBase](path: String, avroSchema: Schema, clazz: Class[T]): Dataset[T] = {
- new AvroDataFrameSupport(spark).toDS(new AvroDataFrameReader(spark).read(path, avroSchema), clazz)
- }
-}
diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala
deleted file mode 100644
index ea39f79c8..000000000
--- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package eu.dnetlib.iis.common.spark.avro
-
-import org.apache.avro.Schema
-import org.apache.avro.specific.SpecificRecordBase
-import org.apache.spark.sql._
-import org.apache.spark.sql.avro._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.types.StructType
-
-/**
- * Support for datasets of avro types.
- *
- * @param spark SparkSession instance.
- */
-class AvroDatasetSupport(val spark: SparkSession) extends Serializable {
-
- /**
- * Creates a dataframe from given dataset of avro type.
- *
- * @param ds Dataset to be converted to a dataframe.
- * @param avroSchema Avro schema of the records.
- * @tparam T Type of objects in the dataset.
- * @return DataFrame of objects corresponding to records in the given dataset.
- */
-// def toDF[T <: SpecificRecordBase](ds: Dataset[T], avroSchema: Schema): DataFrame = {
-// val avroSchemaStr = avroSchema.toString
-// val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
-// val encoder = RowEncoder(rowSchema).resolveAndBind()
-//
-// object SerializationSupport extends Serializable {
-// @transient private lazy val deserializer = new AvroDeserializer(new Schema.Parser().parse(avroSchemaStr), rowSchema)
-// private val rows = ds.rdd.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow]))
-//
-// def doToDF(): DataFrame = {
-// spark.createDataFrame(rows, rowSchema)
-// }
-// }
-//
-// SerializationSupport.doToDF()
-// }
-}
diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala
deleted file mode 100644
index 20258d3a9..000000000
--- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package eu.dnetlib.iis.common.spark.avro
-
-import org.apache.avro.Schema
-import org.apache.avro.specific.SpecificRecordBase
-import org.apache.spark.sql.Dataset
-
-/**
- * Support for writing datasets of avro types.
- *
- * @param ds Dataset of avro type.
- * @tparam T Avro type.
- */
-class AvroDatasetWriter[T <: SpecificRecordBase](ds: Dataset[T]) extends Serializable {
-
- /**
- * Writes a dataset as avro datastore using avro schema.
- *
- * @param path Path to the data store.
- * @param avroSchema Avro schema of the records.
- */
-// def write(path: String, avroSchema: Schema): Unit = {
-// new AvroDataFrameWriter(new AvroDatasetSupport(ds.sparkSession).toDF(ds, avroSchema))
-// .write(path, avroSchema)
-// }
-}
diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReaderTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReaderTest.java
index eafcb3b02..f4168cafc 100644
--- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReaderTest.java
+++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReaderTest.java
@@ -1,23 +1,23 @@
package eu.dnetlib.iis.common.spark.avro;
-import eu.dnetlib.iis.common.avro.Person;
-import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession;
-import eu.dnetlib.iis.common.utils.AvroTestUtils;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.avro.SchemaConverters;
-import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import eu.dnetlib.iis.common.avro.Person;
+import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession;
+import eu.dnetlib.iis.common.utils.AvroTestUtils;
class AvroDataFrameReaderTest extends TestWithSharedSparkSession {
@@ -29,25 +29,6 @@ public void beforeEach() {
reader = new AvroDataFrameReader(spark());
}
- @Test
- @DisplayName("Avro dataframe reader reads avro datastore with SQL schema as dataframe")
- public void givenAvroDatastore_whenReadUsingAvroReaderWithSQLSchema_thenProperDataFrameIsReturned(@TempDir Path inputDir) throws IOException {
- Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build();
- List<Person> data = Collections.singletonList(person);
- AvroTestUtils.createLocalAvroDataStore(data, inputDir.toString(), Person.class);
-
- Dataset<Row> result = reader.read(inputDir.toString(),
- (StructType) SchemaConverters.toSqlType(Person.SCHEMA$).dataType());
-
- assertEquals(SchemaConverters.toSqlType(Person.SCHEMA$).dataType(), result.schema());
- List<Row> rows = result.collectAsList();
- assertEquals(1, rows.size());
- Row row = rows.get(0);
- assertEquals(person.getId(), row.getAs("id"));
- assertEquals(person.getName(), row.getAs("name"));
- assertEquals(person.getAge(), row.getAs("age"));
- }
-
@Test
@DisplayName("Avro dataframe reader reads avro datastore with avro schema as dataframe")
public void givenAvroDatastore_whenReadUsingAvroReaderWithAvroSchema_thenProperDataFrameIsReturned(@TempDir Path inputDir) throws IOException {
diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java
index 5e28202b3..c073b069b 100644
--- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java
+++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java
@@ -1,49 +1,23 @@
package eu.dnetlib.iis.common.spark.avro;
-import eu.dnetlib.iis.common.avro.Person;
-import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession;
-import org.apache.avro.Schema;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.StructType;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import eu.dnetlib.iis.common.avro.Person;
+import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession;
public class AvroDataFrameSupportTest extends TestWithSharedSparkSession {
- private AvroDataFrameSupport support;
-
- @BeforeEach
- public void beforeEach() {
- super.beforeEach();
- support = new AvroDataFrameSupport(spark());
- }
-
-// @Test
-// @DisplayName("Avro dataframe support creates dataframe from collection of avro type")
-// public void givenACollectionOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() {
-// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build();
-// List<Person> data = Collections.singletonList(person);
-//
-// Dataset<Row> result = support.createDataFrame(data, Person.SCHEMA$);
-//
-// assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema());
-// List<Row> rows = result.collectAsList();
-// assertEquals(1, rows.size());
-// Row row = rows.get(0);
-// assertEquals(person.getId(), row.getAs("id"));
-// assertEquals(person.getName(), row.getAs("name"));
-// assertEquals(person.getAge(), row.getAs("age"));
-// }
-
@Test
@DisplayName("Avro dataframe support converts dataframe of avro type to dataset of avro type")
public void givenDataFrameOfAvroType_whenConvertedToDataset_thenProperDatasetIsReturned() {
@@ -53,7 +27,7 @@ public void givenDataFrameOfAvroType_whenConvertedToDataset_thenProperDatasetIsR
data, (StructType) SchemaConverters.toSqlType(Person.SCHEMA$).dataType()
);
- Dataset<Person> result = support.toDS(df, Person.class);
+ Dataset<Person> result = AvroDataFrameSupport.toDS(df, Person.class);
List<Person> personList = result.collectAsList();
assertEquals(1, personList.size());
@@ -63,7 +37,4 @@ public void givenDataFrameOfAvroType_whenConvertedToDataset_thenProperDatasetIsR
assertEquals(personRow.getAs(2), person.getAge());
}
- private static void assertSchemasEqualIgnoringNullability(Schema avroSchema, StructType sqlSchema) {
- assertEquals(SchemaConverters.toSqlType(avroSchema).dataType().asNullable(), sqlSchema.asNullable());
- }
}
\ No newline at end of file
diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriterTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriterTest.java
index e257a9cae..74f2c2824 100644
--- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriterTest.java
+++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriterTest.java
@@ -1,9 +1,12 @@
package eu.dnetlib.iis.common.spark.avro;
-import eu.dnetlib.iis.common.avro.Person;
-import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession;
-import eu.dnetlib.iis.common.utils.AvroTestUtils;
-import org.apache.avro.generic.GenericRecord;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -14,12 +17,9 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import eu.dnetlib.iis.common.avro.Person;
+import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession;
+import eu.dnetlib.iis.common.utils.AvroTestUtils;
class AvroDataFrameWriterTest extends TestWithSharedSparkSession {
@@ -28,26 +28,6 @@ public void beforeEach() {
super.beforeEach();
}
- @Test
- @DisplayName("Avro dataframe writer writes dataframe of avro type using SQL schema")
- public void givenDataFrameOfAvroType_whenWrittenToOutputUsingSQLSchema_thenWriteSucceeds(@TempDir Path workingDir) throws IOException {
- Path outputDir = workingDir.resolve("output");
- Row personRow = RowFactory.create(1, "name", 2);
- Dataset<Row> df = spark().createDataFrame(
- Collections.singletonList(personRow),
- (StructType) SchemaConverters.toSqlType(Person.SCHEMA$).dataType()
- );
-
- new AvroDataFrameWriter(df).write(outputDir.toString());
-
- List<GenericRecord> genericRecordList = AvroTestUtils.readLocalAvroDataStore(outputDir.toString());
- assertEquals(1, genericRecordList.size());
- GenericRecord genericRecord = genericRecordList.get(0);
- assertEquals(personRow.getAs(0), genericRecord.get(0));
- assertEquals(personRow.getAs(1).toString(), genericRecord.get(1).toString());
- assertEquals(personRow.getAs(2), genericRecord.get(2));
- }
-
@Test
@DisplayName("Avro dataframe writer writes dataframe of avro type using avro schema")
public void givenDataFrameOfAvroType_whenWrittenToOutputUsingAvroSchema_thenWriteSucceeds(@TempDir Path workingDir) throws IOException {
diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java
index e57403116..1537a87ae 100644
--- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java
+++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java
@@ -1,21 +1,13 @@
package eu.dnetlib.iis.common.spark.avro;
-import eu.dnetlib.iis.common.avro.Person;
-import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
import org.apache.avro.Schema;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-import java.util.List;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession;
public class AvroDatasetSupportTest extends TestWithSharedSparkSession {
@@ -24,26 +16,6 @@ public void beforeEach() {
super.beforeEach();
}
-// @Test
-// @DisplayName("Avro dataset support converts dataset of avro type to dataframe of avro type")
-// public void givenDatasetOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() {
-// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build();
-// Dataset<Person> ds = spark().createDataset(
-// Collections.singletonList(person),
-// Encoders.kryo(Person.class)
-// );
-//
-// Dataset<Row> result = new AvroDatasetSupport(spark()).toDF(ds, Person.SCHEMA$);
-//
-// assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema());
-// List<Row> rows = result.collectAsList();
-// assertEquals(1, rows.size());
-// Row row = rows.get(0);
-// assertEquals(person.getId(), row.getAs("id"));
-// assertEquals(person.getName(), row.getAs("name"));
-// assertEquals(person.getAge(), row.getAs("age"));
-// }
-
public static void assertSchemasEqualIgnoringNullability(Schema avroSchema, StructType sqlSchema) {
assertEquals(SchemaConverters.toSqlType(avroSchema).dataType().asNullable(), sqlSchema.asNullable());
}
diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java
deleted file mode 100644
index beab31aa2..000000000
--- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package eu.dnetlib.iis.common.spark.avro;
-
-import eu.dnetlib.iis.common.avro.Person;
-import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession;
-import eu.dnetlib.iis.common.utils.AvroTestUtils;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-class AvroDatasetWriterTest extends TestWithSharedSparkSession {
-
- @BeforeEach
- public void beforeEach() {
- super.beforeEach();
- }
-
-// @Test
-// @DisplayName("Avro dataset writer writes dataset of avro type")
-// public void givenDatasetOfAvroType_whenWrittenToOutput_thenWriteSucceeds(@TempDir Path workingDir) throws IOException {
-// Path outputDir = workingDir.resolve("output");
-// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build();
-// Dataset<Person> ds = spark().createDataset(
-// Collections.singletonList(person),
-// Encoders.kryo(Person.class)
-// );
-//
-// new AvroDatasetWriter<>(ds).write(outputDir.toString(), Person.SCHEMA$);
-//
-// List<Person> personList = AvroTestUtils.readLocalAvroDataStore(outputDir.toString());
-// assertEquals(1, personList.size());
-// Person personRead = personList.get(0);
-// assertEquals(person, personRead);
-// }
-}
\ No newline at end of file
diff --git a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java
index 816914a9b..b4310ade2 100644
--- a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java
+++ b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java
@@ -46,7 +46,7 @@ public void givenInputCitationsPath_whenRun_thenSerializedAtomicActionsAndReport
);
Path inputCitationsPath = rootInputPath.resolve("citations");
- new AvroDataFrameWriter(CitationRelationExporterTestUtils.createDataFrame(spark(), citationsList)).write(inputCitationsPath.toString());
+ new AvroDataFrameWriter(CitationRelationExporterTestUtils.createDataFrame(spark(), citationsList)).write(inputCitationsPath.toString(), Citations.SCHEMA$);
float trustLevelThreshold = 0.5f;
Path outputRelationPath = rootOutputPath.resolve("output");
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java
index 687158df1..7c8400924 100644
--- a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java
@@ -59,7 +59,7 @@ public static void main(String[] args) throws Exception {
Dataset<Row> result = sparkSession.sql("select id, location, mimetype, size, hash from "
+ params.inputTableName + " where location is not null");
- JavaRDD<DocumentContentUrl> documentContentUrl = buildOutputRecord(result, sparkSession);
+ JavaRDD<DocumentContentUrl> documentContentUrl = buildOutputRecord(result);
documentContentUrl.cache();
JavaRDD<ReportEntry> reports = generateReportEntries(sparkSession, documentContentUrl.count());
@@ -76,7 +76,7 @@ private static JavaRDD generateReportEntries(SparkSession sparkSess
Encoders.kryo(ReportEntry.class)).javaRDD();
}
- private static JavaRDD<DocumentContentUrl> buildOutputRecord(Dataset<Row> source, SparkSession spark) {
+ private static JavaRDD<DocumentContentUrl> buildOutputRecord(Dataset<Row> source) {
Dataset<Row> resultDs = source.select(
concat(lit(InfoSpaceConstants.ROW_PREFIX_RESULT), col("id")).as("id"),
col("location").as("url"),
@@ -84,7 +84,7 @@ private static JavaRDD buildOutputRecord(Dataset source
col("size").cast("long").divide(1024).as("contentSizeKB"),
col("hash").as("contentChecksum")
);
- return new AvroDataFrameSupport(spark).toDS(resultDs, DocumentContentUrl.class).toJavaRDD();
+ return AvroDataFrameSupport.toDS(resultDs, DocumentContentUrl.class).toJavaRDD();
}
@Parameters(separators = "=")
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/infospace/ImportInformationSpaceJobUtils.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/infospace/ImportInformationSpaceJobUtils.java
index bae8e4238..0af611eb1 100644
--- a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/infospace/ImportInformationSpaceJobUtils.java
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/infospace/ImportInformationSpaceJobUtils.java
@@ -63,6 +63,6 @@ public static JavaRDD produceGraphIdToObjectStoreIdMapping(Ja
col("oid").as("originalId")
);
- return new AvroDataFrameSupport(spark).toDS(identifierMappingDF, IdentifierMapping.class).toJavaRDD();
+ return AvroDataFrameSupport.toDS(identifierMappingDF, IdentifierMapping.class).toJavaRDD();
}
}
diff --git a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/patent/PatentMetadataRetrieverJob.java b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/patent/PatentMetadataRetrieverJob.java
index c8945089f..d23e49ef8 100644
--- a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/patent/PatentMetadataRetrieverJob.java
+++ b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/patent/PatentMetadataRetrieverJob.java
@@ -94,7 +94,7 @@ public static void main(String[] args) throws Exception {
JavaPairRDD<CharSequence, Optional<DocumentText>> cacheById = cachedSources
.mapToPair(x -> new Tuple2<>(x.getId(), Optional.of(x)))
- .union(cachedFaults.mapToPair(x -> new Tuple2<>(x.getInputObjectId(), Optional.empty())));
+ .union(cachedFaults.mapToPair(x -> new Tuple2<CharSequence, Optional<DocumentText>>(x.getInputObjectId(), Optional.empty())));
JavaPairRDD inputById = importedPatents
.mapToPair(x -> new Tuple2<>(getId(x), x));
JavaPairRDD>>> inputJoinedWithCache =
diff --git a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/CachedWebCrawlerJob.java b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/CachedWebCrawlerJob.java
index f90d1cf76..1cff2db27 100644
--- a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/CachedWebCrawlerJob.java
+++ b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/CachedWebCrawlerJob.java
@@ -99,7 +99,7 @@ public static void main(String[] args) throws Exception {
JavaPairRDD<CharSequence, Optional<DocumentText>> cacheByUrl = cachedSources
.mapToPair(x -> new Tuple2<>(x.getId(), Optional.of(x)))
- .union(cachedFaults.mapToPair(x -> new Tuple2<>(x.getInputObjectId(), Optional.empty())));
+ .union(cachedFaults.mapToPair(x -> new Tuple2<CharSequence, Optional<DocumentText>>(x.getInputObjectId(), Optional.empty())));
JavaPairRDD inputByUrl = documentToSoftwareUrl
.mapToPair(x -> new Tuple2<>(x.getSoftwareUrl(), x));
JavaPairRDD>>> inputJoinedWithCache =