Exemplos de manipuladores Python para procedimentos armazenados¶
Como executar tarefas simultâneas com processos de trabalhador¶
Você pode executar tarefas simultâneas usando processos de trabalho do Python. Você pode achar isso útil quando precisar executar tarefas paralelas que aproveitam vários núcleos de CPU em nós do warehouse.
Nota
A Snowflake recomenda que você não use o módulo de multiprocessamento integrado do Python.
Para contornar os casos em que o Bloqueio de intérprete global do Python impede que uma abordagem multitarefa se espalhe por todos os núcleos da CPU, você pode executar tarefas simultâneas usando processos de trabalho separados, em vez de threads.
Você pode fazer isso nos warehouses Snowflake usando a classe joblib da biblioteca Parallel, como no exemplo a seguir.
Nota
O back-end padrão usado para joblib.Parallel difere entre os warehouses padrão Snowflake e otimizados para Snowpark.
Padrão do warehouse:
threadingPadrão do warehouse otimizado para Snowpark:
loky(multiprocessamento)
Você pode substituir a configuração de back-end padrão chamando a função joblib.parallel_backend, como no exemplo a seguir.
Como usar APIs Snowpark para processamento assíncrono¶
Os exemplos a seguir ilustram como você pode usar as APIs Snowpark para iniciar trabalhos filho assíncronos, além de como esses trabalhos se comportam sob diferentes condições.
Verificação do status de um trabalho filho assíncrono¶
No exemplo a seguir, o procedimento checkStatus executa um trabalho filho assíncrono que aguarda 60 segundos. O procedimento então verifica o status do trabalho antes que ele possa ser concluído, então a verificação retorna False.
O código a seguir chama o procedimento.
Cancelamento de um trabalho filho assíncrono¶
No exemplo a seguir, o procedimento cancelJob usa SQL para inserir dados na tabela test_tb com um trabalho filho assíncrono que levaria 10 segundos para terminar. Em seguida, ele cancela o trabalho filho antes que ele termine e os dados tenham sido inseridos.
O código a seguir consulta a tabela test_tb, mas não retorna resultados porque nenhum dado foi inserido.
Espera e bloqueio enquanto um trabalho filho assíncrono é executado¶
No exemplo a seguir, o procedimento blockUntilDone executa um trabalho filho assíncrono que leva 5 segundos para terminar. Usando o método snowflake.snowpark.AsyncJob.result, o procedimento espera e retorna quando o trabalho termina.
O código a seguir chama o procedimento blockUntilDone, que retorna após esperar 5 segundos.
Retorno de um erro após solicitar resultados de um trabalho filho assíncrono inacabado¶
No exemplo a seguir, o procedimento earlyReturn executa um trabalho filho assíncrono que leva 60 segundos para terminar. O procedimento então tenta retornar um DataFrame do resultado do trabalho antes que ele possa ser concluído. O resultado é um erro.
O código a seguir chama o procedimento earlyReturn, retornando o erro.
Conclusão de um trabalho pai antes que um trabalho filho termine, cancelamento do trabalho filho¶
No exemplo a seguir, o procedimento earlyCancelJob executa um trabalho filho assíncrono para inserir dados em uma tabela e leva 10 segundos para terminar. Entretanto, a tarefa dos pais — async_handler — retorna antes que o trabalho filho termine, o que cancela o trabalho filho.
O código a seguir chama o procedimento earlyCancelJob. Em seguida, ele consulta a tabela test_tb, que não retorna nenhum resultado porque nenhum dado foi inserido pelo trabalho filho cancelado.
Lendo arquivos e ativos¶
Como ler um arquivo especificado estaticamente usando IMPORTS¶
Você pode ler um arquivo especificando o nome do arquivo e o nome do estágio na cláusula IMPORTS do comando CREATE PROCEDURE.
Quando você especifica um arquivo na cláusula IMPORTS, o Snowflake copia esse arquivo da área de preparação para o diretório inicial do procedimento armazenado (também chamado de diretório de importação), que é o diretório do qual o procedimento armazenado lê o arquivo.
Snowflake copia arquivos importados para um único diretório. Todos os arquivos nesse diretório devem ter nomes exclusivos, portanto, cada arquivo em sua cláusula IMPORTS deve ter um nome distinto. Isso se aplica mesmo se os arquivos começarem em diferentes estágios ou diferentes subdiretórios dentro de um estágio.
O exemplo a seguir usa um manipulador Python inline que lê um arquivo chamado file.txt de um estágio chamado my_stage. O manipulador recupera a localização do diretório inicial do procedimento armazenado usando o método Python sys._xoptions com a opção do sistema snowflake_import_directory.
O Snowflake lê o arquivo somente uma vez durante a criação do procedimento armazenado, e não o lerá novamente durante a execução do procedimento armazenado se a leitura do arquivo for feita fora do manipulador de destino.
Crie o procedimento armazenado com um manipulador em linha:
Importando um diretório com IMPORTS¶
Você pode importar um diretório usando a cláusula IMPORTS do comando CREATE PROCEDURE.
Nota
O caminho de importação de um diretório deve terminar com uma barra final (
/). Por exemplo,IMPORTS = ('@my_stage/my_dir/').Para renomear um diretório na importação, anexe
/=custom_name/ao caminho da área de preparação. O nome personalizado deve ser um único nome de diretório, não um caminho. Por exemplo,IMPORTS = ('@my_stage/my_dir/=custom_name/').A importação de diretórios não é compatível com Native Apps.
O exemplo a seguir importa um diretório chamado my_dir de uma área de preparação chamada my_stage e lista os arquivos contidos nele.
