写在前面
本文看下spring message相关的内容。
1:Message?Messaging?
Message是消息的意思,是一个名词。而Messaging是一个动名词,是将消息发送出去
的意思,因此,我们的消息系统是messaging system,而非message system,像我们经常使用的消息中间件activeMQ等,就是一个messaing system,即一个用来将消息接收并且发送的系统
。同理,后文我们将要看到的JMS,也是Java messaging service。
2:为什么要有MQ
服务与服务之间,通过同步的方式调用不就行了吗?为什么非得创建MQ这么个东西呢?因为同步调用方式有如下的缺点:
1:服务与服务之间的调用关系呈网状,调用关系复杂,某个服务出现问题,会引起连锁反应,降低系统稳定性。
2:同步的调用方式,当出现大量线程阻塞等待时,会给服务器资源使用带来具体的压力,降低系统稳定性。
以上可以参考下图:
可以看到调用关系极其复杂,当有n个系统时,就会存在n*(n-1)/2
条线,就像蜘蛛网一样,密密麻麻,我们在设计系统时一定要避免这种情况的方法。为了解决这些问题,就出现了消息中间件,通过提供如下的能力,解决了以上的问题:
1:同步转异步
2:削峰填谷
此时结构就变为下图:
这样,组件和组件之间不直接产生关联,而是通过消息中间件间接的产生关系,系统的连线也都到消息中间件,连线的个数也从原来的n*(n-1)/2
减少为n
,结构更加清晰,系统运维会变得更加容易维护。
3:JMS
无规矩不成方圆,在数据库操作领域jdk中有jdbc相关的规范接口,在日志领域有JCL,SLF4J 日志门面。自然的,对于消息中间件也需要这样一个规范,这也就是就是jdk中的JMS,java messaging service,Java消息服务,在JMS中定义了queue和topic的概念,其中queue是一种生产消费的模式,消息最终只会被一个消费者接收,topic是一种发布订阅模式,消息可以被所有的订阅者接收,如下图:
JMS规范最常见的实现就是activeMQ了,所以接下来我们也以activeMQ为例,来看下如何通过spring message模块的功能来收发消息。
4:spring messaging
[源码] (https://gitee.com/dongsir2020/dongshidaddy-labs-new/tree/master/jms/src/main/java/dongshi/daddy/springmessaging)。
spring messaging模块在spring源码中的位置如下图:
为了测试,首先我们需要安装activeMQ,在这里 下载对应操作系统的安装包,下载完毕之后就可以按照如下方式启动了:
我这里是直接双击win64/activemq.bat
,运行,运行后访问http://localhost:9161/,默认是8161,因为我本地8161端口被占用了,所以修改成了9161,正常直接使用8161就行,之后点击Manage ActiveMQ broker
就可以进入如下的界面:
接着我们来使用spring messaging来向其生产和消费消息,首先定义生产者xml,如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<context:component-scan base-package="dongshi.daddy.springmessaging"/>
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name = "brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value = "test.queue" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name = "connectionFactory" ref="connectionFactory" />
</bean>
</beans>
接着定义生产者的类:
@Component
public class SendService {
@Autowired
JmsTemplate jmsTemplate;
public void send(final Student user) {
jmsTemplate.send("test.queue", new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(JSON.toJSONString(user));
}
});
}
}
public class JmsSender {
public static void main(String[] args) {
Student student2 = Student.create();
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:springjms-sender.xml");
SendService sendService = (SendService) context.getBean("sendService");
student2.setName("KK103");
sendService.send(student2);
System.out.println("send successfully, please visit http://localhost:8161/admin to see it");
}
}
运行JmsSender后,查看activeMQ:
接着我们定义消费者消费消息,xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<context:component-scan base-package="dongshi.daddy.springmessaging"/>
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name = "brokerURL" value="tcp://localhost:61616" />
<property name="trustAllPackages" value="true" />
</bean>
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value = "test.queue" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name = "connectionFactory" ref="connectionFactory" />
</bean>
<jms:listener-container
container-type="default"
connection-factory="connectionFactory"
acknowledge="auto">
<jms:listener destination="test.queue" ref="jmsListener" method="onMessage"></jms:listener>
</jms:listener-container>
</beans>
接收类:
@Component(value = "jmsListener")
public class JmsListener implements MessageListener {
//收到信息时的动作
@Override
public void onMessage(Message m) {
ObjectMessage message = (ObjectMessage) m;
try {
System.out.println("收到的信息:" + message.getObject());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
public class JmsReceiver {
public static void main( String[] args ) throws IOException {
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:springjms-receiver.xml");
System.in.read();
System.out.println("send successfully, please visit http://localhost:8161/admin to see it");
}
}
运行JmsReceiver,console输出:
received message: {"id":102,"name":"KK103"}
这样就完成了消息的生产和消费了。