说在前头:阿里的rocketmq的文档是真他妈的烂的1b,很多东西都不说,全靠自己看源码,摸索,草,真的要吐血了
rocketmq的版本5而不是版本4,版本5比版本4多了个proxy
rocketmq5 三个组件:namesrv、broker、proxy,所以要启动这三个组件,clientsdk是和proxy通信,proxy和broker通信,
0:rocketmq用的jdk1.8
1:配置环境变量,可以直接在idea里配也可以在windows上配,不过windows上配可能要重启才起作用:
(!!!路径一定要用双斜杠,比如 a\\b\\c不能是a\b\c,因为他会解析成转义而不是路径,坑了我好久)
ROCKETMQ_HOME="D:\\work\\code\\rocketmq" #namesrv和broker使用的,就是我们的源码路径
RMQ_PROXY_HOME="D:\\work\\code\\rocketmq" #proxy使用的,proxy是新增的,所以这个环境变量也是新增的
2:创建配置文件夹
rocketmq默认使用的是ROCKETMQ_HOME\\distribution目录下的配置文件,我们不用,而是新建一个并在启动的时候指定
2.1:创建文件夹 ROCKETMQ_HOME\\conf
2.2:创建配置文件 ROCKETMQ_HOME\\conf\\namesrv.conf,ROCKETMQ_HOME\\conf\\broker.conf,ROCKETMQ_HOME\\conf\\rmq-proxy.json
broker.conf和namesrv.conf的命令可以随便取,内容也可以为空,直接用默认的,
proxy我偷懒,直接用代码默认的,默认的要求是RMQ_PROXY_HOME\\conf\\rmq-proxy.json这个文件即指定目录下的指定文件
rmq-proxy.json文件内容如下:
{
"rocketMQClusterName": "DefaultCluster",
"namesrvAddr": "127.0.0.1:9876"
}
broker.conf文件内容如下:
brokerClusterName = DefaultCluster #必须和rmq-proxy.json中的clusterName保持一致
brokerName = broker-a
namesrvAddr = 127.0.0.1:9876
storePathRootDir=D:\\work\\code\\rocketmq\\conf\\brokerstore #!!!我们手动创建就行,还有,路径名一定要双斜杠
storePathCommitLog=D:\\work\\code\\rocketmq\\conf\\brokerstore\\commitlog #!!!这个目录他会自动创建,还有,路径名一定要双斜杠
namesrv.conf文件内容如下:
listenPort=9876 #指定端口
(当然也可以啥也不填)
3:修改日志文件,以便会在控制台打印日志
只要修改每个子项目的resource目录下的rmq.xxx.logback.xml文件就行:
<configuration>
<root level="INFO">
<appender-ref ref="STDOUT"/> #!!!!只要把<root>标签内的ref的名字改成"STDOUT"就会输出到控制台了
</root>
</configuration>
4:启动namesrv/broker/proxy,命令行参数:
broker: -c D:\\work\\code\\rocketmq\\conf\\broker.conf #!!!路径一定要双斜杠,否则会启动失败,这个小错误卡了我好久。。
namesrv: -c D:\\work\\code\\rocketmq\\conf\\namesrv.conf
proxy: (空,可以不填,因为我们用的是默认的配置文件,只要配置RMQ_PROXY_HOME以及创建对应的rmq-proxy.json就行)
proxy: 启动会超级慢,需要三四分钟。。。真的太夸张了,暂不知道为啥
笔记1:namesrv默认端口9876,broker默认端口10911,proxy默认端口8081,默认集群名 DefaultCluster
5:用mqadmin源码来创建topic
5.1:修改代码。运行mqadmin创建topic前必须先修改源码中的timeout,
否则因为它连接需要耗时很长但是超时时间只有5s导致原本可以连接却因为超时而中断而topic创建失败
源码修改如下:
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
//TODO 恢复
timeoutMillis=25000; #!!!!!!在进入循环前设置timeoutMillis为25s,这样就不会超时了,只要改这里就可以了
if (channel != null && channel.isActive()) {
long left = timeoutMillis;
try {
long costTime = System.currentTimeMillis() - beginStartTime;
5.2: 运行mqadmin来创建topic,不要用mqadmin.cmd,这个创建不了,老是连不上,
而且broker设置autoTopicCreateEnable=true不起作用必须手动创建
mqadmin启动命令如下:
mqadmin对应的class为:org.apache.rocketmq.tools.command.MQAdminStartup
mqadmin: updateTopic -b 127.0.0.1:10911 -t testx -n 127.0.0.1:9876 #创建topic testx
client测试程序:
pom.xml
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>4.9.7</version>
</dependency>
</dependencies>
Producer代码:
package producer;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.io.IOException;
import java.time.Duration;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
String endpoint = "127.0.0.1:8081";
// 消息发送的目标Topic名称,需要提前创建。
String topic = "testx";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint)
.setRequestTimeout(Duration.ofSeconds(25));
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 普通消息发送。
String messageBody = "hello world";
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
// 设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
// 消息体。
.setBody(messageBody.getBytes())
.build();
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
for (int i = 0; i < 10000; i++) {
SendReceipt sendReceipt = producer.send(message);
System.out.println("Send message= {" + messageBody + "} successfully, messageId={" + sendReceipt.getMessageId() + "}");
int ch = System.in.read();
}
} catch (ClientException e) {
System.out.println("Failed to send message" + e.toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
// producer.close();
}
}
consumer程序:
package consumer;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoints = "localhost:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.setRequestTimeout(Duration.ofSeconds(25)) //!!!!这个超时时间一定要设置长一点,不然会导致连不上而报错
//!!!!搞了一天,真的吐血,还好今天搞完了,虽然搞完了,但还是得骂两句
//2024/11/18 22:48,又是加班暂调休的一天。。。。。。
.build();
// 订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 为消费者指定所属的消费者分组,Group需要提前创建。
String consumerGroup = "myconsumer";
// 指定需要订阅哪个目标Topic,Topic需要提前创建。
String topic = "testx";
// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者分组。
.setConsumerGroup(consumerGroup)
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(messageView -> {
// 处理消息并返回消费结果。
System.out.println("Consume message={" + messageView.getBody().toString() + "} successfully, messageId={" + messageView.getMessageId() + "}");
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可关闭该实例。
// pushConsumer.close();
}
}