一、CAS概述CAS (compareAndSwap),是一种无锁原子算法,也就是映射到操作系统就是一条CPU指令,是原子操作。其实现方式是基于硬件平台的汇编指令,在intel的CPU中,使用的是cmpxchg指令,就是说CAS是靠硬件实现的,从而在硬件层面提高效率。
当多个线程同时使用CAS 操做一个变量时,最多只有一个会胜出,并成功更新,其他均会失败。失败的线程不会挂起,仅是被告知失败,而且容许再次尝试(自旋),固然也容许实现的线程放弃操做。基于这样的原理,CAS 操做即便没有锁,也能够避免其余线程对当前线程的干扰。
二、底层实现其实,CAS操作原本就是两个原子操作的,但是归功于硬件指令集的发展,让硬件保证这个从语义上看起来需要屡次操作的CAS行为只经过一条处理器指令就能完成(原子指令的定义)。常见的原子指令有:
测试并设置(Tetst-and-Set)
获取并增长(Fetch-and-Increment)
比较并交换(Compare-and-Swap)
加载连接/条件存储(Load-Linked/Store-Conditional)
CPU实现原子指令的方式总线锁定
总线锁定就是处理器使用了总线锁,总线锁就是使用处理器提供的一个 LOCK# 信号,当处理器操作共享变量,在BUS总线上发出一个LOCK信号,其它的处理器的请求将会被阻塞,该处理器能够独占共享内存。总线锁定在阻塞其它处理器获取该共享变量的操作请求时,也可能会导致大量阻塞,从而增加系统的性能开销。
缓存锁定
缓存锁定是指变量缓存在处理器的缓存行当中,LOCK期间操作锁定,当该处理器对缓存中的共享变量进行了操作,其他处理器会有个嗅探机制,阻止同时修改两个以上处理器缓存的内存区域数据,当其余处理器回写已被锁定的缓存行的数据时,会使缓存行无效。待其他线程读取时会重新从主内存中读取最新的数据,基于 MESI 缓存一致性协议来实现的。
https://www.jianshu.com/p/563436e974a0
两种情况下处理器不能使用缓存锁定:
1.当操作的数据不能被缓存在处理器内部或者操作的数据跨越多个缓存行时,处理器会调用总线锁定
2.一些处理器不支持
CAS是由Unsafe类实现的,Java没法直接访问底层操做系统,而是经过本地(native)方法来访问,因此启用了一个Unsafe类进行硬件级别的原子操作。
以AtomicInteger 类调用incrementAndGet()方法实现原子性的自增为例:
在Unsafe类的getAndAddInt方法中主要是看compareAndSwapInt方法:
可见Unsafe类中的compareAndSwapInt是一个native本地方法
具体实现的虚拟机(HotSpot)中去查看C++源码:
查看Atomic::cmpxchg函数的具体实现,这个本地方法的最终实现在openjdk的如下位置:openjdk-7-fcs-src-b147-27jun2011\openjdk\hotspot\src\oscpu\windowsx86\vm\ atomicwindowsx86.inline.hpp(对应于windows操作系统,X86处理器)。下面是对应于intel x86处理器的源代码的片段,asm是C++中的一个关键字,用于在C++源码中内嵌汇编语言
// Adding a lock prefix to an instruction on MP machine// VC++ doesn't like the lock prefix to be on a single line// so we can't insert a label after the lock prefix.// By emitting a lock prefix, we can define a label after it.#define LOCK_IF_MP(mp) __asm cmp mp, 0 \ __asm je L0 \ __asm _emit 0xF0 \ __asm L0:inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // alternative for InterlockedCompareExchange int mp = os::is_MP(); __asm { mov edx, dest mov ecx, exchange_value mov eax, compare_value LOCK_IF_MP(mp)//根据cpu类型加锁 cmpxchg dword ptr [edx], ecx//原子指令 }}
可见最后这个cmpxchg是单条操作指令,这条汇编语言能够直接操做内存进行数据交换,实现CAS最终目的。
可以看到这里有一个LOCK_IF_MP,作用是若是多处理器,在指令前加上LOCK前缀,由于在单处理器中,是不会存在缓存不一致的问题的,全部线程都在一个CPU上跑,使用同一个缓存区,也就不存在缓存与主内存不一致的问题,不会形成可见性问题。
(缓存在CPU上,主内存不在CPU上,CPU是经过缓存去读取主内存的,每一个CPU对应一个缓存,不一样缓存对应不一样CPU,这里要结合前面的JMM模型和硬件架构理一理)
然而在多核处理器中,须要遵循缓存一致性协议通知其余处理器更新本身的缓存。
Lock在这里的做用:
在cmpxchg执行期间,锁住内存地址[edx],其余处理器不能访问该内存,保证原子性。
(这个就是保证CAS原子性的关键所在)
写内存屏障,保证每一个线程的本地空间与主存一致。
禁止cmpxchg与先后任何指令重排序,防止指令重排序。
CAS缺点
CAS虽然高效地解决了原子操作,可是仍是存在一些缺陷的,主要表如今三个方面:
1.自旋时间太长
若是CAS一直不成功呢?这种状况绝对有可能发生,若是自旋CAS长时间地不成功,则会给CPU带来很是大的开销。在JUC中有些地方就限制了CAS自旋的次数,例如BlockingQueue的SynchronousQueue。
2.只能保证一个共享变量原子操做
看了CAS的实现就知道这只能针对一个共享变量,若是是多个共享变量就只能使用锁了,固然若是你有办法把多个变量整成一个变量,利用CAS也不错。例如读写锁中state的高低位。(https://www.cnblogs.com/wait-pigblog/p/9350569.html)
3.ABA问题
CAS须要检查操做值有没有发生改变,若是没有发生改变则更新。可是存在这样一种状况:若是一个值原来是A,变成了B,而后又变成了A,那么在CAS检查的时候会发现没有改变,可是实质上它已经发生了改变,只是又回到了原来的值而已,这就是所谓的ABA问题。对于ABA问题其解决方案是加上版本号,即在每一个变量都加上一个版本号,每次改变时加1,即A —> B —> A,变成1A —> 2B —> 3A,采用AtomicStampedRdference类能够实现这个方案。
1.入门实例基于无锁解决取款并发问题
public class Test5 { public static void main(String[] args) { Account account = new AccountCas(10000); Account.demo(account); }}class AccountCas implements Account{ private AtomicInteger balance; public AccountCas(int balance) { this.balance = new AtomicInteger(balance); } @Override public Integer getBalance() { return balance.get(); } @Override public void withdraw(Integer amount) { while (true) { //1.获取余额最新值 int prev = balance.get(); //2.减去取款金额 int next = prev - amount; //3.真正修改余额 if (balance.compareAndSet(prev, next)) { break; } } }}interface Account { // 获取余额 Integer getBalance(); // 取款 void withdraw(Integer amount); /** * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作 * 如果初始余额为 10000 那么正确的结果应当是 0 */ static void demo(Account account) { List
小结 :其实这里并不是无锁,而是使用了乐观锁。
线程的最新值与线程工作内存中的值进行比较,如果相等为true则修改为最新值,反之不修改
CAS底层实现是由CPU硬件实现的:是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
CAS必须借助volatile才能读取到共享变量的最新值来实现比较并交换操作的效果
思考 :无锁(乐观锁)的效率为何要更高些?
1.即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。
2.需要注意的是,在无所情况下,需要额外多余CPU的支持,如果CPU不够会由于分不到时间片仍然进入到就绪状态,依然会导致上下文切换。
小结 :结合CAS和volatile可以实现无锁并发,适用于线程数少,多核CPU的场景a.CAS是基于乐观锁的思想,不怕其他的线程修改共享变量,在需要使用时再去考虑是否发生变化满足一致性b.Synchronized是基于悲观锁的思想,以最悲观的打算防止其他线程修改共享变量,等到自己释放锁其他线程才能有机会修改共享变量。c.CAS体现的是无锁并发(不需要加锁来保证线程安全),无阻塞并发(保持线程不断运行,不发生上下文切换);总之线程不会因为加锁使得进入阻塞状态,但如果竞争激烈时可以频繁重试,此时反而会影响效率。
2.原子类2.1原子整数JUC并发包下的工具类:
AtomicBooleanAtomicIntegerAtomicLong
对常用的类型进行封装,使得对其操作满足原子性。
public class Test6 { public static void main(String[] args) { AtomicInteger i = new AtomicInteger(0); //保证了操作的原子性,不会被打断 System.out.println(i.incrementAndGet());// ++i System.out.println(i.getAndIncrement());//i++ System.out.println(i.getAndAdd(3));//先获取值2再加3,因此这里输出为2 i.updateAndGet(value -> value * 12);//取得当前的i值按照括号内的规则进行操作 System.out.println(i.get()); }}
//updateAndGet的底层实现也是使用CASpublic final int updateAndGet(IntUnaryOperator updateFunction) { int prev, next; do { prev = get(); next = updateFunction.applyAsInt(prev); } while (!compareAndSet(prev, next)); return next;}
2.2原子引用由于有时需要保证一些非基本类型的原子操作,因此需要原子引用类型AtomicReferenceAtomicMarkableReferenceAtomicStampedReference
public class Test7 { public static void main(String[] args) { DecimalAccount.demo(new DecimalAccountCas(new BigDecimal("10000"))); }}class DecimalAccountCas implements DecimalAccount{ private AtomicReference
可见其基本用法其实和原子整数差不多,都是以CAS为基础的。
2.3ABA问题将一个值改变后又改回去,CAS发现不了这个过程,如果是基本类型无影响,但是对于引用类型产生影响,解决方法就是加上版本信息
示例:
public class Test8 { static AtomicReference
加上版本号(AtomicStampedReference)
public class Test9 { static AtomicStampedReference
AtomicMarkableReference
有时候并不关心引用变量更改了几次,只是单纯的关心是否更改过,此时就应该使用 AtomicMarkableReference
public class Test10 { public static void main(String[] args) throws InterruptedException { GarbageBag bag = new GarbageBag("已满"); AtomicMarkableReference
注意清洁工线程没有更换垃圾袋,而是将其清空并修改了垃圾袋状态,因此最后主线程无法成功更换,而且最终垃圾袋也没有变过。
2.4原子数组AtomicIntegerArray AtomicLongArray AtomicReferenceArray
public class Test11 { public static void main(String[] args) { demo( () -> new int[10], (array) -> array.length, (array, index) -> array[index] ++, array -> System.out.println(Arrays.toString(array)) ); demo( () -> new AtomicIntegerArray(10), (array) -> array.length(), (array, index) -> array.getAndIncrement(index), array -> System.out.println(array) ); } /** 参数1,提供数组、可以是线程不安全数组或线程安全数组 参数2,获取数组长度的方法 参数3,自增方法,回传 array, index 参数4,打印数组的方法 */ //supplier提供者,无中生有 ()->结果 //function函数 一个参数一个结果 (参数)->结果 ; BiFunction (参数1, 参数2) -> 结果 //consumer消费者 一个参数没有结果 (参数)->void ; BiConsumer (参数1, 参数2) -> void private static
可见原子数组可以保证数组当中各元素在多个线程访问时依然保持线程安全性。
2.5字段更新器对某个对象的属性或者成员变量进行保护,满足线程安全性。
AtomicReferenceFieldUpdater // 域 字段AtomicIntegerFieldUpdaterAtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常
public class Test12 { public static void main(String[] args) { Student stu = new Student(); //三个参数为:对象的类;修改字段的类型;修改的字段的名称 AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name"); updater.compareAndSet(stu, null, "小明"); System.out.println(stu.toString()); }}class Student{ volatile String name; @Override public String toString() { return "Student{" + "name='" + name + '\'' + '}'; }}
这个API的使用需要注意参数,对于对象的属性在多线程情况下进行保护。
2.6原子累加器public class Test13 { static int cnt = 0; public static void main(String[] args) { for (int i = 0; i < 5; i++) { demo(() -> new LongAdder(), adder -> adder.increment()); } for (int i = 0; i < 5; i++) { demo(() -> new AtomicLong(), adder -> adder.getAndIncrement()); } for (int i = 0; i < 5; i++) { Object o = new Object(); long start = System.nanoTime(); List
修改切换频率
if (cnt % 100 == 0) Thread.yield();
经过实验可以发现使用累加器性能高很多,在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
深入理解LongAdder
LongAdder类的关键域:
// 累加单元数组, 懒惰初始化transient volatile Cell[] cells;// 基础值, 如果没有竞争, 则用 cas 累加这个域transient volatile long base;// 在 cells 创建或扩容时, 置为 1, 表示加锁transient volatile int cellsBusy;
Cell类的定义:
// 防止缓存行伪共享@sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值 final boolean cas(long prev, long next) { return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next); } // 省略不重要代码 }
缓存行伪共享
从缓存方面开始:
cpu到
需要的时钟周期(cycle)
寄存器
1
L1
3-4
L2
10-20
L3
40-45
内存
120-240
根据计算机组成原理的知识,CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。而缓存实际上是以缓存行为单位的,每个缓存行对应着一块内存,一般是64字节。
缓存的加入会造成数据副本产生,即同一份数据会被缓存在不同核心的缓存行当中。然而CPU要保证数据一致性,如果某个CPU核心更改了数据,其它CPU核心对应的整个缓存行必须失效。
LongAdder中采用的是Cell累加方式,Cell是数组形式,在内存当中是连续存储的,一个Cell为24字节(16字节对象头以及8字节的value),所以一般来说一个缓存行可以存下2个Cell对象。此时就会出现问题:
CPU核心0需要修改Cell[0],CPU核心1要修改Cell[1],无论哪一个核心修改成功都会导致另一方对应的缓存行失效。比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效
@sun.misc.Contended就解决了这个问题,其原理就是在使用此注解后的对象或者字段的前后各增加128字节大小的padding,从而使得将对象预读至缓存时占用的是不同的缓存行,从而不会造成对方的缓存行失效。
累加源码分析
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); }}
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base }}
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum;}