From 98a64b86ac17f8a4499dbbfdac8c9882ebad28a7 Mon Sep 17 00:00:00 2001 From: Alejandro Betancur Barrientos Date: Mon, 6 Jun 2022 14:26:03 -0500 Subject: [PATCH 1/3] SQS/SNS adapter code base --- .../async-sqs-starter.gradle | 90 +++++ .../async/impl/config/AWSConfig.java | 145 ++++++++ .../async/impl/config/AWSProperties.java | 19 ++ .../impl/config/DirectAsyncGatewayConfig.java | 36 ++ .../async/impl/config/EventBusConfig.java | 36 ++ .../impl/config/MessageListenersConfig.java | 109 ++++++ .../annotations/EnableDirectAsyncGateway.java | 19 ++ .../annotations/EnableDomainEventBus.java | 19 ++ .../annotations/EnableMessageListeners.java | 19 ++ .../async/impl/config/props/AsyncProps.java | 32 ++ .../impl/config/props/BrokerConfigProps.java | 78 +++++ .../async/impl/config/props/DirectProps.java | 12 + .../async/impl/config/props/DomainProps.java | 14 + .../async/impl/config/props/EventsProps.java | 12 + .../async/impl/config/props/FluxProps.java | 12 + .../src/test/resources/application.properties | 1 + async/async-sqs/async-sqs.gradle | 99 ++++++ .../async/impl/DynamicRegistryImp.java | 19 ++ .../reactivecommons/async/impl/Handlers.java | 39 +++ .../reactivecommons/async/impl/Headers.java | 14 + .../async/impl/SNSDirectAsyncGateway.java | 36 ++ .../async/impl/SNSDomainEventBus.java | 21 ++ .../handlers/ApplicationCommandHandler.java | 53 +++ .../handlers/ApplicationEventHandler.java | 53 +++ .../impl/handlers/GenericMessageHandler.java | 16 + .../async/impl/model/MessageSQS.java | 12 + .../async/impl/model/SNSEventModel.java | 61 ++++ .../impl/sns/communications/Listener.java | 118 +++++++ .../impl/sns/communications/SQSSender.java | 54 +++ .../async/impl/sns/communications/Sender.java | 78 +++++ .../sns/communications/TopologyCreator.java | 172 ++++++++++ .../async/impl/DynamicRegistryImpTest.java | 37 +++ .../async/impl/SNSDirectAsyncGatewayTest.java | 60 ++++ .../async/impl/SNSDomainEventBusTest.java | 59 ++++ .../ApplicationCommandHandlerTest.java | 92 ++++++ .../handlers/ApplicationEventHandlerTest.java | 92 ++++++ .../async/impl/model/SNSEventModelTest.java | 20 ++ .../sns/communications/SQSSenderTest.java | 67 ++++ .../impl/sns/communications/SenderTest.java | 87 +++++ .../communications/TopologyCreatorTest.java | 311 ++++++++++++++++++ 40 files changed, 2323 insertions(+) create mode 100644 async/async-sqs-starter/async-sqs-starter.gradle create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/AWSConfig.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/AWSProperties.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/DirectAsyncGatewayConfig.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/EventBusConfig.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDirectAsyncGateway.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDomainEventBus.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableMessageListeners.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/AsyncProps.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/BrokerConfigProps.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/DirectProps.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/DomainProps.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/EventsProps.java create mode 100644 async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/FluxProps.java create mode 100644 async/async-sqs-starter/src/test/resources/application.properties create mode 100644 async/async-sqs/async-sqs.gradle create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/DynamicRegistryImp.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/Handlers.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/Headers.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/SNSDirectAsyncGateway.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/SNSDomainEventBus.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandler.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandler.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/GenericMessageHandler.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/model/MessageSQS.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/model/SNSEventModel.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/Listener.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/SQSSender.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/Sender.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/TopologyCreator.java create mode 100644 async/async-sqs/src/test/java/org/reactivecommons/async/impl/DynamicRegistryImpTest.java create mode 100644 async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDirectAsyncGatewayTest.java create mode 100644 async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDomainEventBusTest.java create mode 100644 async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandlerTest.java create mode 100644 async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandlerTest.java create mode 100644 async/async-sqs/src/test/java/org/reactivecommons/async/impl/model/SNSEventModelTest.java create mode 100644 async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SQSSenderTest.java create mode 100644 async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SenderTest.java create mode 100644 async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/TopologyCreatorTest.java diff --git a/async/async-sqs-starter/async-sqs-starter.gradle b/async/async-sqs-starter/async-sqs-starter.gradle new file mode 100644 index 00000000..55491d90 --- /dev/null +++ b/async/async-sqs-starter/async-sqs-starter.gradle @@ -0,0 +1,90 @@ +plugins { + id "com.jfrog.bintray" version "1.8.5" + id 'java-library' + id 'maven' + id 'maven-publish' +} + +test.onlyIf { false } + +def pomConfig = { + licenses { + license { + name "The Apache Software License, Version 2.0" + url "http://www.apache.org/licenses/LICENSE-2.0.txt" + distribution "repo" + } + } + developers { + developer { + id "andmagom" + name "Andrés Mauricio Gómez P" + email "andmagom@outlook.com" + } + developer { + id "alejobtc" + name "Alejandro Betancur Barrientos" + email "alejobtc@gmail.com" + } + } + + scm { + url "git@github.com:reactive-commons/reactive-commons-java.git" + } +} + +publishing { + publications { + MyPublication(MavenPublication) { + from components.java + artifact sourcesJar { + classifier "sources" + } + artifact javadocJar { + classifier "javadoc" + } + groupId 'org.reactivecommons' + artifactId 'async-sqs-starter' + version project.property('version') + pom.withXml { + def root = asNode() + root.appendNode('description', 'Async SQS Starter') + root.appendNode('name', 'async-sqs-starter') + root.appendNode('url', 'https://site_for_lib.tld') + root.children().last() + pomConfig + } + } + } +} + +bintray { + user = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : System.getenv('BINTRAY_USER') + key = project.hasProperty('bintrayApiKey') ? project.property('bintrayApiKey') : System.getenv('BINTRAY_API_KEY') + publications = ['MyPublication'] + publish = true + pkg { + repo = 'maven-artifacts' + userOrg = 'reactive-commons' + name = 'reactive-commons' + licenses = ['Apache-2.0'] + vcsUrl = 'git@github.com:reactive-commons/reactive-commons-java.git' + version { + name = project.property('version') + desc = 'First version' + released = new Date() + vcsTag = project.property('version') + } + } +} + +dependencies { + implementation platform('software.amazon.awssdk:bom:2.13.10') + compile project(":async-sqs") + api('org.springframework.boot:spring-boot-starter') + implementation 'software.amazon.awssdk:sns' + implementation 'software.amazon.awssdk:sqs' + + annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' + + testImplementation 'io.projectreactor:reactor-test' +} \ No newline at end of file diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/AWSConfig.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/AWSConfig.java new file mode 100644 index 00000000..7da9723a --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/AWSConfig.java @@ -0,0 +1,145 @@ +package org.reactivecommons.async.impl.config; + +import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; +import org.reactivecommons.async.impl.config.props.AsyncProps; +import org.reactivecommons.async.impl.config.props.BrokerConfigProps; +import org.reactivecommons.async.impl.handlers.ApplicationCommandHandler; +import org.reactivecommons.async.impl.handlers.ApplicationEventHandler; +import org.reactivecommons.async.impl.sns.communications.Listener; +import org.reactivecommons.async.impl.sns.communications.SQSSender; +import org.reactivecommons.async.impl.sns.communications.Sender; +import org.reactivecommons.async.impl.sns.communications.TopologyCreator; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sns.SnsAsyncClient; +import software.amazon.awssdk.services.sns.model.ListTopicsRequest; +import software.amazon.awssdk.services.sns.model.Topic; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; + + +@Log +@Configuration +@RequiredArgsConstructor +@EnableConfigurationProperties({ + AWSProperties.class, + AsyncProps.class +}) +@Import({BrokerConfigProps.class, MessageListenersConfig.class}) +public class AWSConfig { + + private final AsyncProps asyncProps; + + @Value("${spring.application.name}") + private String appName; + private String arnSnsPrefix; + private String arnSqsPrefix; + + @Bean + public Sender messageSender(SnsAsyncClient client, AWSProperties awsProperties, BrokerConfigProps props) { + String exchangeName = props.getDomainEventsExchangeName(); + arnSnsPrefix = getTopicArn(exchangeName, client).block(); + final Sender sender = new Sender(client, appName, arnSnsPrefix); + arnSqsPrefix = arnSnsPrefix.replace("sns", "sqs"); + return sender; + } + + @Bean + public SQSSender sqsSender(SqsAsyncClient client) { + return new SQSSender(client); + } + + @Bean("evtListener") + public Listener messageEventListener(SqsAsyncClient sqsClient, ApplicationEventHandler appEvtListener, + BrokerConfigProps props, TopologyCreator topology, SQSSender sqsSender) { + final Listener listener = getListener(sqsClient, sqsSender); + + final String exchangeName = props.getDomainEventsExchangeName(); + + String queueName = props.getEventsQueue(); + topology.createQueue(queueName).block(); + topology.bind(queueName, exchangeName).block(); + topology.setQueueAttributes(queueName, exchangeName, arnSnsPrefix, arnSqsPrefix).block(); + String queueUrl = topology.getQueueUrl(queueName).block(); + listener.startListener(queueUrl, appEvtListener::handle).subscribe(); + return listener; + } + + private Listener getListener(SqsAsyncClient sqsClient, SQSSender sqsSender) { + return Listener.builder() + .client(sqsClient) + .sqsSender(sqsSender) + .retryDelay(asyncProps.getRetryDelay()) + .maxRetries(asyncProps.getMaxRetries()) + .prefetchCount(asyncProps.getPrefetchCount()) + .build(); + } + + @Bean("commandListener") + public Listener messageCommandListener(SqsAsyncClient sqsClient, ApplicationCommandHandler appCmdListener, + BrokerConfigProps props, TopologyCreator topoloy, SQSSender sqsSender) { + final Listener listener = getListener(sqsClient, sqsSender); + + final String exchangeName = appName.concat(props.getDirectMessagesExchangeName()); + + String queueName = props.getCommandsQueue(); + topoloy.createTopic(exchangeName).block(); + topoloy.createQueue(queueName).block(); + topoloy.bind(queueName, exchangeName).block(); + topoloy.setQueueAttributes(queueName, exchangeName, arnSnsPrefix, arnSqsPrefix).block(); + String queueUrl = topoloy.getQueueUrl(queueName).block(); + listener.startListener(queueUrl, appCmdListener::handle).subscribe(); + return listener; + } + + @Bean + public SqsAsyncClient getSQSAsyncClient(AWSProperties awsProperties) { + Region region = Region.of(awsProperties.getRegion()); + return SqsAsyncClient.builder() + .region(region) + .credentialsProvider(DefaultCredentialsProvider.create()) + .build(); + } + + @Bean + public SnsAsyncClient getSNSAsyncClient(AWSProperties awsProperties) { + Region region = Region.of(awsProperties.getRegion()); + return SnsAsyncClient.builder() + .region(region) + .credentialsProvider(DefaultCredentialsProvider.create()) + .build(); + } + + @Bean + public TopologyCreator getTopology(SqsAsyncClient sqsAsyncClient, SnsAsyncClient snsAsyncClient, BrokerConfigProps props) { + String queueName = props.getDomainEventsExchangeName(); + return new TopologyCreator(snsAsyncClient, sqsAsyncClient); + } + + public Flux listTopics(SnsAsyncClient snsAsyncClient) { + return getListTopicRequest() + .flatMap(request -> Mono.fromFuture(snsAsyncClient.listTopics(request))) + .flatMapMany((response) -> Flux.fromIterable(response.topics())); + } + + public Mono getTopicArn(String name, SnsAsyncClient snsAsyncClient) { + return listTopics(snsAsyncClient) + .map(Topic::topicArn) + .filter((topic) -> topic.contains(":" + name)) + .map((e) -> e.replace(":" + name, "")) + .single(); + } + + private Mono getListTopicRequest() { + return Mono.just(ListTopicsRequest.builder().build()); + } + + +} diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/AWSProperties.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/AWSProperties.java new file mode 100644 index 00000000..99c8eb81 --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/AWSProperties.java @@ -0,0 +1,19 @@ +package org.reactivecommons.async.impl.config; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.convert.DurationUnit; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; + +@ConfigurationProperties(prefix = "aws") +@Data +public class AWSProperties { + private String region; +} diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/DirectAsyncGatewayConfig.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/DirectAsyncGatewayConfig.java new file mode 100644 index 00000000..33b3725a --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/DirectAsyncGatewayConfig.java @@ -0,0 +1,36 @@ +package org.reactivecommons.async.impl.config; + +import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.impl.SNSDirectAsyncGateway; +import org.reactivecommons.async.impl.config.props.BrokerConfigProps; +import org.reactivecommons.async.impl.reply.ReactiveReplyRouter; +import org.reactivecommons.async.impl.sns.communications.Sender; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +@Configuration +@Import(AWSConfig.class) +@RequiredArgsConstructor +public class DirectAsyncGatewayConfig { + + private final BrokerConfigProps props; + + @Bean + public SNSDirectAsyncGateway sqsDirectAsyncGateway( Sender rSender ) throws Exception { + return new SNSDirectAsyncGateway(rSender, props.getDirectMessagesExchangeName()); + } + + @Bean + public BrokerConfig brokerConfig() { + return new BrokerConfig(); + } + + + @Bean + public ReactiveReplyRouter router() { + return new ReactiveReplyRouter(); + } + + +} diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/EventBusConfig.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/EventBusConfig.java new file mode 100644 index 00000000..965d1b6d --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/EventBusConfig.java @@ -0,0 +1,36 @@ +package org.reactivecommons.async.impl.config; + +import lombok.RequiredArgsConstructor; +import org.reactivecommons.api.domain.DomainEventBus; +import org.reactivecommons.async.impl.SNSDomainEventBus; +import org.reactivecommons.async.impl.config.props.BrokerConfigProps; +import org.reactivecommons.async.impl.sns.communications.Sender; +import org.reactivecommons.async.impl.sns.communications.TopologyCreator; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import javax.annotation.PostConstruct; + +import static reactor.rabbitmq.ExchangeSpecification.exchange; + +@Configuration +@Import(AWSConfig.class) +@RequiredArgsConstructor +public class EventBusConfig { + + private final BrokerConfigProps props; + private final TopologyCreator topology; + + @PostConstruct + public void createEventTopic() { + final String exchangeName = props.getDomainEventsExchangeName(); + topology.createTopic(exchangeName).block(); + } + + @Bean + public DomainEventBus domainEventBus(Sender sender) { + final String exchangeName = props.getDomainEventsExchangeName(); + return new SNSDomainEventBus(sender, exchangeName); + } +} diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java new file mode 100644 index 00000000..5f042d75 --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java @@ -0,0 +1,109 @@ +package org.reactivecommons.async.impl.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.api.DefaultCommandHandler; +import org.reactivecommons.async.api.DefaultQueryHandler; +import org.reactivecommons.async.api.HandlerRegistry; +import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; +import org.reactivecommons.async.impl.HandlerResolver; +import org.reactivecommons.async.impl.config.props.AsyncProps; +import org.reactivecommons.async.impl.converters.MessageConverter; +import org.reactivecommons.async.impl.converters.json.JacksonMessageConverter; +import org.reactivecommons.async.impl.handlers.ApplicationCommandHandler; +import org.reactivecommons.async.impl.handlers.ApplicationEventHandler; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import reactor.core.publisher.Mono; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +@Configuration +@RequiredArgsConstructor +public class MessageListenersConfig { + + @Value("${spring.application.name}") + private String appName; + + private final AsyncProps asyncProps; + + + @Bean //TODO: move to own config (QueryListenerConfig) + public ApplicationEventHandler eventListener(HandlerResolver resolver, MessageConverter messageConverter) { + final ApplicationEventHandler appListener = new ApplicationEventHandler(resolver, messageConverter); + + return appListener; + } + + @Bean + public ApplicationCommandHandler applicationCommandListener(HandlerResolver resolver, MessageConverter messageConverter) { + ApplicationCommandHandler commandListener = new ApplicationCommandHandler(resolver, messageConverter); + return commandListener; + } + + @Bean + public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandler defaultCommandHandler) { + final Map registries = context.getBeansOfType(HandlerRegistry.class); + + final ConcurrentMap handlers = registries + .values().stream() + .flatMap(r -> r.getHandlers().stream()) + .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), + ConcurrentHashMap::putAll); + + final ConcurrentMap eventListeners = registries + .values().stream() + .flatMap(r -> r.getEventListeners().stream()) + .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), + ConcurrentHashMap::putAll); + + final ConcurrentMap commandHandlers = registries + .values().stream() + .flatMap(r -> r.getCommandHandlers().stream()) + .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), + ConcurrentHashMap::putAll); + + final ConcurrentMap notificationHandlers = registries + .values().stream() + .flatMap(r -> r.getEventListeners().stream()) + .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), + ConcurrentHashMap::putAll); + + return new HandlerResolver(handlers, eventListeners, commandHandlers, notificationHandlers) { + @Override + @SuppressWarnings("unchecked") + public RegisteredCommandHandler getCommandHandler(String path) { + final RegisteredCommandHandler handler = super.getCommandHandler(path); + return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class); + } + }; + } + + @Bean + @ConditionalOnMissingBean + public MessageConverter messageConverter() { + ObjectMapper mapper = new ObjectMapper(); + return new JacksonMessageConverter(mapper); + } + + @Bean + @ConditionalOnMissingBean + public DefaultQueryHandler defaultHandler() { + return (DefaultQueryHandler) command -> + Mono.error(new RuntimeException("No Handler Registered")); + } + + + @Bean + @ConditionalOnMissingBean + public DefaultCommandHandler defaultCommandHandler() { + return message -> Mono.error(new RuntimeException("No Handler Registered")); + } +} diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDirectAsyncGateway.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDirectAsyncGateway.java new file mode 100644 index 00000000..f8cf2579 --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDirectAsyncGateway.java @@ -0,0 +1,19 @@ +package org.reactivecommons.async.impl.config.annotations; + +import org.reactivecommons.async.impl.config.DirectAsyncGatewayConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import(DirectAsyncGatewayConfig.class) +@Configuration +public @interface EnableDirectAsyncGateway { +} + + + diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDomainEventBus.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDomainEventBus.java new file mode 100644 index 00000000..3fcf2f39 --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDomainEventBus.java @@ -0,0 +1,19 @@ +package org.reactivecommons.async.impl.config.annotations; + +import org.reactivecommons.async.impl.config.EventBusConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import(EventBusConfig.class) +@Configuration +public @interface EnableDomainEventBus { +} + + + diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableMessageListeners.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableMessageListeners.java new file mode 100644 index 00000000..bc308ca2 --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableMessageListeners.java @@ -0,0 +1,19 @@ +package org.reactivecommons.async.impl.config.annotations; + +import org.reactivecommons.async.impl.config.MessageListenersConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import(MessageListenersConfig.class) +@Configuration +public @interface EnableMessageListeners { +} + + + diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/AsyncProps.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/AsyncProps.java new file mode 100644 index 00000000..705fc7e8 --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/AsyncProps.java @@ -0,0 +1,32 @@ +package org.reactivecommons.async.impl.config.props; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.NestedConfigurationProperty; + + +@Getter +@Setter +@ConfigurationProperties(prefix = "app.async") +public class AsyncProps { + + @NestedConfigurationProperty + private FluxProps flux = new FluxProps(); + + @NestedConfigurationProperty + private DomainProps domain = new DomainProps(); + + @NestedConfigurationProperty + private DirectProps direct = new DirectProps(); + + private Integer maxRetries = 2; + + private Integer prefetchCount = 10; + + private Integer retryDelay = 1000; + + private Boolean withDLQRetry = false; + +} diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/BrokerConfigProps.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/BrokerConfigProps.java new file mode 100644 index 00000000..7c86f539 --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/BrokerConfigProps.java @@ -0,0 +1,78 @@ +package org.reactivecommons.async.impl.config.props; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.impl.config.IBrokerConfigProps; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.Base64Utils; + +import java.nio.ByteBuffer; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + + +@Getter +@Configuration +@RequiredArgsConstructor +public class BrokerConfigProps implements IBrokerConfigProps { + + @Value("${spring.application.name}") + private String appName; + + @Value("${spring.application.domain-name}") + private String domainName; + + private final AsyncProps asyncProps; + + private final AtomicReference replyQueueName = new AtomicReference<>(); + + @Override + public String getEventsQueue() { + return appName + "-" + domainName + "-subsEvents"; + } + + @Override + public String getQueriesQueue() { + return appName + "-" + domainName + "-query"; + } + + @Override + public String getCommandsQueue() { + return appName + "-" + domainName + "-commands"; + } + + @Override + public String getReplyQueue() { + final String name = replyQueueName.get(); + if (name == null) { + final String replyName = newRandomQueueName(); + if (replyQueueName.compareAndSet(null, replyName)) { + return replyName; + } else { + return replyQueueName.get(); + } + } + return name; + } + + @Override + public String getDomainEventsExchangeName() { + return domainName + "-" + asyncProps.getDomain().getEvents().getTopic(); + } + + @Override + public String getDirectMessagesExchangeName() { + return "-" + domainName + "-" + asyncProps.getDirect().getTopic(); + } + + private String newRandomQueueName() { + UUID uuid = UUID.randomUUID(); + ByteBuffer bb = ByteBuffer.wrap(new byte[16]); + bb.putLong(uuid.getMostSignificantBits()) + .putLong(uuid.getLeastSignificantBits()); + return appName + Base64Utils.encodeToUrlSafeString(bb.array()) + .replaceAll("=", ""); + } + +} diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/DirectProps.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/DirectProps.java new file mode 100644 index 00000000..d7e79822 --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/DirectProps.java @@ -0,0 +1,12 @@ +package org.reactivecommons.async.impl.config.props; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class DirectProps { + + private String topic = "directMessages"; + +} diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/DomainProps.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/DomainProps.java new file mode 100644 index 00000000..761e5ac7 --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/DomainProps.java @@ -0,0 +1,14 @@ +package org.reactivecommons.async.impl.config.props; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.NestedConfigurationProperty; + +@Getter +@Setter +public class DomainProps { + + @NestedConfigurationProperty + private EventsProps events = new EventsProps(); + +} diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/EventsProps.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/EventsProps.java new file mode 100644 index 00000000..139d998f --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/EventsProps.java @@ -0,0 +1,12 @@ +package org.reactivecommons.async.impl.config.props; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class EventsProps { + + private String topic = "domainEvents"; + +} diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/FluxProps.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/FluxProps.java new file mode 100644 index 00000000..cc15ab42 --- /dev/null +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/FluxProps.java @@ -0,0 +1,12 @@ +package org.reactivecommons.async.impl.config.props; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class FluxProps { + + private Integer maxConcurrency = 250; + +} diff --git a/async/async-sqs-starter/src/test/resources/application.properties b/async/async-sqs-starter/src/test/resources/application.properties new file mode 100644 index 00000000..3d8d7db0 --- /dev/null +++ b/async/async-sqs-starter/src/test/resources/application.properties @@ -0,0 +1 @@ +spring.application.name=test-app \ No newline at end of file diff --git a/async/async-sqs/async-sqs.gradle b/async/async-sqs/async-sqs.gradle new file mode 100644 index 00000000..18cdf72c --- /dev/null +++ b/async/async-sqs/async-sqs.gradle @@ -0,0 +1,99 @@ +plugins { + id "com.jfrog.bintray" version "1.8.5" + id 'java-library' + id 'maven' + id 'maven-publish' +} + + +def pomConfig = { + licenses { + license { + name "The Apache Software License, Version 2.0" + url "http://www.apache.org/licenses/LICENSE-2.0.txt" + distribution "repo" + } + } + developers { + developer { + id "andmagom" + name "Andrés Mauricio Gómez P" + email "andmagom@outlook.com" + } + developer { + id "alejobtc" + name "Alejandro Betancur Barrientos" + email "alejobtc@gmail.com" + } + } + + scm { + url "git@github.com:reactive-commons/reactive-commons-java.git" + } +} + +publishing { + publications { + MyPublication(MavenPublication) { + from components.java + artifact sourcesJar { + classifier "sources" + } + artifact javadocJar { + classifier "javadoc" + } + groupId 'org.reactivecommons' + artifactId 'async-rabbit' + version project.property('version') + pom.withXml { + def root = asNode() + root.appendNode('description', 'Async Rabbit') + root.appendNode('name', 'async-rabbit') + root.appendNode('url', 'https://site_for_lib.tld') + root.children().last() + pomConfig + } + } + } +} + +bintray { + user = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : System.getenv('BINTRAY_USER') + key = project.hasProperty('bintrayApiKey') ? project.property('bintrayApiKey') : System.getenv('BINTRAY_API_KEY') + publications = ['MyPublication'] + publish = true + pkg { + repo = 'maven-artifacts' + userOrg = 'reactive-commons' + name = 'reactive-commons' + licenses = ['Apache-2.0'] + vcsUrl = 'git@github.com:reactive-commons/reactive-commons-java.git' + version { + name = project.property('version') + desc = 'First version' + released = new Date() + vcsTag = project.property('version') + } + } +} + +dependencies { + implementation platform('software.amazon.awssdk:bom:2.13.10') + compile project(":async-commons-api") + compile project(":domain-events-api") + compile project(":async-commons") + + api 'io.projectreactor:reactor-core' + api 'com.fasterxml.jackson.core:jackson-databind' + testImplementation 'io.projectreactor:reactor-test' + implementation 'software.amazon.awssdk:sns' + implementation 'software.amazon.awssdk:sqs' + compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.11.1' + compile group: 'joda-time', name: 'joda-time', version: '2.10.6' + compile("org.springframework.boot:spring-boot-starter-log4j2") +} + +configurations { + all { + exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging' + } +} diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/DynamicRegistryImp.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/DynamicRegistryImp.java new file mode 100644 index 00000000..f908baa1 --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/DynamicRegistryImp.java @@ -0,0 +1,19 @@ +package org.reactivecommons.async.impl; + +import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.api.DynamicRegistry; +import org.reactivecommons.async.api.handlers.EventHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import reactor.core.publisher.Mono; + +@RequiredArgsConstructor +public class DynamicRegistryImp implements DynamicRegistry { + + private final HandlerResolver resolver; + + @Override + public Mono listenEvent(String eventName, EventHandler fn, Class eventClass) { + resolver.addEventListener(new RegisteredEventListener<>(eventName, fn, eventClass)); + return Mono.empty(); + } +} diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/Handlers.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/Handlers.java new file mode 100644 index 00000000..6fa42f90 --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/Handlers.java @@ -0,0 +1,39 @@ +package org.reactivecommons.async.impl; + +import java.util.Collection; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; + +@RequiredArgsConstructor +public class Handlers { + private final Map queryHandlers; + private final Map eventListeners; + private final Map commandHandlers; + + + @SuppressWarnings("unchecked") + public RegisteredQueryHandler getQueryHandler(String path) { + return (RegisteredQueryHandler) queryHandlers.get(path); + } + + @SuppressWarnings("unchecked") + public RegisteredCommandHandler getCommandHandler(String path) { + return commandHandlers.get(path); + } + + @SuppressWarnings("unchecked") + public RegisteredEventListener getEventListener(String path) { + return eventListeners.get(path); + } + + public Collection getEventListeners() { + return eventListeners.values(); + } + + void addEventListener(RegisteredEventListener listener) { + eventListeners.put(listener.getPath(), listener); + } +} diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/Headers.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/Headers.java new file mode 100644 index 00000000..ad7d7dc0 --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/Headers.java @@ -0,0 +1,14 @@ +package org.reactivecommons.async.impl; + +public final class Headers { + public static final String REPLY_ID = "x-reply_id"; + public static final String CORRELATION_ID = "x-correlation-id"; + public static final String COMPLETION_ONLY_SIGNAL = "x-empty-completion"; + public static final String SERVED_QUERY_ID = "x-serveQuery-id"; + public static final String SOURCE_APPLICATION = "sourceApplication"; + public static final String SIGNAL_TYPE = "x-signal-type"; + public static final String TIMESTAMP = "timestamp"; + + private Headers() { + } +} \ No newline at end of file diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/SNSDirectAsyncGateway.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/SNSDirectAsyncGateway.java new file mode 100644 index 00000000..58610cc6 --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/SNSDirectAsyncGateway.java @@ -0,0 +1,36 @@ +package org.reactivecommons.async.impl; + +import lombok.AllArgsConstructor; +import org.reactivecommons.api.domain.Command; +import org.reactivecommons.async.api.AsyncQuery; +import org.reactivecommons.async.api.DirectAsyncGateway; +import org.reactivecommons.async.api.From; +import org.reactivecommons.async.impl.sns.communications.Sender; +import reactor.core.publisher.Mono; + +@AllArgsConstructor +public class SNSDirectAsyncGateway implements DirectAsyncGateway { + + private Sender sender; + private String topicTarget; + + @Override + public Mono sendCommand(Command command, String targetAppName) { + return sender.publish(command, getTargetTopic(targetAppName)) + .onErrorMap(err -> new RuntimeException("Command send failure: " + command.getName() + " Reason: "+ err.getMessage(), err)); + } + + @Override + public Mono requestReply(AsyncQuery query, String targetName, Class type) { + return null; + } + + @Override + public Mono reply(T response, From from) { + return null; + } + + private String getTargetTopic(String targetAppName) { + return targetAppName.concat(topicTarget); + } +} diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/SNSDomainEventBus.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/SNSDomainEventBus.java new file mode 100644 index 00000000..3ffcc6e3 --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/SNSDomainEventBus.java @@ -0,0 +1,21 @@ +package org.reactivecommons.async.impl; + +import lombok.RequiredArgsConstructor; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.api.domain.DomainEventBus; +import org.reactivecommons.async.impl.sns.communications.Sender; +import reactor.core.publisher.Mono; + +@RequiredArgsConstructor +public class SNSDomainEventBus implements DomainEventBus { + + private final Sender sender; + public final String topicName ; + + @Override + public Mono emit(DomainEvent event) { + return sender.publish(event, topicName) + .onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName() + " Reason: "+ err.getMessage(), err)); + } + +} diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandler.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandler.java new file mode 100644 index 00000000..f2adc249 --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandler.java @@ -0,0 +1,53 @@ +package org.reactivecommons.async.impl.handlers; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.java.Log; +import lombok.extern.log4j.Log4j2; +import org.reactivecommons.api.domain.Command; +import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; +import org.reactivecommons.async.impl.HandlerResolver; +import org.reactivecommons.async.impl.converters.MessageConverter; +import org.reactivecommons.async.impl.model.MessageSQS; +import org.reactivecommons.async.impl.model.SNSEventModel; +import reactor.core.publisher.Mono; + +@Log4j2 +public class ApplicationCommandHandler extends GenericMessageHandler { + + private final MessageConverter messageConverter; + + public ApplicationCommandHandler(HandlerResolver handlers, MessageConverter messageConverter) { + super(handlers); + this.messageConverter = messageConverter; + } + + private Mono getHandler(SNSEventModel msj) { + ObjectMapper objectMapper = new ObjectMapper(); + try { + Command command = objectMapper.readValue(msj.getMessage(), Command.class); + String commandName = command.getName(); + RegisteredCommandHandler handler = handlers.getCommandHandler(commandName); + if (handler != null) { + return Mono.just(handler); + } else { + log.error("Handler doesn't found for command " + commandName); + return Mono.empty(); + } + } catch (JsonProcessingException e) { + return Mono.error(e); + } + } + + public Mono handle(SNSEventModel msj) { + return getHandler(msj) + .flatMap(handler -> { + Class dataClass = handler.getInputClass(); + MessageSQS message = new MessageSQS(msj.getMessage()); + Command command = messageConverter.readCommand(message, dataClass); + return handler.getHandler() + .handle(command); + }); + } + +} diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandler.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandler.java new file mode 100644 index 00000000..7b0f6fbc --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandler.java @@ -0,0 +1,53 @@ +package org.reactivecommons.async.impl.handlers; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.java.Log; +import lombok.extern.log4j.Log4j2; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.impl.HandlerResolver; +import org.reactivecommons.async.impl.converters.MessageConverter; +import org.reactivecommons.async.impl.model.MessageSQS; +import org.reactivecommons.async.impl.model.SNSEventModel; +import reactor.core.publisher.Mono; + +@Log4j2 +public class ApplicationEventHandler extends GenericMessageHandler { + + private final MessageConverter messageConverter; + + public ApplicationEventHandler(HandlerResolver handlers, MessageConverter messageConverter) { + super(handlers); + this.messageConverter = messageConverter; + } + + private Mono getHandler(SNSEventModel msj) { + ObjectMapper objectMapper = new ObjectMapper(); + try { + DomainEvent event = objectMapper.readValue(msj.getMessage(), DomainEvent.class); + String eventName = event.getName(); + RegisteredEventListener handler = handlers.getEventListener(eventName); + if (handler != null) { + return Mono.just(handler); + } else { + log.info("Handler doesn't found for event " + eventName); + return Mono.empty(); + } + } catch (JsonProcessingException e) { + return Mono.error(e); + } + } + + public Mono handle(SNSEventModel msj) { + return getHandler(msj) + .flatMap(handler -> { + Class dataClass = handler.getInputClass(); + MessageSQS message = new MessageSQS(msj.getMessage()); + DomainEvent domainEvent = messageConverter.readDomainEvent(message, dataClass); + return handler.getHandler() + .handle(domainEvent); + }); + } + +} diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/GenericMessageHandler.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/GenericMessageHandler.java new file mode 100644 index 00000000..ca92bd29 --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/GenericMessageHandler.java @@ -0,0 +1,16 @@ +package org.reactivecommons.async.impl.handlers; + +import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; +import lombok.extern.log4j.Log4j2; +import org.reactivecommons.async.impl.HandlerResolver; +import org.reactivecommons.async.impl.model.SNSEventModel; +import reactor.core.publisher.Mono; + +@RequiredArgsConstructor +public abstract class GenericMessageHandler { + protected final HandlerResolver handlers; + public abstract Mono handle(SNSEventModel msj); +} + + diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/model/MessageSQS.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/model/MessageSQS.java new file mode 100644 index 00000000..3ae7c27a --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/model/MessageSQS.java @@ -0,0 +1,12 @@ +package org.reactivecommons.async.impl.model; + +import lombok.Data; + +@Data +public class MessageSQS implements org.reactivecommons.async.impl.communications.Message { + private byte[] body; + private Properties properties; + public MessageSQS(String message) { + body = message.getBytes(); + } +} \ No newline at end of file diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/model/SNSEventModel.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/model/SNSEventModel.java new file mode 100644 index 00000000..a4dda8e7 --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/model/SNSEventModel.java @@ -0,0 +1,61 @@ +package org.reactivecommons.async.impl.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import lombok.Data; +import org.joda.time.DateTime; + +@Data +public class SNSEventModel { + + private static final long serialVersionUID = -7038894618736475592L; + + @JsonProperty("MessageAttributes") + private Map messageAttributes; + + @JsonProperty("SigningCertURL") + private String signingCertUrl; + + @JsonProperty("MessageId") + private String messageId; + + @JsonProperty("Message") + private String message; + + @JsonProperty("Subject") + private String subject; + + @JsonProperty("UnsubscribeURL") + private String unsubscribeUrl; + + @JsonProperty("Type") + private String type; + + @JsonProperty("SignatureVersion") + private String signatureVersion; + + @JsonProperty("Signature") + private String signature; + + @JsonProperty("Timestamp") + private DateTime timestamp; + + @JsonProperty("TopicArn") + private String topicArn; + + public void setTimestamp(String timestamp) { + DateTime dt = new DateTime(timestamp); + this.timestamp = dt; + } +} + +@Data +class MessageAttributeModel { + private static final long serialVersionUID = -5656179310535967619L; + + @JsonProperty("Type") + private String type; + + @JsonProperty("Value") + private String value; +} diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/Listener.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/Listener.java new file mode 100644 index 00000000..822b5ce4 --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/Listener.java @@ -0,0 +1,118 @@ +package org.reactivecommons.async.impl.sns.communications; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Builder; +import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; +import lombok.extern.log4j.Log4j2; +import org.reactivecommons.async.api.handlers.GenericHandler; +import org.reactivecommons.async.impl.model.SNSEventModel; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.*; + +import java.util.Map; + +@RequiredArgsConstructor +@Log4j2 +@Builder +public class Listener { + + private final SqsAsyncClient client; + private final SQSSender sqsSender; + private final int retryDelay; + private final int maxRetries; + private final int prefetchCount; + + public Mono listen(String queueName, GenericHandler handler) { + return getMessages(queueName) + .flatMap(this::mapObject) + .flatMap(tuple -> handleMessage(tuple, queueName, handler)) + .then(); + } + + public Mono handleMessage(Tuple2 tuple, + String queueName, + GenericHandler handler) { + return handler.handle(tuple.getT1()) + .onErrorResume(message -> { + log.error("Error handling the message: " + message.getMessage()); + retryMessage(tuple, queueName); + return Mono.just(1).then(); + }) + .doOnSuccess(message -> deleteMessage(queueName, tuple.getT2()).subscribe()); + } + + private void retryMessage(Tuple2 tuple, String queueName) { + log.info("Executing retry message"); + Map attributes = tuple.getT2().messageAttributes(); + MessageAttributeValue count = attributes.getOrDefault( "retries", MessageAttributeValue.builder().stringValue("0").build()) ; + String countString = count.stringValue(); + if(Integer.parseInt(countString) < maxRetries) { + sqsSender.send(tuple.getT2().body(), getRetryDelay(), countString, queueName).subscribe(); + } else { + log.info("Discarding message completely !!! " + tuple.getT2().messageId()); + } + } + + + public Mono> mapObject(Message message) { + ObjectMapper objectMapper = new ObjectMapper(); + try { + return Mono.zip(Mono.just(objectMapper.readValue(message.body(), SNSEventModel.class)), + Mono.just(message)); + } catch (JsonProcessingException ex) { + return Mono.error(ex); + } + } + + public Mono deleteMessage(String queueName, Message m) { + DeleteMessageRequest deleteMessageRequest = getDeleteMessageRequest(queueName, + m.receiptHandle()); + return Mono.fromFuture(client.deleteMessage(deleteMessageRequest)) + .doOnNext(response -> log.info("Message from: " + queueName + " Hash: " + response.hashCode() + " deleted")) + .doOnError(message -> log.error("Error to delete a message - posible message duplication: " + message.getMessage())); + } + + public Flux getMessages(String queueName) { + return getReceiveMessageRequest(queueName) + .flatMap((req) -> Mono.fromFuture(client.receiveMessage(req)) + .doOnSuccess(response -> log.info("Size: " + response.messages().size())) + .doOnError((e) -> { + System.out.println(e.getMessage()); + }) + ) + .flatMapMany((response) -> Flux.fromIterable(response.messages())); + + } + + public Mono getReceiveMessageRequest(String name) { + log.info("Getting messages from " + name); + return Mono.just( + ReceiveMessageRequest.builder() + .queueUrl(name) + .maxNumberOfMessages(prefetchCount) + .waitTimeSeconds(20) + .messageAttributeNames("retries") + .build() + ); + } + + public DeleteMessageRequest getDeleteMessageRequest(String queueName, String receiptHandle) { + return DeleteMessageRequest.builder().queueUrl(queueName).receiptHandle(receiptHandle).build(); + } + + public Flux startListener(String queueName, GenericHandler handler) { + return listen(queueName, handler) + .doOnSuccess((e) -> log.debug("listen terminated " + queueName)) + .repeat(); + } + + private int getRetryDelay() { + return retryDelay / 1000; + } + +} diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/SQSSender.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/SQSSender.java new file mode 100644 index 00000000..731d84fb --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/SQSSender.java @@ -0,0 +1,54 @@ +package org.reactivecommons.async.impl.sns.communications; + +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import reactor.core.publisher.Mono; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +import java.util.HashMap; +import java.util.Map; + +@RequiredArgsConstructor +@Log4j2 +public class SQSSender { + private final SqsAsyncClient client; + + public Mono send(String message, int delaySeconds, String retryCount, String queueUrl) { + return getSendMessageRequest(message, delaySeconds, retryCount, queueUrl) + .flatMap( request -> Mono.fromFuture( client.sendMessage(request) )) + .doOnSuccess(response -> log.info("Retry messange sent " + response.messageId())) + .doOnError(error -> log.error("Retry message error" + error.getMessage())) + .then(); + } + + private Mono getSendMessageRequest(String message, int delaySeconds, String retryCount, String queueUrl) { + SendMessageRequest request = SendMessageRequest.builder() + .delaySeconds(delaySeconds) + .messageBody( message ) + .queueUrl(queueUrl) + .messageAttributes( incrementRetryAttribute( retryCount )) + .build(); + + return Mono.just(request); + } + + private Map incrementRetryAttribute(String retryCount) { + Map attributes = new HashMap<>(); + int newCount = Integer.parseInt(retryCount); + newCount += 1; + addAttribute(attributes, "retries", newCount + ""); + return attributes; + } + + private void addAttribute(Map messageAttributes, final String attributeName, final String attributeValue) { + MessageAttributeValue messageAttributeValue = MessageAttributeValue.builder() + .dataType("String") + .stringValue(attributeValue) + .build(); + + messageAttributes.put(attributeName, messageAttributeValue); + } + +} diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/Sender.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/Sender.java new file mode 100644 index 00000000..3300b7c7 --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/Sender.java @@ -0,0 +1,78 @@ +package org.reactivecommons.async.impl.sns.communications; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.reactivecommons.async.impl.Headers; +import reactor.core.publisher.Mono; +import software.amazon.awssdk.services.sns.SnsAsyncClient; +import software.amazon.awssdk.services.sns.model.MessageAttributeValue; +import software.amazon.awssdk.services.sns.model.PublishRequest; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +@Data +@RequiredArgsConstructor +@Log4j2 +public class Sender { + + private final SnsAsyncClient client; + private final String sourceApplication; + private final String prefixARN; + + public Mono publish(T message, String targetName) { + return getPublishRequest(message, targetName) + .flatMap(request -> Mono.fromFuture(client.publish(request))) + .doOnSuccess(response -> log.info(response.messageId())) + .then(); + } + + private Mono getPublishRequest(T message, String targetName) { + try { + PublishRequest request = PublishRequest.builder() + .message(objectToJSON(message)) + .messageAttributes(getMessageAttributes()) + .topicArn(getTopicARN(targetName)) + .build(); + return Mono.just(request); + } catch (JsonProcessingException e) { + return Mono.error(e); + } + } + + private String getTopicARN(String targetTopic) { + return prefixARN + ":" + targetTopic; + } + + private String objectToJSON(T message) throws JsonProcessingException { + ObjectWriter ow = new ObjectMapper().writer(); + String json = ow.writeValueAsString(message); + return json; + } + + // TODO add messageAttributes + private Map getMessageAttributes() { + Map messageAttributes = new HashMap<>(); + addAttribute(messageAttributes, Headers.SOURCE_APPLICATION, sourceApplication); + addAttribute(messageAttributes, Headers.CORRELATION_ID, UUID.randomUUID().toString()); + addAttribute(messageAttributes, Headers.TIMESTAMP, new Date().toString()); + return messageAttributes; + } + + private void addAttribute(Map messageAttributes, final String attributeName, + final String attributeValue) { + MessageAttributeValue messageAttributeValue = MessageAttributeValue.builder() + .dataType("String") + .stringValue(attributeValue) + .build(); + + messageAttributes.put(attributeName, messageAttributeValue); + } + +} diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/TopologyCreator.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/TopologyCreator.java new file mode 100644 index 00000000..2cf931e8 --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/sns/communications/TopologyCreator.java @@ -0,0 +1,172 @@ +package org.reactivecommons.async.impl.sns.communications; + +import lombok.AllArgsConstructor; +import lombok.extern.log4j.Log4j2; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import software.amazon.awssdk.services.sns.SnsAsyncClient; +import software.amazon.awssdk.services.sns.model.*; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.*; + +import java.util.HashMap; +import java.util.Map; + +@Log4j2 +@AllArgsConstructor +public class TopologyCreator { + + private final SnsAsyncClient topicClient; + private final SqsAsyncClient queueClient; + + + public Mono declareTopic(String name) { + return listTopics() + .map((topic) -> topic.topicArn()) + .filter((topic) -> topic.contains(":" + name)) + .switchIfEmpty(createTopic(name)) + .single(); + } + + private Mono getListTopicRequest() { + return Mono.just(ListTopicsRequest.builder().build()); + } + + public Flux listTopics() { + return getListTopicRequest() + .flatMap(request -> Mono.fromFuture(topicClient.listTopics(request))) + .flatMapMany((response) -> Flux.fromIterable(response.topics())); + } + + private Mono getCreateTopicRequest(String name) { + return Mono.just(CreateTopicRequest.builder().name(name).build()); + } + + public Mono createTopic(String name) { + return getCreateTopicRequest(name) + .flatMap(request -> Mono.fromFuture(topicClient.createTopic(request))) + .map(CreateTopicResponse::topicArn) + .doOnNext((response) -> log.debug("Topic Created: " + response)) + .doOnError((e) -> log.error("Error creating topic: " + e.toString())); + } + + + private Mono createQueueRequest(String name) { + return Mono.just(CreateQueueRequest.builder().queueName(name).build()); + } + + public Mono createQueue(String name) { + return createQueueRequest(name) + .flatMap(request -> Mono.fromFuture(queueClient.createQueue(request))) + .map(CreateQueueResponse::queueUrl) + .doOnNext((response) -> log.debug("Queue created: " + response)) + .doOnError((e) -> log.error("Error creating queue: " + e.toString())); + } + + + public Mono getTopicArn(String name) { + return listTopics() + .map(Topic::topicArn) + .filter((topic) -> topic.contains(":" + name)) + .single(); + } + + private Mono getQueueUrlRequest(String queueName) { + return Mono.just(GetQueueUrlRequest.builder().queueName(queueName).build()); + } + + public Mono getQueueUrl(String name) { + return getQueueUrlRequest(name) + .flatMap((request) -> Mono.fromFuture(queueClient.getQueueUrl(request))) + .map(GetQueueUrlResponse::queueUrl); + } + + public Mono bind(String queueName, String topicName) { + return getQueueUrl(queueName) + .flatMap(this::getQueueArn) + .zipWith(getTopicArn(topicName)) + .flatMap((a) -> getSubscribeRequest(a.getT1(), a.getT2())) + .flatMap(request -> Mono.fromFuture(topicClient.subscribe(request))) + .map(SubscribeResponse::subscriptionArn) + .onErrorMap(TopologyDefException::new); + } + + + private Mono getSubscribeRequest(String queueArn, String topicArn) { + SubscribeRequest subscribeRequest = SubscribeRequest.builder() + .protocol("sqs") + .endpoint(queueArn) + .topicArn(topicArn) + .returnSubscriptionArn(true) + .build(); + return Mono.just(subscribeRequest); + } + + private Mono getQueueAttributesRequest(String queueUrl) { + GetQueueAttributesRequest subscribeRequest = GetQueueAttributesRequest.builder() + .queueUrl(queueUrl) + .attributeNamesWithStrings("All") + .build(); + return Mono.just(subscribeRequest); + } + + public Mono getQueueArn(String queueUrl) { + return getQueueAttributesRequest(queueUrl) + .flatMap(request -> Mono.fromFuture(queueClient.getQueueAttributes(request))) + .map((response) -> response.attributesAsStrings().get("QueueArn")); + + } + + public Mono setQueueAttributes(String queueName, String topicName, String arnSnsPrefix, + String arnSqsPrefix) { + return getQueueUrl(queueName) + .flatMap(queueUrl -> { + Map attributes = getAttributeMap(queueName, topicName, arnSnsPrefix, arnSqsPrefix); + return setQueueAttributesRequest(queueUrl, attributes); + }) + .flatMap(request -> Mono.fromFuture(queueClient.setQueueAttributes(request))) + .map(SetQueueAttributesResponse::toString); + + } + + private Map getAttributeMap(String queueName, String topicName, String arnSnsPrefix, + String arnSqsPrefix) { + Map map = new HashMap<>(); + map.put("Policy", "{\n" + + " \"Version\": \"2012-10-17\",\n" + + " \"Id\": \"" + arnSqsPrefix + ":" + queueName + "/SQSDefaultPolicy\",\n" + + " \"Statement\": [\n" + + " {\n" + + " \"Sid\": \"topic-subscription-" + arnSnsPrefix + ":" + topicName + "\",\n" + + " \"Effect\": \"Allow\",\n" + + " \"Principal\": {\n" + + " \"AWS\": \"*\"\n" + + " },\n" + + " \"Action\": \"SQS:SendMessage\",\n" + + " \"Resource\": \"" + arnSqsPrefix + ":" + queueName + "\",\n" + + " \"Condition\": {\n" + + " \"ArnLike\": {\n" + + " \"aws:SourceArn\": \"" + arnSnsPrefix + ":" + topicName + "\"\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"); + return map; + } + + + private Mono setQueueAttributesRequest(String queueUrl, Map attributes) { + SetQueueAttributesRequest setQueueAttributesRequest = SetQueueAttributesRequest.builder().queueUrl(queueUrl) + .attributesWithStrings(attributes) + .build(); + return Mono.just(setQueueAttributesRequest); + } + + + public static class TopologyDefException extends RuntimeException { + public TopologyDefException(Throwable cause) { + super(cause); + } + } +} diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/DynamicRegistryImpTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/DynamicRegistryImpTest.java new file mode 100644 index 00000000..0e746dbf --- /dev/null +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/DynamicRegistryImpTest.java @@ -0,0 +1,37 @@ +package org.reactivecommons.async.impl; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.reactivecommons.async.api.DynamicRegistry; +import org.reactivecommons.async.api.handlers.EventHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) +public class DynamicRegistryImpTest { + @Mock + private HandlerResolver resolver; + @Mock + private EventHandler eventHandler; + + @Test + public void shouldAddEventListener() { + // Arrange + DynamicRegistry registry = new DynamicRegistryImp(resolver); + String eventName = "eventName"; + Class cla = String.class; + // Act + Mono result = registry.listenEvent(eventName, eventHandler, cla); + // Assert + StepVerifier.create(result) + .verifyComplete(); + verify(resolver, times(1)).addEventListener(any(RegisteredEventListener.class)); + } +} diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDirectAsyncGatewayTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDirectAsyncGatewayTest.java new file mode 100644 index 00000000..f03f6869 --- /dev/null +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDirectAsyncGatewayTest.java @@ -0,0 +1,60 @@ +package org.reactivecommons.async.impl; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.reactivecommons.api.domain.Command; +import org.reactivecommons.async.api.DirectAsyncGateway; +import org.reactivecommons.async.impl.sns.communications.Sender; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class SNSDirectAsyncGatewayTest { + @Mock + private Sender sender; + private DirectAsyncGateway directAsyncGateway; + private String topicTarget; + private String targetAppName; + + @Before + public void setup() { + topicTarget = "topicTarget"; + targetAppName = "targetAppName"; + directAsyncGateway = new SNSDirectAsyncGateway(sender, topicTarget); + } + + @Test + public void shouldSendCommand() { + // Arrange + when(sender.publish(any(), anyString())).thenReturn(Mono.empty()); + Command command = new Command<>("name", "commandId", "data"); + // Act + Mono result = directAsyncGateway.sendCommand(command, targetAppName); + // Assert + StepVerifier.create(result) + .expectComplete() + .verify(); + verify(sender, times(1)).publish(command, targetAppName + topicTarget); + } + + @Test + public void shouldHandleErrorSignalWhenFail() { + // Arrange + when(sender.publish(any(), anyString())).thenReturn(Mono.error(new Exception("Unhandled exception"))); + Command command = new Command<>("name", "commandId", "data"); + // Act + Mono result = directAsyncGateway.sendCommand(command, targetAppName); + // Assert + StepVerifier.create(result) + .expectError(RuntimeException.class) + .verify(); + verify(sender, times(1)).publish(command, targetAppName + topicTarget); + } +} diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDomainEventBusTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDomainEventBusTest.java new file mode 100644 index 00000000..42bbf2ca --- /dev/null +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDomainEventBusTest.java @@ -0,0 +1,59 @@ +package org.reactivecommons.async.impl; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.api.domain.DomainEventBus; +import org.reactivecommons.async.impl.sns.communications.Sender; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class SNSDomainEventBusTest { + @Mock + private Sender sender; + private DomainEventBus domainEventBus; + private String topicName; + + @Before + public void setup() { + topicName = "topicName"; + domainEventBus = new SNSDomainEventBus(sender, topicName); + } + + @Test + public void shouldEmit() { + // Arrange + when(sender.publish(any(), anyString())).thenReturn(Mono.empty()); + DomainEvent event = new DomainEvent<>("name", "eventId", "data"); + // Act + Publisher result = domainEventBus.emit(event); + // Assert + StepVerifier.create(result) + .expectComplete() + .verify(); + verify(sender, times(1)).publish(event, topicName); + } + + @Test + public void shouldHandleErrorSignalWhenFail() { + // Arrange + when(sender.publish(any(), anyString())).thenReturn(Mono.error(new Exception("Unhandled exception"))); + DomainEvent event = new DomainEvent<>("name", "eventId", "data"); + // Act + Publisher result = domainEventBus.emit(event); + // Assert + StepVerifier.create(result) + .expectError(RuntimeException.class) + .verify(); + verify(sender, times(1)).publish(event, topicName); + } +} diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandlerTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandlerTest.java new file mode 100644 index 00000000..d793bfc2 --- /dev/null +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandlerTest.java @@ -0,0 +1,92 @@ +package org.reactivecommons.async.impl.handlers; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.reactivecommons.async.api.handlers.CommandHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; +import org.reactivecommons.async.impl.HandlerResolver; +import org.reactivecommons.async.impl.converters.MessageConverter; +import org.reactivecommons.async.impl.converters.json.JacksonMessageConverter; +import org.reactivecommons.async.impl.model.SNSEventModel; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ApplicationCommandHandlerTest { + @Mock + private RegisteredCommandHandler commandHandler; + @Mock + private CommandHandler handler; + + private HandlerResolver resolver; + private MessageConverter messageConverter; + + @Before + public void setup() { + Map commandHandlers = new ConcurrentHashMap<>(); + Map eventListeners = new ConcurrentHashMap<>(); + Map queryHandlers = new ConcurrentHashMap<>(); + commandHandlers.put("my.command", commandHandler); + resolver = new HandlerResolver(queryHandlers, eventListeners, commandHandlers); + messageConverter = new JacksonMessageConverter(new ObjectMapper()); + } + + @Test + public void shouldGetHandlerCorrectly() { + // Arrange + when(handler.handle(any())).thenReturn(Mono.empty()); + when(commandHandler.getInputClass()).thenReturn(String.class); + when(commandHandler.getHandler()).thenReturn(handler); + ApplicationCommandHandler commandHandler = new ApplicationCommandHandler(resolver, messageConverter); + SNSEventModel eventModel = new SNSEventModel(); + eventModel.setMessage("{\"name\":\"my.command\",\"commandId\":\"my.command.id\",\"data\":\"string data\"}"); + // Act + Mono handledMessage = commandHandler.handle(eventModel); + // Assert + StepVerifier.create(handledMessage) + .expectComplete() + .verify(); + } + + @Test + public void shouldReturnEmptyWhenNoHandler() { + // Arrange + ApplicationCommandHandler commandHandler = new ApplicationCommandHandler(resolver, messageConverter); + SNSEventModel eventModel = new SNSEventModel(); + eventModel.setMessage("{\"name\":\"non-existent\",\"commandId\":\"my.command.id\",\"data\":\"string data\"}"); + // Act + Mono handledMessage = commandHandler.handle(eventModel); + // Assert + StepVerifier.create(handledMessage) + .expectComplete() + .verify(); + } + + @Test + public void shouldHandleErrorParsingJson() { + // Arrange + ApplicationCommandHandler commandHandler = new ApplicationCommandHandler(resolver, messageConverter); + SNSEventModel eventModel = new SNSEventModel(); + eventModel.setMessage("it's a bad command json"); + // Act + Mono handledMessage = commandHandler.handle(eventModel); + // Assert + StepVerifier.create(handledMessage) + .expectError(JsonProcessingException.class) + .verify(); + } + +} diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandlerTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandlerTest.java new file mode 100644 index 00000000..6f646d59 --- /dev/null +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandlerTest.java @@ -0,0 +1,92 @@ +package org.reactivecommons.async.impl.handlers; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.reactivecommons.async.api.handlers.EventHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; +import org.reactivecommons.async.impl.HandlerResolver; +import org.reactivecommons.async.impl.converters.MessageConverter; +import org.reactivecommons.async.impl.converters.json.JacksonMessageConverter; +import org.reactivecommons.async.impl.model.SNSEventModel; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ApplicationEventHandlerTest { + @Mock + private RegisteredEventListener eventListener; + @Mock + private EventHandler handler; + + private HandlerResolver resolver; + private MessageConverter messageConverter; + + @Before + public void setup() { + Map commandHandlers = new ConcurrentHashMap<>(); + Map eventListeners = new ConcurrentHashMap<>(); + Map queryHandlers = new ConcurrentHashMap<>(); + eventListeners.put("my.event", eventListener); + resolver = new HandlerResolver(queryHandlers, eventListeners, commandHandlers); + messageConverter = new JacksonMessageConverter(new ObjectMapper()); + } + + @Test + public void shouldGetHandlerCorrectly() { + // Arrange + when(handler.handle(any())).thenReturn(Mono.empty()); + when(eventListener.getInputClass()).thenReturn(String.class); + when(eventListener.getHandler()).thenReturn(handler); + ApplicationEventHandler eventHandler = new ApplicationEventHandler(resolver, messageConverter); + SNSEventModel eventModel = new SNSEventModel(); + eventModel.setMessage("{\"name\":\"my.event\",\"eventId\":\"my.event.id\",\"data\":\"string data\"}"); + // Act + Mono handledMessage = eventHandler.handle(eventModel); + // Assert + StepVerifier.create(handledMessage) + .expectComplete() + .verify(); + } + + @Test + public void shouldReturnEmptyWhenNoHandler() { + // Arrange + ApplicationEventHandler eventHandler = new ApplicationEventHandler(resolver, messageConverter); + SNSEventModel eventModel = new SNSEventModel(); + eventModel.setMessage("{\"name\":\"non-existent\",\"eventId\":\"my.event.id\",\"data\":\"string data\"}"); + // Act + Mono handledMessage = eventHandler.handle(eventModel); + // Assert + StepVerifier.create(handledMessage) + .expectComplete() + .verify(); + } + + @Test + public void shouldHandleErrorParsingJson() { + // Arrange + ApplicationEventHandler commandHandler = new ApplicationEventHandler(resolver, messageConverter); + SNSEventModel eventModel = new SNSEventModel(); + eventModel.setMessage("it's a bad command json"); + // Act + Mono handledMessage = commandHandler.handle(eventModel); + // Assert + StepVerifier.create(handledMessage) + .expectError(JsonProcessingException.class) + .verify(); + } + +} diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/model/SNSEventModelTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/model/SNSEventModelTest.java new file mode 100644 index 00000000..dda9f70c --- /dev/null +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/model/SNSEventModelTest.java @@ -0,0 +1,20 @@ +package org.reactivecommons.async.impl.model; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class SNSEventModelTest { + + @Test + public void shouldParseTimestamp() { + // Arrange + String timestampStr = "2020-09-21T14:39:01.962-05:00"; + long timestamp = 1600717141962L; + SNSEventModel model = new SNSEventModel(); + // Act + model.setTimestamp(timestampStr); + // Assert + assertThat(model.getTimestamp().getMillis()).isEqualTo(timestamp); + } +} diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SQSSenderTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SQSSenderTest.java new file mode 100644 index 00000000..dde90d40 --- /dev/null +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SQSSenderTest.java @@ -0,0 +1,67 @@ +package org.reactivecommons.async.impl.sns.communications; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; + +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SQSSenderTest { + private final String message = "my string message"; + private final int delaySeconds = 0; + private final String retryCount = "0"; + private final String queueUrl = "queueUrl"; + @Mock + private SqsAsyncClient client; + private SQSSender sender; + + @Before + public void setup() { + sender = new SQSSender(client); + } + + @Test + public void shouldSend() { + // Arrange + SendMessageResponse response = SendMessageResponse.builder().messageId("messageId").build(); + when(client.sendMessage(any(SendMessageRequest.class))).thenReturn(CompletableFuture.completedFuture(response)); + ArgumentCaptor captor = ArgumentCaptor.forClass(SendMessageRequest.class); + // Act + Mono result = sender.send(message, delaySeconds, retryCount, queueUrl); + // Assert + StepVerifier.create(result).verifyComplete(); + verify(client).sendMessage(captor.capture()); + SendMessageRequest request = captor.getValue(); + assertThat(request.delaySeconds()).isEqualTo(delaySeconds); + assertThat(request.queueUrl()).isEqualTo(queueUrl); + assertThat(request.messageBody()).isEqualTo(message); + assertThat(request.messageAttributes().get("retries").stringValue()).isEqualTo("1"); + } + + + @Test + public void shouldHandleErrorSignalWhenFail() { + // Arrange + CompletableFuture response = new CompletableFuture<>(); + response.completeExceptionally(new Exception("Unexpected Exception")); + when(client.sendMessage(any(SendMessageRequest.class))).thenReturn(response); + // Act + Mono result = sender.send(message, delaySeconds, retryCount, queueUrl); + // Assert + StepVerifier.create(result).verifyError(Exception.class); + } +} diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SenderTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SenderTest.java new file mode 100644 index 00000000..0a09c5a5 --- /dev/null +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SenderTest.java @@ -0,0 +1,87 @@ +package org.reactivecommons.async.impl.sns.communications; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.reactivecommons.async.impl.Headers; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import software.amazon.awssdk.services.sns.SnsAsyncClient; +import software.amazon.awssdk.services.sns.model.PublishRequest; +import software.amazon.awssdk.services.sns.model.PublishResponse; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SenderTest { + + private final String sourceApp = "myAppName"; + private final String arnPrefix = "arnPrefix"; + @Mock + private SnsAsyncClient client; + private Sender sender; + + @Before + public void setup() { + sender = new Sender(client, sourceApp, arnPrefix); + } + + @Test + public void shouldSend() { + // Arrange + ArgumentCaptor captor = ArgumentCaptor.forClass(PublishRequest.class); + PublishResponse response = PublishResponse.builder().messageId("messageId").build(); + when(client.publish(any(PublishRequest.class))).thenReturn(CompletableFuture.completedFuture(response)); + String targetName = "targetName"; + Object message = getMessage(); + String jsonMessage = "{\"a\":1,\"b\":\"dos\"}"; + // Act + Mono result = sender.publish(message, targetName); + // Assert + StepVerifier.create(result).verifyComplete(); + verify(client).publish(captor.capture()); + PublishRequest request = captor.getValue(); + assertThat(request.message()).isEqualTo(jsonMessage); + assertThat(request.topicArn()).isEqualTo(arnPrefix + ":" + targetName); + assertThat(request.messageAttributes().get(Headers.SOURCE_APPLICATION).stringValue()).isEqualTo(sourceApp); + } + + @Test + public void shouldHandleErrorSignalWhenFail() { + // Arrange + String targetName = "targetName"; + ClassThatJacksonCannotSerialize message = new ClassThatJacksonCannotSerialize(); + // Act + Mono result = sender.publish(message, targetName); + // Assert + StepVerifier.create(result) + .verifyError(JsonProcessingException.class); + } + + private Object getMessage() { + Map message = new HashMap<>(); + message.put("a", 1); + message.put("b", "dos"); + return message; + } + + private static class ClassThatJacksonCannotSerialize { + private final ClassThatJacksonCannotSerialize self = this; + + @Override + public String toString() { + return self.getClass().getName(); + } + } +} diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/TopologyCreatorTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/TopologyCreatorTest.java new file mode 100644 index 00000000..9db8fa7c --- /dev/null +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/TopologyCreatorTest.java @@ -0,0 +1,311 @@ +package org.reactivecommons.async.impl.sns.communications; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import software.amazon.awssdk.services.sns.SnsAsyncClient; +import software.amazon.awssdk.services.sns.model.*; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.*; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class TopologyCreatorTest { + @Mock + private SnsAsyncClient topicClient; + @Mock + private SqsAsyncClient queueClient; + + private TopologyCreator creator; + + @Before + public void setup() { + creator = new TopologyCreator(topicClient, queueClient); + } + + @Test + public void shouldListTopics() { + // Arrange + String name = "topic-name"; + String expectedTopicArn = buildTopicArn(name); + mockListTopics(name); + // Act + Flux topics = creator.listTopics(); + // Assert + StepVerifier.create(topics) + .assertNext(topic -> assertThat(topic.topicArn()).isEqualTo(expectedTopicArn)) + .verifyComplete(); + } + + @Test + public void shouldCreateTopic() { + // Arrange + String name = "test"; + String expectedArn = buildTopicArn(name); + mockCreateTopic(name); + ArgumentCaptor captor = ArgumentCaptor.forClass(CreateTopicRequest.class); + // Act + Mono topic = creator.createTopic(name); + // Assert + StepVerifier.create(topic) + .assertNext(topicArn -> assertThat(topicArn).isEqualTo(expectedArn)) + .verifyComplete(); + verify(topicClient).createTopic(captor.capture()); + CreateTopicRequest request = captor.getValue(); + assertThat(request.name()).isEqualTo(name); + } + + @Test + public void shouldDeclareTopic() { + // Arrange + String name = "new-topic-name"; + String expectedArn = buildTopicArn(name); + mockListTopics("existing-topic-name"); + mockCreateTopic(name); + ArgumentCaptor captor = ArgumentCaptor.forClass(CreateTopicRequest.class); + // Act + Mono topic = creator.declareTopic(name); + // Assert + StepVerifier.create(topic) + .assertNext(topicArn -> assertThat(topicArn).isEqualTo(expectedArn)) + .verifyComplete(); + verify(topicClient).createTopic(captor.capture()); + CreateTopicRequest request = captor.getValue(); + assertThat(request.name()).isEqualTo(name); + } + + @Test + public void shouldNotDeclareTopicWhenExists() { + // Arrange + String name = "topic-name"; + String expectedArn = buildTopicArn(name); + mockListTopics(name); + mockCreateTopic(name); + // Act + Mono topic = creator.declareTopic(name); + // Assert + StepVerifier.create(topic) + .assertNext(topicArn -> assertThat(topicArn).isEqualTo(expectedArn)) + .verifyComplete(); + verify(topicClient, times(0)).createTopic(any(CreateTopicRequest.class)); + } + + @Test + public void shouldGetTopicArn() { + // Arrange + String name = "topic-name"; + String expectedArn = buildTopicArn(name); + mockListTopics(name); + // Act + Mono topic = creator.getTopicArn(name); + // Assert + StepVerifier.create(topic) + .assertNext(topicArn -> assertThat(topicArn).isEqualTo(expectedArn)) + .verifyComplete(); + } + + @Test + public void shouldCreateQueue() { + // Arrange + String name = "queue-name"; + String expectedUrl = buildQueueUrl(name); + mockCreateQueue(name); + ArgumentCaptor captor = ArgumentCaptor.forClass(CreateQueueRequest.class); + // Act + Mono queue = creator.createQueue(name); + // Assert + StepVerifier.create(queue) + .assertNext(queueUrl -> assertThat(queueUrl).isEqualTo(expectedUrl)) + .verifyComplete(); + verify(queueClient).createQueue(captor.capture()); + CreateQueueRequest request = captor.getValue(); + assertThat(request.queueName()).isEqualTo(name); + } + + @Test + public void shouldGetQueueUrl() { + // Arrange + String name = "queue-name"; + String expectedUrl = buildQueueUrl(name); + mockGetQueueUrl(name); + ArgumentCaptor captor = ArgumentCaptor.forClass(GetQueueUrlRequest.class); + // Act + Mono queue = creator.getQueueUrl(name); + // Assert + StepVerifier.create(queue) + .assertNext(queueUrl -> assertThat(queueUrl).isEqualTo(expectedUrl)) + .verifyComplete(); + verify(queueClient).getQueueUrl(captor.capture()); + GetQueueUrlRequest request = captor.getValue(); + assertThat(request.queueName()).isEqualTo(name); + } + + @Test + public void shouldGetQueueAttributes() { + // Arrange + String name = "queue-name"; + String queueUrl = buildQueueUrl(name); + String expectedArn = buildQueueArn(name); + mockGetQueueAttributes(name); + ArgumentCaptor captor = ArgumentCaptor.forClass(GetQueueAttributesRequest.class); + // Act + Mono queue = creator.getQueueArn(queueUrl); + // Assert + StepVerifier.create(queue) + .assertNext(queueArn -> assertThat(queueArn).isEqualTo(expectedArn)) + .verifyComplete(); + verify(queueClient).getQueueAttributes(captor.capture()); + GetQueueAttributesRequest request = captor.getValue(); + assertThat(request.queueUrl()).isEqualTo(queueUrl); + } + + @Test + public void shouldSetQueueAttributes() { + // Arrange + String sqsPrefix = "sqs-prefix"; + String queueName = "queue-name"; + String snsPrefix = "sns-prefix"; + String topicName = "topic-name"; + String expectedResult = "SetQueueAttributesResponse()"; + mockGetQueueUrl(queueName); + mockSetQueueAttributes(); + ArgumentCaptor captor = ArgumentCaptor.forClass(SetQueueAttributesRequest.class); + // Act + Mono queue = creator.setQueueAttributes(queueName, topicName, snsPrefix, sqsPrefix); + // Assert + StepVerifier.create(queue) + .assertNext(queueArn -> assertThat(queueArn).isEqualTo(expectedResult)) + .verifyComplete(); + verify(queueClient).setQueueAttributes(captor.capture()); + SetQueueAttributesRequest request = captor.getValue(); + assertThat(request.queueUrl()).isEqualTo(buildQueueUrl(queueName)); + assertThat(request.attributesAsStrings().get("Policy")).isEqualTo(expectedPolicy()); + } + + @Test + public void shouldBind() { + // Arrange + String queueName = "queue-name"; + String topicName = "topic-name"; + String expectedArn = buildSubscriptionArn(topicName, queueName); + mockGetQueueUrl(queueName); + mockGetQueueAttributes(queueName); + mockListTopics(topicName); + mockTopicSubscription(topicName, queueName); + ArgumentCaptor captor = ArgumentCaptor.forClass(SubscribeRequest.class); + // Act + Mono subscription = creator.bind(queueName, topicName); + // Assert + StepVerifier.create(subscription) + .assertNext(arn -> assertThat(arn).isEqualTo(expectedArn)) + .verifyComplete(); + verify(topicClient).subscribe(captor.capture()); + SubscribeRequest request = captor.getValue(); + assertThat(request.protocol()).isEqualTo("sqs"); + assertThat(request.endpoint()).isEqualTo(buildQueueArn(queueName)); + assertThat(request.topicArn()).isEqualTo(buildTopicArn(topicName)); + assertThat(request.returnSubscriptionArn()).isTrue(); + } + + private void mockListTopics(String name) { + ListTopicsResponse response = ListTopicsResponse.builder() + .topics(Topic.builder().topicArn(buildTopicArn(name)).build()) + .build(); + when(topicClient.listTopics(any(ListTopicsRequest.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + } + + private void mockCreateTopic(String name) { + CreateTopicResponse response = CreateTopicResponse.builder().topicArn(buildTopicArn(name)).build(); + when(topicClient.createTopic(any(CreateTopicRequest.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + } + + private void mockTopicSubscription(String name, String queueName) { + SubscribeResponse response = SubscribeResponse.builder() + .subscriptionArn(buildSubscriptionArn(name, queueName)) + .build(); + when(topicClient.subscribe(any(SubscribeRequest.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + } + + private void mockCreateQueue(String name) { + CreateQueueResponse response = CreateQueueResponse.builder().queueUrl(buildQueueUrl(name)).build(); + when(queueClient.createQueue(any(CreateQueueRequest.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + } + + private void mockGetQueueUrl(String name) { + GetQueueUrlResponse response = GetQueueUrlResponse.builder().queueUrl(buildQueueUrl(name)).build(); + when(queueClient.getQueueUrl(any(GetQueueUrlRequest.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + } + + private void mockGetQueueAttributes(String name) { + Map attributes = new HashMap<>(); + attributes.put("QueueArn", buildQueueArn(name)); + GetQueueAttributesResponse response = GetQueueAttributesResponse.builder() + .attributesWithStrings(attributes) + .build(); + when(queueClient.getQueueAttributes(any(GetQueueAttributesRequest.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + } + + private void mockSetQueueAttributes() { + SetQueueAttributesResponse response = SetQueueAttributesResponse.builder().build(); + when(queueClient.setQueueAttributes(any(SetQueueAttributesRequest.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + } + + private String buildTopicArn(String name) { + return "arn:aws:sns:us-east-1:123456789012:" + name; + } + + private String buildQueueArn(String name) { + return "arn:aws:sqs:us-east-1:123456789012::" + name; + } + + private String buildQueueUrl(String name) { + return "https://queue.amazonaws.com/123456789012/" + name; + } + + private String buildSubscriptionArn(String topicName, String id) { + return buildTopicArn(topicName) + ":" + id; + } + + private String expectedPolicy() { + return "{\n" + + " \"Version\": \"2012-10-17\",\n" + + " \"Id\": \"sqs-prefix:queue-name/SQSDefaultPolicy\",\n" + + " \"Statement\": [\n" + + " {\n" + + " \"Sid\": \"topic-subscription-sns-prefix:topic-name\",\n" + + " \"Effect\": \"Allow\",\n" + + " \"Principal\": {\n" + + " \"AWS\": \"*\"\n" + + " },\n" + + " \"Action\": \"SQS:SendMessage\",\n" + + " \"Resource\": \"sqs-prefix:queue-name\",\n" + + " \"Condition\": {\n" + + " \"ArnLike\": {\n" + + " \"aws:SourceArn\": \"sns-prefix:topic-name\"\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"; + } +} From ddca06e2e0a8cb72aa6d84a38ac9a63db4de02ec Mon Sep 17 00:00:00 2001 From: Alejandro Betancur Barrientos Date: Tue, 7 Jun 2022 15:54:54 -0500 Subject: [PATCH 2/3] SQS and SNS responsabilities isolated --- .../async/commons/HandlerResolver.java | 80 +++++++++++ .../async-sqs-starter.gradle | 78 ----------- .../impl/config/DirectAsyncGatewayConfig.java | 3 +- .../impl/config/MessageListenersConfig.java | 41 +++--- .../impl/config/props/BrokerConfigProps.java | 2 +- async/async-sqs/async-sqs.gradle | 42 ------ .../async/impl/DynamicRegistryImp.java | 25 +++- .../converters/JacksonMessageConverter.java | 125 ++++++++++++++++++ .../handlers/ApplicationCommandHandler.java | 4 +- .../handlers/ApplicationEventHandler.java | 6 +- .../impl/handlers/GenericMessageHandler.java | 3 +- .../async/impl/model/MessageSQS.java | 14 +- .../async/impl/DynamicRegistryImpTest.java | 8 +- .../async/impl/SNSDirectAsyncGatewayTest.java | 8 +- .../async/impl/SNSDomainEventBusTest.java | 8 +- .../ApplicationCommandHandlerTest.java | 32 +++-- .../handlers/ApplicationEventHandlerTest.java | 31 +++-- .../async/impl/model/SNSEventModelTest.java | 3 +- .../sns/communications/SQSSenderTest.java | 8 +- .../impl/sns/communications/SenderTest.java | 8 +- .../communications/TopologyCreatorTest.java | 9 +- 21 files changed, 330 insertions(+), 208 deletions(-) create mode 100644 async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolver.java create mode 100644 async/async-sqs/src/main/java/org/reactivecommons/async/impl/converters/JacksonMessageConverter.java diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolver.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolver.java new file mode 100644 index 00000000..0081cde0 --- /dev/null +++ b/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolver.java @@ -0,0 +1,80 @@ +package org.reactivecommons.async.commons; + +import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; +import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; +import org.reactivecommons.async.commons.utils.matcher.KeyMatcher; +import org.reactivecommons.async.commons.utils.matcher.Matcher; + +import java.util.Collection; +import java.util.Map; +import java.util.function.Function; + +@Log +@RequiredArgsConstructor +public class HandlerResolver { + + private final Map> queryHandlers; + private final Map> eventListeners; + private final Map> eventsToBind; + private final Map> eventNotificationListeners; + private final Map> commandHandlers; + private final Matcher matcher = new KeyMatcher(); + + @SuppressWarnings("unchecked") + public RegisteredQueryHandler getQueryHandler(String path) { + return (RegisteredQueryHandler) queryHandlers + .computeIfAbsent(path, getMatchHandler(queryHandlers)); + } + + @SuppressWarnings("unchecked") + public RegisteredCommandHandler getCommandHandler(String path) { + return (RegisteredCommandHandler) commandHandlers + .computeIfAbsent(path, getMatchHandler(commandHandlers)); + } + + @SuppressWarnings("unchecked") + public RegisteredEventListener getEventListener(String path) { + if (eventListeners.containsKey(path)) { + return (RegisteredEventListener) eventListeners.get(path); + } + return (RegisteredEventListener) getMatchHandler(eventListeners).apply(path); + } + + + public Collection> getNotificationListeners() { + return eventNotificationListeners.values(); + } + + @SuppressWarnings("unchecked") + public RegisteredEventListener getNotificationListener(String path) { + return (RegisteredEventListener) eventNotificationListeners + .computeIfAbsent(path, getMatchHandler(eventNotificationListeners)); + } + + // Returns only the listenEvent not the handleDynamicEvents + public Collection> getEventListeners() { + return eventsToBind.values(); + } + + void addEventListener(RegisteredEventListener listener) { + eventListeners.put(listener.getPath(), listener); + } + + void addQueryHandler(RegisteredQueryHandler handler) { + if (handler.getPath().contains("*")) { + throw new RuntimeException("avoid * in dynamic handlers, make sure you have no conflicts with cached patterns"); + } + queryHandlers.put(handler.getPath(), handler); + } + + private Function getMatchHandler(Map handlers) { + return name -> { + String matched = matcher.match(handlers.keySet(), name); + return handlers.get(matched); + }; + } + +} diff --git a/async/async-sqs-starter/async-sqs-starter.gradle b/async/async-sqs-starter/async-sqs-starter.gradle index 55491d90..36fcdaae 100644 --- a/async/async-sqs-starter/async-sqs-starter.gradle +++ b/async/async-sqs-starter/async-sqs-starter.gradle @@ -1,81 +1,3 @@ -plugins { - id "com.jfrog.bintray" version "1.8.5" - id 'java-library' - id 'maven' - id 'maven-publish' -} - -test.onlyIf { false } - -def pomConfig = { - licenses { - license { - name "The Apache Software License, Version 2.0" - url "http://www.apache.org/licenses/LICENSE-2.0.txt" - distribution "repo" - } - } - developers { - developer { - id "andmagom" - name "Andrés Mauricio Gómez P" - email "andmagom@outlook.com" - } - developer { - id "alejobtc" - name "Alejandro Betancur Barrientos" - email "alejobtc@gmail.com" - } - } - - scm { - url "git@github.com:reactive-commons/reactive-commons-java.git" - } -} - -publishing { - publications { - MyPublication(MavenPublication) { - from components.java - artifact sourcesJar { - classifier "sources" - } - artifact javadocJar { - classifier "javadoc" - } - groupId 'org.reactivecommons' - artifactId 'async-sqs-starter' - version project.property('version') - pom.withXml { - def root = asNode() - root.appendNode('description', 'Async SQS Starter') - root.appendNode('name', 'async-sqs-starter') - root.appendNode('url', 'https://site_for_lib.tld') - root.children().last() + pomConfig - } - } - } -} - -bintray { - user = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : System.getenv('BINTRAY_USER') - key = project.hasProperty('bintrayApiKey') ? project.property('bintrayApiKey') : System.getenv('BINTRAY_API_KEY') - publications = ['MyPublication'] - publish = true - pkg { - repo = 'maven-artifacts' - userOrg = 'reactive-commons' - name = 'reactive-commons' - licenses = ['Apache-2.0'] - vcsUrl = 'git@github.com:reactive-commons/reactive-commons-java.git' - version { - name = project.property('version') - desc = 'First version' - released = new Date() - vcsTag = project.property('version') - } - } -} dependencies { implementation platform('software.amazon.awssdk:bom:2.13.10') diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/DirectAsyncGatewayConfig.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/DirectAsyncGatewayConfig.java index 33b3725a..b7e5e808 100644 --- a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/DirectAsyncGatewayConfig.java +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/DirectAsyncGatewayConfig.java @@ -1,9 +1,10 @@ package org.reactivecommons.async.impl.config; import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.commons.config.BrokerConfig; +import org.reactivecommons.async.commons.reply.ReactiveReplyRouter; import org.reactivecommons.async.impl.SNSDirectAsyncGateway; import org.reactivecommons.async.impl.config.props.BrokerConfigProps; -import org.reactivecommons.async.impl.reply.ReactiveReplyRouter; import org.reactivecommons.async.impl.sns.communications.Sender; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java index 5f042d75..9c90044b 100644 --- a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java @@ -8,10 +8,11 @@ import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; -import org.reactivecommons.async.impl.HandlerResolver; +import org.reactivecommons.async.commons.HandlerResolver; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.impl.Handlers; import org.reactivecommons.async.impl.config.props.AsyncProps; -import org.reactivecommons.async.impl.converters.MessageConverter; -import org.reactivecommons.async.impl.converters.json.JacksonMessageConverter; +import org.reactivecommons.async.impl.converters.JacksonMessageConverter; import org.reactivecommons.async.impl.handlers.ApplicationCommandHandler; import org.reactivecommons.async.impl.handlers.ApplicationEventHandler; import org.springframework.beans.factory.annotation.Value; @@ -37,53 +38,51 @@ public class MessageListenersConfig { @Bean //TODO: move to own config (QueryListenerConfig) public ApplicationEventHandler eventListener(HandlerResolver resolver, MessageConverter messageConverter) { - final ApplicationEventHandler appListener = new ApplicationEventHandler(resolver, messageConverter); - - return appListener; + return new ApplicationEventHandler(resolver, messageConverter); } @Bean public ApplicationCommandHandler applicationCommandListener(HandlerResolver resolver, MessageConverter messageConverter) { - ApplicationCommandHandler commandListener = new ApplicationCommandHandler(resolver, messageConverter); - return commandListener; + return new ApplicationCommandHandler(resolver, messageConverter); } @Bean - public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandler defaultCommandHandler) { + public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandler defaultCommandHandler) { final Map registries = context.getBeansOfType(HandlerRegistry.class); - final ConcurrentMap handlers = registries + final ConcurrentMap> handlers = registries .values().stream() .flatMap(r -> r.getHandlers().stream()) .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll); - final ConcurrentMap eventListeners = registries + final ConcurrentMap> eventListeners = registries + .values().stream() + .flatMap(r -> r.getEventListeners().stream()) + .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), + ConcurrentHashMap::putAll); + + final ConcurrentMap> eventsToBind = registries .values().stream() .flatMap(r -> r.getEventListeners().stream()) .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll); - final ConcurrentMap commandHandlers = registries + final ConcurrentMap> commandHandlers = registries .values().stream() .flatMap(r -> r.getCommandHandlers().stream()) .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll); - final ConcurrentMap notificationHandlers = registries + final ConcurrentMap> notificationHandlers = registries .values().stream() .flatMap(r -> r.getEventListeners().stream()) .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll); - return new HandlerResolver(handlers, eventListeners, commandHandlers, notificationHandlers) { - @Override - @SuppressWarnings("unchecked") - public RegisteredCommandHandler getCommandHandler(String path) { - final RegisteredCommandHandler handler = super.getCommandHandler(path); - return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class); - } - }; + + return new HandlerResolver(handlers, eventListeners, eventsToBind, notificationHandlers, commandHandlers); + } @Bean diff --git a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/BrokerConfigProps.java b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/BrokerConfigProps.java index 7c86f539..8a61e203 100644 --- a/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/BrokerConfigProps.java +++ b/async/async-sqs-starter/src/main/java/org/reactivecommons/async/impl/config/props/BrokerConfigProps.java @@ -2,7 +2,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.reactivecommons.async.impl.config.IBrokerConfigProps; +import org.reactivecommons.async.commons.config.IBrokerConfigProps; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.util.Base64Utils; diff --git a/async/async-sqs/async-sqs.gradle b/async/async-sqs/async-sqs.gradle index 18cdf72c..e07c0033 100644 --- a/async/async-sqs/async-sqs.gradle +++ b/async/async-sqs/async-sqs.gradle @@ -32,49 +32,7 @@ def pomConfig = { } } -publishing { - publications { - MyPublication(MavenPublication) { - from components.java - artifact sourcesJar { - classifier "sources" - } - artifact javadocJar { - classifier "javadoc" - } - groupId 'org.reactivecommons' - artifactId 'async-rabbit' - version project.property('version') - pom.withXml { - def root = asNode() - root.appendNode('description', 'Async Rabbit') - root.appendNode('name', 'async-rabbit') - root.appendNode('url', 'https://site_for_lib.tld') - root.children().last() + pomConfig - } - } - } -} -bintray { - user = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : System.getenv('BINTRAY_USER') - key = project.hasProperty('bintrayApiKey') ? project.property('bintrayApiKey') : System.getenv('BINTRAY_API_KEY') - publications = ['MyPublication'] - publish = true - pkg { - repo = 'maven-artifacts' - userOrg = 'reactive-commons' - name = 'reactive-commons' - licenses = ['Apache-2.0'] - vcsUrl = 'git@github.com:reactive-commons/reactive-commons-java.git' - version { - name = project.property('version') - desc = 'First version' - released = new Date() - vcsTag = project.property('version') - } - } -} dependencies { implementation platform('software.amazon.awssdk:bom:2.13.10') diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/DynamicRegistryImp.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/DynamicRegistryImp.java index f908baa1..5640c9ca 100644 --- a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/DynamicRegistryImp.java +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/DynamicRegistryImp.java @@ -3,17 +3,40 @@ import lombok.RequiredArgsConstructor; import org.reactivecommons.async.api.DynamicRegistry; import org.reactivecommons.async.api.handlers.EventHandler; +import org.reactivecommons.async.api.handlers.QueryHandler; +import org.reactivecommons.async.api.handlers.QueryHandlerDelegate; import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.commons.HandlerResolver; import reactor.core.publisher.Mono; @RequiredArgsConstructor public class DynamicRegistryImp implements DynamicRegistry { - private final HandlerResolver resolver; + private final Handlers resolver; @Override public Mono listenEvent(String eventName, EventHandler fn, Class eventClass) { resolver.addEventListener(new RegisteredEventListener<>(eventName, fn, eventClass)); return Mono.empty(); } + + @Override + public void serveQuery(String resource, QueryHandler handler, Class queryClass) { + + } + + @Override + public void serveQuery(String resource, QueryHandlerDelegate handler, Class queryClass) { + + } + + @Override + public Mono startListeningEvent(String eventName) { + return null; + } + + @Override + public Mono stopListeningEvent(String eventName) { + return null; + } } diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/converters/JacksonMessageConverter.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/converters/JacksonMessageConverter.java new file mode 100644 index 00000000..468fa995 --- /dev/null +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/converters/JacksonMessageConverter.java @@ -0,0 +1,125 @@ +package org.reactivecommons.async.impl.converters; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Data; +import org.reactivecommons.api.domain.Command; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.async.api.AsyncQuery; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.exceptions.MessageConversionException; +import org.reactivecommons.async.impl.model.MessageSQS; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class JacksonMessageConverter implements MessageConverter { + private static final String CONTENT_TYPE = "application/json"; + + private final ObjectMapper objectMapper; + + + public JacksonMessageConverter(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + public AsyncQuery readAsyncQuery(Message message, Class bodyClass) { + try { + final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class); + final T value = objectMapper.treeToValue(asyncQueryJson.getQueryData(), bodyClass); + return new AsyncQuery<>(asyncQueryJson.getResource(), value); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + } + + @Override + public DomainEvent readDomainEvent(Message message, Class bodyClass) { + try { + final DomainEventJson domainEventJson = readValue(message, DomainEventJson.class); + final T value = objectMapper.treeToValue(domainEventJson.getData(), bodyClass); + return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + } + + @Override + public Command readCommand(Message message, Class bodyClass) { + try { + final CommandJson commandJson = readValue(message, CommandJson.class); + final T value = objectMapper.treeToValue(commandJson.getData(), bodyClass); + return new Command<>(commandJson.getName(), commandJson.getCommandId(), value); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + } + + @Override + public T readValue(Message message, Class valueClass) { + try { + return objectMapper.readValue(message.getBody(), valueClass); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + } + + @Override + @SuppressWarnings("unchecked") + public Command readCommandStructure(Message message) { + final CommandJson commandJson = readValue(message, CommandJson.class); + return new Command<>(commandJson.getName(), commandJson.getCommandId(), (T) commandJson.getData()); + } + + @Override + @SuppressWarnings("unchecked") + public DomainEvent readDomainEventStructure(Message message) { + final DomainEventJson eventJson = readValue(message, DomainEventJson.class); + return new DomainEvent<>(eventJson.getName(), eventJson.getEventId(), (T) eventJson.getData()); + } + + @Override + @SuppressWarnings("unchecked") + public AsyncQuery readAsyncQueryStructure(Message message) { + final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class); + return new AsyncQuery<>(asyncQueryJson.getResource(), (T) asyncQueryJson.getQueryData()); + } + + @Override + public Message toMessage(Object object) { + byte[] bytes; + try { + String jsonString = this.objectMapper.writeValueAsString(object); + bytes = jsonString.getBytes(StandardCharsets.UTF_8); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + MessageSQS.RabbitMessageProperties props = new MessageSQS.RabbitMessageProperties(); + props.setContentType(CONTENT_TYPE); + props.setContentEncoding(StandardCharsets.UTF_8.name()); + props.setContentLength(bytes.length); + return new MessageSQS(""); + } + + @Data + private static class AsyncQueryJson { + private String resource; + private JsonNode queryData; + } + + @Data + private static class DomainEventJson { + private String name; + private String eventId; + private JsonNode data; + } + + @Data + private static class CommandJson { + private String name; + private String commandId; + private JsonNode data; + } +} diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandler.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandler.java index f2adc249..bd3e60a5 100644 --- a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandler.java +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandler.java @@ -6,8 +6,8 @@ import lombok.extern.log4j.Log4j2; import org.reactivecommons.api.domain.Command; import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; -import org.reactivecommons.async.impl.HandlerResolver; -import org.reactivecommons.async.impl.converters.MessageConverter; +import org.reactivecommons.async.commons.HandlerResolver; +import org.reactivecommons.async.commons.converters.MessageConverter; import org.reactivecommons.async.impl.model.MessageSQS; import org.reactivecommons.async.impl.model.SNSEventModel; import reactor.core.publisher.Mono; diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandler.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandler.java index 7b0f6fbc..b6da331b 100644 --- a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandler.java +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandler.java @@ -6,8 +6,10 @@ import lombok.extern.log4j.Log4j2; import org.reactivecommons.api.domain.DomainEvent; import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; -import org.reactivecommons.async.impl.HandlerResolver; -import org.reactivecommons.async.impl.converters.MessageConverter; +import org.reactivecommons.async.commons.HandlerResolver; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.impl.Handlers; +import org.reactivecommons.async.impl.converters.*; import org.reactivecommons.async.impl.model.MessageSQS; import org.reactivecommons.async.impl.model.SNSEventModel; import reactor.core.publisher.Mono; diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/GenericMessageHandler.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/GenericMessageHandler.java index ca92bd29..f7d0e565 100644 --- a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/GenericMessageHandler.java +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/handlers/GenericMessageHandler.java @@ -3,7 +3,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.java.Log; import lombok.extern.log4j.Log4j2; -import org.reactivecommons.async.impl.HandlerResolver; +import org.reactivecommons.async.commons.HandlerResolver; +import org.reactivecommons.async.impl.Handlers; import org.reactivecommons.async.impl.model.SNSEventModel; import reactor.core.publisher.Mono; diff --git a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/model/MessageSQS.java b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/model/MessageSQS.java index 3ae7c27a..3b7963bd 100644 --- a/async/async-sqs/src/main/java/org/reactivecommons/async/impl/model/MessageSQS.java +++ b/async/async-sqs/src/main/java/org/reactivecommons/async/impl/model/MessageSQS.java @@ -1,12 +1,24 @@ package org.reactivecommons.async.impl.model; import lombok.Data; +import org.reactivecommons.async.commons.communications.Message; + +import java.util.HashMap; +import java.util.Map; @Data -public class MessageSQS implements org.reactivecommons.async.impl.communications.Message { +public class MessageSQS implements Message { private byte[] body; private Properties properties; public MessageSQS(String message) { body = message.getBytes(); } + + @Data + public static class RabbitMessageProperties implements Properties{ + private String contentType; + private String contentEncoding; + private long contentLength; + private Map headers = new HashMap<>(); + } } \ No newline at end of file diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/DynamicRegistryImpTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/DynamicRegistryImpTest.java index 0e746dbf..eda3eecb 100644 --- a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/DynamicRegistryImpTest.java +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/DynamicRegistryImpTest.java @@ -1,7 +1,6 @@ package org.reactivecommons.async.impl; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.reactivecommons.async.api.DynamicRegistry; @@ -10,14 +9,15 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.util.logging.Handler; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -@RunWith(MockitoJUnitRunner.class) public class DynamicRegistryImpTest { @Mock - private HandlerResolver resolver; + private Handlers resolver; @Mock private EventHandler eventHandler; diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDirectAsyncGatewayTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDirectAsyncGatewayTest.java index f03f6869..418e788e 100644 --- a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDirectAsyncGatewayTest.java +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDirectAsyncGatewayTest.java @@ -1,8 +1,7 @@ package org.reactivecommons.async.impl; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.reactivecommons.api.domain.Command; @@ -15,7 +14,6 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; -@RunWith(MockitoJUnitRunner.class) public class SNSDirectAsyncGatewayTest { @Mock private Sender sender; @@ -23,7 +21,7 @@ public class SNSDirectAsyncGatewayTest { private String topicTarget; private String targetAppName; - @Before + @BeforeEach public void setup() { topicTarget = "topicTarget"; targetAppName = "targetAppName"; diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDomainEventBusTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDomainEventBusTest.java index 42bbf2ca..52cd9840 100644 --- a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDomainEventBusTest.java +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/SNSDomainEventBusTest.java @@ -1,8 +1,7 @@ package org.reactivecommons.async.impl; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.reactivecommons.api.domain.DomainEvent; @@ -16,14 +15,13 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; -@RunWith(MockitoJUnitRunner.class) public class SNSDomainEventBusTest { @Mock private Sender sender; private DomainEventBus domainEventBus; private String topicName; - @Before + @BeforeEach public void setup() { topicName = "topicName"; domainEventBus = new SNSDomainEventBus(sender, topicName); diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandlerTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandlerTest.java index d793bfc2..b1d80e43 100644 --- a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandlerTest.java +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationCommandHandlerTest.java @@ -2,18 +2,17 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; import org.reactivecommons.async.api.handlers.CommandHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; -import org.reactivecommons.async.impl.HandlerResolver; -import org.reactivecommons.async.impl.converters.MessageConverter; -import org.reactivecommons.async.impl.converters.json.JacksonMessageConverter; +import org.reactivecommons.async.commons.HandlerResolver; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.impl.converters.JacksonMessageConverter; import org.reactivecommons.async.impl.model.SNSEventModel; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -24,7 +23,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) public class ApplicationCommandHandlerTest { @Mock private RegisteredCommandHandler commandHandler; @@ -34,13 +32,19 @@ public class ApplicationCommandHandlerTest { private HandlerResolver resolver; private MessageConverter messageConverter; - @Before + @BeforeAll public void setup() { - Map commandHandlers = new ConcurrentHashMap<>(); - Map eventListeners = new ConcurrentHashMap<>(); - Map queryHandlers = new ConcurrentHashMap<>(); - commandHandlers.put("my.command", commandHandler); - resolver = new HandlerResolver(queryHandlers, eventListeners, commandHandlers); + Map> commandHandlers = new ConcurrentHashMap<>(); + Map> eventListeners = new ConcurrentHashMap<>(); + eventListeners.put("event.name", new RegisteredEventListener<>("event.name", message -> Mono.empty(), String.class)); + eventListeners.put("event.name2", new RegisteredEventListener<>("event.name2", message -> Mono.empty(), String.class)); + eventListeners.put("some.*", new RegisteredEventListener<>("some.*", message -> Mono.empty(), String.class)); + Map> eventsToBind = new ConcurrentHashMap<>(); + eventsToBind.put("event.name", new RegisteredEventListener<>("event.name", message -> Mono.empty(), String.class)); + eventsToBind.put("event.name2", new RegisteredEventListener<>("event.name2", message -> Mono.empty(), String.class)); + Map> notificationEventListeners = new ConcurrentHashMap<>(); + Map> queryHandlers = new ConcurrentHashMap<>(); + resolver = new HandlerResolver(queryHandlers, eventListeners, eventsToBind, notificationEventListeners, commandHandlers); messageConverter = new JacksonMessageConverter(new ObjectMapper()); } diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandlerTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandlerTest.java index 6f646d59..32720943 100644 --- a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandlerTest.java +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/handlers/ApplicationEventHandlerTest.java @@ -2,18 +2,16 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; import org.reactivecommons.async.api.handlers.EventHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; -import org.reactivecommons.async.impl.HandlerResolver; -import org.reactivecommons.async.impl.converters.MessageConverter; -import org.reactivecommons.async.impl.converters.json.JacksonMessageConverter; +import org.reactivecommons.async.commons.HandlerResolver; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.impl.converters.JacksonMessageConverter; import org.reactivecommons.async.impl.model.SNSEventModel; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -24,7 +22,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) public class ApplicationEventHandlerTest { @Mock private RegisteredEventListener eventListener; @@ -34,13 +31,19 @@ public class ApplicationEventHandlerTest { private HandlerResolver resolver; private MessageConverter messageConverter; - @Before + @BeforeEach public void setup() { - Map commandHandlers = new ConcurrentHashMap<>(); - Map eventListeners = new ConcurrentHashMap<>(); - Map queryHandlers = new ConcurrentHashMap<>(); - eventListeners.put("my.event", eventListener); - resolver = new HandlerResolver(queryHandlers, eventListeners, commandHandlers); + Map> commandHandlers = new ConcurrentHashMap<>(); + Map> eventListeners = new ConcurrentHashMap<>(); + eventListeners.put("event.name", new RegisteredEventListener<>("event.name", message -> Mono.empty(), String.class)); + eventListeners.put("event.name2", new RegisteredEventListener<>("event.name2", message -> Mono.empty(), String.class)); + eventListeners.put("some.*", new RegisteredEventListener<>("some.*", message -> Mono.empty(), String.class)); + Map> eventsToBind = new ConcurrentHashMap<>(); + eventsToBind.put("event.name", new RegisteredEventListener<>("event.name", message -> Mono.empty(), String.class)); + eventsToBind.put("event.name2", new RegisteredEventListener<>("event.name2", message -> Mono.empty(), String.class)); + Map> notificationEventListeners = new ConcurrentHashMap<>(); + Map> queryHandlers = new ConcurrentHashMap<>(); + resolver = new HandlerResolver(queryHandlers, eventListeners, eventsToBind, notificationEventListeners, commandHandlers); messageConverter = new JacksonMessageConverter(new ObjectMapper()); } diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/model/SNSEventModelTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/model/SNSEventModelTest.java index dda9f70c..108c1c5f 100644 --- a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/model/SNSEventModelTest.java +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/model/SNSEventModelTest.java @@ -1,6 +1,7 @@ package org.reactivecommons.async.impl.model; -import org.junit.Test; + +import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SQSSenderTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SQSSenderTest.java index dde90d40..b5770fa0 100644 --- a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SQSSenderTest.java +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SQSSenderTest.java @@ -1,8 +1,7 @@ package org.reactivecommons.async.impl.sns.communications; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -19,7 +18,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) public class SQSSenderTest { private final String message = "my string message"; private final int delaySeconds = 0; @@ -29,7 +27,7 @@ public class SQSSenderTest { private SqsAsyncClient client; private SQSSender sender; - @Before + @BeforeEach public void setup() { sender = new SQSSender(client); } diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SenderTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SenderTest.java index 0a09c5a5..edaa252b 100644 --- a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SenderTest.java +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/SenderTest.java @@ -1,9 +1,8 @@ package org.reactivecommons.async.impl.sns.communications; import com.fasterxml.jackson.core.JsonProcessingException; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -23,7 +22,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) public class SenderTest { private final String sourceApp = "myAppName"; @@ -32,7 +30,7 @@ public class SenderTest { private SnsAsyncClient client; private Sender sender; - @Before + @BeforeEach public void setup() { sender = new Sender(client, sourceApp, arnPrefix); } diff --git a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/TopologyCreatorTest.java b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/TopologyCreatorTest.java index 9db8fa7c..4778a14d 100644 --- a/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/TopologyCreatorTest.java +++ b/async/async-sqs/src/test/java/org/reactivecommons/async/impl/sns/communications/TopologyCreatorTest.java @@ -1,8 +1,8 @@ package org.reactivecommons.async.impl.sns.communications; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -22,7 +22,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; -@RunWith(MockitoJUnitRunner.class) public class TopologyCreatorTest { @Mock private SnsAsyncClient topicClient; @@ -31,7 +30,7 @@ public class TopologyCreatorTest { private TopologyCreator creator; - @Before + @BeforeEach public void setup() { creator = new TopologyCreator(topicClient, queueClient); } From bd1f11df3189996695ee8041093157f6af9fe991 Mon Sep 17 00:00:00 2001 From: Alejandro Betancur Barrientos Date: Tue, 7 Jun 2022 16:03:25 -0500 Subject: [PATCH 3/3] Maven and Bintray Publishing reverted --- .../async-sqs-starter.gradle | 78 +++++++++++++++++++ async/async-sqs/async-sqs.gradle | 73 ++++++++++------- 2 files changed, 125 insertions(+), 26 deletions(-) diff --git a/async/async-sqs-starter/async-sqs-starter.gradle b/async/async-sqs-starter/async-sqs-starter.gradle index 36fcdaae..55491d90 100644 --- a/async/async-sqs-starter/async-sqs-starter.gradle +++ b/async/async-sqs-starter/async-sqs-starter.gradle @@ -1,3 +1,81 @@ +plugins { + id "com.jfrog.bintray" version "1.8.5" + id 'java-library' + id 'maven' + id 'maven-publish' +} + +test.onlyIf { false } + +def pomConfig = { + licenses { + license { + name "The Apache Software License, Version 2.0" + url "http://www.apache.org/licenses/LICENSE-2.0.txt" + distribution "repo" + } + } + developers { + developer { + id "andmagom" + name "Andrés Mauricio Gómez P" + email "andmagom@outlook.com" + } + developer { + id "alejobtc" + name "Alejandro Betancur Barrientos" + email "alejobtc@gmail.com" + } + } + + scm { + url "git@github.com:reactive-commons/reactive-commons-java.git" + } +} + +publishing { + publications { + MyPublication(MavenPublication) { + from components.java + artifact sourcesJar { + classifier "sources" + } + artifact javadocJar { + classifier "javadoc" + } + groupId 'org.reactivecommons' + artifactId 'async-sqs-starter' + version project.property('version') + pom.withXml { + def root = asNode() + root.appendNode('description', 'Async SQS Starter') + root.appendNode('name', 'async-sqs-starter') + root.appendNode('url', 'https://site_for_lib.tld') + root.children().last() + pomConfig + } + } + } +} + +bintray { + user = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : System.getenv('BINTRAY_USER') + key = project.hasProperty('bintrayApiKey') ? project.property('bintrayApiKey') : System.getenv('BINTRAY_API_KEY') + publications = ['MyPublication'] + publish = true + pkg { + repo = 'maven-artifacts' + userOrg = 'reactive-commons' + name = 'reactive-commons' + licenses = ['Apache-2.0'] + vcsUrl = 'git@github.com:reactive-commons/reactive-commons-java.git' + version { + name = project.property('version') + desc = 'First version' + released = new Date() + vcsTag = project.property('version') + } + } +} dependencies { implementation platform('software.amazon.awssdk:bom:2.13.10') diff --git a/async/async-sqs/async-sqs.gradle b/async/async-sqs/async-sqs.gradle index e07c0033..d4533c7d 100644 --- a/async/async-sqs/async-sqs.gradle +++ b/async/async-sqs/async-sqs.gradle @@ -16,14 +16,9 @@ def pomConfig = { } developers { developer { - id "andmagom" - name "Andrés Mauricio Gómez P" - email "andmagom@outlook.com" - } - developer { - id "alejobtc" - name "Alejandro Betancur Barrientos" - email "alejobtc@gmail.com" + id "danielbustamante" + name "Daniel Bustamante Ospina" + email "daniel.bustamante@sofka.com.co" } } @@ -32,26 +27,52 @@ def pomConfig = { } } +publishing { + publications { + MyPublication(MavenPublication) { + from components.java + artifact sourcesJar { + classifier "sources" + } + artifact javadocJar { + classifier "javadoc" + } + groupId 'org.reactivecommons' + artifactId 'async-commons-api' + version project.property('version') + pom.withXml { + def root = asNode() + root.appendNode('description', 'Async Commons API') + root.appendNode('name', 'async-commons-api') + root.appendNode('url', 'http://reactivecommons.org') + root.children().last() + pomConfig + } + } + } +} +bintray { + user = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : System.getenv('BINTRAY_USER') + key = project.hasProperty('bintrayApiKey') ? project.property('bintrayApiKey') : System.getenv('BINTRAY_API_KEY') + publications = ['MyPublication'] + publish = true + pkg { + repo = 'maven-artifacts' + userOrg = 'reactive-commons' + name = 'reactive-commons' + licenses = ['Apache-2.0'] + vcsUrl = 'git@github.com:reactive-commons/reactive-commons-java.git' + version { + name = project.property('version') + desc = 'First version' + released = new Date() + vcsTag = project.property('version') + } + } +} dependencies { - implementation platform('software.amazon.awssdk:bom:2.13.10') - compile project(":async-commons-api") - compile project(":domain-events-api") - compile project(":async-commons") - - api 'io.projectreactor:reactor-core' - api 'com.fasterxml.jackson.core:jackson-databind' + api project(":domain-events-api") + compileOnly 'io.projectreactor:reactor-core' testImplementation 'io.projectreactor:reactor-test' - implementation 'software.amazon.awssdk:sns' - implementation 'software.amazon.awssdk:sqs' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.11.1' - compile group: 'joda-time', name: 'joda-time', version: '2.10.6' - compile("org.springframework.boot:spring-boot-starter-log4j2") -} - -configurations { - all { - exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging' - } }