不可不说的Java锁事

前言

Java提供了种类丰富的锁,每种锁因其特性的不同,在适当的场景下能够展现出非常高的效率。本文旨在对锁相关源码(本文中的源码来自JDK 8和Netty 3.10.6)、使用场景进行举例,为读者介绍主流锁的知识点,以及不同的锁的适用场景。

JAVA主流锁

Java中往往是按照是否含有某一特性来定义锁,我们通过特性将锁进行分组归类,再使用对比的方式进行介绍,帮助大家更快捷的理解相关知识。下面给出本文内容的总体分类目录:

img

1. 乐观锁 VS 悲观锁

乐观锁与悲观锁是一种广义上的概念,体现了看待线程同步的不同角度。在Java和数据库中都有此概念对应的实际应用。

先说概念。对于同一个数据的并发操作,悲观锁认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。Java中,synchronized关键字和Lock的实现类都是悲观锁。

而乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。如果这个数据没有被更新,当前线程将自己修改的数据成功写入。如果数据已经被其他线程更新,则根据不同的实现方式执行不同的操作(例如报错或者自动重试)。

乐观锁在Java中是通过使用无锁编程来实现,最常采用的是CAS算法,Java原子类中的递增操作就通过CAS自旋实现的。

img

根据从上面的概念描述我们可以发现:

  • 悲观锁适合写操作多的场景,先加锁可以保证写操作时数据正确。
  • 乐观锁适合读操作多的场景,不加锁的特点能够使其读操作的性能大幅提升。

光说概念有些抽象,我们来看下乐观锁和悲观锁的调用方式示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ------------------------- 悲观锁的调用方式 -------------------------
// synchronized
public synchronized void testMethod() {
// 操作同步资源
}
// ReentrantLock
private ReentrantLock lock = new ReentrantLock(); // 需要保证多个线程使用的是同一个锁
public void modifyPublicResources() {
lock.lock();
// 操作同步资源
lock.unlock();
}

// ------------------------- 乐观锁的调用方式 -------------------------
private AtomicInteger atomicInteger = new AtomicInteger(); // 需要保证多个线程使用的是同一个AtomicInteger
atomicInteger.incrementAndGet(); //执行自增1

通过调用方式示例,我们可以发现悲观锁基本都是在显式的锁定之后再操作同步资源,而乐观锁则直接去操作同步资源。那么,为何乐观锁能够做到不锁定同步资源也可以正确的实现线程同步呢?我们通过介绍乐观锁的主要实现方式 “CAS” 的技术原理来为大家解惑。

CAS全称 Compare And Swap(比较与交换),是一种无锁算法。在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。java.util.concurrent包中的原子类就是通过CAS来实现了乐观锁。

CAS算法涉及到三个操作数:

  • 需要读写的内存值 V。
  • 进行比较的值 A。
  • 要写入的新值 B。

当且仅当 V 的值等于 A 时,CAS通过原子方式用新值B来更新V的值(“比较+更新”整体是一个原子操作),否则不会执行任何操作。一般情况下,“更新”是一个不断重试的操作。

之前提到java.util.concurrent包中的原子类,就是通过CAS来实现了乐观锁,那么我们进入原子类AtomicInteger的源码,看一下AtomicInteger的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

根据定义我们可以看出各属性的作用:

  • unsafe: 获取并操作内存的数据。
  • valueOffset: 存储value在AtomicInteger中的偏移量。
  • value: 存储AtomicInteger的int值,该属性需要借助volatile关键字保证其在线程间是可见的。

接下来,我们查看AtomicInteger的自增函数incrementAndGet()的源码时,发现自增函数底层调用的是unsafe.getAndAddInt()。但是由于JDK本身只有Unsafe.class,只通过class文件中的参数名,并不能很好的了解方法的作用,所以我们通过OpenJDK 8 来查看Unsafe的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// ------------------------- JDK 8 -------------------------
// AtomicInteger 自增方法
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

// Unsafe.class
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}

// ------------------------- OpenJDK 8 -------------------------
// Unsafe.java
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}

根据OpenJDK 8的源码我们可以看出,getAndAddInt()循环获取给定对象o中的偏移量处的值v,然后判断内存值是否等于v。如果相等则将内存值设置为 v + delta,否则返回false,继续循环进行重试,直到设置成功才能退出循环,并且将旧值返回。整个“比较+更新”操作封装在compareAndSwapInt()中,在JNI里是借助于一个CPU指令完成的,属于原子操作,可以保证多个线程都能够看到同一个变量的修改值。

后续JDK通过CPU的cmpxchg指令,去比较寄存器中的 A 和 内存中的值 V。如果相等,就把要写入的新值 B 存入内存中。如果不相等,就将内存值 V 赋值给寄存器中的值 A。然后通过Java代码中的while循环再次调用cmpxchg指令进行重试,直到设置成功为止。

CAS虽然很高效,但是它也存在三大问题,这里也简单说一下:

  1. ABA问题

    。CAS需要在操作值的时候检查内存值是否发生变化,没有发生变化才会更新内存值。但是如果内存值原来是A,后来变成了B,然后又变成了A,那么CAS进行检查时会发现值没有发生变化,但是实际上是有变化的。ABA问题的解决思路就是在变量前面添加版本号,每次变量更新的时候都把版本号加一,这样变化过程就从“A-B-A”变成了“1A-2B-3A”。

    • JDK从1.5开始提供了AtomicStampedReference类来解决ABA问题,具体操作封装在compareAndSet()中。compareAndSet()首先检查当前引用和当前标志与预期引用和预期标志是否相等,如果都相等,则以原子方式将引用值和标志的值设置为给定的更新值。
  2. 循环时间长开销大。CAS操作如果长时间不成功,会导致其一直自旋,给CPU带来非常大的开销。

  3. 只能保证一个共享变量的原子操作

    。对一个共享变量执行操作时,CAS能够保证原子操作,但是对多个共享变量操作时,CAS是无法保证操作的原子性的。

    • Java从1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作。

