Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/Npgsql/Internal/NpgsqlConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,6 @@ async Task MultiplexingReadLoop()

// We have a resultset for the command - hand back control to the command (which will
// return it to the user)
command.TraceReceivedFirstResponse(DataSource.Configuration.TracingOptions);
ReaderCompleted.Reset();
command.ExecutionCompletion.SetResult(this);

Expand Down
7 changes: 4 additions & 3 deletions src/Npgsql/NpgsqlActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,13 @@ internal static void ReceivedFirstResponse(Activity activity, NpgsqlTracingOptio

internal static void CommandStop(Activity activity)
{
activity.SetTag("otel.status_code", "OK");
activity.SetStatus(ActivityStatusCode.Ok);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a dup of the line just above? Or are there two different things? Same below for exception.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly. Activity.Status (and Activity.Description) do not add any tags, they're essentially just fields in Activity and it's up to exporters to populate tags depending on Activity.Status. So in case there's an exporter which doesn't react on Activity.Status, then these tags will never be added if we remove them from there.
I'm more-or-less OK with removing explicit tags.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know enough about this... What do ASP.NET do, for instance?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking - I guess we can do the same then?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, sure

activity.Dispose();
}

internal static void SetException(Activity activity, Exception ex, bool escaped = true)
{
// TODO: We can instead use Activity.AddException whenever we start using .NET 9
var tags = new ActivityTagsCollection
{
{ "exception.type", ex.GetType().FullName },
Expand All @@ -122,8 +123,8 @@ internal static void SetException(Activity activity, Exception ex, bool escaped
};
var activityEvent = new ActivityEvent("exception", tags: tags);
activity.AddEvent(activityEvent);
activity.SetTag("otel.status_code", "ERROR");
activity.SetTag("otel.status_description", ex is PostgresException pgEx ? pgEx.SqlState : ex.Message);
var statusDescription = ex is PostgresException pgEx ? pgEx.SqlState : ex.Message;
activity.SetStatus(ActivityStatusCode.Error, statusDescription);
activity.Dispose();
}
}
10 changes: 6 additions & 4 deletions src/Npgsql/NpgsqlCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,8 @@ internal virtual async ValueTask<NpgsqlDataReader> ExecuteReader(bool async, Com
connector.CurrentReader = reader;
await reader.NextResultAsync(cancellationToken).ConfigureAwait(false);

TraceReceivedFirstResponse(connector.DataSource.Configuration.TracingOptions);

return reader;
}
}
Expand Down Expand Up @@ -1718,12 +1720,12 @@ internal void TraceCommandStart(NpgsqlConnectionStringBuilder settings, NpgsqlTr
? tracingOptions.BatchFilter?.Invoke(WrappingBatch) ?? true
: tracingOptions.CommandFilter?.Invoke(this) ?? true;

var spanName = WrappingBatch is not null
? tracingOptions.BatchSpanNameProvider?.Invoke(WrappingBatch)
: tracingOptions.CommandSpanNameProvider?.Invoke(this);

if (enableTracing)
{
var spanName = WrappingBatch is not null
? tracingOptions.BatchSpanNameProvider?.Invoke(WrappingBatch)
: tracingOptions.CommandSpanNameProvider?.Invoke(this);

CurrentActivity = NpgsqlActivitySource.CommandStart(
settings,
WrappingBatch is not null ? GetBatchFullCommandText() : CommandText,
Expand Down
201 changes: 201 additions & 0 deletions test/Npgsql.Tests/TracingTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using NUnit.Framework;

namespace Npgsql.Tests;

[NonParallelizable]
public class TracingTests(MultiplexingMode multiplexingMode) : MultiplexingTestBase(multiplexingMode)
{
[Test]
public async Task Basic([Values] bool async, [Values] bool batch)
{
if (IsMultiplexing && !async)
return;

var activities = new List<Activity>();

using var activityListener = new ActivityListener();
activityListener.ShouldListenTo = source => source.Name == "Npgsql";
activityListener.Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded;
activityListener.ActivityStopped = activity => activities.Add(activity);
ActivitySource.AddActivityListener(activityListener);

await using var dataSource = CreateDataSource();
await using var conn = await dataSource.OpenConnectionAsync();
await ExecuteScalar(conn, async, batch, "SELECT 42");

Assert.That(activities.Count, Is.EqualTo(1));
var activity = activities[0];
Assert.That(activity.DisplayName, Is.EqualTo(conn.Settings.Database));
Assert.That(activity.OperationName, Is.EqualTo(conn.Settings.Database));
Assert.That(activity.Status, Is.EqualTo(ActivityStatusCode.Ok));

Assert.That(activity.Events.Count(), Is.EqualTo(1));
var firstResponseEvent = activity.Events.First();
Assert.That(firstResponseEvent.Name, Is.EqualTo("received-first-response"));

var expectedTagCount = conn.Settings.Port == 5432 ? 9 : 10;
Assert.That(activity.TagObjects.Count(), Is.EqualTo(expectedTagCount));

var queryTag = activity.TagObjects.First(x => x.Key == "db.statement");
Assert.That(queryTag.Value, Is.EqualTo("SELECT 42"));

var systemTag = activity.TagObjects.First(x => x.Key == "db.system");
Assert.That(systemTag.Value, Is.EqualTo("postgresql"));

var userTag = activity.TagObjects.First(x => x.Key == "db.user");
Assert.That(userTag.Value, Is.EqualTo(conn.Settings.Username));

var dbNameTag = activity.TagObjects.First(x => x.Key == "db.name");
Assert.That(dbNameTag.Value, Is.EqualTo(conn.Settings.Database));

var connStringTag = activity.TagObjects.First(x => x.Key == "db.connection_string");
Assert.That(connStringTag.Value, Is.EqualTo(conn.ConnectionString));

if (!IsMultiplexing)
{
var connIDTag = activity.TagObjects.First(x => x.Key == "db.connection_id");
Assert.That(connIDTag.Value, Is.EqualTo(conn.ProcessID));
}
}

[Test]
public async Task Error([Values] bool async, [Values] bool batch)
{
if (IsMultiplexing && !async)
return;

var activities = new List<Activity>();

using var activityListener = new ActivityListener();
activityListener.ShouldListenTo = source => source.Name == "Npgsql";
activityListener.Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded;
activityListener.ActivityStopped = activity => activities.Add(activity);
ActivitySource.AddActivityListener(activityListener);

await using var dataSource = CreateDataSource();
await using var conn = await dataSource.OpenConnectionAsync();
Assert.ThrowsAsync<PostgresException>(async () => await ExecuteScalar(conn, async, batch, "SELECT * FROM non_existing_table"));

Assert.That(activities.Count, Is.EqualTo(1));
var activity = activities[0];
Assert.That(activity.DisplayName, Is.EqualTo(conn.Settings.Database));
Assert.That(activity.OperationName, Is.EqualTo(conn.Settings.Database));
Assert.That(activity.Status, Is.EqualTo(ActivityStatusCode.Error));
Assert.That(activity.StatusDescription, Is.EqualTo(PostgresErrorCodes.UndefinedTable));

Assert.That(activity.Events.Count(), Is.EqualTo(1));
var exceptionEvent = activity.Events.First();
Assert.That(exceptionEvent.Name, Is.EqualTo("exception"));

Assert.That(exceptionEvent.Tags.Count(), Is.EqualTo(4));

var exceptionTypeTag = exceptionEvent.Tags.First(x => x.Key == "exception.type");
Assert.That(exceptionTypeTag.Value, Is.EqualTo("Npgsql.PostgresException"));

var exceptionMessageTag = exceptionEvent.Tags.First(x => x.Key == "exception.message");
StringAssert.Contains("relation \"non_existing_table\" does not exist", (string)exceptionMessageTag.Value!);

var exceptionStacktraceTag = exceptionEvent.Tags.First(x => x.Key == "exception.stacktrace");
StringAssert.Contains("relation \"non_existing_table\" does not exist", (string)exceptionStacktraceTag.Value!);

var exceptionEscapedTag = exceptionEvent.Tags.First(x => x.Key == "exception.escaped");
Assert.That(exceptionEscapedTag.Value, Is.True);

var expectedTagCount = conn.Settings.Port == 5432 ? 9 : 10;
Assert.That(activity.TagObjects.Count(), Is.EqualTo(expectedTagCount));

var queryTag = activity.TagObjects.First(x => x.Key == "db.statement");
Assert.That(queryTag.Value, Is.EqualTo("SELECT * FROM non_existing_table"));

var systemTag = activity.TagObjects.First(x => x.Key == "db.system");
Assert.That(systemTag.Value, Is.EqualTo("postgresql"));

var userTag = activity.TagObjects.First(x => x.Key == "db.user");
Assert.That(userTag.Value, Is.EqualTo(conn.Settings.Username));

var dbNameTag = activity.TagObjects.First(x => x.Key == "db.name");
Assert.That(dbNameTag.Value, Is.EqualTo(conn.Settings.Database));

var connStringTag = activity.TagObjects.First(x => x.Key == "db.connection_string");
Assert.That(connStringTag.Value, Is.EqualTo(conn.ConnectionString));

if (!IsMultiplexing)
{
var connIDTag = activity.TagObjects.First(x => x.Key == "db.connection_id");
Assert.That(connIDTag.Value, Is.EqualTo(conn.ProcessID));
}
}

[Test]
public async Task Configure_tracing([Values] bool async, [Values] bool batch)
{
if (IsMultiplexing && !async)
return;

var activities = new List<Activity>();

using var activityListener = new ActivityListener();
activityListener.ShouldListenTo = source => source.Name == "Npgsql";
activityListener.Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded;
activityListener.ActivityStopped = activity => activities.Add(activity);
ActivitySource.AddActivityListener(activityListener);

var dataSourceBuilder = CreateDataSourceBuilder();
dataSourceBuilder.ConfigureTracing(options =>
{
options
.EnableFirstResponseEvent(enable: false)
.ConfigureCommandFilter(cmd => cmd.CommandText.Contains('2'))
.ConfigureBatchFilter(batch => batch.BatchCommands[0].CommandText.Contains('2'))
.ConfigureCommandSpanNameProvider(_ => "unknown_query")
.ConfigureBatchSpanNameProvider(_ => "unknown_query")
.ConfigureCommandEnrichmentCallback((activity, _) => activity.AddTag("custom_tag", "custom_value"))
.ConfigureBatchEnrichmentCallback((activity, _) => activity.AddTag("custom_tag", "custom_value"));
});
await using var dataSource = dataSourceBuilder.Build();
await using var conn = await dataSource.OpenConnectionAsync();

await ExecuteScalar(conn, async, batch, "SELECT 1");

Assert.That(activities.Count, Is.EqualTo(0));

await ExecuteScalar(conn, async, batch, "SELECT 2");

Assert.That(activities.Count, Is.EqualTo(1));
var activity = activities[0];
Assert.That(activity.DisplayName, Is.EqualTo("unknown_query"));
Assert.That(activity.OperationName, Is.EqualTo("unknown_query"));

Assert.That(activity.Events.Count(), Is.EqualTo(0));

var customTag = activity.TagObjects.First(x => x.Key == "custom_tag");
Assert.That(customTag.Value, Is.EqualTo("custom_value"));
}

async Task<object?> ExecuteScalar(NpgsqlConnection connection, bool async, bool isBatch, string query)
{
if (!isBatch)
{
if (async)
return await connection.ExecuteScalarAsync(query);
else
return connection.ExecuteScalar(query);
}
else
{
await using var batch = connection.CreateBatch();
var batchCommand = batch.CreateBatchCommand();
batchCommand.CommandText = query;
batch.BatchCommands.Add(batchCommand);

if (async)
return await batch.ExecuteScalarAsync();
else
return batch.ExecuteScalar();
}
}
}