Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmake/warnings.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ no_warning(thread-safety-negative) # experimental flag, too many false positives
no_warning(unsafe-buffer-usage) # too aggressive
no_warning(switch-default) # conflicts with "defaults in a switch covering all enum values"
no_warning(nrvo) # not eliding copy on return - too aggressive
no_warning(missing-noreturn) # Clang: many throw-only overrides; marking [[noreturn]] on all of them is impractical
1 change: 1 addition & 0 deletions contrib/libhdfs3-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ set(SRCS
"${HDFS3_SOURCE_DIR}/client/RawErasureCoderFactory.cpp"
"${HDFS3_SOURCE_DIR}/client/RawErasureDecoder.cpp"
"${HDFS3_SOURCE_DIR}/client/RawErasureEncoder.cpp"
"${HDFS3_SOURCE_DIR}/client/PositionStripeReader.cpp"
"${HDFS3_SOURCE_DIR}/client/StatefulStripeReader.cpp"
"${HDFS3_SOURCE_DIR}/client/StripeReader.cpp"
"${HDFS3_SOURCE_DIR}/client/StripedBlockUtil.cpp"
Expand Down
18 changes: 14 additions & 4 deletions src/Processors/Formats/Impl/NativeORCBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ extern const int ARGUMENT_OUT_OF_BOUND;


ORCInputStream::ORCInputStream(SeekableReadBuffer & in_, size_t file_size_, bool use_prefetch)
: in(in_), file_size(file_size_), supports_read_at(use_prefetch && in_.supportsReadAt())
: in(in_)
, file_size(file_size_)
, use_offset_based_read(in_.supportsReadAt())
, use_async_prefetch(use_prefetch && use_offset_based_read)
{
if (supports_read_at)
if (use_async_prefetch)
async_runner = threadPoolCallbackRunnerUnsafe<void>(getIOThreadPool().get(), "ORCFile");
}

Expand All @@ -114,13 +117,20 @@ UInt64 ORCInputStream::getNaturalReadSize() const

void ORCInputStream::read(void * buf, UInt64 length, UInt64 offset)
{
if (supports_read_at)
if (use_offset_based_read)
{
size_t bytes_read = 0;
while (bytes_read < length)
{
size_t bytes_to_read = length - bytes_read;
size_t n = in.readBigAt(reinterpret_cast<char *>(buf) + bytes_read, bytes_to_read, offset + bytes_read, nullptr);
if (n == 0)
throw Exception(
ErrorCodes::INCORRECT_DATA,
"ORC readBigAt returned no bytes at offset {} ({} bytes remaining of {}); input may be truncated or corrupted",
offset + bytes_read,
bytes_to_read,
length);
bytes_read += n;
}
}
Expand All @@ -134,7 +144,7 @@ void ORCInputStream::read(void * buf, UInt64 length, UInt64 offset)

std::future<void> ORCInputStream::readAsync(void * buf, uint64_t length, uint64_t offset)
{
if (supports_read_at)
if (use_async_prefetch)
{
return async_runner(
[this, buf, length, offset]
Expand Down
5 changes: 4 additions & 1 deletion src/Processors/Formats/Impl/NativeORCBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ class ORCInputStream : public orc::InputStream
protected:
SeekableReadBuffer & in;
size_t file_size;
bool supports_read_at;
/// Use offset-based reads (ReadBuffer::readBigAt, e.g. HDFS pread) instead of seek + read; needed to read the ORC file tail on HDFS erasure-coded (EC) files.
bool use_offset_based_read;
/// Use the async wrapper only when the caller enabled prefetch and the buffer supports read-at.
bool use_async_prefetch;
ThreadPoolCallbackRunnerUnsafe<void> async_runner;

std::string name = "ORCInputStream";
Expand Down