1515 */
1616package rx .operators ;
1717
18- import java .util .concurrent .ConcurrentLinkedQueue ;
18+ import java .util .ArrayList ;
19+ import java .util .List ;
1920import java .util .concurrent .atomic .AtomicLong ;
2021
2122import rx .Observable .Operator ;
@@ -60,7 +61,7 @@ private static class ObserveOnSubscriber<T> extends Subscriber<T> {
6061 final Subscriber <? super T > observer ;
6162 private final Scheduler .Worker recursiveScheduler ;
6263
63- private final ConcurrentLinkedQueue < Object > queue = new ConcurrentLinkedQueue < Object > ();
64+ private FastList queue = new FastList ();
6465 final AtomicLong counter = new AtomicLong (0 );
6566
6667 public ObserveOnSubscriber (Scheduler scheduler , Subscriber <? super T > subscriber ) {
@@ -72,19 +73,25 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> subscriber
7273
7374 @ Override
7475 public void onNext (final T t ) {
75- queue .offer (on .next (t ));
76+ synchronized (this ) {
77+ queue .add (on .next (t ));
78+ }
7679 schedule ();
7780 }
7881
7982 @ Override
8083 public void onCompleted () {
81- queue .offer (on .completed ());
84+ synchronized (this ) {
85+ queue .add (on .completed ());
86+ }
8287 schedule ();
8388 }
8489
8590 @ Override
8691 public void onError (final Throwable e ) {
87- queue .offer (on .error (e ));
92+ synchronized (this ) {
93+ queue .add (on .error (e ));
94+ }
8895 schedule ();
8996 }
9097
@@ -103,11 +110,43 @@ public void call() {
103110
104111 private void pollQueue () {
105112 do {
106- Object v = queue .poll ();
107- on .accept (observer , v );
108- } while (counter .decrementAndGet () > 0 );
113+ FastList vs ;
114+ synchronized (this ) {
115+ vs = queue ;
116+ queue = new FastList ();
117+ }
118+ for (Object v : vs .array ) {
119+ if (v == null ) {
120+ break ;
121+ }
122+ on .accept (observer , v );
123+ }
124+ if (counter .addAndGet (-vs .size ) == 0 ) {
125+ break ;
126+ }
127+ } while (true );
109128 }
110129
111130 }
112131
132+ static final class FastList {
133+ Object [] array ;
134+ int size ;
135+
136+ public void add (Object o ) {
137+ int s = size ;
138+ Object [] a = array ;
139+ if (a == null ) {
140+ a = new Object [16 ];
141+ array = a ;
142+ } else if (s == a .length ) {
143+ Object [] array2 = new Object [s + (s >> 2 )];
144+ System .arraycopy (a , 0 , array2 , 0 , s );
145+ a = array2 ;
146+ array = a ;
147+ }
148+ a [s ] = o ;
149+ size = s + 1 ;
150+ }
151+ }
113152}
0 commit comments