Skip to content

Commit 69e47db

Browse files
committed
Fix ingestion-sink cross-project inserts
Update google cloud library to 1.61.0 to match beam 2.13.0 and pick up googleapis/google-cloud-java#4196 Emit stack traces in ingestion-sink
1 parent 7cbd98d commit 69e47db

7 files changed

Lines changed: 57 additions & 15 deletions

File tree

ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/Sink.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package com.mozilla.telemetry.ingestion;
66

7+
import com.google.api.gax.batching.FlowControlSettings;
78
import com.google.cloud.bigquery.BigQueryOptions;
89
import com.mozilla.telemetry.ingestion.io.BigQuery;
910
import com.mozilla.telemetry.ingestion.io.Pubsub;
@@ -23,13 +24,25 @@ public static void main(String[] args) {
2324
PubsubMessageToTableRow.TableRowFormat.valueOf(Env.getString("OUTPUT_FORMAT", "raw")));
2425

2526
// output messages to BigQuery
26-
BigQuery.Write output = new BigQuery.Write(BigQueryOptions.getDefaultInstance().getService());
27+
BigQuery.Write output = new BigQuery.Write(BigQueryOptions.getDefaultInstance().getService(),
28+
// PubsubMessageToTableRow reports protobuf size, which can be ~1/3rd more efficient than
29+
// the JSON that actually gets sent over HTTP, so we use 60% of the API limit by default.
30+
Env.getInt("BATCH_MAX_BYTES", 6_000_000), // HTTP request size limit: 10 MB
31+
Env.getInt("BATCH_MAX_MESSAGES", 10_000), // Maximum rows per request: 10,000
32+
Env.getLong("BATCH_MAX_DELAY_MILLIS", 1000L)); // Default 1 second
2733

2834
// read pubsub messages from INPUT_SUBSCRIPTION
2935
new Pubsub.Read(Env.getString("INPUT_SUBSCRIPTION"),
3036
message -> CompletableFuture.supplyAsync(() -> message) // start new future with message
3137
.thenApplyAsync(routeMessage::apply) // determine output path of message
32-
.thenComposeAsync(output)) // output message
33-
.run(); // run pubsub consumer
38+
.thenComposeAsync(output), // output message
39+
builder -> builder.setFlowControlSettings(FlowControlSettings.newBuilder()
40+
.setMaxOutstandingElementCount(
41+
// Upstream default is 10K, but we get better performance from setting this higher
42+
Env.getLong("FLOW_CONTROL_MAX_OUTSTANDING_ELEMENT_COUNT", 1_000_000L))
43+
.setMaxOutstandingRequestBytes(
44+
// Upstream default of 1GB
45+
Env.getLong("FLOW_CONTROL_MAX_OUTSTANDING_REQUEST_BYTES", 1_000_000_000L))
46+
.build())).run(); // run pubsub consumer
3447
}
3548
}

ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/io/BigQuery.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.ConcurrentMap;
1818
import java.util.concurrent.atomic.AtomicReference;
1919
import java.util.function.Function;
20+
import java.util.logging.Logger;
2021

