java io nio aio_learn

关于Java io nio aio 的学习


Java io


  字节流 字符流
输入流 InputStream Reader
输出流 OutputStream Writer



1. Java 文件IO

用字节流,字符流读写文件

public static void Read_WriteToFile()throw IOException{
  File file = new File("path.txt");
  //字节流读取
  byte[] byteArray = new byte[(int)file.length()];
  InputStream ins = new FileInputStream(file);
  int size = ins.read(byteArray);//读取字节流到byteArray中返回大小
  ins.close;
  //字节流写入
  OutputStream ous = new FileOutputStream(file);
  ous.write(byteArray);//将byteArray中的内容写入file
  ous.close;
  //字符流读取
  Reader reader = new FileReader(file);
  char[] charArray = new char[(int)file.length()];
  size = reader.read(charArray);//读取file字符流到charArray
  reader.close;
  //字符流写入
  Write os = new FileWrite(file);
  os.write(charArray);//将charArray内容写入file
  os.close;
}


用缓冲字节流读写文件

public static void Buffer_File()throws IOException{
  //用缓冲字节流读文件
  File file =new File("path.txt");
  byte[] byteArray = new byte[(int)file.length()];
  InputStream ins = new BufferedInputStream(new FileInputStream(file),2*1024);
  int size = ins.read(byteArray);
  ins.close;
  //用缓冲字节流写文件
  OutputStream ous = new BufferedOutputStream(new FileOutputStream(file),2*1024);
  ous.write(byteArray);
  ous.close;
}

2. BIO网络编程

传统的BIO编程


传统的同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通讯。
传统的BIO模型图:


同步阻塞式I/O创建的Server源码:

/**
 * 
 */
package io;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @ClassName ServerNormal
 * @Description TODO
 * @author tomsun28
 * @Date 2017年11月1日 下午1:45:05
 * @site usthe.com
 */
public final class ServerNormal {
	private static int DEFAULT_PORT=12345;
	private static ServerSocket serverSocket;
	public static void start() throws IOException{
		start(DEFAULT_PORT);
	}
	public synchronized static void start(int port) throws IOException{
		if(serverSocket!=null)
			return;
		try {
			serverSocket = new ServerSocket(port);
			while(true)
			{
				Socket socket = serverSocket.accept();
				new Thread(new ServerHandler(socket)).start();
			}
		} finally {
			// TODO: handle finally clause
			if(serverSocket!=null)
			{
				System.out.println("服务器关闭");
				serverSocket.close();
				serverSocket = null; 
			}
				
		}
		
		
	}
	

}

客户端消息处理线程ServerHandler源码:

/**
 * 
 */
package io;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;

/**
 * @ClassName ServerHandler
 * @Description TODO 用于处理一个客户端的Socket连接
 * @author tomsun28
 * @Date 2017年11月1日 下午9:57:47
 * @site usthe.com
 */
public class ServerHandler implements Runnable {

	/* (non-Javadoc)
	 * @see java.lang.Runnable#run()
	 */
	private Socket socket;
	 /**
	 * @Title ServerHandler
	 * @Description TODO
	 * @param 
	 * @throws 
	 */
	public ServerHandler(Socket socket) {
		// TODO Auto-generated constructor stub
		this.socket = socket;
	}
	
