Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ package de.gesellix.docker.remote.api.client
import com.squareup.moshi.Json
import com.squareup.moshi.Moshi
import de.gesellix.docker.engine.DockerClientConfig
import de.gesellix.docker.engine.RequestMethod.*
import de.gesellix.docker.engine.RequestMethod.DELETE
import de.gesellix.docker.engine.RequestMethod.GET
import de.gesellix.docker.engine.RequestMethod.HEAD
import de.gesellix.docker.engine.RequestMethod.POST
import de.gesellix.docker.engine.RequestMethod.PUT
import de.gesellix.docker.remote.api.ContainerCreateRequest
import de.gesellix.docker.remote.api.ContainerCreateResponse
import de.gesellix.docker.remote.api.ContainerInspectResponse
Expand All @@ -39,7 +43,6 @@ import de.gesellix.docker.remote.api.core.StreamCallback
import de.gesellix.docker.remote.api.core.Success
import de.gesellix.docker.remote.api.core.SuccessStream
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
Expand Down Expand Up @@ -206,9 +209,8 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig,
) {
val localVariableConfig = containerAttachRequestConfig(id = id, detachKeys = detachKeys, logs = logs, stream = stream, stdin = stdin, stdout = stdout, stderr = stderr)

val expectMultiplexedResponse = !(containerInspect(id, false).config?.tty ?: false)
val localVarResponse = requestFrames(
localVariableConfig, expectMultiplexedResponse
localVariableConfig
)

when (localVarResponse.responseType) {
Expand All @@ -217,7 +219,7 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig,
launch {
withTimeout(timeoutMillis) {
callback.onStarting(this@launch::cancel)
((localVarResponse as SuccessStream<*>).data as Flow<Frame>).collect { callback.onNext(it) }
(localVarResponse as SuccessStream<Frame>).data.collect { callback.onNext(it) }
callback.onFinished()
}
}
Expand Down Expand Up @@ -766,7 +768,7 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig,
)

return when (localVarResponse.responseType) {
ResponseType.Success -> (localVarResponse as Success<*>).data as List<ContainerSummary>
ResponseType.Success -> (localVarResponse as Success<List<ContainerSummary>>).data
ResponseType.Informational -> throw UnsupportedOperationException("Client does not support Informational responses.")
ResponseType.Redirection -> throw UnsupportedOperationException("Client does not support Redirection responses.")
ResponseType.ClientError -> {
Expand Down Expand Up @@ -848,9 +850,8 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig,
) {
val localVariableConfig = containerLogsRequestConfig(id = id, follow = follow, stdout = stdout, stderr = stderr, since = since, until = until, timestamps = timestamps, tail = tail)

val expectMultiplexedResponse = !(containerInspect(id, false).config?.tty ?: false)
val localVarResponse = requestFrames(
localVariableConfig, expectMultiplexedResponse
localVariableConfig
)

when (localVarResponse.responseType) {
Expand All @@ -859,7 +860,7 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig,
launch {
withTimeout(timeoutMillis) {
callback.onStarting(this@launch::cancel)
((localVarResponse as SuccessStream<*>).data as Flow<Frame>).collect { callback.onNext(it) }
(localVarResponse as SuccessStream<Frame>).data.collect { callback.onNext(it) }
callback.onFinished()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import de.gesellix.docker.remote.api.core.StreamCallback
import de.gesellix.docker.remote.api.core.Success
import de.gesellix.docker.remote.api.core.SuccessStream
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
Expand Down Expand Up @@ -247,7 +245,7 @@ class ExecApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox
!(execInspect(id).processConfig?.tty ?: false)
}
val localVarResponse = requestFrames(
localVariableConfig, expectMultiplexedResponse
localVariableConfig
)

val timeout = if (timeoutMillis == null) {
Expand All @@ -263,7 +261,7 @@ class ExecApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox
launch {
withTimeoutOrNull(timeout.toMillis()) {
actualCallback.onStarting(this@launch::cancel)
((localVarResponse as SuccessStream<*>).data as Flow<Frame>).collect { actualCallback.onNext(it) }
(localVarResponse as SuccessStream<Frame>).data.collect { actualCallback.onNext(it) }
actualCallback.onFinished()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import de.gesellix.docker.remote.api.core.StreamCallback
import de.gesellix.docker.remote.api.core.Success
import de.gesellix.docker.remote.api.core.SuccessStream
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
Expand Down Expand Up @@ -241,7 +240,7 @@ class ServiceApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, p
)

return when (localVarResponse.responseType) {
ResponseType.Success -> (localVarResponse as Success<*>).data as List<Service>
ResponseType.Success -> (localVarResponse as Success<List<Service>>).data
ResponseType.Informational -> throw UnsupportedOperationException("Client does not support Informational responses.")
ResponseType.Redirection -> throw UnsupportedOperationException("Client does not support Redirection responses.")
ResponseType.ClientError -> {
Expand Down Expand Up @@ -316,7 +315,7 @@ class ServiceApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, p
val localVariableConfig = serviceLogsRequestConfig(id = id, details = details, follow = follow, stdout = stdout, stderr = stderr, since = since, timestamps = timestamps, tail = tail)

val localVarResponse = requestFrames(
localVariableConfig, true /* do services/tasks always have container.tty == false? */
localVariableConfig
)

when (localVarResponse.responseType) {
Expand All @@ -325,7 +324,7 @@ class ServiceApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, p
launch {
withTimeout(timeoutMillis) {
callback.onStarting(this@launch::cancel)
((localVarResponse as SuccessStream<*>).data as Flow<Frame>).collect { callback.onNext(it) }
(localVarResponse as SuccessStream<Frame>).data.collect { callback.onNext(it) }
callback.onFinished()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import de.gesellix.docker.remote.api.core.StreamCallback
import de.gesellix.docker.remote.api.core.Success
import de.gesellix.docker.remote.api.core.SuccessStream
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
Expand Down Expand Up @@ -115,7 +114,7 @@ class TaskApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox
)

return when (localVarResponse.responseType) {
ResponseType.Success -> (localVarResponse as Success<*>).data as List<Task>
ResponseType.Success -> (localVarResponse as Success<List<Task>>).data
ResponseType.Informational -> throw UnsupportedOperationException("Client does not support Informational responses.")
ResponseType.Redirection -> throw UnsupportedOperationException("Client does not support Redirection responses.")
ResponseType.ClientError -> {
Expand Down Expand Up @@ -186,7 +185,7 @@ class TaskApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox
val localVariableConfig = taskLogsRequestConfig(id = id, details = details, follow = follow, stdout = stdout, stderr = stderr, since = since, timestamps = timestamps, tail = tail)

val localVarResponse = requestFrames(
localVariableConfig, true /* do services/tasks always have container.tty == false? */
localVariableConfig
)

when (localVarResponse.responseType) {
Expand All @@ -195,7 +194,7 @@ class TaskApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox
launch {
withTimeout(timeoutMillis) {
callback.onStarting(this@launch::cancel)
((localVarResponse as SuccessStream<*>).data as Flow<Frame>).collect { callback.onNext(it) }
(localVarResponse as SuccessStream<Frame>).data.collect { callback.onNext(it) }
callback.onFinished()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,14 @@ open class ApiClient(
protected const val TextPlainMediaType = "text/plain"
protected const val JsonMediaType = "application/json"
protected const val OctetStreamMediaType = "application/octet-stream"
protected const val DockerRawStreamMediaType = "application/vnd.docker.raw-stream"
// see https://docs.docker.com/engine/api/version-history/#v142-api-changes
// see https://github.com/moby/moby/pull/39812
// raw-stream: with attached Tty
const val DockerRawStreamMediaType = "application/vnd.docker.raw-stream"
// see https://docs.docker.com/engine/api/version-history/#v142-api-changes
// see https://github.com/moby/moby/pull/39812
// multiplexed-stream: without attached Tty
const val DockerMultiplexedStreamMediaType = "application/vnd.docker.multiplexed-stream"

// val apiKey: MutableMap<String, String> = mutableMapOf()
// val apiKeyPrefix: MutableMap<String, String> = mutableMapOf()
Expand Down Expand Up @@ -136,15 +143,15 @@ open class ApiClient(
return requestStream(request, client)
}

protected fun requestFrames(requestConfig: RequestConfig, expectMultiplexedResponse: Boolean = false): ApiInfrastructureResponse<Frame> {
protected fun requestFrames(requestConfig: RequestConfig): ApiInfrastructureResponse<Frame> {
val engineRequest = EngineRequest(requestConfig.method, requestConfig.path).also {
it.headers = requestConfig.headers
it.query = requestConfig.query
it.body = requestConfig.body
}
val request = prepareRequest(engineRequest, DockerRawStreamMediaType)
val request = prepareRequest(engineRequest)
val client = prepareClient(engineRequest)
return requestFrames(request, client, expectMultiplexedResponse)
return requestFrames(request, client)
}

protected fun prepareRequest(requestConfig: EngineRequest, fallbackContentType: String = ""): Request {
Expand Down Expand Up @@ -254,13 +261,13 @@ open class ApiClient(
)
response.isClientError -> return ClientError(
response.message,
response.body?.string(),
response.body.string(),
response.code,
response.headers.toMultimap()
)
else -> return ServerError(
response.message,
response.body?.string(),
response.body.string(),
response.code,
response.headers.toMultimap()
)
Expand Down Expand Up @@ -289,20 +296,20 @@ open class ApiClient(
)
response.isClientError -> return ClientError(
response.message,
response.body?.string(),
response.body.string(),
response.code,
response.headers.toMultimap()
)
else -> return ServerError(
response.message,
response.body?.string(),
response.body.string(),
response.code,
response.headers.toMultimap()
)
}
}

protected fun requestFrames(request: Request, client: OkHttpClient, expectMultiplexedResponse: Boolean = false): ApiInfrastructureResponse<Frame> {
protected fun requestFrames(request: Request, client: OkHttpClient): ApiInfrastructureResponse<Frame> {
val response = client.newCall(request).execute()
val mediaType = response.header(ContentType)?.substringBefore(";")?.lowercase(Locale.getDefault())

Expand All @@ -318,19 +325,19 @@ open class ApiClient(
response.headers.toMultimap()
)
response.isSuccessful -> return SuccessStream(
response.body.consumeFrames(mediaType, expectMultiplexedResponse),
response.body.consumeFrames(mediaType),
response.code,
response.headers.toMultimap()
)
response.isClientError -> return ClientError(
response.message,
response.body?.string(),
response.body.string(),
response.code,
response.headers.toMultimap()
)
else -> return ServerError(
response.message,
response.body?.string(),
response.body.string(),
response.code,
response.headers.toMultimap()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import java.net.HttpURLConnection.HTTP_NO_CONTENT
data class EnforceResponseContentTypeConfig(val fallbackContentType: String = "")

// This one would work automatically, when the response content-type would be set correctly :-/
// - for /attach, /logs and similar endpoints see https://github.com/gesellix/docker-client/issues/21
// see https://github.com/gesellix/docker-client/issues/21
// see https://github.com/moby/moby/pull/39812
// - for /attach, /logs and similar endpoints the issue has been fixed with api version 1.42
// - for /stats see (?)
class EnforceResponseContentTypeInterceptor : Interceptor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import okio.BufferedSource
import okio.Source
import okio.buffer

class FrameReader(source: Source, private val expectMultiplexedResponse: Boolean = false) : Reader<Frame> {
class FrameReader(source: Source, private val mediaType: String) : Reader<Frame> {

private val bufferedSource: BufferedSource = source.buffer()

private val buffer = Buffer()

override fun readNext(type: Class<Frame>?): Frame {
return if (expectMultiplexedResponse) {
// see https://docs.docker.com/reference/api/engine/version-history/#v142-api-changes
// see https://github.com/moby/moby/pull/39812
return if (mediaType == ApiClient.Companion.DockerMultiplexedStreamMediaType) {
// See https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach for the stream format documentation.
// header := [8]byte{STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,18 @@ inline fun <reified T : Any?> ResponseBody?.consumeStream(mediaType: String?): F
}
}

fun ResponseBody?.consumeFrames(mediaType: String?, expectMultiplexedResponse: Boolean = false): Flow<Frame> {
fun ResponseBody?.consumeFrames(mediaType: String?): Flow<Frame> {
if (this == null) {
return emptyFlow()
}
when (mediaType) {
// TODO since api v1.42 we should be able to use the media-type instead of the 'expectMultiplexedResponse' flag
// see https://docs.docker.com/engine/api/version-history/#v142-api-changes
"application/vnd.docker.multiplexed-stream",
"application/vnd.docker.raw-stream" -> {
val reader = FrameReader(source(), expectMultiplexedResponse)
// Requires api v1.42
// multiplexed-stream: without attached Tty
ApiClient.Companion.DockerMultiplexedStreamMediaType,
// Requires api v1.42
// raw-stream: with attached Tty
ApiClient.Companion.DockerRawStreamMediaType -> {
val reader = FrameReader(source(), mediaType)
val events = flow {
while (reader.hasNext()) {
val next = reader.readNext(Frame::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,9 @@ public void containerAttachNonInteractive() {
new Timer().schedule(new TimerTask() {
@Override
public void run() {
callback.job.cancel();
if (callback.job != null) {
callback.job.cancel();
}
wait.countDown();
}
}, 5000);
Expand All @@ -806,7 +808,10 @@ public void run() {
catch (InterruptedException e) {
e.printStackTrace();
}
assertSame(callback.frames.stream().findAny().get().getStreamType(), Frame.StreamType.STDOUT);

// we receive a RAW response because the connection is not upgraded - which is ok for non-interactive usage
// assertSame(Frame.StreamType.STDOUT, callback.frames.stream().findAny().get().getStreamType());
assertSame(Frame.StreamType.RAW, callback.frames.stream().findAny().get().getStreamType());

removeContainer(engineApiClient, "container-attach-non-interactive-test");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void systemPingHead() {
@Test
public void systemVersion() {
SystemVersion systemVersion = systemApi.systemVersion();
assertTrue(asList("1.42", "1.43", "1.44", "1.45", "1.46", "1.47", "1.48").contains(systemVersion.getApiVersion()));
assertTrue(asList("1.42", "1.43", "1.44", "1.45", "1.46", "1.47", "1.48", "1.49", "1.50", "1.51").contains(systemVersion.getApiVersion()));
}

static class SystemEventsCallback implements StreamCallback<EventMessage> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public TestImage(EngineApiClient engineApiClient) {

this.useWindowsContainer = Objects.requireNonNull(engineApiClient.getSystemApi().systemVersion().getOs()).equalsIgnoreCase("windows");
this.repository = "gesellix/echo-server";
this.tag = "2024-12-22T16-35-00";
this.tag = "2025-07-27T22-12-00";

// TODO consider NOT calling prepare inside the constructor
prepare();
Expand Down
Loading