Skip to content

Commit 3429de0

Browse files
authored
feat: MemDB read (#43)
Fixes #41
1 parent 8f21f52 commit 3429de0

File tree

2 files changed

+33
-22
lines changed

2 files changed

+33
-22
lines changed

src/grpc/plugin.ts

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,18 @@ import { Plugin } from '../plugin/plugin.js';
55
import { encodeTables } from '../schema/table.js';
66

77
export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {}
8+
export class SyncRequest extends pluginV3.cloudquery.plugin.v3.Sync.Request {}
89
export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {}
10+
export class ReadRequest extends pluginV3.cloudquery.plugin.v3.Read.Request {}
911
export class ReadResponse extends pluginV3.cloudquery.plugin.v3.Read.Response {}
1012
export class WriteRequest extends pluginV3.cloudquery.plugin.v3.Write.Request {}
1113
export class WriteResponse extends pluginV3.cloudquery.plugin.v3.Write.Response {}
1214

13-
export type SyncStream = grpc.ServerWritableStream<
14-
pluginV3.cloudquery.plugin.v3.Sync.Request,
15-
pluginV3.cloudquery.plugin.v3.Sync.Response
16-
>;
15+
export type SyncStream = grpc.ServerWritableStream<SyncRequest, SyncResponse>;
1716

18-
export type ReadStream = grpc.ServerWritableStream<
19-
pluginV3.cloudquery.plugin.v3.Read.Request,
20-
pluginV3.cloudquery.plugin.v3.Read.Response
21-
>;
17+
export type ReadStream = grpc.ServerWritableStream<ReadRequest, ReadResponse>;
2218

23-
export type WriteStream = grpc.ServerReadableStream<
24-
pluginV3.cloudquery.plugin.v3.Write.Request,
25-
pluginV3.cloudquery.plugin.v3.Write.Response
26-
>;
19+
export type WriteStream = grpc.ServerReadableStream<WriteRequest, WriteResponse>;
2720

2821
export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPluginService {
2922
// Needed due to some TypeScript nonsense

src/memdb/memdb.ts

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,8 @@
11
import { StructRowProxy } from '@apache-arrow/esnext-esm';
22
import { pluginV3 } from '@cloudquery/plugin-pb-javascript';
33

4-
import { WriteRequest, WriteStream } from '../grpc/plugin.js';
5-
import {
6-
Plugin,
7-
newUnimplementedDestination,
8-
newPlugin,
9-
SyncOptions,
10-
TableOptions,
11-
NewClientOptions,
12-
} from '../plugin/plugin.js';
4+
import { WriteRequest, WriteStream, ReadStream, ReadRequest } from '../grpc/plugin.js';
5+
import { Plugin, newPlugin, SyncOptions, TableOptions, NewClientOptions } from '../plugin/plugin.js';
136
import { sync } from '../scheduler/scheduler.js';
147
import { Table, createTable, filterTables, decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js';
158

@@ -93,7 +86,6 @@ export const newMemDBPlugin = (): Plugin => {
9386
};
9487

9588
const pluginClient = {
96-
...newUnimplementedDestination(),
9789
init: (spec: string, options: NewClientOptions) => Promise.resolve(),
9890
close: () => Promise.resolve(),
9991
tables: (options: TableOptions) => {
@@ -151,6 +143,32 @@ export const newMemDBPlugin = (): Plugin => {
151143
resolve();
152144
});
153145

146+
stream.on('error', (error) => {
147+
reject(error);
148+
});
149+
});
150+
},
151+
read(stream: ReadStream): Promise<void> {
152+
return new Promise((resolve, reject) => {
153+
stream.on('data', (request: ReadRequest) => {
154+
const table = decodeTable(request.table);
155+
156+
try {
157+
const rows = memoryDB[table.name] || [];
158+
159+
// We iterate over records in reverse here because we don't set an expectation
160+
// of ordering on plugins, and we want to make sure that the tests are not
161+
// dependent on the order of insertion either.
162+
for (let index = rows.length - 1; index >= 0; index--) {
163+
stream.write(rows[index]);
164+
}
165+
stream.end();
166+
resolve();
167+
} catch (error) {
168+
reject(error);
169+
}
170+
});
171+
154172
stream.on('error', (error) => {
155173
reject(error);
156174
});

0 commit comments

Comments
 (0)