1. 项目地址:
    https://github.com/streadway/amqp
  2. 使用:
    package main
    import (
     "fmt"
     "github.com/streadway/amqp"
     "time"
    )
    /**
    * MQ bundles an AMQP connection, a channel opened on that connection, and
    * the name of the queue that the methods below publish to and consume from.
    */
    type MQ struct {
     // Connection is the broker connection stored by Init and closed by Destruct.
     Connection *amqp.Connection
     // Channel is the channel stored by Init; Publish and Consume operate on it.
     Channel *amqp.Channel
     // QueueName is the queue name stored by Init; it is used as the routing
     // key in Publish and as the queue to read from in Consume.
     QueueName string
    }
    /**
    * Init connects to the broker, opens a channel, limits the channel to one
    * unacknowledged delivery at a time, and declares the durable priority
    * queue used by Publish and Consume.
    *
    * Every step is checked. On failure the error is printed, the connection
    * (if already open) is closed again so it is not leaked, and the receiver
    * is left unchanged, so Connection and Channel stay nil.
    *
    * @param url string AMQP connection URL
    * @param queue_name string name of the queue to declare
    * @param max_priority int value sent as the queue's "x-max-priority" argument
    */
    func (m *MQ) Init(url string, queue_name string, max_priority int) {
        conn, err := amqp.Dial(url)
        if err != nil {
            fmt.Println("connect failed:", err)
            return
        }

        ch, err := conn.Channel()
        if err != nil {
            fmt.Println("open channel failed:", err)
            _ = conn.Close() // best-effort cleanup; the original error was already reported
            return
        }

        // prefetchCount=1, prefetchSize=0, global=false.
        if err := ch.Qos(1, 0, false); err != nil {
            fmt.Println("set qos failed:", err)
            _ = conn.Close() // closing the connection also tears down its channels
            return
        }

        // durable=true, autoDelete=false, exclusive=false, noWait=false.
        args := amqp.Table{"x-max-priority": int32(max_priority)}
        if _, err := ch.QueueDeclare(queue_name, true, false, false, false, args); err != nil {
            fmt.Println("declare queue failed:", err)
            _ = conn.Close()
            return
        }

        m.Connection = conn
        m.Channel = ch
        m.QueueName = queue_name
    }
    /**
    * Publish sends msg as a text/plain message to the default exchange, using
    * the queue name as the routing key.
    *
    * The AMQP priority field is a single byte, so priority is clamped to the
    * range 0..255 before conversion. Without the clamp, uint8(priority) would
    * silently wrap (256 would become 0, -1 would become 255).
    *
    * Failures are printed; nothing is returned.
    *
    * @param msg string message body
    * @param priority int message priority; a larger value means higher priority
    */
    func (m *MQ) Publish(msg string, priority int) {
        if m.Channel == nil {
            fmt.Println("publish failed:", "channel is not initialized")
            return
        }

        const maxPriority = 255 // largest value representable in the uint8 Priority field
        if priority < 0 {
            priority = 0
        } else if priority > maxPriority {
            priority = maxPriority
        }

        // exchange="", mandatory=false, immediate=false.
        err := m.Channel.Publish(
            "",
            m.QueueName,
            false,
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                Priority:    uint8(priority),
                Body:        []byte(msg),
            },
        )
        if err != nil {
            fmt.Println("publish failed:", err)
        }
    }
    /**
    * Consume registers a consumer on the queue with manual acknowledgement and
    * calls callback for every delivery, one at a time, in the calling
    * goroutine.
    *
    * The call blocks for as long as deliveries can arrive and returns once the
    * delivery channel is closed. If the consumer cannot be registered, the
    * error is printed and the method returns immediately instead of blocking
    * forever on a channel that will never deliver.
    *
    * @param callback func(d amqp.Delivery) handler invoked per message; it is
    *        responsible for acknowledging the delivery
    */
    func (m *MQ) Consume(callback func(d amqp.Delivery)) {
        if m.Channel == nil {
            fmt.Println("consume failed:", "channel is not initialized")
            return
        }

        // consumer="", autoAck=false, exclusive=false, noLocal=false, noWait=false.
        msgs, err := m.Channel.Consume(
            m.QueueName,
            "",
            false,
            false,
            false,
            false,
            nil,
        )
        if err != nil {
            fmt.Println("consume failed:", err)
            return
        }

        // Ranging here (rather than in a background goroutine guarded by a
        // never-signalled channel) lets the method return when msgs is closed.
        for d := range msgs {
            callback(d)
        }
    }
    /**
    * Destruct releases the resources held by the receiver: it closes the
    * channel first and then the connection it belongs to.
    *
    * Fields that are still nil (for example because initialisation failed) are
    * skipped, and close errors are printed rather than silently dropped.
    */
    func (m *MQ) Destruct() {
        // Close the channel before its parent connection; the reverse order
        // makes the channel close fail because the connection is already gone.
        if m.Channel != nil {
            if err := m.Channel.Close(); err != nil {
                fmt.Println("close channel failed:", err)
            }
        }
        if m.Connection != nil {
            if err := m.Connection.Close(); err != nil {
                fmt.Println("close connection failed:", err)
            }
        }
    }
    // main demonstrates the priority queue: it publishes three messages with
    // increasing priority and then consumes them, acknowledging each one after
    // a simulated second of work.
    func main() {
        mq := MQ{}
        // Example broker address and credentials; replace them for a real deployment.
        url := "amqp://admin:admin@192.168.0.11:5672"
        queueName := "test_queue"
        maxPriority := 10
        mq.Init(url, queueName, maxPriority)
        // Deferred so the channel and connection are released however main exits.
        defer mq.Destruct()

        // Produce messages.
        mq.Publish("优先级:1", 1)
        mq.Publish("优先级:2", 2)
        mq.Publish("优先级:3", 3)

        // Consume messages.
        mq.Consume(func(d amqp.Delivery) {
            // TODO: application-specific work goes here.
            fmt.Println(" [x] Received ", string(d.Body))
            time.Sleep(time.Second)
            fmt.Println(" [x] Done")
            // Acknowledge the delivery to signal that it was processed successfully.
            if err := d.Ack(false); err != nil {
                fmt.Println("ack failed:", err)
            }
        })
    }
    //输出
    //[x] Received  优先级:3
    //[x] Done
    //[x] Received  优先级:2
    //[x] Done
    //[x] Received  优先级:1
    //[x] Done
文档更新时间: 2021-05-31 08:56   作者:lee