Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
async_hooks: add AsyncLocal class
This introduces a new API to provide asynchronous storage via a new
AsyncLocal class. Besides setting a value which is passed along
asynchronous invocations it allows to register a callback to get
informed about changes of the current value.

The implementation is based on async_hooks but it doesn't expose
internals like execution Ids, resources or the hooks itself.

Naming and implementation is inspired by .NET AsyncLocal.
  • Loading branch information
Flarna committed Dec 16, 2019
commit 2afe57a977fddc3ee3545a80444a52bcbd55142a
84 changes: 84 additions & 0 deletions doc/api/async_hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,90 @@ never be called.
* Returns: {number} The same `triggerAsyncId` that is passed to the
`AsyncResource` constructor.

## Class: AsyncLocal
<!-- YAML
added: REPLACEME
-->

This class can be used to set a value which follows asynchronous control flow.
An `AsyncLocal` instance is a key into a continuation local storage.
The value set on an `AsyncLocal` instance is propagated to any async
continuation triggered within this flow. Modification of the value are done via
"copy on write", therefore already created continuations are not effected by
setting a new value, only continuations created afterwards.

The implementation relys on async hooks to follow the execution flow. Therefore
if some library is not interacting well with async hooks (e.g. it does user
space queuing) it will result in the same problems with `AsyncLocal`. To
correct this such modules should use the `AsyncResource` class.

### Example

```js
const http = require('http');
const wait = require('util').promisify(setTimeout);

const asyncLocal = new AsyncLocal();

function print(...args) {
console.log(`${asyncLocal.value || '-'}:`, ...args);
}

http.createServer(async (req, res) => {
asyncLocal.value = `${req.method}:${req.url}`;
print('start');

setImmediate(async () => {
print('next');
asyncLocal.value = `${asyncLocal.value}:split`;
await wait(10);
print('branched');
});

await wait(100);

print('done');
res.end();
}).listen(8181);
http.get('http://localhost:8181/first');
http.get('http://localhost:8181/second');
// Prints:
// GET:/first: start
// GET:/second: start
// GET:/second: next
// GET:/first: next
// GET:/second:split: branched
// GET:/first:split: branched
// GET:/first: done
// GET:/second: done
```

### new AsyncLocal([options])

* `options` {Object}
* `onChangedCb` {Function} Optional callback invoked whenever a value of an
`AsyncLocal` changes.

Creates a new instance of an `AsyncLocal`. Once a value is set it's propagated
to async continuations until it is cleared.

The optional `onChangedCb` callback
signals changes of the value referenced by the `AsyncLocal` instance. The first
argument is the previous value, the second argument holds the current value and
the third argument is a boolean set to `true` if change is caused by a change
of the execution context and `false` if a new value was assinged to
`AsyncLocal.value`.

### asyncLocal.value

Reading this value returns the current value associated with this execution
path execution context (async id).

The value written stored in a persistent storage for the current asychronous
execution path. Writting `null` or `undefined` clears the value and stops
further tracking on this execution path.


[`after` callback]: #async_hooks_after_asyncid
[`before` callback]: #async_hooks_before_asyncid
[`destroy` callback]: #async_hooks_destroy_asyncid
Expand Down
122 changes: 121 additions & 1 deletion lib/async_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ const {

const {
ERR_ASYNC_CALLBACK,
ERR_INVALID_ASYNC_ID
ERR_INVALID_ASYNC_ID,
ERR_INVALID_ARG_TYPE
} = require('internal/errors').codes;
const { validateString } = require('internal/validators');
const internal_async_hooks = require('internal/async_hooks');
Expand Down Expand Up @@ -200,6 +201,123 @@ class AsyncResource {
}


// AsyncLocal //

const kStack = Symbol('stack');
const kIsFirst = Symbol('is-first');
const kMap = Symbol('map');
const kOnChangedCb = Symbol('on-changed-cb');
const kHooks = Symbol('hooks');
const kSet = Symbol('set');

class AsyncLocal {
constructor(options = {}) {
if (typeof options !== 'object' || options === null)
throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);

const { onChangedCb = null } = options;
if (onChangedCb !== null && typeof onChangedCb !== 'function')
throw new ERR_INVALID_ARG_TYPE('options.onChangedCb',
'function',
onChangedCb);

this[kOnChangedCb] = onChangedCb;
this[kMap] = new Map();

const fns = {
init: (asyncId, type, triggerAsyncId, resource) => {
// Propagate value from current id to new (execution graph)
const value = this[kMap].get(executionAsyncId());
Comment thread
Flarna marked this conversation as resolved.
Outdated
if (value)
this[kMap].set(asyncId, value);
},

destroy: (asyncId) => this[kSet](asyncId, null),
};

if (this[kOnChangedCb]) {
// Change notification requires to keep a stack of async local values
this[kStack] = [];
// Indicates that first value was stored (before callback "missing")
this[kIsFirst] = true;

// Use before/after hooks to signal changes because of execution
fns.before = (asyncId) => {
const stack = this[kStack];
const cVal = this[kMap].get(asyncId);
const pVal = stack[stack.length - 1];
stack.push(pVal);
Comment thread
Flarna marked this conversation as resolved.
Outdated
if (cVal !== pVal)
this[kOnChangedCb](pVal, cVal, true);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This invocation is very risky. User callbask should be called outside of async hooks.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, then it will not work as moving it outside of async hooks would require something like nextTick which is a new async operation. As a result the signaled values and the real values don't match anymore.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this cannot be made safe, I don't think supporting this "onchanged" behavior is compatible for the goal of "ease of use" that this API is supposed to have.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will improve docs in this, add a check to catch recursion cases (e.g. onChangedCb sets a new value) and handle exceptions thrown by onChangedCb. Hopefully this renders this callback as save as all the other callbacks which can be registered in node core.

};

fns.after = (asyncId) => {
const stack = this[kStack];
const pVal = this[kMap].get(asyncId);
stack.pop();
const cVal = stack[stack.length - 1];
if (cVal !== pVal)
this[kOnChangedCb](pVal, cVal, true);
};
}
this[kHooks] = createHook(fns);
}

set value(val) {
val = val === null ? undefined : val;
const id = executionAsyncId();
const onChangedCb = this[kOnChangedCb];
let pVal;
if (onChangedCb)
pVal = this[kMap].get(id);

this[kSet](id, val);

if (onChangedCb && pVal !== val)
onChangedCb(pVal, val, false);
}

get value() {
return this[kMap].get(executionAsyncId());
}

[kSet](id, val) {
const map = this[kMap];

if (val == null) {
map.delete(id);
if (map.size === 0)
this[kHooks].disable();

if (this[kOnChangedCb]) {
if (map.size === 0) {
// Hooks have been disabled so next set is the first one
this[kStack] = [];
this[kIsFirst] = true;
} else {
const stack = this[kStack];
stack[stack.length - 1] = undefined;
}
}
} else {
map.set(id, val);
if (map.size === 1)
this[kHooks].enable();

if (this[kOnChangedCb]) {
const stack = this[kStack];
if (this[kIsFirst]) {
// First value set => "simulate" before hook
this[kIsFirst] = false;
stack.push(val);
} else {
stack[stack.length - 1] = val;
}
}
}
}
}

// Placing all exports down here because the exported classes won't export
// otherwise.
module.exports = {
Expand All @@ -209,4 +327,6 @@ module.exports = {
triggerAsyncId,
// Embedder API
AsyncResource,
// CLS API
AsyncLocal
};
Loading