From 388f654e8d4fb844f4d2f398a1a965f6845db1ef Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Wed, 3 Jul 2024 10:33:05 +0100 Subject: [PATCH 1/6] wip --- plugins/source/typeform/plugin/plugin.py | 7 +++++++ plugins/source/typeform/plugin/tables/form_responses.py | 2 ++ 2 files changed, 9 insertions(+) diff --git a/plugins/source/typeform/plugin/plugin.py b/plugins/source/typeform/plugin/plugin.py index 57e4904794527d..7b0a798cd8b289 100644 --- a/plugins/source/typeform/plugin/plugin.py +++ b/plugins/source/typeform/plugin/plugin.py @@ -6,6 +6,7 @@ from cloudquery.sdk import plugin from cloudquery.sdk import schema from cloudquery.sdk.scheduler import Scheduler, TableResolver +from cloudquery.sdk.stateclient.stateclient import StateClientBuilder from plugin import tables from plugin.client import Client, Spec @@ -64,6 +65,12 @@ def get_tables(self, options: plugin.TableOptions) -> List[plugin.Table]: def sync( self, options: plugin.SyncOptions ) -> Generator[message.SyncMessage, None, None]: + state_client = StateClientBuilder.build(self, options.backend_options) + self._scheduler.set_state_client(state_client) + + state_client.set_key("key", "value") + state_client.set_key("key2", "value2") + resolvers: list[TableResolver] = [] for table in self.get_tables( plugin.TableOptions( diff --git a/plugins/source/typeform/plugin/tables/form_responses.py b/plugins/source/typeform/plugin/tables/form_responses.py index 029231940b80aa..8b060d0dbd232c 100644 --- a/plugins/source/typeform/plugin/tables/form_responses.py +++ b/plugins/source/typeform/plugin/tables/form_responses.py @@ -6,6 +6,7 @@ from cloudquery.sdk.schema import Table from cloudquery.sdk.schema.resource import Resource from cloudquery.sdk.types import JSONType +from cloudquery.sdk.stateclient.stateclient import StateClient from plugin.client import Client @@ -43,6 +44,7 @@ def __init__(self, table) -> None: def resolve( self, client: Client, parent_resource: Resource ) -> Generator[Any, None, None]: + print("In FormResponsesResolver.resolve, I found this stateclient: ", self.state_client) for form_response in client.client.list_form_responses( form_id=parent_resource.item["id"] ): From 2561e0dc49495c28b63b56d2d4150897e3fa0689 Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Fri, 5 Jul 2024 17:07:06 +0100 Subject: [PATCH 2/6] Modify Typeform plugin for incremental sync. --- plugins/source/typeform/plugin/plugin.py | 13 ++++++++----- .../source/typeform/plugin/tables/form_responses.py | 13 ++++++++++--- plugins/source/typeform/plugin/tables/forms.py | 3 ++- plugins/source/typeform/plugin/typeform/client.py | 6 +++--- 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/plugins/source/typeform/plugin/plugin.py b/plugins/source/typeform/plugin/plugin.py index 7b0a798cd8b289..0e724783ce7b6c 100644 --- a/plugins/source/typeform/plugin/plugin.py +++ b/plugins/source/typeform/plugin/plugin.py @@ -65,11 +65,8 @@ def get_tables(self, options: plugin.TableOptions) -> List[plugin.Table]: def sync( self, options: plugin.SyncOptions ) -> Generator[message.SyncMessage, None, None]: - state_client = StateClientBuilder.build(self, options.backend_options) - self._scheduler.set_state_client(state_client) - - state_client.set_key("key", "value") - state_client.set_key("key2", "value2") + state_client = StateClientBuilder.build(backend_options=options.backend_options) + self._scheduler.set_post_sync_hook(state_client.flush) resolvers: list[TableResolver] = [] for table in self.get_tables( @@ -80,6 +77,12 @@ def sync( ) ): resolvers.append(table.resolver) + + for resolver in resolvers: + resolver.set_state_client(state_client) + for r in resolver.child_resolvers: + r.set_state_client(state_client) + return self._scheduler.sync( self._client, resolvers, options.deterministic_cq_id ) diff --git a/plugins/source/typeform/plugin/tables/form_responses.py b/plugins/source/typeform/plugin/tables/form_responses.py index 8b060d0dbd232c..b13d2318241e7b 100644 --- a/plugins/source/typeform/plugin/tables/form_responses.py +++ b/plugins/source/typeform/plugin/tables/form_responses.py @@ -31,10 +31,11 @@ def __init__(self) -> None: Column("tags", JSONType()), ], ) + self._resolver = FormResponsesResolver(table=self) @property def resolver(self): - return FormResponsesResolver(table=self) + return self._resolver class FormResponsesResolver(TableResolver): @@ -44,9 +45,15 @@ def __init__(self, table) -> None: def resolve( self, client: Client, parent_resource: Resource ) -> Generator[Any, None, None]: - print("In FormResponsesResolver.resolve, I found this stateclient: ", self.state_client) + since = self.state_client.get_key("typeform_form_responses_since") + for form_response in client.client.list_form_responses( - form_id=parent_resource.item["id"] + form_id=parent_resource.item["id"], + since=since, ): + if not since or form_response["submitted_at"] >= since: + since = form_response["submitted_at"] + self.state_client.set_key("typeform_form_responses_since", since) + form_response["form_id"] = parent_resource.item["id"] yield form_response diff --git a/plugins/source/typeform/plugin/tables/forms.py b/plugins/source/typeform/plugin/tables/forms.py index 5099c672ed2fc8..d54234aa14a66a 100644 --- a/plugins/source/typeform/plugin/tables/forms.py +++ b/plugins/source/typeform/plugin/tables/forms.py @@ -28,10 +28,11 @@ def __init__(self) -> None: ], relations=[FormResponses()], ) + self._resolver = FormsResolver(table=self) @property def resolver(self): - return FormsResolver(table=self) + return self._resolver class FormsResolver(TableResolver): diff --git a/plugins/source/typeform/plugin/typeform/client.py b/plugins/source/typeform/plugin/typeform/client.py index d8a2731ee340be..c9b2cad1eaf41b 100644 --- a/plugins/source/typeform/plugin/typeform/client.py +++ b/plugins/source/typeform/plugin/typeform/client.py @@ -24,8 +24,8 @@ def list_forms(self, page=1): if resp["page_count"] > page: yield from self.list_forms(page + 1) - def list_form_responses(self, form_id, page=1): - params = {"page": page, "page_size": 1000} + def list_form_responses(self, *, form_id, since, page=1): + params = {"page": page, "page_size": 1000, "since": since} resp = self._get(f"/forms/{form_id}/responses", params=params) if resp.status_code != 200: raise Exception(f"Failed to list form responses: {resp.text}") @@ -35,4 +35,4 @@ def list_form_responses(self, form_id, page=1): yield form if resp["page_count"] > page: - yield from self.list_form_responses(form_id, page + 1) + yield from self.list_form_responses(form_id=form_id, since=since, page=page + 1) From 20d34203c4384d672bbd47ee9dafe4bfc506b7e0 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 5 Jul 2024 17:35:28 +0100 Subject: [PATCH 3/6] Set is_incremental=True --- plugins/source/typeform/plugin/tables/form_responses.py | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/source/typeform/plugin/tables/form_responses.py b/plugins/source/typeform/plugin/tables/form_responses.py index b13d2318241e7b..2a372798a4a1c7 100644 --- a/plugins/source/typeform/plugin/tables/form_responses.py +++ b/plugins/source/typeform/plugin/tables/form_responses.py @@ -30,6 +30,7 @@ def __init__(self) -> None: Column("variables", JSONType()), Column("tags", JSONType()), ], + is_incremental=True, ) self._resolver = FormResponsesResolver(table=self) From 1549b9fc30fc48ba182cfdff3440fd7782188e6a Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Mon, 8 Jul 2024 10:39:13 +0100 Subject: [PATCH 4/6] Bump state client dependencies. --- plugins/source/typeform/requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/source/typeform/requirements.txt b/plugins/source/typeform/requirements.txt index bdbe1473b7f5ae..a715d7db66c74c 100644 --- a/plugins/source/typeform/requirements.txt +++ b/plugins/source/typeform/requirements.txt @@ -1,5 +1,5 @@ -cloudquery-plugin-sdk==0.1.26 +cloudquery-plugin-sdk==0.1.27 pyarrow==15.0.2 requests==2.32.3 -pytest==8.2.1 +pytest==8.2.2 pandas==2.2.2 From 8d7367a2002a8a59aeffa6df1936937ff3455bd5 Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Mon, 8 Jul 2024 10:39:59 +0100 Subject: [PATCH 5/6] Run fmt. --- plugins/source/typeform/plugin/typeform/client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plugins/source/typeform/plugin/typeform/client.py b/plugins/source/typeform/plugin/typeform/client.py index c9b2cad1eaf41b..58dc4651f82697 100644 --- a/plugins/source/typeform/plugin/typeform/client.py +++ b/plugins/source/typeform/plugin/typeform/client.py @@ -35,4 +35,6 @@ def list_form_responses(self, *, form_id, since, page=1): yield form if resp["page_count"] > page: - yield from self.list_form_responses(form_id=form_id, since=since, page=page + 1) + yield from self.list_form_responses( + form_id=form_id, since=since, page=page + 1 + ) From a73edff25e31d03b36b64f6a99ae2b782fb5ff38 Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Mon, 8 Jul 2024 12:23:41 +0100 Subject: [PATCH 6/6] Fix tests. --- plugins/source/typeform/tests/tables/test_forms.py | 3 ++- plugins/source/typeform/tests/typeform/test_client.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/plugins/source/typeform/tests/tables/test_forms.py b/plugins/source/typeform/tests/tables/test_forms.py index b96c6893f3fb87..425f7fb868e1b9 100644 --- a/plugins/source/typeform/tests/tables/test_forms.py +++ b/plugins/source/typeform/tests/tables/test_forms.py @@ -37,7 +37,8 @@ def test_forms(mock_typeform_client): ] client.list_form_responses.return_value = [ { - "response_id": "response_id", + "answers": [], + "submitted_at": "2017-09-14T22:38:22Z", }, ] diff --git a/plugins/source/typeform/tests/typeform/test_client.py b/plugins/source/typeform/tests/typeform/test_client.py index c61d02a33fe5c1..0a136c4230b0c3 100644 --- a/plugins/source/typeform/tests/typeform/test_client.py +++ b/plugins/source/typeform/tests/typeform/test_client.py @@ -116,7 +116,7 @@ def test_list_forms_responses(self): base_url="http://localhost:{}".format(self.mock_server_port), access_token="fake", ) - forms = list(client.list_form_responses("form1")) + forms = list(client.list_form_responses(form_id="form1", since=None)) assert len(forms) == 2 assert forms[0]["submitted_at"] == "2017-09-14T22:38:22Z" assert forms[1]["submitted_at"] == "2017-09-14T22:33:56Z"