Java并发实现原理:JDK源码剖析

余春龙

前言

  • 多进程、多线程和多协程
  • 在Java中,并发就是多线程模式。

第1章 多线程基础

  • 如果强制杀死线程,则线程中所使用的资源,例如文件描述符、网络连接等不能正常关闭。
  • 答案是不会的。在C语言中,main(..)函数退出后,整个程序也就退出了,但在Java中并非如此。
  • 在t1.start()前面加一行代码t1.setDaemon(true)。当main(..)函数退出后,线程t1就会退出,整个进程也会退出。
  • 当所有的非守护线程退出后,整个JVM进程就会退出

1.2 InterruptedException()函数与interrupt()函数

  • 实际上,只有那些声明了会抛出InterruptedException 的函数才会抛出异常,也就是下面这些常用的函数:
  • 能够被中断的阻塞称为轻量级阻塞,对应的线程状态是WAITING或者TIMED_WAITING;而像synchronized 这种不能被中断的阻塞称为重量级阻塞,对应的状态是BLOCKED。
  • [插图]
  • 如果没有调用任何的阻塞函数,线程只会在RUNNING和READY之间切换,也就是系统的时间片调度。这两种状态的切换是操作系统完成的,开发者基本没有机会介入,除了可以调用yield()函数,放弃对CPU的占用。
  • t.interrupted()的精确含义是“唤醒轻量级阻塞”
  • 二者的区别在于,前者只是读取中断状态,不修改状态;后者不仅读取中断状态,还会重置中断标志位。

1.3 synchronized关键字

  • synchronized关键字其实是“给某个对象加了把锁”
  • 对于非静态成员函数,锁其实是加在对象a上面的;对于静态成员函数,锁是加在A.class上面的
  • 一个静态成员函数和一个非静态成员函数,都加了synchronized关键字,分别被两个线程调用,它们是否互斥?很显然,因为是两把不同的锁,所以不会互斥。
  • 锁就是要实现线程对资源的访问控制,保证同一时间只能有一个线程去访问某一个资源
  • (3)这个对象还得维护一个thread id list,记录其他所有阻塞的、等待拿这个锁的线程(也就是记录所有在外边等待的游客)。在当前线程释放锁之后(也就是把state从1改回0),从这个thread id list里面取一个线程唤醒。
  • synchronized关键字可以加在任何对象的成员上面。这意味着,这个对象既是共享资源,同时也具备“锁”的功能!
  • 在对象头里,有一块数据叫Mark Word。在64位机器上,Mark Word是8字节(64位)的,这64位中有2个重要字段:锁标志位和占用该锁的thread ID。
  • synchronized还会有偏向、自旋等优化策略

1.4 wait()与notify()

  • 内存队列本身要加锁,才能实现线程安全
  • 办法2:用一个阻塞队列,当取不到或者放不进去数据的时候,入队/出队函数本身就是阻塞的。这也就是BlockingQueue的实现
  • 在Java 里面,wait()和notify()是Object 的成员函数,是基础中的基础。
  • 要回答为什么wait()和notify()必须和synchronized一起使用?
  • 在调用wait()、notify()之前,要先通过synchronized关键字同步给对象,也就是给该对象加锁。
  • 在wait()的内部,会先释放锁obj1,然后进入阻塞状态,之后,它被另外一个线程用notify()唤醒,去重新拿锁!其次,wait()调用完成后,执行后面的业务逻辑代码,然后退出synchronized同步块,再次释放锁。
  • 原因就是wait()和notify()所作用的对象和synchronized所作用的对象是同一个,只能有一个对象,无法区分队列空和列队满两个条件。这正是Condition要解决的问题。

1.5 volatile关键字

  • 这是因为JVM的规范并没有要求64位的long或者double的写入是原子的。在32位的机器上,一个64位变量的写入可能被拆分成两个32位的写操作来执行。
  • 也就是“最终一致性”,不是“强一致性”。
  • 我们所说的“内存可见性”,指的是“写完之后立即对其他线程可见”,它的反面不是“不可见”,而是“稍后才能可见”。
  • DCL(Double Checking Locking)
  • volatile的三重功效:64位写入的原子性、内存可见性和禁止重排序

