Skip to content
On this page

lock

synchronized

并发编程中的三个问题

可见性

可见性概念

可见性(Visibility):是指一个线程对共享变量进行修改,另一个先立即得到修改后的最新值。

可见性演示

案例演示:一个线程根据boolean类型的标记flag, while循环,另一个线程改变这个flag变量的值,另 一个线程并不会停止循环。 小结

java
package com.itheima.concurrent_problem;

/**
 案例演示:
 一个线程对共享变量的修改,另一个线程不能立即得到最新值
 */
public class Test01Visibility {
    // 多个线程都会访问的数据,我们称为线程的共享数据
    private static boolean run = true;

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            while (run) {
            }
        });
        t1.start();
        Thread.sleep(1000);
        Thread t2 = new Thread(() -> {
            run = false;
            System.out.println("时间到,线程2设置为false");
        });
        t2.start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

并发编程时,会出现可见性问题,当一个线程对共享变量进行了修改,另外的线程并没有立即看到修改 后的最新值。

原子性

原子性概念

原子性(Atomicity):在一次或多次操作中,要么所有的操作都执行并且不会受其他因素干扰而中 断,要么所有的操作都不执行。

原子性演示

案例演示:5个线程各执行1000次 i++;

java
package com.itheima.demo01_concurrent_problem;

import java.util.ArrayList;

/**
 案例演示:5个线程各执行1000次 i++;
 */
public class Test02Atomicity {
    private static int number = 0;

    public static void main(String[] args) throws InterruptedException {
        Runnable increment = () -> {
            for (int i = 0; i < 1000; i++) {
                number++;
            }
        };
        ArrayList<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(increment);
            t.start();
            ts.add(t);
        }
        for (Thread t : ts) {
            t.join();
        }
        System.out.println("number = " + number);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

使用javap反汇编class文件,得到下面的字节码指令:

其中,对于 number++ 而言(number 为静态变量),实际会产生如下的 JVM 字节码指令:

c
9: getstatic #12 // Field number:I
12: iconst_1
13: iadd
14: putstatic #12 // Field number:I
1
2
3
4

由此可见number++是由多条语句组成,以上多条指令在一个线程的情况下是不会出问题的,但是在多 线程情况下就可能会出现问题。比如一个线程在执行13: iadd时,另一个线程又执行9: getstatic。会导 致两次number++,实际上只加了1。

有序性

有序性概念

有序性(Ordering):是指程序中代码的执行顺序,Java在编译时和运行时会对代码进行优化,会导致 程序最终的执行顺序不一定就是我们编写代码时的顺序。

java

@JCStressTest
@Outcome(id = {"1", "4"}, expect = Expect.ACCEPTABLE, desc = "ok")
@Outcome(id = "0", expect = Expect.ACCEPTABLE_INTERESTING, desc = "danger")
@State
public class OrderMain {


    int num = 0;
    boolean ready = false;

    // 线程一执行的代码
    @Actor
    public void actor1(I_Result r) {
        if (ready) {
            r.r1 = num + num;
        } else {
            r.r1 = 1;
        }
    }

