Contents
  1. 1. 参考博客
  2. 2. Netty特性
  3. 3. 核心功能
  4. 4. netty 高性能的原因
  5. 5. Nettty实现聊天室Server
  6. 6. Netty的客户端
  7. 7. 粘包、半包、拆包的解决方案
  8. 8. 字符串拆包工具类
  9. 9. Netty对心跳检测的支持 IdleStateHandler
    1. 9.1. 实现原理:
  10. 10. 核心类

参考博客

非常好的一个系列,有入门、有源码分析、有实战
https://blog.csdn.net/column/details/enjoynetty.html

偏实战
https://www.jianshu.com/u/e2d07947c112

Netty特性

  1. 相比NIO,更简洁的代码每个Handler处理自己的业务,解耦。封装好了2个线程池来高性能可控的处理并发业务。注册好对应的Handler之后,不需要重复在channel上重复绑定监听。
  2. 封装更多的通用功能,比如字符串拆包,心跳检测功能,字符串的编码和解码
  3. 更高级的ByteBuf

核心功能

  • Server端创建的时候,绑定一个Selector的线程池和一个Worker的线程池
  • Selector的线程池负责监听所有的事件,比如Accept,read,write等事件,一旦检测到事件,就提交到对应的Worker线程池去处理
  • 根据自己的业务特点,在Initializer上绑定对应的Handler,然后处理对应的消息的读写和客户端的连接退出等功能

netty 高性能的原因

  1. 异步解耦: 监听客户端IO事件由boss线程组来处理,监听到的事件处理由worker线程组来做,异步的调用提升性能。
  2. 多线程优势:boss线程组可支持多个线程来支持不同客户端的IO行为,worker线程组支持多个线程来response客户的IO行为

传统NIO的服务端的代码:一个服务端只有一个线程,一个Selector,并且是同步串行的执行IO事件操作

Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
ServerSelectorProtocol protocol = new ServerSelectorProtocol(BUFF_SIZE);
while (true) {
selector.select();
Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
while (keyIter.hasNext()) {
SelectionKey key = keyIter.next();
keyIter.remove();
if (key.isAcceptable()){
protocol.handleAccept(key);
}
if (key.isReadable()){
protocol.handleRead(key);
}
}
}

Nettty实现聊天室Server

HelloServer初始化绑定一个HelloServerInitializer集群,里面配置业务的拦截器,然后在拦截器里面处理客户端的交互逻辑

  1. HelloServer: 创建服务器和设置Handler群(设置工作模式,2种模式:监听线程和工作线程公用线程池、监听线程池和工作线程池分开)

    public class HelloServer {
    private static final int portNumber = 7878;
    public static void main(String[] args) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup);
    b.channel(NioServerSocketChannel.class);
    b.childHandler(new HelloServerInitializer());
    b.bind(portNumber).sync().channel().closeFuture().sync();
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }
    }
  2. HelloServerInitializer 设置Handler,这也是主要的开发类,根据业务规则配置拦截器,常用的封装好的拦截器很多

    public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    // 以("\n")为结尾分割的 解码器
    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
    pipeline.addLast("decoder", new StringDecoder());
    pipeline.addLast("encoder", new StringEncoder());
    // 自己的逻辑Handler
    pipeline.addLast("handler", new HelloServerHandler());
    }
    }
  3. HelloServerHandler 自定义的拦截器,处理各种客户端的消息读取和往客户端发消息

    public class HelloServerHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    System.out.println(ctx.channel().remoteAddress() + " Say : " + msg);
    ctx.writeAndFlush("Received your message !\n");
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");
    ctx.writeAndFlush( "Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");
    super.channelActive(ctx);
    }
    }

Netty的客户端

初始化,然后绑定HelloClientInitializer,和配置和实现对应的HelloClientHandler

public class HelloClient {
public static String host = "127.0.0.1";
public static int port = 7878;
public Channel ch=null;
public EventLoopGroup group = new NioEventLoopGroup();;
public void start() throws Exception{
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new HelloClientInitializer());
ch = b.connect(host, port).sync().channel();
}
public void write(String msg){
ch.writeAndFlush(msg + "\n");
}
public void shutdown(){
group.shutdownGracefully();
}
}
public class HelloClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 客户端的逻辑
pipeline.addLast("handler", new HelloClientHandler());
}
}
public class HelloClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("Server say : " + msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client active ");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client close ");
super.channelInactive(ctx);
}
}

粘包、半包、拆包的解决方案

  1. 在报文末尾增加换行符表明一条完整的消息,这样在接收端可以根据这个换行符来判断消息是否完整。
  2. 将消息分为消息头、消息体。可以在消息头中声明消息的长度,根据这个长度来获取报文(比如 808 协议)。
  3. 规定好报文长度,不足的空位补齐,取的时候按照长度截取即可。

