SRS总结(2)

发布时间 2023-09-20 15:31:51作者: 泽良_小涛

5.metadata,video,audio数据处理

RTMP推流到SRS流媒体服务器。

5.1. SrsRtmpConn::publishing(SrsSource* source)

在进行RTMP握手,消息交互后,即SrsRtmpConn::stream_service_cycle中执行完if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success)后,调用publishing—》SrsRtmpConn::publishing函数,主要包括两部分功能:

  • 根据给定的mount挂载handle。
  • 启动协程接受推流的音视频数据。

5.1.1 根据给定的mount挂载handle

参考:https://www.yuque.com/wahaha-0yfyj/mnfloz/lkxg8o?#

1.首先会根据跟定的mount挂载到handle,handle指SrsLiveStream(HTTP直播流,将RTMP转成HTTP-FLV或者其他格式)

2.推流的时候根据url创建对应的handler,拉流的时候根据url找到对应处理的handler

3.流程:

SrsLiveSource::on_publish-- handler->on_publish(this, req)? SrsServer::on_publish? SrsHttpServer::http_mount? SrsHttpStreamServer::http_mount

5.1.2挂载handle代码分析

1.主代码在SrsHttpStreamServer::http_mount开始。

the id to identify stream.

std::string sid = r->get_stream_url(); // 比如rtmp://8.141.75.248:1935/live/livestream中streamUrl就是/live/stream

SrsLiveEntry* entry = NULL; //SrsLiveEntry,直播⼊⼝,⽤来处理HTTP直播流

…..

// replace the vhost variable 替换mount,由[vhost]/[app]/[stream].flv变为__defaultVhost__/live/livestream.flv

mount = srs_string_replace(mount, "[vhost]", r->vhost);

mount = srs_string_replace(mount, "[app]", r->app);

mount = srs_string_replace(mount, "[stream]", r->stream);

// remove the default vhost mount 由__defaultVhost__/live/livestream.flv替换为/live/livestream.flv

mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");

entry = new SrsLiveEntry(mount); //创建SrsLiveEntry并标明类型,比如flv还是ts

entry->source = s; //指向source

entry->req = r->copy()->as_http();

entry->cache = new SrsBufferCache(s, r);

entry->stream = new SrsLiveStream(s, r, entry->cache); //创建SrsLiveStream,HTTP直播流,将RTMP转成HTTP-FLV或者其他格式,其实际是handler

…..

2. 调用栈:(要再确认一下,目前还不太清楚)

3.实际挂载http flv stream在SrsHttpServeMux::handle

SrsHttpServeMux:HTTP请求多路复⽤器,⾥⾯记录了path以及对应的handler.

由if ((err = mux.handle(mount, entry->stream)) != srs_success) { //挂载http-flv流

调用。

5.2. 启动协程接收推流的音视频数据

5.2.1.SrsRtmpConn::do_publishing

启动协程接收客户端推送的音视频数据。

SrsRtmpConn::stream_service_cycle

case SrsRtmpConnFMLEPublish:

return publishing(source);

? SrsRtmpConn::publishing: err = do_publishing(source, &rtrd); ? SrsRtmpConn::do_publishing

5.2.2.SrsRecvThread::do_cycle

最后执行具体接收音视频数据操作的对象是SrsRecvThread::do_cycle,调用过程如下。

5.2.3.SrsRecvThread::do_cycle

完成两步操作:

  • 处理接收到的message。
  • 然后去消费这个message。

5.2.3.1. 处理接收到的message

SrsRtmpServer::recv_message? SrsProtocol::recv_message? SrsProtocol::on_recv_message

1.从协议栈接收流式数据,解码成RTMP的message,即流式数据->chunk->message。

2.对于协议控制消息,会进行解析成packet进行相应处理。

3.SrsProtocol::recv_message

主要流程:流式数据->chunk->message,返回的是message.

从chunk->message:recv_interlaced_message。(interlaced:交错的)

4.RTMP传输时会对数据格式化为RTMP message,实际传输的时候为了更好实现多路复用,分包和信息的公平性,发送端会把message分为带message id的chunk,每个chunk可以是单独一个message,也可以是message的一部分。

5.chunk又称消息块,由四部分组成:

https://cdn.nlark.com/yuque/0/2022/png/2544508/1644394562701-ebcb9dd1-cf4a-4793-ba8d-8a07f0c7f215.png

  • basic header:标识此chunk。
  • message header:标识此chunk负载所属消息。
  • Extended Timestamp:当时间戳溢出时才出现。
  • chunk data:此chunk的payload。

6.当时间戳不溢出时,解码成RTMP的chunk分为三个步骤:

  • 解析basic header,见代码SrsProtocol::read_basic_header
  • 解析message header,见代码SrsProtocol::read_message_header
  • 解析message payload,见代码SrsProtocol::read_message_payload

7. 入口函数为srs_error_t SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)

