Skip to content

Commit 27813dc

Browse files
committed
[hotfix-37535][elasticsearch7]override parseSelectFields in Elasticsearch7AsyncSideInfo
1 parent 73a152a commit 27813dc

File tree

2 files changed

+37
-6
lines changed

2 files changed

+37
-6
lines changed

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
3535

3636
import java.io.Serializable;
37-
import java.util.Arrays;
3837
import java.util.List;
3938
import java.util.Map;
4039

@@ -93,20 +92,17 @@ public void parseSelectFields(JoinInfo joinInfo){
9392
String sideTableName = joinInfo.getSideTableName();
9493
String nonSideTableName = joinInfo.getNonSideTable();
9594
List<String> fields = Lists.newArrayList();
96-
int sideTableFieldIndex;
95+
int sideTableFieldIndex = 0;
9796

9897
for( int i=0; i<outFieldInfoList.size(); i++){
9998
FieldInfo fieldInfo = outFieldInfoList.get(i);
10099
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
101100
String sideFieldName = sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName());
102101
fields.add(sideFieldName);
103-
sideTableFieldIndex = Arrays.asList(sideTableInfo.getFields()).indexOf(sideFieldName);
104-
if (sideTableFieldIndex == -1){
105-
throw new RuntimeException(String.format("unknown filed {%s} in sideTable {%s} ", sideFieldName, sideTableName));
106-
}
107102
sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(fieldInfo.getFieldName()));
108103
sideFieldIndex.put(i, sideTableFieldIndex);
109104
sideFieldNameIndex.put(i, sideFieldName);
105+
sideTableFieldIndex++;
110106
}else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
111107
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
112108
inFieldIndex.put(i, nonSideIndex);

elasticsearch7/elasticsearch7-side/elasticsearch7-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch7/Elasticsearch7AsyncSideInfo.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.calcite.sql.SqlNode;
2929
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3030

31+
import java.util.Arrays;
3132
import java.util.List;
3233

3334
/**
@@ -43,6 +44,40 @@ public Elasticsearch7AsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, L
4344
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4445
}
4546

47+
@Override
48+
public void parseSelectFields(JoinInfo joinInfo){
49+
String sideTableName = joinInfo.getSideTableName();
50+
String nonSideTableName = joinInfo.getNonSideTable();
51+
List<String> fields = Lists.newArrayList();
52+
int sideTableFieldIndex;
53+
54+
for( int i=0; i<outFieldInfoList.size(); i++){
55+
FieldInfo fieldInfo = outFieldInfoList.get(i);
56+
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
57+
String sideFieldName = sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName());
58+
fields.add(sideFieldName);
59+
sideTableFieldIndex = Arrays.asList(sideTableInfo.getFields()).indexOf(sideFieldName);
60+
if (sideTableFieldIndex == -1){
61+
throw new RuntimeException(String.format("unknown filed {%s} in sideTable {%s} ", sideFieldName, sideTableName));
62+
}
63+
sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(fieldInfo.getFieldName()));
64+
sideFieldIndex.put(i, sideTableFieldIndex);
65+
sideFieldNameIndex.put(i, sideFieldName);
66+
}else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
67+
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
68+
inFieldIndex.put(i, nonSideIndex);
69+
}else{
70+
throw new RuntimeException("unknown table " + fieldInfo.getTable());
71+
}
72+
}
73+
74+
if(fields.size() == 0){
75+
throw new RuntimeException("select non field from table " + sideTableName);
76+
}
77+
78+
sideSelectFields = String.join(",", fields);
79+
}
80+
4681
@Override
4782
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
4883

0 commit comments

Comments
 (0)