Spring Batch 高级篇-多线程步骤

news2024/9/25 3:19:29

目录

引言

概念

 案例

转视频版


引言

接着上篇:Spring Batch ItemWriter组件,了解Spring Batch ItemWriter处理组件后,接下来一起学习一下Spring Batch 高级功能-多线程步骤

概念

默认的情况下,步骤基本上在单线程中执行,那能不能在多线程环境执行呢?答案肯定是yes,但是也要注意,多线程环境步骤执行一定要慎重。原因:多线程环境下,步骤是要设置不可重启

Spring Batch 的多线程步骤是使用Spring 的 TaskExecutor(任务执行器)实现的。约定每一个块开启一个线程独立执行。

 案例

需求:分5个块处理user-thread.txt文件

1>编写user-thread.txt文件

1#dafei#18
2#xiaofei#16
3#laofei#20
4#zhongfei#19
5#feifei#15
6#zhangsan#14
7#lisi#13
8#wangwu#12
9#zhaoliu#11
10#qianqi#10

2>定义实体对象

@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String name;
    private int age;
}

3>完整代码

package com.langfeiyes.batch._35_step_thread;


import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import java.util.List;

@SpringBootApplication
@EnableBatchProcessing
public class ThreadStepJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public FlatFileItemReader<User> userItemReader(){

        System.out.println(Thread.currentThread());

        FlatFileItemReader<User> reader = new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .saveState(false) //防止状态被覆盖
                .resource(new ClassPathResource("user-thread.txt"))
                .delimited().delimiter("#")
                .names("id", "name", "age")
                .targetType(User.class)
                .build();

        return reader;
    }

    @Bean
    public ItemWriter<User> itemWriter(){
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                items.forEach(System.err::println);
            }
        };
    }

    @Bean
    public Step step(){
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(2)
                .reader(userItemReader())
                .writer(itemWriter())
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();

    }

    @Bean
    public Job job(){
        return jobBuilderFactory.get("thread-step-job")
                .start(step())
                .build();
    }
    public static void main(String[] args) {
        SpringApplication.run(ThreadStepJob.class, args);
    }
}

4>结果

User(id=2, name=xiaofei, age=16)
User(id=5, name=feifei, age=15)
User(id=4, name=zhongfei, age=19)
User(id=7, name=lisi, age=13)
User(id=1, name=dafei, age=18)
User(id=6, name=zhangsan, age=14)
User(id=3, name=laofei, age=20)
User(id=8, name=wangwu, age=12)
User(id=9, name=zhaoliu, age=11)
User(id=10, name=qianqi, age=10)

解析

1:userItemReader() 加上saveState(false) Spring Batch 提供大部分的ItemReader是有状态的,作业重启基本通过状态来确定作业停止位置,而在多线程环境中,如果对象维护状态被多个线程访问,可能存在线程间状态相互覆盖问题。所以设置为false表示关闭状态,但这也意味着作业不能重启了。

2:step() 方法加上.taskExecutor(new SimpleAsyncTaskExecutor()) 为作业步骤添加了多线程处理能力,以块为单位,一个块一个线程,观察上面的结果,很明显能看出输出的顺序是乱序的。改变 job 的名字再执行,会发现输出数据每次都不一样。

到这,本篇就结束了,欲知后事如何,请听下回分解~

转视频版

看文字不过瘾可以切换视频版:Spring Batch高效批处理框架实战

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/367538.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springBoot使用ShardingJDBC实现分表

ShardingSphere的介绍 ShardingSphere是一款起源于当当网内部的应用框架。2015年在当当网内部诞 生&#xff0c;最初就叫ShardingJDBC。2016年的时候&#xff0c;由其中一个主要的开发人员张亮&#xff0c; 带入到京东数科&#xff0c;组件团队继续开发。在国内历经了当当网、…

链动2+1系统|购买三单就能迅速回本,链动2+1模式到底有多暴利?

链动21模式号称起步创业无泡沫&#xff0c;半个月就能盈利上百万&#xff0c;用户裂变速度更是让人瞠目结舌。那么&#xff0c;链动21模式到底有多暴利&#xff1f;其实链动21模式最关键的&#xff0c;是合理的利润分配和奖励机制&#xff0c;让消费者在购物的同时&#xff0c;…

【解决报错】‘jupyter‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件

在当前路径下使用cmd打开后&#xff0c;输入jupyter notebook出现如下错误&#xff1a; 通常可能出现的问题有两种&#xff1a; &#xff08;1&#xff09;你本身就没安装jupyter&#xff0c;如果你配置了anaconda&#xff0c;就自带jupyter&#xff0c;直接跳到问题2。如果确…

Confluence主页面更新记录停留在去年,搜索也只能搜索去年之前的数据问题解决方案

问题描述 Confluence主页最近更新页面不更新了&#xff0c;停留在之前的时间段。其次搜索也只能搜索出来停留在这个时间段之前的数据。 核心原因 索引出现问题了&#xff0c;重建索引即可。 解决办法 直接重启Confluence。 重启Confluence的姿势 描述一下我解决思路&…

28-vuex

vuex 一、vuex 专门在vue中实现集中式状态&#xff08;数据&#xff09;管理的一个Vue插件&#xff0c;对Vue应用中多个组件的共享状态进行集中式的管理&#xff08;读/写&#xff09;&#xff0c;也是一种组件间通信的方式&#xff0c;且适用于任意组件间通信。 使用场景&a…

Java 【数据结构OJ题十道】—— 二叉树篇1

文章目录一、 检查两棵二叉树是否相同二、 另一棵二叉树的子树三、 二叉树的构建及遍历四、序列化二叉树和反序列化二叉树(难)五、二叉树创建字符串六、 二叉树前序非递归遍历实现七、 二叉树中序非递归遍历实现八、 二叉树后序非递归遍历实现九、二叉搜索树中找到两个结点的最…

