④JAVA 并发安全容器
④JAVA 并发安全容器
学习核心
- ConcurrentHashMap(重点)
- CopyOnWriteArrayList
- ConcurrentLinkedQueue
- Java中的阻塞队列 BlockingQueue(重点)
- Fork/Join框架
参考资料
扩展
概念引入
1.Java中的同步容器和并发容器
同步容器
同步容器分类
容器中的同步容器:Java提供了一系列容器,其中同步容器是用于确保线程安全而引入的,其主要包括两种分类:
- 分类1:通过引入同步机制构建的容器:Hashtable、Vector、Stack
- Hashtable:实现Map接口(和HashMap类似),使用synchronized
- Vector:数组结构(和ArrayList类似),使用synchronized
- Stack:实际上是继承于Vector
- 分类2:通过Collectios工具类的静态工厂方法构建的类(Collectios.synchronizedXXX等方法)
同步容器的问题
同步容器的同步原理就是在其 get
、set
、size
等主要方法上用 synchronized
修饰。 synchronized
可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块
性能问题:synchronized的互斥同步会产生阻塞和唤醒线程的开销,因此性能相对比没有引入synchronized的容器要低
安全问题:同步容器不一定能保证绝对安全(例如在做一些复合操作(非原子操作)的时候:迭代、跳转、条件运算等)
同步容器问题:不安全的示例
public class VectorDemo {
static Vector<Integer> vector = new Vector<>();
public static void main(String[] args) {
while (true) {
vector.clear();
for (int i = 0; i < 10; i++) {
vector.add(i);
}
Thread thread1 = new Thread() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
};
Thread thread2 = new Thread() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
};
thread1.start();
thread2.start();
while (Thread.activeCount() > 10) {
System.out.println("同时存在 10 个以上线程,退出");
return;
}
}
}
}
以上程序执行时可能会出现数组越界错误。Vector
是线程安全的,那为什么还会报这个错?
这是因为,对于 Vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:当某个线程在某个时刻执行这句时:
for(int i=0;i<vector.size();i++)
vector.get(i);
假若此时 vector 的 size 方法返回的是 10,i 的值为 9。然后另外一个线程执行了这句:将下标为 9 的元素删除了,那么通过 get 方法访问下标为 9 的元素肯定就会出问题了
for(int i=0;i<vector.size();i++)
vector.remove(i);
✔ 安全示例
因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:
public class VectorDemo2 {
static Vector<Integer> vector = new Vector<Integer>();
public static void main(String[] args) {
while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}
Thread thread1 = new Thread() {
@Override
public void run() {
synchronized (VectorDemo2.class) { //进行额外的同步
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
}
};
Thread thread2 = new Thread() {
@Override
public void run() {
synchronized (VectorDemo2.class) {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
}
};
thread1.start();
thread2.start();
while (Thread.activeCount() > 10) {
System.out.println("同时存在 10 个以上线程,退出");
return;
}
}
}
}
ConcurrentModificationException
异常 : 在对Vector 等容器并发地进行迭代修改时,会抛ConcurrentModificationException
2.如何保证容器是线程安全的?ConcurrentHashMap 如何实现高效地线程安全?
思路核心
针对集合的线程安全问题,要考虑线程安全问题,一般有如下思路:(每种思路都是一种从粗粒度到细粒度实现线程安全的演化)
- 使用线程安全的同步容器(例如Hashtable、Vector等)
- 使用工具类包装(例如Collection.synchronizedMap),将容器包装为线程安全
- 使用JUC(Java并发包的线程安全容器)
参考
Java 提供了不同层面的线程安全支持。在传统集合框架内部,除了 Hashtable 等同步容器,还提供了所谓的同步包装器(Synchronized Wrapper),可以调用 Collections 工具类提供的包装方法,来获取一个同步的包装容器(如 Collections.synchronizedMap),但是它们都是利用非常粗粒度的同步方式,在高并发情况下,性能比较低下。
另外,更加普遍的选择是利用并发包提供的线程安全容器类,它提供了:
- 各种并发容器,比如 ConcurrentHashMap、CopyOnWriteArrayList。
- 各种线程安全队列(Queue/Deque),如 ArrayBlockingQueue、SynchronousQueue。
- 各种有序容器的线程安全版本等。
具体保证线程安全的方式,包括有从简单的 synchronize 方式,到基于更加精细化的,比如基于分离锁实现的 ConcurrentHashMap 等并发实现等。具体选择要看开发的场景需求,总体来说,并发包内提供的容器通用场景,远优于早期的简单同步实现
思路扩展
理解基本的线程安全工具
理解传统集合框架并发编程中 Map 存在的问题,清楚简单同步方式的不足
梳理并发包内,尤其是 ConcurrentHashMap 采取了哪些方法来提高并发表现(掌握 ConcurrentHashMap 自身的演进、原理分析)
并发容器
同步容器将所有对容器状态的访问都串行化,以保证线程安全性,这种策略会严重降低并发性
Java 1.5 后提供了多种并发容器,使用并发容器来替代同步容器,可以极大地提高伸缩性并降低风险。J.U.C 包中提供了几个非常有用的并发容器作为线程安全的容器:
并发容器 | 对应的普通容器 | 描述 |
---|---|---|
ConcurrentHashMap | HashMap | Java 1.8 之前采用分段锁机制细化锁粒度,降低阻塞,从而提高并发性;Java 1.8 之后基于 CAS 实现。 |
ConcurrentSkipListMap | SortedMap | 基于跳表实现的 |
CopyOnWriteArrayList | ArrayList | |
CopyOnWriteArraySet | Set | 基于 CopyOnWriteArrayList 实现。 |
ConcurrentSkipListSet | SortedSet | 基于 ConcurrentSkipListMap 实现。 |
ConcurrentLinkedQueue | Queue | 线程安全的无界队列。底层采用单链表。支持 FIFO。 |
ConcurrentLinkedDeque | Deque | 线程安全的无界双端队列。底层采用双向链表。支持 FIFO 和 FILO。 |
ArrayBlockingQueue | Queue | 数组实现的阻塞队列。 |
LinkedBlockingQueue | Queue | 链表实现的阻塞队列。 |
LinkedBlockingDeque | Deque | 双向链表实现的双端阻塞队列。 |
J.U.C 包中提供的并发容器命名一般分为三类:Concurrent、CopyOnWrite、Blocking
Concurrent
- 这类型的锁竞争相对于
CopyOnWrite
要高一些,但写操作代价要小一些 - 此外,
Concurrent
往往提供了较低的遍历一致性,即:当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历。代价就是,在获取容器大小size()
,容器是否为空等方法,不一定完全精确,但这是为了获取并发吞吐量的设计取舍,可以理解。与之相比,如果是使用同步容器,就会出现fail-fast
问题,即:检测到容器在遍历过程中发生了修改,则抛出ConcurrentModificationException
,不再继续遍历
- 这类型的锁竞争相对于
CopyOnWrite
- 一个线程写,多个线程读。读操作时不加锁,写操作时通过在副本上加锁保证并发安全,空间开销较大Blocking
- 内部实现一般是基于锁,提供阻塞队列的能力
容器选择
并发场景下的Map
- 如果对数据有强一致要求,则需使用
Hashtable
; - 在大部分场景通常都是弱一致性的情况下,使用
ConcurrentHashMap
即可; - 如果数据量在千万级别,且存在大量增删改操作,则可以考虑使用
ConcurrentSkipListMap
- 如果对数据有强一致要求,则需使用
并发场景下的List
- 读多写少用
CopyOnWriteArrayList
- 写多读少用
ConcurrentLinkedQueue
,但由于是无界的,要有容量限制,避免无限膨胀,导致内存溢出
- 读多写少用
ConcurrentHashMap
1.为什么要引入ConcurrentHashMap?
结合两个方面分析:线程安全、性能(对比HashMap和Hashtable)
在多线程场景下使用HashMap是不安全的;虽然可以使用线程安全的Hashtable,但是其底层是通过synchronized对方法进行加锁(锁的是整个对象,即在进行put等修改Hash表操作的时候锁的是整个Hash表),在实际执行过程中运行效率表现较低,现已不推荐使用,因此引入ConcurrentHashMap
细分段技术:解决锁竞争问题
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的 线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问
结合并发优化思路演进来梳理
- HashMap是线程不安全的,无法支持多线程并发场景,因此考虑使用线程安全同步的容器Hashtable
- Hashtable虽然可以保证线程安全,但是它的实现机制是基于synchronized机制构建(对put、get等方法加锁),就会导致所有并发操作都会竞争同一把锁,一个线程在进行同步操作时,其他线程只能等待,大大降低并发效率,在很多场景下并不推荐使用,因此进一步引入ConcurrentHashMap
- ConcurrentHashMap是在更细粒度上去解决并发问题,其不同的演进版本也是对并发操作效率提升相应的优化(从数据结构+并发机制去拆解分析)
- JDK1.7:采用分段锁机制,将ConcurrentHashMap拆分为多个Segment,每个Segment分段类似于一个Hashtable。当线程访问操作时会先定位元素归属于哪个Segment进而确认加锁、访问等操作
- 数据结构:数组+单链表
- 并发机制:分段锁(细化锁粒度,降低阻塞,提高并发性)
- JDK1.8:采用synchronized+CAS锁机制,基本结构基于JDK1.8的HashMap进行构建(数组+链表+红黑树)
- 数据结构:数组+单链表+红黑树
- 并发机制:取消分段锁,基于CAS+synchronized实现
- JDK1.7:采用分段锁机制,将ConcurrentHashMap拆分为多个Segment,每个Segment分段类似于一个Hashtable。当线程访问操作时会先定位元素归属于哪个Segment进而确认加锁、访问等操作
2.ConcurrentHashMap原理分析
JDK1.7-ConcurrentHashMap
在JDK1.5~1.7版本,Java使用了分段锁机制实现ConcurrentHashMap
简而言之,ConcurrentHashMap在对象中保存了一个Segment数组,即将整个Hash表划分为多个分段;而每个Segment元素,即每个分段则类似于一个Hashtable;这样在执行put操作时首先根据hash算法定位到元素属于哪个Segment,然后对该Segment加锁即可。因此ConcurrentHashMap在多线程并发编程中可以实现多线程put操作
数据结构:Segment数组 + HashEntry数组结构组成
整个ConcurrentHashMap由一个个 Segment 组成,Segment 代表”部分“或”一段“的意思,所以很多地方都会将其描述为分段锁。简单理解就是,ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重 入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色:HashEntry则用于存储键值对数据。
初始化
初始化方法是通过initialCapacity、loadFactor和concurrencyLevel等几个参数来初始化segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment里的 HashEntry数组来实现的。
concurrencyLevel:并行级别、并发数、Segment 数。默认是16,即ConcurrentHashMap 有 16个Segments,所以理论上最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。
initialCapacity:初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment
loadFactor:Segment 数组不可以扩容,这个负载因子是给每个 Segment 内部使用的
以new ConcurrentHashMap()为例:初始化完成,得到一个 Segment 数组
默认构造函数初始化完成后Segment 数组长度为 16,不可以扩容
Segment[i] 的默认大小为 2,负载因子是 0.75,得出值为1.5,取整后初始阈值为1,也就是以后插入第一个元素不会触发扩容,插入第二个会进行第一次扩容
这里初始化了 segment[0],其他位置还是 null(至于为什么要初始化 segment[0],结合后面的内容分析)
当前 segmentShift 的值为 32-4=28,segmentMask为16-1=15、姑且把它们简单翻译为移位数和掩码、这两个值主要是用来定位Segment
定位Segment
根据前面的数据结构介绍可知,数据都是受到Segment分段锁保护的,所以插入或者获取元素的时候,必须要先定位到Segment。可以看到ConcurrentHashMap会首先使用 Wang/enkins hash的变种算法对元素的hashCode进行一次再散列。
todo hash方法
之所以进行再散列,目的是减少散列冲突,使元素能够均匀地分布在不同的Segment上,从而提高容器的存取效率。
ConcurrentHashMap通过以下散列算法定位segment
todo 定位方法
ConcurrentHashMap操作 todo 源码分析
(1)get方法
Segment的get操作实现非常简单和高效。先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。
get操作提效在于整个get方法不需要加锁,而是通过使用volatile关键字修饰共享变量来确保get操作的并发安全
get操作的高效之处在于整个get过程不需要加锁。我们知道HashTable容器的get方法是需要加锁的,那么ConcurrentHash-Map的get操作是如何做到不加锁不出问题的呢? 原因是它的get方法里将要使用的共享变量都定义成volatile类型,如用于统计当前 Segement大小的count字段和用于存储值的HashEntry的value。
在定位元素的代码里我们可以发现,定位HashEntry和定位Segment的散列算法虽然一样,都与数组的长度减去1再相“与”,但是相“与”的值不一样,定位Segment使用的是元素的hashcode通过再散列后得到的值的高位,而定位HashEntry直接使用的是再散列后的值。
(2)put方法
由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位 置,然后将其放在HashEntry数组里。
是否需要扩容:在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阈 值,则对数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap 是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。
如何扩容:在扩容的时候,首先会创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。
(3)size方法
如果要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?不是的,虽然相加时可以获取每个Segment的count的最新值,但是可能累加前使用的count发生了变化,那么统计结果就不准了。所以,最安全的做法是在统计size的时候把所有Segment的put.remove和clean方法全部锁住,但是这种做法显然非常低效。因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试3次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。
那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put.remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size 前后比较modCount是否发生变化,从而得知容器的大小是否发生变化,
JDK1.8-ConcurrentHashMap
在JDK1.7之前,ConcurrentHashMap是通过分段锁机制来实现的,所以其最大并发度受Segment的个数限制。因此,在JDK1.8中,ConcurentHashMap的实现原理摒弃了这种设计,而是选择了与HashMap类似的数组+链表+红黑树的方式实现,而加锁则采用CAS和synchronized实现。数据结构:结构上和 Java8 的 HashMap 基本上一样,不过它要保证线程安全性,所以在源码上确实要复杂一些。
数据结构
结构上和 Java8 的 HashMap 基本上一样,不过它要保证线程安全性,所以在源码上确实要复杂一些。
图中的节点有三种类型:(结合HashMap的数据结构对比记忆)
空元素:空着的位置代表当前还没有元素来填充
链表元素(绿色):就是和 HashMap 非常类似的拉链法结构,在每一个槽中会首先填入第一个节点,但是后续如果计算出相同的 Hash 值,以链表的形式往后进行延伸
红黑树结构(红黑):这是 Java7的ConcurrentHashMap 中所没有的结构,当第二种情况的链表长度大于某一个阈值(默认为8),且同时满足一定的容量要求的时候,ConcurrentHashMap 便会把这个链表从链表的形式转化为红黑树的形式,目的是进一步提高它的查找性能
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
...
}
Node节点
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
......
}
每个Node是key-value形式,并定义next指针用于指向下一个节点
其中val、next 用volatile修饰,用于保证可见性
put方法
// put方法:核心是调用putVal方法
public V put(K key, V value) {
return putVal(key, value, false);
}
// putVal:源码解析
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// hash值计算
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组为空,则进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 找指定hash值对应的数组下标
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果该位置为空(槽点为空),则以CAS的方式放入新值
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// hash值等于MOVED代表在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 槽点上有值的情况
else {
V oldVal = null;
// 使用synchronized锁住当前槽点,确保并发安全(区分链表、红黑树的情况)
synchronized (f) {
// 如果是链表的形式(因此遍历链表节点,然后判断是否要进行覆盖)
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// key已存在,则判断是否进行覆盖,返回
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 如果遍历到链表尾部还没有发现key,则将新值添加到链表末尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果是红黑树的形式
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 调用putTreeVal方法完成put操作
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 检查是否满足红黑树转化的要求(如果原来是链表的情况,新增元素之后需要校验是否超出阈值,如果超出需要将其转化为红黑树形式)
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// putVal的返回值是添加前的旧值oldVal
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
结合ConcurrentHashMap的结构进行理解:put方法的核心是逐步根据当前槽点的状态做相应的处理(数组初始化、槽点是否为空、扩容、链表、红黑树),分析如下:
- 如果数组为空则进行初始化,随后根据hash定位槽点位置(即hash对应的数组下标位置)
- 如果槽点为空,则以CAS方式插入新值
- 判断是否需要扩容操作
- 如果槽点有值,则进一步判断如何执行put操作(拆分链表、红黑树两种类型进行出俩)
- 如果是链表,则遍历链表
- 存在指定key,进行覆盖
- 不存在key,直接在链表末尾插入数据
- 如果是红黑树
- 调用红黑树处理方法完成put操作
- 如果是链表,则遍历链表
- 红黑树转化:判断当前槽点状态是否满足红黑树转化要求(阈值判断:如果当前链表长度超出阈值则进行转化)
JDK1.8中的ConcurrentHashMap什么情况下链表转红黑树?
链表长度大于8,且Node数组长度大于64(阈值判断),可以结合HashMap去理解
如果Node数组长度较小(小于默认阈值MIN_TREEIFY_CAPACITY:64),则会先通过扩容数组容量为原来的2倍用于缓解单个链表长度过长的性能问题;
然后进一步判断链表长度是否超出阈值(默认8),如果满足两个条件则转红黑树
get方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算hash值
int h = spread(key.hashCode());
// 如果数组为空,或者槽点数据为空,则说明key对应的value不存在,直接返回null
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 头节点判断:如果匹配则直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果头节点hash<0,则说明是红黑树或者正在扩容,调用对应find方法查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历链表查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get方法实现思路:
- 计算hash值,找到对应槽点
- 如果数组为空,或者槽点数据为空,则说明key对应的value不存在,直接返回null
- 如果槽点处的头节点(槽点位置)匹配,则返回该节点的值
- 如果槽点处的位置节点是红黑树或者正在扩容,则调用find方法查找(此处判断的条件是头节点hash<0,此处头节点的hash不是计算出来的,而是当做特殊节点处理进行设置,用于标记某种状态)
- 如果槽点处的位置节点是链表,则遍历链表查找
JDK1.7 VS JDK1.8的ConcurrentHashMap实现(*)
结合并发机制(线程安全实现方式)、Hash碰撞解决方法、并发度这三个维度扩展
- 并发机制
- JDK1.7:分段锁Segment
- JDK1.8:取消分段锁,引入synchronized + AQS + Node 确保线程安全,锁的粒度更细(synchronized只锁定链表或者红黑树的首节点)
- Hash碰撞解决方法(不同元素hash相同,定位到同一个位桶(数组下标))
- JDK1.7:拉链法
- JDK1.8:拉链法结合红黑树(链表长度超过阈值则将链表转为红黑树,提升检索效率)
- 并发度
- JDK1.7:最大并发数取决于Segment数组个数(默认是16)
- JDK1.8:最大并发数取决于Node数组大小
CopyOnWriteArrayList
从 JDK1.5 开始,Java 并发包里提供了使用 CopyOnWrite 机制实现的并发容器 CopyOnWriteArray-List 作为主要的并发 List,CopyOnWrite 的并发集合还包括 CopyOnWriteArraySet,其底层正是利用 CopyOnWriteArrayList 实现的。
CopyonWrite的含义
CopyOnWrite:当容器需要被修改的时候,不直接修改当前容器,而是先将当前容器进行 Copy,复制出一个新的容器,然后修改新的容器,完成修改之后,再将原容器的引用指向新的容器,以此完成了整个修改过程
CopyOnWriteArrayList:满足 CopyOnWrite 的ArrayList
适用场景:读多写少
适用于读多写少、且能够容忍读写的短暂不一致的场景
为了将读取的性能发挥到极致,CopyOnWriteArrayList读取是完全不用加锁的,并且写入也不会阻塞读取操作,也就是说你可以在写入的同时进行读取。
只有写入和写入之间需要进行同步,也就是不允许多个写入同时发生,所以会更慢一点。
读多写少:写入本身是会拷贝一份出来,会增加资源的消耗,同时多个写入之间需要进行同步,所以应该尽量的少。
特点:迭代期间允许修改集合内容
- ArrayList 在迭代期间如果修改集合的内容,会抛出Concurrent-ModificationException 异常的
- CopyOnWriteArrayList 的迭代器在迭代的时候,如果数组内容被修改了CopyOnWriteArrayList 不会报 ConcurrentModificationException 的异常,因为迭代器使用的依然是旧数组只不过迭代的内容可能已经过时了
缺点:
内存占用问题:因为 CopyOnwrite 的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,这一点会占用额外的内存空间。
数据一致性问题:由于 CopyOnWrite 容器的修改是先修改副本,所以这次修改对于其他线程来说,并不是实时能看到的,只有在修改完之后才能体现出来。如果你希望写入的的数据马上能被其他线程看到,CopyOnwrite 容器是不适用的。
理解此处CopyonWrite并发容器的意义(既然会有数据一致性问题为什么还要用这个并发容器):如果同时存在读写的情况,可能会出现读出来的内容不一定是最新的数据?既然无法保证读出来的数据是最新的数据,不适用一个普通的非并发容器内?
首先要明确CopyonWrite是为了解决并发问题而引入的概念,而基于多线程的场景下分析传统的非并发容器无法保证这一点,甚至还可能导致容器存入错误值或者容器本身运行出问题等情况
其次在一些真实的并发场景中,很多应用只要求不要读取到错误的值(就是一个中间值),对于其是新值还是旧值其实问题都不大(可以理解为新值生效的时机稍微延迟了一点,只要不是错误值就可以),因此基于上述两点,CopyonWrite的数据一致性问题其实可以忽略其业务影响
ConcurrentLinkedQueue
实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阳阻塞算法
- 使用阻塞算法的队列可以用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现
- 非阻塞的实现方式则可以使用循环CAS的方式来实现
线程安全队列ConcurrentLinkedQueue是基于非阻塞的方式来实现的,核心是采用了“wait-free”算法(即CAS算法)来实现,该算法在 Michael&Scott算法上进行了一些修改。看下关键方法offer的源码:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
}
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
把目光放到整体的代码结构上,在检查完空判断之后,可以看到它整个是一个大的 for 循环,而且是一个非常明显的死循环。在这个循环中有一个非常亮眼的 p.casNext 方法,这个方法正是利用了 CAS 来操作的,而且这个死循环去配合 CAS 也就是典型的乐观锁的思想
从源代码角度来看,整个入队过程主要做两件事情:
- 1)定位出尾节点
- 2)使用 CAS算法将入队节点设置成尾节点的next节点,如不成功则重试。适合用在不需要阻塞功能,且并发不是特别剧烈的场景。
Java中的阻塞队列(BlockingQueue)
1.阻塞队列基本概念
阻塞队列(BlockingQueue):一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法
支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满
支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空
在阻塞队列不可用时,这两个附加操作提供了4种处理方式
抛出异常:当队列满时,如果再往队列里插入元素,会抛出llegalStateException("Queue full”)异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常
返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移 除方法,则是从队列里取出一个元素,如果没有则返回null
一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者 线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队 列会阻塞住消费者线程,直到队列不为空
超时退出:当阳塞队列满时,如果牛产者线程往队列里插入元素,队列会阳塞生产者线程 一段时间,如果超过了指定的时间,生产者线程就会退出
2.Java中的阻塞队列
JDK 7提供了7个阻塞队列:
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUEPriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。 默认情况下元素采取自然顺序 升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。队列使用PriorityQueue来实现。队 列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。(常用在缓存有效期,定时任务调度等场景)
- SynchronousQueue:一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。队列本身并不存储任何元素,非常适合传递性场景
- LinkedTransferQueue:一个由链表结构组成的单向无界阻塞队列。它设计了一种直接在生产者和消费者之间传输元素的机制,称为“transfer”。当生产者调用transfer(e)方法时,它会阻塞直到有一个消费者接收该元素。适用于需要高效地在生产者和消费者之间直接传输数据的场景,尤其是当生产者和消费者之间的速度大致匹配时
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以 从队列的两端插入和移出元素。相比其他的阻塞队列,LinkedBlockingDegue多了addFirst、addLast、offer-First、offerLast、peekFirst和peekLast等方法。双向阻塞队列可以 运用在“工作窃取”模式中
扩展
此处场景应用可以通过【收快递】场景来理解
其他带缓冲的队列:相当于快递员把快递放在菜鸟驿站,然后用户自己去取。至于什么时候去取?有没有多个人一起去取?这点快递员并不关心
SynchronousQueue:基于阻塞队列的概念,快递员必须把快递一个个交到用户手上,每个用户必须等待前一个用户取完才能进行操作
3.阻塞队列实现的原理
如果队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢? 使用通知模式实现。所谓通知模式分析如下:
- 当消费者从空的队列获取元素时会阻塞住消费者,此时如果生产者放了一个元素进入队列,则需要通知阻塞住消费者当前有元素可取
- 同理当生产者往满的队列里添加元素时会阻塞住生产者当消费者消费了一个队列中的元素后,会通知生产者当前队列可用
通过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现
Fork/Join框架
1.Fork/Join简介
Fork/oin框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
运行流程图分析如下:
2.工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。那么,为什么 需要使用工作窃取算法呢?
假如需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个 队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。比如A线程负责处理A 队列里的任务。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列 里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行
Fork/Join 并行计算的核心组件是 ForkJoinPool。ForkJoinPool 本质上也是一个生产者 - 消费者的实现,但是更加智能。ForkJoinPool 内部有多个任务队列,当通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。
如果工作线程对应的任务队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool 支持一种叫做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务,例如下图中,线程 T2 对应的任务队列已经空了,它可以“窃取”线程 T1 对应的任务队列的任务。如此一来,所有的工作线程都不会闲下来了。
ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争。此处仅仅是简化后的原理,具体实现更为复杂需要参考源码理解
3.使用Fork/Join框架
基本概念
应用案例:实现1+2+3+4
通过一个简单的需求来使用Fork/Join框架,需求是:计算1+2+3+4的结果
使用Fork/Join框架首先要考虑到的是如何分割任务,如果希望每个子任务最多执行两个数的相加,那么设置分割的阈值是2,由于是4个数字相加,所以Fork/Join框架会把这个任务fork成两个子任务,子任务一负责计算1+2,子任务二负责计算3+4,然后再ioin两个子任务的结果。因为是有结果的任务,所以必须继承RecursiveTask,实现代码如下
// 定义计数任务
class CountTask extends RecursiveTask<Integer> {
// 设置阈值
private static final int THRESHOLD = 2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任务足够小就直接计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if(canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
}else {
// 如果任务大于阈值,则分裂成两个子任务进行计算
int middle = (end + start) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
leftTask.fork();
rightTask.fork();
// 等待子任务执行完,并获取到相应的结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一个计算任务:负责计算1+2+3+4
CountTask countTask = new CountTask(1, 4);
// 执行一个任务
Future<Integer> res = forkJoinPool.submit(countTask);
System.out.println(res.get());
}
}
ForkJoinTask:ForkJoinTask与一般任务的主要区别在于它需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果
Fork/Join实现原理
Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务
- ForkJoinPool:分治任务的线程池
- ForkJoinTask:分治任务
(1)ForkJoinTask的fork方法实现原理
fork方法:当调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
// push方法将当前的任务存放在ForkJoinTask数组队列中,然后再调用ForkJoinPool的signalWork方法唤醒或者创建一个工作线程来执行任务
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
(2)ForkJoinTask的join方法实现原理
join方法:阻塞当前线程并等待获取结果
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
}
首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)
- 如果任务状态是已完成,则直接返回任务结果
- 如果任务状态是被取消,则直接抛出CancellationException
- 如果任务状态是抛出异常,则直接抛出对应的异常
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
在doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL
场景应用
场景应用选择
- 对于简单的并行任务,可以通过“线程池 +Future”的方案来解决;
- 如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;
- 批量的并行任务,则可以通过 CompletionService 来解决
(1)CompletableFuture
runAsync 和 supplyAsync 方法
CompletableFuture 提供了四个静态方法来创建一个异步操作
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
没有指定 Executor 的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
- runAsync 方法不支持返回值
- supplyAsync 可以支持返回值
(2)CompletionStage
CompletionStage 接口可以清晰地描述任务之间的时序关系,如串行关系、并行关系、汇聚关系等
串行关系
CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口
thenApply 系列函数里参数 fn 的类型是接口 Function<T, R>
,这个接口里与 CompletionStage 相关的方法是 R apply(T t)
,这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage
而 thenAccept 系列方法里参数 consumer 的类型是接口 Consumer<T>
,这个接口里与 CompletionStage 相关的方法是 void accept(T t)
,这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage<Void>
thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage<Void>
这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的
并行关系
CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
汇聚关系
CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
下面的示例代码展示了如何使用 applyToEither() 方法来描述一个 OR 汇聚关系
CompletableFuture<String> f1 =
CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
CompletableFuture<String> f2 =
CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
CompletableFuture<String> f3 =
f1.applyToEither(f2,s -> s);
System.out.println(f3.join());
异常处理
虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行 7/0
就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用 try{}catch{} 来捕获并处理异常,那在异步编程里面,异常该如何处理呢?
CompletableFuture<Integer>
f0 = CompletableFuture.
.supplyAsync(()->(7/0))
.thenApply(r->r*10);
System.out.println(f0.join());
CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。
CompletableFuture<Integer>
f0 = CompletableFuture
.supplyAsync(()->7/0))
.thenApply(r->r*10)
.exceptionally(e->0);
System.out.println(f0.join());