package rx; import java.util.HashMap; import java.util.Map; import org.junit.Test; import rx.EventStream.Event; import rx.util.functions.Action1; import rx.util.functions.Func2; public class ScanTests { @Test public void testUnsubscribeScan() { EventStream.getEventStream("HTTP-ClusterB", 20) .scan(new HashMap(), new Func2, Event, Map>() { @Override public Map call(Map accum, Event perInstanceEvent) { accum.put("instance", perInstanceEvent.instanceId); return accum; } }) .take(10) .toBlockingObservable().forEach(new Action1>() { @Override public void call(Map v) { System.out.println(v); } }); } }