消息发送
发送同步消息
public class SyncProducer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer=new DefaultMQProducer(/*please_rename_unique_group_name*/"group1"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i=0;i<100;i++){ Message msg=new Message("base" /*Topic*/ ,"Tag1" /*TagA*/ ,("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)); /*Message*/ SendResult sendResult=producer.send(msg); System.out.println("消息ID"+sendResult.getMessageQueue().getQueueId()); System.out.printf("%s%n",sendResult); // TimeUnit.SECONDS.sleep(1); } producer.shutdown(); } }
发送异步消息
DefaultMQProducer producer=new DefaultMQProducer("group1"); producer.setNamesrvAddr("localhost:9876"); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); for (int i=0;i<100;i++){ Message msg=new Message("base" /*Topic*/ ,"Tag2" /*TagA*/ ,("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)); /*Message*/ producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送结果"+sendResult); } @Override public void onException(Throwable throwable) { System.out.println("发送异常:"+throwable.getMessage()); } }); TimeUnit.SECONDS.sleep(1); } producer.shutdown();
单向发送消息
DefaultMQProducer producer=new DefaultMQProducer(/*please_rename_unique_group_name*/"group1"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i=0;i<5;i++){ Message msg=new Message("base" /*Topic*/ ,"Tag3" /*TagA*/ ,("Hello RocketMQ单向消息"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)); /*Message*/ // SendResult sendResult=producer.send(msg); producer.sendOneway(msg); // System.out.println("消息ID"+sendResult.getMessageQueue().getQueueId()); // System.out.printf("%s%n",sendResult); TimeUnit.SECONDS.sleep(1); } producer.shutdown();
消费信息
负载均衡模式(默认模式)
consumer.setMessageModel(MessageModel.CLUSTERING);
广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
顺序消息
消息顺序
全局消息顺序
局部消息顺序
例如:订单的顺序流程:创建、付款、推送、完成
延时消息
批量消息
过滤消息
事务消息
事务流程
事务消息发送及提交
-
发送消息(half消息)
-
服务端响应消息写入结果
-
根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
-
根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消费对消费者可见)
事务补偿
-
对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次回查
-
Producer收到回查消息,检查回查消息对应的本地事务状态
-
根据本地事务状态,重新提交Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况
事务消息状态
三个状态:提交状态、回滚状态、中间状态