Yin的笔记本

vuePress-theme-reco Howard Yin    2021 - 2025
Yin的笔记本 Yin的笔记本

Choose mode

  • dark
  • auto
  • light
Home
Category
  • CNCF
  • Docker
  • namespaces
  • Kubernetes
  • Kubernetes对象
  • Linux
  • MyIdeas
  • Revolution
  • WebRTC
  • 云计算
  • 人工智能
  • 分布式
  • 图像处理
  • 图形学
  • 微服务
  • 数学
  • OJ笔记
  • 博弈论
  • 形式语言与自动机
  • 数据库
  • 服务器运维
  • 编程语言
  • C
  • Git
  • Go
  • Java
  • JavaScript
  • Python
  • Nvidia
  • Rust
  • Tex
  • Shell
  • Vue
  • 视频编解码
  • 计算机网络
  • SDN
  • 论文笔记
  • 讨论
  • 边缘计算
  • 量子信息技术
Tag
TimeLine
About
查看源码
author-avatar

Howard Yin

303

Article

153

Tag

Home
Category
  • CNCF
  • Docker
  • namespaces
  • Kubernetes
  • Kubernetes对象
  • Linux
  • MyIdeas
  • Revolution
  • WebRTC
  • 云计算
  • 人工智能
  • 分布式
  • 图像处理
  • 图形学
  • 微服务
  • 数学
  • OJ笔记
  • 博弈论
  • 形式语言与自动机
  • 数据库
  • 服务器运维
  • 编程语言
  • C
  • Git
  • Go
  • Java
  • JavaScript
  • Python
  • Nvidia
  • Rust
  • Tex
  • Shell
  • Vue
  • 视频编解码
  • 计算机网络
  • SDN
  • 论文笔记
  • 讨论
  • 边缘计算
  • 量子信息技术
