目录
- 一、准备工作
- 二、代码实现
- 1.添加依赖
- 2.创建一个常量类存放公共参数
- 3.调用HTTP协议的SDK 发送普通消息
- 4.调用HTTP协议的SDK 订阅普通消息
- 三、配置main的日志输出级别
- 四、测试效果
- 五、完成代码
一、准备工作
登录阿里云官网,先申请rocketMQ,再申请Topic、Group ID,然后就是参考阿里云的JAVA SDK进行编程实现。
环境要求:
安装JDK 1.6或以上版本
安装Maven
安装Java SDK
参照 阿里云 官方文档,来一步一步操作。
文档提供的SDK有TCP和Http协议
,这里使用HTTP协议来实现rocketMQ消息的发送与消费。
二、代码实现
调用HTTP协议的SDK收发普通消息
1.添加依赖
创建Springboot项目,添加 SDK依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--aliyun mq sdk-->
<dependency>
<groupId>com.aliyun.mq</groupId>
<artifactId>mq-http-sdk</artifactId>
<version>1.0.2</version>
</dependency>
注意:aliyun mq sdk的版本信息,请参见版本说明
2.创建一个常量类存放公共参数
package com.example.rocketdemo.config;
/**
* @author qzz
*/
public class MqConfigParams {
/**
* 你的topic
*/
public static final String TOPIC = "你的topic";
/**
* 消息标签 *:代表全部
*/
public static final String TAG = "你的tag";
/**
* 你的Group_ID
*/
public static final String GROUP_ID = "你的Group_ID";
/**
*你的accessKey
*/
public static final String ACCESS_KEY = "你的accessKey";
/**
*你的secretKey
*/
public static final String SECRET_KEY = "你的secretKey";
/**
* 实例ID
*/
public static final String INSTANCE_ID = "你的实例ID";
/**
*设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看
*/
public static final String HTTP_ENDPOINT = "http接入点地址";
}
3.调用HTTP协议的SDK 发送普通消息
package com.example.rocketdemo.util;
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import com.example.rocketdemo.config.MqConfigParams;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
/**
* 生产 阿里云 RocketMQ 消息
* @author qzz
*/
@Slf4j
public class AliyunMessageProducerTest {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
MqConfigParams.HTTP_ENDPOINT,
// AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
MqConfigParams.ACCESS_KEY,
// AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。
MqConfigParams.SECRET_KEY
);
// 消息所属的Topic,在消息队列RocketMQ版控制台创建。
// 不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。
final String topic = MqConfigParams.TOPIC;
// Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
// 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
final String instanceId = MqConfigParams.INSTANCE_ID;
// 获取Topic的生产者。
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
// 循环发送2条消息。
for (int i = 0; i < 2; i++) {
TopicMessage pubMsg; // 普通消息。
pubMsg = new TopicMessage(
// 消息内容。
"hello mq 111!".getBytes(),
// 消息标签。
MqConfigParams.TAG
);
// 设置消息的自定义属性。
pubMsg.getProperties().put("a", String.valueOf(i));
// 设置消息的Key。
pubMsg.setMessageKey("MessageKey");
// 同步发送消息,只要不抛异常就是成功。
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
// 同步发送消息,只要不抛异常就是成功。
System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
e.printStackTrace();
}
mqClient.close();
}
}
4.调用HTTP协议的SDK 订阅普通消息
订阅普通消息的代码如下:
package com.example.rocketdemo.util;
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import com.example.rocketdemo.config.MqConfigParams;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
/**
* 消费 阿里云 RocketMQ 消息
* @author qzz
*/
@Slf4j
public class AliyunMessageConsumerTest {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
MqConfigParams.HTTP_ENDPOINT,
// AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
MqConfigParams.ACCESS_KEY,
// AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。
MqConfigParams.SECRET_KEY
);
// 消息所属的Topic,在消息队列RocketMQ版控制台创建。
//不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。
final String topic = MqConfigParams.TOPIC;
// 您在消息队列RocketMQ版控制台创建的Group ID。
final String groupId = MqConfigParams.GROUP_ID;
// Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
// 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
final String instanceId = MqConfigParams.INSTANCE_ID;
final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}
// 在当前线程循环消费消息,建议多开个几个线程并发消费消息。
do {
List<Message> messages = null;
try {
// 长轮询消费消息。
// 长轮询表示如果Topic没有消息,则请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回客户端。
messages = consumer.consumeMessage(
1,// 一次最多消费1条消息(最多可设置为16条)。
3// 长轮询时间3秒(最多可设置为30秒)。
);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
// Topic中没有消息可消费。
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
continue;
}
// 处理业务逻辑。
for (Message message : messages) {
System.out.println("Receive message: " + message+",topic:"+message.getMessageTag());
}
// 消息重试时间到达前若不确认消息消费成功,则消息会被重复消费。
// 消息句柄有时间戳,同一条消息每次消费的时间戳都不一样。
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
try {
consumer.ackMessage(handles);
} catch (Throwable e) {
// 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
}
三、配置main的日志输出级别
启动执行时,发现问题:控制台打印巨多debug日志
默认情况下,如果项目中集成了Logback等日志框架,在执行main方法时通过其进行日志打印,那么默认的日志级别是debug的。
此时,如果是http请求,甚至可以把请求的具体报文信息都打印出来,特别是三方框架的。为了不影响查看正常的日志,可以将main方法的日志级别进行调整
。
解决方法:
resources目录下增加logback.xml
logback.xml:
<configuration debug="false">
<logger name="org.apache" level="INFO" />
<logger name="org.apache.http.wire" level="INFO" />
<logger name="org.apache.http.headers" level="INFO" />
<property name="CONSOLE_LOG_PATTERN"
value="%date{yyyy-MM-dd HH:mm:ss} %highlight(%-5level) %magenta(%-4relative) --- [%yellow(%15.15thread)] %cyan(%-40.40logger{39}) : %msg%n"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
</encoder>
</appender>
<root level="ERROR">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
再次启动,打印日志就简洁明多了。
四、测试效果
生产者启动运行,结果如下:
消费者启动运行,结果如下:
"D:\Program Files\Java\jdk1.8.0_40\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2021.1.3\lib\idea_rt.jar=51390:C:\Program Files\JetBrains\IntelliJ IDEA 2021.1.3\bin" -Dfile.encoding=UTF-8 -classpath "D:\Program Files\Java\jdk1.8.0_40\jre\lib\charsets.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\deploy.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\ext\access-bridge-64.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\ext\cldrdata.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\ext\dnsns.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\ext\jaccess.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\ext\jfxrt.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\ext\localedata.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\ext\nashorn.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\ext\sunec.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\ext\sunjce_provider.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\ext\sunmscapi.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\ext\sunpkcs11.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\ext\zipfs.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\javaws.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\jce.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\jfr.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\jfxswt.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\jsse.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\management-agent.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\plugin.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\resources.jar;D:\Program Files\Java\jdk1.8.0_40\jre\lib\rt.jar;F:\study-project\rocket-demo\target\classes;D:\mvnrepository\org\springframework\boot\spring-boot-starter-web\2.7.5\spring-boot-starter-web-2.7.5.jar;D:\mvnrepository\org\springframework\boot\spring-boot-starter\2.7.5\spring-boot-starter-2.7.5.jar;D:\mvnrepository\org\springframework\boot\spring-boot\2.7.5\spring-boot-2.7.5.jar;D:\mvnrepository\org\springframework\boot\spring-boot-autoconfigure\2.7.5\spring-boot-autoconfigure-2.7.5.jar;D:\mvnrepository\org\springframework\boot\spring-boot-starter-logging\2.7.5\spring-boot-starter-logging-2.7.5.jar;D:\mvnrepository\ch\qos\logback\logback-classic\1.2.11\logback-classic-1.2.11.jar;D:\mvnrepository\ch\qos\logback\logback-core\1.2.11\logback-core-1.2.11.jar;D:\mvnrepository\org\apache\logging\log4j\log4j-to-slf4j\2.17.2\log4j-to-slf4j-2.17.2.jar;D:\mvnrepository\org\slf4j\jul-to-slf4j\1.7.36\jul-to-slf4j-1.7.36.jar;D:\mvnrepository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\mvnrepository\org\yaml\snakeyaml\1.30\snakeyaml-1.30.jar;D:\mvnrepository\org\springframework\boot\spring-boot-starter-json\2.7.5\spring-boot-starter-json-2.7.5.jar;D:\mvnrepository\com\fasterxml\jackson\core\jackson-databind\2.13.4.2\jackson-databind-2.13.4.2.jar;D:\mvnrepository\com\fasterxml\jackson\core\jackson-annotations\2.13.4\jackson-annotations-2.13.4.jar;D:\mvnrepository\com\fasterxml\jackson\core\jackson-core\2.13.4\jackson-core-2.13.4.jar;D:\mvnrepository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.13.4\jackson-datatype-jdk8-2.13.4.jar;D:\mvnrepository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.13.4\jackson-datatype-jsr310-2.13.4.jar;D:\mvnrepository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.13.4\jackson-module-parameter-names-2.13.4.jar;D:\mvnrepository\org\springframework\boot\spring-boot-starter-tomcat\2.7.5\spring-boot-starter-tomcat-2.7.5.jar;D:\mvnrepository\org\apache\tomcat\embed\tomcat-embed-core\9.0.68\tomcat-embed-core-9.0.68.jar;D:\mvnrepository\org\apache\tomcat\embed\tomcat-embed-el\9.0.68\tomcat-embed-el-9.0.68.jar;D:\mvnrepository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.68\tomcat-embed-websocket-9.0.68.jar;D:\mvnrepository\org\springframework\spring-web\5.3.23\spring-web-5.3.23.jar;D:\mvnrepository\org\springframework\spring-beans\5.3.23\spring-beans-5.3.23.jar;D:\mvnrepository\org\springframework\spring-webmvc\5.3.23\spring-webmvc-5.3.23.jar;D:\mvnrepository\org\springframework\spring-aop\5.3.23\spring-aop-5.3.23.jar;D:\mvnrepository\org\springframework\spring-context\5.3.23\spring-context-5.3.23.jar;D:\mvnrepository\org\springframework\spring-expression\5.3.23\spring-expression-5.3.23.jar;D:\mvnrepository\org\projectlombok\lombok\1.18.24\lombok-1.18.24.jar;D:\mvnrepository\com\aliyun\mq\mq-http-sdk\1.0.2\mq-http-sdk-1.0.2.jar;D:\mvnrepository\org\apache\httpcomponents\httpasyncclient\4.1.5\httpasyncclient-4.1.5.jar;D:\mvnrepository\org\apache\httpcomponents\httpcore\4.4.15\httpcore-4.4.15.jar;D:\mvnrepository\org\apache\httpcomponents\httpcore-nio\4.4.15\httpcore-nio-4.4.15.jar;D:\mvnrepository\org\apache\httpcomponents\httpclient\4.5.13\httpclient-4.5.13.jar;D:\mvnrepository\commons-codec\commons-codec\1.15\commons-codec-1.15.jar;D:\mvnrepository\org\apache\commons\commons-lang3\3.12.0\commons-lang3-3.12.0.jar;D:\mvnrepository\commons-logging\commons-logging\1.1.3\commons-logging-1.1.3.jar;D:\mvnrepository\org\apache\logging\log4j\log4j-api\2.17.2\log4j-api-2.17.2.jar;D:\mvnrepository\org\apache\logging\log4j\log4j-core\2.17.2\log4j-core-2.17.2.jar;D:\mvnrepository\org\apache\logging\log4j\log4j-jcl\2.17.2\log4j-jcl-2.17.2.jar;D:\mvnrepository\com\google\code\gson\gson\2.9.1\gson-2.9.1.jar;D:\mvnrepository\org\slf4j\slf4j-api\1.7.36\slf4j-api-1.7.36.jar;D:\mvnrepository\org\springframework\spring-core\5.3.23\spring-core-5.3.23.jar;D:\mvnrepository\org\springframework\spring-jcl\5.3.23\spring-jcl-5.3.23.jar" com.example.rocketdemo.util.AliyunMessageConsumerTest
Receive message: Message{MessageID:3AD19CA3000E681A95157491C815BDC8,MessageMD5:39C636AD390E69FDA0821CA29F8E8CC9,RequestID:637DC89A3944310E00E92833,Properties:{a=0, KEYS=MessageKey, __BORNHOST=58.209.156.163}, receiptHandle='3AD19CA3000E681A95157491C815BDC8-MSAxNjY5MTg3NzM4NjYxIDMwMDAwMCAyIDAgY24taGFuZ3pob3Utc2hhcmUtMDItMSA1IDE=', publishTime=1669187710997, nextConsumeTime=1669188038661, firstConsumeTime=1669187738661, consumedTimes=1, messageTag='Safety_Hat_Message_tt', errorMessage=null},topic:Safety_Hat_Message_tt
Process finished with exit code -1
接收到消息,订阅消息成功。
五、完成代码
可点击此处进行下载
参考文档:阿里云 消息队列 RocketMQ Java SDK