1.6 JMM与happen-before

  • 因为存在CPU缓存一致性协议,例如MESI,多个CPU之间的缓存不会出现不同步的问题,不会有“内存可见性”问题。
  • Store Buffer、Load Buffer和L1之间却是异步的。
  • 每个逻辑CPU都有自己的缓存,这些缓存和主内存之间不是完全同步的。
  • 1)编译器重排序。对于没有先后依赖关系的语句,编译器可以重新调整语句的执行顺序。(2)CPU指令重排序。在指令级别,让没有依赖关系的多条指令并行。(3)CPU内存重排序。CPU有自己的缓存,指令的执行顺序和写入主内存的顺序不完全一致。
  • 原因是线程1先执行X=1,后执行a=Y,但此时X=1还在自己的Store Buffer里面,没有及时写入主内存中。所以,线程2看到的X还是0。线程2的道理与此相
  • 指令没有重排序,是写入内存的操作被延迟了,也就是内存被重排序了,这就造成内存可见性问题。
  • 从编译器和CPU的角度来看,希望尽最大可能进行重排序,提升运行效率。
  • 无论什么语言,站在编译器和CPU的角度来说,不管怎么重排序,单线程程序的执行结果不能改变,这就是单线程程序的重排序规则
  • 编译器和CPU只能保证每个线程的as-if-serial语义。线程之间的数据依赖和相互影响,需要编译器和CPU的上层来确定。上层要告知编译器和CPU在多线程场景下什么时候可以重排序,什么时候不能重排序。
  • 编译器和CPU遵守了as-if-serial语义,保证每个线程内部都是“看似完全串行的”。
  • 这个模型就是一套规范,对上,是JVM和开发者之间的协定;对下,是JVM和编译器、CPU之间的协定。
  • JMM引入了happen-before,使用happen-before描述两个操作之间的内存可见性
  • happen-before只确保如果A在B之前执行,则A的执行结果必须对B可见
  • 通俗来讲,就是JMM对编译器和CPU 来说,volatile 变量不能重排序;非volatile 变量可以任意重排序。
  • happen-before还具有传递性,即若A happen-before B,B happen-before C,则A happen-before C。
  • 与volatile一样,synchronized同样具有happen-before语义
  • happen-before的传递性
  • Java中的volatile关键字不仅具有内存可见性,还会禁止volatile变量写入和非volatile变量写入的重排序

1.7 内存屏障

  • 为了禁止编译器重排序和CPU 重排序,在编译器和CPU 层面都有对应的指令,也就是内存屏障(Memory Barrier)。这也正是JMM和happen-before规则的底层实现原理。
  • 但从JDK 8开始,Java在Unsafe类中提供了三个内存屏障函数
  • (1)LoadLoad:禁止读和读的重排序。 (2)StoreStore:禁止写和写的重排序。 (3)LoadStore:禁止读和写的重排序。 (4)StoreLoad:禁止写和读的重排序。
  • loadFence=LoadLoad+LoadStorestoreFence=StoreStore+LoadStorefullFence=loadFence+storeFence+StoreLoad
  • 这里只探讨为了实现volatile关键字的语义的一种参考做法:(1)在volatile写操作的前面插入一个StoreStore屏障。保证volatile写操作不会和之前的写操作重排序。(2)在volatile写操作的后面插入一个StoreLoad屏障。保证volatile写操作不会和之后的读操作重排序。(3)在volatile读操作的后面插入一个LoadLoad屏障+LoadStore屏障。保证volatile读操作不会和之后的读操作、写操作重排序。

1.8 final关键字

  • 对于构造函数溢出,通俗来讲,就是一个对象的构造并不是“原子的”,当一个线程正在构造对象时,另外一个线程却可以读到未构造好的“一半对象”。
  • (1)对final域的写(构造函数内部),happen-before于后续对final域所在对象的读。(2)对final域所在对象的读,happen-before于后续对final域的读。
  • (1)单线程中的每个操作,happen-before于该线程中任意后续操作。(2)对volatile变量的写,happen-before于后续对这个变量的读。(3)对synchronized的解锁,happen-before于后续对这个锁的加锁。(4)对final变量的写,happen-before于final域对象的读,happen-before于后续对final变量的读。
  • 从底向上看volatile背后的原理

