diff --git a/lib/internal/fs/promises.js b/lib/internal/fs/promises.js index 720bd1319b381f..2dfe9d8b518df6 100644 --- a/lib/internal/fs/promises.js +++ b/lib/internal/fs/promises.js @@ -107,7 +107,11 @@ const EventEmitter = require('events'); const { StringDecoder } = require('string_decoder'); const { kFSWatchStart, watch } = require('internal/fs/watchers'); const nonNativeWatcher = require('internal/fs/recursive_watch'); -const { isIterable } = require('internal/streams/utils'); +const { + isIterable, + isReadableErrored, + isReadableNodeStream, +} = require('internal/streams/utils'); const assert = require('internal/assert'); const permission = require('internal/process/permission'); @@ -1116,24 +1120,60 @@ function checkAborted(signal) { throw new AbortError(undefined, { cause: signal.reason }); } -async function writeFileHandle(filehandle, data, signal, encoding) { - checkAborted(signal); +function makeWriteFileStreamErrorHandler(data) { + if (!isReadableNodeStream(data)) + return undefined; + + let error; + let errored = false; + function onError(err) { + error = err; + errored = true; + } + const streamError = isReadableErrored(data); + if (streamError != null) + onError(streamError); + data.on('error', onError); + + return { + __proto__: null, + check() { + if (errored) + throw error; + }, + cleanup() { + data.removeListener('error', onError); + }, + }; +} + +async function writeFileHandle(filehandle, data, signal, encoding, streamErrorHandler) { if (isCustomIterable(data)) { - for await (const buf of data) { + streamErrorHandler ??= makeWriteFileStreamErrorHandler(data); + try { checkAborted(signal); - const toWrite = - isArrayBufferView(buf) ? buf : Buffer.from(buf, encoding || 'utf8'); - let remaining = toWrite.byteLength; - while (remaining > 0) { - const writeSize = MathMin(kWriteFileMaxChunkSize, remaining); - const { bytesWritten } = await write( - filehandle, toWrite, toWrite.byteLength - remaining, writeSize); - remaining -= bytesWritten; + streamErrorHandler?.check(); + for await (const buf of data) { checkAborted(signal); + streamErrorHandler?.check(); + const toWrite = + isArrayBufferView(buf) ? buf : Buffer.from(buf, encoding || 'utf8'); + let remaining = toWrite.byteLength; + while (remaining > 0) { + const writeSize = MathMin(kWriteFileMaxChunkSize, remaining); + const { bytesWritten } = await write( + filehandle, toWrite, toWrite.byteLength - remaining, writeSize); + remaining -= bytesWritten; + checkAborted(signal); + streamErrorHandler?.check(); + } } + } finally { + streamErrorHandler?.cleanup(); } return; } + checkAborted(signal); data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength); let remaining = data.byteLength; if (remaining === 0) return; @@ -1891,13 +1931,23 @@ async function writeFile(path, data, options) { } validateAbortSignal(options.signal); + checkAborted(options.signal); + const streamErrorHandler = makeWriteFileStreamErrorHandler(data); + if (path instanceof FileHandle) - return writeFileHandle(path, data, options.signal, options.encoding); + return writeFileHandle( + path, data, options.signal, options.encoding, streamErrorHandler); - checkAborted(options.signal); + let fd; + try { + fd = await open(path, flag, options.mode); + } catch (err) { + streamErrorHandler?.cleanup(); + throw err; + } - const fd = await open(path, flag, options.mode); - let writeOp = writeFileHandle(fd, data, options.signal, options.encoding); + let writeOp = writeFileHandle( + fd, data, options.signal, options.encoding, streamErrorHandler); if (flush) { writeOp = handleFdSync(writeOp, fd); diff --git a/test/parallel/test-fs-promises-file-handle-writeFile.js b/test/parallel/test-fs-promises-file-handle-writeFile.js index 2c1a80e4f52d49..7635ec57567db0 100644 --- a/test/parallel/test-fs-promises-file-handle-writeFile.js +++ b/test/parallel/test-fs-promises-file-handle-writeFile.js @@ -48,6 +48,7 @@ async function doWriteAndCancel() { const dest = path.resolve(tmpDir, 'tmp.txt'); const otherDest = path.resolve(tmpDir, 'tmp-2.txt'); +const errorDest = path.resolve(tmpDir, 'tmp-error.txt'); const stream = Readable.from(['a', 'b', 'c']); const stream2 = Readable.from(['ümlaut', ' ', 'sechzig']); const iterable = { @@ -65,6 +66,15 @@ function iterableWith(value) { } }; } + +function createEarlyErrorStream(error) { + const stream = new Readable({ + read() {} + }); + process.nextTick(() => stream.destroy(error)); + return stream; +} + const bufferIterable = { expected: 'abc', *[Symbol.iterator]() { @@ -94,6 +104,25 @@ async function doWriteStream() { } } +async function doWriteStreamError() { + const fileHandle = await open(errorDest, 'w+'); + const error = new Error('early file handle writeFile stream error'); + const stream = createEarlyErrorStream(error); + const uncaughtException = common.mustNotCall( + 'stream errors should reject FileHandle.writeFile()'); + + process.once('uncaughtException', uncaughtException); + try { + await assert.rejects( + fileHandle.writeFile(stream), + { message: error.message } + ); + } finally { + process.removeListener('uncaughtException', uncaughtException); + await fileHandle.close(); + } +} + async function doWriteStreamWithCancel() { const controller = new AbortController(); const { signal } = controller; @@ -190,6 +219,7 @@ async function doWriteInvalidValues() { await validateWriteFile(); await doWriteAndCancel(); await doWriteStream(); + await doWriteStreamError(); await doWriteStreamWithCancel(); await doWriteIterable(); await doWriteInvalidIterable(); diff --git a/test/parallel/test-fs-promises-writefile.js b/test/parallel/test-fs-promises-writefile.js index 25df61b2b48414..4e90de47352d2b 100644 --- a/test/parallel/test-fs-promises-writefile.js +++ b/test/parallel/test-fs-promises-writefile.js @@ -13,6 +13,7 @@ tmpdir.refresh(); const dest = path.resolve(tmpDir, 'tmp.txt'); const otherDest = path.resolve(tmpDir, 'tmp-2.txt'); +const errorDest = path.resolve(tmpDir, 'tmp-error.txt'); const buffer = Buffer.from('abc'.repeat(1000)); const buffer2 = Buffer.from('xyz'.repeat(1000)); const stream = Readable.from(['a', 'b', 'c']); @@ -40,6 +41,15 @@ function iterableWith(value) { } }; } + +function createEarlyErrorStream(error) { + const stream = new Readable({ + read() {} + }); + process.nextTick(() => stream.destroy(error)); + return stream; +} + const bufferIterable = { expected: 'abc', *[Symbol.iterator]() { @@ -70,6 +80,34 @@ async function doWriteStream() { assert.deepStrictEqual(data, expected); } +async function doWriteStreamError() { + const error = new Error('early writeFile stream error'); + const stream = createEarlyErrorStream(error); + const uncaughtException = common.mustNotCall( + 'stream errors should reject writeFile()'); + + process.once('uncaughtException', uncaughtException); + try { + await assert.rejects( + fsPromises.writeFile(errorDest, stream), + { message: error.message } + ); + assert.strictEqual(stream.listenerCount('error'), 0); + } finally { + process.removeListener('uncaughtException', uncaughtException); + } +} + +async function doWriteStreamOpenError() { + const stream = Readable.from(['a']); + + await assert.rejects( + fsPromises.writeFile(path.resolve(tmpDir, 'not-found', 'tmp.txt'), stream), + { code: 'ENOENT' } + ); + assert.strictEqual(stream.listenerCount('error'), 0); +} + async function doWriteStreamWithCancel() { const controller = new AbortController(); const { signal } = controller; @@ -168,6 +206,8 @@ async function doReadWithEncoding() { await doRead(); await doReadWithEncoding(); await doWriteStream(); + await doWriteStreamError(); + await doWriteStreamOpenError(); await doWriteStreamWithCancel(); await doWriteIterable(); await doWriteInvalidIterable();