8000
We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent c97fb91 commit 6b1a887Copy full SHA for 6b1a887
doc/api/worker.md
@@ -240,7 +240,7 @@ Most Node.js APIs are available inside of it.
240
Notable differences inside a Worker environment are:
241
242
- The [`process.stdin`][], [`process.stdout`][] and [`process.stderr`][]
243
- properties are set to `null`.
+ may be redirected by the parent thread.
244
- The [`require('worker').isMainThread`][] property is set to `false`.
245
- The [`require('worker').parentPort`][] message port is available,
246
- [`process.exit()`][] does not stop the whole program, just the single thread,
@@ -313,6 +313,13 @@ if (isMainThread) {
313
described in the [HTML structured clone algorithm][], and an error will be
314
thrown if the object cannot be cloned (e.g. because it contains
315
`function`s).
316
+ * stdin {boolean} If this is set to `true`, then `worker.stdin` will
317
+ provide a writable stream whose contents will appear as `process.stdin`
318
+ inside the Worker. By default, no data is provided.
319
+ * stdout {boolean} If this is set to `true`, then `worker.stdout` will
320
+ not automatically be piped through to `process.stdout` in the parent.
321
+ * stderr {boolean} If this is set to `true`, then `worker.stderr` will
322
+ not automatically be piped through to `process.stderr` in the parent.
323
324
### Event: 'error'
325
<!-- YAML
@@ -377,6 +384,41 @@ Opposite of `unref()`, calling `ref()` on a previously `unref()`ed worker will
377
384
behavior). If the worker is `ref()`ed, calling `ref()` again will have
378
385
no effect.
379
386
387
+### worker.stderr
388
+<!-- YAML
389
+added: REPLACEME
390
+-->
391
+
392
+* {stream.Readable}
393
394
+This is a readable stream which contains data written to [`process.stderr`][]
395
+inside the worker thread. If `stderr: true` was not passed to the
396
+[`Worker`][] constructor, then data will be piped to the parent thread's
397
+[`process.stderr`][] stream.
398
399
+### worker.stdin
400
401
402
403
404
+* {null|stream.Writable}
405
406
+If `stdin: true` was passed to the [`Worker`][] constructor, this is a
407
+writable stream. The data written to this stream will be made available in
408
+the worker thread as [`process.stdin`][].
409
410
+### worker.stdout
411
412
413
414
415
416
417
+This is a readable stream which contains data written to [`process.stdout`][]
418
+inside the worker thread. If `stdout: true` was not passed to the
419
420
+[`process.stdout`][] stream.
421
380
422
### worker.terminate([callback])
381
423
382
424
added: REPLACEME
lib/internal/process/stdio.js
@@ -6,7 +6,10 @@ const {
6
ERR_UNKNOWN_STDIN_TYPE,
7
ERR_UNKNOWN_STREAM_TYPE
8
} = require('internal/errors').codes;
9
-const { isMainThread } = require('internal/worker');
+const {
10
+ isMainThread,
11
+ workerStdio
12
+} = require('internal/worker');
13
14
exports.setup = setupStdio;
15
@@ -17,8 +20,7 @@ function setupStdio() {
17
20
18
21
function getStdout() {
19
22
if (stdout) return stdout;
- if (!isMainThread)
- return new (require('stream').Writable)({ write(b, e, cb) { cb(); } });
23
+ if (!isMainThread) return workerStdio.stdout;
24
stdout = createWritableStdioStream(1);
25
stdout.destroySoon = stdout.destroy;
26
stdout._destroy = function(er, cb) {
@@ -34,8 +36,7 @@ function setupStdio() {
34
36
35
37
function getStderr() {
38
if (stderr) return stderr;
39
+ if (!isMainThread) return workerStdio.stderr;
40
stderr = createWritableStdioStream(2);
41
stderr.destroySoon = stderr.destroy;
42
stderr._destroy = function(er, cb) {
@@ -51,8 +52,7 @@ function setupStdio() {
51
52
53
function getStdin() {
54
if (stdin) return stdin;
55
- return new (require('stream').Readable)({ read() { this.push(null); } });
+ if (!isMainThread) return workerStdio.stdin;
56
57
const tty_wrap = process.binding('tty_wrap');
58
const fd = 0;
lib/internal/worker.js
@@ -5,6 +5,7 @@ const EventEmitter = require('events');
5
const assert = require('assert');
const path = require('path');
const util = require('util');
+const { Readable, Writable } = require('stream');
const {
ERR_INVALID_ARG_TYPE,
ERR_WORKER_NEED_ABSOLUTE_PATH,
@@ -29,13 +30,20 @@ const isMainThread = threadId === 0;
29
30
31
const kOnMessageListener = Symbol('kOnMessageListener');
32
const kHandle = Symbol('kHandle');
33
+const kName = Symbol('kName');
const kPort = Symbol('kPort');
const kPublicPort = Symbol('kPublicPort');
const kDispose = Symbol('kDispose');
const kOnExit = Symbol('kOnExit');
const kOnMessage = Symbol('kOnMessage');
const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
const kOnErrorMessage = Symbol('kOnErrorMessage');
+const kParentSideStdio = Symbol('kParentSideStdio');
+const kWritableCallbacks = Symbol('kWritableCallbacks');
43
+const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
44
+const kStartedReading = Symbol('kStartedReading');
45
+const kWaitingStreams = Symbol('kWaitingStreams');
46
+const kIncrementsPortRef = Symbol('kIncrementsPortRef');
47
48
const debug = util.debuglog('worker');
49
@@ -129,6 +137,72 @@ function setupPortReferencing(port, eventEmitter, eventName) {
129
137
}
130
138
131
139
140
+class ReadableWorkerStdio extends Readable {
141
+ constructor(port, name) {
142
+ super();
143
+ this[kPort] = port;
144
+ this[kName] = name;
145
+ this[kIncrementsPortRef] = true;
146
+ this[kStartedReading] = false;
147
+ this.on('end', () => {
148
+ if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0)
149
+ this[kPort].unref();
150
+ });
151
+ }
152
153
+ _read() {
154
+ if (!this[kStartedReading] && this[kIncrementsPortRef]) {
155
+ this[kStartedReading] = true;
156
+ if (this[kPort][kWaitingStreams]++ === 0)
157
+ this[kPort].ref();
158
159
160
+ this[kPort].postMessage({
161
+ type: 'stdioWantsMoreData',
162
+ stream: this[kName]
163
164
165
+}
166
167
+class WritableWorkerStdio extends Writable {
168
169
+ super({ decodeStrings: false });
170
171
172
+ this[kWritableCallbacks] = [];
173
174
175
+ _write(chunk, encoding, cb) {
176
177
+ type: 'stdioPayload',
178
+ stream: this[kName],
179
+ chunk,
180
+ encoding
181
182
+ this[kWritableCallbacks].push(cb);
183
184
185
186
187
+ _final(cb) {
188
189
190
191
+ chunk: null
192
193
+ cb();
194
195
196
+ [kStdioWantsMoreDataCallback]() {
197
+ const cbs = this[kWritableCallbacks];
198
199
+ for (const cb of cbs)
200
201
+ if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
202
203
204
205
132
206
class Worker extends EventEmitter {
133
207
constructor(filename, options = {}) {
134
208
super();
@@ -154,8 +228,25 @@ class Worker extends EventEmitter {
228
this[kPort].on('message', (data) => this[kOnMessage](data));
229
this[kPort].start();
230
this[kPort].unref();
231
+ this[kPort][kWaitingStreams] = 0;
232
debug(`[${threadId}] created Worker with ID ${this.threadId}`);
233
234
+ let stdin = null;
235
+ if (options.stdin)
236
+ stdin = new WritableWorkerStdio(this[kPort], 'stdin');
237
+ const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');
238
+ if (!options.stdout) {
239
+ stdout[kIncrementsPortRef] = false;
+ pipeWithoutWarning(stdout, process.stdout);
+ const stderr = new ReadableWorkerStdio(this[kPort], 'stderr');
+ if (!options.stderr) {
+ stderr[kIncrementsPortRef] = false;
+ pipeWithoutWarning(stderr, process.stderr);
247
248
+ this[kParentSideStdio] = { stdin, stdout, stderr };
249
250
const { port1, port2 } = new MessageChannel();
251
this[kPublicPort] = port1;
252
this[kPublicPort].on('message', (message) => this.emit('message', message));
@@ -165,7 +256,8 @@ class Worker extends EventEmitter {
256
filename,
257
doEval: !!options.eval,
258
workerData: options.workerData,
- publicPort: port2
259
+ publicPort: port2,
260
+ hasStdin: !!options.stdin
261
}, [port2]);
262
// Actually start the new thread now that everything is in place.
263
this[kHandle].startThread();
@@ -197,6 +289,16 @@ class Worker extends EventEmitter {
289
return this[kOnCouldNotSerializeErr]();
290
case 'errorMessage':
291
return this[kOnErrorMessage](message.error);
292
+ case 'stdioPayload':
293
+ {
294
+ const { stream, chunk, encoding } = message;
295
+ return this[kParentSideStdio][stream].push(chunk, encoding);
296
297
+ case 'stdioWantsMoreData':
298
299
+ const { stream } = message;
300
+ return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback]();
301
302
303
304
assert.fail(`Unknown worker message type ${message.type}`);
@@ -207,6 +309,18 @@ class Worker extends EventEmitter {
309
this[kHandle] = null;
310
this[kPort] = null;
209
311
this[kPublicPort] = null;
312
+ const { stdout, stderr } = this[kParentSideStdio];
+ this[kParentSideStdio] = null;
+ if (!stdout._readableState.ended) {
+ debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`);
+ stdout.push(null);
+ if (!stderr._readableState.ended) {
+ debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`);
+ stderr.push(null);
210
211
212
326
postMessage(...args) {
@@ -243,6 +357,27 @@ class Worker extends EventEmitter {
357
358
return this[kHandle].threadId;
359
360
361
+ get stdin() {
362
+ return this[kParentSideStdio].stdin;
363
364
365
+ get stdout() {
366
+ return this[kParentSideStdio].stdout;
367
368
369
+ get stderr() {
370
+ return this[kParentSideStdio].stderr;
371
372
373
374
+const workerStdio = {};
375
+if (!isMainThread) {
376
+ const port = getEnvMessagePort();
+ port[kWaitingStreams] = 0;
+ workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
+ workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
+ workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');
383
let originalFatalException;
@@ -256,10 +391,14 @@ function setupChild(evalScript) {
port.on('message', (message) => {
if (message.type === 'loadScript') {
- const { filename, doEval, workerData, publicPort } = message;
+ const { filename, doEval, workerData, publicPort, hasStdin } = message;
publicWorker.parentPort = publicPort;
setupPortReferencing(publicPort, publicPort, 'message');
publicWorker.workerData = workerData;
+ if (!hasStdin)
+ workerStdio.stdin.push(null);
debug(`[${threadId}] starts worker script ${filename} ` +
264
`(eval = ${eval}) at cwd = ${process.cwd()}`);
265
port.unref();
@@ -271,6 +410,14 @@ function setupChild(evalScript) {
271
require('module').runMain();
272
273
return;
+ } else if (message.type === 'stdioPayload') {
+ workerStdio[stream].push(chunk, encoding);
+ return;
+ } else if (message.type === 'stdioWantsMoreData') {
+ workerStdio[stream][kStdioWantsMoreDataCallback]();
274
275
276
@@ -317,11 +464,24 @@ function deserializeError(error) {
464
error.byteLength).toString('utf8');
465
466
467
+function pipeWithoutWarning(source, dest) {
468
+ const sourceMaxListeners = source._maxListeners;
469
+ const destMaxListeners = dest._maxListeners;
470
+ source.setMaxListeners(Infinity);
471
+ dest.setMaxListeners(Infinity);
472
473
+ source.pipe(dest);
474
475
+ source._maxListeners = sourceMaxListeners;
476
+ dest._maxListeners = destMaxListeners;
477
478
479
module.exports = {
480
MessagePort,
481
MessageChannel,
482
threadId,
483
Worker,
484
setupChild,
- isMainThread
485
486
327
487
};