Skip to content

Commit

Permalink
YQ-4052 fixed url escaping for s3 insert (ydb-platform#13891)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Jan 29, 2025
1 parent 03a54f8 commit eafe171
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 23 deletions.
82 changes: 82 additions & 0 deletions ydb/core/external_sources/s3/ut/s3_aws_credentials_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,88 @@ Y_UNIT_TEST_SUITE(S3AwsCredentials) {
}

}

// Verifies that an S3 object path made of special characters can be written
// with INSERT through an external data source and then read back with SELECT.
Y_UNIT_TEST(TestInsertEscaping) {
const TString externalDataSourceName = "/Root/external_data_source";
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
auto kikimr = MakeKikimrRunner(true, nullptr, nullptr, std::nullopt, s3ActorsFactory);

auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
// Scheme query: creates the two secrets used for AWS auth and an ObjectStorage
// external data source located at localhost:<port>/datalake/, where <port> is
// whatever GetExternalPort("minio", "9000") returns.
const TString query = fmt::format(R"(
CREATE OBJECT id (TYPE SECRET) WITH (value=`minio`);
CREATE OBJECT key (TYPE SECRET) WITH (value=`minio123`);
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{location}",
AUTH_METHOD="AWS",
AWS_ACCESS_KEY_ID_SECRET_NAME="id",
AWS_SECRET_ACCESS_KEY_SECRET_NAME="key",
AWS_REGION="ru-central-1"
);
)",
"external_source"_a = externalDataSourceName,
"location"_a = "localhost:" + GetExternalPort("minio", "9000") + "/datalake/"
);

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

WaitBucket(kikimr, externalDataSourceName);

auto db = kikimr->GetQueryClient();

// Destination path: "exp_folder/some_" followed by every character from ' ' to '~'
// except '*', '?', '{', '}' and '`' (passed through EscapeC), then a backslash and a backtick.
// NOTE(review): the skipped characters are presumably the ones with pattern/quoting
// meaning in the query text - confirm.
TString path = TStringBuilder() << "exp_folder/some_" << EscapeC(GetSymbolsString(' ', '~', "*?{}`")) << "\\`";

// Step 1: copy the rows read from `/a/` (json_each_row) into the special-character path as csv_with_names.
{
// NB: AtomicUploadCommit = "false" because in minio ListMultipartUploads by prefix is not supported
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
PRAGMA s3.AtomicUploadCommit = "false";
INSERT INTO `{external_source}`.`{path}/` WITH (FORMAT = "csv_with_names")
SELECT * FROM `{external_source}`.`/a/` WITH (
format="json_each_row",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
)
)", "external_source"_a = externalDataSourceName, "path"_a = path)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(!scriptExecutionOperation.Metadata().ExecutionId.empty());

// Block until the script finishes and require that it completed.
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
}

// Step 2: read the same path back and check the rows that were written.
{
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
SELECT * FROM `{external_source}`.`{path}/` WITH (
format="csv_with_names",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
)
)", "external_source"_a = externalDataSourceName, "path"_a = path)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(!scriptExecutionOperation.Metadata().ExecutionId.empty());

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
// Fetch result set #0 of the finished script.
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());

