Skip to content

Commit e1474e5

Browse files
committed
Implement modifyAckDeadline methods, add javadoc and tests
1 parent fe4c90c commit e1474e5

File tree

6 files changed

+177
-11
lines changed

6 files changed

+177
-11
lines changed

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,14 +403,78 @@ interface MessageConsumer extends AutoCloseable {
403403

404404
Future<Void> nackAsync(String subscription, Iterable<String> ackIds);
405405

406+
/**
407+
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is
408+
* the new deadline with respect to the time the modify request was received by the Pub/Sub
409+
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
410+
* the new ack deadline will expire 10 seconds after the modify request was received by the
411+
* service. Specifying 0 may be used to make the message available for another pull request
412+
* (corresponds to calling {@link #nack(String, String, String...)}).
413+
*
414+
* @param subscription the subscription whose messages need to update their acknowledge deadline
415+
* @param deadline the new deadline, relative to the time the modify request is received by the
416+
* Pub/Sub service
417+
* @param unit time unit for the {@code deadline} parameter
418+
* @param ackId the ack id of the first message for which the acknowledge deadline must be
419+
* modified
420+
* @param ackIds other ack ids of messages for which the acknowledge deadline must be modified
421+
* @throws PubSubException upon failure, or if the subscription was not found
422+
*/
406423
void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String ackId,
407424
String... ackIds);
408425

426+
/**
427+
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
428+
* must be >= 0 and is the new deadline with respect to the time the modify request was received
429+
* by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
430+
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
431+
* was received by the service. Specifying 0 may be used to make the message available for another
432+
* pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns
433+
* a {@code Future} object that can be used to wait for the modify operation to be completed.
434+
*
435+
* @param subscription the subscription whose messages need to update their acknowledge deadline
436+
* @param deadline the new deadline, relative to the time the modify request is received by the
437+
* Pub/Sub service
438+
* @param unit time unit for the {@code deadline} parameter
439+
* @param ackId the ack id of the first message for which the acknowledge deadline must be
440+
* modified
441+
* @param ackIds other ack ids of messages for which the acknowledge deadline must be modified
442+
*/
409443
Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit,
410444
String ackId, String... ackIds);
411445

446+
/**
447+
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is
448+
* the new deadline with respect to the time the modify request was received by the Pub/Sub
449+
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
450+
* the new ack deadline will expire 10 seconds after the modify request was received by the
451+
* service. Specifying 0 may be used to make the message available for another pull request
452+
* (corresponds to calling {@link #nack(String, Iterable)}).
453+
*
454+
* @param subscription the subscription whose messages need to update their acknowledge deadline
455+
* @param deadline the new deadline, relative to the time the modify request is received by the
456+
* Pub/Sub service
457+
* @param unit time unit for the {@code deadline} parameter
458+
* @param ackIds the ack ids of messages for which the acknowledge deadline must be modified
459+
* @throws PubSubException upon failure, or if the subscription was not found
460+
*/
412461
void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, Iterable<String> ackIds);
413462

463+
/**
464+
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
465+
* must be >= 0 and is the new deadline with respect to the time the modify request was received
466+
* by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
467+
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
468+
* was received by the service. Specifying 0 may be used to make the message available for another
469+
* pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns
470+
* a {@code Future} object that can be used to wait for the modify operation to be completed.
471+
*
472+
* @param subscription the subscription whose messages need to update their acknowledge deadline
473+
* @param deadline the new deadline, relative to the time the modify request is received by the
474+
* Pub/Sub service
475+
* @param unit time unit for the {@code deadline} parameter
476+
* @param ackIds the ack ids of messages for which the acknowledge deadline must be modified
477+
*/
414478
Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit,
415479
Iterable<String> ackIds);
416480

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.google.pubsub.v1.ListTopicSubscriptionsResponse;
4848
import com.google.pubsub.v1.ListTopicsRequest;
4949
import com.google.pubsub.v1.ListTopicsResponse;
50+
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
5051
import com.google.pubsub.v1.ModifyPushConfigRequest;
5152
import com.google.pubsub.v1.PublishRequest;
5253
import com.google.pubsub.v1.PublishResponse;
@@ -504,25 +505,30 @@ public Future<Void> nackAsync(String subscription, Iterable<String> ackIds) {
504505
@Override
505506
public void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String ackId,
506507
String... ackIds) {
507-
508+
get(modifyAckDeadlineAsync(subscription, deadline, unit, Lists.asList(ackId, ackIds)));
508509
}
509510

510511
@Override
511512
public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit,
512513
String ackId, String... ackIds) {
513-
return null;
514+
return modifyAckDeadlineAsync(subscription, deadline, unit, Lists.asList(ackId, ackIds));
514515
}
515516

516517
@Override
517518
public void modifyAckDeadline(String subscription, int deadline, TimeUnit unit,
518519
Iterable<String> ackIds) {
519-
520+
get(modifyAckDeadlineAsync(subscription, deadline, unit, ackIds));
520521
}
521522

522523
@Override
523524
public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit,
524525
Iterable<String> ackIds) {
525-
return null;
526+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
527+
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
528+
.setAckDeadlineSeconds((int) TimeUnit.SECONDS.convert(deadline, unit))
529+
.addAllAckIds(ackIds)
530+
.build();
531+
return lazyTransform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION);
526532
}
527533

