Related resource files
Link: https://pan.baidu.com/s/1QGQIrVwg56g9eF16ERSLwQ
Extraction code: 7v8n
The toDF operator
My take: toDF simply replaces the column names in the schema, assigning the new names positionally. The names must follow the existing column order, and you can pass neither more nor fewer names than there are columns, as sketched below.
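A minimal sketch of that rule, assuming a SparkSession named spark and the people.json sample from the resources above (Utils.BASE_PATH as in the full example below). toDF assigns the given names to the existing columns by position, and the number of names must match the column count exactly; in the Spark versions I have checked, a mismatch fails at runtime with an IllegalArgumentException.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// people.json is inferred as two columns: (age, name)
Dataset<Row> df = spark.read().json(Utils.BASE_PATH + "/people.json");

// OK: exactly one new name per existing column, in schema order
Dataset<Row> renamed = df.toDF("age", "first_name");

// Both of these fail at runtime ("The number of columns doesn't match"):
// df.toDF("age");                       // too few names
// df.toDF("age", "first_name", "x");    // too many names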
Parquet file read/write example
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class test14 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .config("spark.driver.host", "localhost")
                .appName("ParquetFileTest")
                .master("local")
                .getOrCreate();
        spark.sparkContext().setLogLevel("ERROR");

        // 1: Convert the JSON data into Parquet files
        // (Utils.BASE_PATH is the project's sample-data directory)
        Dataset<Row> df = spark.read().json(Utils.BASE_PATH + "/people.json");
        df.show();
        df.write()
                .option("compression", "snappy")
                .mode(SaveMode.Overwrite)
                .parquet(Utils.BASE_PATH + "/parquet");

        // 2: Read the Parquet files back
        Dataset<Row> parquetDF = spark.read().parquet(Utils.BASE_PATH + "/parquet");
        parquetDF.show();
        // +---+-------+
        // |age|   name|
        // +---+-------+
        // | 29|Michael|
        // | 30|   Andy|
        // | 19| Justin|
        // +---+-------+

        // 3: Parquet schema merge
        // (can also be enabled globally with spark.sql.parquet.mergeSchema = true)
        // Rename the second column so the two Parquet directories end up
        // with different schemas
        Dataset<Row> dataset_2 = df.toDF("age", "first_name");
        dataset_2.show(false);
        // +---+----------+
        // |age|first_name|
        // +---+----------+
        // |29 |Michael   |
        // |30 |Andy      |
        // |19 |Justin    |
        // +---+----------+
        dataset_2.write()
                .mode(SaveMode.Overwrite)
                .parquet(Utils.BASE_PATH + "/parquet_schema_change");

        // mergeSchema merges the schemas of Parquet files read
        // from multiple directories in a single call
        Dataset<Row> schemaMergeDF = spark.read()
                .option("mergeSchema", "true")
                .parquet(Utils.BASE_PATH + "/parquet",
                         Utils.BASE_PATH + "/parquet_schema_change");
        schemaMergeDF.show();
        // +---+-------+----------+
        // |age|   name|first_name|
        // +---+-------+----------+
        // | 29|   null|   Michael|
        // | 30|   null|      Andy|
        // | 19|   null|    Justin|
        // | 29|Michael|      null|
        // | 30|   Andy|      null|
        // | 19| Justin|      null|
        // +---+-------+----------+

        spark.stop();
    }
}
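As the step-3 comment mentions, the per-read option("mergeSchema", "true") used above can instead be switched on globally for the whole session via the spark.sql.parquet.mergeSchema configuration. A sketch (the app name here is arbitrary):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("ParquetMergeGlobal") // arbitrary name for this sketch
        // every Parquet read in this session now merges schemas,
        // with no per-read option("mergeSchema", "true") needed
        .config("spark.sql.parquet.mergeSchema", "true")
        .getOrCreate();

Dataset<Row> merged = spark.read().parquet(
        Utils.BASE_PATH + "/parquet",
        Utils.BASE_PATH + "/parquet_schema_change");

Schema merging is a relatively expensive operation, which is why Spark leaves it off by default; passing the option per read, as in the full example above, keeps that cost limited to the reads that actually need it.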