Skip to content

Commit 25f6b33

Browse files
authored
fix(spanner): correct handling of PartialResultSet.resume_token (googleapis#8521)
Problems:

- A `google::spanner::v1::PartialResultSet` may not contain a `resume_token` when it also does not complete a new row, so `last_resume_token_` should not be updated in that case.
- When `PartialResultSetResume::Read()` uses its factory and `last_resume_token_` to create a new `PartialResultSetReader`, it should inform the `PartialResultSetSource` so that any partly-constructed row can be discarded before processing the resumed stream (which will resend that row).

That is, a "resume token" is really a row iterator, and a level above row fragmentation and value chunking.

So fix all of that. Then expand the `ConnectionImplTest.ReadSuccess` test case to exercise the new code paths.
1 parent f3ba3f0 commit 25f6b33

12 files changed

Lines changed: 166 additions & 88 deletions

google/cloud/spanner/internal/connection_impl.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ class DefaultPartialResultSetReader : public PartialResultSetReader {
5050

5151
void TryCancel() override { context_->TryCancel(); }
5252

53-
absl::optional<google::spanner::v1::PartialResultSet> Read() override {
53+
absl::optional<PartialResultSet> Read() override {
5454
google::spanner::v1::PartialResultSet result;
5555
bool success = reader_->Read(&result);
5656
if (!success) return {};
57-
return result;
57+
return PartialResultSet{std::move(result), false};
5858
}
5959

6060
Status Finish() override {

google/cloud/spanner/internal/connection_impl_test.cc

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ using ::testing::DoAll;
7272
using ::testing::Eq;
7373
using ::testing::HasSubstr;
7474
using ::testing::InSequence;
75+
using ::testing::IsEmpty;
7576
using ::testing::NiceMock;
7677
using ::testing::Not;
7778
using ::testing::Property;
@@ -379,22 +380,62 @@ TEST(ConnectionImplTest, ReadSuccess) {
379380
}
380381
}
381382
}
382-
resume_token: "test-token-0"
383383
values: { string_value: "12" }
384384
values: { string_value: "Steve" }
385+
values: { string_value: "4" }
386+
chunked_value: true
387+
resume_token: "restart-row-2"
385388
)pb",
386389
R"pb(
387-
resume_token: "test-token-1"
388390
values: { string_value: "42" }
389-
values: { string_value: "Ann" }
391+
values: { string_value: "A" }
392+
chunked_value: true
393+
)pb",
394+
R"pb(
395+
values: { string_value: "nn" }
396+
resume_token: "end-of-stream"
390397
)pb",
391398
};
392399
EXPECT_CALL(*mock,
393400
StreamingRead(
394401
_, HasPriority(spanner_proto::RequestOptions::PRIORITY_LOW)))
395-
.WillOnce(Return(ByMove(MakeReader({}, retry_status))))
396-
.WillOnce(Return(ByMove(MakeReader({responses[0]}, retry_status))))
397-
.WillOnce(Return(ByMove(MakeReader({responses[1]}))));
402+
.WillOnce(
403+
[&retry_status](grpc::ClientContext&,
404+
google::spanner::v1::ReadRequest const& request) {
405+
// The beginning of the row stream, but immediately fail.
406+
EXPECT_THAT(request.resume_token(), IsEmpty());
407+
return MakeReader({}, retry_status);
408+
})
409+
.WillOnce([&responses, &retry_status](
410+
grpc::ClientContext&,
411+
google::spanner::v1::ReadRequest const& request) {
412+
// Restart from the beginning, but return the first row and part of
413+
// the second before failing again.
414+
EXPECT_THAT(request.resume_token(), IsEmpty());
415+
return MakeReader({responses[0]}, retry_status);
416+
})
417+
.WillOnce([&responses, &retry_status](
418+
grpc::ClientContext&,
419+
google::spanner::v1::ReadRequest const& request) {
420+
// Restart from the second row, but only return part of it before
421+
// failing once more.
422+
EXPECT_THAT(request.resume_token(), Eq("restart-row-2"));
423+
return MakeReader({responses[1]}, retry_status);
424+
})
425+
.WillOnce([&responses, &retry_status](
426+
grpc::ClientContext&,
427+
google::spanner::v1::ReadRequest const& request) {
428+
// Restart from the second row, but now deliver it all in two chunks
429+
// before failing for the last time.
430+
EXPECT_THAT(request.resume_token(), Eq("restart-row-2"));
431+
return MakeReader({responses[1], responses[2]}, retry_status);
432+
})
433+
.WillOnce([](grpc::ClientContext&,
434+
google::spanner::v1::ReadRequest const& request) {
435+
// Finally, restart from the end, and signal end of stream.
436+
EXPECT_THAT(request.resume_token(), Eq("end-of-stream"));
437+
return MakeReader({});
438+
});
398439

