【Java并发编程】并发编程基础

Java并发/多线程编程系列blog(三)Java并发编程基础含线程简介/启动与终止线程/线程间通信

线程简介

现代操作系统在运行一个程序时,会为其创建一个进程。例如,启动一个Java程序,操作系统就会创建一个Java进程现代操作系统调度的最小单元是线程,也叫轻量级进程,在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局 部变量等属性,并且能够访问共享的内存变量。处理器在这些线程上高速切换,让使用者感觉到这些线程在同时执行。

1
2
3
4
5
6
7
8
9
10
11
12
public class MultiThread{ 
public static void main(String[] args) {
// 获取Java线程管理MXBean
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
// 不需要获取同步的monitor和synchronizer信息,仅获取线程和线程堆栈信息
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
// 遍历线程信息,仅打印线程ID和线程名称信息
for (ThreadInfo threadInfo : threadInfos) {
System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo. getThreadName());
}
}
}

输出:

1
2
3
4
[4] Signal Dispatcher // 分发处理发送给JVM信号的线程 
[3] Finalizer // 调用对象finalize方法的线程
[2] Reference Handler // 清除Reference的线程
[1] main // main线程,用户程序入口

可以看到,一个Java程序的运行不仅仅是main()方法的运行,而是main线程和多个其他线程的同时运行。

为什么要使用多线程

  • 更多的处理器核心;
  • 更快的响应时间;
  • 更好的编程模型。

线程优先级

现代操作系统基本采用时分的形式调度运行的线程,操作系统会分出一个个时间片,线程会分配到若干时间片,当线程的时间片用完了就会发生线程调度,并等待着下次分配。线程分配到的时间片多少也就决定了线程使用处理器资源的多少,而线程优先级就是决定线程需要多或者少分配一些处理器资源的线程属性。

在Java线程中,通过一个整型成员变量priority来控制优先级,优先级的范围从1~10,在线程构建的时候可以通过setPriority(int)方法来修改优先级,默认优先级是5,优先级高的线程分配时间片的数量要多于优先级低的线程。设置线程优先级时,针对频繁阻塞(休眠或者I/O操作)的线程需要设置较高优先级,而偏重计算(需要较多CPU时间或者偏运算)的线程则设置较 低的优先级,确保处理器不会被独占。在不同的JVM以及操作系统上,线程规划会存在差异,有些操作系统甚至会忽略对线程优先级的设定,如示例代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Priority {
private static volatile boolean notStart = true;
private static volatile boolean notEnd = true;
public static void main(String[] args) throws Exception {
List<Job> jobs = new ArrayList<Job>();
for (int i = 0; i < 10; i++) {
int priority = i < 5 Thread.MIN_PRIORITY : Thread.MAX_PRIORITY;
Job job = new Job(priority);
jobs.add(job);
Thread thread = new Thread(job, "Thread:" + i); thread.setPriority(priority);
thread.start();
}
notStart = false;
TimeUnit.SECONDS.sleep(10);
notEnd = false;
for (Job job : jobs) {
System.out.println("Job Priority : " + job.priority + ", Count : " + job.jobCount);
}
}
static class Job implements Runnable {
private int priority;
private long jobCount;
public Job(int priority) {
this.priority = priority;

}
public void run() {
while (notStart) {
Thread.yield();

}while (notEnd) {
Thread.yield();
jobCount++;
}
}
}
}

运行结果如下:

1
2
3
4
5
6
7
8
9
10
Job Priority : 1, Count : 1259592 
Job Priority : 1, Count : 1260717
Job Priority : 1, Count : 1264510
Job Priority : 1, Count : 1251897
Job Priority : 1, Count : 1264060
Job Priority : 10, Count : 1256938
Job Priority : 10, Count : 1267663
Job Priority : 10, Count : 1260637
Job Priority : 10, Count : 1261705
Job Priority : 10, Count : 1259967

