Skip to content

Commit 5a3ceb3

Browse files
authored
chore: add aibridge data to telemetry (#20449)
- Adds a new table to keep track of which payloads have already been reported, since we only report for the last clock hour.
- Adds a query to gather and aggregate all the data by provider/model/client.

Relates to coder/coder-telemetry-server#27
1 parent cadf135 commit 5a3ceb3

20 files changed

Lines changed: 940 additions & 6 deletions

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,5 @@ result
8989
__debug_bin*
9090

9191
**/.claude/settings.local.json
92+
93+
/.env

coderd/database/check_constraint.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbauthz/dbauthz.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1424,6 +1424,13 @@ func (q *querier) BulkMarkNotificationMessagesSent(ctx context.Context, arg data
14241424
return q.db.BulkMarkNotificationMessagesSent(ctx, arg)
14251425
}
14261426

1427+
// CalculateAIBridgeInterceptionsTelemetrySummary requires policy.ActionRead on
// rbac.ResourceAibridgeInterception. If the authorization check fails it
// returns a zero-value row together with that error; otherwise it forwards
// ctx and arg to q.db and returns its result unchanged.
func (q *querier) CalculateAIBridgeInterceptionsTelemetrySummary(ctx context.Context, arg database.CalculateAIBridgeInterceptionsTelemetrySummaryParams) (database.CalculateAIBridgeInterceptionsTelemetrySummaryRow, error) {
1428+
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceAibridgeInterception); err != nil {
1429+
return database.CalculateAIBridgeInterceptionsTelemetrySummaryRow{}, err
1430+
}
1431+
return q.db.CalculateAIBridgeInterceptionsTelemetrySummary(ctx, arg)
1432+
}
1433+
14271434
func (q *querier) ClaimPrebuiltWorkspace(ctx context.Context, arg database.ClaimPrebuiltWorkspaceParams) (database.ClaimPrebuiltWorkspaceRow, error) {
14281435
empty := database.ClaimPrebuiltWorkspaceRow{}
14291436

@@ -1723,6 +1730,13 @@ func (q *querier) DeleteOldProvisionerDaemons(ctx context.Context) error {
17231730
return q.db.DeleteOldProvisionerDaemons(ctx)
17241731
}
17251732

1733+
// DeleteOldTelemetryLocks requires policy.ActionDelete on rbac.ResourceSystem.
// If the authorization check fails the error is returned as-is; otherwise the
// call is forwarded to q.db with the same beforeTime.
// NOTE(review): presumably beforeTime is the cutoff below which lock rows are
// removed — confirm against the SQL query definition.
func (q *querier) DeleteOldTelemetryLocks(ctx context.Context, beforeTime time.Time) error {
1734+
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil {
1735+
return err
1736+
}
1737+
return q.db.DeleteOldTelemetryLocks(ctx, beforeTime)
1738+
}
1739+
17261740
func (q *querier) DeleteOldWorkspaceAgentLogs(ctx context.Context, threshold time.Time) error {
17271741
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil {
17281742
return err
@@ -4212,6 +4226,13 @@ func (q *querier) InsertTelemetryItemIfNotExists(ctx context.Context, arg databa
42124226
return q.db.InsertTelemetryItemIfNotExists(ctx, arg)
42134227
}
42144228

4229+
// InsertTelemetryLock requires policy.ActionCreate on rbac.ResourceSystem.
// If the authorization check fails the error is returned as-is; otherwise the
// call is forwarded to q.db with the same arg.
func (q *querier) InsertTelemetryLock(ctx context.Context, arg database.InsertTelemetryLockParams) error {
4230+
if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
4231+
return err
4232+
}
4233+
return q.db.InsertTelemetryLock(ctx, arg)
4234+
}
4235+
42154236
func (q *querier) InsertTemplate(ctx context.Context, arg database.InsertTemplateParams) error {
42164237
obj := rbac.ResourceTemplate.InOrg(arg.OrganizationID)
42174238
if err := q.authorizeContext(ctx, policy.ActionCreate, obj); err != nil {
@@ -4523,6 +4544,13 @@ func (q *querier) ListAIBridgeInterceptions(ctx context.Context, arg database.Li
45234544
return q.db.ListAuthorizedAIBridgeInterceptions(ctx, arg, prep)
45244545
}
45254546

4547+
// ListAIBridgeInterceptionsTelemetrySummaries requires policy.ActionRead on
// rbac.ResourceAibridgeInterception. If the authorization check fails it
// returns a nil slice together with that error; otherwise it forwards ctx and
// arg to q.db and returns its result unchanged.
func (q *querier) ListAIBridgeInterceptionsTelemetrySummaries(ctx context.Context, arg database.ListAIBridgeInterceptionsTelemetrySummariesParams) ([]database.ListAIBridgeInterceptionsTelemetrySummariesRow, error) {
4548+
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceAibridgeInterception); err != nil {
4549+
return nil, err
4550+
}
4551+
return q.db.ListAIBridgeInterceptionsTelemetrySummaries(ctx, arg)
4552+
}
4553+
45264554
func (q *querier) ListAIBridgeTokenUsagesByInterceptionIDs(ctx context.Context, interceptionIDs []uuid.UUID) ([]database.AIBridgeTokenUsage, error) {
45274555
// This function is a system function until we implement a join for aibridge interceptions.
45284556
// Matches the behavior of the workspaces listing endpoint.

coderd/database/dbauthz/dbauthz_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4627,3 +4627,25 @@ func (s *MethodTestSuite) TestAIBridge() {
46274627
check.Args(params).Asserts(intc, policy.ActionUpdate).Returns(intc)
46284628
}))
46294629
}
4630+
4631+
// TestTelemetry registers one sub-test per telemetry-related querier method.
// Each sub-test stubs the corresponding dbmock.MockStore call to succeed and
// declares, via check.Args(...).Asserts(...), the RBAC object and action that
// the method is expected to be checked against:
//   - InsertTelemetryLock: rbac.ResourceSystem, policy.ActionCreate
//   - DeleteOldTelemetryLocks: rbac.ResourceSystem, policy.ActionDelete
//   - ListAIBridgeInterceptionsTelemetrySummaries: rbac.ResourceAibridgeInterception, policy.ActionRead
//   - CalculateAIBridgeInterceptionsTelemetrySummary: rbac.ResourceAibridgeInterception, policy.ActionRead
func (s *MethodTestSuite) TestTelemetry() {
4632+
s.Run("InsertTelemetryLock", s.Mocked(func(db *dbmock.MockStore, faker *gofakeit.Faker, check *expects) {
4633+
db.EXPECT().InsertTelemetryLock(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
4634+
check.Args(database.InsertTelemetryLockParams{}).Asserts(rbac.ResourceSystem, policy.ActionCreate)
4635+
}))
4636+
4637+
s.Run("DeleteOldTelemetryLocks", s.Mocked(func(db *dbmock.MockStore, faker *gofakeit.Faker, check *expects) {
4638+
db.EXPECT().DeleteOldTelemetryLocks(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
4639+
check.Args(time.Time{}).Asserts(rbac.ResourceSystem, policy.ActionDelete)
4640+
}))
4641+
4642+
s.Run("ListAIBridgeInterceptionsTelemetrySummaries", s.Mocked(func(db *dbmock.MockStore, faker *gofakeit.Faker, check *expects) {
4643+
db.EXPECT().ListAIBridgeInterceptionsTelemetrySummaries(gomock.Any(), gomock.Any()).Return([]database.ListAIBridgeInterceptionsTelemetrySummariesRow{}, nil).AnyTimes()
4644+
check.Args(database.ListAIBridgeInterceptionsTelemetrySummariesParams{}).Asserts(rbac.ResourceAibridgeInterception, policy.ActionRead)
4645+
}))
4646+
4647+
s.Run("CalculateAIBridgeInterceptionsTelemetrySummary", s.Mocked(func(db *dbmock.MockStore, faker *gofakeit.Faker, check *expects) {
4648+
db.EXPECT().CalculateAIBridgeInterceptionsTelemetrySummary(gomock.Any(), gomock.Any()).Return(database.CalculateAIBridgeInterceptionsTelemetrySummaryRow{}, nil).AnyTimes()
4649+
check.Args(database.CalculateAIBridgeInterceptionsTelemetrySummaryParams{}).Asserts(rbac.ResourceAibridgeInterception, policy.ActionRead)
4650+
}))
4651+
}

coderd/database/dbmetrics/querymetrics.go

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbmock/dbmock.go

Lines changed: 58 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbpurge/dbpurge.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ const (
2424
// but we won't touch the `connection_logs` table.
2525
maxAuditLogConnectionEventAge = 90 * 24 * time.Hour // 90 days
2626
auditLogConnectionEventBatchSize = 1000
27+
// Telemetry heartbeats are used to deduplicate events across replicas. We
28+
// don't need to persist heartbeat rows for longer than 24 hours, as they
29+
// are only used for deduplication across replicas. The time needs to be
30+
// long enough to cover the maximum interval of a heartbeat event (currently
31+
// 1 hour) plus some buffer.
32+
maxTelemetryHeartbeatAge = 24 * time.Hour
2733
)
2834

2935
// New creates a new periodically purging database instance.
@@ -71,6 +77,10 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, clk quartz.
7177
if err := tx.ExpirePrebuildsAPIKeys(ctx, dbtime.Time(start)); err != nil {
7278
return xerrors.Errorf("failed to expire prebuilds user api keys: %w", err)
7379
}
80+
deleteOldTelemetryLocksBefore := start.Add(-maxTelemetryHeartbeatAge)
81+
if err := tx.DeleteOldTelemetryLocks(ctx, deleteOldTelemetryLocksBefore); err != nil {
82+
return xerrors.Errorf("failed to delete old telemetry locks: %w", err)
83+
}
7484

7585
deleteOldAuditLogConnectionEventsBefore := start.Add(-maxAuditLogConnectionEventAge)
7686
if err := tx.DeleteOldAuditLogConnectionEvents(ctx, database.DeleteOldAuditLogConnectionEventsParams{

coderd/database/dbpurge/dbpurge_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,3 +704,56 @@ func TestExpireOldAPIKeys(t *testing.T) {
704704
// Out of an abundance of caution, we do not expire explicitly named prebuilds API keys.
705705
assertKeyActive(namedPrebuildsAPIKey.ID)
706706
}
707+
708+
// TestDeleteOldTelemetryHeartbeats inserts three telemetry lock rows whose
// PeriodEndingAt is 25h, 23h and 0h before the mock clock's current time,
// starts dbpurge, waits for its first tick, and then polls the
// telemetry_locks table until exactly two rows remain and none has a
// period_ending_at earlier than 24 hours before that time.
func TestDeleteOldTelemetryHeartbeats(t *testing.T) {
709+
t.Parallel()
710+
711+
ctx := testutil.Context(t, testutil.WaitLong)
712+
713+
db, _, sqlDB := dbtestutil.NewDBWithSQLDB(t, dbtestutil.WithDumpOnFailure())
714+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
715+
clk := quartz.NewMock(t)
716+
now := clk.Now().UTC()
717+
718+
// Insert three telemetry locks ("heartbeats") of different ages.
719+
err := db.InsertTelemetryLock(ctx, database.InsertTelemetryLockParams{
720+
EventType: "aibridge_interceptions_summary",
721+
PeriodEndingAt: now.Add(-25 * time.Hour), // should be purged
722+
})
723+
require.NoError(t, err)
724+
err = db.InsertTelemetryLock(ctx, database.InsertTelemetryLockParams{
725+
EventType: "aibridge_interceptions_summary",
726+
PeriodEndingAt: now.Add(-23 * time.Hour), // should be kept
727+
})
728+
require.NoError(t, err)
729+
err = db.InsertTelemetryLock(ctx, database.InsertTelemetryLockParams{
730+
EventType: "aibridge_interceptions_summary",
731+
PeriodEndingAt: now, // should be kept
732+
})
733+
require.NoError(t, err)
734+
735+
done := awaitDoTick(ctx, t, clk)
736+
closer := dbpurge.New(ctx, logger, db, clk)
737+
defer closer.Close()
738+
<-done // doTick() has now run.
739+
740+
require.Eventuallyf(t, func() bool {
741+
// We use SQL queries directly here because the application code does
742+
// not expose queries for counting rows in telemetry_locks.
743+
var totalCount int
744+
err := sqlDB.QueryRowContext(ctx, `
745+
SELECT COUNT(*) FROM telemetry_locks;
746+
`).Scan(&totalCount)
747+
assert.NoError(t, err)
748+
749+
var oldCount int
750+
err = sqlDB.QueryRowContext(ctx, `
751+
SELECT COUNT(*) FROM telemetry_locks WHERE period_ending_at < $1;
752+
`, now.Add(-24*time.Hour)).Scan(&oldCount)
753+
assert.NoError(t, err)
754+
755+
// Expect 2 heartbeats remaining and none older than 24 hours.
756+
t.Logf("eventually: total count: %d, old count: %d", totalCount, oldCount)
757+
return totalCount == 2 && oldCount == 0
758+
}, testutil.WaitShort, testutil.IntervalFast, "it should delete old telemetry heartbeats")
759+
}

coderd/database/dump.sql

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROP TABLE telemetry_locks;

0 commit comments

Comments
 (0)