字符串拆包工具类

使用了拆包工具类以后,即使一次性发送1.5个消息发2次,也可以获取到完整和独立的3条消息,或者一次性发送半个消息分2次,也可以获取到一个完整的消息。

  1. LineBasedFrameDecoder
    写消息的时候末位添加 \n,如果长度不超过设定的话,就自动识别为一条消息,如果一个消息的长度超过了预设值,也不会丢弃,会存在缓冲区一直到下个分隔符,才为一个完整的消息

    客户端写消息的时候
    ch.writeAndFlush(msg + "\n");
    服务端拦截器配置
    pipeline.addLast(new LineBasedFrameDecoder(1024));
  2. DelimiterBasedFrameDecoder
    类似于前面的

    客户端写消息的时候
    ch.writeAndFlush(msg + "\n");
    服务端拦截器配置
    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));

Netty对心跳检测的支持 IdleStateHandler

  1. 作用:关闭掉超时未操作的客户端,或者是持续保持客户端和服务端的通讯正常。
  2. 核心构造函数:服务端的读操作,写操作,读写任意操作;例如 5,0,0;如果服务端5s内未接收到客户端的消息,就会触发其它Handler里面的userEventTriggered,然后判断Event的类型进行判断操作。

    public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
    }
  3. 使用示例:

  • ChannelPipe里面绑定IdleStateHandler,并设置好规则
  • 自定义的HelloServerHandler里面去重写userEventTriggered方法(判断是读Idle还是写Idle)
    public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
    pipeline.addLast("decoder", new StringDecoder());
    pipeline.addLast("encoder", new StringEncoder());
    pipeline.addLast("handler", new HelloServerHandler());
    }
    }
    public class HelloServerHandler extends SimpleChannelInboundHandler<String> {
    private int loss_connect_time = 0;
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
    IdleStateEvent event = (IdleStateEvent) evt;
    if (event.state() == IdleState.READER_IDLE) {
    loss_connect_time++;
    System.out.println("5 秒没有接收到客户端的信息了");
    ctx.writeAndFlush( "please response ,no msg for 5 second \n");
    if (loss_connect_time > 2) {
    System.out.println("关闭这个不活跃的channel");
    ctx.writeAndFlush( "no msg,shut down you \n");
    ctx.channel().close();
    }
    }
    } else {
    super.userEventTriggered(ctx, evt);
    }
    }
    }

实现原理:

  1. 客户端active的时候,init方法会触发一个循环监听的任务

    private void initialize(ChannelHandlerContext ctx) {
    switch (state) {
    case 1:
    case 2:
    return;
    }
    state = 1;
    initOutputChanged(ctx);
    //开始初始化时间戳
    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
    readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
    writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
    allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    }
  2. 时间戳随着交互而更新的操作,每当服务端读取客户端的一个消息,完成读取的时候,会重置lastReadTime为当前时间戳。
    类似于面向切面编程的环绕,IdleStateHandler里面开始收到客户端的消息时,调用channelRead方法,当服务端处理完读操作的时候,会调用channelReadComplete方法,重置当前读取状态和最后读取消息的时间戳

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
    reading = true;
    firstReaderIdleEvent = firstAllIdleEvent = true;
    }
    ctx.fireChannelRead(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
    lastReadTime = ticksInNanos();
    reading = false;
    }
    ctx.fireChannelReadComplete();
    }

核心类

  1. NioEventLoopGroup(定义了一个NioEventLoopd 的多线程池(其实是数组,不同于传统的线程池),默认数组的长度是CPUCore*2个线程,也可以指定)

    EventExecutor[] children= new EventExecutor[nThreads];
    child[i]=newChild(executor, args);
  2. NioEventLoop 工作线程,每个NioEventLoop绑定一个Selector,会循环遍历IO事件,并且派发到worker线程池处理
    每个NioEventLoop都独立拥有:一个任务队列、一个延迟任务队列、一个thread,并且每一个EventLoop都有一个属于自己的Executor执行器、一个selector

Contents
  1. 1. 参考博客
  2. 2. Netty特性
  3. 3. 核心功能
  4. 4. netty 高性能的原因
  5. 5. Nettty实现聊天室Server
  6. 6. Netty的客户端
  7. 7. 粘包、半包、拆包的解决方案
  8. 8. 字符串拆包工具类
  9. 9. Netty对心跳检测的支持 IdleStateHandler
    1. 9.1. 实现原理:
  10. 10. 核心类