Skip to content

Commit 5a331a9

Browse files
authored
GH-49896: [C++] Reject short buffer reads in IPC reader (#49897)
### Rationale for this change IO methods like `ReadAt` can return less bytes than asked for if the file is too short, but the IPC reader doesn't always detect for this situation. On invalid IPC files, this can produce issues down the road such as half-initialized buffers and large processing times (with a potential denial of service). This issue was detected by OSS-Fuzz: https://issues.oss-fuzz.com/issues/489758017 ### What changes are included in this PR? 1. Add `ReadAt` and `ReadAsync` overloads that accept a `bool allow_short_read` argument 2. Pass `allow_short_read = false` in all suitable places in IPC and Parquet readers ### Are these changes tested? Yes, by existing tests and new fuzz regression file. ### Are there any user-facing changes? No, except potentially better detection of invalid IPC streams and files. * GitHub Issue: #49896 Authored-by: Antoine Pitrou <antoine@python.org> Signed-off-by: Antoine Pitrou <antoine@python.org>
1 parent 336af36 commit 5a331a9

36 files changed

Lines changed: 332 additions & 156 deletions

cpp/src/arrow/dataset/file_parquet_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ class DelayedBufferReader : public ::arrow::io::BufferReader {
148148
return DeferNotOk(::arrow::io::internal::SubmitIO(
149149
io_context, [self, position, nbytes]() -> Result<std::shared_ptr<Buffer>> {
150150
std::this_thread::sleep_for(std::chrono::seconds(1));
151-
return self->DoReadAt(position, nbytes);
151+
return self->DoReadAt(position, nbytes, /*allow_short_read=*/false);
152152
}));
153153
}
154154

cpp/src/arrow/gpu/cuda_memory.cc

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -260,28 +260,38 @@ Status CudaBufferReader::DoSeek(int64_t position) {
260260
}
261261

262262
Result<int64_t> CudaBufferReader::DoReadAt(int64_t position, int64_t nbytes,
263-
void* buffer) {
263+
bool allow_short_read, void* buffer) {
264264
RETURN_NOT_OK(CheckClosed());
265265

266-
nbytes = std::min(nbytes, size_ - position);
267-
RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position, nbytes));
268-
return nbytes;
266+
auto real_nbytes = std::min(nbytes, size_ - position);
267+
if (!allow_short_read && real_nbytes != nbytes) {
268+
return Status::IOError("Cuda buffer too short: expected to be able to read ", nbytes,
269+
" bytes, got ", real_nbytes);
270+
}
271+
RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position, real_nbytes));
272+
return real_nbytes;
269273
}
270274

271275
Result<int64_t> CudaBufferReader::DoRead(int64_t nbytes, void* buffer) {
272276
RETURN_NOT_OK(CheckClosed());
273277

274-
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, DoReadAt(position_, nbytes, buffer));
278+
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
279+
DoReadAt(position_, nbytes, /*allow_short_read=*/true, buffer));
275280
position_ += bytes_read;
276281
return bytes_read;
277282
}
278283

279284
Result<std::shared_ptr<Buffer>> CudaBufferReader::DoReadAt(int64_t position,
280-
int64_t nbytes) {
285+
int64_t nbytes,
286+
bool allow_short_read) {
281287
RETURN_NOT_OK(CheckClosed());
282288

283-
int64_t size = std::min(nbytes, size_ - position);
284-
return std::make_shared<CudaBuffer>(buffer_, position, size);
289+
auto real_nbytes = std::min(nbytes, size_ - position);
290+
if (!allow_short_read && real_nbytes != nbytes) {
291+
return Status::IOError("Cuda buffer too short: expected to be able to read ", nbytes,
292+
" bytes, got ", real_nbytes);
293+
}
294+
return std::make_shared<CudaBuffer>(buffer_, position, real_nbytes);
285295
}
286296

287297
Result<std::shared_ptr<Buffer>> CudaBufferReader::DoRead(int64_t nbytes) {

cpp/src/arrow/gpu/cuda_memory.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,10 @@ class ARROW_CUDA_EXPORT CudaBufferReader
177177

178178
Result<int64_t> DoRead(int64_t nbytes, void* buffer);
179179
Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
180-
Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
181-
Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
180+
Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, bool allow_short_read,
181+
void* out);
182+
Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes,
183+
bool allow_short_read);
182184

183185
Result<int64_t> DoTell() const;
184186
Status DoSeek(int64_t position);

