Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions batch/chunk-exception/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

<artifactId>chunk-exception</artifactId>
<packaging>war</packaging>

<name>${project.artifactId}</name>
<name>Batch Chunk Exception</name>
<description>Chunk Exception Handling - Retrying and Skipping</description>

<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
*/
public class ChunkExceptionRecorder {
public static CountDownLatch chunkExceptionsCountDownLatch = new CountDownLatch(3);
public static int retryReadExecutions = 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@
*/
package org.javaee7.batch.chunk.exception;

import java.io.Serializable;

/**
* @author Arun Gupta
*/
public class MyInputRecord {
public class MyInputRecord implements Serializable {
private int id;

public MyInputRecord() { }


public MyInputRecord(int id) {
this.id = id;
}
Expand All @@ -58,7 +58,22 @@ public int getId() {
public void setId(int id) {
this.id = id;
}


@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

MyInputRecord that = (MyInputRecord) o;

return id == that.id;
}

@Override
public int hashCode() {
return id;
}

@Override
public String toString() {
return "MyInputRecord: " + id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ public class MyItemProcessor implements ItemProcessor {
@Override
public Object processItem(Object t) {
System.out.println("MyItemProcessor.processItem: " + t);
if (((MyInputRecord)t).getId() == 6)

if (((MyInputRecord) t).getId() == 6)
throw new NullPointerException();

// return (t.getId() % 2 == 0) ? null : new MyOutputRecord(t.getId() * 2);
return new MyOutputRecord(((MyInputRecord)t).getId() * 2);

return new MyOutputRecord(((MyInputRecord) t).getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,33 +39,55 @@
*/
package org.javaee7.batch.chunk.exception;

import java.io.Serializable;
import java.util.StringTokenizer;
import javax.batch.api.chunk.AbstractItemReader;
import javax.inject.Named;
import java.io.Serializable;
import java.util.StringTokenizer;

/**
* @author Arun Gupta
*/
@Named
public class MyItemReader extends AbstractItemReader {

private StringTokenizer tokens;


private MyInputRecord lastElement;
private boolean alreadyFailed;

@Override
public void open(Serializable c) {
public void open(Serializable checkpoint) {
tokens = new StringTokenizer("1,2,3,4,5,6,7,8,9,10", ",");

// This will place the nextToken into the last batch checkpoint. Called on exception retry.
if (checkpoint != null) {
while (!Integer.valueOf(tokens.nextToken()).equals(((MyInputRecord) checkpoint).getId())) {
System.out.println("Skipping already read elements");
}
}
}

@Override
public Object readItem() {
if (tokens.hasMoreTokens()) {
int token = Integer.valueOf(tokens.nextToken());
if (token == 3)
throw new IllegalArgumentException();

return new MyInputRecord(token);

// Simulate a read exception when the token is equal to 5. Do it once only.
if (token == 5 && !alreadyFailed) {
alreadyFailed = true;
throw new IllegalArgumentException("Could not read record");
}

lastElement = new MyInputRecord(token);
System.out.println("MyItemReader.readItem " + lastElement);
return lastElement;
}
return null;
}

@Override
public Serializable checkpointInfo() throws Exception {
// This is used internally by batch to stop the retry. Remember to implement equals on the read elements.
return lastElement;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
*/
package org.javaee7.batch.chunk.exception;

import java.util.List;
import javax.batch.api.chunk.AbstractItemWriter;
import javax.inject.Named;
import java.util.List;

/**
* @author Arun Gupta
Expand All @@ -51,7 +51,7 @@ public class MyItemWriter extends AbstractItemWriter {

@Override
public void writeItems(List list) {
if (list.contains(new MyOutputRecord(2))) {
if (list.contains(new MyOutputRecord(8))) {
throw new IllegalArgumentException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@
*/
package org.javaee7.batch.chunk.exception;

import java.io.Serializable;

/**
* @author Arun Gupta
*/
public class MyOutputRecord {
public class MyOutputRecord implements Serializable {
private int id;

public MyOutputRecord() { }


public MyOutputRecord(int id) {
this.id = id;
}
Expand All @@ -58,11 +58,6 @@ public int getId() {
public void setId(int id) {
this.id = id;
}

@Override
public String toString() {
return "MyOutputRecord: " + id;
}

@Override
public boolean equals(Object o) {
Expand All @@ -78,4 +73,9 @@ public boolean equals(Object o) {
public int hashCode() {
return id;
}

@Override
public String toString() {
return "MyOutputRecord: " + id;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.javaee7.batch.chunk.exception;

import javax.batch.api.chunk.listener.RetryProcessListener;
import javax.inject.Named;

/**
* @author Roberto Cortez
*/
@Named
public class MyRetryProcessorListener implements RetryProcessListener {
@Override
public void onRetryProcessException(Object item, Exception ex) throws Exception {
System.out.println("MyRetryProcessorListener.onRetryProcessException");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.javaee7.batch.chunk.exception;

import javax.batch.api.chunk.listener.RetryReadListener;
import javax.inject.Named;

/**
* @author Roberto Cortez
*/
@Named
public class MyRetryReadListener implements RetryReadListener {
@Override
public void onRetryReadException(Exception ex) throws Exception {
ChunkExceptionRecorder.retryReadExecutions++;
ChunkExceptionRecorder.chunkExceptionsCountDownLatch.countDown();
System.out.println("MyRetryReadListener.onRetryReadException " + ex.getMessage());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.javaee7.batch.chunk.exception;

import javax.batch.api.chunk.listener.RetryWriteListener;
import javax.inject.Named;
import java.util.List;

/**
* @author Roberto Cortez
*/
@Named
public class MyRetryWriteListener implements RetryWriteListener {
@Override
public void onRetryWriteException(List<Object> items, Exception ex) throws Exception {
System.out.println("MyRetryWriteListener.onRetryWriteException");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
* only if the new code is made subject to such option by the copyright
* holder.
*/

package org.javaee7.batch.chunk.exception;

import javax.batch.api.chunk.listener.SkipProcessListener;
Expand All @@ -48,11 +47,9 @@
*/
@Named
public class MySkipProcessorListener implements SkipProcessListener {

@Override
public void onSkipProcessItem(Object t, Exception e) throws Exception {
ChunkExceptionRecorder.chunkExceptionsCountDownLatch.countDown();
System.err.println("MySkipProcessorListener.onSkipProcessItem: " + ((MyInputRecord)t).getId() + ", " + e.getMessage());
System.out.println("MySkipProcessorListener.onSkipProcessItem: " + ((MyInputRecord) t).getId() + ", " + e.getMessage());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
* only if the new code is made subject to such option by the copyright
* holder.
*/

package org.javaee7.batch.chunk.exception;

import javax.batch.api.chunk.listener.SkipReadListener;
Expand All @@ -48,11 +47,9 @@
*/
@Named
public class MySkipReadListener implements SkipReadListener {

@Override
public void onSkipReadItem(Exception e) throws Exception {
ChunkExceptionRecorder.chunkExceptionsCountDownLatch.countDown();
System.err.println("MySkipReadListener.onSkipReadItem: " + e.getMessage());
System.out.println("MySkipReadListener.onSkipReadItem: " + e.getMessage());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,21 @@
* only if the new code is made subject to such option by the copyright
* holder.
*/

package org.javaee7.batch.chunk.exception;

import java.util.List;
import javax.batch.api.chunk.listener.SkipWriteListener;
import javax.inject.Named;
import java.util.List;

/**
* @author Arun Gupta
*/
@Named
public class MySkipWriteListener implements SkipWriteListener {

@Override
public void onSkipWriteItem(List list, Exception e) throws Exception {
ChunkExceptionRecorder.chunkExceptionsCountDownLatch.countDown();
System.err.println("MySkipWriteListener.onSkipWriteItem: " + list.size() + ", " + e.getMessage());
System.out.println("MySkipWriteListener.onSkipWriteItem: " + list.size() + ", " + e.getMessage());
list.remove(new MyOutputRecord(2));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@
<listener ref="mySkipReadListener"/>
<listener ref="mySkipProcessorListener"/>
<listener ref="mySkipWriteListener"/>
<listener ref="myRetryReadListener"/>
<listener ref="myRetryProcessorListener"/>
<listener ref="myRetryWriteListener"/>
</listeners>
<chunk item-count="5" skip-limit="5">
<!-- skip-limit and retry-limit should have values greater than 0 to perform the expected behaviour -->
<chunk checkpoint-policy="item" item-count="3" skip-limit="3" retry-limit="3">
<reader ref="myItemReader"/>
<processor ref="myItemProcessor"/>
<writer ref="myItemWriter"/>
Expand Down
Loading