|
19 | 19 | #include "google/cloud/pubsub/testing/fake_streaming_pull.h" |
20 | 20 | #include "google/cloud/pubsub/testing/mock_subscriber_stub.h" |
21 | 21 | #include "google/cloud/pubsub/testing/test_retry_policies.h" |
| 22 | +#include "google/cloud/credentials.h" |
22 | 23 | #include "google/cloud/internal/api_client_header.h" |
| 24 | +#include "google/cloud/internal/opentelemetry.h" |
23 | 25 | #include "google/cloud/internal/url_encode.h" |
| 26 | +#include "google/cloud/testing_util/opentelemetry_matchers.h" |
24 | 27 | #include "google/cloud/testing_util/scoped_log.h" |
25 | 28 | #include "google/cloud/testing_util/status_matchers.h" |
26 | 29 | #include "google/cloud/testing_util/validate_metadata.h" |
| 30 | +#include "google/cloud/testing_util/validate_propagator.h" |
27 | 31 | #include <gmock/gmock.h> |
28 | 32 | #include <atomic> |
29 | 33 |
|
@@ -159,6 +163,112 @@ TEST(SubscriberConnectionTest, MakeSubscriberConnectionSetupsMetadata) { |
159 | 163 | ASSERT_STATUS_OK(response.get()); |
160 | 164 | } |
161 | 165 |
|
| 166 | +#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY |
| 167 | +using ::google::cloud::testing_util::DisableTracing; |
| 168 | +using ::google::cloud::testing_util::EnableTracing; |
| 169 | +using ::google::cloud::testing_util::InstallSpanCatcher; |
| 170 | +using ::google::cloud::testing_util::SetServerMetadata; |
| 171 | +using ::google::cloud::testing_util::SpanNamed; |
| 172 | +using ::google::cloud::testing_util::ValidatePropagator; |
| 173 | +using ::google::pubsub::v1::PullRequest; |
| 174 | +using ::testing::_; |
| 175 | +using ::testing::AllOf; |
| 176 | +using ::testing::IsEmpty; |
| 177 | +using ::testing::Property; |
| 178 | +using ::testing::UnorderedElementsAre; |
| 179 | + |
| 180 | +TEST(MakeSubscriberConnectionTest, TracingEnabledForUnaryPull) { |
| 181 | + auto span_catcher = InstallSpanCatcher(); |
| 182 | + auto const subscription = Subscription("test-project", "test-subscription"); |
| 183 | + auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>(); |
| 184 | + EXPECT_CALL(*mock, AsyncModifyAckDeadline) |
| 185 | + .WillRepeatedly([](google::cloud::CompletionQueue&, auto context, |
| 186 | + google::pubsub::v1::ModifyAckDeadlineRequest const&) { |
| 187 | + SetServerMetadata(*context, {}); |
| 188 | + return make_ready_future(Status{}); |
| 189 | + }); |
| 190 | + EXPECT_CALL(*mock, AsyncAcknowledge) |
| 191 | + .WillOnce([](google::cloud::CompletionQueue&, auto context, |
| 192 | + google::pubsub::v1::AcknowledgeRequest const& request) { |
| 193 | + SetServerMetadata(*context, {}); |
| 194 | + EXPECT_THAT(request.ack_ids(), Contains("test-ack-id-0")); |
| 195 | + return make_ready_future( |
| 196 | + Status{StatusCode::kUnknown, "test-only-unknown"}); |
| 197 | + }); |
| 198 | + EXPECT_CALL(*mock, Pull(_, AllOf(Property(&PullRequest::max_messages, 1), |
| 199 | + Property(&PullRequest::subscription, |
| 200 | + subscription.FullName())))) |
| 201 | + .WillOnce([&](grpc::ClientContext& context, |
| 202 | + google::pubsub::v1::PullRequest const&) { |
| 203 | + ValidatePropagator(context); |
| 204 | + google::pubsub::v1::PullResponse response; |
| 205 | + auto& message = *response.add_received_messages(); |
| 206 | + message.set_delivery_attempt(42); |
| 207 | + message.set_ack_id("test-ack-id-0"); |
| 208 | + message.mutable_message()->set_data("test-data-0"); |
| 209 | + return response; |
| 210 | + }); |
| 211 | + |
| 212 | + auto subscriber = MakeTestSubscriberConnection(subscription, mock, |
| 213 | + EnableTracing(Options{})); |
| 214 | + |
| 215 | + google::cloud::internal::OptionsSpan span(subscriber->options()); |
| 216 | + auto response = subscriber->Pull(); |
| 217 | + ASSERT_STATUS_OK(response); |
| 218 | + EXPECT_EQ(response->message.data(), "test-data-0"); |
| 219 | + std::move(response->handler).ack(); |
| 220 | + |
| 221 | + auto spans = span_catcher->GetSpans(); |
| 222 | + EXPECT_THAT(spans, UnorderedElementsAre( |
| 223 | + SpanNamed("test-subscription receive"), |
| 224 | + SpanNamed("google.pubsub.v1.Subscriber/Pull"), |
| 225 | + SpanNamed("google.pubsub.v1.Subscriber/Acknowledge"))); |
| 226 | +} |
| 227 | + |
| 228 | +TEST(MakeSubscriberConnectionTest, TracingDisabledForUnaryPull) { |
| 229 | + auto span_catcher = InstallSpanCatcher(); |
| 230 | + auto const subscription = Subscription("test-project", "test-subscription"); |
| 231 | + auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>(); |
| 232 | + EXPECT_CALL(*mock, AsyncModifyAckDeadline) |
| 233 | + .WillRepeatedly([](google::cloud::CompletionQueue&, auto, |
| 234 | + google::pubsub::v1::ModifyAckDeadlineRequest const&) { |
| 235 | + return make_ready_future(Status{}); |
| 236 | + }); |
| 237 | + EXPECT_CALL(*mock, AsyncAcknowledge) |
| 238 | + .WillOnce([](google::cloud::CompletionQueue&, auto, |
| 239 | + google::pubsub::v1::AcknowledgeRequest const& request) { |
| 240 | + EXPECT_THAT(request.ack_ids(), Contains("test-ack-id-0")); |
| 241 | + return make_ready_future( |
| 242 | + Status{StatusCode::kUnknown, "test-only-unknown"}); |
| 243 | + }); |
| 244 | + EXPECT_CALL(*mock, Pull(_, AllOf(Property(&PullRequest::max_messages, 1), |
| 245 | + Property(&PullRequest::subscription, |
| 246 | + subscription.FullName())))) |
| 247 | + .WillOnce( |
| 248 | + [&](grpc::ClientContext&, google::pubsub::v1::PullRequest const&) { |
| 249 | + google::pubsub::v1::PullResponse response; |
| 250 | + auto& message = *response.add_received_messages(); |
| 251 | + message.set_delivery_attempt(42); |
| 252 | + message.set_ack_id("test-ack-id-0"); |
| 253 | + message.mutable_message()->set_data("test-data-0"); |
| 254 | + return response; |
| 255 | + }); |
| 256 | + |
| 257 | + auto subscriber = MakeTestSubscriberConnection(subscription, mock, |
| 258 | + DisableTracing(Options{})); |
| 259 | + |
| 260 | + google::cloud::internal::OptionsSpan span(subscriber->options()); |
| 261 | + auto response = subscriber->Pull(); |
| 262 | + ASSERT_STATUS_OK(response); |
| 263 | + EXPECT_EQ(response->message.data(), "test-data-0"); |
| 264 | + std::move(response->handler).ack(); |
| 265 | + |
| 266 | + auto spans = span_catcher->GetSpans(); |
| 267 | + EXPECT_THAT(spans, IsEmpty()); |
| 268 | +} |
| 269 | + |
| 270 | +#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY |
| 271 | + |
162 | 272 | } // namespace |
163 | 273 | GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END |
164 | 274 | } // namespace pubsub |
|
0 commit comments