Skip to content

Commit

Permalink
Merge branch 'quic'
Browse files Browse the repository at this point in the history
  • Loading branch information
TobiasGrether committed Mar 4, 2024
2 parents 2b3a3c7 + 8183cf5 commit ff732a7
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 23 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
<version>2.0.2-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-codec-native-quic</artifactId>
<version>0.0.57.Final</version>
<classifier>linux-x86_64</classifier>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import dev.waterdog.waterdogpe.network.protocol.ProtocolCodecs;
import dev.waterdog.waterdogpe.plugin.Plugin;
import org.nethergames.proxytransport.integration.CustomTransportServerInfo;
import org.nethergames.proxytransport.integration.QuicTransportServerInfo;
import org.nethergames.proxytransport.integration.TcpTransportServerInfo;
import org.nethergames.proxytransport.utils.CodecUpdater;

public class ProxyTransport extends Plugin {
Expand All @@ -12,7 +13,8 @@ public void onStartup() {
ProtocolCodecs.addUpdater(new CodecUpdater());

getLogger().info("ProxyTransport was started.");
getLogger().info("Registered type with name {}", CustomTransportServerInfo.TYPE.getIdentifier());
getLogger().info("Registered type with name {}", QuicTransportServerInfo.TYPE.getIdentifier());
getLogger().info("Registered type with name {}", TcpTransportServerInfo.TYPE.getIdentifier());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.incubator.codec.quic.QuicChannel;
import io.netty.incubator.codec.quic.QuicConnectionStats;
import io.netty.util.concurrent.Future;
import lombok.NonNull;
import lombok.extern.log4j.Log4j2;
import org.cloudburstmc.protocol.bedrock.codec.BedrockCodec;
Expand Down Expand Up @@ -46,7 +51,7 @@ public class TransportClientConnection extends BedrockClientConnection {

private final Channel channel;
private long lastPingTimestamp;
private long latency;
private long latency; // Latency in microseconds

private final List<ScheduledFuture<?>> scheduledTasks = new ArrayList<>();

Expand All @@ -56,7 +61,7 @@ public TransportClientConnection(ProxiedPlayer player, ServerInfo serverInfo, Ch
this.channel = channel;
this.channel.closeFuture().addListener(future -> cleanActiveChannels());

scheduledTasks.add(channel.eventLoop().scheduleAtFixedRate(this::sendAcknowledge, PING_CYCLE_TIME, PING_CYCLE_TIME, TimeUnit.SECONDS));
scheduledTasks.add(channel.eventLoop().scheduleAtFixedRate(this::collectStats, PING_CYCLE_TIME, PING_CYCLE_TIME, TimeUnit.SECONDS));
scheduledTasks.add(channel.eventLoop().scheduleAtFixedRate(() -> packetSendingLimit.set(0), 1, 1, TimeUnit.SECONDS));
}

Expand Down Expand Up @@ -100,7 +105,8 @@ private void onBedrockBatch(@NonNull BedrockBatchWrapper batch) {
wrapper.release(); // release
batch.modify();

receiveAcknowledge();
this.latency = (System.nanoTime() - this.lastPingTimestamp) / 2000;
this.broadcastPing();
}
}
}
Expand Down Expand Up @@ -165,25 +171,37 @@ public long getPing() {
return latency;
}

public void sendAcknowledge() {
public void collectStats() {
var connection = getPlayer().getDownstreamConnection();
if (connection instanceof TransportClientConnection && connection.getServerInfo().getServerName().equalsIgnoreCase(getServerInfo().getServerName())) {
NetworkStackLatencyPacket packet = new NetworkStackLatencyPacket();
packet.setTimestamp(0L);
packet.setFromServer(true);

sendPacket(packet);

lastPingTimestamp = System.currentTimeMillis();
if (this.channel instanceof NioSocketChannel) {
NetworkStackLatencyPacket packet = new NetworkStackLatencyPacket();
packet.setTimestamp(0L);
packet.setFromServer(true);

sendPacket(packet);

this.lastPingTimestamp = System.nanoTime();
} else if (this.channel instanceof EpollSocketChannel epollChannel) {
this.latency = epollChannel.tcpInfo().rtt() / 2;
this.broadcastPing();
} else if (this.channel instanceof QuicChannel quicChannel) {
quicChannel.collectStats().addListener((Future<QuicConnectionStats> future) -> {
if (future.isSuccess()) {
QuicConnectionStats stats = future.getNow();

this.latency = stats.recv();
this.broadcastPing();
}
});
}
}
}

private void receiveAcknowledge() {
latency = (System.currentTimeMillis() - lastPingTimestamp) / 2;

private void broadcastPing() {
TickSyncPacket latencyPacket = new TickSyncPacket();
latencyPacket.setRequestTimestamp(getPlayer().getPing());
latencyPacket.setResponseTimestamp(latency);
latencyPacket.setRequestTimestamp(getPlayer().getPing() * 1000);
latencyPacket.setResponseTimestamp(this.latency);

sendPacket(latencyPacket);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.netty.channel.ChannelInboundHandlerAdapter;

public class CustomClientEventHandler extends ChannelInboundHandlerAdapter {
public static final String NAME = "tcp-client-event-handler";
public static final String NAME = "client-event-handler";

private final ProxiedPlayer player;
private final ClientConnection connection;
Expand All @@ -27,7 +27,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
return;
}

this.player.getLogger().warning("[" + connection.getSocketAddress() + "|" + this.player.getName() + "] - Exception in TCP connection caught", cause);
this.player.getLogger().warning("[" + connection.getSocketAddress() + "|" + this.player.getName() + "] - Exception in connection caught", cause);
this.player.onDownstreamTimeout(this.connection.getServerInfo());

this.connection.disconnect();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package org.nethergames.proxytransport.integration;

import dev.waterdog.waterdogpe.network.connection.client.ClientConnection;
import dev.waterdog.waterdogpe.network.serverinfo.ServerInfo;
import dev.waterdog.waterdogpe.network.serverinfo.ServerInfoType;
import dev.waterdog.waterdogpe.player.ProxiedPlayer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.incubator.codec.quic.*;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import net.jodah.expiringmap.internal.NamedThreadFactory;
import org.nethergames.proxytransport.impl.TransportChannelInitializer;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class QuicTransportServerInfo extends ServerInfo {
public static final int availableCPU = Runtime.getRuntime().availableProcessors();
public static final ThreadFactory downstreamThreadFactory = new NamedThreadFactory("QUIC-Downstream %s");
public static final EventLoopGroup downstreamLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(availableCPU, downstreamThreadFactory) : new NioEventLoopGroup(availableCPU, downstreamThreadFactory);

public static final String TYPE_IDENT = "quic";
public static final ServerInfoType TYPE = ServerInfoType.builder()
.identifier(TYPE_IDENT)
.serverInfoFactory(QuicTransportServerInfo::new)
.register();

private final HashMap<InetSocketAddress, Future<QuicChannel>> serverConnections = new HashMap<>();

public QuicTransportServerInfo(String serverName, InetSocketAddress address, InetSocketAddress publicAddress) {
super(serverName, address, publicAddress);
}

@Override
public ServerInfoType getServerType() {
return TYPE;
}

@Override
public Future<ClientConnection> createConnection(ProxiedPlayer proxiedPlayer) {
EventLoop eventLoop = proxiedPlayer.getProxy().getWorkerEventLoopGroup().next();
Promise<ClientConnection> promise = eventLoop.newPromise();

this.createServerConnection(eventLoop, this.getAddress()).addListener((Future<QuicChannel> future) -> {
if (future.isSuccess()) {
QuicChannel quicChannel = future.getNow();

quicChannel.createStream(QuicStreamType.BIDIRECTIONAL, new TransportChannelInitializer(proxiedPlayer, this, promise)).addListener((Future<QuicStreamChannel> streamFuture) -> {
if (!streamFuture.isSuccess()) {
promise.tryFailure(streamFuture.cause());
quicChannel.close();
}
});
} else {
promise.tryFailure(future.cause());
}
});

return promise;
}

private Future<QuicChannel> createServerConnection(EventLoopGroup eventLoopGroup, InetSocketAddress address) {
EventLoop eventLoop = eventLoopGroup.next();

if (this.serverConnections.containsKey(address)) {
return this.serverConnections.get(address);
}

Promise<QuicChannel> promise = eventLoop.newPromise();
this.serverConnections.put(address, promise);

QuicSslContext sslContext = QuicSslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).applicationProtocols("ng").build();
ChannelHandler codec = new QuicClientCodecBuilder()
.sslContext(sslContext)
.maxIdleTimeout(5000, TimeUnit.MILLISECONDS)
.initialMaxData(10000000)
.initialMaxStreamDataBidirectionalLocal(1000000)
.build();

new Bootstrap()
.group(downstreamLoopGroup)
.handler(codec)
.channel(getProperSocketChannel())
.bind(0).addListener((ChannelFuture channelFuture) -> {
if (channelFuture.isSuccess()) {
QuicChannel.newBootstrap(channelFuture.channel())
.streamHandler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.close();
}
})
.remoteAddress(address)
.connect().addListener((Future<QuicChannel> quicChannelFuture) -> {
if (quicChannelFuture.isSuccess()) {
QuicChannel quicChannel = quicChannelFuture.getNow();
quicChannel.closeFuture().addListener(f -> serverConnections.remove(address));

promise.trySuccess(quicChannel);
} else {
promise.tryFailure(quicChannelFuture.cause());
channelFuture.channel().close();
}
});
} else {
promise.tryFailure(channelFuture.cause());
channelFuture.channel().close();
}
});

return promise;
}

public Class<? extends DatagramChannel> getProperSocketChannel() {
return Epoll.isAvailable() ? EpollDatagramChannel.class : NioDatagramChannel.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadFactory;

public class CustomTransportServerInfo extends ServerInfo {
public class TcpTransportServerInfo extends ServerInfo {
public static final int availableCPU = Runtime.getRuntime().availableProcessors();
public static final ThreadFactory downstreamThreadFactory = new NamedThreadFactory("TCP-Downstream %s");
public static final EventLoopGroup downstreamLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(availableCPU, downstreamThreadFactory) : new NioEventLoopGroup(availableCPU, downstreamThreadFactory);

public static final String TYPE_IDENT = "tcp";
public static final ServerInfoType TYPE = ServerInfoType.builder()
.identifier(TYPE_IDENT)
.serverInfoFactory(CustomTransportServerInfo::new)
.serverInfoFactory(TcpTransportServerInfo::new)
.register();

public CustomTransportServerInfo(String serverName, InetSocketAddress address, InetSocketAddress publicAddress) {
public TcpTransportServerInfo(String serverName, InetSocketAddress address, InetSocketAddress publicAddress) {
super(serverName, address, publicAddress);
}

Expand Down

0 comments on commit ff732a7

Please sign in to comment.