Skip to content

Commit

Permalink
Retry binding server to random port in the resource staging server te…
Browse files Browse the repository at this point in the history
…st. (apache#378)

* Retry binding server to random port in the resource staging server test.

* Break if successful start

* Start server in try block.

* FIx scalastyle

* More rigorous cleanup logic. Increment port numbers.

* Move around more exception logic.

* More exception refactoring.

* Remove whitespace

* Fix test

* Rename variable
  • Loading branch information
mccheah authored and foxish committed Jul 24, 2017
1 parent e086f4d commit 7d0fa56
Showing 1 changed file with 58 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.google.common.io.ByteStreams
import okhttp3.{RequestBody, ResponseBody}
import org.eclipse.jetty.server.Server
import org.scalatest.BeforeAndAfter
import org.scalatest.mock.MockitoSugar.mock
import retrofit2.Call

import org.apache.spark.{SparkFunSuite, SSLOptions}
import org.apache.spark.deploy.kubernetes.SSLUtils
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

/**
Expand All @@ -40,30 +42,37 @@ import org.apache.spark.util.Utils
* we've configured the Jetty server correctly and that the endpoints reached over HTTP can
* receive streamed uploads and can stream downloads.
*/
class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter {
class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with Logging {

private val MAX_SERVER_START_ATTEMPTS = 5
private var serviceImpl: ResourceStagingService = _
private var stagedResourcesCleaner: StagedResourcesCleaner = _
private var server: ResourceStagingServer = _
private var server: Option[ResourceStagingServer] = None
private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule)

private val serverPort = new ServerSocket(0).getLocalPort

private val sslOptionsProvider = new SettableReferenceSslOptionsProvider()

before {
stagedResourcesCleaner = mock[StagedResourcesCleaner]
serviceImpl = new ResourceStagingServiceImpl(
new StagedResourcesStoreImpl(Utils.createTempDir()), stagedResourcesCleaner)
server = new ResourceStagingServer(serverPort, serviceImpl, sslOptionsProvider)
}

after {
server.stop()
server.foreach { s =>
try {
s.stop()
} catch {
case e: Throwable =>
log.warn("Failed to stop the resource staging server.", e)
}
}
server = None
}

test("Accept file and jar uploads and downloads") {
server.start()
runUploadAndDownload(SSLOptions())
val serverPort = startServer()
runUploadAndDownload(SSLOptions(), serverPort)
}

test("Enable SSL on the server") {
Expand All @@ -80,11 +89,11 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter {
trustStore = Some(keyStoreAndTrustStore.trustStore),
trustStorePassword = Some("trustStore"))
sslOptionsProvider.setOptions(sslOptions)
server.start()
runUploadAndDownload(sslOptions)
val serverPort = startServer()
runUploadAndDownload(sslOptions, serverPort)
}

private def runUploadAndDownload(sslOptions: SSLOptions): Unit = {
private def runUploadAndDownload(sslOptions: SSLOptions, serverPort: Int): Unit = {
val scheme = if (sslOptions.enabled) "https" else "http"
val retrofitService = RetrofitClientFactoryImpl.createRetrofitClient(
s"$scheme://127.0.0.1:$serverPort/",
Expand Down Expand Up @@ -125,6 +134,44 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter {
val downloadedBytes = ByteStreams.toByteArray(responseBody.byteStream())
assert(downloadedBytes.toSeq === bytes)
}

private def startServer(): Int = {
var currentAttempt = 0
var successfulStart = false
var latestServerPort = new ServerSocket(0).getLocalPort
while (currentAttempt < MAX_SERVER_START_ATTEMPTS && !successfulStart) {
val newServer = new ResourceStagingServer(latestServerPort, serviceImpl, sslOptionsProvider)
try {
newServer.start()
successfulStart = true
server = Some(newServer)
} catch {
case e: Throwable =>
try {
newServer.stop()
} catch {
case e1: Throwable =>
log.warn("Failed to stop a resource staging server that failed to start.", e1)
}

if (Utils.isBindCollision(e)) {
currentAttempt += 1
latestServerPort = latestServerPort + 1
if (currentAttempt == MAX_SERVER_START_ATTEMPTS) {
throw new RuntimeException(s"Failed to bind to a random port" +
s" $MAX_SERVER_START_ATTEMPTS times. Last attempted port: $latestServerPort", e)
} else {
logWarning(s"Attempt $currentAttempt/$MAX_SERVER_START_ATTEMPTS failed to start" +
s" server on port $latestServerPort.", e)
}
} else {
throw e
}
}
}
logInfo(s"Started resource staging server on port $latestServerPort.")
latestServerPort
}
}

private class SettableReferenceSslOptionsProvider extends ResourceStagingServerSslOptionsProvider {
Expand Down

0 comments on commit 7d0fa56

Please sign in to comment.