准备 orders.json 文件(JSON Lines 格式,每行一个 JSON 对象):
{"id":"1", "userId":"1", "userName":"Join", "totalPrice":80.0,"qty":3.0}
{"id":"2", "userId":"1", "userName":"Join", "totalPrice":50.0,"qty":3.0}
{"id":"3", "userId":"2", "userName":"Jeffy", "totalPrice":200.0,"qty":3.0}
{"id":"4", "userId":"99999", "userName":"zombie", "totalPrice":222.0,"qty":3.0}
使用 agg 在一次分组查询中同时计算多个聚合函数(avg、max、min、sum)
示例代码
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;import java.util.HashMap;
import java.util.Map;import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.functions.sum;public class test_28_2 {public static void main(String[] args) {SparkSession spark = SparkSession.builder().config("spark.driver.host", "localhost").appName("GroupApiTest").master("local").getOrCreate();spark.sparkContext().setLogLevel("ERROR");Dataset<Row> ordersDataSet = spark.read().json(Utils.BASE_PATH + "/join/orders.json");ordersDataSet.show();/*+---+---+----------+------+--------+| id|qty|totalPrice|userId|userName|+---+---+----------+------+--------+| 1|3.0| 80.0| 1| Join|| 2|3.0| 50.0| 1| Join|| 3|3.0| 200.0| 2| Jeffy|| 4|3.0| 222.0| 99999| zombie|+---+---+----------+------+--------+*///2: 用agg来将分组函数聚合起来一起查询ordersDataSet.groupBy("userId").agg(avg("totalPrice"),max("totalPrice"),min("totalPrice"),sum("totalPrice")).show();/*+------+---------------+---------------+---------------+---------------+|userId|avg(totalPrice)|max(totalPrice)|min(totalPrice)|sum(totalPrice)|+------+---------------+---------------+---------------+---------------+| 1| 65.0| 80.0| 50.0| 130.0|| 99999| 222.0| 222.0| 222.0| 222.0|| 2| 200.0| 200.0| 200.0| 200.0|+------+---------------+---------------+---------------+---------------+*/Map<String, String> map = new HashMap<>();map.put("totalPrice", "avg");map.put("totalPrice", "max");map.put("totalPrice", "min");map.put("totalPrice", "sum");ordersDataSet.groupBy("userId").agg(map).show();/*+------+---------------+|userId|sum(totalPrice)|+------+---------------+| 1| 130.0|| 99999| 222.0|| 2| 200.0|+------+---------------+*///对整个orders进行聚合计算ordersDataSet.agg(avg("totalPrice"),max("totalPrice"),min("totalPrice"),sum("totalPrice")).show();/*+---------------+---------------+---------------+---------------+|avg(totalPrice)|max(totalPrice)|min(totalPrice)|sum(totalPrice)|+---------------+---------------+---------------+---------------+| 138.0| 222.0| 50.0| 
552.0|+---------------+---------------+---------------+---------------+*/ordersDataSet.agg(map).show();/*+---------------+|sum(totalPrice)|+---------------+| 552.0|+---------------+*/}
}