	@Override
	public void run() {
		// TODO Auto-generated method stub
		BufferedReader in = null;
		DataOutputStream out = null;
		try {
			in  = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			 out = new DataOutputStream(socket.getOutputStream());
           
			String expression;
			while(true){
				if((expression = in.readLine())==null)break;
				System.out.println("服务器收到消息!"+expression);
				
			}
			out.writeUTF("谢谢连接我:" + socket.getLocalSocketAddress() + "\nGoodbye!");
			out.close();
		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}

}

同步阻塞式I/O创建的Client源码:

/**
 * 
 */
package io;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.channels.ScatteringByteChannel;

import javax.sound.midi.VoiceStatus;

/**
 * @ClassName Client
 * @Description TODO
 * @author tomsun28
 * @Date 2017年11月1日 下午10:20:30
 * @site usthe.com
 */
public class Client {
	private static int DEFAULT_SERVER_PORT = 12345;
	private static String DEFAULT_SERVER_IP = "127.0.0.1";
	public static void send(String expression)
	{
		send(DEFAULT_SERVER_PORT,expression);
	}
	public static void send(int port,String expression)
	{
		Socket socket = null;
		DataInputStream in = null;
		PrintWriter out = null;
		String str;
		try {
			socket = new Socket(DEFAULT_SERVER_IP, port);
			
			out = new PrintWriter(socket.getOutputStream(),true);
			
			out.println(expression);
			System.out.println("hahhahaha");		
				in = new DataInputStream(socket.getInputStream());

                System.out.println("from server"+in.readUTF());

		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}finally{
			if(in != null){
				try {
					in.close();
				} catch (Exception e2) {
					// TODO: handle exception
				}
				in = null;
				
			}
			if(out !=null){
				out.close();
				out = null;
			}
			if(socket != null)
			{
				try{
					socket.close();
				}catch(IOException e){
					e.printStackTrace();
				}
				socket = null;
			}
		}
	}
}

I/O测试代码:

/**
 * 
 */
package io;


/**
 * @ClassName TestBIO
 * @Description TODO
 * @author tomsun28
 * @Date 2017年11月1日 下午10:34:17
 * @site usthe.com
 */
public class TestBIO {
	
	public static void main(String args[]){
		new Thread(new Runnable() {
			
			@Override
			public void run() {
				// TODO Auto-generated method stub
				try {
					ServerNormal.start();
				} catch (Exception e) {
					// TODO: handle exception
					e.printStackTrace();
				}
			}
		}).start();
		String exp = "hello server!";
		new Thread(new Runnable() {
			
			@Override
			public void run() {
				// TODO Auto-generated method stub
				Client.send(exp);
			}
		}).start();
	}
}


伪异步I/O编程

为了改进这种一个connect一个thread情况,用线程池来管理请求线程,实现一个或多个线程处理N个客户端(但底层还是BIO),这种通常称为伪异步IO。 伪异步IO模型图:
改动server代码,将为连接请求所新建的线程交给线程池管理。

/**
 * 
 */
package io;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @ClassName ServerNormal
 * @Description TODO
 * @author tomsun28
 * @Date 2017年11月1日 下午1:45:05
 * @site usthe.com
 */
public final class ServerNormal {
	private static int DEFAULT_PORT=12345;
	private static ServerSocket serverSocket;
	private static ExecutorService executorService = Executors.newFixedThreadPool(60);
	public static void start() throws IOException{
		start(DEFAULT_PORT);
	}
	public synchronized static void start(int port) throws IOException{
		if(serverSocket!=null)
			return;
		try {
			serverSocket = new ServerSocket(port);
			while(true)
			{
				Socket socket = serverSocket.accept();
//				new Thread(new ServerHandler(socket)).start();
				executorService.execute(new ServerHandler(socket));
			}
		} finally {
			// TODO: handle finally clause
			if(serverSocket!=null)
			{
				System.out.println("服务器关闭");
				serverSocket.close();
				serverSocket = null; 
			}		
		}
	}
}



Java nio


IO是以流的方式处理数据,一次一个字节的处理数据,而NIO是以块的方式处理数据,一次操作消费一个数据块。NIO可以为非阻塞IO。
Buffer,Channel,Seletor 是NIO的三个核心对象。


缓冲区Buffer


Buffer是一个对象,包含一些要写入或者读出的数据。在NIO中,数据是放入buffer对象的,而在IO中,数据是直接写入或者读到Stream对象的。应用程序不能直接对 Channel 进行读写操作,而必须通过 Buffer 来进行,即 Channel 是通过 Buffer 来读写数据的。
在NIO中,所有的数据都是用Buffer处理的,它是NIO读写数据的中转池。Buffer实质上是一个数组,通常是一个字节数据,但也可以是其他类型的数组。但一个缓冲区不仅仅是一个数组,重要的是它提供了对数据的结构化访问,而且还可以跟踪系统的读写进程。
使用 Buffer 读写数据一般遵循以下四个步骤:

1. 写入数据到 Buffer;
2. 调用 flip() 方法;
3. 从 Buffer 中读取数据;
4. 调用 clear() 方法或者 compact() 方法。


limit - buffer读写上限,
posttion - 读写时当前游标的位置
capacity - 缓冲区的最大容量
mark     - 快照点,供reset恢复到快照点

clear 清空缓冲区  posttion=0,limit=capacity,mark=-1
flip  反转缓冲区  limit=position,position=0,mark=-1
rewind 重0开始读缓冲区 position=0,mark=-1
reset 根据mark恢复缓冲区  position=mark

compact 压实缓冲区,复用缓冲区空间,将已读的内容丢弃
slice 共享缓冲区空间limit-position,其获取的是同一缓冲区

当向 Buffer 写入数据时,Buffer 会记录下写了多少数据。一旦要读取数据,需要通过 flip() 方法将 Buffer 从写模式切换到读模式。在读模式下,可以读取之前写入到 Buffer 的所有数据。
一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用 clear() 或 compact() 方法。clear() 方法会清空整个缓冲区。compact() 方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。
Buffer主要有如下几种:


通道Channel


Channel是一个对象,可以通过它读取和写入数据。所有数据都通过Buffer处理,不会直接把字节写入到Channel中,也不会直接从Channel中读数据。可以把它看做IO中的流。但是它和流相比还有一些不同:

1. Channel是双向的,既可以读又可以写,而流是单向的
2. Channel可以进行异步的读写
3. 对Channel的读写必须通过buffer对象

在Java NIO中Channel主要有如下几种类型:

1. FileChannel:从文件读取数据的
2. DatagramChannel:读写UDP网络协议数据
3. SocketChannel:读写TCP网络协议数据
4. ServerSocketChannel:可以监听TCP连接


多路复用器Selector


Selector是Java NIO编程基础,其是一个对象,它可以注册到很多个Channel上,监听各个Channel上发生的事件,并且能够根据事件情况决定Channel读写。这样通过一个线程管理多个Channel,就可以处理大量网络连接了。