Tag
TimeLine
About
查看源码
  • 用实例学习pion interceptor - nack

    • 接收方函数
      • 发送方函数
        • 总结一下interceptor的调用过程
          • 发送方的调用
          • 接收方的调用

      用实例学习pion interceptor - [`nack`](https://github.com/pion/interceptor/blob/master/examples/nack/main.go)

      vuePress-theme-reco Howard Yin    2021 - 2025

      用实例学习pion interceptor - nack


      Howard Yin 2021-09-27 13:27:02 WebRTC编程框架pion实操
      const (
      	listenPort = 6420
      	mtu        = 1500
      	ssrc       = 5000
      )
      
      func main() {
      	go sendRoutine()
      	receiveRoutine()
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10

      示例的开头,是一些配置文件和主函数。从主函数里看,这个例程是一收一发两个协程,其中接收是在主协程里。

      接下来分别解析发送和接收方协程函数。

      # 接收方函数

      func receiveRoutine() {
      
      1

      以下是接收方函数体

      	serverAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", listenPort))
      	if err != nil {
      		panic(err)
      	}
      
      	conn, err := net.ListenUDP("udp4", serverAddr)
      	if err != nil {
      		panic(err)
      	}
      
      1
      2
      3
      4
      5
      6
      7
      8
      9

      首先当然是开UDP监听端口

      	// Create NACK Generator
      	generator, err := nack.NewGeneratorInterceptor()
      	if err != nil {
      		panic(err)
      	}
      
      1
      2
      3
      4
      5

      然后构造一个NACK Interceptor。接收方是负责生成NACK消息的,所以是用NewGeneratorInterceptor,顾名思义,这是生成NACK消息的Interceptor。从源码上看,这个NewGeneratorInterceptor返回的是GeneratorInterceptor,只实现了BindRTCPWriter和BindRemoteStream;从逻辑上想也确实应该是这样,因为接收方生成NACK消息只需要知道接收到的RTP包序列(用BindRemoteStream实现的功能)然后发送NACK包(用BindRTCPWriter实现的功能)就行了。

      	// Create our interceptor chain with just a NACK Generator
      	chain := interceptor.NewChain([]interceptor.Interceptor{generator})
      
      1
      2

      这里的Chain本质是一个Interceptor的列表,并且自身也是Interceptor,它的BindRTCPReader、BindRTCPWriter等方法的实现就是依次调用其Interceptor的列表里的对应方法。这种级联思想的解释可以看《pion/interceptor浅析》。

      	// Create the writer just for a single SSRC stream
      	// this is a callback that is fired everytime a RTP packet is ready to be sent
      	streamReader := chain.BindRemoteStream(&interceptor.StreamInfo{
      		SSRC:         ssrc,
      		RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
      	}, interceptor.RTPReaderFunc(func(b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) { return len(b), nil, nil }))
      
      1
      2
      3
      4
      5
      6

      这里绑定了一个处理远端输入流的处理函数,看样子是直接返回缓冲区大小,不进行任何操作。

      这个绑定操作主要是为了获取到这个streamReader变量。这个streamReader是一个嵌套了NACK相关操作的RTPReader,NACK的相关操作要调用streamReader.Read才能触发(具体为什么是这样可以看《pion/interceptor浅析》里关于RTCPReader嵌套操作的介绍)。

      	for rtcpBound, buffer := false, make([]byte, mtu); ; {
      
      1

      这里一个死循环,在循环中不断进行数据的处理。

      		i, addr, err := conn.ReadFrom(buffer)
      		if err != nil {
      			panic(err)
      		}
      
      		log.Println("Received RTP")
      
      1
      2
      3
      4
      5
      6

      首先当然是读取到UDP里发来的RTP包。

      		if _, _, err := streamReader.Read(buffer[:i], nil); err != nil {
      			panic(err)
      		}
      
      1
      2
      3

      然后把包输入到streamReader.Read中,正如前文所述,streamReader是一个嵌套了NACK相关操作的RTPReader,调用streamReader.Read就会触发NACK Interceptor里的相关操作。

      		// Set the interceptor wide RTCP Writer
      		// this is a callback that is fired everytime a RTCP packet is ready to be sent
      		if !rtcpBound {
      			chain.BindRTCPWriter(interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, _ interceptor.Attributes) (int, error) {
      				buf, err := rtcp.Marshal(pkts)
      				if err != nil {
      					return 0, err
      				}
      
      				return conn.WriteTo(buf, addr)
      			}))
      
      			rtcpBound = true
      		}
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14

      接下来这是一个绑定RTCPWriter的操作,按理说这种绑定操作应该是放在读取数据的这个循环外面,这样就不需要这个rtcpBound来防止重复操作了。从代码上看,这里绑定的RTCPWriter是把输入的RTCP包发送出去。结合NACK Interceptor里面的包发送逻辑,这个操作会定期被调用,也就是定期发送NACK包。所以这个操作被放在循环里面的原因应该是防止这个NACK的定时发送在对面还没有就绪的时候就发包。

      	}
      }
      
      1
      2

      结束。

      # 发送方函数

      func sendRoutine() {
      
      1

      以下是发送方函数体

      	// Dial our UDP listener that we create in receiveRoutine
      	serverAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", listenPort))
      	if err != nil {
      		panic(err)
      	}
      
      	conn, err := net.DialUDP("udp4", nil, serverAddr)
      	if err != nil {
      		panic(err)
      	}
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10

      接收方那边是监听UDP端口,发送方这边就是连接接收方那边开的端口。

      	// Create NACK Responder
      	responder, err := nack.NewResponderInterceptor()
      	if err != nil {
      		panic(err)
      	}
      
      	// Create our interceptor chain with just a NACK Responder.
      	chain := interceptor.NewChain([]interceptor.Interceptor{responder})
      
      1
      2
      3
      4
      5
      6
      7
      8

      然后当然也有和初始化Interceptor的操作。这个NewResponderInterceptor返回的是一个ResponderInterceptor。和NewGeneratorInterceptor差不多的道理,NACK接收方进行的操作是接收NACK包然后重发所需的RTP包,所以这个ResponderInterceptor里面只实现了BindRTCPReader(接收NACK包)和BindLocalStream(重发所需的RTP包)。

      	// Set the interceptor wide RTCP Reader
      	// this is a handle to send NACKs back into the interceptor.
      	rtcpWriter := chain.BindRTCPReader(interceptor.RTCPReaderFunc(func(in []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
      		return len(in), nil, nil
      	}))
      
      1
      2
      3
      4
      5

      如果按照这个变量的含义来说,这个变量名应该是写错了,这里明显生成的是一个RTCPReader,变量名应该是rtcpReader。

      这个也是绑定了一个什么都不做的操作,主要也是为了获取RTCPReader。后面会调用RTCPReader.Read读取RTCP包并触发NACK相关操作。和前面接收方函数差不多,只不过接收方函数里Read的是RTP包,发送方这里Read的是RTCP包。

      	// Create the writer just for a single SSRC stream
      	// this is a callback that is fired everytime a RTP packet is ready to be sent
      	streamWriter := chain.BindLocalStream(&interceptor.StreamInfo{
      		SSRC:         ssrc,
      		RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
      	}, interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
      		headerBuf, err := header.Marshal()
      		if err != nil {
      			panic(err)
      		}
      
      		return conn.Write(append(headerBuf, payload...))
      	}))
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13

      这边BindLocalStream绑定了一个发送用的流,流里的操作就是把上层传过来的需要发送的数据通过UDP连接发送出去。

      	// Read RTCP packets sent by receiver and pass into Interceptor
      	go func() {
      		for rtcpBuf := make([]byte, mtu); ; {
      			i, err := conn.Read(rtcpBuf)
      			if err != nil {
      				panic(err)
      			}
      
      			log.Println("Received NACK")
      
      			if _, _, err = rtcpWriter.Read(rtcpBuf[:i], nil); err != nil {
      				panic(err)
      			}
      		}
      	}()
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15

      然后开了个协程从UDP里面读数据,交给前面生成的RTCPReader去进行读取操作。因为这边是一个发送方,所以这边收到的只会是NACK包。

      	for sequenceNumber := uint16(0); ; sequenceNumber++ {
      		// Send a RTP packet with a Payload of 0x0, 0x1, 0x2
      		if _, err := streamWriter.Write(&rtp.Header{
      			Version:        2,
      			SSRC:           ssrc,
      			SequenceNumber: sequenceNumber,
      		}, []byte{0x0, 0x1, 0x2}, nil); err != nil {
      			fmt.Println(err)
      		}
      
      		time.Sleep(time.Millisecond * 200)
      	}
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12

      最后这里就是发送随机测试数据,不用多讲。

      }
      
      1

      结束。

      # 总结一下interceptor的调用过程

      # 发送方的调用

      1. 实现RTPWriter和RTCPWriter,在它们的Write中调用网络接口发送数据包
      2. 调用BindLocalStream绑定上一步实现的RTPWriter、调用BindRTCPWriter绑定上一步实现的RTCPWriter,获得RTPWriter和RTCPWriter
      3. 按需执行RTPWriter.Write和RTCPWriter.Write发送数据

      # 接收方的调用

      1. 调用BindRemoteStream绑定不执行任何操作的RTPReader、调用BindRTCPReader绑定不执行任何操作的RTCPReader,获得RTPReader和RTCPReader
      2. 实现调用网络接口接收数据包的过程,以接收到的数据包为输入调用RTPReader.Read和RTCPReader.Read
      帮助我们改善此页面!
      创建于: 2021-09-10 12:33:05

      更新于: 2021-09-27 13:27:20