亿可控_第2章_指标数据采集与断连监控

news2025/1/4 4:34:30

亿可控_第2章_指标数据采集与断连监控

文章目录

  • 亿可控_第2章_指标数据采集与断连监控
  • 第2章 指标数据采集与断连监控
    • 学习目标
    • 1. EMQ指标主题订阅
      • 1.1 Eclipse paho简介
      • 1.2 发送与订阅消息
        • 1.2.1 发送消息
        • 1.2.2 订阅消息
      • 1.3 订阅指标主题
        • 1.3.1 需求分析
        • 1.3.2 实现思路
        • 1.3.3 代码实现
    • 2.指标数据解析
      • 2.1 需求分析
      • 2.2 实现思路
      • 2.3 代码实现
    • 3. 告警判断与存储
      • 3.1 需求分析
      • 3.2 实现思路
      • 3.3 代码实现
        • 3.3.1 告警判断逻辑
        • 3.3.2 设备告警信息保存
    • 4. 设备断连监控
      • 4.1 需求分析
      • 4.2 EMQ WebHook介绍
      • 4.3 实现思路
      • 4.4 代码实现
        • 4.4.1 断连监控实现
        • 4.4.2 更新设备在线状态

第2章 指标数据采集与断连监控

学习目标

  • 能够完成EMQ指标主题订阅
  • 能够完成指标数据的解析
  • 能够完成告警判断与存储
  • 能够运用EMQ的webhook实现设备断连监控

1. EMQ指标主题订阅

1.1 Eclipse paho简介

Eclipse paho是eclipse基金会下面的一个开源项目,基于MQTT协议的客户端,用多种语言的实现。什么是MQTT?可以关注之前的EMQ课程,里面有详细介绍。 这几年的很火的物联网多是基于这个协议来通信的。

Eclipse paho支持的客户端语言很多,有java、Python、JavaScript、GoLang、C 、C++ 、C#等。

这里我们使用的是基于java语言版本的实现,这个版本的实现可以运行在JVM之上或者其他兼容于java的平台,比如安卓平台上。

Paho Java Client提供了两种API:

MqttAsyncClient:该API是完全基于异步来实现的,通过在启动时注册一个回调(callbacks),来实现消息的异步收发处理。

MqttClient:是基于同步的方式实现的消息收发处理,在亿可控项目中我们使用同步的方式来接收处理消息。

1.2 发送与订阅消息

1.2.1 发送消息

(1)添加和emq通信包paho的引用,paho不光能和emq通信,只要是基于mqtt协议实现的消息代理服务器,paho都能作为客户端和其进行通信。同时编写和mqtt通信的客户端代码。

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

(2) 在consul中添加emq相关的配置:

emq:
  mqttServerUrl: tcp://192.168.200.128:1883

(3)在com.yikekong.config包下定义emq配置类:

package com.yikekong.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties("emq")
@Data
public class EmqConfig{
    private String mqttServerUrl;
}

(4)编写EMQ客户端类,新增连接方法

package com.yikekong.emq;

import com.yikekong.config.EmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
@Slf4j
public class EmqClient{
    @Autowired
    private EmqConfig emqConfig;//emq配置

    private MqttClient mqttClient;

    /**
     * 连接mqtt broker
     */
    public void connect(){
        try {
            mqttClient = new MqttClient(
                 emqConfig.getMqttServerUrl(),"monitor."+ UUID.randomUUID().toString());
          		mqttClient.connect();
        } catch (MqttException e) {
            log.error("mqtt creat error",e);
        }
    }

}

(5)编写发布消息的方法

    /**
     * 发布消息
     * @param topic 消息主题
     * @param msg 发送的消息
     */
    public void publish(String topic,String msg){
        try {
            MqttMessage mqttMessage = new MqttMessage(msg.getBytes());
            mqttClient.getTopic(topic).publish(mqttMessage);//向某主题发送消息
        } catch (MqttException e) {
            log.error("mqtt publish msg error",e);
        }
    }

测试:

(1)编写单元测试

@SpringBootTest
@RunWith(SpringRunner.class)
public class EmqTest {

    @Autowired
    private EmqClient emqClient;

    @Test
    public void testSend(){
        emqClient.connect();
        emqClient.publish("test_topic","test_content");
    }
}

(2)打开EMQ

http://192.168.200.128:18083 ,选择Tools下的Websocket进行测试

在这里插入图片描述

(3)连接并订阅主题
在这里插入图片描述

输入主题名称test_topic

在这里插入图片描述

(4)调用单元测试方法

在这里插入图片描述

测试后可以看到列表中有接收到的消息。

在这里插入图片描述

1.2.2 订阅消息

(1)EmqClient类新增方法,用于订阅主题

