SpringBatch从入门到实战(六):ItemReader

news2025/1/3 15:24:36

一:ListItemReader

用于简单的开发测试。

@Bean
public ItemReader<String> listItemReader() {
    return new ListItemReader<>(Arrays.asList("a", "b", "c"));
}

二:FlatFileItemReader

1.1 完全映射

当文件里的字段值和实体类的属性完全一样时,可以直接使用targetType(Class)来完成映射。常用的分割符如逗号, “\u001B” 表示ESC,

1,monday,10,上海市,浦东新区
2,zhangsan,20,北京市,朝阳区
3,lisi,30,深圳市,宝安区
4,wangwu,31,上海市,浦东新区
5,huihui,32,上海市,浦东新区
@Getter
@Setter
@ToString
public class UserInfo {
    private Long id;
    private String username;
    private Integer age;
    private String city;
    private String area;
}
@Configuration
public class HelloWorldChunkJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;


    @Bean
    public Job helloWorldChunkJob() {
        return jobBuilderFactory.get("helloWorldChunkJob")
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<UserInfo, UserInfo>chunk(3)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public ItemReader<UserInfo> itemReader() {
        return new FlatFileItemReaderBuilder<UserInfo>()
                .encoding("UTF-8")
                .name("userItemReader")
                .resource(new ClassPathResource("static/user.csv"))
                //.resource(new PathResource("/a/b/c/user.csv"))
                .delimited().delimiter(",")
                .names("id", "username", "age", "city", "area")
                .targetType(UserInfo.class)
                .build();
    }


    @Bean
    public ItemWriter<UserInfo> itemWriter() {
        return new ItemWriter<UserInfo>() {
            @Override
            public void write(List<? extends UserInfo> items) throws Exception {
                System.out.println("itemWriter=" + items);
            }
        };
    }
}

1.2 自定字段映射 fieldSetMapper

@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String username;
    private Integer age;
    private String address;
}
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<UserInfo, User>chunk(3)
            .reader(itemReader())
            .writer(itemWriter())
            .build();
}

@Bean
public ItemReader<UserInfo> itemReader() {
    return new FlatFileItemReaderBuilder<UserInfo>()
            .encoding("UTF-8")
            .name("userItemReader")
            .resource(new ClassPathResource("static/user.csv"))
            //.resource(new PathResource("/a/b/c/user.csv"))
            .delimited().delimiter(",")
            .names("id", "username", "age", "city", "area")
            .fieldSetMapper(fieldSetMapper())
            .build();
}

@Bean
public FieldSetMapper fieldSetMapper() {
    return new UserFieldSetMapper();
}
public class UserFieldSetMapper implements FieldSetMapper<User> {

    @Override
    public User mapFieldSet(FieldSet fieldSet) throws BindException {
        User user = new User();
        user.setId(fieldSet.readLong("id"));
        user.setUsername(fieldSet.readString("username"));
        user.setAge(fieldSet.readInt("age"));
        // 字段处理
        user.setAddress(fieldSet.readString("city") + fieldSet.readString("area"));

        return user;
    }
}

1.3 行映射 lineMapper

public ItemReader<UserInfo> itemReader() {
    //ESC
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer("\u001B");
    // 行结束标志0
    tokenizer.setQuoteCharacter('\u001A');
    tokenizer.setFieldSetFactory(new DefaultFieldSetFactory());
    tokenizer.setNames("id", "username", "age", "city", "area");

    BeanWrapperFieldSetMapper<UserInfo> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(UserInfo.class);

    DefaultLineMapper<UserInfo> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);

    return new FlatFileItemReaderBuilder<UserInfo>()
            .encoding("UTF-8")
            .name("userItemReader")
            .resource(new ClassPathResource("static/user.dat"))
            .lineMapper(lineMapper)
            .build();
}

三:JsonItemReader

