diff --git a/backend/src/main/scala/cromwell/backend/io/JobPaths.scala b/backend/src/main/scala/cromwell/backend/io/JobPaths.scala
index 05ad6a56dc1..6fb3b40e777 100644
--- a/backend/src/main/scala/cromwell/backend/io/JobPaths.scala
+++ b/backend/src/main/scala/cromwell/backend/io/JobPaths.scala
@@ -80,6 +80,8 @@ trait JobPaths {
   lazy val dockerCid = callExecutionRoot.resolve(dockerCidFilename)
   lazy val returnCode = callExecutionRoot.resolve(returnCodeFilename)
   lazy val memoryRetryRC = callExecutionRoot.resolve(memoryRetryRCFilename)
+  // Path to to an existing file that contains the error text of the job if it failed due to memory constraints.
+  lazy val memoryRetryError = standardPaths.error
 
   // This is a `def` because `standardPaths` is a `var` that may be reassigned during the calculation of
   // standard output and error file names.
diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala
index b2d300bbc34..39d6b6e373b 100644
--- a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala
+++ b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala
@@ -1361,7 +1361,7 @@ trait StandardAsyncExecutionActor
   ): Future[ExecutionHandle] = {
 
     // Returns true if the task has written an RC file that indicates OOM, false otherwise
-    def memoryRetryRC: Future[Boolean] = {
+    def memoryRetryRC: Future[(Boolean, Option[Path])] = {
 
       def readFile(path: Path, maxBytes: Option[Int]): Future[String] =
         asyncIo.contentAsStringAsync(path, maxBytes, failOnOverflow = false)
@@ -1384,22 +1384,33 @@ trait StandardAsyncExecutionActor
         }
 
       def checkMemoryRetryStderr(errorKeys: List[String], maxBytes: Int): Future[Boolean] =
-        readFile(jobPaths.standardPaths.error, Option(maxBytes)) map { errorContent =>
+        readFile(jobPaths.memoryRetryError, Option(maxBytes)) map { errorContent =>
           errorKeys.exists(errorContent.contains)
         }
 
-      asyncIo.existsAsync(jobPaths.memoryRetryRC) flatMap {
-        case true => checkMemoryRetryRC()
-        case false =>
-          (memoryRetryErrorKeys, memoryRetryStderrLimit) match {
-            case (Some(keys), Some(limit)) =>
-              asyncIo.existsAsync(jobPaths.standardPaths.error) flatMap {
-                case true => checkMemoryRetryStderr(keys, limit)
-                case false => Future.successful(false)
-              }
-            case _ => Future.successful(false)
-          }
-      }
+      def checkMemoryRetryError(): Future[Boolean] =
+        (memoryRetryErrorKeys, memoryRetryStderrLimit) match {
+          case (Some(keys), Some(limit)) =>
+            for {
+              memoryRetryErrorExists <- asyncIo.existsAsync(jobPaths.memoryRetryError)
+              memoryRetryErrorFound <-
+                if (memoryRetryErrorExists) checkMemoryRetryStderr(keys, limit) else Future.successful(false)
+            } yield memoryRetryErrorFound
+          case _ => Future.successful(false)
+        }
+
+      // For backwards behavioral compatibility, check for the old memory retry RC file first. That file used to catch
+      // the errors from the standard error file, but now sometimes the error is written to a separate log file.
+      // If it exists, check its contents. If it doesn't find an OOM code, check the new memory retry error file.
+      for {
+        memoryRetryRCExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
+        memoryRetryRCErrorFound <- if (memoryRetryRCExists) checkMemoryRetryRC() else Future.successful(false)
+        memoryRetryErrorFound <- if (memoryRetryRCErrorFound) Future.successful(true) else checkMemoryRetryError()
+        memoryErrorPathOption =
+          if (memoryRetryRCErrorFound) Option(jobPaths.standardPaths.error)
+          else if (memoryRetryErrorFound) Option(jobPaths.memoryRetryError)
+          else None
+      } yield (memoryRetryErrorFound, memoryErrorPathOption)
     }
 
     val stderr = jobPaths.standardPaths.error
@@ -1410,70 +1421,72 @@ trait StandardAsyncExecutionActor
       // Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
       // may fail due to race conditions on quickly-executing jobs.
       stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(stderr) else Future.successful(0L)
-      outOfMemoryDetected <- memoryRetryRC
-    } yield (stderrSize, returnCodeAsString, outOfMemoryDetected)
-
-    stderrSizeAndReturnCodeAndMemoryRetry flatMap { case (stderrSize, returnCodeAsString, outOfMemoryDetected) =>
-      val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)
-
-      if (isDone(status)) {
-        tryReturnCodeAsInt match {
-          case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
-            val executionHandle = Future.successful(
-              FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption),
-                                                Option(returnCodeAsInt),
-                                                None
+      (outOfMemoryDetected, outOfMemoryPathOption) <- memoryRetryRC
+    } yield (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption)
+
+    stderrSizeAndReturnCodeAndMemoryRetry flatMap {
+      case (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption) =>
+        val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)
+
+        if (isDone(status)) {
+          tryReturnCodeAsInt match {
+            case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
+              val executionHandle = Future.successful(
+                FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption),
+                                                  Option(returnCodeAsInt),
+                                                  None
+                )
               )
-            )
-            retryElseFail(executionHandle)
-          case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
-            handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
-          // It's important that we check retryWithMoreMemory case before isAbort. RC could be 137 in either case;
-          // if it was caused by OOM killer, want to handle as OOM and not job abort.
-          case Success(returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
-            val executionHandle = Future.successful(
-              FailedNonRetryableExecutionHandle(
-                RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log),
-                Option(returnCodeAsInt),
-                None
+              retryElseFail(executionHandle)
+            case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
+              handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
+            // It's important that we check retryWithMoreMemory case before isAbort. RC could be 137 in either case;
+            // if it was caused by OOM killer, want to handle as OOM and not job abort.
+            case Success(returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
+              val executionHandle = Future.successful(
+                FailedNonRetryableExecutionHandle(
+                  RetryWithMoreMemory(jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
+                  Option(returnCodeAsInt),
+                  None
+                )
               )
-            )
-            retryElseFail(executionHandle, outOfMemoryDetected)
-          case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
-            Future.successful(AbortedExecutionHandle)
-          case Success(returnCodeAsInt) =>
-            val executionHandle = Future.successful(
-              FailedNonRetryableExecutionHandle(WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
-                                                Option(returnCodeAsInt),
-                                                None
+              retryElseFail(executionHandle, outOfMemoryDetected)
+            case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
+              Future.successful(AbortedExecutionHandle)
+            case Success(returnCodeAsInt) =>
+              val executionHandle = Future.successful(
+                FailedNonRetryableExecutionHandle(
+                  WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
+                  Option(returnCodeAsInt),
+                  None
+                )
               )
-            )
-            retryElseFail(executionHandle)
-          case Failure(_) =>
-            Future.successful(
-              FailedNonRetryableExecutionHandle(
-                ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
-                kvPairsToSave = None
+              retryElseFail(executionHandle)
+            case Failure(_) =>
+              Future.successful(
+                FailedNonRetryableExecutionHandle(
+                  ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
+                  kvPairsToSave = None
+                )
               )
-            )
-        }
-      } else {
-        tryReturnCodeAsInt match {
-          case Success(returnCodeAsInt)
-              if outOfMemoryDetected && memoryRetryRequested && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
-            val executionHandle = Future.successful(
-              FailedNonRetryableExecutionHandle(
-                RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log),
-                Option(returnCodeAsInt),
-                None
+          }
+        } else {
+          tryReturnCodeAsInt match {
+            case Success(returnCodeAsInt)
+                if outOfMemoryDetected && memoryRetryRequested && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
+              val executionHandle = Future.successful(
+                FailedNonRetryableExecutionHandle(
+                  RetryWithMoreMemory(jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
+                  Option(returnCodeAsInt),
+                  None
+                )
               )
-            )
-            retryElseFail(executionHandle, outOfMemoryDetected)
-          case _ =>
-            val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
-            retryElseFail(failureStatus)
+              retryElseFail(executionHandle, outOfMemoryDetected)
+            case _ =>
+              val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
+              retryElseFail(failureStatus)
+          }
         }
-      }
     } recoverWith { case exception =>
       if (isDone(status)) Future.successful(FailedNonRetryableExecutionHandle(exception, kvPairsToSave = None))
       else {
diff --git a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiJobPaths.scala b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiJobPaths.scala
index ec4af5344cf..dfb910348f8 100644
--- a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiJobPaths.scala
+++ b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiJobPaths.scala
@@ -41,6 +41,8 @@ case class PipelinesApiJobPaths(override val workflowPaths: PipelinesApiWorkflow
   val jesMonitoringScriptFilename: String = s"${PipelinesApiJobPaths.JesMonitoringKey}.sh"
   val jesMonitoringImageScriptFilename: String = s"${PipelinesApiJobPaths.JesMonitoringImageKey}.sh"
 
+  override lazy val memoryRetryError: Path = jesLogPath
+
   override lazy val customMetadataPaths = Map(
     CallMetadataKeys.BackendLogsPrefix + ":log" -> jesLogPath
   ) ++ (