8000 Add AMQP interop based driver. by makasim · Pull Request #158 · yiisoft/yii2-queue · GitHub
[go: up one dir, main page]

Skip to content

Add AMQP interop based driver. #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Dec 5, 2017
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove H_
  • Loading branch information
makasim committed Nov 29, 2017
commit f248f7372b209a8fdf015c7ee43eccef8666eaf9
24 changes: 12 additions & 12 deletions src/drivers/amqp_interop/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
*/
class Queue extends CliQueue
{
const H_ATTEMPT = 'yii-attempt';
const H_TTR = 'yii-ttr';
const H_DELAY = 'yii-delay';
const H_PRIORITY = 'yii-priority';
const ATTEMPT = 'yii-attempt';
const TTR = 'yii-ttr';
const DELAY = 'yii-delay';
const PRIORITY = 'yii-priority';

const ENQUEUE_AMQP_LIB = 'enqueue/amqp-lib';
const ENQUEUE_AMQP_EXT = 'enqueue/amqp-ext';
Expand Down Expand Up @@ -262,8 +262,8 @@ public function listen()
return true;
}

$ttr = $message->getProperty(self::H_TTR);
$attempt = $message->getProperty(self::H_ATTEMPT, 1);
$ttr = $message->getProperty(self::TTR);
$attempt = $message->getProperty(self::ATTEMPT, 1);

if ($this->handleMessage($message->getMessageId(), $message->getBody(), $ttr, $attempt)) {
$consumer->acknowledge($message);
Expand Down Expand Up @@ -303,18 +303,18 @@ protected function pushMessage($payload, $ttr, $delay, $priority)
$message->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT);
$message->setMessageId(uniqid('', true));
$message->setTimestamp(time());
$message->setProperty(self::H_ATTEMPT, 1);
$message->setProperty(self::H_TTR, $ttr);
$message->setProperty(self::ATTEMPT, 1);
$message->setProperty(self::TTR, $ttr);

$producer = $this->context->createProducer();

if ($delay) {
$message->setProperty(self::H_DELAY, $delay);
$message->setProperty(self::DELAY, $delay);
$producer->setDeliveryDelay($delay * 1000);
}

if ($priority) {
$message->setProperty(self::H_PRIORITY, $priority);
$message->setProperty(self::PRIORITY, $priority);
$producer->setPriority($priority);
}

Expand Down Expand Up @@ -427,11 +427,11 @@ protected function close()
*/
protected function redeliver(AmqpMessage $message)
{
$attempt = $message->getProperty(self::H_ATTEMPT, 1);
$attempt = $message->getProperty(self::ATTEMPT, 1);

$newMessage = $this->context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders());
$newMessage->setDeliveryMode($message->getDeliveryMode());
$newMessage->setProperty(self::H_ATTEMPT, ++$attempt);
$newMessage->setProperty(self::ATTEMPT, ++$attempt);

$this->context->createProducer()->send(
$this->context->createQueue($this->queueName),
Expand Down
0