多媒体服务器核心实现(流管理)

news2024/11/23 16:32:15

       多媒体服务器比较多,实现的功能也很复杂,但其核心就是是转协议,流管理,连接管理,就是一个时序状态机和信令结合的系统。现在的生态有很多现成的轮子,c/c++ go实现的均可以拿来就用,只需要按一定的组织逻辑构建在一起就可以成为一个不错的多媒体服务器。

    我对流管理的理解是,多媒体服务器包含多个流,这个流实际就是类似rtmp rtsp gb28181 webrtc等推流或者拉流,本质核心就是H264/H265 AAC opus 等实时流数据,多媒体服务器就是将不同协议头解析了得到这些原始数据转换成播放器或者客户端需要的数据格式,比如我这里就是统一转换成webrtc (rtp)让浏览器播放器进行播放。代码已开源,应为简单,效果也不错,可以作为入门级学习资料,本人亦在此基础上实现了上1000路的压测,同时支持H265 及p2p 级联组网GitHub - xiangxud/rtmp_webrtc_server: rtmp stream push and webrtc pull use opus and h264 ,mqtt for signal cmd message transfor ,device & peers manage

本人能力有限,欢迎大佬指点(q 365603975)。

服务器内置了mqtt传输json作为信令,灵活且利于大批量设备管理,逻辑可以由两端则来界定

livekit视频会议端的效果

 简易的播放器,支持p2p及流媒体转发,支持rtmp及webrtc的转发

    主要的几个结构体如下

1、p2p连接对象,主要针对webrtc, peerConnection连接以及承载转发任务各种Track,每次建立连接及创建一个peer

type Peer struct {
	peerId                       string
	peerName                     string
	userName                     string
	streamName                   string
	passWord                     string
	status                       StreamState
	startTime                    time.Time
	endTime                      time.Time
	peerConnection               *webrtc.PeerConnection
	videoTrack, audioTrack       *webrtc.TrackLocalStaticSample
	videoRTPTrack, audioRTPTrack *webrtc.TrackLocalStaticRTP

	peerInterface
}

  2、stream 流是一种资源,也相当于生产者,供很多消费者peers来使用,stream跟peer是一对多的关系,它由多媒体流输入来产生,同时支持向livekit room发布流,需要消费的peer统一由pees 的map来管理,根据连接状态进行增删等操作

type Stream struct {
	streamId       string
	streamName     string
	streamType     string
	userName       string
	passWord       string
	status         StreamState
	startTime      time.Time
	endTime        time.Time
	peers          map[string]*Peer
	audioDecoder   *fdkaac.AacDecoder
	audioEncoder   *opus.Encoder
	remotetrack    []*webrtc.TrackRemote
	audioBuffer    []byte
	audioClockRate uint32
	//room                   *lksdk.Room //for publish to livekit room ,first create room as streamname for livekit,then publish track to livekit
	room *livekitclient.Room
	// videoTrack, audioTrack *webrtc.TrackLocalStaticSample
	streamPeerinterface
}

 3、流管理,一个多媒体服务可以管理多个流,同时也可以有多个livekit 房间,设备推流可以指定推送至某房间,可以有播放器或客户端的信令来实现逻辑适配

type StreamManager struct {
	streams    map[string]*Stream
	roomMap    map[string]*livekitclient.Room //多房间管理
	deviceroom *livekitclient.Device_Room     //设备房间对应关系
	streamsinterface
	ctx context.Context
	// livekitclient.Room
}

 4、详细的方法实现如下,此构成了多媒体服务的核心之一,流的生成及消费和状态管理,以及连接的生成和状态管理,实现媒体的发布和消费,流就可以真正的流转起来了

package media

import (
	"context"
	"encoding/binary"
	"encoding/hex"
	"errors"
	"fmt"
	"time"

	"github.com/Glimesh/go-fdkaac/fdkaac"
	"github.com/pion/webrtc/v3"
	"github.com/pion/webrtc/v3/pkg/media"
	"github.com/xiangxud/rtmp_webrtc_server/config"
	"github.com/xiangxud/rtmp_webrtc_server/livekitclient"

	// "github.com/xiangxud/rtmp_webrtc_server/livekitclient"
	"github.com/xiangxud/rtmp_webrtc_server/log"
	opus "github.com/xiangxud/rtmp_webrtc_server/opus"
)

type StreamState byte