2122
public class BigQuery {
2223

@@ -35,16 +36,28 @@ private WriteErrors(List<BigQueryError> errors) {
3536

3637
public static class Write implements Function<Write.TableRow, CompletableFuture<Write.TableRow>> {
3738

39+
private static final Logger logger = Logger.getLogger("BigQuery.Write");
40+
3841
private final com.google.cloud.bigquery.BigQuery bigquery;
39-
private final int maxBytes = 10_000_000; // HTTP request size limit: 10 MB
40-
private final int maxMessages = 10_000; // Maximum rows per request: 10,000
41-
private final long maxDelayMillis = 1000; // Default to 1 second
42+
private final int maxBytes;
43+
private final int maxMessages;
44+
private final long maxDelayMillis;
4245

4346
@VisibleForTesting
4447
final ConcurrentMap<TableId, Batch> batches = new ConcurrentHashMap<>();
4548

4649
public Write(com.google.cloud.bigquery.BigQuery bigquery) {
50+
this(bigquery, 10_000_000, // HTTP request size limit: 10 MB
51+
10_000, // Maximum rows per request: 10,000
52+
100); // Default 0.1 second
53+
}
54+
55+
public Write(com.google.cloud.bigquery.BigQuery bigquery, int maxBytes, int maxMessages,
56+
long maxDelayMillis) {
4757
this.bigquery = bigquery;
58+
this.maxBytes = maxBytes;
59+
this.maxMessages = maxMessages;
60+
this.maxDelayMillis = maxDelayMillis;
4861
}
4962

5063
@Override
@@ -137,7 +150,8 @@ private synchronized Optional<CompletableFuture<TableRow>> add(TableRow row) {
137150
int index = newSize - 1;
138151
return Optional.of(result.thenApplyAsync(r -> {
139152
List<BigQueryError> errors = r.getErrorsFor(index);
140-
if (!errors.isEmpty()) {
153+
if (errors != null && !errors.isEmpty()) {
154+
logger.warning(errors.toString());
141155
throw new WriteErrors(errors);
142156
}
143157
return row;

ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/io/Pubsub.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@
44

55
package com.mozilla.telemetry.ingestion.io;
66

7+
import static java.util.logging.Level.WARNING;
8+
79
import com.google.cloud.pubsub.v1.Subscriber;
810
import com.google.common.annotations.VisibleForTesting;
911
import com.google.pubsub.v1.ProjectSubscriptionName;
1012
import com.google.pubsub.v1.PubsubMessage;
1113
import java.util.concurrent.CompletableFuture;
1214
import java.util.function.Function;
15+
import java.util.logging.Logger;
1316

1417
public class Pubsub {
1518

@@ -18,6 +21,8 @@ private Pubsub() {
1821

1922
public static class Read {
2023

24+
static Logger logger = Logger.getLogger("Pubsub.Read");
25+
2126
@VisibleForTesting
2227
Subscriber subscriber;
2328

@@ -30,6 +35,7 @@ public Read(String subscriptionName, Function<PubsubMessage, CompletableFuture<?
3035
if (exception == null) {
3136
consumer.ack();
3237
} else {
38+
logger.log(WARNING, "Exception while attempting to deliver message:", exception);
3339
consumer.nack();
3440
}
3541
})))

ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/transform/PubsubMessageToTableRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.google.cloud.bigquery.TableId;
88
import com.google.pubsub.v1.PubsubMessage;
99
import com.mozilla.telemetry.ingestion.io.BigQuery.Write.TableRow;
10+
import java.util.Base64;
1011
import java.util.HashMap;
1112
import java.util.Map;
1213
import java.util.Optional;
@@ -18,6 +19,8 @@ public enum TableRowFormat {
1819
raw, decoded, payload
1920
}
2021

22+
private static final Base64.Encoder base64Encoder = Base64.getEncoder();
23+
2124
private final String tableSpecTemplate;
2225
private final TableRowFormat tableRowFormat;
2326

@@ -69,7 +72,8 @@ public TableRow apply(PubsubMessage message) {
6972
*/
7073
private Map<String, Object> rawContents(PubsubMessage message) {
7174
Map<String, Object> contents = new HashMap<>(message.getAttributesMap());
72-
contents.put("payload", message.getData().toByteArray());
75+
// bytes must be inserted as base64 encoded strings
76+
contents.put("payload", base64Encoder.encodeToString(message.getData().toByteArray()));
7377
return contents;
7478
}
7579
}

ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/util/Env.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,12 @@ public static String getString(String key) {
2222
public static String getString(String key, String defaultValue) {
2323
return Optional.ofNullable(System.getenv(key)).orElse(defaultValue);
2424
}
25+
26+
public static Integer getInt(String key, Integer defaultValue) {
27+
return Optional.ofNullable(System.getenv(key)).map(Integer::new).orElse(defaultValue);
28+
}
29+
30+
public static Long getLong(String key, Long defaultValue) {
31+
return Optional.ofNullable(System.getenv(key)).map(Long::new).orElse(defaultValue);
32+
}
2533
}

ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/transform/PubsubMessageToTableRowTest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,8 @@ public void canTransformSingleMessage() {
2828
assertEquals("telemetry_raw", actual.tableId.getDataset());
2929
assertEquals("main_v4", actual.tableId.getTable());
3030
assertEquals(104, actual.byteSize);
31-
assertEquals("test", new String((byte[]) actual.content.remove("payload")));
3231
assertEquals(ImmutableMap.of("document_id", "id", "document_namespace", "telemetry",
33-
"document_type", "main", "document_version", "4"), actual.content);
32+
"document_type", "main", "document_version", "4", "payload", "dGVzdA=="), actual.content);
3433
}
3534

3635
@Test
@@ -42,10 +41,8 @@ public void canHandleEmptyValues() {
4241
assertEquals("_raw", actual.tableId.getDataset());
4342
assertEquals("_v", actual.tableId.getTable());
4443
assertEquals(65, actual.byteSize);
45-
assertEquals("", new String((byte[]) actual.content.remove("payload")));
46-
assertEquals(
47-
ImmutableMap.of("document_namespace", "", "document_type", "", "document_version", ""),
48-
actual.content);
44+
assertEquals(ImmutableMap.of("document_namespace", "", "document_type", "", "document_version",
45+
"", "payload", ""), actual.content);
4946
}
5047

5148
@Test(expected = IllegalArgumentException.class)

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
<!-- Keep these dependency versions in sync with those pulled in by beam;
2929
check https://mvnrepository.com/artifact/org.apache.beam -->
30-
<google-cloud.version>1.49.0</google-cloud.version>
30+
<google-cloud.version>1.61.0</google-cloud.version>
3131
<jackson.version>2.9.9</jackson.version>
3232
</properties>
3333

0 commit comments

Comments
 (0)