流媒体分析之srt 协议srs 服务器数据收发

根据上一章节的分析,我们知道 SRS 会为每条 SRT 链路创建一个 SrsMpegtsSrtConn 对象,并由 SrsMpegtsSrtConn 调用 do_cycle 函数。do_cycle 函数针对推流和拉流调用不同的处理函数:推流由 publishing 处理,拉流由 playing 处理。

1. 分析推流数据处理流程。


/**
 * Entry point of a SRT publish (push) session on this connection.
 *
 * Call order:
 *   1. SrsStatistic::on_client() with SrsSrtConnPublish.
 *   2. http_hooks_on_publish().
 *   3. acquire_publish(); only when it succeeds, do_publishing() is run and
 *      release_publish() is called afterwards.
 *   4. http_hooks_on_unpublish(), reached whenever steps 1 and 2 succeeded,
 *      regardless of the result of step 3.
 *
 * @return srs_success, a wrapped error when step 1 or 2 fails, or the
 *         (unwrapped) error produced by acquire_publish()/do_publishing().
 */
srs_error_t SrsMpegtsSrtConn::publishing()
{
    srs_error_t err = srs_success;

    // We must do stat the client before hooks, because hooks depends on it.
    SrsStatistic* stat = SrsStatistic::instance();
    if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPublish)) != srs_success) {
        return srs_error_wrap(err, "srt: stat client");
    }

    // We must do hook after stat, because depends on it.
    if ((err = http_hooks_on_publish()) != srs_success) {
        return srs_error_wrap(err, "srt: callback on publish");
    }

    // Publish only when the acquire succeeds; the release is paired with it.
    if ((err = acquire_publish()) == srs_success) {
        err = do_publishing();
        release_publish();
    }

    // Paired with the successful http_hooks_on_publish() above.
    http_hooks_on_unpublish();

    return err;
}

do_publishing 函数循环调用 srt_conn_->read 函数接收推流数据,

每次读到的数据都交给 on_srt_packet 函数处理。

/**
 * Receive loop of a SRT publish session.
 *
 * Each iteration: checks trd_->pull() for a quit/error, optionally prints
 * transport and bitrate statistics (gated by SrsPithyPrint::can_print()),
 * reads one message from srt_conn_ into a 1500-byte stack buffer and passes
 * it to on_srt_packet().
 *
 * The loop has no normal exit; the function returns only with a wrapped
 * error from trd_->pull(), srt_conn_->read() or on_srt_packet().
 *
 * @return a wrapped error describing why the loop stopped.
 */
srs_error_t SrsMpegtsSrtConn::do_publishing()
{
    srs_error_t err = srs_success;

    SrsPithyPrint* pprint = SrsPithyPrint::create_srt_publish();
    // NOTE(review): SrsAutoFree presumably releases pprint at scope exit - confirm against its definition.
    SrsAutoFree(SrsPithyPrint, pprint);

    // Number of packets received since the last statistics print.
    int nb_packets = 0;

    // Max udp packet size equal to 1500.
    char buf[1500];
    while (true) {
        if ((err = trd_->pull()) != srs_success) {
            return srs_error_wrap(err, "srt: thread quit");
        }

        pprint->elapse();
        if (pprint->can_print()) {
            SrsSrtStat s;
            if ((err = s.fetch(srt_fd_, true)) != srs_success) {
                // Statistics are best-effort: drop the error and keep receiving.
                srs_freep(err);
            } else {
                srs_trace("<- " SRS_CONSTS_LOG_SRT_PUBLISH " Transport Stats # pktRecv=%" PRId64 ", pktRcvLoss=%d, pktRcvRetrans=%d, pktRcvDrop=%d",
                    s.pktRecv(), s.pktRcvLoss(), s.pktRcvRetrans(), s.pktRcvDrop());
            }

            kbps_->sample();

            srs_trace("<- " SRS_CONSTS_LOG_SRT_PUBLISH " time=%" PRId64 ", packets=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
                srsu2ms(pprint->age()), nb_packets, kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(),
                kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m());
            nb_packets = 0;
        }

        ssize_t nb = 0;
        if ((err = srt_conn_->read(buf, sizeof(buf), &nb)) != srs_success) {
            return srs_error_wrap(err, "srt: recvmsg");
        }

        ++nb_packets;

        if ((err = on_srt_packet(buf, nb)) != srs_success) {
            return srs_error_wrap(err, "srt: process packet");
        }
    }

    return err;
}

