pyspark partitionby与mapPartitions并行

在大数据中,算法工程师经常使用spark来进行模型训练,但是基于不同的业务场景和模型的训练时间要求,算法工程师可能需要并行运行某个任务(训练模型)
举个例子:在电商场景中,不同的品类具有不同的时序性,具有不同的表现,工程师可能想将每个品类都运行一个算法,最终汇总看看效果,但是for循环满足不了时间的要求,这时候就可以运用mapPartitions来处理啦

1. 先定义个rdd数据吧

from pyspark import SparkContext as sc
from pyspark import SparkConf
conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc = sc.getOrCreate(conf)
rdd = sc.parallelize([(1,2),(1,3),(2,3),(2,4),(6,6)],3)
print(rdd)
print(rdd.glom().collect())
print(rdd.getNumPartitions())
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
[[(1, 2)], [(1, 3), (2, 3)], [(2, 4), (6, 6)]]

2. repartition 和 partitionBy

2.1 repartition(self,numPartitions)

  • numPartitions:划分到多少个partition
  • return:MapPartitionsRDD
    对rdd、dataframe或者dataset进行随机划分,注意可能某个分区的数据为空
b = rdd.repartition(3)
print(b)
print(b.glom().collect())
print(b.getNumPartitions())
MapPartitionsRDD[10] at coalesce at NativeMethodAccessorImpl.java:0
[[], [(1, 2), (2, 4), (6, 6)], [(1, 3), (2, 3)]]

某个分区的数据为空

2.2 partitionby(self,numPartitions,parttitionFunc)

  • numPartitions:划分到多少个partition
  • parttitionFunc:分区函数,可根据某个字段指定分区位置
  • return:MapPartitionsRDD

可用此函数对数据集进行按组分区,对组内成员进行操作

a = rdd.partitionBy(3,lambda x:x)
print(a)
print(a.glom().collect())
print(a.getNumPartitions())

由于此输入是个pair_rdd,且pair_rdd中每个组合的第一个元素都是数字,所以可实现按照第一个元素的值当做partition的位置,所以此处的lambda x:x实现了pair_rdd按照index分区

MapPartitionsRDD[5] at mapPartitions at PythonRDD.scala:133
[[(6, 6)], [(1, 2), (1, 3)], [(2, 3), (2, 4)]]
[(1, 2), (1, 3)]
[(2, 3), (2, 4)]
[(6, 6)]

按照一个元素分区了,含有相同index的划分到同一个partition中。

2.3 分组列为string类型,partitionBy

首先,构建数据集

from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]")\
        .getOrCreate()
schema = StructType([StructField("k", StringType(), True),
                         StructField("v", IntegerType(), True)])
rdd_df = spark.createDataFrame([("a",2),("a",3),("b",3),("b",4),("c",6)],schema=schema)
print(rdd_df.show(5))
+---+---+
|  k|  v|
+---+---+
|  a|  2|
|  a|  3|
|  b|  3|
|  b|  4|
|  c|  6|
+---+---+

得到一个dataframe
此时我们想对"k"列进行分组分区
那么分步骤进行,首先获取"k"列的去重list,并且构建映射字典

print("得到分区列的去重值")
partition_column = "k"
partition_column_list = rdd_df.select(partition_column).distinct().rdd.flatMap(lambda x:x).collect()
print("由于分区列是字符串,分区是数字,所以需要映射一下")
partition_dic = {}
for i ,v in enumerate(partition_column_list):
    partition_dic[v] = i
print(partition_dic)
由于分区列是字符串,分区是数字,所以需要映射一下
{'c': 0, 'b': 1, 'a': 2}

第二步:分区

map_par_rdd= rdd_df.rdd.map(lambda x:(x[partition_column], x)).partitionBy(len(partition_column_list),lambda x: partition_dic[x])
print(map_par_rdd.glom().collect())
[[('c', Row(k='c', v=6))], [('b', Row(k='b', v=3)), ('b', Row(k='b', v=4))], [('a', Row(k='a', v=2)), ('a', Row(k='a', v=3))]]

可见,此方法将某个取值相同的数据分到了同一个分区