cpp/src/arrow/io/caching.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ struct ReadRangeCache::Impl {
167167
std::vector<RangeCacheEntry> new_entries;
168168
new_entries.reserve(ranges.size());
169169
for (const auto& range : ranges) {
170-
new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset, range.length));
170+
new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset, range.length,
171+
/*allow_short_read=*/false));
171172
}
172173
return new_entries;
173174
}
@@ -219,7 +220,8 @@ struct ReadRangeCache::Impl {
219220
++next_it) {
220221
if (!next_it->future.is_valid()) {
221222
next_it->future =
222-
file->ReadAsync(ctx, next_it->range.offset, next_it->range.length);
223+
file->ReadAsync(ctx, next_it->range.offset, next_it->range.length,
224+
/*allow_short_read=*/false);
223225
}
224226
++num_prefetched;
225227
}
@@ -272,7 +274,8 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
272274
Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
273275
// Called by superclass Read()/WaitFor() so we have the lock
274276
if (!entry->future.is_valid()) {
275-
entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length);
277+
entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length,
278+
/*allow_short_read=*/false);
276279
}
277280
return entry->future;
278281
}

cpp/src/arrow/io/concurrency.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,13 +208,23 @@ class RandomAccessFileConcurrencyWrapper : public RandomAccessFile {
208208
// to use the exclusive_guard.
209209

210210
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) final {
211+
return ReadAt(position, nbytes, /*allow_short_read=*/true, out);
212+
}
213+
214+
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, bool allow_short_read,
215+
void* out) final {
211216
auto guard = lock_.shared_guard();
212-
return derived()->DoReadAt(position, nbytes, out);
217+
return derived()->DoReadAt(position, nbytes, allow_short_read, out);
213218
}
214219

215220
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) final {
221+
return ReadAt(position, nbytes, /*allow_short_read=*/true);
222+
}
223+
224+
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes,
225+
bool allow_short_read) final {
216226
auto guard = lock_.shared_guard();
217-
return derived()->DoReadAt(position, nbytes);
227+
return derived()->DoReadAt(position, nbytes, allow_short_read);
218228
}
219229

220230
/*

cpp/src/arrow/io/file.cc

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,21 @@ class OSFile {
133133
return ::arrow::internal::FileRead(fd_.fd(), reinterpret_cast<uint8_t*>(out), nbytes);
134134
}
135135

136-
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
136+
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, bool allow_short_read,
137+
void* out) {
137138
RETURN_NOT_OK(CheckClosed());
138139
RETURN_NOT_OK(internal::ValidateRange(position, nbytes));
139140
// ReadAt() leaves the file position undefined, so require that we seek
140141
// before calling Read() or Write().
141142
need_seeking_.store(true);
142-
return ::arrow::internal::FileReadAt(fd_.fd(), reinterpret_cast<uint8_t*>(out),
143-
position, nbytes);
143+
ARROW_ASSIGN_OR_RAISE(auto real_nbytes, ::arrow::internal::FileReadAt(
144+
fd_.fd(), reinterpret_cast<uint8_t*>(out),
145+
position, nbytes));
146+
if (!allow_short_read && real_nbytes != nbytes) {
147+
return Status::IOError("File too short: expected to be able to read ", nbytes,
148+
" bytes, got ", real_nbytes);
149+
}
150+
return real_nbytes;
144151
}
145152

146153
Status Seek(int64_t pos) {
@@ -230,21 +237,20 @@ class ReadableFile::ReadableFileImpl : public OSFile {
230237
RETURN_NOT_OK(buffer->Resize(bytes_read));
231238
buffer->ZeroPadding();
232239
}
233-
// R build with openSUSE155 requires an explicit shared_ptr construction
234-
return std::shared_ptr<Buffer>(std::move(buffer));
240+
return buffer;
235241
}
236242

237-
Result<std::shared_ptr<Buffer>> ReadBufferAt(int64_t position, int64_t nbytes) {
243+
Result<std::shared_ptr<Buffer>> ReadBufferAt(int64_t position, int64_t nbytes,
244+
bool allow_short_read) {
238245
ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
239246

240-
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
241-
ReadAt(position, nbytes, buffer->mutable_data()));
247+
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(position, nbytes, allow_short_read,
248+
buffer->mutable_data()));
242249
if (bytes_read < nbytes) {
243250
RETURN_NOT_OK(buffer->Resize(bytes_read));
244251
buffer->ZeroPadding();
245252
}
246-
// R build with openSUSE155 requires an explicit shared_ptr construction
247-
return std::shared_ptr<Buffer>(std::move(buffer));
253+
return buffer;
248254
}
249255

250256
Status WillNeed(const std::vector<ReadRange>& ranges) {
@@ -322,12 +328,14 @@ Result<int64_t> ReadableFile::DoRead(int64_t nbytes, void* out) {
322328
return impl_->Read(nbytes, out);
323329
}
324330

325-
Result<int64_t> ReadableFile::DoReadAt(int64_t position, int64_t nbytes, void* out) {
326-
return impl_->ReadAt(position, nbytes, out);
331+
Result<int64_t> ReadableFile::DoReadAt(int64_t position, int64_t nbytes,
332+
bool allow_short_read, void* out) {
333+
return impl_->ReadAt(position, nbytes, allow_short_read, out);
327334
}
328335

329-
Result<std::shared_ptr<Buffer>> ReadableFile::DoReadAt(int64_t position, int64_t nbytes) {
330-
return impl_->ReadBufferAt(position, nbytes);
336+
Result<std::shared_ptr<Buffer>> ReadableFile::DoReadAt(int64_t position, int64_t nbytes,
337+
bool allow_short_read) {
338+
return impl_->ReadBufferAt(position, nbytes, allow_short_read);
331339
}
332340

333341
Result<std::shared_ptr<Buffer>> ReadableFile::DoRead(int64_t nbytes) {

cpp/src/arrow/io/file.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,12 @@ class ARROW_EXPORT ReadableFile
124124
Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
125125

126126
/// \brief Thread-safe implementation of ReadAt
127-
Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
127+
Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, bool allow_short_read,
128+
void* out);
128129

129130
/// \brief Thread-safe implementation of ReadAt
130-
Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
131+
Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes,
132+
bool allow_short_read);
131133

132134
Result<int64_t> DoGetSize();
133135
Status DoSeek(int64_t position);

cpp/src/arrow/io/file_test.cc

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <thread>
3232
#include <vector>
3333

34+
#include <gmock/gmock-matchers.h>
3435
#include <gtest/gtest.h>
3536

3637
#include "arrow/buffer.h"
@@ -399,12 +400,18 @@ TEST_F(TestReadableFile, ReadAsync) {
399400
MakeTestFile();
400401
OpenFile();
401402

402-
auto fut1 = file_->ReadAsync({}, 1, 10);
403-
auto fut2 = file_->ReadAsync({}, 0, 4);
403+
auto fut1 = file_->ReadAsync(default_io_context(), 1, 10);
404+
auto fut2 = file_->ReadAsync(default_io_context(), 0, 4);
405+
auto fut3 = file_->ReadAsync(default_io_context(), 1, 10, /*allow_short_read=*/false);
406+
auto fut4 = file_->ReadAsync(default_io_context(), 0, 4, /*allow_short_read=*/false);
404407
ASSERT_OK_AND_ASSIGN(auto buf1, fut1.result());
405408
ASSERT_OK_AND_ASSIGN(auto buf2, fut2.result());
409+
EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, ::testing::HasSubstr("File too short"),
410+
fut3.result());
411+
ASSERT_OK_AND_ASSIGN(auto buf4, fut4.result());
406412
AssertBufferEqual(*buf1, "estdata");
407413
AssertBufferEqual(*buf2, "test");
414+
AssertBufferEqual(*buf4, "test");
408415
}
409416

