8000 [HttpClient] Async HTTPlug client · symfony/symfony@31449b9 · GitHub
[go: up one dir, main page]

Skip to content

Commit 31449b9

Browse files
Nyholmnicolas-grekas
authored andcommitted
[HttpClient] Async HTTPlug client
1 parent 732c034 commit 31449b9

File tree

6 files changed

+311
-34
lines changed

6 files changed

+311
-34
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
"doctrine/orm": "~2.4,>=2.4.5",
109109
"doctrine/reflection": "~1.0",
110110
"doctrine/doctrine-bundle": "~1.4",
111+
"guzzlehttp/promises": "^1.3.1",
111112
"masterminds/html5": "^2.6",
112113
"monolog/monolog": "^1.25.1",
113114
"nyholm/psr7": "^1.0",

src/Symfony/Component/HttpClient/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ CHANGELOG
55
-----
66

77
* added `StreamWrapper`
8-
* added `HttplugClient`
8+
* added `HttplugClient` with support for sync and async requests
99
* added `max_duration` option
1010
* added support for NTLM authentication
1111
* added `$response->toStream()` to cast responses to regular PHP streams

src/Symfony/Component/HttpClient/HttplugClient.php

Lines changed: 175 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,74 +11,159 @@
1111

1212
namespace Symfony\Component\HttpClient;
1313

14+
use GuzzleHttp\Promise\Promise as GuzzlePromise;
1415
use Http\Client\Exception\NetworkException;
1516
use Http\Client\Exception\RequestException;
16-
use Http\Client\HttpClient;
17+
use Http\Client\HttpAsyncClient;
18+
use Http\Client\HttpClient as HttplugInterface;
1719
use Http\Message\RequestFactory;
1820
use Http\Message\StreamFactory;
1921
use Http\Message\UriFactory;
20-
use Psr\Http\Client\ClientInterface;
21-
use Psr\Http\Client\NetworkExceptionInterface;
22-
use Psr\Http\Client\RequestExceptionInterface;
22+
use Http\Promise\Promise;
23+
use Http\Promise\RejectedPromise;
24+
use Nyholm\Psr7\Factory\Psr17Factory;
25+
use Nyholm\Psr7\Request;
26+
use Nyholm\Psr7\Uri;
27+
use Psr\Http\Message\RequestFactoryInterface;
2328
use Psr\Http\Message\RequestInterface;
2429
use Psr\Http\Message\ResponseFactoryInterface;
25-
use Psr\Http\Message\ResponseInterface;
30+
use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface;
2631
use Psr\Http\Message\StreamFactoryInterface;
2732
use Psr\Http\Message\StreamInterface;
33+
use Psr\Http\Message\UriFactoryInterface;
2834
use Psr\Http\Message\UriInterface;
35+
use Symfony\Component\HttpClient\Response\HttplugPromise;
36+
use Symfony\Component\HttpClient\Response\ResponseTrait;
37+
use Symfony\Component\HttpClient\Response\StreamWrapper;
38+
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
2939
use Symfony\Contracts\HttpClient\HttpClientInterface;
40+
use Symfony\Contracts\HttpClient\ResponseInterface;
3041

