From 02b2d6d520526f11009bebc66c9e2b87d4688d4c Mon Sep 17 00:00:00 2001 From: Daniel Eggert Date: Mon, 28 Apr 2025 11:39:58 +0200 Subject: [PATCH 1/5] Add unix domain socket based async channel test --- .../AsynChannelUnixDomainSocketTests.swift | 169 ++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift diff --git a/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift new file mode 100644 index 0000000000..3b336a4452 --- /dev/null +++ b/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift @@ -0,0 +1,169 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import NIOPosix +import Testing + +@testable import NIOCore + +@Suite +private enum AsynChannelUnixDomainSocketTests { + /// This is a end-to-end async channel based test. + /// + /// The server side listens on a UNIX domain socket, and the client connects to this socket. + /// + /// The server and client exchange simple, line based messages. + @available(macOS 10.15, iOS 17, tvOS 13, watchOS 6, *) + @Test() + static func runServer() async throws { + try await confirmation("Client did receive message") { clientDidReceive in + try await confirmation("Server did receive message") { serverDidReceive in + try await check( + clientDidReceive: clientDidReceive, + serverDidReceive: serverDidReceive + ) + } + } + } +} + +@available(iOS 17.0, *) +private func check( + clientDidReceive: Confirmation, + serverDidReceive: Confirmation +) async throws { + // This uses a hard-coded path. + // + // The path of a UNIX domain socket has a relatively low limit on its total + // length, and we thus can not put this inside some (potentially) deeply + // nested directory hierarchy. + let path = "/tmp/9ac7750dc22a066066871aadf481e31a" + let serverChannel = try await makeServerChannel(path: path) + + try await withThrowingDiscardingTaskGroup { group in + try await serverChannel.executeThenClose { inbound in + group.addTask { + // Create a client connection to the server: + let clientChannel = try await makeClientChannel(path: path) + print("Executing client channel") + try await clientChannel.executeThenClose { inbound, outbound in + print("C: Sending hello") + try await outbound.write("Hello") + + var inboundIterator = inbound.makeAsyncIterator() + guard let messageA = try await inboundIterator.next() else { return } + print("C: Did receive '\(messageA)'") + clientDidReceive.confirm() + #expect(messageA == "Hello") + + try await outbound.write("QUIT") + } + } + + for try await connectionChannel in inbound { + group.addTask { + print("Handling new connection") + await handleConnection( + channel: connectionChannel, + serverDidReceive: serverDidReceive + ) + print("Done handling connection") + } + break + } + } + } +} + +private func makeServerChannel( + path: String +) async throws -> NIOAsyncChannel, Never> { + try await ServerBootstrap( + group: NIOSingletons.posixEventLoopGroup + ).bind( + unixDomainSocketPath: path, + cleanupExistingSocketFile: true, + serverBackPressureStrategy: nil + ) { childChannel in + childChannel.eventLoop.makeCompletedFuture { + try childChannel.pipeline.syncOperations.addHandler(ByteToMessageHandler(NewlineDelimiterCoder())) + try childChannel.pipeline.syncOperations.addHandler(MessageToByteHandler(NewlineDelimiterCoder())) + return try NIOAsyncChannel( + wrappingChannelSynchronously: childChannel + ) + } + } +} + +private func makeClientChannel( + path: String +) async throws -> NIOAsyncChannel { + try await ClientBootstrap(group: NIOSingletons.posixEventLoopGroup) + .connect(unixDomainSocketPath: path) + .flatMap { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(NewlineDelimiterCoder())) + try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(NewlineDelimiterCoder())) + return try NIOAsyncChannel(wrappingChannelSynchronously: channel) + } + } + .get() +} + +private func handleConnection( + channel: NIOAsyncChannel, + serverDidReceive: Confirmation +) async { + do { + print("S: New channel") + try await channel.executeThenClose { inbound, outbound in + for try await message in inbound { + print("S: Did receive '\(message)'") + guard message != "QUIT" else { return } + serverDidReceive.confirm() + try await outbound.write(message) + } + print("S: Bye") + } + } catch { + print("Error: \(error)") + } +} + +/// A simple newline based encoder and decoder. +private final class NewlineDelimiterCoder: ByteToMessageDecoder, MessageToByteEncoder { + typealias InboundIn = ByteBuffer + typealias InboundOut = String + + private let newLine = UInt8(ascii: "\n") + + init() {} + + func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState { + let readableBytes = buffer.readableBytesView + + if let firstLine = readableBytes.firstIndex(of: self.newLine).map({ readableBytes[..<$0] }) { + buffer.moveReaderIndex(forwardBy: firstLine.count + 1) + // Fire a read without a newline + context.fireChannelRead(Self.wrapInboundOut(String(buffer: ByteBuffer(firstLine)))) + return .continue + } else { + return .needMoreData + } + } + + func encode(data: String, out: inout ByteBuffer) throws { + out.writeString(data) + out.writeInteger(self.newLine) + } +} From 05699f22094942509b534bb69cc609c35c321e05 Mon Sep 17 00:00:00 2001 From: Daniel Eggert Date: Thu, 1 May 2025 13:19:15 +0200 Subject: [PATCH 2/5] Guard Swift Testing code with #if + #endif --- .../AsyncChannel/AsynChannelUnixDomainSocketTests.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift index 3b336a4452..cacbb2b09a 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift @@ -11,6 +11,7 @@ // SPDX-License-Identifier: Apache-2.0 // //===----------------------------------------------------------------------===// +#if canImport(Testing) import NIOPosix import Testing @@ -167,3 +168,4 @@ private final class NewlineDelimiterCoder: ByteToMessageDecoder, MessageToByteEn out.writeInteger(self.newLine) } } +#endif // canImport(Testing) From 6771f4e5152818c6642f64b219e6ed74272e2400 Mon Sep 17 00:00:00 2001 From: Daniel Eggert Date: Thu, 1 May 2025 13:21:26 +0200 Subject: [PATCH 3/5] Whitespace --- .../AsyncChannel/AsynChannelUnixDomainSocketTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift index cacbb2b09a..70cf4aa60b 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift @@ -168,4 +168,4 @@ private final class NewlineDelimiterCoder: ByteToMessageDecoder, MessageToByteEn out.writeInteger(self.newLine) } } -#endif // canImport(Testing) +#endif // canImport(Testing) From 3263be811e1c1a609909a6a2cc7d9a19d482c945 Mon Sep 17 00:00:00 2001 From: Daniel Eggert Date: Thu, 1 May 2025 13:32:01 +0200 Subject: [PATCH 4/5] Bump / update @available versions --- .../AsyncChannel/AsynChannelUnixDomainSocketTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift index 70cf4aa60b..bd4019abf3 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift @@ -24,7 +24,7 @@ private enum AsynChannelUnixDomainSocketTests { /// The server side listens on a UNIX domain socket, and the client connects to this socket. /// /// The server and client exchange simple, line based messages. - @available(macOS 10.15, iOS 17, tvOS 13, watchOS 6, *) + @available(macOS 14.0, iOS 17, tvOS 17, watchOS 10, *) @Test() static func runServer() async throws { try await confirmation("Client did receive message") { clientDidReceive in @@ -38,7 +38,7 @@ private enum AsynChannelUnixDomainSocketTests { } } -@available(iOS 17.0, *) +@available(macOS 14.0, iOS 17, tvOS 17, watchOS 10, *) private func check( clientDidReceive: Confirmation, serverDidReceive: Confirmation From ffcfbcb99f8b8460449629f6feee4f205022d05a Mon Sep 17 00:00:00 2001 From: Daniel Eggert Date: Tue, 6 May 2025 11:23:12 +0200 Subject: [PATCH 5/5] Propagate error from handleConnection() --- .../AsynChannelUnixDomainSocketTests.swift | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift index bd4019abf3..7716fb781d 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsynChannelUnixDomainSocketTests.swift @@ -74,7 +74,7 @@ private func check( for try await connectionChannel in inbound { group.addTask { print("Handling new connection") - await handleConnection( + try await handleConnection( channel: connectionChannel, serverDidReceive: serverDidReceive ) @@ -124,20 +124,16 @@ private func makeClientChannel( private func handleConnection( channel: NIOAsyncChannel, serverDidReceive: Confirmation -) async { - do { - print("S: New channel") - try await channel.executeThenClose { inbound, outbound in - for try await message in inbound { - print("S: Did receive '\(message)'") - guard message != "QUIT" else { return } - serverDidReceive.confirm() - try await outbound.write(message) - } - print("S: Bye") +) async throws { + print("S: New channel") + try await channel.executeThenClose { inbound, outbound in + for try await message in inbound { + print("S: Did receive '\(message)'") + guard message != "QUIT" else { return } + serverDidReceive.confirm() + try await outbound.write(message) } - } catch { - print("Error: \(error)") + print("S: Bye") } }