Java 并行操作之流的使用

全栈港

共 5637字,需浏览 12分钟

 ·

2021-02-02 02:30

在实际开发过程中,我们使用最多的程序结构是顺序结构,也就是序列 sequence,通常还会与循环结构的代码相混合 Loops

顺序结构只能依次执行任务,消耗的时间是各个顺序任务的总和。当对接口的请求时间有要求,需要对任务的处理时间进行压缩,转化到代码上就是对代码结构进行优化,我们希望将序列任务转化为并行任务 parallelism。并行指的是同时发生,在计算机上为每个任务分配 单独的 CPU 核心,同时进行任务处理完毕后进行汇总。如果你了解 Hadoop ,这种方式与 MapReduce 如出一辙。

我们可以调用 parallelStream 方法来把集合转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样就可以把工作负荷分配给多核处理器的所有内核,让它们都忙起来。

什么是流

流是 JavaAPI 的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。就现在来说,你可以把它们看成遍历数据集的高级迭代器。

Stream<T>

支持顺序 sequential  和并行 parallel  聚合操作的元素序列。下面的示例演示了使用 Stream和 IntStream 的聚合操作。

     int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
.mapToInt(w -> w.getWeight())
.sum();

在这个例子中,widgets是一个 Collection<Widget>。我们通过以下方法创建一个小部件对象流  Collection.stream() 。该方法返回以此集合为源的顺序流。如上方法过滤颜色为红色的 Widgets 并将 Collection<widget> 转化为Collection<int> 的流,并对所有int流求和。

流在与集合主要不同:

  • 没有存储空间。流不是存储元素的数据结构。

  • 本质上是功能性的。对流的操作会产生结果,但不会修改其源。例如,上面的例子不会修改 Collection<Widget>。

  • 消耗品。在流的生存期内,流中元素只能访问一次。

流操作分为中间(intermediate )操作终端(terminal )操作,并合并以形成流管道(stream pipelines)。流管道由源(例如 Collection,数组,生成器函数或I / O通道)组成;随后是零个或多个中间操作,例如 Stream.filterStream.map;以及诸如Stream.forEach 或终端操作 Stream.reduce

终端操作(例如Stream.forEach或 IntStream.sum)可能会遍历流执行操作。执行终端操作后,流管道被视为已消耗,无法再使用;如果需要再次遍历同一数据源,则必须返回到数据源以获取新的流。

流的各种操作

我们使用一个 Menu 菜单列表的例子来看一下流的各种操作

List < Dish > menu = Arrays.asList( new Dish("pork", false, 800, Dish.Type.MEAT), 
new Dish("beef", false, 700, Dish.Type.MEAT),
new Dish("chicken", false, 400, Dish.Type.MEAT),
new Dish("french fries", true, 530, Dish.Type.OTHER),
new Dish("rice", true, 350, Dish.Type.OTHER),
new Dish("season fruit", true, 120, Dish.Type.OTHER),
new Dish("pizza", true, 550, Dish.Type.OTHER),
new Dish("prawns", false, 300, Dish.Type.FISH),
new Dish("salmon", false, 450, Dish.Type.FISH) );


public class Dish {
private final String name;
private final boolean vegetarian;
private final int calories;
private final Type type;

public boolean isVegetarian() { return vegetarian; }

public enum Type { MEAT, FISH, OTHER }

//...省略 Getter Setter
}


如上的集合可以使用如下流操作

import static java.util.Comparator.comparing; 
import static java.util.stream.Collectors.toList;

List <String> lowCaloricDishesName = menu.stream()
.filter(d -> d.getCalories() < 400) // 选出400卡路里以下的菜肴
.sorted(comparing(Dish::getCalories)) // 按照卡路里排序
.map(Dish::getName) // 提取菜肴的名称
.limit(3) // 只选择头三个
.collect(toList()); // 将所有名称保存到 List 中

为 了 利 用 多 核 架 构 并 行 执 行 这 段 代 码, 你 只 需 要 把 stream() 换 成 parallelStream() :

List < String > lowCaloricDishesName = menu.parallelStream() 
.filter(d -> d.getCalories() < 400)
.sorted(comparing( Dishes::getCalories))
.map(Dish::getName)
.limit(3)
.collect(toList());

数据处理操作——流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,如 filter 、map 、reduce 、find 、match 、sort 等。流操作可以顺序执行,也可并行执行。