    // 线程2执行的代码
    @Actor
    public void actor2(I_Result r) {
        num = 2;
        ready = true;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

Java内存模型(JMM)

计算机结构

java内存模型

Java Memory Molde (Java内存模型/JMM) ,千万不要和Java内存结构混淆关于“Java内存模型”的权威解释,请参考 https://download.oracle.com/otn-pub/jcp/memory_model1.0-pfd-spec-oth-JSpec/memory_model-1_0-pfd-spec.pdf。 Java内存模型,是Java虚拟机规范中所定义的一种内存模型,Java内存模型是标准化的,屏蔽掉了底层不同计算机的区别。 Java内存模型是一套规范,描述了Java程序中各种变量(线程共享变量)的访问规则,以及在JVM中将变量存储到内存和从内存中读取变量这样的底层细节,具体如下。

  • 主内存 主内存是所有线程都共享的,都能访问的。所有的共享变量都存储于主内存。
  • 工作内存 每一个线程有自己的工作内存,工作内存只存储该线程对共享变量的副本。线程对变量的所有的操 作(读,取)都必须在工作内存中完成,而不能直接读写主内存中的变量,不同线程之间也不能直接访问对方工作内存中的变量。

image-20220905161333413.png

Java内存模型的作用

Java内存模型是一套在多线程读写共享数据时,对共享数据的可见性、有序性、和原子性的规则和保障。 synchronized,volatile

CPU缓存,内存与Java内存模型的关系

通过对前面的CPU硬件内存架构、Java内存模型以及Java多线程的实现原理的了解,我们应该已经意识到,多线程的执行最终都会映射到硬件处理器上进行执行。

但Java内存模型和硬件内存架构并不完全一致。对于硬件内存来说只有寄存器、缓存内存、主内存的概 念,并没有工作内存和主内存之分,也就是说Java内存模型对内存的划分对硬件内存并没有任何影响,因为JMM只是一种抽象的概念,是一组规则,不管是工作内存的数据还是主内存的数据,对于计算机硬件来说都会存储在计算机主内存中,当然也有可能存储到CPU缓存或者寄存器中,因此总体上来说,Java内存模型和计算机硬件内存架构是一个相互交叉的关系,是一种抽象概念划分与真实物理硬件的交叉。

JMM内存模型与CPU硬件内存架构的关系:

image-20220905161634229.png

小结

Java内存模型是一套规范,描述了Java程序中各种变量(线程共享变量)的访问规则,以及在JVM中将变量 存储到内存和从内存中读取变量这样的底层细节,Java内存模型是对共享数据的可见性、有序性、和原 子性的规则和保障。

主内存与工作内存之间的交互

image-20220905163709238.png

小结

主内存与工作内存之间的数据交互过程

lock -> read -> load -> use -> assign -> store -> write -> unlock

synchronized保证三大特性

synchronized能够保证在同一时刻最多只有一个线程执行该段代码,以达到保证并发安全的效果。

java
public class Main {
    static {
        synchronized (锁对象) {
            // 受保护资源;
        }
    }
}
1
2
3
4
5
6
7

synchronized与原子性

java
/**
 案例演示:5个线程各执行1000次 i++;
 */
public class Test01Atomicity {
    private static int number = 0;

    public static void main(String[] args) throws InterruptedException {
        Runnable increment = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    synchronized (Test01Atomicity.class) {
                        number++;
                    }
                }
            }
        };
        ArrayList<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            Thread t = new Thread(increment);
            t.start();
            ts.add(t);
        }
        for (Thread t : ts) {
            t.join();
        }
        System.out.println("number = " + number);
    }


    {
        for (int i = 0; i < 1000; i++) {
            synchronized (Test01Atomicity.class) {
                number++;
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

对number++;增加同步代码块后,保证同一时间只有一个线程操作number++;。就不会出现安全问题。

synchronized保证原子性的原理,synchronized保证只有一个线程拿到锁,能够进入同步代码块。

synchronized与可见性

一个线程根据boolean类型的标记flag, while循环,另一个线程改变这个flag变量的值,另 一个线程并不会停止循环。

java
/**
 案例演示:
 一个线程根据boolean类型的标记flag, while循环,另一个线程改变这个flag变量的值,
 另一个线程并不会停止循环.
 */
public class Test01Visibility {
    // 多个线程都会访问的数据,我们称为线程的共享数据
    private static boolean run = true;

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            while (run) {
// 增加对象共享数据的打印,println是同步方法
                System.out.println("run = " + run);
            }
        });
        t1.start();
        Thread.sleep(1000);
        Thread t2 = new Thread(() -> {
            run = false;
            System.out.println("时间到,线程2设置为false");
        });
        t2.start();
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

image-20220905163709238.png

synchronized保证可见性的原理,执行synchronized时,会对应lock原子操作会刷新工作内存中共享变量的值

synchronized与有序性

为什么要重排序

为了提高程序的执行效率,编译器和CPU会对程序中代码进行重排序。

as-if-serial语义

as-if-serial语义的意思是:不管编译器和CPU如何重排序,必须保证在单线程情况下程序的结果是正确的。

以下数据有依赖关系,不能重排序。

写后读:

java
int a=1;
int b=a;
1
2

写后写:

java
int a=1;
int a=2;
1
2

读后写:

java
int a=1;
int b=a;
int a=2;
1
2
3

编译器和处理器不会对存在数据依赖关系的操作做重排序,因为这种重排序会改变执行结果。但是,如 果操作之间不存在数据依赖关系,这些操作就可能被编译器和处理器重排序。

java
int a=1;
int b=2;
int c=a+b;
1
2
3
使用synchronized保证有序性

synchronized后,虽然进行了重排序,保证只有一个线程会进入同步代码块,也能保证有序性。

synchronized保证有序性的原理,我们加synchronized后,依然会发生重排序,只不过,我们有同步 代码块,可以保证只有一个线程执行同步代码中的代码。保证有序性

synchronized的特性

可重入特性

一个线程可以多次执行synchronized,重复获取同一把锁。

java
public class Demo01 {
    public static void main(String[] args) {
        Runnable sellTicket = new Runnable() {
            @Override
            public void run() {
                synchronized (Demo01.class) {
                    System.out.println("我是run");
                    test01();
                }
            }

            public void test01() {
                synchronized (Demo01.class) {
                    System.out.println("我是test01");
                }
            }
        };
        new Thread(sellTicket).start();
        new Thread(sellTicket).start();
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

synchronized的锁对象中有一个计数器(recursions变量)会记录线程获得几次锁

可重入的好处
  1. 可以避免死锁
  2. 可以让我们更好的来封装代码

synchronized是可重入锁,内部锁对象中会有一个计数器记录线程获取几次锁啦,在执行完同步代码块 时,计数器的数量会-1,知道计数器的数量为0,就释放这个锁。

不可中断特性

一个线程获得锁后,另一个线程想要获得锁,必须处于阻塞或等待状态,如果第一个线程不释放锁,第 二个线程会一直阻塞或等待,不可被中断。

java
/*
目标:演示synchronized不可中断
1.定义一个Runnable
2.在Runnable定义同步代码块
3.先开启一个线程来执行同步代码块,保证不退出同步代码块
4.后开启一个线程来执行同步代码块(阻塞状态)
5.停止第二个线程
*/
public class Demo02_Uninterruptible {
    private static Object obj = new Object();

    public static void main(String[] args) throws InterruptedException {
// 1.定义一个Runnable
        Runnable run = () -> {
// 2.在Runnable定义同步代码块
            synchronized (obj) {
                String name = Thread.currentThread().getName();
                System.out.println(name + "进入同步代码块");
// 保证不退出同步代码块
                try {
                    Thread.sleep(888888);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
// 3.先开启一个线程来执行同步代码块
        Thread t1 = new Thread(run);
        t1.start();
        Thread.sleep(1000);
// 4.后开启一个线程来执行同步代码块(阻塞状态)
        Thread t2 = new Thread(run);
        t2.start();
// 5.停止第二个线程
        System.out.println("停止线程前");
        t2.interrupt();
        System.out.println("停止线程后");
        System.out.println(t1.getState());
        System.out.println(t2.getState());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java
/*
目标:演示Lock不可中断和可中断
*/
public class Demo03_Interruptible {
    private static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
// test01();
        test02();
    }

    // 演示Lock可中断
    public static void test02() throws InterruptedException {
        Runnable run = () -> {
            String name = Thread.currentThread().getName();
            boolean b = false;
            try {
                b = lock.tryLock(3, TimeUnit.SECONDS);
                if (b) {
                    System.out.println(name + "获得锁,进入锁执行");
                    Thread.sleep(88888);
                } else {
                    System.out.println(name + "在指定时间没有得到锁做其他操作");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (b) {
                    lock.unlock();
                    System.out.println(name + "释放锁");
                }
            }
        };
        Thread t1 = new Thread(run);
        t1.start();
        Thread.sleep(1000);
        Thread t2 = new Thread(run);
        t2.start();
// System.out.println("停止t2线程前");
// t2.interrupt();
// System.out.println("停止t2线程后");
//
// Thread.sleep(1000);
// System.out.println(t1.getState());
// System.out.println(t2.getState());
    }

    // 演示Lock不可中断
    public static void test01() throws InterruptedException {
        Runnable run = () -> {
            String name = Thread.currentThread().getName();
            try {
                lock.lock();
                System.out.println(name + "获得锁,进入锁执行");
                Thread.sleep(88888);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println(name + "释放锁");
            }
        };
        Thread t1 = new Thread(run);
        t1.start();
        Thread.sleep(1000);
        Thread t2 = new Thread(run);
        t2.start();
        System.out.println("停止t2线程前");
        t2.interrupt();
        System.out.println("停止t2线程后");
        Thread.sleep(1000);
        System.out.println(t1.getState());
        System.out.println(t2.getState());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

不可中断是指,当一个线程获得锁后,另一个线程一直处于阻塞或等待状态,前一个线程不释放锁,后 一个线程会一直阻塞或等待,不可被中断。

synchronized属于不可被中断

Lock的lock方法是不可中断的

Lock的tryLock方法是可中断的

synchronized原理

javap 反汇编

monitorenter
java
public class Demo01 {
    private static Object obj = new Object();

    public static void main(String[] args) {
        synchronized (obj) {
            System.out.println("1");
        }
    }

    public synchronized void test() {
        System.out.println("a");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

我们要看synchronized的原理,但是synchronized是一个关键字,看不到源码。我们可以将class文件进行反汇编。

c
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=4, args_size=1
0: iconst_0
1: istore_1
2: getstatic #2 // Field obj:Ljava/lang/Object;
5: dup
6: astore_2
7: monitorenter
8: iinc 1, 1
11: aload_2
12: monitorexit
13: goto 21
16: astore_3
17: aload_2
18: monitorexit
19: aload_3
20: athrow
21: return
Exception table:
from to target type
8 13 16 any
16 19 16 any
LineNumberTable:
line 8: 0
line 9: 2
line 10: 8
line 11: 11
line 12: 21
LocalVariableTable:
Start Length Slot Name Signature
0 22 0 args [Ljava/lang/String;
2 20 1 number I
StackMapTable: number_of_entries = 2
frame_type = 255 /* full_frame */
offset_delta = 16
locals = [ class "[Ljava/lang/String;", int, class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 /* chop */
offset_delta = 4
public synchronized void test();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=2, locals=1, args_size=1
0: getstatic #3 // Field
java/lang/System.out:Ljava/io/PrintStream;
3: ldc #4 // String a
5: invokevirtual #5 // Method
java/io/PrintStream.println:(Ljava/lang/String;)V
8: return
LineNumberTable:
line 15: 0
line 16: 8
LocalVariableTable:
Start Length Slot Name Signature
0 9 0 this
Lcom/itheima/demo04_synchronized_monitor/Demo01;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

image-20220906164334938.png

每一个对象都会和一个监视器monitor关联。监视器被占用时会被锁住,其他线程无法来获 取该monitor。 当JVM执行某个线程的某个方法内部的monitorenter时,它会尝试去获取当前对象对应 的monitor的所有权。其过程如下:

  1. 若monior的进入数为0,线程可以进入monitor,并将monitor的进入数置为1。当前线程成为 monitor的owner(所有者)
  2. 若线程已拥有monitor的所有权,允许它重入monitor,则进入monitor的进入数加1
  3. 若其他线程已经占有monitor的所有权,那么当前尝试获取monitor的所有权的线程会被阻塞,直到monitor的进入数变为0,才能重新尝试获取monitor的所有权。

synchronized的锁对象会关联一个monitor,这个monitor不是我们主动创建的,是JVM的线程执行到这个同步代码块,发现锁对象没有monitor就会创建monitor,monitor内部有两个重要的成员变量owner: 拥有这把锁的线程,recursions会记录线程拥有锁的次数,当一个线程拥有monitor后其他线程只能等待

monitorexit
  1. 能执行monitorexit指令的线程一定是拥有当前对象的monitor的所有权的线程。
  2. 执行monitorexit时会将monitor的进入数减1。当monitor的进入数减为0时,当前线程退出 monitor,不再拥有monitor的所有权,此时其他被这个monitor阻塞的线程可以尝试去获取这个 monitor的所有权

monitorexit插入在方法结束处和异常处,JVM保证每个monitorenter必须有对应的monitorexit。

通过javap反汇编我们看到synchronized使用编程了monitorentor和monitorexit两个指令.每个锁对象 都会关联一个monitor(监视器,它才是真正的锁对象),它内部有两个重要的成员变量owner会保存获得锁 的线程,recursions会保存线程获得锁的次数,当执行到monitorexit时,recursions会-1,当计数器减到0时 这个线程就会释放锁。

深入JVM源码

在HotSpot虚拟机中,monitor是由ObjectMonitor实现的。其源码是用c++来实现的,位于HotSpot虚 拟机源码ObjectMonitor.hpp文件中( src/share/vm/runtime/objectMonitor.hpp)。ObjectMonitor主 要数据结构如下

c
ObjectMonitor() {
    _header = NULL;
    _count = 0;
    _waiters = 0
    _recursions = 0; // 线程的重入次数
    _object = NULL; // 存储该monitor的对象
    _owner = NULL; // 标识拥有该monitor的线程
    _WaitSet = NULL; // 处于wait状态的线程,会被加入到_WaitSet
    _WaitSetLock = 0 ;
    _Responsible = NULL;
    _succ = NULL;
    _cxq = NULL; // 多线程竞争锁时的单向列表
    FreeNext = NULL;
    _EntryList = NULL; // 处于等待锁block状态的线程,会被加入到该列表
    _SpinFreq = 0;
    _SpinClock = 0;
    OwnerIsThread = 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  1. _owner:初始时为NULL。当有线程占有该monitor时,owner标记为该线程的唯一标识。当线程释放monitor时,owner又恢复为NULL。owner是一个临界资源,JVM是通过CAS操作来保证其线程安全的。
  2. _cxq:竞争队列,所有请求锁的线程首先会被放在这个队列中(单向链接)。_cxq是一个临界资源,JVM通过CAS原子指令来修改_cxq队列。修改前_cxq的旧值填入了node的next字段,_cxq指向新值(新线程)。因此_cxq是一个后进先出的stack(栈)。
  3. _EntryList:_cxq队列中有资格成为候选资源的线程会被移动到该队列中。
  4. _WaitSet:因为调用wait方法而被阻塞的线程会被放在该队列中。

每一个Java对象都可以与一个监视器monitor关联,我们可以把它理解成为一把锁,当一个线程想要执行一段被synchronized圈起来的同步方法或者代码块时,该线程得先获取到synchronized修饰的对象对应的monitor。

我们的Java代码里不会显示地去创造这么一个monitor对象,我们也无需创建,事实上可以这么理解:monitor并不是随着对象创建而创建的。我们是通过synchronized修饰符告诉JVM需要为我们的某个对象创建关联的monitor对象。每个线程都存在两个ObjectMonitor对象列表,分别为free和used列表。同时JVM中也维护着global locklist。当线程需要ObjectMonitor对象时,首先从线程自身的free表中申请,若存在则使用,若不存在则从global list中申请。

image-20220907133131608.png

JDK6 synchronized优化

synchronized锁升级过程

高效并发是从JDK 5到JDK 6的一个重要改进,HotSpot虛拟机开发团队在这个版本上花费了大量的精力去实现各种锁优化技术,包括偏向锁( Biased Locking )、轻量级锁( Lightweight Locking )和如适应性自旋(Adaptive Spinning)、锁消除( Lock Elimination)、锁粗化( Lock Coarsening )等,这些技术都是为了在线程之间更高效地共享数据,以及解决竞争问题,从而提高程序的执行效率。

无锁--》偏向锁--》轻量级锁–》重量级锁

Java对象的布局

在JVM中,对象在内存中的布局分为三块区域:对象头、实例数据和对齐填充。如下图所示:

image-20220907140744981.png

image-20220908103300846.png

xml
<dependency>
	<groupId>org.openjdk.jol</groupId>
	<artifactId>jol-core</artifactId>
	<version>0.9</version>
</dependency>
1
2
3
4
5

偏向锁

无锁 > 偏向锁

偏向锁的“偏”,就是偏心的“偏”、偏袒的“偏”,它的意思是这个锁会偏向于第一个获得它的线程,会在对 象头存储锁偏向的线程ID,以后该线程进入和退出同步块时只需要检查是否为偏向锁、锁标志位以及 ThreadID即可。

image-20220908104226302.png

不过一旦出现多个线程竞争时必须撤销偏向锁,所以撤销偏向锁消耗的性能必须小于之前节省下来的 CAS原子操作的性能消耗,不然就得不偿失了。

即只有一个线程进出同步代码块即偏向锁,一旦有多个线程同时访问就存在锁竞争就会升级为轻量级锁

偏向锁原理

当线程第一次访问同步块并获取锁时,

偏向锁处理流程如下:

  1. 虚拟机将会把对象头中的标志位设为“01”,即偏向模式。
  2. 同时使用CAS操作把获取到这个锁的线程的ID记录在对象的Mark Word之中 ,如果CAS操作成功,持有偏向锁的线程以后每次进入这个锁相关的同步块时,虚拟机都可以不再进行任何同步操作,偏向锁的效率高。

持有偏向锁的线程以后每次进入这个锁相关的同步块时,虚拟机都可以不再进行任何同步操作,偏向锁的效率高

偏向锁撤销
  1. 偏向锁的撤销动作必须等待全局安全点
  2. 暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态
  3. 撤销偏向锁,恢复到无锁(标志位为 01)或轻量级锁(标志位为 00)的状态

当锁对象第一次被线程获取的时候,虚拟机将会把对象头中的标志位设为“01”,即偏向模式。同时使用CAS操作把获取到这个锁的线程的ID记录在对象的Mark Word之中 ,如果CAS操作成功,持有偏向锁的线程以后每 次进入这个锁相关的同步块时,虚拟机都可以不再进行任何同步操作,偏向锁的效率高。

好处

偏向锁是在只有一个线程执行同步块时进一步提高性能,适用于一个线程反复获得同一锁的情况。偏向锁可以提高带有同步但无竞争的程序性能。

它同样是一个带有效益权衡性质的优化,也就是说,它并不一定总是对程序运行有利,如果程序中大多数的锁总是被多个不同的线程访问比如线程池,那偏向模式就是多余的。

轻量级锁

轻量级锁是JDK 6之中加入的新型锁机制,它名字中的“轻量级”是相对于使用monitor的传统锁而言的,因此传统的锁机制就称为“重量级”锁。首先需要强调一点的是,轻量级锁并不是用来代替重量级锁的。

引入轻量级锁的目的:在多线程交替执行同步块的情况下,尽量避免重量级锁引起的性能消耗,但是如果多个线程在同一时刻进入临界区,会导致轻量级锁膨胀升级重量级锁,所以轻量级锁的出现并非是要替代重量级锁。

轻量级锁原理

当关闭偏向锁功能或者多个线程竞争偏向锁导致偏向锁升级为轻量级锁,则会尝试获取轻量级锁,其步骤如下: 获取锁

  1. 判断当前对象是否处于无锁状态(hashcode、0、01),如果是,则JVM首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝(官方把这份拷贝加了一个Displaced前缀,即Displaced Mark Word),将对象的Mark Word复制到栈帧中的Lock Record中,将Lock Reocrd中的owner指向当前对象。
  2. JVM利用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,如果成功表示竞争到锁,则将锁标志位变成00,执行同步操作。
  3. 如果失败则判断当前对象的Mark Word是否指向当前线程的栈帧,如果是则表示当前线程已经持有当前对象的锁,则直接执行同步代码块;否则只能说明该锁对象已经被其他线程抢占了,这时轻量级锁需要膨胀为重量级锁,锁标志位变成10,后面等待的线程将会进入阻塞状态。

image-20220908134224601.png

轻量级锁的释放

轻量级锁的释放也是通过CAS操作来进行的,主要步骤如下:

  1. 取出在获取轻量级锁保存在Displaced Mark Word中的数据。
  2. 用CAS操作将取出的数据替换当前对象的Mark Word中,如果成功,则说明释放锁成功。
  3. 如果CAS操作替换失败,说明有其他线程尝试获取该锁,则需要将轻量级锁需要膨胀升级为重量级锁。

对于轻量级锁,其性能提升的依据是“对于绝大部分的锁,在整个生命周期内都是不会存在竞争的”,如果打破这个依据则除了互斥的开销外,还有额外的CAS操作,因此在有多线程竞争的情况下,轻量级锁比重量级锁更慢。

自旋锁

自旋锁原理

前面我们讨论monitor实现锁的时候,知道monitor会阻塞和唤醒线程,线程的阻塞和唤醒需要CPU从用户态转为核心态,频繁的阻塞和唤醒对CPU来说是一件负担很重的工作,这些操作给系统的并发性能带来了很大的压力。如果物理机器有一个以上的处理器,能让两个或以上的线程同时并行执行,我们就可以让后面请求锁的那个线程“稍等一下”,但不放弃处理器的执行时间,看看持有锁的线程是否很快就会释放锁。为了让线程等待,我们只需让线程执行一个忙循环(自旋) , 这项技术就是所谓的自旋锁。

自旋等待不能代替阻塞,且先不说对处理器数量的要求,自旋等待本 身虽然避免了线程切换的开销,但它是要占用处理器时间的,因此,如果锁被占用的时间很短,自旋等待的效果就会非常好,反之,如果锁被占用的时间很长。那么自旋的线程只会白白消耗处理器资源,而不会做任何有用的工作,反而会带来性能上的浪费。因此,自旋等待的时间必须要有一定的限度,如果自旋超过了限定的次数仍然没有成功获得锁,就应当使用传统的方式去挂起线程了。自旋次数的默认值 是10次,用户可以使用参数-XX : PreBlockSpin来更改。

适应性自旋锁

在JDK 6中引入了自适应的自旋锁。自适应意味着自旋的时间不再固定了,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也很有可能再次成功,进而它将允许自旋等待持续相对更长的时间,比如100次循环。另外,如果对于某个锁,自旋很少成功获得过,那在以后要获取这个锁时将可能省略掉自旋过程,以避免浪费处理器资源。

ReentrantLock

队列同步器AQS

**注意:**难度巨大,如果对锁的使用不是很熟悉建议之后再来看!

前面我们了解了可重入锁和读写锁,那么它们的底层实现原理到底是什么样的呢?又是大家看到就想跳过的套娃解析环节。

比如我们执行了ReentrantLock的lock()方法,那它的内部是怎么在执行的呢?

java
public void lock() {
    sync.lock();
}
1
2
3

可以看到,它的内部实际上啥都没做,而是交给了Sync对象在进行,并且,不只是这个方法,其他的很多方法都是依靠Sync对象在进行:

java
public void unlock() {
    sync.release(1);
}
1
2
3

那么这个Sync对象是干什么的呢?可以看到,公平锁和非公平锁都是继承自Sync,而Sync是继承自AbstractQueuedSynchronizer,简称队列同步器:

java
abstract static class Sync extends AbstractQueuedSynchronizer {
   //...
}

static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
1
2
3
4
5
6

所以,要了解它的底层到底是如何进行操作的,还得看队列同步器,我们就先从这里下手吧!

底层实现

AbstractQueuedSynchronizer(下面称为AQS)是实现锁机制的基础,它的内部封装了包括锁的获取、释放、以及等待队列。

一个锁(排他锁为例)的基本功能就是获取锁、释放锁、当锁被占用时,其他线程来争抢会进入等待队列,AQS已经将这些基本的功能封装完成了,其中等待队列是核心内容,等待队列是由双向链表数据结构实现的,每个等待状态下的线程都可以被封装进结点中并放入双向链表中,而对于双向链表是以队列的形式进行操作的,它像这样:

image-20220306162015545

AQS中有一个head字段和一个tail字段分别记录双向链表的头结点和尾结点,而之后的一系列操作都是围绕此队列来进行的。我们先来了解一下每个结点都包含了哪些内容:

java
//每个处于等待状态的线程都可以是一个节点,并且每个节点是有很多状态的
static final class Node {
  	//每个节点都可以被分为独占模式节点或是共享模式节点,分别适用于独占锁和共享锁
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

  	//等待状态,这里都定义好了
   	//唯一一个大于0的状态,表示已失效,可能是由于超时或中断,此节点被取消。
    static final int CANCELLED =  1;
  	//此节点后面的节点被挂起(进入等待状态)
    static final int SIGNAL    = -1;	
  	//在条件队列中的节点才是这个状态
    static final int CONDITION = -2;
  	//传播,一般用于共享锁
    static final int PROPAGATE = -3;

    volatile int waitStatus;    //等待状态值
    volatile Node prev;   //双向链表基操
    volatile Node next;
    volatile Thread thread;   //每一个线程都可以被封装进一个节点进入到等待队列
  
    Node nextWaiter;   //在等待队列中表示模式,条件队列中作为下一个结点的指针

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {
    }

    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

在一开始的时候,headtail都是nullstate为默认值0

java
private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;
1
2
3
4
5

不用担心双向链表不会进行初始化,初始化是在实际使用时才开始的,先不管,我们接着来看其他的初始化内容:

java
//直接使用Unsafe类进行操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
//记录类中属性的在内存中的偏移地址,方便Unsafe类直接操作内存进行赋值等(直接修改对应地址的内存)
private static final long stateOffset;   //这里对应的就是AQS类中的state成员字段
private static final long headOffset;    //这里对应的就是AQS类中的head头结点成员字段
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {   //静态代码块,在类加载的时候就会自动获取偏移地址
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

//通过CAS操作来修改头结点
private final boolean compareAndSetHead(Node update) {
  	//调用的是Unsafe类的compareAndSwapObject方法,通过CAS算法比较对象并替换
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

//同上,省略部分代码
private final boolean compareAndSetTail(Node expect, Node update) {

private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {

private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

可以发现,队列同步器由于要使用到CAS算法,所以,直接使用了Unsafe工具类,Unsafe类中提供了CAS操作的方法(Java无法实现,底层由C++实现)所有对AQS类中成员字段的修改,都有对应的CAS操作封装。

现在我们大致了解了一下它的底层运作机制,我们接着来看这个类是如何进行使用的,它提供了一些可重写的方法(根据不同的锁类型和机制,可以自由定制规则,并且为独占式和非独占式锁都提供了对应的方法),以及一些已经写好的模板方法(模板方法会调用这些可重写的方法),使用此类只需要将可重写的方法进行重写,并调用提供的模板方法,从而实现锁功能(学习过设计模式会比较好理解一些)

我们首先来看可重写方法:

java
//独占式获取同步状态,查看同步状态是否和参数一致,如果返没有问题,那么会使用CAS操作设置同步状态并返回true
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

//独占式释放同步状态
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

//共享式获取同步状态,返回值大于0表示成功,否则失败
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

//共享式释放同步状态
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

//是否在独占模式下被当前线程占用(锁是否被当前线程持有)
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

可以看到,这些需要重写的方法默认是直接抛出UnsupportedOperationException,也就是说根据不同的锁类型,我们需要去实现对应的方法,我们可以来看一下ReentrantLock(此类是全局独占式的)中的公平锁是如何借助AQS实现的:

java
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

  	//加锁操作调用了模板方法acquire
  	//为了防止各位绕晕,请时刻记住,lock方法一定是在某个线程下为了加锁而调用的,并且同一时间可能会有其他线程也在调用此方法
    final void lock() {
        acquire(1);
    }

    // ...
}
1
2
3
4
5
6
7
8
9
10
11

我们先看看加锁操作干了什么事情,这里直接调用了AQS提供的模板方法acquire(),我们来看看它在AQS类中的实现细节:

java
@ReservedStackAccess //这个是JEP 270添加的新注解,它会保护被注解的方法,通过添加一些额外的空间,防止在多线程运行的时候出现栈溢出,下同
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))   //节点为独占模式Node.EXCLUSIVE
        selfInterrupt();
}
1
2
3
4
5
6

首先会调用tryAcquire()方法(这里是由FairSync类实现的),如果尝试加独占锁失败(返回false了)说明可能这个时候有其他线程持有了此独占锁,所以当前线程得先等着,那么会调用addWaiter()方法将线程加入等待队列中:

java
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 先尝试使用CAS直接入队,如果这个时候其他线程也在入队(就是不止一个线程在同一时间争抢这把锁)就进入enq()
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
  	//此方法是CAS快速入队失败时调用
    enq(node);
    return node;
}

private Node enq(final Node node) {
  	//自旋形式入队,可以看到这里是一个无限循环
    for (;;) {
        Node t = tail;
        if (t == null) {  //这种情况只能说明头结点和尾结点都还没初始化
            if (compareAndSetHead(new Node()))   //初始化头结点和尾结点
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;   //只有CAS成功的情况下,才算入队成功,如果CAS失败,那说明其他线程同一时间也在入队,并且手速还比当前线程快,刚好走到CAS操作的时候,其他线程就先入队了,那么这个时候node.prev就不是我们预期的节点了,而是另一个线程新入队的节点,所以说得进下一次循环再来一次CAS,这种形式就是自旋
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

在了解了addWaiter()方法会将节点加入等待队列之后,我们接着来看,addWaiter()会返回已经加入的节点,acquireQueued()在得到返回的节点时,也会进入自旋状态,等待唤醒(也就是开始进入到拿锁的环节了):

java
@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {   //可以看到当此节点位于队首(node.prev == head)时,会再次调用tryAcquire方法获取锁,如果获取成功,会返回此过程中是否被中断的值
                setHead(node);    //新的头结点设置为当前结点
                p.next = null; // 原有的头结点没有存在的意义了
                failed = false;   //没有失败
                return interrupted;   //直接返回等待过程中是否被中断
            }	
          	//依然没获取成功,
            if (shouldParkAfterFailedAcquire(p, node) &&   //将当前节点的前驱节点等待状态设置为SIGNAL,如果失败将直接开启下一轮循环,直到成功为止,如果成功接着往下
                parkAndCheckInterrupt())   //挂起线程进入等待状态,等待被唤醒,如果在等待状态下被中断,那么会返回true,直接将中断标志设为true,否则就是正常唤醒,继续自旋
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);   //通过unsafe类操作底层挂起线程(会直接进入阻塞状态)
    return Thread.interrupted();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;   //已经是SIGNAL,直接true
    if (ws > 0) {   //不能是已经取消的节点,必须找到一个没被取消的
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;   //直接抛弃被取消的节点
    } else {
        //不是SIGNAL,先CAS设置为SIGNAL(这里没有返回true因为CAS不一定成功,需要下一轮再判断一次)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;   //返回false,马上开启下一轮循环
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

所以,acquire()中的if条件如果为true,那么只有一种情况,就是等待过程中被中断了,其他任何情况下都是成功获取到独占锁,所以当等待过程被中断时,会调用selfInterrupt()方法:

java
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
1
2
3

这里就是直接向当前线程发送中断信号了。

上面提到了LockSupport类,它是一个工具类,我们也可以来玩一下这个parkunpark:

java
public static void main(String[] args) throws InterruptedException {
    Thread t = Thread.currentThread();  //先拿到主线程的Thread对象
    new Thread(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println("主线程可以继续运行了!");
            LockSupport.unpark(t);
          	//t.interrupt();   发送中断信号也可以恢复运行
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    System.out.println("主线程被挂起!");
    LockSupport.park();
    System.out.println("主线程继续运行!");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

这里我们就把公平锁的lock()方法实现讲解完毕了(让我猜猜,已经晕了对吧,越是到源码越考验个人的基础知识掌握,基础不牢地动山摇)接着我们来看公平锁的tryAcquire()方法:

java
static final class FairSync extends Sync {
  	//可重入独占锁的公平实现
    @ReservedStackAccess
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();   //先获取当前线程的Thread对象
        int c = getState();     //获取当前AQS对象状态(独占模式下0为未占用,大于0表示已占用)
        if (c == 0) {       //如果是0,那就表示没有占用,现在我们的线程就要来尝试占用它
            if (!hasQueuedPredecessors() &&    //等待队列是否不为空且当前线程没有拿到锁,其实就是看看当前线程有没有必要进行排队,如果没必要排队,就说明可以直接获取锁
                compareAndSetState(0, acquires)) {   //CAS设置状态,如果成功则说明成功拿到了这把锁,失败则说明可能这个时候其他线程在争抢,并且还比你先抢到
                setExclusiveOwnerThread(current);    //成功拿到锁,会将独占模式所有者线程设定为当前线程(这个方法是父类AbstractOwnableSynchronizer中的,就表示当前这把锁已经是这个线程的了)
                return true;   //占用锁成功,返回true
            }
        }
        else if (current == getExclusiveOwnerThread()) {   //如果不是0,那就表示被线程占用了,这个时候看看是不是自己占用的,如果是,由于是可重入锁,可以继续加锁
            int nextc = c + acquires;    //多次加锁会将状态值进行增加,状态值就是加锁次数
            if (nextc < 0)   //加到int值溢出了?
                throw new Error("Maximum lock count exceeded");
            setState(nextc);   //设置为新的加锁次数
            return true;
        }
        return false;   //其他任何情况都是加锁失败
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

在了解了公平锁的实现之后,是不是感觉有点恍然大悟的感觉,虽然整个过程非常复杂,但是只要理清思路,还是比较简单的。

加锁过程已经OK,我们接着来看,它的解锁过程,unlock()方法是在AQS中实现的:

java
public void unlock() {
    sync.release(1);    //直接调用了AQS中的release方法,参数为1表示解锁一次state值-1
}
1
2
3
java
@ReservedStackAccess
public final boolean release(int arg) {
    if (tryRelease(arg)) {   //和tryAcquire一样,也得子类去重写,释放锁操作
        Node h = head;    //释放锁成功后,获取新的头结点
        if (h != null && h.waitStatus != 0)   //如果新的头结点不为空并且不是刚刚建立的结点(初始状态下status为默认值0,而上面在进行了shouldParkAfterFailedAcquire之后,会被设定为SIGNAL状态,值为-1)
            unparkSuccessor(h);   //唤醒头节点下一个节点中的线程
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10
java
private void unparkSuccessor(Node node) {
    // 将等待状态waitStatus设置为初始值0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    //获取下一个结点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {   //如果下一个结点为空或是等待状态是已取消,那肯定是不能通知unpark的,这时就要遍历所有节点再另外找一个符合unpark要求的节点了
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)   //这里是从队尾向前,因为enq()方法中的t.next = node是在CAS之后进行的,而 node.prev = t 是CAS之前进行的,所以从后往前一定能够保证遍历所有节点
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)   //要是找到了,就直接unpark,要是还是没找到,那就算了
        LockSupport.unpark(s.thread);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

那么我们来看看tryRelease()方法是怎么实现的,具体实现在Sync中:

java
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;   //先计算本次解锁之后的状态值
    if (Thread.currentThread() != getExclusiveOwnerThread())   //因为是独占锁,那肯定这把锁得是当前线程持有才行
        throw new IllegalMonitorStateException();   //否则直接抛异常
    boolean free = false;
    if (c == 0) {  //如果解锁之后的值为0,表示已经完全释放此锁
        free = true;
        setExclusiveOwnerThread(null);  //将独占锁持有线程设置为null
    }
    setState(c);   //状态值设定为c
    return free;  //如果不是0表示此锁还没完全释放,返回false,是0就返回true
}
1
2
3
4
5
6
7
8
9
10
11
12
13

综上,我们来画一个完整的流程图:

image-20220306141248030

这里我们只讲解了公平锁,有关非公平锁和读写锁,还请各位观众根据我们之前的思路,自行解读。

公平锁一定公平吗?

前面我们讲解了公平锁的实现原理,那么,我们尝试分析一下,在并发的情况下,公平锁一定公平吗?

我们再次来回顾一下tryAcquire()方法的实现:

java
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&   //注意这里,公平锁的机制是,一开始会查看是否有节点处于等待
            compareAndSetState(0, acquires)) {   //如果前面的方法执行后发现没有等待节点,就直接进入占锁环节了
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

所以hasQueuedPredecessors()这个环节容不得半点闪失,否则会直接破坏掉公平性,假如现在出现了这样的情况:

线程1已经持有锁了,这时线程2来争抢这把锁,走到hasQueuedPredecessors(),判断出为 false,线程2继续运行,然后线程2肯定获取锁失败(因为锁这时是被线程1占有的),因此就进入到等待队列中:

java
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 线程2进来之后,肯定是要先走这里的,因为head和tail都是null
            if (compareAndSetHead(new Node()))
                tail = head;   //这里就将tail直接等于head了,注意这里完了之后还没完,这里只是初始化过程
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {   //由于一开始head和tail都是null,所以线程2直接就进enq()了
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);   //请看上面
    return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

而碰巧不巧,这个时候线程3也来抢锁了,按照正常流程走到了hasQueuedPredecessors()方法,而在此方法中:

java
public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
  	//这里直接判断h != t,而此时线程2才刚刚执行完 tail = head,所以直接就返回false了
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
1
2
3
4
5
6
7
8

因此,线程3这时就紧接着准备开始CAS操作了,又碰巧,这时线程1释放锁了,现在的情况就是,线程3直接开始CAS判断,而线程2还在插入节点状态,结果可想而知,居然是线程3先拿到了锁,这显然是违背了公平锁的公平机制。

一张图就是:

image-20220306155509195

因此公不公平全看hasQueuedPredecessors(),而此方法只有在等待队列中存在节点时才能保证不会出现问题。所以公平锁,只有在等待队列存在节点时,才是真正公平的。

Condition实现原理

通过前面的学习,我们知道Condition类实际上就是用于代替传统对象的wait/notify操作的,同样可以实现等待/通知模式,并且同一把锁下可以创建多个Condition对象。那么我们接着来看看,它又是如何实现的呢,我们先从单个Condition对象进行分析:

在AQS中,Condition有一个实现类ConditionObject,而这里也是使用了链表实现了条件队列:

java
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** 条件队列的头结点 */
    private transient Node firstWaiter;
    /** 条件队列的尾结点 */
    private transient Node lastWaiter;

    //...
}
1
2
3
4
5
6
7
8
9

这里是直接使用了AQS中的Node类,但是使用的是Node类中的nextWaiter字段连接节点,并且Node的status为CONDITION:

image-20220307115850295

我们知道,当一个线程调用await()方法时,会进入等待状态,直到其他线程调用signal()方法将其唤醒,而这里的条件队列,正是用于存储这些处于等待状态的线程。

我们先来看看最关键的await()方法是如何实现的,为了防止一会绕晕,在开始之前,我们先明确此方法的目标:

  • 只有已经持有锁的线程才可以使用此方法
  • 当调用此方法后,会直接释放锁,无论加了多少次锁
  • 只有其他线程调用signal()或是被中断时才会唤醒等待中的线程
  • 被唤醒后,需要等待其他线程释放锁,拿到锁之后才可以继续执行,并且会恢复到之前的状态(await之前加了几层锁唤醒后依然是几层锁)

好了,差不多可以上源码了:

java
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();   //如果在调用await之前就被添加了中断标记,那么会直接抛出中断异常
    Node node = addConditionWaiter();    //为当前线程创建一个新的节点,并将其加入到条件队列中
    int savedState = fullyRelease(node);    //完全释放当前线程持有的锁,并且保存一下state值,因为唤醒之后还得恢复
    int interruptMode = 0;     //用于保存中断状态
    while (!isOnSyncQueue(node)) {   //循环判断是否位于同步队列中,如果等待状态下的线程被其他线程唤醒,那么会正常进入到AQS的等待队列中(之后我们会讲)
        LockSupport.park(this);   //如果依然处于等待状态,那么继续挂起
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)   //看看等待的时候是不是被中断了
            break;
    }
  	//出了循环之后,那线程肯定是已经醒了,这时就差拿到锁就可以恢复运行了
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  //直接开始acquireQueued尝试拿锁(之前已经讲过了)从这里开始基本就和一个线程去抢锁是一样的了
        interruptMode = REINTERRUPT;
  	//已经拿到锁了,基本可以开始继续运行了,这里再进行一下后期清理工作
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();  //将等待队列中,不是Node.CONDITION状态的节点移除
    if (interruptMode != 0)   //依然是响应中断
        reportInterruptAfterWait(interruptMode);
  	//OK,接着该干嘛干嘛
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

实际上await()方法比较中规中矩,大部分操作也在我们的意料之中,那么我们接着来看signal()方法是如何实现的,同样的,为了防止各位绕晕,先明确signal的目标:

  • 只有持有锁的线程才能唤醒锁所属的Condition等待的线程
  • 优先唤醒条件队列中的第一个,如果唤醒过程中出现问题,接着找往下找,直到找到一个可以唤醒的
  • 唤醒操作本质上是将条件队列中的结点直接丢进AQS等待队列中,让其参与到锁的竞争中
  • 拿到锁之后,线程才能恢复运行

image-20220307120449303

好了,上源码:

java
public final void signal() {
    if (!isHeldExclusively())    //先看看当前线程是不是持有锁的状态
        throw new IllegalMonitorStateException();   //不是?那你不配唤醒别人
    Node first = firstWaiter;    //获取条件队列的第一个结点
    if (first != null)    //如果队列不为空,获取到了,那么就可以开始唤醒操作
        doSignal(first);
}
1
2
3
4
5
6
7
java
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)   //如果当前节点在本轮循环没有后继节点了,条件队列就为空了
            lastWaiter = null;   //所以这里相当于是直接清空
        first.nextWaiter = null;   //将给定节点的下一个结点设置为null,因为当前结点马上就会离开条件队列了
    } while (!transferForSignal(first) &&   //接着往下看
             (first = firstWaiter) != null);   //能走到这里只能说明给定节点被设定为了取消状态,那就继续看下一个结点
}
1
2
3
4
5
6
7
8
java
final boolean transferForSignal(Node node) {
    /*
     * 如果这里CAS失败,那有可能此节点被设定为了取消状态
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    //CAS成功之后,结点的等待状态就变成了默认值0,接着通过enq方法直接将节点丢进AQS的等待队列中,相当于唤醒并且可以等待获取锁了
  	//这里enq方法返回的是加入之后等待队列队尾的前驱节点,就是原来的tail
    Node p = enq(node);
    int ws = p.waitStatus;   //保存前驱结点的等待状态
  	//如果上一个节点的状态为取消, 或者尝试设置上一个节点的状态为SIGNAL失败(可能是在ws>0判断完之后马上变成了取消状态,导致CAS失败)
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);  //直接唤醒线程
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

其实最让人不理解的就是倒数第二行,明明上面都正常进入到AQS等待队列了,应该是可以开始走正常流程了,那么这里为什么还要提前来一次unpark呢?

这里其实是为了进行优化而编写,直接unpark会有两种情况:

  • 如果插入结点前,AQS等待队列的队尾节点就已经被取消,则满足wc > 0
  • 如果插入node后,AQS内部等待队列的队尾节点已经稳定,满足tail.waitStatus == 0,但在执行ws > 0之后!compareAndSetWaitStatus(p, ws, Node.SIGNAL)之前被取消,则CAS也会失败,满足compareAndSetWaitStatus(p, ws, Node.SIGNAL) == false

如果这里被提前unpark,那么在await()方法中将可以被直接唤醒,并跳出while循环,直接开始争抢锁,因为前一个等待结点是被取消的状态,没有必要再等它了。

所以,大致流程下:

image-20220307131536020

只要把整个流程理清楚,还是很好理解的。

自行实现锁类

既然前面了解了那么多AQS的功能,那么我就仿照着这些锁类来实现一个简单的锁:

  • 要求:同一时间只能有一个线程持有锁,不要求可重入(反复加锁无视即可)
java
public class Main {
    public static void main(String[] args) throws InterruptedException {
        
    }

    /**
     * 自行实现一个最普通的独占锁
     * 要求:同一时间只能有一个线程持有锁,不要求可重入
     */
    private static class MyLock implements Lock {

        /**
         * 设计思路:
         * 1. 锁被占用,那么exclusiveOwnerThread应该被记录,并且state = 1
         * 2. 锁没有被占用,那么exclusiveOwnerThread为null,并且state = 0
         */
        private static class Sync extends AbstractQueuedSynchronizer {
            @Override
            protected boolean tryAcquire(int arg) {
                if(isHeldExclusively()) return true;     //无需可重入功能,如果是当前线程直接返回true
                if(compareAndSetState(0, arg)){    //CAS操作进行状态替换
                    setExclusiveOwnerThread(Thread.currentThread());    //成功后设置当前的所有者线程
                    return true;
                }
                return false;
            }

            @Override
            protected boolean tryRelease(int arg) {
                if(getState() == 0)
                    throw new IllegalMonitorStateException();   //没加锁情况下是不能直接解锁的
                if(isHeldExclusively()){     //只有持有锁的线程才能解锁
                    setExclusiveOwnerThread(null);    //设置所有者线程为null
                    setState(0);    //状态变为0
                    return true;
                }
                return false;
            }

            @Override
            protected boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }

            protected Condition newCondition(){
                return new ConditionObject();    //直接用现成的
            }
        }

        private final Sync sync = new Sync();

        @Override
        public void lock() {
            sync.acquire(1);
        }

        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }

        @Override
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }

        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }

        @Override
        public void unlock() {
            sync.release(1);
        }

        @Override
        public Condition newCondition() {
            return sync.newCondition();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82

ReentrantLock 加锁.jpg

Released under the MIT License.