Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,45 +166,58 @@ private[redshift] class RedshiftWriter(
} catch {
case e: SQLException =>
log.error("SQLException thrown while running COPY query; will attempt to retrieve " +
"more information by querying the STL_LOAD_ERRORS table: {}", e.getMessage)
// Try to query Redshift's STL_LOAD_ERRORS table to figure out why the load failed.
// See http://docs.aws.amazon.com/redshift/latest/dg/r_STL_LOAD_ERRORS.html for details.
"more information by querying Redshift load error tables: {}", e.getMessage)

redshiftWrapper.rollback(conn)
val errorLookupQuery =
"""
| SELECT *
| FROM stl_load_errors
| WHERE query = pg_last_query_id()
""".stripMargin

val detailedException: Option[SQLException] = try {
val results = {
redshiftWrapper.executeQueryInterruptibly(conn, errorLookupQuery)
}
if (results.next()) {
val errCode = results.getInt("err_code")
val errReason = results.getString("err_reason").trim
// Raw row data is deliberately not logged for security reasons, so query only the
// sys_load_error_detail view, which is available on both provisioned clusters and Serverless.
// pg_last_copy_id() identifies the most recent COPY in this session, so the lookup still
// targets the failed COPY even if the rollback was itself recorded as a query.
val errorLookupQuery =
"""
| SELECT
| error_code,
| error_message,
| column_length,
| column_name,
| column_type
| FROM sys_load_error_detail
| WHERE query_id = pg_last_copy_id()
""".stripMargin

val results = redshiftWrapper.executeQueryInterruptibly(conn, errorLookupQuery)

if (results != null && results.next()) {
val errCode = results.getInt("error_code")

// Safely handle potential nulls from JDBC strings before trimming to prevent NPEs
val errReason = Option(results.getString("error_message")).map(_.trim).getOrElse("Unknown")
val colName = Option(results.getString("column_name")).map(_.trim).getOrElse("Unknown")
val colType = Option(results.getString("column_type")).map(_.trim).getOrElse("Unknown")

val columnLength: String =
Option(results.getString("col_length"))
Option(results.getString("column_length"))
.map(_.trim)
.filter(_.nonEmpty)
.map(n => s"($n)")
.getOrElse("")

val exceptionMessage =
s"""
|Error (code $errCode) while loading data into Redshift: "$errReason"
|Table name: ${params.table.get}
|Column name: ${results.getString("colname").trim}
|Column type: ${results.getString("type").trim}$columnLength
|Raw line: ${results.getString("raw_line")}
|Raw field value: ${results.getString("raw_field_value")}
|Column name: $colName
|Column type: $colType$columnLength
|Raw data redacted for security reasons. Query Redshift directly to view payload.
""".stripMargin
Some(new SQLException(exceptionMessage, e))
} else {
None
}
} catch {
case NonFatal(e2) =>
log.error("Error occurred while querying STL_LOAD_ERRORS: {}", e2.getMessage)
log.error("Error occurred while querying sys_load_error_detail: {}", e2.getMessage)
None
}
throw detailedException.getOrElse(e)
Expand Down