@@ -30,6 +30,12 @@ namespace {
3030 */
3131auto constexpr kConnectionReadyTimeout = std::chrono::seconds(10 );
3232
33+ void LogFailedConnectionRefresh (Status const & conn_status) {
34+ if (!conn_status.ok ()) {
35+ GCP_LOG (WARNING) << " Failed to refresh connection. Error: " << conn_status;
36+ }
37+ }
38+
3339} // namespace
3440
3541ConnectionRefreshState::ConnectionRefreshState (
@@ -81,10 +87,7 @@ void ScheduleChannelRefresh(
8187 std::chrono::system_clock::now () + kConnectionReadyTimeout )
8288 .then ([weak_channel, weak_cq_impl, state](future<Status> fut) {
8389 auto conn_status = fut.get ();
84- if (!conn_status.ok ()) {
85- GCP_LOG (WARNING) << " Failed to refresh connection. Error: "
86- << conn_status;
87- }
90+ LogFailedConnectionRefresh (conn_status);
8891 auto channel = weak_channel.lock ();
8992 if (!channel) return ;
9093 auto cq_impl = weak_cq_impl.lock ();
@@ -95,6 +98,65 @@ void ScheduleChannelRefresh(
9598 state->timers ().RegisterTimer (std::move (timer_future));
9699}
97100
101+ void ScheduleStubRefresh (
102+ std::shared_ptr<internal::CompletionQueueImpl> const & cq_impl,
103+ std::shared_ptr<ConnectionRefreshState> const & state,
104+ std::shared_ptr<BigtableStub> const & stub, std::string const & instance_name,
105+ std::function<void (Status const &)> connection_status_fn) {
106+ if (!connection_status_fn) {
107+ connection_status_fn = LogFailedConnectionRefresh;
108+ }
109+ // The timers will only hold weak pointers to the channel or to the
110+ // completion queue, so if either of them are destroyed, the timer chain
111+ // will simply not continue.
112+ std::weak_ptr<BigtableStub> weak_stub (stub);
113+ std::weak_ptr<internal::CompletionQueueImpl> weak_cq_impl (cq_impl);
114+ auto cq = CompletionQueue (cq_impl);
115+ using TimerFuture = future<StatusOr<std::chrono::system_clock::time_point>>;
116+ auto timer_future =
117+ cq.MakeRelativeTimer (state->RandomizedRefreshDelay ())
118+ .then ([weak_stub, weak_cq_impl, state, instance_name,
119+ connection_status_fn =
120+ std::move (connection_status_fn)](TimerFuture fut) mutable {
121+ if (!fut.get ()) {
122+ // Timer cancelled.
123+ return ;
124+ }
125+ auto stub = weak_stub.lock ();
126+ if (!stub) return ;
127+ auto cq_impl = weak_cq_impl.lock ();
128+ if (!cq_impl) return ;
129+ auto cq = CompletionQueue (cq_impl);
130+
131+ auto client_context = std::make_shared<grpc::ClientContext>();
132+ google::cloud::internal::ImmutableOptions options;
133+ google::bigtable::v2::PingAndWarmRequest request;
134+ request.set_name (instance_name);
135+ // Use the client_context to set a deadline similar to
136+ // AsyncWaitConnectionReady.
137+ client_context->set_deadline (std::chrono::system_clock::now () +
138+ kConnectionReadyTimeout );
139+ stub->AsyncPingAndWarm (cq, client_context, std::move (options),
140+ request)
141+ .then (
142+ [weak_stub, weak_cq_impl, state, instance_name,
143+ connection_status_fn = std::move (connection_status_fn)](
144+ future<
145+ StatusOr<google::bigtable::v2::PingAndWarmResponse>>
146+ fut) mutable {
147+ auto response = fut.get ();
148+ connection_status_fn (response.status ());
149+ auto stub = weak_stub.lock ();
150+ if (!stub) return ;
151+ auto cq_impl = weak_cq_impl.lock ();
152+ if (!cq_impl) return ;
153+ ScheduleStubRefresh (cq_impl, state, stub, instance_name,
154+ std::move (connection_status_fn));
155+ });
156+ });
157+ state->timers ().RegisterTimer (std::move (timer_future));
158+ }
159+
98160void OutstandingTimers::RegisterTimer (future<void > fut) {
99161 std::unique_lock<std::mutex> lk (mu_);
100162 if (shutdown_) {
0 commit comments