Skip to content

Commit eaecf0c

Browse files
committed
Improve RSocketRequester.ResponseSpec Kotlin extensions
This commit adds retrieveMono and retrieveFlux reified variants, and turns dataFlow(flow: Flow) extension into a general purpose reified data(producer: Any) one. Closes spring-projectsgh-23164
1 parent 003247d commit eaecf0c

File tree

2 files changed

+59
-8
lines changed

2 files changed

+59
-8
lines changed

spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ import kotlinx.coroutines.flow.Flow
2222
import kotlinx.coroutines.reactive.awaitFirstOrNull
2323
import kotlinx.coroutines.reactive.awaitSingle
2424
import kotlinx.coroutines.reactive.flow.asFlow
25-
import kotlinx.coroutines.reactive.flow.asPublisher
2625
import org.springframework.core.ParameterizedTypeReference
26+
import reactor.core.publisher.Flux
27+
import reactor.core.publisher.Mono
2728
import java.net.URI
2829

2930
/**
@@ -53,16 +54,18 @@ suspend fun RSocketRequester.Builder.connectTcpAndAwait(host: String, port: Int)
5354
suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocketRequester =
5455
connectWebSocket(uri).awaitSingle()
5556

56-
5757
/**
58-
* Kotlin [Flow] variant of [RSocketRequester.RequestSpec.data].
58+
* Extension for [RSocketRequester.RequestSpec.data] providing a `data<Foo>(producer)`
59+
* variant leveraging Kotlin reified type parameters. This extension is not subject to type
60+
* erasure and retains actual generic type arguments.
5961
*
6062
* @author Sebastien Deleuze
6163
* @since 5.2
6264
*/
65+
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
6366
@FlowPreview
64-
fun <T : Any> RSocketRequester.RequestSpec.dataFlow(data: Flow<T>): RSocketRequester.ResponseSpec =
65-
data(data.asPublisher(), object : ParameterizedTypeReference<T>() {})
67+
fun <T : Any> RSocketRequester.RequestSpec.data(producer: Any): RSocketRequester.ResponseSpec =
68+
data(producer, object : ParameterizedTypeReference<T>() {})
6669

6770
/**
6871
* Coroutines variant of [RSocketRequester.ResponseSpec.send].
@@ -92,3 +95,26 @@ suspend fun <T : Any> RSocketRequester.ResponseSpec.retrieveAndAwait(): T =
9295
@FlowPreview
9396
fun <T : Any> RSocketRequester.ResponseSpec.retrieveFlow(batchSize: Int = 1): Flow<T> =
9497
retrieveFlux(object : ParameterizedTypeReference<T>() {}).asFlow(batchSize)
98+
99+
/**
100+
* Extension for [RSocketRequester.ResponseSpec.retrieveMono] providing a `retrieveMono<Foo>()`
101+
* variant leveraging Kotlin reified type parameters. This extension is not subject to type
102+
* erasure and retains actual generic type arguments.
103+
*
104+
* @author Sebastien Deleuze
105+
* @since 5.2
106+
*/
107+
fun <T : Any> RSocketRequester.ResponseSpec.retrieveMono(): Mono<T> =
108+
retrieveMono(object : ParameterizedTypeReference<T>() {})
109+
110+
111+
/**
112+
* Extension for [RSocketRequester.ResponseSpec.retrieveFlux] providing a `retrieveFlux<Foo>()`
113+
* variant leveraging Kotlin reified type parameters. This extension is not subject to type
114+
* erasure and retains actual generic type arguments.
115+
*
116+
* @author Sebastien Deleuze
117+
* @since 5.2
118+
*/
119+
fun <T : Any> RSocketRequester.ResponseSpec.retrieveFlux(): Flux<T> =
120+
retrieveFlux(object : ParameterizedTypeReference<T>() {})

spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package org.springframework.messaging.rsocket
33
import io.mockk.every
44
import io.mockk.mockk
55
import kotlinx.coroutines.FlowPreview
6-
import kotlinx.coroutines.flow.Flow
76
import kotlinx.coroutines.flow.toList
87
import kotlinx.coroutines.runBlocking
98
import org.junit.Assert.assertEquals
@@ -54,11 +53,19 @@ class RSocketRequesterExtensionsTests {
5453
}
5554

5655
@Test
57-
fun dataFlow() {
56+
fun dataFlowWithType() {
5857
val requestSpec = mockk<RSocketRequester.RequestSpec>()
5958
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
6059
every { requestSpec.data(any<Publisher<String>>(), any<ParameterizedTypeReference<String>>()) } returns responseSpec
61-
assertEquals(responseSpec, requestSpec.dataFlow(mockk<Flow<String>>()))
60+
assertEquals(responseSpec, requestSpec.data<String>(mockk()))
61+
}
62+
63+
@Test
64+
fun dataFlowWithoutType() {
65+
val requestSpec = mockk<RSocketRequester.RequestSpec>()
66+
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
67+
every { requestSpec.data(any()) } returns responseSpec
68+
assertEquals(responseSpec, requestSpec.data(mockk()))
6269
}
6370

6471
@Test
@@ -88,4 +95,22 @@ class RSocketRequesterExtensionsTests {
8895
assertEquals(listOf("foo", "bar"), responseSpec.retrieveFlow<String>().toList())
8996
}
9097
}
98+
99+
@Test
100+
fun retrieveMono() {
101+
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
102+
every { responseSpec.retrieveMono(any<ParameterizedTypeReference<String>>()) } returns Mono.just("foo")
103+
runBlocking {
104+
assertEquals("foo", responseSpec.retrieveMono<String>().block())
105+
}
106+
}
107+
108+
@Test
109+
fun retrieveFlux() {
110+
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
111+
every { responseSpec.retrieveFlux(any<ParameterizedTypeReference<String>>()) } returns Flux.just("foo", "bar")
112+
runBlocking {
113+
assertEquals(listOf("foo", "bar"), responseSpec.retrieveFlux<String>().collectList().block())
114+
}
115+
}
91116
}

0 commit comments

Comments
 (0)