SPARK-SQL 读取外部数据源 json文件的读写

news/2023/9/28 20:04:45

相关的资源文件地址

链接:https://pan.baidu.com/s/1QGQIrVwg56g9eF16ERSLwQ 
提取码:7v8n

json文件读写示例

import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;import java.util.*;public class test8 {public static void main(String[] args) {SparkSession spark = SparkSession.builder().config("spark.driver.host", "localhost").appName("JsonFileTest").master("local").getOrCreate();spark.sparkContext().setLogLevel("ERROR");//将parquet文件数据转化成json文件数据Dataset<Row> sessionDf = spark.read().parquet(Utils.BASE_PATH + "/trackerSession");sessionDf.show(false);
//+------------------------------------+-------------------+-------+------------+---------+----------------------------------+--------------+-----------+---------------+------------+
//|session_id                          |session_server_time|cookie |cookie_label|ip       |landing_url                       |pageview_count|click_count|domain         |domain_label|
//+------------------------------------+-------------------+-------+------------+---------+----------------------------------+--------------+-----------+---------------+------------+
//|520815c9-bdd4-40c5-9ffa-df491dcd97e1|2017-09-04 12:00:00|cookie1|固执          |127.0.0.3|https://www.baidu.com             |1             |2          |www.baidu.com  |level1      |
//|912a4b47-6984-4763-a704-699ee9724585|2017-09-04 12:45:01|cookie1|固执          |127.0.0.3|https://tieba.baidu.com/index.html|1             |2          |tieba.baidu.com|-           |
//|79534f7c-b4dc-4bc6-b021-c05d5ceb634c|2017-09-04 12:00:01|cookie2|有偏见         |127.0.0.4|https://www.baidu.com             |3             |1          |www.baidu.com  |level1      |
//        +------------------------------------+-------------------+-------+------------+---------+----------------------------------+--------------+-----------+---------------+------------+sessionDf.write().mode(SaveMode.Overwrite).json(Utils.BASE_PATH + "/json");//读取json文件数据Dataset<Row> jsonDF = spark.read().json(Utils.BASE_PATH + "/json");jsonDF.show(false);
//+-----------+-------+------------+---------------+------------+---------+----------------------------------+--------------+------------------------------------+-------------------+
//|click_count|cookie |cookie_label|domain         |domain_label|ip       |landing_url                       |pageview_count|session_id                          |session_server_time|
//+-----------+-------+------------+---------------+------------+---------+----------------------------------+--------------+------------------------------------+-------------------+
//|2          |cookie1|固执          |www.baidu.com  |level1      |127.0.0.3|https://www.baidu.com             |1             |520815c9-bdd4-40c5-9ffa-df491dcd97e1|2017-09-04 12:00:00|
//|2          |cookie1|固执          |tieba.baidu.com|-           |127.0.0.3|https://tieba.baidu.com/index.html|1             |912a4b47-6984-4763-a704-699ee9724585|2017-09-04 12:45:01|
//|1          |cookie2|有偏见         |www.baidu.com  |level1      |127.0.0.4|https://www.baidu.com             |3             |79534f7c-b4dc-4bc6-b021-c05d5ceb634c|2017-09-04 12:00:01|
//+-----------+-------+------------+---------------+------------+---------+----------------------------------+--------------+------------------------------------+-------------------+//可以从JSON Dataset(类型为String)中创建一个DFList<String> jsonList =Arrays.asList("{\"name\":\"Yin\",\"address\":{\"is_old\":true,\"area\":23000.34}}");Dataset<String> jsonDataset = spark.createDataset(jsonList, Encoders.STRING());jsonDataset.show(false);
//        +--------------------------------------------------------+
//        |value                                                   |
//        +--------------------------------------------------------+
//        |{"name":"Yin","address":{"is_old":true,"area":23000.34}}|
//        +--------------------------------------------------------+//通过Dataset<String>读取jsonDataset<Row> jsonDFFromDS = spark.read().json(jsonDataset);jsonDFFromDS.show(false);
//       +---------------+----+
//       |address        |name|
//       +---------------+----+
//       |[23000.34,true]|Yin |
//       +---------------+----+//读参数的设置Map<String, String> readOpts = new HashMap<>();//将所有 原始类型 推断为 字符串类型readOpts.put("primitivesAsString", "true");//忽略JSON记录中的Java / C ++样式注释readOpts.put("allowComments", "true");//通过Dataset<String>读取jsonjsonDFFromDS = spark.read().options(readOpts).json(jsonDataset);jsonDFFromDS.show(false);
//        +---------------+----+
//        |address        |name|
//        +---------------+----+
//        |[23000.34,true]|Yin |
//        +---------------+----+//写参数的设置Map<String, String> writeOpts = new HashMap<>();//保存到文件时使用的压缩编解码器writeOpts.put("compression", "gzip");writeOpts.put("dateFormat", "yyyy/MM/dd");List<StructField> fields = new ArrayList<>();StructField name = DataTypes.createStructField("name", DataTypes.StringType, true);StructField date = DataTypes.createStructField("date", DataTypes.DateType, true);fields.add(name);fields.add(date);StructType customSchema = DataTypes.createStructType(fields);List<String> dateJsonList = Arrays.asList("{'name':'Yin','date':'26/08/2015 18:00'}");Dataset<String> dateJsonDataset = spark.createDataset(dateJsonList, Encoders.STRING());dateJsonDataset.show(false);
//        +----------------------------------------+
//        |value                                   |
//        +----------------------------------------+
//        |{'name':'Yin','date':'26/08/2015 18:00'}|
//        +----------------------------------------+//通过Dataset<String>读取json,同时带有schema信息 和 读取的参数相关Dataset<Row> dateJsonDFFromDS =spark.read().schema(customSchema).option("dateFormat", "dd/MM/yyyy HH:mm").json(dateJsonDataset);dateJsonDFFromDS.write().mode(SaveMode.Overwrite).options(writeOpts).json(Utils.BASE_PATH + "/json_date");spark.read().json(Utils.BASE_PATH + "/json_date").show(false);
//        +----------+----+
//        |date      |name|
//        +----------+----+
//        |2015/08/26|Yin |
//        +----------+----+spark.stop();}
}

 


https://dhexx.cn/news/show-18372.html

相关文章

Nacos整合Open-Feign负载均衡

Nacos整合Open-Feign负载均衡一、前言二、服务调用1、添加依赖2、配置启动类注解3、新建接口用于调用服务4、调用方法5、post方式&#xff08;同理&#xff09;1、被调用方Controller2、调用方Service3、调用方Controller6、总结一、前言 提示&#xff1a;本文讲到的代码部分来…

数学图形(2.25)三维悬链线与悬链面

这一节是将数学图形(1.9)悬链线由2D曲线变换为3D曲线 #http://www.mathcurve.com/surfaces/catenoid/catenoid.shtmlvertices 12000u from (-2*PI) to (PI) v rand2(0, 2*PI)a 10.0x a*ch(u)*cos(v) y a*ch(u)*sin(v) z a*u 上述脚本代码中使用了一个随机数v rand2(0, …

解决cmd运行java程序时报错Error: could not open `C:\Program Files\Java\jre1.8.0_271\lib\amd64\jvm.cfg‘

一、windows使用java命令时报错 当我们安装玩jdk时&#xff0c;系统的默认jdk还是原来那个 因此我们要在cmd中输入&#xff1a; set path"%JAVA_HOME%\bin"来重新定向java路径 之后我们要在path环境变量中添加 %JAVA_HOME%\jre\bin 并放置最上面 之后输入java-vers…

ifame 高度自适应

ifame 高度自适应的原理 其实 就是 先把子页面中的高度求出来 然后更改 ifame 的值 <iframe width"100%" id"iframeid" height"auto" scrolling"none" frameborder"0" src"index.html"></iframe> in…

如何识别一个字符串是否Json格式

如何识别一个字符串是否Json格式 原文:如何识别一个字符串是否Json格式前言&#xff1a;距离上一篇文章&#xff0c;又过去一个多月了&#xff0c;近些时间&#xff0c;工作依旧很忙碌&#xff0c;除了管理方面的事&#xff0c;代码方面主要折腾三个事&#xff1a;1&#xff1a…

分布式系统Sentinel整合Open-Feign限流

分布式系统Sentinel整合Open-Feign限流一、前言二、sentinel控制台1、调用与被调用方引入依赖2、控制台搭建3、启动三、限流配置1、多个微服务接入Sentinel配置四、面板介绍五、基于并发线程数进行限流配置六、降级熔断策略1、熔断实操2、自定义降级异常数据七、Sentinel整合Op…

网关Gateway断⾔+过滤器整合注册中心Nacos项目实战

网关Gateway断⾔过滤器整合注册中心Nacos项目实战一、前言二、网关介绍三、基本网关转发1、创建Gateway项目2、配置四、整合注册中心Nacos1、添加Nacos依赖2、启动类开启支持3、修改配置文件4、网关访问的代码五、Gateway内置断言实现接口定时下线与强制参数六、自定义全局过滤…

Hadoop的体系结构

HDFS和MapReduce是Hadoop的两大核心。而整个Hadoop的体系结构主要是通过HDFS来实现对分布式存储的底层支持的&#xff0c;并且它会通过MapReduce来实现对分布式并行任务处理的程序支持。 1、HDFS的体系结构 HDFS采用了主从&#xff08;Master/Slave&#xff09;结构模型&#x…

Redis分布式锁实现

Redis分布式锁实现一、Redis分布式锁的出现二、普通分布式锁&#xff08;不推荐&#xff09;1、pom依赖2、普通版本的分布式锁3、redis分布式锁保证三、升级版分布式锁1、工具类2、场景一程序运行时间大于锁时间提前结束3、场景二程序运行时间小于锁自动释放时间&#xff0c;触…

Unity NGUI 创建简单的按钮

Unity版本&#xff1a;4.5.1  NGUI版本&#xff1a;3.6.5 注意NGUI版本&#xff0c;网上的大部分教程都是2.x版本的&#xff0c;在步骤上面略有不同&#xff0c;此文适合初学者。 示例&#xff1a; 通过NGUI创建一个背景和按钮。 1.首先创建一个新场景&#xff0c;并保存&…