8000 Merge pull request #63 from chimdiadi/master · zerolugithub/rabbitmq-tutorials@7f807bf · GitHub
[go: up one dir, main page]

Skip to content

Commit 7f807bf

Browse files
Merge pull request rabbitmq#63 from chimdiadi/master
Added Examples for php-amqpc
2 parents 38a76d7 + b4b3830 commit 7f807bf

15 files changed

+668
-0
lines changed

php-amqp/.gitkeep

Whitespace-only changes.

php-amqp/README.md

Whitespace-only changes.

php-amqp/emit_log.php

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
//Establish connection to AMQP
4+
$connection = new AMQPConnection();
5+
$connection->setHost('127.0.0.1');
6+
$connection->setLogin('guest');
7+
$connection->setPassword('guest');
8+
$connection->connect();
9+
10+
//Create and declare channel
11+
$channel = new AMQPChannel($connection);
12+
13+
try {
14+
//Declare Exchange
15+
$exchange_name = 'logs';
16+
17+
$exchange = new AMQPExchange($channel);
18+
$exchange->setType(AMQP_EX_TYPE_FANOUT);
19+
$exchange->setName($exchange_name);
20+
$exchange->declareExchange();
21+
22+
//Do not declasre the queue name by setting AMQPQueue::setName()
23+
$queue = new AMQPQueue($channel);
24+
$queue->setFlags(AMQP_EXCLUSIVE);
25+
$queue->declareQueue();
26+
$queue->bind($exchange_name,$queue->getName());
27+
echo sprintf("Queue Name: %s", $queue->getName()), PHP_EOL;
28+
} catch(Exception $ex) {
29+
print_r($ex);
30+
}
31+
32+
33+
//Read from stdin
34+
$message = implode(' ',array_slice($argv,1));
35+
if(empty($message))
36+
$message = "info: Hello World!";
37+
38+
$exchange->publish($message, '');
39+
40+
echo " [X] Sent {$message}", PHP_EOL;
41+
$connection->disconnect();

php-amqp/emit_log_direct.php

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
//Establish connection to AMQP
4+
$connection = new AMQPConnection();
5+
$connection->setHost('127.0.0.1');
6+
$connection->setLogin('guest');
7+
$connection->setPassword('guest');
8+
$connection->connect();
9+
10+
11+
//Declare Channel
12+
$channel = new AMQPChannel($connection);
13+
14+
15+
$routing_key = $severity = $argv[1];
16+
if(empty($severity)) $severity = 'info';
17+
18+
$message = implode(' ',array_slice($argv, 2));
19+
if(empty($message)) $message = 'Hello World!';
20+
21+
try {
22+
//Declare Exchange
23+
$exchange_name = 'direct_logs';
24+
$exchange = new AMQPExchange($channel);
25+
$exchange->setType(AMQP_EX_TYPE_DIRECT);
26+
$exchange->setName($exchange_name);
27+
$exchange->declareExchange();
28+
29+
$queue = new AMQPQueue($channel);
30+
$queue->setFlags(AMQP_EXCLUSIVE);
31+
$queue->setName('monitor.1');
32+
$queue->declareQueue();
33+
34+
$queue->bind($exchange_name, $routing_key);
35+
$exchange->publish($message,$routing_key);
36+
echo " [x] Sent {$severity}:{$message}", PHP_EOL;
37+
} catch(Exception $ex) {
38+
print_r($ex);
39+
}
40+
41+
$connection->disconnect();

php-amqp/emit_log_topic.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
//Establish connection to AMQP
4+
$connection = new AMQPConnection();
5+
$connection->setHost('127.0.0.1');
6+
$connection->setLogin('guest');
7+
$connection->setPassword('guest');
8+
$connection->connect();
9+
10+
11+
//Declare Channel
12+
$channel = new AMQPChannel($connection);
13+
14+
15+
$routing_key = (isset($argv[1]))? $argv[1] : 'anonymous.info';
16+
17+
$message = implode(' ',array_slice($argv, 2));
18+
if(empty($message)) $message = "Hello World!";
19+
20+
21+
try {
22+
//Declare Exchange
23+
$exchange_name = 'topic_logs';
24+
$exchange = new AMQPExchange($channel);
25+
$exchange->setType(AMQP_EX_TYPE_TOPIC);
26+
$exchange->setName($exchange_name);
27+
$exchange->declareExchange();
28+
29+
$exchange->publish($message, $routing_key);
30+
echo " [x] Sent {$routing_key}:{$message}", PHP_EOL;
31+
} catch(Exception $ex) {
32+
print_r($ex);
33+
}