528534
static <T extends Option.OptionType> Map<Option.OptionType, ?> optionMap(Option... options) {

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@
5454
import com.google.pubsub.v1.Subscription;
5555
import com.google.pubsub.v1.Topic;
5656

57-
import org.joda.time.Duration;
58-
5957
import io.grpc.ManagedChannel;
6058
import io.grpc.Status.Code;
6159
import io.grpc.netty.NegotiationType;
6260
import io.grpc.netty.NettyChannelBuilder;
6361

62+
import org.joda.time.Duration;
63+
6464
import java.io.IOException;
6565
import java.util.Set;
6666
import java.util.concurrent.Future;

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubsubHelper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
import com.google.cloud.RetryParams;
2424
import com.google.cloud.pubsub.PubSubOptions;
2525

26+
import io.grpc.ManagedChannel;
27+
import io.grpc.netty.NegotiationType;
28+
import io.grpc.netty.NettyChannelBuilder;
29+
2630
import java.io.IOException;
2731
import java.net.MalformedURLException;
2832
import java.net.URL;
@@ -31,10 +35,6 @@
3135
import java.util.List;
3236
import java.util.UUID;
3337

34-
import io.grpc.ManagedChannel;
35-
import io.grpc.netty.NegotiationType;
36-
import io.grpc.netty.NettyChannelBuilder;
37-
3838
/**
3939
* A class that runs a Pubsub emulator instance for use in tests.
4040
*/

gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/LocalSystemTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public static void startServer() throws IOException, InterruptedException {
4747

4848
@AfterClass
4949
public static void stopServer() throws Exception {
50-
pubsub.options().rpc().close();
50+
pubsub.close();
5151
pubsubHelper.reset();
5252
pubsubHelper.stop();
5353
}

gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.google.pubsub.v1.ListTopicSubscriptionsResponse;
4848
import com.google.pubsub.v1.ListTopicsRequest;
4949
import com.google.pubsub.v1.ListTopicsResponse;
50+
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
5051
import com.google.pubsub.v1.ModifyPushConfigRequest;
5152
import com.google.pubsub.v1.PublishRequest;
5253
import com.google.pubsub.v1.PublishResponse;
@@ -61,6 +62,7 @@
6162
import java.util.List;
6263
import java.util.concurrent.ExecutionException;
6364
import java.util.concurrent.Future;
65+
import java.util.concurrent.TimeUnit;
6466

6567
public class PubSubImplTest {
6668

@@ -1187,6 +1189,100 @@ public void testListTopicSubscriptionsAsyncWithOptions()
11871189
Iterables.toArray(page.values(), SubscriptionId.class));
11881190
}
11891191

1192+
@Test
1193+
public void testModifyAckDeadlineOneMessage() {
1194+
pubsub = options.service();
1195+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
1196+
.setAckDeadlineSeconds(10)
1197+
.setSubscription(SUBSCRIPTION_NAME_PB)
1198+
.addAckIds("ackId")
1199+
.build();
1200+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1201+
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
1202+
EasyMock.replay(pubsubRpcMock);
1203+
pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId");
1204+
}
1205+
1206+
@Test
1207+
public void testModifyAckDeadlineOneMessageAsync()
1208+
throws ExecutionException, InterruptedException {
1209+
pubsub = options.service();
1210+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
1211+
.setAckDeadlineSeconds(10)
1212+
.setSubscription(SUBSCRIPTION_NAME_PB)
1213+
.addAckIds("ackId")
1214+
.build();
1215+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1216+
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
1217+
EasyMock.replay(pubsubRpcMock);
1218+
Future<Void> future =
1219+
pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId");
1220+
assertNull(future.get());
1221+
}
1222+
1223+
@Test
1224+
public void testModifyAckDeadlineMoreMessages() {
1225+
pubsub = options.service();
1226+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
1227+
.setAckDeadlineSeconds(10)
1228+
.setSubscription(SUBSCRIPTION_NAME_PB)
1229+
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
1230+
.build();
1231+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1232+
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
1233+
EasyMock.replay(pubsubRpcMock);
1234+
pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId1", "ackId2");
1235+
}
1236+
1237+
@Test
1238+
public void testModifyAckDeadlineMoreMessagesAsync()
1239+
throws ExecutionException, InterruptedException {
1240+
pubsub = options.service();
1241+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
1242+
.setAckDeadlineSeconds(10)
1243+
.setSubscription(SUBSCRIPTION_NAME_PB)
1244+
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
1245+
.build();
1246+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1247+
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
1248+
EasyMock.replay(pubsubRpcMock);
1249+
Future<Void> future =
1250+
pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId1", "ackId2");
1251+
assertNull(future.get());
1252+
}
1253+
1254+
@Test
1255+
public void testModifyAckDeadlineMessageList() {
1256+
pubsub = options.service();
1257+
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
1258+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
1259+
.setAckDeadlineSeconds(10)
1260+
.setSubscription(SUBSCRIPTION_NAME_PB)
1261+
.addAllAckIds(ackIds)
1262+
.build();
1263+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1264+
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
1265+
EasyMock.replay(pubsubRpcMock);
1266+
pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, ackIds);
1267+
}
1268+
1269+
@Test
1270+
public void testModifyAckDeadlineMessageListAsync()
1271+
throws ExecutionException, InterruptedException {
1272+
pubsub = options.service();
1273+
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
1274+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
1275+
.setAckDeadlineSeconds(10)
1276+
.setSubscription(SUBSCRIPTION_NAME_PB)
1277+
.addAllAckIds(ackIds)
1278+
.build();
1279+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1280+
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
1281+
EasyMock.replay(pubsubRpcMock);
1282+
Future<Void> future = pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, ackIds);
1283+
assertNull(future.get());
1284+
}
1285+
11901286
@Test
11911287
public void testClose() throws Exception {
11921288
pubsub = options.service();

0 commit comments

Comments
 (0)