2121import com .google .cloud .Role ;
2222import com .google .cloud .pubsub .Message ;
2323import com .google .cloud .pubsub .PubSub ;
24- import com .google .cloud .pubsub .PubSub .MessageProcessor ;
2524import com .google .cloud .pubsub .PubSubOptions ;
2625import com .google .cloud .pubsub .PushConfig ;
27- import com .google .cloud .pubsub .ReceivedMessage ;
2826import com .google .cloud .pubsub .Subscription ;
2927import com .google .cloud .pubsub .SubscriptionId ;
3028import com .google .cloud .pubsub .SubscriptionInfo ;
@@ -453,39 +451,6 @@ public String params() {
453451 }
454452 }
455453
456- /**
457- * This class demonstrates how to acknowledge Pub/Sub messages for a subscription.
458- *
459- * @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Receiving
460- * pull messages</a>
461- */
462- private static class AckMessagesAction extends MessagesAction {
463- @ Override
464- public void run (PubSub pubsub , Tuple <String , List <String >> params ) {
465- String subscription = params .x ();
466- List <String > ackIds = params .y ();
467- pubsub .ack (subscription , ackIds );
468- System .out .printf ("Acked %d messages for subscription %s%n" , ackIds .size (), subscription );
469- }
470- }
471-
472- /**
473- * This class demonstrates how to "nack" Pub/Sub messages for a subscription. This action
474- * corresponds to setting the acknowledge deadline to 0.
475- *
476- * @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Message
477- * acknowledgement deadline</a>
478- */
479- private static class NackMessagesAction extends MessagesAction {
480- @ Override
481- public void run (PubSub pubsub , Tuple <String , List <String >> params ) {
482- String subscription = params .x ();
483- List <String > ackIds = params .y ();
484- pubsub .nack (subscription , ackIds );
485- System .out .printf ("Nacked %d messages for subscription %s%n" , ackIds .size (), subscription );
486- }
487- }
488-
489454 /**
490455 * This class demonstrates how modify the acknowledge deadline for messages in a Pub/Sub
491456 * subscription.
@@ -543,102 +508,6 @@ public String params() {
543508 }
544509 }
545510
546- /**
547- * This class demonstrates how to asynchronously pull messages from a Pub/Sub pull subscription.
548- * Messages are pulled until a timeout is reached.
549- *
550- * @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Receiving
551- * pull messages</a>
552- */
553- private static class PullAsyncAction extends PubSubAction <Tuple <String , Long >> {
554- @ Override
555- public void run (PubSub pubsub , Tuple <String , Long > params ) throws Exception {
556- String subscription = params .x ();
557- Long timeout = params .y ();
558- final AtomicInteger messageCount = new AtomicInteger ();
559- MessageProcessor messageProcessor = new MessageProcessor () {
560-
561- @ Override
562- public void process (Message message ) throws Exception {
563- System .out .printf ("Received message \" %s\" %n" , message );
564- messageCount .incrementAndGet ();
565- }
566- };
567- try (PubSub .MessageConsumer consumer = pubsub .pullAsync (subscription , messageProcessor )) {
568- Thread .sleep (timeout );
569- }
570- System .out .printf ("Pulled %d messages from subscription %s%n" , messageCount .get (),
571- subscription );
572- }
573-
574- @ Override
575- Tuple <String , Long > parse (String ... args ) throws Exception {
576- String message ;
577- if (args .length > 2 ) {
578- message = "Too many arguments." ;
579- } else if (args .length < 1 ) {
580- message = "Missing required subscription name" ;
581- } else {
582- String subscription = args [0 ];
583- long timeout = 60_000 ;
584- if (args .length == 2 ) {
585- timeout = Long .parseLong (args [1 ]);
586- }
587- return Tuple .of (subscription , timeout );
588- }
589- throw new IllegalArgumentException (message );
590- }
591-
592- @ Override
593- public String params () {
594- return "<subscription> <timeoutMillis>?" ;
595- }
596- }
597-
598- /**
599- * This class demonstrates how to synchronously pull messages from a Pub/Sub pull subscription.
600- * No more than the requested number of messages are pulled. Possibly less messages are pulled.
601- *
602- * @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Receiving
603- * pull messages</a>
604- */
605- private static class PullSyncAction extends PubSubAction <Tuple <String , Integer >> {
606- @ Override
607- public void run (PubSub pubsub , Tuple <String , Integer > params ) throws Exception {
608- String subscription = params .x ();
609- Integer maxMessages = params .y ();
610- Iterator <ReceivedMessage > messageIterator = pubsub .pull (subscription , maxMessages );
611- int messageCount = 0 ;
612- while (messageIterator .hasNext ()) {
613- ReceivedMessage message = messageIterator .next ();
614- System .out .printf ("Received message \" %s\" %n" , message );
615- message .ack ();
616- messageCount ++;
617- }
618- System .out .printf ("Pulled %d messages from subscription %s%n" , messageCount , subscription );
619- }
620-
621- @ Override
622- Tuple <String , Integer > parse (String ... args ) throws Exception {
623- String message ;
624- if (args .length == 2 ) {
625- String subscription = args [0 ];
626- int maxMessages = Integer .parseInt (args [1 ]);
627- return Tuple .of (subscription , maxMessages );
628- } else if (args .length > 2 ) {
629- message = "Too many arguments." ;
630- } else {
631- message = "Missing required subscription name" ;
632- }
633- throw new IllegalArgumentException (message );
634- }
635-
636- @ Override
637- public String params () {
638- return "<subscription> <maxMessages>" ;
639- }
640- }
641-
642511 private abstract static class GetPolicyAction extends PubSubAction <String > {
643512 @ Override
644513 String parse (String ... args ) throws Exception {
@@ -817,8 +686,6 @@ public void run(PubSub pubsub, Tuple<String, List<String>> param) throws Excepti
817686 LIST_ACTIONS .put ("subscriptions" , new ListSubscriptionsAction ());
818687 DELETE_ACTIONS .put ("topic" , new DeleteTopicAction ());
819688 DELETE_ACTIONS .put ("subscription" , new DeleteSubscriptionAction ());
820- PULL_ACTIONS .put ("async" , new PullAsyncAction ());
821- PULL_ACTIONS .put ("sync" , new PullSyncAction ());
822689 GET_IAM_ACTIONS .put ("topic" , new GetTopicPolicyAction ());
823690 GET_IAM_ACTIONS .put ("subscription" , new GetSubscriptionPolicyAction ());
824691 REPLACE_IAM_ACTIONS .put ("topic" , new AddIdentityTopicAction ());
@@ -835,8 +702,6 @@ public void run(PubSub pubsub, Tuple<String, List<String>> param) throws Excepti
835702 ACTIONS .put ("test-permissions" , new ParentAction (TEST_IAM_ACTIONS ));
836703 ACTIONS .put ("publish" , new PublishMessagesAction ());
837704 ACTIONS .put ("replace-push-config" , new ReplacePushConfigAction ());
838- ACTIONS .put ("ack" , new AckMessagesAction ());
839- ACTIONS .put ("nack" , new NackMessagesAction ());
840705 ACTIONS .put ("modify-ack-deadline" , new ModifyAckDeadlineAction ());
841706 }
842707
0 commit comments