Index: beam-gpf/src/main/java/org/esa/beam/gpf/operators/standard/WriteOp.java
===================================================================
--- beam-gpf/src/main/java/org/esa/beam/gpf/operators/standard/WriteOp.java (revision 53df621b85ed5ed4fa66c6e59d2532e2cf0029fb)
+++ beam-gpf/src/main/java/org/esa/beam/gpf/operators/standard/WriteOp.java (revision )
@@ -201,7 +201,7 @@
getLogger().info("Start writing product " + getTargetProduct().getName() + " to " + getFile());
OperatorExecutor operatorExecutor = OperatorExecutor.create(this);
try {
- operatorExecutor.execute(ExecutionOrder.ROW_BAND_COLUMN, pm);
+ operatorExecutor.execute(ExecutionOrder.SCHEDULE_ROW_COLUMN_BAND, pm);
getLogger().info("End writing product " + getTargetProduct().getName() + " to " + getFile());
Index: beam-gpf/src/test/java/org/esa/beam/framework/gpf/internal/OperatorExecutorTest.java
===================================================================
--- beam-gpf/src/test/java/org/esa/beam/framework/gpf/internal/OperatorExecutorTest.java (revision 53df621b85ed5ed4fa66c6e59d2532e2cf0029fb)
+++ beam-gpf/src/test/java/org/esa/beam/framework/gpf/internal/OperatorExecutorTest.java (revision )
@@ -234,19 +234,16 @@
sourceProduct.setPreferredTileSize(50, 50);
Operator op = new TestOP(sourceProduct);
OperatorExecutor operatorExecutor = OperatorExecutor.create(op);
- operatorExecutor.execute(ExecutionOrder.ROW_COLUMN_BAND, ProgressMonitor.NULL);
+ System.setProperty("beam.gpf.executionOrder", "SCHEDULE_ROW_COLUMN_BAND");
+ operatorExecutor.execute(ExecutionOrder.PULL_ROW_BAND_COLUMN, ProgressMonitor.NULL);
- assertEquals(17, recordingTileScheduler.recordedCalls.size());
+ assertEquals(13, recordingTileScheduler.recordedCalls.size());
- assertEquals(8, recordingTileScheduler.requestedTileIndices.size());
+ assertEquals(4, recordingTileScheduler.requestedTileIndices.size());
assertEquals(new Point(0, 0), recordingTileScheduler.requestedTileIndices.get(0));
- assertEquals(new Point(0, 0), recordingTileScheduler.requestedTileIndices.get(1));
- assertEquals(new Point(1, 0), recordingTileScheduler.requestedTileIndices.get(2));
- assertEquals(new Point(1, 0), recordingTileScheduler.requestedTileIndices.get(3));
- assertEquals(new Point(0, 1), recordingTileScheduler.requestedTileIndices.get(4));
- assertEquals(new Point(0, 1), recordingTileScheduler.requestedTileIndices.get(5));
- assertEquals(new Point(1, 1), recordingTileScheduler.requestedTileIndices.get(6));
- assertEquals(new Point(1, 1), recordingTileScheduler.requestedTileIndices.get(7));
+ assertEquals(new Point(1, 0), recordingTileScheduler.requestedTileIndices.get(1));
+ assertEquals(new Point(0, 1), recordingTileScheduler.requestedTileIndices.get(2));
+ assertEquals(new Point(1, 1), recordingTileScheduler.requestedTileIndices.get(3));
}
private Product createSourceProduct() {
Index: beam-gpf/src/main/java/org/esa/beam/framework/gpf/internal/OperatorExecutor.java
===================================================================
--- beam-gpf/src/main/java/org/esa/beam/framework/gpf/internal/OperatorExecutor.java (revision 53df621b85ed5ed4fa66c6e59d2532e2cf0029fb)
+++ beam-gpf/src/main/java/org/esa/beam/framework/gpf/internal/OperatorExecutor.java (revision )
@@ -44,7 +44,7 @@
* target product of the given operator have. The computation of these tiles is
* parallelized to use all available CPUs (cores) using the JAI
* {@link TileScheduler}.
- *
+ *
* @author Marco Zuehlke
* @since BEAM 4.7
*/
@@ -66,15 +66,19 @@
}
public enum ExecutionOrder {
- ROW_COLUMN_BAND,
- ROW_BAND_COLUMN,
+ SCHEDULE_ROW_COLUMN_BAND,
+ SCHEDULE_ROW_BAND_COLUMN,
/**
* Minimize disk seeks if following conditions are met:
* 1. Bands can be computed independently of each other
* 2. I/O-bound processing (time to compute band pixels will less than
* time for I/O).
*/
- BAND_ROW_COLUMN,
+ SCHEDULE_BAND_ROW_COLUMN,
+ /**
+ * for debugging purpose
+ */
+ PULL_ROW_BAND_COLUMN,
}
private final int tileCountX;
@@ -87,7 +91,7 @@
public OperatorExecutor(PlanarImage[] images, int tileCountX, int tileCountY) {
this(images, tileCountX, tileCountY, JAI.getDefaultInstance().getTileScheduler().getParallelism());
}
-
+
public OperatorExecutor(PlanarImage[] images, int tileCountX, int tileCountY, int parallelism) {
this.images = images;
this.tileCountX = tileCountX;
@@ -97,27 +101,29 @@
}
public void execute(ProgressMonitor pm) {
- execute(ExecutionOrder.ROW_BAND_COLUMN, pm);
+ execute(ExecutionOrder.SCHEDULE_ROW_BAND_COLUMN, pm);
}
public void execute(ExecutionOrder executionOrder, ProgressMonitor pm) {
final Semaphore semaphore = new Semaphore(parallelism, true);
final TileComputationListener tcl = new OperatorTileComputationListener(semaphore);
- final TileComputationListener[] listeners = new TileComputationListener[] { tcl };
+ final TileComputationListener[] listeners = new TileComputationListener[]{tcl};
-
+
ImagingListener imagingListener = JAI.getDefaultInstance().getImagingListener();
JAI.getDefaultInstance().setImagingListener(new GPFImagingListener());
pm.beginTask("Executing operator...", tileCountX * tileCountY * images.length);
-
+
+ ExecutionOrder effectiveExecutionOrder = getEffectiveExecutionOrder(executionOrder);
+
try {
- if (executionOrder == ExecutionOrder.ROW_BAND_COLUMN) {
- // for debugging purpose
- // executeRowBandColumn(pm);
+ if (effectiveExecutionOrder == ExecutionOrder.SCHEDULE_ROW_BAND_COLUMN) {
scheduleRowBandColumn(semaphore, listeners, pm);
- } else if (executionOrder == ExecutionOrder.ROW_COLUMN_BAND) {
+ } else if (effectiveExecutionOrder == ExecutionOrder.SCHEDULE_ROW_COLUMN_BAND) {
scheduleRowColumnBand(semaphore, listeners, pm);
- } else if (executionOrder == ExecutionOrder.BAND_ROW_COLUMN) {
+ } else if (effectiveExecutionOrder == ExecutionOrder.SCHEDULE_BAND_ROW_COLUMN) {
scheduleBandRowColumn(semaphore, listeners, pm);
+ } else if (effectiveExecutionOrder == ExecutionOrder.PULL_ROW_BAND_COLUMN) {
+ executeRowBandColumn(pm);
} else {
throw new IllegalArgumentException("executionOrder");
}
@@ -132,6 +138,18 @@
}
}
+ private ExecutionOrder getEffectiveExecutionOrder(ExecutionOrder executionOrder) {
+ ExecutionOrder effectiveExecutionOrder = executionOrder;
+ String executionOrderProperty = System.getProperty("beam.gpf.executionOrder");
+ if (executionOrderProperty != null) {
+ effectiveExecutionOrder = ExecutionOrder.valueOf(executionOrderProperty);
+ }
+ if (effectiveExecutionOrder != executionOrder) {
+ BeamLogManager.getSystemLogger().info("Changing execution order from " + executionOrder + " to " + effectiveExecutionOrder);
+ }
+ return effectiveExecutionOrder;
+ }
+
private void scheduleBandRowColumn(Semaphore semaphore, TileComputationListener[] listeners, ProgressMonitor pm) {
for (final PlanarImage image : images) {
for (int tileY = 0; tileY < tileCountY; tileY++) {
@@ -155,12 +173,28 @@
}
private void scheduleRowColumnBand(Semaphore semaphore, TileComputationListener[] listeners, ProgressMonitor pm) {
+ //better handle stack operators, should equal well work for normal operators
+ final TileComputationListener tcl = new OperatorTileComputationListenerStack(semaphore, images);
+ listeners = new TileComputationListener[]{tcl};
+
for (int tileY = 0; tileY < tileCountY; tileY++) {
for (int tileX = 0; tileX < tileCountX; tileX++) {
- BeamLogManager.getSystemLogger().info("Scheduling tile column " + tileX + ", row " + tileY);
- for (final PlanarImage image : images) {
- scheduleTile(image, tileX, tileY, semaphore, listeners, pm);
+ BeamLogManager.getSystemLogger().info("Scheduling tile x=" + tileX + " y=" + tileY);
+ checkForCancelation(pm);
+ acquirePermits(semaphore, 1);
+ if (error != null) {
+ semaphore.release(parallelism);
+ throw error;
}
+ Point[] points = new Point[]{new Point(tileX, tileY)};
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Note: GPF pull-processing is triggered here!!!
+ //
+ tileScheduler.scheduleTiles(images[0], points, listeners);
+ //
+ /////////////////////////////////////////////////////////////////////
+ pm.worked(1);
}
}
}
@@ -173,7 +207,7 @@
semaphore.release(parallelism);
throw error;
}
- Point[] points = new Point[] { new Point(tileX, tileY) };
+ Point[] points = new Point[]{new Point(tileX, tileY)};
/////////////////////////////////////////////////////////////////////
//
// Note: GPF pull-processing is triggered here!!!
@@ -243,6 +277,45 @@
}
}
+ private class OperatorTileComputationListenerStack implements TileComputationListener {
+
+ private final Semaphore semaphore;
+ private final PlanarImage[] images;
+
+ OperatorTileComputationListenerStack(Semaphore semaphore, PlanarImage[] images) {
+ this.semaphore = semaphore;
+ this.images = images;
+ }
+
+ @Override
+ public void tileComputed(Object eventSource, TileRequest[] requests, PlanarImage image, int tileX, int tileY,
+ Raster raster) {
+ for (PlanarImage planarImage : images) {
+ if (image != planarImage) {
+ planarImage.getTile(tileX, tileY);
+ }
+ }
+ semaphore.release();
+ }
+
+ @Override
+ public void tileCancelled(Object eventSource, TileRequest[] requests, PlanarImage image, int tileX, int tileY) {
+ if (error == null) {
+ error = new OperatorException("Operation cancelled.");
+ }
+ semaphore.release(parallelism);
+ }
+
+ @Override
+ public void tileComputationFailure(Object eventSource, TileRequest[] requests, PlanarImage image, int tileX,
+ int tileY, Throwable situation) {
+ if (error == null) {
+ error = new OperatorException("Operation failed.", situation);
+ }
+ semaphore.release(parallelism);
+ }
+ }
+
private class OperatorTileComputationListener implements TileComputationListener {
private final Semaphore semaphore;
@@ -274,17 +347,17 @@
semaphore.release(parallelism);
}
}
-
+
private class GPFImagingListener implements ImagingListener {
@Override
public boolean errorOccurred(String message, Throwable thrown, Object where, boolean isRetryable)
- throws RuntimeException {
+ throws RuntimeException {
if (error == null && !thrown.getClass().getSimpleName().equals("MediaLibLoadException")) {
error = new OperatorException(thrown);
}
return false;
}
}
-
+
}