Skip to content

Commit 2c0b54c

Browse files
committed
多线程 reactor 模型
1 parent 31fa480 commit 2c0b54c

File tree

5 files changed

+427
-5
lines changed

5 files changed

+427
-5
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
、- [缓存击穿解决方法](/src/main/java/com/haobin/codeBlock/CacheBreakdown.java)
6868
- [传统IO模型](/src/main/java/com/haobin/codeBlock/IOModel/ClassicServerLoop.java)
6969
- [IO-reactor模式(单线程)](/src/main/java/com/haobin/codeBlock/IOModel/SingleReactor.java)
70+
- [IO-reactor模式(多线程)](/src/main/java/com/haobin/codeBlock/IOModel/MultiReactor.java)
7071

7172

7273

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package com.haobin.codeBlock.IOModel;
2+
3+
import java.io.EOFException;
4+
import java.io.IOException;
5+
import java.nio.ByteBuffer;
6+
import java.nio.channels.SelectionKey;
7+
import java.nio.channels.Selector;
8+
import java.nio.channels.SocketChannel;
9+
import java.nio.charset.StandardCharsets;
10+
11+
/**
12+
* @author: HaoBin
13+
* @create: 2019/10/23 15:16
14+
* @description: 单线程reactor handler 处理器
15+
* <p>
16+
* 在单线程reactor中, IO读写以及业务处理均由reactor线程完成
17+
**/
18+
public class BasicHandler implements Runnable {
19+
20+
private static final int INPUT_SIZE = 1024;
21+
private static final int OUTPUT_SIZE = 1024;
22+
23+
public SocketChannel socket;
24+
public SelectionKey selectionKey;
25+
ByteBuffer input = ByteBuffer.allocate(INPUT_SIZE);
26+
ByteBuffer output = ByteBuffer.allocate(OUTPUT_SIZE);
27+
28+
// 定义服务的逻辑状态
29+
static final int READING = 0;
30+
static final int SENDING = 1;
31+
static final int CLOSED = 2;
32+
int state = READING;
33+
34+
35+
public BasicHandler(SocketChannel socket) {
36+
this.socket = socket;
37+
}
38+
39+
public BasicHandler(Selector selector, SocketChannel sc) throws IOException {
40+
socket = sc;
41+
sc.configureBlocking(false); // 非阻塞
42+
// 注册到selector上,但不订阅任何事件
43+
selectionKey = socket.register(selector, 0);
44+
// 绑定要处理事件
45+
selectionKey.interestOps(SelectionKey.OP_READ);
46+
// 处理事件对应的程序
47+
selectionKey.attach(this);
48+
// 唤醒selector
49+
selector.wakeup();
50+
}
51+
52+
@Override
53+
public void run() {
54+
try {
55+
if (state == READING)
56+
read(); // 此时通道已经准备好读取字节
57+
else if (state == SENDING)
58+
send(); // 此时通道已经准备好写入字节
59+
} catch (IOException ex) {
60+
// 关闭连接
61+
try {
62+
selectionKey.channel().close();
63+
} catch (IOException ignore) {
64+
}
65+
}
66+
}
67+
68+
protected void read() throws IOException {
69+
input.clear(); // 清空接收缓冲区
70+
int n = socket.read(input);
71+
if (inputIsComplete(n)) {// 如果读取了完整的数据
72+
process();
73+
// 待发送的数据已经放入发送缓冲区中
74+
75+
// 更改服务的逻辑状态以及要处理的事件类型
76+
selectionKey.interestOps(SelectionKey.OP_WRITE);
77+
}
78+
}
79+
80+
// 缓存每次读取的内容
81+
StringBuilder request = new StringBuilder();
82+
/**
83+
* 当读取到 \r\n 时表示结束
84+
* @param bytes 读取的字节数,-1 通常是连接被关闭,0 非阻塞模式可能返回
85+
* @throws IOException
86+
*/
87+
protected boolean inputIsComplete(int bytes) throws IOException {
88+
if (bytes > 0) {
89+
input.flip(); // 切换成读取模式
90+
while (input.hasRemaining()) {
91+
byte ch = input.get();
92+
93+
if (ch == 3) { // ctrl+c 关闭连接
94+
state = CLOSED;
95+
return true;
96+
} else if (ch == '\r') { // continue
97+
} else if (ch == '\n') {
98+
// 读取到了 \r\n 读取结束
99+
state = SENDING;
100+
return true;
101+
} else {
102+
request.append((char)ch);
103+
}
104+
}
105+
} else if (bytes == -1) {
106+
// -1 客户端关闭了连接
107+
throw new EOFException();
108+
} else {} // bytes == 0 继续读取
109+
return false;
110+
}
111+
112+
/**
113+
* 根据业务处理结果,判断如何响应
114+
* @throws EOFException 用户输入 ctrl+c 主动关闭
115+
*/
116+
protected void process() throws EOFException {
117+
if (state == CLOSED) {
118+
throw new EOFException();
119+
} else if (state == SENDING) {
120+
String requestContent = request.toString(); // 请求内容
121+
byte[] response = requestContent.getBytes(StandardCharsets.UTF_8);
122+
output.put(response);
123+
}
124+
}
125+
126+
protected void send() throws IOException {
127+
int written = -1;
128+
output.flip();// 切换到读取模式,判断是否有数据要发送
129+
if (output.hasRemaining()) {
130+
written = socket.write(output);
131+
}
132+
133+
// 检查连接是否处理完毕,是否断开连接
134+
if (outputIsComplete(written)) {
135+
selectionKey.channel().close();
136+
} else {
137+
// 否则继续读取
138+
state = READING;
139+
// 把提示发到界面
140+
socket.write(ByteBuffer.wrap("\r\nreactor> ".getBytes()));
141+
selectionKey.interestOps(SelectionKey.OP_READ);
142+
}
143+
144+
}
145+
146+
/**
147+
* 当用户输入了一个空行,表示连接可以关闭了
148+
*/
149+
protected boolean outputIsComplete(int written) {
150+
if (written <= 0) {
151+
// 用户只敲了个回车, 断开连接
152+
return true;
153+
}
154+
155+
// 清空旧数据,接着处理后续的请求
156+
output.clear();
157+
request.delete(0, request.length());
158+
return false;
159+
}
160+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package com.haobin.codeBlock.IOModel;
2+
3+
import java.io.EOFException;
4+
import java.io.IOException;
5+
import java.nio.channels.SelectionKey;
6+
import java.nio.channels.Selector;
7+
import java.nio.channels.SocketChannel;
8+
import java.util.concurrent.ArrayBlockingQueue;
9+
import java.util.concurrent.ThreadFactory;
10+
import java.util.concurrent.ThreadPoolExecutor;
11+
import java.util.concurrent.TimeUnit;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
14+
/**
15+
* @author: HaoBin
16+
* @create: 2019/10/23 16:11
17+
* @description: 多线程 reactor handler
18+
**/
19+
public class MultiHandler extends BasicHandler {
20+
21+
static ThreadPoolExecutor workPool =
22+
new ThreadPoolExecutor(1,
23+
1,
24+
60, TimeUnit.SECONDS,
25+
new ArrayBlockingQueue<>(10),
26+
new ThreadFactory() {
27+
private AtomicInteger count = new AtomicInteger(0);
28+
@Override
29+
public Thread newThread(Runnable r) {
30+
Thread t = new Thread(r);
31+
t.setName(MultiHandler.class.getName() + count.addAndGet(1));
32+
return t;
33+
}
34+
},
35+
new ThreadPoolExecutor.AbortPolicy());
36+
37+
static final int PROCESSING = 4;
38+
private Object lock = new Object();
39+
40+
public MultiHandler(Selector selector, SocketChannel sc) throws IOException {
41+
super(selector, sc);
42+
}
43+
44+
@Override
45+
protected void read() throws IOException {
46+
// 为什么要同步?Processer 线程处理时通道还有可能有读事件发生
47+
// 保护 input ByteBuffer 不会重置和状态的可见性
48+
// 应该是这样
49+
synchronized (lock) {
50+
input.clear();
51+
int n = socket.read(input);
52+
if (inputIsComplete(n)) {
53+
54+
// 读取完毕后将后续的处理交给
55+
state = PROCESSING;
56+
workPool.execute(new Processer());
57+
}
58+
}
59+
}
60+
61+
class Processer implements Runnable {
62+
@Override
63+
public void run() {
64+
processAndHandOff();
65+
}
66+
}
67+
68+
private void processAndHandOff() {
69+
synchronized (lock) {
70+
try {
71+
process();
72+
} catch (EOFException e) {
73+
// 直接关闭连接
74+
try {
75+
selectionKey.channel().close();
76+
} catch (IOException e1) {}
77+
return;
78+
}
79+
80+
// 最后的发送还是交给 SingleReactor 线程处理
81+
state = SENDING;
82+
selectionKey.interestOps(SelectionKey.OP_WRITE);
83+
84+
// 这里需要唤醒 Selector,因为当把处理交给 workpool 时,SingleReactor 线程已经阻塞在 select() 方法了, 注意
85+
// 此时该通道感兴趣的事件还是 OP_READ,这里将通道感兴趣的事件改为 OP_WRITE,如果不唤醒的话,就只能在
86+
// 下次select 返回时才能有响应了,当然了也可以在 select 方法上设置超时
87+
selectionKey.selector().wakeup();
88+
}
89+
}
90+
}

0 commit comments

Comments
 (0)