Skip to content

Commit 1dbb725

Browse files
lw-lin authored and srowen committed
[SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV cast null values properly
## Problem CSV in Spark 2.0.0: - does not read null values back correctly for certain data types such as `Boolean`, `TimestampType`, `DateType` -- this is a regression comparing to 1.6; - does not read empty values (specified by `options.nullValue`) as `null`s for `StringType` -- this is compatible with 1.6 but leads to problems like SPARK-16903. ## What changes were proposed in this pull request? This patch makes changes to read all empty values back as `null`s. ## How was this patch tested? New test cases. Author: Liwei Lin <[email protected]> Closes #14118 from lw-lin/csv-cast-null.
1 parent 7151011 commit 1dbb725

File tree

7 files changed

+93
-83
lines changed

7 files changed

+93
-83
lines changed

python/pyspark/sql/readwriter.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
329329
being read should be skipped. If None is set, it uses
330330
the default value, ``false``.
331331
:param nullValue: sets the string representation of a null value. If None is set, it uses
332-
the default value, empty string.
332+
the default value, empty string. Since 2.0.1, this ``nullValue`` param
333+
applies to all supported types including the string type.
333334
:param nanValue: sets the string representation of a non-number value. If None is set, it
334335
uses the default value, ``NaN``.
335336
:param positiveInf: sets the string representation of a positive infinity value. If None

python/pyspark/sql/streaming.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
497497
being read should be skipped. If None is set, it uses
498498
the default value, ``false``.
499499
:param nullValue: sets the string representation of a null value. If None is set, it uses
500-
the default value, empty string.
500+
the default value, empty string. Since 2.0.1, this ``nullValue`` param
501+
applies to all supported types including the string type.
501502
:param nanValue: sets the string representation of a non-number value. If None is set, it
502503
uses the default value, ``NaN``.
503504
:param positiveInf: sets the string representation of a positive infinity value. If None

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
376376
* from values being read should be skipped.</li>
377377
* <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
378378
* whitespaces from values being read should be skipped.</li>
379-
* <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
379+
* <li>`nullValue` (default empty string): sets the string representation of a null value. Since
380+
* 2.0.1, this applies to all supported types including the string type.</li>
380381
* <li>`nanValue` (default `NaN`): sets the string representation of a non-number" value.</li>
381382
* <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
382383
* value.</li>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala

Lines changed: 50 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -232,66 +232,58 @@ private[csv] object CSVTypeCast {
232232
nullable: Boolean = true,
233233
options: CSVOptions = CSVOptions()): Any = {
234234

235-
castType match {
236-
case _: ByteType => if (datum == options.nullValue && nullable) null else datum.toByte
237-
case _: ShortType => if (datum == options.nullValue && nullable) null else datum.toShort
238-
case _: IntegerType => if (datum == options.nullValue && nullable) null else datum.toInt
239-
case _: LongType => if (datum == options.nullValue && nullable) null else datum.toLong
240-
case _: FloatType =>
241-
if (datum == options.nullValue && nullable) {
242-
null
243-
} else if (datum == options.nanValue) {
244-
Float.NaN
245-
} else if (datum == options.negativeInf) {
246-
Float.NegativeInfinity
247-
} else if (datum == options.positiveInf) {
248-
Float.PositiveInfinity
249-
} else {
250-
Try(datum.toFloat)
251-
.getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
252-
}
253-
case _: DoubleType =>
254-
if (datum == options.nullValue && nullable) {
255-
null
256-
} else if (datum == options.nanValue) {
257-
Double.NaN
258-
} else if (datum == options.negativeInf) {
259-
Double.NegativeInfinity
260-
} else if (datum == options.positiveInf) {
261-
Double.PositiveInfinity
262-
} else {
263-
Try(datum.toDouble)
264-
.getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
265-
}
266-
case _: BooleanType => datum.toBoolean
267-
case dt: DecimalType =>
268-
if (datum == options.nullValue && nullable) {
269-
null
270-
} else {
271-
val value = new BigDecimal(datum.replaceAll(",", ""))
272-
Decimal(value, dt.precision, dt.scale)
273-
}
274-
case _: TimestampType =>
275-
// This one will lose microseconds parts.
276-
// See https://issues.apache.org/jira/browse/SPARK-10681.
277-
Try(options.timestampFormat.parse(datum).getTime * 1000L)
278-
.getOrElse {
279-
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
280-
// compatibility.
281-
DateTimeUtils.stringToTime(datum).getTime * 1000L
235+
if (nullable && datum == options.nullValue) {
236+
null
237+
} else {
238+
castType match {
239+
case _: ByteType => datum.toByte
240+
case _: ShortType => datum.toShort
241+
case _: IntegerType => datum.toInt
242+
case _: LongType => datum.toLong
243+
case _: FloatType =>
244+
datum match {
245+
case options.nanValue => Float.NaN
246+
case options.negativeInf => Float.NegativeInfinity
247+
case options.positiveInf => Float.PositiveInfinity
248+
case _ =>
249+
Try(datum.toFloat)
250+
.getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
282251
}
283-
case _: DateType =>
284-
// This one will lose microseconds parts.
285-
// See https://issues.apache.org/jira/browse/SPARK-10681.x
286-
Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
287-
.getOrElse {
288-
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
289-
// compatibility.
290-
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
252+
case _: DoubleType =>
253+
datum match {
254+
case options.nanValue => Double.NaN
255+
case options.negativeInf => Double.NegativeInfinity
256+
case options.positiveInf => Double.PositiveInfinity
257+
case _ =>
258+
Try(datum.toDouble)
259+
.getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
291260
}
292-
case _: StringType => UTF8String.fromString(datum)
293-
case udt: UserDefinedType[_] => castTo(datum, udt.sqlType, nullable, options)
294-
case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
261+
case _: BooleanType => datum.toBoolean
262+
case dt: DecimalType =>
263+
val value = new BigDecimal(datum.replaceAll(",", ""))
264+
Decimal(value, dt.precision, dt.scale)
265+
case _: TimestampType =>
266+
// This one will lose microseconds parts.
267+
// See https://issues.apache.org/jira/browse/SPARK-10681.
268+
Try(options.timestampFormat.parse(datum).getTime * 1000L)
269+
.getOrElse {
270+
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
271+
// compatibility.
272+
DateTimeUtils.stringToTime(datum).getTime * 1000L
273+
}
274+
case _: DateType =>
275+
// This one will lose microseconds parts.
276+
// See https://issues.apache.org/jira/browse/SPARK-10681.x
277+
Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
278+
.getOrElse {
279+
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
280+
// compatibility.
281+
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
282+
}
283+
case _: StringType => UTF8String.fromString(datum)
284+
case udt: UserDefinedType[_] => castTo(datum, udt.sqlType, nullable, options)
285+
case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
286+
}
295287
}
296288
}
297289

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
232232
* from values being read should be skipped.</li>
233233
* <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
234234
* whitespaces from values being read should be skipped.</li>
235-
* <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
235+
* <li>`nullValue` (default empty string): sets the string representation of a null value. Since
236+
* 2.0.1, this applies to all supported types including the string type.</li>
236237
* <li>`nanValue` (default `NaN`): sets the string representation of a non-number" value.</li>
237238
* <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
238239
* value.</li>

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
554554

555555
verifyCars(cars, withHeader = true, checkValues = false)
556556
val results = cars.collect()
557-
assert(results(0).toSeq === Array(2012, "Tesla", "S", "null", "null"))
557+
assert(results(0).toSeq === Array(2012, "Tesla", "S", null, null))
558558
assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null))
559559
}
560560

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -68,16 +68,46 @@ class CSVTypeCastSuite extends SparkFunSuite {
6868
}
6969

7070
test("Nullable types are handled") {
71-
assert(CSVTypeCast.castTo("", IntegerType, nullable = true, CSVOptions()) == null)
71+
assertNull(
72+
CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("nullValue", "-")))
73+
assertNull(
74+
CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("nullValue", "-")))
75+
assertNull(
76+
CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("nullValue", "-")))
77+
assertNull(
78+
CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("nullValue", "-")))
79+
assertNull(
80+
CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("nullValue", "-")))
81+
assertNull(
82+
CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("nullValue", "-")))
83+
assertNull(
84+
CSVTypeCast.castTo("-", BooleanType, nullable = true, CSVOptions("nullValue", "-")))
85+
assertNull(
86+
CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("nullValue", "-")))
87+
assertNull(
88+
CSVTypeCast.castTo("-", TimestampType, nullable = true, CSVOptions("nullValue", "-")))
89+
assertNull(
90+
CSVTypeCast.castTo("-", DateType, nullable = true, CSVOptions("nullValue", "-")))
91+
assertNull(
92+
CSVTypeCast.castTo("-", StringType, nullable = true, CSVOptions("nullValue", "-")))
7293
}
7394

