Spark Transformations 算子

Python之王

共 16056字,需浏览 33分钟

 · 2024-04-11

下面对相关常用算子进行演示。

三、Transformations 算子

1. map

RDD 中的数据 一对一 的转为另一种形式:

例如:

  • scala:
      
      val num = sc.parallelize(Seq(1, 2, 3, 4, 5))
println(
  num.map(_+1).collect().toList
)

  • java:
      
      JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
System.out.println(
       num.map(i -> i + 1).collect()
);

  • python:
      
      num = sc.parallelize((1, 2, 3, 4, 5))
print(
    num.map(lambda i:i+1).collect()
)

bd872f20d1f419cc13de8ca67770a0de.webp

2. flatMap

Map 算子类似,但是 FlatMap 是一对多,并都转化为一维数据:

例如:

  • scala:
      
      val text = sc.parallelize(Seq("abc def""hello word""dfg,okh""he,word"))
println(
  text.flatMap(_.split(" ")).flatMap(_.split(",")).collect().toList
)

  • java:
      
      JavaRDD<String> text = sc.parallelize(Arrays.asList("abc def""hello word""dfg,okh""he,word"));
System.out.println(
        text.flatMap(s ->Arrays.asList(s.split(" ")).iterator())
                .flatMap(s ->Arrays.asList(s.split(",")).iterator())
                .collect()
);

  • python:
      
      text = sc.parallelize(("abc def""hello word""dfg,okh""he,word"))
print(
    text.flatMap(lambda s: s.split(" ")).flatMap(lambda s: s.split(",")).collect()
)

301407e77a8a33989b661bfdeb9531bb.webp

3. filter

过滤掉不需要的内容:

例如:

  • scala:
      
      val text = sc.parallelize(Seq("hello""hello""word""word"))
println(
  text.filter(_.equals("hello")).collect().toList
)

  • java:
      
      JavaRDD<String> text = sc.parallelize(Arrays.asList("hello""hello""word""word"));
System.out.println(
        text.filter(s -> Objects.equals(s,"hello"))
                .collect()
);

  • python:
      
      text = sc.parallelize(("hello""hello""word""word"))
print(
    text.filter(lambda s: s == 'hello').collect()
)

b657425fc0db190f1bf8c89c7acdff0d.webp

4. mapPartitions

map 类似,针对整个分区的数据转换,拿到的是每个分区的集合:

例如:

  • scala:
      
      val text = sc.parallelize(Seq("hello""hello""word""word"), 2)
println(
  text.mapPartitions(iter => {<!-- -->
    iter.map(_ + "333")
  }).collect().toList
)

  • java:
      
      JavaRDD<String> text = sc.parallelize(Arrays.asList("hello""hello""word""word"), 2);
System.out.println(
        text.mapPartitions(iter -> {<!-- -->
            List<String> list = new ArrayList<>();
            iter.forEachRemaining(s -> list.add(s+"333"));
            return list.iterator();
        }).collect()
);

  • python:
      
       text = sc.parallelize(("hello""hello""word""word"), 2)
 
 def partition(par):
     tmpArr = []
     for s in par:
         tmpArr.append(s + "333")
     return tmpArr

 print(
     text.mapPartitions(partition).collect()
 )

957c23570fe9f2c1f614d505a23d5d3c.webp

5. mapPartitionsWithIndex

mapPartitions 类似, 只是在函数中增加了分区的 Index

例如:

  • scala:
      
      val text = sc.parallelize(Seq("hello""hello""word""word"), 2)
println(
  text.mapPartitionsWithIndex((index, iter) => {<!-- -->
    println("当前分区" + index)
    iter.map(_ + "333")
  }, true).collect().toList
)

  • java:
      
      JavaRDD<String> text = sc.parallelize(Arrays.asList("hello""hello""word""word"), 2);
System.out.println(
       text.mapPartitionsWithIndex((index, iter) -> {<!-- -->
           System.out.println("当前分区" + index);
           List<String> list = new ArrayList<>();
           iter.forEachRemaining(s -> list.add(s + "333"));
           return list.iterator();
       }, true).collect()
);

  • python:
      
      text = sc.parallelize(("hello""hello""word""word"), 2)

def partition(index, par):
    print("当前分区" + str(index))
    tmpArr = []
    for s in par:
        tmpArr.append(s + "333")
    return tmpArr

