getNewPartitionsList() {
+ return newPartitions_;
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ @java.lang.Override
+ public java.util.List extends com.google.bigtable.v2.StreamPartitionOrBuilder>
+ getNewPartitionsOrBuilderList() {
+ return newPartitions_;
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ @java.lang.Override
+ public int getNewPartitionsCount() {
+ return newPartitions_.size();
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ @java.lang.Override
+ public com.google.bigtable.v2.StreamPartition getNewPartitions(int index) {
+ return newPartitions_.get(index);
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ @java.lang.Override
+ public com.google.bigtable.v2.StreamPartitionOrBuilder getNewPartitionsOrBuilder(int index) {
+ return newPartitions_.get(index);
+ }
+
private byte memoizedIsInitialized = -1;
@java.lang.Override
@@ -6351,6 +6510,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io
for (int i = 0; i < continuationTokens_.size(); i++) {
output.writeMessage(2, continuationTokens_.get(i));
}
+ for (int i = 0; i < newPartitions_.size(); i++) {
+ output.writeMessage(3, newPartitions_.get(i));
+ }
getUnknownFields().writeTo(output);
}
@@ -6367,6 +6529,9 @@ public int getSerializedSize() {
size +=
com.google.protobuf.CodedOutputStream.computeMessageSize(2, continuationTokens_.get(i));
}
+ for (int i = 0; i < newPartitions_.size(); i++) {
+ size += com.google.protobuf.CodedOutputStream.computeMessageSize(3, newPartitions_.get(i));
+ }
size += getUnknownFields().getSerializedSize();
memoizedSize = size;
return size;
@@ -6388,6 +6553,7 @@ public boolean equals(final java.lang.Object obj) {
if (!getStatus().equals(other.getStatus())) return false;
}
if (!getContinuationTokensList().equals(other.getContinuationTokensList())) return false;
+ if (!getNewPartitionsList().equals(other.getNewPartitionsList())) return false;
if (!getUnknownFields().equals(other.getUnknownFields())) return false;
return true;
}
@@ -6407,6 +6573,10 @@ public int hashCode() {
hash = (37 * hash) + CONTINUATION_TOKENS_FIELD_NUMBER;
hash = (53 * hash) + getContinuationTokensList().hashCode();
}
+ if (getNewPartitionsCount() > 0) {
+ hash = (37 * hash) + NEW_PARTITIONS_FIELD_NUMBER;
+ hash = (53 * hash) + getNewPartitionsList().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -6514,10 +6684,25 @@ protected Builder newBuilderForType(
*
*
* A message indicating that the client should stop reading from the stream.
- * If status is OK and `continuation_tokens` is empty, the stream has finished
- * (for example if there was an `end_time` specified).
- * If `continuation_tokens` is present, then a change in partitioning requires
- * the client to open a new stream for each token to resume reading.
+ * If status is OK and `continuation_tokens` & `new_partitions` are empty, the
+ * stream has finished (for example if there was an `end_time` specified).
+ * If `continuation_tokens` & `new_partitions` are present, then a change in
+ * partitioning requires the client to open a new stream for each token to
+ * resume reading. Example:
+ * [B, D) ends
+ * |
+ * v
+ * new_partitions: [A, C) [C, E)
+ * continuation_tokens.partitions: [B,C) [C,D)
+ * ^---^ ^---^
+ * ^ ^
+ * | |
+ * | StreamContinuationToken 2
+ * |
+ * StreamContinuationToken 1
+ * To read the new partition [A,C), supply the continuation tokens whose
+ * ranges cover the new partition, for example ContinuationToken[A,B) &
+ * ContinuationToken[B,C).
*
*
* Protobuf type {@code google.bigtable.v2.ReadChangeStreamResponse.CloseStream}
@@ -6565,6 +6750,13 @@ public Builder clear() {
continuationTokensBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
+ if (newPartitionsBuilder_ == null) {
+ newPartitions_ = java.util.Collections.emptyList();
+ } else {
+ newPartitions_ = null;
+ newPartitionsBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -6612,6 +6804,15 @@ private void buildPartialRepeatedFields(
} else {
result.continuationTokens_ = continuationTokensBuilder_.build();
}
+ if (newPartitionsBuilder_ == null) {
+ if (((bitField0_ & 0x00000004) != 0)) {
+ newPartitions_ = java.util.Collections.unmodifiableList(newPartitions_);
+ bitField0_ = (bitField0_ & ~0x00000004);
+ }
+ result.newPartitions_ = newPartitions_;
+ } else {
+ result.newPartitions_ = newPartitionsBuilder_.build();
+ }
}
private void buildPartial0(
@@ -6701,6 +6902,33 @@ public Builder mergeFrom(com.google.bigtable.v2.ReadChangeStreamResponse.CloseSt
}
}
}
+ if (newPartitionsBuilder_ == null) {
+ if (!other.newPartitions_.isEmpty()) {
+ if (newPartitions_.isEmpty()) {
+ newPartitions_ = other.newPartitions_;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ } else {
+ ensureNewPartitionsIsMutable();
+ newPartitions_.addAll(other.newPartitions_);
+ }
+ onChanged();
+ }
+ } else {
+ if (!other.newPartitions_.isEmpty()) {
+ if (newPartitionsBuilder_.isEmpty()) {
+ newPartitionsBuilder_.dispose();
+ newPartitionsBuilder_ = null;
+ newPartitions_ = other.newPartitions_;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ newPartitionsBuilder_ =
+ com.google.protobuf.GeneratedMessageV3.alwaysUseFieldBuilders
+ ? getNewPartitionsFieldBuilder()
+ : null;
+ } else {
+ newPartitionsBuilder_.addAllMessages(other.newPartitions_);
+ }
+ }
+ }
this.mergeUnknownFields(other.getUnknownFields());
onChanged();
return this;
@@ -6747,6 +6975,19 @@ public Builder mergeFrom(
}
break;
} // case 18
+ case 26:
+ {
+ com.google.bigtable.v2.StreamPartition m =
+ input.readMessage(
+ com.google.bigtable.v2.StreamPartition.parser(), extensionRegistry);
+ if (newPartitionsBuilder_ == null) {
+ ensureNewPartitionsIsMutable();
+ newPartitions_.add(m);
+ } else {
+ newPartitionsBuilder_.addMessage(m);
+ }
+ break;
+ } // case 26
default:
{
if (!super.parseUnknownField(input, extensionRegistry, tag)) {
@@ -6962,8 +7203,8 @@ private void ensureContinuationTokensIsMutable() {
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -6980,8 +7221,8 @@ private void ensureContinuationTokensIsMutable() {
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -6997,8 +7238,8 @@ public int getContinuationTokensCount() {
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7014,8 +7255,8 @@ public com.google.bigtable.v2.StreamContinuationToken getContinuationTokens(int
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7038,8 +7279,8 @@ public Builder setContinuationTokens(
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7059,8 +7300,8 @@ public Builder setContinuationTokens(
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7082,8 +7323,8 @@ public Builder addContinuationTokens(com.google.bigtable.v2.StreamContinuationTo
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7106,8 +7347,8 @@ public Builder addContinuationTokens(
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7127,8 +7368,8 @@ public Builder addContinuationTokens(
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7148,8 +7389,8 @@ public Builder addContinuationTokens(
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7169,8 +7410,8 @@ public Builder addAllContinuationTokens(
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7189,8 +7430,8 @@ public Builder clearContinuationTokens() {
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7209,8 +7450,8 @@ public Builder removeContinuationTokens(int index) {
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7223,8 +7464,8 @@ public com.google.bigtable.v2.StreamContinuationToken.Builder getContinuationTok
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7241,8 +7482,8 @@ public com.google.bigtable.v2.StreamContinuationTokenOrBuilder getContinuationTo
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7259,8 +7500,8 @@ public com.google.bigtable.v2.StreamContinuationTokenOrBuilder getContinuationTo
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7273,8 +7514,8 @@ public com.google.bigtable.v2.StreamContinuationToken.Builder addContinuationTok
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7288,8 +7529,8 @@ public com.google.bigtable.v2.StreamContinuationToken.Builder addContinuationTok
*
*
*
- * If non-empty, contains the information needed to start reading the new
- * partition(s) that contain segments of this partition's row range.
+ * If non-empty, contains the information needed to resume reading their
+ * associated partitions.
*
*
* repeated .google.bigtable.v2.StreamContinuationToken continuation_tokens = 2;
@@ -7319,6 +7560,396 @@ public com.google.bigtable.v2.StreamContinuationToken.Builder addContinuationTok
return continuationTokensBuilder_;
}
+ private java.util.List newPartitions_ =
+ java.util.Collections.emptyList();
+
+ private void ensureNewPartitionsIsMutable() {
+ if (!((bitField0_ & 0x00000004) != 0)) {
+ newPartitions_ =
+ new java.util.ArrayList(newPartitions_);
+ bitField0_ |= 0x00000004;
+ }
+ }
+
+ private com.google.protobuf.RepeatedFieldBuilderV3<
+ com.google.bigtable.v2.StreamPartition,
+ com.google.bigtable.v2.StreamPartition.Builder,
+ com.google.bigtable.v2.StreamPartitionOrBuilder>
+ newPartitionsBuilder_;
+
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public java.util.List getNewPartitionsList() {
+ if (newPartitionsBuilder_ == null) {
+ return java.util.Collections.unmodifiableList(newPartitions_);
+ } else {
+ return newPartitionsBuilder_.getMessageList();
+ }
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public int getNewPartitionsCount() {
+ if (newPartitionsBuilder_ == null) {
+ return newPartitions_.size();
+ } else {
+ return newPartitionsBuilder_.getCount();
+ }
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public com.google.bigtable.v2.StreamPartition getNewPartitions(int index) {
+ if (newPartitionsBuilder_ == null) {
+ return newPartitions_.get(index);
+ } else {
+ return newPartitionsBuilder_.getMessage(index);
+ }
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public Builder setNewPartitions(int index, com.google.bigtable.v2.StreamPartition value) {
+ if (newPartitionsBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureNewPartitionsIsMutable();
+ newPartitions_.set(index, value);
+ onChanged();
+ } else {
+ newPartitionsBuilder_.setMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public Builder setNewPartitions(
+ int index, com.google.bigtable.v2.StreamPartition.Builder builderForValue) {
+ if (newPartitionsBuilder_ == null) {
+ ensureNewPartitionsIsMutable();
+ newPartitions_.set(index, builderForValue.build());
+ onChanged();
+ } else {
+ newPartitionsBuilder_.setMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public Builder addNewPartitions(com.google.bigtable.v2.StreamPartition value) {
+ if (newPartitionsBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureNewPartitionsIsMutable();
+ newPartitions_.add(value);
+ onChanged();
+ } else {
+ newPartitionsBuilder_.addMessage(value);
+ }
+ return this;
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public Builder addNewPartitions(int index, com.google.bigtable.v2.StreamPartition value) {
+ if (newPartitionsBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureNewPartitionsIsMutable();
+ newPartitions_.add(index, value);
+ onChanged();
+ } else {
+ newPartitionsBuilder_.addMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public Builder addNewPartitions(
+ com.google.bigtable.v2.StreamPartition.Builder builderForValue) {
+ if (newPartitionsBuilder_ == null) {
+ ensureNewPartitionsIsMutable();
+ newPartitions_.add(builderForValue.build());
+ onChanged();
+ } else {
+ newPartitionsBuilder_.addMessage(builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public Builder addNewPartitions(
+ int index, com.google.bigtable.v2.StreamPartition.Builder builderForValue) {
+ if (newPartitionsBuilder_ == null) {
+ ensureNewPartitionsIsMutable();
+ newPartitions_.add(index, builderForValue.build());
+ onChanged();
+ } else {
+ newPartitionsBuilder_.addMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public Builder addAllNewPartitions(
+ java.lang.Iterable extends com.google.bigtable.v2.StreamPartition> values) {
+ if (newPartitionsBuilder_ == null) {
+ ensureNewPartitionsIsMutable();
+ com.google.protobuf.AbstractMessageLite.Builder.addAll(values, newPartitions_);
+ onChanged();
+ } else {
+ newPartitionsBuilder_.addAllMessages(values);
+ }
+ return this;
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public Builder clearNewPartitions() {
+ if (newPartitionsBuilder_ == null) {
+ newPartitions_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000004);
+ onChanged();
+ } else {
+ newPartitionsBuilder_.clear();
+ }
+ return this;
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public Builder removeNewPartitions(int index) {
+ if (newPartitionsBuilder_ == null) {
+ ensureNewPartitionsIsMutable();
+ newPartitions_.remove(index);
+ onChanged();
+ } else {
+ newPartitionsBuilder_.remove(index);
+ }
+ return this;
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public com.google.bigtable.v2.StreamPartition.Builder getNewPartitionsBuilder(int index) {
+ return getNewPartitionsFieldBuilder().getBuilder(index);
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public com.google.bigtable.v2.StreamPartitionOrBuilder getNewPartitionsOrBuilder(int index) {
+ if (newPartitionsBuilder_ == null) {
+ return newPartitions_.get(index);
+ } else {
+ return newPartitionsBuilder_.getMessageOrBuilder(index);
+ }
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public java.util.List extends com.google.bigtable.v2.StreamPartitionOrBuilder>
+ getNewPartitionsOrBuilderList() {
+ if (newPartitionsBuilder_ != null) {
+ return newPartitionsBuilder_.getMessageOrBuilderList();
+ } else {
+ return java.util.Collections.unmodifiableList(newPartitions_);
+ }
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public com.google.bigtable.v2.StreamPartition.Builder addNewPartitionsBuilder() {
+ return getNewPartitionsFieldBuilder()
+ .addBuilder(com.google.bigtable.v2.StreamPartition.getDefaultInstance());
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public com.google.bigtable.v2.StreamPartition.Builder addNewPartitionsBuilder(int index) {
+ return getNewPartitionsFieldBuilder()
+ .addBuilder(index, com.google.bigtable.v2.StreamPartition.getDefaultInstance());
+ }
+ /**
+ *
+ *
+ *
+ * If non-empty, contains the new partitions to start reading from, which
+ * are related to but not necessarily identical to the partitions for the
+ * above `continuation_tokens`.
+ *
+ *
+ * repeated .google.bigtable.v2.StreamPartition new_partitions = 3;
+ */
+ public java.util.List
+ getNewPartitionsBuilderList() {
+ return getNewPartitionsFieldBuilder().getBuilderList();
+ }
+
+ private com.google.protobuf.RepeatedFieldBuilderV3<
+ com.google.bigtable.v2.StreamPartition,
+ com.google.bigtable.v2.StreamPartition.Builder,
+ com.google.bigtable.v2.StreamPartitionOrBuilder>
+ getNewPartitionsFieldBuilder() {
+ if (newPartitionsBuilder_ == null) {
+ newPartitionsBuilder_ =
+ new com.google.protobuf.RepeatedFieldBuilderV3<
+ com.google.bigtable.v2.StreamPartition,
+ com.google.bigtable.v2.StreamPartition.Builder,
+ com.google.bigtable.v2.StreamPartitionOrBuilder>(
+ newPartitions_,
+ ((bitField0_ & 0x00000004) != 0),
+ getParentForChildren(),
+ isClean());
+ newPartitions_ = null;
+ }
+ return newPartitionsBuilder_;
+ }
+
@java.lang.Override
public final Builder setUnknownFields(
final com.google.protobuf.UnknownFieldSet unknownFields) {
diff --git a/proto-google-cloud-bigtable-v2/src/main/proto/google/bigtable/v2/bigtable.proto b/proto-google-cloud-bigtable-v2/src/main/proto/google/bigtable/v2/bigtable.proto
index c85e0cfc8c..098d17e3e7 100644
--- a/proto-google-cloud-bigtable-v2/src/main/proto/google/bigtable/v2/bigtable.proto
+++ b/proto-google-cloud-bigtable-v2/src/main/proto/google/bigtable/v2/bigtable.proto
@@ -788,17 +788,37 @@ message ReadChangeStreamResponse {
}
// A message indicating that the client should stop reading from the stream.
- // If status is OK and `continuation_tokens` is empty, the stream has finished
- // (for example if there was an `end_time` specified).
- // If `continuation_tokens` is present, then a change in partitioning requires
- // the client to open a new stream for each token to resume reading.
+ // If status is OK and `continuation_tokens` & `new_partitions` are empty, the
+ // stream has finished (for example if there was an `end_time` specified).
+ // If `continuation_tokens` & `new_partitions` are present, then a change in
+ // partitioning requires the client to open a new stream for each token to
+ // resume reading. Example:
+ // [B, D) ends
+ // |
+ // v
+ // new_partitions: [A, C) [C, E)
+ // continuation_tokens.partitions: [B,C) [C,D)
+ // ^---^ ^---^
+ // ^ ^
+ // | |
+ // | StreamContinuationToken 2
+ // |
+ // StreamContinuationToken 1
+ // To read the new partition [A,C), supply the continuation tokens whose
+ // ranges cover the new partition, for example ContinuationToken[A,B) &
+ // ContinuationToken[B,C).
message CloseStream {
// The status of the stream.
google.rpc.Status status = 1;
- // If non-empty, contains the information needed to start reading the new
- // partition(s) that contain segments of this partition's row range.
+ // If non-empty, contains the information needed to resume reading their
+ // associated partitions.
repeated StreamContinuationToken continuation_tokens = 2;
+
+ // If non-empty, contains the new partitions to start reading from, which
+ // are related to but not necessarily identical to the partitions for the
+ // above `continuation_tokens`.
+ repeated StreamPartition new_partitions = 3;
}
// The data or control message on the stream.
From 8847fed7f77ce4715c197ca1cfcc3108e0fa1004 Mon Sep 17 00:00:00 2001
From: tengzhonger <109308630+tengzhonger@users.noreply.github.com>
Date: Wed, 1 Mar 2023 17:06:14 -0500
Subject: [PATCH 09/40] feat: Add getNewPartitions method to CloseStream for
Bigtable ChangeStream (#1655)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)
Fixes # ☕️
If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
---
.../clirr-ignored-differences.xml | 6 ++
.../bigtable/data/v2/models/CloseStream.java | 31 ++++++-
.../ReadChangeStreamResumptionStrategy.java | 8 +-
.../v2/models/ChangeStreamRecordTest.java | 75 ++++++++++++++++-
.../DefaultChangeStreamRecordAdapterTest.java | 2 -
...ChangeStreamRecordMergingCallableTest.java | 6 +-
...ReadChangeStreamMergingAcceptanceTest.java | 9 +++
.../ReadChangeStreamRetryTest.java | 30 +++++--
.../src/test/resources/changestream.json | 80 ++++++++++++++++++-
9 files changed, 228 insertions(+), 19 deletions(-)
diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml
index a0ffe39bd1..da5feada67 100644
--- a/google-cloud-bigtable/clirr-ignored-differences.xml
+++ b/google-cloud-bigtable/clirr-ignored-differences.xml
@@ -100,6 +100,12 @@
*getStatus*
com.google.cloud.bigtable.common.Status
+
+
+ 7013
+ com/google/cloud/bigtable/data/v2/models/CloseStream
+ *getNewPartitions*
+
7006
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java
index d5e121e664..221b05f587 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java
@@ -19,6 +19,8 @@
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.common.Status;
+import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.List;
@@ -35,8 +37,22 @@ public abstract class CloseStream implements ChangeStreamRecord, Serializable {
private static CloseStream create(
com.google.rpc.Status status,
- List changeStreamContinuationTokens) {
- return new AutoValue_CloseStream(Status.fromProto(status), changeStreamContinuationTokens);
+ List changeStreamContinuationTokens,
+ List newPartitions) {
+ if (status.getCode() == 0) {
+ Preconditions.checkState(
+ changeStreamContinuationTokens.isEmpty(),
+ "An OK CloseStream should not have continuation tokens.");
+ } else {
+ Preconditions.checkState(
+ !changeStreamContinuationTokens.isEmpty(),
+ "A non-OK CloseStream should have continuation token(s).");
+ Preconditions.checkState(
+ changeStreamContinuationTokens.size() == newPartitions.size(),
+ "Number of continuation tokens does not match number of new partitions.");
+ }
+ return new AutoValue_CloseStream(
+ Status.fromProto(status), changeStreamContinuationTokens, newPartitions);
}
/** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */
@@ -46,6 +62,13 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea
closeStream.getStatus(),
closeStream.getContinuationTokensList().stream()
.map(ChangeStreamContinuationToken::fromProto)
+ .collect(ImmutableList.toImmutableList()),
+ closeStream.getNewPartitionsList().stream()
+ .map(
+ newPartition ->
+ ByteStringRange.create(
+ newPartition.getRowRange().getStartKeyClosed(),
+ newPartition.getRowRange().getEndKeyOpen()))
.collect(ImmutableList.toImmutableList()));
}
@@ -56,4 +79,8 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@Nonnull
public abstract List getChangeStreamContinuationTokens();
+
+ @InternalApi("Intended for use by the BigtableIO in apache/beam only.")
+ @Nonnull
+ public abstract List getNewPartitions();
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java
index 660466db95..fda608eda5 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java
@@ -56,7 +56,13 @@ public StreamResumptionStrategy cr
public ChangeStreamRecordT processResponse(ChangeStreamRecordT response) {
// Update the token from a Heartbeat or a ChangeStreamMutation.
// We don't worry about resumption after CloseStream, since the server
- // will return an OK status right after sending a CloseStream.
+ // will close the stream with an OK status right after sending a CloseStream,
+ // no matter what status the CloseStream.Status is:
+ // 1) ... => CloseStream.Ok => final OK. This means the read finishes successfully.
+ // 2) ... => CloseStream.Error => final OK. This means the client should start
+ // a new ReadChangeStream call with the continuation tokens specified in
+ // CloseStream.
+ // Either case, we don't need to retry after receiving a CloseStream.
if (changeStreamRecordAdapter.isHeartbeat(response)) {
this.token = changeStreamRecordAdapter.getTokenFromHeartbeat(response);
} else if (changeStreamRecordAdapter.isChangeStreamMutation(response)) {
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java
index 688ce46bcf..c00221be3d 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java
@@ -30,7 +30,11 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.function.ThrowingRunnable;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Instant;
@@ -38,6 +42,8 @@
@RunWith(JUnit4.class)
public class ChangeStreamRecordTest {
+ @Rule public ExpectedException expect = ExpectedException.none();
+
@Test
public void heartbeatSerializationTest() throws IOException, ClassNotFoundException {
ReadChangeStreamResponse.Heartbeat heartbeatProto =
@@ -60,7 +66,7 @@ public void heartbeatSerializationTest() throws IOException, ClassNotFoundExcept
@Test
public void closeStreamSerializationTest() throws IOException, ClassNotFoundException {
- Status status = Status.newBuilder().setCode(0).build();
+ Status status = Status.newBuilder().setCode(11).build();
RowRange rowRange1 =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
@@ -85,6 +91,8 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build())
.setToken(token2)
.build())
+ .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1))
+ .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2))
.setStatus(status)
.build();
CloseStream closeStream = CloseStream.fromProto(closeStreamProto);
@@ -98,6 +106,7 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce
assertThat(actual.getChangeStreamContinuationTokens())
.isEqualTo(closeStream.getChangeStreamContinuationTokens());
assertThat(actual.getStatus()).isEqualTo(closeStream.getStatus());
+ assertThat(actual.getNewPartitions()).isEqualTo(closeStream.getNewPartitions());
}
@Test
@@ -129,7 +138,7 @@ public void heartbeatTest() {
@Test
public void closeStreamTest() {
- Status status = Status.newBuilder().setCode(0).build();
+ Status status = Status.newBuilder().setCode(11).build();
RowRange rowRange1 =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
@@ -154,6 +163,8 @@ public void closeStreamTest() {
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build())
.setToken(token2)
.build())
+ .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1))
+ .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2))
.setStatus(status)
.build();
CloseStream actualCloseStream = CloseStream.fromProto(closeStreamProto);
@@ -169,5 +180,65 @@ public void closeStreamTest() {
ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen()));
assertThat(token2)
.isEqualTo(actualCloseStream.getChangeStreamContinuationTokens().get(1).getToken());
+ assertThat(actualCloseStream.getNewPartitions().get(0))
+ .isEqualTo(
+ ByteStringRange.create(rowRange1.getStartKeyClosed(), rowRange1.getEndKeyOpen()));
+ assertThat(actualCloseStream.getNewPartitions().get(1))
+ .isEqualTo(
+ ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen()));
+ }
+
+ // Tests that an OK CloseStream should not have continuation tokens.
+ @Test(expected = IllegalStateException.class)
+ public void closeStreamOkWithContinuationTokenShouldFail() {
+ Status status = Status.newBuilder().setCode(0).build();
+ RowRange rowRange =
+ RowRange.newBuilder()
+ .setStartKeyClosed(ByteString.copyFromUtf8(""))
+ .setEndKeyOpen(ByteString.copyFromUtf8("apple"))
+ .build();
+ String token = "close-stream-token-1";
+ ReadChangeStreamResponse.CloseStream closeStreamProto =
+ ReadChangeStreamResponse.CloseStream.newBuilder()
+ .addContinuationTokens(
+ StreamContinuationToken.newBuilder()
+ .setPartition(StreamPartition.newBuilder().setRowRange(rowRange))
+ .setToken(token))
+ .setStatus(status)
+ .build();
+ Assert.assertThrows(
+ IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
+ }
+
+ // Tests that a non-OK CloseStream should have continuation tokens.
+ @Test(expected = IllegalStateException.class)
+ public void closeStreamErrorWithoutContinuationTokenShouldFail() {
+ Status status = Status.newBuilder().setCode(11).build();
+ ReadChangeStreamResponse.CloseStream closeStreamProto =
+ ReadChangeStreamResponse.CloseStream.newBuilder().setStatus(status).build();
+ Assert.assertThrows(
+ IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
+ }
+
+ // Tests that the number of continuation tokens should match the number of new partitions.
+ @Test(expected = IllegalStateException.class)
+ public void closeStreamTokenAndNewPartitionCountMismatchedTest() {
+ Status status = Status.newBuilder().setCode(11).build();
+ RowRange rowRange =
+ RowRange.newBuilder()
+ .setStartKeyClosed(ByteString.copyFromUtf8(""))
+ .setEndKeyOpen(ByteString.copyFromUtf8("apple"))
+ .build();
+ String token = "close-stream-token-1";
+ ReadChangeStreamResponse.CloseStream closeStreamProto =
+ ReadChangeStreamResponse.CloseStream.newBuilder()
+ .addContinuationTokens(
+ StreamContinuationToken.newBuilder()
+ .setPartition(StreamPartition.newBuilder().setRowRange(rowRange))
+ .setToken(token))
+ .setStatus(status)
+ .build();
+ Assert.assertThrows(
+ IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
}
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java
index 99af76fb03..22270bc269 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java
@@ -150,8 +150,6 @@ public void heartbeatTest() {
public void closeStreamTest() {
ReadChangeStreamResponse.CloseStream expectedCloseStream =
ReadChangeStreamResponse.CloseStream.newBuilder()
- .addContinuationTokens(
- StreamContinuationToken.newBuilder().setToken("random-token").build())
.setStatus(Status.newBuilder().setCode(0).build())
.build();
assertThat(changeStreamRecordBuilder.onCloseStream(expectedCloseStream))
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java
index 736491a0af..f0939fb0cf 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java
@@ -102,7 +102,8 @@ public void closeStreamTest() {
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(streamContinuationToken)
- .setStatus(Status.newBuilder().setCode(0).build())
+ .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange))
+ .setStatus(Status.newBuilder().setCode(11))
.build();
ReadChangeStreamResponse response =
ReadChangeStreamResponse.newBuilder().setCloseStream(closeStreamProto).build();
@@ -127,5 +128,8 @@ public void closeStreamTest() {
.isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));
assertThat(changeStreamContinuationToken.getToken())
.isEqualTo(streamContinuationToken.getToken());
+ assertThat(closeStream.getNewPartitions().size()).isEqualTo(1);
+ assertThat(closeStream.getNewPartitions().get(0))
+ .isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));
}
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java
index 67d6a99f7b..7c3243ecfe 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java
@@ -38,6 +38,7 @@
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.Heartbeat;
+import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi;
import com.google.cloud.conformance.bigtable.v2.ChangeStreamTestDefinition.ChangeStreamTestFile;
@@ -173,6 +174,14 @@ public void test() throws Exception {
.setToken(token.getToken())
.build());
}
+ for (ByteStringRange newPartition : closeStream.getNewPartitions()) {
+ builder.addNewPartitions(
+ StreamPartition.newBuilder()
+ .setRowRange(
+ RowRange.newBuilder()
+ .setStartKeyClosed(newPartition.getStart())
+ .setEndKeyOpen(newPartition.getEnd())));
+ }
ReadChangeStreamResponse.CloseStream closeStreamProto = builder.build();
actualResults.add(
ReadChangeStreamTest.Result.newBuilder()
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java
index c994f3fc8d..48a62bfee8 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java
@@ -122,6 +122,15 @@ private StreamContinuationToken createStreamContinuationToken(@Nonnull String to
.build();
}
+ private StreamPartition createNewPartitionForCloseStream() {
+ return StreamPartition.newBuilder()
+ .setRowRange(
+ RowRange.newBuilder()
+ .setStartKeyClosed(ByteString.copyFromUtf8(START_KEY_CLOSED))
+ .setEndKeyOpen(ByteString.copyFromUtf8(END_KEY_OPEN)))
+ .build();
+ }
+
private ReadChangeStreamResponse.Heartbeat createHeartbeat(
StreamContinuationToken streamContinuationToken) {
return ReadChangeStreamResponse.Heartbeat.newBuilder()
@@ -130,11 +139,18 @@ private ReadChangeStreamResponse.Heartbeat createHeartbeat(
.build();
}
- private ReadChangeStreamResponse.CloseStream createCloseStream() {
- return ReadChangeStreamResponse.CloseStream.newBuilder()
- .addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN))
- .setStatus(com.google.rpc.Status.newBuilder().setCode(0).build())
- .build();
+ private ReadChangeStreamResponse.CloseStream createCloseStream(boolean isOk) {
+ ReadChangeStreamResponse.CloseStream.Builder builder =
+ ReadChangeStreamResponse.CloseStream.newBuilder();
+ if (isOk) {
+ builder.setStatus(com.google.rpc.Status.newBuilder().setCode(0));
+ } else {
+ builder
+ .setStatus(com.google.rpc.Status.newBuilder().setCode(11))
+ .addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN))
+ .addNewPartitions(createNewPartitionForCloseStream());
+ }
+ return builder.build();
}
private ReadChangeStreamResponse.DataChange createDataChange(boolean done) {
@@ -178,7 +194,7 @@ public void happyPathHeartbeatTest() {
@Test
public void happyPathCloseStreamTest() {
ReadChangeStreamResponse closeStreamResponse =
- ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build();
+ ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream(true)).build();
service.expectations.add(
RpcExpectation.create().expectInitialRequest().respondWith(closeStreamResponse));
List actualResults = getResults();
@@ -221,7 +237,7 @@ public void singleHeartbeatImmediateRetryTest() {
public void singleCloseStreamImmediateRetryTest() {
// CloseStream.
ReadChangeStreamResponse closeStreamResponse =
- ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build();
+ ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream(false)).build();
service.expectations.add(
RpcExpectation.create().expectInitialRequest().respondWithStatus(Code.UNAVAILABLE));
// Resume with the exact same request.
diff --git a/google-cloud-bigtable/src/test/resources/changestream.json b/google-cloud-bigtable/src/test/resources/changestream.json
index 9d9e2d46cc..661bf1b4cb 100644
--- a/google-cloud-bigtable/src/test/resources/changestream.json
+++ b/google-cloud-bigtable/src/test/resources/changestream.json
@@ -61,11 +61,25 @@
"partition": {
"row_range": {
"start_key_closed": "0000000000000001",
- "end_key_open": "0000000000000002"
+ "end_key_open": "0000000000000003"
}
},
"token": "close-stream-token-2"
}
+ ],
+ "new_partitions": [
+ {
+ "row_range": {
+ "start_key_closed": "",
+ "end_key_open": "0000000000000002"
+ }
+ },
+ {
+ "row_range": {
+ "start_key_closed": "0000000000000002",
+ "end_key_open": "0000000000000003"
+ }
+ }
]
}
}
@@ -92,11 +106,25 @@
"partition": {
"row_range": {
"start_key_closed": "0000000000000001",
- "end_key_open": "0000000000000002"
+ "end_key_open": "0000000000000003"
}
},
"token": "close-stream-token-2"
}
+ ],
+ "new_partitions": [
+ {
+ "row_range": {
+ "start_key_closed": "",
+ "end_key_open": "0000000000000002"
+ }
+ },
+ {
+ "row_range": {
+ "start_key_closed": "0000000000000002",
+ "end_key_open": "0000000000000003"
+ }
+ }
]
}
},
@@ -137,6 +165,14 @@
},
"token": "close-stream-token-1"
}
+ ],
+ "new_partitions": [
+ {
+ "row_range": {
+ "start_key_closed": "",
+ "end_key_open": "0000000000000002"
+ }
+ }
]
}
}
@@ -176,6 +212,14 @@
},
"token": "close-stream-token-1"
}
+ ],
+ "new_partitions": [
+ {
+ "row_range": {
+ "start_key_closed": "",
+ "end_key_open": "0000000000000002"
+ }
+ }
]
}
},
@@ -1280,11 +1324,25 @@
"partition": {
"row_range": {
"start_key_closed": "0000000000000001",
- "end_key_open": "0000000000000002"
+ "end_key_open": "0000000000000003"
}
},
"token": "close-stream-token-2"
}
+ ],
+ "new_partitions": [
+ {
+ "row_range": {
+ "start_key_closed": "",
+ "end_key_open": "0000000000000002"
+ }
+ },
+ {
+ "row_range": {
+ "start_key_closed": "0000000000000002",
+ "end_key_open": "0000000000000003"
+ }
+ }
]
}
}
@@ -1363,11 +1421,25 @@
"partition": {
"row_range": {
"start_key_closed": "0000000000000001",
- "end_key_open": "0000000000000002"
+ "end_key_open": "0000000000000003"
}
},
"token": "close-stream-token-2"
}
+ ],
+ "new_partitions": [
+ {
+ "row_range": {
+ "start_key_closed": "",
+ "end_key_open": "0000000000000002"
+ }
+ },
+ {
+ "row_range": {
+ "start_key_closed": "0000000000000002",
+ "end_key_open": "0000000000000003"
+ }
+ }
]
}
},
From c7a3e29dc717e2fa3d9b15f1ae9fb9f795d6f78a Mon Sep 17 00:00:00 2001
From: Mend Renovate
Date: Thu, 2 Mar 2023 15:10:12 +0000
Subject: [PATCH 10/40] deps: update dependency
com.google.cloud:google-cloud-shared-dependencies to v3.4.0 (#1657)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
[](https://renovatebot.com)
This PR contains the following updates:
| Package | Change | Age | Adoption | Passing | Confidence |
|---|---|---|---|---|---|
| [com.google.cloud:google-cloud-shared-dependencies](https://togithub.com/googleapis/google-cloud-java) | `3.3.0` -> `3.4.0` | [](https://docs.renovatebot.com/merge-confidence/) | [](https://docs.renovatebot.com/merge-confidence/) | [](https://docs.renovatebot.com/merge-confidence/) | [](https://docs.renovatebot.com/merge-confidence/) |
---
### Configuration
📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined).
🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied.
♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox.
🔕 **Ignore**: Close this PR and you won't be reminded about this update again.
---
- [ ] If you want to rebase/retry this PR, check this box
---
This PR has been generated by [Mend Renovate](https://www.mend.io/free-developer-tools/renovate/). View repository job log [here](https://app.renovatebot.com/dashboard#github/googleapis/java-bigtable).
---
google-cloud-bigtable-deps-bom/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/google-cloud-bigtable-deps-bom/pom.xml b/google-cloud-bigtable-deps-bom/pom.xml
index d6266de583..94ae2b0b91 100644
--- a/google-cloud-bigtable-deps-bom/pom.xml
+++ b/google-cloud-bigtable-deps-bom/pom.xml
@@ -66,7 +66,7 @@
com.google.cloud
google-cloud-shared-dependencies
- 3.3.0
+ 3.4.0
pom
import
From 1c632ec63987958e469e2b5861c29724c5cb8970 Mon Sep 17 00:00:00 2001
From: Mend Renovate
Date: Thu, 2 Mar 2023 15:32:13 +0000
Subject: [PATCH 11/40] deps: update dependency
com.google.cloud:google-cloud-monitoring-bom to v3.13.0 (#1656)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
[](https://renovatebot.com)
This PR contains the following updates:
| Package | Change | Age | Adoption | Passing | Confidence |
|---|---|---|---|---|---|
| [com.google.cloud:google-cloud-monitoring-bom](https://togithub.com/googleapis/google-cloud-java) | `3.12.0` -> `3.13.0` | [](https://docs.renovatebot.com/merge-confidence/) | [](https://docs.renovatebot.com/merge-confidence/) | [](https://docs.renovatebot.com/merge-confidence/) | [](https://docs.renovatebot.com/merge-confidence/) |
---
### Configuration
📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined).
🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied.
♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox.
🔕 **Ignore**: Close this PR and you won't be reminded about this update again.
---
- [ ] If you want to rebase/retry this PR, check this box
---
This PR has been generated by [Mend Renovate](https://www.mend.io/free-developer-tools/renovate/). View repository job log [here](https://app.renovatebot.com/dashboard#github/googleapis/java-bigtable).
---
google-cloud-bigtable-deps-bom/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/google-cloud-bigtable-deps-bom/pom.xml b/google-cloud-bigtable-deps-bom/pom.xml
index 94ae2b0b91..df58c09ec9 100644
--- a/google-cloud-bigtable-deps-bom/pom.xml
+++ b/google-cloud-bigtable-deps-bom/pom.xml
@@ -73,7 +73,7 @@
com.google.cloud
google-cloud-monitoring-bom
- 3.12.0
+ 3.13.0
From 261ffc204fc092cd1cdac7566df70594615bd7eb Mon Sep 17 00:00:00 2001
From: "release-please[bot]"
<55107282+release-please[bot]@users.noreply.github.com>
Date: Thu, 2 Mar 2023 16:02:12 +0000
Subject: [PATCH 12/40] chore(main): release 2.20.0 (#1649)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
:robot: I have created a release *beep* *boop*
---
## [2.20.0](https://togithub.com/googleapis/java-bigtable/compare/v2.19.2...v2.20.0) (2023-03-02)
### Features
* Add getNewPartitions method to CloseStream for Bigtable ChangeStream ([#1655](https://togithub.com/googleapis/java-bigtable/issues/1655)) ([8847fed](https://togithub.com/googleapis/java-bigtable/commit/8847fed7f77ce4715c197ca1cfcc3108e0fa1004))
* Add new_partitions field for CloseStream for Cloud Bigtable ChangeStream ([#1654](https://togithub.com/googleapis/java-bigtable/issues/1654)) ([0e283bf](https://togithub.com/googleapis/java-bigtable/commit/0e283bff0a12f5e4da8b0975d4bd747229c3780c))
### Bug Fixes
* Fix StackOverflow in ChangeStreamStateMachine due to excessive mods ([#1648](https://togithub.com/googleapis/java-bigtable/issues/1648)) ([9e11106](https://togithub.com/googleapis/java-bigtable/commit/9e1110600dc64defcd9143753f45b5b8226aa339))
* Use org.threeten.bp.Duration for ReadChangeStreamQuery::heartbeatDura… ([#1652](https://togithub.com/googleapis/java-bigtable/issues/1652)) ([87261a9](https://togithub.com/googleapis/java-bigtable/commit/87261a977d6fc7877d7d253c67ea34c264f63f7c))
### Dependencies
* Update dependency com.google.cloud:google-cloud-monitoring-bom to v3.13.0 ([#1656](https://togithub.com/googleapis/java-bigtable/issues/1656)) ([1c632ec](https://togithub.com/googleapis/java-bigtable/commit/1c632ec63987958e469e2b5861c29724c5cb8970))
* Update dependency com.google.cloud:google-cloud-shared-dependencies to v3.4.0 ([#1657](https://togithub.com/googleapis/java-bigtable/issues/1657)) ([c7a3e29](https://togithub.com/googleapis/java-bigtable/commit/c7a3e29dc717e2fa3d9b15f1ae9fb9f795d6f78a))
---
This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).
---
CHANGELOG.md | 20 +++++++++++++++++++
google-cloud-bigtable-bom/pom.xml | 18 ++++++++---------
google-cloud-bigtable-deps-bom/pom.xml | 2 +-
google-cloud-bigtable-emulator-core/pom.xml | 4 ++--
google-cloud-bigtable-emulator/pom.xml | 10 +++++-----
google-cloud-bigtable-stats/pom.xml | 6 +++---
google-cloud-bigtable/pom.xml | 10 +++++-----
.../com/google/cloud/bigtable/Version.java | 2 +-
grpc-google-cloud-bigtable-admin-v2/pom.xml | 8 ++++----
grpc-google-cloud-bigtable-v2/pom.xml | 8 ++++----
pom.xml | 12 +++++------
proto-google-cloud-bigtable-admin-v2/pom.xml | 8 ++++----
proto-google-cloud-bigtable-v2/pom.xml | 8 ++++----
samples/snapshot/pom.xml | 2 +-
test-proxy/pom.xml | 4 ++--
versions.txt | 14 ++++++-------
16 files changed, 78 insertions(+), 58 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 03bd1e243a..85f389727f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,25 @@
# Changelog
+## [2.20.0](https://github.com/googleapis/java-bigtable/compare/v2.19.2...v2.20.0) (2023-03-02)
+
+
+### Features
+
+* Add getNewPartitions method to CloseStream for Bigtable ChangeStream ([#1655](https://github.com/googleapis/java-bigtable/issues/1655)) ([8847fed](https://github.com/googleapis/java-bigtable/commit/8847fed7f77ce4715c197ca1cfcc3108e0fa1004))
+* Add new_partitions field for CloseStream for Cloud Bigtable ChangeStream ([#1654](https://github.com/googleapis/java-bigtable/issues/1654)) ([0e283bf](https://github.com/googleapis/java-bigtable/commit/0e283bff0a12f5e4da8b0975d4bd747229c3780c))
+
+
+### Bug Fixes
+
+* Fix StackOverflow in ChangeStreamStateMachine due to excessive mods ([#1648](https://github.com/googleapis/java-bigtable/issues/1648)) ([9e11106](https://github.com/googleapis/java-bigtable/commit/9e1110600dc64defcd9143753f45b5b8226aa339))
+* Use org.threeten.bp.Duration for ReadChangeStreamQuery::heartbeatDura… ([#1652](https://github.com/googleapis/java-bigtable/issues/1652)) ([87261a9](https://github.com/googleapis/java-bigtable/commit/87261a977d6fc7877d7d253c67ea34c264f63f7c))
+
+
+### Dependencies
+
+* Update dependency com.google.cloud:google-cloud-monitoring-bom to v3.13.0 ([#1656](https://github.com/googleapis/java-bigtable/issues/1656)) ([1c632ec](https://github.com/googleapis/java-bigtable/commit/1c632ec63987958e469e2b5861c29724c5cb8970))
+* Update dependency com.google.cloud:google-cloud-shared-dependencies to v3.4.0 ([#1657](https://github.com/googleapis/java-bigtable/issues/1657)) ([c7a3e29](https://github.com/googleapis/java-bigtable/commit/c7a3e29dc717e2fa3d9b15f1ae9fb9f795d6f78a))
+
## [2.19.2](https://github.com/googleapis/java-bigtable/compare/v2.19.1...v2.19.2) (2023-02-21)
diff --git a/google-cloud-bigtable-bom/pom.xml b/google-cloud-bigtable-bom/pom.xml
index 372eb59c70..366c2695db 100644
--- a/google-cloud-bigtable-bom/pom.xml
+++ b/google-cloud-bigtable-bom/pom.xml
@@ -3,7 +3,7 @@
4.0.0
com.google.cloud
google-cloud-bigtable-bom
- 2.19.3-SNAPSHOT
+ 2.20.0
pom
com.google.cloud
@@ -63,42 +63,42 @@
com.google.cloud
google-cloud-bigtable
- 2.19.3-SNAPSHOT
+ 2.20.0
com.google.cloud
google-cloud-bigtable-emulator
- 0.156.3-SNAPSHOT
+ 0.157.0
com.google.cloud
google-cloud-bigtable-emulator-core
- 0.156.3-SNAPSHOT
+ 0.157.0
com.google.api.grpc
grpc-google-cloud-bigtable-admin-v2
- 2.19.3-SNAPSHOT
+ 2.20.0
com.google.api.grpc
grpc-google-cloud-bigtable-v2
- 2.19.3-SNAPSHOT
+ 2.20.0
com.google.api.grpc
proto-google-cloud-bigtable-admin-v2
- 2.19.3-SNAPSHOT
+ 2.20.0
com.google.api.grpc
proto-google-cloud-bigtable-v2
- 2.19.3-SNAPSHOT
+ 2.20.0
com.google.cloud
google-cloud-bigtable-stats
- 2.19.3-SNAPSHOT
+ 2.20.0