8000 feature #37136 [HttpClient] added support for pausing responses with … · symfony/symfony@c459c80 · GitHub
[go: up one dir, main page]

Skip to content

Commit c459c80

Browse files
committed
feature #37136 [HttpClient] added support for pausing responses with a new pause_handler callable exposed as an info item (nicolas-grekas)
This PR was merged into the 5.2-dev branch. Discussion ---------- [HttpClient] added support for pausing responses with a new `pause_handler` callable exposed as an info item | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | - | License | MIT | Doc PR | - This code sample will delay sending the request by 2 seconds: ```php $response = $client->request('GET', 'https://symfony.com/'); $response->getInfo('pause_handler')(2); ``` Unlike "competing" HTTP clients written in PHP, this one works while streaming a request/response. This means this PR allows implementing delays before retries but it also enables throttling the streams while still maintaining async/multiplexing. Returning the handler as an info item saves adding a new method and thus plays well with decorators, without requiring a new dedicated interface. While this can be used directly, the target use case is within an async-decorator, by using [the `AsyncContext::pause()` method](https://github.com/symfony/symfony/pull/36779/files#diff-1d1f61631f4f5e84634e7c3dac6f208cR89). As a bonus, this PR improves `NativeHttpClient` by making it able to count the maximum number of open connections *per-host*. Commits ------- f3cc7c1 [HttpClient] added support for pausing responses with a new `pause_handler` callable exposed as an info item
2 parents d75ef18 + f3cc7c1 commit c459c80

File tree

8 files changed

+163
-22
lines changed

8 files changed

+163
-22
lines changed

src/Symfony/Component/HttpClient/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
-----
66

77
* added `AsyncDecoratorTrait` to ease processing responses without breaking async
8+
* added support for pausing responses with a new `pause_handler` callable exposed as an info item
89

910
5.1.0
1011
-----

src/Symfony/Component/HttpClient/Internal/CurlClientState.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ final class CurlClientState extends ClientState
2626
public $pushedResponses = [];
2727
/** @var DnsCache */
2828
public $dnsCache;
29+
/** @var float[] */
30+
public $pauseExpiries = [];
31+
public $execCounter = PHP_INT_MIN;
2932

3033
public function __construct()
3134
{

src/Symfony/Component/HttpClient/Internal/NativeClientState.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ final class NativeClientState extends ClientState
3030
public $dnsCache = [];
3131
/** @var bool */
3232
public $sleep = false;
33+
/** @var int[] */
34+
public $hosts = [];
3335

3436
public function __construct()
3537
{

src/Symfony/Component/HttpClient/Response/AmpResponse.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,28 @@ public function __construct(AmpClientState $multi, Request $request, array $opti
9292
return self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger);
9393
});
9494

95+
$info['pause_handler'] = static function (float $duration) use ($id, &$delay) {
96+
if (null !== $delay) {
97+
Loop::cancel($delay);
98+
$delay = null;
99+
}
100+
101+
if (0 < $duration) {
102+
$duration += microtime(true);
103+
Loop::disable($id);
104+
$delay = Loop::defer(static function () use ($duration, $id, &$delay) {
105+
if (0 < $duration -= microtime(true)) {
106+
$delay = Loop::delay(ceil(1000 * $duration), static function () use ($id) { Loop::enable($id); });
107+
} else {
108+
$delay = null;
109+
Loop::enable($id);
110+
}
111+
});
112+
} else {
113+
Loop::enable($id);
114+
}
115+
};
116+
95117
$multi->openHandles[$id] = $id;
96118
++$multi->responseCount;
97119
}

src/Symfony/Component/HttpClient/Response/CurlResponse.php

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,26 @@ public function __construct(CurlClientState $multi, $ch, array $options = null,
8989
return;
9090
}
9191

92+
$execCounter = $multi->execCounter;
93+
$this->info['pause_handler'] = static function (float $duration) use ($ch, $multi, $execCounter) {
94+
if (0 < $duration) {
95+
if ($execCounter === $multi->execCounter) {
96+
$multi->execCounter = !\is_float($execCounter) ? 1 + $execCounter : PHP_INT_MIN;
97+
curl_multi_exec($multi->handle, $execCounter);
98+
}
99+
100+
$lastExpiry = end($multi->pauseExpiries);
101+
$multi->pauseExpiries[(int) $ch] = $duration += microtime(true);
102+
if (false !== $lastExpiry && $lastExpiry > $duration) {
103+
asort($multi->pauseExpiries);
104+
}
105+
curl_pause($ch, CURLPAUSE_ALL);
106+
} else {
107+
unset($multi->pauseExpiries[(int) $ch]);
108+
curl_pause($ch, CURLPAUSE_CONT);
109+
}
110+
};
111+
92112
$this->inflate = !isset($options['normalized_headers']['accept-encoding']);
93113
curl_pause($ch, CURLPAUSE_CONT);
94114

