代码链接
设备发现
上一篇文章说过,设备的发现有两种情况,主动和被动,下面我们来用java实现这两种模式
主动发现
构建一个UDP请求发送到239.255.255.250:1900获取设备信息,UDP包的内容和http一样
等待响应,当接收到一个完整的响应包后,将数据包封装成设备对象SSDPRespBO
private void receiveSSDP(DatagramSocket udpSocket, Consumer<SSDPRespBO> consumer) throws IOException {
long time;
int resIndex = 0;
byte[] res = new byte[1024];
byte[] data = new byte[1024];
long endTime = System.currentTimeMillis() + timeout;
//一次从socket内核缓冲区复制到进程缓冲的最大字节数
DatagramPacket dp = new DatagramPacket(data, data.length);
while ((time = endTime - System.currentTimeMillis()) > 0) {
udpSocket.setSoTimeout((int) time);
udpSocket.receive(dp);
//本次接收到的数据的实际长度(<=DatagramPacket第二个构造参数)从索引0开始覆盖data数组
int length = dp.getLength();
for (int i = 0; i < length; i++) {
if (resIndex == res.length) {
//如果res数组已经满了需要进行扩容
res = ArrayExtraUtil.byteExpansion(res, 1024);
}
res[resIndex++] = data[i];
if (NetUtil.headerEnd(res, resIndex)) {
String str = new String(res, 0, resIndex);
consumer.accept(buildSSDPResp(str));
//一个响应结束后重置数组以接收其他设备服务的响应
resIndex = 0;
res = new byte[1024];
}
}
//设置下次读取的最大长度,否则会使用上次接收到的字节长度,receive会设置length属性
dp.setLength(data.length);
}
}
private SSDPRespBO buildSSDPResp(String resp) {
String[] respArray = resp.split("\r\n");
if (!respArray[0].contains(" 200 OK")) {
log.error("响应失败:{}", resp);
return null;
}
SSDPRespBO ssdpRespBO = new SSDPRespBO();
buildSSDPResp(Arrays.stream(respArray), ssdpRespBO);
return ssdpRespBO;
}
然后将SSDPRespBO提交给线程池去获取设备描述文档
根据设备描述文档地址去请求文档,这个地址是http地址,直接通过get请求就可以了
private void setDeviceDesc(SSDPRespBO ssdpRespBO, List<DeviceDescBO> list) {
if (ssdpRespBO != null) {
String location = ssdpRespBO.getLocation();
Result<DeviceDescBO> result = deviceService.getDeviceDesc(location);
if (result.isSuccess()) {
DeviceDescBO deviceDescBO = result.getData();
deviceDescBO.setUrl(location);
list.add(deviceDescBO);
}
}
}
@Override
public Result<DeviceDescBO> getDeviceDesc(String desUrl) {
HttpRespBO httpRespBO = httpGet(desUrl);
return Optional.ofNullable(httpRespBO).map(this::buildDeviceDesc)
.map(Result::success).orElseGet(() -> Result.fail(ResultEnum.GET_DEVICE_DESC_FAIL));
}
然后将http返回的内容组装成设备描述对象DeviceDescBO
//构建设备的描述和其服务列表信息
private DeviceDescBO buildDeviceDesc(HttpRespBO httpRespBO) {
try {
if (!httpRespBO.ok()) {
log.error("设备描述响应错误:{}", JSON.toJSONString(httpRespBO));
return null;
}
String xml = httpRespBO.getUTF8Body();
DeviceDescBO deviceDescBO = new DeviceDescBO();
deviceDescBO.setServiceList(new ArrayList<>());
Document doc = DocumentHelper.parseText(xml);
Element rootElt = doc.getRootElement();
Element recordEle = rootElt.element("device");
Element serviceList = recordEle.element("serviceList");
Iterator<?> iterator = serviceList.elementIterator("service");
deviceDescBO.setDeviceType(recordEle.elementTextTrim("deviceType"));
deviceDescBO.setFriendlyName(recordEle.elementTextTrim("friendlyName"));
while (iterator.hasNext()) {
ServiceBO serviceVO = new ServiceBO();
deviceDescBO.getServiceList().add(serviceVO);
Element serviceElement = (Element) iterator.next();
serviceVO.setScpDUrl(serviceElement.elementTextTrim("SCPDURL"));
serviceVO.setServiceId(serviceElement.elementTextTrim("serviceId"));
serviceVO.setControlUrl(serviceElement.elementTextTrim("controlURL"));
serviceVO.setServiceType(serviceElement.elementTextTrim("serviceType"));
serviceVO.setEventSubUrl(serviceElement.elementTextTrim("eventSubURL"));
}
return deviceDescBO;
} catch (DocumentException e) {
log.error("设备描述响应解析失败:{}", JSON.toJSONString(httpRespBO), e);
return null;
}
}
并将其加入设备描述对象列表中,返回给调用方
整个发现过程持续5秒,在这5秒内持续阻塞等待组播返回符合条件的设备。这个时间可以在application.yml中指定ssdp.timeout
被动发现
构建一个服务加入组播,监听服务上线和下线事件,设备上线或下线,会发送UDP到组播中,所有加入到组播的服务会收到这个UDP请求,这个请求的内容和上面主动发现的响应内容差不多,所以我们接受请求数据的方法和主动发现用的是同一个都是receiveSSDP
private void runNotify() {
log.info("ssdp notify监听开始");
//构建一个服务加入组播,监听服务上线和下线事件
try (MulticastSocket socket = new MulticastSocket(1900)) {
socket.joinGroup(InetAddress.getByName("239.255.255.250"));
while (!Thread.currentThread().isInterrupted()) {
receiveSSDP(socket, this::runNotify);
}
} catch (Exception e) {
log.error("ssdp notify异常", e);
} finally {
log.info("ssdp notify监听结束");
}
}
//notifyDeviceList只有一个线程操作,没有并发问题
private void runNotify(SSDPRespBO ssdpRespBO) {
if (ssdpRespBO != null) {
String nts = ssdpRespBO.getNts();
String url = ssdpRespBO.getLocation();
SSDPStEnum nt = SSDPStEnum.getEnumByType(ssdpRespBO.getNt());
if (nts.equals("ssdp:alive") && notifyServiceTypes.contains(nt) &&
notifyDeviceList.stream().map(DeviceDescBO::getUrl).noneMatch(url::equals)) {
setDeviceDesc(ssdpRespBO, notifyDeviceList);
}
if (nts.equals("ssdp:byebye")) {
notifyDeviceList.removeIf(deviceDescBO -> deviceDescBO.getUrl().equals(url));
}
}
}
接收完一个完整的包后,如果是设备上线,则和主动发现一样执行setDeviceDesc方法,加入设备描述对象列表中
如果是设备下线,将设备从设备描述对象列表中移除
设备控制
其实这个设备控制,只需要向控制地址发送soap请求即可,在homer-service/src/main/resources/upnp/action/目录下保存了xml的模版,发送soap请求的时候只需要将模版中的参数占位符替换成实际的值即可,在UPNPActionEnum中设置了模版的地址和获取模版内容的方法
@Getter
@AllArgsConstructor
public enum UPNPActionEnum {
PLAY("upnp/action/play.xml", "播放资源"),
SET_URI("upnp/action/set_uri.xml", "设置播放资源url"),
URI_METADATA("upnp/action/uri_metadata.xml", "播放资源元数据");
private String path;
private String desc;
public String getXmlText() {
return fileTextCache.get(path);
}
}
对模版内容做了一个本地缓存
@Slf4j
public class ResourceUtil {
private ResourceUtil() {
throw new IllegalStateException("Utility class");
}
public static final LoadingCache<String, String> fileTextCache = Caffeine.newBuilder()
.maximumSize(10).expireAfterAccess(100, TimeUnit.MINUTES).build(ResourceUtil::getFileText);
public static String getFileText(String path) {
int len;
ClassPathResource classPathResource = new ClassPathResource(path);
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
InputStream inputStream = classPathResource.getInputStream()) {
byte[] bytes = new byte[inputStream.available()];
while ((len = inputStream.read(bytes)) > -1) {
bos.write(bytes, 0, len);
}
return new String(bos.toByteArray(), StandardCharsets.UTF_8);
} catch (Exception e) {
log.error("获取{}文件失败", path, e);
return null;
}
}
}
设置播放资源
- 设置控制动作(请求头中的SOAPACTION)
- 获取xml模版
- 替换xml中的占位符
@Override
public Result<Void> setResourceUrl(ActionBO actionBO) {
String progress = actionBO.getProgress();
String resourceUrl = actionBO.getResourceUrl();
String resourceTitle = actionBO.getResourceTitle();
String metadata = UPNPActionEnum.URI_METADATA.getXmlText();
metadata = String.format(metadata, resourceTitle, new Date(), resourceUrl);
String xml = UPNPActionEnum.SET_URI.getXmlText();
xml = String.format(xml, progress, resourceUrl, StringEscapeUtils.escapeXml10(metadata));
return executeAction(actionBO, xml);
}
private Result<Void> executeAction(ActionBO actionBO, String xml) {
String actionUrl = actionBO.getActionUrl();
Map<String, String> headerMap = new HashMap<>();
headerMap.put("SOAPACTION", actionBO.getSoapAction());
HttpRespBO httpRespBO = httpPostXml(actionUrl, xml, headerMap);
return Optional.ofNullable(httpRespBO).filter(HttpRespBO::success).map(r -> Result.empty()).orElseGet(() -> {
log.error("执行动作失败,{},{},{}", actionUrl, xml, httpRespBO);
return Result.fail("执行动作失败");
});
}
播放资源
和上面的流程差不多,只不过xml和soapAction(也就是请求头中的SOAPACTION)不一样。有的投屏设备不需要这一步,只需要设置完播放资源就能播放,有的必须有这一步才能播放,为了兼容不同类型的设备,需要在设置完播放资源后再执行一次播放动作。
public Result<Void> playResource(ActionBO actionBO) {
String speed = actionBO.getSpeed();
String progress = actionBO.getProgress();
String xml = UPNPActionEnum.PLAY.getXmlText();
xml = String.format(xml, progress, speed);
return executeAction(actionBO, xml);
}
完整的投屏流程
- 搜索设备,一般用主动搜索就行
- 获取视频名和视频的本地播放地址
- 设置播放资源
- 播放资源
public Result<Void> playVideo(int deviceId, String videoId) {
List<DeviceDescBO> deviceDescList = context.getDeviceDescList();
Assert.isTrue(deviceDescList != null, "未搜索投屏设备");
Assert.isTrue(deviceId < deviceDescList.size(), "设备id错误");
DeviceDescBO deviceDescBO = deviceDescList.get(deviceId);
List<ServiceBO> serviceList = deviceDescBO.getServiceList();
Assert.isNotEmpty(serviceList, "设备服务不存在");
Optional<ServiceBO> serviceOptional = serviceList.stream().filter(s ->
SSDPStEnum.AV_TRANSPORT_V1.getType().equals(s.getServiceType())).findFirst();
Assert.isTrue(serviceOptional.isPresent(), "投屏服务不存在");
ServiceBO serviceBO = serviceOptional.get();
String controlUrl = serviceBO.getControlUrl();
controlUrl = controlUrl.startsWith("/") ? controlUrl.substring(1) : controlUrl;
Result<byte[]> infoResult = videoService.getFileByte(videoId + "/info.txt");
Assert.isTrue(infoResult.isSuccess(), infoResult.getMessage());
String videoInfo = new String(infoResult.getData(), StandardCharsets.UTF_8);
Matcher videoNameMatcher = videoNamePat.matcher(videoInfo);
String videoName = Optional.of(videoNameMatcher).filter(Matcher::find).map(m -> m.group(1)).orElse(null);
ActionBO urlAction = new ActionBO();
urlAction.setProgress("0");
urlAction.setResourceTitle(videoName);
urlAction.setResourceUrl(context.getLocalHost() + "/video/m3u8/" + videoId);
urlAction.setSoapAction("\"" + serviceBO.getServiceType() + "#SetAVTransportURI\"");
urlAction.setActionUrl(NetUtil.resolveRootUrl(deviceDescBO.getUrl()) + "/" + controlUrl);
Result<Void> result = setResourceUrl(urlAction);
Assert.isTrue(result.isSuccess(), result.getCode(), result.getMessage());
urlAction.setSoapAction("\"" + serviceBO.getServiceType() + "#Play\"");
urlAction.setSpeed("1");
urlAction.setProgress("0");
return playResource(urlAction);
}