SpringBoot使用ForkJoinPool编写多线程程序
实例
直接看代码:
第一种情况:有返回值1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class ExecutorHandler
{
private UserInfoCaculator userInfoCaculator;
//@Value("${app.parallelism:20}")
private int parallelism;
"${app.threshold:5}") (
private int threshold;
private ForkJoinPool forkJoinPool;
private void init()
{
parallelism = Runtime.getRuntime().availableProcessors();
forkJoinPool = new ForkJoinPool(handleParallelism);
}
/**
* 并发执行计算用户信息
*
* @param userId
* @param metaDatas
* @return
*/
public List<UserInfo> execute(Integer userId, List<MetaData> metaDatas)
{
return forkJoinPool.invoke(new ForkTask(userId, metaDatas));
}
class ForkTask extends RecursiveTask<List<UserInfo>>
{
private Integer userId;
private List<MetaData> metaDatas;
public ForkTask(Integer userId, List<MetaData> metaDatas)
{
this.userId = userId;
this.metaDatas = metaDatas;
}
protected List<UserInfo> compute()
{
int listSize = metaDatas.size();
if (listSize <= handleThreshold)
{
// 到达门槛值,直接运行
return doCompute();
}
else
{
int split = listSize / 2;
ForkTask left = new ForkTask(userInfo, metaDatas.subList(0, split));
ForkTask right = new ForkTask(userInfo, metaDatas.subList(split, listSize));
left.fork();
right.fork();
left.join();
right.join();
List<userInfo> infos = new ArrayList<>();
infos.addAll(left.join());
infos.addAll(right.join());
return infos;
}
}
private List<UserInfo> doCompute()
{
List<UserInfo> infos = new ArrayList<>();
for (MetaData metaData : metaDatas)
{
infos.add(userInfoCaculator.calculate(userId,metaData));
}
return infos;
}
}
}
无返回值:可以使用Void类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
4j
public class ScheduleHandler
{
//@Value("${app.parallelism:20}")
private int parallelism;
"${app.threshold:10}") (
private int threshold;
private SchedulerService schedulerService;
private ForkJoinPool forkJoinPool;
private void init()
{
parallelism = Runtime.getRuntime().availableProcessors();
forkJoinPool = new ForkJoinPool(handleParallelism);
}
/**
* 执行
*/
public void execute()
{
// 默认是同步执行
execute(false);
}
/**
* 执行
*
* @param async 是否需要异步执行
*/
public void execute(boolean async)
{
if (async)
{
// 异步
new Thread(() -> doExecute()).start();
}
else
{
// 同步
doExecute();
}
}
public void doExecute()
{
List<Article> articles = getAllArticles();
if (articles == null || articles.isEmpty())
{
log.warn("articles is null or empty");
return;
}
try
{
forkJoinPool.invoke(new ForkTask(articles));
}
catch (Exception e)
{
log.error("fork join execute fail", e);
}
}
/**
* 获取要执行的文章列表
*
* @return
*/
private List<Article> getAllArticles()
{
return ArticleLoaderFactory.getArticleLoader().loadArticles();
}
class ForkTask extends
RecursiveTask<Void>
{
List<Article> articles;
public ForkTask(List<Article> articles)
{
this.articles = articles;
}
protected Void compute()
{
int listSize = articles.size();
if (listSize <= handleThreshold)
{
// 到达门槛值,直接运行
return doCompute();
}
else
{
int split = listSize / 2;
ForkTask left = new ForkTask(articles.subList(0, split));
ForkTask right = new ForkTask(articles.subList(split, listSize));
left.fork();
right.fork();
left.join();
right.join();
return null;
}
}
private Void doCompute()
{
for (String article : articles)
{
schedulerService.printArticle(article);
}
return null;
}
}
}
结论
- ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。(见 Java Tip: When to use ForkJoinPool vs ExecutorService )
- ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。
- ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。
Read More
[1] Oracle官方文档
[2] Java 并发编程笔记:如何使用 ForkJoinPool 以及原理
[3] JAVA多线程系列–ForkJoinPool详解
[4] Java Tip: When to use ForkJoinPool vs ExecutorService
[5] 多线程 ForkJoinPool
[6] 关于看完ForkJoinPool和ForkJoinTask文章后一些总结
[7] Fork/Join框架浅谈
[8] Java 线程池ThreadPoolExecutor(基于jdk1.8)(一)
[9] springboot创建及使用多线程的几种方式
[10] Springboot对多线程的支持详解
[11] SpringBoot 并发编程学习历程(绝对的干货)
[12] 【spring boot】13.在spring boot下使用多线程