8000 [HttpClient] Fix content swallowed by AsyncClient initializer by jderusse · Pull Request #39228 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[HttpClient] Fix content swallowed by AsyncClient initializer #39228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Fix content swallowed by AsyncClient initializer
  • Loading branch information
jderusse authored and nicolas-grekas committed Dec 10, 2020
commit d32427169142bb55f39ba6c90ed46c8379f55bda
42 changes: 35 additions & 7 deletions src/Symfony/Component/HttpClient/Response/AsyncResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\HttpClient\Response;

use Symfony\Component\HttpClient\Chunk\ErrorChunk;
use Symfony\Component\HttpClient\Chunk\FirstChunk;
use Symfony\Component\HttpClient\Chunk\LastChunk;
use Symfony\Component\HttpClient\Exception\TransportException;
use Symfony\Contracts\HttpClient\ChunkInterface;
Expand All @@ -34,6 +35,7 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
private $response;
private $info = ['canceled' => false];
private $passthru;
private $stream;
private $lastYielded = false;

/**
Expand Down Expand Up @@ -226,6 +228,19 @@ public static function stream(iterable $responses, float $timeout = null, string

$asyncMap[$r->response] = $r;
$wrappedResponses[] = $r->response;

if ($r->stream) {
yield from self::passthruStream($response = $r->response, $r, new FirstChunk(), $asyncMap);

if (!isset($asyncMap[$response])) {
array_pop($wrappedResponses);
}

if ($r->response !== $response && !isset($asyncMap[$r->response])) {
$asyncMap[$r->response] = $r;
$wrappedResponses[] = $r->response;
}
}
}

if (!$client) {
Expand Down Expand Up @@ -286,6 +301,7 @@ public static function stream(iterable $responses, float $timeout = null, string

private static function passthru(HttpClientInterface $client, self $r, ChunkInterface $chunk, \SplObjectStorage $asyncMap = null): \Generator
{
$r->stream = null;
$response = $r->response;
$context = new AsyncContext($r->passthru, $client, $r->response, $r->info, $r->content, $r->offset);
if (null === $stream = ($r->passthru)($chunk, $context)) {
Expand All @@ -295,32 +311,39 @@ private static function passthru(HttpClientInterface $client, self $r, ChunkInte

return;
}
$chunk = null;

if (!$stream instanceof \Iterator) {
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
}
$r->stream = $stream;

yield from self::passthruStream($response, $r, null, $asyncMap);
}

private static function passthruStream(ResponseInterface $response, self $r, ?ChunkInterface $chunk, ?\SplObjectStorage $asyncMap): \Generator
{
while (true) {
try {
if (null !== $chunk) {
$stream->next();
if (null !== $chunk && $r->stream) {
$r->stream->next();
}

if (!$stream->valid()) {
if (!$r->stream || !$r->stream->valid() || !$r->stream) {
$r->stream = null;
break;
}
} catch (\Throwable $e) {
unset($asyncMap[$response]);
$r->stream = null;
$r->info['error'] = $e->getMessage();
$r->response->cancel();

yield $r => $chunk = new ErrorChunk($r->offset, $e);
$chunk->didThrow() ?: $chunk->getContent();
unset($asyncMap[$response]);
break;
}

$chunk = $stream->current();
$chunk = $r->stream->current();

if (!$chunk instanceof ChunkInterface) {
throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
Expand Down Expand Up @@ -356,6 +379,12 @@ private static function passthru(HttpClientInterface $client, self $r, ChunkInte
}
}

if (null !== $chunk->getError() || $chunk->isLast()) {
$stream = $r->stream;
$r->stream = null;
unset($asyncMap[$response]);
}

if (null === $chunk->getError()) {
$r->offset += \strlen($content);

Expand Down Expand Up @@ -387,7 +416,6 @@ private static function passthru(HttpClientInterface $client, self $r, ChunkInte
$chunk->didThrow() ?: $chunk->getContent();
}

unset($asyncMap[$response]);
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/HttpClient/RetryableHttpClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public function request(string $method, string $url, array $options = []): Respo

$context->getResponse()->cancel();

$delay = $this->getDelayFromHeader($context->getHeaders()) ?? $this->strategy->getDelay($context, $chunk instanceof LastChunk ? $content : null, $exception);
$delay = $this->getDelayFromHeader($context->getHeaders()) ?? $this->strategy->getDelay($context, $chunk->isLast() ? $content : null, $exception);
++$retryCount;

$this->logger->info('Try #{count} after {delay}ms'.($exception ? ': '.$exception->getMessage() : ', status code: '.$context->getStatusCode()), [
Expand Down
21 changes: 21 additions & 0 deletions src/Symfony/Component/HttpClient/Tests/AsyncDecoratorTraitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,25 @@ public function testInfoPassToDecorator()
$this->assertSame('test', $lastInfo['foo']);
$this->assertArrayHasKey('previous_info', $lastInfo);
}

public function testMultipleYieldInInitializer()
{
$first = null;
$client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) use (&$first) {
if ($chunk->isFirst()) {
$first = $chunk;

return;
}
$context->passthru();
yield $first;
yield $context->createChunk('injectedFoo');
yield $chunk;
});

$response = $client->request('GET', 'http://localhost:8057/404', ['timeout' => 0.1]);

$this->assertSame(404, $response->getStatusCode());
$this->assertStringContainsString('injectedFoo', $response->getContent(false));
}
}
25 changes: 25 additions & 0 deletions src/Symfony/Component/HttpClient/Tests/RetryableHttpClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,31 @@ public function shouldRetry(AsyncContext $context, ?string $responseContent, ?Tr
self::assertSame(200, $response->getStatusCode());
}

public function testRetryWithBodyKeepContent()
{
$client = new RetryableHttpClient(
new MockHttpClient([
new MockResponse('my bad', ['http_code' => 400]),
]),
new class([400], 0) extends GenericRetryStrategy {
public function shouldRetry(AsyncContext $context, ?string $responseContent, ?TransportExceptionInterface $exception): ?bool
{
if (null === $responseContent) {
return null;
}

return 'my bad' !== $responseContent;
}
},
1
);

$response = $client->request('GET', 'http://example.com/foo-bar');

self::assertSame(400, $response->getStatusCode());
self::assertSame('my bad', $response->getContent(false));
}

public function testRetryWithBodyInvalid()
{
$client = new RetryableHttpClient(
Expand Down
0