
Commit 45c0a46

[SPARK-53941][SS] Support AQE in stateless streaming workloads
### What changes were proposed in this pull request?

This PR proposes to support AQE in stateless streaming workloads. We have been disabling it due to the incompatibility with stateful operators, but that is arguably too restrictive, given that shuffles are still triggered in stateless streaming workloads and stream-static joins can benefit from AQE.

Note that AQE performs re-optimization, which re-plans by reapplying optimization and physical planning to the logical link (the optimized plan) of each stage. An IncrementalExecution instance may not be available during AQE re-optimization (e.g. with the ForeachBatch sink), hence streaming-specific physical planning rules are not compatible with AQE re-optimization. These rules are reserved for initializing stateful operators, so stateless workloads are still safe to go through the full AQE pipeline.

Worth mentioning that AQE is not enabled for 1) continuous mode (and the upcoming real-time mode) and 2) stateful workloads:

* continuous mode (and real-time mode): AQE does not make sense here, since stages run concurrently.
* stateful workloads: AQE cannot change the number of partitions in a stateful operator, and even if it could, repartitioning state would be costly, and we should not decide it per batch based on a specific batch's data distribution.

### Why are the changes needed?

There are still various cases where stateless operators trigger a shuffle (e.g. stream-static join), and these operators have the same characteristics as batch queries, for which AQE has been battle-tested and has proven its usefulness for a long time.

### Does this PR introduce _any_ user-facing change?

Yes, AQE will be enabled in stateless streaming workloads. Since AQE is enabled by default, the change takes effect for stateless streaming queries regardless of whether the query starts on the new Spark version or is upgraded from an older one. This PR also documents the change in the migration guide.

### How was this patch tested?

Existing tests will run with AQE enabled if the query is stateless.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52642 from HeartSaVioR/WIP-AQE-in-stateless-streaming-query.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
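As an illustration of the kind of workload this change targets (not part of the commit), a minimal sketch of a stateless stream-static join follows. All names, paths, and column choices below are hypothetical placeholders:

```scala
// Sketch: a stateless streaming query with a stream-static join. The join
// triggers a shuffle on the static side, which AQE can now re-optimize per
// microbatch (e.g. coalescing small shuffle partitions). No stateful operator
// is involved, so AQE stays enabled under this change.
// The path "/tmp/dim" and the join column are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Streaming side: built-in rate source (columns: timestamp, value).
val stream = spark.readStream.format("rate").load()

// Static side: an ordinary batch DataFrame.
val dim = spark.read.parquet("/tmp/dim")

// Stateless stream-static join; no state is kept across microbatches.
val query = stream.join(dim, Seq("value"))
  .writeStream
  .format("console")
  .start()
```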
1 parent bcb0817 commit 45c0a46

16 files changed

Lines changed: 411 additions & 125 deletions


common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java

Lines changed: 1 addition & 0 deletions
```diff
@@ -44,6 +44,7 @@ public enum LogKeys implements LogKey {
   APP_ID,
   APP_NAME,
   APP_STATE,
+  AQE_PLAN,
   ARCHIVE_NAME,
   ARGS,
   ARTIFACTS,
```

docs/streaming/ss-migration-guide.md

Lines changed: 4 additions & 0 deletions
```diff
@@ -23,6 +23,10 @@ Note that this migration guide describes the items specific to Structured Stream
 Many items of SQL migration can be applied when migrating Structured Streaming to higher versions.
 Please refer [Migration Guide: SQL, Datasets and DataFrame](../sql-migration-guide.html).
 
+## Upgrading from Structured Streaming 4.0 to 4.1
+
+- Since Spark 4.1, AQE is supported for stateless workloads, and it could affect the behavior of the query after upgrade (especially since AQE is turned on by default). In general, it helps to achieve better performance including resolution of skewed partition, but you can turn off AQE via changing `spark.sql.adaptive.enabled` to `false` to restore the behavior if you see regression.
+
 ## Upgrading from Structured Streaming 3.5 to 4.0
 
 - Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)
```
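The escape hatch described in the migration guide note is a one-line config change. A minimal sketch, assuming an existing `spark` session (config fragment only):

```scala
// Restore the pre-4.1 behavior for a streaming query by turning AQE off
// before starting the query.
spark.conf.set("spark.sql.adaptive.enabled", "false")

// Or set it when building the session:
// SparkSession.builder().config("spark.sql.adaptive.enabled", "false")
```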

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -52,7 +52,9 @@ class SparkPlanner(val session: SparkSession, val experimentalMethods: Experimen
       InMemoryScans ::
       SparkScripts ::
       Pipelines ::
-      BasicOperators :: Nil)
+      BasicOperators ::
+      // Need to be here since users can specify withWatermark in stateless streaming query.
+      EventTimeWatermarkStrategy :: Nil)
 
   /**
    * Override to add extra planning strategies to the planner. These strategies are tried after
```

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 13 additions & 7 deletions
```diff
@@ -421,13 +421,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
-  /**
-   * Used to plan streaming aggregation queries that are computed incrementally as part of a
-   * [[org.apache.spark.sql.streaming.StreamingQuery]]. Currently this rule is injected into the
-   * planner on-demand, only when planning in a
-   * [[org.apache.spark.sql.execution.streaming.StreamExecution]]
-   */
-  object StatefulAggregationStrategy extends Strategy {
+  object EventTimeWatermarkStrategy extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case _ if !plan.isStreaming => Nil
 
@@ -445,6 +439,18 @@
           "Please report your query to Spark user mailing list.")
       }
       UpdateEventTimeColumnExec(columnName, delay.get, None, planLater(child)) :: Nil
+    }
+  }
+
+  /**
+   * Used to plan streaming aggregation queries that are computed incrementally as part of a
+   * [[org.apache.spark.sql.streaming.StreamingQuery]]. Currently this rule is injected into the
+   * planner on-demand, only when planning in a
+   * [[org.apache.spark.sql.execution.streaming.StreamExecution]]
+   */
+  object StatefulAggregationStrategy extends Strategy {
+    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case _ if !plan.isStreaming => Nil
 
       case PhysicalAggregation(
           namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) =>
```

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala

Lines changed: 11 additions & 3 deletions
```diff
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedC
 import org.apache.spark.sql.execution.datasources.V1WriteCommand
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.Exchange
+import org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOperator
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -55,6 +56,15 @@ case class InsertAdaptiveSparkPlan(
     case c: DataWritingCommandExec
         if !c.cmd.isInstanceOf[V1WriteCommand] || !conf.plannedWriteEnabled =>
       c.copy(child = apply(c.child))
+    // SPARK-53941: Do not apply AQE for stateful streaming workloads. From recent change of shuffle
+    // origin for shuffle being added from stateful operator, we anticipate stateful operator to
+    // work with AQE. But we want to make the adoption of AQE be gradual, to have a risk under
+    // control. Note that we will disable the value of AQE config explicitly in streaming engine,
+    // but also introduce this pattern here for defensive programming.
+    case _ if plan.exists {
+        case _: StatefulOperator => true
+        case _ => false
+      } => plan
     case _ if shouldApplyAQE(plan, isSubquery) =>
       if (supportAdaptive(plan)) {
         try {
@@ -114,9 +124,7 @@
   }
 
   private def supportAdaptive(plan: SparkPlan): Boolean = {
-    sanityCheck(plan) &&
-      !plan.logicalLink.exists(_.isStreaming) &&
-      plan.children.forall(supportAdaptive)
+    sanityCheck(plan) && plan.children.forall(supportAdaptive)
   }
 
   private def sanityCheck(plan: SparkPlan): Boolean =
```
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryPlanTraverseHelper.scala

Lines changed: 61 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming

import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

/**
 * A utility object with methods to traverse the query plan for a streaming query.
 * This is used for patterns of traversal which are repeated in multiple places.
 */
object StreamingQueryPlanTraverseHelper extends Logging {
  def collectFromUnfoldedPlan[B](
      executedPlan: SparkPlan)(
      pf: PartialFunction[SparkPlan, B]): Seq[B] = {
    executedPlan.flatMap {
      // InMemoryTableScanExec is a node to represent a cached plan. The node has an underlying
      // actual executed plan, which we should traverse to collect the required information.
      case s: InMemoryTableScanExec => collectFromUnfoldedPlan(s.relation.cachedPlan)(pf)

      // The AQE physical node contains the executed plan; pick that plan.
      // In most cases, the AQE physical node is expected to contain the final plan, which is
      // appropriate for the caller.
      // Even if it does not contain the final plan (for whatever reason), we just provide the
      // plan as best effort, as there is no better way around.
      case a: AdaptiveSparkPlanExec =>
        if (!a.isFinalPlan) {
          logWarning(log"AQE plan is captured, but the executed plan in AQE plan is not " +
            log"the final one. Providing incomplete executed plan. AQE plan: ${MDC(
              LogKeys.AQE_PLAN, a)}")
        }
        collectFromUnfoldedPlan(a.executedPlan)(pf)

      // There are several AQE-specific leaf nodes which cover a shuffle. We should pick the
      // underlying plan of these nodes, since the underlying plan has the actual executed
      // nodes from which we want to collect metrics.
      case e: QueryStageExec => collectFromUnfoldedPlan(e.plan)(pf)

      case p if pf.isDefinedAt(p) => Seq(pf(p))
      case _ => Seq.empty[B]
    }
  }
}
```
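The reason for this helper is that AQE and caching nodes hide their real plan from an ordinary `collect`. A toy, self-contained model of the same "unfolded" traversal (not Spark code; node types are stand-ins for `AdaptiveSparkPlanExec` / `QueryStageExec` / `InMemoryTableScanExec`):

```scala
// Wrapper nodes hide a nested plan that a plain children-only collect would
// miss, so the traversal recurses into them explicitly.
sealed trait Node { def children: Seq[Node] }
case class Leaf(name: String) extends Node { val children: Seq[Node] = Nil }
case class Branch(children: Seq[Node]) extends Node
// Hides `inner` from `children`, like an AQE query stage hiding its plan.
case class Wrapper(inner: Node) extends Node { val children: Seq[Node] = Nil }

def collectUnfolded[B](n: Node)(pf: PartialFunction[Node, B]): Seq[B] = n match {
  // Unfold the wrapper instead of treating it as a leaf.
  case Wrapper(inner) => collectUnfolded(inner)(pf)
  case p =>
    val here = if (pf.isDefinedAt(p)) Seq(pf(p)) else Seq.empty[B]
    here ++ p.children.flatMap(c => collectUnfolded(c)(pf))
}

val plan = Branch(Seq(Wrapper(Branch(Seq(Leaf("scan1")))), Leaf("scan2")))
val names = collectUnfolded(plan) { case Leaf(n) => n }
// names contains both leaves; a children-only collect would miss "scan1".
```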

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala

Lines changed: 7 additions & 5 deletions
```diff
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, MergingSessi
 import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataPartitionReader
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
 import org.apache.spark.sql.execution.python.streaming.{FlatMapGroupsInPandasWithStateExec, TransformWithStateInPySparkExec}
+import org.apache.spark.sql.execution.streaming.StreamingQueryPlanTraverseHelper
 import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqMetadata}
 import org.apache.spark.sql.execution.streaming.operators.stateful.{SessionWindowStateStoreRestoreExec, SessionWindowStateStoreSaveExec, StatefulOperator, StatefulOperatorStateInfo, StateStoreRestoreExec, StateStoreSaveExec, StateStoreWriter, StreamingDeduplicateExec, StreamingDeduplicateWithinWatermarkExec, StreamingGlobalLimitExec, StreamingLocalLimitExec, UpdateEventTimeColumnExec}
 import org.apache.spark.sql.execution.streaming.operators.stateful.flatmapgroupswithstate.FlatMapGroupsWithStateExec
@@ -638,10 +639,11 @@
   def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
     val tentativeBatchId = currentBatchId + 1
     watermarkPropagator.propagate(tentativeBatchId, executedPlan, newMetadata.batchWatermarkMs)
-    executedPlan.collect {
-      case p: StateStoreWriter => p.shouldRunAnotherBatch(
-        watermarkPropagator.getInputWatermarkForEviction(tentativeBatchId,
-          p.stateInfo.get.operatorId))
-    }.exists(_ == true)
+    StreamingQueryPlanTraverseHelper
+      .collectFromUnfoldedPlan(executedPlan) {
+        case p: StateStoreWriter => p.shouldRunAnotherBatch(
+          watermarkPropagator.getInputWatermarkForEviction(tentativeBatchId,
+            p.stateInfo.get.operatorId))
+      }.exists(_ == true)
   }
 }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala

Lines changed: 32 additions & 1 deletion
```diff
@@ -27,7 +27,7 @@ import org.apache.spark.internal.LogKeys
 import org.apache.spark.internal.LogKeys.BATCH_ID
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, FileSourceMetadataAttribute, LocalTimestamp}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, GlobalLimit, LeafNode, LocalRelation, LogicalPlan, Project, StreamSourceAwareLogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LeafNode, LocalRelation, LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState, TransformWithStateInPySpark}
 import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
 import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
 import org.apache.spark.sql.catalyst.util.truncatedString
@@ -344,9 +344,40 @@ class MicroBatchExecution(
     setLatestExecutionContext(execCtx)
 
     populateStartOffsets(execCtx, sparkSessionForStream)
+
+    // SPARK-53941: This code path is executed for the first batch, regardless of whether it's a
+    // fresh new run or restart.
+    disableAQESupportInStatelessIfUnappropriated(sparkSessionForStream)
+
     logInfo(log"Stream started from ${MDC(LogKeys.STREAMING_OFFSETS_START, execCtx.startOffsets)}")
     execCtx
   }
+
+  private def disableAQESupportInStatelessIfUnappropriated(
+      sparkSessionToRunBatches: SparkSession): Unit = {
+    def containsStatefulOperator(p: LogicalPlan): Boolean = {
+      p.exists {
+        case node: Aggregate if node.isStreaming => true
+        case node: Deduplicate if node.isStreaming => true
+        case node: DeduplicateWithinWatermark if node.isStreaming => true
+        case node: Distinct if node.isStreaming => true
+        case node: Join if node.left.isStreaming && node.right.isStreaming => true
+        case node: FlatMapGroupsWithState if node.isStreaming => true
+        case node: FlatMapGroupsInPandasWithState if node.isStreaming => true
+        case node: TransformWithState if node.isStreaming => true
+        case node: TransformWithStateInPySpark if node.isStreaming => true
+        case node: GlobalLimit if node.isStreaming => true
+        case _ => false
+      }
+    }
+
+    if (containsStatefulOperator(analyzedPlan)) {
+      // SPARK-53941: We disable AQE for stateful workloads as of now.
+      logWarning(log"Disabling AQE since AQE is not supported in stateful workloads.")
+      sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+    }
+  }
+
   /**
    * Repeatedly attempts to run batches as data arrives.
    */
```
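The detection above is a pattern match over the analyzed logical plan, keyed on whether a node both is a stateful operator type and is streaming. A toy, self-contained model of that predicate (not Spark code; node types are stand-ins for Spark's logical operators):

```scala
// Walk a logical-plan-like tree and report whether any node would keep state
// across microbatches; only *streaming* stateful nodes count.
sealed trait Plan {
  def children: Seq[Plan]
  def isStreaming: Boolean
  def exists(f: Plan => Boolean): Boolean = f(this) || children.exists(_.exists(f))
}
case class Scan(isStreaming: Boolean) extends Plan { val children: Seq[Plan] = Nil }
case class Aggregate(child: Plan) extends Plan {
  val children = Seq(child); val isStreaming = child.isStreaming
}
case class Project(child: Plan) extends Plan {
  val children = Seq(child); val isStreaming = child.isStreaming
}

// Mirrors the shape of containsStatefulOperator for a single operator type.
def containsStateful(p: Plan): Boolean = p.exists {
  case a: Aggregate if a.isStreaming => true
  case _ => false
}

// A streaming aggregation is stateful (AQE would be disabled); a projection
// over a stream, or a batch aggregation, is not.
```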

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala

Lines changed: 21 additions & 15 deletions
```diff
@@ -37,6 +37,7 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, ReportsSinkMetrics, ReportsSourceMetrics, SparkDataStream}
 import org.apache.spark.sql.execution.{QueryExecution, StreamSourceAwareSparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress}
+import org.apache.spark.sql.execution.streaming.StreamingQueryPlanTraverseHelper
 import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqMetadata
 import org.apache.spark.sql.execution.streaming.operators.stateful.{EventTimeWatermarkExec, StateStoreWriter}
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
@@ -443,8 +444,8 @@ abstract class ProgressContext(
 
     val sources = newData.keys.toSet
 
-    val sourceToInputRowsTuples = lastExecution.executedPlan
-      .collect {
+    val sourceToInputRowsTuples = StreamingQueryPlanTraverseHelper
+      .collectFromUnfoldedPlan(lastExecution.executedPlan) {
         case node: StreamSourceAwareSparkPlan if node.getStream.isDefined =>
           val numRows = node.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
           node.getStream.get -> numRows
@@ -502,12 +503,13 @@
     // It's possible that multiple DataSourceV2ScanExec instances may refer to the same source
     // (can happen with self-unions or self-joins). This means the source is scanned multiple
     // times in the query, we should count the numRows for each scan.
-    val sourceToInputRowsTuples = lastExecution.executedPlan.collect {
-      case s: MicroBatchScanExec =>
-        val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
-        val source = s.stream
-        source -> numRows
-    }
+    val sourceToInputRowsTuples = StreamingQueryPlanTraverseHelper
+      .collectFromUnfoldedPlan(lastExecution.executedPlan) {
+        case s: MicroBatchScanExec =>
+          val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+          val source = s.stream
+          source -> numRows
+      }
     logDebug("Source -> # input rows\n\t" + sourceToInputRowsTuples.mkString("\n\t"))
     sumRows(sourceToInputRowsTuples)
   } else {
@@ -544,7 +546,10 @@
     val finalLogicalPlan = unrollCTE(lastExecution.logical)
 
     val allLogicalPlanLeaves = finalLogicalPlan.collectLeaves() // includes non-streaming
-    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
+    val allExecPlanLeaves = StreamingQueryPlanTraverseHelper
+      .collectFromUnfoldedPlan(lastExecution.executedPlan) {
+        case p if p.children.isEmpty => p
+      }
     if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
       val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
         case (_, ep: MicroBatchScanExec) =>
@@ -580,10 +585,11 @@
   private def extractStateOperatorMetrics(
       lastExecution: IncrementalExecution): Seq[StateOperatorProgress] = {
     assert(lastExecution != null, "lastExecution is not available")
-    lastExecution.executedPlan.collect {
-      case p if p.isInstanceOf[StateStoreWriter] =>
-        p.asInstanceOf[StateStoreWriter].getProgress()
-    }
+    StreamingQueryPlanTraverseHelper
+      .collectFromUnfoldedPlan(lastExecution.executedPlan) {
+        case p if p.isInstanceOf[StateStoreWriter] =>
+          p.asInstanceOf[StateStoreWriter].getProgress()
+      }
   }
 
   /** Extracts statistics from the most recent query execution. */
@@ -609,8 +615,8 @@
       return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp, sinkOutput)
     }
 
-    val eventTimeStats = lastExecution.executedPlan
-      .collect {
+    val eventTimeStats = StreamingQueryPlanTraverseHelper
+      .collectFromUnfoldedPlan(lastExecution.executedPlan) {
         case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
           val stats = e.eventTimeStats.value
           Map(
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala

Lines changed: 7 additions & 2 deletions
```diff
@@ -43,6 +43,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLi
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate, Write}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
+import org.apache.spark.sql.execution.streaming.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitLog, OffsetSeqLog, OffsetSeqMetadata}
 import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperator, StateStoreWriter}
 import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchUserFuncException, ForeachUserFuncException}
@@ -304,8 +305,6 @@ abstract class StreamExecution(
 
     // While active, repeatedly attempt to run batches.
     sparkSessionForStream.withActive {
-      // Adaptive execution can change num shuffle partitions, disallow
-      sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
       // Disable cost-based join optimization as we do not want stateful operations
       // to be rearranged
       sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
@@ -315,6 +314,12 @@
       sparkSessionForStream.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key,
         "false")
 
+      if (trigger.isInstanceOf[ContinuousTrigger]) {
+        // SPARK-53941: AQE does not make sense for continuous processing, disable it.
+        logWarning("Disabling AQE since the query runs with continuous mode.")
+        sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+      }
+
       getLatestExecutionContext().updateStatusMessage("Initializing sources")
       // force initialization of the logical plan so that the sources can be created
       logicalPlan
```