1.9 综合应用:无锁编程

  • 一写一读的无锁队列:内存屏障
  • 一写多读的无锁队列:volatile关键字
  • Disruptor的RingBuffer之所以可以做到完全无锁,也是因为“单线程写”,这是“前提的前提”
  • 多写多读的无锁队列:CAS
  • 同内存屏障一样,CAS(Compare And Set)也是CPU提供的一种原子指令。

2.1 AtomicInteger和AtomicLong

  • 对于乐观锁,作者认为数据发生并发冲突的概率比较小,所以读操作之前不上锁。等到写操作的时候,再判断数据在此期间是否被其他线程修改了。如果被其他线程修改了,就把数据重新读出来,重复该过程;如果没有被修改,就写回去。判断数据是否被修改,同时写回新值,这两个操作要合成一个原子操作,也就是CAS(Compare And Set)。
  • Unsafe类是整个Concurrent包的基础,里面所有函数都是native的。
  • 它是一个long型的整数,经常被称为xxxOffset,意思是某个成员变量在对应的类中的内存偏移量(该变量在内存中的位置),表示该成员变量本身。
  • 当一个线程拿不到锁的时候,有以下两种基本的等待策略。策略1:放弃CPU,进入阻塞状态,等待后续被唤醒,再重新被操作系统调度。策略2:不放弃CPU,空转,不断重试,也就是所谓的“自旋”。
  • 这两种策略并不是互斥的,可以结合使用。如果拿不到锁,先自旋几圈;如果自旋还拿不到锁,再阻塞,synchronized关键字就是这样的实现策略。

2.2 AtomicBoolean和AtomicReference

  • 在Unsafe类中,只提供了三种类型的CAS操作:int、long、Object(也就是引用类型)
  • 对于用int型来代替的,在入参的时候,将boolean类型转换成int类型;在返回值的时候,将int类型转换成boolean类型
  • 这依赖double类型提供的一对double类型和long类型互转的函数,这点在介绍DoubleAdder的时候会提到。

2.3 AtomicStampedReference和AtomicMarkableReference

  • 如果另外一个线程把变量的值从A改为B,再从B改回到A,那么尽管修改过两次,可是在当前线程做CAS操作的时候,却会因为值没变而认为数据没有被其他线程修改过,这就是所谓的ABA问题。
  • 要解决ABA 问题,不仅要比较“值”,还要比较“版本号”,而这正是AtomicStamped-Reference做的事情
  • 当expectedReference==对象当前的reference时,再进一步比较expectedStamp是否等于对象当前的版本号,以此判断数据是否被其他线程修改过。

2.4 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater

  • 但如果是一个已经有的类,在不能更改其源代码的情况下,要想实现对其成员变量的原子操作,就需要AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和AtomicReferenceFieldUpdater。
  • 要想使用AtomicIntegerFieldUpdater修改成员变量,成员变量必须是volatile的int类型(不能是Integer包装类)

2.5 AtomicIntegerArray、AtomicLongArray和Atomic-ReferenceArray

  • 这里并不是说对整个数组的操作是原子的,而是针对数组中一个元素的原子操作而言。
  • 其底层的CAS函数用的还是compareAndSwapInt,但是把数组下标i转化成对应的内存偏移量
  • base表示数组的首地址的位置,scale表示一个数组元素的大小,i的偏移量则等于:i*scale+base。

2.6 Striped64与LongAdder

  • 多个线程同时对一个变量进行CAS操作,在高并发的场景下仍不够快
  • 把一个Long型拆成一个base变量外加多个Cell,每个Cell包装了一个Long型变量。当多个线程并发累加的时候,如果并发度低,就直接加到base变量上;如果并发度高,冲突大,平摊到这些Cell上。在最后取值的时候,再把base和这些Cell求sum运算。
  • 在sum求和函数中,并没有对cells[]数组加锁。也就是说,一边有线程对其执行求和操作,一边还有线程修改数组里的值,也就是最终一致性,而不是强一致性。
  • 它适合高并发的统计场景,而不适合要对某个Long 型变量进行严格同步的场景。
  • 缓存与主内存进行数据交换的基本单位叫Cache Line(缓存行)
  • 虽然只修改了X变量,本应该只失效X变量的缓存,但Y、Z变量也随之失效。Y、Z变量的数据没有修改,本应该很好地被CPU1和CPU2共享,却没做到,这就是所谓的“伪共享问题”
  • Cell[]数组的大小始终是2的整数次方,在运行中会不断扩容,每次扩容都是增长2倍。

