Java实现Mqtt收发消息
文章目录
- Java实现Mqtt收发消息
  - windows mqtt 平台服务搭建
  - mqtt 客户端工具:mqttbox
  - 整体代码结构
  - mqtt基础参数配置类
  - mqtt客户端连接
  - mqtt接收的消息处理类
  - 对应的MqttService注解和MqttTopic注解
  - MqttGateway 发送消息
  - 指定topic接收处理方法
本文用 Java 实现 MQTT 消息的收发。首先要理解好 MQTT 中 topic(主题)的概念:发布和订阅是相互的,
发布者和订阅者是对等的——任何客户端都可以既发布又订阅,消息由 broker(服务端)按 topic 转发,客户端彼此之间不需要直接建立连接,也不需要维护对方的状态
使用到windows mqtt 平台服务搭建( 不是必须安装,仅 windows 测试需要此步骤)
mqtt 客户端工具:mqttbox
废话不多说,直接上代码,上工具,准备工作先做好,以及我的实现过程
windows mqtt 平台服务搭建
下载apache-apollo-1.7.1-windows版本,这里提供一个链接地址
http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/
提供一个现有教程:
https://blog.csdn.net/qq_42315062/article/details/125890181
搭建完成后,访问 http://127.0.0.1:61680 登录管理页面即可,默认账号 admin,密码 password。
注意:管理页面的端口是 61680,而 mqtt 服务的端口是 61613,客户端连接时要使用后者
mqtt 客户端工具:mqttbox
这里提供一个下载地方,也可以自行下载
https://download.csdn.net/download/qq_39671088/85740566?utm_medium=distribute.pc_relevant_download.none-task-download-2~default~BlogCommendFromBaidu~Rate-1-85740566-download-12755743.257%5Ev10%5Etop_income_click_base3&depth_1-utm_source=distribute.pc_relevant_download.none-task-download-2~default~BlogCommendFromBaidu~Rate-1-85740566-download-12755743.257%5Ev10%5Etop_income_click_base3&spm=1003.2020.3001.6616.1
整体代码结构
mqtt基础参数配置类
/**
 * Holder for the MQTT settings bound from configuration properties under the
 * {@code mqtt} prefix.
 *
 * <p>Accessors are not written out here; they are presumably generated by
 * Lombok's {@code @Data} (the configuration class in this article calls
 * {@code getHostUrl()}, {@code getUsername()}, etc.).
 */
@Data
@Component
@ConfigurationProperties ( "mqtt" )
public class MqttProperties {
// User name passed to the broker connection options.
private String username;
// Password passed to the broker connection options.
private String password;
// One or more broker URIs; the configuration class splits this value on ",".
private String hostUrl;
// Client id used by the inbound (subscribing) channel adapter.
private String inClientId;
// Client id used by the outbound (publishing) message handler.
private String outClientId;
// NOTE(review): not referenced by the configuration code shown in this article — confirm whether it is still needed.
private String clientId;
// Comma-separated topics: all of them are subscribed to, and the first one is the default publish topic.
private String defaultTopic;
// NOTE(review): presumably a connection timeout; it is not applied by the configuration code shown — confirm unit and usage.
private int timeout;
// NOTE(review): presumably the keep-alive interval; it is not applied by the configuration code shown — confirm unit and usage.
private int keepalive;
// NOTE(review): presumably maps to the client's clean-session flag; it is not applied by the configuration code shown — confirm.
private boolean clearSession;
}
mqtt客户端连接
import com. bsj. boyun. core. tool. utils. ExceptionUtil ;
import org. eclipse. paho. client. mqttv3. MqttConnectOptions ;
import org. eclipse. paho. client. mqttv3. MqttException ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. integration. channel. ExecutorChannel ;
import org. springframework. integration. dsl. IntegrationFlow ;
import org. springframework. integration. dsl. IntegrationFlows ;
import org. springframework. integration. mqtt. core. DefaultMqttPahoClientFactory ;
import org. springframework. integration. mqtt. core. MqttPahoClientFactory ;
import org. springframework. integration. mqtt. inbound. MqttPahoMessageDrivenChannelAdapter ;
import org. springframework. integration. mqtt. outbound. MqttPahoMessageHandler ;
import org. springframework. integration. mqtt. support. DefaultPahoMessageConverter ;
import org. springframework. scheduling. concurrent. ThreadPoolTaskExecutor ;
import java. util. concurrent. ThreadPoolExecutor ;
/**
 * Spring Integration wiring for MQTT: builds the Paho client factory, the
 * inbound adapter/flow that hands received messages to
 * {@link MqttMessageHandle}, the executor used for that hand-off, and the
 * outbound flow that publishes whatever is sent to the outbound channel.
 */
