GH-3356: Add buffers allocated by vectored IO for releasing #3357
base: master
Conversation
```diff
  LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
  // Request a vectored read;
- f.readVectored(ranges, options.getAllocator());
+ f.readVectored(ranges, new ReleasingAllocator(options.getAllocator(), builder.releaser));
```
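For illustration, a minimal sketch of what such a wrapper could look like. The class name `ReleasingAllocator` and the `releaser` field come from the diff above; the `releaseLater` registration hook is an assumption based on parquet's `ByteBufferReleaser` API:

```java
import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.ByteBufferReleaser;

// Wraps a delegate allocator and registers every buffer it hands out with a
// ByteBufferReleaser, so the buffers can be released once the read data has
// been fully consumed.
class ReleasingAllocator implements ByteBufferAllocator {
  private final ByteBufferAllocator delegate;
  private final ByteBufferReleaser releaser;

  ReleasingAllocator(ByteBufferAllocator delegate, ByteBufferReleaser releaser) {
    this.delegate = delegate;
    this.releaser = releaser;
  }

  @Override
  public ByteBuffer allocate(int size) {
    ByteBuffer buffer = delegate.allocate(size);
    releaser.releaseLater(buffer); // assumed hook: defer release instead of leaking
    return buffer;
  }

  @Override
  public void release(ByteBuffer buffer) {
    delegate.release(buffer);
  }

  @Override
  public boolean isDirect() {
    return delegate.isDirect();
  }
}
```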
I'm not sure this is the right direction. Would it be better to make it part of the contract of ByteBufferAllocator implementations to take on this responsibility?
WDYT? @gszadovszky
I'm not sure I get the concept of this PR. ByteBufferAllocator already has the contract of releasing the ByteBuffers it allocates. The only thing we need to do is invoke its release method at the right time, when the related buffer is no longer needed.
The ByteBufferReleaser concept was introduced only to postpone the release invocation until the point where the related ByteBuffers can actually be released. (By using BytesInput we may pass the related buffers around, and it is not always clear when to release them.)
@annimesh2809, I would suggest implementing a unit test that reproduces the issue first. You may use TrackingByteBufferAllocator to fail if any allocated buffer is not released during execution; you can find examples of its usage among the unit tests. Once you have reproduced the issue, you'll need to ensure that the related allocated buffers get back to their allocator to be released. You may use the existing patterns we already have or invent new ones if necessary.
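A minimal sketch of such a test, assuming `TrackingByteBufferAllocator.wrap` as the factory (as used in the existing unit tests) and a hypothetical helper `readWithVectoredIO` standing in for the actual test fixture:

```java
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
import org.junit.Test;

public class TestVectoredReadBufferRelease {

  @Test
  public void testAllBuffersReleased() throws Exception {
    try (TrackingByteBufferAllocator allocator =
        TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) {
      // Read a Parquet file with vectored IO enabled, passing `allocator`
      // through the read options. `readWithVectoredIO` is a hypothetical
      // helper; the real fixture would mirror the failing tests.
      readWithVectoredIO(allocator);
    } // close() throws if any allocated buffer was never released
  }
}
```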
When I build parquet-mr with Hadoop 3.4.2 without any additional changes, the testRangeFiltering test case (and some others) of the TestParquetReader suite fails. The TrackingByteBufferAllocator reveals that the unreleased allocation happens in:
```
Cause: org.apache.parquet.bytes.TrackingByteBufferAllocator$ByteBufferAllocationStacktraceException: Allocation stacktrace of the first ByteBuffer:
  at org.apache.parquet.bytes.TrackingByteBufferAllocator$ByteBufferAllocationStacktraceException.create(TrackingByteBufferAllocator.java:96)
  at org.apache.parquet.bytes.TrackingByteBufferAllocator.allocate(TrackingByteBufferAllocator.java:136)
  at org.apache.hadoop.fs.impl.VectorIOBufferPool.getBuffer(VectorIOBufferPool.java:65)
  at org.apache.hadoop.fs.RawLocalFileSystem$AsyncHandler.initiateRead(RawLocalFileSystem.java:400)
  at org.apache.hadoop.fs.RawLocalFileSystem$AsyncHandler.access$000(RawLocalFileSystem.java:360)
  at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.readVectored(RawLocalFileSystem.java:345)
  at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.readVectored(RawLocalFileSystem.java:324)
  at org.apache.hadoop.fs.BufferedFSInputStream.readVectored(BufferedFSInputStream.java:183)
  at org.apache.hadoop.fs.FSDataInputStream.readVectored(FSDataInputStream.java:308)
  at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readVectored(ChecksumFileSystem.java:474)
  at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readVectored(ChecksumFileSystem.java:463)
```
The root cause seems to be that ChecksumFileSystem (coming from Hadoop) now supports readVectored: https://github.com/apache/hadoop/blob/branch-3.4.2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java#L460-L513.
ChecksumFileSystem.readVectored internally performs additional allocations, such as:

```java
sums.readVectored(checksumRanges, allocate, release);
datas.readVectored(dataRanges, allocate, release);
```

and those buffers are never marked for release by ByteBufferReleaser.
Also, with vectored reads it is not sufficient to mark only the buffers returned by the allocator for release: they are sliced internally, so the buffer object that comes back is a different instance even though the underlying memory is the same.
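A small standalone example of why identity-based tracking breaks once buffers are sliced:

```java
import java.nio.ByteBuffer;

public class SliceIdentityDemo {
  public static void main(String[] args) {
    ByteBuffer allocated = ByteBuffer.allocate(1024); // what the allocator handed out
    ByteBuffer slice = allocated.slice();             // what the read may hand back

    System.out.println(slice == allocated); // false: a distinct object...
    slice.put(0, (byte) 42);
    System.out.println(allocated.get(0));   // 42: ...over the same memory

    // A releaser that tracks buffers by identity never sees `allocated`
    // again if only `slice` is passed back, so the allocation looks leaked.
  }
}
```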
Thanks for the context, @annimesh2809.
Why do you need to track the allocated buffers and release them later, instead of simply passing the allocate and release methods of the ByteBufferAllocator instance to the related Hadoop API via the implementations of SeekableInputStream.readVectored? I assume the Hadoop code would then release the allocated buffers as soon as they are no longer needed.
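A sketch of that idea, assuming the release-aware readVectored overload used by ChecksumFileSystem in the branch-3.4.2 code quoted above:

```java
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.parquet.bytes.ByteBufferAllocator;

// Delegates both sides of the buffer lifecycle to the Parquet allocator, so
// Hadoop (including ChecksumFileSystem's internal checksum/data reads) can
// return buffers as soon as it no longer needs them.
void readVectored(FSDataInputStream stream, List<FileRange> ranges,
    ByteBufferAllocator allocator) throws IOException {
  stream.readVectored(
      ranges,
      allocator::allocate,  // IntFunction<ByteBuffer>
      allocator::release);  // Consumer<ByteBuffer>, release-aware overload
}
```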
Rationale for this change
A couple of test cases of the TestParquetReader suite started failing with Hadoop 3.4.2 and parquet-mr 1.16.0, with errors like the TrackingByteBufferAllocator stack trace shown in the conversation above.
The leaks happen when reading with vectored IO because the buffers are never passed to the releaser.
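For context, the intended lifecycle is that buffers outliving the read call are registered with the releaser and freed in one place. A minimal sketch, assuming ByteBufferReleaser is constructed from an allocator and is AutoCloseable (as in parquet-common):

```java
import org.apache.parquet.bytes.ByteBufferReleaser;
import org.apache.parquet.bytes.HeapByteBufferAllocator;

public class ReleaserLifecycleSketch {
  public static void main(String[] args) throws Exception {
    try (ByteBufferReleaser releaser =
        new ByteBufferReleaser(new HeapByteBufferAllocator())) {
      // During the read, every buffer the allocator hands out is registered
      // via releaser.releaseLater(buffer) instead of being released inline,
      // so the decoded data stays valid until the end of this block.
    } // close() returns every registered buffer to the allocator
  }
}
```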
What changes are included in this PR?
Added a custom allocator that registers all allocated buffers with the releaser. This way, buffers allocated by any class that uses the allocator (such as ChecksumFileSystem) will also be cleaned up.

Are these changes tested?
The TestParquetReader suite passes with these changes.
Are there any user-facing changes?
No