项目地址:

https://github.com/streadway/amqp

优先级队列

  1. producer.go:

     package main
    
     import (
         "github.com/streadway/amqp"
         "strconv"
         "time"
     )
    
     // Settings used by the priority-queue producer.
     var (
         // NOTE(review): the AMQP URL embeds a username and password; fine for a
         // tutorial, but real code should load credentials from configuration.
         url               = "amqp://root:123456@10.0.0.13:5672"
         // Name of the queue that main declares and publishes to.
         queue             = "my-queue"
         // Sent as the queue's "x-max-priority" argument when it is declared.
         maxPriority int32 = 10
     )
    
     // main publishes 100 persistent messages to the priority queue through the
     // default exchange. Even-numbered messages get priority 2 and odd-numbered
     // messages get priority 1.
     //
     // Any dial, channel, declare or publish failure aborts the program with a
     // panic. The deferred Close calls still run while the panic unwinds, so the
     // channel and connection are released on every path.
     func main() {
         connection, err := amqp.Dial(url)
         if err != nil {
             panic("connect error: " + err.Error())
         }
         defer connection.Close()
         // Check the channel error explicitly: a failed open returns a nil
         // channel, and using it would crash with a nil-pointer dereference.
         channel, err := connection.Channel()
         if err != nil {
             panic("channel open error: " + err.Error())
         }
         // Deferred calls run last-in first-out, so the channel closes before
         // the connection.
         defer channel.Close()
         // The x-max-priority argument is what makes the queue honour the
         // Priority field of each message.
         args := amqp.Table{"x-max-priority": maxPriority}
         _, err = channel.QueueDeclare(
             queue,
             true,  // durable
             false, // autoDelete
             false, // exclusive
             false, // noWait
             args,
         )
         if err != nil {
             panic("queue declare error: " + err.Error())
         }
         // No Qos call here: prefetch limits apply to consumers only.
         for i := 0; i < 100; i++ {
             // Even numbers get the higher priority.
             priority := uint8(1)
             if i%2 == 0 {
                 priority = 2
             }
             msg := amqp.Publishing{
                 DeliveryMode: amqp.Persistent,
                 Timestamp:    time.Now(),
                 ContentType:  "application/text",
                 Body:         []byte("msg: " + strconv.Itoa(i)),
                 Priority:     priority,
             }
             // An empty exchange name routes by queue name via the default
             // exchange.
             if err = channel.Publish("", queue, false, false, msg); err != nil {
                 panic(err)
             }
         }
     }
  2. consumer.go:

     package main
    
     import (
         "fmt"
         "github.com/streadway/amqp"
         "time"
     )
    
     // Settings used by the priority-queue consumer.
     var (
         // NOTE(review): the AMQP URL embeds a username and password; fine for a
         // tutorial, but real code should load credentials from configuration.
         url               = "amqp://root:123456@10.0.0.13:5672"
         // Name of the queue that main passes to rabbitConsume.
         queue             = "my-queue"
         // Sent as the queue's "x-max-priority" argument when it is declared.
         maxPriority int32 = 10
     )
    
     // rabbitConsume consumes queue q on the broker at URL u and never returns.
     //
     // Work is done in sessions: each session dials its own connection, consumes
     // until something breaks, and releases the connection. After every session
     // the function waits 3 seconds and starts a new one. Retrying in a loop
     // (instead of calling rabbitConsume recursively) keeps the stack flat and
     // guarantees the previous connection is closed before reconnecting.
     func rabbitConsume(u, q string) {
         for {
             if err := consumePrioritySession(u, q); err != nil {
                 fmt.Println("consume session error: ", err.Error())
             }
             time.Sleep(3 * time.Second)
         }
     }

     // consumePrioritySession runs one connect/declare/consume cycle on the
     // priority queue q. It returns a non-nil error when setup, the delivery
     // stream or an acknowledgement fails, and nil after the channel's close
     // notification has been received (and printed, if it carried an error).
     func consumePrioritySession(u, q string) error {
         connection, err := amqp.Dial(u)
         if err != nil {
             return fmt.Errorf("connect error: %w", err)
         }
         defer connection.Close()
         channel, err := connection.Channel()
         if err != nil {
             return fmt.Errorf("channel open error: %w", err)
         }
         defer channel.Close()
         // Must match the arguments the producer declared the queue with.
         args := amqp.Table{"x-max-priority": maxPriority}
         if _, err = channel.QueueDeclare(q, true, false, false, false, args); err != nil {
             return fmt.Errorf("queue declare error: %w", err)
         }
         // Prefetch one message at a time so that a waiting higher-priority
         // message is not stuck behind messages already handed to this consumer.
         if err = channel.Qos(1, 0, false); err != nil {
             return fmt.Errorf("qos error: %w", err)
         }
         // The library sends at most one error on this channel and then closes
         // it itself; closing it here as well could panic with a double close.
         notifyClose := channel.NotifyClose(make(chan *amqp.Error, 1))
         // autoAck is false, so every delivery is acknowledged explicitly below.
         msgs, err := channel.Consume(q, "", false, false, false, false, nil)
         if err != nil {
             return fmt.Errorf("consume declare error: %w", err)
         }
         for {
             select {
             case e, ok := <-notifyClose:
                 // e is nil (or the channel is already closed) on a graceful
                 // shutdown, so guard before dereferencing it.
                 if ok && e != nil {
                     fmt.Println("channel 通道错误, error: ", e.Error())
                 }
                 return nil
             case delivery, ok := <-msgs:
                 if !ok {
                     // A closed delivery channel would otherwise yield
                     // zero-value deliveries in a tight loop.
                     return fmt.Errorf("delivery stream closed")
                 }
                 // TODO: process the message here.
                 fmt.Println(" [x] Received ", string(delivery.Body))
                 fmt.Println(" [x] Done")
                 // Acknowledge the message: it has been handled successfully.
                 if err := delivery.Ack(false); err != nil {
                     return fmt.Errorf("ack error: %w", err)
                 }
             }
         }
     }
    
     // main starts five goroutines that each run rabbitConsume(url, queue) and
     // then blocks forever.
     func main() {
         const workers = 5
         for started := 0; started < workers; started++ {
             go rabbitConsume(url, queue)
         }
         // An empty select never becomes ready, which keeps main from returning
         // while the consumer goroutines run.
         select {}
     }