5.2.3.2解析basic header

1.basic header可能为1,2,3字节,包含 chunk type(chunk类型)和 chunk stream id(流通道id)

  • chunk type(fmt),2bit,决定了后面message header的格式。
  • chunk stream id简写为CSID,,唯一标识特定的流通道。

https://cdn.nlark.com/yuque/0/2022/png/2544508/1644395654045-7fe71d2a-3782-46ac-8e60-5182fafbf3b7.png

2.由于chunk type固定为2bit,所以CSID长度是6bit(8bit-2bit),14bit或者22bit,RTMP协议支持用户自定义3~65599之间的CSID,其中0,1,2由协议保留为特殊信息。

  • 0表示basic header要占用2个字节,csid在64~319之间
  • 1表示basic header要占用3个字节,csid在64~65599之间
  • 2表示该chunk是控制信息和一些命令命令。

3.当basic header为1字节,CSID占6bit,csid范围在0到63之间,用户自定义范围是3到63之间。

4.当basic header为2字节时,CSID占14bit,将第1字节的后6bit置为0,第2字节存储CSID,8bit可以表示0到255,所以csid的范围是64到319,319=255+64。

5.当basic header为3字节时,CDID占22bit,将第1字节的后6bit置为1,剩下16bit表示CSID,16bit可以表示0到65535,所以csid的范围是64到65599,65599=65535+64

  1. 代码分析:

5.2.3.3.解析message header

message header 包含发送信息的描述信息。message header的格式和长度取决于 basic header的chunk type(即fmt,2bit),共有4种格式。

1.Type=0时,message header 占11个字节,其他三种数据都能表示。

  • 在chunk stream 开始的第一个chunk和头信息中时间戳回退(即值与上一个chunk小,在回退播放会出现这种情况)的时候采用这种格式。

https://cdn.nlark.com/yuque/0/2021/png/2544508/1625643064728-ee500893-5a0e-45e5-9229-d1df13e9ef1b.png

  • timestamp:占3字节,表示时间戳,单位是ms。最大值为2^24-1,即最多能播放4.6个小时,超过时置为1,去extended timestamp解析实际时间戳,extended timestamp最多能播放1193个小时。
  • message length:占3字节,表示要发送的数据的长度,比如音频或者视频数据长度。
  • message type id:占1字节,表示消息类型,比如8代表音频数据,9代表视频数据。
  • message stream id:占4字节,表示chunk所在的流id。

2.Type = 1时,message header占7字节,省去了message stream id的4字节,表示此chunk和上一个chunk所在流相同。

https://cdn.nlark.com/yuque/0/2022/png/2544508/1644399465601-1f6c8b97-1887-46e8-b936-eb30563fe6b5.png

timestamp delta:占3字节,与type=0时timestamp不同,timestamp delta存储是和上一个chunk的时间戳差值

3.Type = 2时,message header占3字节,相对于type=1省去了3字节的消息长度和1字节的消息类型,表示此chunk和上一个chunk所在流,消息长度,类型都相同,余下3字节表示timestamp delta。

image.png

4.Type = 3时,message header占0字节,表示此chunk的message header和上一个完全相同。

  • 当它跟在type=0的chunk后面,表示和前一个chunk的时间戳完全相同,即message拆分成chunk的情况。
  • 当它跟在type=1或者type=2后面,表示和前一个chunk的时间戳差值是相同的。
  • 比如第一个chunk type=0,timestamp=100,第二个chunk type=2,timestamp delta=20,表示时间戳120,第三个chunk type=3,表示timestamp delta=20,时间戳为140。

5.代码:

srs_error_t SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt)

5.2.3.4解析message payload

1.message payload即Chunk Data,

2.获取message的payload,函数为:

