0%

JDK NIO

Netty作为RPC框架的通信框架,比如阿里的Dubbo服务,为了能了解其底层原理,所以有必要了解下Apache的Netty框架,Netty又是基于JDK的NIO进行封装的,所以JDK NIO 了解一下。

JDK I/O 又可以区分为 普通的IO,伪异步IO,NIO,AIO;

themeimage

NIO

JDK 1.4 之后推出的非阻塞IO的API,如果根据时间定义NIO 可以理解为 New IO, 如果根据阻塞特性来定义NIO 可以理解为 No-Blocking IO;

NIO的核心API类有Channel, Selector, Buffer;

Channel

传统的IO是基于流的形式,数据只能单向传递,而Channel是可以双向传递的通道,既可以read(),也可以write(),只是这个读/写操作都必须依赖Buffer;

日常会用到的Channel有:

FileChannel: 文件通道用于从文件读取数据.通过getChannle创建;

DatagramChannel:数据报通道可以通过UDP(用户数据报协议)通过网络读取和写入数据.通过工厂方法创建;

SocketChannel:数据报通道可以通过TCP(传输控制协议)通过网络读取和写入数据。通过工厂方法来创建新对象。

ServerSocketChannel:服务器端允许用户监听传入的TCP连接。对于每个传入连接,都会为连接创建一个SocketChannel;

Selector

Selector 是可以选择通道的多路复用器,可以用于使用单线程处理多个通道,Selector会检查NIO的channel,并返回准备好了的channle。Selector 只能通过工厂方法获取实例(KQueueSelectorImpl);

1
2
3
4
5
6
7
//获取准备好的channel
int select = selector.select(); //返回准备好的channel个数
Set<SelectionKey> selectionKeys = selector.selectedKeys(); //获取准备好的channel的集合

// 获取所有的channel
Set<SelectionKey> allSelectionKeys = selector. keys();

Buffer

Buffer用于与NIO通道进行交互。Channel的读、写操作都需要Buffer类;

Buffer类下面4个属性是决定Channel即可读,又可写的关键;

1
2
3
4
5
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;

mark : 用于标记,mark()方法添加标记;

position : 读写的指针位置;

limit:创建buffer默认limit = capacity 。写操作是:limit = capacity,limit表示极限存储的大小,读操作时:limit 表示buffer中数据的存储长度;

capacity : Buffer的存储大小;

默认创建的buffer用来读取channel中的数据;想从读切换到写操作,需要执行flip()操作,此时limit = 读操作时position的值,而position的值等于0;

1
ByteBuffer buffer = ByteBuffer.allocate(10);

buffer1图片

当从channel中读取5个数据到buffer中之后:

buffer2图片

当从channel需要从读模式切换到为写模式,执行flip()操作之后:

buffer3图片

将buffer中5个数据写出到channel中:

buffer4图片

如果执行buffer.clear()操作:

1
2
3
4
5
6
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}

文件操作

使用channel.transferTo()操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 
System.out.println(new Date() + " " + System.currentTimeMillis());
FileInputStream inputStream = new FileInputStream(new File("/Users/zhangwenhao/openSource/testNio/src/big47m.pdf"));
File file = new File("/Users/zhangwenhao/openSource/testNio/src/new.pdf");
if (file.exists()) {
file.delete();
}
file.createNewFile();
FileOutputStream outputStream = new FileOutputStream(file);

FileChannel channel = inputStream.getChannel();
FileChannel channel3 = outputStream.getChannel();

channel.transferTo(0, channel.size(), channel3);
channel.close();
inputStream.close();

// Thu Jul 18 16:03:13 CST 2019 1563436993768
//Disconnected from the target VM, address: '127.0.0.1:55421', transport: 'socket'
//Thu Jul 18 16:03:13 CST 2019 1563436993821

// 3821 - 3768 = 53

channel3.close();
outputStream.close();

System.out.println(new Date() + " " + System.currentTimeMillis());

像普通IO流一样拷贝数据

值得注意的是:对于PDF文件,channel3.write(buffer,len)是没用的;拷贝完成文件打不开;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
System.out.println(new Date() + " " + System.currentTimeMillis());
FileInputStream inputStream = new FileInputStream(new File("/Users/zhangwenhao/openSource/testNio/src/big47m.pdf"));

File file = new File("/Users/zhangwenhao/openSource/testNio/src/new.pdf");
if (file.exists()) {
file.delete();
}
file.createNewFile();
FileOutputStream outputStream = new FileOutputStream(file);

FileChannel channel = inputStream.getChannel();
FileChannel channel3 = outputStream.getChannel();

ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = 0;
while ((len = channel.read(buffer)) != -1) {
buffer.flip();
channel3.write(buffer);
buffer.clear();
}
//Thu Jul 18 16:17:10 CST 2019 1563437830880
//Thu Jul 18 16:17:11 CST 2019 1563437831550 670

System.out.println(new Date() + " " + System.currentTimeMillis());

Socket编程

