Netty
参考博客
非常好的一个系列,有入门、有源码分析、有实战
https://blog.csdn.net/column/details/enjoynetty.html
偏实战
https://www.jianshu.com/u/e2d07947c112
Netty特性
- 相比NIO,更简洁的代码每个Handler处理自己的业务,解耦。封装好了2个线程池来高性能可控的处理并发业务。注册好对应的Handler之后,不需要重复在channel上重复绑定监听。
- 封装更多的通用功能,比如字符串拆包,心跳检测功能,字符串的编码和解码
- 更高级的ByteBuf
核心功能
- Server端创建的时候,绑定一个Selector的线程池和一个Worker的线程池
- Selector的线程池负责监听所有的事件,比如Accept,read,write等事件,一旦检测到事件,就提交到对应的Worker线程池去处理
- 根据自己的业务特点,在Initializer上绑定对应的Handler,然后处理对应的消息的读写和客户端的连接退出等功能
netty 高性能的原因
- 异步解耦: 监听客户端IO事件由boss线程组来处理,监听到的事件处理由worker线程组来做,异步的调用提升性能。
- 多线程优势:boss线程组可支持多个线程来支持不同客户端的IO行为,worker线程组支持多个线程来response客户的IO行为
传统NIO的服务端的代码:一个服务端只有一个线程,一个Selector,并且是同步串行的执行IO事件操作
Nettty实现聊天室Server
HelloServer初始化绑定一个HelloServerInitializer集群,里面配置业务的拦截器,然后在拦截器里面处理客户端的交互逻辑
HelloServer: 创建服务器和设置Handler群(设置工作模式,2种模式:监听线程和工作线程公用线程池、监听线程池和工作线程池分开)
public class HelloServer {private static final int portNumber = 7878;public static void main(String[] args) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup);b.channel(NioServerSocketChannel.class);b.childHandler(new HelloServerInitializer());b.bind(portNumber).sync().channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}HelloServerInitializer 设置Handler,这也是主要的开发类,根据业务规则配置拦截器,常用的封装好的拦截器很多
public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 以("\n")为结尾分割的 解码器pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());// 自己的逻辑Handlerpipeline.addLast("handler", new HelloServerHandler());}}HelloServerHandler 自定义的拦截器,处理各种客户端的消息读取和往客户端发消息
public class HelloServerHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(ctx.channel().remoteAddress() + " Say : " + msg);ctx.writeAndFlush("Received your message !\n");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");ctx.writeAndFlush( "Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");super.channelActive(ctx);}}
Netty的客户端
初始化,然后绑定HelloClientInitializer,和配置和实现对应的HelloClientHandler
粘包、半包、拆包的解决方案
- 在报文末尾增加换行符表明一条完整的消息,这样在接收端可以根据这个换行符来判断消息是否完整。
- 将消息分为消息头、消息体。可以在消息头中声明消息的长度,根据这个长度来获取报文(比如 808 协议)。
- 规定好报文长度,不足的空位补齐,取的时候按照长度截取即可。
字符串拆包工具类
使用了拆包工具类以后,即使一次性发送1.5个消息发2次,也可以获取到完整和独立的3条消息,或者一次性发送半个消息分2次,也可以获取到一个完整的消息。
LineBasedFrameDecoder
写消息的时候末位添加 \n,如果长度不超过设定的话,就自动识别为一条消息,如果一个消息的长度超过了预设值,也不会丢弃,会存在缓冲区一直到下个分隔符,才为一个完整的消息客户端写消息的时候ch.writeAndFlush(msg + "\n");服务端拦截器配置pipeline.addLast(new LineBasedFrameDecoder(1024));DelimiterBasedFrameDecoder
类似于前面的客户端写消息的时候ch.writeAndFlush(msg + "\n");服务端拦截器配置pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
Netty对心跳检测的支持 IdleStateHandler
- 作用:关闭掉超时未操作的客户端,或者是持续保持客户端和服务端的通讯正常。
核心构造函数:服务端的读操作,写操作,读写任意操作;例如 5,0,0;如果服务端5s内未接收到客户端的消息,就会触发其它Handler里面的userEventTriggered,然后判断Event的类型进行判断操作。
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) {this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);}使用示例:
- ChannelPipe里面绑定IdleStateHandler,并设置好规则
- 自定义的HelloServerHandler里面去重写userEventTriggered方法(判断是读Idle还是写Idle)public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast("handler", new HelloServerHandler());}}public class HelloServerHandler extends SimpleChannelInboundHandler<String> {private int loss_connect_time = 0;@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {loss_connect_time++;System.out.println("5 秒没有接收到客户端的信息了");ctx.writeAndFlush( "please response ,no msg for 5 second \n");if (loss_connect_time > 2) {System.out.println("关闭这个不活跃的channel");ctx.writeAndFlush( "no msg,shut down you \n");ctx.channel().close();}}} else {super.userEventTriggered(ctx, evt);}}}
实现原理:
客户端active的时候,init方法会触发一个循环监听的任务
private void initialize(ChannelHandlerContext ctx) {switch (state) {case 1:case 2:return;}state = 1;initOutputChanged(ctx);//开始初始化时间戳lastReadTime = lastWriteTime = ticksInNanos();if (readerIdleTimeNanos > 0) {readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),readerIdleTimeNanos, TimeUnit.NANOSECONDS);}if (writerIdleTimeNanos > 0) {writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),writerIdleTimeNanos, TimeUnit.NANOSECONDS);}if (allIdleTimeNanos > 0) {allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),allIdleTimeNanos, TimeUnit.NANOSECONDS);}}时间戳随着交互而更新的操作,每当服务端读取客户端的一个消息,完成读取的时候,会重置lastReadTime为当前时间戳。
类似于面向切面编程的环绕,IdleStateHandler里面开始收到客户端的消息时,调用channelRead方法,当服务端处理完读操作的时候,会调用channelReadComplete方法,重置当前读取状态和最后读取消息的时间戳@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {reading = true;firstReaderIdleEvent = firstAllIdleEvent = true;}ctx.fireChannelRead(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {lastReadTime = ticksInNanos();reading = false;}ctx.fireChannelReadComplete();}
核心类
NioEventLoopGroup(定义了一个NioEventLoopd 的多线程池(其实是数组,不同于传统的线程池),默认数组的长度是CPUCore*2个线程,也可以指定)
EventExecutor[] children= new EventExecutor[nThreads];child[i]=newChild(executor, args);NioEventLoop 工作线程,每个NioEventLoop绑定一个Selector,会循环遍历IO事件,并且派发到worker线程池处理
每个NioEventLoop都独立拥有:一个任务队列、一个延迟任务队列、一个thread,并且每一个EventLoop都有一个属于自己的Executor执行器、一个selector