diff --git a/SpringBatch/src/main/java/org/example/Config/BatchConfig.java b/SpringBatch/src/main/java/org/example/Config/BatchConfig.java index 39a5af4..14fcdb4 100644 --- a/SpringBatch/src/main/java/org/example/Config/BatchConfig.java +++ b/SpringBatch/src/main/java/org/example/Config/BatchConfig.java @@ -1,20 +1,30 @@ package org.example.Config; +import com.fasterxml.jackson.databind.ObjectMapper; import io.github.resilience4j.bulkhead.BulkheadConfig; import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig; import io.github.resilience4j.bulkhead.annotation.Bulkhead; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import io.github.resilience4j.timelimiter.TimeLimiterConfig; +import lombok.Data; import org.example.component.TimeoutListener; +import org.example.dto.PersonDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.*; import org.springframework.batch.core.job.builder.*; +import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.support.*; +import org.springframework.batch.core.repository.ExecutionContextSerializer; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.batch.core.step.builder.*; +import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.circuitbreaker.resilience4j.ReactiveResilience4jBulkheadProvider; import org.springframework.cloud.circuitbreaker.resilience4j.Resilience4JCircuitBreakerFactory; import org.springframework.cloud.circuitbreaker.resilience4j.Resilience4JConfigBuilder; @@ -22,39 +32,120 @@ import org.springframework.cloud.client.circuitbreaker.Customizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement; import javax.sql.DataSource; +import java.io.Serializable; import java.time.Duration; +import java.util.Objects; @Configuration -@EnableBatchProcessing +@EnableBatchProcessing(executionContextSerializerRef = "jacksonSerializer") @EnableTransactionManagement public class BatchConfig { + + private static final Logger log = LoggerFactory.getLogger(BatchConfig.class); + @Bean - public Job demoJob(JobRepository jobRepository, Step demoStep) { + public Job demoJob(JobRepository jobRepository, Step demoStep, Step step1, Step step2, Step step3) { + // Create a flow for step1 followed by step3 + Flow flow1 = new FlowBuilder("flow1") + .start(step1) + .next(step3) + .build(); + + // Create a flow for step2 (runs in parallel with flow1) + Flow flow2 = new FlowBuilder("flow2") + .start(step2) + .build(); + + // Create a split that runs flow1 and flow2 in parallel + Flow parallelFlow = new FlowBuilder("parallelFlow") + .split(new SimpleAsyncTaskExecutor()) + .add(flow1, flow2) + .build(); + return new JobBuilder("demoJob", jobRepository) .listener(new TimeoutListener()) .start(demoStep) + .on("COMPLETED").to(parallelFlow) + .end() .build(); } + @Autowired + private ObjectMapper objectMapper; + @Bean public Step demoStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("demoStep", jobRepository) .tasklet((contribution, chunkContext) -> { - for(int i=0; i<5000; i++) { - System.out.println("Processing step " + (i + 1) + "/5"); - Thread.sleep(1000); // Simulate work - } + StepExecution stepExecution = chunkContext.getStepContext().getStepExecution(); + JobExecution jobExecution = stepExecution.getJobExecution(); + + // Get job parameter person as JSON string + String personJson = jobExecution.getJobParameters().getString("person"); + log.info("Person JSON: {}", personJson); + + // Deserialize JSON to PersonDTO object + PersonDTO personDTO = objectMapper.readValue(personJson, PersonDTO.class); + log.info("Person DTO - Name: {}, Age: {}", personDTO.getName(), personDTO.getAge()); + + ExecutionContext jobContext = jobExecution.getExecutionContext(); + + jobContext.putString("Data", new MyCustomData("LOLLLLL<>" + jobExecution.getId() + ".txt").toString()); + + String earlyData = jobContext.getString("Data"); + log.info("Early Data from Job Execution Context: {}", earlyData); System.out.println(">>> Running batch job with REST trigger <<<"); return RepeatStatus.FINISHED; }, transactionManager).allowStartIfComplete(true) .build(); } + @Bean + public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager, org.example.service.ExternalReactiveService externalReactiveService) { + return new StepBuilder("step1", jobRepository) + .tasklet((contribution, chunkContext) -> { + log.info("Step 1: Fetching data from external source..."); + String result = externalReactiveService.fetchData().block(); + log.info("Step 1 Result: {}", result); + return RepeatStatus.FINISHED; + }, transactionManager) + .build(); + } + + @Bean + public Step step2(JobRepository jobRepository, PlatformTransactionManager transactionManager, org.example.service.ExternalReactiveService externalReactiveService) { + return new StepBuilder("step2", jobRepository) + .tasklet((contribution, chunkContext) -> { + int page = 0; + int size = 100; + log.info("Step 2: Fetching data batch - Page: {}, Size: {}", page, size); + String result = Objects.requireNonNull(externalReactiveService.fetchAllDataFromPath("/api/data/v1", size).collectList().block()).toString(); + log.info("Step 2 Result: {}", result); + return RepeatStatus.FINISHED; + }, transactionManager) + .build(); + } + + @Bean + public Step step3(JobRepository jobRepository, PlatformTransactionManager transactionManager, org.example.service.ExternalReactiveService externalReactiveService) { + return new StepBuilder("step3", jobRepository) + .tasklet((contribution, chunkContext) -> { + int page = 1; + int size = 500; + log.info("Step 3: Fetching data batch - Page: {}, Size: {}", page, size); + String result = Objects.requireNonNull(externalReactiveService.fetchAllDataFromPath("/api/data/v2", size).collectList().block()).toString(); + log.info("Step 3 Result: {}", result); + return RepeatStatus.FINISHED; + }, transactionManager) + .build(); + } + @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); @@ -99,5 +190,9 @@ public Customizer reactiveSpecificBulkhead } + @Bean + public ExecutionContextSerializer jacksonSerializer() { + return new Jackson2ExecutionContextStringSerializer(); + } } diff --git a/SpringBatch/src/main/java/org/example/Config/MyCustomData.java b/SpringBatch/src/main/java/org/example/Config/MyCustomData.java new file mode 100644 index 0000000..e5e0a6a --- /dev/null +++ b/SpringBatch/src/main/java/org/example/Config/MyCustomData.java @@ -0,0 +1,14 @@ +package org.example.Config; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NonNull; + +import java.io.Serializable; + + +@Data +public class MyCustomData implements Serializable { + @NonNull + private String value; +} \ No newline at end of file diff --git a/SpringBatch/src/main/java/org/example/controller/Batch.java b/SpringBatch/src/main/java/org/example/controller/Batch.java index c9f2360..a1f1ca4 100644 --- a/SpringBatch/src/main/java/org/example/controller/Batch.java +++ b/SpringBatch/src/main/java/org/example/controller/Batch.java @@ -1,7 +1,9 @@ package org.example.controller; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; +import org.example.dto.PersonDTO; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; @@ -27,16 +29,28 @@ public class Batch { @Autowired private Job demoJob; + @Autowired + private ObjectMapper objectMapper; + @GetMapping("/batch") - private ResponseEntity signUp() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException { + public ResponseEntity signUp() throws Exception { + // Hardcoded PersonDTO values + PersonDTO personDTO = new PersonDTO("John Doe", 30); + + // Serialize PersonDTO to JSON string + String personJson = objectMapper.writeValueAsString(personDTO); + JobParameters params = new JobParametersBuilder() .addLocalDateTime("date", java.time.LocalDateTime.now()) + .addString("name", personDTO.getName()) + .addString("person", personJson) .toJobParameters(); jobLauncher.run(demoJob, params); log.info("==============================="); - return ResponseEntity.ok("Hello World"); + log.info("Batch job started with name: {} and age: {}", personDTO.getName(), personDTO.getAge()); + return ResponseEntity.ok("Batch job started for " + personDTO.getName()); } diff --git a/SpringBatch/src/main/java/org/example/dto/PersonDTO.java b/SpringBatch/src/main/java/org/example/dto/PersonDTO.java new file mode 100644 index 0000000..38eed10 --- /dev/null +++ b/SpringBatch/src/main/java/org/example/dto/PersonDTO.java @@ -0,0 +1,13 @@ +package org.example.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class PersonDTO { + private String name; + private Integer age; +} diff --git a/SpringBatch/src/main/java/org/example/service/ExternalReactiveService.java b/SpringBatch/src/main/java/org/example/service/ExternalReactiveService.java index 6d695ae..37073a8 100644 --- a/SpringBatch/src/main/java/org/example/service/ExternalReactiveService.java +++ b/SpringBatch/src/main/java/org/example/service/ExternalReactiveService.java @@ -1,14 +1,91 @@ package org.example.service; -import org.springframework.web.reactive.function.client.WebClient; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import io.github.resilience4j.bulkhead.annotation.Bulkhead; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.time.Duration; @Service public class ExternalReactiveService { - @Bulkhead(name = "serviceBulkhead", type = Bulkhead.Type.THREADPOOL) + + private static final Logger log = LoggerFactory.getLogger(ExternalReactiveService.class); + + @Autowired + private ObjectMapper objectMapper; + + @Bulkhead(name = "serviceBulkhead", type = Bulkhead.Type.SEMAPHORE) public Mono fetchData() { - return WebClient.create().get().uri("https://example.com").retrieve().bodyToMono(String.class); + return WebClient.create().get().uri("https://api.github.com/zen") + .retrieve() + .bodyToMono(String.class) + .doOnError(WebClientResponseException.class, ex -> + log.error("Error fetching data from GitHub API - Status Code: {}, Message: {}", + ex.getStatusCode().value(), ex.getMessage())) + .retryWhen(Retry.fixedDelay(5, Duration.ofMinutes(5)) + .filter(throwable -> throwable instanceof WebClientResponseException && + (((WebClientResponseException) throwable).getStatusCode().value() == 504 || + ((WebClientResponseException) throwable).getStatusCode().value() == 503)) + .doBeforeRetry(retrySignal -> + log.warn("Retrying GitHub API call - Attempt: {}, Error: {}", + retrySignal.totalRetries() + 1, retrySignal.failure().getMessage()))); + } + + @Bulkhead(name = "serviceBulkhead", type = Bulkhead.Type.SEMAPHORE) + public Mono fetchDataPageFromPath(String path, int page, int size) { + return WebClient.create() + .get() + .uri(uriBuilder -> uriBuilder + .scheme("http") + .host("localhost") + .port(9000) + .path(path) + .queryParam("page", page) + .queryParam("size", size) + .build()) + .retrieve() + .bodyToMono(String.class) + .doOnError(WebClientResponseException.class, ex -> + log.error("Error fetching page {} from {} - Status Code: {}, Message: {}", + page, path, ex.getStatusCode().value(), ex.getMessage())) + .retryWhen(Retry.fixedDelay(5, Duration.ofMinutes(1)) + .filter(throwable -> throwable instanceof WebClientResponseException && + (((WebClientResponseException) throwable).getStatusCode().value() == 504 || + ((WebClientResponseException) throwable).getStatusCode().value() == 503)) + .doBeforeRetry(retrySignal -> + log.warn("Retrying API call for page {} from {} - Attempt: {}, Error: {}", + page, path, retrySignal.totalRetries() + 1, retrySignal.failure().getMessage()))); + } + + public Flux fetchAllDataFromPath(String path, int size) { + return fetchDataPageFromPath(path, 0, size).flatMapMany(firstPage -> { + try { + JsonNode root = objectMapper.readTree(firstPage); + int totalPages = root.path("totalPages").asInt(); + log.info("Fetched page 0/{} from {}, response: {}", totalPages - 1, path, firstPage); + + if (totalPages <= 1) { + return Flux.just(firstPage); + } + + return Flux.concat( + Mono.just(firstPage), + Flux.range(1, totalPages) + .concatMap(page -> fetchDataPageFromPath(path, page, size) + .doOnNext(response -> log.info("Fetched page {}/{} from {}, response: {}", page, totalPages, path, response))) + ); + } catch (Exception e) { + return Flux.error(e); + } + }); } } \ No newline at end of file diff --git a/SpringBatch/target/classes/application.properties b/SpringBatch/target/classes/application.properties new file mode 100644 index 0000000..1ae0bf4 --- /dev/null +++ b/SpringBatch/target/classes/application.properties @@ -0,0 +1,16 @@ +## Spring JPA +spring.datasource.url=jdbc:postgresql://localhost:5432/postgres +spring.datasource.username=postgres +spring.datasource.password=naveen +spring.jpa.hibernate.ddl-auto=none +spring.jpa.show-sql=true +spring.jpa.properties.hibernate.format_sql=false +hibernate.dialect=org.hibernate.dialect.PostgreSQL9Dialect +spring.servlet.multipart.enabled=true +spring.servlet.multipart.max-file-size=50MB +spring.servlet.multipart.max-request-size=60MB +spring.output.ansi.enabled=ALWAYS +management.endpoint.metrics.enabled=true +management.endpoints.prometheus.enabled=true +management.endpoints.web.exposure.include=* +spring.batch.jdbc.initialize-schema=never diff --git a/SpringBatch/target/classes/org/example/Application.class b/SpringBatch/target/classes/org/example/Application.class new file mode 100644 index 0000000..bf96010 Binary files /dev/null and b/SpringBatch/target/classes/org/example/Application.class differ diff --git a/SpringBatch/target/classes/org/example/Config/BatchConfig.class b/SpringBatch/target/classes/org/example/Config/BatchConfig.class new file mode 100644 index 0000000..5f81ba9 Binary files /dev/null and b/SpringBatch/target/classes/org/example/Config/BatchConfig.class differ diff --git a/SpringBatch/target/classes/org/example/Config/MyCustomData.class b/SpringBatch/target/classes/org/example/Config/MyCustomData.class new file mode 100644 index 0000000..861ea24 Binary files /dev/null and b/SpringBatch/target/classes/org/example/Config/MyCustomData.class differ diff --git a/SpringBatch/target/classes/org/example/component/TimeoutListener.class b/SpringBatch/target/classes/org/example/component/TimeoutListener.class new file mode 100644 index 0000000..b139dcf Binary files /dev/null and b/SpringBatch/target/classes/org/example/component/TimeoutListener.class differ diff --git a/SpringBatch/target/classes/org/example/controller/Batch.class b/SpringBatch/target/classes/org/example/controller/Batch.class new file mode 100644 index 0000000..4e82227 Binary files /dev/null and b/SpringBatch/target/classes/org/example/controller/Batch.class differ diff --git a/SpringBatch/target/classes/org/example/dto/PersonDTO.class b/SpringBatch/target/classes/org/example/dto/PersonDTO.class new file mode 100644 index 0000000..bc0bb8b Binary files /dev/null and b/SpringBatch/target/classes/org/example/dto/PersonDTO.class differ diff --git a/SpringBatch/target/classes/org/example/service/ExternalReactiveService.class b/SpringBatch/target/classes/org/example/service/ExternalReactiveService.class new file mode 100644 index 0000000..0d6fa85 Binary files /dev/null and b/SpringBatch/target/classes/org/example/service/ExternalReactiveService.class differ