1. 项目地址:
    https://github.com/streadway/amqp
  2. 使用:
    package main
    import (
     "fmt"
     "github.com/streadway/amqp"
     "time"
    )
    /**
    * class MQ
    */
    type MQ struct {
     Connection *amqp.Connection
     Channel *amqp.Channel
     ExchangeName string
     QueueName string
    }
    /**
    * 初始化方法
    * @param url string 连接地址
    * @param exchange_name 交换机名
    * @param queue_name 队列名
    * @param max_priority 最大优先级设置
    */
    func (this *MQ) Init(url string,exchange_name string,queue_name string,max_priority int) {
     conn, _ := amqp.Dial(url)
     ch, _ := conn.Channel()
     args := amqp.Table{
         "x-delayed-type": "direct",
     }
     ch.ExchangeDeclare(
         exchange_name,
         "x-delayed-message",
         true,
         false,
         false,
         false,
         args,
         )
     arg := amqp.Table{"x-max-priority":int32(max_priority)}
     ch.QueueDeclare(
         queue_name,
         true,
         false,
         false,
         false,
         arg,
         )
     ch.QueueBind(
         queue_name,
         "",
         exchange_name,
         false,
         nil,
         )
     ch.Qos(1, 0, false)
     this.Connection = conn
     this.Channel = ch
     this.ExchangeName = exchange_name
     this.QueueName = queue_name
    }
    /**
    * 生产者
    * @param msg 消息
    * @param priority 优先级:值越大,优先级越高
    * @param delay 延迟时间,单位:秒
    */
    func (this *MQ) Publish(msg string,priority int,delay int) {
     headers := amqp.Table{
         "x-delay": delay * 1000,
     }
     this.Channel.Publish(this.ExchangeName, "", false, false, amqp.Publishing{
         DeliveryMode: amqp.Persistent,
         Timestamp:    time.Now(),
         ContentType:  "application/text",
         Body:         []byte(msg),
         Headers:      headers,
         Priority: uint8(priority),
     })
    }
    /**
    * 消费消息
    * @param callback 回调方法
    */
    func (this *MQ) Consume(callback func(d amqp.Delivery))  {
     msgs,err := this.Channel.Consume(
         this.QueueName,
         "",
         false,
         false,
         false,
         false,
         nil,
     )
     if err != nil {
         fmt.Println("consume failed:",err)
     }
     ch := make(chan bool)
     go func(){
         for d:= range msgs{
             callback(d)
         }
     }()
     <- ch
    }
    /**
    * 析构方法
    */
    func (this *MQ) Destruct()  {
     this.Connection.Close()
     this.Channel.Close()
    }
    func main() {
     mq := MQ{}
     url := "amqp://admin:admin@10.110.1.13:5672"
     exchange_name := "exchange-key"
     queue_name := "queue-key"
     max_priority := 10
     mq.Init(url,exchange_name,queue_name,max_priority)
     // 生产消息
     delay := 3
     mq.Publish("优先级:1",1,delay)
     mq.Publish("优先级:2",2,delay)
     mq.Publish("优先级:3",3,delay)
     // 消费消息
     mq.Consume(func(d amqp.Delivery) {
         // todo something
         fmt.Println(" [x] Received ", string(d.Body))
         time.Sleep(time.Second)
         fmt.Println(" [x] Done")
         // todo something
         d.Ack(false)  // 发送响应,代表该消息已处理成功
     })
     mq.Destruct()
    }
  3. 延迟3秒后,输出:
    [x] Received 优先级:3
    [x] Done
    [x] Received 优先级:2
    [x] Done
    [x] Received 优先级:1
    [x] Done
文档更新时间: 2021-11-26 10:59   作者:lee