Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
stanislav-shchetinin committed Jan 16, 2025
1 parent 5445590 commit c8a6d44
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 17 deletions.
27 changes: 11 additions & 16 deletions ydb/core/tx/datashard/export_s3_uploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,10 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
this->Become(&TThis::StateUploadPermissions);
}

void PutDescription(const google::protobuf::Message& desc, TString key, TString& checksum, auto stateFunc) {
void PutDescription(const google::protobuf::Message& desc, const TString& key, TString& checksum, const auto& stateFunc) {
google::protobuf::TextFormat::PrintToString(desc, &Buffer);
if (EnableChecksums) {
checksum = ComputeExportChecksum(Buffer);
checksum = ComputeChecksum(Buffer);
}
auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(key);
Expand Down Expand Up @@ -255,13 +255,11 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
this->Become(&TThis::StateUploadData);
return;
}
const auto& changefeed = Changefeeds[IndexExportedChangefeed].ChangefeedDescription;
PutChangefeedDescription(changefeed, GetCurrentChangefeedName());
PutChangefeedDescription(Changefeeds[IndexExportedChangefeed].ChangefeedDescription, GetCurrentChangefeedName());
}

void UploadTopic() {
const auto& topic = Changefeeds[IndexExportedChangefeed].Topic;
PutTopicDescription(topic, GetCurrentChangefeedName());
PutTopicDescription(Changefeeds[IndexExportedChangefeed].Topic, GetCurrentChangefeedName());
}

void UploadMetadata() {
Expand Down Expand Up @@ -353,12 +351,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
}

auto nextStep = [this]() {
if (IndexExportedChangefeed == Changefeeds.size()) {
ChangefeedsUploaded = true;
this->Become(&TThis::StateUploadData);
} else {
UploadTopic();
}
UploadTopic();
};
if (EnableChecksums) {
TString checksumKey = ChecksumKey(Settings.GetChangefeedKey(GetCurrentChangefeedName()));
Expand Down Expand Up @@ -943,12 +936,14 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
for (int i = 0; i < changefeedsCount; ++i) {
Ydb::Table::ChangefeedDescription changefeedDesc;
Ydb::Topic::DescribeTopicResult topic;
FillChangefeedDescription(changefeedDesc, cdcStreams[i]);
const auto& cdcStream = cdcStreams[i];
const auto& persQueueTPathDesc = persQueuesTPathDesc[i];
FillChangefeedDescription(changefeedDesc, cdcStream);
Ydb::StatusIds_StatusCode status;
TString error;
FillTopicDescription(topic, persQueuesTPathDesc[i].GetPersQueueGroup(),
persQueuesTPathDesc[i].GetSelf(),
cdcStreams[i].GetName(), status, error);
FillTopicDescription(topic, persQueueTPathDesc.GetPersQueueGroup(),
persQueueTPathDesc.GetSelf(),
cdcStream.GetName(), status, error);
changefeedsExportDescs.emplace_back(changefeedDesc, topic);
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/schemeshard_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1567,7 +1567,7 @@ TPath TPath::FindOlapStore() const {

bool TPath::IsCommonSensePath() const {
Y_ABORT_UNLESS(IsResolved());

for (auto item = ++Elements.rbegin(); item != Elements.rend(); ++item) {
// Directories and domain roots are always ok as intermediaries
bool ok = (*item)->IsDirectory() || (*item)->IsDomainRoot();
Expand Down
52 changes: 52 additions & 0 deletions ydb/core/tx/schemeshard/ut_export/ut_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2353,6 +2353,58 @@ partitioning_settings {
)"));
}

Y_UNIT_TEST(Checksums) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});

TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Table"
destination_prefix: ""
}
}
)", port));
env.TestWaitNotification(runtime, txId);

UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8);

const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256");
UNIT_ASSERT(dataChecksum);
UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv");

const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256");
UNIT_ASSERT(metadataChecksum);
UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "b72575244ae0cce8dffd45f3537d1e412bfe39de4268f4f85f529cb529870903 metadata.json");

const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256");
UNIT_ASSERT(schemeChecksum);
UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "cb1fb80965ae92e6369acda2b3b5921fd5518c97d6437f467ce00492907f9eb6 scheme.pb");

const auto* permissionsChecksum = s3Mock.GetData().FindPtr("/permissions.pb.sha256");
UNIT_ASSERT(permissionsChecksum);
UNIT_ASSERT_VALUES_EQUAL(*permissionsChecksum, "b41fd8921ff3a7314d9c702dc0e71aace6af8443e0102add0432895c5e50a326 permissions.pb");
}

Y_UNIT_TEST(ChecksumsWithCompression) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
Expand Down

0 comments on commit c8a6d44

Please sign in to comment.