diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java index e8fe1d03e1d..d7a6e147aeb 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java @@ -28,6 +28,8 @@ import org.apache.avro.file.SeekableInput; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; + /** Adapt an {@link FSDataInputStream} to {@link SeekableInput}. */ public class FsInput implements Closeable, SeekableInput { private final FSDataInputStream stream; @@ -41,7 +43,15 @@ public FsInput(Path path, Configuration conf) throws IOException { /** Construct given a path and a {@code FileSystem}. */ public FsInput(Path path, FileSystem fileSystem) throws IOException { this.len = fileSystem.getFileStatus(path).getLen(); - this.stream = fileSystem.open(path); + // use the hadoop 3.3.0 openFile API and specify length + // and read policy. object stores can use these to + // optimize read performance. + // the read policy "adaptive" means "start sequential but + // go to random IO after backwards seeks" + // Filesystems which don't recognize the options will ignore them + + this.stream = awaitFuture(fileSystem.openFile(path).opt("fs.option.openfile.read.policy", "adaptive") + .opt("fs.option.openfile.length", Long.toString(len)).build()); } @Override