NIO的Socket编程,服务器端的开发思路大概如下

· 1:创建Selector
· 2:创建ServerSocketChannel
· 3:设置监听端口
· 4:设置ServerSocketChannel为非阻塞模式
· 5:将ServerSocketChannel注册监听到Selector,以accepte类型监听
· 6:轮休监听,直到有满足channel(如果是链接成功,则创建SocketChannel 设置为非阻塞模式,并注册到Selector中,以SelectionKey.OP_READ类型监听)
· 7:执行执行select 获取准备好的channle
· 8:根据channle的状态相应的逻辑

Selector.select()方法是一个阻塞方法,如果没有准备好的Channel会一直等待;

在不同的平台下面,该方法可能有bug,比如linux系统下的Epoll Bug,就是该方法可能出现不阻塞的情况导致无限轮询;

服务端代码:

值得注意的是:当在操作准备好的Channel时候,要将其从准备好的channel集合中移除;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
 /**
* @author: zhangwenhao
* @since: 2019/7/17
*/
public class NioServer {

public void start() throws IOException {

/**
* 1:获取Selector
*/

Selector selector = Selector.open();
/**
* 2:获取ServerSocketChannel
*/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

/**
* 3:监听端口
*/

serverSocketChannel.bind(new InetSocketAddress(9091));

/**
* 4:设置非阻塞模式
*/

serverSocketChannel.configureBlocking(false);

/**
* 5:将channel 注册到selector
*/

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器启动成功");


for (; ; ) {

/**
* 6:执行select 获取准备好的channle
*/

int select = selector.select();
if (select == 0) {
continue;
}

/**
* 7:执行业务操作
*/

Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();

while (iterator.hasNext()) {

SelectionKey selectionKey = iterator.next();
iterator.remove();

/**
* 更加channel状态执行相应逻辑
*/
if (selectionKey.isConnectable()) {

}

if (selectionKey.isAcceptable()) {
accepteHandler(selector, serverSocketChannel);
}

if (selectionKey.isReadable()) {
readHandler(selector, selectionKey);
}

}


}
}

public void accepteHandler(Selector selector, ServerSocketChannel serverSocketChannel) throws IOException {

/**
* 从serversocketchannel 中获取socketchannel
*/
SocketChannel socketChannel = serverSocketChannel.accept();

/**
* 设置非阻塞模式
*/
socketChannel.configureBlocking(false);

/**
* 设置监听
*/
socketChannel.write(Charset.forName("UTF-8").encode("你已经成功加入"));
socketChannel.register(selector, SelectionKey.OP_READ);

}

public void readHandler(Selector selector, SelectionKey selectionKey) throws IOException {

/**
* 从selectorkey 中获取已经准备就绪的socketchannel
*/

SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

/**
* 创建buffer
*/

ByteBuffer buffer = ByteBuffer.allocate(1024);

/**
* 循环读取客户端信息
*/

String request = "";

while (socketChannel.read(buffer) > 0) {
buffer.flip();
request += Charset.forName("UTF-8").decode(buffer);
}

/**
* 将channel 再次注册到selector
*/
socketChannel.register(selector, SelectionKey.OP_READ);

/**
* 广播消息到其他客户端
*/

if (request.length() > 0) {
System.out.println(" ;; " + request);
socketChannel.write(Charset.forName("UTF-8").encode("服务器发给客户端的" + request));
}

}

public static void main(String[] args) throws IOException {
NioServer nioServer = new NioServer();
nioServer.start();

}
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
 /**
* @author: zhangwenhao
* @since: 2019/7/17
*/
public class NioClient {

public void start() throws IOException {

SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9091));
socketChannel.configureBlocking(false);

Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ);
new Thread(new NioClientHandler(selector)).start();

Scanner scanner = new Scanner(System.in);

while (scanner.hasNextLine()) {
String request = scanner.next();
if (request != null && request.length() > 0) {
socketChannel.write(Charset.forName("UTF-8").encode(request));
}
}


}

public static void main(String[] args) throws IOException {
NioClient nioClient = new NioClient();
nioClient.start();
}
}

NioClientHandler: 用于监听来自服务器端的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
 /**
* @author: zhangwenhao
* @since: 2019/7/17
*/
public class NioClientHandler implements Runnable {

private Selector selector;

public NioClientHandler(Selector selector) {
this.selector = selector;
}

@Override
public void run() {
try {
for (; ; ) {
int select = selector.select();
if (select == 0) {
continue;
}

Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();

while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();

if (selectionKey.isReadable()) {
readHandler(selector, selectionKey);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

public void readHandler(Selector selector, SelectionKey selectionKey) throws IOException {

SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
String response = "";
while (socketChannel.read(buffer) > 0) {
buffer.flip();
response += Charset.forName("UTF-8").decode(buffer);
}
socketChannel.register(selector, SelectionKey.OP_READ);

if (response.length() > 0) {
System.out.println(" 客户端接收到;; " + response);
}

}
}

GitHub登录不了?信任该网站之后再登录。