1212from metaflow .parameters import deploy_time_eval
1313from metaflow .util import compress_list , dict_to_cli_options , to_pascalcase
1414from metaflow .metaflow_config import SFN_IAM_ROLE , \
15- EVENTS_SFN_ACCESS_IAM_ROLE , SFN_DYNAMO_DB_TABLE , SFN_DYNAMO_DB_REGION
15+ EVENTS_SFN_ACCESS_IAM_ROLE , SFN_DYNAMO_DB_TABLE
1616
1717from .step_functions_client import StepFunctionsClient
1818from .event_bridge_client import EventBridgeClient
@@ -396,26 +396,33 @@ def _batch(self, node):
396396 if self .graph [parent ].type == 'foreach' :
397397 attrs ['split_parent_task_id_%s.$' % parent ] = \
398398 '$.Parameters.split_parent_task_id_%s' % parent
399- elif node .type == 'join' and \
400- self .graph [node .split_parents [- 1 ]].type == 'foreach' :
401- # A foreach join only gets one set of input from the parent
402- # tasks. We filter the Map state to only output `$.[0]`,
403- # since we don't need any of the other outputs, that
404- # information is available to us from AWS DynamoDB. This
405- # has a nice side-effect of making our foreach splits
406- # infinitely scalable because otherwise we would be bounded
407- # by the 32K state limit for the outputs. So, instead of
408- # referencing `Parameters` fields by index (like in
409- # `split-and`), we can just reference them directly.
410- attrs ['split_parent_task_id_%s.$' % \
411- node .split_parents [- 1 ]] = \
412- '$.Parameters.split_parent_task_id_%s' % \
413- node .split_parents [- 1 ]
414- for parent in node .split_parents [:- 1 ]:
415- if self .graph [parent ].type == 'foreach' :
416- attrs ['split_parent_task_id_%s.$' % parent ] = \
399+ elif node .type == 'join' :
400+ if self .graph [node .split_parents [- 1 ]].type == 'foreach' :
401+ # A foreach join only gets one set of input from the
402+ # parent tasks. We filter the Map state to only output
403+ # `$.[0]`, since we don't need any of the other outputs,
404+ # that information is available to us from AWS DynamoDB.
405+ # This has a nice side-effect of making our foreach
406+ # splits infinitely scalable because otherwise we would
407+ # be bounded by the 32K state limit for the outputs. So,
408+ # instead of referencing `Parameters` fields by index
409+ # (like in `split-and`), we can just reference them
410+ # directly.
411+ attrs ['split_parent_task_id_%s.$' % \
412+ node .split_parents [- 1 ]] = \
417413 '$.Parameters.split_parent_task_id_%s' % \
418- parent
414+ node .split_parents [- 1 ]
415+ for parent in node .split_parents [:- 1 ]:
416+ if self .graph [parent ].type == 'foreach' :
417+ attrs ['split_parent_task_id_%s.$' % parent ] = \
418+ '$.Parameters.split_parent_task_id_%s' % \
419+ parent
420+ else :
421+ for parent in node .split_parents :
422+ if self .graph [parent ].type == 'foreach' :
423+ attrs ['split_parent_task_id_%s.$' % parent ] = \
424+ '$.[0].Parameters.split_parent_task_id_%s' % \
425+ parent
419426 else :
420427 for parent in node .split_parents :
421428 if self .graph [parent ].type == 'foreach' :
@@ -426,7 +433,10 @@ def _batch(self, node):
426433 # next transition is to a foreach join, so that the
427434 # stepfunctions decorator can write the mapping for input path
428435 # to DynamoDb.
429- if any (self .graph [n ].type == 'join' for n in node .out_funcs ):
436+ if any (self .graph [n ].type == 'join' and \
437+ self .graph [self .graph [n ].split_parents [- 1 ]].type == \
438+ 'foreach'
439+ for n in node .out_funcs ):
430440 env ['METAFLOW_SPLIT_PARENT_TASK_ID_FOR_FOREACH_JOIN' ] = \
431441 attrs ['split_parent_task_id_%s.$' % \
432442 self .graph [node .out_funcs [0 ]].split_parents [- 1 ]]
@@ -450,14 +460,22 @@ def _batch(self, node):
450460 metaflow_version ['production_token' ] = self .production_token
451461 env ['METAFLOW_VERSION' ] = json .dumps (metaflow_version )
452462
453- # Set AWS DynamoDb Table Name/Region for state tracking for for-eaches
463+ # Set AWS DynamoDb Table Name for state tracking for for-eaches.
464+ # There are three instances when metaflow runtime directly interacts
465+ # with AWS DynamoDB.
466+ # 1. To set the cardinality of foreaches (which is subsequently
467+ # read prior to the instantiation of the Map state by AWS Step
468+ # Functions).
469+ # 2. To set the input paths from the parent steps of a foreach join.
470+ # 3. To read the input paths in a foreach join.
454471 if node .type == 'foreach' or \
455472 (node .is_inside_foreach and \
456- any (self .graph [n ].type == 'join' for n in node .out_funcs )) or \
473+ any (self .graph [n ].type == 'join' and \
474+ self .graph [self .graph [n ].split_parents [- 1 ]].type == \
475+ 'foreach' for n in node .out_funcs )) or \
457476 (node .type == 'join' and \
458477 self .graph [node .split_parents [- 1 ]].type == 'foreach' ):
459478 env ['METAFLOW_SFN_DYNAMO_DB_TABLE' ] = SFN_DYNAMO_DB_TABLE
460- env ['METAFLOW_SFN_DYNAMO_DB_REGION' ] = SFN_DYNAMO_DB_REGION
461479
462480 # Resolve AWS Batch resource requirements.
463481 batch_deco = [deco for deco in node .decorators
0 commit comments