【Java】集合类 - 并发容器(Concurrent Container)

Posted by 西维蜀黍 on 2019-03-18, Last Modified on 2023-02-28

背景

java.util.concurrent包下面为我们提供了丰富的类和接口供我们开发出支持高并发、线程安全的程序。

  • 同步容器类:Vector,HashTable和Collections.SynchronizedXXX()
  • 并发容器类:包括ConcurrentHashMap,CopyOnWrite容器以及阻塞队列。

同步容器类

同步容器类包括:Vector和HashTable,这两个类是早期JDK的一部分。此外还包括了JDK1.2之后提供的同步封装器方法Collections.synchronizedXxx()。

同步容器类的问题

我们都知道Vector是线程安全的容器,但当我们对Vector进行多线程操作时,可能会得到意想之外的结果。

比如迭代操作:

for(int i=0;i<vector.size();i++){
    doSomething(vector.get(i));
}

例如两个线程A和B,A线程对容器进行迭代操作的时候,B线程可能对容器进行了删除操作,这样就会导致A线程的迭代操作抛出IndexOutOfBoundsException。而这显然不是我们想得到的结果。

所以,为了解决不可靠迭代的问题,我们只能牺牲伸缩性为容器加锁,代码如下:

synchronized(vector){
    for(int i=0;i<vector.size();i++){
        doSomething(vector.get(i));
    }
}

加锁防止了迭代期间对容器的修改,但这同时也降低了容器的并发性。当容器很大时,其他线程要一直等待迭代结束,因此性能不高。

迭代器与CoucurrentModificationException

上面的例子我们举了Vector这种“古老”的容器。

但是实际上很多“现代”的容器类也并没有避免上面所提到的问题。比如:for-each循环和Iterator。

当我们调用Iterator类的hasNext和next方法对容器进行迭代时,若其他线程并发修改该容器,那么此时容器表现出的行为是“及时失败”的,即抛出CoucurrentModificationException。

因此,为了避免以上问题,我们又不得不为容器加锁,或者对容器进行“克隆”,在副本上进行操作,但这样做的性能显然是不高的。

隐藏的迭代器

这里要特别提到的是有些类的方法会隐藏的调用容器的迭代器,这往往是很容易被我们忽视的。看下面列子:

private final Set<Integer> set = new HashSet<Integer>();

public synchronized void add(Integer i){set.add(i);}

public void addMore(){
    for(int i=0;i<10;i++)
        add(i);
    System.out.println("this is set:"+set);
}

以上代码能看出对容器的迭代操作吗?乍看没有。

但实际上最后一段“this is set:”+set的代码影藏的调用了set的toString方法,而toString方法又会影藏的调用容器的迭代方法,这样该类的addMore方法同样可能会抛出CoucurrentModificationException异常。

更严重的问题是该类并不是线程安全的,我们可以使用SynchronizedSet来包装set类,对同步代码进行封装,这样就避免了问题。此外我们还需要特别注意:容器的equals、hashCode方法都会隐式的进行迭代操作,当一个容器作为另一个容器的健值或者元素时就会出现这种情况。同样,containsAll,removeAll等方法以及把容器当做参数的构造函数,都会进行迭代操作。所有这些间接迭代的操作都可能导致程序抛出CoucurrentModificationException。

并发容器(Concurrent Container)

针对于同步容器的巨大缺陷,java.util.concurrent中提供了并发容器。并发容器包注重以下特性:

  • 根据具体场景进行设计,尽量避免使用锁,提高容器的并发访问性。
  • 并发容器定义了一些线程安全的复合操作。
  • 并发容器在迭代时,可以不封闭在synchronized中。但是未必每次看到的都是"最新的、当前的"数据。如果说将迭代操作包装在synchronized中,可以达到"串行"的并发安全性,那么并发容器的迭代达到了"脏读"。

List接口和Set接口

CopyOnWriteArrayList和CopyOnWriteArraySet

CopyOnWriteArrayList和CopyOnWriteArraySet分别代替ArrayList和HashSet,主要是在遍历操作为主的情况下来代替同步的List和同步的Set。

实现并发主要可以通过两种方法:

  • 加锁
  • “克隆"容器对象
CopyOnWriteArrayList

CopyOnWrite容器用于替代同步的List,它提供了更好的并发性,并且在使用时不需要加锁或者拷贝容器。

CopyOnWriteArrayList的主要应用就是迭代容器操作多而修改少的场景,迭代时也不会抛出CoucurrentModificationException异常。

CopyOnWrite容器的底层实现是在迭代操作前都会复制一个底层数组,这保证了多线程下的并发性和一致性,但是当底层数组的数据量比较大的时候,就需要效率问题了。

