diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java index d7a6e147aeb..76768d5802e 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java @@ -22,12 +22,15 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.avro.file.SeekableInput; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; /** Adapt an {@link FSDataInputStream} to {@link SeekableInput}. */ @@ -42,16 +45,20 @@ public FsInput(Path path, Configuration conf) throws IOException { /** Construct given a path and a {@code FileSystem}. */ public FsInput(Path path, FileSystem fileSystem) throws IOException { - this.len = fileSystem.getFileStatus(path).getLen(); - // use the hadoop 3.3.0 openFile API and specify length + final FileStatus st = fileSystem.getFileStatus(path); + this.len = st.getLen(); + // use the hadoop 3.3.0 openFile API, pass in status // and read policy. object stores can use these to // optimize read performance. 
// the read policy "adaptive" means "start sequential but - // go to random IO after backwards seeks" + // go to random IO when considered better" // Filesystems which don't recognize the options will ignore them - - this.stream = awaitFuture(fileSystem.openFile(path).opt("fs.option.openfile.read.policy", "adaptive") - .opt("fs.option.openfile.length", Long.toString(len)).build()); + // Importantly, deployments which have switched the default read policy to "random" + // will not suffer when reading large Avro files. + this.stream = awaitFuture(fileSystem.openFile(path) + .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE) + .withFileStatus(st) + .build()); } @Override