  1. 创建一个Selector
Selector selector = Selector.open();
  1. 注册Channel到Selector上
channel.configureBlocking(false); //注册的Channel必须设置成异步模式
SelectionKey key = channel.register(selector,SelectionKey.OP_READ); //注册感兴趣的事件到selector获得对应的SelectorKey
//需要注意register()方法的第二个参数,它是一个“interest set”,意思是注册的Selector对Channel中的哪些时间感兴趣,事件类型有四种:connect,accept,read,write 
//通道触发了一个事件意思是该事件已经 Ready(就绪)。所以某个Channel成功连接到另一个服务器称为 Connect Ready。一个ServerSocketChannel准备好接收新连接称为 Accept Ready,一个有数据可读的通道可以说是 Read Ready,等待写数据的通道可以说是Write Ready。
//上面四个事件多对应到SelectionKey中的常量为:SelectionKey.OP_CONNECT,SelectionKey.OP_ACCEPT,SelectionKey.OP_READ,SelectionKey.OP_WRITE
//多个感兴趣事件操作符连接:SelectionKey.OP_ACCEPT|SelectionKey.OP_READ


NIO编程


Server


/**
 * 
 */
package nio;



/**
 * @ClassName Server
 * @Description TODO
 * @author tomsun28
 * @Date 2017年11月3日 下午7:30:20
 * @site usthe.com
 */
public class Server {
	 private static int DEFAULT_PORT=12345;
	 private static ServerHandle serverHandle;
	 public static void start(){
		 start(DEFAULT_PORT);
	 }
	 
	 public static synchronized void start(int port){
		 if(serverHandle!=null){
			 serverHandle.stop();
		 }
		 serverHandle = new ServerHandle(port);
		 new Thread(serverHandle,"server").start();
		 
	 }
	 
	 public static void main(String[] args){
		 start();
	 }
	

}



ServerHandle


/**
 * 
 */
package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



/**
 * @ClassName ServerHandle
 * @Description TODO
 * @author tomsun28
 * @Date 2017年11月3日 下午7:43:24
 * @site usthe.com
 */
public class ServerHandle implements Runnable {
	
	private static Logger logger = LoggerFactory.getLogger(ServerHandle.class);
	