/**
     * 订阅主题
     * @param topicName
     * @throws MqttException
     */
public void subscribe(String topicName) throws MqttException {
  	mqttClient.subscribe(topicName);
}

(2)接收消息回调类: com.yikekong.emq包下创建消息接收处理类:

package com.yikekong.emq;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yikekong.dto.*;
import com.yikekong.entity.GPSEntity;
import com.yikekong.es.ESRepository;
import com.yikekong.service.*;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;

@Component
@Slf4j
public class EmqMsgProcess implements MqttCallback{

    @Override
    public void connectionLost(Throwable throwable) {
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        String payload = new String(mqttMessage.getPayload());
        System.out.println("接收到数据:"+payload);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }

}

(3)修改EmqClient的connect方法,添加代码

mqttClient.setCallback(emqMsgProcess);

(4)编写监控类,启动后自动订阅主题

package com.yikekong.core;

import com.yikekong.emq.EmqClient;
import com.yikekong.service.QuotaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * 自动监控
 */
@Component
@Slf4j
public class Monitor{
  
    @Autowired
    private EmqClient emqClient;
  
    @PostConstruct
    public void init(){
        emqClient.connect();
        emqClient.subscribe("mytopic");
    }
}

测试:

(1)启动工程

(2)打开emq的websocket 工具, 连接,向mytopic主题发送消息

在这里插入图片描述

在这里插入图片描述

点击发送后,控制台可以显示出接收的消息
在这里插入图片描述

1.3 订阅指标主题

1.3.1 需求分析

亿可控实现功能:订阅指标配置中的主题。
在这里插入图片描述

1.3.2 实现思路

(1)系统启动时,获取所有的主题名称,循环调用订阅主题的方法。

(2)创建新的指标时,根据设置的主题名称订阅。

1.3.3 代码实现

(1)修改Monitor的init方法,实现启动时订阅所有指标配置的主题

@Component
@Slf4j
public class Monitor {

    @Autowired
    private EmqClient emqClient;

    @Autowired
    private QuotaService quotaService;

    @PostConstruct
    public void init(){
        System.out.println("初始化方法,订阅主题");
        emqClient.connect();
        quotaService.getAllSubject().forEach(s -> {
            try {
                emqClient.subscribe("$queue/"+s);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        });
    }
}

需要注意的是在订阅主题的时候我们使用了共享队列来接收($queue/),主要考虑到的是客户端物联网设备的个数比较庞大,在同一时刻或一段时间之内,上报的消息量过于庞大,接收消息的地方会很容易被击垮。使用共享队列,天然的平滑支持了分布式部署,面对巨大的消息量我们只需要部署多份亿可控节点来接收消息并处理就行了,无需做任何复杂的负载均衡处理,这样对研发和部署成本是最低的。

(2)修改QuotaController的create方法,新增主题订阅代码

    @Autowired
    private EmqClient emqClient;

