Yin的笔记本

vuePress-theme-reco Howard Yin    2021 - 2025
Yin的笔记本 Yin的笔记本

Choose mode

  • dark
  • auto
  • light
Home
Category
  • CNCF
  • Docker
  • namespaces
  • Kubernetes
  • Kubernetes对象
  • Linux
  • MyIdeas
  • Revolution
  • WebRTC
  • 云计算
  • 人工智能
  • 分布式
  • 图像处理
  • 图形学
  • 微服务
  • 数学
  • OJ笔记
  • 博弈论
  • 形式语言与自动机
  • 数据库
  • 服务器运维
  • 编程语言
  • C
  • Git
  • Go
  • Java
  • JavaScript
  • Python
  • Nvidia
  • Rust
  • Tex
  • Shell
  • Vue
  • 视频编解码
  • 计算机网络
  • SDN
  • 论文笔记
  • 讨论
  • 边缘计算
  • 量子信息技术
Tag
TimeLine
About
查看源码
author-avatar

Howard Yin

303

Article

153

Tag

Home
Category
  • CNCF
  • Docker
  • namespaces
  • Kubernetes
  • Kubernetes对象
  • Linux
  • MyIdeas
  • Revolution
  • WebRTC
  • 云计算
  • 人工智能
  • 分布式
  • 图像处理
  • 图形学
  • 微服务
  • 数学
  • OJ笔记
  • 博弈论
  • 形式语言与自动机
  • 数据库
  • 服务器运维
  • 编程语言
  • C
  • Git
  • Go
  • Java
  • JavaScript
  • Python
  • Nvidia
  • Rust
  • Tex
  • Shell
  • Vue
  • 视频编解码
  • 计算机网络
  • SDN
  • 论文笔记
  • 讨论
  • 边缘计算
  • 量子信息技术
