原子操作类之18罗汉增强

1、原子类是什么

  1. AtomicBoolean
  2. AtomicInteger
  3. AtomicIntegerArray
  4. AtomicIntegerFieldUpdater
  5. AtomicLong
  6. AtomicLongArray
  7. AtomicLongFieldUpdater
  8. AtomicMarkableReference
  9. AtomicReference
  10. AtomicReferenceArray
  11. AtomicReferenceFieldUpdater
  12. AtomicStampedReference
  13. DoubleAccumulator
  14. DoubleAdder
  15. LongAccumulator
  16. LongAdder

2、再分类

2.1、基本类型原子类

  • AtomicInteger
  • AtomicBoolean
  • AtomicLong

2.1.1、常用API简介

1
2
3
4
5
6
public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)

2.1.2、case

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class Number {
@Getter
private final AtomicInteger atomicInteger = new AtomicInteger();
public void addPlusPlus() {
atomicInteger.incrementAndGet();
}
}

/**
* @see CountDownLatch
* @see AtomicInteger
*/
public class AtomicIntegerDemo {
public static void main(String[] args) throws InterruptedException {
Number number = new Number();
CountDownLatch countDownLatch = new CountDownLatch(100);

for (int i = 1; i <= 100; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 5000; j++) {
number.addPlusPlus();
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
// 等待所有线程计算完成
countDownLatch.await();
System.out.println(number.getAtomicInteger().get());
}
}

2.2、数组类型原子类

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class AtomicIntegerArrayDemo {
public static void main(String[] args) {
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
//AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
//AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});

for (int i = 0; i < atomicIntegerArray.length(); i++) {
System.out.println(atomicIntegerArray.get(i));
}

System.out.println();
int tmpInt = 0;

tmpInt = atomicIntegerArray.getAndSet(0, 1122);
System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0));
atomicIntegerArray.getAndIncrement(1);
atomicIntegerArray.getAndIncrement(1);
tmpInt = atomicIntegerArray.getAndIncrement(1);
System.out.println(tmpInt + "\t" + atomicIntegerArray.get(1));
}
}

2.3、引用类型原子类

2.3.1、AtomicReference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @Author XiaoYu
* @Description 自定义原子引用类型
* @Datetime 2022-07-04 14:25:57
*/
public class AtomicReferenceDemo {

public static void main(String[] args) {
User z3 = new User("z3", 24);
User li4 = new User("li4", 26);
AtomicReference<User> atomicReferenceUser = new AtomicReference<>();
atomicReferenceUser.set(z3);
System.out.println(atomicReferenceUser.compareAndSet(z3, li4) + "\t" + atomicReferenceUser.get().toString());
System.out.println(atomicReferenceUser.compareAndSet(z3, li4) + "\t" + atomicReferenceUser.get().toString());
}
}

@Getter
@ToString
@AllArgsConstructor
class User {
String userName;
int age;
}

自旋锁SpinLockDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 题目:实现一个自旋锁
* 自旋锁好处:循环比较获取没有类似wait的阻塞
* 通过CAS操作完成自旋锁,A线程先进来调用Lock方法自己持有锁5秒钟,B随后进来后发现
* 当前有线程持有锁,不是null,所以只能通过自旋等待,直到A释放锁后B随后抢到。
*/
public class SpinLockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void lock() {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in");
while (!atomicReference.compareAndSet(null, Thread.currentThread())) {
}
System.out.println(Thread.currentThread().getName() + "\t" + "---持有锁成功");
}
public void unLock() {
atomicReference.compareAndSet(Thread.currentThread(), null);
System.out.println(Thread.currentThread().getName() + "\t" + "---释放锁成功");
}

public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();

new Thread(() -> {
spinLockDemo.lock();
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
spinLockDemo.unLock();
}, "t1").start();

new Thread(() -> {
spinLockDemo.lock();
spinLockDemo.unLock();
}, "t2").start();
}
}

