-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: Add mysql as online store #3190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 5 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
3ad3773
update remove debug
hao-affirm 7d98a4a
add interface
hao-affirm cc13d24
fix lint
hao-affirm 0527d5d
fix lint
hao-affirm 312af8b
update sql
hao-affirm 7bd0662
update ci reqs
hao-affirm 9a547bb
remove pip index
hao-affirm 5d84e06
format
hao-affirm edceadf
fix unit test issue
hao-affirm 28090e5
fix lint issue
hao-affirm 7f2afd6
add entity_key_serialization_version
hao-affirm 84b5246
update doc
hao-affirm d17ffe6
format
hao-affirm a1aed88
format
hao-affirm 5717d8c
fix makefile typo
hao-affirm 9f64788
move to func
hao-affirm 0d7d831
format
hao-affirm File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
| from __future__ import absolute_import | ||
|
|
||
| from datetime import datetime | ||
| from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple | ||
|
|
||
| import pymysql | ||
| import pytz | ||
| from pydantic import StrictStr | ||
| from pymysql.connections import Connection | ||
|
|
||
| from feast import Entity, FeatureView, RepoConfig | ||
| from feast.infra.key_encoding_utils import serialize_entity_key | ||
| from feast.infra.online_stores.online_store import OnlineStore | ||
| from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto | ||
| from feast.protos.feast.types.Value_pb2 import Value as ValueProto | ||
| from feast.repo_config import FeastConfigBaseModel | ||
|
|
||
|
|
||
| class MySQLOnlineStoreConfig(FeastConfigBaseModel): | ||
| """ | ||
| Configuration for the MySQL online store. | ||
| NOTE: The class *must* end with the `OnlineStoreConfig` suffix. | ||
| """ | ||
|
|
||
| type = "mysql" | ||
|
|
||
| host: Optional[StrictStr] = None | ||
| user: Optional[StrictStr] = None | ||
| password: Optional[StrictStr] = None | ||
| database: Optional[StrictStr] = None | ||
| port: Optional[int] = None | ||
|
|
||
|
|
||
| class MySQLOnlineStore(OnlineStore): | ||
| """ | ||
| An online store implementation that uses MySQL. | ||
| NOTE: The class *must* end with the `OnlineStore` suffix. | ||
| """ | ||
|
|
||
| _conn: Optional[Connection] = None | ||
|
|
||
| def _get_conn(self, config: RepoConfig) -> Connection: | ||
|
|
||
| online_store_config = config.online_store | ||
| assert isinstance(online_store_config, MySQLOnlineStoreConfig) | ||
|
|
||
| if not self._conn: | ||
| self._conn = pymysql.connect( | ||
| host=online_store_config.host or "127.0.0.1", | ||
| user=online_store_config.user or "test", | ||
| password=online_store_config.password or "test", | ||
| database=online_store_config.database or "feast", | ||
| port=online_store_config.port or 3306, | ||
| autocommit=True, | ||
| ) | ||
| return self._conn | ||
|
|
||
| def online_write_batch( | ||
| self, | ||
| config: RepoConfig, | ||
| table: FeatureView, | ||
| data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]], | ||
| progress: Optional[Callable[[int], Any]], | ||
| ) -> None: | ||
|
|
||
| conn = self._get_conn(config) | ||
| cur = conn.cursor() | ||
|
|
||
| project = config.project | ||
|
|
||
| for entity_key, values, timestamp, created_ts in data: | ||
| entity_key_bin = serialize_entity_key(entity_key).hex() | ||
| timestamp = _to_naive_utc(timestamp) | ||
| if created_ts is not None: | ||
| created_ts = _to_naive_utc(created_ts) | ||
|
|
||
| for feature_name, val in values.items(): | ||
| self.write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val) | ||
| conn.commit() | ||
| if progress: | ||
| progress(1) | ||
|
|
||
| @staticmethod | ||
| def write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val) -> None: | ||
| cur.execute( | ||
| f""" | ||
| INSERT INTO {_table_id(project, table)} | ||
| (entity_key, feature_name, value, event_ts, created_ts) | ||
| values (%s, %s, %s, %s, %s) | ||
| ON DUPLICATE KEY UPDATE | ||
| value = %s, | ||
| event_ts = %s, | ||
| created_ts = %s; | ||
| """, | ||
| ( | ||
| # Insert | ||
| entity_key_bin, | ||
| feature_name, | ||
| val.SerializeToString(), | ||
| timestamp, | ||
| created_ts, | ||
| # Update on duplicate key | ||
| val.SerializeToString(), | ||
| timestamp, | ||
| created_ts | ||
| ), | ||
| ) | ||
|
|
||
| def online_read( | ||
| self, | ||
| config: RepoConfig, | ||
| table: FeatureView, | ||
| entity_keys: List[EntityKeyProto], | ||
| requested_features: Optional[List[str]] = None, | ||
| ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: | ||
| conn = self._get_conn(config) | ||
| cur = conn.cursor() | ||
|
|
||
| result: List[Tuple[Optional[datetime], Optional[Dict[str, Any]]]] = [] | ||
|
|
||
| project = config.project | ||
| for entity_key in entity_keys: | ||
| entity_key_bin = serialize_entity_key(entity_key).hex() | ||
|
|
||
| cur.execute( | ||
| f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s", | ||
| (entity_key_bin,), | ||
| ) | ||
|
|
||
| res = {} | ||
| res_ts: Optional[datetime] = None | ||
| records = cur.fetchall() | ||
| if records: | ||
| for feature_name, val_bin, ts in records: | ||
| val = ValueProto() | ||
| val.ParseFromString(val_bin) | ||
| res[feature_name] = val | ||
| res_ts = ts | ||
|
|
||
| if not res: | ||
| result.append((None, None)) | ||
| else: | ||
| result.append((res_ts, res)) | ||
| return result | ||
|
|
||
| def update( | ||
| self, | ||
| config: RepoConfig, | ||
| tables_to_delete: Sequence[FeatureView], | ||
| tables_to_keep: Sequence[FeatureView], | ||
| entities_to_delete: Sequence[Entity], | ||
| entities_to_keep: Sequence[Entity], | ||
| partial: bool, | ||
| ) -> None: | ||
| conn = self._get_conn(config) | ||
| cur = conn.cursor() | ||
| project = config.project | ||
|
|
||
| # We don't create any special state for the entities in this implementation. | ||
| for table in tables_to_keep: | ||
| cur.execute( | ||
| f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512), | ||
| feature_name VARCHAR(256), | ||
| value BLOB, | ||
| event_ts timestamp NULL DEFAULT NULL, | ||
| created_ts timestamp NULL DEFAULT NULL, | ||
| PRIMARY KEY(entity_key, feature_name))""" | ||
| ) | ||
|
|
||
| cur.execute( | ||
| f"ALTER TABLE {_table_id(project, table)} ADD INDEX {_table_id(project, table)}_ek (entity_key);" | ||
| ) | ||
|
|
||
| for table in tables_to_delete: | ||
| cur.execute(f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};") | ||
| cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}") | ||
|
|
||
| def teardown( | ||
| self, | ||
| config: RepoConfig, | ||
| tables: Sequence[FeatureView], | ||
| entities: Sequence[Entity], | ||
| ) -> None: | ||
| conn = self._get_conn(config) | ||
| cur = conn.cursor() | ||
| project = config.project | ||
|
|
||
| for table in tables: | ||
| cur.execute(f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};") | ||
| cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}") | ||
|
|
||
|
|
||
| def _table_id(project: str, table: FeatureView) -> str: | ||
| return f"{project}_{table.name}" | ||
|
|
||
|
|
||
| def _to_naive_utc(ts: datetime) -> datetime: | ||
| if ts.tzinfo is None: | ||
| return ts | ||
| else: | ||
| return ts.astimezone(pytz.utc).replace(tzinfo=None) | ||
10 changes: 10 additions & 0 deletions
10
sdk/python/feast/infra/online_stores/contrib/mysql_repo_configuration.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| from tests.integration.feature_repos.integration_test_repo_config import ( | ||
| IntegrationTestRepoConfig, | ||
| ) | ||
| from tests.integration.feature_repos.universal.online_store.mysql import ( | ||
| MySQLOnlineStoreCreator, | ||
| ) | ||
|
|
||
| FULL_REPO_CONFIGS = [ | ||
| IntegrationTestRepoConfig(online_store_creator=MySQLOnlineStoreCreator), | ||
| ] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
25 changes: 25 additions & 0 deletions
25
sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| from typing import Dict | ||
|
|
||
| from testcontainers.mysql import MySqlContainer | ||
|
|
||
| from tests.integration.feature_repos.universal.online_store_creator import ( | ||
| OnlineStoreCreator, | ||
| ) | ||
|
|
||
|
|
||
| class MySQLOnlineStoreCreator(OnlineStoreCreator): | ||
| def __init__(self, project_name: str, **kwargs): | ||
| super().__init__(project_name) | ||
| self.container = MySqlContainer('mysql:latest', platform='linux/amd64') \ | ||
| .with_exposed_ports(3306) \ | ||
| .with_env("MYSQL_USER", "root") \ | ||
| .with_env("MYSQL_PASSWORD", "test") \ | ||
| .with_env("MYSQL_DATABASE", "test") | ||
|
|
||
| def create_online_store(self) -> Dict[str, str]: | ||
| self.container.start() | ||
| exposed_port = self.container.get_exposed_port(3306) | ||
| return {"type": "mysql", "user": "root", "password": "test", "database": "test", "port": exposed_port} | ||
|
|
||
| def teardown(self): | ||
| self.container.stop() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -113,6 +113,8 @@ | |
|
|
||
| MYSQL_REQUIRED = [ | ||
| "mysqlclient", | ||
| "pymysql", | ||
| "types-PyMySQL" | ||
| ] | ||
|
|
||
| HBASE_REQUIRED = [ | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.