1
- import {
2
- DestroyRef ,
3
- ErrorHandler ,
4
- inject ,
5
- Injectable ,
6
- Optional ,
7
- } from '@angular/core' ;
1
+ import { DestroyRef , ErrorHandler , inject , Injectable } from '@angular/core' ;
2
+ import { takeUntilDestroyed } from '@angular/core/rxjs-interop' ;
8
3
import {
9
4
EMPTY ,
10
5
from ,
@@ -18,14 +13,14 @@ import {
18
13
import {
19
14
catchError ,
20
15
filter ,
21
- mapTo ,
16
+ map ,
22
17
mergeAll ,
23
18
share ,
24
19
takeUntil ,
25
20
tap ,
26
21
} from 'rxjs/operators' ;
27
22
import { DestroyProp , OnDestroy$ } from './model' ;
28
- import { toHook , untilDestroyed } from './utils' ;
23
+ import { toHook } from './utils' ;
29
24
30
25
/**
31
26
* @deprecated - use rxEffects instead
@@ -72,17 +67,9 @@ import { toHook, untilDestroyed } from './utils';
72
67
*/
73
68
@Injectable ( )
74
69
export class RxEffects implements OnDestroy$ {
75
- constructor (
76
- @Optional ( )
77
- private readonly errorHandler : ErrorHandler | null ,
78
- ) {
79
- inject ( DestroyRef ) . onDestroy ( ( ) => {
80
- this . _hooks$ . next ( { destroy : true } ) ;
81
- this . subscription . unsubscribe ( ) ;
82
- } ) ;
83
- }
84
-
85
70
private static nextId = 0 ;
71
+ private readonly destroyRef = inject ( DestroyRef ) ;
72
+ private readonly errorHandler = inject ( ErrorHandler , { optional : true } ) ;
86
73
readonly _hooks$ = new Subject < DestroyProp > ( ) ;
87
74
private readonly observables$ = new Subject < Observable < unknown > > ( ) ;
88
75
// we have to use publish here to make it hot (composition happens without subscriber)
@@ -91,6 +78,13 @@ export class RxEffects implements OnDestroy$ {
91
78
onDestroy$ : Observable < boolean > = this . _hooks$ . pipe ( toHook ( 'destroy' ) ) ;
92
79
private readonly destroyers : Record < number , Subject < void > > = { } ;
93
80
81
+ constructor ( ) {
82
+ this . destroyRef . onDestroy ( ( ) => {
83
+ this . _hooks$ . next ( { destroy : true } ) ;
84
+ this . subscription . unsubscribe ( ) ;
85
+ } ) ;
86
+ }
87
+
94
88
/**
95
89
* Performs a side-effect whenever a source observable emits, and handles its subscription.
96
90
*
@@ -171,7 +165,10 @@ export class RxEffects implements OnDestroy$ {
171
165
}
172
166
const effectId = RxEffects . nextId ++ ;
173
167
const destroy$ = ( this . destroyers [ effectId ] = new Subject < void > ( ) ) ;
174
- const applyBehavior = pipe ( mapTo ( effectId ) , takeUntil ( destroy$ ) ) ;
168
+ const applyBehavior = pipe (
169
+ map ( ( ) => effectId ) ,
170
+ takeUntil ( destroy$ ) ,
171
+ ) ;
175
172
if ( fnOrObj != null ) {
176
173
this . observables$ . next (
177
174
from ( obsOrSub ) . pipe (
@@ -233,7 +230,7 @@ export class RxEffects implements OnDestroy$ {
233
230
untilEffect ( effectId : number ) {
234
231
return < V > ( source : Observable < V > ) =>
235
232
source . pipe (
236
- untilDestroyed ( this ) ,
233
+ takeUntilDestroyed ( this . destroyRef ) ,
237
234
takeUntil ( this . effects$ . pipe ( filter ( ( eId ) => eId === effectId ) ) ) ,
238
235
) ;
239
236
}
0 commit comments