Exemplos de manipuladores Python para procedimentos armazenados

Como executar tarefas simultâneas com processos de trabalhador

Você pode executar tarefas simultâneas usando processos de trabalho do Python. Você pode achar isso útil quando precisar executar tarefas paralelas que aproveitam vários núcleos de CPU em nós do warehouse.

Nota

A Snowflake recomenda que você não use o módulo de multiprocessamento integrado do Python.

Para contornar os casos em que o Bloqueio de intérprete global do Python impede que uma abordagem multitarefa se espalhe por todos os núcleos da CPU, você pode executar tarefas simultâneas usando processos de trabalho separados, em vez de threads.

Você pode fazer isso nos warehouses Snowflake usando a classe joblib da biblioteca Parallel, como no exemplo a seguir.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.12
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;

Nota

O back-end padrão usado para joblib.Parallel difere entre os warehouses padrão Snowflake e otimizados para Snowpark.

  • Padrão do warehouse: threading

  • Padrão do warehouse otimizado para Snowpark: loky (multiprocessamento)

Você pode substituir a configuração de back-end padrão chamando a função joblib.parallel_backend, como no exemplo a seguir.

import joblib
joblib.parallel_backend('loky')

Como usar APIs Snowpark para processamento assíncrono

Os exemplos a seguir ilustram como você pode usar as APIs Snowpark para iniciar trabalhos filho assíncronos, além de como esses trabalhos se comportam sob diferentes condições.

Verificação do status de um trabalho filho assíncrono

No exemplo a seguir, o procedimento checkStatus executa um trabalho filho assíncrono que aguarda 60 segundos. O procedimento então verifica o status do trabalho antes que ele possa ser concluído, então a verificação retorna False.

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;

O código a seguir chama o procedimento.

CALL checkStatus();
+-------------+
| checkStatus |
|-------------|
| False       |
+-------------+

Cancelamento de um trabalho filho assíncrono

No exemplo a seguir, o procedimento cancelJob usa SQL para inserir dados na tabela test_tb com um trabalho filho assíncrono que levaria 10 segundos para terminar. Em seguida, ele cancela o trabalho filho antes que ele termine e os dados tenham sido inseridos.

CREATE OR REPLACE TABLE test_tb(c1 STRING);
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();

O código a seguir consulta a tabela test_tb, mas não retorna resultados porque nenhum dado foi inserido.

SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+

Espera e bloqueio enquanto um trabalho filho assíncrono é executado

No exemplo a seguir, o procedimento blockUntilDone executa um trabalho filho assíncrono que leva 5 segundos para terminar. Usando o método snowflake.snowpark.AsyncJob.result, o procedimento espera e retorna quando o trabalho termina.

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;

O código a seguir chama o procedimento blockUntilDone, que retorna após esperar 5 segundos.

CALL blockUntilDone();
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

Retorno de um erro após solicitar resultados de um trabalho filho assíncrono inacabado

No exemplo a seguir, o procedimento earlyReturn executa um trabalho filho assíncrono que leva 60 segundos para terminar. O procedimento então tenta retornar um DataFrame do resultado do trabalho antes que ele possa ser concluído. O resultado é um erro.

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;

O código a seguir chama o procedimento earlyReturn, retornando o erro.

CALL earlyReturn();
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

Conclusão de um trabalho pai antes que um trabalho filho termine, cancelamento do trabalho filho

No exemplo a seguir, o procedimento earlyCancelJob executa um trabalho filho assíncrono para inserir dados em uma tabela e leva 10 segundos para terminar. Entretanto, a tarefa dos pais — async_handler — retorna antes que o trabalho filho termine, o que cancela o trabalho filho.

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;

O código a seguir chama o procedimento earlyCancelJob. Em seguida, ele consulta a tabela test_tb, que não retorna nenhum resultado porque nenhum dado foi inserido pelo trabalho filho cancelado.

CALL earlyCancelJob();
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+

Lendo arquivos e ativos

Como ler um arquivo especificado estaticamente usando IMPORTS

Você pode ler um arquivo especificando o nome do arquivo e o nome do estágio na cláusula IMPORTS do comando CREATE PROCEDURE.

Quando você especifica um arquivo na cláusula IMPORTS, o Snowflake copia esse arquivo da área de preparação para o diretório inicial do procedimento armazenado (também chamado de diretório de importação), que é o diretório do qual o procedimento armazenado lê o arquivo.

Snowflake copia arquivos importados para um único diretório. Todos os arquivos nesse diretório devem ter nomes exclusivos, portanto, cada arquivo em sua cláusula IMPORTS deve ter um nome distinto. Isso se aplica mesmo se os arquivos começarem em diferentes estágios ou diferentes subdiretórios dentro de um estágio.

O exemplo a seguir usa um manipulador Python inline que lê um arquivo chamado file.txt de um estágio chamado my_stage. O manipulador recupera a localização do diretório inicial do procedimento armazenado usando o método Python sys._xoptions com a opção do sistema snowflake_import_directory.

O Snowflake lê o arquivo somente uma vez durante a criação do procedimento armazenado, e não o lerá novamente durante a execução do procedimento armazenado se a leitura do arquivo for feita fora do manipulador de destino.

Crie o procedimento armazenado com um manipulador em linha:

CREATE OR REPLACE PROCEDURE test_file_import_sp()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@my_stage/dir/file.txt')
HANDLER = 'run'
RUNTIME_VERSION = 3.12
EXECUTE AS CALLER
AS $$
import os
import sys

def run(session):
  with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
    return f.read()
$$;
CALL test_file_import_sp();
// return file content

Importando um diretório com IMPORTS

Você pode importar um diretório usando a cláusula IMPORTS do comando CREATE PROCEDURE.

Nota

  • O caminho de importação de um diretório deve terminar com uma barra final (/). Por exemplo, IMPORTS = ('@my_stage/my_dir/').

  • Para renomear um diretório na importação, anexe /=custom_name/ ao caminho da área de preparação. O nome personalizado deve ser um único nome de diretório, não um caminho. Por exemplo, IMPORTS = ('@my_stage/my_dir/=custom_name/').

  • A importação de diretórios não é compatível com Native Apps.

O exemplo a seguir importa um diretório chamado my_dir de uma área de preparação chamada my_stage e lista os arquivos contidos nele.

CREATE OR REPLACE PROCEDURE my_directory_import_list_sp()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@my_stage/my_dir/')
HANDLER = 'run'
RUNTIME_VERSION = 3.12
EXECUTE AS CALLER
AS $$
import os
import sys
def list_files(directory):
  files = []
  # Walk through the directory and its subdirectories
  for dirpath, _, filenames in os.walk(directory):
    for filename in filenames:
      # Append the relative path to each file to the list
      full_path = os.path.join(dirpath, filename)
      files.append(os.path.relpath(full_path, directory))
  return files
def run(session):
  directory_path = sys._xoptions["snowflake_import_directory"]
  file_list = list_files(directory_path)
  file_list_str = ' '.join(file_list)
  return file_list_str
$$;
CALL my_directory_import_list_sp();