2. 自旋锁 VS 适应性自旋锁

在介绍自旋锁前,我们需要介绍一些前提知识来帮助大家明白自旋锁的概念。

阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长。

在许多场景中,同步资源的锁定时间很短,为了这一小段时间去切换线程,线程挂起和恢复现场的花费可能会让系统得不偿失。如果物理机器有多个处理器,能够让两个或以上的线程同时并行执行,我们就可以让后面那个请求锁的线程不放弃CPU的执行时间,看看持有锁的线程是否很快就会释放锁。

而为了让当前线程“稍等一下”,我们需让当前线程进行自旋,如果在自旋完成后前面锁定同步资源的线程已经释放了锁,那么当前线程就可以不必阻塞而是直接获取同步资源,从而避免切换线程的开销。这就是自旋锁。

img

自旋锁本身是有缺点的,它不能代替阻塞。自旋等待虽然避免了线程切换的开销,但它要占用处理器时间。如果锁被占用的时间很短,自旋等待的效果就会非常好。反之,如果锁被占用的时间很长,那么自旋的线程只会白浪费处理器资源。所以,自旋等待的时间必须要有一定的限度,如果自旋超过了限定次数(默认是10次,可以使用-XX:PreBlockSpin来更改)没有成功获得锁,就应当挂起线程。

自旋锁的实现原理同样也是CAS,AtomicInteger中调用unsafe进行自增操作的源码中的do-while循环就是一个自旋操作,如果修改数值失败则通过循环来执行自旋,直至修改成功。

1
2
3
4
5
6
7
8
9
// sun.misc.Unsafe#getAndAddInt
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

自旋锁在JDK1.4.2中引入,使用-XX:+UseSpinning来开启。JDK 6中变为默认开启,并且引入了自适应的自旋锁(适应性自旋锁)。

自适应意味着自旋的时间(次数)不再固定,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也是很有可能再次成功,进而它将允许自旋等待持续相对更长的时间。如果对于某个锁,自旋很少成功获得过,那在以后尝试获取这个锁时将可能省略掉自旋过程,直接阻塞线程,避免浪费处理器资源。

在自旋锁中 另有三种常见的锁形式:TicketLock、CLHlock和MCSlock,本文中仅做名词介绍,不做深入讲解,感兴趣的同学可以自行查阅相关资料。

3. 无锁 VS 偏向锁 VS 轻量级锁 VS 重量级锁

这四种锁是指锁的状态,专门针对synchronized的。在介绍这四种锁状态之前还需要介绍一些额外的知识。

首先为什么Synchronized能实现线程同步?

在回答这个问题之前我们需要了解两个重要的概念:“Java对象头”、“Monitor”。

Java对象头

synchronized是悲观锁,在操作同步资源之前需要给同步资源先加锁,这把锁就是存在Java对象头里的,而Java对象头又是什么呢?

我们以Hotspot虚拟机为例,Hotspot的对象头主要包括两部分数据:Mark Word(标记字段)、Klass Pointer(类型指针)。

Mark Word:默认存储对象的HashCode,分代年龄和锁标志位信息。这些信息都是与对象自身定义无关的数据,所以Mark Word被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据。它会根据对象的状态复用自己的存储空间,也就是说在运行期间Mark Word里存储的数据会随着锁标志位的变化而变化。

Klass Point:对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。

Monitor

Monitor可以理解为一个同步工具或一种同步机制,通常被描述为一个对象。每一个Java对象就有一把看不见的锁,称为内部锁或者Monitor锁。

Monitor是线程私有的数据结构,每一个线程都有一个可用monitor record列表,同时还有一个全局的可用列表。每一个被锁住的对象都会和一个monitor关联,同时monitor中有一个Owner字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。

现在话题回到synchronized,synchronized通过Monitor来实现线程同步,Monitor是依赖于底层的操作系统的Mutex Lock(互斥锁)来实现的线程同步。

如同我们在自旋锁中提到的“阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长”。这种方式就是synchronized最初实现同步的方式,这就是JDK 6之前synchronized效率低的原因。这种依赖于操作系统Mutex Lock所实现的锁我们称之为“重量级锁”,JDK 6中为了减少获得锁和释放锁带来的性能消耗,引入了“偏向锁”和“轻量级锁”。

所以目前锁一共有4种状态,级别从低到高依次是:无锁、偏向锁、轻量级锁和重量级锁。锁状态只能升级不能降级。

通过上面的介绍,我们对synchronized的加锁机制以及相关知识有了一个了解,那么下面我们给出四种锁状态对应的的Mark Word内容,然后再分别讲解四种锁状态的思路以及特点:

锁状态 存储内容 存储内容
无锁 对象的hashCode、对象分代年龄、是否是偏向锁(0) 01
偏向锁 偏向线程ID、偏向时间戳、对象分代年龄、是否是偏向锁(1) 01
轻量级锁 指向栈中锁记录的指针 00
重量级锁 指向互斥量(重量级锁)的指针 10

无锁

无锁没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。

无锁的特点就是修改操作在循环内进行,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功。上面我们介绍的CAS原理及应用即是无锁的实现。无锁无法全面代替有锁,但无锁在某些场合下的性能是非常高的。

偏向锁

偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价。

