Socket编程
参考
我的github源码实现
https://github.com/huangzhenshi/IO_NIO_NIO2Demo
代码精华
https://blog.csdn.net/anxpp/article/details/51512200
非常详细很深刻的IO socket博客专栏
http://blog.csdn.net/column/details/18326.html
阻塞式多线程TCPsocket编程原理
- ServerSocket创建TCP的服务器,调用阻塞式的accept()方法监听连接的客户端。
- 客户端和服务端通过Socket对象的输入流、输出流进行内容交互 socket.getInputStream()和socket.getOutputStream()
BIO
服务器端:用线程池去管理多个接入的客户端,把接收到的客户端Socket作为参数,用处理类在单独的线程里面和客户端交互
- 初始化服务端
- while(true)+ socket.accept的方式,阻塞式的等待新客户的接入
- 一旦有一个先Clinet接入的话,则从线程池中创建一个线程,来和Client交互,一般是读取信息,也可以输出固定的信息,比如服务器已经收到
- 工作子线程也是while(true)+buf.readLine(),阻塞和循环式的读取Clinet发过来的消息public static void main(String[] args) {ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);try {ServerSocket socket = new ServerSocket(5000);while (true) {Socket client = socket.accept();newFixedThreadPool.execute(new ClientHandler(client));}}catch (IOException ioe) {ioe.printStackTrace();}}
交互类:
客户端:
NIO
NIO server端的代码逻辑
- 创建和初始化Selector和ServerSocketChannel,给Server绑定Accept事件
- 主线程While(true) 从selector中读取发生的事件,一旦获取到就触发监听
- 如果阻塞过程中触发了事件,就遍历事件按照事件的类型来处理(连接事件、读取事件、写事件),处理完成删除
- 处理连接操作,就可以从key里面获取到Server,从Server.accept返回ClientChannel,就可以和客户端端交互了
服务端的操作(读取客户端的消息,和往客户端的Channel里面写消息)
- 获取到ClientChannel需要首先注册READ的监听,就可以读取到客户端发过来的消息,socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
- 也可以直接往clientChannel里面写内容, socketChannel.write(ByteBuffer.wrap((“welcome”.getBytes())));
客户端的操作:
- 初始化客户端和Selector,类似的绑定OP_CONNECT的监听,需要在监听事件中 finishConnect()的方法才算连接成功
- 连接成功之后,要在Client端绑定OP_READ监听,这样如果Server往里面写数据,就可以读取到。 socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer);
- 也可以直接往Channel里面写数据,这样服务端也可以收的到 channel.write(ByteBuffer.wrap(new String(“say hi from client”).getBytes()));
相关代码
服务端代码
public static void main(String[] args) throws IOException {Selector selector = Selector.open();ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(10083));serverSocketChannel.configureBlocking(false);serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);ServerSelectorProtocol protocol = new ServerSelectorProtocol(BUFF_SIZE);while (true) {selector.select();Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();while (keyIter.hasNext()) {SelectionKey key = keyIter.next();keyIter.remove();if (key.isAcceptable()){protocol.handleAccept(key);}if (key.isReadable()){protocol.handleRead(key);}}}}服务端处理类ServerSelectorProtocol
public void handleAccept(SelectionKey key) throws IOException {System.out.println("Accept");//根据key.channel() 获取Server,再根据Server.accept()获取ClinetSocketChannel socketChannel = ((ServerSocketChannel)key.channel()).accept();socketChannel.configureBlocking(false);//附着一个ByteBuffer,把接收到的客户端socketChannel也注册在selector上,监听客户端channel的read操作socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));System.out.println("create new session from "+socketChannel.getRemoteAddress()+"\n");//往ClinetChannle里面写数据socketChannel.write(ByteBuffer.wrap(("welcome".getBytes())));}public void handleRead(SelectionKey key) throws IOException {SocketChannel clntChan = (SocketChannel) key.channel();//获取该信道所关联的附件,这里为缓冲区ByteBuffer buf = (ByteBuffer) key.attachment();buf.clear();long bytesRead = clntChan.read(buf);//如果read()方法返回-1,说明客户端关闭了连接,那么客户端已经接收到了与自己发送字节数相等的数据,可以安全地关闭if (bytesRead == -1){clntChan.close();}else if(bytesRead > 0){buf.flip();String result = "";while(bytesRead>0){byte[] data = buf.array();result+=new String(data);bytesRead = clntChan.read(buf);}System.out.println(result);}}客户端代码
public static void main(String[] args) throws IOException {SocketChannel clientChannel = SocketChannel.open();Selector selector = Selector.open();ClientHandler handler=new ClientHandler();clientChannel.configureBlocking(false);clientChannel.connect(new InetSocketAddress("127.0.0.1", 10083));clientChannel.register(selector, SelectionKey.OP_CONNECT);while (true) {selector.select();Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();while (keyIter.hasNext()) {SelectionKey key = keyIter.next();keyIter.remove();if (key.isConnectable()){handler.handleConnect(key);}if (key.isReadable()){handler.handleRead(key);}}}}客户端Handler
public void handleConnect(SelectionKey key) throws IOException {SocketChannel channel=(SocketChannel)key.channel();//如果正在连接,则完成连接if(channel.isConnectionPending()){channel.finishConnect();}System.out.println("Connected");//ServerChannel的key,有accept接入SocketChannel socketChannel = (SocketChannel) key.channel();socketChannel.configureBlocking(false);//附着一个ByteBuffer,把接收到的客户端socketChannel也注册在selector上,监听客户端channel的read操作socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFF_SIZE));channel.write(ByteBuffer.wrap(new String("say hi from client").getBytes()));}public void handleRead(SelectionKey key) throws IOException {SocketChannel channel = (SocketChannel)key.channel();ByteBuffer buffer = ByteBuffer.allocate(200);channel.read(buffer);byte[] data = buffer.array();String message = new String(data);System.out.println("recevie message from server:, size:" + buffer.position() + " msg: " + message);}
AIO Socket编程
- AIO Socket编程支持2种方式,将来式和回调式,我演示的是回调式
- 参与的类比较多,但是职责单一,AcceptHandler监听客户的连接行为,ReadHandler监听客户的写入行为,WriteHandler没什么用,只是作为服务端向客户端写消息成功后,系统调用成功的反馈
- 因为服务端的行为很多都是要重复的,而回调函数都是一次性的,比如异步监听客户端的accept、read行为,所以每次回调之后都需要重复绑定
服务端:
- Server 服务创建和初始化AsynchronousServerSocketChannel,并且绑定AcceptHandler,监听客户的连接行为
- AcceptHandler 监听到客户连接后,获取到客户的AsynchronousSocketChannel,然后对客户绑定ReadHandler
- ReadHandler 监听客户往服务端发消息,读取到之后,会发起一个反馈消息,告知客户端已经收到你的消息,此时服务端写消息,需要绑定一个WriteHandler作为写入完成的反馈
- WriteHandler 因为服务端要往客户端写反馈消息,该写入操作需要有个异步回调操作,WriteHandler 里面输出 “写入完成”public class Server {public final static int PORT = 8001;public final static String IP = "127.0.0.1";public static void main(String[] args) throws IOException {AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP,PORT));System.out.println("Server listen on "+PORT);server.accept(null,new AcceptHandler(server));while(true){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {final ByteBuffer buffer = ByteBuffer.allocate(1024);private AsynchronousServerSocketChannel server = null;AcceptHandler(AsynchronousServerSocketChannel server){this.server=server;}@Overridepublic void completed(AsynchronousSocketChannel socket, Server attachment) {server.accept(null,this);try {System.out.println("有客户端连接:" + socket.getRemoteAddress().toString());} catch (IOException e1) {e1.printStackTrace();}startRead(socket);}@Overridepublic void failed(Throwable exc, Server attachment) {}public void startRead(AsynchronousSocketChannel socket) {ByteBuffer clientBuffer = ByteBuffer.allocate(1024);ReadHandler rd=new ReadHandler(socket);socket.read(clientBuffer, clientBuffer, rd);try {} catch (Exception e) {e.printStackTrace();}}}public class ReadHandler implements CompletionHandler<Integer,ByteBuffer>{private AsynchronousSocketChannel socket;public String msg;public ReadHandler(AsynchronousSocketChannel socket) {this.socket = socket;}private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();//回调函数里面的buf参数,就是Client写的内容,调用 msg=decoder.decode(buf).toString(); 读取客户端的内容@Overridepublic void completed(Integer i, ByteBuffer buf) {if (i > 0) {socket.read(buf, buf, this);buf.flip();try {msg=decoder.decode(buf).toString();System.out.println("收到" +socket.getRemoteAddress().toString() + "的消息:" + msg);buf.compact();} catch (CharacterCodingException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}try {write(socket);} catch (UnsupportedEncodingException ex) {Logger.getLogger(ReadHandler.class.getName()).log(Level.SEVERE, null, ex);}} else if (i == -1) {try {System.out.println("客户端断线:" + socket.getRemoteAddress().toString());buf = null;} catch (IOException e) {e.printStackTrace();}}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) { }public void write(AsynchronousSocketChannel socket) throws UnsupportedEncodingException{String sendString="server recieve your message:"+msg;ByteBuffer clientBuffer=ByteBuffer.wrap(sendString.getBytes("UTF-8"));socket.write(clientBuffer, clientBuffer, new WriteHandler(socket));}}public class WriteHandler implements CompletionHandler<Integer,ByteBuffer> {private AsynchronousSocketChannel socket;public WriteHandler(AsynchronousSocketChannel socket) {this.socket = socket;}@Overridepublic void completed(Integer i, ByteBuffer buf) {if (i > 0) {System.out.println("往客户端发送消息成功");} else if (i == -1) {try {System.out.println("对端断线:" + socket.getRemoteAddress().toString());buf = null;} catch (IOException e) {e.printStackTrace();}}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {}}
客户端
相对来说比较简单,直接用匿名内部类来解决,connect事件绑定一个回调,连接成功后写入一段话,写入成功后,再绑一个read的监听,来接受客户端的消息