@@ -206,7 +226,7 @@ public function __destruct()
206226
private function close(): void
207227
{
208228
$this->inflate = null;
209-
unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
229+
unset($this->multi->pauseExpiries[$this->id], $this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
210230
curl_setopt($this->handle, CURLOPT_PRIVATE, '_0');
211231

212232
if (self::$performing) {
@@ -261,6 +281,7 @@ private static function perform(ClientState $multi, array &$responses = null): v
261281

262282
try {
263283
self::$performing = true;
284+
++$multi->execCounter;
264 F438 285
$active = 0;
265286
while (CURLM_CALL_MULTI_PERFORM === curl_multi_exec($multi->handle, $active));
266287

@@ -303,7 +324,29 @@ private static function select(ClientState $multi, float $timeout): int
303324
$timeout = min($timeout, 0.01);
304325
}
305326

306-
return curl_multi_select($multi->handle, $timeout);
327+
if ($multi->pauseExpiries) {
328+
$now = microtime(true);
329+
330+
foreach ($multi->pauseExpiries as $id => $pauseExpiry) {
331+
if ($now < $pauseExpiry) {
332+
$timeout = min($timeout, $pauseExpiry - $now);
333+
break;
334+
}
335+
336+
unset($multi->pauseExpiries[$id]);
337+
curl_pause($multi->openHandles[$id][0], CURLPAUSE_CONT);
338+
}
339+
}
340+
341+
if (0 !== $selected = curl_multi_select($multi->handle, $timeout)) {
342+
return $selected;
343+
}
344+
345+
if ($multi->pauseExpiries && 0 < $timeout -= microtime(true) - $now) {
346+
usleep(1E6 * $timeout);
347+
}
348+
349+
return 0;
307350
}
308351

309352
/**

src/Symfony/Component/HttpClient/Response/NativeResponse.php

Lines changed: 62 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ final class NativeResponse implements ResponseInterface
3838
private $multi;
3939
private $debugBuffer;
4040
private $shouldBuffer;
41+
private $pauseExpiry = 0;
4142

4243
/**
4344
* @internal
@@ -65,6 +66,11 @@ public function __construct(NativeClientState $multi, $context, string $url, arr
6566
$this->initializer = static function (self $response) {
6667
return null === $response->remaining;
6768
};
69+
70+
$pauseExpiry = &$this->pauseExpiry;
71+
$info['pause_handler'] = static function (float $duration) use (&$pauseExpiry) {
72+
$pauseExpiry = 0 < $duration ? microtime(true) + $duration : 0;
73+
};
6874
}
6975

7076
/**
@@ -184,14 +190,19 @@ private function open(): void
184190
return;
185191
}
186192

187-
$this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info];
193+
$host = parse_url($this->info['redirect_url'] ?? $this->url, PHP_URL_HOST);
194+
$this->multi->openHandles[$this->id] = [&$this->pauseExpiry, $h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info, $host];
195+
$this->multi->hosts[$host] = 1 + ($this->multi->hosts[$host] ?? 0);
188196
}
189197

190198
/**
191199
* {@inheritdoc}
192200
*/
193201
private function close(): void
194202
{
203+
if (null !== ($host = $this->multi->openHandles[$this->id][6] ?? null) && 0 >= --$this->multi->hosts[$host]) {
204+
unset($this->multi->hosts[$host]);
205+
}
195206
unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
196207
$this->handle = $this->buffer = $this->inflate = $this->onProgress = null;
197208
}
@@ -221,10 +232,18 @@ private static function schedule(self $response, array &$runningResponses): void
221232
*/
222233
private static function perform(ClientState $multi, array &$responses = null): void
223234
{
224-
foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) {
235+
foreach ($multi->openHandles as $i => [$pauseExpiry, $h, $buffer, $onProgress]) {
236+
if ($pauseExpiry) {
237+
if (microtime(true) < $pauseExpiry) {
238+
continue;
239+
}
240+
241+
$multi->openHandles[$i][0] = 0;
242+
}
243+
225244
$hasActivity = false;
226-
$remaining = &$multi->openHandles[$i][3];
227-
$info = &$multi->openHandles[$i][4];
245+
$remaining = &$multi->openHandles[$i][4];
246+
$info = &$multi->openHandles[$i][5];
228247
$e = null;
229248

230249
// Read incoming buffer and write it to the dechunk one
@@ -285,6 +304,9 @@ private static function perform(ClientState $multi, array &$responses = null): v
285304

286305
$multi->handlesActivity[$i][] = null;
287306
$multi->handlesActivity[$i][] = $e;
307+
if (null !== ($host = $multi->openHandles[$i][6] ?? null) && 0 >= --$multi->hosts[$host]) {
308+
unset($multi->hosts[$host]);
309+
}
288310
unset($multi->openHandles[$i]);
289311
$multi->sleep = false;
290312
}
@@ -294,25 +316,22 @@ private static function perform(ClientState $multi, array &$responses = null): v
294316
return;
295317
}
296318

297-
// Create empty activity lists to tell ResponseTrait::stream() we still have pending requests
319+
$maxHosts = $multi->maxHostConnections;
320+
298321
foreach ($responses as $i => $response) {
299-
if (null === $response->remaining && null !== $response->buffer) {
300-
$multi->handlesActivity[$i] = [];
322+
if (null !== $response->remaining || null === $response->buffer) {
323+
continue;
301324
}
302-
}
303-
304-
if (\count($multi->openHandles) >= $multi->maxHostConnections) {
305-
return;
306-
}
307325

308-
// Open the next pending request - this is a blocking operation so we do only one of them
309-
foreach ($responses as $i => $response) {
310-
if (null === $response->remaining && null !== $response->buffer) {
326+
if ($response->pauseExpiry && microtime(true) < $response->pauseExpiry) {
327+
// Create empty open handles to tell we still have pending requests
328+
$multi->openHandles[$i] = [INF, null, null, null];
329+
} elseif ($maxHosts && $maxHosts > ($multi->hosts[parse_url($response->url, PHP_URL_HOST)] ?? 0)) {
330+
// Open the next pending request - this is a blocking operation so we do only one of them
311331
$response->open();
312332
$multi->sleep = false;
313333
self::perform($multi);
314-
315-
break;
334+
$maxHosts = 0;
316335
}
317336
}
318337
}
@@ -324,9 +343,32 @@ private static function perform(ClientState $multi, array &$responses = null): v
324343
*/
325344
private static function select(ClientState $multi, float $timeout): int
326345
{
327-
$_ = [];
328-
$handles = array_column($multi->openHandles, 0);
346+
if (!$multi->sleep = !$multi->sleep) {
347+
return -1;
348+
}
349+
350+
$_ = $handles = [];
351+
$now = null;
352+
353+
foreach ($multi->openHandles as [$pauseExpiry, $h]) {
354+
if (null === $h) {
355+
continue;
356+
}
357+
358+
if ($pauseExpiry && ($now ?? $now = microtime(true)) < $pauseExpiry) {
359+
$timeout = min($timeout, $pauseExpiry - $now);
360+
continue;
361+
}
362+
363+
$handles[] = $h;
364+
}
365+
366+
if (!$handles) {
367+
usleep(1E6 * $timeout);
368+
369+
return 0;
370+
}
329371

330-
return (!$multi->sleep = !$multi->sleep) ? -1 : stream_select($handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout)));
372+
return stream_select($handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout)));
331373
}
332374
}