CopyOnWriteArraySet

基于CopyOnWriteArrayList实现,其唯一的不同是在add时调用的是CopyOnWriteArrayList的addIfAbsent方法,其遍历当前Object数组,如Object数组中已有了当前元素,则直接返回,如果没有则放入Object数组的尾部,并返回。

ConcurrentSkipListSet

目标:代替TreeSet。

原理:内部基于ConcurrentSkipListMap实现。

Skip list(跳表)是一种可以代替平衡树的数据结构,默认是按照Key值升序的。

Query接口

Java 5.0之后新增加了Queue(队列)和BlockingQueue(阻塞队列)。

队列分为有界队列无界队列。无界队列会因为数据的累计造成内存溢出,使用时要小心。

BlockingQueue接口(阻塞队列)

BlockingQueue扩展了Queue,增加了可阻塞的插入和获取操作,如果队列为空,那么获取操作将阻塞,直到队列中有一个可用的元素。如果队列已满,那么插入操作就阻塞,直到队列中出现可用的空间。

阻塞队列有很多种实现,最常用的有ArrayBlockingQueue和LinkedBlockingQueue。阻塞队列提供了阻塞的take和put方法,如果队列已满,那么put方法将等待队列有空间时在执行插入操作;如果队列为空,那么take方法将一直阻塞直到有元素可取。有界队列是一个强大的资源管理器,它能抑制产生过多的工作项,使程序更加健壮。

原理

通过ReentrantLock实现线程安全,通过Condition实现阻塞和唤醒

实现类
  • LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列
  • ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列
  • PriorityBlockingQueue:按优先级排序的队列,一个可以按照优先级排序的阻塞队列,当你希望按照某种顺序来排序时非常有用。
  • DelayQueue:无界阻塞队列,只有在延迟期满时才能从中提取元素。
ArrayBlockingQueue类

一个由数组支持的有界阻塞队列。ArrayBlockingQueue是一个典型的"有界缓存区”,由于数组大小固定,所以一旦创建了这样的“缓存区",就不能再增加其容量。

阻塞条件:试图向已满队列中执行put元素会导致操作受阻塞;试图从空队列中take元素将导致类似阻塞。

非阻塞队列

ConcurrentLinkedQuerue

原理:基于链表实现的非阻塞的FIFO队列(LinkedList的并发版本)

ConcurrentLinkedQueue是使用非阻塞的方式实现的基于链接节点的无界的线程安全队列,性能非常好。

多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。

ConcurrentLinkedQuerue是Query实现,是一个先进先出的队列。一般的Queue实现中操作不会阻塞,如果队列为空,那么取元素的操作将返回空。Queue一般用LinkedList实现的,因为去掉了List的随机访问需求,因此并发性更好。

Deque接口(双端队列)

Deque是一种双端队列,它支持在两端进行插入和删除元素,Deque继承自Queue。

BlockingDeque就是支持阻塞的双端队列,常用的实现有LinkedBlockingDeque。双端队列最典型的应用是工作密取,每个消费者都有各自的双端队列,它适用于既是生产者又是消费者的场景。

Map接口

ConcurrentHashMap

Java 5.0增加的ConcurrentHashMap几乎是最常用的并发容器了。

与HashTable相比,ConcurrentHashMap不仅线程安全,而且还支持高并发、高吞吐。

ConcurrentHashMap的底层实现使用了分段锁技术,而不是独占锁,它允许多个线程可以同时访问和修改容器,而不会抛出CoucurrentModificationException异常,极大地提高了效率。

在这里要说明的是ConcurrentHashMap返回的迭代器是弱一致性的,而不是及时失败的。另外size、isEmpty等需要在整个容器上执行的方法其返回值也是弱一致性的,是近似值而非准确值。所以,在实际使用中要对此做权衡。与同步容器和加锁机制相比,ConcurrentHashMap优势明显,是我们优先考虑的容器。

ConcurrentSkipListMap

  • 对应的非并发容器:TreeMap
  • 目标:代替synchronizedSortedMap(TreeMap)
  • 原理:Skip list(跳表)是一种可以代替平衡树的数据结构,默认是按照Key值升序的。Skip list让已排序的数据分布在多层链表中,以0-1随机数决定一个数据的向上攀升与否,通过”空间来换取时间”的一个算法。ConcurrentSkipListMap提供了一种线程安全的并发访问的排序映射表。内部是SkipList(跳表)结构实现,在理论上能够在O(log(n))时间内完成查找、插入、删除操作。

Reference