Yin的笔记本

vuePress-theme-reco Howard Yin    2021 - 2025
Yin的笔记本 Yin的笔记本

Choose mode

  • dark
  • auto
  • light
Home
Category
  • CNCF
  • Docker
  • namespaces
  • Kubernetes
  • Kubernetes对象
  • Linux
  • MyIdeas
  • Revolution
  • WebRTC
  • 云计算
  • 人工智能
  • 分布式
  • 图像处理
  • 图形学
  • 微服务
  • 数学
  • OJ笔记
  • 博弈论
  • 形式语言与自动机
  • 数据库
  • 服务器运维
  • 编程语言
  • C
  • Git
  • Go
  • Java
  • JavaScript
  • Python
  • Nvidia
  • Shell
  • Tex
  • Rust
  • Vue
  • 视频编解码
  • 计算机网络
  • SDN
  • 论文笔记
  • 讨论
  • 边缘计算
  • 量子信息技术
Tag
TimeLine
About
查看源码
author-avatar

Howard Yin

304

Article

153

Tag

Home
Category
  • CNCF
  • Docker
  • namespaces
  • Kubernetes
  • Kubernetes对象
  • Linux
  • MyIdeas
  • Revolution
  • WebRTC
  • 云计算
  • 人工智能
  • 分布式
  • 图像处理
  • 图形学
  • 微服务
  • 数学
  • OJ笔记
  • 博弈论
  • 形式语言与自动机
  • 数据库
  • 服务器运维
  • 编程语言
  • C
  • Git
  • Go
  • Java
  • JavaScript
  • Python
  • Nvidia
  • Shell
  • Tex
  • Rust
  • Vue
  • 视频编解码
  • 计算机网络
  • SDN
  • 论文笔记
  • 讨论
  • 边缘计算
  • 量子信息技术