从输出可以看到线程优先级没有生效,优先级1和优先级10的Job计数的结果非常相近,没有明显差距。这表示程序正确性不能依赖线程的优先级高低。

线程的状态

Java线程在运行的生命周期中的某个时刻处于其中的一个状态:
CCsqM.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package Second;

public class ThreadState {
public static void main(String[] args) {
new Thread(new TimeWaiting(), "TimeWaitingThread").start();
new Thread(new Waiting(), "WaitingThread").start(); // 使用两个Blocked线程,一个获取锁成功,另一个被阻塞
new Thread(new Blocked(), "BlockedThread-1").start();
new Thread(new Blocked(), "BlockedThread-2").start();
}// 该线程不断地进行睡眠
static class TimeWaiting implements Runnable { @Override
public void run() {
while (true) {
SleepUtils.second(100);
}
}
}// 该线程在Waiting.class实例上等待
static class Waiting implements Runnable {
public void run() {
while (true) {
synchronized (Waiting.class) {
try {Waiting.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}// 该线程在Blocked.class实例上加锁后,不会释放该锁
static class Blocked implements Runnable {
public void run() {
synchronized (Blocked.class) {
while (true) {
SleepUtils.second(100);
}
}
}
}
}

package Second;

import java.util.concurrent.TimeUnit;

public class SleepUtils {
public static final void second(long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {

}
}
}

运行该示例,打开终端或者命令提示符,键入“jps”,输出如下:

1
2
3
4
611 
935 Jps
929 ThreadState
270

可以看到运行示例对应的进程ID是929,接着再键入“jstack 929”(这里的进程ID需要和读
者自己键入jps得出的ID一致),部分输出如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// BlockedThread-2线程阻塞在获取Blocked.class示例的锁上
"BlockedThread-2" prio=5 tid=0x00007feacb05d000 nid=0x5d03 waiting for monitor
entry [0x000000010fd58000]
java.lang.Thread.State: BLOCKED (on object monitor)
// BlockedThread-1线程获取到了Blocked.class的锁
"BlockedThread-1" prio=5 tid=0x00007feacb05a000 nid=0x5b03 waiting on condition
[0x000000010fc55000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
// WaitingThread线程在Waiting实例上等待
"WaitingThread" prio=5 tid=0x00007feacb059800 nid=0x5903 in Object.wait()
[0x000000010fb52000]
java.lang.Thread.State: WAITING (on object monitor)
// TimeWaitingThread线程处于超时等待
"TimeWaitingThread" prio=5 tid=0x00007feacb058800 nid=0x5703 waiting on condition
[0x000000010fa4f000]
java.lang.Thread.State: TIMED_WAITING (sleeping)

通过示例,我们了解到Java程序运行中线程状态的具体含义。线程在自身的生命周期中,并不是固定地处于某个状态,而是随着代码的执行在不同的状态之间进行切换,Java线程状态变迁如图所示:

CCuhI.png

Daemon线程

Daemon线程是一种支持型线程,因为它主要被用作程序中后台调度以及支持性工作。这意味着,当一个Java虚拟机中不存在非Daemon线程的时候,Java虚拟机将会退出。可以通过调用Thread.setDaemon(true)将线程设置为Daemon线程。注意Daemon属性需要在启动线程之前设置,不能在启动线程之后设置。 Daemon线程被用作完成支持性工作,但是在Java虚拟机退出时Daemon线程中的finally块并不一定会执行,示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package Second;

public class Daemon {

public static void main(String[] args) {
Thread thread = new Thread(new DaemonRunner(), "RaemonRunner");
thread.setDaemon(true);
thread.start();
}
static class DaemonRunner implements Runnable{
public void run() {
try {
SleepUtils.second(10);
} finally {
System.out.println("DaemonRunner finally run");
}
}
}
}

结果终端上没有任何输出。main线程在启动了线程DaemonRunner之后随着main方法执行完毕而终止,而此时Java虚拟机中已经没有非Daemon线程,虚拟机需要退出。Java虚拟机中的所有Daemon线程都需要立即终止,因此DaemonRunner立即终止,但是DaemonRunner中的finally块并没有执行。注意:在构建Daemon线程时,不能依靠finally块中的内容来确保执行关闭或清理资源的逻辑

启动和终止线程

构造线程

在运行线程之前首先要构造一个线程对象,线程对象在构造的时候需要提供线程所需要的属性,如线程所属的线程组、线程优先级、是否是Daemon线程等信息。代码摘自java.lang.Thread中对线程进行初始化的部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void init(ThreadGroup g, Runnable target, String name,long stackSize, AccessControlContext acc) { 
if (name == null) {
throw new NullPointerException("name cannot be null");
}
// 当前线程就是该线程的父线程
Thread parent = currentThread();
this.group = g;
// 将daemon、priority属性设置为父线程的对应属性 this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
this.name = name.toCharArray();
this.target = target; setPriority(priority);
//将父线程的InheritableThreadLocal复制过来
if (parent.inheritableThreadLocals != null) this.inheritableThreadLocals=ThreadLocal.createInheritedMap(parent. inheritableThreadLocals);
// 分配一个线程ID
tid = nextThreadID();
}

一个新构造的线程对象是由其parent线程来进行空间分配的,而child线程继承了parent是否为Daemon、优先级和加载资源的contextClassLoader以及可继承的ThreadLocal,同时还会分配一个唯一的ID来标识这个child线程。至此,一个能够运行的线程对象就初始化好了,在堆内存中等待着运行。

启动线程

线程对象在初始化完成之后,调用start()方法就可以启动这个线程。线程start()方法的含义是:当前线程(即parent线程)同步告知Java虚拟机,只要线程规划器空闲,应立即启动调用start()方法的线程。注意:启动一个线程前,最好为这个线程设置线程名称,因为这样在使用jstack分析程 序或者进行问题排查时,就会给开发人员提供一些提示,自定义的线程最好能够起个名字。

理解中断

中断可以理解为线程的一个标识位属性,它表示一个运行中的线程是否被其他线程进行了中断操作。其他线程通过调用该线程的interrupt() 方法对其进行中断操作。处于终结状态的线程,即使该线程被中断过,在调用该线程对象的isInterrupted()时依旧会返回false。从Java的API中可以看到,许多声明抛出InterruptedException的方法(例如Thread.sleep(long millis)方法)这些方法在抛出InterruptedException之前,Java虚拟机会先将该线程的中断标识位清除,然后抛出InterruptedException,此时调用isInterrupted()方法将会返回false。代码例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package Second;

import java.util.concurrent.TimeUnit;

public class Interrupted {

public static void main(String[] args) throws Exception {
//不停进行休眠的线程
Thread sleepthread = new Thread(new SleepRunner(), "sleepthread");
sleepthread.setDaemon(true);
//不停进行工作的线程
Thread busythread = new Thread(new BusyRunner(), "busythread");
busythread.setDaemon(true);

sleepthread.start();
busythread.start();
//休眠五秒,让两个线程充分运行
TimeUnit.SECONDS.sleep(5);
sleepthread.interrupt();
busythread.interrupt();
System.out.println("SleepThread interrupted is: "+sleepthread.isInterrupted());
System.out.println("BusyThread interrupted is: "+busythread.isInterrupted());
}
static class SleepRunner implements Runnable{
public void run() {
while(true){
SleepUtils.second(10);
}
}
}
static class BusyRunner implements Runnable{
public void run() {
while(true) {
}
}
}
}

结果:

1
2
SleepThread interrupted is: false
BusyThread interrupted is: true

从结果可以看出,抛出InterruptedException的线程SleepThread,其中断标识位被清除了,而一直忙碌运作的线程BusyThread,中断标识位没有被清除。

过期的suspend()、resume()、stop()

暂停、恢复和停止操作对应在线程Thread的API就是suspend()、resume()和stop()。对应的实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package Second;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class Delayed {
public static void main(String[] args) throws InterruptedException {
DateFormat format = new SimpleDateFormat("HH:MM:SS");
Thread thread = new Thread(new Runner(), "PrintThread");
thread.setDaemon(true);
thread.start();
TimeUnit.SECONDS.sleep(3);
//将Thread进行暂停,输出工作内容后停止
thread.suspend();
System.out.println(thread.getName()+" run at "+format.format(new Date()));
TimeUnit.SECONDS.sleep(3);
//将Thread进行恢复,输出工作内容后恢复
thread.resume();
System.out.println(thread.getName()+" resume at "+format.format(new Date()));
TimeUnit.SECONDS.sleep(3);
//将Thread进行停止,输出工作内容后停止
thread.stop();
System.out.println(thread.getName()+" stop at "+format.format(new Date()));
TimeUnit.SECONDS.sleep(3);
}
static class Runner implements Runnable{
public void run() {
DateFormat format = new SimpleDateFormat("HH:MM:SS");
while(true) {
System.out.println(Thread.currentThread().getName()+" run at "+format.format(new Date()));
SleepUtils.second(1);
}
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
PrintThread run at 15:07:676
PrintThread run at 15:07:681
PrintThread run at 15:07:681
PrintThread run at 15:07:679
PrintThread run at 15:07:684
PrintThread resume at 15:07:684
PrintThread run at 15:07:689
PrintThread run at 15:07:693
PrintThread stop at 15:07:690

在执行过程中,PrintThread运行了3秒,随后被暂停,3秒后恢复,最后经过3秒被终止。 通过示例的输出可以看到,suspend()、resume()和stop()方法完成了线程的暂停、恢复和终止工作,但是这些API是过期的,也就是不建议使用的。不建议使用的原因主要有:

  • 以suspend()方法为例,在调用后,线程不会释放已经占有的资源(比如锁),而是占有着资源进入睡眠状态,这样容易引发死锁问题。
  • stop()方法在终结一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放工作的机会,因此会导致程序可能工作在不确定状态下。

注意:正因为suspend()、resume()和stop()方法带来的副作用,这些方法才被标注为不建议使用的过期方法,而暂停和恢复操作可以等待/通知机制来替代

安全地终止线程

中断状态是线程的一个标识位,而中断操作是一种简便的线程间交互方式,而这种交互方式最适合用来取消或停止任务。除了中断以外,还可以利用一个boolean变量来控制是否需要停止任务并终止该线程。实例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package Second;

import java.util.concurrent.TimeUnit;

public class ShutDown {

public static void main(String[] args) throws InterruptedException {
Runner one = new Runner();
Runner two = new Runner();
Thread onethread = new Thread(one, "onethread");
onethread.setDaemon(true);
onethread.start();
TimeUnit.SECONDS.sleep(1);
onethread.interrupt();

Thread twothread = new Thread(two, "twothread");
twothread.setDaemon(true);
twothread.start();
TimeUnit.SECONDS.sleep(1);
two.cancel();
}
private static class Runner implements Runnable{
private long i;
private boolean on = true;
public void run() {
while(on && !Thread.currentThread().isInterrupted()) {
i++;
}
System.out.println("Count i = "+i);
}
public void cancel() {
on = false;
}
}
}

运行结果:

1
2
Count i = 466312256
Count i = 481484154

示例在执行过程中,main线程通过中断操作和cancel()方法均可使thread得以终止。这种通过标识位或者中断操作的方式能够使线程在终止时有机会去清理资源,而不是武断地将线程停止,因此这种终止线程的做法显得更加安全。

线程间通信

线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者说价值很少,如果多个线程能够相互配合完成工作,这将会带来巨大的价值。

volatile与synchronized关键字

Java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个 变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特性),所以程序在执行过程中,一个线程看到的变量并不一定是最新的。

  • 关键字volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。

举个例子:定义一个表示程序是否运行的成员变量boolean on=true,那么另一个线程可能对它执行关闭动作(on=false),这里涉及多个线程对变量的访问,因此需要将其定义成为 volatile boolean on=true,这样其他线程对它进行改变时,可以让所有线程感知到变化,因为所有对on变量的访问和修改都需要以共享内存为准。但是,过多地使用volatile是不必要的,因为它会降低程序执行的效率。

  • 关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。

使用了同步块和同步方法,通过使用javap工具查看生成的class文件信息来分析synchronized关键字的实现细节,示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package Second;

public class Synchronized {

public static void main(String[] args) {
//对synchronized加锁
synchronized (Synchronized.class) {
}
//同步静态方法,对该class的对象进行加锁
s();
}
public synchronized static void s() {
}
}

在Synchronized.class同级目录执行javap –v Synchronized.class,部分相关输出如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Code:
stack=2, locals=1, args_size=1
0: ldc #1 // class Second/Synchronized
2: dup
3: monitorenter
4: monitorexit
5: invokestatic #16 // Method s:()V
8: return
LineNumberTable:
line 7: 0
line 10: 5
line 11: 8
LocalVariableTable:
Start Length Slot Name Signature
0 9 0 args [Ljava/lang/String;

public static synchronized void s();
descriptor: ()V
flags: (0x0029) ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
Code:
stack=0, locals=0, args_size=0
0: return

上面class信息中,对于同步块的实现使用了monitorenter和monitorexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。无论采用哪种方式,其本质是对一个对象的监视器(monitor)进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程获取到由synchronized所保护对象的监视器。任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取到该对象的监视器才能进入同步块或者同步方法,而没有获取到监视器(执行该方法)的线程将会被阻塞在同步块和同步方法的入口处,进入BLOCKED 状态。下图描述了对象、对象的监视器、同步队列和执行线程之间的关系:
C3tn2.png

等待/通知机制

一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么”(what)和“怎么做”(How),在功能层面上实现了解耦体系结构上具备了良好的伸缩性,但是在Java语言中如何实现类似的功能呢?
等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在所有对象的超类java.lang.Object上,方法和描述如表所示:
C3NYe.png

等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程通过对象O来完成交互,而对象上的wait()和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package Second;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class WaitNotify {

static boolean flag = true;
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread wait = new Thread(new Wait(), "waitThread");
wait.start();
TimeUnit.SECONDS.sleep(2);
Thread Notify = new Thread(new Notify(), "notifyThred");
Notify.start();
}

static class Wait implements Runnable{
public void run() {
//加锁的monitor
synchronized(lock) {
//条件不足时,继续wait 释放锁
while(flag) {
try {
System.out.println(Thread.currentThread()+"flag is true, wait on "+new SimpleDateFormat("HH:MM:SS").format(new Date()));
lock.wait();
} catch (InterruptedException e) {}
}
//条件满足时完成任务
System.out.println(Thread.currentThread()+"flag is false, wait on "+new SimpleDateFormat("HH:MM:SS").format(new Date()));
}
}
}
static class Notify implements Runnable{
public void run() {
//拥有加锁的monitor
synchronized(lock) {
//获取lock的锁然后进行通知,通知时不会释放lock的锁
//知道线程释放了lock之后 Wait线程才能从wait方法中返回
System.out.println(Thread.currentThread()+"Hold lock notify "+new SimpleDateFormat("HH:MM:SS").format(new Date()));
lock.notifyAll();
flag = false;
SleepUtils.second(5);
}
//再次加锁
synchronized(lock) {
System.out.println(Thread.currentThread()+"Hold lock again and sleep"+new SimpleDateFormat("HH:MM:SS").format(new Date()));
SleepUtils.second(5);
}
}
}
}

程序运行的结果如下:

1
2
3
4
Thread[waitThread,5,main]flag is true, wait on 20:07::698
Thread[notifyThred,5,main]Hold lock notify 20:07:596
Thread[notifyThred,5,main]Hold lock again and sleep20:07:601
Thread[waitThread,5,main]flag is false, wait on 20:07:605

上述第3行和第4行输出的顺序可能会互换,而上述例子主要说明了调用wait()、notify()以 及notifyAll()时需要注意的细节,如下:

  • 使用wait()、notify()和notifyAll()时需要先对调用对象加锁。
  • 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
  • notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。
  • notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为获得了调用对象的锁。

从上述细节中可以看到,等待/通知机制依托于同步机制,其目的就是确保等待线程从wait()方法返回时能够感知到通知线程对变量做出的修改。

过程图:
C9GsM.png
WaitThread首先获取了对象的锁,然后调用对象的wait()方法,从而放弃了锁并进入了对象的等待队列WaitQueue中,进入等待状态。由于WaitThread释放了对象的锁,NotifyThread随后获取了对象的锁,并调用对象的notify()方法,将WaitThread从WaitQueue移到SynchronizedQueue中,此时WaitThread的状态变为阻塞状态。NotifyThread释放了锁之后,WaitThread再次获取到锁并从wait()方法返回继续执行。

等待/通知的基本范式

等待/通知的经典范式,该范式分为两部分,分别针对等待方(消费者)和通知方(生产者)。
等待方遵循如下原则:

  • 获取对象的锁;
  • 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
  • 条件满足则执行对应的逻辑。

对应的伪代码如下:

1
2
3
4
5
6
synchronized(对象){
while(条件不满足){
对象.wait();
}
对应的处理逻辑
}

通知方遵循如下原则:

  • 获得对象的锁。
  • 改变条件。
  • 通知所有等待在对象上的线程。

对应的伪代码如下:

1
2
3
4
synchronized(对象){
改变条件;
对象.notifyAll();
}

管道输入/输出流

管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。管道输入/输出流主要包括了如下4种具体实现:PipedOutputStream、PipedInputStream、 PipedReader和PipedWriter,前两种面向字节,而后两种面向字符

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package Second;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;

public class Piped {

public static void main(String[] args) throws IOException {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
//将输入输出流进行连接,否则会抛出IOException
out.connect(in);
Thread printthread = new Thread(new Print(in), "PrintThread");
printthread.start();
int received = 0;
try {
while((received = System.in.read())!=-1) {
out.write(received);
}
}finally {
out.close();
}
}
static class Print implements Runnable{
PipedReader in;
public Print(PipedReader in) {
this.in = in;
}
public void run() {
int received = 0;
try {
while((received = in.read())!=-1) {
System.out.print((char)received);
}
}catch(IOException e) {
}
}
}
}

运行结果如下:

1
2
Easonhe
Easonhe

例子中创建了printThread,它用来接受main线程的输入,任何main线程的输入均通过PipedWriter写入,而printThread在另一端通过PipedReader将内容读出并打印。对于Piped类型的流,必须先要进行绑定,也就是调用connect()方法,如果没有将输入/输出流绑定起来,对于该流的访问将会抛出异常。

Thread.join()的使用

如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才从thread.join()返回。线程Thread除了提供join()方法之外,还提供了join(long millis)和join(long millis,int nanos)两个具备超时特性的方法。这两个超时方法表示,如果线程thread在给定的超时时间里没有终止,那么将会从该超时方法中返回

实例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package Second;

import java.util.concurrent.TimeUnit;

public class Join {

public static void main(String[] args) throws InterruptedException {
Thread previous = Thread.currentThread();
for(int i=0;i<10;i++) {
Thread domino = new Thread(new Domino(previous), String.valueOf(i));
domino.start();
previous = domino;
}
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+" terminate");
}

static class Domino implements Runnable{
//每一个线程拥有前一个线程的引用,需要等待前一个线程的终止才能从wait中返回
Thread previous;
public Domino(Thread previous) {
this.previous = previous;
}
public void run() {
try {
previous.join();
} catch(InterruptedException e) {
}
System.out.println(Thread.currentThread().getName()+" terminate");
}
}
}

程序执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
main terminate
0 terminate
1 terminate
2 terminate
3 terminate
4 terminate
5 terminate
6 terminate
7 terminate
8 terminate
9 terminate

从上述输出可以看到,每个线程终止的前提是前驱线程的终止,每个线程等待前驱线程终止后,才从join()方法返回,这里涉及了等待/通知机制(等待前驱线程结束,接收前驱线程结束通知)。

JDK中Thread.join()方法的源码(进行了部分调整):

1
2
3
4
5
6
7
// 加锁当前线程对象 
public final synchronized void join() throws InterruptedException { // 条件不满足,继续等待
while (isAlive()) {
wait(0);
}
// 条件符合,方法返回
}

当线程终止时,会调用线程自身的notifyAll()方法,会通知所有等待在该线程对象上的线程。可以看到join()方法的逻辑结构与等待/通知经典范式一致,即加锁、循环和处理逻辑3个步骤。

ThreadLocal的使用

ThreadLocal,即线程变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构。这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。可以通过set(T)方法来设置一个值,在当前线程下再通过get()方法获取到原先设置的值。

实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package Second;

import java.util.concurrent.TimeUnit;

public class Profiler {

private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>();
public long initialValue() {
return System.currentTimeMillis();
}
public static final void begin() {
TIME_THREADLOCAL.set(System.currentTimeMillis());
}
public static final long end() {
return System.currentTimeMillis() - TIME_THREADLOCAL.get();
}
public static void main(String[] args) throws InterruptedException {
Profiler.begin();
TimeUnit.SECONDS.sleep(1);
System.out.println("cost : "+Profiler.end()+" mills.");
}
}

执行结果如下:

1
cost : 1003 mills.

Profiler可以被复用在方法调用耗时统计的功能上,在方法的入口前执行begin()方法,在方法调用后执行end()方法,好处是两个方法的调用不用在一个方法或者类中,比如在AOP(面向方面编程)中,可以在方法调用前的切入点执行begin()方法,而在方法调用后的切入点执行end()方法,这样依旧可以获得方法的执行耗时。

线程应用实例

等待超时模式

经典问题:调用一个方法时等待一段时间(一般来说是给定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果。前面的章节介绍了等待/通知的经典范式,即加锁、条件循环和处理逻辑3个步骤,而这种范式无法做到超时等待。,只需要对经典范式做出非常小的改动,就能实现超时等待的加入。

改动内容
假设超时时间段是T,那么可以推断出在当前时间now+T之后就会超时。定义如下变量:等待持续时间:REMAINING=T超时时间:FUTURE=now+T这时仅需要wait(REMAINING)即可,在wait(REMAINING)返回之后会将执行: REMAINING=FUTURE–now。如果REMAINING小于等于0,表示已经超时,直接退出,否则将 继续执行wait(REMAINING)。

等待超时模式的伪代码如下:

1
2
3
4
5
6
7
8
9
10
// 对当前对象加锁 
public synchronized Object get(long mills) throws InterruptedException {
long future = System.currentTimeMillis() + mills;
long remaining = mills//当超时大于0并且result返回值不满足要求
while ((result == null) && remaining > 0) {
wait(remaining);
remaining = future - System.currentTimeMillis();
}
return result;
}

可以看出,等待超时模式就是在等待/通知范式基础上增加了超时控制,这使得该模式相比原有范式更具有灵活性,因为即使方法执行时间过长,也不会“永久”阻塞调用者,而是会按照调用者的要求“按时”返回。

小结

多线程技术带来了好处有很多,并且讲述了如何启动和终止线程以及线程的状态,详细阐述了多线程之间进行通信的基本方式和等待/通知经典范式

参考

《Java并发编程的艺术》

如果觉得还不错的话,把它分享给朋友们吧(ง •̀_•́)ง