目录
- 1. 需求
- 2. 思路
- 3. 实现
- 3.1. 确保服务器上安装视频汇聚平台(iSecure Center)
- 3.2. 查看API网关是否安装成功
- 3.3. 分配身份认证信息AK/SK
- 3.4. 利用认证信息,python demo开发
1. 需求
海康视频汇聚平台(综合安防管理平台(iSecure Center)V2.1.0)的openAPI的demo均为c++/java代码,官方没有python代码。
需通过汇聚平台获得所有摄像头的设备编号,利用设备编号获得摄像头IP地址和rtsp地址。
2. 思路
根据海康开放平台官方文档《资源中心>综合安防管理平台(iSecure Center)>开发前准备》,通过平台获得摄像头信息,需遵循以下步骤:
- 确保服务器上安装视频汇聚平台(iSecure Center);
- 查看API网关是否安装成功;
- 分配身份认证信息AK/SK;
- 利用认证信息,撰写python demo
3. 实现
3.1. 确保服务器上安装视频汇聚平台(iSecure Center)
对接前需要先部署海康威视综合安防管理平台 iSecure Center
V1.5.100或更高版本的产品。
产品的安装过程请参考产品的《iSecure Center
综合安防管理平台 安装部署指南》,该文档可以在产品安装光盘中获取。
3.2. 查看API网关是否安装成功
-
登入运管中心,如下
-
进入“状态监控”页面,如下
-
左侧菜单搜索API网关,如下
-
在API网关的监控详情选项卡中查看服务的运行状态,如下
-
运行状态显示“正在运行”表示API网关安装成功,如下
3.3. 分配身份认证信息AK/SK
开发前需要先获取对接的身份认证信息,AK/SK——平台会通过AK/SK认证方式来验证请求发送者的身份。
获取接口调用权限,资源操作权限——平台会判断某个请求者是否有某个接口的调用权限,以及资源的操作权限。
在平台的OpenAPI管理中心新增合作方来生成AK/SK并分配权限。
- 进入运行管理中心,点击“状态监控”选项卡,搜索API网关,点击“API管理”,如下
2. 进入OpenAPI管理中心,点击“合作方管理”菜单,如下
3. 点击“创建合作方”,如下
4. 输入合作方名称和描述,如下
5. 配置合作方的参数,配置userId的值,userId的值对应平台内部的一个用户;如果使用admin的话,默认所有资源权限都有,如下
6. 配置合作方的参数,配置domainId的值,domainId对应对接平台所在的网域,如下
7. 配置合作方的参数,配置tagId的值,为保证接口兼容性,tagId的值请配为frs,如下
8. 配置完上述信息后,点击创建按钮,如下
9. 创建完新的合作方,进入合作方管理列表,选择其中一个合作方点击授权,如下
10. 对接口进行授权,如下
11. 接口授权完成后,到合作方列表,点击合作方进入详情页,如下
12. 在合作方详情页面获取AK/SK,合作方Key就是AK,合作方Secret就是SK,如下
3.4. 利用认证信息,python demo开发
- 获得artemis网关服务器ip端口 ,即API网关的ip地址,端口默认为443,如下
- 获得秘钥appkey和秘钥appSecret ,如下
- 获得设备编号的openAPI获取方式,需确定请求方式和Body内容,如下
请求:/api/irds/v2/deviceResource/resources
Body示例(这里为摄像头camera):
{
"pageNo": 1,
"pageSize": 100,
"resourceType": "camera"
}
- 获得指定设备编号的回放url的openAPI获取方式,需确定请求方式和Body内容,如下
请求:/api/video/v2/cameras/playbackURLs
Body示例:
{
"cameraIndexCode": "90ad77d8057c43dab140b77361606927",
"recordLocation": "0",
"protocol": "rtsp",
"transmode": 0,
"beginTime": "2017-06-15T00:00:00.000+08:00",
"endTime": "2017-06-18T00:00:00.000+08:00",
"uuid": "",
"expand": "streamform=rtp",
"streamform": "ps",
"lockType": 0
}
- python demo
主要参考博客《海康综合安防管理平台python版sdk》和《利用Python调用海康威视综合管理平台openAPI接口》
# coding=gb2312
import os
import base64
import json
import time
import uuid
import hmac # hex-based message authentication code 哈希消息认证码
import hashlib # 提供了很多加密的算法
import requests
import cv2
class OpenApiISC: # 调用视频汇聚平台的openAPI
def __init__(self, gatewayIp, appKey, appSecret):
self.gatewayIp = gatewayIp # API网关IP
self.appKey = appKey # 合作方key
self.appSecret = appSecret # 合作方秘钥
self.base_url = "https://{}:443".format(self.gatewayIp) # api网关ip和端口号
self.http_method = "POST"
self.ARTEMIS_PATH = "/artemis"
self.cameraDataDict = {} # 最后输出的摄像头数据字典
def init(self, api_get_address_url): # 每次执行操作,都要初始化
appKey = self.appKey
appSecret = self.appSecret
x_ca_nonce = str(uuid.uuid4())
x_ca_timestamp = str(int(round(time.time()) * 1000))
signature = self.generateSignature( appKey, appSecret, x_ca_nonce, x_ca_timestamp, api_get_address_url)
self.headers = {
"Accept": "*/*",
"Content-Type": "application/json",
"x-ca-key": appKey, # appKey,即 AK
"x-ca-signature-headers": "x-ca-key,x-ca-nonce,x-ca-timestamp",
"x-ca-signature": signature, # 需要计算得到的签名,此处通过后台得到
"x-ca-timestamp": x_ca_timestamp, # 时间戳
"x-ca-nonce": x_ca_nonce # UUID,结合时间戳防重复
}
def sign(self, key, value):
temp = hmac.new(key.encode(), value.encode(), digestmod=hashlib.sha256)
return base64.b64encode(temp.digest()).decode()
def generateSignature(self, appKey, appSecret, x_ca_nonce, x_ca_timestamp, api_get_address_url): # 生成签名值
# sign_str 的拼接很关键,不然得不到正确的签名
sign_str = "POST\n*/*\napplication/json" + "\nx-ca-key:" + appKey + "\nx-ca-nonce:" + \
x_ca_nonce + "\nx-ca-timestamp:" + x_ca_timestamp + "\n" + \
api_get_address_url
signature = self.sign(appSecret, sign_str)
print("[INFO] 获取到的签名值为:", signature)
return signature
def getDeviceIndexCodeListOperation(self): # 获取设备列表,可获得设备编码和名称
api_get_address_url = "{}{}".format(self.ARTEMIS_PATH, "/api/irds/v2/deviceResource/resources") #
self.init(api_get_address_url) # 获得设备编码的操作前的初始化
body = {
"pageNo": 1,
"pageSize": 100,
"resourceType": "camera"
}
url = self.base_url + api_get_address_url
results = requests.post(url, data=json.dumps(body), headers=self.headers, verify=False)
print(results.json())
for i in range(len(results.json()['data']['list'])): # 遍历所有设备
data = results.json()['data']['list'][i]
if data['resourceType'] != 'camera': continue # 仅考虑摄像头
cameradata = {} # 单个摄像头数据
cameradata.setdefault('indexCode', data['indexCode']) # 摄像头的设备编码
cameradata.setdefault('name', data['name']) # 摄像头的名称
cameradata.setdefault('regionIndexCode', data['regionIndexCode']) # 摄像头所属区域的编码
cameradata.setdefault('regionName', data['regionName']) # 摄像头所属区域的名称
self.cameraDataDict.setdefault(i, cameradata)
print("获取设备编码和名称:", self.cameraDataDict)
def getSingleRealtimeURL(self, indexCode): # 根据设备编码indexCode,获取摄像头的实时播放url
api_get_address_url = "{}{}".format(self.ARTEMIS_PATH, "/api/video/v2/cameras/previewURLs")
self.init(api_get_address_url) # 获得设备实时播放url的操作前的初始化
body = {
"cameraIndexCode": indexCode,
"streamType": 0,
"protocol": "rtsp",
}
url = self.base_url + api_get_address_url
results = requests.post(url, data=json.dumps(body), headers=self.headers, verify=False)
#print("根据设备编码{},获得实时播放url:{}".format(indexCode, results.json()['data']['url']))
return results.json()['data']['url']
def getSinglePlaybackURL(self, indexCode, beginTime, endTime, name): # 根据设备编码indexCode, 获取摄像头给定时间段内的回放播放url
if 'NVR' not in name: return '' # 只有nvr设备才有回放
api_get_address_url = "{}{}".format(self.ARTEMIS_PATH, "/api/video/v2/cameras/playbackURLs")
self.init(api_get_address_url) # 获得设备实时播放url的操作前的初始化
body = {
"cameraIndexCode": indexCode,
"recordLocation": "1",
"protocol": "rtsp",
"transmode": 0,
"beginTime": beginTime,
"endTime": endTime,
"expand": "streamform=rtp"
}
url = self.base_url + api_get_address_url
results = requests.post(url, data=json.dumps(body), headers=self.headers, verify=False)
print("根据设备编码{}和时间段,获得回放播放url:{}".format(indexCode, results.json()['data']))
return results.json()['data']['url']
def getCameraDataDictOperation(self, beginTime, endTime): # 根据设备编码,获得摄像头的实时播放url
for key, camerainfo in self.cameraDataDict.items(): # 遍历摄像头
indexCode = camerainfo['indexCode'] # 设备编码
name = camerainfo['name'] # 设备名称
url = self.getSingleRealtimeURL(indexCode) # 实时播放url
playback_url = self.getSinglePlaybackURL(indexCode, beginTime, endTime, name) # 回放信息
self.cameraDataDict[key].setdefault('url', url) # 添加url数据
self.cameraDataDict[key].setdefault('playback', {'beginTime': beginTime, 'endTime': endTime, 'url': playback_url}) # 添加回放url
print("根据设备编码列表,获得摄像头数据:", self.cameraDataDict)
if __name__ == '__main__':
gatewayIp = '' # 请填写自己的
appKey = "" # 请填写自己的
appSecret = "" # 请填写自己的
openAPIs = OpenApiISC(gatewayIp, appKey, appSecret)
openAPIs.getDeviceIndexCodeListOperation()
beginTime = "2023-05-23T13:44:04.000+08:00"
endTime = "2023-05-23T15:44:04.000+08:00"
openAPIs.getCameraDataDictOperation(beginTime, endTime)