Skip to content

Commit

Permalink
AVRO-3594. openFile() API.
Browse files Browse the repository at this point in the history
For hadoop 3.3.5+; passes in length to save a HEAD call on abfs as well as s3

Change-Id: I9054ad1c3b374cabce36d06e9fbf7089adf8cad3
  • Loading branch information
steveloughran committed Apr 21, 2023
1 parent 088c346 commit d70a1ba
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.avro.file.SeekableInput;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/** Adapt an {@link FSDataInputStream} to {@link SeekableInput}. */
Expand All @@ -42,16 +45,20 @@ public FsInput(Path path, Configuration conf) throws IOException {

/** Construct given a path and a {@code FileSystem}. */
public FsInput(Path path, FileSystem fileSystem) throws IOException {
this.len = fileSystem.getFileStatus(path).getLen();
// use the hadoop 3.3.0 openFile API and specify length
final FileStatus st = fileSystem.getFileStatus(path);
this.len = st.getLen();
// use the hadoop 3.3.0 openFile API, pass in status
// and read policy. object stores can use these to
// optimize read performance.
// the read policy "adaptive" means "start sequential but
// go to random IO after backwards seeks"
// go to random IO when considered better"
// Filesystems which don't recognize the options will ignore them

this.stream = awaitFuture(fileSystem.openFile(path).opt("fs.option.openfile.read.policy", "adaptive")
.opt("fs.option.openfile.length", Long.toString(len)).build());
// Importantly deployments which have switched the default read policy to "random"
// will not suffer when reading large avro files.
this.stream = awaitFuture(fileSystem.openFile(path)
.opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
.withFileStatus(st)
.build());
}

@Override
Expand Down

0 comments on commit d70a1ba

Please sign in to comment.