在大多数情况下,锁总是由同一线程多次获得,不存在多线程竞争,所以出现了偏向锁。其目标就是在只有一个线程执行同步代码块时能够提高性能。

当一个线程访问同步代码块并获取锁时,会在Mark Word里存储锁偏向的线程ID。在线程进入和退出同步块时不再通过CAS操作来加锁和解锁,而是检测Mark Word里是否存储着指向当前线程的偏向锁。引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次CAS原子指令,而偏向锁只需要在置换ThreadID的时候依赖一次CAS原子指令即可。

偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态。撤销偏向锁后恢复到无锁(标志位为“01”)或轻量级锁(标志位为“00”)的状态。

偏向锁在JDK 6及以后的JVM里是默认启用的。可以通过JVM参数关闭偏向锁:-XX:-UseBiasedLocking=false,关闭之后程序默认会进入轻量级锁状态。

轻量级锁

是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。

在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,然后拷贝对象头中的Mark Word复制到锁记录中。

拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock Record里的owner指针指向对象的Mark Word。

如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,表示此对象处于轻量级锁定状态。

如果轻量级锁的更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行,否则说明多个线程竞争锁。

若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。

重量级锁

升级为重量级锁时,锁标志的状态值变为“10”,此时Mark Word中存储的是指向重量级锁的指针,此时等待锁的线程都会进入阻塞状态。

整体的锁状态升级流程如下:

img

综上,偏向锁通过对比Mark Word解决加锁问题,避免执行CAS操作。而轻量级锁是通过用CAS操作和自旋来解决加锁问题,避免线程阻塞和唤醒而影响性能。重量级锁是将除了拥有锁的线程以外的线程都阻塞。

4. 公平锁 VS 非公平锁

公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。

非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。

直接用语言描述可能有点抽象,这里作者用从别处看到的一个例子来讲述一下公平锁和非公平锁。

img

如上图所示,假设有一口水井,有管理员看守,管理员有一把锁,只有拿到锁的人才能够打水,打完水要把锁还给管理员。每个过来打水的人都要管理员的允许并拿到锁之后才能去打水,如果前面有人正在打水,那么这个想要打水的人就必须排队。管理员会查看下一个要去打水的人是不是队伍里排最前面的人,如果是的话,才会给你锁让你去打水;如果你不是排第一的人,就必须去队尾排队,这就是公平锁。

但是对于非公平锁,管理员对打水的人没有要求。即使等待队伍里有排队等待的人,但如果在上一个人刚打完水把锁还给管理员而且管理员还没有允许等待队伍里下一个人去打水时,刚好来了一个插队的人,这个插队的人是可以直接从管理员那里拿到锁去打水,不需要排队,原本排队等待的人只能继续等待。如下图所示:

img

接下来我们通过ReentrantLock的源码来讲解公平锁和非公平锁。

img

根据代码可知,ReentrantLock里面有一个内部类Sync,Sync继承AQS(AbstractQueuedSynchronizer),添加锁和释放锁的大部分操作实际上都是在Sync中实现的。它有公平锁FairSync和非公平锁NonfairSync两个子类。ReentrantLock默认使用非公平锁,也可以通过构造器来显示的指定使用公平锁。

下面我们来看一下公平锁与非公平锁的加锁方法的源码:

img

通过上图中的源代码对比,我们可以明显的看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()。

img

再进入hasQueuedPredecessors(),可以看到该方法主要做一件事情:主要是判断当前线程是否位于同步队列中的第一个。如果是则返回true,否则返回false。

综上,公平锁就是通过同步队列来实现多个线程按照申请锁的顺序来获取锁,从而实现公平的特性。非公平锁加锁时不考虑排队等待问题,直接尝试获取锁,所以存在后申请却先获得锁的情况。

5. 可重入锁 VS 非可重入锁

可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者class),不会因为之前已经获取过还没释放而阻塞。Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。下面用示例代码来进行分析:

1
2
3
4
5
6
7
8
9
10
public class Widget {
public synchronized void doSomething() {
System.out.println("方法1执行...");
doOthers();
}

public synchronized void doOthers() {
System.out.println("方法2执行...");
}
}

在上面的代码中,类中的两个方法都是被内置锁synchronized修饰的,doSomething()方法中调用doOthers()方法。因为内置锁是可重入的,所以同一个线程在调用doOthers()时可以直接获得当前对象的锁,进入doOthers()进行操作。

如果是一个不可重入锁,那么当前线程在调用doOthers()之前需要将执行doSomething()时获取当前对象的锁释放掉,实际上该对象锁已被当前线程所持有,且无法释放。所以此时会出现死锁。

而为什么可重入锁就可以在嵌套调用时可以自动获得锁呢?我们通过图示和源码来分别解析一下。

还是打水的例子,有多个人在排队打水,此时管理员允许锁和同一个人的多个水桶绑定。这个人用多个水桶打水时,第一个水桶和锁绑定并打完水之后,第二个水桶也可以直接和锁绑定并开始打水,所有的水桶都打完水之后打水人才会将锁还给管理员。这个人的所有打水流程都能够成功执行,后续等待的人也能够打到水。这就是可重入锁。

img

但如果是非可重入锁的话,此时管理员只允许锁和同一个人的一个水桶绑定。第一个水桶和锁绑定打完水之后并不会释放锁,导致第二个水桶不能和锁绑定也无法打水。当前线程出现死锁,整个等待队列中的所有线程都无法被唤醒。

img

之前我们说过ReentrantLock和synchronized都是重入锁,那么我们通过重入锁ReentrantLock以及非可重入锁NonReentrantLock的源码来对比分析一下为什么非可重入锁在重复调用同步资源时会出现死锁。

