diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..e3b263e --- /dev/null +++ b/.gitattributes @@ -0,0 +1,8 @@ +/.gitattributes export-ignore +/.github/ export-ignore +/.gitignore export-ignore +/examples/ export-ignore +/phpstan.neon.dist export-ignore +/phpunit.xml.dist export-ignore +/phpunit.xml.legacy export-ignore +/tests/ export-ignore diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..b2645b4 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,74 @@ +name: CI + +on: + push: + pull_request: + +jobs: + PHPUnit: + name: PHPUnit (PHP ${{ matrix.php }}) + runs-on: ubuntu-24.04 + strategy: + matrix: + php: + - 8.4 + - 8.3 + - 8.2 + - 8.1 + - 8.0 + - 7.4 + - 7.3 + - 7.2 + - 7.1 + steps: + - uses: actions/checkout@v4 + - uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + coverage: xdebug + ini-file: development + - run: composer install + - run: vendor/bin/phpunit --coverage-text + if: ${{ matrix.php >= 7.3 }} + - run: vendor/bin/phpunit --coverage-text -c phpunit.xml.legacy + if: ${{ matrix.php < 7.3 }} + - run: time php examples/91-benchmark-throughput.php + + PHPUnit-macOS: + name: PHPUnit (macOS) + runs-on: macos-14 + continue-on-error: true + steps: + - uses: actions/checkout@v4 + - uses: shivammathur/setup-php@v2 + with: + php-version: 8.2 + coverage: xdebug + ini-file: development + - run: composer install + - run: vendor/bin/phpunit --coverage-text + - run: time php examples/91-benchmark-throughput.php + + PHPStan: + name: PHPStan (PHP ${{ matrix.php }}) + runs-on: ubuntu-24.04 + strategy: + matrix: + php: + - 8.4 + - 8.3 + - 8.2 + - 8.1 + - 8.0 + - 7.4 + - 7.3 + - 7.2 + - 7.1 + steps: + - uses: actions/checkout@v4 + - uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + coverage: none + - run: composer install + - run: vendor/bin/phpstan diff --git a/.gitignore b/.gitignore index 987e2a2..c8153b5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -composer.lock -vendor +/composer.lock +/vendor/ diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 37a14d7..0000000 --- a/.travis.yml +++ /dev/null @@ -1,48 +0,0 @@ -language: php - -php: -# - 5.3 # requires old distro, see below - - 5.4 - - 5.5 - - 5.6 - - 7.0 -# - 7.0 # Mac OS X test setup, see below - - 7.1 - - nightly # ignore errors, see below - - hhvm # ignore errors, see below - -# lock distro so new future defaults will not break the build -dist: trusty - -matrix: - include: - - php: 5.3 - dist: precise - include: - - os: osx - language: generic - php: 7.0 # just to look right on travis - env: - - PACKAGE: php70 - allow_failures: - - php: nightly - - php: hhvm - -install: - # OSX install inspired by https://github.com/kiler129/TravisCI-OSX-PHP - - | - if [[ "${TRAVIS_OS_NAME}" == "osx" ]]; then - brew tap homebrew/homebrew-php - echo "Installing PHP ..." - brew install "${PACKAGE}" - brew install "${PACKAGE}"-xdebug - brew link "${PACKAGE}" - echo "Installing composer ..." - curl -s http://getcomposer.org/installer | php - mv composer.phar /usr/local/bin/composer - fi - - composer install --no-interaction - -script: - - vendor/bin/phpunit --coverage-text - - time php examples/benchmark-throughput.php diff --git a/CHANGELOG.md b/CHANGELOG.md index ee2ba60..b4e2307 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,105 @@ # Changelog +## 1.3.0 (2023-06-16) + +* Feature: Full PHP 8.1 and PHP 8.2 compatibility. + (#160 by @SimonFrings, #165 by @clue and #169 by @WyriHaximus) + +* Feature: Avoid unneeded syscall when creating non-blocking `DuplexResourceStream`. + (#164 by @clue) + +* Minor documentation improvements. + (#161 by @mrsimonbennett, #162 by @SimonFrings and #166 by @nhedger) + +* Improve test suite and project setup and report failed assertions. + (#168 and #170 by @clue and #163 by @SimonFrings) + +## 1.2.0 (2021-07-11) + +A major new feature release, see [**release announcement**](https://clue.engineering/2021/announcing-reactphp-default-loop). + +* Feature: Simplify usage by supporting new [default loop](https://reactphp.org/event-loop/#loop). + (#159 by @clue) + + ```php + // old (still supported) + $stream = new ReadableResourceStream($resource, $loop); + $stream = new WritabeResourceStream($resource, $loop); + $stream = new DuplexResourceStream($resource, $loop); + + // new (using default loop) + $stream = new ReadableResourceStream($resource); + $stream = new WritabeResourceStream($resource); + $stream = new DuplexResourceStream($resource); + ``` + +* Improve test suite, use GitHub actions for continuous integration (CI), + update PHPUnit config, run tests on PHP 8 and add full core team to the license. + (#153, #156 and #157 by @SimonFrings and #154 by @WyriHaximus) + +## 1.1.1 (2020-05-04) + +* Fix: Fix faulty write buffer behavior when sending large data chunks over TLS (Mac OS X only). + (#150 by @clue) + +* Minor code style improvements to fix phpstan analysis warnings and + add `.gitattributes` to exclude dev files from exports. + (#140 by @flow-control and #144 by @reedy) + +* Improve test suite to run tests on PHP 7.4 and simplify test matrix. + (#147 by @clue) + +## 1.1.0 (2019-01-01) + +* Improvement: Increase performance by optimizing global function and constant look ups. + (#137 by @WyriHaximus) + +* Travis: Test against PHP 7.3. + (#138 by @WyriHaximus) + +* Fix: Ignore empty reads. + (#139 by @WyriHaximus) + +## 1.0.0 (2018-07-11) + +* First stable LTS release, now following [SemVer](https://semver.org/). + We'd like to emphasize that this component is production ready and battle-tested. + We plan to support all long-term support (LTS) releases for at least 24 months, + so you have a rock-solid foundation to build on top of. + +> Contains no other changes, so it's actually fully compatible with the v0.7.7 release. + +## 0.7.7 (2018-01-19) + +* Improve test suite by fixing forward compatibility with upcoming EventLoop + releases, avoid risky tests and add test group to skip integration tests + relying on internet connection and apply appropriate test timeouts. + (#128, #131 and #132 by @clue) + +## 0.7.6 (2017-12-21) + +* Fix: Work around reading from unbuffered pipe stream in legacy PHP < 5.4.28 and PHP < 5.5.12 + (#126 by @clue) + +* Improve test suite by simplifying test bootstrapping logic via Composer and + test against PHP 7.2 + (#127 by @clue and #124 by @carusogabriel) + +## 0.7.5 (2017-11-20) + +* Fix: Igore excessive `fopen()` mode flags for `WritableResourceStream` + (#119 by @clue) + +* Fix: Fix forward compatibility with upcoming EventLoop releases + (#121 by @clue) + +* Restructure examples to ease getting started + (#123 by @clue) + +* Improve test suite by adding forward compatibility with PHPUnit 6 and + ignore Mac OS X test failures for now until Travis tests work again + (#122 by @gabriel-caruso and #120 by @clue) + ## 0.7.4 (2017-10-11) * Fix: Remove event listeners from `CompositeStream` once closed and @@ -98,6 +198,9 @@ * Feature: Explicitly allow custom events and exclude any semantics (#97 by @clue) +* Strict definition for event callback functions + (#101 by @clue) + * Support legacy PHP 5.3 through PHP 7.1 and HHVM and improve usage documentation (#100 and #102 by @clue) diff --git a/LICENSE b/LICENSE index a808108..d6f8901 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,6 @@ -Copyright (c) 2012 Igor Wiedler, Chris Boden +The MIT License (MIT) + +Copyright (c) 2012 Christian Lück, Cees-Jan Kiewiet, Jan Sorgalla, Chris Boden, Igor Wiedler Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 6f1a6bb..870a907 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,25 @@ -# Stream Component +# Stream -[![Build Status](https://travis-ci.org/reactphp/stream.svg?branch=master)](https://travis-ci.org/reactphp/stream) +[![CI status](https://github.com/reactphp/stream/actions/workflows/ci.yml/badge.svg)](https://github.com/reactphp/stream/actions) +[![installs on Packagist](https://img.shields.io/packagist/dt/react/stream?color=blue&label=installs%20on%20Packagist)](https://packagist.org/packages/react/stream) Event-driven readable and writable streams for non-blocking I/O in [ReactPHP](https://reactphp.org/). +> **Development version:** This branch contains the code for the upcoming v3 +> release. For the code of the current stable v1 release, check out the +> [`1.x` branch](https://github.com/reactphp/stream/tree/1.x). +> +> The upcoming v3 release will be the way forward for this package. However, +> we will still actively support v1 for those not yet on the latest version. +> See also [installation instructions](#install) for more details. + In order to make the [EventLoop](https://github.com/reactphp/event-loop) -easier to use, this component introduces the concept of "streams". +easier to use, this component introduces the powerful concept of "streams". +Streams allow you to efficiently process huge amounts of data (such as a multi +Gigabyte file download) in small chunks without having to store everything in +memory at once. They are very similar to the streams found in PHP itself, but have an interface more suited for async, non-blocking I/O. -Mainly it provides interfaces for readable and writable streams, plus a file -descriptor based implementation with an in-memory write buffer. **Table of contents** @@ -101,7 +111,7 @@ from this source stream. The event receives a single mixed argument for incoming data. ```php -$stream->on('data', function ($data) { +$stream->on('data', function (mixed $data): void { echo $data; }); ``` @@ -132,7 +142,7 @@ The `end` event will be emitted once the source stream has successfully reached the end of the stream (EOF). ```php -$stream->on('end', function () { +$stream->on('end', function (): void { echo 'END'; }); ``` @@ -170,7 +180,7 @@ trying to read from this stream. The event receives a single `Exception` argument for the error instance. ```php -$server->on('error', function (Exception $e) { +$server->on('error', function (Exception $e): void { echo 'Error: ' . $e->getMessage() . PHP_EOL; }); ``` @@ -203,7 +213,7 @@ stream which should result in the same error processing. The `close` event will be emitted once the stream closes (terminates). ```php -$stream->on('close', function () { +$stream->on('close', function (): void { echo 'CLOSED'; }); ``` @@ -302,7 +312,7 @@ Re-attach the data source after a previous `pause()`. ```php $stream->pause(); -$loop->addTimer(1.0, function () use ($stream) { +Loop::addTimer(1.0, function () use ($stream): void { $stream->resume(); }); ``` @@ -314,7 +324,7 @@ See also `pause()`. #### pipe() -The `pipe(WritableStreamInterface $dest, array $options = [])` method can be used to +The `pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface` method can be used to pipe all the data from this readable source into the given writable destination. Automatically sends all incoming data to the destination. @@ -343,7 +353,7 @@ By default, this will call `end()` on the destination stream once the source stream emits an `end` event. This can be disabled like this: ```php -$source->pipe($dest, array('end' => false)); +$source->pipe($dest, ['end' => false]); ``` Note that this only applies to the `end` event. @@ -352,7 +362,7 @@ you'll have to manually close the destination stream: ```php $source->pipe($dest); -$source->on('close', function () use ($dest) { +$source->on('close', function () use ($dest): void { $dest->end('BYE!'); }); ``` @@ -446,7 +456,7 @@ The `drain` event will be emitted whenever the write buffer became full previously and is now ready to accept more data. ```php -$stream->on('drain', function () use ($stream) { +$stream->on('drain', function () use ($stream): void { echo 'Stream is now ready to accept more data'; }); ``` @@ -468,11 +478,11 @@ The event receives a single `ReadableStreamInterface` argument for the source stream. ```php -$stream->on('pipe', function (ReadableStreamInterface $source) use ($stream) { +$stream->on('pipe', function (ReadableStreamInterface $source) use ($stream): void { echo 'Now receiving piped data'; // explicitly close target if source emits an error - $source->on('error', function () use ($stream) { + $source->on('error', function () use ($stream): void { $stream->close(); }); }); @@ -496,7 +506,7 @@ trying to write to this stream. The event receives a single `Exception` argument for the error instance. ```php -$stream->on('error', function (Exception $e) { +$stream->on('error', function (Exception $e): void { echo 'Error: ' . $e->getMessage() . PHP_EOL; }); ``` @@ -526,7 +536,7 @@ stream which should result in the same error processing. The `close` event will be emitted once the stream closes (terminates). ```php -$stream->on('close', function () { +$stream->on('close', function (): void { echo 'CLOSED'; }); ``` @@ -608,7 +618,7 @@ data until the buffer drains. The stream SHOULD send a `drain` event once the buffer is ready to accept more data. -Similarly, if the the stream is not writable (already in a closed state) +Similarly, if the stream is not writable (already in a closed state) it MUST NOT process the given `$data` and SHOULD return `false`, indicating that the caller should stop sending data. @@ -736,7 +746,7 @@ stream in order to stop waiting for the stream to flush its final data. ```php $stream->end(); -$loop->addTimer(1.0, function () use ($stream) { +Loop::addTimer(1.0, function () use ($stream): void { $stream->close(); }); ``` @@ -820,11 +830,11 @@ This can be used to represent a read-only resource like a file stream opened in readable mode or a stream such as `STDIN`: ```php -$stream = new ReadableResourceStream(STDIN, $loop); -$stream->on('data', function ($chunk) { +$stream = new ReadableResourceStream(STDIN); +$stream->on('data', function (string $chunk): void { echo $chunk; }); -$stream->on('end', function () { +$stream->on('end', function (): void { echo 'END'; }); ``` @@ -837,7 +847,7 @@ Otherwise, it will throw an `InvalidArgumentException`: ```php // throws InvalidArgumentException -$stream = new ReadableResourceStream(false, $loop); +$stream = new ReadableResourceStream(false); ``` See also the [`DuplexResourceStream`](#readableresourcestream) for read-and-write @@ -850,7 +860,7 @@ If this fails, it will throw a `RuntimeException`: ```php // throws RuntimeException on Windows -$stream = new ReadableResourceStream(STDIN, $loop); +$stream = new ReadableResourceStream(STDIN); ``` Once the constructor is called with a valid stream resource, this class will @@ -858,6 +868,12 @@ take care of the underlying stream resource. You SHOULD only use its public API and SHOULD NOT interfere with the underlying stream resource manually. +This class takes an optional `LoopInterface|null $loop` parameter that can be used to +pass the event loop instance to use for this object. You can use a `null` value +here in order to use the [default loop](https://github.com/reactphp/event-loop#loop). +This value SHOULD NOT be given unless you're sure you want to explicitly use a +given event loop instance. + This class takes an optional `int|null $readChunkSize` parameter that controls the maximum buffer size in bytes to read at once from the stream. You can use a `null` value here in order to apply its default value. @@ -873,7 +889,7 @@ This should read until the stream resource is not readable anymore mean it reached EOF. ```php -$stream = new ReadableResourceStream(STDIN, $loop, 8192); +$stream = new ReadableResourceStream(STDIN, null, 8192); ``` > PHP bug warning: If the PHP process has explicitly been started without a @@ -882,6 +898,9 @@ $stream = new ReadableResourceStream(STDIN, $loop, 8192); stream like `php test.php < /dev/null` instead of `php test.php <&-`. See [#81](https://github.com/reactphp/stream/issues/81) for more details. +> Changelog: As of v1.2.0 the `$loop` parameter can be omitted (or skipped with a + `null` value) to use the [default loop](https://github.com/reactphp/event-loop#loop). + ### WritableResourceStream The `WritableResourceStream` is a concrete implementation of the @@ -891,7 +910,7 @@ This can be used to represent a write-only resource like a file stream opened in writable mode or a stream such as `STDOUT` or `STDERR`: ```php -$stream = new WritableResourceStream(STDOUT, $loop); +$stream = new WritableResourceStream(STDOUT); $stream->write('hello!'); $stream->end(); ``` @@ -904,7 +923,7 @@ Otherwise, it will throw an `InvalidArgumentException`: ```php // throws InvalidArgumentException -$stream = new WritableResourceStream(false, $loop); +$stream = new WritableResourceStream(false); ``` See also the [`DuplexResourceStream`](#readableresourcestream) for read-and-write @@ -917,7 +936,7 @@ If this fails, it will throw a `RuntimeException`: ```php // throws RuntimeException on Windows -$stream = new WritableResourceStream(STDOUT, $loop); +$stream = new WritableResourceStream(STDOUT); ``` Once the constructor is called with a valid stream resource, this class will @@ -932,13 +951,19 @@ For this, it uses an in-memory buffer string to collect all outstanding writes. This buffer has a soft-limit applied which defines how much data it is willing to accept before the caller SHOULD stop sending further data. +This class takes an optional `LoopInterface|null $loop` parameter that can be used to +pass the event loop instance to use for this object. You can use a `null` value +here in order to use the [default loop](https://github.com/reactphp/event-loop#loop). +This value SHOULD NOT be given unless you're sure you want to explicitly use a +given event loop instance. + This class takes an optional `int|null $writeBufferSoftLimit` parameter that controls this maximum buffer size in bytes. You can use a `null` value here in order to apply its default value. This value SHOULD NOT be changed unless you know what you're doing. ```php -$stream = new WritableResourceStream(STDOUT, $loop, 8192); +$stream = new WritableResourceStream(STDOUT, null, 8192); ``` This class takes an optional `int|null $writeChunkSize` parameter that controls @@ -953,11 +978,14 @@ This can be `-1` which means "write everything available" to the underlying stream resource. ```php -$stream = new WritableResourceStream(STDOUT, $loop, null, 8192); +$stream = new WritableResourceStream(STDOUT, null, null, 8192); ``` See also [`write()`](#write) for more details. +> Changelog: As of v1.2.0 the `$loop` parameter can be omitted (or skipped with a + `null` value) to use the [default loop](https://github.com/reactphp/event-loop#loop). + ### DuplexResourceStream The `DuplexResourceStream` is a concrete implementation of the @@ -968,7 +996,7 @@ in read and write mode mode or a stream such as a TCP/IP connection: ```php $conn = stream_socket_client('tcp://google.com:80'); -$stream = new DuplexResourceStream($conn, $loop); +$stream = new DuplexResourceStream($conn); $stream->write('hello!'); $stream->end(); ``` @@ -981,7 +1009,7 @@ Otherwise, it will throw an `InvalidArgumentException`: ```php // throws InvalidArgumentException -$stream = new DuplexResourceStream(false, $loop); +$stream = new DuplexResourceStream(false); ``` See also the [`ReadableResourceStream`](#readableresourcestream) for read-only @@ -995,7 +1023,7 @@ If this fails, it will throw a `RuntimeException`: ```php // throws RuntimeException on Windows -$stream = new DuplexResourceStream(STDOUT, $loop); +$stream = new DuplexResourceStream(STDOUT); ``` Once the constructor is called with a valid stream resource, this class will @@ -1003,6 +1031,12 @@ take care of the underlying stream resource. You SHOULD only use its public API and SHOULD NOT interfere with the underlying stream resource manually. +This class takes an optional `LoopInterface|null $loop` parameter that can be used to +pass the event loop instance to use for this object. You can use a `null` value +here in order to use the [default loop](https://github.com/reactphp/event-loop#loop). +This value SHOULD NOT be given unless you're sure you want to explicitly use a +given event loop instance. + This class takes an optional `int|null $readChunkSize` parameter that controls the maximum buffer size in bytes to read at once from the stream. You can use a `null` value here in order to apply its default value. @@ -1019,7 +1053,7 @@ mean it reached EOF. ```php $conn = stream_socket_client('tcp://google.com:80'); -$stream = new DuplexResourceStream($conn, $loop, 8192); +$stream = new DuplexResourceStream($conn, null, 8192); ``` Any `write()` calls to this class will not be performed instantly, but will @@ -1039,12 +1073,15 @@ If you want to change the write buffer soft limit, you can pass an instance of ```php $conn = stream_socket_client('tcp://google.com:80'); -$buffer = new WritableResourceStream($conn, $loop, 8192); -$stream = new DuplexResourceStream($conn, $loop, null, $buffer); +$buffer = new WritableResourceStream($conn, null, 8192); +$stream = new DuplexResourceStream($conn, null, null, $buffer); ``` See also [`WritableResourceStream`](#writableresourcestream) for more details. +> Changelog: As of v1.2.0 the `$loop` parameter can be omitted (or skipped with a + `null` value) to use the [default loop](https://github.com/reactphp/event-loop#loop). + ### ThroughStream The `ThroughStream` implements the @@ -1084,19 +1121,19 @@ used to convert data, for example for transforming any structured data into a newline-delimited JSON (NDJSON) stream like this: ```php -$through = new ThroughStream(function ($data) { +$through = new ThroughStream(function (mixed $data): string { return json_encode($data) . PHP_EOL; }); $through->on('data', $this->expectCallableOnceWith("[2, true]\n")); -$through->write(array(2, true)); +$through->write([2, true]); ``` The callback function is allowed to throw an `Exception`. In this case, the stream will emit an `error` event and then [`close()`](#close-1) the stream. ```php -$through = new ThroughStream(function ($data) { +$through = new ThroughStream(function (mixed $data): string { if (!is_string($data)) { throw new \UnexpectedValueException('Only strings allowed'); } @@ -1122,12 +1159,12 @@ This is useful for some APIs which may require a single more convenient to work with a single stream instance like this: ```php -$stdin = new ReadableResourceStream(STDIN, $loop); -$stdout = new WritableResourceStream(STDOUT, $loop); +$stdin = new ReadableResourceStream(STDIN); +$stdout = new WritableResourceStream(STDOUT); $stdio = new CompositeStream($stdin, $stdout); -$stdio->on('data', function ($chunk) use ($stdio) { +$stdio->on('data', function (string $chunk) use ($stdio): void { $stdio->write('You said: ' . $chunk); }); ``` @@ -1153,14 +1190,10 @@ The following example can be used to pipe the contents of a source file into a destination file without having to ever read the whole file into memory: ```php -$loop = new React\EventLoop\StreamSelectLoop; - -$source = new React\Stream\ReadableResourceStream(fopen('source.txt', 'r'), $loop); -$dest = new React\Stream\WritableResourceStream(fopen('destination.txt', 'w'), $loop); +$source = new React\Stream\ReadableResourceStream(fopen('source.txt', 'r')); +$dest = new React\Stream\WritableResourceStream(fopen('destination.txt', 'w')); $source->pipe($dest); - -$loop->run(); ``` > Note that this example uses `fopen()` for illustration purposes only. @@ -1174,18 +1207,18 @@ $loop->run(); The recommended way to install this library is [through Composer](https://getcomposer.org). [New to Composer?](https://getcomposer.org/doc/00-intro.md) -This will install the latest supported version: +Once released, this project will follow [SemVer](https://semver.org/). +At the moment, this will install the latest development version: ```bash -$ composer require react/stream:^0.7.4 +composer require react/stream:^3@dev ``` -More details about version upgrades can be found in the [CHANGELOG](CHANGELOG.md). +See also the [CHANGELOG](CHANGELOG.md) for details about version upgrades. This project aims to run on any platform and thus does not require any PHP -extensions and supports running on legacy PHP 5.3 through current PHP 7+ and HHVM. -It's *highly recommended to use PHP 7+* for this project due to its vast -performance improvements. +extensions and supports running on PHP 7.1 through current PHP 8+. +It's *highly recommended to use the latest supported PHP version* for this project. ## Tests @@ -1193,13 +1226,27 @@ To run the test suite, you first need to clone this repo and then install all dependencies [through Composer](https://getcomposer.org): ```bash -$ composer install +composer install ``` To run the test suite, go to the project root and run: ```bash -$ php vendor/bin/phpunit +vendor/bin/phpunit +``` + +The test suite also contains a number of functional integration tests that rely +on a stable internet connection. +If you do not want to run these, they can simply be skipped like this: + +```bash +vendor/bin/phpunit --exclude-group internet +``` + +On top of this, we use PHPStan on max level to ensure type safety across the project: + +```bash +vendor/bin/phpstan ``` ## License diff --git a/composer.json b/composer.json index d110e2a..00c2378 100644 --- a/composer.json +++ b/composer.json @@ -3,21 +3,46 @@ "description": "Event-driven readable and writable streams for non-blocking I/O in ReactPHP", "keywords": ["event-driven", "readable", "writable", "stream", "non-blocking", "io", "pipe", "ReactPHP"], "license": "MIT", + "authors": [ + { + "name": "Christian Lück", + "homepage": "https://clue.engineering/", + "email": "christian@clue.engineering" + }, + { + "name": "Cees-Jan Kiewiet", + "homepage": "https://wyrihaximus.net/", + "email": "reactphp@ceesjankiewiet.nl" + }, + { + "name": "Jan Sorgalla", + "homepage": "https://sorgalla.com/", + "email": "jsorgalla@gmail.com" + }, + { + "name": "Chris Boden", + "homepage": "https://cboden.dev/", + "email": "cboden@gmail.com" + } + ], "require": { - "php": ">=5.3.8", - "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3", + "php": ">=7.1", + "react/event-loop": "^1.2", "evenement/evenement": "^3.0 || ^2.0 || ^1.0" }, "require-dev": { - "phpunit/phpunit": "^5.0 || ^4.8.10", - "clue/stream-filter": "~1.2" - }, - "suggest": { - "react/event-loop": "^0.4" + "clue/stream-filter": "^1.2", + "phpstan/phpstan": "1.12.19 || 1.4.10", + "phpunit/phpunit": "^9.6 || ^7.5" }, "autoload": { "psr-4": { - "React\\Stream\\": "src" + "React\\Stream\\": "src/" + } + }, + "autoload-dev": { + "psr-4": { + "React\\Tests\\Stream\\": "tests/" } } } diff --git a/examples/01-http.php b/examples/01-http.php new file mode 100644 index 0000000..316e7e3 --- /dev/null +++ b/examples/01-http.php @@ -0,0 +1,36 @@ +on('data', function (string $chunk): void { + echo $chunk; +}); +$stream->on('close', function (): void { + echo '[CLOSED]' . PHP_EOL; +}); + +$stream->write("GET / HTTP/1.0\r\nHost: $host\r\n\r\n"); diff --git a/examples/02-https.php b/examples/02-https.php new file mode 100644 index 0000000..8b3ad79 --- /dev/null +++ b/examples/02-https.php @@ -0,0 +1,36 @@ +on('data', function (string $chunk): void { + echo $chunk; +}); +$stream->on('close', function (): void { + echo '[CLOSED]' . PHP_EOL; +}); + +$stream->write("GET / HTTP/1.0\r\nHost: $host\r\n\r\n"); diff --git a/examples/11-cat.php b/examples/11-cat.php new file mode 100644 index 0000000..818ec8e --- /dev/null +++ b/examples/11-cat.php @@ -0,0 +1,23 @@ +pipe($stdout); diff --git a/examples/91-benchmark-throughput.php b/examples/91-benchmark-throughput.php new file mode 100644 index 0000000..3d0213c --- /dev/null +++ b/examples/91-benchmark-throughput.php @@ -0,0 +1,66 @@ +write('NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL); +} +$info->write('piping from ' . $if . ' to ' . $of . ' (for max ' . $t . ' second(s)) ...'. PHP_EOL); + +// setup input and output streams and pipe inbetween +$fh = fopen($if, 'r'); +assert(is_resource($fh)); +$fo = fopen($of, 'w'); +assert(is_resource($fo)); +$in = new React\Stream\ReadableResourceStream($fh); +$out = new React\Stream\WritableResourceStream($fo); +$in->pipe($out); + +// stop input stream in $t seconds +$start = microtime(true); +$timeout = Loop::addTimer((float) $t, function () use ($in): void { + $in->close(); +}); + +// print stream position once stream closes +$in->on('close', function () use ($fh, $start, $timeout, $info): void { + $t = microtime(true) - $start; + Loop::cancelTimer($timeout); + + $bytes = ftell($fh); + + $info->write('read ' . $bytes . ' byte(s) in ' . round($t, 3) . ' second(s) => ' . round($bytes / 1024 / 1024 / $t, 1) . ' MiB/s' . PHP_EOL); + $info->write('peak memory usage of ' . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . ' MiB' . PHP_EOL); +}); diff --git a/examples/benchmark-throughput.php b/examples/benchmark-throughput.php deleted file mode 100644 index 477e4fc..0000000 --- a/examples/benchmark-throughput.php +++ /dev/null @@ -1,46 +0,0 @@ -write('NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL); -} -$info->write('piping from ' . $if . ' to ' . $of . ' (for max ' . $t . ' second(s)) ...'. PHP_EOL); - -// setup input and output streams and pipe inbetween -$fh = fopen($if, 'r'); -$in = new React\Stream\ReadableResourceStream($fh, $loop); -$out = new React\Stream\WritableResourceStream(fopen($of, 'w'), $loop); -$in->pipe($out); - -// stop input stream in $t seconds -$start = microtime(true); -$timeout = $loop->addTimer($t, function () use ($in, &$bytes) { - $in->close(); -}); - -// print stream position once stream closes -$in->on('close', function () use ($fh, $start, $loop, $timeout, $info) { - $t = microtime(true) - $start; - $loop->cancelTimer($timeout); - - $bytes = ftell($fh); - - $info->write('read ' . $bytes . ' byte(s) in ' . round($t, 3) . ' second(s) => ' . round($bytes / 1024 / 1024 / $t, 1) . ' MiB/s' . PHP_EOL); - $info->write('peak memory usage of ' . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . ' MiB' . PHP_EOL); -}); - -$loop->run(); diff --git a/examples/cat.php b/examples/cat.php deleted file mode 100644 index 067d2f4..0000000 --- a/examples/cat.php +++ /dev/null @@ -1,15 +0,0 @@ -pipe($stdout); - -$loop->run(); diff --git a/phpstan.neon.dist b/phpstan.neon.dist new file mode 100644 index 0000000..0fe275f --- /dev/null +++ b/phpstan.neon.dist @@ -0,0 +1,7 @@ +parameters: + level: max + + paths: + - examples/ + - src/ + - tests/ diff --git a/phpunit.xml.dist b/phpunit.xml.dist index cba6d4d..ac542e7 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,25 +1,28 @@ - + + convertDeprecationsToExceptions="true"> ./tests/ - - - + + ./src/ - - + + + + + + + + + + diff --git a/phpunit.xml.legacy b/phpunit.xml.legacy new file mode 100644 index 0000000..0086860 --- /dev/null +++ b/phpunit.xml.legacy @@ -0,0 +1,26 @@ + + + + + + + ./tests/ + + + + + ./src/ + + + + + + + + + + + diff --git a/src/CompositeStream.php b/src/CompositeStream.php index 153f2a3..db6db78 100644 --- a/src/CompositeStream.php +++ b/src/CompositeStream.php @@ -6,8 +6,13 @@ final class CompositeStream extends EventEmitter implements DuplexStreamInterface { + /** @var ReadableStreamInterface */ private $readable; + + /** @var WritableStreamInterface */ private $writable; + + /** @var bool */ private $closed = false; public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable) @@ -16,27 +21,28 @@ public function __construct(ReadableStreamInterface $readable, WritableStreamInt $this->writable = $writable; if (!$readable->isReadable() || !$writable->isWritable()) { - return $this->close(); + $this->close(); + return; } - Util::forwardEvents($this->readable, $this, array('data', 'end', 'error')); - Util::forwardEvents($this->writable, $this, array('drain', 'error', 'pipe')); + Util::forwardEvents($this->readable, $this, ['data', 'end', 'error']); + Util::forwardEvents($this->writable, $this, ['drain', 'error', 'pipe']); - $this->readable->on('close', array($this, 'close')); - $this->writable->on('close', array($this, 'close')); + $this->readable->on('close', [$this, 'close']); + $this->writable->on('close', [$this, 'close']); } - public function isReadable() + public function isReadable(): bool { return $this->readable->isReadable(); } - public function pause() + public function pause(): void { $this->readable->pause(); } - public function resume() + public function resume(): void { if (!$this->writable->isWritable()) { return; @@ -45,28 +51,28 @@ public function resume() $this->readable->resume(); } - public function pipe(WritableStreamInterface $dest, array $options = array()) + public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface { return Util::pipe($this, $dest, $options); } - public function isWritable() + public function isWritable(): bool { return $this->writable->isWritable(); } - public function write($data) + public function write($data): bool { return $this->writable->write($data); } - public function end($data = null) + public function end($data = null): void { $this->readable->pause(); $this->writable->end($data); } - public function close() + public function close(): void { if ($this->closed) { return; diff --git a/src/DuplexResourceStream.php b/src/DuplexResourceStream.php index 7869eda..5990906 100644 --- a/src/DuplexResourceStream.php +++ b/src/DuplexResourceStream.php @@ -3,12 +3,16 @@ namespace React\Stream; use Evenement\EventEmitter; +use React\EventLoop\Loop; use React\EventLoop\LoopInterface; use InvalidArgumentException; final class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface { + /** @var resource */ private $stream; + + /** @var LoopInterface */ private $loop; /** @@ -28,89 +32,102 @@ final class DuplexResourceStream extends EventEmitter implements DuplexStreamInt * @var int */ private $bufferSize; + + /** @var WritableStreamInterface */ private $buffer; + /** @var bool */ private $readable = true; + + /** @var bool */ private $writable = true; + + /** @var bool */ private $closing = false; - public function __construct($stream, LoopInterface $loop, $readChunkSize = null, WritableStreamInterface $buffer = null) + /** @var bool */ + private $listening = false; + + /** + * @param resource $stream + * @param ?LoopInterface $loop + * @param ?int $readChunkSize + * @param ?WritableStreamInterface $buffer + */ + public function __construct($stream, ?LoopInterface $loop = null, ?int $readChunkSize = null, ?WritableStreamInterface $buffer = null) { - if (!is_resource($stream) || get_resource_type($stream) !== "stream") { + if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") { throw new InvalidArgumentException('First parameter must be a valid stream resource'); } // ensure resource is opened for reading and wrting (fopen mode must contain "+") - $meta = stream_get_meta_data($stream); - if (isset($meta['mode']) && $meta['mode'] !== '' && strpos($meta['mode'], '+') === false) { + $meta = \stream_get_meta_data($stream); + if (\strpos($meta['mode'], '+') === false) { throw new InvalidArgumentException('Given stream resource is not opened in read and write mode'); } // this class relies on non-blocking I/O in order to not interrupt the event loop // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918 - if (stream_set_blocking($stream, 0) !== true) { + if ($buffer !== null && !$buffer instanceof WritableResourceStream && \stream_set_blocking($stream, false) !== true) { throw new \RuntimeException('Unable to set stream resource to non-blocking mode'); } // Use unbuffered read operations on the underlying stream resource. // Reading chunks from the stream may otherwise leave unread bytes in // PHP's stream buffers which some event loop implementations do not - // trigger events on (edge triggered). - // This does not affect the default event loop implementation (level - // triggered), so we can ignore platforms not supporting this (HHVM). - // Pipe streams (such as STDIN) do not seem to require this and legacy - // PHP < 5.4 causes SEGFAULTs on unbuffered pipe streams, so skip this. - if (function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) { - stream_set_read_buffer($stream, 0); - } + // trigger events on (edge triggered). This does not affect the default + // event loop implementation (level triggered). + \stream_set_read_buffer($stream, 0); if ($buffer === null) { $buffer = new WritableResourceStream($stream, $loop); } $this->stream = $stream; - $this->loop = $loop; - $this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize; + $this->loop = $loop ?: Loop::get(); + $this->bufferSize = $readChunkSize ?? 65536; $this->buffer = $buffer; - $that = $this; - - $this->buffer->on('error', function ($error) use ($that) { - $that->emit('error', array($error)); + $this->buffer->on('error', function (\Exception $error): void { + $this->emit('error', [$error]); }); - $this->buffer->on('close', array($this, 'close')); + $this->buffer->on('close', [$this, 'close']); - $this->buffer->on('drain', function () use ($that) { - $that->emit('drain'); + $this->buffer->on('drain', function (): void { + $this->emit('drain'); }); $this->resume(); } - public function isReadable() + public function isReadable(): bool { return $this->readable; } - public function isWritable() + public function isWritable(): bool { return $this->writable; } - public function pause() + public function pause(): void { - $this->loop->removeReadStream($this->stream); + if ($this->listening) { + $this->loop->removeReadStream($this->stream); + $this->listening = false; + } } - public function resume() + public function resume(): void { - if ($this->readable) { - $this->loop->addReadStream($this->stream, array($this, 'handleData')); + if (!$this->listening && $this->readable) { + $this->loop->addReadStream($this->stream, [$this, 'handleData']); + $this->listening = true; } } - public function write($data) + public function write($data): bool { if (!$this->writable) { return false; @@ -119,7 +136,7 @@ public function write($data) return $this->buffer->write($data); } - public function close() + public function close(): void { if (!$this->writable && !$this->closing) { return; @@ -131,14 +148,16 @@ public function close() $this->writable = false; $this->emit('close'); - $this->loop->removeStream($this->stream); + $this->pause(); $this->buffer->close(); $this->removeAllListeners(); - $this->handleClose(); + if (\is_resource($this->stream)) { + \fclose($this->stream); + } } - public function end($data = null) + public function end($data = null): void { if (!$this->writable) { return; @@ -153,16 +172,19 @@ public function end($data = null) $this->buffer->end($data); } - public function pipe(WritableStreamInterface $dest, array $options = array()) + public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface { return Util::pipe($this, $dest, $options); } - /** @internal */ - public function handleData($stream) + /** + * @internal + * @param resource $stream + */ + public function handleData($stream): void { $error = null; - set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) { + \set_error_handler(function (int $errno, string $errstr, string $errfile, int $errline) use (&$error): bool { $error = new \ErrorException( $errstr, 0, @@ -170,52 +192,25 @@ public function handleData($stream) $errfile, $errline ); + return true; }); - $data = stream_get_contents($stream, $this->bufferSize); + $data = \stream_get_contents($stream, $this->bufferSize); - restore_error_handler(); + \restore_error_handler(); if ($error !== null) { - $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error))); + $this->emit('error', [new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)]); $this->close(); return; } if ($data !== '') { - $this->emit('data', array($data)); - } else{ + $this->emit('data', [$data]); + } elseif (\feof($this->stream)) { // no data read => we reached the end and close the stream $this->emit('end'); $this->close(); } } - - /** @internal */ - public function handleClose() - { - if (is_resource($this->stream)) { - fclose($this->stream); - } - } - - /** - * Returns whether this is a pipe resource in a legacy environment - * - * @param resource $resource - * @return bool - * - * @codeCoverageIgnore - */ - private function isLegacyPipe($resource) - { - if (PHP_VERSION_ID < 50400) { - $meta = stream_get_meta_data($resource); - - if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') { - return true; - } - } - return false; - } } diff --git a/src/ReadableResourceStream.php b/src/ReadableResourceStream.php index 6a9cd65..cf1161a 100644 --- a/src/ReadableResourceStream.php +++ b/src/ReadableResourceStream.php @@ -3,6 +3,7 @@ namespace React\Stream; use Evenement\EventEmitter; +use React\EventLoop\Loop; use React\EventLoop\LoopInterface; use InvalidArgumentException; @@ -13,6 +14,7 @@ final class ReadableResourceStream extends EventEmitter implements ReadableStrea */ private $stream; + /** @var LoopInterface */ private $loop; /** @@ -35,68 +37,76 @@ final class ReadableResourceStream extends EventEmitter implements ReadableStrea */ private $bufferSize; + /** @var bool */ private $closed = false; - public function __construct($stream, LoopInterface $loop, $readChunkSize = null) + /** @var bool */ + private $listening = false; + + /** + * @param resource $stream + * @param ?LoopInterface $loop + * @param ?int $readChunkSize + */ + public function __construct($stream, ?LoopInterface $loop = null, ?int $readChunkSize = null) { - if (!is_resource($stream) || get_resource_type($stream) !== "stream") { + if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") { throw new InvalidArgumentException('First parameter must be a valid stream resource'); } // ensure resource is opened for reading (fopen mode must contain "r" or "+") - $meta = stream_get_meta_data($stream); - if (isset($meta['mode']) && $meta['mode'] !== '' && strpos($meta['mode'], 'r') === strpos($meta['mode'], '+')) { + $meta = \stream_get_meta_data($stream); + if (\strpos($meta['mode'], 'r') === \strpos($meta['mode'], '+')) { throw new InvalidArgumentException('Given stream resource is not opened in read mode'); } // this class relies on non-blocking I/O in order to not interrupt the event loop // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918 - if (stream_set_blocking($stream, 0) !== true) { + if (\stream_set_blocking($stream, false) !== true) { throw new \RuntimeException('Unable to set stream resource to non-blocking mode'); } // Use unbuffered read operations on the underlying stream resource. // Reading chunks from the stream may otherwise leave unread bytes in // PHP's stream buffers which some event loop implementations do not - // trigger events on (edge triggered). - // This does not affect the default event loop implementation (level - // triggered), so we can ignore platforms not supporting this (HHVM). - // Pipe streams (such as STDIN) do not seem to require this and legacy - // PHP < 5.4 causes SEGFAULTs on unbuffered pipe streams, so skip this. - if (function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) { - stream_set_read_buffer($stream, 0); - } + // trigger events on (edge triggered). This does not affect the default + // event loop implementation (level triggered). + \stream_set_read_buffer($stream, 0); $this->stream = $stream; - $this->loop = $loop; - $this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize; + $this->loop = $loop ?: Loop::get(); + $this->bufferSize = $readChunkSize ?? 65536; $this->resume(); } - public function isReadable() + public function isReadable(): bool { return !$this->closed; } - public function pause() + public function pause(): void { - $this->loop->removeReadStream($this->stream); + if ($this->listening) { + $this->loop->removeReadStream($this->stream); + $this->listening = false; + } } - public function resume() + public function resume(): void { - if (!$this->closed) { - $this->loop->addReadStream($this->stream, array($this, 'handleData')); + if (!$this->listening && !$this->closed) { + $this->loop->addReadStream($this->stream, [$this, 'handleData']); + $this->listening = true; } } - public function pipe(WritableStreamInterface $dest, array $options = array()) + public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface { return Util::pipe($this, $dest, $options); } - public function close() + public function close(): void { if ($this->closed) { return; @@ -105,17 +115,19 @@ public function close() $this->closed = true; $this->emit('close'); - $this->loop->removeStream($this->stream); + $this->pause(); $this->removeAllListeners(); - $this->handleClose(); + if (\is_resource($this->stream)) { + \fclose($this->stream); + } } /** @internal */ - public function handleData() + public function handleData(): void { $error = null; - set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) { + \set_error_handler(function (int $errno, string $errstr, string $errfile, int $errline) use (&$error): bool { $error = new \ErrorException( $errstr, 0, @@ -123,52 +135,25 @@ public function handleData() $errfile, $errline ); + return true; }); - $data = stream_get_contents($this->stream, $this->bufferSize); + $data = \stream_get_contents($this->stream, $this->bufferSize); - restore_error_handler(); + \restore_error_handler(); if ($error !== null) { - $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error))); + $this->emit('error', [new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)]); $this->close(); return; } if ($data !== '') { - $this->emit('data', array($data)); - } else{ + $this->emit('data', [$data]); + } elseif (\feof($this->stream)) { // no data read => we reached the end and close the stream $this->emit('end'); $this->close(); } } - - /** @internal */ - public function handleClose() - { - if (is_resource($this->stream)) { - fclose($this->stream); - } - } - - /** - * Returns whether this is a pipe resource in a legacy environment - * - * @param resource $resource - * @return bool - * - * @codeCoverageIgnore - */ - private function isLegacyPipe($resource) - { - if (PHP_VERSION_ID < 50400) { - $meta = stream_get_meta_data($resource); - - if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') { - return true; - } - } - return false; - } } diff --git a/src/ReadableStreamInterface.php b/src/ReadableStreamInterface.php index 2b4c3d0..94e5915 100644 --- a/src/ReadableStreamInterface.php +++ b/src/ReadableStreamInterface.php @@ -17,7 +17,7 @@ * The event receives a single mixed argument for incoming data. * * ```php - * $stream->on('data', function ($data) { + * $stream->on('data', function (mixed $data): void { * echo $data; * }); * ``` @@ -47,7 +47,7 @@ * reached the end of the stream (EOF). * * ```php - * $stream->on('end', function () { + * $stream->on('end', function (): void { * echo 'END'; * }); * ``` @@ -84,7 +84,7 @@ * The event receives a single `Exception` argument for the error instance. * * ```php - * $stream->on('error', function (Exception $e) { + * $stream->on('error', function (Exception $e): void { * echo 'Error: ' . $e->getMessage() . PHP_EOL; * }); * ``` @@ -116,7 +116,7 @@ * The `close` event will be emitted once the stream closes (terminates). * * ```php - * $stream->on('close', function () { + * $stream->on('close', function (): void { * echo 'CLOSED'; * }); * ``` @@ -194,7 +194,7 @@ interface ReadableStreamInterface extends EventEmitterInterface * * @return bool */ - public function isReadable(); + public function isReadable(): bool; /** * Pauses reading incoming data events. @@ -226,7 +226,7 @@ public function isReadable(); * @see self::resume() * @return void */ - public function pause(); + public function pause(): void; /** * Resumes reading incoming data events. @@ -236,7 +236,7 @@ public function pause(); * ```php * $stream->pause(); * - * $loop->addTimer(1.0, function () use ($stream) { + * Loop::addTimer(1.0, function () use ($stream): void { * $stream->resume(); * }); * ``` @@ -247,7 +247,7 @@ public function pause(); * @see self::pause() * @return void */ - public function resume(); + public function resume(): void; /** * Pipes all the data from this readable source into the given writable destination. @@ -278,7 +278,7 @@ public function resume(); * source stream emits an `end` event. This can be disabled like this: * * ```php - * $source->pipe($dest, array('end' => false)); + * $source->pipe($dest, ['end' => false]); * ``` * * Note that this only applies to the `end` event. @@ -287,7 +287,7 @@ public function resume(); * * ```php * $source->pipe($dest); - * $source->on('close', function () use ($dest) { + * $source->on('close', function () use ($dest): void { * $dest->end('BYE!'); * }); * ``` @@ -319,10 +319,10 @@ public function resume(); * a `pipe` event with this source stream an event argument. * * @param WritableStreamInterface $dest - * @param array $options + * @param array{end?:bool} $options * @return WritableStreamInterface $dest stream as-is */ - public function pipe(WritableStreamInterface $dest, array $options = array()); + public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface; /** * Closes the stream (forcefully). @@ -358,5 +358,5 @@ public function pipe(WritableStreamInterface $dest, array $options = array()); * @return void * @see WritableStreamInterface::close() */ - public function close(); + public function close(): void; } diff --git a/src/ThroughStream.php b/src/ThroughStream.php index da2fbb0..a8e4837 100644 --- a/src/ThroughStream.php +++ b/src/ThroughStream.php @@ -3,7 +3,6 @@ namespace React\Stream; use Evenement\EventEmitter; -use InvalidArgumentException; /** * The `ThroughStream` implements the @@ -43,19 +42,19 @@ * a newline-delimited JSON (NDJSON) stream like this: * * ```php - * $through = new ThroughStream(function ($data) { + * $through = new ThroughStream(function (mixed $data): string { * return json_encode($data) . PHP_EOL; * }); * $through->on('data', $this->expectCallableOnceWith("[2, true]\n")); * - * $through->write(array(2, true)); + * $through->write([2, true]); * ``` * * The callback function is allowed to throw an `Exception`. In this case, * the stream will emit an `error` event and then [`close()`](#close-1) the stream. * * ```php - * $through = new ThroughStream(function ($data) { + * $through = new ThroughStream(function (mixed $data): string { * if (!is_string($data)) { * throw new \UnexpectedValueException('Only strings allowed'); * } @@ -75,52 +74,65 @@ */ final class ThroughStream extends EventEmitter implements DuplexStreamInterface { + /** @var bool */ private $readable = true; + + /** @var bool */ private $writable = true; + + /** @var bool */ private $closed = false; + + /** @var bool */ private $paused = false; + + /** @var bool */ private $drain = false; + + /** @var ?callable */ private $callback; - public function __construct($callback = null) + /** + * @param ?callable $callback + */ + public function __construct(?callable $callback = null) { - if ($callback !== null && !is_callable($callback)) { - throw new InvalidArgumentException('Invalid transformation callback given'); - } - $this->callback = $callback; } - public function pause() + public function pause(): void { - $this->paused = true; + // only allow pause if still readable, false otherwise + $this->paused = $this->readable; } - public function resume() + public function resume(): void { + $this->paused = false; + + // emit drain event if previous write was paused (throttled) if ($this->drain) { $this->drain = false; $this->emit('drain'); } - $this->paused = false; } - public function pipe(WritableStreamInterface $dest, array $options = array()) + public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface { return Util::pipe($this, $dest, $options); } - public function isReadable() + public function isReadable(): bool { return $this->readable; } - public function isWritable() + public function isWritable(): bool { return $this->writable; } - public function write($data) + public function write($data): bool { if (!$this->writable) { return false; @@ -128,26 +140,28 @@ public function write($data) if ($this->callback !== null) { try { - $data = call_user_func($this->callback, $data); + $data = \call_user_func($this->callback, $data); } catch (\Exception $e) { - $this->emit('error', array($e)); + $this->emit('error', [$e]); $this->close(); return false; } } - $this->emit('data', array($data)); + $this->emit('data', [$data]); + // emit drain event on next resume if currently paused (throttled) if ($this->paused) { $this->drain = true; - return false; } - return true; + // continue writing if still writable and not paused (throttled), false otherwise + // @phpstan-ignore-next-line (may be false when write() causes stream to close) + return $this->writable && !$this->paused; } - public function end($data = null) + public function end($data = null): void { if (!$this->writable) { return; @@ -157,6 +171,7 @@ public function end($data = null) $this->write($data); // return if write() already caused the stream to close + // @phpstan-ignore-next-line (may be false when write() causes stream to close) if (!$this->writable) { return; } @@ -164,14 +179,14 @@ public function end($data = null) $this->readable = false; $this->writable = false; - $this->paused = true; + $this->paused = false; $this->drain = false; $this->emit('end'); $this->close(); } - public function close() + public function close(): void { if ($this->closed) { return; @@ -179,9 +194,10 @@ public function close() $this->readable = false; $this->writable = false; - $this->closed = true; - $this->paused = true; + $this->paused = false; $this->drain = false; + + $this->closed = true; $this->callback = null; $this->emit('close'); diff --git a/src/Util.php b/src/Util.php index 14ddcfc..c6c038d 100644 --- a/src/Util.php +++ b/src/Util.php @@ -9,11 +9,11 @@ final class Util * * @param ReadableStreamInterface $source * @param WritableStreamInterface $dest - * @param array $options + * @param array{end?:bool} $options * @return WritableStreamInterface $dest stream as-is * @see ReadableStreamInterface::pipe() for more details */ - public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array()) + public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = []): WritableStreamInterface { // source not readable => NO-OP if (!$source->isReadable()) { @@ -27,36 +27,36 @@ public static function pipe(ReadableStreamInterface $source, WritableStreamInter return $dest; } - $dest->emit('pipe', array($source)); + $dest->emit('pipe', [$source]); // forward all source data events as $dest->write() - $source->on('data', $dataer = function ($data) use ($source, $dest) { + $source->on('data', $dataer = function ($data) use ($source, $dest): void { $feedMore = $dest->write($data); if (false === $feedMore) { $source->pause(); } }); - $dest->on('close', function () use ($source, $dataer) { + $dest->on('close', function () use ($source, $dataer): void { $source->removeListener('data', $dataer); $source->pause(); }); // forward destination drain as $source->resume() - $dest->on('drain', $drainer = function () use ($source) { + $dest->on('drain', $drainer = function () use ($source): void { $source->resume(); }); - $source->on('close', function () use ($dest, $drainer) { + $source->on('close', function () use ($dest, $drainer): void { $dest->removeListener('drain', $drainer); }); // forward end event from source as $dest->end() $end = isset($options['end']) ? $options['end'] : true; if ($end) { - $source->on('end', $ender = function () use ($dest) { + $source->on('end', $ender = function () use ($dest): void { $dest->end(); }); - $dest->on('close', function () use ($source, $ender) { + $dest->on('close', function () use ($source, $ender): void { $source->removeListener('end', $ender); }); } @@ -64,11 +64,17 @@ public static function pipe(ReadableStreamInterface $source, WritableStreamInter return $dest; } - public static function forwardEvents($source, $target, array $events) + /** + * @param ReadableStreamInterface|WritableStreamInterface $source + * @param ReadableStreamInterface|WritableStreamInterface $target + * @param string[] $events + * @return void + */ + public static function forwardEvents($source, $target, array $events): void { foreach ($events as $event) { - $source->on($event, function () use ($event, $target) { - $target->emit($event, func_get_args()); + $source->on($event, function () use ($event, $target): void { + $target->emit($event, \func_get_args()); }); } } diff --git a/src/WritableResourceStream.php b/src/WritableResourceStream.php index f43f95c..68b2721 100644 --- a/src/WritableResourceStream.php +++ b/src/WritableResourceStream.php @@ -3,49 +3,71 @@ namespace React\Stream; use Evenement\EventEmitter; +use React\EventLoop\Loop; use React\EventLoop\LoopInterface; final class WritableResourceStream extends EventEmitter implements WritableStreamInterface { + /** @var resource */ private $stream; + + /** @var LoopInterface */ private $loop; + + /** @var int */ private $softLimit; + + /** @var int */ private $writeChunkSize; + /** @var bool */ private $listening = false; + + /** @var bool */ private $writable = true; + + /** @var bool */ private $closed = false; + + /** @var string */ private $data = ''; - public function __construct($stream, LoopInterface $loop, $writeBufferSoftLimit = null, $writeChunkSize = null) + /** + * @param resource $stream + * @param ?LoopInterface $loop + * @param ?int $writeBufferSoftLimit + * @param ?int $writeChunkSize + */ + public function __construct($stream, ?LoopInterface $loop = null, ?int $writeBufferSoftLimit = null, ?int $writeChunkSize = null) { - if (!is_resource($stream) || get_resource_type($stream) !== "stream") { + if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") { throw new \InvalidArgumentException('First parameter must be a valid stream resource'); } - $meta = stream_get_meta_data($stream); - if (isset($meta['mode']) && str_replace(array('b', 't'), '', $meta['mode']) === 'r') { + // ensure resource is opened for writing (fopen mode must contain either of "waxc+") + $meta = \stream_get_meta_data($stream); + if (\strtr($meta['mode'], 'waxc+', '.....') === $meta['mode']) { throw new \InvalidArgumentException('Given stream resource is not opened in write mode'); } // this class relies on non-blocking I/O in order to not interrupt the event loop // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918 - if (stream_set_blocking($stream, 0) !== true) { + if (\stream_set_blocking($stream, false) !== true) { throw new \RuntimeException('Unable to set stream resource to non-blocking mode'); } $this->stream = $stream; - $this->loop = $loop; - $this->softLimit = ($writeBufferSoftLimit === null) ? 65536 : (int)$writeBufferSoftLimit; - $this->writeChunkSize = ($writeChunkSize === null) ? -1 : (int)$writeChunkSize; + $this->loop = $loop ?: Loop::get(); + $this->softLimit = $writeBufferSoftLimit ?? 65536; + $this->writeChunkSize = $writeChunkSize ?? -1; } - public function isWritable() + public function isWritable(): bool { return $this->writable; } - public function write($data) + public function write($data): bool { if (!$this->writable) { return false; @@ -56,13 +78,13 @@ public function write($data) if (!$this->listening && $this->data !== '') { $this->listening = true; - $this->loop->addWriteStream($this->stream, array($this, 'handleWrite')); + $this->loop->addWriteStream($this->stream, [$this, 'handleWrite']); } return !isset($this->data[$this->softLimit - 1]); } - public function end($data = null) + public function end($data = null): void { if (null !== $data) { $this->write($data); @@ -77,7 +99,7 @@ public function end($data = null) } } - public function close() + public function close(): void { if ($this->closed) { return; @@ -95,58 +117,46 @@ public function close() $this->emit('close'); $this->removeAllListeners(); - if (is_resource($this->stream)) { - fclose($this->stream); + if (\is_resource($this->stream)) { + \fclose($this->stream); } } /** @internal */ - public function handleWrite() + public function handleWrite(): void { $error = null; - set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) { - $error = array( - 'message' => $errstr, - 'number' => $errno, - 'file' => $errfile, - 'line' => $errline - ); + \set_error_handler(function (int $_, string $errstr) use (&$error): bool { + $error = $errstr; + return true; }); if ($this->writeChunkSize === -1) { - $sent = fwrite($this->stream, $this->data); + $sent = \fwrite($this->stream, $this->data); } else { - $sent = fwrite($this->stream, $this->data, $this->writeChunkSize); + \assert($this->writeChunkSize >= -1); + $sent = \fwrite($this->stream, $this->data, $this->writeChunkSize); } - restore_error_handler(); + \restore_error_handler(); - // Only report errors if *nothing* could be sent. + // Only report errors if *nothing* could be sent and an error has been raised. + // Ignore non-fatal warnings if *some* data could be sent. // Any hard (permanent) error will fail to send any data at all. // Sending excessive amounts of data will only flush *some* data and then // report a temporary error (EAGAIN) which we do not raise here in order // to keep the stream open for further tries to write. // Should this turn out to be a permanent error later, it will eventually // send *nothing* and we can detect this. - if ($sent === 0 || $sent === false) { - if ($error !== null) { - $error = new \ErrorException( - $error['message'], - 0, - $error['number'], - $error['file'], - $error['line'] - ); - } - - $this->emit('error', array(new \RuntimeException('Unable to write to stream: ' . ($error !== null ? $error->getMessage() : 'Unknown error'), 0, $error))); + if (($sent === 0 || $sent === false) && $error !== null) { + $this->emit('error', [new \RuntimeException('Unable to write to stream: ' . $error)]); $this->close(); return; } $exceeded = isset($this->data[$this->softLimit - 1]); - $this->data = (string) substr($this->data, $sent); + $this->data = (string) \substr($this->data, (int) $sent); // buffer has been above limit and is now below limit if ($exceeded && !isset($this->data[$this->softLimit - 1])) { diff --git a/src/WritableStreamInterface.php b/src/WritableStreamInterface.php index 3bc932e..d48536d 100644 --- a/src/WritableStreamInterface.php +++ b/src/WritableStreamInterface.php @@ -16,7 +16,7 @@ * previously and is now ready to accept more data. * * ```php - * $stream->on('drain', function () use ($stream) { + * $stream->on('drain', function () use ($stream): void { * echo 'Stream is now ready to accept more data'; * }); * ``` @@ -37,11 +37,11 @@ * source stream. * * ```php - * $stream->on('pipe', function (ReadableStreamInterface $source) use ($stream) { + * $stream->on('pipe', function (ReadableStreamInterface $source) use ($stream): void { * echo 'Now receiving piped data'; * * // explicitly close target if source emits an error - * $source->on('error', function () use ($stream) { + * $source->on('error', function () use ($stream): void { * $stream->close(); * }); * }); @@ -64,7 +64,7 @@ * The event receives a single `Exception` argument for the error instance. * * ```php - * $stream->on('error', function (Exception $e) { + * $stream->on('error', function (Exception $e): void { * echo 'Error: ' . $e->getMessage() . PHP_EOL; * }); * ``` @@ -93,7 +93,7 @@ * The `close` event will be emitted once the stream closes (terminates). * * ```php - * $stream->on('close', function () { + * $stream->on('close', function (): void { * echo 'CLOSED'; * }); * ``` @@ -170,7 +170,7 @@ interface WritableStreamInterface extends EventEmitterInterface * * @return bool */ - public function isWritable(); + public function isWritable(): bool; /** * Write some data into the stream. @@ -196,7 +196,7 @@ public function isWritable(); * The stream SHOULD send a `drain` event once the buffer is ready to accept * more data. * - * Similarly, if the the stream is not writable (already in a closed state) + * Similarly, if the stream is not writable (already in a closed state) * it MUST NOT process the given `$data` and SHOULD return `false`, * indicating that the caller should stop sending data. * @@ -219,7 +219,7 @@ public function isWritable(); * @param mixed|string $data * @return bool */ - public function write($data); + public function write($data): bool; /** * Successfully ends the stream (after optionally sending some final data). @@ -292,7 +292,7 @@ public function write($data); * @param mixed|string|null $data * @return void */ - public function end($data = null); + public function end($data = null): void; /** * Closes the stream (forcefully). @@ -330,7 +330,7 @@ public function end($data = null); * * ```php * $stream->end(); - * $loop->addTimer(1.0, function () use ($stream) { + * Loop::addTimer(1.0, function () use ($stream): void { * $stream->close(); * }); * ``` @@ -343,5 +343,5 @@ public function end($data = null); * @return void * @see ReadableStreamInterface::close() */ - public function close(); + public function close(): void; } diff --git a/tests/CallableStub.php b/tests/CallableStub.php deleted file mode 100644 index 31cc834..0000000 --- a/tests/CallableStub.php +++ /dev/null @@ -1,10 +0,0 @@ -getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); $readable ->expects($this->once()) ->method('isReadable') @@ -21,12 +23,14 @@ public function itShouldCloseReadableIfNotWritable() $readable ->expects($this->once()) ->method('close'); + assert($readable instanceof ReadableStreamInterface); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->once()) ->method('isWritable') ->willReturn(false); + assert($writable instanceof WritableStreamInterface); $composite = new CompositeStream($readable, $writable); @@ -35,18 +39,20 @@ public function itShouldCloseReadableIfNotWritable() } /** @test */ - public function itShouldCloseWritableIfNotReadable() + public function itShouldCloseWritableIfNotReadable(): void { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); $readable ->expects($this->once()) ->method('isReadable') ->willReturn(false); + assert($readable instanceof ReadableStreamInterface); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->once()) ->method('close'); + assert($writable instanceof WritableStreamInterface); $composite = new CompositeStream($readable, $writable); @@ -55,15 +61,16 @@ public function itShouldCloseWritableIfNotReadable() } /** @test */ - public function itShouldForwardWritableCallsToWritableStream() + public function itShouldForwardWritableCallsToWritableStream(): void { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); $readable ->expects($this->once()) ->method('isReadable') ->willReturn(true); + assert($readable instanceof ReadableStreamInterface); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->once()) ->method('write') @@ -72,6 +79,7 @@ public function itShouldForwardWritableCallsToWritableStream() ->expects($this->exactly(2)) ->method('isWritable') ->willReturn(true); + assert($writable instanceof WritableStreamInterface); $composite = new CompositeStream($readable, $writable); $composite->write('foo'); @@ -79,9 +87,9 @@ public function itShouldForwardWritableCallsToWritableStream() } /** @test */ - public function itShouldForwardReadableCallsToReadableStream() + public function itShouldForwardReadableCallsToReadableStream(): void { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); $readable ->expects($this->exactly(2)) ->method('isReadable') @@ -92,12 +100,14 @@ public function itShouldForwardReadableCallsToReadableStream() $readable ->expects($this->once()) ->method('resume'); + assert($readable instanceof ReadableStreamInterface); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->any()) ->method('isWritable') ->willReturn(true); + assert($writable instanceof WritableStreamInterface); $composite = new CompositeStream($readable, $writable); $composite->isReadable(); @@ -106,9 +116,9 @@ public function itShouldForwardReadableCallsToReadableStream() } /** @test */ - public function itShouldNotForwardResumeIfStreamIsNotWritable() + public function itShouldNotForwardResumeIfStreamIsNotWritable(): void { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); $readable ->expects($this->once()) ->method('isReadable') @@ -116,27 +126,30 @@ public function itShouldNotForwardResumeIfStreamIsNotWritable() $readable ->expects($this->never()) ->method('resume'); + assert($readable instanceof ReadableStreamInterface); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->exactly(2)) ->method('isWritable') ->willReturnOnConsecutiveCalls(true, false); + assert($writable instanceof WritableStreamInterface); $composite = new CompositeStream($readable, $writable); $composite->resume(); } /** @test */ - public function endShouldDelegateToWritableWithData() + public function endShouldDelegateToWritableWithData(): void { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); $readable ->expects($this->once()) ->method('isReadable') ->willReturn(true); + assert($readable instanceof ReadableStreamInterface); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->once()) ->method('isWritable') @@ -145,15 +158,16 @@ public function endShouldDelegateToWritableWithData() ->expects($this->once()) ->method('end') ->with('foo'); + assert($writable instanceof WritableStreamInterface); $composite = new CompositeStream($readable, $writable); $composite->end('foo'); } /** @test */ - public function closeShouldCloseBothStreams() + public function closeShouldCloseBothStreams(): void { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); $readable ->expects($this->once()) ->method('isReadable') @@ -161,8 +175,9 @@ public function closeShouldCloseBothStreams() $readable ->expects($this->once()) ->method('close'); + assert($readable instanceof ReadableStreamInterface); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->once()) ->method('isWritable') @@ -170,13 +185,14 @@ public function closeShouldCloseBothStreams() $writable ->expects($this->once()) ->method('close'); + assert($writable instanceof WritableStreamInterface); $composite = new CompositeStream($readable, $writable); $composite->close(); } /** @test */ - public function itShouldForwardCloseOnlyOnce() + public function itShouldForwardCloseOnlyOnce(): void { $readable = new ThroughStream(); $writable = new ThroughStream(); @@ -189,7 +205,7 @@ public function itShouldForwardCloseOnlyOnce() } /** @test */ - public function itShouldForwardCloseAndRemoveAllListeners() + public function itShouldForwardCloseAndRemoveAllListeners(): void { $in = new ThroughStream(); @@ -208,7 +224,7 @@ public function itShouldForwardCloseAndRemoveAllListeners() } /** @test */ - public function itShouldReceiveForwardedEvents() + public function itShouldReceiveForwardedEvents(): void { $readable = new ThroughStream(); $writable = new ThroughStream(); @@ -217,51 +233,55 @@ public function itShouldReceiveForwardedEvents() $composite->on('data', $this->expectCallableOnce()); $composite->on('drain', $this->expectCallableOnce()); - $readable->emit('data', array('foo')); + $readable->emit('data', ['foo']); $writable->emit('drain'); } /** @test */ - public function itShouldHandlePipingCorrectly() + public function itShouldHandlePipingCorrectly(): void { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); $readable ->expects($this->once()) ->method('isReadable') ->willReturn(true); + assert($readable instanceof ReadableStreamInterface); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable->expects($this->any())->method('isWritable')->willReturn(True); $writable ->expects($this->once()) ->method('write') ->with('foo'); + assert($writable instanceof WritableStreamInterface); $composite = new CompositeStream($readable, $writable); $input = new ThroughStream(); $input->pipe($composite); - $input->emit('data', array('foo')); + $input->emit('data', ['foo']); } /** @test */ - public function itShouldForwardPipeCallsToReadableStream() + public function itShouldForwardPipeCallsToReadableStream(): void { $readable = new ThroughStream(); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable->expects($this->any())->method('isWritable')->willReturn(True); + assert($writable instanceof WritableStreamInterface); $composite = new CompositeStream($readable, $writable); - $output = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $output = $this->createMock(WritableStreamInterface::class); $output->expects($this->any())->method('isWritable')->willReturn(True); $output ->expects($this->once()) ->method('write') ->with('foo'); + assert($output instanceof WritableStreamInterface); $composite->pipe($output); - $readable->emit('data', array('foo')); + $readable->emit('data', ['foo']); } } diff --git a/tests/DuplexResourceStreamIntegrationTest.php b/tests/DuplexResourceStreamIntegrationTest.php index f7f2860..2c3d2ed 100644 --- a/tests/DuplexResourceStreamIntegrationTest.php +++ b/tests/DuplexResourceStreamIntegrationTest.php @@ -3,33 +3,48 @@ namespace React\Tests\Stream; use React\Stream\DuplexResourceStream; -use React\EventLoop as rel; use React\Stream\ReadableResourceStream; +use React\EventLoop\ExtEventLoop; +use React\EventLoop\LoopInterface; +use React\EventLoop\StreamSelectLoop; +use function Clue\StreamFilter\append as filter_append; class DuplexResourceStreamIntegrationTest extends TestCase { - public function loopProvider() + public function loopProvider(): \Generator { - return array( - array(function() { return true; }, function() { return new rel\StreamSelectLoop; }), - array(function() { return function_exists('event_base_new'); }, function() { return new rel\LibEventLoop; }), - array(function() { return class_exists('libev\EventLoop'); }, function() { return new rel\LibEvLoop; }), - array(function() { return class_exists('EventBase'); }, function() { return new rel\ExtEventLoop; }) - ); + yield [ + function() { + return true; + }, + function () { + return new StreamSelectLoop(); + } + ]; + yield [ + function () { + return class_exists('EventBase'); + }, + function () { + return new ExtEventLoop(); + } + ]; } /** * @dataProvider loopProvider */ - public function testBufferReadsLargeChunks($condition, $loopFactory) + public function testBufferReadsLargeChunks(callable $condition, callable $loopFactory): void { if (true !== $condition()) { - return $this->markTestSkipped('Loop implementation not available'); + $this->markTestSkipped('Loop implementation not available'); } $loop = $loopFactory(); - list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + assert(is_array($pair)); + [$sockA, $sockB] = $pair; $bufferSize = 4096; $streamA = new DuplexResourceStream($sockA, $loop, $bufferSize); @@ -44,9 +59,9 @@ public function testBufferReadsLargeChunks($condition, $loopFactory) $streamA->write($testString); - $loop->tick(); - $loop->tick(); - $loop->tick(); + $this->loopTick($loop); + $this->loopTick($loop); + $this->loopTick($loop); $streamA->close(); $streamB->close(); @@ -57,15 +72,17 @@ public function testBufferReadsLargeChunks($condition, $loopFactory) /** * @dataProvider loopProvider */ - public function testWriteLargeChunk($condition, $loopFactory) + public function testWriteLargeChunk(callable $condition, callable $loopFactory): void { if (true !== $condition()) { - return $this->markTestSkipped('Loop implementation not available'); + $this->markTestSkipped('Loop implementation not available'); } $loop = $loopFactory(); - list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + assert(is_array($pair)); + [$sockA, $sockB] = $pair; $streamA = new DuplexResourceStream($sockA, $loop); $streamB = new DuplexResourceStream($sockB, $loop); @@ -97,15 +114,17 @@ public function testWriteLargeChunk($condition, $loopFactory) /** * @dataProvider loopProvider */ - public function testDoesNotEmitDataIfNothingHasBeenWritten($condition, $loopFactory) + public function testDoesNotEmitDataIfNothingHasBeenWritten(callable $condition, callable $loopFactory): void { if (true !== $condition()) { - return $this->markTestSkipped('Loop implementation not available'); + $this->markTestSkipped('Loop implementation not available'); } $loop = $loopFactory(); - list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + assert(is_array($pair)); + [$sockA, $sockB] = $pair; $streamA = new DuplexResourceStream($sockA, $loop); $streamB = new DuplexResourceStream($sockB, $loop); @@ -125,15 +144,17 @@ public function testDoesNotEmitDataIfNothingHasBeenWritten($condition, $loopFact /** * @dataProvider loopProvider */ - public function testDoesNotWriteDataIfRemoteSideFromPairHasBeenClosed($condition, $loopFactory) + public function testDoesNotWriteDataIfRemoteSideFromPairHasBeenClosed(callable $condition, callable $loopFactory): void { if (true !== $condition()) { - return $this->markTestSkipped('Loop implementation not available'); + $this->markTestSkipped('Loop implementation not available'); } $loop = $loopFactory(); - list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + assert(is_array($pair)); + [$sockA, $sockB] = $pair; $streamA = new DuplexResourceStream($sockA, $loop); $streamB = new DuplexResourceStream($sockB, $loop); @@ -155,18 +176,22 @@ public function testDoesNotWriteDataIfRemoteSideFromPairHasBeenClosed($condition /** * @dataProvider loopProvider */ - public function testDoesNotWriteDataIfServerSideHasBeenClosed($condition, $loopFactory) + public function testDoesNotWriteDataIfServerSideHasBeenClosed(callable $condition, callable $loopFactory): void { if (true !== $condition()) { - return $this->markTestSkipped('Loop implementation not available'); + $this->markTestSkipped('Loop implementation not available'); } $loop = $loopFactory(); $server = stream_socket_server('tcp://127.0.0.1:0'); + assert(is_resource($server)); + + $client = stream_socket_client((string) stream_socket_get_name($server, false)); + assert(is_resource($client)); - $client = stream_socket_client(stream_socket_get_name($server, false)); $peer = stream_socket_accept($server); + assert(is_resource($peer)); $streamA = new DuplexResourceStream($client, $loop); $streamB = new DuplexResourceStream($peer, $loop); @@ -188,18 +213,22 @@ public function testDoesNotWriteDataIfServerSideHasBeenClosed($condition, $loopF /** * @dataProvider loopProvider */ - public function testDoesNotWriteDataIfClientSideHasBeenClosed($condition, $loopFactory) + public function testDoesNotWriteDataIfClientSideHasBeenClosed(callable $condition, callable $loopFactory): void { if (true !== $condition()) { - return $this->markTestSkipped('Loop implementation not available'); + $this->markTestSkipped('Loop implementation not available'); } $loop = $loopFactory(); $server = stream_socket_server('tcp://127.0.0.1:0'); + assert(is_resource($server)); + + $client = stream_socket_client((string) stream_socket_get_name($server, false)); + assert(is_resource($client)); - $client = stream_socket_client(stream_socket_get_name($server, false)); $peer = stream_socket_accept($server); + assert(is_resource($peer)); $streamA = new DuplexResourceStream($peer, $loop); $streamB = new DuplexResourceStream($client, $loop); @@ -221,15 +250,18 @@ public function testDoesNotWriteDataIfClientSideHasBeenClosed($condition, $loopF /** * @dataProvider loopProvider */ - public function testReadsSingleChunkFromProcessPipe($condition, $loopFactory) + public function testReadsSingleChunkFromProcessPipe(callable $condition, callable $loopFactory): void { if (true !== $condition()) { - return $this->markTestSkipped('Loop implementation not available'); + $this->markTestSkipped('Loop implementation not available'); } $loop = $loopFactory(); - $stream = new ReadableResourceStream(popen('echo test', 'r'), $loop); + $fh = popen('echo test', 'r'); + assert(is_resource($fh)); + + $stream = new ReadableResourceStream($fh, $loop); $stream->on('data', $this->expectCallableOnceWith("test\n")); $stream->on('end', $this->expectCallableOnce()); $stream->on('error', $this->expectCallableNever()); @@ -240,15 +272,18 @@ public function testReadsSingleChunkFromProcessPipe($condition, $loopFactory) /** * @dataProvider loopProvider */ - public function testReadsMultipleChunksFromProcessPipe($condition, $loopFactory) + public function testReadsMultipleChunksFromProcessPipe(callable $condition, callable $loopFactory): void { if (true !== $condition()) { - return $this->markTestSkipped('Loop implementation not available'); + $this->markTestSkipped('Loop implementation not available'); } $loop = $loopFactory(); - $stream = new ReadableResourceStream(popen('echo a;sleep 0.1;echo b;sleep 0.1;echo c', 'r'), $loop); + $fh = popen('echo a;sleep 0.1;echo b;sleep 0.1;echo c', 'r'); + assert(is_resource($fh)); + + $stream = new ReadableResourceStream($fh, $loop); $buffer = ''; $stream->on('data', function ($chunk) use (&$buffer) { @@ -266,15 +301,18 @@ public function testReadsMultipleChunksFromProcessPipe($condition, $loopFactory) /** * @dataProvider loopProvider */ - public function testReadsLongChunksFromProcessPipe($condition, $loopFactory) + public function testReadsLongChunksFromProcessPipe(callable $condition, callable $loopFactory): void { if (true !== $condition()) { - return $this->markTestSkipped('Loop implementation not available'); + $this->markTestSkipped('Loop implementation not available'); } $loop = $loopFactory(); - $stream = new ReadableResourceStream(popen('dd if=/dev/zero bs=12345 count=1234 2>&-', 'r'), $loop); + $fh = popen('dd if=/dev/zero bs=12345 count=1234 2>&-', 'r'); + assert(is_resource($fh)); + + $stream = new ReadableResourceStream($fh, $loop); $bytes = 0; $stream->on('data', function ($chunk) use (&$bytes) { @@ -292,19 +330,71 @@ public function testReadsLongChunksFromProcessPipe($condition, $loopFactory) /** * @dataProvider loopProvider */ - public function testReadsNothingFromProcessPipeWithNoOutput($condition, $loopFactory) + public function testReadsNothingFromProcessPipeWithNoOutput(callable $condition, callable $loopFactory): void { if (true !== $condition()) { - return $this->markTestSkipped('Loop implementation not available'); + $this->markTestSkipped('Loop implementation not available'); } $loop = $loopFactory(); - $stream = new ReadableResourceStream(popen('true', 'r'), $loop); + $fh = popen('true', 'r'); + assert(is_resource($fh)); + + $stream = new ReadableResourceStream($fh, $loop); $stream->on('data', $this->expectCallableNever()); $stream->on('end', $this->expectCallableOnce()); $stream->on('error', $this->expectCallableNever()); $loop->run(); } + + /** + * @covers React\Stream\ReadableResourceStream::handleData + * @dataProvider loopProvider + */ + public function testEmptyReadShouldntFcloseStream(callable $condition, callable $loopFactory): void + { + if (true !== $condition()) { + $this->markTestSkipped('Loop implementation not available'); + } + + $server = stream_socket_server('tcp://127.0.0.1:0'); + assert(is_resource($server)); + + $client = stream_socket_client((string) stream_socket_get_name($server, false)); + assert(is_resource($client)); + + $stream = stream_socket_accept($server); + assert(is_resource($stream)); + + + // add a filter which returns an error when encountering an 'a' when reading + filter_append($stream, function ($chunk) { + return ''; + }, STREAM_FILTER_READ); + + $loop = $loopFactory(); + + $conn = new DuplexResourceStream($stream, $loop); + $conn->on('error', $this->expectCallableNever()); + $conn->on('data', $this->expectCallableNever()); + $conn->on('end', $this->expectCallableNever()); + + fwrite($client, "foobar\n"); + + $conn->handleData($stream); + + fclose($stream); + fclose($client); + fclose($server); + } + + private function loopTick(LoopInterface $loop): void + { + $loop->addTimer(0, function () use ($loop) { + $loop->stop(); + }); + $loop->run(); + } } diff --git a/tests/DuplexResourceStreamTest.php b/tests/DuplexResourceStreamTest.php index 9ea43f9..c1076a6 100644 --- a/tests/DuplexResourceStreamTest.php +++ b/tests/DuplexResourceStreamTest.php @@ -2,81 +2,159 @@ namespace React\Tests\Stream; +use PHPUnit\Framework\MockObject\MockObject; +use React\EventLoop\LoopInterface; use React\Stream\DuplexResourceStream; -use Clue\StreamFilter as Filter; use React\Stream\WritableResourceStream; +use React\Stream\WritableStreamInterface; +use function Clue\StreamFilter\append as filter_append; class DuplexResourceStreamTest extends TestCase { /** * @covers React\Stream\DuplexResourceStream::__construct + * @doesNotPerformAssertions */ - public function testConstructor() + public function testConstructor(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); - $conn = new DuplexResourceStream($stream, $loop); + new DuplexResourceStream($stream, $loop); + } + + public function testConstructWithoutLoopAssignsLoopAutomatically(): void + { + $resource = fopen('php://temp', 'r+'); + assert(is_resource($resource)); + + $stream = new DuplexResourceStream($resource); + + $ref = new \ReflectionProperty($stream, 'loop'); + $ref->setAccessible(true); + $loop = $ref->getValue($stream); + + $this->assertInstanceOf('React\EventLoop\LoopInterface', $loop); } /** * @covers React\Stream\DuplexResourceStream::__construct + * @doesNotPerformAssertions */ - public function testConstructorThrowsExceptionOnInvalidStream() + public function testConstructorWithExcessiveMode(): void { - $loop = $this->createLoopMock(); + // excessive flags are ignored for temp streams, so we have to use a file stream + $name = (string) tempnam(sys_get_temp_dir(), 'test'); + $stream = fopen($name, 'r+eANYTHING'); + assert(is_resource($stream)); + unlink($name); - $this->setExpectedException('InvalidArgumentException'); - new DuplexResourceStream('breakme', $loop); + $loop = $this->createLoopMock(); + $buffer = new DuplexResourceStream($stream, $loop); + $buffer->close(); } /** * @covers React\Stream\DuplexResourceStream::__construct */ - public function testConstructorThrowsExceptionOnWriteOnlyStream() + public function testConstructorThrowsExceptionOnInvalidStream(): void { - if (defined('HHVM_VERSION')) { - $this->markTestSkipped('HHVM does not report fopen mode for STDOUT'); - } + $loop = $this->createLoopMock(); + + $this->expectException(\InvalidArgumentException::class); + new DuplexResourceStream('breakme', $loop); // @phpstan-ignore-line + } + /** + * @covers React\Stream\DuplexResourceStream::__construct + */ + public function testConstructorThrowsExceptionOnWriteOnlyStream(): void + { $loop = $this->createLoopMock(); - $this->setExpectedException('InvalidArgumentException'); + $this->expectException(\InvalidArgumentException::class); new DuplexResourceStream(STDOUT, $loop); } /** * @covers React\Stream\DuplexResourceStream::__construct */ - public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking() + public function testConstructorThrowsExceptionOnWriteOnlyStreamWithExcessiveMode(): void + { + // excessive flags are ignored for temp streams, so we have to use a file stream + $name = (string) tempnam(sys_get_temp_dir(), 'test'); + $stream = fopen($name, 'weANYTHING'); + assert(is_resource($stream)); + unlink($name); + + $loop = $this->createLoopMock(); + $this->expectException(\InvalidArgumentException::class); + new DuplexResourceStream($stream, $loop); + } + + /** + * @covers React\Stream\DuplexResourceStream::__construct + */ + public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking(): void { if (!in_array('blocking', stream_get_wrappers())) { stream_wrapper_register('blocking', 'React\Tests\Stream\EnforceBlockingWrapper'); } $stream = fopen('blocking://test', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); - $this->setExpectedException('RuntimeException'); + $this->expectException(\RuntimeException::class); new DuplexResourceStream($stream, $loop); } /** * @covers React\Stream\DuplexResourceStream::__construct + * @doesNotPerformAssertions */ - public function testConstructorAcceptsBuffer() + public function testConstructorAcceptsBuffer(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + + $loop = $this->createLoopMock(); + + $buffer = $this->createMock(WritableStreamInterface::class); + assert($buffer instanceof WritableStreamInterface); + + new DuplexResourceStream($stream, $loop, null, $buffer); + } + + /** + * @covers React\Stream\DuplexResourceStream::__construct + */ + public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlockingWithBufferGiven(): void + { + if (!in_array('blocking', stream_get_wrappers())) { + stream_wrapper_register('blocking', 'React\Tests\Stream\EnforceBlockingWrapper'); + } + + $stream = fopen('blocking://test', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + assert($buffer instanceof WritableStreamInterface); - $conn = new DuplexResourceStream($stream, $loop, null, $buffer); + $this->expectException(\RuntimeException::class); + new DuplexResourceStream($stream, $loop, null, $buffer); } - public function testCloseShouldEmitCloseEvent() + public function testCloseShouldEmitCloseEvent(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $conn = new DuplexResourceStream($stream, $loop); @@ -88,26 +166,32 @@ public function testCloseShouldEmitCloseEvent() $this->assertFalse($conn->isReadable()); } - public function testEndShouldEndBuffer() + public function testEndShouldEndBuffer(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); - $buffer = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $buffer = $this->createMock(WritableStreamInterface::class); $buffer->expects($this->once())->method('end')->with('foo'); + assert($buffer instanceof WritableStreamInterface); $conn = new DuplexResourceStream($stream, $loop, null, $buffer); $conn->end('foo'); } - public function testEndAfterCloseIsNoOp() + public function testEndAfterCloseIsNoOp(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); - $buffer = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $buffer = $this->createMock(WritableStreamInterface::class); $buffer->expects($this->never())->method('end'); + assert($buffer instanceof WritableStreamInterface); $conn = new DuplexResourceStream($stream, $loop); $conn->close(); @@ -118,9 +202,11 @@ public function testEndAfterCloseIsNoOp() * @covers React\Stream\DuplexResourceStream::__construct * @covers React\Stream\DuplexResourceStream::handleData */ - public function testDataEvent() + public function testDataEvent(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $capturedData = null; @@ -141,9 +227,11 @@ public function testDataEvent() * @covers React\Stream\DuplexResourceStream::__construct * @covers React\Stream\DuplexResourceStream::handleData */ - public function testDataEventDoesEmitOneChunkMatchingBufferSize() + public function testDataEventDoesEmitOneChunkMatchingBufferSize(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $capturedData = null; @@ -166,9 +254,11 @@ public function testDataEventDoesEmitOneChunkMatchingBufferSize() * @covers React\Stream\DuplexResourceStream::__construct * @covers React\Stream\DuplexResourceStream::handleData */ - public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfinite() + public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfinite(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $capturedData = null; @@ -191,9 +281,11 @@ public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfi /** * @covers React\Stream\DuplexResourceStream::handleData */ - public function testEmptyStreamShouldNotEmitData() + public function testEmptyStreamShouldNotEmitData(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $conn = new DuplexResourceStream($stream, $loop); @@ -205,9 +297,11 @@ public function testEmptyStreamShouldNotEmitData() /** * @covers React\Stream\DuplexResourceStream::write */ - public function testWrite() + public function testWrite(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createWriteableLoopMock(); $conn = new DuplexResourceStream($stream, $loop); @@ -222,9 +316,11 @@ public function testWrite() * @covers React\Stream\DuplexResourceStream::isReadable * @covers React\Stream\DuplexResourceStream::isWritable */ - public function testEnd() + public function testEnd(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $conn = new DuplexResourceStream($stream, $loop); @@ -238,20 +334,107 @@ public function testEnd() /** * @covers React\Stream\DuplexResourceStream::end */ - public function testEndRemovesReadStreamFromLoop() + public function testEndRemovesReadStreamFromLoop(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); - $loop->expects($this->once())->method('removeReadStream'); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); $conn = new DuplexResourceStream($stream, $loop); $conn->end('bye'); } - public function testEndedStreamsShouldNotWrite() + /** + * @covers React\Stream\DuplexResourceStream::pause + */ + public function testPauseRemovesReadStreamFromLoop(): void + { + $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); + + $conn = new DuplexResourceStream($stream, $loop); + $conn->pause(); + $conn->pause(); + } + + /** + * @covers React\Stream\DuplexResourceStream::pause + */ + public function testResumeDoesAddStreamToLoopOnlyOnce(): void + { + $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + + $conn = new DuplexResourceStream($stream, $loop); + $conn->resume(); + $conn->resume(); + } + + /** + * @covers React\Stream\DuplexResourceStream::close + */ + public function testCloseRemovesReadStreamFromLoop(): void + { + $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); + + $conn = new DuplexResourceStream($stream, $loop); + $conn->close(); + } + + /** + * @covers React\Stream\DuplexResourceStream::close + */ + public function testCloseAfterPauseRemovesReadStreamFromLoopOnlyOnce(): void + { + $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); + + $conn = new DuplexResourceStream($stream, $loop); + $conn->pause(); + $conn->close(); + } + + /** + * @covers React\Stream\DuplexResourceStream::close + */ + public function testResumeAfterCloseDoesAddReadStreamToLoopOnlyOnce(): void { - $file = tempnam(sys_get_temp_dir(), 'reactphptest_'); + $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + + $conn = new DuplexResourceStream($stream, $loop); + $conn->close(); + $conn->resume(); + } + + public function testEndedStreamsShouldNotWrite(): void + { + $file = (string) tempnam(sys_get_temp_dir(), 'reactphptest_'); $stream = fopen($file, 'r+'); + assert(is_resource($stream)); + $loop = $this->createWriteableLoopMock(); $conn = new DuplexResourceStream($stream, $loop); @@ -259,7 +442,9 @@ public function testEndedStreamsShouldNotWrite() $conn->end(); $res = $conn->write("bar\n"); + $stream = fopen($file, 'r'); + assert(is_resource($stream)); $this->assertSame("foo\n", fgets($stream)); $this->assertFalse($res); @@ -267,20 +452,25 @@ public function testEndedStreamsShouldNotWrite() unlink($file); } - public function testPipeShouldReturnDestination() + public function testPipeShouldReturnDestination(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $conn = new DuplexResourceStream($stream, $loop); - $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $dest = $this->createMock(WritableStreamInterface::class); + assert($dest instanceof WritableStreamInterface); $this->assertSame($dest, $conn->pipe($dest)); } - public function testBufferEventsShouldBubbleUp() + public function testBufferEventsShouldBubbleUp(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop); @@ -290,18 +480,21 @@ public function testBufferEventsShouldBubbleUp() $conn->on('error', $this->expectCallableOnce()); $buffer->emit('drain'); - $buffer->emit('error', array(new \RuntimeException('Whoops'))); + $buffer->emit('error', [new \RuntimeException('Whoops')]); } /** * @covers React\Stream\DuplexResourceStream::handleData */ - public function testClosingStreamInDataEventShouldNotTriggerError() + public function testClosingStreamInDataEventShouldNotTriggerError(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $conn = new DuplexResourceStream($stream, $loop); + $conn->on('error', $this->expectCallableNever()); $conn->on('data', function ($data) use ($conn) { $conn->close(); }); @@ -315,12 +508,13 @@ public function testClosingStreamInDataEventShouldNotTriggerError() /** * @covers React\Stream\DuplexResourceStream::handleData */ - public function testDataFiltered() + public function testDataFiltered(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); // add a filter which removes every 'a' when reading - Filter\append($stream, function ($chunk) { + filter_append($stream, function ($chunk) { return str_replace('a', '', $chunk); }, STREAM_FILTER_READ); @@ -343,12 +537,13 @@ public function testDataFiltered() /** * @covers React\Stream\DuplexResourceStream::handleData */ - public function testDataErrorShouldEmitErrorAndClose() + public function testDataErrorShouldEmitErrorAndClose(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); // add a filter which returns an error when encountering an 'a' when reading - Filter\append($stream, function ($chunk) { + filter_append($stream, function ($chunk) { if (strpos($chunk, 'a') !== false) { throw new \Exception('Invalid'); } @@ -368,7 +563,7 @@ public function testDataErrorShouldEmitErrorAndClose() $conn->handleData($stream); } - private function createWriteableLoopMock() + private function createWriteableLoopMock(): LoopInterface { $loop = $this->createLoopMock(); $loop @@ -381,8 +576,10 @@ private function createWriteableLoopMock() return $loop; } - private function createLoopMock() + /** @return LoopInterface&MockObject */ + private function createLoopMock(): MockObject { - return $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + /** @var MockObject&LoopInterface */ + return $this->createMock(LoopInterface::class); } } diff --git a/tests/EnforceBlockingWrapper.php b/tests/EnforceBlockingWrapper.php index 39c0487..cab433b 100644 --- a/tests/EnforceBlockingWrapper.php +++ b/tests/EnforceBlockingWrapper.php @@ -5,26 +5,29 @@ /** * Used to test dummy stream resources that do not support setting non-blocking mode * - * @link http://php.net/manual/de/class.streamwrapper.php + * @link https://www.php.net/manual/en/class.streamwrapper.php */ class EnforceBlockingWrapper { - public function stream_open($path, $mode, $options, &$opened_path) + /** @var resource */ + public $context; + + public function stream_open(string $path, string $mode, int $options, ?string &$opened_path): bool { return true; } - public function stream_cast($cast_as) + public function stream_cast(int $cast_as): bool { return false; } - public function stream_eof() + public function stream_eof(): bool { return false; } - public function stream_set_option($option, $arg1, $arg2) + public function stream_set_option(int $option, int $arg1, ?int $arg2): bool { if ($option === STREAM_OPTION_BLOCKING) { return false; diff --git a/tests/FunctionalInternetTest.php b/tests/FunctionalInternetTest.php index fdbec53..f56eefc 100644 --- a/tests/FunctionalInternetTest.php +++ b/tests/FunctionalInternetTest.php @@ -2,8 +2,9 @@ namespace React\Tests\Stream; -use React\Stream\DuplexResourceStream; use React\EventLoop\Factory; +use React\EventLoop\LoopInterface; +use React\Stream\DuplexResourceStream; use React\Stream\WritableResourceStream; /** @@ -11,10 +12,11 @@ */ class FunctionalInternetTest extends TestCase { - public function testUploadKilobytePlain() + public function testUploadKilobytePlain(): void { $size = 1000; $stream = stream_socket_client('tcp://httpbin.org:80'); + assert(is_resource($stream)); $loop = Factory::create(); $stream = new DuplexResourceStream($stream, $loop); @@ -28,15 +30,16 @@ public function testUploadKilobytePlain() $stream->write("POST /post HTTP/1.0\r\nHost: httpbin.org\r\nContent-Length: $size\r\n\r\n" . str_repeat('.', $size)); - $loop->run(); + $this->awaitStreamClose($stream, $loop); $this->assertNotEquals('', $buffer); } - public function testUploadBiggerBlockPlain() + public function testUploadBiggerBlockPlain(): void { - $size = 1000 * 30; + $size = 50 * 1000; $stream = stream_socket_client('tcp://httpbin.org:80'); + assert(is_resource($stream)); $loop = Factory::create(); $stream = new DuplexResourceStream($stream, $loop); @@ -50,15 +53,16 @@ public function testUploadBiggerBlockPlain() $stream->write("POST /post HTTP/1.0\r\nHost: httpbin.org\r\nContent-Length: $size\r\n\r\n" . str_repeat('.', $size)); - $loop->run(); + $this->awaitStreamClose($stream, $loop); $this->assertNotEquals('', $buffer); } - public function testUploadKilobyteSecure() + public function testUploadKilobyteSecure(): void { $size = 1000; - $stream = stream_socket_client('tls://httpbin.org:443'); + $stream = stream_socket_client('ssl://httpbin.org:443'); + assert(is_resource($stream)); $loop = Factory::create(); $stream = new DuplexResourceStream($stream, $loop); @@ -72,16 +76,26 @@ public function testUploadKilobyteSecure() $stream->write("POST /post HTTP/1.0\r\nHost: httpbin.org\r\nContent-Length: $size\r\n\r\n" . str_repeat('.', $size)); - $loop->run(); + $this->awaitStreamClose($stream, $loop); $this->assertNotEquals('', $buffer); } - public function testUploadBiggerBlockSecureRequiresSmallerChunkSize() + public function testUploadBiggerBlockSecure(): void { - $size = 1000 * 30000; - $stream = stream_socket_client('tls://httpbin.org:443'); - + // A few dozen kilobytes should be enough to verify this works. + // Underlying buffer sizes are platform-specific, so let's increase this + // a bit to trigger different behavior on Linux vs Mac OS X. + $size = 136 * 1000; + + $stream = stream_socket_client('ssl://httpbin.org:443'); + assert(is_resource($stream)); + + // PHP < 7.1.4 suffers from a bug when writing big chunks of data over + // TLS streams at once. + // We work around this by limiting the write chunk size to 8192 bytes + // here to also support older PHP versions. + // See https://github.com/reactphp/socket/issues/105 $loop = Factory::create(); $stream = new DuplexResourceStream( $stream, @@ -99,8 +113,22 @@ public function testUploadBiggerBlockSecureRequiresSmallerChunkSize() $stream->write("POST /post HTTP/1.0\r\nHost: httpbin.org\r\nContent-Length: $size\r\n\r\n" . str_repeat('.', $size)); - $loop->run(); + $this->awaitStreamClose($stream, $loop); $this->assertNotEquals('', $buffer); } + + private function awaitStreamClose(DuplexResourceStream $stream, LoopInterface $loop, float $timeout = 10.0): void + { + $stream->on('close', function () use ($loop) { + $loop->stop(); + }); + + $loop->addTimer($timeout, function () use ($loop) { + $loop->stop(); + $this->fail('Timed out while waiting for stream to close'); + }); + + $loop->run(); + } } diff --git a/tests/ReadableResourceStreamTest.php b/tests/ReadableResourceStreamTest.php index a6909ba..3f7c144 100644 --- a/tests/ReadableResourceStreamTest.php +++ b/tests/ReadableResourceStreamTest.php @@ -2,68 +2,120 @@ namespace React\Tests\Stream; +use PHPUnit\Framework\MockObject\MockObject; +use React\EventLoop\LoopInterface; use React\Stream\ReadableResourceStream; -use Clue\StreamFilter as Filter; +use React\Stream\WritableStreamInterface; +use function Clue\StreamFilter\append as filter_append; class ReadableResourceStreamTest extends TestCase { /** * @covers React\Stream\ReadableResourceStream::__construct + * @doesNotPerformAssertions */ - public function testConstructor() + public function testConstructor(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); new ReadableResourceStream($stream, $loop); } + public function testConstructWithoutLoopAssignsLoopAutomatically(): void + { + $resource = fopen('php://temp', 'r+'); + assert(is_resource($resource)); + + $stream = new ReadableResourceStream($resource); + + $ref = new \ReflectionProperty($stream, 'loop'); + $ref->setAccessible(true); + $loop = $ref->getValue($stream); + + $this->assertInstanceOf('React\EventLoop\LoopInterface', $loop); + } + /** * @covers React\Stream\ReadableResourceStream::__construct + * @doesNotPerformAssertions */ - public function testConstructorThrowsExceptionOnInvalidStream() + public function testConstructorWithExcessiveMode(): void { - $loop = $this->createLoopMock(); + // excessive flags are ignored for temp streams, so we have to use a file stream + $name = (string) tempnam(sys_get_temp_dir(), 'test'); + $stream = fopen($name, 'r+eANYTHING'); + assert(is_resource($stream)); + unlink($name); - $this->setExpectedException('InvalidArgumentException'); - new ReadableResourceStream(false, $loop); + $loop = $this->createLoopMock(); + $buffer = new ReadableResourceStream($stream, $loop); + $buffer->close(); } /** * @covers React\Stream\ReadableResourceStream::__construct */ - public function testConstructorThrowsExceptionOnWriteOnlyStream() + public function testConstructorThrowsExceptionOnInvalidStream(): void { - if (defined('HHVM_VERSION')) { - $this->markTestSkipped('HHVM does not report fopen mode for STDOUT'); - } + $loop = $this->createLoopMock(); + + $this->expectException(\InvalidArgumentException::class); + new ReadableResourceStream(false, $loop); // @phpstan-ignore-line + } + /** + * @covers React\Stream\ReadableResourceStream::__construct + */ + public function testConstructorThrowsExceptionOnWriteOnlyStream(): void + { $loop = $this->createLoopMock(); - $this->setExpectedException('InvalidArgumentException'); + $this->expectException(\InvalidArgumentException::class); new ReadableResourceStream(STDOUT, $loop); } /** * @covers React\Stream\ReadableResourceStream::__construct */ - public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking() + public function testConstructorThrowsExceptionOnWriteOnlyStreamWithExcessiveMode(): void + { + // excessive flags are ignored for temp streams, so we have to use a file stream + $name = (string) tempnam(sys_get_temp_dir(), 'test'); + $stream = fopen($name, 'weANYTHING'); + assert(is_resource($stream)); + unlink($name); + + $loop = $this->createLoopMock(); + $this->expectException(\InvalidArgumentException::class); + new ReadableResourceStream($stream, $loop); + } + + /** + * @covers React\Stream\ReadableResourceStream::__construct + */ + public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking(): void { if (!in_array('blocking', stream_get_wrappers())) { stream_wrapper_register('blocking', 'React\Tests\Stream\EnforceBlockingWrapper'); } $stream = fopen('blocking://test', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); - $this->setExpectedException('RuntimeException'); + $this->expectException(\RuntimeException::class); new ReadableResourceStream($stream, $loop); } - - public function testCloseShouldEmitCloseEvent() + public function testCloseShouldEmitCloseEvent(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $conn = new ReadableResourceStream($stream, $loop); @@ -74,9 +126,11 @@ public function testCloseShouldEmitCloseEvent() $this->assertFalse($conn->isReadable()); } - public function testCloseTwiceShouldEmitCloseEventOnce() + public function testCloseTwiceShouldEmitCloseEventOnce(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $conn = new ReadableResourceStream($stream, $loop); @@ -90,9 +144,11 @@ public function testCloseTwiceShouldEmitCloseEventOnce() * @covers React\Stream\ReadableResourceStream::__construct * @covers React\Stream\ReadableResourceStream::handleData */ - public function testDataEvent() + public function testDataEvent(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $capturedData = null; @@ -105,7 +161,7 @@ public function testDataEvent() fwrite($stream, "foobar\n"); rewind($stream); - $conn->handleData($stream); + $conn->handleData(); $this->assertSame("foobar\n", $capturedData); } @@ -113,9 +169,11 @@ public function testDataEvent() * @covers React\Stream\ReadableResourceStream::__construct * @covers React\Stream\ReadableResourceStream::handleData */ - public function testDataEventDoesEmitOneChunkMatchingBufferSize() + public function testDataEventDoesEmitOneChunkMatchingBufferSize(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $capturedData = null; @@ -128,7 +186,7 @@ public function testDataEventDoesEmitOneChunkMatchingBufferSize() fwrite($stream, str_repeat("a", 100000)); rewind($stream); - $conn->handleData($stream); + $conn->handleData(); $this->assertTrue($conn->isReadable()); $this->assertEquals(4321, strlen($capturedData)); @@ -138,9 +196,11 @@ public function testDataEventDoesEmitOneChunkMatchingBufferSize() * @covers React\Stream\ReadableResourceStream::__construct * @covers React\Stream\ReadableResourceStream::handleData */ - public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfinite() + public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfinite(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $capturedData = null; @@ -154,7 +214,7 @@ public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfi fwrite($stream, str_repeat("a", 100000)); rewind($stream); - $conn->handleData($stream); + $conn->handleData(); $this->assertTrue($conn->isReadable()); $this->assertEquals(100000, strlen($capturedData)); @@ -163,24 +223,29 @@ public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfi /** * @covers React\Stream\ReadableResourceStream::handleData */ - public function testEmptyStreamShouldNotEmitData() + public function testEmptyStreamShouldNotEmitData(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $conn = new ReadableResourceStream($stream, $loop); $conn->on('data', $this->expectCallableNever()); - $conn->handleData($stream); + $conn->handleData(); } - public function testPipeShouldReturnDestination() + public function testPipeShouldReturnDestination(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $conn = new ReadableResourceStream($stream, $loop); - $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $dest = $this->createMock(WritableStreamInterface::class); + assert($dest instanceof WritableStreamInterface); $this->assertSame($dest, $conn->pipe($dest)); } @@ -188,12 +253,15 @@ public function testPipeShouldReturnDestination() /** * @covers React\Stream\ReadableResourceStream::handleData */ - public function testClosingStreamInDataEventShouldNotTriggerError() + public function testClosingStreamInDataEventShouldNotTriggerError(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $conn = new ReadableResourceStream($stream, $loop); + $conn->on('error', $this->expectCallableNever()); $conn->on('data', function ($data) use ($conn) { $conn->close(); }); @@ -201,18 +269,101 @@ public function testClosingStreamInDataEventShouldNotTriggerError() fwrite($stream, "foobar\n"); rewind($stream); - $conn->handleData($stream); + $conn->handleData(); + } + + /** + * @covers React\Stream\ReadableResourceStream::pause + */ + public function testPauseRemovesReadStreamFromLoop(): void + { + $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); + + $conn = new ReadableResourceStream($stream, $loop); + $conn->pause(); + $conn->pause(); + } + + /** + * @covers React\Stream\ReadableResourceStream::pause + */ + public function testResumeDoesAddStreamToLoopOnlyOnce(): void + { + $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + + $conn = new ReadableResourceStream($stream, $loop); + $conn->resume(); + $conn->resume(); + } + + /** + * @covers React\Stream\ReadableResourceStream::close + */ + public function testCloseRemovesReadStreamFromLoop(): void + { + $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); + + $conn = new ReadableResourceStream($stream, $loop); + $conn->close(); + } + + /** + * @covers React\Stream\ReadableResourceStream::close + */ + public function testCloseAfterPauseRemovesReadStreamFromLoopOnce(): void + { + $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); + + $conn = new ReadableResourceStream($stream, $loop); + $conn->pause(); + $conn->close(); + } + + /** + * @covers React\Stream\ReadableResourceStream::close + */ + public function testResumeAfterCloseDoesAddReadStreamToLoopOnlyOnce(): void + { + $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + + $conn = new ReadableResourceStream($stream, $loop); + $conn->close(); + $conn->resume(); } /** * @covers React\Stream\ReadableResourceStream::handleData */ - public function testDataFiltered() + public function testDataFiltered(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); // add a filter which removes every 'a' when reading - Filter\append($stream, function ($chunk) { + filter_append($stream, function ($chunk) { return str_replace('a', '', $chunk); }, STREAM_FILTER_READ); @@ -228,19 +379,20 @@ public function testDataFiltered() fwrite($stream, "foobar\n"); rewind($stream); - $conn->handleData($stream); + $conn->handleData(); $this->assertSame("foobr\n", $capturedData); } /** * @covers React\Stream\ReadableResourceStream::handleData */ - public function testDataErrorShouldEmitErrorAndClose() + public function testDataErrorShouldEmitErrorAndClose(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); // add a filter which returns an error when encountering an 'a' when reading - Filter\append($stream, function ($chunk) { + filter_append($stream, function ($chunk) { if (strpos($chunk, 'a') !== false) { throw new \Exception('Invalid'); } @@ -257,11 +409,35 @@ public function testDataErrorShouldEmitErrorAndClose() fwrite($stream, "foobar\n"); rewind($stream); - $conn->handleData($stream); + $conn->handleData(); + } + + /** + * @covers React\Stream\ReadableResourceStream::handleData + */ + public function testEmptyReadShouldntFcloseStream(): void + { + $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + assert(is_array($pair)); + [$stream, $_] = $pair; + + $loop = $this->createLoopMock(); + + $conn = new ReadableResourceStream($stream, $loop); + $conn->on('error', $this->expectCallableNever()); + $conn->on('data', $this->expectCallableNever()); + $conn->on('end', $this->expectCallableNever()); + + $conn->handleData(); + + fclose($stream); + fclose($_); } - private function createLoopMock() + /** @return MockObject&LoopInterface */ + private function createLoopMock(): MockObject { - return $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + /** @var MockObject&LoopInterface */ + return $this->createMock(LoopInterface::class); } } diff --git a/tests/Stub/ReadableStreamStub.php b/tests/Stub/ReadableStreamStub.php index 6984f24..98d0240 100644 --- a/tests/Stub/ReadableStreamStub.php +++ b/tests/Stub/ReadableStreamStub.php @@ -9,50 +9,58 @@ class ReadableStreamStub extends EventEmitter implements ReadableStreamInterface { + /** @var bool */ public $readable = true; + + /** @var bool */ public $paused = false; - public function isReadable() + public function isReadable(): bool { return true; } - // trigger data event - public function write($data) + /** + * trigger data event + * + * @param mixed $data + * @return void + */ + public function write($data): void { - $this->emit('data', array($data)); + $this->emit('data', [$data]); } // trigger error event - public function error($error) + public function error(\Exception $error): void { - $this->emit('error', array($error)); + $this->emit('error', [$error]); } // trigger end event - public function end() + public function end(): void { - $this->emit('end', array()); + $this->emit('end', []); } - public function pause() + public function pause(): void { $this->paused = true; } - public function resume() + public function resume(): void { $this->paused = false; } - public function close() + public function close(): void { $this->readable = false; $this->emit('close'); } - public function pipe(WritableStreamInterface $dest, array $options = array()) + public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface { Util::pipe($this, $dest, $options); diff --git a/tests/TestCase.php b/tests/TestCase.php index 63746f9..95bc3f1 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -2,19 +2,12 @@ namespace React\Tests\Stream; -class TestCase extends \PHPUnit_Framework_TestCase -{ - protected function expectCallableExactly($amount) - { - $mock = $this->createCallableMock(); - $mock - ->expects($this->exactly($amount)) - ->method('__invoke'); +use PHPUnit\Framework\MockObject\MockObject; +use PHPUnit\Framework\TestCase as BaseTestCase; - return $mock; - } - - protected function expectCallableOnce() +class TestCase extends BaseTestCase +{ + protected function expectCallableOnce(): callable { $mock = $this->createCallableMock(); $mock @@ -24,7 +17,8 @@ protected function expectCallableOnce() return $mock; } - protected function expectCallableOnceWith($value) + /** @param mixed $value */ + protected function expectCallableOnceWith($value): callable { $callback = $this->createCallableMock(); $callback @@ -35,7 +29,7 @@ protected function expectCallableOnceWith($value) return $callback; } - protected function expectCallableNever() + protected function expectCallableNever(): callable { $mock = $this->createCallableMock(); $mock @@ -45,8 +39,19 @@ protected function expectCallableNever() return $mock; } - protected function createCallableMock() + /** @return MockObject&callable */ + protected function createCallableMock(): MockObject { - return $this->getMockBuilder('React\Tests\Stream\CallableStub')->getMock(); + $builder = $this->getMockBuilder(\stdClass::class); + if (method_exists($builder, 'addMethods')) { + // PHPUnit 9+ + $mock = $builder->addMethods(['__invoke'])->getMock(); + } else { + // legacy PHPUnit + $mock = $builder->setMethods(['__invoke'])->getMock(); + } + assert($mock instanceof MockObject && is_callable($mock)); + + return $mock; } } diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index a98badf..a4b49b0 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -3,23 +3,15 @@ namespace React\Tests\Stream; use React\Stream\ThroughStream; +use React\Stream\WritableStreamInterface; /** * @covers React\Stream\ThroughStream */ class ThroughStreamTest extends TestCase { - /** - * @test - * @expectedException InvalidArgumentException - */ - public function itShouldRejectInvalidCallback() - { - new ThroughStream(123); - } - /** @test */ - public function itShouldReturnTrueForAnyDataWrittenToIt() + public function itShouldReturnTrueForAnyDataWrittenToIt(): void { $through = new ThroughStream(); $ret = $through->write('foo'); @@ -28,7 +20,7 @@ public function itShouldReturnTrueForAnyDataWrittenToIt() } /** @test */ - public function itShouldEmitAnyDataWrittenToIt() + public function itShouldEmitAnyDataWrittenToIt(): void { $through = new ThroughStream(); $through->on('data', $this->expectCallableOnceWith('foo')); @@ -36,7 +28,7 @@ public function itShouldEmitAnyDataWrittenToIt() } /** @test */ - public function itShouldEmitAnyDataWrittenToItPassedThruFunction() + public function itShouldEmitAnyDataWrittenToItPassedThruFunction(): void { $through = new ThroughStream('strtoupper'); $through->on('data', $this->expectCallableOnceWith('FOO')); @@ -44,7 +36,7 @@ public function itShouldEmitAnyDataWrittenToItPassedThruFunction() } /** @test */ - public function itShouldEmitAnyDataWrittenToItPassedThruCallback() + public function itShouldEmitAnyDataWrittenToItPassedThruCallback(): void { $through = new ThroughStream('strtoupper'); $through->on('data', $this->expectCallableOnceWith('FOO')); @@ -52,7 +44,7 @@ public function itShouldEmitAnyDataWrittenToItPassedThruCallback() } /** @test */ - public function itShouldEmitErrorAndCloseIfCallbackThrowsException() + public function itShouldEmitErrorAndCloseIfCallbackThrowsException(): void { $through = new ThroughStream(function () { throw new \RuntimeException(); @@ -69,7 +61,7 @@ public function itShouldEmitErrorAndCloseIfCallbackThrowsException() } /** @test */ - public function itShouldEmitErrorAndCloseIfCallbackThrowsExceptionOnEnd() + public function itShouldEmitErrorAndCloseIfCallbackThrowsExceptionOnEnd(): void { $through = new ThroughStream(function () { throw new \RuntimeException(); @@ -86,7 +78,7 @@ public function itShouldEmitErrorAndCloseIfCallbackThrowsExceptionOnEnd() } /** @test */ - public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused() + public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused(): void { $through = new ThroughStream(); $through->pause(); @@ -96,7 +88,31 @@ public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused() } /** @test */ - public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWhenPaused() + public function itShouldReturnFalseForAnyDataWrittenToItWhenDataEventEndsStream(): void + { + $through = new ThroughStream(); + $through->on('data', function () use ($through) { + $through->end(); + }); + $ret = $through->write('foo'); + + $this->assertFalse($ret); + } + + /** @test */ + public function itShouldReturnFalseForAnyDataWrittenToItWhenDataEventClosesStream(): void + { + $through = new ThroughStream(); + $through->on('data', function () use ($through) { + $through->close(); + }); + $ret = $through->write('foo'); + + $this->assertFalse($ret); + } + + /** @test */ + public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWhenPaused(): void { $through = new ThroughStream(); $through->pause(); @@ -107,7 +123,41 @@ public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWh } /** @test */ - public function itShouldReturnTrueForAnyDataWrittenToItWhenResumedAfterPause() + public function itShouldNotEmitDrainOnResumeAfterClose(): void + { + $through = new ThroughStream(); + $through->close(); + + $through->on('drain', $this->expectCallableNever()); + $through->resume(); + } + + /** @test */ + public function itShouldNotEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenThatCausesStreamToClose(): void + { + $through = new ThroughStream(); + $through->on('data', function () use ($through) { $through->close(); }); + $through->write('foo'); + + $through->on('drain', $this->expectCallableNever()); + $through->resume(); + } + + /** @test */ + public function itShouldReturnFalseForAnyDataWrittenToItAfterPausingFromDrainEvent(): void + { + $through = new ThroughStream(); + $through->pause(); + $through->write('foo'); + + $through->on('drain', function () use ($through) { $through->pause(); }); + $through->resume(); + + $this->assertFalse($through->write('bar')); + } + + /** @test */ + public function itShouldReturnTrueForAnyDataWrittenToItWhenResumedAfterPause(): void { $through = new ThroughStream(); $through->on('drain', $this->expectCallableNever()); @@ -119,7 +169,7 @@ public function itShouldReturnTrueForAnyDataWrittenToItWhenResumedAfterPause() } /** @test */ - public function pipingStuffIntoItShouldWork() + public function pipingStuffIntoItShouldWork(): void { $readable = new ThroughStream(); @@ -127,11 +177,11 @@ public function pipingStuffIntoItShouldWork() $through->on('data', $this->expectCallableOnceWith('foo')); $readable->pipe($through); - $readable->emit('data', array('foo')); + $readable->emit('data', ['foo']); } /** @test */ - public function endShouldEmitEndAndClose() + public function endShouldEmitEndAndClose(): void { $through = new ThroughStream(); $through->on('data', $this->expectCallableNever()); @@ -141,7 +191,7 @@ public function endShouldEmitEndAndClose() } /** @test */ - public function endShouldCloseTheStream() + public function endShouldCloseTheStream(): void { $through = new ThroughStream(); $through->on('data', $this->expectCallableNever()); @@ -152,7 +202,7 @@ public function endShouldCloseTheStream() } /** @test */ - public function endShouldWriteDataBeforeClosing() + public function endShouldWriteDataBeforeClosing(): void { $through = new ThroughStream(); $through->on('data', $this->expectCallableOnceWith('foo')); @@ -163,16 +213,16 @@ public function endShouldWriteDataBeforeClosing() } /** @test */ - public function endTwiceShouldOnlyEmitOnce() + public function endTwiceShouldOnlyEmitOnce(): void { $through = new ThroughStream(); - $through->on('data', $this->expectCallableOnce('first')); + $through->on('data', $this->expectCallableOnceWith('first')); $through->end('first'); $through->end('ignored'); } /** @test */ - public function writeAfterEndShouldReturnFalse() + public function writeAfterEndShouldReturnFalse(): void { $through = new ThroughStream(); $through->on('data', $this->expectCallableNever()); @@ -182,16 +232,16 @@ public function writeAfterEndShouldReturnFalse() } /** @test */ - public function writeDataWillCloseStreamShouldReturnFalse() + public function writeDataWillCloseStreamShouldReturnFalse(): void { $through = new ThroughStream(); - $through->on('data', array($through, 'close')); + $through->on('data', [$through, 'close']); $this->assertFalse($through->write('foo')); } /** @test */ - public function writeDataToPausedShouldReturnFalse() + public function writeDataToPausedShouldReturnFalse(): void { $through = new ThroughStream(); $through->pause(); @@ -200,7 +250,7 @@ public function writeDataToPausedShouldReturnFalse() } /** @test */ - public function writeDataToResumedShouldReturnTrue() + public function writeDataToResumedShouldReturnTrue(): void { $through = new ThroughStream(); $through->pause(); @@ -210,21 +260,21 @@ public function writeDataToResumedShouldReturnTrue() } /** @test */ - public function itShouldBeReadableByDefault() + public function itShouldBeReadableByDefault(): void { $through = new ThroughStream(); $this->assertTrue($through->isReadable()); } /** @test */ - public function itShouldBeWritableByDefault() + public function itShouldBeWritableByDefault(): void { $through = new ThroughStream(); $this->assertTrue($through->isWritable()); } /** @test */ - public function closeShouldCloseOnce() + public function closeShouldCloseOnce(): void { $through = new ThroughStream(); @@ -237,7 +287,7 @@ public function closeShouldCloseOnce() } /** @test */ - public function doubleCloseShouldCloseOnce() + public function doubleCloseShouldCloseOnce(): void { $through = new ThroughStream(); @@ -251,14 +301,15 @@ public function doubleCloseShouldCloseOnce() } /** @test */ - public function pipeShouldPipeCorrectly() + public function pipeShouldPipeCorrectly(): void { - $output = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $output = $this->createMock(WritableStreamInterface::class); $output->expects($this->any())->method('isWritable')->willReturn(True); $output ->expects($this->once()) ->method('write') ->with('foo'); + assert($output instanceof WritableStreamInterface); $through = new ThroughStream(); $through->pipe($output); diff --git a/tests/UtilTest.php b/tests/UtilTest.php index 3d113ab..6bac04c 100644 --- a/tests/UtilTest.php +++ b/tests/UtilTest.php @@ -2,49 +2,56 @@ namespace React\Tests\Stream; -use React\Stream\WritableResourceStream; -use React\Stream\Util; +use React\EventLoop\LoopInterface; use React\Stream\CompositeStream; +use React\Stream\ReadableStreamInterface; use React\Stream\ThroughStream; +use React\Stream\Util; +use React\Stream\WritableResourceStream; +use React\Stream\WritableStreamInterface; /** * @covers React\Stream\Util */ class UtilTest extends TestCase { - public function testPipeReturnsDestinationStream() + public function testPipeReturnsDestinationStream(): void { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); + assert($readable instanceof ReadableStreamInterface); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); + assert($writable instanceof WritableStreamInterface); $ret = Util::pipe($readable, $writable); $this->assertSame($writable, $ret); } - public function testPipeNonReadableSourceShouldDoNothing() + public function testPipeNonReadableSourceShouldDoNothing(): void { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); $readable ->expects($this->any()) ->method('isReadable') ->willReturn(false); + assert($readable instanceof ReadableStreamInterface); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->never()) ->method('isWritable'); $writable ->expects($this->never()) ->method('end'); + assert($writable instanceof WritableStreamInterface); Util::pipe($readable, $writable); } - public function testPipeIntoNonWritableDestinationShouldPauseSource() + public function testPipeIntoNonWritableDestinationShouldPauseSource(): void { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); $readable ->expects($this->any()) ->method('isReadable') @@ -52,8 +59,9 @@ public function testPipeIntoNonWritableDestinationShouldPauseSource() $readable ->expects($this->once()) ->method('pause'); + assert($readable instanceof ReadableStreamInterface); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->any()) ->method('isWritable') @@ -61,13 +69,14 @@ public function testPipeIntoNonWritableDestinationShouldPauseSource() $writable ->expects($this->never()) ->method('end'); + assert($writable instanceof WritableStreamInterface); Util::pipe($readable, $writable); } - public function testPipeClosingDestPausesSource() + public function testPipeClosingDestPausesSource(): void { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); $readable ->expects($this->any()) ->method('isReadable') @@ -75,6 +84,7 @@ public function testPipeClosingDestPausesSource() $readable ->expects($this->once()) ->method('pause'); + assert($readable instanceof ReadableStreamInterface); $writable = new ThroughStream(); @@ -83,11 +93,11 @@ public function testPipeClosingDestPausesSource() $writable->close(); } - public function testPipeWithEnd() + public function testPipeWithEnd(): void { $readable = new Stub\ReadableStreamStub(); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->any()) ->method('isWritable') @@ -95,17 +105,18 @@ public function testPipeWithEnd() $writable ->expects($this->once()) ->method('end'); + assert($writable instanceof WritableStreamInterface); Util::pipe($readable, $writable); $readable->end(); } - public function testPipeWithoutEnd() + public function testPipeWithoutEnd(): void { $readable = new Stub\ReadableStreamStub(); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->any()) ->method('isWritable') @@ -113,17 +124,18 @@ public function testPipeWithoutEnd() $writable ->expects($this->never()) ->method('end'); + assert($writable instanceof WritableStreamInterface); - Util::pipe($readable, $writable, array('end' => false)); + Util::pipe($readable, $writable, ['end' => false]); $readable->end(); } - public function testPipeWithTooSlowWritableShouldPauseReadable() + public function testPipeWithTooSlowWritableShouldPauseReadable(): void { $readable = new Stub\ReadableStreamStub(); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->any()) ->method('isWritable') @@ -133,6 +145,7 @@ public function testPipeWithTooSlowWritableShouldPauseReadable() ->method('write') ->with('some data') ->will($this->returnValue(false)); + assert($writable instanceof WritableStreamInterface); $readable->pipe($writable); @@ -141,13 +154,13 @@ public function testPipeWithTooSlowWritableShouldPauseReadable() $this->assertTrue($readable->paused); } - public function testPipeWithTooSlowWritableShouldResumeOnDrain() + public function testPipeWithTooSlowWritableShouldResumeOnDrain(): void { $readable = new Stub\ReadableStreamStub(); $onDrain = null; - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable = $this->createMock(WritableStreamInterface::class); $writable ->expects($this->any()) ->method('isWritable') @@ -160,6 +173,7 @@ public function testPipeWithTooSlowWritableShouldResumeOnDrain() $onDrain = $callback; } })); + assert($writable instanceof WritableStreamInterface); $readable->pipe($writable); $readable->pause(); @@ -170,12 +184,15 @@ public function testPipeWithTooSlowWritableShouldResumeOnDrain() $this->assertFalse($readable->paused); } - public function testPipeWithWritableResourceStream() + public function testPipeWithWritableResourceStream(): void { $readable = new Stub\ReadableStreamStub(); $stream = fopen('php://temp', 'r+'); - $loop = $this->createLoopMock(); + assert(is_resource($stream)); + + $loop = $this->createMock(LoopInterface::class); + assert($loop instanceof LoopInterface); $buffer = new WritableResourceStream($stream, $loop); $readable->pipe($buffer); @@ -188,7 +205,7 @@ public function testPipeWithWritableResourceStream() $this->assertSame('hello, I am some random data', stream_get_contents($stream)); } - public function testPipeSetsUpListeners() + public function testPipeSetsUpListeners(): void { $source = new ThroughStream(); $dest = new ThroughStream(); @@ -204,7 +221,7 @@ public function testPipeSetsUpListeners() $this->assertCount(1, $dest->listeners('drain')); } - public function testPipeClosingSourceRemovesListeners() + public function testPipeClosingSourceRemovesListeners(): void { $source = new ThroughStream(); $dest = new ThroughStream(); @@ -218,7 +235,7 @@ public function testPipeClosingSourceRemovesListeners() $this->assertCount(0, $dest->listeners('drain')); } - public function testPipeClosingDestRemovesListeners() + public function testPipeClosingDestRemovesListeners(): void { $source = new ThroughStream(); $dest = new ThroughStream(); @@ -232,12 +249,16 @@ public function testPipeClosingDestRemovesListeners() $this->assertCount(0, $dest->listeners('drain')); } - public function testPipeDuplexIntoSelfEndsOnEnd() + public function testPipeDuplexIntoSelfEndsOnEnd(): void { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable = $this->createMock(ReadableStreamInterface::class); $readable->expects($this->any())->method('isReadable')->willReturn(true); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + assert($readable instanceof ReadableStreamInterface); + + $writable = $this->createMock(WritableStreamInterface::class); $writable->expects($this->any())->method('isWritable')->willReturn(true); + assert($writable instanceof WritableStreamInterface); + $duplex = new CompositeStream($readable, $writable); Util::pipe($duplex, $duplex); @@ -248,26 +269,16 @@ public function testPipeDuplexIntoSelfEndsOnEnd() } /** @test */ - public function forwardEventsShouldSetupForwards() + public function forwardEventsShouldSetupForwards(): void { $source = new ThroughStream(); $target = new ThroughStream(); - Util::forwardEvents($source, $target, array('data')); + Util::forwardEvents($source, $target, ['data']); $target->on('data', $this->expectCallableOnce()); $target->on('foo', $this->expectCallableNever()); - $source->emit('data', array('hello')); - $source->emit('foo', array('bar')); - } - - private function createLoopMock() - { - return $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - } - - private function notEqualTo($value) - { - return new \PHPUnit_Framework_Constraint_Not($value); + $source->emit('data', ['hello']); + $source->emit('foo', ['bar']); } } diff --git a/tests/WritableStreamResourceTest.php b/tests/WritableResourceStreamTest.php similarity index 71% rename from tests/WritableStreamResourceTest.php rename to tests/WritableResourceStreamTest.php index 5f0400e..bc4fc42 100644 --- a/tests/WritableStreamResourceTest.php +++ b/tests/WritableResourceStreamTest.php @@ -2,60 +2,115 @@ namespace React\Tests\Stream; -use Clue\StreamFilter as Filter; +use PHPUnit\Framework\MockObject\MockObject; +use React\EventLoop\LoopInterface; use React\Stream\WritableResourceStream; +use function Clue\StreamFilter\append as filter_append; class WritableResourceStreamTest extends TestCase { /** * @covers React\Stream\WritableResourceStream::__construct + * @doesNotPerformAssertions */ - public function testConstructor() + public function testConstructor(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); + new WritableResourceStream($stream, $loop); + } + + public function testConstructWithoutLoopAssignsLoopAutomatically(): void + { + $resource = fopen('php://temp', 'r+'); + assert(is_resource($resource)); + + $stream = new WritableResourceStream($resource); + + $ref = new \ReflectionProperty($stream, 'loop'); + $ref->setAccessible(true); + $loop = $ref->getValue($stream); + + $this->assertInstanceOf('React\EventLoop\LoopInterface', $loop); + } + + /** + * @covers React\Stream\WritableResourceStream::__construct + * @doesNotPerformAssertions + */ + public function testConstructorWithExcessiveMode(): void + { + // excessive flags are ignored for temp streams, so we have to use a file stream + $name = (string) tempnam(sys_get_temp_dir(), 'test'); + $stream = fopen($name, 'w+eANYTHING'); + assert(is_resource($stream)); + unlink($name); + + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop); - $buffer->on('error', $this->expectCallableNever()); + $buffer->close(); } /** * @covers React\Stream\WritableResourceStream::__construct - * @expectedException InvalidArgumentException */ - public function testConstructorThrowsIfNotAValidStreamResource() + public function testConstructorThrowsIfNotAValidStreamResource(): void { $stream = null; $loop = $this->createLoopMock(); - new WritableResourceStream($stream, $loop); + $this->expectException(\InvalidArgumentException::class); + new WritableResourceStream($stream, $loop); // @phpstan-ignore-line } /** * @covers React\Stream\WritableResourceStream::__construct - * @expectedException InvalidArgumentException */ - public function testConstructorThrowsExceptionOnReadOnlyStream() + public function testConstructorThrowsExceptionOnReadOnlyStream(): void { $stream = fopen('php://temp', 'r'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); + $this->expectException(\InvalidArgumentException::class); new WritableResourceStream($stream, $loop); } /** * @covers React\Stream\WritableResourceStream::__construct */ - public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking() + public function testConstructorThrowsExceptionOnReadOnlyStreamWithExcessiveMode(): void + { + // excessive flags are ignored for temp streams, so we have to use a file stream + $name = (string) tempnam(sys_get_temp_dir(), 'test'); + $stream = fopen($name, 'reANYTHING'); + assert(is_resource($stream)); + unlink($name); + + $loop = $this->createLoopMock(); + $this->expectException(\InvalidArgumentException::class); + new WritableResourceStream($stream, $loop); + } + + /** + * @covers React\Stream\WritableResourceStream::__construct + */ + public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking(): void { if (!in_array('blocking', stream_get_wrappers())) { stream_wrapper_register('blocking', 'React\Tests\Stream\EnforceBlockingWrapper'); } $stream = fopen('blocking://test', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); - $this->setExpectedException('RuntimeException'); + $this->expectException(\RuntimeException::class); new WritableResourceStream($stream, $loop); } @@ -63,9 +118,11 @@ public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking( * @covers React\Stream\WritableResourceStream::write * @covers React\Stream\WritableResourceStream::handleWrite */ - public function testWrite() + public function testWrite(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createWriteableLoopMock(); $buffer = new WritableResourceStream($stream, $loop); @@ -79,9 +136,11 @@ public function testWrite() /** * @covers React\Stream\WritableResourceStream::write */ - public function testWriteWithDataDoesAddResourceToLoop() + public function testWriteWithDataDoesAddResourceToLoop(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $loop->expects($this->once())->method('addWriteStream')->with($this->equalTo($stream)); @@ -94,9 +153,11 @@ public function testWriteWithDataDoesAddResourceToLoop() * @covers React\Stream\WritableResourceStream::write * @covers React\Stream\WritableResourceStream::handleWrite */ - public function testEmptyWriteDoesNotAddToLoop() + public function testEmptyWriteDoesNotAddToLoop(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $loop->expects($this->never())->method('addWriteStream'); @@ -110,26 +171,39 @@ public function testEmptyWriteDoesNotAddToLoop() * @covers React\Stream\WritableResourceStream::write * @covers React\Stream\WritableResourceStream::handleWrite */ - public function testWriteReturnsFalseWhenWritableResourceStreamIsFull() + public function testWriteReturnsFalseWhenWritableResourceStreamIsFull(): void { $stream = fopen('php://temp', 'r+'); - $loop = $this->createWriteableLoopMock(); - $loop->preventWrites = true; + assert(is_resource($stream)); + + $preventWrites = true; + $loop = $this->createLoopMock(); + $loop + ->expects($this->any()) + ->method('addWriteStream') + ->will($this->returnCallback(function ($stream, $listener) use (&$preventWrites) { + /** @var bool $preventWrites */ + if (!$preventWrites) { + call_user_func($listener, $stream); + } + })); $buffer = new WritableResourceStream($stream, $loop, 4); $buffer->on('error', $this->expectCallableNever()); $this->assertTrue($buffer->write("foo")); - $loop->preventWrites = false; + $preventWrites = false; $this->assertFalse($buffer->write("bar\n")); } /** * @covers React\Stream\WritableResourceStream::write */ - public function testWriteReturnsFalseWhenWritableResourceStreamIsExactlyFull() + public function testWriteReturnsFalseWhenWritableResourceStreamIsExactlyFull(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop, 3); @@ -141,9 +215,11 @@ public function testWriteReturnsFalseWhenWritableResourceStreamIsExactlyFull() * @covers React\Stream\WritableResourceStream::write * @covers React\Stream\WritableResourceStream::handleWrite */ - public function testWriteDetectsWhenOtherSideIsClosed() + public function testWriteDetectsWhenOtherSideIsClosed(): void { - list($a, $b) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + assert(is_array($pair)); + [$a, $b] = $pair; $loop = $this->createWriteableLoopMock(); @@ -159,9 +235,11 @@ public function testWriteDetectsWhenOtherSideIsClosed() * @covers React\Stream\WritableResourceStream::write * @covers React\Stream\WritableResourceStream::handleWrite */ - public function testEmitsDrainAfterWriteWhichExceedsBuffer() + public function testEmitsDrainAfterWriteWhichExceedsBuffer(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop, 2); @@ -176,9 +254,11 @@ public function testEmitsDrainAfterWriteWhichExceedsBuffer() * @covers React\Stream\WritableResourceStream::write * @covers React\Stream\WritableResourceStream::handleWrite */ - public function testWriteInDrain() + public function testWriteInDrain(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop, 2); @@ -200,9 +280,11 @@ public function testWriteInDrain() * @covers React\Stream\WritableResourceStream::write * @covers React\Stream\WritableResourceStream::handleWrite */ - public function testDrainAfterWrite() + public function testDrainAfterWrite(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop, 2); @@ -216,9 +298,11 @@ public function testDrainAfterWrite() /** * @covers React\Stream\WritableResourceStream::handleWrite */ - public function testDrainAfterWriteWillRemoveResourceFromLoopWithoutClosing() + public function testDrainAfterWriteWillRemoveResourceFromLoopWithoutClosing(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $loop->expects($this->once())->method('removeWriteStream')->with($stream); @@ -235,9 +319,11 @@ public function testDrainAfterWriteWillRemoveResourceFromLoopWithoutClosing() /** * @covers React\Stream\WritableResourceStream::handleWrite */ - public function testClosingDuringDrainAfterWriteWillRemoveResourceFromLoopOnceAndClose() + public function testClosingDuringDrainAfterWriteWillRemoveResourceFromLoopOnceAndClose(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $loop->expects($this->once())->method('removeWriteStream')->with($stream); @@ -256,9 +342,11 @@ public function testClosingDuringDrainAfterWriteWillRemoveResourceFromLoopOnceAn /** * @covers React\Stream\WritableResourceStream::end */ - public function testEndWithoutDataClosesImmediatelyIfWritableResourceStreamIsEmpty() + public function testEndWithoutDataClosesImmediatelyIfWritableResourceStreamIsEmpty(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop); @@ -273,9 +361,11 @@ public function testEndWithoutDataClosesImmediatelyIfWritableResourceStreamIsEmp /** * @covers React\Stream\WritableResourceStream::end */ - public function testEndWithoutDataDoesNotCloseIfWritableResourceStreamIsFull() + public function testEndWithoutDataDoesNotCloseIfWritableResourceStreamIsFull(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop); @@ -292,9 +382,11 @@ public function testEndWithoutDataDoesNotCloseIfWritableResourceStreamIsFull() /** * @covers React\Stream\WritableResourceStream::end */ - public function testEndWithDataClosesImmediatelyIfWritableResourceStreamFlushes() + public function testEndWithDataClosesImmediatelyIfWritableResourceStreamFlushes(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $filterBuffer = ''; $loop = $this->createLoopMock(); @@ -302,7 +394,7 @@ public function testEndWithDataClosesImmediatelyIfWritableResourceStreamFlushes( $buffer->on('error', $this->expectCallableNever()); $buffer->on('close', $this->expectCallableOnce()); - Filter\append($stream, function ($chunk) use (&$filterBuffer) { + filter_append($stream, function ($chunk) use (&$filterBuffer) { $filterBuffer .= $chunk; return $chunk; }); @@ -318,9 +410,11 @@ public function testEndWithDataClosesImmediatelyIfWritableResourceStreamFlushes( /** * @covers React\Stream\WritableResourceStream::end */ - public function testEndWithDataDoesNotCloseImmediatelyIfWritableResourceStreamIsFull() + public function testEndWithDataDoesNotCloseImmediatelyIfWritableResourceStreamIsFull(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop); @@ -341,9 +435,11 @@ public function testEndWithDataDoesNotCloseImmediatelyIfWritableResourceStreamIs * @covers React\Stream\WritableResourceStream::isWritable * @covers React\Stream\WritableResourceStream::close */ - public function testClose() + public function testClose(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop); @@ -354,15 +450,17 @@ public function testClose() $buffer->close(); $this->assertFalse($buffer->isWritable()); - $this->assertEquals(array(), $buffer->listeners('close')); + $this->assertEquals([], $buffer->listeners('close')); } /** * @covers React\Stream\WritableResourceStream::close */ - public function testClosingAfterWriteRemovesStreamFromLoop() + public function testClosingAfterWriteRemovesStreamFromLoop(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop); @@ -375,9 +473,11 @@ public function testClosingAfterWriteRemovesStreamFromLoop() /** * @covers React\Stream\WritableResourceStream::close */ - public function testClosingWithoutWritingDoesNotRemoveStreamFromLoop() + public function testClosingWithoutWritingDoesNotRemoveStreamFromLoop(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop); @@ -389,9 +489,11 @@ public function testClosingWithoutWritingDoesNotRemoveStreamFromLoop() /** * @covers React\Stream\WritableResourceStream::close */ - public function testDoubleCloseWillEmitOnlyOnce() + public function testDoubleCloseWillEmitOnlyOnce(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop); @@ -405,15 +507,17 @@ public function testDoubleCloseWillEmitOnlyOnce() * @covers React\Stream\WritableResourceStream::write * @covers React\Stream\WritableResourceStream::close */ - public function testWritingToClosedWritableResourceStreamShouldNotWriteToStream() + public function testWritingToClosedWritableResourceStreamShouldNotWriteToStream(): void { $stream = fopen('php://temp', 'r+'); + assert(is_resource($stream)); + $filterBuffer = ''; $loop = $this->createLoopMock(); $buffer = new WritableResourceStream($stream, $loop); - Filter\append($stream, function ($chunk) use (&$filterBuffer) { + filter_append($stream, function ($chunk) use (&$filterBuffer) { $filterBuffer .= $chunk; return $chunk; }); @@ -426,40 +530,16 @@ public function testWritingToClosedWritableResourceStreamShouldNotWriteToStream( $this->assertSame('', $filterBuffer); } - /** - * @covers React\Stream\WritableResourceStream::handleWrite - */ - public function testErrorWhenStreamResourceIsInvalid() - { - $stream = fopen('php://temp', 'r+'); - $loop = $this->createWriteableLoopMock(); - - $error = null; - - $buffer = new WritableResourceStream($stream, $loop); - $buffer->on('error', function ($message) use (&$error) { - $error = $message; - }); - - // invalidate stream resource - fclose($stream); - - $buffer->write('Attempting to write to bad stream'); - - $this->assertInstanceOf('Exception', $error); - - // the error messages differ between PHP versions, let's just check substrings - $this->assertContains('Unable to write to stream: ', $error->getMessage()); - $this->assertContains(' not a valid stream resource', $error->getMessage(), '', true); - } - - public function testWritingToClosedStream() + public function testWritingToClosedStream(): void { if ('Darwin' === PHP_OS) { $this->markTestSkipped('OS X issue with shutting down pair for writing'); } - list($a, $b) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + assert(is_array($pair)); + [$a, $b] = $pair; + $loop = $this->createLoopMock(); $error = null; @@ -476,28 +556,27 @@ public function testWritingToClosedStream() $buffer->write('bar'); $buffer->handleWrite(); - $this->assertInstanceOf('Exception', $error); - $this->assertSame('Unable to write to stream: fwrite(): send of 3 bytes failed with errno=32 Broken pipe', $error->getMessage()); + $this->assertInstanceOf(\Exception::class, $error); + $this->assertEqualsIgnoringCase('Unable to write to stream: fwrite(): send of 3 bytes failed with errno=32 Broken pipe', $error->getMessage()); } - private function createWriteableLoopMock() + private function createWriteableLoopMock(): LoopInterface { $loop = $this->createLoopMock(); - $loop->preventWrites = false; $loop ->expects($this->any()) ->method('addWriteStream') - ->will($this->returnCallback(function ($stream, $listener) use ($loop) { - if (!$loop->preventWrites) { - call_user_func($listener, $stream); - } + ->will($this->returnCallback(function ($stream, $listener) { + call_user_func($listener, $stream); })); return $loop; } - private function createLoopMock() + /** @return MockObject&LoopInterface */ + private function createLoopMock(): MockObject { - return $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + /** @var MockObject&LoopInterface */ + return $this->createMock(LoopInterface::class); } } diff --git a/tests/bootstrap.php b/tests/bootstrap.php deleted file mode 100644 index 754ef06..0000000 --- a/tests/bootstrap.php +++ /dev/null @@ -1,7 +0,0 @@ -addPsr4('React\\Tests\\Stream\\', __DIR__);