Java并行流实战:解决parallelStream中的null和大小不一致问题
共 4436字,需浏览 9分钟
·
2024-05-12 16:08
来源:blog.csdn.net/itigoitie
推荐:https://t.zsxq.com/RFTws
本文讲了一些 parallelStream 踩坑指南,出现 null 元素,输出 list 的 size 不符合预期等问题,希望能引起大家思考。
奇怪情形
使用 parallelStream() 出现的一些奇怪情形,如果已经遇到过,那么恭喜你,你已经成为了一名老司机。如果还没有遇到过,那么也恭喜你,你已经掌握了应有的经验。
有时候,为了使用多线程加快代码运行速度,我们会使用parallelStream()
来代替stream()
,我们先来看一段示例代码:
List<Integer> integerList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
integerList.add(i);
}
List<Integer> list = new ArrayList<>();
integerList.parallelStream().forEach(list::add);
System.out.println(list);
我们的预期是,输出的list
能够是 0-99 的一共 99 个数字,顺序不限。
然而,人生就是这样,就连我们如此简单的预期,也往往无法得到满足。。。
经过多次运行代码,会发现一些很奇怪的现象:
-
输出的 list的 size() 不符合预期,有时候是 100,有时候是 99,甚至是 97等; -
输出的 list 中有时含有 null 元素,数量不定,有时甚至达到十几个之多; -
有时会出现 IndexOutOfBounds 异常; -
由于以上问题的出现,可能会导致业务代码中出现 NPE;
原因探究
为什么会出现以上问题呢?我们来逐个分析一下各个问题出现的原因。
输出的list的size()不符合预期
【现象】输出的 list 的 size() 不符合预期,有时候会比预想的要少,也就是出现了元素丢失的现象;
【原因】
-
我们来看一下ArrayList的add方法:
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return <tt>true</tt> (as specified by {@link Collection#add})
*/
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
-
add方法是分两步进行的,第一步是通过
ensureCapacityInternal(size + 1);
进行扩容,第二步是通过elementData[size++] = e;
添加新元素。在添加新元素时,先读取size的值,然后执行elementData[size] = e;
,将e添加到size的位置,在执行size++;
,有三个步骤,并不是原子操作。 -
因此存在内存可见性问题。当线程 A 从内存读取 size 后,可能这是还没来得及继续执行,线程B就迅速地从内存中读取了size,并且将5写入到了size处,然后size++,然后线程A才将6写入到了size处,将 size 加 1,然后写入内存。在这种情况下,线程B的更新就丢失了,出现了元素丢失的现象。
输出的list中有时含有null元素
【现象】输出的list中有时含有null元素,数量不定,有时甚至达到十几个之多;
【原因】
-
null 元素产生跟元素数据丢失类似,也是由于 elementData[size++] = e;
这一步并不是原子操作导致的。 -
假设存在三个线程,线程A、线程B、线程C。三个线程同时开始执行,初始 size 值为 1。 -
线程A首先读取size值为1,然后线程B读取size值为1,然后线程C读取size为1,然后线程B将数据添加到size位置,然后线程A将数据也添加到了size位置,覆盖了B的更新,然后线程A将size更新为2;然后线程B将size更新为3;然后线程C将数据更新到size也就是3的位置,然后将size更新为4;这样2的位置就是null了。
有时会出现IndexOutOfBounds异常;
【现象】有时会出现IndexOutOfBounds异常;
【原因】
-
由于ArrayList的add方法,第一步是通过 ensureCapacityInternal(size + 1);
进行扩容,第二步是通过elementData[size++] = e;
添加新元素。 -
如果线程A已经进行了扩容,但还没添加新元素,此时线程B也进行了扩容(注意此时扩容是无效的,因为在线程B看来,目前的size还是原来的size),然后线程A读取size,将数据更新到size的位置,size++后结束;线程B读取size,发现已经超出了数组的界限,抛出IndexOutOfBounds异常;
解决方法
可以使用线程安全的 List:
List<Integer> list = Collections.synchronizedList(new ArrayList<>());
或者
List<Integer> list = new CopyOnWriteArrayList<>();
参考资料
在《The Java™ Tutorials Parallelism》https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
中,有一个部分叫做 Stateful Lambda Expressions。这部分告诉我们,不要使用 Stateful Lambda Expressions,比如
e -> { parallelStorage.add(e); return e; }
因为每次运行代码时,其结果可能会有所不同;
❝Note: This example invokes the method
synchronizedList
(https://docs.oracle.com/javase/8/docs/api/java/util/Collections.html
) so that theList
parallelStorage
is thread-safe. Remember that collections are not thread-safe. This means that multiple threads should not access a particular collection at the same time. Suppose that you do not invoke the methodsynchronizedList
when creatingparallelStorage
:List<Integer> parallelStorage = new ArrayList<>();
The example behaves erratically because multiple threads access and modify
parallelStorage
without a mechanism like synchronization to schedule when a particular thread may access theList
instance. Consequently, the example could print output similar to the following:Parallel stream:
8 7 6 5 4 3 2 1
null 3 5 4 7 8 1 2
并且该文章已经指出,如果我们使用的集合不是线程安全的,那么就会得到一个不稳定的实例,可能会出现 null 元素。