Skip to content

Conversation

@tanjialiang
Copy link
Contributor

Summary: Use BaseSerializedPage directly from shuffle. This allows seamless handle of shuffle data all the way to ShuffleRead(subclass of Exchange operator)

Differential Revision: D87852058

@tanjialiang tanjialiang requested review from a team as code owners November 25, 2025 08:31
@prestodb-ci prestodb-ci added the from:Meta PR from Meta label Nov 25, 2025
@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Nov 25, 2025

Reviewer's Guide

Refactors the native shuffle and exchange path to use a new ShuffleSerializedPage (subclass of velox::exec::BaseSerializedPage) instead of the custom ReadBatch/SerializedPage types, and updates readers, exchange sources, the ShuffleRead operator, and tests to consume this unified page abstraction.

Sequence diagram for ShuffleReader to ShuffleRead data flow using ShuffleSerializedPage

sequenceDiagram
  participant ShuffleRead
  participant ShuffleExchangeSource
  participant LocalShuffleReader
  participant ExchangeQueue

  ShuffleRead->>ShuffleExchangeSource: request(maxBytes, maxWait)
  activate ShuffleExchangeSource

  ShuffleExchangeSource->>LocalShuffleReader: next(maxBytes)
  activate LocalShuffleReader
  LocalShuffleReader-->>ShuffleExchangeSource: SemiFuture<vector<ShuffleSerializedPage>>
  deactivate LocalShuffleReader

  ShuffleExchangeSource->>ShuffleExchangeSource: deferValue(on batches)
  ShuffleExchangeSource->>ExchangeQueue: enqueueLocked(ShuffleSerializedPage pages)
  ExchangeQueue-->>ShuffleExchangeSource: ContinuePromise list

  ShuffleExchangeSource-->>ShuffleRead: Response{totalBytes, atEnd}
  deactivate ShuffleExchangeSource

  ShuffleRead->>ShuffleRead: getOutput()
  ShuffleRead->>ShuffleRead: iterate currentPages_ as BaseSerializedPage
  ShuffleRead->>ShuffleRead: cast each page to ShuffleSerializedPage
  ShuffleRead->>ShuffleRead: use rows() and numRows() to build RowVector
Loading

Class diagram for unified ShuffleSerializedPage hierarchy and consumers

classDiagram

class BaseSerializedPage {
  <<abstract>>
  +size() uint64_t
  +numRows() std::optional<int64_t>
  +prepareStreamForDeserialize() ByteInputStream*
  +getIOBuf() follyIOBuf*
}

class PrestoSerializedPage {
}
BaseSerializedPage <|-- PrestoSerializedPage

class ShuffleSerializedPage {
  <<abstract>>
  +prepareStreamForDeserialize() ByteInputStream*
  +getIOBuf() follyIOBuf*
  +rows() std::vector_string_view&
}
BaseSerializedPage <|-- ShuffleSerializedPage

class LocalShuffleSerializedPage {
  -rows_ std::vector_string_view
  -buffer_ BufferPtr
  +LocalShuffleSerializedPage(rows std::vector_string_view, buffer BufferPtr)
  +rows() std::vector_string_view&
  +size() uint64_t
  +numRows() std::optional_int64_t
}
ShuffleSerializedPage <|-- LocalShuffleSerializedPage

class ShuffleReader {
  <<interface>>
  +next(maxBytes uint64_t) SemiFuture_vector_unique_ptr_ShuffleSerializedPage
  +noMoreData(success bool) void
}

class LocalShuffleReader {
  +initialize() void
  +next(maxBytes uint64_t) SemiFuture_vector_unique_ptr_ShuffleSerializedPage
  -nextSorted(maxBytes uint64_t) vector_unique_ptr_ShuffleSerializedPage
  -nextUnsorted(maxBytes uint64_t) vector_unique_ptr_ShuffleSerializedPage
}
ShuffleReader <|.. LocalShuffleReader
LocalShuffleReader ..> LocalShuffleSerializedPage
LocalShuffleReader ..> ShuffleSerializedPage

class ShuffleExchangeSource {
  +request(maxBytes int64_t, maxWait microseconds) SemiFuture_Response
}
ShuffleExchangeSource ..> ShuffleReader
ShuffleExchangeSource ..> ShuffleSerializedPage
ShuffleExchangeSource ..> BaseSerializedPage

class ShuffleRead {
  -currentPages_ vector_unique_ptr_BaseSerializedPage
  +getOutput() RowVectorPtr
}
ShuffleRead ..> BaseSerializedPage
ShuffleRead ..> ShuffleSerializedPage

class BroadcastExchangeSource {
  +request(maxBytes int64_t, maxWait microseconds) Response
}
BroadcastExchangeSource ..> BaseSerializedPage
BroadcastExchangeSource ..> PrestoSerializedPage

class PrestoExchangeSource {
  -processDataResponse(response DataResponsePtr) void
}
PrestoExchangeSource ..> BaseSerializedPage
PrestoExchangeSource ..> PrestoSerializedPage

class ReadBatch {
  <<removed>>
  -rows std::vector_string_view
  -data BufferPtr
}

class ShuffleRowBatch {
  <<removed>>
  -rowBatch_ unique_ptr_ReadBatch
  +rows() std::vector_string_view&
}
ReadBatch ..> ShuffleRowBatch
Loading

File-Level Changes

Change Details Files
Introduce ShuffleSerializedPage abstraction and wire LocalShuffleReader to return it instead of ReadBatch.
  • Define ShuffleSerializedPage as a subclass of velox::exec::BaseSerializedPage with unsupported deserialize/IOBuf methods and a virtual rows() accessor.
  • Introduce LocalShuffleSerializedPage to wrap row string_views and backing BufferPtr and implement size() and numRows() using the buffer and row count.
  • Change LocalShuffleReader::next/nextSorted/nextUnsorted signatures and implementations to produce vectors of ShuffleSerializedPage instead of ReadBatch instances, updating batch construction accordingly.
presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
presto-native-execution/presto_cpp/main/operators/LocalShuffle.h
Update exchange sources to use BaseSerializedPage-based pages and remove the ShuffleRowBatch adapter.
  • Change ShuffleExchangeSource to request ShuffleSerializedPage objects from ShuffleReader::next and enqueue them directly, tracking numBatches_ while relying on page->size()/numRows() later.
  • Remove the ShuffleRowBatch class wrapper and its dependency on ShuffleWrite/SerializedPage from ShuffleExchangeSource.h to rely on BaseSerializedPage subclasses instead.
  • Change PrestoExchangeSource and BroadcastExchangeSource to traffic in std::unique_ptrvelox::exec::BaseSerializedPage and construct exec::PrestoSerializedPage instead of exec::SerializedPage for remote/broadcast pages.
presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp
presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.h
presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp
presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp
Adapt ShuffleRead operator and tests to the new ShuffleSerializedPage API.
  • Update ShuffleRead::getOutput to treat currentPages_ as BaseSerializedPage, casting to ShuffleSerializedPage to validate size and access rows() for row extraction.
  • Replace direct field access on ReadBatch::rows in unit tests with calls to ShuffleSerializedPage::rows(), keeping total row counting and value extraction logic intact.
  • Include ShuffleWrite header in PrestoServer.cpp to satisfy new dependencies introduced by using ShuffleSerializedPage/PrestoSerializedPage types.
presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp
presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp
presto-native-execution/presto_cpp/main/PrestoServer.cpp

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

  • In ShuffleExchangeSource::request, totalBytes is no longer updated inside the deferValue loop (it stays 0 for the Response), so you likely want to accumulate page->size() for each enqueued batch as before to preserve correct accounting.
  • LocalShuffleSerializedPage takes const std::vectorstd::string_view& rows and stores it in a const member initialized with std::move(rows), which just copies because the parameter is const; consider making the parameter by value or rvalue-reference and the member non-const so you can actually move and avoid an extra copy.
  • ShuffleSerializedPage::rows() returns a non-const reference but cannot be called on a const ShuffleSerializedPage, which is awkward for read-only use; consider making rows() const (and updating overrides) so callers with const BaseSerializedPage pointers can still access row views without casting away constness.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In ShuffleExchangeSource::request, totalBytes is no longer updated inside the deferValue loop (it stays 0 for the Response), so you likely want to accumulate page->size() for each enqueued batch as before to preserve correct accounting.
- LocalShuffleSerializedPage takes const std::vector<std::string_view>& rows and stores it in a const member initialized with std::move(rows), which just copies because the parameter is const; consider making the parameter by value or rvalue-reference and the member non-const so you can actually move and avoid an extra copy.
- ShuffleSerializedPage::rows() returns a non-const reference but cannot be called on a const ShuffleSerializedPage, which is awkward for read-only use; consider making rows() const (and updating overrides) so callers with const BaseSerializedPage pointers can still access row views without casting away constness.

## Individual Comments

### Comment 1
<location> `presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp:38` </location>
<code_context>
-        .deferValue([this](std::vector<std::unique_ptr<ReadBatch>>&& batches) {
</code_context>

<issue_to_address>
**issue (bug_risk):** totalBytes is no longer updated and will always be 0 in the Response

In the new lambda, `totalBytes` is set to 0 but never updated in the loop over `batches`, whereas previously it was set from `batch->data->size()`. This means `request` will always return a `Response` with `totalBytes == 0`, which can break byte-based accounting and backpressure. Please restore accumulation of `totalBytes` (e.g., sum `batch->size()` for each enqueued page and re-apply the `int32_t` bounds check if still required).
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

std::chrono::microseconds /*maxWait*/) {
auto nextBatch = [this, maxBytes]() {
return std::move(shuffleReader_->next(maxBytes))
.deferValue([this](std::vector<std::unique_ptr<ReadBatch>>&& batches) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): totalBytes is no longer updated and will always be 0 in the Response

In the new lambda, totalBytes is set to 0 but never updated in the loop over batches, whereas previously it was set from batch->data->size(). This means request will always return a Response with totalBytes == 0, which can break byte-based accounting and backpressure. Please restore accumulation of totalBytes (e.g., sum batch->size() for each enqueued page and re-apply the int32_t bounds check if still required).

xiaoxmeng
xiaoxmeng previously approved these changes Nov 26, 2025
Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tanjialiang thanks!

tanjialiang added a commit to tanjialiang/presto that referenced this pull request Nov 27, 2025
…b#26692)

Summary:

Use BaseSerializedPage directly from shuffle. This allows seamless handle of shuffle data all the way to ShuffleRead(subclass of Exchange operator)

Reviewed By: xiaoxmeng

Differential Revision: D87852058
tanjialiang added a commit to tanjialiang/presto that referenced this pull request Nov 27, 2025
…b#26692)

Summary:

Use BaseSerializedPage directly from shuffle. This allows seamless handle of shuffle data all the way to ShuffleRead(subclass of Exchange operator)

Reviewed By: xiaoxmeng

Differential Revision: D87852058
…b#26692)

Summary:

Use BaseSerializedPage directly from shuffle. This allows seamless handle of shuffle data all the way to ShuffleRead(subclass of Exchange operator)

Reviewed By: xiaoxmeng

Differential Revision: D87852058
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants