subscribe流程
1. AmqpClient - New message received
2023-04-26T21:54:18.415 - DEBUG: AmqpClient - RpcServer New message received {
method: 'subscribe',
args: [
'b149e44bb10d4e91bd162a8c6806ae7b',
'webrtc',
{
transportId: 'b149e44bb10d4e91bd162a8c6806ae7b',
tracks: [Array],
controller: 'conference-aed26ef945c09ddf89b3@192.168.221.62_0',
owner: 'xG6DXLHdXwky_E8eAAAD'
}
],
corrID: 28,
replyTo: 'amq.gen-WtoELIbC4gJ1GfdYgkvSFA'
}
2. WebrtcNode - subscribe
webrtc_agent/webrtc/index.js
2023-04-26T21:54:18.416 - DEBUG: WebrtcNode - subscribe, operationId: b149e44bb10d4e91bd162a8c6806ae7b connectionType: webrtc options: {
transportId: 'b149e44bb10d4e91bd162a8c6806ae7b',
tracks: [
{
from: 'cb8fcfb93f174c24862feaa38915111a',
mid: '0',
type: 'audio',
formatPreference: [Object]
},
{
from: 'cb8fcfb93f174c24862feaa38915111a',
mid: '1',
parameters: {},
type: 'video',
formatPreference: [Object]
}
],
controller: 'conference-aed26ef945c09ddf89b3@192.168.221.62_0',
owner: 'xG6DXLHdXwky_E8eAAAD'
}
疑问:tracks[].from 参数的含义是什么?它来自哪个 transportId?
/*
* For operations on type webrtc, publicTrackId is connectionId.
* For operations on type internal, operationId is connectionId.
*/
// functions: publish, unpublish, subscribe, unsubscribe, linkup, cutoff
// options = {
// transportId,
// tracks = [{mid, type, formatPreference, scalabilityMode}],
// controller, owner, enableBWE
// }
// formatPreference = {preferred: MediaFormat, optional: [MediaFormat]}
/**
 * Handle a subscribe request for one operation.
 *
 * Rejects the request when `operationId` is already present in
 * `mappingTransports`. For connectionType 'webrtc' it obtains the
 * connection for `options.transportId`, registers every requested track as
 * a 'recvonly' operation, records operationId => transportId and reports
 * 'ok'. Any other connection type is logged and reported as a failure.
 *
 * @param {string} operationId - key used in mappingTransports.
 * @param {string} connectionType - only 'webrtc' is handled here.
 * @param {object} options - {transportId, tracks, controller, owner, enableBWE}.
 * @param {function} callback - invoked as callback('callback', result).
 */
that.subscribe = function (operationId, connectionType, options, callback) {
  log.debug('subscribe, operationId:', operationId, 'connectionType:', connectionType, 'options:', options);

  const alreadyMapped = mappingTransports.has(operationId);
  if (alreadyMapped) {
    return callback('callback', {type: 'failed', reason: 'Connection already exists:'+operationId});
  }

  let connection = null;
  if (connectionType !== 'webrtc') {
    log.error('Connection type invalid:' + connectionType);
  } else {
    if (!options.transportId) {
      // Generate a transportId
    }

    // Step 1: create (or reuse) the WebRTC connection.
    //  - options.transportId: plays the role of a connection id.
    //  - options.controller: forwarded to the connection's status/track
    //    notifications (per the original note: the conference agent that
    //    sent the request, used when replying).
    //  - options.owner: user id (per the original note).
    connection = createWebRTCConnection(options.transportId, options.controller, options.owner);

    // Step 2: register one receive-only track operation per requested track.
    options.tracks.forEach((track) => {
      connection.addTrackOperation(operationId, 'recvonly', track);
    });

    mappingTransports.set(operationId, options.transportId);
    if (options.enableBWE) {
      connection.enableBWE = true;
    }
    callback('callback', 'ok');
  }

  // Reached with no connection when the connection type was not handled above.
  if (!connection) {
    log.error('Create connection failed', operationId, connectionType);
    callback('callback', {type: 'failed', reason: 'Create Connection failed'});
  }
};
疑问:callback 是从哪里传入的?
2.1 createWebRTCConnection
创建WebRtcConnection
小节 3
2.2 addTrackOperation
见小节 5
3.WebrtcNode - createWebRTCConnection——返回WrtcConnection
webrtc_agent/webrtc/index.js
/**
 * Return the WrtcConnection registered for `transportId`, creating and
 * registering a new one when none exists yet.
 *
 * A newly created connection is stored in `peerConnections`, gets an empty
 * Map in `mappingPublicId`, and has its `controller` property set. An
 * existing connection is returned untouched (its controller is not updated).
 *
 * @param {string} transportId - key of the connection.
 * @param {string} controller - passed along with status and track notifications.
 * @param {string} owner - forwarded to the WrtcConnection spec.
 * @returns {object} the existing or newly created connection.
 */
var createWebRTCConnection = function (transportId, controller, owner) {
  if (!peerConnections.has(transportId)) {
    // Transport status changes are reported together with the controller.
    const onTransportStatus = (status) => {
      notifyTransportStatus(controller, transportId, status);
    };
    // Track information reported by the connection.
    const onTrack = (trackInfo) => {
      handleTrackInfo(transportId, trackInfo, controller);
    };

    const spec = {
      connectionId: transportId,
      threadPool: threadPool,
      ioThreadPool: ioThreadPool,
      network_interfaces: global.config.webrtc.network_interfaces,
      owner,
    };
    const created = new WrtcConnection(spec, onTransportStatus, onTrack);

    // Register the connection and an empty trackId => publicTrackId map.
    peerConnections.set(transportId, created);
    mappingPublicId.set(transportId, new Map());
    created.controller = controller;
    return created;
  }

  log.debug('PeerConnection already created:', transportId);
  return peerConnections.get(transportId);
};
3.1 peerConnections 成员
存放 WrtcConnection,transportId与connection一一对应
// Map { transportId => WrtcConnection }
// One entry per transport; createWebRTCConnection looks up and inserts here.
var peerConnections = new Map();
----------------------------------
peerConnections.set(transportId, connection);
3.2 mappingPublicId成员
// Map { transportId => Map { trackId => publicTrackId } }
// createWebRTCConnection inserts an empty inner Map per transport;
// handleTrackInfo fills it on 'track-added'.
var mappingPublicId = new Map();
--------------------------------------------
mappingPublicId.set(transportId, new Map());
在 createWebRTCConnection 的时候,只是创建了空的 Map;
Map 中存放的 { trackId => publicTrackId } 是在 handleTrackInfo 的 trackInfo.type === 'track-added' 分支中存入的。
dist-debug/webrtc_agent/webrtc/index.js
// Excerpt of handleTrackInfo (parts are elided with "..."):
// handles track information reported for the connection `transportId`.
// For 'track-added' it derives a public track id and records the
// track.id => publicTrackId mapping for this transport.
var handleTrackInfo = function (transportId, trackInfo, controller) {
var publicTrackId;
var updateInfo;
if (trackInfo.type === 'track-added') {
// Generate public track ID
const track = trackInfo.track;
publicTrackId = transportId + '-' + track.id;
// A public id that is already present in mediaTracks is treated as a
// conflict: log it and ignore this track.
if (mediaTracks.has(publicTrackId)) {
log.error('Conflict public track id:', publicTrackId, transportId, track.id);
return;
}
...
// Fill the per-transport Map kept in mappingPublicId.
mappingPublicId.get(transportId).set(track.id, publicTrackId);
...
}
3.3 new WrtcConnection——创建rtc连接,并初始化
详细见小节4
3.4 handleTrackInfo
4. new WrtcConnection——创建rtc连接,并初始化
webrtc_agent/webrtc/wrtcConnection.js
// Excerpt of the WrtcConnection factory (parts are elided with "...").
// Parameters: spec, plus the on_status and on_track callbacks.
// The visible part creates the JS Connection wrapper, attaches a CallBase
// to it, wires up status events via initWebRtcConnection and returns `that`.
// wrtcId, threadPool, ioThreadPool, ipAddresses and `that` are set up in the
// elided part.
module.exports = function (spec, on_status, on_track) {
...
wrtc = new Connection(wrtcId, threadPool, ioThreadPool, { ipAddresses });
wrtc.callBase = new CallBase();
// wrtc.addMediaStream(wrtcId, {label: ''}, direction === 'in');
initWebRtcConnection(wrtc);
return that;
};
4.1 new Connection——创建c++的WebrtcConnection
webrtc_agent/webrtc/connection.js
2023-04-26T21:54:18.416 - INFO: Connection - message: Connection, id: b149e44bb10d4e91bd162a8c6806ae7b
// JS-side wrapper around the native WebRtcConnection (excerpt; further
// members are elided with "..."). Extends EventEmitter so that callers can
// subscribe to events with on().
class Connection extends EventEmitter {
// id: connection id, also passed to the native object by _createWrtc().
// threadPool / ioThreadPool: stored and passed to the native object.
// options: optional settings; ipAddresses, trickleIce and metadata are read
// here with fallbacks of '', false and {} respectively.
constructor (id, threadPool, ioThreadPool, options = {}) {
super();
log.info(`message: Connection, id: ${id}`);
this.id = id;
this.threadPool = threadPool;
this.ioThreadPool = ioThreadPool;
this.mediaConfiguration = 'default';
this.mediaStreams = new Map();
// Set to true by init(); guards against initialising twice.
this.initialized = false;
this.options = options;
this.ipAddresses = options.ipAddresses || '';
this.trickleIce = options.trickleIce || false;
this.metadata = this.options.metadata || {};
this.isProcessingRemoteSdp = false;
this.ready = false;
// The native WebRtcConnection
this.wrtc = this._createWrtc();
}
...
}
4.1.1 ---------Connection._createWrtc——创建webrtc connection
_createWrtc() {
var wrtc = new addon.WebRtcConnection(
this.threadPool, this.ioThreadPool, this.id,
global.config.webrtc.stunserver,
global.config.webrtc.stunport,
global.config.webrtc.minport,
global.config.webrtc.maxport,
false, //this.trickleIce,
this._getMediaConfiguration(this.mediaConfiguration),
false,
'', // turnserver,
'', // turnport,
'', //turnusername,
'', //turnpass,
'', //networkinterface
this.ipAddresses
);
return wrtc;
}
4.1.2 NAN_METHOD(WebRtcConnection::New)
source/agent/webrtc/rtcConn/WebRtcConnection.cc
// Excerpt of the JS-visible constructor (parts are elided with "...").
// Creates the wrapper object, the underlying erizo::WebRtcConnection and the
// async handle used for WebRtcConnection::eventsCallback.
NAN_METHOD(WebRtcConnection::New) {
...
WebRtcConnection* obj = new WebRtcConnection();
// The wrapper passes itself as the last argument, i.e. as the
// WebRtcConnectionEventListener of the erizo connection.
obj->me = std::make_shared<erizo::WebRtcConnection>(worker, io_worker, wrtcId, iceConfig,
rtp_mappings, ext_mappings, obj);
// Register eventsCallback as the handler of async_ on the default libuv loop.
uv_async_init(uv_default_loop(), &obj->async_, &WebRtcConnection::eventsCallback);
// Tie the native object to the JS object and return that JS object.
obj->Wrap(info.This());
info.GetReturnValue().Set(info.This());
obj->asyncResource_ = new Nan::AsyncResource("WebRtcConnectionCallback");
...
}
4.1.3 erizo::WebRtcConnection::WebRtcConnection
source/agent/webrtc/rtcConn/erizo/src/erizo/WebRtcConnection.cpp
2023-04-26 21:54:18,417 - INFO: WebRtcConnection -
id: b149e44bb10d4e91bd162a8c6806ae7b,
message: constructor,
stunserver: ,
stunPort: 0,
minPort: 0,
maxPort: 0
// Constructs the erizo connection: stores the workers, ICE configuration,
// RTP/extension mappings and the event listener, creates empty remote/local
// SdpInfo objects from rtp_mappings, and starts in state CONN_INITIAL.
// `listener` is the object later notified through
// conn_event_listener_->notifyEvent().
WebRtcConnection::WebRtcConnection(std::shared_ptr<Worker> worker, std::shared_ptr<IOWorker> io_worker,
const std::string& connection_id, const IceConfig& ice_config, const std::vector<RtpMap> rtp_mappings,
const std::vector<erizo::ExtMap> ext_mappings, WebRtcConnectionEventListener* listener) :
connection_id_{connection_id},
audio_enabled_{false}, video_enabled_{false}, bundle_{false}, conn_event_listener_{listener},
ice_config_{ice_config}, rtp_mappings_{rtp_mappings}, extension_processor_{ext_mappings},
worker_{worker}, io_worker_{io_worker},
remote_sdp_{std::make_shared<SdpInfo>(rtp_mappings)}, local_sdp_{std::make_shared<SdpInfo>(rtp_mappings)},
audio_muted_{false}, video_muted_{false}, first_remote_sdp_processed_{false}
{
// Logs the STUN server/port and port range taken from ice_config.
ELOG_INFO("%s message: constructor, stunserver: %s, stunPort: %d, minPort: %d, maxPort: %d",
toLog(), ice_config.stun_server.c_str(), ice_config.stun_port, ice_config.min_port, ice_config.max_port);
stats_ = std::make_shared<Stats>();
// distributor_ = std::unique_ptr<BandwidthDistributionAlgorithm>(new TargetVideoBWDistributor());
// Initial state; init() reports this value to the listener.
global_state_ = CONN_INITIAL;
trickle_enabled_ = ice_config_.should_trickle;
slide_show_mode_ = false;
sending_ = true;
}
4.2 WrtcConnection - initWebRtcConnection
webrtc_agent/webrtc/wrtcConnection.js
/*
* Given a WebRtcConnection waits for the state CANDIDATES_GATHERED for set remote SDP.
*/
// wrtc 是Connection,Connection 继承于EventEmitter
/**
 * Subscribe to the 'status_event' emissions of a Connection, translate them
 * into on_status notifications, then start the connection with wrtc.init().
 *
 * Handled event types:
 *  - 'answer':    processes evt.sdp and reports the local SDP as the answer.
 *  - 'candidate': reports the candidate after rewriting configured addresses.
 *  - 'failed':    reports an ICE failure.
 *  - 'ready':     reports that the connection is ready.
 * Other event types are ignored.
 *
 * @param {object} wrtc - the Connection (an EventEmitter) to wire up.
 */
var initWebRtcConnection = function (wrtc) {
  // The listener must be registered before init(), because init() itself
  // emits 'status_event'.
  wrtc.on('status_event', (evt, status) => {
    if (evt.type === 'answer') {
      processAnswer(evt.sdp);
      const message = localSdp.toString();
      log.debug('Answer SDP', message);
      on_status({type: 'answer', sdp: message});
    } else if (evt.type === 'candidate') {
      let message = evt.candidate;
      networkInterfaces.forEach((i) => {
        if (i.ip_address && i.replaced_ip_address) {
          // Replace every occurrence literally. Building a RegExp from the
          // raw address would let each '.' match any character.
          message = message.split(i.ip_address).join(i.replaced_ip_address);
        }
      });
      on_status({type: 'candidate', candidate: message});
    } else if (evt.type === 'failed') {
      log.warn('ICE failed, ', status, wrtc.id);
      on_status({type: 'failed', reason: 'Ice procedure failed.'});
    } else if (evt.type === 'ready') {
      // Connection exposes its identifier as `id` (same as the branch above).
      log.debug('Connection ready, ', wrtc.id);
      on_status({
        type: 'ready'
      });
    }
  });
  // Start the Connection; wrtcId is passed as the stream id.
  wrtc.init(wrtcId);
};
4.2.1 Connection/EventEmitter.on——监听事件
4.2.2 Connection.init——初始化c++的WebRtcConnection
webrtc_agent/webrtc/connection.js
2023-04-26T21:54:18.417 - DEBUG: Connection - message:
Init Connection,
connectionId: b149e44bb10d4e91bd162a8c6806ae7b
{"ipAddresses":[]}
/**
 * Initialise the native WebRtcConnection once and re-emit its status
 * notifications as 'status_event' events.
 *
 * @param streamId - remembered as firstStreamId and used when the
 *   CONN_GATHERED status arrives.
 * @returns {boolean} false when already initialised, otherwise true.
 */
init(streamId) {
// Only the first call does any work.
if (this.initialized) {
return false;
}
const firstStreamId = streamId;
this.initialized = true;
log.debug(`message: Init Connection, connectionId: ${this.id} `+
`${logger.objectToLog(this.options)}`);
this.sessionVersion = 0;
// WebRtcConnection C++ wrapper: calls into native code. The callback below
// receives every status update; its `streamId` parameter shadows the outer one.
this.wrtc.init((newStatus, mess, streamId) => {
log.debug('message: WebRtcConnection status update, ' +
'id: ' + this.id + ', status: ' + newStatus +
', ' + logger.objectToLog(this.metadata) + mess);
switch(newStatus) {
case CONN_INITIAL:
// Emits 'status_event' to the listeners registered with on().
this.emit('status_event', {type: 'started'}, newStatus);
break;
case CONN_SDP_PROCESSED:
this.isProcessingRemoteSdp = false;
// this.latestSdp = mess;
// this._maybeSendAnswer(newStatus, streamId);
break;
case CONN_SDP:
this.latestSdp = mess;
this._maybeSendAnswer(newStatus, streamId);
break;
case CONN_GATHERED:
this.alreadyGathered = true;
this.latestSdp = mess;
// Uses the stream id given to init(), not the callback's streamId.
this._maybeSendAnswer(newStatus, firstStreamId);
break;
case CONN_CANDIDATE:
// Rewrite the candidate using options.privateRegexp / options.publicIP.
mess = mess.replace(this.options.privateRegexp, this.options.publicIP);
this.emit('status_event', {type: 'candidate', candidate: mess}, newStatus);
break;
case CONN_FAILED:
log.warn('message: failed the ICE process, ' + 'code: ' + WARN_BAD_CONNECTION +
', id: ' + this.id);
this.emit('status_event', {type: 'failed', sdp: mess}, newStatus);
break;
case CONN_READY:
log.debug('message: connection ready, ' + 'id: ' + this.id +
', ' + 'status: ' + newStatus + ' ' + mess + ',' + streamId);
// 'ready' is emitted at most once per connection.
if (!this.ready) {
this.ready = true;
this.emit('status_event', {type: 'ready'}, newStatus);
}
break;
}
});
if (this.options.createOffer) {
log.debug('message: create offer requested, id:', this.id);
const audioEnabled = this.options.createOffer.audio;
const videoEnabled = this.options.createOffer.video;
const bundle = this.options.createOffer.bundle;
// WebRtcConnection C++ wrapper: calls into native code.
// Note the argument order: video first, then audio.
this.wrtc.createOffer(videoEnabled, audioEnabled, bundle);
}
// Emits 'status_event' to the listeners registered with on().
this.emit('status_event', {type: 'initializing'});
return true;
}
4.2.3 NAN_METHOD(WebRtcConnection::init)
source/agent/webrtc/rtcConn/WebRtcConnection.cc
// JS-visible init(callback): stores the JS callback (first argument) as the
// event callback, initialises the erizo connection and returns its result.
NAN_METHOD(WebRtcConnection::init) {
  auto* wrapper = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
  // Hold our own reference to the erizo connection for the duration of the call.
  std::shared_ptr<erizo::WebRtcConnection> connection = wrapper->me;

  // The callback has to be in place before init() runs.
  wrapper->eventCallback_ = new Nan::Callback(info[0].As<Function>());

  const bool initialized = connection->init();
  info.GetReturnValue().Set(Nan::New(initialized));
}
4.2.4 erizo::WebRtcConnection::init()
source/agent/webrtc/rtcConn/erizo/src/erizo/WebRtcConnection.cpp
// Reports the current global_state_ to the event listener with an empty
// message. Always returns true.
bool WebRtcConnection::init() {
maybeNotifyWebRtcConnectionEvent(global_state_, "");
return true;
}
将状态通知给 JS 层。
4.2.5 erizo::WebRtcConnection::maybeNotifyWebRtcConnectionEvent
// Forwards an event to the registered connection event listener, if any.
// The listener pointer is checked and used while holding event_listener_mutex_.
void WebRtcConnection::maybeNotifyWebRtcConnectionEvent(const WebRTCEvent& event, const std::string& message,
    const std::string& stream_id) {
  boost::mutex::scoped_lock guard(event_listener_mutex_);
  if (conn_event_listener_) {
    conn_event_listener_->notifyEvent(event, message, stream_id);
  }
}
—> 4.2.2
this.wrtc.init((newStatus, mess, streamId) => {
log.debug('message: WebRtcConnection status update, ' +
'id: ' + this.id + ', status: ' + newStatus +
', ' + logger.objectToLog(this.metadata) + mess);
switch(newStatus) {
case CONN_INITIAL:
this.emit('status_event', {type: 'started'}, newStatus);
break;
....
});
—> 4.2.1
wrtc.on('status_event', (evt, status) => {
...
});
4.2.6 log
2023-04-26T21:54:18.419 - DEBUG: Connection - message:
WebRtcConnection status update,
id: b149e44bb10d4e91bd162a8c6806ae7b,
status: 101,
{}
CONN_INITIAL = 101
4.2.7 status
// Status codes delivered as `newStatus` to the callback passed to
// this.wrtc.init() in Connection.init().
const CONN_INITIAL = 101;
const CONN_STARTED = 102;
const CONN_GATHERED = 103;
const CONN_READY = 104;
const CONN_FINISHED = 105;
const CONN_CANDIDATE = 201;
const CONN_SDP = 202;
const CONN_SDP_PROCESSED = 203;
const CONN_FAILED = 500;
// Code written into the warning logged when the ICE process fails.
const WARN_BAD_CONNECTION = 502;
5. WrtcConnection.addTrackOperation
// option = {mid, type, formatPreference, scalabilityMode}
/**
 * Register a track operation for the given MID.
 *
 * A MID can be mapped to one operation only: when the MID is already in
 * operationMap the existing mapping is kept and a warning is logged.
 *
 * @param {string} operationId - operation that owns the track.
 * @param {string} sdpDirection - direction stored with the operation.
 * @param {object} option - {mid, type, formatPreference, scalabilityMode}.
 * @returns {boolean} true when the operation was added, false otherwise.
 */
that.addTrackOperation = function (operationId, sdpDirection, option) {
  const {mid, type, formatPreference, scalabilityMode} = option;

  if (operationMap.has(mid)) {
    log.warn(`MID ${mid} has mapped operation ${operationMap.get(mid).operationId}`);
    return false;
  }

  log.debug(`MID ${mid} for operation ${operationId} add`);
  // New operations start out enabled.
  const entry = {operationId, type, sdpDirection, formatPreference, enabled: true};
  operationMap.set(mid, entry);
  // scalabilityMode is only stored when one was requested.
  if (scalabilityMode) {
    entry.scalabilityMode = scalabilityMode;
  }
  return true;
};
5.1 WrtcConnection.operationMap
// mid => { operationId, sdpDirection, type, formatPreference, rids, enabled, finalFormat }
// addTrackOperation inserts entries here (and adds scalabilityMode when given);
// rids and finalFormat are not set by the code shown here.
var operationMap = new Map();