Skip to content

Commit 8bdbebc

Browse files
committed
Parallel calls
1 parent a920885 commit 8bdbebc

File tree

3 files changed

+6
-8
lines changed

3 files changed

+6
-8
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,6 @@ trait RecordChecksum extends DeltaLogging {
139139
* @param deltaLog The DeltaLog
140140
* @param versionToCompute The version for which we want to compute the checksum
141141
* @param actions The actions corresponding to the version `versionToCompute`
142-
* @param metadata The metadata corresponding to the version `versionToCompute`
143-
* @param protocol The protocol corresponding to the version `versionToCompute`
144142
* @param operationName The operation name corresponding to the version `versionToCompute`
145143
* @param txnIdOpt The transaction identifier for the version `versionToCompute`
146144
* @param previousVersionState Contains either the versionChecksum corresponding to
@@ -156,8 +154,6 @@ trait RecordChecksum extends DeltaLogging {
156154
deltaLog: DeltaLog,
157155
versionToCompute: Long,
158156
actions: Seq[Action],
159-
metadata: Metadata,
160-
protocol: Protocol,
161157
operationName: String,
162158
txnIdOpt: Option[String],
163159
previousVersionState: Either[Snapshot, VersionChecksum],
@@ -213,6 +209,10 @@ trait RecordChecksum extends DeltaLogging {
213209
// Incrementally compute the new version checksum, if the old one is available.
214210
val ignoreAddFilesInOperation =
215211
RecordChecksum.operationNamesWhereAddFilesIgnoredForIncrementalCrc.contains(operationName)
212+
val protocol =
213+
actions.collectFirst { case p: Protocol => p }.getOrElse(oldVersionChecksum.protocol)
214+
val metadata =
215+
actions.collectFirst { case m: Metadata => m }.getOrElse(oldVersionChecksum.metadata)
216216
val persistentDVsOnTableReadable =
217217
DeletionVectorUtils.deletionVectorsReadable(protocol, metadata)
218218
val persistentDVsOnTableWritable =

spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

-2
Original file line numberDiff line numberDiff line change
@@ -2553,8 +2553,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite
25532553
deltaLog,
25542554
attemptVersion,
25552555
actions = currentTransactionInfo.finalActionsToCommit,
2556-
metadata = currentTransactionInfo.metadata,
2557-
protocol = currentTransactionInfo.protocol,
25582556
operationName = currentTransactionInfo.op.name,
25592557
txnIdOpt = Some(currentTransactionInfo.txnId),
25602558
previousVersionState = scala.Left(snapshot),

spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ class Snapshot(
384384
* checksum file. If the checksum file is not present or if the protocol or metadata is missing
385385
* this will return None.
386386
*/
387-
protected def getProtocolMetadataAndIctFromCrc():
387+
protected def getProtocolMetadataAndIctFromCrc(checksumOpt: Option[VersionChecksum]):
388388
Option[Array[ReconstructedProtocolMetadataAndICT]] = {
389389
if (!spark.sessionState.conf.getConf(
390390
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED)) {
@@ -431,7 +431,7 @@ class Snapshot(
431431
Array[ReconstructedProtocolMetadataAndICT] = {
432432
import implicits._
433433

434-
getProtocolMetadataAndIctFromCrc().foreach { protocolMetadataAndIctFromCrc =>
434+
getProtocolMetadataAndIctFromCrc(checksumOpt).foreach { protocolMetadataAndIctFromCrc =>
435435
return protocolMetadataAndIctFromCrc
436436
}
437437

0 commit comments

Comments
 (0)