3.x Flowable#sample may complete on an interrupted thread #7201

@feneuilflo

Hi!

Using version 3.0.10, and the following code:

int i = 0;
while (true) {
    System.out.println("i = " + i++);
    PublishSubject<Long> subj = PublishSubject.create();
    subj.toFlowable(BackpressureStrategy.ERROR)
        .sample(10, TimeUnit.MILLISECONDS)
        .doFinally(() -> {
            System.out.println(String.format("Current thread %s is interrupted: %b",
                    Thread.currentThread().getName(),
                    Thread.currentThread().isInterrupted()));
            if (Thread.currentThread().isInterrupted()) {
                System.exit(1);
            }
        })
        .subscribe();
    subj.onNext(1L);

    Observable.timer(10, TimeUnit.MILLISECONDS)
        .blockingSubscribe(any -> subj.onComplete());
}

the loop ends after a few hundred iterations (usually between 250 and 300 on my device).

The first iterations give: Current thread main is interrupted: false.
And the last: Current thread RxComputationThreadPool-7 is interrupted: true.

I think I understand why I get this result, but I'm still surprised by it. Is this a known, accepted, or intended behavior?

NB 1: Using buffer instead of sample gives the same result.
NB 2: Changing the sample scheduler to Schedulers.from(Executors.newSingleThreadExecutor(...)) causes the loop to never end.
NB 3: First observed with RxJava2, with no change in behavior.
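
For readers without RxJava at hand, here is a minimal plain-JDK sketch of what may be going on. It rests on assumptions the issue does not confirm: that disposing the sample timer from another thread cancels the underlying Future with interruption, and that a scheduler built with Schedulers.from(Executor) cancels without interruption by default (which would match NB 2). The class and method names are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptOnCancelDemo {

    /**
     * Runs a task on a worker thread, cancels it from the calling thread while
     * it is still running, and reports whether the task then saw its own
     * thread's interrupt flag set.
     */
    public static boolean interruptedAfterCancel(boolean mayInterruptIfRunning)
            throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean cancelRequested = new AtomicBoolean();
        AtomicBoolean observed = new AtomicBoolean();
        try {
            Future<?> task = executor.submit(() -> {
                started.countDown();
                // Busy-wait so that no blocking call consumes the interrupt flag.
                while (!cancelRequested.get()) {
                    Thread.yield();
                }
                // Stand-in for a doFinally callback running on the worker thread.
                observed.set(Thread.currentThread().isInterrupted());
                finished.countDown();
            });

            started.await();                    // the task is now running
            task.cancel(mayInterruptIfRunning); // cancel from another thread
            cancelRequested.set(true);          // let the task proceed

            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish in time");
            }
            return observed.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "cancel(true)  -> interrupted: true"
        System.out.println("cancel(true)  -> interrupted: " + interruptedAfterCancel(true));
        // prints "cancel(false) -> interrupted: false"
        System.out.println("cancel(false) -> interrupted: " + interruptedAfterCancel(false));
    }
}
```

If the assumptions hold, the first case corresponds to the computation-thread output above (the completion callback runs on a worker whose timer task was just cancelled with interruption), and the second corresponds to NB 2, where the flag is never set and the loop keeps running.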