const (
	PEER_INIT StreamState = iota
	PEER_CONNECT
	PEER_TIMEOUT
	PEER_CLOSED
	PEER_DEADLINE
)
const (
	STREAM_INIT StreamState = iota
	STREAM_CONNECT
	STREAM_TIMEOUT
	STREAM_CLOSED
	STREAM_DEADLINE
)
const (
	USE_SFU_ION     = true
	USE_SFU_LIVEKIT = false
)

type peerInterface interface {
	InitPeer(peerid int64,
		peername string,
		username string,
		password string) error
	AddConnect(*webrtc.PeerConnection)
	AddAudioTrack(*webrtc.TrackLocalStaticSample)
	AddVideoTrack(*webrtc.TrackLocalStaticSample)
	AddAudioRTPTrack(*webrtc.TrackLocalStaticRTP)
	AddVideoRTPTrack(*webrtc.TrackLocalStaticRTP)
	SendPeerAudio([]byte) error
	SendPeerVideo([]byte) error
	VerifyPeer()
}

//webrtc 客户
type Peer struct {
	peerId                       string
	peerName                     string
	userName                     string
	streamName                   string
	passWord                     string
	status                       StreamState
	startTime                    time.Time
	endTime                      time.Time
	peerConnection               *webrtc.PeerConnection
	videoTrack, audioTrack       *webrtc.TrackLocalStaticSample
	videoRTPTrack, audioRTPTrack *webrtc.TrackLocalStaticRTP

	peerInterface
}

func (p *Peer) InitPeer(peerid string,
	peername string,
	username string,
	password string) {
	p.peerId = peerid
	p.peerName = peername
	p.userName = username
	p.passWord = password
	p.status = PEER_INIT
}
func (p *Peer) AddConnect(streamname string, pconn *webrtc.PeerConnection) {
	p.peerConnection = pconn
	p.startTime = time.Now()
	p.streamName = streamname
	p.status = PEER_CONNECT

}
func (p *Peer) AddAudioTrack(track *webrtc.TrackLocalStaticSample) {
	p.audioTrack = track
}
func (p *Peer) AddVideoTrack(track *webrtc.TrackLocalStaticSample) {
	p.videoTrack = track
}
func (p *Peer) AddAudioRTPTrack(track *webrtc.TrackLocalStaticRTP) {
	p.audioRTPTrack = track
}
func (p *Peer) AddVideoRTPTrack(track *webrtc.TrackLocalStaticRTP) {
	p.videoRTPTrack = track
}
func (p *Peer) SendPeerAudio(audiodata []byte) error {
	if p.status != PEER_CONNECT {
		// return fmt.Errorf("peer is not connected,status %d", p.status)
		// log.Debug("SendPeerAudio peer ", p.peerName, " is not connected,status ", p.status)
		return nil
	}
	if p.peerConnection.ConnectionState() == webrtc.PeerConnectionStateClosed {
		p.status = PEER_CLOSED
		p.endTime = time.Now()
		log.Debug("SendPeerAudio peer ", p.peerName, " is closed,status ", p.status)
		return nil
	}
	if config.Config.Stream.Debug {
		log.Debug("SendPeerAudio peer name:", p.peerName, "stream name:", p.streamName, "opus len:", len(audiodata))
	}
	if audioErr := p.audioTrack.WriteSample(media.Sample{
		Data:     audiodata,
		Duration: 20 * time.Millisecond,
	}); audioErr != nil {
		log.Debug("WriteSample err", audioErr)
		return fmt.Errorf("WriteSample err %s", audioErr)
	}
	return nil
}
func (p *Peer) SendPeerVideo(videodata []byte) error {
	if p.status != PEER_CONNECT {
		// return fmt.Errorf("peer is not connected,status %d", p.status)
		// log.Debug("SendPeerVideo peer ", p.peerName, " is not connected,status ", p.status)
		return nil
	}
	if p.peerConnection.ConnectionState() == webrtc.PeerConnectionStateClosed {
		p.status = PEER_CLOSED
		p.endTime = time.Now()
		// return fmt.Errorf("peer is closed,status %d", p.status)
		log.Debug("SendPeerVideo peer ", p.peerName, " is closed,status ", p.status)
		return nil
	}

	if videoErr := p.videoTrack.WriteSample(media.Sample{
		Data:     videodata,
		Duration: time.Second / 30,
	}); videoErr != nil {
		log.Debug("WriteSample err", videoErr)
		return fmt.Errorf("WriteSample err %s", videoErr)
	}
	return nil
}

