Skip to content

Commit 88466a1

Browse files
authored
Merge pull request #4 from alex268/master
Added support of action excution in two steps, read and write
2 parents 2e780d4 + b85a86d commit 88466a1

File tree

7 files changed

+177
-31
lines changed

7 files changed

+177
-31
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515
<maven.compiler.target>17</maven.compiler.target>
1616

1717
<spring.boot.version>3.2.12</spring.boot.version>
18-
<ydb.sdk.version>2.3.5</ydb.sdk.version>
18+
<ydb.sdk.version>2.3.20</ydb.sdk.version>
1919

2020
<exec.mainClass>tech.ydb.app.Application</exec.mainClass>
2121
</properties>
2222

2323
<dependencies>
2424
<dependency>
2525
<groupId>tech.ydb</groupId>
26-
<artifactId>ydb-sdk-table</artifactId>
26+
<artifactId>ydb-sdk-query</artifactId>
2727
</dependency>
2828
<dependency>
2929
<groupId>tech.ydb</groupId>

src/main/java/tech/ydb/app/Application.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ public void run(String... args) {
7070
warnings.add("No reader configs found!!");
7171
}
7272

73+
int sessionPoolSize = 0;
74+
for (CdcReader reader: readers) {
75+
sessionPoolSize += reader.getWriter().getThreadsCount();
76+
}
77+
ydb.updatePoolSize(Math.max(sessionPoolSize, 50));
78+
7379
for (CdcReader reader: readers) {
7480
reader.start();
7581
}

src/main/java/tech/ydb/app/CdcMsgParser.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.IOException;
44
import java.util.HashMap;
55
import java.util.HashSet;
6+
import java.util.List;
67
import java.util.Map;
78
import java.util.Set;
89
import java.util.function.Supplier;
@@ -97,6 +98,7 @@ public Parser(YdbService ydb, XmlConfig.Cdc cdc, Map<String, XmlConfig.Query> xm
9798
this.xmlQueries = xmlQueries;
9899
}
99100

