Skip to content

Commit eea3715

Browse files
authored
impl(bigtable): add ScheduleStubRefresh function using AsyncPingAndWarm (#16029)
1 parent 282a931 commit eea3715

3 files changed

Lines changed: 150 additions & 5 deletions

File tree

google/cloud/bigtable/internal/connection_refresh_state.cc

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ namespace {
3030
*/
3131
auto constexpr kConnectionReadyTimeout = std::chrono::seconds(10);
3232

33+
void LogFailedConnectionRefresh(Status const& conn_status) {
34+
if (!conn_status.ok()) {
35+
GCP_LOG(WARNING) << "Failed to refresh connection. Error: " << conn_status;
36+
}
37+
}
38+
3339
} // namespace
3440

3541
ConnectionRefreshState::ConnectionRefreshState(
@@ -81,10 +87,7 @@ void ScheduleChannelRefresh(
8187
std::chrono::system_clock::now() + kConnectionReadyTimeout)
8288
.then([weak_channel, weak_cq_impl, state](future<Status> fut) {
8389
auto conn_status = fut.get();
84-
if (!conn_status.ok()) {
85-
GCP_LOG(WARNING) << "Failed to refresh connection. Error: "
86-
<< conn_status;
87-
}
90+
LogFailedConnectionRefresh(conn_status);
8891
auto channel = weak_channel.lock();
8992
if (!channel) return;
9093
auto cq_impl = weak_cq_impl.lock();
@@ -95,6 +98,65 @@ void ScheduleChannelRefresh(
9598
state->timers().RegisterTimer(std::move(timer_future));
9699
}
97100

101+
void ScheduleStubRefresh(
102+
std::shared_ptr<internal::CompletionQueueImpl> const& cq_impl,
103+
std::shared_ptr<ConnectionRefreshState> const& state,
104+
std::shared_ptr<BigtableStub> const& stub, std::string const& instance_name,
105+
std::function<void(Status const&)> connection_status_fn) {
106+
if (!connection_status_fn) {
107+
connection_status_fn = LogFailedConnectionRefresh;
108+
}
109+
// The timers will only hold weak pointers to the channel or to the
110+
// completion queue, so if either of them are destroyed, the timer chain
111+
// will simply not continue.
112+
std::weak_ptr<BigtableStub> weak_stub(stub);
113+
std::weak_ptr<internal::CompletionQueueImpl> weak_cq_impl(cq_impl);
114+
auto cq = CompletionQueue(cq_impl);
115+
using TimerFuture = future<StatusOr<std::chrono::system_clock::time_point>>;
116+
auto timer_future =
117+
cq.MakeRelativeTimer(state->RandomizedRefreshDelay())
118+
.then([weak_stub, weak_cq_impl, state, instance_name,
119+
connection_status_fn =
120+
std::move(connection_status_fn)](TimerFuture fut) mutable {
121+
if (!fut.get()) {
122+
// Timer cancelled.
123+
return;
124+
}
125+
auto stub = weak_stub.lock();
126+
if (!stub) return;
127+
auto cq_impl = weak_cq_impl.lock();
128+
if (!cq_impl) return;
129+
auto cq = CompletionQueue(cq_impl);
130+
131+
auto client_context = std::make_shared<grpc::ClientContext>();
132+
google::cloud::internal::ImmutableOptions options;
133+
google::bigtable::v2::PingAndWarmRequest request;
134+
request.set_name(instance_name);
135+
// Use the client_context to set a deadline similar to
136+
// AsyncWaitConnectionReady.
137+
client_context->set_deadline(std::chrono::system_clock::now() +
138+
kConnectionReadyTimeout);
139+
stub->AsyncPingAndWarm(cq, client_context, std::move(options),
140+
request)
141+
.then(
142+
[weak_stub, weak_cq_impl, state, instance_name,
143+
connection_status_fn = std::move(connection_status_fn)](
144+
future<
145+
StatusOr<google::bigtable::v2::PingAndWarmResponse>>
146+
fut) mutable {
147+
auto response = fut.get();
148+
connection_status_fn(response.status());
149+
auto stub = weak_stub.lock();
150+
if (!stub) return;
151+
auto cq_impl = weak_cq_impl.lock();
152+
if (!cq_impl) return;
153+
ScheduleStubRefresh(cq_impl, state, stub, instance_name,
154+
std::move(connection_status_fn));
155+
});
156+
});
157+
state->timers().RegisterTimer(std::move(timer_future));
158+
}
159+
98160
void OutstandingTimers::RegisterTimer(future<void> fut) {
99161
std::unique_lock<std::mutex> lk(mu_);
100162
if (shutdown_) {

google/cloud/bigtable/internal/connection_refresh_state.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_CONNECTION_REFRESH_STATE_H
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_CONNECTION_REFRESH_STATE_H
1717

18+
#include "google/cloud/bigtable/internal/bigtable_stub.h"
1819
#include "google/cloud/bigtable/version.h"
1920
#include "google/cloud/completion_queue.h"
2021
#include "google/cloud/future.h"
@@ -81,13 +82,23 @@ class ConnectionRefreshState {
8182
};
8283

8384
/**
84-
* Schedule a chain of timers to refresh the connection.
85+
* Schedule a chain of timers to refresh the grpc::Channel using
86+
* AsyncWaitConnectionReady which leverages grpc::Channel::GetState.
8587
*/
8688
void ScheduleChannelRefresh(
8789
std::shared_ptr<internal::CompletionQueueImpl> const& cq,
8890
std::shared_ptr<ConnectionRefreshState> const& state,
8991
std::shared_ptr<grpc::Channel> const& channel);
9092

93+
/**
94+
* Schedule a chain of timers to refresh the BigtableStub using PingAndWarm.
95+
*/
96+
void ScheduleStubRefresh(
97+
std::shared_ptr<internal::CompletionQueueImpl> const& cq,
98+
std::shared_ptr<ConnectionRefreshState> const& state,
99+
std::shared_ptr<BigtableStub> const& stub, std::string const& instance_name,
100+
std::function<void(Status const&)> connection_status_fn = {});
101+
91102
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
92103
} // namespace bigtable_internal
93104
} // namespace cloud

google/cloud/bigtable/internal/connection_refresh_state_test.cc

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,21 @@
1212
// See the License for the specific language governing permissions and
1313

1414
#include "google/cloud/bigtable/internal/connection_refresh_state.h"
15+
#include "google/cloud/bigtable/testing/mock_bigtable_stub.h"
1516
#include "google/cloud/testing_util/status_matchers.h"
1617
#include <gmock/gmock.h>
18+
#include <thread>
1719

1820
namespace google {
1921
namespace cloud {
2022
namespace bigtable_internal {
2123
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2224

25+
using ::google::cloud::bigtable::testing::MockBigtableStub;
26+
using ::google::cloud::testing_util::IsOk;
27+
using ::testing::Eq;
28+
using ::testing::MockFunction;
29+
2330
using TimerFuture = future<StatusOr<std::chrono::system_clock::time_point>>;
2431

2532
class OutstandingTimersTest : public ::testing::Test {
@@ -118,6 +125,71 @@ TEST(ConnectionRefreshState, Disabled) {
118125
EXPECT_FALSE(state.enabled());
119126
}
120127

128+
class ScheduleStubRefreshTest : public ::testing::Test {
129+
public:
130+
ScheduleStubRefreshTest() : thread1_([this] { cq_.Run(); }) {}
131+
~ScheduleStubRefreshTest() override {
132+
cq_.Shutdown();
133+
thread1_.join();
134+
}
135+
136+
protected:
137+
CompletionQueue cq_;
138+
std::thread thread1_;
139+
};
140+
141+
TEST_F(ScheduleStubRefreshTest, RefreshedUsingAsyncPingAndWarm) {
142+
auto cq_impl = internal::GetCompletionQueueImpl(cq_);
143+
auto refresh_state = std::make_shared<ConnectionRefreshState>(
144+
cq_impl, std::chrono::milliseconds(1), std::chrono::milliseconds(2));
145+
std::string instance_name = "projects/my-project/instances/my-instance";
146+
147+
// These promises are used to coordinate thread execution to ensure the test
148+
// does not finish before the CompletionQueue thread executes all the tasks
149+
// on the queue.
150+
promise<void> p;
151+
promise<StatusOr<google::bigtable::v2::PingAndWarmResponse>> p2;
152+
153+
auto mock_stub = std::make_shared<MockBigtableStub>();
154+
EXPECT_CALL(*mock_stub, AsyncPingAndWarm)
155+
.WillRepeatedly(
156+
[&](CompletionQueue&, std::shared_ptr<grpc::ClientContext> const&,
157+
internal::ImmutableOptions const&,
158+
google::bigtable::v2::PingAndWarmRequest const& request)
159+
-> future<StatusOr<google::bigtable::v2::PingAndWarmResponse>> {
160+
EXPECT_THAT(request.name(), Eq(instance_name));
161+
return p2.get_future();
162+
});
163+
164+
MockFunction<void(Status const&)> mock_fn;
165+
EXPECT_CALL(mock_fn, Call).WillOnce([&p](Status const& s) -> void {
166+
EXPECT_THAT(s, IsOk());
167+
p.set_value();
168+
});
169+
170+
ScheduleStubRefresh(cq_impl, refresh_state, mock_stub, instance_name,
171+
mock_fn.AsStdFunction());
172+
p2.set_value(google::bigtable::v2::PingAndWarmResponse{});
173+
p.get_future().get();
174+
}
175+
176+
TEST_F(ScheduleStubRefreshTest, RefreshTimerCancelled) {
177+
auto cq_impl = internal::GetCompletionQueueImpl(cq_);
178+
auto refresh_state = std::make_shared<ConnectionRefreshState>(
179+
cq_impl, std::chrono::seconds(60), std::chrono::seconds(120));
180+
std::string instance_name = "projects/my-project/instances/my-instance";
181+
182+
auto mock_stub = std::make_shared<MockBigtableStub>();
183+
EXPECT_CALL(*mock_stub, AsyncPingAndWarm).Times(0);
184+
185+
MockFunction<void(Status const&)> mock_fn;
186+
EXPECT_CALL(mock_fn, Call).Times(0);
187+
188+
ScheduleStubRefresh(cq_impl, refresh_state, mock_stub, instance_name,
189+
mock_fn.AsStdFunction());
190+
cq_impl->CancelAll();
191+
}
192+
121193
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
122194
} // namespace bigtable_internal
123195
} // namespace cloud

0 commit comments

Comments
 (0)