Skip to content

Commit 5dd3583

Browse files
committed
Reproduction showing async DFs keeping field order
spring-projects/spring-graphql#949
1 parent 321c7d8 commit 5dd3583

File tree

2 files changed

+178
-0
lines changed

2 files changed

+178
-0
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ dependencies {
117117

118118
testImplementation 'org.reactivestreams:reactive-streams-tck:' + reactiveStreamsVersion
119119
testImplementation "io.reactivex.rxjava2:rxjava:2.2.21"
120+
testImplementation "io.projectreactor:reactor-core:3.6.5"
120121

121122
testImplementation 'org.testng:testng:7.10.1' // use for reactive streams test inheritance
122123

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package reproductions;
2+
3+
import graphql.ExecutionResult;
4+
import graphql.GraphQL;
5+
import graphql.schema.DataFetchingEnvironment;
6+
import graphql.schema.GraphQLSchema;
7+
import graphql.schema.idl.RuntimeWiring;
8+
import graphql.schema.idl.SchemaGenerator;
9+
import graphql.schema.idl.SchemaParser;
10+
import org.reactivestreams.Publisher;
11+
import org.reactivestreams.Subscriber;
12+
import org.reactivestreams.Subscription;
13+
import reactor.core.publisher.Flux;
14+
15+
import javax.annotation.Nonnull;
16+
import java.util.Map;
17+
import java.util.Random;
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
21+
import static graphql.schema.idl.RuntimeWiring.newRuntimeWiring;
22+
import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring;
23+
24+
/**
25+
* Related to <a href="https://github.com/spring-projects/spring-graphql/issues/949">...</a>
26+
* <p>
27+
* This reproduction is to see what's happening with Subscriptions and whether they keep their
28+
* order when values are async.
29+
*/
30+
public class SubscriptionReproduction {
31+
public static void main(String[] args) {
32+
new SubscriptionReproduction().run();
33+
}
34+
35+
private void run() {
36+
37+
GraphQL graphQL = mkGraphQl();
38+
String query = "subscription MySubscription {\n" +
39+
" searchVideo {\n" +
40+
" id\n" +
41+
" name\n" +
42+
" isFavorite\n" +
43+
" }\n" +
44+
"}";
45+
ExecutionResult executionResult = graphQL.execute(query);
46+
Publisher<Map<String, Object>> publisher = executionResult.getData();
47+
48+
DeathEater eater = new DeathEater();
49+
eater.eat(publisher);
50+
}
51+
52+
private GraphQL mkGraphQl() {
53+
String sdl = "type Query { f : ID }" +
54+
"type Subscription {" +
55+
" searchVideo : VideoSearch" +
56+
"}" +
57+
"type VideoSearch {" +
58+
" id : ID" +
59+
" name : String" +
60+
" isFavorite : Boolean" +
61+
"}";
62+
RuntimeWiring runtimeWiring = newRuntimeWiring()
63+
.type(newTypeWiring("Subscription")
64+
.dataFetcher("searchVideo", this::mkFluxDF)
65+
)
66+
.type(newTypeWiring("VideoSearch")
67+
.dataFetcher("name", this::nameDF)
68+
.dataFetcher("isFavorite", this::isFavoriteDF)
69+
)
70+
.build();
71+
72+
GraphQLSchema schema = new SchemaGenerator().makeExecutableSchema(
73+
new SchemaParser().parse(sdl), runtimeWiring
74+
);
75+
return GraphQL.newGraphQL(schema).build();
76+
}
77+
78+
private CompletableFuture<Flux<Object>> mkFluxDF(DataFetchingEnvironment env) {
79+
// async deliver of the publisher with random snoozing between values
80+
return CompletableFuture.supplyAsync(() -> Flux.generate(() -> 0, (counter, sink) -> {
81+
sink.next(mkValue(counter));
82+
snooze(rand(10, 100));
83+
if (counter == 10) {
84+
sink.complete();
85+
}
86+
return counter + 1;
87+
}));
88+
}
89+
90+
private Object isFavoriteDF(DataFetchingEnvironment env) {
91+
// async deliver of the isFavorite property with random delay
92+
return CompletableFuture.supplyAsync(() -> {
93+
Integer counter = getCounter(env.getSource());
94+
return counter % 2 == 0;
95+
});
96+
}
97+
98+
private Object nameDF(DataFetchingEnvironment env) {
99+
// async deliver of the isFavorite property with random delay
100+
return CompletableFuture.supplyAsync(() -> {
101+
Integer counter = getCounter(env.getSource());
102+
return "name" + counter;
103+
});
104+
}
105+
106+
private static Integer getCounter(Map<String, Object> video) {
107+
Integer counter = (Integer) video.getOrDefault("counter", 0);
108+
snooze(rand(100, 500));
109+
return counter;
110+
}
111+
112+
private @Nonnull Object mkValue(Integer counter) {
113+
// name and isFavorite are future values via DFs
114+
return Map.of(
115+
"counter", counter,
116+
"id", String.valueOf(counter) // immediate value
117+
);
118+
}
119+
120+
121+
private static void snooze(int ms) {
122+
try {
123+
Thread.sleep(ms);
124+
} catch (InterruptedException e) {
125+
throw new RuntimeException(e);
126+
}
127+
}
128+
129+
static Random rn = new Random();
130+
131+
private static int rand(int min, int max) {
132+
return rn.nextInt(max - min + 1) + min;
133+
}
134+
135+
public static class DeathEater implements Subscriber<Object> {
136+
private Subscription subscription;
137+
private final AtomicBoolean done = new AtomicBoolean();
138+
139+
public boolean isDone() {
140+
return done.get();
141+
}
142+
143+
@Override
144+
public void onSubscribe(Subscription subscription) {
145+
this.subscription = subscription;
146+
System.out.println("onSubscribe");
147+
subscription.request(1);
148+
}
149+
150+
@Override
151+
public void onNext(Object o) {
152+
System.out.println("\tonNext : " + o);
153+
subscription.request(1);
154+
}
155+
156+
@Override
157+
public void onError(Throwable throwable) {
158+
System.out.println("onError");
159+
throwable.printStackTrace(System.err);
160+
done.set(true);
161+
}
162+
163+
@Override
164+
public void onComplete() {
165+
System.out.println("complete");
166+
done.set(true);
167+
}
168+
169+
public void eat(Publisher<?> publisher) {
170+
publisher.subscribe(this);
171+
while (!this.isDone()) {
172+
snooze(2);
173+
}
174+
175+
}
176+
}
177+
}

0 commit comments

Comments
 (0)