forked from apache/ignite
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbinary.py
More file actions
209 lines (195 loc) · 7.28 KB
/
Copy pathbinary.py
File metadata and controls
209 lines (195 loc) · 7.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union
from pyignite.constants import *
from pyignite.datatypes.binary import (
body_struct, enum_struct, schema_struct, binary_fields_struct,
)
from pyignite.datatypes import String, Int, Bool
from pyignite.queries import Query, Response
from pyignite.queries.op_codes import *
from pyignite.utils import int_overflow, entity_id
from .result import APIResult
def get_binary_type(
connection: 'Connection', binary_type: Union[str, int], query_id=None,
) -> APIResult:
"""
Gets the binary type information by type ID.
:param connection: connection to Ignite server,
:param binary_type: binary type name or ID,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object.
"""
query_struct = Query(
OP_GET_BINARY_TYPE,
[
('type_id', Int),
],
query_id=query_id,
)
_, send_buffer = query_struct.from_python({
'type_id': entity_id(binary_type),
})
connection.send(send_buffer)
response_head_struct = Response([
('type_exists', Bool),
])
response_head_type, recv_buffer = response_head_struct.parse(connection)
response_head = response_head_type.from_buffer_copy(recv_buffer)
response_parts = []
if response_head.type_exists:
resp_body_type, resp_body_buffer = body_struct.parse(connection)
response_parts.append(('body', resp_body_type))
resp_body = resp_body_type.from_buffer_copy(resp_body_buffer)
recv_buffer += resp_body_buffer
if resp_body.is_enum:
resp_enum, resp_enum_buffer = enum_struct.parse(connection)
response_parts.append(('enums', resp_enum))
recv_buffer += resp_enum_buffer
resp_schema_type, resp_schema_buffer = schema_struct.parse(connection)
response_parts.append(('schema', resp_schema_type))
recv_buffer += resp_schema_buffer
response_class = type(
'GetBinaryTypeResponse',
(response_head_type,),
{
'_pack_': 1,
'_fields_': response_parts,
}
)
response = response_class.from_buffer_copy(recv_buffer)
result = APIResult(response)
if result.status != 0:
return result
result.value = {
'type_exists': response.type_exists
}
if hasattr(response, 'body'):
result.value.update(body_struct.to_python(response.body))
if hasattr(response, 'enums'):
result.value['enums'] = enum_struct.to_python(response.enums)
if hasattr(response, 'schema'):
result.value['schema'] = {
x['schema_id']: [
z['schema_field_id'] for z in x['schema_fields']
]
for x in schema_struct.to_python(response.schema)
}
return result
def put_binary_type(
connection: 'Connection', type_name: str, affinity_key_field: str=None,
is_enum=False, schema: dict=None, query_id=None,
) -> APIResult:
"""
Registers binary type information in cluster.
:param connection: connection to Ignite server,
:param type_name: name of the data type being registered,
:param affinity_key_field: (optional) name of the affinity key field,
:param is_enum: (optional) register enum if True, binary object otherwise.
Defaults to False,
:param schema: (optional) when register enum, pass a dict of enumerated
parameter names as keys and an integers as values. When register binary
type, pass a dict of field names: field types. Binary type with no fields
is OK,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object.
"""
# prepare data
if schema is None:
schema = {}
type_id = entity_id(type_name)
data = {
'type_name': type_name,
'type_id': type_id,
'affinity_key_field': affinity_key_field,
'binary_fields': [],
'is_enum': is_enum,
'schema': [],
}
schema_id = None
if is_enum:
data['enums'] = []
for literal, ordinal in schema.items():
data['enums'].append({
'literal': literal,
'type_id': ordinal,
})
else:
# assemble schema and calculate schema ID in one go
schema_id = FNV1_OFFSET_BASIS if schema else 0
for field_name, data_type in schema.items():
# TODO: check for allowed data types
field_id = entity_id(field_name)
data['binary_fields'].append({
'field_name': field_name,
'type_id': int.from_bytes(
data_type.type_code,
byteorder=PROTOCOL_BYTE_ORDER
),
'field_id': field_id,
})
schema_id ^= (field_id & 0xff)
schema_id = int_overflow(schema_id * FNV1_PRIME)
schema_id ^= ((field_id >> 8) & 0xff)
schema_id = int_overflow(schema_id * FNV1_PRIME)
schema_id ^= ((field_id >> 16) & 0xff)
schema_id = int_overflow(schema_id * FNV1_PRIME)
schema_id ^= ((field_id >> 24) & 0xff)
schema_id = int_overflow(schema_id * FNV1_PRIME)
data['schema'].append({
'schema_id': schema_id,
'schema_fields': [
{'schema_field_id': entity_id(x)} for x in schema
],
})
# do query
if is_enum:
query_struct = Query(
OP_PUT_BINARY_TYPE,
[
('type_id', Int),
('type_name', String),
('affinity_key_field', String),
('binary_fields', binary_fields_struct),
('is_enum', Bool),
('enums', enum_struct),
('schema', schema_struct),
],
query_id=query_id,
)
else:
query_struct = Query(
OP_PUT_BINARY_TYPE,
[
('type_id', Int),
('type_name', String),
('affinity_key_field', String),
('binary_fields', binary_fields_struct),
('is_enum', Bool),
('schema', schema_struct),
],
query_id=query_id,
)
result = query_struct.perform(connection, query_params=data)
if result.status == 0:
result.value = {
'type_id': type_id,
'schema_id': schema_id,
}
return result