Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-4052 fixed url escaping for s3 insert #13891

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions ydb/core/external_sources/s3/ut/common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#include "common.h"

#include <ydb/core/kqp/ut/federated_query/common/common.h>
#include <ydb-cpp-sdk/client/draft/ydb_scripting.h>
#include <ydb-cpp-sdk/client/operation/operation.h>
#include <ydb-cpp-sdk/client/proto/accessor.h>
#include <ydb-cpp-sdk/client/table/table.h>
#include <ydb-cpp-sdk/client/types/operation/operation.h>

#include <fmt/format.h>

namespace NKikimr::NKqp {

using namespace NYdb;
using namespace NYdb::NQuery;
using namespace NKikimr::NKqp::NFederatedQueryTest;
using namespace fmt::literals;

TString Exec(const TString& cmd) {
std::array<char, 128> buffer;
TString result;
std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(cmd.c_str(), "r"), pclose);
if (!pipe) {
throw std::runtime_error("popen() failed!");
}
while (fgets(buffer.data(), static_cast<int>(buffer.size()), pipe.get()) != nullptr) {
result += buffer.data();
}
return result;
}

TString GetExternalPort(const TString& service, const TString& port) {
auto dockerComposeBin = BinaryPath("library/recipes/docker_compose/bin/docker-compose");
auto composeFileYml = ArcadiaFromCurrentLocation(__SOURCE_FILE__, "docker-compose.yml");
auto result = StringSplitter(Exec(dockerComposeBin + " -f " + composeFileYml + " port " + service + " " + port)).Split(':').ToList<TString>();
return result ? Strip(result.back()) : TString{};
}

void WaitBucket(std::shared_ptr<TKikimrRunner> kikimr, const TString& externalDataSourceName) {
auto db = kikimr->GetQueryClient();
for (size_t i = 0; i < 100; i++) {
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
SELECT * FROM `{external_source}`.`/a/` WITH (
format="json_each_row",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
)
)", "external_source"_a = externalDataSourceName)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(!scriptExecutionOperation.Metadata().ExecutionId.empty());

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
if (readyOp.Metadata().ExecStatus == EExecStatus::Completed) {
return;
}
Sleep(TDuration::Seconds(1));
}
UNIT_FAIL("Bucket isn't ready");
}

} // namespace NKikimr::NKqp
13 changes: 13 additions & 0 deletions ydb/core/external_sources/s3/ut/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once

#include <ydb/core/kqp/ut/common/kqp_ut_common.h>

namespace NKikimr::NKqp {

TString Exec(const TString& cmd);

TString GetExternalPort(const TString& service, const TString& port);

void WaitBucket(std::shared_ptr<TKikimrRunner> kikimr, const TString& externalDataSourceName);

} // namespace NKikimr::NKqp
46 changes: 2 additions & 44 deletions ydb/core/external_sources/s3/ut/s3_aws_credentials_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "common.h"

#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/ut/federated_query/common/common.h>
Expand All @@ -24,50 +26,6 @@ using namespace NYdb::NQuery;
using namespace NKikimr::NKqp::NFederatedQueryTest;
using namespace fmt::literals;

TString Exec(const TString& cmd) {
std::array<char, 128> buffer;
TString result;
std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(cmd.c_str(), "r"), pclose);
if (!pipe) {
throw std::runtime_error("popen() failed!");
}
while (fgets(buffer.data(), static_cast<int>(buffer.size()), pipe.get()) != nullptr) {
result += buffer.data();
}
return result;
}

TString GetExternalPort(const TString& service, const TString& port) {
auto dockerComposeBin = BinaryPath("library/recipes/docker_compose/bin/docker-compose");
auto composeFileYml = ArcadiaFromCurrentLocation(__SOURCE_FILE__, "docker-compose.yml");
auto result = StringSplitter(Exec(dockerComposeBin + " -f " + composeFileYml + " port " + service + " " + port)).Split(':').ToList<TString>();
return result ? Strip(result.back()) : TString{};
}

void WaitBucket(std::shared_ptr<TKikimrRunner> kikimr, const TString& externalDataSourceName) {
auto db = kikimr->GetQueryClient();
for (size_t i = 0; i < 100; i++) {
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
SELECT * FROM `{external_source}`.`/a/` WITH (
format="json_each_row",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
)
)", "external_source"_a = externalDataSourceName)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(!scriptExecutionOperation.Metadata().ExecutionId.empty());

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
if (readyOp.Metadata().ExecStatus == EExecStatus::Completed) {
return;
}
Sleep(TDuration::Seconds(1));
}
UNIT_FAIL("Bucket isn't ready");
}

