Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ jobs:
go run .github/check.go
- name: clean build
run: ./gradlew clean build --info --stacktrace
env:
ORG_GRADLE_PROJECT_githubPackagesUsername: ${{ env.GITHUB_ACTOR }}
ORG_GRADLE_PROJECT_githubPackagesPassword: ${{ secrets.GITHUB_TOKEN }}
- name: Upload Test Results
# see publish-test-results.yml for workflow that publishes test results without security issues for forks
# https://github.com/marketplace/actions/publish-test-results#support-fork-repositories-and-dependabot-branches
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
package de.gesellix.docker.remote.api.core;

import okio.Sink;

public interface StreamCallback<T> {

default void onStarting(Cancellable cancellable) {
}

default void attachInput(Sink sink) {
try {
Thread.sleep(500);
sink.close();
} catch (Exception ignored) {
}
throw new IllegalStateException("Falling back to default implementation that closes the sink after 500ms. This is probably not what you want. Please provide a custom implementation of this method to handle the input stream.");
}

void onNext(T element);

default void onFailed(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ import de.gesellix.docker.remote.api.core.ServerError
import de.gesellix.docker.remote.api.core.ServerException
import de.gesellix.docker.remote.api.core.StreamCallback
import de.gesellix.docker.remote.api.core.Success
import de.gesellix.docker.remote.api.core.SuccessBidirectionalStream
import de.gesellix.docker.remote.api.core.SuccessStream
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import okhttp3.RequestBody
import okio.Source
import okio.source
import java.io.InputStream
Expand Down Expand Up @@ -214,18 +216,33 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig,
)

when (localVarResponse.responseType) {
ResponseType.Success -> {
runBlocking {
launch {
withTimeout(timeoutMillis) {
callback.onStarting(this@launch::cancel)
(localVarResponse as SuccessStream<Frame>).data.collect { callback.onNext(it) }
callback.onFinished()
ResponseType.Success,
ResponseType.Informational -> {
when (localVarResponse) {
is SuccessBidirectionalStream ->
runBlocking {
launch {
withTimeout(timeoutMillis) {
callback.onStarting(this@launch::cancel)
callback.attachInput(localVarResponse.socket.sink)
localVarResponse.data.collect { callback.onNext(it) }
callback.onFinished()
}
}
}

else ->
runBlocking {
launch {
withTimeout(timeoutMillis) {
callback.onStarting(this@launch::cancel)
(localVarResponse as SuccessStream<Frame>).data.collect { callback.onNext(it) }
callback.onFinished()
}
}
}
}
}
}
ResponseType.Informational -> throw UnsupportedOperationException("Client does not support Informational responses.")
ResponseType.Redirection -> throw UnsupportedOperationException("Client does not support Redirection responses.")
ResponseType.ClientError -> {
val localVarError = localVarResponse as ClientError<*>
Expand Down Expand Up @@ -259,7 +276,7 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig,
stdout: Boolean?,
stderr: Boolean?
): RequestConfig {
val localVariableBody = null
val localVariableBody = RequestBody.EMPTY
val localVariableQuery: MultiValueMap = mutableMapOf<String, List<String>>()
.apply {
if (detachKeys != null) {
Expand All @@ -282,6 +299,17 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig,
}
}
val localVariableHeaders: MutableMap<String, String> = mutableMapOf()
val requiresConnectionUpgrade = stdin != null && stdin
if (requiresConnectionUpgrade)
localVariableHeaders.apply {
put("Connection", "Upgrade")
put("Upgrade", "tcp")
}
// else
// localVariableHeaders.apply {
// put("Connection", "Upgrade")
// put("Upgrade", "tcp")
// }

return RequestConfig(
method = POST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ open class ApiClient(

protected inline fun <reified T> requestBody(content: T, mediaType: String = JsonMediaType): RequestBody =
when {
content is RequestBody -> content
content is File -> content.asRequestBody(
mediaType.toMediaTypeOrNull()
)
Expand Down Expand Up @@ -319,6 +320,12 @@ open class ApiClient(
response.code,
response.headers.toMultimap()
)
response.code == 101 && request.isTcpUpgrade() && response.isTcpUpgrade() -> return SuccessBidirectionalStream(
response.socket.consumeFrames(mediaType),
response.socket!!,
response.code,
response.headers.toMultimap()
)
response.isInformational -> return Informational(
response.message,
response.code,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package de.gesellix.docker.remote.api.core

import kotlinx.coroutines.flow.Flow
import okio.Socket

enum class ResponseType {
Success, Informational, Redirection, ClientError, ServerError
Expand All @@ -14,8 +15,15 @@ abstract class ApiInfrastructureResponse<T>(val responseType: ResponseType) : Re
abstract val headers: Map<String, List<String>>
}

class SuccessStream<T>(
val data: Flow<T>,
class SuccessBidirectionalStream<T>(
override val data: Flow<T>,
val socket: Socket,
override val statusCode: Int = -1,
override val headers: Map<String, List<String>> = mapOf()
) : SuccessStream<T>(data, statusCode, headers)

open class SuccessStream<T>(
open val data: Flow<T>,
override val statusCode: Int = -1,
override val headers: Map<String, List<String>> = mapOf()
) : ApiInfrastructureResponse<T>(ResponseType.Success)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,34 @@ import de.gesellix.docker.response.JsonChunksReader
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import okhttp3.Request
import okhttp3.Response
import okhttp3.ResponseBody
import okio.Closeable
import okio.Socket
import okio.appendingSink
import okio.buffer
import java.io.File
import java.io.InputStream
import java.lang.reflect.Type
import java.nio.file.Files

fun Request?.isTcpUpgrade(): Boolean {
if (this == null) {
return false
}
return this.header("Connection")?.contains("Upgrade", ignoreCase = true) ?: false &&
this.header("Upgrade")?.contains("tcp", ignoreCase = true) ?: false
}

fun Response?.isTcpUpgrade(): Boolean {
if (this == null) {
return false
}
return this.header("Connection")?.contains("Upgrade", ignoreCase = true) ?: false &&
this.header("Upgrade")?.contains("tcp", ignoreCase = true) ?: false
}

fun ResponseBody?.consumeFile(): File? {
if (this == null) {
return null
Expand Down Expand Up @@ -57,16 +76,36 @@ inline fun <reified T : Any?> ResponseBody?.consumeStream(mediaType: String?): F
}
}

fun Socket?.consumeFrames(mediaType: String?): Flow<Frame> {
if (this == null) {
return emptyFlow()
}
when (mediaType) {
ApiClient.Companion.DockerMultiplexedStreamMediaType,
ApiClient.Companion.DockerRawStreamMediaType -> {
val reader = FrameReader(source, mediaType)
val events = flow {
while (reader.hasNext()) {
val next = reader.readNext(Frame::class.java)
emit(next)
}
source.closeQuietly()
// [email protected]()
}
return events
}
else -> {
throw UnsupportedOperationException("Can't handle media type $mediaType")
}
}
}

fun ResponseBody?.consumeFrames(mediaType: String?): Flow<Frame> {
if (this == null) {
return emptyFlow()
}
when (mediaType) {
// Requires api v1.42
// multiplexed-stream: without attached Tty
ApiClient.Companion.DockerMultiplexedStreamMediaType,
// Requires api v1.42
// raw-stream: with attached Tty
ApiClient.Companion.DockerRawStreamMediaType -> {
val reader = FrameReader(source(), mediaType)
val events = flow {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import de.gesellix.docker.remote.api.testutil.InjectDockerClient;
import de.gesellix.docker.remote.api.testutil.TarUtil;
import de.gesellix.docker.remote.api.testutil.TestImage;
import okio.BufferedSink;
import okio.Okio;
import okio.Sink;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
Expand Down Expand Up @@ -759,6 +762,8 @@ public void containerStatsOnce() {

@Test
public void containerAttachNonInteractive() {
removeContainer(engineApiClient, "container-attach-non-interactive-test");

imageApi.imageCreate(testImage.getImageName(), null, null, testImage.getImageTag(), null, null, null, null, null);

ContainerCreateRequest containerCreateRequest = new ContainerCreateRequest(
Expand Down Expand Up @@ -816,6 +821,107 @@ public void run() {
removeContainer(engineApiClient, "container-attach-non-interactive-test");
}

@Test
public void containerAttachInteractive() {
removeContainer(engineApiClient, "container-attach-interactive-test");

imageApi.imageCreate(testImage.getImageName(), null, null, testImage.getImageTag(), null, null, null, null, null);

ContainerCreateRequest containerCreateRequest = new ContainerCreateRequest(
null, null, null,
true, true, true,
null,
true, true, null,
null,
null,
null,
null,
testImage.getImageWithTag(),
null, null, singletonList("/cat"),
null, null,
null,
singletonMap(LABEL_KEY, LABEL_VALUE),
null, null,
null,
null,
null
);
containerApi.containerCreate(containerCreateRequest, "container-attach-interactive-test");
containerApi.containerStart("container-attach-interactive-test", null);

Duration timeout = Duration.of(5, SECONDS);
LogFrameAndAttachStreamCallback callback = new LogFrameAndAttachStreamCallback() {
@Override
public void attachInput(Sink sink) {
System.out.println("attachInput, sending data...");
new Thread(() -> {
BufferedSink buffer = Okio.buffer(sink);
try {
buffer.writeUtf8("hello echo\n");
buffer.flush();
System.out.println("... data sent");
} catch (IOException e) {
e.printStackTrace();
System.err.println("Failed to write to stdin: " + e.getMessage());
} finally {
try {
Thread.sleep(100);
sink.close();
} catch (Exception ignored) {
// ignore
}
}
}).start();
}
};

new Thread(() -> containerApi.containerAttach(
"container-attach-interactive-test",
null, true, true, true, true, true,
callback, timeout.toMillis())).start();

CountDownLatch wait = new CountDownLatch(1);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (callback.job != null) {
callback.job.cancel();
}
wait.countDown();
}
}, 5000);

try {
wait.await();
}
catch (InterruptedException e) {
e.printStackTrace();
}
assertSame(Frame.StreamType.RAW, callback.frames.stream().findAny().get().getStreamType());
assertEquals(
"hello echo\nhello echo".replaceAll("[\\n\\r]", ""),
callback.frames.stream().map(Frame::getPayloadAsString).collect(Collectors.joining()).replaceAll("[\\n\\r]", ""));

removeContainer(engineApiClient, "container-attach-interactive-test");
}

static class LogFrameAndAttachStreamCallback implements StreamCallback<Frame> {

List<Frame> frames = new ArrayList<>();
Cancellable job = null;

@Override
public void onStarting(Cancellable cancellable) {
job = cancellable;
}

@Override
public void onNext(Frame frame) {
frames.add(frame);
log.info("next: {}", frame);
}
}

static class LogFrameStreamCallback implements StreamCallback<Frame> {

List<Frame> frames = new ArrayList<>();
Expand Down
Loading
Loading