Skip to content

Commit 6b8cfd6

Browse files
authored
merge to stable-25-3: AvailabilityPeriod for topic consumers (#24752) (#26308)
Introduce the availability_period option to extend the period for which unprocessed messages can be retained in the topic. KIKIMR-24054
2 parents 247535b + 3784947 commit 6b8cfd6

31 files changed

+990
-191
lines changed

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,10 @@ namespace {
469469
protoConsumer->set_important(FromString<bool>(
470470
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
471471
));
472+
} else if (name == "setAvailabilityPeriod"sv) {
473+
auto period = TDuration::MicroSeconds(FromString<ui64>(setting.Value().Cast<TCoInterval>().Literal().Value()));
474+
protoConsumer->mutable_availability_period()->set_seconds(period.Seconds());
475+
protoConsumer->mutable_availability_period()->set_nanos(period.NanoSecondsOfSecond());
472476
} else if (name == "setReadFromTs") {
473477
ui64 tsValue = 0;
474478
if(setting.Value().Maybe<TCoDatetime>()) {
@@ -514,6 +518,12 @@ namespace {
514518
protoConsumer->set_set_important(FromString<bool>(
515519
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
516520
));
521+
} else if (name == "setAvailabilityPeriod"sv) {
522+
auto period = TDuration::MicroSeconds(FromString<ui64>(setting.Value().Cast<TCoInterval>().Literal().Value()));
523+
protoConsumer->mutable_set_availability_period()->set_seconds(period.Seconds());
524+
protoConsumer->mutable_set_availability_period()->set_nanos(period.NanoSecondsOfSecond());
525+
} else if (name == "resetAvailabilityPeriod"sv) {
526+
protoConsumer->mutable_reset_availability_period();
517527
} else if (name == "setReadFromTs") {
518528
ui64 tsValue = 0;
519529
if(setting.Value().Maybe<TCoDatetime>()) {
@@ -1716,7 +1726,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
17161726
return SyncError();
17171727
} else if (constraint.Name().Value() == "default") {
17181728
if (table.Metadata->Kind == EKikimrTableKind::Olap) {
1719-
ctx.AddError(TIssue(ctx.GetPosition(constraint.Pos()),
1729+
ctx.AddError(TIssue(ctx.GetPosition(constraint.Pos()),
17201730
"Default values are not supported in column tables"));
17211731
return SyncError();
17221732
}
@@ -2046,10 +2056,10 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
20462056
}
20472057
} else if (name == "indexSettings") {
20482058
YQL_ENSURE(add_index->type_case() == Ydb::Table::TableIndex::kGlobalVectorKmeansTreeIndex);
2049-
2059+
20502060
Ydb::Table::KMeansTreeSettings& settings = *add_index->mutable_global_vector_kmeans_tree_index()->mutable_vector_settings();
20512061
TString error;
2052-
2062+
20532063
auto indexSettings = columnTuple.Item(1).Cast<TCoAtomList>();
20542064
for (const auto& indexSetting : indexSettings.Cast<TCoNameValueTupleList>()) {
20552065
YQL_ENSURE(indexSetting.Value().Maybe<TCoAtom>());

ydb/core/kqp/provider/yql_kikimr_type_ann.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1650,9 +1650,9 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
16501650
}
16511651
static bool CheckConsumerSettings(const TCoNameValueTupleList& settings, TExprContext& ctx) {
16521652
for (const auto& setting : settings) {
1653-
auto name = setting.Name().Value();
1654-
auto val = TString(setting.Value().Cast<TCoDataCtor>().Literal().template Cast<TCoAtom>().Value());
1653+
const auto name = setting.Name().Value();
16551654
if (name == "setSupportedCodecs") {
1655+
auto val = TString(setting.Value().Cast<TCoDataCtor>().Literal().template Cast<TCoAtom>().Value());
16561656
auto codecsList = GetTopicCodecsFromString(val);
16571657
if (codecsList.empty()) {
16581658
ctx.AddError(TIssue(ctx.GetPosition(setting.Value().Ref().Pos()),

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 79 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4137,7 +4137,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
41374137
} else {
41384138
query = Sprintf(R"(
41394139
--!syntax_v1
4140-
ALTER TABLE `/Root/TestTable` ADD INDEX vector_idx%d
4140+
ALTER TABLE `/Root/TestTable` ADD INDEX vector_idx%d
41414141
GLOBAL USING vector_kmeans_tree
41424142
ON (Embedding)
41434143
WITH (%s);
@@ -4161,18 +4161,18 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
41614161
// valid settings:
41624162
check("similarity=inner_product, vector_type=float, vector_dimension=1024, levels=3, clusters=10", "");
41634163

4164-
// unknown index setting:
4164+
// unknown index setting:
41654165
check("XxX=YyY, similarity=inner_product, vector_type=float, vector_dimension=1024, levels=3, clusters=10",
41664166
"Unknown index setting: xxx");
41674167
check("XxX=42, similarity=inner_product, vector_type=float, vector_dimension=1024, levels=3, clusters=10",
41684168
"Unknown index setting: xxx");
4169-
4169+
41704170
// distance:
41714171
check("distance=XxX, vector_type=float, vector_dimension=1024, levels=3, clusters=10",
41724172
"Invalid distance: xxx");
41734173
check("distance=42, vector_type=float, vector_dimension=1024, levels=3, clusters=10",
41744174
"Invalid distance: 42");
4175-
4175+
41764176
// similarity
41774177
check("similarity=XxX, vector_type=float, vector_dimension=1024, levels=3, clusters=10",
41784178
"Invalid similarity: xxx");
@@ -4194,7 +4194,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
41944194
"Invalid vector_type: 42");
41954195
check("similarity=inner_product, vector_dimension=1024, levels=3, clusters=10",
41964196
"vector_type should be set");
4197-
4197+
41984198
// vector_dimension
41994199
check("similarity=inner_product, vector_type=float, vector_dimension=XxX, levels=3, clusters=10",
42004200
"Invalid vector_dimension: xxx");
@@ -4211,7 +4211,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
42114211
"Invalid vector_dimension: 99999999999999999999");
42124212
check("similarity=inner_product, vector_type=float, levels=3, clusters=10",
42134213
"vector_dimension should be set");
4214-
4214+
42154215
// levels
42164216
check("similarity=inner_product, vector_type=float, vector_dimension=1024, levels=XxX, clusters=2",
42174217
"Invalid levels: xxx");
@@ -4247,13 +4247,13 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
42474247
"Invalid clusters: 99999999999999999999");
42484248
check("similarity=inner_product, vector_type=float, vector_dimension=1024, levels=1",
42494249
"clusters should be set");
4250-
4250+
42514251
// clusters^levels
42524252
check("similarity=inner_product, vector_type=float, vector_dimension=1024, levels=10, clusters=10",
42534253
"Invalid clusters^levels: 10^10 should be less than 1073741824");
42544254
check("similarity=inner_product, vector_type=float, vector_dimension=1024, levels=16, clusters=1024",
42554255
"Invalid clusters^levels: 1024^16 should be less than 1073741824");
4256-
4256+
42574257
// vector_dimension*clusters
42584258
check("similarity=inner_product, vector_type=float, vector_dimension=2048, levels=1, clusters=2048", "");
42594259
check("similarity=inner_product, vector_type=float, vector_dimension=2049, levels=1, clusters=2048",
@@ -11452,6 +11452,77 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
1145211452
}
1145311453
}
1145411454

11455+
Y_UNIT_TEST_TWIN(CreateAndAlterTopicAvailabilityPeriod, UseQueryService) {
11456+
TKikimrRunner kikimr;
11457+
auto queryClient = kikimr.GetQueryClient();
11458+
auto db = kikimr.GetTableClient();
11459+
auto session = db.CreateSession().GetValueSync().GetSession();
11460+
11461+
auto executeQuery = [&queryClient, &session](const TString& query) {
11462+
return ExecuteGeneric<UseQueryService>(queryClient, session, query);
11463+
};
11464+
11465+
// ok
11466+
{
11467+
const auto query = R"(
11468+
--!syntax_v1
11469+
CREATE TOPIC `/Root/topic` (
11470+
CONSUMER cons1 WITH (availability_period = Interval('PT1H'))
11471+
)
11472+
)";
11473+
const auto result = executeQuery(query);
11474+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
11475+
}
11476+
{
11477+
const auto query = R"(
11478+
--!syntax_v1
11479+
ALTER TOPIC `/Root/topic`
11480+
ALTER CONSUMER cons1 SET (availability_period = Interval('PT9H'))
11481+
)";
11482+
const auto result = executeQuery(query);
11483+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
11484+
}
11485+
{
11486+
const auto query = R"(
11487+
--!syntax_v1
11488+
ALTER TOPIC `/Root/topic`
11489+
DROP CONSUMER cons1,
11490+
ADD CONSUMER cons2 WITH (availability_period = Interval('PT8H'))
11491+
)";
11492+
const auto result = executeQuery(query);
11493+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
11494+
}
11495+
{
11496+
const auto query = R"(
11497+
--!syntax_v1
11498+
ALTER TOPIC `/Root/topic`
11499+
ALTER CONSUMER cons2 RESET (availability_period)
11500+
)";
11501+
const auto result = executeQuery(query);
11502+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
11503+
}
11504+
// bad
11505+
{
11506+
const auto query = R"(
11507+
--!syntax_v1
11508+
ALTER TOPIC `/Root/topic`
11509+
ALTER CONSUMER cons2 SET (availability_period = 0)
11510+
)";
11511+
const auto result = executeQuery(query);
11512+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
11513+
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "Interval type is expected", result.GetIssues().ToString());
11514+
}
11515+
{
11516+
const auto query = R"(
11517+
--!syntax_v1
11518+
ALTER TOPIC `/Root/topic`
11519+
ADD CONSUMER cons_neg WITH (availability_period = Interval('-PT8H'))
11520+
)";
11521+
const auto result = executeQuery(query);
11522+
UNIT_ASSERT_VALUES_UNEQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
11523+
}
11524+
}
11525+
1145511526
Y_UNIT_TEST(DisableResourcePools) {
1145611527
NKikimrConfig::TAppConfig config;
1145711528
config.MutableFeatureFlags()->SetEnableResourcePools(false);
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#include "consumer_offset_tracker.h"
2+
3+
namespace NKikimr::NPQ {
4+
5+
bool ImportantConsumerNeedToKeepCurrentKey(const TDuration availabilityPeriod, const ui64 offset, const TDataKey& currentKey, const TDataKey& nextKey, const TInstant now) {
6+
const TInstant endOfLife = currentKey.Timestamp + availabilityPeriod; // note: sum with saturation
7+
if (endOfLife < now) {
8+
// The current key is too old. It doesn't matter whether the consumer has read it or not. It can be retired.
9+
return false;
10+
}
11+
if (offset < nextKey.Key.GetOffset()) {
12+
// The first message in the next blob was not read by an important consumer.
13+
// We also save the current blob, since not all messages from it could be read.
14+
return true;
15+
}
16+
if (offset == nextKey.Key.GetOffset() && nextKey.Key.GetPartNo() != 0) {
17+
// We save all the blobs that contain parts of the last message read by an important consumer.
18+
return true;
19+
}
20+
return false;
21+
}
22+
23+
TImportantConsumerOffsetTracker::TImportantConsumerOffsetTracker(std::vector<TImportantConsumerOffsetTracker::TConsumerOffset> consumersToCheck)
24+
: Consumers_(std::move(consumersToCheck))
25+
{
26+
}
27+
28+
bool TImportantConsumerOffsetTracker::ShouldKeepCurrentKey(const TDataKey& currentKey, const TDataKey& nextKey, const TInstant now) const {
29+
for (const auto& consumer : Consumers_) {
30+
if (ImportantConsumerNeedToKeepCurrentKey(consumer.AvailabilityPeriod, consumer.Offset, currentKey, nextKey, now)) {
31+
return true;
32+
}
33+
}
34+
return false;
35+
}
36+
37+
} // namespace NKikimr::NPQ
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#pragma once
2+
3+
#include <ydb/core/persqueue/events/internal.h>
4+
#include <util/datetime/base.h>
5+
#include <vector>
6+
7+
namespace NKikimr::NPQ {
8+
9+
class TImportantConsumerOffsetTracker {
10+
public:
11+
struct TConsumerOffset {
12+
TDuration AvailabilityPeriod = TDuration::Max();
13+
ui64 Offset = 0;
14+
};
15+
16+
explicit TImportantConsumerOffsetTracker(std::vector<TConsumerOffset> consumers);
17+
18+
bool ShouldKeepCurrentKey(const TDataKey& currentKey, const TDataKey& nextKey, const TInstant now) const;
19+
20+
private:
21+
std::vector<TConsumerOffset> Consumers_;
22+
};
23+
24+
bool ImportantConsumerNeedToKeepCurrentKey(const TDuration availabilityPeriod, const ui64 offset, const TDataKey& currentKey, const TDataKey& nextKey, const TInstant now);
25+
26+
} // namespace NKikimr::NPQ

0 commit comments

Comments
 (0)