11/**
22 * Copyright 2013 Netflix, Inc.
3- *
3+ *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
66 * You may obtain a copy of the License at
7- *
8- * http://www.apache.org/licenses/LICENSE-2.0
9- *
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
1010 * Unless required by applicable law or agreed to in writing, software
1111 * distributed under the License is distributed on an "AS IS" BASIS,
1212 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1515 */
1616package rx .observables ;
1717
18+ import java .io .IOException ;
19+ import java .io .InputStream ;
20+ import java .io .Reader ;
1821import java .nio .ByteBuffer ;
1922import java .nio .CharBuffer ;
2023import java .nio .charset .CharacterCodingException ;
2730import java .util .regex .Pattern ;
2831
2932import rx .Observable ;
30- import rx .Observable .OnSubscribeFunc ;
31- import rx .Observer ;
32- import rx .Subscription ;
33+ import rx .Observable .OnSubscribe ;
34+ import rx .Subscriber ;
35+ import rx .operators . Operator ;
3336import rx .util .functions .Func1 ;
3437import rx .util .functions .Func2 ;
3538
3639public class StringObservable {
40+ public static Observable <byte []> from (final InputStream i ) {
41+ return from (i , 8 * 1024 );
42+ }
43+
44+ public static Observable <byte []> from (final InputStream i , final int size ) {
45+ return Observable .create (new OnSubscribe <byte []>() {
46+ @ Override
47+ public void call (Subscriber <? super byte []> o ) {
48+ byte [] buffer = new byte [size ];
49+ try {
50+ if (o .isUnsubscribed ())
51+ return ;
52+ int n = 0 ;
53+ n = i .read (buffer );
54+ while (n != -1 && !o .isUnsubscribed ()) {
55+ o .onNext (Arrays .copyOf (buffer , n ));
56+ n = i .read (buffer );
57+ }
58+ } catch (IOException e ) {
59+ o .onError (e );
60+ }
61+ if (o .isUnsubscribed ())
62+ return ;
63+ o .onCompleted ();
64+ }
65+ });
66+ }
67+
68+ public static Observable <String > from (final Reader i ) {
69+ return from (i , 8 * 1024 );
70+ }
71+
72+ public static Observable <String > from (final Reader i , final int size ) {
73+ return Observable .create (new OnSubscribe <String >() {
74+ @ Override
75+ public void call (Subscriber <? super String > o ) {
76+ char [] buffer = new char [size ];
77+ try {
78+ if (o .isUnsubscribed ())
79+ return ;
80+ int n = 0 ;
81+ n = i .read (buffer );
82+ while (n != -1 && !o .isUnsubscribed ()) {
83+ o .onNext (new String (buffer ));
84+ n = i .read (buffer );
85+ }
86+ } catch (IOException e ) {
87+ o .onError (e );
88+ }
89+ if (o .isUnsubscribed ())
90+ return ;
91+ o .onCompleted ();
92+ }
93+ });
94+ }
95+
3796 /**
38- * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
97+ * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
98+ * and where handles when a multibyte character spans two chunks.
3999 *
40100 * @param src
41101 * @param charsetName
@@ -46,7 +106,8 @@ public static Observable<String> decode(Observable<byte[]> src, String charsetNa
46106 }
47107
48108 /**
49- * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
109+ * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
110+ * and where handles when a multibyte character spans two chunks.
50111 *
51112 * @param src
52113 * @param charset
@@ -57,30 +118,31 @@ public static Observable<String> decode(Observable<byte[]> src, Charset charset)
57118 }
58119
59120 /**
60- * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
121+ * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
122+ * and where handles when a multibyte character spans two chunks.
61123 * This method allows for more control over how malformed and unmappable characters are handled.
62124 *
63125 * @param src
64126 * @param charsetDecoder
65127 * @return
66128 */
67129 public static Observable <String > decode (final Observable <byte []> src , final CharsetDecoder charsetDecoder ) {
68- return Observable . create (new OnSubscribeFunc <String >() {
130+ return src . lift (new Operator <String , byte [] >() {
69131 @ Override
70- public Subscription onSubscribe (final Observer <? super String > observer ) {
71- return src . subscribe ( new Observer <byte []>() {
132+ public Subscriber <? super byte []> call (final Subscriber <? super String > o ) {
133+ return new Subscriber <byte []>(o ) {
72134 private ByteBuffer leftOver = null ;
73135
74136 @ Override
75137 public void onCompleted () {
76138 if (process (null , leftOver , true ))
77- observer .onCompleted ();
139+ o .onCompleted ();
78140 }
79141
80142 @ Override
81143 public void onError (Throwable e ) {
82144 if (process (null , leftOver , true ))
83- observer .onError (e );
145+ o .onError (e );
84146 }
85147
86148 @ Override
@@ -120,7 +182,7 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
120182 cr .throwException ();
121183 }
122184 catch (CharacterCodingException e ) {
123- observer .onError (e );
185+ o .onError (e );
124186 return false ;
125187 }
126188 }
@@ -134,11 +196,11 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
134196
135197 String string = cb .toString ();
136198 if (!string .isEmpty ())
137- observer .onNext (string );
199+ o .onNext (string );
138200
139201 return true ;
140202 }
141- }) ;
203+ };
142204 }
143205 });
144206 }
@@ -190,13 +252,14 @@ public byte[] call(String str) {
190252 }
191253
192254 /**
193- * Gather up all of the strings in to one string to be able to use it as one message. Don't use this on infinite streams.
255+ * Gather up all of the strings in to one string to be able to use it as one message. Don't use
256+ * this on infinite streams.
194257 *
195258 * @param src
196259 * @return
197260 */
198261 public static Observable <String > stringConcat (Observable <String > src ) {
199- return src .aggregate (new Func2 <String , String , String >() {
262+ return src .reduce (new Func2 <String , String , String >() {
200263 @ Override
201264 public String call (String a , String b ) {
202265 return a + b ;
@@ -218,22 +281,25 @@ public String call(String a, String b) {
218281 */
219282 public static Observable <String > split (final Observable <String > src , String regex ) {
220283 final Pattern pattern = Pattern .compile (regex );
221- return Observable .create (new OnSubscribeFunc <String >() {
284+
285+ return src .lift (new Operator <String , String >() {
222286 @ Override
223- public Subscription onSubscribe (final Observer <? super String > observer ) {
224- return src . subscribe ( new Observer <String >() {
287+ public Subscriber <? super String > call (final Subscriber <? super String > o ) {
288+ return new Subscriber <String >(o ) {
225289 private String leftOver = null ;
226290
227291 @ Override
228292 public void onCompleted () {
229293 output (leftOver );
230- observer .onCompleted ();
294+ if (!o .isUnsubscribed ())
295+ o .onCompleted ();
231296 }
232297
233298 @ Override
234299 public void onError (Throwable e ) {
235300 output (leftOver );
236- observer .onError (e );
301+ if (!o .isUnsubscribed ())
302+ o .onError (e );
237303 }
238304
239305 @ Override
@@ -250,24 +316,29 @@ public void onNext(String segment) {
250316 }
251317
252318 private int emptyPartCount = 0 ;
319+
253320 /**
254321 * when limit == 0 trailing empty parts are not emitted.
322+ *
255323 * @param part
256324 */
257325 private void output (String part ) {
258326 if (part .isEmpty ()) {
259327 emptyPartCount ++;
260328 }
261329 else {
262- for (; emptyPartCount >0 ; emptyPartCount --)
263- observer .onNext ("" );
264- observer .onNext (part );
330+ for (; emptyPartCount > 0 ; emptyPartCount --)
331+ if (!o .isUnsubscribed ())
332+ o .onNext ("" );
333+ if (!o .isUnsubscribed ())
334+ o .onNext (part );
265335 }
266336 }
267- }) ;
337+ };
268338 }
269339 });
270340 }
341+
271342 /**
272343 * Concatenates the sequence of values by adding a separator
273344 * between them and emitting the result once the source completes.
@@ -276,49 +347,55 @@ private void output(String part) {
276347 * {@link java.lang.String#valueOf(java.lang.Object)} calls.
277348 * <p>
278349 * For example:
350+ *
279351 * <pre>
280- * Observable<Object> source = Observable.from("a" , 1, "c" );
281- * Observable<String> result = join(source, ", " );
352+ * Observable<Object> source = Observable.from("a" , 1, "c" );
353+ * Observable<String> result = join(source, ", " );
282354 * </pre>
283355 *
284356 * will yield a single element equal to "a, 1, c".
285357 *
286- * @param source the source sequence of CharSequence values
287- * @param separator the separator to a
358+ * @param source
359+ * the source sequence of CharSequence values
360+ * @param separator
361+ * the separator to a
288362 * @return an Observable which emits a single String value having the concatenated
289363 * values of the source observable with the separator between elements
290364 */
291365 public static <T > Observable <String > join (final Observable <T > source , final CharSequence separator ) {
292- return Observable .create (new OnSubscribeFunc <String >() {
293-
366+ return source .lift (new Operator <String , T >() {
294367 @ Override
295- public Subscription onSubscribe (final Observer <? super String > t1 ) {
296- return source . subscribe ( new Observer <T >() {
368+ public Subscriber < T > call (final Subscriber <? super String > o ) {
369+ return new Subscriber <T >(o ) {
297370 boolean mayAddSeparator ;
298371 StringBuilder b = new StringBuilder ();
372+
299373 @ Override
300- public void onNext (T args ) {
301- if (mayAddSeparator ) {
302- b .append (separator );
303- }
304- mayAddSeparator = true ;
305- b .append (String .valueOf (args ));
374+ public void onCompleted () {
375+ String str = b .toString ();
376+ b = null ;
377+ if (!o .isUnsubscribed ())
378+ o .onNext (str );
379+ if (!o .isUnsubscribed ())
380+ o .onCompleted ();
306381 }
307382
308383 @ Override
309384 public void onError (Throwable e ) {
310385 b = null ;
311- t1 .onError (e );
386+ if (!o .isUnsubscribed ())
387+ o .onError (e );
312388 }
313389
314390 @ Override
315- public void onCompleted () {
316- String str = b .toString ();
317- b = null ;
318- t1 .onNext (str );
319- t1 .onCompleted ();
391+ public void onNext (Object t ) {
392+ if (mayAddSeparator ) {
393+ b .append (separator );
394+ }
395+ mayAddSeparator = true ;
396+ b .append (String .valueOf (t ));
320397 }
321- }) ;
398+ };
322399 }
323400 });
324401 }
0 commit comments