Y_UNIT_TEST_SUITE(S3AwsCredentials) {
Y_UNIT_TEST(ExecuteScriptWithEqSymbol) {
const TString externalDataSourceName = "/Root/external_data_source";
Expand Down
113 changes: 113 additions & 0 deletions ydb/core/external_sources/s3/ut/s3_insert_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#include "common.h"

#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/ut/federated_query/common/common.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
#include <yql/essentials/utils/log/log.h>
#include <ydb-cpp-sdk/client/draft/ydb_scripting.h>
#include <ydb-cpp-sdk/client/operation/operation.h>
#include <ydb-cpp-sdk/client/proto/accessor.h>
#include <ydb-cpp-sdk/client/table/table.h>
#include <ydb-cpp-sdk/client/types/operation/operation.h>

#include <library/cpp/testing/unittest/registar.h>

#include <util/generic/strbuf.h>
#include <util/generic/string.h>
#include <util/system/env.h>

#include <fmt/format.h>

namespace NKikimr::NKqp {

using namespace NYdb;
using namespace NYdb::NQuery;
using namespace NKikimr::NKqp::NFederatedQueryTest;
using namespace fmt::literals;

Y_UNIT_TEST_SUITE(S3Inset) {
GrigoriyPA marked this conversation as resolved.
Show resolved Hide resolved
Y_UNIT_TEST(TestInsertEscaping) {
const TString externalDataSourceName = "/Root/external_data_source";
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
auto kikimr = MakeKikimrRunner(true, nullptr, nullptr, std::nullopt, s3ActorsFactory);

auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE OBJECT id (TYPE SECRET) WITH (value=`minio`);
CREATE OBJECT key (TYPE SECRET) WITH (value=`minio123`);
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{location}",
AUTH_METHOD="AWS",
AWS_ACCESS_KEY_ID_SECRET_NAME="id",
AWS_SECRET_ACCESS_KEY_SECRET_NAME="key",
AWS_REGION="ru-central-1"
);
)",
"external_source"_a = externalDataSourceName,
"location"_a = "localhost:" + GetExternalPort("minio", "9000") + "/datalake/"
);

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

WaitBucket(kikimr, externalDataSourceName);

auto db = kikimr->GetQueryClient();

// TODO: remove ';' from skip list
TString path = TStringBuilder() << "exp_folder/some_" << EscapeC(GetSymbolsString(' ', '~', "*?{}`;")) << "\\`";

{
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
PRAGMA s3.AtomicUploadCommit = "true";
INSERT INTO `{external_source}`.`{path}/` WITH (FORMAT = "csv_with_names")
SELECT * FROM `{external_source}`.`/a/` WITH (
format="json_each_row",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
)
)", "external_source"_a = externalDataSourceName, "path"_a = path)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(!scriptExecutionOperation.Metadata().ExecutionId.empty());

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
}

{
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
SELECT * FROM `{external_source}`.`{path}/` WITH (
format="csv_with_names",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
)
)", "external_source"_a = externalDataSourceName, "path"_a = path)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(!scriptExecutionOperation.Metadata().ExecutionId.empty());

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());

TResultSetParser resultSet(results.ExtractResultSet());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);
UNIT_ASSERT(resultSet.TryNextRow());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
UNIT_ASSERT(resultSet.TryNextRow());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
}
}
}

} // namespace NKikimr::NKqp
2 changes: 2 additions & 0 deletions ydb/core/external_sources/s3/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ IF (OPENSOURCE)
ENDIF()

SRCS(
common.cpp
s3_aws_credentials_ut.cpp
s3_insert_ut.cpp
)

