저장 프로시저용 Python 처리기 예

워커 프로세스로 동시 작업 실행하기

Python 작업자 프로세스를 사용하여 동시 작업을 실행할 수 있습니다. 웨어하우스 노드에서 여러 CPU 코어를 활용하는 병렬 작업을 실행해야 할 때 이 기능이 유용할 수 있습니다.

참고

기본 제공된 Python 다중 처리 모듈을 사용하지 않는 것이 좋습니다.

Python Global Interpreter Lock 으로 인해 멀티태스킹 접근 방식이 모든 CPU 코어에서 확장되지 못하는 문제를 해결하려면 스레드가 아닌 별도의 작업자 프로세스를 사용하여 동시 작업을 실행할 수 있습니다.

다음 예에서처럼 joblib 라이브러리의 Parallel 클래스를 사용하여 Snowflake 웨어하우스에서 이 작업을 수행할 수 있습니다.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.12
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;

참고

joblib.Parallel 에 사용되는 기본 백엔드는 Snowflake 표준과 Snowpark 최적화 웨어하우스 간에 다릅니다.

  • 표준 웨어하우스 기본값: threading

  • Snowpark 최적화 웨어하우스 기본값: loky (다중 처리)

다음 예에서처럼 joblib.parallel_backend 함수를 호출하여 기본 백엔드 설정을 재정의할 수 있습니다.

import joblib
joblib.parallel_backend('loky')

비동기 처리에 Snowpark APIs 사용하기

다음 예제에서는 Snowpark APIs를 사용하여 비동기 하위 작업을 시작하는 방법과 다양한 조건에서 이러한 작업이 어떻게 작동하는지 설명합니다.

비동기 하위 작업의 상태 확인하기

다음 예제에서 checkStatus 프로시저는 60초 동안 기다리는 비동기 하위 작업을 실행합니다. 그런 다음 프로시저는 작업이 완료되기 전에 작업의 상태를 확인하므로 확인은 False 를 반환합니다.

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;

다음 코드는 프로시저를 호출합니다.

CALL checkStatus();
+-------------+
| checkStatus |
|-------------|
| False       |
+-------------+

비동기 하위 작업 취소하기

다음 예제에서, cancelJob 프로시저는 SQL을 사용하여 완료하는 데 10초가 걸리는 비동기 하위 작업으로 test_tb 테이블에 데이터를 삽입합니다. 그런 다음 작업이 완료되고 데이터가 삽입되기 전에 하위 작업을 취소합니다.

CREATE OR REPLACE TABLE test_tb(c1 STRING);
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();

다음 코드는 test_tb 테이블을 쿼리하지만, 데이터가 삽입되지 않았기 때문에 결과를 반환하지 않습니다.

SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+

비동기 하위 작업이 실행되는 동안 대기 및 차단하기

다음 예제에서, blockUntilDone 프로시저는 완료하는 데 5초가 걸리는 비동기 하위 작업을 실행합니다. snowflake.snowpark.AsyncJob.result 메서드를 사용하면 프로시저가 대기하다가 작업이 완료되면 반환합니다.

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;

다음 코드는 5초를 기다린 후 반환되는 blockUntilDone 프로시저를 호출합니다.

CALL blockUntilDone();
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

완료되지 않은 비동기 하위 작업의 결과를 요청한 후 오류 반환하기

다음 예제에서, earlyReturn 프로시저는 완료하는 데 60초가 걸리는 비동기 하위 작업을 실행합니다. 그런 다음 프로시저는 작업이 완료되기 전에 작업 결과에서 DataFrame 의 반환을 시도합니다. 결과적으로 오류가 발생합니다.

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;

다음 코드는 earlyReturn 프로시저를 호출하여 오류를 반환합니다.

CALL earlyReturn();
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

하위 작업이 완료되기 전에 상위 작업을 완료하고 하위 작업 취소하기

다음 예제에서, earlyCancelJob 프로시저는 비동기 하위 작업을 실행하여 테이블에 데이터를 삽입하고 완료하는 데 10초가 걸립니다. 그러나 상위 작업 async_handler 는 하위 작업이 완료되기 전에 반환되어 하위 작업이 취소됩니다.

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;

다음 코드는 earlyCancelJob 프로시저를 호출합니다. 그런 다음 test_tb 테이블을 쿼리하며, 이 작업은 취소된 하위 작업에서 삽입된 데이터가 없기 때문에 결과를 반환하지 않습니다.

CALL earlyCancelJob();
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+

파일 및 자산 읽기

IMPORTS를 사용하여 정적으로 지정된 파일 읽기

IMPORTS 명령의 CREATE PROCEDURE 절에 파일 이름과 스테이지 이름을 지정하여 파일을 읽을 수 있습니다.

IMPORTS 절에 파일을 지정하는 경우 Snowflake는 해당 파일을 스테이지에서 저장 프로시저의 *홈 디렉터리*(*가져오기 디렉터리*라고도 함)로 복사합니다. 이 디렉터리는 저장 프로시저가 파일을 읽는 디렉터리입니다.

Snowflake는 가져온 파일을 단일 디렉터리에 복사합니다. 해당 디렉터리의 모든 파일은 고유한 이름을 가져야 하므로 IMPORTS 절의 각 파일은 고유한 이름을 가져야 합니다. 이는 파일이 다른 스테이지 또는 스테이지 내의 다른 하위 디렉터리에서 시작하는 경우에도 적용됩니다.

다음 예에서는 file.txt 라는 스테이지에서 my_stage 라는 파일을 읽는 인라인 Python 처리기를 사용합니다. 처리기는 snowflake_import_directory 시스템 옵션과 함께 Python sys._xoptions 메서드를 사용하여 저장 프로시저의 홈 디렉터리 위치를 검색합니다.

Snowflake는 저장 프로시저 생성 중에 한 번만 파일을 읽으며 파일 읽기가 대상 처리기 외부에서 발생하는 경우 저장 프로시저 실행 중에 파일을 다시 읽지 않습니다.

인라인 처리기로 저장 프로시저를 생성합니다.

CREATE OR REPLACE PROCEDURE test_file_import_sp()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@my_stage/dir/file.txt')
HANDLER = 'run'
RUNTIME_VERSION = 3.12
EXECUTE AS CALLER
AS $$
import os
import sys

def run(session):
  with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
    return f.read()
$$;
CALL test_file_import_sp();
// return file content

IMPORTS를 사용하여 디렉터리 가져오기

CREATE PROCEDURE 명령의 IMPORTS 절을 사용하여 디렉터리를 가져올 수 있습니다.

참고

  • 디렉터리의 가져오기 경로 끝에는 후행 슬래시(/)가 있어야 합니다. 예: IMPORTS = ('@my_stage/my_dir/').

  • 가져올 때 디렉터리의 이름을 바꾸려면 스테이지 경로에 /=custom_name/``을 추가합니다. 사용자 지정 이름은 경로가 아닌 단일 디렉터리 이름이어야 합니다. 예: ``IMPORTS = ('@my_stage/my_dir/=custom_name/').

  • Native Apps에서는 디렉터리 가져오기가 지원되지 않습니다.

다음 예제에서는 my_stage`라는 스테이지에서 :code:`my_dir 디렉터리를 가져오고 해당 디렉터리 안에 포함된 파일을 나열합니다.

CREATE OR REPLACE PROCEDURE my_directory_import_list_sp()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@my_stage/my_dir/')
HANDLER = 'run'
RUNTIME_VERSION = 3.12
EXECUTE AS CALLER
AS $$
import os
import sys
def list_files(directory):
  files = []
  # Walk through the directory and its subdirectories
  for dirpath, _, filenames in os.walk(directory):
    for filename in filenames:
      # Append the relative path to each file to the list
      full_path = os.path.join(dirpath, filename)
      files.append(os.path.relpath(full_path, directory))
  return files
def run(session):
  directory_path = sys._xoptions["snowflake_import_directory"]
  file_list = list_files(directory_path)
  file_list_str = ' '.join(file_list)
  return file_list_str
$$;
CALL my_directory_import_list_sp();