如何将电脑文件备份到百度网盘

如何将电脑文件备份到百度网盘&#xff1f;说到文件备份&#xff0c;很多小伙伴会将电脑文件备份到移动硬盘或者U盘里&#xff0c;移动硬盘和U盘是比较常见的存储介质&#xff0c;使用和携带起来也是非常方便&#xff0c;因此深受大家的喜欢。除此之外&#xff0c;大家可能还忽…

2023年,IT互联网还有发展前景吗?

不得不说&#xff0c;互联网在整个社会经济发展中扮演着不可或缺的角色&#xff1b;不仅自身的技术具有前沿性&#xff0c;也推动着其他行业进入数字化经济时代&#xff0c;让我们的工作生活变得更加便捷。 在“互联网”时代&#xff0c;每个服务行业都会利用大数据&#xff0…

将自带记事本替换为Notepad2【中文版,带替换文件】

Notepad2是我在寻找一个合适的代码浏览工具的时候发现的&#xff0c;当需要一个用来浏览代码的文本编辑器时候&#xff0c;需要体积小&#xff0c;速度快&#xff0c;语法高亮&#xff0c;解释度高&#xff0c;VsCode作为生产环境已经不适合作为浏览工具了。了解到Notepad2&…

《动手学习深度学习》笔记(二)线性神经网络

三、线性神经网络 3.1 线性回归 3.1.1 介绍 1. 回归是为一个或多个自变量与因变量之间的关系建模的一类方法。而线性回归基于几个简单的假设&#xff1a;① 自变量和因变量关系是线性的&#xff1b;② 允许包含噪声但是噪声遵循正态分布。   2. 训练数据集/训练集&#xff…

算法训练营 day53 动态规划 买卖股票的最佳时机系列2

算法训练营 day53 动态规划 买卖股票的最佳时机系列2 买卖股票的最佳时机III 123. 买卖股票的最佳时机 III - 力扣&#xff08;LeetCode&#xff09; 给定一个数组&#xff0c;它的第 i 个元素是一支给定的股票在第 i 天的价格。 设计一个算法来计算你所能获取的最大利润。…

软件项目管理知识回顾---网络图

网络图 9.网络图 9.1简介 1.分类 AOA&#xff0c;双代号&#xff0c;ADMAON,PDM&#xff0c;单代号&#xff0c;前导图2.活动的逻辑管理 头到头/尾&#xff0c;尾到头/尾 依赖关系 3.工序 紧前紧后9.2绘制规则 1.两个节点只能一条线。不能是平行线。平行的话就不知道是哪个活动…

LeetCode-93. 复原 IP 地址

目录题目思路回溯法题目来源 93. 复原 IP 地址 题目思路 意识到这是切割问题&#xff0c;切割问题就可以使用回溯搜索法把所有可能性搜出来&#xff0c;和131.分割回文串就十分类似了。 回溯法 1.递归参数 startIndex一定是需要的&#xff0c;因为不能重复分割&#xff0c…

【GeoDjango框架解析】读取矢量数据写入postgis数据库

系列文章目录 提示&#xff1a;这里可以添加系列文章的所有文章的目录&#xff0c;目录需要自己手动添加 【GeoDjango框架解析】读取矢量数据写入postgis数据库 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录系列…

关于iframe一些通讯的记录(可适用工作流审批,文中有项目实践,欢迎咨询)

一.知识点(1).我们可以通过postMessage(发送方)和onmessage(接收方)这两个HTML5的方法, 来解决跨页面通信问题&#xff0c;或者通过iframe嵌套的不同页面之间的通信a.父页面代码如下<div v-if"src" class"iframe"><iframeref"iframe"id…

Kafka进阶篇-消费者详解Flume消费Kafka原理

简介 由于挺多时候如果不太熟系kafka消费者详细的话&#xff0c;很容易产生问题&#xff0c;所有剖析一定的原理很重要。 Kafka消费者图解 消费方式 消费者总体工作流程 消费者组初始化流程 消费者详细消费流程 消费者重要参数 bootstrap.servers 向 Kafka 集群建立初…

Jackson使用进阶

实现 注解 属性命名 JsonProperty 定义属性序列化时的名称。 JacksonAnnotation public interface JsonProperty {public final static String USE_DEFAULT_NAME "";public final static int INDEX_UNKNOWN -1;//指定属性的名称。String value() default USE_…

2022IDEA搭建springMvc项目

springmvc项目搭建一. 创建maven项目二. Add Framework Support三. 添加依赖并配置maven四. 配置前端控制器DispatcherServlet五. 配置SpringMVC.XML文件六. 创建controller类七. 创建index.html页面八. 查看jar包是否添加九. 配置tomcat&#xff08;重&#xff09;十. springm…

Kafka(7):生产者详解

1 消息发送 1.1 Kafka Java客户端数据生产流程解析 1 首先要构造一个 ProducerRecord 对象,该对象可以声明主题Topic、分区Partition、键 Key以及值 Value,主题和值是必须要声明的,分区和键可以不用指定。 2 调用send() 方法进行消息发送。 3 因为消息要到网络上进行传输…

国产蓝牙耳机什么便宜又好用?学生党平价蓝牙耳机推荐

蓝牙耳机凭借近几年的快速发展&#xff0c;越来越多的品牌、款式出现在人们的日常生活当中。最近看到很多人问&#xff0c;国产蓝牙耳机什么便宜又好用&#xff1f;针对这个问题&#xff0c;我来给大家推荐几款平价蓝牙耳机&#xff0c;很适合学生党&#xff0c;一起来看看吧。…