2.4 partitionby与mapPartitions并行

基于上述,可得到一个mappartition_rdd
那么基于这个index为string类型的pair_rdd该如何在mapPartitions中操作呢?
现在举个简单的例子,请看代码

print("执行mapPartition")
        def fruc_1(row_list):
        res = []
        for k,v in row_list:
            res.append(v)
        df = pd.DataFrame(res,columns=["k","v"])
        df["partition_id"] = k
        return df.values.__iter__()
result = map_par_rdd.mapPartitions(fruc_1)
print(result.collect())
[array(['c', 6, 0], dtype=object), 
array(['b', 3, 1], dtype=object),
array(['b', 4, 1], dtype=object), 
array(['a', 2, 2], dtype=object), 
array(['a', 3, 2], dtype=object)]

可见第三列为partition_id

3. mapPartitions的优缺点

  • mapPartitions能够实现并行化,时效性更快,性能更好,在此申明,在分区完成后,各个分区的数据都是在本地进行操作的
  • mapPartitions当分区中的操作完成后,会汇总到executor中,要注意,因为这时候会读入内存,如果内存不足,会报oom
RDD作为分布式计算弹性数据集在PySpark占有十分重要的地位,因此学会如何操作RDD的pyspark的接口函数显得十分重要,PySpark系列的专栏文章目前的话应该只会比Pandas更多不会更少,可以用PySpark实现的功能太多了,基本上Spark能实现的PySpark都能实现,而且能够实现兼容python其他库,这就给了PySpark极大的使用空间,能够结合大数据集群实现更高效更精确的大数据处理或者预测。如果能够将这些工具都使用的相当熟练的话,那必定是一名优秀的大数据工程师。 在Spark这类分布式程序中,通信的开销非常大。控制数据分区的意义就在于,通过合理的数据分布减少网络传输从而提升性能。对数据进行分区主要用于优化基于键的操作。上面的代码可以进行曝光过滤,但是考虑到latest_impressions需要实时更新,这个过滤操作可能会被经常调用,而上面的代码每次都会执行join()操作,导致代码效率很低。实际工作中,存储全量用户的user_recall表要比一直更新的latest_impressions表大很多,并且没有那么频繁的更新。 关于 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 的问题 Spark API  之 mapmapPartitions、mapValues、flatMap、flatMapValues详解1、创建一个RDD变量,通过help函数,查看相关函数定义和例子:>>> a = sc.parallelize([(1,2),(3,4),(5,6)]) >>> a ParallelCollectionRDD[21] at paral... mapPartitions 对一个分区进行操作,如果要实现向map一样的处理,函数里面需要遍历分区中的每一行。 def f(partitionData): for element in partitionData: # return updated data df.rdd.mapPartitions(f) +---------------+-----+ | name|bonus| +---------------+-----+ | James,S def partitionBy(self, numPartitions, partitionFunc=portable_hash): def partitionBy(self, numPartitions, partitionFunc=portable_hash): Return a copy of the RDD partitioned us 笔者最近在尝试使用PySpark,发现pyspark.dataframe跟pandas很像,但是数据操作的功能并不强大。由于,pyspark环境非自建,别家工程师也不让改,导致本来想pyspark环境跑一个随机森林,用 《Comprehensive Introduction to Apache Spark, RDDs & Dataframes (using PySpark) 》中的案例,... ## F.when(if条件,if结果).otherwise(else结果) df = df.withColumn('a', F.when(F.col('a').isin([1,2,3]) | F.isnan(F.col('a')) | F.col('a').isNull(), F.col('a')).othe from pyspark.sql import Row from pyspark.sql.window import Window from pyspark.sql.functions import mean, col row = Row("name", "date", "score") rdd = sc.parallelize([ row("Ali", "2020-01-01". TopicAndPartition(self.topic_name, i) ] = int(0)#指定偏移量 kafkaStreams = KafkaUtils.createDirectStream... 網路上的精選摘要 In Spark or PySpark repartition is used to increase or decrease the RDD, DataFrame, Dataset partitions whereas the Spark coalesce is used to only decrease the number of partitions in an efficient way.2020年4月12日 Spark Repartition() vs Coalesce