JUC
# JUC
# JUC 是什么
代码仓库待续..
C:\phpStudy\PHPTutorial\WWW\JAVA-BIG-PROJECT\Spring-Boot\gmall190401
参考: E:\course\1.程序员鱼皮分享\鱼皮分享-保姆级Java系列资源\05-Java高级\07-JUC
E:\course\大型电商--谷粒商城\4.课件和文档\高级篇\课件
课程讲师: 夏磊
java.util.concurrent在并发编程中使用的工具类
进程和线程是什么
进程:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。
线程:通常在一个进程中可以包含若干个线程,当然一个进程中至少有一个线程,不然没有存在的意义。线程可以利用进程所拥有的资源,在引入线程的操作系统中,通常都是把进程作为分配资源的基本单位,而把线程作为独立运行和独立调度的基本单位,由于线程比进程更小,基本上不拥有系统资源,故对它的调度所付出的开销就会小得多,能更高效的提高系统多个程序间并发执行的程度。
线程的六个状态
- NEW 新建
- RUNNABLE 准备就绪
- BLOCKED 阻塞
- WAITING 不见不散
- TIMED_WAITING 过时不候
- TERMINATED 终结
wait和sleep的区别
功能都是使当前线程暂停
wait会释放锁
sleep不会释放锁
什么是并发 什么是并行
并发:同一时刻多个线程在访问同一个资源,多个线程对一个点
例子:小米9今天上午10点,限量抢购
春运抢票
电商秒杀...
并行:多项工作一起执行,之后再汇总
例子:泡方便面,电水壶烧水,一边撕调料倒入桶中
# Lock接口
锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。
# 如何使用
package com.atguigu.gmall.jucdemo;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Ticket {
private int number = 30;
private Lock lock = new ReentrantLock();
/* public synchronized void sale(){
synchronized(this){
}
if(number>0){
System.out.println(Thread.currentThread().getName()
+"\t 卖出"+number--+"号票\t还剩"+number
);
}
}*/
public void sale() {
lock.lock();
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName()
+ "\t 卖出" + number-- + "号票\t还剩" + number
);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class SaleTicket {
/**
* 卖票程序复习线程知识:三个售票员 卖出 30张票
*
* @param args
* @throws Exception 线程 操作 资源类,高内聚低耦合
*/
public static void main(String[] args) throws Exception {
Ticket ticket = new Ticket();
new Thread(()->{for (int i = 1; i <= 40; i++) ticket.sale(); },"AA").start();
new Thread(()->{for (int i = 1; i <= 40; i++) ticket.sale(); },"BB").start();
new Thread(()->{for (int i = 1; i <= 40; i++) ticket.sale(); },"CC").start();
/*new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 40; i++) {
ticket.sale();
}
}
}, "AA").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 40; i++) {
ticket.sale();
}
}
}, "BB").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 40; i++) {
ticket.sale();
}
}
}, "CC").start();
*/
}
}
/**
* w w h
* java.util.concurrent
* java.util.concurrent.atomic
* int i++ 100 AtomicInteger
* <p>
* java.util.concurrent.locks
**/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# 线程间通信
面试题:两个线程打印
两个线程,一个线程打印1-52,另一个打印字母A-Z打印顺序为12A34B...5152Z,要求用线程间通信
方法比较
package com.atguigu.gmall.jucdemo;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ShareDataOne {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition cd = lock.newCondition();
public void incr() throws InterruptedException {
lock.lock();
try {
//判断
while (number != 0) {
cd.await();
}
//干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//通知
cd.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decr() throws InterruptedException {
lock.lock();
try {
//判断
while (number != 1) {
cd.await();
}
//干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//通知
cd.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
/**
* 现在两个线程
* 操作一个初始值为0的变量
* 实现一个线程对变量增加1,一个线程对变量减少1
* 交替,来10轮
* 、线程 操作 资源类 2、高内聚低耦合
* 1、判断
* 2、干活
* 3、通知
*
* 注意多线程之间的虚假唤醒
*/
public class NotifyWaitDemo {
public static void main(String[] args) {
ShareDataOne shareDataOne = new ShareDataOne();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
shareDataOne.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
shareDataOne.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BB").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
shareDataOne.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "CC").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
shareDataOne.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "DD").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
synchronized同步方法实现
package com.atguigu.gmall.jucdemo;
/**
* 线程通信
*
* 现在两个线程
* 操作一个初始值为0的变量
* 实现一个线程对变量增加1,一个线程对变量减少1
* 交替,来10轮
* 、线程 操作 资源类 2、高内聚低耦合
* 1、判断
* 2、干活
* 3、通知
*
* 注意多线程之间的虚假唤醒
*/
public class SynchronizedNotifyWaitDemo {
public static void main(String[] args) {
SynchronizedShareDataOne synchronizedShareDataOne = new SynchronizedShareDataOne();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
synchronizedShareDataOne.increase();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
synchronizedShareDataOne.decrease();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}
}, "BB").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
synchronizedShareDataOne.increase();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}
}, "CC").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
synchronizedShareDataOne.decrease();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}
}, "DD").start();
}
}
// 资源类
class SynchronizedShareDataOne {
private int number = 0;
public synchronized void increase() throws InterruptedException {
// 判断 (线程在哪wait 就在哪唤醒 不能使用if 使用if唤醒之后不会再判断条件)
//if (number != 0) {
// this.wait();
//}
// 循环判断
while (number != 0) {
this.wait();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 通知
this.notifyAll();
}
public synchronized void decrease() throws InterruptedException {
// 判断
//if (number != 1) {
// this.wait();
//}
// 循环判断
while (number != 1) {
this.wait();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 通知
this.notifyAll();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
使用if判断换成4个线程会导致错误,虚假唤醒
原因:在java多线程判断时,不能用if,程序出事出在了判断上面,
突然有一添加的线程进到if了,突然中断了交出控制权,没有进行验证,而是直接走下去了,加了两次,甚至多次
# 线程间定制化调用通信
案例: 多线程之间按顺序调用,实现A->B->C
AA打印5次,BB打印10次,CC打印15次
package com.atguigu.gmall.jucdemo;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ShareData{
private int num = 1;//1:AA,2:BB,3:CC
private Lock lock = new ReentrantLock();
// 钥匙
Condition cd1 = lock.newCondition();
Condition cd2 = lock.newCondition();
Condition cd3 = lock.newCondition();
public void print5(int total){
lock.lock();
try {
//判断
while (num!=1){
cd1.await();
}
//干活
for (int i = 1; i <= 5 ; i++) {
System.out.println(Thread.currentThread().getName()+ "\t "+total+"\t"+i);
}
//通知
num = 2;
cd2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print10(int total){
lock.lock();
try {
//判断
while (num!=2){
cd2.await();
}
//干活
for (int i = 1; i <= 10 ; i++) {
System.out.println(Thread.currentThread().getName()+ "\t "+total+"\t"+i);
}
//通知
num = 3;
cd3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15(int total){
lock.lock();
try {
//判断
while (num!=3){
cd3.await();
}
//干活
for (int i = 1; i <= 15 ; i++) {
System.out.println(Thread.currentThread().getName()+ "\t "+total+"\t"+i);
}
//通知
num = 1;
cd1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
/**
*
* 多线程之间按顺序调用,实现A->B->C
* 三个线程启动,要求如下:
*
* AA打印5次,BB打印10次,CC打印15次
* 接着
* AA打印5次,BB打印10次,CC打印15次
* ......来10轮
*
*/
public class ThreadOrderAccess {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(()->{
for (int i = 1; i <=10 ; i++) {
shareData.print5(i);
}
},"AA").start();
new Thread(()->{
for (int i = 1; i <=10 ; i++) {
shareData.print10(i);
}
},"BB").start();
new Thread(()->{
for (int i = 1; i <=10 ; i++) {
shareData.print15(i);
}
},"CC").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# 集合类线程不安全及解决方案
多线程并发修改集合会报java.util.ConcurrentModificationException
ArrayList在迭代的时候如果同时对其进行修改就会抛出java.util.ConcurrentModificationException异常并发修改异常
小实验
package com.atguigu.gmall.jucdemo;
import java.util.ArrayList;
import java.util.UUID;
public class NotSafeArrayList {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
会报Exception in thread "16" Exception in thread "14" java.util.ConcurrentModificationException异常
ArrayList源码
public boolean add(E e) {
ensureCapacityInternal(size + 1);// Increments modCount!!
elementData[size++] = e;
return true;
}//没有synchronized线程不安全
2
3
4
5
# 解决方案
# Vector
List<String> list = new Vector<>();

查看Vector的源码
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
2
3
4
5
6
有synchronized 线程安全
# Collections
List<String> list = Collections.synchronizedList(new ArrayList<>());
Collections提供了方法synchronizedList保证list是同步线程安全的
那HashMap,HashSet是线程安全的吗?也不是
所以有同样的线程安全方法

# 写时复制

List<String> list = new CopyOnWriteArrayList<>();
# CopyOnWriteArrayList定义
A thread-safe variant of ArrayList in which all mutative operations (add, set, and so on) are implemented by making a fresh copy of the underlying array.
CopyOnWriteArrayList是arraylist的一种线程安全变体,其中所有可变操作(add、set等)都是通过生成底层数组的新副本来实现的。
举例: 名单签到

源码:
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后向新的容器Object[] newElements里添加元素。
添加元素后,再将原容器的引用指向新的容器setArray(newElements)。
这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。
所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
package com.atguigu.gmall.jucdemo;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* 请举例说明集合类是不安全的
*/
public class NotSafeDemo {
public static void main(String[] args) {
Map<String,String> map = new ConcurrentHashMap<>();
for (int i = 1; i <=30 ; i++) {
new Thread(()->{
map.put(UUID.randomUUID().toString().substring(0,8),Thread.currentThread().getName());
System.out.println(map);
},String.valueOf(i)).start();
}
}
private static void nosafeSet() {
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <=30 ; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(set);
},String.valueOf(i)).start();
}
}
private static void nosafeList() {
// List<String> list = Arrays.asList("a","b","c");
// list.forEach(System.out::println);
List<String> list = new CopyOnWriteArrayList<>();
//Collections.synchronizedList(new ArrayList<>());
//new Vector<>();
//new ArrayList<>();
for (int i = 1; i <=30 ; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# 扩展
Set<String> set = new HashSet<>();// 线程不安全
Set<String> set = new CopyOnWriteArraySet<>();// 线程安全
2
HashSet底层数据结构是什么? HashMap底层数据结构?
HashSet的add是放一个值 HashMap是放K V键值对
public HashSet() {
map = new HashMap<>();
}
private static final Object PRESENT = new Object();
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
2
3
4
5
6
7
Map<String,String> map = new HashMap<>();//线程不安全
Map<String,String> map = new ConcurrentHashMap<>();//线程安全
2
# 多线程锁 8锁问题
一个对象里面如果有多个synchronized方法,某一个时刻内,只能一个线程去调用其中的一个synchronized方法了,其它的线程都只能等待.
换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法.
加个普通方法后发现和同步锁无关
换成两个对象后,不是同一把锁了,情况立刻变化。
synchronized实现同步的基础:Java中的每一个对象都可以作为锁。
具体表现为以下3种形式。
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的Class对象。
对于同步方法块,锁是Synchonized括号里配置的对象
当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。
也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们是同一个类的实例对象!
package com.atguigu.gmall.jucdemo;
import java.util.concurrent.TimeUnit;
class Phone
{
public static synchronized void sendSMS() throws Exception {
TimeUnit.SECONDS.sleep(4);
System.out.println("------sendSMS");
}
public synchronized void sendEmail() throws Exception {
System.out.println("------sendEmail");
}
public void getHello(){
System.out.println("--------Hello java190401!!!");
}
}
/**
*
* @Description: 8锁
* @author xialei
*
1 标准访问,先打印短信还是邮件
2 停4秒在短信方法内,先打印短信还是邮件
3 新增普通的hello方法,是先打短信还是hello
4 现在有两部手机,先打印短信还是邮件
5 两个静态同步方法,1部手机,先打印短信还是邮件
6 两个静态同步方法,2部手机,先打印短信还是邮件
7 1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件
8 1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件
*/
public class Lock_8
{
public static void main(String[] args) throws Exception
{
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "AA").start();
Thread.sleep(100);
new Thread(() -> {
try {
// phone.sendEmail();
//phone.getHello();
phone2.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "BB").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# Callable接口
获取多线程的方式:
传统的是继承thread类和实现runnable接口,java5以后又有实现callable接口和java的线程池获得
原理:
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,
当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。
一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,
就不能再重新开始或取消计算。get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,
然后会返回结果或者抛出异常。
只计算一次
get方法放到最后
package com.atguigu.gmall.jucdemo;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
class MyThread implements Runnable{
@Override
public void run() {
}
}
class MyThread2 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"---Callable.call()");
return 200;
}
}
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//FutureTask(Callable<V> callable)
//FutureTask<Integer> ft = new FutureTask<Integer>(new MyThread2());
//FutureTask<Integer> ft2 = new FutureTask<Integer>(new MyThread2());
FutureTask<Integer> ft = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "---Callable.call()");
return 1024;
});
new Thread(ft,"zhang3").start();
// new Thread(ft2,"li4").start();
System.out.println(Thread.currentThread().getName());
while (!ft.isDone()){
System.out.println("-----wait");
}
System.out.println(ft.get());
//System.out.println(ft2.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# JUC强大的辅助类
# CountDownLatch类 减少计数
CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
package com.atguigu.gmall.jucdemo.enums;
public enum CountryEnum {
ONE(1,"齐"),
TWO(2,"楚"),
THREE(3,"燕"),
FOUR(4,"赵"),
FIVE(5,"魏"),
SIX(6,"韩");
private int code;
private String message;
CountryEnum(int code, String message) {
this.code = code;
this.message = message;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public static CountryEnum forEach_CountryEnum(int index){
CountryEnum[] values = CountryEnum.values();
for (CountryEnum countryEnum : values) {
if(countryEnum.getCode()==index){
return countryEnum;
}
}
return null;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.atguigu.gmall.jucdemo;
import com.atguigu.gmall.jucdemo.enums.CountryEnum;
import java.util.concurrent.CountDownLatch;
/**
* @author xialei
* @Description: *让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒。
* <p>
* CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
* 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
* 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
* <p>
* 解释:6个同学陆续离开教室后值班同学才可以关门。
* <p>
* main主线程必须要等前面6个线程完成全部工作后,自己才能开干
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch cd = new CountDownLatch(6);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName() +"\t 国被灭");
cd.countDown();
},CountryEnum.forEach_CountryEnum(i).getMessage()).start();
}
cd.await();
System.out.println(Thread.currentThread().getName() +"\t 秦灭六国,一统华夏");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# CyclicBarrier 循环栅栏
CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。
它要做的事情是让一组线程到达一个屏障(也可以叫同步点)时被阻塞,
直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
线程进入屏障通过CyclicBarrier的await()方法。
package com.atguigu.gmall.jucdemo;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @Description: TODO(这里用一句话描述这个类的作用)
* @author xialei
*
* CyclicBarrier
* 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,
* 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,
* 直到最后一个线程到达屏障时,屏障才会开门,所有
* 被屏障拦截的线程才会继续干活。
* 线程进入屏障通过CyclicBarrier的await()方法。
*
* 集齐7颗龙珠就可以召唤神龙
*/
public class CyclicBarrierDemo {
public static void main(String[] args)
{
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("集齐7颗龙珠召唤神龙");
});
for (int i =1; i <=7 ; i++) {
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() +"\t 星龙珠被收集");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# Semaphore 信号量
在信号量上我们定义两种操作:
acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),
要么一直等下去,直到有线程释放信号量,或超时。
release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
package com.atguigu.gmall.jucdemo;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* @author xialei
* <p>
* 在信号量上我们定义两种操作:
* acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),
* 要么一直等下去,直到有线程释放信号量,或超时。
* release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
* <p>
* 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
*/
public class SemaphoreDemo {
public static void main(String[] args) {
//三个停车位
Semaphore sp = new Semaphore(3);
//停六个汽车
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
try {
// 信号量减一
sp.acquire();
System.out.println(Thread.currentThread().getName() + "\t号车驶入停车位");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "\t号车驶出停车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 信号量加一
sp.release();
}
},String.valueOf(i)).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# ReentrantReadWriteLock 读写锁
package com.atguigu.gmall.jucdemo;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyCache{
private volatile Map<String, String> map = new HashMap<>();
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void put(String key, String value){
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t 准备写入数据" + key);
TimeUnit.MILLISECONDS.sleep(200);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t 写入数据完成" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}
public void get(String key){
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t 准备读取数据" + key);
TimeUnit.MILLISECONDS.sleep(200);
String value = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t 读取数据完成" + value);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) throws InterruptedException {
MyCache myCache = new MyCache();
for (int i = 1; i <=5 ; i++) {
String key = String.valueOf(i);
new Thread(() -> {
myCache.put(key, UUID.randomUUID().toString().substring(0, 8));
}, String.valueOf(i)).start();
}
TimeUnit.SECONDS.sleep(2);
for (int i = 1; i <=5 ; i++) {
String key = String.valueOf(i);
new Thread(() -> {
myCache.get(key);
}, String.valueOf(i)).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# BlockingQueue 阻塞队列
# 概念
阻塞:必须要阻塞/不得不阻塞
阻塞队列是一个队列,在数据结构中起的作用如下图:

线程1往阻塞队列里添加元素,线程2从阻塞队列里移除元素
当队列是空的,从队列中获取元素的操作将会被阻塞
当队列是满的,从队列中添加元素的操作将会被阻塞
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
# 阻塞队列的用处
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
为什么需要BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
# 架构梳理

# 种类分析
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
- LinkedTransferQueue:由链表组成的无界阻塞队列。
- LinkedBlockingDeque:由链表组成的双向阻塞队列。
# BlockingQueue 核心方法

| 抛出异常 | 当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:Queue full当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException |
|---|---|
| 特殊值 | 插入方法,成功ture失败false 移除方法,成功返回出队列的元素,队列里没有就返回null |
| 一直阻塞 | 1.当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出 2.当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用 |
| 超时退出 | 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出 |
package com.atguigu.gmall.jucdemo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//第一组
//System.out.println(blockingQueue.add("a"));
//System.out.println(blockingQueue.add("b"));
//System.out.println(blockingQueue.add("c"));
//System.out.println(blockingQueue.remove());
//System.out.println(blockingQueue.remove());
//System.out.println(blockingQueue.remove());
//System.out.println(blockingQueue.remove());
//第二组
//System.out.println(blockingQueue.offer("a"));
//System.out.println(blockingQueue.offer("b"));
//System.out.println(blockingQueue.offer("c"));
//System.out.println(blockingQueue.offer("x"));
//System.out.println(blockingQueue.poll());
//System.out.println(blockingQueue.poll());
//System.out.println(blockingQueue.poll());
//System.out.println(blockingQueue.poll());
//第三组
//blockingQueue.put("a");
//blockingQueue.put("b");
//blockingQueue.put("c");
//blockingQueue.put("x");
//System.out.println(blockingQueue.take());
//System.out.println(blockingQueue.take());
//System.out.println(blockingQueue.take());
//System.out.println(blockingQueue.take());
//第四组
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll(4, TimeUnit.SECONDS));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# ThreadPool 线程池
# 为什么要用线程池
例子:
10年前单核CPU电脑,假的多线程,像马戏团小丑玩多个球,CPU需要来回切换。
现在是多核电脑,多个线程各自跑在独立的CPU上,不用切换效率高。
线程池的优势:
线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
它的主要特点为:线程复用;控制最大并发数;管理线程。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
# 如何使用
# 架构说明
Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类

# 编码实现
- Executors.newFixedThreadPool(int)
执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
//newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的是LinkedBlockingQueue
2
3
4
- Executors.newSingleThreadExecutor()
一个任务一个任务的执行,一池一线程
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
//newSingleThreadExecutor 创建的线程池corePoolSize和maximumPoolSize值都是1,它使用的是LinkedBlockingQueue
2
3
4
- Executors.newCachedThreadPool()
执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
//newCachedThreadPool创建的线程池将corePoolSize设置为0,
//将maximumPoolSize设置为Integer.MAX_VALUE,它使用的是SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
2
3
4
5
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 线程池 * Arrays * Collections * Executors
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
//List list = new ArrayList();
//List list = Arrays.asList("a", "b");
// 固定数的线程池,一池五线程
//ExecutorService threadPool = Executors.newFixedThreadPool(5);// 一个银行网点,5个受理业务的窗口
//ExecutorService threadPool = Executors.newSingleThreadExecutor();// 一个银行网点,1个受理业务的窗口
ExecutorService threadPool = Executors.newCachedThreadPool();// 一个银行网点,可扩展受理业务的窗口
// 10个顾客请求
try {
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# ThreadPoolExecutor底层原理

# 线程池的七大参数
1、corePoolSize:线程池中的常驻核心线程数
2、maximumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须大于等于1
3、keepAliveTime:
多余的空闲线程的存活时间,当前池中线程数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余线程会被销毁直到 只剩下corePoolSize个线程为止
4、unit:keepAliveTime的单位
5、workQueue:任务队列,被提交但尚未被执行的任务
6、threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可
7、handler:
拒绝策略,表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何来拒绝 请求执行的runnable的策略
# 线程池工作原理


以下重要:以下重要:以下重要:以下重要:以下重要:以下重要:
1、在创建了线程池后,线程池中的线程数为零。
2、当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
2.2如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
2.3如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
2.4如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4、当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
# 线程池用哪个?生产中如设置合理参数
# 线程池的拒绝策略
等待队列已经排满了,再也塞不下新任务了
同时,线程池中的max线程也达到了,无法继续为新任务服务。
这个是时候我们就需要拒绝策略机制合理的处理这个问题。
# JDK内置的拒绝策略
AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。
以上内置拒绝策略均实现了RejectedExecutionHandle接口
# 在工作中单一的/固定数的/可变的三种创建线程池的方法哪个用的多?超级大坑
答案是一个都不用,我们工作中只能使用自定义的
Executors中JDK已经给你提供了,为什么不用?

# 在工作中如何使用线程池,如何自定义线程池
package com.atguigu.gmall.jucdemo;
import java.util.concurrent.*;
/**
* 线程池例子
* <p>
* Arrays
* Collections
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
System.out.println(Runtime.getRuntime().availableProcessors());
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy()
//new ThreadPoolExecutor.DiscardOldestPolicy()
//new ThreadPoolExecutor.CallerRunsPolicy()
//new ThreadPoolExecutor.AbortPolicy()
);
// 10 个顾客请求
try {
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 号业务员办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 分支合并框架
Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并

# ForkJoinPool类

分支合并池 类比=> 线程池
# ForkJoinTask类

ForkJoinTask 类比=> FutureTask
# RecursiveTask类

递归任务:继承后可以实现递归(自己调自己)调用的任务
package com.atguigu.gmall.jucdemo;
import java.util.concurrent.RecursiveTask;
// 斐波那契数列
class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) {
this.n = n;
}
protected Integer compute() {
if (n <= 1) return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# ForkJoinPool ForkJoinTask RecursiveTask类应用案例
package com.atguigu.gmall.jucdemo;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
class MyTask extends RecursiveTask<Integer> {
public static final int ADJUST_VALUE = 10;
private final int begin;
private final int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if ((end - begin) <= ADJUST_VALUE) {
for (int i = begin; i <= end; i++) {
result = result + i;
}
} else {
int mid = (begin + end) / 2;
MyTask myTask1 = new MyTask(begin, mid);
MyTask myTask2 = new MyTask(mid + 1, end);
myTask1.fork();
myTask2.fork();
result = myTask1.join() + myTask2.join();
}
return result;
}
}
/**
* 分支合并例子
* ForkJoinPool
* ForkJoinTask
* RecursiveTask
*/
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask = new MyTask(0, 100);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
System.out.println(forkJoinTask.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 异步回调
同步 异步 异步回调
#
案例
package com.atguigu.gmall.jucdemo;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
//同步,异步,异步回调
//MQ消息中间件
//同步
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("CompletableFuture.runAsync");
});
future1.get();
//异步回调
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("CompletableFuture.supplyAsync");
int a = 10 / 0;
return 1024;
});
future2.whenComplete(
(t, u) -> {
System.out.println("*****t=" + t);
System.out.println("*****u=" + u);
}
).exceptionally(
f -> {
System.out.println(f.getMessage());
return 444;
}
);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
输出为:
CompletableFuture.runAsync
CompletableFuture.supplyAsync
*****t=null
*****u=java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
java.lang.ArithmeticException: / by zero
2
3
4
5