2.3.2、AtomicStampedReference

  • 携带版本号的引用类型原子类,可以解决ABA问题
  • 解决修改过几次
  • 状态戳原子引用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* @Author XiaoYu
* @Description AtomicStampedReference
* @Datetime 2022-07-04 14:47:10
*/
public class ABADemo {
static AtomicInteger atomicInteger = new AtomicInteger(100);
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);

public static void main(String[] args) {
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t" + "---默认版本号: " + stamp);
//让后面的t4获得和t3一样的版本号,都是1,好比较
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
atomicStampedReference.compareAndSet(100, 101, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t" + "---1次版本号: " + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t" + "---2次版本号: " + atomicStampedReference.getStamp());
}, "t3").start();


new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t" + "---默认版本号: " + stamp);
//上前面的t3完成ABA问题
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
boolean result = atomicStampedReference.compareAndSet(100, 20210308, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t" + "---操作成功否:" + result + "\t" + atomicStampedReference.getStamp() + "\t" + atomicStampedReference.getReference());
}, "t4").start();
}

public static void abaProblem() {
new Thread(() -> {
atomicInteger.compareAndSet(100, 101);
atomicInteger.compareAndSet(101, 100);
}, "t1").start();

//暂停毫秒
try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }

new Thread(() -> {
boolean b = atomicInteger.compareAndSet(100, 20210308);
System.out.println(Thread.currentThread().getName() + "\t" + "修改成功否:" + b + "\t" + atomicInteger.get());
}, "t2").start();
}
}

2.3.3、AtomicMarkableReference

  • 原子更新带有标记位的引用类型对象
  • 解决是否修改过 它的定义就是将状态戳简化为true|false – 类似一次性筷子

状态戳(true/false)原子引用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* @see AtomicMarkableReference
*/
public class AtomicMarkableReferenceDemo {

static AtomicMarkableReference<Integer> reference = new AtomicMarkableReference<>(100, false);

public static void main(String[] args) {
new Thread(() -> {
boolean marked = reference.isMarked();
System.out.println(Thread.currentThread().getName() + "\t" + "---默认修改标识:" + marked);
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
boolean b = reference.compareAndSet(100, 101, marked, !marked);
System.out.println(Thread.currentThread().getName() + "\t" + "---操作是否成功:" + b);
System.out.println(Thread.currentThread().getName() + "\t" + reference.getReference());
System.out.println(Thread.currentThread().getName() + "\t" + reference.isMarked());

}, "t1").start();

new Thread(() -> {
boolean marked = reference.isMarked();
System.out.println(Thread.currentThread().getName() + "\t" + "---默认修改标识:" + marked);
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
boolean b = reference.compareAndSet(100, 20210308, marked, !marked);
System.out.println(Thread.currentThread().getName() + "\t" + "---操作是否成功:" + b);
System.out.println(Thread.currentThread().getName() + "\t" + reference.getReference());
System.out.println(Thread.currentThread().getName() + "\t" + reference.isMarked());
}, "t2").start();
}
}

2.4、对象的属性修改原子类

  • AtomicIntegerFieldUpdater
    • 原子更新对象中int类型字段的值
  • AtomicLongFieldUpdater
    • 原子更新对象中Long类型字段的值
  • AtomicReferenceFieldUpdater
    • 原子更新引用类型字段的值

2.4.1、使用目的

以一种线程安全的方式操作非线程安全对象内的某些字段

2.4.2、使用要求

更新的对象属性必须使用 public volatile 修饰符。

因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。

2.4.3、AtomicIntegerFieldUpdaterDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class BankAccount {
private final String bankName = "CCB";//银行
public volatile int money = 0;//钱数
static final AtomicIntegerFieldUpdater<BankAccount> updater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money");

//不加锁+性能高,局部微创
public void transferMoney(BankAccount bankAccount) {
updater.incrementAndGet(bankAccount);
}
}

