pyspark处理数据基本语法

作为一个和数据相关的专业,想学习pyspark,从而了解并学习pyspark ,以便更好的应用到工作中。

1、连接数据库

import findspark
findspark.init()
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
# 定义数据库的地址,以及表,登录用户及密码
url = "jdbc:mysql://localhost:3306/xx" 
table="table"
#密码账户需要字典的形式传入
properties ={"user":"root","password":"12345678"}
spark = SparkSession.builder.appName('My first app').getOrCreate()
df = spark.read.jdbc(url=url,table=table,properties=properties)
df.show(4)

2、查看数据维度

df.count(),len(df.columns)# 查看数据维度

3、查看字段类型

df.printSchema()# 元数据分析,查看字段类型

4、使用select 按照黎明筛选数据

df.select(['cxkh','jymc']) # 按照列名选择

5、使用filter 过滤筛选数据

df.filter((df['jyje']>100000) & (df['jysj']>'2015-01-01')) #按照条件进行筛选

6、数据类型转换

#在元数据上更改字段
df.withColumn("jyje",df3.jyje.astype("int"))

7、描述性统计

df3.describe().show()  

8、增加字段

df.withColumn('test',(df['jyje']+100))

9、筛选空值

df[df['jydfmc'].isNull()]  # 筛选未空

10、缺失值填充

dd.fillna('wz').show(2) #缺失值填充
dd.na.fill('wz').show(2)  #缺失值填充

11、分组处理

#按照一个index分组
df.groupBy('jymc').count().orderBy('count',ascending=False).show(3)
#按照两个index分组
df.groupBy('jymc','cxkh').agg({'jyje':'sum','jysj':'max'}).orderBy('sum(jyje)',ascending=False).show(5,False) #特定函数聚合求解

12、去重

df.select(['cxkh','jymc']).dropDuplicates().show(5)

13、遍历处理,类似于Python 的lambda

from pyspark.sql.functions import  udf
from pyspark.sql.types import  StringType  #类似于lambda
brand_udf=udf(jyje_level,StringType())  #注册udf
df.withColumn('jyje_level',brand_udf(df['jyje'])).show(5)   

14、模糊匹配

df.select(['cxkh','jymc','jdbz','jyje','jydfmc']).filter("jydfmc rlike  '基金|理财|证券'") # 模糊匹配

15、修改指定列名

df.select('cxkh','jysj','jyje').groupBy('cxkh').agg({'jysj':'max','jyje':'max'}).\
withColumnRenamed('cxkh','cxkh1').withColumnRenamed('max(jysj)','最晚一次交易时间').\
withColumnRenamed('max(jyje)','最大交易金额') #修改指定字段的名字, df.seleceExpr('first_name as lase_name'),这个好像需要字段名全部写上,否则最终显示结果只有更改了列名的那一列。

16、数据框拼接

df_person.join(df_lastime,df_person['cxkh']==df_lastime['cxkh1'],\
                                       'left_outer').orderBy('cxkh',ascending=True)# 两表拼接指定字段的拼接,其他的连接方式:inner, outer, left_outer, right_outer, leftsemi

17、pyspark 中实现sql语句的查询

df.registerTempTable('df1') # 将数据框注册为一个table
spark.sql("select * from df1where jymc='xxx'").show(2)#程序 mysql程序

18、取交集、差集、并集

df.select('jymc').dropDuplicates().intersect(df.select('jydfmc').dropDuplicates())#交集
df.select('jymc').dropDuplicates().subtract(df.select('jydfmc').dropDuplicates())#差集
df.select('jymc').dropDuplicates().union(df.select('jydfmc').dropDuplicates())#并集

19、并集去重

df.select('jymc').union(df.select('jydfmc')).distinct()#并集+去重

20、数据拆分

from pyspark.sql.functions import split
df.withColumn('jyrq',split(df_year['jysj']," ").getItem(0))#拆分后取第一个元素
df.withColumn('jytime',split(df_year['jysj']," ")[1])#拆分后取第二个元素

21、获取对应的年,月,日,一周内第几天,一年内第几天

from pyspark.sql.functions import month,year,dayofmonth,dayofweek,dayofyear
df.withColumn('year',year('jysj')).\
withColumn('month',month('jysj')).\
withColumn('day',dayofmonth('jysj')).\
withColumn('week',dayofweek('jysj')).\
withColumn('day_num',dayofyear('jysj'))  # 获取对应的年,月,日,一周内第几天,一年内第几天
拆分后的数据结果,对应的年月日等
拆分后的字段类型

22、数据写出

#写出在多个csv文件中,一个csv文件为一行数据
df.write.save('path.csv')
#写出在一个csv 文件中