php-amqp/new_task.php

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
3+
//Establish connection to AMQP
4+
$connection = new AMQPConnection();
5+
$connection->setHost('127.0.0.1');
6+
$connection->setLogin('guest');
7+
$connection->setPassword('guest');
8+
$connection->connect();
9+
10+
11+
12+
//Create and declare channel
13+
$channel = new AMQPChannel($connection);
14+
$channel->setPrefetchCount(1);
15+
16+
17+
$routing_key = 'task_queue';
18+
19+
try{
20+
$queue = new AMQPQueue($channel);
21+
$queue->setName($routing_key);
22+
$queue->setFlags(AMQP_DURABLE);
23+
$queue->declareQueue();
24+
25+
}catch(Exception $ex){
26+
print_r($ex);
27+
}
28+
29+
30+
//Read from stdin
31+
$message = implode(' ', array_slice($argv,1));
32+
if(empty($message))
33+
$message = "Hello World!";
34+
35+
$exchange = new AMQPExchange($channel);
36+
$exchange->publish($message, $routing_key);
37+
38+
echo " [x] Sent {$data}", PHP_EOL;
39+
40+
$connection->disconnect();

php-amqp/receive.php

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
4+
//Establish connection AMQP
5+
$connection = new AMQPConnection();
6+
$connection->setHost('127.0.0.1');
7+
$connection->setLogin('guest');
8+
$connection->setPassword('guest');
9+
$connection->connect();
10+
11+
//Create and declare channel
12+
$channel = new AMQPChannel($connection);
13+
14+
//AMQPC Exchange is the publishing mechanism
15+
$exchange = new AMQPExchange($channel);
16+
17+
18+
$callback_func = function(AMQPEnvelope $message, AMQPQueue $q) use (&$max_consume) {
19+
echo PHP_EOL, "------------", PHP_EOL;
20+
echo " [x] Received ", $message->getBody(), PHP_EOL;
21+
echo PHP_EOL, "------------", PHP_EOL;
22+
23+
$q->nack($message->getDeliveryTag());
24+
sleep(1);
25+
};
26+
27+
try{
28+
$routing_key = 'hello';
29+
30+
$queue = new AMQPQueue($channel);
31+
$queue->setName($routing_key);
32+
$queue->setFlags(AMQP_NOPARAM);
33+
$queue->declareQueue();
34+
35+
echo ' [*] Waiting for messages. To exit press CTRL+C ', PHP_EOL;
36+
$queue->consume($callback_func);
37+
}catch(AMQPQueueException $ex){
38+
print_r($ex);
39+
}catch(Exception $ex){
40+
print_r($ex);
41+
}
42+
43+
echo 'Close connection...', PHP_EOL;
44+
$queue->cancel();
45+
$connection->disconnect();
46+

php-amqp/receive_log_topic.php

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
//Establish connection to AMQP
4+
$connection = new AMQPConnection();
5+
$connection->setHost('127.0.0.1');
6+
$connection->setLogin('guest');
7+
$connection->setPassword('guest');
8+
$connection->connect();
9+
10+
11+
//Declare Channel
12+
$channel = new AMQPChannel($connection);
13+
$channel->setPrefetchCount(1);
14+
15+
//Declare Exchange
16+
$exchange_name = 'topic_logs';
17+
$exchange = new AMQPExchange($channel);
18+
$exchange->setType(AMQP_EX_TYPE_TOPIC);
19+
$exchange->setName($exchange_name);
20+
$exchange->declareExchange();
21+
22+
$binding_keys = array_slice($argv, 1); //accept an array of inputs delimited by space
23+
if(empty($binding_keys)) {
24+
file_put_contents('php://stderr', "Usage: {$argv[0]} [binding_key]...\n");
25+
exit(1);
26+
}
27+
28+
//Declare Queue
29+
$queue = new AMQPQueue($channel);
30+
$queue->setFlags(AMQP_EXCLUSIVE);
31+
$queue->declareQueue();
32+
foreach($binding_keys as $binding_key) {
33+
$queue->bind($exchange_name, $binding_key);
34+
}
35+
36+
echo " [*] Waiting for logs. To exit press CTRL+C", PHP_EOL;
37+
$callback_func = function(AMQPEnvelope $message, AMQPQueue $q) {
38+
echo sprintf(" [X] [%s] %s",$message->getRoutingKey(),$message->getBody()), PHP_EOL;
39+
//echo sprintf("Delivery Tag: %s",$message->getDeliveryTag()), PHP_EOL;
40+
$q->nack($message->getDeliveryTag());
41+
};
42+
43+
44+
try {
45+
$queue->consume($callback_func);
46+
} catch(AMQPQueueException $ex) {
47+
print_r($ex);
48+
} catch(Exception $ex) {
49+
print_r($ex);
50+
}

