package com.ldj.rocketmq.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
/**
* User: ldj
* Date: 2024/5/26
* Time: 15:09
* Description: 局部顺序消息
*/
public class OrderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("produce-group-order");
producer.setNamesrvAddr("192.168.208.190:9876;192.168.208.191:9876;192.168.208.192:9876");
producer.start();
/**
* 局部顺序消息的要点的分2级,拿最外层的id作为计算队列下标,让相同一级的消息进入同一个队列
*/
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 5; j++) {
int orderId = i;
Message message = new Message("OrderTopic", "orderMessage", ("oreder_step[" + orderId + "-" + j + "]").getBytes(StandardCharsets.UTF_8));
producer.send(message, (mqs, msg, arg) -> {
Integer id = (Integer) arg;
if (id != null) {
return mqs.get(id.hashCode() % mqs.size());
}
throw new RuntimeException("缺少决定消息顺序的id!");
}, orderId);
}
}
}
}