Skip to content

Commit c6a2c89

Browse files
committed
Package refactoring in rsocket support
Create annotation.support sub-package and move handler code there. This prepares for a future, functional handler (responder) variant and is consistent with the package structure under simp.
1 parent cc05608 commit c6a2c89

12 files changed

Lines changed: 56 additions & 11 deletions

File tree

spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,15 @@ public final void onNext(T item) {
189189
else if (this.state == State.NEW) {
190190
this.item = item;
191191
this.state = State.FIRST_SIGNAL_RECEIVED;
192-
writeFunction.apply(this).subscribe(this.writeCompletionBarrier);
192+
Publisher<Void> result;
193+
try {
194+
result = writeFunction.apply(this);
195+
}
196+
catch (Throwable ex) {
197+
this.writeCompletionBarrier.onError(ex);
198+
return;
199+
}
200+
result.subscribe(this.writeCompletionBarrier);
193201
}
194202
else {
195203
if (this.subscription != null) {

spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
* @author Rossen Stoyanchev
3535
* @since 5.2
3636
*/
37-
abstract class PayloadUtils {
37+
public abstract class PayloadUtils {
3838

3939
/**
4040
* Use this method to slice, retain and wrap the data portion of the
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/**
2+
* Annotations and support classes for handling RSocket streams.
3+
*/
4+
@NonNullApi
5+
@NonNullFields
6+
package org.springframework.messaging.rsocket.annotation;
7+
8+
import org.springframework.lang.NonNullApi;
9+
import org.springframework.lang.NonNullFields;

spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java renamed to spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.messaging.rsocket;
17+
package org.springframework.messaging.rsocket.annotation.support;
1818

1919
import java.nio.charset.StandardCharsets;
20+
import java.util.Arrays;
21+
import java.util.List;
2022
import java.util.concurrent.atomic.AtomicBoolean;
2123
import java.util.function.Function;
2224

@@ -39,6 +41,8 @@
3941
import org.springframework.messaging.MessageHeaders;
4042
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
4143
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
44+
import org.springframework.messaging.rsocket.PayloadUtils;
45+
import org.springframework.messaging.rsocket.RSocketRequester;
4246
import org.springframework.messaging.support.MessageBuilder;
4347
import org.springframework.messaging.support.MessageHeaderAccessor;
4448
import org.springframework.util.Assert;
@@ -57,6 +61,13 @@
5761
*/
5862
class MessagingRSocket extends AbstractRSocket {
5963

64+
static final MimeType COMPOSITE_METADATA = new MimeType("message", "x.rsocket.composite-metadata.v0");
65+
66+
private static final MimeType ROUTING = new MimeType("message", "x.rsocket.routing.v0");
67+
68+
private static final List<MimeType> METADATA_MIME_TYPES = Arrays.asList(COMPOSITE_METADATA, ROUTING);
69+
70+
6071
private final RSocketMessageHandler messageHandler;
6172

6273
private final RouteMatcher routeMatcher;
@@ -80,7 +91,7 @@ class MessagingRSocket extends AbstractRSocket {
8091
Assert.notNull(dataMimeType, "'dataMimeType' is required");
8192
Assert.notNull(metadataMimeType, "'metadataMimeType' is required");
8293

83-
Assert.isTrue(DefaultRSocketRequester.METADATA_MIME_TYPES.contains(metadataMimeType),
94+
Assert.isTrue(METADATA_MIME_TYPES.contains(metadataMimeType),
8495
() -> "Unexpected metadatata mime type: '" + metadataMimeType + "'");
8596

8697
this.messageHandler = messageHandler;
@@ -178,17 +189,17 @@ private Flux<Payload> handleAndReply(Payload firstPayload, Flux<Payload> payload
178189
}
179190

180191
private String getDestination(Payload payload) {
181-
if (this.metadataMimeType.equals(DefaultRSocketRequester.COMPOSITE_METADATA)) {
192+
if (this.metadataMimeType.equals(COMPOSITE_METADATA)) {
182193
CompositeMetadata metadata = new CompositeMetadata(payload.metadata(), false);
183194
for (CompositeMetadata.Entry entry : metadata) {
184195
String mimeType = entry.getMimeType();
185-
if (DefaultRSocketRequester.ROUTING.toString().equals(mimeType)) {
196+
if (ROUTING.toString().equals(mimeType)) {
186197
return entry.getContent().toString(StandardCharsets.UTF_8);
187198
}
188199
}
189200
return "";
190201
}
191-
else if (this.metadataMimeType.equals(DefaultRSocketRequester.ROUTING)) {
202+
else if (this.metadataMimeType.equals(ROUTING)) {
192203
return payload.getMetadataUtf8();
193204
}
194205
// Should not happen (given constructor assertions)

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketMessageHandler.java renamed to spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.messaging.rsocket;
17+
package org.springframework.messaging.rsocket.annotation.support;
1818

1919
import java.util.ArrayList;
2020
import java.util.List;
@@ -33,6 +33,8 @@
3333
import org.springframework.messaging.MessageDeliveryException;
3434
import org.springframework.messaging.handler.annotation.reactive.MessageMappingMessageHandler;
3535
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
36+
import org.springframework.messaging.rsocket.RSocketRequester;
37+
import org.springframework.messaging.rsocket.RSocketStrategies;
3638
import org.springframework.util.Assert;
3739
import org.springframework.util.MimeType;
3840
import org.springframework.util.MimeTypeUtils;
@@ -61,7 +63,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
6163
@Nullable
6264
private MimeType defaultDataMimeType;
6365

64-
private MimeType defaultMetadataMimeType = DefaultRSocketRequester.COMPOSITE_METADATA;
66+
private MimeType defaultMetadataMimeType = MessagingRSocket.COMPOSITE_METADATA;
6567

6668

6769
/**

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketPayloadReturnValueHandler.java renamed to spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketPayloadReturnValueHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.messaging.rsocket;
17+
package org.springframework.messaging.rsocket.annotation.support;
1818

1919
import java.util.List;
2020

@@ -30,6 +30,7 @@
3030
import org.springframework.lang.Nullable;
3131
import org.springframework.messaging.Message;
3232
import org.springframework.messaging.handler.invocation.reactive.AbstractEncoderMethodReturnValueHandler;
33+
import org.springframework.messaging.rsocket.PayloadUtils;
3334
import org.springframework.util.Assert;
3435

3536
/**

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequesterMethodArgumentResolver.java renamed to spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketRequesterMethodArgumentResolver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.messaging.rsocket;
17+
package org.springframework.messaging.rsocket.annotation.support;
1818

1919
import io.rsocket.RSocket;
2020
import reactor.core.publisher.Mono;
2121

2222
import org.springframework.core.MethodParameter;
2323
import org.springframework.messaging.Message;
2424
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver;
25+
import org.springframework.messaging.rsocket.RSocketRequester;
2526
import org.springframework.util.Assert;
2627

2728
/**
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/**
2+
* Support classes for working with annotated RSocket stream handling methods.
3+
*/
4+
@NonNullApi
5+
@NonNullFields
6+
package org.springframework.messaging.rsocket.annotation.support;
7+
8+
import org.springframework.lang.NonNullApi;
9+
import org.springframework.lang.NonNullFields;

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
5959
import org.springframework.messaging.handler.annotation.MessageMapping;
6060
import org.springframework.messaging.handler.annotation.Payload;
61+
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
6162
import org.springframework.stereotype.Controller;
6263
import org.springframework.util.ObjectUtils;
6364

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.springframework.core.io.buffer.NettyDataBufferFactory;
4040
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
4141
import org.springframework.messaging.handler.annotation.MessageMapping;
42+
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
4243
import org.springframework.stereotype.Controller;
4344

4445
import static org.assertj.core.api.Assertions.assertThat;

0 commit comments

Comments
 (0)