安装:

composer require php-amqplib/php-amqplib

延时队列 + 优先级队列

  1. 使用:
     <?php
     require_once './vendor/autoload.php';
     use PhpAmqpLib\Connection\AMQPStreamConnection;
     use PhpAmqpLib\Message\AMQPMessage;
     use PhpAmqpLib\Wire\AMQPTable;
     /**
      * Class MQ
      */
     class MQ{
         private $connection;
         private $channel;
         private $exchange_name;
         private $queue_name;
         /**
          * MQ constructor.
          * @param $config
          * @param $exchange_name
          * @param $queue_name
          * @param int $max_priority 最大优先级设置
          * @throws Exception
          */
         public function __construct($config,$exchange_name,$queue_name,$max_priority = 10){
             $this->connection = AMQPStreamConnection::create_connection($config);
             $this->channel = $this->connection->channel();
             $args = new AMQPTable([
                 'x-delayed-type' => 'direct',
             ]);
             $this->channel->exchange_declare($exchange_name, 'x-delayed-message', false, true, false, false, false, $args);
             $args = new AMQPTable([
                 'x-max-priority' => $max_priority,
             ]);
             $this->channel->queue_declare($queue_name, false, true, false, false, false, $args);
             $this->channel->queue_bind($queue_name, $exchange_name);
             $this->exchange_name = $exchange_name;
             $this->queue_name = $queue_name;
         }
         /**
          * 生产消息
          * @param string $msg 消息
          * @param int $priority 优先级:值越大,优先级越高
          * @param int $delay 消息延迟时间,单位:秒
          */
         public function publish($msg,$priority,$delay){
             $conf = [
                 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                 'priority' => intval($priority),
             ];
             $msg = new AMQPMessage($msg, $conf);
             $headers = new AMQPTable(['x-delay' => $delay * 1000]);
             $msg->set('application_headers', $headers);
             $this->channel->basic_publish($msg, $this->exchange_name);
         }
         /**
          * 消费消息
          * @param callable $callback 回调函数
          */
         public function consume($callback){
             $this->channel->basic_qos(null, 1, null);
             $this->channel->basic_consume($this->queue_name, '', false, false, false, false, $callback);
             while ($this->channel->is_consuming()) {
                 $this->channel->wait();
             }
         }
         public function __destruct(){
             $this->connection->close();
             $this->channel->close();
         }
     }
     $config = [
         ['host' => '10.110.1.13', 'port' => 5672, 'user' => 'admin', 'password' => 'admin', 'vhost' => '/'],
     ];
     $exchange_name = 'exchange-key';
     $queue_name = 'queue-key';
     $mq = new MQ($config,$exchange_name,$queue_name);
     // 生产消息
     $delay = 3;
     $mq->publish('优先级:1',1,$delay);
     $mq->publish('优先级:2',2,$delay);
     $mq->publish('优先级:3',3,$delay);
     // 消费消息
     $callback = function ($msg) {
         // todo something
         echo ' [x] Received ', $msg->body, PHP_EOL;
         sleep(1);
         echo " [x] Done" . PHP_EOL;
         // todo something
         $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);  // 发送响应,代表该消息已处理成功
     };
     $mq->consume($callback);
  2. 延迟3秒后,输出:
     [x] Received 优先级:3
     [x] Done
     [x] Received 优先级:2
     [x] Done
     [x] Received 优先级:1
     [x] Done

事务机制

  1. produce.php:

     <?php
     require_once "./vendor/autoload.php";
     use PhpAmqpLib\Connection\AMQPStreamConnection;
     use PhpAmqpLib\Message\AMQPMessage;
     $connection = new AMQPStreamConnection("10.0.0.14", 5672, "admin", "123456");
     $channel = $connection->channel();
     try {
         $channel->tx_select();
         $channel->queue_declare("hello", false, false, false, false);
         for ($i = 0; $i < 1000; $i++) {
             $msg = new AMQPMessage("Hello World!");
             $channel->basic_publish($msg, "", "hello");
             if ($i == 10) {
                 throw new Exception("something wrong..");
             }
         }
         $channel->tx_commit();
     } catch (Exception $e) {
         $channel->tx_rollback();
         echo $e->getMessage();
     }
     $channel->close();
     $connection->close();
  2. consume.php:

     <?php
     require_once "./vendor/autoload.php";
     use PhpAmqpLib\Connection\AMQPStreamConnection;
     $connection = new AMQPStreamConnection("10.0.0.14", 5672, "admin", "123456");
     $channel = $connection->channel();
     $channel->queue_declare("hello", false, false, false, false);
     echo " [*] Waiting for messages. To exit press CTRL+C", "\n";
     $callback = function($msg) {
         echo " [x] Received ", $msg->body, "\n";
     };
     $channel->basic_consume("hello", "", false, true, false, false, $callback);
     while(count($channel->callbacks)) {
         $channel->wait();
     }

confirm 机制

  1. produce.php:

     <?php
     require_once "./vendor/autoload.php";
     use PhpAmqpLib\Connection\AMQPStreamConnection;
     use PhpAmqpLib\Message\AMQPMessage;
     $connection = new AMQPStreamConnection("10.0.0.14", 5672, "admin", "123456");
     $channel = $connection->channel();
     $channel->confirm_select();
     $channel->set_ack_handler(function (AMQPMessage $message){
         // 发送成功的消息
         echo 'ack' . $message->getBody() . PHP_EOL;
     });
     $channel->set_nack_handler(function (AMQPMessage $message){
         // 未发送成功的消息,可以做一些重新入队的操作
         echo 'nack' . $message->getBody() .PHP_EOL;
     });
     $channel->queue_declare("hello", false, false, false, false);
     for ($i = 0; $i < 10000; $i++) {
         $msg = new AMQPMessage("Hello World!" . $i);
         $channel->basic_publish($msg, "", "hello");
         $channel->wait_for_pending_acks_returns(5);
         sleep(1);
     }
     $channel->close();
     $connection->close();
  2. consume.php:

     <?php
     require_once "./vendor/autoload.php";
     use PhpAmqpLib\Connection\AMQPStreamConnection;
     $connection = new AMQPStreamConnection("10.0.0.14", 5672, "admin", "123456");
     $channel = $connection->channel();
     $channel->queue_declare("hello", false, false, false, false);
     echo " [*] Waiting for messages. To exit press CTRL+C", "\n";
     $callback = function($msg) {
         echo " [x] Received ", $msg->body, "\n";
     };
     $channel->basic_consume("hello", "", false, true, false, false, $callback);
     while(count($channel->callbacks)) {
         $channel->wait();
     }
文档更新时间: 2024-04-20 10:57   作者:lee