diff --git a/CHANGELOG.md b/CHANGELOG.md index 088fd56..ec8db45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,26 @@ # Changelog +## [0.0.9](https://github.com/cloudquery/plugin-sdk-python/compare/v0.0.8...v0.0.9) (2023-08-02) + + +### Features + +* Add override_columns to openapi transformer ([#22](https://github.com/cloudquery/plugin-sdk-python/issues/22)) ([a53bb0e](https://github.com/cloudquery/plugin-sdk-python/commit/a53bb0e0e1eda99705fc96c0c7d0ac79e814a8d7)) +* Wire logging with cli flags ([#26](https://github.com/cloudquery/plugin-sdk-python/issues/26)) ([106781b](https://github.com/cloudquery/plugin-sdk-python/commit/106781b0ddfa5d5be77890fc0f9b3fe1cee10848)) + + +### Bug Fixes + +* Add better logging for scheduler ([#24](https://github.com/cloudquery/plugin-sdk-python/issues/24)) ([505f94b](https://github.com/cloudquery/plugin-sdk-python/commit/505f94bf67f9835cc3463df854943c6f00c2d281)) +* Add more command-line args, use standard logging ([#29](https://github.com/cloudquery/plugin-sdk-python/issues/29)) ([5d52af9](https://github.com/cloudquery/plugin-sdk-python/commit/5d52af99cb98906faee1485ea10edf05c09155e5)) +* Emit migrate messages for child relations ([#21](https://github.com/cloudquery/plugin-sdk-python/issues/21)) ([536e163](https://github.com/cloudquery/plugin-sdk-python/commit/536e16303a0ac18f37a468d9e2b97b4438bc596e)) +* Fix column resolver resource set ([#23](https://github.com/cloudquery/plugin-sdk-python/issues/23)) ([9936ced](https://github.com/cloudquery/plugin-sdk-python/commit/9936ced487ba387c21385c372f163557bd46ba69)) +* Fix exception logging ([#18](https://github.com/cloudquery/plugin-sdk-python/issues/18)) ([2a5996b](https://github.com/cloudquery/plugin-sdk-python/commit/2a5996b553e9cbb4b1e8dc95d22872a866d32c8b)) +* Fix extension type definitions ([#20](https://github.com/cloudquery/plugin-sdk-python/issues/20)) ([146c549](https://github.com/cloudquery/plugin-sdk-python/commit/146c5498cb1ecf54a55bb5a60ce2cb0a0228c2ed)) +* Fix JSON type handling ([#19](https://github.com/cloudquery/plugin-sdk-python/issues/19)) ([c0cdf55](https://github.com/cloudquery/plugin-sdk-python/commit/c0cdf55a49ebbb1c8ed51022d4a5910f51378e74)) +* Fix race in scheduler ([#25](https://github.com/cloudquery/plugin-sdk-python/issues/25)) ([17fee27](https://github.com/cloudquery/plugin-sdk-python/commit/17fee278f05449584c6a781f6560c7f5faf431c6)) +* Log error on table resolver exception/error ([#16](https://github.com/cloudquery/plugin-sdk-python/issues/16)) ([a1b07e8](https://github.com/cloudquery/plugin-sdk-python/commit/a1b07e8624a335c7ffc37f58913ec103305fd46a)) + ## [0.0.8](https://github.com/cloudquery/plugin-sdk-python/compare/v0.0.7...v0.0.8) (2023-08-01) diff --git a/cloudquery/sdk/plugin/plugin.py b/cloudquery/sdk/plugin/plugin.py index 4765cf2..7fc76f6 100644 --- a/cloudquery/sdk/plugin/plugin.py +++ b/cloudquery/sdk/plugin/plugin.py @@ -38,6 +38,9 @@ def __init__(self, name: str, version: str) -> None: def init(self, spec: bytes) -> None: pass + def set_logger(self, logger) -> None: + pass + def name(self) -> str: return self._name diff --git a/cloudquery/sdk/scalar/scalar_factory.py b/cloudquery/sdk/scalar/scalar_factory.py index 6eabaca..8bbf1d4 100644 --- a/cloudquery/sdk/scalar/scalar_factory.py +++ b/cloudquery/sdk/scalar/scalar_factory.py @@ -7,6 +7,7 @@ from .date64 import Date64 from .float import Float from .int import Int +from .json import JSON from .list import List from .scalar import ScalarInvalidTypeError from .string import String @@ -89,6 +90,6 @@ def new_scalar(self, dt: pa.DataType): elif dt == UUIDType(): return UUID() elif dt == JSONType(): - return String() + return JSON() else: raise ScalarInvalidTypeError("Invalid type {} for scalar".format(dt)) diff --git a/cloudquery/sdk/scalar/string.py b/cloudquery/sdk/scalar/string.py index f9bbc26..9fd3a79 100644 --- a/cloudquery/sdk/scalar/string.py +++ b/cloudquery/sdk/scalar/string.py @@ -1,5 +1,4 @@ from cloudquery.sdk.scalar import Scalar, ScalarInvalidTypeError -from .scalar import NULL_VALUE class String(Scalar): diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index ae9b7ba..8f8d111 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -10,7 +10,7 @@ SyncMigrateTableMessage, ) from cloudquery.sdk.schema import Resource -from .table_resolver import TableResolver +from .table_resolver import TableResolver, Client QUEUE_PER_WORKER = 100 @@ -22,12 +22,8 @@ def __init__(self, maxsize, *args, **kwargs): class TableResolverStarted: - def __init__(self, count=1) -> None: - self._count = count - - @property - def count(self): - return self._count + def __init__(self) -> None: + pass class TableResolverFinished: @@ -43,6 +39,8 @@ def __init__( self._max_depth = max_depth if logger is None: self._logger = structlog.get_logger() + else: + self._logger = logger if concurrency <= 0: raise ValueError("concurrency must be greater than 0") if max_depth <= 0: @@ -72,7 +70,7 @@ def shutdown(self): pool.shutdown() def resolve_resource( - self, resolver: TableResolver, client, parent: Resource, item: Any + self, resolver: TableResolver, client: Client, parent: Resource, item: Any ) -> Resource: resource = Resource(resolver.table, parent, item) resolver.pre_resource_resolve(client, resource) @@ -85,19 +83,24 @@ def resolve_table( self, resolver: TableResolver, depth: int, - client, + client: Client, parent_item: Resource, res: queue.Queue, ): - table_resolvers_started = 0 try: if depth == 0: self._logger.info( - "table resolver started", table=resolver.table.name, depth=depth + "table resolver started", + client_id=client.id(), + table=resolver.table.name, + depth=depth, ) else: self._logger.debug( - "table resolver started", table=resolver.table.name, depth=depth + "table resolver started", + client_id=client.id(), + table=resolver.table.name, + depth=depth, ) total_resources = 0 for item in resolver.resolve(client, parent_item): @@ -108,13 +111,15 @@ def resolve_table( except Exception as e: self._logger.error( "failed to resolve resource", + client_id=client.id(), table=resolver.table.name, depth=depth, - exception=e, + exc_info=True, ) continue res.put(SyncInsertMessage(resource.to_arrow_record())) for child_resolvers in resolver.child_resolvers: + res.put(TableResolverStarted()) self._pools[depth + 1].submit( self.resolve_table, child_resolvers, @@ -123,29 +128,33 @@ def resolve_table( resource, res, ) - table_resolvers_started += 1 total_resources += 1 if depth == 0: self._logger.info( "table resolver finished successfully", + client_id=client.id(), table=resolver.table.name, + resources=total_resources, depth=depth, ) else: self._logger.debug( "table resolver finished successfully", + client_id=client.id(), table=resolver.table.name, + resources=total_resources, depth=depth, ) except Exception as e: self._logger.error( "table resolver finished with error", + client_id=client.id(), table=resolver.table.name, + resources=total_resources, depth=depth, - exception=e, + exc_info=True, ) finally: - res.put(TableResolverStarted(count=table_resolvers_started)) res.put(TableResolverFinished()) def _sync( @@ -155,24 +164,20 @@ def _sync( res: queue.Queue, deterministic_cq_id=False, ): - total_table_resolvers = 0 - try: - for resolver in resolvers: - clients = resolver.multiplex(client) - for client in clients: - self._pools[0].submit( - self.resolve_table, resolver, 0, client, None, res - ) - total_table_resolvers += 1 - finally: - res.put(TableResolverStarted(total_table_resolvers)) + for resolver in resolvers: + clients = resolver.multiplex(client) + for client in clients: + res.put(TableResolverStarted()) + self._pools[0].submit( + self.resolve_table, resolver, 0, client, None, res + ) def sync( self, client, resolvers: List[TableResolver], deterministic_cq_id=False ) -> Generator[SyncMessage, None, None]: res = queue.Queue() - for resolver in resolvers: - yield SyncMigrateTableMessage(table=resolver.table.to_arrow_schema()) + yield from self._send_migrate_table_messages(resolvers) + thread = futures.ThreadPoolExecutor() thread.submit(self._sync, client, resolvers, res, deterministic_cq_id) total_table_resolvers = 0 @@ -180,7 +185,7 @@ def sync( while True: message = res.get() if type(message) == TableResolverStarted: - total_table_resolvers += message.count + total_table_resolvers += 1 if total_table_resolvers == finished_table_resolvers: break continue @@ -191,3 +196,11 @@ def sync( continue yield message thread.shutdown(wait=True) + + def _send_migrate_table_messages( + self, resolvers: List[TableResolver] + ) -> Generator[SyncMessage, None, None]: + for resolver in resolvers: + yield SyncMigrateTableMessage(table=resolver.table.to_arrow_schema()) + if resolver.child_resolvers: + yield from self._send_migrate_table_messages(resolver.child_resolvers) diff --git a/cloudquery/sdk/scheduler/table_resolver.py b/cloudquery/sdk/scheduler/table_resolver.py index 84cdc25..d1ffd77 100644 --- a/cloudquery/sdk/scheduler/table_resolver.py +++ b/cloudquery/sdk/scheduler/table_resolver.py @@ -1,6 +1,15 @@ from cloudquery.sdk.schema.table import Table from cloudquery.sdk.schema import Resource -from typing import Any, Generator +from typing import Any, Generator, List + + +class Client: + def id(self) -> str: + raise NotImplementedError() + + +from cloudquery.sdk.schema import Resource +from cloudquery.sdk.schema.table import Table class TableResolver: @@ -16,22 +25,22 @@ def table(self) -> Table: def child_resolvers(self): return self._child_resolvers - def multiplex(self, client): + def multiplex(self, client: Client) -> List[Client]: return [client] def resolve(self, client, parent_resource) -> Generator[Any, None, None]: raise NotImplementedError() - def pre_resource_resolve(self, client, resource): + def pre_resource_resolve(self, client: Client, resource): return - def resolve_column(self, client, resource: Resource, column_name: str): + def resolve_column(self, client: Client, resource: Resource, column_name: str): if type(resource.item) is dict: if column_name in resource.item: resource.set(column_name, resource.item[column_name]) else: if hasattr(resource.item, column_name): - resource.set(column_name, resource.item[column_name]) + resource.set(column_name, getattr(resource.item, column_name)) - def post_resource_resolve(self, client, resource): + def post_resource_resolve(self, client: Client, resource): return diff --git a/cloudquery/sdk/serve/plugin.py b/cloudquery/sdk/serve/plugin.py index f51503d..74082f7 100644 --- a/cloudquery/sdk/serve/plugin.py +++ b/cloudquery/sdk/serve/plugin.py @@ -1,22 +1,79 @@ import argparse -import structlog +import logging +import os from concurrent import futures import grpc +import structlog import sys - from cloudquery.discovery_v1 import discovery_pb2_grpc from cloudquery.plugin_v3 import plugin_pb2_grpc +from structlog import wrap_logger + from cloudquery.sdk.docs.generator import Generator from cloudquery.sdk.internal.servers.discovery_v1.discovery import DiscoveryServicer from cloudquery.sdk.internal.servers.plugin_v3 import PluginServicer -from cloudquery.sdk.plugin.plugin import Plugin +from cloudquery.sdk.plugin.plugin import Plugin, TableOptions DOC_FORMATS = ["json", "markdown"] +_IS_WINDOWS = sys.platform == "win32" + +try: + import colorama +except ImportError: + colorama = None + +if _IS_WINDOWS: # pragma: no cover + # On Windows, use colors by default only if Colorama is installed. + _has_colors = colorama is not None +else: + # On other OSes, use colors by default. + _has_colors = True + def get_logger(args): - log = structlog.get_logger() + log_level_map = { + "debug": logging.DEBUG, + "info": logging.INFO, + "warning": logging.WARNING, + "error": logging.ERROR, + "critical": logging.CRITICAL, + } + + logging.basicConfig( + format="%(message)s", + stream=sys.stdout, + level=log_level_map.get(args.log_level.lower(), logging.INFO), + ) + + processors = [ + structlog.contextvars.merge_contextvars, + structlog.processors.add_log_level, + structlog.processors.StackInfoRenderer(), + structlog.dev.set_exc_info, + structlog.stdlib.filter_by_level, + structlog.processors.TimeStamper(fmt="%Y-%m-%dT%H:%M:%SZ", utc=True), + ] + if args.log_format == "text": + processors.append( + structlog.dev.ConsoleRenderer( + colors=os.environ.get("NO_COLOR", "") == "" + and ( + os.environ.get("FORCE_COLOR", "") != "" + or ( + _has_colors + and sys.stdout is not None + and hasattr(sys.stdout, "isatty") + and sys.stdout.isatty() + ) + ) + ) + ) + else: + processors.append(structlog.processors.JSONRenderer()) + + log = wrap_logger(logging.getLogger(), processors=processors) return log @@ -29,6 +86,13 @@ def run(self, args): subparsers = parser.add_subparsers(dest="command", required=True) serve_parser = subparsers.add_parser("serve", help="Start plugin server") + serve_parser.add_argument( + "--log-format", + type=str, + default="text", + choices=["text", "json"], + help="logging format", + ) serve_parser.add_argument( "--log-level", type=str, @@ -36,9 +100,28 @@ def run(self, args): choices=["trace", "debug", "info", "warn", "error"], help="log level", ) + + # ignored for now + serve_parser.add_argument( + "--no-sentry", + action="store_true", + help="disable sentry (placeholder for future use)", + ) + # ignored for now + serve_parser.add_argument( + "--otel-endpoint", + type=str, + default="", + help="Open Telemetry HTTP collector endpoint (placeholder for future use)", + ) + # ignored for now serve_parser.add_argument( - "--log-format", type=str, default="text", choices=["text", "json"] + "--otel-endpoint-insecure", + type=str, + default="", + help="Open Telemetry HTTP collector endpoint (for development only) (placeholder for future use)", ) + serve_parser.add_argument( "--address", type=str, @@ -87,6 +170,7 @@ def run(self, args): def _serve(self, args): logger = get_logger(args) + self._plugin.set_logger(logger) self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) discovery_pb2_grpc.add_DiscoveryServicer_to_server( DiscoveryServicer([3]), self._server @@ -95,7 +179,7 @@ def _serve(self, args): PluginServicer(self._plugin, logger), self._server ) self._server.add_insecure_port(args.address) - print("Starting server. Listening on " + args.address) + logger.info("Starting server", address=args.address) self._server.start() self._server.wait_for_termination() @@ -103,8 +187,16 @@ def stop(self): self._server.stop(5) def _generate_docs(self, args): - print("Generating docs in format: " + args.format) + logger = get_logger(args) + logger.info("Generating docs", format=args.format) generator = Generator( - self._plugin.name(), self._plugin.get_tables(tables=["*"], skip_tables=[]) + self._plugin.name(), + self._plugin.get_tables( + options=TableOptions( + tables=["*"], + skip_tables=[], + skip_dependent_tables=False, + ) + ), ) generator.generate(args.directory, args.format) diff --git a/cloudquery/sdk/transformers/openapi.py b/cloudquery/sdk/transformers/openapi.py index 23af197..300acce 100644 --- a/cloudquery/sdk/transformers/openapi.py +++ b/cloudquery/sdk/transformers/openapi.py @@ -1,4 +1,4 @@ -from typing import Dict, List +from typing import Dict, List, Optional import pyarrow as pa from cloudquery.sdk.types import JSONType from cloudquery.sdk.schema import Column @@ -24,11 +24,24 @@ def oapi_type_to_arrow_type(field) -> pa.DataType: return pa.string() -def oapi_definition_to_columns(definition: Dict) -> List[Column]: +def get_column_by_name(columns: List[Column], name: str) -> Optional[Column]: + for column in columns: + if column.name == name: + return column + return None + + +def oapi_definition_to_columns(definition: Dict, override_columns=[]) -> List[Column]: columns = [] for key, value in definition["properties"].items(): column_type = oapi_type_to_arrow_type(value) - columns.append( - Column(name=key, type=column_type, description=value.get("description")) + column = Column( + name=key, type=column_type, description=value.get("description") ) + override_column = get_column_by_name(override_columns, key) + if override_column is not None: + column.type = override_column.type + column.primary_key = override_column.primary_key + column.unique = override_column.unique + columns.append(column) return columns diff --git a/cloudquery/sdk/types/json.py b/cloudquery/sdk/types/json.py index a222e0b..28bf3bb 100644 --- a/cloudquery/sdk/types/json.py +++ b/cloudquery/sdk/types/json.py @@ -1,9 +1,9 @@ import pyarrow as pa -class JSONType(pa.PyExtensionType): +class JSONType(pa.ExtensionType): def __init__(self): - pa.PyExtensionType.__init__(self, pa.binary()) + pa.ExtensionType.__init__(self, extension_name="json", storage_type=pa.binary()) def __reduce__(self): return JSONType, () diff --git a/cloudquery/sdk/types/uuid.py b/cloudquery/sdk/types/uuid.py index 0041cc4..a9ef571 100644 --- a/cloudquery/sdk/types/uuid.py +++ b/cloudquery/sdk/types/uuid.py @@ -1,9 +1,11 @@ import pyarrow as pa -class UUIDType(pa.PyExtensionType): +class UUIDType(pa.ExtensionType): def __init__(self): - pa.PyExtensionType.__init__(self, pa.binary(16)) + pa.ExtensionType.__init__( + self, extension_name="uuid", storage_type=pa.binary(16) + ) def __reduce__(self): return UUIDType, () diff --git a/setup.py b/setup.py index 40c514a..65f9a80 100644 --- a/setup.py +++ b/setup.py @@ -53,7 +53,7 @@ ] setuptools.setup( name=name, - version="0.0.8", + version="0.0.9", description=description, long_description=long_description, author="CloudQuery LTD", diff --git a/tests/scheduler/scheduler.py b/tests/scheduler/scheduler.py index a60321c..3e5458a 100644 --- a/tests/scheduler/scheduler.py +++ b/tests/scheduler/scheduler.py @@ -1,9 +1,10 @@ from typing import Any, List, Generator + import pyarrow as pa -import pytest -from cloudquery.sdk.scheduler import Scheduler, TableResolver -from cloudquery.sdk.schema import Table, Column, Resource + from cloudquery.sdk.message import SyncMessage +from cloudquery.sdk.scheduler import Scheduler, TableResolver +from cloudquery.sdk.schema import Column from cloudquery.sdk.schema.table import Table @@ -36,7 +37,8 @@ def resolve(self, client, parent_resource) -> Generator[Any, None, None]: class TestClient: - pass + def id(self): + return "test_client" def test_scheduler(): @@ -46,10 +48,12 @@ def test_scheduler(): expected_record1 = pa.record_batch([[1]], schema=table1.to_arrow_schema()) table2 = Table("test_child_table", [Column("test_child_column", pa.int64())]) expected_record2 = pa.record_batch([[2]], schema=table2.to_arrow_schema()) - resources: List[SyncMessage] = [] - for resource in s.sync(client, [SchedulerTestTableResolver()]): - resources.append(resource) - assert len(resources) == 3 - assert resources[1].record == expected_record1 - assert resources[2].record == expected_record2 + messages: List[SyncMessage] = [] + for message in s.sync(client, [SchedulerTestTableResolver()]): + messages.append(message) + assert len(messages) == 4 + assert Table.from_arrow_schema(messages[0].table).name == "test_table" + assert Table.from_arrow_schema(messages[1].table).name == "test_child_table" + assert messages[2].record == expected_record1 + assert messages[3].record == expected_record2 s.shutdown() diff --git a/tests/scheduler/table_resolver.py b/tests/scheduler/table_resolver.py new file mode 100644 index 0000000..3a11575 --- /dev/null +++ b/tests/scheduler/table_resolver.py @@ -0,0 +1,34 @@ +from dataclasses import dataclass + +import pyarrow as pa + +from cloudquery.sdk.scheduler import TableResolver +from cloudquery.sdk.schema import Table, Resource, Column + + +@dataclass +class TestItem: + test_column2: int = 1 + + +def test_table_resolver_resolve_column(): + test_table = Table( + name="test_table", + columns=[ + Column(name="test_column", type=pa.int64()), + Column(name="test_column2", type=pa.int64()), + ], + ) + resource_dict = Resource(table=test_table, parent=None, item={"test_column": 1}) + resource_obj = Resource( + table=test_table, parent=None, item=TestItem(test_column2=2) + ) + test_resolver = TableResolver(table=test_table, child_resolvers=[]) + test_resolver.resolve_column( + client=None, resource=resource_dict, column_name="test_column" + ) + test_resolver.resolve_column( + client=None, resource=resource_obj, column_name="test_column2" + ) + assert resource_dict.to_list_of_arr() == [[1], [None]] + assert resource_obj.to_list_of_arr() == [[None], [2]]