物联网视频服务(LinkVisual)支持视频流上云、存储、转发、告警事件等视频基础能力,提供丰富的视频算法以及云边协同(算法云端训练、云端下发、边缘计算推理)服务。旨在帮助视频设备厂商、方案商与服务提供商,快速将存量或者新增的视频设备上云。
文章目录
- 前言
- 一、购买企业版实例
- 二、使用步骤
- 1.新增设备
- 2.服务器端开发
- 主动触发IPC设备拍摄图片并上传至云端
- 服务器端订阅AMQP
- 总结
前言
提示:主要记录物联网智能视频服务的接入过程:
后端服务主要是微服务Springcloud。
云平台主要是阿里云物联网智能视频服务-企业版实例
主要业务场景是,户外版4G摄像头,在无人观看是不产生上行流量,在有人观看是再进行推流查看直播画面,以及支持设备主动抓图。
提示:以下是本篇文章正文内容,下面案例可供参考
一、购买企业版实例
首先在物联网平台-购买企业版实例。下面的视频服务一定要开启
二、使用步骤
1.新增设备
主要在购买的实例上进行新增产品-新增设备,以及设备端烧录,设备端开发一般都摄像头厂商进行对接,此处不在进行记录。
2.服务器端开发
主要记录服务器端与阿里云平台的对接
主动触发IPC设备拍摄图片并上传至云端
官网文档api 可以参考官方对接文档,其他相关接口和此接口对接方式相同,不在粘贴代码了
/**
* 调用该接口查询IPC设备获取的图片信息。。
* @param iotInstanceId 实例id
* @param productKey 产品key
* @param deviceName 设备名称
* @param captureId 图片id
* @return
* @throws Exception
*/
public QueryDevicePictureFileResponseBody.QueryDevicePictureFileResponseBodyData queryDevicePictureFileWithOptions(String iotInstanceId, String productKey, String deviceName, String captureId) throws Exception {
// 调用该接口主动触发IPC设备拍摄图片并上传至云端
Client client = getClient();
QueryDevicePictureFileRequest queryLiveStreamingRequest = new QueryDevicePictureFileRequest();
queryLiveStreamingRequest.setDeviceName(deviceName);
queryLiveStreamingRequest.setProductKey(productKey);
queryLiveStreamingRequest.setIotInstanceId(iotInstanceId);
queryLiveStreamingRequest.setCaptureId(captureId);
com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
try {
// 复制代码运行请自行打印 API 的返回值
log.info("调用该接口查询IPC设备获取的图片信息,入参:{}",JSON.toJSONString(queryLiveStreamingRequest));
QueryDevicePictureFileResponse queryDevicePictureFileResponse = client.queryDevicePictureFileWithOptions(queryLiveStreamingRequest, runtime);
log.info("调用该接口查询IPC设备获取的图片信息,结果:{}",JSON.toJSONString(queryDevicePictureFileResponse));
// {"body":{"code":"200","data":{"iotId":"Z50Qxs5BSuUoxO06mKRUi3rb00","picCreateTime":1672135564877,"picId":"WGJmWkYxU2dNaE5HSmpEWDJiQjY3T3ZnT2dNdl90LTlSVXhBVjU5SjJ5WS9lZmVmYzc0YmU2OTY0ZWY2YmVjNzg1MWM5MTA5YmY0Y18xNjcyMTM1NTY0ODc3","picUrl":"https://link-vision-picture-sh.oss-cn-shanghai.aliyuncs.com/XbfZF1SgMhNGJjDX2bB67OvgOgMv_t-9RUxAV59J2yY/efefc74be6964ef6bec7851c9109bf4c?Expires=1672139359&OSSAccessKeyId=LTAILduaCDAC561K&Signature=z3jXpII0Hjijbf8%2FGaJv%2FterhAw%3D","thumbUrl":"https://link-vision-picture-sh.oss-cn-shanghai.aliyuncs.com/XbfZF1SgMhNGJjDX2bB67OvgOgMv_t-9RUxAV59J2yY/efefc74be6964ef6bec7851c9109bf4c?Expires=1672139359&OSSAccessKeyId=LTAILduaCDAC561K&Signature=yZw6elXowc%2F2R3GQ753H8OR3E44%3D&x-oss-process=image%2Fauto-orient%2C1%2Fresize%2Cm_lfit%2Cw_400%2Climit_0%2Fquality%2Cq_90"},"requestId":"DCEAE36F-FBD2-5187-A035-317AFC1380A5","success":true},"headers":{"access-control-allow-origin":"*","date":"Tue, 27 Dec 2022 10:09:19 GMT","content-length":"866","x-acs-request-id":"DCEAE36F-FBD2-5187-A035-317AFC1380A5","connection":"keep-alive","content-type":"application/json;charset=utf-8","x-acs-trace-id":"853464dc94d2787fef2e9b5d5fc21556"},"statusCode":200}
QueryDevicePictureFileResponseBody body = queryDevicePictureFileResponse.getBody();
if(body.getSuccess()){
return body.getData();
}else {
log.error("调用该接口查询IPC设备获取的图片信息 有误,{}", JSON.toJSONString(queryDevicePictureFileResponse));
throw new ServiceWarnException(body.getErrorMessage() == null ? "查询IPC设备获取的图片有误,请联系管理员。" : body.getErrorMessage());
}
} catch (TeaException error) {
// 如有需要,请打印 error
log.error("调用该接口查询IPC设备获取的图片信息,{}", error.message);
} catch (Exception _error) {
TeaException error = new TeaException(_error.getMessage(), _error);
// 如有需要,请打印 error
log.error("调用该接口查询IPC设备获取的图片信息,{}", error.message);
}
return null;
}
private Client getClient() throws Exception {
Config config = new Config().setAccessKeyId(aliiotConfig.getAccessKeyId()).setAccessKeySecret(aliiotConfig.getAccessKeySecret());
config.endpoint = aliiotConfig.getEndpoint();
return new Client(config);
}
主要引入的包
import com.aliyun.iot20180120.models.BatchGetDeviceStateRequest;
import com.aliyun.iot20180120.models.BatchGetDeviceStateResponse;
import com.aliyun.iot20180120.models.BatchGetDeviceStateResponseBody;
import com.aliyun.linkvisual20180120.Client;
import com.aliyun.linkvisual20180120.models.*;
import com.aliyun.tea.TeaException;
import com.aliyun.teaopenapi.models.Config;
服务器端订阅AMQP
官方文档
官方文档介绍的AmqpClient.java,主要是main方法运行,肯定不适用咱们微服务启动的,所以还需要单独调整或者修改方式,接入自己的代码平台中。
改版后:
AmqpClient.java
package com.dindo.monitoring.mq;
import cn.hutool.core.util.IdUtil;
import com.dindo.monitoring.config.AliiotConfig;
import com.dindo.monitoring.mq.aliiot.JmsConnectionImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
/**
* @author GMaya
* @create 2022/12/28 17:26
* @Description 类描述: TODO 当正式环境使用时, 测试环境不在进行消息订阅,避免和正式环境消息订阅产生冲突
*/
@Slf4j
@Component
public class AmqpClient {
@Resource
private AliiotConfig aliiotConfig;
// 告警消息
private static String consumerGroupId_alarm = "xxxxxxxxx";
// 设备运行状态消息
private static String consumerGroupId_run = "xxxxxxxxx";
// 设备基本信息消费组
private static String consumerGroupId_info = "xxxxxxxxxxxx";
private static String clientId = IdUtil.simpleUUID();
private static int connectionCount = 1;
@Resource
@Qualifier("MessageListenerImpl")
private MessageListener messageListenerImpl;
@Resource
private JmsConnectionImpl jmsConnectionImpl;
@PostConstruct
public void init() throws Exception {
List<Connection> connections = new ArrayList<>();
List<String> consumerGroupIdList = new ArrayList<>();
consumerGroupIdList.add(consumerGroupId_alarm);
consumerGroupIdList.add(consumerGroupId_run);
consumerGroupIdList.add(consumerGroupId_info);
for (String consumerGroupId : consumerGroupIdList) {
//参数说明,请参见AMQP客户端接入说明文档。
for (int i = 0; i < connectionCount; i++) {
long timeStamp = System.currentTimeMillis();
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
String signMethod = "hmacsha1";
//userName组装方法,请参见AMQP客户端接入说明文档。
String userName = clientId + "-" + i + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + aliiotConfig.getAccessKeyId()
+ ",iotInstanceId=" + aliiotConfig.getIotInstanceId()
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String signContent = "authId=" + aliiotConfig.getAccessKeyId() + "×tamp=" + timeStamp;
String password = doSign(signContent, aliiotConfig.getAccessKeySecret(), signMethod);
String connectionUrl = "failover:(amqps://" + aliiotConfig.getAmqpHost() + ":5671?amqp.idleTimeout=80000)"
+ "?failover.reconnectDelay=30";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Destination queue = (Destination) context.lookup("QUEUE");
// 创建连接。
Connection connection = cf.createConnection(userName, password);
connections.add(connection);
((JmsConnection) connection).addConnectionListener(jmsConnectionImpl);
// 创建会话。
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// 创建Receiver连接。
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListenerImpl);
}
}
}
/**
* 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
*/
private String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
}
}
JmsConnectionImpl.java
package com.dindo.monitoring.mq.aliiot;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.springframework.stereotype.Component;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.net.URI;
/**
* @author GMaya
* @create 2022/12/28 18:33
* @Description 类描述:
*/
@Slf4j
@Component
public class JmsConnectionImpl implements JmsConnectionListener {
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
log.info("连接成功建立。, remoteUri:{}", remoteURI);
}
/**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error) {
log.error("尝试过最大重试次数之后,最终连接失败, {}", error.getMessage());
}
/**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
log.info("连接中断。, remoteUri:{}", remoteURI);
}
/**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
log.info("连接中断后又自动重连上, remoteUri:{}", remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
@Override
public void onSessionClosed(Session session, Throwable cause) {}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {}
}
MessageListenerImpl.java
package com.dindo.monitoring.mq.aliiot;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.jms.Message;
import javax.jms.MessageListener;
/**
* @author GMaya
* @create 2022/12/28 18:30
* @Description 类描述: 消息消费类,订阅的消费组消息都会到此处
*/
@Component
@Slf4j
public class MessageListenerImpl implements MessageListener {
@Resource
private AliiotMessage aliiotMessage;
@Override
public void onMessage(Message message) {
try {
//1.收到消息之后一定要ACK。
// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
// message.acknowledge();
//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
// processMessage(message);
aliiotMessage.processMessage(message);
} catch (Exception e) {
log.error("阿里物联网消息推送处理失败", e);
}
}
private static void processMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
log.info("获取的阿里iot推送的相关消息 message"
+ ",\n topic = " + topic
+ ",\n messageId = " + messageId
+ ",\n content = " + content);
} catch (Exception e) {
log.error("processMessage occurs error ", e);
}
}
}
具体的消费处理类
AliiotMessage.java
package com.dindo.monitoring.mq.aliiot;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.dindo.core.tool.utils.Func;
import com.dindo.monitoring.config.AliiotConfig;
import com.dindo.monitoring.constant.MonitorConstant;
import com.dindo.monitoring.entity.MonitorDeviceAliiotEntity;
import com.dindo.monitoring.mq.aliiot.entity.BasicInfoBean;
import com.dindo.monitoring.service.entity.IEntityMonitorDeviceAliiotService;
import com.dindo.monitoring.utils.AliYunInternetLinkVisualUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import java.util.List;
/**
* @author GMaya
* @create 2022/12/29 10:02
* @Description 类描述: 阿里iot物联网视频服务消息处理器
*
*/
@Service
@Slf4j
public class AliiotMessage {
/**
* 运行状态topic
*/
private static String runStats = "/as/mqtt/status";
/**
* 设备信息topic
*/
private static String devInfo = "/user/property/post";
/**
* 告警topic
*/
private static String eventIntell = "/thing/event/IntelligentAlarm/post";
@Resource
private IEntityMonitorDeviceAliiotService iEntityMonitorDeviceAliiotService;
@Resource
private AliiotConfig aliiotConfig;
@Resource
private AliYunInternetLinkVisualUtil aliYunInternetLinkVisualUtil;
/**
* 异步处理阿里云平台推送的相关消息
* @param message 消息
*/
@Async
public void processMessage(Message message) throws JMSException {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
log.info("获取的阿里iot推送的相关消息,topic={},messageId={},content={}",topic,messageId,content);
if(topic.contains(runStats)){
// 设备运行状态变更,在线或者离线
log.info("获取的阿里iot推送的相关消息,设备运行状态事件,topic={},messageId={},content={}",topic,messageId,content);
}
if(topic.contains(devInfo)){
// 设备基本信息
log.info("获取的阿里iot推送的相关消息,设备基本信息,topic={},messageId={},content={}",topic,messageId,content);
this.devInfo(content);
}
if(topic.contains(eventIntell)){
// 告警事件
log.info("获取的阿里iot推送的相关消息,告警事件,topic={},messageId={},content={}",topic,messageId,content);
}
}
public void devInfo(String content){
BasicInfoBean basicInfoBean = JSON.parseObject(content, BasicInfoBean.class);
List<MonitorDeviceAliiotEntity> list = iEntityMonitorDeviceAliiotService.list(Wrappers.<MonitorDeviceAliiotEntity>query().lambda()
.eq(MonitorDeviceAliiotEntity::getDeviceName, basicInfoBean.getDeviceName()));
if(Func.isEmpty(list)){
// 如果数据库不存在此设备,则进行新增
MonitorDeviceAliiotEntity deviceAliiot = new MonitorDeviceAliiotEntity();
deviceAliiot.setDeviceName(basicInfoBean.getDeviceName());
deviceAliiot.setProductKey(basicInfoBean.getProductKey());
deviceAliiot.setSerialNo(basicInfoBean.getSerialNo());
deviceAliiot.setIotInstanceId(aliiotConfig.getIotInstanceId());
deviceAliiot.setDeviceFactory(MonitorConstant.ALIYUN_INTERNET);
log.info("阿里物联网设备推送新增:{}", JSON.toJSONString(deviceAliiot));
iEntityMonitorDeviceAliiotService.save(deviceAliiot);
try {
// 调用该接口设置IPC设备的云存储图片生命周期。
aliYunInternetLinkVisualUtil.setDevicePictureLifeCycleWithOptions(aliiotConfig.getIotInstanceId(),basicInfoBean.getProductKey(),basicInfoBean.getDeviceName());
// 如果后续有需要设置录像,此处设置IPC设备的录像生命周期
// aliYunInternetLinkVisualUtil.setDeviceRecordLifeCycleRequest(aliiotConfig.getIotInstanceId(),basicInfoBean.getProductKey(),basicInfoBean.getDeviceName());
} catch (Exception e) {
log.info("设置过期时间失败:{}", JSON.toJSONString(basicInfoBean));
}
}
}
}
总结
综上只是服务器端对接云端SDK。部分功能的实现, 具体业务还需要根据自己平台的业务进行处理和变更。