Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-4052 fixed url escaping for s3 insert #13891

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions ydb/core/external_sources/s3/ut/s3_aws_credentials_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,88 @@ Y_UNIT_TEST_SUITE(S3AwsCredentials) {
}

}

Y_UNIT_TEST(TestInsertEscaping) {
const TString externalDataSourceName = "/Root/external_data_source";
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
auto kikimr = MakeKikimrRunner(true, nullptr, nullptr, std::nullopt, s3ActorsFactory);

auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE OBJECT id (TYPE SECRET) WITH (value=`minio`);
CREATE OBJECT key (TYPE SECRET) WITH (value=`minio123`);
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{location}",
AUTH_METHOD="AWS",
AWS_ACCESS_KEY_ID_SECRET_NAME="id",
AWS_SECRET_ACCESS_KEY_SECRET_NAME="key",
AWS_REGION="ru-central-1"
);
)",
"external_source"_a = externalDataSourceName,
"location"_a = "localhost:" + GetExternalPort("minio", "9000") + "/datalake/"
);

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

WaitBucket(kikimr, externalDataSourceName);

auto db = kikimr->GetQueryClient();

TString path = TStringBuilder() << "exp_folder/some_" << EscapeC(GetSymbolsString(' ', '~', "*?{}`")) << "\\`";

{
// NB: AtomicUploadCommit = "false" because in minio ListMultipartUploads by prefix is not supported
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
PRAGMA s3.AtomicUploadCommit = "false";
INSERT INTO `{external_source}`.`{path}/` WITH (FORMAT = "csv_with_names")
SELECT * FROM `{external_source}`.`/a/` WITH (
format="json_each_row",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
)
)", "external_source"_a = externalDataSourceName, "path"_a = path)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(!scriptExecutionOperation.Metadata().ExecutionId.empty());

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
}

{
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
SELECT * FROM `{external_source}`.`{path}/` WITH (
format="csv_with_names",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
)
)", "external_source"_a = externalDataSourceName, "path"_a = path)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(!scriptExecutionOperation.Metadata().ExecutionId.empty());

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());