on_srt_packet 函数先校验数据(长度必须是 SRS_TS_PACKET_SIZE 的整数倍,且首字节必须为 0x47),然后创建 SrsSrtPacket 对象包装数据,并调用 srt_source_->on_packet。

/**
 * Validate one received SRT payload and hand it to the SRT source.
 *
 * @param buf    Payload bytes received from the SRT connection.
 * @param nb_buf Number of valid bytes in buf; values <= 0 are silently ignored.
 * @return srs_success when the payload is ignored or accepted;
 *         ERROR_SRT_CONN when the length is not a multiple of
 *         SRS_TS_PACKET_SIZE or the first byte is not 0x47; otherwise the
 *         wrapped error returned by srt_source_->on_packet().
 */
srs_error_t SrsMpegtsSrtConn::on_srt_packet(char* buf, int nb_buf)
{
    srs_error_t err = srs_success;

    // Ignore if invalid length.
    if (nb_buf <= 0) {
        return err;
    }

    // Check srt payload, mpegts must be N times of SRS_TS_PACKET_SIZE
    if ((nb_buf % SRS_TS_PACKET_SIZE) != 0) {
        return srs_error_new(ERROR_SRT_CONN, "invalid ts packet len=%d", nb_buf);
    }

    // Check srt payload, the first byte must be 0x47
    if (buf[0] != 0x47) {
        return srs_error_new(ERROR_SRT_CONN, "invalid ts packet first=%#x", (uint8_t)buf[0]);
    }

    SrsSrtPacket* packet = new SrsSrtPacket();
    // NOTE(review): SrsAutoFree presumably deletes packet at scope exit - confirm against its definition.
    SrsAutoFree(SrsSrtPacket, packet);
    packet->wrap(buf, nb_buf);

    if ((err = srt_source_->on_packet(packet)) != srs_success) {
        return srs_error_wrap(err, "on srt packet");
    }

    return err;
}

on_packet:把推流收到的 SRT 数据包复制后,通过 consumer->enqueue 分发给每个消费者(consumer);

bridge_->on_packet:把 SRT 数据包交给 bridge_,用于将 SRT 流转换为其他协议的流数据。

/**
 * Distribute one packet to every consumer and then to the bridge.
 *
 * Each consumer is given the result of packet->copy(); the bridge is given
 * the original packet pointer.
 *
 * @param packet The packet to distribute.
 * @return srs_success, or a wrapped error from the first consumer->enqueue()
 *         or from bridge_->on_packet() that fails. Distribution stops at the
 *         first failure.
 */
srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet)
{
    srs_error_t err = srs_success;

    // The size is re-read on every iteration, exactly like the indexed for-loop form.
    int index = 0;
    while (index < (int)consumers.size()) {
        SrsSrtConsumer* target = consumers.at(index);

        err = target->enqueue(packet->copy());
        if (err != srs_success) {
            return srs_error_wrap(err, "consume ts packet");
        }

        index++;
    }

    err = bridge_->on_packet(packet);
    if (err != srs_success) {
        return srs_error_wrap(err, "bridge consume message");
    }

    return err;
}

2. 分析 SRS 服务器对 SRT 拉流的处理流程。

/**
 * Entry point of a SRT play (pull) session on this connection.
 *
 * Call order:
 *   1. SrsStatistic::on_client() with SrsSrtConnPlay.
 *   2. http_hooks_on_play().
 *   3. do_playing().
 *   4. http_hooks_on_stop(), reached whenever steps 1 and 2 succeeded.
 *
 * The error contexts use the "srt:" prefix, matching publishing().
 *
 * @return a wrapped error when step 1 or 2 fails, otherwise whatever
 *         do_playing() returned.
 */
srs_error_t SrsMpegtsSrtConn::playing()
{
    srs_error_t err = srs_success;

    // We must do stat the client before hooks, because hooks depends on it.
    SrsStatistic* stat = SrsStatistic::instance();
    if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPlay)) != srs_success) {
        return srs_error_wrap(err, "srt: stat client");
    }

    // We must do hook after stat, because depends on it.
    if ((err = http_hooks_on_play()) != srs_success) {
        return srs_error_wrap(err, "srt: callback on play");
    }

    err = do_playing();

    // Paired with the successful http_hooks_on_play() above.
    http_hooks_on_stop();

    return err;
}

 do_playing() 函数:

