go语言作为近十年来优秀的现代开发语言的代表,由于继承了c语言的简洁和很多现代语言的表达方式,在广泛的应用场景中得到众多爱好者的喜爱,如何将go和c、c++进行联合开发,拓展整个开发生态,不用重复造轮子,掌握cgo可以让你得心应手的在c和go之间传递信息,打通任督二脉。
go在流媒体传输领域也有很强大的生态和优秀的轮子,比起传统的ffmpeg这种大而全的库,可以选择性的用一些小巧强悍的go语言写的库来替代ffmpeg,比如rtsp拉流,笔者用ffmpeg在android下写了一个推拉流的播放器,但是由于ffmpeg自成体系,在灵活定制方面有一些局限性,于是尝试用go rtsp来代替ffmpeg的rtsp拉流。
首先我们需要利用cgo的交叉编译特性封装一个可以被c/c++调用的动态库,其中用到了cgo 进行设置和回调函数,并传递了类的指针,从而实现了c++ class的特性,达到了面向对象多路连接的设计目标。以下是go写的动态库源码,由于go的包管理做的特别棒,你可以用很少的代码实现一个多路拉流的应用。感觉比c++爽多了。
package main
import (
"fmt"
"sync"
"time"
"unsafe"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/rtsp"
)
/*
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#cgo CFLAGS: -I.
void OnSendPacket(void * callclass,unsigned char *data, int len,int mediatype);
typedef void (*CallbackFunc)(unsigned char *,int,int);
*/
import "C"
// 导出的回调函数类型
// type
// void SendcallbackFunc(unsigned char* data, int length);
// extern CallbackFunc _callback;
//
// static void setCallbackWrapper(CallbackFunc callback) {
// _callback = callback;
// }
type CallbackFunc func(*byte, int, int)
// 接口定义
type MediaInterface interface {
SetCallback(callback CallbackFunc)
}
// 结构体实现接口
type MediaImplementation struct {
id int
callback CallbackFunc
// C._callback
callclass unsafe.Pointer // 修改为unsafe.Pointer类型
}
var (
instances = make(map[int]*MediaImplementation)
instancesLock sync.Mutex
callbackLock sync.Mutex
)
func (m *MediaImplementation) SendData(data []byte, length int, mediatype int) {
// if C._callback != nil {
fmt.Println("callback data len", length, mediatype)
// arr := (*C.uchar)(unsafe.Pointer(&data[0]))
// C.OnSendPacket(arr, C.int(length), C.int(mediatype))
}
func (m *MediaImplementation) CallBackData(data *byte, length int, mediatype int) {
goSlice := (*[1 << 30]byte)(unsafe.Pointer(data))[:length:length]
m.SendData(goSlice, length, mediatype)
}
func (m *MediaImplementation) RtspClientStart(uri string) {
// m.rtsp = unsafe.Pointer(&Rtspclient{
// callback: m.CallBackData,
// })
// ((*Rtspclient)(m.rtsp)).rtspclient(uri)
m.rtspConsumer(uri)
}
func (m *MediaImplementation) SetCallback(callback CallbackFunc) {
m.callback = callback
}
//export CallRtspClientStart
func CallRtspClientStart(impl unsafe.Pointer, data *C.char) {
if impl == nil {
fmt.Println("Invalid implementation")
return
}
m := (*MediaImplementation)(impl)
m.rtspConsumer(C.GoString(data))
// m.RtspClientStart(C.GoString(data))
}
//export NewMediaImplementation
func NewMediaImplementation() uintptr {
instancesLock.Lock()
defer instancesLock.Unlock()
id := len(instances) + 1
instance := &MediaImplementation{
id: id,
}
instances[id] = instance
return uintptr(unsafe.Pointer(instance))
}
//export GetMediaInstanceByID
func GetMediaInstanceByID(id C.int) uintptr { // 修改返回类型为uintptr
instancesLock.Lock()
defer instancesLock.Unlock()
if instance, ok := instances[int(id)]; ok {
return uintptr(unsafe.Pointer(instance))
}
return uintptr(0) // 返回表示未找到实例
}
//export CallSendData
func CallSendData(impl unsafe.Pointer, data *C.uchar, length C.int, mediatype C.int) {
if impl == nil {
fmt.Println("Invalid implementation")
return
}
goSlice := (*[1 << 30]byte)(unsafe.Pointer(data))[:length:length]
m := (*MediaImplementation)(impl)
go m.SendData(goSlice, int(length), int(mediatype))
}
//export SetCallbackWrapper
func SetCallbackWrapper(impl unsafe.Pointer, callback unsafe.Pointer) {
if impl == nil {
fmt.Println("Invalid implementation")
return
}
m := (*MediaImplementation)(impl)
m.callclass = callback
fmt.Println("SetCallbackWrapper callback", m.callback, callback)
// C._callback = callbackFunc
// m.SetCallback(callbackFunc)
}
func main() {}
func (m *MediaImplementation) rtspConsumer(uri string) {
annexbNALUStartCode := func() []byte { return []byte{0x00, 0x00, 0x00, 0x01} }
//fmt.Println("rtspConsumer starting...")
session, err := rtsp.DialTimeout(uri, 10*time.Second)
// defer session.Close()
if err != nil {
fmt.Errorf("rtsp Dial Error: %v", err)
return
//panic(err)
}
//session.RtpKeepAliveTimeout = 10 * time.Second
session.RtpTimeout = 20 * time.Second
session.RtpKeepAliveTimeout = 5 * time.Second
session.RtspTimeout = 20 * time.Second
codecs, err := session.Streams()
if err != nil {
fmt.Errorf("stream error: %v", err)
// continue
//panic(err)
}
for i, t := range codecs {
fmt.Printf("Stream", i, "is of type", t.Type().String())
}
if codecs[0].Type() != av.H264 {
//fmt.Println("RTSP feed must begin with a H264 codec")
// continue
//panic("RTSP feed must begin with a H264 codec")
}
if len(codecs) != 1 {
//fmt.Println("Ignoring all but the first stream.")
}
// var previousTime time.Duration
for {
// select {
// case <-rtspsrcch:
// default:
// if KVMrtsp.BInUse != true {
// break
// }
pkt, err := session.ReadPacket()
if err != nil {
break
}
// if pkt.Idx != 0 {
// //audio or other stream, skip it
// continue
// }
if codecs[pkt.Idx].Type().IsVideo() {
pkt.Data = pkt.Data[4:]
// For every key-frame pre-pend the SPS and PPS
if pkt.IsKeyFrame {
pkt.Data = append(annexbNALUStartCode(), pkt.Data...)
pkt.Data = append(codecs[0].(h264parser.CodecData).PPS(), pkt.Data...)
pkt.Data = append(annexbNALUStartCode(), pkt.Data...)
pkt.Data = append(codecs[0].(h264parser.CodecData).SPS(), pkt.Data...)
pkt.Data = append(annexbNALUStartCode(), pkt.Data...)
} else {
pkt.Data = append(annexbNALUStartCode(), pkt.Data...)
}
// bufferDuration := pkt.Time - previousTime
// previousTime = pkt.Time
// m.CallBackData(pkt.Data,len(pkt.Data))
fmt.Println("Is Video IsKeyFrame", pkt.IsKeyFrame, codecs[pkt.Idx].Type(), pkt.Time)
mediatype := 0
switch codecs[pkt.Idx].Type() {
case av.H264:
mediatype = 1
case av.H265:
mediatype = 2
}
length := len(pkt.Data)
arr := (*C.uchar)(unsafe.Pointer(&pkt.Data[0]))
C.OnSendPacket(m.callclass, arr, C.int(length), C.int(mediatype))
// if err = KVMrtsp.Track.WriteSample(media.Sample{Data: pkt.Data, Duration: bufferDuration}); err != nil && err != io.ErrClosedPipe {
// logger.Errorf("WriteSample error %v", err)
// break
// //panic(err)
// }
} else if codecs[pkt.Idx].Type().IsAudio() {
// codecs, err = session.Streams()
// if err != nil {
// fmt.Println("Error getting streams")
// break
// }
codec := codecs[pkt.Idx].(av.AudioCodecData)
duration, err := codec.PacketDuration(pkt.Data)
if err != nil {
fmt.Println("Failed to get duration for audio:", err)
break
}
fmt.Println("IS Audio Packet duration:", duration)
mediatype := 0
switch codecs[pkt.Idx].Type() {
case av.AAC:
mediatype = 3
case av.PCM_MULAW:
mediatype = 4
case av.PCM_ALAW:
mediatype = 5
case av.SPEEX:
mediatype = 6
case av.NELLYMOSER:
mediatype = 7
case av.PCM:
mediatype = 8
case av.OPUS:
mediatype = 9
}
length := len(pkt.Data)
arr := (*C.uchar)(unsafe.Pointer(&pkt.Data[0]))
C.OnSendPacket(m.callclass, arr, C.int(length), C.int(mediatype))
// m.CallBackData(pkt.Data,len(pkt.Data))
// err = audioTrack.WriteSample(media.Sample{Data: pkt.Data, Duration: duration})
// if err != nil {
// //fmt.Println("Failed to write audio sample", err)
// break
// }
}
}
if err = session.Close(); err != nil {
fmt.Errorf("session Close error %v", err)
return
}
// time.Sleep(5 * time.Second)
// }
}