101+
@SuppressWarnings("null")
100102
public Result<Supplier<CdcMsgParser>> parse() {
101103
String changefeed = ydb.expandPath(cdc.getChangefeed());
102104

@@ -129,13 +131,13 @@ public Result<Supplier<CdcMsgParser>> parse() {
129131

130132
private Result<Supplier<YqlQuery>> findUpdateQuery(TableDescription source) {
131133
if (cdc.getQuery() != null && !cdc.getQuery().trim().isEmpty()) {
132-
return validate(source, cdc.getQuery().trim(), false);
134+
return validate(source, new XmlConfig.Query(cdc.getQuery().trim()), false);
133135
}
134136
String queryId = cdc.getUpdateQueryId();
135137
if (queryId != null && xmlQueries.containsKey(queryId)) {
136138
XmlConfig.Query query = xmlQueries.get(queryId);
137139
if (query.getText() != null && !query.getText().trim().isEmpty()) {
138-
return validate(source, query.getText().trim(), false);
140+
return validate(source, query, false);
139141
}
140142
}
141143

@@ -147,15 +149,17 @@ private Result<Supplier<YqlQuery>> findDeleteQuery(TableDescription source) {
147149
if (queryId != null && xmlQueries.containsKey(queryId)) {
148150
XmlConfig.Query query = xmlQueries.get(queryId);
149151
if (query.getText() != null && !query.getText().trim().isEmpty()) {
150-
return validate(source, query.getText().trim(), true);
152+
return validate(source, query, true);
151153
}
152154
}
153155

154156
return Result.success(YqlQuery.skipMessages("erase", "deleteQueryId", source.getPrimaryKeys(), cdc));
155157
}
156158

157-
private Result<Supplier<YqlQuery>> validate(TableDescription source, String query, boolean keysOnly) {
158-
Result<DataQuery> parsed = ydb.parseQuery(query);
159+
@SuppressWarnings("null")
160+
private Result<Supplier<YqlQuery>> validate(TableDescription source, XmlConfig.Query query, boolean keysOnly) {
161+
String text = query.getText().trim();
162+
Result<DataQuery> parsed = ydb.parseQuery(text);
159163
if (!parsed.isSuccess()) {
160164
logger.error("Can't parse query for consumer {}, got status {}", cdc.getConsumer(), parsed.getStatus());
161165
return parsed.map(null);
@@ -217,7 +221,34 @@ private Result<Supplier<YqlQuery>> validate(TableDescription source, String quer
217221
}
218222
}
219223

220-
return Result.success(YqlQuery.executeYql(query, source.getPrimaryKeys(), paramName, structType, cdc));
224+
List<String> keys = source.getPrimaryKeys();
225+
if (query.getActionTable() != null && !query.getActionTable().trim().isEmpty()) {
226+
String actionTable = query.getActionTable().trim();
227+
String action = query.getActionMode();
228+
if ("upsertInto".equalsIgnoreCase(action)) {
229+
String execute = "UPSERT INTO `" + actionTable + "` ";
230+
return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
231+
}
232+
if ("deleteFrom".equalsIgnoreCase(action)) {
233+
String execute = "DELETE FROM `" + actionTable + "` ON ";
234+
return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
235+
}
236+
if ("updateOn".equalsIgnoreCase(action)) {
237+
String execute = "UPDATE `" + actionTable + "` ON ";
238+
return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
239+
}
240+
if ("insertInto".equalsIgnoreCase(action)) {
241+
String execute = "INSERT INTO `" + actionTable + "` ";
242+
return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
243+
}
244+
245+
return Result.fail(Status.of(StatusCode.CLIENT_INTERNAL_ERROR, Issue.of(
246+
"Uknown actionName " + action + ", expected upsertInto/deleteFrom/updateOn/insertInto",
247+
Issue.Severity.ERROR
248+
)));
249+
}
250+
251+
return Result.success(YqlQuery.executeYql(text, keys, paramName, structType, cdc));
221252
}
222253
}
223254

src/main/java/tech/ydb/app/XmlConfig.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,26 @@ public static class Query {
3636
@XmlAttribute(name = "id", required = true)
3737
private String id;
3838

39-
// @XmlAttribute(name = "upsertTo")
40-
// private String upsertTo;
41-
//
42-
// @XmlAttribute(name = "deleteFrom")
43-
// private String deleteFrom;
39+
@XmlAttribute(name = "actionMode")
40+
private String actionMode;
41+
42+
@XmlAttribute(name = "actionTable")
43+
private String actionTable;
44+
45+
// @XmlAttribute(name = "batchSize")
46+
// private Integer batchSize;
4447

4548
@XmlValue
4649
private String text;
4750

51+
public Query() {
52+
}
53+
54+
public Query(String query) {
55+
this.id = "inplacement";
56+
this.text = query;
57+
}
58+
4859
public String getId() {
4960
return this.id;
5061
}
@@ -53,12 +64,19 @@ public String getText() {
5364
return this.text;
5465
}
5566

56-
// public String getUpsertTo() {
57-
// return this.upsertTo;
58-
// }
59-
//
60-
// public String getDeleteFrom() {
61-
// return this.deleteFrom;
67+
public String getActionMode() {
68+
return this.actionMode;
69+
}
70+
71+
public String getActionTable() {
72+
return this.actionTable;
73+
}
74+
75+
// public int getBatchSize() {
76+
// if (batchSize == null) {
77+
// return DEFAULT_BATCH_SIZE;
78+
// }
79+
// return batchSize;
6280
// }
6381
}
6482

@@ -80,6 +98,7 @@ public static class Cdc {
8098
private String updateQueryId;
8199
@XmlAttribute(name = "deleteQueryId")
82100
private String deleteQueryId;
101+
83102
@XmlValue
84103
private String query;
85104

src/main/java/tech/ydb/app/YdbService.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,22 @@
1818

1919
import tech.ydb.auth.TokenAuthProvider;
2020
import tech.ydb.auth.iam.CloudAuthHelper;
21+
import tech.ydb.common.transaction.TxMode;
2122
import tech.ydb.core.Result;
2223
import tech.ydb.core.Status;
2324
import tech.ydb.core.auth.StaticCredentials;
2425
import tech.ydb.core.grpc.GrpcTransport;
2526
import tech.ydb.core.grpc.GrpcTransportBuilder;
27+
import tech.ydb.query.QuerySession;
28+
import tech.ydb.query.QueryStream;
29+
import tech.ydb.query.impl.QueryClientImpl;
30+
import tech.ydb.query.settings.ExecuteQuerySettings;
31+
import tech.ydb.query.tools.QueryReader;
2632
import tech.ydb.table.Session;
2733
import tech.ydb.table.TableClient;
2834
import tech.ydb.table.description.TableDescription;
2935
import tech.ydb.table.query.DataQuery;
3036
import tech.ydb.table.query.Params;
31-
import tech.ydb.table.settings.ExecuteDataQuerySettings;
32-
import tech.ydb.table.transaction.TxControl;
3337
import tech.ydb.topic.TopicClient;
3438
import tech.ydb.topic.read.AsyncReader;
3539
import tech.ydb.topic.settings.ReadEventHandlersSettings;
@@ -54,6 +58,7 @@ public class YdbService {
5458
private final GrpcTransport transport;
5559

5660
private final TableClient tableClient;
61+
private final QueryClientImpl queryClient;
5762
private final TopicClient topicClient;
5863

5964
public YdbService(Environment env) {
@@ -93,11 +98,17 @@ public YdbService(Environment env) {
9398

9499
this.transport = builder.build();
95100
this.tableClient = TableClient.newClient(transport).build();
101+
this.queryClient = QueryClientImpl.newClient(transport).build();
96102
this.topicClient = TopicClient.newClient(transport)
97-
.setCompressionExecutor(Runnable::run) // Prevent OOM
103+
.setCompressionExecutor(Runnable::run)
98104
.build();
99105
}
100106

107+
public void updatePoolSize(int maxSize) {
108+
logger.error("set session pool max size {}", maxSize);
109+
queryClient.updatePoolMaxSize(maxSize);
110+
}
111+
101112
@PreDestroy
102113
public void close() {
103114
this.topicClient.close();
@@ -122,6 +133,7 @@ public String expandPath(String name) {
122133
return sb.toString();
123134
}
124135

136+
@SuppressWarnings("null")
125137
public Result<DataQuery> parseQuery(String query) {
126138
Result<Session> session = tableClient.createSession(Duration.ofSeconds(5)).join();
127139
if (!session.isSuccess()) {
@@ -133,6 +145,7 @@ public Result<DataQuery> parseQuery(String query) {
133145
}
134146
}
135147

148+
@SuppressWarnings("null")
136149
public Result<TableDescription> describeTable(String tablePath) {
137150
Result<Session> session = tableClient.createSession(Duration.ofSeconds(5)).join();
138151
if (!session.isSuccess()) {
@@ -144,18 +157,35 @@ public Result<TableDescription> describeTable(String tablePath) {
144157
}
145158
}
146159

147-
public Status executeQuery(String query, Params params, int timeoutSeconds) {
148-
Result<Session> session = tableClient.createSession(Duration.ofSeconds(5)).join();
160+
public Status executeYqlQuery(String query, Params params, int timeoutSeconds) {
161+
Result<QuerySession> session = queryClient.createSession(Duration.ofSeconds(5)).join();
149162
if (!session.isSuccess()) {
150163
return session.getStatus();
151164
}
152165

153-
try (Session s = session.getValue()) {
154-
ExecuteDataQuerySettings settings = new ExecuteDataQuerySettings();
166+
try (QuerySession s = session.getValue()) {
167+
ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
168+
if (timeoutSeconds > 0) {
169+
settings.withRequestTimeout(Duration.ofSeconds(timeoutSeconds));
170+
}
171+
return s.createQuery(query, TxMode.NONE, params, settings.build()).execute().join().getStatus();
172+
}
173+
}
174+
175+
@SuppressWarnings("null")
176+
public Result<QueryReader> readYqlQuery(String query, Params params, int timeoutSeconds) {
177+
Result<QuerySession> session = queryClient.createSession(Duration.ofSeconds(5)).join();
178+
if (!session.isSuccess()) {
179+
return session.map(null);
180+
}
181+
182+
try (QuerySession s = session.getValue()) {
183+
ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
155184
if (timeoutSeconds > 0) {
156-
settings.setTimeout(Duration.ofSeconds(timeoutSeconds));
185+
settings.withRequestTimeout(Duration.ofSeconds(timeoutSeconds));
157186
}
158-
return s.executeDataQuery(query, TxControl.serializableRw(), params, settings).join().getStatus();
187+
QueryStream stream = s.createQuery(query, TxMode.SNAPSHOT_RO, params, settings.build());
188+
return QueryReader.readFrom(stream).join();
159189
}
160190
}
161191

src/main/java/tech/ydb/app/YqlQuery.java

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
1717

18+
import tech.ydb.core.Result;
1819
import tech.ydb.core.Status;
20+
import tech.ydb.query.tools.QueryReader;
1921
import tech.ydb.table.query.Params;
22+
import tech.ydb.table.result.ResultSetReader;
2023
import tech.ydb.table.values.DecimalType;
2124
import tech.ydb.table.values.ListType;
2225
import tech.ydb.table.values.NullValue;
@@ -143,7 +146,8 @@ private Value<?> readValue(JsonNode node, Type type) throws IOException {
143146
case Date:
144147
return PrimitiveValue.newDate(Instant.parse(node.asText()).atOffset(ZoneOffset.UTC).toLocalDate());
145148
case Datetime:
146-
return PrimitiveValue.newDatetime(Instant.parse(node.asText()).atOffset(ZoneOffset.UTC).toLocalDateTime());
149+
return PrimitiveValue.newDatetime(Instant.parse(node.asText()).atOffset(ZoneOffset.UTC)
150+
.toLocalDateTime());
147151
case Timestamp:
148152
return PrimitiveValue.newTimestamp(Instant.parse(node.asText()));
149153
case Interval:
@@ -178,15 +182,67 @@ public Status execute(YdbService ydb) {
178182
};
179183
}
180184

181-
public static Supplier<YqlQuery> executeYql(String query, List<String> keys, String name, StructType type, XmlConfig.Cdc config) {
185+
public static Supplier<YqlQuery> executeYql(String query, List<String> keys, String name, StructType type,
186+
XmlConfig.Cdc config) {
182187
final int batchSize = config.getBatchSize();
183188
final int timeout = config.getTimeoutSeconds();
184189
return () -> new YqlQuery(type, keys, batchSize) {
185190
@Override
186191
public Status execute(YdbService ydb) {
187192
Params prm = Params.of(name, ListType.of(type).newValue(batch));
188-
return ydb.executeQuery(query, prm, timeout);
193+
return ydb.executeYqlQuery(query, prm, timeout);
189194
}
190195
};
191196
}
197+
198+
public static Supplier<YqlQuery> readAndExecuteYql(String selectQuery, String query, List<String> keys,
199+
String name, StructType type, XmlConfig.Cdc config) {
200+
final int batchSize = config.getBatchSize();
201+
final int timeout = config.getTimeoutSeconds();
202+
return () -> new YqlQuery(type, keys, batchSize) {
203+
@Override
204+
public Status execute(YdbService ydb) {
205+
Params selectPrms = Params.of(name, ListType.of(type).newValue(batch));
206+
Result<QueryReader> res = ydb.readYqlQuery(selectQuery, selectPrms, timeout);
207+
if (!res.isSuccess()) {
208+
return res.getStatus();
209+
}
210+
QueryReader reader = res.getValue();
211+
if (reader.getResultSetCount() < 1) {
212+
return Status.SUCCESS;
213+
}
214+
215+
ResultSetReader rs = reader.getResultSet(0);
216+
217+
StructType type = resultSetToType(rs);
218+
Value<?> values = ListType.of(type).newValue(resultSetToValues(rs, type));
219+
Params executePrms = Params.of("$b", values);
220+
String executeQuery = "DECLARE $b AS List<" + type + ">; " + query + " SELECT * FROM AS_TABLE($b);";
221+
return ydb.executeYqlQuery(executeQuery, executePrms, timeout);
222+
}
223+
};
224+
}
225+
226+
private static StructType resultSetToType(ResultSetReader rs) {
227+
String[] names = new String[rs.getColumnCount()];
228+
Type[] types = new Type[rs.getColumnCount()];
229+
for (int idx = 0; idx < rs.getColumnCount(); idx += 1) {
230+
names[idx] = rs.getColumnName(idx);
231+
types[idx] = rs.getColumnType(idx);
232+
}
233+
234+
return StructType.ofOwn(names, types);
235+
}
236+
237+
private static List<Value<?>> resultSetToValues(ResultSetReader rs, StructType type) {
238+
List<Value<?>> values = new ArrayList<>();
239+
while (rs.next()) {
240+
Value<?>[] row = new Value[type.getMembersCount()];
241+
for (int idx = 0; idx < type.getMembersCount(); idx += 1) {
242+
row[idx] = rs.getColumn(type.getMemberName(idx)).getValue();
243+
}
244+
values.add(type.newValueUnsafe(row));
245+
}
246+
return values;
247+
}
192248
}

src/main/java/tech/ydb/app/YqlWriter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ public YqlWriter(YdbService ydb, Supplier<CdcMsgParser> parser, XmlConfig.Cdc co
5656
}
5757
}
5858

59+
public int getThreadsCount() {
60+
return writers.size();
61+
}
62+
5963
public Status getLastStatus() {
6064
for (int idx = 0; idx < writers.size(); idx++) {
6165
Status last = writers.get(idx).lastStatus;

0 commit comments

Comments
 (0)