Skip to content

Commit b85a86d

Browse files
committed
Fixed errors
1 parent 1a27c30 commit b85a86d

File tree

4 files changed

+39
-43
lines changed

4 files changed

+39
-43
lines changed

src/main/java/tech/ydb/app/Application.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public void run(String... args) {
7474
for (CdcReader reader: readers) {
7575
sessionPoolSize += reader.getWriter().getThreadsCount();
7676
}
77-
ydb.updatePoolSize(Math.min(sessionPoolSize, 50));
77+
ydb.updatePoolSize(Math.max(sessionPoolSize, 50));
7878

7979
for (CdcReader reader: readers) {
8080
reader.start();

src/main/java/tech/ydb/app/CdcMsgParser.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,8 @@ private Result<Supplier<YqlQuery>> findDeleteQuery(TableDescription source) {
158158

159159
@SuppressWarnings("null")
160160
private Result<Supplier<YqlQuery>> validate(TableDescription source, XmlConfig.Query query, boolean keysOnly) {
161-
String queryText = query.getText().trim();
162-
Result<DataQuery> parsed = ydb.parseQuery(queryText);
161+
String text = query.getText().trim();
162+
Result<DataQuery> parsed = ydb.parseQuery(text);
163163
if (!parsed.isSuccess()) {
164164
logger.error("Can't parse query for consumer {}, got status {}", cdc.getConsumer(), parsed.getStatus());
165165
return parsed.map(null);
@@ -222,24 +222,33 @@ private Result<Supplier<YqlQuery>> validate(TableDescription source, XmlConfig.Q
222222
}
223223

224224
List<String> keys = source.getPrimaryKeys();
225-
if (query.getUpsertTo() != null && !query.getUpsertTo().trim().isEmpty()) {
226-
String execute = "UPSERT INTO `" + query.getUpsertTo().trim() + "` ";
227-
return Result.success(YqlQuery.readAndExecuteYql(queryText, execute, keys, paramName, structType, cdc));
228-
}
229-
if (query.getDeleteFrom() != null && !query.getDeleteFrom().trim().isEmpty()) {
230-
String execute = "DELETE FROM `" + query.getDeleteFrom().trim() + "` ON ";
231-
return Result.success(YqlQuery.readAndExecuteYql(queryText, execute, keys, paramName, structType, cdc));
232-
}
233-
if (query.getUpdateOn() != null && !query.getUpdateOn().trim().isEmpty()) {
234-
String execute = "UPDATE `" + query.getUpdateOn().trim() + "` ON ";
235-
return Result.success(YqlQuery.readAndExecuteYql(queryText, execute, keys, paramName, structType, cdc));
236-
}
237-
if (query.getInsertTo() != null && !query.getInsertTo().trim().isEmpty()) {
238-
String execute = "INSERT INTO `" + query.getInsertTo().trim() + "` ";
239-
return Result.success(YqlQuery.readAndExecuteYql(queryText, execute, keys, paramName, structType, cdc));
225+
if (query.getActionTable() != null && !query.getActionTable().trim().isEmpty()) {
226+
String actionTable = query.getActionTable().trim();
227+
String action = query.getActionMode();
228+
if ("upsertInto".equalsIgnoreCase(action)) {
229+
String execute = "UPSERT INTO `" + actionTable + "` ";
230+
return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
231+
}
232+
if ("deleteFrom".equalsIgnoreCase(action)) {
233+
String execute = "DELETE FROM `" + actionTable + "` ON ";
234+
return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
235+
}
236+
if ("updateOn".equalsIgnoreCase(action)) {
237+
String execute = "UPDATE `" + actionTable + "` ON ";
238+
return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
239+
}
240+
if ("insertInto".equalsIgnoreCase(action)) {
241+
String execute = "INSERT INTO `" + actionTable + "` ";
242+
return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
243+
}
244+
245+
return Result.fail(Status.of(StatusCode.CLIENT_INTERNAL_ERROR, Issue.of(
246+
"Uknown actionName " + action + ", expected upsertInto/deleteFrom/updateOn/insertInto",
247+
Issue.Severity.ERROR
248+
)));
240249
}
241250

242-
return Result.success(YqlQuery.executeYql(queryText, keys, paramName, structType, cdc));
251+
return Result.success(YqlQuery.executeYql(text, keys, paramName, structType, cdc));
243252
}
244253
}
245254

src/main/java/tech/ydb/app/XmlConfig.java

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,11 @@ public static class Query {
3636
@XmlAttribute(name = "id", required = true)
3737
private String id;
3838

39-
@XmlAttribute(name = "upsertTo")
40-
private String upsertTo;
39+
@XmlAttribute(name = "actionMode")
40+
private String actionMode;
4141

42-
@XmlAttribute(name = "insertTo")
43-
private String insertTo;
44-
45-
@XmlAttribute(name = "updateOn")
46-
private String updateOn;
47-
48-
@XmlAttribute(name = "deleteFrom")
49-
private String deleteFrom;
42+
@XmlAttribute(name = "actionTable")
43+
private String actionTable;
5044

5145
// @XmlAttribute(name = "batchSize")
5246
// private Integer batchSize;
@@ -70,21 +64,14 @@ public String getText() {
7064
return this.text;
7165
}
7266

73-
public String getUpsertTo() {
74-
return this.upsertTo;
67+
public String getActionMode() {
68+
return this.actionMode;
7569
}
7670

77-
public String getInsertTo() {
78-
return this.insertTo;
71+
public String getActionTable() {
72+
return this.actionTable;
7973
}
8074

81-
public String getDeleteFrom() {
82-
return this.deleteFrom;
83-
}
84-
85-
public String getUpdateOn() {
86-
return this.updateOn;
87-
}
8875
// public int getBatchSize() {
8976
// if (batchSize == null) {
9077
// return DEFAULT_BATCH_SIZE;

src/main/java/tech/ydb/app/YqlQuery.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,8 @@ public Status execute(YdbService ydb) {
216216

217217
StructType type = resultSetToType(rs);
218218
Value<?> values = ListType.of(type).newValue(resultSetToValues(rs, type));
219-
Params executePrms = Params.of("$input", values);
220-
String executeQuery = "DECLARE $input AS " + type + "; " + query + " SELECT FROM AS_TABLE($input);";
219+
Params executePrms = Params.of("$b", values);
220+
String executeQuery = "DECLARE $b AS List<" + type + ">; " + query + " SELECT * FROM AS_TABLE($b);";
221221
return ydb.executeYqlQuery(executeQuery, executePrms, timeout);
222222
}
223223
};
@@ -239,7 +239,7 @@ private static List<Value<?>> resultSetToValues(ResultSetReader rs, StructType t
239239
while (rs.next()) {
240240
Value<?>[] row = new Value[type.getMembersCount()];
241241
for (int idx = 0; idx < type.getMembersCount(); idx += 1) {
242-
row[idx] = rs.getColumn(type.getMemberIndex(type.getMemberName(idx))).getValue();
242+
row[idx] = rs.getColumn(type.getMemberName(idx)).getValue();
243243
}
244244
values.add(type.newValueUnsafe(row));
245245
}

0 commit comments

Comments
 (0)