410417
TEST_F(TestReadableFile, ReadManyAsync) {

cpp/src/arrow/io/interfaces.cc

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,38 +149,73 @@ RandomAccessFile::~RandomAccessFile() = default;
149149

150150
RandomAccessFile::RandomAccessFile() : interface_impl_(new Impl()) {}
151151

152+
Result<int64_t> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
153+
bool allow_short_read, void* out) {
154+
ARROW_ASSIGN_OR_RAISE(auto real_nbytes, ReadAt(position, nbytes, out));
155+
if (!allow_short_read && real_nbytes != nbytes) {
156+
return Status::IOError("File too short: expected to be able to read ", nbytes,
157+
" bytes, got ", real_nbytes);
158+
}
159+
return real_nbytes;
160+
}
161+
152162
Result<int64_t> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
153163
std::lock_guard<std::mutex> lock(interface_impl_->lock_);
154164
RETURN_NOT_OK(Seek(position));
155165
return Read(nbytes, out);
156166
}
157167

168+
Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
169+
bool allow_short_read) {
170+
ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(position, nbytes));
171+
// XXX the internal `IoRecordedRandomAccessFile` can return a null buffer
172+
if (!allow_short_read && buffer && buffer->size() != nbytes) {
173+
return Status::IOError("File too short: expected to be able to read ", nbytes,
174+
" bytes, got ", buffer->size());
175+
}
176+
return buffer;
177+
}
178+
158179
Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position,
159180
int64_t nbytes) {
160181
std::lock_guard<std::mutex> lock(interface_impl_->lock_);
161182
RETURN_NOT_OK(Seek(position));
162183
return Read(nbytes);
163184
}
164185

