Skip to content

Commit 31d6b31

Browse files
committed
Implement mutex notifications using MessagePort
1 parent 1f13497 commit 31d6b31

1 file changed

Lines changed: 92 additions & 0 deletions

File tree

JavaScript/8-async.js

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
'use strict';
2+
3+
const threads = require('worker_threads');
4+
const { Worker, isMainThread } = threads;
5+
6+
const LOCKED = 0;
7+
const UNLOCKED = 1;
8+
9+
class Mutex {
10+
constructor(messagePort, shared, offset = 0, initial = false) {
11+
this.port = messagePort;
12+
this.lock = new Int32Array(shared, offset, 1);
13+
if (initial) Atomics.store(this.lock, 0, UNLOCKED);
14+
this.owner = false;
15+
this.resolve = null;
16+
if (messagePort) {
17+
messagePort.on('message', kind => {
18+
if (kind === 'leave') this.tryEnter();
19+
});
20+
}
21+
}
22+
23+
enter() {
24+
return new Promise(resolve => {
25+
this.resolve = resolve;
26+
this.tryEnter();
27+
});
28+
}
29+
30+
tryEnter() {
31+
if (!this.resolve) return;
32+
let prev = Atomics.exchange(this.lock, 0, LOCKED);
33+
if (prev === UNLOCKED) {
34+
this.owner = true;
35+
this.resolve();
36+
this.resolve = null;
37+
}
38+
};
39+
40+
leave() {
41+
if (!this.owner) return false;
42+
Atomics.store(this.lock, 0, UNLOCKED);
43+
this.port.postMessage('leave');
44+
this.owner = false;
45+
return true;
46+
}
47+
}
48+
49+
class Thread {
50+
constructor(data) {
51+
const worker = new Worker(__filename, { workerData: data });
52+
this.worker = worker;
53+
Thread.workers.add(worker);
54+
worker.on('message', kind => {
55+
for (const next of Thread.workers) {
56+
if (next !== worker) {
57+
next.postMessage(kind);
58+
}
59+
}
60+
})
61+
}
62+
}
63+
64+
Thread.workers = new Set();
65+
66+
// Usage
67+
68+
if (isMainThread) {
69+
const buffer = new SharedArrayBuffer(4);
70+
const mutex = new Mutex(null, buffer, 0, true);
71+
console.dir({ mutex });
72+
new Thread(buffer);
73+
new Thread(buffer);
74+
} else {
75+
const { threadId, workerData, parentPort } = threads;
76+
const mutex = new Mutex(parentPort, workerData);
77+
78+
setInterval(() => {
79+
console.log(`Interval ${threadId}`);
80+
}, 500);
81+
82+
const f = async () => {
83+
await mutex.enter();
84+
console.log(`Enter ${threadId}`);
85+
setTimeout(() => {
86+
mutex.leave();
87+
console.log(`Leave ${threadId}`);
88+
setTimeout(f, 0);
89+
}, 5000);
90+
};
91+
f();
92+
}

0 commit comments

Comments
 (0)