|
| 1 | +# Copyright 2024 Google LLC |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +# This file is automatically generated by CrossSync. Do not edit manually. |
| 16 | + |
| 17 | +""" |
| 18 | +This module contains the client handler process for proxy_server.py. |
| 19 | +""" |
| 20 | +import os |
| 21 | +from google.cloud.environment_vars import BIGTABLE_EMULATOR |
| 22 | +from google.cloud.bigtable.data._cross_sync import CrossSync |
| 23 | +from client_handler_data_async import error_safe |
| 24 | + |
| 25 | + |
| 26 | +class TestProxyClientHandler: |
| 27 | + """ |
| 28 | + Implements the same methods as the grpc server, but handles the client |
| 29 | + library side of the request. |
| 30 | +
|
| 31 | + Requests received in TestProxyGrpcServer are converted to a dictionary, |
| 32 | + and supplied to the TestProxyClientHandler methods as kwargs. |
| 33 | + The client response is then returned back to the TestProxyGrpcServer |
| 34 | + """ |
| 35 | + |
| 36 | + def __init__( |
| 37 | + self, |
| 38 | + data_target=None, |
| 39 | + project_id=None, |
| 40 | + instance_id=None, |
| 41 | + app_profile_id=None, |
| 42 | + per_operation_timeout=None, |
| 43 | + **kwargs |
| 44 | + ): |
| 45 | + self.closed = False |
| 46 | + os.environ[BIGTABLE_EMULATOR] = data_target |
| 47 | + self.client = CrossSync._Sync_Impl.DataClient(project=project_id) |
| 48 | + self.instance_id = instance_id |
| 49 | + self.app_profile_id = app_profile_id |
| 50 | + self.per_operation_timeout = per_operation_timeout |
| 51 | + |
| 52 | + def close(self): |
| 53 | + self.closed = True |
| 54 | + |
| 55 | + @error_safe |
| 56 | + async def ReadRows(self, request, **kwargs): |
| 57 | + table_id = request.pop("table_name").split("/")[-1] |
| 58 | + app_profile_id = self.app_profile_id or request.get("app_profile_id", None) |
| 59 | + table = self.client.get_table(self.instance_id, table_id, app_profile_id) |
| 60 | + kwargs["operation_timeout"] = ( |
| 61 | + kwargs.get("operation_timeout", self.per_operation_timeout) or 20 |
| 62 | + ) |
| 63 | + result_list = table.read_rows(request, **kwargs) |
| 64 | + serialized_response = [row._to_dict() for row in result_list] |
| 65 | + return serialized_response |
| 66 | + |
| 67 | + @error_safe |
| 68 | + async def ReadRow(self, row_key, **kwargs): |
| 69 | + table_id = kwargs.pop("table_name").split("/")[-1] |
| 70 | + app_profile_id = self.app_profile_id or kwargs.get("app_profile_id", None) |
| 71 | + table = self.client.get_table(self.instance_id, table_id, app_profile_id) |
| 72 | + kwargs["operation_timeout"] = ( |
| 73 | + kwargs.get("operation_timeout", self.per_operation_timeout) or 20 |
| 74 | + ) |
| 75 | + result_row = table.read_row(row_key, **kwargs) |
| 76 | + if result_row: |
| 77 | + return result_row._to_dict() |
| 78 | + else: |
| 79 | + return "None" |
| 80 | + |
| 81 | + @error_safe |
| 82 | + async def MutateRow(self, request, **kwargs): |
| 83 | + from google.cloud.bigtable.data.mutations import Mutation |
| 84 | + |
| 85 | + table_id = request["table_name"].split("/")[-1] |
| 86 | + app_profile_id = self.app_profile_id or request.get("app_profile_id", None) |
| 87 | + table = self.client.get_table(self.instance_id, table_id, app_profile_id) |
| 88 | + kwargs["operation_timeout"] = ( |
| 89 | + kwargs.get("operation_timeout", self.per_operation_timeout) or 20 |
| 90 | + ) |
| 91 | + row_key = request["row_key"] |
| 92 | + mutations = [Mutation._from_dict(d) for d in request["mutations"]] |
| 93 | + table.mutate_row(row_key, mutations, **kwargs) |
| 94 | + return "OK" |
| 95 | + |
| 96 | + @error_safe |
| 97 | + async def BulkMutateRows(self, request, **kwargs): |
| 98 | + from google.cloud.bigtable.data.mutations import RowMutationEntry |
| 99 | + |
| 100 | + table_id = request["table_name"].split("/")[-1] |
| 101 | + app_profile_id = self.app_profile_id or request.get("app_profile_id", None) |
| 102 | + table = self.client.get_table(self.instance_id, table_id, app_profile_id) |
| 103 | + kwargs["operation_timeout"] = ( |
| 104 | + kwargs.get("operation_timeout", self.per_operation_timeout) or 20 |
| 105 | + ) |
| 106 | + entry_list = [ |
| 107 | + RowMutationEntry._from_dict(entry) for entry in request["entries"] |
| 108 | + ] |
| 109 | + table.bulk_mutate_rows(entry_list, **kwargs) |
| 110 | + return "OK" |
| 111 | + |
| 112 | + @error_safe |
| 113 | + async def CheckAndMutateRow(self, request, **kwargs): |
| 114 | + from google.cloud.bigtable.data.mutations import Mutation, SetCell |
| 115 | + |
| 116 | + table_id = request["table_name"].split("/")[-1] |
| 117 | + app_profile_id = self.app_profile_id or request.get("app_profile_id", None) |
| 118 | + table = self.client.get_table(self.instance_id, table_id, app_profile_id) |
| 119 | + kwargs["operation_timeout"] = ( |
| 120 | + kwargs.get("operation_timeout", self.per_operation_timeout) or 20 |
| 121 | + ) |
| 122 | + row_key = request["row_key"] |
| 123 | + true_mutations = [] |
| 124 | + for mut_dict in request.get("true_mutations", []): |
| 125 | + try: |
| 126 | + true_mutations.append(Mutation._from_dict(mut_dict)) |
| 127 | + except ValueError: |
| 128 | + mutation = SetCell("", "", "", 0) |
| 129 | + true_mutations.append(mutation) |
| 130 | + false_mutations = [] |
| 131 | + for mut_dict in request.get("false_mutations", []): |
| 132 | + try: |
| 133 | + false_mutations.append(Mutation._from_dict(mut_dict)) |
| 134 | + except ValueError: |
| 135 | + false_mutations.append(SetCell("", "", "", 0)) |
| 136 | + predicate_filter = request.get("predicate_filter", None) |
| 137 | + result = table.check_and_mutate_row( |
| 138 | + row_key, |
| 139 | + predicate_filter, |
| 140 | + true_case_mutations=true_mutations, |
| 141 | + false_case_mutations=false_mutations, |
| 142 | + **kwargs |
| 143 | + ) |
| 144 | + return result |
| 145 | + |
| 146 | + @error_safe |
| 147 | + async def ReadModifyWriteRow(self, request, **kwargs): |
| 148 | + from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule |
| 149 | + from google.cloud.bigtable.data.read_modify_write_rules import AppendValueRule |
| 150 | + |
| 151 | + table_id = request["table_name"].split("/")[-1] |
| 152 | + app_profile_id = self.app_profile_id or request.get("app_profile_id", None) |
| 153 | + table = self.client.get_table(self.instance_id, table_id, app_profile_id) |
| 154 | + kwargs["operation_timeout"] = ( |
| 155 | + kwargs.get("operation_timeout", self.per_operation_timeout) or 20 |
| 156 | + ) |
| 157 | + row_key = request["row_key"] |
| 158 | + rules = [] |
| 159 | + for rule_dict in request.get("rules", []): |
| 160 | + qualifier = rule_dict["column_qualifier"] |
| 161 | + if "append_value" in rule_dict: |
| 162 | + new_rule = AppendValueRule( |
| 163 | + rule_dict["family_name"], qualifier, rule_dict["append_value"] |
| 164 | + ) |
| 165 | + else: |
| 166 | + new_rule = IncrementRule( |
| 167 | + rule_dict["family_name"], qualifier, rule_dict["increment_amount"] |
| 168 | + ) |
| 169 | + rules.append(new_rule) |
| 170 | + result = table.read_modify_write_row(row_key, rules, **kwargs) |
| 171 | + if result: |
| 172 | + return result._to_dict() |
| 173 | + else: |
| 174 | + return "None" |
| 175 | + |
| 176 | + @error_safe |
| 177 | + async def SampleRowKeys(self, request, **kwargs): |
| 178 | + table_id = request["table_name"].split("/")[-1] |
| 179 | + app_profile_id = self.app_profile_id or request.get("app_profile_id", None) |
| 180 | + table = self.client.get_table(self.instance_id, table_id, app_profile_id) |
| 181 | + kwargs["operation_timeout"] = ( |
| 182 | + kwargs.get("operation_timeout", self.per_operation_timeout) or 20 |
| 183 | + ) |
| 184 | + result = table.sample_row_keys(**kwargs) |
| 185 | + return result |
0 commit comments