Skip to content

Commit

Permalink
Fix lost logs in data integrity test (#13871)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Jan 27, 2025
1 parent afd8c9e commit 0e77ef0
Showing 1 changed file with 131 additions and 119 deletions.
250 changes: 131 additions & 119 deletions ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,29 @@ namespace {

Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
Y_UNIT_TEST_TWIN(Upsert, LogEnabled) {
TKikimrSettings serverSettings;
TStringStream ss;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
{
TKikimrSettings serverSettings;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);

if (LogEnabled) {
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
}

auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
if (LogEnabled) {
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
}
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
(3u, "Value3"),
(101u, "Value101"),
(201u, "Value201");
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
(3u, "Value3"),
(101u, "Value101"),
(201u, "Value201");
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

// check executer logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 1 : 0);
Expand All @@ -51,26 +53,28 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
}

Y_UNIT_TEST(UpsertEvWrite) {
NKikimrConfig::TAppConfig AppConfig;
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(AppConfig);
TStringStream ss;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);

auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
(3u, "Value3"),
(101u, "Value101"),
(201u, "Value201");
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
{
NKikimrConfig::TAppConfig AppConfig;
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(AppConfig);
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);

auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
(3u, "Value3"),
(101u, "Value101"),
(201u, "Value201");
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

// check write actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 1);
Expand All @@ -83,45 +87,47 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
}

Y_UNIT_TEST_TWIN(UpsertEvWriteQueryService, isOlap) {
NKikimrConfig::TAppConfig AppConfig;
if (!isOlap) {
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
} else {
AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
}
TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(AppConfig);
TStringStream ss;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
{
NKikimrConfig::TAppConfig AppConfig;
if (!isOlap) {
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
} else {
AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
}
TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(AppConfig);
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);

auto db = kikimr.GetQueryClient();
auto session = db.GetSession().GetValueSync().GetSession();
auto db = kikimr.GetQueryClient();
auto session = db.GetSession().GetValueSync().GetSession();

{
const TString query = Sprintf(R"(
CREATE TABLE `/Root/test_evwrite` (
Key Int64 NOT NULL,
Value String,
primary key (Key)
) WITH (STORE=%s);
)", isOlap ? "COLUMN" : "ROW");
{
const TString query = Sprintf(R"(
CREATE TABLE `/Root/test_evwrite` (
Key Int64 NOT NULL,
Value String,
primary key (Key)
) WITH (STORE=%s);
)", isOlap ? "COLUMN" : "ROW");

auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

ss.Clear();
ss.Clear();

auto result = session.ExecuteQuery(R"(
--!syntax_v1
auto result = session.ExecuteQuery(R"(
--!syntax_v1
UPSERT INTO `/Root/test_evwrite` (Key, Value) VALUES
(3u, "Value3"),
(101u, "Value101"),
(201u, "Value201");
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UPSERT INTO `/Root/test_evwrite` (Key, Value) VALUES
(3u, "Value3"),
(101u, "Value101"),
(201u, "Value201");
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

if (!isOlap) {
// check write actor logs
Expand All @@ -148,24 +154,26 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
}

Y_UNIT_TEST(Ddl) {
TKikimrSettings serverSettings;
TStringStream ss;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto result = session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE TABLE `/Root/Tmp` (
Key Uint64,
Value String,
PRIMARY KEY (Key)
);
)").ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
{
TKikimrSettings serverSettings;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto result = session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE TABLE `/Root/Tmp` (
Key Uint64,
Value String,
PRIMARY KEY (Key)
);
)").ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

// check executer logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 0);
Expand All @@ -178,20 +186,22 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
}

Y_UNIT_TEST(Select) {
TKikimrSettings serverSettings;
TStringStream ss;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
{
TKikimrSettings serverSettings;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
SELECT * FROM `/Root/KeyValue`;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
SELECT * FROM `/Root/KeyValue`;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

// check executer logs (should be empty, because executer only logs modification operations)
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 0);
Expand All @@ -204,30 +214,32 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
}

Y_UNIT_TEST_TWIN(UpsertViaLegacyScripting, Streaming) {
TKikimrSettings serverSettings;
TStringStream ss;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());


const auto query = R"(
--!syntax_v1
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
(3u, "Value3"),
(101u, "Value101"),
(201u, "Value201");
)";

if (Streaming) {
auto result = client.StreamExecuteYqlScript(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CollectStreamResult(result);
} else {
auto result = client.ExecuteYqlScript(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
{
TKikimrSettings serverSettings;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());


const auto query = R"(
--!syntax_v1
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
(3u, "Value3"),
(101u, "Value101"),
(201u, "Value201");
)";

if (Streaming) {
auto result = client.StreamExecuteYqlScript(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CollectStreamResult(result);
} else {
auto result = client.ExecuteYqlScript(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
}

// check executer logs
Expand Down

0 comments on commit 0e77ef0

Please sign in to comment.