Skip to content

Commit ba3730b

Browse files
committed
fix: 去除了一个愚蠢的全局变量,去除了supervisor愚蠢的日志系统
1 parent 1153f8f commit ba3730b

17 files changed

+113
-250
lines changed

README.md

+29-80
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
### **workbranch**
3030

31-
**workbranch**(工作分支)是动态线程池的抽象,内置了一条线程安全的**任务队列**用于同步任务。其管理的每一条异步工作线程被称为**worker**,负责从任务队列不断获取任务并执行。(以下示例位于`workspace/example/`
31+
**workbranch**(工作分支)是动态线程池的抽象,内置了一条线程安全的**任务队列**用于同步任务。其管理的每一条异步工作线程被称为**worker**,负责从任务队列不断获取任务并执行。(以下示例按顺序置于`workspace/example/`
3232
<br>
3333

3434
让我们先简单地提交一点任务,当你的任务带有返回值时,workbranch会返回一个std::future,否则返回void。
@@ -38,7 +38,7 @@
3838

3939
int main() {
4040
// 2 threads
41-
wsp::workbranch br("My First BR", 2);
41+
wsp::workbranch br(2);
4242
// return void
4343
br.submit([]{ std::cout<<"hello world"<<std::endl; });
4444
// return std::future<int>
@@ -59,7 +59,7 @@ int main() {
5959

6060
int main() {
6161
// 1 threads
62-
wsp::workbranch br("My Second BR");
62+
wsp::workbranch br;
6363
br.submit<wsp::task::nor>([]{ std::cout<<"task B done\n";}); // normal task
6464
br.submit<wsp::task::urg>([]{ std::cout<<"task A done\n";}); // urgent task
6565
br.wait_tasks(); // wait for tasks done (timeout: no limit)
@@ -82,7 +82,7 @@ task B done
8282
#include <workspace/workspace.h>
8383

8484
int main() {
85-
wsp::workbranch br("My Third BR");
85+
wsp::workbranch br;
8686
// sequence tasks
8787
br.submit<wsp::task::seq>([]{std::cout<<"task 1 done\n";},
8888
[]{std::cout<<"task 2 done\n";},
@@ -145,7 +145,7 @@ Caught error: YYYY
145145

146146
### **supervisor**
147147

148-
supervisor是异步管理者线程的抽象,负责监控workbranch的负载情况并进行动态调整。它自带简单的日志系统,并允许你在每一次检查workbranch的时候插入一个小任务,比如:定制你的专属日志(如加入时间)、简单地统计任务负载等
148+
supervisor是异步管理者线程的抽象,负责监控workbranch的负载情况并进行动态调整。它允许你在每一次调控workbranch之后执行一个小任务,你可以用来**写日志**或者做一些其它调控等
149149
<br>
150150

151151
每一个supervisor可以管理多个workbranch。此时workbranch之间共享supervisor的所有设定。
@@ -154,99 +154,53 @@ supervisor是异步管理者线程的抽象,负责监控workbranch的负载情
154154
#include <workspace/workspace.h>
155155

156156
int main() {
157-
wsp::workbranch br1("BR-1", 2);
158-
wsp::workbranch br2("BR-2", 2);
157+
wsp::workbranch br1(2);
158+
wsp::workbranch br2(2);
159159

160160
// 2 <= thread number <= 4
161161
// time interval: 1000 ms
162162
wsp::supervisor sp(2, 4, 1000);
163163

164-
sp.enable_log(std::cout);
165-
sp.set_tick_cb([]{
166-
static char buffer[40];
164+
sp.set_tick_cb([&br1, &br2]{
167165
auto now = std::chrono::system_clock::now();
168166
std::time_t timestamp = std::chrono::system_clock::to_time_t(now);
169167
std::tm local_time = *std::localtime(&timestamp);
168+
static char buffer[40];
170169
std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &local_time);
171-
std::cout<<"["<<buffer<<"] ";
170+
std::cout<<"["<<buffer<<"] "<<"br1: [workers] "<<br1.num_workers()<<" | [blocking-tasks] "<<br1.num_tasks()<<'\n';
171+
std::cout<<"["<<buffer<<"] "<<"br2: [workers] "<<br2.num_workers()<<" | [blocking-tasks] "<<br2.num_tasks()<<'\n';
172172
});
173+
173174
sp.supervise(br1); // start supervising
174175
sp.supervise(br2); // start supervising
175176

176177
for (int i = 0; i < 1000; ++i) {
177178
br1.submit([]{std::this_thread::sleep_for(std::chrono::milliseconds(10));});
178179
br2.submit([]{std::this_thread::sleep_for(std::chrono::milliseconds(20));});
179180
}
181+
180182
br1.wait_tasks();
181183
br2.wait_tasks();
182184
}
183185
```
184186

185-
在我的机器上,输出样式如下
187+
在我的机器上,输出如下
186188

187189
```
188190
jack@xxx:~/workspace/example/build$ ./e4
189-
[2023-04-24 16:37:15] workspace: BR-2 workers: 2 [min] | blocking-task: 0
190-
[2023-04-24 16:37:15] workspace: BR-1 workers: 2 [min] | blocking-task: 0
191-
[2023-04-24 16:37:16] workspace: BR-2 workers: 2 [min] | blocking-task: 900
192-
[2023-04-24 16:37:16] workspace: BR-1 workers: 2 [min] | blocking-task: 802
193-
[2023-04-24 16:37:17] workspace: BR-2 workers: 4 [max] | blocking-task: 700
194-
[2023-04-24 16:37:17] workspace: BR-1 workers: 4 [max] | blocking-task: 406
195-
[2023-04-24 16:37:18] workspace: BR-2 workers: 4 [max] | blocking-task: 500
196-
[2023-04-24 16:37:18] workspace: BR-1 workers: 4 [max] | blocking-task: 10
197-
[2023-04-24 16:37:19] workspace: BR-2 workers: 4 [max] | blocking-task: 302
198-
[2023-04-24 16:37:19] workspace: BR-1 workers: 4 [max] | blocking-task: 0
199-
[2023-04-24 16:37:20] workspace: BR-2 workers: 4 [max] | blocking-task: 104
200-
[2023-04-24 16:37:20] workspace: BR-1 workers: 3 [mid] | blocking-task: 0
201-
```
202-
203-
其中:
204-
205-
- `workers`:代表线程数
206-
207-
- `[min]|[mid]|[max]`: 代表了<最小|中等|最大>的线程数。
208-
209-
- `blocking-task`:代表了当前任务队列中阻塞的任务数。
210-
211-
从代码中可以看到,我们在每一次检查的间隔插入了一个日志任务,用于补充supervisor的日志信息 —— 将当前的系统时间安插到默认日志之前。
212-
<br>
213-
214-
我们也可以让每一个supervisor单独管理一个workbranch,如下:
215-
216-
```c++
217-
#include <workspace/workspace.h>
218-
#define TASKS 1000
219-
#define SLEEP_FOR(ms) std::this_thread::sleep_for(std::chrono::milliseconds(ms))
220-
221-
int main() {
222-
wsp::workbranch main_br("Main ");
223-
wsp::workbranch help_br("Helper");
224-
225-
wsp::supervisor main_br_sp(2, 4); // interval 500(ms)
226-
wsp::supervisor help_br_sp(0, 2); // interval 500(ms)
227-
main_br_sp.enable_log();
228-
help_br_sp.enable_log();
229-
230-
main_br_sp.supervise(main_br);
231-
help_br_sp.supervise(help_br);
232-
233-
for (int i = 0; i < TASKS; ++i) {
234-
if (main_br.num_tasks() > 200) {
235-
if (help_br.num_tasks() > 200)
236-
SLEEP_FOR(20);
237-
else
238-
help_br.submit([]{SLEEP_FOR(10);});
239-
} else {
240-
main_br.submit([]{SLEEP_FOR(10);});
241-
}
242-
}
243-
main_br.wait_tasks();
244-
help_br.wait_tasks();
245-
}
191+
[2023-06-13 12:24:31] br1: [workers] 4 | [blocking-tasks] 606
192+
[2023-06-13 12:24:31] br2: [workers] 4 | [blocking-tasks] 800
193+
[2023-06-13 12:24:32] br1: [workers] 4 | [blocking-tasks] 213
194+
[2023-06-13 12:24:32] br2: [workers] 4 | [blocking-tasks] 600
195+
[2023-06-13 12:24:33] br1: [workers] 4 | [blocking-tasks] 0
196+
[2023-06-13 12:24:33] br2: [workers] 4 | [blocking-tasks] 404
197+
[2023-06-13 12:24:34] br1: [workers] 3 | [blocking-tasks] 0
198+
[2023-06-13 12:24:34] br2: [workers] 4 | [blocking-tasks] 204
199+
[2023-06-13 12:24:35] br1: [workers] 2 | [blocking-tasks] 0
200+
[2023-06-13 12:24:35] br2: [workers] 4 | [blocking-tasks] 4
201+
[2023-06-13 12:24:35] br1: [workers] 2 | [blocking-tasks] 0
202+
[2023-06-13 12:24:35] br2: [workers] 4 | [blocking-tasks] 0
246203
```
247-
248-
在这个例子中我们申请了两个工作分支,并将其中一个作为**主要分支**,另一个作为**辅助分支**,接纳主要分支无法容纳的任务。值得注意的是:各个supervisor之间的输出**线程安全**的。
249-
250204
---
251205

252206
### **workspace**
@@ -258,25 +212,20 @@ workspace是一个**托管器**/**任务分发器**,你可以将workbranch和s
258212
- 避免空悬指针问题:当workbranch先于supervisor析构会造成**空悬指针**的问题,使用workspace可以避免这种情况
259213
- 更低的框架开销:workspace的任务分发机制能减少与工作线程的竞争,提高性能(见下Benchmark)。
260214

261-
我们可以通过workspace自带的任务分发机制(调用`submit`),或者调用`for_each`来进行任务分发。前者显然是更好的方式
215+
我们可以通过workspace自带的任务分发机制来异步执行任务(调用`submit`
262216

263217
```C++
264218
#include <workspace/workspace.h>
265219

266220
int main() {
267221
wsp::workspace spc;
268-
auto bid1 = spc.attach(new wsp::workbranch("BR1"));
269-
auto bid2 = spc.attach(new wsp::workbranch("BR2"));
222+
auto bid1 = spc.attach(new wsp::workbranch);
223+
auto bid2 = spc.attach(new wsp::workbranch);
270224
auto sid1 = spc.attach(new wsp::supervisor(2, 4));
271225
auto sid2 = spc.attach(new wsp::supervisor(2, 4));
272226
spc[sid1].supervise(spc[bid1]); // start supervising
273227
spc[sid2].supervise(spc[bid2]); // start supervising
274228

275-
// Manual assignment
276-
spc.for_each([](wsp::workbranch& each){
277-
each.submit([]{std::cout<<std::this_thread::get_id()<<" executed task"<<std::endl;});
278-
each.wait_tasks();
279-
});
280229
// Automatic assignment
281230
spc.submit([]{std::cout<<std::this_thread::get_id()<<" executed task"<<std::endl;});
282231
spc.submit([]{std::cout<<std::this_thread::get_id()<<" executed task"<<std::endl;});

benchmark/bench1.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ int main(int argn, char** argvs) {
1010
fprintf(stderr, "Invalid parameter! usage: [threads + tasks]\n");
1111
return -1;
1212
}
13-
wsp::workbranch wb("bench", thread_nums);
13+
wsp::workbranch wb(thread_nums);
1414
auto time_cost = timewait([&]{
1515
auto task = []{/* empty task */};
1616
for (int i = 0; i < task_nums/10; ++i) {

benchmark/bench3.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ int main(int argn, char** argvs) {
1212
}
1313
wsp::workspace spc;
1414
for (int i = 0; i < thread_nums/2; ++i) {
15-
spc.attach(new wsp::workbranch("xx", 2));
15+
spc.attach(new wsp::workbranch(2));
1616
}
1717
auto time_cost = timewait([&]{
1818
auto task = []{/* empty task */};

example/CMakeLists.txt

-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,3 @@ target_link_libraries(e5 PRIVATE Threads::Threads)
2424

2525
add_executable(e6 e6.cc)
2626
target_link_libraries(e6 PRIVATE Threads::Threads)
27-
28-
add_executable(e7 e7.cc)
29-
target_link_libraries(e7 PRIVATE Threads::Threads)

example/e1.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
int main() {
44
// 2 threads
5-
wsp::workbranch br("My First BR", 2);
5+
wsp::workbranch br(2);
66

77
// no return
88
br.submit([]{ std::cout<<"hello world"<<std::endl; });

example/e2.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
int main() {
44
// 1 threads
5-
wsp::workbranch br("My Second BR");
5+
wsp::workbranch br;
66

77
// normal task
88
br.submit<wsp::task::nor>([]{ std::cout<<"task B done\n";});

example/e3.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#include <workspace/workspace.h>
22

33
int main() {
4-
wsp::workbranch br("My Third BR");
4+
wsp::workbranch br;
55

66
br.submit<wsp::task::seq>([]{std::cout<<"task 1 done\n";},
77
[]{std::cout<<"task 2 done\n";},

example/e4.cc

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
#include <workspace/workspace.h>
22

33
int main() {
4-
wsp::workbranch br1("BR-1", 2);
5-
wsp::workbranch br2("BR-2", 2);
4+
wsp::workbranch br1(2);
5+
wsp::workbranch br2(2);
66

77
// 2 <= thread number <= 4
88
// time interval: 1000 ms
99
wsp::supervisor sp(2, 4, 1000);
1010

11-
sp.enable_log(std::cout);
12-
sp.set_tick_cb([]{
11+
sp.set_tick_cb([&br1, &br2]{
1312
auto now = std::chrono::system_clock::now();
1413
std::time_t timestamp = std::chrono::system_clock::to_time_t(now);
1514
std::tm local_time = *std::localtime(&timestamp);
1615
static char buffer[40];
1716
std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &local_time);
18-
std::cout<<"["<<buffer<<"] ";
17+
std::cout<<"["<<buffer<<"] "<<"br1: [workers] "<<br1.num_workers()<<" | [blocking-tasks] "<<br1.num_tasks()<<'\n';
18+
std::cout<<"["<<buffer<<"] "<<"br2: [workers] "<<br2.num_workers()<<" | [blocking-tasks] "<<br2.num_tasks()<<'\n';
1919
});
2020

2121
sp.supervise(br1); // start supervising

example/e5.cc

+12-22
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,19 @@
11
#include <workspace/workspace.h>
2-
#define TASKS 1000
3-
#define SLEEP_FOR(ms) std::this_thread::sleep_for(std::chrono::milliseconds(ms))
42

53
int main() {
6-
wsp::workbranch main_br("Main ");
7-
wsp::workbranch help_br("Helper");
4+
wsp::workspace spc;
85

9-
wsp::supervisor main_br_sp(2, 4); // interval 500(ms)
10-
wsp::supervisor help_br_sp(0, 2); // interval 500(ms)
11-
main_br_sp.enable_log();
12-
help_br_sp.enable_log();
6+
auto bid1 = spc.attach(new wsp::workbranch);
7+
auto bid2 = spc.attach(new wsp::workbranch);
8+
auto sid1 = spc.attach(new wsp::supervisor(2, 4));
9+
auto sid2 = spc.attach(new wsp::supervisor(2, 4));
1310

14-
main_br_sp.supervise(main_br);
15-
help_br_sp.supervise(help_br);
11+
spc[sid1].supervise(spc[bid1]); // start supervising
12+
spc[sid2].supervise(spc[bid2]); // start supervising
1613

17-
for (int i = 0; i < TASKS; ++i) {
18-
if (main_br.num_tasks() > 200) {
19-
if (help_br.num_tasks() > 200)
20-
SLEEP_FOR(20);
21-
else
22-
help_br.submit([]{SLEEP_FOR(10);});
23-
} else {
24-
main_br.submit([]{SLEEP_FOR(10);});
25-
}
26-
}
27-
main_br.wait_tasks();
28-
help_br.wait_tasks();
14+
// Automatic assignment
15+
spc.submit([]{std::cout<<std::this_thread::get_id()<<" executed task"<<std::endl;});
16+
spc.submit([]{std::cout<<std::this_thread::get_id()<<" executed task"<<std::endl;});
17+
18+
spc.for_each([](wsp::workbranch& each){each.wait_tasks(1000);}); // timeout: 1000ms
2919
}

example/e6.cc

+11-19
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,17 @@
11
#include <workspace/workspace.h>
22

33
int main() {
4+
wsp::futures<int> futures;
45
wsp::workspace spc;
6+
spc.attach(new wsp::workbranch(2));
7+
8+
futures.add_back(spc.submit([]{return 1;}));
9+
futures.add_back(spc.submit([]{return 2;}));
510

6-
auto bid1 = spc.attach(new wsp::workbranch("BR1"));
7-
auto bid2 = spc.attach(new wsp::workbranch("BR2"));
8-
auto sid1 = spc.attach(new wsp::supervisor(2, 4));
9-
auto sid2 = spc.attach(new wsp::supervisor(2, 4));
10-
11-
spc[sid1].supervise(spc[bid1]); // start supervising
12-
spc[sid2].supervise(spc[bid2]); // start supervising
13-
14-
// Manual assignment
15-
spc.for_each([](wsp::workbranch& each){
16-
each.submit([]{std::cout<<std::this_thread::get_id()<<" executed task"<<std::endl;});
17-
each.wait_tasks();
18-
});
19-
20-
// Automatic assignment
21-
spc.submit([]{std::cout<<std::this_thread::get_id()<<" executed task"<<std::endl;});
22-
spc.submit([]{std::cout<<std::this_thread::get_id()<<" executed task"<<std::endl;});
23-
24-
spc.for_each([](wsp::workbranch& each){each.wait_tasks(1000);}); // timeout: 1000ms
11+
// wait all tasks done and get the results
12+
futures.wait();
13+
auto res = futures.get();
14+
for (auto& each: res) {
15+
std::cout<<"got "<<each<<std::endl;
16+
}
2517
}

example/e7.cc

-16
This file was deleted.

0 commit comments

Comments
 (0)