1. 安装:
    composer require php-amqplib/php-amqplib
  2. 使用:
    <?php
    require_once './vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    use PhpAmqpLib\Wire\AMQPTable;
    /**
     * Class MQ
     *
     * Small wrapper around a php-amqplib connection/channel pair that publishes
     * to and consumes from one queue bound to one delayed-message exchange.
     * Messages carry both a priority and a per-message delay.
     *
     * NOTE: the 'x-delayed-message' exchange type is not a core RabbitMQ type;
     * it is expected to be provided by the delayed-message-exchange plugin on
     * the broker.
     */
    class MQ{
        /** @var mixed Connection returned by AMQPStreamConnection::create_connection(). */
        private $connection;
        /** @var mixed Channel opened on $connection; all broker calls go through it. */
        private $channel;
        /** @var string Name of the exchange messages are published to. */
        private $exchange_name;
        /** @var string Name of the queue messages are consumed from. */
        private $queue_name;

        /**
         * Opens the connection and channel, then declares the exchange, the
         * queue, and the binding between them.
         *
         * @param array $config List of host definitions handed to
         *                      AMQPStreamConnection::create_connection()
         * @param string $exchange_name Exchange to declare and publish to
         * @param string $queue_name Queue to declare, bind and consume from
         * @param int $max_priority Value used for the queue's 'x-max-priority' argument
         * @throws Exception
         */
        public function __construct($config,$exchange_name,$queue_name,$max_priority = 10){
            $this->connection = AMQPStreamConnection::create_connection($config);
            $this->channel = $this->connection->channel();
            $this->exchange_name = $exchange_name;
            $this->queue_name = $queue_name;

            // Delayed exchange that routes like a direct exchange once the delay elapses.
            $exchange_args = new AMQPTable([
                'x-delayed-type' => 'direct',
            ]);
            $this->channel->exchange_declare($exchange_name, 'x-delayed-message', false, true, false, false, false, $exchange_args);

            // Queue with priority support enabled up to $max_priority.
            $queue_args = new AMQPTable([
                'x-max-priority' => $max_priority,
            ]);
            $this->channel->queue_declare($queue_name, false, true, false, false, false, $queue_args);

            // Bound without a routing key; publish() also publishes without one.
            $this->channel->queue_bind($queue_name, $exchange_name);
        }

        /**
         * Publishes one persistent message to the exchange.
         *
         * @param string $msg Message body
         * @param int $priority Message priority; a larger value means a higher priority
         * @param int|float $delay Delay before delivery, in seconds
         */
        public function publish($msg,$priority = 0,$delay = 0){
            $properties = [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'priority' => intval($priority),
            ];
            $message = new AMQPMessage($msg, $properties);

            // 'x-delay' is expressed in milliseconds. Force an integer so a
            // fractional number of seconds does not end up as a float header.
            $delay_ms = (int) round($delay * 1000);
            $message->set('application_headers', new AMQPTable(['x-delay' => $delay_ms]));

            $this->channel->basic_publish($message, $this->exchange_name);
        }

        /**
         * Consumes messages from the queue, blocking for as long as the
         * channel reports that it is consuming.
         *
         * The consumer is registered with the no-ack flag set to false, so the
         * callback is responsible for acknowledging each message.
         *
         * @param callable $callback Invoked with each delivered message
         */
        public function consume($callback){
            // Prefetch count of 1: at most one unacknowledged message at a time.
            $this->channel->basic_qos(null, 1, null);
            $this->channel->basic_consume($this->queue_name, '', false, false, false, false, $callback);
            while ($this->channel->is_consuming()) {
                $this->channel->wait();
            }
        }

        /**
         * Releases broker resources.
         *
         * The channel is closed before the connection it belongs to; closing
         * them in the opposite order would attempt a channel close on an
         * already closed connection.
         */
        public function __destruct(){
            if ($this->channel !== null) {
                $this->channel->close();
            }
            if ($this->connection !== null) {
                $this->connection->close();
            }
        }
    }
    // Broker endpoints handed to the MQ wrapper (a list, one entry per host).
    $config = [
        [
            'host' => '10.110.1.13',
            'port' => 5672,
            'user' => 'admin',
            'password' => 'admin',
            'vhost' => '/',
        ],
    ];
    $exchange_name = 'exchange-key';
    $queue_name = 'queue-key';
    $mq = new MQ($config, $exchange_name, $queue_name);

    // Produce messages: body => priority, all sharing the same delay (seconds).
    $delay = 3;
    $outgoing = [
        '优先级:1' => 1,
        '优先级:2' => 2,
        '优先级:3' => 3,
    ];
    foreach ($outgoing as $body => $priority) {
        $mq->publish($body, $priority, $delay);
    }

    // Consume messages.
    $callback = function ($message) {
        // todo something
        echo ' [x] Received ' . $message->body . PHP_EOL;
        sleep(1);
        echo " [x] Done" . PHP_EOL;
        // todo something

        // Acknowledge the delivery to signal that the message was processed successfully.
        $delivery = $message->delivery_info;
        $delivery['channel']->basic_ack($delivery['delivery_tag']);
    };
    $mq->consume($callback);
  3. 延迟3秒后,输出:
    [x] Received 优先级:3
    [x] Done
    [x] Received 优先级:2
    [x] Done
    [x] Received 优先级:1
    [x] Done
文档更新时间: 2021-11-26 10:59   作者:lee