Skip to content

Commit fe7880a

Browse files
committed
[merge]resolve conflits from merge
2 parents 333506a + 2aa79ab commit fe7880a

File tree

4 files changed

+24
-0
lines changed

4 files changed

+24
-0
lines changed

core/src/main/java/com/dtstack/flink/sql/enums/ColumnType.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ public enum ColumnType {
102102
* timestamp
103103
*/
104104
TIMESTAMP,
105+
/**
106+
* time eg: 23:59:59
107+
*/
108+
TIME,
105109
/**
106110
* decimal
107111
*/

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,9 @@ public static String col2string(Object column, String type) {
243243
case DATE:
244244
result = DateUtil.dateToString((java.util.Date)column);
245245
break;
246+
case TIME:
247+
result = DateUtil.getTimeFromStr(String.valueOf(column));
248+
break;
246249
case TIMESTAMP:
247250
result = DateUtil.timestampToString((java.util.Date)column);
248251
break;

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.dtstack.flink.sql.sink.elasticsearch;
2222

23+
import com.dtstack.flink.sql.util.DateUtil;
2324
import org.apache.commons.lang3.StringUtils;
2425
import org.apache.flink.api.common.functions.RuntimeContext;
2526
import org.apache.flink.api.java.tuple.Tuple2;
@@ -32,6 +33,8 @@
3233
import org.slf4j.Logger;
3334
import org.slf4j.LoggerFactory;
3435

36+
import java.sql.Date;
37+
import java.sql.Timestamp;
3538
import java.util.List;
3639
import java.util.Map;
3740
import java.util.stream.Collectors;
@@ -100,6 +103,14 @@ private IndexRequest createIndexRequest(Row element) {
100103
Map<String, Object> dataMap = EsUtil.rowToJsonMap(element,fieldNames,fieldTypes);
101104
int length = Math.min(element.getArity(), fieldNames.size());
102105
for(int i=0; i<length; i++){
106+
if (element.getField(i) instanceof Date) {
107+
dataMap.put(fieldNames.get(i), DateUtil.transformSqlDateToUtilDate((Date) element.getField(i)));
108+
continue;
109+
}
110+
if (element.getField(i) instanceof Timestamp) {
111+
dataMap.put(fieldNames.get(i), ((Timestamp) element.getField(i)).getTime());
112+
continue;
113+
}
103114
dataMap.put(fieldNames.get(i), element.getField(i));
104115
}
105116

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.slf4j.LoggerFactory;
3434

3535
import java.sql.Date;
36+
import java.sql.Time;
37+
import java.sql.Timestamp;
3638
import java.util.List;
3739
import java.util.Map;
3840
import java.util.stream.Collectors;
@@ -111,6 +113,10 @@ private IndexRequest createIndexRequest(Row element) {
111113
dataMap.put(fieldNames.get(i), DateUtil.transformSqlDateToUtilDate((Date) element.getField(i)));
112114
continue;
113115
}
116+
if (element.getField(i) instanceof Timestamp) {
117+
dataMap.put(fieldNames.get(i), ((Timestamp) element.getField(i)).getTime());
118+
continue;
119+
}
114120
dataMap.put(fieldNames.get(i), element.getField(i));
115121
}
116122

0 commit comments

Comments
 (0)