func (p *Peer) SendPeerRTPAudio(audiodata []byte) error {
	if p.status != PEER_CONNECT {
		// return fmt.Errorf("peer is not connected,status %d", p.status)
		// log.Debug("SendPeerAudio peer ", p.peerName, " is not connected,status ", p.status)
		return nil
	}
	if p.peerConnection.ConnectionState() == webrtc.PeerConnectionStateClosed {
		p.status = PEER_CLOSED
		p.endTime = time.Now()
		log.Debug("SendPeerAudio peer ", p.peerName, " is closed,status ", p.status)
		return nil
	}
	if config.Config.Stream.Debug {
		log.Debug("SendPeerAudio peer name:", p.peerName, "stream name:", p.streamName, "opus len:", len(audiodata))
	}
	if p.audioRTPTrack == nil {
		return fmt.Errorf("peer audioRTPTrack is nil")
	}
	if _, audioErr := p.audioRTPTrack.Write(audiodata); audioErr != nil {
		log.Debug("WriteSample err", audioErr)
		return fmt.Errorf("WriteSample err %s", audioErr)
	}

	return nil
}
func (p *Peer) SendPeerRtpVideo(videodata []byte) error {
	if p.status != PEER_CONNECT {
		// return fmt.Errorf("peer is not connected,status %d", p.status)
		// log.Debug("SendPeerVideo peer ", p.peerName, " is not connected,status ", p.status)
		return nil
	}
	if p.peerConnection.ConnectionState() == webrtc.PeerConnectionStateClosed {
		p.status = PEER_CLOSED
		p.endTime = time.Now()
		// return fmt.Errorf("peer is closed,status %d", p.status)
		log.Debug("SendPeerVideo peer ", p.peerName, " is closed,status ", p.status)
		return nil
	}
	if p.videoRTPTrack == nil {
		return fmt.Errorf("peer videoRTPTrack is nil")
	}
	if _, videoErr := p.videoRTPTrack.Write(videodata); videoErr != nil {
		log.Debug("WriteSample err", videoErr)
		return fmt.Errorf("WriteSample err %s", videoErr)
	}
	return nil
}

type streamPeerinterface interface {
	InitStream(streamid string,
		streamname string,
		username string,
		password string)
	AddPeer(*Peer) error
	DeletePeer(Peer) error
	GetPeer(string) (*Peer, error)
	SetPeer(string, *Peer) error
	SetOpusCtl()
	InitAudio([]byte) error
	InitVideo([]byte) error
	SendStreamAudio([]byte) error
	SendStreamVideo([]byte) error
	// SetAudiodecoder(*fdkaac.AacDecoder) (error)
}

//rtmp 流
type Stream struct {
	streamId       string
	streamName     string
	streamType     string
	userName       string
	passWord       string
	status         StreamState
	startTime      time.Time
	endTime        time.Time
	peers          map[string]*Peer
	audioDecoder   *fdkaac.AacDecoder
	audioEncoder   *opus.Encoder
	remotetrack    []*webrtc.TrackRemote
	audioBuffer    []byte
	audioClockRate uint32
	//room                   *lksdk.Room //for publish to livekit room ,first create room as streamname for livekit,then publish track to livekit
	room *livekitclient.Room
	// videoTrack, audioTrack *webrtc.TrackLocalStaticSample
	streamPeerinterface
}

func (s *Stream) SetRemoteTrack(remotetrack *webrtc.TrackRemote) {
	s.remotetrack = append(s.remotetrack, remotetrack)
}
func (s *Stream) GetRemoteTrack() ([]*webrtc.TrackRemote, error) {
	if s.remotetrack == nil {
		return nil, errors.New(" not remotetrack")
	} else {
		return s.remotetrack, nil
	}
}
func (s *Stream) InitStream(streamid string,
	streamname string,
	username string,
	password string,
	streamtype string) {
	s.streamId = streamid
	s.streamName = streamname
	s.userName = username
	s.passWord = password
	s.streamType = streamtype
	s.peers = make(map[string]*Peer)
	s.status = STREAM_INIT
	s.startTime = time.Now()
}
func (s *Stream) AddPeer(p *Peer) error {
	if s.peers == nil || p == nil {
		s.peers = make(map[string]*Peer, 0)
	}
	if s.peers[p.peerName] != nil {
		log.Debug(p.peerName, " peer is exsit,reset peer")
	}
	if p.peerName != "" {
		// p.status = PEER_CONNECT
		// p.startTime = time.Now()
		s.peers[p.peerName] = p
		return nil
	}
	return errors.New("peer is null")
}
func (s *Stream) DeletePeer(p *Peer) error {
	if s.peers == nil || s.peers[p.peerName] == nil || p == nil {
		return errors.New("peers is not exsit")
	}
	if p.peerName != "" {
		delete(s.peers, p.peerName)
		return nil
	}

	return errors.New("peer is not exsit")
}