事务机制

  1. producer.go:

     package main
    
     import (
         "github.com/streadway/amqp"
         "strconv"
         "time"
     )
    
     // Settings used by the transactional producer.
     var (
         // NOTE(review): the AMQP URL embeds a username and password; fine for a
         // tutorial, but real code should load credentials from configuration.
         url   = "amqp://root:123456@10.0.0.13:5672"
         // Name of the queue that main declares and publishes to.
         queue = "my-queue1"
     )
    
     // main demonstrates AMQP transactions: it puts the channel into transaction
     // mode, publishes up to 1000 messages and commits them as one unit.
     //
     // The example deliberately rolls back and panics at i == 500 to show that
     // nothing published inside the rolled-back transaction reaches the queue.
     // Every failure aborts with a panic; the deferred Close calls still run
     // while the panic unwinds.
     func main() {
         connection, err := amqp.Dial(url)
         if err != nil {
             panic("connect error: " + err.Error())
         }
         defer connection.Close()
         channel, err := connection.Channel()
         if err != nil {
             panic("channel open error: " + err.Error())
         }
         defer channel.Close()
         _, err = channel.QueueDeclare(
             queue,
             false, // durable
             false, // autoDelete
             false, // exclusive
             false, // noWait
             nil,
         )
         if err != nil {
             panic("queue declare error: " + err.Error())
         }
         // Without a successful Tx the publishes below would not be
         // transactional at all, so this error must not be ignored.
         if err = channel.Tx(); err != nil {
             panic("tx select error: " + err.Error())
         }
         for i := 0; i < 1000; i++ {
             msg := amqp.Publishing{
                 DeliveryMode: amqp.Persistent,
                 Timestamp:    time.Now(),
                 ContentType:  "application/text",
                 Body:         []byte("msg: " + strconv.Itoa(i)),
             }
             if err = channel.Publish("", queue, false, false, msg); err != nil {
                 // Best-effort rollback; the publish error is the one worth
                 // reporting.
                 channel.TxRollback()
                 panic(err)
             }
             if i == 500 {
                 // Simulated failure half-way through the batch.
                 if err = channel.TxRollback(); err != nil {
                     panic("tx rollback error: " + err.Error())
                 }
                 panic("rollback due to panic.")
             }
         }
         // The messages only become visible to consumers once the commit
         // succeeds.
         if err = channel.TxCommit(); err != nil {
             panic("tx commit error: " + err.Error())
         }
     }
  2. consumer.go:

     package main
    
     import (
         "fmt"
         "github.com/streadway/amqp"
         "time"
     )
    
     // Settings used by the consumer.
     var (
         // NOTE(review): the AMQP URL embeds a username and password; fine for a
         // tutorial, but real code should load credentials from configuration.
         url   = "amqp://root:123456@10.0.0.13:5672"
         // Name of the queue that main passes to rabbitConsume.
         queue = "my-queue1"
     )
    
     // rabbitConsume consumes queue q on the broker at URL u and never returns.
     //
     // Work is done in sessions: each session dials its own connection, consumes
     // until something breaks, and releases the connection. After every session
     // the function waits 3 seconds and starts a new one. Retrying in a loop
     // (instead of calling rabbitConsume recursively) keeps the stack flat and
     // guarantees the previous connection is closed before reconnecting.
     func rabbitConsume(u, q string) {
         for {
             if err := consumeQueueSession(u, q); err != nil {
                 fmt.Println("consume session error: ", err.Error())
             }
             time.Sleep(3 * time.Second)
         }
     }

     // consumeQueueSession runs one connect/declare/consume cycle on queue q.
     // It returns a non-nil error when setup, the delivery stream or an
     // acknowledgement fails, and nil after the channel's close notification
     // has been received (and printed, if it carried an error).
     func consumeQueueSession(u, q string) error {
         connection, err := amqp.Dial(u)
         if err != nil {
             return fmt.Errorf("connect error: %w", err)
         }
         defer connection.Close()
         channel, err := connection.Channel()
         if err != nil {
             return fmt.Errorf("channel open error: %w", err)
         }
         defer channel.Close()
         if _, err = channel.QueueDeclare(q, false, false, false, false, nil); err != nil {
             return fmt.Errorf("queue declare error: %w", err)
         }
         // The library sends at most one error on this channel and then closes
         // it itself; closing it here as well could panic with a double close.
         notifyClose := channel.NotifyClose(make(chan *amqp.Error, 1))
         // autoAck is false, so every delivery is acknowledged explicitly below.
         msgs, err := channel.Consume(q, "", false, false, false, false, nil)
         if err != nil {
             return fmt.Errorf("consume declare error: %w", err)
         }
         for {
             select {
             case e, ok := <-notifyClose:
                 // e is nil (or the channel is already closed) on a graceful
                 // shutdown, so guard before dereferencing it.
                 if ok && e != nil {
                     fmt.Println("channel 通道错误, error: ", e.Error())
                 }
                 return nil
             case delivery, ok := <-msgs:
                 if !ok {
                     // A closed delivery channel would otherwise yield
                     // zero-value deliveries in a tight loop.
                     return fmt.Errorf("delivery stream closed")
                 }
                 // TODO: process the message here.
                 fmt.Println(" [x] Received ", string(delivery.Body))
                 fmt.Println(" [x] Done")
                 // Tell the broker the message was handled successfully. Use
                 // delivery.Nack(false, false) instead to report a failure.
                 if err := delivery.Ack(false); err != nil {
                     return fmt.Errorf("ack error: %w", err)
                 }
             }
         }
     }
    
     // main starts five goroutines that each run rabbitConsume(url, queue) and
     // then blocks forever.
     func main() {
         const workers = 5
         for started := 0; started < workers; started++ {
             go rabbitConsume(url, queue)
         }
         // An empty select never becomes ready, which keeps main from returning
         // while the consumer goroutines run.
         select {}
     }