74-
test("String type should always return the same as the input") {
95+
test("String type should also respect `nullValue`") {
96+
assertNull(
97+
CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions()))
7598
assert(
76-
CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions()) ==
99+
CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions()) ==
77100
UTF8String.fromString(""))
101+
78102
assert(
79-
CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions()) ==
103+
CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions("nullValue", "null")) ==
104+
UTF8String.fromString(""))
105+
assert(
106+
CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions("nullValue", "null")) ==
80107
UTF8String.fromString(""))
108+
109+
assertNull(
110+
CSVTypeCast.castTo(null, StringType, nullable = true, CSVOptions("nullValue", "null")))
81111
}
82112

83113
test("Throws exception for empty string with non null type") {
@@ -170,20 +200,4 @@ class CSVTypeCastSuite extends SparkFunSuite {
170200
assert(doubleVal2 == Double.PositiveInfinity)
171201
}
172202

173-
test("Type-specific null values are used for casting") {
174-
assertNull(
175-
CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("nullValue", "-")))
176-
assertNull(
177-
CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("nullValue", "-")))
178-
assertNull(
179-
CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("nullValue", "-")))
180-
assertNull(
181-
CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("nullValue", "-")))
182-
assertNull(
183-
CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("nullValue", "-")))
184-
assertNull(
185-
CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("nullValue", "-")))
186-
assertNull(
187-
CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("nullValue", "-")))
188-
}
189203
}

0 commit comments

Comments (0)