Skip to content

Commit f55f610

Browse files
authored
Cleanup WorkflowExecutorCache, rearrange checks in a flaky test (#1478)
1 parent 40f9212 commit f55f610

7 files changed

Lines changed: 86 additions & 63 deletions

File tree

gradle/linting.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ subprojects {
1515
// needs to be aligned with .editorconfig
1616
// https://github.com/diffplug/spotless/tree/main/plugin-gradle#ktlint
1717
// reenable filename rule after https://github.com/pinterest/ktlint/issues/1521
18-
.editorConfigOverride(['indent_size': '2', 'disabled_rules': 'filename'])
18+
.editorConfigOverride(['indent_size': '2', 'ktlint_disabled_rules': 'filename'])
1919
}
2020
}
2121

temporal-kotlin/.editorconfig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[*.{kt,kts}]
22
indent_size = 2
33
# reenable filename rule after https://github.com/pinterest/ktlint/issues/1521
4-
disabled_rules = filename
4+
ktlint_disabled_rules = filename

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowExecutorCache.java

Lines changed: 36 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,16 @@
2323
import static io.temporal.internal.common.WorkflowExecutionUtils.isFullHistory;
2424

2525
import com.google.common.base.Preconditions;
26+
import com.google.common.cache.Cache;
2627
import com.google.common.cache.CacheBuilder;
27-
import com.google.common.cache.CacheLoader;
28-
import com.google.common.cache.LoadingCache;
2928
import com.uber.m3.tally.Scope;
3029
import io.temporal.api.common.v1.WorkflowExecution;
3130
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
3231
import io.temporal.internal.replay.WorkflowRunTaskHandler;
3332
import io.temporal.worker.MetricsType;
3433
import java.util.Objects;
3534
import java.util.concurrent.Callable;
36-
import java.util.concurrent.ExecutionException;
35+
import javax.annotation.Nullable;
3736
import javax.annotation.concurrent.ThreadSafe;
3837
import org.slf4j.Logger;
3938
import org.slf4j.LoggerFactory;
@@ -42,7 +41,7 @@
4241
public final class WorkflowExecutorCache {
4342
private final Logger log = LoggerFactory.getLogger(WorkflowExecutorCache.class);
4443
private final WorkflowRunLockManager runLockManager;
45-
private final LoadingCache<String, WorkflowRunTaskHandler> cache;
44+
private final Cache<String, WorkflowRunTaskHandler> cache;
4645
private final Scope metricsScope;
4746

4847
public WorkflowExecutorCache(
@@ -73,13 +72,7 @@ public WorkflowExecutorCache(
7372
}
7473
}
7574
})
76-
.build(
77-
new CacheLoader<String, WorkflowRunTaskHandler>() {
78-
@Override
79-
public WorkflowRunTaskHandler load(String key) {
80-
return null;
81-
}
82-
});
75+
.build();
8376
this.metricsScope = Objects.requireNonNull(scope);
8477
this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
8578
}
@@ -90,47 +83,30 @@ public WorkflowRunTaskHandler getOrCreate(
9083
Callable<WorkflowRunTaskHandler> workflowExecutorFn)
9184
throws Exception {
9285
WorkflowExecution execution = workflowTask.getWorkflowExecution();
86+
String runId = execution.getRunId();
9387
if (isFullHistory(workflowTask)) {
94-
// no need to call a full-blown #invalidate, because we don't need to unmark from processing
95-
// yet
96-
cache.invalidate(execution.getRunId());
97-
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
98-
88+
invalidate(execution, metricsScope, "full history", null);
9989
log.trace(
10090
"New Workflow Executor {}-{} has been created for a full history run",
10191
execution.getWorkflowId(),
102-
execution.getRunId());
92+
runId);
10393
return workflowExecutorFn.call();
10494
}
10595

106-
WorkflowRunTaskHandler workflowRunTaskHandler = getForProcessing(execution, workflowTypeScope);
96+
@Nullable WorkflowRunTaskHandler workflowRunTaskHandler = cache.getIfPresent(runId);
97+
10798
if (workflowRunTaskHandler != null) {
99+
workflowTypeScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
108100
return workflowRunTaskHandler;
109101
}
110102

111103
log.trace(
112104
"Workflow Executor {}-{} wasn't found in cache and a new executor has been created",
113105
execution.getWorkflowId(),
114-
execution.getRunId());
115-
return workflowExecutorFn.call();
116-
}
106+
runId);
107+
workflowTypeScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
117108

118-
public WorkflowRunTaskHandler getForProcessing(
119-
WorkflowExecution workflowExecution, Scope metricsScope) throws ExecutionException {
120-
String runId = workflowExecution.getRunId();
121-
try {
122-
WorkflowRunTaskHandler workflowRunTaskHandler = cache.get(runId);
123-
log.trace(
124-
"Workflow Execution {}-{} has been marked as in-progress",
125-
workflowExecution.getWorkflowId(),
126-
workflowExecution.getRunId());
127-
metricsScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
128-
return workflowRunTaskHandler;
129-
} catch (CacheLoader.InvalidCacheLoadException e) {
130-
// We don't have a default loader and don't want to have one. So it's ok to get null value.
131-
metricsScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
132-
return null;
133-
}
109+
return workflowExecutorFn.call();
134110
}
135111