首先ReentrantLock和NonReentrantLock都继承父类AQS,其父类AQS中维护了一个同步状态status来计数重入次数,status初始值为0。

当线程尝试获取锁时,可重入锁先尝试获取并更新status值,如果status == 0表示没有其他线程在执行同步代码,则把status置为1,当前线程开始执行。如果status != 0,则判断当前线程是否是获取到这个锁的线程,如果是的话执行status+1,且当前线程可以再次获取锁。而非可重入锁是直接去获取并尝试更新当前status的值,如果status != 0的话会导致其获取锁失败,当前线程阻塞。

释放锁时,可重入锁同样先获取当前status的值,在当前线程是持有锁的线程的前提下。如果status-1 == 0,则表示当前线程所有重复获取锁的操作都已经执行完毕,然后该线程才会真正释放锁。而非可重入锁则是在确定当前线程是持有锁的线程之后,直接将status置为0,将锁释放。

img

6. 独享锁 VS 共享锁

独享锁和共享锁同样是一种概念。我们先介绍一下具体的概念,然后通过ReentrantLock和ReentrantReadWriteLock的源码来介绍独享锁和共享锁。

独享锁也叫排他锁,是指该锁一次只能被一个线程所持有。如果线程T对数据A加上排它锁后,则其他线程不能再对A加任何类型的锁。获得排它锁的线程即能读数据又能修改数据。JDK中的synchronized和JUC中Lock的实现类就是互斥锁。

共享锁是指该锁可被多个线程所持有。如果线程T对数据A加上共享锁后,则其他线程只能对A再加共享锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据。

独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。

下图为ReentrantReadWriteLock的部分源码:

img

我们看到ReentrantReadWriteLock有两把锁:ReadLock和WriteLock,由词知意,一个读锁一个写锁,合称“读写锁”。再进一步观察可以发现ReadLock和WriteLock是靠内部类Sync实现的锁。Sync是AQS的一个子类,这种结构在CountDownLatch、ReentrantLock、Semaphore里面也都存在。

在ReentrantReadWriteLock里面,读锁和写锁的锁主体都是Sync,但读锁和写锁的加锁方式不一样。读锁是共享锁,写锁是独享锁。读锁的共享锁可保证并发读非常高效,而读写、写读、写写的过程互斥,因为读锁和写锁是分离的。所以ReentrantReadWriteLock的并发性相比一般的互斥锁有了很大提升。

那读锁和写锁的具体加锁方式有什么区别呢?在了解源码之前我们需要回顾一下其他知识。 在最开始提及AQS的时候我们也提到了state字段(int类型,32位),该字段用来描述有多少线程获持有锁。

在独享锁中这个值通常是0或者1(如果是重入锁的话state值就是重入的次数),在共享锁中state就是持有锁的数量。但是在ReentrantReadWriteLock中有读、写两把锁,所以需要在一个整型变量state上分别描述读锁和写锁的数量(或者也可以叫状态)。于是将state变量“按位切割”切分成了两个部分,高16位表示读锁状态(读锁个数),低16位表示写锁状态(写锁个数)。如下图所示:

img

了解了概念之后我们再来看代码,先看写锁的加锁源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState(); // 取到当前锁的个数
int w = exclusiveCount(c); // 取写锁的个数w
if (c != 0) { // 如果已经有线程持有了锁(c!=0)
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) // 如果写线程数(w)为0(换言之存在读锁) 或者持有锁的线程不是当前线程就返回失败
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 如果写入锁的数量大于最大数(65535,2的16次方-1)就抛出一个Error。
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) // 如果当且写线程数为0,并且当前线程需要阻塞那么就返回失败;或者如果通过CAS增加写线程数失败也返回失败。
return false;
setExclusiveOwnerThread(current); // 如果c=0,w=0或者c>0,w>0(重入),则设置当前线程或锁的拥有者
return true;
}
  • 这段代码首先取到当前锁的个数c,然后再通过c来获取写锁的个数w。因为写锁是低16位,所以取低16位的最大值与当前的c做与运算( int w = exclusiveCount©; ),高16位和0与运算后是0,剩下的就是低位运算的值,同时也是持有写锁的线程数目。
  • 在取到写锁线程的数目后,首先判断是否已经有线程持有了锁。如果已经有线程持有了锁(c!=0),则查看当前写锁线程的数目,如果写线程数为0(即此时存在读锁)或者持有锁的线程不是当前线程就返回失败(涉及到公平锁和非公平锁的实现)。
  • 如果写入锁的数量大于最大数(65535,2的16次方-1)就抛出一个Error。
  • 如果当且写线程数为0(那么读线程也应该为0,因为上面已经处理c!=0的情况),并且当前线程需要阻塞那么就返回失败;如果通过CAS增加写线程数失败也返回失败。
  • 如果c=0,w=0或者c>0,w>0(重入),则设置当前线程或锁的拥有者,返回成功!

tryAcquire()除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取,原因在于:必须确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。

因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,然后等待的读写线程才能够继续访问读写锁,同时前次写线程的修改对后续的读写线程可见。

接着是读锁的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1; // 如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

可以看到在tryAcquireShared(int unused)方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是“1<<16”。所以读写锁才能实现读读的过程共享,而读写、写读、写写的过程互斥。

此时,我们再回头看一下互斥锁ReentrantLock中公平锁和非公平锁的加锁源码:

img

