【Java】集合类 - 并发容器(Concurrent Container)

Posted by 西维蜀黍 on 2019-03-18, Last Modified on 2023-02-28

背景

java.util.concurrent 包下面为我们提供了丰富的类和接口供我们开发出支持高并发、线程安全的程序。

  • 同步容器类:Vector,HashTable 和 Collections.SynchronizedXXX ()
  • 并发容器类:包括 ConcurrentHashMap,CopyOnWrite 容器以及阻塞队列。

同步容器类

同步容器类包括:Vector 和 HashTable,这两个类是早期 JDK 的一部分。此外还包括了 JDK1.2 之后提供的同步封装器方法 Collections.synchronizedXxx ()。

同步容器类的问题

我们都知道 Vector 是线程安全的容器,但当我们对 Vector 进行多线程操作时,可能会得到意想之外的结果。

比如迭代操作:

for(int i=0;i<vector.size();i++){
    doSomething(vector.get(i));
}

例如两个线程 A 和 B,A 线程对容器进行迭代操作的时候,B 线程可能对容器进行了删除操作,这样就会导致 A 线程的迭代操作抛出 IndexOutOfBoundsException。而这显然不是我们想得到的结果。

所以,为了解决不可靠迭代的问题,我们只能牺牲伸缩性为容器加锁,代码如下:

synchronized(vector){
    for(int i=0;i<vector.size();i++){
        doSomething(vector.get(i));
    }
}

加锁防止了迭代期间对容器的修改,但这同时也降低了容器的并发性。当容器很大时,其他线程要一直等待迭代结束,因此性能不高。

迭代器与 CoucurrentModificationException

上面的例子我们举了 Vector 这种 “古老” 的容器。

但是实际上很多 “现代” 的容器类也并没有避免上面所提到的问题。比如:for-each 循环和 Iterator。

当我们调用 Iterator 类的 hasNext 和 next 方法对容器进行迭代时,若其他线程并发修改该容器,那么此时容器表现出的行为是 “及时失败” 的,即抛出 CoucurrentModificationException。

因此,为了避免以上问题,我们又不得不为容器加锁,或者对容器进行 “克隆”,在副本上进行操作,但这样做的性能显然是不高的。

隐藏的迭代器

这里要特别提到的是有些类的方法会隐藏的调用容器的迭代器,这往往是很容易被我们忽视的。看下面列子:

private final Set<Integer> set = new HashSet<Integer>();

public synchronized void add(Integer i){set.add(i);}

public void addMore(){
    for(int i=0;i<10;i++)
        add(i);
    System.out.println("this is set:"+set);
}

以上代码能看出对容器的迭代操作吗?乍看没有。

但实际上最后一段 “this is set:”+set 的代码影藏的调用了 set 的 toString 方法,而 toString 方法又会影藏的调用容器的迭代方法,这样该类的 addMore 方法同样可能会抛出 CoucurrentModificationException 异常。

更严重的问题是该类并不是线程安全的,我们可以使用 SynchronizedSet 来包装 set 类,对同步代码进行封装,这样就避免了问题。此外我们还需要特别注意:容器的 equals、hashCode 方法都会隐式的进行迭代操作,当一个容器作为另一个容器的健值或者元素时就会出现这种情况。同样,containsAll,removeAll 等方法以及把容器当做参数的构造函数,都会进行迭代操作。所有这些间接迭代的操作都可能导致程序抛出 CoucurrentModificationException。

并发容器(Concurrent Container)

针对于同步容器的巨大缺陷,java.util.concurrent 中提供了并发容器。并发容器包注重以下特性:

  • 根据具体场景进行设计,尽量避免使用锁,提高容器的并发访问性。
  • 并发容器定义了一些线程安全的复合操作。
  • 并发容器在迭代时,可以不封闭在 synchronized 中。但是未必每次看到的都是 "最新的、当前的" 数据。如果说将迭代操作包装在 synchronized 中,可以达到 "串行" 的并发安全性,那么并发容器的迭代达到了 "脏读"。

List 接口和 Set 接口

CopyOnWriteArrayList 和 CopyOnWriteArraySet

CopyOnWriteArrayList 和 CopyOnWriteArraySet 分别代替 ArrayList 和 HashSet,主要是在遍历操作为主的情况下来代替同步的 List 和同步的 Set。

实现并发主要可以通过两种方法:

  • 加锁
  • “克隆 " 容器对象
CopyOnWriteArrayList

CopyOnWrite 容器用于替代同步的 List,它提供了更好的并发性,并且在使用时不需要加锁或者拷贝容器。

CopyOnWriteArrayList 的主要应用就是迭代容器操作多而修改少的场景,迭代时也不会抛出 CoucurrentModificationException 异常。

CopyOnWrite 容器的底层实现是在迭代操作前都会复制一个底层数组,这保证了多线程下的并发性和一致性,但是当底层数组的数据量比较大的时候,就需要效率问题了。

