RocketMQ整合代码

news2025/1/22 17:05:32

RocketMQ整合代码

一 构建Java基础环境

在maven项⽬中构建出RocketMQ消息示例的基础环境,即创建⽣产者程序和消费者程序。通过⽣产者和消费者了解RocketMQ操作消息的原⽣API

在这里插入图片描述

引⼊依赖

 <dependencies>
 	<dependency>
 		<groupId>org.apache.rocketmq</groupId>
 		<artifactId>rocketmq-client</artifactId>
 		<version>4.7.1</version>
 	</dependency>
 </dependencies>

编写⽣产者程序

public class SyncProducer {
    public static void main(String[] args) throws
            MQClientException, UnsupportedEncodingException,
            RemotingException, InterruptedException, MQBrokerException {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("producerGroup1");
        // Specify name server addresses.
        producer.setNamesrvAddr("172.16.253.101:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

编写消费者程序

public class MyConsumer {
    public static void main(String[] args) throws MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // Specify name server addresses.
        consumer.setNamesrvAddr("172.16.253.101:9876");
        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus
            consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s % n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

启动消费者和⽣产者,验证消息的收发

在这里插入图片描述

1. 简单消息示例

简单消息分成三种:同步消息、异步消息、单向消息。

1.1 同步消息

⽣产者发送消息后,必须等待broker返回信息后才继续之后的业务逻辑,在broker返回信息之前,⽣产者阻塞等待

同步消息的应⽤场景:如重要通知消息、短信通知、短信营销系统等。

public class SyncProducer {
    public static void main(String[] args) throws
            MQClientException, UnsupportedEncodingException,
            RemotingException, InterruptedException, MQBrokerException {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");
        // Specify name server addresses.
        producer.setNamesrvAddr("172.16.253.101:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

1.2 异步消息

⽣产者发完消息后,不需要等待broker的回信,可以直接执⾏之后的业务逻辑。⽣产者提供⼀个回调函数供broker调⽤,体现了异步的⽅式。

异步传输⼀般⽤于响应时间敏感的业务场景。

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("172.16.253.101:9876");
        //Launch the instance.
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

1. 3 单向消息

⽣产者发送完消息后不需要等待任何回复,直接进⾏之后的业务逻辑,单向传输⽤于需要中等可靠性的情况,例如⽇志收集。

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("172.16.253.101:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Wait for sending to complete
        Thread.sleep(5000);
        producer.shutdown();
    }
}

2. 顺序消息

顺序消息指的是消费者消费消息的顺序按照发送者发送消息的顺序执⾏。顺序消息分成两种:局部顺序和全局顺序。

2.1 局部顺序

局部消息指的是消费者消费某个topic的某个队列中的消息是顺序的。消费者使⽤MessageListenerOrderly类做消息监听,实现局部顺序。

public class OrderConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("OrderTopicTest", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    System.out.println("消息内容:" + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

2.2 全局顺序

消费者消费全部消息都是顺序的,只能通过⼀个某个topic只有⼀个队列才能实现,这种应⽤场景较少,且性能较差。

2.3 乱序消费

消费者消费消息不需要关注消息的顺序。消费者使⽤MessageListenerConcurrentl类做消息监听。

public class OrderConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("OrderTopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消息内容:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

3. ⼴播消息

⼴播是向主题(topic)的所有订阅者发送消息。订阅同⼀个topic的多个消费者,能全量收到⽣产者发送的所有消息。

消费者

public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //set to broadcast mode
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消息内容:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

⽣产者

public class BroadcastProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicTest", "TagA", "OrderID188",
                    ("Hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

4. 延迟消息

延迟消息与普通消息的不同之处在于,它们要等到指定的时间之后才会被传递。

消息⽣产者

public class ScheduledProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message" + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }
        // Shutdown producer after use.
        producer.shutdown();
    }
}

消息消费者

public class ScheduledConsumer {
    public static void main(String[] args) throws MQClientException {
        // Instantiate message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        // Subscribe topics
        consumer.subscribe("TestTopic", "*");
        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" +
                            message.getMsgId() + "] "
                            + (System.currentTimeMillis() -
                            message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch consumer
        consumer.start();
    }
}

延迟等级:RocketMQ设计了18个延迟等级,分别是:

 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

5. 批量消息

批量发送消息提⾼了传递⼩消息的性能。

使⽤限制:同⼀批次的消息应该具有:相同的主题、相同的 waitStoreMsgOK 并且不⽀持延迟消息和事务消息。

使⽤批量消息

public class BatchProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
        producer.send(messages);
        producer.shutdown();
    }
}

超出限制的批量消息:官⽅建议批量消息的总⼤⼩不应超过1m,实际不应超过4m。如果超过4m的批量消息需要进⾏分批处理,同时设置broker的配置参数为4m(在broker的配置⽂件中修改: maxMessageSize=4194304 )

/*
 * ⼤批量消息的处理
 */
public class MaxBatchProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.start();
        //large batch
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>(100);
        for (int i = 0; i < 100; i++) {
            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
        }
        // producer.send(messages);
        //split the large batch into small ones:
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            List<Message> listItem = splitter.next();
            producer.send(listItem);
        }
        producer.shutdown();
    }
}

ListSplitter

/*
 * 批量消息
 */
public class ListSplitter implements Iterator<List<Message>> {
    private int sizeLimit = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() +
                    message.getBody().length;
            Map<String, String> properties =
                    message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > sizeLimit) {
                //it is unexpected that single message exceeds the  sizeLimit
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                    //if the next sublist has no element, add this one and then break,otherwise just break nextIndex ++;
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

6. 过滤消息

在⼤多数情况下,标签是⼀种简单⽽有⽤的设计,可以⽤来选择您想要的消息。

6.1 普通过滤

tag过滤的⽣产者

public class TagProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name");
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        for (int i = 0; i < 15; i++) {
            Message msg = new Message("TagFilterTest", tags[i % tags.length],
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

tag过滤的消费者

public class TagConsumer {
    public static void main(String[] args) throws
            MQClientException {
        DefaultMQPushConsumer consumer = new
                DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.subscribe("TagFilterTest", "TagA || TagC");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus
            consumeMessage(List<MessageExt> msgs,

                           ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s % n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

6.2 使用sql过滤

消费者将收到包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是⼀条消息只能有⼀个标签,这可能不适⽤于复杂的场景。在这种情况下,您可以使⽤ SQL 表达式来过滤掉消息。

使⽤SQL过滤:SQL 功能可以通过您在发送消息时输⼊的属性进⾏⼀些计算。在 RocketMQ 定义的语法下,可以实现⼀些有趣的逻辑。

使⽤注意:只有推模式的消费者可以使⽤SQL过滤。拉模式是⽤不了的。

这是⼀个例⼦:

------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------

语法:RocketMQ 只定义了⼀些基本的语法来⽀持这个特性,也可以轻松扩展它。

1. 数值⽐较,如`>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
2. 字符⽐较,如`=`, `<>`, `IN`;
3. `IS NULL`或`IS NOT NULL`;
4. 逻辑`AND`, `OR`, `NOT`;

常量类型有:
1. 数字,如 123、3.1415;
2. 字符,如'abc',必须⽤单引号;
3. `NULL`,特殊常数;
4. 布尔值,`TRUE`或`FALSE`;

SQL过滤的⽣产者示例

public class SQLProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name");
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        for (int i = 0; i < 15; i++) {
            Message msg = new Message("SqlFilterTest",
                    tags[i % tags.length],
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // Set some properties.
            msg.putUserProperty("a", String.valueOf(i));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

SQL过滤的消费者示例

public class SQLConsumer {
    public static void main(String[] args) throws
            MQClientException {
        DefaultMQPushConsumer consumer = new
                DefaultMQPushConsumer("please_rename_unique_group_name");
        // Don't forget to set enablePropertyFilter=true in broker
        consumer.subscribe("SqlFilterTest",
                MessageSelector.bySql("(TAGS is not null and TAGS in('TagA', 'TagB'))"
                        + "and (a is not null and a between 0 and 3) "));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus
            consumeMessage(List<MessageExt> msgs,

                           ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}         

7. 事务消息

事务消息的定义:它可以被认为是⼀个两阶段的提交消息实现,以确保分布式系统的最终⼀致性。事务性消息确保本地事务的执⾏和消息的发送可以原⼦地执⾏。

事务消息有三种状态:

  1. TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费该消息。

  2. TransactionStatus.RollbackTransaction:回滚事务,表示该消息将被删除,不允许消费。

  3. TransactionStatus.Unknown:中间状态,表示需要MQ回查才能确定状态。

使⽤限制

  1. 事务性消息没有调度和批处理⽀持。
  2. 为避免单条消息被检查次数过多,导致半队列消息堆积,我们默认将单条消息的检查次数限制为15次,但⽤户可以通过更改transactionCheckMax来更改此限制,参数在broker的配置中,如果⼀条消息的检查次数超过transactionCheckMax次,broker默认会丢弃这条消息,同时打印错误⽇志。⽤户可以通过重写AbstractTransactionCheckListener类来改变这种⾏为。
  3. 事务消息将在⼀定时间后检查,该时间由代理配置中的参数transactionTimeout确定。并且⽤户也可以在发送事务消息时通过设置⽤户属性CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,这个参数优先于transactionMsgTimeout参数。
  4. ⼀个事务性消息可能会被检查或消费不⽌⼀次。
  5. 提交给⽤户⽬标主题的消息reput可能会失败。⽬前,它取决于⽇志记录。⾼可⽤是由 RocketMQ 本身的⾼可⽤机制来保证的。如果要保证事务消息不丢失,保证事务完整性,推荐使⽤同步双写机制。
  6. 事务性消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同,事务性消息允许向后查询。MQ 服务器通过其⽣产者 ID 查询客户端。

事务消息的实现流程

在这里插入图片描述

生产者

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionListener transactionListener = new
                TransactionListenerImpl();
        TransactionMQProducer producer = new
                TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("172.16.253.101:9876");
        ExecutorService executorService = new
                ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new
                ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-checkthread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC",
                "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TopicTest", tags[i %
                                tags.length], "KEY" + i,
                                ("Hello RocketMQ " +
                                        i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult =
                        producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException |
                    UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}  

本地事务处理-TransactionListener

public class TransactionListenerImpl implements TransactionListener {
    /
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tags = msg.getTags();
        if (StringUtils.contains(tags, "TagA")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.contains(tags, "TagB")) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }

    /
     * When no response to prepare(half) message. broker will send
     * check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt
                                                               msg) {
        String tags = msg.getTags();
        if (StringUtils.contains(tags, "TagC")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.contains(tags, "TagD")) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }
}

消费者

public class TransactionConsumer {
    public static void main(String[] args) throws
            MQClientException {
        //1.创建消费者对象
        DefaultMQPushConsumer consumer = new
                DefaultMQPushConsumer("my-consumer-group1");
        //2.指明nameserver的地址
        consumer.setNamesrvAddr("172.16.253.101:9876");
        //3.订阅主题:topic 和过滤消息⽤的tag表达式
        consumer.subscribe("TopicTest", "*");
        //4.创建⼀个监听器,当broker把消息推过来时调⽤
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus
            consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext
                    context) {
                for (MessageExt msg : msgs) {
					// System.out.println("收到的消息:" + new String(msg.getBody()));
                    System.out.println("收到的消息:" + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者
        consumer.start();
        System.out.println("消费者已启动");
    }
}

二 SpringBoot整合RocketMQ

Springboot提供了快捷操作RocketMQ的RocketMQTemplate对象。

1.引⼊依赖

注意依赖的版本需要和RocketMQ的版本相同。

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
	<version>2.1.1</version>
</dependency>

2.编写配置⽂件

# 应⽤名称
spring.application.name=my-boot-rocketmq-demo
# 应⽤服务 WEB 访问端⼝
server.port=8080
# nameserver地址
rocketmq.name-server=172.16.253.101:9876
# 配置⽣产者组
rocketmq.producer.group=my-producer-boot-group1

3.编写⽣产者发送普通消息

@Component
public class MyProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /
     * 发送消息
     *
     * @param msg   消息内容
     * @param topic ⽬标主题
     */
    public void sendMessage(String msg, String topic) {
        //将msg转换成Message对象并发送
        rocketMQTemplate.convertAndSend(topic, msg);
    }
}

4.编写JUnit单元测试发送消息

    @Test
    void testSendMessage() {
        String topic = "MyBootTopic";
        String message = "hello spring boot rocketmq";
        producer.sendMessage(message, topic);
    }

5.创建消费者程序

@Component
@RocketMQMessageListener(consumerGroup = "my-boot-consumer-group", topic = "MyBootTopic")
public class MyConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.out.println("收到的消息:" + msg);
    }
}

6.发送事务消息

编写⽣产者⽅法

	public void sendMessageInTransaction(String msg, String topic) throws Exception {
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            //注意该message为org.springframework.messaging.Message
            Message<String> message =
                    MessageBuilder.withPayload(msg).build();
            //topic和tag整合在⼀起,以":"隔开
            String destination = topic + ":" + tags[i % tags.length];
            //第⼀个destination为消息要发到的⽬的地,第⼆个destination为
            消协携带的业务数据
            TransactionSendResult sendResult =
                    rocketMQTemplate.sendMessageInTransaction(destination, message,
                            destination);
            System.out.println(sendResult);
            Thread.sleep(10);
        }
    }

编写事务监听器类

@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionListenerImpl implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState
    executeLocalTransaction(Message msg, Object arg) {
        //获得业务参数中的数据
        String destination = (String) arg;
        //使⽤RocketMQUtil将spring的message转换成rocketmq的message
        org.apache.rocketmq.common.message.Message message =
                RocketMQUtil.convertToRocketMessage(
                        new StringMessageConverter(), "utf-8", destination, msg);
        //获得消息中的业务数据tags
        String tags = message.getTags();
        if (StringUtils.contains(tags, "TagA")) {
            //提交本地事务
            return RocketMQLocalTransactionState.COMMIT;
        } else if (StringUtils.contains(tags, "TagB")) {
            //回滚
            return RocketMQLocalTransactionState.ROLLBACK;
        } else {
            //中间状态
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    @Override
    public RocketMQLocalTransactionState
    checkLocalTransaction(Message msg) {
        return null;
    }
}

7.编写单元测试发送事务消息

	@Test
	void testSendTransactionMessage() throws Exception {
 		String topic = "MyBootTopic";
 		String message = "hello transaction spring boot rocketmq";
 		producer.sendMessageInTransaction(message,topic);
 	}

三 Spring Cloud Stream整合RocketMQ

1.Spring Cloud Stream介绍

Spring Cloud Stream 是⼀个框架,⽤于构建与共享消息系统连接的⾼度可扩展的事件驱动微服务。

该框架提供了⼀个灵活的编程模型,该模型基于已经建⽴和熟悉的 Spring 习惯⽤法和最佳实践,包括对持久 pub/sub 语义、消费者组和有状态分区的⽀持。

在这里插入图片描述

Spring Cloud Stream 的核⼼构建块是:

  1. Destination Binders:负责提供与外部消息传递系统集成的组件。
  2. Destination Bindings:外部消息系统和最终⽤户提供的应⽤程序代码(⽣产者/消费者)之间的桥梁。
  3. Message:⽣产者和消费者⽤来与⽬标绑定器(以及通过外部消息系统的其他应⽤程序)进⾏通信的规范数据结构。

2.编写⽣产者

引⼊依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.qf</groupId>
    <artifactId>my-s-rocketmq-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>my-s-rocketmq-demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-
            8
        </project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-
            8
        </project.reporting.outputEncoding>
        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
        <spring-cloud-alibaba.version>2.2.2.RELEASE
        </springcloud-alibaba.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-streamrocketmq</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibabadependencies</artifactId>
                <version>${spring-cloud-alibaba.version}
                </version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.7.RELEASE</version>
                <configuration>

                    <mainClass>com.qf.my.s.rocketmq.demo.MySRocketmqDemoApplication
                    <
                    /mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

注意,Rocket官⽅维护的spring-cloud-stream依赖中rocket⽤的版本为4.4,需要排除后再加⼊4.7.1的依赖。

编写配置⽂件

# 应⽤名称
spring.application.name=my-s-rocketmq-demo
# 应⽤服务 WEB 访问端⼝
server.port=8080
# output ⽣产者
spring.cloud.stream.bindings.output.destination=TopicTest
# 配置rocketMQ
spring.cloud.stream.rocketmq.binder.name-server=172.16.253.101:9876

启动类上打上注解;其中 @EnableBinding(Source.class) 指向配置⽂件的output参数

@EnableBinding(Source.class)
@SpringBootApplication
public class MySRocketmqDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(MySRocketmqDemoApplication.class,
                args);
    }
}

编写生产者程序

@Component
public class MyProducer {
    @Resource
    private Source source;

    public void sendMessage(String msg) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS, "TagA");
        MessageHeaders messageHeaders = new
                MessageHeaders(headers);
        Message<String> message =
                MessageBuilder.createMessage(msg, messageHeaders);
        source.output().send(message);
    }
}   

编写单元测试发送消息

@SpringBootTest
class MySRocketmqDemoApplicationTests {
    @Autowired
    private MyProducer producer;

    @Test
    void testSendMessage() {
        producer.sendMessage("hello spring cloud stream");
    }
}

3.编写消费者

引⼊依赖;与⽣产者相同

编写配置⽂件

# 应⽤名称
spring.application.name=my-s-rocketmq-demo
# 应⽤服务 WEB 访问端⼝
server.port=8081
# input 消费者
spring.cloud.stream.bindings.input.destination=TopicTest
spring.cloud.stream.bindings.input.group=spring-cloud-strema-group
# 配置rocketMQ
spring.cloud.stream.rocketmq.binder.name-server=172.16.253.101:9876

启动类上打上注解;其中@EnableBinding(Sink.class)指向配置⽂件的input参数

@EnableBinding(Sink.class)
@SpringBootApplication
public class MySpringCConsumerDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(MySpringCConsumerDemoApplication.class, args);
    }
}

编写消费者程序

@Component
public class MyConsumer {
 	@StreamListener(Sink.INPUT)
 	public void onMessage(String message){
 		System.out.println("收到的消息:"+message);
 	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/455383.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HCIP——交换(更新中)

园区网架构 交换机实现了以下功能 无限的传输距离——识别&#xff0c;重写电信号&#xff08;帧&#xff09;保证信息完整彻底解决了冲突二层单播——MAC地址表提高端口密度 MAC 单播地址&#xff1a;MAC地址第一个字节第8位为0 组播地址&#xff1a;MAC地址第一个字节第8位…

我完全手写的Resnet50网络,终于把猫识别出来了

大家好啊&#xff0c;我是董董灿。 经常看我文章的同学&#xff0c;可能知道最近我在做一个小项目——《从零手写Resnet50实战》。 从零开始&#xff0c;用最简单的程序语言&#xff0c;不借用任何第三方库&#xff0c;完成Resnet50的所有算法实现和网络结构搭建&#xff0c;…

SOS大规模敏捷开发项目管理完整版(Scrum of Scrums)

Scrum of Scrums是轻量化的规模化敏捷管理模式&#xff0c;Leangoo领歌可以完美支持Scrum of Scrums多团队敏捷管理。 Scrum of Scrums的场景 Scrum of Scrums是指多个敏捷团队共同开发一个大型产品、项目或解决方案。Leangoo提供了多团队场景下的产品路线图规划、需求管理、…

2023首场亚马逊云科技行业峰会,医疗与生命科学年度盛会精彩先行

从实验室扩展到真实世界&#xff0c;从前沿技术探索到医疗生命科学行业的快速创新实践&#xff0c;亚马逊云科技不断地通过数字化助力医疗和生命科学的行业创新。由上海徐汇区科委指导&#xff0c;上海枫林集团作为支持单位&#xff0c;亚马逊云科技主办的2023亚马逊云科技医疗…

如何评估小程序开发费用:从项目规模到技术需求

作为一种越来越受欢迎的移动应用&#xff0c;小程序的开发费用是许多企业和个人考虑的重要因素之一。但是&#xff0c;要确定小程序开发费用并不是一件容易的事情&#xff0c;因为它涉及到多个因素&#xff0c;从项目规模到技术需求。 项目规模 小程序开发的费用通常与项目规…

docker-Dockerfile文件使用配置、自定义构建镜像、docker build

Dockerfile使用 docker build构建新的镜像参数解释 Dockerfile格式基础格式FROMCOPYADDRUNCMDENTRYPOINTENVARGVOLUMEEXPOSEWORKDIRUSERHEALTHCHECKONBUILDLABEL 命令摘要 docker build构建新的镜像 命令&#xff1a;docker build -t some-content-nginx . 参数解释 docker …

2023团体程序设计天梯赛--正式赛

L1-1 最好的文档 有一位软件工程师说过一句很有道理的话&#xff1a;“Good code is its own best documentation.”&#xff08;好代码本身就是最好的文档&#xff09;。本题就请你直接在屏幕上输出这句话。 输入格式&#xff1a; 本题没有输入。 输出格式&#xff1a; 在…

Pytorch的几种常用优化器

文章目录 AdagradSGDRMSpropAdamAdamW Adagrad Adagrad是一种可以自动调节每个参数更新的梯度的优化器&#xff0c;也可以做到在梯度平缓时走的步长大&#xff0c;在梯度小时走的步长小&#xff0c;从而防止loss出现剧烈震荡的情况。这里默认你已知道了他的原理了&#xff0c;…

SpringBoot自动配置原理详解

1 前言 之前也写过一篇类似的文章&#xff0c;但是当时理解的并不是很深入&#xff0c;所以一直想重新写&#xff0c;但是一直没有时间&#xff0c;就拖到了现在。这篇文章可能会很长&#xff0c;因为在讲解自动配置的过程中还会衍生出其他一些重要的知识点&#xff0c;我也会…

2023文本定位模型选型调研

背景 时间点&#xff1a;2023年03月 场景&#xff1a;做一个通用型的多种证件解析服务 需求&#xff1a;调研一种又新又快的定位模型。要求&#xff1a;1&#xff09;支持倾斜的文字&#xff0c;可以是4点定位或分割法后获取box&#xff0c;但不能是2点的定位&#xff1b;2&…

2023.4.23第五十次周报

目录 前言 文献阅读&#xff1a;基于ARIMA-WOA-LSTM模型的空气污染物预测 背景 ARIMA-WOA-LSTM模型 思路 主要贡献 积分移动平均自回归 &#xff08;ARIMA&#xff09; 鲸鱼优化算法 搜索超参数 CEEMDAN 结论 LSTM-Kriging 主要目标 理论猜想 问1&#xff1a…

如何申请百度地图开发者AK和基本使用,并解决Uncaught ReferenceError: BMapGL is not defined的错误

文章目录 1. 文章引言2. 申请AK3. 使用AK4. 解决BMapGL is not defined的错误5. 文末总结 1. 文章引言 今天在学习amis框架中的地理位置(LocationPicker)的组件&#xff0c;如下图所示&#xff1a; 关于amis的更多了解&#xff0c;可以参考博文&#xff1a;百度低代码amis框架的…

适合学生的平价蓝牙耳机有哪些?学生平价蓝牙耳机推荐

随着蓝牙耳机的使用越来越频繁&#xff0c;近几年也出现了很多优质的蓝牙耳机&#xff0c;不仅有着超高的性价比&#xff0c;而且使用体验也有了很大的突破。接下来&#xff0c;我来给大家推荐几款适合学生使用的平价蓝牙耳机&#xff0c;可以当个参考。 一、南卡小音舱Lite2蓝…

Java基础--->基础部分(1)

文章目录 Java语言特点JVM、JRE和JDK的关系什么是字节码&#xff1f;采用字节码的好处是什么&#xff1f;面向对象面向对象的三大特征&#xff1a;封装&#xff0c;继承&#xff0c;多态关键字抽象类和接口特点和区别和equals的区别String、StringBuffer、StringBuilder异常 Ja…

中医脉诊仪:结合传统与现代技术的诊断工具

一、引言 随着科技的不断发展&#xff0c;医学领域也取得了举世瞩目的进步。中医作为一种古老的医学体系&#xff0c;始终保持着其独特的魅力。脉诊作为中医诊断的重要方法之一&#xff0c;历经千年的发展和传承&#xff0c;如今在现代科技的助力下&#xff0c;诞生了中医脉诊…

PostgreSQL标准复制方案

集群拓扑 假设我们使用4单元的标准配置&#xff1a;主库&#xff0c;同步从库&#xff0c;延迟备库&#xff0c;远程备库&#xff0c;分别用字母M,S,O,R标识。 M&#xff1a;Master, Main, Primary, Leader, 主库&#xff0c;权威数据源。S: Slave, Secondary, Standby, Sync…

CTFSHOW web入门——web37

过滤了flag&#xff0c;即c中不能有flag字段。 include包含变量c&#xff0c;因此可以利用文件包含漏洞&#xff0c;让变量c变成php代码&#xff0c;然后通过include函数执行。可以使用data协议获取flag.php文件中的内容 data://协议 通常可以用来执行PHP代码 data://text/pl…

FFmpeg PCM 编码 AAC

1. 概要说明与流程图 1.1 概要: 1) FFmpeg 已经废弃了 AV_SAMPLE_FMT_S16 格式 PCM 编码 AAC,也就是说如果使用 FFmpeg 自带的 AAC 编码器,必须做音频的重采样(重采样为:AV_SAMPLE_FMT_FLTP),否则AAC编码是失败的。 2) 传输 PCM 数据时,采取截取缓存机制,解决接收数据包…

8 年开发告诉你,API 是什么?如何看懂 API 文档

API 指的是应用程序编程接口&#xff0c;它是应用程序之间通信的一种方式&#xff0c;允许应用程序之间相互交互和传输数据。 API 文档是编写 API 的开发人员所提供的用户使用说明&#xff0c;通常包括 API 的用途、参数、请求示例、返回格式等信息&#xff0c;以便开发人员使用…

使用ltp进行三元组提取的实战代码

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…