    /**
     * 创建指标
     * @param vo
     * @return
     */
    @PostMapping
    public boolean create(@RequestBody QuotaVO vo){
        try {
            QuotaEntity quotaEntity = new QuotaEntity();
            BeanUtils.copyProperties(vo,quotaEntity);
            emqClient.subscribe("$queue/"+quotaEntity.getSubject());//添加这句!

            return quotaService.save(quotaEntity);
        }catch (DuplicateKeyException e){
            throw new BussinessException("已存在该名称");
        } catch (MqttException e) {
            log.error("订阅主题失败",e);
            return false;
        }
    }

(3)丢失连接后再次连接和订阅,修改EmqMsgProcess的connectionLost方法

@Autowired
private QuotaService quotaService;

@Autowired
private EmqClient emqClient;

@Override
public void connectionLost(Throwable throwable) {
	log.info("emq connect lost");
  	//当连接丢失时再次连接emq
  	emqClient.connect();
    //重新订阅所有主题
  	quotaService.getAllSubject().forEach(s -> {
            try {
                subscribe("$queue/"+s);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        });  
}

2.指标数据解析

2.1 需求分析

我们的系统通过后台接口在前端页面中添加的一些指标数据的配置定义,然后在接收到EMQ的数据之后,跟这些提前配置好的数据进行解析。

其中针对指标定义添加的产品页面如下:

在这里插入图片描述

  • 指标名称:对应将来在系统中需要展示的名称
  • 单位:主要是用来解析和保存设备报文中对应的指标数据的单位,比如:摄氏度、米这样的单位
  • 报文主题:对应mqtt协议中的报文主题,对应的主题数据就需要在系统中接收和解析处理
  • 指标值字段:报文中需要接收解析、处理的字段名称,根据该名称获取对应的值
  • 指标值数据类型:就是数据属于什么类型,类似编程语言的数据类型,有Double、String、Boolean等
  • 设备识别码字段:对应报文数据中设备编码的字段,主要用来区分设备,方便数据的存取
  • web hook:指标数据需要透传到的外部web接口地址
  • 安全值:主要用来显示指标数据安全值的一个范围

通过在系统中创建这样的指标数据,方便订阅EMQ主题来接收响应数据并进行后期处理。

2.2 实现思路

(1)定义用于封装设备和指标数据的DTO 。

报文有可能是一个设备多个指标

{
 "sn":"123456",
 "temp":1.2,
 "humi":50
}

(2)编写业务逻辑方法,接收参数为主题和报文map,根据主题提取字段名称,从报文map中提取数据。

(3)在接收报文后,调用此业务逻辑实现指标数据的解析。

2.3 代码实现

(1)创建用于存储指标数据的DTO,封装指标数据

package com.yikekong.dto;

import lombok.Data;

import java.io.Serializable;

/**
 * 指标DTO
 */
@Data
public class QuotaDTO implements Serializable{

    /**
     * 指标ID
     */
    private Integer id;

    /**
     * 指标名称
     */
    private String quotaName;

    /**
     * 单位
     */
    private String unit;

    /**
     * 报文主题
     */
    private String subject;

    /**
     * 指标值字段名称
     */
    private String valueKey;

    /**
     * 指标值数据类型
     */
    private String valueType;

    /**
     * 指标值(数值)
     */
    private Double value;

    /**
     * 指标值(非数值)
     */
    private String  stringValue;

    /**
     * 设备识别码字段(设备Id)
     */
    private String snKey;

    /**
     * web钩子地址
     */
    private String webhook;

    /**
     * 参考值
     */
    private String referenceValue;

    /**
     * 设备Id
     */
    private String deviceId;
}

(2)创建用于存储设备和指标列表的DTO

package com.yikekong.dto;
import lombok.Data;
import java.util.List;

@Data
public class DeviceInfoDTO {
    
    private DeviceDTO device;//设备
    
    private List<QuotaDTO> quotaList; //指标列表    
    
}

(3)在QuotaService接口里新增方法

/**
 * 解析报文
 * @param topic 主题名称
 * @param payloadMap 报文内容
 * @return 设备(含指标列表)
 */
DeviceInfoDTO analysis(String topic, Map<String, Object> payloadMap);

在QuotaServiceImpl实现类里实现该接口方法:

@Override
public DeviceInfoDTO analysis(String topic, Map<String, Object> payloadMap) {

    //1.获取指标配置
    List<QuotaEntity> quotaList = baseMapper.selectBySubject(topic);//根据主题查询指标列表
    if(quotaList.size()==0) return null;

    //2.封装设备信息
    String snKey=quotaList.get(0).getSnKey();
    if( Strings.isNullOrEmpty(snKey)  )  return null;

    String  deviceId = (String) payloadMap.get(snKey);//设备编号
    if( Strings.isNullOrEmpty(deviceId)  )  return null;

    DeviceDTO deviceDTO=new DeviceDTO();
    deviceDTO.setDeviceId(deviceId);

    //3.封装指标列表  :  循环我们根据主题名称查询得指标列表,到报文中提取,如果能够提到,进行封装
    List<QuotaDTO> quotaDTOList=Lists.newArrayList();
    for( QuotaEntity quota:quotaList ){
        String quotaKey = quota.getValueKey();//指标key
        if( payloadMap.containsKey(quotaKey) ){
            QuotaDTO quotaDTO=new QuotaDTO();
            //复制指标配置信息
            BeanUtils.copyProperties( quota, quotaDTO);
            quotaDTO.setQuotaName( quota.getName() );
            //指标值封装
            //指标分为两种  1.数值  2.非数值(string boolean)
            //1.数值   value 存储数值  stringValue :存储数值字符串
            //2.非数值  value 0   stringValue:内容
            //如果是非数值
            if( "String".equals(quotaDTO.getValueType()) || "Boolean".equals(quotaDTO.getValueType()) ){
                quotaDTO.setStringValue(  (String) payloadMap.get(quotaKey) );
                quotaDTO.setValue(0d);
            }else{//如果是数值
                if(  payloadMap.get(quotaKey)  instanceof String ){
                    quotaDTO.setValue( Double.valueOf(   (String) payloadMap.get(quotaKey)  ) );
                    quotaDTO.setStringValue( (String) payloadMap.get(quotaKey)  );
                }else{
                    quotaDTO.setValue( Double.valueOf( payloadMap.get(quotaKey) +"" )  );
                    quotaDTO.setStringValue( quotaDTO.getValue()+"" );
                }
                quotaDTO.setDeviceId( deviceId );
            }
            quotaDTOList.add(quotaDTO);
        }
    }

    //4.封装设备+指标列表返回
    DeviceInfoDTO deviceInfoDTO=new DeviceInfoDTO();
    deviceInfoDTO.setDevice(deviceDTO);
    deviceInfoDTO.setQuotaList(quotaDTOList );

    return deviceInfoDTO;
}

编写单元测试类进行测试

import com.fasterxml.jackson.core.JsonProcessingException;
import com.yikekong.YkkApplication;
import com.yikekong.dto.DeviceInfoDTO;
import com.yikekong.service.QuotaService;
import com.yikekong.util.JsonUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest(classes = YkkApplication.class)
@RunWith(SpringRunner.class)
public class TestQuota {

    @Autowired
    private QuotaService quotaService;

    @Test
    public void testAnalysis(){

        Map map=new HashMap<>();
        map.put("sn","123456"); 
        map.put("temp",1.2);// 也测试一下 map.put("temp","1.2");
        DeviceInfoDTO deviceInfoDTO = quotaService.analysis("temperature", map);
        String json = null;
        try {
            json = JsonUtil.serialize(deviceInfoDTO);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        System.out.println(json);
    }
}

3. 告警判断与存储

3.1 需求分析

我们需要配置相关指标的报警数据,当设备上报的数据达到报警阈值之后,更新设备的告警状态。

在这里插入图片描述

3.2 实现思路

(1)在告警服务类中编写方法,封装告警判断逻辑,返回的设备信息中包含告警信息

(2)将包含有告警信息的设备信息保存至elasticsearch中。

3.3 代码实现

3.3.1 告警判断逻辑

(1)AlarmService新增方法定义,根据指标信息返回告警信息

/**
 * 根据指标判断告警信息
 * @param quotaDTO
 */
AlarmEntity verifyQuota(QuotaDTO quotaDTO);

AlarmServiceImpl实现此方法

@Override
public AlarmEntity verifyQuota(QuotaDTO quotaDTO) {

    //1.根据指标id查询告警判断规则列表
    List<AlarmEntity> alarmEntityList = getByQuotaId(quotaDTO.getId());
    AlarmEntity alarm=null;
    for( AlarmEntity alarmEntity:alarmEntityList ){
        //判断:操作符和指标对比
        if( "String".equals( quotaDTO.getValueType() ) || "Boolean".equals(quotaDTO.getValueType())  ){
            if(  alarmEntity.getOperator().equals("=")  &&  quotaDTO.getStringValue().equals(alarmEntity.getThreshold()) ){
                alarm=alarmEntity;
                break;
            }
        }else //数值
        {
            if(  alarmEntity.getOperator().equals(">")  &&  quotaDTO.getValue()>alarmEntity.getThreshold() ){
                alarm=alarmEntity;
                break;
            }
            if(  alarmEntity.getOperator().equals("<")  &&  quotaDTO.getValue()<alarmEntity.getThreshold() ){
                alarm=alarmEntity;
                break;
            }
            if(  alarmEntity.getOperator().equals("=")  &&  quotaDTO.getValue().equals(alarmEntity.getThreshold()) ){
                alarm=alarmEntity;
                break;
            }
        }
    }
    return alarm;
}

(2)QuotaDTO新增用于封装告警信息的属性

private String alarm;//是否告警

private String alarmName;// 告警名称

private String level;//告警级别

private String alarmWebhook;//告警web钩子

private Integer cycle;//沉默周期

(3)AlarmService新增方法定义,根据设备信息返回告警信息

/**
 * 根据设备信息判断
 * @param deviceInfoDTO
 */
DeviceInfoDTO verifyDeviceInfo(DeviceInfoDTO deviceInfoDTO);

AlarmServiceImpl实现此方法

@Override
public DeviceInfoDTO verifyDeviceInfo(DeviceInfoDTO deviceInfoDTO) {

    // 封装指标的告警  封装设备的告警
    DeviceDTO deviceDTO = deviceInfoDTO.getDevice();

    deviceDTO.setLevel(0);//假设不告警
    deviceDTO.setAlarm(false);
    deviceDTO.setAlarmName("正常");
    deviceDTO.setStatus(true);
    deviceDTO.setOnline(true);

    for(QuotaDTO quotaDTO :deviceInfoDTO.getQuotaList() ){

        AlarmEntity alarmEntity = verifyQuota(quotaDTO);//根据指标得到告警信息
        if(alarmEntity!=null){  //如果指标存在告警

            quotaDTO.setAlarm("1");
            quotaDTO.setAlarmName( alarmEntity.getName() );//告警名称
            quotaDTO.setLevel( alarmEntity.getLevel()+"" );//告警级别
            quotaDTO.setAlarmWebHook(alarmEntity.getWebHook());//告警web钩子
            quotaDTO.setCycle( alarmEntity.getCycle() );//沉默周期
		   //存储设备告警信息
            if(alarmEntity.getLevel().intValue()> deviceDTO.getLevel().intValue() ){
                deviceDTO.setLevel( alarmEntity.getLevel() );
                deviceDTO.setAlarm(true);
                deviceDTO.setAlarmName(alarmEntity.getName());
            }

        }else{//如果指标不存储在告警
            quotaDTO.setAlarm("0");
            quotaDTO.setAlarmName("正常");
            quotaDTO.setLevel("0");
            quotaDTO.setAlarmWebHook("");
            quotaDTO.setCycle(0);
        }
    }
    return deviceInfoDTO;
}

单元测试:

(1)添加告警数据用于测试

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6c3cbDWb-1666737764579)(images/2-17.png)](2)编写单元测试代码,修改TestQuota

import com.fasterxml.jackson.core.JsonProcessingException;
import com.yikekong.YkkApplication;
import com.yikekong.dto.DeviceInfoDTO;
import com.yikekong.service.AlarmService;
import com.yikekong.service.QuotaService;
import com.yikekong.util.JsonUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;

@SpringBootTest(classes = YkkApplication.class)
@RunWith(SpringRunner.class)
public class TestQuota {

