8000
We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent cb5ed45 commit 684684eCopy full SHA for 684684e
lib/internal/child_process.js
@@ -455,7 +455,10 @@ function setupChannel(target, channel) {
455
var jsonBuffer = '';
456
var pendingHandle = null;
457
channel.buffering = false;
458
- channel.onread = function(nread, pool, recvHandle) {
+ channel.pendingHandle = null;
459
+ channel.onread = function(nread, pool) {
460
+ const recvHandle = channel.pendingHandle;
461
462
// TODO(bnoordhuis) Check that nread > 0.
463
if (pool) {
464
if (recvHandle)
src/env.h
@@ -215,6 +215,7 @@ class ModuleWrap;
215
V(owner_string, "owner") \
216
V(parse_error_string, "Parse Error") \
217
V(path_string, "path") \
218
+ V(pending_handle_string, "pendingHandle") \
219
V(pbkdf2_error_string, "PBKDF2 Error") \
220
V(pid_string, "pid") \
221
V(pipe_string, "pipe") \
src/stream_base-inl.h
@@ -33,9 +33,7 @@ inline StreamListener::~StreamListener() {
33
34
inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
35
CHECK_NE(previous_listener_, nullptr);
36
- previous_listener_->OnStreamRead(nread,
37
- uv_buf_init(nullptr, 0),
38
- UV_UNKNOWN_HANDLE);
+ previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
39
}
40
41
@@ -85,12 +83,10 @@ inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
85
83
return listener_->OnStreamAlloc(suggested_size);
86
84
87
88
-inline void StreamResource::EmitRead(ssize_t nread,
89
- const uv_buf_t& buf,
90
- uv_handle_type pending) {
+inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
91
if (nread > 0)
92
bytes_read_ += static_cast<uint64_t>(nread);
93
- listener_->OnStreamRead(nread, buf, pending);
+ listener_->OnStreamRead(nread, buf);
94
95
96
inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
src/stream_base.cc
@@ -437,23 +437,17 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
437
438
439
440
-void StreamBase::CallJSOnreadMethod(ssize_t nread,
441
- Local<Object> buf,
442
- Local<Object> handle) {
+void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) {
443
Environment* env = env_;
444
445
Local<Value> argv[] = {
446
Integer::New(env->isolate(), nread),
447
- buf,
448
- handle
+ buf
449
};
450
451
if (argv[1].IsEmpty())
452
argv[1] = Undefined(env->isolate());
453
454
- if (argv[2].IsEmpty())
- argv[2] = Undefined(env->isolate());
-
AsyncWrap* wrap = GetAsyncWrap();
CHECK_NE(wrap, nullptr);
wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
@@ -495,19 +489,6 @@ uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
495
489
return uv_buf_init(Malloc(suggested_size), suggested_size);
496
490
497
491
498
-void StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
499
- // This cannot be virtual because it is just as valid to override the other
500
- // OnStreamRead() callback.
501
- CHECK(0 && "OnStreamRead() needs to be implemented");
502
-}
503
504
-void StreamListener::OnStreamRead(ssize_t nread,
505
506
507
- CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
508
- OnStreamRead(nread, buf);
509
510
511
492
512
493
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
513
494
CHECK_NE(stream_, nullptr);
src/stream_base.h
@@ -150,17 +150,8 @@ class StreamListener {
150
// with base nullpptr in case of an error.
151
// `nread` is the number of read bytes (which is at most the buffer length),
152
// or, if negative, a libuv error code.
153
- // The variant with a `uv_handle_type` argument is used by libuv-backed
154
- // streams for handle transfers (e.g. passing net.Socket instances between
155
- // cluster workers). For all other streams, overriding the simple variant
156
- // should be sufficient.
157
- // By default, the second variant crashes if `pending` is set and otherwise
158
- // calls the simple variant.
159
virtual void OnStreamRead(ssize_t nread,
160
const uv_buf_t& buf) = 0;
161
- virtual void OnStreamRead(ssize_t nread,
162
163
- uv_handle_type pending);
164
165
// This is called once a Write has finished. `status` may be 0 or,
166
// if negative, a libuv error code.
@@ -229,9 +220,7 @@ class StreamResource {
229
uv_buf_t EmitAlloc(size_t suggested_size);
230
// Call the current listener's OnStreamRead() method and update the
231
222
// stream's read byte counter.
232
- void EmitRead(ssize_t nread,
233
- const uv_buf_t& buf = uv_buf_init(nullptr, 0),
234
- uv_handle_type pending = UV_UNKNOWN_HANDLE);
223
+ void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
235
224
// Call the current listener's OnStreamAfterWrite() method.
236
225
void EmitAfterWrite(WriteWrap* w, int status);
237
226
@@ -260,10 +249,7 @@ class StreamBase : public StreamResource {
260
249
virtual bool IsIPCPipe();
261
250
virtual int GetFD();
262
251
263
- void CallJSOnreadMethod(
264
- ssize_t nread,
265
- v8::Local<v8::Object> buf,
266
- v8::Local<v8::Object> handle = v8::Local<v8::Object>());
252
+ void CallJSOnreadMethod(ssize_t nread, v8::Local<v8::Object> buf);
267
253
268
254
// These are called by the respective {Write,Shutdown}Wrap class.
269
255
virtual void AfterShutdown(ShutdownWrap* req, int status);
src/stream_wrap.cc
@@ -93,7 +93,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
provider),
StreamBase(env),
stream_(stream) {
- PushStreamListener(this);
97
98
99
@@ -146,7 +145,13 @@ bool LibuvStreamWrap::IsIPCPipe() {
146
145
147
148
int LibuvStreamWrap::ReadStart() {
149
- return uv_read_start(stream(), OnAlloc, OnRead);
+ return uv_read_start(stream(), [](uv_handle_t* handle,
+ size_t suggested_size,
+ uv_buf_t* buf) {
+ static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
+ }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
+ static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
+ });
@@ -155,16 +160,11 @@ int LibuvStreamWrap::ReadStop() {
-void LibuvStreamWrap::OnAlloc(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf) {
- LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
- HandleScope scope(wrap->env()->isolate());
- Context::Scope context_scope(wrap->env()->context());
- CHECK_EQ(wrap->stream(), reinterpret_cast<uv_stream_t*>(handle));
+void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
+ HandleScope scope(env()->isolate());
+ Context::Scope context_scope(env()->context());
167
- *buf = wrap->EmitAlloc(suggested_size);
+ *buf = EmitAlloc(suggested_size);
168
169
170
@@ -190,64 +190,47 @@ static Local<Object> AcceptHandle(Environment* env, LibuvStreamWrap* parent) {
190
191
192
193
-void LibuvStreamWrap::OnStreamRead(ssize_t nread,
194
195
196
- HandleScope handle_scope(env()->isolate());
+void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
197
Context::Scope context_scope(env()->context());
198
199
- if (nread <= 0) {
200
- free(buf.base);
201
- if (nread < 0)
202
- CallJSOnreadMethod(nread, Local<Object>());
203
- return;
204
- }
205
206
- CHECK_LE(static_cast<size_t>(nread), buf.len);
207
208
- Local<Object> pending_obj;
209
210
- if (pending == UV_TCP) {
211
- pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env(), this);
212
- } else if (pending == UV_NAMED_PIPE) {
213
- pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env(), this);
214
- } else if (pending == UV_UDP) {
- pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env(), this);
- } else {
- Local<Object> obj = Buffer::New(env(), buf.base, nread).ToLocalChecked();
- CallJSOnreadMethod(nread, obj, pending_obj);
-void LibuvStreamWrap::OnRead(uv_stream_t* handle,
227
- const uv_buf_t* buf) {
228
uv_handle_type type = UV_UNKNOWN_HANDLE;
- if (wrap->is_named_pipe_ipc() &&
- uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(handle)) > 0) {
- type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(handle));
+ if (is_named_pipe_ipc() &&
+ uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
+ type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
238
// We should not be getting this callback if someone as already called
239
// uv_close() on the handle.
240
- CHECK_EQ(wrap->persistent().IsEmpty(), false);
+ CHECK_EQ(persistent().IsEmpty(), false);
241
242
if (nread > 0) {
243
- if (wrap->is_tcp()) {
+ if (is_tcp()) {
244
NODE_COUNT_NET_BYTES_RECV(nread);
245
- } else if (wrap->is_named_pipe()) {
+ } else if (is_named_pipe()) {
246
NODE_COUNT_PIPE_BYTES_RECV(nread);
247
+
+ Local<Object> pending_obj;
+ if (type == UV_TCP) {
+ pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env(), this);
+ } else if (type == UV_NAMED_PIPE) {
+ pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env(), this);
+ } else if (type == UV_UDP) {
+ pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env(), this);
+ } else {
+ CHECK_EQ(type, UV_UNKNOWN_HANDLE);
+ }
+ if (!pending_obj.IsEmpty()) {
+ object()->Set(env()->context(),
+ env()->pending_handle_string(),
+ pending_obj).FromJust();
248
- wrap->EmitRead(nread, *buf, type);
+ EmitRead(nread, *buf);
@@ -373,11 +356,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
373
356
req_wrap->Done(status);
374
357
375
358
376
377
-void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
378
- StreamBase::AfterWrite(w, status);
379
380
381
359
} // namespace node
382
360
383
361
NODE_BUILTIN_MODULE_CONTEXT_AWARE(stream_wrap,
src/stream_wrap.h
@@ -33,9 +33,7 @@
namespace node {
-class LibuvStreamWrap : public HandleWrap,
- public StreamListener,
- public StreamBase {
+class LibuvStreamWrap : public HandleWrap, public StreamBase {
public:
static void Initialize(v8::Local<v8::Object> target,
v8::Local<v8::Value> unused,
@@ -93,30 +91,12 @@ class LibuvStreamWrap : public HandleWrap,
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
// Callbacks for libuv
- static void OnAlloc(uv_handle_t* handle,
- uv_buf_t* buf);
+ void OnUvAlloc(size_t suggested_size, uv_buf_t* buf);
+ void OnUvRead(ssize_t nread, const uv_buf_t* buf);
100
- static void OnRead(uv_stream_t* handle,
101
102
- const uv_buf_t* buf);
103
static void AfterUvWrite(uv_write_t* req, int status);
104
static void AfterUvShutdown(uv_shutdown_t* req, int status);
105
106
- // Resource interface implementation
107
- void OnStreamRead(ssize_t nread,
108
- const uv_buf_t& buf) override {
109
- CHECK(0 && "must not be called");
110
111
112
113
- uv_handle_type pending) override;
114
- void OnStreamAfterWrite(WriteWrap* w, int status) override {
115
- previous_listener_->OnStreamAfterWrite(w, status);
116
117
118
- void AfterWrite(WriteWrap* req_wrap, int status) override;
119
120
uv_stream_t* const stream_;
121
122