2121import com .google .cloud .Role ;
2222import com .google .cloud .pubsub .Message ;
2323import com .google .cloud .pubsub .PubSub ;
24- import com .google .cloud .pubsub .PubSub .MessageProcessor ;
2524import com .google .cloud .pubsub .PubSubOptions ;
2625import com .google .cloud .pubsub .PushConfig ;
27- import com .google .cloud .pubsub .ReceivedMessage ;
2826import com .google .cloud .pubsub .Subscription ;
2927import com .google .cloud .pubsub .SubscriptionId ;
3028import com .google .cloud .pubsub .SubscriptionInfo ;
@@ -453,192 +451,6 @@ public String params() {
453451 }
454452 }
455453
456- /**
457- * This class demonstrates how to acknowledge Pub/Sub messages for a subscription.
458- *
459- * @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Receiving
460- * pull messages</a>
461- */
462- private static class AckMessagesAction extends MessagesAction {
463- @ Override
464- public void run (PubSub pubsub , Tuple <String , List <String >> params ) {
465- String subscription = params .x ();
466- List <String > ackIds = params .y ();
467- pubsub .ack (subscription , ackIds );
468- System .out .printf ("Acked %d messages for subscription %s%n" , ackIds .size (), subscription );
469- }
470- }
471-
472- /**
473- * This class demonstrates how to "nack" Pub/Sub messages for a subscription. This action
474- * corresponds to setting the acknowledge deadline to 0.
475- *
476- * @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Message
477- * acknowledgement deadline</a>
478- */
479- private static class NackMessagesAction extends MessagesAction {
480- @ Override
481- public void run (PubSub pubsub , Tuple <String , List <String >> params ) {
482- String subscription = params .x ();
483- List <String > ackIds = params .y ();
484- pubsub .nack (subscription , ackIds );
485- System .out .printf ("Nacked %d messages for subscription %s%n" , ackIds .size (), subscription );
486- }
487- }
488-
489- /**
490- * This class demonstrates how modify the acknowledge deadline for messages in a Pub/Sub
491- * subscription.
492- *
493- * @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Message
494- * acknowledgement deadline</a>
495- */
496- private static class ModifyAckDeadlineAction
497- extends PubSubAction <Tuple <ModifyAckDeadlineAction .SubscriptionAndDeadline , List <String >>> {
498-
499- static class SubscriptionAndDeadline {
500-
501- private final String subscription ;
502- private final int deadlineMillis ;
503-
504- private SubscriptionAndDeadline (String subscription , int deadlineMillis ) {
505- this .subscription = subscription ;
506- this .deadlineMillis = deadlineMillis ;
507- }
508-
509- String subscription () {
510- return subscription ;
511- }
512-
513- int deadlineMillis () {
514- return deadlineMillis ;
515- }
516- }
517-
518- @ Override
519- public void run (PubSub pubsub , Tuple <SubscriptionAndDeadline , List <String >> params )
520- throws Exception {
521- String subscription = params .x ().subscription ();
522- int deadline = params .x ().deadlineMillis ();
523- List <String > ackIds = params .y ();
524- pubsub .modifyAckDeadline (subscription , deadline , TimeUnit .MILLISECONDS , ackIds );
525- System .out .printf ("Ack deadline set to %d for %d messages in subscription %s%n" , deadline ,
526- ackIds .size (), subscription );
527- }
528-
529- @ Override
530- Tuple <SubscriptionAndDeadline , List <String >> parse (String ... args ) throws Exception {
531- if (args .length < 3 ) {
532- throw new IllegalArgumentException ("Missing required subscription, deadline and ack IDs" );
533- }
534- String subscription = args [0 ];
535- int deadline = Integer .parseInt (args [1 ]);
536- return Tuple .of (new SubscriptionAndDeadline (subscription , deadline ),
537- Arrays .asList (Arrays .copyOfRange (args , 2 , args .length )));
538- }
539-
540- @ Override
541- public String params () {
542- return "<subscription> <deadlineMillis> <ackId>+" ;
543- }
544- }
545-
546- /**
547- * This class demonstrates how to asynchronously pull messages from a Pub/Sub pull subscription.
548- * Messages are pulled until a timeout is reached.
549- *
550- * @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Receiving
551- * pull messages</a>
552- */
553- private static class PullAsyncAction extends PubSubAction <Tuple <String , Long >> {
554- @ Override
555- public void run (PubSub pubsub , Tuple <String , Long > params ) throws Exception {
556- String subscription = params .x ();
557- Long timeout = params .y ();
558- final AtomicInteger messageCount = new AtomicInteger ();
559- MessageProcessor messageProcessor = new MessageProcessor () {
560-
561- @ Override
562- public void process (Message message ) throws Exception {
563- System .out .printf ("Received message \" %s\" %n" , message );
564- messageCount .incrementAndGet ();
565- }
566- };
567- try (PubSub .MessageConsumer consumer = pubsub .pullAsync (subscription , messageProcessor )) {
568- Thread .sleep (timeout );
569- }
570- System .out .printf ("Pulled %d messages from subscription %s%n" , messageCount .get (),
571- subscription );
572- }
573-
574- @ Override
575- Tuple <String , Long > parse (String ... args ) throws Exception {
576- String message ;
577- if (args .length > 2 ) {
578- message = "Too many arguments." ;
579- } else if (args .length < 1 ) {
580- message = "Missing required subscription name" ;
581- } else {
582- String subscription = args [0 ];
583- long timeout = 60_000 ;
584- if (args .length == 2 ) {
585- timeout = Long .parseLong (args [1 ]);
586- }
587- return Tuple .of (subscription , timeout );
588- }
589- throw new IllegalArgumentException (message );
590- }
591-
592- @ Override
593- public String params () {
594- return "<subscription> <timeoutMillis>?" ;
595- }
596- }
597-
598- /**
599- * This class demonstrates how to synchronously pull messages from a Pub/Sub pull subscription.
600- * No more than the requested number of messages are pulled. Possibly less messages are pulled.
601- *
602- * @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Receiving
603- * pull messages</a>
604- */
605- private static class PullSyncAction extends PubSubAction <Tuple <String , Integer >> {
606- @ Override
607- public void run (PubSub pubsub , Tuple <String , Integer > params ) throws Exception {
608- String subscription = params .x ();
609- Integer maxMessages = params .y ();
610- Iterator <ReceivedMessage > messageIterator = pubsub .pull (subscription , maxMessages );
611- int messageCount = 0 ;
612- while (messageIterator .hasNext ()) {
613- ReceivedMessage message = messageIterator .next ();
614- System .out .printf ("Received message \" %s\" %n" , message );
615- message .ack ();
616- messageCount ++;
617- }
618- System .out .printf ("Pulled %d messages from subscription %s%n" , messageCount , subscription );
619- }
620-
621- @ Override
622- Tuple <String , Integer > parse (String ... args ) throws Exception {
623- String message ;
624- if (args .length == 2 ) {
625- String subscription = args [0 ];
626- int maxMessages = Integer .parseInt (args [1 ]);
627- return Tuple .of (subscription , maxMessages );
628- } else if (args .length > 2 ) {
629- message = "Too many arguments." ;
630- } else {
631- message = "Missing required subscription name" ;
632- }
633- throw new IllegalArgumentException (message );
634- }
635-
636- @ Override
637- public String params () {
638- return "<subscription> <maxMessages>" ;
639- }
640- }
641-
642454 private abstract static class GetPolicyAction extends PubSubAction <String > {
643455 @ Override
644456 String parse (String ... args ) throws Exception {
@@ -817,8 +629,6 @@ public void run(PubSub pubsub, Tuple<String, List<String>> param) throws Excepti
817629 LIST_ACTIONS .put ("subscriptions" , new ListSubscriptionsAction ());
818630 DELETE_ACTIONS .put ("topic" , new DeleteTopicAction ());
819631 DELETE_ACTIONS .put ("subscription" , new DeleteSubscriptionAction ());
820- PULL_ACTIONS .put ("async" , new PullAsyncAction ());
821- PULL_ACTIONS .put ("sync" , new PullSyncAction ());
822632 GET_IAM_ACTIONS .put ("topic" , new GetTopicPolicyAction ());
823633 GET_IAM_ACTIONS .put ("subscription" , new GetSubscriptionPolicyAction ());
824634 REPLACE_IAM_ACTIONS .put ("topic" , new AddIdentityTopicAction ());
@@ -835,9 +645,6 @@ public void run(PubSub pubsub, Tuple<String, List<String>> param) throws Excepti
835645 ACTIONS .put ("test-permissions" , new ParentAction (TEST_IAM_ACTIONS ));
836646 ACTIONS .put ("publish" , new PublishMessagesAction ());
837647 ACTIONS .put ("replace-push-config" , new ReplacePushConfigAction ());
838- ACTIONS .put ("ack" , new AckMessagesAction ());
839- ACTIONS .put ("nack" , new NackMessagesAction ());
840- ACTIONS .put ("modify-ack-deadline" , new ModifyAckDeadlineAction ());
841648 }
842649
843650 private static void printUsage () {
0 commit comments