    @Autowired
    private QuotaService quotaService;

    @Autowired
    private AlarmService alarmService;

    /**
     * 测试报文解析(告警)
     */
    @Test
    public void testAnalysis(){

        Map map=new HashMap<>();
        map.put("sn","123456");
        map.put("temp",12);
        DeviceInfoDTO deviceInfoDTO = quotaService.analysis("temperature", map);
        //告警信息封装
        DeviceInfoDTO deviceInfoDTO1 = alarmService.verifyDeviceInfo(deviceInfoDTO);
        String json = null;
        try {
            json = JsonUtil.serialize(deviceInfoDTO1);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        System.out.println(json);
    }
}

3.3.2 设备告警信息保存

(1)DeviceService新增方法定义

/**
 * 存储设备信息
 * @param deviceDTO
 * @return
 */
boolean saveDeviceInfo(DeviceDTO deviceDTO);

DeviceServiceImpl实现此方法

@Override
public boolean saveDeviceInfo(DeviceDTO deviceDTO) {
    //查询设备 ,判断开关状态 ,如果是关闭则不处理
    DeviceDTO device= findDevice(deviceDTO.getDeviceId());
    if( device!=null && !device.getStatus() ) return false;

    // 如果当前设备查不到,新增
    if(device==null){
        esRepository.addDevices( deviceDTO );
    }else{
        //如果可以查询到,更新告警信息
        esRepository.updateDevicesAlarm(deviceDTO);
    }
    return true;
}

(2)修改EmqMsgProcess的messageArrived方法,实现告警判断和保存设备信息

@Autowired
private AlarmService alarmService;

@Autowired
private DeviceService deviceService;

@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
    String payload = new String(mqttMessage.getPayload());
    ObjectMapper mapper = new ObjectMapper();
    Map<String, Object> payloadMap = mapper.readValue(payload, Map.class);
    System.out.println("接收到数据:"+payloadMap);
    //解析数据
    DeviceInfoDTO deviceInfoDTO = quotaService.analysis(topic, payloadMap);
    if(deviceInfoDTO!=null){
        //告警判断
        deviceInfoDTO= alarmService.verifyDeviceInfo(deviceInfoDTO);  //返回包含了告警判断的对象
        //保存设备信息
        deviceService.saveDeviceInfo(deviceInfoDTO.getDevice());
    }
}

测试:

(1)启动亿可控

(2)用kibana查看当前设备的告警信息

(3)通过emq模拟发送报文

4. 设备断连监控

4.1 需求分析

在系统中要对设备的断网情况进行监控,发现设备断网后更新设备状态。我们可以使用EMQ的webHook来实现。

4.2 EMQ WebHook介绍

WebHook是由emqx_web_hook插件提供的将EMQ X中的钩子时间通知到某个Web服务的功能。WebHook的内部实现是基于EMQ X内部的钩子,它通过钩子上挂载的回调函数获取到EMQ X中的各种事件,并转发到emqx_web_hook中配置的Web服务器接口中。

在这里插入图片描述

WebHook 对于事件的处理是单向的,它仅支持将 EMQ X 中的事件推送给 Web 服务,并不关心 Web 服务的返回。 借助 Webhook 可以完成设备在线、上下线记录,订阅与消息存储、消息送达确认等诸多业务。

Webhook 的配置文件位于 etc/plugins/emqx_web_hook.conf

配置项类型可取值默认值说明
api.urlstring-http://127.0.0.1:8080事件需要转发的目的服务器地址
encode_payloadenumbase64, base62undefined消息类事件中的 Payload 字段进行编码,注释或其他则表示不编码

说明:当消息内容是不可见字符(如二进制数据)时,为了能够在 HTTP 协议中传输,使用 encode_payload 是十分有用的。

配置触发规则:

etc/plugins/emqx_web_hooks.conf 可配置触发规则,其配置的格式如下:

## 格式示例
web.hook.rule.<Event>.<Number> = <Rule>

## 示例值
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}

