
AVRO-3594: FsInput to use openFile() API #1807

Draft · wants to merge 3 commits into base: main

Conversation

@steveloughran commented Aug 4, 2022

Boost performance when reading from object stores on Hadoop 3.3.0+ by using the openFile() builder API, passing in the file length as an option (which can save a HEAD request) and asking for adaptive IO (sequential, switching to random if the client starts seeking).

Saving that HEAD request is a key benefit against S3, as it can save 50-100 ms per file.

Jira

Tests

  • My PR does not need testing for these extremely good reasons:
  1. All existing local file IO tests act as regression tests.
  2. Avro isn't set up for integration tests with abfs/gs/s3a URLs.
  3. Mocking doesn't really do much here.

Integration tests would be the way to do this, but the foundational setup for them
is fairly complex. My cloudstream project, downstream of Spark, is set up to do this.

Commits

  • My commits all reference Jira issues in their subject lines. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain Javadoc that explain what it does

No new docs.

@github-actions github-actions bot added the Java Pull Requests for Java binding label Aug 4, 2022
@steveloughran steveloughran marked this pull request as draft August 4, 2022 16:47
@steveloughran (Author)

Except that the hadoop-2 profile still exists, doesn't it? Which means that even though the hadoop 3 profile is full of features, the 2.x one blocks things from working.

// Filesystems which don't recognize the options will ignore them
this.stream = awaitFuture(fileSystem.openFile(path)
    .opt("fs.option.openfile.read.policy", "adaptive")
    .opt("fs.option.openfile.length", Long.toString(len))
    .build());
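For context: openFile(path)...build() returns a CompletableFuture of the input stream, and Hadoop's awaitFuture helper blocks on it and unwraps the exception. Conceptually the blocking step looks like this self-contained sketch (plain java.util.concurrent, not Hadoop's actual FutureIO code; the class and method names here are hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Sketch of what "await a future-returning open" means: block until the
// asynchronous open completes, rethrowing the underlying failure.
class AwaitSketch {
  static <T> T await(CompletableFuture<T> future) {
    try {
      return future.get(); // block until the async open completes
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (ExecutionException e) {
      // surface the real cause of the failed open
      throw new RuntimeException(e.getCause());
    }
  }
}
```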
Contributor:
Nice optimization. It may be better to use the Options.OpenFileOptions constants:
  FS_OPTION_OPENFILE_LENGTH
  FS_OPTION_OPENFILE_READ_POLICY
  FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE

Author:

We only added those options explicitly in branch-3.3; the code wouldn't compile/link against 3.3.0, hence the strings. Unfortunately that also means you don't get the speedup until 3.3.5 ships... but the release will be ready.

3.3.0 did add the withFileStatus(FileStatus) parameter, which was my original design for passing in all the FileStatus info, including etag and maybe version, so you can go straight from a listing to opening the file.

That was first in s3a; abfs was added in 3.3.5. But it is too brittle, because the path checking requires status.getPath() to equal the path being opened, and Hive with its wrapper filesystem doesn't always do that.
Passing in the file length is guaranteed to be either ignored or actually used: no brittleness. It also suits Hive, where workers know the length of the file but don't have a status.

One thing I can add with immediate benefit on 3.3.0 is the earlier fs.s3a.experimental.fadvise option, which again can mandate adaptive, even on Hive clusters where the read policy is explicitly set to random (which some do for maximum ORC/Parquet performance). The new option fs.option.openfile.read.policy is an evolution of that: you can now specify a list of policies, and the first one recognised is used. If someone ever implemented explicit "parquet", "orc", and "avro" policies, for example, you could say the read policy is "orc, random, adaptive" and get the first one known.
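The "first recognised policy wins" contract described above can be sketched with a self-contained resolver. This is an illustration of the behaviour, not Hadoop's implementation; the class, method, and parameter names are hypothetical:

```java
import java.util.Set;

// Illustration of how a store might resolve fs.option.openfile.read.policy:
// the value is a comma-separated list, and the first policy the store
// recognises is the one applied; unknown names are skipped.
class ReadPolicyResolver {
  static String resolve(String optionValue, Set<String> supported, String fallback) {
    for (String policy : optionValue.split(",")) {
      String p = policy.trim().toLowerCase();
      if (supported.contains(p)) {
        return p; // first recognised policy wins
      }
    }
    return fallback; // nothing recognised: use the store default
  }
}
```

With a store that knows "random", "adaptive", and "sequential", the value "orc, random, adaptive" would resolve to "random", since "orc" is unrecognised and skipped.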

Contributor:

OK, thanks for the explanation.

Author:

Note that org.apache.hadoop.fs.AvroFSInput does this.

@clesaec (Contributor) left a comment:

LGTM


@steveloughran (Author)

@clesaec this can't go in until the hadoop2 profile is cut.

I am actually writing a shim library that does reflection-based invocation of the newer operations, with fallbacks if they are not found:
https://github.com/steveloughran/fs-api-shim

But even so, the sooner Avro moves to recent Hadoop 3.x releases only, the better.
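The reflection-with-fallback pattern such a shim relies on can be sketched in plain Java. This is a self-contained illustration of the pattern only; ShimSketch and invokeIfPresent are hypothetical names, not the fs-api-shim API:

```java
import java.lang.reflect.Method;

// Pattern sketch: probe for a no-arg method at runtime; if the running
// library version doesn't have it, take a fallback path instead of failing.
class ShimSketch {
  static String invokeIfPresent(Object target, String methodName, String fallback) {
    try {
      Method m = target.getClass().getMethod(methodName);
      return String.valueOf(m.invoke(target));
    } catch (NoSuchMethodException e) {
      return fallback; // older release: method absent, use the fallback path
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }
}
```

A shim like this lets a library compiled against an older Hadoop release opportunistically use newer APIs when they are present on the classpath.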

@RyanSkraba RyanSkraba changed the title AVRO-3594. FsInput to use openFile() API AVRO-3594: FsInput to use openFile() API Aug 25, 2022
Boost performance reading from object stores in
Hadoop by using the openFile() builder API,
passing in the file length as an option
(can save a HEAD) and asking for adaptive IO
(sequential, going to random if the client starts seeking).
Change-Id: Ie7099208b601a08823775e901359a6727f0c6fe6
@steveloughran (Author)

@opwvhk thanks for the review; I will do a rebase to make sure it is good.
FYI, Parquet picked this up last week: now that they are on Hadoop 3.3.0+, they can do this without reflection.

// "random"
// will not suffer when reading large avro files.
this.stream = awaitFuture(fileSystem.openFile(path)
    .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
    .withFileStatus(st)
    .build());
Author:

Need to add the same fallback code as in Parquet: the s3a connector in 3.3.0-3.3.4 validates the whole path and overreacts to Hive's wrapping of the FS by its own VFS; 3.3.5+ only looks at the filename.
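That fallback can be sketched generically: attempt the preferred call first, and if the store rejects it, retry the plain path. Everything here is a hypothetical illustration of the pattern; in the real code the preferred path would be the openFile() call with the FileStatus hint and the fallback an open without it:

```java
import java.util.function.Supplier;

// Pattern sketch: try the preferred call (e.g. openFile() with a FileStatus
// hint); if the store rejects it (as s3a 3.3.0-3.3.4 can, on a path
// mismatch), retry via the fallback (e.g. open without the hint).
class OpenWithFallback {
  static <T> T withFallback(Supplier<T> preferred, Supplier<T> fallback) {
    try {
      return preferred.get();
    } catch (IllegalArgumentException e) {
      return fallback.get(); // degraded but working path
    }
  }
}
```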

Change-Id: I4ad736a35d625db9d67deda1d11a2d6e51a789e6
Labels
Java Pull Requests for Java binding
3 participants