From 37b536493cad0b7ef20b0dc43959e44551964d49 Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Fri, 29 Sep 2023 15:11:29 +0200 Subject: [PATCH] Closes #1426: Run IIS experiments by relying on spark 3.4 version WIP. Fixing task serialization issue by upgrading avro dependency from 1.8.10 to 1.11.1 which is already a part of sharelib342. This induced the requirement to align JsonConverter with the new code and one of the requirements to move it to a different package due to limited visibility of one of the crucial methods. Further logging system dependency alignment to make unit tests output produced on console visible. --- .../streaming/AvroAsJSONOutputFormat.java | 2 +- .../streaming/AvroAsJSONRecordWriter.java | 3 +- .../apache/avro}/JsonConverter.java | 23 +++++----- .../avro/common/JsonConverterTest.java | 2 + iis-3rdparty-avrojsoncoders/pom.xml | 4 ++ .../org/apache/avro/io/HackedJsonEncoder.java | 7 ++- .../CitationRelationExporterIOUtils.java | 7 +-- pom.xml | 44 +++++++++++++++++-- 8 files changed, 67 insertions(+), 25 deletions(-) rename iis-3rdparty-avro-json/src/main/java/{com/cloudera/science/avro/common => org/apache/avro}/JsonConverter.java (93%) diff --git a/iis-3rdparty-avro-json/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONOutputFormat.java b/iis-3rdparty-avro-json/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONOutputFormat.java index d49275fa1..c08e3bb2c 100644 --- a/iis-3rdparty-avro-json/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONOutputFormat.java +++ b/iis-3rdparty-avro-json/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONOutputFormat.java @@ -16,6 +16,7 @@ import java.io.IOException; +import org.apache.avro.JsonConverter; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileConstants; @@ -32,7 +33,6 @@ import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.util.Progressable; -import com.cloudera.science.avro.common.JsonConverter; import com.cloudera.science.avro.common.SchemaLoader; /** diff --git a/iis-3rdparty-avro-json/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONRecordWriter.java b/iis-3rdparty-avro-json/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONRecordWriter.java index 2421ae74e..ee5a79375 100644 --- a/iis-3rdparty-avro-json/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONRecordWriter.java +++ b/iis-3rdparty-avro-json/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONRecordWriter.java @@ -16,14 +16,13 @@ import java.io.IOException; +import org.apache.avro.JsonConverter; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; -import com.cloudera.science.avro.common.JsonConverter; - public class AvroAsJSONRecordWriter implements RecordWriter { private final DataFileWriter writer; diff --git a/iis-3rdparty-avro-json/src/main/java/com/cloudera/science/avro/common/JsonConverter.java b/iis-3rdparty-avro-json/src/main/java/org/apache/avro/JsonConverter.java similarity index 93% rename from iis-3rdparty-avro-json/src/main/java/com/cloudera/science/avro/common/JsonConverter.java rename to iis-3rdparty-avro-json/src/main/java/org/apache/avro/JsonConverter.java index aac2ba20a..a7f9723cf 100644 --- a/iis-3rdparty-avro-json/src/main/java/com/cloudera/science/avro/common/JsonConverter.java +++ b/iis-3rdparty-avro-json/src/main/java/org/apache/avro/JsonConverter.java @@ -12,7 +12,7 @@ * the specific language governing permissions and limitations under the * License. */ -package com.cloudera.science.avro.common; +package org.apache.avro; import java.io.IOException; import java.util.ArrayList; @@ -21,14 +21,13 @@ import java.util.Map; import java.util.Set; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -119,26 +118,26 @@ private GenericRecord convert(Map raw, Schema schema) Object value = null; switch (fieldSchema.getType()) { case BOOLEAN: - value = defaultValue.getValueAsBoolean(); + value = defaultValue.asBoolean(); break; case DOUBLE: - value = defaultValue.getValueAsDouble(); + value = defaultValue.asDouble(); break; case FLOAT: - value = (float) defaultValue.getValueAsDouble(); + value = (float) defaultValue.asDouble(); break; case INT: - value = defaultValue.getValueAsInt(); + value = defaultValue.asInt(); break; case LONG: - value = defaultValue.getValueAsLong(); + value = defaultValue.asLong(); break; case STRING: - value = defaultValue.getValueAsText(); + value = defaultValue.asText(); break; case MAP: Map fieldMap = mapper.readValue( - defaultValue.getValueAsText(), Map.class); + defaultValue.asText(), Map.class); Map mvalue = Maps.newHashMap(); for (Map.Entry e : fieldMap.entrySet()) { mvalue.put(e.getKey(), @@ -148,7 +147,7 @@ private GenericRecord convert(Map raw, Schema schema) break; case ARRAY: List fieldArray = mapper.readValue( - defaultValue.getValueAsText(), List.class); + defaultValue.asText(), List.class); List lvalue = Lists.newArrayList(); for (Object elem : fieldArray) { lvalue.add(typeConvert(elem, name, fieldSchema.getElementType())); @@ -157,7 +156,7 @@ private GenericRecord convert(Map raw, Schema schema) break; case RECORD: Map fieldRec = mapper.readValue( - defaultValue.getValueAsText(), Map.class); + defaultValue.asText(), Map.class); value = convert(fieldRec, fieldSchema); break; default: diff --git a/iis-3rdparty-avro-json/src/test/java/com/cloudera/science/avro/common/JsonConverterTest.java b/iis-3rdparty-avro-json/src/test/java/com/cloudera/science/avro/common/JsonConverterTest.java index b1e64b57e..fe7796e10 100644 --- a/iis-3rdparty-avro-json/src/test/java/com/cloudera/science/avro/common/JsonConverterTest.java +++ b/iis-3rdparty-avro-json/src/test/java/com/cloudera/science/avro/common/JsonConverterTest.java @@ -1,6 +1,8 @@ package com.cloudera.science.avro.common; import com.google.common.collect.ImmutableList; + +import org.apache.avro.JsonConverter; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericRecord; diff --git a/iis-3rdparty-avrojsoncoders/pom.xml b/iis-3rdparty-avrojsoncoders/pom.xml index c931f1afe..bbc6e83e8 100644 --- a/iis-3rdparty-avrojsoncoders/pom.xml +++ b/iis-3rdparty-avrojsoncoders/pom.xml @@ -18,6 +18,10 @@ org.apache.avro avro + + org.codehaus.jackson + jackson-core-asl + diff --git a/iis-3rdparty-avrojsoncoders/src/main/java/org/apache/avro/io/HackedJsonEncoder.java b/iis-3rdparty-avrojsoncoders/src/main/java/org/apache/avro/io/HackedJsonEncoder.java index 9b6e4345c..1d1c82bb6 100644 --- a/iis-3rdparty-avrojsoncoders/src/main/java/org/apache/avro/io/HackedJsonEncoder.java +++ b/iis-3rdparty-avrojsoncoders/src/main/java/org/apache/avro/io/HackedJsonEncoder.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.BitSet; import org.apache.avro.AvroTypeException; @@ -196,8 +197,10 @@ public void writeBytes(byte[] bytes, int start, int len) throws IOException { private void writeByteArray(byte[] bytes, int start, int len) throws IOException { - out.writeString( - new String(bytes, start, len, JsonDecoder.CHARSET)); + out.writeString( +// mh: after upgrading avro version JsonDecoder.CHARSET got missing but the value used in that class was set to StandardCharsets.ISO_8859_1 +// new String(bytes, start, len, JsonDecoder.CHARSET)); + new String(bytes, start, len, StandardCharsets.ISO_8859_1)); } @Override diff --git a/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtils.java b/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtils.java index 51fa848c6..d7f67b919 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtils.java +++ b/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtils.java @@ -3,7 +3,6 @@ import eu.dnetlib.iis.common.java.io.HdfsUtils; import eu.dnetlib.iis.common.schemas.ReportEntry; import eu.dnetlib.iis.common.spark.avro.AvroDataFrameReader; -import eu.dnetlib.iis.common.spark.avro.AvroDataFrameWriter; import eu.dnetlib.iis.common.utils.RDDUtils; import eu.dnetlib.iis.export.schemas.Citations; import pl.edu.icm.sparkutils.avro.SparkAvroSaver; @@ -86,10 +85,8 @@ public static void storeReportEntries(SparkAvroSaver avroSaver, Dataset reportEntries, String outputReportPath) { storeReportEntries(avroSaver, reportEntries, outputReportPath, (ds, path) -> - // FIXME avoiding relying on writing Dataset which apparently is not easily achievable in spark3 - // due to AvroDatasetSupport scala functions referring to classes which were made private in spark3 - avroSaver.saveJavaRDD(ds.javaRDD(), ReportEntry.SCHEMA$, path)); - // new AvroDataFrameWriter(ds).write(path, ReportEntry.SCHEMA$)); + //mh: due to changes in avro serialization model in spark3 relying on AvroSaver instead of writer storing Datasets + avroSaver.saveJavaRDD(ds.toJavaRDD(), ReportEntry.SCHEMA$, path)); } public static void storeReportEntries(SparkAvroSaver avroSaver, Dataset reportEntries, diff --git a/pom.xml b/pom.xml index 0781e40f1..5a67fdb92 100644 --- a/pom.xml +++ b/pom.xml @@ -124,6 +124,28 @@ log4j-slf4j-impl 2.19.0 + + + + log4j + log4j + 1.2.17 + provided + + + + org.slf4j + slf4j-log4j12 + 1.7.5 + provided + + + + + org.slf4j + slf4j-api + 2.0.6 + + - + org.apache.avro avro - ${iis.avro.version} + 1.11.1 + provided + + com.fasterxml.jackson.core + jackson-core + 2.14.2 + provided + + + + + org.codehaus.jackson + jackson-core-asl + + 1.8.10 + + org.apache.avro