第3章 Lock与Condition

  • “可重入锁”是指当一个线程调用object.lock()拿到锁,进入互斥区后,再次调用object.lock(),仍然可以拿到该锁。很显然,通常的锁都要设计成可重入的,否则就会发生死锁。
  • lock()不能被中断,对应的lockInterruptibly()可以被中断。
  • Sync是一个抽象类,它有两个子类FairSync与NonfairSync,分别对应公平锁和非公平锁
  • 默认设置的是非公平锁,其实是为了提高效率,减少线程切换。
  • Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器(AQS)
  • 为了实现一把具有阻塞或唤醒功能的锁,需要几个核心要素: ① 需要一个state变量,标记该锁的状态。state变量至少有两个值:0、1。对state变量的操作,要确保线程安全,也就是会用到CAS。 ② 需要记录当前是哪个线程持有锁。 ③ 需要底层支持对一个线程进行阻塞或唤醒操作。 ④ 需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要用到CAS。
  • state取值不仅可以是0、1,还可以大于1,就是为了支持锁的可重入性
  • 针对要素③,在Unsafe类中,提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark。
  • 尤其是unpark(Thread t),它实现了一个线程对另外一个线程的“精准唤醒”。前面讲到的wait()/notify(),notify也只是唤醒某一个线程,但无法指定具体唤醒哪个线程。
  • 在AQS中利用双向链表和CAS实现了一个阻塞队列
  • 阻塞队列是整个AQS核心中的核心
  • tryAcquire(..)是一个虚函数,也就是再次尝试拿锁
  • 就是只有当c==0(没有线程持有锁),并且排在队列的第1个时(即当队列中没有其他线程的时候),才去抢锁,否则继续排队,这才叫“公平”!
  • addWaiter(..)函数,就是为当前线程生成一个Node,然后把Node放入双向链表的尾部
  • 线程一旦进入acquireQueued(..)就会被无限期阻塞,即使有其他线程调用interrupt()函数也不能将其唤醒,除非有其他线程释放了锁,并且该线程拿到了锁,才会从accquireQueued(..)返回。
  • 虽然该函数不会中断响应,但它会记录被阻塞期间有没有其他线程向它发送过中断信号。如果有,则该函数会返回true;否则,返回false。
  • 其他线程调用了t.interrupt()。这里要注意的是,lock()不能响应中断,但LockSupport.park()会响应中断。

3.2 读写锁

  • 和互斥锁相比,读写锁(ReentrantReadWriteLock)就是读线程和读线程之间可以不用互斥了。
  • 当使用ReadWriteLock 的时候,并不是直接使用,而是获得其内部的读锁和写锁,然后分别调用lock/unlock。
  • ReadLock和WriteLock是两把锁,实际上它只是同一把锁的两个视图而已
  • readerLock和writerLock实际共用同一个sync对象
  • 也就是把state 变量拆成两半,低16位,用来记录写锁。但同一时间既然只能有一个线程写,为什么还需要16位呢?这是因为一个写线程可能多次重入。例如,低16位的值等于5,表示一个写线程重入了5次
  • 这个地方的设计很巧妙,为什么要把一个int类型变量拆成两半,而不是用两个int型变量分别表示读锁和写锁的状态呢?这是因为无法用一次CAS 同时操作两个int变量,所以用了一个int型的高16位和低16位分别表示读锁和写锁的状态。
  • acquire/release、acquireShared/releaseShared 是AQS里面的两对模板方法
  • 对于读线程的非公平,要做一些“约束”。当发现队列的第1个元素是写线程的时候,读线程也要阻塞一下,不能“肆无忌惮”地直接去抢
  • 因为读锁是共享锁,多个线程会同时持有读锁,所以对读锁的释放不能直接减1,而是需要通过一个for循环+CAS操作不断重试

