diff --git a/spring-batch-2/src/main/java/com/baeldung/restartjob/BatchConfig.java b/spring-batch-2/src/main/java/com/baeldung/restartjob/BatchConfig.java new file mode 100644 index 000000000000..51f09fdfa1fc --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/restartjob/BatchConfig.java @@ -0,0 +1,81 @@ +package com.baeldung.restartjob; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.file.FlatFileItemReader; +import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; +import org.springframework.batch.item.file.mapping.PassThroughLineMapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.transaction.PlatformTransactionManager; + +@Configuration +public class BatchConfig { + + @Bean + public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) { + return new JobBuilder("simpleJob", jobRepository) + .start(step1(jobRepository, transactionManager)) + .build(); + } + + @Bean + public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { + return new StepBuilder("step1", jobRepository) + .chunk(2, transactionManager) + .reader(flatFileItemReader()) + .processor(itemProcessor()) + .writer(itemWriter()) + .build(); + } + + @Bean + @StepScope + public FlatFileItemReader flatFileItemReader() { + return new FlatFileItemReaderBuilder() + .name("itemReader") + .resource(new ClassPathResource("data.csv")) + .lineMapper(new PassThroughLineMapper()) + .saveState(true) + .build(); + } + + @Bean + public RestartItemProcessor itemProcessor() { + return new RestartItemProcessor(); + } + + @Bean + public ItemWriter itemWriter() { + return items -> { + System.out.println("Writing items:"); + for (String item : items) { + System.out.println("- " + item); + } + }; + } + + static class RestartItemProcessor implements ItemProcessor { + private boolean failOnItem3 = true; + + public void setFailOnItem3(boolean failOnItem3) { + this.failOnItem3 = failOnItem3; + } + + @Override + public String process(String item) throws Exception { + System.out.println("Processing: " + item + " (failOnItem3=" + failOnItem3 + ")"); + if (failOnItem3 && item.equals("Item3")) { + throw new RuntimeException("Simulated failure on Item3"); + } + return "PROCESSED " + item; + } + } +} \ No newline at end of file diff --git a/spring-batch-2/src/main/java/com/baeldung/restartjob/RestartJobBatchApp.java b/spring-batch-2/src/main/java/com/baeldung/restartjob/RestartJobBatchApp.java new file mode 100644 index 000000000000..389d8c25ab99 --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/restartjob/RestartJobBatchApp.java @@ -0,0 +1,64 @@ +package com.baeldung.restartjob; + +import java.util.List; + +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInstance; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.JobOperator; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class RestartJobBatchApp { + + public static void main(String[] args) { + SpringApplication app = new SpringApplication(RestartJobBatchApp.class); + app.setAdditionalProfiles("restart"); + app.run(args); + } + + @Bean + @ConditionalOnProperty(prefix = "job.autorun", name = "enabled", havingValue = "true", matchIfMissing = true) + CommandLineRunner run(JobLauncher jobLauncher, Job job, JobExplorer jobExplorer, + JobOperator jobOperator, BatchConfig.RestartItemProcessor itemProcessor) { + return args -> { + JobParameters jobParameters = new JobParametersBuilder() + .addString("jobId", "test-job-" + System.currentTimeMillis()) + .toJobParameters(); + + List instances = jobExplorer.getJobInstances("simpleJob", 0, 1); + if (!instances.isEmpty()) { + JobInstance lastInstance = instances.get(0); + List executions = jobExplorer.getJobExecutions(lastInstance); + if (!executions.isEmpty()) { + JobExecution lastExecution = executions.get(0); + if (lastExecution.getStatus() == BatchStatus.FAILED) { + System.out.println("Restarting failed job execution with ID: " + lastExecution.getId()); + itemProcessor.setFailOnItem3(false); + + JobExecution restartedExecution = jobLauncher.run(job, jobParameters); + + // final Long restartId = jobOperator.restart(lastExecution.getId()); + // final JobExecution restartedExecution = jobExplorer.getJobExecution(restartedExecution); + + System.out.println("Restarted job status: " + restartedExecution.getStatus()); + return; + } + } + } + + System.out.println("Starting new job execution..."); + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + System.out.println("Job started with status: " + jobExecution.getStatus()); + }; + } +} \ No newline at end of file diff --git a/spring-batch-2/src/main/resources/application-restart.properties b/spring-batch-2/src/main/resources/application-restart.properties new file mode 100644 index 000000000000..4b2eff89245d --- /dev/null +++ b/spring-batch-2/src/main/resources/application-restart.properties @@ -0,0 +1,11 @@ +spring.datasource.url=jdbc:h2:~/test;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE +spring.datasource.username=sa +spring.datasource.password= +spring.datasource.driver-class-name=org.h2.Driver + +spring.jpa.hibernate.ddl-auto=create-drop +spring.jpa.properties.hibernate.format_sql=true + +spring.batch.jdbc.initialize-schema=always +spring.sql.init.mode=always +spring.batch.jdbc.table-prefix=BATCH_ \ No newline at end of file diff --git a/spring-batch-2/src/main/resources/data.csv b/spring-batch-2/src/main/resources/data.csv new file mode 100644 index 000000000000..08b4b4431f3a --- /dev/null +++ b/spring-batch-2/src/main/resources/data.csv @@ -0,0 +1,5 @@ +Item1 +Item2 +Item3 +Item4 +Item5 \ No newline at end of file diff --git a/spring-batch-2/src/test/java/com/baeldung/restartjob/RestartJobBatchAppIntegrationTest.java b/spring-batch-2/src/test/java/com/baeldung/restartjob/RestartJobBatchAppIntegrationTest.java new file mode 100644 index 000000000000..b0b5ae3317df --- /dev/null +++ b/spring-batch-2/src/test/java/com/baeldung/restartjob/RestartJobBatchAppIntegrationTest.java @@ -0,0 +1,92 @@ +package com.baeldung.restartjob; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; + +import org.junit.jupiter.api.Test; +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Import; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; + +@SpringBootTest(classes = {RestartJobBatchApp.class, BatchConfig.class}, + properties = {"job.autorun.enabled=false"}) +@Import(RestartJobBatchAppIntegrationTest.TestConfig.class) +public class RestartJobBatchAppIntegrationTest { + + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + @Autowired + private BatchConfig.RestartItemProcessor itemProcessor; + + @TestConfiguration + static class TestConfig { + @Autowired + private JobLauncher jobLauncher; + + @Autowired + private Job job; + + @Bean + public JobLauncherTestUtils jobLauncherTestUtils() { + JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils(); + jobLauncherTestUtils.setJobLauncher(jobLauncher); + jobLauncherTestUtils.setJob(job); + return jobLauncherTestUtils; + } + } + + private final Resource inputFile = new ClassPathResource("data.csv"); + + @Test + public void givenItems_whenFailed_thenRestartFromFailure() throws Exception { + // Given + createTestFile("Item1\nItem2\nItem3\nItem4"); + + JobParameters jobParameters = new JobParametersBuilder() + .addLong("time", System.currentTimeMillis()) + .toJobParameters(); + + // When + JobExecution firstExecution = jobLauncherTestUtils.launchJob(jobParameters); + assertEquals(BatchStatus.FAILED, firstExecution.getStatus()); + + Long executionId = firstExecution.getId(); + + itemProcessor.setFailOnItem3(false); + + // Then + JobExecution restartedExecution = jobLauncherTestUtils.launchJob(jobParameters); + + assertEquals(BatchStatus.COMPLETED, restartedExecution.getStatus()); + + assertEquals( + firstExecution.getJobInstance().getInstanceId(), + restartedExecution.getJobInstance().getInstanceId() + ); + } + + private void createTestFile(String content) throws IOException { + Path tempFile = Files.createTempFile("test-data", ".csv"); + Files.write(tempFile, content.getBytes()); + Files.copy(tempFile, inputFile.getFile().toPath(), StandardCopyOption.REPLACE_EXISTING); + } + +} + +