From 07a94f1e6d3917817b602bc6bab297beb7bf0404 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Fri, 28 Jul 2023 16:01:03 +0300 Subject: [PATCH 1/6] more fixes --- cloudquery/sdk/internal/servers/plugin_v3/plugin.py | 13 ++++--------- cloudquery/sdk/plugin/plugin.py | 5 +++-- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/cloudquery/sdk/internal/servers/plugin_v3/plugin.py b/cloudquery/sdk/internal/servers/plugin_v3/plugin.py index 62ee483..3399da8 100644 --- a/cloudquery/sdk/internal/servers/plugin_v3/plugin.py +++ b/cloudquery/sdk/internal/servers/plugin_v3/plugin.py @@ -1,7 +1,7 @@ import pyarrow as pa import structlog -from cloudquery.plugin_v3 import plugin_pb2, plugin_pb2_grpc +from cloudquery.plugin_v3 import plugin_pb2, plugin_pb2_grpc, arrow from cloudquery.sdk.message import SyncInsertMessage, SyncMigrateTableMessage from cloudquery.sdk.plugin.plugin import Plugin, SyncOptions, TableOptions from cloudquery.sdk.schema import tables_to_arrow_schemas @@ -48,19 +48,14 @@ def Sync(self, request, context): for msg in self._plugin.sync(options): if isinstance(msg, SyncInsertMessage): - sink = pa.BufferOutputStream() - writer = pa.ipc.new_stream(sink, msg.record.schema) - writer.write_batch(msg.record) - writer.close() - buf = sink.getvalue().to_pybytes() + buf = arrow.record_to_bytes(msg.record) yield plugin_pb2.Sync.Response( insert=plugin_pb2.Sync.MessageInsert(record=buf) ) elif isinstance(msg, SyncMigrateTableMessage): + buf = arrow.schema_to_bytes(msg.schema) yield plugin_pb2.Sync.Response( - migrate_table=plugin_pb2.Sync.MessageMigrateTable( - table=msg.table.to_arrow_schema().serialize().to_pybytes() - ) + migrate_table=plugin_pb2.Sync.MessageMigrateTable(table=buf) ) else: # unknown sync message type diff --git a/cloudquery/sdk/plugin/plugin.py b/cloudquery/sdk/plugin/plugin.py index 2473c77..dad741b 100644 --- a/cloudquery/sdk/plugin/plugin.py +++ b/cloudquery/sdk/plugin/plugin.py @@ -1,8 +1,9 @@ import queue from dataclasses import dataclass -from typing import List +from typing import List, Generator from cloudquery.sdk.schema import Table +from cloudquery.sdk import message MIGRATE_MODE_STRINGS = ["safe", "force"] @@ -46,7 +47,7 @@ def version(self) -> str: def get_tables(self, options: TableOptions) -> List[Table]: raise NotImplementedError() - def sync(self, options: SyncOptions, results: queue.Queue) -> None: + def sync(self, options: SyncOptions) -> Generator[message.SyncMessage, None, None]: raise NotImplementedError() def close(self) -> None: From 3148c6ce70c19cda24e357e2b274c0e4130b05eb Mon Sep 17 00:00:00 2001 From: Kemal <223029+disq@users.noreply.github.com> Date: Sat, 29 Jul 2023 10:24:30 +0100 Subject: [PATCH 2/6] feat: Scalar things (#5) --- cloudquery/sdk/scalar/__init__.py | 11 +++- cloudquery/sdk/scalar/binary.py | 28 ++++----- cloudquery/sdk/scalar/bool.py | 17 ++--- cloudquery/sdk/scalar/date32.py | 17 ++--- cloudquery/sdk/scalar/date64.py | 39 ++++++++++++ cloudquery/sdk/scalar/float.py | 49 +++++++++++++++ cloudquery/sdk/scalar/float64.py | 35 ----------- cloudquery/sdk/scalar/int.py | 56 +++++++++++++++++ cloudquery/sdk/scalar/int64.py | 37 ----------- cloudquery/sdk/scalar/json.py | 27 ++++++++ cloudquery/sdk/scalar/list.py | 60 ++++++++++++++++++ cloudquery/sdk/scalar/scalar.py | 17 ++++- cloudquery/sdk/scalar/scalar_factory.py | 84 ++++++++++++++++++++++++- cloudquery/sdk/scalar/string.py | 32 ++++++++++ cloudquery/sdk/scalar/timestamp.py | 41 ++++++++++++ cloudquery/sdk/scalar/uint.py | 60 ++++++++++++++++++ cloudquery/sdk/scalar/uuid.py | 11 +++- cloudquery/sdk/scalar/vector.py | 43 +++++++++++++ cloudquery/sdk/types/__init__.py | 2 + cloudquery/sdk/types/json.py | 20 ++++++ cloudquery/sdk/types/uuid.py | 8 +-- requirements.txt | 18 +++++- tests/scalar/bool.py | 2 +- tests/scalar/float32.py | 15 +++++ tests/scalar/float64.py | 10 +-- tests/scalar/int64.py | 10 +-- tests/scalar/json.py | 0 tests/scalar/list.py | 43 +++++++++++++ tests/scalar/string.py | 18 ++++++ tests/scalar/timestamp.py | 58 +++++++++++++++++ tests/scalar/uint32.py | 15 +++++ tests/scalar/uuid.py | 2 +- tests/scalar/vector.py | 58 +++++++++++++++++ 33 files changed, 812 insertions(+), 131 deletions(-) create mode 100644 cloudquery/sdk/scalar/date64.py create mode 100644 cloudquery/sdk/scalar/float.py delete mode 100644 cloudquery/sdk/scalar/float64.py create mode 100644 cloudquery/sdk/scalar/int.py delete mode 100644 cloudquery/sdk/scalar/int64.py create mode 100644 cloudquery/sdk/scalar/json.py create mode 100644 cloudquery/sdk/scalar/list.py create mode 100644 cloudquery/sdk/scalar/string.py create mode 100644 cloudquery/sdk/scalar/timestamp.py create mode 100644 cloudquery/sdk/scalar/uint.py create mode 100644 cloudquery/sdk/scalar/vector.py create mode 100644 cloudquery/sdk/types/__init__.py create mode 100644 cloudquery/sdk/types/json.py create mode 100644 tests/scalar/float32.py create mode 100644 tests/scalar/json.py create mode 100644 tests/scalar/list.py create mode 100644 tests/scalar/string.py create mode 100644 tests/scalar/timestamp.py create mode 100644 tests/scalar/uint32.py create mode 100644 tests/scalar/vector.py diff --git a/cloudquery/sdk/scalar/__init__.py b/cloudquery/sdk/scalar/__init__.py index a89de78..475cee5 100644 --- a/cloudquery/sdk/scalar/__init__.py +++ b/cloudquery/sdk/scalar/__init__.py @@ -1,8 +1,15 @@ from .scalar import Scalar, ScalarInvalidTypeError, NULL_VALUE from .scalar_factory import ScalarFactory +from .vector import Vector + from .binary import Binary from .bool import Bool from .date32 import Date32 -from .float64 import Float64 -from .int64 import Int64 +from .date64 import Date64 +from .float import Float +from .int import Int +from .list import List +from .string import String +from .timestamp import Timestamp +from .uint import Uint from .uuid import UUID diff --git a/cloudquery/sdk/scalar/binary.py b/cloudquery/sdk/scalar/binary.py index 0311a32..0617c5e 100644 --- a/cloudquery/sdk/scalar/binary.py +++ b/cloudquery/sdk/scalar/binary.py @@ -3,10 +3,6 @@ class Binary(Scalar): - def __init__(self, valid: bool = False, value: bytes = None): - self._valid = valid - self._value = value - def __eq__(self, scalar: Scalar) -> bool: if scalar is None: return False @@ -14,22 +10,26 @@ def __eq__(self, scalar: Scalar) -> bool: return self._value == scalar._value and self._valid == scalar._valid return False - def __str__(self) -> str: - return str(self._value) if self._valid else NULL_VALUE - @property def value(self): return self._value - def set(self, scalar): - if scalar is None: + def set(self, value: any): + if value is None: + return + + if isinstance(value, Binary): + self._valid = value.is_valid + self._value = value.value return - if type(scalar) == bytes: + if type(value) == bytes: self._valid = True - self._value = scalar - elif type(scalar) == str: + self._value = value + elif type(value) == str: self._valid = True - self._value = scalar.encode() + self._value = value.encode() else: - raise ScalarInvalidTypeError("Invalid type for Binary scalar") + raise ScalarInvalidTypeError( + "Invalid type {} for Binary scalar".format(type(value)) + ) diff --git a/cloudquery/sdk/scalar/bool.py b/cloudquery/sdk/scalar/bool.py index 42c7b21..6867998 100644 --- a/cloudquery/sdk/scalar/bool.py +++ b/cloudquery/sdk/scalar/bool.py @@ -17,10 +17,6 @@ def parse_string_to_bool(input_string): class Bool(Scalar): - def __init__(self, valid: bool = False, value: bool = False) -> None: - self._valid = valid - self._value = value - def __eq__(self, scalar: Scalar) -> bool: if scalar is None: return False @@ -28,15 +24,18 @@ def __eq__(self, scalar: Scalar) -> bool: return self._value == scalar._value and self._valid == scalar._valid return False - def __str__(self) -> str: - return str(self._value) if self._valid else NULL_VALUE - @property def value(self): return self._value def set(self, value: Any): if value is None: + self._valid = False + return + + if isinstance(value, Bool): + self._valid = value.is_valid + self._value = value.value return if type(value) == bool: @@ -44,6 +43,8 @@ def set(self, value: Any): elif type(value) == str: self._value = parse_string_to_bool(value) else: - raise ScalarInvalidTypeError("Invalid type for Bool scalar") + raise ScalarInvalidTypeError( + "Invalid type {} for Bool scalar".format(type(value)) + ) self._valid = True diff --git a/cloudquery/sdk/scalar/date32.py b/cloudquery/sdk/scalar/date32.py index cbc8bca..24597f0 100644 --- a/cloudquery/sdk/scalar/date32.py +++ b/cloudquery/sdk/scalar/date32.py @@ -4,10 +4,6 @@ class Date32(Scalar): - def __init__(self, valid: bool = False, value: bool = False) -> None: - self._valid = valid - self._value = value - def __eq__(self, scalar: Scalar) -> bool: if scalar is None: return False @@ -15,15 +11,18 @@ def __eq__(self, scalar: Scalar) -> bool: return self._value == scalar._value and self._valid == scalar._valid return False - def __str__(self) -> str: - return str(self._value) if self._valid else NULL_VALUE - @property def value(self): return self._value def set(self, value: Any): if value is None: + self._valid = False + return + + if isinstance(value, Date32): + self._valid = value.is_valid + self._value = value.value return if type(value) == datetime: @@ -33,6 +32,8 @@ def set(self, value: Any): elif type(value) == time: self._value = datetime.combine(datetime.today(), value) else: - raise ScalarInvalidTypeError("Invalid type for Bool scalar") + raise ScalarInvalidTypeError( + "Invalid type {} for Date32 scalar".format(type(value)) + ) self._valid = True diff --git a/cloudquery/sdk/scalar/date64.py b/cloudquery/sdk/scalar/date64.py new file mode 100644 index 0000000..ba88953 --- /dev/null +++ b/cloudquery/sdk/scalar/date64.py @@ -0,0 +1,39 @@ +from cloudquery.sdk.scalar import Scalar, ScalarInvalidTypeError, NULL_VALUE +from datetime import datetime, time +from typing import Any + + +class Date64(Scalar): + def __eq__(self, scalar: Scalar) -> bool: + if scalar is None: + return False + if type(scalar) == Date64: + return self._value == scalar._value and self._valid == scalar._valid + return False + + @property + def value(self): + return self._value + + def set(self, value: Any): + if value is None: + self._valid = False + return + + if isinstance(value, Date64): + self._valid = value.is_valid + self._value = value.value + return + + if type(value) == datetime: + self._value = value + elif type(value) == str: + self._value = datetime.strptime(value, "%Y-%m-%d") + elif type(value) == time: + self._value = datetime.combine(datetime.today(), value) + else: + raise ScalarInvalidTypeError( + "Invalid type {} for Date64 scalar".format(type(value)) + ) + + self._valid = True diff --git a/cloudquery/sdk/scalar/float.py b/cloudquery/sdk/scalar/float.py new file mode 100644 index 0000000..49ee15d --- /dev/null +++ b/cloudquery/sdk/scalar/float.py @@ -0,0 +1,49 @@ +from cloudquery.sdk.scalar import Scalar, ScalarInvalidTypeError + + +class Float(Scalar): + def __init__(self, valid: bool = False, value: any = None, bitwidth: int = 64): + super().__init__(valid, value) + self._bitwidth = bitwidth + + def __eq__(self, scalar: Scalar) -> bool: + if scalar is None: + return False + if type(scalar) == Float and self._bitwidth == scalar.bitwidth: + return self._value == scalar._value and self._valid == scalar._valid + return False + + @property + def bitwidth(self): + return self._bitwidth + + @property + def value(self): + return self._value + + def set(self, value: any): + if value is None: + self._valid = False + return + + if isinstance(value, Float) and value.bitwidth == self._bitwidth: + self._valid = value.is_valid + self._value = value.value + return + + if type(value) == int: + self._value = float(value) + elif type(value) == float: + self._value = value + elif type(value) == str: + try: + self._value = float(value) + except ValueError: + raise ScalarInvalidTypeError( + "Invalid value for Float{} scalar".format(self._bitwidth) + ) + else: + raise ScalarInvalidTypeError( + "Invalid type {} for Float{} scalar".format(type(value), self._bitwidth) + ) + self._valid = True diff --git a/cloudquery/sdk/scalar/float64.py b/cloudquery/sdk/scalar/float64.py deleted file mode 100644 index 2b9b86e..0000000 --- a/cloudquery/sdk/scalar/float64.py +++ /dev/null @@ -1,35 +0,0 @@ -from cloudquery.sdk.scalar import Scalar, ScalarInvalidTypeError - - -class Float64(Scalar): - def __init__(self, valid: bool = False, value: float = None): - self._valid = valid - self._value = value - - def __eq__(self, scalar: Scalar) -> bool: - if scalar is None: - return False - if type(scalar) == Float64: - return self._value == scalar._value and self._valid == scalar._valid - return False - - @property - def value(self): - return self._value - - def set(self, value): - if value is None: - return - - if type(value) == int: - self._value = float(value) - elif type(value) == float: - self._value = value - elif type(value) == str: - try: - self._value = float(value) - except ValueError: - raise ScalarInvalidTypeError("Invalid type for Float64 scalar") - else: - raise ScalarInvalidTypeError("Invalid type for Binary scalar") - self._valid = True diff --git a/cloudquery/sdk/scalar/int.py b/cloudquery/sdk/scalar/int.py new file mode 100644 index 0000000..8133429 --- /dev/null +++ b/cloudquery/sdk/scalar/int.py @@ -0,0 +1,56 @@ +from cloudquery.sdk.scalar import Scalar, ScalarInvalidTypeError + + +class Int(Scalar): + def __init__(self, valid: bool = False, value: any = None, bitwidth: int = 64): + self._bitwidth = bitwidth + self._min = -(2 ** (bitwidth - 1)) + self._max = 2 ** (bitwidth - 1) + super().__init__(valid, value) + + def __eq__(self, scalar: Scalar) -> bool: + if scalar is None: + return False + if type(scalar) == Int and self._bitwidth == scalar.bitwidth: + return self._value == scalar._value and self._valid == scalar._valid + return False + + @property + def bitwidth(self): + return self._bitwidth + + @property + def value(self): + return self._value + + def set(self, value: any): + if value is None: + self._valid = False + return + + if isinstance(value, Int) and value.bitwidth == self._bitwidth: + self._valid = value.is_valid + self._value = value.value + return + + if type(value) == int: + val = value + elif type(value) == float: + val = int(value) + elif type(value) == str: + try: + val = int(value) + except ValueError as e: + raise ScalarInvalidTypeError( + "Invalid type for Int{} scalar".format(self._bitwidth) + ) from e + else: + raise ScalarInvalidTypeError( + "Invalid type {} for Int{} scalar".format(type(value), self._bitwidth) + ) + if val < self._min or val >= self._max: + raise ScalarInvalidTypeError( + "Invalid Int{} scalar with value {}".format(self._bitwidth, val) + ) + self._value = val + self._valid = True diff --git a/cloudquery/sdk/scalar/int64.py b/cloudquery/sdk/scalar/int64.py deleted file mode 100644 index 9a7d77c..0000000 --- a/cloudquery/sdk/scalar/int64.py +++ /dev/null @@ -1,37 +0,0 @@ -from cloudquery.sdk.scalar import Scalar, ScalarInvalidTypeError - - -class Int64(Scalar): - def __init__(self, valid: bool = False, value: float = None): - self._valid = valid - self._value = value - - def __eq__(self, scalar: Scalar) -> bool: - if scalar is None: - return False - if type(scalar) == Int64: - return self._value == scalar._value and self._valid == scalar._valid - return False - - @property - def value(self): - return self._value - - def set(self, value): - if value is None: - return - - if type(value) == int: - self._value = value - elif type(value) == float: - self._value = int(value) - elif type(value) == str: - try: - self._value = int(value) - except ValueError as e: - raise ScalarInvalidTypeError("Invalid type for Int64 scalar") from e - else: - raise ScalarInvalidTypeError( - "Invalid type {} for Int64 scalar".format(type(value)) - ) - self._valid = True diff --git a/cloudquery/sdk/scalar/json.py b/cloudquery/sdk/scalar/json.py new file mode 100644 index 0000000..b06bb79 --- /dev/null +++ b/cloudquery/sdk/scalar/json.py @@ -0,0 +1,27 @@ +import json +from cloudquery.sdk.scalar import Scalar, ScalarInvalidTypeError + + +class JSON(Scalar): + def __eq__(self, scalar: Scalar) -> bool: + if scalar is None: + return False + if type(scalar) == JSON: + return self._value == scalar._value and self._valid == scalar._valid + return False + + @property + def value(self): + return self._value + + def set(self, value: any): + if value is None: + return + + if type(value) == str or type(value) == bytes: + # test if it is a valid json + json.loads(value) + self._value = value + else: + self._value = json.dumps(value) + self._valid = True diff --git a/cloudquery/sdk/scalar/list.py b/cloudquery/sdk/scalar/list.py new file mode 100644 index 0000000..66560d3 --- /dev/null +++ b/cloudquery/sdk/scalar/list.py @@ -0,0 +1,60 @@ +from cloudquery.sdk.scalar import Scalar, ScalarInvalidTypeError +from .scalar import NULL_VALUE +from .vector import Vector +from typing import Any, Type, Union + + +class List(Scalar): + def __init__(self, scalar_type: Type[Scalar]): + super().__init__(False, None) + self._value = Vector(scalar_type) + self._type = scalar_type + + def __eq__(self, other: Union[None, "List"]) -> bool: + if other is None: + return False + if type(self) != type(other): + return False + if self._valid != other._valid: + return False + return self._value == other._value + + @property + def type(self): + return self._type + + @property + def value(self): + return self._value + + def set(self, val: Any): + if val is None: + self._valid = False + self._value = Vector() + return + + if isinstance(val, Scalar) and type(val) == self._type: + if not val.is_valid: + self._valid = False + self._value = Vector() + return + return self.set([val.value]) + + if isinstance(val, (list, tuple)): + self._value = Vector() + for item in val: + scalar = self._type() + scalar.set(item) + self._value.append(scalar) + self._valid = True + return + + raise ScalarInvalidTypeError("Invalid type {} for List".format(type(val))) + + def __str__(self) -> str: + if not self._valid: + return NULL_VALUE + return f"[{', '.join(str(v) for v in self._value)}]" + + def __len__(self): + return len(self.value.data) diff --git a/cloudquery/sdk/scalar/scalar.py b/cloudquery/sdk/scalar/scalar.py index 379a912..f08bd04 100644 --- a/cloudquery/sdk/scalar/scalar.py +++ b/cloudquery/sdk/scalar/scalar.py @@ -1,3 +1,5 @@ +from abc import abstractmethod + NULL_VALUE = "null" @@ -6,10 +8,23 @@ class ScalarInvalidTypeError(Exception): class Scalar: + def __init__(self, valid: bool = False, value: any = None): + self._valid = valid + self._value = None + self.set(value) + @property def is_valid(self) -> bool: return self._valid + def __str__(self) -> str: + return str(self._value) if self._valid else NULL_VALUE + @property + @abstractmethod def value(self): - raise NotImplementedError("Scalar value not implemented") + pass + + @abstractmethod + def set(self, value: any): + pass diff --git a/cloudquery/sdk/scalar/scalar_factory.py b/cloudquery/sdk/scalar/scalar_factory.py index 27f7f6a..4b390c1 100644 --- a/cloudquery/sdk/scalar/scalar_factory.py +++ b/cloudquery/sdk/scalar/scalar_factory.py @@ -1,15 +1,93 @@ import pyarrow as pa from .scalar import ScalarInvalidTypeError -from .int64 import Int64 +from .binary import Binary +from .bool import Bool +from .date32 import Date32 +from .date64 import Date64 +from .float import Float +from .int import Int +from .list import List +from .string import String +from .timestamp import Timestamp +from .uint import Uint +from .uuid import UUID +from cloudquery.sdk.types import UUIDType, JSONType class ScalarFactory: def __init__(self): pass - def new_scalar(self, dt): + def new_scalar(self, dt: pa.DataType): dt_id = dt.id if dt_id == pa.types.lib.Type_INT64: - return Int64() + return Int(bitwidth=64) + elif dt_id == pa.types.lib.Type_INT32: + return Int(bitwidth=32) + elif dt_id == pa.types.lib.Type_INT16: + return Int(bitwidth=16) + elif dt_id == pa.types.lib.Type_INT8: + return Int(bitwidth=8) + elif dt_id == pa.types.lib.Type_UINT64: + return Uint(bitwidth=64) + elif dt_id == pa.types.lib.Type_UINT32: + return Uint(bitwidth=32) + elif dt_id == pa.types.lib.Type_UINT16: + return Uint(bitwidth=16) + elif dt_id == pa.types.lib.Type_UINT8: + return Uint(bitwidth=8) + elif ( + dt_id == pa.types.lib.Type_BINARY + or dt_id == pa.types.lib.Type_LARGE_BINARY + or dt_id == pa.types.lib.Type_FIXED_SIZE_BINARY + ): + return Binary() + elif dt_id == pa.types.lib.Type_BOOL: + return Bool() + elif dt_id == pa.types.lib.Type_DATE64: + return Date64() + elif dt_id == pa.types.lib.Type_DATE32: + return Date32() + # elif dt_id == pa.types.lib.Type_DECIMAL256: + # return () + # elif dt_id == pa.types.lib.Type_DECIMAL128: + # return () + # elif dt_id == pa.types.lib.Type_DICTIONARY: + # return () + # elif dt_id == pa.types.lib.Type_DURATION: + # return () + elif dt_id == pa.types.lib.Type_DOUBLE: + return Float(bitwidth=64) + elif dt_id == pa.types.lib.Type_FLOAT: + return Float(bitwidth=32) + elif dt_id == pa.types.lib.Type_HALF_FLOAT: + return Float(bitwidth=16) + # elif dt_id == pa.types.lib.Type_INTERVAL_MONTH_DAY_NANO: + # return () + elif ( + dt_id == pa.types.lib.Type_LIST + or dt_id == pa.types.lib.Type_LARGE_LIST + or dt_id == pa.types.lib.Type_FIXED_SIZE_LIST + ): + item = ScalarFactory.new_scalar(dt.field(0).type) + return List(type(item)) + # elif dt_id == pa.types.lib.Type_MAP: + # return () + elif ( + dt_id == pa.types.lib.Type_STRING or dt_id == pa.types.lib.Type_LARGE_STRING + ): + return String() + # elif dt_id == pa.types.lib.Type_STRUCT: + # return () + # elif dt_id == pa.types.lib.Type_TIME32: + # return () + # elif dt_id == pa.types.lib.Type_TIME64: + # return () + elif dt_id == pa.types.lib.Type_TIMESTAMP: + return Timestamp() + elif dt == UUIDType: + return UUID() + elif dt == JSONType: + return String() else: raise ScalarInvalidTypeError("Invalid type {} for scalar".format(dt)) diff --git a/cloudquery/sdk/scalar/string.py b/cloudquery/sdk/scalar/string.py new file mode 100644 index 0000000..f9bbc26 --- /dev/null +++ b/cloudquery/sdk/scalar/string.py @@ -0,0 +1,32 @@ +from cloudquery.sdk.scalar import Scalar, ScalarInvalidTypeError +from .scalar import NULL_VALUE + + +class String(Scalar): + def __eq__(self, scalar: Scalar) -> bool: + if scalar is None: + return False + if type(scalar) == String: + return self._value == scalar._value and self._valid == scalar._valid + return False + + @property + def value(self): + return self._value + + def set(self, value: any): + if value is None: + return + + if isinstance(value, String): + self._valid = value._valid + self._value = value.value + return + + if type(value) == str: + self._valid = True + self._value = value + else: + raise ScalarInvalidTypeError( + "Invalid type {} for String scalar".format(type(value)) + ) diff --git a/cloudquery/sdk/scalar/timestamp.py b/cloudquery/sdk/scalar/timestamp.py new file mode 100644 index 0000000..8009f0f --- /dev/null +++ b/cloudquery/sdk/scalar/timestamp.py @@ -0,0 +1,41 @@ +from cloudquery.sdk.scalar import Scalar, ScalarInvalidTypeError, NULL_VALUE +from datetime import datetime, time +from typing import Any +import pandas as pd + + +class Timestamp(Scalar): + def __eq__(self, scalar: Scalar) -> bool: + if scalar is None: + return False + if type(scalar) == Timestamp: + return self._value == scalar._value and self._valid == scalar._valid + return False + + @property + def value(self): + return self._value + + def set(self, value: Any): + if value is None: + return + + if isinstance(value, Timestamp): + self._valid = value.is_valid + self._value = value.value + return + + if isinstance(value, pd.Timestamp): + self._value = value + elif type(value) == datetime: + self._value = pd.to_datetime(value) + elif type(value) == str: + self._value = pd.to_datetime(value) + elif type(value) == time: + self._value = pd.to_datetime(datetime.combine(datetime.today(), value)) + else: + raise ScalarInvalidTypeError( + "Invalid type {} for Timestamp scalar".format(type(value)) + ) + + self._valid = True diff --git a/cloudquery/sdk/scalar/uint.py b/cloudquery/sdk/scalar/uint.py new file mode 100644 index 0000000..51aeab9 --- /dev/null +++ b/cloudquery/sdk/scalar/uint.py @@ -0,0 +1,60 @@ +from cloudquery.sdk.scalar import Scalar, ScalarInvalidTypeError + + +class Uint(Scalar): + def __init__(self, valid: bool = False, value: any = None, bitwidth: int = 64): + self._bitwidth = bitwidth + self._max = 2**bitwidth + super().__init__(valid, value) + + def __eq__(self, scalar: Scalar) -> bool: + if scalar is None: + return False + if type(scalar) == Uint: + return ( + self._bitwidth == scalar.bitwidth + and self._value == scalar._value + and self._valid == scalar._valid + ) + return False + + @property + def bitwidth(self): + return self._bitwidth + + @property + def value(self): + return self._value + + def set(self, value: any): + if value is None: + self._valid = False + return + + if isinstance(value, Uint) and value.bitwidth == self._bitwidth: + self._valid = value.is_valid + self._value = value.value + return + + if type(value) == int: + val = value + elif type(value) == float: + val = int(value) + elif type(value) == str: + try: + val = int(value) + except ValueError as e: + raise ScalarInvalidTypeError( + "Invalid value for Int{} scalar".format(self._bitwidth) + ) from e + else: + raise ScalarInvalidTypeError( + "Invalid type {} for Int{} scalar".format(type(value), self._bitwidth) + ) + + if val < 0 or val >= self._max: + raise ScalarInvalidTypeError( + "Invalid Uint{} scalar".format(self._bitwidth, val) + ) + self._value = val + self._valid = True diff --git a/cloudquery/sdk/scalar/uuid.py b/cloudquery/sdk/scalar/uuid.py index c97e4da..39462a8 100644 --- a/cloudquery/sdk/scalar/uuid.py +++ b/cloudquery/sdk/scalar/uuid.py @@ -4,8 +4,7 @@ class UUID(Scalar): def __init__(self, valid: bool = False, value: uuid.UUID = None): - self._valid = valid - self._value = value + super().__init__(valid, value) def __eq__(self, scalar: Scalar) -> bool: if scalar is None: @@ -18,8 +17,14 @@ def __eq__(self, scalar: Scalar) -> bool: def value(self): return self._value - def set(self, value): + def set(self, value: any): if value is None: + self._valid = False + return + + if isinstance(value, UUID): + self._valid = value.is_valid + self._value = value.value return if type(value) == uuid.UUID: diff --git a/cloudquery/sdk/scalar/vector.py b/cloudquery/sdk/scalar/vector.py new file mode 100644 index 0000000..a57d1fe --- /dev/null +++ b/cloudquery/sdk/scalar/vector.py @@ -0,0 +1,43 @@ +from .scalar import Scalar +from typing import Type + + +class Vector: + def __init__(self, type: Type[Scalar] = None, *args): + self.data = [] + self.type = type + for arg in args: + self.append(arg) # Use the append method for type checking and appending + + def append(self, item): + if not isinstance(item, Scalar): + raise TypeError("Item is not of type Scalar or its subclass") + + if self.type is None: + self.type = type(item) + self.data.append(item) + elif isinstance(item, self.type): + self.data.append(item) + else: + raise TypeError(f"Item is not of type {self.type.__name__}") + + def __eq__(self, other): + if not isinstance(other, Vector): + return False + if len(self) != len(other): + return False + + for self_item, other_item in zip(self.data, other.data): + if self_item != other_item: + return False + + return True + + def __getitem__(self, index): + return self.data[index] + + def __len__(self): + return len(self.data) + + def __repr__(self): + return f"Vector of {self.type.__name__ if self.type else 'unknown type'}: {self.data}" diff --git a/cloudquery/sdk/types/__init__.py b/cloudquery/sdk/types/__init__.py new file mode 100644 index 0000000..e6ad650 --- /dev/null +++ b/cloudquery/sdk/types/__init__.py @@ -0,0 +1,2 @@ +from .json import JSONType +from .uuid import UUIDType diff --git a/cloudquery/sdk/types/json.py b/cloudquery/sdk/types/json.py new file mode 100644 index 0000000..a222e0b --- /dev/null +++ b/cloudquery/sdk/types/json.py @@ -0,0 +1,20 @@ +import pyarrow as pa + + +class JSONType(pa.PyExtensionType): + def __init__(self): + pa.PyExtensionType.__init__(self, pa.binary()) + + def __reduce__(self): + return JSONType, () + + def __arrow_ext_serialize__(self): + # since we don't have a parameterized type, we don't need extra + # metadata to be deserialized + return b"json-serialized" + + @classmethod + def __arrow_ext_deserialize__(self, storage_type, serialized): + # return an instance of this subclass given the serialized + # metadata. + return JSONType() diff --git a/cloudquery/sdk/types/uuid.py b/cloudquery/sdk/types/uuid.py index 940a04a..0041cc4 100644 --- a/cloudquery/sdk/types/uuid.py +++ b/cloudquery/sdk/types/uuid.py @@ -1,14 +1,12 @@ import pyarrow as pa -import pyarrow -import sys -class UuidType(pa.PyExtensionType): +class UUIDType(pa.PyExtensionType): def __init__(self): pa.PyExtensionType.__init__(self, pa.binary(16)) def __reduce__(self): - return UuidType, () + return UUIDType, () def __arrow_ext_serialize__(self): # since we don't have a parameterized type, we don't need extra @@ -19,4 +17,4 @@ def __arrow_ext_serialize__(self): def __arrow_ext_deserialize__(self, storage_type, serialized): # return an instance of this subclass given the serialized # metadata. - return UuidType() + return UUIDType() diff --git a/requirements.txt b/requirements.txt index 713b100..b4871e2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,21 @@ +cloudquery-plugin-pb==0.0.14 +exceptiongroup==1.1.2 black==23.7.0 grpcio==1.56.2 grpcio-tools==1.56.2 -cloudquery-plugin-pb==0.0.14 +iniconfig==2.0.0 +Jinja2==3.1.2 +MarkupSafe==2.1.3 +numpy==1.25.1 +packaging==23.1 +pandas==2.0.3 +pluggy==1.2.0 protobuf==4.23.4 pyarrow==12.0.1 pytest==7.4.0 -Jinja2~=3.1.2 -structlog==23.1.0 \ No newline at end of file +python-dateutil==2.8.2 +pytz==2023.3 +six==1.16.0 +structlog==23.1.0 +tomli==2.0.1 +tzdata==2023.3 diff --git a/tests/scalar/bool.py b/tests/scalar/bool.py index 47d2b6a..015ebbe 100644 --- a/tests/scalar/bool.py +++ b/tests/scalar/bool.py @@ -11,7 +11,7 @@ ("false", Bool(True, False)), ], ) -def test_binary_set(input_value, expected_scalar): +def test_bool_set(input_value, expected_scalar): b = Bool() b.set(input_value) assert b == expected_scalar diff --git a/tests/scalar/float32.py b/tests/scalar/float32.py new file mode 100644 index 0000000..fe99719 --- /dev/null +++ b/tests/scalar/float32.py @@ -0,0 +1,15 @@ +import pytest +from cloudquery.sdk.scalar import Float + + +@pytest.mark.parametrize( + "input_value,expected_scalar", + [ + (1, Float(True, float(1), 32)), + ("1", Float(True, float(1), 32)), + ], +) +def test_float32_set(input_value, expected_scalar): + b = Float(bitwidth=32) + b.set(input_value) + assert b == expected_scalar diff --git a/tests/scalar/float64.py b/tests/scalar/float64.py index 7eb2aef..4d0d011 100644 --- a/tests/scalar/float64.py +++ b/tests/scalar/float64.py @@ -1,15 +1,15 @@ import pytest -from cloudquery.sdk.scalar import Float64 +from cloudquery.sdk.scalar import Float @pytest.mark.parametrize( "input_value,expected_scalar", [ - (1, Float64(True, float(1))), - ("1", Float64(True, float(1))), + (1, Float(True, float(1), 64)), + ("1", Float(True, float(1), 64)), ], ) -def test_binary_set(input_value, expected_scalar): - b = Float64() +def test_float64_set(input_value, expected_scalar): + b = Float(bitwidth=64) b.set(input_value) assert b == expected_scalar diff --git a/tests/scalar/int64.py b/tests/scalar/int64.py index 47becf6..4417065 100644 --- a/tests/scalar/int64.py +++ b/tests/scalar/int64.py @@ -1,15 +1,15 @@ import pytest -from cloudquery.sdk.scalar import Int64 +from cloudquery.sdk.scalar import Int @pytest.mark.parametrize( "input_value,expected_scalar", [ - (1, Int64(True, float(1))), - ("1", Int64(True, float(1))), + (1, Int(True, 1, 64)), + ("1", Int(True, 1, 64)), ], ) -def test_binary_set(input_value, expected_scalar): - b = Int64() +def test_int_set(input_value, expected_scalar): + b = Int(bitwidth=64) b.set(input_value) assert b == expected_scalar diff --git a/tests/scalar/json.py b/tests/scalar/json.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/scalar/list.py b/tests/scalar/list.py new file mode 100644 index 0000000..5819537 --- /dev/null +++ b/tests/scalar/list.py @@ -0,0 +1,43 @@ +from cloudquery.sdk.scalar import String, Bool, List + + +def test_list(): + s = type(String()) + l = List(s) + assert l == List(s) + + l.set([String(True, "a string"), String(True, "another string"), String(False)]) + assert len(l) == 3 + + +def test_list_eq(): + s = type(String()) + l1 = List(s) + l1.set([String(True, "a string"), String(True, "another string"), String(False)]) + + l2 = List(s) + l2.set([String(True, "a string"), String(True, "another string"), String(False)]) + + assert l1 == l2 + + +def test_list_ineq(): + s = type(String()) + l1 = List(s) + l1.set([String(True, "a string")]) + + l2 = List(s) + l2.set([String(True, "another string")]) + + assert l1 != l2 + + +def test_list_eq_invalid(): + s = type(String()) + l1 = List(s) + l1.set([String(False)]) + + l2 = List(s) + l2.set([String(False)]) + + assert l1 == l2 diff --git a/tests/scalar/string.py b/tests/scalar/string.py new file mode 100644 index 0000000..471efed --- /dev/null +++ b/tests/scalar/string.py @@ -0,0 +1,18 @@ +import pytest +import uuid +from cloudquery.sdk.scalar import String + + +@pytest.mark.parametrize( + "input_value,expected_scalar", + [ + ( + "foo", + String(True, "foo"), + ), + ], +) +def test_string_set(input_value, expected_scalar): + b = String() + b.set(input_value) + assert b == expected_scalar diff --git a/tests/scalar/timestamp.py b/tests/scalar/timestamp.py new file mode 100644 index 0000000..2350b68 --- /dev/null +++ b/tests/scalar/timestamp.py @@ -0,0 +1,58 @@ +import pytest +from datetime import datetime +from cloudquery.sdk.scalar import Timestamp +import pandas as pd + + +@pytest.mark.parametrize( + "input_value,expected_scalar", + [ + ( + datetime.strptime("2006-01-02T15:04:05", "%Y-%m-%dT%H:%M:%S"), + Timestamp( + True, + pd.to_datetime("2006-01-02 15:04:05"), + ), + ), + ( + "2006-01-02T15:04:05Z", + Timestamp( + True, + pd.to_datetime("2006-01-02 15:04:05Z"), + ), + ), + ( + "2006-01-02T15:04:05Z07:00", + Timestamp( + True, + pd.to_datetime("2006-01-02T15:04:05Z07:00"), + ), + ), + ( + "2006-01-02 15:04:05.999999999 -0700", + # "2006-01-02 15:04:05.999999999 -0700 MST", + Timestamp( + True, + pd.to_datetime("2006-01-02 15:04:05.999999999 -0700"), + ), + ), + ( + "2006-01-02 15:04:05.999999999", + Timestamp( + True, + pd.to_datetime("2006-01-02 15:04:05.999999999"), + ), + ), + ( + "2006-01-02 15:04:05.999999999Z", + Timestamp( + True, + pd.to_datetime("2006-01-02 15:04:05.999999999Z"), + ), + ), + ], +) +def test_timestamp_set(input_value, expected_scalar): + b = Timestamp() + b.set(input_value) + assert b == expected_scalar, f"{b} and {expected_scalar} are not equal" diff --git a/tests/scalar/uint32.py b/tests/scalar/uint32.py new file mode 100644 index 0000000..855b4bf --- /dev/null +++ b/tests/scalar/uint32.py @@ -0,0 +1,15 @@ +import pytest +from cloudquery.sdk.scalar import Uint + + +@pytest.mark.parametrize( + "input_value,expected_scalar", + [ + (1, Uint(True, 1, 32)), + ("1", Uint(True, 1, 32)), + ], +) +def test_uint_set(input_value, expected_scalar): + b = Uint(bitwidth=32) + b.set(input_value) + assert b == expected_scalar diff --git a/tests/scalar/uuid.py b/tests/scalar/uuid.py index 7dadfaa..6df0d85 100644 --- a/tests/scalar/uuid.py +++ b/tests/scalar/uuid.py @@ -12,7 +12,7 @@ ), ], ) -def test_binary_set(input_value, expected_scalar): +def test_uuid_set(input_value, expected_scalar): b = UUID() b.set(input_value) assert b == expected_scalar diff --git a/tests/scalar/vector.py b/tests/scalar/vector.py new file mode 100644 index 0000000..0509c1c --- /dev/null +++ b/tests/scalar/vector.py @@ -0,0 +1,58 @@ +from cloudquery.sdk.scalar import String, Bool, Vector + + +def test_vector_append(): + s = type(String()) + v = Vector(s) + assert v == Vector(s) + + v.append(String(True, "a string")) + v.append(String(True, "another string")) + v.append(String(False)) + assert len(v) == 3 + + +def test_vector_invalid_type_append(): + s = type(String()) + v = Vector(s) + b = Bool(True, True) + try: + v.append(b) + assert False + except: + assert True + + +def test_vector_eq(): + s = type(String()) + v1 = Vector(s) + v1.append(String(True, "a string")) + + v2 = Vector(s) + v2.append(String(True, "a string")) + + assert v1 == v2 + + +def test_vector_ineq(): + s = type(String()) + v1 = Vector(s) + v1.append(String(True, "a string")) + + v2 = Vector(s) + v2.append(String(True, "another string")) + + assert v1 != v2 + + +def test_vector_ineq_order(): + s = type(String()) + v1 = Vector(s) + v1.append(String(True, "a")) + v1.append(String(True, "b")) + + v2 = Vector(s) + v2.append(String(True, "b")) + v2.append(String(True, "a")) + + assert v1 != v2 From 15923e0d4924defae5ca2023a4cf08132af775fd Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Sat, 29 Jul 2023 13:19:31 +0300 Subject: [PATCH 3/6] feat: Update openapi transformers --- cloudquery/sdk/schema/column.py | 17 ++++++++ cloudquery/sdk/transformers/__init__.py | 1 + cloudquery/sdk/transformers/openapi.py | 16 +++++-- tests/transformers/__init__.py | 0 tests/transformers/openapi.py | 57 +++++++++++++++++++++++++ 5 files changed, 88 insertions(+), 3 deletions(-) create mode 100644 tests/transformers/__init__.py create mode 100644 tests/transformers/openapi.py diff --git a/cloudquery/sdk/schema/column.py b/cloudquery/sdk/schema/column.py index 55362e7..a598166 100644 --- a/cloudquery/sdk/schema/column.py +++ b/cloudquery/sdk/schema/column.py @@ -23,6 +23,23 @@ def __init__( self.not_null = not_null self.incremental_key = incremental_key self.unique = unique + + def __str__(self) -> str: + return f"Column(name={self.name}, type={self.type}, description={self.description}, primary_key={self.primary_key}, not_null={self.not_null}, incremental_key={self.incremental_key}, unique={self.unique})" + + def __repr__(self) -> str: + return f"Column(name={self.name}, type={self.type}, description={self.description}, primary_key={self.primary_key}, not_null={self.not_null}, incremental_key={self.incremental_key}, unique={self.unique})" + + def __eq__(self, __value: object) -> bool: + if type(__value) == Column: + return (self.name == __value.name + and self.type == __value.type + and self.description == __value.description + and self.primary_key == __value.primary_key + and self.not_null == __value.not_null + and self.incremental_key == __value.incremental_key + and self.unique == __value.unique) + return False def to_arrow_field(self): metadata = { diff --git a/cloudquery/sdk/transformers/__init__.py b/cloudquery/sdk/transformers/__init__.py index b762412..ab50feb 100644 --- a/cloudquery/sdk/transformers/__init__.py +++ b/cloudquery/sdk/transformers/__init__.py @@ -1 +1,2 @@ from .transformers import transform_list_of_dict +from .openapi import oapi_definition_to_columns \ No newline at end of file diff --git a/cloudquery/sdk/transformers/openapi.py b/cloudquery/sdk/transformers/openapi.py index 831e1cd..23af197 100644 --- a/cloudquery/sdk/transformers/openapi.py +++ b/cloudquery/sdk/transformers/openapi.py @@ -1,5 +1,6 @@ from typing import Dict, List import pyarrow as pa +from cloudquery.sdk.types import JSONType from cloudquery.sdk.schema import Column @@ -9,16 +10,25 @@ def oapi_type_to_arrow_type(field) -> pa.DataType: return pa.string() elif oapi_type == "number": return pa.int64() + elif oapi_type == "integer": + return pa.int64() elif oapi_type == "boolean": return pa.bool_() + elif oapi_type == "array": + return JSONType() + elif oapi_type == "object": + return JSONType() + elif oapi_type is None and "$ref" in field: + return JSONType() else: return pa.string() -def oapi_properties_to_columns(properties: Dict) -> List[Column]: +def oapi_definition_to_columns(definition: Dict) -> List[Column]: columns = [] - for key, value in properties.items(): + for key, value in definition["properties"].items(): + column_type = oapi_type_to_arrow_type(value) columns.append( - Column(name=key, type=value, description=value.get("description")) + Column(name=key, type=column_type, description=value.get("description")) ) return columns diff --git a/tests/transformers/__init__.py b/tests/transformers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/transformers/openapi.py b/tests/transformers/openapi.py new file mode 100644 index 0000000..e7a2272 --- /dev/null +++ b/tests/transformers/openapi.py @@ -0,0 +1,57 @@ +import pyarrow as pa +from cloudquery.sdk.transformers import oapi_definition_to_columns +from cloudquery.sdk.schema import Column +from cloudquery.sdk.types import JSONType + +OAPI_SPEC = { + "swagger": "2.0", + "info": { + "version": "2.0", + "title": "Test API", + "description": "Unit tests APIs", + }, + "host": "cloudquery.io", + "schemes": ["https"], + "consumes": ["application/json"], + "produces": ["application/json"], + "paths": {}, + "definitions": { + "TestDefinition": { + "type": "object", + "properties": { + "string": { + "type": "string", + }, + "number": { + "type": "number", + }, + "integer": { + "type": "integer", + }, + "boolean": { + "type": "boolean", + }, + "object": { + "$ref": "#/definitions/SomeDefinition", + }, + "array": { + "type": "array", + "items": {"$ref": "#/definitions/SomeDefinition"}, + }, + }, + }, + }, +} + + +def test_oapi_properties_to_columns(): + expected_columns = [ + Column("string", pa.string() , description=None), + Column("number", pa.int64() , description=None), + Column("integer", pa.int64() , description=None), + Column("boolean", pa.bool_() , description=None), + Column("object", JSONType() , description=None), + Column("array", JSONType() , description=None), + ] + columns = oapi_definition_to_columns(OAPI_SPEC["definitions"]["TestDefinition"]) + assert expected_columns == columns From 5980d1ebfef7fdbe526622c14dd0ba0fddc62e04 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Sat, 29 Jul 2023 13:27:27 +0300 Subject: [PATCH 4/6] fmt --- cloudquery/sdk/schema/column.py | 22 ++++++++++++---------- cloudquery/sdk/transformers/__init__.py | 2 +- tests/transformers/openapi.py | 14 +++++++------- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/cloudquery/sdk/schema/column.py b/cloudquery/sdk/schema/column.py index a598166..9be427c 100644 --- a/cloudquery/sdk/schema/column.py +++ b/cloudquery/sdk/schema/column.py @@ -23,22 +23,24 @@ def __init__( self.not_null = not_null self.incremental_key = incremental_key self.unique = unique - + def __str__(self) -> str: return f"Column(name={self.name}, type={self.type}, description={self.description}, primary_key={self.primary_key}, not_null={self.not_null}, incremental_key={self.incremental_key}, unique={self.unique})" - + def __repr__(self) -> str: return f"Column(name={self.name}, type={self.type}, description={self.description}, primary_key={self.primary_key}, not_null={self.not_null}, incremental_key={self.incremental_key}, unique={self.unique})" - + def __eq__(self, __value: object) -> bool: if type(__value) == Column: - return (self.name == __value.name - and self.type == __value.type - and self.description == __value.description - and self.primary_key == __value.primary_key - and self.not_null == __value.not_null - and self.incremental_key == __value.incremental_key - and self.unique == __value.unique) + return ( + self.name == __value.name + and self.type == __value.type + and self.description == __value.description + and self.primary_key == __value.primary_key + and self.not_null == __value.not_null + and self.incremental_key == __value.incremental_key + and self.unique == __value.unique + ) return False def to_arrow_field(self): diff --git a/cloudquery/sdk/transformers/__init__.py b/cloudquery/sdk/transformers/__init__.py index ab50feb..cd00cc0 100644 --- a/cloudquery/sdk/transformers/__init__.py +++ b/cloudquery/sdk/transformers/__init__.py @@ -1,2 +1,2 @@ from .transformers import transform_list_of_dict -from .openapi import oapi_definition_to_columns \ No newline at end of file +from .openapi import oapi_definition_to_columns diff --git a/tests/transformers/openapi.py b/tests/transformers/openapi.py index e7a2272..521fa8a 100644 --- a/tests/transformers/openapi.py +++ b/tests/transformers/openapi.py @@ -36,7 +36,7 @@ }, "array": { "type": "array", - "items": {"$ref": "#/definitions/SomeDefinition"}, + "items": {"$ref": "#/definitions/SomeDefinition"}, }, }, }, @@ -46,12 +46,12 @@ def test_oapi_properties_to_columns(): expected_columns = [ - Column("string", pa.string() , description=None), - Column("number", pa.int64() , description=None), - Column("integer", pa.int64() , description=None), - Column("boolean", pa.bool_() , description=None), - Column("object", JSONType() , description=None), - Column("array", JSONType() , description=None), + Column("string", pa.string(), description=None), + Column("number", pa.int64(), description=None), + Column("integer", pa.int64(), description=None), + Column("boolean", pa.bool_(), description=None), + Column("object", JSONType(), description=None), + Column("array", JSONType(), description=None), ] columns = oapi_definition_to_columns(OAPI_SPEC["definitions"]["TestDefinition"]) assert expected_columns == columns From 4f3545b6a1418625f472c750b3ec24525810c3c1 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Sat, 29 Jul 2023 22:39:26 +0300 Subject: [PATCH 5/6] fix: Add write support for roundtrip test --- cloudquery/sdk/internal/memdb/memdb.py | 28 +++++++++---- .../sdk/internal/servers/plugin_v3/plugin.py | 42 +++++++++++++++++-- cloudquery/sdk/message/__init__.py | 6 +++ cloudquery/sdk/message/write.py | 14 +++++-- cloudquery/sdk/plugin/plugin.py | 5 ++- cloudquery/sdk/scheduler/scheduler.py | 13 +++++- cloudquery/sdk/schema/__init__.py | 2 +- cloudquery/sdk/schema/arrow.py | 16 +++---- cloudquery/sdk/schema/table.py | 32 +++++++++++++- tests/internal/memdb/memdb.py | 4 +- tests/serve/plugin.py | 34 ++++++++++++++- 11 files changed, 165 insertions(+), 31 deletions(-) diff --git a/cloudquery/sdk/internal/memdb/memdb.py b/cloudquery/sdk/internal/memdb/memdb.py index ba97f4a..78f3ac9 100644 --- a/cloudquery/sdk/internal/memdb/memdb.py +++ b/cloudquery/sdk/internal/memdb/memdb.py @@ -11,18 +11,30 @@ class MemDB(plugin.Plugin): def __init__(self) -> None: super().__init__(NAME, VERSION) - self._tables: List[schema.Table] = [ - schema.Table("test_table", [schema.Column("test_column", pa.int64())]) - ] - self._memory_db: Dict[str, pa.record] = { - "test_table": pa.record_batch([pa.array([1, 2, 3])], names=["test_column"]) - } + self._db: Dict[str, pa.RecordBatch] = {} + self._tables: Dict[str, schema.Table] = {} def get_tables(self, options: plugin.TableOptions = None) -> List[plugin.Table]: - return self._tables + tables = list(self._tables.values()) + return schema.filter_dfs(tables, options.tables, options.skip_tables) def sync( self, options: plugin.SyncOptions ) -> Generator[message.SyncMessage, None, None]: - for table, record in self._memory_db.items(): + for table, record in self._db.items(): yield message.SyncInsertMessage(record) + + def write(self, msg_iterator: Generator[message.WriteMessage, None, None]) -> None: + for msg in msg_iterator: + if type(msg) == message.WriteMigrateTableMessage: + if msg.table.name not in self._db: + self._db[msg.table.name] = msg.table + self._tables[msg.table.name] = msg.table + elif type(msg) == message.WriteInsertMessage: + table = schema.Table.from_arrow_schema(msg.record.schema) + self._db[table.name] = msg.record + else: + raise NotImplementedError(f"Unknown message type {type(msg)}") + + def close(self) -> None: + self._db = {} diff --git a/cloudquery/sdk/internal/servers/plugin_v3/plugin.py b/cloudquery/sdk/internal/servers/plugin_v3/plugin.py index 3399da8..77f7df5 100644 --- a/cloudquery/sdk/internal/servers/plugin_v3/plugin.py +++ b/cloudquery/sdk/internal/servers/plugin_v3/plugin.py @@ -1,10 +1,18 @@ import pyarrow as pa import structlog +from typing import Generator from cloudquery.plugin_v3 import plugin_pb2, plugin_pb2_grpc, arrow -from cloudquery.sdk.message import SyncInsertMessage, SyncMigrateTableMessage +from cloudquery.sdk.message import ( + SyncInsertMessage, + SyncMigrateTableMessage, + WriteInsertMessage, + WriteMigrateTableMessage, + WriteMessage, + WriteDeleteStale, +) from cloudquery.sdk.plugin.plugin import Plugin, SyncOptions, TableOptions -from cloudquery.sdk.schema import tables_to_arrow_schemas +from cloudquery.sdk.schema import tables_to_arrow_schemas, Table class PluginServicer(plugin_pb2_grpc.PluginServicer): @@ -64,8 +72,34 @@ def Sync(self, request, context): def Read(self, request, context): raise NotImplementedError() - def Write(self, request_iterator, context): - raise NotImplementedError() + def Write( + self, request_iterator: Generator[plugin_pb2.Write.Request, None, None], context + ): + def msg_iterator() -> Generator[WriteMessage, None, None]: + for msg in request_iterator: + field = msg.WhichOneof("message") + if field == "migrate_table": + sc = arrow.new_schema_from_bytes(msg.migrate_table.table) + table = Table.from_arrow_schema(sc) + yield WriteMigrateTableMessage(table=table) + elif field == "insert": + yield WriteInsertMessage( + record=arrow.new_record_from_bytes(msg.insert.record) + ) + elif field == "delete": + yield WriteDeleteStale( + table_name=msg.delete.table_name, + source_name=msg.delete.source_name, + sync_time=msg.delete.sync_time.ToDatetime(), + ) + elif field is None: + continue + else: + raise NotImplementedError(f"unknown write message type {field}") + + self._plugin.write(msg_iterator()) + return plugin_pb2.Write.Response() def Close(self, request, context): + self._plugin.close() return plugin_pb2.Close.Response() diff --git a/cloudquery/sdk/message/__init__.py b/cloudquery/sdk/message/__init__.py index b170660..5ddb77a 100644 --- a/cloudquery/sdk/message/__init__.py +++ b/cloudquery/sdk/message/__init__.py @@ -1 +1,7 @@ from .sync import SyncMessage, SyncInsertMessage, SyncMigrateTableMessage +from .write import ( + WriteMessage, + WriteInsertMessage, + WriteMigrateTableMessage, + WriteDeleteStale, +) diff --git a/cloudquery/sdk/message/write.py b/cloudquery/sdk/message/write.py index c9ea978..69ef9cc 100644 --- a/cloudquery/sdk/message/write.py +++ b/cloudquery/sdk/message/write.py @@ -1,15 +1,23 @@ import pyarrow as pa +from cloudquery.sdk.schema import Table class WriteMessage: pass -class InsertMessage(WriteMessage): +class WriteInsertMessage(WriteMessage): def __init__(self, record: pa.RecordBatch): self.record = record -class MigrateMessage(WriteMessage): - def __init__(self, table: pa.Schema): +class WriteMigrateTableMessage(WriteMessage): + def __init__(self, table: Table): self.table = table + + +class WriteDeleteStale(WriteMessage): + def __init__(self, table_name: str, source_name: str, sync_time): + self.table_name = table_name + self.source_name = source_name + self.sync_time = sync_time diff --git a/cloudquery/sdk/plugin/plugin.py b/cloudquery/sdk/plugin/plugin.py index dad741b..4765cf2 100644 --- a/cloudquery/sdk/plugin/plugin.py +++ b/cloudquery/sdk/plugin/plugin.py @@ -50,5 +50,8 @@ def get_tables(self, options: TableOptions) -> List[Table]: def sync(self, options: SyncOptions) -> Generator[message.SyncMessage, None, None]: raise NotImplementedError() + def write(self, writer: Generator[message.WriteMessage, None, None]) -> None: + raise NotImplementedError() + def close(self) -> None: - pass + raise NotImplementedError() diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index a2cb270..fa94506 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -103,7 +103,18 @@ def resolve_table( ) total_resources = 0 for item in resolver.resolve(client, parent_item): - resource = self.resolve_resource(resolver, client, parent_item, item) + try: + resource = self.resolve_resource( + resolver, client, parent_item, item + ) + except Exception as e: + self._logger.error( + "failed to resolve resource", + table=resolver.table.name, + depth=depth, + exception=e, + ) + continue res.put(SyncInsertMessage(resource.to_arrow_record())) for child_resolvers in resolver.child_resolvers: self._pools[depth + 1].submit( diff --git a/cloudquery/sdk/schema/__init__.py b/cloudquery/sdk/schema/__init__.py index 3e3ecc7..649a1fd 100644 --- a/cloudquery/sdk/schema/__init__.py +++ b/cloudquery/sdk/schema/__init__.py @@ -1,5 +1,5 @@ from .column import Column -from .table import Table, tables_to_arrow_schemas +from .table import Table, tables_to_arrow_schemas, filter_dfs from .resource import Resource # from .table_resolver import TableReso diff --git a/cloudquery/sdk/schema/arrow.py b/cloudquery/sdk/schema/arrow.py index 3800326..2f21bc1 100644 --- a/cloudquery/sdk/schema/arrow.py +++ b/cloudquery/sdk/schema/arrow.py @@ -1,9 +1,9 @@ -METADATA_UNIQUE = "cq:extension:unique" -METADATA_PRIMARY_KEY = "cq:extension:primary_key" -METADATA_CONSTRAINT_NAME = "cq:extension:constraint_name" -METADATA_INCREMENTAL = "cq:extension:incremental" +METADATA_UNIQUE = b"cq:extension:unique" +METADATA_PRIMARY_KEY = b"cq:extension:primary_key" +METADATA_CONSTRAINT_NAME = b"cq:extension:constraint_name" +METADATA_INCREMENTAL = b"cq:extension:incremental" -METADATA_TRUE = "true" -METADATA_FALSE = "false" -METADATA_TABLE_NAME = "cq:table_name" -METADATA_TABLE_DESCRIPTION = "cq:table_description" +METADATA_TRUE = b"true" +METADATA_FALSE = b"false" +METADATA_TABLE_NAME = b"cq:table_name" +METADATA_TABLE_DESCRIPTION = b"cq:table_description" diff --git a/cloudquery/sdk/schema/table.py b/cloudquery/sdk/schema/table.py index 5cee64e..785614f 100644 --- a/cloudquery/sdk/schema/table.py +++ b/cloudquery/sdk/schema/table.py @@ -1,7 +1,7 @@ from __future__ import annotations from typing import List, Generator, Any - +import fnmatch import pyarrow as pa from cloudquery.sdk.schema import arrow @@ -53,6 +53,19 @@ def primary_keys(self): def incremental_keys(self): return [column.name for column in self.columns if column.incremental_key] + @classmethod + def from_arrow_schema(cls, schema: pa.Schema) -> Table: + columns = [] + for field in schema: + columns.append(Column.from_arrow_field(field)) + return cls( + name=schema.metadata[arrow.METADATA_TABLE_NAME].decode("utf-8"), + columns=columns, + description=schema.metadata.get(arrow.METADATA_TABLE_DESCRIPTION).decode( + "utf-8" + ), + ) + def to_arrow_schema(self): fields = [] md = { @@ -74,3 +87,20 @@ def tables_to_arrow_schemas(tables: List[Table]): for table in tables: schemas.append(table.to_arrow_schema()) return schemas + + +def filter_dfs( + tables: List[Table], include_tables: List[str], skip_tables: List[str] +) -> List[Table]: + filtered: List[Table] = [] + for table in tables: + matched = False + for include_table in include_tables: + if fnmatch.fnmatch(table.name, include_table): + matched = True + for skip_table in skip_tables: + if fnmatch.fnmatch(table.name, skip_table): + matched = False + if matched: + filtered.append(table) + return filtered diff --git a/tests/internal/memdb/memdb.py b/tests/internal/memdb/memdb.py index 3b0afca..546d109 100644 --- a/tests/internal/memdb/memdb.py +++ b/tests/internal/memdb/memdb.py @@ -6,6 +6,6 @@ def test_memdb(): p = memdb.MemDB() p.init(None) msgs = [] - for msg in p.sync(SyncOptions()): + for msg in p.sync(SyncOptions(tables=["*"])): msgs.append(msg) - assert len(msgs) == 1 + assert len(msgs) == 0 diff --git a/tests/serve/plugin.py b/tests/serve/plugin.py index 24cbbb2..1619e53 100644 --- a/tests/serve/plugin.py +++ b/tests/serve/plugin.py @@ -1,12 +1,22 @@ import random import grpc import time +import pyarrow as pa from concurrent import futures +from cloudquery.sdk.schema import Table, Column from cloudquery.sdk import serve from cloudquery.sdk import message from cloudquery.plugin_v3 import plugin_pb2_grpc, plugin_pb2, arrow from cloudquery.sdk.internal.memdb import MemDB +test_table = Table( + "test", + [ + Column("id", pa.int64()), + Column("name", pa.string()), + ], +) + def test_plugin_serve(): p = MemDB() @@ -27,11 +37,31 @@ def test_plugin_serve(): response = stub.Init(plugin_pb2.Init.Request(spec=b"")) assert response is not None - response = stub.GetTables(plugin_pb2.GetTables.Request()) + def writer_iterator(): + buf = arrow.schema_to_bytes(test_table.to_arrow_schema()) + yield plugin_pb2.Write.Request( + migrate_table=plugin_pb2.Write.MessageMigrateTable(table=buf) + ) + record = pa.RecordBatch.from_arrays( + [ + pa.array([1, 2, 3]), + pa.array(["a", "b", "c"]), + ], + schema=test_table.to_arrow_schema(), + ) + yield plugin_pb2.Write.Request( + insert=plugin_pb2.Write.MessageInsert( + record=arrow.record_to_bytes(record) + ) + ) + + stub.Write(writer_iterator()) + + response = stub.GetTables(plugin_pb2.GetTables.Request(tables=["*"])) schemas = arrow.new_schemas_from_bytes(response.tables) assert len(schemas) == 1 - response = stub.Sync(plugin_pb2.Sync.Request()) + response = stub.Sync(plugin_pb2.Sync.Request(tables=["*"])) total_records = 0 for msg in response: if msg.insert is not None: From dc09a6b30d9bf53bb617f07cd3c49879c6e6ad2c Mon Sep 17 00:00:00 2001 From: CloudQuery Bot <102256036+cq-bot@users.noreply.github.com> Date: Sat, 29 Jul 2023 22:40:44 +0300 Subject: [PATCH 6/6] chore(main): Release v0.0.7 (#9) :robot: I have created a release *beep* *boop* --- ## [0.0.7](https://github.com/cloudquery/plugin-sdk-python/compare/v0.0.6...v0.0.7) (2023-07-29) ### Features * Scalar things ([#5](https://github.com/cloudquery/plugin-sdk-python/issues/5)) ([3148c6c](https://github.com/cloudquery/plugin-sdk-python/commit/3148c6ce70c19cda24e357e2b274c0e4130b05eb)) * Update openapi transformers ([15923e0](https://github.com/cloudquery/plugin-sdk-python/commit/15923e0d4924defae5ca2023a4cf08132af775fd)) ### Bug Fixes * Add write support for roundtrip test ([4f3545b](https://github.com/cloudquery/plugin-sdk-python/commit/4f3545b6a1418625f472c750b3ec24525810c3c1)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please). --- CHANGELOG.md | 13 +++++++++++++ setup.py | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 36c33b2..4b88d16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ # Changelog +## [0.0.7](https://github.com/cloudquery/plugin-sdk-python/compare/v0.0.6...v0.0.7) (2023-07-29) + + +### Features + +* Scalar things ([#5](https://github.com/cloudquery/plugin-sdk-python/issues/5)) ([3148c6c](https://github.com/cloudquery/plugin-sdk-python/commit/3148c6ce70c19cda24e357e2b274c0e4130b05eb)) +* Update openapi transformers ([15923e0](https://github.com/cloudquery/plugin-sdk-python/commit/15923e0d4924defae5ca2023a4cf08132af775fd)) + + +### Bug Fixes + +* Add write support for roundtrip test ([4f3545b](https://github.com/cloudquery/plugin-sdk-python/commit/4f3545b6a1418625f472c750b3ec24525810c3c1)) + ## [0.0.6](https://github.com/cloudquery/plugin-sdk-python/compare/v0.0.5...v0.0.6) (2023-07-28) diff --git a/setup.py b/setup.py index d507eec..6f1847c 100644 --- a/setup.py +++ b/setup.py @@ -36,7 +36,7 @@ ] setuptools.setup( name=name, - version="0.0.6", + version="0.0.7", description=description, long_description=long_description, author="CloudQuery LTD",