1111import org .reactivestreams .Subscriber ;
1212import org .reactivestreams .Subscription ;
1313import reactor .core .publisher .Flux ;
14+ import reactor .core .publisher .Mono ;
15+ import reactor .core .scheduler .Schedulers ;
1416
1517import javax .annotation .Nonnull ;
1618import java .util .Map ;
@@ -39,6 +41,7 @@ private void run() {
3941 " searchVideo {\n " +
4042 " id\n " +
4143 " name\n " +
44+ " lastEpisode\n " +
4245 " isFavorite\n " +
4346 " }\n " +
4447 "}" ;
@@ -57,6 +60,7 @@ private GraphQL mkGraphQl() {
5760 "type VideoSearch {" +
5861 " id : ID" +
5962 " name : String" +
63+ " lastEpisode : String" +
6064 " isFavorite : Boolean" +
6165 "}" ;
6266 RuntimeWiring runtimeWiring = newRuntimeWiring ()
@@ -66,6 +70,7 @@ private GraphQL mkGraphQl() {
6670 .type (newTypeWiring ("VideoSearch" )
6771 .dataFetcher ("name" , this ::nameDF )
6872 .dataFetcher ("isFavorite" , this ::isFavoriteDF )
73+ .dataFetcher ("lastEpisode" , this ::lastEpisode )
6974 )
7075 .build ();
7176
@@ -95,6 +100,16 @@ private Object isFavoriteDF(DataFetchingEnvironment env) {
95100 });
96101 }
97102
103+ private Object lastEpisode (DataFetchingEnvironment env ) {
104+ // Mono-based async property that uses CF as the interface
105+ return Mono .fromCallable (() -> {
106+ Integer counter = getCounter (env .getSource ());
107+ return "episode-" + Thread .currentThread ().getName () + "for" + counter ;
108+ })
109+ .publishOn (Schedulers .boundedElastic ())
110+ .toFuture ();
111+ }
112+
98113 private Object nameDF (DataFetchingEnvironment env ) {
99114 // async deliver of the isFavorite property with random delay
100115 return CompletableFuture .supplyAsync (() -> {
0 commit comments