print(
    text.mapPartitionsWithIndex(partition).collect()
)

fa4f803213527e674db8c638d196da17.webp

6. mapValues

只能作用于 Key-Value 型数据, 和 Map 类似, 也是使用函数按照转换数据, 不同点是 MapValues 只转换 Key-Value 中的 Value

例如:

  • scala:
      
      val text = sc.parallelize(Seq("abc""bbb""ccc""dd"))
println(
  text.map((_, "v" + _))
    .mapValues(_ + "66")
    .collect().toList
)

  • java:
      
      JavaRDD<String> text = sc.parallelize(Arrays.asList("abc""bbb""ccc""dd"));
System.out.println(
       text.mapToPair(s -> new Tuple2<>(s, "v" + s))
               .mapValues(v -> v + "66").collect()
);

  • python:
      
      text = sc.parallelize(("abc""bbb""ccc""dd"))
print(
    text.map(lambda s: (s, "v" + s)).mapValues(lambda v: v + "66").collect()
)

21fa756da7f491064beeaab51a9e5cac.webp

7. sample

可以从一个数据集中抽样出来一部分, 常用作于减小数据集以保证运行速度, 并且尽可能少规律的损失:

第一个参数为withReplacement, 意为是否取样以后是否还放回原数据集供下次使用, 简单的说,如果这个参数的值为 true, 则抽样出来的数据集中可能会有重复。

第二个参数为fraction, 意为抽样的比例。

第三个参数为seed, 随机数种子, 用于 Sample 内部随机生成下标,一般不指定,使用默认值。

例如:

  • scala:
      
      val num = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
println(
  num.sample(true,0.6,2)
    .collect().toList
)

  • java:
      
      JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
System.out.println(
    num.sample(true, 0.6, 2).collect()
);

  • python:
      
      num = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
print(
    num.sample(True, 0.6, 2).collect()
)

0e99fd459da1f475dad1e0b275db51ad.webp

8. union

两个数据并集,类似于数据库的 union

例如:

  • scala:
      
      val text1 = sc.parallelize(Seq("aa""bb"))
val text2 = sc.parallelize(Seq("cc""dd"))
println(
  text1.union(text2).collect().toList
)

  • java:
      
      JavaRDD<String> text1 = sc.parallelize(Arrays.asList("aa""bb"));
JavaRDD<String> text2 = sc.parallelize(Arrays.asList("cc""dd"));
System.out.println(
        text1.union(text2).collect()
);

  • python:
      
      text1 = sc.parallelize(("aa""bb"))
text2 = sc.parallelize(("cc""dd"))
print(
   text1.union(text2).collect()
)

aaf4ee4dacf3798524463ea3a9089d35.webp

9. join,leftOuterJoin,rightOuterJoin

两个(key,value)数据集,根据 key 取连接、左连接、右连接,类似数据库中的连接:

例如:

  • scala:
      
      val s1 = sc.parallelize(Seq("1,3""2,6""3,8""4,2"))
val s2 = sc.parallelize(Seq("1,小明""2,小张""3,小李""4,小红""5,张三"))

val s3 = s1.map(s => (s.split(",")(0), s.split(",")(0)))
val s4 = s2.map(s => (s.split(",")(0), s.split(",")(1)))

println(s3.join(s4).collectAsMap)
println(s3.leftOuterJoin(s4).collectAsMap)
println(s3.rightOuterJoin(s4).collectAsMap)

  • java:
      
      JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3""2,6""3,8""4,2"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("1,小明""2,小张""3,小李""4,小红""5,张三"));

JavaPairRDD<String, String> s3 = s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
JavaPairRDD<String, String> s4 = s2.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));

System.out.println(s3.join(s4).collectAsMap());
System.out.println(s3.leftOuterJoin(s4).collectAsMap());
System.out.println(s3.rightOuterJoin(s4).collectAsMap());

  • python:
      
      s1 = sc.parallelize(("1,3""2,6""3,8""4,2"))
s2 = sc.parallelize(("1,小明""2,小张""3,小李""4,小红""5,张三"))

s3 = s1.map(lambda s:(s.split(",")[0], s.split(",")[0]))
s4 = s2.map(lambda s:(s.split(",")[0], s.split(",")[1]))

print(s3.join(s4).collectAsMap())
print(s3.leftOuterJoin(s4).collectAsMap())
print(s3.rightOuterJoin(s4).collectAsMap())

