8000 stream: add drop and take · nodejs/node@b59200d · GitHub
[go: up one dir, main page]

Skip to content

Commit b59200d

Browse files
committed
stream: add drop and take
1 parent ca48949 commit b59200d

File tree

2 files changed

+129
-0
lines changed

2 files changed

+129
-0
lines changed

lib/internal/streams/operators.js

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,45 @@ async function* flatMap(fn, options) {
232232
}
233233
}
234234

235+
async function* drop(number, options) {
236+
validateInteger(number, 'number', 0);
237+
if (options?.signal?.aborted) {
238+
throw new AbortError();
239+
}
240+
for await (const val of this) {
241+
if (options?.signal?.aborted) {
242+
throw new AbortError();
243+
}
244+
if (number-- <= 0) {
245+
yield await val;
246+
}
247+
}
248+
}
249+
250+
251+
async function* take(number, options) {
252+
validateInteger(number, 'number', 0);
253+
if (options?.signal?.aborted) {
254+
throw new AbortError();
255+
}
256+
for await (const val of this) {
257+
if (options?.signal?.aborted) {
258+
throw new AbortError();
259+
}
260+
if (number-- > 0) {
261+
yield val;
262+
} else {
263+
return;
264+
}
265+
}
266+
}
267+
235268
module.exports.streamReturningOperators = {
269+
drop,
236270
filter,
237271
flatMap,
238272
map,
273+
take,
239274
};
240275

241276
module.exports.promiseReturningOperators = {
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const { deepStrictEqual, rejects } = require('assert');
8+
9+
const { from } = Readable;
10+
{
11+
// Synchronous streams
12+
(async () => {
13+
deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]);
14+
deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]);
15+
deepStrictEqual(await from([]).drop(2).toArray(), []);
16+
deepStrictEqual(await from([]).take(1).toArray(), []);
17+
deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]);
18+
deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]);
19+
deepStrictEqual(await from([1, 2]).take(0).toArray(), []);
20+
})().then(common.mustCall());
21+
// Asynchronous streams
22+
(async () => {
23+
const fromAsync = (...args) => from(...args).map(async (x) => x);
24+
deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]);
25+
deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]);
26+
deepStrictEqual(await fromAsync([]).drop(2).toArray(), []);
27+
deepStrictEqual(await fromAsync([]).take(1).toArray(), []);
28+
deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]);
29+
deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]);
30+
deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []);
31+
})().then(common.mustCall());
32+
// Infinite streams
33+
// Asynchronous streams
34+
(async () => {
35+
const naturals = () => from(async function*() {
36+
let i = 1;
37+
while (true) {
38+
yield i++;
39+
}
40+
}());
41+
deepStrictEqual(await naturals().take(1).toArray(), [1]);
42+
deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]);
43+
const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
44+
deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10);
45+
deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]);
46+
})().then(common.mustCall());
47+
}
48+
49+
{
50+
// Support for AbortSignal
51+
const ac = new AbortController();
52+
rejects(
53+
Readable.from([1, 2, 3]).take(1, { signal: ac.signal }).toArray(), {
54+
name: 'AbortError',
55+
}).then(common.mustCall());
56+
rejects(
57+
Readable.from([1, 2, 3]).drop(1, { signal: ac.signal }).toArray(), {
58+
name: 'AbortError',
59+
}).then(common.mustCall());
60+
ac.abort();
61+
}
62+
63+
{
64+
// Support for AbortSignal, already aborted
65+
const signal = AbortSignal.abort();
66+
rejects(
67+
Readable.from([1, 2, 3]).take(1, { signal }).toArray(), {
68+
name: 'AbortError',
69+
}).then(common.mustCall());
70+
}
71+
72+
{
73+
// Error cases
74+
// TODO(benjamingr) these do not align with the spec and need to all
75+
// unfortunately coerce to `0`. Negative values should be checked to throw
76+
// a RangeError, issue opened upstream at
77+
// https://github.com/tc39/proposal-iterator-helpers/issues/169
78+
const invalidArgs = [
79+
'5',
80+
undefined,
81+
null,
82+
{},
83+
[],
84+
from([1, 2, 3]),
85+
Promise.resolve(5),
86+
];
87+
88+
for (const example of invalidArgs) {
89+
rejects(
90+
from([]).take(example).toArray(),
91+
/ERR_INVALID_ARG_TYPE/
92+
).then(common.mustCall());
93+
}
94+
}

0 commit comments

Comments
 (0)
0