CopyOnWriteArraySet

基于 CopyOnWriteArrayList 实现,其唯一的不同是在 add 时调用的是 CopyOnWriteArrayList 的 addIfAbsent 方法,其遍历当前 Object 数组,如 Object 数组中已有了当前元素,则直接返回,如果没有则放入 Object 数组的尾部,并返回。

ConcurrentSkipListSet

目标:代替 TreeSet。

原理:内部基于 ConcurrentSkipListMap 实现。

Skip list(跳表)是一种可以代替平衡树的数据结构,默认是按照 Key 值升序的。

Query 接口

Java 5.0 之后新增加了 Queue(队列)和 BlockingQueue(阻塞队列)。

队列分为有界队列无界队列。无界队列会因为数据的累计造成内存溢出,使用时要小心。

BlockingQueue 接口(阻塞队列)

BlockingQueue 扩展了 Queue,增加了可阻塞的插入和获取操作,如果队列为空,那么获取操作将阻塞,直到队列中有一个可用的元素。如果队列已满,那么插入操作就阻塞,直到队列中出现可用的空间。

阻塞队列有很多种实现,最常用的有 ArrayBlockingQueue 和 LinkedBlockingQueue。阻塞队列提供了阻塞的 take 和 put 方法,如果队列已满,那么 put 方法将等待队列有空间时在执行插入操作;如果队列为空,那么 take 方法将一直阻塞直到有元素可取。有界队列是一个强大的资源管理器,它能抑制产生过多的工作项,使程序更加健壮。

原理

通过 ReentrantLock 实现线程安全,通过 Condition 实现阻塞和唤醒

实现类
  • LinkedBlockingQueue:基于链表实现的可阻塞的 FIFO 队列
  • ArrayBlockingQueue:基于数组实现的可阻塞的 FIFO 队列
  • PriorityBlockingQueue:按优先级排序的队列,一个可以按照优先级排序的阻塞队列,当你希望按照某种顺序来排序时非常有用。
  • DelayQueue:无界阻塞队列,只有在延迟期满时才能从中提取元素。
ArrayBlockingQueue 类

一个由数组支持的有界阻塞队列。ArrayBlockingQueue 是一个典型的 "有界缓存区”,由于数组大小固定,所以一旦创建了这样的 “缓存区",就不能再增加其容量。

阻塞条件:试图向已满队列中执行 put 元素会导致操作受阻塞;试图从空队列中 take 元素将导致类似阻塞。

非阻塞队列

ConcurrentLinkedQuerue

原理:基于链表实现的非阻塞的 FIFO 队列(LinkedList 的并发版本)

ConcurrentLinkedQueue 是使用非阻塞的方式实现的基于链接节点的无界的线程安全队列,性能非常好。

多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。

ConcurrentLinkedQuerue 是 Query 实现,是一个先进先出的队列。一般的 Queue 实现中操作不会阻塞,如果队列为空,那么取元素的操作将返回空。Queue 一般用 LinkedList 实现的,因为去掉了 List 的随机访问需求,因此并发性更好。

Deque 接口(双端队列)

Deque 是一种双端队列,它支持在两端进行插入和删除元素,Deque 继承自 Queue。

BlockingDeque 就是支持阻塞的双端队列,常用的实现有 LinkedBlockingDeque。双端队列最典型的应用是工作密取,每个消费者都有各自的双端队列,它适用于既是生产者又是消费者的场景。

Map 接口

ConcurrentHashMap

Java 5.0 增加的 ConcurrentHashMap 几乎是最常用的并发容器了。

与 HashTable 相比,ConcurrentHashMap 不仅线程安全,而且还支持高并发、高吞吐。

ConcurrentHashMap 的底层实现使用了分段锁技术,而不是独占锁,它允许多个线程可以同时访问和修改容器,而不会抛出 CoucurrentModificationException 异常,极大地提高了效率。

在这里要说明的是 ConcurrentHashMap 返回的迭代器是弱一致性的,而不是及时失败的。另外 size、isEmpty 等需要在整个容器上执行的方法其返回值也是弱一致性的,是近似值而非准确值。所以,在实际使用中要对此做权衡。与同步容器和加锁机制相比,ConcurrentHashMap 优势明显,是我们优先考虑的容器。

ConcurrentSkipListMap

  • 对应的非并发容器:TreeMap
  • 目标:代替 synchronizedSortedMap (TreeMap)
  • 原理:Skip list(跳表)是一种可以代替平衡树的数据结构,默认是按照 Key 值升序的。Skip list 让已排序的数据分布在多层链表中,以 0-1 随机数决定一个数据的向上攀升与否,通过” 空间来换取时间” 的一个算法。ConcurrentSkipListMap 提供了一种线程安全的并发访问的排序映射表。内部是 SkipList(跳表)结构实现,在理论上能够在 O(log(n))时间内完成查找、插入、删除操作。

Reference