Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
decoder.next_required_size());
}

// TODO(GH-48846): we should take a body_length just like ReadMessageAsync
// and read metadata + body in one go.
ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length));
if (metadata->size() < metadata_length) {
return Status::Invalid("Expected to read ", metadata_length,
Expand Down
33 changes: 19 additions & 14 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1180,31 +1180,36 @@ Status CheckAligned(const FileBlock& block) {
return Status::OK();
}

template <typename MessagePtr>
Result<MessagePtr> CheckBodyLength(MessagePtr message, const FileBlock& block) {
if (message->body_length() != block.body_length) {
return Status::Invalid(
"Mismatching body length for IPC message "
"(Block.bodyLength: ",
block.body_length, " vs. Message.bodyLength: ", message->body_length(), ")");
}
// NOTE: we cannot check metadata length as easily as we would have to account
// for the additional IPC signalisation (such as optional continuation bytes).
return message;
}

Result<std::unique_ptr<Message>> ReadMessageFromBlock(
const FileBlock& block, io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader) {
RETURN_NOT_OK(CheckAligned(block));
// TODO(wesm): this breaks integration tests, see ARROW-3256
// DCHECK_EQ((*out)->body_length(), block.body_length);

ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
file, fields_loader));
return message;
return CheckBodyLength(std::move(message), block);
}

Future<std::shared_ptr<Message>> ReadMessageFromBlockAsync(
const FileBlock& block, io::RandomAccessFile* file, const io::IOContext& io_context) {
if (!bit_util::IsMultipleOf8(block.offset) ||
!bit_util::IsMultipleOf8(block.metadata_length) ||
!bit_util::IsMultipleOf8(block.body_length)) {
return Status::Invalid("Unaligned block in IPC file");
}

// TODO(wesm): this breaks integration tests, see ARROW-3256
// DCHECK_EQ((*out)->body_length(), block.body_length);

RETURN_NOT_OK(CheckAligned(block));
return ReadMessageAsync(block.offset, block.metadata_length, block.body_length, file,
io_context);
io_context)
.Then([block](std::shared_ptr<Message> message) {
return CheckBodyLength(std::move(message), block);
});
}

class RecordBatchFileReaderImpl;
Expand Down
Loading