本文转自Java CAS 和 ABA 问题

Java CAS 和 ABA 问题

  • 独占锁:是一种悲观锁,synchronized 就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。
  • 乐观锁:每次不加锁,假设没有冲突去完成某项操作,如果因为冲突失败就重试,直到成功为止。

CAS 操作

乐观锁用到的机制就是 CAS,Compare and Swap。

CAS 有 3 个操作数,内存值 V,旧的预期值 A,要修改的新值 B。当且仅当预期值 A 和内存值 V 相同时,将内存值 V 修改为 B,否则什么都不做。

非阻塞算法 (nonblocking algorithms)

一个线程的失败或者挂起不应该影响其他线程的失败或挂起的算法。

现代的 CPU 提供了特殊的指令,可以自动更新共享数据,而且能够检测到其他线程的干扰,而 compareAndSet() 就用这些代替了锁定。

AtomicInteger 示例

拿 AtomicInteger 来研究在没有锁的情况下是如何做到数据正确性的。

1
private volatile int value;

在没有锁的机制下需要借助 volatile 原语,保证线程间的数据是可见的(共享的)。

这样才获取变量的值的时候才能直接读取。

1
2
3
public final int get() {
return value;
}

然后来看看 ++i 是怎么做到的。

1
2
3
4
5
6
7
8
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}

在这里采用了 CAS 操作,每次从内存中读取数据然后将此数据和 + 1 后的结果进行 CAS 操作,如果成功就返回结果,否则重试直到成功为止。

而 compareAndSet 利用 JNI 来完成 CPU 指令的操作。

1
2
3
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

整体的过程就是这样子的,利用 CPU 的 CAS 指令,同时借助 JNI 来完成 Java 的非阻塞算法。其它原子操作都是利用类似的特性完成的。

而整个 J.U.C 都是建立在 CAS 之上的,因此对于 synchronized 阻塞算法,J.U.C 在性能上有了很大的提升。参考资料的文章中介绍了如果利用 CAS 构建非阻塞计数器、队列等数据结构。

ABA 问题

CAS 看起来很爽,但是会导致 “ABA 问题”。

CAS 算法实现一个重要前提是需要取出内存中某时刻的数据,而在下一时刻比较并替换,那么在这个时间差内类会忽略数据的变化。

比如说一个线程 one 从内存位置 V 中取出 A,这时候另一个线程 two 也从内存中取出 A,并且 two 进行了一些操作变成了 B,然后 two 又将 V 位置的数据变成 A,这时候线程 one 进行 CAS 操作发现内存中仍然是 A,然后 one 操作成功。
尽管线程 one 的 CAS 操作成功,但是不代表这个过程就是没有问题的。如果链表的头在变化了两次后恢复了原值,但是不代表链表就没有变化。因此前面提到的原子操作 AtomicStampedReference/AtomicMarkableReference 就很有用了。这允许一对变化的元素进行原子操作。

在运用 CAS 做 Lock-Free 操作中有一个经典的 ABA 问题:

线程 1 准备用 CAS 将变量的值由 A 替换为 B,在此之前,线程 2 将变量的值由 A 替换为 C,又由 C 替换为 A,然后线程 1 执行 CAS 时发现变量的值仍然为 A,所以 CAS 成功。但实际上这时的现场已经和最初不同了,尽管 CAS 成功,但可能存在潜藏的问题,例如下面的例子:
20201230142455
现有一个用单向链表实现的堆栈,栈顶为 A,这时线程 T1 已经知道 A.next 为 B,然后希望用 CAS 将栈顶替换为 B:

1
head.compareAndSet(A,B);

在 T1 执行上面这条指令之前,线程 T2 介入,将 A、B 出栈,再 pushD、C、A,此时堆栈结构如下图,而对象 B 此时处于游离状态:
20201230142512
此时轮到线程 T1 执行 CAS 操作,检测发现栈顶仍为 A,所以 CAS 成功,栈顶变为 B,但实际上 B.next 为 null,所以此时的情况变为:
20201230142545
其中堆栈中只有 B 一个元素,C 和 D 组成的链表不再存在于堆栈中,平白无故就把 C、D 丢掉了。

以上就是由于 ABA 问题带来的隐患,各种乐观锁的实现中通常都会用版本戳 version 来对记录或对象标记,避免并发操作带来的问题,在 Java 中,AtomicStampedReference 也实现了这个作用,它通过包装 [E,Integer] 的元组来对对象标记版本戳 stamp,从而避免 ABA 问题,例如下面的代码分别用 AtomicInteger 和 AtomicStampedReference 来对初始值为 100 的原子整型变量进行更新,AtomicInteger 会成功执行 CAS 操作,而加上版本戳的 AtomicStampedReference 对于 ABA 问题会执行 CAS 失败:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package concur.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

public class ABA {

private static AtomicInteger atomicInt = new AtomicInteger(100);
private static AtomicStampedReference<Integer> atomicStampedRef =
new AtomicStampedReference<Integer>(100, 0);

public static void main(String[] args) throws InterruptedException {
Thread intT1 = new Thread(new Runnable() {
@Override
public void run() {
atomicInt.compareAndSet(100, 101);
atomicInt.compareAndSet(101, 100);
}
});

Thread intT2 = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean c3 = atomicInt.compareAndSet(100, 101);
System.out.println(c3); //true
}
});

intT1.start();
intT2.start();
intT1.join();
intT2.join();

Thread refT1 = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedRef.compareAndSet(100, 101,
atomicStampedRef.getStamp(), atomicStampedRef.getStamp()+1);
atomicStampedRef.compareAndSet(101, 100,
atomicStampedRef.getStamp(), atomicStampedRef.getStamp()+1);
}
});

Thread refT2 = new Thread(new Runnable() {
@Override
public void run() {
int stamp = atomicStampedRef.getStamp();
System.out.println("before sleep : stamp = " + stamp); // stamp = 0
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep : stamp = " + atomicStampedRef.getStamp());//stamp = 2
boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp+1);
System.out.println(c3); //false
}
});

refT1.start();
refT2.start();
}

}

Reference:

非阻塞算法简介 >流行的原子 >深入浅出 Java Concurrency (5): 原子操作 part 4 >用 AtomicStampedReference 解决 ABA 问题