/**
* @see AtomicIntegerFieldUpdater
* 以一种线程安全的方式操作非线程安全对象的某些字段。
* 需求:
* 1000个人同时向一个账号转账一元钱,那么累计应该增加1000元,
* 除了synchronized和CAS,还可以使用AtomicIntegerFieldUpdater来实现。
*/
public class AtomicIntegerFieldUpdaterDemo {

public static void main(String[] args)
{
BankAccount bankAccount = new BankAccount();
for (int i = 1; i <= 1000; i++) {
new Thread(() -> bankAccount.transferMoney(bankAccount),String.valueOf(i)).start();
}
//暂停毫秒
try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(bankAccount.money);
}
}

2.4.4、AtomicReferenceFieldUpdater

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Var {
public volatile Boolean isInit = Boolean.FALSE;
static final AtomicReferenceFieldUpdater<Var, Boolean> UPDATER = AtomicReferenceFieldUpdater.newUpdater(Var.class, Boolean.class, "isInit");
public void init(Var var) {
if (UPDATER.compareAndSet(var, Boolean.FALSE, Boolean.TRUE)) {
System.out.println(Thread.currentThread().getName() + "\t" + "---init.....");
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName() + "\t" + "---init.....over");
} else {
System.out.println(Thread.currentThread().getName() + "\t" + "------其它线程正在初始化");
}
}
}


/**
* 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能初始化一次
*/
public class AtomicReferenceFieldUpdaterDemo {
public static void main(String[] args) throws InterruptedException {
Var var = new Var();
for (int i = 1; i <= 5; i++) {
new Thread(() -> var.init(var), String.valueOf(i)).start();
}
}
}

2.4.5、原子操作增强类原理深度解析

  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder

  • 热点商品点赞计算器,点赞数加加统计,不要求实时精确
  • 一个很大的 List ,里面都是 int 类型,如何实现加加,说说思路

2.4.5.1、点赞计数器,看看性能

  • LongAdder只能用来计算加法,且从零开始计算
  • LongAccumulator提供了自定义的函数操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//long类型的聚合器,需要传入一个long类型的二元操作,可以用来计算各种聚合操作,包括加乘等

import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongBinaryOperator;

public class LongAccumulatorDemo {

LongAdder longAdder = new LongAdder();

public void add_LongAdder() {
longAdder.increment();
}

//LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y,0);
LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {
@Override
public long applyAsLong(long left, long right) {
return left - right;
}
}, 777);

public void add_LongAccumulator() {
longAccumulator.accumulate(1);
}

public static void main(String[] args) {
LongAccumulatorDemo demo = new LongAccumulatorDemo();

demo.add_LongAccumulator();
demo.add_LongAccumulator();
System.out.println(demo.longAccumulator.longValue());
}
}

2.4.5.2、LongAdderAPIDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class LongAdderAPIDemo {
public static void main(String[] args) {
LongAdder longAdder = new LongAdder();

longAdder.increment();
longAdder.increment();
longAdder.increment();

System.out.println(longAdder.longValue());

// 从2开始相乘
LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x * y, 2);

longAccumulator.accumulate(1); // 2
longAccumulator.accumulate(2); // 4
longAccumulator.accumulate(3); // 12

System.out.println(longAccumulator.longValue());
}
}

2.4.5.3、LongAdder高性能对比Code演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
class ClickNumberNet {
int number = 0;

public synchronized void clickBySync() {
number++;
}

AtomicLong atomicLong = new AtomicLong(0);

public void clickByAtomicLong() {
atomicLong.incrementAndGet();
}

LongAdder longAdder = new LongAdder();

public void clickByLongAdder() {
longAdder.increment();
}

LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0);

public void clickByLongAccumulator() {
longAccumulator.accumulate(1);
}
}

