Skip to content

Commit fa88fbf

Browse files
authored
Refactor XDC cache (temporalio#6583)
## What changed? <!-- Describe what has changed in this PR --> 1. Remove MaxEventId from cache key 2. Add NextEventID to cache value ## Why? <!-- Tell your future self why have you made these changes --> For State based replication usage. So the events can be retrieved from cache ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> current unit test ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> cache may stop working. ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> n/a ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) --> no
1 parent e35b218 commit fa88fbf

4 files changed

Lines changed: 33 additions & 6 deletions

File tree

common/persistence/client/fx.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,12 @@ func ClusterNameProvider(config *cluster.Config) ClusterName {
100100

101101
func EventBlobCacheProvider(
102102
dc *dynamicconfig.Collection,
103+
logger log.Logger,
103104
) persistence.XDCCache {
104105
return persistence.NewEventsBlobCache(
105106
dynamicconfig.XDCCacheMaxSizeBytes.Get(dc)(),
106107
20*time.Second,
108+
logger,
107109
)
108110
}
109111

common/persistence/execution_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,12 +477,12 @@ func (m *executionManagerImpl) serializeWorkflowEventBatches(
477477
xdcKVs[NewXDCCacheKey(
478478
definition.NewWorkflowKey(workflowEvents.NamespaceID, workflowEvents.WorkflowID, workflowEvents.RunID),
479479
workflowEvents.Events[0].EventId,
480-
workflowEvents.Events[len(workflowEvents.Events)-1].EventId+1,
481480
workflowEvents.Events[0].Version,
482481
)] = NewXDCCacheValue(
483482
baseWorkflowInfo,
484483
versionHistoryItems,
485484
[]*commonpb.DataBlob{newEvents.Node.Events},
485+
workflowEvents.Events[len(workflowEvents.Events)-1].EventId+1,
486486
)
487487
newEvents.ShardID = shardID
488488
workflowNewEvents = append(workflowNewEvents, newEvents)

common/persistence/xdc_cache.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,28 +25,33 @@
2525
package persistence
2626

2727
import (
28+
"fmt"
2829
"time"
2930

3031
commonpb "go.temporal.io/api/common/v1"
32+
historypb "go.temporal.io/api/history/v1"
3133
historyspb "go.temporal.io/server/api/history/v1"
3234
persistencepb "go.temporal.io/server/api/persistence/v1"
3335
workflowspb "go.temporal.io/server/api/workflow/v1"
3436
"go.temporal.io/server/common/cache"
3537
"go.temporal.io/server/common/definition"
38+
"go.temporal.io/server/common/log"
39+
"go.temporal.io/server/common/log/tag"
40+
"go.temporal.io/server/common/persistence/serialization"
3641
"go.temporal.io/server/common/persistence/versionhistory"
3742
)
3843

3944
type (
4045
XDCCacheKey struct {
4146
WorkflowKey definition.WorkflowKey
4247
MinEventID int64 // inclusive
43-
MaxEventID int64 // exclusive
4448
Version int64
4549
}
4650
XDCCacheValue struct {
4751
BaseWorkflowInfo *workflowspb.BaseExecutionInfo
4852
VersionHistoryItems []*historyspb.VersionHistoryItem
4953
EventBlobs []*commonpb.DataBlob
54+
NextEventID int64
5055
}
5156

5257
XDCCache interface {
@@ -55,7 +60,9 @@ type (
5560
}
5661

5762
XDCCacheImpl struct {
58-
cache cache.Cache
63+
cache cache.Cache
64+
logger log.Logger
65+
serializer serialization.Serializer
5966
}
6067
)
6168

@@ -69,13 +76,11 @@ var _ cache.SizeGetter = XDCCacheValue{}
6976
func NewXDCCacheKey(
7077
workflowKey definition.WorkflowKey,
7178
minEventID int64,
72-
maxEventID int64,
7379
version int64,
7480
) XDCCacheKey {
7581
return XDCCacheKey{
7682
WorkflowKey: workflowKey,
7783
MinEventID: minEventID,
78-
MaxEventID: maxEventID,
7984
Version: version,
8085
}
8186
}
@@ -84,11 +89,13 @@ func NewXDCCacheValue(
8489
baseWorkflowInfo *workflowspb.BaseExecutionInfo,
8590
versionHistoryItems []*historyspb.VersionHistoryItem,
8691
eventBlobs []*commonpb.DataBlob,
92+
nextEventID int64,
8793
) XDCCacheValue {
8894
return XDCCacheValue{
8995
BaseWorkflowInfo: baseWorkflowInfo,
9096
VersionHistoryItems: versionHistoryItems,
9197
EventBlobs: eventBlobs,
98+
NextEventID: nextEventID,
9299
}
93100
}
94101

@@ -106,6 +113,7 @@ func (v XDCCacheValue) CacheSize() int {
106113
func NewEventsBlobCache(
107114
maxBytes int,
108115
ttl time.Duration,
116+
logger log.Logger,
109117
) *XDCCacheImpl {
110118
return &XDCCacheImpl{
111119
cache: cache.New(
@@ -115,13 +123,31 @@ func NewEventsBlobCache(
115123
Pin: false,
116124
},
117125
),
126+
logger: logger,
127+
serializer: serialization.NewSerializer(),
118128
}
119129
}
120130

121131
func (e *XDCCacheImpl) Put(
122132
key XDCCacheKey,
123133
value XDCCacheValue,
124134
) {
135+
existingValue, found := e.Get(key)
136+
if found && existingValue.NextEventID != value.NextEventID {
137+
deserializeBlobs := func(blobs []*commonpb.DataBlob) [][]*historypb.HistoryEvent {
138+
events := make([][]*historypb.HistoryEvent, len(blobs))
139+
for i, blob := range blobs {
140+
var err error
141+
events[i], err = e.serializer.DeserializeEvents(blob)
142+
if err != nil {
143+
e.logger.Error("Error deserializing events", tag.Error(err))
144+
return nil
145+
}
146+
}
147+
return events
148+
}
149+
e.logger.Error(fmt.Sprintf("Putting duplicate key in XDC cache: wf-key: %v, existing event blobs: %v, new event blobs: %v", key.WorkflowKey, deserializeBlobs(existingValue.EventBlobs), deserializeBlobs(value.EventBlobs)))
150+
}
125151
e.cache.Put(key, value)
126152
}
127153

service/history/replication/raw_task_converter.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,6 @@ func getVersionHistoryAndEvents(
420420
if xdcCacheValue, ok := eventBlobCache.Get(persistence.NewXDCCacheKey(
421421
workflowKey,
422422
firstEventID,
423-
nextEventID,
424423
eventVersion,
425424
)); ok {
426425
return xdcCacheValue.VersionHistoryItems, xdcCacheValue.EventBlobs, xdcCacheValue.BaseWorkflowInfo, nil

0 commit comments

Comments
 (0)