我们发现在ReentrantLock虽然有公平锁和非公平锁两种,但是它们添加的都是独享锁。根据源码所示,当某一个线程调用lock方法获取锁时,如果同步资源没有被其他线程锁住,那么当前线程在使用CAS更新state成功后就会成功抢占该资源。而如果公共资源被占用且不是被当前线程占用,那么就会加锁失败。所以可以确定ReentrantLock无论读操作还是写操作,添加的锁都是都是独享锁。

其他锁细节

synchronized

synchronized 关键字是一把经典的锁,也是我们平时用得最多的。在 JDK1.6 之前, syncronized 是一把重量级的锁,不过随着 JDK 的升级,也在对它进行不断的优化,如今它变得不那么重了,甚至在某些场景下,它的性能反而优于轻量级锁。在加了 syncronized 关键字的方法、代码块中,一次只允许一个线程进入特定代码段,从而避免多线程同时修改同一数据。

synchronized 锁有如下几个特点:

有锁升级过程

在 JDK1.5 (含)之前, synchronized 的底层实现是重量级的,所以之前一致称呼它为”重量级锁”,在 JDK1.5 之后,对 synchronized 进行了各种优化,它变得不那么重了,实现原理就是锁升级的过程。我们先聊聊 1.5 之后的 synchronized 实现原理是怎样的。说到 synchronized 加锁原理,就不得不先说 Java 对象在内存中的布局, Java 对象内存布局如下:

image-20200213202817218

如上图所示,在创建一个对象后,在 JVM 虚拟机( HotSpot )中,对象在 Java 内存中的存储布局 可分为三块:

对象头区域此处存储的信息包括两部分:

1、对象自身的运行时数据( MarkWord )

存储 hashCode、GC 分代年龄、锁类型标记、偏向锁线程 ID 、 CAS 锁指向线程 LockRecord 的指针等, synconized 锁的机制与这个部分( markwork )密切相关,用 markword 中最低的三位代表锁的状态,其中一位是偏向锁位,另外两位是普通锁位。

2、对象类型指针( Class Pointer )

对象指向它的类元数据的指针、 JVM 就是通过它来确定是哪个 Class 的实例。

实例数据区域

此处存储的是对象真正有效的信息,比如对象中所有字段的内容

对齐填充区域

JVM 的实现 HostSpot 规定对象的起始地址必须是 8 字节的整数倍,换句话来说,现在 64 位的 OS 往外读取数据的时候一次性读取 64bit 整数倍的数据,也就是 8 个字节,所以 HotSpot 为了高效读取对象,就做了”对齐”,如果一个对象实际占的内存大小不是 8byte 的整数倍时,就”补位”到 8byte 的整数倍。所以对齐填充区域的大小不是固定的。

当线程进入到 synchronized 处尝试获取该锁时, synchronized 锁升级流程如下:

image-20200213203159163

如上图所示, synchronized 锁升级的顺序为:偏向锁->轻量级锁->重量级锁,每一步触发锁升级的情况如下:

偏向锁

在 JDK1.8 中,其实默认是轻量级锁,但如果设定了 -XX:BiasedLockingStartupDelay = 0 ,那在对一个 Object 做 syncronized 的时候,会立即上一把偏向锁。当处于偏向锁状态时, markwork 会记录当前线程 ID 。

升级到轻量级锁

当下一个线程参与到偏向锁竞争时,会先判断 markword 中保存的线程 ID 是否与这个线程 ID 相等,如果不相等,会立即撤销偏向锁,升级为轻量级锁。每个线程在自己的线程栈中生成一个 LockRecord ( LR ),然后每个线程通过 CAS (自旋)的操作将锁对象头中的 markwork 设置为指向自己的 LR 的指针,哪个线程设置成功,就意味着获得锁。关于 synchronized 中此时执行的 CAS 操作是通过 native 的调用 HotSpot 中 bytecodeInterpreter.cpp 文件 C++ 代码实现的,有兴趣的可以继续深挖。

升级到重量级锁

如果锁竞争加剧(如线程自旋次数或者自旋的线程数超过某阈值, JDK1.6 之后,由 JVM 自己控制该规则),就会升级为重量级锁。此时就会向操作系统申请资源,线程挂起,进入到操作系统内核态的等待队列中,等待操作系统调度,然后映射回用户态。在重量级锁中,由于需要做内核态到用户态的转换,而这个过程中需要消耗较多时间,也就是”重”的原因之一。

可重入

synchronized 拥有强制原子性的内部锁机制,是一把可重入锁。因此,在一个线程使用 synchronized 方法时调用该对象另一个 synchronized 方法,即一个线程得到一个对象锁后再次请求该对象锁,是永远可以拿到锁的。在 Java 中线程获得对象锁的操作是以线程为单位的,而不是以调用为单位的。 synchronized 锁的对象头的 markwork 中会记录该锁的线程持有者和计数器,当一个线程请求成功后, JVM 会记下持有锁的线程,并将计数器计为1。此时其他线程请求该锁,则必须等待。而该持有锁的线程如果再次请求这个锁,就可以再次拿到这个锁,同时计数器会递增。当线程退出一个 synchronized 方法/块时,计数器会递减,如果计数器为 0 则释放该锁锁。

悲观锁(互斥锁、排他锁)

synchronized 是一把悲观锁(独占锁),当前线程如果获取到锁,会导致其它所有需要锁该的线程等待,一直等待持有锁的线程释放锁才继续进行锁的争抢。

ReentrantLock

ReentrantLock 从字面可以看出是一把可重入锁,这点和 synchronized 一样,但实现原理也与 syncronized 有很大差别,它是基于经典的 AQS(AbstractQueueSyncronized) 实现的, AQS 是基于 volitale 和 CAS 实现的,其中 AQS 中维护一个 valitale 类型的变量 state 来做一个可重入锁的重入次数,加锁和释放锁也是围绕这个变量来进行的。 ReentrantLock 也提供了一些 synchronized 没有的特点,因此比 synchronized 好用。

