Skip to content

Commit 4e31088

Browse files
prathyushpvyycptt
andauthored
Emitting metrics for DLQ message count (temporalio#6125)
## What changed? Added a DLQMEtricsEmitter which will emit DLQ message count every 3 hours. ## Why? We alredy have a counter metric for DLQ writes. This will increment each time a new message is written to DLQ. This can be used to create an alarm that fires when a message is added to DLQ. But we also need to know if DLQ messages are inspected and cleared on time. This is not possible with the existing metric. This new metric DLQMessageCount will help in monitoring the current number of messages in DLQ. This metric will only be emitted from the history service instance which hosts shard numbered 1. I also considered adding a gauge metric for DLQ message count. If the history service restarts, we will lose this count. ## How did you test it? Unit test ## Potential risks None ## Documentation ## Is hotfix candidate? --------- Co-authored-by: Yichao Yang <yichao@temporal.io>
1 parent 0ad4287 commit 4e31088

8 files changed

Lines changed: 331 additions & 0 deletions

File tree

common/log/tag/tags.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,10 @@ func TaskType(taskType enumsspb.TaskType) ZapTag {
645645
return NewStringTag("queue-task-type", taskType.String())
646646
}
647647

648+
func TaskCategoryID(taskCategoryID int) ZapTag {
649+
return NewInt("queue-task-category-id", taskCategoryID)
650+
}
651+
648652
// TaskVisibilityTimestamp returns tag for task visibilityTimestamp
649653
func TaskVisibilityTimestamp(timestamp time.Time) ZapTag {
650654
return NewTimeTag("queue-task-visibility-timestamp", timestamp)

common/metrics/metric_defs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -958,6 +958,10 @@ var (
958958
"dlq_writes",
959959
WithDescription("The number of times a message is enqueued to DLQ. DLQ can be inspected using tdbg dlq command."),
960960
)
961+
DLQMessageCount = NewGaugeDef(
962+
"dlq_message_count",
963+
WithDescription("The number of messages currently in DLQ."),
964+
)
961965
ReadNamespaceErrors = NewCounterDef("read_namespace_errors")
962966
RateLimitedTaskRunnableWaitTime = NewTimerDef("rate_limited_task_runnable_wait_time")
963967
CircuitBreakerExecutableBlocked = NewCounterDef("circuit_breaker_executable_blocked")

common/persistence/client/fx.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ var Module = fx.Options(
9191

9292
fx.Provide(ClusterNameProvider),
9393
fx.Provide(HealthSignalAggregatorProvider),
94+
fx.Provide(persistence.NewDLQMetricsEmitter),
9495
fx.Provide(EventBlobCacheProvider),
9596
)
9697

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in
13+
// all copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
// THE SOFTWARE.
22+
23+
package persistence
24+
25+
import (
26+
"context"
27+
"sync/atomic"
28+
"time"
29+
30+
"go.temporal.io/server/common"
31+
"go.temporal.io/server/common/headers"
32+
"go.temporal.io/server/common/log"
33+
"go.temporal.io/server/common/log/tag"
34+
"go.temporal.io/server/common/membership"
35+
"go.temporal.io/server/common/metrics"
36+
"go.temporal.io/server/service/history/tasks"
37+
)
38+
39+
const (
40+
emitDLQMetricsInterval = 3 * time.Hour
41+
)
42+
43+
// DLQMetricsEmitter emits the number of messages in DLQ in each task category.
44+
// This only has to be emitted from one history service instance. For this, DLQMetricsEmitter will only emit metrics
45+
// if the history service it currently run hosts shard 1.
46+
type (
47+
DLQMetricsEmitter struct {
48+
status int32
49+
shutdownCh chan struct{}
50+
metricsHandler metrics.Handler
51+
logger log.Logger
52+
emitMetricsTimer *time.Ticker
53+
historyTaskQueueManager HistoryTaskQueueManager
54+
historyServiceResolver membership.ServiceResolver
55+
hostInfoProvider membership.HostInfoProvider
56+
taskCategoryRegistry tasks.TaskCategoryRegistry
57+
}
58+
)
59+
60+
func NewDLQMetricsEmitter(
61+
metricsHandler metrics.Handler,
62+
logger log.Logger,
63+
manager HistoryTaskQueueManager,
64+
historyServiceResolver membership.ServiceResolver,
65+
hostInfoProvider membership.HostInfoProvider,
66+
taskCategoryRegistry tasks.TaskCategoryRegistry,
67+
) *DLQMetricsEmitter {
68+
return &DLQMetricsEmitter{
69+
status: common.DaemonStatusInitialized,
70+
shutdownCh: make(chan struct{}),
71+
metricsHandler: metricsHandler,
72+
emitMetricsTimer: time.NewTicker(emitDLQMetricsInterval),
73+
logger: logger,
74+
historyTaskQueueManager: manager,
75+
historyServiceResolver: historyServiceResolver,
76+
hostInfoProvider: hostInfoProvider,
77+
taskCategoryRegistry: taskCategoryRegistry,
78+
}
79+
}
80+
81+
func (s *DLQMetricsEmitter) Start() {
82+
if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
83+
return
84+
}
85+
go s.emitMetricsLoop()
86+
}
87+
88+
func (s *DLQMetricsEmitter) Stop() {
89+
if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
90+
return
91+
}
92+
close(s.shutdownCh)
93+
s.emitMetricsTimer.Stop()
94+
}
95+
96+
func (s *DLQMetricsEmitter) emitMetricsLoop() {
97+
for {
98+
select {
99+
case <-s.shutdownCh:
100+
return
101+
case <-s.emitMetricsTimer.C:
102+
if !s.shouldEmitMetrics() {
103+
continue
104+
}
105+
s.emitMetrics()
106+
}
107+
}
108+
}
109+
110+
func (s *DLQMetricsEmitter) emitMetrics() {
111+
categories := s.taskCategoryRegistry.GetCategories()
112+
messageCounts := make(map[int]int64)
113+
for category := range categories {
114+
messageCounts[category] = 0
115+
}
116+
queues, err := s.getDLQList()
117+
if err != nil {
118+
s.logger.Error("Failed to list DLQs to emit metrics", tag.Error(err))
119+
return
120+
}
121+
for _, q := range queues {
122+
category, err := GetHistoryTaskQueueCategoryID(q.QueueName)
123+
if err != nil {
124+
s.logger.Error("Failed to process DLQ queue name", tag.Error(err))
125+
}
126+
messageCounts[category] += q.MessageCount
127+
}
128+
for id, count := range messageCounts {
129+
category, ok := categories[id]
130+
if !ok {
131+
s.logger.Error("Failed to find category from ID", tag.TaskCategoryID(id))
132+
}
133+
metrics.DLQMessageCount.With(s.metricsHandler).Record(float64(count), metrics.TaskCategoryTag(category.Name()))
134+
}
135+
}
136+
137+
func (s *DLQMetricsEmitter) getDLQList() ([]QueueInfo, error) {
138+
var queues []QueueInfo
139+
var nextPageToken []byte
140+
for {
141+
ctx := headers.SetCallerInfo(context.Background(), headers.SystemPreemptableCallerInfo)
142+
resp, err := s.historyTaskQueueManager.ListQueues(ctx, &ListQueuesRequest{
143+
QueueType: QueueTypeHistoryDLQ,
144+
PageSize: 100,
145+
NextPageToken: nextPageToken,
146+
})
147+
if err != nil {
148+
return nil, err
149+
}
150+
queues = append(queues, resp.Queues...)
151+
nextPageToken = resp.NextPageToken
152+
if len(nextPageToken) == 0 {
153+
return queues, nil
154+
}
155+
}
156+
}
157+
158+
// shouldEmitMetrics determines if DLQMetricsEmitter should emit metrics. It returns true only if this instance of
159+
// history service is hosting shard 1.
160+
func (s *DLQMetricsEmitter) shouldEmitMetrics() bool {
161+
ownerInfo, err := s.historyServiceResolver.Lookup("1")
162+
if err != nil {
163+
s.logger.Error("Failed to get the history service hosting shard 1")
164+
return false
165+
}
166+
167+
hostInfo := s.hostInfoProvider.HostInfo()
168+
if ownerInfo.Identity() == hostInfo.Identity() {
169+
return true
170+
}
171+
172+
return false
173+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in
13+
// all copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
// THE SOFTWARE.
22+
23+
package persistence
24+
25+
import (
26+
"testing"
27+
"time"
28+
29+
"github.com/golang/mock/gomock"
30+
"github.com/stretchr/testify/assert"
31+
"go.temporal.io/server/common/log"
32+
"go.temporal.io/server/common/membership"
33+
"go.temporal.io/server/common/metrics"
34+
"go.temporal.io/server/common/metrics/metricstest"
35+
"go.temporal.io/server/service/history/tasks"
36+
)
37+
38+
func TestDLQMetricsEmitter_EmitMetrics_WhenInstanceHostsShardOne(t *testing.T) {
39+
ctrl := gomock.NewController(t)
40+
defer ctrl.Finish()
41+
42+
metricsHandler := metricstest.NewCaptureHandler()
43+
logger := log.NewMockLogger(ctrl)
44+
manager := NewMockHistoryTaskQueueManager(ctrl)
45+
manager.EXPECT().ListQueues(gomock.Any(), &ListQueuesRequest{
46+
QueueType: QueueTypeHistoryDLQ,
47+
PageSize: 100,
48+
NextPageToken: nil,
49+
}).Return(&ListQueuesResponse{
50+
Queues: []QueueInfo{
51+
{QueueName: GetHistoryTaskQueueName(tasks.CategoryIDTransfer, "source", "target"), MessageCount: 1},
52+
{QueueName: GetHistoryTaskQueueName(tasks.CategoryIDTimer, "source", "target"), MessageCount: 2},
53+
{QueueName: GetHistoryTaskQueueName(tasks.CategoryIDReplication, "source", "target"), MessageCount: 3},
54+
},
55+
NextPageToken: []byte("test_page_token"),
56+
}, nil).Times(1)
57+
manager.EXPECT().ListQueues(gomock.Any(), &ListQueuesRequest{
58+
QueueType: QueueTypeHistoryDLQ,
59+
PageSize: 100,
60+
NextPageToken: []byte("test_page_token"),
61+
}).Return(&ListQueuesResponse{
62+
Queues: []QueueInfo{
63+
{QueueName: GetHistoryTaskQueueName(tasks.CategoryIDTransfer, "source", "target"), MessageCount: 8},
64+
{QueueName: GetHistoryTaskQueueName(tasks.CategoryIDTimer, "source", "target"), MessageCount: 9},
65+
{QueueName: GetHistoryTaskQueueName(tasks.CategoryIDReplication, "source", "target"), MessageCount: 10},
66+
},
67+
NextPageToken: nil,
68+
}, nil).Times(1)
69+
resolver := membership.NewMockServiceResolver(ctrl)
70+
resolver.EXPECT().Lookup("1").Return(membership.NewHostInfoFromAddress("testAddress"), nil).Times(1)
71+
hostInfoProvider := membership.NewMockHostInfoProvider(ctrl)
72+
hostInfoProvider.EXPECT().HostInfo().Return(membership.NewHostInfoFromAddress("testAddress")).Times(1)
73+
categoryRegistry := tasks.NewDefaultTaskCategoryRegistry()
74+
emitter := NewDLQMetricsEmitter(metricsHandler, logger, manager, resolver, hostInfoProvider, categoryRegistry)
75+
emitter.emitMetricsTimer = time.NewTicker(60 * time.Millisecond)
76+
logger.EXPECT().Info(gomock.Any()).AnyTimes()
77+
logger.EXPECT().Error(gomock.Any()).AnyTimes()
78+
79+
capture := metricsHandler.StartCapture()
80+
snapshot := capture.Snapshot()
81+
emitter.Start()
82+
assert.Eventually(t, func() bool {
83+
snapshot = capture.Snapshot()
84+
return len(snapshot[metrics.DLQMessageCount.Name()]) == len(categoryRegistry.GetCategories())
85+
}, 5*time.Second, 100*time.Millisecond)
86+
87+
emitter.Stop()
88+
<-emitter.shutdownCh
89+
90+
messageCount := make(map[string]float64)
91+
for _, recording := range snapshot[metrics.DLQMessageCount.Name()] {
92+
value, ok := recording.Value.(float64)
93+
assert.True(t, ok)
94+
messageCount[recording.Tags[metrics.TaskCategoryTagName]] = value
95+
}
96+
assert.Equal(t, float64(9), messageCount["transfer"])
97+
assert.Equal(t, float64(11), messageCount["timer"])
98+
assert.Equal(t, float64(13), messageCount["replication"])
99+
}
100+
101+
func TestDLQMetricsEmitter_DoesNotEmitMetrics_WhenInstanceDoesNotHostShardOne(t *testing.T) {
102+
ctrl := gomock.NewController(t)
103+
defer ctrl.Finish()
104+
105+
metricsHandler := metricstest.NewCaptureHandler()
106+
capture := metricsHandler.StartCapture()
107+
logger := log.NewMockLogger(ctrl)
108+
manager := NewMockHistoryTaskQueueManager(ctrl)
109+
110+
resolver := membership.NewMockServiceResolver(ctrl)
111+
resolver.EXPECT().Lookup("1").Return(membership.NewHostInfoFromAddress("testAddress1"), nil).MinTimes(1)
112+
hostInfoProvider := membership.NewMockHostInfoProvider(ctrl)
113+
hostInfoProvider.EXPECT().HostInfo().Return(membership.NewHostInfoFromAddress("testAddress2")).MinTimes(1)
114+
115+
emitter := NewDLQMetricsEmitter(metricsHandler, logger, manager, resolver, hostInfoProvider, tasks.NewDefaultTaskCategoryRegistry())
116+
emitter.emitMetricsTimer = time.NewTicker(time.Millisecond)
117+
logger.EXPECT().Info(gomock.Any()).AnyTimes()
118+
logger.EXPECT().Error(gomock.Any()).AnyTimes()
119+
120+
emitter.Start()
121+
time.Sleep(100 * time.Millisecond) //nolint
122+
emitter.Stop()
123+
<-emitter.shutdownCh
124+
125+
snapshot := capture.Snapshot()
126+
assert.Empty(t, snapshot[metrics.DLQMessageCount.Name()])
127+
}

common/persistence/history_task_queue_manager.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import (
3030
"encoding/base64"
3131
"errors"
3232
"fmt"
33+
"strconv"
34+
"strings"
3335

3436
commonpb "go.temporal.io/api/common/v1"
3537
"go.temporal.io/api/enums/v1"
@@ -56,6 +58,8 @@ const (
5658
// - ShardID
5759
// - Blob (a serialized task) <-- when this cannot be deserialized
5860
ErrMsgDeserializeHistoryTask = "failed to deserialize history task blob"
61+
// ErrMsgFailedToParseCategoryID is returned when category id cannot be parsed as an integer value.
62+
ErrMsgFailedToParseCategoryID = "failed to parse category id from queue name"
5963
)
6064

6165
var (
@@ -64,6 +68,7 @@ var (
6468
ErrEnqueueTaskRequestTaskIsNil = errors.New("enqueue task request task is nil")
6569
ErrQueueAlreadyExists = errors.New("queue already exists")
6670
ErrShardIDInvalid = errors.New("shard ID must be greater than 0")
71+
ErrInvalidQueueName = errors.New("invalid queue name, expected 4 fields")
6772
)
6873

6974
func NewHistoryTaskQueueManager(queue QueueV2, serializer serialization.Serializer) *HistoryTaskQueueManagerImpl {
@@ -266,3 +271,15 @@ func GetHistoryTaskQueueName(
266271
hash := combineUnique(sourceCluster, targetCluster)[:clusterNamesHashSuffixLength]
267272
return fmt.Sprintf("%d_%s_%s_%s", categoryID, sourceCluster, targetCluster, hash)
268273
}
274+
275+
func GetHistoryTaskQueueCategoryID(queueName string) (int, error) {
276+
fields := strings.Split(queueName, "_")
277+
if len(fields) != 4 {
278+
return 0, fmt.Errorf("%w: %s", ErrInvalidQueueName, queueName)
279+
}
280+
category, err := strconv.Atoi(fields[0])
281+
if err != nil {
282+
return 0, fmt.Errorf("%v: %w", ErrMsgFailedToParseCategoryID, err)
283+
}
284+
return category, nil
285+
}

service/history/fx.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ func HandlerProvider(args NewHandlerArgs) *Handler {
157157
tracer: args.TracerProvider.Tracer(consts.LibraryName),
158158
taskQueueManager: args.TaskQueueManager,
159159
taskCategoryRegistry: args.TaskCategoryRegistry,
160+
dlqMetricsEmitter: args.DLQMetricsEmitter,
160161

161162
replicationTaskFetcherFactory: args.ReplicationTaskFetcherFactory,
162163
replicationTaskConverterProvider: args.ReplicationTaskConverterFactory,

0 commit comments

Comments
 (0)