func (s *Stream) GetPeer(peername string) (*Peer, error) {
	if s.peers == nil {
		return nil, errors.New("peers is not exsit")
	}
	if p := s.peers[peername]; p == nil {
		return nil, fmt.Errorf("peer %s is not exsit", peername)
	} else {
		return p, nil
	}
}
func (s *Stream) SetPeer(peername string, pp *Peer) error {
	if s.peers == nil || pp == nil {
		return errors.New("peers is not exsit")
	}
	if p := s.peers[peername]; p == nil || pp.peerName != peername {
		return fmt.Errorf("peer %s is not exsit", peername)
	} else {
		s.peers[peername] = pp
		return nil
	}
}

func (s *Stream) SetOpusCtl() {
	s.audioEncoder.SetMaxBandwidth(opus.Bandwidth(2))
	s.audioEncoder.SetComplexity(9)
	s.audioEncoder.SetBitrateToAuto()
	s.audioEncoder.SetInBandFEC(true)
}
func (s *Stream) InitAudio(data []byte) error {
	encoder, err := opus.NewEncoder(48000, 2, opus.AppAudio)
	if err != nil {
		log.Debug(err.Error())
		return err
	}
	s.audioEncoder = encoder
	s.SetOpusCtl()
	s.audioDecoder = fdkaac.NewAacDecoder()
	s.audioDecoder.InitRaw(data)
	return nil
}
func (s *Stream) ReleaseAudio() {
	s.audioDecoder.Close()
	s.audioEncoder.Close()
}
func (s *Stream) SendStreamAudio(datas []byte) []error {
	var errs []error
	if s.audioDecoder == nil {
		log.Debug("decoder is released")
		errs = append(errs, fmt.Errorf("decoder is released"))
		return errs
	}
	pcm, err := s.audioDecoder.Decode(datas)
	if err != nil {
		log.Debug("decode error: ", hex.EncodeToString(datas), err)
		errs = append(errs, fmt.Errorf("decode error"))
		return errs
	}
	if config.Config.Stream.Debug {
		log.Debug("\r\npcm len ", len(pcm), " ->") //, pcm)
	}
	blockSize := 960
	for s.audioBuffer = append(s.audioBuffer, pcm...); len(s.audioBuffer) >= blockSize*4; s.audioBuffer = s.audioBuffer[blockSize*4:] {
		pcm16 := make([]int16, blockSize*2)
		pcm16len := len(pcm16)
		for i := 0; i < pcm16len; i++ {
			pcm16[i] = int16(binary.LittleEndian.Uint16(s.audioBuffer[i*2:]))
		}
		bufferSize := 1024
		opusData := make([]byte, bufferSize)
		if s.audioEncoder == nil {
			log.Debug("encoder is released")
			errs = append(errs, fmt.Errorf("encoder is released"))
			return errs
		}
		n, err := s.audioEncoder.Encode(pcm16, opusData)
		// n, err := h.audioEncoder.ReadEncode(pcm16, opusData)
		if err != nil {
			errs = append(errs, err)
			return errs
		}
		opusOutput := opusData[:n]
		// m:=GetRoom
		// room, err := h.streammanager.GetRoom("")
		if s.room == nil {
			//log.Debug("stream room is null")
		} else {
			//960 48k 20ms/per
			if s.streamType != "RTP" {
				//send to sfu server
				if config.Config.Stream.IONSfuEnable {
					s.room.TrackSendIonData(s.streamName, "audio", opusOutput, 20*time.Millisecond)
				}
				if config.Config.Stream.LiveKitSfuEnable {
					s.room.TrackSendLivekitData(s.streamName, "audio", opusOutput, 20*time.Millisecond)
				}
			}
		}
		for pname, p := range s.peers {
			if config.Config.Stream.Debug {
				log.Debug("peer ", pname)
			}
			if p.streamName == s.streamName {
				//log.Printf(" send audio data ")

				err := p.SendPeerAudio(opusOutput)
				if err != nil {
					log.Debug("error", err)
					errs = append(errs, err)
				}

			}
		}

	}
	return errs
}

