From 8a1a3107aa42c51f21b3b049e7644b95074ef70f Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Sun, 25 Apr 2021 17:00:15 +0800 Subject: [PATCH 01/19] Add Synapse Launcher Add Synapse Launcher (with corresponding python dependency and required changes in different files) --- python/feast_spark/constants.py | 15 + python/feast_spark/pyspark/launcher.py | 13 + .../pyspark/launchers/synapse/__init__.py | 13 + .../pyspark/launchers/synapse/synapse.py | 285 ++++++++++++++++++ .../launchers/synapse/synapse_utils.py | 265 ++++++++++++++++ tests/e2e/conftest.py | 7 +- tests/e2e/fixtures/client.py | 33 ++ 7 files changed, 630 insertions(+), 1 deletion(-) create mode 100644 python/feast_spark/pyspark/launchers/synapse/__init__.py create mode 100644 python/feast_spark/pyspark/launchers/synapse/synapse.py create mode 100644 python/feast_spark/pyspark/launchers/synapse/synapse_utils.py diff --git a/python/feast_spark/constants.py b/python/feast_spark/constants.py index 428a0c03..160b0937 100644 --- a/python/feast_spark/constants.py +++ b/python/feast_spark/constants.py @@ -93,6 +93,21 @@ class ConfigOptions(metaclass=ConfigMeta): # SparkApplication resource template SPARK_K8S_JOB_TEMPLATE_PATH = None + # Synapse dev url + AZURE_SYNAPSE_DEV_URL: Optional[str] = None + + # Synapse pool name + AZURE_SYNAPSE_POOL_NAME: Optional[str] = None + + # Datalake directory that linked to Synapse + AZURE_SYNAPSE_DATALAKE_DIR: Optional[str] = None + + # Synapse pool executor size: Small, Medium or Large + AZURE_SYNAPSE_EXECUTOR_SIZE = "Small" + + # Synapse pool executor count + AZURE_SYNAPSE_EXECUTORS = "2" + #: File format of historical retrieval features HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet" diff --git a/python/feast_spark/pyspark/launcher.py b/python/feast_spark/pyspark/launcher.py index e86f02d4..22fb7c15 100644 --- a/python/feast_spark/pyspark/launcher.py +++ b/python/feast_spark/pyspark/launcher.py @@ -83,11 +83,24 @@ def _k8s_launcher(config: Config) -> JobLauncher: ) +def 
_synapse_launcher(config: Config) -> JobLauncher: + from feast_spark.pyspark.launchers import synapse + + return synapse.SynapseJobLauncher( + synapse_dev_url=config.get(opt.AZURE_SYNAPSE_DEV_URL), + pool_name=config.get(opt.AZURE_SYNAPSE_POOL_NAME), + datalake_dir=config.get(opt.AZURE_SYNAPSE_DATALAKE_DIR), + executor_size=config.get(opt.AZURE_SYNAPSE_EXECUTOR_SIZE), + executors=int(config.get(opt.AZURE_SYNAPSE_EXECUTORS)) + ) + + _launchers = { "standalone": _standalone_launcher, "dataproc": _dataproc_launcher, "emr": _emr_launcher, "k8s": _k8s_launcher, + 'synapse': _synapse_launcher, } diff --git a/python/feast_spark/pyspark/launchers/synapse/__init__.py b/python/feast_spark/pyspark/launchers/synapse/__init__.py new file mode 100644 index 00000000..363312a1 --- /dev/null +++ b/python/feast_spark/pyspark/launchers/synapse/__init__.py @@ -0,0 +1,13 @@ +from .synapse import ( + SynapseBatchIngestionJob, + SynapseJobLauncher, + SynapseRetrievalJob, + SynapseStreamIngestionJob, +) + +__all__ = [ + "SynapseRetrievalJob", + "SynapseBatchIngestionJob", + "SynapseStreamIngestionJob", + "SynapseJobLauncher", +] diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse.py b/python/feast_spark/pyspark/launchers/synapse/synapse.py new file mode 100644 index 00000000..63733ac1 --- /dev/null +++ b/python/feast_spark/pyspark/launchers/synapse/synapse.py @@ -0,0 +1,285 @@ +import time +from datetime import datetime +from typing import List, Optional, cast + +from azure.synapse.spark.models import SparkBatchJob + +from feast_spark.pyspark.abc import ( + BatchIngestionJob, + BatchIngestionJobParameters, + JobLauncher, + RetrievalJob, + RetrievalJobParameters, + SparkJob, + SparkJobFailure, + SparkJobStatus, + StreamIngestionJob, + StreamIngestionJobParameters, +) + +from .synapse_utils import ( + HISTORICAL_RETRIEVAL_JOB_TYPE, + LABEL_JOBTYPE, + LABEL_FEATURE_TABLE, + METADATA_JOBHASH, + METADATA_OUTPUT_URI, + OFFLINE_TO_ONLINE_JOB_TYPE, + STREAM_TO_ONLINE_JOB_TYPE, + 
SynapseJobRunner, + DataLakeFiler, + _prepare_job_tags, + _job_feast_state, + _job_start_time, + _cancel_job_by_id, + _get_job_by_id, + _list_jobs, + _submit_job, +) + + +class SynapseJobMixin: + def __init__(self, api: SynapseJobRunner, job_id: int): + self._api = api + self._job_id = job_id + + def get_id(self) -> str: + return self._job_id + + def get_status(self) -> SparkJobStatus: + job = _get_job_by_id(self._api, self._job_id) + assert job is not None + return _job_feast_state(job) + + def get_start_time(self) -> datetime: + job = _get_job_by_id(self._api, self._job_id) + assert job is not None + return _job_start_time(job) + + def cancel(self): + _cancel_job_by_id(self._api, self._job_id) + + def _wait_for_complete(self, timeout_seconds: Optional[float]) -> bool: + """ Returns true if the job completed successfully """ + start_time = time.time() + while (timeout_seconds is None) or (time.time() - start_time < timeout_seconds): + status = self.get_status() + if status == SparkJobStatus.COMPLETED: + return True + elif status == SparkJobStatus.FAILED: + return False + else: + time.sleep(1) + else: + raise TimeoutError("Timeout waiting for job to complete") + + +class SynapseRetrievalJob(SynapseJobMixin, RetrievalJob): + """ + Historical feature retrieval job result for a synapse cluster + """ + + def __init__( + self, api: SynapseJobRunner, job_id: int, output_file_uri: str + ): + """ + This is the job object representing the historical retrieval job, returned by SynapseClusterLauncher. + + Args: + output_file_uri (str): Uri to the historical feature retrieval job output file. 
+ """ + super().__init__(api, job_id) + self._output_file_uri = output_file_uri + + def get_output_file_uri(self, timeout_sec=None, block=True): + if not block: + return self._output_file_uri + + if self._wait_for_complete(timeout_sec): + return self._output_file_uri + else: + raise SparkJobFailure("Spark job failed") + + +class SynapseBatchIngestionJob(SynapseJobMixin, BatchIngestionJob): + """ + Ingestion job result for a synapse cluster + """ + + def __init__( + self, api: SynapseJobRunner, job_id: int, feature_table: str + ): + super().__init__(api, job_id) + self._feature_table = feature_table + + def get_feature_table(self) -> str: + return self._feature_table + + +class SynapseStreamIngestionJob(SynapseJobMixin, StreamIngestionJob): + """ + Ingestion streaming job for a synapse cluster + """ + + def __init__( + self, + api: SynapseJobRunner, + job_id: int, + job_hash: str, + feature_table: str, + ): + super().__init__(api, job_id) + self._job_hash = job_hash + self._feature_table = feature_table + + def get_hash(self) -> str: + return self._job_hash + + def get_feature_table(self) -> str: + return self._feature_table + + +class SynapseJobLauncher(JobLauncher): + """ + Submits spark jobs to a spark cluster. Currently supports only historical feature retrieval jobs. 
+ """ + + def __init__( + self, + synapse_dev_url: str, + pool_name: str, + datalake_dir: str, + executor_size: str, + executors: int + ): + self._api = SynapseJobRunner(synapse_dev_url, pool_name, executor_size = executor_size, executors = executors) + self._datalake = DataLakeFiler(datalake_dir) + + def _job_from_job_info(self, job_info: SparkBatchJob) -> SparkJob: + job_type = job_info.tags[LABEL_JOBTYPE] + if job_type == HISTORICAL_RETRIEVAL_JOB_TYPE: + assert METADATA_OUTPUT_URI in job_info.tags + return SynapseRetrievalJob( + api=self._api, + job_id=job_info.id, + output_file_uri=job_info.tags[METADATA_OUTPUT_URI], + ) + elif job_type == OFFLINE_TO_ONLINE_JOB_TYPE: + return SynapseBatchIngestionJob( + api=self._api, + job_id=job_info.id, + feature_table=job_info.tags.get(LABEL_FEATURE_TABLE, ""), + ) + elif job_type == STREAM_TO_ONLINE_JOB_TYPE: + # job_hash must not be None for stream ingestion jobs + assert METADATA_JOBHASH in job_info.tags + return SynapseStreamIngestionJob( + api=self._api, + job_id=job_info.id, + job_hash=job_info.tags[METADATA_JOBHASH], + feature_table=job_info.tags.get(LABEL_FEATURE_TABLE, ""), + ) + else: + # We should never get here + raise ValueError(f"Unknown job type {job_type}") + + def historical_feature_retrieval( + self, job_params: RetrievalJobParameters + ) -> RetrievalJob: + """ + Submits a historical feature retrieval job to a Spark cluster. + + Raises: + SparkJobFailure: The spark job submission failed, encountered error + during execution, or timeout. + + Returns: + RetrievalJob: wrapper around remote job that returns file uri to the result file. 
+ """ + + main_file = self._datalake.upload_file(job_params.get_main_file_path()) + job_info = _submit_job(self._api, "Historical-Retrieval", main_file, + arguments = job_params.get_arguments(), + tags = {LABEL_JOBTYPE: HISTORICAL_RETRIEVAL_JOB_TYPE, + METADATA_OUTPUT_URI: job_params.get_destination_path()}) + + return cast(RetrievalJob, self._job_from_job_info(job_info)) + + def offline_to_online_ingestion( + self, ingestion_job_params: BatchIngestionJobParameters + ) -> BatchIngestionJob: + """ + Submits a batch ingestion job to a Spark cluster. + + Raises: + SparkJobFailure: The spark job submission failed, encountered error + during execution, or timeout. + + Returns: + BatchIngestionJob: wrapper around remote job that can be used to check when job completed. + """ + + main_file = self._datalake.upload_file(ingestion_job_params.get_main_file_path()) + + print(main_file, ingestion_job_params.get_main_file_path()) + print(ingestion_job_params.get_class_name()) + # for arg in ingestion_job_params.get_arguments(): + # print(len(arg), arg) + # args = [x.replace(' ', '') for x in ingestion_job_params.get_arguments()] + # print(args) + + job_info = _submit_job(self._api, ingestion_job_params.get_project(), main_file, + main_class = ingestion_job_params.get_class_name(), + arguments = ingestion_job_params.get_arguments(), + tags = _prepare_job_tags(ingestion_job_params, OFFLINE_TO_ONLINE_JOB_TYPE)) + + return cast(BatchIngestionJob, self._job_from_job_info(job_info)) + + def start_stream_to_online_ingestion( + self, ingestion_job_params: StreamIngestionJobParameters + ) -> StreamIngestionJob: + """ + Starts a stream ingestion job to a Spark cluster. + + Raises: + SparkJobFailure: The spark job submission failed, encountered error + during execution, or timeout. + + Returns: + StreamIngestionJob: wrapper around remote job. 
+ """ + + main_file = self._datalake.upload_file(ingestion_job_params.get_main_file_path()) + + extra_jar_paths: List[str] = [] + for extra_jar in ingestion_job_params.get_extra_jar_paths(): + extra_jar_paths.append(self._datalake.upload_file(extra_jar)) + + tags = _prepare_job_tags(ingestion_job_params, STREAM_TO_ONLINE_JOB_TYPE) + tags[METADATA_JOBHASH] = ingestion_job_params.get_job_hash() + job_info = _submit_job(self._api, ingestion_job_params.get_project(), main_file, + main_class = ingestion_job_params.get_class_name(), + arguments = ingestion_job_params.get_arguments(), + reference_files = extra_jar_paths, + tags = tags) + + return cast(StreamIngestionJob, self._job_from_job_info(job_info)) + + def get_job_by_id(self, job_id: int) -> SparkJob: + job_info = _get_job_by_id(self._api, job_id) + if job_info is None: + raise KeyError(f"Job iwth id {job_id} not found") + else: + return self._job_from_job_info(job_info) + + def list_jobs( + self, + include_terminated: bool, + project: Optional[str] = None, + table_name: Optional[str] = None, + ) -> List[SparkJob]: + return [ + self._job_from_job_info(job) + for job in _list_jobs(self._api, project, table_name) + if include_terminated + or _job_feast_state(job) not in (SparkJobStatus.COMPLETED, SparkJobStatus.FAILED) + ] diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py b/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py new file mode 100644 index 00000000..edff919f --- /dev/null +++ b/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py @@ -0,0 +1,265 @@ +import os +import re +import hashlib +import urllib.request +from datetime import datetime +from typing import Any, Dict, List, Optional + +from azure.identity import DefaultAzureCredential + +from azure.synapse.spark import SparkClient +from azure.synapse.spark.models import SparkBatchJobOptions, SparkBatchJob + +from azure.storage.filedatalake import DataLakeServiceClient + +from feast_spark.pyspark.abc import 
SparkJobStatus, RetrievalJobParameters,BQ_SPARK_PACKAGE + +__all__ = [ + "_cancel_job_by_id", + "_prepare_job_tags", + "_list_jobs", + "_get_job_by_id", + "_generate_project_table_hash", + "STREAM_TO_ONLINE_JOB_TYPE", + "OFFLINE_TO_ONLINE_JOB_TYPE", + "HISTORICAL_RETRIEVAL_JOB_TYPE", + "METADATA_JOBHASH", + "METADATA_OUTPUT_URI", +] + +STREAM_TO_ONLINE_JOB_TYPE = "STREAM_TO_ONLINE_JOB" +OFFLINE_TO_ONLINE_JOB_TYPE = "OFFLINE_TO_ONLINE_JOB" +HISTORICAL_RETRIEVAL_JOB_TYPE = "HISTORICAL_RETRIEVAL_JOB" + +LABEL_JOBID = "feast.dev/jobid" +LABEL_JOBTYPE = "feast.dev/type" +LABEL_FEATURE_TABLE = "feast.dev/table" +LABEL_FEATURE_TABLE_HASH = "feast.dev/tablehash" +LABEL_PROJECT = "feast.dev/project" + +# Can't store these bits of info due to 64-character limit, so we store them as +# sparkConf +METADATA_OUTPUT_URI = "dev.feast.outputuri" +METADATA_JOBHASH = "dev.feast.jobhash" + + +def _generate_project_table_hash(project: str, table_name: str) -> str: + return hashlib.md5(f"{project}:{table_name}".encode()).hexdigest() + + +def _truncate_label(label: str) -> str: + return label[:63] + + +def _prepare_job_tags(job_params, job_type: str) -> Dict[str, Any]: + """ Prepare Synapse job tags """ + return {LABEL_JOBTYPE:job_type, + LABEL_FEATURE_TABLE: _truncate_label( + job_params.get_feature_table_name() + ), + LABEL_FEATURE_TABLE_HASH: _generate_project_table_hash( + job_params.get_project(), + job_params.get_feature_table_name(), + ), + LABEL_PROJECT: job_params.get_project() + } + + +STATE_MAP = { + "": SparkJobStatus.STARTING, + "not_started": SparkJobStatus.STARTING, + 'starting': SparkJobStatus.STARTING, + "running": SparkJobStatus.IN_PROGRESS, + "success": SparkJobStatus.COMPLETED, + "dead": SparkJobStatus.FAILED, + "killed": SparkJobStatus.FAILED, + "Uncertain": SparkJobStatus.IN_PROGRESS, + "Succeeded": SparkJobStatus.COMPLETED, + "Failed": SparkJobStatus.FAILED, + "Cancelled": SparkJobStatus.FAILED, +} + + +def _job_feast_state(job: SparkBatchJob) -> SparkJobStatus: + 
return STATE_MAP[job.state] + + +def _job_start_time(job: SparkBatchJob) -> datetime: + return job.scheduler.scheduled_at + + +EXECUTOR_SIZE = {'Small': {'Cores': 4, 'Memory': '28g'}, 'Medium': {'Cores': 8, 'Memory': '56g'}, + 'Large': {'Cores': 16, 'Memory': '112g'}} + + +def categorized_files(reference_files): + if reference_files == None: + return None, None + + files = [] + jars = [] + for file in reference_files: + file = file.strip() + if file.endswith(".jar"): + jars.append(file) + else: + files.append(file) + return files, jars + + +class SynapseJobRunner(object): + def __init__(self, synapse_dev_url, spark_pool_name, credential = None, executor_size = 'Small', executors = 2): + if credential is None: + credential = DefaultAzureCredential() + + self.client = SparkClient( + credential=credential, + endpoint=synapse_dev_url, + spark_pool_name=spark_pool_name + ) + + self._executor_size = executor_size + self._executors = executors + + def get_spark_batch_job(self, job_id): + + return self.client.spark_batch.get_spark_batch_job(job_id, detailed=True) + + def get_spark_batch_jobs(self): + + return self.client.spark_batch.get_spark_batch_jobs(detailed=True) + + def cancel_spark_batch_job(self, job_id): + + return self.client.spark_batch.cancel_spark_batch_job(job_id) + + def create_spark_batch_job(self, job_name, main_definition_file, class_name = None, + arguments=None, reference_files=None, archives=None, configuration=None, tags=None): + + file = main_definition_file + + files, jars = categorized_files(reference_files) + driver_cores = EXECUTOR_SIZE[self._executor_size]['Cores'] + driver_memory = EXECUTOR_SIZE[self._executor_size]['Memory'] + executor_cores = EXECUTOR_SIZE[self._executor_size]['Cores'] + executor_memory = EXECUTOR_SIZE[self._executor_size]['Memory'] + + # hard coded for test + arguments = ['--feature-table', '{"features": [{"name": "avg_daily_trips", "type": "INT32"}, {"name": "conv_rate", "type": "FLOAT"}, {"name": "acc_rate", "type": 
"FLOAT"}], "project": "default", "name": "driver_statistics", "entities": [{"name": "driver_id", "type": "INT64"}], "max_age": null, "labels": "test"}', '--source', '{"file": {"field_mapping": "test", "event_timestamp_column": "datetime", "created_timestamp_column": "created", "date_partition_column": "date", "path": "wasbs://feasttest@feaststore.blob.core.windows.net/driver_statistics", "format": {"json_class": "ParquetFormat"}}}', '--redis', '{"host": "localhost", "port": 6379, "ssl": false}', '--mode', 'offline', '--start', '2020/10/10T00:00:00', '--end', '2020-10-20T00:00:00'] + # SDK source code is here: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/synapse/azure-synapse + # Exact code is here: https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/synapse/azure-synapse-spark/azure/synapse/spark/operations/_spark_batch_operations.py#L114 + spark_batch_job_options = SparkBatchJobOptions( + tags=tags, + name=job_name, + file=file, + class_name=class_name, + arguments=arguments, + jars=jars, + files=files, + archives=archives, + configuration=configuration, + driver_memory=driver_memory, + driver_cores=driver_cores, + executor_memory=executor_memory, + executor_cores=executor_cores, + executor_count=self._executors) + + print("Final input argument:", arguments, type(arguments)) + + return self.client.spark_batch.create_spark_batch_job(spark_batch_job_options, detailed=True) + + +class DataLakeFiler(object): + def __init__(self, datalake_dir, credential = None): + datalake = list(filter(None, re.split('/|@', datalake_dir))) + assert len(datalake) >= 3 + + if credential is None: + credential = DefaultAzureCredential() + + account_url = "https://" + datalake[2] + datalake_client = DataLakeServiceClient( + credential=credential, + account_url=account_url + ).get_file_system_client(datalake[1]) + + if len(datalake) > 3: + datalake_client = datalake_client.get_directory_client('/'.join(datalake[3:])) + datalake_client.create_directory() + + 
self.datalake_dir = datalake_dir + '/' if datalake_dir[-1] != '/' else datalake_dir + self.dir_client = datalake_client + + def upload_file(self, local_file): + + file_name = os.path.basename(local_file) + file_client = self.dir_client.create_file(file_name) + + if local_file.startswith('http'): + # remote_file = local_file + # local_file = './' + file_name + # urllib.request.urlretrieve(remote_file, local_file) + with urllib.request.urlopen(local_file) as f: + data = f.read() + file_client.append_data(data, 0, len(data)) + file_client.flush_data(len(data)) + else: + with open(local_file, 'r') as f: + data = f.read() + file_client.append_data(data, 0, len(data)) + file_client.flush_data(len(data)) + + return self.datalake_dir + file_name + + +def _submit_job( + api: SynapseJobRunner, + name: str, + main_file: str, + main_class = None, + arguments = None, + reference_files = None, + tags = None +) -> SparkBatchJob: + return api.create_spark_batch_job(name, main_file, class_name = main_class, arguments = arguments, + reference_files = reference_files, tags = tags) + + +def _list_jobs( + api: SynapseJobRunner, + project: Optional[str] = None, + table_name: Optional[str] = None, +) -> List[SparkBatchJob]: + + job_infos = api.get_spark_batch_jobs() + + # Batch, Streaming Ingestion jobs + if project and table_name: + result = [] + table_name_hash = _generate_project_table_hash(project, table_name) + for job_info in job_infos: + if LABEL_FEATURE_TABLE_HASH in job_info.tags: + if table_name_hash == job_info.tags[LABEL_FEATURE_TABLE_HASH]: + result.append(job_info) + elif project: + result = [] + for job_info in job_infos: + if LABEL_PROJECT in job_info.tags: + if project == job_info.tags[LABEL_PROJECT]: + result.append(job_info) + else: + result = job_infos + + return result + + +def _get_job_by_id( + api: SynapseJobRunner, + job_id: int +) -> Optional[SparkBatchJob]: + return api.get_spark_batch_job(job_id) + + +def _cancel_job_by_id(api: SynapseJobRunner, job_id: int): + 
api.cancel_spark_batch_job(job_id) + diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 3bdebb90..e4099e01 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -10,7 +10,7 @@ def pytest_addoption(parser): parser.addoption("--kafka-brokers", action="store", default="localhost:9092") parser.addoption( - "--env", action="store", help="local|aws|gcloud|k8s", default="local" + "--env", action="store", help="local|aws|gcloud|k8s|synapse", default="local" ) parser.addoption("--with-job-service", action="store_true") parser.addoption("--staging-path", action="store") @@ -22,6 +22,11 @@ def pytest_addoption(parser): parser.addoption("--dataproc-executor-instances", action="store", default="2") parser.addoption("--dataproc-executor-cores", action="store", default="2") parser.addoption("--dataproc-executor-memory", action="store", default="2g") + parser.addoption("--azure-synapse-dev-url", action="store", default="") + parser.addoption("--azure-synapse-pool-name", action="store", default="") + parser.addoption("--azure-synapse-datalake-dir", action="store", default="") + parser.addoption("--azure-blob-account-name", action="store", default="") + parser.addoption("--azure-blob-account-access-key", action="store", default="") parser.addoption("--ingestion-jar", action="store") parser.addoption("--redis-url", action="store", default="localhost:6379") parser.addoption("--redis-cluster", action="store_true") diff --git a/tests/e2e/fixtures/client.py b/tests/e2e/fixtures/client.py index 401fb113..162713da 100644 --- a/tests/e2e/fixtures/client.py +++ b/tests/e2e/fixtures/client.py @@ -105,6 +105,24 @@ def feast_client( ), enable_auth=pytestconfig.getoption("enable_auth"), ) + elif pytestconfig.getoption("env") == "synapse": + return Client( + core_url=f"{feast_core[0]}:{feast_core[1]}", + serving_url=f"{feast_serving[0]}:{feast_serving[1]}", + spark_launcher="synapse", + azure_synapse_dev_url = pytestconfig.getoption("azure_synapse_dev_url"), + 
azure_synapse_pool_name = pytestconfig.getoption("azure_synapse_pool_name"), + azure_synapse_datalake_dir = pytestconfig.getoption("azure_synapse_datalake_dir"), + spark_staging_location=os.path.join(local_staging_path, "synapse"), + azure_blob_account_name=pytestconfig.getoption("azure_blob_account_name"), + azure_blob_account_access_key=pytestconfig.getoption("azure_blob_account_access_key"), + spark_ingestion_jar=ingestion_job_jar, + redis_host=pytestconfig.getoption("redis_url").split(":")[0], + redis_port=pytestconfig.getoption("redis_url").split(":")[1], + historical_feature_output_location=os.path.join( + local_staging_path, "historical_output" + ), + ) else: raise KeyError(f"Unknown environment {pytestconfig.getoption('env')}") @@ -186,6 +204,21 @@ def tfrecord_feast_client( local_staging_path, "historical_output" ), ) + elif pytestconfig.getoption("env") == "synapse": + return Client( + core_url=f"{feast_core[0]}:{feast_core[1]}", + spark_launcher="synapse", + azure_synapse_dev_url = pytestconfig.getoption("azure_synapse_dev_url"), + azure_synapse_pool_name = pytestconfig.getoption("azure_synapse_pool_name"), + azure_synapse_datalake_dir = pytestconfig.getoption("azure_synapse_datalake_dir"), + spark_staging_location=os.path.join(local_staging_path, "synapse"), + azure_blob_account_name=pytestconfig.getoption("azure_blob_account_name"), + azure_blob_account_access_key=pytestconfig.getoption("azure_blob_account_access_key"), + historical_feature_output_format="tfrecord", + historical_feature_output_location=os.path.join( + local_staging_path, "historical_output" + ), + ) else: raise KeyError(f"Unknown environment {pytestconfig.getoption('env')}") From 8c06c45b3b92367eb4573934903de22fe407382d Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Mon, 26 Apr 2021 08:30:08 +0800 Subject: [PATCH 02/19] Remove unnecessary printout Remove unnecessary printout --- python/feast_spark/pyspark/launchers/synapse/synapse.py | 8 -------- 
.../pyspark/launchers/synapse/synapse_utils.py | 7 +------ 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse.py b/python/feast_spark/pyspark/launchers/synapse/synapse.py index 63733ac1..5de07eed 100644 --- a/python/feast_spark/pyspark/launchers/synapse/synapse.py +++ b/python/feast_spark/pyspark/launchers/synapse/synapse.py @@ -219,14 +219,6 @@ def offline_to_online_ingestion( """ main_file = self._datalake.upload_file(ingestion_job_params.get_main_file_path()) - - print(main_file, ingestion_job_params.get_main_file_path()) - print(ingestion_job_params.get_class_name()) - # for arg in ingestion_job_params.get_arguments(): - # print(len(arg), arg) - # args = [x.replace(' ', '') for x in ingestion_job_params.get_arguments()] - # print(args) - job_info = _submit_job(self._api, ingestion_job_params.get_project(), main_file, main_class = ingestion_job_params.get_class_name(), arguments = ingestion_job_params.get_arguments(), diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py b/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py index edff919f..61b9d3fb 100644 --- a/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py +++ b/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py @@ -144,10 +144,7 @@ def create_spark_batch_job(self, job_name, main_definition_file, class_name = No executor_cores = EXECUTOR_SIZE[self._executor_size]['Cores'] executor_memory = EXECUTOR_SIZE[self._executor_size]['Memory'] - # hard coded for test - arguments = ['--feature-table', '{"features": [{"name": "avg_daily_trips", "type": "INT32"}, {"name": "conv_rate", "type": "FLOAT"}, {"name": "acc_rate", "type": "FLOAT"}], "project": "default", "name": "driver_statistics", "entities": [{"name": "driver_id", "type": "INT64"}], "max_age": null, "labels": "test"}', '--source', '{"file": {"field_mapping": "test", "event_timestamp_column": "datetime", "created_timestamp_column": "created", 
"date_partition_column": "date", "path": "wasbs://feasttest@feaststore.blob.core.windows.net/driver_statistics", "format": {"json_class": "ParquetFormat"}}}', '--redis', '{"host": "localhost", "port": 6379, "ssl": false}', '--mode', 'offline', '--start', '2020/10/10T00:00:00', '--end', '2020-10-20T00:00:00'] - # SDK source code is here: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/synapse/azure-synapse - # Exact code is here: https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/synapse/azure-synapse-spark/azure/synapse/spark/operations/_spark_batch_operations.py#L114 + # Exact API definition is here: https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/synapse/azure-synapse-spark/azure/synapse/spark/operations/_spark_batch_operations.py#L114 spark_batch_job_options = SparkBatchJobOptions( tags=tags, name=job_name, @@ -164,8 +161,6 @@ def create_spark_batch_job(self, job_name, main_definition_file, class_name = No executor_cores=executor_cores, executor_count=self._executors) - print("Final input argument:", arguments, type(arguments)) - return self.client.spark_batch.create_spark_batch_job(spark_batch_job_options, detailed=True) From b3fd983f437093997f62e140c6bfd1bc56ff2387 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Sat, 8 May 2021 00:50:20 -0700 Subject: [PATCH 03/19] Add eventhub support --- spark/ingestion/pom.xml | 14 ++++++++++++++ .../main/scala/feast/ingestion/IngestionJob.scala | 7 ++++--- .../scala/feast/ingestion/IngestionJobConfig.scala | 13 ++++++++++++- .../scala/feast/ingestion/StreamingPipeline.scala | 9 +++++++++ 4 files changed, 39 insertions(+), 4 deletions(-) diff --git a/spark/ingestion/pom.xml b/spark/ingestion/pom.xml index 1fc560c5..d80d5645 100644 --- a/spark/ingestion/pom.xml +++ b/spark/ingestion/pom.xml @@ -51,6 +51,20 @@ ${protobuf.version} + + + org.glassfish + javax.el + 3.0.1-b08 + + + + com.microsoft.azure + azure-eventhubs-spark_2.12 + 2.3.18 + + + com.gojek stencil diff --git 
a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala index 70520681..0ae4dc07 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala @@ -48,9 +48,10 @@ object IngestionJob { case (_, x) => x } .extract[Sources] match { - case Sources(file: Some[FileSource], _, _) => c.copy(source = file.get) - case Sources(_, bq: Some[BQSource], _) => c.copy(source = bq.get) - case Sources(_, _, kafka: Some[KafkaSource]) => c.copy(source = kafka.get) + case Sources(file: Some[FileSource], _, _, _) => c.copy(source = file.get) + case Sources(_, bq: Some[BQSource], _, _) => c.copy(source = bq.get) + case Sources(_, _, kafka: Some[KafkaSource], _) => c.copy(source = kafka.get) + case Sources(_, _, _, eventhub: Some[EventHubSource]) => c.copy(source = eventhub.get) } }) .required() diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala index 87150493..275dc1c5 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala @@ -90,10 +90,21 @@ case class KafkaSource( override val datePartitionColumn: Option[String] = None ) extends StreamingSource +case class EventHubSource( + bootstrapServers: String, + topic: String, + override val format: DataFormat, + override val fieldMapping: Map[String, String], + override val eventTimestampColumn: String, + override val createdTimestampColumn: Option[String] = None, + override val datePartitionColumn: Option[String] = None + ) extends StreamingSource + case class Sources( file: Option[FileSource] = None, bq: Option[BQSource] = None, - kafka: Option[KafkaSource] = None + kafka: Option[KafkaSource] = None, + eventhub: Option[EventHubSource] = None ) case class Field(name: 
String, `type`: feast.proto.types.ValueProto.ValueType.Enum) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala index e7903ea5..6a72b7bb 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.functions.{expr, lit, struct, udf, coalesce} import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.types.BooleanType import org.apache.spark.{SparkEnv, SparkFiles} +import org.apache.spark.eventhubs._ /** * Streaming pipeline (currently in micro-batches mode only, since we need to have multiple sinks: redis & deadletters). @@ -59,6 +60,9 @@ object StreamingPipeline extends BasePipeline with Serializable { val rowValidator = new RowValidator(featureTable, config.source.eventTimestampColumn) val metrics = new IngestionPipelineMetrics val validationUDF = createValidationUDF(sparkSession, config) + val connStr = "Endpoint=sb://xiaoyzhufeasttest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=z9obEAyVvD36fZIEvvtNlCRBEDjIrsfNfDAbgDyTbDg=;EntityPath=xiaoyzhufeasttesteh" + val ehConf = EventHubsConf(connStr).setStartingPosition(EventPosition.fromStartOfStream) + val input = config.source match { case source: KafkaSource => @@ -67,6 +71,11 @@ object StreamingPipeline extends BasePipeline with Serializable { .option("kafka.bootstrap.servers", source.bootstrapServers) .option("subscribe", source.topic) .load() + case source: EventHubSource => + sparkSession.readStream + .format("eventhubs") + .options(ehConf.toMap) + .load() case source: MemoryStreamingSource => source.read } From 89f48a3bbe59cb01bea17c95901456fb5a7810ed Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 18 May 2021 19:13:38 -0700 Subject: [PATCH 04/19] Add EventHub support and Redis Auth support --- 
python/feast_spark/constants.py | 9 ++ python/feast_spark/pyspark/abc.py | 142 ++---------------- python/feast_spark/pyspark/launcher.py | 12 +- .../pyspark/launchers/synapse/synapse.py | 9 +- .../launchers/synapse/synapse_utils.py | 19 ++- 5 files changed, 48 insertions(+), 143 deletions(-) diff --git a/python/feast_spark/constants.py b/python/feast_spark/constants.py index bda11867..f51686a3 100644 --- a/python/feast_spark/constants.py +++ b/python/feast_spark/constants.py @@ -108,6 +108,12 @@ class ConfigOptions(metaclass=ConfigMeta): # Synapse pool executor count AZURE_SYNAPSE_EXECUTORS = "2" + # Azure EventHub Connection String (with Kafka API). See more details here: + # https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-migration-guide + # Code Sample is here: + # https://github.com/Azure/azure-event-hubs-for-kafka/blob/master/tutorials/spark/sparkConsumer.scala + AZURE_EVENTHUB_KAFKA_CONNECTION_STRING = "" + #: File format of historical retrieval features HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet" @@ -123,6 +129,9 @@ class ConfigOptions(metaclass=ConfigMeta): #: Enable or disable TLS/SSL to Redis REDIS_SSL: Optional[str] = "False" + #: Auth string for redis + REDIS_AUTH: str = "" + #: BigTable Project ID BIGTABLE_PROJECT: Optional[str] = "" diff --git a/python/feast_spark/pyspark/abc.py b/python/feast_spark/pyspark/abc.py index 6d02597a..60e3f9e5 100644 --- a/python/feast_spark/pyspark/abc.py +++ b/python/feast_spark/pyspark/abc.py @@ -329,13 +329,9 @@ def __init__( feature_table: Dict, source: Dict, jar: str, - redis_host: Optional[str] = None, - redis_port: Optional[int] = None, - redis_ssl: Optional[bool] = None, - bigtable_project: Optional[str] = None, - bigtable_instance: Optional[str] = None, - cassandra_host: Optional[str] = None, - cassandra_port: Optional[int] = None, + redis_host: str, + redis_port: int, + redis_ssl: bool, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = 
None, @@ -348,10 +344,6 @@ def __init__( self._redis_host = redis_host self._redis_port = redis_port self._redis_ssl = redis_ssl - self._bigtable_project = bigtable_project - self._bigtable_instance = bigtable_instance - self._cassandra_host = cassandra_host - self._cassandra_port = cassandra_port self._statsd_host = statsd_host self._statsd_port = statsd_port self._deadletter_path = deadletter_path @@ -361,14 +353,6 @@ def __init__( def _get_redis_config(self): return dict(host=self._redis_host, port=self._redis_port, ssl=self._redis_ssl) - def _get_bigtable_config(self): - return dict( - project_id=self._bigtable_project, instance_id=self._bigtable_instance - ) - - def _get_cassandra_config(self): - return dict(host=self._cassandra_host, port=self._cassandra_port) - def _get_statsd_config(self): return ( dict(host=self._statsd_host, port=self._statsd_port) @@ -394,17 +378,10 @@ def get_arguments(self) -> List[str]: json.dumps(self._feature_table), "--source", json.dumps(self._source), + "--redis", + json.dumps(self._get_redis_config()), ] - if self._redis_host and self._redis_port: - args.extend(["--redis", json.dumps(self._get_redis_config())]) - - if self._bigtable_project and self._bigtable_instance: - args.extend(["--bigtable", json.dumps(self._get_bigtable_config())]) - - if self._cassandra_host and self._cassandra_port: - args.extend(["--cassandra", json.dumps(self._get_cassandra_config())]) - if self._get_statsd_config(): args.extend(["--statsd", json.dumps(self._get_statsd_config())]) @@ -433,13 +410,9 @@ def __init__( start: datetime, end: datetime, jar: str, - redis_host: Optional[str], - redis_port: Optional[int], - redis_ssl: Optional[bool], - bigtable_project: Optional[str], - bigtable_instance: Optional[str], - cassandra_host: Optional[str] = None, - cassandra_port: Optional[int] = None, + redis_host: str, + redis_port: int, + redis_ssl: bool, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = 
None, @@ -452,10 +425,6 @@ def __init__( redis_host, redis_port, redis_ssl, - bigtable_project, - bigtable_instance, - cassandra_host, - cassandra_port, statsd_host, statsd_port, deadletter_path, @@ -484,60 +453,6 @@ def get_arguments(self) -> List[str]: ] -class ScheduledBatchIngestionJobParameters(IngestionJobParameters): - def __init__( - self, - feature_table: Dict, - source: Dict, - ingestion_timespan: int, - cron_schedule: str, - jar: str, - redis_host: Optional[str], - redis_port: Optional[int], - redis_ssl: Optional[bool], - bigtable_project: Optional[str], - bigtable_instance: Optional[str], - cassandra_host: Optional[str] = None, - cassandra_port: Optional[int] = None, - statsd_host: Optional[str] = None, - statsd_port: Optional[int] = None, - deadletter_path: Optional[str] = None, - stencil_url: Optional[str] = None, - ): - super().__init__( - feature_table, - source, - jar, - redis_host, - redis_port, - redis_ssl, - bigtable_project, - bigtable_instance, - cassandra_host, - cassandra_port, - statsd_host, - statsd_port, - deadletter_path, - stencil_url, - ) - self._ingestion_timespan = ingestion_timespan - self._cron_schedule = cron_schedule - - def get_name(self) -> str: - return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}" - - def get_job_type(self) -> SparkJobType: - return SparkJobType.SCHEDULED_BATCH_INGESTION - - def get_arguments(self) -> List[str]: - return super().get_arguments() + [ - "--mode", - "offline", - "--ingestion-timespan", - str(self._ingestion_timespan), - ] - - class StreamIngestionJobParameters(IngestionJobParameters): def __init__( self, @@ -545,20 +460,15 @@ def __init__( source: Dict, jar: str, extra_jars: List[str], - redis_host: Optional[str], - redis_port: Optional[int], - redis_ssl: Optional[bool], - bigtable_project: Optional[str], - bigtable_instance: Optional[str], - cassandra_host: Optional[str] = None, - cassandra_port: Optional[int] = None, + redis_host: str, + redis_port: int, + redis_ssl: 
bool, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = None, checkpoint_path: Optional[str] = None, stencil_url: Optional[str] = None, drop_invalid_rows: bool = False, - triggering_interval: Optional[int] = None, ): super().__init__( feature_table, @@ -567,10 +477,6 @@ def __init__( redis_host, redis_port, redis_ssl, - bigtable_project, - bigtable_instance, - cassandra_host, - cassandra_port, statsd_host, statsd_port, deadletter_path, @@ -579,7 +485,6 @@ def __init__( ) self._extra_jars = extra_jars self._checkpoint_path = checkpoint_path - self._triggering_interval = triggering_interval def get_name(self) -> str: return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}" @@ -595,8 +500,6 @@ def get_arguments(self) -> List[str]: args.extend(["--mode", "online"]) if self._checkpoint_path: args.extend(["--checkpoint-path", self._checkpoint_path]) - if self._triggering_interval: - args.extend(["--triggering-interval", str(self._triggering_interval)]) return args def get_job_hash(self) -> str: @@ -691,29 +594,6 @@ def offline_to_online_ingestion( """ raise NotImplementedError - @abc.abstractmethod - def schedule_offline_to_online_ingestion( - self, ingestion_job_params: ScheduledBatchIngestionJobParameters - ): - """ - Submits a scheduled batch ingestion job to a Spark cluster. - - Raises: - SparkJobFailure: The spark job submission failed, encountered error - during execution, or timeout. - - Returns: - ScheduledBatchIngestionJob: wrapper around remote job that can be used to check when job completed. - """ - raise NotImplementedError - - @abc.abstractmethod - def unschedule_offline_to_online_ingestion(self, project: str, feature_table: str): - """ - Unschedule a scheduled batch ingestion job. 
- """ - raise NotImplementedError - @abc.abstractmethod def start_stream_to_online_ingestion( self, ingestion_job_params: StreamIngestionJobParameters diff --git a/python/feast_spark/pyspark/launcher.py b/python/feast_spark/pyspark/launcher.py index b4c2b963..85da6cff 100644 --- a/python/feast_spark/pyspark/launcher.py +++ b/python/feast_spark/pyspark/launcher.py @@ -364,11 +364,9 @@ def get_stream_to_online_ingestion_params( source=_source_to_argument(feature_table.stream_source, client.config), feature_table=_feature_table_to_argument(client, project, feature_table), redis_host=client.config.get(opt.REDIS_HOST), - redis_port=bool(client.config.get(opt.REDIS_HOST)) - and client.config.getint(opt.REDIS_PORT), + redis_port=client.config.getint(opt.REDIS_PORT), redis_ssl=client.config.getboolean(opt.REDIS_SSL), - bigtable_project=client.config.get(opt.BIGTABLE_PROJECT), - bigtable_instance=client.config.get(opt.BIGTABLE_INSTANCE), + redis_auth=client.config.get(opt.REDIS_AUTH), statsd_host=client.config.getboolean(opt.STATSD_ENABLED) and client.config.get(opt.STATSD_HOST), statsd_port=client.config.getboolean(opt.STATSD_ENABLED) @@ -377,11 +375,9 @@ def get_stream_to_online_ingestion_params( checkpoint_path=client.config.get(opt.CHECKPOINT_PATH), stencil_url=client.config.get(opt.STENCIL_URL), drop_invalid_rows=client.config.get(opt.INGESTION_DROP_INVALID_ROWS), - triggering_interval=client.config.getint( - opt.SPARK_STREAMING_TRIGGERING_INTERVAL, default=None - ), - ) + kafka_sasl_auth=client.config.get(opt.AZURE_EVENTHUB_KAFKA_CONNECTION_STRING), + ) def start_stream_to_online_ingestion( client: "Client", project: str, feature_table: FeatureTable, extra_jars: List[str] diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse.py b/python/feast_spark/pyspark/launchers/synapse/synapse.py index 5de07eed..13b2cd5c 100644 --- a/python/feast_spark/pyspark/launchers/synapse/synapse.py +++ b/python/feast_spark/pyspark/launchers/synapse/synapse.py @@ -219,10 +219,17 
@@ def offline_to_online_ingestion( """ main_file = self._datalake.upload_file(ingestion_job_params.get_main_file_path()) + print(main_file, ingestion_job_params.get_main_file_path()) + print(ingestion_job_params.get_class_name()) + # for arg in ingestion_job_params.get_arguments(): + # print(len(arg), arg) + # args = [x.replace(' ', '') for x in ingestion_job_params.get_arguments()] + # print(args) + job_info = _submit_job(self._api, ingestion_job_params.get_project(), main_file, main_class = ingestion_job_params.get_class_name(), arguments = ingestion_job_params.get_arguments(), - tags = _prepare_job_tags(ingestion_job_params, OFFLINE_TO_ONLINE_JOB_TYPE)) + tags = _prepare_job_tags(ingestion_job_params, OFFLINE_TO_ONLINE_JOB_TYPE),configuration=None) return cast(BatchIngestionJob, self._job_from_job_info(job_info)) diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py b/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py index 61b9d3fb..716b25b5 100644 --- a/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py +++ b/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py @@ -4,6 +4,7 @@ import urllib.request from datetime import datetime from typing import Any, Dict, List, Optional +from azure.core.configuration import Configuration from azure.identity import DefaultAzureCredential @@ -144,7 +145,11 @@ def create_spark_batch_job(self, job_name, main_definition_file, class_name = No executor_cores = EXECUTOR_SIZE[self._executor_size]['Cores'] executor_memory = EXECUTOR_SIZE[self._executor_size]['Memory'] - # Exact API definition is here: https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/synapse/azure-synapse-spark/azure/synapse/spark/operations/_spark_batch_operations.py#L114 + # SDK source code is here: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/synapse/azure-synapse + # Exact code is here: 
https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/synapse/azure-synapse-spark/azure/synapse/spark/operations/_spark_batch_operations.py#L114 + + arguments = [elem.replace("}", " }") for elem in arguments] + spark_batch_job_options = SparkBatchJobOptions( tags=tags, name=job_name, @@ -160,7 +165,14 @@ def create_spark_batch_job(self, job_name, main_definition_file, class_name = No executor_memory=executor_memory, executor_cores=executor_cores, executor_count=self._executors) + + # print("spark_batch_job_options", spark_batch_job_options) + # print("arguments", arguments, type(arguments)) + + # print(tags,job_name,file,class_name,arguments,jars,files,archives,configuration,driver_memory,driver_cores,executor_memory,executor_cores,self._executors) + print("Final input argument:", arguments) + return self.client.spark_batch.create_spark_batch_job(spark_batch_job_options, detailed=True) @@ -214,10 +226,11 @@ def _submit_job( main_class = None, arguments = None, reference_files = None, - tags = None + tags = None, + configuration = None, ) -> SparkBatchJob: return api.create_spark_batch_job(name, main_file, class_name = main_class, arguments = arguments, - reference_files = reference_files, tags = tags) + reference_files = reference_files, tags = tags, configuration=configuration) def _list_jobs( From f2cd8be5d003612f9d89dad811b991d96370b4fb Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 18 May 2021 19:21:00 -0700 Subject: [PATCH 05/19] Adding EventHub support in Spark jobs --- .../scala/feast/ingestion/BasePipeline.scala | 3 +- .../scala/feast/ingestion/IngestionJob.scala | 17 +++++++- .../feast/ingestion/IngestionJobConfig.scala | 3 +- .../feast/ingestion/StreamingPipeline.scala | 40 +++++++++++++------ 4 files changed, 47 insertions(+), 16 deletions(-) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala index d88e900d..2e2456db 100644 --- 
a/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala @@ -33,11 +33,12 @@ object BasePipeline { val conf = new SparkConf() jobConfig.store match { - case RedisConfig(host, port, ssl) => + case RedisConfig(host, port, auth, ssl) => conf .set("spark.redis.host", host) .set("spark.redis.port", port.toString) .set("spark.redis.ssl", ssl.toString) + .set("spark.redis.auth", auth.toString) case BigTableConfig(projectId, instanceId) => conf .set("spark.bigtable.projectId", projectId) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala index 0ae4dc07..20ec8e13 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala @@ -21,6 +21,8 @@ import org.joda.time.{DateTime, DateTimeZone} import org.json4s._ import org.json4s.ext.JavaEnumNameSerializer import org.json4s.jackson.JsonMethods.{parse => parseJSON} +import org.json4s.ext.JavaEnumNameSerializer +import scala.collection.mutable.ArrayBuffer object IngestionJob { import Modes._ @@ -116,8 +118,21 @@ object IngestionJob { .action((x, c) => c.copy(streamingTriggeringSecs = x)) } + opt[String](name = "kafka_sasl_auth") + .action((x, c) => c.copy(kafkaSASL = Some(x))) + } + def main(args: Array[String]): Unit = { - parser.parse(args, IngestionJobConfig()) match { + println("Debug... 
Received following argument:") + println(args.toList) + val args_modified = new Array[String](args.length) + for ( i <- 0 to (args_modified.length - 1)) { + args_modified(i) = args(i).replace(" }", "}"); + args_modified(i) = args_modified(i).replace("\\", "\\\""); + } + println("Remove additional spaces in args:") + println(args_modified.toList) + parser.parse(args_modified, IngestionJobConfig()) match { case Some(config) => println(s"Starting with config $config") config.mode match { diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala index 275dc1c5..90f43b04 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala @@ -130,7 +130,7 @@ case class IngestionJobConfig( source: Source = null, startTime: DateTime = DateTime.now(), endTime: DateTime = DateTime.now(), - store: StoreConfig = RedisConfig("localhost", 6379, false), + store: StoreConfig = RedisConfig("localhost", 6379, "", false), metrics: Option[MetricConfig] = None, deadLetterPath: Option[String] = None, stencilURL: Option[String] = None, @@ -138,4 +138,5 @@ case class IngestionJobConfig( validationConfig: Option[ValidationConfig] = None, doNotIngestInvalidRows: Boolean = false, checkpointPath: Option[String] = None + kafkaSASL: Option[String] = None ) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala index 6a72b7bb..9a31eb04 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala @@ -37,6 +37,8 @@ import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.types.BooleanType import org.apache.spark.{SparkEnv, SparkFiles} import org.apache.spark.eventhubs._ +import 
org.apache.kafka.common.security.plain.PlainLoginModule +import org.apache.kafka.common.security.JaasContext /** * Streaming pipeline (currently in micro-batches mode only, since we need to have multiple sinks: redis & deadletters). @@ -60,24 +62,36 @@ object StreamingPipeline extends BasePipeline with Serializable { val rowValidator = new RowValidator(featureTable, config.source.eventTimestampColumn) val metrics = new IngestionPipelineMetrics val validationUDF = createValidationUDF(sparkSession, config) - val connStr = "Endpoint=sb://xiaoyzhufeasttest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=z9obEAyVvD36fZIEvvtNlCRBEDjIrsfNfDAbgDyTbDg=;EntityPath=xiaoyzhufeasttesteh" - val ehConf = EventHubsConf(connStr).setStartingPosition(EventPosition.fromStartOfStream) + val EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://xiaoyzhufeasttest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=z9obEAyVvD36fZIEvvtNlCRBEDjIrsfNfDAbgDyTbDg=;EntityPath=driver_trips\";" val input = config.source match { case source: KafkaSource => - sparkSession.readStream - .format("kafka") - .option("kafka.bootstrap.servers", source.bootstrapServers) - .option("subscribe", source.topic) - .load() - case source: EventHubSource => - sparkSession.readStream - .format("eventhubs") - .options(ehConf.toMap) - .load() + if (config.kafkaSASL.nonEmpty) + { + // if we have authentication enabled + sparkSession.readStream + .format("kafka") + .option("subscribe", source.topic) + .option("kafka.bootstrap.servers", source.bootstrapServers) + .option("kafka.sasl.mechanism", "PLAIN") + .option("kafka.security.protocol", "SASL_SSL") + .option("kafka.sasl.jaas.config", config.kafkaSASL.get) + .option("kafka.request.timeout.ms", "60000") + .option("kafka.session.timeout.ms", "60000") + .option("failOnDataLoss", "false") + .load() + } + else + { + 
sparkSession.readStream + .format("kafka") + .option("kafka.bootstrap.servers", source.bootstrapServers) + .option("subscribe", source.topic) + .load() + } case source: MemoryStreamingSource => - source.read + source.read } val parsed = config.source.asInstanceOf[StreamingSource].format match { From 53d7e20e3176cb2202d9f7ceb50d90237166f9c5 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 31 Aug 2021 09:48:43 +0800 Subject: [PATCH 06/19] add ScheduledBatchIngestionJobParameters --- python/feast_spark/pyspark/abc.py | 56 +++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/python/feast_spark/pyspark/abc.py b/python/feast_spark/pyspark/abc.py index 60e3f9e5..6e33b9f8 100644 --- a/python/feast_spark/pyspark/abc.py +++ b/python/feast_spark/pyspark/abc.py @@ -452,6 +452,62 @@ def get_arguments(self) -> List[str]: self._end.strftime("%Y-%m-%dT%H:%M:%S"), ] +class ScheduledBatchIngestionJobParameters(IngestionJobParameters): + def __init__( + self, + feature_table: Dict, + source: Dict, + ingestion_timespan: int, + cron_schedule: str, + jar: str, + redis_host: Optional[str], + redis_port: Optional[int], + redis_ssl: Optional[bool], + bigtable_project: Optional[str], + bigtable_instance: Optional[str], + cassandra_host: Optional[str] = None, + cassandra_port: Optional[int] = None, + statsd_host: Optional[str] = None, + statsd_port: Optional[int] = None, + deadletter_path: Optional[str] = None, + stencil_url: Optional[str] = None, + ): + super().__init__( + feature_table, + source, + jar, + redis_host, + redis_port, + redis_ssl, + bigtable_project, + bigtable_instance, + cassandra_host, + cassandra_port, + statsd_host, + statsd_port, + deadletter_path, + stencil_url, + ) + self._ingestion_timespan = ingestion_timespan + self._cron_schedule = cron_schedule + + def get_name(self) -> str: + return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}" + + def get_job_type(self) -> SparkJobType: + return 
SparkJobType.SCHEDULED_BATCH_INGESTION + + def get_job_schedule(self) -> str: + return self._cron_schedule + + def get_arguments(self) -> List[str]: + return super().get_arguments() + [ + "--mode", + "offline", + "--ingestion-timespan", + str(self._ingestion_timespan), + ] + class StreamIngestionJobParameters(IngestionJobParameters): def __init__( From c469ee7dbd8f7289fa497cdf8d2287597491828e Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Thu, 2 Sep 2021 08:42:31 +0800 Subject: [PATCH 07/19] Add Azure specific dependencies --- python/setup.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/setup.py b/python/setup.py index ea68f668..92dbb63c 100644 --- a/python/setup.py +++ b/python/setup.py @@ -52,6 +52,9 @@ "grpcio-tools==1.31.0", "mypy-protobuf==2.5", "croniter==1.*", + "azure-synapse-spark", + "azure-identity", + "azure-storage", ] # README file from Feast repo root directory From 91c68226dcb1c59df269738ec0b282a96f6dfe62 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Thu, 2 Sep 2021 08:44:50 +0800 Subject: [PATCH 08/19] Change azure storage dependencies --- python/setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/setup.py b/python/setup.py index 92dbb63c..354bdf7a 100644 --- a/python/setup.py +++ b/python/setup.py @@ -54,7 +54,8 @@ "croniter==1.*", "azure-synapse-spark", "azure-identity", - "azure-storage", + "azure-storage-file-datalake", + "azure-storage-blob", ] # README file from Feast repo root directory From 0a7a56ccbc78751316dad18d963ee449c2a6fa3b Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Thu, 9 Sep 2021 07:26:13 +0000 Subject: [PATCH 09/19] Commen for removing/adding spaces between brackets --- feature_store_debug.py | 558 ++++++++++++++++++ python/feast_spark/pyspark/abc.py | 10 +- .../pyspark/launchers/synapse/synapse.py | 26 +- .../launchers/synapse/synapse_utils.py | 23 +- python/setup.py | 1 + .../scala/feast/ingestion/IngestionJob.scala | 7 +- 6 files changed, 602 insertions(+), 23 
deletions(-) create mode 100644 feature_store_debug.py diff --git a/feature_store_debug.py b/feature_store_debug.py new file mode 100644 index 00000000..5d11d774 --- /dev/null +++ b/feature_store_debug.py @@ -0,0 +1,558 @@ +# To add a new cell, type '# %%' +# To add a new markdown cell, type '# %% [markdown]' +# %% [markdown] +# # Ride Hailing Example +# %% [markdown] +# ![chart](https://feaststore.blob.core.windows.net/feastjar/FeastArchitectureNew.png) + +# %% +import io +import json +import os +from datetime import datetime +from urllib.parse import urlparse + +import avro.schema +import feast_spark +import numpy as np +import pandas as pd +import pyarrow.parquet as pq +import pytz +from avro.io import BinaryEncoder, DatumWriter +from azure.identity import ClientSecretCredential, DefaultAzureCredential +from azure.storage.filedatalake import DataLakeServiceClient +from confluent_kafka import Producer +from feast import Client, Entity, Feature, FeatureTable, ValueType +from feast.data_format import AvroFormat, ParquetFormat +from feast.data_source import FileSource, KafkaSource +from pyarrow.parquet import ParquetDataset +from google.protobuf.duration_pb2 import Duration +# %% [markdown] +# ## Introduction +# %% [markdown] +# For this demo, we will: +# +# 1. Register two driver features, one for driver statistics, the other for driver trips. Driver statistics are updated on daily basis, whereas driver trips are updated in real time. +# 2. Creates a driver dataset, then use Feast SDK to retrieve the features corresponding to these drivers from an offline store. +# 3. Store the features in an online store (Redis), and retrieve the features via Feast SDK. +# %% [markdown] +# ## Features Registry (Feast Core) +# %% [markdown] +# ### Configuration +# %% [markdown] +# Configurations can be provided in three different methods: + +# %% +# get_historical_features will return immediately once the Spark job has been submitted succesfully. 
+os.environ["FEAST_SPARK_LAUNCHER"] = "synapse" +os.environ["FEAST_SPARK_HOME"] = "/usr/local/spark" +os.environ["FEAST_azure_synapse_dev_url"] = "https://xiaoyzhuspark3synapse.dev.azuresynapse.net" +os.environ["FEAST_azure_synapse_pool_name"] = "xiaoyzhuspark3" + +# the datalake dir is the same with this one os.environ["FEAST_SPARK_STAGING_LOCATION"] = "wasbs://feasttest@feaststore.blob.core.windows.net/artifacts/" +os.environ["FEAST_AZURE_SYNAPSE_DATALAKE_DIR"] = "abfss://feastsparkstagingprivate@xiaoyzhusynapse.dfs.core.windows.net/feast" +os.environ["FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION"] = "abfss://feastsparkstagingprivate@xiaoyzhusynapse.dfs.core.windows.net/feast/out" +os.environ["FEAST_SPARK_STAGING_LOCATION"] = "wasbs://feasttest@feaststore.blob.core.windows.net/artifacts/" +os.environ["FEAST_SPARK_INGESTION_JAR"] = "https://feaststore.blob.core.windows.net/feastjar/feast-ingestion-spark-latest.jar" + +# Redis Config +os.environ["FEAST_REDIS_HOST"] = "feastredistest.redis.cache.windows.net" +os.environ["FEAST_REDIS_PORT"] = "6380" +os.environ["FEAST_REDIS_SSL"] = "true" +os.environ["FEAST_REDIS_AUTH"] = "ruWBZ6WZsjUk5lEnDirM9JGoV1UgtMFbAO5lWoRY1QQ=" + +# EventHub config +os.environ["FEAST_AZURE_EVENTHUB_KAFKA_CONNECTION_STRING"] = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://xiaoyzhufeasttest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=z9obEAyVvD36fZIEvvtNlCRBEDjIrsfNfDAbgDyTbDg=;EntityPath=driver_trips\";" + +os.environ["FEAST_AZURE_BLOB_ACCOUNT_NAME"] = "feaststore" +os.environ["FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY"] = "0V7PybxIprcykx3UEfygMTgRIn7pBH794KymizfArYArlB9OsQoVua32iJc5SkSpJZDPoFzDw4lAC2jGIuvAfg==" +os.environ["FEAST_HISTORICAL_FEATURE_OUTPUT_FORMAT"] = "parquet" + +# Xiaoyong's app, name: ml-auth-xiaoyzhu +os.environ['AZURE_CLIENT_ID'] = 'b92d6810-7e28-4380-89d8-103ad00e9acd' +os.environ['AZURE_TENANT_ID'] = 
'72f988bf-86f1-41af-91ab-2d7cd011db47' +os.environ['AZURE_CLIENT_SECRET'] = '_l5mz-6vj2c97Nb0YeNfs0Axa-Yhd0Q~u_' + + +# %% +# Using environmental variables +import os +os.environ["FEAST_CORE_URL"] = "20.62.162.242:6565" +os.environ["FEAST_SERVING_URL"] = "20.62.162.242:6566" + +# Provide a map during client initialization +# options = { +# "CORE_URL": "core:6565", +# "SERVING_URL": "online_serving:6566", +# } +# client = Client(options) + +# As keyword arguments, without the `FEAST` prefix +# client = Client(core_url="core:6565", serving_url="online_serving:6566") + +# %% [markdown] +# If you are following the quick start guide, all required configurations to follow the remainder of the tutorial should have been setup, in the form of environmental variables, as showned below. The configuration values may differ depending on the environment. For a full list of configurable values and explanation, please refer to the user guide. + +# %% +import os +from pprint import pprint +pprint({key: value for key, value in os.environ.items() if key.startswith("FEAST_")}) + +# %% [markdown] +# ### Basic Imports and Feast Client initialization + +# %% +import os + +from feast import Client, Feature, Entity, ValueType, FeatureTable +from feast.data_source import FileSource, KafkaSource +from feast.data_format import ParquetFormat, AvroFormat + + +# %% +client = Client(core_url='13.66.211.41:6565') + +# %% [markdown] +# ### Declare Features and Entities +# %% [markdown] +# Entity defines the primary key(s) associated with one or more feature tables. The entity must be registered before declaring the associated feature tables. 
+ +# %% +driver_id = Entity(name="driver_id", description="Driver identifier", value_type=ValueType.INT64) + + +# %% +# Daily updated features +acc_rate = Feature("acc_rate", ValueType.FLOAT) +conv_rate = Feature("conv_rate", ValueType.FLOAT) +avg_daily_trips = Feature("avg_daily_trips", ValueType.INT32) + +# Real-time updated features +trips_today = Feature("trips_today", ValueType.INT32) + +# %% [markdown] +# ```python +# FeatureTable( +# name = "driver_statistics", +# entities = ["driver_id"], +# features = [ +# acc_rate, +# conv_rate, +# avg_daily_trips +# ] +# +# ) +# ``` +# +# +# ```python +# FeatureTable( +# name = "driver_trips", +# entities = ["driver_id"], +# features = [ +# trips_today +# ] +# +# ) +# +# ``` +# %% [markdown] +# ![Features Join](https://raw.githubusercontent.com/feast-dev/feast-spark-examples/main/minimal/images/features-join.png) +# %% [markdown] +# ```python +# FeatureTable( +# ..., +# batch_source=FileSource( # Required +# file_format=ParquetFormat(), +# file_url="abfss://feast-demo-data-lake", +# ... +# ), +# stream_source=KafkaSource( # Optional +# bootstrap_servers="...", +# topic="driver_trips", +# ... +# ) +# ``` +# %% [markdown] +# Feature tables group the features together and describe how they can be retrieved. The following examples assume that the feature tables are stored on the local file system, and is accessible from the Spark cluster. If you have setup a GCP service account, you may use GCS instead as the file source. +# %% [markdown] +# `batch_source` defines where the historical features are stored. It is also possible to have an optional `stream_source`, which the feature values are delivered continuously. +# +# For now we will define only `batch_source` for both `driver_statistics` and `driver_trips`, and demonstrate the usage of `stream_source` in later part of the tutorial. + +# %% +# This is the location we're using for the offline feature store. 
+ +import os +demo_data_location = "wasbs://feasttest@feaststore.blob.core.windows.net/" + + +# %% +driver_statistics_source_uri = os.path.join(demo_data_location, "driver_statistics") + +driver_statistics = FeatureTable( + name = "driver_statistics", + entities = ["driver_id"], + features = [ + acc_rate, + conv_rate, + avg_daily_trips + ], + max_age=Duration(seconds=86400 * 1), + batch_source=FileSource( + event_timestamp_column="datetime", + created_timestamp_column="created", + file_format=ParquetFormat(), + file_url=driver_statistics_source_uri, + date_partition_column="date" + ) +) + + +# %% +driver_trips_source_uri = os.path.join(demo_data_location, "driver_trips") + + +driver_trips = FeatureTable( + name = "driver_trips", + entities = ["driver_id"], + features = [ + trips_today + ], + max_age=Duration(seconds=86400 * 1), + batch_source=FileSource( + event_timestamp_column="datetime", + created_timestamp_column="created", + file_format=ParquetFormat(), + file_url=driver_trips_source_uri, + date_partition_column="date" + ) +) + +# %% [markdown] +# ### Registering entities and feature tables in Feast Core + +# %% +client.apply(driver_id) +client.apply(driver_statistics) +client.apply(driver_trips) + + +# %% +print(client.get_feature_table("driver_statistics").to_yaml()) +print(client.get_feature_table("driver_trips").to_yaml()) + +# %% [markdown] +# ### Populating batch source +# %% [markdown] +# Feast is agnostic to how the batch source is populated, as long as it complies to the Feature Table specification. Therefore, any existing ETL tools can be used for the purpose of data ingestion. Alternatively, you can also use Feast SDK to ingest a Panda Dataframe to the batch source. 
+ +# %% +import pandas as pd +import numpy as np +from datetime import datetime + + +# %% +def generate_entities(): + return np.random.choice(999999, size=100, replace=False) + + +# %% +def generate_trips(entities): + df = pd.DataFrame(columns=["driver_id", "trips_today", "datetime", "created"]) + df['driver_id'] = entities + df['trips_today'] = np.random.randint(0, 1000, size=100).astype(np.int32) + df['datetime'] = pd.to_datetime( + np.random.randint( + datetime(2020, 10, 10).timestamp(), + datetime(2020, 10, 20).timestamp(), + size=100), + unit="s" + ) + df['created'] = pd.to_datetime(datetime.now()) + return df + + + +# %% +def generate_stats(entities): + df = pd.DataFrame(columns=["driver_id", "conv_rate", "acc_rate", "avg_daily_trips", "datetime", "created"]) + df['driver_id'] = entities + df['conv_rate'] = np.random.random(size=100).astype(np.float32) + df['acc_rate'] = np.random.random(size=100).astype(np.float32) + df['avg_daily_trips'] = np.random.randint(0, 1000, size=100).astype(np.int32) + df['datetime'] = pd.to_datetime( + np.random.randint( + datetime(2020, 10, 10).timestamp(), + datetime(2020, 10, 20).timestamp(), + size=100), + unit="s" + ) + df['created'] = pd.to_datetime(datetime.now()) + return df + + +# %% +entities = generate_entities() +stats_df = generate_stats(entities) +trips_df = generate_trips(entities) + + +# %% +client.ingest(driver_statistics, stats_df) +client.ingest(driver_trips, trips_df) + +# %% [markdown] +# ## Historical Retrieval For Training +# %% [markdown] +# ### Point-in-time correction +# %% [markdown] +# ![Point In Time](https://raw.githubusercontent.com/feast-dev/feast-spark-examples/main/minimal/images/pit-1.png) +# %% [markdown] +# Feast joins the features to the entities based on the following conditions: +# +# 1. Entity primary key(s) value matches. +# 2. 
Feature event timestamp is the closest match possible to the entity event timestamp, +# but must not be more recent than the entity event timestamp, and the difference must +# not be greater than the maximum age specified in the feature table, unless the maximum age is not specified. +# 3. If more than one feature table rows satisfy condition 1 and 2, feature row with the +# most recent created timestamp will be chosen. +# 4. If none of the above conditions are satisfied, the feature rows will have null values. + +# %% +from pyarrow.parquet import ParquetDataset +from urllib.parse import urlparse + + +# %% +def read_parquet(uri): + parsed_uri = urlparse(uri) + if parsed_uri.scheme == "file": + return pd.read_parquet(parsed_uri.path) + elif parsed_uri.scheme == 'wasbs': + import adlfs + fs = adlfs.AzureBlobFileSystem( + account_name=os.getenv('FEAST_AZURE_BLOB_ACCOUNT_NAME'), account_key=os.getenv('FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY') + ) + uripath = parsed_uri.username + parsed_uri.path + files = fs.glob(uripath + '/part-*') + ds = ParquetDataset(files, filesystem=fs) + return ds.read().to_pandas() + elif parsed_uri.scheme == 'abfss': + credential = ClientSecretCredential(os.getenv('AZURE_TENANT_ID'), os.getenv('AZURE_CLIENT_ID'), os.getenv('AZURE_CLIENT_SECRET')) + # credential = DefaultAzureCredential() + datalake = parsed_uri.netloc.split('@') + service_client = DataLakeServiceClient(account_url="https://" + datalake[1], credential=credential) + file_system_client = service_client.get_file_system_client(datalake[0]) + file_client = file_system_client.get_file_client(parsed_uri.path) + data = file_client.download_file(0) + with io.BytesIO() as b: + data.readinto(b) + table = pq.read_table(b) + print(table) + return table + else: + raise ValueError(f"Unsupported URL scheme {uri}") + + +# %% +entities_with_timestamp = pd.DataFrame(columns=['driver_id', 'event_timestamp']) +entities_with_timestamp['driver_id'] = np.random.choice(entities, 10, replace=False) 
+entities_with_timestamp['event_timestamp'] = pd.to_datetime(np.random.randint( + datetime(2020, 10, 18).timestamp(), + datetime(2020, 10, 20).timestamp(), + size=10), unit='s') +entities_with_timestamp + + +# %% +job = feast_spark.Client(client).get_historical_features( + feature_refs=[ + "driver_statistics:avg_daily_trips", + "driver_statistics:conv_rate", + "driver_statistics:acc_rate", + "driver_trips:trips_today" + ], + entity_source=entities_with_timestamp +) + +# %% [markdown] +# ![Spark Job](https://feaststore.blob.core.windows.net/feastjar/SparkJobSubmission.PNG) +# + +# %% +# get_output_file_uri will block until the Spark job is completed. +# output_file_uri = job.get_output_file_uri() + +# print("output path:", output_file_uri) +# %% +# read_parquet(output_file_uri) + +# %% [markdown] +# The retrieved result can now be used for model training. +# %% [markdown] +# ## Populating Online Storage with Batch Ingestion +# %% [markdown] +# In order to populate the online storage, we can use Feast SDK to start a Spark batch job which will extract the features from the batch source, then load the features to an online store. + +# %% +job = feast_spark.Client(client).start_offline_to_online_ingestion( + driver_statistics, + datetime(2020, 10, 10), + datetime(2020, 10, 20) +) + + +# %% +# It will take some time before the Spark Job is completed +job.get_status() + +# %% [markdown] +# Once the job is completed, the SDK can be used to retrieve the result from the online store. + +# %% +entities_sample = np.random.choice(entities, 10, replace=False) +entities_sample = [{"driver_id": e} for e in entities_sample] +entities_sample + + +# %% +features = client.get_online_features( + feature_refs=["driver_statistics:avg_daily_trips"], + entity_rows=entities_sample).to_dict() +features + + +# %% +# pd.DataFrame(features) + +# %% [markdown] +# The features can now be used as an input to the trained model. 
+# %% [markdown] +# ## Bonus: Ingestion from Streaming Source - EventHub +# %% [markdown] +# With a streaming source, we can use Feast SDK to launch a Spark streaming job that continuously update the online store. First, we will update `driver_trips` feature table such that a new streaming source is added. + +# %% + + + +# %% +import json +import pytz +import io +import avro.schema +from avro.io import BinaryEncoder, DatumWriter +from confluent_kafka import Producer + + +# %% +# Change this to any Kafka broker addresses which is accessible by the spark cluster +KAFKA_BROKER = "xiaoyzhufeasttest.servicebus.windows.net:9093" + + +# %% +avro_schema_json = json.dumps({ + "type": "record", + "name": "DriverTrips", + "fields": [ + {"name": "driver_id", "type": "long"}, + {"name": "trips_today", "type": "int"}, + { + "name": "datetime", + "type": {"type": "long", "logicalType": "timestamp-micros"}, + }, + ], +}) + + +# %% +kafka_topic = "driver_trips" +driver_trips.stream_source = KafkaSource( + event_timestamp_column="datetime", + created_timestamp_column="datetime", + bootstrap_servers=KAFKA_BROKER, + topic=kafka_topic, + message_format=AvroFormat(avro_schema_json) +) +client.apply(driver_trips) + +# %% [markdown] +# Start the streaming job and send avro record to EventHub: + +# %% +job = feast_spark.Client(client).start_stream_to_online_ingestion( + driver_trips +) + + +# %% +def send_avro_record_to_kafka(topic, record): + value_schema = avro.schema.parse(avro_schema_json) + writer = DatumWriter(value_schema) + bytes_writer = io.BytesIO() + encoder = BinaryEncoder(bytes_writer) + writer.write(record, encoder) + + conf = { + 'bootstrap.servers': 'xiaoyzhufeasttest.servicebus.windows.net:9093', #replace + 'security.protocol': 'SASL_SSL', + 'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt', + 'sasl.mechanism': 'PLAIN', + 'sasl.username': '$ConnectionString', + 'sasl.password': 
'Endpoint=sb://xiaoyzhufeasttest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=z9obEAyVvD36fZIEvvtNlCRBEDjIrsfNfDAbgDyTbDg=;', #replace + 'client.id': 'python-example-producer' + } + + + producer = Producer({ + **conf + }) + producer.produce(topic=topic, value=bytes_writer.getvalue()) + producer.flush() + + +# %% +# Note: depending on the Kafka configuration you may need to create the Kafka topic first, like below: +#from confluent_kafka.admin import AdminClient, NewTopic +#admin = AdminClient({'bootstrap.servers': KAFKA_BROKER}) +#new_topic = NewTopic('driver_trips', num_partitions=1, replication_factor=3) +#admin.create_topics(new_topic) +for record in trips_df.drop(columns=['created']).to_dict('record'): + # print("record", record) + record["datetime"] = ( + record["datetime"].to_pydatetime().replace(tzinfo=pytz.utc) + ) + + # send_avro_record_to_kafka(topic="driver_trips", record=record) + send_avro_record_to_kafka(topic=kafka_topic, record=record) + +# %% [markdown] +# ### Retrieving joined features from several feature tables + +# %% +entities_sample = np.random.choice(entities, 10, replace=False) +entities_sample = [{"driver_id": e} for e in entities_sample] +entities_sample + + +# %% +features = client.get_online_features( + feature_refs=["driver_statistics:avg_daily_trips", "driver_trips:trips_today"], + entity_rows=entities_sample).to_dict() + + +# %% +pd.DataFrame(features) + + +# %% +# This will stop the streaming job +job.cancel() + + +# %% + + + diff --git a/python/feast_spark/pyspark/abc.py b/python/feast_spark/pyspark/abc.py index 12cea667..6b757b14 100644 --- a/python/feast_spark/pyspark/abc.py +++ b/python/feast_spark/pyspark/abc.py @@ -424,6 +424,10 @@ def __init__( redis_host: str, redis_port: int, redis_ssl: bool, + bigtable_project: Optional[str] = None, + bigtable_instance: Optional[str] = None, + cassandra_host: Optional[str] = None, + cassandra_port: Optional[str] = None, statsd_host: Optional[str] = 
None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = None, @@ -530,13 +534,17 @@ def __init__( redis_host: str, redis_port: int, redis_ssl: bool, + redis_auth: str, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = None, checkpoint_path: Optional[str] = None, stencil_url: Optional[str] = None, - drop_invalid_rows: bool = False, + drop_invalid_rows: Optional[bool] = False, + kafka_sasl_auth: Optional[str] = None, ): + stencil_url: Optional[str] = None, + drop_invalid_rows: bool = False, super().__init__( feature_table, source, diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse.py b/python/feast_spark/pyspark/launchers/synapse/synapse.py index 13b2cd5c..97865171 100644 --- a/python/feast_spark/pyspark/launchers/synapse/synapse.py +++ b/python/feast_spark/pyspark/launchers/synapse/synapse.py @@ -3,6 +3,7 @@ from typing import List, Optional, cast from azure.synapse.spark.models import SparkBatchJob +from azure.identity import DefaultAzureCredential, DeviceCodeCredential from feast_spark.pyspark.abc import ( BatchIngestionJob, @@ -137,6 +138,7 @@ def get_hash(self) -> str: def get_feature_table(self) -> str: return self._feature_table +login_credential_cache = None class SynapseJobLauncher(JobLauncher): """ @@ -151,7 +153,23 @@ def __init__( executor_size: str, executors: int ): - self._api = SynapseJobRunner(synapse_dev_url, pool_name, executor_size = executor_size, executors = executors) + tenant_id='72f988bf-86f1-41af-91ab-2d7cd011db47' + authority_host_uri = 'login.microsoftonline.com' + client_id = '04b07795-8ddb-461a-bbee-02f9e1bf7b46' + + global login_credential_cache + # use a global cache to store the credential, to avoid users from multiple login + + if login_credential_cache is None: + # self.credential = DefaultAzureCredential() + # use DeviceCodeCredential if DefaultAzureCredential is not available + # if self.credential is None: + self.credential = 
DeviceCodeCredential(client_id, authority=authority_host_uri, tenant=tenant_id) + login_credential_cache = self.credential + else: + self.credential = login_credential_cache + + self._api = SynapseJobRunner(synapse_dev_url, pool_name, executor_size = executor_size, executors = executors, credential=self.credential) self._datalake = DataLakeFiler(datalake_dir) def _job_from_job_info(self, job_info: SparkBatchJob) -> SparkJob: @@ -219,12 +237,6 @@ def offline_to_online_ingestion( """ main_file = self._datalake.upload_file(ingestion_job_params.get_main_file_path()) - print(main_file, ingestion_job_params.get_main_file_path()) - print(ingestion_job_params.get_class_name()) - # for arg in ingestion_job_params.get_arguments(): - # print(len(arg), arg) - # args = [x.replace(' ', '') for x in ingestion_job_params.get_arguments()] - # print(args) job_info = _submit_job(self._api, ingestion_job_params.get_project(), main_file, main_class = ingestion_job_params.get_class_name(), diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py b/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py index 716b25b5..8e9d2cee 100644 --- a/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py +++ b/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py @@ -13,7 +13,7 @@ from azure.storage.filedatalake import DataLakeServiceClient -from feast_spark.pyspark.abc import SparkJobStatus, RetrievalJobParameters,BQ_SPARK_PACKAGE +from feast_spark.pyspark.abc import SparkJobStatus __all__ = [ "_cancel_job_by_id", @@ -147,15 +147,23 @@ def create_spark_batch_job(self, job_name, main_definition_file, class_name = No # SDK source code is here: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/synapse/azure-synapse # Exact code is here: https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/synapse/azure-synapse-spark/azure/synapse/spark/operations/_spark_batch_operations.py#L114 - - arguments = [elem.replace("}", " }") for elem in arguments] 
+ # Adding spaces between brackets. This is to workaround this known YARN issue (when running Spark on YARN): + # https://issues.apache.org/jira/browse/SPARK-17814?focusedCommentId=15567964&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15567964 + # print(arguments) + updated_arguments = [] + for elem in arguments: + if type(elem) == str: + updated_arguments.append(elem.replace("}", " }")) + else: + updated_arguments.append(elem) + spark_batch_job_options = SparkBatchJobOptions( tags=tags, name=job_name, file=file, class_name=class_name, - arguments=arguments, + arguments=updated_arguments, jars=jars, files=files, archives=archives, @@ -166,13 +174,6 @@ def create_spark_batch_job(self, job_name, main_definition_file, class_name = No executor_cores=executor_cores, executor_count=self._executors) - # print("spark_batch_job_options", spark_batch_job_options) - # print("arguments", arguments, type(arguments)) - - # print(tags,job_name,file,class_name,arguments,jars,files,archives,configuration,driver_memory,driver_cores,executor_memory,executor_cores,self._executors) - - print("Final input argument:", arguments) - return self.client.spark_batch.create_spark_batch_job(spark_batch_job_options, detailed=True) diff --git a/python/setup.py b/python/setup.py index 354bdf7a..2759fa14 100644 --- a/python/setup.py +++ b/python/setup.py @@ -53,6 +53,7 @@ "mypy-protobuf==2.5", "croniter==1.*", "azure-synapse-spark", + "azure-synapse", "azure-identity", "azure-storage-file-datalake", "azure-storage-blob", diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala index f7925ccc..227d319f 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala @@ -126,15 +126,14 @@ object IngestionJob { } def main(args: Array[String]): Unit = { - println("Debug... 
Received following argument:") - println(args.toList) val args_modified = new Array[String](args.length) for ( i <- 0 to (args_modified.length - 1)) { + // Removing spaces between brackets. This is to workaround this known YARN issue (when running Spark on YARN): + // https://issues.apache.org/jira/browse/SPARK-17814?focusedCommentId=15567964&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15567964 + // Also remove the unncessary back slashes args_modified(i) = args(i).replace(" }", "}"); args_modified(i) = args_modified(i).replace("\\", "\\\""); } - println("Remove additional spaces in args:") - println(args_modified.toList) parser.parse(args_modified, IngestionJobConfig()) match { case Some(config) => println(s"Starting with config $config") From f4c0d5a0ffc5508e0b9e02203716d271391ce3a9 Mon Sep 17 00:00:00 2001 From: snowmanmsft <84950230+snowmanmsft@users.noreply.github.com> Date: Thu, 9 Sep 2021 00:34:35 -0700 Subject: [PATCH 10/19] Delete feature_store_debug.py --- feature_store_debug.py | 558 ----------------------------------------- 1 file changed, 558 deletions(-) delete mode 100644 feature_store_debug.py diff --git a/feature_store_debug.py b/feature_store_debug.py deleted file mode 100644 index 5d11d774..00000000 --- a/feature_store_debug.py +++ /dev/null @@ -1,558 +0,0 @@ -# To add a new cell, type '# %%' -# To add a new markdown cell, type '# %% [markdown]' -# %% [markdown] -# # Ride Hailing Example -# %% [markdown] -# ![chart](https://feaststore.blob.core.windows.net/feastjar/FeastArchitectureNew.png) - -# %% -import io -import json -import os -from datetime import datetime -from urllib.parse import urlparse - -import avro.schema -import feast_spark -import numpy as np -import pandas as pd -import pyarrow.parquet as pq -import pytz -from avro.io import BinaryEncoder, DatumWriter -from azure.identity import ClientSecretCredential, DefaultAzureCredential -from azure.storage.filedatalake import DataLakeServiceClient -from 
confluent_kafka import Producer -from feast import Client, Entity, Feature, FeatureTable, ValueType -from feast.data_format import AvroFormat, ParquetFormat -from feast.data_source import FileSource, KafkaSource -from pyarrow.parquet import ParquetDataset -from google.protobuf.duration_pb2 import Duration -# %% [markdown] -# ## Introduction -# %% [markdown] -# For this demo, we will: -# -# 1. Register two driver features, one for driver statistics, the other for driver trips. Driver statistics are updated on daily basis, whereas driver trips are updated in real time. -# 2. Creates a driver dataset, then use Feast SDK to retrieve the features corresponding to these drivers from an offline store. -# 3. Store the features in an online store (Redis), and retrieve the features via Feast SDK. -# %% [markdown] -# ## Features Registry (Feast Core) -# %% [markdown] -# ### Configuration -# %% [markdown] -# Configurations can be provided in three different methods: - -# %% -# get_historical_features will return immediately once the Spark job has been submitted succesfully. 
-os.environ["FEAST_SPARK_LAUNCHER"] = "synapse" -os.environ["FEAST_SPARK_HOME"] = "/usr/local/spark" -os.environ["FEAST_azure_synapse_dev_url"] = "https://xiaoyzhuspark3synapse.dev.azuresynapse.net" -os.environ["FEAST_azure_synapse_pool_name"] = "xiaoyzhuspark3" - -# the datalake dir is the same with this one os.environ["FEAST_SPARK_STAGING_LOCATION"] = "wasbs://feasttest@feaststore.blob.core.windows.net/artifacts/" -os.environ["FEAST_AZURE_SYNAPSE_DATALAKE_DIR"] = "abfss://feastsparkstagingprivate@xiaoyzhusynapse.dfs.core.windows.net/feast" -os.environ["FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION"] = "abfss://feastsparkstagingprivate@xiaoyzhusynapse.dfs.core.windows.net/feast/out" -os.environ["FEAST_SPARK_STAGING_LOCATION"] = "wasbs://feasttest@feaststore.blob.core.windows.net/artifacts/" -os.environ["FEAST_SPARK_INGESTION_JAR"] = "https://feaststore.blob.core.windows.net/feastjar/feast-ingestion-spark-latest.jar" - -# Redis Config -os.environ["FEAST_REDIS_HOST"] = "feastredistest.redis.cache.windows.net" -os.environ["FEAST_REDIS_PORT"] = "6380" -os.environ["FEAST_REDIS_SSL"] = "true" -os.environ["FEAST_REDIS_AUTH"] = "ruWBZ6WZsjUk5lEnDirM9JGoV1UgtMFbAO5lWoRY1QQ=" - -# EventHub config -os.environ["FEAST_AZURE_EVENTHUB_KAFKA_CONNECTION_STRING"] = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://xiaoyzhufeasttest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=z9obEAyVvD36fZIEvvtNlCRBEDjIrsfNfDAbgDyTbDg=;EntityPath=driver_trips\";" - -os.environ["FEAST_AZURE_BLOB_ACCOUNT_NAME"] = "feaststore" -os.environ["FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY"] = "0V7PybxIprcykx3UEfygMTgRIn7pBH794KymizfArYArlB9OsQoVua32iJc5SkSpJZDPoFzDw4lAC2jGIuvAfg==" -os.environ["FEAST_HISTORICAL_FEATURE_OUTPUT_FORMAT"] = "parquet" - -# Xiaoyong's app, name: ml-auth-xiaoyzhu -os.environ['AZURE_CLIENT_ID'] = 'b92d6810-7e28-4380-89d8-103ad00e9acd' -os.environ['AZURE_TENANT_ID'] = 
'72f988bf-86f1-41af-91ab-2d7cd011db47' -os.environ['AZURE_CLIENT_SECRET'] = '_l5mz-6vj2c97Nb0YeNfs0Axa-Yhd0Q~u_' - - -# %% -# Using environmental variables -import os -os.environ["FEAST_CORE_URL"] = "20.62.162.242:6565" -os.environ["FEAST_SERVING_URL"] = "20.62.162.242:6566" - -# Provide a map during client initialization -# options = { -# "CORE_URL": "core:6565", -# "SERVING_URL": "online_serving:6566", -# } -# client = Client(options) - -# As keyword arguments, without the `FEAST` prefix -# client = Client(core_url="core:6565", serving_url="online_serving:6566") - -# %% [markdown] -# If you are following the quick start guide, all required configurations to follow the remainder of the tutorial should have been setup, in the form of environmental variables, as showned below. The configuration values may differ depending on the environment. For a full list of configurable values and explanation, please refer to the user guide. - -# %% -import os -from pprint import pprint -pprint({key: value for key, value in os.environ.items() if key.startswith("FEAST_")}) - -# %% [markdown] -# ### Basic Imports and Feast Client initialization - -# %% -import os - -from feast import Client, Feature, Entity, ValueType, FeatureTable -from feast.data_source import FileSource, KafkaSource -from feast.data_format import ParquetFormat, AvroFormat - - -# %% -client = Client(core_url='13.66.211.41:6565') - -# %% [markdown] -# ### Declare Features and Entities -# %% [markdown] -# Entity defines the primary key(s) associated with one or more feature tables. The entity must be registered before declaring the associated feature tables. 
- -# %% -driver_id = Entity(name="driver_id", description="Driver identifier", value_type=ValueType.INT64) - - -# %% -# Daily updated features -acc_rate = Feature("acc_rate", ValueType.FLOAT) -conv_rate = Feature("conv_rate", ValueType.FLOAT) -avg_daily_trips = Feature("avg_daily_trips", ValueType.INT32) - -# Real-time updated features -trips_today = Feature("trips_today", ValueType.INT32) - -# %% [markdown] -# ```python -# FeatureTable( -# name = "driver_statistics", -# entities = ["driver_id"], -# features = [ -# acc_rate, -# conv_rate, -# avg_daily_trips -# ] -# -# ) -# ``` -# -# -# ```python -# FeatureTable( -# name = "driver_trips", -# entities = ["driver_id"], -# features = [ -# trips_today -# ] -# -# ) -# -# ``` -# %% [markdown] -# ![Features Join](https://raw.githubusercontent.com/feast-dev/feast-spark-examples/main/minimal/images/features-join.png) -# %% [markdown] -# ```python -# FeatureTable( -# ..., -# batch_source=FileSource( # Required -# file_format=ParquetFormat(), -# file_url="abfss://feast-demo-data-lake", -# ... -# ), -# stream_source=KafkaSource( # Optional -# bootstrap_servers="...", -# topic="driver_trips", -# ... -# ) -# ``` -# %% [markdown] -# Feature tables group the features together and describe how they can be retrieved. The following examples assume that the feature tables are stored on the local file system, and is accessible from the Spark cluster. If you have setup a GCP service account, you may use GCS instead as the file source. -# %% [markdown] -# `batch_source` defines where the historical features are stored. It is also possible to have an optional `stream_source`, which the feature values are delivered continuously. -# -# For now we will define only `batch_source` for both `driver_statistics` and `driver_trips`, and demonstrate the usage of `stream_source` in later part of the tutorial. - -# %% -# This is the location we're using for the offline feature store. 
- -import os -demo_data_location = "wasbs://feasttest@feaststore.blob.core.windows.net/" - - -# %% -driver_statistics_source_uri = os.path.join(demo_data_location, "driver_statistics") - -driver_statistics = FeatureTable( - name = "driver_statistics", - entities = ["driver_id"], - features = [ - acc_rate, - conv_rate, - avg_daily_trips - ], - max_age=Duration(seconds=86400 * 1), - batch_source=FileSource( - event_timestamp_column="datetime", - created_timestamp_column="created", - file_format=ParquetFormat(), - file_url=driver_statistics_source_uri, - date_partition_column="date" - ) -) - - -# %% -driver_trips_source_uri = os.path.join(demo_data_location, "driver_trips") - - -driver_trips = FeatureTable( - name = "driver_trips", - entities = ["driver_id"], - features = [ - trips_today - ], - max_age=Duration(seconds=86400 * 1), - batch_source=FileSource( - event_timestamp_column="datetime", - created_timestamp_column="created", - file_format=ParquetFormat(), - file_url=driver_trips_source_uri, - date_partition_column="date" - ) -) - -# %% [markdown] -# ### Registering entities and feature tables in Feast Core - -# %% -client.apply(driver_id) -client.apply(driver_statistics) -client.apply(driver_trips) - - -# %% -print(client.get_feature_table("driver_statistics").to_yaml()) -print(client.get_feature_table("driver_trips").to_yaml()) - -# %% [markdown] -# ### Populating batch source -# %% [markdown] -# Feast is agnostic to how the batch source is populated, as long as it complies to the Feature Table specification. Therefore, any existing ETL tools can be used for the purpose of data ingestion. Alternatively, you can also use Feast SDK to ingest a Panda Dataframe to the batch source. 
- -# %% -import pandas as pd -import numpy as np -from datetime import datetime - - -# %% -def generate_entities(): - return np.random.choice(999999, size=100, replace=False) - - -# %% -def generate_trips(entities): - df = pd.DataFrame(columns=["driver_id", "trips_today", "datetime", "created"]) - df['driver_id'] = entities - df['trips_today'] = np.random.randint(0, 1000, size=100).astype(np.int32) - df['datetime'] = pd.to_datetime( - np.random.randint( - datetime(2020, 10, 10).timestamp(), - datetime(2020, 10, 20).timestamp(), - size=100), - unit="s" - ) - df['created'] = pd.to_datetime(datetime.now()) - return df - - - -# %% -def generate_stats(entities): - df = pd.DataFrame(columns=["driver_id", "conv_rate", "acc_rate", "avg_daily_trips", "datetime", "created"]) - df['driver_id'] = entities - df['conv_rate'] = np.random.random(size=100).astype(np.float32) - df['acc_rate'] = np.random.random(size=100).astype(np.float32) - df['avg_daily_trips'] = np.random.randint(0, 1000, size=100).astype(np.int32) - df['datetime'] = pd.to_datetime( - np.random.randint( - datetime(2020, 10, 10).timestamp(), - datetime(2020, 10, 20).timestamp(), - size=100), - unit="s" - ) - df['created'] = pd.to_datetime(datetime.now()) - return df - - -# %% -entities = generate_entities() -stats_df = generate_stats(entities) -trips_df = generate_trips(entities) - - -# %% -client.ingest(driver_statistics, stats_df) -client.ingest(driver_trips, trips_df) - -# %% [markdown] -# ## Historical Retrieval For Training -# %% [markdown] -# ### Point-in-time correction -# %% [markdown] -# ![Point In Time](https://raw.githubusercontent.com/feast-dev/feast-spark-examples/main/minimal/images/pit-1.png) -# %% [markdown] -# Feast joins the features to the entities based on the following conditions: -# -# 1. Entity primary key(s) value matches. -# 2. 
Feature event timestamp is the closest match possible to the entity event timestamp, -# but must not be more recent than the entity event timestamp, and the difference must -# not be greater than the maximum age specified in the feature table, unless the maximum age is not specified. -# 3. If more than one feature table rows satisfy condition 1 and 2, feature row with the -# most recent created timestamp will be chosen. -# 4. If none of the above conditions are satisfied, the feature rows will have null values. - -# %% -from pyarrow.parquet import ParquetDataset -from urllib.parse import urlparse - - -# %% -def read_parquet(uri): - parsed_uri = urlparse(uri) - if parsed_uri.scheme == "file": - return pd.read_parquet(parsed_uri.path) - elif parsed_uri.scheme == 'wasbs': - import adlfs - fs = adlfs.AzureBlobFileSystem( - account_name=os.getenv('FEAST_AZURE_BLOB_ACCOUNT_NAME'), account_key=os.getenv('FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY') - ) - uripath = parsed_uri.username + parsed_uri.path - files = fs.glob(uripath + '/part-*') - ds = ParquetDataset(files, filesystem=fs) - return ds.read().to_pandas() - elif parsed_uri.scheme == 'abfss': - credential = ClientSecretCredential(os.getenv('AZURE_TENANT_ID'), os.getenv('AZURE_CLIENT_ID'), os.getenv('AZURE_CLIENT_SECRET')) - # credential = DefaultAzureCredential() - datalake = parsed_uri.netloc.split('@') - service_client = DataLakeServiceClient(account_url="https://" + datalake[1], credential=credential) - file_system_client = service_client.get_file_system_client(datalake[0]) - file_client = file_system_client.get_file_client(parsed_uri.path) - data = file_client.download_file(0) - with io.BytesIO() as b: - data.readinto(b) - table = pq.read_table(b) - print(table) - return table - else: - raise ValueError(f"Unsupported URL scheme {uri}") - - -# %% -entities_with_timestamp = pd.DataFrame(columns=['driver_id', 'event_timestamp']) -entities_with_timestamp['driver_id'] = np.random.choice(entities, 10, replace=False) 
-entities_with_timestamp['event_timestamp'] = pd.to_datetime(np.random.randint( - datetime(2020, 10, 18).timestamp(), - datetime(2020, 10, 20).timestamp(), - size=10), unit='s') -entities_with_timestamp - - -# %% -job = feast_spark.Client(client).get_historical_features( - feature_refs=[ - "driver_statistics:avg_daily_trips", - "driver_statistics:conv_rate", - "driver_statistics:acc_rate", - "driver_trips:trips_today" - ], - entity_source=entities_with_timestamp -) - -# %% [markdown] -# ![Spark Job](https://feaststore.blob.core.windows.net/feastjar/SparkJobSubmission.PNG) -# - -# %% -# get_output_file_uri will block until the Spark job is completed. -# output_file_uri = job.get_output_file_uri() - -# print("output path:", output_file_uri) -# %% -# read_parquet(output_file_uri) - -# %% [markdown] -# The retrieved result can now be used for model training. -# %% [markdown] -# ## Populating Online Storage with Batch Ingestion -# %% [markdown] -# In order to populate the online storage, we can use Feast SDK to start a Spark batch job which will extract the features from the batch source, then load the features to an online store. - -# %% -job = feast_spark.Client(client).start_offline_to_online_ingestion( - driver_statistics, - datetime(2020, 10, 10), - datetime(2020, 10, 20) -) - - -# %% -# It will take some time before the Spark Job is completed -job.get_status() - -# %% [markdown] -# Once the job is completed, the SDK can be used to retrieve the result from the online store. - -# %% -entities_sample = np.random.choice(entities, 10, replace=False) -entities_sample = [{"driver_id": e} for e in entities_sample] -entities_sample - - -# %% -features = client.get_online_features( - feature_refs=["driver_statistics:avg_daily_trips"], - entity_rows=entities_sample).to_dict() -features - - -# %% -# pd.DataFrame(features) - -# %% [markdown] -# The features can now be used as an input to the trained model. 
-# %% [markdown] -# ## Bonus: Ingestion from Streaming Source - EventHub -# %% [markdown] -# With a streaming source, we can use Feast SDK to launch a Spark streaming job that continuously update the online store. First, we will update `driver_trips` feature table such that a new streaming source is added. - -# %% - - - -# %% -import json -import pytz -import io -import avro.schema -from avro.io import BinaryEncoder, DatumWriter -from confluent_kafka import Producer - - -# %% -# Change this to any Kafka broker addresses which is accessible by the spark cluster -KAFKA_BROKER = "xiaoyzhufeasttest.servicebus.windows.net:9093" - - -# %% -avro_schema_json = json.dumps({ - "type": "record", - "name": "DriverTrips", - "fields": [ - {"name": "driver_id", "type": "long"}, - {"name": "trips_today", "type": "int"}, - { - "name": "datetime", - "type": {"type": "long", "logicalType": "timestamp-micros"}, - }, - ], -}) - - -# %% -kafka_topic = "driver_trips" -driver_trips.stream_source = KafkaSource( - event_timestamp_column="datetime", - created_timestamp_column="datetime", - bootstrap_servers=KAFKA_BROKER, - topic=kafka_topic, - message_format=AvroFormat(avro_schema_json) -) -client.apply(driver_trips) - -# %% [markdown] -# Start the streaming job and send avro record to EventHub: - -# %% -job = feast_spark.Client(client).start_stream_to_online_ingestion( - driver_trips -) - - -# %% -def send_avro_record_to_kafka(topic, record): - value_schema = avro.schema.parse(avro_schema_json) - writer = DatumWriter(value_schema) - bytes_writer = io.BytesIO() - encoder = BinaryEncoder(bytes_writer) - writer.write(record, encoder) - - conf = { - 'bootstrap.servers': 'xiaoyzhufeasttest.servicebus.windows.net:9093', #replace - 'security.protocol': 'SASL_SSL', - 'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt', - 'sasl.mechanism': 'PLAIN', - 'sasl.username': '$ConnectionString', - 'sasl.password': 
'Endpoint=sb://xiaoyzhufeasttest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<REDACTED-ROTATE-THIS-KEY>;', #replace - 'client.id': 'python-example-producer' - } - - - producer = Producer({ - **conf - }) - producer.produce(topic=topic, value=bytes_writer.getvalue()) - producer.flush() - - -# %% -# Note: depending on the Kafka configuration you may need to create the Kafka topic first, like below: -#from confluent_kafka.admin import AdminClient, NewTopic -#admin = AdminClient({'bootstrap.servers': KAFKA_BROKER}) -#new_topic = NewTopic('driver_trips', num_partitions=1, replication_factor=3) -#admin.create_topics(new_topic) -for record in trips_df.drop(columns=['created']).to_dict('record'): - # print("record", record) - record["datetime"] = ( - record["datetime"].to_pydatetime().replace(tzinfo=pytz.utc) - ) - - # send_avro_record_to_kafka(topic="driver_trips", record=record) - send_avro_record_to_kafka(topic=kafka_topic, record=record) - -# %% [markdown] -# ### Retrieving joined features from several feature tables - -# %% -entities_sample = np.random.choice(entities, 10, replace=False) -entities_sample = [{"driver_id": e} for e in entities_sample] -entities_sample - - -# %% -features = client.get_online_features( - feature_refs=["driver_statistics:avg_daily_trips", "driver_trips:trips_today"], - entity_rows=entities_sample).to_dict() - - -# %% -pd.DataFrame(features) - - -# %% -# This will stop the streaming job -job.cancel() - - -# %% - - - From 6bc9260443fc686462628cf3f221c62a8291e4f3 Mon Sep 17 00:00:00 2001 From: snowmanmsft <84950230+snowmanmsft@users.noreply.github.com> Date: Mon, 13 Sep 2021 20:05:59 -0700 Subject: [PATCH 11/19] Update StreamingPipeline.scala --- .../src/main/scala/feast/ingestion/StreamingPipeline.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala 
b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala index 9a31eb04..fd5b7474 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala @@ -63,7 +63,7 @@ object StreamingPipeline extends BasePipeline with Serializable { val metrics = new IngestionPipelineMetrics val validationUDF = createValidationUDF(sparkSession, config) - val EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://xiaoyzhufeasttest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<REDACTED-ROTATE-THIS-KEY>;EntityPath=driver_trips\";" + val EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=yyy=;EntityPath=driver_trips\";" val input = config.source match { case source: KafkaSource => From 9fde23576e2c7b95c11ccd34d56c25ed7eee7c5c Mon Sep 17 00:00:00 2001 From: snowmanmsft <84950230+snowmanmsft@users.noreply.github.com> Date: Thu, 30 Sep 2021 23:18:55 -0700 Subject: [PATCH 12/19] Update synapse.py still use service principal --- python/feast_spark/pyspark/launchers/synapse/synapse.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse.py b/python/feast_spark/pyspark/launchers/synapse/synapse.py index 97865171..5ce7c867 100644 --- a/python/feast_spark/pyspark/launchers/synapse/synapse.py +++ b/python/feast_spark/pyspark/launchers/synapse/synapse.py @@ -159,11 +159,13 @@ def __init__( global login_credential_cache # use a global cache to store the credential, to avoid users from multiple login - + + # if self.credential is None: + # prioritize using service principal + self.credential = 
DefaultAzureCredential() + login_credential_cache = self.credential if login_credential_cache is None: - # self.credential = DefaultAzureCredential() # use DeviceCodeCredential if DefaultAzureCredential is not available - # if self.credential is None: self.credential = DeviceCodeCredential(client_id, authority=authority_host_uri, tenant=tenant_id) login_credential_cache = self.credential else: From bb3d6be54f6e42f45925696db662fb29ca297e13 Mon Sep 17 00:00:00 2001 From: snowmanmsft <84950230+snowmanmsft@users.noreply.github.com> Date: Fri, 1 Oct 2021 01:33:41 -0700 Subject: [PATCH 13/19] Update synapse.py Change the authentication chain (first using Environmental variables then using device login) --- .../feast_spark/pyspark/launchers/synapse/synapse.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse.py b/python/feast_spark/pyspark/launchers/synapse/synapse.py index 5ce7c867..471d1352 100644 --- a/python/feast_spark/pyspark/launchers/synapse/synapse.py +++ b/python/feast_spark/pyspark/launchers/synapse/synapse.py @@ -160,19 +160,15 @@ def __init__( global login_credential_cache # use a global cache to store the credential, to avoid users from multiple login - # if self.credential is None: - # prioritize using service principal - self.credential = DefaultAzureCredential() - login_credential_cache = self.credential if login_credential_cache is None: - # use DeviceCodeCredential if DefaultAzureCredential is not available - self.credential = DeviceCodeCredential(client_id, authority=authority_host_uri, tenant=tenant_id) + # use DeviceCodeCredential if EnvironmentCredential is not available + self.credential = ChainedTokenCredential(EnvironmentCredential(), DeviceCodeCredential(client_id, authority=authority_host_uri, tenant=tenant_id)) login_credential_cache = self.credential else: self.credential = login_credential_cache self._api = SynapseJobRunner(synapse_dev_url, pool_name, 
executor_size = executor_size, executors = executors, credential=self.credential) - self._datalake = DataLakeFiler(datalake_dir) + self._datalake = DataLakeFiler(datalake_dir,credential=self.credential) def _job_from_job_info(self, job_info: SparkBatchJob) -> SparkJob: job_type = job_info.tags[LABEL_JOBTYPE] From 08da84f6286a04d678ce436ee17d0978057b93de Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Mon, 11 Oct 2021 17:57:24 -0700 Subject: [PATCH 14/19] Fix Redis auth issue --- python/feast_spark/pyspark/abc.py | 7 ++++++- python/feast_spark/pyspark/launcher.py | 1 + python/feast_spark/pyspark/launchers/synapse/synapse.py | 4 +++- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/python/feast_spark/pyspark/abc.py b/python/feast_spark/pyspark/abc.py index 6b757b14..9b205c78 100644 --- a/python/feast_spark/pyspark/abc.py +++ b/python/feast_spark/pyspark/abc.py @@ -343,6 +343,7 @@ def __init__( redis_host: str, redis_port: int, redis_ssl: bool, + redis_auth: str, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = None, @@ -355,6 +356,7 @@ def __init__( self._redis_host = redis_host self._redis_port = redis_port self._redis_ssl = redis_ssl + self._redis_auth = redis_auth self._statsd_host = statsd_host self._statsd_port = statsd_port self._deadletter_path = deadletter_path @@ -362,7 +364,7 @@ def __init__( self._drop_invalid_rows = drop_invalid_rows def _get_redis_config(self): - return dict(host=self._redis_host, port=self._redis_port, ssl=self._redis_ssl) + return dict(host=self._redis_host, port=self._redis_port, ssl=self._redis_ssl, auth=self._redis_auth) def _get_statsd_config(self): return ( @@ -424,6 +426,7 @@ def __init__( redis_host: str, redis_port: int, redis_ssl: bool, + redis_auth: str, bigtable_project: Optional[str] = None, bigtable_instance: Optional[str] = None, cassandra_host: Optional[str] = None, @@ -440,6 +443,7 @@ def __init__( redis_host, redis_port, redis_ssl, + redis_auth, 
statsd_host, statsd_port, deadletter_path, @@ -552,6 +556,7 @@ def __init__( redis_host, redis_port, redis_ssl, + redis_auth, statsd_host, statsd_port, deadletter_path, diff --git a/python/feast_spark/pyspark/launcher.py b/python/feast_spark/pyspark/launcher.py index 6ae99bb2..f8f8dc2d 100644 --- a/python/feast_spark/pyspark/launcher.py +++ b/python/feast_spark/pyspark/launcher.py @@ -360,6 +360,7 @@ def start_offline_to_online_ingestion( redis_port=bool(client.config.get(opt.REDIS_HOST)) and client.config.getint(opt.REDIS_PORT), redis_ssl=client.config.getboolean(opt.REDIS_SSL), + redis_auth=client.config.get(opt.REDIS_AUTH), bigtable_project=client.config.get(opt.BIGTABLE_PROJECT), bigtable_instance=client.config.get(opt.BIGTABLE_INSTANCE), cassandra_host=client.config.get(opt.CASSANDRA_HOST), diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse.py b/python/feast_spark/pyspark/launchers/synapse/synapse.py index 471d1352..78e15a62 100644 --- a/python/feast_spark/pyspark/launchers/synapse/synapse.py +++ b/python/feast_spark/pyspark/launchers/synapse/synapse.py @@ -3,7 +3,7 @@ from typing import List, Optional, cast from azure.synapse.spark.models import SparkBatchJob -from azure.identity import DefaultAzureCredential, DeviceCodeCredential +from azure.identity import DefaultAzureCredential, DeviceCodeCredential, ChainedTokenCredential, ManagedIdentityCredential,EnvironmentCredential from feast_spark.pyspark.abc import ( BatchIngestionJob, @@ -239,6 +239,7 @@ def offline_to_online_ingestion( job_info = _submit_job(self._api, ingestion_job_params.get_project(), main_file, main_class = ingestion_job_params.get_class_name(), arguments = ingestion_job_params.get_arguments(), + reference_files=[main_file], tags = _prepare_job_tags(ingestion_job_params, OFFLINE_TO_ONLINE_JOB_TYPE),configuration=None) return cast(BatchIngestionJob, self._job_from_job_info(job_info)) @@ -269,6 +270,7 @@ def start_stream_to_online_ingestion( main_class = 
ingestion_job_params.get_class_name(), arguments = ingestion_job_params.get_arguments(), reference_files = extra_jar_paths, + configuration=None, tags = tags) return cast(StreamIngestionJob, self._job_from_job_info(job_info)) From 0ddbceff1c510d0747040724c7062e8b1424b543 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 12 Oct 2021 12:40:50 -0700 Subject: [PATCH 15/19] Update Ingestion jobs and add supporting files --- Makefile | 10 ++++- pom.xml | 4 +- python/feast_spark/copy_to_azure_blob.py | 40 +++++++++++++++++++ spark/ingestion/pom.xml | 6 +++ .../scala/feast/ingestion/IngestionJob.scala | 1 + .../feast/ingestion/IngestionJobConfig.scala | 4 +- .../feast/ingestion/StreamingPipeline.scala | 3 +- 7 files changed, 62 insertions(+), 6 deletions(-) create mode 100644 python/feast_spark/copy_to_azure_blob.py diff --git a/Makefile b/Makefile index a3d83b56..707d291e 100644 --- a/Makefile +++ b/Makefile @@ -53,7 +53,7 @@ build-local-test-docker: docker build -t feast:local -f infra/docker/tests/Dockerfile . build-ingestion-jar-no-tests: - cd spark/ingestion && ${MVN} --no-transfer-progress -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true -DskipITs=true -Drevision=${REVISION} clean package + cd spark/ingestion && ${MVN} --no-transfer-progress -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true -D"spotless.check.skip"=true -DskipITs=true -Drevision=${REVISION} clean package build-jobservice-docker: docker build -t $(REGISTRY)/feast-jobservice:$(VERSION) -f infra/docker/jobservice/Dockerfile . @@ -68,3 +68,11 @@ push-spark-docker: docker push $(REGISTRY)/feast-spark:$(VERSION) install-ci-dependencies: install-python-ci-dependencies + +build-ingestion-jar-push: + docker build -t $(REGISTRY)/feast-spark:$(VERSION) --build-arg VERSION=$(VERSION) -f infra/docker/spark/Dockerfile . 
+ rm -f feast-ingestion-spark-latest.jar + docker create -ti --name dummy $(REGISTRY)/feast-spark:latest bash + docker cp dummy:/opt/spark/jars/feast-ingestion-spark-latest.jar feast-ingestion-spark-latest.jar + docker rm -f dummy + python copy_to_azure_blob.py \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4eeffaac..0b7f608b 100644 --- a/pom.xml +++ b/pom.xml @@ -18,8 +18,8 @@ 1.8 1.8 2.12 - ${scala.version}.12 - 3.0.2 + ${scala.version}.10 + 3.1.2 4.4.0 3.3.0 3.12.2 diff --git a/python/feast_spark/copy_to_azure_blob.py b/python/feast_spark/copy_to_azure_blob.py new file mode 100644 index 00000000..dc1f70c6 --- /dev/null +++ b/python/feast_spark/copy_to_azure_blob.py @@ -0,0 +1,40 @@ +# coding: utf-8 + +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +""" +FILE: blob_samples_copy_blob.py +DESCRIPTION: + This sample demos how to copy a blob from a URL. +USAGE: python blob_samples_copy_blob.py + Set the environment variables with your own values before running the sample. 
+ 1) AZURE_STORAGE_CONNECTION_STRING - the connection string to your storage account +""" + +from __future__ import print_function +import os +import sys +import time +from azure.storage.blob import BlobServiceClient + +def main(): + try: + CONNECTION_STRING = os.environ['AZURE_STORAGE_CONNECTION_STRING'] + + except KeyError: + print("AZURE_STORAGE_CONNECTION_STRING must be set.") + sys.exit(1) + + blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING) + copied_blob = blob_service_client.get_blob_client("feastjar", 'feast-ingestion-spark-latest.jar') + SOURCE_FILE = "../../feast-ingestion-spark-latest.jar" + + with open(SOURCE_FILE, "rb") as data: + copied_blob.upload_blob(data, blob_type="BlockBlob",overwrite=True) + +if __name__ == "__main__": + main() diff --git a/spark/ingestion/pom.xml b/spark/ingestion/pom.xml index d80d5645..b8bb1612 100644 --- a/spark/ingestion/pom.xml +++ b/spark/ingestion/pom.xml @@ -51,6 +51,12 @@ ${protobuf.version} + + com.microsoft.azure + azure-eventhubs-spark_2.12 + 2.3.18 + + org.glassfish diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala index 227d319f..3ce736a2 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala @@ -134,6 +134,7 @@ object IngestionJob { args_modified(i) = args(i).replace(" }", "}"); args_modified(i) = args_modified(i).replace("\\", "\\\""); } + println("arguments received:",args_modified.toList) parser.parse(args_modified, IngestionJobConfig()) match { case Some(config) => println(s"Starting with config $config") diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala index 90f43b04..633de5e8 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala +++ 
b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala @@ -26,7 +26,7 @@ object Modes extends Enumeration { abstract class StoreConfig -case class RedisConfig(host: String, port: Int, ssl: Boolean) extends StoreConfig +case class RedisConfig(host: String, port: Int, auth: String, ssl: Boolean) extends StoreConfig case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig case class CassandraConfig( connection: CassandraConnection, @@ -137,6 +137,6 @@ case class IngestionJobConfig( streamingTriggeringSecs: Int = 0, validationConfig: Option[ValidationConfig] = None, doNotIngestInvalidRows: Boolean = false, - checkpointPath: Option[String] = None + checkpointPath: Option[String] = None, kafkaSASL: Option[String] = None ) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala index 05158cd7..33f5746e 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala @@ -83,6 +83,7 @@ object StreamingPipeline extends BasePipeline with Serializable { if (config.kafkaSASL.nonEmpty) { // if we have authentication enabled + println("config.kafkaSASL value:", config.kafkaSASL.get) sparkSession.readStream .format("kafka") .option("subscribe", source.topic) @@ -97,6 +98,7 @@ object StreamingPipeline extends BasePipeline with Serializable { } else { + println("config.kafkaSASL is empty.") sparkSession.readStream .format("kafka") .option("kafka.bootstrap.servers", source.bootstrapServers) @@ -106,7 +108,6 @@ object StreamingPipeline extends BasePipeline with Serializable { case source: MemoryStreamingSource => source.read } - val featureStruct = config.source.asInstanceOf[StreamingSource].format match { case ProtoFormat(classPath) => val parser = protoParser(sparkSession, classPath) From 48a1c449aade507a644840d97e58fe15bd8d70e5 Mon Sep 17 
00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 12 Oct 2021 13:25:15 -0700 Subject: [PATCH 16/19] Fix build issues --- Makefile | 2 +- python/feast_spark/copy_to_azure_blob.py | 3 ++- .../main/scala/feast/ingestion/IngestionJob.scala | 8 +++----- .../scala/feast/ingestion/IngestionJobConfig.scala | 12 +----------- 4 files changed, 7 insertions(+), 18 deletions(-) diff --git a/Makefile b/Makefile index 707d291e..55beb3c3 100644 --- a/Makefile +++ b/Makefile @@ -75,4 +75,4 @@ build-ingestion-jar-push: docker create -ti --name dummy $(REGISTRY)/feast-spark:latest bash docker cp dummy:/opt/spark/jars/feast-ingestion-spark-latest.jar feast-ingestion-spark-latest.jar docker rm -f dummy - python copy_to_azure_blob.py \ No newline at end of file + python python/feast_spark/copy_to_azure_blob.py \ No newline at end of file diff --git a/python/feast_spark/copy_to_azure_blob.py b/python/feast_spark/copy_to_azure_blob.py index dc1f70c6..7f8f719f 100644 --- a/python/feast_spark/copy_to_azure_blob.py +++ b/python/feast_spark/copy_to_azure_blob.py @@ -31,7 +31,8 @@ def main(): blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING) copied_blob = blob_service_client.get_blob_client("feastjar", 'feast-ingestion-spark-latest.jar') - SOURCE_FILE = "../../feast-ingestion-spark-latest.jar" + # hard code to the current path + SOURCE_FILE = "./feast-ingestion-spark-latest.jar" with open(SOURCE_FILE, "rb") as data: copied_blob.upload_blob(data, blob_type="BlockBlob",overwrite=True) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala index 3ce736a2..8b80e46c 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala @@ -53,10 +53,9 @@ object IngestionJob { case (_, x) => x } .extract[Sources] match { - case Sources(file: Some[FileSource], _, _, _) => c.copy(source = file.get) - case 
Sources(_, bq: Some[BQSource], _, _) => c.copy(source = bq.get) - case Sources(_, _, kafka: Some[KafkaSource], _) => c.copy(source = kafka.get) - case Sources(_, _, _, eventhub: Some[EventHubSource]) => c.copy(source = eventhub.get) + case Sources(file: Some[FileSource], _, _) => c.copy(source = file.get) + case Sources(_, bq: Some[BQSource], _) => c.copy(source = bq.get) + case Sources(_, _, kafka: Some[KafkaSource]) => c.copy(source = kafka.get) } }) .required() @@ -119,7 +118,6 @@ object IngestionJob { opt[Int](name = "triggering-interval") .action((x, c) => c.copy(streamingTriggeringSecs = x)) - } opt[String](name = "kafka_sasl_auth") .action((x, c) => c.copy(kafkaSASL = Some(x))) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala index 633de5e8..8e9a8fe3 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala @@ -90,21 +90,11 @@ case class KafkaSource( override val datePartitionColumn: Option[String] = None ) extends StreamingSource -case class EventHubSource( - bootstrapServers: String, - topic: String, - override val format: DataFormat, - override val fieldMapping: Map[String, String], - override val eventTimestampColumn: String, - override val createdTimestampColumn: Option[String] = None, - override val datePartitionColumn: Option[String] = None - ) extends StreamingSource case class Sources( file: Option[FileSource] = None, bq: Option[BQSource] = None, - kafka: Option[KafkaSource] = None, - eventhub: Option[EventHubSource] = None + kafka: Option[KafkaSource] = None ) case class Field(name: String, `type`: feast.proto.types.ValueProto.ValueType.Enum) From 762386e33269ac7c7ab19c5d0fbd35801e0ab9b9 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 12 Oct 2021 13:54:58 -0700 Subject: [PATCH 17/19] Add support for Kafka ingestion --- 
python/feast_spark/pyspark/abc.py | 3 +++ python/feast_spark/pyspark/launchers/synapse/synapse.py | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/python/feast_spark/pyspark/abc.py b/python/feast_spark/pyspark/abc.py index 9b205c78..154f8d08 100644 --- a/python/feast_spark/pyspark/abc.py +++ b/python/feast_spark/pyspark/abc.py @@ -565,6 +565,7 @@ def __init__( ) self._extra_jars = extra_jars self._checkpoint_path = checkpoint_path + self._kafka_sasl_auth = kafka_sasl_auth def get_name(self) -> str: return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}" @@ -580,6 +581,8 @@ def get_arguments(self) -> List[str]: args.extend(["--mode", "online"]) if self._checkpoint_path: args.extend(["--checkpoint-path", self._checkpoint_path]) + if self._kafka_sasl_auth: + args.extend(["--kafka_sasl_auth", self._kafka_sasl_auth]) return args def get_job_hash(self) -> str: diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse.py b/python/feast_spark/pyspark/launchers/synapse/synapse.py index 78e15a62..f66f6955 100644 --- a/python/feast_spark/pyspark/launchers/synapse/synapse.py +++ b/python/feast_spark/pyspark/launchers/synapse/synapse.py @@ -236,7 +236,7 @@ def offline_to_online_ingestion( main_file = self._datalake.upload_file(ingestion_job_params.get_main_file_path()) - job_info = _submit_job(self._api, ingestion_job_params.get_project(), main_file, + job_info = _submit_job(self._api, ingestion_job_params.get_project()+"_offline_to_online_ingestion", main_file, main_class = ingestion_job_params.get_class_name(), arguments = ingestion_job_params.get_arguments(), reference_files=[main_file], @@ -266,7 +266,7 @@ def start_stream_to_online_ingestion( tags = _prepare_job_tags(ingestion_job_params, STREAM_TO_ONLINE_JOB_TYPE) tags[METADATA_JOBHASH] = ingestion_job_params.get_job_hash() - job_info = _submit_job(self._api, ingestion_job_params.get_project(), main_file, + job_info = _submit_job(self._api, 
ingestion_job_params.get_project()+"_stream_to_online_ingestion", main_file, main_class = ingestion_job_params.get_class_name(), arguments = ingestion_job_params.get_arguments(), reference_files = extra_jar_paths, From 41fc4067b273c01169466f5d21b58f6ca01e0236 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 12 Oct 2021 14:05:24 -0700 Subject: [PATCH 18/19] Add build and push instructions --- README.md | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 0e00aad6..44c8ff0c 100644 --- a/README.md +++ b/README.md @@ -57,4 +57,14 @@ client.apply(entity, ft) # Start spark streaming ingestion job that reads from kafka and writes to the online store feast_spark.Client(client).start_stream_to_online_ingestion(ft) -``` \ No newline at end of file +``` + +Build and push to BLOB storage + +In order to build the Spark Ingestion jar and copy it to BLOB storage, you have to set these 3 environment variables: + +```bash +export VERSION=latest +export REGISTRY=your_registry_name +export AZURE_STORAGE_CONNECTION_STRING="your_azure_storage_connection_string" +``` From 0f7d433c2e779c44806182f23dc63e591bffba8f Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 26 Oct 2021 23:06:24 -0700 Subject: [PATCH 19/19] Adding License --- python/feast_spark/pyspark/launchers/synapse/__init__.py | 2 ++ python/feast_spark/pyspark/launchers/synapse/synapse.py | 2 ++ python/feast_spark/pyspark/launchers/synapse/synapse_utils.py | 3 +++ 3 files changed, 7 insertions(+) diff --git a/python/feast_spark/pyspark/launchers/synapse/__init__.py b/python/feast_spark/pyspark/launchers/synapse/__init__.py index 363312a1..59dadfa4 100644 --- a/python/feast_spark/pyspark/launchers/synapse/__init__.py +++ b/python/feast_spark/pyspark/launchers/synapse/__init__.py @@ -1,3 +1,5 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. 
from .synapse import ( SynapseBatchIngestionJob, SynapseJobLauncher, diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse.py b/python/feast_spark/pyspark/launchers/synapse/synapse.py index f66f6955..3a42a95a 100644 --- a/python/feast_spark/pyspark/launchers/synapse/synapse.py +++ b/python/feast_spark/pyspark/launchers/synapse/synapse.py @@ -1,3 +1,5 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. import time from datetime import datetime from typing import List, Optional, cast diff --git a/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py b/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py index 8e9d2cee..91fba2eb 100644 --- a/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py +++ b/python/feast_spark/pyspark/launchers/synapse/synapse_utils.py @@ -1,3 +1,6 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + import os import re import hashlib