pyspark处理数据基本语法
作为一个和数据相关的专业,想学习pyspark,从而了解并学习pyspark ,以便更好的应用到工作中。
1、连接数据库
import findspark
findspark.init()
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
# 定义数据库的地址,以及表,登录用户及密码
url = "jdbc:mysql://localhost:3306/xx"
table="table"
#密码账户需要字典的形式传入
properties ={"user":"root","password":"12345678"}
spark = SparkSession.builder.appName('My first app').getOrCreate()
df = spark.read.jdbc(url=url,table=table,properties=properties)
df.show(4)
2、查看数据维度
df.count(),len(df.columns)# 查看数据维度
3、查看字段类型
df.printSchema()# 元数据分析,查看字段类型
4、使用select 按照黎明筛选数据
df.select(['cxkh','jymc']) # 按照列名选择
5、使用filter 过滤筛选数据
df.filter((df['jyje']>100000) & (df['jysj']>'2015-01-01')) #按照条件进行筛选
6、数据类型转换
#在元数据上更改字段
df.withColumn("jyje",df3.jyje.astype("int"))
7、描述性统计
df3.describe().show()
8、增加字段
df.withColumn('test',(df['jyje']+100))
9、筛选空值
df[df['jydfmc'].isNull()] # 筛选未空
10、缺失值填充
dd.fillna('wz').show(2) #缺失值填充
dd.na.fill('wz').show(2) #缺失值填充
11、分组处理
#按照一个index分组
df.groupBy('jymc').count().orderBy('count',ascending=False).show(3)
#按照两个index分组
df.groupBy('jymc','cxkh').agg({'jyje':'sum','jysj':'max'}).orderBy('sum(jyje)',ascending=False).show(5,False) #特定函数聚合求解
12、去重
df.select(['cxkh','jymc']).dropDuplicates().show(5)
13、遍历处理,类似于Python 的lambda
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType #类似于lambda
brand_udf=udf(jyje_level,StringType()) #注册udf
df.withColumn('jyje_level',brand_udf(df['jyje'])).show(5)
14、模糊匹配
df.select(['cxkh','jymc','jdbz','jyje','jydfmc']).filter("jydfmc rlike '基金|理财|证券'") # 模糊匹配
15、修改指定列名
df.select('cxkh','jysj','jyje').groupBy('cxkh').agg({'jysj':'max','jyje':'max'}).\
withColumnRenamed('cxkh','cxkh1').withColumnRenamed('max(jysj)','最晚一次交易时间').\
withColumnRenamed('max(jyje)','最大交易金额') #修改指定字段的名字, df.seleceExpr('first_name as lase_name'),这个好像需要字段名全部写上,否则最终显示结果只有更改了列名的那一列。
16、数据框拼接
df_person.join(df_lastime,df_person['cxkh']==df_lastime['cxkh1'],\
'left_outer').orderBy('cxkh',ascending=True)# 两表拼接指定字段的拼接,其他的连接方式:inner, outer, left_outer, right_outer, leftsemi
17、pyspark 中实现sql语句的查询
df.registerTempTable('df1') # 将数据框注册为一个table
spark.sql("select * from df1where jymc='xxx'").show(2)#程序 mysql程序
18、取交集、差集、并集
df.select('jymc').dropDuplicates().intersect(df.select('jydfmc').dropDuplicates())#交集
df.select('jymc').dropDuplicates().subtract(df.select('jydfmc').dropDuplicates())#差集
df.select('jymc').dropDuplicates().union(df.select('jydfmc').dropDuplicates())#并集
19、并集去重
df.select('jymc').union(df.select('jydfmc')).distinct()#并集+去重
20、数据拆分
from pyspark.sql.functions import split
df.withColumn('jyrq',split(df_year['jysj']," ").getItem(0))#拆分后取第一个元素
df.withColumn('jytime',split(df_year['jysj']," ")[1])#拆分后取第二个元素
21、获取对应的年,月,日,一周内第几天,一年内第几天
from pyspark.sql.functions import month,year,dayofmonth,dayofweek,dayofyear
df.withColumn('year',year('jysj')).\
withColumn('month',month('jysj')).\
withColumn('day',dayofmonth('jysj')).\
withColumn('week',dayofweek('jysj')).\
withColumn('day_num',dayofyear('jysj')) # 获取对应的年,月,日,一周内第几天,一年内第几天
22、数据写出
#写出在多个csv文件中,一个csv文件为一行数据
df.write.save('path.csv')
#写出在一个csv 文件中