/**
* 50个线程,每个线程100W次,总点赞数出来
*/
public class LongAdderDemo {
public static void main(String[] args) throws InterruptedException {
ClickNumberNet clickNumberNet = new ClickNumberNet();

long startTime;
long endTime;
int count = 50;
CountDownLatch countDownLatch = new CountDownLatch(count);
CountDownLatch countDownLatch2 = new CountDownLatch(count);
CountDownLatch countDownLatch3 = new CountDownLatch(count);
CountDownLatch countDownLatch4 = new CountDownLatch(count);


startTime = System.currentTimeMillis();
for (int i = 1; i <= count; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * 10000; j++) {
clickNumberNet.clickBySync();
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickBySync result: " + clickNumberNet.number);

startTime = System.currentTimeMillis();
for (int i = 1; i <= count; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * 10000; j++) {
clickNumberNet.clickByAtomicLong();
}
} finally {
countDownLatch2.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch2.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickByAtomicLong result: " + clickNumberNet.atomicLong);

startTime = System.currentTimeMillis();
for (int i = 1; i <= count; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * 10000; j++) {
clickNumberNet.clickByLongAdder();
}
} finally {
countDownLatch3.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch3.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickByLongAdder result: " + clickNumberNet.longAdder.sum());

startTime = System.currentTimeMillis();
for (int i = 1; i <= count; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * 10000; j++) {
clickNumberNet.clickByLongAccumulator();
}
} finally {
countDownLatch4.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch4.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickByLongAccumulator result: " + clickNumberNet.longAccumulator.longValue());
}
}
1
2
3
4
----costTime: 1586 毫秒	 clickBySync result: 50000000
----costTime: 1051 毫秒 clickByAtomicLong result: 50000000
----costTime: 170 毫秒 clickByLongAdder result: 50000000
----costTime: 135 毫秒 clickByLongAccumulator result: 50000000

2.4.5.4、源码、原理分析

架构

LongAdder是Striped64的子类

1
2
3
public class LongAdder extends Striped64 implements Serializable

abstract class Striped64 extends Number

剩下两罗汉

1
2
Striped64
Number

原理(LongAdder为什么这么快)

Striped64有几个比较重要的成员函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** Number of CPUS, to place bound on table size        CPU数量,即cells数组的最大长度 */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
* Table of cells. When non-null, size is a power of 2.
cells数组,为2的幂,2,4,8,16.....,方便以后位运算
*/
transient volatile Cell[] cells;

/**基础value值,当并发较低时,只累加该值主要用于没有竞争的情况,通过CAS更新。
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;

/**创建或者扩容Cells数组时使用的自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁。
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy;

最重要的2个

Striped64中一些变量或者方法的定义
  • base:类似于AtomicLong中全局的value值。在没有竞争情况下数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上
  • collide:表示扩容意向,false一定不会扩容,true可能会扩容。
  • cellsBusy:初始化cells或者扩容cells需要获取锁,0:表示无锁状态1:表示其他线程已经持有了锁
  • casCellsBusy():通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true
  • NCPU:当前计算机CPU数量,CelI数组扩容时会使用到
  • getProbe():获取当前线程的hash值
  • advanceProbe():重置当前线程的hash值
Cell

java.util.concurrent.atomicStriped64的一个内部类

LongAdder为什么这么快

LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

*sum()*会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去, 从而降级更新热点

$$
value=Base+\sum_{i=n}^{n}{Cell[i]}
$$

内部有一个base变量,一个Cell[]数组。

base变量:非竞态条件下,直接累加到该变量上

Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中

源码解读深度分析

LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。

1
longAdder.increment()
add(1L)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void add(long x) {
// as 是Striped64中的cells数组属性
// b 是Striped64中的base属性
// v 是当前线程hash到的Cell中存储的值
// m 是cells的长度减一,hash时作为掩码使用
// a 是当前线程hash到的Cell
Cell[] as; long b, v; int m; Cell a;
// 首先第一个线程((as = cells) != null)一定是false,此时走casBase方法,以CAS的方式更新base的值,且只有当cas更新失败的时候,才会走到if中
// 条件①:cells不为空,说明出现过竞争,Cell[]已创建
// 条件②:cas操作base属性失败,说明其他现在先一步修改了base正在出现竞争
if ((as = cells) != null || !casBase(b = base, b + x)) {
// true无竞争 false表示竞争激烈,多个线程hash到同一个Cell,可能要扩容
boolean uncontended = true;
// 条件①:cells为空,说明正在出现竞争,是从上面的条件②过来的
// 条件②:as第一次初始化成功数组长度为2,条件不会成立
// 条件③:当前线程所在的Cell为空,说明当前线程还没有更新过Cell,应初始化一个Cell
// 条件④:更新当前线程所在的Cell失败,说明现在竞争很激烈,多个线程hash到同一个Cell,应扩容
if (as == null || (m = as.length - 1) < 0 ||
// getProbe()方法返回的是线程中的threadLocalRandomProbe字段(通过反射获取)
// 它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意去修改它)
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended); // 调用Striped64中的方法处理
}
}
longAccumulate

该方法首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支

  • 分支①:Cell[]数组已经初始化
  • 分支②:Cell[]数组未初始化(首次新建)
  • 分支③:Cell[]数组正在初始化中
未初始化过Cell[]数组,尝试占有锁并首次初始化cells数组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 条件①:将cellsBusy赋值为0
// 条件②:和下面的cells==as一样,通过docblue check确保只会new一个cell数组,否则有可能再次new一个cell数组,上一个线程对应数组中的值将会被篡改
// 条件③:casCellsBusy()方法通过CAS占有锁,将cellsBusy的值改为1
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 初始化完成,跳出循环标记
boolean init = false;
try { // Initialize table
if (cells == as) { // 与上面条件②一样,使用docblue check检查只会创建一个cell数组
Cell[] rs = new Cell[2]; // 新建一个大小为2的Cell数组
// 新建一个新的Cell元素,value是x值,默认值是1
// h & 1类似于我们之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1)。同hashmap一个意思。
rs[h & 1] = new Cell(x);
cells = rs; // 将新建的rs对象赋值给cells
init = true; // 将初始标记设置为true
}
} finally {
cellsBusy = 0; // 将cellsBusy赋值为0,释放CAS锁
}
if (init)
break; // 初始完成跳出循环
}
cells正在初始化,则尝试直接在基数base上进行累加操作
1
2
3
4
5
// 多个线程尝试CAS修改失败的线程会走到这个分支
// 该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
Cell数组不再为空且可能存在Cell数组扩容
  • branch①:当前数组下标元素未初始化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    // 当前线程的hash值运算后映射得到Cell单元为null,说明cell没有被使用
    if ((a = as[(n - 1) & h]) == null) {
    // Cell数组没有正在扩容
    if (cellsBusy == 0) { // Try to attach new Cell
    // 创建一个Cell单元
    Cell r = new Cell(x); // Optimistically create
    // 尝试加锁,成功后cellsBusy为1
    if (cellsBusy == 0 && casCellsBusy()) {
    // 创建标记,默认false
    boolean created = false;
    try { // Recheck under lock
    Cell[] rs; int m, j;
    if ((rs = cells) != null &&
    (m = rs.length) > 0 &&
    rs[j = (m - 1) & h] == null) {// 有锁的情况下在检查一遍之前的判断条件是否满足,将Cell单元添加到Cell[]数组上
    rs[j] = r;
    created = true; // 创建标记为true
    }
    } finally {
    cellsBusy = 0; // 最后解锁
    }
    if (created)
    break; // 创建成功跳出循环
    // 如果无法赋值成功则跳出循环
    continue; // Slot is now non-empty
    }
    }
    collide = false;
    }
  • branch②:当前线程竞争失败,重新计算hash,重新循环

    1
    2
    3
    4
    5
    // wasUncontended表示cells初始化之后,当前线程竞争修改失败
    // wasUncontended=false,这里只是重新设置了这个值为true
    // 紧接着执行advanceProbe(h)重置当前线程的hash,重新循环
    else if (!wasUncontended) // CAS already known to fail
    wasUncontended = true; // Continue after rehash
  • branch③:尝试对当前数组元素进行修改数据,修改成功跳出循环

    1
    2
    3
    4
    // 说明当前线程对应的数组中有了数据,也重置过hash值,这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环。
    else if (a.cas(v = a.value, ((fn == null) ? v + x :
    fn.applyAsLong(v, x))))
    break;
  • branch④:扩容前置检查

    1
    2
    3
    4
    // 如果n大于cpu的最大核数,或者cells与es不是同一个,不可扩容
    // 通过修改下面的h = advanceProbe(h)方法修改线程的probe在重新尝试
    else if (n >= NCPU || cells != as)
    collide = false; // At max size or stale
  • branch⑤:扩展意向collide值设置为true

    1
    2
    3
    4
    // 如果扩容意向collide为false则修改他为true,然后重新计算当前线程的hash值继续循环
    // 如果当前数组的长度已经大于了CPU的最大核数,就会再次设置扩容意向collide = false(上一步)
    else if (!collide)
    collide = true;
  • branch⑥:扩容操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // 当前cellsBusy为0并且通过cas将cellsBusy设置为1(抢到了锁)
    else if (cellsBusy == 0 && casCellsBusy()) {
    try {
    // 当前数组和最先赋值的as是同一个,代表没有被其他线程扩容过
    if (cells == as) { // Expand table unless stale
    // 按照位左移1位来操作,扩容大小为之前容量的两倍
    Cell[] rs = new Cell[n << 1];
    for (int i = 0; i < n; ++i)
    // 扩容后再将之前数组的元素拷贝到新数组中
    rs[i] = as[i];
    cells = rs;
    }
    } finally {
    // 设置cellsBusy为0释放锁
    cellsBusy = 0;
    }
    // 设置扩容状态,继续循环
    collide = false;
    continue; // Retry with expanded table
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// 参数x:需要增加的值,默认是1
// 参数fn:默认传递的是null
// 参数wasUncontended:竞争标识,如果是false代表有竞争。只有cells初始化完成之后,并且当前线程CAS竞争修改失败,才会是false
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h; // 存储线程的probe值
if ((h = getProbe()) == 0) { // 如果getProbe()方法返回0,说明随机数未生成
// 使用ThreadLocalRandom.current()为当前线程重新计算一个hash值,并强制初始化
ThreadLocalRandom.current(); // force initialization
// 重新获取probe值,hash值被重置了就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true
h = getProbe();
// 重新计算了当前线程的hash后认为此次不算是一次竞争,都还未初始化,肯定还不存在竞争激烈wasUncontended竞争状态为true
wasUncontended = true;
}
// 如果hash取模映射得到的Cell单元不是null,则为true,此值也可以看做是扩容意向
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// 将cells赋值给as,但是第一次进入cells==null,所以进入下一个分支
if ((as = cells) != null && (n = as.length) > 0) {
// 当前线程的hash值运算后映射得到Cell单元为null,说明cell没有被使用
if ((a = as[(n - 1) & h]) == null) {
// Cell数组没有正在扩容
if (cellsBusy == 0) { // Try to attach new Cell
// 创建一个Cell单元
Cell r = new Cell(x); // Optimistically create
// 尝试加锁,成功后cellsBusy为1
if (cellsBusy == 0 && casCellsBusy()) {
// 创建标记,默认false
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {// 有锁的情况下在检查一遍之前的判断条件是否满足,将Cell单元添加到Cell[]数组上
rs[j] = r;
created = true; // 创建标记为true
}
} finally {
cellsBusy = 0; // 最后解锁
}
if (created)
break; // 创建成功跳出循环
// 如果无法赋值成功则跳出循环
continue; // Slot is now non-empty
}
}
collide = false;
}
// wasUncontended表示cells初始化之后,当前线程竞争修改失败
// wasUncontended=false,这里只是重新设置了这个值为true
// 紧接着执行advanceProbe(h)重置当前线程的hash,重新循环
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 说明当前线程对应的数组中有了数据,也重置过hash值,这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环。
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果n大于cpu的最大核数,或者cells与es不是同一个,不可扩容
// 通过修改下面的h = advanceProbe(h)方法修改线程的probe在重新尝试
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 如果扩容意向collide为false则修改他为true,然后重新计算当前线程的hash值继续循环
// 如果当前数组的长度已经大于了CPU的最大核数,就会再次设置扩容意向collide = false(上一步)
else if (!collide)
collide = true;
// 当前cellsBusy为0并且通过cas将cellsBusy设置为1(抢到了锁)
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 当前数组和最先赋值的as是同一个,代表没有被其他线程扩容过
if (cells == as) { // Expand table unless stale
// 按照位左移1位来操作,扩容大小为之前容量的两倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
// 扩容后再将之前数组的元素拷贝到新数组中
rs[i] = as[i];
cells = rs;
}
} finally {
// 设置cellsBusy为0释放锁
cellsBusy = 0;
}
// 设置扩容状态,继续循环
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
// 条件①:将cellsBusy赋值为0
// 条件②:和下面的cells==as一样,通过docblue check确保只会new一个cell数组,否则有可能再次new一个cell数组,上一个线程对应数组中的值将会被篡改
// 条件③:casCellsBusy()方法通过CAS占有锁,将cellsBusy的值改为1
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 初始化完成,跳出循环标记
boolean init = false;
try { // Initialize table
if (cells == as) { // 与上面条件②一样,使用docblue check检查只会创建一个cell数组
Cell[] rs = new Cell[2]; // 新建一个大小为2的Cell数组
// 新建一个新的Cell元素,value是x值,默认值是1
// h & 1类似于我们之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1)。同hashmap一个意思
rs[h & 1] = new Cell(x);
cells = rs; // 将新建的rs对象赋值给cells
init = true; // 将初始标记设置为true
}
} finally {
cellsBusy = 0; // 将cellsBusy赋值为0,释放CAS锁
}
if (init)
break; // 初始完成跳出循环
}
// 多个线程尝试CAS修改失败的线程会走到这个分支
// 该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

LongAdder.longAccumulate方法

sum

sum()会将所有Cell数组中的value和base累加作为返回值。 核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

为啥在并发情况下sum的值不精确

sum执行时,并没有限制对base和cells的更新(一句要命的话)。所以LongAdder不是强一致性的,它是最终一致性的。

首先,最终返回的sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致。 其次,这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法在没有并发的情况下,可以获得正确的结果。

2.4.5.5、使用总结

  • AtomicLong
    • 线程安全,可允许一些性能损耗,要求高精度时可使用
    • 保证精度,性能代价
    • AtomicLong是多个线程针对单个热点值value进行原子操作
  • LongAdder
    • 当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用
    • 保证性能,精度代价
    • LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作

2.4.6、总结

2.4.6.1、AtomicLong

  • 原理
    • CAS+自旋
    • incrementAndGet
  • 场景
    • 低并发下的全局计算
    • AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。
  • 缺陷
    • 高并发后性能急剧下降
    • AtomicLong的自旋会成为瓶颈
    • N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。

2.4.6.2、LongAdder

  • 原理
    • CAS+Base+Cell数组分散
    • 空间换时间并分散了热点数据
  • 场景
    • 高并发下的全局计算
  • 缺陷
    • sum求和后还有计算线程修改结果的话,最后结果不够准确

原子操作类之18罗汉增强
https://xiaoyu72.com/articles/7b64a63b/
Author
XiaoYu
Posted on
June 17, 2022
Updated on
August 28, 2023
Licensed under