8000 [PoC] AMQP interop based broker. by makasim · Pull Request #23842 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[PoC] AMQP interop based broker. #23842

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added two new components: AMQP and Worker
  • Loading branch information
lyrixx committed Jul 6, 2017
commit eb89bd8706132c52dd1db49cf39d74c2c8f87b41
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ env:
global:
- MIN_PHP=5.5.9
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/versions/5.6/bin/php
- RABBITMQ_URL=amqp://guest:guest@localhost:5672/

matrix:
include:
Expand All @@ -41,6 +42,7 @@ services:
- memcached
- mongodb
- redis-server
- rabbitmq

before_install:
- |
Expand Down Expand Up @@ -82,6 +84,7 @@ before_install:
echo apc.enable_cli = 1 >> $INI
echo extension = ldap.so >> $INI
echo extension = redis.so >> $INI
echo extension = amqp.so >> $INI
echo extension = memcached.so >> $INI
[[ $PHP = 5.* ]] && echo extension = memcache.so >> $INI
if [[ $PHP = 5.* ]]; then
Expand Down Expand Up @@ -159,6 +162,9 @@ install:

- if [[ ! $skip ]]; then $COMPOSER_UP; fi
- if [[ ! $skip ]]; then ./phpunit install; fi
- |
# setup rabbitmq
src/Symfony/Component/Amqp/bin/reset.php force
- |
# phpinfo
if [[ ! $PHP = hhvm* ]]; then php -i; else hhvm --php -r 'print_r($_SERVER);print_r(ini_get_all());'; fi
Expand Down
2 changes: 2 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"symfony/polyfill-util": "~1.0"
},
"replace": {
"symfony/amqp": "self.version",
"symfony/asset": "self.version",
"symfony/browser-kit": "self.version",
"symfony/cache": "self.version",
Expand Down Expand Up @@ -81,6 +82,7 @@
"symfony/web-link": "self.version",
"symfony/web-profiler-bundle": "self.version",
"symfony/web-server-bundle": "self.version",
"symfony/worker": "self.version",
"symfony/workflow": "self.version",
"symfony/yaml": "self.version"
},
Expand Down
1 change: 1 addition & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<env name="LDAP_PORT" value="3389" />
<env name="REDIS_HOST" value="localhost" />
<env name="MEMCACHED_HOST" value="localhost" />
<env name="RABBITMQ_URL" value="amqp://guest:guest@localhost:5672/sensiolabs_amqp" />
</php>

<testsuites>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

namespace Symfony\Bundle\FrameworkBundle\Command;

use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class WorkerAmqpMoveCommand extends ContainerAwareCommand
{
private $broker;
private $logger;

protected function configure()
{
$this
->setName('worker:amqp:move')
->setDescription('Take all messages from a queue, and send them to the default exchange with a new routing key.')
->setDefinition(array(
new InputArgument('from', InputArgument::REQUIRED, 'The queue.'),
new InputArgument('to', InputArgument::REQUIRED, 'The new routing key.'),
))
;
}

protected function initialize(InputInterface $input, OutputInterface $output)
{
$this->broker = $this->getContainer()->get('amqp.broker');
$this->logger = $this->getContainer()->get('logger');
}

protected function execute(InputInterface $input, OutputInterface $output)
{
$from = $input->getArgument('from');
$to = $input->getArgument('to');

while (false !== $message = $this->broker->get($from)) {
$this->logger->info('Move a message from {from} to {to}.', array(
'from' => $from,
'to' => $to,
));
$this->broker->move($message, $to);
$this->broker->ack($message);
$this->logger->debug('...message moved {from} to {to}.', array(
'from' => $from,
'to' => $to,
));
}
}
}
26 changes: 26 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/Command/WorkerListCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

namespace Symfony\Bundle\FrameworkBundle\Command;

use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class WorkerListCommand extends ContainerAwareCommand
{
protected function configure()
{
$this
->setName('worker:list')
->setDescription('List available workers.')
;
}

protected function execute(InputInterface $input, OutputInterface $output)
{
$workers = $this->getContainer()->getParameter('worker.workers');

foreach ($workers as $name => $_) {
$output->writeln($name);
}
}
}
70 changes: 70 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/Command/WorkerRunCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?php

namespace Symfony\Bundle\FrameworkBundle\Command;

use Symfony\Component\Amqp\Worker\ConfigurableLoopInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class WorkerRunC C361 ommand extends ContainerAwareCommand
{
protected function configure()
{
$this
->setName('worker:run')
->setDescription('Run a worker')
->setDefinition(array(
new InputArgument('worker', InputArgument::REQUIRED, 'The worker'),
new InputOption('name', null, InputOption::VALUE_REQUIRED, 'A name, useful for stats/monitoring. Defaults to worker name.'),
))
;
}

protected function execute(InputInterface $input, OutputInterface $output)
{
if (!extension_loaded('pcntl')) {
throw new \RuntimeException('The pcntl extension is mandatory.');
}

$loop = $this->getLoop($input);

$loopName = $input->getOption('name') ?: $loop->getName();

if ($loop instanceof ConfigurableLoopInterface) {
$loop->setName($loopName);
}

$processName = sprintf('%s_%s', $this->getContainer()->getParameter('worker.cli_title_prefix'), $loopName);

// On OSX, it may raise an error:
// Warning: cli_set_process_title(): cli_set_process_title had an error: Not initialized correctly
@cli_set_process_title($processName);

pcntl_signal(SIGTERM, function () use ($loop) {
$loop->stop('Signaled with SIGTERM.');
});
pcntl_signal(SIGINT, function () use ($loop) {
$loop->stop('Signaled with SIGINT.');
});

$loop->run();
}

private function getLoop(InputInterface $input)
{
$workers = $this->getContainer()->getParameter('worker.workers');

$workerName = $input->getArgument('worker');

if (!array_key_exists($workerName, $workers)) {
throw new \InvalidArgumentException(sprintf(
'The worker "%s" does not exist. Available ones are: "%s".',
$workerName, implode('", "', array_keys($workers))
));
}

return $this->getContainer()->get($workers[$workerName]);
}
}
Loading
0