@@ -22,8 +22,9 @@ import kotlinx.coroutines.flow.Flow
2222import kotlinx.coroutines.reactive.awaitFirstOrNull
2323import kotlinx.coroutines.reactive.awaitSingle
2424import kotlinx.coroutines.reactive.flow.asFlow
25- import kotlinx.coroutines.reactive.flow.asPublisher
2625import org.springframework.core.ParameterizedTypeReference
26+ import reactor.core.publisher.Flux
27+ import reactor.core.publisher.Mono
2728import java.net.URI
2829
2930/* *
@@ -53,16 +54,18 @@ suspend fun RSocketRequester.Builder.connectTcpAndAwait(host: String, port: Int)
5354suspend fun RSocketRequester.Builder.connectWebSocketAndAwait (uri : URI ): RSocketRequester =
5455 connectWebSocket(uri).awaitSingle()
5556
56-
5757/* *
58- * Kotlin [Flow] variant of [RSocketRequester.RequestSpec.data].
58+ * Extension for [RSocketRequester.RequestSpec.data] providing a `data<Foo>(producer)`
59+ * variant leveraging Kotlin reified type parameters. This extension is not subject to type
60+ * erasure and retains actual generic type arguments.
5961 *
6062 * @author Sebastien Deleuze
6163 * @since 5.2
6264 */
65+ @Suppress(" EXTENSION_SHADOWED_BY_MEMBER" )
6366@FlowPreview
64- fun <T : Any > RSocketRequester.RequestSpec.dataFlow ( data : Flow < T > ): RSocketRequester .ResponseSpec =
65- data(data.asPublisher() , object : ParameterizedTypeReference <T >() {})
67+ fun <T : Any > RSocketRequester.RequestSpec.data ( producer : Any ): RSocketRequester .ResponseSpec =
68+ data(producer , object : ParameterizedTypeReference <T >() {})
6669
6770/* *
6871 * Coroutines variant of [RSocketRequester.ResponseSpec.send].
@@ -92,3 +95,26 @@ suspend fun <T : Any> RSocketRequester.ResponseSpec.retrieveAndAwait(): T =
9295@FlowPreview
9396fun <T : Any > RSocketRequester.ResponseSpec.retrieveFlow (batchSize : Int = 1): Flow <T > =
9497 retrieveFlux(object : ParameterizedTypeReference <T >() {}).asFlow(batchSize)
98+
99+ /* *
100+ * Extension for [RSocketRequester.ResponseSpec.retrieveMono] providing a `retrieveMono<Foo>()`
101+ * variant leveraging Kotlin reified type parameters. This extension is not subject to type
102+ * erasure and retains actual generic type arguments.
103+ *
104+ * @author Sebastien Deleuze
105+ * @since 5.2
106+ */
107+ fun <T : Any > RSocketRequester.ResponseSpec.retrieveMono (): Mono <T > =
108+ retrieveMono(object : ParameterizedTypeReference <T >() {})
109+
110+
111+ /* *
112+ * Extension for [RSocketRequester.ResponseSpec.retrieveFlux] providing a `retrieveFlux<Foo>()`
113+ * variant leveraging Kotlin reified type parameters. This extension is not subject to type
114+ * erasure and retains actual generic type arguments.
115+ *
116+ * @author Sebastien Deleuze
117+ * @since 5.2
118+ */
119+ fun <T : Any > RSocketRequester.ResponseSpec.retrieveFlux (): Flux <T > =
120+ retrieveFlux(object : ParameterizedTypeReference <T >() {})
0 commit comments