diff --git a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala index 4ebcec85..9fede8d1 100644 --- a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala +++ b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala @@ -20,6 +20,7 @@ import java.io._ import java.net.URI import java.util.zip.Deflater +import scala.math.Ordering import scala.util.control.NonFatal import com.databricks.spark.avro.DefaultSource.{AvroSchema, IgnoreFilesWithoutExtensionProperty, SerializableConfiguration} @@ -61,24 +62,38 @@ private[avro] class DefaultSource extends FileFormat with DataSourceRegister { files: Seq[FileStatus]): Option[StructType] = { val conf = spark.sparkContext.hadoopConfiguration - // Schema evolution is not supported yet. Here we only pick a single random sample file to + // Schema evolution is not supported yet. Here we only pick the last file sorted by path to // figure out the schema of the whole dataset. - val sampleFile = if (conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true)) { - files.find(_.getPath.getName.endsWith(".avro")).getOrElse { + def sampleFilePath = { + implicit def pathOrdering: Ordering[Path] = Ordering.fromLessThan( + (p1: Path, p2: Path) => p1.compareTo(p2) <= 0 + ) + + val ignoreWithoutExtension = conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true) + + val paths = if (ignoreWithoutExtension) { + files.map(_.getPath).filter(_.getName.endsWith(".avro")) + } else { + files.map(_.getPath) + } + + if (paths.isEmpty) { throw new FileNotFoundException( - "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" is " + - "set to true. Do all input files have \".avro\" extension?" + if (ignoreWithoutExtension) { + "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " + + "is set to true. Do all input files have \".avro\" extension?" + } else { + "No Avro files found." + } ) - } - } else { - files.headOption.getOrElse { - throw new FileNotFoundException("No Avro files found.") + } else { + paths.max } } // User can specify an optional avro json schema. val avroSchema = options.get(AvroSchema).map(new Schema.Parser().parse).getOrElse { - val in = new FsInput(sampleFile.getPath, conf) + val in = new FsInput(sampleFilePath, conf) try { val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()) try { diff --git a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala index 8e49caa0..6606fa24 100644 --- a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala +++ b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala @@ -803,4 +803,21 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll { assert(readDf.collect().sameElements(writeDf.collect())) } } + + test("writing avro partitions with different schemas and reading back out with a single predictable schema") { + TestUtils.withTempDir { tempDir => + val df1 = spark.createDataFrame(Seq(Tuple1("a"), Tuple1("b"))) + val df2 = spark.createDataFrame(Seq(("a", 1), ("b", 2))) + + df1.write.avro(s"$tempDir/different_schemas/z=1") + df2.write.avro(s"$tempDir/different_schemas/z=2") + val df3 = spark.read.avro(s"$tempDir/different_schemas") + assert(df3.schema.fieldNames.toSet === Set("_1", "_2", "z")) + + df1.write.avro(s"$tempDir/different_schemas_yet_again/z=2") + df2.write.avro(s"$tempDir/different_schemas_yet_again/z=1") + val df4 = spark.read.avro(s"$tempDir/different_schemas_yet_again") + assert(df4.schema.fieldNames.toSet === Set("_1", "z")) + } + } }