Spring Batch(4)——Item概念及使用代码
在 批处理概念 中介绍一个标准的批处理分为 Job 和 Step。本文将结合代码介绍在Step
中Reader
、Processor
、Writer
的实际使用。
Reader
Reader
是指从各种各样的外部输入中获取数据,框架为获取各种类型的文件已经预定义了常规的Reader
实现类。Reader
通过ItemReader
接口实现:
public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }
read
方法的作用就是读取一条数据,数据以泛型T的实体结构返回,当read返回null时表示所有数据读取完毕。返回的数据可以是任何结构,比如文件中的一行字符串,数据库的一行数据,或者xml文件中的一系列元素,只要是一个Java对象即可。
Writer
Writer
通过ItemWriter
接口实现:
public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }
Writer
是Reader
的反向操作,是将数据写入到特定的数据源中。在Step控制一文已经介绍Writer
是根据chunk
属性设定的值按列表进行操作的,所以传入的是一个List
结构。chunk
用于表示批处理的事物分片,因此需要注意的是,在writer
方法中进行完整数据写入事物操作。例如向数据库写入List
中的数据,在写入完成之后再提交事物。
读写的组合模式
无论是读还是写,有时会需要从多个不同的来源获取文件,或者写入到不同的数据源,或者是需要在读和写之间处理一些业务。可以使用组合模式来实现这个目的:
public class CompositeItemWriter<T> implements ItemWriter<T> { ItemWriter<T> itemWriter; public CompositeItemWriter(ItemWriter<T> itemWriter) { this.itemWriter = itemWriter; } public void write(List<? extends T> items) throws Exception { //Add business logic here itemWriter.write(items); } public void setDelegate(ItemWriter<T> itemWriter){ this.itemWriter = itemWriter; } }
Processor
除了使用组合模式,直接使用Processor
是一种更优雅的方法。Processor
是Step
中的可选项,但是批处理大部分时候都需要对数据进行处理,因此框架提供了ItemProcessor
接口来满足Processor
过程:
public interface ItemProcessor<I, O> { O process(I item) throws Exception; }
Processor
的结构非常简单也是否易于理解。传入一个类型I,然后由Processor
处理成为O。
Processor链
在一个Step中可以使用多个Processor
来按照顺序处理业务,此时同样可以使用CompositeItem
模式来实现:
@Bean public CompositeItemProcessor compositeProcessor() { //创建 CompositeItemProcessor CompositeItemProcessor<Foo,Foobar> compositeProcessor = new CompositeItemProcessor<Foo,Foobar>(); List itemProcessors = new ArrayList(); //添加第一个 Processor itemProcessors.add(new FooTransformer()); //添加第二个 Processor itemProcessors.add(new BarTransformer()); //添加链表 compositeProcessor.setDelegates(itemProcessors); return processor; }
过滤记录
在Reader
读取数据的过程中,并不是所有的数据都可以使用,此时Processor
还可以用于过滤非必要的数据,同时不会影响Step
的处理过程。只要ItemProcesspr
的实现类在procss
方法中返回null
即表示改行数据被过滤掉了。
ItemStream
在Step控制一文中已经提到了ItemStream
。在数据批处理概念中提到过,Spring Batch的每一步都是无状态的,进而Reader
和Writer
也是无状态的,这种方式能够很好的隔离每行数据的处理,也能将容错的范围收窄到可以空子的范围。但是这并不意味着整个批处理的过程中并不需要控制状态。例如从数据库持续读入或写入数据,每次Reader
和Writer
都单独去申请数据源的链接、维护数据源的状态(打开、关闭等)。因此框架提供了ItemStream
接口来完善这些操作:
public interface ItemStream { void open(ExecutionContext executionContext) throws ItemStreamException; void update(ExecutionContext executionContext) throws ItemStreamException; void close() throws ItemStreamException; }
持久化数据
在使用Spring Batch之前需要初始化他的元数据存储(Meta-Data Schema),也就是要将需要用到的表导入到对应的数据库中。当然,Spring Batch支持不使用任何持久化数据库,仅仅将数据放到内存中,不设置DataSource
即可。
初始化序列
Spring Batch相关的工作需要使用序列SEQUENCE
:
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ; CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ; CREATE SEQUENCE BATCH_JOB_SEQ;
有些数据库不支持SEQUENCE
,可以通过表代理,比如在MySql(InnoDB数据库)中:
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL); INSERT INTO BATCH_STEP_EXECUTION_SEQ values(0); CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL); INSERT INTO BATCH_JOB_EXECUTION_SEQ values(0); CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL); INSERT INTO BATCH_JOB_SEQ values(0);
关于Version字段
某些表中都有Version
字段。因为Spring的更新策略是乐观锁,因此在进行数据更新之后都会对表的Version
字段进行+1处理。在内存与数据库交互的过程中,会使用采用getVersion、increaseVersion(+1)、updateDataAndVersion的过程,如果在update
的时候发现Version不是预计的数值(+1),则会抛出OptimisticLockingFailureException
的异常。当同一个Job
在进群中不同服务上执行时,需要注意这个问题。
BATCH_JOB_INSTANCE
BATCH_JOB_INSTANCE
用于记录JobInstance,在数据批处理概念中介绍了他的工作方式,其结构为:
CREATE TABLE BATCH_JOB_INSTANCE ( JOB_INSTANCE_ID BIGINT PRIMARY KEY , VERSION BIGINT, JOB_NAME VARCHAR(100) NOT NULL , JOB_KEY VARCHAR(2500) );
字段 | 说明 |
---|---|
JOB_INSTANCE_ID | 主键,主键与单个JobInstance 相关。当获取到某个JobInstance 实例后,通过getId 方法可以获取到此数据 |
VERSION | |
JOB_NAME | Job的名称,用于标记运行的Job,在创建Job时候指定 |
JOB_KEY | JobParameters的序列化数值。在数据批处理概念中介绍了一个JobInstance 相当于Job+JobParameters。他用于标记同一个Job 不同的实例 |
BATCH_JOB_EXECUTION_PARAMS
BATCH_JOB_EXECUTION_PARAMS
对应的是JobParameters
对象。其核心功能是存储Key-Value结构的各种状态数值。字段中IDENTIFYING=true
用于标记那些运行过程中必须的数据(可以理解是框架需要用到的数据),为了存储key-value结构该表一个列数据格式:
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS ( JOB_EXECUTION_ID BIGINT NOT NULL , TYPE_CD VARCHAR(6) NOT NULL , KEY_NAME VARCHAR(100) NOT NULL , STRING_VAL VARCHAR(250) , DATE_VAL DATETIME DEFAULT NULL , LONG_VAL BIGINT , DOUBLE_VAL DOUBLE PRECISION , IDENTIFYING CHAR(1) NOT NULL , constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) );
字段 | 说明 |
---|---|
JOB_EXECUTION_ID | 与BATCH_JOB_EXECUTION表关联的外键,详见数据批处理概念中Job、JobInstance、JobExecute的关系 |
TYPE_CD | 用于标记数据的对象类型,例如 string、date、long、double,非空 |
KEY_NAME | key的值 |
STRING_VAL | string类型的数值 |
DATE_VAL | date类型的数值 |
LONG_VAL | long类型的数值 |
DOUBLE_VAL | double类型的数值 |
IDENTIFYING | 标记这对key-valuse是否来自于JobInstace自身 |
BATCH_JOB_EXECUTION
关联JobExecution
,每当运行一个Job
都会产生一个新的JobExecution
,对应的在表中都会新增一行数据。
CREATE TABLE BATCH_JOB_EXECUTION ( JOB_EXECUTION_ID BIGINT PRIMARY KEY , VERSION BIGINT, JOB_INSTANCE_ID BIGINT NOT NULL, CREATE_TIME TIMESTAMP NOT NULL, START_TIME TIMESTAMP DEFAULT NULL, END_TIME TIMESTAMP DEFAULT NULL, STATUS VARCHAR(10), EXIT_CODE VARCHAR(20), EXIT_MESSAGE VARCHAR(2500), LAST_UPDATED TIMESTAMP, JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL, constraint JOB_INSTANCE_EXECUTION_FK foreign key (JOB_INSTANCE_ID) references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID) ) ;
字段 | 说明 |
---|---|
JOB_EXECUTION_ID | JobExecution的主键,JobExecution::getId方法可以获取到该值 |
VERSION | |
JOB_INSTANCE_ID | 关联到JobInstace的外键,详见数据批处理概念中Job、JobInstance、JobExecute的关系 |
CREATE_TIME | 创建时间戳 |
START_TIME | 开始时间戳 |
END_TIME | 结束时间戳,无论成功或失败都会更新这一项数据。如果某行数据该值为空表示运行期间出现错误,并且框架无法更新该值 |
STATUS | JobExecute的运行状态:COMPLETED、STARTED或者其他状态。此数值对应Java中BatchStatus枚举值 |
EXIT_CODE | JobExecute执行完毕之后的退出返回值 |
EXIT_MESSAGE | JobExecute退出的详细内容,如果是异常退出可能会包括异常堆栈的内容 |
LAST_UPDATED | 最后一次更新的时间戳 |
BATCH_STEP_EXECUTION
该表对应的是StepExecution
,其结构和BATCH_JOB_EXECUTION
基本相似,只是对应的对象是Step
,增加了与之相对的一些字段数值:
CREATE TABLE BATCH_STEP_EXECUTION ( STEP_EXECUTION_ID BIGINT PRIMARY KEY , VERSION BIGINT NOT NULL, STEP_NAME VARCHAR(100) NOT NULL, JOB_EXECUTION_ID BIGINT NOT NULL, START_TIME TIMESTAMP NOT NULL , END_TIME TIMESTAMP DEFAULT NULL, STATUS VARCHAR(10), COMMIT_COUNT BIGINT , READ_COUNT BIGINT , FILTER_COUNT BIGINT , WRITE_COUNT BIGINT , READ_SKIP_COUNT BIGINT , WRITE_SKIP_COUNT BIGINT , PROCESS_SKIP_COUNT BIGINT , ROLLBACK_COUNT BIGINT , EXIT_CODE VARCHAR(20) , EXIT_MESSAGE VARCHAR(2500) , LAST_UPDATED TIMESTAMP, constraint JOB_EXECUTION_STEP_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ;
未填入内容部分见BATCH_JOB_EXECUTION
说明。
字段 | 说明 |
---|---|
STEP_EXECUTION_ID | StepExecute对应的主键 |
VERSION | |
STEP_NAME | Step名称 |
JOB_EXECUTION_ID | 关联到BATCH_JOB_EXECUTION表的外键,标记该StepExecute所属的JobExecute |
START_TIME | |
END_TIME | |
STATUS | |
COMMIT_COUNT | 执行过程中,事物提交的次数,该值与数据的规模以及chunk的设置有关 |
READ_COUNT | 读取数据的次数 |
FILTER_COUNT | Processor中过滤记录的次数 |
WRITE_COUNT | 吸入数据的次数 |
READ_SKIP_COUNT | 读数据的跳过次数 |
WRITE_SKIP_COUNT | 写数据的跳过次数 |
PROCESS_SKIP_COUNT | Processor跳过的次数 |
ROLLBACK_COUNT | 回滚的次数 |
EXIT_CODE | |
EXIT_MESSAGE | |
LAST_UPDATED |
BATCH_JOB_EXECUTION_CONTEXT
该表会记录所有与Job
相关的ExecutionContext
信息。每个ExecutionContext
都对应一个JobExecution
,在运行的过程中它包含了所有Job
范畴的状态数据,这些数据在执行失败后对于后续处理有中重大意义。
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT ( JOB_EXECUTION_ID BIGINT PRIMARY KEY, SHORT_CONTEXT VARCHAR(2500) NOT NULL, SERIALIZED_CONTEXT CLOB, constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ;
字段 | 说明 |
---|---|
JOB_EXECUTION_ID | 关联到JobExecution的外键,建立JobExecution和ExecutionContext的关系。 |
SHORT_CONTEXT | 标记SERIALIZED_CONTEXT的版本号 |
SERIALIZED_CONTEXT | 序列化的ExecutionContext |
BATCH_STEP_EXECUTION_CONTEXT
Step
中ExecutionContext
相关的数据表,结构与BATCH_JOB_EXECUTION_CONTEXT
完全一样。
表索引建议
上面的所有建表语句都没有提供索引,但是并不代表索引没有价值。当感觉到SQL语句的执行有效率问题时候,可以增加索引。
索引带来的价值取决于SQL查询的频率以及关联关系,下面是Spring Batch框架在运行过程中会用到的一些查询条件语句,用于参考优化索引:
表 | Where条件 | 执行频率 |
---|---|---|
BATCH_JOB_INSTANCE | JOB_NAME = ? and JOB_KEY = ? | 每次Job启动执时 |
BATCH_JOB_EXECUTION | JOB_INSTANCE_ID = ? | 每次Job重启时 |
BATCH_EXECUTION_CONTEXT | EXECUTION_ID = ? and KEY_NAME = ? | 视chunk的大小而定 |
BATCH_STEP_EXECUTION | VERSION = ? | 视chunk的大小而定 |
BATCH_STEP_EXECUTION | STEP_NAME = ? and JOB_EXECUTION_ID = ? | 每一个Step执行之前 |
使用案例
下面是Spring Batch一些简单的应用,源码在下列地址的simple工程:
- Gitee:https://gitee.com/chkui-com/spring-batch-sample
- Github:https://github.com/chkui/spring-batch-sample
Spring Batch提供了2种执行方式:命令行方式或Java内嵌方式。命令行方式是直到需要执行批处理任务的时候才启动程序,内嵌方式是结合Web工程或其他外部化框架来使用。2者最大的差别就是是否直接向IoCs注入一个Job
实例。
通用基本配置
两种方式的基本配置都是一样的,通过Reader
、Processor
、Writer
来组装一个Step
。代码中Item
并不涉及文件或数据库的操作,只是简单的模拟数据读取、处理、写入的过程。实体Record
和Msg
用于模拟数据转换,基本配置如下:
public class BatchDefaultConfig { @Bean //配置Step public Step simpleStep(StepBuilderFactory builder, ItemReader<Record> reader, ItemProcessor<Record, Msg> processor, ItemWriter<Msg> writer) { return builder.get("SimpleStep").<Record, Msg>chunk(10).reader(reader).processor(processor).writer(writer) .build(); } @Bean //配置 Reader public ItemReader<Record> reader() { return new ItemReader<Record>() { private int count = 0; public Record read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { return ++this.count < 100 ? new Record().setId(this.count).setMsg("Read Number:" + this.count) : null; } }; } @Bean //配置 Processor public ItemProcessor<Record, Msg> processor() { return new ItemProcessor<Record, Msg>() { public Msg process(Record item) throws Exception { return new Msg("MSG GET INFO = " + item.getMsg()); } }; } @Bean //配置 Writer public ItemWriter<Msg> writer() { return new ItemWriter<Msg>() { private int batchCount = 0; public void write(List<? extends Msg> items) throws Exception { System.out.println("Batch Count : " + ++batchCount + ". Data:"); for (Msg msg : items) { System.out.println(msg.getInfo()); } } }; } }
命令行方式运行
有了基本配置之后,命令行运行的方式仅仅是向容器添加一个Job
:
@Configuration //导入依赖配置 @Import({ BatchDefaultConfig.class }) public class BatchCommondConfig { @Bean public Job simpleJob(Step step, JobBuilderFactory builder) { return builder.get("SimpleJob").start(step).build(); //向容器返回一个Job的Bean } }
然后启动Spring Framework则会自动启用Command Runner运行方式运行——先调用SpringApplication::callRunner
方法,然后使用JobLauncherCommandLineRunner::execute
运行:
public class CommondSample { public static void main(String[] args) throws DuplicateJobException { //模拟测试参数, 这些参数值在执行Java时从外部传入的,比如-Dkey=value String[] argsExt = new String[2]; argsExt[0] = "BuilderParam1=Value1"; argsExt[1] = "BuilderParam2=Value2"; //运行Spring Framework SpringApplication.run(CommondSample.class, argsExt); } }
启用之后观察数据库已经发生了变更。使用命令行需要通过 Java运行参数(-Dkey=value)传递JobParameters
的数据,上面的代码模拟实现了相关的过程。
Java内嵌运行
Java内嵌的方式主要是用于搭配外部工程化使用,比如使用Web框架或则统一调度平台管之类的结构化框架来统一管理批处理任务。与命令行执行最大的区别就是不向容器注入Job
:
@Configuration //导入进出配置 @Import({BatchDefaultConfig.class}) public class BatchOperatoConfig { @Bean //返回JobFactory public JobFactory simpleJob(Step step, JobBuilderFactory builder) { SimpleJobFactory sampleJobFactory = new SimpleJobFactory(); sampleJobFactory.setJob(builder.get("SimpleJob").start(step).build()); return sampleJobFactory; } }
配置代码向容器添加了一个JobFactory
的实现类,JobFactory
的两个接口一个是获取Job
一个是获取Job
的名称,SimpleJobFactory
实现了JobFactory
:
public class SimpleJobFactory implements JobFactory { private Job job; public void setJob(Job job) { this.job = job; } @Override public Job createJob() { return job; } @Override public String getJobName() { return job.getName(); } }
最后通过SimpleJobFactory
来启动一个Job
:
@SpringBootApplication @EnableBatchProcessing @EnableScheduling public class OperatorSample { public static void main(String[] args) throws DuplicateJobException { new SuspendThread().run(); //挂起系统一直运行 ConfigurableApplicationContext ctx = SpringApplication.run(OperatorSample.class); Cron cron = ctx.getBean(Cron.class); cron.register(); //注册JobFactory cron.runJobLaunch(); } } @Service class Cron { @Autowired JobLauncher jobLauncher; @Autowired private JobOperator jobOperator; @Autowired private JobRegistry jobRegistry; @Autowired private JobFactory jobFactory; //注册JobFactory void register() throws DuplicateJobException { jobRegistry.register(jobFactory); } //使用JobLaunch执行 void runJobLaunch() { Map<String, JobParameter> map = new HashMap<>(); map.put("Builder", new JobParameter("1")); map.put("Timer", new JobParameter("2")); jobLauncher.run(jobFactory.createJob(), new JobParameters(map)); } @Scheduled(cron = "30 * * * * ? ") void task1() { System.out.println("1"); runOperator(); } //定时任务使用 JobOperator执行 private void runOperator() { jobOperator.start("SimpleJob", "Builder=1,Timer=2"); } }
这里使用了2种执行方式:JobLauncher
和JobOperator
。JobLauncher
简单明了的启动一个批处理任务。而JobOperator
扩展了一些用于Job
管理的接口方法,观察JobOperator
的源码可以发现它提供了获取ExecuteContext
、检查JobInstance
等功能,如果需要定制开发一个基于Web或者JMX管理批处理任务的系统,JobOperator
更合适。JobOperator
的第二个参数用于传递JobParameters
,等号两端分别是key
和value
,逗号用于分割多行数据。
在Job配置与运行提及过一个JobInstance
相当于Job
+JobParameters
,因此虽然上面的代码使用了两种不同的运行方式,但是Job
和JobParameters
是一样的。在运行被定时任务包裹的runOperator
方法时,会一直抛出JobInstanceAlreadyExistsException
异常,因为同一个实例不能运行2次。如果运行失败可以使用对应的restart
方法。
后续会介绍各种Reader
和Writer
的使用。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
一篇文章教你如何捕获前端错误
本文首发于 vivo互联网技术 微信公众号 https://mp.weixin.qq.com/s/E51lKQOojsvhHvACIyXwhw 作者:黄文佳 常见错误的分类 对于用户在访问页面时发生的错误,主要包括以下几个类型: 1、js运行时错误 JavaScript代码在用户浏览器中执行时,由于一些边界情况、本地环境的不可控等因素,可能会存在js运行时错误。 而依赖客户端的某些方法,由于兼容性或者网络等问题,也有概率会出现运行时错误。 e.g: 下图是当使用了未定义的变量"foo",导致产生js运行时错误时的上报数据: 2、资源加载错误 这里的静态资源包括js、css以及image等。现在的web项目,往往依赖了大量的静态资源,而且一般也会有cdn存在。 如果某个节点出现问题导致某个静态资源无法访问,就需要能够捕获这种异常并进行上报,方便第一时间解决问题。 e.g: 下图是图片资源不存在时的上报数据: 3、未处理的promise错误 未使用catch捕获的promise错误,往往都会存在比较大的风险。而编码时有可能覆盖的不够全面,因此有必要监控未处理的promise错误并进行上报...
- 下一篇
Mysql主主同步失败后的恢复
基础信息 主库: 数据库2 10.126.4.2 数据库3 10.126.4.3 1. 停止数据库3对外服务 防止同步过程中服务通过数据库3写入数据 $ firewall-cmd --remove-port=3306/tcp $ firewall-cmd --add-rich-rule="rule f amily="ipv4" source address="10.126.4.2" port protocol="tcp" port="3306" accept" $ firewall-cmd --reload 2. 备份主库 $ mysqldump -uroot -p --single-transaction --master-data=2 --no-autocommit -A >alldatas-190708.sql 记住 MASTER_LOG_FILE 和 MASTER_LOG_POS $ head -n 30 alldatas-190708.sql -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000016', MASTER_L...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS关闭SELinux安全模块
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2全家桶,快速入门学习开发网站教程