Fix Utils.isTesting to use Delta implementation
Signed-off-by: Felipe Pessoto <[email protected]>
felipepessoto committed Jan 18, 2025
1 parent 221d95c commit a649ef0
Showing 11 changed files with 33 additions and 27 deletions.
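This commit switches these call sites from Spark's org.apache.spark.util.Utils.isTesting to Delta's own org.apache.spark.sql.delta.util.Utils.isTesting, imported under the alias DeltaUtils so the name does not clash with the Spark class. The distinction matters because the two helpers watch different test signals. A minimal sketch of that difference, assuming Spark's helper keys off the SPARK_TESTING environment variable / spark.testing system property and the Delta helper keys off a Delta-specific flag such as DELTA_TESTING (the exact flag names are assumptions, not shown in this diff):

// Hedged sketch only -- not the literal implementation of either helper.
object SparkStyleTestFlag {
  // Spark-side signal, set by Spark's own test harness.
  def isTesting: Boolean =
    System.getenv("SPARK_TESTING") != null || System.getProperty("spark.testing") != null
}

object DeltaStyleTestFlag {
  // Delta-side signal, so Delta's test-only paths (extra assertions, fail-fast
  // checks, test defaults) trigger even when Spark's flags are not set.
  def isTesting: Boolean = System.getenv("DELTA_TESTING") != null
}

Each file below therefore adds (or swaps in) the alias import org.apache.spark.sql.delta.util.{Utils => DeltaUtils} and routes its test-only guards through DeltaUtils.isTesting.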
---------- changed file 1 of 11 ----------
@@ -34,6 +34,7 @@ import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.{DeltaFileOperations, DeltaLogGroupingIterator, FileNames}
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.sql.delta.util.JsonUtils
+ import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID}
@@ -280,7 +281,7 @@ trait Checkpoints extends DeltaLogging
data = Map("exception" -> e.getMessage(), "stackTrace" -> e.getStackTrace())
)
logWarning(log"Error when writing checkpoint-related files", e)
- val throwError = Utils.isTesting ||
+ val throwError = DeltaUtils.isTesting ||
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED)
if (throwError) throw e
}
@@ -1081,7 +1082,7 @@ object Checkpoints
// overrides the final path even if it already exists. So we use exists here to handle that
// case.
// TODO: Remove isTesting and fs.exists check after fixing LocalFS
- if (Utils.isTesting && fs.exists(finalPath)) {
+ if (DeltaUtils.isTesting && fs.exists(finalPath)) {
false
} else {
fs.rename(tempPath, finalPath)
---------- changed file 2 of 11 ----------
@@ -35,6 +35,7 @@ import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram
import org.apache.spark.sql.delta.stats.FileSizeHistogram
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
+ import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path
@@ -423,7 +424,7 @@ trait RecordChecksum extends DeltaLogging {
deltaLog,
opType = "delta.allFilesInCrc.checksumMismatch.aggregated",
data = eventData)
- if (Utils.isTesting) {
+ if (DeltaUtils.isTesting) {
throw new IllegalStateException("Incrementally Computed State failed checksum check" +
s" for commit $attemptVersion [$eventData]")
}
@@ -808,7 +809,7 @@ trait ValidateChecksum extends DeltaLogging { self: Snapshot =>
this.deltaLog,
opType = "delta.allFilesInCrc.checksumMismatch.differentAllFiles",
data = eventData)
- if (Utils.isTesting) throw new IllegalStateException(message)
+ if (DeltaUtils.isTesting) throw new IllegalStateException(message)
false
}
/**
---------- changed file 3 of 11 ----------
@@ -38,6 +38,7 @@ import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.sources._
import org.apache.spark.sql.delta.storage.LogStoreProvider
import org.apache.spark.sql.delta.util.FileNames
+ import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -152,7 +153,8 @@ class DeltaLog private(

private[delta] def shouldVerifyIncrementalCommit: Boolean = {
spark.conf.get(DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY) ||
- (Utils.isTesting && spark.conf.get(DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS))
+ (DeltaUtils.isTesting
+ && spark.conf.get(DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS))
}

/** The unique identifier for this table. */
---------- changed file 4 of 11 ----------
@@ -35,6 +35,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.spark.sql.delta.util.threads.DeltaThreadPool
+ import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import com.fasterxml.jackson.annotation.JsonIgnore
import io.delta.storage.commit.{Commit, GetCommitsResponse}
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
@@ -44,7 +45,7 @@ import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
- import org.apache.spark.util.{ThreadUtils, Utils}
+ import org.apache.spark.util.ThreadUtils

/**
* Wraps the most recently updated snapshot along with the timestamp the update was started.
@@ -270,7 +271,7 @@ trait SnapshotManagement { self: DeltaLog =>
deltaLog = this,
opType = CoordinatedCommitsUsageLogs.FS_COMMIT_COORDINATOR_LISTING_UNEXPECTED_GAPS,
data = eventData)
- if (Utils.isTesting) {
+ if (DeltaUtils.isTesting) {
throw new IllegalStateException(
s"Delta table at $dataPath unexpectedly still requires additional file-system listing " +
s"after an additional file-system listing was already performed to reconcile the gap " +
@@ -646,7 +647,7 @@ trait SnapshotManagement { self: DeltaLog =>
deltaLog = this,
opType = "delta.getLogSegmentForVersion.compactedDeltaValidationFailed",
data = eventData)
- if (Utils.isTesting) {
+ if (DeltaUtils.isTesting) {
assert(false, s"Validation around Compacted deltas failed while creating Snapshot. " +
s"[${JsonUtils.toJson(eventData)}]")
}
@@ -1071,7 +1072,7 @@ trait SnapshotManagement { self: DeltaLog =>
catalogTableOpt)
}
} catch {
- case NonFatal(e) if !Utils.isTesting =>
+ case NonFatal(e) if !DeltaUtils.isTesting =>
// Failed to schedule the future -- fail in testing, but just log it in prod.
recordDeltaEvent(this, "delta.snapshot.asyncUpdateFailed", data = Map("exception" -> e))
}
@@ -1200,7 +1201,7 @@ trait SnapshotManagement { self: DeltaLog =>
/** Installs the given `newSnapshot` as the `currentSnapshot` */
protected def installSnapshot(newSnapshot: Snapshot, updateTimestamp: Long): Snapshot = {
if (!snapshotLock.isHeldByCurrentThread) {
- if (Utils.isTesting) {
+ if (DeltaUtils.isTesting) {
throw new RuntimeException("DeltaLog snapshot replaced without taking lock")
}
recordDeltaEvent(this, "delta.update.unsafeReplace")
@@ -1292,7 +1293,7 @@ trait SnapshotManagement { self: DeltaLog =>
// NOTE: Validation is a no-op with incremental commit disabled.
newSnapshot.validateChecksum(Map("context" -> checksumContext))
} catch {
- case _: IllegalStateException if !Utils.isTesting => false
+ case _: IllegalStateException if !DeltaUtils.isTesting => false
}

if (!crcIsValid) {
---------- changed file 5 of 11 ----------
@@ -39,6 +39,7 @@ import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils, De
import org.apache.spark.sql.delta.stats.StatisticsCollection
import org.apache.spark.sql.delta.tablefeatures.DropFeature
import org.apache.spark.sql.delta.util.PartitionUtils
+ import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import org.apache.spark.sql.util.ScalaExtensions._
import org.apache.hadoop.fs.Path

@@ -155,7 +156,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
// Note: Spark generates the table location for managed tables in
// `DeltaCatalog#delegate#createTable`, so `isManagedLocation` should never be true if
// Unity Catalog is not involved. For safety we also check `isUnityCatalog` here.
- val respectManagedLoc = isUnityCatalog || org.apache.spark.util.Utils.isTesting
+ val respectManagedLoc = isUnityCatalog || DeltaUtils.isTesting
val tableType = if (location.isEmpty || (isManagedLocation && respectManagedLoc)) {
CatalogTableType.MANAGED
} else {
---------- changed file 6 of 11 ----------
@@ -768,7 +768,7 @@ trait VacuumCommandImpl extends DeltaCommand {
protected def setCommitClock(deltaLog: DeltaLog, version: Long) = {
// This is done to make sure that the commit timestamp reflects the one provided by the clock
// object.
- if (Utils.isTesting) {
+ if (DeltaUtils.isTesting) {
val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf())
val filePath = DeltaCommitFileProvider(deltaLog.update()).deltaFile(version)
if (fs.exists(filePath)) {
---------- changed file 7 of 11 ----------
@@ -20,6 +20,7 @@ import java.util.{Date, UUID}

import org.apache.spark.sql.delta.DeltaOptions
import org.apache.spark.sql.delta.logging.DeltaLogKeys
+ import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce._
@@ -188,7 +189,7 @@ object DeltaFileFormatWriter extends LoggingShims {
// 1) When the planned write config is disabled.
// 2) When the concurrent writers are enabled (in this case the required ordering of a
// V1 write command will be empty).
- if (Utils.isTesting) outputOrderingMatched = orderingMatched
+ if (DeltaUtils.isTesting) outputOrderingMatched = orderingMatched

if (writeFilesOpt.isDefined) {
// build `WriteFilesSpec` for `WriteFiles`
@@ -248,7 +249,7 @@ object DeltaFileFormatWriter extends LoggingShims {
}

// In testing, this is the only way to get hold of the actually executed plan written to file
- if (Utils.isTesting) executedPlan = Some(planToExecute)
+ if (DeltaUtils.isTesting) executedPlan = Some(planToExecute)

val rdd = planToExecute.execute()

@@ -331,7 +332,7 @@ object DeltaFileFormatWriter extends LoggingShims {
val description = writeFilesSpec.description

// In testing, this is the only way to get hold of the actually executed plan written to file
- if (Utils.isTesting) executedPlan = Some(planForWrites)
+ if (DeltaUtils.isTesting) executedPlan = Some(planForWrites)

writeAndCommit(job, description, committer) {
val rdd = planForWrites.executeWrite(writeFilesSpec)
---------- changed file 8 of 11 ----------
@@ -33,13 +33,13 @@ import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.util.DeltaProgressReporter
import org.apache.spark.sql.delta.util.JsonUtils
+ import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import org.apache.spark.sql.util.ScalaExtensions._

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkThrowable
import org.apache.spark.internal.{LoggingShims, MDC, MessageWithContext}
- import org.apache.spark.util.Utils

/**
* Convenience wrappers for logging that include delta specific options and
@@ -153,7 +153,7 @@ trait DeltaLogging
data: AnyRef = null,
path: Option[Path] = None)
: Unit = {
- if (Utils.isTesting) {
+ if (DeltaUtils.isTesting) {
assert(check, msg)
} else if (!check) {
recordDeltaEvent(
---------- changed file 9 of 11 ----------
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping, TypeWidening, TypeWideningMode}
import org.apache.spark.sql.delta.{RowCommitVersion, RowId}
import org.apache.spark.sql.delta.ClassicColumnConversions._
+ import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.logging.DeltaLogKeys
@@ -42,7 +43,6 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
- import org.apache.spark.util.Utils

object SchemaUtils extends DeltaLogging {
// We use case insensitive resolution while writing into Delta
@@ -289,7 +289,7 @@ def normalizeColumnNamesInDataType(
// The integral types can be cast to each other later on.
sourceDataType
case _ =>
- if (Utils.isTesting) {
+ if (DeltaUtils.isTesting) {
assert(sourceDataType == tableDataType,
s"Types without nesting should match but $sourceDataType != $tableDataType")
} else if (sourceDataType != tableDataType) {
---------- changed file 10 of 11 ----------
@@ -20,18 +20,17 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.commands.optimize.AddFileWithNumRecords
+ import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import org.apache.spark.sql.delta.zorder.ZCubeInfo
import org.apache.spark.sql.delta.zorder.ZCubeInfo.{getForFile => getZCubeInfo}

- import org.apache.spark.util.Utils

/**
* Collection of files that were produced by the same job in a run of the clustering command.
*/
case class ZCube(files: Seq[AddFile]) {
require(files.nonEmpty)

- if (Utils.isTesting) {
+ if (DeltaUtils.isTesting) {
assert(files.forall(getZCubeInfo(_) == Some(zCubeInfo)))
}

---------- changed file 11 of 11 ----------
@@ -24,7 +24,7 @@ import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
- import org.apache.spark.util.Utils
+ import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}

/**
* [[SQLConf]] entries for Delta features.
@@ -2344,21 +2344,21 @@ trait DeltaSQLConfBase {
.internal()
.doc("If true, post-commit hooks will by default throw an exception when they fail.")
.booleanConf
- .createWithDefault(Utils.isTesting)
+ .createWithDefault(DeltaUtils.isTesting)

val TEST_FILE_NAME_PREFIX =
buildStaticConf("testOnly.dataFileNamePrefix")
.internal()
.doc("[TEST_ONLY]: The prefix to use for the names of all Parquet data files.")
.stringConf
- .createWithDefault(if (Utils.isTesting) "test%file%prefix-" else "")
+ .createWithDefault(if (DeltaUtils.isTesting) "test%file%prefix-" else "")

val TEST_DV_NAME_PREFIX =
buildStaticConf("testOnly.dvFileNamePrefix")
.internal()
.doc("[TEST_ONLY]: The prefix to use for the names of all Deletion Vector files.")
.stringConf
- .createWithDefault(if (Utils.isTesting) "test%dv%prefix-" else "")
+ .createWithDefault(if (DeltaUtils.isTesting) "test%dv%prefix-" else "")

///////////
// UTC TIMESTAMP PARTITION VALUES
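Taken together, the changed call sites share one pattern: guard test-only behavior (fail-fast assertions, test defaults) behind the aliased Delta helper. A condensed, hypothetical call-site sketch; TestOnlyGuard and reportOrThrow are illustrative names, only the import and the isTesting call come from this commit:

import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}

object TestOnlyGuard {
  // Throw in Delta test runs so regressions surface immediately; in production,
  // log a warning instead of failing the query.
  def reportOrThrow(message: String): Unit = {
    if (DeltaUtils.isTesting) throw new IllegalStateException(message)
    else Console.err.println(s"WARN: $message") // stand-in for recordDeltaEvent/logWarning
  }
}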
