diff --git a/.travis.yml b/.travis.yml
index 7047779b..76da1a34 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,9 +1,6 @@
-language: java
+dist: trusty
-addons:
- apt:
- packages:
- - openjdk-6-jdk
+language: java
install:
- echo "Downloading Maven 3.0";
@@ -17,10 +14,11 @@ install:
jdk:
- oraclejdk8
- oraclejdk9
-- openjdk7
-- openjdk6
+- oraclejdk11
+- openjdk8
+
-script: mvn install
+script: mvn clean install
env:
global:
secure: hmPdcALAi6qE3TqJDRqdVCqZftd/i2hWLCyZbIAcRzu38nO94JZYKSZjfif1FvXTJYotFW25JClXNyvOMwMjjK3OPQINfYFZIp6LLeOmXGbUcktwQ8TIoKZ7IOvvWiZK054H7zNKapz+ke3OPN/5WTmMBezV0Ct4+bSf9udKVnQSMG2sJ8YJ/SeZkh7RTlqO+zTkh+yq8Hk0BdaWEOK8RtEoWgcUFGVfkycvjgvna+TbDp3K7vjmhYBBqACsNKxXPgIumStbCGW4vwjoVkCOGIJKWnuQEVHxiqBUH3pp81bxnt+RIcMuZMR2HnDSpHyAIulTJNHVo3VFAAiy9HMdP8Wfy/OVdjBSZ8xIOoQvFijo+yGNNn8v4hILcX4IpumQeyjpG134BOWVbMLhKH7qWR3Z8TGgijSd4lYYjabCJ564E93KvqK1u2CuS9u89N8J7AKFYMbknH1DP8E5tCD+VI3Gwut9YNofywj3Jln8uCOP4I//8p61j9A9QF7ORpY59Ru4RNzxYrFn2QSTltMfaBfVZchh5AqURUamcJd+1orZfz/v+6yH9FOW+MAG8EJdzHDsqzP1NXrt+4VtF6yqOnhBxnKVNEwFwjsinW9PFi9dXyzdEd33jKGL7UO8Old5XlBoA7idWIDH4GKKSlBRZhEKWMe4ZfxpQVg3VPz2Qqo=
diff --git a/openmessaging-admin/pom.xml b/openmessaging-admin/pom.xml
index debe8f13..9b172e46 100644
--- a/openmessaging-admin/pom.xml
+++ b/openmessaging-admin/pom.xml
@@ -2,7 +2,7 @@
io.openmessagingparent
- 1.0.0-beta-SNAPSHOT
+ 2.0.2-pubsub-SNAPSHOT4.0.0
diff --git a/openmessaging-api-samples/pom.xml b/openmessaging-api-samples/pom.xml
index d35c519d..f62c70f1 100644
--- a/openmessaging-api-samples/pom.xml
+++ b/openmessaging-api-samples/pom.xml
@@ -2,13 +2,13 @@
io.openmessagingparent
- 1.0.0-beta-SNAPSHOT
+ 2.0.2-pubsub-SNAPSHOT4.0.0jaropenmessaging-api-samples
- 1.0.0-beta-SNAPSHOT
+ 2.0.2-pubsub-SNAPSHOTopenmessaging-api-samples ${project.version}
@@ -21,7 +21,7 @@
${project.groupId}openmessaging-api
- 1.0.0-beta-SNAPSHOT
+ 2.0.2-pubsub-SNAPSHOTorg.slf4j
diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java
index 7f9a2e85..c16d83bf 100644
--- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java
+++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java
@@ -17,37 +17,46 @@
package io.openmessaging.samples.consumer;
-import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.OMS;
-import io.openmessaging.consumer.PullConsumer;
-import io.openmessaging.message.Message;
-import java.util.Arrays;
+import io.openmessaging.api.Message;
+import io.openmessaging.api.MessagingAccessPoint;
+import io.openmessaging.api.OMS;
+import io.openmessaging.api.PullConsumer;
+import io.openmessaging.api.TopicPartition;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
public class PullConsumerApp {
public static void main(String[] args) {
//Load and start the vendor implementation from a specific OMS driver URL.
final MessagingAccessPoint messagingAccessPoint =
- OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
+ OMS.builder()
+ .endpoint("http://mq-instance-xxx-1234567890-test:8080")
+ .region("Shenzhen")
+ .driver("rocketmq")
+ .build();
+ Properties properties = new Properties();
//Start a PullConsumer to receive messages from the specific queue.
- final PullConsumer consumer = messagingAccessPoint.createPullConsumer();
+ final PullConsumer consumer = messagingAccessPoint.createPullConsumer(properties);
//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- consumer.stop();
+ consumer.shutdown();
}
}));
- consumer.bindQueue(Arrays.asList("NS://HELLO_QUEUE"));
+ Set topicPartitions = consumer.topicPartitions("NS://TOPIC");
+ consumer.assign(topicPartitions);
consumer.start();
- Message message = consumer.receive(1000);
+ List message = consumer.poll(1000);
System.out.println("Received message: " + message);
//Acknowledge the consumed message
- consumer.ack(message.getMessageReceipt());
- consumer.stop();
+ consumer.commitSync();
+ consumer.shutdown();
}
}
diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java
index 47ea14eb..2b48fdaa 100644
--- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java
+++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java
@@ -17,51 +17,50 @@
package io.openmessaging.samples.consumer;
-import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.OMS;
-import io.openmessaging.consumer.Consumer;
-import io.openmessaging.consumer.MessageListener;
-import io.openmessaging.consumer.PushConsumer;
-import io.openmessaging.manager.ResourceManager;
-import io.openmessaging.message.Message;
-import java.util.Arrays;
+import io.openmessaging.api.Action;
+import io.openmessaging.api.ConsumeContext;
+import io.openmessaging.api.Consumer;
+import io.openmessaging.api.Message;
+import io.openmessaging.api.MessageListener;
+import io.openmessaging.api.MessagingAccessPoint;
+import io.openmessaging.api.OMS;
+import java.util.Properties;
public class PushConsumerApp {
public static void main(String[] args) {
//Load and start the vendor implementation from a specific OMS driver URL.
final MessagingAccessPoint messagingAccessPoint =
- OMS.getMessagingAccessPoint("oms:rocketmq://localhost:10911/us-east");
+ OMS.builder()
+ .region("Shenzhen")
+ .endpoint("127.0.0.1:9876")
+ .driver("rocketmq")
+ .withCredentials(new Properties())
+ .build();
- //Fetch a ResourceManager to create Queue resource.
- ResourceManager resourceManager = messagingAccessPoint.resourceManager();
- resourceManager.createNamespace("NS://XXXX");
- final PushConsumer consumer = messagingAccessPoint.createPushConsumer();
+ Properties properties = new Properties();
+ final Consumer consumer = messagingAccessPoint.createConsumer(properties);
consumer.start();
//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- consumer.stop();
+ consumer.shutdown();
}
}));
//Consume messages from a simple queue.
- String simpleQueue = "NS://HELLO_QUEUE";
- resourceManager.createQueue(simpleQueue);
- //This queue doesn't has a source queue, so only the message delivered to the queue directly can
- //be consumed by this consumer.
- consumer.bindQueue(Arrays.asList(simpleQueue), new MessageListener() {
+ String topic = "NS://HELLO_TOPIC";
+
+ consumer.subscribe(topic, "*", new MessageListener(){
@Override
- public void onReceived(Message message, Context context) {
- System.out.println("Received one message: " + message);
- context.ack();
- }
+ public Action consume(Message message, ConsumeContext context) {
+ return Action.CommitMessage;
+ }
});
- consumer.unbindQueue(Arrays.asList(simpleQueue));
- consumer.stop();
+ consumer.shutdown();
}
}
\ No newline at end of file
diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java
index 9084b8a8..117e0f80 100644
--- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java
+++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java
@@ -17,71 +17,57 @@
package io.openmessaging.samples.producer;
-import io.openmessaging.Future;
-import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.OMS;
-import io.openmessaging.interceptor.Context;
-import io.openmessaging.interceptor.ProducerInterceptor;
-import io.openmessaging.message.Message;
-import io.openmessaging.producer.Producer;
-import io.openmessaging.producer.SendResult;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
+import io.openmessaging.api.Message;
+import io.openmessaging.api.MessagingAccessPoint;
+import io.openmessaging.api.OMS;
+import io.openmessaging.api.OnExceptionContext;
+import io.openmessaging.api.Producer;
+import io.openmessaging.api.SendCallback;
+import io.openmessaging.api.SendResult;
+import java.util.Properties;
public class ProducerApp {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint =
- OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
+ OMS.builder()
+ .region("shanghai,shenzhen")
+ .endpoint("127.0.0.1:9876")
+ .driver("rocketmq")
+ .withCredentials(new Properties())
+ .build();
- final Producer producer = messagingAccessPoint.createProducer();
- ProducerInterceptor interceptor = new ProducerInterceptor() {
- @Override
- public void preSend(Message message, Context attributes) {
- System.out.println("PreSend message: " + message);
- }
-
- @Override
- public void postSend(Message message, Context attributes) {
- System.out.println("PostSend message: " + message);
- }
- };
- producer.addInterceptor(interceptor);
+ final Producer producer = messagingAccessPoint.createProducer(new Properties());
producer.start();
//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- producer.stop();
+ producer.shutdown();
}
}));
- //Send a message to the specified destination synchronously.
- Message message = producer.createMessage(
- "NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
- message.header().setBornHost("127.0.0.1").setDurability((short) 0);
- message.extensionHeader().setPartition(1);
+ Message message = new Message("NS://Topic", "TagA", "Hello MQ".getBytes());
+
SendResult sendResult = producer.send(message);
System.out.println("SendResult: " + sendResult);
//Sends a message to the specified destination async.
- Future sendResultFuture = producer.sendAsync(message);
- sendResult = sendResultFuture.get(1000);
- System.out.println("SendResult: " + sendResult);
+ producer.sendAsync(message, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ System.out.println("SendResult: " + sendResult);
+ }
+
+ @Override
+ public void onException(OnExceptionContext context) {
+
+ }
+ });
//Sends a message to the specified destination in one way mode.
producer.sendOneway(message);
- //Sends messages to the specified destination in batch mode.
- List messages = new ArrayList(10);
- for (int i = 0; i < 10; i++) {
- Message msg = producer.createMessage("NS://HELLO_QUEUE", ("Hello" + i).getBytes());
- messages.add(msg);
- }
-
- producer.send(messages);
- producer.removeInterceptor(interceptor);
- producer.stop();
+ producer.shutdown();
}
}
diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java
index bed57642..a8abbb3a 100644
--- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java
+++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java
@@ -17,22 +17,30 @@
package io.openmessaging.samples.producer;
-import io.openmessaging.message.Message;
-import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.OMS;
-import io.openmessaging.producer.Producer;
-import io.openmessaging.producer.TransactionStateCheckListener;
-import io.openmessaging.producer.TransactionalResult;
-import java.nio.charset.Charset;
+import io.openmessaging.api.Message;
+import io.openmessaging.api.MessagingAccessPoint;
+import io.openmessaging.api.OMS;
+import io.openmessaging.api.SendResult;
+import io.openmessaging.api.transaction.LocalTransactionChecker;
+import io.openmessaging.api.transaction.LocalTransactionExecuter;
+import io.openmessaging.api.transaction.TransactionProducer;
+import io.openmessaging.api.transaction.TransactionStatus;
+import java.util.Properties;
public class TransactionProducerApp {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint =
- OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
-
- final Producer producer = messagingAccessPoint.createProducer(new TransactionStateCheckListener() {
- @Override public void check(Message message, TransactionalContext context) {
+ OMS.builder()
+ .region("Shenzhen")
+ .endpoint("127.0.0.1:9876")
+ .driver("rocketmq")
+ .withCredentials(new Properties())
+ .build();
+ final TransactionProducer producer = messagingAccessPoint.createTransactionProducer(new Properties(), new LocalTransactionChecker() {
+ @Override
+ public TransactionStatus check(Message msg) {
+ return TransactionStatus.CommitTransaction;
}
});
producer.start();
@@ -41,23 +49,19 @@ public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- producer.stop();
+ producer.shutdown();
}
}));
- Message message = producer.createMessage(
- "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+ Message message = new Message("NS://Topic", "TagA", "Hello MQ".getBytes());
//Sends a transaction message to the specified destination synchronously.
- TransactionalResult result = producer.prepare(message);
- executeLocalTransaction(result);
- result.commit();
- producer.stop();
- System.out.println("Send transaction message OK, message id is: " + result.messageId());
+ SendResult result = producer.send(message, new LocalTransactionExecuter() {
+ @Override public TransactionStatus execute(Message message, Object arg) {
+ return TransactionStatus.CommitTransaction;
+ }
+ }, null);
+ System.out.println("Send transaction message OK, message id is: " + result.getMessageId());
}
- private static void executeLocalTransaction(TransactionalResult result) {
- System.out.println("transactionId: " + result.transactionId());
- System.out.println("execute local transaction");
- }
}
diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java
deleted file mode 100644
index 298655cc..00000000
--- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.openmessaging.samples.routing;
-
-import io.openmessaging.consumer.PushConsumer;
-import io.openmessaging.message.Message;
-import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.OMS;
-import io.openmessaging.consumer.Consumer;
-import io.openmessaging.consumer.MessageListener;
-import io.openmessaging.manager.ResourceManager;
-import io.openmessaging.producer.Producer;
-import java.util.Arrays;
-
-public class RoutingApp {
- public static void main(String[] args) {
- //Load and start the vendor implementation from a specific OMS driver URL.
- final MessagingAccessPoint messagingAccessPoint =
- OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
-
- String destinationQueue = "NS://DESTINATION_QUEUE";
- String sourceQueue = "NS://SOURCE_QUEUE";
- //Fetch a ResourceManager to create source Queue, destination Queue, and the Routing instance.
- ResourceManager resourceManager = messagingAccessPoint.resourceManager();
-
- //Create the destination queue.
- resourceManager.createQueue(destinationQueue);
- //Create the source queue.
- resourceManager.createQueue(sourceQueue);
-
- resourceManager.routing(sourceQueue, destinationQueue);
- resourceManager.filter(destinationQueue, "name = 'kaka'");
-
- //Send messages to the source queue ahead of the routing
- final Producer producer = messagingAccessPoint.createProducer();
- producer.start();
-
- Message message = producer.createMessage(sourceQueue, "RED_COLOR".getBytes());
- message.properties().put("color", "green").put("shape", "round");
-
- producer.send(message);
-
- //Consume messages from the queue behind the routing.
- final PushConsumer consumer = messagingAccessPoint.createPushConsumer();
- consumer.start();
-
- consumer.bindQueue(Arrays.asList(destinationQueue), new MessageListener() {
- @Override
- public void onReceived(Message message, Context context) {
- //The message sent to the sourceQueue will be delivered to anotherConsumer by the routing rule
- //In this case, the push consumer will only receive the message with red color.
- System.out.println("Received a red message: " + message);
- context.ack();
- }
-
- });
-
- //Register a shutdown hook to close the opened endpoints.
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- producer.stop();
- consumer.stop();
- }
- }));
-
- }
-}
diff --git a/openmessaging-api/README.md b/openmessaging-api/README.md
deleted file mode 100644
index 999ed6ac..00000000
--- a/openmessaging-api/README.md
+++ /dev/null
@@ -1,14 +0,0 @@
-## Server Side
-* Namespace
-* Queue
- * Stream
-* Routing
- * Expression
-
-## Client Side
-* Producer
-* PushConsumer
-* PullConsumer
-* StreamingConsumer
-* Message
- * BytesMessage
diff --git a/openmessaging-api/pom.xml b/openmessaging-api/pom.xml
index 9e493ee9..bd53ce6c 100644
--- a/openmessaging-api/pom.xml
+++ b/openmessaging-api/pom.xml
@@ -2,7 +2,7 @@
io.openmessagingparent
- 1.0.0-beta-SNAPSHOT
+ 2.0.2-pubsub-SNAPSHOT4.0.0
diff --git a/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java b/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java
deleted file mode 100644
index 9c617f2f..00000000
--- a/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.openmessaging;
-
-import java.util.Set;
-
-/**
- * The {@code KeyValue} class represents a persistent set of attributes, which supports method chaining.
- *
- * A {@code KeyValue} object only allows {@code String} keys and can contain four primitive type as values: {@code int},
- * {@code long}, {@code double}, {@code String}.
- *
- * The {@code KeyValue} is a replacement of {@code Properties}, with simpler interfaces and reasonable entry limits.
- *
- * A {@code KeyValue} object may be used in concurrent scenarios, so the implementation of {@code KeyValue} should
- * consider concurrent related issues.
- *
- * @version OMS 1.0.0
- * @since OMS 1.0.0
- */
-public interface KeyValue {
-
- /**
- * Inserts or replaces {@code boolean} value for the specified key.
- *
- * @param key the key to be placed into this {@code KeyValue} object
- * @param value the value corresponding to key
- */
- KeyValue put(String key, boolean value);
-
- /**
- * Inserts or replaces {@code short} value for the specified key.
- *
- * @param key the key to be placed into this {@code KeyValue} object
- * @param value the value corresponding to key
- */
- KeyValue put(String key, short value);
-
- /**
- * Inserts or replaces {@code int} value for the specified key.
- *
- * @param key the key to be placed into this {@code KeyValue} object
- * @param value the value corresponding to key
- */
- KeyValue put(String key, int value);
-
- /**
- * Inserts or replaces {@code long} value for the specified key.
- *
- * @param key the key to be placed into this {@code KeyValue} object
- * @param value the value corresponding to key
- */
- KeyValue put(String key, long value);
-
- /**
- * Inserts or replaces {@code double} value for the specified key.
- *
- * @param key the key to be placed into this {@code KeyValue} object
- * @param value the value corresponding to key
- */
- KeyValue put(String key, double value);
-
- /**
- * Inserts or replaces {@code String} value for the specified key.
- *
- * @param key the key to be placed into this {@code KeyValue} object
- * @param value the value corresponding to key
- */
- KeyValue put(String key, String value);
-
- /**
- * Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is
- * not found in this property list, false is returned.
- *
- * @param key the property key
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, boolean)
- */
- boolean getBoolean(String key);
-
- /**
- * Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is
- * not found in this property list, false is returned.
- *
- * @param key the property key
- * @param defaultValue a default value
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, boolean)
- */
- boolean getBoolean(String key, boolean defaultValue);
-
- /**
- * Searches for the {@code short} property with the specified key in this {@code KeyValue} object. If the key is not
- * found in this property list, zero is returned.
- *
- * @param key the property key
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, short)
- */
- short getShort(String key);
-
- /**
- * Searches for the {@code short} property with the specified key in this {@code KeyValue} object. If the key is not
- * found in this property list, zero is returned.
- *
- * @param key the property key
- * @param defaultValue a default value
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, short)
- */
- short getShort(String key, short defaultValue);
-
- /**
- * Searches for the {@code int} property with the specified key in this {@code KeyValue} object. If the key is not
- * found in this property list, zero is returned.
- *
- * @param key the property key
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, int)
- */
- int getInt(String key);
-
- /**
- * Searches for the {@code int} property with the specified key in this {@code KeyValue} object. If the key is not
- * found in this property list, the default value argument is returned.
- *
- * @param key the property key
- * @param defaultValue a default value
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, int)
- */
- int getInt(String key, int defaultValue);
-
- /**
- * Searches for the {@code long} property with the specified key in this {@code KeyValue} object. If the key is not
- * found in this property list, zero is returned.
- *
- * @param key the property key
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, long)
- */
- long getLong(String key);
-
- /**
- * Searches for the {@code long} property with the specified key in this {@code KeyValue} object. If the key is not
- * found in this property list, the default value argument is returned.
- *
- * @param key the property key
- * @param defaultValue a default value
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, long)
- */
- long getLong(String key, long defaultValue);
-
- /**
- * Searches for the {@code double} property with the specified key in this {@code KeyValue} object. If the key is
- * not found in this property list, zero is returned.
- *
- * @param key the property key
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, double)
- */
- double getDouble(String key);
-
- /**
- * Searches for the {@code double} property with the specified key in this {@code KeyValue} object. If the key is
- * not found in this property list, the default value argument is returned.
- *
- * @param key the property key
- * @param defaultValue a default value
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, double)
- */
- double getDouble(String key, double defaultValue);
-
- /**
- * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is
- * not found in this property list, {@code null} is returned.
- *
- * @param key the property key
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, String)
- */
- String getString(String key);
-
- /**
- * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is
- * not found in this property list, the default value argument is returned.
- *
- * @param key the property key
- * @param defaultValue a default value
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, String)
- */
- String getString(String key, String defaultValue);
-
- /**
- * Returns a {@link Set} view of the keys contained in this {@code KeyValue} object.
- *
- * The set is backed by the {@code KeyValue}, so changes to the set are reflected in the @code KeyValue}, and
- * vice-versa.
- *
- * @return the key set view of this {@code KeyValue} object.
- */
- Set keySet();
-
- /**
- * Tests if the specified {@code String} is a key in this {@code KeyValue}.
- *
- * @param key possible key
- * @return true if and only if the specified key is in this {@code KeyValue}, false
- * otherwise.
- */
- boolean containsKey(String key);
-}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/OMS.java b/openmessaging-api/src/main/java/io/openmessaging/OMS.java
deleted file mode 100644
index 4f3d9342..00000000
--- a/openmessaging-api/src/main/java/io/openmessaging/OMS.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.openmessaging;
-
-import io.openmessaging.exception.OMSRuntimeException;
-import io.openmessaging.internal.DefaultKeyValue;
-import io.openmessaging.internal.MessagingAccessPointAdapter;
-import io.openmessaging.manager.ResourceManager;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-
-/**
- * The oms class provides some static methods to create a {@code MessagingAccessPoint} from the specified OMS driver url
- * and some useful util methods.
- *
- * The first part of the URL specifies which OMS implementation is to be used, rocketmq is a optional driver type.
- *
- * The brackets indicate that the extra access points are optional, and a correct OMS driver url needs at least one
- * access point, which consists of hostname and port, like localhost:8081.
- *
- * @version OMS 1.0.0
- * @see ResourceManager
- * @since OMS 1.0.0
- */
-public final class OMS {
- /**
- * Returns a {@code MessagingAccessPoint} instance from the specified OMS driver url.
- *
- * @param url the specified OMS driver url
- * @return a {@code MessagingAccessPoint} instance
- * @throws OMSRuntimeException if the factory fails to create a {@code MessagingAccessPoint} due to some driver url
- * some syntax error or internal error.
- */
- public static MessagingAccessPoint getMessagingAccessPoint(String url) {
- return getMessagingAccessPoint(url, OMS.newKeyValue());
- }
-
- /**
- * Returns a {@code MessagingAccessPoint} instance from the specified OMS driver url with some preset attributes,
- * which will be passed to MessagingAccessPoint's implementation class as a unique constructor parameter.
- *
- * There are some standard attributes defined by OMS for this method, the same as {@link
- * MessagingAccessPoint#attributes()} ()}
- *
- * @param url the specified OMS driver url
- * @return a {@code MessagingAccessPoint} instance
- * @throws OMSRuntimeException if the factory fails to create a {@code MessagingAccessPoint} due to some driver url
- * some syntax error or internal error.
- */
- public static MessagingAccessPoint getMessagingAccessPoint(String url, KeyValue attributes) {
- return MessagingAccessPointAdapter.getMessagingAccessPoint(url, attributes);
- }
-
- /**
- * Returns a default and internal {@code KeyValue} implementation instance.
- *
- * @return a {@code KeyValue} instance
- */
- public static KeyValue newKeyValue() {
- return new DefaultKeyValue();
- }
-
- /**
- * The version format is X.Y.Z (Major.Minor.Patch), a pre-release version may be denoted by appending a hyphen and a
- * series of dot-separated identifiers immediately following the patch version, like X.Y.Z-alpha.
- *
- *
- * OMS version follows semver scheme partially.
- *
- * @see http://semver.org
- */
- public static String specVersion = "UnKnown";
-
- static {
- InputStream stream = OMS.class.getClassLoader().getResourceAsStream("oms.spec.properties");
- try {
- if (stream != null) {
- Properties properties = new Properties();
- properties.load(stream);
- specVersion = String.valueOf(properties.get("version"));
- }
- } catch (IOException ignore) {
- }
- }
-
- private OMS() {
- }
-}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/Promise.java b/openmessaging-api/src/main/java/io/openmessaging/Promise.java
deleted file mode 100644
index 58fbe767..00000000
--- a/openmessaging-api/src/main/java/io/openmessaging/Promise.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.openmessaging;
-
-/**
- * Special {@link Future} which is writable.
- *
- * A {@code Promise} can be completed or canceled, cancellation is performed by the {@code cancel} method.
- * Once a computation has completed, the computation cannot be cancelled. If you would like to use a {@code Promise}
- * for the sake of cancellability but not provide a usable result, you can declare type+s of the form
- * {@code Promise>} and return {@code null} as a result of the underlying task.
- *
- * @version OMS 1.0.0
- * @since OMS 1.0.0
- */
-public interface Promise extends Future {
-
- /**
- * Attempts to cancel execution of this task. This attempt will fail if the task has already completed, has already
- * been cancelled, or could not be cancelled for some other reason. If successful, and this task has not started
- * when {@code cancel} is called, this task should never run. If the task has already started, then the {@code
- * mayInterruptIfRunning} parameter determines whether the thread executing this task should be interrupted in an
- * attempt to stop the task.
- *
- * After this method returns, subsequent calls to {@link #isDone} will always return {@code true}. Subsequent calls
- * to {@link #isCancelled} will always return {@code true} if this method returned {@code true}.
- *
- * @param mayInterruptIfRunning {@code true} if the thread executing this task should be interrupted; otherwise,
- * in-progress tasks are allowed to complete
- * @return {@code false} if the task could not be cancelled, typically because it has already completed normally;
- * {@code true} otherwise
- */
- boolean cancel(boolean mayInterruptIfRunning);
-
- /**
- * Set the value to this promise and mark it completed if set successfully.
- *
- * @param value Value
- * @return Whether set is success
- */
- boolean set(V value);
-
- /**
- * Marks this promise as a failure and notifies all listeners.
- *
- * @param cause the cause
- * @return Whether set is success
- */
- boolean setFailure(Throwable cause);
-}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java
deleted file mode 100644
index eafdb18f..00000000
--- a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.openmessaging;
-
-/**
- * A collection of all service states.
- *
- * @version OMS 1.0.0
- * @since OMS 1.0.0
- */
-public enum ServiceLifeState {
-
- /**
- * Service has been initialized.
- */
- INITIALIZED,
-
- /**
- * Service in starting.
- */
- STARTING,
-
- /**
- * Service in running.
- */
- STARTED,
-
- /**
- * Service is stopping.
- */
- STOPPING,
-
- /**
- * Service has been stopped.
- */
- STOPPED,
-}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java b/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java
deleted file mode 100644
index b5b9d490..00000000
--- a/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.openmessaging.annotation;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- *
- * A {@code Optional} is an annotation to mark some certain methods ,interfaces and etc. this annotation represented
- * these methods or interfaces are not mandatory in OpenMessaging.
- *
- *
- *
- * If these methods or interfaces adopted by more and more vendors and end users, they may be become the mandatory
- * interface in the future. Of course, if they are used very little, they may be removed.
- *
- *
- * @version OMS 1.0.0
- * @since OMS 1.0.0
- */
-@Documented
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.PACKAGE, ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE})
-public @interface Optional {
-}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/Action.java b/openmessaging-api/src/main/java/io/openmessaging/api/Action.java
new file mode 100644
index 00000000..851163fc
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/Action.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.api;
+
+
+public enum Action {
+
+ CommitMessage,
+
+ ReconsumeLater,
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/Admin.java b/openmessaging-api/src/main/java/io/openmessaging/api/Admin.java
new file mode 100644
index 00000000..5fd24301
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/Admin.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.api;
+
+/**
+ *
+ * Client basic management interface, used for as a unified interface to manage basic operations such as start, stop,
+ * and authorization information management.
+ *
+ *
+ *
+ * @version OMS 1.2.0
+ * @since OMS 1.2.0
+ */
+public interface Admin extends LifeCycle, Credentials {
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/ConsumeContext.java b/openmessaging-api/src/main/java/io/openmessaging/api/ConsumeContext.java
new file mode 100644
index 00000000..e2f9996a
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/ConsumeContext.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.api;
+
+public class ConsumeContext {
+
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/api/Consumer.java
new file mode 100644
index 00000000..4ed9cec0
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/Consumer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.api;
+
+/**
+ * Consumer interface.
+ *
+ * @version OMS 1.2.0
+ * @since OMS 1.2.0
+ */
+public interface Consumer extends Admin {
+
+ /**
+ * Subscribe message in order.
+ *
+ * @param topic message topic.
+ * @param subExpression Subscribe to the filter expression string, which the broker filters based on this
+ * expression. eg: "tag1 || tag2 || tag3" , if subExpression is equal to null or *, it means subscribe all
+ * messages.
+ * @param listener The message callback listener, the consumer receives the message and then passes it to the
+ * message callback listener for consumption.
+ */
+ void subscribe(final String topic, final String subExpression, final MessageListener listener);
+
+ /**
+ * Subscribe to messages, which can be filtered using SQL expressions.
+ *
+ * @param topic
+ * @param selector Subscribe to the message selector (can be empty, indicating no filtering), the ONS server filters
+ * according to the expression in this selector. Currently supports two expression syntax: {@link
+ * ExpressionType#TAG}, {@link ExpressionType#SQL92} Among them, the effect of TAG filtering is consistent with the
+ * above interface.
+ * @param listener Message callback listener
+ */
+ void subscribe(final String topic, final MessageSelector selector, final MessageListener listener);
+
+ /**
+ * Unsubscribe message
+ *
+ * @param topic
+ */
+ void unsubscribe(final String topic);
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/Credentials.java b/openmessaging-api/src/main/java/io/openmessaging/api/Credentials.java
new file mode 100644
index 00000000..674b9db5
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/Credentials.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.api;
+
+import java.util.Properties;
+
+/**
+ *
+ * Used for update credentials.
+ *
+ * @version OMS 1.2.0
+ * @since OMS 1.2.0
+ */
+public interface Credentials {
+ /**
+ * Update credentials for instance, properties can be found in {@link OMSBuiltinKeys}
+ * @param credentialProperties
+ */
+ void updateCredential(Properties credentialProperties);
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/ExpressionType.java b/openmessaging-api/src/main/java/io/openmessaging/api/ExpressionType.java
new file mode 100644
index 00000000..64a444a3
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/ExpressionType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.api;
+
+/**
+ * Message filter expression type.
+ *
+ * @version OMS 1.2.0
+ * @since OMS 1.2.0
+ */
+public enum ExpressionType {
+
+ SQL92,
+
+ TAG
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/Future.java b/openmessaging-api/src/main/java/io/openmessaging/api/Future.java
similarity index 98%
rename from openmessaging-api/src/main/java/io/openmessaging/Future.java
rename to openmessaging-api/src/main/java/io/openmessaging/api/Future.java
index e0973549..a04dc520 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/Future.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/Future.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package io.openmessaging;
+package io.openmessaging.api;
/**
*
@@ -28,8 +28,8 @@
* The reason for adding this future is mainly to satisfy the old version of jdk, such as jdk 1.6.
*
*
- * @version OMS 1.0.0
- * @since OMS 1.0.0
+ * @version OMS 1.1.0
+ * @since OMS 1.1.0
*/
public interface Future {
/**
diff --git a/openmessaging-api/src/main/java/io/openmessaging/FutureListener.java b/openmessaging-api/src/main/java/io/openmessaging/api/FutureListener.java
similarity index 87%
rename from openmessaging-api/src/main/java/io/openmessaging/FutureListener.java
rename to openmessaging-api/src/main/java/io/openmessaging/api/FutureListener.java
index 866d7449..06010cfa 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/FutureListener.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/FutureListener.java
@@ -15,20 +15,20 @@
* limitations under the License.
*/
-package io.openmessaging;
+package io.openmessaging.api;
/**
* A listener that is called back when a Promise is done.
* {@code FutureListener} instances are attached to {@link Future} by passing
* them in to {@link Future#addListener(FutureListener)}.
*
- * @version OMS 1.0.0
- * @since OMS 1.0.0
+ * @version OMS 1.1.0
+ * @since OMS 1.1.0
*/
public interface FutureListener {
/**
- * Invoked when the operation completes, be the associated {@link Promise} successful or not.
+ * Invoked when the operation completes, successful or not.
* @param future The associated promise facade
*/
void operationComplete(Future future);
diff --git a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifecycle.java b/openmessaging-api/src/main/java/io/openmessaging/api/LifeCycle.java
similarity index 55%
rename from openmessaging-api/src/main/java/io/openmessaging/ServiceLifecycle.java
rename to openmessaging-api/src/main/java/io/openmessaging/api/LifeCycle.java
index 3066cd56..8b562d8f 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifecycle.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/LifeCycle.java
@@ -8,32 +8,41 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
-package io.openmessaging;
-
-import io.openmessaging.consumer.Consumer;
-import io.openmessaging.extension.Extension;
-import io.openmessaging.producer.Producer;
+package io.openmessaging.api;
/**
- * The {@code ServiceLifecycle} defines a lifecycle interface for a OMS related service endpoint, like {@link Producer},
- * {@link Consumer}, and so on.
+ * The {@code LifeCycle} defines a lifecycle interface for a OMS related service endpoint, like {@link Producer}, {@link
+ * Consumer}, and so on.
*
* If the service endpoint class implements the {@code ServiceLifecycle} interface, most of the containers can manage
* the lifecycle of the corresponding service endpoint objects easily.
*
* Any service endpoint should support repeated restart if it implements the {@code ServiceLifecycle} interface.
*
- * @version OMS 1.0.0
- * @since OMS 1.0.0
+ * @version OMS 1.2.0
+ * @since OMS 1.2.0
*/
-public interface ServiceLifecycle extends Extension {
+public interface LifeCycle {
+ /**
+ * Used to determine whether the current instance is started.
+ *
+ * @return if this instance has been started success, this method will return true, otherwise false.
+ */
+ boolean isStarted();
+
+ /**
+ * Used to determine whether the current instance is closed.
+ *
+ * @return if this instance has been stopped, this method will return true, otherwise false.
+ */
+ boolean isClosed();
+
/**
* Used for startup or initialization of a service endpoint. A service endpoint instance will be in a ready state
* after this method has been completed.
@@ -44,13 +53,5 @@ public interface ServiceLifecycle extends Extension {
* Notify a service instance of the end of its life cycle. Once this method completes, the service endpoint could be
* destroyed and eligible for garbage collection.
*/
- void stop();
-
- /**
- * Used for get service current state, for execution of some operations is dependent on the current service state.
- *
- * @return This service current state {@link ServiceLifeState}
- */
- ServiceLifeState currentState();
-
+ void shutdown();
}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/Message.java b/openmessaging-api/src/main/java/io/openmessaging/api/Message.java
new file mode 100644
index 00000000..ad5d4256
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/Message.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.api;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is
+ * {@link Message}.
+ *
+ * Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of header and
+ * body and is used by separate applications to exchange a piece of information, like Apache RocketMQ.
+ *
+ * The header contains fields used by the messaging system that describes the message's meta information, while the body
+ * contains the application data being transmitted.
+ *
+ * As for the message header, OMS defines two kinds types: userProperties and systemProperties with respect to
+ * flexibility in vendor implementation and user usage.
+ *
+ *
+ * System Properties, OMS defines some standard attributes in {@link SystemPropKey} that represent the characteristics
+ * of the message.
+ *
+ *
+ * User properties, some OMS vendors may require enhanced extra attributes of the message or some users may want to
+ * clarify some customized attributes to draw the body. OMS provides the improved scalability for these scenarios.
+ *
+ *
+ * The body contains the application data being transmitted, which is generally ignored by the messaging system and
+ * simply transmitted to its destination.
+ *
+ * In BytesMessage, the body is just a byte array, may be compressed and uncompressed in the transmitting process by the
+ * messaging system. The application is responsible for explaining the concrete content and format of the message body,
+ * OMS is never aware of that.
+ *
+ * The body part is placed in the implementation classes of {@code Message}.
+ *
+ * @version OMS 1.2.0
+ * @since OMS 1.2.0
+ */
+public class Message implements Serializable {
+
+ private static final long serialVersionUID = -1385924226856188094L;
+
+ protected Properties systemProperties;
+
+ private String topic;
+
+ private Properties userProperties;
+
+ private byte[] body;
+
+ public Message() {
+ this(null, null, "", null);
+ }
+
+ public Message(String topic, String tag, String key, byte[] body) {
+ this.topic = topic;
+ this.body = body;
+
+ this.putSystemProperties(SystemPropKey.TAG, tag);
+ this.putSystemProperties(SystemPropKey.KEY, key);
+ }
+
+ public void putSystemProperties(final String key, final String value) {
+ if (null == this.systemProperties) {
+ this.systemProperties = new Properties();
+ }
+
+ if (key != null && value != null) {
+ this.systemProperties.put(key, value);
+ }
+ }
+
+ public Message(String topic, String tags, byte[] body) {
+ this(topic, tags, "", body);
+ }
+
+ public void putUserProperties(final String key, final String value) {
+ if (null == this.userProperties) {
+ this.userProperties = new Properties();
+ }
+
+ if (key != null && value != null) {
+ this.userProperties.put(key, value);
+ }
+ }
+
+ public String getUserProperties(final String key) {
+ if (null != this.userProperties) {
+ return (String) this.userProperties.get(key);
+ }
+
+ return null;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getTag() {
+ return this.getSystemProperties(SystemPropKey.TAG);
+ }
+
+ public String getSystemProperties(final String key) {
+ if (null != this.systemProperties) {
+ return this.systemProperties.getProperty(key);
+ }
+
+ return null;
+ }
+
+ public void setTag(String tag) {
+ this.putSystemProperties(SystemPropKey.TAG, tag);
+ }
+
+ public String getKey() {
+ return this.getSystemProperties(SystemPropKey.KEY);
+ }
+
+ public void setKey(String key) {
+ this.putSystemProperties(SystemPropKey.KEY, key);
+ }
+
+ public String getMsgID() {
+ return this.getSystemProperties(SystemPropKey.MSGID);
+ }
+
+ public void setMsgID(String msgid) {
+ this.putSystemProperties(SystemPropKey.MSGID, msgid);
+ }
+
+ public Properties getSystemProperties() {
+ if (null == systemProperties) {
+ return new Properties();
+ }
+ return systemProperties;
+ }
+
+ public void setSystemProperties(Properties systemProperties) {
+ this.systemProperties = systemProperties;
+ }
+
+ public Properties getUserProperties() {
+ if (null == userProperties) {
+ return new Properties();
+ }
+ return userProperties;
+ }
+
+ public void setUserProperties(Properties userProperties) {
+ this.userProperties = userProperties;
+ }
+
+ public byte[] getBody() {
+ return body;
+ }
+
+ public void setBody(byte[] body) {
+ this.body = body;
+ }
+
+ public int getReconsumeTimes() {
+ String pro = this.getSystemProperties(SystemPropKey.RECONSUMETIMES);
+ if (pro != null) {
+ return Integer.parseInt(pro);
+ }
+
+ return 0;
+ }
+
+ public void setReconsumeTimes(final int value) {
+ putSystemProperties(SystemPropKey.RECONSUMETIMES, String.valueOf(value));
+ }
+
+ public long getBornTimestamp() {
+ String pro = this.getSystemProperties(SystemPropKey.BORNTIMESTAMP);
+ if (pro != null) {
+ return Long.parseLong(pro);
+ }
+
+ return 0L;
+ }
+
+ public void setBornTimestamp(final long value) {
+ putSystemProperties(SystemPropKey.BORNTIMESTAMP, String.valueOf(value));
+ }
+
+ public String getBornHost() {
+ String pro = this.getSystemProperties(SystemPropKey.BORNHOST);
+ return pro == null ? "" : pro;
+ }
+
+ public void setBornHost(final String value) {
+ putSystemProperties(SystemPropKey.BORNHOST, value);
+ }
+
+ public long getStartDeliverTime() {
+ String pro = this.getSystemProperties(SystemPropKey.STARTDELIVERTIME);
+ if (pro != null) {
+ return Long.parseLong(pro);
+ }
+
+ return 0L;
+ }
+
+ public String getShardingKey() {
+ String pro = this.getSystemProperties(SystemPropKey.SHARDINGKEY);
+ return pro == null ? "" : pro;
+ }
+
+ public void setShardingKey(final String value) {
+ putSystemProperties(SystemPropKey.SHARDINGKEY, value);
+ }
+
+ public void setStartDeliverTime(final long value) {
+ putSystemProperties(SystemPropKey.STARTDELIVERTIME, String.valueOf(value));
+ }
+
+ /**
+ * Get the offset of this message assigned by the broker.
+ *
+ * @return Message offset in relative partition
+ */
+ public long getOffset() {
+ String v = getSystemProperties(SystemPropKey.CONSUMEOFFSET);
+ if (v != null) {
+ return Long.parseLong(v);
+ }
+ return 0;
+ }
+
+ /**
+ * Get the partition to which the message belongs.
+ *
+ * @return Message offset in relative partition
+ */
+ public TopicPartition getTopicPartition() {
+ return new TopicPartition(topic, getSystemProperties(SystemPropKey.PARTITION));
+ }
+
+ @Override
+ public String toString() {
+ return "Message [topic=" + topic + ", systemProperties=" + systemProperties + ", userProperties=" + userProperties + ", body="
+ + (body != null ? body.length : 0) + "]";
+ }
+
+ static public class SystemPropKey {
+ public static final String TAG = "__TAG";
+ public static final String KEY = "__KEY";
+ public static final String MSGID = "__MSGID";
+ public static final String SHARDINGKEY = "__SHARDINGKEY";
+ public static final String RECONSUMETIMES = "__RECONSUMETIMES";
+ public static final String BORNTIMESTAMP = "__BORNTIMESTAMP";
+ public static final String BORNHOST = "__BORNHOST";
+
+ public static final String STARTDELIVERTIME = "__STARTDELIVERTIME";
+
+ public static final String CONSUMEOFFSET = "__CONSUMEOFFSET";
+
+ public static final String PARTITION = "__PARTITION";
+ }
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/MessageAccessor.java b/openmessaging-api/src/main/java/io/openmessaging/api/MessageAccessor.java
new file mode 100644
index 00000000..a20e72b0
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/MessageAccessor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.api;
+
+import java.util.Properties;
+
+/**
+ * Used for set or get the relevant properties of a message.
+ *
+ * @version OMS 1.2.0
+ * @since OMS 1.2.0
+ */
+public class MessageAccessor {
+ /**
+ * Used for get all system properties.
+ *
+ * @param msg
+ * @return system properties
+ */
+ public static Properties getSystemProperties(final Message msg) {
+ return msg.systemProperties;
+ }
+
+ /**
+ * Used for set system properties, will used new systemProperties to override current systemProperties.
+ *
+ * @param msg
+ * @return system properties
+ */
+ public static void setSystemProperties(final Message msg, Properties systemProperties) {
+ msg.systemProperties = systemProperties;
+ }
+
+ /**
+ * Used for set system property for a specified key, will used new value to replace origin system property of a
+ * specified key.
+ *
+ * @param msg
+ * @return
+ */
+ public static void putSystemProperties(final Message msg, final String key, final String value) {
+ msg.putSystemProperties(key, value);
+ }
+
+ /**
+ * Used for get a system property value for a specified key.
+ *
+ * @param msg
+ * @return system property value of specified key
+ */
+ public static String getSystemProperties(final Message msg, final String key) {
+ return msg.getSystemProperties(key);
+ }
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/MessageListener.java b/openmessaging-api/src/main/java/io/openmessaging/api/MessageListener.java
new file mode 100644
index 00000000..4cc2b379
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/MessageListener.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.api;
+
+/**
+ * Message consume listener, registed for consume messages by consumer.
+ *
+ *
+ * Thread safe requirements: this interface will be invoked by multi threads, so users should keep thread safe during
+ * the consume process.
+ *
+ *
+ *
+ * @version OMS 1.2.0
+ * @since OMS 1.2.0
+ */
+public interface MessageListener {
+
+ /**
+ * Consumer message interface, implemented by the application, unstable situations such as network jitter may lead
+ * to message duplication, and services sensitive to repeated messages need to guarantee idempotent.
+ *
+ * @param message
+ * @param context
+ * @return if current message consumed success, {@link Action#CommitMessage} should be returned, otherwise, {@link
+ * Action#ReconsumeLater} should be returned, and this message will be delivered again.
+ */
+ Action consume(final Message message, final ConsumeContext context);
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/MessageSelector.java b/openmessaging-api/src/main/java/io/openmessaging/api/MessageSelector.java
new file mode 100644
index 00000000..37bde240
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/MessageSelector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.api;
+
+public class MessageSelector {
+
+ private ExpressionType type;
+
+ private String subExpression;
+
+ public static MessageSelector bySql(String subExpression) {
+ return new MessageSelector(ExpressionType.SQL92, subExpression);
+ }
+
+ public static MessageSelector byTag(String subExpression) {
+ return new MessageSelector(ExpressionType.TAG, subExpression);
+ }
+
+ private MessageSelector() {
+ }
+
+ private MessageSelector(ExpressionType type, String subExpression) {
+ this.type = type;
+ this.subExpression = subExpression;
+ }
+
+ public ExpressionType getType() {
+ return type;
+ }
+
+ public String getSubExpression() {
+ return subExpression;
+ }
+}
\ No newline at end of file
diff --git a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java b/openmessaging-api/src/main/java/io/openmessaging/api/MessagingAccessPoint.java
similarity index 52%
rename from openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java
rename to openmessaging-api/src/main/java/io/openmessaging/api/MessagingAccessPoint.java
index 80bee1dd..5166d79e 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/MessagingAccessPoint.java
@@ -15,19 +15,14 @@
* limitations under the License.
*/
-package io.openmessaging;
+package io.openmessaging.api;
-import io.openmessaging.consumer.Consumer;
-import io.openmessaging.consumer.MessageListener;
-import io.openmessaging.consumer.PullConsumer;
-import io.openmessaging.consumer.PushConsumer;
-import io.openmessaging.exception.OMSRuntimeException;
-import io.openmessaging.exception.OMSSecurityException;
-import io.openmessaging.manager.ResourceManager;
-import io.openmessaging.message.MessageFactory;
-import io.openmessaging.producer.Producer;
-import io.openmessaging.producer.TransactionStateCheckListener;
-import java.util.Collection;
+import io.openmessaging.api.batch.BatchConsumer;
+import io.openmessaging.api.order.OrderConsumer;
+import io.openmessaging.api.order.OrderProducer;
+import io.openmessaging.api.transaction.LocalTransactionChecker;
+import io.openmessaging.api.transaction.TransactionProducer;
+import java.util.Properties;
/**
* An instance of {@code MessagingAccessPoint} may be obtained from {@link OMS}, which is capable of creating {@code
@@ -39,11 +34,10 @@
* messagingAccessPoint.startup();
* Producer producer = messagingAccessPoint.createProducer();
* producer.startup();
- * producer.send(producer.createBytesMessage("HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
*
*
- * @version OMS 1.0.0
- * @since OMS 1.0.0
+ * @version OMS 1.1.0
+ * @since OMS 1.1.0
*/
public interface MessagingAccessPoint {
@@ -60,16 +54,15 @@ public interface MessagingAccessPoint {
*
* There are some standard attributes defined by OMS for {@code MessagingAccessPoint}:
*
- *
{@link OMSBuiltinKeys#ACCESS_POINTS}, the specified access points.
+ *
{@link OMSBuiltinKeys#ENDPOINT}, the specified access points.
*
{@link OMSBuiltinKeys#DRIVER_IMPL}, the fully qualified class name of the specified MessagingAccessPoint's
* implementation, the default value is {@literal io.openmessaging..MessagingAccessPointImpl}.
*
{@link OMSBuiltinKeys#REGION}, the region the resources reside in.
- *
{@link OMSBuiltinKeys#ACCOUNT_ID}, the ID of the specific account system that owns the resource.
*
*
* @return the attributes
*/
- KeyValue attributes();
+ Properties attributes();
/**
* Creates a new {@code Producer} for the specified {@code MessagingAccessPoint}.
@@ -79,75 +72,66 @@ public interface MessagingAccessPoint {
* error
* @throws OMSSecurityException if have no authority to create a producer.
*/
- Producer createProducer();
+ Producer createProducer(final Properties properties);
/**
- * Creates a new transactional {@code Producer} for the specified {@code MessagingAccessPoint}, the producer is able
- * to respond to requests from the server to check the status of the transaction.
+ * Creates a new {@code OrderProducer} for the specified {@code MessagingAccessPoint}.
*
- * @param transactionStateCheckListener transactional check listener {@link TransactionStateCheckListener}
- * @return the created {@code Producer}
+ * @return the created {@code OrderProducer}
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
* error
* @throws OMSSecurityException if have no authority to create a producer.
*/
- Producer createProducer(TransactionStateCheckListener transactionStateCheckListener);
+ OrderProducer createOrderProducer(final Properties properties);
/**
- * Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint}.
- * The returned {@code PushConsumer} isn't attached to any queue,
- * uses {@link PushConsumer#bindQueue(Collection, MessageListener)} to attach queues.
+ * Creates a new {@code TransactionProducer} for the specified {@code MessagingAccessPoint}.
*
- * @return the created {@code PushConsumer}
- * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
- * due to some internal error
- */
- PushConsumer createPushConsumer();
-
- /**
- * Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint}.
- *
- * @return the created {@code PullConsumer}
- * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
- * due to some internal error
+ * @return the created {@code TransactionProducer}
+ * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
+ * error
+ * @throws OMSSecurityException if have no authority to create a producer.
*/
- PullConsumer createPullConsumer();
+ TransactionProducer createTransactionProducer(final Properties properties, final LocalTransactionChecker checker);
/**
- * Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint} with some preset attributes.
+ * Creates a new {@code Consumer} for the specified {@code MessagingAccessPoint}. The returned {@code Consumer}
+ * isn't subscribe any topic, and default Consumer will running with push model.
*
- * @param attributes the preset attributes
- * @return the created {@code PushConsumer}
- * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
- * due to some internal error
+ * @return the created {@code Consumer}
+ * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
+ * error
*/
- PushConsumer createPushConsumer(KeyValue attributes);
+ Consumer createConsumer(final Properties properties);
/**
- * Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint}.
+ * Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint}. The returned {@code Consumer}
+ * isn't subscribe any topic.
*
* @return the created {@code PullConsumer}
- * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
- * due to some internal error
+ * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
+ * error
*/
- PullConsumer createPullConsumer(KeyValue attributes);
+ PullConsumer createPullConsumer(final Properties properties);
+
/**
- * Gets a lightweight {@code ResourceManager} instance from the specified {@code MessagingAccessPoint}.
+ * Creates a new {@code BatchConsumer} for the specified {@code MessagingAccessPoint}. The returned {@code Consumer}
+ * isn't subscribe any topic.
*
- * @return the resource manger
+ * @return the created {@code BatchConsumer}
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
* error
- * @throws OMSSecurityException if have no authority to obtain a resource manager.
*/
- ResourceManager resourceManager();
+ BatchConsumer createBatchConsumer(final Properties properties);
/**
- * Gets a {@link MessageFactory} instance from the specified {@code MessagingAccessPoint}.
+ * Creates a new {@code OrderConsumer} for the specified {@code MessagingAccessPoint}. The returned {@code Consumer}
+ * isn't subscribe any topic.
*
- * @return the resource manger
+ * @return the created {@code OrderConsumer}
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
* error
*/
- MessageFactory messageFactory();
+ OrderConsumer createOrderedConsumer(final Properties properties);
}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/OMS.java b/openmessaging-api/src/main/java/io/openmessaging/api/OMS.java
new file mode 100644
index 00000000..908b1ed8
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/api/OMS.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.api;
+
+import io.openmessaging.api.internal.MessagingAccessPointAdapter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ *
+ * The oms class provides some static methods to create a {@code MessagingAccessPoint} from the specified OMS driver url
+ * and some useful util methods.
+ *
+ *
+ *
+ * The brackets indicate that the extra access points are optional, and a correct OMS driver url needs at least one
+ * access point, which consists of hostname and port, like localhost:8081.
+ *
+ *
+ * @version OMS 1.2.0
+ * @since OMS 1.1.0
+ */
+public final class OMS {
+
+ private final Properties properties = new Properties();
+
+ public static OMS builder() {
+ return new OMS();
+ }
+
+ /**
+ * Set the endpoint provided by messaging vendor.
+ *
+ * @param endpoint
+ * @return
+ */
+ public OMS endpoint(String endpoint) {
+ this.properties.put(OMSBuiltinKeys.ENDPOINT, endpoint);
+ return this;
+ }
+
+ /**
+ * Set the region provided by messaging vendor.
+ *
+ * @param region
+ * @return
+ */
+ public OMS region(String region) {
+ this.properties.put(OMSBuiltinKeys.REGION, region);
+ return this;
+ }
+
+ /**
+ *
+ * Set the the driver type of the specified MessagingAccessPoint's * implementation, the default value is {@literal
+ * io.openmessaging..MessagingAccessPointImpl}.
+ *
+ *
+ *
+ * But if the {@link OMS#driverImpl(String)} attribute was set, this attribute will be ignored.
+ *
+ * Set the the fully qualified class name of the specified MessagingAccessPoint's * implementation, the default
+ * value is {@literal io.openmessaging..MessagingAccessPointImpl}.
+ *
+ *
+ *
+ * If this attribute was set, {@link OMS#driver(String)} will be ignored.
+ *