
Commit 4ecfa40

[github] update 1.10_release
2 parents: f7c78ee + d022e4b

File tree

40 files changed: +1055 -611 lines changed

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 6 additions & 16 deletions
@@ -19,23 +19,16 @@
 
 package com.dtstack.flink.sql.side.clickhouse;
 
-import com.dtstack.flink.sql.factory.DTThreadFactory;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
 import com.dtstack.flink.sql.side.FieldInfo;
 import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.AbstractSideTableInfo;
 import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
 import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
-import io.vertx.core.Vertx;
-import io.vertx.core.VertxOptions;
 import io.vertx.core.json.JsonObject;
-import io.vertx.ext.jdbc.JDBCClient;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 
 public class ClickhouseAsyncReqRow extends RdbAsyncReqRow {
@@ -48,6 +41,10 @@ public ClickhouseAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Fi
     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
+    }
+
+    @Override
+    public JsonObject buildJdbcConfig() {
         JsonObject clickhouseClientConfig = new JsonObject();
         RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
         clickhouseClientConfig.put("url", rdbSideTableInfo.getUrl())
@@ -60,13 +57,6 @@ public void open(Configuration parameters) throws Exception {
                 .put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
                 .put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
 
-        System.setProperty("vertx.disableFileCPResolving", "true");
-        VertxOptions vo = new VertxOptions();
-        vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
-        vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
-        vo.setFileResolverCachingEnabled(false);
-        Vertx vertx = Vertx.vertx(vo);
-        setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
+        return clickhouseClientConfig;
     }
-
 }
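
The change extracts the connection config into a buildJdbcConfig() override and drops the hand-rolled Vert.x setup from the subclass. The client creation is presumably centralized in RdbAsyncReqRow, which this diff does not show; a minimal sketch of that base-class side, under that assumption (names carried over from the deleted lines):

    // Hypothetical base-class counterpart, NOT part of this commit's visible diff:
    // RdbAsyncReqRow#open() is assumed to own the Vert.x setup that every
    // subclass previously duplicated, and to call the new buildJdbcConfig() hook.
    @Override
    public void open(Configuration parameters) throws Exception {
        JsonObject config = buildJdbcConfig(); // subclass-specific JDBC settings
        System.setProperty("vertx.disableFileCPResolving", "true");
        VertxOptions vo = new VertxOptions();
        vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
        vo.setWorkerPoolSize(((RdbSideTableInfo) sideInfo.getSideTableInfo()).getAsyncPoolSize());
        vo.setFileResolverCachingEnabled(false);
        setRdbSqlClient(JDBCClient.createNonShared(Vertx.vertx(vo), config));
    }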

core/src/main/java/com/dtstack/flink/sql/GetPlan.java

Lines changed: 5 additions & 2 deletions
@@ -30,9 +30,10 @@
 import java.net.URLClassLoader;
 
 /**
- * Get the execution plan of a SQL job in local mode
+ * Get the execution plan of a SQL job in local mode
  * Date: 2020/2/17
  * Company: www.dtstack.com
+ *
  * @author maqi
  */
 public class GetPlan {
@@ -46,7 +47,9 @@ public static String getExecutionPlan(String[] args) {
         ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
         paramsInfo.setGetPlan(true);
         ClassLoader envClassLoader = StreamExecutionEnvironment.class.getClassLoader();
-        ClassLoader plannerClassLoader = URLClassLoader.newInstance(new URL[0], envClassLoader);
+        ClassLoader plannerClassLoader = URLClassLoader.newInstance(
+                paramsInfo.getJarUrlList().toArray(new URL[0]),
+                envClassLoader);
         Thread.currentThread().setContextClassLoader(plannerClassLoader);
         StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
         String executionPlan = env.getExecutionPlan();
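
Previously the planner classloader was built over an empty URL array, so classes shipped in the job's jars (UDFs, connector plugins) were invisible while the plan was computed. The fix seeds the loader with paramsInfo.getJarUrlList(). A self-contained illustration of the pattern; the jar path is a placeholder:

    import java.net.URL;
    import java.net.URLClassLoader;

    public class PlannerLoaderSketch {
        public static void main(String[] args) throws Exception {
            // placeholder jar; the real code takes these URLs from paramsInfo.getJarUrlList()
            URL[] jobJars = {new URL("file:///tmp/udf.jar")};
            ClassLoader parent = PlannerLoaderSketch.class.getClassLoader();
            URLClassLoader plannerClassLoader = URLClassLoader.newInstance(jobJars, parent);
            // plan compilation resolves user classes via the thread context classloader
            Thread.currentThread().setContextClassLoader(plannerClassLoader);
        }
    }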

core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderManager.java

Lines changed: 14 additions & 2 deletions
@@ -20,10 +20,12 @@
 
 import com.dtstack.flink.sql.util.PluginUtil;
 import com.dtstack.flink.sql.util.ReflectionUtils;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileInputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URL;
@@ -71,9 +73,19 @@ private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
         });
     }
 
-    private static DtClassLoader retrieveClassLoad(List<URL> jarUrls) {
+    public static DtClassLoader retrieveClassLoad(List<URL> jarUrls) {
         jarUrls.sort(Comparator.comparing(URL::toString));
-        String jarUrlkey = StringUtils.join(jarUrls, "_");
+
+        List<String> jarMd5s = new ArrayList<>(jarUrls.size());
+        for (URL jarUrl : jarUrls) {
+            try (FileInputStream inputStream = new FileInputStream(jarUrl.getPath())) {
+                String jarMd5 = DigestUtils.md5Hex(inputStream);
+                jarMd5s.add(jarMd5);
+            } catch (Exception e) {
+                throw new RuntimeException("Exception appeared when reading file: " + e);
+            }
+        }
+        String jarUrlkey = StringUtils.join(jarMd5s, "_");
         return pluginClassLoader.computeIfAbsent(jarUrlkey, k -> {
             try {
                 URL[] urls = jarUrls.toArray(new URL[jarUrls.size()]);
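
The cache key for plugin classloaders switches from the joined URL strings to the MD5 of each jar's contents, so overwriting a jar at the same path yields a new key (and a fresh classloader) instead of a stale cached one. A minimal sketch of the content-based key; the path is a placeholder:

    import org.apache.commons.codec.digest.DigestUtils;

    import java.io.FileInputStream;
    import java.io.IOException;

    public class JarKeySketch {
        // Two jars at the same path but with different bytes now map to
        // different cache keys, hence different classloaders.
        public static String cacheKey(String jarPath) throws IOException {
            try (FileInputStream in = new FileInputStream(jarPath)) {
                return DigestUtils.md5Hex(in); // streams the file, no full in-memory read
            }
        }

        public static void main(String[] args) throws IOException {
            System.out.println(cacheKey("/tmp/plugin.jar")); // placeholder path
        }
    }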
core/src/main/java/com/dtstack/flink/sql/exception/ExceptionTrace.java

Lines changed: 21 additions & 0 deletions

@@ -0,0 +1,21 @@
+package com.dtstack.flink.sql.exception;
+
+import java.util.Objects;
+
+/**
+ * @author tiezhu
+ * @date 2021/2/2 Tuesday
+ * Company dtstack
+ */
+public class ExceptionTrace {
+    // trace back to the root cause of the current exception
+    public static String traceOriginalCause(Throwable e) {
+        String errorMsg;
+        if (Objects.nonNull(e.getCause())) {
+            errorMsg = traceOriginalCause(e.getCause());
+        } else {
+            errorMsg = e.getMessage();
+        }
+        return errorMsg;
+    }
+}
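
The helper simply recurses through getCause() until it reaches the innermost exception and returns that message. A hypothetical usage example:

    import com.dtstack.flink.sql.exception.ExceptionTrace;

    public class ExceptionTraceExample {
        public static void main(String[] args) {
            Exception wrapped = new RuntimeException("async join failed",
                    new IllegalStateException("connection refused"));
            // prints "connection refused" rather than the outer wrapper's message
            System.out.println(ExceptionTrace.traceOriginalCause(wrapped));
        }
    }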

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 11 additions & 21 deletions
@@ -53,6 +53,7 @@
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.commons.io.Charsets;
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -75,13 +76,7 @@
 import java.net.URLClassLoader;
 import java.net.URLDecoder;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TimeZone;
+import java.util.*;
 import java.util.stream.Stream;
 
 /**
@@ -215,7 +210,11 @@ private static void sqlTranslation(String localSqlPluginPath,
             scope++;
         }
 
+        final Map<String, AbstractSideTableInfo> tmpTableMap = new HashMap<>();
         for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
+            // prevent the current sql from reusing the previous sql's sideTableInfo
+            sideTableMap.forEach((s, abstractSideTableInfo) -> tmpTableMap.put(s, SerializationUtils.clone(abstractSideTableInfo)));
+
             if (LOG.isInfoEnabled()) {
                 LOG.info("exe-sql:\n" + result.getExecSql());
             }
@@ -228,17 +227,17 @@ private static void sqlTranslation(String localSqlPluginPath,
                 SqlNode sqlNode = flinkPlanner.getParser().parse(realSql);
                 String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
                 tmp.setExecSql(tmpSql);
-                sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, tmp, scope + "");
+                sideSqlExec.exec(tmp.getExecSql(), tmpTableMap, tableEnv, registerTableCache, tmp, scope + "");
             } else {
                 for (String sourceTable : result.getSourceTableList()) {
-                    if (sideTableMap.containsKey(sourceTable)) {
+                    if (tmpTableMap.containsKey(sourceTable)) {
                         isSide = true;
                         break;
                     }
                 }
                 if (isSide) {
                     //sql-dimensional table contains the dimension table of execution
-                    sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, null, String.valueOf(scope));
+                    sideSqlExec.exec(result.getExecSql(), tmpTableMap, tableEnv, registerTableCache, null, String.valueOf(scope));
                 } else {
                     LOG.info("----------exec sql without dimension join-----------");
                     LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
@@ -251,26 +250,17 @@ private static void sqlTranslation(String localSqlPluginPath,
 
                 scope++;
             }
+            tmpTableMap.clear();
         }
     }
 
     public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv, boolean getPlan)
             throws IllegalAccessException, InvocationTargetException {
         // the udf and the tableEnv must be loaded by the same classloader
-        ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
         ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
-        URLClassLoader classLoader = null;
+        URLClassLoader classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) currentClassLoader);
         List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
         for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
-            // when building the plan, the udf and the tableEnv do not need the same classloader
-            if (getPlan) {
-                classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) currentClassLoader);
-            }
-
-            //classloader
-            if (classLoader == null) {
-                classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) levelClassLoader);
-            }
             FunctionManager.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
         }
     }

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

Lines changed: 0 additions & 1 deletion
@@ -21,7 +21,6 @@
 import com.dtstack.flink.sql.util.DtStringUtil;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
 import java.util.Map;

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 2 additions & 1 deletion
@@ -34,6 +34,7 @@
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -275,7 +276,7 @@ protected void dealFillDataError(BaseRow input, ResultFuture<BaseRow> resultFutu
         parseErrorRecords.inc();
         if (parseErrorRecords.getCount() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)) {
             LOG.info("dealFillDataError", e);
-            resultFuture.completeExceptionally(e);
+            resultFuture.completeExceptionally(new SuppressRestartsException(e));
         } else {
             dealMissKey(input, resultFuture);
         }
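
Once the async-join error counter passes asyncFailMaxNum, restarting the job would only replay the same failures, so the error is now wrapped in Flink's SuppressRestartsException, which the restart strategy treats as non-recoverable and fails the job immediately. A minimal illustration of that escalation:

    import org.apache.flink.runtime.execution.SuppressRestartsException;

    public class FailFastSketch {
        // Throwing (or completing a future exceptionally with) this wrapper
        // bypasses the configured restart strategy and fails the job outright.
        public static void failJobPermanently(Throwable cause) {
            throw new SuppressRestartsException(cause);
        }
    }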

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 1 addition & 1 deletion
@@ -118,7 +118,7 @@ public void parseSelectFields(JoinInfo joinInfo){
     public String getTargetFieldType(String fieldName){
         int fieldIndex = sideTableInfo.getFieldList().indexOf(fieldName);
         if(fieldIndex == -1){
-            throw new RuntimeException(sideTableInfo.getName() + "can't find field: " + fieldName);
+            throw new RuntimeException(sideTableInfo.getName() + " can't find field: " + fieldName);
         }
 
         return sideTableInfo.getFieldTypes()[fieldIndex];

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 44 additions & 19 deletions
@@ -19,12 +19,15 @@
 
 package com.dtstack.flink.sql.table;
 
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
 import com.dtstack.flink.sql.util.ClassUtil;
 import com.dtstack.flink.sql.util.DtStringUtil;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
@@ -43,6 +46,8 @@
 
 public abstract class AbstractTableParser {
 
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTableParser.class);
+
     private static final String PRIMARY_KEY = "primaryKey";
     private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
     private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
@@ -105,30 +110,50 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
                 continue;
             }
 
-            Tuple2<String, String> t = extractType(fieldRow, tableInfo.getName());
-            String fieldName = t.f0;
-            String fieldType = t.f1;
+            handleKeyNotHaveAlias(fieldRow, tableInfo);
+        }
 
-            Class fieldClass;
-            AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;
+        /*
+         * check whether the field list contains the declared pks and add any missing
+         * pk to it, because some NoSQL databases (e.g. redis, hbase) have no real
+         * primary key column.
+         */
+        if (tableInfo instanceof AbstractSideTableInfo) {
+            tableInfo.getPrimaryKeys().stream()
+                    .filter(pk -> (!tableInfo.getFieldList().contains(pk)))
+                    .forEach(pk -> {
+                        try {
+                            handleKeyNotHaveAlias(String.format("%s varchar", pk.trim()), tableInfo);
+                        } catch (Exception e) {
+                            LOG.error(String.format("Add primary key to field list failed. Reason: %s", e.getMessage()));
+                        }
+                    });
+        }
 
-            Matcher matcher = charTypePattern.matcher(fieldType);
-            if (matcher.find()) {
-                fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
-                fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo();
-                fieldExtraInfo.setLength(Integer.parseInt(matcher.group(1)));
-            } else {
-                fieldClass = dbTypeConvertToJavaType(fieldType);
-            }
+        tableInfo.finish();
+    }
 
-            tableInfo.addPhysicalMappings(fieldName, fieldName);
-            tableInfo.addField(fieldName);
-            tableInfo.addFieldClass(fieldClass);
-            tableInfo.addFieldType(fieldType);
-            tableInfo.addFieldExtraInfo(fieldExtraInfo);
+    private void handleKeyNotHaveAlias(String fieldRow, AbstractTableInfo tableInfo) {
+        Tuple2<String, String> t = extractType(fieldRow, tableInfo.getName());
+        String fieldName = t.f0;
+        String fieldType = t.f1;
+
+        Class fieldClass;
+        AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;
+
+        Matcher matcher = charTypePattern.matcher(fieldType);
+        if (matcher.find()) {
+            fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
+            fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo();
+            fieldExtraInfo.setLength(Integer.parseInt(matcher.group(1)));
+        } else {
+            fieldClass = dbTypeConvertToJavaType(fieldType);
         }
 
-        tableInfo.finish();
+        tableInfo.addPhysicalMappings(fieldName, fieldName);
+        tableInfo.addField(fieldName);
+        tableInfo.addFieldClass(fieldClass);
+        tableInfo.addFieldType(fieldType);
+        tableInfo.addFieldExtraInfo(fieldExtraInfo);
     }
 
     private Tuple2<String, String> extractType(String fieldRow, String tableName) {
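
For side tables, a primary key that is declared but absent from the column list is now appended as a varchar field instead of surfacing later as an unknown-field error; this matters for key-value stores such as redis or hbase where the key is not a regular column. A stripped-down sketch of just the new stream over the primary keys:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SidePkSketch {
        public static void main(String[] args) {
            List<String> fieldList = new ArrayList<>(Arrays.asList("name", "age"));
            List<String> primaryKeys = Collections.singletonList("rowkey");

            // mirrors the new branch in parseFieldsInfo: append declared pks that
            // are not real columns (the real code registers them as "<pk> varchar")
            primaryKeys.stream()
                    .filter(pk -> !fieldList.contains(pk))
                    .forEach(pk -> fieldList.add(pk.trim()));

            System.out.println(fieldList); // [name, age, rowkey]
        }
    }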

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 16 additions & 2 deletions
@@ -539,11 +539,25 @@ private static void replaceConditionNode(SqlNode selectNode, String oldTbName, S
 
         String tableName = sqlIdentifier.names.asList().get(0);
         String tableField = sqlIdentifier.names.asList().get(1);
-        String fieldKey = tableName + "_" + tableField;
+        String fieldKey = tableName + "." + tableField;
 
         if(tableName.equalsIgnoreCase(oldTbName)){
 
-            String newFieldName = fieldReplaceRef.get(fieldKey) == null ? tableField : fieldReplaceRef.get(fieldKey);
+            /*
+             * ****Before replace:*****
+             * fieldKey: b.department
+             * fieldReplaceRef : b.department -> a_b_0.department0
+             * oldFieldRef: a_b_0.department0
+             * oldTbName: b
+             * oldFieldName: department
+             * ****After replace:*****
+             * newTbName: a_b_0
+             * newFieldName: department0
+             */
+            String oldFieldRef = fieldReplaceRef.get(fieldKey);
+            String newFieldName = (oldFieldRef != null && !StringUtils.substringAfter(oldFieldRef, ".").isEmpty()) ?
+                    StringUtils.substringAfter(oldFieldRef, ".") : tableField;
+
             SqlIdentifier newField = ((SqlIdentifier)selectNode).setName(0, newTbName);
             newField = newField.setName(1, newFieldName);
             ((SqlIdentifier)selectNode).assignNamesFrom(newField);
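
The lookup key changes from tableName + "_" + tableField to the dotted form, and the mapped value is now a full table.field reference, so the new field name has to be split back out with substringAfter. A runnable walk-through of the worked example from the comment block above:

    import org.apache.commons.lang3.StringUtils;

    import java.util.HashMap;
    import java.util.Map;

    public class FieldRefSketch {
        public static void main(String[] args) {
            Map<String, String> fieldReplaceRef = new HashMap<>();
            fieldReplaceRef.put("b.department", "a_b_0.department0");

            String fieldKey = "b" + "." + "department";
            String oldFieldRef = fieldReplaceRef.get(fieldKey);    // a_b_0.department0
            String newFieldName = (oldFieldRef != null && !StringUtils.substringAfter(oldFieldRef, ".").isEmpty())
                    ? StringUtils.substringAfter(oldFieldRef, ".") // department0
                    : "department";                                // fallback: original field
            System.out.println(newFieldName);                      // prints department0
        }
    }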