	private Selector selector;
	private ServerSocketChannel serverChannel;
	private volatile boolean started;
	
	public ServerHandle(int port){
		
		try {
			//创建选择器
			selector = Selector.open();
			//打开监听的通道
			serverChannel = ServerSocketChannel.open();
			//将此通道设置为非阻塞模式
			serverChannel.configureBlocking(false);
			//绑定端口port,backlog设置为1024
			serverChannel.socket().bind(new InetSocketAddress(port), 1024);
            //将channel注册到selector上,注册响应事件
			serverChannel.register(selector,SelectionKey.OP_ACCEPT);
			//标记服务已经开启
			started = true;
			System.out.println("服务器已经启动端口号为:"+port);
			
			
		} catch (Exception e) {
			// TODO: handle exception
			logger.error("出错系统进行退出:{}", e);
			System.exit(1);
		}
	}
	
	public void stop(){
		started=false;
	}
	
	public void run(){
		while(started){
			try {
				//无论是否有事件发生,每隔以一秒唤醒一次
				selector.select(1000);
				//阻塞形式,至少有一个注册的事件发生时才会继续
				//selector.select();
				//获取每个管道对应的selectionKey
				Set<SelectionKey> keys = selector.selectedKeys();
				Iterator<SelectionKey> iterator = keys.iterator();
				SelectionKey key = null;
				while(iterator.hasNext()){
					key = iterator.next();
					iterator.remove();
					try {
						inputHandle(key);
					} catch (Exception e) {
						// TODO: handle exception
						if(key!=null){
							key.cancel();
							if(key.channel()!=null){
								key.channel().close();
							}		
						}
					}
				}
			} catch (Exception e) {
				// TODO: handle exception
				logger.error(e.toString());
			}
			
		}
		if(selector!=null){
			try {
				selector.close();
			} catch (Exception e) {
				// TODO: handle exception
				logger.error("关闭selector失败:{}",e);
			}
		}
	}
	
	private void inputHandle(SelectionKey key) throws IOException{
		if(key.isValid())
		{
			//处理新接入的请求消息
			if(key.isAcceptable()){
				ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
				//通过serverSocketChannel的accept()创建SocketChannel实例
				SocketChannel sc = ssc.accept();
				//将SocketChannel设置为非阻塞
				sc.configureBlocking(false);
				//把socketChannel注册到selector上,注册事件为OP_READ
				sc.register(selector,SelectionKey.OP_READ);
			}
			//处理读消息
			if(key.isReadable()){
				SocketChannel sc = (SocketChannel)key.channel();
				//创建ByteBuffer开辟1M的缓冲区
				ByteBuffer buffer =ByteBuffer.allocate(1024);
				//将请求消息读出写入到buffer中,返回读到的字节数
				int readBytes = sc.read(buffer);
				
				if(readBytes>=0){
					//将缓冲区buffer转换成读模式
					buffer.flip();
					//根据缓冲区可读的字节数创建字节数组
					byte[] bytes = new byte[buffer.remaining()];
					//将缓冲区可读的字节数组复制到新建的字节数组bytes中
					buffer.get(bytes);
					String expression = new String(bytes,"UTF-8");
					logger.info("服务器收到消息:{}",expression);
					//发送应答消息
					String response = "嗨我是服务器!";
					//清空buffer
					buffer.clear();
					//将buffer转换为写模式
					buffer.flip();
					//写数据到buffer
					buffer.put(response.getBytes());
					//把buffer转换为读模式
					buffer.flip();
					//将buffer的数据读取到channel中发送
					sc.write(buffer);
				}
				else{
					key.cancel();
					sc.close();
				}
			}
		}			
		}		
	}





Client


/**
 * 
 */
package nio;

import java.io.IOException;

/**
 * @ClassName Client
 * @Description TODO
 * @author tomsun28
 * @Date 2017年11月4日 下午2:10:23
 * @site usthe.com
 */
public class Client {
	private static String DEFAULT_HOST = "127.0.0.1";
	private static int DEFAULE_PORT = 12345;
	private static ClientHandle clientHandle;
	public static void start(String ip,int port){
		if(clientHandle!=null){
			clientHandle.stop();
		}
		clientHandle = new ClientHandle(ip,port);
		new Thread(clientHandle,"client").start();
	}
	public static boolean sendMsg(String msg) throws IOException
	{
		if(msg.equals("q")) return false;
		clientHandle.sendMsg(msg);
		return true;
	}
}


ClientHandle


/**
 * 
 */
package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @ClassName ClientHandle
 * @Description TODO
 * @author tomsun28
 * @Date 2017年11月4日 下午1:30:09
 * @site usthe.com
 */
public class ClientHandle implements Runnable {

