Yin的笔记本

vuePress-theme-reco Howard Yin    2021 - 2025
Yin的笔记本 Yin的笔记本

Choose mode

  • dark
  • auto
  • light
Home
Category
  • CNCF
  • Docker
  • namespaces
  • Kubernetes
  • Kubernetes对象
  • Linux
  • MyIdeas
  • Revolution
  • WebRTC
  • 云计算
  • 人工智能
  • 分布式
  • 图像处理
  • 图形学
  • 微服务
  • 数学
  • OJ笔记
  • 博弈论
  • 形式语言与自动机
  • 数据库
  • 服务器运维
  • 编程语言
  • C
  • Git
  • Go
  • Java
  • JavaScript
  • Python
  • Nvidia
  • Rust
  • Tex
  • Shell
  • Vue
  • 视频编解码
  • 计算机网络
  • SDN
  • 论文笔记
  • 讨论
  • 边缘计算
  • 量子信息技术
Tag
TimeLine
About
查看源码
author-avatar

Howard Yin

303

Article

153

Tag

Home
Category
  • CNCF
  • Docker
  • namespaces
  • Kubernetes
  • Kubernetes对象
  • Linux
  • MyIdeas
  • Revolution
  • WebRTC
  • 云计算
  • 人工智能
  • 分布式
  • 图像处理
  • 图形学
  • 微服务
  • 数学
  • OJ笔记
  • 博弈论
  • 形式语言与自动机
  • 数据库
  • 服务器运维
  • 编程语言
  • C
  • Git
  • Go
  • Java
  • JavaScript
  • Python
  • Nvidia
  • Rust
  • Tex
  • Shell
  • Vue
  • 视频编解码
  • 计算机网络
  • SDN
  • 论文笔记
  • 讨论
  • 边缘计算
  • 量子信息技术