php-amqp/receive_logs.php

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
4+
//Establish connection to AMQP
5+
$connection = new AMQPConnection();
6+
$connection->setHost('127.0.0.1');
7+
$connection->setLogin('guest');
8+
$connection->setPassword('guest');
9+
$connection->connect();
10+
11+
//setup channel connection
12+
$channel = new AMQPChannel($connection);
13+
14+
15+
$callback_func = function(AMQPEnvelope $message, AMQPQueue $q) use (&$max_jobs) {
16+
echo " [x] Received: ", $message->getBody(), PHP_EOL;
17+
sleep(1);
18+
$q->nack($message->getDeliveryTag());
19+
};
20+
21+
22+
try {
23+
//Declare Exchange
24+
$exchange_name = 'logs';
25+
$exchange = new AMQPExchange($channel);
26+
$exchange->setType(AMQP_EX_TYPE_FANOUT);
27+
$exchange->setName($exchange_name);
28+
$exchange->declareExchange();
29+
30+
31+
$queue = new AMQPQueue($channel);
32+
$queue->setFlags(AMQP_EXCLUSIVE); //allow server to define name
33+
$queue->declareQueue();
34+
$queue->bind($exchange_name,$queue->getName());
35+
36+
echo ' [*] Waiting for logs. To exit press CTRL+C', PHP_EOL;
37+
$queue->consume($callback_func);
38+
39+
} catch(AMQPQueueException $ex) {
40+
print_r($ex);
41+
} catch(AMQPExchangeException $ex) {
42+
print_r($ex);
43+
} catch(AMQPChanncelException $ex) {
44+
print_r($ex);
45+
} catch(AMQPConnectionException $ex) {
46+
print_r($ex);
47+
} catch(Exception $ex) {
48+
print_r($ex);
49+
}
50+
51+
$connection->disconnect();

php-amqp/receive_logs_direct.php

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
//Establish Connection
4+
$connection = new AMQPConnection();
5+
$connection->setHost('127.0.0.1');
6+
$connection->setLogin('guest');
7+
$connection->setPassword('guest');
8+
$connection->connect();
9+
10+
11+
//Listen on Channel
12+
$channel = new AMQPChannel($connection);
13+
14+
15+
echo " [*] Waiting for logs. To exit press CTRL+C", PHP_EOL;
16+
$callback_func = function(AMQPEnvelope $message, AMQPQueue $q) {
17+
echo sprintf(" [X] [%s] %s",$message->getRoutingKey(),$message->getBody()), PHP_EOL;
18+
$q->nack($message->getDeliveryTag());
19+
return true;
20+
};
21+
22+
$severities = array_slice($argv,1);
23+
if(empty($severities)) {
24+
file_put_contents('php://stderr', "Usage: {$argv[0]} [info] [warning] [error]\n");
25+
exit(1);
26+
}
27+
28+
try {
29+
//Declare Exchange
30+
$exchange_name = 'direct_logs';
31+
$exchange = new AMQPExchange($channel);
32+
$exchange->setType(AMQP_EX_TYPE_DIRECT);
33+
$exchange->setName($exchange_name);
34+
$exchange->declareExchange();
35+
36+
37+
38+
//Declare Queue
39+
$queue = new AMQPQueue($channel);
40+
$queue->setFlags(AMQP_EXCLUSIVE);
41+
$queue->declareQueue();
42+
foreach($severities as $routing_key) {
43+
$queue->bind($exchange_name, $routing_key);
44+
}
45+
46+
$queue->consume($callback_func);
47+
} catch(AMQPQueueException $ex) {
48+
print_r($ex);
49+
} catch(Exception $ex) {
50+
print_r($ex);
51+
}

0 commit comments

Comments
 (0)
0