confirm 机制

  1. producer.go:

     package main
    
     import (
         "fmt"
         "github.com/streadway/amqp"
         "strconv"
         "time"
     )
    
     // Settings used by the publisher-confirm producer.
     var (
         // NOTE(review): the AMQP URL embeds a username and password; fine for a
         // tutorial, but real code should load credentials from configuration.
         url   = "amqp://root:123456@10.0.0.13:5672"
         // Name of the queue that main declares and publishes to.
         queue = "my-queue1"
     )
    
     // main demonstrates publisher confirms: it switches the channel into
     // confirm mode, publishes 1000 messages and prints the broker's ack or nack
     // for each one.
     //
     // All confirmations are collected before the channel is closed. Closing
     // immediately after the last Publish would discard the confirmations that
     // are still in flight.
     func main() {
         const total = 1000
         connection, err := amqp.Dial(url)
         if err != nil {
             panic("connect error: " + err.Error())
         }
         defer connection.Close()
         channel, err := connection.Channel()
         if err != nil {
             panic("channel open error: " + err.Error())
         }
         defer channel.Close()
         // If confirm mode cannot be enabled no confirmation will ever arrive,
         // so fail loudly instead of waiting forever.
         if err = channel.Confirm(false); err != nil {
             panic("confirm mode error: " + err.Error())
         }
         _, err = channel.QueueDeclare(
             queue,
             false, // durable
             false, // autoDelete
             false, // exclusive
             false, // noWait
             nil,
         )
         if err != nil {
             panic("queue declare error: " + err.Error())
         }
         // One buffer slot per message: the library can always hand over a
         // confirmation without blocking while publishing is still in progress.
         confirms := channel.NotifyPublish(make(chan amqp.Confirmation, total))
         published := 0
         for i := 0; i < total; i++ {
             msg := amqp.Publishing{
                 DeliveryMode: amqp.Persistent,
                 Timestamp:    time.Now(),
                 ContentType:  "application/text",
                 Body:         []byte("msg: " + strconv.Itoa(i)),
             }
             if err = channel.Publish("", queue, false, false, msg); err != nil {
                 fmt.Println("publish error: " + err.Error())
                 continue
             }
             // Only publishes that were actually sent produce a confirmation.
             published++
         }
         for received := 0; received < published; received++ {
             confirmed, ok := <-confirms
             if !ok {
                 // The library closes this channel when the AMQP channel shuts
                 // down; no further confirmations will arrive.
                 fmt.Println("confirm channel closed before all confirmations arrived")
                 break
             }
             if confirmed.Ack {
                 // The broker accepted the message.
                 fmt.Printf("ack, tag is %d\n", confirmed.DeliveryTag)
             } else {
                 // The broker could not take responsibility for the message.
                 fmt.Printf("nack, tag is %d\n", confirmed.DeliveryTag)
             }
         }
     }
  2. consumer.go(同【事务机制】-consumer.go)

