1+ import { StructRowProxy } from '@apache-arrow/esnext-esm' ;
2+ import { pluginV3 } from '@cloudquery/plugin-pb-javascript' ;
3+
4+ import { WriteRequest , WriteStream } from '../grpc/plugin.js' ;
15import {
26 Plugin ,
37 newUnimplementedDestination ,
@@ -7,20 +11,87 @@ import {
711 NewClientOptions ,
812} from '../plugin/plugin.js' ;
913import { sync } from '../scheduler/scheduler.js' ;
10- import { Table , createTable , filterTables } from '../schema/table.js' ;
14+ import { Table , createTable , filterTables , decodeTable , decodeRecord , getPrimaryKeys } from '../schema/table.js' ;
1115
1216export const createMemDBClient = ( ) => {
13- return { id : ( ) => 'memdb' } ;
17+ //eslint-disable-next-line @typescript-eslint/no-explicit-any
18+ const memoryDB : Record < string , any [ ] > = { } ;
19+ const tables : Record < string , Table > = { } ;
20+ return {
21+ id : ( ) => 'memdb' ,
22+ memoryDB,
23+ tables,
24+ } ;
1425} ;
1526
1627export const newMemDBPlugin = ( ) : Plugin => {
1728 const memdbClient = createMemDBClient ( ) ;
29+ const memoryDB = memdbClient . memoryDB ;
30+ const tables = memdbClient . tables ;
1831
1932 const allTables : Table [ ] = [
2033 createTable ( { name : 'table1' , title : 'Table 1' , description : 'Table 1 description' } ) ,
2134 createTable ( { name : 'table2' , title : 'Table 2' , description : 'Table 2 description' } ) ,
2235 ] ;
2336
37+ const memdb : { inserts : unknown [ ] ; [ key : string ] : unknown } = {
38+ inserts : [ ] ,
39+ ...memoryDB ,
40+ } ;
41+
42+ //eslint-disable-next-line @typescript-eslint/no-explicit-any
43+ const overwrite = ( table : Table , primaryKeys : string [ ] , record : StructRowProxy < any > ) => {
44+ const tableData = memoryDB [ table . name ] || [ ] ;
45+
46+ if ( primaryKeys . length === 0 ) {
47+ // If there are no primary keys, simply append the data
48+ tableData . push ( record ) ;
49+ memoryDB [ table . name ] = tableData ;
50+ return ;
51+ }
52+
53+ // Otherwise, perform an upsert based on the primary keys
54+ const recordIndex = tableData . findIndex ( ( existingRecord ) => {
55+ return primaryKeys . every ( ( key ) => existingRecord [ key ] === record [ key ] ) ;
56+ } ) ;
57+
58+ if ( recordIndex > - 1 ) {
59+ // If record exists, update (overwrite) it
60+ tableData [ recordIndex ] = record ;
61+ } else {
62+ // If record doesn't exist, insert it
63+ tableData . push ( record ) ;
64+ }
65+
66+ memoryDB [ table . name ] = tableData ; // Update the memoryDB with the modified table data
67+ } ;
68+
69+ const deleteStale = ( message : pluginV3 . cloudquery . plugin . v3 . Write . MessageDeleteStale ) : void => {
70+ const tableName = message . table_name ;
71+
72+ // Filter the table based on the provided criteria
73+ const filteredTable = memoryDB [ tableName ] . filter ( ( row ) => {
74+ const sc = row . Schema ( ) ;
75+
76+ const sourceColIndex = sc . FieldIndices ( 'source_name_column' ) ;
77+ const syncColIndex = sc . FieldIndices ( 'sync_time_column' ) ;
78+
79+ // Ensure both columns are present
80+ if ( sourceColIndex === undefined || syncColIndex === undefined ) {
81+ return true ; // Keep the record if either column is missing
82+ }
83+
84+ const rowSourceName = row . Column ( sourceColIndex ) . Value ( 0 ) ;
85+ const rowSyncTime = row . Column ( syncColIndex ) . Value ( 0 ) ; // Assuming it returns a Date object
86+
87+ // If source names match and the record's sync time is not before the given sync time, keep the record
88+ return rowSourceName === message . source_name && ! rowSyncTime . before ( message . sync_time ) ;
89+ } ) ;
90+
91+ // Update the memory database with the filtered table
92+ memoryDB [ tableName ] = filteredTable ;
93+ } ;
94+
2495 const pluginClient = {
2596 ...newUnimplementedDestination ( ) ,
2697 init : ( spec : string , options : NewClientOptions ) => Promise . resolve ( ) ,
@@ -35,6 +106,56 @@ export const newMemDBPlugin = (): Plugin => {
35106 const filtered = filterTables ( allTables , tables , skipTables , skipDependentTables ) ;
36107 return await sync ( memdbClient , filtered , stream , { deterministicCQId } ) ;
37108 } ,
109+ write ( stream : WriteStream ) : Promise < void > {
110+ return new Promise ( ( resolve , reject ) => {
111+ stream . on ( 'data' , ( request : WriteRequest ) => {
112+ switch ( request . message ) {
113+ case 'migrate_table' : {
114+ // Update table schema in the `tables` map
115+ const table = decodeTable ( request . migrate_table . table ) ;
116+ tables [ table . name ] = table ;
117+ break ;
118+ }
119+
120+ case 'insert' : {
121+ const [ tableName , batches ] = decodeRecord ( request . insert . record ) ;
122+
123+ if ( ! memoryDB [ tableName ] ) {
124+ memoryDB [ tableName ] = [ ] ;
125+ }
126+
127+ const tableSchema = tables [ tableName ] ;
128+ const pks = getPrimaryKeys ( tableSchema ) ;
129+
130+ for ( const batch of batches ) {
131+ //eslint-disable-next-line unicorn/no-array-for-each
132+ for ( const record of batch ) {
133+ overwrite ( tableSchema , pks , record ) ;
134+ }
135+ }
136+ break ;
137+ }
138+
139+ case 'delete' : {
140+ deleteStale ( request . delete ) ;
141+ break ;
142+ }
143+
144+ default : {
145+ throw new Error ( `Unknown request message type: ${ request . message } ` ) ;
146+ }
147+ }
148+ } ) ;
149+
150+ stream . on ( 'finish' , ( ) => {
151+ resolve ( ) ;
152+ } ) ;
153+
154+ stream . on ( 'error' , ( error ) => {
155+ reject ( error ) ;
156+ } ) ;
157+ } ) ;
158+ } ,
38159 } ;
39160
40161 return newPlugin ( 'memdb' , '0.0.1' , ( ) => Promise . resolve ( pluginClient ) ) ;
0 commit comments