Skip to content

Commit

Permalink
YQ-4046 KqpRun improved templates (#13951)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jan 29, 2025
1 parent 97df0fb commit f490faf
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 34 deletions.
2 changes: 2 additions & 0 deletions ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ ActorSystemConfig {

ColumnShardConfig {
DisabledOnSchemeShard: false
WritingInFlightRequestBytesLimit: 104857600
}

FeatureFlags {
EnableExternalDataSources: true
EnableScriptExecutionOperations: true
EnableExternalSourceSchemaInference: true
EnableTempTables: true
EnableReplaceIfExistsForExternalEntities: true
}

KQPConfig {
Expand Down
70 changes: 53 additions & 17 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ struct TExecutionOptions {

TRequestOptions GetSchemeQueryOptions() const {
TString sql = SchemeQuery;
if (UseTemplates) {
ReplaceYqlTokenTemplate(sql);
}

return {
.Query = sql,
Expand All @@ -108,7 +105,6 @@ struct TExecutionOptions {

TString sql = ScriptQueries[index];
if (UseTemplates) {
ReplaceYqlTokenTemplate(sql);
SubstGlobal(sql, "${QUERY_ID}", ToString(queryId));
}

Expand Down Expand Up @@ -271,16 +267,6 @@ struct TExecutionOptions {
ythrow yexception() << "Cannot format storage without real PDisks, please use --storage-path";
}
}

private:
static void ReplaceYqlTokenTemplate(TString& sql) {
const TString variableName = TStringBuilder() << "${" << YQL_TOKEN_VARIABLE << "}";
if (const TString& yqlToken = GetEnv(YQL_TOKEN_VARIABLE)) {
SubstGlobal(sql, variableName, yqlToken);
} else if (sql.Contains(variableName)) {
ythrow yexception() << "Failed to replace ${YQL_TOKEN} template, please specify YQL_TOKEN environment variable\n";
}
}
};


Expand Down Expand Up @@ -446,6 +432,7 @@ class TMain : public TMainClassArgs {
TExecutionOptions ExecutionOptions;
TRunnerOptions RunnerOptions;

std::unordered_map<TString, TString> Templates;
THashMap<TString, TString> TablesMapping;
TVector<TString> UdfsPaths;
TString UdfsDirectory;
Expand Down Expand Up @@ -537,6 +524,31 @@ class TMain : public TMainClassArgs {
.NoArgument()
.SetFlag(&ExecutionOptions.UseTemplates);

// --var-template: registers one template variable for -s and -p queries.
// The argument is either "variable" (value is read from the environment
// variable with that name) or "variable@file" (value is the file content).
options.AddLongOption("var-template", "Add template from environment variables or file for -s and -p queries (use variable@file for files)")
    .RequiredArgument("variable")
    .Handler1([this](const NLastGetopt::TOptsParser* option) {
        TStringBuf name;
        TStringBuf path;
        TStringBuf(option->CurVal()).Split('@', name, path);
        if (name.empty()) {
            ythrow yexception() << "Variable name should not be empty";
        }

        TString value;
        if (path.empty()) {
            // No file part given: fall back to the environment, an empty value counts as missing.
            value = GetEnv(TString(name));
            if (value.empty()) {
                ythrow yexception() << "Invalid env template, can not find value for variable '" << name << "'";
            }
        } else {
            value = LoadFile(TString(path));
        }

        // Each variable may be registered only once.
        const bool inserted = Templates.emplace(name, value).second;
        if (!inserted) {
            ythrow yexception() << "Got duplicated template variable name '" << name << "'";
        }
    });

options.AddLongOption('t', "table", "File with input table (can be used by YT with -E flag), table@file")
.RequiredArgument("table@file")
.Handler1([this](const NLastGetopt::TOptsParser* option) {
Expand Down Expand Up @@ -845,6 +857,11 @@ class TMain : public TMainClassArgs {
.NoArgument()
.SetFlag(&EmulateYt);

// --health-check: how thoroughly the resources waiter actor checks the cluster before start:
//   0  - do not wait at all;
//   1  - wait until the expected node count is reported (default);
//   2+ - additionally wait until a test script request gets a SUCCESS response.
options.AddLongOption('H', "health-check", "Level of health check before start (max level 2)")
.RequiredArgument("uint")
.DefaultValue(1)
.StoreResult(&RunnerOptions.YdbSettings.HealthCheckLevel);

options.AddLongOption("domain", "Test cluster domain name")
.RequiredArgument("name")
.DefaultValue(RunnerOptions.YdbSettings.DomainName)
Expand All @@ -862,9 +879,8 @@ class TMain : public TMainClassArgs {
.RequiredArgument("path")
.InsertTo(&RunnerOptions.YdbSettings.ServerlessTenants);

options.AddLongOption("storage-size", "Domain storage size in gigabytes")
options.AddLongOption("storage-size", "Domain storage size in gigabytes (32 GiB by default)")
.RequiredArgument("uint")
.DefaultValue(32)
.StoreMappedResultT<ui32>(&RunnerOptions.YdbSettings.DiskSize, [](ui32 diskSize) {
return static_cast<ui64>(diskSize) << 30;
});
Expand Down Expand Up @@ -898,6 +914,11 @@ class TMain : public TMainClassArgs {
int DoRun(NLastGetopt::TOptsParseResult&&) override {
ExecutionOptions.Validate(RunnerOptions);

ReplaceTemplates(ExecutionOptions.SchemeQuery);
for (auto& sql : ExecutionOptions.ScriptQueries) {
ReplaceTemplates(sql);
}

RunnerOptions.YdbSettings.YqlToken = YqlToken;
RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get();

Expand Down Expand Up @@ -943,6 +964,21 @@ class TMain : public TMainClassArgs {

return 0;
}

private:
// Expands template placeholders of the form ${NAME} inside `sql` in place.
//
// First every variable registered in Templates is substituted. Then, only when
// ExecutionOptions.UseTemplates is set, ${YQL_TOKEN} is replaced by the value of
// the YQL_TOKEN_VARIABLE environment variable.
//
// Throws yexception if the token placeholder is present but the environment
// variable is empty or not set.
//
// NOTE(review): Templates is an unordered map and substitutions are applied one
// after another, so if a template value itself contains another placeholder the
// result depends on iteration order - confirm this is acceptable.
void ReplaceTemplates(TString& sql) const {
    for (const auto& entry : Templates) {
        const TString placeholder = TStringBuilder() << "${" << entry.first << "}";
        SubstGlobal(sql, placeholder, entry.second);
    }

    if (!ExecutionOptions.UseTemplates) {
        return;
    }

    const TString tokenPlaceholder = TStringBuilder() << "${" << YQL_TOKEN_VARIABLE << "}";
    const TString tokenValue = GetEnv(YQL_TOKEN_VARIABLE);
    if (!tokenValue.empty()) {
        SubstGlobal(sql, tokenPlaceholder, tokenValue);
        return;
    }

    // No token available: this is an error only if the query actually needs it.
    if (sql.Contains(tokenPlaceholder)) {
        ythrow yexception() << "Failed to replace ${YQL_TOKEN} template, please specify YQL_TOKEN environment variable";
    }
}
};


Expand Down Expand Up @@ -972,7 +1008,7 @@ void FloatingPointExceptionHandler(int) {

Cerr << colors.Red() << "======= floating point exception call stack ========" << colors.Default() << Endl;
FormatBackTrace(&Cerr);
Cerr << colors.Red() << "==============================================" << colors.Default() << Endl;
Cerr << colors.Red() << "====================================================" << colors.Default() << Endl;

abort();
}
Expand Down
81 changes: 69 additions & 12 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,40 +232,78 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10);

public:
TResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount)
: ExpectedNodeCount_(expectedNodeCount)
TResourcesWaiterActor(NThreading::TPromise<void> promise, const TWaitResourcesSettings& settings)
: Settings_(settings)
, Promise_(promise)
{}

void Bootstrap() {
Become(&TResourcesWaiterActor::StateFunc);
if (Settings_.HealthCheckLevel < 1) {
Finish();
return;
}

Become(&TResourcesWaiterActor::StateWaitNodeCont);
CheckResourcesPublish();
}

void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
void HandleWaitNodeCountWakeup() {
CheckResourcesPublish();
}

void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) {
if (ev->Get()->NodeCount == ExpectedNodeCount_) {
Promise_.SetValue();
PassAway();
const auto nodeCont = ev->Get()->NodeCount;
if (nodeCont == Settings_.ExpectedNodeCount) {
if (Settings_.HealthCheckLevel < 2) {
Finish();
} else {
Become(&TResourcesWaiterActor::StateWaitScript);
StartScriptQuery();
}
return;
}

if (Settings_.VerboseLevel >= 2) {
Cout << CoutColors_.Cyan() << "Retry invalid node count, got " << nodeCont << ", expected " << Settings_.ExpectedNodeCount << CoutColors_.Default() << Endl;
}
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
}

STRICT_STFUNC(StateFunc,
hFunc(NActors::TEvents::TEvWakeup, Handle);
// Wakeup handler of the StateWaitScript state: retries the test script request.
void HandleWaitScriptWakeup() {
StartScriptQuery();
}

// Handles the response to the test script request.
// A SUCCESS status completes the wait; any other status is reported
// (when VerboseLevel >= 2) and the request is retried after REFRESH_PERIOD.
void Handle(NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) {
    const auto& response = *ev->Get();
    if (response.Status != Ydb::StatusIds::SUCCESS) {
        if (Settings_.VerboseLevel >= 2) {
            Cout << CoutColors_.Cyan() << "Retry script creation fail with status " << response.Status << ", reason:\n" << CoutColors_.Default() << response.Issues.ToString() << Endl;
        }
        // The wakeup is delivered to HandleWaitScriptWakeup, which sends the request again.
        Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
        return;
    }

    Finish();
}

// State used while waiting for the expected node count:
// a wakeup re-checks resources, TEvResourcesInfo carries the reported node count.
STRICT_STFUNC(StateWaitNodeCont,
sFunc(NActors::TEvents::TEvWakeup, HandleWaitNodeCountWakeup);
hFunc(TEvPrivate::TEvResourcesInfo, Handle);
)

// State used while waiting for a successful test script request (health check level 2 and above):
// a wakeup re-sends the request, TEvScriptResponse carries its status.
STRICT_STFUNC(StateWaitScript,
sFunc(NActors::TEvents::TEvWakeup, HandleWaitScriptWakeup);
hFunc(NKikimr::NKqp::TEvKqp::TEvScriptResponse, Handle);
)

private:
void CheckResourcesPublish() {
GetResourceManager();

if (!ResourceManager_) {
if (Settings_.VerboseLevel >= 2) {
Cout << CoutColors_.Cyan() << "Retry uninitialized resource manager" << CoutColors_.Default() << Endl;
}
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
return;
}
Expand All @@ -287,8 +325,27 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
});
}

void StartScriptQuery() {
auto event = MakeHolder<NKikimr::NKqp::TEvKqp::TEvScriptRequest>();
event->Record.SetUserToken(NACLib::TUserToken("", BUILTIN_ACL_ROOT, {}).SerializeAsString());

auto request = event->Record.MutableRequest();
request->SetQuery("SELECT 42");
request->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT);
request->SetAction(NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE);
request->SetDatabase(Settings_.Database);

Send(NKikimr::NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
}

// Completes the wait: fulfils the promise passed to the constructor and stops the actor.
void Finish() {
Promise_.SetValue();
PassAway();
}

private:
const i32 ExpectedNodeCount_;
const TWaitResourcesSettings Settings_;
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
NThreading::TPromise<void> Promise_;

std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
Expand Down Expand Up @@ -415,8 +472,8 @@ NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settin
return new TAsyncQueryRunnerActor(settings);
}

NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {
return new TResourcesWaiterActor(promise, expectedNodeCount);
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, const TWaitResourcesSettings& settings) {
return new TResourcesWaiterActor(promise, settings);
}

NActors::IActor* CreateSessionHolderActor(TCreateSessionRequest request, NThreading::TPromise<TString> openPromise, NThreading::TPromise<void> closePromise) {
Expand Down
9 changes: 8 additions & 1 deletion ydb/tests/tools/kqprun/src/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ struct TCreateSessionRequest {
ui8 VerboseLevel;
};

// Settings of the resources waiter actor (see CreateResourcesWaiterActor).
struct TWaitResourcesSettings {
// Node count the actor waits for before the node-count check is considered passed.
i32 ExpectedNodeCount;
// 0 - finish immediately; 1 - wait for the expected node count only;
// 2 and above - additionally wait for a successful test script request.
ui8 HealthCheckLevel;
// Retry diagnostics are printed to Cout when this is >= 2.
ui8 VerboseLevel;
// Database set on the test script request.
TString Database;
};

struct TEvPrivate {
enum EEv : ui32 {
EvStartAsyncQuery = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
Expand Down Expand Up @@ -83,7 +90,7 @@ NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPr

NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings);

NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, const TWaitResourcesSettings& settings);

NActors::IActor* CreateSessionHolderActor(TCreateSessionRequest request, NThreading::TPromise<TString> openPromise, NThreading::TPromise<void> closePromise);

Expand Down
3 changes: 2 additions & 1 deletion ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ struct TYdbSetupSettings {
std::unordered_set<TString> SharedTenants;
std::unordered_set<TString> ServerlessTenants;
TDuration InitializationTimeout = TDuration::Seconds(10);
ui8 HealthCheckLevel = 1;
bool SameSession = false;

bool DisableDiskMock = false;
bool FormatStorage = false;
std::optional<TString> PDisksPath;
ui64 DiskSize = 32_GB;
std::optional<ui64> DiskSize;

bool MonitoringEnabled = false;
ui16 MonitoringPortOffset = 0;
Expand Down
1 change: 1 addition & 0 deletions ydb/tests/tools/kqprun/src/proto/storage_meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ package NKqpRun;

// Storage metadata kept by kqprun (serialized with protobuf text format in ydb_setup.cpp).
message TStorageMeta {
// NOTE(review): presumably a counter of storage (re)formatting - usage is not visible here, confirm.
uint64 StorageGeneration = 1;
// Disk size in bytes the storage was created with; ydb_setup.cpp refuses to change it
// without formatting the storage.
uint64 StorageSize = 2;
}
22 changes: 19 additions & 3 deletions ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ class TYdbSetup::TImpl {
void SetStorageSettings(NKikimr::Tests::TServerSettings& serverSettings) const {
TString diskPath;
if (Settings_.PDisksPath && *Settings_.PDisksPath != "-") {
diskPath = TStringBuilder() << *Settings_.PDisksPath << "/";
if (Settings_.PDisksPath->empty()) {
ythrow yexception() << "Storage directory path should not be empty";
}
diskPath = TStringBuilder() << Settings_.PDisksPath << (Settings_.PDisksPath->back() != '/' ? "/" : "");
}

bool formatDisk = true;
Expand All @@ -196,6 +199,13 @@ class TYdbSetup::TImpl {
formatDisk = false;
}

if (Settings_.DiskSize && storageMeta.GetStorageSize() != *Settings_.DiskSize) {
if (!formatDisk) {
ythrow yexception() << "Cannot change disk size without formatting storage, please use --format-storage";
}
storageMeta.SetStorageSize(*Settings_.DiskSize);
}

TString storageMetaStr;
google::protobuf::TextFormat::PrintToString(storageMeta, &storageMetaStr);

Expand All @@ -208,7 +218,7 @@ class TYdbSetup::TImpl {
.UseDisk = !!Settings_.PDisksPath,
.SectorSize = NKikimr::TTestStorageFactory::SECTOR_SIZE,
.ChunkSize = Settings_.PDisksPath ? NKikimr::TTestStorageFactory::CHUNK_SIZE : NKikimr::TTestStorageFactory::MEM_CHUNK_SIZE,
.DiskSize = Settings_.DiskSize,
.DiskSize = Settings_.DiskSize ? *Settings_.DiskSize : 32_GB,
.FormatDisk = formatDisk,
.DiskPath = diskPath
};
Expand Down Expand Up @@ -351,7 +361,13 @@ class TYdbSetup::TImpl {

void WaitResourcesPublishing() const {
auto promise = NThreading::NewPromise();
GetRuntime()->Register(CreateResourcesWaiterActor(promise, Settings_.NodeCount), 0, GetRuntime()->GetAppData().SystemPoolId);
const TWaitResourcesSettings settings = {
.ExpectedNodeCount = static_cast<i32>(Settings_.NodeCount),
.HealthCheckLevel = Settings_.HealthCheckLevel,
.VerboseLevel = Settings_.VerboseLevel,
.Database = NKikimr::CanonizePath(Settings_.DomainName)
};
GetRuntime()->Register(CreateResourcesWaiterActor(promise, settings), 0, GetRuntime()->GetAppData().SystemPoolId);

try {
promise.GetFuture().GetValue(Settings_.InitializationTimeout);
Expand Down

0 comments on commit f490faf

Please sign in to comment.