package rx; import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static junit.framework.Assert.*; import org.junit.Test; import rx.Observable.OnSubscribe; import rx.Observable.Operator; import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Func1; import rx.functions.Func2; import rx.internal.operators.OperatorFilter; import rx.internal.operators.OperatorMap; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; public class ObservableConversionTest { public static class Cylon {} public static class Jail { Object cylon; Jail(Object cylon) { this.cylon = cylon; } } public static class CylonDetectorObservable { protected OnSubscribe onSubscribe; public static CylonDetectorObservable create(OnSubscribe onSubscribe) { return new CylonDetectorObservable(onSubscribe); } protected CylonDetectorObservable(OnSubscribe onSubscribe) { this.onSubscribe = onSubscribe; } public void subscribe(Subscriber subscriber) { onSubscribe.call(subscriber); } public CylonDetectorObservable lift(Operator operator) { return x(new RobotConversionFunc(operator)); } public O x(Func1, O> operator) { return operator.call(onSubscribe); } public CylonDetectorObservable compose(Func1, CylonDetectorObservable> transformer) { return transformer.call(this); } public final CylonDetectorObservable beep(Func1 predicate) { return lift(new OperatorFilter(predicate)); } public final CylonDetectorObservable boop(Func1 func) { return lift(new OperatorMap(func)); } public CylonDetectorObservable DESTROY() { return boop(new Func1() { @Override public String call(T t) { Object cylon = ((Jail) t).cylon; throwOutTheAirlock(cylon); if (t instanceof Jail) { String name = cylon.toString(); return "Cylon '" + name + "' has been destroyed"; } else { return "Cylon 'anonymous' has been destroyed"; } }}); } private static void throwOutTheAirlock(Object cylon) { // ... } } public static class RobotConversionFunc implements Func1, CylonDetectorObservable> { private Operator operator; public RobotConversionFunc(Operator operator) { this.operator = operator; } @Override public CylonDetectorObservable call(final OnSubscribe onSubscribe) { return CylonDetectorObservable.create(new OnSubscribe() { @Override public void call(Subscriber o) { try { Subscriber st = operator.call(o); try { st.onStart(); onSubscribe.call(st); } catch (OnErrorNotImplementedException e) { throw e; } catch (Throwable e) { st.onError(e); } } catch (OnErrorNotImplementedException e) { throw e; } catch (Throwable e) { o.onError(e); } }}); } } public static class ConvertToCylonDetector implements Func1, CylonDetectorObservable> { @Override public CylonDetectorObservable call(final OnSubscribe onSubscribe) { return CylonDetectorObservable.create(onSubscribe); } } public static class ConvertToObservable implements Func1, Observable> { @Override public Observable call(final OnSubscribe onSubscribe) { return Observable.create(onSubscribe); } } @Test public void testConversionBetweenObservableClasses() { final TestSubscriber subscriber = new TestSubscriber(new Subscriber(){ @Override public void onCompleted() { System.out.println("Complete"); } @Override public void onError(Throwable e) { System.out.println("error: " + e.getMessage()); e.printStackTrace(); } @Override public void onNext(String t) { System.out.println(t); }}); List crewOfBattlestarGalactica = Arrays.asList(new Object[] {"William Adama", "Laura Roslin", "Lee Adama", new Cylon()}); Observable.from(crewOfBattlestarGalactica) .extend(new ConvertToCylonDetector()) .beep(new Func1(){ @Override public Boolean call(Object t) { return t instanceof Cylon; }}) .boop(new Func1() { @Override public Jail call(Object cylon) { return new Jail(cylon); }}) .DESTROY() .x(new ConvertToObservable()) .reduce("Cylon Detector finished. Report:\n", new Func2() { @Override public String call(String a, String n) { return a + n + "\n"; }}) .subscribe(subscriber); subscriber.assertNoErrors(); subscriber.assertCompleted(); } @Test public void testConvertToConcurrentQueue() { final AtomicReference thrown = new AtomicReference(null); final AtomicBoolean isFinished = new AtomicBoolean(false); ConcurrentLinkedQueue queue = Observable.range(0,5) .flatMap(new Func1>(){ @Override public Observable call(final Integer i) { return Observable.range(0, 5) .observeOn(Schedulers.io()) .map(new Func1(){ @Override public Integer call(Integer k) { try { Thread.sleep(System.currentTimeMillis() % 100); } catch (InterruptedException e) { e.printStackTrace(); } return i + k; }}); }}) .extend(new Func1, ConcurrentLinkedQueue>() { @Override public ConcurrentLinkedQueue call(OnSubscribe onSubscribe) { final ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); onSubscribe.call(new Subscriber(){ @Override public void onCompleted() { isFinished.set(true); } @Override public void onError(Throwable e) { thrown.set(e); } @Override public void onNext(Integer t) { q.add(t); }}); return q; }}); int x = 0; while(!isFinished.get()) { Integer i = queue.poll(); if (i != null) { x++; System.out.println(x + " item: " + i); } } assertEquals(null, thrown.get()); } }