CS-Notes/notes/Java 并发.md

1687 lines
64 KiB
Markdown
Raw Normal View History

2018-04-08 21:27:55 +08:00
<!-- GFM-TOC -->
* [一、线程状态转换](#一线程状态转换)
* [新建New](#新建new)
* [可运行Runnable](#可运行runnable)
* [阻塞Blocking](#阻塞blocking)
* [无限期等待Waiting](#无限期等待waiting)
* [限期等待Timed Waiting](#限期等待timed-waiting)
* [死亡Terminated](#死亡terminated)
* [二、使用线程](#二使用线程)
* [实现 Runnable 接口](#实现-runnable-接口)
* [实现 Callable 接口](#实现-callable-接口)
* [继承 Thread 类](#继承-thread-类)
* [实现接口 VS 继承 Thread](#实现接口-vs-继承-thread)
* [三、基础线程机制](#三基础线程机制)
* [Executor](#executor)
* [Daemon](#daemon)
* [sleep()](#sleep)
* [yield()](#yield)
* [四、中断](#四中断)
* [InterruptedException](#interruptedexception)
* [interrupted()](#interrupted)
* [Executor 的中断操作](#executor-的中断操作)
* [五、互斥同步](#五互斥同步)
* [synchronized](#synchronized)
* [ReentrantLock](#reentrantlock)
* [synchronized 和 ReentrantLock 比较](#synchronized-和-reentrantlock-比较)
* [六、线程之间的协作](#六线程之间的协作)
* [join()](#join)
* [wait() notify() notifyAll()](#wait-notify-notifyall)
* [await() signal() signalAll()](#await-signal-signalall)
2018-04-08 23:34:51 +08:00
* [七、J.U.C - AQS](#七juc---aqs)
* [CountdownLatch](#countdownlatch)
* [CyclicBarrier](#cyclicbarrier)
* [Semaphore](#semaphore)
* [八、J.U.C - 其它组件](#八juc---其它组件)
* [FutureTask](#futuretask)
2018-04-08 21:27:55 +08:00
* [BlockingQueue](#blockingqueue)
2018-04-08 23:34:51 +08:00
* [ForkJoin](#forkjoin)
* [九、线程不安全示例](#九线程不安全示例)
* [十、Java 内存模型](#十java-内存模型)
2018-04-08 21:27:55 +08:00
* [主内存与工作内存](#主内存与工作内存)
* [内存间交互操作](#内存间交互操作)
* [内存模型三大特性](#内存模型三大特性)
* [先行发生原则](#先行发生原则)
2018-04-08 23:34:51 +08:00
* [十一、线程安全](#十一线程安全)
2018-04-27 12:51:17 +08:00
* [线程安全定义](#线程安全定义)
2018-04-08 21:27:55 +08:00
* [线程安全分类](#线程安全分类)
* [线程安全的实现方法](#线程安全的实现方法)
2018-04-08 23:34:51 +08:00
* [十二、锁优化](#十二锁优化)
2018-05-20 20:42:51 +08:00
* [自旋锁](#自旋锁)
2018-04-08 21:27:55 +08:00
* [锁消除](#锁消除)
* [锁粗化](#锁粗化)
* [轻量级锁](#轻量级锁)
* [偏向锁](#偏向锁)
2018-04-08 23:34:51 +08:00
* [十三、多线程开发良好的实践](#十三多线程开发良好的实践)
2018-04-08 21:27:55 +08:00
* [参考资料](#参考资料)
<!-- GFM-TOC -->
# 一、线程状态转换
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//ace830df-9919-48ca-91b5-60b193f593d2.png" width=""/> </div><br>
2018-04-08 21:27:55 +08:00
## 新建New
创建后尚未启动。
## 可运行Runnable
可能正在运行,也可能正在等待 CPU 时间片。
包含了操作系统线程状态中的 Running 和 Ready。
## 阻塞Blocking
等待获取一个排它锁,如果其线程释放了锁就会结束此状态。
## 无限期等待Waiting
2018-04-09 17:13:32 +08:00
等待其它线程显式地唤醒,否则不会被分配 CPU 时间片。
2018-04-08 21:27:55 +08:00
| 进入方法 | 退出方法 |
| --- | --- |
2018-04-09 12:17:22 +08:00
| 没有设置 Timeout 参数的 Object.wait() 方法 | Object.notify() / Object.notifyAll() |
2018-04-08 21:27:55 +08:00
| 没有设置 Timeout 参数的 Thread.join() 方法 | 被调用的线程执行完毕 |
| LockSupport.park() 方法 | - |
## 限期等待Timed Waiting
2018-04-27 12:51:17 +08:00
无需等待其它线程显式地唤醒,在一定时间之后会被系统自动唤醒。
2018-04-08 21:27:55 +08:00
2018-04-09 12:17:22 +08:00
调用 Thread.sleep() 方法使线程进入限期等待状态时,常常用“使一个线程睡眠”进行描述。
调用 Object.wait() 方法使线程进入限期等待或者无限期等待时,常常用“挂起一个线程”进行描述。
2018-05-20 20:42:51 +08:00
睡眠和挂起是用来描述行为,而阻塞和等待用来描述状态。
阻塞和等待的区别在于,阻塞是被动的,它是在等待获取一个排它锁;而等待是主动的,通过调用 Thread.sleep() 和 Object.wait() 等方法进入。
2018-04-08 21:27:55 +08:00
| 进入方法 | 退出方法 |
| --- | --- |
| Thread.sleep() 方法 | 时间结束 |
2018-04-09 12:17:22 +08:00
| 设置了 Timeout 参数的 Object.wait() 方法 | 时间结束 / Object.notify() / Object.notifyAll() |
2018-04-08 21:27:55 +08:00
| 设置了 Timeout 参数的 Thread.join() 方法 | 时间结束 / 被调用的线程执行完毕 |
| LockSupport.parkNanos() 方法 | - |
| LockSupport.parkUntil() 方法 | - |
## 死亡Terminated
可以是线程结束任务之后自己结束,或者产生了异常而结束。
# 二、使用线程
有三种使用线程的方法:
2018-05-20 20:42:51 +08:00
- 实现 Runnable 接口;
- 实现 Callable 接口;
- 继承 Thread 类。
2018-04-08 21:27:55 +08:00
实现 Runnable 和 Callable 接口的类只能当做一个可以在线程中运行的任务,不是真正意义上的线程,因此最后还需要通过 Thread 来调用。可以说任务是通过线程驱动从而执行的。
## 实现 Runnable 接口
需要实现 run() 方法。
通过 Thread 调用 start() 方法来启动线程。
```java
public class MyRunnable implements Runnable {
public void run() {
// ...
}
}
```
```java
public static void main(String[] args) {
MyRunnable instance = new MyRunnable();
Thread thread = new Thread(instance);
thread.start();
}
```
## 实现 Callable 接口
与 Runnable 相比Callable 可以有返回值,返回值通过 FutureTask 进行封装。
```java
public class MyCallable implements Callable<Integer> {
public Integer call() {
return 123;
}
}
```
```java
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCallable mc = new MyCallable();
FutureTask<Integer> ft = new FutureTask<>(mc);
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}
```
## 继承 Thread 类
同样也是需要实现 run() 方法,并且最后也是调用 start() 方法来启动线程。
```java
public class MyThread extends Thread {
public void run() {
// ...
}
}
```
```java
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}
```
## 实现接口 VS 继承 Thread
实现接口会更好一些,因为:
2018-05-20 20:42:51 +08:00
- Java 不支持多重继承,因此继承了 Thread 类就无法继承其它类,但是可以实现多个接口;
- 类可能只要求可执行就行,继承整个 Thread 类开销过大。
2018-04-08 21:27:55 +08:00
# 三、基础线程机制
## Executor
2018-04-27 22:27:55 +08:00
Executor 管理多个异步任务的执行,而无需程序员显式地管理线程的生命周期。
2018-04-08 21:27:55 +08:00
主要有三种 Executor
2018-05-20 20:42:51 +08:00
- CachedThreadPool一个任务创建一个线程
- FixedThreadPool所有任务只能使用固定大小的线程
- SingleThreadExecutor相当于大小为 1 的 FixedThreadPool。
2018-04-08 21:27:55 +08:00
```java
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executorService.execute(new MyRunnable());
}
executorService.shutdown();
}
```
## Daemon
守护线程是程序运行时在后台提供服务的线程,不属于程序中不可或缺的部分。
当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程。
main() 属于非守护线程。
使用 setDaemon() 方法将一个线程设置为守护线程。
```java
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.setDaemon(true);
}
```
## sleep()
Thread.sleep(millisec) 方法会休眠当前正在执行的线程millisec 单位为毫秒。
sleep() 可能会抛出 InterruptedException因为异常不能跨线程传播回 main() 中,因此必须在本地进行处理。线程中抛出的其它异常也同样需要在本地进行处理。
```java
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
```
## yield()
对静态方法 Thread.yield() 的调用声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其它线程来执行。该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行。
```java
public void run() {
Thread.yield();
}
```
# 四、中断
一个线程执行完毕之后会自动结束,如果在运行过程中发生异常也会提前结束。
## InterruptedException
通过调用一个线程的 interrupt() 来中断该线程,如果该线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException从而提前结束该线程。但是不能中断 I/O 阻塞和 synchronized 锁阻塞。
2018-05-20 20:42:51 +08:00
对于以下代码,在 main() 中启动一个线程之后再中断它,由于线程中调用了 Thread.sleep() 方法,因此会抛出一个 InterruptedException从而提前结束线程不执行之后的语句。
2018-04-08 21:27:55 +08:00
```java
public class InterruptExample {
private static class MyThread1 extends Thread {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
2018-05-20 20:42:51 +08:00
```
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
```java
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new MyThread1();
thread1.start();
thread1.interrupt();
System.out.println("Main run");
}
2018-04-08 21:27:55 +08:00
```
```html
Main run
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at InterruptExample.lambda$main$0(InterruptExample.java:5)
at InterruptExample$$Lambda$1/713338599.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
```
## interrupted()
如果一个线程的 run() 方法执行一个无限循环,并且没有执行 sleep() 等会抛出 InterruptedException 的操作,那么调用线程的 interrupt() 方法就无法使线程提前结束。
但是调用 interrupt() 方法会设置线程的中断标记,此时调用 interrupted() 方法会返回 true。因此可以在循环体中使用 interrupted() 方法来判断线程是否处于中断状态,从而提前结束线程。
```java
public class InterruptExample {
private static class MyThread2 extends Thread {
@Override
public void run() {
while (!interrupted()) {
// ..
}
System.out.println("Thread end");
}
}
}
```
2018-05-20 20:42:51 +08:00
```java
public static void main(String[] args) throws InterruptedException {
Thread thread2 = new MyThread2();
thread2.start();
thread2.interrupt();
}
```
2018-04-08 21:27:55 +08:00
```html
Thread end
```
## Executor 的中断操作
调用 Executor 的 shutdown() 方法会等待线程都执行完毕之后再关闭,但是如果调用的是 shutdownNow() 方法,则相当于调用每个线程的 interrupt() 方法。
以下使用 Lambda 创建线程,相当于创建了一个匿名内部线程。
```java
2018-05-20 20:42:51 +08:00
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
try {
Thread.sleep(2000);
System.out.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdownNow();
System.out.println("Main run");
2018-04-08 21:27:55 +08:00
}
```
```html
Main run
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)
at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
如果只想中断 Executor 中的一个线程,可以通过使用 submit() 方法来提交一个线程,它会返回一个 Future<?> 对象,通过调用该对象的 cancel(true) 方法就可以中断线程。
```java
Future<?> future = executorService.submit(() -> {
// ..
});
future.cancel(true);
```
# 五、互斥同步
Java 提供了两种锁机制来控制多个线程对共享资源的互斥访问,第一个是 JVM 实现的 synchronized而另一个是 JDK 实现的 ReentrantLock。
## synchronized
**1. 同步一个代码块**
```java
public void func () {
synchronized (this) {
// ...
}
}
```
它只作用于同一个对象,如果调用两个对象上的同步代码块,就不会进行同步。
2018-05-20 20:42:51 +08:00
对于以下代码,使用 ExecutorService 执行了两个线程(这两个线程使用 Lambda 创建),由于调用的是同一个对象的同步代码块,因此这两个线程会进行同步,当一个线程进入同步语句块时,另一个线程就必须等待。
2018-04-08 21:27:55 +08:00
```java
public class SynchronizedExample {
public void func1() {
synchronized (this) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
2018-05-20 20:42:51 +08:00
}
```
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
```java
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e1.func1());
2018-04-08 21:27:55 +08:00
}
```
```html
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
```
对于以下代码,两个线程调用了不同对象的同步代码块,因此这两个线程就不需要同步。从输出结果可以看出,两个线程交叉执行。
```java
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e2.func1());
}
```
```html
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
```
**2. 同步一个方法**
```java
public synchronized void func () {
// ...
}
```
它和同步代码块一样,只作用于同一个对象。
**3. 同步一个类**
```java
public void func() {
synchronized (SynchronizedExample.class) {
// ...
}
}
```
作用于整个类,也就是说两个线程调用同一个类的不同对象上的这种同步语句,也需要进行同步。
```java
public class SynchronizedExample {
public void func2() {
synchronized (SynchronizedExample.class) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
2018-05-20 20:42:51 +08:00
}
```
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
```java
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func2());
executorService.execute(() -> e2.func2());
2018-04-08 21:27:55 +08:00
}
```
```html
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
```
**4. 同步一个静态方法**
```java
public synchronized static void fun() {
// ...
}
```
作用于整个类。
## ReentrantLock
```java
public class LockExample {
private Lock lock = new ReentrantLock();
public void func() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
} finally {
lock.unlock(); // 确保释放锁,从而避免发生死锁。
}
}
}
```
```java
public static void main(String[] args) {
LockExample lockExample = new LockExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> lockExample.func());
executorService.execute(() -> lockExample.func());
}
```
```html
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
```
2018-05-20 20:42:51 +08:00
ReentrantLock 是 java.util.concurrentJ.U.C包中的锁相比于 synchronized它多了以下高级功能
2018-04-08 21:27:55 +08:00
**1. 等待可中断**
当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情,可中断特性对处理执行时间非常长的同步块很有帮助。
**2. 可实现公平锁**
公平锁是指多个线程在等待同一个锁时必须按照申请锁的时间顺序来依次获得锁而非公平锁则不保证这一点在锁被释放时任何一个等待锁的线程都有机会获得锁。synchronized 中的锁是非公平的ReentrantLock 默认情况下也是非公平的,但可以通过带布尔值的构造函数要求使用公平锁。
**3. 锁绑定多个条件**
一个 ReentrantLock 对象可以同时绑定多个 Condition 对象,而在 synchronized 中,锁对象的 wait() 和 notify() 或 notifyAll() 方法可以实现一个隐含的条件,如果要和多于一个的条件关联的时候,就不得不额外地添加一个锁,而 ReentrantLock 则无须这样做,只需要多次调用 newCondition() 方法即可。
## synchronized 和 ReentrantLock 比较
**1. 锁的实现**
synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。
**2. 性能**
2018-05-20 20:42:51 +08:00
从性能上来看,新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等。目前来看它和 ReentrantLock 的性能基本持平了,因此性能因素不再是选择 ReentrantLock 的理由。synchronized 有更大的性能优化空间,应该优先考虑 synchronized。
2018-04-08 21:27:55 +08:00
**3. 功能**
ReentrantLock 多了一些高级功能。
**4. 使用选择**
除非需要使用 ReentrantLock 的高级功能,否则优先使用 synchronized。这是因为 synchronized 是 JVM 实现的一种锁机制JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。并且使用 synchronized 不用担心没有释放锁而导致死锁问题,因为 JVM 会确保锁的释放。
# 六、线程之间的协作
2018-04-09 12:17:22 +08:00
当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。
2018-04-08 21:27:55 +08:00
## join()
2018-05-20 20:42:51 +08:00
在线程中调用另一个线程的 join() 方法,会将当前线程挂起,而不是忙等待, 直到目标线程结束。
2018-04-08 21:27:55 +08:00
对于以下代码,虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的 join() 方法,因此 b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先与 b 线程的输出。
```java
public class JoinExample {
private class A extends Thread {
@Override
public void run() {
System.out.println("A");
}
}
private class B extends Thread {
private A a;
B(A a) {
this.a = a;
}
@Override
public void run() {
try {
a.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("B");
}
}
public void test() {
A a = new A();
B b = new B(a);
b.start();
a.start();
}
2018-05-20 20:42:51 +08:00
}
```
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
```java
public static void main(String[] args) {
JoinExample example = new JoinExample();
example.test();
2018-04-08 21:27:55 +08:00
}
```
```
A
B
```
## wait() notify() notifyAll()
调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程。
它们都属于 Object 的一部分,而不属于 Thread。
只能用在同步方法或者同步控制块中使用,否则会在运行时抛出 IllegalMonitorStateExeception。
2018-04-09 12:17:22 +08:00
使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。
2018-04-08 21:27:55 +08:00
```java
public class WaitNotifyExample {
public synchronized void before() {
System.out.println("before");
notifyAll();
}
public synchronized void after() {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after");
}
2018-05-20 20:42:51 +08:00
}
```
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
```java
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
WaitNotifyExample example = new WaitNotifyExample();
executorService.execute(() -> example.after());
executorService.execute(() -> example.before());
2018-04-08 21:27:55 +08:00
}
```
```html
before
after
```
**wait() 和 sleep() 的区别**
2018-04-09 12:17:22 +08:00
1. wait() 是 Object 的方法,而 sleep() 是 Thread 的静态方法;
2018-04-08 21:27:55 +08:00
2. wait() 会释放锁sleep() 不会。
## await() signal() signalAll()
2018-04-09 12:17:22 +08:00
java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。相比于 wait() 这种等待方式await() 可以指定等待的条件,因此更加灵活。
2018-04-08 21:27:55 +08:00
使用 Lock 来获取一个 Condition 对象。
```java
public class AwaitSignalExample {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void before() {
lock.lock();
try {
System.out.println("before");
condition.signalAll();
} finally {
lock.unlock();
}
}
public void after() {
lock.lock();
try {
condition.await();
System.out.println("after");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
2018-05-20 20:42:51 +08:00
}
```
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
```java
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
AwaitSignalExample example = new AwaitSignalExample();
executorService.execute(() -> example.after());
executorService.execute(() -> example.before());
2018-04-08 21:27:55 +08:00
}
```
2018-04-09 12:17:22 +08:00
```html
before
after
```
2018-04-08 23:34:51 +08:00
# 七、J.U.C - AQS
java.util.concurrentJ.U.C大大提高了并发性能AQS 被认为是 J.U.C 的核心。
## CountdownLatch
用来控制一个线程等待多个线程。
维护了一个计数器 cnt每次调用 countDown() 方法会让计数器的值减 1减到 0 的时候,那些因为调用 await() 方法而在等待的线程就会被唤醒。
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//CountdownLatch.png" width=""/> </div><br>
2018-04-08 23:34:51 +08:00
```java
public class CountdownLatchExample {
public static void main(String[] args) throws InterruptedException {
2018-04-22 16:23:52 +08:00
final int totalThread = 10;
CountDownLatch countDownLatch = new CountDownLatch(totalThread);
2018-04-08 23:34:51 +08:00
ExecutorService executorService = Executors.newCachedThreadPool();
2018-04-22 16:23:52 +08:00
for (int i = 0; i < totalThread; i++) {
2018-04-08 23:34:51 +08:00
executorService.execute(() -> {
System.out.print("run..");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("end");
executorService.shutdown();
}
}
```
```html
run..run..run..run..run..run..run..run..run..run..end
```
## CyclicBarrier
用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。
2018-04-09 12:17:22 +08:00
和 CountdownLatch 相似,都是通过维护计数器来实现的。但是它的计数器是递增的,每次执行 await() 方法之后,计数器会加 1直到计数器的值和设置的值相等等待的所有线程才会继续执行。和 CountdownLatch 的另一个区别是CyclicBarrier 的计数器可以循环使用,所以它才叫做循环屏障。
2018-04-08 23:34:51 +08:00
下图应该从下往上看才正确。
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//CyclicBarrier.png" width=""/> </div><br>
2018-04-08 23:34:51 +08:00
```java
public class CyclicBarrierExample {
public static void main(String[] args) throws InterruptedException {
2018-04-22 16:23:52 +08:00
final int totalThread = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
2018-04-08 23:34:51 +08:00
ExecutorService executorService = Executors.newCachedThreadPool();
2018-04-22 16:23:52 +08:00
for (int i = 0; i < totalThread; i++) {
2018-04-08 23:34:51 +08:00
executorService.execute(() -> {
System.out.print("before..");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.print("after..");
});
}
executorService.shutdown();
}
}
```
```html
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
```
## Semaphore
Semaphore 就是操作系统中的信号量,可以控制对互斥资源的访问线程数。
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//Semaphore.png" width=""/> </div><br>
2018-04-08 23:34:51 +08:00
2018-04-09 12:17:22 +08:00
以下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10。
2018-04-08 23:34:51 +08:00
```java
public class SemaphoreExample {
public static void main(String[] args) {
final int clientCount = 3;
final int totalRequestCount = 10;
Semaphore semaphore = new Semaphore(clientCount);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
executorService.execute(()->{
try {
semaphore.acquire();
System.out.print(semaphore.availablePermits() + " ");
} catch (InterruptedException e) {
e.printStackTrace();
2018-04-10 11:05:18 +08:00
} finally {
semaphore.release();
2018-04-08 23:34:51 +08:00
}
});
}
executorService.shutdown();
}
}
```
```html
2 1 2 2 2 2 2 1 2 2
```
# 八、J.U.C - 其它组件
## FutureTask
2018-04-09 12:32:53 +08:00
在介绍 Callable 时我们知道它可以有返回值,返回值通过 Future<V> 进行封装。FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future<V> 接口,这使得 FutureTask 既可以当做一个任务执行,也可以有返回值。
2018-04-08 23:34:51 +08:00
```java
public class FutureTask<V> implements RunnableFuture<V>
```
```java
public interface RunnableFuture<V> extends Runnable, Future<V>
```
2018-04-09 12:32:53 +08:00
当一个计算任务需要执行很长时间,那么就可以用 FutureTask 来封装这个任务,用一个线程去执行该任务,然后其它线程继续执行其它任务。当需要该任务的计算结果时,再通过 FutureTask 的 get() 方法获取。
2018-04-08 23:34:51 +08:00
```java
public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 100; i++) {
Thread.sleep(10);
result += i;
}
return result;
}
});
Thread computeThread = new Thread(futureTask);
computeThread.start();
Thread otherThread = new Thread(() -> {
System.out.println("other task is running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
otherThread.start();
System.out.println(futureTask.get());
}
}
```
```html
other task is running...
4950
```
2018-04-08 21:27:55 +08:00
## BlockingQueue
java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:
- **FIFO 队列** LinkedBlockingQueue、ArrayListBlockingQueue固定长度
- **优先级队列** PriorityBlockingQueue
提供了阻塞的 take() 和 put() 方法:如果队列为空 take() 将阻塞,直到队列中有内容;如果队列为满 put() 将阻塞,指到队列有空闲位置。
**使用 BlockingQueue 实现生产者消费者问题**
```java
2018-04-09 12:32:53 +08:00
public class ProducerConsumer {
2018-04-08 21:27:55 +08:00
2018-04-09 12:32:53 +08:00
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
private static class Producer extends Thread {
@Override
public void run() {
2018-04-09 12:34:38 +08:00
try {
queue.put("product");
} catch (InterruptedException e) {
e.printStackTrace();
}
2018-04-09 12:32:53 +08:00
System.out.print("produce..");
2018-04-08 21:27:55 +08:00
}
}
2018-04-09 12:32:53 +08:00
private static class Consumer extends Thread {
2018-04-08 21:27:55 +08:00
2018-04-09 12:32:53 +08:00
@Override
public void run() {
try {
2018-04-09 12:34:38 +08:00
String product = queue.take();
2018-04-09 12:32:53 +08:00
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("consume..");
2018-04-08 21:27:55 +08:00
}
}
2018-05-20 20:42:51 +08:00
}
```
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
```java
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
Producer producer = new Producer();
producer.start();
}
for (int i = 0; i < 5; i++) {
Consumer consumer = new Consumer();
consumer.start();
}
for (int i = 0; i < 3; i++) {
Producer producer = new Producer();
producer.start();
2018-04-08 21:27:55 +08:00
}
}
```
```html
2018-04-09 12:32:53 +08:00
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..
2018-04-08 21:27:55 +08:00
```
2018-04-08 23:34:51 +08:00
## ForkJoin
2018-04-09 13:33:32 +08:00
主要用于并行计算中,和 MapReduce 原理类似,都是把大的计算任务拆分成多个小任务并行计算。
```java
public class ForkJoinExample extends RecursiveTask<Integer> {
private final int threhold = 5;
private int first;
private int last;
public ForkJoinExample(int first, int last) {
this.first = first;
this.last = last;
}
@Override
protected Integer compute() {
int result = 0;
if (last - first <= threhold) {
// 任务足够小则直接计算
for (int i = first; i <= last; i++) {
result += i;
}
} else {
// 拆分成小任务
int middle = first + (last - first) / 2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
leftTask.fork();
rightTask.fork();
result = leftTask.join() + rightTask.join();
}
return result;
}
}
```
```java
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinExample example = new ForkJoinExample(1, 10000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future result = forkJoinPool.submit(example);
System.out.println(result.get());
}
```
ForkJoin 使用 ForkJoinPool 来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。
```java
public class ForkJoinPool extends AbstractExecutorService
```
ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率。每个线程都维护了一个双端队列用来存储需要执行的任务。工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务避免和队列所属线程发生竞争。例如下图中Thread2 从 Thread1 的队列中拿出最晚的 Task1 任务Thread1 会拿出 Task2 来执行,这样就避免发生竞争。但是如果队列中只有一个任务时还是会发生竞争。
2018-04-08 23:34:51 +08:00
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//15b45dc6-27aa-4519-9194-f4acfa2b077f.jpg" width=""/> </div><br>
2018-04-08 23:34:51 +08:00
# 九、线程不安全示例
2018-04-08 21:27:55 +08:00
如果多个线程对同一个共享数据进行访问而不采取同步操作的话,那么操作的结果是不一致的。
以下代码演示了 1000 个线程同时对 cnt 执行自增操作,操作结束之后它的值为 997 而不是 1000。
```java
public class ThreadUnsafeExample {
private int cnt = 0;
public void add() {
cnt++;
}
public int get() {
return cnt;
}
2018-05-20 20:42:51 +08:00
}
```
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
```java
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
ThreadUnsafeExample example = new ThreadUnsafeExample();
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
2018-04-08 21:27:55 +08:00
}
2018-05-20 20:42:51 +08:00
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
2018-04-08 21:27:55 +08:00
}
```
```html
997
```
2018-04-08 23:34:51 +08:00
# 十、Java 内存模型
2018-04-08 21:27:55 +08:00
2018-04-15 15:41:54 +08:00
Java 内存模型试图屏蔽各种硬件和操作系统的内存访问差异,以实现让 Java 程序在各种平台下都能达到一致的内存访问效果。
2018-04-08 21:27:55 +08:00
## 主内存与工作内存
处理器上的寄存器的读写的速度比内存快几个数量级,为了解决这种速度矛盾,在它们之间加入了高速缓存。
2018-04-15 15:41:54 +08:00
加入高速缓存带来了一个新的问题:缓存一致性。如果多个缓存共享同一块主内存区域,那么多个缓存的数据可能会不一致,需要一些协议来解决这个问题。
2018-04-08 21:27:55 +08:00
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//68778c1b-15ab-4826-99c0-3b4fd38cb9e9.png" width=""/> </div><br>
2018-04-08 21:27:55 +08:00
所有的变量都存储在主内存中,每个线程还有自己的工作内存,工作内存存储在高速缓存或者寄存器中,保存了该线程使用的变量的主内存副本拷贝。
线程只能直接操作工作内存中的变量,不同线程之间的变量值传递需要通过主内存来完成。
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//47358f87-bc4c-496f-9a90-8d696de94cee.png" width=""/> </div><br>
2018-04-08 21:27:55 +08:00
## 内存间交互操作
Java 内存模型定义了 8 个操作来完成主内存和工作内存的交互操作。
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//536c6dfd-305a-4b95-b12c-28ca5e8aa043.png" width=""/> </div><br>
2018-04-08 21:27:55 +08:00
- read把一个变量的值从主内存传输到工作内存中
- load在 read 之后执行,把 read 得到的值放入工作内存的变量副本中
- use把工作内存中一个变量的值传递给执行引擎
- assign把一个从执行引擎接收到的值赋给工作内存的变量
- store把工作内存的一个变量的值传送到主内存中
- write在 store 之后执行,把 store 得到的值放入主内存的变量中
- lock作用于主内存的变量
- unlock
## 内存模型三大特性
### 1. 原子性
2018-04-09 16:35:58 +08:00
Java 内存模型保证了 read、load、use、assign、store、write、lock 和 unlock 操作具有原子性,例如对一个 int 类型的变量执行 assign 赋值操作,这个操作就是原子性的。但是 Java 内存模型允许虚拟机将没有被 volatile 修饰的 64 位数据longdouble的读写操作划分为两次 32 位的操作来进行,即 load、store、read 和 write 操作可以不具备原子性。
2018-04-08 21:27:55 +08:00
2018-04-08 21:39:11 +08:00
有一个错误认识就是int 等原子性的变量在多线程环境中不会出现线程安全问题。前面的线程不安全示例代码中cnt 变量属于 int 类型变量1000 个线程对它进行自增操作之后,得到的值为 997 而不是 1000。
2018-04-08 21:27:55 +08:00
为了方便讨论,将内存间的交互操作简化为 3 个load、assign、store。
2018-04-09 16:35:58 +08:00
下图演示了两个线程同时对 cnt 变量进行操作load、assign、store 这一系列操作整体上看不具备原子性,那么在 T1 修改 cnt 并且还没有将修改后的值写入主内存T2 依然可以读入该变量的值。可以看出,这两个线程虽然执行了两次自增运算,但是主内存中 cnt 的值最后为 1 而不是 2。因此对 int 类型读写操作满足原子性只是说明 load、assign、store 这些单个操作具备原子性。
2018-04-08 21:27:55 +08:00
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//ef8eab00-1d5e-4d99-a7c2-d6d68ea7fe92.png" width=""/> </div><br>
2018-04-08 21:27:55 +08:00
AtomicInteger 能保证多个线程修改的原子性。
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//952afa9a-458b-44ce-bba9-463e60162945.png" width=""/> </div><br>
2018-04-08 21:27:55 +08:00
使用 AtomicInteger 重写之前线程不安全的代码之后得到以下线程安全实现:
```java
public class AtomicExample {
private AtomicInteger cnt = new AtomicInteger();
public void add() {
cnt.incrementAndGet();
}
public int get() {
return cnt.get();
}
2018-05-20 20:42:51 +08:00
}
```
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
```java
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
AtomicExample example = new AtomicExample(); // 只修改这条语句
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
2018-04-08 21:27:55 +08:00
}
2018-05-20 20:42:51 +08:00
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
2018-04-08 21:27:55 +08:00
}
```
```html
1000
```
除了使用原子类之外,也可以使用 synchronized 互斥锁来保证操作的完整性它对应的内存间交互操作为lock 和 unlock在虚拟机实现上对应的字节码指令为 monitorenter 和 monitorexit。
```java
public class AtomicSynchronizedExample {
private int cnt = 0;
public synchronized void add() {
cnt++;
}
public synchronized int get() {
return cnt;
}
2018-05-20 20:42:51 +08:00
}
```
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
```java
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
AtomicSynchronizedExample example = new AtomicSynchronizedExample();
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
2018-04-08 21:27:55 +08:00
}
2018-05-20 20:42:51 +08:00
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
2018-04-08 21:27:55 +08:00
}
```
```html
1000
```
### 2. 可见性
可见性指当一个线程修改了共享变量的值其它线程能够立即得知这个修改。Java 内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值来实现可见性的。
2018-04-15 15:41:54 +08:00
volatile 可保证可见性。synchronized 也能够保证可见性,对一个变量执行 unlock 操作之前必须把变量值同步回主内存。final 关键字也能保证可见性:被 final 关键字修饰的字段在构造器中一旦初始化完成,并且没有发生 this 逃逸(其它线程可以通过 this 引用访问到初始化了一半的对象),那么其它线程就能看见 final 字段的值。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
对前面的线程不安全示例中的 cnt 变量用 volatile 修饰,不能解决线程不安全问题,因为 volatile 并不能保证操作的原子性。
2018-04-09 16:35:58 +08:00
2018-04-08 21:27:55 +08:00
### 3. 有序性
有序性是指:在本线程内观察,所有操作都是有序的。在一个线程观察另一个线程,所有操作都是无序的,无序是因为发生了指令重排序。
在 Java 内存模型中,允许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。
volatile 关键字通过添加内存屏障的方式来禁止指令重排,即重排序时不能把后面的指令放到内存屏障之前。
也可以通过 synchronized 来保证有序性,它保证每个时刻只有一个线程执行同步代码,相当于是让线程顺序执行同步代码。
## 先行发生原则
上面提到了可以用 volatile 和 synchronized 来保证有序性。除此之外JVM 还规定了先行发生原则,让一个操作无需控制就能先于另一个操作完成。
主要有以下这些原则:
### 1. 单一线程原则
> Single Thread rule
在一个线程内,在程序前面的操作先行发生于后面的操作。
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//single-thread-rule.png" width=""/> </div><br>
2018-04-08 21:27:55 +08:00
### 2. 管程锁定规则
> Monitor Lock Rule
一个 unlock 操作先行发生于后面对同一个锁的 lock 操作。
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//monitor-lock-rule.png" width=""/> </div><br>
2018-04-08 21:27:55 +08:00
### 3. volatile 变量规则
> Volatile Variable Rule
对一个 volatile 变量的写操作先行发生于后面对这个变量的读操作。
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//volatile-variable-rule.png" width=""/> </div><br>
2018-04-08 21:27:55 +08:00
### 4. 线程启动规则
> Thread Start Rule
2018-04-15 15:41:54 +08:00
Thread 对象的 start() 方法调用先行发生于此线程的每一个动作。
2018-04-08 21:27:55 +08:00
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//thread-start-rule.png" width=""/> </div><br>
2018-04-08 21:27:55 +08:00
### 5. 线程加入规则
> Thread Join Rule
join() 方法返回先行发生于 Thread 对象的结束。
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//thread-join-rule.png" width=""/> </div><br>
2018-04-08 21:27:55 +08:00
### 6. 线程中断规则
> Thread Interruption Rule
对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过 Thread.interrupted() 方法检测到是否有中断发生。
### 7. 对象终结规则
> Finalizer Rule
一个对象的初始化完成(构造函数执行结束)先行发生于它的 finalize() 方法的开始。
### 8. 传递性
> Transitivity
如果操作 A 先行发生于操作 B操作 B 先行发生于操作 C那么操作 A 先行发生于操作 C。
2018-04-08 23:34:51 +08:00
# 十一、线程安全
2018-04-08 21:27:55 +08:00
2018-04-27 12:51:17 +08:00
## 线程安全定义
2018-04-27 11:23:03 +08:00
一个类在可以被多个线程安全调用时就是线程安全的。
2018-04-08 21:27:55 +08:00
2018-04-27 11:23:03 +08:00
## 线程安全分类
2018-04-27 12:51:17 +08:00
2018-04-08 21:27:55 +08:00
线程安全不是一个非真即假的命题,可以将共享数据按照安全程度的强弱顺序分成以下五类:不可变、绝对线程安全、相对线程安全、线程兼容和线程对立。
### 1. 不可变
不可变Immutable的对象一定是线程安全的无论是对象的方法实现还是方法的调用者都不需要再采取任何的线程安全保障措施只要一个不可变的对象被正确地构建出来那其外部的可见状态永远也不会改变永远也不会看到它在多个线程之中处于不一致的状态。
不可变的类型:
- final 关键字修饰的基本数据类型;
- String
- 枚举类型
- Number 部分子类,如 Long 和 Double 等数值包装类型BigInteger 和 BigDecimal 等大数据类型。但同为 Number 的子类型的原子类 AtomicInteger 和 AtomicLong 则并非不可变的。
对于集合类型,可以使用 Collections.unmodifiableXXX() 方法来获取一个不可变的集合。
```java
public class ImmutableExample {
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
unmodifiableMap.put("a", 1);
}
}
```
```html
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at ImmutableExample.main(ImmutableExample.java:9)
```
Collections.unmodifiableXXX() 先对原始的集合进行拷贝,需要对集合进行修改的方法都直接抛出异常。
```java
public V put(K key, V value) {
throw new UnsupportedOperationException();
}
```
2018-04-15 15:41:54 +08:00
多线程环境下,应当尽量使对象成为不可变,来满足线程安全。
2018-04-08 21:27:55 +08:00
### 2. 绝对线程安全
不管运行时环境如何,调用者都不需要任何额外的同步措施。
### 3. 相对线程安全
相对的线程安全需要保证对这个对象单独的操作是线程安全的,在调用的时候不需要做额外的保障措施,但是对于一些特定顺序的连续调用,就可能需要在调用端使用额外的同步手段来保证调用的正确性。
在 Java 语言中,大部分的线程安全类都属于这种类型,例如 Vector、HashTable、Collections 的 synchronizedCollection() 方法包装的集合等。
对于下面的代码,如果删除元素的线程删除了一个元素,而获取元素的线程试图访问一个已经被删除的元素,那么就会抛出 ArrayIndexOutOfBoundsException。
```java
public class VectorUnsafeExample {
private static Vector<Integer> vector = new Vector<>();
public static void main(String[] args) {
while (true) {
for (int i = 0; i < 100; i++) {
vector.add(i);
}
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
});
executorService.execute(() -> {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
});
executorService.shutdown();
}
}
}
```
```html
Exception in thread "Thread-159738" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 3
at java.util.Vector.remove(Vector.java:831)
at VectorUnsafeExample.lambda$main$0(VectorUnsafeExample.java:14)
at VectorUnsafeExample$$Lambda$1/713338599.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
```
如果要保证上面的代码能正确执行下去,就需要对删除元素和获取元素的代码进行同步。
```java
executorService.execute(() -> {
synchronized (vector) {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
});
executorService.execute(() -> {
synchronized (vector) {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
});
```
### 4. 线程兼容
线程兼容是指对象本身并不是线程安全的但是可以通过在调用端正确地使用同步手段来保证对象在并发环境中可以安全地使用我们平常说一个类不是线程安全的绝大多数时候指的是这一种情况。Java API 中大部分的类都是属于线程兼容的,如与前面的 Vector 和 HashTable 相对应的集合类 ArrayList 和 HashMap 等。
### 5. 线程对立
线程对立是指无论调用端是否采取了同步措施,都无法在多线程环境中并发使用的代码。由于 Java 语言天生就具备多线程特性,线程对立这种排斥多线程的代码是很少出现的,而且通常都是有害的,应当尽量避免。
## 线程安全的实现方法
### 1. 互斥同步
synchronized 和 ReentrantLock。
### 2. 非阻塞同步
互斥同步最主要的问题就是进行线程阻塞和唤醒所带来的性能问题因此这种同步也称为阻塞同步Blocking Synchronization
从处理问题的方式上说互斥同步属于一种悲观的并发策略总是认为只要不去做正确的同步措施例如加锁那就肯定会出现问题无论共享数据是否真的会出现竞争它都要进行加锁这里讨论的是概念模型实际上虚拟机会优化掉很大一部分不必要的加锁、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。随着硬件指令集的发展我们有了另外一个选择基于冲突检测的乐观并发策略通俗地说就是先进行操作如果没有其他线程争用共享数据那操作就成功了如果共享数据有争用产生了冲突那就再采取其他的补偿措施最常见的补偿措施就是不断地重试直到成功为止这种乐观的并发策略的许多实现都不需要把线程挂起因此这种同步操作称为非阻塞同步Non-Blocking Synchronization
乐观锁需要操作和冲突检测这两个步骤具备原子性这里就不能再使用互斥同步来保证了只能靠硬件来完成。硬件支持的原子性操作最典型的是比较并交换Compare-and-SwapCAS
CAS 指令需要有 3 个操作数,分别是内存位置(在 Java 中可以简单理解为变量的内存地址,用 V 表示)、旧的预期值(用 A 表示)和新值(用 B 表示。CAS 指令执行时,当且仅当 V 符合旧预期值 A 时,处理器用新值 B 更新 V 的值,否则它就不执行更新。但是无论是否更新了 V 的值,都会返回 V 的旧值,上述的处理过程是一个原子操作。
J.U.C 包里面的整数原子类 AtomicInteger其中的 compareAndSet() 和 getAndIncrement() 等方法都使用了 Unsafe 类的 CAS 操作。
在下面的代码 1 中,使用了 AtomicInteger 执行了自增的操作。代码 2 是 incrementAndGet() 的源码,它调用了 unsafe 的 getAndAddInt() 。代码 3 是 getAndAddInt() 源码var1 指示内存位置var2 指示新值var4 指示操作需要加的数值,这里为 1。在代码 3 的实现中,通过 getIntVolatile(var1, var2) 得到旧的预期值。通过调用 compareAndSwapInt() 来进行 CAS 比较,如果 var2=var5那么就更新内存地址为 var1 的变量为 var5+var4。可以看到代码 3 是在一个循环中进行,发生冲突的做法是不断的进行重试。
```java
// 代码 1
private AtomicInteger cnt = new AtomicInteger();
public void add() {
cnt.incrementAndGet();
}
```
```java
// 代码 2
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
```
```java
// 代码 3
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
```
2018-05-20 20:42:51 +08:00
ABA :如果一个变量初次读取的时候是 A 值,它的值被改成了 B后来又被改回为 A那 CAS 操作就会误认为它从来没有被改变过。J.U.C 包提供了一个带有标记的原子引用类“AtomicStampedReference”来解决这个问题它可以通过控制变量值的版本来保证 CAS 的正确性。大部分情况下 ABA 问题不会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效。
2018-04-08 21:27:55 +08:00
### 3. 无同步方案
要保证线程安全,并不是一定就要进行同步,两者没有因果关系。同步只是保证共享数据争用时的正确性的手段,如果一个方法本来就不涉及共享数据,那它自然就无须任何同步措施去保证正确性,因此会有一些代码天生就是线程安全的。
**可重入代码Reentrant Code**
这种代码也叫做纯代码Pure Code可以在代码执行的任何时刻中断它转而去执行另外一段代码包括递归调用它本身而在控制权返回后原来的程序不会出现任何错误。相对线程安全来说可重入性是更基本的特性它可以保证线程安全即所有的可重入的代码都是线程安全的但是并非所有的线程安全的代码都是可重入的。
可重入代码有一些共同的特征,例如不依赖存储在堆上的数据和公用的系统资源、用到的状态量都由参数中传入、不调用非可重入的方法等。我们可以通过一个简单的原则来判断代码是否具备可重入性:如果一个方法,它的返回结果是可以预测的,只要输入了相同的数据,就都能返回相同的结果,那它就满足可重入性的要求,当然也就是线程安全的。
**(二)栈封闭**
2018-04-10 17:24:31 +08:00
多个线程访问同一个方法的局部变量时,不会出现线程安全问题,因为局部变量存储在栈中,属于线程私有的。
2018-04-08 21:27:55 +08:00
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class StackClosedExample {
public void add100() {
int cnt = 0;
for (int i = 0; i < 100; i++) {
cnt++;
}
System.out.println(cnt);
}
2018-05-20 20:42:51 +08:00
}
```
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
```java
public static void main(String[] args) {
StackClosedExample example = new StackClosedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> example.add100());
executorService.execute(() -> example.add100());
executorService.shutdown();
2018-04-08 21:27:55 +08:00
}
```
```html
100
100
```
**线程本地存储Thread Local Storage**
如果一段代码中所需要的数据必须与其他代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。如果能保证,我们就可以把共享数据的可见范围限制在同一个线程之内,这样,无须同步也能保证线程之间不出现数据争用的问题。
符合这种特点的应用并不少见,大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程尽量在一个线程中消费完,其中最重要的一个应用实例就是经典 Web 交互模型中的“一个请求对应一个服务器线程”Thread-per-Request的处理方式这种处理方式的广泛应用使得很多 Web 服务端应用都可以使用线程本地存储来解决线程安全问题。
可以使用 java.lang.ThreadLocal 类来实现线程本地存储功能。
对于以下代码thread1 中设置 threadLocal 为 1而 thread2 设置 threadLocal 为 2。过了一段时间之后thread1 读取 threadLocal 依然是 1不受 thread2 的影响。
```java
public class ThreadLocalExample {
public static void main(String[] args) {
ThreadLocal threadLocal = new ThreadLocal();
Thread thread1 = new Thread(() -> {
threadLocal.set(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadLocal.get());
threadLocal.remove();
});
Thread thread2 = new Thread(() -> {
threadLocal.set(2);
threadLocal.remove();
});
thread1.start();
thread2.start();
}
}
```
```html
1
```
为了理解 ThreadLocal先看以下代码
```java
public class ThreadLocalExample1 {
public static void main(String[] args) {
ThreadLocal threadLocal1 = new ThreadLocal();
ThreadLocal threadLocal2 = new ThreadLocal();
Thread thread1 = new Thread(() -> {
threadLocal1.set(1);
threadLocal2.set(1);
});
Thread thread2 = new Thread(() -> {
threadLocal1.set(2);
threadLocal2.set(2);
});
thread1.start();
thread2.start();
}
}
```
它所对应的底层结构图为:
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//3646544a-cb57-451d-9e03-d3c4f5e4434a.png" width=""/> </div><br>
2018-04-08 21:27:55 +08:00
2018-04-22 16:23:52 +08:00
每个 Thread 都有一个 ThreadLocal.ThreadLocalMap 对象Thread 类中就定义了 ThreadLocal.ThreadLocalMap 成员。
2018-04-08 21:27:55 +08:00
```java
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
```
当调用一个 ThreadLocal 的 set(T value) 方法时,先得到当前线程的 ThreadLocalMap 对象,然后将 ThreadLocal->value 键值对插入到该 Map 中。
```java
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
```
get() 方法类似。
```java
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
```
ThreadLocal 从理论上讲并不是用来解决多线程并发问题的,因为根本不存在多线程竞争。在一些场景 (尤其是使用线程池) 下,由于 ThreadLocal.ThreadLocalMap 的底层数据结构导致 ThreadLocal 有内存泄漏的情况,尽可能在每次使用 ThreadLocal 后手动调用 remove(),以避免出现 ThreadLocal 经典的内存泄漏甚至是造成自身业务混乱的风险。
2018-04-08 23:34:51 +08:00
# 十二、锁优化
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
这里的锁优化主要是指虚拟机对 synchronized 的优化。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
## 自旋锁
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
互斥同步的进入阻塞状态的开销都很大,应该尽量避免。在许多应用中,共享数据的锁定状态只会持续很短的一段时间。自旋锁的思想是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果在这段时间内能获得锁,就可以避免进入阻塞状态。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
自选锁虽然能避免进入阻塞状态从而减少开销,但是它需要进行忙循环操作占用 CPU 时间,它只适用于共享数据的锁定状态很短的场景。自旋次数的默认值是 10 次,用户可以使用虚拟机参数 -XX:PreBlockSpin 来更改。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
在 JDK 1.6 中引入了自适应的自旋锁。自适应意味着自旋的次数不再固定了,而是由前一次在同一个锁上的自旋次数及锁的拥有者的状态来决定。
2018-04-08 21:27:55 +08:00
## 锁消除
2018-05-20 20:42:51 +08:00
锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
锁消除主要是通过逃逸分析来支持,如果堆上的共享数据不可能逃逸出去被其它线程访问到,那么就可以把它们当成私有数据对待,也就可以将它们上的锁进行消除。
对于一些看起来没有加锁的代码,其实隐式的加了很多锁。例如下面的字符串拼接代码就隐式加了锁:
2018-04-08 21:27:55 +08:00
```java
public static String concatString(String s1, String s2, String s3) {
return s1 + s2 + s3;
}
```
2018-05-20 20:42:51 +08:00
String 是一个不可变的类Javac 编译器会对 String 的拼接自动优化。在 JDK 1.5 之前,会转化为 StringBuffer 对象的连续 append() 操作,在 JDK 1.5 及以后的版本中,会转化为 StringBuilder 对象的连续 append() 操作,即上面的代码可能会变成下面的样子:
2018-04-08 21:27:55 +08:00
```java
public static String concatString(String s1, String s2, String s3) {
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
sb.append(s3);
return sb.toString();
}
```
2018-05-20 20:42:51 +08:00
每个 StringBuffer.append() 方法中都有一个同步块,锁就是 sb 对象。虚拟机观察变量 sb很快就会发现它的动态作用域被限制在 concatString() 方法内部。也就是说sb 的所有引用永远不会“逃逸”到 concatString() 方法之外,其他线程无法访问到它。因此,虽然这里有锁,但是可以被安全地消除掉。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
## 锁粗化
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
上一节的示例代码中连续的 append() 方法就属于这类情况。如果虚拟机探测到由这样的一串零碎的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部。对于上一节的示例代码就是扩展到第一个 append() 操作之前直至最后一个 append() 操作之后,这样只需要加锁一次就可以了。
2018-04-08 21:27:55 +08:00
## 轻量级锁
2018-05-20 20:42:51 +08:00
JDK 1.6 引入了偏向锁和轻量级锁从而让锁拥有了四个状态无锁状态unlocked、偏向锁状态biasble、轻量级锁状态lightweight locked和重量级锁状态inflated
以下是 HotSpot 虚拟机对象头的内存布局,这些数据被称为 mark word。其中 tag bits 对应了五个状态,这些状态在右侧的 state 表格中给出,应该注意的是 state 表格不是存储在对象头中的。除了 marked for gc 状态,其它四个状态已经在前面介绍过了。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
<div align="center"> <img src="../pics//bb6a49be-00f2-4f27-a0ce-4ed764bc605c.png" width="600"/> </div><br>
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
下图左侧是一个线程的虚拟机栈,其中有一部分称为 Lock Record 的区域,这是在轻量级锁运行过程创建的,用于存放锁对象的 Mark Word。而右侧就是一个锁对象包含了 Mark Word 和其它信息。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
<div align="center"> <img src="../pics//051e436c-0e46-4c59-8f67-52d89d656182.png" width="500"/> </div><br>
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
轻量级锁是相对于传统的重量级锁而言,它使用 CAS 操作来避免重量级锁使用互斥量的开销。对于绝大部分的锁,在整个同步周期内都是不存在竞争的,因此也就不需要都使用互斥量进行同步,可以先采用 CAS 操作进行同步,如果 CAS 失败了再改用互斥量进行同步。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
当尝试获取一个锁对象时,如果锁对象标记为 0 01说明锁对象的锁未锁定unlocked状态。此时虚拟机在当前线程栈中创建 Lock Record然后使用 CAS 操作将对象的 Mark Word 更新为 Lock Record 指针。如果 CAS 操作成功了,那么线程就获取了该对象上的锁,并且对象的 Mark Word 的锁标记变为 00表示该对象处于轻量级锁状态。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
<div align="center"> <img src="../pics//baaa681f-7c52-4198-a5ae-303b9386cf47.png" width="500"/> </div><br>
如果 CAS 操作失败了,虚拟机首先会检查对象的 Mark Word 是否指向当前线程的虚拟机栈,如果是的话说明当前线程已经拥有了这个锁对象,那就可以直接进入同步块继续执行,否则说明这个锁对象已经被其他线程线程抢占了。如果有两条以上的线程争用同一个锁,那轻量级锁就不再有效,要膨胀为重量级锁。
2018-04-08 21:27:55 +08:00
## 偏向锁
2018-05-20 20:42:51 +08:00
偏向锁的思想是偏向于让第一个获取锁对象的线程,这个线程在之后获取该锁就不再需要进行同步操作,甚至连 CAS 操作也不再需要。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
可以使用 -XX:+UseBiasedLocking=true 开启偏向锁,不过在 JDK 1.6 中它是默认开启的。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
当锁对象第一次被线程获得的时候,进入偏向状态,标记为 1 01。同时使用 CAS 操作将线程 ID 记录到 Mark Word 中,如果 CAS 操作成功,这个线程以后每次进入这个锁相关的同步块就不需要再进行任何同步操作。
2018-04-08 21:27:55 +08:00
2018-05-20 20:42:51 +08:00
当有另外一个线程去尝试获取这个锁对象时偏向状态就宣告结束此时撤销偏向Revoke Bias后恢复到未锁定状态或者轻量级锁状态。
2018-04-08 21:27:55 +08:00
2018-04-15 16:23:38 +08:00
<div align="center"> <img src="../pics//390c913b-5f31-444f-bbdb-2b88b688e7ce.jpg" width="600"/> </div><br>
2018-04-08 21:27:55 +08:00
2018-04-08 23:34:51 +08:00
# 十三、多线程开发良好的实践
2018-04-08 21:27:55 +08:00
2018-04-09 13:50:29 +08:00
- 给线程起个有意义的名字,这样可以方便找 Bug。
2018-04-08 21:27:55 +08:00
2018-04-15 16:23:38 +08:00
- 缩小同步范围,例如对于 synchronized应该尽量使用同步块而不是同步方法。
2018-04-08 21:27:55 +08:00
2018-04-15 16:23:38 +08:00
- 多用同步类少用 wait() 和 notify()。首先CountDownLatch, Semaphore, CyclicBarrier 和 Exchanger 这些同步类简化了编码操作,而用 wait() 和 notify() 很难实现对复杂控制流的控制。其次,这些类是由最好的企业编写和维护,在后续的 JDK 中它们还会不断优化和完善,使用这些更高等级的同步工具你的程序可以不费吹灰之力获得优化。
2018-04-08 21:27:55 +08:00
2018-04-15 16:23:38 +08:00
- 多用并发集合少用同步集合。并发集合比同步集合的可扩展性更好,例如应该使用 ConcurrentHashMap 而不是 Hashtable。
2018-04-09 13:50:29 +08:00
- 使用本地变量和不可变类来保证线程安全。
- 使用线程池而不是直接创建 Thread 对象,这是因为创建线程代价很高,线程池可以有效地利用有限的线程来启动任务。
- 使用 BlockingQueue 实现生产者消费者问题。
2018-04-08 21:27:55 +08:00
# 参考资料
- BruceEckel. Java 编程思想: 第 4 版 [M]. 机械工业出版社, 2007.
- 周志明. 深入理解 Java 虚拟机 [M]. 机械工业出版社, 2011.
- [Threads and Locks](https://docs.oracle.com/javase/specs/jvms/se6/html/Threads.doc.html)
- [线程通信](http://ifeve.com/thread-signaling/#missed_signal)
- [Java 线程面试题 Top 50](http://www.importnew.com/12773.html)
- [BlockingQueue](http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html)
- [thread state java](https://stackoverflow.com/questions/11265289/thread-state-java)
- [CSC 456 Spring 2012/ch7 MN](http://wiki.expertiza.ncsu.edu/index.php/CSC_456_Spring_2012/ch7_MN)
- [Java - Understanding Happens-before relationship](https://www.logicbig.com/tutorials/core-java-tutorial/java-multi-threading/happens-before.html)
- [6장 Thread Synchronization](https://www.slideshare.net/novathinker/6-thread-synchronization)
- [How is Java's ThreadLocal implemented under the hood?](https://stackoverflow.com/questions/1202444/how-is-javas-threadlocal-implemented-under-the-hood/15653015)
2018-04-08 23:34:51 +08:00
- [Concurrent](https://sites.google.com/site/webdevelopart/21-compile/06-java/javase/concurrent?tmpl=%2Fsystem%2Fapp%2Ftemplates%2Fprint%2F&showPrintDialog=1)
2018-04-09 13:33:32 +08:00
- [JAVA FORK JOIN EXAMPLE](http://www.javacreed.com/java-fork-join-example/ "Java Fork Join Example")
- [聊聊并发——Fork/Join 框架介绍](http://ifeve.com/talk-concurrency-forkjoin/)
2018-05-20 20:42:51 +08:00
- [Eliminating SynchronizationRelated Atomic Operations with Biased Locking and Bulk Rebiasing](http://www.oracle.com/technetwork/java/javase/tech/biasedlocking-oopsla2006-preso-150106.pdf)