diff --git a/src/main/scala/io/github/spark_redshift_community/spark/redshift/RedshiftWriter.scala b/src/main/scala/io/github/spark_redshift_community/spark/redshift/RedshiftWriter.scala index 8624b389..4ceed1dd 100644 --- a/src/main/scala/io/github/spark_redshift_community/spark/redshift/RedshiftWriter.scala +++ b/src/main/scala/io/github/spark_redshift_community/spark/redshift/RedshiftWriter.scala @@ -166,37 +166,50 @@ private[redshift] class RedshiftWriter( } catch { case e: SQLException => log.error("SQLException thrown while running COPY query; will attempt to retrieve " + - "more information by querying the STL_LOAD_ERRORS table: {}", e.getMessage) - // Try to query Redshift's STL_LOAD_ERRORS table to figure out why the load failed. - // See http://docs.aws.amazon.com/redshift/latest/dg/r_STL_LOAD_ERRORS.html for details. + "more information by querying Redshift load error tables: {}", e.getMessage) + redshiftWrapper.rollback(conn) - val errorLookupQuery = - """ - | SELECT * - | FROM stl_load_errors - | WHERE query = pg_last_query_id() - """.stripMargin + val detailedException: Option[SQLException] = try { - val results = { - redshiftWrapper.executeQueryInterruptibly(conn, errorLookupQuery) - } - if (results.next()) { - val errCode = results.getInt("err_code") - val errReason = results.getString("err_reason").trim + // Because raw data logging is disabled for security, we strictly use the unified + // sys_load_error_detail table which safely works on both Provisioned and Serverless. + // Using pg_last_copy_id() ensures we get the failed COPY even if the rollback registered as a query. + val errorLookupQuery = + """ + | SELECT + | error_code, + | error_message, + | column_length, + | column_name, + | column_type + | FROM sys_load_error_detail + | WHERE query_id = pg_last_copy_id() + """.stripMargin + + val results = redshiftWrapper.executeQueryInterruptibly(conn, errorLookupQuery) + + if (results != null && results.next()) { + val errCode = results.getInt("error_code") + + // Safely handle potential nulls from JDBC strings before trimming to prevent NPEs + val errReason = Option(results.getString("error_message")).map(_.trim).getOrElse("Unknown") + val colName = Option(results.getString("column_name")).map(_.trim).getOrElse("Unknown") + val colType = Option(results.getString("column_type")).map(_.trim).getOrElse("Unknown") + val columnLength: String = - Option(results.getString("col_length")) + Option(results.getString("column_length")) .map(_.trim) .filter(_.nonEmpty) .map(n => s"($n)") .getOrElse("") + val exceptionMessage = s""" |Error (code $errCode) while loading data into Redshift: "$errReason" |Table name: ${params.table.get} - |Column name: ${results.getString("colname").trim} - |Column type: ${results.getString("type").trim}$columnLength - |Raw line: ${results.getString("raw_line")} - |Raw field value: ${results.getString("raw_field_value")} + |Column name: $colName + |Column type: $colType$columnLength + |Raw data redacted for security reasons. Query Redshift directly to view payload. """.stripMargin Some(new SQLException(exceptionMessage, e)) } else { @@ -204,7 +217,7 @@ private[redshift] class RedshiftWriter( } } catch { case NonFatal(e2) => - log.error("Error occurred while querying STL_LOAD_ERRORS: {}", e2.getMessage) + log.error("Error occurred while querying sys_load_error_detail: {}", e2.getMessage) None } throw detailedException.getOrElse(e)