@@ -488,10 +488,24 @@ def to_bigquery(
488488 return str (job_config .destination )
489489
490490 with self ._query_generator () as query :
491- self ._execute_query (query , job_config , timeout )
491+ dest = job_config .destination
492+ # setting a destination is not valid for BigQuery scripts, so clear the
493+ # destination attribute (if provided) before executing the query
494+ job_config .destination = None
495+ bq_job = self ._execute_query (query , job_config , timeout )
492496
493- print (f"Done writing to '{ job_config .destination } '." )
494- return str (job_config .destination )
497+ if not job_config .dry_run :
498+ config = bq_job .to_api_repr ()["configuration" ]
499+ # get temp table created by BQ
500+ tmp_dest = config ["query" ]["destinationTable" ]
501+ temp_dest_table = f"{ tmp_dest ['projectId' ]} .{ tmp_dest ['datasetId' ]} .{ tmp_dest ['tableId' ]} "
502+
503+ # persist temp table
504+ sql = f"CREATE TABLE { dest } AS SELECT * FROM { temp_dest_table } "
505+ self ._execute_query (sql , timeout = timeout )
506+
507+ print (f"Done writing to '{ dest } '." )
508+ return str (dest )
495509
496510 def _to_arrow_internal (self ) -> pyarrow .Table :
497511 with self ._query_generator () as query :
@@ -777,7 +791,7 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
777791 Compute a deterministic hash for the `left_table_query_string` that will be used throughout
778792 all the logic as the field to GROUP BY the data
779793*/
780- WITH entity_dataframe AS (
794+ CREATE TEMP TABLE entity_dataframe AS (
781795 SELECT *,
782796 {{entity_df_event_timestamp_col}} AS entity_timestamp
783797 {% for featureview in featureviews %}
@@ -793,95 +807,95 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
793807 {% endif %}
794808 {% endfor %}
795809 FROM `{{ left_table_query_string }}`
796- ),
810+ );
797811
798812{% for featureview in featureviews %}
799-
800- {{ featureview.name }}__entity_dataframe AS (
801- SELECT
802- {{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
803- entity_timestamp,
804- {{featureview.name}}__entity_row_unique_id
805- FROM entity_dataframe
806- GROUP BY
807- {{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
808- entity_timestamp,
809- {{featureview.name}}__entity_row_unique_id
810- ),
811-
812- /*
813- This query template performs the point-in-time correctness join for a single feature set table
814- to the provided entity table.
815-
816- 1. We first join the current feature_view to the entity dataframe that has been passed.
817- This JOIN has the following logic:
818- - For each row of the entity dataframe, only keep the rows where the `timestamp_field`
819- is less than the one provided in the entity dataframe
820- - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field`
821- is higher the the one provided minus the TTL
822- - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
823- computed previously
824-
825- The output of this CTE will contain all the necessary information and already filtered out most
826- of the data that is not relevant.
827- */
828-
829- {{ featureview.name }}__subquery AS (
830- SELECT
831- {{ featureview.timestamp_field }} as event_timestamp,
832- {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
833- {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
834- {% for feature in featureview.features %}
835- {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
836- {% endfor %}
837- FROM {{ featureview.table_subquery }}
838- WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}'
839- {% if featureview.ttl == 0 %}{% else %}
840- AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}'
841- {% endif %}
842- ),
843-
844- {{ featureview.name }}__base AS (
845- SELECT
846- subquery.*,
847- entity_dataframe.entity_timestamp,
848- entity_dataframe.{{featureview.name}}__entity_row_unique_id
849- FROM {{ featureview.name }}__subquery AS subquery
850- INNER JOIN {{ featureview.name }}__entity_dataframe AS entity_dataframe
851- ON TRUE
852- AND subquery.event_timestamp <= entity_dataframe.entity_timestamp
853-
813+ CREATE TEMP TABLE {{ featureview.name }}__cleaned AS (
814+ WITH {{ featureview.name }}__entity_dataframe AS (
815+ SELECT
816+ {{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
817+ entity_timestamp,
818+ {{featureview.name}}__entity_row_unique_id
819+ FROM entity_dataframe
820+ GROUP BY
821+ {{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
822+ entity_timestamp,
823+ {{featureview.name}}__entity_row_unique_id
824+ ),
825+
826+ /*
827+ This query template performs the point-in-time correctness join for a single feature set table
828+ to the provided entity table.
829+
830+ 1. We first join the current feature_view to the entity dataframe that has been passed.
831+ This JOIN has the following logic:
832+ - For each row of the entity dataframe, only keep the rows where the `timestamp_field`
833+ is less than or equal to the one provided in the entity dataframe
834+ - If there is a TTL for the current feature_view, additionally require that the `timestamp_field`
835+ is greater than or equal to the one provided minus the TTL
836+ - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
837+ computed previously
838+
839+ The output of this CTE will contain all the necessary information, with most
840+ of the irrelevant data already filtered out.
841+ */
842+
843+ {{ featureview.name }}__subquery AS (
844+ SELECT
845+ {{ featureview.timestamp_field }} as event_timestamp,
846+ {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
847+ {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
848+ {% for feature in featureview.features %}
849+ {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
850+ {% endfor %}
851+ FROM {{ featureview.table_subquery }}
852+ WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}'
854853 {% if featureview.ttl == 0 %}{% else %}
855- AND subquery.event_timestamp >= Timestamp_sub(entity_dataframe.entity_timestamp, interval {{ featureview.ttl }} second)
854+ AND {{ featureview.timestamp_field }} >= ' {{ featureview.min_event_timestamp }}'
856855 {% endif %}
856+ ),
857+
858+ {{ featureview.name }}__base AS (
859+ SELECT
860+ subquery.*,
861+ entity_dataframe.entity_timestamp,
862+ entity_dataframe.{{featureview.name}}__entity_row_unique_id
863+ FROM {{ featureview.name }}__subquery AS subquery
864+ INNER JOIN {{ featureview.name }}__entity_dataframe AS entity_dataframe
865+ ON TRUE
866+ AND subquery.event_timestamp <= entity_dataframe.entity_timestamp
867+
868+ {% if featureview.ttl == 0 %}{% else %}
869+ AND subquery.event_timestamp >= Timestamp_sub(entity_dataframe.entity_timestamp, interval {{ featureview.ttl }} second)
870+ {% endif %}
857871
858- {% for entity in featureview.entities %}
859- AND subquery.{{ entity }} = entity_dataframe.{{ entity }}
860- {% endfor %}
861- ),
862-
863- /*
864- 2. If the `created_timestamp_column` has been set, we need to
865- deduplicate the data first. This is done by calculating the
866- `MAX(created_at_timestamp)` for each event_timestamp.
867- We then join the data on the next CTE
868- */
869- {% if featureview.created_timestamp_column %}
870- {{ featureview.name }}__dedup AS (
871- SELECT
872- {{featureview.name}}__entity_row_unique_id,
873- event_timestamp,
874- MAX(created_timestamp) as created_timestamp
875- FROM {{ featureview.name }}__base
876- GROUP BY {{featureview.name}}__entity_row_unique_id, event_timestamp
877- ),
878- {% endif %}
872+ {% for entity in featureview.entities %}
873+ AND subquery.{{ entity }} = entity_dataframe.{{ entity }}
874+ {% endfor %}
875+ ),
876+
877+ /*
878+ 2. If the `created_timestamp_column` has been set, we need to
879+ deduplicate the data first. This is done by calculating the
880+ `MAX(created_timestamp)` for each entity row and event_timestamp.
881+ We then join the data in the next CTE.
882+ */
883+ {% if featureview.created_timestamp_column %}
884+ {{ featureview.name }}__dedup AS (
885+ SELECT
886+ {{featureview.name}}__entity_row_unique_id,
887+ event_timestamp,
888+ MAX(created_timestamp) as created_timestamp
889+ FROM {{ featureview.name }}__base
890+ GROUP BY {{featureview.name}}__entity_row_unique_id, event_timestamp
891+ ),
892+ {% endif %}
879893
880- /*
881- 3. The data has been filtered during the first CTE "*__base"
882- Thus we only need to compute the latest timestamp of each feature.
883- */
884- {{ featureview.name }}__latest AS (
894+ /*
895+ 3. The data has been filtered during the first CTE "*__base"
896+ Thus we only need to compute the latest timestamp of each feature.
897+ */
898+ {{ featureview.name }}__latest AS (
885899 SELECT
886900 event_timestamp,
887901 {% if featureview.created_timestamp_column %}created_timestamp,{% endif %}
@@ -900,13 +914,13 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
900914 {% endif %}
901915 )
902916 WHERE row_number = 1
903- ),
917+ )
904918
905919/*
906920 4. Once we know the latest value of each feature for a given timestamp,
907921 we can join the data back to the original "base" dataset
908922*/
909- {{ featureview.name }}__cleaned AS (
923+
910924 SELECT base.*
911925 FROM {{ featureview.name }}__base as base
912926 INNER JOIN {{ featureview.name }}__latest
@@ -917,7 +931,7 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
917931 ,created_timestamp
918932 {% endif %}
919933 )
920- ){% if loop.last %}{% else %}, {% endif %}
934+ );
921935
922936
923937{% endfor %}
0 commit comments