Skip to content

Commit 8145849

Browse files
author
xuchao
committed
修改之前合并代码冲突导致的代码丢失
1 parent 3b9667e commit 8145849

File tree

3 files changed

+10
-12
lines changed

3 files changed

+10
-12
lines changed

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@
2626
import org.apache.commons.lang3.StringUtils;
2727
import org.apache.flink.api.common.serialization.SerializationSchema;
2828
import org.apache.flink.api.common.typeinfo.TypeInformation;
29-
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
30-
import org.apache.flink.formats.csv.CsvRowSerializationSchema;
31-
import org.apache.flink.formats.json.DTJsonRowSerializationSchema;
3229
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
3330
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
3431
import org.apache.flink.table.runtime.types.CRow;

kafka-base/kafka-base-sink/src/main/java/org/apache/flink/formats/json/DTJsonRowSerializationSchema.java renamed to kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/DTJsonRowSerializationSchema.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.formats.json;
18+
package com.dtstack.flink.sql.sink.kafka.serialization;
1919

2020
import org.apache.flink.annotation.PublicEvolving;
2121
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -25,6 +25,7 @@
2525
import org.apache.flink.api.common.typeinfo.Types;
2626
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
2727
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
import org.apache.flink.formats.json.JsonRowSchemaConverter;
2829
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
2930
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
3031
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/JsonCRowSerializationSchema.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,34 +159,34 @@ private ObjectNode convertRow(ObjectNode reuse, RowTypeInfo info, Row row) {
159159
}
160160

161161
private JsonNode convert(ContainerNode<?> container, JsonNode reuse, TypeInformation<?> info, Object object) {
162-
if (info == Types.VOID || object == null) {
162+
if (info.equals(Types.VOID) || object == null) {
163163
return container.nullNode();
164-
} else if (info == Types.BOOLEAN) {
164+
} else if (info.equals(Types.BOOLEAN)) {
165165
return container.booleanNode((Boolean) object);
166-
} else if (info == Types.STRING) {
166+
} else if (info.equals(Types.STRING)) {
167167
return container.textNode((String) object);
168-
} else if (info == Types.BIG_DEC) {
168+
} else if (info.equals(Types.BIG_DEC)) {
169169
// convert decimal if necessary
170170
if (object instanceof BigDecimal) {
171171
return container.numberNode((BigDecimal) object);
172172
}
173173
return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
174-
} else if (info == Types.BIG_INT) {
174+
} else if (info.equals(Types.BIG_INT)) {
175175
// convert integer if necessary
176176
if (object instanceof BigInteger) {
177177
return container.numberNode((BigInteger) object);
178178
}
179179
return container.numberNode(BigInteger.valueOf(((Number) object).longValue()));
180-
} else if (info == Types.SQL_DATE) {
180+
} else if (info.equals(Types.SQL_DATE)) {
181181
return container.textNode(object.toString());
182-
} else if (info == Types.SQL_TIME) {
182+
} else if (info.equals(Types.SQL_TIME)) {
183183
final Time time = (Time) object;
184184
// strip milliseconds if possible
185185
if (time.getTime() % 1000 > 0) {
186186
return container.textNode(timeFormatWithMillis.format(time));
187187
}
188188
return container.textNode(timeFormat.format(time));
189-
} else if (info == Types.SQL_TIMESTAMP) {
189+
} else if (info.equals(Types.SQL_TIMESTAMP)) {
190190
return container.textNode(timestampFormat.format((Timestamp) object));
191191
} else if (info instanceof RowTypeInfo) {
192192
if (reuse != null && reuse instanceof ObjectNode) {

0 commit comments

Comments
 (0)