Skip to content

Commit fcc49aa

Browse files
committed
feat(databases): Add support for fast bulk operations
1 parent a443673 commit fcc49aa

File tree

11 files changed

+172
-25
lines changed

11 files changed

+172
-25
lines changed

docs/api/databases/common.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,18 @@ Disabling or changing the default pagination is not available in the client. Onl
102102

103103
</BlockQuote>
104104

105+
## Bulk updates
106+
107+
Some database adapters allow to set the `params.bulk` option to perform fast `create`, `patch` or `remove` operations for a large amount of data. Setting `params.bulk = true` will always return no data (an empty array `[]`) and not send any real-time events.
108+
109+
```ts
110+
const manyTodos = await readCSVFile('todos.csv')
111+
112+
await app.service('todos').create(manyTodos, {
113+
bulk: true
114+
}) // -> []
115+
```
116+
105117
## Extending Adapters
106118

107119
There are two ways to extend existing database adapters. Either by extending the base class or by adding functionality through hooks.

packages/adapter-commons/src/declarations.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ export interface AdapterParams<
7070
> extends Params<Q> {
7171
adapter?: A
7272
paginate?: PaginationParams
73+
bulk?: boolean
7374
}
7475

7576
/**

packages/adapter-commons/src/service.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ export abstract class AdapterBase<
5757
* @returns Wether or not multiple updates are allowed.
5858
*/
5959
allowsMulti(method: string, params: ServiceParams = {} as ServiceParams) {
60+
if (params.bulk) {
61+
return true
62+
}
63+
6064
const always = alwaysMulti[method]
6165

6266
if (typeof always !== 'undefined') {

packages/adapter-tests/src/declarations.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export type AdapterMethodsTestName =
3333
| '.remove'
3434
| '.remove + $select'
3535
| '.remove + id + query'
36+
| '.remove bulk'
3637
| '.remove + multi'
3738
| '.remove + multi no pagination'
3839
| '.remove + id + query id'
@@ -49,12 +50,14 @@ export type AdapterMethodsTestName =
4950
| '.patch multiple no pagination'
5051
| '.patch multi query same'
5152
| '.patch multi query changed'
53+
| '.patch bulk'
5254
| '.patch + NotFound'
5355
| '.patch + query + NotFound'
5456
| '.patch + id + query id'
5557
| '.create'
5658
| '.create + $select'
5759
| '.create multi'
60+
| '.create bulk'
5861
| 'internal .find'
5962
| 'internal .get'
6063
| 'internal .create'

packages/adapter-tests/src/methods.ts

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,28 @@ export default (test: AdapterMethodsTest, app: any, _errors: any, serviceName: s
114114
}
115115
})
116116