Event 触发事件:

目前支持以下事件:

名称说明执行时机
client.connect处理连接报文服务端收到客户端的连接报文时
client.connack下发连接应答服务端准备下发连接应答报文时
client.connected成功接入客户端认证完成并成功接入系统后
client.disconnected连接断开客户端连接层在准备关闭时
client.subscribe订阅主题收到订阅报文后,执行 client.check_acl 鉴权前
client.unsubscribe取消订阅收到取消订阅报文后
session.subscribed会话订阅主题完成订阅操作后
session.unsubscribed会话取消订阅完成取消订阅操作后
message.publish消息发布服务端在发布(路由)消息前
message.delivered消息投递消息准备投递到客户端前
message.acked消息回执服务端在收到客户端发回的消息 ACK 后
message.dropped消息丢弃发布出的消息被丢弃后

Number

同一个事件可以配置多个触发规则,配置相同的事件应当依次递增。

Rule

触发规则,其值为一个 JSON 字符串,其中可用的 Key 有:

  • action:字符串,取固定值
  • topic:字符串,表示一个主题过滤器,操作的主题只有与该主题匹配才能触发事件的转发

例如,我们只将与 a/b/cfoo/# 主题匹配的消息转发到 Web 服务器上,其配置应该为:

web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}

这样 Webhook 仅会转发与 a/b/cfoo/# 主题匹配的消息,例如 foo/bar 等,而不是转发 a/b/dfo/bar