3.3 Condition

  • 在讲多线程基础的时候,强调wait()/notify()必须和synchronized一起使用,Condition也是如此,必须和Lock一起使用。
  • 读写锁中的ReadLock 是不支持Condition 的,读写锁的写锁和互斥锁都支持Condition。
  • ConditionObject内部也有一个双向链表组成的队列

3.4 StampedLock

  • StampedLock引入了“乐观读”策略,读的时候不加读锁,读出来发现数据被修改了,再升级为“悲观读”,相当于降低了“读”的地位,把抢锁的天平往“写”的一方倾斜了一下,避免写线程被饿死。
  • 要说明的是,这三行关键代码对顺序非常敏感,不能有重排序。因为state 变量已经是volatile,所以可以禁止重排序,但stamp并不是volatile的。为此,在validate(stamp)函数里面插入内存屏障。
  • 因为写锁只有一个bit位,所以写锁是不可重入的。
  • 在AQS里面,当一个线程CAS state失败之后,会立即加入阻塞队列,并且进入阻塞状态。但在StampedLock中,CAS state失败之后,会不断自旋,自旋足够多的次数之后,如果还拿不到锁,才进入阻塞状态。

4.1 Semaphore

  • 资源总数即state的初始值,在acquire里对state变量进行CAS减操作,减到0之后,线程阻塞;在release里对state变量进行CAS加操作。

4.2 CountDownLatch

  • 考虑一个场景:一个主线程要等待10个Worker 线程工作完毕才退出,就能使用CountDownLatch来实现。
  • 因为是基于AQS 阻塞队列来实现的,所以可以让多个线程都阻塞在state=0条件上,通过countDown()一直累减state,减到0后一次性唤醒所有线程。

4.4 Exchanger

  • Exchanger用于线程之间交换数据

4.5 Phaser

  • CyclicBarrier 所要同步的线程个数是在构造函数中指定的,之后不能更改,而Phaser 可以在运行期间动态地调整要同步的线程个数。

5.1 BlockingQueue

  • LinkedBlockingQueue是一种基于单向链表的阻塞队列
  • SynchronousQueue是一种特殊的BlockingQueue,它本身没有容量。先调put(..),线程会阻塞;直到另外一个线程调用了take(),两个线程才同时解锁,反之亦然。

5.3 CopyOnWrite

  • CopyOnWrite指在“写”的时候,不是直接“写”源数据,而是把数据拷贝一份进行修改,再通过悲观锁或者乐观锁的方式写回。

5.5 ConcurrentHashMap

  • 为了提高并发度,在JDK7中,一个HashMap被拆分为多个子HashMap。每一个子HashMap称作一个Segment,多个线程操作多个Segment相互独立
  • JDK 8的实现有很大变化,首先是没有了分段锁,所有数据都放在一个大的HashMap中;其次是引入了红黑树

6.3 ThreadPoolExector

  • 首先判断corePoolSize,其次判断blockingQueue是否已满,接着判断maxPoolSize,最后使用拒绝策略。
  • 在JDK 7中,把线程数量(workerCount)和线程池状态(runState)这两个变量打包存储在一个字段里面,即ctl变量
  • 线程池的状态有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED。
  • 从小到大迁移,-1,0,1,2,3,只会从小的状态值往大的状态值迁移,不会逆向迁移
  • (1)前者不会清空任务队列,会等所有任务执行完成,后者再清空任务队列。 (2)前者只会中断空闲的线程,后者会中断所有线程。

6.5 ScheduledThreadPoolExecutor

  • AtFixedRate:按固定频率执行,与任务本身执行时间无关。但有个前提条件,任务执行时间必须小于间隔时间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s。

第7章 ForkJoinPool

  • 也可以将ForkJoinPool看作一个单机版的Map/Reduce,只不过这里的并行不是多台机器并行计算,而是多个线程并行计算。

7.3 工作窃取队列

  • 所谓工作窃取算法,是指一个Worker线程在执行完毕自己队列中的任务之后,可以窃取其他线程队列中的任务来执行,从而实现负载均衡,以防有的线程很空闲,有的线程很忙。