func (s *Stream) SendStreamAudioFromWebrtc(datas []byte) []error {
	var errs []error

	opusOutput := datas
	// m:=GetRoom
	// room, err := h.streammanager.GetRoom("")
	if s.room != nil && s.status == STREAM_CONNECT {
		//log.Debug("stream room is null")
		// } else {
		//960 48k 20ms/per
		if s.streamType == "RTP" {

			//send audio stream to ion sfu server
			if config.Config.Stream.IONSfuEnable {
				s.room.TrackSendIonRtpPackets(s.streamName, "audio", opusOutput)
			}
			//send audio stream to livekit sfu server
			if config.Config.Stream.LiveKitSfuEnable {
				s.room.TrackSendLivekitRtpPackets(s.streamName, "audio", opusOutput)
			}
		} else {
			if config.Config.Stream.IONSfuEnable {
				s.room.TrackSendIonData(s.streamName, "audio", opusOutput, 20*time.Millisecond)
			}
			if config.Config.Stream.LiveKitSfuEnable {
				s.room.TrackSendLivekitData(s.streamName, "audio", opusOutput, 20*time.Millisecond)
			}
		}
	}
	for pname, p := range s.peers {
		if config.Config.Stream.Debug {
			log.Debug("peer ", pname)
		}
		if p.streamName == s.streamName {
			//log.Printf(" send audio data ")
			if s.streamType == "RTP" {
				err := p.SendPeerRTPAudio(opusOutput)
				if err != nil {
					log.Debug("error", err)
					errs = append(errs, err)
				}
			} else {
				err := p.SendPeerAudio(opusOutput)
				if err != nil {
					log.Debug("error", err)
					errs = append(errs, err)
				}
			}
		}
	}

	return errs
}

func (s *Stream) IsRtpStream() bool {
	return s.streamType == "RTP"
}
func (s *Stream) SendStreamVideo(datas []byte) []error {
	var errs []error
	if s.streamType == "RTP" {
		if s.room != nil && s.status == STREAM_CONNECT {
			//send rtp stream to ion sfu
			if config.Config.Stream.IONSfuEnable {
				s.room.TrackSendIonRtpPackets(s.streamName, "video", datas)
			}
			//send rtp stream to livekit sfu
			if config.Config.Stream.LiveKitSfuEnable {
				s.room.TrackSendLivekitRtpPackets(s.streamName, "video", datas)
			}
		}
	} else {
		if s.room != nil && s.status == STREAM_CONNECT {
			if config.Config.Stream.IONSfuEnable {
				s.room.TrackSendIonData(s.streamName, "video", datas, time.Second/30)
			}
			if config.Config.Stream.LiveKitSfuEnable {
				s.room.TrackSendLivekitData(s.streamName, "video", datas, time.Second/30)
			}
		}
	}
	for pname, p := range s.peers {
		if config.Config.Stream.Debug {
			log.Debug("peer ", pname)
		}
		if p.streamName == s.streamName {
			if config.Config.Stream.Debug {
				log.Debug(" send video data ")
			}
			if s.streamType == "RTP" {
				err := p.SendPeerRtpVideo(datas)
				if err != nil {
					log.Debug("error", err)
					errs = append(errs, err)
				}
			} else {
				err := p.SendPeerVideo(datas)
				if err != nil {
					errs = append(errs, err)
				}
			}
		}
	}
	return errs
}

type streamsinterface interface {
	InitStreamManage(ctx context.Context)
	AddStream(*Stream) error
	DeleteStream(*Stream) error
	GetStream(string) (*Stream, error)
	SetStream(string, *Stream) error
}

//流管理
type StreamManager struct {
	streams    map[string]*Stream
	roomMap    map[string]*livekitclient.Room //多房间管理
	deviceroom *livekitclient.Device_Room     //设备房间对应关系
	streamsinterface
	ctx context.Context
	// livekitclient.Room
}

