Skip to content

Commit

Permalink
[YQL-17709] [channel] split spilling into actor and non-actor parts (y…
Browse files Browse the repository at this point in the history
  • Loading branch information
lll-phill-lll authored Mar 31, 2024
1 parent d99f5c5 commit 41196f0
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 342 deletions.
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ class TKqpTaskRunnerExecutionContext: public NDq::IDqTaskRunnerExecutionContext
return {};
}

NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, bool /* withSpilling */, TActorSystem* /* actorSystem */, bool /*isConcurrent*/) const override {
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, bool /* withSpilling */, TActorSystem* /* actorSystem */) const override {
return {};
}

Expand Down
6 changes: 3 additions & 3 deletions ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, IDqChan
}

IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling) const {
return CreateChannelStorage(channelId, withSpilling, nullptr, false);
return CreateChannelStorage(channelId, withSpilling, NActors::TlsActivationContext->ActorSystem());
}

IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem, bool isConcurrent) const {
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const {
if (withSpilling) {
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem, isConcurrent);
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem);
} else {
return nullptr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {
TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp);

IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem, bool isConcurrent) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const override;

std::function<void()> GetWakeupCallback() const override;

Expand Down
106 changes: 76 additions & 30 deletions ydb/library/yql/dq/actors/spilling/channel_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,62 +20,108 @@ using namespace NActors;

namespace {


// Limits on blobs whose writes are still in flight: IsFull() reports true once the number of
// such blobs or their total size becomes strictly greater than the corresponding limit.
constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10;
constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB;

class TDqChannelStorage : public IDqChannelStorage {
// Bookkeeping for one blob that Put() has sent for writing and that is still tracked in WritingBlobs_.
struct TWritingBlobInfo {
// Size of the blob, taken in Put() before the rope is moved into the write event.
ui64 BlobSize_;
// Future of the promise sent along with the write request; UpdateWriteStatus() polls it via HasValue().
NThreading::TFuture<void> IsBlobWrittenFuture_;
};
public:
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem, bool isConcurrent)
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem)
: ActorSystem_(actorSystem)
{
if (isConcurrent) {
SelfActor_ = CreateConcurrentDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
} else {
SelfActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
}
SelfActorId_ = TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_->GetActor());
SelfId_ = TlsActivationContext->AsActorContext().SelfID;
ChannelStorageActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
ChannelStorageActorId_ = ActorSystem_->Register(ChannelStorageActor_->GetActor());
}

~TDqChannelStorage() {
if (ActorSystem_) {
ActorSystem_->Send(
new IEventHandle(
SelfActorId_,
SelfId_,
new TEvents::TEvPoison,
/*flags=*/0,
/*cookie=*/0));
} else {
TlsActivationContext->AsActorContext().Send(SelfActorId_, new TEvents::TEvPoison);
}
ActorSystem_->Send(ChannelStorageActorId_, new TEvents::TEvPoison);
}

bool IsEmpty() const override {
return SelfActor_->IsEmpty();
bool IsEmpty() override {
UpdateWriteStatus();

return WritingBlobs_.empty() && StoredBlobsCount_ == 0 && LoadingBlobs_.empty();
}

bool IsFull() const override {
return SelfActor_->IsFull();
bool IsFull() override {
UpdateWriteStatus();

return WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsTotalSize_ > MAX_INFLIGHT_BLOBS_SIZE;
}

void Put(ui64 blobId, TRope&& blob, ui64 cookie = 0) override {
SelfActor_->Put(blobId, std::move(blob), cookie);
UpdateWriteStatus();

auto promise = NThreading::NewPromise<void>();
auto future = promise.GetFuture();

ui64 blobSize = blob.size();

ActorSystem_->Send(ChannelStorageActorId_, new TEvDqChannelSpilling::TEvPut(blobId, std::move(blob), std::move(promise)), /*flags*/0, cookie);

WritingBlobs_.emplace(blobId, TWritingBlobInfo(blobSize, std::move(future)));
WritingBlobsTotalSize_ += blobSize;
}

bool Get(ui64 blobId, TBuffer& blob, ui64 cookie = 0) override {
return SelfActor_->Get(blobId, blob, cookie);
UpdateWriteStatus();

const auto it = LoadingBlobs_.find(blobId);
// If we didn't request loading blob from spilling -> request it
if (it == LoadingBlobs_.end()) {
auto promise = NThreading::NewPromise<TBuffer>();
auto future = promise.GetFuture();
ActorSystem_->Send(ChannelStorageActorId_, new TEvDqChannelSpilling::TEvGet(blobId, std::move(promise)), /*flags*/0, cookie);

LoadingBlobs_.emplace(blobId, std::move(future));
return false;
}
// If we requested loading blob, but it's not loaded -> wait
if (!it->second.HasValue()) return false;

blob = std::move(it->second.ExtractValue());
LoadingBlobs_.erase(it);
--StoredBlobsCount_;

return true;
}

private:
// Sweeps WritingBlobs_ and, for every entry whose write future already holds a value,
// subtracts its size from the in-flight total, counts it as stored and drops the entry.
void UpdateWriteStatus() {
    auto cursor = WritingBlobs_.begin();
    while (cursor != WritingBlobs_.end()) {
        auto& pending = cursor->second;
        if (!pending.IsBlobWrittenFuture_.HasValue()) {
            // Future has no value yet: keep tracking this blob.
            ++cursor;
            continue;
        }

        WritingBlobsTotalSize_ -= pending.BlobSize_;
        ++StoredBlobsCount_;
        // erase() hands back the iterator following the removed entry, so the sweep continues safely.
        cursor = WritingBlobs_.erase(cursor);
    }
}

private:
IDqChannelStorageActor* SelfActor_;
TActorId SelfActorId_;
TActorId SelfId_;
IDqChannelStorageActor* ChannelStorageActor_;
TActorId ChannelStorageActorId_;
TActorSystem *ActorSystem_;

// BlobId -> future with requested blob
std::unordered_map<ui64, NThreading::TFuture<TBuffer>> LoadingBlobs_;
// BlobId -> future with some additional info
std::unordered_map<ui64, TWritingBlobInfo> WritingBlobs_;
ui64 WritingBlobsTotalSize_ = 0;

ui64 StoredBlobsCount_ = 0;
};

} // anonymous namespace

IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem, bool isConcurrent)
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem)
{
return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem, isConcurrent);
return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem);
}

} // namespace NYql::NDq
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/spilling/channel_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ namespace NActors {
namespace NYql::NDq {

IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem, bool isConcurrent);
IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem);

} // namespace NYql::NDq
Loading

0 comments on commit 41196f0

Please sign in to comment.