Skip to content

Commit b64c696

Browse files
committed
Bunch of enhancements
1. Remove the dependency on REGION_NAME for AWS DynamoDB access through AWS Batch. For now, just use the same region as AWS Batch, since there is no way for us to specify the AWS Region for AWS DynamoDB within AWS Step Functions.
2. Handle branches within foreaches.
3. Some code cleanup.
1 parent bbad7ab commit b64c696

5 files changed

Lines changed: 53 additions & 34 deletions

File tree

metaflow/main_cli.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -479,11 +479,6 @@ def yellow(string):
479479
cyan('METAFLOW_SFN_DYNAMO_DB_TABLE:') +
480480
' AWS DynamoDB Table Name for tracking AWS Step '
481481
' Functions metadata')
482-
env_dict['METAFLOW_SFN_DYNAMO_DB_REGION'] =\
483-
click.prompt('\t' +
484-
cyan('METAFLOW_SFN_DYNAMO_DB_REGION:') +
485-
' Region of AWS DynamoDB Table for tracking AWS Step '
486-
' Functions metadata')
487482

488483
# Metadata service configuration.
489484
use_metadata = click.confirm('\nConfigure Metadata Service as default metadata provider?',

metaflow/metaflow_config.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ def from_conf(name, default=None):
103103
SFN_IAM_ROLE = from_conf("METAFLOW_SFN_IAM_ROLE")
104104
# AWS DynamoDb Table name (with partition key - `pathspec` of type string)
105105
SFN_DYNAMO_DB_TABLE = from_conf("METAFLOW_SFN_DYNAMO_DB_TABLE")
106-
SFN_DYNAMO_DB_REGION = from_conf("METAFLOW_SFN_DYNAMO_DB_REGION")
107106
# IAM role for AWS Events with AWS Step Functions access
108107
# https://docs.aws.amazon.com/eventbridge/latest/userguide/auth-and-access-control-eventbridge.html
109108
EVENTS_SFN_ACCESS_IAM_ROLE = from_conf("METAFLOW_EVENTS_SFN_ACCESS_IAM_ROLE")

metaflow/plugins/aws/step_functions/dynamo_db_client.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22
import os
33

44
from metaflow.metaflow_config import get_authenticated_boto3_client, \
5-
SFN_DYNAMO_DB_TABLE, SFN_DYNAMO_DB_REGION
5+
SFN_DYNAMO_DB_TABLE
66

77

88
class DynamoDbClient(object):
99

1010
def __init__(self):
1111
self._client = get_authenticated_boto3_client('dynamodb',
12-
params = {'region_name': SFN_DYNAMO_DB_REGION})
12+
params = {'region_name': self._get_instance_region()})
1313
self.name = SFN_DYNAMO_DB_TABLE
1414

