Skip to content

Commit b48e4b8

Browse files
authored
impl(pubsub): use subscriber tracing connection (googleapis#13308)
* impl(pubsub): use subscriber tracing connection * clang * clang my bff * fix
1 parent ba8ea04 commit b48e4b8

2 files changed

Lines changed: 131 additions & 4 deletions

File tree

google/cloud/pubsub/subscriber_connection.cc

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
#include "google/cloud/pubsub/internal/subscriber_metadata_decorator.h"
2222
#include "google/cloud/pubsub/internal/subscriber_round_robin_decorator.h"
2323
#include "google/cloud/pubsub/internal/subscriber_stub_factory.h"
24+
#include "google/cloud/pubsub/internal/subscriber_tracing_connection.h"
2425
#include "google/cloud/pubsub/options.h"
2526
#include "google/cloud/pubsub/retry_policy.h"
2627
#include "google/cloud/credentials.h"
2728
#include "google/cloud/internal/api_client_header.h"
2829
#include "google/cloud/internal/make_status.h"
30+
#include "google/cloud/internal/opentelemetry.h"
2931
#include "google/cloud/internal/random.h"
3032
#include "google/cloud/log.h"
3133
#include <algorithm>
@@ -35,6 +37,23 @@ namespace google {
3537
namespace cloud {
3638
namespace pubsub {
3739
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
40+
namespace {
41+
42+
std::shared_ptr<pubsub::SubscriberConnection> ConnectionFromDecoratedStub(
43+
std::shared_ptr<pubsub_internal::SubscriberStub> stub,
44+
Options const& opts) {
45+
auto tracing_enabled = google::cloud::internal::TracingEnabled(opts);
46+
std::shared_ptr<SubscriberConnection> connection =
47+
std::make_shared<pubsub_internal::SubscriberConnectionImpl>(
48+
opts, std::move(stub));
49+
if (tracing_enabled) {
50+
connection =
51+
pubsub_internal::MakeSubscriberTracingConnection(std::move(connection));
52+
}
53+
return connection;
54+
}
55+
56+
} // namespace
3857

3958
SubscriberConnection::~SubscriberConnection() = default;
4059

@@ -73,8 +92,7 @@ std::shared_ptr<SubscriberConnection> MakeSubscriberConnection(
7392
auto background = internal::MakeBackgroundThreadsFactory(opts)();
7493
auto stub =
7594
pubsub_internal::MakeRoundRobinSubscriberStub(background->cq(), opts);
76-
return std::make_shared<pubsub_internal::SubscriberConnectionImpl>(
77-
std::move(opts), std::move(stub));
95+
return ConnectionFromDecoratedStub(std::move(stub), std::move(opts));
7896
}
7997

8098
std::shared_ptr<SubscriberConnection> MakeSubscriberConnection(
@@ -103,8 +121,7 @@ std::shared_ptr<pubsub::SubscriberConnection> MakeTestSubscriberConnection(
103121
auto stub = pubsub_internal::MakeTestSubscriberStub(background->cq(), opts,
104122
std::move(stubs));
105123
opts.set<pubsub::SubscriptionOption>(std::move(subscription));
106-
return std::make_shared<SubscriberConnectionImpl>(std::move(opts),
107-
std::move(stub));
124+
return pubsub::ConnectionFromDecoratedStub(std::move(stub), std::move(opts));
108125
}
109126

110127
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/pubsub/subscriber_connection_test.cc

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
#include "google/cloud/pubsub/testing/fake_streaming_pull.h"
2020
#include "google/cloud/pubsub/testing/mock_subscriber_stub.h"
2121
#include "google/cloud/pubsub/testing/test_retry_policies.h"
22+
#include "google/cloud/credentials.h"
2223
#include "google/cloud/internal/api_client_header.h"
24+
#include "google/cloud/internal/opentelemetry.h"
2325
#include "google/cloud/internal/url_encode.h"
26+
#include "google/cloud/testing_util/opentelemetry_matchers.h"
2427
#include "google/cloud/testing_util/scoped_log.h"
2528
#include "google/cloud/testing_util/status_matchers.h"
2629
#include "google/cloud/testing_util/validate_metadata.h"
30+
#include "google/cloud/testing_util/validate_propagator.h"
2731
#include <gmock/gmock.h>
2832
#include <atomic>
2933

@@ -159,6 +163,112 @@ TEST(SubscriberConnectionTest, MakeSubscriberConnectionSetupsMetadata) {
159163
ASSERT_STATUS_OK(response.get());
160164
}
161165

166+
#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
167+
using ::google::cloud::testing_util::DisableTracing;
168+
using ::google::cloud::testing_util::EnableTracing;
169+
using ::google::cloud::testing_util::InstallSpanCatcher;
170+
using ::google::cloud::testing_util::SetServerMetadata;
171+
using ::google::cloud::testing_util::SpanNamed;
172+
using ::google::cloud::testing_util::ValidatePropagator;
173+
using ::google::pubsub::v1::PullRequest;
174+
using ::testing::_;
175+
using ::testing::AllOf;
176+
using ::testing::IsEmpty;
177+
using ::testing::Property;
178+
using ::testing::UnorderedElementsAre;
179+
180+
TEST(MakeSubscriberConnectionTest, TracingEnabledForUnaryPull) {
181+
auto span_catcher = InstallSpanCatcher();
182+
auto const subscription = Subscription("test-project", "test-subscription");
183+
auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
184+
EXPECT_CALL(*mock, AsyncModifyAckDeadline)
185+
.WillRepeatedly([](google::cloud::CompletionQueue&, auto context,
186+
google::pubsub::v1::ModifyAckDeadlineRequest const&) {
187+
SetServerMetadata(*context, {});
188+
return make_ready_future(Status{});
189+
});
190+
EXPECT_CALL(*mock, AsyncAcknowledge)
191+
.WillOnce([](google::cloud::CompletionQueue&, auto context,
192+
google::pubsub::v1::AcknowledgeRequest const& request) {
193+
SetServerMetadata(*context, {});
194+
EXPECT_THAT(request.ack_ids(), Contains("test-ack-id-0"));
195+
return make_ready_future(
196+
Status{StatusCode::kUnknown, "test-only-unknown"});
197+
});
198+
EXPECT_CALL(*mock, Pull(_, AllOf(Property(&PullRequest::max_messages, 1),
199+
Property(&PullRequest::subscription,
200+
subscription.FullName()))))
201+
.WillOnce([&](grpc::ClientContext& context,
202+
google::pubsub::v1::PullRequest const&) {
203+
ValidatePropagator(context);
204+
google::pubsub::v1::PullResponse response;
205+
auto& message = *response.add_received_messages();
206+
message.set_delivery_attempt(42);
207+
message.set_ack_id("test-ack-id-0");
208+
message.mutable_message()->set_data("test-data-0");
209+
return response;
210+
});
211+
212+
auto subscriber = MakeTestSubscriberConnection(subscription, mock,
213+
EnableTracing(Options{}));
214+
215+
google::cloud::internal::OptionsSpan span(subscriber->options());
216+
auto response = subscriber->Pull();
217+
ASSERT_STATUS_OK(response);
218+
EXPECT_EQ(response->message.data(), "test-data-0");
219+
std::move(response->handler).ack();
220+
221+
auto spans = span_catcher->GetSpans();
222+
EXPECT_THAT(spans, UnorderedElementsAre(
223+
SpanNamed("test-subscription receive"),
224+
SpanNamed("google.pubsub.v1.Subscriber/Pull"),
225+
SpanNamed("google.pubsub.v1.Subscriber/Acknowledge")));
226+
}
227+
228+
TEST(MakeSubscriberConnectionTest, TracingDisabledForUnaryPull) {
229+
auto span_catcher = InstallSpanCatcher();
230+
auto const subscription = Subscription("test-project", "test-subscription");
231+
auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
232+
EXPECT_CALL(*mock, AsyncModifyAckDeadline)
233+
.WillRepeatedly([](google::cloud::CompletionQueue&, auto,
234+
google::pubsub::v1::ModifyAckDeadlineRequest const&) {
235+
return make_ready_future(Status{});
236+
});
237+
EXPECT_CALL(*mock, AsyncAcknowledge)
238+
.WillOnce([](google::cloud::CompletionQueue&, auto,
239+
google::pubsub::v1::AcknowledgeRequest const& request) {
240+
EXPECT_THAT(request.ack_ids(), Contains("test-ack-id-0"));
241+
return make_ready_future(
242+
Status{StatusCode::kUnknown, "test-only-unknown"});
243+
});
244+
EXPECT_CALL(*mock, Pull(_, AllOf(Property(&PullRequest::max_messages, 1),
245+
Property(&PullRequest::subscription,
246+
subscription.FullName()))))
247+
.WillOnce(
248+
[&](grpc::ClientContext&, google::pubsub::v1::PullRequest const&) {
249+
google::pubsub::v1::PullResponse response;
250+
auto& message = *response.add_received_messages();
251+
message.set_delivery_attempt(42);
252+
message.set_ack_id("test-ack-id-0");
253+
message.mutable_message()->set_data("test-data-0");
254+
return response;
255+
});
256+
257+
auto subscriber = MakeTestSubscriberConnection(subscription, mock,
258+
DisableTracing(Options{}));
259+
260+
google::cloud::internal::OptionsSpan span(subscriber->options());
261+
auto response = subscriber->Pull();
262+
ASSERT_STATUS_OK(response);
263+
EXPECT_EQ(response->message.data(), "test-data-0");
264+
std::move(response->handler).ack();
265+
266+
auto spans = span_catcher->GetSpans();
267+
EXPECT_THAT(spans, IsEmpty());
268+
}
269+
270+
#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
271+
162272
} // namespace
163273
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
164274
} // namespace pubsub

0 commit comments

Comments
 (0)