Go操作各大消息队列教程
1 RabbitMQ
1.1 概念
①基本名词
当前市面上mq的产品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐献给Apache的RocketMQ。甚至连redis这种NoSQL都支持MQ的功能。
- Broker:表示消息队列服务实体
- Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
- AMQP(Advanced Message Queuing Protocol)高级消息队列协议
- Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
- Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
②常见模式
1. simple简单模式
消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)
2. worker工作模式
多个消费者从一个队列中争抢消息
- (隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)
- 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)
3. publish/subscribe发布订阅(共享资源)
消费者订阅消息,然后从订阅的队列中获取消息进行消费。
- X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
- 相关场景:邮件群发,群聊天,广播(广告)
4. routing路由模式
- 交换机根据路由规则,将消息路由到不同的队列中
- 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
5. topic主题模式(路由模式的一种)
- 星号井号代表通配符
- 星号代表多个单词,井号代表一个单词
- 路由功能添加模糊匹配
- 消息产生者产生消息,把消息交给交换机
- 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
1.2 搭建(docker方式)
①拉取镜像
# 拉取镜像
docker pull rabbitmq:3.7-management
②创建并启动容器
# 创建并运行容器
docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7-management
#5672是项目中连接rabbitmq的端口(我这里映射的是5672),15672是rabbitmq的web管理界面端口(我映射为15672)
# 输入网址http://ip:15672即可进入rabbitmq的web管理页面,账户密码:guest / guest
③web界面创建用户和virtual host
下面为了我们后续的操作,首先我们新建一个Virtual Host并且给他分配一个用户名,用来隔离数据,根据自己需要自行创建
- 新增virtual host
- 新增用户
- 点击新建好的用户,设置其host
- 最终效果
1.3 代码操作
①RabbitMQ struct:包含创建、消费、生产消息
package RabbitMQ
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
//amqp:// 账号 密码@地址:端口号/vhost
const MQURL = "amqp://ziyi:ziyi@10.253.50.145:5672/ziyi"
type RabbitMQ struct {
//连接
conn *amqp.Connection
//管道
channel *amqp.Channel
//队列名称
QueueName string
//交换机
Exchange string
//key Simple模式 几乎用不到
Key string
//连接信息
Mqurl string
}
//创建RabbitMQ结构体实例
func NewRabbitMQ(queuename string, exchange string, key string) *RabbitMQ {
rabbitmq := &RabbitMQ{QueueName: queuename, Exchange: exchange, Key: key, Mqurl: MQURL}
var err error
//创建rabbitmq连接
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "创建连接错误!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "获取channel失败")
return rabbitmq
}
//断开channel和connection
func (r *RabbitMQ) Destory() {
r.channel.Close()
r.conn.Close()
}
//错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
}
}
//简单模式step:1。创建简单模式下RabbitMQ实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
return NewRabbitMQ(queueName, "", "")
}
//订阅模式创建rabbitmq实例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
//创建rabbitmq实例
rabbitmq := NewRabbitMQ("", exchangeName, "")
var err error
//获取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connecct rabbitmq!")
//获取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel!")
return rabbitmq
}
//订阅模式生成
func (r *RabbitMQ) PublishPub(message string) {
//尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
//交换机名称
r.Exchange,
//交换机类型 广播类型
"fanout",
//是否持久化
true,
//是否字段删除
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
//是否阻塞 true表示要等待服务器的响应
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//2 发送消息
err = r.channel.Publish(
r.Exchange,
"",
false,
false,
amqp.Publishing{
//类型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
//订阅模式消费端代码
func (r *RabbitMQ) RecieveSub() {
//尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
//交换机名称
r.Exchange,
//交换机类型 广播类型
"fanout",
//是否持久化
true,
//是否字段删除
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
//是否阻塞 true表示要等待服务器的响应
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//2试探性创建队列,创建队列
q, err := r.channel.QueueDeclare(
"", //随机生产队列名称
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//绑定队列到exchange中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,这里的key要为空
"",
r.Exchange,
false,
nil,
)
//消费消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message:%s,", d.Body)
}
}()
fmt.Println("退出请按 Ctrl+C")
<-forever
}
//话题模式 创建RabbitMQ实例
func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ {
//创建rabbitmq实例
rabbitmq := NewRabbitMQ("", exchagne, routingKey)
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbingmq!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
//话题模式发送信息
func (r *RabbitMQ) PublishTopic(message string) {
//尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
//交换机名称
r.Exchange,
//交换机类型 话题模式
"topic",
//是否持久化
true,
//是否字段删除
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
//是否阻塞 true表示要等待服务器的响应
false,
nil,
)
r.failOnErr(err, "topic failed to declare an excha"+"nge")
//2发送信息
err = r.channel.Publish(
r.Exchange,
//要设置
r.Key,
false,
false,
amqp.Publishing{
//类型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
//话题模式接收信息
//要注意key
//其中* 用于匹配一个单词,#用于匹配多个单词(可以是零个)
//匹配 表示匹配imooc.* 表示匹配imooc.hello,但是imooc.hello.one需要用imooc.#才能匹配到
func (r *RabbitMQ) RecieveTopic() {
//尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
//交换机名称
r.Exchange,
//交换机类型 话题模式
"topic",
//是否持久化
true,
//是否字段删除
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
//是否阻塞 true表示要等待服务器的响应
false,
nil,
)
r.failOnErr(err, "failed to declare an exchange")
//2试探性创建队列,创建队列
q, err := r.channel.QueueDeclare(
"", //随机生产队列名称
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//绑定队列到exchange中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,这里的key要为空
r.Key,
r.Exchange,
false,
nil,
)
//消费消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message:%s,", d.Body)
}
}()
fmt.Println("退出请按 Ctrl+C")
<-forever
}
//路由模式 创建RabbitMQ实例
func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ {
//创建rabbitmq实例
rabbitmq := NewRabbitMQ("", exchagne, routingKey)
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbingmq!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
//路由模式发送信息
func (r *RabbitMQ) PublishRouting(message string) {
//尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
//交换机名称
r.Exchange,
//交换机类型 广播类型
"direct",
//是否持久化
true,
//是否字段删除
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
//是否阻塞 true表示要等待服务器的响应
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//发送信息
err = r.channel.Publish(
r.Exchange,
//要设置
r.Key,
false,
false,
amqp.Publishing{
//类型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
//路由模式接收信息
func (r *RabbitMQ) RecieveRouting() {
//尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
//交换机名称
r.Exchange,
//交换机类型 广播类型
"direct",
//是否持久化
true,
//是否字段删除
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
//是否阻塞 true表示要等待服务器的响应
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//2试探性创建队列,创建队列
q, err := r.channel.QueueDeclare(
"", //随机生产队列名称
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//绑定队列到exchange中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,这里的key要为空
r.Key,
r.Exchange,
false,
nil,
)
//消费消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message:%s,", d.Body)
}
}()
fmt.Println("退出请按 Ctrl+C")
<-forever
}
//简单模式Step:2、简单模式下生产代码
func (r *RabbitMQ) PublishSimple(message string) {
//1、申请队列,如果队列存在就跳过,不存在创建
//优点:保证队列存在,消息能发送到队列中
_, err := r.channel.QueueDeclare(
//队列名称
r.QueueName,
//是否持久化
false,
//是否为自动删除 当最后一个消费者断开连接之后,是否把消息从队列中删除
false,
//是否具有排他性 true表示自己可见 其他用户不能访问
false,
//是否阻塞 true表示要等待服务器的响应
false,
//额外数据
nil,
)
if err != nil {
fmt.Println(err)
}
//2.发送消息到队列中
r.channel.Publish(
//默认的Exchange交换机是default,类型是direct直接类型
r.Exchange,
//要赋值的队列名称
r.QueueName,
//如果为true,根据exchange类型和routkey规则,如果无法找到符合条件的队列那么会把发送的消息返回给发送者
false,
//如果为true,当exchange发送消息到队列后发现队列上没有绑定消费者,则会把消息还给发送者
false,
//消息
amqp.Publishing{
//类型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
func (r *RabbitMQ) ConsumeSimple() {
//1、申请队列,如果队列存在就跳过,不存在创建
//优点:保证队列存在,消息能发送到队列中
_, err := r.channel.QueueDeclare(
//队列名称
r.QueueName,
//是否持久化
false,
//是否为自动删除 当最后一个消费者断开连接之后,是否把消息从队列中删除
false,
//是否具有排他性
false,
//是否阻塞
false,
//额外数据
nil,
)
if err != nil {
fmt.Println(err)
}
//接收消息
msgs, err := r.channel.Consume(
r.QueueName,
//用来区分多个消费者
"",
//是否自动应答
true,
//是否具有排他性
false,
//如果设置为true,表示不能同一个connection中发送的消息传递给这个connection中的消费者
false,
//队列是否阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//启用协程处理
go func() {
for d := range msgs {
//实现我们要处理的逻辑函数
log.Printf("Received a message:%s", d.Body)
//fmt.Println(d.Body)
}
}()
log.Printf("【*】warting for messages, To exit press CCTRAL+C")
<-forever
}
func (r *RabbitMQ) ConsumeWorker(consumerName string) {
//1、申请队列,如果队列存在就跳过,不存在创建
//优点:保证队列存在,消息能发送到队列中
_, err := r.channel.QueueDeclare(
//队列名称
r.QueueName,
//是否持久化
false,
//是否为自动删除 当最后一个消费者断开连接之后,是否把消息从队列中删除
false,
//是否具有排他性
false,
//是否阻塞
false,
//额外数据
nil,
)
if err != nil {
fmt.Println(err)
}
//接收消息
msgs, err := r.channel.Consume(
r.QueueName,
//用来区分多个消费者
consumerName,
//是否自动应答
true,
//是否具有排他性
false,
//如果设置为true,表示不能同一个connection中发送的消息传递给这个connection中的消费者
false,
//队列是否阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//启用协程处理
go func() {
for d := range msgs {
//实现我们要处理的逻辑函数
log.Printf("%s Received a message:%s", consumerName, d.Body)
//fmt.Println(d.Body)
}
}()
log.Printf("【*】warting for messages, To exit press CCTRAL+C")
<-forever
}
②测试代码
1. simple简单模式
consumer.go
func main() {
//消费者
rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")
rabbitmq.ConsumeSimple()
}
producer.go
func main() {
//Simple模式 生产者
rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")
for i := 0; i < 5; i++ {
time.Sleep(time.Second * 2)
rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))
}
}
2. worker模式
consumer.go
func main() {
/*
worker模式无非就是多个消费者去同一个队列中消费消息
*/
//消费者1
rabbitmq1 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
go rabbitmq1.ConsumeWorker("consumer1")
//消费者2
rabbitmq2 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
rabbitmq2.ConsumeWorker("consumer2")
}
producer.go
func main() {
//Worker模式 生产者
rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
for i := 0; i < 100; i++ {
//time.Sleep(time.Second * 2)
rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))
}
}
3. publish/subscribe模式
consumer.go:
func main() {
//消费者
rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
rabbitmq.RecieveSub()
}
producer.go
func main() {
//订阅模式发送者
rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
for i := 0; i <= 20; i++ {
rabbitmq.PublishPub("订阅模式生产第" + strconv.Itoa(i) + "条数据")
fmt.Println(i)
time.Sleep(1 * time.Second)
}
}
4. router模式
consumer.go
func main() {
//消费者
rabbitmq := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")
rabbitmq.RecieveRouting()
}
producer.go
func main() {
//路由模式生产者
imoocOne := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")
imoocTwo := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_two")
for i := 0; i <= 10; i++ {
imoocOne.PublishRouting("hello imooc one!" + strconv.Itoa(i))
imoocTwo.PublishRouting("hello imooc two!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
5. topic模式
consumer.go
func main() {
/*
星号井号代表通配符
星号代表多个单词,井号代表一个单词
路由功能添加模糊匹配
消息产生者产生消息,把消息交给交换机
交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
*/
//Topic消费者
//rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "#") //匹配所有的key:topic88和topic99
rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") //只匹配topic88的
rabbitmq.RecieveTopic()
}
producer.go
func main() {
//Topic模式生产者
imoocOne := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three")
imoocTwo := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic99.four")
for i := 0; i <= 10; i++ {
imoocOne.PublishTopic("hello imooc topic three!" + strconv.Itoa(i))
imoocTwo.PublishTopic("hello imooc topic four!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
2 Kafka
2.1 基本概念
Kafka是分布式的,其所有的构件borker(server服务端集群)、producer(消息生产)、consumer(消息消费者)都可以是分布式的。
producer给broker发送数据,这些消息会存到kafka server里,然后consumer再向kafka server发起请求去消费这些数据。
kafka server在这个过程中像是一个帮你保管数据的中间商。所以kafka服务器也可以叫做broker(broker直接翻译可以是中间人或者经纪人的意思)。
在消息的生产时可以使用一个标识topic来区分,且可以进行分区;每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。
同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡
参考:https://blog.csdn.net/lingfy1234/article/details/122900348
- 应用场景
- 监控
- 消息队列
- 流处理
- 日志聚合
- 持久性日志
- 基础概念
- topic:话题
- broker:kafka服务集群,已发布的消息保存在一组服务器中,称之为kafka集群。集群中的每一个服务器都是一个代理(broker)
- partition:分区,topic物理上的分组
- message:消息,每个producer可以向一个topic主题发布一些消息
1.⽣产者从Kafka集群获取分区leader信息
2.⽣产者将消息发送给leader
3.leader将消息写入本地磁盘
4.follower从leader拉取消息数据
5.follower将消息写入本地磁盘后向leader发送ACK
6.leader收到所有的follower的ACK之后向生产者发送ACK
2.2 常见模式
①点对点模式:火车站出租车抢客
发送者将消息发送到消息队列中,消费者去消费,如果消费者有多个,他们会竞争地消费,也就是说对于某一条消息,只有一个消费者能“抢“到它。类似于火车站门口的出租车抢客的场景。
②发布订阅模式:组间无竞争,组内有竞争
消费者订阅对应的topic(主题),只有订阅了对应topic消费者的才会接收到消息。
例如:
- 牛奶有很多种,光明牛奶,希望牛奶等,只有你订阅了光明牛奶,送奶工才会把光明牛奶送到对应位置,你也才会有机会消费这个牛奶
注意
:为了提高消费者的消费能力,kafka中引入了消费者组的概念。相当于是:不同消费者组之间因为订阅的topic不同,不会有竞争关系。但是消费者组内是有竞争关系。
例如:
- 成都、厦门的出租车司机分别组成各自的消费者组。
- 成都的出租车司机只拉成都的人,厦门的只拉厦门的人。(因此他们两个消费者组不是竞争关系)
- 成都市内的出租车司机之间是竞争关系。(消费者组内是竞争关系)
2.3 docker-compose部署
vim docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:6.2.0
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#KAFKA_ADVERTISED_LISTENERS后面改为自己本地宿主机的ip,例如我本地mac的ip为192.168.0.101
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.101:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
# 进入到docker-compose.yml所在目录,执行下面命令
docker-compose up -d
# 查看部署结果,状态为up表明部署成功
docker-compose ps
2.4 代码操作
# 1. 创建对应topic
docker-compose exec kafka kafka-topics --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server 192.168.0.101:9092
# 2. 查看topic列表
docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181
①producer.go
package main
import (
"fmt"
"github.com/IBM/sarama"
)
// 基于sarama第三方库开发的kafka client
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
// 构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder("this is a test log")
// 连接kafka
client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
②consumer.go
package main
import (
"fmt"
"github.com/IBM/sarama"
)
// kafka consumer
func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err)
return
}
fmt.Println(partitionList)
for partition := range partitionList { // 遍历所有的分区
// 针对每个分区创建一个对应的分区消费者
pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
return
}
defer pc.AsyncClose()
// 异步从每个分区消费信息
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
}
}(pc)
}
//演示时使用
select {}
}