8000 [HttpClient] resolve promise chains on HttplugClient::wait() · symfony/symfony@ea0be07 · GitHub
[go: up one dir, main page]

Skip to content

Commit ea0be07

Browse files
[HttpClient] resolve promise chains on HttplugClient::wait()
1 parent 6e7f325 commit ea0be07

File tree

4 files changed

+192
-124
lines changed

4 files changed

+192
-124
lines changed

src/Symfony/Component/HttpClient/HttplugClient.php

Lines changed: 30 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@
3232
use Psr\Http\Message\StreamInterface;
3333
use Psr\Http\Message\UriFactoryInterface;
3434
use Psr\Http\Message\UriInterface;
35+
use Symfony\Component\HttpClient\Internal\HttplugWaitLoop;
3536
use Symfony\Component\HttpClient\Response\HttplugPromise;
36-
use Symfony\Component\HttpClient\Response\ResponseTrait;
37-
use Symfony\Component\HttpClient\Response\StreamWrapper;
3837
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
3938
use Symfony\Contracts\HttpClient\HttpClientInterface;
4039
use Symfony\Contracts\HttpClient\ResponseInterface;
@@ -60,27 +59,27 @@ final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestF
6059
private $client;
6160
private $responseFactory;
6261
private $streamFactory;
63-
private $promisePool = [];
64-
private $pendingResponse;
62+
private $promisePool;
63+
private $waitLoop;
6564

