Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions cpp/core/shuffle/Payload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,17 @@ arrow::Result<int64_t> compressBuffer(
// Same wire format as compressBuffer:
// kTypeAwareBuffer (int64) | uncompressedLength (int64) | compressedLength (int64) | compressed data
// If compressed size >= uncompressed size, falls back to kUncompressedBuffer (same as standard codec).
//
// For TAC type kStringDict, the offsetsBuffer + numRows arguments must be
// supplied so the codec can build the dictionary. They are ignored for other
// TAC types.
arrow::Result<int64_t> compressTypeAwareBuffer(
const std::shared_ptr<arrow::Buffer>& buffer,
uint8_t* output,
int64_t outputLength,
int8_t typeKind) {
int8_t typeKind,
const uint8_t* offsetsBuffer = nullptr,
int32_t numRows = 0) {
auto outputPtr = &output;
if (!buffer) {
write<int64_t>(outputPtr, kNullBuffer);
Expand All @@ -116,7 +122,8 @@ arrow::Result<int64_t> compressTypeAwareBuffer(

ARROW_ASSIGN_OR_RAISE(
auto compressedSize,
TypeAwareCompressCodec::compress(buffer->data(), buffer->size(), dataOutput, availableOutput, typeKind));
TypeAwareCompressCodec::compress(
buffer->data(), buffer->size(), dataOutput, availableOutput, typeKind, offsetsBuffer, numRows));

if (compressedSize >= buffer->size()) {
// Compression didn't help. Fall back to uncompressed, same as compressBuffer.
Expand Down Expand Up @@ -272,12 +279,46 @@ arrow::Result<std::unique_ptr<BlockPayload>> BlockPayload::fromBuffers(
auto typeKind = (bufferTypes != nullptr && i < bufferTypes->size()) ? (*bufferTypes)[i] : tac::kUnsupported;

int64_t compressedSize = 0;
if (TypeAwareCompressCodec::support(typeKind)) {
// Use type-aware compression for supported types.
// For string-dict compression the codec needs per-row offsets into
// the string-data buffer. Production string columns reach us in two
// shapes:
//
// (a) Arrow standard layout: validity, offsets[numRows+1], data.
// offsets[0] == 0, offsets[numRows] == data buffer size.
// (b) VeloxHashShuffleWriter layout: validity, lengths[numRows],
// data — i.e. a per-row length, not cumulative offsets.
//
// The codec reads `offsets[numRows]` as the data-buffer end sentinel,
// which is in-bounds for shape-a but ONE PAST THE END for shape-b.
// To stay correct, only route shape-a inputs through compressStringDict;
// shape-b inputs fall through to the standard LZ4/ZSTD codec below.
// The codec's internal `sliced` early-exit was already producing LZ4
// output for the common shape-b case (where `offsets[0] != 0`), so
// observable compressed size is unchanged at the user-data level.
//
// Use the authoritative `numRows` parameter (NOT `bufferSize/4 - 1`)
// so single-row batches don't underflow to `numRows == 0`.
bool routeToTacStringDict = false;
const uint8_t* offsetsBuf = nullptr;
int32_t offsetsNumRows = 0;
if (typeKind == tac::kStringDict && numRows > 0 && i >= 1 && buffers[i - 1] != nullptr) {
const auto prevSize = buffers[i - 1]->size();
const auto expectedOffsetsBytes = static_cast<int64_t>(numRows + 1) * sizeof(int32_t);
if (prevSize == expectedOffsetsBytes) {
offsetsBuf = buffers[i - 1]->data();
offsetsNumRows = static_cast<int32_t>(numRows);
routeToTacStringDict = true;
}
}

if (TypeAwareCompressCodec::support(typeKind) && (typeKind != tac::kStringDict || routeToTacStringDict)) {
ARROW_ASSIGN_OR_RAISE(
compressedSize, compressTypeAwareBuffer(std::move(buffers[i]), output, availableLength, typeKind));
compressedSize,
compressTypeAwareBuffer(
std::move(buffers[i]), output, availableLength, typeKind, offsetsBuf, offsetsNumRows));
} else {
// Use standard codec (LZ4/ZSTD) for unsupported types.
// Use standard codec (LZ4/ZSTD) for unsupported types and for
// kStringDict on shape-b lengths buffers (or any unexpected shape).
ARROW_ASSIGN_OR_RAISE(compressedSize, compressBuffer(std::move(buffers[i]), output, availableLength, codec));
}
output += compressedSize;
Expand Down
Loading
Loading