8000 feat(mergeScan): Add index to the accumulator function (#4458) · ReactiveX/rxjs@f5e143d · GitHub
[go: up one dir, main page]

Skip to content

Commit f5e143d

Browse files
martinsikbenlesh
authored andcommitted
feat(mergeScan): Add index to the accumulator function (#4458)
* feat(mergeScan): add index to the accumulator function Closes #4441 * test(mergeScan): use marbles instead of "of()" * test(mergeScan): update spec, remove unnecessary dtest * readd index lost in merge.
1 parent 0ee4fbe commit f5e143d

File tree

2 files changed

+16
-4
lines changed

2 files changed

+16
-4
lines changed

spec/operators/mergeScan-spec.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,4 +419,16 @@ describe('mergeScan', () => {
419419
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
420420
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
421421
});
422+
423+
it('should pass current index to accumulator', () => {
424+
const recorded: number[] = [];
425+
const e1 = of('a', 'b', 'c', 'd');
426+
427+
e1.pipe(mergeScan((acc, x, index) => {
428+
recorded.push(index);
429+
return of(x);
430+
}, 0)).subscribe();
431+
432+
expect(recorded).to.deep.equal([0, 1, 2, 3]);
433+
});
422434
});

src/internal/operators/mergeScan.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ import { ObservableInput, OperatorFunction } from '../types';
4646
* @method mergeScan
4747
* @owner Observable
4848
*/
49-
export function mergeScan<T, R>(accumulator: (acc: R, value: T) => ObservableInput<R>,
49+
export function mergeScan<T, R>(accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
5050
seed: R,
5151
concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction<T, R> {
5252
return (source: Observable<T>) => source.lift(new MergeScanOperator(accumulator, seed, concurrent));
5353
}
5454

5555
export class MergeScanOperator<T, R> implements Operator<T, R> {
56-
constructor(private accumulator: (acc: R, value: T) => ObservableInput<R>,
56+
constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
5757
private seed: R,
5858
private concurrent: number) {
5959
}
@@ -78,7 +78,7 @@ export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
7878
protected index: number = 0;
7979

8080
constructor(destination: Subscriber<R>,
81-
private accumulator: (acc: R, value: T) => ObservableInput<R>,
81+
private accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
8282
private acc: R,
8383
private concurrent: number) {
8484
super(destination);
@@ -91,7 +91,7 @@ export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
9191
let ish;
9292
try {
9393
const { accumulator } = this;
94-
ish = accumulator(this.acc, value);
94+
ish = accumulator(this.acc, value, index);
9595
} catch (e) {
9696
return destination.error(e);
9797
}

0 commit comments

Comments
 (0)
0