1. 引言
默认情况下,Spring批处理作业在执行过程中出现任何错误都会失败。然而有些时候,为了提高应用程序的弹性,我们就需要处理这类间歇性的故障。在这篇短文中,我们就来一起探讨 如何在Spring批处理框架中配置重试逻辑。
如果对spring batch不了解,可以参考以前的一篇文章:
开车!Spring Batch 入门级示例教程!
2. 简单举例
假设有一个批处理作业,它读取一个CSV文件作为输入:
- username, userid, transaction_date, transaction_amount
- sammy, 1234, 31/10/2015, 10000
- john, 9999, 3/12/2015, 12321
然后,它通过访问REST端点来处理每条记录,获取用户的 age 和 postCode 属性:
- public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> {
-
- @Override
- public Transaction process(Transaction transaction) throws IOException {
- log.info("RetryItemProcessor, attempting to process: {}", transaction);
- HttpResponse response = fetchMoreUserDetails(transaction.getUserId());
- //parse user's age and postCode from response and update transaction
- ...
- return transaction;
- }
- ...
- }
最后,它生成并输出一个合并的XML:
-
-
10000.0 -
2015-10-31 00:00:00 -
1234 -
sammy -
10 -
430222 -
- ...
-
3. ItemProcessor 中添加重试
现在假设,如果到REST端点的连接由于某些网络速度慢而超时,该怎么办?如果发生这种情况,则我们的批处理工作将失败。
在这种情况下,我们希望失败的 item 处理重试几次。因此,接下来我将批处理作业配置为:在出现故障时执行最多三次重试:
- @Bean
- public Step retryStep(
- ItemProcessor<Transaction, Transaction> processor,
- ItemWriter<Transaction> writer) throws ParseException {
- return stepBuilderFactory
- .get("retryStep")
- .<Transaction, Transaction>chunk(10)
- .reader(itemReader(inputCsv))
- .processor(processor)
- .writer(writer)
- .faultTolerant()
- .retryLimit(3)
- .retry(ConnectTimeoutException.class)
- .retry(DeadlockLoserDataAccessException.class)
- .build();
- }
这里调用了 faultTolerant() 来启用重试功能。另外,我们使用 retry 和 retryLimit 分别定义符合重试条件的异常和 item 的最大重试次数。
4. 测试重试次数
假设我们有一个测试场景,其中返回 age 和 postCode 的REST端点关闭了一段时间。在这个测试场景中,我们只对前两个 API 调用获取一个 ConnectTimeoutException ,而第三个调用将成功:
- @Test
- public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception {
- FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT);
- FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT);
-
- when(httpResponse.getEntity())
- .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }"));
-
- //fails for first two calls and passes third time onwards
- when(httpClient.execute(any()))
- .thenThrow(new ConnectTimeoutException("Timeout count 1"))
- .thenThrow(new ConnectTimeoutException("Timeout count 2"))
- .thenReturn(httpResponse);
-
- JobExecution jobExecution = jobLauncherTestUtils
- .launchJob(defaultJobParameters());
- JobInstance actualJobInstance = jobExecution.getJobInstance();
- ExitStatus actualJobExitStatus = jobExecution.getExitStatus();
-
- assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
- assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED"));
- AssertFile.assertFileEquals(expectedResult, actualResult);
- }
在这里,我们的工作成功地完成了。另外,从日志中可以明显看出 第一条记录 id=1234 失败了两次,最后在第三次重试时成功了:
- 19:06:57.742 [main] INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep]
- 19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
- 19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
- 19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
- 19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999
- 19:06:57.773 [main] INFO o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms
同样,看下另一个测试用例,当所有重试次数都用完时会发生什么:
- @Test
- public void whenEndpointAlwaysFail_thenJobFails() throws Exception {
- when(httpClient.execute(any()))
- .thenThrow(new ConnectTimeoutException("Endpoint is down"));
-
- JobExecution jobExecution = jobLauncherTestUtils
- .launchJob(defaultJobParameters());
- JobInstance actualJobInstance = jobExecution.getJobInstance();
- ExitStatus actualJobExitStatus = jobExecution.getExitStatus();
-
- assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
- assertThat(actualJobExitStatus.getExitCode(), is("FAILED"));
- assertThat(actualJobExitStatus.getExitDescription(),
- containsString("org.apache.http.conn.ConnectTimeoutException"));
- }
在这个测试用例中,在作业因 ConnectTimeoutException 而失败之前,会尝试对第一条记录重试三次。
5. 使用XML配置重试
最后,让我们看一下与上述配置等价的XML:
"retryBatchJob"> -
"retryStep" > -
-
"itemReader" writer="itemWriter" - processor="retryItemProcessor" commit-interval="10"
- retry-limit="3">
-
-
"org.apache.http.conn.ConnectTimeoutException" /> -
"org.springframework.dao.DeadlockLoserDataAccessException" /> -
-
-
-
-
6. 简单总结
在本文中,我们学习了如何在Spring批处理中配置重试逻辑,其中包括使用Java和XML配置。以及使用单元测试来观察重试在实践中是如何工作的。
本文转载自微信公众号「锅外的大佬」,可以通过以下二维码关注。转载本文请联系锅外的大佬公众号。