开启WebHook

在EMQ中,点击菜单Plugins ,找到 EMQ X WebHook Plugin,点击后边的start开启

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZvIMjlLm-1666737764580)(images/2-12.png)]

4.3 实现思路

(1)在EMQ中配置webhook针对客户端断连情况的节点,指向亿可控系统的接收地址

(2)开发亿可控接收断连接口来接收EMQ传入的数据

4.4 代码实现

4.4.1 断连监控实现

(1)在亿可控项目编写接收EMQ传入的数据。在DeviceController中添加如下方法:

/**
 * 接收设备断连信息
 * @param param
 */
@PostMapping("/clientAction")
public void clientAction(@RequestBody  Map<String,String> param){
    System.out.println(param);
}

(2)修改AuthFilter的doFilter方法,添加放行地址:

//如果是设备断链监控
if(path.equals("/device/clientAction")){
  	filterChain.doFilter(servletRequest, servletResponse);
 	return;
}

(3)编辑EMQ的配置文件etc/plugins/emqx_web_hook.conf将亿可控的接收地址配置到web.hook.api.url,然后只打开客户端连接和断开的触发规则,具体配置如下:

##====================================================================
## WebHook
##====================================================================

## The web services URL for Hook request
##
## Value: String
##亿可控接收地址
web.hook.api.url = http://192.168.3.4:9093/device/clientAction

## Encode message payload field
##
## Value: base64 | base62
## web.hook.encode_payload = base64

##--------------------------------------------------------------------
## Hook Rules

## These configuration items represent a list of events should be forwarded
##
## Format:
##   web.hook.rule.<HookName>.<No> = <Spec>
web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}

注意:192.168.3.4是我windows宿主机的ip ,同学们需要在windows宿主机执行ipconfig获得此ip

(4)重新启动emq容器

测试:使用EMQ的Websocket工具,点击连接和断开连接,观测控制台的日志输出

{username=, proto_ver=4, keepalive=60, ipaddress=192.168.200.1, connected_at=1599791080405, clientid=mqttjs_11aebe9694, action=client_connected}
{username=, reason=normal, clientid=mqttjs_11aebe9694, action=client_disconnected}

上面的结果中,clientid就是设备的id,action是发生的事件:client_connected表示连接 client_disconnected表示断开连接

4.4.2 更新设备在线状态

(1)DeviceService新增方法定义

/**
 * 更新在线状态
 * @param deviceId
 * @param online
 */
void updateOnline(String deviceId, Boolean online);

DeviceServiceImpl类实现此方法

