Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/crc32.h"
#include "arrow/util/endian.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding_internal.h"
Expand Down Expand Up @@ -112,7 +113,8 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
if (data_size < 4) {
throw ParquetException("Received invalid levels (corrupt data page?)");
}
num_bytes = ::arrow::util::SafeLoadAs<int32_t>(data);
num_bytes =
::arrow::bit_util::FromLittleEndian(::arrow::util::SafeLoadAs<int32_t>(data));
if (num_bytes < 0 || num_bytes > data_size - 4) {
throw ParquetException("Received invalid number of bytes (corrupt data page?)");
}
Expand All @@ -132,7 +134,11 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
"Number of buffered values too large (corrupt data page?)");
}
num_bytes = static_cast<int32_t>(bit_util::BytesForBits(num_bits));
#if ARROW_LITTLE_ENDIAN
if (num_bytes < 0 || num_bytes > data_size - 4) {
#else
if (num_bytes < 0 || num_bytes > data_size) {
#endif
Comment on lines +137 to +141
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pitrou You added - 4 in #6848. Do you think that we need - 4 with big endian too?

throw ParquetException("Received invalid number of bytes (corrupt data page?)");
}
if (!bit_packed_decoder_) {
Expand Down
54 changes: 53 additions & 1 deletion cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,8 @@ int64_t ColumnWriterImpl::RleEncodeLevels(const void* src_buffer,
DCHECK_EQ(encoded, num_buffered_values_);

if (include_length_prefix) {
reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
::arrow::util::SafeStore(dest_buffer->mutable_data(),
::arrow::bit_util::ToLittleEndian(level_encoder_.len()));
}

return level_encoder_.len() + prefix_size;
Expand Down Expand Up @@ -2578,13 +2579,31 @@ struct SerializeFunctor<
if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal64Type>) {
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
} else if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal128Type>) {
#if ARROW_LITTLE_ENDIAN
// On little-endian: u64_in[0] = low, u64_in[1] = high
// Write high first for big-endian output
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
#else
// On big-endian: u64_in[0] = high, u64_in[1] = low
// Write high first for big-endian output
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
#endif
} else if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal256Type>) {
#if ARROW_LITTLE_ENDIAN
// On little-endian: write words in reverse order (high to low)
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[3]);
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[2]);
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
#else
// On big-endian: write words in natural order (high to low)
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[2]);
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[3]);
#endif
}
scratch = reinterpret_cast<uint8_t*>(p);
}
Expand All @@ -2600,6 +2619,7 @@ struct SerializeFunctor<

// Requires a custom serializer because Float16s in Parquet are stored as a 2-byte
// (little-endian) FLBA, whereas in Arrow they're a native `uint16_t`.
#if ARROW_LITTLE_ENDIAN
template <>
struct SerializeFunctor<::parquet::FLBAType, ::arrow::HalfFloatType> {
Status Serialize(const ::arrow::HalfFloatArray& array, ArrowWriteContext*, FLBA* out) {
Expand All @@ -2621,6 +2641,38 @@ struct SerializeFunctor<::parquet::FLBAType, ::arrow::HalfFloatType> {
return FLBA{reinterpret_cast<const uint8_t*>(value_ptr)};
}
};
#else
template <>
struct SerializeFunctor<::parquet::FLBAType, ::arrow::HalfFloatType> {
Status Serialize(const ::arrow::HalfFloatArray& array, ArrowWriteContext*, FLBA* out) {
const uint16_t* values = array.raw_values();
const int64_t length = array.length();

// Allocate buffer for little-endian converted values
converted_values_.resize(length);

if (array.null_count() == 0) {
for (int64_t i = 0; i < length; ++i) {
converted_values_[i] = ::arrow::bit_util::ToLittleEndian(values[i]);
out[i] = FLBA{reinterpret_cast<const uint8_t*>(&converted_values_[i])};
}
} else {
for (int64_t i = 0; i < length; ++i) {
if (array.IsValid(i)) {
converted_values_[i] = ::arrow::bit_util::ToLittleEndian(values[i]);
out[i] = FLBA{reinterpret_cast<const uint8_t*>(&converted_values_[i])};
} else {
out[i] = FLBA{};
}
}
}
return Status::OK();
}

private:
std::vector<uint16_t> converted_values_;
};
#endif
Comment on lines +2644 to +2675
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you share implementation as much as possible something like:

template <>
struct SerializeFunctor<::parquet::FLBAType, ::arrow::HalfFloatType> {
  Status Serialize(const ::arrow::HalfFloatArray& array, ArrowWriteContext*, FLBA* out) {
#if ARROW_LITTLE_ENDIAN
    return SerializeLittleEndianValues(array.raw_values(), out);
#else
    const uint16_t* values = array.raw_values();
    const int64_t length = array.length();
    converted_values_.resize(length);
    for (int64_t i = 0; i < length; ++i) {
      // We don't need IsValid() here. Non valid values are just ignored in SerializeLittleEndianValues().
      converted_values_[i] = ::arrow::bit_util::ToLittleEndian(values[i]);
    }
    return SerializeLittleEndianValues(converted_values_.data(), out);
#endif
  }

 private:
  Status SerializeLittleEndianValues(const uint16_t* values, FLBA* out) {
    if (array.null_count() == 0) {
      for (int64_t i = 0; i < array.length(); ++i) {
        out[i] = ToFLBA(&values[i]);
      }
    } else {
      for (int64_t i = 0; i < array.length(); ++i) {
        out[i] = array.IsValid(i) ? ToFLBA(&values[i]) : FLBA{};
      }
    }
    return Status::OK();
  }

  FLBA ToFLBA(const uint16_t* value_ptr) const {
    return FLBA{reinterpret_cast<const uint8_t*>(value_ptr)};
  }

#if !ARROW_LITTLE_ENDIAN
  std::vector<uint16_t> converted_values_;  
#endif
};


template <>
Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
#include "arrow/util/endian.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
#include "parquet/types.h"
Expand Down Expand Up @@ -260,13 +261,21 @@ constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
#if ARROW_LITTLE_ENDIAN
(*impala_timestamp).value[2] = (uint32_t)julian_days;
#endif
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this #if?

It seems that the below (*impala_timestamp).value[2] = static_cast<uint32_t>(julian_days); does the same thing.


int64_t last_day_units = time % UnitPerDay;
auto last_day_nanos = last_day_units * NanosecondsPerUnit;
#if ARROW_LITTLE_ENDIAN
// impala_timestamp will be unaligned every other entry so do memcpy instead
// of assign and reinterpret cast to avoid undefined behavior.
std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t));
#else
(*impala_timestamp).value[0] = static_cast<uint32_t>(last_day_nanos);
(*impala_timestamp).value[1] = static_cast<uint32_t>(last_day_nanos >> 32);
Comment on lines 269 to +276
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the following instead of #if?

auto last_day_nanos = last_day_units * NanosecondsPerUnit;
auto last_day_nanos_little_endian = ::arrow::bit_util::ToLittleEndian(last_day_nanos);
std::memcpy(impala_timestamp, &last_day_nanos_little_endian, sizeof(int64_t));

(*impala_timestamp).value[2] = static_cast<uint32_t>(julian_days);
#endif
}

constexpr int64_t kSecondsInNanos = INT64_C(1000000000);
Expand Down
Loading