136112
public void addToCache(
@@ -143,7 +119,13 @@ public void addToCache(
143119
this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
144120
}
145121

146-
public boolean evictAnyNotInProcessing(WorkflowExecution inFavorOfExecution, Scope metricsScope) {
122+
/**
123+
* @param workflowTypeScope accepts workflow metric scope (tagged with task queue and workflow
124+
* type)
125+
*/
126+
@SuppressWarnings("deprecation")
127+
public boolean evictAnyNotInProcessing(
128+
WorkflowExecution inFavorOfExecution, Scope workflowTypeScope) {
147129
try {
148130
String inFavorOfRunId = inFavorOfExecution.getRunId();
149131
for (String key : cache.asMap().keySet()) {
@@ -159,8 +141,8 @@ public boolean evictAnyNotInProcessing(WorkflowExecution inFavorOfExecution, Sco
159141
inFavorOfRunId,
160142
key);
161143
cache.invalidate(key);
162-
metricsScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
163-
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
144+
workflowTypeScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
145+
workflowTypeScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
164146
return true;
165147
} finally {
166148
runLockManager.unlock(key);
@@ -175,22 +157,23 @@ public boolean evictAnyNotInProcessing(WorkflowExecution inFavorOfExecution, Sco
175157
}
176158
}
177159

160+
@SuppressWarnings("deprecation")
178161
public void invalidate(
179162
WorkflowExecution execution, Scope workflowTypeScope, String reason, Throwable cause) {
180-
try {
181-
String runId = execution.getRunId();
182-
if (log.isTraceEnabled()) {
183-
log.trace(
184-
"Invalidating {}-{} because of '{}', value is present in the cache: {}",
185-
execution.getWorkflowId(),
186-
runId,
187-
reason,
188-
cache.getIfPresent(runId),
189-
cause);
190-
}
191-
cache.invalidate(runId);
163+
String runId = execution.getRunId();
164+
@Nullable WorkflowRunTaskHandler present = cache.getIfPresent(runId);
165+
if (log.isTraceEnabled()) {
166+
log.trace(
167+
"Invalidating {}-{} because of '{}', value is present in the cache: {}",
168+
execution.getWorkflowId(),
169+
runId,
170+
reason,
171+
present,
172+
cause);
173+
}
174+
cache.invalidate(runId);
175+
if (present != null) {
192176
workflowTypeScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
193-
} finally {
194177
this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
195178
}
196179
}

temporal-sdk/src/main/java/io/temporal/worker/MetricsType.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ private MetricsType() {}
129129
public static final String LOCAL_ACTIVITY_FAILED_COUNTER =
130130
TEMPORAL_METRICS_PREFIX + "local_activity_failed";
131131

132-
// Worker internals, tagged with worker_type
132+
// Worker internals, tagged with namespace, task_queue, worker_type
133133
public static final String WORKER_START_COUNTER = TEMPORAL_METRICS_PREFIX + "worker_start";
134134
public static final String POLLER_START_COUNTER = TEMPORAL_METRICS_PREFIX + "poller_start";
135135
// gauge
@@ -140,13 +140,31 @@ private MetricsType() {}
140140
// Worker Factory
141141
//
142142

143+
// tagged with namespace, task_queue, worker_type, workflow_type
143144
public static final String STICKY_CACHE_HIT = TEMPORAL_METRICS_PREFIX + "sticky_cache_hit";
145+
// tagged with namespace, task_queue, worker_type, workflow_type
144146
public static final String STICKY_CACHE_MISS = TEMPORAL_METRICS_PREFIX + "sticky_cache_miss";
147+
// tagged with namespace, task_queue, worker_type, workflow_type
148+
@Deprecated
149+
// This metric in its current form is useless, it's not possible for users to interpret it for any
150+
// meaningful purpose.
151+
// We count in workflows that are getting evicted because we are out of threads in workflow thread
152+
// pool. (makes sense)
153+
// We count in workflows that are getting "evicted" because a full history from the server is
154+
// received. (kinda makes sense)
155+
// We count in workflows that are getting "evicted" because they are done. But only if they were
156+
// added to the cache. (doesn't make sense)
157+
// We DON'T count in workflows that are getting "evicted" because the cache has overflowed. (doesn't
158+
// make sense)
159+
// TODO revisit implementation of this metric in Go and if it makes sense there, fix the Java
160+
// version.
161+
// Otherwise deprecate it everywhere and remove from docs.
145162
public static final String STICKY_CACHE_TOTAL_FORCED_EVICTION =
146163
TEMPORAL_METRICS_PREFIX + "sticky_cache_total_forced_eviction";
164+
// tagged with namespace, task_queue, worker_type, workflow_type
147165
public static final String STICKY_CACHE_THREAD_FORCED_EVICTION =
148166
TEMPORAL_METRICS_PREFIX + "sticky_cache_thread_forced_eviction";
149-
// gauge
167+
// gauge, tagged with namespace
150168
public static final String STICKY_CACHE_SIZE = TEMPORAL_METRICS_PREFIX + "sticky_cache_size";
151169
// gauge
152170
public static final String WORKFLOW_ACTIVE_THREAD_COUNT =

temporal-sdk/src/test/java/io/temporal/common/reporter/TestStatsReporter.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,18 @@ public synchronized void assertNoMetric(String name, Map<String, String> tags) {
5555
fail(
5656
"Metric '"
5757
+ metricName
58-
+ "', all reported metrics: \n "
59-
+ String.join("\n ", counters.keySet()));
58+
+ "' was reported, with value: '"
59+
+ counters.get(metricName).get()
60+
+ "'");
6061
}
6162
}
6263

6364
public synchronized void assertCounter(String name, Map<String, String> tags, long expected) {
65+
assertCounter(name, tags, actual -> actual == expected);
66+
}
67+
68+
public synchronized void assertCounter(
69+
String name, Map<String, String> tags, Predicate<Long> expected) {
6470
String metricName = getMetricName(name, tags);
6571
AtomicLong accumulator = counters.get(metricName);
6672
if (accumulator == null) {
@@ -70,7 +76,8 @@ public synchronized void assertCounter(String name, Map<String, String> tags, lo
7076
+ "', reported metrics: \n "
7177
+ String.join("\n ", counters.keySet()));
7278
}
73-
assertEquals(String.valueOf(accumulator.get()), expected, accumulator.get());
79+
long actual = accumulator.get();
80+
assertTrue("" + actual, expected.test(actual));
7481
}
7582

7683
public synchronized void assertGauge(String name, Map<String, String> tags, double expected) {

temporal-sdk/src/test/java/io/temporal/worker/StickyWorkerTest.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedActivities() throws Except
259259
}
260260

261261
@Test
262+
@SuppressWarnings("deprecation")
262263
public void whenStickyIsEnabledThenTheWorkflowIsCachedChildWorkflows() throws Exception {
263264
// Arrange
264265
String taskQueueName = "cachedStickyTest_ChildWorkflows";
@@ -291,8 +292,21 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedChildWorkflows() throws Ex
291292
.put(MetricsTag.WORKFLOW_TYPE, "TestWorkflow1")
292293
.build();
293294
metricsScope.close(); // Flush metrics
294-
reporter.assertCounter(MetricsType.STICKY_CACHE_HIT, tags, 2);
295+
296+
// making sure none of the workflow tasks came with a full history (after timeout from the sticky
297+
// queue) which caused a forced eviction from the cache. 1 eviction comes from the finishing of
298+
// the parent workflow and eviction of it from the cache.
299+
// TODO feel free to remove this assertion if refactoring out STICKY_CACHE_TOTAL_FORCED_EVICTION
300+
// metric. It has been added just as an additional verification to investigate a flaky test
301+
reporter.assertCounter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION, tags, 1);
302+
295303
reporter.assertNoMetric(MetricsType.STICKY_CACHE_MISS, tags);
304+
// It's valid for the server to schedule a workflow task
305+
// - after the child is started and before the child is completed
306+
// - or after it was completed
307+
// Depending on that, there will be one or two additional workflow tasks.
308+
// So both 1 and 2 are valid here. The main thing that matters is 0 cache misses.
309+
reporter.assertCounter(MetricsType.STICKY_CACHE_HIT, tags, a -> a == 1 || a == 2);
296310
// Finish Workflow
297311
wrapper.close();
298312
}

temporal-serviceclient/src/main/java/io/temporal/serviceclient/MetricsTag.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,17 @@
2929
import java.util.concurrent.ConcurrentMap;
3030

3131
public class MetricsTag {
32-
public static final String ACTIVITY_TYPE = "activity_type";
3332
public static final String NAMESPACE = "namespace";
3433
public static final String TASK_QUEUE = "task_queue";
34+
public static final String WORKER_TYPE = "worker_type";
35+
36+
public static final String ACTIVITY_TYPE = "activity_type";
3537
public static final String WORKFLOW_TYPE = "workflow_type";
3638
public static final String SIGNAL_NAME = "signal_name";
3739
public static final String QUERY_TYPE = "query_type";
3840
public static final String STATUS_CODE = "status_code";
3941
public static final String EXCEPTION = "exception";
4042
public static final String OPERATION_NAME = "operation";
41-
public static final String WORKER_TYPE = "worker_type";
4243

4344
/** Used to pass metrics scope to the interceptor */
4445
public static final CallOptions.Key<Scope> METRICS_TAGS_CALL_OPTIONS_KEY =

0 commit comments

Comments
 (0)