[
  {"id":1, "username":"a", "age":18},
  {"id":2, "username":"b", "age":17},
  {"id":3, "username":"c", "age":16},
  {"id":4, "username":"d", "age":15},
  {"id":5, "username":"e", "age":14}
]
@Bean
public JsonItemReader<UserInfo> itemReader() {
    ObjectMapper objectMapper = new ObjectMapper();
    JacksonJsonObjectReader<UserInfo> jsonObjectReader = new JacksonJsonObjectReader<>(UserInfo.class);
    jsonObjectReader.setMapper(objectMapper);

    return new JsonItemReaderBuilder<UserInfo>()
            .name("jsonItemReader")
            .resource(new ClassPathResource("static/user.json"))
            .jsonObjectReader(jsonObjectReader)
            .build();
}

四:数据库

3.1 JdbcCursorItemReader

游标一次读一条。
在这里插入图片描述

@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String name;
    private int age;
}
public class UserRowMapper implements RowMapper<User> {
    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        User user = new User();
        user.setId(rs.getLong("id"));
        user.setName(rs.getString("name"));
        user.setAge(rs.getInt("age"));
        return user;
    }
}
@Configuration
public class CursorDBReaderJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public UserRowMapper userRowMapper(){
        return new UserRowMapper();
    }

    @Bean
    public JdbcCursorItemReader<User> userItemReader(){

        return new JdbcCursorItemReaderBuilder<User>()
                .name("userCursorItemReader")
                .dataSource(dataSource)
                .sql("select * from user where age > ?")
                .rowMapper(userRowMapper())
                //拼接参数
        		.preparedStatementSetter(new ArgumentPreparedStatementSetter(new Object[]{16}))
                .build();
	}

    @Bean
    public ItemWriter<User> itemWriter(){
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                items.forEach(System.err::println);
            }
        };
    }

    @Bean
    public Step step(){
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();

    }

    @Bean
    public Job job(){
        return jobBuilderFactory.get("cursor-db-reader-job")
                .start(step())
                .build();
    }
}

3.2 JdbcPagingItemReader 分页

一次性读一页。

在这里插入图片描述

@Configuration
public class PageDBReaderJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public UserRowMapper userRowMapper(){
        return new UserRowMapper();
    }


    @Bean
    public PagingQueryProvider pagingQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean factoryBean = new SqlPagingQueryProviderFactoryBean();
        factoryBean.setDataSource(dataSource);
        factoryBean.setSelectClause("select *");   //查询列
        factoryBean.setFromClause("from user");    //查询的表
        factoryBean.setWhereClause("where age > :age"); //where 条件
        factoryBean.setSortKey("id");   //结果排序
        return factoryBean.getObject();
    }

    @Bean
    public JdbcPagingItemReader<User> userItemReader() throws Exception {
        HashMap<String, Object> param = new HashMap<>();
        param.put("age", 16);
        return new JdbcPagingItemReaderBuilder<User>()
                .name("userPagingItemReader")
                .dataSource(dataSource)  //数据源
                .queryProvider(pagingQueryProvider())  //分页逻辑
                .parameterValues(param)   //条件
                .pageSize(10) //每页显示条数
                .rowMapper(userRowMapper())  //映射规则
                .build();
    }

    @Bean
    public ItemWriter<User> itemWriter(){
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                items.forEach(System.err::println);
            }
        };
    }

    @Bean
    public Step step() throws Exception {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();

    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("page-db-reader-job1")
                .start(step())
                .build();
    }
}


3.3 MyBatisPagingItemReader