165-
// Default ReadAsync() implementation: simply issue the read on the context's executor
166186
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext& ctx,
167187
int64_t position,
168188
int64_t nbytes) {
189+
return ReadAsync(ctx, position, nbytes, /*allow_short_read=*/true);
190+
}
191+
192+
// Default ReadAsync() implementation: simply issue the read on the context's executor
193+
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext& ctx,
194+
int64_t position,
195+
int64_t nbytes,
196+
bool allow_short_read) {
169197
auto self = std::dynamic_pointer_cast<RandomAccessFile>(shared_from_this());
170-
return DeferNotOk(internal::SubmitIO(
171-
ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); }));
198+
return DeferNotOk(internal::SubmitIO(ctx, [self, position, nbytes, allow_short_read] {
199+
return self->ReadAt(position, nbytes, allow_short_read);
200+
}));
172201
}
173202

174203
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
175204
int64_t nbytes) {
176-
return ReadAsync(io_context(), position, nbytes);
205+
return ReadAsync(io_context(), position, nbytes, /*allow_short_read=*/true);
206+
}
207+
208+
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
209+
int64_t nbytes,
210+
bool allow_short_read) {
211+
return ReadAsync(io_context(), position, nbytes, allow_short_read);
177212
}
178213

179214
std::vector<Future<std::shared_ptr<Buffer>>> RandomAccessFile::ReadManyAsync(
180215
const IOContext& ctx, const std::vector<ReadRange>& ranges) {
181216
std::vector<Future<std::shared_ptr<Buffer>>> ret;
182217
for (auto r : ranges) {
183-
ret.push_back(this->ReadAsync(ctx, r.offset, r.length));
218+
ret.push_back(this->ReadAsync(ctx, r.offset, r.length, /*allow_short_read=*/false));
184219
}
185220
return ret;
186221
}

cpp/src/arrow/io/interfaces.h

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,9 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
267267

268268
/// \brief Read data from given file position.
269269
///
270-
/// At most `nbytes` bytes are read. The number of bytes read is returned
271-
/// (it can be less than `nbytes` if EOF is reached).
270+
/// At most `nbytes` bytes are read. The number of bytes read is returned.
271+
/// If `allow_short_read` is true, the number of bytes read can be less than
272+
/// `nbytes` if EOF is reached, otherwise an error is returned.
272273
///
273274
/// This method can be safely called from multiple threads concurrently.
274275
/// It is unspecified whether this method updates the file position or not.
@@ -279,24 +280,56 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
279280
///
280281
/// \param[in] position Where to read bytes from
281282
/// \param[in] nbytes The number of bytes to read
283+
/// \param[in] allow_short_read Whether to allow reading less than `nbytes`
284+
/// \param[out] out The buffer to read bytes into
285+
/// \return The number of bytes read, or an error
286+
virtual Result<int64_t> ReadAt(int64_t position, int64_t nbytes, bool allow_short_read,
287+
void* out);
288+
289+
/// \brief Read data from given file position.
290+
///
291+
/// Like `ReadAt(position, nbytes, allow_short_read, out)` with `allow_short_read`
292+
/// set to true.
293+
///
294+
/// \param[in] position Where to read bytes from
295+
/// \param[in] nbytes The number of bytes to read
282296
/// \param[out] out The buffer to read bytes into
283297
/// \return The number of bytes read, or an error
284298
virtual Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out);
285299

286300
/// \brief Read data from given file position.
287301
///
288-
/// At most `nbytes` bytes are read, but it can be less if EOF is reached.
302+
/// At most `nbytes` bytes are read. If `allow_short_read` is true, the
303+
/// number of bytes read can be less than `nbytes` if EOF is reached,
304+
/// otherwise an error is returned.
305+
///
306+
/// \param[in] position Where to read bytes from
307+
/// \param[in] nbytes The number of bytes to read
308+
/// \param[in] allow_short_read Whether to allow reading less than `nbytes`
309+
/// \return A buffer containing the bytes read, or an error
310+
virtual Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes,
311+
bool allow_short_read);
312+
313+
/// \brief Read data from given file position.
314+
///
315+
/// Like `ReadAt(position, nbytes, allow_short_read)` with `allow_short_read`
316+
/// set to true.
289317
///
290318
/// \param[in] position Where to read bytes from
291319
/// \param[in] nbytes The number of bytes to read
292320
/// \return A buffer containing the bytes read, or an error
293321
virtual Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes);
294322

295323
/// EXPERIMENTAL: Read data asynchronously.
324+
virtual Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
325+
int64_t nbytes,
326+
bool allow_short_read);
296327
virtual Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
297328
int64_t nbytes);
298329

299330
/// EXPERIMENTAL: Read data asynchronously, using the file's IOContext.
331+
Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes,
332+
bool allow_short_read);
300333
Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes);
301334

302335
/// EXPERIMENTAL: Explicit multi-read.

0 commit comments

Comments
 (0)