399440
spanner::ReadOptions read_options;
400441
read_options.request_priority = spanner::RequestPriority::kLow;

google/cloud/spanner/internal/logging_result_set_reader.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@ void LoggingResultSetReader::TryCancel() {
2828
GCP_LOG(DEBUG) << __func__ << "() >> (void)";
2929
}
3030

31-
absl::optional<google::spanner::v1::PartialResultSet>
32-
LoggingResultSetReader::Read() {
31+
absl::optional<PartialResultSet> LoggingResultSetReader::Read() {
3332
GCP_LOG(DEBUG) << __func__ << "() << (void)";
3433
auto result = impl_->Read();
3534
if (!result) {
3635
GCP_LOG(DEBUG) << __func__ << "() >> (optional-with-no-value)";
3736
} else {
3837
GCP_LOG(DEBUG) << __func__ << "() >> "
39-
<< DebugString(*result, tracing_options_);
38+
<< (result->resumption ? "resumption " : "")
39+
<< DebugString(result->result, tracing_options_);
4040
}
4141
return result;
4242
}

google/cloud/spanner/internal/logging_result_set_reader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class LoggingResultSetReader : public PartialResultSetReader {
3434
~LoggingResultSetReader() override = default;
3535

3636
void TryCancel() override;
37-
absl::optional<google::spanner::v1::PartialResultSet> Read() override;
37+
absl::optional<PartialResultSet> Read() override;
3838
Status Finish() override;
3939

4040
private:

google/cloud/spanner/internal/logging_result_set_reader_test.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,13 @@ TEST_F(LoggingResultSetReaderTest, Read) {
5151
.WillOnce([] {
5252
spanner_proto::PartialResultSet result;
5353
result.set_resume_token("test-token");
54-
return result;
54+
return PartialResultSet{std::move(result), false};
5555
})
56-
.WillOnce(
57-
[] { return absl::optional<spanner_proto::PartialResultSet>{}; });
56+
.WillOnce([] { return absl::optional<PartialResultSet>{}; });
5857
LoggingResultSetReader reader(std::move(mock), TracingOptions{});
5958
auto result = reader.Read();
6059
ASSERT_TRUE(result.has_value());
61-
EXPECT_EQ("test-token", result->resume_token());
60+
EXPECT_EQ("test-token", result->result.resume_token());
6261

6362
auto log_lines = log_.ExtractLines();
6463
EXPECT_THAT(log_lines, Contains(HasSubstr("Read")));

google/cloud/spanner/internal/partial_result_set_reader.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,17 @@ namespace cloud {
2727
namespace spanner_internal {
2828
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2929

30+
/**
31+
* The result of a successful `PartialResultSetReader::Read()`, which may
32+
* be the next partial result of a stream, or a resumption of an interrupted
33+
* stream from the last "resume token". In the latter case, the caller should
34+
* discard any pending row-assembly state as that data will be replayed.
35+
*/
36+
struct PartialResultSet {
37+
google::spanner::v1::PartialResultSet result;
38+
bool resumption;
39+
};
40+
3041
/**
3142
* Wrap `grpc::ClientReaderInterface<google::spanner::v1::PartialResultSet>`.
3243
*
@@ -40,7 +51,7 @@ class PartialResultSetReader {
4051
public:
4152
virtual ~PartialResultSetReader() = default;
4253
virtual void TryCancel() = 0;
43-
virtual absl::optional<google::spanner::v1::PartialResultSet> Read() = 0;
54+
virtual absl::optional<PartialResultSet> Read() = 0;
4455
virtual Status Finish() = 0;
4556
};
4657

google/cloud/spanner/internal/partial_result_set_resume.cc

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,22 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2222

2323
void PartialResultSetResume::TryCancel() { child_->TryCancel(); }
2424

25-
absl::optional<google::spanner::v1::PartialResultSet>
26-
PartialResultSetResume::Read() {
25+
absl::optional<PartialResultSet> PartialResultSetResume::Read() {
26+
bool resumption = false;
2727
do {
28-
absl::optional<google::spanner::v1::PartialResultSet> result =
29-
child_->Read();
28+
absl::optional<PartialResultSet> result = child_->Read();
3029
if (result) {
31-
last_resume_token_ = result->resume_token();
30+
// If the resume_token is empty then this PartialResultSet does not
31+
// contain enough data for PartialResultSetSource to be able to yield
32+
// a new row, so we should leave last_resume_token_ as is---ready to
33+
// re-request this undelivered chunk should a following Read() fail.
34+
if (!result->result.resume_token().empty()) {
35+
last_resume_token_ = result->result.resume_token();
36+
}
37+
// Let the caller know if we recreated the PartialResultSetReader using
38+
// last_resume_token_ so that they might discard any pending row-assembly
39+
// state as that data will also be in this new result.
40+
result->resumption = resumption;
3241
return result;
3342
}
3443
auto status = Finish();
@@ -38,6 +47,7 @@ PartialResultSetResume::Read() {
3847
return {};
3948
}
4049
std::this_thread::sleep_for(backoff_policy_prototype_->OnCompletion());
50+
resumption = true;
4151
last_status_.reset();
4252
child_ = factory_(last_resume_token_);
4353
} while (!retry_policy_prototype_->IsExhausted());

google/cloud/spanner/internal/partial_result_set_resume.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class PartialResultSetResume : public PartialResultSetReader {
5353
~PartialResultSetResume() override = default;
5454

5555
void TryCancel() override;
56-
absl::optional<google::spanner::v1::PartialResultSet> Read() override;
56+
absl::optional<PartialResultSet> Read() override;
5757
Status Finish() override;
5858

5959
private:

google/cloud/spanner/internal/partial_result_set_resume_test.cc

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ using ::testing::AtLeast;
4141
using ::testing::HasSubstr;
4242
using ::testing::Return;
4343

44-
using ReadReturn = absl::optional<spanner_proto::PartialResultSet>;
44+
absl::optional<PartialResultSet> ReadReturn(
45+
spanner_proto::PartialResultSet response) {
46+
return PartialResultSet{std::move(response), false};
47+
}
48+
49+
absl::optional<PartialResultSet> ReadReturn() { return {}; }
4550

4651
struct MockFactory {
4752
MOCK_METHOD(std::unique_ptr<PartialResultSetReader>, MakeReader,
@@ -85,7 +90,7 @@ TEST(PartialResultSetResume, Success) {
8590
auto mock = absl::make_unique<MockPartialResultSetReader>();
8691
EXPECT_CALL(*mock, Read())
8792
.WillOnce([&response] { return ReadReturn(response); })
88-
.WillOnce(Return(ReadReturn{}));
93+
.WillOnce(Return(ReadReturn()));
8994
EXPECT_CALL(*mock, Finish()).WillOnce(Return(Status()));
9095
return mock;
9196
});
@@ -96,7 +101,7 @@ TEST(PartialResultSetResume, Success) {
96101
auto reader = MakeTestResume(factory, Idempotency::kIdempotent);
97102
auto v = reader->Read();
98103
ASSERT_TRUE(v.has_value());
99-
EXPECT_THAT(*v, IsProtoEqual(response));
104+
EXPECT_THAT(v->result, IsProtoEqual(response));
100105
v = reader->Read();
101106
ASSERT_FALSE(v.has_value());
102107
auto status = reader->Finish();
@@ -135,7 +140,7 @@ TEST(PartialResultSetResume, SuccessWithRestart) {
135140
auto mock = absl::make_unique<MockPartialResultSetReader>();
136141
EXPECT_CALL(*mock, Read())
137142
.WillOnce([&r0] { return ReadReturn(r0); })
138-
.WillOnce(Return(ReadReturn{}));
143+
.WillOnce(Return(ReadReturn()));
139144
EXPECT_CALL(*mock, Finish())
140145
.WillOnce(Return(Status(StatusCode::kUnavailable, "try-again-0")));
141146
return mock;
@@ -145,15 +150,15 @@ TEST(PartialResultSetResume, SuccessWithRestart) {
145150
auto mock = absl::make_unique<MockPartialResultSetReader>();
146151
EXPECT_CALL(*mock, Read())
147152
.WillOnce([&r1] { return ReadReturn(r1); })
148-
.WillOnce(Return(ReadReturn{}));
153+
.WillOnce(Return(ReadReturn()));
149154
EXPECT_CALL(*mock, Finish())
150155
.WillOnce(Return(Status(StatusCode::kUnavailable, "try-again-1")));
151156
return mock;
152157
})
153158
.WillOnce([](std::string const& token) {
154159
EXPECT_EQ("test-token-1", token);
155160
auto mock = absl::make_unique<MockPartialResultSetReader>();
156-
EXPECT_CALL(*mock, Read()).WillOnce(Return(ReadReturn{}));
161+
EXPECT_CALL(*mock, Read()).WillOnce(Return(ReadReturn()));
157162
EXPECT_CALL(*mock, Finish()).WillOnce(Return(Status()));
158163
return mock;
159164
});
@@ -164,10 +169,10 @@ TEST(PartialResultSetResume, SuccessWithRestart) {
164169
auto reader = MakeTestResume(factory, Idempotency::kIdempotent);
165170
auto v = reader->Read();
166171
ASSERT_TRUE(v.has_value());
167-
EXPECT_THAT(*v, IsProtoEqual(r0));
172+
EXPECT_THAT(v->result, IsProtoEqual(r0));
168173
v = reader->Read();
169174
ASSERT_TRUE(v.has_value());
170-
EXPECT_THAT(*v, IsProtoEqual(r1));
175+
EXPECT_THAT(v->result, IsProtoEqual(r1));
171176
v = reader->Read();
172177
ASSERT_FALSE(v.has_value());
173178
auto status = reader->Finish();
@@ -199,15 +204,15 @@ TEST(PartialResultSetResume, PermanentError) {
199204
auto mock = absl::make_unique<MockPartialResultSetReader>();
200205
EXPECT_CALL(*mock, Read())
201206
.WillOnce([&r0] { return ReadReturn(r0); })
202-
.WillOnce(Return(ReadReturn{}));
207+
.WillOnce(Return(ReadReturn()));
203208
EXPECT_CALL(*mock, Finish())
204209
.WillOnce(Return(Status(StatusCode::kUnavailable, "try-again-0")));
205210
return mock;
206211
})
207212
.WillOnce([](std::string const& token) {
208213
EXPECT_EQ("test-token-0", token);
209214
auto mock = absl::make_unique<MockPartialResultSetReader>();
210-
EXPECT_CALL(*mock, Read()).WillOnce(Return(ReadReturn{}));
215+
EXPECT_CALL(*mock, Read()).WillOnce(Return(ReadReturn()));
211216
EXPECT_CALL(*mock, Finish())
212217
.WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh-1")));
213218
return mock;
@@ -219,7 +224,7 @@ TEST(PartialResultSetResume, PermanentError) {
219224
auto reader = MakeTestResume(factory, Idempotency::kIdempotent);
220225
auto v = reader->Read();
221226
ASSERT_TRUE(v.has_value());
222-
EXPECT_THAT(*v, IsProtoEqual(r0));
227+
EXPECT_THAT(v->result, IsProtoEqual(r0));
223228
v = reader->Read();
224229
ASSERT_FALSE(v.has_value());
225230
auto status = reader->Finish();
@@ -251,7 +256,7 @@ TEST(PartialResultSetResume, TransientNonIdempotent) {
251256
auto mock = absl::make_unique<MockPartialResultSetReader>();
252257
EXPECT_CALL(*mock, Read())
253258
.WillOnce([&r0] { return ReadReturn(r0); })
254-
.WillOnce(Return(ReadReturn{}));
259+
.WillOnce(Return(ReadReturn()));
255260
EXPECT_CALL(*mock, Finish())
256261
.WillOnce(Return(Status(StatusCode::kUnavailable, "try-again-0")));
257262
return mock;
@@ -263,7 +268,7 @@ TEST(PartialResultSetResume, TransientNonIdempotent) {
263268
auto reader = MakeTestResume(factory, Idempotency::kNonIdempotent);
264269
auto v = reader->Read();
265270
ASSERT_TRUE(v.has_value());
266-
EXPECT_THAT(*v, IsProtoEqual(r0));
271+
EXPECT_THAT(v->result, IsProtoEqual(r0));
267272
v = reader->Read();
268273
ASSERT_FALSE(v.has_value());
269274
auto status = reader->Finish();
@@ -278,7 +283,7 @@ TEST(PartialResultSetResume, TooManyTransients) {
278283
.WillRepeatedly([](std::string const& token) {
279284
EXPECT_TRUE(token.empty());
280285
auto mock = absl::make_unique<MockPartialResultSetReader>();
281-
EXPECT_CALL(*mock, Read()).WillOnce(Return(ReadReturn{}));
286+
EXPECT_CALL(*mock, Read()).WillOnce(Return(ReadReturn()));
282287
EXPECT_CALL(*mock, Finish())
283288
.WillOnce(Return(Status(StatusCode::kUnavailable, "try-again-N")));
284289
return mock;

google/cloud/spanner/internal/partial_result_set_source.cc

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,12 @@ Status PartialResultSetSource::ReadFromStream() {
105105
return reader_->Finish();
106106
}
107107

108-
if (result_set->has_metadata()) {
108+
if (result_set->result.has_metadata()) {
109109
// If we got metadata more than once, log it, but use the first one.
110110
if (metadata_) {
111111
GCP_LOG(WARNING) << "Unexpectedly received two sets of metadata";
112112
} else {
113-
metadata_ = std::move(*result_set->mutable_metadata());
113+
metadata_ = std::move(*result_set->result.mutable_metadata());
114114
// Copies the column names into a shared_ptr that will be shared with
115115
// every Row object returned from NextRow().
116116
columns_ = std::make_shared<std::vector<std::string>>();
@@ -120,15 +120,23 @@ Status PartialResultSetSource::ReadFromStream() {
120120
}
121121
}
122122

123-
if (result_set->has_stats()) {
123+
if (result_set->result.has_stats()) {
124124
// If we got stats more than once, log it, but use the last one.
125125
if (stats_) {
126126
GCP_LOG(WARNING) << "Unexpectedly received two sets of stats";
127127
}
128-
stats_ = std::move(*result_set->mutable_stats());
128+
stats_ = std::move(*result_set->result.mutable_stats());
129129
}
130130

131-
auto& new_values = *result_set->mutable_values();
131+
// If reader_->Read() resulted in a new PartialResultSetReader (i.e., it
132+
// used the last resume_token to resume an interrupted stream), then we
133+
// must clear the buffered partial-row data as it will be replayed.
134+
if (result_set->resumption) {
135+
buffer_.clear();
136+
chunk_ = {};
137+
}
138+
139+
auto& new_values = *result_set->result.mutable_values();
132140

133141
// Merge values if necessary, as described in:
134142
// https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.PartialResultSet
@@ -163,7 +171,7 @@ Status PartialResultSetSource::ReadFromStream() {
163171
chunk_ = {};
164172
}
165173

166-
if (result_set->chunked_value()) {
174+
if (result_set->result.chunked_value()) {
167175
if (new_values.empty()) {
168176
return Status(StatusCode::kInternal,
169177
"PartialResultSet had chunked_value "

0 commit comments

Comments
 (0)