异步技术
企业级应用中广泛使用的三种异步消息传递技术
原文链接:https://blog.csdn.net/qq_55917018/article/details/122122218
三种异步消息传递技术
JMS (java message service)
一个Java规范,等同于JDBC规范,提供了与消息服务相关的API接口
JMS只 是一套接口,并没有给予实现,各大厂商和开源组织都对JMS实现不同产品,这些产品 包括:Apache的ActiveMQ、RocketMQ(前期属于阿里巴巴)、IBM的MQSeries、Microsoft的MSMQ和 VM的RabbitMQ(前期属于Spring source)等等,它们基本都遵循JMS规范。 这里介绍的ActiveMQ是最早的JMS开源产品,在Java世界使用比较广泛,在中等规模的 应用中是完全胜任的。当然,如果要真正面面对大型互联网应,要解决超高并发和吞吐 量问题,现在更推荐使用RabbitMQ、Kafuka或者RocketMQ等新一代的分布式产品,但 它们的基本原理和用法是相通的.
JMS消息模型
- peer-2-peer: 点对点模型,消息发送到一个队列中,队列保存消息。队列中的消息只能被一个消费者消费
- publish-subscribe: 发布订阅模型,消息可以被多个消费者消息,消费者和生产者完全独立,不需要感知对方的存在
JMS 消息种类
- TextMessage
- MapMessage
- BytesMessage
- StreamMessage
- ObjectMessage
- Message
JMS实现:ActiveMQ、Redis、HornetMQ、RabbitMQ、RocketMQ
AMQP(advanced message queuing protocol)
一种协议(高级消息队列协议,也是消息代理规范),规范了网络交换的数据格式
**解决:**服务跨平台,服务器供应商、生产者,消费者可以使用不同的语言来实现
AMQP消息模型:
- direct exchange
- fanoout exchange
- topic exchange
- headers exchange
- system exchange
消息种类:byte[]
- AMQP实现:RabbitMQ、StormMQ、RocketMQ
MQTT(Message Queueing Telemetry Transpory)
消息队列遥测传输、专为小设备涉及、是物联网生态系统中主要成分之一
实测
ActiveMQ
下载
官网下载
安装(windows)
启动
服务端口:61616
管理后台端口:8161
ActiveMQ后台:http://127.0.0.1:8161
默认账号、密码:admin\admin
springboot 整合
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
点对点模式
server:
port: 8080
spring:
activemq:
broker-url: tcp://localhost:61616
jms:
template:
default-destination: com.yinhai
pub-sub-domain: false
destination:可指定
@Service
public class MessageServiceImpl implements MessageService {
@Autowired
JmsMessagingTemplate jmsTemplate;
// private ArrayList list = new ArrayList();
@Override
public String sendMessage(String id) {
System.out.println("消息已经放入队列中了:"+id);
jmsTemplate.convertAndSend("order.queue.id",id);
// final boolean add = list.add(id);
return "1";
}
@Override
public String doMessage() {
// final Object id = list.remove(0);
final String s = jmsTemplate.receiveAndConvert("order.queue.id",String.class);
System.out.println("消息已经发送成功了id:"+s);
return "1";
}
}
JmsListener 监听
监听到队列有消息放入后立马通过入参接收消息
@SendTo(“队列名”)
将入参放入另外一个消息队列
@Component
public class ActiveMQListener {
@Autowired
private JmsTemplate jmsTemplate;
@JmsListener(destination = "order.queue.id")
@SendTo("other")
private String snedMessage(String message){
// final String s = (String) jmsTemplate.receiveAndConvert();
System.out.println("-------------收到后发送:"+message);
return message;
}
}
RabbitMQ
RabbitMQ 是基于Erlang语言编写的,所以先要安装Erlang语言环境
自行去官网下载
安装Erlang
下载完成后配置环境变量ERLANG_HOME
Path
安装RabbitMQ
自行去官网下载,windows下exe 直接傻瓜式安装
启动:
start/stop
注:启动时报下图错误则是因为已经自启动了
注:如果Erlang版本和Rabbitmq版本不匹配报其他错误
启动管理界面插件
rabbitmq-plugins.bat enable rabbitmq_management
服务端口:5672 管理后台端口:15672
管理界面:http://localhost:15672/
默认密码: guest/guest
Springboot 整合
坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
server:
port: 8080
spring:
rabbitmq:
host: localhost
port: 5672
队列、交换机配置 ---- DirectExchange
@Configuration
public class RabbimtDirectConfig {
@Bean
public Queue queue() {
// durable 是否持久化 默认false
// exclusive 是否当前连接专用 默认false 当前连接关闭后即被删除
// autoDelete 是否自动删除 默认false 生产者消费者不再使用就删除
return new Queue("direct.queue");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}
@Bean
Binding binding(Queue queue, DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("direct");
}
}
放入消息
@Service
public class MessageServiceImpl implements MessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
// private ArrayList list = new ArrayList();
@Override
public String sendMessage(String id) {
System.out.println("消息已经放入队列中了:"+id);
rabbitTemplate.convertAndSend("direct.exchange", "direct", id);
// final boolean add = list.add(id);
return "1";
}
@Override
public String doMessage() {
// final Object id = list.remove(0);
// final String s = jmsTemplate.receiveAndConvert("order.queue.id",String.class);
// System.out.println("消息已经发送成功了id:"+s);
return "1";
}
}
监听
@Component
public class RabbitListenre {
@RabbitListener(queues="direct.queue")
public void listen(String id) {
System.out.println("从队列中取出"+id);
}
}
队列、交换机配置 ---- TopExchange
匹配规则:模糊匹配
一 :(*)用来表示一个单词,该单词必须出现
二 :(#):用来表示任意数量
@Configuration
public class RabbimtTopicConfig {
@Bean
public Queue topicqueue() {
return new Queue("topic.queue");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic.exchange");
}
@Bean
Binding bindingTopic(Queue topicqueue, TopicExchange topicExchange) {
return BindingBuilder.bind(topicqueue).to(topicExchange).with("topic.#");
}
}
@Override
public String sendMessage(String id) {
System.out.println("消息已经放入队列中了:"+id);
rabbitTemplate.convertAndSend("topic.exchange", "topic.asdasd.asds", id);
// final boolean add = list.add(id);
return "1";
}
消息监听
@Component
public class RabbitListenre {
@RabbitListener(queues="topic.queue")
public void listen(String id) {
System.out.println("从队列中取出"+id);
}
}