// Expect exactly two (key, value) rows: ("1", "trololo") and ("2", "hello world").
// NOTE(review): these values presumably match the objects pre-loaded under `/a/` in the test bucket - confirm.
TResultSetParser resultSet(results.ExtractResultSet());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);
UNIT_ASSERT(resultSet.TryNextRow());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
UNIT_ASSERT(resultSet.TryNextRow());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
}
}
}

} // namespace NKikimr::NKqp
10 changes: 10 additions & 0 deletions ydb/core/kqp/ut/federated_query/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr::NKqp::NFederatedQueryTest {
// Builds a string of every character in the inclusive range [start, end], in
// ascending order, omitting any character that occurs in `skip`.
// Returns an empty string when start > end.
TString GetSymbolsString(char start, char end, const TString& skip) {
    TStringBuilder result;
    // Iterate over an int rather than a char: with a char counter, `++symbol` wraps
    // around when `end` is the largest char value, so `symbol <= end` never becomes
    // false and the loop does not terminate.
    for (int code = start; code <= end; ++code) {
        const char symbol = static_cast<char>(code);
        if (skip.Contains(symbol)) {
            continue;
        }
        result << symbol;
    }
    return result;
}

NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) {
NYdb::NOperation::TOperationClient client(ydbDriver);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/ut/federated_query/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
namespace NKikimr::NKqp::NFederatedQueryTest {
using namespace NKikimr::NKqp;

// Returns the characters from `start` to `end` (inclusive, ascending) as a string,
// leaving out every character contained in `skip` (nothing is skipped by default).
TString GetSymbolsString(char start, char end, const TString& skip = "");

NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(
const NYdb::TOperation::TOperationId& operationId,
const NYdb::TDriver& ydbDriver);
Expand Down
11 changes: 0 additions & 11 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,6 @@ using namespace NTestUtils;
using namespace fmt::literals;

Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
// Builds a string of every character in the inclusive range [start, end], in
// ascending order, omitting any character that occurs in `skip`.
// Returns an empty string when start > end.
TString GetSymbolsString(char start, char end, const TString& skip = "") {
    TStringBuilder result;
    // Iterate over an int rather than a char: with a char counter, `++symbol` wraps
    // around when `end` is the largest char value, so `symbol <= end` never becomes
    // false and the loop does not terminate.
    for (int code = start; code <= end; ++code) {
        const char symbol = static_cast<char>(code);
        if (skip.Contains(symbol)) {
            continue;
        }
        result << symbol;
    }
    return result;
}

Y_UNIT_TEST(ExecuteScriptWithExternalTableResolve) {
const TString externalDataSourceName = "/Root/external_data_source";
const TString externalTableName = "/Root/test_binding_resolve";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ TString TAwsSignature::HashSHA256(TStringBuf data) {
return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH));
}

TString TAwsSignature::UriEncode(const TStringBuf input, bool encodeSlash) {
TString TAwsSignature::UriEncode(const TStringBuf input, bool encodeSlash, bool encodePercent) {
TStringStream result;
for (const char ch : input) {
if ((ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9') || ch == '_' ||
ch == '-' || ch == '~' || ch == '.') {
ch == '-' || ch == '~' || ch == '.' || (ch == '%' && !encodePercent)) {
result << ch;
} else if (ch == '/') {
if (encodeSlash) {
Expand Down Expand Up @@ -175,11 +175,10 @@ void TAwsSignature::PrepareCgiParameters() {

auto printSingleParam = [&canonicalCgi](const TString& key, const TVector<TString>& values) {
auto it = values.begin();
canonicalCgi << UriEncode(key, true) << "=" << UriEncode(*it, true);
canonicalCgi << UriEncode(key, true, true) << "=" << UriEncode(*it, true, true);
while (++it != values.end()) {
canonicalCgi << "&" << UriEncode(key, true) << "=" << UriEncode(*it, true);
canonicalCgi << "&" << UriEncode(key, true, true) << "=" << UriEncode(*it, true, true);
}

};

auto it = sortedCgi.begin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct TAwsSignature {

static TString HashSHA256(TStringBuf data);

static TString UriEncode(const TStringBuf input, bool encodeSlash = false);
static TString UriEncode(const TStringBuf input, bool encodeSlash = false, bool encodePercent = false);

void PrepareCgiParameters();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <util/network/address.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/string_utils/quote/quote.h>

namespace NYql {

Expand Down Expand Up @@ -68,5 +69,15 @@ Y_UNIT_TEST_SUITE(TAwsSignature) {
UNIT_ASSERT_VALUES_EQUAL(signature1.GetAmzDate(), signature2.GetAmzDate());
UNIT_ASSERT_VALUES_EQUAL(signature1.GetAuthorization(), signature2.GetAuthorization());
}

// Signs a GET request whose URL path contains spaces, quotes, '%' and other
// punctuation (force-escaped with UrlEscapeRet) and pins the resulting headers.
Y_UNIT_TEST(SignWithEscaping) {
    const TStringBuf rawUrl = "http://os.com/my-bucket/ !\"#$%&'()+,-./0123456789:;<=>@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz|~/";
    // A fixed timestamp makes the date-dependent Authorization header reproducible.
    auto signingTime = TInstant::FromValue(30);

    NYql::TAwsSignature signature(
        "GET",
        UrlEscapeRet(rawUrl, true),
        "application/json",
        {},
        "key",
        "pwd",
        signingTime);

    UNIT_ASSERT_VALUES_EQUAL(signature.GetContentType(), "application/json");
    UNIT_ASSERT_VALUES_EQUAL(signature.GetXAmzContentSha256(), "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855");
    UNIT_ASSERT_VALUES_EQUAL(signature.GetAuthorization(), "AWS4-HMAC-SHA256 Credential=/19700101///aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date, Signature=21470c8f999941fdc785c508f0c55afa1a12735eddd868aa7276e532d687c436");
    UNIT_ASSERT_VALUES_UNEQUAL(signature.GetAmzDate(), "");
}
} // Y_UNIT_TEST_SUITE(TAwsSignature)

} // namespace NYql
10 changes: 5 additions & 5 deletions ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct TCompleteMultipartUpload {
}

TString BuildUrl() const {
TUrlBuilder urlBuilder(Url);
NS3Util::TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
urlBuilder.AddUrlParam("uploadId", UploadId);
return urlBuilder.Build();
}
Expand Down Expand Up @@ -87,7 +87,7 @@ struct TListMultipartUploads {
// This requirement will be fixed in the curl library
// https://github.com/curl/curl/commit/fc76a24c53b08cdf6eec8ba787d8eac64651d56e
// https://github.com/curl/curl/commit/c87920353883ef9d5aa952e724a8e2589d76add5
TUrlBuilder urlBuilder(Url);
NS3Util::TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
if (KeyMarker) {
urlBuilder.AddUrlParam("key-marker", KeyMarker);
}
Expand All @@ -114,7 +114,7 @@ struct TAbortMultipartUpload {
}

TString BuildUrl() const {
TUrlBuilder urlBuilder(Url);
NS3Util::TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
urlBuilder.AddUrlParam("uploadId", UploadId);
return urlBuilder.Build();
}
Expand All @@ -141,7 +141,7 @@ struct TListParts {
// This requirement will be fixed in the curl library
// https://github.com/curl/curl/commit/fc76a24c53b08cdf6eec8ba787d8eac64651d56e
// https://github.com/curl/curl/commit/c87920353883ef9d5aa952e724a8e2589d76add5
TUrlBuilder urlBuilder(Url);
NS3Util::TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
if (PartNumberMarker) {
urlBuilder.AddUrlParam("part-number-marker", PartNumberMarker);
}
Expand Down Expand Up @@ -682,4 +682,4 @@ THolder<NActors::IActor> MakeS3ApplicatorActor(
);
}

} // namespace NYql::NDq
} // namespace NYql::NDq
30 changes: 30 additions & 0 deletions ydb/library/yql/providers/s3/common/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,34 @@ bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExpr
return true;
}

// Stores a copy of `uri` as the base of the URL; Build() emits it verbatim
// (no escaping is applied to it here).
TUrlBuilder::TUrlBuilder(const TString& uri)
: MainUri(uri)
{}

// Appends a (name, value) query parameter to the pending list and returns
// this builder so that calls can be chained. Parameters keep insertion order.
TUrlBuilder& TUrlBuilder::AddUrlParam(const TString& name, const TString& value) {
    Params.push_back(TParam{name, value});
    return *this;
}

// Renders the final URL: the base URI, then '?' and the parameters joined by '&'.
// Parameter names are written as-is; non-empty values are passed through
// Quote(value, "") and written after '='. A parameter with an empty value is
// written as a bare name.
TString TUrlBuilder::Build() const {
    // Without parameters the base URI is returned untouched (no trailing '?').
    if (Params.empty()) {
        return MainUri;
    }

    TStringBuilder url;
    url << MainUri << "?";

    bool isFirst = true;
    for (const auto& param : Params) {
        if (!isFirst) {
            url << "&";
        }
        isFirst = false;

        url << param.Name;
        if (!param.Value.empty()) {
            // Quote() works in place, so encode a copy and leave the stored value intact.
            TString encoded = param.Value;
            Quote(encoded, "");
            url << "=" << encoded;
        }
    }

    // The local is a TStringBuilder while the return type is TString; the explicit
    // move is kept from the original so the buffer is moved rather than copied.
    return std::move(url);
}

}
18 changes: 18 additions & 0 deletions ydb/library/yql/providers/s3/common/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,22 @@ TString UrlEscapeRet(const TStringBuf from);

bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExprContext& ctx);

// Composes a URL from a base URI and an ordered list of query parameters.
class TUrlBuilder {
// One query parameter; Build() writes an empty Value as a bare name without '='.
struct TParam {
TString Name;
TString Value;
};

public:
// `uri` becomes the part of the URL before '?'; Build() emits it unchanged.
explicit TUrlBuilder(const TString& uri);

// Appends a parameter and returns *this to allow chaining.
TUrlBuilder& AddUrlParam(const TString& name, const TString& value = "");

// Returns the base URI alone when no parameters were added; otherwise
// "<uri>?name=value&...", with values encoded via Quote(value, "").
TString Build() const;

private:
// Parameters in the order they were added.
std::vector<TParam> Params;
// Base URI passed to the constructor.
TString MainUri;
};

}
33 changes: 33 additions & 0 deletions ydb/library/yql/providers/s3/common/util_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,37 @@ Y_UNIT_TEST_SUITE(TestS3UrlEscape) {
}
}

// Unit tests for TUrlBuilder: base URI passthrough, parameter joining and value encoding.
Y_UNIT_TEST_SUITE(TestUrlBuilder) {
    // With no parameters the base URI comes back unchanged.
    Y_UNIT_TEST(UriOnly) {
        const TUrlBuilder bareBuilder("https://localhost/abc");
        const TString built = bareBuilder.Build();
        UNIT_ASSERT_VALUES_EQUAL(built, "https://localhost/abc");
    }

    // Plain parameters are appended as name=value pairs joined by '&', in insertion order.
    Y_UNIT_TEST(Basic) {
        const TString built = TUrlBuilder("https://localhost/abc")
            .AddUrlParam("param1", "val1")
            .AddUrlParam("param2", "val2")
            .Build();

        UNIT_ASSERT_VALUES_EQUAL(built, "https://localhost/abc?param1=val1&param2=val2");
    }

    // Punctuation in a value is percent-encoded and the space becomes '+'.
    Y_UNIT_TEST(BasicWithEncoding) {
        TUrlBuilder withSpecials("https://localhost/abc");
        withSpecials.AddUrlParam("param1", "=!@#$%^&*(){}[]\" ");
        withSpecials.AddUrlParam("param2", "val2");

        UNIT_ASSERT_VALUES_EQUAL(withSpecials.Build(), "https://localhost/abc?param1=%3D%21%40%23%24%25%5E%26%2A%28%29%7B%7D%5B%5D%22+&param2=val2");
    }

    // The characters ":/?#[]@!$&'()*+,;=" are all percent-encoded in a value.
    Y_UNIT_TEST(BasicWithAdditionalEncoding) {
        TUrlBuilder withReserved("https://localhost/abc");
        withReserved.AddUrlParam("param1", ":/?#[]@!$&\'()*+,;=");
        withReserved.AddUrlParam("param2", "val2");

        UNIT_ASSERT_VALUES_EQUAL(withReserved.Build(), "https://localhost/abc?param1=%3A%2F%3F%23%5B%5D%40%21%24%26%27%28%29%2A%2B%2C%3B%3D&param2=val2");
    }
}

} // namespace NYql::NS3Util
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ class TS3Lister : public IS3Lister {
// This requirement will be fixed in the curl library
// https://github.com/curl/curl/commit/fc76a24c53b08cdf6eec8ba787d8eac64651d56e
// https://github.com/curl/curl/commit/c87920353883ef9d5aa952e724a8e2589d76add5
TUrlBuilder urlBuilder(ctx.ListingRequest.Url);
NS3Util::TUrlBuilder urlBuilder(ctx.ListingRequest.Url);
if (ctx.ContinuationToken.Defined()) {
urlBuilder.AddUrlParam("continuation-token", *ctx.ContinuationToken);
}
Expand Down

0 comments on commit eafe171

Please sign in to comment.