-
-
Notifications
You must be signed in to change notification settings - Fork 291
Add AMQP interop based driver. #158
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
I'll work on tests and other stuff once we agree in general on this matter. |
@samdark @zhuravljov could you please have a look? |
I don't have enough experience do decide on this one in general so @zhuravljov this one is yours. I'll check code in general. |
src/drivers/amqp/Queue.php
Outdated
|
||
$this->context->createProducer() | ||
|
||
// TODO check values compatibility |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll work on tests and other stuff once we agree in general on this matter.
src/drivers/amqp/Queue.php
Outdated
$this->channel->queue_declare($this->queueName, false, true, false, false); | ||
$this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false); | ||
$this->channel->queue_bind($this->queueName, $this->exchangeName); | ||
if ($this->context) return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add {
and }
please.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, (that's how it was, I kept original styling).
src/drivers/amqp/Queue.php
Outdated
if (!$this->channel) return; | ||
$this->channel->close(); | ||
$this->connection->close(); | ||
if (!$this->context) return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add {
and }
please.
This implementation also supports new features: ttr, attempts, delays, priorities. |
@zhuravljov I was wondering if you have you got time to look into this PR? |
src/drivers/amqp/Queue.php
Outdated
'host' => $this->host, | ||
'port' => $this->port, | ||
'user' => $this->user, | ||
'pass' => $this->password, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure thing
@makasim thank you for contribution, I will look and test soon. |
Added:
|
Would it be easier for you to review it if I, instead of changing existing class, create new ones? |
e5eea13
to
44570a3
Compare
bfe426f
to
ba7a8b4
Compare
A test fail is not related to my changes. |
@samdark @zhuravljov @thiagotalma I addressed all review comments. |
CHANGELOG.md
Outdated
@@ -4,8 +4,7 @@ Yii2 Queue Extension Change Log | |||
2.0.2 under development | |||
----------------------- | |||
|
|||
- no changes in this release. | |||
|
|||
- Enh: Add Amqp Interop driver (makasim) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#158
docs/guide/README.md
Outdated
@@ -18,6 +18,7 @@ Queue Drivers | |||
* [Db](driver-db.md) | |||
* [Redis](driver-redis.md) | |||
* [RabbitMQ](driver-amqp.md) | |||
* [AMQP Interop)](driver-amqp-interop.md) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove extra )
.
docs/guide/driver-amqp-interop.md
Outdated
|
||
* It would work with any amqp interop compatible transports, such as | ||
|
||
* [enqueue/amqp-ext](https://github.com/php-enqueue/amqp-ext) based on [php amqp extension](https://github.com/pdezwart/php-amqp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
php → PHP
/** | ||
* Manages application amqp-queue. | ||
* | ||
* @author Maksym Kotliar <kotlyar.maksim@gmail.com> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@since 2.0.2
(or which version is it going to be released it, @zhuravljov ?)
src/drivers/amqp_interop/Queue.php
Outdated
*/ | ||
class Queue extends CliQueue | ||
{ | ||
const H_ATTEMPT = 'yii-attempt'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is H_
? Do we need it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
H stands for Head.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Head of what?
src/drivers/amqp_interop/Queue.php
Outdated
$connectionClass = AmqpLibConnectionFactory::class; | ||
|
||
break; | ||
case 'ext': |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to class constants.
src/drivers/amqp_interop/Queue.php
Outdated
$connectionClass = AmqpExtConnectionFactory::class; | ||
|
||
break; | ||
case 'bunny': |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to class constants.
src/drivers/amqp_interop/Queue.php
Outdated
break; | ||
|
||
default: | ||
throw new \LogicException(sprintf('The given driver "%s" is not supported. Supported are "%s"', $this->driver, implode('", "', $this->supportedDrivers))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Supported are "%s" → Drivers supported are "%s"
src/drivers/amqp_interop/Queue.php
Outdated
{ | ||
$queue = $this->context->createQueue($this->queueName); | ||
$queue->addFlag(AmqpQueue::FLAG_DURABLE); | ||
$queue->setArguments(['x-max-priority' => 4]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is 4? Why 4?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can set a priority between 0 and 4. Here's the priorities doc
Resource Usage Considerations
If priority queues are desired, we recommend using between 1 and 10. Currently using more priorities will consume more resources (Erlang processes).
There is some in-memory and on-disk cost per priority level per queue. There is also an additional CPU cost, especially when consuming, so you may not wish to create huge numbers of levels.
The message priority field is defined as an unsigned byte, so in practice priorities should be between 0 and 255.
I add an ability to configure it and set the default value to 10 (as suggested by off doc).
Removed: Added an exception for the case where a developer wants to use a higher value. because of
Messages with a priority which is higher than the queue's maximum are treated as if they were published with the maximum priority.
|
||
public function testRun() | ||
{ | ||
// Not supported |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
markTestSkipped()?
@samdark done |
src/drivers/amqp_interop/Queue.php
Outdated
*/ | ||
class Queue extends CliQueue | ||
{ | ||
const H_ATTEMPT = 'yii-attempt'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to remove H_
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Travis build fail is not related to my changes |
@zhuravljov could you please give a hand with the error on Travis? |
@zhuravljov tests failed because of this commit d456b75. While fixing it I realized that it is a good idea to keep setup broker logic and connection one separate. If you mix them up you won't be able to do anything if setup broker fails. For example, the broker has the same queue but with a different argument for |
It is merged. Thank you for big work! |
Party time! 🎉 🎉 🎉 Thank you guys for your patience and thoughtful reviews! |
Advantages:
It would work with any amqp interop compatible transports, such as
Supports priorities
Supports delays
Supports ttr
Supports attempts
Contains new options like: vhost, connection_timeout, qos_prefetch_count and so on.
Supports Secure (SSL) AMQP connections.
Add ability to set DSN like: amqp:, amqps: or amqp://user:pass@localhost:1000/vhost