1.引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.7.0</version>
</dependency>
- 生产者使用
2.1 发送普通消息
public class SendMessage {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
// 创建消息生产者, 指定生产者所属的组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
// 指定Nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
// 创建消息对象,指定主题、标签和消息体
Message msg = new Message("topicB", "myTag", ("RocketMQ Message").getBytes());
// 发送消息
SendResult sendResult = producer.send(msg, 10000);
System.out.println(sendResult);
// 关闭生产者
producer.shutdown();
}
}
2.2 发送延时消息
/**
* 发送延时消息
*/
public class SendDelayLevel {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
// 创建消息生产者, 指定生产者所属的组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
// 指定Nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
// 创建消息对象,指定主题、标签和消息体
Message msg = new Message("topicB", "myTag", ("RocketMQ DelayL Message").getBytes());
//"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
msg.setDelayTimeLevel(3);
// 发送消息
// SendResult send = producer.send(msg, 10000);//同步的
System.out.println();
producer.send(msg, new SendCallback() {//异步的回调函数
@Override
public void onSuccess(SendResult sendResult) {
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format = sdf.format(new Date());
System.out.println(format+":发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
}, 10000);
}
}
2.3发送事务消息
事务消息业务流程
1.发送方发送半事务消息
2.Broker收到半事务消息存储后返回结果
3.发送半事务消息方处理本地事务
4.发送方把本地事务处理结果以消息形式发送到Broker
5.Broker在固定的时间内(默认60秒)未收到4的确认消息,Broker为发送方发送回查消息
6.业务发送发收到Broker回查消息后,查询本地业务执行结果
7.业务方发送回查结果消息
1-4 是同步调用,5-7是异步调用。RocketMQ事务消息使用了2PC+事后补偿机制保证了最终一致性。
public class SendTransactionMessage {
public static void main(String[] args) throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer ("myproducer-group");
// 指定Nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setTransactionListener(new TransactionListener() {
private AtomicInteger transactionIndex = new AtomicInteger(1);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
String transactionId = message.getTransactionId();
//0:执行中,状态未知 1:执行成功 2:执行失败
localTrans.put(transactionId, 0);
try {
//
Thread.sleep(1000);
localTrans.put(transactionId, 1);
return LocalTransactionState.COMMIT_MESSAGE;
}catch(Exception e){
localTrans.put(transactionId,2);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.printf("%s,事务结果回查%s%n\n", new Date(System.currentTimeMillis()),messageExt.toString());
Integer status = localTrans.get(messageExt.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
});
// 启动生产者
producer.start();
// 创建消息对象,指定主题、标签和消息体
Message msg = new Message("topicB", "myTag", ("RocketMQ Transaction Message").getBytes());
SendResult sendResult =producer.sendMessageInTransaction(msg,null);
System.out.printf("%s,半消息发送结果%s%n",new Date(System.currentTimeMillis()),sendResult);
}
}
- 消费者使用
消费者从Broker中获取消息的方式有两种:pull拉取方式和push推动方式。
3.1 拉取式消费
Consumer主动从Broker中拉取消息,主动权由Consumer控制。一旦获取了批量消息,就会启动消费过程。不过,该方式的实时性较弱,即Broker中有了新的消息时消费者并不能及时发现并消费。
特点:由于拉取时间间隔是由用户指定的,所以在设置该间隔时需要注意平稳:间隔太短,空请求比例会增加;间隔太长,消息的实时性太差
3.2 推送式消费
该模式下Broker收到数据后会主动推送给Consumer。该获取方式一般实时性较高。
特点:该获取方式是典型的发布-订阅模式,即Consumer向其关联的Queue注册了监听器,一旦发现有新的消息到来就会触发回调的执行,回调方法是Consumer去Queue中拉取消息。而这些都是基于Consumer与Broker间的长连接的。长连接的维护是需要消耗系统资源的。
@Bean
public DefaultMQPushConsumer defaultConsumer() {
log.info("ruleEngineConsumer 正在创建---------------------------------------");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();//推送式消费
// DefaultMQPullConsumer consumer = new DefaultMQPullConsumer();//拉取式消费
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(2);
consumer.setConsumeMessageBatchMaxSize(100);//批量消费消息数目
// consumer.setMessageModel(MessageModel.BROADCASTING);//广播式消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);//默认消费模式,集群消费模式,即每个Consumer Group中的Consumer均摊所有的消息。
// 设置监听
consumer.registerMessageListener(mqConsumeMsgListenerProcessor);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setInstanceName(System.currentTimeMillis()+"dstMQMsgConsumer");
consumer.setConsumerGroup(resultsGroupName);
try {
consumer.subscribe("topicB", "*");
consumer.start();
log.info("ruleEngineConsumer 创建成功 groupName={}",resultsGroupName);
} catch (MQClientException e) {
log.error("ruleEngineConsumer 创建失败!");
}
return consumer;
}