srs_error_t SrsProtocol::read_message_payload(SrsChunkStream* chunk, SrsCommonMessage** pmsg)

5.2.3.5协议控制消息

对于协议控制消息,会进行解析成packet进行相应处理

RTMP为协议控制消息保留消息类型id 1-7。这些消息包含RTM Chunk Stream协议或RTMP本身所需的信息。

  • id为1和2的协议消息保留给RTM Chunk Stream协议使用。
  • id为3-6的协议消息保留给RTMP使用。
  • 在边缘服务器和源服务器之间使用ID为7的协议消息。

处理的函数:srs_error_t SrsProtocol::on_recv_message(SrsCommonMessage* msg)

5.3. 消费message

1.解析完成message后,就要消费这个message,具体执行函数在SrsPublishRecvThread::consume。

2.接着调用SrsRtmpConn::handle_publish_message处理message。

3. 对应matedata、audio、video数据在SrsRtmpConn::process_publish_message函数中处理。

5.3.1. 解析onMetaData message

1.如果message type是17或者20表示命令消息,即onMetaData数据包,通常接收到的第一个媒体数据包就是onMetaData。

2.命令消息在客户机和服务器之间携带amf编码的命令。对于AMF0编码,这些消息的消息类型值为20,对于AMF3编码,消息类型值为17。

  • 这些消息被发送来执行一些操作,比如连接、createStream、发布、播放、暂停。onstatus、result等命令消息用于通知发送者所请求命令的状态。
  • 命令消息由命令名、事务ID和包含相关参数的命令对象组成。客户端或服务器可以通过使用命令消息与对等端通信的流请求远程过程调用(RPC)。

https://cdn.nlark.com/yuque/0/2022/png/2544508/1644478858788-65a1d1f1-22c6-408a-b9a0-639e38427f91.png

5.3.1. 解析onMetaData
1.如果message type的值为17或者20,那么将调用SrsRtmpServer::decode_message,SrsRtmpServer::decode_message继而调用SrsProtocol::decode_message。
2.其中SrsProtocol::do_decode_message函数会先从stream中获取command名称,然后根据command判断packet类型生成对应的packet进行解码。

3.实际解码函数是SrsOnMetaDataPacket::decode。

4.srs_amf0_read_any函数用于读取metadata携带的各项property。

5.其中SrsAmf0EcmaArray::read读取ECMA array的property内容。

5.3.2. 处理onMetaData

1.解析metadata数据后,调用SrsSource::on_meta_data函数对解析后的metadata做进一步的处理。

2.更新metadata缓存

srs_error_t SrsMetaCache::update_data(SrsMessageHeader* header, SrsOnMetaDataPacket* metadata, bool& updated)

3.如果有其他客户端订阅了该直播流,通知这些客户端,将metadata加入SrsMessageQueue队列,注:

  • 如果没有其他客户端,不会进此逻辑,即不会存储metadata
  • SrsMessageQueue队列包括onMetaData,video,audio数据

srs_error_t SrsLiveConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)

4. 如果atc为false,检测时间抖动并校正。若传入的第二个参数为 SrsRtmpJitterAlgorithmOFF,则禁止所有的 jitter 校正,构造 SrsSource 的时候默认初始化为 SrsRtmpJitterAlgorithmOFF。

srs_error_t SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgorithm ag)

5.SrsMessageQueue::enqueue具体负责将onMetaData的message插入到SrsMessageQueue队列.

srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)

6.存储到SrsFastVector的SrsMessageQueue队列。

void SrsFastVector::push_back(SrsSharedPtrMessage* msg)

7. 最后看SrsSource::on_meta_data函数的SrsOriginHub::on_meta_data函数。

8. 如果设置了forward模式,将onMetaData转发给各个forward节点,这部分等分析forward模式时再展开说明。

srs_error_t SrsOriginHub::on_meta_data(SrsSharedPtrMessage* shared_metadata, SrsOnMetaDataPacket* packet)

5.3.3. 处理video message

1.回到SrsRtmpConn::process_publish_message函数看处理video流程,SrsSource::on_video函数是具体处理video数据的逻辑。

srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)

if ((err = source->on_video(msg)) != srs_success)

srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video)

