/* * Copyright 2012 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations under * the License. */ package com.linkedin.parseq; import com.linkedin.parseq.internal.InternalUtil; import com.linkedin.parseq.internal.SystemHiddenTask; import com.linkedin.parseq.promise.Promise; import com.linkedin.parseq.promise.PromiseListener; import com.linkedin.parseq.promise.Promises; import com.linkedin.parseq.promise.SettablePromise; import com.linkedin.parseq.trace.ResultType; import com.linkedin.parseq.trace.ShallowTrace; import com.linkedin.parseq.trace.ShallowTraceBuilder; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * A {@link Task} that will run all of the constructor-supplied tasks in parallel. *

* Use {@link Tasks#par(Task[])} or {@link Tasks#par(Iterable)} to create an * instance of this class. * * @author Chris Pettitt (cpettitt@linkedin.com) * @author Chi Chan (ckchan@linkedin.com) */ /* package private */ class ParTaskImpl extends SystemHiddenTask> implements ParTask { private final List> _tasks; public ParTaskImpl(final String name, final Iterable> tasks) { super(name); List> taskList = new ArrayList>(); for(Task task : tasks) { // Safe to coerce Task to Task @SuppressWarnings("unchecked") final Task coercedTask = (Task)task; taskList.add(coercedTask); } if (taskList.isEmpty()) { throw new IllegalArgumentException("No tasks to parallelize!"); } _tasks = Collections.unmodifiableList(taskList); } @Override protected Promise> run(final Context context) throws Exception { final SettablePromise> result = Promises.settable(); final PromiseListener listener = new PromiseListener() { @Override public void onResolved(Promise resolvedPromise) { boolean allEarlyFinish = true; final List taskResult = new ArrayList(); final List errors = new ArrayList(); for (Task task : _tasks) { if (task.isFailed()) { if (allEarlyFinish && ResultType.fromTask(task) != ResultType.EARLY_FINISH) { allEarlyFinish = false; } errors.add(task.getError()); } else { taskResult.add(task.get()); } } if (!errors.isEmpty()) { result.fail(allEarlyFinish ? errors.get(0) : new MultiException("Multiple errors in 'ParTask' task.", errors)); } else { result.done(taskResult); } } }; InternalUtil.after(listener, _tasks.toArray(new Task[_tasks.size()])); for (Task task : _tasks) { context.run(task); } return result; } @Override public List> getTasks() { return _tasks; } @Override public List getSuccessful() { if(!this.isFailed()) { return this.get(); } final List taskResult = new ArrayList(); for (Task task : _tasks) { if (!task.isFailed()) { taskResult.add(task.get()); } } return taskResult; } }