/**
 * 更新
 * @param deviceId
 * @param online
 */
@Override
public void updateOnline(String deviceId, Boolean online){
    //以webclient开头的client为系统前端,monitor开头的是亿可控服务端
    if(deviceId.startsWith("webclient") || deviceId.startsWith("monitor")){
        return;
    }
    //更新数据到es
    DeviceDTO deviceDTO = findDevice(deviceId);
    if(deviceDTO == null) return;

    deviceDTO.setOnline(online);
    esRepository.updateOnline(deviceId,online);
}

(2)修改DeviceController的clientAction方法

/**
 * 接收设备断连信息
 * @param param
 */
@PostMapping("/clientAction")
public void clientAction(@RequestBody  Map<String,String> param){
    System.out.println(param);
    String deviceId = param.get("clientid");  //提取设备id
    if( param.get("action").equals("client_connected") ){ //如果是联网
        deviceService.updateOnLine(deviceId,true);
    }
    if( param.get("action").equals("client_disconnected") ){ //如果是断网
        deviceService.updateOnLine(deviceId,false);
    }
}

测试:

(1)启动亿可控项目

(2)使用kibana查询现有的设备数据

(3)使用EMQ 的Websocket工具模拟设备的连接与断开,clientID就是设备id

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IrmQtdYL-1666737764580)(images/2-18.png)]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/818.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

23模式---单例模式

单例模式&#xff0c;属于创建类型的一种常用的软件设计模式。通过单例模式的方法创建的类在当前进程中只有一个实例&#xff08;根据需要&#xff0c;也有可能一个线程中属于单例&#xff0c;如&#xff1a;仅线程上下文内使用同一个实例&#xff09; 这个也是23设计模型中最…

Python 图像处理OpenCV:几何变换(笔记)

包括图像缩放、图像平移、图像旋转、图像的仿射变换、图像的透射变换及图像金字塔等内容。 图像缩放&#xff1a; 缩放是对图像的大小进行调整&#xff0c;即使图像放大或缩小。cv2.resize(src,dsize,fx0,fy0,interpolationcv2.INTER_LINEAR)src : 输入图像dsize: 绝对尺寸&a…

Linux运维面试题总结—Linux基础、计算机网络基础

文章目录一、三次握手四次挥手二、如何划分vlan三、为什么划分vlanvlan三个模式&#xff1a;vxlan和vlan区别是什么&#xff1f;四、OSI七层模型及对应协议五、Linux中 查找大于10M的文件并删除六、查看cup占用情况&#xff0c;查看内存&#xff0c;查看磁盘IO使用情况&#xf…

图像处理黑科技——弯曲矫正、去摩尔纹、切边增强、PS检测

目录0 前言1 弯曲矫正2 去摩尔纹3 图像切边增强4 PS检测5 总结0 前言 合合信息是行业领先的人工智能及大数据科技企业&#xff0c;专注文字识别领域16年&#xff0c;在智能文字识别及商业大数据等核心领域处于国内领先地位&#xff0c;全球企业和个人用户提供创新的数字化、智…

代码随想录动态规划——一和零

题目 给你一个二进制字符串数组 strs 和两个整数 m 和 n 。 请你找出并返回 strs 的最大子集的大小&#xff0c;该子集中 最多 有 m 个 0 和 n 个 1 。 如果 x 的所有元素也是 y 的元素&#xff0c;集合 x 是集合 y 的 子集 。 示例 1&#xff1a; 输入&#xff1a;strs [“10…

Oracle Unifier 系统架构简述(安装部署)

关于Oracle Primavera Unifier 的应用架构&#xff0c;其实在我之前的博客已有介绍相关内容 谈谈 Oracle P6 , Unifier 和其他应用系统间的联系https://campin.blog.csdn.net/article/details/104972949 从官方文档方面&#xff0c;其实在《unifier_performance_and_sizing_g…

【C语言】全面解析数据在内存中的存储

文章目录前言类型的基本分类整型浮点数自定义类型整型在内存中的存储原码、反码、补码大端和小端如何判断编译器是大端还是小端浮点数在内存中的存储总结前言 C语言中有char、short、int、long、long long、float和doubole这些数据类型。这些数据类型也叫内置类型。 所占存储空…

JECloud微服务低代码平台重大发布。

JECloud微服务低代码平台【1.0.0】版升级发布&#xff0c;本次发布内容如下&#xff1a; JECloud微服务低代码平台是一款基于元数据领域模型构建的低代码开发平台&#xff0c;其底层采用微服务与微应用构建底层框架&#xff0c;并基于基础框架构建各核心微服务模块来实现低代码…

手撕七大排序 (四)

