diff --git a/src/batch_python_tutorial_ffmpeg.py b/src/batch_python_tutorial_ffmpeg.py index 1a7095d..d875cdc 100644 --- a/src/batch_python_tutorial_ffmpeg.py +++ b/src/batch_python_tutorial_ffmpeg.py @@ -23,6 +23,7 @@ # unique to your accounts. These are used when constructing connection strings # for the Batch and Storage client objects. + def query_yes_no(question, default="yes"): """ Prompts the user for yes/no input, displaying the specified question text. @@ -92,11 +93,10 @@ def upload_file_to_container(block_blob_client, container_name, file_path): block_blob_client.create_blob_from_path(container_name, blob_name, file_path) - + # Obtain the SAS token for the container. sas_token = get_container_sas_token(block_blob_client, - container_name, azureblob.BlobPermissions.READ) - + container_name, azureblob.BlobPermissions.READ) sas_url = block_blob_client.make_blob_url(container_name, blob_name, @@ -105,6 +105,7 @@ def upload_file_to_container(block_blob_client, container_name, file_path): return batchmodels.ResourceFile(file_path=blob_name, http_url=sas_url) + def get_container_sas_token(block_blob_client, container_name, blob_permissions): """ @@ -130,9 +131,8 @@ def get_container_sas_token(block_blob_client, return container_sas_token - def get_container_sas_url(block_blob_client, - container_name, blob_permissions): + container_name, blob_permissions): """ Obtains a shared access signature URL that provides write access to the ouput container to which the tasks will upload their output. @@ -146,10 +146,11 @@ def get_container_sas_url(block_blob_client, """ # Obtain the SAS token for the container. sas_token = get_container_sas_token(block_blob_client, - container_name, azureblob.BlobPermissions.WRITE) + container_name, azureblob.BlobPermissions.WRITE) # Construct SAS URL for the container - container_sas_url = "https://{}.blob.core.windows.net/{}?{}".format(config._STORAGE_ACCOUNT_NAME, container_name, sas_token) + container_sas_url = "https://{}.blob.core.windows.net/{}?{}".format( + config._STORAGE_ACCOUNT_NAME, container_name, sas_token) return container_sas_url @@ -174,16 +175,16 @@ def create_pool(batch_service_client, pool_id): # The start task installs ffmpeg on each node from an available repository, using # an administrator user identity. - + new_pool = batch.models.PoolAddParameter( id=pool_id, virtual_machine_configuration=batchmodels.VirtualMachineConfiguration( image_reference=batchmodels.ImageReference( - publisher="Canonical", - offer="UbuntuServer", - sku="18.04-LTS", - version="latest" - ), + publisher="Canonical", + offer="UbuntuServer", + sku="18.04-LTS", + version="latest" + ), node_agent_sku_id="batch.node.ubuntu 18.04"), vm_size=config._POOL_VM_SIZE, target_dedicated_nodes=config._DEDICATED_POOL_NODE_COUNT, @@ -193,13 +194,14 @@ def create_pool(batch_service_client, pool_id): wait_for_success=True, user_identity=batchmodels.UserIdentity( auto_user=batchmodels.AutoUserSpecification( - scope=batchmodels.AutoUserScope.pool, - elevation_level=batchmodels.ElevationLevel.admin)), - ) + scope=batchmodels.AutoUserScope.pool, + elevation_level=batchmodels.ElevationLevel.admin)), + ) ) batch_service_client.pool.add(new_pool) + def create_job(batch_service_client, job_id, pool_id): """ Creates a job with the specified ID, associated with the specified pool. @@ -216,7 +218,8 @@ def create_job(batch_service_client, job_id, pool_id): pool_info=batch.models.PoolInformation(pool_id=pool_id)) batch_service_client.job.add(job) - + + def add_tasks(batch_service_client, job_id, input_files, output_container_sas_url): """ Adds a task for each input file in the collection to the specified job. @@ -234,26 +237,26 @@ def add_tasks(batch_service_client, job_id, input_files, output_container_sas_ur tasks = list() - for idx, input_file in enumerate(input_files): - input_file_path=input_file.file_path - output_file_path="".join((input_file_path).split('.')[:-1]) + '.mp3' - command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(input_file_path, output_file_path) + for idx, input_file in enumerate(input_files): + input_file_path = input_file.file_path + output_file_path = "".join((input_file_path).split('.')[:-1]) + '.mp3' + command = "/bin/bash -c \"ffmpeg -i {} {} \"".format( + input_file_path, output_file_path) tasks.append(batch.models.TaskAddParameter( id='Task{}'.format(idx), command_line=command, resource_files=[input_file], output_files=[batchmodels.OutputFile( - file_pattern=output_file_path, - destination=batchmodels.OutputFileDestination( - container=batchmodels.OutputFileBlobContainerDestination( - container_url=output_container_sas_url)), - upload_options=batchmodels.OutputFileUploadOptions( - upload_condition=batchmodels.OutputFileUploadCondition.task_success))] - ) - ) + file_pattern=output_file_path, + destination=batchmodels.OutputFileDestination( + container=batchmodels.OutputFileBlobContainerDestination( + container_url=output_container_sas_url)), + upload_options=batchmodels.OutputFileUploadOptions( + upload_condition=batchmodels.OutputFileUploadCondition.task_success))] + ) + ) batch_service_client.task.add_collection(job_id, tasks) - def wait_for_tasks_to_complete(batch_service_client, job_id, timeout): """ @@ -289,7 +292,6 @@ def wait_for_tasks_to_complete(batch_service_client, job_id, timeout): "timeout period of " + str(timeout)) - if __name__ == '__main__': start_time = datetime.datetime.now().replace(microsecond=0) @@ -299,14 +301,13 @@ def wait_for_tasks_to_complete(batch_service_client, job_id, timeout): # Create the blob client, for use in obtaining references to # blob storage containers and uploading files to containers. - blob_client = azureblob.BlockBlobService( account_name=config._STORAGE_ACCOUNT_NAME, account_key=config._STORAGE_ACCOUNT_KEY) # Use the blob client to create the containers in Azure Storage if they # don't yet exist. - + input_container_name = 'input' output_container_name = 'output' blob_client.create_container(input_container_name, fail_on_exist=False) @@ -314,22 +315,23 @@ def wait_for_tasks_to_complete(batch_service_client, job_id, timeout): print('Container [{}] created.'.format(input_container_name)) print('Container [{}] created.'.format(output_container_name)) - # Create a list of all MP4 files in the InputFiles directory. + # Create a list of all MP4 files in the InputFiles directory. input_file_paths = [] - - for folder, subs, files in os.walk(os.path.join(sys.path[0],'InputFiles')): + + for folder, subs, files in os.walk(os.path.join(sys.path[0], 'InputFiles')): for filename in files: if filename.endswith(".mp4"): - input_file_paths.append(os.path.abspath(os.path.join(folder, filename))) + input_file_paths.append(os.path.abspath( + os.path.join(folder, filename))) - # Upload the input files. This is the collection of files that are to be processed by the tasks. + # Upload the input files. This is the collection of files that are to be processed by the tasks. input_files = [ upload_file_to_container(blob_client, input_container_name, file_path) for file_path in input_file_paths] # Obtain a shared access signature URL that provides write access to the output # container to which the tasks will upload their output. - + output_container_sas_url = get_container_sas_url( blob_client, output_container_name, @@ -348,13 +350,14 @@ def wait_for_tasks_to_complete(batch_service_client, job_id, timeout): # Create the pool that will contain the compute nodes that will execute the # tasks. create_pool(batch_client, config._POOL_ID) - + # Create the job that will run the tasks. create_job(batch_client, config._JOB_ID, config._POOL_ID) - # Add the tasks to the job. Pass the input files and a SAS URL + # Add the tasks to the job. Pass the input files and a SAS URL # to the storage container for output files. - add_tasks(batch_client, config._JOB_ID, input_files, output_container_sas_url) + add_tasks(batch_client, config._JOB_ID, + input_files, output_container_sas_url) # Pause execution until tasks reach Completed state. wait_for_tasks_to_complete(batch_client, @@ -362,16 +365,15 @@ def wait_for_tasks_to_complete(batch_service_client, job_id, timeout): datetime.timedelta(minutes=30)) print(" Success! All tasks reached the 'Completed' state within the " - "specified timeout period.") + "specified timeout period.") except batchmodels.BatchErrorException as err: - print_batch_exception(err) - raise + print_batch_exception(err) + raise # Delete input container in storage print('Deleting container [{}]...'.format(input_container_name)) blob_client.delete_container(input_container_name) - # Print out some timing info end_time = datetime.datetime.now().replace(microsecond=0) diff --git a/src/config.py b/src/config.py index b90bdec..a1fbc7b 100644 --- a/src/config.py +++ b/src/config.py @@ -1,15 +1,15 @@ -#------------------------------------------------------------------------- -# -# THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, -# EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES +# ------------------------------------------------------------------------- +# +# THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, +# EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES # OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE. -#---------------------------------------------------------------------------------- +# ---------------------------------------------------------------------------------- # The example companies, organizations, products, domain names, # e-mail addresses, logos, people, places, and events depicted # herein are fictitious. No association with any real company, # organization, product, domain name, email address, logo, person, # places, or events is intended or should be inferred. -#-------------------------------------------------------------------------- +# -------------------------------------------------------------------------- # Global constant variables (Azure Storage account/Batch details) @@ -19,7 +19,7 @@ # unique to your accounts. These are used when constructing connection strings # for the Batch and Storage client objects. -_BATCH_ACCOUNT_NAME ='' +_BATCH_ACCOUNT_NAME = '' _BATCH_ACCOUNT_KEY = '' _BATCH_ACCOUNT_URL = '' _STORAGE_ACCOUNT_NAME = '' @@ -28,4 +28,4 @@ _DEDICATED_POOL_NODE_COUNT = 0 _LOW_PRIORITY_POOL_NODE_COUNT = 5 _POOL_VM_SIZE = 'STANDARD_A1_v2' -_JOB_ID = 'LinuxFfmpegJob' \ No newline at end of file +_JOB_ID = 'LinuxFfmpegJob'