diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 7919878f148..8be09956f10 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -375,6 +375,8 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le decoder.next_required_size()); } + // TODO(GH-48846): we should take a body_length just like ReadMessageAsync + // and read metadata + body in one go. ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length)); if (metadata->size() < metadata_length) { return Status::Invalid("Expected to read ", metadata_length, diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 4910b1596c3..6a20dbb8c85 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -1180,31 +1180,36 @@ Status CheckAligned(const FileBlock& block) { return Status::OK(); } +template +Result CheckBodyLength(MessagePtr message, const FileBlock& block) { + if (message->body_length() != block.body_length) { + return Status::Invalid( + "Mismatching body length for IPC message " + "(Block.bodyLength: ", + block.body_length, " vs. Message.bodyLength: ", message->body_length(), ")"); + } + // NOTE: we cannot check metadata length as easily as we would have to account + // for the additional IPC signalisation (such as optional continuation bytes). + return message; +} + Result> ReadMessageFromBlock( const FileBlock& block, io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader) { RETURN_NOT_OK(CheckAligned(block)); - // TODO(wesm): this breaks integration tests, see ARROW-3256 - // DCHECK_EQ((*out)->body_length(), block.body_length); - ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length, file, fields_loader)); - return message; + return CheckBodyLength(std::move(message), block); } Future> ReadMessageFromBlockAsync( const FileBlock& block, io::RandomAccessFile* file, const io::IOContext& io_context) { - if (!bit_util::IsMultipleOf8(block.offset) || - !bit_util::IsMultipleOf8(block.metadata_length) || - !bit_util::IsMultipleOf8(block.body_length)) { - return Status::Invalid("Unaligned block in IPC file"); - } - - // TODO(wesm): this breaks integration tests, see ARROW-3256 - // DCHECK_EQ((*out)->body_length(), block.body_length); - + RETURN_NOT_OK(CheckAligned(block)); return ReadMessageAsync(block.offset, block.metadata_length, block.body_length, file, - io_context); + io_context) + .Then([block](std::shared_ptr message) { + return CheckBodyLength(std::move(message), block); + }); } class RecordBatchFileReaderImpl;