|
1 | 1 | import { StructRowProxy } from '@apache-arrow/esnext-esm'; |
2 | 2 | import { pluginV3 } from '@cloudquery/plugin-pb-javascript'; |
3 | 3 |
|
4 | | -import { WriteRequest, WriteStream } from '../grpc/plugin.js'; |
5 | | -import { |
6 | | - Plugin, |
7 | | - newUnimplementedDestination, |
8 | | - newPlugin, |
9 | | - SyncOptions, |
10 | | - TableOptions, |
11 | | - NewClientOptions, |
12 | | -} from '../plugin/plugin.js'; |
| 4 | +import { WriteRequest, WriteStream, ReadStream, ReadRequest } from '../grpc/plugin.js'; |
| 5 | +import { Plugin, newPlugin, SyncOptions, TableOptions, NewClientOptions } from '../plugin/plugin.js'; |
13 | 6 | import { sync } from '../scheduler/scheduler.js'; |
14 | 7 | import { Table, createTable, filterTables, decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js'; |
15 | 8 |
|
@@ -93,7 +86,6 @@ export const newMemDBPlugin = (): Plugin => { |
93 | 86 | }; |
94 | 87 |
|
95 | 88 | const pluginClient = { |
96 | | - ...newUnimplementedDestination(), |
97 | 89 | init: (spec: string, options: NewClientOptions) => Promise.resolve(), |
98 | 90 | close: () => Promise.resolve(), |
99 | 91 | tables: (options: TableOptions) => { |
@@ -151,6 +143,32 @@ export const newMemDBPlugin = (): Plugin => { |
151 | 143 | resolve(); |
152 | 144 | }); |
153 | 145 |
|
| 146 | + stream.on('error', (error) => { |
| 147 | + reject(error); |
| 148 | + }); |
| 149 | + }); |
| 150 | + }, |
| 151 | + read(stream: ReadStream): Promise<void> { |
| 152 | + return new Promise((resolve, reject) => { |
| 153 | + stream.on('data', (request: ReadRequest) => { |
| 154 | + const table = decodeTable(request.table); |
| 155 | + |
| 156 | + try { |
| 157 | + const rows = memoryDB[table.name] || []; |
| 158 | + |
| 159 | + // We iterate over records in reverse here because we don't set an expectation |
| 160 | + // of ordering on plugins, and we want to make sure that the tests are not |
| 161 | + // dependent on the order of insertion either. |
| 162 | + for (let index = rows.length - 1; index >= 0; index--) { |
| 163 | + stream.write(rows[index]); |
| 164 | + } |
| 165 | + stream.end(); |
| 166 | + resolve(); |
| 167 | + } catch (error) { |
| 168 | + reject(error); |
| 169 | + } |
| 170 | + }); |
| 171 | + |
154 | 172 | stream.on('error', (error) => { |
155 | 173 | reject(error); |
156 | 174 | }); |
|
0 commit comments