|
7 | 7 | import pandas as pd |
8 | 8 | import pyarrow as pa |
9 | 9 | import pyarrow.parquet as pq |
10 | | -from tenacity import retry, retry_if_exception_type, wait_exponential |
| 10 | +from tenacity import ( |
| 11 | + retry, |
| 12 | + retry_if_exception_type, |
| 13 | + stop_after_attempt, |
| 14 | + stop_after_delay, |
| 15 | + wait_exponential, |
| 16 | +) |
11 | 17 |
|
12 | 18 | from feast.errors import RedshiftCredentialsError, RedshiftQueryError |
13 | 19 | from feast.type_map import pa_to_redshift_value_type |
@@ -53,6 +59,7 @@ def get_bucket_and_key(s3_path: str) -> Tuple[str, str]: |
53 | 59 | @retry( |
54 | 60 | wait=wait_exponential(multiplier=1, max=4), |
55 | 61 | retry=retry_if_exception_type(ConnectionClosedError), |
| 62 | + stop=stop_after_attempt(5), |
56 | 63 | ) |
57 | 64 | def execute_redshift_statement_async( |
58 | 65 | redshift_data_client, cluster_id: str, database: str, user: str, query: str |
@@ -88,6 +95,7 @@ class RedshiftStatementNotFinishedError(Exception): |
88 | 95 | @retry( |
89 | 96 | wait=wait_exponential(multiplier=1, max=30), |
90 | 97 | retry=retry_if_exception_type(RedshiftStatementNotFinishedError), |
| 98 | + stop=stop_after_delay(300), # 300 seconds |
91 | 99 | ) |
92 | 100 | def wait_for_redshift_statement(redshift_data_client, statement: dict) -> None: |
93 | 101 | """Waits for the Redshift statement to finish. Raises RedshiftQueryError if the statement didn't succeed. |
|
0 commit comments