|
| 1 | +package reproductions; |
| 2 | + |
| 3 | +import graphql.ExecutionResult; |
| 4 | +import graphql.GraphQL; |
| 5 | +import graphql.schema.DataFetchingEnvironment; |
| 6 | +import graphql.schema.GraphQLSchema; |
| 7 | +import graphql.schema.idl.RuntimeWiring; |
| 8 | +import graphql.schema.idl.SchemaGenerator; |
| 9 | +import graphql.schema.idl.SchemaParser; |
| 10 | +import org.reactivestreams.Publisher; |
| 11 | +import org.reactivestreams.Subscriber; |
| 12 | +import org.reactivestreams.Subscription; |
| 13 | +import reactor.core.publisher.Flux; |
| 14 | + |
| 15 | +import javax.annotation.Nonnull; |
| 16 | +import java.util.Map; |
| 17 | +import java.util.Random; |
| 18 | +import java.util.concurrent.CompletableFuture; |
| 19 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 20 | + |
| 21 | +import static graphql.schema.idl.RuntimeWiring.newRuntimeWiring; |
| 22 | +import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring; |
| 23 | + |
| 24 | +/** |
| 25 | + * Related to <a href="https://github.com/spring-projects/spring-graphql/issues/949">...</a> |
| 26 | + * <p> |
| 27 | + * This reproduction is to see what's happening with Subscriptions and whether they keep their |
| 28 | + * order when values are async. |
| 29 | + */ |
| 30 | +public class SubscriptionReproduction { |
| 31 | + public static void main(String[] args) { |
| 32 | + new SubscriptionReproduction().run(); |
| 33 | + } |
| 34 | + |
| 35 | + private void run() { |
| 36 | + |
| 37 | + GraphQL graphQL = mkGraphQl(); |
| 38 | + String query = "subscription MySubscription {\n" + |
| 39 | + " searchVideo {\n" + |
| 40 | + " id\n" + |
| 41 | + " name\n" + |
| 42 | + " isFavorite\n" + |
| 43 | + " }\n" + |
| 44 | + "}"; |
| 45 | + ExecutionResult executionResult = graphQL.execute(query); |
| 46 | + Publisher<Map<String, Object>> publisher = executionResult.getData(); |
| 47 | + |
| 48 | + DeathEater eater = new DeathEater(); |
| 49 | + eater.eat(publisher); |
| 50 | + } |
| 51 | + |
| 52 | + private GraphQL mkGraphQl() { |
| 53 | + String sdl = "type Query { f : ID }" + |
| 54 | + "type Subscription {" + |
| 55 | + " searchVideo : VideoSearch" + |
| 56 | + "}" + |
| 57 | + "type VideoSearch {" + |
| 58 | + " id : ID" + |
| 59 | + " name : String" + |
| 60 | + " isFavorite : Boolean" + |
| 61 | + "}"; |
| 62 | + RuntimeWiring runtimeWiring = newRuntimeWiring() |
| 63 | + .type(newTypeWiring("Subscription") |
| 64 | + .dataFetcher("searchVideo", this::mkFluxDF) |
| 65 | + ) |
| 66 | + .type(newTypeWiring("VideoSearch") |
| 67 | + .dataFetcher("name", this::nameDF) |
| 68 | + .dataFetcher("isFavorite", this::isFavoriteDF) |
| 69 | + ) |
| 70 | + .build(); |
| 71 | + |
| 72 | + GraphQLSchema schema = new SchemaGenerator().makeExecutableSchema( |
| 73 | + new SchemaParser().parse(sdl), runtimeWiring |
| 74 | + ); |
| 75 | + return GraphQL.newGraphQL(schema).build(); |
| 76 | + } |
| 77 | + |
| 78 | + private CompletableFuture<Flux<Object>> mkFluxDF(DataFetchingEnvironment env) { |
| 79 | + // async deliver of the publisher with random snoozing between values |
| 80 | + return CompletableFuture.supplyAsync(() -> Flux.generate(() -> 0, (counter, sink) -> { |
| 81 | + sink.next(mkValue(counter)); |
| 82 | + snooze(rand(10, 100)); |
| 83 | + if (counter == 10) { |
| 84 | + sink.complete(); |
| 85 | + } |
| 86 | + return counter + 1; |
| 87 | + })); |
| 88 | + } |
| 89 | + |
| 90 | + private Object isFavoriteDF(DataFetchingEnvironment env) { |
| 91 | + // async deliver of the isFavorite property with random delay |
| 92 | + return CompletableFuture.supplyAsync(() -> { |
| 93 | + Integer counter = getCounter(env.getSource()); |
| 94 | + return counter % 2 == 0; |
| 95 | + }); |
| 96 | + } |
| 97 | + |
| 98 | + private Object nameDF(DataFetchingEnvironment env) { |
| 99 | + // async deliver of the isFavorite property with random delay |
| 100 | + return CompletableFuture.supplyAsync(() -> { |
| 101 | + Integer counter = getCounter(env.getSource()); |
| 102 | + return "name" + counter; |
| 103 | + }); |
| 104 | + } |
| 105 | + |
| 106 | + private static Integer getCounter(Map<String, Object> video) { |
| 107 | + Integer counter = (Integer) video.getOrDefault("counter", 0); |
| 108 | + snooze(rand(100, 500)); |
| 109 | + return counter; |
| 110 | + } |
| 111 | + |
| 112 | + private @Nonnull Object mkValue(Integer counter) { |
| 113 | + // name and isFavorite are future values via DFs |
| 114 | + return Map.of( |
| 115 | + "counter", counter, |
| 116 | + "id", String.valueOf(counter) // immediate value |
| 117 | + ); |
| 118 | + } |
| 119 | + |
| 120 | + |
| 121 | + private static void snooze(int ms) { |
| 122 | + try { |
| 123 | + Thread.sleep(ms); |
| 124 | + } catch (InterruptedException e) { |
| 125 | + throw new RuntimeException(e); |
| 126 | + } |
| 127 | + } |
| 128 | + |
| 129 | + static Random rn = new Random(); |
| 130 | + |
| 131 | + private static int rand(int min, int max) { |
| 132 | + return rn.nextInt(max - min + 1) + min; |
| 133 | + } |
| 134 | + |
| 135 | + public static class DeathEater implements Subscriber<Object> { |
| 136 | + private Subscription subscription; |
| 137 | + private final AtomicBoolean done = new AtomicBoolean(); |
| 138 | + |
| 139 | + public boolean isDone() { |
| 140 | + return done.get(); |
| 141 | + } |
| 142 | + |
| 143 | + @Override |
| 144 | + public void onSubscribe(Subscription subscription) { |
| 145 | + this.subscription = subscription; |
| 146 | + System.out.println("onSubscribe"); |
| 147 | + subscription.request(1); |
| 148 | + } |
| 149 | + |
| 150 | + @Override |
| 151 | + public void onNext(Object o) { |
| 152 | + System.out.println("\tonNext : " + o); |
| 153 | + subscription.request(1); |
| 154 | + } |
| 155 | + |
| 156 | + @Override |
| 157 | + public void onError(Throwable throwable) { |
| 158 | + System.out.println("onError"); |
| 159 | + throwable.printStackTrace(System.err); |
| 160 | + done.set(true); |
| 161 | + } |
| 162 | + |
| 163 | + @Override |
| 164 | + public void onComplete() { |
| 165 | + System.out.println("complete"); |
| 166 | + done.set(true); |
| 167 | + } |
| 168 | + |
| 169 | + public void eat(Publisher<?> publisher) { |
| 170 | + publisher.subscribe(this); |
| 171 | + while (!this.isDone()) { |
| 172 | + snooze(2); |
| 173 | + } |
| 174 | + |
| 175 | + } |
| 176 | + } |
| 177 | +} |
0 commit comments