8000 Add AMQP interop based driver. by makasim · Pull Request #158 · yiisoft/yii2-queue · GitHub
[go: up one dir, main page]

Skip to content

Add AMQP interop based driver. #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Dec 5, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address review comments.
  • Loading branch information
makasim committed Nov 29, 2017
commit a2d3c9dab024bfb86989655ef35cd0e479230b73
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Yii2 Queue Extension Change Log
2.0.2 under development
-----------------------

- Enh: Add Amqp Interop driver (makasim)
- Enh: #158 Add Amqp Interop driver (makasim)

## 2.0.1, November 13, 2017

Expand Down
2 changes: 1 addition & 1 deletion docs/guide/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Queue Drivers
* [Db](driver-db.md)
* [Redis](driver-redis.md)
* [RabbitMQ](driver-amqp.md)
* [AMQP Interop)](driver-amqp-interop.md)
* [AMQP Interop](driver-amqp-interop.md)
* [Beanstalk](driver-beanstalk.md)
* [Gearman](driver-gearman.md)

Expand Down
4 changes: 2 additions & 2 deletions docs/guide/driver-amqp-interop.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Advantages:

* It would work with any amqp interop compatible transports, such as

* [enqueue/amqp-ext](https://github.com/php-enqueue/amqp-ext) based on [php amqp extension](https://github.com/pdezwart/php-amqp)
* [enqueue/amqp-ext](https://github.com/php-enqueue/amqp-ext) based on [PHP amqp extension](https://github.com/pdezwart/php-amqp)
* [enqueue/amqp-lib](https://github.com/php-enqueue/amqp-lib) based on [php-amqplib/php-amqplib](https://github.com/php-amqplib/php-amqplib)
* [enqueue/amqp-bunny](https://github.com/php-enqueue/amqp-bunny) based on [bunny](https://github.com/jakubkulhan/bunny)

Expand All @@ -35,7 +35,7 @@ return [
'user' => 'guest',
'password' => 'guest',
'queueName' => 'queue',
'driver' => 'lib',
'driver' => yii\queue\amqp_interop\Queue::ENQUEUE_AMQP_LIB,

// or
'dsn' => 'amqp://guest:guest@localhost:5672/%2F',
Expand Down
1 change: 1 addition & 0 deletions src/drivers/amqp_interop/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* Manages application amqp-queue.
*
* @author Maksym Kotliar <kotlyar.maksim@gmail.com>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@since 2.0.2 (or which version is it going to be released it, @zhuravljov ?)

* @since 2.0.2
*/
class Command extends CliCommand
{
Expand Down
122 changes: 90 additions & 32 deletions src/drivers/amqp_interop/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* Amqp Queue
*
* @author Maksym Kotliar <kotlyar.maksim@gmail.com>
* @since 2.0.2
*/
class Queue extends CliQueue
{
Expand All @@ -36,143 +37,200 @@ class Queue extends CliQueue
const H_DELAY = 'yii-delay';
const H_PRIORITY = 'yii-priority';

const ENQUEUE_AMQP_LIB = 'enqueue/amqp-lib';
const ENQUEUE_AMQP_EXT = 'enqueue/amqp-ext';
const ENQUEUE_AMQP_BUNNY = 'enqueue/amqp-bunny';

/**
* @var string like amqp:, amqps:, amqps://user:pass@localhost:1000/vhost
* The connection to the borker could be configured as an array of options
* or as a DSN string like amqp:, amqps:, amqps://user:pass@localhost:1000/vhost
*
* @var string
*/
public $dsn = null;
public $dsn;

/**
* The message queue broker's host
*
* @var string|null
*/
public $host = null;
public $host;

/**
* The message queue broker's port
*
* @var string|null
*/
public $port = null;
public $port;

/**
* This is RabbitMQ user which is used to login on the broker
*
* @var string|null
*/
public $user = null;
public $user;

/**
* This is RabbitMQ password which is used to login on the broker
*
* @var string|null
*/
public $password = null;
public $password;

/**
* Virtual hosts provide logical grouping and separation of resources.
*
* @var string|null
*/
public $vhost = null;
public $vhost;

/**
* In seconds
* The time PHP socket waits for an information while reading. In seconds
*
* @var float|null
*/
public $readTimeout = null;
public $readTimeout;

/**
* In seconds
* The time PHP socket waits for an information while witting. In seconds
*
* @var float|null
*/
public $writeTimeout = null;
public $writeTimeout;

/**
* In seconds
* The time RabbitMQ keeps the connection on idle. In seconds
*
* @var float|null
*/
public $connectionTimeout = null;
public $connectionTimeout;

/**
* In seconds
* The periods of time PHP pings the broker in order to prolong the connection timeout. In seconds
*
* @var float|null
*/
public $heartbeat = null;
public $heartbeat;

/**
* PHP uses one shared connection if set true
*
* @var bool|null
*/
public $persisted = null;
public $persisted;

/**
* The connection will be established as later as possible if set true
*
* @var bool|null
*/
public $lazy = null;
public $lazy;

/**
* If false prefetch_count option applied separately to each new consumer on the channel
* If true prefetch_count option shared across all consumers on the channel
*
* @var bool|null
*/
public $qosGlobal = null;
public $qosGlobal;

/**
* Defines number of message pre-fetched in advance on a channel basis.
*
* @var int|null
*/
public $qosPrefetchSize = null;
public $qosPrefetchSize;

/**
* Defines number of message pre-fetched in advance per consumer.
*
* @var int|null
*/
public $qosPrefetchCount = null;
public $qosPrefetchCount;

/**
* Defines whether secure connection should be used or not
*
* @var bool|null
*/
public $sslOn = null;
public $sslOn;

/**
* Require verification of SSL certificate used.
*
* @var bool|null
*/
public $sslVerify = null;
public $sslVerify;

/**
* Location of Certificate Authority file on local filesystem which should be used with the verify_peer context option to authenticate the identity of the remote peer.
*
* @var string|null
*/
public $sslCacert = null;
public $sslCacert;

/**
* Path to local certificate file on filesystem.
*
* @var string|null
*/
public $sslCert = null;
public $sslCert;

/**
* Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key.
*
* @var string|null
*/
public $sslKey = null;
public $sslKey;

/**
* The queue used to consume messages from
*
* @var string
*/
public $queueName = 'interop_queue';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

phpdoc should describe what is the property for.


/**
* The exchange used to publish messages to
*
* @var string
*/
public $exchangeName = 'exchange';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

phpdoc should describe what is the property for.


/**
* Defines the amqp interop transport being internally used. Currently supports lib, ext and bunny values
*
* @var string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

phpdoc should describe what is the property for.

*/
public $driver = 'lib';
public $driver = self::ENQUEUE_AMQP_LIB;

/**
* This property should be an integer indicating the maximum priority the queue should support. Default is 10
*
* @var int
*/
public $maxPriority = 10;

/**
* The property contains a command class which used in cli.
*
* @var string command class name
*/
public $commandClass = Command::class;

/**
* Amqp interop context
*
* @var AmqpContext
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

phpdoc should describe what is the property for.

*/
protected $context;

/**
* List of supported amqp interop drivers
*
* @var string[]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

phpdoc should describe what is the property for.

*/
protected $supportedDrivers = ['lib', 'ext', 'bunny'];
protected $supportedDrivers = [self::ENQUEUE_AMQP_LIB, self::ENQUEUE_AMQP_EXT, self::ENQUEUE_AMQP_BUNNY];

/**
* @inheritdoc
Expand Down Expand Up @@ -283,21 +341,21 @@ protected function open()
}

switch ($this->driver) {
case 'lib':
case self::ENQUEUE_AMQP_LIB:
$connectionClass = AmqpLibConnectionFactory::class;

break;
case 'ext':
case self::ENQUEUE_AMQP_EXT:
$connectionClass = AmqpExtConnectionFactory::class;

break;
case 'bunny':
case self::ENQUEUE_AMQP_BUNNY:
$connectionClass = AmqpBunnyConnectionFactory::class;

break;

default:
throw new \LogicException(sprintf('The given driver "%s" is not supported. Supported are "%s"', $this->driver, implode('", "', $this->supportedDrivers)));
throw new \LogicException(sprintf('The given driver "%s" is not supported. Drivers supported are "%s"', $this->driver, implode('", "', $this->supportedDrivers)));
}

$config = [
Expand Down Expand Up @@ -341,7 +399,7 @@ public function setupBroker()
{
$queue = $this->context->createQueue($this->queueName);
$queue->addFlag(AmqpQueue::FLAG_DURABLE);
$queue->setArguments(['x-max-priority' => 4]);
$queue->setArguments(['x-max-priority' => $this->maxPriority]);
$this->context->declareQueue($queue);

$topic = $this->context->createTopic($this->exchangeName);
Expand Down
8 changes: 4 additions & 4 deletions tests/drivers/amqp_interop/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ protected function getQueue()

public function testRun()
{
// Not supported
$this->markTestSkipped('Not supported');
}

public function testStatus()
{
// Not supported
$this->markTestSkipped('Not supported');
}

public function testLater()
{
// Not supported
$this->markTestSkipped('Not supported');
}

public function testRetry()
{
// Limited support
$this->markTestSkipped('Not supported');
}
}
0