-
-
Notifications
You must be signed in to change notification settings - Fork 291
Add AMQP interop based driver. #158
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
ba7a8b4
1ae568e
e8374f0
5739f4d
f7c2a19
a2d3c9d
1a66da3
f248f73
7040946
d456b75
8110ee8
9cc7a49
0c35b9e
0534ac4
32d3c76
0b20354
54e7299
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ | |
* Amqp Queue | ||
* | ||
* @author Maksym Kotliar <kotlyar.maksim@gmail.com> | ||
* @since 2.0.2 | ||
*/ | ||
class Queue extends CliQueue | ||
{ | ||
|
@@ -36,143 +37,200 @@ class Queue extends CliQueue | |
const H_DELAY = 'yii-delay'; | ||
const H_PRIORITY = 'yii-priority'; | ||
|
||
const ENQUEUE_AMQP_LIB = 'enqueue/amqp-lib'; | ||
const ENQUEUE_AMQP_EXT = 'enqueue/amqp-ext'; | ||
const ENQUEUE_AMQP_BUNNY = 'enqueue/amqp-bunny'; | ||
|
||
/** | ||
* @var string like amqp:, amqps:, amqps://user:pass@localhost:1000/vhost | ||
* The connection to the borker could be configured as an array of options | ||
* or as a DSN string like amqp:, amqps:, amqps://user:pass@localhost:1000/vhost | ||
* | ||
* @var string | ||
*/ | ||
public $dsn = null; | ||
public $dsn; | ||
|
||
/** | ||
* The message queue broker's host | ||
* | ||
* @var string|null | ||
*/ | ||
public $host = null; | ||
public $host; | ||
|
||
/** | ||
* The message queue broker's port | ||
* | ||
* @var string|null | ||
*/ | ||
public $port = null; | ||
public $port; | ||
|
||
/** | ||
* This is RabbitMQ user which is used to login on the broker | ||
* | ||
* @var string|null | ||
*/ | ||
public $user = null; | ||
public $user; | ||
|
||
/** | ||
* This is RabbitMQ password which is used to login on the broker | ||
* | ||
* @var string|null | ||
*/ | ||
public $password = null; | ||
public $password; | ||
|
||
/** | ||
* Virtual hosts provide logical grouping and separation of resources. | ||
* | ||
* @var string|null | ||
*/ | ||
public $vhost = null; | ||
public $vhost; | ||
|
||
/** | ||
* In seconds | ||
* The time PHP socket waits for an information while reading. In seconds | ||
* | ||
* @var float|null | ||
*/ | ||
public $readTimeout = null; | ||
public $readTimeout; | ||
|
||
/** | ||
* In seconds | ||
* The time PHP socket waits for an information while witting. In seconds | ||
* | ||
* @var float|null | ||
*/ | ||
public $writeTimeout = null; | ||
public $writeTimeout; | ||
|
||
/** | ||
* In seconds | ||
* The time RabbitMQ keeps the connection on idle. In seconds | ||
* | ||
* @var float|null | ||
*/ | ||
public $connectionTimeout = null; | ||
public $connectionTimeout; | ||
|
||
/** | ||
* In seconds | ||
* The periods of time PHP pings the broker in order to prolong the connection timeout. In seconds | ||
* | ||
* @var float|null | ||
*/ | ||
public $heartbeat = null; | ||
public $heartbeat; | ||
|
||
/** | ||
* PHP uses one shared connection if set true | ||
* | ||
* @var bool|null | ||
*/ | ||
public $persisted = null; | ||
public $persisted; | ||
|
||
/** | ||
* The connection will be established as later as possible if set true | ||
* | ||
* @var bool|null | ||
*/ | ||
public $lazy = null; | ||
public $lazy; | ||
|
||
/** | ||
* If false prefetch_count option applied separately to each new consumer on the channel | ||
* If true prefetch_count option shared across all consumers on the channel | ||
* | ||
* @var bool|null | ||
*/ | ||
public $qosGlobal = null; | ||
public $qosGlobal; | ||
|
||
/** | ||
* Defines number of message pre-fetched in advance on a channel basis. | ||
* | ||
* @var int|null | ||
*/ | ||
public $qosPrefetchSize = null; | ||
public $qosPrefetchSize; | ||
|
||
/** | ||
* Defines number of message pre-fetched in advance per consumer. | ||
* | ||
* @var int|null | ||
*/ | ||
public $qosPrefetchCount = null; | ||
public $qosPrefetchCount; | ||
|
||
/** | ||
* Defines whether secure connection should be used or not | ||
* | ||
* @var bool|null | ||
*/ | ||
public $sslOn = null; | ||
public $sslOn; | ||
|
||
/** | ||
* Require verification of SSL certificate used. | ||
* | ||
* @var bool|null | ||
*/ | ||
public $sslVerify = null; | ||
public $sslVerify; | ||
|
||
/** | ||
* Location of Certificate Authority file on local filesystem which should be used with the verify_peer context option to authenticate the identity of the remote peer. | ||
* | ||
* @var string|null | ||
*/ | ||
public $sslCacert = null; | ||
public $sslCacert; | ||
|
||
/** | ||
* Path to local certificate file on filesystem. | ||
* | ||
* @var string|null | ||
*/ | ||
public $sslCert = null; | ||
public $sslCert; | ||
|
||
/** | ||
* Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key. | ||
* | ||
* @var string|null | ||
*/ | ||
public $sslKey = null; | ||
public $sslKey; | ||
|
||
/** | ||
* The queue used to consume messages from | ||
* | ||
* @var string | ||
*/ | ||
public $queueName = 'interop_queue'; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. phpdoc should describe what is the property for. |
||
|
||
/** | ||
* The exchange used to publish messages to | ||
* | ||
* @var string | ||
*/ | ||
public $exchangeName = 'exchange'; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. phpdoc should describe what is the property for. |
||
|
||
/** | ||
* Defines the amqp interop transport being internally used. Currently supports lib, ext and bunny values | ||
* | ||
* @var string | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. phpdoc should describe what is the property for. |
||
*/ | ||
public $driver = 'lib'; | ||
public $driver = self::ENQUEUE_AMQP_LIB; | ||
|
||
/** | ||
* This property should be an integer indicating the maximum priority the queue should support. Default is 10 | ||
* | ||
* @var int | ||
*/ | ||
public $maxPriority = 10; | ||
|
||
/** | ||
* The property contains a command class which used in cli. | ||
* | ||
* @var string command class name | ||
*/ | ||
public $commandClass = Command::class; | ||
|
||
/** | ||
* Amqp interop context | ||
* | ||
* @var AmqpContext | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. phpdoc should describe what is the property for. |
||
*/ | ||
protected $context; | ||
|
||
/** | ||
* List of supported amqp interop drivers | ||
* | ||
* @var string[] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. phpdoc should describe what is the property for. |
||
*/ | ||
protected $supportedDrivers = ['lib', 'ext', 'bunny']; | ||
protected $supportedDrivers = [self::ENQUEUE_AMQP_LIB, self::ENQUEUE_AMQP_EXT, self::ENQUEUE_AMQP_BUNNY]; | ||
|
||
/** | ||
* @inheritdoc | ||
|
@@ -283,21 +341,21 @@ protected function open() | |
} | ||
|
||
switch ($this->driver) { | ||
case 'lib': | ||
case self::ENQUEUE_AMQP_LIB: | ||
$connectionClass = AmqpLibConnectionFactory::class; | ||
|
||
break; | ||
case 'ext': | ||
case self::ENQUEUE_AMQP_EXT: | ||
$connectionClass = AmqpExtConnectionFactory::class; | ||
|
||
break; | ||
case 'bunny': | ||
case self::ENQUEUE_AMQP_BUNNY: | ||
$connectionClass = AmqpBunnyConnectionFactory::class; | ||
|
||
break; | ||
|
||
default: | ||
throw new \LogicException(sprintf('The given driver "%s" is not supported. Supported are "%s"', $this->driver, implode('", "', $this->supportedDrivers))); | ||
throw new \LogicException(sprintf('The given driver "%s" is not supported. Drivers supported are "%s"', $this->driver, implode('", "', $this->supportedDrivers))); | ||
} | ||
|
||
$config = [ | ||
|
@@ -341,7 +399,7 @@ public function setupBroker() | |
{ | ||
$queue = $this->context->createQueue($this->queueName); | ||
$queue->addFlag(AmqpQueue::FLAG_DURABLE); | ||
$queue->setArguments(['x-max-priority' => 4]); | ||
$queue->setArguments(['x-max-priority' => $this->maxPriority]); | ||
$this->context->declareQueue($queue); | ||
|
||
$topic = $this->context->createTopic($this->exchangeName); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@since 2.0.2
(or which version is it going to be released it, @zhuravljov ?)