归并排序归并排序递归写法一. 基本概念二. 图解归并排序递归写法 一. 基本概念 归并排序&#xff08;MERGE-SORT&#xff09;是建立在归并操作上的一种有效的排序算法,该算法是采用分治法&#xff08;Divide andConquer&#xff09;的一个非常典型的应用。将已有序的子序列合并…

数据结构与算法——链表(双向链表,顺序表与链表的比较)

&#x1f353;个人主页&#xff1a;bit.. &#x1f352;系列专栏&#xff1a;Linux(Ubuntu)入门必看 C语言刷题 数据结构与算法 目录 一.双向链表 二.双向链表的对称性&#xff1a;&#xff08;设指针p指向某一结点&#xff09; 1.双向链表的插入 2.双向链表的删除 …

浏览器中的页面循环系统

【】渲染进程中有个主线程处理安排好的任务&#xff0c;为了在线程运行过程中接收并处理新任务&#xff0c;引入了事件循环机制&#xff1b;为了处理其他线程发送来的任务&#xff0c;引入消息队列&#xff1b;为了处理其他进程发送来的任务&#xff0c;渲染进程专门有一个IO线…

【数据结构初阶】二、顺序表的实现

目录 一、线性表 二、顺序表 2.1 顺序表概念及结构 2.2 顺序表接口实现 2.2.1 顺序表初始化 2.2.2 顺序表的销毁 2.2.3 顺序表的打印 2.2.4 顺序表增加数据&#xff08;插入&#xff0c;头插、尾插&#xff09; 2.2.5 顺序表删除数据&#xff08;删除&#xff0c;头删…

Uniapp集成腾讯IM+音视频通话

腾讯IM(包含界面)源码下载相关配置 传送门&#xff1a;https://cloud.tencent.com/document/product/269/36887 传送门&#xff1a;https://github.com/TencentCloud/TIMSDK/tree/master/uni-app vue2 vue3都可 笔者用的vue2 解压文件 拖到编辑器 #项目右键 在命令行窗口打开 …

Promethues原理详解

目录 引言 一、Prometheus概念 1.1、什么是Prometheus 1.2、Zabbix和Prometheus区别 1.3、Prometheus的特点 二、运维监控平台设计思路 三、Prometheus监控体系 3.1、系统层监控&#xff08;需要监控的数据&#xff09; 3.2、中间件及基础设施类监控 3.3、应用层监控…

语音合成经典模型结构介绍

(以下内容搬运自 PaddleSpeech) Models introduction TTS system mainly includes three modules: Text Frontend, Acoustic model and Vocoder. We introduce a rule-based Chinese text frontend in cn_text_frontend.md. Here, we will introduce acoustic models and voc…

XDataverse免费的统一数据库管理工具

XDataverse产品简介 XDataverse是一款通用的数据库管理工具&#xff0c;主要管理关系型数据库&#xff0c;同时也支持一些其余类型的数据库&#xff0c;比如Redis。其主要功能有 支持主流关系型数据库的常规操作&#xff0c;比如MySQL,SQLServer,SQlite,SQLCE&#xff0c;Postg…

机器学习 逻辑回归(2)softmax回归多类别分类-鸢尾花案例

机器学习 逻辑回归之softmax回归多类别分类-鸢尾花案例一、前言二、假设函数三、One-Hot 独热编码四、代价函数五、梯度下降六、原生代码实现6.1 加载并查看数据6.2 添加前置与数据分割6.3 迭代训练6.4 验证数据七、sklearn代码实现八、参考资料PS&#xff1a;softmax回归损失函…

[时间序列预测]基于BP、LSTM、CNN-LSTM神经网络算法的单特征用电负荷预测[保姆级手把手教学]

系列文章目录 深度学习原理-----线性回归梯度下降法 深度学习原理-----逻辑回归算法 深度学习原理-----全连接神经网络 深度学习原理-----卷积神经网络 深度学习原理-----循环神经网络&#xff08;RNN、LSTM&#xff09; 时间序列预测-----基于BP、LSTM、CNN-LSTM神经网络…

安卓开发Android studio学习笔记14:用户注册登录(案例演示)

Android studio学习笔记第一步&#xff1a;配置activity_information.xml第二步&#xff1a;配置activity_registration.xml第三步&#xff1a;配置strings.xml第四步&#xff1a;配置InformationActivity第五步&#xff1a;配置RegistrationActivity第六步&#xff1a;运行结果…

二叉搜索树

文章目录二叉搜索树1. 概念2. 模拟实现二叉搜索树2.1 准备工作 创建类2.2 查找方法2.3 插入方法2.4 删除方法3. 性能分析二叉搜索树 前言 &#xff1a; 1. 概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树: 若它的左子树不…