2323import static io .temporal .internal .common .WorkflowExecutionUtils .isFullHistory ;
2424
2525import com .google .common .base .Preconditions ;
26+ import com .google .common .cache .Cache ;
2627import com .google .common .cache .CacheBuilder ;
27- import com .google .common .cache .CacheLoader ;
28- import com .google .common .cache .LoadingCache ;
2928import com .uber .m3 .tally .Scope ;
3029import io .temporal .api .common .v1 .WorkflowExecution ;
3130import io .temporal .api .workflowservice .v1 .PollWorkflowTaskQueueResponseOrBuilder ;
3231import io .temporal .internal .replay .WorkflowRunTaskHandler ;
3332import io .temporal .worker .MetricsType ;
3433import java .util .Objects ;
3534import java .util .concurrent .Callable ;
36- import java . util . concurrent . ExecutionException ;
35+ import javax . annotation . Nullable ;
3736import javax .annotation .concurrent .ThreadSafe ;
3837import org .slf4j .Logger ;
3938import org .slf4j .LoggerFactory ;
4241public final class WorkflowExecutorCache {
4342 private final Logger log = LoggerFactory .getLogger (WorkflowExecutorCache .class );
4443 private final WorkflowRunLockManager runLockManager ;
45- private final LoadingCache <String , WorkflowRunTaskHandler > cache ;
44+ private final Cache <String , WorkflowRunTaskHandler > cache ;
4645 private final Scope metricsScope ;
4746
4847 public WorkflowExecutorCache (
@@ -73,13 +72,7 @@ public WorkflowExecutorCache(
7372 }
7473 }
7574 })
76- .build (
77- new CacheLoader <String , WorkflowRunTaskHandler >() {
78- @ Override
79- public WorkflowRunTaskHandler load (String key ) {
80- return null ;
81- }
82- });
75+ .build ();
8376 this .metricsScope = Objects .requireNonNull (scope );
8477 this .metricsScope .gauge (MetricsType .STICKY_CACHE_SIZE ).update (size ());
8578 }
@@ -90,47 +83,30 @@ public WorkflowRunTaskHandler getOrCreate(
9083 Callable <WorkflowRunTaskHandler > workflowExecutorFn )
9184 throws Exception {
9285 WorkflowExecution execution = workflowTask .getWorkflowExecution ();
86+ String runId = execution .getRunId ();
9387 if (isFullHistory (workflowTask )) {
94- // no need to call a full-blown #invalidate, because we don't need to unmark from processing
95- // yet
96- cache .invalidate (execution .getRunId ());
97- metricsScope .counter (MetricsType .STICKY_CACHE_TOTAL_FORCED_EVICTION ).inc (1 );
98-
88+ invalidate (execution , metricsScope , "full history" , null );
9989 log .trace (
10090 "New Workflow Executor {}-{} has been created for a full history run" ,
10191 execution .getWorkflowId (),
102- execution . getRunId () );
92+ runId );
10393 return workflowExecutorFn .call ();
10494 }
10595
106- WorkflowRunTaskHandler workflowRunTaskHandler = getForProcessing (execution , workflowTypeScope );
96+ @ Nullable WorkflowRunTaskHandler workflowRunTaskHandler = cache .getIfPresent (runId );
97+
10798 if (workflowRunTaskHandler != null ) {
99+ workflowTypeScope .counter (MetricsType .STICKY_CACHE_HIT ).inc (1 );
108100 return workflowRunTaskHandler ;
109101 }
110102
111103 log .trace (
112104 "Workflow Executor {}-{} wasn't found in cache and a new executor has been created" ,
113105 execution .getWorkflowId (),
114- execution .getRunId ());
115- return workflowExecutorFn .call ();
116- }
106+ runId );
107+ workflowTypeScope .counter (MetricsType .STICKY_CACHE_MISS ).inc (1 );
117108
118- public WorkflowRunTaskHandler getForProcessing (
119- WorkflowExecution workflowExecution , Scope metricsScope ) throws ExecutionException {
120- String runId = workflowExecution .getRunId ();
121- try {
122- WorkflowRunTaskHandler workflowRunTaskHandler = cache .get (runId );
123- log .trace (
124- "Workflow Execution {}-{} has been marked as in-progress" ,
125- workflowExecution .getWorkflowId (),
126- workflowExecution .getRunId ());
127- metricsScope .counter (MetricsType .STICKY_CACHE_HIT ).inc (1 );
128- return workflowRunTaskHandler ;
129- } catch (CacheLoader .InvalidCacheLoadException e ) {
130- // We don't have a default loader and don't want to have one. So it's ok to get null value.
131- metricsScope .counter (MetricsType .STICKY_CACHE_MISS ).inc (1 );
132- return null ;
133- }
109+ return workflowExecutorFn .call ();
134110 }
135111
136112 public void addToCache (
@@ -143,7 +119,13 @@ public void addToCache(
143119 this .metricsScope .gauge (MetricsType .STICKY_CACHE_SIZE ).update (size ());
144120 }
145121
146- public boolean evictAnyNotInProcessing (WorkflowExecution inFavorOfExecution , Scope metricsScope ) {
122+ /**
123+ * @param workflowTypeScope accepts workflow metric scope (tagged with task queue and workflow
124+ * type)
125+ */
126+ @ SuppressWarnings ("deprecation" )
127+ public boolean evictAnyNotInProcessing (
128+ WorkflowExecution inFavorOfExecution , Scope workflowTypeScope ) {
147129 try {
148130 String inFavorOfRunId = inFavorOfExecution .getRunId ();
149131 for (String key : cache .asMap ().keySet ()) {
@@ -159,8 +141,8 @@ public boolean evictAnyNotInProcessing(WorkflowExecution inFavorOfExecution, Sco
159141 inFavorOfRunId ,
160142 key );
161143 cache .invalidate (key );
162- metricsScope .counter (MetricsType .STICKY_CACHE_THREAD_FORCED_EVICTION ).inc (1 );
163- metricsScope .counter (MetricsType .STICKY_CACHE_TOTAL_FORCED_EVICTION ).inc (1 );
144+ workflowTypeScope .counter (MetricsType .STICKY_CACHE_THREAD_FORCED_EVICTION ).inc (1 );
145+ workflowTypeScope .counter (MetricsType .STICKY_CACHE_TOTAL_FORCED_EVICTION ).inc (1 );
164146 return true ;
165147 } finally {
166148 runLockManager .unlock (key );
@@ -175,22 +157,23 @@ public boolean evictAnyNotInProcessing(WorkflowExecution inFavorOfExecution, Sco
175157 }
176158 }
177159
160+ @ SuppressWarnings ("deprecation" )
178161 public void invalidate (
179162 WorkflowExecution execution , Scope workflowTypeScope , String reason , Throwable cause ) {
180- try {
181- String runId = execution .getRunId ();
182- if (log .isTraceEnabled ()) {
183- log .trace (
184- "Invalidating {}-{} because of '{}', value is present in the cache: {}" ,
185- execution .getWorkflowId (),
186- runId ,
187- reason ,
188- cache .getIfPresent (runId ),
189- cause );
190- }
191- cache .invalidate (runId );
163+ String runId = execution .getRunId ();
164+ @ Nullable WorkflowRunTaskHandler present = cache .getIfPresent (runId );
165+ if (log .isTraceEnabled ()) {
166+ log .trace (
167+ "Invalidating {}-{} because of '{}', value is present in the cache: {}" ,
168+ execution .getWorkflowId (),
169+ runId ,
170+ reason ,
171+ present ,
172+ cause );
173+ }
174+ cache .invalidate (runId );
175+ if (present != null ) {
192176 workflowTypeScope .counter (MetricsType .STICKY_CACHE_TOTAL_FORCED_EVICTION ).inc (1 );
193- } finally {
194177 this .metricsScope .gauge (MetricsType .STICKY_CACHE_SIZE ).update (size ());
195178 }
196179 }
0 commit comments