Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fs: add stream utilities to FileHandle
  • Loading branch information
aduh95 committed Sep 8, 2021
commit 51c212d4406171aa47249b70bf2049a980e3f5fc
109 changes: 109 additions & 0 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,80 @@ If one or more `filehandle.read()` calls are made on a file handle and then a
position till the end of the file. It doesn't always read from the beginning
of the file.

#### `filehandle.readStream([options])`
Comment thread
aduh95 marked this conversation as resolved.
Outdated
<!-- YAML
added: REPLACEME
-->

* `options` {Object}
* `flags` {string} See [support of file system `flags`][]. **Default:**
`'r'`.
* `encoding` {string} **Default:** `null`
* `autoClose` {boolean} **Default:** `true`
* `emitClose` {boolean} **Default:** `true`
* `start` {integer}
* `end` {integer} **Default:** `Infinity`
* `highWaterMark` {integer} **Default:** `64 * 1024`
* `fs` {Object|null} **Default:** `null`
* Returns: {fs.ReadStream} See [Readable Stream][].

Unlike the 16 kb default `highWaterMark` for a readable stream, the stream
returned by this method has a default `highWaterMark` of 64 kb.

`options` can include `start` and `end` values to read a range of bytes from
the file instead of the entire file. Both `start` and `end` are inclusive and
start counting at 0, allowed values are in the
[0, [`Number.MAX_SAFE_INTEGER`][]] range. If `start` is
omitted or `undefined`, `filehandle.readStream()` reads sequentially from the
current file position. The `encoding` can be any one of those accepted by
{Buffer}.

If the `FileHandle` points to a character device that only supports blocking
reads (such as keyboard or sound card), read operations do not finish until data
is available. This can prevent the process from exiting and the stream from
closing naturally.

By default, the stream will emit a `'close'` event after it has been
destroyed, like most `Readable` streams. Set the `emitClose` option to
`false` to change this behavior.

By providing the `fs` option, it is possible to override the corresponding `fs`
implementations for `open`, `read`, and `close`. When providing the `fs` option,
overrides for `open`, `read`, and `close` are required.

```mjs
import { open } from 'fs/promises';

const fd = await open('/dev/input/event0');
// Create a stream from some character device.
const stream = fd.readStream();
setTimeout(() => {
stream.close(); // This may not close the stream.
// Artificially marking end-of-stream, as if the underlying resource had
// indicated end-of-file by itself, allows the stream to close.
// This does not cancel pending read operations, and if there is such an
// operation, the process may still not be able to exit successfully
// until it finishes.
stream.push(null);
stream.read(0);
}, 100);
```

If `autoClose` is false, then the file descriptor won't be closed, even if
there's an error. It is the application's responsibility to close it and make
sure there's no file descriptor leak. If `autoClose` is set to true (default
behavior), on `'error'` or `'end'` the file descriptor will be closed
automatically.

An example to read the last 10 bytes of a file which is 100 bytes long:

```mjs
import { open } from 'fs/promises';

const fd = await open('sample.txt');
fd.readStream({ start: 90, end: 99 });
```

#### `filehandle.readv(buffers[, position])`
<!-- YAML
added:
Expand Down Expand Up @@ -582,6 +656,41 @@ If one or more `filehandle.write()` calls are made on a file handle and then a
current position till the end of the file. It doesn't always write from the
beginning of the file.

#### `filehandle.writeStream([options])`
Comment thread
aduh95 marked this conversation as resolved.
Outdated
<!-- YAML
added: REPLACEME
-->

* `options` {Object}
* `encoding` {string} **Default:** `'utf8'`
* `autoClose` {boolean} **Default:** `true`
* `emitClose` {boolean} **Default:** `true`
* `start` {integer}
* `fs` {Object|null} **Default:** `null`
* Returns: {fs.WriteStream} See [Writable Stream][].

`options` may also include a `start` option to allow writing data at some
position past the beginning of the file, allowed values are in the
[0, [`Number.MAX_SAFE_INTEGER`][]] range. Modifying a file rather than replacing
it may require the `flags` `open` option to be set to `r+` rather than the
default `r`. The `encoding` can be any one of those accepted by {Buffer}.

If `autoClose` is set to true (default behavior) on `'error'` or `'finish'`
the file descriptor will be closed automatically. If `autoClose` is false,
then the file descriptor won't be closed, even if there's an error.
It is the application's responsibility to close it and make sure there's no
file descriptor leak.

