AN-380 Make call cache hashing strategy configurable per filesystem and backend (#7683)

Co-authored-by: Adam Nichols <[email protected]>
jgainerdewar and aednichols authored Feb 18, 2025
1 parent 71f5760 commit b652a9e
Showing 47 changed files with 792 additions and 454 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -105,6 +105,10 @@ Cloud Life Sciences (aka `v2Beta`, deprecated) and Google Batch (aka `batch`, re
* Default GPU on GCP Batch is now Nvidia T4
* Updated runtime attributes documentation to clarify that the `nvidiaDriverVersion` key is ignored on GCP Batch.

#### Call Caching Hash Strategy
Users can now configure, per filesystem and backend, which algorithms are used to hash files for call caching. See the
Configuring page in ReadTheDocs for details. Default behavior is unchanged.
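
For illustration, a hedged sketch of what such a setting might look like in a backend's configuration. The backend name below is hypothetical, and the filesystem keys and values mirror those exercised in the tests added by this commit; the ReadTheDocs Configuring page is the authoritative reference.

backend.providers.ExampleBackend.config.filesystems {
  # A list of hash types may be supplied...
  gcs.caching.hashing-strategy = ["md5", "identity"]
  # ...or a single hash type as a plain string
  s3.caching.hashing-strategy = "etag"
}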

## 87 Release Notes

### GCP Batch
@@ -1,9 +1,9 @@
package cromwell.backend.standard.callcaching

import java.util.concurrent.TimeoutException

import akka.actor.{Actor, ActorLogging, ActorRef, Timers}
import akka.event.LoggingAdapter
import cats.data.NonEmptyList
import com.typesafe.config.Config
import cromwell.backend.standard.StandardCachingActorHelper
import cromwell.backend.standard.callcaching.RootWorkflowFileHashCacheActor.IoHashCommandWithContext
import cromwell.backend.standard.callcaching.StandardFileHashingActor._
@@ -12,8 +12,13 @@ import cromwell.core.JobKey
import cromwell.core.callcaching._
import cromwell.core.io._
import cromwell.core.logging.JobLogging
import cromwell.core.path.Path
import cromwell.services.instrumentation.CromwellInstrumentation
import net.ceedubs.ficus.Ficus._
import wom.values.WomFile

import java.util.concurrent.TimeoutException
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

/**
@@ -48,6 +53,10 @@ case class FileHashContext(hashKey: HashKey, file: String)
class DefaultStandardFileHashingActor(standardParams: StandardFileHashingActorParams)
extends StandardFileHashingActor(standardParams) {
override val ioCommandBuilder: IoCommandBuilder = DefaultIoCommandBuilder

override val defaultHashingStrategies: Map[String, FileHashStrategy] = Map(
("drs", FileHashStrategy.Drs)
)
}

object StandardFileHashingActor {
@@ -72,18 +81,50 @@ abstract class StandardFileHashingActor(standardParams: StandardFileHashingActor
with JobLogging
with IoClientHelper
with StandardCachingActorHelper
with Timers {
with Timers
with CromwellInstrumentation {
override lazy val ioActor: ActorRef = standardParams.ioActor
override lazy val jobDescriptor: BackendJobDescriptor = standardParams.jobDescriptor
override lazy val backendInitializationDataOption: Option[BackendInitializationData] =
standardParams.backendInitializationDataOption
override lazy val serviceRegistryActor: ActorRef = standardParams.serviceRegistryActor
override lazy val configurationDescriptor: BackendConfigurationDescriptor = standardParams.configurationDescriptor

protected def ioCommandBuilder: IoCommandBuilder = DefaultIoCommandBuilder
// Child classes can override to set per-filesystem defaults
val defaultHashingStrategies: Map[String, FileHashStrategy] = Map.empty

// Hashing strategy to use if none is specified.
val fallbackHashingStrategy: FileHashStrategy = FileHashStrategy.Md5

// Combines defaultHashingStrategies with user-provided configuration
lazy val hashingStrategies: Map[String, FileHashStrategy] = {
val configuredHashingStrategies = for {
fsConfigs <- configurationDescriptor.backendConfig.as[Option[Config]]("filesystems").toList
fsKey <- fsConfigs.root.keySet().asScala
configKey = s"${fsKey}.caching.hashing-strategy"
fileHashStrategyConfigFromList = Try(fsConfigs.as[List[String]](configKey)).toOption
fileHashStrategyConfigFromString = Try(fsConfigs.as[String](configKey)).toOption.map(List(_))
fileHashStrategyConfig <- fileHashStrategyConfigFromList.orElse(fileHashStrategyConfigFromString)
fileHashStrategy = FileHashStrategy.of(fileHashStrategyConfig)
nonEmptyFileHashStrategy <- if (fileHashStrategy.isEmpty) None else Option(fileHashStrategy)
} yield (fsKey, nonEmptyFileHashStrategy)

defaultHashingStrategies ++ configuredHashingStrategies

}

protected def metricsCallback: Set[NonEmptyList[String]] => Unit = { pathsToIncrement =>
pathsToIncrement.foreach(increment(_))
}

protected def ioCommandBuilder: IoCommandBuilder = DefaultIoCommandBuilder(metricsCallback)

// Used by ConfigBackend for synchronous hashing of local files
def customHashStrategy(fileRequest: SingleFileHashRequest): Option[Try[String]] = None

def hashStrategyForPath(p: Path): FileHashStrategy =
hashingStrategies.getOrElse(p.filesystemTypeKey, fallbackHashingStrategy)

def fileHashingReceive: Receive = {
// Hash Request
case fileRequest: SingleFileHashRequest =>
@@ -115,8 +156,9 @@ abstract class StandardFileHashingActor(standardParams: StandardFileHashingActor
def asyncHashing(fileRequest: SingleFileHashRequest, replyTo: ActorRef): Unit = {
val fileAsString = fileRequest.file.value
val ioHashCommandTry = for {
gcsPath <- getPath(fileAsString)
command <- ioCommandBuilder.hashCommand(gcsPath)
path <- getPath(fileAsString)
hashStrategy = hashStrategyForPath(path)
command <- ioCommandBuilder.hashCommand(path, hashStrategy)
} yield command
lazy val fileHashContext = FileHashContext(fileRequest.hashKey, fileRequest.file.value)

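To make the resolution order above easier to see at a glance — per-filesystem entries parsed from the backend's `filesystems` config override actor-level `defaultHashingStrategies`, and `fallbackHashingStrategy` covers everything else — here is a hedged, stand-alone Scala sketch. The filesystem keys and values are illustrative only, not taken from any real deployment.

import cromwell.core.callcaching.FileHashStrategy

object HashStrategyResolutionSketch {
  // Actor-level defaults, as a backend subclass might set them
  val actorDefaults: Map[String, FileHashStrategy] = Map("drs" -> FileHashStrategy.Drs)
  // Strategies parsed from the backend's filesystems config
  val parsedFromConfig: Map[String, FileHashStrategy] = Map("gcs" -> FileHashStrategy.of(List("md5", "identity")))
  // Used when neither a default nor a configured strategy applies
  val fallback: FileHashStrategy = FileHashStrategy.Md5

  // Mirrors `defaultHashingStrategies ++ configuredHashingStrategies`: config entries win on key collisions
  val merged: Map[String, FileHashStrategy] = actorDefaults ++ parsedFromConfig

  def strategyFor(filesystemTypeKey: String): FileHashStrategy =
    merged.getOrElse(filesystemTypeKey, fallback)

  val gcsStrategy = strategyFor("gcs") // from config, overriding nothing here but winning over any default
  val drsStrategy = strategyFor("drs") // from the actor default
  val ftpStrategy = strategyFor("ftp") // nothing configured, falls back to md5
}
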
@@ -4,7 +4,7 @@ import akka.actor.Props
import akka.testkit._
import cromwell.backend.standard.callcaching.RootWorkflowFileHashCacheActor.{IoHashCommandWithContext, _}
import cromwell.core.actor.RobustClientHelper.RequestTimeout
import cromwell.core.callcaching.HashKey
import cromwell.core.callcaching.{FileHashStrategy, HashKey}
import cromwell.core.io.DefaultIoCommand.DefaultIoHashCommand
import cromwell.core.io.IoSuccess
import cromwell.core.path.DefaultPathBuilder
@@ -28,7 +28,7 @@ class RootWorkflowHashCacheActorSpec extends TestKitSuite with ImplicitSender wi
)

val ioHashCommandWithContext =
IoHashCommandWithContext(DefaultIoHashCommand(DefaultPathBuilder.build("").get),
IoHashCommandWithContext(DefaultIoHashCommand(DefaultPathBuilder.build("").get, FileHashStrategy.Md5),
FileHashContext(HashKey(checkForHitOrMiss = false, List.empty), fakeFileName)
)
rootWorkflowFileHashCacheActor ! ioHashCommandWithContext
@@ -56,7 +56,7 @@ class RootWorkflowHashCacheActorSpec extends TestKitSuite with ImplicitSender wi
)

val ioHashCommandWithContext =
IoHashCommandWithContext(DefaultIoHashCommand(DefaultPathBuilder.build("").get),
IoHashCommandWithContext(DefaultIoHashCommand(DefaultPathBuilder.build("").get, FileHashStrategy.Md5),
FileHashContext(HashKey(checkForHitOrMiss = false, List.empty), fakeFileName)
)
rootWorkflowFileHashCacheActor ! ioHashCommandWithContext
@@ -2,10 +2,11 @@ package cromwell.backend.standard.callcaching

import akka.actor.{ActorRef, Props}
import akka.testkit._
import com.typesafe.config.ConfigFactory
import cromwell.backend.standard.callcaching.StandardFileHashingActor.SingleFileHashRequest
import cromwell.backend.{BackendConfigurationDescriptor, BackendInitializationData, BackendJobDescriptor}
import cromwell.core.TestKitSuite
import cromwell.core.callcaching.HashingFailedMessage
import cromwell.core.callcaching.{FileHashStrategy, HashingFailedMessage, HashType, SuccessfulHashResultMessage}
import cromwell.core.io.{IoCommand, IoCommandBuilder, IoHashCommand, IoSuccess, PartialIoCommandBuilder}
import cromwell.core.path.{DefaultPathBuilder, Path}
import org.scalatest.flatspec.AnyFlatSpecLike
@@ -16,7 +17,7 @@ import wom.values.WomSingleFile
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import scala.util.{Failure, Try}
import scala.util.{Failure, Success, Try}

class StandardFileHashingActorSpec
extends TestKitSuite
@@ -51,10 +52,12 @@ class StandardFileHashingActorSpec
val parentProbe = TestProbe("parentProbe")
val params = StandardFileHashingActorSpec.defaultParams()
val props = Props(new StandardFileHashingActor(params) {
override val ioCommandBuilder: IoCommandBuilder = IoCommandBuilder(
new PartialIoCommandBuilder {
override def hashCommand = throw new RuntimeException("I am expected during tests")
}
override val ioCommandBuilder: IoCommandBuilder = new IoCommandBuilder(
List(
new PartialIoCommandBuilder {
override def hashCommand = throw new RuntimeException("I am expected during tests")
}
)
)
override def getPath(str: String): Try[Path] = Try(DefaultPathBuilder.get(str))
})
@@ -98,7 +101,7 @@ class StandardFileHashingActorSpec
}
}

it should "handle string hash responses" in {
it should "handle non-string hash responses" in {
val parentProbe = TestProbe("testParentHashString")
val params = StandardFileHashingActorSpec.ioActorParams(ActorRef.noSender)
val props = Props(new StandardFileHashingActor(params) {
@@ -124,17 +127,104 @@ class StandardFileHashingActorSpec
}
}

it should "handle string hash responses" in {
val parentProbe = TestProbe("testParentHashString")
val params = StandardFileHashingActorSpec.ioActorParams(ActorRef.noSender)
val props = Props(new StandardFileHashingActor(params) {
override lazy val defaultIoTimeout: FiniteDuration = 1.second.dilated

override def getPath(str: String): Try[Path] = Try(DefaultPathBuilder.get(str))
})
val standardFileHashingActorRef = parentProbe.childActorOf(props, "testStandardFileHashingActorHashString")

val fileHashContext = mock[FileHashContext]
fileHashContext.file returns "/expected/failure/path"
val command = mock[IoCommand[String]]
val message: (FileHashContext, IoSuccess[String]) = (fileHashContext, IoSuccess(command, "a_nice_hash"))

standardFileHashingActorRef ! message

parentProbe.expectMsgPF(10.seconds.dilated) {
case succeeded: SuccessfulHashResultMessage =>
succeeded.hashes.map(_.hashValue.value).headOption shouldBe Some("a_nice_hash")
case unexpected => fail(s"received unexpected message $unexpected")
}
}

it should "use the right hashing strategies" in {
val parentProbe = TestProbe("testParentHashStrategies")
val ioActorProbe = TestProbe("ioActorProbe")
val backendConfig = ConfigFactory.parseString(
"""filesystems.gcs.caching.hashing-strategy = ["md5", "identity"]
|filesystems.s3.caching.hashing-strategy = "etag"
|filesystems.http.some-other-config = "foobar"
|filesystems.ftp.caching.hashing-strategy = []
|filesystems.nfs.caching.hashing-strategy = "bogohash"""".stripMargin
)
val config = BackendConfigurationDescriptor(backendConfig, ConfigFactory.empty)

val props =
Props(new StandardFileHashingActor(StandardFileHashingActorSpec.ioActorParams(ioActorProbe.ref, config)) {
override val defaultHashingStrategies: Map[String, FileHashStrategy] = Map(
"gcs" -> FileHashStrategy.Crc32c,
"drs" -> FileHashStrategy.Drs
)
override val fallbackHashingStrategy: FileHashStrategy = FileHashStrategy(List(HashType.Sha256))

override def getPath(str: String): Try[Path] = {
val p = mock[Path]
p.filesystemTypeKey returns str
Success(p)
}
})
val standardFileHashingActorRef = parentProbe.childActorOf(props, "testStandardFileHashingActorHashStrategy")

def checkHashStrategy(filesystemKey: String, expectedStrategy: FileHashStrategy): Unit = {
val request = SingleFileHashRequest(null, null, WomSingleFile(filesystemKey), None)
standardFileHashingActorRef ! request
ioActorProbe.expectMsgPF(10.seconds.dilated) {
case (_: FileHashContext, cmd: IoHashCommand) if cmd.hashStrategy == expectedStrategy =>
case unexpected => fail(s"received unexpected ${filesystemKey} message $unexpected")
}
}

// Test an actor-defined default overridden by config
checkHashStrategy("gcs", FileHashStrategy(List(HashType.Md5, HashType.Identity)))

// Test a strategy only defined in config
checkHashStrategy("s3", FileHashStrategy.ETag)

// Test a strategy defined as an empty list in config, should use fallback
checkHashStrategy("ftp", FileHashStrategy(List(HashType.Sha256)))

// Test a strategy with a default defined in the actor
checkHashStrategy("drs", FileHashStrategy.Drs)

// Test a filesystem that has config, but not this config
checkHashStrategy("http", FileHashStrategy(List(HashType.Sha256)))

// Test a strategy not defined in config or actor defaults
checkHashStrategy("blob", FileHashStrategy(List(HashType.Sha256)))

// Test a strategy with an invalid value in config
checkHashStrategy("nfs", FileHashStrategy(List(HashType.Sha256)))
}

}

object StandardFileHashingActorSpec {
private def testing: Nothing = throw new UnsupportedOperationException("should not be run during tests")
private val emptyBackendConfig = BackendConfigurationDescriptor(ConfigFactory.empty, ConfigFactory.empty)

def defaultParams(): StandardFileHashingActorParams = defaultParams(testing, testing, testing, testing, testing)
def defaultParams(config: BackendConfigurationDescriptor = emptyBackendConfig): StandardFileHashingActorParams =
defaultParams(testing, emptyBackendConfig, testing, testing, testing)

def ioActorParams(ioActor: ActorRef): StandardFileHashingActorParams =
def ioActorParams(ioActor: ActorRef,
config: BackendConfigurationDescriptor = emptyBackendConfig
): StandardFileHashingActorParams =
defaultParams(
withJobDescriptor = testing,
withConfigurationDescriptor = testing,
withConfigurationDescriptor = config,
withIoActor = ioActor,
withServiceRegistryActor = testing,
withBackendInitializationDataOption = testing
@@ -161,4 +251,13 @@ object StandardFileHashingActorSpec
override def fileHashCachingActor: Option[ActorRef] = None
}

class StrategyTestFileHashingActor(standardParams: StandardFileHashingActorParams)
extends StandardFileHashingActor(standardParams) {
override val defaultHashingStrategies: Map[String, FileHashStrategy] = Map(
"gcs" -> FileHashStrategy.Crc32c,
"drs" -> FileHashStrategy.Drs
)
override val fallbackHashingStrategy: FileHashStrategy = FileHashStrategy(List(HashType.Sha256))
}

}
@@ -81,10 +81,10 @@ class DrsCloudNioFileProvider(drsPathResolver: DrsPathResolver, drsReadInterpret
val fileAttributesIO = for {
drsResolverResponse <- drsPathResolver.resolveDrs(drsPath, fields)
sizeOption = drsResolverResponse.size
hashOption = getPreferredHash(drsResolverResponse.hashes)
hashOptions = drsResolverResponse.hashes.getOrElse(Map.empty)
timeCreatedOption <- convertToFileTime(drsPath, DrsResolverField.TimeCreated, drsResolverResponse.timeCreated)
timeUpdatedOption <- convertToFileTime(drsPath, DrsResolverField.TimeUpdated, drsResolverResponse.timeUpdated)
} yield new DrsCloudNioRegularFileAttributes(drsPath, sizeOption, hashOption, timeCreatedOption, timeUpdatedOption)
} yield new DrsCloudNioRegularFileAttributes(drsPath, sizeOption, hashOptions, timeCreatedOption, timeUpdatedOption)

Option(fileAttributesIO.unsafeRunSync())
}
@@ -3,13 +3,12 @@ package cloud.nio.impl.drs
import java.nio.file.attribute.FileTime
import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset}
import cats.effect.IO
import cloud.nio.spi.HashType.HashType
import cloud.nio.spi.{CloudNioRegularFileAttributes, FileHash, HashType}
import cloud.nio.spi.CloudNioRegularFileAttributes
import org.apache.commons.lang3.exception.ExceptionUtils

class DrsCloudNioRegularFileAttributes(drsPath: String,
sizeOption: Option[Long],
hashOption: Option[FileHash],
val fileHashes: Map[String, String],
timeCreatedOption: Option[FileTime],
timeUpdatedOption: Option[FileTime]
) extends CloudNioRegularFileAttributes {
@@ -18,30 +17,12 @@ class DrsCloudNioRegularFileAttributes(drsPath: String,

override def size(): Long = sizeOption.getOrElse(0)

override def fileHash: Option[FileHash] = hashOption

override def creationTime(): FileTime = timeCreatedOption.getOrElse(lastModifiedTime())

override def lastModifiedTime(): FileTime = timeUpdatedOption.getOrElse(FileTime.fromMillis(0))
}

object DrsCloudNioRegularFileAttributes {
private val priorityHashList: Seq[(String, HashType)] = Seq(
("crc32c", HashType.Crc32c),
("md5", HashType.Md5),
("sha256", HashType.Sha256),
("etag", HashType.S3Etag)
)

def getPreferredHash(hashesOption: Option[Map[String, String]]): Option[FileHash] =
hashesOption match {
case Some(hashes: Map[String, String]) if hashes.nonEmpty =>
priorityHashList collectFirst {
case (key, hashType) if hashes.contains(key) => FileHash(hashType, hashes(key))
}
// if no preferred hash was found, go ahead and return none because we don't support anything that the DRS object is offering
case _ => None
}

private def convertToOffsetDateTime(timeInString: String): IO[OffsetDateTime] =
// Here timeInString is assumed to be a ISO-8601 DateTime with timezone
@@ -3,7 +3,6 @@ package cloud.nio.impl.drs
import cats.data.NonEmptyList
import cats.effect.IO
import cloud.nio.impl.drs.DrsCloudNioFileProvider.DrsReadInterpreter
import cloud.nio.spi.{FileHash, HashType}
import com.typesafe.config.ConfigFactory
import common.assertion.CromwellTimeoutSpec
import org.apache.http.HttpVersion
@@ -145,7 +144,7 @@ class DrsCloudNioFileProviderSpec extends AnyFlatSpecLike with CromwellTimeoutSp
drsFileAttributes.creationTime().toMillis should be(123L)
drsFileAttributes.lastModifiedTime().toMillis should be(456L)
drsFileAttributes.size() should be(789L)
drsFileAttributes.fileHash should be(Option(FileHash(HashType.Md5, "gg0217869")))
drsFileAttributes.fileHashes should be(Map("md5" -> "gg0217869"))
}

it should "throw exceptions for unsupported methods" in {