RocketMQ 的搭建
1 ) 配置 docker-compose.yaml 文件
version: '3.5'
services:
rmqnamesrv:
image: foxiswho/rocketmq:server
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- ./logs:/opt/logs
- ./store:/opt/store
networks:
rmq:
aliases:
- rmqnamesrv
rmqbroker:
image: foxiswho/rocketmq:broker
container_name: rmqbroker
ports:
- "10909:10909"
- "10911:10911"
volumes:
- ./logs:/opt/logs
- ./store:/opt/store
- ./conf/broker.conf:/etc/rocketmq/broker.conf
environment:
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPTS: " -Duser.home=/opt"
JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
command: mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
ports:
- "8080:8080"
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqconsole
networks:
rmq:
name: rmq
driver: bridge
2 ) 配置文件 conf/broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
brokerIP1=192.168.124.6
defaultTopicQueueNums = 4
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
listenPort = 10911
deleteWhen = 04
fileReservedTime = 120
mapedfileSizeCommitLog = 1073741824
mapedfileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio = 88
maxMessageSize=65536
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
- 注意,需要指定
brokerIP1
且不能使用 0.0.0.0 也不能不指定,否则无法通信 fileReservedTime
默认是 48h
3 ) 拉取镜像
- $
docker pull foxiswho/rocketmq:server
- $
docker pull foxiswho/rocketmq:broker
- $
docker pull styletang/rocketmq-console-ng
4 )启动和检查
- 启动 $
docker compose up -d
- 检查状态 $
docker compose ps
打开 UI 界面验证
- 访问:http://127.0.0.1:8080
上面这个就是和上面的 brokerIP1
对应
编写程序验证生产和消费消息
- 现在简述下场景
- 生产5条消息
- 10s 后进行消费
代码实现
package main
import (
"context"
"fmt"
"log"
"os"
"strconv"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
const groupName = "BBS_SHOP_GROUP_123"
func GetMqAddr() string {
mqAddr := "127.0.0.1:9876" // 这里填入你的 NameServer 端口
return mqAddr
}
func ProduceMsg(mqAddr string, topic string) {
p, err := rocketmq.NewProducer( // 普通消息生产者
producer.WithGroupName(groupName),
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),
producer.WithRetry(2),
)
if err != nil {
panic(err)
}
err = p.Start()
if err != nil {
log.Fatal()
fmt.Println("生产者错误: %v", err.Error())
os.Exit(1)
}
for i := 0; i < 5; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello XProjectOrder " + strconv.Itoa(i)),
}
msg.WithDelayTimeLevel(3)
r, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Println("发送消息错误: %v", err.Error())
} else {
fmt.Println("生产消息成功: " + r.String() + "-" + r.MsgID)
}
}
err = p.Shutdown()
if err != nil {
fmt.Println("生产者shutdown: %v", err.Error())
os.Exit(1)
}
}
func ComsumeMsg(mqAddr string, topic string) {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName(groupName),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),
)
if err != nil {
panic(err)
}
err = c.Subscribe(topic, consumer.MessageSelector{},
func(ctx context.Context, msgList ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgList {
fmt.Printf("订阅消息,消费%v \n", msgList[i])
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println("消费消息错误: %v", err.Error())
}
err = c.Start()
if err != nil {
fmt.Println("开启消费这错误: %v", err.Error())
}
time.Sleep(time.Hour)
err = c.Shutdown()
if err != nil {
fmt.Println("shutdown消费者错误: %v", err.Error())
}
}
func main() {
topic := "BBS_SHOP_TOPIC_123"
mqAddr := GetMqAddr()
ProduceMsg(mqAddr, topic)
ComsumeMsg(mqAddr, topic)
}
-
以上是一个 demo,在真实场景,自行进行封装处理
-
这里定义了一个 主题
BBS_SHOP_TOPIC_123
和 一个订阅组BBS_SHOP_GROUP_123
-
查看生产消息输出
生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80001, offsetMsgId=C0A87C0600002A9F0000000000000000, queueOffset=0, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]]-C0A87C06209F0000000017eef7e80001 生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80002, offsetMsgId=C0A87C0600002A9F00000000000000DE, queueOffset=1, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=2]]-C0A87C06209F0000000017eef7e80002 生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80003, offsetMsgId=C0A87C0600002A9F00000000000001BC, queueOffset=2, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=3]]-C0A87C06209F0000000017eef7e80003 生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80004, offsetMsgId=C0A87C0600002A9F000000000000029A, queueOffset=3, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=0]]-C0A87C06209F0000000017eef7e80004 生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80005, offsetMsgId=C0A87C0600002A9F0000000000000378, queueOffset=4, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]]-C0A87C06209F0000000017eef7e80005
-
查看消费 (10s 之后)
订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 0, Flag=0, properties=map[CONSUME_START_TIME:1717572747702 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:1 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80001], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80001, OffsetMsgId=C0A87C0600002A9F0000000000000456,QueueId=1, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737685, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747694, StoreHost=192.168.124.6:10911, CommitLogOffset=1110, BodyCRC=407480418, ReconsumeTimes=0, PreparedTransactionOffset=0] 订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 1, Flag=0, properties=map[CONSUME_START_TIME:1717572747706 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:2 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80002], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80002, OffsetMsgId=C0A87C0600002A9F0000000000000533,QueueId=2, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737698, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747701, StoreHost=192.168.124.6:10911, CommitLogOffset=1331, BodyCRC=1867421940, ReconsumeTimes=0, PreparedTransactionOffset=0] 订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 2, Flag=0, properties=map[CONSUME_START_TIME:1717572747710 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:3 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80003], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80003, OffsetMsgId=C0A87C0600002A9F0000000000000610,QueueId=3, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737702, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747704, StoreHost=192.168.124.6:10911, CommitLogOffset=1552, BodyCRC=1984416078, ReconsumeTimes=0, PreparedTransactionOffset=0] INFO[0010] update offset to broker success MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=0]" consumerGroup=BBS_SHOP_GROUP_123 offset=0 订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 3, Flag=0, properties=map[CONSUME_START_TIME:1717572747713 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:0 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80004], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80004, OffsetMsgId=C0A87C0600002A9F00000000000006ED,QueueId=0, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737706, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747709, StoreHost=192.168.124.6:10911, CommitLogOffset=1773, BodyCRC=21035480, ReconsumeTimes=0, PreparedTransactionOffset=0] INFO[0010] update offset to broker success MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]" consumerGroup=BBS_SHOP_GROUP_123 offset=1 INFO[0010] update offset to broker success MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=2]" consumerGroup=BBS_SHOP_GROUP_123 offset=1 INFO[0010] update offset to broker success MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=3]" consumerGroup=BBS_SHOP_GROUP_123 offset=1 订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 4, Flag=0, properties=map[CONSUME_START_TIME:1717572747717 DELAY:3 MAX_OFFSET:2 MIN_OFFSET:0 REAL_QID:1 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80005], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80005, OffsetMsgId=C0A87C0600002A9F00000000000007CA,QueueId=1, StoreSize=221, QueueOffset=1, SysFlag=0, BornTimestamp=1717572737709, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747711, StoreHost=192.168.124.6:10911, CommitLogOffset=1994, BodyCRC=522480763, ReconsumeTimes=0, PreparedTransactionOffset=0]
-
以上就是生产和消费的主要过程
效果
总结
- 以上是简单的环境搭建和生产消息,以及延迟消费消息的 demo 示例
- 实际场景中,结合以上demo,对一些异步发送消息的场景进行灵活运用和升级