From ca18ddb99f446df0a22cf398cecd0c10a110941a Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Mon, 20 Jan 2025 13:41:08 +0300 Subject: [PATCH] refactoring --- ydb/core/tx/datashard/export_s3_uploader.cpp | 90 ++++++++------------ 1 file changed, 37 insertions(+), 53 deletions(-) diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp index 0c5807dac28d..55d4ea712104 100644 --- a/ydb/core/tx/datashard/export_s3_uploader.cpp +++ b/ydb/core/tx/datashard/export_s3_uploader.cpp @@ -185,23 +185,42 @@ class TS3Uploader: public TActorBootstrapped { } } + template + void PutBuffer(TString& buffer, const TString& key, T stateFunc) { + auto request = Aws::S3::Model::PutObjectRequest().WithKey(key); + this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(buffer))); + this->Become(stateFunc); + } + + template + void PutBufferWithChecksum(TString& buffer, const TString& key, TString& checksum, T stateFunc) { + if (EnableChecksums) { + checksum = ComputeChecksum(buffer); + } + PutBuffer(buffer, key, stateFunc); + } + + template + void PutMessage(const google::protobuf::Message& message, const TString& key, TString& checksum, T stateFunc) { + google::protobuf::TextFormat::PrintToString(message, &Buffer); + PutBufferWithChecksum(Buffer, key, checksum, stateFunc); + } + + void PutScheme(const Ydb::Table::CreateTableRequest& scheme) { + PutMessage(scheme, Settings.GetSchemeKey(), SchemeChecksum, &TThis::StateUploadScheme); + } + void UploadScheme() { Y_ABORT_UNLESS(!SchemeUploaded); if (!Scheme) { return Finish(false, "Cannot infer scheme"); } + PutScheme(Scheme.GetRef()); + } - google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer); - if (EnableChecksums) { - SchemeChecksum = NBackup::ComputeChecksum(Buffer); - } - - auto request = Aws::S3::Model::PutObjectRequest() - .WithKey(Settings.GetSchemeKey()); - this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); - - this->Become(&TThis::StateUploadScheme); + void PutPermissions(const Ydb::Scheme::ModifyPermissionsRequest& permissions) { + PutMessage(permissions, Settings.GetPermissionsKey(), PermissionsChecksum, &TThis::StateUploadPermissions); } void UploadPermissions() { @@ -210,37 +229,11 @@ class TS3Uploader: public TActorBootstrapped { if (!Permissions) { return Finish(false, "Cannot infer permissions"); } - - google::protobuf::TextFormat::PrintToString(Permissions.GetRef(), &Buffer); - if (EnableChecksums) { - PermissionsChecksum = NBackup::ComputeChecksum(Buffer); - } - - auto request = Aws::S3::Model::PutObjectRequest() - .WithKey(Settings.GetPermissionsKey()); - this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); - - this->Become(&TThis::StateUploadPermissions); - } - - template - void PutDescription(const google::protobuf::Message& desc, const TString& key, TString& checksum, T stateFunc) { - google::protobuf::TextFormat::PrintToString(desc, &Buffer); - if (EnableChecksums) { - checksum = ComputeChecksum(Buffer); - } - auto request = Aws::S3::Model::PutObjectRequest() - .WithKey(key); - this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); - this->Become(stateFunc); + PutPermissions(Permissions.GetRef()); } void PutChangefeedDescription(const Ydb::Table::ChangefeedDescription& changefeed, const TString& changefeedName) { - PutDescription(changefeed, Settings.GetChangefeedKey(changefeedName), ChangefeedChecksum, &TThis::StateUploadChangefeed); - } - - void PutTopicDescription(const Ydb::Topic::DescribeTopicResult& topic, const TString& changefeedName) { - PutDescription(topic, Settings.GetTopicKey(changefeedName), TopicChecksum, &TThis::StateUploadTopic); + PutMessage(changefeed, Settings.GetChangefeedKey(changefeedName), ChangefeedChecksum, &TThis::StateUploadChangefeed); } const TString& GetCurrentChangefeedName() const { @@ -259,6 +252,10 @@ class TS3Uploader: public TActorBootstrapped { PutChangefeedDescription(Changefeeds[IndexExportedChangefeed].ChangefeedDescription, GetCurrentChangefeedName()); } + void PutTopicDescription(const Ydb::Topic::DescribeTopicResult& topic, const TString& changefeedName) { + PutMessage(topic, Settings.GetTopicKey(changefeedName), TopicChecksum, &TThis::StateUploadTopic); + } + void UploadTopic() { PutTopicDescription(Changefeeds[IndexExportedChangefeed].Topic, GetCurrentChangefeedName()); } @@ -267,15 +264,7 @@ class TS3Uploader: public TActorBootstrapped { Y_ABORT_UNLESS(!MetadataUploaded); Buffer = std::move(Metadata); - if (EnableChecksums) { - MetadataChecksum = NBackup::ComputeChecksum(Buffer); - } - - auto request = Aws::S3::Model::PutObjectRequest() - .WithKey(Settings.GetMetadataKey()); - this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); - - this->Become(&TThis::StateUploadMetadata); + PutBufferWithChecksum(Buffer, Settings.GetMetadataKey(), MetadataChecksum, &TThis::StateUploadMetadata); } void UploadChecksum(TString&& checksum, const TString& checksumKey, const TString& objectKeySuffix, @@ -283,13 +272,8 @@ class TS3Uploader: public TActorBootstrapped { { // make checksum verifiable using sha256sum CLI checksum += ' ' + objectKeySuffix; - - auto request = Aws::S3::Model::PutObjectRequest() - .WithKey(checksumKey); - this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(checksum))); - + PutBuffer(checksum, checksumKey, &TThis::StateUploadChecksum); ChecksumUploadedCallback = checksumUploadedCallback; - this->Become(&TThis::StateUploadChecksum); } void HandleScheme(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {