Skip to content
Prev Previous commit
Next Next commit
fix lint
Signed-off-by: hao-affirm <104030690+hao-affirm@users.noreply.github.com>
  • Loading branch information
hao-affirm committed Sep 8, 2022
commit 0527d5dc8a087b95e3bedb37f94472bbd95ec3b2
35 changes: 13 additions & 22 deletions sdk/python/feast/infra/online_stores/contrib/mysql.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,19 @@
from __future__ import absolute_import

from datetime import datetime
from typing import (
Any,
Callable,
Dict,
List,
Optional,
Sequence,
Tuple,
)
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

import pymysql
import pytz
from feast import (
Entity,
FeatureView,
RepoConfig,
)
from pydantic import StrictStr
from pymysql.connections import Connection

from feast import Entity, FeatureView, RepoConfig
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
from pydantic import StrictStr
from pymysql.connections import Connection


class MySQLOnlineStoreConfig(FeastConfigBaseModel):
Expand All @@ -39,6 +28,7 @@ class MySQLOnlineStoreConfig(FeastConfigBaseModel):
user: Optional[StrictStr] = None
password: Optional[StrictStr] = None
database: Optional[StrictStr] = None
port: Optional[int] = None


class MySQLOnlineStore(OnlineStore):
Expand All @@ -47,7 +37,7 @@ class MySQLOnlineStore(OnlineStore):
NOTE: The class *must* end with the `OnlineStore` suffix.
"""

_conn: Connection
_conn: Optional[Connection] = None

def _get_conn(self, config: RepoConfig) -> Connection:

Expand All @@ -57,9 +47,10 @@ def _get_conn(self, config: RepoConfig) -> Connection:
if not self._conn:
self._conn = pymysql.connect(
host=online_store_config.host or "127.0.0.1",
user=online_store_config.user or "root",
password=online_store_config.password,
user=online_store_config.user or "test",
password=online_store_config.password or "test",
database=online_store_config.database or "feast",
port=online_store_config.port or 3306,
autocommit=True,
)
return self._conn
Expand All @@ -85,7 +76,7 @@ def online_write_batch(

for feature_name, val in values.items():
self.write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val)
self._conn.commit()
conn.commit()
if progress:
progress(1)

Expand Down Expand Up @@ -149,7 +140,7 @@ def online_read(
val = ValueProto()
val.ParseFromString(val_bin)
res[feature_name] = val
res_ts = datetime.strptime(ts, '%Y-%m-%d %H:%M:%S')
res_ts = ts

if not res:
result.append((None, None))
Expand All @@ -176,7 +167,7 @@ def update(
f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512),
feature_name VARCHAR(256),
value BLOB,
event_ts,
event_ts timestamp NULL DEFAULT NULL,
created_ts timestamp NULL DEFAULT NULL,
PRIMARY KEY(entity_key, feature_name))"""
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Dict

from testcontainers.mysql import MySqlContainer
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.mysql import MySqlContainer

from tests.integration.feature_repos.universal.online_store_creator import (
OnlineStoreCreator,
Expand All @@ -11,17 +11,20 @@
class MySQLOnlineStoreCreator(OnlineStoreCreator):
def __init__(self, project_name: str, **kwargs):
super().__init__(project_name)
self.container = MySqlContainer('mysql:latest', platform='linux/amd64').with_exposed_ports("3306") \
self.container = MySqlContainer('mysql:latest', platform='linux/amd64') \
.with_exposed_ports(3306) \
.with_env("MYSQL_USER", "root") \
.with_env("MYSQL_DATABASE", "feast")
.with_env("MYSQL_PASSWORD", "test") \
.with_env("MYSQL_DATABASE", "test")

def create_online_store(self) -> Dict[str, str]:
self.container.start()
log_string_to_wait_for = "Ready to accept connections"
wait_for_logs(
container=self.container, predicate=log_string_to_wait_for, timeout=10
)
return {"type": "mysql", "user": "root", "password": "test", "database": "feast"}
# wait_for_logs(
# container=self.container, predicate=log_string_to_wait_for, timeout=10
# )
exposed_port = self.container.get_exposed_port(3306)
return {"type": "mysql", "user": "root", "password": "test", "database": "test", "port": exposed_port}

def teardown(self):
self.container.stop()