func (m *StreamManager) InitStreamManage(ctx context.Context) {
	m.streams = make(map[string]*Stream, 0)
	m.roomMap = make(map[string]*livekitclient.Room)
	m.ctx = ctx

	room := livekitclient.NewRoom(ctx, &config.Config.Livekit.Token)
	// sn, _ := identity.GetSN()
	//create livekit room with this server's Identity like mac addr
	if config.Config.Stream.LiveKitSfuEnable {
		roomname := config.Config.Livekit.Token.Identity // + "->LIVEKIT"
		_, err := room.CreateliveKitRoom(roomname)
		if err != nil {
			log.Debug("livekit room create failed: ", err)
		}
		roomname = config.Config.Livekit.Token.Identity
		m.roomMap[roomname] = room
	}
	//create ion room  with this server's Identity like mac addr
	if config.Config.Stream.IONSfuEnable {
		roomname := config.Config.Livekit.Token.Identity // + "->ION"
		_, err := room.CreateIonRoom(config.Config.Livekit.Token.HostIon, roomname)
		if err != nil {
			log.Debug("ion room create failed: ", err)
		}
		roomname = config.Config.Livekit.Token.Identity
		m.roomMap[roomname] = room
	}

	m.deviceroom = livekitclient.NewDevice_Room()

}
func (m *StreamManager) GetDefaultRoom() *livekitclient.Room {

	return m.roomMap[config.Config.Livekit.Token.Identity]
}
func (m *StreamManager) GetRoomByName(roomname string) *livekitclient.Room {
	return m.roomMap[roomname]
}
func (m *StreamManager) AddStream(s *Stream) error {
	if m.streams == nil || s == nil {
		return errors.New("stream is not exsit")
	}
	if s.streamName != "" {
		// m.streams[s.streamName] = s
		//原来存在这个源就直接改状态
		if ss := m.streams[s.streamName]; ss == nil {
			s.room = m.GetDefaultRoom()
			s.status = STREAM_CONNECT
			m.streams[s.streamName] = s
		} else {
			ss.room = m.GetDefaultRoom()
			ss.startTime = time.Now()
			ss.status = STREAM_CONNECT
			m.streams[s.streamName] = ss
		}
		if s.streamType == "RTP" {
			//publish to ion sfu
			if config.Config.Stream.IONSfuEnable {
				m.GetDefaultRoom().RTPTrackPublished_to_ION(s.remotetrack, s.streamName)
			}
			//publish to livekit sfu
			if config.Config.Stream.LiveKitSfuEnable {
				m.GetDefaultRoom().RTPTrackPublished(s.remotetrack, s.streamName)
			}
		} else {
			if config.Config.Stream.IONSfuEnable {
				m.GetDefaultRoom().TrackPublished_to_ION(s.streamName)
			}
			if config.Config.Stream.LiveKitSfuEnable {
				m.GetDefaultRoom().TrackPublished(s.streamName)
			}
		}
		return nil
	} else {
		return errors.New("stream is not exsit")
	}

}
func (m *StreamManager) DeleteStream(name string) error {
	if m.streams == nil || name == "" {
		return errors.New("stream is not exsit")
	}
	s := m.streams[name]

	if s != nil {
		s.status = STREAM_DEADLINE
		if config.Config.Stream.LiveKitSfuEnable {
			m.GetDefaultRoom().TrackClose(s.streamName)
		}
		if config.Config.Stream.IONSfuEnable {
			m.GetDefaultRoom().TrackCloseION(s.streamName)
		}
		// go func() { //防止正在使用,先设置状态,然后延迟再删除,如果还有冲突,就先不删除,一般是在stream推流关闭时才调用一般不会出问题
		// 	time.Sleep(time.Duration(2) * time.Second)
		// 	s.ReleaseAudio()
		// 	delete(m.streams, name)
		// }()
		return nil
	} else {
		return errors.New("stream is not exsit")
	}

}
func (m *StreamManager) GetStream(streamname string) (*Stream, error) {
	if m.streams == nil {
		return nil, errors.New("stream is not exsit")
	}
	if streamname != "" {
		return m.streams[streamname], nil
	} else {
		return nil, errors.New("streamname is null")
	}
}
func (m *StreamManager) GetRoom(roomname string) (*livekitclient.Room, error) {
	if m.roomMap[roomname] == nil {
		return nil, fmt.Errorf("room(%s) is not exsit", roomname)
	}
	return m.roomMap[roomname], nil
}
func (m *StreamManager) GetDeviceRoom() *livekitclient.Device_Room {
	return m.deviceroom
}
func (m *StreamManager) SetStream(name string, s *Stream) error {
	if m.streams == nil || s == nil {
		return errors.New("stream is not exsit")
	}
	if name != "" && name == s.streamName {
		m.streams[name] = s
		return nil
	} else {
		return errors.New("streamname is null")
	}
}
func (m *StreamManager) EndRTMP() {
	room := m.GetDefaultRoom()
	if room != nil {
		room.Close()
	}
}
func (m *StreamManager) EndAll() {
	for _, room := range m.roomMap {
		room.Close()
	}
}

