1717#include " google/cloud/bigtable/internal/bigtable_channel_refresh.h"
1818#include " google/cloud/bigtable/internal/bigtable_logging_decorator.h"
1919#include " google/cloud/bigtable/internal/bigtable_metadata_decorator.h"
20+ #include " google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h"
2021#include " google/cloud/bigtable/internal/bigtable_round_robin_decorator.h"
2122#include " google/cloud/bigtable/internal/bigtable_tracing_stub.h"
2223#include " google/cloud/bigtable/internal/connection_refresh_state.h"
@@ -60,56 +61,154 @@ std::string FeaturesMetadata() {
6061 return *kFeatures ;
6162}
6263
64+ std::shared_ptr<BigtableStub> ApplyCommonDecorators (
65+ std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
66+ std::shared_ptr<BigtableStub> stub, Options const & options) {
67+ if (auth->RequiresConfigureContext ()) {
68+ stub = std::make_shared<BigtableAuth>(std::move (auth), std::move (stub));
69+ }
70+ stub = std::make_shared<BigtableMetadata>(
71+ std::move (stub),
72+ std::multimap<std::string, std::string>{
73+ {" bigtable-features" , FeaturesMetadata ()}},
74+ internal::HandCraftedLibClientHeader ());
75+ if (internal::Contains (options.get <LoggingComponentsOption>(), " rpc" )) {
76+ GCP_LOG (INFO) << " Enabled logging for gRPC calls" ;
77+ stub = std::make_shared<BigtableLogging>(
78+ std::move (stub), options.get <GrpcTracingOptionsOption>(),
79+ options.get <LoggingComponentsOption>());
80+ }
81+ if (internal::TracingEnabled (options)) {
82+ stub = MakeBigtableTracingStub (std::move (stub));
83+ }
84+ return stub;
85+ }
86+
6387} // namespace
6488
6589std::shared_ptr<BigtableStub> CreateBigtableStubRoundRobin (
66- Options const & options,
67- std::function<std::shared_ptr<BigtableStub>( int )> child_factory ) {
90+ Options const & options, std::function<std::shared_ptr<BigtableStub>( int )>
91+ refreshing_channel_stub_factory ) {
6892 std::vector<std::shared_ptr<BigtableStub>> children (
6993 (std::max)(1 , options.get <GrpcNumChannelsOption>()));
7094 int id = 0 ;
7195 std::generate (children.begin (), children.end (),
72- [&id, &child_factory] { return child_factory (id++); });
96+ [&id, &refreshing_channel_stub_factory] {
97+ return refreshing_channel_stub_factory (id++);
98+ });
7399 return std::make_shared<BigtableRoundRobin>(std::move (children));
74100}
75101
102+ std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed (
103+ std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
104+ std::shared_ptr<internal::CompletionQueueImpl> cq_impl,
105+ std::string_view instance_name, StubManager::Priming priming,
106+ Options const & options, BaseBigtableStubFactory stub_factory,
107+ std::shared_ptr<ConnectionRefreshState> refresh_state) {
108+ auto refreshing_channel_stub_factory =
109+ [stub_factory = std::move (stub_factory), cq_impl, refresh_state,
110+ auth = std::move (auth),
111+ options](std::uint32_t id, std::string_view instance_name,
112+ StubManager::Priming priming)
113+ -> StatusOr<std::shared_ptr<ChannelUsage<BigtableStub>>> {
114+ auto wrapper = std::make_shared<ChannelUsage<BigtableStub>>();
115+ auto connection_status_fn = [weak = wrapper->MakeWeak ()](Status const & s) {
116+ if (auto self = weak.lock ()) {
117+ self->set_last_refresh_status (s);
118+ }
119+ if (!s.ok ()) {
120+ GCP_LOG (WARNING) << " Failed to refresh connection. Error: " << s;
121+ }
122+ };
123+
124+ auto channel = CreateGrpcChannel (*auth, options, id);
125+ auto stub = stub_factory (std::move (channel));
126+ if (priming == StubManager::Priming::kSynchronousPriming ) {
127+ grpc::ClientContext client_context;
128+ google::bigtable::v2::PingAndWarmRequest request;
129+ request.set_name (std::string{instance_name});
130+ auto response =
131+ stub->PingAndWarm (client_context, options, std::move (request));
132+ if (!response.ok ()) return response.status ();
133+ }
134+
135+ ScheduleStubRefresh (cq_impl, refresh_state, stub,
136+ std::string{instance_name},
137+ std::move (connection_status_fn));
138+
139+ wrapper->set_stub (std::move (stub));
140+ return wrapper;
141+ };
142+
143+ std::vector<std::shared_ptr<ChannelUsage<BigtableStub>>> children;
144+ children.reserve (std::max (1 , options.get <GrpcNumChannelsOption>()));
145+ std::uint32_t id = 0 ;
146+ for (std::uint32_t i = 0 ; i < children.capacity (); ++i) {
147+ auto stub = refreshing_channel_stub_factory (id++, instance_name, priming);
148+ if (stub.ok ()) {
149+ children.push_back (*std::move (stub));
150+ }
151+ }
152+
153+ return std::make_shared<BigtableRandomTwoLeastUsed>(
154+ DynamicChannelPool<BigtableStub>::Create (
155+ std::string{instance_name}, CompletionQueue (std::move (cq_impl)),
156+ std::move (children), std::move (refresh_state),
157+ std::move (refreshing_channel_stub_factory)));
158+ }
159+
160+ std::shared_ptr<BigtableStub> CreateDecoratedStubs (
161+ std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
162+ CompletionQueue const & cq, std::string_view instance_name,
163+ StubManager::Priming priming, Options const & options,
164+ BaseBigtableStubFactory const & stub_factory) {
165+ auto cq_impl = internal::GetCompletionQueueImpl (cq);
166+ auto refresh = std::make_shared<ConnectionRefreshState>(
167+ cq_impl, options.get <bigtable::MinConnectionRefreshOption>(),
168+ options.get <bigtable::MaxConnectionRefreshOption>());
169+ auto stub = CreateBigtableStubRandomTwoLeastUsed (
170+ auth, std::move (cq_impl), instance_name, priming, options, stub_factory,
171+ std::move (refresh));
172+ return ApplyCommonDecorators (std::move (auth), std::move (stub), options);
173+ }
174+
76175std::shared_ptr<BigtableStub> CreateDecoratedStubs (
77176 std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
78177 CompletionQueue const & cq, Options const & options,
79- BaseBigtableStubFactory const & base_factory ) {
178+ BaseBigtableStubFactory const & stub_factory ) {
80179 auto cq_impl = internal::GetCompletionQueueImpl (cq);
81180 auto refresh = std::make_shared<ConnectionRefreshState>(
82181 cq_impl, options.get <bigtable::MinConnectionRefreshOption>(),
83182 options.get <bigtable::MaxConnectionRefreshOption>());
84- auto child_factory = [base_factory, cq_impl, refresh, &auth,
85- options](int id) {
183+ // Cannot use Dynamic Channel Pool as it requires affinity.
184+ auto refreshing_channel_stub_factory = [stub_factory, cq_impl, refresh, &auth,
185+ options](int id) {
86186 auto channel = CreateGrpcChannel (*auth, options, id);
87187 if (refresh->enabled ()) ScheduleChannelRefresh (cq_impl, refresh, channel);
88- return base_factory (std::move (channel));
188+ return stub_factory (std::move (channel));
89189 };
90- auto stub = CreateBigtableStubRoundRobin (options, std::move (child_factory));
190+ auto stub = CreateBigtableStubRoundRobin (
191+ options, std::move (refreshing_channel_stub_factory));
91192 if (refresh->enabled ()) {
92193 stub = std::make_shared<BigtableChannelRefresh>(std::move (stub),
93194 std::move (refresh));
94195 }
95- if (auth->RequiresConfigureContext ()) {
96- stub = std::make_shared<BigtableAuth>(std::move (auth), std::move (stub));
97- }
98- stub = std::make_shared<BigtableMetadata>(
99- std::move (stub),
100- std::multimap<std::string, std::string>{
101- {" bigtable-features" , FeaturesMetadata ()}},
102- internal::HandCraftedLibClientHeader ());
103- if (internal::Contains (options.get <LoggingComponentsOption>(), " rpc" )) {
104- GCP_LOG (INFO) << " Enabled logging for gRPC calls" ;
105- stub = std::make_shared<BigtableLogging>(
106- std::move (stub), options.get <GrpcTracingOptionsOption>(),
107- options.get <LoggingComponentsOption>());
108- }
109- if (internal::TracingEnabled (options)) {
110- stub = MakeBigtableTracingStub (std::move (stub));
196+ return ApplyCommonDecorators (std::move (auth), std::move (stub), options);
197+ }
198+
199+ absl::flat_hash_map<std::string, std::shared_ptr<BigtableStub>>
200+ CreateBigtableAffinityStubs (
201+ std::vector<bigtable::InstanceResource> const & instances,
202+ StubManager::StubCreationFn const & stub_creation_fn) {
203+ absl::flat_hash_map<std::string, std::shared_ptr<BigtableStub>>
204+ affinity_stubs;
205+ for (auto const & instance : instances) {
206+ affinity_stubs.insert (std::make_pair (
207+ instance.FullName (),
208+ stub_creation_fn (instance.FullName (),
209+ StubManager::Priming::kSynchronousPriming )));
111210 }
112- return stub ;
211+ return affinity_stubs ;
113212}
114213
115214std::shared_ptr<BigtableStub> CreateBigtableStub (
@@ -122,6 +221,18 @@ std::shared_ptr<BigtableStub> CreateBigtableStub(
122221 });
123222}
124223
224+ std::shared_ptr<BigtableStub> CreateBigtableStub (
225+ std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
226+ CompletionQueue const & cq, std::string_view instance_name,
227+ StubManager::Priming priming, Options const & options) {
228+ return CreateDecoratedStubs (
229+ std::move (auth), cq, instance_name, priming, options,
230+ [](std::shared_ptr<grpc::Channel> c) {
231+ return std::make_shared<DefaultBigtableStub>(
232+ google::bigtable::v2::Bigtable::NewStub (std::move (c)));
233+ });
234+ }
235+
125236GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
126237} // namespace bigtable_internal
127238} // namespace cloud
0 commit comments