AQS模型如下图:

image-20200213203315752

ReentrantLock 有如下特点:

可重入

ReentrantLock 和 syncronized 关键字一样,都是可重入锁,不过两者实现原理稍有差别, RetrantLock 利用 AQS 的的 state 状态来判断资源是否已锁,同一线程重入加锁, state 的状态 +1 ; 同一线程重入解锁, state 状态 -1 (解锁必须为当前独占线程,否则异常); 当 state 为 0 时解锁成功。

需要手动加锁、解锁

synchronized 关键字是自动进行加锁、解锁的,而 ReentrantLock 需要 lock() 和 unlock() 方法配合 try/finally 语句块来完成,来手动加锁、解锁。

支持设置锁的超时时间

synchronized 关键字无法设置锁的超时时间,如果一个获得锁的线程内部发生死锁,那么其他线程就会一直进入阻塞状态,而 ReentrantLock 提供 tryLock 方法,允许设置线程获取锁的超时时间,如果超时,则跳过,不进行任何操作,避免死锁的发生。

支持公平/非公平锁

synchronized 关键字是一种非公平锁,先抢到锁的线程先执行。而 ReentrantLock 的构造方法中允许设置 true/false 来实现公平、非公平锁,如果设置为 true ,则线程获取锁要遵循”先来后到”的规则,每次都会构造一个线程 Node ,然后到双向链表的”尾巴”后面排队,等待前面的 Node 释放锁资源。

可中断锁

ReentrantLock 中的 lockInterruptibly() 方法使得线程可以在被阻塞时响应中断,比如一个线程 t1 通过 lockInterruptibly() 方法获取到一个可重入锁,并执行一个长时间的任务,另一个线程通过 interrupt() 方法就可以立刻打断 t1 线程的执行,来获取t1持有的那个可重入锁。而通过 ReentrantLock 的 lock() 方法或者 Synchronized 持有锁的线程是不会响应其他线程的 interrupt() 方法的,直到该方法主动释放锁之后才会响应 interrupt() 方法。

ReentrantReadWriteLock

ReentrantReadWriteLock (读写锁)其实是两把锁,一把是 WriteLock (写锁),一把是读锁, ReadLock 。读写锁的规则是:读读不互斥、读写互斥、写写互斥。在一些实际的场景中,读操作的频率远远高于写操作,如果直接用一般的锁进行并发控制的话,就会读读互斥、读写互斥、写写互斥,效率低下,读写锁的产生就是为了优化这种场景的操作效率。一般情况下独占锁的效率低来源于高并发下对临界区的激烈竞争导致线程上下文切换。因此当并发不是很高的情况下,读写锁由于需要额外维护读锁的状态,可能还不如独占锁的效率高,因此需要根据实际情况选择使用。

ReentrantReadWriteLock 的原理也是基于 AQS 进行实现的,与 ReentrantLock 的差别在于 ReentrantReadWriteLock 锁拥有共享锁、排他锁属性。读写锁中的加锁、释放锁也是基于 Sync (继承于 AQS ),并且主要使用 AQS 中的 state 和 node 中的 waitState 变量进行实现的。实现读写锁与实现普通互斥锁的主要区别在于需要分别记录读锁状态及写锁状态,并且等待队列中需要区别处理两种加锁操作。 ReentrantReadWriteLock 中将 AQS 中的 int 类型的 state 分为高 16 位与第 16 位分别记录读锁和写锁的状态,如下图所示:

image-20200213203511263

WriteLock(写锁)是悲观锁(排他锁、互斥锁)

通过计算 state&((1<<16)-1) ,将 state 的高 16 位全部抹去,因此 state 的低位记录着写锁的重入计数。

获取写锁源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
* 获取写锁
Acquires the write lock.
* 如果此时没有任何线程持有写锁或者读锁,那么当前线程执行CAS操作更新status,
* 若更新成功,则设置读锁重入次数为1,并立即返回
* <p>Acquires the write lock if neither the read nor write lock
* are held by another thread
* and returns immediately, setting the write lock hold count to
* one.
* 如果当前线程已经持有该写锁,那么将写锁持有次数设置为1,并立即返回
* <p>If the current thread already holds the write lock then the
* hold count is incremented by one and the method returns
* immediately.
* 如果该锁已经被另外一个线程持有,那么停止该线程的CPU调度并进入休眠状态,
* 直到该写锁被释放,且成功将写锁持有次数设置为1才表示获取写锁成功
* <p>If the lock is held by another thread then the current
* thread becomes disabled for thread scheduling purposes and
* lies dormant until the write lock has been acquired, at which
* time the write lock hold count is set to one.
*/
public void lock() {
sync.acquire(1);
}
/**
* 该方法为以独占模式获取锁,忽略中断
* 如果调用一次该“tryAcquire”方法更新status成功,则直接返回,代表抢锁成功
* 否则,将会进入同步队列等待,不断执行“tryAcquire”方法尝试CAS更新status状态,直到成功抢到锁
* 其中“tryAcquire”方法在NonfairSync(公平锁)中和FairSync(非公平锁)中都有各自的实现
*
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1、如果读写锁的计数不为0,且持有锁的线程不是当前线程,则返回false
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2、如果持有锁的计数不为0且计数总数超过限定的最大值,也返回false
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3、如果该锁是可重入或该线程在队列中的策略是允许它尝试抢锁,那么该线程就能获取锁
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
//获取读写锁的状态
int c = getState();
//获取该写锁重入的次数
int w = exclusiveCount(c);
//如果读写锁状态不为0,说明已经有其他线程获取了读锁或写锁
if (c != 0) {
//如果写锁重入次数为0,说明有线程获取到读锁,根据“读写锁互斥”原则,返回false
//或者如果写锁重入次数不为0,且获取写锁的线程不是当前线程,根据"写锁独占"原则,返回false
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//如果写锁可重入次数超过最大次数(65535),则抛异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//到这里说明该线程是重入写锁,更新重入写锁的计数(+1),返回true
// Reentrant acquire
setState(c + acquires);
return true;
}
//如果读写锁状态为0,说明读锁和写锁都没有被获取,会走下面两个分支:
//如果要阻塞或者执行CAS操作更新读写锁的状态失败,则返回false
//如果不需要阻塞且CAS操作成功,则当前线程成功拿到锁,设置锁的owner为当前线程,返回true
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

释放写锁源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
* Note that tryRelease and tryAcquire can be called by
* Conditions. So it is possible that their arguments contain
* both read and write holds that are all released during a
* condition wait and re-established in tryAcquire.
*/
protected final boolean tryRelease(int releases) {
//若锁的持有者不是当前线程,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//写锁的可重入计数减掉releases个
int nextc = getState() - releases;
//如果写锁重入计数为0了,则说明写锁被释放了
boolean free = exclusiveCount(nextc) == 0;
if (free)
//若写锁被释放,则将锁的持有者设置为null,进行GC
setExclusiveOwnerThread(null);
//更新写锁的重入计数
setState(nextc);
return free;
}