TResultSetParser resultSet(results.ExtractResultSet());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);
UNIT_ASSERT(resultSet.TryNextRow());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
UNIT_ASSERT(resultSet.TryNextRow());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
}
}
}

} // namespace NKikimr::NKqp
10 changes: 10 additions & 0 deletions ydb/core/kqp/ut/federated_query/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr::NKqp::NFederatedQueryTest {
TString GetSymbolsString(char start, char end, const TString& skip) {
TStringBuilder result;
for (char symbol = start; symbol <= end; ++symbol) {
if (skip.Contains(symbol)) {
continue;
}
result << symbol;
}
return result;
}

NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) {
NYdb::NOperation::TOperationClient client(ydbDriver);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/ut/federated_query/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
namespace NKikimr::NKqp::NFederatedQueryTest {
using namespace NKikimr::NKqp;

TString GetSymbolsString(char start, char end, const TString& skip = "");

NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(
const NYdb::TOperation::TOperationId& operationId,
const NYdb::TDriver& ydbDriver);
Expand Down
11 changes: 0 additions & 11 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,6 @@ using namespace NTestUtils;
using namespace fmt::literals;

Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
TString GetSymbolsString(char start, char end, const TString& skip = "") {
TStringBuilder result;
for (char symbol = start; symbol <= end; ++symbol) {
if (skip.Contains(symbol)) {
continue;
}
result << symbol;
}
return result;
}

Y_UNIT_TEST(ExecuteScriptWithExternalTableResolve) {
const TString externalDataSourceName = "/Root/external_data_source";
const TString externalTableName = "/Root/test_binding_resolve";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ TString TAwsSignature::HashSHA256(TStringBuf data) {
return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH));
}

TString TAwsSignature::UriEncode(const TStringBuf input, bool encodeSlash) {
TString TAwsSignature::UriEncode(const TStringBuf input, bool encodeSlash, bool encodePercent) {
TStringStream result;
for (const char ch : input) {
if ((ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9') || ch == '_' ||
ch == '-' || ch == '~' || ch == '.') {
ch == '-' || ch == '~' || ch == '.' || (ch == '%' && !encodePercent)) {
result << ch;
} else if (ch == '/') {
GrigoriyPA marked this conversation as resolved.
Show resolved Hide resolved
if (encodeSlash) {
Expand Down Expand Up @@ -175,11 +175,10 @@ void TAwsSignature::PrepareCgiParameters() {

auto printSingleParam = [&canonicalCgi](const TString& key, const TVector<TString>& values) {
auto it = values.begin();
canonicalCgi << UriEncode(key, true) << "=" << UriEncode(*it, true);
canonicalCgi << UriEncode(key, true, true) << "=" << UriEncode(*it, true, true);
while (++it != values.end()) {
canonicalCgi << "&" << UriEncode(key, true) << "=" << UriEncode(*it, true);
canonicalCgi << "&" << UriEncode(key, true, true) << "=" << UriEncode(*it, true, true);
}

};

auto it = sortedCgi.begin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct TAwsSignature {

static TString HashSHA256(TStringBuf data);

static TString UriEncode(const TStringBuf input, bool encodeSlash = false);
static TString UriEncode(const TStringBuf input, bool encodeSlash = false, bool encodePercent = false);

void PrepareCgiParameters();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <util/network/address.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/string_utils/quote/quote.h>

namespace NYql {

Expand Down Expand Up @@ -68,5 +69,15 @@ Y_UNIT_TEST_SUITE(TAwsSignature) {
UNIT_ASSERT_VALUES_EQUAL(signature1.GetAmzDate(), signature2.GetAmzDate());
UNIT_ASSERT_VALUES_EQUAL(signature1.GetAuthorization(), signature2.GetAuthorization());
}

Y_UNIT_TEST(SignWithEscaping) {
auto time = TInstant::FromValue(30);
NYql::TAwsSignature signature("GET", UrlEscapeRet("http://os.com/my-bucket/ !\"#$%&'()+,-./0123456789:;<=>@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz|~/", true), "application/json", {}, "key", "pwd", time);
UNIT_ASSERT_VALUES_EQUAL(signature.GetContentType(), "application/json");
UNIT_ASSERT_VALUES_EQUAL(signature.GetXAmzContentSha256(), "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855");
UNIT_ASSERT_VALUES_EQUAL(signature.GetAuthorization(), "AWS4-HMAC-SHA256 Credential=/19700101///aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date, Signature=21470c8f999941fdc785c508f0c55afa1a12735eddd868aa7276e532d687c436");
UNIT_ASSERT_VALUES_UNEQUAL(signature.GetAmzDate(), "");
}
} // Y_UNIT_TEST_SUITE(TAwsSignature)

} // namespace NYql
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct TCompleteMultipartUpload {
}

TString BuildUrl() const {
TUrlBuilder urlBuilder(Url);
NS3Util::TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
urlBuilder.AddUrlParam("uploadId", UploadId);
return urlBuilder.Build();
}
Expand Down Expand Up @@ -87,7 +87,7 @@ struct TListMultipartUploads {
// This requirement will be fixed in the curl library
// https://github.com/curl/curl/commit/fc76a24c53b08cdf6eec8ba787d8eac64651d56e
// https://github.com/curl/curl/commit/c87920353883ef9d5aa952e724a8e2589d76add5
TUrlBuilder urlBuilder(Url);
NS3Util::TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
if (KeyMarker) {
urlBuilder.AddUrlParam("key-marker", KeyMarker);
}
Expand All @@ -114,7 +114,7 @@ struct TAbortMultipartUpload {
}

TString BuildUrl() const {
TUrlBuilder urlBuilder(Url);
NS3Util::TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
urlBuilder.AddUrlParam("uploadId", UploadId);
return urlBuilder.Build();
}
Expand All @@ -141,7 +141,7 @@ struct TListParts {
// This requirement will be fixed in the curl library
// https://github.com/curl/curl/commit/fc76a24c53b08cdf6eec8ba787d8eac64651d56e
// https://github.com/curl/curl/commit/c87920353883ef9d5aa952e724a8e2589d76add5
TUrlBuilder urlBuilder(Url);
NS3Util::TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
if (PartNumberMarker) {
urlBuilder.AddUrlParam("part-number-marker", PartNumberMarker);
}
Expand Down Expand Up @@ -682,4 +682,4 @@ THolder<NActors::IActor> MakeS3ApplicatorActor(
);
}

} // namespace NYql::NDq
} // namespace NYql::NDq
30 changes: 30 additions & 0 deletions ydb/library/yql/providers/s3/common/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,34 @@ bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExpr
return true;
}

TUrlBuilder::TUrlBuilder(const TString& uri)
: MainUri(uri)
{}

TUrlBuilder& TUrlBuilder::AddUrlParam(const TString& name, const TString& value) {
Params.emplace_back(name, value);
return *this;
}

TString TUrlBuilder::Build() const {
if (Params.empty()) {
return MainUri;
}

TStringBuilder result;
result << MainUri << "?";

TStringBuf separator = ""sv;
for (const auto& p : Params) {
result << separator << p.Name;
if (auto value = p.Value) {
Quote(value, "");
result << "=" << value;
}
separator = "&"sv;
}

return std::move(result);
}

}
18 changes: 18 additions & 0 deletions ydb/library/yql/providers/s3/common/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,22 @@ TString UrlEscapeRet(const TStringBuf from);

bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExprContext& ctx);

class TUrlBuilder {
struct TParam {
TString Name;
TString Value;
};

public:
explicit TUrlBuilder(const TString& uri);

TUrlBuilder& AddUrlParam(const TString& name, const TString& value = "");

TString Build() const;

private:
std::vector<TParam> Params;
TString MainUri;
};

}
33 changes: 33 additions & 0 deletions ydb/library/yql/providers/s3/common/util_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,37 @@ Y_UNIT_TEST_SUITE(TestS3UrlEscape) {
}
}

Y_UNIT_TEST_SUITE(TestUrlBuilder) {
Y_UNIT_TEST(UriOnly) {
TUrlBuilder builder("https://localhost/abc");
UNIT_ASSERT_VALUES_EQUAL(builder.Build(), "https://localhost/abc");
}

Y_UNIT_TEST(Basic) {
TUrlBuilder builder("https://localhost/abc");
builder.AddUrlParam("param1", "val1");
builder.AddUrlParam("param2", "val2");

UNIT_ASSERT_VALUES_EQUAL(builder.Build(), "https://localhost/abc?param1=val1&param2=val2");
}

Y_UNIT_TEST(BasicWithEncoding) {
auto url = TUrlBuilder("https://localhost/abc")
.AddUrlParam("param1", "=!@#$%^&*(){}[]\" ")
.AddUrlParam("param2", "val2")
.Build();

UNIT_ASSERT_VALUES_EQUAL(url, "https://localhost/abc?param1=%3D%21%40%23%24%25%5E%26%2A%28%29%7B%7D%5B%5D%22+&param2=val2");
}

Y_UNIT_TEST(BasicWithAdditionalEncoding) {
auto url = TUrlBuilder("https://localhost/abc")
.AddUrlParam("param1", ":/?#[]@!$&\'()*+,;=")
.AddUrlParam("param2", "val2")
.Build();

UNIT_ASSERT_VALUES_EQUAL(url, "https://localhost/abc?param1=%3A%2F%3F%23%5B%5D%40%21%24%26%27%28%29%2A%2B%2C%3B%3D&param2=val2");
}
}

} // namespace NYql::NS3Util
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ class TS3Lister : public IS3Lister {
// This requirement will be fixed in the curl library
// https://github.com/curl/curl/commit/fc76a24c53b08cdf6eec8ba787d8eac64651d56e
// https://github.com/curl/curl/commit/c87920353883ef9d5aa952e724a8e2589d76add5
TUrlBuilder urlBuilder(ctx.ListingRequest.Url);
NS3Util::TUrlBuilder urlBuilder(ctx.ListingRequest.Url);
if (ctx.ContinuationToken.Defined()) {
urlBuilder.AddUrlParam("continuation-token", *ctx.ContinuationToken);
}
Expand Down
Loading