如上操作中

  • filter —— 接受 Lambda, 从流中排除某些信息

  • map —— 接受 Lambada,将元素转化为其他形式或提取信息。

  • limit —— 截断流,使元素不超过指定数量。

  • collect ——将流转化为其他形式。

你可以看到两类操作:filter、map 和 limit 可以连成一条流水线;collect 触发流水线执行并关闭它。可以连接起来的流操作称为中间操作,关闭流的操作称为终端操作

流的三要素

  • 一个数据源(如集合)来执行一个查询;

  • 一个中间操作链,形成一条流的流水线;

  • 一个终端操作,执行流水线,并能生成结果。

中间操作 :filter、map、limit、sorted、distinct

终端操作:forEach、count、collect

流的中间操作


使用谓词筛选

filter() 操作会接受一个谓词(一个返回boolean的函数)作为参数,并返回一个包括所有符合谓词的元素的流。filter(d -> d.getCalories() < 400) 这个代码中定义了一个匿名内部类,这个用 Lambda 表达式定义的方法返回 d.getCalories()<400 这个表达式的值,随后 filter 会根据过滤条件进行流过滤。

也可以使用 filter(Dish::isVegetarian) ,有双冒号的用法,就是把方法当做参数传到stream内部,使 stream 的每个元素都传入到该方法里面执行一下。Dish::isVegetarian 返回每个元素的 true 或 false 值。

映射

map 方法会接受一个函数作为参数。这个函数会被应用到每个元素上,并将其映射成一个新的元素。

List < String > dishNames = menu.stream() 
.map(Dish::getName)
.collect(toList());

map 方法返回的流类型是 Stream<String>。

匹配

anyMatch 方法返回 Boolean 值,用来查找流中是否包含某个值,是一个终端操作。

if( menu.stream().anyMatch(Dish::isVegetarian)){ 
System.out.println(" The menu is vegetarian friendly!!");
}

AllMatch 方法也返回 Boolean 值,用来判断是否流中元素是否全部匹配。

NoneMatch  方法确保流中没有任何元素与谓词相匹配。

查找

可以使用流的 find 方法查找流中元素,可以使用顺序查找 findFirst() 或任意查找 findAny() 。查找方法是终端操作。

List <Integer> someNumbers = Arrays.asList(1, 2, 3, 4, 5); 
Optional <Integer> firstSquareDivisibleByThree = someNumbers.stream()
.map( x -> x * x)
.filter( x -> x % 3 = = 0)
.findFirst(); // 9

规约操作

规约操作指将流规约成一个值。也可称为折叠 fold。

元素求和

int sum = 0; 
for (int x : numbers) { sum + = x; }

// 使用流的写法
int sum = numbers.stream().reduce(0, (a, b) -> a + b);

reduce 接受两个参数:

  • 一个初始值,这里是 0;

  • 一个 BinaryOperator 来将两个元素结合起来产生一个新值,这里我们用的是 lambda (a,b)->a+b。

如上的例子中,0 作为 a 的第一个参数,从流中获取 numbers 列表中的一个数值如 3 作为 b ,两者之和 3 作为 a 的第二个参数与 numbers 列表获取的第二个数值 b =4 相加,直到所有 numbers 列表中数据都参与计算返回最终的 a + b 的 sum 值。

最大值与最小值

Optional <Integer> max = numbers.stream().reduce(Integer::max);
Optional <Integer> min = numbers.stream().reduce(Integer::min);


// 也可以使用 Lambda (X,Y)->X<Y?X:Y

数值流

int calories = menu.stream() .map(Dish:: getCalories) .reduce( 0, Integer:: sum);
int calories = menu.stream() // 返回一个 Stream <Dish>.mapToInt(Dish::getCalories) //返回一个 IntStream.sum();

IntStream intStream = menu.stream(). mapToInt( Dish:: getCalories); // 将 Stream 转换为数值流
Stream <Integer > stream = intStream.boxed(); // 将数值流转换为 Stream

并行流

并行计算数值和

public static long parallelSum( long n) { 
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel() //将流转换为并行流
.reduce(0L, Long:: sum);
}

并行流默认的线程数量是你的处理器数量,要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。使用并行流要避免共享对象可变状态,并且要避免自动拆装箱。对于较小的数据一般不使用并行流,并行处理少数几个元素可能耗费时间增多,因为移动多个核的数据代价也很大。

参考资料

  • 《 Java 8实战 》 . 人民邮电出版社.

  • JDK 11 官方文档


浏览 38
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报