Skip to content

Commit 76b6a96

Browse files
tanjialiang authored and
facebook-github-bot committed
refactor: Change native pos API to return BaseSerializedPage (#26692)
Summary: Use BaseSerializedPage directly from shuffle. This allows seamless handling of shuffle data all the way to ShuffleRead (a subclass of the Exchange operator). Reviewed By: xiaoxmeng. Differential Revision: D87852058
1 parent d5c8633 commit 76b6a96

File tree

13 files changed

+101
-78
lines changed

13 files changed

+101
-78
lines changed

presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ void PrestoExchangeSource::processDataResponse(
323323
contentLength, 0, "next token is not set in non-empty data response");
324324
}
325325

326-
std::unique_ptr<exec::SerializedPage> page;
326+
std::unique_ptr<exec::SerializedPageBase> page;
327327
const bool empty = response->empty();
328328
if (!empty) {
329329
std::vector<std::unique_ptr<folly::IOBuf>> iobufs;
@@ -351,7 +351,7 @@ void PrestoExchangeSource::processDataResponse(
351351
}
352352

353353
if (enableBufferCopy_) {
354-
page = std::make_unique<exec::SerializedPage>(
354+
page = std::make_unique<exec::PrestoSerializedPage>(
355355
std::move(singleChain), [pool = pool_](folly::IOBuf& iobuf) {
356356
int64_t freedBytes{0};
357357
// Free the backed memory from MemoryAllocator on page dtor
@@ -365,7 +365,7 @@ void PrestoExchangeSource::processDataResponse(
365365
PrestoExchangeSource::updateMemoryUsage(-freedBytes);
366366
});
367367
} else {
368-
page = std::make_unique<exec::SerializedPage>(
368+
page = std::make_unique<exec::PrestoSerializedPage>(
369369
std::move(singleChain), [totalBytes](folly::IOBuf& iobuf) {
370370
PrestoExchangeSource::updateMemoryUsage(-totalBytes);
371371
});

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
4242
#include "presto_cpp/main/operators/ShuffleExchangeSource.h"
4343
#include "presto_cpp/main/operators/ShuffleRead.h"
44+
#include "presto_cpp/main/operators/ShuffleWrite.h"
4445
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
4546
#include "presto_cpp/main/types/VeloxPlanConversion.h"
4647
#include "velox/common/base/Counters.h"

presto-native-execution/presto_cpp/main/SessionProperties.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ SessionProperties::SessionProperties() {
345345
"creating tiny SerializedPages. For "
346346
"PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator"
347347
"would buffer up to that number of bytes / number of destinations for "
348-
"each destination before producing a SerializedPage.",
348+
"each destination before producing a SerializedPageBase.",
349349
BIGINT(),
350350
false,
351351
QueryConfig::kMaxPartitionedOutputBufferSize,

presto-native-execution/presto_cpp/main/SessionProperties.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ class SessionProperties {
268268
/// creating tiny SerializedPages. For
269269
/// PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator
270270
/// would buffer up to that number of bytes / number of destinations for each
271-
/// destination before producing a SerializedPage.
271+
/// destination before producing a SerializedPageBase.
272272
static constexpr const char* kMaxPartitionedOutputBufferSize =
273273
"native_max_page_partitioning_buffer_size";
274274

presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ BroadcastExchangeSource::request(
4242

4343
return folly::makeTryWith([&]() -> Response {
4444
int64_t totalBytes = 0;
45-
std::vector<std::unique_ptr<velox::exec::SerializedPage>> pages;
45+
std::vector<std::unique_ptr<velox::exec::SerializedPageBase>> pages;
4646

4747
while (totalBytes < maxBytes && reader_->hasNext()) {
4848
auto buffer = reader_->next();
4949
VELOX_CHECK_NOT_NULL(buffer);
5050

5151
auto ioBuf = folly::IOBuf::wrapBuffer(buffer->as<char>(), buffer->size());
52-
auto page = std::make_unique<velox::exec::SerializedPage>(
52+
auto page = std::make_unique<velox::exec::PrestoSerializedPage>(
5353
std::move(ioBuf), [buffer](auto& /*unused*/) {});
5454
pages.push_back(std::move(page));
5555

presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,30 @@ class SortedFileInputStream final : public velox::common::FileInputStream,
105105
std::string currentValue_;
106106
};
107107

108+
class LocalShuffleSerializedPage : public ShuffleSerializedPage {
109+
public:
110+
LocalShuffleSerializedPage(
111+
const std::vector<std::string_view>& rows,
112+
velox::BufferPtr buffer)
113+
: rows_{std::move(rows)}, buffer_{std::move(buffer)} {}
114+
115+
const std::vector<std::string_view>& rows() override {
116+
return rows_;
117+
}
118+
119+
uint64_t size() const override {
120+
return buffer_->size();
121+
}
122+
123+
std::optional<int64_t> numRows() const override {
124+
return rows_.size();
125+
}
126+
127+
private:
128+
const std::vector<std::string_view> rows_;
129+
const velox::BufferPtr buffer_;
130+
};
131+
108132
std::vector<RowMetadata>
109133
extractRowMetadata(const char* buffer, size_t bufferSize, bool sortedShuffle) {
110134
std::vector<RowMetadata> rows;
@@ -450,9 +474,9 @@ void LocalShuffleReader::initSortedShuffleRead() {
450474
}
451475
}
452476

453-
std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextSorted(
454-
uint64_t maxBytes) {
455-
std::vector<std::unique_ptr<ReadBatch>> batches;
477+
std::vector<std::unique_ptr<ShuffleSerializedPage>>
478+
LocalShuffleReader::nextSorted(uint64_t maxBytes) {
479+
std::vector<std::unique_ptr<ShuffleSerializedPage>> batches;
456480

457481
if (merge_ == nullptr) {
458482
return batches;
@@ -469,7 +493,7 @@ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextSorted(
469493
if (bufferUsed + data.size() > maxBytes) {
470494
if (bufferUsed > 0) {
471495
batches.push_back(
472-
std::make_unique<ReadBatch>(
496+
std::make_unique<LocalShuffleSerializedPage>(
473497
std::move(rows), std::move(batchBuffer)));
474498
return batches;
475499
}
@@ -489,15 +513,16 @@ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextSorted(
489513

490514
if (!rows.empty()) {
491515
batches.push_back(
492-
std::make_unique<ReadBatch>(std::move(rows), std::move(batchBuffer)));
516+
std::make_unique<LocalShuffleSerializedPage>(
517+
std::move(rows), std::move(batchBuffer)));
493518
}
494519

495520
return batches;
496521
}
497522

498-
std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextUnsorted(
499-
uint64_t maxBytes) {
500-
std::vector<std::unique_ptr<ReadBatch>> batches;
523+
std::vector<std::unique_ptr<ShuffleSerializedPage>>
524+
LocalShuffleReader::nextUnsorted(uint64_t maxBytes) {
525+
std::vector<std::unique_ptr<ShuffleSerializedPage>> batches;
501526
uint64_t totalBytes{0};
502527

503528
while (readPartitionFileIndex_ < readPartitionFiles_.size()) {
@@ -527,13 +552,14 @@ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextUnsorted(
527552

528553
totalBytes += fileSize;
529554
batches.push_back(
530-
std::make_unique<ReadBatch>(std::move(rows), std::move(buffer)));
555+
std::make_unique<LocalShuffleSerializedPage>(
556+
std::move(rows), std::move(buffer)));
531557
}
532558

533559
return batches;
534560
}
535561

536-
folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
562+
folly::SemiFuture<std::vector<std::unique_ptr<ShuffleSerializedPage>>>
537563
LocalShuffleReader::next(uint64_t maxBytes) {
538564
VELOX_CHECK(
539565
initialized_,

presto-native-execution/presto_cpp/main/operators/LocalShuffle.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ class LocalShuffleReader : public ShuffleReader {
173173
/// For sorted shuffle, this opens all shuffle files and prepares k-way merge.
174174
void initialize();
175175

176-
folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(
176+
folly::SemiFuture<std::vector<std::unique_ptr<ShuffleSerializedPage>>> next(
177177
uint64_t maxBytes) override;
178178

179179
void noMoreData(bool success) override;
@@ -192,10 +192,12 @@ class LocalShuffleReader : public ShuffleReader {
192192
void initSortedShuffleRead();
193193

194194
// Reads sorted shuffle data using k-way merge with TreeOfLosers.
195-
std::vector<std::unique_ptr<ReadBatch>> nextSorted(uint64_t maxBytes);
195+
std::vector<std::unique_ptr<ShuffleSerializedPage>> nextSorted(
196+
uint64_t maxBytes);
196197

197198
// Reads unsorted shuffle data in batch-based file reading.
198-
std::vector<std::unique_ptr<ReadBatch>> nextUnsorted(uint64_t maxBytes);
199+
std::vector<std::unique_ptr<ShuffleSerializedPage>> nextUnsorted(
200+
uint64_t maxBytes);
199201

200202
const std::string rootPath_;
201203
const std::string queryId_;

presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -35,31 +35,29 @@ ShuffleExchangeSource::request(
3535
std::chrono::microseconds /*maxWait*/) {
3636
auto nextBatch = [this, maxBytes]() {
3737
return std::move(shuffleReader_->next(maxBytes))
38-
.deferValue([this](std::vector<std::unique_ptr<ReadBatch>>&& batches) {
39-
std::vector<velox::ContinuePromise> promises;
40-
int64_t totalBytes{0};
41-
{
42-
std::lock_guard<std::mutex> l(queue_->mutex());
43-
if (batches.empty()) {
44-
atEnd_ = true;
45-
queue_->enqueueLocked(nullptr, promises);
46-
} else {
47-
for (auto& batch : batches) {
48-
totalBytes = batch->data->size();
49-
VELOX_CHECK_LE(totalBytes, std::numeric_limits<int32_t>::max());
50-
++numBatches_;
51-
queue_->enqueueLocked(
52-
std::make_unique<ShuffleRowBatch>(std::move(batch)),
53-
promises);
38+
.deferValue(
39+
[this](
40+
std::vector<std::unique_ptr<ShuffleSerializedPage>>&& batches) {
41+
std::vector<velox::ContinuePromise> promises;
42+
int64_t totalBytes{0};
43+
{
44+
std::lock_guard<std::mutex> l(queue_->mutex());
45+
if (batches.empty()) {
46+
atEnd_ = true;
47+
queue_->enqueueLocked(nullptr, promises);
48+
} else {
49+
for (auto& batch : batches) {
50+
++numBatches_;
51+
queue_->enqueueLocked(std::move(batch), promises);
52+
}
53+
}
5454
}
55-
}
56-
}
5755

58-
for (auto& promise : promises) {
59-
promise.setValue();
60-
}
61-
return folly::makeFuture(Response{totalBytes, atEnd_});
62-
})
56+
for (auto& promise : promises) {
57+
promise.setValue();
58+
}
59+
return folly::makeFuture(Response{totalBytes, atEnd_});
60+
})
6361
.deferError(
6462
[](folly::exception_wrapper e) mutable
6563
-> ShuffleExchangeSource::Response {

presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.h

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,11 @@
1414
#pragma once
1515

1616
#include "presto_cpp/main/operators/ShuffleInterface.h"
17-
#include "presto_cpp/main/operators/ShuffleWrite.h"
1817
#include "velox/core/PlanNode.h"
19-
#include "velox/exec/Exchange.h"
2018
#include "velox/exec/Operator.h"
2119

2220
namespace facebook::presto::operators {
2321

24-
class ShuffleRowBatch : public velox::exec::SerializedPage {
25-
public:
26-
explicit ShuffleRowBatch(
27-
std::unique_ptr<ReadBatch> rowBatch)
28-
: velox::exec::
29-
SerializedPage{folly::IOBuf::wrapBuffer(
30-
rowBatch->data->as<char>(), rowBatch->data->size()), nullptr, rowBatch->rows.size()},
31-
rowBatch_{std::move(rowBatch)} {}
32-
33-
~ShuffleRowBatch() override {}
34-
35-
const std::vector<std::string_view>& rows() const {
36-
return rowBatch_->rows;
37-
}
38-
39-
private:
40-
const std::unique_ptr<ReadBatch> rowBatch_;
41-
};
42-
4322
class ShuffleExchangeSource : public velox::exec::ExchangeSource {
4423
public:
4524
ShuffleExchangeSource(

presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,13 @@
1414
#pragma once
1515

1616
#include <fmt/format.h>
17+
#include "velox/exec/Exchange.h"
1718
#include "velox/exec/Operator.h"
1819

20+
namespace facebook::velox {
21+
class ByteInputStream;
22+
}
23+
1924
namespace facebook::presto::operators {
2025

2126
class ShuffleWriter {
@@ -44,12 +49,21 @@ class ShuffleWriter {
4449
}
4550
};
4651

47-
struct ReadBatch {
48-
std::vector<std::string_view> rows;
49-
velox::BufferPtr data;
52+
class ShuffleSerializedPage : public velox::exec::SerializedPageBase {
53+
public:
54+
ShuffleSerializedPage() = default;
55+
~ShuffleSerializedPage() override = default;
56+
57+
std::unique_ptr<velox::ByteInputStream> prepareStreamForDeserialize()
58+
override {
59+
VELOX_UNSUPPORTED();
60+
}
61+
62+
std::unique_ptr<folly::IOBuf> getIOBuf() const override {
63+
VELOX_UNSUPPORTED();
64+
}
5065

51-
ReadBatch(std::vector<std::string_view>&& _rows, velox::BufferPtr&& _data)
52-
: rows{std::move(_rows)}, data{std::move(_data)} {}
66+
virtual const std::vector<std::string_view>& rows() = 0;
5367
};
5468

5569
class ShuffleReader {
@@ -58,10 +72,11 @@ class ShuffleReader {
5872

5973
/// Fetch the next batch of rows from the shuffle reader.
6074
/// @param bytes Maximum number of bytes to read in this batch.
61-
/// @return A semi-future resolving to a vector of ReadBatch pointers, where
62-
/// each ReadBatch contains rows and associated data buffers.
63-
virtual folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(
64-
uint64_t maxBytes) = 0;
75+
/// @return A semi-future resolving to a vector of ShuffleSerializedPage
76+
/// pointers, where each ShuffleSerializedPage contains rows and associated
77+
/// data buffers.
78+
virtual folly::SemiFuture<std::vector<std::unique_ptr<ShuffleSerializedPage>>>
79+
next(uint64_t maxBytes) = 0;
6580

6681
/// Tell the shuffle system the reader is done. May be called with 'success'
6782
/// true before reading all the data. This happens when a query has a LIMIT or

0 commit comments

Comments
 (0)