1拉取4.8镜像
docker pull foxiswho/rocketmq:4.8.0
拉取控制台镜像
docker pull styletang/rocketmq-console-ng
2创建rocketmq使用的共有网络,便于相互访问
docker network create rocketmq_network
3启动rmqnamesrv
docker run -d --name rmqnamesrv --network rocketmq_network -e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" -p 9876:9876 foxiswho/rocketmq:4.8.0 sh mqnamesrv
–name rmqnamesrv:指定容器名称为rmqnamesrv,注意这个名字,后续会使用。
–network rocketmq_network:为容器指定网络为rocketmq_network,同一网络下的容器能够通过容器名称互通。
4启动rmqbroker
创建映射日志文件夹 赋予权限
mkdir -p /data/docker/rocketmq/logs
chmod 777 logs
创建映射配置文件/data/docker/rocketmq/conf/broker.conf 编辑文件内容 broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
mesrvAddr = 66.88.14.183:9876
brokerIP1 = 66.88.14.183
listenPort = 10911
启动rmqbroker
docker run -d --name rmqbroker --network rocketmq_network --privileged=true -v /data/docker/rocketmq/logs:/home/rocketmq/logs/rocketmqlogs -v /data/docker/rocketmq/conf/broker.conf:/home/rocketmq/rocketmq-4.8.0/conf/broker.conf -e "NAMESRV_ADDR=rmqnamesrv:9876" -e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" -p 10911:10911 -p 10912:10912 -p 10909:10909 foxiswho/rocketmq:4.8.0 sh mqbroker autoCreateTopicEnable=true -c /home/rocketmq/rocketmq-4.8.0/conf/broker.conf
–privileged=true:如果使用-v映射了目录,则使用该参数获取文件访问权限
(容器之间可以通过容器名称链接)
验证容器之间是否互通:# 进入broker容器
docker exec -it rmqbroker /bin/bash
ping name-server的容器名称
ping rmqnamesrv
5启动rmqconsole
docker run -d --name rmqconsole --network rocketmq_network --link rmqnamesrv:rmqnamesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8180:8080 -t styletang/rocketmq-console-ng
访问8180端口 出现下图搭建成功
如果失败 进去rmqbroker容器中 查看下日志 并且查询是否 ping rmqnamesrv 通容器
6 SpringBoot测试是否可用
引入pom依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
消费者代码
public class Consumer {
public static void main(String[] args) throws MQClientException {
/*消息接受*/
//1. 创建消息消费者, 指定消费者所属的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myproducer-group");
//2. 指定Nameserver地址
consumer.setNamesrvAddr("66.66.14.183:9876");
//3.指定消费者订阅的主题和标签
consumer.subscribe("broker-a", "myTag");
//4.设置回调函数,编写处理处理消息的方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : msgs) {
String s = new String(msg.getBody(), StandardCharsets.UTF_8);
System.out.println(s);
}
//返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者
consumer.start();
}
}
生产者代码
public class Provider {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
/*消息发送*/
//1. 创建消息生产者, 指定生产者所属的组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定Nameserver地址
producer.setNamesrvAddr("66.66.14.183:9876");
//3. 启动生产者
producer.start();
//4. 创建消息对象,指定主题、标签和消息体
String data = "holle word";
Message msg = new Message("broker-a", "myTag", data.getBytes());
//5. 发送消息
SendResult sendResult = producer.send(msg, 10000);
System.out.println(sendResult);
//6. 关闭生产者
producer.shutdown();
}
}
测试结果
最后吐槽一下 csdn太恶心l 所有的文章都是copy的