TrackLocal in pion
// TrackLocal is an interface that controls how the user can send media
// The user can provide their own TrackLocal implementations, or use
// the implementations in pkg/media
type TrackLocal interface {
	// Bind should implement the way how the media data flows from the Track to the PeerConnection
	// This will be called internally after signaling is complete and the list of available
	// codecs has been determined
	Bind(TrackLocalContext) (RTPCodecParameters, error)

	// Unbind should implement the teardown logic when the track is no longer needed. This happens
	// because a track has been stopped.
	Unbind(TrackLocalContext) error

	// ID is the unique identifier for this Track. This should be unique for the
	// stream, but doesn't have to globally unique. A common example would be 'audio' or 'video'
	// and StreamID would be 'desktop' or 'webcam'
	ID() string

	// StreamID is the group this track belongs too. This must be unique
	StreamID() string

	// Kind controls if this TrackLocal is audio or video
	Kind() RTPCodecType
}
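TrackLocal represents a local track that is sent to the remote peer. Its core functionality lies in Bind and Unbind. Judging from the comments, these two methods bind a TrackLocalContext to the track and unbind it again. Here is what TrackLocalContext looks like: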
// TrackLocalContext is the Context passed when a TrackLocal has been Binded/Unbinded from a PeerConnection, and used
// in Interceptors.
type TrackLocalContext struct {
	id          string
	params      RTPParameters
	ssrc        SSRC
	writeStream TrackLocalWriter
}

// CodecParameters returns the negotiated RTPCodecParameters. These are the codecs supported by both
// PeerConnections and the SSRC/PayloadTypes
func (t *TrackLocalContext) CodecParameters() []RTPCodecParameters {
	return t.params.Codecs
}

// HeaderExtensions returns the negotiated RTPHeaderExtensionParameters. These are the header extensions supported by
// both PeerConnections and the SSRC/PayloadTypes
func (t *TrackLocalContext) HeaderExtensions() []RTPHeaderExtensionParameter {
	return t.params.HeaderExtensions
}

// SSRC requires the negotiated SSRC of this track
// This track may have multiple if RTX is enabled
func (t *TrackLocalContext) SSRC() SSRC {
	return t.ssrc
}

// WriteStream returns the WriteStream for this TrackLocal. The implementer writes the outbound
// media packets to it
func (t *TrackLocalContext) WriteStream() TrackLocalWriter {
	return t.writeStream
}

// ID is a unique identifier that is used for both Bind/Unbind
func (t *TrackLocalContext) ID() string {
	return t.id
}
It is easy to guess from this that the main work revolves around writing to the TrackLocalWriter, which is simply the sink that outbound packets are written to:
// TrackLocalWriter is the Writer for outbound RTP Packets
type TrackLocalWriter interface {
	// WriteRTP encrypts a RTP packet and writes to the connection
	WriteRTP(header *rtp.Header, payload []byte) (int, error)

	// Write encrypts and writes a full RTP packet
	Write(b []byte) (int, error)
}
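The bind operation above, then, presumably tells the track which TrackLocalWriter it should write to. How to write and what data to write is left entirely to the user, as long as the data ends up in TrackLocalWriter.WriteRTP or TrackLocalWriter.Write.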
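The bind-then-write pattern described above can be sketched without pulling in pion at all. The following is only an illustration under stated assumptions: packetWriter, bindContext, memoryWriter, simpleTrack and its Send method are hypothetical stand-ins invented for this example, not pion types, and the real Bind also negotiates a codec, which is left out here.

```go
package main

import (
	"errors"
	"fmt"
)

// packetWriter is a hypothetical stand-in for pion's TrackLocalWriter.
type packetWriter interface {
	Write(b []byte) (int, error)
}

// bindContext is a hypothetical stand-in for TrackLocalContext.
type bindContext struct {
	id     string
	writer packetWriter
}

// memoryWriter records every packet so the example can be inspected.
type memoryWriter struct{ packets [][]byte }

func (m *memoryWriter) Write(b []byte) (int, error) {
	m.packets = append(m.packets, append([]byte(nil), b...)) // store a copy
	return len(b), nil
}

var errNotBound = errors.New("track is not bound")

// simpleTrack remembers the writer it was bound to and forwards data to it.
type simpleTrack struct {
	boundID string
	writer  packetWriter
}

// Bind stores the writer handed over by the (simulated) PeerConnection.
func (t *simpleTrack) Bind(ctx bindContext) error {
	t.boundID = ctx.id
	t.writer = ctx.writer
	return nil
}

// Unbind forgets the writer, but only for the context that was bound.
func (t *simpleTrack) Unbind(ctx bindContext) error {
	if t.boundID != ctx.id {
		return errNotBound
	}
	t.boundID, t.writer = "", nil
	return nil
}

// Send is the user-defined entry point: what gets written is up to the caller.
func (t *simpleTrack) Send(b []byte) error {
	if t.writer == nil {
		return errNotBound
	}
	_, err := t.writer.Write(b)
	return err
}

func main() {
	w := &memoryWriter{}
	ctx := bindContext{id: "ctx-1", writer: w}
	track := &simpleTrack{}

	fmt.Println("send before Bind fails:", track.Send([]byte("early")) == errNotBound)
	_ = track.Bind(ctx)
	_ = track.Send([]byte("hello"))
	fmt.Println("packets after Bind:", len(w.packets))
	_ = track.Unbind(ctx)
	fmt.Println("send after Unbind fails:", track.Send([]byte("late")) == errNotBound)
}
```

The point of the sketch is that the track itself never opens a connection. It only holds on to whatever writer it was given at bind time and pushes bytes into it.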
For example, pion ships a built-in TrackLocalStaticRTP, a TrackLocal whose codec is fixed when the track is created:
// TrackLocalStaticRTP is a TrackLocal that has a pre-set codec and accepts RTP Packets.
// If you wish to send a media.Sample use TrackLocalStaticSample
type TrackLocalStaticRTP struct {
	mu           sync.RWMutex
	bindings     []trackBinding
	codec        RTPCodecCapability
	id, streamID string
}
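Its Bind checks that the track's codec matches one of the negotiated codecs and, if it does, simply appends the contents of the TrackLocalContext to a list of bindings: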
// Bind is called by the PeerConnection after negotiation is complete
// This asserts that the code requested is supported by the remote peer.
// If so it setups all the state (SSRC and PayloadType) to have a call
func (s *TrackLocalStaticRTP) Bind(t TrackLocalContext) (RTPCodecParameters, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	parameters := RTPCodecParameters{RTPCodecCapability: s.codec}
	if codec, matchType := codecParametersFuzzySearch(parameters, t.CodecParameters()); matchType != codecMatchNone {
		s.bindings = append(s.bindings, trackBinding{
			ssrc:        t.SSRC(),
			payloadType: codec.PayloadType,
			writeStream: t.WriteStream(),
			id:          t.ID(),
		})
		return codec, nil
	}

	return RTPCodecParameters{}, ErrUnsupportedCodec
}
Naturally, it also provides functions for writing data:
// WriteRTP writes a RTP Packet to the TrackLocalStaticRTP
// If one PeerConnection fails the packets will still be sent to
// all PeerConnections. The error message will contain the ID of the failed
// PeerConnections so you can remove them
func (s *TrackLocalStaticRTP) WriteRTP(p *rtp.Packet) error {
	ipacket := rtpPacketPool.Get()
	packet := ipacket.(*rtp.Packet)
	defer func() {
		*packet = rtp.Packet{}
		rtpPacketPool.Put(ipacket)
	}()
	*packet = *p
	return s.writeRTP(packet)
}

// writeRTP is like WriteRTP, except that it may modify the packet p
func (s *TrackLocalStaticRTP) writeRTP(p *rtp.Packet) error {
	s.mu.RLock()
	defer s.mu.RUnlock()

	writeErrs := []error{}
	for _, b := range s.bindings {
		p.Header.SSRC = uint32(b.ssrc)
		p.Header.PayloadType = uint8(b.payloadType)
		if _, err := b.writeStream.WriteRTP(&p.Header, p.Payload); err != nil {
			writeErrs = append(writeErrs, err)
		}
	}

	return util.FlattenErrs(writeErrs)
}

// Write writes a RTP Packet as a buffer to the TrackLocalStaticRTP
// If one PeerConnection fails the packets will still be sent to
// all PeerConnections. The error message will contain the ID of the failed
// PeerConnections so you can remove them
func (s *TrackLocalStaticRTP) Write(b []byte) (n int, err error) {
	ipacket := rtpPacketPool.Get()
	packet := ipacket.(*rtp.Packet)
	defer func() {
		*packet = rtp.Packet{}
		rtpPacketPool.Put(ipacket)
	}()

	if err = packet.Unmarshal(b); err != nil {
		return 0, err
	}

	return len(b), s.writeRTP(packet)
}
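As you can see, this essentially calls TrackLocalWriter.WriteRTP on every TrackLocalWriter that Bind appended to the slice. Note that this type is a TrackLocal, not a TrackLocalWriter: its WriteRTP and Write are not the WriteRTP and Write of the TrackLocalWriter interface. They are methods defined on the concrete type so that external code can feed data in, and the TrackLocal interface does not require them.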
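To make the fan-out concrete, here is a small self-contained sketch of the same loop: one packet goes to every binding, each binding's own SSRC and payload type is stamped into the header first, and a failing binding does not stop the others. The types header, headerWriter, binding and recorder, and the function fanOut, are hypothetical stand-ins invented for this example. Errors are returned as a plain slice here, whereas the pion code above flattens them with util.FlattenErrs.

```go
package main

import (
	"errors"
	"fmt"
)

// header is a hypothetical, trimmed-down stand-in for rtp.Header.
type header struct {
	SSRC        uint32
	PayloadType uint8
}

// headerWriter is a hypothetical stand-in for the WriteRTP half of TrackLocalWriter.
type headerWriter interface {
	WriteRTP(h *header, payload []byte) (int, error)
}

// binding mirrors the idea of trackBinding: one entry per successful Bind.
type binding struct {
	ssrc        uint32
	payloadType uint8
	writer      headerWriter
}

// recorder remembers the headers it saw; when failing is set it rejects every write.
type recorder struct {
	seen    []header
	failing bool
}

func (r *recorder) WriteRTP(h *header, payload []byte) (int, error) {
	if r.failing {
		return 0, errors.New("connection closed")
	}
	r.seen = append(r.seen, *h) // copy, because the caller reuses the header
	return len(payload), nil
}

// fanOut writes the packet to every binding, overwriting SSRC and payload
// type per binding and collecting errors instead of stopping at the first one.
func fanOut(bindings []binding, h *header, payload []byte) []error {
	var errs []error
	for _, b := range bindings {
		h.SSRC = b.ssrc
		h.PayloadType = b.payloadType
		if _, err := b.writer.WriteRTP(h, payload); err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

func main() {
	a, broken, c := &recorder{}, &recorder{failing: true}, &recorder{}
	bindings := []binding{
		{ssrc: 1111, payloadType: 96, writer: a},
		{ssrc: 2222, payloadType: 97, writer: broken},
		{ssrc: 3333, payloadType: 98, writer: c},
	}
	errs := fanOut(bindings, &header{}, []byte{0x01, 0x02})
	fmt.Println("errors:", len(errs))
	fmt.Println("first binding saw SSRC:", a.seen[0].SSRC)
	fmt.Println("third binding saw SSRC:", c.seen[0].SSRC)
}
```

Because the header is mutated in place for each binding, a writer that wants to keep it has to copy it, which is also why the pion code works on a pooled copy of the caller's packet rather than on the packet itself.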
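To use this type, add it to a PeerConnection with AddTrack or AddTransceiverFromTrack. After that, calling its WriteRTP method amounts to sending data to the remote peer.
For example, the article "用实例学习pion - rtp-to-webrtc" (Learning pion by example: rtp-to-webrtc) walks through a typical use of TrackLocalStaticRTP.
First, the track is created and added with AddTrack: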
// Create a video track
videoTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, "video", "pion")
if err != nil {
	panic(err)
}
rtpSender, err := peerConnection.AddTrack(videoTrack)
if err != nil {
	panic(err)
}
Then, at the very end, an infinite loop keeps reading data from the UDP connection and writing it into the track:
// Read RTP packets forever and send them to the WebRTC Client
inboundRTPPacket := make([]byte, 1600) // UDP MTU
for {
	n, _, err := listener.ReadFrom(inboundRTPPacket)
	if err != nil {
		panic(fmt.Sprintf("error during read: %s", err))
	}

	if _, err = videoTrack.Write(inboundRTPPacket[:n]); err != nil {
		if errors.Is(err, io.ErrClosedPipe) {
			// The peerConnection has been closed.
			return
		}

		panic(err)
	}
}
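The shape of this loop can be tried out in isolation. The sketch below pulls it into a function that returns errors instead of panicking and treats io.ErrClosedPipe as a clean shutdown, as the loop above does. Everything named here (packetReader, forward, sliceReader, closingWriter) is hypothetical and exists only for this illustration. The in-memory reader and writer stand in for the UDP listener and the track so that the example runs without a network or a PeerConnection.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
)

// packetReader is the single method of net.PacketConn that the loop needs.
type packetReader interface {
	ReadFrom(p []byte) (n int, addr net.Addr, err error)
}

// forward copies datagrams from src to dst. It returns nil once dst reports
// io.ErrClosedPipe, and returns any other read or write error to the caller.
func forward(src packetReader, dst io.Writer) error {
	buf := make([]byte, 1600)
	for {
		n, _, err := src.ReadFrom(buf)
		if err != nil {
			return fmt.Errorf("error during read: %w", err)
		}
		if _, err = dst.Write(buf[:n]); err != nil {
			if errors.Is(err, io.ErrClosedPipe) {
				return nil // the destination has been closed
			}
			return err
		}
	}
}

// sliceReader yields queued datagrams and then io.EOF (stand-in for a UDP socket).
type sliceReader struct{ queue [][]byte }

func (s *sliceReader) ReadFrom(p []byte) (int, net.Addr, error) {
	if len(s.queue) == 0 {
		return 0, nil, io.EOF
	}
	n := copy(p, s.queue[0])
	s.queue = s.queue[1:]
	return n, nil, nil
}

// closingWriter accepts limit packets, then behaves like a closed track.
type closingWriter struct {
	limit    int
	received []string
}

func (c *closingWriter) Write(b []byte) (int, error) {
	if len(c.received) >= c.limit {
		return 0, io.ErrClosedPipe
	}
	c.received = append(c.received, string(b))
	return len(b), nil
}

func main() {
	src := &sliceReader{queue: [][]byte{[]byte("p1"), []byte("p2"), []byte("p3")}}
	dst := &closingWriter{limit: 2}
	err := forward(src, dst)
	fmt.Println("forward returned:", err)
	fmt.Println("packets delivered:", len(dst.received))
}
```

A value returned by net.ListenPacket satisfies packetReader, since net.PacketConn declares ReadFrom with the same signature, so the same function could be pointed at a real UDP socket.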