链接:https://pan.baidu.com/s/14ziQH62MeYmM8N6JsH5RcA
提取码:yyds
下载rocketmq-all-4.9.3-bin-release.zip
下载、修改配置
mkdir -p /app/rocketmq
cd /app/rocketmq
unzip rocketmq-all-4.9.3-bin-release.zip
cd rocketmq-4.9.3/
修改 配置文件:
broker.conf
文件内容
#所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = SYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = SYNC_FLUSH
#设置broker节点所在服务器的ip地址(**这个非常重要,主从模式下,从节点会根据主节点的brokerIP2来同步数据,如果不配置,主从无法同步,brokerIP1设置为自己外网能访问的ip,服务器双网卡情况下必须配置,比如阿里云这种,主节点需要配置ip1和ip2,从节点只需要配置ip1即可)
brokerIP1 = 192.168.111.101
#nameServer地址,分号分割
namesrvAddr=192.168.111.101:9876
#Broker 对外服务的监听端口
listenPort = 10911
#是否允许Broker自动创建Topic
autoCreateTopicEnable = true
#是否允许 Broker 自动创建订阅组
autoCreateSubscriptionGroup = true
#linux开启epoll
useEpollNativeSelector = true
# 这个是发送消息的线程数量,一般建议你配置成跟你的CPU核数一样
sendMessageThreadPoolNums=4
# 设置从节点可读
slaveReadEnable = true
#数据存放的根目录
storePathRootDir = /app/rocketmq/data
#commit log保存目录
storePathCommitLog = /app/rocketmq/data/commitlog
#消费队列存储路径存储路径
storePathConsumerQueue = /app/rocketmq/data/consumequeue
# 消息索引存储路径
storePathIndex= /app/rocketmq/data/index
# checkpoint文件路径
storeCheckpoint= /app/rocketmq/data/checkpoint
# abort文件存储路径
abortFile= /app/rocketmq/data/abort
启动NameServer
nohup sh bin/mqnamesrv &
查看 是否启动:
netstat -tanlp| grep 9876
启动 broker
进入bin 目录,修改 runbroker.sh
调整 各种JVM参数,然后启动
# 启动 mqbroker ,-n 指定NameServer地址
nohup sh bin/mqbroker -n 127.0.0.1:9876 &
tail -f nohup.out
成功启动:
启动控制台
JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.111.101:9876,指定 NameServer地址
docker run -d --name rocketmq-console --network rocketmq -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.111.101:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8000:8080 rocketmq-dashboard:v1.0
访问地址:http://192.168.111.101:8000/#/cluster
此时发现 已经启动成功!!!
测试
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>
生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
/**
1. 谁来发?
2. 发给谁?
3. 怎么发?
4. 发什么?
5. 发的结果是什么?
6. 打扫战场
**/
//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer ("group1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr ("192.168.111.101:9876");
//3.1启动发送的服务
producer.start ();
//4.创建要发送的消息对象,指定topic,指定内容body
Message msg = new Message ("topic1", "hello rocketmq".getBytes ("UTF-8"));
//3.2发送消息
SendResult result = producer.send (msg);
System.out.println ("返回结果:" + result);
//5.关闭连接
producer.shutdown ();
}
}
消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
/**
1. 谁来发?
2. 发给谁?
3. 怎么发?
4. 发什么?
5. 发的结果是什么?
6. 打扫战场
**/
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("group1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr ("192.168.111.101:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意
consumer.subscribe ("topic1", "*");
//3.开启监听,用于接收消息
consumer.registerMessageListener (new MessageListenerConcurrently () {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历消息
for (MessageExt msg : list) {
byte[] body = msg.getBody ();
System.out.println ("收到消息:" + new String (body));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start ();
System.out.println ("接受消息服务已经开启!");
//5 不要关闭消费者!
}
}