By default, the stream will emit a `'close'` event after it has been
destroyed, like most `Writable` streams. Set the `emitClose` option to
`false` to change this behavior.

By providing the `fs` option it is possible to override the corresponding `fs`
implementations for `open`, `write`, `writev` and `close`. Overriding `write()`
without `writev()` can reduce performance as some optimizations (`_writev()`)
will be disabled. When providing the `fs` option, overrides for `open`,
`close`, and at least one of `write` and `writev` are required.

#### `filehandle.writev(buffers[, position])`
<!-- YAML
added: v12.9.0
Expand Down
37 changes: 37 additions & 0 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ const {
const {
readableStreamCancel,
} = require('internal/webstreams/readablestream');
const { ReadStream, WriteStream } = require('internal/fs/streams');

const getDirectoryEntriesPromise = promisify(getDirents);
const validateRmOptionsPromise = promisify(validateRmOptions);
Expand Down Expand Up @@ -252,6 +253,42 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
return readable;
}

/**
* @typedef {import('./streams').ReadStream
* } NodeJSReadStream
* @param {{
* flags?: string;
* encoding?: string;
* autoClose?: boolean;
* emitClose?: boolean;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove autoClose & emitClose from here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are legacy. The only reason we have still have them is to avoid breakage. Since this is a new API they are not necessary.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s a new way for exposing the same old API, removing it would be harder than keeping it – also I don’t think they’re documented as legacy anywhere, so that’s probably best to leave that discussion for another PR.

* start: number;
* end?: number;
* highWaterMark?: number;
* fs?: Object | null;
* }} [options]
* @returns {NodeJSReadStream}
*/
Comment thread
aduh95 marked this conversation as resolved.
readStream(options = undefined) {
return new ReadStream(undefined, { ...options, fd: this[kFd] });
Comment thread
addaleax marked this conversation as resolved.
Outdated
}

/**
* @typedef {import('./streams').WriteStream
* } NodeJSWriteStream
* @param {{
* flags?: string;
* encoding?: string;
* autoClose?: boolean;
* emitClose?: boolean;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove autoClose & emitClose from here?

* start: number;
* fs?: Object | null;
* }} [options]
* @returns {NodeJSWriteStream}
*/
Comment thread
aduh95 marked this conversation as resolved.
writeStream(options = undefined) {
return new WriteStream(undefined, { ...options, fd: this[kFd] });
}

[kTransfer]() {
if (this[kClosePromise] || this[kRefs] > 1) {
throw lazyDOMException('Cannot transfer FileHandle while in use',
Expand Down
52 changes: 52 additions & 0 deletions test/parallel/test-fs-promises-file-handle-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict';

const common = require('../common');

// The following tests validate base functionality for the fs.promises
// FileHandle.write method.

const fs = require('fs');
const { open } = fs.promises;
Comment thread
aduh95 marked this conversation as resolved.
const path = require('path');
const tmpdir = require('../common/tmpdir');
const assert = require('assert');
const { finished } = require('stream/promises');
const { Blob } = require('buffer');
const tmpDir = tmpdir.path;

tmpdir.refresh();

async function validateWrite() {
const filePathForHandle = path.resolve(tmpDir, 'tmp-write.txt');
const fileHandle = await open(filePathForHandle, 'w');
const buffer = Buffer.from('Hello world'.repeat(100), 'utf8');

const stream = fileHandle.writeStream();
stream.end(buffer);
await finished(stream);

const readFileData = fs.readFileSync(filePathForHandle);
Comment thread
aduh95 marked this conversation as resolved.
assert.deepStrictEqual(buffer, readFileData);
}

async function validateRead() {
const filePathForHandle = path.resolve(tmpDir, 'tmp-read.txt');
const buffer = Buffer.from('Hello world'.repeat(100), 'utf8');

fs.writeFileSync(filePathForHandle, buffer);
Comment thread
aduh95 marked this conversation as resolved.
Outdated

const fileHandle = await open(filePathForHandle);

const chunks = [];
for await (const chunk of fileHandle.readStream()) {
chunks.push(chunk);
}

const arrayBuffer = await new Blob(chunks).arrayBuffer();
assert.deepStrictEqual(Buffer.from(arrayBuffer), buffer);
}

Promise.all([
validateWrite(),
validateRead(),
]).then(common.mustCall());