Skip to content

Commit 6e24ab6

Browse files
authored
impl(bigtable): update stub_factory and connection creation functions for dynamic pool (#16052)
1 parent cf2f183 commit 6e24ab6

5 files changed

Lines changed: 332 additions & 40 deletions

File tree

google/cloud/bigtable/data_connection.cc

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,14 +184,35 @@ std::shared_ptr<DataConnection> MakeDataConnection(Options options) {
184184
google::cloud::internal::MakeBackgroundThreadsFactory(options)();
185185
auto auth = google::cloud::internal::CreateAuthenticationStrategy(
186186
background->cq(), options);
187-
auto stub = bigtable_internal::CreateBigtableStub(std::move(auth),
188-
background->cq(), options);
189187
auto limiter =
190188
bigtable_internal::MakeMutateRowsLimiter(background->cq(), options);
191-
std::shared_ptr<DataConnection> conn =
192-
std::make_shared<bigtable_internal::DataConnectionImpl>(
193-
std::move(background), std::move(stub), std::move(limiter),
194-
std::move(options));
189+
std::shared_ptr<DataConnection> conn;
190+
191+
if (options.has<experimental::InstanceChannelAffinityOption>()) {
192+
auto stub_creation_fn =
193+
[auth, cq = background->cq(), options](
194+
std::string_view instance_name,
195+
bigtable_internal::StubManager::Priming priming) {
196+
return bigtable_internal::CreateBigtableStub(auth, cq, instance_name,
197+
priming, options);
198+
};
199+
200+
auto affinity_stubs = bigtable_internal::CreateBigtableAffinityStubs(
201+
options.get<experimental::InstanceChannelAffinityOption>(),
202+
stub_creation_fn);
203+
conn = std::make_shared<bigtable_internal::DataConnectionImpl>(
204+
std::move(background),
205+
std::make_unique<bigtable_internal::StubManager>(
206+
std::move(affinity_stubs), stub_creation_fn),
207+
std::move(limiter), std::move(options));
208+
} else {
209+
auto stub = bigtable_internal::CreateBigtableStub(
210+
std::move(auth), background->cq(), options);
211+
conn = std::make_shared<bigtable_internal::DataConnectionImpl>(
212+
std::move(background),
213+
std::make_unique<bigtable_internal::StubManager>(std::move(stub)),
214+
std::move(limiter), std::move(options));
215+
}
195216
if (google::cloud::internal::TracingEnabled(conn->options())) {
196217
conn = bigtable_internal::MakeDataTracingConnection(std::move(conn));
197218
}

google/cloud/bigtable/data_connection_test.cc

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/bigtable/data_connection.h"
16+
#include "google/cloud/bigtable/internal/bigtable_stub_factory.h"
1617
#include "google/cloud/bigtable/options.h"
1718
#include "google/cloud/common_options.h"
1819
#include "google/cloud/credentials.h"
@@ -29,6 +30,12 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2930
namespace {
3031

3132
using ms = std::chrono::milliseconds;
33+
using ::google::cloud::testing_util::DisableTracing;
34+
using ::google::cloud::testing_util::EnableTracing;
35+
using ::google::cloud::testing_util::SpanNamed;
36+
using ::testing::Contains;
37+
using ::testing::Eq;
38+
using ::testing::IsEmpty;
3239

3340
Options TestOptions() {
3441
return Options{}
@@ -54,12 +61,6 @@ TEST(MakeDataConnection, DefaultsOptions) {
5461
<< "User supplied Options are overridden in MakeDataConnection()";
5562
}
5663

57-
using ::google::cloud::testing_util::DisableTracing;
58-
using ::google::cloud::testing_util::EnableTracing;
59-
using ::google::cloud::testing_util::SpanNamed;
60-
using ::testing::Contains;
61-
using ::testing::IsEmpty;
62-
6364
TEST(MakeDataConnection, TracingEnabled) {
6465
auto span_catcher = testing_util::InstallSpanCatcher();
6566

@@ -71,6 +72,24 @@ TEST(MakeDataConnection, TracingEnabled) {
7172
Contains(SpanNamed("bigtable::Table::Apply")));
7273
}
7374

75+
TEST(MakeDataConnection, InstanceChannelAffinityOption) {
76+
InstanceResource instance_a{Project("my-project"), "instance-a"};
77+
InstanceResource instance_b{Project("my-project"), "instance-b"};
78+
auto conn =
79+
MakeDataConnection(TestOptions()
80+
.set<AppProfileIdOption>("user-supplied")
81+
.set<experimental::InstanceChannelAffinityOption>(
82+
{instance_a, instance_b}));
83+
auto options = conn->options();
84+
EXPECT_TRUE(options.has<DataBackoffPolicyOption>())
85+
<< "Options are not defaulted in MakeDataConnection()";
86+
EXPECT_EQ(options.get<AppProfileIdOption>(), "user-supplied")
87+
<< "User supplied Options are overridden in MakeDataConnection()";
88+
ASSERT_TRUE(options.has<experimental::InstanceChannelAffinityOption>());
89+
EXPECT_THAT(options.get<experimental::InstanceChannelAffinityOption>().size(),
90+
Eq(2));
91+
}
92+
7493
TEST(MakeDataConnection, TracingDisabled) {
7594
auto span_catcher = testing_util::InstallSpanCatcher();
7695

google/cloud/bigtable/internal/bigtable_stub_factory.cc

Lines changed: 136 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "google/cloud/bigtable/internal/bigtable_channel_refresh.h"
1818
#include "google/cloud/bigtable/internal/bigtable_logging_decorator.h"
1919
#include "google/cloud/bigtable/internal/bigtable_metadata_decorator.h"
20+
#include "google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h"
2021
#include "google/cloud/bigtable/internal/bigtable_round_robin_decorator.h"
2122
#include "google/cloud/bigtable/internal/bigtable_tracing_stub.h"
2223
#include "google/cloud/bigtable/internal/connection_refresh_state.h"
@@ -60,56 +61,154 @@ std::string FeaturesMetadata() {
6061
return *kFeatures;
6162
}
6263

64+
std::shared_ptr<BigtableStub> ApplyCommonDecorators(
65+
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
66+
std::shared_ptr<BigtableStub> stub, Options const& options) {
67+
if (auth->RequiresConfigureContext()) {
68+
stub = std::make_shared<BigtableAuth>(std::move(auth), std::move(stub));
69+
}
70+
stub = std::make_shared<BigtableMetadata>(
71+
std::move(stub),
72+
std::multimap<std::string, std::string>{
73+
{"bigtable-features", FeaturesMetadata()}},
74+
internal::HandCraftedLibClientHeader());
75+
if (internal::Contains(options.get<LoggingComponentsOption>(), "rpc")) {
76+
GCP_LOG(INFO) << "Enabled logging for gRPC calls";
77+
stub = std::make_shared<BigtableLogging>(
78+
std::move(stub), options.get<GrpcTracingOptionsOption>(),
79+
options.get<LoggingComponentsOption>());
80+
}
81+
if (internal::TracingEnabled(options)) {
82+
stub = MakeBigtableTracingStub(std::move(stub));
83+
}
84+
return stub;
85+
}
86+
6387
} // namespace
6488

6589
std::shared_ptr<BigtableStub> CreateBigtableStubRoundRobin(
66-
Options const& options,
67-
std::function<std::shared_ptr<BigtableStub>(int)> child_factory) {
90+
Options const& options, std::function<std::shared_ptr<BigtableStub>(int)>
91+
refreshing_channel_stub_factory) {
6892
std::vector<std::shared_ptr<BigtableStub>> children(
6993
(std::max)(1, options.get<GrpcNumChannelsOption>()));
7094
int id = 0;
7195
std::generate(children.begin(), children.end(),
72-
[&id, &child_factory] { return child_factory(id++); });
96+
[&id, &refreshing_channel_stub_factory] {
97+
return refreshing_channel_stub_factory(id++);
98+
});
7399
return std::make_shared<BigtableRoundRobin>(std::move(children));
74100
}
75101

102+
std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed(
103+
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
104+
std::shared_ptr<internal::CompletionQueueImpl> cq_impl,
105+
std::string_view instance_name, StubManager::Priming priming,
106+
Options const& options, BaseBigtableStubFactory stub_factory,
107+
std::shared_ptr<ConnectionRefreshState> refresh_state) {
108+
auto refreshing_channel_stub_factory =
109+
[stub_factory = std::move(stub_factory), cq_impl, refresh_state,
110+
auth = std::move(auth),
111+
options](std::uint32_t id, std::string_view instance_name,
112+
StubManager::Priming priming)
113+
-> StatusOr<std::shared_ptr<ChannelUsage<BigtableStub>>> {
114+
auto wrapper = std::make_shared<ChannelUsage<BigtableStub>>();
115+
auto connection_status_fn = [weak = wrapper->MakeWeak()](Status const& s) {
116+
if (auto self = weak.lock()) {
117+
self->set_last_refresh_status(s);
118+
}
119+
if (!s.ok()) {
120+
GCP_LOG(WARNING) << "Failed to refresh connection. Error: " << s;
121+
}
122+
};
123+
124+
auto channel = CreateGrpcChannel(*auth, options, id);
125+
auto stub = stub_factory(std::move(channel));
126+
if (priming == StubManager::Priming::kSynchronousPriming) {
127+
grpc::ClientContext client_context;
128+
google::bigtable::v2::PingAndWarmRequest request;
129+
request.set_name(std::string{instance_name});
130+
auto response =
131+
stub->PingAndWarm(client_context, options, std::move(request));
132+
if (!response.ok()) return response.status();
133+
}
134+
135+
ScheduleStubRefresh(cq_impl, refresh_state, stub,
136+
std::string{instance_name},
137+
std::move(connection_status_fn));
138+
139+
wrapper->set_stub(std::move(stub));
140+
return wrapper;
141+
};
142+
143+
std::vector<std::shared_ptr<ChannelUsage<BigtableStub>>> children;
144+
children.reserve(std::max(1, options.get<GrpcNumChannelsOption>()));
145+
std::uint32_t id = 0;
146+
for (std::uint32_t i = 0; i < children.capacity(); ++i) {
147+
auto stub = refreshing_channel_stub_factory(id++, instance_name, priming);
148+
if (stub.ok()) {
149+
children.push_back(*std::move(stub));
150+
}
151+
}
152+
153+
return std::make_shared<BigtableRandomTwoLeastUsed>(
154+
DynamicChannelPool<BigtableStub>::Create(
155+
std::string{instance_name}, CompletionQueue(std::move(cq_impl)),
156+
std::move(children), std::move(refresh_state),
157+
std::move(refreshing_channel_stub_factory)));
158+
}
159+
160+
std::shared_ptr<BigtableStub> CreateDecoratedStubs(
161+
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
162+
CompletionQueue const& cq, std::string_view instance_name,
163+
StubManager::Priming priming, Options const& options,
164+
BaseBigtableStubFactory const& stub_factory) {
165+
auto cq_impl = internal::GetCompletionQueueImpl(cq);
166+
auto refresh = std::make_shared<ConnectionRefreshState>(
167+
cq_impl, options.get<bigtable::MinConnectionRefreshOption>(),
168+
options.get<bigtable::MaxConnectionRefreshOption>());
169+
auto stub = CreateBigtableStubRandomTwoLeastUsed(
170+
auth, std::move(cq_impl), instance_name, priming, options, stub_factory,
171+
std::move(refresh));
172+
return ApplyCommonDecorators(std::move(auth), std::move(stub), options);
173+
}
174+
76175
std::shared_ptr<BigtableStub> CreateDecoratedStubs(
77176
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
78177
CompletionQueue const& cq, Options const& options,
79-
BaseBigtableStubFactory const& base_factory) {
178+
BaseBigtableStubFactory const& stub_factory) {
80179
auto cq_impl = internal::GetCompletionQueueImpl(cq);
81180
auto refresh = std::make_shared<ConnectionRefreshState>(
82181
cq_impl, options.get<bigtable::MinConnectionRefreshOption>(),
83182
options.get<bigtable::MaxConnectionRefreshOption>());
84-
auto child_factory = [base_factory, cq_impl, refresh, &auth,
85-
options](int id) {
183+
// Cannot use Dynamic Channel Pool as it requires affinity.
184+
auto refreshing_channel_stub_factory = [stub_factory, cq_impl, refresh, &auth,
185+
options](int id) {
86186
auto channel = CreateGrpcChannel(*auth, options, id);
87187
if (refresh->enabled()) ScheduleChannelRefresh(cq_impl, refresh, channel);
88-
return base_factory(std::move(channel));
188+
return stub_factory(std::move(channel));
89189
};
90-
auto stub = CreateBigtableStubRoundRobin(options, std::move(child_factory));
190+
auto stub = CreateBigtableStubRoundRobin(
191+
options, std::move(refreshing_channel_stub_factory));
91192
if (refresh->enabled()) {
92193
stub = std::make_shared<BigtableChannelRefresh>(std::move(stub),
93194
std::move(refresh));
94195
}
95-
if (auth->RequiresConfigureContext()) {
96-
stub = std::make_shared<BigtableAuth>(std::move(auth), std::move(stub));
97-
}
98-
stub = std::make_shared<BigtableMetadata>(
99-
std::move(stub),
100-
std::multimap<std::string, std::string>{
101-
{"bigtable-features", FeaturesMetadata()}},
102-
internal::HandCraftedLibClientHeader());
103-
if (internal::Contains(options.get<LoggingComponentsOption>(), "rpc")) {
104-
GCP_LOG(INFO) << "Enabled logging for gRPC calls";
105-
stub = std::make_shared<BigtableLogging>(
106-
std::move(stub), options.get<GrpcTracingOptionsOption>(),
107-
options.get<LoggingComponentsOption>());
108-
}
109-
if (internal::TracingEnabled(options)) {
110-
stub = MakeBigtableTracingStub(std::move(stub));
196+
return ApplyCommonDecorators(std::move(auth), std::move(stub), options);
197+
}
198+
199+
absl::flat_hash_map<std::string, std::shared_ptr<BigtableStub>>
200+
CreateBigtableAffinityStubs(
201+
std::vector<bigtable::InstanceResource> const& instances,
202+
StubManager::StubCreationFn const& stub_creation_fn) {
203+
absl::flat_hash_map<std::string, std::shared_ptr<BigtableStub>>
204+
affinity_stubs;
205+
for (auto const& instance : instances) {
206+
affinity_stubs.insert(std::make_pair(
207+
instance.FullName(),
208+
stub_creation_fn(instance.FullName(),
209+
StubManager::Priming::kSynchronousPriming)));
111210
}
112-
return stub;
211+
return affinity_stubs;
113212
}
114213

115214
std::shared_ptr<BigtableStub> CreateBigtableStub(
@@ -122,6 +221,18 @@ std::shared_ptr<BigtableStub> CreateBigtableStub(
122221
});
123222
}
124223

224+
std::shared_ptr<BigtableStub> CreateBigtableStub(
225+
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
226+
CompletionQueue const& cq, std::string_view instance_name,
227+
StubManager::Priming priming, Options const& options) {
228+
return CreateDecoratedStubs(
229+
std::move(auth), cq, instance_name, priming, options,
230+
[](std::shared_ptr<grpc::Channel> c) {
231+
return std::make_shared<DefaultBigtableStub>(
232+
google::bigtable::v2::Bigtable::NewStub(std::move(c)));
233+
});
234+
}
235+
125236
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
126237
} // namespace bigtable_internal
127238
} // namespace cloud

0 commit comments

Comments
 (0)