Skip to content

Commit 504bb51

Browse files
authored
fix: fix skip large row (#2785)
1 parent f45bb0e commit 504bb51

File tree

5 files changed

+333
-50
lines changed

5 files changed

+333
-50
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2;
17+
18+
import com.google.cloud.bigtable.data.v2.models.Row;
19+
import com.google.cloud.bigtable.data.v2.models.TableId;
20+
21+
public class Main {
22+
23+
public static void main(String[] args) throws Exception {
24+
try (BigtableDataClient client =
25+
BigtableDataClient.create("google.com:cloud-bigtable-dev", "mattiefu-test")) {
26+
27+
for (int i = 0; i < 100; i++) {
28+
Row row = client.readRow(TableId.of("benchmark"), "key-0");
29+
30+
System.out.println(row.getKey());
31+
32+
Thread.sleep(10000);
33+
}
34+
}
35+
}
36+
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public static RowSet eraseLargeRow(RowSet rowSet, ByteString excludePoint) {
6868
// Handle ranges
6969
for (RowRange rowRange : rowSet.getRowRangesList()) {
7070
List<RowRange> afterSplit = splitOnLargeRowKey(rowRange, excludePoint);
71-
if (afterSplit != null && !afterSplit.isEmpty()) {
71+
if (!afterSplit.isEmpty()) {
7272
afterSplit.forEach(newRowSet::addRowRanges);
7373
}
7474
}
@@ -162,8 +162,11 @@ private static List<RowRange> splitOnLargeRowKey(RowRange range, ByteString larg
162162
ByteString startKey = StartPoint.extract(range).value;
163163
ByteString endKey = EndPoint.extract(range).value;
164164

165-
// if end key is on the left of large row key, don't split
166-
if (ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) < 0) {
165+
// Empty endKey means it's unbounded
166+
boolean boundedEnd = !endKey.isEmpty();
167+
168+
// if end key is on the left of large row key, don't split.
169+
if (boundedEnd && ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) < 0) {
167170
rowRanges.add(range);
168171
return rowRanges;
169172
}
@@ -181,11 +184,19 @@ private static List<RowRange> splitOnLargeRowKey(RowRange range, ByteString larg
181184
}
182185

183186
// if the end key is on the right of the large row key, set the start key to be large row key
184-
// open
185-
if (ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) > 0) {
186-
RowRange afterSplit = range.toBuilder().setStartKeyOpen(largeRowKey).build();
187-
rowRanges.add(afterSplit);
187+
// open.
188+
if (!boundedEnd || ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) > 0) {
189+
// handle the edge case where (key, key\0) is an empty range and should be excluded
190+
ByteString nextKey = largeRowKey.concat(ByteString.copyFrom(new byte[] {0}));
191+
EndPoint endPoint = EndPoint.extract(range);
192+
boolean isEmptyRange = !endPoint.isClosed && endPoint.value.equals(nextKey);
193+
194+
if (!isEmptyRange) {
195+
RowRange afterSplit = range.toBuilder().setStartKeyOpen(largeRowKey).build();
196+
rowRanges.add(afterSplit);
197+
}
188198
}
199+
189200
return rowRanges;
190201
}
191202

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtilTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,20 @@ public void multipleRangeBoundTest() {
331331
assertThat(actual).isEqualTo(ByteStringRange.create("a", "z"));
332332
}
333333

334+
@Test
335+
public void eraseLargeRowEmptyRangeTest() {
336+
ByteString key = ByteString.copyFromUtf8("a");
337+
ByteString keyTrailer = key.concat(ByteString.copyFrom(new byte[] {0}));
338+
339+
RowSet rowSet =
340+
RowSet.newBuilder()
341+
.addRowRanges(
342+
RowRange.newBuilder().setStartKeyClosed(key).setEndKeyOpen(keyTrailer).build())
343+
.build();
344+
RowSet actual = RowSetUtil.eraseLargeRow(rowSet, key);
345+
assertThat(actual).isNull();
346+
}
347+
334348
// Helpers
335349
private static void verifyShard(RowSet input, SortedSet<ByteString> splits, RowSet... expected) {
336350
List<RowSet> actualWithNull = RowSetUtil.shard(input, splits);

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/LargeRowIT.java

Lines changed: 139 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -114,51 +114,13 @@ public void testWriteRead() throws Exception {
114114
assertThat(row.getCells().get(1).getValue()).isEqualTo(largeValue);
115115
}
116116

117-
static class AccumulatingObserver implements ResponseObserver<Row> {
118-
119-
final List<Row> responses = Lists.newArrayList();
120-
final SettableApiFuture<Void> completionFuture = SettableApiFuture.create();
121-
122-
void awaitCompletion() throws Throwable {
123-
try {
124-
completionFuture.get(10, TimeUnit.MINUTES);
125-
} catch (ExecutionException e) {
126-
throw e.getCause();
127-
}
128-
}
129-
130-
@Override
131-
public void onStart(StreamController controller) {}
132-
133-
@Override
134-
public void onResponse(Row row) {
135-
responses.add(row);
136-
}
137-
138-
@Override
139-
public void onError(Throwable t) {
140-
completionFuture.setException(t);
141-
}
142-
143-
@Override
144-
public void onComplete() {
145-
completionFuture.set(null);
146-
}
147-
}
148-
149117
@Test
150-
public void read() throws Throwable {
118+
public void testSkipLargeRow() throws Throwable {
151119
assume()
152120
.withMessage("Large row read errors are not supported by emulator")
153121
.that(testEnvRule.env())
154122
.isNotInstanceOf(EmulatorEnv.class);
155123

156-
// TODO: remove this once skip large row for read is released
157-
assume()
158-
.withMessage("Skip large row for read is not released yet")
159-
.that(System.getProperty("bigtable.testSkipLargeRowIntegrationTests"))
160-
.isEqualTo("true");
161-
162124
BigtableDataClient client = testEnvRule.env().getDataClient();
163125
String tableId = table.getId();
164126
String familyId = this.familyId;
@@ -202,12 +164,34 @@ public void read() throws Throwable {
202164
ImmutableList.<String>of(),
203165
ByteString.copyFromUtf8("my-value"))));
204166

167+
Row expectedRow5 =
168+
Row.create(
169+
ByteString.copyFromUtf8("r5"),
170+
ImmutableList.of(
171+
RowCell.create(
172+
familyId,
173+
ByteString.copyFromUtf8("qualifier"),
174+
timestampMicros,
175+
ImmutableList.of(),
176+
ByteString.copyFromUtf8("my-value"))));
177+
178+
Row expectedRow6 =
179+
Row.create(
180+
ByteString.copyFromUtf8("r6"),
181+
ImmutableList.of(
182+
RowCell.create(
183+
familyId,
184+
ByteString.copyFromUtf8("qualifier"),
185+
timestampMicros,
186+
ImmutableList.of(),
187+
ByteString.copyFromUtf8("my-value"))));
188+
205189
// large row creation
206190
byte[] largeValueBytes = new byte[3 * 1024 * 1024];
207191
ByteString largeValue = ByteString.copyFrom(largeValueBytes);
208192

209193
for (int i = 0; i < 100; i++) {
210-
ByteString qualifier = ByteString.copyFromUtf8("qualifier1_" + "_" + String.valueOf(i));
194+
ByteString qualifier = ByteString.copyFromUtf8("qualifier1_" + "_" + i);
211195
client.mutateRow(
212196
RowMutation.create(TableId.of(tableId), "r2").setCell(familyId, qualifier, largeValue));
213197
client.mutateRow(
@@ -222,7 +206,8 @@ public void read() throws Throwable {
222206
.call(
223207
Query.create(tableId)
224208
.range(ByteStringRange.unbounded().startClosed("r1").endOpen("r3"))))
225-
.containsExactly(expectedRow1);
209+
.containsExactly(expectedRow1)
210+
.inOrder();
226211

227212
assertThat(
228213
client
@@ -231,7 +216,8 @@ public void read() throws Throwable {
231216
.call(
232217
Query.create(tableId)
233218
.range(ByteStringRange.unbounded().startClosed("r1").endClosed("r4"))))
234-
.containsExactly(expectedRow1, expectedRow4);
219+
.containsExactly(expectedRow1, expectedRow4)
220+
.inOrder();
235221

236222
List<Row> emptyRows =
237223
client
@@ -267,7 +253,78 @@ public void read() throws Throwable {
267253
.call(
268254
Query.create(tableId)
269255
.range(ByteStringRange.unbounded().startClosed("r1").endClosed("r4"))))
270-
.containsExactly(expectedRow1, expectedRow4);
256+
.containsExactly(expectedRow1, expectedRow4)
257+
.inOrder();
258+
259+
assertThat(client.skipLargeRowsCallable().all().call(Query.create(tableId)))
260+
.containsExactly(expectedRow1, expectedRow4, expectedRow5, expectedRow6)
261+
.inOrder();
262+
263+
assertThat(
264+
client
265+
.skipLargeRowsCallable()
266+
.all()
267+
.call(Query.create(tableId).range(ByteStringRange.unbounded().endClosed("r4"))))
268+
.containsExactly(expectedRow1, expectedRow4)
269+
.inOrder();
270+
271+
assertThat(
272+
client
273+
.skipLargeRowsCallable()
274+
.all()
275+
.call(Query.create(tableId).range(ByteStringRange.unbounded().startClosed("r1"))))
276+
.containsExactly(expectedRow1, expectedRow4, expectedRow5, expectedRow6)
277+
.inOrder();
278+
279+
assertThat(
280+
client
281+
.skipLargeRowsCallable()
282+
.all()
283+
.call(Query.create(tableId).range(ByteStringRange.unbounded().endOpen("r4"))))
284+
.containsExactly(expectedRow1);
285+
286+
assertThat(
287+
client
288+
.skipLargeRowsCallable()
289+
.all()
290+
.call(Query.create(tableId).range(ByteStringRange.unbounded().startOpen("r1"))))
291+
.containsExactly(expectedRow4, expectedRow5, expectedRow6);
292+
293+
assertThat(client.skipLargeRowsCallable().all().call(Query.create(tableId).reversed(true)))
294+
.containsExactly(expectedRow6, expectedRow5, expectedRow4, expectedRow1)
295+
.inOrder();
296+
297+
assertThat(
298+
client
299+
.skipLargeRowsCallable()
300+
.all()
301+
.call(
302+
Query.create(tableId)
303+
.range(ByteStringRange.unbounded().endClosed("r4"))
304+
.reversed(true)))
305+
.containsExactly(expectedRow4, expectedRow1)
306+
.inOrder();
307+
308+
assertThat(
309+
client
310+
.skipLargeRowsCallable()
311+
.all()
312+
.call(
313+
Query.create(tableId)
314+
.range(ByteStringRange.unbounded().startClosed("r1"))
315+
.reversed(true)))
316+
.containsExactly(expectedRow6, expectedRow5, expectedRow4, expectedRow1)
317+
.inOrder();
318+
319+
assertThat(
320+
client
321+
.skipLargeRowsCallable()
322+
.all()
323+
.call(
324+
Query.create(tableId)
325+
.range(ByteStringRange.unbounded().startClosed("r2").endOpen("r3\0"))))
326+
.isEmpty();
327+
271328
// async
272329
AccumulatingObserver observer = new AccumulatingObserver();
273330
Query query = Query.create(tableId).range("r1", "r3");
@@ -280,5 +337,44 @@ public void read() throws Throwable {
280337
client.skipLargeRowsCallable().call(query2, observer2);
281338
observer2.awaitCompletion();
282339
assertThat(observer2.responses).containsExactly(expectedRow1, expectedRow4);
340+
341+
AccumulatingObserver observer3 = new AccumulatingObserver();
342+
Query query3 = Query.create(tableId);
343+
client.skipLargeRowsCallable().call(query3, observer3);
344+
observer3.awaitCompletion();
345+
assertThat(observer3.responses)
346+
.containsExactly(expectedRow1, expectedRow4, expectedRow5, expectedRow6);
347+
}
348+
349+
static class AccumulatingObserver implements ResponseObserver<Row> {
350+
351+
final List<Row> responses = Lists.newArrayList();
352+
final SettableApiFuture<Void> completionFuture = SettableApiFuture.create();
353+
354+
void awaitCompletion() throws Throwable {
355+
try {
356+
completionFuture.get(10, TimeUnit.MINUTES);
357+
} catch (ExecutionException e) {
358+
throw e.getCause();
359+
}
360+
}
361+
362+
@Override
363+
public void onStart(StreamController controller) {}
364+
365+
@Override
366+
public void onResponse(Row row) {
367+
responses.add(row);
368+
}
369+
370+
@Override
371+
public void onError(Throwable t) {
372+
completionFuture.setException(t);
373+
}
374+
375+
@Override
376+
public void onComplete() {
377+
completionFuture.set(null);
378+
}
283379
}
284380
}

0 commit comments

Comments
 (0)