安装:
composer require php-amqplib/php-amqplib
延时队列 + 优先级队列
- 使用:
<?php require_once './vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; /** * Class MQ */ class MQ{ private $connection; private $channel; private $exchange_name; private $queue_name; /** * MQ constructor. * @param $config * @param $exchange_name * @param $queue_name * @param int $max_priority 最大优先级设置 * @throws Exception */ public function __construct($config,$exchange_name,$queue_name,$max_priority = 10){ $this->connection = AMQPStreamConnection::create_connection($config); $this->channel = $this->connection->channel(); $args = new AMQPTable([ 'x-delayed-type' => 'direct', ]); $this->channel->exchange_declare($exchange_name, 'x-delayed-message', false, true, false, false, false, $args); $args = new AMQPTable([ 'x-max-priority' => $max_priority, ]); $this->channel->queue_declare($queue_name, false, true, false, false, false, $args); $this->channel->queue_bind($queue_name, $exchange_name); $this->exchange_name = $exchange_name; $this->queue_name = $queue_name; } /** * 生产消息 * @param string $msg 消息 * @param int $priority 优先级:值越大,优先级越高 * @param int $delay 消息延迟时间,单位:秒 */ public function publish($msg,$priority,$delay){ $conf = [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'priority' => intval($priority), ]; $msg = new AMQPMessage($msg, $conf); $headers = new AMQPTable(['x-delay' => $delay * 1000]); $msg->set('application_headers', $headers); $this->channel->basic_publish($msg, $this->exchange_name); } /** * 消费消息 * @param callable $callback 回调函数 */ public function consume($callback){ $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume($this->queue_name, '', false, false, false, false, $callback); while ($this->channel->is_consuming()) { $this->channel->wait(); } } public function __destruct(){ $this->connection->close(); $this->channel->close(); } } $config = [ ['host' => '10.110.1.13', 'port' => 5672, 'user' => 'admin', 'password' => 'admin', 'vhost' => '/'], ]; $exchange_name = 'exchange-key'; $queue_name = 'queue-key'; $mq = new MQ($config,$exchange_name,$queue_name); // 生产消息 $delay = 3; $mq->publish('优先级:1',1,$delay); $mq->publish('优先级:2',2,$delay); $mq->publish('优先级:3',3,$delay); // 消费消息 $callback = function ($msg) { // todo something echo ' [x] Received ', $msg->body, PHP_EOL; sleep(1); echo " [x] Done" . PHP_EOL; // todo something $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); // 发送响应,代表该消息已处理成功 }; $mq->consume($callback);
- 延迟3秒后,输出:
[x] Received 优先级:3 [x] Done [x] Received 优先级:2 [x] Done [x] Received 优先级:1 [x] Done
事务机制
produce.php:
<?php require_once "./vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection("10.0.0.14", 5672, "admin", "123456"); $channel = $connection->channel(); try { $channel->tx_select(); $channel->queue_declare("hello", false, false, false, false); for ($i = 0; $i < 1000; $i++) { $msg = new AMQPMessage("Hello World!"); $channel->basic_publish($msg, "", "hello"); if ($i == 10) { throw new Exception("something wrong.."); } } $channel->tx_commit(); } catch (Exception $e) { $channel->tx_rollback(); echo $e->getMessage(); } $channel->close(); $connection->close();
consume.php:
<?php require_once "./vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection("10.0.0.14", 5672, "admin", "123456"); $channel = $connection->channel(); $channel->queue_declare("hello", false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C", "\n"; $callback = function($msg) { echo " [x] Received ", $msg->body, "\n"; }; $channel->basic_consume("hello", "", false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); }
confirm 机制
produce.php:
<?php require_once "./vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection("10.0.0.14", 5672, "admin", "123456"); $channel = $connection->channel(); $channel->confirm_select(); $channel->set_ack_handler(function (AMQPMessage $message){ // 发送成功的消息 echo 'ack' . $message->getBody() . PHP_EOL; }); $channel->set_nack_handler(function (AMQPMessage $message){ // 未发送成功的消息,可以做一些重新入队的操作 echo 'nack' . $message->getBody() .PHP_EOL; }); $channel->queue_declare("hello", false, false, false, false); for ($i = 0; $i < 10000; $i++) { $msg = new AMQPMessage("Hello World!" . $i); $channel->basic_publish($msg, "", "hello"); $channel->wait_for_pending_acks_returns(5); sleep(1); } $channel->close(); $connection->close();
consume.php:
<?php require_once "./vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection("10.0.0.14", 5672, "admin", "123456"); $channel = $connection->channel(); $channel->queue_declare("hello", false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C", "\n"; $callback = function($msg) { echo " [x] Received ", $msg->body, "\n"; }; $channel->basic_consume("hello", "", false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); }
文档更新时间: 2024-04-20 10:57 作者:lee