ReadLock(读锁)是共享锁(乐观锁)

通过计算 state>>>16 进行无符号补 0 ,右移 16 位,因此 state 的高位记录着写锁的重入计数.

读锁获取锁的过程比写锁稍微复杂些,首先判断写锁是否为 0 并且当前线程不占有独占锁,直接返回;否则,判断读线程是否需要被阻塞并且读锁数量是否小于最大值并且比较设置状态成功,若当前没有读锁,则设置第一个读线程 firstReader 和 firstReaderHoldCount ;若当前线程线程为第一个读线程,则增加 firstReaderHoldCount ;否则,将设置当前线程对应的 HoldCounter 对象的值,更新成功后会在 firstReaderHoldCount 中 readHolds ( ThreadLocal 类型的)的本线程副本中记录当前线程重入数,这是为了实现 JDK1.6 中加入的 getReadHoldCount ()方法的,这个方法能获取当前线程重入共享锁的次数( state 中记录的是多个线程的总重入次数),加入了这个方法让代码复杂了不少,但是其原理还是很简单的:如果当前只有一个线程的话,还不需要动用 ThreadLocal ,直接往 firstReaderHoldCount 这个成员变量里存重入数,当有第二个线程来的时候,就要动用 ThreadLocal 变量 readHolds 了,每个线程拥有自己的副本,用来保存自己的重入数。

获取读锁源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/**
* 获取读锁
* Acquires the read lock.
* 如果写锁未被其他线程持有,执行CAS操作更新status值,获取读锁后立即返回
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately.
*
* 如果写锁被其他线程持有,那么停止该线程的CPU调度并进入休眠状态,直到该读锁被释放
* <p>If the write lock is held by another thread then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until the read lock has been acquired.
*/
public void lock() {
sync.acquireShared(1);
}
/**
* 该方法为以共享模式获取读锁,忽略中断
* 如果调用一次该“tryAcquireShared”方法更新status成功,则直接返回,代表抢锁成功
* 否则,将会进入同步队列等待,不断执行“tryAcquireShared”方法尝试CAS更新status状态,直到成功抢到锁
* 其中“tryAcquireShared”方法在NonfairSync(公平锁)中和FairSync(非公平锁)中都有各自的实现
* (看这注释是不是和写锁很对称)
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1、如果已经有其他线程获取到了写锁,根据“读写互斥”原则,抢锁失败,返回-1
* 1.If write lock held by another thread, fail.
* 2、如果该线程本身持有写锁,那么看一下是否要readerShouldBlock,如果不需要阻塞,
* 则执行CAS操作更新state和重入计数。
* 这里要注意的是,上面的步骤不检查是否可重入(因为读锁属于共享锁,天生支持可重入)
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3、如果因为CAS更新status失败或者重入计数超过最大值导致步骤2执行失败
* 那就进入到fullTryAcquireShared方法进行死循环,直到抢锁成功
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/

//当前尝试获取读锁的线程
Thread current = Thread.currentThread();
//获取该读写锁状态
int c = getState();
//如果有线程获取到了写锁 ,且获取写锁的不是当前线程则返回失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//获取读锁的重入计数
int r = sharedCount(c);
//如果读线程不应该被阻塞,且重入计数小于最大值,且CAS执行读锁重入计数+1成功,则执行线程重入的计数加1操作,返回成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//如果还未有线程获取到读锁,则将firstReader设置为当前线程,firstReaderHoldCount设置为1
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//如果firstReader是当前线程,则将firstReader的重入计数变量firstReaderHoldCount加1
firstReaderHoldCount++;
} else {
//否则说明有至少两个线程共享读锁,获取共享锁重入计数器HoldCounter
//从HoldCounter中拿到当前线程的线程变量cachedHoldCounter,将此线程的重入计数count加1
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//如果上面的if条件有一个都不满足,则进入到这个方法里进行死循环重新获取
return fullTryAcquireShared(current);
}
/**
* 用于处理CAS操作state失败和tryAcquireShared中未执行获取可重入锁动作的full方法(补偿方法?)
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* 此代码与tryAcquireShared中的代码有部分相似的地方,
* 但总体上更简单,因为不会使tryAcquireShared与重试和延迟读取保持计数之间的复杂判断
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
//死循环
for (;;) {
//获取读写锁状态
int c = getState();
//如果有线程获取到了写锁
if (exclusiveCount(c) != 0) {
//如果获取写锁的线程不是当前线程,返回失败
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {//如果没有线程获取到写锁,且读线程要阻塞
// Make sure we're not acquiring read lock reentrantly
//如果当前线程为第一个获取到读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else { //如果当前线程不是第一个获取到读锁的线程(也就是说至少有有一个线程获取到了读锁)
//
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
/**
*下面是既没有线程获取写锁,当前线程又不需要阻塞的情况
*/
//重入次数等于最大重入次数,抛异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//如果执行CAS操作成功将读写锁的重入计数加1,则对当前持有这个共享读锁的线程的重入计数加1,然后返回成功
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