@Configuration
public class MqttConfig {

    /**
     * Name of the channel the outbound flow consumes from. It must stay equal
     * to the {@code defaultRequestChannel} declared on {@code MqttGateway}.
     */
    private static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private MqttMessageHandle mqttMessageHandle;

    /**
     * Creates the client factory shared by the inbound adapter and the
     * outbound handler.
     *
     * <p>Any failure while building the connection options is reported on
     * standard output and the factory is returned without options, exactly as
     * before. A missing password is no longer such a failure: it used to raise
     * a {@code NullPointerException} that also discarded the server URIs and
     * user name that had already been read.
     *
     * @return the client factory, with connection options when they could be built
     * @throws MqttException declared for compatibility; not thrown by this method
     */
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() throws MqttException {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        try {
            MqttConnectOptions options = new MqttConnectOptions();
            // hostUrl may list several broker URIs separated by commas.
            options.setServerURIs(mqttProperties.getHostUrl().split(","));
            options.setUserName(mqttProperties.getUsername());
            // Brokers that allow anonymous access are configured without a password.
            String password = mqttProperties.getPassword();
            if (password != null) {
                options.setPassword(password.toCharArray());
            }
            factory.setConnectionOptions(options);
        } catch (Exception e) {
            System.out.println("mqtt初始化连接异常:" + ExceptionUtil.getStackStr(e));
        }
        return factory;
    }

    /**
     * Inbound adapter subscribed to every topic listed (comma-separated) in
     * {@code defaultTopic}, connecting with the inbound client id.
     *
     * @param factory the shared client factory
     * @return the message-driven channel adapter
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory) {
        return new MqttPahoMessageDrivenChannelAdapter(
                mqttProperties.getInClientId(),
                factory,
                mqttProperties.getDefaultTopic().split(","));
    }

    /**
     * Flow that takes messages from the inbound adapter, moves them onto an
     * executor-backed channel and passes them to {@link MqttMessageHandle}.
     *
     * @param adapter the inbound adapter; its completion timeout and QoS are set here
     * @return the inbound integration flow
     */
    @Bean
    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);
        return IntegrationFlows.from(adapter)
                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
                .handle(mqttMessageHandle)
                .get();
    }

    /**
     * Executor used to dispatch received messages to the handler.
     *
     * @return an executor with 50 core threads, 200 max threads, a queue of
     *         1000 and a 300 second keep-alive
     */
    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(200);
        executor.setCorePoolSize(50);
        executor.setQueueCapacity(1000);
        executor.setKeepAliveSeconds(300);
        // When the executor rejects a task, run it on the submitting thread instead of dropping it.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    /**
     * Flow that publishes every message sent to the outbound channel.
     *
     * @param factory the shared client factory
     * @return the outbound integration flow
     */
    @Bean
    public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {
        MqttPahoMessageHandler handler =
                new MqttPahoMessageHandler(mqttProperties.getOutClientId(), factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        // The first configured topic is the handler's default topic.
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return IntegrationFlows.from(OUTBOUND_CHANNEL).handle(handler).get();
    }
}
mqtt接收的消息处理类
import com. bsj. studentcard. gateway. attendance. mqtt. annotation. MqttService ;
import com. bsj. studentcard. gateway. attendance. mqtt. annotation. MqttTopic ;
import com. bsj. studentcard. gateway. attendance. util. SpringUtils ;
import lombok. extern. slf4j. Slf4j ;
import org. springframework. integration. mqtt. support. MqttHeaders ;
import org. springframework. messaging. Message ;
import org. springframework. messaging. MessageHandler ;
import org. springframework. messaging. MessagingException ;
import org. springframework. stereotype. Component ;
import java. lang. reflect. InvocationTargetException ;
import java. lang. reflect. Method ;
import java. util. Map ;
@Component
@Slf4j
public class MqttMessageHandle implements MessageHandler {
public static Map < String , Object > mqttServices;
@Override
public void handleMessage ( Message < ? > message) throws MessagingException {
getMqttTopicService ( message) ;
}
public Map < String , Object > getMqttServices ( ) {
if ( mqttServices == null ) {
mqttServices = SpringUtils . getBeansByAnnotation ( MqttService . class ) ;
}
return mqttServices;
}
public void getMqttTopicService ( Message < ? > message) {
String receivedTopic = ( String ) message. getHeaders ( ) . get ( MqttHeaders . RECEIVED_TOPIC) ;
if ( receivedTopic == null || "" . equals ( receivedTopic) ) {
return ;
}
for ( Map. Entry < String , Object > entry : getMqttServices ( ) . entrySet ( ) ) {
Class < ? > clazz = entry. getValue ( ) . getClass ( ) ;
Method [ ] methods = clazz. getDeclaredMethods ( ) ;
for ( Method method : methods) {
if ( method. isAnnotationPresent ( MqttTopic . class ) ) {
MqttTopic handleTopic = method. getAnnotation ( MqttTopic . class ) ;
if ( isMatch ( receivedTopic, handleTopic. value ( ) ) ) {
try {
method. invoke ( SpringUtils . getBean ( clazz) , message) ;
return ;
} catch ( IllegalAccessException e) {
e. printStackTrace ( ) ;
} catch ( InvocationTargetException e) {
log. error ( "执行 {} 方法出现错误" , handleTopic. value ( ) , e) ;
}
}
}
}
}
}
public static boolean isMatch ( String topic, String pattern) {
if ( ( topic == null ) || ( pattern == null ) ) {
return false ;
}
if ( topic. equals ( pattern) ) {
return true ;
}
if ( "#" . equals ( pattern) ) {
return true ;
}
String [ ] splitTopic = topic. split ( "/" ) ;
String [ ] splitPattern = pattern. split ( "/" ) ;
boolean match = true ;
for ( int i = 0 ; i < splitPattern. length; i++ ) {
if ( ! "#" . equals ( splitPattern[ i] ) ) {
if ( i >= splitTopic. length) {
match = false ;
break ;
}
if ( ! splitTopic[ i] . equals ( splitPattern[ i] ) && ! "+" . equals ( splitPattern[ i] ) ) {
match = false ;
break ;
}
} else {
break ;
}
}
return match;
}
}
对应的MqttService注解和MqttTopic注解
import org. springframework. core. annotation. AliasFor ;
import org. springframework. stereotype. Component ;
import java. lang. annotation. * ;
/**
 * Type-level marker for classes that contain MQTT topic handler methods.
 *
 * <p>The annotation is itself annotated with {@link Component}, and
 * {@code MqttMessageHandle} looks beans up by this annotation when
 * dispatching received messages.
 */