Tag
TimeLine
About
查看源码
  • pion/interceptor浅析

    • 核心接口
      • BindRTCPReader方法
      • BindRTCPWriter方法
    • BindRemoteStream方法
      • BindLocalStream方法
        • Close
          • 来自《用实例学习pion interceptor - nack》的附加知识
            • 总结一下Interceptor的创建过程
              • 实现BindLocalStream
              • 实现BindRTCPWriter
              • 实现BindRemoteStream
              • 实现BindRTCPReader

          [pion/interceptor](https://github.com/pion/interceptor)浅析

          vuePress-theme-reco Howard Yin    2021 - 2025

          pion/interceptor浅析


          Howard Yin 2021-09-27 13:16:04 WebRTC编程框架pion概念

          v3.0.0 introduces a new Pion specific concept known as a interceptor. A Interceptor is a pluggable RTP/RTCP processor. Via a public API users can easily add and customize operations that are run on inbound/outbound RTP. Interceptors are an interface this means A user could provide their own implementation. Or you can use one of the interceptors Pion will have in-tree.

          We designed this with the following vision.

          • Useful defaults. Nontechnical users should be able to build things without tuning.
          • Don't block unique use cases. We shouldn't paint ourself in a corner. We want to support interesting WebRTC users
          • Allow users to bring their own logic. We should encourage easy changing. Innovation in this area is important.
          • Allow users to learn. Don't put this stuff deep in the code base. Should be easy to jump into so people can learn.

          In this release we only are providing a NACK Generator/Responder interceptor. We will be implementing more and you can import the latest at anytime! This means you can update your pion/interceptor version without having to upgrade pion/webrtc!

          总的来说,在WebRTC通信中,要收发的包可以分为两类:

          • RTP包:传递媒体内容,如音视频片段等
          • RTCP包:传递控制信息,如NACK、接收方报告等

          pion/interceptor也是按照这样的思路进行实现的,其中实现的interceptor都同时要处理RTP包和RTCP包,处理方式基本上都是根据RTP包的收发情况构造RTCP包进行发送,以及根据收到的RTCP包调整RTP包的发送。

          • 比如在pion/interceptor/pkg/nack里,就是一方根据RTP包的序列号发送NACK信息,另一方根据NACK信息重发RTP包
          • 又比如在pion/interceptor/pkg/nack里,就是一方统计RTP包的接收情况发送SenderReport,另一方接收并存储之
          • 再比如在pion/interceptor/pkg/twcc里,就是一方统计RTP包的接收情况反馈丢包信息,另一方接收然后调整RTP包发送窗口

          此外,从逻辑上讲,一个协议必须得收发双方都实现了才能正常运行,而且由于WebRTC是对等连接通信,一方可能既是接收方又是发送方,所以pion/interceptor里的interceptor都必须得实现收发双方的功能。在程序里,这个思想体现为收发方基础接口不是分SenderInterceptor和ReceiverInterceptor两个,而是在一个基础接口Interceptor中同时包含收发双方的方法。

          本文可以和《用实例学习pion interceptor - nack》搭配着看。

          # 核心接口

          // Interceptor can be used to add functionality to you PeerConnections by modifying any incoming/outgoing rtp/rtcp
          // packets, or sending your own packets as needed.
          type Interceptor interface {
          
          1
          2
          3

          Interceptor接口是pion/interceptor包的核心,pion/interceptor包的主要功能代码是pion/interceptor/pkg里继承了Interceptor类的各种interceptor,其余的代码基本都是这个Interceptor的方法参数里用到的类。pion/interceptor/pkg里的这些interceptor都是pion/webrtc里要用到的。用户也可以自己定义interceptor用在pion/webrtc里,比如《用实例学习pion - rtp-forwarder》。

          # BindRTCPReader方法

          	// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
          	// change in the future. The returned method will be called once per packet batch.
          	BindRTCPReader(reader RTCPReader) RTCPReader
          
          1
          2
          3
          • 从函数名上看,这是一个绑定RTCPReader的接口
          • 从注释上看,这个接口是为了让用户自定义修改输入RTCP数据包的过程
          • 从函数的输入输出上看,函数输入一个RTCPReader输出一个RTCPReader,这个方法在使用时应该是可以级联的

          再看看这个RTCPReader是什么

          // RTCPReader is used by Interceptor.BindRTCPReader.
          type RTCPReader interface {
          	// Read a batch of rtcp packets
          	Read([]byte, Attributes) (int, Attributes, error)
          }
          
          1
          2
          3
          4
          5

          可以看到,RTCPReader就是一个Read函数,输入一段字节数据和Attributes,输出整型、Attributes和错误。

          结合前面BindRTCPReader里说的功能,这个Read应该就是用户实现自定义修改输入RTCP数据包过程的地方。

          输出里的错误自不必多讲,这里输入的字节数据应该就是修改前的RTCP数据包,修改过程应该是直接在这个字节输入上进行,后面输出的整型应该是修改后的数据长度,让之后的操作可以直接从字节数据里读出RTCP包。这个Attributes在后面定义的,是一个map[interface{}]interface{},应该是用于传递一些自定义参数的。

          pion/interceptor里还提供了一种简便的构造RTCPReader的方式:

          // RTCPReaderFunc is an adapter for RTCPReader interface
          type RTCPReaderFunc func([]byte, Attributes) (int, Attributes, error)
          
          // Read a batch of rtcp packets
          func (f RTCPReaderFunc) Read(b []byte, a Attributes) (int, Attributes, error) {
          	return f(b, a)
          }
          
          1
          2
          3
          4
          5
          6
          7

          这样,只要写好Read里的代码,可以不用再定义一个RTCPReader子类,直接把函数放进RTCPReaderFunc就行。函数式编程思想,很妙。pion/interceptor/pkg里的几个interceptor都是这么用的。

          那么这么看,BindRTCPReader确实是可以级联的,并且BindRTCPReader里面要实现的操作也能大概猜得到:

          • 以BindRTCPReader输入的RTCPReader构造自己的RTCPReader作为输出,在自己的RTCPReader的Read函数中:
            1. 调用BindRTCPReader输入的RTCPReader的Read函数
            2. 根据返回的整型值,读取修改后的字节数据,反序列化为RTCP包
            3. 修改RTCP包和Attributes,或进行一些其他自定义操作(比如记录统计信息、转发、筛选等)
            4. 把修改后RTCP包序列化到字节数据里(可选)
            5. 返回整型值和Attributes

          pion/interceptor/pkg里的几个interceptor都是这样的,不过它们都没有修改字节数据的操作。

          • 比如在pion/interceptor/pkg/nack里,interceptor从字节数据里获取RTCP包,然后判断是不是NACK包,如果是就按照NACK里汇报的丢包情况重发RTCP包
          • 再比如在pion/interceptor/pkg/report里,interceptor从字节数据里获取RTCP包,然后判断是不是SenderReport包,如果是就存储之

          于是,一级一级地调用一串interceptor的BindRTCPReader,每个BindRTCPReader都以上一个interceptor的BindRTCPReader返回的RTCPReader为输入;输出的RTCPReader的Read里面先调用了输入的RTCPReader的Read,再进行自定义的修改操作,返回修改后的RTCP包字节数据。这样,最后一个interceptor的BindRTCPReader输出的RTCPReader的Read就是一个顺序执行所有自定义操作的RTCP包处理函数。

          # BindRTCPWriter方法

          	// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
          	// will be called once per packet batch.
          	BindRTCPWriter(writer RTCPWriter) RTCPWriter
          
          1
          2
          3
          • 从函数名上看,这是一个绑定RTCPWriter的接口
          • 从注释上看,这个接口是为了让用户自定义修改输出RTCP数据包的过程
          • 很明显,这个方法在使用时应该和BindRTCPReader一样也是可以级联的,级联方式应该也大差不离
          // RTCPWriter is used by Interceptor.BindRTCPWriter.
          type RTCPWriter interface {
          	// Write a batch of rtcp packets
          	Write(pkts []rtcp.Packet, attributes Attributes) (int, error)
          }
          
          1
          2
          3
          4
          5

          一股子RTCPReader的既视感,明显也是可以级联的,要实现的操作应该也差不多:

          • 以BindRTCPWriter输入的RTCPWriter构造自己的RTCPWriter作为输出,在RTCPWriter的Write函数里对输入的rtcp.Packet列表进行增减(也就是增减要发送的)

          但是pion/interceptor/pkg里的几个interceptor好像都没这样用,它们的BindRTCPWriter都是直接记录下RTCPWriter,然后开了个协程写数据:

          • 比如在pion/interceptor/pkg/nack里是定期读取接收日志,找到有哪些缺失的包,收集序列号构造NACK包发送
          • 再比如在pion/interceptor/pkg/report里是定期发送SenderReport包
          • 又比如在pion/interceptor/pkg/twcc里是定期发送反馈信息

          pion/interceptor里也提供了一种简便的构造RTCPWriter的方式:

          // RTCPWriterFunc is an adapter for RTCPWriter interface
          type RTCPWriterFunc func(pkts []rtcp.Packet, attributes Attributes) (int, error)
          
          // Write a batch of rtcp packets
          func (f RTCPWriterFunc) Write(pkts []rtcp.Packet, attributes Attributes) (int, error) {
          	return f(pkts, attributes)
          }
          
          1
          2
          3
          4
          5
          6
          7

          和RTCPReaderFunc一个道理,不必多讲。

          # BindRemoteStream方法

          	// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
          	// will be called once per rtp packet.
          	BindRemoteStream(info *StreamInfo, reader RTPReader) RTPReader
          
          	// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
          	UnbindRemoteStream(info *StreamInfo)
          
          1
          2
          3
          4
          5
          6

          绑定和解绑远端流,从方法和注释上看和BindRTCPReader是类似的,都是用来绑定处理发送出去的数据包的方法的。

          这里绑定的RTPReader和RTCPReader的Read函数里的输入参数是一模一样的:

          // RTPReader is used by Interceptor.BindRemoteStream.
          type RTPReader interface {
          	// Read a rtp packet
          	Read([]byte, Attributes) (int, Attributes, error)
          }
          
          1
          2
          3
          4
          5

          BindRemoteStream和BindRTCPReader唯一的区别在于包处理的方式:RTP包和RTCP包在逻辑上的不同之处在于,RTP包是从属于一个流的连续序列,而RTCP包是一个个独立的包。因此在BindRTCPReader中,输入的数据直接就是一个RTCPReader;而BindRemoteStream不仅需要指定RTPReader,还需要指定一个存储流信息的StreamInfo。

          这个StreamInfo长这样:

          // StreamInfo is the Context passed when a StreamLocal or StreamRemote has been Binded or Unbinded
          type StreamInfo struct {
          	ID                  string
          	Attributes          Attributes
          	SSRC                uint32
          	PayloadType         uint8
          	RTPHeaderExtensions []RTPHeaderExtension
          	MimeType            string
          	ClockRate           uint32
          	Channels            uint16
          	SDPFmtpLine         string
          	RTCPFeedback        []RTCPFeedback
          }
          
          // RTPHeaderExtension represents a negotiated RFC5285 RTP header extension.
          type RTPHeaderExtension struct {
          	URI string
          	ID  int
          }
          
          // RTCPFeedback signals the connection to use additional RTCP packet types.
          // https://draft.ortc.org/#dom-rtcrtcpfeedback
          type RTCPFeedback struct {
          	// Type is the type of feedback.
          	// see: https://draft.ortc.org/#dom-rtcrtcpfeedback
          	// valid: ack, ccm, nack, goog-remb, transport-cc
          	Type string
          
          	// The parameter value depends on the type.
          	// For example, type="nack" parameter="pli" will send Picture Loss Indicator packets.
          	Parameter string
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32

          可以看到,这个StreamInfo里面放的是一些与流有关的配置信息。由于RTP包承载的是流,流中的包可以看成是一个整体,是一系列相互关联的连续包,不像RTCP包那样是一个个独立的包,一会是NACK、一会又是SenderReport。StreamInfo就是这些连续RTP包中与流相关的标记信息,它可以用来区分RTP包属于哪个流、区分媒体的类型、记录时钟频率等等。

          一些重要的StreamInfo参数:引自《RTP: audio and video for the Internet》

          • SSRC:Synchronization Source(SSRC)标识RTP会话中的参与者。它是一个临时的,每个会话的标识符通过RTP控制协议映射到一个长期的规范名称CNAME。SSRC是一个32位整数,由参与者加入会话时随机选择。具有相同SSRC的所有数据包均构成单个时序和序列号空间的一部分,因此接收方必须按SSRC对数据包进行分组才能进行播放。如果参加者在一个RTP会话中生成多个流(例如,来自不同的摄像机),每个流都必须标识为不同的SSRC,以便接收方可以区分哪些数据包属于每个流。
          • PayloadType:有效负载类型。RTP头的负载类型(或者PT)与RTP传输的媒体数据关联。接收者应用检测负载类型来甄别如何处理数据,例如,传递给特定的解压缩器。
          • MimeType:有效负载格式。有效负载格式是根据MIME名称空间命名的。该名称空间最初是为电子邮件定义的,用于标识附件的内容,但此后它已成为媒体格式的通用名称空间,并在许多应用程序中使用。所有有效负载格式都应该具有MIME类型注册。更新的有效负载格式将其包含在其各自规范中; 在线维护MIME类型的完整列表,网址为:http://www.iana.org/assignments/media-types 。

          还有其他的一些参数在RTP包相关的IETF标准里都应该能找到。

          pion/interceptor里也提供了一种简便的构造RTPReader的方式:

          // RTPReaderFunc is an adapter for RTPReader interface
          type RTPReaderFunc func([]byte, Attributes) (int, Attributes, error)
          
          // Read a rtp packet
          func (f RTPReaderFunc) Read(b []byte, a Attributes) (int, Attributes, error) {
          	return f(b, a)
          }
          
          1
          2
          3
          4
          5
          6
          7

          和RTCPReaderFunc一个道理,不必多讲。

          # BindLocalStream方法

          	// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
          	// will be called once per rtp packet.
          	BindLocalStream(info *StreamInfo, writer RTPWriter) RTPWriter
          
          	// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
          	UnbindLocalStream(info *StreamInfo)
          
          1
          2
          3
          4
          5
          6

          绑定和解绑本地流,从方法和注释上看和BindRTCPWriter是类似的,都是用来绑定处理发送出去的数据包的方法的,这里绑定的RTPWriter也和RTCPWriter大差不离:

          // RTPWriter is used by Interceptor.BindLocalStream.
          type RTPWriter interface {
          	// Write a rtp packet
          	Write(header *rtp.Header, payload []byte, attributes Attributes) (int, error)
          }
          
          1
          2
          3
          4
          5

          可以看到,唯一的区别在于包构建方式:从代码上看,RTCPWriter.Write的输入直接就是rtcp.Packet的列表;而RTPWriter.Write的输入是分开的一个包头rtp.Header和[]byte格式的内容。这可能是因为RTCP只会传递一些运行状态数据和控制信息,每种包都有自己独特的结构,而RTP包是由一长串媒体数据切开包装而来,结构比较规整,不能给用户随便调整,所以把包头和包内容分了两个变量。

          pion/interceptor里也提供了一种简便的构造RTPWriter的方式:

          // RTPWriterFunc is an adapter for RTPWrite interface
          type RTPWriterFunc func(header *rtp.Header, payload []byte, attributes Attributes) (int, error)
          
          // Write a rtp packet
          func (f RTPWriterFunc) Write(header *rtp.Header, payload []byte, attributes Attributes) (int, error) {
          	return f(header, payload, attributes)
          }
          
          1
          2
          3
          4
          5
          6
          7

          和RTCPReaderFunc一个道理,不必多讲。

          # Close

          	io.Closer
          }
          
          1
          2

          这里好理解,当要销毁这个Interceptor的时候,必须要解绑所有的RTCPReader、RTCPWriter、RTPReader、RTPWriter,并且停止所有的相关协程,这个只能由实现Interceptor的用户来做。所以在这里加上了一个io.Closer,要求用户自己实现一个Close方法。

          # 来自《用实例学习pion interceptor - nack》的附加知识

          以下是一些关于级联和Interceptor具体如何调用的知识。《用实例学习pion interceptor - nack》里的案例很是简洁,一看就能懂。

          从《用实例学习pion interceptor - nack》中的案例看:

          • 在级联的开头,用户需要自行调用Read把包传进级联的Reader里
          • 在级联的末尾,用户需要自行在Write里写上发送包的函数,把级联的Writer传来的包发送出去

          比如NACK发送方接收RTP包就是首先获取到RTPReader:

          // Create the writer just for a single SSRC stream
          // this is a callback that is fired everytime a RTP packet is ready to be sent
          streamReader := chain.BindRemoteStream(&interceptor.StreamInfo{
          	SSRC:         ssrc,
          	RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
          }, interceptor.RTPReaderFunc(func(b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) { return len(b), nil, nil }))
          
          1
          2
          3
          4
          5
          6

          然后在循环里从UDP处收包之后放进RTPReader.Read:

          i, addr, err := conn.ReadFrom(buffer)
          if err != nil {
          	panic(err)
          }
          
          log.Println("Received RTP")
          
          if _, _, err := streamReader.Read(buffer[:i], nil); err != nil {
          	panic(err)
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10

          由于级联了NACK Interceptor,所以就能执行一些包统计的操作,找出未接收到的RTP包,构造NACK。

          然后NACK发送方发NACK包就是写在RTCPWriter.Write里的:

          chain.BindRTCPWriter(interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, _ interceptor.Attributes) (int, error) {
          	buf, err := rtcp.Marshal(pkts)
          	if err != nil {
          		return 0, err
          	}
          
          	return conn.WriteTo(buf, addr)
          }))
          
          1
          2
          3
          4
          5
          6
          7
          8

          这样就能完成“收RTP包——统计丢包——发NACK”的操作。

          NACK接收方也是一样,先获取RTCPReader:

          // Set the interceptor wide RTCP Reader
          // this is a handle to send NACKs back into the interceptor.
          rtcpReader := chain.BindRTCPReader(interceptor.RTCPReaderFunc(func(in []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
          	return len(in), nil, nil
          }))
          
          1
          2
          3
          4
          5

          然后也是在循环里UDP收包之后放进RTCPReader.Read:

          i, err := conn.Read(rtcpBuf)
          if err != nil {
          	panic(err)
          }
          
          log.Println("Received NACK")
          
          if _, _, err = rtcpWriter.Read(rtcpBuf[:i], nil); err != nil {
          	panic(err)
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10

          于是获取到NACK解包出来就知道要重发哪些RTP包了。

          然后NACK接收方重发RTP包就是写在RTPWriter.Write里的:

          // Create the writer just for a single SSRC stream
          // this is a callback that is fired everytime a RTP packet is ready to be sent
          streamWriter := chain.BindLocalStream(&interceptor.StreamInfo{
          	SSRC:         ssrc,
          	RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
          }, interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
          	headerBuf, err := header.Marshal()
          	if err != nil {
          		panic(err)
          	}
          
          	return conn.Write(append(headerBuf, payload...))
          }))
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13

          这样就能完成“接收NACK包——找出需要重发的RTP包——重发RTP包”的操作了。

          # 总结一下Interceptor的创建过程

          首先是继承interceptor.NoOp,因为实际情况下不一定需要把BindLocalStream、BindRTCPWriter、BindRemoteStream、BindRTCPReader四个全实现。

          # 实现BindLocalStream

          1. 实现一个RTPWriter,在其中保存另一个RTPWriter,并在其Write函数中调用保存的RTPWriter.Write
          2. 实现BindLocalStream,将输入的RTPWriter保存到你实现的RTPWriter中并返回
          3. (常见操作)让你实现的RTPWriter可以读到Interceptor里的数据,然后在BindLocalStream里开goroutine调用RTPWriter定期获取Interceptor里的数据并据此调用保存的另一个RTPWriter写一些特殊功能的包

          # 实现BindRTCPWriter

          同上,只不过RTPWriter变成RTCPWriter

          # 实现BindRemoteStream

          1. 实现一个RTPReader,在其中保存另一个RTPReader,并在其Read函数中调用保存的RTPReader.Read
          2. 实现BindRemoteStream,将输入的RTPReader保存到你实现的RTPReader中并返回
          3. (常见操作)让RTPReader可以操作Interceptor里的数据,从而可以根据RTPReader.Read输入的数据修改Interceptor里的数据,进而影响其绑定的RTPWriter和RTCPWriter的行为

          # 实现BindRTCPReader

          同上,只不过RTPReader变成RTCPReader

          帮助我们改善此页面!
          创建于: 2021-09-10 12:33:05

          更新于: 2021-09-27 13:16:48