目录
一、引入消息队列之后该如何保证其高可用性
二、异步投递Async Sends
三、延迟投递和定时投递
四、ActiveMQ消费重试机制
五、死信队列
六、如何保证消息不被重复消费呢?幂等性问题你谈谈
一、引入消息队列之后该如何保证其高可用性
ActiveMQ集群模式_zoeil的博客-CSDN博客
二、异步投递Async Sends
(一)官网
http://activemq.apache.org/async-sends
(二)解释
ActiveMQ支持同步、异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。ActiveMQ默认使用异步发送的模式;除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。
如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻寒producer直到broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。
很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。
异步发送
它可以最大化produer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,
就是需要消耗较多的Client端内存同时也会导致broker端性能消耗增加;此外它不能有效的确保消息的发送成功。在useAsyncSend=true的情况下客户端需要突忍消息丢失的可能。
(三)开启方式
①Activemq_URL带参数
public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616?jms.userAsyncSend=true";
②ActiveMQConnectionFactory的setUseAsyncSend
activeMQConnectionFactory.setUseAsyncSend(true);
(四)面试追问:异步发送如何确保发送成功
异步发送丢失消息的场景是: 生产者设置UseAsyncSend=true,使用producer.send(msg)持续发送消息。
由于消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。
如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。
所以,正确的异步发送方法是需要接收回调的。
同步发送和异步发送的区别就在此,
同步发送等send不阻塞了就表示一定发送成功了,异步发送需要接收回执并由客户端再判断一次是否发送成功。
回调测试
生产者
public class JMSProduce {
public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";
public static final String QUEUE_NAME = "Async";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
activeMQConnectionFactory.setUseAsyncSend(true);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
activeMQMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
for(int i = 1 ; i <= 3; i ++ ) {
TextMessage textMessage = session.createTextMessage("AsyncMessage--" + i);
textMessage.setJMSMessageID(UUID.randomUUID().toString()+"order001");
String messageID = textMessage.getJMSMessageID();
activeMQMessageProducer.send(textMessage, new AsyncCallback() {
@Override
public void onSuccess() {
System.out.println("成功发送消息Id:"+messageID);
}
@Override
public void onException(JMSException e) {
System.out.println("失败发送消息Id:"+messageID);
}
});
}
activeMQMessageProducer.close();
session.close();
connection.close();
System.out.println("消息发布到MQ完成……");
}
}
点击Browse查看队列具体信息
(五)总结
- 异步发送可以让生产者发的更快。
- 如果异步投递不需要保证消息是否发送成功,发送者的效率会有所提高。如果异步投递还需要保证消息是否成功发送,并采用了回调的方式,发送者的效率提高不多,这种就有些鸡肋。
三、延迟投递和定时投递
(一)官网
ActiveMQ
(二)案例
1.配置文件修改并重启服务
添加 schedulerSupport=true
2.java代码里面封装的辅助消息类型:ScheduleMessage
生产者
public class JMSProduce {
public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";
public static final String QUEUE_NAME = "schedule";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
activeMQConnectionFactory.setUseAsyncSend(true);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
activeMQMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
long delay = 10*1000;
long period = 5*1000;
int repeat = 3 ;
for(int i = 1 ; i <= 3; i ++ ) {
TextMessage textMessage = session.createTextMessage("scheduleMessage--" + i);
// 延迟的时间
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
// 重复投递的时间间隔
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
// 重复投递的次数
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
// 此处的意思:该条消息,等待10秒,之后每5秒发送一次,重复发送3次。
activeMQMessageProducer.send(textMessage);
}
activeMQMessageProducer.close();
session.close();
connection.close();
System.out.println("消息发布到MQ完成……");
}
}
消费者
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";
public static final String QUEUE_NAME = "schedule";
public static void main(String[] args) throws JMSException, IOException {
//1.创建链接工厂,按照给定的url地址,采用默认的用户和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过链接工厂,获得Connection并启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//3.1 两个参数 ①事务 ②签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(queue or topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消费者
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("监听到队列消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//连接AvtiveMQ需要等待时间
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
四、ActiveMQ消费重试机制
(一)官网
http://activemq.apache.org/redelivery-policy
(二)解释
消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。
那些情况会引起消息重发
- Client用了transactions且再session中调用了rollback
- Client用了transactions且再调用commit之前关闭或者没有commit
- Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover
(三)请说说消息重发时间间隔和重发次数
间隔:1
次数:6
(四)有毒消息Poison ACK
一个消息被redelivedred超过默认的最大重发次数(默认6次,可修改)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。
(五)属性说明
(六)代码验证
模拟上面可能发生消费重试的第二种情况
生产者代码同上
消费者
开启事务,却没有commit。重启消费者,前6次都能收到消息,到第7次,不会再收到消息。代码:
public class JmsPoisonConsumer {
private static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";
private static final String ACTIVEMQ_QUEUE_NAME = "dead01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("***消费者接收到的消息: " + textMessage.getText());
//session.commit();
}catch (Exception e){
e.printStackTrace();
}
}
}
});
//关闭资源
System.in.read();
// session.commit(); 本应该commit,这里为演示重发机制,故意不提交事务
messageConsumer.close();
session.close();
connection.close();
}
}
(七)修改默认重试次数
消费者。除橙色代码外,其他代码都和之前一样。修改重试次数为3。更多的设置请参考官网文档
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 修改默认参数,设置消息消费重试3次
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
(八)整合spring
五、死信队列
(一)官网
ActiveMQ
死信队列:异常消息规避处理的集合,主要处理失败的消息。
(二)生产环境
(三)死信队列的配置(一般采用默认)
1.sharedDeadLetterStrategy
不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。
2.individualDeadLetterStrategy
可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。
3.自动删除过期消息
过期消息是值生产者指定的过期时间,超过这个时间的消息。
> 类似于mysql的*,指对所有的队列都起效
4.存放非持久消息到死信队列中
六、如何保证消息不被重复消费呢?幂等性问题你谈谈
幂等性如何解决,根据messageid去查这个消息是否被消费了。