Contents
  1. 1. 参考
  2. 2. 阻塞式多线程TCPsocket编程原理
  3. 3. BIO
  4. 4. NIO
    1. 4.1. 相关代码
  5. 5. AIO Socket编程
    1. 5.1. 服务端:
    2. 5.2. 客户端

参考

我的github源码实现
https://github.com/huangzhenshi/IO_NIO_NIO2Demo

代码精华
https://blog.csdn.net/anxpp/article/details/51512200

非常详细很深刻的IO socket博客专栏
http://blog.csdn.net/column/details/18326.html

阻塞式多线程TCPsocket编程原理

  • ServerSocket创建TCP的服务器,调用阻塞式的accept()方法监听连接的客户端。
  • 客户端和服务端通过Socket对象的输入流、输出流进行内容交互 socket.getInputStream()和socket.getOutputStream()

BIO

服务器端:用线程池去管理多个接入的客户端,把接收到的客户端Socket作为参数,用处理类在单独的线程里面和客户端交互

  1. 初始化服务端
  2. while(true)+ socket.accept的方式,阻塞式的等待新客户的接入
  3. 一旦有一个先Clinet接入的话,则从线程池中创建一个线程,来和Client交互,一般是读取信息,也可以输出固定的信息,比如服务器已经收到
  4. 工作子线程也是while(true)+buf.readLine(),阻塞和循环式的读取Clinet发过来的消息
    public static void main(String[] args) {
    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
    try {
    ServerSocket socket = new ServerSocket(5000);
    while (true) {
    Socket client = socket.accept();
    newFixedThreadPool.execute(new ClientHandler(client));
    }
    }
    catch (IOException ioe) {
    ioe.printStackTrace();
    }
    }

交互类:

