8000
We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent 205c018 commit 88a4819Copy full SHA for 88a4819
doc/api/stream.md
@@ -2021,6 +2021,50 @@ for await (const result of concatResult) {
2021
}
2022
```
2023
2024
+### `readable.drop(limit[, options])`
2025
+
2026
+<!-- YAML
2027
+added: REPLACEME
2028
+-->
2029
2030
+> Stability: 1 - Experimental
2031
2032
+* `limit` {number} the number of chunks to drop from the readable.
2033
+* `options` {Object}
2034
+ * `signal` {AbortSignal} allows destroying the stream if the signal is
2035
+ aborted.
2036
+* Returns: {Readable} a stream with `limit` chunks dropped.
2037
2038
+This method returns a new stream with the first `limit` chunks dropped.
2039
2040
+```mjs
2041
+import { Readable } from 'stream';
2042
2043
+await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
2044
+```
2045
2046
+### `readable.take(limit[, options])`
2047
2048
2049
2050
2051
2052
2053
2054
+* `limit` {number} the number of chunks to take from the readable.
2055
2056
2057
2058
+* Returns: {Readable} a stream with `limit` chunks taken.
2059
2060
+This method returns a new stream with the first `limit` chunks.
2061
2062
2063
2064
2065
+await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
2066
2067
2068
### Duplex and transform streams
2069
2070
#### Class: `stream.Duplex`
lib/internal/streams/operators.js
@@ -6,6 +6,7 @@ const { Buffer } = require('buffer');
6
const {
7
codes: {
8
ERR_INVALID_ARG_TYPE,
9
+ ERR_OUT_OF_RANGE,
10
},
11
AbortError,
12
} = require('internal/errors');
@@ -15,6 +16,8 @@ const { kWeakHandler } = require('internal/event_target');
15
16
17
ArrayPrototypePush,
18
MathFloor,
19
+ Number,
20
+ NumberIsNaN,
21
Promise,
22
PromiseReject,
23
PromisePrototypeCatch,
@@ -236,10 +239,62 @@ async function* flatMap(fn, options) {
236
239
237
240
238
241
242
+function toIntegerOrInfinity(number) {
243
+ // We coerce here to align with the spec
244
+ // https://github.com/tc39/proposal-iterator-helpers/issues/169
245
+ number = Number(number);
246
+ if (NumberIsNaN(number)) {
247
+ return 0;
248
+ }
249
+ if (number < 0) {
250
+ throw new ERR_OUT_OF_RANGE('number', '>= 0', number);
251
252
+ return number;
253
+}
254
255
+function drop(number, options) {
256
+ number = toIntegerOrInfinity(number);
257
+ return async function* drop() {
258
+ if (options?.signal?.aborted) {
259
+ throw new AbortError();
260
261
+ for await (const val of this) {
262
263
264
265
+ if (number-- <= 0) {
266
+ yield val;
267
268
269
+ }.call(this);
270
271
272
273
+function take(number, options) {
274
275
+ return async function* take() {
276
277
278
279
280
281
282
283
+ if (number-- > 0) {
284
285
+ } else {
286
+ return;
287
288
289
290
291
292
module.exports.streamReturningOperators = {
293
+ drop,
294
filter,
295
flatMap,
296
map,
297
+ take,
298
};
299
300
module.exports.promiseReturningOperators = {
test/parallel/test-stream-drop-take.js
@@ -0,0 +1,96 @@
1
+'use strict';
2
3
+const common = require('../common');
4
+const {
5
+ Readable,
+} = require('stream');
+const { deepStrictEqual, rejects, throws } = require('assert');
+const { from } = Readable;
+const fromAsync = (...args) => from(...args).map(async (x) => x);
13
+const naturals = () => from(async function*() {
14
+ let i = 1;
+ while (true) {
+ yield i++;
+}());
+{
+ // Synchronous streams
+ (async () => {
+ deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]);
24
+ deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]);
25
+ deepStrictEqual(await from([]).drop(2).toArray(), []);
26
+ deepStrictEqual(await from([]).take(1).toArray(), []);
27
+ deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]);
28
+ deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]);
29
+ deepStrictEqual(await from([1, 2]).take(0).toArray(), []);
30
+ })().then(common.mustCall());
31
+ // Asynchronous streams
32
33
+ deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]);
34
+ deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]);
35
+ deepStrictEqual(await fromAsync([]).drop(2).toArray(), []);
36
+ deepStrictEqual(await fromAsync([]).take(1).toArray(), []);
37
+ deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]);
38
+ deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]);
39
+ deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []);
40
41
+ // Infinite streams
42
43
44
+ deepStrictEqual(await naturals().take(1).toArray(), [1]);
45
+ deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]);
46
+ const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
47
+ deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10);
48
+ deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]);
49
50
51
52
53
+ // Coercion
54
55
+ // The spec made me do this ^^
56
+ deepStrictEqual(await naturals().take('cat').toArray(), []);
57
+ deepStrictEqual(await naturals().take('2').toArray(), [1, 2]);
58
+ deepStrictEqual(await naturals().take(true).toArray(), [1]);
59
60
61
62
63
+ // Support for AbortSignal
64
+ const ac = new AbortController();
65
+ rejects(
66
+ Readable.from([1, 2, 3]).take(1, { signal: ac.signal }).toArray(), {
67
+ name: 'AbortError',
68
+ }).then(common.mustCall());
69
70
+ Readable.from([1, 2, 3]).drop(1, { signal: ac.signal }).toArray(), {
71
72
73
+ ac.abort();
74
75
76
77
+ // Support for AbortSignal, already aborted
78
+ const signal = AbortSignal.abort();
79
80
+ Readable.from([1, 2, 3]).take(1, { signal }).toArray(), {
81
82
83
84
85
86
+ // Error cases
87
+ const invalidArgs = [
88
+ -1,
89
+ -Infinity,
90
+ -40,
91
+ ];
92
93
+ for (const example of invalidArgs) {
94
+ throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
95
96