Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions include/grpcpp/ext/otel_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ class OpenTelemetryPluginBuilder {
kServerCallRcvdTotalCompressedMessageSizeInstrumentName =
"grpc.server.call.rcvd_total_compressed_message_size";

/// Experimental Retry Metrics
///
/// Instrument name of the experimental histogram that records the number of
/// retries during a client call.
static constexpr absl::string_view kClientCallRetriesInstrumentName =
"grpc.client.call.retries";
/// Instrument name of the experimental histogram that records the number of
/// transparent retries during a client call.
static constexpr absl::string_view
kClientCallTransparentRetriesInstrumentName =
"grpc.client.call.transparent_retries";
/// Instrument name of the experimental histogram that records the total
/// delay (in seconds) while there is no active attempt during a client call.
static constexpr absl::string_view kClientCallRetryDelayInstrumentName =
"grpc.client.call.retry_delay";

OpenTelemetryPluginBuilder();
~OpenTelemetryPluginBuilder();
/// If `SetMeterProvider()` is not called, no metrics are collected.
Expand Down
38 changes: 37 additions & 1 deletion src/cpp/ext/otel/otel_client_call_tracer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer<
//
template <typename UnrefBehavior>
OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer<UnrefBehavior>::
CallAttemptTracer(const OpenTelemetryPluginImpl::ClientCallTracer* parent,
CallAttemptTracer(OpenTelemetryPluginImpl::ClientCallTracer* const parent,
uint64_t attempt_num, bool is_transparent_retry)
: parent_(parent), start_time_(absl::Now()) {
if (parent_->otel_plugin_->client_.attempt.started != nullptr) {
Expand Down Expand Up @@ -310,6 +310,12 @@ void OpenTelemetryPluginImpl::ClientCallTracer::
parent_->otel_plugin_->client_.attempt.rcvd_total_compressed_message_size
->Record(incoming_bytes, labels, opentelemetry::context::Context{});
}
{
grpc_core::MutexLock lock(&parent_->mu_);
Comment thread
yashykt marked this conversation as resolved.
if (--parent_->num_active_attempts_ == 0) {
parent_->time_at_last_attempt_end_ = absl::Now();
}
}
if (span_ != nullptr) {
if (status.ok()) {
span_->SetStatus(opentelemetry::trace::StatusCode::kOk);
Expand Down Expand Up @@ -441,6 +447,32 @@ OpenTelemetryPluginImpl::ClientCallTracer::ClientCallTracer(
}

OpenTelemetryPluginImpl::ClientCallTracer::~ClientCallTracer() {
if (otel_plugin_->client_.call.retries != nullptr && retries_ > 1) {
otel_plugin_->client_.call.retries->Record(
retries_ - 1,
std::array<std::pair<absl::string_view, absl::string_view>, 3>{
{{OpenTelemetryMethodKey(), MethodForStats()},
{OpenTelemetryTargetKey(), scope_config_->filtered_target()}}},
opentelemetry::context::Context{});
}
if (otel_plugin_->client_.call.transparent_retries != nullptr &&
transparent_retries_ != 0) {
otel_plugin_->client_.call.transparent_retries->Record(
transparent_retries_,
std::array<std::pair<absl::string_view, absl::string_view>, 3>{
{{OpenTelemetryMethodKey(), MethodForStats()},
{OpenTelemetryTargetKey(), scope_config_->filtered_target()}}},
opentelemetry::context::Context{});
}
if (otel_plugin_->client_.call.retry_delay != nullptr &&
retry_delay_ != absl::ZeroDuration() && retries_ > 1) {
otel_plugin_->client_.call.retry_delay->Record(
absl::ToDoubleSeconds(retry_delay_),
std::array<std::pair<absl::string_view, absl::string_view>, 2>{
{{OpenTelemetryMethodKey(), MethodForStats()},
{OpenTelemetryTargetKey(), scope_config_->filtered_target()}}},
opentelemetry::context::Context{});
}
if (span_ != nullptr) {
span_->End();
}
Expand All @@ -458,13 +490,17 @@ OpenTelemetryPluginImpl::ClientCallTracer::StartNewAttempt(
grpc_core::MutexLock lock(&mu_);
if (transparent_retries_ != 0 || retries_ != 0) {
is_first_attempt = false;
if (num_active_attempts_ == 0 && !is_transparent_retry) {
retry_delay_ += absl::Now() - time_at_last_attempt_end_;
}
}
if (is_transparent_retry) {
++transparent_retries_;
} else {
++retries_;
}
attempt_num = retries_ - 1; // Sequence starts at 0
++num_active_attempts_;
}
if (is_first_attempt) {
return arena_
Expand Down
10 changes: 7 additions & 3 deletions src/cpp/ext/otel/otel_client_call_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class OpenTelemetryPluginImpl::ClientCallTracer
grpc_core::NonPolymorphicRefCount,
UnrefBehavior> {
public:
CallAttemptTracer(const OpenTelemetryPluginImpl::ClientCallTracer* parent,
CallAttemptTracer(OpenTelemetryPluginImpl::ClientCallTracer* const parent,
uint64_t attempt_num, bool is_transparent_retry);

~CallAttemptTracer() override;
Expand Down Expand Up @@ -105,7 +105,7 @@ class OpenTelemetryPluginImpl::ClientCallTracer

void PopulateLabelInjectors(grpc_metadata_batch* metadata);

const ClientCallTracer* parent_;
ClientCallTracer* const parent_;
// Start time (for measuring latency).
absl::Time start_time_;
std::unique_ptr<LabelsIterable> injected_labels_;
Expand Down Expand Up @@ -158,10 +158,14 @@ class OpenTelemetryPluginImpl::ClientCallTracer
OpenTelemetryPluginImpl* otel_plugin_;
std::shared_ptr<OpenTelemetryPluginImpl::ClientScopeConfig> scope_config_;
grpc_core::Mutex mu_;
// Non-transparent attempts per call (including first attempt)
// Non-transparent retry attempts per call (includes initial attempt)
uint64_t retries_ ABSL_GUARDED_BY(&mu_) = 0;
// Transparent retries per call
uint64_t transparent_retries_ ABSL_GUARDED_BY(&mu_) = 0;
// Retry delay
absl::Duration retry_delay_ ABSL_GUARDED_BY(&mu_);
absl::Time time_at_last_attempt_end_ ABSL_GUARDED_BY(&mu_);
uint64_t num_active_attempts_ ABSL_GUARDED_BY(&mu_) = 0;
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
};

Expand Down
27 changes: 27 additions & 0 deletions src/cpp/ext/otel/otel_plugin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,33 @@ OpenTelemetryPluginImpl::OpenTelemetryPluginImpl(
kServerCallRcvdTotalCompressedMessageSizeInstrumentName),
"Compressed message bytes received per server call", "By");
}
if (metrics.contains(grpc::OpenTelemetryPluginBuilder::
kClientCallRetriesInstrumentName)) {
client_.call.retries = meter->CreateUInt64Histogram(
std::string(grpc::OpenTelemetryPluginBuilder::
kClientCallRetriesInstrumentName),
"EXPERIMENTAL: Number of retries during the client call. If there "
"were no retries, 0 is not reported.",
"{retry}");
}
if (metrics.contains(grpc::OpenTelemetryPluginBuilder::
kClientCallTransparentRetriesInstrumentName)) {
client_.call.transparent_retries = meter->CreateUInt64Histogram(
std::string(grpc::OpenTelemetryPluginBuilder::
kClientCallTransparentRetriesInstrumentName),
"EXPERIMENTAL: Number of transparent retries during the client call. "
"If there were no transparent retries, 0 is not reported.",
"{transparent_retry}");
}
if (metrics.contains(grpc::OpenTelemetryPluginBuilder::
kClientCallRetryDelayInstrumentName)) {
client_.call.retry_delay = meter->CreateDoubleHistogram(
std::string(grpc::OpenTelemetryPluginBuilder::
kClientCallRetryDelayInstrumentName),
"EXPERIMENTAL: Total time of delay while there is no active attempt "
"during the client call",
"s");
}
// Store optional label keys for per call metrics
CHECK(static_cast<size_t>(grpc_core::ClientCallTracer::CallAttemptTracer::
OptionalLabelKey::kSize) <=
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/ext/otel/otel_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,12 @@ class OpenTelemetryPluginImpl
};

struct ClientMetrics {
// Per-call client instruments for the experimental retry metrics. Each
// pointer is compared against nullptr before anything is recorded through
// it.
struct Call {
// Histogram of the number of retries during a client call.
std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>> retries;
// Histogram of the number of transparent retries during a client call.
std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>
transparent_retries;
// Histogram of the total delay, in seconds, while the call has no active
// attempt.
std::unique_ptr<opentelemetry::metrics::Histogram<double>> retry_delay;
} call;
struct Attempt {
std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>> started;
std::unique_ptr<opentelemetry::metrics::Histogram<double>> duration;
Expand Down
1 change: 1 addition & 0 deletions test/cpp/ext/otel/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ grpc_cc_test(
"//:grpc++",
"//src/cpp/ext/otel:otel_plugin",
"//test/core/promise:test_context",
"//test/core/test_util:fail_first_call_filter",
"//test/core/test_util:grpc_test_util",
],
)
Expand Down
179 changes: 179 additions & 0 deletions test/cpp/ext/otel/otel_plugin_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "src/core/config/core_configuration.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/telemetry/call_tracer.h"
#include "test/core/test_util/fail_first_call_filter.h"
#include "test/core/test_util/fake_stats_plugin.h"
#include "test/core/test_util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
Expand Down Expand Up @@ -313,6 +314,184 @@ TEST_F(OpenTelemetryPluginEnd2EndTest,
EXPECT_EQ(*status_value, "OK");
}

// With all three retry instruments enabled, a call that involves no retries
// must not export any retry stats.
TEST_F(OpenTelemetryPluginEnd2EndTest, RetryStatsWithoutRetries) {
  using CollectedMetrics = absl::flat_hash_map<
      std::string,
      std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>;
  const char* kAttemptDuration = "grpc.client.attempt.duration";
  const char* kRetries = "grpc.client.call.retries";
  const char* kTransparentRetries = "grpc.client.call.transparent_retries";
  const char* kRetryDelay = "grpc.client.call.retry_delay";
  // The attempt-duration instrument is enabled next to the retry instruments
  // so that there is at least one metric this call is expected to produce.
  Options options;
  options.set_metric_names(
      {grpc::OpenTelemetryPluginBuilder::kClientAttemptDurationInstrumentName,
       grpc::OpenTelemetryPluginBuilder::kClientCallRetriesInstrumentName,
       grpc::OpenTelemetryPluginBuilder::
           kClientCallTransparentRetriesInstrumentName,
       grpc::OpenTelemetryPluginBuilder::kClientCallRetryDelayInstrumentName});
  Init(std::move(options));
  SendRPC();
  // grpc.client.attempt.duration is the signal that client metrics have been
  // collected for the call; the predicate stays true while it is missing.
  auto data = ReadCurrentMetricsData([&](const CollectedMetrics& collected) {
    return !collected.contains(kAttemptDuration);
  });
  EXPECT_TRUE(data.contains(kAttemptDuration));
  // None of the retry instruments may show up.
  EXPECT_FALSE(data.contains(kRetries));
  EXPECT_FALSE(data.contains(kTransparentRetries));
  EXPECT_FALSE(data.contains(kRetryDelay));
}

// Sends one RPC that is configured to fail with ABORTED under a retry policy
// of at most 3 attempts, then checks the exported retry count and retry
// delay, and that no transparent-retry stats are reported.
TEST_F(OpenTelemetryPluginEnd2EndTest, RetryStatsWithRetries) {
  using CollectedMetrics = absl::flat_hash_map<
      std::string,
      std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>;
  const char* kRetries = "grpc.client.call.retries";
  const char* kTransparentRetries = "grpc.client.call.transparent_retries";
  const char* kRetryDelay = "grpc.client.call.retry_delay";
  // Retry policy: up to 3 attempts, constant 0.1s backoff, retry on ABORTED.
  const char* kRetryServiceConfig =
      "{\n"
      " \"methodConfig\": [ {\n"
      " \"name\": [\n"
      " { \"service\": \"grpc.testing.EchoTestService\" }\n"
      " ],\n"
      " \"retryPolicy\": {\n"
      " \"maxAttempts\": 3,\n"
      " \"initialBackoff\": \"0.1s\",\n"
      " \"maxBackoff\": \"120s\",\n"
      " \"backoffMultiplier\": 1,\n"
      " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
      " }\n"
      " } ]\n"
      "}";
  Options options;
  options
      .set_metric_names(
          {grpc::OpenTelemetryPluginBuilder::kClientCallRetriesInstrumentName,
           grpc::OpenTelemetryPluginBuilder::
               kClientCallTransparentRetriesInstrumentName,
           grpc::OpenTelemetryPluginBuilder::
               kClientCallRetryDelayInstrumentName})
      .set_service_config(kRetryServiceConfig);
  Init(std::move(options));
  {
    // The request asks for an ABORTED error, which is the retryable status
    // in the service config above. The RPC objects are scoped so that they
    // are destroyed before the metrics are read.
    EchoRequest request;
    request.set_message("foo");
    request.mutable_param()->mutable_expected_error()->set_code(
        StatusCode::ABORTED);
    EchoResponse response;
    grpc::ClientContext context;
    const grpc::Status status = stub_->Echo(&context, request, &response);
    EXPECT_EQ(status.error_code(), StatusCode::ABORTED);
  }
  // The predicate stays true until both the retry count and the retry delay
  // metrics are present.
  auto data = ReadCurrentMetricsData([&](const CollectedMetrics& collected) {
    return !collected.contains(kRetries) || !collected.contains(kRetryDelay);
  });

  ASSERT_EQ(data.size(), 2);
  // Check for retry stats: a single data point with value 2.
  // NOTE(review): the label arrays below are declared with 3 elements but
  // only two initializers are given, so the third entry is an empty
  // string_view; the retry-delay check further down uses arrays of size 2.
  // Confirm whether the extra empty label is intended.
  const std::array<absl::string_view, 3> retry_label_keys{"grpc.method",
                                                          "grpc.target"};
  const std::array<absl::string_view, 3> retry_label_values{
      kMethodName, canonical_server_address_};
  EXPECT_THAT(
      data[kRetries],
      ::testing::UnorderedElementsAre(::testing::AllOf(
          AttributesEq(
              /*label_keys=*/retry_label_keys,
              /*label_values=*/retry_label_values,
              /*optional_label_keys=*/std::array<absl::string_view, 0>{},
              /*optional_label_values=*/std::array<absl::string_view, 0>{}),
          HistogramResultEq(/*sum_matcher=*/::testing::Eq(int64_t{2}),
                            /*min_matcher=*/::testing::Eq(int64_t{2}),
                            /*max_matcher=*/::testing::Eq(int64_t{2}),
                            /*count=*/1))));
  // Check for retry delay stats: a single data point whose sum, min and max
  // all lie between 0.1 and 0.3 scaled by the test slowdown factor.
  const std::array<absl::string_view, 2> delay_label_keys{"grpc.method",
                                                          "grpc.target"};
  const std::array<absl::string_view, 2> delay_label_values{
      kMethodName, canonical_server_address_};
  const auto max_expected_delay = 0.3 * grpc_test_slowdown_factor();
  EXPECT_THAT(
      data[kRetryDelay],
      ::testing::ElementsAre(::testing::AllOf(
          AttributesEq(
              /*label_keys=*/delay_label_keys,
              /*label_values=*/delay_label_values,
              /*optional_label_keys=*/std::array<absl::string_view, 0>{},
              /*optional_label_values=*/std::array<absl::string_view, 0>{}),
          HistogramResultEq(
              /*sum_matcher=*/IsWithinRange(0.1, max_expected_delay),
              /*min_matcher=*/IsWithinRange(0.1, max_expected_delay),
              /*max_matcher=*/IsWithinRange(0.1, max_expected_delay),
              /*count=*/1))));
  // No transparent retry stats reported
  EXPECT_FALSE(data.contains(kTransparentRetries));
}

class OTelMetricsTestForTransparentRetries
: public OpenTelemetryPluginEnd2EndTest {
protected:
void SetUp() override {
grpc_core::CoreConfiguration::RegisterEphemeralBuilder(
[](grpc_core::CoreConfiguration::Builder* builder) {
// Register FailFirstCallFilter to simulate transparent retries.
builder->channel_init()->RegisterFilter(
GRPC_CLIENT_SUBCHANNEL,
&grpc_core::testing::FailFirstCallFilter::kFilterVtable);
});
OpenTelemetryPluginEnd2EndTest::SetUp();
}
};

// Sends one RPC over a channel that uses a local subchannel pool, with the
// fixture's FailFirstCallFilter registered, and checks that only the
// transparent-retry metric is exported (one data point with value 1).
TEST_F(OTelMetricsTestForTransparentRetries, RetryStatsWithTransparentRetries) {
  using CollectedMetrics = absl::flat_hash_map<
      std::string,
      std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>;
  const char* kRetries = "grpc.client.call.retries";
  const char* kTransparentRetries = "grpc.client.call.transparent_retries";
  const char* kRetryDelay = "grpc.client.call.retry_delay";
  Options options;
  options.set_metric_names(
      {grpc::OpenTelemetryPluginBuilder::kClientCallRetriesInstrumentName,
       grpc::OpenTelemetryPluginBuilder::
           kClientCallTransparentRetriesInstrumentName,
       grpc::OpenTelemetryPluginBuilder::kClientCallRetryDelayInstrumentName});
  Init(std::move(options));
  // Replace the stub with one backed by a custom channel that has
  // GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL set.
  ChannelArguments args;
  args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
  ResetStub(grpc::CreateCustomChannel(
      server_address_, grpc::InsecureChannelCredentials(), args));
  SendRPC();
  // The predicate stays true until the transparent-retry metric is present.
  auto data = ReadCurrentMetricsData([&](const CollectedMetrics& collected) {
    return !collected.contains(kTransparentRetries);
  });
  ASSERT_EQ(data.size(), 1);
  // No retry stats reported
  EXPECT_FALSE(data.contains(kRetries));
  // Check for transparent retry stats
  // NOTE(review): the label arrays below are declared with 3 elements but
  // only two initializers are given, so the third entry is an empty
  // string_view. Confirm whether the extra empty label is intended.
  const std::array<absl::string_view, 3> label_keys{"grpc.method",
                                                    "grpc.target"};
  const std::array<absl::string_view, 3> label_values{
      kMethodName, canonical_server_address_};
  EXPECT_THAT(
      data[kTransparentRetries],
      ::testing::UnorderedElementsAre(::testing::AllOf(
          AttributesEq(
              /*label_keys=*/label_keys,
              /*label_values=*/label_values,
              /*optional_label_keys=*/std::array<absl::string_view, 0>{},
              /*optional_label_values=*/std::array<absl::string_view, 0>{}),
          HistogramResultEq(/*sum_matcher=*/::testing::Eq(int64_t{1}),
                            /*min_matcher=*/::testing::Eq(int64_t{1}),
                            /*max_matcher=*/::testing::Eq(int64_t{1}),
                            /*count=*/1))));
  // No retry delay reported
  EXPECT_FALSE(data.contains(kRetryDelay));
}

// Make sure that no meter provider results in normal operations.
TEST_F(OpenTelemetryPluginEnd2EndTest, NoMeterProviderRegistered) {
Init(
Expand Down
Loading