public class ClientHandler implements Runnable {
private Socket client = null;
private String address;
//通过构造函数注入接收到的客户端Socket对象
public ClientHandler(Socket client) {
this.client = client;
}
@Override
public void run() {
try {
String host = client.getInetAddress().toString();
String port = Integer.toString(client.getPort());
PrintStream out = new PrintStream(client.getOutputStream());
System.out.println("Get Connection");
BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
address = host + ":" + port;
System.out.println("get connection from" + address);
while (true) {
//阻塞式遍历读取客户端发过来的消息
String str = buf.readLine();
System.out.println(str);
if (str != null) {
out.println(str);
out.flush();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

客户端:

public class TcpClient1 {
public static void main(String[] args) throws IOException {
//尝试连接服务端 127.0.1.1:5000的服务器
Socket client = new Socket("127.0.0.1", 5000);
//获取输出流,往服务端写数据
PrintStream out = new PrintStream(client.getOutputStream());
BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
String[] msgs = {"你好,我是client1"};
for (String msg : msgs) {
out.println(msg);
out.flush();
while (true) {
String echo = buf.readLine();
if (echo != null) {
System.out.println(echo);
}
}
}
}
}

NIO

NIO server端的代码逻辑

  1. 创建和初始化Selector和ServerSocketChannel,给Server绑定Accept事件
  2. 主线程While(true) 从selector中读取发生的事件,一旦获取到就触发监听
  3. 如果阻塞过程中触发了事件,就遍历事件按照事件的类型来处理(连接事件、读取事件、写事件),处理完成删除
  4. 处理连接操作,就可以从key里面获取到Server,从Server.accept返回ClientChannel,就可以和客户端端交互了

服务端的操作(读取客户端的消息,和往客户端的Channel里面写消息)

  1. 获取到ClientChannel需要首先注册READ的监听,就可以读取到客户端发过来的消息,socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
  2. 也可以直接往clientChannel里面写内容, socketChannel.write(ByteBuffer.wrap((“welcome”.getBytes())));

客户端的操作:

  • 初始化客户端和Selector,类似的绑定OP_CONNECT的监听,需要在监听事件中 finishConnect()的方法才算连接成功
  • 连接成功之后,要在Client端绑定OP_READ监听,这样如果Server往里面写数据,就可以读取到。 socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer);
  • 也可以直接往Channel里面写数据,这样服务端也可以收的到 channel.write(ByteBuffer.wrap(new String(“say hi from client”).getBytes()));

    相关代码

  1. 服务端代码

    public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
    serverSocketChannel.bind(new InetSocketAddress(10083));
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    ServerSelectorProtocol protocol = new ServerSelectorProtocol(BUFF_SIZE);
    while (true) {
    selector.select();
    Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
    while (keyIter.hasNext()) {
    SelectionKey key = keyIter.next();
    keyIter.remove();
    if (key.isAcceptable()){
    protocol.handleAccept(key);
    }
    if (key.isReadable()){
    protocol.handleRead(key);
    }
    }
    }
    }
  2. 服务端处理类ServerSelectorProtocol

    public void handleAccept(SelectionKey key) throws IOException {
    System.out.println("Accept");
    //根据key.channel() 获取Server,再根据Server.accept()获取Clinet
    SocketChannel socketChannel = ((ServerSocketChannel)key.channel()).accept();
    socketChannel.configureBlocking(false);
    //附着一个ByteBuffer,把接收到的客户端socketChannel也注册在selector上,监听客户端channel的read操作
    socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
    System.out.println("create new session from "+socketChannel.getRemoteAddress()+"\n");
    //往ClinetChannle里面写数据
    socketChannel.write(ByteBuffer.wrap(("welcome".getBytes())));
    }
    public void handleRead(SelectionKey key) throws IOException {
    SocketChannel clntChan = (SocketChannel) key.channel();
    //获取该信道所关联的附件,这里为缓冲区
    ByteBuffer buf = (ByteBuffer) key.attachment();
    buf.clear();
    long bytesRead = clntChan.read(buf);
    //如果read()方法返回-1,说明客户端关闭了连接,那么客户端已经接收到了与自己发送字节数相等的数据,可以安全地关闭
    if (bytesRead == -1){
    clntChan.close();
    }else if(bytesRead > 0){
    buf.flip();
    String result = "";
    while(bytesRead>0){
    byte[] data = buf.array();
    result+=new String(data);
    bytesRead = clntChan.read(buf);
    }
    System.out.println(result);
    }
    }
  3. 客户端代码

    public static void main(String[] args) throws IOException {
    SocketChannel clientChannel = SocketChannel.open();
    Selector selector = Selector.open();
    ClientHandler handler=new ClientHandler();
    clientChannel.configureBlocking(false);
    clientChannel.connect(new InetSocketAddress("127.0.0.1", 10083));
    clientChannel.register(selector, SelectionKey.OP_CONNECT);
    while (true) {
    selector.select();
    Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
    while (keyIter.hasNext()) {
    SelectionKey key = keyIter.next();
    keyIter.remove();
    if (key.isConnectable()){
    handler.handleConnect(key);
    }
    if (key.isReadable()){
    handler.handleRead(key);
    }
    }
    }
    }
  4. 客户端Handler

    public void handleConnect(SelectionKey key) throws IOException {
    SocketChannel channel=(SocketChannel)key.channel();
    //如果正在连接,则完成连接
    if(channel.isConnectionPending()){
    channel.finishConnect();
    }
    System.out.println("Connected");
    //ServerChannel的key,有accept接入
    SocketChannel socketChannel = (SocketChannel) key.channel();
    socketChannel.configureBlocking(false);
    //附着一个ByteBuffer,把接收到的客户端socketChannel也注册在selector上,监听客户端channel的read操作
    socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFF_SIZE));
    channel.write(ByteBuffer.wrap(new String("say hi from client").getBytes()));
    }
    public void handleRead(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel)key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(200);
    channel.read(buffer);
    byte[] data = buffer.array();
    String message = new String(data);
    System.out.println("recevie message from server:, size:" + buffer.position() + " msg: " + message);
    }

