项目地址:
https://github.com/streadway/amqp
优先级队列
producer.go:
package main import ( "github.com/streadway/amqp" "strconv" "time" ) var ( url = "amqp://root:123456@10.0.0.13:5672" queue = "my-queue" maxPriority int32 = 10 ) func main() { connection, err := amqp.Dial(url) if err != nil { panic("connect error: " + err.Error()) } channel, _ := connection.Channel() args := amqp.Table{"x-max-priority": maxPriority} _, err = channel.QueueDeclare( queue, true, false, false, false, args, ) if err != nil { panic("queue declare error: " + err.Error()) } channel.Qos(1, 0, false) for i := 0; i < 100; i++ { var priority uint8 if i%2 == 0 { // 偶数优先级高 priority = 2 } else { priority = 1 } msg := amqp.Publishing{ DeliveryMode: amqp.Persistent, Timestamp: time.Now(), ContentType: "application/text", Body: []byte("msg: " + strconv.Itoa(i)), Priority: priority, } err = channel.Publish("", queue, false, false, msg) if err != nil { panic(err) } } channel.Close() connection.Close() }
consumer.go:
package main import ( "fmt" "github.com/streadway/amqp" "time" ) var ( url = "amqp://root:123456@10.0.0.13:5672" queue = "my-queue" maxPriority int32 = 10 ) func rabbitConsume(u, q string) { defer func() { if err := recover(); err != nil { time.Sleep(3 * time.Second) rabbitConsume(u, q) } }() connection, err := amqp.Dial(u) if err != nil { panic("connect error: " + err.Error()) } channel, _ := connection.Channel() args := amqp.Table{"x-max-priority": maxPriority} _, err = channel.QueueDeclare( q, true, false, false, false, args, ) if err != nil { panic("queue declare error: " + err.Error()) } channel.Qos(1, 0, false) closeChan := make(chan *amqp.Error, 1) notifyClose := channel.NotifyClose(closeChan) closeFlag := false msgs, err := channel.Consume( q, "", false, false, false, false, nil, ) if err != nil { panic("consume declare error: " + err.Error()) } for { select { case e := <-notifyClose: fmt.Println("channel 通道错误, error: ", e.Error()) close(closeChan) time.Sleep(3 * time.Second) rabbitConsume(u, q) closeFlag = true case delivery := <-msgs: // todo something fmt.Println(" [x] Received ", string(delivery.Body)) //time.Sleep(time.Second) fmt.Println(" [x] Done") // todo something delivery.Ack(false) // 发送响应,代表该消息已处理成功 } if closeFlag { break } } } func main() { // 开启 5 个协程去消费 for i := 0; i < 5; i++ { go rabbitConsume(url, queue) } select {} }
事务机制
producer.go:
package main import ( "github.com/streadway/amqp" "strconv" "time" ) var ( url = "amqp://root:123456@10.0.0.13:5672" queue = "my-queue1" ) func main() { connection, err := amqp.Dial(url) if err != nil { panic("connect error: " + err.Error()) } channel, _ := connection.Channel() _, err = channel.QueueDeclare( queue, false, false, false, false, nil, ) if err != nil { panic("queue declare error: " + err.Error()) } channel.Tx() for i := 0; i < 1000; i++ { msg := amqp.Publishing{ DeliveryMode: amqp.Persistent, Timestamp: time.Now(), ContentType: "application/text", Body: []byte("msg: " + strconv.Itoa(i)), } err = channel.Publish("", queue, false, false, msg) if err != nil { channel.TxRollback() panic(err) } if i == 500 { channel.TxRollback() panic("rollback due to panic.") } } channel.TxCommit() channel.Close() connection.Close() }
consumer.go:
package main import ( "fmt" "github.com/streadway/amqp" "time" ) var ( url = "amqp://root:123456@10.0.0.13:5672" queue = "my-queue1" ) func rabbitConsume(u, q string) { defer func() { if err := recover(); err != nil { time.Sleep(3 * time.Second) rabbitConsume(u, q) } }() connection, err := amqp.Dial(u) if err != nil { panic("connect error: " + err.Error()) } channel, _ := connection.Channel() _, err = channel.QueueDeclare( q, false, false, false, false, nil, ) if err != nil { panic("queue declare error: " + err.Error()) } closeChan := make(chan *amqp.Error, 1) notifyClose := channel.NotifyClose(closeChan) closeFlag := false msgs, err := channel.Consume( q, "", false, false, false, false, nil, ) if err != nil { panic("consume declare error: " + err.Error()) } for { select { case e := <-notifyClose: fmt.Println("channel 通道错误, error: ", e.Error()) close(closeChan) time.Sleep(3 * time.Second) rabbitConsume(u, q) closeFlag = true case delivery := <-msgs: // todo something fmt.Println(" [x] Received ", string(delivery.Body)) //time.Sleep(time.Second) fmt.Println(" [x] Done") // todo something delivery.Ack(false) // 告知 rabbitmq 服务器消息处理成功 //delivery.Nack(false,false) // 告知 rabbitmq 服务器消息处理失败 } if closeFlag { break } } } func main() { // 开启 5 个 goroutine 去消费 for i := 0; i < 5; i++ { go rabbitConsume(url, queue) } select {} }
confirm 机制
producer.go:
package main import ( "fmt" "github.com/streadway/amqp" "strconv" "time" ) var ( url = "amqp://root:123456@10.0.0.13:5672" queue = "my-queue1" ) func main() { connection, err := amqp.Dial(url) if err != nil { panic("connect error: " + err.Error()) } channel, _ := connection.Channel() channel.Confirm(false) _, err = channel.QueueDeclare( queue, false, false, false, false, nil, ) if err != nil { panic("queue declare error: " + err.Error()) } confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1)) go func() { for { select { case confirmed := <-confirms: if confirmed.Ack { // 发送成功的消息 fmt.Printf("ack, tag is %d\n", confirmed.DeliveryTag) } else { // 未发送成功的消息 fmt.Printf("nack, tag is %d\n", confirmed.DeliveryTag) } } } }() for i := 0; i < 1000; i++ { msg := amqp.Publishing{ DeliveryMode: amqp.Persistent, Timestamp: time.Now(), ContentType: "application/text", Body: []byte("msg: " + strconv.Itoa(i)), } err = channel.Publish("", queue, false, false, msg) if err != nil { fmt.Println("publish error: " + err.Error()) } } channel.Close() connection.Close() }
consumer.go(同【事务机制】-consumer.go)
延时队列(需要先在 RabbitMQ 服务端安装并启用延时消息插件 rabbitmq_delayed_message_exchange):
producer.go:
package main import ( "github.com/streadway/amqp" "time" ) var ( url = "amqp://root:123456@10.0.0.13:5672" exchange = "my-exchange" routingKey = "my-routingKey" queue = "my-queue2" delay = 3 ) func main() { connection, err := amqp.Dial(url) if err != nil { panic("connect error: " + err.Error()) } channel, _ := connection.Channel() args := amqp.Table{ "x-delayed-type": "direct", } err = channel.ExchangeDeclare( exchange, "x-delayed-message", true, false, false, false, args, ) if err != nil { panic("exchange declare error: " + err.Error()) } _, err = channel.QueueDeclare( queue, true, false, false, false, nil, ) if err != nil { panic("queue declare error: " + err.Error()) } err = channel.QueueBind( queue, routingKey, exchange, false, nil, ) if err != nil { panic("queue bind error: " + err.Error()) } headers := amqp.Table{ "x-delay": delay * 1000, } msg := amqp.Publishing{ DeliveryMode: amqp.Persistent, Timestamp: time.Now(), ContentType: "application/text", Body: []byte("delay msg from producer"), Headers: headers, } err = channel.Publish(exchange, routingKey, false, false, msg) if err != nil { panic(err) } channel.Close() connection.Close() }
consumer.go:
package main import ( "fmt" "github.com/streadway/amqp" "time" ) var ( url = "amqp://root:123456@10.0.0.13:5672" exchange = "my-exchange" routingKey = "my-routingKey" queue = "my-queue2" ) func rabbitConsume(u, q string) { defer func() { if err := recover(); err != nil { time.Sleep(3 * time.Second) rabbitConsume(u, q) } }() connection, err := amqp.Dial(u) if err != nil { panic("connect error: " + err.Error()) } channel, _ := connection.Channel() args := amqp.Table{ "x-delayed-type": "direct", } err = channel.ExchangeDeclare( exchange, "x-delayed-message", true, false, false, false, args, ) if err != nil { panic("exchange declare error: " + err.Error()) } _, err = channel.QueueDeclare( q, true, false, false, false, nil, ) if err != nil { panic("queue declare error: " + err.Error()) } err = channel.QueueBind( q, routingKey, exchange, false, nil, ) if err != nil { panic("queue bind error: " + err.Error()) } closeChan := make(chan *amqp.Error, 1) notifyClose := channel.NotifyClose(closeChan) closeFlag := false msgs, err := channel.Consume( q, "", false, false, false, false, nil, ) if err != nil { panic("consume declare error: " + err.Error()) } for { select { case e := <-notifyClose: fmt.Println("channel 通道错误, error: ", e.Error()) close(closeChan) time.Sleep(3 * time.Second) rabbitConsume(u, q) closeFlag = true case delivery := <-msgs: // todo something fmt.Println(" [x] Received ", string(delivery.Body)) //time.Sleep(time.Second) fmt.Println(" [x] Done") // todo something delivery.Ack(false) // 告知 rabbitmq 服务器消息处理成功 //delivery.Nack(false,false) // 告知 rabbitmq 服务器消息处理失败 } if closeFlag { break } } } func main() { // 开启 5 个 goroutine 去消费 for i := 0; i < 5; i++ { go rabbitConsume(url, queue) } select {} }
文档更新时间: 2024-04-18 16:35 作者:lee