课程目标
1. 多线程的发展历史
2. 线程的应用
3. 并发编程的基础
4. 线程安全的问题
特定的指令,计算机不会存储指令,把指令写下来,一次性读取指令,批处理。
然后我们需要把批处理进行隔离,把批处理操作进行保存它的进度。
进程 —> 线程
单核CPU 只有可能会有一个进程去执行。
什么情况下应该使用多线程
线程出现的目的是什么?解决进程中多任务的实时性的问题?其实简单来说,就是解决“阻塞”的问题,阻塞的意思就是程序运行到某个函数或过程后等待某些事件发生而暂时停止 CPU 占用的情况,也就是说会使得 CPU 限制。还有一些场景就是比如对于一个函数中的运算逻辑的性能问题,我们可以 通过多线程的技术,使得一个函数中的多个逻辑运算通过多线程技术达到一个并行执行,从而提高性能。
CPU 架构图解:
所以,多线程最终解决的就是“等待”的问题,所以简单总结的使用场景
- 通过并行计算提高程序执行性能
- 需要等待网络、I/O响应导致耗费大量的执行时间,可以采用异步线程的方式来减少阻塞
Tomcat 7 以前的 I/O 模型
多线程的应用场景
- 客户端阻塞 如果客户端只有一个线程,这个线程发期读取文件的操作必须等待 IO 流返回,线程(客户端)才能做其他的事
- 线程级别阻塞 BIO 客户端一个线程情况下,一个线程导致整个客户端阻塞。那么我们可以使用多线程,一部分线程在等待 IO 操作返回其他线程可以继续做其他的事。此时从客户端角度来说,客户端没有闲着。
tomcat 模型:
多个客户端都是阻塞的,我只有处理完一个请求才能接收下一个请求。然后客户端就会阻塞。所以 Tomcat 采用了多线程的技术。利用了多线程的技术实现了非阻塞。
如何应用多线程
在 JAVA 中有多个方式来实现多线程。继承 Thread 类、实现 Runable 接口、使用 ExecutorService 、Callable、Future 实现带返回结果的多线程。
Thread
Runable
Callable
/Future
可以实现带返回值的线程
继承 Thread 类创建线程
Thread 类本质上是实现了 Runable 接口的一个实例,代表一个线程的实例。启动前程的唯一方法就是通过 Thread 类的 start() 实例方法。 start() 方法是一个 native 方法。它会启动一个新线程,并执行 run() 方法。这种实现多线程很简单那,通过自己的类直接 extends Thread , 并重写 run() 方法,就可以启动新线程并执行自己定义的 run() 方法。
public class MyThread extends Thread{
public static void main(String[] args) {
new MyThread().start();
new MyThread().start();
}
@Override
public void run() {
System.out.println("MyThrea run().....");
}
}
public class MyThread extends Thread{
public static void main(String[] args) {
new MyThread().start();
new MyThread().start();
}
@Override
public void run() {
System.out.println("MyThrea run().....");
}
}
2
3
4
5
6
7
8
9
10
实现 Runable 接口创建线程
如果自己的类已经 extends 另一个类,就无法直接继承 Thread,此时,可以实现 Runable 接口。
public class RunableDemo implements Runnable {
public static void main(String[] args) {
new Thread(new RunableDemo()).start();
new Thread(new RunableDemo()).start();
}
@Override
public void run() {
System.out.println("[" + Thread.currentThread().getName() + "]" + "runable My run().....");
}
}
public class RunableDemo implements Runnable {
public static void main(String[] args) {
new Thread(new RunableDemo()).start();
new Thread(new RunableDemo()).start();
}
@Override
public void run() {
System.out.println("[" + Thread.currentThread().getName() + "]" + "runable My run().....");
}
}
2
3
4
5
6
7
8
9
10
[Thread-0]runable My run().....
[Thread-1]runable My run().....
[Thread-0]runable My run().....
[Thread-1]runable My run().....
2
错误的写法:
public class RunableDemo implements Runnable {
// public static void main(String[] args) {
// new Thread(new RunableDemo()).start();
// new Thread(new RunableDemo()).start();
// }
public static void main(String[] args) {
new RunableDemo().run();
new RunableDemo().run();
}
@Override
public void run() {
System.out.println("[" + Thread.currentThread().getName() + "]" + "runable My run().....");
}
}
public class RunableDemo implements Runnable {
// public static void main(String[] args) {
// new Thread(new RunableDemo()).start();
// new Thread(new RunableDemo()).start();
// }
public static void main(String[] args) {
new RunableDemo().run();
new RunableDemo().run();
}
@Override
public void run() {
System.out.println("[" + Thread.currentThread().getName() + "]" + "runable My run().....");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[main]runable My run().....
[main]runable My run().....
[main]runable My run().....
[main]runable My run().....
2
实现 Callable 接口通过 FutureTask 包装器来创建 Thread 线程
有的时候,我们可能需要让异步执行的线程在执行完以后,提供一个返回值到当前的主线程,主线程需要这个值进行后续的逻辑处理,那么这个时候,就需要带返回值的线程了。
/***
* 当你想要异步的线程执行你的某一个逻辑,那么在这个运行结束以后
* 我想要拿到子线程运行的结果
*/
public class CallableDemo implements Callable<String> {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CallableDemo callableDemo = new CallableDemo();
Future<String> future = executorService.submit(callableDemo);
/***
* 这里可以写其它的业务
* 去写其它东西
*/
String returnValue = future.get(); // 这个地方再阻塞
System.out.println(returnValue);
executorService.shutdown();
}
@Override
public String call() throws Exception {
return "darain" + 1;
}
}
/***
* 当你想要异步的线程执行你的某一个逻辑,那么在这个运行结束以后
* 我想要拿到子线程运行的结果
*/
public class CallableDemo implements Callable<String> {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CallableDemo callableDemo = new CallableDemo();
Future<String> future = executorService.submit(callableDemo);
/***
* 这里可以写其它的业务
* 去写其它东西
*/
String returnValue = future.get(); // 这个地方再阻塞
System.out.println(returnValue);
executorService.shutdown();
}
@Override
public String call() throws Exception {
return "darain" + 1;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
如何把多线程用的优雅
合理的利用异步操作,可以大大的提升程序的处理性能,下面这个案例,如何看过 zookeeper 源码的同学应该看到过。
通过阻塞队列以及多线程的方式,实现对请求的异步化处理,提升处理性能。
模仿多个线程处理同一个请求
@Data
public class Request {
private String name;
}
@Data
public class Request {
private String name;
}
2
3
4
public interface RequestProcessor {
void processorRequest(Request requset);
}
public interface RequestProcessor {
void processorRequest(Request requset);
}
2
3
@RequiredArgsConstructor
public class PrintProcessor extends Thread implements RequestProcessor {
LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<>();
private final RequestProcessor nextProcess;
@Override
public void processorRequest(Request requset) {
linkedBlockingQueue.add(requset);
}
@Override
public void run() {
while (true) {
try {
Request requset = linkedBlockingQueue.take();
out.println("[" + Thread.currentThread().getName() + "] " + "print Data:" + requset);
nextProcess.processorRequest(requset);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@RequiredArgsConstructor
public class PrintProcessor extends Thread implements RequestProcessor {
LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<>();
private final RequestProcessor nextProcess;
@Override
public void processorRequest(Request requset) {
linkedBlockingQueue.add(requset);
}
@Override
public void run() {
while (true) {
try {
Request requset = linkedBlockingQueue.take();
out.println("[" + Thread.currentThread().getName() + "] " + "print Data:" + requset);
nextProcess.processorRequest(requset);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@RequiredArgsConstructor
public class SaveProcessor extends Thread implements RequestProcessor {
LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<>();
@Override
public void processorRequest(Request requset) {
linkedBlockingQueue.add(requset);
}
@Override
public void run() {
while (true) {
try {
Request requset = linkedBlockingQueue.take();
System.out.println("[" + Thread.currentThread().getName() + "] " + "save data:" + requset);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@RequiredArgsConstructor
public class SaveProcessor extends Thread implements RequestProcessor {
LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<>();
@Override
public void processorRequest(Request requset) {
linkedBlockingQueue.add(requset);
}
@Override
public void run() {
while (true) {
try {
Request requset = linkedBlockingQueue.take();
System.out.println("[" + Thread.currentThread().getName() + "] " + "save data:" + requset);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/***
* 我们去处理的时候,用异步线程去处理。
* 当我们把一个请求丢过来的时候,不是直接去处理,而是通过异步线程去处理。
* zookeeper 就是类似的处理,一方面,你可以通过你的处理把职责划分开,
* 一方面你可以通过异步线程的处理去提升你程序的性能
* 合理的利用你 CPU 的资源
*
* 这个和 zookeeper 里边非常像
*/
public class Demo {
private final PrintProcessor printProcessor;
public Demo() {
SaveProcessor saveProcessor = new SaveProcessor();
saveProcessor.start();
printProcessor = new PrintProcessor(saveProcessor);
printProcessor.start();
}
public static void main(String[] args) {
Request requset = new Request();
requset.setName("darian");
new Demo().doTest(requset);
}
public void doTest(Request request) {
printProcessor.processorRequest(request);
}
}
/***
* 我们去处理的时候,用异步线程去处理。
* 当我们把一个请求丢过来的时候,不是直接去处理,而是通过异步线程去处理。
* zookeeper 就是类似的处理,一方面,你可以通过你的处理把职责划分开,
* 一方面你可以通过异步线程的处理去提升你程序的性能
* 合理的利用你 CPU 的资源
*
* 这个和 zookeeper 里边非常像
*/
public class Demo {
private final PrintProcessor printProcessor;
public Demo() {
SaveProcessor saveProcessor = new SaveProcessor();
saveProcessor.start();
printProcessor = new PrintProcessor(saveProcessor);
printProcessor.start();
}
public static void main(String[] args) {
Request requset = new Request();
requset.setName("darian");
new Demo().doTest(requset);
}
public void doTest(Request request) {
printProcessor.processorRequest(request);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
就像一个链表一样的,上一个对象的引用指向下一个对象。是不会乱序的。
线程的基础知识
线程作为操作系统调度的最小单元,并且能够让多线程同时执行,极大的提高了程序的性能,在多核的环境下的优势更加明显。但是在多线程的使用过程中如果对它的特性和原理不够了解的话,就容易造成各种问题。
线程的状态(六种)
JAVA 线程既然能够创建,那么也会销毁,所以线程是存在声明周期的。那么我们解析来从线程的生命周期开始去了解线程。
线程一共六种状态
(NEW RUNNABLE BLOCKED WAITING TIME_WAITING TERMINATED)
NEW
初始状态,线程被构建,但是还没有调用 #start
方法
RUNNABLE
运行状态,JAVA 线程把操作系统中的就绪和运行两种状态统一称为 "运行中"
BLOCKED
阻塞,表示线程进如等待状态,也就是线程因为某种原因放弃了 CPU 的使用权,阻塞也分为几种情况。
等待阻塞 运行的线程调用了
#wait
方法,JVM 会把当前线程放到等待队列同步阻塞
synchronized
,运行的线程在获取对象的同步锁时,若该同步锁被其它线程占用了,那么 JVM 会把当前的线程放入到锁池中。其它阻塞
sleep
/join
运行的线程执行Thread.sleep()
或者t.join
方法,或者发出了 I/O 请求时, JVM 会把当前线程设置为阻塞状态,当 sleep 结束、join 线程终止、io 处理完毕则线程恢复。WAITING
等待 waiting 是我们的线程调用了一个 #wait
方法,实际上也会变成一个阻塞。就是我们没有办法继续去运行了的
TIME_WAITING
超时等待状态,超时以后自动返回
TERMINATED
终止状态,表示当前线程执行完毕
线程运行状态图:
线程的运行状态有两种状态,
不存在就绪的状态,只是为了描述它的一个状态。
打开
Thread
类,搜索 state 有哪些状态,它写的很清楚。
当运行中的线程的时间片被 CPU 抢占的时候,那么它又会变成一个就绪状态。
线程执行完就是终止。
synchroninzed
就是让这个线程获得锁。获得锁,就意味着,其它线程再进来这个方法的时候,它会阻塞。当我们获得锁的时候。比如说我们现在有两个线程。第一个 T1 线程访问同步代码块。同步代码块里面,首先它会获得一个锁。当 T2 线程进来以后,它是没有办法获得锁的。
线程状态:
public class ThreadStatusDemo {
public static void main(String[] args) {
new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "timewaiting").start();
new Thread(() -> {
while (true) { // 我们在一个循环里边获得一个锁
synchronized (ThreadStatusDemo.class) {
try {
// 然后调用 wait() 方法,是因为它调用 wait 方法之前必须要获得锁
ThreadStatusDemo.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "waiting").start();
new Thread(new blockDemo(), "blockDemo-0").start();
new Thread(new blockDemo(), "blockDemo-1").start();
}
static class blockDemo extends Thread {
@Override
public void run() {
synchronized (blockDemo.class) {
while (true) {
try { // 100 秒,一直让它阻塞
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
public class ThreadStatusDemo {
public static void main(String[] args) {
new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "timewaiting").start();
new Thread(() -> {
while (true) { // 我们在一个循环里边获得一个锁
synchronized (ThreadStatusDemo.class) {
try {
// 然后调用 wait() 方法,是因为它调用 wait 方法之前必须要获得锁
ThreadStatusDemo.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "waiting").start();
new Thread(new blockDemo(), "blockDemo-0").start();
new Thread(new blockDemo(), "blockDemo-1").start();
}
static class blockDemo extends Thread {
@Override
public void run() {
synchronized (blockDemo.class) {
while (true) {
try { // 100 秒,一直让它阻塞
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
通过相应的命令显示线程状态:
- 打开终端或者命令提示符,键入
JPS
,(JDK 1.5 提供的要给显示当前所有的 JAVA 进程 PID 的命令),可以获得相应进程的 PID - 根据上一步骤获得的 PID,继续输入
jstack
+ pid (jstack
时 JAVA 虚拟机自带的一种堆栈跟踪工具。jstack 用户打印出给定的 JAVA 进程 ID 或 core file 或远程调试服务的 java 堆栈信息)
我们在写线程的时候,最好定义一个名称。我们去查看问题的时候,有利于我们去排查问题。
阻塞状态,blocked
当 synchronized
加锁的情况下,两个线程同时去访问一个方法,这个时候,就会存在 阻塞。
JPS
是 JDK 1.5 以后,显示所有 JAVA 进程的工具。
jstack 30112
可以查看线程的状态。
- blockDemo-0 获得锁,变成了一个
TIMED_WAITING
的状态。#sleep
- blockDemo-1 没有拿到锁 (on object monitor)
TIMED_WAITING
#sleep
方法
"DestroyJavaVM" #18 prio=5 os_prio=0 tid=0x0000000002a02800 nid=0x697c waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"blockDemo-1" #17 prio=5 os_prio=0 tid=0x0000000029066800 nid=0x5f74 waiting for monitor entry [0x0000000029bff000]
java.lang.Thread.State: BLOCKED (on object monitor)
"blockDemo-0" #15 prio=5 os_prio=0 tid=0x0000000029065800 nid=0x36f8 waiting on condition [0x0000000029aff000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
"waiting" #13 prio=5 os_prio=0 tid=0x0000000029061000 nid=0x7bd0 in Object.wait() [0x00000000299fe000]
java.lang.Thread.State: WAITING (on object monitor)
"timewaiting" #12 prio=5 os_prio=0 tid=0x0000000029094800 nid=0x6bd4 waiting on condition [0x00000000298fe000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
"Service Thread" #11 daemon prio=9 os_prio=0 tid=0x0000000027540800 nid=0x8310 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread3" #10 daemon prio=9 os_prio=2 tid=0x000000002747e000 nid=0x5344 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread2" #9 daemon prio=9 os_prio=2 tid=0x000000002747d000 nid=0x24c0 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" #8 daemon prio=9 os_prio=2 tid=0x0000000027475800 nid=0x7c30 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" #7 daemon prio=9 os_prio=2 tid=0x0000000027474800 nid=0x5c78 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Monitor Ctrl-Break" #6 daemon prio=5 os_prio=0 tid=0x000000002745c800 nid=0xde0 runnable [0x00000000289fe000]
java.lang.Thread.State: RUNNABLE
"Attach Listener" #5 daemon prio=5 os_prio=2 tid=0x00000000273ba000 nid=0x3434 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" #4 daemon prio=9 os_prio=2 tid=0x0000000027411800 nid=0x839c runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" #3 daemon prio=8 os_prio=1 tid=0x00000000273a3800 nid=0x79bc in Object.wait() [0x00000000286fe000]
java.lang.Thread.State: WAITING (on object monitor)
"Reference Handler" #2 daemon prio=10 os_prio=2 tid=0x0000000002afa000 nid=0x4e7c in Object.wait() [0x00000000285ff000]
java.lang.Thread.State: WAITING (on object monitor)
"DestroyJavaVM" #18 prio=5 os_prio=0 tid=0x0000000002a02800 nid=0x697c waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"blockDemo-1" #17 prio=5 os_prio=0 tid=0x0000000029066800 nid=0x5f74 waiting for monitor entry [0x0000000029bff000]
java.lang.Thread.State: BLOCKED (on object monitor)
"blockDemo-0" #15 prio=5 os_prio=0 tid=0x0000000029065800 nid=0x36f8 waiting on condition [0x0000000029aff000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
"waiting" #13 prio=5 os_prio=0 tid=0x0000000029061000 nid=0x7bd0 in Object.wait() [0x00000000299fe000]
java.lang.Thread.State: WAITING (on object monitor)
"timewaiting" #12 prio=5 os_prio=0 tid=0x0000000029094800 nid=0x6bd4 waiting on condition [0x00000000298fe000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
"Service Thread" #11 daemon prio=9 os_prio=0 tid=0x0000000027540800 nid=0x8310 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread3" #10 daemon prio=9 os_prio=2 tid=0x000000002747e000 nid=0x5344 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread2" #9 daemon prio=9 os_prio=2 tid=0x000000002747d000 nid=0x24c0 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" #8 daemon prio=9 os_prio=2 tid=0x0000000027475800 nid=0x7c30 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" #7 daemon prio=9 os_prio=2 tid=0x0000000027474800 nid=0x5c78 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Monitor Ctrl-Break" #6 daemon prio=5 os_prio=0 tid=0x000000002745c800 nid=0xde0 runnable [0x00000000289fe000]
java.lang.Thread.State: RUNNABLE
"Attach Listener" #5 daemon prio=5 os_prio=2 tid=0x00000000273ba000 nid=0x3434 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" #4 daemon prio=9 os_prio=2 tid=0x0000000027411800 nid=0x839c runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" #3 daemon prio=8 os_prio=1 tid=0x00000000273a3800 nid=0x79bc in Object.wait() [0x00000000286fe000]
java.lang.Thread.State: WAITING (on object monitor)
"Reference Handler" #2 daemon prio=10 os_prio=2 tid=0x0000000002afa000 nid=0x4e7c in Object.wait() [0x00000000285ff000]
java.lang.Thread.State: WAITING (on object monitor)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
我们很多时候,要发现线程的信息。
线程的启动和终止
你怎么去启动一个线程?终止?
#start
native 方法,告诉 JVM 去启动一个线程,然后调用 #run
方法去执行。
方法是不建议使用的。 #stop
@Deprecated
!!它就像我们在 Linux 系统中,kill
命令一样,就是我不知道我当前这个线程是不是还在运行,有没有还没处理完的。没有处理完的话,我强制关闭,就会出现一些数据问题,和一些不可预测的问题出现。 , #susped
#resume
怎么去优雅的关闭,我们关闭,Tomcat 也好,去关闭一些进程也好,我们都会提供一些优雅的方式去关闭。一些指令去执行,一般的中间件都会做一个操作,一般都会先去阻止后续的请求进来,然后等待正在运行的线程执行完以后优雅的停止掉。
#interrupt
优雅中断的方式。
当其它线程通过调用当前线程的 #interrupt
方法,表示向当前线程打个招呼,告诉他可以中断线程的执行了,至于什么时候中单,取决于当前线程自己。线程通过检查自身是或否被中断来进行相应,可以通过 isIntrrupted()
来判断是否被中断。
实现线程终止的逻辑:
public class InterruptDemo {
private static int i;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
// 我去判断是否中断这个线程
while (!Thread.currentThread().isInterrupted()) {
i++;
}
System.out.println(i);
}, "interruptDemo");
thread.start();
TimeUnit.SECONDS.sleep(1);
// 通过线程的 interrupt 设置标识为 true
System.out.println(thread.isInterrupted());
thread.interrupt();
System.out.println(thread.isInterrupted());
}
}
public class InterruptDemo {
private static int i;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
// 我去判断是否中断这个线程
while (!Thread.currentThread().isInterrupted()) {
i++;
}
System.out.println(i);
}, "interruptDemo");
thread.start();
TimeUnit.SECONDS.sleep(1);
// 通过线程的 interrupt 设置标识为 true
System.out.println(thread.isInterrupted());
thread.interrupt();
System.out.println(thread.isInterrupted());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
这种通过表示为或者中断操作的而方式能够使线程在终止时有机会去清理资源,而不是武断的将线程停止。因此更加安全和优雅。
Thread.interrupted
通过 interrupt,设置了一个表示告诉 线程可以终止了,线程中还提供了静态方法 Thread.interrupted()
对设置中断标识的线程复位。比如在啥变得线程,外边的线程调用 thread.interrupt
来设置中断标识,而在线程里边,又通过 Thread.interrupted
把线程的标识进行了复位。
public static void interrupt1() throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {
boolean interrupted = Thread.currentThread().isInterrupted();
if (interrupted) {
System.out.println("before:" + interrupted);
Thread.interrupted(); // 对线程进行复位,中断标识为 false
System.out.println("after:" + Thread.currentThread().isInterrupted());
}
}
});
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt(); // 设置中断标识,中断标识为 true
}
public static void interrupt1() throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {
boolean interrupted = Thread.currentThread().isInterrupted();
if (interrupted) {
System.out.println("before:" + interrupted);
Thread.interrupted(); // 对线程进行复位,中断标识为 false
System.out.println("after:" + Thread.currentThread().isInterrupted());
}
}
});
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt(); // 设置中断标识,中断标识为 true
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
before:true
after:false
before:true
after:false
2
其它的线程复位
除了通过 Thread.interrupted
方法对线程中断标识进行复位意外,还有一张被动复位的场景,就是对抛出 interruptedException
异常的方法,在 interruptedException
抛出之前, JVM 会先把线程的中断标识位清楚,然后会抛出 InterruptedException
这个时候,如果调用 #isInterrupted
方法,将会返回 false。
public static void interrupt2() throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// 抛出该异常,会将复位表示设置为 false
e.printStackTrace();
}
}
});
thread.start();
thread.interrupt(); // 将复位表示设置位 true
TimeUnit.SECONDS.sleep(1);
System.out.println("before:" + thread.isInterrupted());
TimeUnit.SECONDS.sleep(1);
System.out.println("after:" + thread.isInterrupted());
}
public static void interrupt2() throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// 抛出该异常,会将复位表示设置为 false
e.printStackTrace();
}
}
});
thread.start();
thread.interrupt(); // 将复位表示设置位 true
TimeUnit.SECONDS.sleep(1);
System.out.println("before:" + thread.isInterrupted());
TimeUnit.SECONDS.sleep(1);
System.out.println("after:" + thread.isInterrupted());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
通过指令的方式,volatile boolean isStop = false;
这样的一个方式,也是可以的。通过内存的可见。
interrupt
和我们设置标志变量的方式是一样的。
java.lang.Thread#interrupt
java.lang.Thread#interrupt0
native 方法
thread.cpp
bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
debug_only(check_for_dangling_thread_pointer(thread);)
// Note: If clear_interrupted==false, this simply fetches and
// returns the value of the field osthread()->interrupted().
return os::is_interrupted(thread, clear_interrupted);
}
bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
debug_only(check_for_dangling_thread_pointer(thread);)
// Note: If clear_interrupted==false, this simply fetches and
// returns the value of the field osthread()->interrupted().
return os::is_interrupted(thread, clear_interrupted);
}
2
3
4
5
6
os_linux.cpp
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(), "possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
osthread->set_interrupted(true);
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we execute unpark().
OrderAccess::fence();
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
// For JSR166. Unpark even if interrupt status already was set
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(), "possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
osthread->set_interrupted(true);
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we execute unpark().
OrderAccess::fence();
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
// For JSR166. Unpark even if interrupt status already was set
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
内存屏障 fence()
,让标志的改变,让所有线程看见,和 volatile
一个意思。
unpark()
线程。
其实就是通过 unpark
去唤醒
Thread#interrupted
是一个静态方法,对设置的中断标识的线程进行复位。
线程的停止方法之 2
除了通过 #interrupt
标识去中断线程意外,我们可以通过 :
定义一个 volatile
修饰的成员变量,来控制线程的终止。这实际上是应用了 volatile 实现多线程之间的共享变量可见性这一特点来实现的。
public class ThreadStopDemo3 {
// 这种和 interrupted 方式是一样的。
private static volatile boolean stop = true;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
int i = 0;
while (!stop) {
i++;
}
});
thread.start();
System.out.println("begin start thread");
Thread.sleep(1000);
stop = true;
}
}
public class ThreadStopDemo3 {
// 这种和 interrupted 方式是一样的。
private static volatile boolean stop = true;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
int i = 0;
while (!stop) {
i++;
}
});
thread.start();
System.out.println("begin start thread");
Thread.sleep(1000);
stop = true;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
线程的安全性
- 可见性
- 原子性
- 有序性
认识这三个问题。
可见性
/***
* 可见性问题
*/
public class VisableDemo {
// 加上 volatile 之后,才可以停止。
private volatile static boolean stop = false;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
int i = 0;
while (!stop) {
i++;
}
});
thread.start();
TimeUnit.SECONDS.sleep(1);
stop = true;
}
}
/***
* 可见性问题
*/
public class VisableDemo {
// 加上 volatile 之后,才可以停止。
private volatile static boolean stop = false;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
int i = 0;
while (!stop) {
i++;
}
});
thread.start();
TimeUnit.SECONDS.sleep(1);
stop = true;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
原子性
/***
*
*/
public class AutomicDemo {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
new Thread(AutomicDemo::inc).start();
}
Thread.sleep(4000);
System.out.println("y运行结果:" + count);
// y运行结果:952 (<= 1000)
}
public static void inc() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
}
}
/***
*
*/
public class AutomicDemo {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
new Thread(AutomicDemo::inc).start();
}
Thread.sleep(4000);
System.out.println("y运行结果:" + count);
// y运行结果:952 (<= 1000)
}
public static void inc() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
有序性
没有办法演示,
有序性是指,程序执行的顺序和我们代码编写的顺序是不一致的。它会存在编译器的优化,和指令的优化。就是 CPU 执行过程中的一个指令重排的问题。
JAVA 内存模型中允许编译器和处理器去指令重排序来优化我们的执行。提升我们 CPU 的利用率。
它有一个原则:
它不影响我们代码语义的情况下会进行适当的重排序。
CPU 的高速缓存
JMM 的高速缓存
线程是 CPU 调度的一个最小单元。线程设计的目的仍然是更充分的利用计算机处理的性能,但是绝大部分的运算任务不能只依靠处理器 “计算” 就能完成,处理器还需要和内存交互,比如地区运算数据、存储运算结果,这个 I/O 操作时很难消除的。而由于计算机的存储设备和处理器的运算速度差距非常大,所以现代计算机系统都会增加一层读写速度尽可能接近处理器运算速度的高速缓存来作为内存和处理器之间的缓冲:将运行需要使用的数据复制到缓存中,让运算能快速进行,当运算结束后再从缓存同步到内存中。
运行时会先去内存中去加载,如果找不到就会去主内存中去加载。
CPU 高速缓存图片:
寄存器:【Rages】
高速缓存从下到上越接近 CPU 速度越快,同时容量也越小,现在大部分处理器都有二级或者三级缓存,从下到上依次是 L3 cache, L2 cache, L1 cache , 缓存又可以分为指令缓存和数据缓存,指令缓存用来缓存程序的代码,数据缓存用来缓存程序的数据。
L1 Cache
一级缓存,本地 core 的缓存,分为 32K 的数据缓存 L1d 和 32K 指令缓存 L1i ,访问 L1 需要 3cycles, 耗时大约 1ns.
L2 Cache
二级缓存,本地 core 的缓存,被设计位 L1 缓存与共享 L3 缓存之间的缓冲,大小为 256K,访问 L2 需要 12 cycles,耗时 3ns
L3 Cache
三级缓存,在同插槽的所有 core 共享 L3 缓存,分为多个 2M 的端,访问 L3 需要 38 cycles, 耗时大约 12ns
L3 缓存主要是为了解决 CPU 操作的一个延时的问题。
告诉缓存中拿不到,就会去主内存中去加载。
缓存一致性问题
CPU-0 读取竹村的数据,缓存到 CPU-0 的告诉缓存中,CPU-1 也做了同样的事情,而 CPU-1 把 count 的值修改成了 2,并且同步到 CPU-1 的告诉缓存,但是这个修改后的值,并没有写入到主存中,CPU-0 访问该字节,由于缓存没有更新。所以仍然是之前的值,就会导致数据不一致的问题。
每个CPU 可以运行一个线程,那么多个 CPU 可以并行运行多个线程。多个线程同时去读取一个共享变量的时候,就会把这个数据都加载到它的高速缓存中来。每个 CPU 的高速缓存池对于其它 CPU 来说是不可见的。
引发这个问题的原因是,多核心 CPU 存在指令并行执行,而各个 CPU 核心之间的数据不共享从而导致缓存一致性问题,为了解决这个问题,CPU 生产厂商提供了相应的解决方案。
CPU 层面提供了两种锁
- 总线锁
- 缓存锁
总线锁
锁总线,当我们其中一个 CPU 在执行一个线程,去访问一个数据的时候,回往总线上发起一个 LOCK 信号,那么其它的CPU 再次去请求这个相同的数据进行操作的时候,它就会被阻塞,意味着这个总线锁就是一个排他锁。对于整个 CPU 来说,它是一个 排他的,对于 多个 CPU 来说,它会导致性能问题。多核的目的是为了做负载,提升运行效率。单核的提升达到瓶颈。加了一个 总线锁,就又让它串行执行。所以 P6 系列以后的处理器,出现了另外一种方式,就是缓存锁。
缓存锁
如果缓存在处理器缓存中的内存区域在 LOCK 操作期间被锁定,当它执行操作回写内存的时候,处理不再总线上声明 LOCK 信号,而是修改内部的缓存地址,然后通过缓存一致性机制来保证操作的原子性,因为缓存一致性会组之同时修改两个以上处理器缓存的内存区域的数据,当其他处理器会写已经被锁定的缓存行的数据时会导致该缓存行无效。
所以如果声明了 CPU 的锁机制,会生成一个 LOCK 指令,会产生两个作用。
- LOCK 前缀指令会引起处理器缓存器缓存会写到内存,在 P6 以后的处理器中,LOCK 信号一般不锁总线,而是锁缓存
- 一个处理器的缓存会写到内存会导致其他处理器的缓存无效。
缓存一致性协议
处理器上有一套完整的协议,来保证 Cache 的一致性,比较经典的就是 MESI 协议。
一般都是 MESI 的协议。它的方法时在 CPU 中保存了一个标记位,这个标记位有四种状态。
M(modify)
修改缓存,当前 CPU 缓存已经被修改,表示已经和内存中的数据不一致了
I(invalid)
失效缓存,说明 CPU 的缓存已经不能使用了
E(exclusive)
独占缓存,当前的 CPU 的缓存和内存中的数据保持一致,而且其它处理器没有缓存该数据
S(shared)
共享缓存,数据和内存中数据一致,并且该数据存在多个 CPU 缓存中,
每个 Core 的 Cache 控制器不仅知道自己的读写操作,也监听其它的 Cache 的读写操作,嗅探(snooping)协议。
CPU 的读取会遵循几个原则:
- 如果缓存的状态是 I ,那么就从内存中读取,否则直接从缓存中读取
- 如果缓存处于 M 或者 E 的 CPU 嗅探到其他 CPU 有读的操作,就把自己的缓存写入到内存,并把自己的的状态设置为 S
- 只有缓存状态时 M 或 E 的时候,CPU 才可以修改缓存中的数据,修改后,缓存状态变为 MC。
(怎么去通知缓存失效不需要去关心的)
CPU 的优化执行
除了增加高速缓存以外,为了更充分的利用处理器内部的运算单元,处理器会对输入的代码进行乱序执行优化,处理器会在计算之后将乱序执行的结果充足,保证该结果与顺序执行的结果一致,但并不保证程序中各个语句计算的先后顺序与输入代码中的顺序一致,这个是处理器的优化执行。还有一个就是编程语言的编译器也会有类似的优化,比如说做指令重拍来提升性能。
它会保证一个约束。保证我的语义不会变化。(你可以重排序,你可以对指令乱序执行,但是语义不能发生变化。)CPU 访问主内存的时候,可能是一个交叉访问的。对于同一个 CPU 我去访问内存,我是可控的。但是对于多处理器来说,我的访问顺序是可变的,不确定的。乱序执行,有些指令的执行的时间比较长,CPU 会比较占用时间。CPU 会进行优化的执行,通过我们编译器的优化,还有乱序访问。CPU 访问主内存的顺序,对多个 CPU 来说是不可控的。
并发编程的问题
原子性,可见性,有序性都是抽象的概念,他们的核心本质就是 缓存一致性问题,和处理器优化问题的指令重排序问题。
- 可见性问题?
- 乱序执行(内存乱序访问?)
缓存一致性,就会导致可见性问题。处理器的乱序执行会导致原子性的问题,指令重拍会导致有序性问题,为了解决这些问题,所以在 JVM 中引入了 JMM 的概念。
JMM (应用层面) 内存模型
内存模型定义了共享内存系统中多线程程序读写操作行为的规范,来屏蔽各种硬件和操作系统的内存访问差异,来实现 JAVA 程序在各个平台下都能达成一致的内存访问效果。JAVA 内存模型的主要目标是定义程序中各个变量的访问规则,也就是在虚拟机中将变量存储到内存以及从内存中取出变量(这里的变量指的是共享变量,也就是实例对象、静态字段、数组对象等存储在堆内存中的变量。而对于局部变量这类的,属于线程私有,不会被共享)这类的底层细节。通过这些规则来规范对内存的读写操作,从而保证指令执行的正确性。
它与处理器有关、与缓存有关、与并发有关、与编译器有关。他解决了 CPU 多级缓存、处理器优化、指令重拍等导致的内存访问问题,保证了并发场景下的可见性、原子性和有序性。内存模型解决并发问题主要采取两种方式:限制处理器优化和使用内存屏障。
JAVA 内存模型定义了线程和内存的交互方式,在 JMM 抽象模型中,分为主内存、工作内存。主内存是所有线程共享的,工作内存是每个线程独有的。线程对变量的所有操作(读取、赋值)都必须在工作内存中进行,不能直接读写主内存中的变量。并且不同的线程之间无法访问对方工作内存中的变量,县城及爱你的变量值的传递都需要通过主内存来完成。他们三者的交互关系如下:
JMM 交互图
所以总的来说,JMM 是一种规范,目的是解决由于多线程通过共享内存进行通讯是,存在的本地内存数据不一致、编译器会对代码指令重排序、处理器会对代码乱序执行等带来的问题。目的是保证并发编程场景中的原子性、可见性和有序性。