rabbitmq的三种队列以及使用方式(beego)
创新互联建站坚持“要么做到,要么别承诺”的工作理念,服务领域包括:做网站、网站设计、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的博白网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!提示:最上面右调用的统一调用,最下面有消费的代码注:消费需根据条件修改并另起一个mian.go
面试官问这个问题,肯定是想知道你们公司有一个什么场景需要使用到这个Mq,这个场景有一个什么技术挑战导致必须要用这个mq,用了这个mq之后有什么好处。mq经典的使用场景有解耦,异步,削锋。
而rabbitmq是如何进行使用的他的使用发放是什么呢?
提示:先是统一的引入跟实例化
import (
"bytes"
"fmt"
"github.com/streadway/amqp"
)
type Callback func(str1 string)
//Connect RabbitMQ连接函数
func Connect() (conn *amqp.Connection, err error) {//连接mq
conn, err = amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
return conn, err
}
func BytesToString(b *[]byte) *string {s := bytes.NewBuffer(*b)
r := s.String()
return &r
}
一、rabbitmq的普通队列(路由模式)
1.生成者示例:这是最普通的rabbitmq的生成者
//Publish 发送端函数
//exchange交换机名称
//queueName队列名称
//body发送内容
func Publish(exchange string, queueName string, body string) error {//建立连接
conn, err := Connect()
if err != nil {return err
}
defer conn.Close()
//创建通道channel
channel, err := conn.Channel()
if err != nil {return err
}
defer channel.Close()
//创建队列
q, err := channel.QueueDeclare(
queueName, //队列名称
true, //持久化
false,
false,
false,
nil,
)
if err != nil {return err
}
//发送消息
err = channel.Publish(exchange, q.Name, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
return err
}
2.消费者实例:这是普通队列的消费
//Consumer 接受方法
func Consumer(exchange string, queueName string, callback Callback) {//建立连接
conn, err := Connect()
defer conn.Close()
if err != nil {fmt.Println(err)
return
}
//创建通道channel
channel, err := conn.Channel()
defer channel.Close()
if err != nil {fmt.Println(err)
return
}
//创建queue
q, err := channel.QueueDeclare(
queueName,
true,
false,
false,
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
//输出
msgs, err := channel.Consume(
q.Name,
"",
false, //手动应答
false,
false,
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
forever := make(chan bool)
go func() {for d := range msgs { s := BytesToString(&(d.Body))
callback(*s)
d.Ack(false)
}
}()
fmt.Printf("Waiting for messages")
<-forever
}
func BytesToString(b *[]byte) *string {s := bytes.NewBuffer(*b)
r := s.String()
return &r
}
func callback(s string) {fmt.Printf("msg:%s", s)
return
}
二、rabbitmq的并发队列(主题模式)
1.生产者代码如下(示例):
func PublishEx(exchange string, types string, routingKey string, body string) error {//建立连接
conn, err := Connect()
defer conn.Close()
if err != nil {return err
}
//创建channel
channel, err := conn.Channel()
defer channel.Close()
if err != nil {return err
}
//创建交换机
err = channel.ExchangeDeclare(
exchange,
types,
true,
false,
false,
false,
nil,
)
if err != nil {return err
}
err = channel.Publish(exchange, routingKey, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
return err
}
2.消费者代码如下(示例):
func ConsumerEx(exchange string, types string, routingKey string, callback Callback) {//建立连接
conn, err := Connect()
defer conn.Close()
if err != nil {fmt.Println(err)
return
}
//创建通道channel
channel, err := conn.Channel()
defer channel.Close()
if err != nil {fmt.Println(err)
return
}
//创建交换机
err = channel.ExchangeDeclare(
exchange,
types,
true,
false,
false,
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
//创建队列
q, err := channel.QueueDeclare(
"",
false,
false,
true,
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
//绑定
err = channel.QueueBind(
q.Name,
routingKey,
exchange,
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
msgs, err := channel.Consume(q.Name, "", false, false, false, false, nil)
if err != nil {fmt.Println(err)
return
}
forever := make(chan bool)
go func() {for d := range msgs { s := BytesToString(&(d.Body))
callback(*s)
d.Ack(false)
}
}()
fmt.Printf("Waiting for messages\n")
<-forever
}
三、rabbitmq的双队列(死信队列)
1.生产者代码如下(示例):
func PublishDlx(exchangeA string, body string) error {//建立连接
conn, err := Connect()
if err != nil {return err
}
defer conn.Close()
//创建一个Channel
channel, err := conn.Channel()
if err != nil {return err
}
defer channel.Close()
//消息发送到A交换机
err = channel.Publish(exchangeA, "", false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
return err
}
2.消费者代码如下(示例):
func ConsumerDlx(exchangeA string, queueAName string, exchangeB string, queueBName string, ttl int, callback Callback) {//建立连接
conn, err := Connect()
if err != nil {fmt.Println(err)
return
}
defer conn.Close()
//创建一个Channel
channel, err := conn.Channel()
if err != nil {fmt.Println(err)
return
}
defer channel.Close()
//创建A交换机
//创建A队列
//A交换机和A队列绑定
err = channel.ExchangeDeclare(
exchangeA, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {fmt.Println(err)
return
}
//创建一个queue,指定消息过期时间,并且绑定过期以后发送到那个交换机
queueA, err := channel.QueueDeclare(
queueAName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
amqp.Table{ // 当消息过期时把消息发送到 exchangeB
"x-dead-letter-exchange": exchangeB,
"x-message-ttl": ttl,
//"x-dead-letter-queue" : queueBName,
//"x-dead-letter-routing-key" :
},
)
if err != nil {fmt.Println(err)
return
}
//A交换机和A队列绑定
err = channel.QueueBind(
queueA.Name, // queue name
"", // routing key
exchangeA, // exchange
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
//创建B交换机
//创建B队列
//B交换机和B队列绑定
err = channel.ExchangeDeclare(
exchangeB, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {fmt.Println(err)
return
}
//创建一个queue
queueB, err := channel.QueueDeclare(
queueBName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {fmt.Println(err)
return
}
//B交换机和B队列绑定
err = channel.QueueBind(
queueB.Name, // queue name
"", // routing key
exchangeB, // exchange
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
msgs, err := channel.Consume(queueB.Name, "", false, false, false, false, nil)
if err != nil {fmt.Println(err)
return
}
forever := make(chan bool)
go func() {for d := range msgs { s := BytesToString(&(d.Body))
callback(*s)
d.Ack(false)
}
}()
fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
<-forever
}
剩下的统一调用消费者模板
package main
import (
"encoding/json"
"fmt"
"github.com/beego/beego/v2/client/orm"
beego "github.com/beego/beego/v2/server/web"
"github.com/garyburd/redigo/redis"
"goApi/models"
_ "goApi/routers"
redisClient "goApi/services"
"goApi/services/mq"
"strconv"
)
func main() {beego.LoadAppConfig("ini", "../../conf/app.conf")
//err := orm.RegisterDataBase("default", "mysql", "fukw:ipx4JtpXR6sCxmKt@tcp(127.0.0.1)/fukw?charset=utf8")
err := orm.RegisterDataBase("default", "mysql", "root:root@tcp(127.0.0.1)/fukw?charset=utf8")
if err != nil { fmt.Println("连接数据库失败")
}
c, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil { fmt.Println("redis连接失败")
}
defer c.Close()
mq.Consumer("", "fyouku_top", callback)
fmt.Printf("mian执行成功")
beego.Run()
}
func callback(s string) {type Data struct { VideoId int
}
var data Data
err := json.Unmarshal([]byte(s), &data)
videoInfo, err := models.GetVideoInfo(data.VideoId)
if err == nil { conn := redisClient.RedisConnect()
defer conn.Close()
//更新排行榜
//执行的代码我这里是排行榜
redisChannelKey := "video:top:channel:channelId:" + strconv.Itoa(videoInfo.ChannelId)
redisTypeKey := "video:top:type:typeId:" + strconv.Itoa(videoInfo.TypeId)
conn.Do("zincrby", redisChannelKey, 1, data.VideoId)
conn.Do("zincrby", redisTypeKey, 1, data.VideoId)
}
fmt.Printf("msg is :%s\n", s)
}
这就是rabbitmq的3种队列的的书写形式
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