1515
def save_foreach_cardinality(self,
@@ -57,4 +57,9 @@ def get_parent_task_ids_for_foreach_join(self,
5757
ProjectionExpression = 'parent_task_ids_for_foreach_join',
5858
ConsistentRead = True
5959
)
60-
return response['Item']['parent_task_ids_for_foreach_join']['SS']
60+
return response['Item']['parent_task_ids_for_foreach_join']['SS']
61+
62+
def _get_instance_region(self):
63+
return os.popen(
64+
'curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone/'
65+
).read()[:-1]

metaflow/plugins/aws/step_functions/step_functions.py

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from metaflow.parameters import deploy_time_eval
1313
from metaflow.util import compress_list, dict_to_cli_options, to_pascalcase
1414
from metaflow.metaflow_config import SFN_IAM_ROLE, \
15-
EVENTS_SFN_ACCESS_IAM_ROLE, SFN_DYNAMO_DB_TABLE, SFN_DYNAMO_DB_REGION
15+
EVENTS_SFN_ACCESS_IAM_ROLE, SFN_DYNAMO_DB_TABLE
1616

1717
from .step_functions_client import StepFunctionsClient
1818
from .event_bridge_client import EventBridgeClient
@@ -396,26 +396,33 @@ def _batch(self, node):
396396
if self.graph[parent].type == 'foreach':
397397
attrs['split_parent_task_id_%s.$' % parent] = \
398398
'$.Parameters.split_parent_task_id_%s' % parent
399-
elif node.type == 'join' and \
400-
self.graph[node.split_parents[-1]].type == 'foreach':
401-
# A foreach join only gets one set of input from the parent
402-
# tasks. We filter the Map state to only output `$.[0]`,
403-
# since we don't need any of the other outputs, that
404-
# information is available to us from AWS DynamoDB. This
405-
# has a nice side-effect of making our foreach splits
406-
# infinitely scalable because otherwise we would be bounded
407-
# by the 32K state limit for the outputs. So, instead of
408-
# referencing `Parameters` fields by index (like in
409-
# `split-and`), we can just reference them directly.
410-
attrs['split_parent_task_id_%s.$' % \
411-
node.split_parents[-1]] = \
412-
'$.Parameters.split_parent_task_id_%s' % \
413-
node.split_parents[-1]
414-
for parent in node.split_parents[:-1]:
415-
if self.graph[parent].type == 'foreach':
416-
attrs['split_parent_task_id_%s.$' % parent] = \
399+
elif node.type == 'join':
400+
if self.graph[node.split_parents[-1]].type == 'foreach':
401+
# A foreach join only gets one set of input from the
402+
# parent tasks. We filter the Map state to only output
403+
# `$.[0]`, since we don't need any of the other outputs,
404+
# that information is available to us from AWS DynamoDB.
405+
# This has a nice side-effect of making our foreach
406+
# splits infinitely scalable because otherwise we would
407+
# be bounded by the 32K state limit for the outputs. So,
408+
# instead of referencing `Parameters` fields by index
409+
# (like in `split-and`), we can just reference them
410+
# directly.
411+
attrs['split_parent_task_id_%s.$' % \
412+
node.split_parents[-1]] = \
417413
'$.Parameters.split_parent_task_id_%s' % \
418-
parent
414+
node.split_parents[-1]
415+
for parent in node.split_parents[:-1]:
416+
if self.graph[parent].type == 'foreach':
417+
attrs['split_parent_task_id_%s.$' % parent] = \
418+
'$.Parameters.split_parent_task_id_%s' % \
419+
parent
420+
else:
421+
for parent in node.split_parents:
422+
if self.graph[parent].type == 'foreach':
423+
attrs['split_parent_task_id_%s.$' % parent] = \
424+
'$.[0].Parameters.split_parent_task_id_%s' % \
425+
parent
419426
else:
420427
for parent in node.split_parents:
421428
if self.graph[parent].type == 'foreach':
@@ -426,7 +433,10 @@ def _batch(self, node):
426433
# next transition is to a foreach join, so that the
427434
# stepfunctions decorator can write the mapping for input path
428435
# to DynamoDb.
429-
if any(self.graph[n].type == 'join' for n in node.out_funcs):
436+
if any(self.graph[n].type == 'join' and \
437+
self.graph[self.graph[n].split_parents[-1]].type == \
438+
'foreach'
439+
for n in node.out_funcs):
430440
env['METAFLOW_SPLIT_PARENT_TASK_ID_FOR_FOREACH_JOIN'] = \
431441
attrs['split_parent_task_id_%s.$' % \
432442
self.graph[node.out_funcs[0]].split_parents[-1]]
@@ -450,14 +460,22 @@ def _batch(self, node):
450460
metaflow_version['production_token'] = self.production_token
451461
env['METAFLOW_VERSION'] = json.dumps(metaflow_version)
452462

453-
# Set AWS DynamoDb Table Name/Region for state tracking for for-eaches
463+
# Set AWS DynamoDb Table Name for state tracking for for-eaches.
464+
# There are three instances when metaflow runtime directly interacts
465+
# with AWS DynamoDB.
466+
# 1. To set the cardinality of foreaches (which is subsequently
467+
# read prior to the instantiation of the Map state by AWS Step
468+
# Functions).
469+
# 2. To set the input paths from the parent steps of a foreach join.
470+
# 3. To read the input paths in a foreach join.
454471
if node.type == 'foreach' or \
455472
(node.is_inside_foreach and \
456-
any(self.graph[n].type == 'join' for n in node.out_funcs)) or \
473+
any(self.graph[n].type == 'join' and \
474+
self.graph[self.graph[n].split_parents[-1]].type == \
475+
'foreach' for n in node.out_funcs)) or \
457476
(node.type == 'join' and \
458477
self.graph[node.split_parents[-1]].type == 'foreach'):
459478
env['METAFLOW_SFN_DYNAMO_DB_TABLE'] = SFN_DYNAMO_DB_TABLE
460-
env['METAFLOW_SFN_DYNAMO_DB_REGION'] = SFN_DYNAMO_DB_REGION
461479

462480
# Resolve AWS Batch resource requirements.
463481
batch_deco = [deco for deco in node.decorators

metaflow/plugins/aws/step_functions/step_functions_decorator.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ def task_finished(self,
5757
# instead write the task ids from the parent task to DynamoDb and read
5858
# it back in the foreach join
5959
elif graph[step_name].is_inside_foreach and \
60-
any(graph[n].type == 'join' for n in graph[step_name].out_funcs):
60+
any(graph[n].type == 'join' and \
61+
graph[graph[n].split_parents[-1]].type == 'foreach'
62+
for n in graph[step_name].out_funcs):
6163
self._save_parent_task_id_for_foreach_join(
6264
os.environ['METAFLOW_SPLIT_PARENT_TASK_ID_FOR_FOREACH_JOIN'],
6365
os.environ['AWS_BATCH_JOB_ID'])

0 commit comments

Comments
 (0)