@Configuration
public class HelloWorldChunkJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    private static int PAGE_SIZE = 3;


    @Bean
    public Job helloWorldChunkJob() {
        return jobBuilderFactory.get("helloWorldChunkJob")
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(PAGE_SIZE)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public MyBatisPagingItemReader<User> itemReader() {
        Map<String, Object> map = new HashMap<>();
        map.put("id", 1);

        MyBatisPagingItemReader<User> itemReader = new MyBatisPagingItemReader<>();
        itemReader.setSqlSessionFactory(sqlSessionFactory);
        itemReader.setQueryId("com.example.batch.mapper.UserMapper.selectUserList");
        itemReader.setPageSize(PAGE_SIZE);
        itemReader.setParameterValues(map);
        return itemReader;
    }

    @Bean
    public ItemWriter<User> itemWriter() {
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                System.out.println("itemWriter=" + items);
            }
        };
    }
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.example.batch.mapper.UserMapper">
    <select id="selectUserList" resultType="com.example.batch.entity.User">
        select * from tbl_user
        where id > #{id}
        limit #{_pagesize} offset #{_skiprows}
    </select>

</mapper>

五:多线程读

  • userItemReader() 加上saveState(false) Spring Batch 提供大部分的ItemReader是有状态的,作业重启基本通过状态来确定作业停止位置,而在多线程环境中,如果对象维护状态被多个线程访问,可能存在线程间状态相互覆盖问题。所以设置为false表示关闭状态,但这也意味着作业不能重启了。

  • step() 方法加上 .taskExecutor(new SimpleAsyncTaskExecutor()) 为作业步骤添加了多线程处理能力,以块为单位,一个块一个线程,观察上面的结果,很明显能看出输出的顺序是乱序的。改变 job 的名字再执行,会发现输出数据每次都不一样。

@Bean
public FlatFileItemReader<User> userItemReader(){
    System.out.println(Thread.currentThread());

    FlatFileItemReader<User> reader = new FlatFileItemReaderBuilder<User>()
            .name("userItemReader")
            .saveState(false) //防止状态被覆盖
            .resource(new ClassPathResource("static/user.csv"))
            .delimited().delimiter("#")
            .names("id", "username", "age")
            .targetType(User.class)
            .build();

    return reader;
}

@Bean
public Step step(){
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(2)
            .reader(userItemReader())
            .writer(itemWriter())
            .taskExecutor(new SimpleAsyncTaskExecutor())
            .build();

}

六:多步骤并行执行

@Bean
public Job parallelJob(){

    //线程1-读user-parallel.txt
    Flow parallelFlow1 = new FlowBuilder<Flow>("parallelFlow1")
            .start(flatStep())
            .build();

    //线程2-读user-parallel.json
    Flow parallelFlow2 = new FlowBuilder<Flow>("parallelFlow2")
            .start(jsonStep())
            .split(new SimpleAsyncTaskExecutor())
            .add(parallelFlow1)
            .build();


    return jobBuilderFactory.get("parallel-step-job")
            .start(parallelFlow2)
            .end()
            .build();
}

parallelJob() 配置job,需要指定并行的flow步骤,先是parallelFlow1然后是parallelFlow2 , 2个步骤间使用**.split(new SimpleAsyncTaskExecutor())** 隔开,表示线程池开启2个线程,分别处理parallelFlow1, parallelFlow2 2个步骤。

七:异常处理

方式一:设置跳过异常次数

@Bean
public Step step() throws Exception {
    return stepBuilderFactory.get("step1")
        .<User, User>chunk(1)
        .reader(userItemReader())
        .writer(itemWriter())
        .faultTolerant() //容错
        .skip(Exception.class)  //跳过啥异常
        .noSkip(RuntimeException.class)  //不能跳过啥异常
        .skipLimit(10)  //跳过异常次数
        .throttleLimit(10)
        .skipPolicy(new SkipPolicy() {
            @Override
            public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {
                //定制跳过异常与异常次数
                return false;
            }
        })
        .build();

}

方式二:记录错误信息

public class ErrorItemReaderListener implements ItemReadListener {
    @Override
    public void beforeRead() {

    }

    @Override
    public void afterRead(Object item) {

    }

    @Override
    public void onReadError(Exception ex) {
        System.out.println("记录读数据相关信息...");
    }
}