释放读锁源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//尝试释放一次共享锁计数
doReleaseShared();//真正释放锁
return true;
}
return false;
}
/**
*此方法表示读锁线程释放锁。
*首先判断当前线程是否为第一个读线程firstReader,
*若是,则判断第一个读线程占有的资源数firstReaderHoldCount是否为1,
若是,则设置第一个读线程firstReader为空,否则,将第一个读线程占有的资源数firstReaderHoldCount减1;
若当前线程不是第一个读线程,
那么首先会获取缓存计数器(上一个读锁线程对应的计数器 ),
若计数器为空或者tid不等于当前线程的tid值,则获取当前线程的计数器,
如果计数器的计数count小于等于1,则移除当前线程对应的计数器,
如果计数器的计数count小于等于0,则抛出异常,之后再减少计数即可。
无论何种情况,都会进入死循环,该循环可以确保成功设置状态state
*/
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
if (firstReader == current) { // 当前线程为第一个读线程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) // 读线程占用的资源数为1
firstReader = null;
else // 减少占用的资源
firstReaderHoldCount--;
} else { // 当前线程不为第一个读线程
// 获取缓存的计数器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
// 获取当前线程对应的计数器
rh = readHolds.get();
// 获取计数
int count = rh.count;
if (count <= 1) { // 计数小于等于1
// 移除
readHolds.remove();
if (count <= 0) // 计数小于等于0,抛出异常
throw unmatchedUnlockException();
}
// 减少计数
--rh.count;
}
for (;;) { // 死循环
// 获取状态
int c = getState();
// 获取状态
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) // 比较并进行设置
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
/**真正释放锁
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

通过分析可以看出:

在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。

在线程持有写锁的况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。

LongAdder

在高并发的情况下,我们对一个 Integer 类型的整数直接进行 i++ 的时候,无法保证操作的原子性,会出现线程安全的问题。为此我们会用 juc 下的 AtomicInteger ,它是一个提供原子操作的 Interger 类,内部也是通过 CAS 实现线程安全的。但当大量线程同时去访问时,就会因为大量线程执行 CAS 操作失败而进行空旋转,导致 CPU 资源消耗过多,而且执行效率也不高。 Doug Lea 大神应该也不满意,于是在 JDK1.8 中对 CAS 进行了优化,提供了 LongAdder ,它是基于了 CAS 分段锁的思想实现的。

线程去读写一个 LongAdder 类型的变量时,流程如下:

image-20200213204318608

LongAdder 也是基于 Unsafe 提供的 CAS 操作 +valitale 去实现的。在 LongAdder 的父类 Striped64 中维护着一个 base 变量和一个 cell 数组,当多个线程操作一个变量的时候,先会在这个 base 变量上进行 cas 操作,当它发现线程增多的时候,就会使用 cell 数组。比如当 base 将要更新的时候发现线程增多(也就是调用 casBase 方法更新 base 值失败),那么它会自动使用 cell 数组,每一个线程对应于一个 cell ,在每一个线程中对该 cell 进行 cas 操作,这样就可以将单一 value 的更新压力分担到多个 value 中去,降低单个 value 的 “热度”,同时也减少了大量线程的空转,提高并发效率,分散并发压力。这种分段锁需要额外维护一个内存空间 cells ,不过在高并发场景下,这点成本几乎可以忽略。分段锁是一种优秀的优化思想, juc 中提供的的 ConcurrentHashMap 也是基于分段锁保证读写操作的线程安全。

结语

本文Java中常用的锁以及常见的锁的概念进行了基本介绍,并从源码以及实际应用的角度进行了对比分析。限于篇幅以及个人水平,没有在本篇文章中对所有内容进行深层次的讲解。

其实Java本身已经对锁本身进行了良好的封装,降低了研发同学在平时工作中的使用难度。但是研发同学也需要熟悉锁的底层原理,不同场景下选择最适合的锁。而且源码中的思路都是非常好的思路,也是值得大家去学习和借鉴的。

参考资料

  1. 《Java并发编程艺术》
  2. Java中的锁
  3. Java CAS 原理剖析
  4. Java并发——关键字synchronized解析
  5. Java synchronized原理总结
  6. 聊聊并发(二)——Java SE1.6中的Synchronized
  7. 深入理解读写锁—ReadWriteLock源码分析
  8. 【JUC】JDK1.8源码分析之ReentrantReadWriteLock
  9. Java多线程(十)之ReentrantReadWriteLock深入分析
  10. Java–读写锁的实现原理

转载自美团技术团队,原文地址,在原文基础上有修改.