676ef863c47a9e9366e4e228d504e5ce.webp

10. intersection

获取两个集合的交集 :

例如:

  • scala:
      
      val s1 = sc.parallelize(Seq("abc""dfe""hello"))
val s2 = sc.parallelize(Seq("fgh""nbv""hello""word""jkl""abc"))
println(
  s1.intersection(s2).collect().toList
)

  • java:
      
      JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""dfe""hello"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("fgh""nbv""hello""word""jkl""abc"));
System.out.println(
     s1.intersection(s2).collect()
);

  • python:
      
      s1 = sc.parallelize(("abc""dfe""hello"))
s2 = sc.parallelize(("fgh""nbv""hello""word""jkl""abc"))
print(
    s1.intersection(s2).collect()
)

8c677304f883e79473c2c7df92162a76.webp

11. subtract

获取差集,a - b ,取 a 集合中 b 集合没有的元素:

例如:

  • scala:
      
      val s1 = sc.parallelize(Seq("abc""dfe""hello"))
val s2 = sc.parallelize(Seq("fgh""nbv""hello""word""jkl""abc"))
println(
  s1.subtract(s2).collect().toList
)

  • java:
      
      JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""dfe""hello"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("fgh""nbv""hello""word""jkl""abc"));
System.out.println(
        s1.subtract(s2).collect()
);

  • python:
      
      s1 = sc.parallelize(("abc""dfe""hello"))
s2 = sc.parallelize(("fgh""nbv""hello""word""jkl""abc"))
print(
    s1.subtract(s2).collect()
)

a7a13d9f7dd31339a88e0425dd00f2c4.webp

12. distinct

元素去重,是一个需要 Shuffled 的操作:

例如:

  • scala:
      
      val s1 = sc.parallelize(Seq("abc""abc""hello""hello""word""word"))
println(
  s1.distinct().collect().toList
)

  • java:
      
      JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""abc""hello""hello""word""word"));
System.out.println(
    s1.distinct().collect()
);

  • python:
      
      s1 = sc.parallelize(("abc""abc""hello""hello""word""word"))
print(
  s1.distinct().collect()
)

641f2f04339d6cf8d6f5382147674d7b.webp

13. reduceByKey

只能作用于 Key-Value 型数据,根据 Key 分组生成一个 Tuple,然后针对每个组执行 reduce 算子,传入两个参数,一个是当前值,一个是局部汇总,这个函数需要有一个输出, 输出就是这个 Key 的汇总结果,是一个需要 Shuffled 的操作:

例如:

  • scala:
      
      val s1 = sc.parallelize(Seq("abc""abc""hello""hello""word""word"))
println(
  s1.map((_, 1))
    .reduceByKey(Integer.sum)
    .collectAsMap
)

  • java:
      
      JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""abc""hello""hello""word""word"));
System.out.println(
   s1.mapToPair(s -> new Tuple2<>(s, 1))
           .reduceByKey(Integer::sum)
           .collectAsMap()
);

  • python:
      
      s1 = sc.parallelize(("abc""abc""hello""hello""word""word"))
print(
  s1.map(lambda s: (s, 1))
      .reduceByKey(lambda v1, v2: v1 + v2)
      .collectAsMap()
)

4d59bf8faa6ee8e3a012ffcc9a0ac2eb.webp

14. groupByKey

只能作用于 Key-Value 型数据,根据 Key 分组, 和 ReduceByKey 有点类似, 但是 GroupByKey 并不求聚合, 只是列举 Key 对应的所有 Value,是一个需要 Shuffled 的操作。

GroupByKeyReduceByKey 不同,因为需要列举 Key 对应的所有数据, 所以无法在 Map 端做 Combine, 所以 GroupByKey 的性能并没有 ReduceByKey 好:

例如:

  • scala:
      
      val s1 = sc.parallelize(Seq("abc""abc""hello""hello""word""word"))
println(
  s1.map((_, 1))
    .groupByKey()
    .collectAsMap
)

  • java:
      
      JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""abc""hello""hello""word""word"));
System.out.println(
        s1.mapToPair(s -> new Tuple2<>(s, 1))
                .groupByKey()
                .collectAsMap()
);

  • python:
      
      s1 = sc.parallelize(("abc""abc""hello""hello""word""word"))
print(
    s1.map(lambda s: (s, 1))
        .reduceByKey()
        .collectAsMap()
)


浏览 4
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报