1414
1515
1616import enum
17- import re
1817from typing import Callable , Dict , Iterable , Optional , Tuple
1918
2019from pyarrow .parquet import ParquetFile
@@ -371,7 +370,7 @@ class DataSource:
371370
372371 def __init__ (
373372 self ,
374- event_timestamp_column : str ,
373+ event_timestamp_column : Optional [ str ] = "" ,
375374 created_timestamp_column : Optional [str ] = "" ,
376375 field_mapping : Optional [Dict [str , str ]] = None ,
377376 date_partition_column : Optional [str ] = "" ,
@@ -520,45 +519,11 @@ def to_proto(self) -> DataSourceProto:
520519 """
521520 raise NotImplementedError
522521
523- def _infer_event_timestamp_column (self , ts_column_type_regex_pattern ):
524- ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"
525- USER_GUIDANCE = "Please specify event_timestamp_column explicitly."
526-
527- if isinstance (self , FileSource ) or isinstance (self , BigQuerySource ):
528- event_timestamp_column , matched_flag = None , False
529- for col_name , col_datatype in self .get_table_column_names_and_types ():
530- if re .match (ts_column_type_regex_pattern , col_datatype ):
531- if matched_flag :
532- raise TypeError (
533- f"""
534- { ERROR_MSG_PREFIX } due to multiple possible columns satisfying
535- the criteria. { USER_GUIDANCE }
536- """
537- )
538- matched_flag = True
539- event_timestamp_column = col_name
540- if matched_flag :
541- return event_timestamp_column
542- else :
543- raise TypeError (
544- f"""
545- { ERROR_MSG_PREFIX } due to an absence of columns that satisfy the criteria.
546- { USER_GUIDANCE }
547- """
548- )
549- else :
550- raise TypeError (
551- f"""
552- { ERROR_MSG_PREFIX } because this DataSource currently does not support this inference.
553- { USER_GUIDANCE }
554- """
555- )
556-
557522
558523class FileSource (DataSource ):
559524 def __init__ (
560525 self ,
561- event_timestamp_column : Optional [str ] = None ,
526+ event_timestamp_column : Optional [str ] = "" ,
562527 file_url : Optional [str ] = None ,
563528 path : Optional [str ] = None ,
564529 file_format : FileFormat = None ,
@@ -598,7 +563,7 @@ def __init__(
598563 self ._file_options = FileOptions (file_format = file_format , file_url = file_url )
599564
600565 super ().__init__ (
601- event_timestamp_column or self . _infer_event_timestamp_column ( r"^timestamp" ) ,
566+ event_timestamp_column ,
602567 created_timestamp_column ,
603568 field_mapping ,
604569 date_partition_column ,
@@ -662,7 +627,7 @@ def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
662627class BigQuerySource (DataSource ):
663628 def __init__ (
664629 self ,
665- event_timestamp_column : Optional [str ] = None ,
630+ event_timestamp_column : Optional [str ] = "" ,
666631 table_ref : Optional [str ] = None ,
667632 created_timestamp_column : Optional [str ] = "" ,
668633 field_mapping : Optional [Dict [str , str ]] = None ,
@@ -672,8 +637,7 @@ def __init__(
672637 self ._bigquery_options = BigQueryOptions (table_ref = table_ref , query = query )
673638
674639 super ().__init__ (
675- event_timestamp_column
676- or self ._infer_event_timestamp_column ("TIMESTAMP|DATETIME" ),
640+ event_timestamp_column ,
677641 created_timestamp_column ,
678642 field_mapping ,
679643 date_partition_column ,
@@ -743,20 +707,12 @@ def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
743707 from google .cloud import bigquery
744708
745709 client = bigquery .Client ()
746- name_type_pairs = []
747710 if self .table_ref is not None :
748- project_id , dataset_id , table_id = self .table_ref .split ("." )
749- bq_columns_query = f"""
750- SELECT COLUMN_NAME, DATA_TYPE FROM { project_id } .{ dataset_id } .INFORMATION_SCHEMA.COLUMNS
751- WHERE TABLE_NAME = '{ table_id } '
752- """
753- table_schema = (
754- client .query (bq_columns_query ).result ().to_dataframe_iterable ()
755- )
756- for df in table_schema :
757- name_type_pairs .extend (
758- list (zip (df ["COLUMN_NAME" ].to_list (), df ["DATA_TYPE" ].to_list ()))
759- )
711+ table_schema = client .get_table (self .table_ref ).schema
712+ if not isinstance (table_schema [0 ], bigquery .schema .SchemaField ):
713+ raise TypeError ("Could not parse BigQuery table schema." )
714+
715+ name_type_pairs = [(field .name , field .field_type ) for field in table_schema ]
760716 else :
761717 bq_columns_query = f"SELECT * FROM ({ self .query } ) LIMIT 1"
762718 queryRes = client .query (bq_columns_query ).result ()
0 commit comments