@Component
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface MqttService {

    /**
     * Declared as an alias for an attribute of {@link Component}.
     *
     * @return the aliased value; empty by default
     */
    @AliasFor(annotation = Component.class)
    String value() default "";
}
import java. lang. annotation. ElementType ;
import java. lang. annotation. Retention ;
import java. lang. annotation. RetentionPolicy ;
import java. lang. annotation. Target ;
/**
 * Method-level marker naming the MQTT topic (or topic filter) a handler
 * method is responsible for. It is retained at runtime so that it can be
 * read reflectively.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface MqttTopic {

    /**
     * The topic or topic filter handled by the annotated method.
     *
     * @return the topic value; empty by default
     */
    String value() default "";
}
MqttGateway 发送消息
import org. springframework. integration. annotation. MessagingGateway ;
import org. springframework. integration. mqtt. support. MqttHeaders ;
import org. springframework. messaging. handler. annotation. Header ;
import org. springframework. stereotype. Component ;
/**
 * Messaging gateway for publishing to MQTT. Calls are sent to the
 * {@code mqttOutboundChannel} request channel.
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Sends {@code data} with the given topic header.
     *
     * @param topic value of the MQTT topic header
     * @param data  the content to send (presumably becomes the message payload)
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            String data);

    /**
     * Sends {@code data} with the given topic and QoS headers.
     *
     * @param topic value of the MQTT topic header
     * @param qos   value of the MQTT QoS header
     * @param data  the content to send (presumably becomes the message payload)
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) Integer qos,
            String data);
}
指定topic接收处理方法
/**
 * Example topic handler: receives messages for {@code mqtt/face/basic},
 * logs them and sends an acknowledgement back through {@link MqttGateway}.
 */
@MqttService
@Slf4j
@RequiredArgsConstructor
public class MqttTopicHandle {

    private final MqttGateway mqttGateway;

    /**
     * Handles a message received on {@code mqtt/face/basic}: logs the topic
     * and payload, then publishes an acknowledgement with QoS 0.
     *
     * @param message the received message; the payload is cast to {@code String}
     * @throws MqttException declared for compatibility; not thrown by the code in this method
     */
    @MqttTopic("mqtt/face/basic")
    public void basic(Message<?> message) throws MqttException {
        String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
        String payload = (String) message.getPayload();
        log.info("接收到的topic为:{},内容:{}", receivedTopic, payload);
        // The original referenced an undefined variable `topic`; the received topic is the only topic in scope.
        // NOTE(review): replying on the topic the message arrived on means this handler will see its own
        // acknowledgement again if the inbound adapter is subscribed to that topic — consider a dedicated
        // reply topic before using this outside a demo.
        mqttGateway.sendToMqtt(receivedTopic, 0, "收到消息!");
    }
}