diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2afea63e..1bb4ce76 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -61,6 +61,9 @@ jobs: go run .github/check.go - name: clean build run: ./gradlew clean build --info --stacktrace + env: + ORG_GRADLE_PROJECT_githubPackagesUsername: ${{ env.GITHUB_ACTOR }} + ORG_GRADLE_PROJECT_githubPackagesPassword: ${{ secrets.GITHUB_TOKEN }} - name: Upload Test Results # see publish-test-results.yml for workflow that publishes test results without security issues for forks # https://github.com/marketplace/actions/publish-test-results#support-fork-repositories-and-dependabot-branches diff --git a/api-client/src/main/java/de/gesellix/docker/remote/api/core/StreamCallback.java b/api-client/src/main/java/de/gesellix/docker/remote/api/core/StreamCallback.java index f5b29726..35f485ce 100644 --- a/api-client/src/main/java/de/gesellix/docker/remote/api/core/StreamCallback.java +++ b/api-client/src/main/java/de/gesellix/docker/remote/api/core/StreamCallback.java @@ -1,10 +1,21 @@ package de.gesellix.docker.remote.api.core; +import okio.Sink; + public interface StreamCallback { default void onStarting(Cancellable cancellable) { } + default void attachInput(Sink sink) { + try { + Thread.sleep(500); + sink.close(); + } catch (Exception ignored) { + } + throw new IllegalStateException("Falling back to default implementation that closes the sink after 500ms. This is probably not what you want. Please provide a custom implementation of this method to handle the input stream."); + } + void onNext(T element); default void onFailed(Exception e) { diff --git a/api-client/src/main/kotlin/de/gesellix/docker/remote/api/client/ContainerApi.kt b/api-client/src/main/kotlin/de/gesellix/docker/remote/api/client/ContainerApi.kt index 97d8479c..bc92643a 100644 --- a/api-client/src/main/kotlin/de/gesellix/docker/remote/api/client/ContainerApi.kt +++ b/api-client/src/main/kotlin/de/gesellix/docker/remote/api/client/ContainerApi.kt @@ -41,11 +41,13 @@ import de.gesellix.docker.remote.api.core.ServerError import de.gesellix.docker.remote.api.core.ServerException import de.gesellix.docker.remote.api.core.StreamCallback import de.gesellix.docker.remote.api.core.Success +import de.gesellix.docker.remote.api.core.SuccessBidirectionalStream import de.gesellix.docker.remote.api.core.SuccessStream import kotlinx.coroutines.cancel import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withTimeout +import okhttp3.RequestBody import okio.Source import okio.source import java.io.InputStream @@ -214,18 +216,33 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, ) when (localVarResponse.responseType) { - ResponseType.Success -> { - runBlocking { - launch { - withTimeout(timeoutMillis) { - callback.onStarting(this@launch::cancel) - (localVarResponse as SuccessStream).data.collect { callback.onNext(it) } - callback.onFinished() + ResponseType.Success, + ResponseType.Informational -> { + when (localVarResponse) { + is SuccessBidirectionalStream -> + runBlocking { + launch { + withTimeout(timeoutMillis) { + callback.onStarting(this@launch::cancel) + callback.attachInput(localVarResponse.socket.sink) + localVarResponse.data.collect { callback.onNext(it) } + callback.onFinished() + } + } + } + + else -> + runBlocking { + launch { + withTimeout(timeoutMillis) { + callback.onStarting(this@launch::cancel) + (localVarResponse as SuccessStream).data.collect { callback.onNext(it) } + callback.onFinished() + } + } } - } } } - ResponseType.Informational -> throw UnsupportedOperationException("Client does not support Informational responses.") ResponseType.Redirection -> throw UnsupportedOperationException("Client does not support Redirection responses.") ResponseType.ClientError -> { val localVarError = localVarResponse as ClientError<*> @@ -259,7 +276,7 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, stdout: Boolean?, stderr: Boolean? ): RequestConfig { - val localVariableBody = null + val localVariableBody = RequestBody.EMPTY val localVariableQuery: MultiValueMap = mutableMapOf>() .apply { if (detachKeys != null) { @@ -282,6 +299,17 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, } } val localVariableHeaders: MutableMap = mutableMapOf() + val requiresConnectionUpgrade = stdin != null && stdin + if (requiresConnectionUpgrade) + localVariableHeaders.apply { + put("Connection", "Upgrade") + put("Upgrade", "tcp") + } +// else +// localVariableHeaders.apply { +// put("Connection", "Upgrade") +// put("Upgrade", "tcp") +// } return RequestConfig( method = POST, diff --git a/api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ApiClient.kt b/api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ApiClient.kt index 0fdc3296..3d14e95f 100644 --- a/api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ApiClient.kt +++ b/api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ApiClient.kt @@ -77,6 +77,7 @@ open class ApiClient( protected inline fun requestBody(content: T, mediaType: String = JsonMediaType): RequestBody = when { + content is RequestBody -> content content is File -> content.asRequestBody( mediaType.toMediaTypeOrNull() ) @@ -319,6 +320,12 @@ open class ApiClient( response.code, response.headers.toMultimap() ) + response.code == 101 && request.isTcpUpgrade() && response.isTcpUpgrade() -> return SuccessBidirectionalStream( + response.socket.consumeFrames(mediaType), + response.socket!!, + response.code, + response.headers.toMultimap() + ) response.isInformational -> return Informational( response.message, response.code, diff --git a/api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ApiInfrastructureResponse.kt b/api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ApiInfrastructureResponse.kt index 22702415..015df8fe 100644 --- a/api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ApiInfrastructureResponse.kt +++ b/api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ApiInfrastructureResponse.kt @@ -1,6 +1,7 @@ package de.gesellix.docker.remote.api.core import kotlinx.coroutines.flow.Flow +import okio.Socket enum class ResponseType { Success, Informational, Redirection, ClientError, ServerError @@ -14,8 +15,15 @@ abstract class ApiInfrastructureResponse(val responseType: ResponseType) : Re abstract val headers: Map> } -class SuccessStream( - val data: Flow, +class SuccessBidirectionalStream( + override val data: Flow, + val socket: Socket, + override val statusCode: Int = -1, + override val headers: Map> = mapOf() +) : SuccessStream(data, statusCode, headers) + +open class SuccessStream( + open val data: Flow, override val statusCode: Int = -1, override val headers: Map> = mapOf() ) : ApiInfrastructureResponse(ResponseType.Success) diff --git a/api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ResponseConsumer.kt b/api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ResponseConsumer.kt index ea591a51..6be03205 100644 --- a/api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ResponseConsumer.kt +++ b/api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ResponseConsumer.kt @@ -6,8 +6,11 @@ import de.gesellix.docker.response.JsonChunksReader import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.flow +import okhttp3.Request +import okhttp3.Response import okhttp3.ResponseBody import okio.Closeable +import okio.Socket import okio.appendingSink import okio.buffer import java.io.File @@ -15,6 +18,22 @@ import java.io.InputStream import java.lang.reflect.Type import java.nio.file.Files +fun Request?.isTcpUpgrade(): Boolean { + if (this == null) { + return false + } + return this.header("Connection")?.contains("Upgrade", ignoreCase = true) ?: false && + this.header("Upgrade")?.contains("tcp", ignoreCase = true) ?: false +} + +fun Response?.isTcpUpgrade(): Boolean { + if (this == null) { + return false + } + return this.header("Connection")?.contains("Upgrade", ignoreCase = true) ?: false && + this.header("Upgrade")?.contains("tcp", ignoreCase = true) ?: false +} + fun ResponseBody?.consumeFile(): File? { if (this == null) { return null @@ -57,16 +76,36 @@ inline fun ResponseBody?.consumeStream(mediaType: String?): F } } +fun Socket?.consumeFrames(mediaType: String?): Flow { + if (this == null) { + return emptyFlow() + } + when (mediaType) { + ApiClient.Companion.DockerMultiplexedStreamMediaType, + ApiClient.Companion.DockerRawStreamMediaType -> { + val reader = FrameReader(source, mediaType) + val events = flow { + while (reader.hasNext()) { + val next = reader.readNext(Frame::class.java) + emit(next) + } + source.closeQuietly() +// this@consumeFrames.cancel() + } + return events + } + else -> { + throw UnsupportedOperationException("Can't handle media type $mediaType") + } + } +} + fun ResponseBody?.consumeFrames(mediaType: String?): Flow { if (this == null) { return emptyFlow() } when (mediaType) { - // Requires api v1.42 - // multiplexed-stream: without attached Tty ApiClient.Companion.DockerMultiplexedStreamMediaType, - // Requires api v1.42 - // raw-stream: with attached Tty ApiClient.Companion.DockerRawStreamMediaType -> { val reader = FrameReader(source(), mediaType) val events = flow { diff --git a/api-client/src/test/java/de/gesellix/docker/remote/api/client/ContainerApiIntegrationTest.java b/api-client/src/test/java/de/gesellix/docker/remote/api/client/ContainerApiIntegrationTest.java index 1272424e..212c251c 100644 --- a/api-client/src/test/java/de/gesellix/docker/remote/api/client/ContainerApiIntegrationTest.java +++ b/api-client/src/test/java/de/gesellix/docker/remote/api/client/ContainerApiIntegrationTest.java @@ -26,7 +26,10 @@ import de.gesellix.docker.remote.api.testutil.InjectDockerClient; import de.gesellix.docker.remote.api.testutil.TarUtil; import de.gesellix.docker.remote.api.testutil.TestImage; +import okio.BufferedSink; import okio.Okio; +import okio.Sink; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -759,6 +762,8 @@ public void containerStatsOnce() { @Test public void containerAttachNonInteractive() { + removeContainer(engineApiClient, "container-attach-non-interactive-test"); + imageApi.imageCreate(testImage.getImageName(), null, null, testImage.getImageTag(), null, null, null, null, null); ContainerCreateRequest containerCreateRequest = new ContainerCreateRequest( @@ -816,6 +821,107 @@ public void run() { removeContainer(engineApiClient, "container-attach-non-interactive-test"); } + @Test + public void containerAttachInteractive() { + removeContainer(engineApiClient, "container-attach-interactive-test"); + + imageApi.imageCreate(testImage.getImageName(), null, null, testImage.getImageTag(), null, null, null, null, null); + + ContainerCreateRequest containerCreateRequest = new ContainerCreateRequest( + null, null, null, + true, true, true, + null, + true, true, null, + null, + null, + null, + null, + testImage.getImageWithTag(), + null, null, singletonList("/cat"), + null, null, + null, + singletonMap(LABEL_KEY, LABEL_VALUE), + null, null, + null, + null, + null + ); + containerApi.containerCreate(containerCreateRequest, "container-attach-interactive-test"); + containerApi.containerStart("container-attach-interactive-test", null); + + Duration timeout = Duration.of(5, SECONDS); + LogFrameAndAttachStreamCallback callback = new LogFrameAndAttachStreamCallback() { + @Override + public void attachInput(Sink sink) { + System.out.println("attachInput, sending data..."); + new Thread(() -> { + BufferedSink buffer = Okio.buffer(sink); + try { + buffer.writeUtf8("hello echo\n"); + buffer.flush(); + System.out.println("... data sent"); + } catch (IOException e) { + e.printStackTrace(); + System.err.println("Failed to write to stdin: " + e.getMessage()); + } finally { + try { + Thread.sleep(100); + sink.close(); + } catch (Exception ignored) { + // ignore + } + } + }).start(); + } + }; + + new Thread(() -> containerApi.containerAttach( + "container-attach-interactive-test", + null, true, true, true, true, true, + callback, timeout.toMillis())).start(); + + CountDownLatch wait = new CountDownLatch(1); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + if (callback.job != null) { + callback.job.cancel(); + } + wait.countDown(); + } + }, 5000); + + try { + wait.await(); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + assertSame(Frame.StreamType.RAW, callback.frames.stream().findAny().get().getStreamType()); + assertEquals( + "hello echo\nhello echo".replaceAll("[\\n\\r]", ""), + callback.frames.stream().map(Frame::getPayloadAsString).collect(Collectors.joining()).replaceAll("[\\n\\r]", "")); + + removeContainer(engineApiClient, "container-attach-interactive-test"); + } + + static class LogFrameAndAttachStreamCallback implements StreamCallback { + + List frames = new ArrayList<>(); + Cancellable job = null; + + @Override + public void onStarting(Cancellable cancellable) { + job = cancellable; + } + + @Override + public void onNext(Frame frame) { + frames.add(frame); + log.info("next: {}", frame); + } + } + static class LogFrameStreamCallback implements StreamCallback { List frames = new ArrayList<>(); diff --git a/build.gradle.kts b/build.gradle.kts index 3acd5daf..0ae983f1 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -40,13 +40,19 @@ val dependencyVersionsByGroup = mapOf( subprojects { repositories { // mavenLocal() -// fun findProperty(s: String) = project.findProperty(s) as String? -// maven { -// name = "github" -// setUrl("https://maven.pkg.github.com/docker-client/*") -// credentials { -// username = System.getenv("PACKAGE_REGISTRY_USER") ?: findProperty("github.package-registry.username") -// password = System.getenv("PACKAGE_REGISTRY_TOKEN") ?: findProperty("github.package-registry.password") +// listOf( +//// "gesellix/okhttp", +//// "docker-client/*", +// ).forEach { slug -> +//// fun findProperty(s: String) = project.findProperty(s) as String? +// maven { +// name = "githubPackages" +// url = uri("https://maven.pkg.github.com/${slug}") +// credentials(PasswordCredentials::class) +//// credentials { +//// username = System.getenv("PACKAGE_REGISTRY_USER") ?: findProperty("github.package-registry.username") +//// password = System.getenv("PACKAGE_REGISTRY_TOKEN") ?: findProperty("github.package-registry.password") +//// } // } // } mavenCentral() @@ -64,6 +70,10 @@ allprojects { useVersion(forcedVersion) } } +// dependencySubstitution { +// substitute(module("com.squareup.okhttp3:okhttp-jvm")) +// .using(module("de.gesellix.okhttp3-forked:okhttp-jvm:${libs.versions.okhttp.get()}")) +// } } } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 6f1ca2dd..1df82e36 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -31,7 +31,7 @@ logback = { module = "ch.qos.logback:logback-classic", version.ref = "logback" } moshi = { module = "com.squareup.moshi:moshi", version.ref = "moshi" } moshiKotlin = { module = "com.squareup.moshi:moshi-kotlin", version.ref = "moshi" } okhttp = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" } -okhttpMockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp" } +okhttpMockwebserverJunit5 = { module = "com.squareup.okhttp3:mockwebserver3-junit5", version.ref = "okhttp" } okio = { module = "com.squareup.okio:okio", version.ref = "okio" } okioJvm = { module = "com.squareup.okio:okio-jvm", version.ref = "okio" } slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } @@ -39,7 +39,7 @@ slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } [bundles] kotlin = ["kotlin", "kotlinCommon", "kotlinJdk7", "kotlinJdk8", "kotlinReflect", "kotlinScriptingJvm", "kotlinStdlib", "kotlinTest"] moshi = ["moshi", "moshiKotlin"] -okhttp = ["okhttp", "okhttpMockwebserver"] +okhttp = ["okhttp", "okhttpMockwebserverJunit5"] okio = ["okio", "okioJvm"] [plugins]