Skip to content
Prev Previous commit
Next Next commit
move to func
Signed-off-by: hao-affirm <104030690+hao-affirm@users.noreply.github.com>
  • Loading branch information
hao-affirm committed Sep 10, 2022
commit 9f64788d334d86777f37efe5d7faa1a381a86e79
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class MySQLOnlineStoreConfig(FeastConfigBaseModel):
port: Optional[int] = None



class MySQLOnlineStore(OnlineStore):
"""
An online store implementation that uses MySQL.
Expand Down Expand Up @@ -73,7 +74,7 @@ def online_write_batch(
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
entity_key_serialization_version=2,
).hex()
timestamp = _to_naive_utc(timestamp)
Comment thread
felixwang9817 marked this conversation as resolved.
if created_ts is not None:
Expand Down Expand Up @@ -138,7 +139,7 @@ def online_read(
for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(
Comment thread
felixwang9817 marked this conversation as resolved.
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
entity_key_serialization_version=2,
).hex()

cur.execute(
Expand Down Expand Up @@ -191,10 +192,7 @@ def update(
)

for table in tables_to_delete:
cur.execute(
f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};"
)
cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")
_drop_table_and_index(cur, project, table)

def teardown(
self,
Expand All @@ -207,10 +205,15 @@ def teardown(
project = config.project

for table in tables:
cur.execute(
f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};"
)
cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")
_drop_table_and_index(cur, project, table)


def _drop_table_and_index(cur, project: str, table: FeatureView) -> None:
table_name = _table_id(project, table)
cur.execute(
f"DROP INDEX {table_name}_ek ON {table_name};"
)
cur.execute(f"DROP TABLE IF EXISTS {table_name}")


def _table_id(project: str, table: FeatureView) -> str:
Expand Down