Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions lib/internal/atomics/wait_async.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict';

const {
PromisePrototypeThen
} = primordials;

const timers = require('timers');

const keepAliveInterval = 2 ** 31 - 1;

let pendingWaiters = 0;
let keepAliveHandle = null;

function maybeStopKeepAlive() {
if (--pendingWaiters === 0) {
timers.clearInterval(keepAliveHandle);
keepAliveHandle = null;
}
}

function trackWaitAsyncResult(result) {
if (!result.async) {
return result;
}

if (++pendingWaiters === 1) {
keepAliveHandle = timers.setInterval(() => {}, keepAliveInterval);
}

PromisePrototypeThen(result.value, maybeStopKeepAlive, maybeStopKeepAlive);
return result;
}

module.exports = {
trackWaitAsyncResult,
};
24 changes: 24 additions & 0 deletions lib/internal/bootstrap/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
/* global process, require, internalBinding, primordials */

const {
AtomicsWaitAsync,
FunctionPrototypeCall,
JSONParse,
Number,
Expand All @@ -62,11 +63,14 @@ const {
ObjectFreeze,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Proxy,
ReflectApply,
SymbolToStringTag,
globalThis,
} = primordials;
const config = internalBinding('config');
const internalTimers = require('internal/timers');
const { trackWaitAsyncResult } = require('internal/atomics/wait_async');
const { defineOperation } = require('internal/util');
const {
validateInteger,
Expand Down Expand Up @@ -414,6 +418,26 @@ internalBinding('process_methods').setEmitWarningSync(emitWarningSync);
process.getBuiltinModule = getBuiltinModule;
}

// Patch Atomics.waitAsync to hold an event-loop reference while async waiters
// are pending. Without this, Node.js exits if Atomics.waitAsync() is the only
// outstanding operation and no other ref'd handle (timer, socket, worker) is
// active, because the underlying libuv handles are unreferenced.
{
const wrappedAtomicsWaitAsync = new Proxy(AtomicsWaitAsync, {
__proto__: null,
apply(target, thisArg, args) {
return trackWaitAsyncResult(ReflectApply(target, thisArg, args));
}
});

ObjectDefineProperty(globalThis.Atomics, 'waitAsync', {
__proto__: null,
value: wrappedAtomicsWaitAsync,
writable: true,
configurable: true,
});
}

function setupProcessObject() {
const EventEmitter = require('events');
const origProcProto = ObjectGetPrototypeOf(process);
Expand Down
7 changes: 6 additions & 1 deletion lib/internal/worker/messaging.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const {
const {
constructSharedArrayBuffer,
} = require('internal/util');
const {
trackWaitAsyncResult,
} = require('internal/atomics/wait_async');

const {
codes: {
Expand Down Expand Up @@ -204,7 +207,9 @@ async function postMessageToThread(threadId, value, transferList, timeout) {

const memory = constructSharedArrayBuffer(WORKER_MESSAGING_SHARED_DATA);
const status = new Int32Array(memory);
const promise = AtomicsWaitAsync(status, WORKER_MESSAGING_STATUS_INDEX, 0, timeout).value;
const promise =
trackWaitAsyncResult(
AtomicsWaitAsync(status, WORKER_MESSAGING_STATUS_INDEX, 0, timeout)).value;

const message = {
type: messageTypes.SEND_MESSAGE_TO_WORKER,
Expand Down
121 changes: 121 additions & 0 deletions test/parallel/test-atomics-waitasync-event-loop.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { Worker } from 'node:worker_threads';
import assert from 'node:assert';

function makeInt32(initialValue) {
const sab = new SharedArrayBuffer(4);
const view = new Int32Array(sab, 0, 1);
Atomics.store(view, 0, initialValue);
return view;
}

{
const view = makeInt32(0);
assert.strictEqual(
Object.prototype.hasOwnProperty.call(Atomics.waitAsync, 'prototype'),
false);
assert.throws(() => new Atomics.waitAsync(view, 0, 0), {
name: 'TypeError',
});
}

// No timeout, notified by a worker thread
// Must not exit early and the promise must resolve with 'ok'
{
const view = makeInt32(0);

const workerCode = `
import { workerData, parentPort } from 'node:worker_threads';
const view = new Int32Array(workerData, 0, 1);
setTimeout(() => {
Atomics.store(view, 0, 1);
Atomics.notify(view, 0, 1);
}, 50);
parentPort.postMessage('ready');
`;

const worker = new Worker(workerCode, { eval: true, workerData: view.buffer });
await new Promise((resolve) => worker.once('message', resolve));
worker.unref(); // only the waitAsync should keep the loop alive

const result = Atomics.waitAsync(view, 0, 0);
assert.strictEqual(result.async, true, 'should be async');
const value = await result.value;
assert.strictEqual(value, 'ok', `expected 'ok', got "${value}"`);
}

// With timeout, resolved by notify
// A timeout is specified but the notify arrives first. Must resolve to "ok".
{
const view = makeInt32(0);

const workerCode = `
import { workerData, parentPort } from 'node:worker_threads';
const view = new Int32Array(workerData, 0, 1);
setTimeout(() => {
Atomics.store(view, 0, 1);
Atomics.notify(view, 0, 1);
}, 50);
parentPort.postMessage('ready');
`;

const worker = new Worker(workerCode, { eval: true, workerData: view.buffer });
await new Promise((resolve) => worker.once('message', resolve));
worker.unref();

const result = Atomics.waitAsync(view, 0, 0, 5_000 /* 5 s – won't fire */);
assert.strictEqual(result.async, true);
const value = await result.value;
assert.strictEqual(value, 'ok');
}

// With timeout, resolved by timeout itself
// No notify ever fires; the promise must resolve to "timed-out" via the timer.
{
const view = makeInt32(0);

const result = Atomics.waitAsync(view, 0, 0, 30);
assert.strictEqual(result.async, true);
const value = await result.value;
assert.strictEqual(value, 'timed-out');
}

// Multiple concurrent waiters
// All promises must resolve and the loop must not exit until the last one does.
{
const view = makeInt32(0);

const workerCode = `
import { workerData, parentPort } from 'node:worker_threads';
const view = new Int32Array(workerData, 0, 1);
setTimeout(() => {
Atomics.store(view, 0, 1);
Atomics.notify(view, 0, 3); // wake up to 3 waiters
}, 50);
parentPort.postMessage('ready');
`;

const worker = new Worker(workerCode, { eval: true, workerData: view.buffer });
await new Promise((resolve) => worker.once('message', resolve));
worker.unref();

const [r1, r2, r3] = [
Atomics.waitAsync(view, 0, 0),
Atomics.waitAsync(view, 0, 0),
Atomics.waitAsync(view, 0, 0),
];
assert.strictEqual(r1.async, true);
assert.strictEqual(r2.async, true);
assert.strictEqual(r3.async, true);
const values = await Promise.all([r1.value, r2.value, r3.value]);
assert.deepStrictEqual(values, ['ok', 'ok', 'ok']);
}

// Immediate synchronous resolution (value mismatch)
// When the current value does not equal the expected value, waitAsync must
// return { async: false, value: "not-equal" } and not ref any handle.
{
const view = makeInt32(99); // already != 0
const result = Atomics.waitAsync(view, 0, 0);
assert.strictEqual(result.async, false);
assert.strictEqual(result.value, 'not-equal');
}
20 changes: 20 additions & 0 deletions test/parallel/test-worker-messaging-event-loop-ref.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import '../common/index.mjs';
import assert from 'node:assert';
import {
Worker,
postMessageToThread,
} from 'node:worker_threads';

// postMessageToThread() uses an internal Atomics.waitAsync path, which should keep the event loop alive while awaiting the response.
const workerCode = `
const { parentPort } = require('node:worker_threads');
process.on('workerMessage', () => {});
parentPort.postMessage('ready');
`;

const worker = new Worker(workerCode, { eval: true });
await new Promise((resolve) => worker.once('message', resolve));
worker.unref();

await assert.doesNotReject(postMessageToThread(worker.threadId, { hello: 1 }));
await worker.terminate();