Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
stanislav-shchetinin committed Jan 20, 2025
1 parent 35b059a commit ca18ddb
Showing 1 changed file with 37 additions and 53 deletions.
90 changes: 37 additions & 53 deletions ydb/core/tx/datashard/export_s3_uploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,23 +185,42 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
}
}

template <typename T>
void PutBuffer(TString& buffer, const TString& key, T stateFunc) {
auto request = Aws::S3::Model::PutObjectRequest().WithKey(key);
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(buffer)));
this->Become(stateFunc);
}

template <typename T>
void PutBufferWithChecksum(TString& buffer, const TString& key, TString& checksum, T stateFunc) {
if (EnableChecksums) {
checksum = ComputeChecksum(buffer);
}
PutBuffer(buffer, key, stateFunc);
}

template <typename T>
void PutMessage(const google::protobuf::Message& message, const TString& key, TString& checksum, T stateFunc) {
google::protobuf::TextFormat::PrintToString(message, &Buffer);
PutBufferWithChecksum(Buffer, key, checksum, stateFunc);
}

void PutScheme(const Ydb::Table::CreateTableRequest& scheme) {
PutMessage(scheme, Settings.GetSchemeKey(), SchemeChecksum, &TThis::StateUploadScheme);
}

void UploadScheme() {
Y_ABORT_UNLESS(!SchemeUploaded);

if (!Scheme) {
return Finish(false, "Cannot infer scheme");
}
PutScheme(Scheme.GetRef());
}

google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer);
if (EnableChecksums) {
SchemeChecksum = NBackup::ComputeChecksum(Buffer);
}

auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(Settings.GetSchemeKey());
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));

this->Become(&TThis::StateUploadScheme);
void PutPermissions(const Ydb::Scheme::ModifyPermissionsRequest& permissions) {
PutMessage(permissions, Settings.GetPermissionsKey(), PermissionsChecksum, &TThis::StateUploadPermissions);
}

void UploadPermissions() {
Expand All @@ -210,37 +229,11 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
if (!Permissions) {
return Finish(false, "Cannot infer permissions");
}

google::protobuf::TextFormat::PrintToString(Permissions.GetRef(), &Buffer);
if (EnableChecksums) {
PermissionsChecksum = NBackup::ComputeChecksum(Buffer);
}

auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(Settings.GetPermissionsKey());
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));

this->Become(&TThis::StateUploadPermissions);
}

template <typename T>
void PutDescription(const google::protobuf::Message& desc, const TString& key, TString& checksum, T stateFunc) {
google::protobuf::TextFormat::PrintToString(desc, &Buffer);
if (EnableChecksums) {
checksum = ComputeChecksum(Buffer);
}
auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(key);
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));
this->Become(stateFunc);
PutPermissions(Permissions.GetRef());
}

void PutChangefeedDescription(const Ydb::Table::ChangefeedDescription& changefeed, const TString& changefeedName) {
PutDescription(changefeed, Settings.GetChangefeedKey(changefeedName), ChangefeedChecksum, &TThis::StateUploadChangefeed);
}

void PutTopicDescription(const Ydb::Topic::DescribeTopicResult& topic, const TString& changefeedName) {
PutDescription(topic, Settings.GetTopicKey(changefeedName), TopicChecksum, &TThis::StateUploadTopic);
PutMessage(changefeed, Settings.GetChangefeedKey(changefeedName), ChangefeedChecksum, &TThis::StateUploadChangefeed);
}

const TString& GetCurrentChangefeedName() const {
Expand All @@ -259,6 +252,10 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
PutChangefeedDescription(Changefeeds[IndexExportedChangefeed].ChangefeedDescription, GetCurrentChangefeedName());
}

void PutTopicDescription(const Ydb::Topic::DescribeTopicResult& topic, const TString& changefeedName) {
PutMessage(topic, Settings.GetTopicKey(changefeedName), TopicChecksum, &TThis::StateUploadTopic);
}

void UploadTopic() {
PutTopicDescription(Changefeeds[IndexExportedChangefeed].Topic, GetCurrentChangefeedName());
}
Expand All @@ -267,29 +264,16 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
Y_ABORT_UNLESS(!MetadataUploaded);

Buffer = std::move(Metadata);
if (EnableChecksums) {
MetadataChecksum = NBackup::ComputeChecksum(Buffer);
}

auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(Settings.GetMetadataKey());
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));

this->Become(&TThis::StateUploadMetadata);
PutBufferWithChecksum(Buffer, Settings.GetMetadataKey(), MetadataChecksum, &TThis::StateUploadMetadata);
}

void UploadChecksum(TString&& checksum, const TString& checksumKey, const TString& objectKeySuffix,
std::function<void()> checksumUploadedCallback)
{
// make checksum verifiable using sha256sum CLI
checksum += ' ' + objectKeySuffix;

auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(checksumKey);
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(checksum)));

PutBuffer(checksum, checksumKey, &TThis::StateUploadChecksum);
ChecksumUploadedCallback = checksumUploadedCallback;
this->Become(&TThis::StateUploadChecksum);
}

void HandleScheme(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
Expand Down

0 comments on commit ca18ddb

Please sign in to comment.