Describe the bug
We are building a DWH with Databricks Premium on GCS and decided to use dbt Core 1.3.2 (databricks adapter 1.3.2) for ELT pipelines and documentation. So far in production we have developed just one model pattern: incremental "merge" updates for staging tables in Databricks Delta Lake. The ELT pipeline triggers the dbt run command with the --select option to incrementally update the relevant staging tables. However, from time to time and seemingly at random, some runs throw an "[Errno 110] Connection timed out" error for random models in the selection. There is no discernible pattern: sometimes everything runs fine, sometimes not. Interestingly, we also tested this behaviour with Databricks Workflows (to check for possible connectivity causes) and observed the same behaviour even when dbt runs on a Databricks dbt CLI cluster as per documentation here. What is even more interesting is that the problem gets amplified "exponentially" if we use more than 1 thread in the profile (we currently have to run models sequentially to minimize errors). This is becoming a big concern for our further use of dbt with Databricks. We have also considered the "retry_all: true" mechanism described for the spark adapter, but it does not seem to be triggered for these types of errors.
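For reference, this is a minimal sketch of the retry-related profile settings we are talking about. Note the assumptions: connect_retries, connect_timeout, and retry_all are the fields documented for dbt-spark, and it is an open question for us whether the databricks adapter honors them for this class of error; host/http_path values are placeholders.

# profiles.yml (sketch; retry fields are documented for dbt-spark,
# their effect under dbt-databricks 1.3.2 is exactly what is unclear)
my_dwh:
  target: prod
  outputs:
    prod:
      type: databricks
      host: <workspace-host>        # placeholder
      http_path: <sql-warehouse>    # placeholder
      token: "{{ env_var('DBT_TOKEN') }}"
      schema: staging
      threads: 1                    # currently forced to 1 to reduce timeouts
      connect_retries: 5
      connect_timeout: 60
      retry_all: true               # spark-adapter mechanism; does not catch Errno 110 for us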
dbt run --select tag:pg_230_deals --vars '{"env": "prod", "data_interval_start": 1673103600, "data_interval_end": 1673190000}'
....
03:36:13.436533 [info ] [MainThread]: Finished running 14 incremental models in 0 hours 33 minutes and 48.86 seconds (2028.86s).
03:36:13.436860 [debug] [MainThread]: Connection 'master' was properly closed.
03:36:13.436988 [debug] [MainThread]: Connection 'model.databricks.deals__pg_230__tradestone_tp_real_cry_1__mt5_deals' was properly closed.
03:36:13.459647 [info ] [MainThread]:
03:36:13.460106 [info ] [MainThread]: Completed with 1 error and 0 warnings:
03:36:13.460478 [info ] [MainThread]:
03:36:13.460738 [error] [MainThread]: Runtime Error in model deals__pg_230__fbs_mt5_real_1__mt5_deals (models/staging/pg_230/deals/deals__pg_230__fbs_mt5_real_1__mt5_deals.sql)
03:36:13.461018 [error] [MainThread]: Error during request to server: [Errno 110] Connection timed out
03:36:13.461290 [info ] [MainThread]:
03:36:13.461510 [info ] [MainThread]: Done. PASS=13 WARN=0 ERROR=1 SKIP=0 TOTAL=14
Steps To Reproduce
- Use multithreading in profiles.yml (threads = 5+)
- Create several (10+) models
- Run them selectively (--select)
If required, I can share the full GitHub repo for our project.
This is our typical model for staging tables (all models share the same schema):
# models.yml
version: 2
models:
  - name: deals__pg_230__fbs_b100_real_1__mt5_deals
    description: "dealing replica pg 230, table: fbs_b100_real_1.mt5_deals"
    config:
      materialized: incremental
      incremental_strategy: merge
      file_format: delta
      on_schema_change: fail
      unique_key: deal
      schema: staging
  - name: deals__pg_230__fbs_levelup_real_2__mt5_deals
    description: "dealing replica pg 230, table: fbs_levelup_real_2.mt5_deals"
    config:
      tags: ['pg_230_deals']
      materialized: incremental
      incremental_strategy: merge
      file_format: delta
      on_schema_change: fail
      unique_key: deal
      schema: staging
...
# deals__pg_230__fbs_b100_real_1__mt5_deals.sql
{{ config(
    pre_hook="CREATE TABLE if not exists {{ source('dw', 'deals__pg_230__fbs_b100_real_1__mt5_deals') }} USING parquet LOCATION 'gs://staging-databricks-{{ var('env') }}/pg_230__deals/fbs_b100_real_1__mt5_deals/'",
) }}

with src as
(
    select
        *,
        row_number() over (partition by deal order by timestamp desc) rn
    from {{ source('dw', 'deals__pg_230__fbs_b100_real_1__mt5_deals') }}
)

select *
from src
{% if is_incremental() %}
where rn = 1 and timestamp between '{{ var('data_interval_start') }}' and '{{ var('data_interval_end') }}'
{% endif %}
# sources.yml
version: 2
sources:
  - name: dw
    description: dw temporary tables or external tables in gcs
    tables:
      - name: deals__pg_230__fbs_b100_real_1__mt5_deals
      - name: deals__pg_230__fbs_levelup_real_1__mt5_deals
...
Expected behavior
We expect more robust and predictable processing of the dbt run command with multithreading, and clear instructions on how to implement operational error handling for intermittent [Errno 110] connectivity errors.
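As an interim workaround we are considering retrying the whole invocation at the orchestration layer, which should be safe for this model pattern since an incremental merge on a unique key is idempotent. A minimal sketch (the function name and the injectable runner are our own illustration, not a dbt API; by default it just shells out to the dbt CLI):

```python
import subprocess
import time


def run_dbt_with_retries(select, max_attempts=3, base_delay=30.0, runner=None):
    """Retry `dbt run --select <select>` on a non-zero exit code.

    `runner` is any zero-argument callable returning an exit code; by
    default it invokes the dbt CLI via subprocess. Returns the attempt
    number that succeeded, raises RuntimeError if all attempts fail.
    """
    if runner is None:
        def runner():
            return subprocess.run(["dbt", "run", "--select", select]).returncode

    for attempt in range(1, max_attempts + 1):
        if runner() == 0:
            return attempt
        if attempt < max_attempts:
            # exponential backoff before retrying the failed invocation
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError(
        f"dbt run --select {select} failed after {max_attempts} attempts"
    )
```

The obvious downside is that a retry reruns models that already succeeded in the failed invocation, which is wasted work but harmless under the merge strategy.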
Screenshots and log output
If applicable, add screenshots or log output to help explain your problem.
System information
The output of dbt --version:
Core:
- installed: 1.3.2
- latest: 1.3.2 - Up to date!
Plugins:
- spark: 1.3.0 - Up to date!
- databricks: 1.3.2 - Up to date!
The operating system you're using:
The output of python --version:
Python 3.8.10
Additional context
Add any other context about the problem here.