src/Symfony/Component/HttpClient/Tests/HttpClientTestCase.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,30 @@ public function testHttp2PushVulcain()
176176
$this->assertSame($expected, $logger->logs);
177177
}
178178

179+
public function testPause()
180+
{
181+
$client = $this->getHttpClient(__FUNCTION__);
182+
$response = $client->request('GET', 'http://localhost:8057/');
183+
184+
$time = microtime(true);
185+
$response->getInfo('pause_handler')(0.5);
186+
$this->assertSame(200, $response->getStatusCode());
187+
$this->assertTrue(0.5 <= microtime(true) - $time);
188+
189+
$response = $client->request('GET', 'http://localhost:8057/');
190+
191+
$time = microtime(true);
192+
$response->getInfo('pause_handler')(1);
193+
194+
foreach ($client->stream($response, 0.5) as $chunk) {
195+
$this->assertTrue($chunk->isTimeout());
196+
$response->cancel();
197+
}
198+
$response = null;
199+
$this->assertTrue(1.0 > microtime(true) - $time);
200+
$this->assertTrue(0.5 <= microtime(true) - $time);
201+
}
202+
179203
public function testHttp2PushVulcainWithUnusedResponse()
180204
{
181205
$client = $this->getHttpClient(__FUNCTION__);

src/Symfony/Component/HttpClient/Tests/MockHttpClientTest.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,10 @@ protected function getHttpClient(string $testCase): HttpClientInterface
198198
$this->markTestSkipped("MockHttpClient doesn't timeout on destruct");
199199
break;
200200

201+
case 'testPause':
202+
$this->markTestSkipped("MockHttpClient doesn't support pauses by default");
203+
break;
204+
201205
case 'testGetRequest':
202206
array_unshift($headers, 'HTTP/1.1 200 OK');
203207
$responses[] = new MockResponse($body, ['response_headers' => $headers]);

0 commit comments

Comments
 (0)
0