2.SrsSource::on_video_imp函数中实现功能:

  • 将msg发送到其他实体,比如转推到各个forward节点。
  • 如果有消费客户端,将共享message发送到SrsMessageQueue队列,队列包括onMetaData,video,audio。如果没有消费客户端,不会进行存储。
  • 如果是音频和视频数据,缓存gop cache。

srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg)

3.SrsConsumer::enqueue函数解析见onMetaData部分。

4.如果是音频和视频数据,缓存gop cache。

srs_error_t SrsGopCache::cache(SrsSharedPtrMessage* shared_msg)

5.3.4. 处理audio message

和video message流程处理相同。

6.拉流消息处理

6.1.拉流消息处理概述

6.1.1整体播放流程

https://cdn.nlark.com/yuque/0/2022/png/2544508/1645672985904-65bd7b09-691c-4cf8-af35-e4e23eddda11.png

6.1.2. wiresharek抓包

image.png

6.1.3. _result

拉流消息中的_result消息回复的是客户端发送过来createStream消息。

6.2.拉流客户端发送getStreamLength、play和set Buffer Length消息

拉流客户端接收到SRS服务器的_result消息后,会连续发三个消息:getStreamLength、play和set Buffer Length消息给SRS服务器。

6.2.1. getStreamLength消息

1.wiresharek截图:

https://cdn.nlark.com/yuque/0/2022/png/2544508/1645673514252-187e315f-6c3a-43ac-9c36-f93a06a25148.png

2.拉流客户端生成'getStreamLength'调用并将其发送到服务器。如果服务器知道所选流的持续时间,它将以秒为单位进行应答。

3. FFmpeg对应代码:

static int gen_get_stream_length(URLContext *s, RTMPContext *rt)

6.2.2. play消息

1.wiresharek截图

image.png

2. 由客户端向服务器发起请求从服务器端接受数据(如果传输的信息是视频的话就是请求开始播流),可以多次调用,这样本地就会形成一组数据流的接收者。注意其中有一个reset字段,表示是覆盖之前的播流(设为true)还是重新开始一路播放(设为false)。

3.play命令的结构如下:

4.FFmpeg对应代码:

static int gen_play(URLContext *s, RTMPContext *rt)

6.2.3. set Buffer Length消息

1.wiresharek截图:

image.png

2.生成客户端缓冲区时间并将其发送到服务器。

static int gen_buffer_time(URLContext *s, RTMPContext *rt)

6.4. SRS服务器发送Stream Begin、onStatus(NetStream.Play.Reset)和onStatus(NetStream.Play.Start)等消息给拉流客户端

对应函数为srs_error_t SrsRtmpServer::start_play(int stream_id)

7.从SRS服务器拉RTMP流

7.1. SrsRtmpConn::stream_service_cycle

1.客户端从SRS服务器拉流主要逻辑入口在SrsRtmpConn::stream_service_cycle:

2.如果客户端类型为SrsRtmpConnPlay表示拉流。其中:

  • http_hooks_on_play() 方法中回调 on_play() 方法通知 vhost,用户已经开始 play。
  • http_hooks_on_stop() 方法中回调 on_stop() 方法通知 vhost,用户已经停止 play。
  • 最重要的函数是 playing(source),进入该函数。

7.2. SrsRtmpConn::playing

7.3. SrsRtmpConn::do_playing

SrsRtmpConn::do_playing主要有三个功能。

  • 播放控制信息,比如结束,暂停等。
  • 从消费者列表中取出Rtmp message(SrsMessageQueue)
  • 发送message给拉流客户端。

7.3.1.播放控制信息

SrsRtmpConn::do_playing-- process_play_control_msg(consumer, msg) SrsRtmpConn::process_play_control_msg

7.3.2. 从消费者列表中取出Rtmp message

1.从SrsMessageQueue获取messages,对应函数为:

SrsLiveConsumer::dump_packets

数据实际存储在:SrsMessageQueue的SrsFastVector中,注:

RTMP推流时,通过SrsMessageQueue::enqueue函数将onMetadata、audio和video数据加入到SrsFastVector数组中。

7.3.3. 发送message给拉流客户端

1.SrsProtocol::send_and_free_messages函数负责将message发送给拉流客户端。

2.调用do_send_messages(msgs, nb_msgs)进行发送到拉流客户端。

3.发送消息。

SrsProtocol::do_iovs_send-- srs_write_large_iovs