PEERDIR(
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/kqp/ut/federated_query/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr::NKqp::NFederatedQueryTest {
TString GetSymbolsString(char start, char end, const TString& skip) {
TStringBuilder result;
for (char symbol = start; symbol <= end; ++symbol) {
if (skip.Contains(symbol)) {
continue;
}
result << symbol;
}
return result;
}

NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) {
NYdb::NOperation::TOperationClient client(ydbDriver);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/ut/federated_query/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
namespace NKikimr::NKqp::NFederatedQueryTest {
using namespace NKikimr::NKqp;

TString GetSymbolsString(char start, char end, const TString& skip = "");

NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(
const NYdb::TOperation::TOperationId& operationId,
const NYdb::TDriver& ydbDriver);
Expand Down
11 changes: 0 additions & 11 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,6 @@ using namespace NTestUtils;
using namespace fmt::literals;

Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
TString GetSymbolsString(char start, char end, const TString& skip = "") {
TStringBuilder result;
for (char symbol = start; symbol <= end; ++symbol) {
if (skip.Contains(symbol)) {
continue;
}
result << symbol;
}
return result;
}

Y_UNIT_TEST(ExecuteScriptWithExternalTableResolve) {
const TString externalDataSourceName = "/Root/external_data_source";
const TString externalTableName = "/Root/test_binding_resolve";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,12 @@ TString TAwsSignature::HashSHA256(TStringBuf data) {
return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH));
}

TString TAwsSignature::UriEncode(const TStringBuf input, bool encodeSlash) {
TString TAwsSignature::UriEncode(const TStringBuf input, bool encodeSlash, bool encodePercent) {
TStringStream result;
for (const char ch : input) {
if ((ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9') || ch == '_' ||
ch == '-' || ch == '~' || ch == '.') {
ch == '-' || ch == '~' || ch == '.' || (ch == '/' && !encodeSlash) || (ch == '%' && !encodePercent)) {
result << ch;
} else if (ch == '/') {
GrigoriyPA marked this conversation as resolved.
Show resolved Hide resolved
if (encodeSlash) {
result << "%2F";
} else {
result << ch;
}
} else {
result << "%" << HexEncode(&ch, 1);
}
Expand All @@ -175,11 +169,10 @@ void TAwsSignature::PrepareCgiParameters() {

auto printSingleParam = [&canonicalCgi](const TString& key, const TVector<TString>& values) {
auto it = values.begin();
canonicalCgi << UriEncode(key, true) << "=" << UriEncode(*it, true);
canonicalCgi << UriEncode(key, true, true) << "=" << UriEncode(*it, true, true);
while (++it != values.end()) {
canonicalCgi << "&" << UriEncode(key, true) << "=" << UriEncode(*it, true);
canonicalCgi << "&" << UriEncode(key, true, true) << "=" << UriEncode(*it, true, true);
}

};

auto it = sortedCgi.begin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct TAwsSignature {

static TString HashSHA256(TStringBuf data);

static TString UriEncode(const TStringBuf input, bool encodeSlash = false);
static TString UriEncode(const TStringBuf input, bool encodeSlash = false, bool encodePercent = false);

void PrepareCgiParameters();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct TCompleteMultipartUpload {
}

TString BuildUrl() const {
TUrlBuilder urlBuilder(Url);
TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
urlBuilder.AddUrlParam("uploadId", UploadId);
return urlBuilder.Build();
}
Expand Down Expand Up @@ -87,7 +87,7 @@ struct TListMultipartUploads {
// This requirement will be fixed in the curl library
// https://github.com/curl/curl/commit/fc76a24c53b08cdf6eec8ba787d8eac64651d56e
// https://github.com/curl/curl/commit/c87920353883ef9d5aa952e724a8e2589d76add5
TUrlBuilder urlBuilder(Url);
TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
if (KeyMarker) {
urlBuilder.AddUrlParam("key-marker", KeyMarker);
}
Expand All @@ -114,7 +114,7 @@ struct TAbortMultipartUpload {
}

TString BuildUrl() const {
TUrlBuilder urlBuilder(Url);
TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
urlBuilder.AddUrlParam("uploadId", UploadId);
return urlBuilder.Build();
}
Expand All @@ -141,7 +141,7 @@ struct TListParts {
// This requirement will be fixed in the curl library
// https://github.com/curl/curl/commit/fc76a24c53b08cdf6eec8ba787d8eac64651d56e
// https://github.com/curl/curl/commit/c87920353883ef9d5aa952e724a8e2589d76add5
TUrlBuilder urlBuilder(Url);
TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
if (PartNumberMarker) {
urlBuilder.AddUrlParam("part-number-marker", PartNumberMarker);
}
Expand Down Expand Up @@ -682,4 +682,4 @@ THolder<NActors::IActor> MakeS3ApplicatorActor(
);
}

} // namespace NYql::NDq
} // namespace NYql::NDq
Loading