Skip to content

Commit 7fa9fe3

Browse files
authored
Fix materialize bug with RedisCluster (feast-dev#2311)
* Fix materialization bug Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * oops removed something when i was copy pasting Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Test github workflow Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * transfer to unit_tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Brute force implementation Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Brute force implementation add gcc Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Brute force implementation add gcc Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Brute force implementation add gcc Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Brute force implementation add gcc Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Continue fixing... Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Remove gcc setup Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add integration test Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * fix error Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * fix error Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add setup Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * temp fix to get integration tests to work Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * temp fix to get integration tests to work Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * temp fix to get integration tests to work Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * temp fix to get integration tests to work Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix integration even more Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix integration even more Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * only run one test Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Do some more integration testing Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Do some more integration testing by adding bug to make sure no false positives Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Integration 
testing works Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Clean upu code Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add redis cluster script for starting a redis cluster Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Reset integration yml file Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Clean up Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix how to guide lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * add fixtures and remove excess code Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
1 parent f1a54a9 commit 7fa9fe3

File tree

6 files changed

+140
-10
lines changed

6 files changed

+140
-10
lines changed

.github/workflows/pr_integration_tests.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ jobs:
145145
run: pip install pip-tools
146146
- name: Install dependencies
147147
run: make install-python-ci-dependencies
148+
- name: Setup Redis Cluster
149+
run: |
150+
docker pull vishnunair/docker-redis-cluster:latest
151+
docker run -d -p 6001:6379 -p 6002:6380 -p 6003:6381 -p 6004:6382 -p 6005:6383 -p 6006:6384 --name redis-cluster vishnunair/docker-redis-cluster
148152
- name: Test python
149153
if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak
150154
env:

docs/how-to-guides/adding-or-reusing-tests.md

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
7979
datasets["global"],
8080
datasets["entity"],
8181
)
82-
8382
# ... more test code
8483

8584
customer_fv, driver_fv, driver_odfv, order_fv, global_fv = (
@@ -93,7 +92,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
9392
feature_service = FeatureService(
9493
"convrate_plus100",
9594
features=[
96-
feature_views["driver"][["conv_rate"]],
95+
feature_views["driver"][["conv_rate"]],
9796
feature_views["driver_odfv"]
9897
],
9998
)
@@ -112,7 +111,6 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
112111
]
113112
)
114113
store.apply(feast_objects)
115-
116114
# ... more test code
117115

118116
job_from_df = store.get_historical_features(
@@ -132,13 +130,11 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
132130
full_feature_names=full_feature_names,
133131
)
134132
actual_df_from_df_entities = job_from_df.to_df()
135-
136133
# ... more test code
137134

138135
assert_frame_equal(
139136
expected_df, actual_df_from_df_entities, check_dtype=False,
140137
)
141-
142138
# ... more test code
143139
```
144140
{% endtab %}
@@ -186,6 +182,24 @@ def your_test(environment: Environment):
186182
your_fv = driver_feature_view(data_source)
187183
entity = driver(value_type=ValueType.UNKNOWN)
188184
fs.apply([fv, entity])
189-
185+
190186
# ... run test
191187
```
188+
189+
### Running your own redis cluster for testing
190+
191+
* Install Redis on your computer. If you are a Mac user, you should be able to run `brew install redis`.
192+
* Running `redis-server --help` and `redis-cli --help` should show corresponding help menus.
193+
* Run `cd scripts/create-cluster`, then run `./create-cluster start` followed by `./create-cluster create` to launch the Redis instances and form the cluster. You should see output that looks like this:
194+
~~~~
195+
Starting 6001
196+
Starting 6002
197+
Starting 6003
198+
Starting 6004
199+
Starting 6005
200+
Starting 6006
201+
~~~~
202+
* You should be able to run the integration tests and have the redis cluster tests pass.
203+
* If you would like to run your own redis cluster, you can run the above commands with your own specified ports and connect to the newly configured cluster.
204+
* To stop the cluster, run `./create-cluster stop` and then `./create-cluster clean`.
205+

