From a20c7fbc5913f5d2af1bbf3a17114cad43fd8a7a Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 21 Apr 2023 16:33:42 +0100 Subject: [PATCH] AVRO-3594. openFile() API through shim Change-Id: Ie7099208b601a08823775e901359a6727f0c6fe6 --- .../java/org/apache/avro/mapred/FsInput.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java index d7a6e147aeb..5e25e3ccfe4 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java @@ -22,12 +22,15 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.avro.file.SeekableInput; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; /** Adapt an {@link FSDataInputStream} to {@link SeekableInput}. */ @@ -42,16 +45,19 @@ public FsInput(Path path, Configuration conf) throws IOException { /** Construct given a path and a {@code FileSystem}. */ public FsInput(Path path, FileSystem fileSystem) throws IOException { - this.len = fileSystem.getFileStatus(path).getLen(); - // use the hadoop 3.3.0 openFile API and specify length + final FileStatus st = fileSystem.getFileStatus(path); + this.len = st.getLen(); + // use the hadoop 3.3.0 openFile API, pass in status // and read policy. object stores can use these to // optimize read performance. // the read policy "adaptive" means "start sequential but - // go to random IO after backwards seeks" + // go to random IO when considered better" // Filesystems which don't recognize the options will ignore them - - this.stream = awaitFuture(fileSystem.openFile(path).opt("fs.option.openfile.read.policy", "adaptive") - .opt("fs.option.openfile.length", Long.toString(len)).build()); + // Importantly deployments which have switched the default read policy to + // "random" + // will not suffer when reading large avro files. + this.stream = awaitFuture(fileSystem.openFile(path) + .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE).withFileStatus(st).build()); } @Override