Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
parallelize the per table lookups
Signed-off-by: Rob Howley <howley.robert@gmail.com>
  • Loading branch information
robhowley committed Oct 12, 2024
commit 3a302840f81ac36098cb67fd00f8f8ede475c105
12 changes: 10 additions & 2 deletions sdk/python/feast/infra/online_stores/online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union
Expand Down Expand Up @@ -240,7 +240,7 @@ async def get_online_features_async(
native_entity_values=True,
)

for table, requested_features in grouped_refs:
async def query_table(table, requested_features):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add type hints?

# Get the correct set of entity values with the correct join keys.
table_entity_values, idxs = utils._get_unique_entities(
table,
Expand All @@ -258,6 +258,14 @@ async def get_online_features_async(
requested_features=requested_features,
)

return idxs, read_rows

all_responses = await asyncio.gather(*[
query_table(table, requested_features)
for table, requested_features in grouped_refs
])

for (idxs, read_rows), (table, requested_features) in zip(all_responses, grouped_refs):
feature_data = utils._convert_rows_to_protobuf(
requested_features, read_rows
)
Expand Down