Tag
TimeLine
About
查看源码
  • 用实例学习pion - rtp-forwarder

    • 配置MediaEngine
      • 配置interceptor.Registry
        • 构造WebRTC标准API
          • 构造PeerConnection
            • 配置并启动UDP连接
              • 输入流处理函数
                • 连接状态变化时的处理函数
                  • 启动PeerConnection

                  用实例学习pion - [`rtp-forwarder`](https://github.com/pion/webrtc/blob/master/examples/rtp-forwarder/main.go#L24)

                  vuePress-theme-reco Howard Yin    2021 - 2025

                  用实例学习pion - rtp-forwarder


                  Howard Yin 2021-09-02 13:33:03 WebRTC编程框架pion实操

                  # 用实例学习pion - rtp-forwarder

                  # 配置MediaEngine

                  // Everything below is the Pion WebRTC API! Thanks for using it ❤️.
                  
                  // Create a MediaEngine object to configure the supported codec
                  m := &webrtc.MediaEngine{}
                  
                  1
                  2
                  3
                  4

                  A MediaEngine defines the codecs supported by a PeerConnection, and the configuration of those codecs.

                  A MediaEngine must not be shared between PeerConnections.

                  这里的MediaEngine是存放编码器和编码器设置的类。用于定义WebRTC可以接收什么编码的流。

                  // Setup the codecs you want to use.
                  // We'll use a VP8 and Opus but you can also define your own
                  if err := m.RegisterCodec(webrtc.RTPCodecParameters{
                      RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil},
                  }, webrtc.RTPCodecTypeVideo); err != nil {
                      panic(err)
                  }
                  if err := m.RegisterCodec(webrtc.RTPCodecParameters{
                      RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil},
                  }, webrtc.RTPCodecTypeAudio); err != nil {
                      panic(err)
                  }
                  
                  1
                  2
                  3
                  4
                  5
                  6
                  7
                  8
                  9
                  10
                  11
                  12

                  RegisterCodec是在MediaEngine里添加编码器和编码器设置。 webrtc.RTPCodecParameters就是这样的编码器设置类。 这里指定了一个MimeType为VP8的视频解码器和MimeType为Opus的音频解码器,还有时钟频率之类的相关设置。 webrtc.RTPCodecTypeVideo和webrtc.RTPCodecTypeAudio就是用来指明所添加的编码器设置是针对视频还是音频。

                  # 配置interceptor.Registry

                  // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
                  // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
                  // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
                  // for each PeerConnection.
                  i := &interceptor.Registry{}
                  
                  1
                  2
                  3
                  4
                  5

                  interceptor 是pion的RTP引擎。

                  interceptor.Registry里面就是一个Interceptor类的列表,在运行之前它会将Interceptor一个个地串起来。 Interceptor类的逻辑类似于处理流数据的中间件。在运行时,rtp/rtcp数据包按顺序经过interceptor.Registry串起来的每个Interceptor,接受其处理;Interceptor可以修改这些数据包,也可以生成新的数据包。比如下面会看到的默认设置里的处理NACK和发送方/接收方报告的Interceptor就是统计每个数据包的信息,然后生成NACK和发送方/接收方报告数据包的Interceptor。

                  // Use the default set of Interceptors
                  if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
                      panic(err)
                  }
                  
                  1
                  2
                  3
                  4

                  webrtc.RegisterDefaultInterceptors会注册一些默认的Interceptor。

                  从webrtc.RegisterDefaultInterceptors里面的代码上看,它帮你注册了处理NACK和发送方/接收方报告的Interceptor。

                  # 构造WebRTC标准API

                  // Create the API object with the MediaEngine
                  api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i))
                  
                  1
                  2

                  webrtc.NewAPI用于创建完整的WebRTC设置,包括Interceptor、MediaEngine和SettingEngine。其中Interceptor0和MediaEngine已经在上面讲过了;SettingEngine用于设置那些不在WebRTC标准里的设置项。

                  从代码上看 webrtc.NewAPI的输入参数全是修改webrtc.API类的函数,webrtc.WithMediaEngine和webrtc.WithInterceptorRegistry是就是将我们的设置项转化为这种函数的函数,当然,还有webrtc.WithSettingEngine

                  // Prepare the configuration
                  config := webrtc.Configuration{
                      ICEServers: []webrtc.ICEServer{
                          {
                              URLs: []string{"stun:stun.l.google.com:19302"},
                          },
                      },
                  }
                  
                  1
                  2
                  3
                  4
                  5
                  6
                  7
                  8

                  # 构造PeerConnection

                  webrtc.Configuration用于配置标准WebRTC API里的PeerConnection。

                  // Create a new RTCPeerConnection
                  peerConnection, err := api.NewPeerConnection(config)
                  if err != nil {
                      panic(err)
                  }
                  defer func() {
                      if cErr := peerConnection.Close(); cErr != nil {
                          fmt.Printf("cannot close peerConnection: %v\n", cErr)
                      }
                  }()
                  
                  1
                  2
                  3
                  4
                  5
                  6
                  7
                  8
                  9
                  10

                  注意这里的api.NewPeerConnection和webrtc.NewPeerConnection的区别:api.NewPeerConnection是按照我们定义的webrtc.API类生成PeerConnection;webrtc.NewPeerConnection相当于是默认PeerConnection配置,是用webrtc.RegisterDefaultCodecs和webrtc.RegisterDefaultInterceptors生成默认webrtc.API类再调用api.NewPeerConnection生成PeerConnection。

                  可以看出,本实例的PeerConnection与webrtc.NewPeerConnection生成的默认PeerConnection区别只在MediaEngine上,本例使用的MediaEngine比默认配置少,默认配置里添加了所有已实现的编码器。

                  // Allow us to receive 1 audio track, and 1 video track
                  if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil {
                      panic(err)
                  } else if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil {
                      panic(err)
                  }
                  
                  1
                  2
                  3
                  4
                  5
                  6

                  配置PeerConnection接受一路视频和一路音频。

                  # 配置并启动UDP连接

                  // Create a local addr
                  var laddr *net.UDPAddr
                  if laddr, err = net.ResolveUDPAddr("udp", "127.0.0.1:"); err != nil {
                      panic(err)
                  }
                  
                  // Prepare udp conns
                  // Also update incoming packets with expected PayloadType, the browser may use
                  // a different value. We have to modify so our stream matches what rtp-forwarder.sdp expects
                  udpConns := map[string]*udpConn{
                      "audio": {port: 4000, payloadType: 111},
                      "video": {port: 4002, payloadType: 96},
                  }
                  
                  1
                  2
                  3
                  4
                  5
                  6
                  7
                  8
                  9
                  10
                  11
                  12
                  13

                  创建两个UDP地址,RTP流就是从这个地址里面来的,这里一看就明白,127.0.0.1:4000是音频地址,127.0.0.1:4002是视频地址。具体是从里面收数据还是往里发数据还不知道。

                  这个udpConn是在本示例的开头定义:

                  type udpConn struct {
                  	conn        *net.UDPConn
                  	port        int
                  	payloadType uint8
                  }
                  
                  1
                  2
                  3
                  4
                  5

                  它包含port和payloadType两个数字和一个net.UDPConn指针。下面会看到它的用处。

                  for _, c := range udpConns {
                      // Create remote addr
                      var raddr *net.UDPAddr
                      if raddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("127.0.0.1:%d", c.port)); err != nil {
                          panic(err)
                      }
                  
                      // Dial udp
                      if c.conn, err = net.DialUDP("udp", laddr, raddr); err != nil {
                          panic(err)
                      }
                      defer func(conn net.PacketConn) {
                          if closeErr := conn.Close(); closeErr != nil {
                              panic(closeErr)
                          }
                      }(c.conn)
                  }
                  
                  1
                  2
                  3
                  4
                  5
                  6
                  7
                  8
                  9
                  10
                  11
                  12
                  13
                  14
                  15
                  16
                  17

                  重点在c.conn, err = net.DialUDP("udp", laddr, raddr),这里用net.DialUDP向udpConn指定的两个端口创建了两个UDP连接,并且把它们放进对应的udpConn.conn里;后面的defer指定了退出时关闭连接操作。

                  # 输入流处理函数

                  // Set a handler for when a new remote track starts, this handler will forward data to
                  // our UDP listeners.
                  // In your application this is where you would handle/process audio/video
                  peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
                  
                  1
                  2
                  3
                  4

                  OnTrack用于指定被呼叫时的处理函数,处理函数包含两个参数(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver):

                  • track *webrtc.TrackRemote表示来自对方的track信息
                  • receiver *webrtc.RTPReceiver表示本地的接收器的信息

                  接下来是输入流处理函数的正文

                      // Retrieve udp connection
                      c, ok := udpConns[track.Kind().String()]
                      if !ok {
                          return
                      }
                  
                  1
                  2
                  3
                  4
                  5

                  这里按照对方的track类型名称取出先前生成的UDP连接。

                      // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
                      go func() {
                          ticker := time.NewTicker(time.Second * 2)
                          for range ticker.C {
                              if rtcpErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}); rtcpErr != nil {
                                  fmt.Println(rtcpErr)
                              }
                          }
                      }()
                  
                  1
                  2
                  3
                  4
                  5
                  6
                  7
                  8
                  9

                  这里用了一个协程每两秒向PeerConnection发一次PLI请求rtcp.PictureLossIndication,从而让发送方知道这边的接收情况。 PLI是一种“关键帧请求”,类似的还有SLI/PLI/FIR,作用是在关键帧丢失无法解码时,请求发送方重新生成并发送一个关键帧。相关知识见《WebRTC 视频通信中的错误恢复机制》。

                      b := make([]byte, 1500)
                  
                  1

                  创建一个缓冲区

                      rtpPacket := &rtp.Packet{}
                      for {
                  
                  1
                  2

                  for循环处理对方track发来的数据

                          // Read
                          n, _, readErr := track.Read(b)
                          if readErr != nil {
                              panic(readErr)
                          }
                  
                  1
                  2
                  3
                  4
                  5

                  先把数据读进缓冲区

                          // Unmarshal the packet and update the PayloadType
                          if err = rtpPacket.Unmarshal(b[:n]); err != nil {
                              panic(err)
                          }
                  
                  1
                  2
                  3
                  4

                  然后将缓冲区里的数据解码为rtp.Packet

                          rtpPacket.PayloadType = c.payloadType
                  
                  1

                  修改一下rtp.Packet里的PayloadType

                          // Marshal into original buffer with updated PayloadType
                          if n, err = rtpPacket.MarshalTo(b); err != nil {
                              panic(err)
                          }
                  
                  1
                  2
                  3
                  4

                  再编码放回去缓冲区

                  这一段的最终目的就只是改一下缓冲区里的PayloadType

                          // Write
                          if _, err = c.conn.Write(b[:n]); err != nil {
                              // For this particular example, third party applications usually timeout after a short
                              // amount of time during which the user doesn't have enough time to provide the answer
                              // to the browser.
                              // That's why, for this particular example, the user first needs to provide the answer
                              // to the browser then open the third party application. Therefore we must not kill
                              // the forward on "connection refused" errors
                              if opError, ok := err.(*net.OpError); ok && opError.Err.Error() == "write: connection refused" {
                                  continue
                              }
                              panic(err)
                          }
                  
                  1
                  2
                  3
                  4
                  5
                  6
                  7
                  8
                  9
                  10
                  11
                  12
                  13

                  最后把缓冲区数据写到UDP连接里

                      }
                  })
                  
                  1
                  2

                  结束。

                  可以看出,上面这个track不断从PeerConnection里读数据然后写进UDP连接里,说明这个示例是将一个WebRTC流转发为RTP流;并且实例里没有从RTP到WebRTC的代码,说明只能从WebRTC流到RTP流单向发。

                  # 连接状态变化时的处理函数

                  // Set the handler for ICE connection state
                  // This will notify you when the peer has connected/disconnected
                  peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
                      fmt.Printf("Connection State has changed %s \n", connectionState.String())
                  
                      if connectionState == webrtc.ICEConnectionStateConnected {
                          fmt.Println("Ctrl+C the remote client to stop the demo")
                      }
                  })
                  
                  1
                  2
                  3
                  4
                  5
                  6
                  7
                  8
                  9

                  当WebRTC流的ICE连接状态改变时输出一些提示信息。

                  // Set the handler for Peer connection state
                  // This will notify you when the peer has connected/disconnected
                  peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
                      fmt.Printf("Peer Connection State has changed: %s\n", s.String())
                  
                      if s == webrtc.PeerConnectionStateFailed {
                          // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
                          // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
                          // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
                          fmt.Println("Done forwarding")
                          os.Exit(0)
                      }
                  })
                  
                  1
                  2
                  3
                  4
                  5
                  6
                  7
                  8
                  9
                  10
                  11
                  12
                  13

                  当WebRTC流的PeerConnection连接状态改变时输出一些提示信息。

                  # 启动PeerConnection

                  // Wait for the offer to be pasted
                  offer := webrtc.SessionDescription{}
                  signal.Decode(signal.MustReadStdin(), &offer)
                  
                  1
                  2
                  3

                  先从stdin读入SessionDescription

                  // Set the remote SessionDescription
                  if err = peerConnection.SetRemoteDescription(offer); err != nil {
                      panic(err)
                  }
                  
                  1
                  2
                  3
                  4

                  设置PeerConnection的远端SessionDescription,应该是表示只接受SessionDescription相符的传入连接。

                  // Create answer
                  answer, err := peerConnection.CreateAnswer(nil)
                  if err != nil {
                      panic(err)
                  }
                  
                  1
                  2
                  3
                  4
                  5

                  设置PeerConnection的SDP Answer。

                  // Create channel that is blocked until ICE Gathering is complete
                  gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
                  
                  1
                  2

                  这里的GatheringCompletePromise返回的是一个context(本质上是<-chan struct{}),阻塞直到完成ICE信息收集。

                  // Sets the LocalDescription, and starts our UDP listeners
                  if err = peerConnection.SetLocalDescription(answer); err != nil {
                      panic(err)
                  }
                  
                  1
                  2
                  3
                  4

                  设置PeerConnection的本地SessionDescription,用在Answer里面。

                  // Block until ICE Gathering is complete, disabling trickle ICE
                  // we do this because we only can exchange one signaling message
                  // in a production application you should exchange ICE Candidates via OnICECandidate
                  <-gatherComplete
                  
                  1
                  2
                  3
                  4

                  等待完成ICE信息收集。注释里讲了这个示例的特殊性:只进行一次ICE信令交换,正常情况应该是使用OnICECandidate。

                  // Output the answer in base64 so we can paste it in browser
                  fmt.Println(signal.Encode(*peerConnection.LocalDescription()))
                  
                  1
                  2

                  输出SessionDescription,给用户在浏览器里粘贴(包含收集到的ICE信息),浏览器就是通过这个在STUN里找到自己要连的服务端在哪。

                  // Block forever
                  select {}
                  
                  1
                  2

                  阻塞,让各协程运行。

                  帮助我们改善此页面!
                  创建于: 2021-08-30 14:12:38

                  更新于: 2021-09-02 13:33:33