Tag
TimeLine
About
查看源码
  • pion/ion-sfuQ&A

    • pion/ion-sfu和ion中的SFU服务之间的区别和联系?
      • 可以控制pion/ion-sfu主动连接其他SFU吗
        • 可以用本地视频文件创建一个没有上行流的SFU服务吗?
          • pion/ion-sfu中是如何处理新增的Track的?
            • pion/ion-sfu中是如何处理关闭track的?
              • pion/ion-sfu中的JoinConfig是如何控制SFU的转发逻辑的?
                • Publisher和Subscriber两个PeerConnection怎么处理Offer和Answer的?
                  • 那两个方向的Offer都是从哪来的?
                • pion/ion-sfu里面是如何AddDownTrack的?
                  • NoAutoSubscribe=true时的AddDownTrack
                  • NoAutoSubscribe=false时的AddDownTrack

              `pion/ion-sfu`Q&A

              vuePress-theme-reco Howard Yin    2021 - 2025

              pion/ion-sfuQ&A


              Howard Yin 2022-05-10 13:48:01 WebRTC编程框架pion实操

              # pion/ion-sfu和ion中的SFU服务之间的区别和联系?

              • ion中的SFU服务是在pion/ion-sfu的基础上添加了GRPC信令传输功能得来的
              • ion中的SFU服务代码主要是传输信令和根据信令调用pion/ion-sfu中的函数

              # 可以控制pion/ion-sfu主动连接其他SFU吗

              • pion/ion-sfu主要为被动接收连接请求设计,所以不能CreateOffer,ion中的SFU服务只有一个信令服务器,想要发起连接只能用pion/ion-go-sdk将本地流推送到SFU服务,而不能控制SFU服务主动向其他SFU发起请求
              • 但pion/ion-sfu中有OnOffer,如果hack一下pion/ion-go-sdk
              • Session相关的代码都在pion/ion-sfu里面,ion中的SFU服务的代码中基本没有操作Session的逻辑

              # 可以用本地视频文件创建一个没有上行流的SFU服务吗?

              # pion/ion-sfu中是如何处理新增的Track的?

              首先,pion/ion-sfu中根据视频流的传输方向抽象出了几种传输控制类:

              • Publisher 和PublisherTrack :处理从外面“Publish”到本SFU的流,即上行流
              • Subscriber 和DownTrack :处理外面“Subscribe”本SFU的流,即下行流

              这两种传输控制类分别有各自的PeerConnection,所以pion/ion-sfu中是没有双向的PeerConnection的,收和发分别由两个PeerConnection控制。两个PeerConnection怎么处理Offer和Answer过程见后文。

              Publisher和Subscriber的初始化函数大体相同,都会创建PeerConnection。而在Publisher的初始化函数比Subscriber的初始化函数多了这么一段代码 :

              	pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
              		Logger.V(1).Info("Peer got remote track id",
              			"peer_id", p.id,
              			"track_id", track.ID(),
              			"mediaSSRC", track.SSRC(),
              			"rid", track.RID(),
              			"stream_id", track.StreamID(),
              		)
              
              		r, pub := p.router.AddReceiver(receiver, track, track.ID(), track.StreamID())
              		if pub {
              			p.session.Publish(p.router, r)
              			p.mu.Lock()
              			publisherTrack := PublisherTrack{track, r, true}
              			p.tracks = append(p.tracks, publisherTrack)
              			for _, rp := range p.relayPeers {
              				if err = p.createRelayTrack(track, r, rp.peer); err != nil {
              					Logger.V(1).Error(err, "Creating relay track.", "peer_id", p.id)
              				}
              			}
              			p.mu.Unlock()
              			if handler, ok := p.onPublisherTrack.Load().(func(PublisherTrack)); ok && handler != nil {
              				handler(publisherTrack)
              			}
              		} else {
              			p.mu.Lock()
              			p.tracks = append(p.tracks, PublisherTrack{track, r, false})
              			p.mu.Unlock()
              		}
              	})
              
              	pc.OnDataChannel(func(dc *webrtc.DataChannel) {
              		if dc.Label() == APIChannelLabel {
              			// terminate api data channel
              			return
              		}
              		p.session.AddDatachannel(id, dc)
              	})
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19
              20
              21
              22
              23
              24
              25
              26
              27
              28
              29
              30
              31
              32
              33
              34
              35
              36
              37
              38

              可以看到,这段代码分别注册了OnTrack和OnDataChannel两个函数,在对面有新Track和新DataChannel加进来的时候执行操作,很明显最核心的就是这个p.session.Publish(p.router, r)和p.session.AddDatachannel(id, dc)。把这两个函数打开看看:

              首先是Publish :

              func (s *SessionLocal) Publish(router Router, r Receiver) {
              	for _, p := range s.Peers() {
              		// Don't sub to self
              		if router.ID() == p.ID() || p.Subscriber() == nil {
              			continue
              		}
              
              		Logger.V(0).Info("Publishing track to peer", "peer_id", p.ID())
              
              		if err := router.AddDownTracks(p.Subscriber(), r); err != nil {
              			Logger.Error(err, "Error subscribing transport to Router")
              			continue
              		}
              	}
              }
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15

              太明显了,这就是一个循环把Track加进这个Session的所有Peer的Subscriber的DownTracks里面。

              然后是AddDatachannel :

              func (s *SessionLocal) AddDatachannel(owner string, dc *webrtc.DataChannel) {
              	label := dc.Label()
              
              	s.mu.Lock()
              	for _, lbl := range s.fanOutDCs {
              		if label == lbl {
              			s.mu.Unlock()
              			return
              		}
              	}
              	s.fanOutDCs = append(s.fanOutDCs, label)
              	peerOwner := s.peers[owner]
              	s.mu.Unlock()
              	peers := s.Peers()
              	peerOwner.Subscriber().RegisterDatachannel(label, dc)
              
              	dc.OnMessage(func(msg webrtc.DataChannelMessage) {
              		s.FanOutMessage(owner, label, msg)
              	})
              
              	for _, p := range peers {
              		peer := p
              		if peer.ID() == owner || peer.Subscriber() == nil {
              			continue
              		}
              		ndc, err := peer.Subscriber().AddDataChannel(label)
              
              		if err != nil {
              			Logger.Error(err, "error adding datachannel")
              			continue
              		}
              
              		if peer.Publisher() != nil && peer.Publisher().Relayed() {
              			peer.Publisher().AddRelayFanOutDataChannel(label)
              		}
              
              		pid := peer.ID()
              		ndc.OnMessage(func(msg webrtc.DataChannelMessage) {
              			s.FanOutMessage(pid, label, msg)
              
              			if peer.Publisher().Relayed() {
              				for _, rdc := range peer.Publisher().GetRelayedDataChannels(label) {
              					if msg.IsString {
              						if err = rdc.SendText(string(msg.Data)); err != nil {
              							Logger.Error(err, "Sending dc message err")
              						}
              					} else {
              						if err = rdc.Send(msg.Data); err != nil {
              							Logger.Error(err, "Sending dc message err")
              						}
              					}
              				}
              			}
              		})
              
              		peer.Subscriber().negotiate()
              	}
              }
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19
              20
              21
              22
              23
              24
              25
              26
              27
              28
              29
              30
              31
              32
              33
              34
              35
              36
              37
              38
              39
              40
              41
              42
              43
              44
              45
              46
              47
              48
              49
              50
              51
              52
              53
              54
              55
              56
              57
              58

              一看就是在搞消息转发,就不细看了

              # pion/ion-sfu中是如何处理关闭track的?

              相关操作在AddDownTrack的时候就已经通过OnCloseHandler定好了

              	downTrack.OnCloseHandler(func() {
              		if sub.pc.ConnectionState() != webrtc.PeerConnectionStateClosed {
              			if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {
              				if err == webrtc.ErrConnectionClosed {
              					return
              				}
              				Logger.Error(err, "Error closing down track")
              			} else {
              				sub.RemoveDownTrack(recv.StreamID(), downTrack)
              				sub.negotiate()
              			}
              		}
              	})
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13

              一个Publisher里过来的Track可能会通过AddDownTrack加到很多个Subscriber里,当Publish侧的SDK通过UnPublish函数关闭了一个流,Publisher里会有流关闭,进而触发所有这些Subscriber里的OnCloseHandler函数,从而达到删除流的目的。

              # pion/ion-sfu中的JoinConfig是如何控制SFU的转发逻辑的?

              JoinConfig 长这样:

              // JoinConfig allow adding more control to the peers joining a SessionLocal.
              type JoinConfig struct {
              	// If true the peer will not be allowed to publish tracks to SessionLocal.
              	NoPublish bool
              	// If true the peer will not be allowed to subscribe to other peers in SessionLocal.
              	NoSubscribe bool
              	// If true the peer will not automatically subscribe all tracks,
              	// and then the peer can use peer.Subscriber().AddDownTrack/RemoveDownTrack
              	// to customize the subscrbe stream combination as needed.
              	// this parameter depends on NoSubscribe=false.
              	NoAutoSubscribe bool
              }
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12

              首先,在Peer的初始化过程 中有NoSubscribe和NoPublish发挥作用:

              	if !conf.NoSubscribe {
              		p.subscriber, err = NewSubscriber(uid, cfg)
              		if err != nil {
              			return fmt.Errorf("error creating transport: %v", err)
              		}
              
              		p.subscriber.noAutoSubscribe = conf.NoAutoSubscribe
              
              		p.subscriber.OnNegotiationNeeded(func() {
              			p.Lock()
              			defer p.Unlock()
              
              			if p.remoteAnswerPending {
              				p.negotiationPending = true
              				return
              			}
              
              			Logger.V(1).Info("Negotiation needed", "peer_id", p.id)
              			offer, err := p.subscriber.CreateOffer()
              			if err != nil {
              				Logger.Error(err, "CreateOffer error")
              				return
              			}
              
              			p.remoteAnswerPending = true
              			if p.OnOffer != nil && !p.closed.get() {
              				Logger.V(0).Info("Send offer", "peer_id", p.id)
              				p.OnOffer(&offer)
              			}
              		})
              
              		p.subscriber.OnICECandidate(func(c *webrtc.ICECandidate) {
              			Logger.V(1).Info("On subscriber ice candidate called for peer", "peer_id", p.id)
              			if c == nil {
              				return
              			}
              
              			if p.OnIceCandidate != nil && !p.closed.get() {
              				json := c.ToJSON()
              				p.OnIceCandidate(&json, subscriber)
              			}
              		})
              	}
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19
              20
              21
              22
              23
              24
              25
              26
              27
              28
              29
              30
              31
              32
              33
              34
              35
              36
              37
              38
              39
              40
              41
              42
              43
              	if !conf.NoSubscribe {
              		p.session.Subscribe(p)
              	}
              
              1
              2
              3

              显然,这NoSubscribe在生成PeerLocal时控制Subscriber的初始化,如果NoSubscribe=true就不会有Subscriber生成。从而也就没法AddDownTrack向外传出Track。

              	if !conf.NoPublish {
              		p.publisher, err = NewPublisher(uid, p.session, &cfg)
              		if err != nil {
              			return fmt.Errorf("error creating transport: %v", err)
              		}
              		if !conf.NoSubscribe {
              			for _, dc := range p.session.GetDCMiddlewares() {
              				if err := p.subscriber.AddDatachannel(p, dc); err != nil {
              					return fmt.Errorf("setting subscriber default dc datachannel: %w", err)
              				}
              			}
              		}
              
              		p.publisher.OnICECandidate(func(c *webrtc.ICECandidate) {
              			Logger.V(1).Info("on publisher ice candidate called for peer", "peer_id", p.id)
              			if c == nil {
              				return
              			}
              
              			if p.OnIceCandidate != nil && !p.closed.get() {
              				json := c.ToJSON()
              				p.OnIceCandidate(&json, publisher)
              			}
              		})
              
              		p.publisher.OnICEConnectionStateChange(func(s webrtc.ICEConnectionState) {
              			if p.OnICEConnectionStateChange != nil && !p.closed.get() {
              				p.OnICEConnectionStateChange(s)
              			}
              		})
              	}
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19
              20
              21
              22
              23
              24
              25
              26
              27
              28
              29
              30
              31

              显然,这NoPublish在生成PeerLocal时控制Publisher的初始化,如果NoPublish=true就不会有Publisher生成。根据上一节的分析,PublisherTrack增减相关的操作主要就是在Publisher的初始化过程中执行的,没有了Publisher也就不会有对传入的PublisherTrack的那些操作了,从而也就不会接收传入的Track。

              此外,我们发现NoAutoSubscribe被赋值给了p.subscriber.noAutoSubscribe这个值主要在AddDownTracks 的里面发挥作用:

              func (r *router) AddDownTracks(s *Subscriber, recv Receiver) error {
              	r.Lock()
              	defer r.Unlock()
              
              	if s.noAutoSubscribe {
              		Logger.Info("peer turns off automatic subscription, skip tracks add")
              		return nil
              	}
              
              	if recv != nil {
              		if _, err := r.AddDownTrack(s, recv); err != nil {
              			return err
              		}
              		s.negotiate()
              		return nil
              	}
              
              	if len(r.receivers) > 0 {
              		for _, rcv := range r.receivers {
              			if _, err := r.AddDownTrack(s, rcv); err != nil {
              				return err
              			}
              		}
              		s.negotiate()
              	}
              	return nil
              }
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19
              20
              21
              22
              23
              24
              25
              26
              27

              所以,当调用Publish 的时候,NoAutoSubscribe=true的router不会被调用AddDownTrack。根据前面对Publisher的初始化函数 的分析,Publisher有新Track到达的时候会对所有Session里的Peer调用Publish,所以NoAutoSubscribe=true不调用AddDownTrack就意味着新Track到达的时候这个Peer没法AddDownTrack,所以达到了“No Auto Subscribe”的目的。

              # Publisher和Subscriber两个PeerConnection怎么处理Offer和Answer的?

              从SDK的代码 上看,信令的传输也会被分类为两种。在SDK侧,接收到的所有Offer都交给negotiate 函数处理,接收到的所有Answer都交给setRemoteSDP 函数处理:

              			var sdpType webrtc.SDPType
              			if payload.Description.Type == "offer" {
              				sdpType = webrtc.SDPTypeOffer
              			} else {
              				sdpType = webrtc.SDPTypeAnswer
              			}
              			sdp := webrtc.SessionDescription{
              				SDP:  payload.Description.Sdp,
              				Type: sdpType,
              			}
              			if sdp.Type == webrtc.SDPTypeOffer {
              				log.Infof("[%v] [description] got offer call s.OnNegotiate sdp=%+v", r.uid, sdp)
              				err := r.negotiate(sdp)
              				if err != nil {
              					log.Errorf("error: %v", err)
              				}
              			} else if sdp.Type == webrtc.SDPTypeAnswer {
              				log.Infof("[%v] [description] got answer call sdp=%+v", r.uid, sdp)
              				err = r.setRemoteSDP(sdp)
              				if err != nil {
              					log.Errorf("[%v] [description] setRemoteSDP err=%s", r.uid, err)
              				}
              			}
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19
              20
              21
              22
              23

              并且可以看到negotiate 函数里基本上都是对Subscribe方向的操作、setRemoteSDP 函数里基本上都是对Publish方向的操作。

              所以,所有从SFU到SDK的流(即“Subscribe”)都是SFU向SDK发Offer、SDK向SFU回Answer;所有从SDK到SFU的流(即“Publish”)都是SDK向SFU发Offer、SFU向SDK回Answer。

              所以:

              • 如果在SFU那边收到了Offer,那必然是“Publish”流里的,应该给Publisher里的PeerConnection用,并且让Publisher里的PeerConnection回复一个Answer。代码位于这里 。
              • 如果在SFU那边收到了Answer,那必然是“Subscribe”流里的,应该给Subscriber里的PeerConnection用。代码位于这里 。
              • 如果在SDK这边收到了Offer,那必然是“Subscribe”流里的,应该给Subscribe方向的PeerConnection用,并且让Subscribe方向的PeerConnection回复一个Answer。代码就是上面介绍的negotiate 。
              • 如果在SDK这边收到了Answer,那必然是“Publish”流里的,应该给Publish方向的PeerConnection用。代码就是上面介绍的setRemoteSDP 。

              # 那两个方向的Offer都是从哪来的?

              “Publish”流的Offer是SDK在Join函数 里发出的:

              	offer, err := r.pub.pc.CreateOffer(nil)
              	if err != nil {
              		return err
              	}
              
              	err = r.pub.pc.SetLocalDescription(offer)
              	if err != nil {
              		return err
              	}
              
              	if len(config) > 0 {
              		err = r.SendJoin(sid, r.uid, offer, *config[0])
              	} else {
              		err = r.SendJoin(sid, r.uid, offer, nil)
              	}
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15

              这里的SendJoin就是将SDP打包在rtc.Request_Join里发出。

              “Subscribe”流的Offer是在SFU处理上面这SDK发的rtc.Request_Join请求时通过设置OnOffer 发出的:

              			// Notify user of new offer
              			peer.OnOffer = func(o *webrtc.SessionDescription) {
              				log.Debugf("[S=>C] peer.OnOffer: %v", o.SDP)
              				err = sig.Send(&rtc.Reply{
              					Payload: &rtc.Reply_Description{
              						Description: &rtc.SessionDescription{
              							Target: rtc.Target(rtc.Target_SUBSCRIBER),
              							Sdp:    o.SDP,
              							Type:   o.Type.String(),
              						},
              					},
              				})
              				if err != nil {
              					log.Errorf("negotiation error: %v", err)
              				}
              			}
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16

              很明显,不用多讲。

              进一步,SDK接收“Subscribe”流和SFU接收“Publish”流用的都是OnTrack,SFU里的操作前面已经介绍了,SDK里的OnTrack在这:

              # pion/ion-sfu里面是如何AddDownTrack的?

              # NoAutoSubscribe=true时的AddDownTrack

              从SFU里的Request_Subscription处理函数 这里开始解析:

              					for _, p := range peer.Session().Peers() {
              						if p.ID() != peer.ID() {
              							for _, track := range p.Publisher().PublisherTracks() {
              								if track.Receiver.TrackID() == trackInfo.TrackId && track.Track.RID() == trackInfo.Layer {
              									log.Infof("Add RemoteTrack: %v to peer %v %v %v", trackInfo.TrackId, peer.ID(), track.Track.Kind(), track.Track.RID())
              									dt, err := peer.Publisher().GetRouter().AddDownTrack(peer.Subscriber(), track.Receiver)
              									if err != nil {
              										log.Errorf("AddDownTrack error: %v", err)
              									}
              
              1
              2
              3
              4
              5
              6
              7
              8
              9

              首先很显然这里的peer.Publisher().GetRouter().AddDownTrack(peer.Subscriber(), track.Receiver)就是把别人的接收到Track的track.Receiver加进自己的发送器peer.Subscriber()里。

              这个函数长这样 :

              func (r *router) AddDownTrack(sub *Subscriber, recv Receiver) (*DownTrack, error) {
              	for _, dt := range sub.GetDownTracks(recv.StreamID()) {
              		if dt.ID() == recv.TrackID() {
              			return dt, nil
              		}
              	}
              
              	codec := recv.Codec()
              	if err := sub.me.RegisterCodec(codec, recv.Kind()); err != nil {
              		return nil, err
              	}
              
              	downTrack, err := NewDownTrack(webrtc.RTPCodecCapability{
              		MimeType:     codec.MimeType,
              		ClockRate:    codec.ClockRate,
              		Channels:     codec.Channels,
              		SDPFmtpLine:  codec.SDPFmtpLine,
              		RTCPFeedback: []webrtc.RTCPFeedback{{"goog-remb", ""}, {"nack", ""}, {"nack", "pli"}},
              	}, recv, r.bufferFactory, sub.id, r.config.MaxPacketTrack)
              	if err != nil {
              		return nil, err
              	}
              	// Create webrtc sender for the peer we are sending track to
              	if downTrack.transceiver, err = sub.pc.AddTransceiverFromTrack(downTrack, webrtc.RTPTransceiverInit{
              		Direction: webrtc.RTPTransceiverDirectionSendonly,
              	}); err != nil {
              		return nil, err
              	}
              
              	// nolint:scopelint
              	downTrack.OnCloseHandler(func() {
              		if sub.pc.ConnectionState() != webrtc.PeerConnectionStateClosed {
              			if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {
              				if err == webrtc.ErrConnectionClosed {
              					return
              				}
              				Logger.Error(err, "Error closing down track")
              			} else {
              				sub.RemoveDownTrack(recv.StreamID(), downTrack)
              				sub.negotiate()
              			}
              		}
              	})
              
              	downTrack.OnBind(func() {
              		go sub.sendStreamDownTracksReports(recv.StreamID())
              	})
              
              	sub.AddDownTrack(recv.StreamID(), downTrack)
              	recv.AddDownTrack(downTrack, r.config.Simulcast.BestQualityFirst)
              	return downTrack, nil
              }
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19
              20
              21
              22
              23
              24
              25
              26
              27
              28
              29
              30
              31
              32
              33
              34
              35
              36
              37
              38
              39
              40
              41
              42
              43
              44
              45
              46
              47
              48
              49
              50
              51
              52

              这里的NewDownTrack生成的downTrack是一个继承了TrackLocal的类,可以看到被AddTransceiverFromTrack加进PeerConnection里了,并且在最后用sub.AddDownTrack和recv.AddDownTrack加进Subscriber和Receiver里了。

              这两个AddDownTrack都是简单的用变量记录DownTrack:

              func (s *Subscriber) AddDownTrack(streamID string, downTrack *DownTrack) {
              	s.Lock()
              	defer s.Unlock()
              	if dt, ok := s.tracks[streamID]; ok {
              		dt = append(dt, downTrack)
              		s.tracks[streamID] = dt
              	} else {
              		s.tracks[streamID] = []*DownTrack{downTrack}
              	}
              }
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) {
              
              	...
              
              	w.Lock()
              	w.storeDownTrack(layer, track)
              	w.Unlock()
              }
              
              ...
              
              
              func (w *WebRTCReceiver) storeDownTrack(layer int, dt *DownTrack) {
              	dts := w.downTracks[layer].Load().([]*DownTrack)
              	ndts := make([]*DownTrack, len(dts)+1)
              	copy(ndts, dts)
              	ndts[len(ndts)-1] = dt
              	w.downTracks[layer].Store(ndts)
              }
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19

              那Receiver里收到的东西到底是怎么通过这个DownTrack进的Subscriber? 在Receiver的AddUpTrack 里可以看见:

              func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer, bestQualityFirst bool) {
              	if w.closed.get() {
              		return
              	}
              
              	var layer int
              	switch track.RID() {
              	case fullResolution:
              		layer = 2
              	case halfResolution:
              		layer = 1
              	default:
              		layer = 0
              	}
              
              	w.Lock()
              	w.upTracks[layer] = track
              	w.buffers[layer] = buff
              	w.available[layer].set(true)
              	w.downTracks[layer].Store(make([]*DownTrack, 0, 10))
              	w.pendingTracks[layer] = make([]*DownTrack, 0, 10)
              	w.Unlock()
              
              	subBestQuality := func(targetLayer int) {
              		for l := 0; l < targetLayer; l++ {
              			dts := w.downTracks[l].Load()
              			if dts == nil {
              				continue
              			}
              			for _, dt := range dts.([]*DownTrack) {
              				_ = dt.SwitchSpatialLayer(int32(targetLayer), false)
              			}
              		}
              	}
              
              	subLowestQuality := func(targetLayer int) {
              		for l := 2; l != targetLayer; l-- {
              			dts := w.downTracks[l].Load()
              			if dts == nil {
              				continue
              			}
              			for _, dt := range dts.([]*DownTrack) {
              				_ = dt.SwitchSpatialLayer(int32(targetLayer), false)
              			}
              		}
              	}
              
              	if w.isSimulcast {
              		if bestQualityFirst && (!w.available[2].get() || layer == 2) {
              			subBestQuality(layer)
              		} else if !bestQualityFirst && (!w.available[0].get() || layer == 0) {
              			subLowestQuality(layer)
              		}
              	}
              	go w.writeRTP(layer)
              }
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19
              20
              21
              22
              23
              24
              25
              26
              27
              28
              29
              30
              31
              32
              33
              34
              35
              36
              37
              38
              39
              40
              41
              42
              43
              44
              45
              46
              47
              48
              49
              50
              51
              52
              53
              54
              55
              56

              这个函数看样子只一个只能调用一次的函数,它最后创建了一个go程w.writeRTP,在这个函数 里面,我们终于看到了包转发WriteRTP的过程:

              func (w *WebRTCReceiver) writeRTP(layer int) {
              	defer func() {
              		w.closeOnce.Do(func() {
              			w.closed.set(true)
              			w.closeTracks()
              		})
              	}()
              
              	pli := []rtcp.Packet{
              		&rtcp.PictureLossIndication{SenderSSRC: rand.Uint32(), MediaSSRC: w.SSRC(layer)},
              	}
              
              	for {
              		pkt, err := w.buffers[layer].ReadExtended()
              		if err == io.EOF {
              			return
              		}
              
              		if w.isSimulcast {
              			if w.pending[layer].get() {
              				if pkt.KeyFrame {
              					w.Lock()
              					for idx, dt := range w.pendingTracks[layer] {
              						w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.id)
              						w.storeDownTrack(layer, dt)
              						dt.SwitchSpatialLayerDone(int32(layer))
              						w.pendingTracks[layer][idx] = nil
              					}
              					w.pendingTracks[layer] = w.pendingTracks[layer][:0]
              					w.pending[layer].set(false)
              					w.Unlock()
              				} else {
              					w.SendRTCP(pli)
              				}
              			}
              		}
              
              		for _, dt := range w.downTracks[layer].Load().([]*DownTrack) {
              			if err = dt.WriteRTP(pkt, layer); err != nil {
              				if err == io.EOF && err == io.ErrClosedPipe {
              					w.Lock()
              					w.deleteDownTrack(layer, dt.id)
              					w.Unlock()
              				}
              				Logger.Error(err, "Error writing to down track", "id", dt.id)
              			}
              		}
              	}
              
              }
              
              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19
              20
              21
              22
              23
              24
              25
              26
              27
              28
              29
              30
              31
              32
              33
              34
              35
              36
              37
              38
              39
              40
              41
              42
              43
              44
              45
              46
              47
              48
              49
              50

              所以就是把包给所有DownTrack都写一份。

              所以这Receiver的结构也很清楚了,就是一个UpTrack和多个DownTrack,设置好UpTrack后开go程不断把包从UpTrack复制几份写进所有的DownTrack里面。

              所以综上所述,可以看出真正的包转发操作全是在Publisher相关代码里完成的,Subscriber实际上只起一个记录的作用。

              # NoAutoSubscribe=false时的AddDownTrack

              前面已经介绍过在Publisher的初始化函数比Subscriber的初始化函数多的这么一段代码 ,可以看到最后也是回到Router中的AddDownTrack函数 里。

              帮助我们改善此页面!
              创建于: 2022-04-27 10:52:21

              更新于: 2022-05-10 13:48:13