6665
public function __construct(HttpClientInterface $client = null, ResponseFactoryInterface $responseFactory = null, StreamFactoryInterface $streamFactory = null)
6766
{
6867
$this->client = $client ?? HttpClient::create();
6968
$this->responseFactory = $responseFactory;
7069
$this->streamFactory = $streamFactory ?? ($responseFactory instanceof StreamFactoryInterface ? $responseFactory : null);
71-
$this->promisePool = new \SplObjectStorage();
70+
$this->promisePool = \function_exists('GuzzleHttp\Promise\queue') ? new \SplObjectStorage() : null;
7271

73-
if (null !== $this->responseFactory && null !== $this->streamFactory) {
74-
return;
75-
}
72+
if (null === $this->responseFactory || null === $this->streamFactory) {
73+
if (!class_exists(Psr17Factory::class)) {
74+
throw new \LogicException('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7".');
75+
}
7676

77-
if (!class_exists(Psr17Factory::class)) {
78-
throw new \LogicException('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7".');
77+
$psr17Factory = new Psr17Factory();
78+
$this->responseFactory = $this->responseFactory ?? $psr17Factory;
79+
$this->streamFactory = $this->streamFactory ?? $psr17Factory;
7980
}
8081

81-
$psr17Factory = new Psr17Factory();
82-
$this->responseFactory = $this->responseFactory ?? $psr17Factory;
83-
$this->streamFactory = $this->streamFactory ?? $psr17Factory;
82+
$this->waitLoop = new HttplugWaitLoop($this->client, $this->promisePool, $this->responseFactory, $this->streamFactory);
8483
}
8584

8685
/**
@@ -89,7 +88,7 @@ public function __construct(HttpClientInterface $client = null, ResponseFactoryI
8988
public function sendRequest(RequestInterface $request): Psr7ResponseInterface
9089
{
9190
try {
92-
return $this->createPsr7Response($this->sendPsr7Request($request));
91+
return $this->waitLoop->createPsr7Response($this->sendPsr7Request($request));
9392
} catch (TransportExceptionInterface $e) {
9493
throw new NetworkException($e->getMessage(), $request, $e);
9594
}
@@ -102,7 +101,7 @@ public function sendRequest(RequestInterface $request): Psr7ResponseInterface
102101
*/
103102
public function sendAsyncRequest(RequestInterface $request): Promise
104103
{
105-
if (!class_exists(GuzzlePromise::class)) {
104+
if (!$promisePool = $this->promisePool) {
106105
throw new \LogicException(sprintf('You cannot use "%s()" as the "guzzlehttp/promises" package is not installed. Try running "composer require guzzlehttp/promises".', __METHOD__));
107106
}
108107

@@ -112,88 +111,30 @@ public function sendAsyncRequest(RequestInterface $request): Promise
112111
return new RejectedPromise($e);
113112
}
114113

115-
$cancel = function () use ($response) {
116-
$response->cancel();
117-
unset($this->promisePool[$response]);
118-
};
114+
$waitLoop = $this->waitLoop;
119115

120-
$promise = new GuzzlePromise(function () use ($response) {
121-
$this->pendingResponse = $response;
122-
$this->wait();
123-
}, $cancel);
116+
$promise = new GuzzlePromise(static function () use ($response, $waitLoop) {
117+
$waitLoop->wait($response);
118+
}, static function () use ($response, $promisePool) {
119+
$response->cancel();
120+
unset($promisePool[$response]);
121+
});
124122

125-
$this->promisePool[$response] = [$request, $promise];
123+
$promisePool[$response] = [$request, $promise];
126124

127-
return new HttplugPromise($promise, $cancel);
125+
return new HttplugPromise($promise);
128126
}
129127

130128
/**
131-
* Resolve pending promises that complete before the timeouts are reached.
129+
* Resolves pending promises that complete before the timeouts are reached.
132130
*
133131
* When $maxDuration is null and $idleTimeout is reached, promises are rejected.
134132
*
135133
* @return int The number of remaining pending promises
136134
*/
137135
public function wait(float $maxDuration = null, float $idleTimeout = null): int
138136
{
139-
$pendingResponse = $this->pendingResponse;
140-
$this->pendingResponse = null;
141-
142-
if (null !== $maxDuration) {
143-
$startTime = microtime(true);
144-
$idleTimeout = max(0.0, min($maxDuration / 5, $idleTimeout ?? $maxDuration));
145-
$remainingDuration = $maxDuration;
146-
}
147-
148-
do {
149-
foreach ($this->client->stream($this->promisePool, $idleTimeout) as $response => $chunk) {
150-
try {
151-
if (null !== $maxDuration && $chunk->isTimeout()) {
152-
goto check_duration;
153-
}
154-
155-
if ($chunk->isFirst()) {
156-
// Deactivate throwing on 3/4/5xx
157-
$response->getStatusCode();
158-
}
159-
160-
if (!$chunk->isLast()) {
161-
goto check_duration;
162-
}
163-
164-
if ([$request, $promise] = $this->promisePool[$response] ?? null) {
165-
unset($this->promisePool[$response]);
166-
$promise->resolve($this->createPsr7Response($response, true));
167-
}
168-
} catch (\Exception $e) {
169-
if ([$request, $promise] = $this->promisePool[$response] ?? null) {
170-
unset($this->promisePool[$response]);
171-
172-
if ($e instanceof TransportExceptionInterface) {
173-
$e = new NetworkException($e->getMessage(), $request, $e);
174-
}
175-
176-
$promise->reject($e);
177-
}
178-
}
179-
180-
if ($pendingResponse === $response) {
181-
return \count($this->promisePool);
182-
}
183-
184-
check_duration:
185-
if (null !== $maxDuration && $idleTimeout && $idleTimeout > $remainingDuration = max(0.0, $maxDuration - microtime(true) + $startTime)) {
186-
$idleTimeout = $remainingDuration / 5;
187-
break;
188-
}
189-
}
190-
191-
if (!$count = \count($this->promisePool)) {
192-
return 0;
193-
}
194-
} while (null !== $maxDuration && 0 < $remainingDuration);
195-
196-
return $count;
137+
return $this->waitLoop->wait(null, $maxDuration, $idleTimeout);
197138
}
198139

199140
/**
@@ -265,6 +206,11 @@ public function createUri($uri): UriInterface
265206
return new Uri($uri);
266207
}
267208

209+
public function __destruct()
210+
{
211+
$this->wait();
212+
}
213+
268214
private function sendPsr7Request(RequestInterface $request, bool $buffer = null): ResponseInterface
269215
{
270216
try {
@@ -286,29 +232,4 @@ private function sendPsr7Request(RequestInterface $request, bool $buffer = null)
286232
throw new NetworkException($e->getMessage(), $request, $e);
287233
}
288234
}
289-
290-
private function createPsr7Response(ResponseInterface $response, bool $buffer = false): Psr7ResponseInterface
291-
{
292-
$psrResponse = $this->responseFactory->createResponse($response->getStatusCode());
293-
294-
foreach ($response->getHeaders(false) as $name => $values) {
295-
foreach ($values as $value) {
296-
$psrResponse = $psrResponse->withAddedHeader($name, $value);
297-
}
298-
}
299-
300-
if (isset(class_uses($response)[ResponseTrait::class])) {
301-
$body = $this->streamFactory->createStreamFromResource($response->toStream(false));
302-
} elseif (!$buffer) {
303-
$body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client));
304-
} else {
305-
$body = $this->streamFactory->createStream($response->getContent(false));
306-
}
307-
308-
if ($body->isSeekable()) {
309-
$body->seek(0);
310-
}
311-
312-
return $psrResponse->withBody($body);
313-
}
314235
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\HttpClient\Internal;
13+
14+
use Http\Client\Exception\NetworkException;
15+
use Psr\Http\Message\ResponseFactoryInterface;
16+
use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface;
17+
use Psr\Http\Message\StreamFactoryInterface;
18+
use Symfony\Component\HttpClient\Response\ResponseTrait;
19+
use Symfony\Component\HttpClient\Response\StreamWrapper;
20+
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
21+
use Symfony\Contracts\HttpClient\HttpClientInterface;
22+
use Symfony\Contracts\HttpClient\ResponseInterface;
23+
24+
/**
25+
* @author Nicolas Grekas <p@tchwork.com>
26+
*
27+
* @internal
28+
*/
29+
final class HttplugWaitLoop
30+
{
31+
private $client;
32+
private $promisePool;
33+
private $responseFactory;
34+
private $streamFactory;
35+
36+
public function __construct(HttpClientInterface $client, ?\SplObjectStorage $promisePool, ResponseFactoryInterface $responseFactory, StreamFactoryInterface $streamFactory)
37+
{
38+
$this->client = $client;
39+
$this->promisePool = $promisePool;
40+
$this->responseFactory = $responseFactory;
41+
$this->streamFactory = $streamFactory;
42+
}
43+
44+
public function wait(?ResponseInterface $pendingResponse, float $maxDuration = null, float $idleTimeout = null): int
45+
{
46+
if (!$this->promisePool) {
47+
return 0;
48+
}
49+
50+
$guzzleQueue = \GuzzleHttp\Promise\queue();
51+
52+
if (0.0 === $remainingDuration = $maxDuration) {
53+
$idleTimeout = 0.0;
54+
} elseif (null !== $maxDuration) {
55+
$startTime = microtime(true);
56+
$idleTimeout = max(0.0, min($maxDuration / 5, $idleTimeout ?? $maxDuration));
57+
}
58+
59+
do {
60+
foreach ($this->client->stream($this->promisePool, $idleTimeout) as $response => $chunk) {
61+
try {
62+
if (null !== $maxDuration && $chunk->isTimeout()) {
63+
goto check_duration;
64+
}
65+
66+
if ($chunk->isFirst()) {
67+
// Deactivate throwing on 3/4/5xx
68+
$response->getStatusCode();
69+
}
70+
71+
if (!$chunk->isLast()) {
72+
goto check_duration;
73+
}
74+
75+
if ([$request, $promise] = $this->promisePool[$response] ?? null) {
76+
unset($this->promisePool[$response]);
77+
$promise->resolve($this->createPsr7Response($response, true));
78+
}
79+
} catch (\Exception $e) {
80+
if ([$request, $promise] = $this->promisePool[$response] ?? null) {
81+
unset($this->promisePool[$response]);
82+
83+
if ($e instanceof TransportExceptionInterface) {
84+
$e = new NetworkException($e->getMessage(), $request, $e);
85+
}
86+
87+
$promise->reject($e);
88+
}
89+
}
90+
91+
$guzzleQueue->run();
92+
93+
if ($pendingResponse === $response) {
94+
return $this->promisePool->count();
95+
}
96+
97+
check_duration:
98+
if (null !== $maxDuration && $idleTimeout && $idleTimeout > $remainingDuration = max(0.0, $maxDuration - microtime(true) + $startTime)) {
99+
$idleTimeout = $remainingDuration / 5;
100+
break;
101+
}
102+
}
103+
104+
if (!$count = $this->promisePool->count()) {
105+
return 0;
106+
}
107+
} while (null === $maxDuration || 0 < $remainingDuration);
108+
109+
return $count;
110+
}
111+
112+
public function createPsr7Response(ResponseInterface $response, bool $buffer = false): Psr7ResponseInterface
113+
{
114+
$psrResponse = $this->responseFactory->createResponse($response->getStatusCode());
115+
116+
foreach ($response->getHeaders(false) as $name => $values) {
117+
foreach ($values as $value) {
118+
$psrResponse = $psrResponse->withAddedHeader($name, $value);
119+
}
120+
}
121+
122+
if (isset(class_uses($response)[ResponseTrait::class])) {
123+
$body = $this->streamFactory->createStreamFromResource($response->toStream(false));
124+
} elseif (!$buffer) {
125+
$body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client));
126+
} else {
127+
$body = $this->streamFactory->createStream($response->getContent(false));
128+
}
129+
130+
if ($body->isSeekable()) {
131+
$body->seek(0);
132+
}
133+
134+
return $psrResponse->withBody($body);
135+
}
136+
}

src/Symfony/Component/HttpClient/Response/HttplugPromise.php

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@
2323
final class HttplugPromise implements HttplugPromiseInterface
2424
{
2525
private $promise;
26-
private $cancel;
2726

28-
public function __construct(GuzzlePromiseInterface $promise, callable $cancel = null)
27+
public function __construct(GuzzlePromiseInterface $promise)
2928
{
3029
$this->promise = $promise;
31-
$this->cancel = $cancel;
3230
}
3331

3432
public function then(callable $onFulfilled = null, callable $onRejected = null): self
@@ -58,16 +56,4 @@ public function wait($unwrap = true)
5856
{
5957
return $this->promise->wait($unwrap);
6058
}
61-
62-
public function __destruct()
63-
{
64-
if ($this->cancel) {
65-
($this->cancel)();
66-
}
67-
}
68-
69-
public function __wakeup()
70-
{
71-
throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
72-
}
7359
}

0 commit comments

Comments
 (0)
0