Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/main/java/org/nethergames/proxytransport/compression/ProxyTransportCompressionCodec.java
#	src/main/java/org/nethergames/proxytransport/impl/TransportChannelInitializer.java
#	src/main/java/org/nethergames/proxytransport/impl/TransportClientConnection.java
  • Loading branch information
TobiasGrether committed Feb 12, 2024
2 parents 19eeae2 + 72223ec commit 2b3a3c7
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.nethergames.proxytransport.compression;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import org.cloudburstmc.protocol.bedrock.netty.BedrockBatchWrapper;

import java.util.List;

public class FrameIdCodec extends MessageToMessageCodec<ByteBuf, BedrockBatchWrapper> {
public static final String NAME = "frame-id-codec";

@Override
protected void encode(ChannelHandlerContext ctx, BedrockBatchWrapper msg, List<Object> out) throws Exception {
out.add(msg.getCompressed().retain());
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(BedrockBatchWrapper.newInstance(msg.readRetainedSlice(msg.readableBytes()), null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.cloudburstmc.protocol.bedrock.data.CompressionAlgorithm;
import org.cloudburstmc.protocol.bedrock.data.PacketCompressionAlgorithm;
import org.cloudburstmc.protocol.bedrock.netty.BedrockBatchWrapper;
import org.cloudburstmc.protocol.bedrock.netty.codec.compression.BatchCompression;
import org.cloudburstmc.protocol.bedrock.netty.codec.compression.CompressionStrategy;

import java.util.List;
Expand All @@ -23,28 +25,31 @@ protected void encode(ChannelHandlerContext ctx, BedrockBatchWrapper msg, List<O
if (msg.getCompressed() == null && msg.getUncompressed() == null) {
throw new IllegalStateException("Batch was not encoded before");
} else if (msg.getCompressed() != null && !msg.isModified()) { // already compressed
if (this.prefixed) {
this.onPassedThrough(ctx, msg);
out.add(msg.retain());
} else { // we need to prefix the compressed data
if (!this.prefixed) { // we need to prefix the compressed data
CompositeByteBuf buf = ctx.alloc().compositeDirectBuffer(2);
buf.addComponent(true, ctx.alloc().ioBuffer(1).writeByte(getCompressionHeader(msg.getAlgorithm())));
buf.addComponent(true, msg.getCompressed().retainedSlice());

this.onPassedThrough(ctx, msg);
out.add(buf.retain());
msg.setCompressed(buf, msg.getAlgorithm());
}

this.onPassedThrough(ctx, msg);
out.add(msg.retain());
} else {
ByteBuf compressed = this.zstdCompression.encode(ctx, msg.getUncompressed());
BatchCompression compression = this.getStrategy().getCompression(msg);
if (!compression.getAlgorithm().equals(PacketCompressionAlgorithm.NONE)) {
compression = this.zstdCompression;
}

ByteBuf compressed = compression.encode(ctx, msg.getUncompressed());

try {
ByteBuf outBuf;

outBuf = ctx.alloc().ioBuffer(1 + compressed.readableBytes());
outBuf.writeByte(this.getCompressionHeader(this.zstdCompression.getAlgorithm()));
outBuf.writeByte(this.getCompressionHeader(compression.getAlgorithm()));
outBuf.writeBytes(compressed);

msg.setCompressed(outBuf, this.zstdCompression.getAlgorithm());
msg.setCompressed(outBuf, compression.getAlgorithm());
} finally {
compressed.release();
}
Expand All @@ -59,14 +64,14 @@ protected byte getCompressionHeader0(CompressionAlgorithm algorithm) {
return -2;
}

return -1;
return super.getCompressionHeader0(algorithm);
}

protected CompressionAlgorithm getCompressionAlgorithm0(byte header) {
if (header == -2) {
return ProxyTransportAlgorithm.ZSTD;
}

return null;
return super.getCompressionAlgorithm0(header);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import dev.waterdog.waterdogpe.network.connection.client.ClientConnection;
import dev.waterdog.waterdogpe.network.connection.codec.batch.BedrockBatchDecoder;
import dev.waterdog.waterdogpe.network.connection.codec.batch.BedrockBatchEncoder;
import dev.waterdog.waterdogpe.network.connection.codec.client.ClientPacketQueue;
import dev.waterdog.waterdogpe.network.connection.codec.compression.CompressionType;
import dev.waterdog.waterdogpe.network.connection.codec.packet.BedrockPacketCodec;
import dev.waterdog.waterdogpe.network.serverinfo.ServerInfo;
Expand All @@ -17,6 +18,8 @@
import org.cloudburstmc.netty.channel.raknet.RakChannel;
import org.cloudburstmc.netty.channel.raknet.config.RakChannelOption;
import org.cloudburstmc.netty.channel.raknet.config.RakMetrics;
import org.cloudburstmc.protocol.bedrock.netty.codec.compression.CompressionCodec;
import org.nethergames.proxytransport.compression.FrameIdCodec;
import org.nethergames.proxytransport.compression.ProxyTransportCompressionCodec;
import org.nethergames.proxytransport.integration.CustomClientEventHandler;

Expand Down Expand Up @@ -56,14 +59,15 @@ protected void initChannel(Channel channel) {
.addLast(FRAME_DECODER, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
.addLast(FRAME_ENCODER, new LengthFieldPrepender(4));


ClientConnection connection = this.createConnection(channel);
channel.pipeline()
.addLast(ProxyTransportCompressionCodec.NAME, new ProxyTransportCompressionCodec(getCompressionStrategy(compression, rakVersion, true), false))
.addLast(FrameIdCodec.NAME, new FrameIdCodec())
.addLast(CompressionCodec.NAME, new ProxyTransportCompressionCodec(getCompressionStrategy(compression, rakVersion, true), false))
.addLast(BedrockBatchDecoder.NAME, BATCH_DECODER)
.addLast(BedrockBatchEncoder.NAME, new BedrockBatchEncoder())
.addLast(BedrockPacketCodec.NAME, getPacketCodec(rakVersion));
.addLast(BedrockPacketCodec.NAME, getPacketCodec(rakVersion))
.addLast(ClientPacketQueue.NAME, new ClientPacketQueue());

ClientConnection connection = this.createConnection(channel);
if (connection instanceof ChannelHandler handler) { // For reference: This will take care of the packets received being handled.
channel.pipeline().addLast(ClientConnection.NAME, handler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
import org.cloudburstmc.protocol.bedrock.codec.BedrockCodecHelper;
import org.cloudburstmc.protocol.bedrock.netty.BedrockBatchWrapper;
import org.cloudburstmc.protocol.bedrock.netty.BedrockPacketWrapper;
import org.cloudburstmc.protocol.bedrock.netty.codec.FrameIdCodec;
import org.cloudburstmc.protocol.bedrock.netty.codec.compression.CompressionCodec;
import org.cloudburstmc.protocol.bedrock.netty.codec.compression.CompressionStrategy;
import org.cloudburstmc.protocol.bedrock.packet.BedrockPacket;
import org.cloudburstmc.protocol.bedrock.packet.NetworkStackLatencyPacket;
import org.cloudburstmc.protocol.bedrock.packet.TickSyncPacket;
import org.nethergames.proxytransport.compression.FrameIdCodec;
import org.nethergames.proxytransport.compression.ProxyTransportCompressionCodec;

import javax.crypto.SecretKey;
Expand Down Expand Up @@ -105,26 +105,36 @@ private void onBedrockBatch(@NonNull BedrockBatchWrapper batch) {
}
}

private boolean increaseRateLimit(int value) {
packetSendingLimit.set(this.packetSendingLimit.get() + value);

if (packetSendingLimit.get() >= MAX_UPSTREAM_PACKETS) {
if (packetSendingLock.compareAndSet(false, true)) {
getPlayer().getLogger().warning(getPlayer().getName() + " sent too many packets (" + packetSendingLimit.get() + "/s), disconnecting.");
getPlayer().getConnection().disconnect("§cToo many packets!");
}
} else return !packetSendingLock.get();

return false;
}

@Override
public void sendPacket(BedrockPacket packet) {
this.sendPacket(BedrockBatchWrapper.create(getSubClientId(), packet));
if (this.increaseRateLimit(1)) {
super.sendPacket(packet);
}
}

@Override
public void sendPacketImmediately(BedrockPacket packet) {
this.sendPacket(BedrockBatchWrapper.create(getSubClientId(), packet));
if (this.increaseRateLimit(1)) {
super.sendPacketImmediately(packet);
}
}

@Override
public void sendPacket(BedrockBatchWrapper wrapper) {
packetSendingLimit.set(this.packetSendingLimit.get() + wrapper.getPackets().size());

if (packetSendingLimit.get() >= MAX_UPSTREAM_PACKETS) {
if (packetSendingLock.compareAndSet(false, true)) {
getPlayer().getLogger().warning(getPlayer().getName() + " sent too many packets (" + packetSendingLimit.get() + "/s), disconnecting.");
getPlayer().getConnection().disconnect("§cToo many packets!");
}
} else if (!packetSendingLock.get()) {
if (this.increaseRateLimit(wrapper.getPackets().size())) {
super.sendPacket(wrapper);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,18 @@ public Future<ClientConnection> createConnection(ProxiedPlayer proxiedPlayer) {
EventLoop eventLoop = proxiedPlayer.getProxy().getWorkerEventLoopGroup().next();
Promise<ClientConnection> promise = eventLoop.newPromise();

Bootstrap b = new Bootstrap()
new Bootstrap()
.group(downstreamLoopGroup)
.handler(new TransportChannelInitializer(proxiedPlayer, this, promise))
.localAddress(new InetSocketAddress("0.0.0.0", 0))
.channel(getProperSocketChannel())
.remoteAddress(this.getAddress());

b.connect().addListener((ChannelFuture future) -> {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
future.channel().close();
}
});
.remoteAddress(this.getAddress())
.connect().addListener((ChannelFuture future) -> {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
future.channel().close();
}
});

return promise;
}
Expand Down

This file was deleted.

0 comments on commit 2b3a3c7

Please sign in to comment.