File tree Expand file tree Collapse file tree
rxjava-contrib/rxjava-string/src Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -71,11 +71,11 @@ public void call(Subscriber<? super byte[]> o) {
7171 try {
7272 if (o .isUnsubscribed ())
7373 return ;
74- int n = 0 ;
75- n = i .read (buffer );
74+ int n = i .read (buffer );
7675 while (n != -1 && !o .isUnsubscribed ()) {
7776 o .onNext (Arrays .copyOf (buffer , n ));
78- n = i .read (buffer );
77+ if (!o .isUnsubscribed ())
78+ n = i .read (buffer );
7979 }
8080 } catch (IOException e ) {
8181 o .onError (e );
Original file line number Diff line number Diff line change 3333import java .nio .charset .MalformedInputException ;
3434import java .util .Arrays ;
3535import java .util .List ;
36+ import java .util .concurrent .atomic .AtomicInteger ;
3637
3738import org .junit .Test ;
3839
@@ -246,6 +247,22 @@ public void testFromInputStream() {
246247 assertArrayEquals (inBytes , outBytes );
247248 }
248249
250+ @ Test
251+ public void testFromInputStreamWillUnsubscribeBeforeCallingNextRead () {
252+ final byte [] inBytes = "test" .getBytes ();
253+ final AtomicInteger numReads = new AtomicInteger (0 );
254+ ByteArrayInputStream is = new ByteArrayInputStream (inBytes ) {
255+
256+ @ Override
257+ public synchronized int read (byte [] b , int off , int len ) {
258+ numReads .incrementAndGet ();
259+ return super .read (b , off , len );
260+ }
261+ };
262+ StringObservable .from (is ).first ().toBlockingObservable ().single ();
263+ assertEquals (1 , numReads .get ());
264+ }
265+
249266 @ Test
250267 public void testFromReader () {
251268 final String inStr = "test" ;
You can’t perform that action at this time.
0 commit comments