diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 696bda193597..0d86f5eaa0d3 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -148,7 +148,7 @@ class DelayedBufferReader : public ::arrow::io::BufferReader { return DeferNotOk(::arrow::io::internal::SubmitIO( io_context, [self, position, nbytes]() -> Result> { std::this_thread::sleep_for(std::chrono::seconds(1)); - return self->DoReadAt(position, nbytes); + return self->DoReadAt(position, nbytes, /*allow_short_read=*/false); })); } diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index 87d985a0fc2b..048455ce3c95 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -260,28 +260,38 @@ Status CudaBufferReader::DoSeek(int64_t position) { } Result CudaBufferReader::DoReadAt(int64_t position, int64_t nbytes, - void* buffer) { + bool allow_short_read, void* buffer) { RETURN_NOT_OK(CheckClosed()); - nbytes = std::min(nbytes, size_ - position); - RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position, nbytes)); - return nbytes; + auto real_nbytes = std::min(nbytes, size_ - position); + if (!allow_short_read && real_nbytes != nbytes) { + return Status::IOError("Cuda buffer too short: expected to be able to read ", nbytes, + " bytes, got ", real_nbytes); + } + RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position, real_nbytes)); + return real_nbytes; } Result CudaBufferReader::DoRead(int64_t nbytes, void* buffer) { RETURN_NOT_OK(CheckClosed()); - ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, DoReadAt(position_, nbytes, buffer)); + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, + DoReadAt(position_, nbytes, /*allow_short_read=*/true, buffer)); position_ += bytes_read; return bytes_read; } Result> CudaBufferReader::DoReadAt(int64_t position, - int64_t nbytes) { + int64_t nbytes, + bool allow_short_read) { RETURN_NOT_OK(CheckClosed()); - int64_t size = std::min(nbytes, size_ - position); - return std::make_shared(buffer_, position, size); + auto real_nbytes = std::min(nbytes, size_ - position); + if (!allow_short_read && real_nbytes != nbytes) { + return Status::IOError("Cuda buffer too short: expected to be able to read ", nbytes, + " bytes, got ", real_nbytes); + } + return std::make_shared(buffer_, position, real_nbytes); } Result> CudaBufferReader::DoRead(int64_t nbytes) { diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h index 83d65b0e7f78..a25b6afd918c 100644 --- a/cpp/src/arrow/gpu/cuda_memory.h +++ b/cpp/src/arrow/gpu/cuda_memory.h @@ -177,8 +177,10 @@ class ARROW_CUDA_EXPORT CudaBufferReader Result DoRead(int64_t nbytes, void* buffer); Result> DoRead(int64_t nbytes); - Result DoReadAt(int64_t position, int64_t nbytes, void* out); - Result> DoReadAt(int64_t position, int64_t nbytes); + Result DoReadAt(int64_t position, int64_t nbytes, bool allow_short_read, + void* out); + Result> DoReadAt(int64_t position, int64_t nbytes, + bool allow_short_read); Result DoTell() const; Status DoSeek(int64_t position); diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc index 74e98170ad0b..41fdd9f78108 100644 --- a/cpp/src/arrow/io/caching.cc +++ b/cpp/src/arrow/io/caching.cc @@ -167,7 +167,8 @@ struct ReadRangeCache::Impl { std::vector new_entries; new_entries.reserve(ranges.size()); for (const auto& range : ranges) { - new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset, range.length)); + new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset, range.length, + /*allow_short_read=*/false)); } return new_entries; } @@ -219,7 +220,8 @@ struct ReadRangeCache::Impl { ++next_it) { if (!next_it->future.is_valid()) { next_it->future = - file->ReadAsync(ctx, next_it->range.offset, next_it->range.length); + file->ReadAsync(ctx, next_it->range.offset, next_it->range.length, + /*allow_short_read=*/false); } ++num_prefetched; } @@ -272,7 +274,8 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl { Future> MaybeRead(RangeCacheEntry* entry) override { // Called by superclass Read()/WaitFor() so we have the lock if (!entry->future.is_valid()) { - entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length); + entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length, + /*allow_short_read=*/false); } return entry->future; } diff --git a/cpp/src/arrow/io/concurrency.h b/cpp/src/arrow/io/concurrency.h index 35c2aac6a7e1..8b5d2cb1fc5b 100644 --- a/cpp/src/arrow/io/concurrency.h +++ b/cpp/src/arrow/io/concurrency.h @@ -208,13 +208,23 @@ class RandomAccessFileConcurrencyWrapper : public RandomAccessFile { // to use the exclusive_guard. Result ReadAt(int64_t position, int64_t nbytes, void* out) final { + return ReadAt(position, nbytes, /*allow_short_read=*/true, out); + } + + Result ReadAt(int64_t position, int64_t nbytes, bool allow_short_read, + void* out) final { auto guard = lock_.shared_guard(); - return derived()->DoReadAt(position, nbytes, out); + return derived()->DoReadAt(position, nbytes, allow_short_read, out); } Result> ReadAt(int64_t position, int64_t nbytes) final { + return ReadAt(position, nbytes, /*allow_short_read=*/true); + } + + Result> ReadAt(int64_t position, int64_t nbytes, + bool allow_short_read) final { auto guard = lock_.shared_guard(); - return derived()->DoReadAt(position, nbytes); + return derived()->DoReadAt(position, nbytes, allow_short_read); } /* diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc index 4ce0b6d66253..ed2c8c9a246f 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -133,14 +133,21 @@ class OSFile { return ::arrow::internal::FileRead(fd_.fd(), reinterpret_cast(out), nbytes); } - Result ReadAt(int64_t position, int64_t nbytes, void* out) { + Result ReadAt(int64_t position, int64_t nbytes, bool allow_short_read, + void* out) { RETURN_NOT_OK(CheckClosed()); RETURN_NOT_OK(internal::ValidateRange(position, nbytes)); // ReadAt() leaves the file position undefined, so require that we seek // before calling Read() or Write(). need_seeking_.store(true); - return ::arrow::internal::FileReadAt(fd_.fd(), reinterpret_cast(out), - position, nbytes); + ARROW_ASSIGN_OR_RAISE(auto real_nbytes, ::arrow::internal::FileReadAt( + fd_.fd(), reinterpret_cast(out), + position, nbytes)); + if (!allow_short_read && real_nbytes != nbytes) { + return Status::IOError("File too short: expected to be able to read ", nbytes, + " bytes, got ", real_nbytes); + } + return real_nbytes; } Status Seek(int64_t pos) { @@ -230,21 +237,20 @@ class ReadableFile::ReadableFileImpl : public OSFile { RETURN_NOT_OK(buffer->Resize(bytes_read)); buffer->ZeroPadding(); } - // R build with openSUSE155 requires an explicit shared_ptr construction - return std::shared_ptr(std::move(buffer)); + return buffer; } - Result> ReadBufferAt(int64_t position, int64_t nbytes) { + Result> ReadBufferAt(int64_t position, int64_t nbytes, + bool allow_short_read) { ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_)); - ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, - ReadAt(position, nbytes, buffer->mutable_data())); + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(position, nbytes, allow_short_read, + buffer->mutable_data())); if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); buffer->ZeroPadding(); } - // R build with openSUSE155 requires an explicit shared_ptr construction - return std::shared_ptr(std::move(buffer)); + return buffer; } Status WillNeed(const std::vector& ranges) { @@ -322,12 +328,14 @@ Result ReadableFile::DoRead(int64_t nbytes, void* out) { return impl_->Read(nbytes, out); } -Result ReadableFile::DoReadAt(int64_t position, int64_t nbytes, void* out) { - return impl_->ReadAt(position, nbytes, out); +Result ReadableFile::DoReadAt(int64_t position, int64_t nbytes, + bool allow_short_read, void* out) { + return impl_->ReadAt(position, nbytes, allow_short_read, out); } -Result> ReadableFile::DoReadAt(int64_t position, int64_t nbytes) { - return impl_->ReadBufferAt(position, nbytes); +Result> ReadableFile::DoReadAt(int64_t position, int64_t nbytes, + bool allow_short_read) { + return impl_->ReadBufferAt(position, nbytes, allow_short_read); } Result> ReadableFile::DoRead(int64_t nbytes) { diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h index 50d4f2c4dfc9..a25305b036de 100644 --- a/cpp/src/arrow/io/file.h +++ b/cpp/src/arrow/io/file.h @@ -124,10 +124,12 @@ class ARROW_EXPORT ReadableFile Result> DoRead(int64_t nbytes); /// \brief Thread-safe implementation of ReadAt - Result DoReadAt(int64_t position, int64_t nbytes, void* out); + Result DoReadAt(int64_t position, int64_t nbytes, bool allow_short_read, + void* out); /// \brief Thread-safe implementation of ReadAt - Result> DoReadAt(int64_t position, int64_t nbytes); + Result> DoReadAt(int64_t position, int64_t nbytes, + bool allow_short_read); Result DoGetSize(); Status DoSeek(int64_t position); diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc index 8970dfe7cc43..1e2de3f07a5d 100644 --- a/cpp/src/arrow/io/file_test.cc +++ b/cpp/src/arrow/io/file_test.cc @@ -31,6 +31,7 @@ #include #include +#include #include #include "arrow/buffer.h" @@ -399,12 +400,18 @@ TEST_F(TestReadableFile, ReadAsync) { MakeTestFile(); OpenFile(); - auto fut1 = file_->ReadAsync({}, 1, 10); - auto fut2 = file_->ReadAsync({}, 0, 4); + auto fut1 = file_->ReadAsync(default_io_context(), 1, 10); + auto fut2 = file_->ReadAsync(default_io_context(), 0, 4); + auto fut3 = file_->ReadAsync(default_io_context(), 1, 10, /*allow_short_read=*/false); + auto fut4 = file_->ReadAsync(default_io_context(), 0, 4, /*allow_short_read=*/false); ASSERT_OK_AND_ASSIGN(auto buf1, fut1.result()); ASSERT_OK_AND_ASSIGN(auto buf2, fut2.result()); + EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, ::testing::HasSubstr("File too short"), + fut3.result()); + ASSERT_OK_AND_ASSIGN(auto buf4, fut4.result()); AssertBufferEqual(*buf1, "estdata"); AssertBufferEqual(*buf2, "test"); + AssertBufferEqual(*buf4, "test"); } TEST_F(TestReadableFile, ReadManyAsync) { diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index cdd2470b629c..41a7fdecb229 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ b/cpp/src/arrow/io/interfaces.cc @@ -149,12 +149,33 @@ RandomAccessFile::~RandomAccessFile() = default; RandomAccessFile::RandomAccessFile() : interface_impl_(new Impl()) {} +Result RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, + bool allow_short_read, void* out) { + ARROW_ASSIGN_OR_RAISE(auto real_nbytes, ReadAt(position, nbytes, out)); + if (!allow_short_read && real_nbytes != nbytes) { + return Status::IOError("File too short: expected to be able to read ", nbytes, + " bytes, got ", real_nbytes); + } + return real_nbytes; +} + Result RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, void* out) { std::lock_guard lock(interface_impl_->lock_); RETURN_NOT_OK(Seek(position)); return Read(nbytes, out); } +Result> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, + bool allow_short_read) { + ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(position, nbytes)); + // XXX the internal `IoRecordedRandomAccessFile` can return a null buffer + if (!allow_short_read && buffer && buffer->size() != nbytes) { + return Status::IOError("File too short: expected to be able to read ", nbytes, + " bytes, got ", buffer->size()); + } + return buffer; +} + Result> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes) { std::lock_guard lock(interface_impl_->lock_); @@ -162,25 +183,39 @@ Result> RandomAccessFile::ReadAt(int64_t position, return Read(nbytes); } -// Default ReadAsync() implementation: simply issue the read on the context's executor Future> RandomAccessFile::ReadAsync(const IOContext& ctx, int64_t position, int64_t nbytes) { + return ReadAsync(ctx, position, nbytes, /*allow_short_read=*/true); +} + +// Default ReadAsync() implementation: simply issue the read on the context's executor +Future> RandomAccessFile::ReadAsync(const IOContext& ctx, + int64_t position, + int64_t nbytes, + bool allow_short_read) { auto self = std::dynamic_pointer_cast(shared_from_this()); - return DeferNotOk(internal::SubmitIO( - ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); })); + return DeferNotOk(internal::SubmitIO(ctx, [self, position, nbytes, allow_short_read] { + return self->ReadAt(position, nbytes, allow_short_read); + })); } Future> RandomAccessFile::ReadAsync(int64_t position, int64_t nbytes) { - return ReadAsync(io_context(), position, nbytes); + return ReadAsync(io_context(), position, nbytes, /*allow_short_read=*/true); +} + +Future> RandomAccessFile::ReadAsync(int64_t position, + int64_t nbytes, + bool allow_short_read) { + return ReadAsync(io_context(), position, nbytes, allow_short_read); } std::vector>> RandomAccessFile::ReadManyAsync( const IOContext& ctx, const std::vector& ranges) { std::vector>> ret; for (auto r : ranges) { - ret.push_back(this->ReadAsync(ctx, r.offset, r.length)); + ret.push_back(this->ReadAsync(ctx, r.offset, r.length, /*allow_short_read=*/false)); } return ret; } diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index b36c38c6d486..8cb982470715 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -267,8 +267,9 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { /// \brief Read data from given file position. /// - /// At most `nbytes` bytes are read. The number of bytes read is returned - /// (it can be less than `nbytes` if EOF is reached). + /// At most `nbytes` bytes are read. The number of bytes read is returned. + /// If `allow_short_read` is true, the number of bytes read can be less than + /// `nbytes` if EOF is reached, otherwise an error is returned. /// /// This method can be safely called from multiple threads concurrently. /// It is unspecified whether this method updates the file position or not. @@ -279,13 +280,40 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { /// /// \param[in] position Where to read bytes from /// \param[in] nbytes The number of bytes to read + /// \param[in] allow_short_read Whether to allow reading less than `nbytes` + /// \param[out] out The buffer to read bytes into + /// \return The number of bytes read, or an error + virtual Result ReadAt(int64_t position, int64_t nbytes, bool allow_short_read, + void* out); + + /// \brief Read data from given file position. + /// + /// Like `ReadAt(position, nbytes, allow_short_read, out)` with `allow_short_read` + /// set to true. + /// + /// \param[in] position Where to read bytes from + /// \param[in] nbytes The number of bytes to read /// \param[out] out The buffer to read bytes into /// \return The number of bytes read, or an error virtual Result ReadAt(int64_t position, int64_t nbytes, void* out); /// \brief Read data from given file position. /// - /// At most `nbytes` bytes are read, but it can be less if EOF is reached. + /// At most `nbytes` bytes are read. If `allow_short_read` is true, the + /// number of bytes read can be less than `nbytes` if EOF is reached, + /// otherwise an error is returned. + /// + /// \param[in] position Where to read bytes from + /// \param[in] nbytes The number of bytes to read + /// \param[in] allow_short_read Whether to allow reading less than `nbytes` + /// \return A buffer containing the bytes read, or an error + virtual Result> ReadAt(int64_t position, int64_t nbytes, + bool allow_short_read); + + /// \brief Read data from given file position. + /// + /// Like `ReadAt(position, nbytes, allow_short_read)` with `allow_short_read` + /// set to true. /// /// \param[in] position Where to read bytes from /// \param[in] nbytes The number of bytes to read @@ -293,10 +321,15 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { virtual Result> ReadAt(int64_t position, int64_t nbytes); /// EXPERIMENTAL: Read data asynchronously. + virtual Future> ReadAsync(const IOContext&, int64_t position, + int64_t nbytes, + bool allow_short_read); virtual Future> ReadAsync(const IOContext&, int64_t position, int64_t nbytes); /// EXPERIMENTAL: Read data asynchronously, using the file's IOContext. + Future> ReadAsync(int64_t position, int64_t nbytes, + bool allow_short_read); Future> ReadAsync(int64_t position, int64_t nbytes); /// EXPERIMENTAL: Explicit multi-read. diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index d7b118b39827..dcbb048c7eb0 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -317,47 +317,69 @@ Status BufferReader::WillNeed(const std::vector& ranges) { Future> BufferReader::ReadAsync(const IOContext&, int64_t position, int64_t nbytes) { - return Future>::MakeFinished(DoReadAt(position, nbytes)); + return Future>::MakeFinished( + DoReadAt(position, nbytes, /*allow_short_read=*/true)); } -Result BufferReader::DoReadAt(int64_t position, int64_t nbytes, void* buffer) { +Future> BufferReader::ReadAsync(const IOContext&, + int64_t position, int64_t nbytes, + bool allow_short_read) { + return Future>::MakeFinished( + DoReadAt(position, nbytes, allow_short_read)); +} + +Result BufferReader::DoReadAt(int64_t position, int64_t nbytes, + bool allow_short_read, void* buffer) { RETURN_NOT_OK(CheckClosed()); - ARROW_ASSIGN_OR_RAISE(nbytes, internal::ValidateReadRange(position, nbytes, size_)); + ARROW_ASSIGN_OR_RAISE(auto real_nbytes, + internal::ValidateReadRange(position, nbytes, size_)); DCHECK_GE(nbytes, 0); - if (nbytes) { - memcpy(buffer, data_ + position, nbytes); + if (!allow_short_read && real_nbytes != nbytes) { + return Status::IOError("File too short: expected to be able to read ", nbytes, + " bytes, got ", real_nbytes); } - return nbytes; + if (real_nbytes) { + memcpy(buffer, data_ + position, real_nbytes); + } + return real_nbytes; } -Result> BufferReader::DoReadAt(int64_t position, int64_t nbytes) { +Result> BufferReader::DoReadAt(int64_t position, int64_t nbytes, + bool allow_short_read) { RETURN_NOT_OK(CheckClosed()); - ARROW_ASSIGN_OR_RAISE(nbytes, internal::ValidateReadRange(position, nbytes, size_)); - DCHECK_GE(nbytes, 0); + ARROW_ASSIGN_OR_RAISE(auto real_nbytes, + internal::ValidateReadRange(position, nbytes, size_)); + DCHECK_GE(real_nbytes, 0); + if (!allow_short_read && real_nbytes != nbytes) { + return Status::IOError("File too short: expected to be able to read ", nbytes, + " bytes, got ", real_nbytes); + } // Arrange for data to be paged in // RETURN_NOT_OK(::arrow::internal::MemoryAdviseWillNeed( // {{const_cast(data_ + position), static_cast(nbytes)}})); - if (nbytes > 0 && buffer_ != nullptr) { - return SliceBuffer(buffer_, position, nbytes); + if (real_nbytes > 0 && buffer_ != nullptr) { + return SliceBuffer(buffer_, position, real_nbytes); } else { - return std::make_shared(data_ + position, nbytes); + return std::make_shared(data_ + position, real_nbytes); } } Result BufferReader::DoRead(int64_t nbytes, void* out) { RETURN_NOT_OK(CheckClosed()); - ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, DoReadAt(position_, nbytes, out)); + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, + DoReadAt(position_, nbytes, /*allow_short_read=*/true, out)); position_ += bytes_read; return bytes_read; } Result> BufferReader::DoRead(int64_t nbytes) { RETURN_NOT_OK(CheckClosed()); - ARROW_ASSIGN_OR_RAISE(auto buffer, DoReadAt(position_, nbytes)); + ARROW_ASSIGN_OR_RAISE(auto buffer, + DoReadAt(position_, nbytes, /*allow_short_read=*/true)); position_ += buffer->size(); return buffer; } diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index 5ce0204654d0..77197bbe6e6e 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -162,6 +162,9 @@ class ARROW_EXPORT BufferReader // Synchronous ReadAsync override Future> ReadAsync(const IOContext&, int64_t position, int64_t nbytes) override; + Future> ReadAsync(const IOContext&, int64_t position, + int64_t nbytes, + bool allow_short_read) override; Status WillNeed(const std::vector& ranges) override; protected: @@ -171,8 +174,10 @@ class ARROW_EXPORT BufferReader Result DoRead(int64_t nbytes, void* buffer); Result> DoRead(int64_t nbytes); - Result DoReadAt(int64_t position, int64_t nbytes, void* out); - Result> DoReadAt(int64_t position, int64_t nbytes); + Result DoReadAt(int64_t position, int64_t nbytes, bool allow_short_read, + void* out); + Result> DoReadAt(int64_t position, int64_t nbytes, + bool allow_short_read); Result DoPeek(int64_t nbytes) override; Result DoTell() const; diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc index 1b2c7bdbf393..3e95e5257a97 100644 --- a/cpp/src/arrow/io/memory_test.cc +++ b/cpp/src/arrow/io/memory_test.cc @@ -733,9 +733,10 @@ class CountingBufferReader : public BufferReader { public: using BufferReader::BufferReader; Future> ReadAsync(const IOContext& context, int64_t position, - int64_t nbytes) override { + int64_t nbytes, + bool allow_short_read) override { read_count_++; - return BufferReader::ReadAsync(context, position, nbytes); + return BufferReader::ReadAsync(context, position, nbytes, allow_short_read); } int64_t read_count() const { return read_count_; } diff --git a/cpp/src/arrow/io/slow.cc b/cpp/src/arrow/io/slow.cc index 7c11a484fc1e..44a50c5c25e6 100644 --- a/cpp/src/arrow/io/slow.cc +++ b/cpp/src/arrow/io/slow.cc @@ -134,12 +134,25 @@ Result SlowRandomAccessFile::ReadAt(int64_t position, int64_t nbytes, return stream_->ReadAt(position, nbytes, out); } +Result SlowRandomAccessFile::ReadAt(int64_t position, int64_t nbytes, + bool allow_short_read, void* out) { + latencies_->Sleep(); + return stream_->ReadAt(position, nbytes, allow_short_read, out); +} + Result> SlowRandomAccessFile::ReadAt(int64_t position, int64_t nbytes) { latencies_->Sleep(); return stream_->ReadAt(position, nbytes); } +Result> SlowRandomAccessFile::ReadAt(int64_t position, + int64_t nbytes, + bool allow_short_read) { + latencies_->Sleep(); + return stream_->ReadAt(position, nbytes, allow_short_read); +} + Result SlowRandomAccessFile::Peek(int64_t nbytes) { return stream_->Peek(nbytes); } diff --git a/cpp/src/arrow/io/slow.h b/cpp/src/arrow/io/slow.h index fdcc56dfa6af..cf8c02c88c82 100644 --- a/cpp/src/arrow/io/slow.h +++ b/cpp/src/arrow/io/slow.h @@ -106,7 +106,11 @@ class ARROW_EXPORT SlowRandomAccessFile : public SlowInputStreamBase Read(int64_t nbytes, void* out) override; Result> Read(int64_t nbytes) override; Result ReadAt(int64_t position, int64_t nbytes, void* out) override; + Result ReadAt(int64_t position, int64_t nbytes, bool allow_short_read, + void* out) override; Result> ReadAt(int64_t position, int64_t nbytes) override; + Result> ReadAt(int64_t position, int64_t nbytes, + bool allow_short_read) override; Result Peek(int64_t nbytes) override; Result GetSize() override; diff --git a/cpp/src/arrow/io/test_common.cc b/cpp/src/arrow/io/test_common.cc index b3cfdd0eb210..f3f5073fd67e 100644 --- a/cpp/src/arrow/io/test_common.cc +++ b/cpp/src/arrow/io/test_common.cc @@ -136,13 +136,22 @@ class TrackedRandomAccessFileImpl : public TrackedRandomAccessFile { return delegate_->ReadAt(position, nbytes, out); } Result> ReadAt(int64_t position, int64_t nbytes) override { + return ReadAt(position, nbytes, /*allow_short_read=*/true); + } + Result> ReadAt(int64_t position, int64_t nbytes, + bool allow_short_read) override { SaveReadRange(position, nbytes); - return delegate_->ReadAt(position, nbytes); + return delegate_->ReadAt(position, nbytes, allow_short_read); } Future> ReadAsync(const io::IOContext& io_context, int64_t position, int64_t nbytes) override { + return ReadAsync(io_context, position, nbytes, /*allow_short_read=*/true); + } + Future> ReadAsync(const io::IOContext& io_context, + int64_t position, int64_t nbytes, + bool allow_short_read) override { SaveReadRange(position, nbytes); - return delegate_->ReadAsync(io_context, position, nbytes); + return delegate_->ReadAsync(io_context, position, nbytes, allow_short_read); } int64_t num_reads() const override { return read_ranges_.size(); } diff --git a/cpp/src/arrow/io/type_fwd.h b/cpp/src/arrow/io/type_fwd.h index a1b9e626bba2..40775eeaf407 100644 --- a/cpp/src/arrow/io/type_fwd.h +++ b/cpp/src/arrow/io/type_fwd.h @@ -29,6 +29,7 @@ struct FileMode { struct IOContext; struct CacheOptions; +struct ReadRange; /// EXPERIMENTAL: convenience global singleton for default IOContext settings ARROW_EXPORT diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc index 6aceaa7f4480..54f16103aeb3 100644 --- a/cpp/src/arrow/ipc/feather.cc +++ b/cpp/src/arrow/ipc/feather.cc @@ -154,7 +154,8 @@ class ReaderV1 : public Reader { int footer_size = magic_size + static_cast(sizeof(uint32_t)); // Now get the footer and verify - ARROW_ASSIGN_OR_RAISE(auto buffer, source->ReadAt(size - footer_size, footer_size)); + ARROW_ASSIGN_OR_RAISE(auto buffer, source->ReadAt(size - footer_size, footer_size, + /*allow_short_read=*/false)); if (memcmp(buffer->data() + sizeof(uint32_t), kFeatherV1MagicBytes, magic_size)) { return Status::Invalid("Feather file footer incomplete"); @@ -164,9 +165,9 @@ class ReaderV1 : public Reader { if (size < magic_size + footer_size + metadata_length) { return Status::Invalid("File is smaller than indicated metadata size"); } - ARROW_ASSIGN_OR_RAISE( - metadata_buffer_, - source->ReadAt(size - footer_size - metadata_length, metadata_length)); + ARROW_ASSIGN_OR_RAISE(metadata_buffer_, + source->ReadAt(size - footer_size - metadata_length, + metadata_length, /*allow_short_read=*/false)); metadata_ = fbs::GetCTable(metadata_buffer_->data()); return ReadSchema(); @@ -273,8 +274,9 @@ class ReaderV1 : public Reader { // Buffer data from the source (may or may not perform a copy depending on // input source) - ARROW_ASSIGN_OR_RAISE(auto buffer, - source_->ReadAt(meta->offset(), meta->total_bytes())); + ARROW_ASSIGN_OR_RAISE( + auto buffer, + source_->ReadAt(meta->offset(), meta->total_bytes(), /*allow_short_read=*/false)); int64_t offset = 0; @@ -783,7 +785,8 @@ Result> Reader::Open( // Determine what kind of file we have. 6 is the max of len(FEA1) and // len(ARROW1) constexpr int magic_size = 6; - ARROW_ASSIGN_OR_RAISE(auto buffer, source->ReadAt(0, magic_size)); + ARROW_ASSIGN_OR_RAISE(auto buffer, + source->ReadAt(0, magic_size, /*allow_short_read=*/false)); if (memcmp(buffer->data(), kFeatherV1MagicBytes, strlen(kFeatherV1MagicBytes)) == 0) { std::shared_ptr result = std::make_shared(); diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 84ee62fe9e8d..1fef961ff8b3 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -233,11 +233,8 @@ Result> Message::ReadFrom(const int64_t offset, MessageDecoder decoder(listener, MessageDecoder::State::METADATA, metadata->size()); ARROW_RETURN_NOT_OK(decoder.Consume(metadata)); - ARROW_ASSIGN_OR_RAISE(auto body, file->ReadAt(offset, decoder.next_required_size())); - if (body->size() < decoder.next_required_size()) { - return Status::IOError("Expected to be able to read ", decoder.next_required_size(), - " bytes for message body, got ", body->size()); - } + ARROW_ASSIGN_OR_RAISE(auto body, file->ReadAt(offset, decoder.next_required_size(), + /*allow_short_read=*/false)); RETURN_NOT_OK(decoder.Consume(body)); return result; } @@ -383,13 +380,8 @@ static Result> ReadMessageInternal( // When body_length is known, read metadata + body in one IO call. // Otherwise, read only metadata first. ARROW_ASSIGN_OR_RAISE(std::shared_ptr metadata, - file->ReadAt(offset, metadata_length + body_length.value_or(0))); - - if (metadata->size() < metadata_length) { - return Status::Invalid("Expected to read ", metadata_length, - " metadata bytes at offset ", offset, " but got ", - metadata->size()); - } + file->ReadAt(offset, metadata_length + body_length.value_or(0), + /*allow_short_read=*/false)); ARROW_RETURN_NOT_OK(decoder.Consume(SliceBuffer(metadata, 0, metadata_length))); @@ -415,21 +407,22 @@ static Result> ReadMessageInternal( decoder.next_required_size(), body)); } else if (body_length.has_value()) { // Body was already read as part of the combined IO; just slice it out. + if (*body_length != decoder.next_required_size()) { + // The streaming decoder got out of sync with the actual advertised + // metadata and body size, which signals an invalid IPC file. + return Status::IOError("Invalid IPC file: advertised body size is ", + *body_length, ", but message decoder expects to read ", + decoder.next_required_size(), " bytes instead"); + } body = SliceBuffer(metadata, metadata_length, std::min(*body_length, metadata->size() - metadata_length)); } else { // Body length was unknown; do a separate IO to read the body. ARROW_ASSIGN_OR_RAISE( - body, file->ReadAt(offset + metadata_length, decoder.next_required_size())); + body, file->ReadAt(offset + metadata_length, decoder.next_required_size(), + /*allow_short_read=*/false)); } - if (body->size() != decoder.next_required_size()) { - // The streaming decoder got out of sync with the actual advertised - // metadata and body size, which signals an invalid IPC file. - return Status::IOError("Invalid IPC file: advertised body size is ", body->size(), - ", but message decoder expects to read ", - decoder.next_required_size(), " bytes instead"); - } RETURN_NOT_OK(decoder.Consume(body)); return result; } @@ -472,12 +465,11 @@ Future> ReadMessageAsync(int64_t offset, int32_t metada return Status::Invalid("metadata_length should be at least ", state->decoder->next_required_size()); } - return file->ReadAsync(context, offset, metadata_length + body_length) + return file + ->ReadAsync(context, offset, metadata_length + body_length, + /*allow_short_read=*/false) .Then([=](std::shared_ptr metadata) -> Result> { - if (metadata->size() < metadata_length) { - return Status::Invalid("Expected to read ", metadata_length, - " metadata bytes but got ", metadata->size()); - } + DCHECK_EQ(metadata->size(), metadata_length + body_length); ARROW_RETURN_NOT_OK( state->decoder->Consume(SliceBuffer(metadata, 0, metadata_length))); switch (state->decoder->state()) { diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index f7b9c779abbc..512305d6570a 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -210,7 +210,7 @@ class ArrayLoader { " did not start on 8-byte aligned offset: ", offset); } if (file_) { - return file_->ReadAt(offset, length).Value(out); + return file_->ReadAt(offset, length, /*allow_short_read=*/false).Value(out); } else { if (!AddWithOverflow({read_end.value(), file_offset_}).has_value()) { return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area"); @@ -1874,19 +1874,15 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return Status::Invalid("File is too small: ", footer_offset_); } - int file_end_size = static_cast(kMagicSize + sizeof(int32_t)); + constexpr int64_t kTotalMagicSize = kMagicSize + sizeof(int32_t); auto self = std::dynamic_pointer_cast(shared_from_this()); - auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size, file_end_size); + auto read_magic = file_->ReadAsync(footer_offset_ - kTotalMagicSize, kTotalMagicSize, + /*allow_short_read=*/false); if (executor) read_magic = executor->Transfer(std::move(read_magic)); return read_magic .Then([=](const std::shared_ptr& buffer) -> Future> { - const int64_t expected_footer_size = kMagicSize + sizeof(int32_t); - if (buffer->size() < expected_footer_size) { - return Status::Invalid("Unable to read ", expected_footer_size, - "from end of file"); - } - + DCHECK_EQ(buffer->size(), kTotalMagicSize); const auto magic_start = buffer->data() + sizeof(int32_t); if (std::string_view(reinterpret_cast(magic_start), kMagicSize) != kArrowMagicBytes) { @@ -1903,7 +1899,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { // Now read the footer auto read_footer = self->file_->ReadAsync( - self->footer_offset_ - footer_length - file_end_size, footer_length); + self->footer_offset_ - footer_length - kTotalMagicSize, footer_length, + /*allow_short_read=*/false); if (executor) read_footer = executor->Transfer(std::move(read_footer)); return read_footer; }) @@ -2293,7 +2290,8 @@ Result> ReadSparseCOOIndex( auto* indices_buffer = sparse_index->indicesBuffer(); ARROW_ASSIGN_OR_RAISE(auto indices_data, - file->ReadAt(indices_buffer->offset(), indices_buffer->length())); + file->ReadAt(indices_buffer->offset(), indices_buffer->length(), + /*allow_short_read=*/false)); std::vector indices_shape({non_zero_length, ndim}); auto* indices_strides = sparse_index->indicesStrides(); std::vector strides(2); @@ -2329,11 +2327,13 @@ Result> ReadSparseCSXIndex( auto* indptr_buffer = sparse_index->indptrBuffer(); ARROW_ASSIGN_OR_RAISE(auto indptr_data, - file->ReadAt(indptr_buffer->offset(), indptr_buffer->length())); + file->ReadAt(indptr_buffer->offset(), indptr_buffer->length(), + /*allow_short_read=*/false)); auto* indices_buffer = sparse_index->indicesBuffer(); ARROW_ASSIGN_OR_RAISE(auto indices_data, - file->ReadAt(indices_buffer->offset(), indices_buffer->length())); + file->ReadAt(indices_buffer->offset(), indices_buffer->length(), + /*allow_short_read=*/false)); std::vector indices_shape({non_zero_length}); const auto indices_minimum_bytes = indices_shape[0] * indices_type->byte_width(); @@ -2384,12 +2384,13 @@ Result> ReadSparseCSFIndex( sparse_index, &axis_order, &indices_size, &indptr_type, &indices_type)); for (int i = 0; i < static_cast(indptr_buffers->size()); ++i) { ARROW_ASSIGN_OR_RAISE(indptr_data[i], file->ReadAt(indptr_buffers->Get(i)->offset(), - indptr_buffers->Get(i)->length())); + indptr_buffers->Get(i)->length(), + /*allow_short_read=*/false)); } for (int i = 0; i < static_cast(indices_buffers->size()); ++i) { - ARROW_ASSIGN_OR_RAISE(indices_data[i], - file->ReadAt(indices_buffers->Get(i)->offset(), - indices_buffers->Get(i)->length())); + ARROW_ASSIGN_OR_RAISE(indices_data[i], file->ReadAt(indices_buffers->Get(i)->offset(), + indices_buffers->Get(i)->length(), + /*allow_short_read=*/false)); } return SparseCSFIndex::Make(indptr_type, indices_type, indices_size, axis_order, @@ -2619,7 +2620,8 @@ Result> ReadSparseTensor(const Buffer& metadata, &non_zero_length, &sparse_tensor_format_id, &sparse_tensor, &buffer)); - ARROW_ASSIGN_OR_RAISE(auto data, file->ReadAt(buffer->offset(), buffer->length())); + ARROW_ASSIGN_OR_RAISE(auto data, file->ReadAt(buffer->offset(), buffer->length(), + /*allow_short_read=*/false)); std::shared_ptr sparse_index; switch (sparse_tensor_format_id) { @@ -2913,8 +2915,12 @@ Status FuzzIpcTensorStream(const uint8_t* data, int64_t size) { Result IoRecordedRandomAccessFile::GetSize() { return file_size_; } Result IoRecordedRandomAccessFile::ReadAt(int64_t position, int64_t nbytes, - void* out) { + bool allow_short_read, void* out) { auto num_bytes_read = std::min(file_size_, position + nbytes) - position; + if (!allow_short_read && num_bytes_read != nbytes) { + return Status::IOError("File too short: expected to be able to read ", nbytes, + " bytes, got ", num_bytes_read); + } if (!read_ranges_.empty() && position == read_ranges_.back().offset + read_ranges_.back().length) { @@ -2927,11 +2933,11 @@ Result IoRecordedRandomAccessFile::ReadAt(int64_t position, int64_t nby return num_bytes_read; } -Result> IoRecordedRandomAccessFile::ReadAt(int64_t position, - int64_t nbytes) { - std::shared_ptr out; - auto result = ReadAt(position, nbytes, &out); - return out; +Result> IoRecordedRandomAccessFile::ReadAt( + int64_t position, int64_t nbytes, bool allow_short_read) { + // We're not supposed to actually read anything, so pass a null output pointer. + RETURN_NOT_OK(ReadAt(position, nbytes, allow_short_read, /*out=*/nullptr)); + return nullptr; } Status IoRecordedRandomAccessFile::Close() { @@ -2958,6 +2964,7 @@ Result IoRecordedRandomAccessFile::Read(int64_t nbytes, void* out) { Result> IoRecordedRandomAccessFile::Read(int64_t nbytes) { ARROW_ASSIGN_OR_RAISE(std::shared_ptr buffer, ReadAt(position_, nbytes)); + // Cannot use buffer->size() since a null buffer is returned... auto num_bytes_read = std::min(file_size_, position_ + nbytes) - position_; position_ += num_bytes_read; return buffer; diff --git a/cpp/src/arrow/ipc/reader_internal.h b/cpp/src/arrow/ipc/reader_internal.h index a71d070bb0d0..d61ac96f0f14 100644 --- a/cpp/src/arrow/ipc/reader_internal.h +++ b/cpp/src/arrow/ipc/reader_internal.h @@ -36,7 +36,7 @@ namespace internal { /// \brief An RandomAccessFile that doesn't perform real IO, but only save all the IO /// operations it receives, including read operation's , for replaying /// later -class ARROW_EXPORT IoRecordedRandomAccessFile : public io::RandomAccessFile { +class ARROW_EXPORT IoRecordedRandomAccessFile final : public io::RandomAccessFile { public: explicit IoRecordedRandomAccessFile(const int64_t file_size) : file_size_(file_size), position_(0) {} @@ -55,9 +55,17 @@ class ARROW_EXPORT IoRecordedRandomAccessFile : public io::RandomAccessFile { Result GetSize() override; - Result ReadAt(int64_t position, int64_t nbytes, void* out) override; - - Result> ReadAt(int64_t position, int64_t nbytes) override; + Result ReadAt(int64_t position, int64_t nbytes, void* out) override { + return ReadAt(position, nbytes, /*allow_short_read=*/true, out); + } + Result ReadAt(int64_t position, int64_t nbytes, bool allow_short_read, + void* out) override; + + Result> ReadAt(int64_t position, int64_t nbytes) override { + return ReadAt(position, nbytes, /*allow_short_read=*/true); + } + Result> ReadAt(int64_t position, int64_t nbytes, + bool allow_short_read) override; Result Read(int64_t nbytes, void* out) override; diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h index e14e0558d31a..cabcd5b4a5d7 100644 --- a/cpp/src/parquet/bloom_filter.h +++ b/cpp/src/parquet/bloom_filter.h @@ -20,6 +20,7 @@ #include #include #include +#include #include "arrow/util/bit_util.h" #include "arrow/util/logging.h" diff --git a/cpp/src/parquet/bloom_filter_reader.h b/cpp/src/parquet/bloom_filter_reader.h index cbd267dd1972..46e046156da7 100644 --- a/cpp/src/parquet/bloom_filter_reader.h +++ b/cpp/src/parquet/bloom_filter_reader.h @@ -17,7 +17,7 @@ #pragma once -#include "arrow/io/interfaces.h" +#include "arrow/io/type_fwd.h" #include "parquet/properties.h" #include "parquet/type_fwd.h" diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h index 1b3d4cfd413c..f7dbb2526a76 100644 --- a/cpp/src/parquet/column_page.h +++ b/cpp/src/parquet/column_page.h @@ -26,6 +26,8 @@ #include #include +#include "arrow/buffer.h" + #include "parquet/size_statistics.h" #include "parquet/statistics.h" #include "parquet/types.h" diff --git a/cpp/src/parquet/encryption/file_system_key_material_store.cc b/cpp/src/parquet/encryption/file_system_key_material_store.cc index 7a7db3fa625e..fb8c92ceafee 100644 --- a/cpp/src/parquet/encryption/file_system_key_material_store.cc +++ b/cpp/src/parquet/encryption/file_system_key_material_store.cc @@ -17,6 +17,7 @@ #include +#include "arrow/buffer.h" #include "arrow/filesystem/filesystem.h" #include "arrow/filesystem/path_util.h" #include "arrow/json/object_parser.h" diff --git a/cpp/src/parquet/encryption/internal_file_decryptor.h b/cpp/src/parquet/encryption/internal_file_decryptor.h index 1343769ef364..baf2fb646a79 100644 --- a/cpp/src/parquet/encryption/internal_file_decryptor.h +++ b/cpp/src/parquet/encryption/internal_file_decryptor.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index af7ccfd7ad7d..d0552adcee53 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -448,8 +448,9 @@ class SerializedFile : public ParquetFileReader::Contents { metadata_buffer = SliceBuffer( footer_buffer, footer_read_size - metadata_len - kFooterSize, metadata_len); } else { - PARQUET_ASSIGN_OR_THROW(metadata_buffer, - source_->ReadAt(metadata_start, metadata_len)); + PARQUET_ASSIGN_OR_THROW( + metadata_buffer, + source_->ReadAt(metadata_start, metadata_len, /*allow_short_read=*/false)); } // Parse the footer depending on encryption type @@ -464,8 +465,9 @@ class SerializedFile : public ParquetFileReader::Contents { // Read the actual footer metadata_start = read_size.first; metadata_len = read_size.second; - PARQUET_ASSIGN_OR_THROW(metadata_buffer, - source_->ReadAt(metadata_start, metadata_len)); + PARQUET_ASSIGN_OR_THROW( + metadata_buffer, + source_->ReadAt(metadata_start, metadata_len, /*allow_short_read=*/false)); // Fall through } @@ -535,7 +537,8 @@ class SerializedFile : public ParquetFileReader::Contents { std::move(metadata_buffer), footer_read_size, metadata_len); } - return source_->ReadAsync(metadata_start, metadata_len) + return source_ + ->ReadAsync(metadata_start, metadata_len, /*allow_short_read=*/false) .Then([this, footer_buffer, footer_read_size, metadata_len]( const std::shared_ptr<::arrow::Buffer>& metadata_buffer) { return ParseMaybeEncryptedMetaDataAsync(footer_buffer, metadata_buffer, @@ -563,7 +566,7 @@ class SerializedFile : public ParquetFileReader::Contents { // Read the actual footer int64_t metadata_start = read_size.first; metadata_len = read_size.second; - return source_->ReadAsync(metadata_start, metadata_len) + return source_->ReadAsync(metadata_start, metadata_len, /*allow_short_read=*/false) .Then([this, metadata_len, is_encrypted_footer, file_decryptor = std::move(file_decryptor)]( const std::shared_ptr<::arrow::Buffer>& metadata_buffer) { diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index c06fc77dc536..7434f2828da2 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -15,24 +15,25 @@ // specific language governing permissions and limitations // under the License. -#include "parquet/page_index.h" +#include +#include + +#include "arrow/io/interfaces.h" +#include "arrow/util/int_util_overflow.h" +#include "arrow/util/logging_internal.h" +#include "arrow/util/unreachable.h" + #include "parquet/encoding.h" #include "parquet/encryption/encryption_internal.h" #include "parquet/encryption/internal_file_decryptor.h" #include "parquet/encryption/internal_file_encryptor.h" #include "parquet/exception.h" #include "parquet/metadata.h" +#include "parquet/page_index.h" #include "parquet/schema.h" #include "parquet/statistics.h" #include "parquet/thrift_internal.h" -#include "arrow/util/int_util_overflow.h" -#include "arrow/util/logging_internal.h" -#include "arrow/util/unreachable.h" - -#include -#include - namespace parquet { namespace { @@ -348,11 +349,8 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { std::shared_ptr ReadIndexBuffer(int64_t offset, int64_t length, const char* offset_kind) { - PARQUET_ASSIGN_OR_THROW(auto buffer, input_->ReadAt(offset, length)); - if (buffer->size() < length) { - throw ParquetException("Invalid or truncated ", offset_kind, ": attempted to read ", - length, " bytes but got only ", buffer->size(), " bytes"); - } + PARQUET_ASSIGN_OR_THROW(auto buffer, + input_->ReadAt(offset, length, /*allow_short_read=*/false)); return buffer; } diff --git a/cpp/src/parquet/platform.cc b/cpp/src/parquet/platform.cc index 98946029fb86..5e1eac6f123d 100644 --- a/cpp/src/parquet/platform.cc +++ b/cpp/src/parquet/platform.cc @@ -21,6 +21,7 @@ #include #include +#include "arrow/buffer.h" #include "arrow/io/memory.h" #include "parquet/exception.h" diff --git a/cpp/src/parquet/platform.h b/cpp/src/parquet/platform.h index 92849347d4e9..e3d3d6691aec 100644 --- a/cpp/src/parquet/platform.h +++ b/cpp/src/parquet/platform.h @@ -20,11 +20,11 @@ #include #include -#include "arrow/buffer.h" // IWYU pragma: export -#include "arrow/io/interfaces.h" // IWYU pragma: export +#include "arrow/io/type_fwd.h" // IWYU pragma: export #include "arrow/status.h" // IWYU pragma: export #include "arrow/type_fwd.h" // IWYU pragma: export #include "arrow/util/macros.h" // IWYU pragma: export +#include "arrow/util/type_fwd.h" // IWYU pragma: export #if defined(_WIN32) || defined(__CYGWIN__) diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc index 94024ad403b6..6e3cfddd7bb8 100644 --- a/cpp/src/parquet/properties.cc +++ b/cpp/src/parquet/properties.cc @@ -40,14 +40,8 @@ std::shared_ptr ReaderProperties::GetStream( safe_stream, num_bytes)); return stream; } else { - PARQUET_ASSIGN_OR_THROW(auto data, source->ReadAt(start, num_bytes)); - - if (data->size() != num_bytes) { - std::stringstream ss; - ss << "Tried reading " << num_bytes << " bytes starting at position " << start - << " from file but only got " << data->size(); - throw ParquetException(ss.str()); - } + PARQUET_ASSIGN_OR_THROW(auto data, + source->ReadAt(start, num_bytes, /*allow_short_read=*/false)); return std::make_shared<::arrow::io::BufferReader>(data); } } diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index ee829c4dbc53..43657410df75 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -23,6 +23,7 @@ #include #include +#include "arrow/buffer.h" #include "arrow/io/caching.h" #include "arrow/type_fwd.h" #include "arrow/util/compression.h" diff --git a/cpp/src/parquet/properties_test.cc b/cpp/src/parquet/properties_test.cc index a0df0d30487c..0743b7ad4de9 100644 --- a/cpp/src/parquet/properties_test.cc +++ b/cpp/src/parquet/properties_test.cc @@ -147,8 +147,7 @@ TEST(TestReaderProperties, GetStreamInsufficientData) { FAIL() << "No exception raised"; } catch (const ParquetException& e) { std::string ex_what = - ("Tried reading 15 bytes starting at position 12" - " from file but only got 9"); + "IOError: File too short: expected to be able to read 15 bytes, got 9"; ASSERT_EQ(ex_what, e.what()); } } diff --git a/cpp/src/parquet/statistics.cc b/cpp/src/parquet/statistics.cc index 9b5435e026f8..09c48ef0ff69 100644 --- a/cpp/src/parquet/statistics.cc +++ b/cpp/src/parquet/statistics.cc @@ -21,7 +21,6 @@ #include #include #include -#include #include #include diff --git a/cpp/src/parquet/statistics.h b/cpp/src/parquet/statistics.h index c80fb8e3b528..796a889c3f40 100644 --- a/cpp/src/parquet/statistics.h +++ b/cpp/src/parquet/statistics.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include diff --git a/testing b/testing index 190638e1b14a..4080a40f573c 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 190638e1b14af926601dbe0a95caa1940dafedd8 +Subproject commit 4080a40f573c18beca92fc0db68fb322710f5ebb