	private static final Logger logger = LoggerFactory.getLogger(ClientHandle.class);
	
	private String host;
	private int port;
	private Selector selector;
	private SocketChannel socketChannel;
	private volatile boolean started;
	
	public ClientHandle(String ip,int port){
		this.host=ip;
		this.port=port;
		try {
			//创建选择器
			selector = Selector.open();
			//打开通道
			socketChannel = SocketChannel.open();
			//ture为阻塞模式,false为非阻塞模式
			socketChannel.configureBlocking(false);
			started=true;
		} catch (Exception e) {
			// TODO: handle exception
			logger.error("通道开启失败:{}",e);
			System.exit(1);
		}
		
		
	}
	
	public void stop(){
		started = false;
	}
	
	@Override
	public void run() {
		// TODO Auto-generated method stub
		
		try {
			if(socketChannel.connect(new InetSocketAddress(host, port)));
			else
				socketChannel.register(selector,SelectionKey.OP_READ);
		} catch (IOException e) {
			// TODO Auto-generated catch block
//			e.printStackTrace();
			logger.error(e.toString());
			System.exit(1);
		}
		
		while(started){
			
			try {
				selector.select(1000);
				Set<SelectionKey> keys = selector.selectedKeys();
				Iterator<SelectionKey> iterator = keys.iterator();
				SelectionKey key = null;
				while(iterator.hasNext()){
					key = iterator.next();
					iterator.remove();
					try {
						inputHandle(key);
					} catch (Exception e) {
						// TODO: handle exception
						if(key!=null)
						{
							key.cancel();
							if(key.channel()!=null){
								key.channel().close();
							}
						}
					}
				}
				
			} catch (Exception e) {
				// TODO: handle exception
				logger.error(e.toString());
				System.exit(1);
			}
			
		}
		
		if(selector!=null){
			try {
				selector.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}	
		}
	}
	private void inputHandle(SelectionKey key) throws IOException
	{
		if(key.isValid()){
			SocketChannel sc = (SocketChannel)key.channel();
			
			if(key.isReadable()){
				ByteBuffer buffer = ByteBuffer.allocate(1024);
				int readBytes = sc.read(buffer);
				if(readBytes>0){
					buffer.flip();
					byte[] bytes = new byte[buffer.remaining()];
					buffer.get(bytes);
					String result = new String(bytes,"UTF-8");
					logger.info("客户端收到消息:{}",result);		
				}
				else if(readBytes<0){
					key.cancel();
					sc.close();
				}	
			}
		}
	}
	public void sendMsg(String msg) throws IOException{
		socketChannel.register(selector,SelectionKey.OP_READ);
		byte[] bytes = msg.getBytes();
		ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
		buffer.put(bytes);
		buffer.flip();
		socketChannel.write(buffer);
	}
}


Test


/**
 * 
 */
package nio;

import java.io.IOException;
import java.util.Scanner;

/**
 * @ClassName Test
 * @Description TODO
 * @author tomsun28
 * @Date 2017年11月4日 下午2:07:46
 * @site usthe.com
 */
public class Test {
	
	private static String DEFAULT_HOST = "127.0.0.1";
	private static int DEFAULE_PORT = 12345;
	@SuppressWarnings("resource")
	public static void main(String[] args) throws InterruptedException, IOException
	{
		Server.start();
		Thread.sleep(1000);
		Client.start(DEFAULT_HOST, DEFAULE_PORT);
		while(Client.sendMsg(new Scanner(System.in).nextLine()));
		
	}

}




Java AIO


待续。。。。。。。




转载自Java网络IO编程总结 参考自 菜鸟教程 Java NIO详解

打赏

取消

感谢您的支持!

扫码支持
扫码支持