Skip to content

Commit 0b1e34f

Browse files
committed
Fix ingestion-sink cross-project inserts
Update google cloud library to 1.61.0 to match beam 2.13.0 and pick up googleapis/google-cloud-java#4196 Emit stack traces in ingestion-sink
1 parent 7cbd98d commit 0b1e34f

3 files changed

Lines changed: 8 additions & 2 deletions

File tree

ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/io/BigQuery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private synchronized Optional<CompletableFuture<TableRow>> add(TableRow row) {
137137
int index = newSize - 1;
138138
return Optional.of(result.thenApplyAsync(r -> {
139139
List<BigQueryError> errors = r.getErrorsFor(index);
140-
if (!errors.isEmpty()) {
140+
if (errors != null && !errors.isEmpty()) {
141141
throw new WriteErrors(errors);
142142
}
143143
return row;

ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/io/Pubsub.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@
44

55
package com.mozilla.telemetry.ingestion.io;
66

7+
import static java.util.logging.Level.WARNING;
8+
79
import com.google.cloud.pubsub.v1.Subscriber;
810
import com.google.common.annotations.VisibleForTesting;
911
import com.google.pubsub.v1.ProjectSubscriptionName;
1012
import com.google.pubsub.v1.PubsubMessage;
1113
import java.util.concurrent.CompletableFuture;
1214
import java.util.function.Function;
15+
import java.util.logging.Logger;
1316

1417
public class Pubsub {
1518

@@ -18,6 +21,8 @@ private Pubsub() {
1821

1922
public static class Read {
2023

24+
static Logger logger = Logger.getLogger("Pubsub.Read");
25+
2126
@VisibleForTesting
2227
Subscriber subscriber;
2328

@@ -30,6 +35,7 @@ public Read(String subscriptionName, Function<PubsubMessage, CompletableFuture<?
3035
if (exception == null) {
3136
consumer.ack();
3237
} else {
38+
logger.log(WARNING, "Exception while attempting to deliver message:", exception);
3339
consumer.nack();
3440
}
3541
})))

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
<!-- Keep these dependency versions in sync with those pulled in by beam;
2929
check https://mvnrepository.com/artifact/org.apache.beam -->
30-
<google-cloud.version>1.49.0</google-cloud.version>
30+
<google-cloud.version>1.61.0</google-cloud.version>
3131
<jackson.version>2.9.9</jackson.version>
3232
</properties>
3333

0 commit comments

Comments
 (0)