From 29aa7c134ea4eed44ed9519a4cf7cea76e8af50b Mon Sep 17 00:00:00 2001 From: Juan C Galvis <8420868+juancgalvis@users.noreply.github.com> Date: Wed, 15 May 2024 11:43:23 -0500 Subject: [PATCH] fix(sender): Start fix in sender after disconnection --- .../async/rabbit/config/RabbitMqConfig.java | 7 +- .../communications/ReactiveMessageSender.java | 72 +++++++++++++------ .../main/java/sample/MyRabbitMQConfig.java | 3 +- .../src/main/resources/application.yaml | 5 +- 4 files changed, 57 insertions(+), 30 deletions(-) diff --git a/async/async-rabbit-standalone/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java b/async/async-rabbit-standalone/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java index 0d930391..05cc65ab 100644 --- a/async/async-rabbit-standalone/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java +++ b/async/async-rabbit-standalone/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java @@ -32,10 +32,9 @@ public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, M senderConnection, new ChannelPoolOptions().maxCacheSize(rabbitProperties.getChannelPoolMaxCacheSize()) ); - - final Sender sender = RabbitFlux.createSender(new SenderOptions().channelPool(channelPool)); - - return new ReactiveMessageSender(sender, appName, converter, new TopologyCreator(sender)); + final SenderOptions senderOptions = new SenderOptions().channelPool(channelPool); + final Sender sender = RabbitFlux.createSender(senderOptions); + return new ReactiveMessageSender(senderOptions, appName, converter, new TopologyCreator(sender)); } /*public ReactiveMessageListener messageListener(ConnectionFactoryProvider provider) { diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.java index 51bfdacb..97e7640d 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.java @@ -1,6 +1,9 @@ package org.reactivecommons.async.rabbit.communications; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.ShutdownSignalException; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.reactivecommons.async.commons.communications.Message; import org.reactivecommons.async.commons.converters.MessageConverter; import org.reactivecommons.async.commons.exceptions.SendFailureNoAckException; @@ -20,15 +23,19 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static org.reactivecommons.async.api.DirectAsyncGateway.DELAYED; import static org.reactivecommons.async.commons.Headers.SOURCE_APPLICATION; +@Slf4j public class ReactiveMessageSender { + @Getter private final Sender sender; private final String sourceApplication; private final MessageConverter messageConverter; + @Getter private final TopologyCreator topologyCreator; private final int numberOfSenderSubscriptions = 4; @@ -46,37 +53,65 @@ public ReactiveMessageSender(Sender sender, String sourceApplication, MessageCon this.sourceApplication = sourceApplication; this.messageConverter = messageConverter; this.topologyCreator = topologyCreator; + createSendersWithConfirm(numberOfSenderSubscriptions); + createSenderNoConfirm(); + } - for (int i = 0; i < numberOfSenderSubscriptions; ++i) { - final Flux messageSource = Flux.create(fluxSinkConfirm::add); - sender.sendWithTypedPublishConfirms(messageSource).doOnNext((OutboundMessageResult outboundMessageResult) -> { - final Consumer ackNotifier = outboundMessageResult.getOutboundMessage().getAckNotifier(); - executorService.submit(() -> ackNotifier.accept(outboundMessageResult.isAck())); - }).subscribe(); + private void createSendersWithConfirm(int toAdd) { + for (int i = 0; i < toAdd; ++i) { + final AtomicReference> fluxSinkRef = new AtomicReference<>(); + final Flux messageSource = Flux.create(fluxSink -> { + fluxSinkRef.set(fluxSink); + this.fluxSinkConfirm.add(fluxSink); + }); + sender.sendWithTypedPublishConfirms(messageSource) + .doOnNext((OutboundMessageResult outboundMessageResult) -> { + final Consumer ackNotifier = outboundMessageResult.getOutboundMessage().getAckNotifier(); + executorService.submit(() -> ackNotifier.accept(outboundMessageResult.isAck())); + }) + .onErrorResume(ShutdownSignalException.class, throwable -> { + log.warn("RabbitMQ connection lost. Trying to recreate sender with confirm...", throwable); + fluxSinkConfirm.remove(fluxSinkRef.get()); + createSendersWithConfirm(1); + return Mono.empty(); + }) + .subscribe(); } + } - final Flux messageSourceNoConfirm = Flux.create(fluxSink -> { - this.fluxSinkNoConfirm = fluxSink; - }); - sender.send(messageSourceNoConfirm).subscribe(); - + private void createSenderNoConfirm() { + final Flux messageSourceNoConfirm = Flux.create(fluxSink -> this.fluxSinkNoConfirm = fluxSink); + sender.send(messageSourceNoConfirm) + .onErrorResume(ShutdownSignalException.class, throwable -> { + log.warn("RabbitMQ connection lost. Trying to recreate sender no confirm..."); + createSenderNoConfirm(); + return Mono.empty(); + }) + .subscribe(); } public Mono sendWithConfirm(T message, String exchange, String routingKey, Map headers, boolean persistent) { return Mono.create(monoSink -> { Consumer notifier = new AckNotifier(monoSink); final MyOutboundMessage outboundMessage = toOutboundMessage(message, exchange, routingKey, headers, notifier, persistent); - executorService2.submit(() -> fluxSinkConfirm.get((int) (System.currentTimeMillis() % numberOfSenderSubscriptions)).next(outboundMessage)); + executorService2.submit(() -> { + FluxSink outboundFlux = fluxSinkConfirm.get(random(numberOfSenderSubscriptions)); + outboundFlux.next(outboundMessage); + }); }); } + private static int random(int max) { + return (int) (System.currentTimeMillis() % max); + } + public Mono sendNoConfirm(T message, String exchange, String routingKey, Map headers, boolean persistent) { fluxSinkNoConfirm.next(toOutboundMessage(message, exchange, routingKey, headers, persistent)); return Mono.empty(); } - public Flux sendWithConfirmBatch(Flux messages, String exchange, String routingKey, Map headers, boolean persistent) { + public Flux> sendWithConfirmBatch(Flux messages, String exchange, String routingKey, Map headers, boolean persistent) { return messages.map(message -> toOutboundMessage(message, exchange, routingKey, headers, persistent)) .as(sender::sendWithPublishConfirms) .flatMap(result -> result.isAck() ? @@ -103,6 +138,7 @@ public void accept(Boolean ack) { } + @Getter static class MyOutboundMessage extends OutboundMessage { private final Consumer ackNotifier; @@ -112,9 +148,6 @@ public MyOutboundMessage(String exchange, String routingKey, AMQP.BasicPropertie this.ackNotifier = ackNotifier; } - public Consumer getAckNotifier() { - return ackNotifier; - } } private MyOutboundMessage toOutboundMessage(T object, String exchange, String routingKey, Map headers, Consumer ackNotifier, boolean persistent) { @@ -148,12 +181,5 @@ private AMQP.BasicProperties buildMessageProperties(Message message, Map