Skip to content

Commit e52d5bb

Browse files
authored
Enhanced persistence priority rate limiting (temporalio#3163)
Take caller and call origin into consideration when performing persistence rate limiting
1 parent 53d11ae commit e52d5bb

80 files changed

Lines changed: 1059 additions & 212 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

common/cluster/metadata.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,10 @@ func (m *metadataImpl) Start() {
228228
}
229229

230230
// TODO: specify a timeout for the context
231-
ctx := headers.SetCallerInfo(context.TODO(), headers.NewCallerInfo(headers.CallerTypeBackground))
231+
ctx := headers.SetCallerInfo(
232+
context.TODO(),
233+
headers.SystemBackgroundCallerInfo,
234+
)
232235
err := m.refreshClusterMetadata(ctx)
233236
if err != nil {
234237
m.logger.Fatal("Unable to initialize cluster metadata cache", tag.Error(err))

common/dynamicconfig/constants.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ const (
151151
FrontendPersistenceMaxQPS = "frontend.persistenceMaxQPS"
152152
// FrontendPersistenceGlobalMaxQPS is the max qps frontend cluster can query DB
153153
FrontendPersistenceGlobalMaxQPS = "frontend.persistenceGlobalMaxQPS"
154+
// FrontendPersistenceNamespaceMaxQPS is the max qps each namespace on frontend host can query DB
155+
FrontendPersistenceNamespaceMaxQPS = "frontend.persistenceNamespaceMaxQPS"
154156
// FrontendEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in frontend persistence client
155157
FrontendEnablePersistencePriorityRateLimiting = "frontend.enablePersistencePriorityRateLimiting"
156158
// FrontendVisibilityMaxPageSize is default max size for ListWorkflowExecutions in one page
@@ -247,6 +249,8 @@ const (
247249
MatchingPersistenceMaxQPS = "matching.persistenceMaxQPS"
248250
// MatchingPersistenceGlobalMaxQPS is the max qps matching cluster can query DB
249251
MatchingPersistenceGlobalMaxQPS = "matching.persistenceGlobalMaxQPS"
252+
// MatchingPersistenceNamespaceMaxQPS is the max qps each namespace on matching host can query DB
253+
MatchingPersistenceNamespaceMaxQPS = "matching.persistenceNamespaceMaxQPS"
250254
// MatchingEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in matching persistence client
251255
MatchingEnablePersistencePriorityRateLimiting = "matching.enablePersistencePriorityRateLimiting"
252256
// MatchingMinTaskThrottlingBurstSize is the minimum burst size for task queue throttling
@@ -296,6 +300,8 @@ const (
296300
HistoryPersistenceMaxQPS = "history.persistenceMaxQPS"
297301
// HistoryPersistenceGlobalMaxQPS is the max qps history cluster can query DB
298302
HistoryPersistenceGlobalMaxQPS = "history.persistenceGlobalMaxQPS"
303+
// HistoryPersistenceNamespaceMaxQPS is the max qps each namespace on history host can query DB
304+
HistoryPersistenceNamespaceMaxQPS = "history.persistenceNamespaceMaxQPS"
299305
// HistoryEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in history persistence client
300306
HistoryEnablePersistencePriorityRateLimiting = "history.enablePersistencePriorityRateLimiting"
301307
// HistoryLongPollExpirationInterval is the long poll expiration interval in the history service
@@ -574,6 +580,8 @@ const (
574580
WorkerPersistenceMaxQPS = "worker.persistenceMaxQPS"
575581
// WorkerPersistenceGlobalMaxQPS is the max qps worker cluster can query DB
576582
WorkerPersistenceGlobalMaxQPS = "worker.persistenceGlobalMaxQPS"
583+
// WorkerPersistenceNamespaceMaxQPS is the max qps each namespace on worker host can query DB
584+
WorkerPersistenceNamespaceMaxQPS = "worker.persistenceNamespaceMaxQPS"
577585
// WorkerEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in worker persistence client
578586
WorkerEnablePersistencePriorityRateLimiting = "worker.enablePersistencePriorityRateLimiting"
579587
// WorkerIndexerConcurrency is the max concurrent messages to be processed at any given time

common/headers/caller_info.go

Lines changed: 91 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,47 +33,128 @@ import (
3333
const (
3434
CallerTypeAPI = "api"
3535
CallerTypeBackground = "background"
36+
37+
CallerNameSystem = "system"
3638
)
3739

38-
type CallerInfo struct {
39-
CallerType string
40+
var (
41+
SystemBackgroundCallerInfo CallerInfo = CallerInfo{
42+
CallerName: CallerNameSystem,
43+
CallerType: CallerTypeBackground,
44+
}
45+
)
4046

41-
// TODO: add fields for CallerName and CallerInitiation
42-
}
47+
type (
48+
CallerInfo struct {
49+
// CallerName is the name of the caller.
50+
// It can either user namespace name or
51+
// the predefined CallerNameSystem.
52+
CallerName string
53+
54+
// CallerType indicates if the call originates from
55+
// user API calls or from system background operations.
56+
CallerType string
57+
58+
// CallOrigin is the first API method name in the call chain.
59+
// Currently, it is only specified when CallerType is CallerTypeAPI
60+
CallOrigin string
61+
}
62+
)
4363

64+
// NewCallerInfo creates a new CallerInfo
4465
func NewCallerInfo(
66+
callerName string,
4567
callerType string,
68+
callOrigin string,
4669
) CallerInfo {
4770
return CallerInfo{
71+
CallerName: callerName,
4872
CallerType: callerType,
73+
CallOrigin: callOrigin,
4974
}
5075
}
5176

52-
// SetCallerInfo sets callerName and callerType value in incoming context
53-
// if not already exists.
77+
// NewBackgroundCallerInfo creates a new CallerInfo with Background callerType
78+
// and empty callOrigin.
79+
// This is equivalent to NewCallerInfo(callerName, CallerTypeBackground, "")
80+
func NewBackgroundCallerInfo(
81+
callerName string,
82+
) CallerInfo {
83+
return CallerInfo{
84+
CallerName: callerName,
85+
CallerType: CallerTypeBackground,
86+
}
87+
}
88+
89+
// SetCallerInfo sets callerName, callerType and CcllOrigin in the context.
90+
// Existing values will be overwritten if new value is not empty.
5491
// TODO: consider only set the caller info to golang context instead of grpc metadata
5592
// and propagate to grpc outgoing context upon making an rpc call
5693
func SetCallerInfo(
5794
ctx context.Context,
5895
info CallerInfo,
96+
) context.Context {
97+
return setIncomingMD(ctx, map[string]string{
98+
callerNameHeaderName: info.CallerName,
99+
callerTypeHeaderName: info.CallerType,
100+
callOriginHeaderName: info.CallOrigin,
101+
})
102+
}
103+
104+
// SetCallerName set caller name in the context.
105+
// Existing caller name will be overwritten if exists and new caller name is not empty.
106+
func SetCallerName(
107+
ctx context.Context,
108+
callerName string,
109+
) context.Context {
110+
return setIncomingMD(ctx, map[string]string{callerNameHeaderName: callerName})
111+
}
112+
113+
// SetCallerType set caller type in the context.
114+
// Existing caller type will be overwritten if exists and new caller type is not empty.
115+
func SetCallerType(
116+
ctx context.Context,
117+
callerType string,
118+
) context.Context {
119+
return setIncomingMD(ctx, map[string]string{callerTypeHeaderName: callerType})
120+
}
121+
122+
// SetOrigin set call origin in the context.
123+
// Existing call origin will be overwritten if exists and new call origin is not empty.
124+
func SetOrigin(
125+
ctx context.Context,
126+
callOrigin string,
127+
) context.Context {
128+
return setIncomingMD(ctx, map[string]string{callOriginHeaderName: callOrigin})
129+
}
130+
131+
func setIncomingMD(
132+
ctx context.Context,
133+
kv map[string]string,
59134
) context.Context {
60135
mdIncoming, ok := metadata.FromIncomingContext(ctx)
61136
if !ok {
62137
mdIncoming = metadata.MD{}
63138
}
64139

65-
if len(mdIncoming.Get(callerTypeHeaderName)) == 0 {
66-
mdIncoming.Set(callerTypeHeaderName, string(info.CallerType))
140+
for k, v := range kv {
141+
if v != "" {
142+
mdIncoming.Set(k, v)
143+
}
67144
}
68145

69146
return metadata.NewIncomingContext(ctx, mdIncoming)
70147
}
71148

149+
// GetCallerInfo retrieves caller information from the context if exists. Empty value is returned
150+
// if any piece of caller information is not specified in the context.
72151
func GetCallerInfo(
73152
ctx context.Context,
74153
) CallerInfo {
75-
values := GetValues(ctx, callerTypeHeaderName)
154+
values := GetValues(ctx, callerNameHeaderName, callerTypeHeaderName, callOriginHeaderName)
76155
return CallerInfo{
77-
CallerType: values[0],
156+
CallerName: values[0],
157+
CallerType: values[1],
158+
CallOrigin: values[2],
78159
}
79160
}

common/headers/caller_info_test.go

Lines changed: 98 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,51 +48,142 @@ func (s *callerInfoSuite) SetupTest() {
4848
s.Assertions = require.New(s.T())
4949
}
5050

51+
func (s *callerInfoSuite) TestSetCallerName() {
52+
ctx := context.Background()
53+
info := GetCallerInfo(ctx)
54+
s.Empty(info.CallerName)
55+
56+
ctx = SetCallerName(ctx, CallerNameSystem)
57+
info = GetCallerInfo(ctx)
58+
s.Equal(CallerNameSystem, info.CallerName)
59+
60+
ctx = SetCallerName(ctx, "")
61+
info = GetCallerInfo(ctx)
62+
s.Equal(CallerNameSystem, info.CallerName)
63+
64+
newCallerName := "new caller name"
65+
ctx = SetCallerName(ctx, newCallerName)
66+
info = GetCallerInfo(ctx)
67+
s.Equal(newCallerName, info.CallerName)
68+
}
69+
70+
func (s *callerInfoSuite) TestSetCallerType() {
71+
ctx := context.Background()
72+
info := GetCallerInfo(ctx)
73+
s.Empty(info.CallerType)
74+
75+
ctx = SetCallerType(ctx, CallerTypeBackground)
76+
info = GetCallerInfo(ctx)
77+
s.Equal(CallerTypeBackground, info.CallerType)
78+
79+
ctx = SetCallerName(ctx, "")
80+
info = GetCallerInfo(ctx)
81+
s.Equal(CallerTypeBackground, info.CallerType)
82+
83+
ctx = SetCallerType(ctx, CallerTypeAPI)
84+
info = GetCallerInfo(ctx)
85+
s.Equal(CallerTypeAPI, info.CallerType)
86+
}
87+
88+
func (s *callerInfoSuite) TestSetCallOrigin() {
89+
ctx := context.Background()
90+
info := GetCallerInfo(ctx)
91+
s.Empty(info.CallOrigin)
92+
93+
initiation := "method name"
94+
ctx = SetOrigin(ctx, initiation)
95+
info = GetCallerInfo(ctx)
96+
s.Equal(initiation, info.CallOrigin)
97+
98+
ctx = SetOrigin(ctx, "")
99+
info = GetCallerInfo(ctx)
100+
s.Equal(initiation, info.CallOrigin)
101+
102+
newCallOrigin := "another method name"
103+
ctx = SetOrigin(ctx, newCallOrigin)
104+
info = GetCallerInfo(ctx)
105+
s.Equal(newCallOrigin, info.CallOrigin)
106+
}
107+
51108
func (s *callerInfoSuite) TestSetCallerInfo_PreserveOtherValues() {
52109
existingKey := "key"
53110
existingValue := "value"
111+
callerName := "callerName"
54112
callerType := CallerTypeAPI
113+
callOrigin := "methodName"
55114

56115
ctx := metadata.NewIncomingContext(
57116
context.Background(),
58117
metadata.Pairs(existingKey, existingValue),
59118
)
60119

61-
ctx = SetCallerInfo(ctx, NewCallerInfo(callerType))
120+
ctx = SetCallerInfo(ctx, NewCallerInfo(callerName, callerType, callOrigin))
62121

63122
md, ok := metadata.FromIncomingContext(ctx)
64123
s.True(ok)
65124
s.Equal(existingValue, md.Get(existingKey)[0])
125+
s.Equal(callerName, md.Get(callerNameHeaderName)[0])
66126
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
67-
s.Len(md, 2)
127+
s.Equal(callOrigin, md.Get(callOriginHeaderName)[0])
128+
s.Len(md, 4)
68129
}
69130

70131
func (s *callerInfoSuite) TestSetCallerInfo_NoExistingCallerInfo() {
132+
callerName := CallerNameSystem
71133
callerType := CallerTypeAPI
134+
callOrigin := "methodName"
72135

73136
ctx := SetCallerInfo(context.Background(), CallerInfo{
137+
CallerName: callerName,
74138
CallerType: callerType,
139+
CallOrigin: callOrigin,
75140
})
76141

77142
md, ok := metadata.FromIncomingContext(ctx)
78143
s.True(ok)
144+
s.Equal(callerName, md.Get(callerNameHeaderName)[0])
79145
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
80-
s.Len(md, 1)
146+
s.Equal(callOrigin, md.Get(callOriginHeaderName)[0])
147+
s.Len(md, 3)
81148
}
82149

83150
func (s *callerInfoSuite) TestSetCallerInfo_WithExistingCallerInfo() {
84-
callerType := CallerTypeAPI
151+
callerName := CallerNameSystem
152+
callerType := CallerTypeBackground
153+
callOrigin := "methodName"
85154

86-
ctx := SetCallerInfo(context.Background(), CallerInfo{
155+
ctx := SetCallerName(context.Background(), callerName)
156+
ctx = SetCallerType(ctx, CallerTypeAPI)
157+
ctx = SetOrigin(ctx, callOrigin)
158+
159+
ctx = SetCallerInfo(ctx, CallerInfo{
160+
CallerName: "",
87161
CallerType: callerType,
162+
CallOrigin: "",
88163
})
89164

165+
md, ok := metadata.FromIncomingContext(ctx)
166+
s.True(ok)
167+
s.Equal(callerName, md.Get(callerNameHeaderName)[0])
168+
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
169+
s.Equal(callOrigin, md.Get(callOriginHeaderName)[0])
170+
s.Len(md, 3)
171+
}
172+
173+
func (s *callerInfoSuite) TestSetCallerInfo_WithPartialCallerInfo() {
174+
callerName := CallerNameSystem
175+
callerType := CallerTypeAPI
176+
177+
ctx := SetCallerType(context.Background(), callerType)
178+
90179
ctx = SetCallerInfo(ctx, CallerInfo{
91-
CallerType: CallerTypeBackground,
180+
CallerName: callerName,
92181
})
93182

94183
md, ok := metadata.FromIncomingContext(ctx)
95184
s.True(ok)
185+
s.Equal(callerName, md.Get(callerNameHeaderName)[0])
96186
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
97-
s.Len(md, 1)
187+
s.Empty(md.Get(callOriginHeaderName))
188+
s.Len(md, 2)
98189
}

common/headers/headers.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ const (
3737
SupportedFeaturesHeaderName = "supported-features"
3838
SupportedFeaturesHeaderDelim = ","
3939

40+
callerNameHeaderName = "caller-name"
4041
callerTypeHeaderName = "caller-type"
42+
callOriginHeaderName = "call-initiation"
4143
)
4244

4345
var (
@@ -47,7 +49,9 @@ var (
4749
ClientVersionHeaderName,
4850
SupportedServerVersionsHeaderName,
4951
SupportedFeaturesHeaderName,
52+
callerNameHeaderName,
5053
callerTypeHeaderName,
54+
callOriginHeaderName,
5155
}
5256
)
5357

common/membership/rpMonitor.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,10 @@ func NewRingpopMonitor(
8484
) Monitor {
8585

8686
lifecycleCtx, lifecycleCancel := context.WithCancel(context.Background())
87-
lifecycleCtx = headers.SetCallerInfo(lifecycleCtx, headers.NewCallerInfo(headers.CallerTypeBackground))
87+
lifecycleCtx = headers.SetCallerInfo(
88+
lifecycleCtx,
89+
headers.SystemBackgroundCallerInfo,
90+
)
8891

8992
rpo := &ringpopMonitor{
9093
status: common.DaemonStatusInitialized,

common/namespace/registry.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,10 @@ func (r *registry) Start() {
220220
defer atomic.StoreInt32(&r.status, running)
221221

222222
// initialize the cache by initial scan
223-
ctx := headers.SetCallerInfo(context.Background(), headers.NewCallerInfo(headers.CallerTypeBackground))
223+
ctx := headers.SetCallerInfo(
224+
context.Background(),
225+
headers.SystemBackgroundCallerInfo,
226+
)
224227

225228
err := r.refreshNamespaces(ctx)
226229
if err != nil {

0 commit comments

Comments
 (0)