consumer->dump_packet(&pkt):每个拉流消费者从自己的队列中取出数据;

srt_conn_->write:把取出的数据发送给拉流端。

/**
 * Send loop of a SRT play session.
 *
 * Setup: creates a consumer on srt_source_, calls consumer_dumps() for it,
 * and starts a SrsSrtRecvThread on srt_conn_.
 *
 * Each iteration: checks trd_->pull() and srt_recv_trd.get_recv_err(), takes
 * one packet from the consumer (waiting via consumer->wait() and retrying when
 * none is available), optionally prints transport and bitrate statistics,
 * and writes the packet to srt_conn_.
 *
 * The loop has no normal exit; the function returns only with a wrapped
 * error from one of the calls above.
 *
 * @return a wrapped error describing why setup failed or the loop stopped.
 */
srs_error_t SrsMpegtsSrtConn::do_playing()
{
    srs_error_t err = srs_success;

    SrsSrtConsumer* consumer = NULL;
    // NOTE(review): SrsAutoFree presumably releases consumer at scope exit - confirm against its definition.
    SrsAutoFree(SrsSrtConsumer, consumer);
    if ((err = srt_source_->create_consumer(consumer)) != srs_success) {
        return srs_error_wrap(err, "create consumer, ts source=%s", req_->get_stream_url().c_str());
    }
    srs_assert(consumer);

    // TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames.
    if ((err = srt_source_->consumer_dumps(consumer)) != srs_success) {
        return srs_error_wrap(err, "dumps consumer, url=%s", req_->get_stream_url().c_str());
    }

    SrsPithyPrint* pprint = SrsPithyPrint::create_srt_play();
    SrsAutoFree(SrsPithyPrint, pprint);

    SrsSrtRecvThread srt_recv_trd(srt_conn_);
    if ((err = srt_recv_trd.start()) != srs_success) {
        return srs_error_wrap(err, "start srt recv trd");
    }

    // Number of packets sent since the last statistics print.
    int nb_packets = 0;

    while (true) {
        if ((err = trd_->pull()) != srs_success) {
            return srs_error_wrap(err, "srt play thread");
        }

        if ((err = srt_recv_trd.get_recv_err()) != srs_success) {
            return srs_error_wrap(err, "srt play recv thread");
        }

        // Wait for amount of packets.
        SrsSrtPacket* pkt = NULL;
        SrsAutoFree(SrsSrtPacket, pkt);
        consumer->dump_packet(&pkt);
        if (!pkt) {
            // TODO: FIXME: We should check the quit event.
            consumer->wait(1, 1000 * SRS_UTIME_MILLISECONDS);
            continue;
        }

        ++nb_packets;

        // reportable
        pprint->elapse();
        if (pprint->can_print()) {
            SrsSrtStat s;
            if ((err = s.fetch(srt_fd_, true)) != srs_success) {
                // Statistics are best-effort: drop the error and keep sending.
                srs_freep(err);
            } else {
                srs_trace("-> " SRS_CONSTS_LOG_SRT_PLAY " Transport Stats # pktSent=%" PRId64 ", pktSndLoss=%d, pktRetrans=%d, pktSndDrop=%d",
                    s.pktSent(), s.pktSndLoss(), s.pktRetrans(), s.pktSndDrop());
            }

            kbps_->sample();

            srs_trace("-> " SRS_CONSTS_LOG_SRT_PLAY " time=%" PRId64 ", packets=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
                srsu2ms(pprint->age()), nb_packets, kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(),
                kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m());
            nb_packets = 0;
        }

        ssize_t nb_write = 0;
        if ((err = srt_conn_->write(pkt->data(), pkt->size(), &nb_write)) != srs_success) {
            return srs_error_wrap(err, "srt send, size=%d", pkt->size());
        }

        // Yield to another coroutines.
        // @see https://github.com/ossrs/srs/issues/2194#issuecomment-777542162
        // TODO: FIXME: Please check whether SRT sendmsg causing clock deviation, see srs_thread_yield of SrsUdpMuxSocket::sendto
    }

    return err;
}

 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部