Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 77 additions & 9 deletions handwritten/storage/src/bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import {CRC32CValidatorGenerator} from './crc32c.js';
import {URL} from 'url';
import {
BaseMetadata,
DeleteOptions,
SetMetadataOptions,
} from './nodejs-common/service-object.js';

Expand Down Expand Up @@ -190,6 +191,7 @@ export interface CombineOptions extends PreconditionOptions {
[key: string]: ContextValue;
} | null;
};
deleteSourceObjects?: boolean;
}

export interface CombineCallback {
Expand All @@ -198,6 +200,24 @@ export interface CombineCallback {

export type CombineResponse = [File, unknown];

export class ComposeCleanupError extends Error {
errors: Error[];
newFile: File;
apiResponse: unknown;
constructor(
message: string,
errors: Error[],
newFile: File,
apiResponse: unknown
) {
super(message);
this.name = 'ComposeCleanupError';
this.errors = errors;
this.newFile = newFile;
this.apiResponse = apiResponse;
}
}

export interface CreateChannelConfig extends WatchAllOptions {
address: string;
}
Expand Down Expand Up @@ -1579,7 +1599,9 @@ class Bucket extends ServiceObject<Bucket, BucketMetadata> {
* metadata's `kms_key_name` value, if any.
* @property {string} [userProject] The ID of the project which will be
* billed for the request.
*/
* @property {boolean} [deleteSourceObjects] If true, the source objects
* will be permanently deleted after a successful compose operation.
*/
/**
* @callback CombineCallback
* @param {?Error} err Request error, if any.
Expand Down Expand Up @@ -1612,7 +1634,8 @@ class Bucket extends ServiceObject<Bucket, BucketMetadata> {
* metadata's `kms_key_name` value, if any.
* @param {string} [options.userProject] The ID of the project which will be
* billed for the request.

* @param {boolean} [options.deleteSourceObjects] If true, the source objects
* will be permanently deleted after a successful compose operation.
* @param {CombineCallback} [callback] Callback function.
* @returns {Promise<CombineResponse>}
*
Expand Down Expand Up @@ -1709,8 +1732,17 @@ class Bucket extends ServiceObject<Bucket, BucketMetadata> {
maxRetries = 0;
}

if (options.ifGenerationMatch === undefined) {
Object.assign(options, destinationFile.instancePreconditionOpts, options);
const deleteSourceObjects = options.deleteSourceObjects;

const requestQueryObject = Object.assign({}, options);
delete requestQueryObject.deleteSourceObjects;

if (requestQueryObject.ifGenerationMatch === undefined) {
Object.assign(
requestQueryObject,
destinationFile.instancePreconditionOpts,
requestQueryObject
);
}

// Make the request from the destination File object.
Expand All @@ -1723,7 +1755,8 @@ class Bucket extends ServiceObject<Bucket, BucketMetadata> {
destination: {
contentType: destinationFile.metadata.contentType,
contentEncoding: destinationFile.metadata.contentEncoding,
contexts: options.contexts || destinationFile.metadata.contexts,
contexts:
requestQueryObject.contexts || destinationFile.metadata.contexts,
},
sourceObjects: (sources as File[]).map(source => {
const sourceObject = {
Expand All @@ -1732,24 +1765,59 @@ class Bucket extends ServiceObject<Bucket, BucketMetadata> {

if (source.metadata && source.metadata.generation) {
sourceObject.generation = parseInt(
source.metadata.generation.toString(),
source.metadata.generation.toString()
);
}

return sourceObject;
}),
},
qs: options,
qs: requestQueryObject,
},
(err, resp) => {
async (err, resp) => {
this.storage.retryOptions.autoRetry = this.instanceRetryValue;
if (err) {
callback!(err, null, resp);
return;
}

if (deleteSourceObjects) {
const deletePromises = (sources as File[]).map(async source => {
const deleteOptions: DeleteOptions = {
ignoreNotFound: true,
userProject: options.userProject,
};

const generation = source.generation ?? source.metadata?.generation;
if (generation !== undefined) {
deleteOptions.ifGenerationMatch = generation;
}

try {
await source.delete(deleteOptions);
} catch (deleteErr) {
return deleteErr as Error;
}
return null;
});
Comment thread
thiyaguk09 marked this conversation as resolved.

const results = await Promise.all(deletePromises);
const errors = results.filter((res): res is Error => res !== null);

if (errors.length > 0) {
const cleanupErr = new ComposeCleanupError(
`Compose operation succeeded, but cleaning up source objects failed. Failed to delete ${errors.length} source object(s).`,
errors,
destinationFile,
resp
);
callback!(cleanupErr, destinationFile, resp);
return;
}
}

callback!(null, destinationFile, resp);
},
}
);
}

Expand Down
1 change: 1 addition & 0 deletions handwritten/storage/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ export {
CombineCallback,
CombineOptions,
CombineResponse,
ComposeCleanupError,
CreateChannelCallback,
CreateChannelConfig,
CreateChannelOptions,
Expand Down
1 change: 1 addition & 0 deletions handwritten/storage/src/nodejs-common/service-object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ export interface CreateCallback<T> {

export type DeleteOptions = {
ignoreNotFound?: boolean;
userProject?: string;
ifGenerationMatch?: number | string;
ifGenerationNotMatch?: number | string;
ifMetagenerationMatch?: number | string;
Expand Down
144 changes: 141 additions & 3 deletions handwritten/storage/test/bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ describe('Bucket', () => {
let Bucket: any;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let bucket: any;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let ComposeCleanupError: any;

const STORAGE = {
createBucket: util.noop,
Expand All @@ -211,7 +213,7 @@ describe('Bucket', () => {
const BUCKET_NAME = 'test-bucket';

before(() => {
Bucket = proxyquire('../src/bucket.js', {
const bucketModule = proxyquire('../src/bucket.js', {
fs: fakeFs,
'p-limit': fakePLimit,
'@google-cloud/promisify': fakePromisify,
Expand All @@ -225,7 +227,9 @@ describe('Bucket', () => {
'./iam.js': {Iam: FakeIam},
'./notification.js': {Notification: FakeNotification},
'./signer.js': fakeSigner,
}).Bucket;
});
Bucket = bucketModule.Bucket;
ComposeCleanupError = bucketModule.ComposeCleanupError;
});

beforeEach(() => {
Expand Down Expand Up @@ -806,7 +810,7 @@ describe('Bucket', () => {
const destination = bucket.file('destination.txt');

destination.request = (reqOpts: DecorateRequestOptions) => {
assert.strictEqual(reqOpts.qs, options);
assert.deepStrictEqual(reqOpts.qs, options);
done();
};

Expand Down Expand Up @@ -916,6 +920,140 @@ describe('Bucket', () => {

bucket.combine(sources, destination, done);
});

it('should delete source objects if deleteSourceObjects is true', done => {
const sources = [bucket.file('1.foo'), bucket.file('2.foo')];
const destination = bucket.file('destination.foo');

// Set generation on the first file and leave second file without generation
sources[0].generation = 12345;

let deletedCount = 0;
sources[0].delete = async (opts?: any) => {
assert.strictEqual(opts?.userProject, 'user-project-id');
assert.strictEqual(opts?.ignoreNotFound, true);
assert.strictEqual(opts?.ifGenerationMatch, 12345);
deletedCount++;
return [{}];
};
sources[1].delete = async (opts?: any) => {
assert.strictEqual(opts?.userProject, 'user-project-id');
assert.strictEqual(opts?.ignoreNotFound, true);
assert.strictEqual(opts?.ifGenerationMatch, undefined);
deletedCount++;
return [{}];
};

destination.request = (reqOpts: DecorateRequestOptions, callback: Function) => {
assert.strictEqual(reqOpts.qs.deleteSourceObjects, undefined);
assert.strictEqual(reqOpts.json.deleteSourceObjects, undefined);
callback(null, {});
};

bucket.combine(
sources,
destination,
{deleteSourceObjects: true, userProject: 'user-project-id'},
(err: any) => {
assert.ifError(err);
assert.strictEqual(deletedCount, 2);
done();
}
);
});

it('should not delete source objects if deleteSourceObjects is false/omitted', done => {
const sources = [bucket.file('1.foo'), bucket.file('2.foo')];
const destination = bucket.file('destination.foo');

let deletedCount = 0;
sources.forEach(source => {
source.delete = async () => {
deletedCount++;
return [{}];
};
});

destination.request = (reqOpts: DecorateRequestOptions, callback: Function) => {
assert.strictEqual(reqOpts.json.deleteSourceObjects, undefined);
callback(null, {});
};

bucket.combine(sources, destination, (err: any) => {
assert.ifError(err);
assert.strictEqual(deletedCount, 0);
done();
});
});

it('should not delete source objects if compose operation fails', done => {
const sources = [bucket.file('1.foo'), bucket.file('2.foo')];
const destination = bucket.file('destination.foo');
const composeError = new Error('Compose failed.');

let deletedCount = 0;
sources.forEach(source => {
source.delete = async () => {
deletedCount++;
return [{}];
};
});

destination.request = (reqOpts: DecorateRequestOptions, callback: Function) => {
assert.strictEqual(reqOpts.json.deleteSourceObjects, undefined);
callback(composeError);
};

bucket.combine(sources, destination, {deleteSourceObjects: true}, (err: any) => {
assert.strictEqual(err, composeError);
assert.strictEqual(deletedCount, 0);
done();
});
});

it('should return ComposeCleanupError if deleting source objects fails', done => {
const sources = [bucket.file('1.foo'), bucket.file('2.foo')];
const destination = bucket.file('destination.foo');
const deleteError = new Error('Delete failed.');

sources[0].delete = async (opts?: any) => {
assert.strictEqual(opts?.userProject, 'user-project-id');
assert.strictEqual(opts?.ignoreNotFound, true);
throw deleteError;
};
sources[1].delete = async (opts?: any) => {
assert.strictEqual(opts?.userProject, 'user-project-id');
assert.strictEqual(opts?.ignoreNotFound, true);
return [{}];
};

destination.request = (reqOpts: DecorateRequestOptions, callback: Function) => {
assert.strictEqual(reqOpts.json.deleteSourceObjects, undefined);
callback(null, {success: true});
};

bucket.combine(
sources,
destination,
{deleteSourceObjects: true, userProject: 'user-project-id'},
(err: any, newFile: any, apiResponse: any) => {
try {
assert.ok(err instanceof ComposeCleanupError);
assert.strictEqual(err.name, 'ComposeCleanupError');
assert.deepStrictEqual((err as any).errors, [deleteError]);
assert.strictEqual((err as any).newFile, destination);
assert.deepStrictEqual((err as any).apiResponse, {success: true});

// Also check callback arguments
assert.strictEqual(newFile, destination);
assert.deepStrictEqual(apiResponse, {success: true});
done();
} catch (assertErr) {
done(assertErr);
}
}
);
});
});

describe('createChannel', () => {
Expand Down
Loading