Skip to content

4 Socket

BladeCode edited this page Nov 23, 2019 · 1 revision

基于 Socket 的 Netty 程序,是 Netty 应用最广泛的场景,对于高性能,高并发的支持是非常的棒,通过下面的示例来展示 Netty 在 Socket 中的应用

服务端

服务启动类,程序入口

public static void main(String[] args) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
        // handler() 与 childHandler() 的区别
        // 服务端可以使用 handler() 或者 childHandler(),而对于客户端,一般只使用handler()
        // handler()对于处理的 bossGroup 相关的信息,比如链接后,输出日志
        // childHandler() 是指连接丢给 workerGroup 之后,对 workerGroup 的 NIO 线程作用
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler())
                .childHandler(new SocketServerInitializer());
        ChannelFuture channelFuture = serverBootstrap.bind(3333).sync();
        channelFuture.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

自定义服务初始化

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast("server-lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
    pipeline.addLast("server-lengthFieldPrepender", new LengthFieldPrepender(4));
    pipeline.addLast("server-stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
    pipeline.addLast("server-stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
    // 自定义服务处理器
    pipeline.addLast("SocketServerHandler", new SocketServerHandler());
}

自定义服务处理器

/**
 * 读取客户端发送过来的请求,并向客户端发送响应
 *
 * @param ctx 处理上下文
 * @param msg Http对象
 * @throws Exception 异常
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

    System.out.println("客户端远程地址:" + ctx.channel().remoteAddress());
    // 接收到客户端的消息 msg,这里是 String 类型
    System.out.println("接收到客户端的消息:" + msg);
    // 方法:writeAndFlush() = write() + flush()
    // write():写
    // flush():把缓冲清理掉
    // writeAndFlush():写并且把缓冲清理掉,常用该方法
    ctx.channel().writeAndFlush("来之服务端的响应!" + UUID.randomUUID());
    }
}

/**
 * 发生异常时,关闭连接
 *
 * @param ctx 处理上下文
 * @param cause 原因
 * @throws Exception 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    super.exceptionCaught(ctx, cause);
    ctx.close();
}

客户端

public static void main(String[] args) throws InterruptedException {
    // 因为客户端,只需要一个时间循环组来和服务端建立连接
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new SocketClientInitializer());

        ChannelFuture channelFuture = bootstrap.connect("localhost", 3333).sync();
        channelFuture.channel().closeFuture().sync();
    } finally {
        eventLoopGroup.shutdownGracefully();
    }
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast("client-lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
    pipeline.addLast("client-lengthFieldPrepender", new LengthFieldPrepender(4));
    pipeline.addLast("client-stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
    pipeline.addLast("client-stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
    // 自定义服务处理器
    pipeline.addLast("SocketClientHandler", new SocketClientHandler());
}
/**
 * 读取服务端的响应
 *
 * @param ctx 处理上下文
 * @param msg Http对象
 * @throws Exception 异常
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

    System.out.println("服务器地址:" + ctx.channel().remoteAddress());
    // 服务器向客户端发生的数据
    System.out.println("接收到服务端的消息:" + msg);
    // 返回给服务端的信息
    ctx.writeAndFlush("来自客户端的问候!" + LocalDateTime.now());
    }
}

/**
 * 模拟客户端连接服务端之后,向服务端发送数据
 * channelActive表示通道已经连接
 *
 * @param ctx ctx
 * @throws Exception 异常
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush("来自客服端的问候");
}

/**
 * 发生异常时,关闭连接
 *
 * @param ctx 处理上下文
 * @param cause 原因
 * @throws Exception 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    super.exceptionCaught(ctx, cause);
    ctx.close();
}

Chat 示例

一对多(一台服务,与 N 个客户端进行连接,且 N 个客户端直接可进行互相通信)

  1. 第一台客户端(A)与服务器(S)建立连接,未发生任何事情
  2. 第二台客户端(B)与服务器(S)建立连接后,S 打印:B 已上线,同时 S 通知 A : B 已上线【A在线情况下】
  3. 第三台客户端(C)与服务器(S)建立连接后,S 打印:C 已上线,同时 S 分别通知 A,B:C 已上线【A,B 都在线情况下】

关于 Socket 的简单场景示例 chat

Clone this wiki locally