Skip to content
Prev Previous commit
Next Next commit
format
Signed-off-by: hao-affirm <104030690+hao-affirm@users.noreply.github.com>
  • Loading branch information
hao-affirm committed Sep 9, 2022
commit 5d84e0676fb0385eb14d6c37a0d0722daf5addba
29 changes: 23 additions & 6 deletions sdk/python/feast/infra/online_stores/contrib/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def online_write_batch(
self,
config: RepoConfig,
table: FeatureView,
data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]],
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:

Expand All @@ -75,13 +77,24 @@ def online_write_batch(
created_ts = _to_naive_utc(created_ts)

for feature_name, val in values.items():
self.write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val)
self.write_to_table(
created_ts,
cur,
entity_key_bin,
feature_name,
project,
table,
timestamp,
val,
)
conn.commit()
if progress:
progress(1)

@staticmethod
def write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val) -> None:
def write_to_table(
created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val
) -> None:
cur.execute(
f"""
INSERT INTO {_table_id(project, table)}
Expand All @@ -102,7 +115,7 @@ def write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table
# Update on duplicate key
val.SerializeToString(),
timestamp,
created_ts
created_ts,
),
)

Expand Down Expand Up @@ -172,7 +185,9 @@ def update(
)

for table in tables_to_delete:
cur.execute(f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};")
cur.execute(
f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};"
)
cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")

def teardown(
Expand All @@ -186,7 +201,9 @@ def teardown(
project = config.project

for table in tables:
cur.execute(f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};")
cur.execute(
f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};"
)
cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,24 @@
class MySQLOnlineStoreCreator(OnlineStoreCreator):
def __init__(self, project_name: str, **kwargs):
super().__init__(project_name)
self.container = MySqlContainer('mysql:latest', platform='linux/amd64') \
.with_exposed_ports(3306) \
.with_env("MYSQL_USER", "root") \
.with_env("MYSQL_PASSWORD", "test") \
self.container = (
MySqlContainer("mysql:latest", platform="linux/amd64")
.with_exposed_ports(3306)
.with_env("MYSQL_USER", "root")
.with_env("MYSQL_PASSWORD", "test")
.with_env("MYSQL_DATABASE", "test")
)

def create_online_store(self) -> Dict[str, str]:
self.container.start()
exposed_port = self.container.get_exposed_port(3306)
return {"type": "mysql", "user": "root", "password": "test", "database": "test", "port": exposed_port}
return {
"type": "mysql",
"user": "root",
"password": "test",
"database": "test",
"port": exposed_port,
}

def teardown(self):
self.container.stop()