Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the fuse support for Google Batch backend #7378

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package cromwell.docker.local

import scala.Function.const
import scala.util.Try

import sys.process._
import io.circe.parser._
/**
* Wrapper around the docker cli.
* https://docs.docker.com/engine/reference/commandline/docker/
Expand All @@ -28,6 +29,39 @@ trait DockerCliClient {
_.flatMap(parseHashLine).find(_.key == dockerCliKey).map(_.digest)
}

/**
* Looks up a docker hash from manifest
*
* @param dockerCliKey The docker hash to lookup.
* @return The hash if found, None if not found, and Failure if an error occurs.
*/
def lookupHashDirect(dockerCliKey: DockerCliKey): Try[Option[String]] = {
Try {
val cmd = Seq("docker", "manifest", "inspect", "-v", dockerCliKey.fullName)
val output = new StringBuilder
val logger = ProcessLogger(
(o: String) => output.append(o + "\n"),
(e: String) => System.err.println(e)
)

val exitValue = cmd.!(logger)

if (exitValue == 0) {
val outputJson: String = output.toString()
parse(outputJson) match {
case Left(failure) => throw new RuntimeException("Failed to parse JSON: " + failure.message)
case Right(json) =>
json.hcursor.downField("Descriptor").downField("digest").as[String] match {
case Left(failure) => throw new RuntimeException("Unexpected JSON structure: " + failure.message)
case Right(digest) => Some(digest)
}
}
} else {
throw new RuntimeException(s"Process ran with error. Exit code: $exitValue")
}
}
}

/**
* Pulls a docker image.
* @param dockerCliKey The docker hash to lookup.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DockerCliFlow(implicit ec: ExecutionContext) extends DockerRegistry {
implicit val timer = IO.timer(ec)

DockerCliFlow
.lookupHashOrTimeout(firstLookupTimeout)(dockerInfoContext)
.lookupHashOrTimeout(firstLookupTimeout, true)(dockerInfoContext)
.flatMap {
// If the image isn't there, pull it and try again
case (_: DockerInfoNotFound, _) =>
Expand All @@ -50,10 +50,11 @@ object DockerCliFlow {
* @param context The image to lookup.
* @return The docker hash response plus the context of our flow.
*/
private def lookupHash(context: DockerInfoContext): (DockerInfoResponse, DockerInfoContext) = {
private def lookupHash(context: DockerInfoContext, useDirect: Boolean): (DockerInfoResponse, DockerInfoContext) = {
val dockerCliKey = cliKeyFromImageId(context)
DockerInfoActor.logger.debug("Looking up hash of {}", dockerCliKey.fullName)
val result = DockerCliClient.lookupHash(dockerCliKey) match {
val lookupFunction = if (useDirect) DockerCliClient.lookupHashDirect(_) else DockerCliClient.lookupHash(_)
val result = lookupFunction(dockerCliKey) match {
case Success(None) => DockerInfoNotFound(context.request)
case Success(Some(hash)) =>
DockerHashResult.fromString(hash) match {
Expand All @@ -75,10 +76,10 @@ object DockerCliFlow {
* @param context The image to lookup.
* @return The docker hash response plus the context of our flow.
*/
private def lookupHashOrTimeout(timeout: FiniteDuration)(
private def lookupHashOrTimeout(timeout: FiniteDuration, useDirect: Boolean = false)(
context: DockerInfoContext
)(implicit cs: ContextShift[IO], timer: Timer[IO]): IO[(DockerInfoResponse, DockerInfoContext)] =
IO(lookupHash(context))
)(implicit cs: ContextShift[IO], timer: Timer[IO]): IO[(DockerInfoResponse, DockerInfoContext)] =
IO(lookupHash(context, useDirect))
.timeout(timeout)
.handleErrorWith {
case _: TimeoutException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,19 +147,22 @@ object RunnableBuilder {
scriptContainerPath: String,
jobShell: String,
volumes: List[Volume],
dockerhubCredentials: (String, String)
dockerhubCredentials: (String, String),
fuseEnabled: Boolean
): Runnable.Builder = {

val container = (dockerhubCredentials._1, dockerhubCredentials._2) match {
case (username, password) if username.nonEmpty && password.nonEmpty =>
Container.newBuilder
.setOptions(if(fuseEnabled) "--privileged" else "")
.setImageUri(docker)
.setEntrypoint(jobShell)
.addCommands(scriptContainerPath)
.setUsername(username)
.setPassword(password)
case _ =>
Container.newBuilder
.setOptions(if(fuseEnabled) "--privileged" else "")
.setImageUri(docker)
.setEntrypoint(jobShell)
.addCommands(scriptContainerPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ trait UserRunnable {
scriptContainerPath = createParameters.commandScriptContainerPath.pathAsString,
jobShell = "/bin/bash",
volumes = volumes,
dockerhubCredentials = createParameters.dockerhubCredentials
dockerhubCredentials = createParameters.dockerhubCredentials,
fuseEnabled = createParameters.fuseEnabled
)

val describeRunnable = RunnableBuilder.describeDocker("user runnable", userRunnable)
Expand Down
Loading