117+
test('.remove bulk', async () => {
118+
await service.create({ name: 'Dave', age: 29, created: true })
119+
await service.create({
120+
name: 'David',
121+
age: 3,
122+
created: true
123+
})
124+
125+
const data = await service.remove(null, {
126+
query: { created: true },
127+
bulk: true
128+
})
129+
130+
assert.deepStrictEqual(data, [])
131+
132+
const found = await service.find({
133+
query: { created: true }
134+
})
135+
136+
assert.strictEqual(found.length, 0)
137+
})
138+
117139
test('.remove + multi', async () => {
118140
try {
119141
await service.remove(null)
@@ -398,6 +420,39 @@ export default (test: AdapterMethodsTest, app: any, _errors: any, serviceName: s
398420
await service.remove(david[idProp])
399421
})
400422

423+
test('.patch bulk', async () => {
424+
const dave = await service.create({
425+
name: 'Dave',
426+
age: 29,
427+
created: true
428+
})
429+
const david = await service.create({
430+
name: 'David',
431+
age: 3,
432+
created: true
433+
})
434+
435+
const data = await service.patch(
436+
null,
437+
{
438+
age: 2
439+
},
440+
{
441+
query: { created: true },
442+
bulk: true
443+
}
444+
)
445+
446+
assert.deepStrictEqual(data, [])
447+
448+
const daveAfter = await service.get(dave[idProp])
449+
450+
assert.strictEqual(daveAfter.age, 2, 'Dave age was updated')
451+
452+
await service.remove(dave[idProp])
453+
await service.remove(david[idProp])
454+
})
455+
401456
test('.patch multiple no pagination', async () => {
402457
try {
403458
await service.remove(doug[idProp])
@@ -643,6 +698,36 @@ export default (test: AdapterMethodsTest, app: any, _errors: any, serviceName: s
643698
await service.remove(data[0][idProp])
644699
await service.remove(data[1][idProp])
645700
})
701+
702+
test('.create bulk', async () => {
703+
const items = [
704+
{
705+
name: 'Gerald',
706+
age: 18
707+
},
708+
{
709+
name: 'Herald',
710+
age: 18
711+
}
712+
]
713+
714+
const data = await service.create(items, {
715+
bulk: true
716+
})
717+
718+
assert.deepStrictEqual(data, [])
719+
720+
const foundItems = await service.find({
721+
query: { age: 18 }
722+
})
723+
724+
assert.strictEqual(foundItems.length, 2)
725+
726+
await service.remove(null, {
727+
query: { age: 18 },
728+
bulk: true
729+
})
730+
})
646731
})
647732

648733
describe("doesn't call public methods internally", () => {

packages/knex/src/adapter.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,15 @@ export class KnexAdapter<
223223
): Promise<Result | Result[]> {
224224
const data = _data as any
225225

226+
if (params.bulk) {
227+
const res = await this.db(params)
228+
.insert(data)
229+
.then(() => [])
230+
.catch(errorHandler)
231+
232+
return res as Result[]
233+
}
234+
226235
if (Array.isArray(data)) {
227236
return Promise.all(data.map((current) => this._create(current, params)))
228237
}
@@ -252,6 +261,12 @@ export class KnexAdapter<
252261
}
253262

254263
const data = _.omit(raw, this.id)
264+
265+
if (params.bulk) {
266+
await this.createQuery(params).update(data)
267+
return []
268+
}
269+
255270
const results = await this._findOrGet(id, {
256271
...params,
257272
query: {
@@ -313,6 +328,11 @@ export class KnexAdapter<
313328
throw new MethodNotAllowed('Can not remove multiple entries')
314329
}
315330

331+
if (params.bulk) {
332+
await this.createQuery(params).del().catch(errorHandler)
333+
return []
334+
}
335+
316336
const items = await this._findOrGet(id, params)
317337
const { query } = this.filterQuery(params)
318338
const q = this.db(params)

packages/knex/test/index.test.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const testSuite = adapterTests([
2727
'.remove + id + query',
2828
'.remove + multi',
2929
'.remove + multi no pagination',
30+
'.remove bulk',
3031
'.remove + id + query id',
3132
'.update',
3233
'.update + $select',
@@ -38,6 +39,7 @@ const testSuite = adapterTests([
3839
'.patch + $select',
3940
'.patch + id + query',
4041
'.patch multiple',
42+
'.patch bulk',
4143
'.patch multiple no pagination',
4244
'.patch multi query same',
4345
'.patch multi query changed',
@@ -47,6 +49,7 @@ const testSuite = adapterTests([
4749
'.create',
4850
'.create + $select',
4951
'.create multi',
52+
'.create bulk',
5053
'internal .find',
5154
'internal .get',
5255
'internal .create',

packages/memory/src/index.ts

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import {
66
AdapterBase,
77
AdapterServiceOptions,
88
PaginationOptions,
9-
AdapterParams
9+
AdapterParams,
10+
AdapterQuery
1011
} from '@feathersjs/adapter-commons'
1112
import sift from 'sift'
1213
import { NullableId, Id, Params, Paginated } from '@feathersjs/feathers'
@@ -28,10 +29,12 @@ const _select = (data: any, params: any, ...args: any[]) => {
2829
return base(JSON.parse(JSON.stringify(data)))
2930
}
3031

32+
export type MemoryAdapterParams<Q = AdapterQuery> = AdapterParams<Q, Partial<MemoryServiceOptions>>
33+
3134
export class MemoryAdapter<
3235
Result = any,
3336
Data = Partial<Result>,
34-
ServiceParams extends Params = Params,
37+
ServiceParams extends MemoryAdapterParams = MemoryAdapterParams,
3538
PatchData = Partial<Data>
3639
> extends AdapterBase<Result, Data, PatchData, ServiceParams, MemoryServiceOptions<Result>> {
3740
store: MemoryServiceStore<Result>
@@ -145,18 +148,18 @@ export class MemoryAdapter<
145148
async _create(data: Partial<Data>[], params?: ServiceParams): Promise<Result[]>
146149
async _create(data: Partial<Data> | Partial<Data>[], _params?: ServiceParams): Promise<Result | Result[]>
147150
async _create(
148-
data: Partial<Data> | Partial<Data>[],
151+
_data: Partial<Data> | Partial<Data>[],
149152
params: ServiceParams = {} as ServiceParams
150153
): Promise<Result | Result[]> {
151-
if (Array.isArray(data)) {
152-
return Promise.all(data.map((current) => this._create(current, params)))
153-
}
154+
const payload = Array.isArray(_data) ? _data : [_data]
155+
const results = payload.map((value) => {
156+
const id = (value as any)[this.id] || this._uId++
157+
const current = _.extend({}, value, { [this.id]: id })
154158

155-
const id = (data as any)[this.id] || this._uId++
156-
const current = _.extend({}, data, { [this.id]: id })
157-
const result = (this.store[id] = current)
159+
return _select((this.store[id] = current), params, this.id)
160+
})
158161

159-
return _select(result, params, this.id) as Result
162+
return params.bulk ? [] : Array.isArray(_data) ? results : results[0]
160163
}
161164

162165
async _update(id: Id, data: Data, params: ServiceParams = {} as ServiceParams): Promise<Result> {
@@ -202,11 +205,12 @@ export class MemoryAdapter<
202205
...params,
203206
query
204207
})
208+
const results = entries.map(patchEntry)
205209

206-
return entries.map(patchEntry)
210+
return params.bulk ? [] : results
207211
}
208212

209-
return patchEntry(await this._get(id, params)) // Will throw an error if not found
213+
return params.bulk ? [] : patchEntry(await this._get(id, params)) // Will throw an error if not found
210214
}
211215

212216
async _remove(id: null, params?: ServiceParams): Promise<Result[]>
@@ -225,7 +229,9 @@ export class MemoryAdapter<
225229
query
226230
})
227231

228-
return Promise.all(entries.map((current: any) => this._remove(current[this.id] as Id, params)))
232+
entries.forEach((current: any) => delete this.store[(current as any)[this.id]])
233+
234+
return params.bulk ? [] : entries
229235
}
230236

231237
const entry = await this._get(id, params)

packages/memory/test/index.test.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const testSuite = adapterTests([
2626
'.remove + id + query',
2727
'.remove + multi',
2828
'.remove + multi no pagination',
29+
'.remove bulk',
2930
'.remove + id + query id',
3031
'.update',
3132
'.update + $select',
@@ -40,12 +41,14 @@ const testSuite = adapterTests([
4041
'.patch multiple no pagination',
4142
'.patch multi query same',
4243
'.patch multi query changed',
44+
'.patch bulk',
4345
'.patch + query + NotFound',
4446
'.patch + NotFound',
4547
'.patch + id + query id',
4648
'.create',
4749
'.create + $select',
4850
'.create multi',
51+
'.create bulk',
4952
'internal .find',
5053
'internal .get',
5154
'internal .create',

packages/mongodb/src/adapter.ts

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,6 @@ export class MongoDbAdapter<
261261
data: Data | Data[],
262262
params: ServiceParams = {} as ServiceParams
263263
): Promise<Result | Result[]> {
264-
const writeOptions = params.mongodb
265264
const model = await this.getModel(params)
266265
const setId = (item: any) => {
267266
const entry = Object.assign({}, item)
@@ -279,14 +278,14 @@ export class MongoDbAdapter<
279278

280279
const promise = Array.isArray(data)
281280
? model
282-
.insertMany(data.map(setId), writeOptions)
281+
.insertMany(data.map(setId), params.mongodb)
283282
.then(async (result) =>
284-
Promise.all(
285-
Object.values(result.insertedIds).map(async (_id) => model.findOne({ _id }, params.mongodb))
286-
)
283+
params.bulk
284+
? []
285+
: model.find({ _id: { $in: Object.values(result.insertedIds) } }, params.mongodb).toArray()
287286
)
288287
: model
289-
.insertOne(setId(data), writeOptions)
288+
.insertOne(setId(data), params.mongodb)
290289
.then(async (result) => model.findOne({ _id: result.insertedId }, params.mongodb))
291290

292291
return promise.then(select(params, this.id)).catch(errorHandler)
@@ -325,6 +324,12 @@ export class MongoDbAdapter<
325324

326325
return current
327326
}, {} as any)
327+
328+
if (params.bulk) {
329+
await model.updateMany(query, modifier, updateOptions)
330+
return []
331+
}
332+
328333
const originalIds = await this._findOrGet(id, {
329334
...params,
330335
query: {
@@ -389,11 +394,13 @@ export class MongoDbAdapter<
389394
}
390395
}
391396

392-
return this._findOrGet(id, findParams)
393-
.then(async (items) => {
394-
await model.deleteMany(query, deleteOptions)
395-
return items
396-
})
397-
.catch(errorHandler)
397+
return params.bulk
398+
? model.deleteMany(query, deleteOptions).then(() => [])
399+
: this._findOrGet(id, findParams)
400+
.then(async (items) => {
401+
await model.deleteMany(query, deleteOptions)
402+
return items
403+
})
404+
.catch(errorHandler)
398405
}
399406
}

0 commit comments

Comments
 (0)