8000 [HttpClient] add ResponseSetMonitor to trigger callbacks on response … · symfony/symfony@74c2003 · GitHub
[go: up one dir, main page]

Skip to content

Commit 74c2003

Browse files
[HttpClient] add ResponseSetMonitor to trigger callbacks on response completion
1 parent efaa154 commit 74c2003

File tree

1 file changed

+155
-0
lines changed

1 file changed

+155
-0
lines changed
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\HttpClient;
13+
14+
use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
15+
use Symfony\Contracts\HttpClient\HttpClientInterface;
16+
use Symfony\Contracts\HttpClient\ResponseInterface;
17+
18+
/**
19+
* Monitors a set of responses and triggers callbacks as they complete.
20+
*
21+
* @author Nicolas Grekas <p@tchwork.com>
22+
*/
23+
final class ResponseSetMonitor implements \Countable
24+
{
25+
private $client;
26+
private $onHeaders;
27+
private $onBody;
28+
private $onError;
29+
private $responses;
30+
31+
public function __construct(HttpClientInterface $client, callable $onHeaders = null, callable $onBody = null, callable $onError = null)
32+
{
33+
$this->client = $client;
34+
$this->onHeaders = $onHeaders;
35+
$this->onBody = $onBody;
36+
$this->onError = $onError;
37+
$this->responses = new \SplObjectStorage();
38+
}
8000 39+
40+
/**
41+
* Adds a response to the monitored set.
42+
*
43+
* The response must be created with the same client that was passed to the constructor.
44+
*/
45+
public function add(ResponseInterface $response, callable $onHeaders = null, callable $onBody = null, callable $onError = null): void
46+
{
47+
$this->responses[$response] = [$onHeaders, $onBody, $onError];
48+
$this->tick();
49+
}
50+
51+
/**
52+
* Monitors pending responses, moving them forward as network activity happens.
53+
*
54+
* @param float $timeout The maximum duration of the tick
55+
*
56+
* @return int The number of responses remaining in the set after the tick
57+
*/
58+
public function tick(float $timeout = 0.0): int
59+
{
60+
return $this->wait($timeout, false);
61+
}
62+
63+
/**
64+
* Completes all pending responses.
65+
*
66+
* @param float|null $idleTimeout The maximum inactivy timeout before erroring idle responses
67+
*/
68+
public function complete(float $idleTimeout = null): void
69+
{
70+
$this->wait($idleTimeout, true);
71+
}
72+
73+
/**
74+
* Cancels all pending responses.
75+
*/
76+
public function cancel(): void
77+
{
78+
foreach ($this->responses as $response) {
79+
$response->cancel();
80+
}
81+
82+
$this->responses = new \SplObjectStorage();
83+
}
84+
85+
/**
86+
* Returns the number of pending responses.
87+
*/
88+
public function count(): int
89+
{
90+
return \count($this->responses);
91+
}
92+
93+
public function __destruct()
94+
{
95+
$this->wait(null, true);
96+
}
97+
98+
private function wait(?float $timeout, bool $errorOnTimeout): int
99+
{
100+
$error = null;
101+
$remainingTimeout = $timeout;
102+
103+
if (!$errorOnTimeout && $remainingTimeout) {
104+
$startTime = microtime(true);
105+
}
106+
107+
foreach ($this->client->stream($this->responses, $remainingTimeout) as $response => $chunk) {
108+
try {
109+
if ($chunk->isTimeout() && !$errorOnTimeout) {
110+
continue;
111+
}
112+
113+
if (!$chunk->isFirst() && !$chunk->isLast()) {
114+
continue;
115+
}
116+
117+
[$onHeaders, $onBody] = $this->responses[$response];
118+
$onHeaders = $onHeaders ?? $this->onHeaders;
119+
$onBody = $onBody ?? $this->onBody;
120+
121+
if (null !== $onHeaders && $chunk->isFirst()) {
122+
$onHeaders($response);
123+
}
124+
125+
if (null !== $onBody && $chunk->isLast()) {
126+
$onBody($response);
127+
}
128+
129+
if (null === $onBody || $chunk->isLast()) {
130+
unset($this->responses[$response]);
131+
}
132+
} catch (ExceptionInterface $e) {
133+
[, , $onError] = $this->responses[$response];
134+
$onError = $onError ?? $this->onError;
135+
unset($this->responses[$response]);
136+
137+
if (null !== $onError) {
138+
$onError($e, $response);
139+
} else {
140+
$error = $error ?? $e;
141+
}
142+
} finally {
143+
if (!$errorOnTimeout && $remainingTimeout) {
144+
$remainingTimeout = max(0.0, $timeout - microtime(true) + $startTime);
145+
}
146+
}
147+
}
148+
149+
if (null !== $error) {
150+
throw $error;
151+
}
152+
153+
return \count($this->responses);
154+
}
155+
}

0 commit comments

Comments
 (0)
0