scripts/create-cluster

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# Settings
2+
# Make sure you run "brew install redis"
3+
4+
# BIN_PATH="/opt/homebrew/bin"
5+
REDIS_CLI=`which redis-cli`
6+
REDIS_SERVER=`which redis-server`
7+
CLUSTER_HOST=127.0.0.1
8+
# Creates a cluster at ports 6001-6006 with 3 masters 6001-6003 and 3 slaves 6004-6006
9+
PORT=${2:-6000}
10+
TIMEOUT=2000
11+
NODES=6
12+
REPLICAS=1
13+
PROTECTED_MODE=yes
14+
ADDITIONAL_OPTIONS=""
15+
16+
if [ -a config.sh ]
17+
then
18+
source "config.sh"
19+
fi
20+
21+
# Computed vars
22+
ENDPORT=$((PORT+NODES))
23+
24+
if [ "$1" == "start" ]
25+
then
26+
while [ $((PORT < ENDPORT)) != "0" ]; do
27+
PORT=$((PORT+1))
28+
echo "Starting $PORT"
29+
$REDIS_SERVER --port $PORT --protected-mode $PROTECTED_MODE --cluster-enabled yes --cluster-config-file nodes-${PORT}.conf --cluster-node-timeout $TIMEOUT --appendonly yes --appendfilename appendonly-${PORT}.aof --dbfilename dump-${PORT}.rdb --logfile ${PORT}.log --daemonize yes ${ADDITIONAL_OPTIONS}
30+
done
31+
exit 0
32+
fi
33+
34+
if [ "$1" == "create" ]
35+
then
36+
HOSTS=""
37+
while [ $((PORT < ENDPORT)) != "0" ]; do
38+
PORT=$((PORT+1))
39+
HOSTS="$HOSTS $CLUSTER_HOST:$PORT"
40+
done
41+
OPT_ARG=""
42+
if [ "$2" == "-f" ]; then
43+
OPT_ARG="--cluster-yes"
44+
fi
45+
$REDIS_CLI --cluster create $HOSTS --cluster-replicas $REPLICAS $OPT_ARG
46+
exit 0
47+
fi
48+
49+
if [ "$1" == "stop" ]
50+
then
51+
while [ $((PORT < ENDPORT)) != "0" ]; do
52+
PORT=$((PORT+1))
53+
echo "Stopping $PORT"
54+
$REDIS_CLI -p $PORT shutdown nosave
55+
done
56+
exit 0
57+
fi
58+
59+
if [ "$1" == "watch" ]
60+
then
61+
PORT=$((PORT+1))
62+
while [ 1 ]; do
63+
clear
64+
date
65+
$REDIS_CLI -p $PORT cluster nodes | head -30
66+
sleep 1
67+
done
68+
exit 0
69+
fi
70+
71+
if [ "$1" == "clean" ]
72+
then
73+
echo "Cleaning *.log"
74+
rm -rf *.log
75+
echo "Cleaning appendonly-*"
76+
rm -rf appendonly-*
77+
echo "Cleaning dump-*.rdb"
78+
rm -rf dump-*.rdb
79+
echo "Cleaning nodes-*.conf"
80+
rm -rf nodes-*.conf
81+
exit 0
82+
fi
83+
84+
if [ "$1" == "clean-logs" ]
85+
then
86+
echo "Cleaning *.log"
87+
rm -rf *.log
88+
exit 0
89+
fi
90+
91+
echo "Usage: $0 [start|create|stop|watch|clean|clean-logs]"
92+
echo "start [PORT] -- Launch Redis Cluster instances."
93+
echo "create [PORT] [-f] -- Create a cluster using redis-cli --cluster create."
94+
echo "stop [PORT] -- Stop Redis Cluster instances."
95+
echo "watch [PORT] -- Show CLUSTER NODES output (first 30 lines) of first node."
96+
echo "clean -- Remove all instances data, logs, configs."
97+
echo "clean-logs -- Remove just instances logs."

sdk/python/feast/infra/online_stores/redis.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141

4242
try:
4343
from redis import Redis
44-
from redis.cluster import RedisCluster
44+
from redis.cluster import ClusterNode, RedisCluster
4545
except ImportError as e:
4646
from feast.errors import FeastExtrasDependencyImportError
4747

@@ -160,7 +160,9 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig):
160160
online_store_config.connection_string
161161
)
162162
if online_store_config.redis_type == RedisType.redis_cluster:
163-
kwargs["startup_nodes"] = startup_nodes
163+
kwargs["startup_nodes"] = [
164+
ClusterNode(**node) for node in startup_nodes
165+
]
164166
self._client = RedisCluster(**kwargs)
165167
else:
166168
kwargs["host"] = startup_nodes[0]["host"]

sdk/python/tests/conftest.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
)
3131
from tests.integration.feature_repos.repo_configuration import (
3232
FULL_REPO_CONFIGS,
33+
REDIS_CLUSTER_CONFIG,
3334
REDIS_CONFIG,
3435
Environment,
3536
construct_test_environment,
@@ -170,10 +171,14 @@ def cleanup():
170171
return e
171172

172173

173-
@pytest.fixture()
174+
@pytest.fixture(
175+
scope="session",
176+
params=[REDIS_CONFIG, REDIS_CLUSTER_CONFIG],
177+
ids=[str(c) for c in [REDIS_CONFIG, REDIS_CLUSTER_CONFIG]],
178+
)
174179
def local_redis_environment(request, worker_id):
175180
e = construct_test_environment(
176-
IntegrationTestRepoConfig(online_store=REDIS_CONFIG), worker_id=worker_id
181+
IntegrationTestRepoConfig(online_store=request.param), worker_id=worker_id
177182
)
178183

179184
def cleanup():

sdk/python/tests/integration/feature_repos/repo_configuration.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@
4646

4747
DYNAMO_CONFIG = {"type": "dynamodb", "region": "us-west-2"}
4848
REDIS_CONFIG = {"type": "redis", "connection_string": "localhost:6379,db=0"}
49+
REDIS_CLUSTER_CONFIG = {
50+
"type": "redis",
51+
"redis_type": "redis_cluster",
52+
    # Redis Cluster port forwarding is set up in "pr_integration_tests.yml" under "Setup Redis Cluster".
53+
"connection_string": "127.0.0.1:6001,127.0.0.1:6002,127.0.0.1:6003",
54+
}
4955

5056
# FULL_REPO_CONFIGS contains the repo configurations (e.g. provider, offline store,
5157
# online store, test data, and more parameters) that most integration tests will test
@@ -62,7 +68,9 @@
6268
if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True":
6369
DEFAULT_FULL_REPO_CONFIGS.extend(
6470
[
71+
# Redis configurations
6572
IntegrationTestRepoConfig(online_store=REDIS_CONFIG),
73+
IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG),
6674
# GCP configurations
6775
IntegrationTestRepoConfig(
6876
provider="gcp",

0 commit comments

Comments
 (0)