Index: beam-gpf/src/main/java/org/esa/beam/gpf/operators/standard/WriteOp.java =================================================================== --- beam-gpf/src/main/java/org/esa/beam/gpf/operators/standard/WriteOp.java (revision 53df621b85ed5ed4fa66c6e59d2532e2cf0029fb) +++ beam-gpf/src/main/java/org/esa/beam/gpf/operators/standard/WriteOp.java (revision ) @@ -201,7 +201,7 @@ getLogger().info("Start writing product " + getTargetProduct().getName() + " to " + getFile()); OperatorExecutor operatorExecutor = OperatorExecutor.create(this); try { - operatorExecutor.execute(ExecutionOrder.ROW_BAND_COLUMN, pm); + operatorExecutor.execute(ExecutionOrder.SCHEDULE_ROW_COLUMN_BAND, pm); getLogger().info("End writing product " + getTargetProduct().getName() + " to " + getFile()); Index: beam-gpf/src/test/java/org/esa/beam/framework/gpf/internal/OperatorExecutorTest.java =================================================================== --- beam-gpf/src/test/java/org/esa/beam/framework/gpf/internal/OperatorExecutorTest.java (revision 53df621b85ed5ed4fa66c6e59d2532e2cf0029fb) +++ beam-gpf/src/test/java/org/esa/beam/framework/gpf/internal/OperatorExecutorTest.java (revision ) @@ -234,19 +234,16 @@ sourceProduct.setPreferredTileSize(50, 50); Operator op = new TestOP(sourceProduct); OperatorExecutor operatorExecutor = OperatorExecutor.create(op); - operatorExecutor.execute(ExecutionOrder.ROW_COLUMN_BAND, ProgressMonitor.NULL); + System.setProperty("beam.gpf.executionOrder", "SCHEDULE_ROW_COLUMN_BAND"); + operatorExecutor.execute(ExecutionOrder.PULL_ROW_BAND_COLUMN, ProgressMonitor.NULL); - assertEquals(17, recordingTileScheduler.recordedCalls.size()); + assertEquals(13, recordingTileScheduler.recordedCalls.size()); - assertEquals(8, recordingTileScheduler.requestedTileIndices.size()); + assertEquals(4, recordingTileScheduler.requestedTileIndices.size()); assertEquals(new Point(0, 0), recordingTileScheduler.requestedTileIndices.get(0)); - assertEquals(new Point(0, 0), recordingTileScheduler.requestedTileIndices.get(1)); - assertEquals(new Point(1, 0), recordingTileScheduler.requestedTileIndices.get(2)); - assertEquals(new Point(1, 0), recordingTileScheduler.requestedTileIndices.get(3)); - assertEquals(new Point(0, 1), recordingTileScheduler.requestedTileIndices.get(4)); - assertEquals(new Point(0, 1), recordingTileScheduler.requestedTileIndices.get(5)); - assertEquals(new Point(1, 1), recordingTileScheduler.requestedTileIndices.get(6)); - assertEquals(new Point(1, 1), recordingTileScheduler.requestedTileIndices.get(7)); + assertEquals(new Point(1, 0), recordingTileScheduler.requestedTileIndices.get(1)); + assertEquals(new Point(0, 1), recordingTileScheduler.requestedTileIndices.get(2)); + assertEquals(new Point(1, 1), recordingTileScheduler.requestedTileIndices.get(3)); } private Product createSourceProduct() { Index: beam-gpf/src/main/java/org/esa/beam/framework/gpf/internal/OperatorExecutor.java =================================================================== --- beam-gpf/src/main/java/org/esa/beam/framework/gpf/internal/OperatorExecutor.java (revision 53df621b85ed5ed4fa66c6e59d2532e2cf0029fb) +++ beam-gpf/src/main/java/org/esa/beam/framework/gpf/internal/OperatorExecutor.java (revision ) @@ -44,7 +44,7 @@ * target product of the given operator have. The computation of these tiles is * parallelized to use all available CPUs (cores) using the JAI * {@link TileScheduler}. - * + * * @author Marco Zuehlke * @since BEAM 4.7 */ @@ -66,15 +66,19 @@ } public enum ExecutionOrder { - ROW_COLUMN_BAND, - ROW_BAND_COLUMN, + SCHEDULE_ROW_COLUMN_BAND, + SCHEDULE_ROW_BAND_COLUMN, /** * Minimize disk seeks if following conditions are met:
* 1. Bands can be computed independently of each other
* 2. I/O-bound processing (time to compute band pixels will less than * time for I/O).
*/ - BAND_ROW_COLUMN, + SCHEDULE_BAND_ROW_COLUMN, + /** + * for debugging purpose + */ + PULL_ROW_BAND_COLUMN, } private final int tileCountX; @@ -87,7 +91,7 @@ public OperatorExecutor(PlanarImage[] images, int tileCountX, int tileCountY) { this(images, tileCountX, tileCountY, JAI.getDefaultInstance().getTileScheduler().getParallelism()); } - + public OperatorExecutor(PlanarImage[] images, int tileCountX, int tileCountY, int parallelism) { this.images = images; this.tileCountX = tileCountX; @@ -97,27 +101,29 @@ } public void execute(ProgressMonitor pm) { - execute(ExecutionOrder.ROW_BAND_COLUMN, pm); + execute(ExecutionOrder.SCHEDULE_ROW_BAND_COLUMN, pm); } public void execute(ExecutionOrder executionOrder, ProgressMonitor pm) { final Semaphore semaphore = new Semaphore(parallelism, true); final TileComputationListener tcl = new OperatorTileComputationListener(semaphore); - final TileComputationListener[] listeners = new TileComputationListener[] { tcl }; + final TileComputationListener[] listeners = new TileComputationListener[]{tcl}; - + ImagingListener imagingListener = JAI.getDefaultInstance().getImagingListener(); JAI.getDefaultInstance().setImagingListener(new GPFImagingListener()); pm.beginTask("Executing operator...", tileCountX * tileCountY * images.length); - + + ExecutionOrder effectiveExecutionOrder = getEffectiveExecutionOrder(executionOrder); + try { - if (executionOrder == ExecutionOrder.ROW_BAND_COLUMN) { - // for debugging purpose - // executeRowBandColumn(pm); + if (effectiveExecutionOrder == ExecutionOrder.SCHEDULE_ROW_BAND_COLUMN) { scheduleRowBandColumn(semaphore, listeners, pm); - } else if (executionOrder == ExecutionOrder.ROW_COLUMN_BAND) { + } else if (effectiveExecutionOrder == ExecutionOrder.SCHEDULE_ROW_COLUMN_BAND) { scheduleRowColumnBand(semaphore, listeners, pm); - } else if (executionOrder == ExecutionOrder.BAND_ROW_COLUMN) { + } else if (effectiveExecutionOrder == ExecutionOrder.SCHEDULE_BAND_ROW_COLUMN) { scheduleBandRowColumn(semaphore, listeners, pm); + } else if (effectiveExecutionOrder == ExecutionOrder.PULL_ROW_BAND_COLUMN) { + executeRowBandColumn(pm); } else { throw new IllegalArgumentException("executionOrder"); } @@ -132,6 +138,18 @@ } } + private ExecutionOrder getEffectiveExecutionOrder(ExecutionOrder executionOrder) { + ExecutionOrder effectiveExecutionOrder = executionOrder; + String executionOrderProperty = System.getProperty("beam.gpf.executionOrder"); + if (executionOrderProperty != null) { + effectiveExecutionOrder = ExecutionOrder.valueOf(executionOrderProperty); + } + if (effectiveExecutionOrder != executionOrder) { + BeamLogManager.getSystemLogger().info("Changing execution order from " + executionOrder + " to " + effectiveExecutionOrder); + } + return effectiveExecutionOrder; + } + private void scheduleBandRowColumn(Semaphore semaphore, TileComputationListener[] listeners, ProgressMonitor pm) { for (final PlanarImage image : images) { for (int tileY = 0; tileY < tileCountY; tileY++) { @@ -155,12 +173,28 @@ } private void scheduleRowColumnBand(Semaphore semaphore, TileComputationListener[] listeners, ProgressMonitor pm) { + //better handle stack operators, should equal well work for normal operators + final TileComputationListener tcl = new OperatorTileComputationListenerStack(semaphore, images); + listeners = new TileComputationListener[]{tcl}; + for (int tileY = 0; tileY < tileCountY; tileY++) { for (int tileX = 0; tileX < tileCountX; tileX++) { - BeamLogManager.getSystemLogger().info("Scheduling tile column " + tileX + ", row " + tileY); - for (final PlanarImage image : images) { - scheduleTile(image, tileX, tileY, semaphore, listeners, pm); + BeamLogManager.getSystemLogger().info("Scheduling tile x=" + tileX + " y=" + tileY); + checkForCancelation(pm); + acquirePermits(semaphore, 1); + if (error != null) { + semaphore.release(parallelism); + throw error; } + Point[] points = new Point[]{new Point(tileX, tileY)}; + ///////////////////////////////////////////////////////////////////// + // + // Note: GPF pull-processing is triggered here!!! + // + tileScheduler.scheduleTiles(images[0], points, listeners); + // + ///////////////////////////////////////////////////////////////////// + pm.worked(1); } } } @@ -173,7 +207,7 @@ semaphore.release(parallelism); throw error; } - Point[] points = new Point[] { new Point(tileX, tileY) }; + Point[] points = new Point[]{new Point(tileX, tileY)}; ///////////////////////////////////////////////////////////////////// // // Note: GPF pull-processing is triggered here!!! @@ -243,6 +277,45 @@ } } + private class OperatorTileComputationListenerStack implements TileComputationListener { + + private final Semaphore semaphore; + private final PlanarImage[] images; + + OperatorTileComputationListenerStack(Semaphore semaphore, PlanarImage[] images) { + this.semaphore = semaphore; + this.images = images; + } + + @Override + public void tileComputed(Object eventSource, TileRequest[] requests, PlanarImage image, int tileX, int tileY, + Raster raster) { + for (PlanarImage planarImage : images) { + if (image != planarImage) { + planarImage.getTile(tileX, tileY); + } + } + semaphore.release(); + } + + @Override + public void tileCancelled(Object eventSource, TileRequest[] requests, PlanarImage image, int tileX, int tileY) { + if (error == null) { + error = new OperatorException("Operation cancelled."); + } + semaphore.release(parallelism); + } + + @Override + public void tileComputationFailure(Object eventSource, TileRequest[] requests, PlanarImage image, int tileX, + int tileY, Throwable situation) { + if (error == null) { + error = new OperatorException("Operation failed.", situation); + } + semaphore.release(parallelism); + } + } + private class OperatorTileComputationListener implements TileComputationListener { private final Semaphore semaphore; @@ -274,17 +347,17 @@ semaphore.release(parallelism); } } - + private class GPFImagingListener implements ImagingListener { @Override public boolean errorOccurred(String message, Throwable thrown, Object where, boolean isRetryable) - throws RuntimeException { + throws RuntimeException { if (error == null && !thrown.getClass().getSimpleName().equals("MediaLibLoadException")) { error = new OperatorException(thrown); } return false; } } - + }