有如下数据:
32365 MOVE 1577808000000 {"goodid": 478777, "title": "商品478777", "price": "12000",
"shopid": "1", "mark": "mark"} 6.0.0 android {"browsetype": "chrome", "browseversion": "82,0"}
90339 MOVE 1577808008000 {"goodid": 998446, "title": "商品998446", "price": "12000",
"shopid": "1", "mark": "mark"} 6.0.0 android {"browsetype": "chrome", "browseversion": "82,0"}
10519 ORDER 1577808016000 {"goodid": 914583, "title": "商品914583", "price": "12000",
"shopid": "1", "mark": "mark"} 6.0.0 android {"browsetype": "chrome", "browseversion": "82,0"}
53844 CART 1577808024000 {"goodid": 4592971, "title": "商品4592971", "price": "12000",
"shopid": "1", "mark": "mark"} 6.0.0 android {"appid": "123456", "appversion": "11.0.0"}
字段如下:
- 其中goodinfo和appinfo为上表所示的json格式的字段,现要求使用pyspark提取出解析其中的json内容(也可以用正则提取)并写入到hive表中
userid int,
action string,
acttime string,
goodinfo string,
version string,
system string,
appinfo string
思路分析:
- 本题只需要解析json内容,并且是从表中读取写到一张新表中,所以本题使用pyspark,可以使用pyspark下的get_json_object函数
踩雷汇总:
- python字符集与hadoop平台默认不一致,因此需要在第一行加
# -*- coding:utf-8 -*-
使得文本在utf-8的环境下生成 - 配置本地spark,安装findpyspark模块并且初始化本地模式下的spark,否则会报
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.isEncryptionEnabled does not exist in the JVM
的错误
代码如下:
import findspark
findspark.init()
from pyspark.sql import HiveContext, SparkSession
import pyspark.sql.functions as F
if __name__ == '__main__':
spark = SparkSession.builder.master("local[*]").appName("logs") \
.config("hive.metastore.uris", "thrift://single:9083") \
.enableHiveSupport().getOrCreate()
df = spark.sql("select * from ods_myshops.ods_logs")
df.withColumn("goodid",F.get_json_object("goodinfo","$.goodid"))\
.withColumn("title",F.get_json_object("$.goodinfo","$.title"))\
.withColumn("price",F.get_json_object("goodinfo","$.price"))\
.withColumn("shopid",F.get_json_object("goodinfo","$.shopid"))\
.withColumn("mark",F.get_json_object("goodinfo","$.mark"))\
.withColumn("soft"
,F.when(F.instr(df['appinfo'],"browsetype")==0
,F.get_json_object("appinfo","$.appid"))
.otherwise(F.get_json_object("appinfo","$.browsetype")))\
.withColumn("soft_version"
,F.when(F.instr(df['info'],"browsetype")==0
,F.get_json_object("appinfo","$.appversion"))
.otherwise("appinfo","$.browseversion"))\
.drop("goodinfo","appinfo")\
.write.format("hive").mode("overwrite")\
.saveAsTable("ods_myshops.ods_newlog")
有如下数据:32365 MOVE 1577808000000 {"goodid": 478777, "title": "商品478777", "price": "12000", "shopid": "1", "mark": "mark"} 6.0.0 android {"browsetype": "chrome", "browseversion": "82,0"}90339 MOVE 1577808008000 {"goodid": 998446, "title": "商品998446", "pri
把本地数据导入到Hive
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('write_data').getOrCreate()
import pyspark.sql.functions as F
from pyspark.sql.types import * # Row, StructTyp...
初次写入分区数据
spark写入hive分区表时,如果数据表事先不存在,可以选择手动建表,可以使用以下代码写入数据,会自动创建数据表:
df = spark.createDataFrame([(1, "alice", "20220412"), (2, "bob", "20220412")], ["id", "name", "date"])
df.show()
df.write.format("orc").mode("overwrite").partitionBy("date").saveAsTable("t
spark有3种数据结构——RDD、DataFrame、DataSet。这里展示的文件读写方式,都是针对dataFrame数据结构的,也就是文件读进来之后,是一个spark dataFrame。
1、读写hdfs上的文件
1.1 读写hdfs上的文件 ——> 按照指定文件格式读取与保存
SparkSession在读取文件时,可以指定读取文件的格式。举个例子。
按照csv文件格式,读取文件(其余的文件格式只需将csv变成相应的文件格式名称即可)
【读取】
from pyspark.sql.types i
首先说明,此方案是一个不可行的方案。与导入Mysql数据库不同,
Hive数据库不支持记录级数据插入;即使一些版本支持,插入速度也是奇慢。
Hive主要优势在于处理批量数据,数据量越大越能体现出性能优势;数据量小,如记录级数据插入,则没有可用性。所以,对于使用python将
json数据
解析出来再一条条插入的方法肯定是行不通的。方案记录在此,为通过python连接操作
Hive数据库等提供一些参考。
总体来说,有两大类方法:
1、将json以字符串的方式整个入Hive表,然后使用LATERAL VIEW json_tuple的方法,获取所需要的列名。
2、将json拆成各个字段,入Hive表。这将需要使用第三方的SerDe。
第一种方法的的缺点是不能处理复杂类型(如果hive表中字段为array,map等)。
1. 创建表
[sql] view
public class Movie
JsonParser extends UDF {
public String evaluate(String
json) throws IOException {
ObjectMapper objectMapper = new ObjectMapper...
文章目录1、需求:2、知识点:3、方法1:4、方法2:4.1、伴生类创建KafkaProducer包装器4.2、SparkStreaming消费kafka并写入kafka
1、需求:
使用sparkStreaming对kafka中某topic数据进行数据处理后再重新写入kafka
2、知识点:
SparkStreaming连接kafka进行消费
rdd算子写入kafka
伴生类与伴生对象的使用
producerRecord手动序列化
3、方法1:
KafkaProducer不可序
java flink是一个高性能的分布式流式计算框架,可以实现大规模的数据处理和分析。而hive是一个分布式数据仓库工具,可以用于存储和查询大规模的结构化数据。redis是一个高性能的内存数据库,可以用于存储和检索数据。
要实现java flink读取hive表中的数据写入redis,我们可以按照以下步骤进行操作:
1. 在java flink中,首先需要配置并连接到hive数据库。可以使用Flink的HiveCatalog来创建一个连接到Hive的catalog,并设置相关的hive metastore地址、用户名和密码等。
2. 根据需要,编写flink程序来读取hive表的数据。可以使用flink的DataStream或Table API来读取hive表数据,并将其转换为适当的数据流或表。
3. 在准备好数据之后,我们可以使用flink的RedisSink来将数据写入redis中。在使用RedisSink之前,需要先引入flink-connector-redis的依赖包,并在flink配置文件中配置好redis的连接参数,如redis的主机地址、端口号、密码等。
4. 编写代码将数据写入redis。可以根据数据的特点,选择将整个数据写入一个redis数据结构中,或者将数据分解为多个key-value对存储到redis中。
5. 在代码编写完成后,我们可以使用flink提供的命令或者IDE工具来运行flink程序,它会自动连接到hive数据库和redis,并完成数据的读取和写入。
通过以上步骤,java flink就可以实现读取hive表中的数据并写入redis。这样可以通过flink的分布式计算和hive的数据存储能力,结合redis的高速读写能力,实现大规模数据的处理和查询。