From a1b07e8624a335c7ffc37f58913ec103305fd46a Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Tue, 1 Aug 2023 17:24:39 +0300 Subject: [PATCH 01/13] fix: Log error on table resolver exception/error (#16) The previous logger was raising an exception inside an exception handler --- cloudquery/sdk/scheduler/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index ae9b7ba..073e01d 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -142,7 +142,7 @@ def resolve_table( "table resolver finished with error", table=resolver.table.name, depth=depth, - exception=e, + exec_info=e, ) finally: res.put(TableResolverStarted(count=table_resolvers_started)) From 2a5996b553e9cbb4b1e8dc95d22872a866d32c8b Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 15:48:33 +0100 Subject: [PATCH 02/13] fix: Fix exception logging (#18) It's `exc_info` not `exec_info`, and `exc_info` just needs to evaluate to True. It takes the exception from the context. --- cloudquery/sdk/scheduler/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index 073e01d..d8e383c 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -110,7 +110,7 @@ def resolve_table( "failed to resolve resource", table=resolver.table.name, depth=depth, - exception=e, + exc_info=True, ) continue res.put(SyncInsertMessage(resource.to_arrow_record())) @@ -142,7 +142,7 @@ def resolve_table( "table resolver finished with error", table=resolver.table.name, depth=depth, - exec_info=e, + exc_info=True, ) finally: res.put(TableResolverStarted(count=table_resolvers_started)) From c0cdf55a49ebbb1c8ed51022d4a5910f51378e74 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 15:54:30 +0100 Subject: [PATCH 03/13] fix: Fix JSON type handling (#19) --- cloudquery/sdk/scalar/scalar_factory.py | 3 ++- cloudquery/sdk/scalar/string.py | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cloudquery/sdk/scalar/scalar_factory.py b/cloudquery/sdk/scalar/scalar_factory.py index 6eabaca..8bbf1d4 100644 --- a/cloudquery/sdk/scalar/scalar_factory.py +++ b/cloudquery/sdk/scalar/scalar_factory.py @@ -7,6 +7,7 @@ from .date64 import Date64 from .float import Float from .int import Int +from .json import JSON from .list import List from .scalar import ScalarInvalidTypeError from .string import String @@ -89,6 +90,6 @@ def new_scalar(self, dt: pa.DataType): elif dt == UUIDType(): return UUID() elif dt == JSONType(): - return String() + return JSON() else: raise ScalarInvalidTypeError("Invalid type {} for scalar".format(dt)) diff --git a/cloudquery/sdk/scalar/string.py b/cloudquery/sdk/scalar/string.py index f9bbc26..9fd3a79 100644 --- a/cloudquery/sdk/scalar/string.py +++ b/cloudquery/sdk/scalar/string.py @@ -1,5 +1,4 @@ from cloudquery.sdk.scalar import Scalar, ScalarInvalidTypeError -from .scalar import NULL_VALUE class String(Scalar): From 146c5498cb1ecf54a55bb5a60ce2cb0a0228c2ed Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 16:17:28 +0100 Subject: [PATCH 04/13] fix: Fix extension type definitions (#20) `PyExtensionType` is a special type for extensions backed by Python pickle, so it's not what we want. We should use the base `ExtensionType` --- cloudquery/sdk/types/json.py | 4 ++-- cloudquery/sdk/types/uuid.py | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cloudquery/sdk/types/json.py b/cloudquery/sdk/types/json.py index a222e0b..28bf3bb 100644 --- a/cloudquery/sdk/types/json.py +++ b/cloudquery/sdk/types/json.py @@ -1,9 +1,9 @@ import pyarrow as pa -class JSONType(pa.PyExtensionType): +class JSONType(pa.ExtensionType): def __init__(self): - pa.PyExtensionType.__init__(self, pa.binary()) + pa.ExtensionType.__init__(self, extension_name="json", storage_type=pa.binary()) def __reduce__(self): return JSONType, () diff --git a/cloudquery/sdk/types/uuid.py b/cloudquery/sdk/types/uuid.py index 0041cc4..a9ef571 100644 --- a/cloudquery/sdk/types/uuid.py +++ b/cloudquery/sdk/types/uuid.py @@ -1,9 +1,11 @@ import pyarrow as pa -class UUIDType(pa.PyExtensionType): +class UUIDType(pa.ExtensionType): def __init__(self): - pa.PyExtensionType.__init__(self, pa.binary(16)) + pa.ExtensionType.__init__( + self, extension_name="uuid", storage_type=pa.binary(16) + ) def __reduce__(self): return UUIDType, () From 536e16303a0ac18f37a468d9e2b97b4438bc596e Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 16:44:25 +0100 Subject: [PATCH 05/13] fix: Emit migrate messages for child relations (#21) A fix for the scheduler so that migrate messages are emitted for child tables. --- cloudquery/sdk/scheduler/scheduler.py | 12 ++++++++++-- tests/scheduler/scheduler.py | 21 ++++++++++++--------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index d8e383c..c951305 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -171,8 +171,8 @@ def sync( self, client, resolvers: List[TableResolver], deterministic_cq_id=False ) -> Generator[SyncMessage, None, None]: res = queue.Queue() - for resolver in resolvers: - yield SyncMigrateTableMessage(table=resolver.table.to_arrow_schema()) + yield from self._send_migrate_table_messages(resolvers) + thread = futures.ThreadPoolExecutor() thread.submit(self._sync, client, resolvers, res, deterministic_cq_id) total_table_resolvers = 0 @@ -191,3 +191,11 @@ def sync( continue yield message thread.shutdown(wait=True) + + def _send_migrate_table_messages( + self, resolvers: List[TableResolver] + ) -> Generator[SyncMessage, None, None]: + for resolver in resolvers: + yield SyncMigrateTableMessage(table=resolver.table.to_arrow_schema()) + if resolver.child_resolvers: + yield from self._send_migrate_table_messages(resolver.child_resolvers) diff --git a/tests/scheduler/scheduler.py b/tests/scheduler/scheduler.py index a60321c..1a6eb84 100644 --- a/tests/scheduler/scheduler.py +++ b/tests/scheduler/scheduler.py @@ -1,9 +1,10 @@ from typing import Any, List, Generator + import pyarrow as pa -import pytest -from cloudquery.sdk.scheduler import Scheduler, TableResolver -from cloudquery.sdk.schema import Table, Column, Resource + from cloudquery.sdk.message import SyncMessage +from cloudquery.sdk.scheduler import Scheduler, TableResolver +from cloudquery.sdk.schema import Column from cloudquery.sdk.schema.table import Table @@ -46,10 +47,12 @@ def test_scheduler(): expected_record1 = pa.record_batch([[1]], schema=table1.to_arrow_schema()) table2 = Table("test_child_table", [Column("test_child_column", pa.int64())]) expected_record2 = pa.record_batch([[2]], schema=table2.to_arrow_schema()) - resources: List[SyncMessage] = [] - for resource in s.sync(client, [SchedulerTestTableResolver()]): - resources.append(resource) - assert len(resources) == 3 - assert resources[1].record == expected_record1 - assert resources[2].record == expected_record2 + messages: List[SyncMessage] = [] + for message in s.sync(client, [SchedulerTestTableResolver()]): + messages.append(message) + assert len(messages) == 4 + assert Table.from_arrow_schema(messages[0].table).name == "test_table" + assert Table.from_arrow_schema(messages[1].table).name == "test_child_table" + assert messages[2].record == expected_record1 + assert messages[3].record == expected_record2 s.shutdown() From 9936ced487ba387c21385c372f163557bd46ba69 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 17:18:41 +0100 Subject: [PATCH 06/13] fix: Fix column resolver resource set (#23) --- cloudquery/sdk/scheduler/table_resolver.py | 7 +++-- tests/scheduler/table_resolver.py | 34 ++++++++++++++++++++++ 2 files changed, 38 insertions(+), 3 deletions(-) create mode 100644 tests/scheduler/table_resolver.py diff --git a/cloudquery/sdk/scheduler/table_resolver.py b/cloudquery/sdk/scheduler/table_resolver.py index 84cdc25..3cde6bd 100644 --- a/cloudquery/sdk/scheduler/table_resolver.py +++ b/cloudquery/sdk/scheduler/table_resolver.py @@ -1,7 +1,8 @@ -from cloudquery.sdk.schema.table import Table -from cloudquery.sdk.schema import Resource from typing import Any, Generator +from cloudquery.sdk.schema import Resource +from cloudquery.sdk.schema.table import Table + class TableResolver: def __init__(self, table: Table, child_resolvers=[]) -> None: @@ -31,7 +32,7 @@ def resolve_column(self, client, resource: Resource, column_name: str): resource.set(column_name, resource.item[column_name]) else: if hasattr(resource.item, column_name): - resource.set(column_name, resource.item[column_name]) + resource.set(column_name, getattr(resource.item, column_name)) def post_resource_resolve(self, client, resource): return diff --git a/tests/scheduler/table_resolver.py b/tests/scheduler/table_resolver.py new file mode 100644 index 0000000..3a11575 --- /dev/null +++ b/tests/scheduler/table_resolver.py @@ -0,0 +1,34 @@ +from dataclasses import dataclass + +import pyarrow as pa + +from cloudquery.sdk.scheduler import TableResolver +from cloudquery.sdk.schema import Table, Resource, Column + + +@dataclass +class TestItem: + test_column2: int = 1 + + +def test_table_resolver_resolve_column(): + test_table = Table( + name="test_table", + columns=[ + Column(name="test_column", type=pa.int64()), + Column(name="test_column2", type=pa.int64()), + ], + ) + resource_dict = Resource(table=test_table, parent=None, item={"test_column": 1}) + resource_obj = Resource( + table=test_table, parent=None, item=TestItem(test_column2=2) + ) + test_resolver = TableResolver(table=test_table, child_resolvers=[]) + test_resolver.resolve_column( + client=None, resource=resource_dict, column_name="test_column" + ) + test_resolver.resolve_column( + client=None, resource=resource_obj, column_name="test_column2" + ) + assert resource_dict.to_list_of_arr() == [[1], [None]] + assert resource_obj.to_list_of_arr() == [[None], [2]] From a53bb0e0e1eda99705fc96c0c7d0ac79e814a8d7 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Tue, 1 Aug 2023 19:45:32 +0300 Subject: [PATCH 07/13] feat: Add override_columns to openapi transformer (#22) --- cloudquery/sdk/transformers/openapi.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/cloudquery/sdk/transformers/openapi.py b/cloudquery/sdk/transformers/openapi.py index 23af197..300acce 100644 --- a/cloudquery/sdk/transformers/openapi.py +++ b/cloudquery/sdk/transformers/openapi.py @@ -1,4 +1,4 @@ -from typing import Dict, List +from typing import Dict, List, Optional import pyarrow as pa from cloudquery.sdk.types import JSONType from cloudquery.sdk.schema import Column @@ -24,11 +24,24 @@ def oapi_type_to_arrow_type(field) -> pa.DataType: return pa.string() -def oapi_definition_to_columns(definition: Dict) -> List[Column]: +def get_column_by_name(columns: List[Column], name: str) -> Optional[Column]: + for column in columns: + if column.name == name: + return column + return None + + +def oapi_definition_to_columns(definition: Dict, override_columns=[]) -> List[Column]: columns = [] for key, value in definition["properties"].items(): column_type = oapi_type_to_arrow_type(value) - columns.append( - Column(name=key, type=column_type, description=value.get("description")) + column = Column( + name=key, type=column_type, description=value.get("description") ) + override_column = get_column_by_name(override_columns, key) + if override_column is not None: + column.type = override_column.type + column.primary_key = override_column.primary_key + column.unique = override_column.unique + columns.append(column) return columns From 17fee278f05449584c6a781f6560c7f5faf431c6 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 17:50:49 +0100 Subject: [PATCH 08/13] fix: Fix race in scheduler (#25) This fixes a data race in the scheduler that caused some child resources to be missed. The implementation is now correct, but it could be made more efficient. The point is that we have to submit the sentinel indicating the start of the resolver before submitting the job, otherwise the job might send a "finished" signal before the "start" signal is sent, causing the counts to be equal before all resources have actually been processed. --- cloudquery/sdk/scheduler/scheduler.py | 32 +++++++++------------------ 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index c951305..719592b 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -22,12 +22,8 @@ def __init__(self, maxsize, *args, **kwargs): class TableResolverStarted: - def __init__(self, count=1) -> None: - self._count = count - - @property - def count(self): - return self._count + def __init__(self) -> None: + pass class TableResolverFinished: @@ -89,7 +85,6 @@ def resolve_table( parent_item: Resource, res: queue.Queue, ): - table_resolvers_started = 0 try: if depth == 0: self._logger.info( @@ -115,6 +110,7 @@ def resolve_table( continue res.put(SyncInsertMessage(resource.to_arrow_record())) for child_resolvers in resolver.child_resolvers: + res.put(TableResolverStarted()) self._pools[depth + 1].submit( self.resolve_table, child_resolvers, @@ -123,7 +119,6 @@ def resolve_table( resource, res, ) - table_resolvers_started += 1 total_resources += 1 if depth == 0: self._logger.info( @@ -145,7 +140,6 @@ def resolve_table( exc_info=True, ) finally: - res.put(TableResolverStarted(count=table_resolvers_started)) res.put(TableResolverFinished()) def _sync( @@ -155,17 +149,13 @@ def _sync( res: queue.Queue, deterministic_cq_id=False, ): - total_table_resolvers = 0 - try: - for resolver in resolvers: - clients = resolver.multiplex(client) - for client in clients: - self._pools[0].submit( - self.resolve_table, resolver, 0, client, None, res - ) - total_table_resolvers += 1 - finally: - res.put(TableResolverStarted(total_table_resolvers)) + for resolver in resolvers: + clients = resolver.multiplex(client) + for client in clients: + res.put(TableResolverStarted()) + self._pools[0].submit( + self.resolve_table, resolver, 0, client, None, res + ) def sync( self, client, resolvers: List[TableResolver], deterministic_cq_id=False @@ -180,7 +170,7 @@ def sync( while True: message = res.get() if type(message) == TableResolverStarted: - total_table_resolvers += message.count + total_table_resolvers += 1 if total_table_resolvers == finished_table_resolvers: break continue From 505f94bf67f9835cc3463df854943c6f00c2d281 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Tue, 1 Aug 2023 19:57:42 +0300 Subject: [PATCH 09/13] fix: Add better logging for scheduler (#24) --- cloudquery/sdk/scheduler/scheduler.py | 23 +++++++++++++++++----- cloudquery/sdk/scheduler/table_resolver.py | 18 ++++++++++++----- cloudquery/sdk/serve/plugin.py | 2 +- tests/scheduler/scheduler.py | 3 ++- 4 files changed, 34 insertions(+), 12 deletions(-) diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index 719592b..c4e594d 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -10,7 +10,7 @@ SyncMigrateTableMessage, ) from cloudquery.sdk.schema import Resource -from .table_resolver import TableResolver +from .table_resolver import TableResolver, Client QUEUE_PER_WORKER = 100 @@ -68,7 +68,7 @@ def shutdown(self): pool.shutdown() def resolve_resource( - self, resolver: TableResolver, client, parent: Resource, item: Any + self, resolver: TableResolver, client: Client, parent: Resource, item: Any ) -> Resource: resource = Resource(resolver.table, parent, item) resolver.pre_resource_resolve(client, resource) @@ -81,18 +81,24 @@ def resolve_table( self, resolver: TableResolver, depth: int, - client, + client: Client, parent_item: Resource, res: queue.Queue, ): try: if depth == 0: self._logger.info( - "table resolver started", table=resolver.table.name, depth=depth + "table resolver started", + client_id=client.id(), + table=resolver.table.name, + depth=depth, ) else: self._logger.debug( - "table resolver started", table=resolver.table.name, depth=depth + "table resolver started", + client_id=client.id(), + table=resolver.table.name, + depth=depth, ) total_resources = 0 for item in resolver.resolve(client, parent_item): @@ -103,6 +109,7 @@ def resolve_table( except Exception as e: self._logger.error( "failed to resolve resource", + client_id=client.id(), table=resolver.table.name, depth=depth, exc_info=True, @@ -123,19 +130,25 @@ def resolve_table( if depth == 0: self._logger.info( "table resolver finished successfully", + client_id=client.id(), table=resolver.table.name, + resources=total_resources, depth=depth, ) else: self._logger.debug( "table resolver finished successfully", + client_id=client.id(), table=resolver.table.name, + resources=total_resources, depth=depth, ) except Exception as e: self._logger.error( "table resolver finished with error", + client_id=client.id(), table=resolver.table.name, + resources=total_resources, depth=depth, exc_info=True, ) diff --git a/cloudquery/sdk/scheduler/table_resolver.py b/cloudquery/sdk/scheduler/table_resolver.py index 3cde6bd..d1ffd77 100644 --- a/cloudquery/sdk/scheduler/table_resolver.py +++ b/cloudquery/sdk/scheduler/table_resolver.py @@ -1,4 +1,12 @@ -from typing import Any, Generator +from cloudquery.sdk.schema.table import Table +from cloudquery.sdk.schema import Resource +from typing import Any, Generator, List + + +class Client: + def id(self) -> str: + raise NotImplementedError() + from cloudquery.sdk.schema import Resource from cloudquery.sdk.schema.table import Table @@ -17,16 +25,16 @@ def table(self) -> Table: def child_resolvers(self): return self._child_resolvers - def multiplex(self, client): + def multiplex(self, client: Client) -> List[Client]: return [client] def resolve(self, client, parent_resource) -> Generator[Any, None, None]: raise NotImplementedError() - def pre_resource_resolve(self, client, resource): + def pre_resource_resolve(self, client: Client, resource): return - def resolve_column(self, client, resource: Resource, column_name: str): + def resolve_column(self, client: Client, resource: Resource, column_name: str): if type(resource.item) is dict: if column_name in resource.item: resource.set(column_name, resource.item[column_name]) @@ -34,5 +42,5 @@ def resolve_column(self, client, resource: Resource, column_name: str): if hasattr(resource.item, column_name): resource.set(column_name, getattr(resource.item, column_name)) - def post_resource_resolve(self, client, resource): + def post_resource_resolve(self, client: Client, resource): return diff --git a/cloudquery/sdk/serve/plugin.py b/cloudquery/sdk/serve/plugin.py index f51503d..7842425 100644 --- a/cloudquery/sdk/serve/plugin.py +++ b/cloudquery/sdk/serve/plugin.py @@ -16,7 +16,7 @@ def get_logger(args): - log = structlog.get_logger() + log = structlog.get_logger(processors=[structlog.processors.JSONRenderer()]) return log diff --git a/tests/scheduler/scheduler.py b/tests/scheduler/scheduler.py index 1a6eb84..3e5458a 100644 --- a/tests/scheduler/scheduler.py +++ b/tests/scheduler/scheduler.py @@ -37,7 +37,8 @@ def resolve(self, client, parent_resource) -> Generator[Any, None, None]: class TestClient: - pass + def id(self): + return "test_client" def test_scheduler(): From 0bd216b6fbfb84d04887a4b81c724d59d080731c Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Wed, 2 Aug 2023 11:18:20 +0100 Subject: [PATCH 10/13] Fix docs signature (#27) --- cloudquery/sdk/serve/plugin.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/cloudquery/sdk/serve/plugin.py b/cloudquery/sdk/serve/plugin.py index 7842425..d412a74 100644 --- a/cloudquery/sdk/serve/plugin.py +++ b/cloudquery/sdk/serve/plugin.py @@ -1,16 +1,16 @@ import argparse -import structlog from concurrent import futures import grpc +import structlog import sys - from cloudquery.discovery_v1 import discovery_pb2_grpc from cloudquery.plugin_v3 import plugin_pb2_grpc + from cloudquery.sdk.docs.generator import Generator from cloudquery.sdk.internal.servers.discovery_v1.discovery import DiscoveryServicer from cloudquery.sdk.internal.servers.plugin_v3 import PluginServicer -from cloudquery.sdk.plugin.plugin import Plugin +from cloudquery.sdk.plugin.plugin import Plugin, TableOptions DOC_FORMATS = ["json", "markdown"] @@ -105,6 +105,13 @@ def stop(self): def _generate_docs(self, args): print("Generating docs in format: " + args.format) generator = Generator( - self._plugin.name(), self._plugin.get_tables(tables=["*"], skip_tables=[]) + self._plugin.name(), + self._plugin.get_tables( + options=TableOptions( + tables=["*"], + skip_tables=[], + skip_dependent_tables=False, + ) + ), ) generator.generate(args.directory, args.format) From 106781b0ddfa5d5be77890fc0f9b3fe1cee10848 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Wed, 2 Aug 2023 14:35:02 +0300 Subject: [PATCH 11/13] feat: Wire logging with cli flags (#26) --- cloudquery/sdk/plugin/plugin.py | 3 ++ cloudquery/sdk/scheduler/scheduler.py | 2 ++ cloudquery/sdk/serve/plugin.py | 47 ++++++++++++++++++++++++++- 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/cloudquery/sdk/plugin/plugin.py b/cloudquery/sdk/plugin/plugin.py index 4765cf2..7fc76f6 100644 --- a/cloudquery/sdk/plugin/plugin.py +++ b/cloudquery/sdk/plugin/plugin.py @@ -38,6 +38,9 @@ def __init__(self, name: str, version: str) -> None: def init(self, spec: bytes) -> None: pass + def set_logger(self, logger) -> None: + pass + def name(self) -> str: return self._name diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index c4e594d..8f8d111 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -39,6 +39,8 @@ def __init__( self._max_depth = max_depth if logger is None: self._logger = structlog.get_logger() + else: + self._logger = logger if concurrency <= 0: raise ValueError("concurrency must be greater than 0") if max_depth <= 0: diff --git a/cloudquery/sdk/serve/plugin.py b/cloudquery/sdk/serve/plugin.py index d412a74..8df531a 100644 --- a/cloudquery/sdk/serve/plugin.py +++ b/cloudquery/sdk/serve/plugin.py @@ -4,6 +4,8 @@ import grpc import structlog import sys +import os + from cloudquery.discovery_v1 import discovery_pb2_grpc from cloudquery.plugin_v3 import plugin_pb2_grpc @@ -15,8 +17,50 @@ DOC_FORMATS = ["json", "markdown"] +_IS_WINDOWS = sys.platform == "win32" + +try: + import colorama +except ImportError: + colorama = None + +if _IS_WINDOWS: # pragma: no cover + # On Windows, use colors by default only if Colorama is installed. + _has_colors = colorama is not None +else: + # On other OSes, use colors by default. + _has_colors = True + + def get_logger(args): - log = structlog.get_logger(processors=[structlog.processors.JSONRenderer()]) + processors = [ + structlog.contextvars.merge_contextvars, + structlog.processors.add_log_level, + structlog.processors.StackInfoRenderer(), + structlog.dev.set_exc_info, + structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False), + ] + if args.log_format == "text": + processors.append( + structlog.dev.ConsoleRenderer( + colors=os.environ.get("NO_COLOR", "") == "" + and ( + os.environ.get("FORCE_COLOR", "") != "" + or ( + _has_colors + and sys.stdout is not None + and hasattr(sys.stdout, "isatty") + and sys.stdout.isatty() + ) + ) + ) + ) + else: + processors.append(structlog.processors.JSONRenderer()) + + # if args.log_format == "json": + # processors.append(structlog.processors.JSONRenderer()) + log = structlog.get_logger(processors=processors) return log @@ -87,6 +131,7 @@ def run(self, args): def _serve(self, args): logger = get_logger(args) + self._plugin.set_logger(logger) self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) discovery_pb2_grpc.add_DiscoveryServicer_to_server( DiscoveryServicer([3]), self._server From 5d52af99cb98906faee1485ea10edf05c09155e5 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Wed, 2 Aug 2023 15:11:52 +0100 Subject: [PATCH 12/13] fix: Add more command-line args, use standard logging (#29) * Fix docs signature * Command line args * Fmt * update --- cloudquery/sdk/serve/plugin.py | 60 ++++++++++++++++++++++++++++------ 1 file changed, 50 insertions(+), 10 deletions(-) diff --git a/cloudquery/sdk/serve/plugin.py b/cloudquery/sdk/serve/plugin.py index 8df531a..74082f7 100644 --- a/cloudquery/sdk/serve/plugin.py +++ b/cloudquery/sdk/serve/plugin.py @@ -1,13 +1,14 @@ import argparse +import logging +import os from concurrent import futures import grpc import structlog import sys -import os - from cloudquery.discovery_v1 import discovery_pb2_grpc from cloudquery.plugin_v3 import plugin_pb2_grpc +from structlog import wrap_logger from cloudquery.sdk.docs.generator import Generator from cloudquery.sdk.internal.servers.discovery_v1.discovery import DiscoveryServicer @@ -16,7 +17,6 @@ DOC_FORMATS = ["json", "markdown"] - _IS_WINDOWS = sys.platform == "win32" try: @@ -33,12 +33,27 @@ def get_logger(args): + log_level_map = { + "debug": logging.DEBUG, + "info": logging.INFO, + "warning": logging.WARNING, + "error": logging.ERROR, + "critical": logging.CRITICAL, + } + + logging.basicConfig( + format="%(message)s", + stream=sys.stdout, + level=log_level_map.get(args.log_level.lower(), logging.INFO), + ) + processors = [ structlog.contextvars.merge_contextvars, structlog.processors.add_log_level, structlog.processors.StackInfoRenderer(), structlog.dev.set_exc_info, - structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False), + structlog.stdlib.filter_by_level, + structlog.processors.TimeStamper(fmt="%Y-%m-%dT%H:%M:%SZ", utc=True), ] if args.log_format == "text": processors.append( @@ -58,9 +73,7 @@ def get_logger(args): else: processors.append(structlog.processors.JSONRenderer()) - # if args.log_format == "json": - # processors.append(structlog.processors.JSONRenderer()) - log = structlog.get_logger(processors=processors) + log = wrap_logger(logging.getLogger(), processors=processors) return log @@ -73,6 +86,13 @@ def run(self, args): subparsers = parser.add_subparsers(dest="command", required=True) serve_parser = subparsers.add_parser("serve", help="Start plugin server") + serve_parser.add_argument( + "--log-format", + type=str, + default="text", + choices=["text", "json"], + help="logging format", + ) serve_parser.add_argument( "--log-level", type=str, @@ -80,9 +100,28 @@ def run(self, args): choices=["trace", "debug", "info", "warn", "error"], help="log level", ) + + # ignored for now serve_parser.add_argument( - "--log-format", type=str, default="text", choices=["text", "json"] + "--no-sentry", + action="store_true", + help="disable sentry (placeholder for future use)", ) + # ignored for now + serve_parser.add_argument( + "--otel-endpoint", + type=str, + default="", + help="Open Telemetry HTTP collector endpoint (placeholder for future use)", + ) + # ignored for now + serve_parser.add_argument( + "--otel-endpoint-insecure", + type=str, + default="", + help="Open Telemetry HTTP collector endpoint (for development only) (placeholder for future use)", + ) + serve_parser.add_argument( "--address", type=str, @@ -140,7 +179,7 @@ def _serve(self, args): PluginServicer(self._plugin, logger), self._server ) self._server.add_insecure_port(args.address) - print("Starting server. Listening on " + args.address) + logger.info("Starting server", address=args.address) self._server.start() self._server.wait_for_termination() @@ -148,7 +187,8 @@ def stop(self): self._server.stop(5) def _generate_docs(self, args): - print("Generating docs in format: " + args.format) + logger = get_logger(args) + logger.info("Generating docs", format=args.format) generator = Generator( self._plugin.name(), self._plugin.get_tables( From 573c9eb47771c5ad14eea25035ffe3f776d85d89 Mon Sep 17 00:00:00 2001 From: CloudQuery Bot <102256036+cq-bot@users.noreply.github.com> Date: Wed, 2 Aug 2023 17:14:01 +0300 Subject: [PATCH 13/13] chore(main): Release v0.0.9 (#17) :robot: I have created a release *beep* *boop* --- ## [0.0.9](https://github.com/cloudquery/plugin-sdk-python/compare/v0.0.8...v0.0.9) (2023-08-02) ### Features * Add override_columns to openapi transformer ([#22](https://github.com/cloudquery/plugin-sdk-python/issues/22)) ([a53bb0e](https://github.com/cloudquery/plugin-sdk-python/commit/a53bb0e0e1eda99705fc96c0c7d0ac79e814a8d7)) * Wire logging with cli flags ([#26](https://github.com/cloudquery/plugin-sdk-python/issues/26)) ([106781b](https://github.com/cloudquery/plugin-sdk-python/commit/106781b0ddfa5d5be77890fc0f9b3fe1cee10848)) ### Bug Fixes * Add better logging for scheduler ([#24](https://github.com/cloudquery/plugin-sdk-python/issues/24)) ([505f94b](https://github.com/cloudquery/plugin-sdk-python/commit/505f94bf67f9835cc3463df854943c6f00c2d281)) * Add more command-line args, use standard logging ([#29](https://github.com/cloudquery/plugin-sdk-python/issues/29)) ([5d52af9](https://github.com/cloudquery/plugin-sdk-python/commit/5d52af99cb98906faee1485ea10edf05c09155e5)) * Emit migrate messages for child relations ([#21](https://github.com/cloudquery/plugin-sdk-python/issues/21)) ([536e163](https://github.com/cloudquery/plugin-sdk-python/commit/536e16303a0ac18f37a468d9e2b97b4438bc596e)) * Fix column resolver resource set ([#23](https://github.com/cloudquery/plugin-sdk-python/issues/23)) ([9936ced](https://github.com/cloudquery/plugin-sdk-python/commit/9936ced487ba387c21385c372f163557bd46ba69)) * Fix exception logging ([#18](https://github.com/cloudquery/plugin-sdk-python/issues/18)) ([2a5996b](https://github.com/cloudquery/plugin-sdk-python/commit/2a5996b553e9cbb4b1e8dc95d22872a866d32c8b)) * Fix extension type definitions ([#20](https://github.com/cloudquery/plugin-sdk-python/issues/20)) ([146c549](https://github.com/cloudquery/plugin-sdk-python/commit/146c5498cb1ecf54a55bb5a60ce2cb0a0228c2ed)) * Fix JSON type handling ([#19](https://github.com/cloudquery/plugin-sdk-python/issues/19)) ([c0cdf55](https://github.com/cloudquery/plugin-sdk-python/commit/c0cdf55a49ebbb1c8ed51022d4a5910f51378e74)) * Fix race in scheduler ([#25](https://github.com/cloudquery/plugin-sdk-python/issues/25)) ([17fee27](https://github.com/cloudquery/plugin-sdk-python/commit/17fee278f05449584c6a781f6560c7f5faf431c6)) * Log error on table resolver exception/error ([#16](https://github.com/cloudquery/plugin-sdk-python/issues/16)) ([a1b07e8](https://github.com/cloudquery/plugin-sdk-python/commit/a1b07e8624a335c7ffc37f58913ec103305fd46a)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please). --- CHANGELOG.md | 21 +++++++++++++++++++++ setup.py | 2 +- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 088fd56..ec8db45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,26 @@ # Changelog +## [0.0.9](https://github.com/cloudquery/plugin-sdk-python/compare/v0.0.8...v0.0.9) (2023-08-02) + + +### Features + +* Add override_columns to openapi transformer ([#22](https://github.com/cloudquery/plugin-sdk-python/issues/22)) ([a53bb0e](https://github.com/cloudquery/plugin-sdk-python/commit/a53bb0e0e1eda99705fc96c0c7d0ac79e814a8d7)) +* Wire logging with cli flags ([#26](https://github.com/cloudquery/plugin-sdk-python/issues/26)) ([106781b](https://github.com/cloudquery/plugin-sdk-python/commit/106781b0ddfa5d5be77890fc0f9b3fe1cee10848)) + + +### Bug Fixes + +* Add better logging for scheduler ([#24](https://github.com/cloudquery/plugin-sdk-python/issues/24)) ([505f94b](https://github.com/cloudquery/plugin-sdk-python/commit/505f94bf67f9835cc3463df854943c6f00c2d281)) +* Add more command-line args, use standard logging ([#29](https://github.com/cloudquery/plugin-sdk-python/issues/29)) ([5d52af9](https://github.com/cloudquery/plugin-sdk-python/commit/5d52af99cb98906faee1485ea10edf05c09155e5)) +* Emit migrate messages for child relations ([#21](https://github.com/cloudquery/plugin-sdk-python/issues/21)) ([536e163](https://github.com/cloudquery/plugin-sdk-python/commit/536e16303a0ac18f37a468d9e2b97b4438bc596e)) +* Fix column resolver resource set ([#23](https://github.com/cloudquery/plugin-sdk-python/issues/23)) ([9936ced](https://github.com/cloudquery/plugin-sdk-python/commit/9936ced487ba387c21385c372f163557bd46ba69)) +* Fix exception logging ([#18](https://github.com/cloudquery/plugin-sdk-python/issues/18)) ([2a5996b](https://github.com/cloudquery/plugin-sdk-python/commit/2a5996b553e9cbb4b1e8dc95d22872a866d32c8b)) +* Fix extension type definitions ([#20](https://github.com/cloudquery/plugin-sdk-python/issues/20)) ([146c549](https://github.com/cloudquery/plugin-sdk-python/commit/146c5498cb1ecf54a55bb5a60ce2cb0a0228c2ed)) +* Fix JSON type handling ([#19](https://github.com/cloudquery/plugin-sdk-python/issues/19)) ([c0cdf55](https://github.com/cloudquery/plugin-sdk-python/commit/c0cdf55a49ebbb1c8ed51022d4a5910f51378e74)) +* Fix race in scheduler ([#25](https://github.com/cloudquery/plugin-sdk-python/issues/25)) ([17fee27](https://github.com/cloudquery/plugin-sdk-python/commit/17fee278f05449584c6a781f6560c7f5faf431c6)) +* Log error on table resolver exception/error ([#16](https://github.com/cloudquery/plugin-sdk-python/issues/16)) ([a1b07e8](https://github.com/cloudquery/plugin-sdk-python/commit/a1b07e8624a335c7ffc37f58913ec103305fd46a)) + ## [0.0.8](https://github.com/cloudquery/plugin-sdk-python/compare/v0.0.7...v0.0.8) (2023-08-01) diff --git a/setup.py b/setup.py index 40c514a..65f9a80 100644 --- a/setup.py +++ b/setup.py @@ -53,7 +53,7 @@ ] setuptools.setup( name=name, - version="0.0.8", + version="0.0.9", description=description, long_description=long_description, author="CloudQuery LTD",