方式三:直接跳过不处理

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(PAGE_SIZE)
            .reader(itemReader())
            .writer(itemWriter())
            .faultTolerant()
            .skip(Exception.class)
            .build();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/645060.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

万物云原生下的服务进化 | 京东云技术团队

导读&#xff1a; 在万物云原生下的环境下&#xff0c;Java的市场份额也因耗资源、启动慢等缺点&#xff0c;导致在云原生环境里被放大而降低&#xff0c;通过这篇文章&#xff0c;读者可以更好地了解如何在云原生环境下通过升级相关版本和使用GraalVM打出原生镜像到方式&…

Linux之特殊权限

目录 Linux之特殊权限 SUID 定义 案例 原因 查找真个系统的SUID/SGID文件 SGID 定义&#xff1a; Sticky Bit 案例 设置文件和目录的特殊权限 方法一 使用 chmod命令 方法二 使用数字形式的权限模式 设置新建文件或目录的默认权限 设置修改文件的扩展性 设置文件…

MySQL连接查询——外连接

内连接查询顺序 首先看student和exam表的内容&#xff1a; 然后执行如下内连接查询&#xff1a; explain select a.*,b.* from student a inner join exam b on a.uidb.uid;查询计划如下 由于a表记录数量少为小表做全表扫描&#xff08;rows为6&#xff09;&#xff0c;然后到…

微信小程序标签知识点总结

View 标签 <scroll-view class"scroll_list" scroll-x"true"> 标签 设置 scroll-x/y是可以设置 滚动模式到底是x方向还是Y方向 &#xff08; 需要调整样式&#xff0c;请参考如下 .scroll_list{ border: 1px solid red; width: 240px; white-sp…

算法学习day21

文章目录 530.二叉搜索树的最小绝对差递归 501.二叉搜索树中的众数递归 236.二叉树的最近公共祖先递归 总结 530.二叉搜索树的最小绝对差 给你一个二叉搜索树的根节点 root &#xff0c;返回 树中任意两不同节点值之间的最小差值 。 差值是一个正数&#xff0c;其数值等于两值…

管理类联考——逻辑——知识篇——题型说明

管理类联考基础逻辑—逻辑规划 一、联考中逻辑部分的重要性 管理类综合能力测试的数学、逻辑、写作三个部分中&#xff0c;逻辑是毫无疑问最重要的一部分&#xff0c;体现在以下三个方面&#xff1a; 1、时间分配&#xff1a;逻辑部分的阅读量相当大&#xff0c;30道题的阅读…

2023史上最全java面试题题库大全800题含答案

如果你不停地加班。却很少冒险&#xff0c;也很少学习&#xff0c;那你极大可能会陷入到内卷中。 为什么这么说呢&#xff1f;我们先来捋清楚「内卷」的概念&#xff1a; 「内卷化」简而言之就是&#xff1a;日复一日&#xff0c;越混越掉坑里。 所谓内卷化&#xff0c;指一种社…

HTTP 网络通讯过程

1.知识点&#xff1a; 在计算机网络中&#xff0c;通信协议用于规范数据传输的格式和传送方式。下面是常见的网络通信协议&#xff1a; HTTP协议&#xff1a;超文本传输协议&#xff0c;用于在Web浏览器和Web服务器之间传输HTML文件和其他资源。 HTTPS协议&#xff1a;HTTP安…

《水经注地图服务》如何快速发布经纬度DAT缓存

概述 《水经注地图服务》的快速发布功能是一个能够帮助用户实现快速发布地图服务的功能&#xff0c;并且提供常规情况下大多数用户所需的默认配置&#xff0c;让用户在发布地图时更加便捷。 这里为大家演示如何快速发布经纬度DAT缓存以及如何在水经微图中加载。 准备工作 离…

2023-6-14-第五式原型模式