31-
if (!interface_exists(HttpClient::class)) {
42+
if (!interface_exists(HttplugInterface::class)) {
3243
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/httplug" package is not installed. Try running "composer require php-http/httplug".');
3344
}
3445

35-
if (!interface_exists(ClientInterface::class)) {
36-
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "psr/http-client" package is not installed. Try running "composer require psr/http-client".');
37-
}
38-
3946
if (!interface_exists(RequestFactory::class)) {
4047
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/message-factory" package is not installed. Try running "composer require nyholm/psr7".');
4148
}
4249

4350
/**
4451
* An adapter to turn a Symfony HttpClientInterface into an Httplug client.
4552
*
46-
* Run "composer require psr/http-client" to install the base ClientInterface. Run
47-
* "composer require nyholm/psr7" to install an efficient implementation of response
53+
* Run "composer require nyholm/psr7" to install an efficient implementation of response
4854
* and stream factories with flex-provided autowiring aliases.
4955
*
5056
* @author Nicolas Grekas <p@tchwork.com>
5157
*/
52-
final class HttplugClient implements HttpClient, RequestFactory, StreamFactory, UriFactory
58+
final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestFactory, StreamFactory, UriFactory
5359
{
5460
private $client;
61+
private $responseFactory;
62+
private $streamFactory;
63+
private $promisePool = [];
5564

5665
public function __construct(HttpClientInterface $client = null, ResponseFactoryInterface $responseFactory = null, StreamFactoryInterface $streamFactory = null)
5766
{
58-
$this->client = new Psr18Client($client, $responseFactory, $streamFactory);
67+
$this->client = $client ?? HttpClient::create();
68+
$this->responseFactory = $responseFactory;
69+
$this->streamFactory = $streamFactory ?? ($responseFactory instanceof StreamFactoryInterface ? $responseFactory : null);
70+
$this->promisePool = new \SplObjectStorage();
71+
72+
if (null !== $this->responseFactory && null !== $this->streamFactory) {
73+
return;
74+
}
75+
76+
if (!class_exists(Psr17Factory::class)) {
77+
throw new \LogicException('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7".');
78+
}
79+
80+
$psr17Factory = new Psr17Factory();
81+
$this->responseFactory = $this->responseFactory ?? $psr17Factory;
82+
$this->streamFactory = $this->streamFactory ?? $psr17Factory;
5983
}
6084

6185
/**
6286
* {@inheritdoc}
6387
*/
64-
public function sendRequest(RequestInterface $request): ResponseInterface
88+
public function sendRequest(RequestInterface $request): Psr7ResponseInterface
6589
{
6690
try {
67-
return $this->client->sendRequest($request);
68-
} catch (RequestExceptionInterface $e) {
69-
throw new RequestException($e->getMessage(), $request, $e);
70-
} catch (NetworkExceptionInterface $e) {
91+
return $this->createPsr7Response($this->sendPsr7Request($request));
92+
} catch (TransportExceptionInterface $e) {
7193
throw new NetworkException($e->getMessage(), $request, $e);
7294
}
7395
}
7496

97+
/**
98+
* {@inheritdoc}
99+
*
100+
* @return HttplugPromise
101+
*/
102+
public function sendAsyncRequest(RequestInterface $request): Promise
103+
{
104+
if (!class_exists(GuzzlePromise::class)) {
105+
throw new \LogicException('You cannot use "%s()" as the "guzzlehttp/promises" package is not installed. Try running "composer require guzzlehttp/promises".');
106+
}
107+
108+
try {
109+
$response = $this->sendPsr7Request($request, true);
110+
} catch (NetworkException $e) {
111+
return new RejectedPromise($e);
112+
}
113+
114+
$cancel = function () use ($response) {
115+
$response->cancel();
116+
unset($this->promisePool[$response]);
117+
};
118+
119+
return new HttplugPromise($this->promisePool[$response] = new GuzzlePromise(function () use ($request, $response) {
120+
foreach ($this->client->stream($this->promisePool) as $resp => $chunk) {
121+
try {
122+
if ($chunk->isFirst()) {
123+
// Deactivate throwing on 3/4/5xx
124+
$resp->getStatusCode();
125+
}
126+
127+
if (!$chunk->isLast()) {
128+
continue;
129+
}
130+
131+
if ($promise = $this->promisePool[$resp] ?? null) {
132+
unset($this->promisePool[$resp]);
133+
$promise->resolve($this->createPsr7Response($resp, true));
134+
}
135+
} catch (\Exception $e) {
136+
if ($e instanceof TransportExceptionInterface) {
137+
$e = new NetworkException($e->getMessage(), $request, $e);
138+
}
139+
140+
if ($promise = $this->promisePool[$resp]) {
141+
unset($this->promisePool[$resp]);
142+
$promise->reject($e);
143+
}
144+
}
145+
146+
if ($resp === $response) {
147+
break;
148+
}
149+
}
150+
}, $cancel), $cancel);
151+
}
152+
75153
/**
76154
* {@inheritdoc}
77155
*/
78156
public function createRequest($method, $uri, array $headers = [], $body = null, $protocolVersion = '1.1'): RequestInterface
79157
{
80-
$request = $this->client
81-
->createRequest($method, $uri)
158+
if ($this->responseFactory instanceof RequestFactoryInterface) {
159+
$request = $this->responseFactory->createRequest($method, $uri);
160+
} elseif (!class_exists(Request::class)) {
161+
throw new \LogicException(sprintf('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7".', __METHOD__));
162+
} else {
163+
$request = new Request($method, $uri);
164+
}
165+
166+
$request = $request
82167
->withProtocolVersion($protocolVersion)
83168
->withBody($this->createStream($body))
84169
;
@@ -100,27 +185,84 @@ public function createStream($body = null): StreamInterface
100185
}
101186

102187
if (\is_string($body ?? '')) {
103-
$body = $this->client->createStream($body ?? '');
104-
105-
if ($body->isSeekable()) {
106-
$body->seek(0);
107-
}
108-
109-
return $body;
188+
$stream = $this->streamFactory->createStream($body ?? '');
189+
} elseif (\is_resource($body)) {
190+
$stream = $this->streamFactory->createStreamFromResource($body);
191+
} else {
192+
throw new \InvalidArgumentException(sprintf('%s() expects string, resource or StreamInterface, %s given.', __METHOD__, \gettype($body)));
110193
}
111194

112-
if (\is_resource($body)) {
113-
return $this->client->createStreamFromResource($body);
195+
if ($stream->isSeekable()) {
196+
$stream->seek(0);
114197
}
115198

116-
throw new \InvalidArgumentException(sprintf('%s() expects string, resource or StreamInterface, %s given.', __METHOD__, \gettype($body)));
199+
return $stream;
117200
}
118201

119202
/**
120203
* {@inheritdoc}
121204
*/
122-
public function createUri($uri = ''): UriInterface
205+
public function createUri($uri): UriInterface
206+
{
207+
if ($uri instanceof UriInterface) {
208+
return $uri;
209+
}
210+
211+
if ($this->responseFactory instanceof UriFactoryInterface) {
212+
return $this->responseFactory->createUri($uri);
213+
}
214+
215+
if (!class_exists(Uri::class)) {
216+
throw new \LogicException(sprintf('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7".', __METHOD__));
217+
}
218+
219+
return new Uri($uri);
220+
}
221+
222+
public function sendPsr7Request(RequestInterface $request, bool $buffer = null): ResponseInterface
123223
{
124-
return $uri instanceof UriInterface ? $uri : $this->client->createUri($uri);
224+
try {
225+
$body = $request->getBody();
226+
227+
if ($body->isSeekable()) {
228+
$body->seek(0);
229+
}
230+
231+
return $this->client->request($request->getMethod(), (string) $request->getUri(), [
232+
'headers' => $request->getHeaders(),
233+
'body' => $body->getContents(),
234+
'http_version' => '1.0' === $request->getProtocolVersion() ? '1.0' : null,
235+
'buffer' => $buffer,
236+
]);
237+
} catch (\InvalidArgumentException $e) {
238+
throw new RequestException($e->getMessage(), $request, $e);
239+
} catch (TransportExceptionInterface $e) {
240+
throw new NetworkException($e->getMessage(), $request, $e);
241+
}
242+
}
243+
244+
private function createPsr7Response(ResponseInterface $response, bool $buffer = false): Psr7ResponseInterface
245+
{
246+
$psrResponse = $this->responseFactory->createResponse($response->getStatusCode());
247+
248+
foreach ($response->getHeaders(false) as $name => $values) {
249+
foreach ($values as $value) {
250+
$psrResponse = $psrResponse->withAddedHeader($name, $value);
251+
}
252+
}
253+
254+
if (isset(class_uses($response)[ResponseTrait::class])) {
255+
$body = $this->streamFactory->createStreamFromResource($response->toStream(false));
256+
} elseif (!$buffer) {
257+
$body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client));
258+
} else {
259+
$body = $this->streamFactory->createStream($response->getContent(false));
260+
}
261+
262+
if ($body->isSeekable()) {
263+
$body->seek(0);
264+
}
265+
266+
return $psrResponse->withBody($body);
125267
}
126268
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\HttpClient\Response;
13+
14+
use GuzzleHttp\Promise\PromiseInterface as GuzzlePromiseInterface;
15+
use Http\Promise\Promise as HttplugPromiseInterface;
16+
use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface;
17+
18+
/**
19+
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
20+
*
21+
* @internal
22+
*/
23+
final class HttplugPromise implements HttplugPromiseInterface
24+
{
25+
private $promise;
26+
private $cancel;
27+
28+
public function __construct(GuzzlePromiseInterface $promise, callable $cancel = null)
29+
{
30+
$this->promise = $promise;
31+
$this< 10000 /span>->cancel = $cancel;
32+
}
33+
34+
public function then(callable $onFulfilled = null, callable $onRejected = null): self
35+
{
36+
return new self($this->promise->then($onFulfilled, $onRejected));
37+
}
38+
39+
public function cancel(): void
40+
{
41+
$this->promise->cancel();
42+
}
43+
44+
/**
45+
* {@inheritdoc}
46+
*/
47+
public function getState(): string
48+
{
49+
return $this->promise->getState();
50+
}
51+
52+
/**
53+
* {@inheritdoc}
54+
*
55+
* @return Psr7ResponseInterface|mixed
56+
*/
57+
public function wait($unwrap = true)
58+
{
59+
return $this->promise->wait($unwrap);
60+
}
61+
62+
public function __destruct()
63+
{
64+
if ($this->cancel) {
65+
($this->cancel)();
66+
}
67+
}
68+
69+
public function __wakeup()
70+
{
71+
throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
72+
}
73+
}

0 commit comments

Comments
 (0)
0