Netty介绍使用

一、Netty简介

Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可拓展性在同类框架(比如Mina、Grizzly)中首屈一指,得到上百上千的商用项目检验。

Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。它最牛逼的地方在于简化了网络编程规范,例如TCP和UDP的Socket服务。

二、Netty的优势

  1. API使用简单,开发门槛低;

  2. 功能强大,预置了多种编解码功能,支持多种主流协议;

  3. 定制能力强,可以通过ChannelHandler对通信框架进行灵活地扩展;

  4. 性能高,通过与其他业界主流的NIO框架对比,Netty的综合性能最优;

  5. 成熟,稳定,Netty修复了已经发现的所有JDK NIO的bug,业务开发人员不需要再为NIO的bug烦恼;

  6. 社区活跃,版本迭代周期短,发现的bug可以被及时修复,同时,更多的新功能也会加入;

  7. 经历了大规模的商业应用考验,质量得到验证,Netty在互联网、大数据、网络游戏、企业应用、电信软件等众多行业已经得到了成功商用,证明它已经能够满足不同行业的商业应用了;

三、代码实例

3.1 服务端

TimeServer.java

package com.wangjun.io.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class TimeServer {

    public void bind(int port) throws Exception {
        // 配置服务端的NIO线程组,一个用于接收客户端的链接,另一个用于进行SocketChannel的网络读写
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            // 阻塞main线程,等待服务端监听端口关闭之后main函数才退出
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            //pipeline是一个管理ChannelHandler列表,间接处理channel读写
             arg0.pipeline().addLast(new TimeServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeServer().bind(port);
    }
}

TimeServerHandler.java

package com.wangjun.io.netty;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class TimeServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("服务器收到的命令是:" + body);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString()
                : "BADORDER";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

3.2 客户端

TimeClient.java

package com.wangjun.io.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class TimeClient {

    public void connect(int port, String host) throws Exception {
        //配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            //发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
            //等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            //优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeClient().connect(port, "127.0.0.1");
    }

}

TimeClientHandler.java

package com.wangjun.io.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class TimeClientHandler extends ChannelHandlerAdapter {
    private final ByteBuf firstMessage;

    public TimeClientHandler() {
        byte[] req = "QUERY TIME ORDER".getBytes();
        firstMessage = Unpooled.buffer(req.length);//?????????
        firstMessage.writeBytes(req);
    }

    /*
     * 当客户端和服务端TCP链路建立成功之后调用此方法,发送消息给服务端
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(firstMessage);
    }

    /*
     * 当服务端返回应答消息时调用此方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is:" + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("unexpected exception from downstream:" + cause.getMessage());
        ctx.close();
    }
}

3.3 运行结果

客户端

Now is:Fri Apr 03 09:47:30 CST 2020

服务端

服务器收到的命令是:QUERY TIME ORDER

四、解决TCP的粘包拆包问题

4.1 发生原因

当发生以下情况下,TCP会发生粘包拆包:

  • 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包;

  • 待发送的数据大于MSS(最大报文长度),TCP在传输前将进行拆包;

  • 要发送的数据小于TCP缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包;

  • 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包;

4.2 解决办法

解决粘包拆包的办法:

  • 消息定长,累计读到长度总和为定长LEN的报文后,就认为读取到了一个完整的消息;将计数器置位,重新开始读取下一个数据报;

  • 将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛;

  • 将特殊的分隔符作为消息结束符,回车换行符就是一种特殊的结束分隔符;

  • 将消息分为消息头和消息体,消息头中包含数据包的长度,这样在接收端通过读取消息头长度字段便知道每个数据包的实际长度了。

4.3 netty解决方案

为了解决TCP的粘包拆包问题,netty默认提供了多种编解码器用于处理半包,分别对应上面解决TCP粘包拆包的办法,掌握这些,TCP粘包问题就变的非常容易。

4.3.1 反面实例

先来看一下产生TCP粘包的现象,我们改一下TimeClientHandlerTimeServerHandler的代码如下,期望是客户端发送100个指令,服务端也收到100个指令并返回结果给客户端:

TimeClientHandler.java

package com.wangjun.io.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class TimeClientHandler extends ChannelHandlerAdapter {
    private int counter;
    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    /*
     * 当客户端和服务端TCP链路建立成功之后调用此方法,发送消息给服务端
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for(int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    /*
     * 当服务端返回应答消息时调用此方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is:" + body + ", and the counter is: " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("unexpected exception from downstream:" + cause.getMessage());
        ctx.close();
    }
}

TimeServerHandler.java

package com.wangjun.io.netty;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class TimeServerHandler extends ChannelHandlerAdapter { 
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
        System.out.println("服务器收到的命令是:" + body + ";, the counter is :" + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString()
                : "BADORDER";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

运行服务端和客户端:

# 服务端结果:
服务器收到的命令是:QUERY TIME ORDER
QUERY TIME ORDER
......(省略掉54个 QUERY TIME ORDER)
QUERY TIME ORD;, the counter is :1
服务器收到的命令是:
QUERY TIME ORDER
......(省略掉41个 QUERY TIME ORDER)
QUERY TIME ORDER;, the counter is :2

# 客户端结果
Now is:BADORDERBADORDER, and the counter is: 1

服务端运行结果表明它只接收到了两条消息,第一条包含57条” QUERY TIME ORDER” 指令, 第二条包含了43条” QUERY TIME ORDER” 指令,总数正好是100条。 我们期待的是收到100条消息,每条包含 一条” QUERY TIME ORDER” 指令。这说明发生了TCP粘包。

按照设计初衷,客户端应该收到100条当前时间的消息,但实际上只收到了1条错误指令,这是因为服务端只收到两条请求,所以服务端发送了两个应答消息,但实际上客户端只收到了1条,说明服务端返回的应答消息也发生了粘包。

4.3.2 LineBasedFrameDecoder

下面我们使用LineBaseFrameDecoderStringDecoder解决TCP粘包问题。

TimeServer.java

package com.wangjun.io.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class TimeServer {

    public void bind(int port) throws Exception {
        // 配置服务端的NIO线程组,一个用于接收客户端的链接,另一个用于进行SocketChannel的网络读写
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            // 阻塞main线程,等待服务端监听端口关闭之后main函数才退出
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            //这里增加了两个ChannelHandler
            arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
            arg0.pipeline().addLast(new StringDecoder());
            arg0.pipeline().addLast(new TimeServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeServer().bind(port);
    }
}

TimeServerHandler.java

package com.wangjun.io.netty;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class TimeServerHandler extends ChannelHandlerAdapter { 
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String)msg;
        System.out.println("服务器收到的命令是:" + body + ";, the counter is :" + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString()
                : "BADORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

TimeClient.java

package com.wangjun.io.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class TimeClient {

    public void connect(int port, String host) throws Exception {
        //配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    //这里增加了两个ChannelHandler
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            //发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
            //等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            //优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeClient().connect(port, "127.0.0.1");
    }

}

TimeClientHandler.java

package com.wangjun.io.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class TimeClientHandler extends ChannelHandlerAdapter {
    private int counter;
    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    /*
     * 当客户端和服务端TCP链路建立成功之后调用此方法,发送消息给服务端
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for(int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    /*
     * 当服务端返回应答消息时调用此方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String)msg;
        System.out.println("Now is:" + body + ", and the counter is: " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("unexpected exception from downstream:" + cause.getMessage());
        ctx.close();
    }
}

运行服务端和客户端:

服务器收到的命令是:QUERY TIME ORDER;, the counter is :1
......(省略98个打印日志)
服务器收到的命令是:QUERY TIME ORDER;, the counter is :100
Now is:Mon Apr 13 15:03:37 CST 2020, and the counter is: 1
......(省略98个打印日志)
Now is:Mon Apr 13 15:03:37 CST 2020, and the counter is: 100

可以看到使用LineBaseFrameDecoderStringDecoder完美解决了TCP粘包问题。

解决TCP粘包原理

LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断是否有\n或者\r\n,如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的Handler。LineBasedFrameDecoder+baseDecoder组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。

4.3.3 DelimiterBasedDecoder

DelimiterBasedDecoder可以自动完成以分隔符作为码流结束标志的消息解码,下面我们使用$_作为分隔符演示DelimiterBasedDecoder的使用方法。以经典的Echo服务为例,Server接收到Client的请求消息后,将其打印出来,然后将原始消息返回给客户端。

服务端

EchoServer.java

package com.wangjun.io.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class EchoServer {

    public void bind(int port) throws Exception {
        // 配置服务端的NIO线程组,一个用于接收客户端的链接,另一个用于进行SocketChannel的网络读写
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            // 阻塞main线程,等待服务端监听端口关闭之后main函数才退出
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
            arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
            arg0.pipeline().addLast(new StringDecoder());
            arg0.pipeline().addLast(new EchoServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoServer().bind(port);
    }
}

EchoServerHandler.java

package com.wangjun.io.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class EchoServerHandler extends ChannelHandlerAdapter { 
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String)msg;
        System.out.println("服务器收到的命令是:" + body + ", the counter is :" + ++counter);
        body += "$_";
        ByteBuf resp = Unpooled.copiedBuffer(body.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端

EchoClient.java

package com.wangjun.io.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class EchoClient {

    public void connect(int port, String host) throws Exception {
        //配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new EchoClientHandler());
                }
            });
            //发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
            //等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            //优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoClient().connect(port, "127.0.0.1");
    }

}

EchoClientHandler.java

package com.wangjun.io.netty;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class EchoClientHandler extends ChannelHandlerAdapter {
    private int counter;

    public EchoClientHandler() {
    }

    /*
     * 当客户端和服务端TCP链路建立成功之后调用此方法,发送消息给服务端
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i = 0; i < 10; i++) {
            String msg = "Commend-" + i + "$_";
            ctx.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes()));
        }
    }

    /*
     * 当服务端返回应答消息时调用此方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String)msg;
        System.out.println("收到服务器消息:" + body + ", and the counter is: " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("unexpected exception from downstream:" + cause.getMessage());
        ctx.close();
    }
}

分别启动服务端和客户端打印结果如下:

服务器收到的命令是:Commend-0, the counter is :1
服务器收到的命令是:Commend-1, the counter is :2
服务器收到的命令是:Commend-2, the counter is :3
服务器收到的命令是:Commend-3, the counter is :4
服务器收到的命令是:Commend-4, the counter is :5
服务器收到的命令是:Commend-5, the counter is :6
服务器收到的命令是:Commend-6, the counter is :7
服务器收到的命令是:Commend-7, the counter is :8
服务器收到的命令是:Commend-8, the counter is :9
服务器收到的命令是:Commend-9, the counter is :10

收到服务器消息:Commend-0, and the counter is: 1
收到服务器消息:Commend-1, and the counter is: 2
收到服务器消息:Commend-2, and the counter is: 3
收到服务器消息:Commend-3, and the counter is: 4
收到服务器消息:Commend-4, and the counter is: 5
收到服务器消息:Commend-5, and the counter is: 6
收到服务器消息:Commend-6, and the counter is: 7
收到服务器消息:Commend-7, and the counter is: 8
收到服务器消息:Commend-8, and the counter is: 9
收到服务器消息:Commend-9, and the counter is: 10

可以看到DelimiterBasedDecoder以及处理好TCP的粘包。

4.3.4 FixedLengthFrameDecoder

FixedLengthFrameDecoder是规定长度解码器。它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP包的粘包/拆包问题,非常实用。

服务端

EchoServer.java

package com.wangjun.io.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class EchoServer {

    public void bind(int port) throws Exception {
        // 配置服务端的NIO线程组,一个用于接收客户端的链接,另一个用于进行SocketChannel的网络读写
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            // 阻塞main线程,等待服务端监听端口关闭之后main函数才退出
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            arg0.pipeline().addLast(new FixedLengthFrameDecoder(20));
            arg0.pipeline().addLast(new StringDecoder());
            arg0.pipeline().addLast(new EchoServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoServer().bind(port);
    }
}

EchoServerHandler.java

package com.wangjun.io.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class EchoServerHandler extends ChannelHandlerAdapter { 

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String)msg;
        System.out.println("服务器收到的命令是:" + body);
        ByteBuf resp = Unpooled.copiedBuffer(body.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端

EchoClient.java

package com.wangjun.io.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class EchoClient {

    public void connect(int port, String host) throws Exception {
        //配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new EchoClientHandler());
                }
            });
            //发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
            //等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            //优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoClient().connect(port, "127.0.0.1");
    }

}

EchoClientHandler.java

package com.wangjun.io.netty;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author wangjun
 * @date 2020-04-02
 * @version 1.0
 */
public class EchoClientHandler extends ChannelHandlerAdapter {

    public EchoClientHandler() {
    }

    /*
     * 当客户端和服务端TCP链路建立成功之后调用此方法,发送消息给服务端
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String msg = "12345678901234567890123";
        ctx.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes()));
        System.out.println("客户端发送的消息:" + msg);
    }

    /*
     * 当服务端返回应答消息时调用此方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String)msg;
        System.out.println("收到服务器消息:" + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("unexpected exception from downstream:" + cause.getMessage());
        ctx.close();
    }
}

运行服务端和客户端结果如下:

服务器收到的命令是:12345678901234567890

客户端发送的消息:12345678901234567890123
收到服务器消息:12345678901234567890

可以看到服务端只截取了前20个字节的长度。

4.4 编码器总结

  • LineBasedFrameDecoder:以换行符为结束符对消息解码;

  • DelimiterBasedDecoder:以指定的特殊字符为结束符对消息解码;

  • FixedLengthFrameDecoder:获取指定的长度的消息;

使用这三种解码器再结合其他解码器比如字符串解码器,就可以轻松完成对消息的自动编码,而且不再需要考虑TCP粘包拆包问题,极大地提升了开发效率。

Last updated