Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixup! worker: implement Web Locks API
  • Loading branch information
addaleax committed Dec 14, 2020
commit 0fed56def24aad539f09278ef39291603d01b8ab
54 changes: 28 additions & 26 deletions lib/internal/worker/locks.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const {

const kName = Symbol('kName');
const kMode = Symbol('kMode');
const MUTEX_UNOWNED = -1;

// Class that contains all the state around active and pending locks.
class LockInternals {
Expand Down Expand Up @@ -73,8 +74,8 @@ class LockInternals {
// lock requests.
lockIdCounter: new BigUint64Array(sab, 0, 1),
// The mutex used to ensure that no two threads access the internal
// state at the same time. Contains -1 if unlocked, and the locker's
// threadId otherwise.
// state at the same time. Contains MUTEX_UNOWNED if unlocked, and the
// locker's threadId otherwise.
mutexOwner: new Int32Array(sab, 8, 1),
// Lists of the lock (request)s in their individual states.
held: [],
Expand Down Expand Up @@ -105,7 +106,8 @@ class LockInternals {
// internal state.
const mutex = this.state.mutexOwner;
let owner;
while ((owner = Atomics.compareExchange(mutex, 0, -1, threadId)) !== -1) {
while ((owner = Atomics.compareExchange(
mutex, 0, MUTEX_UNOWNED, threadId)) !== MUTEX_UNOWNED) {
// We use a timeout to make sure that we still receive messages from
// threads that have terminated. It should be very rare to actually
// end up blocked on this, because the first other thread to observe
Expand All @@ -123,8 +125,8 @@ class LockInternals {
if (publish) {
this.publishState();
}
const owner =
Atomics.compareExchange(this.state.mutexOwner, 0, threadId, -1);
const owner = Atomics.compareExchange(
this.state.mutexOwner, 0, threadId, MUTEX_UNOWNED);
Atomics.notify(this.state.mutexOwner, 0);
assert(owner === threadId);
}
Expand All @@ -145,13 +147,8 @@ class LockInternals {
clientId: this.clientId,
id: Atomics.add(this.state.lockIdCounter, 0, 1n)
};
let availableResolve, availableReject;
const available = new Promise((res, rej) => {
availableResolve = res;
availableReject = rej;
});
this.availableMap.set(request.id, {
resolve: availableResolve, reject: availableReject
const available = new Promise((resolve, reject) => {
this.availableMap.set(request.id, { resolve, reject });
});
this.resolveMap.set(request.id, { resolve, reject });

Expand Down Expand Up @@ -298,7 +295,8 @@ class LockInternals {
switch (message.op) {
case 'terminated': {
const { threadId, clientId } = message;
Atomics.compareExchange(this.state.mutexOwner, 0, threadId, -1);
Atomics.compareExchange(
this.state.mutexOwner, 0, threadId, MUTEX_UNOWNED);
Atomics.notify(this.state.mutexOwner, 0);
queueMicrotask(() => {
this.lockMutex();
Expand All @@ -318,33 +316,37 @@ class LockInternals {
}
}
}
this.scheduleQueueProcessing();
this.unlockMutex();
});
break;
}
case 'stateUpdate': {
const { state } = message;
this.state = state;
if (!this.queueProcessingScheduled) {
// Enueue a call to processLockRequestQueue().
this.queueProcessingScheduled = true;
queueMicrotask(() => {
this.queueProcessingScheduled = false;
this.lockMutex();
const didModify = this.processLockRequestQueue();
// Only publish the state information again in case we did make
// a change of some kind to avoid snowballing with state update
// messages.
this.unlockMutex(didModify);
});
}
// Enueue a call to processLockRequestQueue().
this.scheduleQueueProcessing();
break;
}
default:
assert(false);
}
}

scheduleQueueProcessing() {
if (this.queueProcessingScheduled) return;
this.queueProcessingScheduled = true;
queueMicrotask(() => {
this.queueProcessingScheduled = false;
this.lockMutex();
const didModify = this.processLockRequestQueue();
// Only publish the state information again in case we did make
// a change of some kind to avoid snowballing with state update
// messages.
this.unlockMutex(didModify);
});
}

snapshot() {
this.readSynchronousMessages();
const printLock = ({ name, shared, clientId }) =>
Expand Down
5 changes: 3 additions & 2 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,12 @@ enum class MessageMode {
NORMAL,

// Sticky messages. Like normal, but also delivered as the first message for
// new message ports in the sibling group.
// new message ports in the sibling group. A new sticky message replaces
// previous ones for the group.
STICKY,
Comment thread
addaleax marked this conversation as resolved.
Outdated

// Close message. Will only be delivered right before the sending message port
// is being closed.
// is being closed. A new onclose message replaces previous ones for the port.
ONCLOSE
Comment thread
addaleax marked this conversation as resolved.
Outdated
};

Expand Down
5 changes: 3 additions & 2 deletions test/parallel/test-worker-locks.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
const { mustCall, mustNotCall } = require('../common');
const assert = require('assert');
const { locks, Worker } = require('worker_threads');
const { EventEmitter, once } = require('events');
const { EventEmitter, once, getEventListeners } = require('events');

const tick = () => new Promise((resolve) => setImmediate(resolve));
const { setImmediate: tick } = require('timers/promises');

(async () => {
{
Expand Down Expand Up @@ -124,6 +124,7 @@ const tick = () => new Promise((resolve) => setImmediate(resolve));
controller.abort();
}));
assert.deepStrictEqual(await locks.query(), { held: [], pending: [] });
Comment thread
jasnell marked this conversation as resolved.
Outdated
assert.deepStrictEqual(getEventListeners(controller.signal, 'abort'), []);
}

{
Expand Down