-
Notifications
You must be signed in to change notification settings - Fork 545
Expand file tree
/
Copy pathplugin.py
More file actions
88 lines (73 loc) · 2.78 KB
/
plugin.py
File metadata and controls
88 lines (73 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import json
from typing import List, Generator
import structlog
from cloudquery.sdk import message
from cloudquery.sdk import plugin
from cloudquery.sdk import schema
from cloudquery.sdk.scheduler import Scheduler, TableResolver
from cloudquery.sdk.stateclient.stateclient import StateClientBuilder
from plugin import tables
from plugin.client import Client, Spec
# Name and version that TypeformPlugin passes to the SDK's Plugin constructor.
PLUGIN_NAME = "typeform"
# NOTE(review): the trailing marker looks like a release-please annotation, so
# this value is presumably bumped by release automation — confirm before
# editing it by hand, and keep the marker on the same line.
PLUGIN_VERSION = "1.5.10" # {x-release-please-version}
class TypeformPlugin(plugin.Plugin):
    """CloudQuery source plugin for Typeform.

    A freshly constructed instance is unconfigured: ``init`` parses the spec
    and builds the scheduler and client, and ``sync`` requires both of them.
    """

    def __init__(self) -> None:
        """Register the plugin name, version and spec JSON schema with the SDK."""
        super().__init__(
            PLUGIN_NAME,
            PLUGIN_VERSION,
            opts=plugin.plugin.Options(
                team="cloudquery", kind="source", json_schema=Spec.json_schema()
            ),
        )
        # Filled in by init(). They remain None if init() is never called or
        # is called with no_connection=True.
        self._spec_json = None
        self._spec = None
        self._scheduler = None
        self._client = None
        self._logger = structlog.get_logger()

    def set_logger(self, logger) -> None:
        """Replace the logger used by this plugin.

        The scheduler is handed whichever logger is current when ``init``
        runs, so call this before ``init`` for the scheduler to receive it.
        """
        self._logger = logger

    def init(self, spec, no_connection: bool = False) -> None:
        """Parse and validate the spec, then build the scheduler and client.

        Args:
            spec: JSON document with the plugin configuration. It is decoded
                with ``json.loads`` and its top-level keys are passed to
                ``Spec`` as keyword arguments.
            no_connection: When true, return immediately without reading the
                spec; the plugin can then list tables but cannot ``sync``.

        Raises:
            json.JSONDecodeError: If ``spec`` is not valid JSON.
            Exception: Whatever ``Spec(...)`` or ``Spec.validate()`` raise for
                an invalid configuration (defined outside this file).
        """
        if no_connection:
            return

        self._spec_json = json.loads(spec)
        self._spec = Spec(**self._spec_json)
        self._spec.validate()
        self._scheduler = Scheduler(
            self._spec.concurrency, self._spec.queue_size, logger=self._logger
        )
        self._client = Client(self._spec)

    def get_tables(self, options: plugin.TableOptions) -> List[plugin.Table]:
        """Return the plugin's tables filtered by the include/skip options.

        Args:
            options: Table selection. ``tables`` and ``skip_tables`` may be
                ``None``, which is treated as an empty list. The object is
                not modified.

        Returns:
            The result of ``schema.filter_dfs`` over all known tables.
        """
        all_tables: List[plugin.Table] = [
            tables.Forms(),
        ]

        # Point every direct relation back at its parent table.
        for table in all_tables:
            for relation in table.relations:
                relation.parent = table

        # Normalize into locals rather than writing back into the caller's
        # options object.
        include = options.tables if options.tables is not None else []
        skip = options.skip_tables if options.skip_tables is not None else []
        return schema.filter_dfs(all_tables, include, skip)

    def sync(
        self, options: plugin.SyncOptions
    ) -> Generator[message.SyncMessage, None, None]:
        """Run a sync of the selected tables through the scheduler.

        Args:
            options: Sync settings: table selection, state backend options
                and the ``deterministic_cq_id`` flag.

        Returns:
            Whatever ``Scheduler.sync`` returns for the selected resolvers.

        Raises:
            RuntimeError: If ``init`` has not been run with a connection, so
                no scheduler or client exists yet.
        """
        # Fail early with a clear message, and before a state client is
        # created, instead of hitting an AttributeError on None below.
        if self._scheduler is None or self._client is None:
            raise RuntimeError(
                "plugin is not initialized: call init() with "
                "no_connection=False before sync()"
            )

        state_client = StateClientBuilder.build(backend_options=options.backend_options)
        # Have the scheduler flush the state client once the sync is over.
        self._scheduler.set_post_sync_hook(state_client.flush)

        selected_tables = self.get_tables(
            plugin.TableOptions(
                tables=options.tables,
                skip_tables=options.skip_tables,
                skip_dependent_tables=options.skip_dependent_tables,
            )
        )
        resolvers: List[TableResolver] = [table.resolver for table in selected_tables]

        # Share the state client with each resolver and its direct children;
        # deeper descendants are not visited here.
        for resolver in resolvers:
            resolver.set_state_client(state_client)
            for child in resolver.child_resolvers:
                child.set_state_client(state_client)

        return self._scheduler.sync(
            self._client, resolvers, options.deterministic_cq_id
        )