8000 feature #33743 [HttpClient] Async HTTPlug client (Nyholm) · dontub/symfony@1998814 · GitHub
[go: up one dir, main page]

Skip to content

Commit 1998814

Browse files
feature symfony#33743 [HttpClient] Async HTTPlug client (Nyholm)
This PR was squashed before being merged into the 4.4 branch (closes symfony#33743). Discussion ---------- [HttpClient] Async HTTPlug client | Q | A | ------------- | --- | Branch? | 4.4 | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | Fix symfony#33710, Fix symfony#32142 | License | MIT | Doc PR | symfony/symfony-docs#12389 This PR removes `HttplugClient`'s dependency on `Psr18Client`. It will also add an `HttplugPromise` to make sure we sure we respect the Httplug's `HttpAsyncClient` interface. It implements `HttpAsyncClient::sendAsyncRequest()` and provides two extensions: - `HttplugPromise::cancel()` allows cancelling a promise (and the underlying response) - `HttplugClient::wait()` allows to tick the promise pool, with configurable timeouts. Commits ------- 4fd593f [HttpClient] Async HTTPlug client
2 parents ce7c0b4 + 4fd593f commit 1998814

File tree

6 files changed

+357
-34
lines changed

6 files changed

+357
-34
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
"doctrine/orm": "~2.4,>=2.4.5",
109109
"doctrine/reflection": "~1.0",
110110
"doctrine/doctrine-bundle": "~1.4",
111+
"guzzlehttp/promises": "^1.3.1",
111112
"masterminds/html5": "^2.6",
112113
"monolog/monolog": "^1.25.1",
113114
"nyholm/psr7": "^1.0",

src/Symfony/Component/HttpClient/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ CHANGELOG
55
-----
66

77
* added `StreamWrapper`
8-
* added `HttplugClient`
8+
* added `HttplugClient` with support for sync and async requests
99
* added `max_duration` option
1010
* added support for NTLM authentication
1111
* added `$response->toStream()` to cast responses to regular PHP streams

src/Symfony/Component/HttpClient/HttplugClient.php

Lines changed: 221 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,74 +11,205 @@
1111

1212
namespace Symfony\Component\HttpClient;
1313

14+
use GuzzleHttp\Promise\Promise as GuzzlePromise;
1415
use Http\Client\Exception\NetworkException;
1516
use Http\Client\Exception\RequestException;
16-
use Http\Client\HttpClient;
17+
use Http\Client\HttpAsyncClient;
18+
use Http\Client\HttpClient as HttplugInterface;
1719
use Http\Message\RequestFactory;
1820
use Http\Message\StreamFactory;
1921
use Http\Message\UriFactory;
20-
use Psr\Http\Client\ClientInterface;
21-
use Psr\Http\Client\NetworkExceptionInterface;
22-
use Psr\Http\Client\RequestExceptionInterface;
22+
use Http\Promise\Promise;
23+
use Http\Promise\RejectedPromise;
24+
use Nyholm\Psr7\Factory\Psr17Factory;
25+
use Nyholm\Psr7\Request;
26+
use Nyholm\Psr7\Uri;
27+
use Psr\Http\Message\RequestFactoryInterface;
2328
use Psr\Http\Message\RequestInterface;
2429
use Psr\Http\Message\ResponseFactoryInterface;
25-
use Psr\Http\Message\ResponseInterface;
30+
use Psr\Http\Message\ResponseInterface as Psr7Respon 1E80 seInterface;
2631
use Psr\Http\Message\StreamFactoryInterface;
2732
use Psr\Http\Message\StreamInterface;
33+
use Psr\Http\Message\UriFactoryInterface;
2834
use Psr\Http\Message\UriInterface;
35+
use Symfony\Component\HttpClient\Response\HttplugPromise;
36+
use Symfony\Component\HttpClient\Response\ResponseTrait;
37+
use Symfony\Component\HttpClient\Response\StreamWrapper;
38+
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
2939
use Symfony\Contracts\HttpClient\HttpClientInterface;
40+
use Symfony\Contracts\HttpClient\ResponseInterface;
3041

31-
if (!interface_exists(HttpClient::class)) {
42+
if (!interface_exists(HttplugInterface::class)) {
3243
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/httplug" package is not installed. Try running "composer require php-http/httplug".');
3344
}
3445

35-
if (!interface_exists(ClientInterface::class)) {
36-
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "psr/http-client" package is not installed. Try running "composer require psr/http-client".');
37-
}
38-
3946
if (!interface_exists(RequestFactory::class)) {
4047
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/message-factory" package is not installed. Try running "composer require nyholm/psr7".');
4148
}
4249

4350
/**
4451
* An adapter to turn a Symfony HttpClientInterface into an Httplug client.
4552
*
46-
* Run "composer require psr/http-client" to install the base ClientInterface. Run
47-
* "composer require nyholm/psr7" to install an efficient implementation of response
53+
* Run "composer require nyholm/psr7" to install an efficient implementation of response
4854
* and stream factories with flex-provided autowiring aliases.
4955
*
5056
* @author Nicolas Grekas <p@tchwork.com>
5157
*/
52-
final class HttplugClient implements HttpClient, RequestFactory, StreamFactory, UriFactory
58+
final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestFactory, StreamFactory, UriFactory
5359
{
5460
private $client;
61+
private $responseFactory;
62+
private $streamFactory;
63+
private $promisePool = [];
64+
private $pendingResponse;
5565

5666
public function __construct(HttpClientInterface $client = null, ResponseFactoryInterface $responseFactory = null, StreamFactoryInterface $streamFactory = null)
5767
{
58-
$this->client = new Psr18Client($client, $responseFactory, $streamFactory);
68+
$this->client = $client ?? HttpClient::create();
69+
$this->responseFactory = $responseFactory;
70+
$this->streamFactory = $streamFactory ?? ($responseFactory instanceof StreamFactoryInterface ? $responseFactory : null);
71+
$this->promisePool = new \SplObjectStorage();
72+
73+
if (null !== $this->responseFactory && null !== $this->streamFactory) {
74+
return;
75+
}
76+
77+
if (!class_exists(Psr17Factory::class)) {
78+
throw new \LogicException('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7".');
79+
}
80+
81+
$psr17Factory = new Psr17Factory();
82+
$this->responseFactory = $this->responseFactory ?? $psr17Factory;
83+
$this->streamFactory = $this->streamFactory ?? $psr17Factory;
5984
}
6085

6186
/**
6287
* {@inheritdoc}
6388
*/
64-
public function sendRequest(RequestInterface $request): ResponseInterface
89+
public function sendRequest(RequestInterface $request): Psr7ResponseInterface
6590
{
6691
try {
67-
return $this->client->sendRequest($request);
68-
} catch (RequestExceptionInterface $e) {
69-
throw new RequestException($e->getMessage(), $request, $e);
70-
} catch (NetworkExceptionInterface $e) {
92+
return $this->createPsr7Response($this->sendPsr7Request($request));
93+
} catch (TransportExceptionInterface $e) {
7194
throw new NetworkException($e->getMessage(), $request, $e);
7295
}
7396
}
7497

98+
/**
99+
* {@inheritdoc}
100+
*
101+
* @return HttplugPromise
102+
*/
103+
public function sendAsyncRequest(RequestInterface $request): Promise
104+
{
105+
if (!class_exists(GuzzlePromise::class)) {
106+
throw new \LogicException(sprintf('You cannot use "%s()" as the "guzzlehttp/promises" package is not installed. Try running "composer require guzzlehttp/promises".', __METHOD__));
107+
}
108+
109+
try {
110+
$response = $this->sendPsr7Request($request, true);
111+
} catch (NetworkException $e) {
112+
return new RejectedPromise($e);
113+
}
114+
115+
$cancel = function () use ($response) {
116+
$response->cancel();
117+
unset($this->promisePool[$response]);
118+
};
119+
120+
$promise = new GuzzlePromise(function () use ($response) {
121+
$this->pendingResponse = $response;
122+
$this->wait();
123+
}, $cancel);
124+
125+
$this->promisePool[$response] = [$request, $promise];
126+
127+
return new HttplugPromise($promise, $cancel);
128+
}
129+
130+
/**
131+
* Resolve pending promises that complete before the timeouts are reached.
132+
*
133+
* When $maxDuration is null and $idleTimeout is reached, promises are rejected.
134+
*
135+
* @return int The number of remaining pending promises
136+
*/
137+
public function wait(float $maxDuration = null, float $idleTimeout = null): int
138+
{
139+
$pendingResponse = $this->pendingResponse;
140+
$this->pendingResponse = null;
141+
142+
if (null !== $maxDuration) {
143+
$startTime = microtime(true);
144+
$idleTimeout = max(0.0, min($maxDuration / 5, $idleTimeout ?? $maxDuration));
145+
$remainingDuration = $maxDuration;
146+
}
147+
148+
do {
149+
foreach ($this->client->stream($this->promisePool, $idleTimeout) as $response => $chunk) {
150+
try {
151+
if (null !== $maxDuration && $chunk->isTimeout()) {
152+
goto check_duration;
153+
}
154+
155+
if ($chunk->isFirst()) {
156+
// Deactivate throwing on 3/4/5xx
157+
$response->getStatusCode();
158+
}
159+
160+
if (!$chunk->isLast()) {
161+
goto check_duration;
162+
}
163+
164+
if ([$request, $promise] = $this->promisePool[$response] ?? null) {
165+
unset($this->promisePool[$response]);
166+
$promise->resolve($this->createPsr7Response($response, true));
167+
}
168+
} catch (\Exception $e) {
169+
if ([$request, $promise] = $this->promisePool[$response] ?? null) {
170+
unset($this->promisePool[$response]);
171+
172+
if ($e instanceof TransportExceptionInterface) {
173+
$e = new NetworkException($e->getMessage(), $request, $e);
174+
}
175+
176+
$promise->reject($e);
177+
}
178+
}
179+
180+
if ($pendingResponse === $response) {
181+
return \count($this->promisePool);
182+
}
183+
184+
check_duration:
185+
if (null !== $maxDuration && $idleTimeout && $idleTimeout > $remainingDuration = max(0.0, $maxDuration - microtime(true) + $startTime)) {
186+
$idleTimeout = $remainingDuration / 5;
187+
break;
188+
}
189+
}
190+
191+
if (!$count = \count($this->promisePool)) {
192+
return 0;
193+
}
194+
} while (null !== $maxDuration && 0 < $remainingDuration);
195+
196+
return $count;
197+
}
198+
75199
/**
76200
* {@inheritdoc}
77201
*/
78202
public function createRequest($method, $uri, array $headers = [], $body = null, $protocolVersion = '1.1'): RequestInterface
79203
{
80-
$request = $this->client
81-
->createRequest($method, $uri)
204+
if ($this->responseFactory instanceof RequestFactoryInterface) {
205+
$request = $this->responseFactory->createRequest($method, $uri);
206+
} elseif (!class_exists(Request::class)) {
207+
throw new \LogicException(sprintf('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7".', __METHOD__));
208+
} else {
209+
$request = new Request($method, $uri);
210+
}
211+
212+
$request = $request
82213
->withProtocolVersion($protocolVersion)
83214
->withBody($this->createStream($body))
84215
;
@@ -100,27 +231,84 @@ public function createStream($body = null): StreamInterface
100231
}
101232

102233
if (\is_string($body ?? '')) {
103-
$body = $this->client->createStream($body ?? '');
104-
105-
if ($body->isSeekable()) {
106-
$body->seek(0);
107-
}
108-
109-
return $body;
234+
$stream = $this->streamFactory->createStream($body ?? '');
235+
} elseif (\is_resource($body)) {
236+
$stream = $this->streamFactory->createStreamFromResource($body);
237+
} else {
238+
throw new \InvalidArgumentException(sprintf('%s() expects string, resource or StreamInterface, %s given.', __METHOD__, \gettype($body)));
110239
}
111240

112-
if (\is_resource($body)) {
113-
return $this->client->createStreamFromResource($body);
241+
if ($stream->isSeekable()) {
242+
$stream->seek(0);
114243
}
115244

116-
throw new \InvalidArgumentException(sprintf('%s() expects string, resource or StreamInterface, %s given.', __METHOD__, \gettype($body)));
245+
return $stream;
117246
}
118247

119248
/**
120249
* {@inheritdoc}
121250
*/
122-
public function createUri($uri = ''): UriInterface
251+
public function createUri($uri): UriInterface
123252
{
124-
return $uri instanceof UriInterface ? $uri : $this->client->createUri($uri);
253+
if ($uri instanceof UriInterface) {
254+
return $uri;
255+
}
256+
257+
if ($this->responseFactory instanceof UriFactoryInterface) {
258+
return $this->responseFactory->createUri($uri);
259+
}
260+
261+
if (!class_exists(Uri::class)) {
262+
throw new \LogicException(sprintf('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7".', __METHOD__));
263+
}
264+
265+
return new Uri($uri);
266+
}
267+
268+
private function sendPsr7Request(RequestInterface $request, bool $buffer = null): ResponseInterface
269+
{
270+
try {
271+
$body = $request->getBody();
272+
273+
if ($body->isSeekable()) {
274+
$body->seek(0);
275+
}
276+
277+
return $this->client->request($request->getMethod(), (string) $request->getUri(), [
278+
'headers' => $request->getHeaders(),
279+
'body' => $body->getContents(),
280+
'http_version' => '1.0' === $request->getProtocolVersion() ? '1.0' : null,
281+
'buffer' => $buffer,
282+
]);
283+
} catch (\InvalidArgumentException $e) {
284+
throw new RequestException($e->getMessage(), $request, $e);
285+
} catch (TransportExceptionInterface $e) {
286+
throw new NetworkException($e->getMessage(), $request, $e);
287+
}
288+
}
289+
290+
private function createPsr7Response(ResponseInterface $response, bool $buffer = false): Psr7ResponseInterface
291+
{
292+
$psrResponse = $this->responseFactory->createResponse($response->getStatusCode());
293+
294+
foreach ($response->getHeaders(false) as $name => $values) {
295+
foreach ($values as $value) {
296+
$psrResponse = $psrResponse->withAddedHeader($name, $value);
297+
}
298+
}
299+
300+
if (isset(class_uses($response)[ResponseTrait::class])) {
301+
$body = $this->streamFactory->createStreamFromResource($response->toStream(false));
302+
} elseif (!$buffer) {
303+
$body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client));
304+
} else {
305+
$body = $this->streamFactory->createStream($response->getContent(false));
306+
}
307+
308+
if ($body->isSeekable()) {
309+
$body->seek(0);
310+
}
311+
312+
return $psrResponse->withBody($body);
125313
}
126314
}

0 commit comments

Comments
 (0)
0