32
32
use Psr \Http \Message \StreamInterface ;
33
33
use Psr \Http \Message \UriFactoryInterface ;
34
34
use Psr \Http \Message \UriInterface ;
35
+ use Symfony \Component \HttpClient \Internal \HttplugWaitLoop ;
35
36
use Symfony \Component \HttpClient \Response \HttplugPromise ;
36
- use Symfony \Component \HttpClient \Response \ResponseTrait ;
37
- use Symfony \Component \HttpClient \Response \StreamWrapper ;
38
37
use Symfony \Contracts \HttpClient \Exception \TransportExceptionInterface ;
39
38
use Symfony \Contracts \HttpClient \HttpClientInterface ;
40
39
use Symfony \Contracts \HttpClient \ResponseInterface ;
@@ -60,27 +59,27 @@ final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestF
60
59
private $ client ;
61
60
private $ responseFactory ;
62
61
private $ streamFactory ;
63
- private $ promisePool = [] ;
64
- private $ pendingResponse ;
62
+ private $ promisePool ;
63
+ private $ waitLoop ;
65
64
66
65
public function __construct (HttpClientInterface $ client = null , ResponseFactoryInterface $ responseFactory = null , StreamFactoryInterface $ streamFactory = null )
67
66
{
68
67
$ this ->client = $ client ?? HttpClient::create ();
69
68
$ this ->responseFactory = $ responseFactory ;
70
69
$ this ->streamFactory = $ streamFactory ?? ($ responseFactory instanceof StreamFactoryInterface ? $ responseFactory : null );
71
- $ this ->promisePool = new \SplObjectStorage ();
70
+ $ this ->promisePool = \function_exists ( ' GuzzleHttp\Promise\queue ' ) ? new \SplObjectStorage () : null ;
72
71
73
- if (null !== $ this ->responseFactory && null !== $ this ->streamFactory ) {
74
- return ;
75
- }
72
+ if (null === $ this ->responseFactory || null === $ this ->streamFactory ) {
73
+ if (!class_exists (Psr17Factory::class)) {
74
+ throw new \LogicException ('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7". ' );
75
+ }
76
76
77
- if (!class_exists (Psr17Factory::class)) {
78
- throw new \LogicException ('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7". ' );
77
+ $ psr17Factory = new Psr17Factory ();
78
+ $ this ->responseFactory = $ this ->responseFactory ?? $ psr17Factory ;
79
+ $ this ->streamFactory = $ this ->streamFactory ?? $ psr17Factory ;
79
80
}
80
81
81
- $ psr17Factory = new Psr17Factory ();
82
- $ this ->responseFactory = $ this ->responseFactory ?? $ psr17Factory ;
83
- $ this ->streamFactory = $ this ->streamFactory ?? $ psr17Factory ;
82
+ $ this ->waitLoop = new HttplugWaitLoop ($ this ->client , $ this ->promisePool , $ this ->responseFactory , $ this ->streamFactory );
84
83
}
85
84
86
85
/**
@@ -89,7 +88,7 @@ public function __construct(HttpClientInterface $client = null, ResponseFactoryI
89
88
public function sendRequest (RequestInterface $ request ): Psr7ResponseInterface
90
89
{
91
90
try {
92
- return $ this ->createPsr7Response ($ this ->sendPsr7Request ($ request ));
91
+ return $ this ->waitLoop -> createPsr7Response ($ this ->sendPsr7Request ($ request ));
93
92
} catch (TransportExceptionInterface $ e ) {
94
93
throw new NetworkException ($ e ->getMessage (), $ request , $ e );
95
94
}
@@ -102,7 +101,7 @@ public function sendRequest(RequestInterface $request): Psr7ResponseInterface
102
101
*/
103
102
public function sendAsyncRequest (RequestInterface $ request ): Promise
104
103
{
105
- if (!class_exists (GuzzlePromise::class) ) {
104
+ if (!$ promisePool = $ this -> promisePool ) {
106
105
throw new \LogicException (sprintf ('You cannot use "%s()" as the "guzzlehttp/promises" package is not installed. Try running "composer require guzzlehttp/promises". ' , __METHOD__ ));
107
106
}
108
107
@@ -112,88 +111,30 @@ public function sendAsyncRequest(RequestInterface $request): Promise
112
111
return new RejectedPromise ($ e );
113
112
}
114
113
115
- $ cancel = function () use ($ response ) {
116
- $ response ->cancel ();
117
- unset($ this ->promisePool [$ response ]);
118
- };
114
+ $ waitLoop = $ this ->waitLoop ;
119
115
120
- $ promise = new GuzzlePromise (function () use ($ response ) {
121
- $ this ->pendingResponse = $ response ;
122
- $ this ->wait ();
123
- }, $ cancel );
116
+ $ promise = new GuzzlePromise (static function () use ($ response , $ waitLoop ) {
117
+ $ waitLoop ->wait ($ response );
118
+ }, static function () use ($ response , $ promisePool ) {
119
+ $ response ->cancel ();
120
+ unset($ promisePool [$ response ]);
121
+ });
124
122
125
- $ this -> promisePool [$ response ] = [$ request , $ promise ];
123
+ $ promisePool [$ response ] = [$ request , $ promise ];
126
124
127
- return new HttplugPromise ($ promise, $ cancel );
125
+ return new HttplugPromise ($ promise );
128
126
}
129
127
130
128
/**
131
- * Resolve pending promises that complete before the timeouts are reached.
129
+ * Resolves pending promises that complete before the timeouts are reached.
132
130
*
133
131
* When $maxDuration is null and $idleTimeout is reached, promises are rejected.
134
132
*
135
133
* @return int The number of remaining pending promises
136
134
*/
137
135
public function wait (float $ maxDuration = null , float $ idleTimeout = null ): int
138
136
{
139
- $ pendingResponse = $ this ->pendingResponse ;
140
- $ this ->pendingResponse = null ;
141
-
142
- if (null !== $ maxDuration ) {
143
- $ startTime = microtime (true );
144
- $ idleTimeout = max (0.0 , min ($ maxDuration / 5 , $ idleTimeout ?? $ maxDuration ));
145
- $ remainingDuration = $ maxDuration ;
146
- }
147
-
148
- do {
149
- foreach ($ this ->client ->stream ($ this ->promisePool , $ idleTimeout ) as $ response => $ chunk ) {
150
- try {
151
- if (null !== $ maxDuration && $ chunk ->isTimeout ()) {
152
- goto check_duration;
153
- }
154
-
155
- if ($ chunk ->isFirst ()) {
156
- // Deactivate throwing on 3/4/5xx
157
- $ response ->getStatusCode ();
158
- }
159
-
160
- if (!$ chunk ->isLast ()) {
161
- goto check_duration;
162
- }
163
-
164
- if ([$ request , $ promise ] = $ this ->promisePool [$ response ] ?? null ) {
165
- unset($ this ->promisePool [$ response ]);
166
- $ promise ->resolve ($ this ->createPsr7Response ($ response , true ));
167
- }
168
- } catch (\Exception $ e ) {
169
- if ([$ request , $ promise ] = $ this ->promisePool [$ response ] ?? null ) {
170
- unset($ this ->promisePool [$ response ]);
171
-
172
- if ($ e instanceof TransportExceptionInterface) {
173
- $ e = new NetworkException ($ e ->getMessage (), $ request , $ e );
174
- }
175
-
176
- $ promise ->reject ($ e );
177
- }
178
- }
179
-
180
- if ($ pendingResponse === $ response ) {
181
- return \count ($ this ->promisePool );
182
- }
183
-
184
- check_duration:
185
- if (null !== $ maxDuration && $ idleTimeout && $ idleTimeout > $ remainingDuration = max (0.0 , $ maxDuration - microtime (true ) + $ startTime )) {
186
- $ idleTimeout = $ remainingDuration / 5 ;
187
- break ;
188
- }
189
- }
190
-
191
- if (!$ count = \count ($ this ->promisePool )) {
192
- return 0 ;
193
- }
194
- } while (null !== $ maxDuration && 0 < $ remainingDuration );
195
-
196
- return $ count ;
137
+ return $ this ->waitLoop ->wait (null , $ maxDuration , $ idleTimeout );
197
138
}
198
139
199
140
/**
@@ -265,6 +206,11 @@ public function createUri($uri): UriInterface
265
206
return new Uri ($ uri );
266
207
}
267
208
209
+ public function __destruct ()
210
+ {
211
+ $ this ->wait ();
212
+ }
213
+
268
214
private function sendPsr7Request (RequestInterface $ request , bool $ buffer = null ): ResponseInterface
269
215
{
270
216
try {
@@ -286,29 +232,4 @@ private function sendPsr7Request(RequestInterface $request, bool $buffer = null)
286
232
throw new NetworkException ($ e ->getMessage (), $ request , $ e );
287
233
}
288
234
}
289
-
290
- private function createPsr7Response (ResponseInterface $ response , bool $ buffer = false ): Psr7ResponseInterface
291
- {
292
- $ psrResponse = $ this ->responseFactory ->createResponse ($ response ->getStatusCode ());
293
-
294
- foreach ($ response ->getHeaders (false ) as $ name => $ values ) {
295
- foreach ($ values as $ value ) {
296
- $ psrResponse = $ psrResponse ->withAddedHeader ($ name , $ value );
297
- }
298
- }
299
-
300
- if (isset (class_uses ($ response )[ResponseTrait::class])) {
301
- $ body = $ this ->streamFactory ->createStreamFromResource ($ response ->toStream (false ));
302
- } elseif (!$ buffer ) {
303
- $ body = $ this ->streamFactory ->createStreamFromResource (StreamWrapper::createResource ($ response , $ this ->client ));
304
- } else {
305
- $ body = $ this ->streamFactory ->createStream ($ response ->getContent (false ));
306
- }
307
-
308
- if ($ body ->isSeekable ()) {
F438
td>309
- $ body ->seek (0 );
310
- }
311
-
312
- return $ psrResponse ->withBody ($ body );
313
- }
314
235
}
0 commit comments