延时队列(需要先在 RabbitMQ 服务端安装并启用 rabbitmq_delayed_message_exchange 延时消息插件):

  1. producer.go:

     package main
    
     import (
         "github.com/streadway/amqp"
         "time"
     )
    
     // Settings used by the delayed-message producer.
     var (
         // NOTE(review): the AMQP URL embeds a username and password; fine for a
         // tutorial, but real code should load credentials from configuration.
         url        = "amqp://root:123456@10.0.0.13:5672"
         // Exchange declared by main with type "x-delayed-message".
         exchange   = "my-exchange"
         // Key used both for the queue binding and for publishing.
         routingKey = "my-routingKey"
         // Name of the queue bound to the exchange.
         queue      = "my-queue2"
         // main multiplies this by 1000 for the "x-delay" header, so the value
         // is presumably in seconds (x-delay in milliseconds) — TODO confirm.
         delay      = 3
     )
    
     // main publishes one persistent message through a delayed-message exchange.
     //
     // It declares the exchange (type "x-delayed-message", behaving as "direct"
     // via the x-delayed-type argument), declares the queue, binds the two with
     // routingKey, and publishes a message whose "x-delay" header is
     // delay * 1000. Every failure aborts with a panic; the deferred Close calls
     // still run while the panic unwinds.
     func main() {
         connection, err := amqp.Dial(url)
         if err != nil {
             panic("connect error: " + err.Error())
         }
         defer connection.Close()
         // Check the channel error explicitly: a failed open returns a nil
         // channel, and using it would crash with a nil-pointer dereference.
         channel, err := connection.Channel()
         if err != nil {
             panic("channel open error: " + err.Error())
         }
         defer channel.Close()
         // x-delayed-type selects how the exchange routes a message once its
         // delay has elapsed.
         args := amqp.Table{
             "x-delayed-type": "direct",
         }
         err = channel.ExchangeDeclare(
             exchange,
             "x-delayed-message",
             true,  // durable
             false, // autoDelete
             false, // internal
             false, // noWait
             args,
         )
         if err != nil {
             panic("exchange declare error: " + err.Error())
         }
         _, err = channel.QueueDeclare(
             queue,
             true,  // durable
             false, // autoDelete
             false, // exclusive
             false, // noWait
             nil,
         )
         if err != nil {
             panic("queue declare error: " + err.Error())
         }
         if err = channel.QueueBind(queue, routingKey, exchange, false, nil); err != nil {
             panic("queue bind error: " + err.Error())
         }
         // The per-message delay travels in the x-delay header.
         headers := amqp.Table{
             "x-delay": delay * 1000,
         }
         msg := amqp.Publishing{
             DeliveryMode: amqp.Persistent,
             Timestamp:    time.Now(),
             ContentType:  "application/text",
             Body:         []byte("delay msg from producer"),
             Headers:      headers,
         }
         if err = channel.Publish(exchange, routingKey, false, false, msg); err != nil {
             panic(err)
         }
     }
  2. consumer.go:

     package main
    
     import (
         "fmt"
         "github.com/streadway/amqp"
         "time"
     )
    
     // Settings used by the delayed-message consumer.
     var (
         // NOTE(review): the AMQP URL embeds a username and password; fine for a
         // tutorial, but real code should load credentials from configuration.
         url        = "amqp://root:123456@10.0.0.13:5672"
         // Exchange declared by rabbitConsume with type "x-delayed-message".
         exchange   = "my-exchange"
         // Key used to bind the queue to the exchange.
         routingKey = "my-routingKey"
         // Name of the queue that main passes to rabbitConsume.
         queue      = "my-queue2"
     )
    
     // rabbitConsume consumes queue q on the broker at URL u and never returns.
     //
     // Work is done in sessions: each session dials its own connection, consumes
     // until something breaks, and releases the connection. After every session
     // the function waits 3 seconds and starts a new one. Retrying in a loop
     // (instead of calling rabbitConsume recursively) keeps the stack flat and
     // guarantees the previous connection is closed before reconnecting.
     func rabbitConsume(u, q string) {
         for {
             if err := consumeDelayedSession(u, q); err != nil {
                 fmt.Println("consume session error: ", err.Error())
             }
             time.Sleep(3 * time.Second)
         }
     }

     // consumeDelayedSession runs one connect/declare/bind/consume cycle for
     // queue q behind the delayed-message exchange. It returns a non-nil error
     // when setup, the delivery stream or an acknowledgement fails, and nil
     // after the channel's close notification has been received (and printed,
     // if it carried an error).
     func consumeDelayedSession(u, q string) error {
         connection, err := amqp.Dial(u)
         if err != nil {
             return fmt.Errorf("connect error: %w", err)
         }
         defer connection.Close()
         channel, err := connection.Channel()
         if err != nil {
             return fmt.Errorf("channel open error: %w", err)
         }
         defer channel.Close()
         // Same exchange declaration as the producer, so whichever side starts
         // first creates the topology.
         args := amqp.Table{
             "x-delayed-type": "direct",
         }
         err = channel.ExchangeDeclare(exchange, "x-delayed-message", true, false, false, false, args)
         if err != nil {
             return fmt.Errorf("exchange declare error: %w", err)
         }
         if _, err = channel.QueueDeclare(q, true, false, false, false, nil); err != nil {
             return fmt.Errorf("queue declare error: %w", err)
         }
         if err = channel.QueueBind(q, routingKey, exchange, false, nil); err != nil {
             return fmt.Errorf("queue bind error: %w", err)
         }
         // The library sends at most one error on this channel and then closes
         // it itself; closing it here as well could panic with a double close.
         notifyClose := channel.NotifyClose(make(chan *amqp.Error, 1))
         // autoAck is false, so every delivery is acknowledged explicitly below.
         msgs, err := channel.Consume(q, "", false, false, false, false, nil)
         if err != nil {
             return fmt.Errorf("consume declare error: %w", err)
         }
         for {
             select {
             case e, ok := <-notifyClose:
                 // e is nil (or the channel is already closed) on a graceful
                 // shutdown, so guard before dereferencing it.
                 if ok && e != nil {
                     fmt.Println("channel 通道错误, error: ", e.Error())
                 }
                 return nil
             case delivery, ok := <-msgs:
                 if !ok {
                     // A closed delivery channel would otherwise yield
                     // zero-value deliveries in a tight loop.
                     return fmt.Errorf("delivery stream closed")
                 }
                 // TODO: process the message here.
                 fmt.Println(" [x] Received ", string(delivery.Body))
                 fmt.Println(" [x] Done")
                 // Tell the broker the message was handled successfully. Use
                 // delivery.Nack(false, false) instead to report a failure.
                 if err := delivery.Ack(false); err != nil {
                     return fmt.Errorf("ack error: %w", err)
                 }
             }
         }
     }
    
     // main starts five goroutines that each run rabbitConsume(url, queue) and
     // then blocks forever.
     func main() {
         const workers = 5
         for started := 0; started < workers; started++ {
             go rabbitConsume(url, queue)
         }
         // An empty select never becomes ready, which keeps main from returning
         // while the consumer goroutines run.
         select {}
     }
文档更新时间: 2024-04-18 16:35   作者:lee