相关文章推荐

有如下数据:

32365 MOVE 1577808000000 {"goodid": 478777, "title": "商品478777", "price": "12000", 
	"shopid": "1", "mark": "mark"} 6.0.0 android {"browsetype": "chrome", "browseversion": "82,0"}
90339 MOVE 1577808008000 {"goodid": 998446, "title": "商品998446", "price": "12000",
	 "shopid": "1", "mark": "mark"} 6.0.0 android {"browsetype": "chrome", "browseversion": "82,0"}
10519 ORDER 1577808016000 {"goodid": 914583, "title": "商品914583", "price": "12000",
	"shopid": "1", "mark": "mark"} 6.0.0 android {"browsetype": "chrome", "browseversion": "82,0"}
53844 CART 1577808024000 {"goodid": 4592971, "title": "商品4592971", "price": "12000",
	 "shopid": "1", "mark": "mark"} 6.0.0 android {"appid": "123456", "appversion": "11.0.0"}

字段如下:

  • 其中goodinfo和appinfo为上表所示的json格式的字段,现要求使用pyspark提取出解析其中的json内容(也可以用正则提取)并写入到hive表中
userid int,
action string,
acttime string,
goodinfo string,
version string,
system string,
appinfo string

思路分析:

  • 本题只需要解析json内容,并且是从表中读取写到一张新表中,所以本题使用pyspark,可以使用pyspark下的get_json_object函数

踩雷汇总:

  • python字符集与hadoop平台默认不一致,因此需要在第一行加# -*- coding:utf-8 -*-使得文本在utf-8的环境下生成
  • 配置本地spark,安装findpyspark模块并且初始化本地模式下的spark,否则会报py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.isEncryptionEnabled does not exist in the JVM的错误

代码如下:

# -*- coding:utf-8 -*-
import findspark
findspark.init()
from pyspark.sql import HiveContext, SparkSession
import pyspark.sql.functions as F
if __name__ == '__main__':
    spark = SparkSession.builder.master("local[*]").appName("logs") \
        .config("hive.metastore.uris", "thrift://single:9083") \
        .enableHiveSupport().getOrCreate()
    df = spark.sql("select * from ods_myshops.ods_logs")
    df.withColumn("goodid",F.get_json_object("goodinfo","$.goodid"))\
        .withColumn("title",F.get_json_object("$.goodinfo","$.title"))\
        .withColumn("price",F.get_json_object("goodinfo","$.price"))\
        .withColumn("shopid",F.get_json_object("goodinfo","$.shopid"))\
        .withColumn("mark",F.get_json_object("goodinfo","$.mark"))\
        .withColumn("soft"
                    ,F.when(F.instr(df['appinfo'],"browsetype")==0
                    ,F.get_json_object("appinfo","$.appid"))
                    .otherwise(F.get_json_object("appinfo","$.browsetype")))\
        .withColumn("soft_version"
                    ,F.when(F.instr(df['info'],"browsetype")==0
                    ,F.get_json_object("appinfo","$.appversion"))
                    .otherwise("appinfo","$.browseversion"))\
        .drop("goodinfo","appinfo")\
        .write.format("hive").mode("overwrite")\
        .saveAsTable("ods_myshops.ods_newlog")
                    有如下数据:32365 MOVE 1577808000000 {"goodid": 478777, "title": "商品478777", "price": "12000", 	"shopid": "1", "mark": "mark"} 6.0.0 android {"browsetype": "chrome", "browseversion": "82,0"}90339 MOVE 1577808008000 {"goodid": 998446, "title": "商品998446", "pri
				
把本地数据导入到Hive from pyspark.sql import SparkSession spark = SparkSession.builder.appName('write_data').getOrCreate() import pyspark.sql.functions as F from pyspark.sql.types import * # Row, StructTyp...
初次写入分区数据 spark写入hive分区时,如果数据事先不存在,可以选择手动建,可以使用以下代码写入数据,会自动创建数据: df = spark.createDataFrame([(1, "alice", "20220412"), (2, "bob", "20220412")], ["id", "name", "date"]) df.show() df.write.format("orc").mode("overwrite").partitionBy("date").saveAsTable("t
spark有3种数据结构——RDD、DataFrame、DataSet。这里展示的文件读写方式,都是针对dataFrame数据结构的,也就是文件读进来之后,是一个spark dataFrame。 1、读写hdfs上的文件 1.1 读写hdfs上的文件 ——> 按照指定文件格式读取与保存 SparkSession在读取文件时,可以指定读取文件的格式。举个例子。 按照csv文件格式,读取文件(其余的文件格式只需将csv变成相应的文件格式名称即可) 【读取】 from pyspark.sql.types i
首先说明,此方案是一个不可行的方案。与导入Mysql数据库不同,Hive数据库不支持记录级数据插入;即使一些版本支持,插入速度也是奇慢。Hive主要优势在于处理批量数据,数据量越大越能体现出性能优势;数据量小,如记录级数据插入,则没有可用性。所以,对于使用python将json数据解析出来再一条条插入的方法肯定是行不通的。方案记录在此,为通过python连接操作Hive数据库等提供一些参考。
总体来说,有两大类方法:  1、将json以字符串的方式整个入Hive,然后使用LATERAL VIEW json_tuple的方法,获取所需要的列名。  2、将json拆成各个字段,入Hive。这将需要使用第三方的SerDe。 第一种方法的的缺点是不能处理复杂类型(如果hive表中字段为array,map等)。 1. 创建 [sql] view
public class MovieJsonParser extends UDF { public String evaluate(String json) throws IOException { ObjectMapper objectMapper = new ObjectMapper...
文章目录1、需求:2、知识点:3、方法1:4、方法2:4.1、伴生类创建KafkaProducer包装器4.2、SparkStreaming消费kafka并写入kafka 1、需求: 使用sparkStreaming对kafka中某topic数据进行数据处理后再重新写入kafka 2、知识点: SparkStreaming连接kafka进行消费 rdd算子写入kafka 伴生类与伴生对象的使用 producerRecord手动序列化 3、方法1: KafkaProducer不可序
java flink是一个高性能的分布式流式计算框架,可以实现大规模的数据处理和分析。而hive是一个分布式数据仓库工具,可以用于存储和查询大规模的结构化数据。redis是一个高性能的内存数据库,可以用于存储和检索数据。 要实现java flink读取hive表中的数据写入redis,我们可以按照以下步骤进行操作: 1. 在java flink中,首先需要配置并连接到hive数据库。可以使用Flink的HiveCatalog来创建一个连接到Hive的catalog,并设置相关的hive metastore地址、用户名和密码等。 2. 根据需要,编写flink程序来读取hive的数据。可以使用flink的DataStream或Table API来读取hive数据,并将其转换为适当的数据流或。 3. 在准备好数据之后,我们可以使用flink的RedisSink来将数据写入redis中。在使用RedisSink之前,需要先引入flink-connector-redis的依赖包,并在flink配置文件中配置好redis的连接参数,如redis的主机地址、端口号、密码等。 4. 编写代码将数据写入redis。可以根据数据的特点,选择将整个数据写入一个redis数据结构中,或者将数据分解为多个key-value对存储到redis中。 5. 在代码编写完成后,我们可以使用flink提供的命令或者IDE工具来运行flink程序,它会自动连接到hive数据库和redis,并完成数据的读取写入。 通过以上步骤,java flink就可以实现读取hive表中的数据并写入redis。这样可以通过flink的分布式计算和hive的数据存储能力,结合redis的高速读写能力,实现大规模数据的处理和查询。
 
推荐文章