//全局变量
var (
	Global_StreamM        StreamManager
	StreamPeersForConnect map[string]*Peer //不存在的流,考虑收到peer时通过mqtt命令向设备发起推流指令,比较好的策略是有客户端向设备询问是否在推流
)

func CreateGlobalStreamM(ctx context.Context) *StreamManager {
	Global_StreamM.InitStreamManage(ctx)
	return &Global_StreamM
}
func GetGlobalStreamM() *StreamManager {

	return &Global_StreamM
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/126797.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

插槽,依赖注入,动态组件,异步组件,内置组件

插槽&#xff1a;父组件和子组件内容的一个通信 子组件使用<slot>接收父组件传入的内容 如果内容有多个标签时&#xff0c;使用<template>包裹 默认插槽&#xff1a; <template v-slot:default><h2>标题</h2><p>插槽内容</p> <…

Windows——编写jar启动脚本和关闭脚本

文章目录前言启动脚本编写关闭脚本restart.bat 重启脚本前言 假设项目打包后&#xff0c;项目结构为&#xff1a; 此时如果需要再windows环境中进行项目的启动或关闭&#xff0c;需要频繁的手敲命令&#xff0c;很不方便。此时可以编写.bat脚本文件进行项目的控制。 启动脚本…

就业信息追踪|基于Springboot+Vue开发实现就业信息追踪系统

作者主页&#xff1a;编程指南针 作者简介&#xff1a;Java领域优质创作者、CSDN博客专家 、掘金特邀作者、多年架构师设计经验、腾讯课堂常驻讲师 主要内容&#xff1a;Java项目、毕业设计、简历模板、学习资料、面试题库、技术互助 收藏点赞不迷路 关注作者有好处 文末获取源…

双向链表,添加,删除一个节点

文章目录前言一、创建双向链表&#xff08;重命名&#xff09;二、添加一个节点1.添加头指针&#xff1a;2.若 头指针为空3.若头指针非空三、删除一个节点1.找到某节点2.将节点从链表中删除四. 展示所有的节点五. 实验效果总结前言 链表有几种&#xff0c;大致分为&#xff1a…

小程序之会议OA项目--其他界面

目录一、tabs组件及会议管理布局1、tabs.js2、tabs.wxml3、tabs.wxss4、app.wxss5、list.js6、list.json7、list.wxml二、个人中心布局1、ucenter/index/index.js2、ucenter/index/index.wxml3、ucenter/index/index.wxss一、tabs组件及会议管理布局 1、tabs.js // component…

UDS - 15.2 RequestDownload (34) service

15.2 请求下载(34)服务 来自&#xff1a;ISO 14229-1-2020.pdf 15.2.1 服务描述 客户机使用requestDownload服务发起从客户机到服务器的数据传输(下载)。 在服务器接收到requestDownload请求消息之后&#xff0c;服务器应该在发送积极响应消息之前采取所有必要的操作来接收数据…

常用图像像素格式 NV12、NV2、I420、YV12、YUYV

文章目录目的RGBYUVYCrCb采样格式YUV 4:4:4 采样YUV 4:2:2 采样YUV 4:2:0 采样YUV 存储格式YUV422&#xff1a;YUYV、YVYU、UYVY、VYUYYUV420&#xff1a;I420、YV12、NV12,、NV21扩展目的 了解常用图像像素格式 RGB 和 YUV,像素格式描述了像素数据存储所用的格式&#xff0c;…

Spring MVC框架学习

前言:本篇博客将从三个方面来写我们要学习SpringMVC的什么: 连接:当用户在游览器中输入一个url之后,能将这个url请求映射到自己写的程序,也就是访问一个地址时,能够连接到门自己写的服务器. 获取参数:用户访问时如果带一些参数,我该怎样获取.返回数据:执行业务代码之后…

NVM实现一台电脑对node的多版本管理。

一、NVM&#xff1a;Node Version Management&#xff1b; 下载地址&#xff1a;Releases coreybutler/nvm-windows GitHubA node.js version management utility for Windows. Ironically written in Go. - Releases coreybutler/nvm-windowshttps://github.com/coreybutl…

JavaScript寒假系统学习之数组(一)

JavaScript寒假系统学习之数组&#xff08;一&#xff09;一、数组1.1 什么是数组1.2 数组创建的2种方式1.2.1 利用new创建数组1.2.2 利用数组字面量创建数组1.3 访问数组元素1.4 遍历数组1.5 数组实战训练1.5.1 计算数组的和以及平均值1.5.2 求数组中的最大值1.5.3 数组转化为…

使用Qemu在Windows上模拟arm平台并安装debian10 arm系统(cd镜像) 安装记录

参考&#xff1a;使用Qemu在Windows上模拟arm平台并安装国产化操作系统_viyon_blog的博客-CSDN博客_qemu windows 镜像&#xff1a;debian-10.12.0-arm64-xfce-CD-1.iso 环境&#xff1a;qemu虚拟机&#xff0c;宿主机win10,amd64 QEMU_EFI.fd: (298条消息) qemu虚拟机的bi…

N皇后问题-leetcode51-java回溯解+详细优化过程

说明&#xff1a;问题描述来源leetcode 一、问题描述&#xff1a; 51. N 皇后 难度困难1592 按照国际象棋的规则&#xff0c;皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个皇后放置在 nn 的棋盘上&#xff0c;并且使皇后彼此之…

实验八、直接耦合多级放大电路的调试

一、题目 两级直接耦合放大电路的调试。 二、仿真电路 图1(a)所示电路为两级直接耦合放大电路&#xff0c;第一级为双端输入、单端输出差分放大电路&#xff0c;第二级为共射放大电路。 由于在分立元件中很难找到在任何温度下均具有完全相同特性的两只晶体管&#xff0c;因而…

Active Directory 基础 —— 如何理解group的类型

因为创建一个跨域的组,重新温习了一下最基本的AD知识,所谓温故而知新,把温习的结果整理了一下。AD里面的group类型从范围来说分为global, universal 和 local domain, 从类型来分分为security和distribution。后面的类型理解很容易,security就是纯粹用来权限访问的,而dist…

Java实现FIFO、LRU、LFU、OPT四页面置换算法

题目要求 采用多道程序思想设计一个程序&#xff0c;模拟页存储管理地址变换的过程&#xff0c;可采用FIFO、LRU、LFU、OPT四页面置换算法。基本要求如下&#xff1a; 需要建立访问页表线程、访问快表线程、缺页中断处理线程、访问内存线程等&#xff0c;协同这些线程模拟完成…

JDK17升级之路:JCE cannot authenticate the provider BC问题

问题的产生 报错代码运行环境 JDK&#xff1a;Oracle JDK17 CentOS7.8 这个问题刚拿到比较棘手。原因是本地windows是OK的&#xff0c;centos上是不成功的&#xff0c;报了下面的错误&#xff1a; Caused by: java.lang.SecurityException: JCE cannot authenticate the provi…

论文阅读 DeepGCNs: Can GCNs Go as Deep as CNNs?

DeepGCNs: Can GCNs Go as Deep as CNNs?绪论1、介绍2、相关工作3、方法3.1、图神经网络的表针学习3.2、图神经网络的残差结构3.3、图神经网络的密集连接3.4、图神经网络的扩张性聚集绪论 CNN很强&#xff0c;但不能正确解决非欧几里得数据的问题&#xff0c;图卷积网络&…

YOLO-V5 系列算法和代码解析(五)—— 损失函数

文章目录基本简介调试准备损失函数基本简介 损失函数是神经网络的重要组成部分&#xff0c;用于评估网络的预测值和真实值的差异度。根据偏差的大小&#xff0c;反向调整网络的训练参数&#xff0c;迭代优化使得损失尽量小&#xff0c;也就得到最优的网络参数。 调试准备 debu…

Go-学生教务管理系统【无界面 / 离线版】(一)

【Go】学生教务管理系统&#xff08;无界面 / 离线版&#xff09;&#xff08;一&#xff09;Ⅰ. 程序说明一、博客日期二、引言Ⅱ. 版权声明Ⅲ. 开发环境一、开发配置二、开发工具Ⅳ. 效果演示一、工程结构&#xff08;一&#xff09;目录结构&#xff08;二&#xff09;目录说…

【Ctfer训练计划】——(六)

作者名&#xff1a;Demo不是emo 主页面链接&#xff1a;主页传送门 创作初心&#xff1a;舞台再大&#xff0c;你不上台&#xff0c;永远是观众&#xff0c;没人会关心你努不努力&#xff0c;摔的痛不痛&#xff0c;他们只会看你最后站在什么位置&#xff0c;然后羡慕或鄙夷座…