&#x1f37f;*★,*:.☆(&#xffe3;▽&#xffe3;)/$:*.★* &#x1f37f; &#x1f4a5;&#x1f4a5;&#x1f4a5;欢迎来到&#x1f91e;汤姆&#x1f91e;的csdn博文&#x1f4a5;&#x1f4a5;&#x1f4a5; &#x1f49f;&#x1f49f;喜欢的朋友可以关注一下&#xf…

IDEA中类模板注释和方法注释模板

类注释 /* *ClassName ${NAME} *Author --你的别名 *Description //TODO *Date ${DATE} ${TIME} *Version 1.0 **/ 把上面的代码粘贴到settings-editor-file and code templates下的class的“public class”和#parese的中间 2.方法注释 /* *Author --你的别名 *Description …

ZK+麦克风:反AI音频认证

1. 引言 当前&#xff0c;已越来越难以区分AI生成的音频与人类的声音。可能带来欺诈、身份盗用以及其它滥用问题。 在AI生成的音频可以完美模仿人声的当前环境中&#xff0c;需要一个可靠的信任链——从最初的音频捕获到最终的播放。这种信任链可以使用加密技术建立&#xff…

Java实训日志04

文章目录 八、项目开发实现步骤&#xff08;六&#xff09;创建数据访问接口1、创建学校数据访问接口2、创建状态数据访问接口3、创建学生数据访问接口4、创建用户数据访问接口 八、项目开发实现步骤 &#xff08;六&#xff09;创建数据访问接口 DAO: Data Acess Object - 数…

antd-vue - - - - - upload组件的使用

upload组件的使用 参数说明&#xff1a; file:list : 上传列表数据 name: 上传时的key data: 上传时额外的参数 header: 上传列表数据 actions: 上传接口地址 before-upload: 上传之前的回掉 change: 传文件改变时 // 变量定义updateData: { billId: "", fileType: &…

Flutter 自定义Grade组件

/*** images 图片数组* titles title数组* length_w 一行的数量* length_h 行数*/static Widget getMenuGrade(List<String> images, List<String> titles, int length_w, int length_h) {int startIndex 0;List<Widget> rowList [];List<List<Widge…

docker-compose服务名称和容器名称区别

需求及结论 在docker-compose文件一般书写如下&#xff1a; version: 3.8 x-logging: &default-loggingoptions:max-size: "200m"max-file: "5"driver: json-file x-environment: &default-environmentTZ: Asia/ShanghaiLANG: C.UTF-8 services:…

前端什么最难学?

前言 个人认为是JS&#xff0c;无论是在平时的项目或者找工作时候JS都是大头&#xff0c;相比起其他的部分&#xff0c;它相对而言是难一点&#xff0c;同时也是十分重要的一部分&#xff0c;学好原生JS&#xff0c;后续的学习才能基于此循序渐进&#xff0c;下面是我总结的关…

(2023 最新版)大厂面试必问的1000道Java面试题附答案详解

很多 Java 工程师的技术不错&#xff0c;但是一面试就头疼&#xff0c;10 次面试 9 次都是被刷&#xff0c;过的那次还是去了家不知名的小公司。 问题就在于&#xff1a;面试有技巧&#xff0c;而你不会把自己的能力表达给面试官。 应届生&#xff1a;你该如何准备简历&#x…

报表生成器FastReport .Net用户指南:“SVG”对象

FastReport .Net是一款全功能的Windows Forms、ASP.NET和MVC报表分析解决方案&#xff0c;使用FastReport .NET可以创建独立于应用程序的.NET报表&#xff0c;同时FastReport .Net支持中文、英语等14种语言&#xff0c;可以让你的产品保证真正的国际性。 FastReport.NET官方版…

数据库|TiDB灾备切换实践-部署

刘昊 | 数据库工程师 最近手头有个系统&#xff0c;有需要搭建灾备库的需求&#xff08;RTO要求4小时内&#xff0c;根据实际情况计算&#xff09;。考虑到生产系统是5版本&#xff0c;TiCDC存在一些兼容性问题&#xff0c;且TiDB Binlog已经有实践案例及经验可供参考&#xf…