AIO Socket编程

  1. AIO Socket编程支持2种方式,将来式和回调式,我演示的是回调式
  2. 参与的类比较多,但是职责单一,AcceptHandler监听客户的连接行为,ReadHandler监听客户的写入行为,WriteHandler没什么用,只是作为服务端向客户端写消息成功后,系统调用成功的反馈
  3. 因为服务端的行为很多都是要重复的,而回调函数都是一次性的,比如异步监听客户端的accept、read行为,所以每次回调之后都需要重复绑定

    服务端:

  • Server 服务创建和初始化AsynchronousServerSocketChannel,并且绑定AcceptHandler,监听客户的连接行为
  • AcceptHandler 监听到客户连接后,获取到客户的AsynchronousSocketChannel,然后对客户绑定ReadHandler
  • ReadHandler 监听客户往服务端发消息,读取到之后,会发起一个反馈消息,告知客户端已经收到你的消息,此时服务端写消息,需要绑定一个WriteHandler作为写入完成的反馈
  • WriteHandler 因为服务端要往客户端写反馈消息,该写入操作需要有个异步回调操作,WriteHandler 里面输出 “写入完成”
    public class Server {
    public final static int PORT = 8001;
    public final static String IP = "127.0.0.1";
    public static void main(String[] args) throws IOException {
    AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP,PORT));
    System.out.println("Server listen on "+PORT);
    server.accept(null,new AcceptHandler(server));
    while(true){
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }
    public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {
    final ByteBuffer buffer = ByteBuffer.allocate(1024);
    private AsynchronousServerSocketChannel server = null;
    AcceptHandler(AsynchronousServerSocketChannel server){
    this.server=server;
    }
    @Override
    public void completed(AsynchronousSocketChannel socket, Server attachment) {
    server.accept(null,this);
    try {
    System.out.println("有客户端连接:" + socket.getRemoteAddress().toString());
    } catch (IOException e1) {
    e1.printStackTrace();
    }
    startRead(socket);
    }
    @Override
    public void failed(Throwable exc, Server attachment) {}
    public void startRead(AsynchronousSocketChannel socket) {
    ByteBuffer clientBuffer = ByteBuffer.allocate(1024);
    ReadHandler rd=new ReadHandler(socket);
    socket.read(clientBuffer, clientBuffer, rd);
    try {
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }
    public class ReadHandler implements CompletionHandler<Integer,ByteBuffer>{
    private AsynchronousSocketChannel socket;
    public String msg;
    public ReadHandler(AsynchronousSocketChannel socket) {
    this.socket = socket;
    }
    private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
    //回调函数里面的buf参数,就是Client写的内容,调用 msg=decoder.decode(buf).toString(); 读取客户端的内容
    @Override
    public void completed(Integer i, ByteBuffer buf) {
    if (i > 0) {
    socket.read(buf, buf, this);
    buf.flip();
    try {
    msg=decoder.decode(buf).toString();
    System.out.println("收到" +socket.getRemoteAddress().toString() + "的消息:" + msg);
    buf.compact();
    } catch (CharacterCodingException e) {
    e.printStackTrace();
    } catch (IOException e) {
    e.printStackTrace();
    }
    try {
    write(socket);
    } catch (UnsupportedEncodingException ex) {
    Logger.getLogger(ReadHandler.class.getName()).log(Level.SEVERE, null, ex);
    }
    } else if (i == -1) {
    try {
    System.out.println("客户端断线:" + socket.getRemoteAddress().toString());
    buf = null;
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) { }
    public void write(AsynchronousSocketChannel socket) throws UnsupportedEncodingException{
    String sendString="server recieve your message:"+msg;
    ByteBuffer clientBuffer=ByteBuffer.wrap(sendString.getBytes("UTF-8"));
    socket.write(clientBuffer, clientBuffer, new WriteHandler(socket));
    }
    }
    public class WriteHandler implements CompletionHandler<Integer,ByteBuffer> {
    private AsynchronousSocketChannel socket;
    public WriteHandler(AsynchronousSocketChannel socket) {
    this.socket = socket;
    }
    @Override
    public void completed(Integer i, ByteBuffer buf) {
    if (i > 0) {
    System.out.println("往客户端发送消息成功");
    } else if (i == -1) {
    try {
    System.out.println("对端断线:" + socket.getRemoteAddress().toString());
    buf = null;
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {}
    }

客户端

相对来说比较简单,直接用匿名内部类来解决,connect事件绑定一个回调,连接成功后写入一段话,写入成功后,再绑一个read的监听,来接受客户端的消息

public static void main(String[] args) throws IOException {
final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1",8001);
CompletionHandler<Void, ? super Object> handler = new CompletionHandler<Void,Object>(){
@Override
public void completed(Void result, Object attachment) {
client.write(ByteBuffer.wrap("Hello".getBytes()),null,
new CompletionHandler<Integer,Object>(){
@Override
public void completed(Integer result,Object attachment) {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer,buffer,new CompletionHandler<Integer,ByteBuffer>(){
@Override
public void completed(Integer result,
ByteBuffer attachment) {
buffer.flip();
System.out.println(new String(buffer.array()));
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc,ByteBuffer attachment) {}
});
}
@Override
public void failed(Throwable exc, Object attachment) {}
});
}
@Override
public void failed(Throwable exc, Object attachment) {}
};
client.connect(serverAddress, null, handler);
}

Contents
  1. 1. 参考
  2. 2. 阻塞式多线程TCPsocket编程原理
  3. 3. BIO
  4. 4. NIO
    1. 4.1. 相关代码
  5. 5. AIO Socket编程
    1. 5.1. 服务端:
    2. 5.2. 客户端