From f0e24cd6a09caa43aad300e2c7cfb88243b29de8 Mon Sep 17 00:00:00 2001 From: Erez Rokah Date: Wed, 16 Jul 2025 09:39:37 +0100 Subject: [PATCH 1/2] fix: Dont `await` on table resovler (#283) ~~Might fix~~ Fixes https://github.com/cloudquery/cloudquery/issues/20910 --- package-lock.json | 23 +++++++++++++ package.json | 1 + src/scheduler/scheduler.ts | 67 +++++++++++++++++++++----------------- 3 files changed, 61 insertions(+), 30 deletions(-) diff --git a/package-lock.json b/package-lock.json index bf72e39..96cc0f6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,6 +22,7 @@ "modern-errors": "^7.0.0", "modern-errors-bugs": "^5.0.0", "p-map": "^7.0.0", + "p-queue": "^8.1.0", "p-timeout": "^6.1.2", "path-exists": "^5.0.0", "path-type": "^6.0.0", @@ -3309,6 +3310,12 @@ "node": ">=0.10.0" } }, + "node_modules/eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==", + "license": "MIT" + }, "node_modules/execa": { "version": "9.5.2", "resolved": "https://registry.npmjs.org/execa/-/execa-9.5.2.tgz", @@ -5380,6 +5387,22 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-queue": { + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.1.0.tgz", + "integrity": "sha512-mxLDbbGIBEXTJL0zEx8JIylaj3xQ7Z/7eEVjcF9fJX4DBiH9oqe+oahYnlKKxm0Ci9TlWTyhSHgygxMxjIB2jw==", + "license": "MIT", + "dependencies": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-timeout": { "version": "6.1.4", "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.4.tgz", diff --git a/package.json b/package.json index 5f3df3d..a0939bf 100644 --- a/package.json +++ b/package.json @@ -101,6 +101,7 @@ "modern-errors": 
"^7.0.0", "modern-errors-bugs": "^5.0.0", "p-map": "^7.0.0", + "p-queue": "^8.1.0", "p-timeout": "^6.1.2", "path-exists": "^5.0.0", "path-type": "^6.0.0", diff --git a/src/scheduler/scheduler.ts b/src/scheduler/scheduler.ts index b00b799..6e4bd79 100644 --- a/src/scheduler/scheduler.ts +++ b/src/scheduler/scheduler.ts @@ -1,6 +1,7 @@ import { Duplex } from 'node:stream'; import pMap from 'p-map'; +import pQueue from 'p-queue'; import pTimeout from 'p-timeout'; import type { Logger } from 'winston'; @@ -39,26 +40,22 @@ export enum Strategy { } class TableResolverStream extends Duplex { - queue: unknown[] = []; - constructor() { super({ objectMode: true }); } - _read() { - while (this.queue.length > 0) { - this.push(this.queue.shift()); - } - if (this.writableEnded) { - // end readable stream if writable stream has ended - this.push(null); - } - } + _read() {} _write(chunk: unknown, _: string, next: (error?: Error | null) => void) { - this.queue.push(chunk); + this.emit('data', chunk); next(); } + + end(callback?: () => void): this { + this.emit('end'); + callback?.(); + return this; + } } const validateResource = (resource: Resource) => { @@ -92,20 +89,8 @@ const resolveTable = async ( ) => { logger.info(`resolving table ${table.name}`); const stream = new TableResolverStream(); - try { - await table.resolver(client, parent, stream); - } catch (error) { - const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, { - cause: error, - props: { table, client }, - }); - logger.error(`error resolving table ${table.name}`, tableError); - return; - } finally { - stream.end(); - } - for await (const data of stream) { + const processData = async (data: unknown) => { logger.debug(`resolving resource for table ${table.name}`); const resolveResourceTimeout = 10 * 60 * 1000; const resource = new Resource(table, parent, data); @@ -118,7 +103,7 @@ const resolveTable = async ( props: { resource, table, client }, }); logger.error(preResolverError); - 
continue; + return; } try { @@ -128,7 +113,7 @@ const resolveTable = async ( await pTimeout(allColumnsPromise, { milliseconds: resolveResourceTimeout }); } catch (error) { logger.error(`error resolving columns for table ${table.name}`, error); - continue; + return; } try { @@ -139,7 +124,7 @@ const resolveTable = async ( props: { resource, table, client }, }); logger.error(postResolveError); - continue; + return; } setCQId(resource, deterministicCQId); @@ -148,7 +133,7 @@ const resolveTable = async ( validateResource(resource); } catch (error) { logger.error(error); - continue; + return; } try { @@ -161,7 +146,7 @@ const resolveTable = async ( }, }); logger.error(encodeError); - continue; + return; } logger.debug(`done resolving resource for table ${table.name}`); @@ -169,6 +154,28 @@ const resolveTable = async ( await pMap(table.relations, (child) => resolveTable(logger, client, child, resource, syncStream, deterministicCQId), ); + }; + + const queue = new pQueue({ concurrency: 5 }); + + stream.on('data', async (data) => { + await queue.add(() => processData(data)); + }); + + const resolverPromise = table.resolver(client, parent, stream); + + try { + await resolverPromise; + } catch (error) { + const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, { + cause: error, + props: { table, client }, + }); + logger.error(`error resolving table ${table.name}`, tableError); + return; + } finally { + stream.end(); + await queue.onIdle(); } logger.info(`done resolving table ${table.name}`); From 209e595ab65e6831dc6edc50c10a1e51a8173853 Mon Sep 17 00:00:00 2001 From: CloudQuery Bot <102256036+cq-bot@users.noreply.github.com> Date: Wed, 16 Jul 2025 09:44:48 +0100 Subject: [PATCH 2/2] chore(main): Release v0.1.28 (#287) :robot: I have created a release *beep* *boop* --- ## [0.1.28](https://github.com/cloudquery/plugin-sdk-javascript/compare/v0.1.27...v0.1.28) (2025-07-16) ### Bug Fixes * Dont `await` on table resovler 
([#283](https://github.com/cloudquery/plugin-sdk-javascript/issues/283)) ([f0e24cd](https://github.com/cloudquery/plugin-sdk-javascript/commit/f0e24cd6a09caa43aad300e2c7cfb88243b29de8)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please). --- .release-please-manifest.json | 2 +- CHANGELOG.md | 7 +++++++ package-lock.json | 4 ++-- package.json | 2 +- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/.release-please-manifest.json b/.release-please-manifest.json index fb4a2b8..83f79df 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1,3 +1,3 @@ { - ".": "0.1.27" + ".": "0.1.28" } diff --git a/CHANGELOG.md b/CHANGELOG.md index 2775bd2..3463d92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [0.1.28](https://github.com/cloudquery/plugin-sdk-javascript/compare/v0.1.27...v0.1.28) (2025-07-16) + + +### Bug Fixes + +* Don't `await` on table resolver ([#283](https://github.com/cloudquery/plugin-sdk-javascript/issues/283)) ([f0e24cd](https://github.com/cloudquery/plugin-sdk-javascript/commit/f0e24cd6a09caa43aad300e2c7cfb88243b29de8)) + ## [0.1.27](https://github.com/cloudquery/plugin-sdk-javascript/compare/v0.1.26...v0.1.27) (2025-07-01) diff --git a/package-lock.json b/package-lock.json index 96cc0f6..75085c1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@cloudquery/plugin-sdk-javascript", - "version": "0.1.27", + "version": "0.1.28", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@cloudquery/plugin-sdk-javascript", - "version": "0.1.27", + "version": "0.1.28", "license": "MPL-2.0", "dependencies": { "@apache-arrow/esnext-esm": "^19.0.0", diff --git a/package.json b/package.json index a0939bf..7ee9ba7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@cloudquery/plugin-sdk-javascript", - "version": 
"0.1.27", + "version": "0.1.28", "files": [ "dist", "!dist/**/*.test.*",