Spring Batch: Fixing NonSkippableReadException When Migrating Millions of Rows Between Data Sources
- Background
- What I tried
- Root cause
- Solutions
Background
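The requirement was a bit unusual: data had to be migrated between different data sources. Because the data volume was large, I decided to use Spring Batch, which I had heard is very powerful. That is where I fell into the trap and got ground into the dirt for several days, haha, before I finally found a good solution. Since there are few articles on this kind of problem in Chinese, I'm writing it down here.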
What I tried
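- Asking ChatGPT: its answers can only serve as a reference and are quite misleading, especially for a less commonly used framework like this one;
- Searching Baidu: essentially nothing useful turned up;
- Reading the official reference documentation: it is published for every version, so just read the one for the version you use;
- Reading the source code.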
Root cause
First, a quick introduction to the relevant Spring Batch components:
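To make the reader/processor/writer roles concrete, here is a minimal, self-contained model of a chunk-oriented step, the kind configured later with `.chunk(1000)`. The `Reader`/`Writer` interfaces and the `runStep` method are hypothetical stand-ins, not Spring Batch classes. The point it illustrates is that items are read until the chunk is full, then processed and written as one unit (one transaction commit in Spring Batch), so an exception while reading aborts the current chunk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class ChunkLoopModel {

    /** Stand-in for ItemReader<T>: returns null when the input is exhausted. */
    interface Reader<T> {
        T read() throws Exception;
    }

    /** Stand-in for ItemWriter<T>: receives one whole chunk at a time. */
    interface Writer<T> {
        void write(List<? extends T> items) throws Exception;
    }

    /** Runs read -> process -> write chunk by chunk and returns the number of chunks written. */
    static <I, O> int runStep(Reader<I> reader, Function<I, O> processor,
                              Writer<O> writer, int chunkSize) throws Exception {
        int chunks = 0;
        while (true) {
            // 1. Read until the chunk is full or the input ends; a read error aborts this chunk.
            List<I> chunk = new ArrayList<>();
            I item;
            while (chunk.size() < chunkSize && (item = reader.read()) != null) {
                chunk.add(item);
            }
            if (chunk.isEmpty()) {
                return chunks;
            }
            // 2. Process every item of the chunk.
            List<O> processed = new ArrayList<>();
            for (I in : chunk) {
                processed.add(processor.apply(in));
            }
            // 3. Write the chunk as one unit (a single transaction commit in Spring Batch).
            writer.write(processed);
            chunks++;
        }
    }

    public static void main(String[] args) throws Exception {
        Iterator<Integer> source = Arrays.asList(1, 2, 3, 4, 5).iterator();
        List<String> target = new ArrayList<>();
        int chunks = ChunkLoopModel.<Integer, String>runStep(
                () -> source.hasNext() ? source.next() : null,
                i -> "row-" + i,
                target::addAll,
                2);
        System.out.println(chunks + " " + target); // 3 [row-1, row-2, row-3, row-4, row-5]
    }
}
```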

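In my case, the source database connection was unstable and dropped while data was being read. When Spring Batch catches such an exception during a read, by default it treats it as unrecoverable and throws its own fatal exception, NonSkippableReadException. The full stack trace is below: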
<#9f70370a> com.mysql.cj.exceptions.ConnectionIsClosedException: No operations allowed after connection closed.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
    at com.mysql.cj.NativeSession.checkClosed(NativeSession.java:1171)
    at com.mysql.cj.jdbc.ConnectionImpl.checkClosed(ConnectionImpl.java:576)
    at com.mysql.cj.jdbc.ConnectionImpl.setNetworkTimeout(ConnectionImpl.java:2486)
    ... 81 common frames omitted
Wrapped by: <#7b93c319> java.sql.SQLNonTransientConnectionException: No operations allowed after connection closed.
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:73)
    at com.mysql.cj.jdbc.ConnectionImpl.setNetworkTimeout(ConnectionImpl.java:2490)
    at com.zaxxer.hikari.pool.PoolBase.setNetworkTimeout(PoolBase.java:550)
    at com.zaxxer.hikari.pool.PoolBase.isConnectionAlive(PoolBase.java:165)
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:179)
    ... 78 common frames omitted
Wrapped by: <#f9ccd589> java.sql.SQLTransientConnectionException: HikariPool-2 - Connection is not available, request timed out after 30039ms.
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:676)
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:190)
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:155)
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
    at sun.reflect.GeneratedMethodAccessor318.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)
    at org.springframework.cloud.context.scope.GenericScope$LockedScopedProxyFactoryBean.invoke(GenericScope.java:494)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy150.getConnection(Unknown Source)
    at org.springframework.jdbc.datasource.DataSourceUtils.fetchConnection(DataSourceUtils.java:158)
    at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:116)
    at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:79)
    ... 65 common frames omitted
Wrapped by: <#29957423> org.springframework.jdbc.CannotGetJdbcConnectionException: Failed to obtain JDBC Connection; nested exception is java.sql.SQLTransientConnectionException: HikariPool-2 - Connection is not available, request timed out after 30039ms.
    at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:82)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:371)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:452)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:462)
    at org.springframework.batch.item.database.JdbcPagingItemReader.doReadPage(JdbcPagingItemReader.java:210)
    at org.springframework.batch.item.database.AbstractPagingItemReader.doRead(AbstractPagingItemReader.java:108)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:92)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94)
    at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87)
    ... 57 common frames omitted
Wrapped by: <#0fe19c8d> org.springframework.batch.core.step.skip.NonSkippableReadException: Non-skippable exception during read
    at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:105)
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113)
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:203)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68)
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:136)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy323.run(Unknown Source)
    ...
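The trace shows how deeply the real cause is buried: NonSkippableReadException wraps Spring's CannotGetJdbcConnectionException, which wraps Hikari's SQLTransientConnectionException, which wraps the MySQL driver's SQLNonTransientConnectionException. If you need to decide in code whether a failure is a connection problem (for example before retrying or restarting), a minimal sketch using only JDK types is to walk the `getCause()` chain. The class and method names are hypothetical, and treating SQLSTATE class `08` ("connection exception" in the SQL standard) as a connection failure is an assumption you may want to tune for your driver.

```java
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.sql.SQLTransientConnectionException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class ConnectionFailureDetector {

    /** Returns true if any exception in the getCause() chain looks like a JDBC connection failure. */
    static boolean isConnectionFailure(Throwable t) {
        // Identity set guards against (rare) cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (cur instanceof SQLTransientConnectionException
                    || cur instanceof SQLNonTransientConnectionException) {
                return true;
            }
            if (cur instanceof SQLException) {
                String state = ((SQLException) cur).getSQLState();
                // SQLSTATE class "08" is "connection exception" in the SQL standard.
                if (state != null && state.startsWith("08")) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Rebuild the shape of the chain from the trace with JDK types only; the two
        // RuntimeExceptions stand in for CannotGetJdbcConnectionException and NonSkippableReadException.
        Throwable driver = new SQLNonTransientConnectionException("No operations allowed after connection closed.");
        Throwable pool = new SQLTransientConnectionException("HikariPool-2 - Connection is not available", driver);
        Throwable spring = new RuntimeException("Failed to obtain JDBC Connection", pool);
        Throwable batch = new RuntimeException("Non-skippable exception during read", spring);
        System.out.println(isConnectionFailure(batch));                       // true
        System.out.println(isConnectionFailure(new IllegalStateException())); // false
    }
}
```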
Solutions
1. [Does not work] Use Spring Batch's built-in retry mechanism, configured as follows:
@Bean
public Step labelAssetDataMigrationStep() {
    return stepBuilderFactory.get("migrationStep")
            // Chunk size: number of items read, processed and written per transaction
            .<Object, Object>chunk(1000)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .retry(Exception.class)
            .retryLimit(5)
            .listener(listener)
            .build();
}
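This has no effect. The exception that is finally thrown, NonSkippableReadException, is treated by Spring Batch as fatal, and the configured retry does not cover it, so the retry settings are ignored.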
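I confirmed this by reading the source code. In FaultTolerantChunkProvider.read (the frame that throws NonSkippableReadException in the trace above), an exception from the reader is checked only against the skip policy: if it is skippable, the failed item is skipped; otherwise (with the default rollback settings) it is wrapped in NonSkippableReadException and rethrown. The retry policy is applied only when items are processed and written, never when they are read.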
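The decision described above can be modeled in a few lines. This is a simplified, hypothetical model, not the actual Spring Batch source: `Reader`, `NonSkippableRead` and `read` are stand-ins, and skip counting, skip limits and the no-rollback classification are left out. It only shows why a `.retry(...)` setting cannot help here: on the read path there is no retry branch at all.

```java
import java.sql.SQLException;
import java.util.function.Predicate;

public class ReadFaultModel {

    /** Hypothetical stand-in for Spring Batch's NonSkippableReadException. */
    static class NonSkippableRead extends RuntimeException {
        NonSkippableRead(Throwable cause) {
            super("Non-skippable exception during read", cause);
        }
    }

    /** Stand-in for ItemReader<T>: returns null when the input is exhausted. */
    interface Reader<T> {
        T read() throws Exception;
    }

    /** Reads one item; skippable errors drop the bad input, any other error becomes fatal. */
    static <T> T read(Reader<T> reader, Predicate<Exception> skippable) {
        while (true) {
            try {
                return reader.read();
            } catch (Exception e) {
                if (!skippable.test(e)) {
                    // Only the skip decision is made here; no retry policy is consulted for reads.
                    throw new NonSkippableRead(e);
                }
                // Skippable: discard the failed input and try to read the next item.
            }
        }
    }

    public static void main(String[] args) {
        Reader<String> broken = () -> { throw new SQLException("connection closed"); };
        try {
            read(broken, e -> false);
        } catch (NonSkippableRead e) {
            System.out.println("fatal: " + e.getCause().getMessage()); // fatal: connection closed
        }
    }
}
```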



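2. [Does not work] Use a step listener: when it sees NonSkippableReadException, update the step execution's status to trigger a retry (this idea came from ChatGPT). The listener does see the exception, but unfortunately no retry happens; only the recorded result changes. The listener code: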
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;

public class CustomStepExecutionListener extends StepExecutionListenerSupport {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Called before the step runs; initialization can go here
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Called after the step has finished: check whether it failed
        if (stepExecution.getExitStatus().getExitCode().equals(ExitStatus.FAILED.getExitCode())
                && !stepExecution.getFailureExceptions().isEmpty()) {
            // Take the first exception (usually there is only one)
            Throwable exception = stepExecution.getFailureExceptions().get(0);
            // Decide whether to retry based on the exception type or other conditions
            boolean shouldRetry = yourLogicToDecideRetry(exception);
            if (shouldRetry) {
                // ChatGPT's suggestion: mark the step UNKNOWN so Spring Batch "retries" it.
                // In practice this only changes the recorded status; nothing is re-executed.
                stepExecution.setStatus(BatchStatus.UNKNOWN);
                stepExecution.getExecutionContext().putString("retryReason", "Exception occurred, retrying...");
                return ExitStatus.UNKNOWN;
            }
        }
        return stepExecution.getExitStatus();
    }

    private boolean yourLogicToDecideRetry(Throwable exception) {
        // Return true to retry, false otherwise
        return false;
    }
}
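My understanding of why no retry happens: afterStep runs after the step has finished and is meant for cleanup. Since the step is already over, all you can usefully do there is something like logging the exception. Changing the execution's status or result, for example setting the ExitStatus to something other than FAILED, or clearing failureExceptions or replacing them with a non-fatal exception, makes no difference.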
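3. [Works] Use restart instead of retry, i.e. re-run the job at the place where you launch it. The JobExecution returned by the launch tells you whether it succeeded; if it failed, pass its execution id to JobOperator.restart to run it again. A restart resumes from where the previous execution failed rather than starting over, which is exactly what is needed here. Reference code: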
// Assumes injected fields: jobLauncher, jobOperator, jobExplorer, migrationJob and an SLF4J logger "log"
public void runMigrationJob() {
    // A timestamp makes the parameters unique, so each call creates a new JobInstance
    JobParameters parameters = new JobParametersBuilder()
            .addString("execParam", "migrationJob" + LocalDateTime.now())
            .toJobParameters();
    try {
        JobExecution jobExecution = jobLauncher.run(migrationJob, parameters);
        if (ExitStatus.COMPLETED.getExitCode().equals(jobExecution.getExitStatus().getExitCode())) {
            return;
        }
        // Restart the latest failed execution up to 10 times; each restart resumes where the last one failed
        int retryNum = 10;
        JobExecution restartJobExecution = jobExecution;
        do {
            Long execId = jobOperator.restart(restartJobExecution.getId());
            restartJobExecution = jobExplorer.getJobExecution(execId);
            if (restartJobExecution == null) {
                throw new IllegalStateException("No JobExecution found for id " + execId);
            }
            if (ExitStatus.FAILED.getExitCode().equals(restartJobExecution.getExitStatus().getExitCode())) {
                retryNum--;
            } else {
                retryNum = 0;
            }
        } while (retryNum > 0);
        if (ExitStatus.FAILED.getExitCode().equals(restartJobExecution.getExitStatus().getExitCode())) {
            // Step failures are not in getFailureExceptions(); getAllFailureExceptions() includes them
            Throwable cause = restartJobExecution.getAllFailureExceptions().stream().findFirst().orElse(null);
            throw new RuntimeException("MigrationJob still failed after 10 restarts", cause);
        }
    } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
             | JobParametersInvalidException | NoSuchJobExecutionException | NoSuchJobException e) {
        log.error("MigrationJob execution failed: " + e.getMessage(), e);
        throw new RuntimeException("MigrationJob execution failed", e);
    }
}
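Because the failure comes from an unstable connection, restarting immediately may fail again for the same reason. Below is a hypothetical, Spring-free sketch of the restart loop above with a growing pause between attempts; `runWithRestarts`, the `BooleanSupplier` callbacks and the linear backoff are assumptions of this sketch, not part of the original code. In a real job, `firstRun` would wrap `jobLauncher.run(...)` and `restart` would call `jobOperator.restart(...)` on the most recent failed execution id, each returning whether the resulting execution COMPLETED. Note that resuming works only because readers such as JdbcPagingItemReader save their position in the step's ExecutionContext at each chunk commit (`saveState` defaults to true); a reader that does not save state would start from the beginning.

```java
import java.util.function.BooleanSupplier;

public class RestartLoop {

    /**
     * Runs the job once; while it has not completed, restarts it up to maxRestarts times,
     * pausing backoffMillis * attempt before each restart. Returns true once a run completes.
     */
    static boolean runWithRestarts(BooleanSupplier firstRun, BooleanSupplier restart,
                                   int maxRestarts, long backoffMillis) throws InterruptedException {
        if (firstRun.getAsBoolean()) {
            return true;
        }
        for (int attempt = 1; attempt <= maxRestarts; attempt++) {
            // Give the unstable source connection time to recover before resuming.
            Thread.sleep(backoffMillis * attempt);
            if (restart.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] failuresLeft = {2}; // the first two restarts fail, the third completes
        boolean completed = runWithRestarts(() -> false, () -> failuresLeft[0]-- <= 0, 10, 10);
        System.out.println(completed); // true
    }
}
```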
This is the approach I used to solve the problem.
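4. [Works] Write a custom ItemReader that overrides read(), catches the exception instead of rethrowing it, and tries to obtain the database connection again when that fails. This also works, but repeatedly re-acquiring connections may put extra load on the database and the connection pool.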
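A minimal sketch of option 4, written as a wrapper around an existing reader rather than a full custom reader: it retries a failed `read()` a bounded number of times with a growing pause, then rethrows so the step fails and option 3 can take over. The `Reader` interface is a hypothetical stand-in for Spring Batch's `ItemReader<T>` (whose `read()` also returns null at end of input), and retrying on every `Exception` is a simplification; in practice you would retry only connection errors. Two caveats: whether calling the delegate's `read()` again after a failure is safe depends on the delegate's internal state, so verify this for stateful readers such as JdbcPagingItemReader; and in a real step the delegate's ItemStream callbacks must still reach the framework (for example by registering it with the step builder's `.stream(...)`) so that restart state keeps being saved.

```java
import java.sql.SQLTransientConnectionException;

public class RetryingReader<T> {

    /** Hypothetical stand-in for Spring Batch's ItemReader: read() returns null at end of input. */
    interface Reader<E> {
        E read() throws Exception;
    }

    private final Reader<T> delegate;
    private final int maxAttempts;
    private final long backoffMillis;

    RetryingReader(Reader<T> delegate, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.delegate = delegate;
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    /** Calls delegate.read(), making up to maxAttempts attempts with a growing pause between them. */
    T read() throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return delegate.read();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Linear backoff so a struggling source database is not hammered.
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        // Out of attempts: rethrow the last error and let the step fail (a restart can take over).
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice with a pool timeout, then returns a row.
        Reader<String> flaky = () -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new SQLTransientConnectionException("pool timeout");
            }
            return "row-1";
        };
        RetryingReader<String> reader = new RetryingReader<String>(flaky, 5, 10);
        System.out.println(reader.read() + " after " + calls[0] + " calls"); // row-1 after 3 calls
    }
}
```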
Recommendation: use option 3.
