From 6de7768e9d71ac3535969b997a1a5e9f78fdbfeb Mon Sep 17 00:00:00 2001 From: beikesong <876740535@qq.com> Date: Sun, 15 Mar 2020 16:13:54 +0800 Subject: [PATCH] For #1500, rewrite 28181 stream decoder and clean some usless codes --- trunk/src/app/srs_app_gb28181.cpp | 914 +----------------------------- trunk/src/app/srs_app_gb28181.hpp | 99 +--- 2 files changed, 29 insertions(+), 984 deletions(-) diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index b75b17ab91..b370275c3b 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -49,7 +49,6 @@ using namespace std; #include //#include - Srs28181AudioCache::Srs28181AudioCache() { dts = 0; @@ -319,6 +318,7 @@ srs_error_t Srs28181TcpStreamListener::remove_conn(Srs28181TcpStreamConn* c) return err; } + SrsLiveUdpListener::SrsLiveUdpListener(Srs28181UdpStreamListener* h, string i, int p) { handler = h; @@ -337,6 +337,7 @@ SrsLiveUdpListener::SrsLiveUdpListener(Srs28181UdpStreamListener* h, string i, i } + SrsOneCycleCoroutine::SrsOneCycleCoroutine(string n, ISrsCoroutineHandler* h, int cid) { name = n; @@ -604,13 +605,14 @@ Srs28181UdpStreamListener::~Srs28181UdpStreamListener() srs_trace("28181-udp-listener - deconstruction!"); } + static srs_utime_t DEFAULT_28181_SLEEP = 10 * SRS_UTIME_SECONDS; srs_error_t Srs28181UdpStreamListener::cycle() { srs_error_t err = srs_success; // only for testing SrsSTCoroutine self-destruction in cycle function - // should declarate lifeguard as a SrsSTCoroutine object + // before testing, we should declarate lifeguard as a SrsSTCoroutine object //srs_freep(lifeguard); //return srs_error_new(13027,"28181-udp-listener recv timeout"); //return err; @@ -674,12 +676,10 @@ srs_error_t Srs28181UdpStreamListener::listen(string i, int p) srs_error_t Srs28181UdpStreamListener::on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) { srs_error_t err = srs_success; - // using MB decoder - //int ret = streamcore->decode_packet(buf,nb_buf); - - // using TSB decoder - int ret = streamcore->decode_packet_v2(buf,nb_buf); - //srs_trace("28181 udp stream: recv size:%d", nb_buf); + + // TODO: should modify return value in future + // default 28181 stream decoder + int ret = streamcore->decode_packet(buf,nb_buf); if (ret != 0) { return srs_error_new(ret, "process 28181 udp stream"); } @@ -698,7 +698,13 @@ Srs28181StreamCore::Srs28181StreamCore(std::string suuid) // TODO: set stream_id when connected stream_id = 50125; video_id = stream_id; - boundary_type_ = MarkerBoundary;//TimestampBoundary; + + h264_sps = ""; + h264_pps = ""; + h264_sps_changed = false; + h264_pps_changed = false; + h264_sps_pps_sent = false; + cache_ = NULL; sdk = NULL; vjitter = new Srs28181Jitter(); @@ -962,12 +968,8 @@ int Srs28181StreamCore::decode_packet(char* buf, int nb_buf) if (true) { SrsBuffer stream(buf,nb_buf); - //if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { - // return ret; - //} - Srs2SRtpPacket pkt; - if ((ret = pkt.decode_v2(&stream)) != ERROR_SUCCESS) { + if ((ret = pkt.decode(&stream)) != ERROR_SUCCESS) { srs_error("28181: decode rtp packet failed. ret=%d", ret); return ret; } @@ -988,7 +990,7 @@ int Srs28181StreamCore::decode_packet(char* buf, int nb_buf) return ret; }*/ - //pengzhang: correct rtp decode bug + //besson: correct rtp decode bug if (!cache_->completed) { return ret; } @@ -1014,18 +1016,9 @@ int Srs28181StreamCore::decode_packet(char* buf, int nb_buf) // always free it. SrsAutoFree(Srs2SRtpPacket, cache_); -#ifdef PS_IN_RTP if ((status = cache_->decode_stream()) != ERROR_SUCCESS) { - if (status == ERROR_RTP_PS_HK_PRIVATE_PROTO) { - //private_proto = true; - //only mention once - srs_error(" rtp type 96 ps. stream_id:%d", stream_id); - } - } -#else - // only rtp no ps - cache_->tgtstream->append(cache_->payload->bytes(),cache_->payload->length()); -#endif + // just continue running, check nothing + } srs_error_t err = srs_success; if ((err = on_stream_packet(cache_, stream_id)) != srs_success) { @@ -1036,133 +1029,6 @@ int Srs28181StreamCore::decode_packet(char* buf, int nb_buf) return ret; } -// TODO: modify return type -int Srs28181StreamCore::decode_packet_v2(char* buf, int nb_buf) -{ - int ret = 0; - int status; - - pprint->elapse(); - - if (true) { - SrsBuffer stream(buf,nb_buf); - - /*if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { - return ret; - }*/ - - Srs2SRtpPacket pkt; - if ((ret = pkt.decode_v2(&stream, boundary_type_)) != ERROR_SUCCESS) { - srs_error("rtp auto decoder: decode rtp packet failed. ret=%d", ret); - return ret; - } - - if (pkt.chunked) { - if (!cache_) { - cache_ = new Srs2SRtpPacket(); - } - - if (boundary_type_ == MarkerBoundary) { - cache_->copy(&pkt); - cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); - } - else if (boundary_type_ == TimestampBoundary) { - - // there is two conditions: - // 1.ts changing every rtp packet - // 2.ts changing every x rtp packets - // in any case, we should first copy the cached rtp packet from last loop - // cause we use ts boundary to decode rtp group, we determinte a group end after a new group beginng - if (first_rtp_tsb_enabled_) { - first_rtp_tsb_enabled_ = false; - - if (!first_rtp_tsb_) { - srs_error("rtp auto decoder: first_rtp_tsb_ is NULL!"); - ret = ERROR_RTP_PS_FIRST_TSB_LOSS; - return ret; - //srs_assert(first_rtp_tsb_==NULL); - } - - cache_->copy(first_rtp_tsb_); - cache_->payload->append(first_rtp_tsb_->payload->bytes(), first_rtp_tsb_->payload->length()); - srs_freep(first_rtp_tsb_); - } - - if (pkt.timestamp != cache_->timestamp) { - - // if timestamp change, enable flag and cache the first new rtp packet in group - first_rtp_tsb_enabled_ = true; - - srs_freep(first_rtp_tsb_); - first_rtp_tsb_ = new Srs2SRtpPacket(); - first_rtp_tsb_->copy(&pkt); - first_rtp_tsb_->payload->append(pkt.payload->bytes(), pkt.payload->length()); - - cache_->completed = true; - } - else { - cache_->copy(&pkt); - cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); - cache_->completed = false; - } - } - else { - srs_error("Unkonown rtp boundary type!"); - } - - /* - if (!cache->completed && pprint->can_print()) { - srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtsp: rtp chunked %dB, age=%d, vt=%d/%u, sts=%u/%#x/%#x, paylod=%dB", - nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc, - cache->payload->length() - ); - return ret; - }*/ - - // correct rtp decode bug - if (!cache_->completed) { - return ret; - } - - } - else { - // bession:NOTE:if u receive a stream in the middle of rtp groups or a stream losses starting rtp packets, - // the following progress will skip this ncompleted packets - srs_freep(cache_); - cache_ = new Srs2SRtpPacket(); - cache_->reap(&pkt); - - } - } - - if (pprint->can_print()) { - srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtp #%d %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB, chunked=%d, boundary type=%s", - stream_id, nb_buf, pprint->age(), cache_->version, cache_->payload_type, cache_->sequence_number, cache_->timestamp, cache_->ssrc, - cache_->payload->length(), cache_->chunked, boundary_type_==MarkerBoundary?"MKR":"TSB" - ); - } - - // always free it. - SrsAutoFree(Srs2SRtpPacket, cache_); - - if ((status = cache_->decode_stream()) != 0) { - if (status == ERROR_RTP_PS_HK_PRIVATE_PROTO) { - //private_proto = true; - //only mention once - srs_error(" rtp type 96 ps. private proto port:%d, stream_id:%d", 0, stream_id); - } - } - - srs_error_t err = srs_success; - if ((err = on_stream_packet(cache_, stream_id)) != srs_success) { - srs_error("rtp auto decoder: process rtp packet failed. ret=%d",srs_error_code(err) ); - //invalid_rtp_num_++; - return -1; - } - - return ret; -} - srs_error_t Srs28181StreamCore::write_sequence_header() { srs_error_t err = srs_success; @@ -1377,764 +1243,28 @@ void Srs28181StreamCore::close() srs_freep(sdk); } + + Srs28181TcpStreamConn::Srs28181TcpStreamConn(Srs28181TcpStreamListener* l, srs_netfd_t fd, std::string o) { - output_template = o; - - target_tcUrl = "rtmp://127.0.0.1:7935/live/test";//"rtmp://127.0.0.1:" + "7935" + "/live/test"; - output_template = "rtmp://127.0.0.1:7935/[app]/[stream]"; - - session = ""; - // TODO: set stream_id when connected - stream_id = 50125; - video_id = stream_id; - - listener = l; - //caster = c; - stfd = fd; - skt = new SrsStSocket(); - trd = new SrsSTCoroutine("28181tcpstream", this); - - //req = NULL; - sdk = NULL; - vjitter = new Srs28181Jitter(); - ajitter = new Srs28181Jitter(); - - avc = new SrsRawH264Stream(); - aac = new SrsRawAacStream(); - acodec = new SrsRawAacStreamCodec(); - acache = new Srs28181AudioCache(); - pprint = SrsPithyPrint::create_caster(); } Srs28181TcpStreamConn::~Srs28181TcpStreamConn() { - close(); - - srs_close_stfd(stfd); - srs_freep(trd); - srs_freep(skt); - - srs_freep(sdk); - srs_freep(vjitter); - srs_freep(ajitter); - srs_freep(acodec); - srs_freep(acache); - srs_freep(pprint); } srs_error_t Srs28181TcpStreamConn::init() { srs_error_t err = srs_success; - if ((err = skt->initialize(stfd)) != srs_success) { - return srs_error_wrap(err, "socket initialize"); - } - - if ((err = trd->start()) != srs_success) { - return srs_error_wrap(err, "rtsp connection"); - } - - return err; -} - -#define SRS_RECV_BUFFER_SIZE 1024*10 -srs_error_t Srs28181TcpStreamConn::do_cycle() -{ - srs_error_t err = srs_success; - - // retrieve ip of client. - std::string ip = srs_get_peer_ip(srs_netfd_fileno(stfd)); - if (ip.empty() && !_srs_config->empty_ip_ok()) { - srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd)); - } - srs_trace("28181: serve %s", ip.c_str()); - - char buffer[SRS_RECV_BUFFER_SIZE]; - while (true) { - if ((err = trd->pull()) != srs_success) { - return srs_error_wrap(err, "28181 conn do_cycle"); - } - - ssize_t nb_read = 0; - if ((err = skt->read(buffer, SRS_RECV_BUFFER_SIZE, &nb_read)) != srs_success) { - return srs_error_wrap(err, "recv data"); - } - - decode_packet(buffer,nb_read); - } - - // make it happy return err; } -//#define GB28181_STREAM -srs_error_t Srs28181TcpStreamConn::on_rtp_packet(Srs2SRtpPacket* pkt, int stream_id) +srs_error_t Srs28181TcpStreamConn::cycle() { srs_error_t err = srs_success; - // ensure rtmp connected. - if ((err = connect()) != srs_success) { - return srs_error_wrap(err, "connect"); - } - - if (stream_id == video_id) { - // rtsp tbn is ts tbn. - int64_t pts = pkt->timestamp; - if ((err = vjitter->correct(pts)) != srs_success) { - return srs_error_wrap(err, "jitter"); - } - - // TODO: FIXME: set dts to pts, please finger out the right dts. - int64_t dts = pts; - #ifdef GB28181_STREAM - return on_rtp_video_adv(pkt,dts,pts); - #else - return on_rtp_video(pkt, dts, pts); - #endif - - } else { - // rtsp tbn is ts tbn. - int64_t pts = pkt->timestamp; - if ((err = ajitter->correct(pts)) != srs_success) { - return srs_error_wrap(err, "jitter"); - } - - return on_rtp_audio(pkt, pts); - } - return err; } - - -srs_error_t Srs28181TcpStreamConn::on_rtp_video_adv(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts) -{ - - //int ret = ERROR_SUCCESS; - srs_error_t err = srs_success; - - if (pkt->tgtstream->length() <= 0) { - return srs_error_new(13002,"tetstream not enough"); - } - - SrsBuffer stream(pkt->tgtstream->bytes(), pkt->tgtstream->length()); - /*SrsBuffer stream; - if ((ret = stream.initialize(pkt->tgtstream->bytes(), pkt->tgtstream->length()))!=ERROR_SUCCESS) { - srs_trace("h264-ps stream: inValid Frame Size, frame size=%d, dts=%d", pkt->tgtstream->length(), dts); - return ret; - }*/ - - // send each frame. - // TODO: bks: find i frame then return directory. dont need compare every bytes - while (!stream.empty()) { - char* frame = NULL; - int frame_size = 0; - - if ((err = avc->annexb_demux(&stream, &frame, &frame_size)) != srs_success) { - return srs_error_wrap(err,"annexb demux"); - } - - // for highly reliable. Only give notification but exit - if (frame_size <= 0) { - srs_warn("h264 stream: frame_size <=0, and continue for next loop!"); - continue; - } - - // ignore others. - // 5bits, 7.3.1 NAL unit syntax, - // H.264-AVC-ISO_IEC_14496-10.pdf, page 44. - // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame, 9: AUD - SrsAvcNaluType nut = (SrsAvcNaluType)(frame[0] & 0x1f); - if (nut != SrsAvcNaluTypeSPS && nut != SrsAvcNaluTypePPS //&& nut != SrsAvcNaluTypeSEI - && nut != SrsAvcNaluTypeIDR && nut != SrsAvcNaluTypeNonIDR - && nut != SrsAvcNaluTypeAccessUnitDelimiter - ) { - //srs_trace("h264-ps stream: Ignore this frame size=%d, dts=%d", frame_size, dts); - continue; - } - - // for sps - if (avc->is_sps(frame, frame_size)) { - std::string sps = ""; - if ((err = avc->sps_demux(frame, frame_size, sps)) != srs_success) { - srs_error("h264-ps: invalied sps in dts=%d",dts); - continue; - //return ret; - } - - if (h264_sps != sps) { - h264_sps = sps; - h264_sps_changed = true; - h264_sps_pps_sent = false; - srs_trace("h264-ps stream: set SPS frame size=%d, dts=%d", frame_size, dts); - } - } - - // for pps - if (avc->is_pps(frame, frame_size)) { - std::string pps = ""; - if ((err = avc->pps_demux(frame, frame_size, pps)) != srs_success) { - srs_error("h264-ps: invalied sps in dts=%d", dts); - continue; - //return ret; - } - - if (h264_pps != pps) { - h264_pps = pps; - h264_pps_changed = true; - h264_sps_pps_sent = false; - srs_trace("h264-ps stream: set PPS frame size=%d, dts=%d", frame_size, dts); - } - } - - // attention: now, we set sps/pps - if (h264_sps_changed && h264_pps_changed) { - - h264_sps_changed = false; - h264_pps_changed = false; - h264_sps_pps_sent = true; - - if ((err = write_h264_sps_pps(dts / 90, pts / 90)) != srs_success) { - srs_error("h264-ps stream: Re-write SPS-PPS Wrong! frame size=%d, dts=%d", frame_size, dts); - return srs_error_wrap(err,"re-write sps-pps failed"); - } - srs_warn("h264-ps stream: Re-write SPS-PPS Successful! frame size=%d, dts=%d", frame_size, dts); - } - - //pengzhang: make sure you control flows in one important function - //dont spread controlers everythere. mpegts_upd is not a good example - - // attention: should ship sps/pps frame in every tsb rtp group - // otherwise sps/pps will be written as ipb frame! - if (h264_sps_pps_sent && nut != SrsAvcNaluTypeSPS && nut != SrsAvcNaluTypePPS) { - if ((err = kickoff_audio_cache(pkt, dts)) != srs_success) { - srs_warn("h264-ps stream: kickoff audio cache dts=%d", dts); - return srs_error_wrap(err,"killoff audio cache failed"); - } - - // ibp frame. - // TODO: FIXME: we should group all frames to a rtmp/flv message from one ts message. - srs_info("h264-ps stream: demux avc ibp frame size=%d, dts=%d", frame_size, dts); - if ((err = write_h264_ipb_frame(frame, frame_size, dts / 90, pts / 90)) != srs_success) { - return srs_error_wrap(err,"write ibp failed"); - } - } - }//while send frame - - return err; -} - -srs_error_t Srs28181TcpStreamConn::cycle() -{ - // serve the rtsp client. - srs_error_t err = do_cycle(); - - //caster->remove(this); - if (err == srs_success) { - srs_trace("client finished."); - } else if (srs_is_client_gracefully_close(err)) { - srs_warn("client disconnect peer. code=%d", srs_error_code(err)); - srs_freep(err); - } - - listener->remove_conn(this); - - /* do not need caster anymore - if (video_rtp) { - caster->free_port(video_rtp->port(), video_rtp->port() + 1); - } - - if (audio_rtp) { - caster->free_port(audio_rtp->port(), audio_rtp->port() + 1); - }*/ - - return err; -} - -srs_error_t Srs28181TcpStreamConn::on_rtp_video(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts) -{ - srs_error_t err = srs_success; - - if ((err = kickoff_audio_cache(pkt, dts)) != srs_success) { - return srs_error_wrap(err, "kickoff audio cache"); - } - - char* bytes = pkt->payload->bytes(); - int length = pkt->payload->length(); - uint32_t fdts = (uint32_t)(dts / 90); - uint32_t fpts = (uint32_t)(pts / 90); - if ((err = write_h264_ipb_frame(bytes, length, fdts, fpts)) != srs_success) { - return srs_error_wrap(err, "write ibp frame"); - } - - return err; -} - -srs_error_t Srs28181TcpStreamConn::on_rtp_audio(Srs2SRtpPacket* pkt, int64_t dts) -{ - srs_error_t err = srs_success; - - if ((err = kickoff_audio_cache(pkt, dts)) != srs_success) { - return srs_error_wrap(err, "kickoff audio cache"); - } - - // cache current audio to kickoff. - acache->dts = dts; - acache->audio = pkt->audio; - acache->payload = pkt->payload; - - pkt->audio = NULL; - pkt->payload = NULL; - - return err; -} - -srs_error_t Srs28181TcpStreamConn::kickoff_audio_cache(Srs2SRtpPacket* pkt, int64_t dts) -{ - srs_error_t err = srs_success; - - // nothing to kick off. - if (!acache->payload) { - return err; - } - - if (dts - acache->dts > 0 && acache->audio->nb_samples > 0) { - int64_t delta = (dts - acache->dts) / acache->audio->nb_samples; - for (int i = 0; i < acache->audio->nb_samples; i++) { - char* frame = acache->audio->samples[i].bytes; - int nb_frame = acache->audio->samples[i].size; - int64_t timestamp = (acache->dts + delta * i) / 90; - acodec->aac_packet_type = 1; - if ((err = write_audio_raw_frame(frame, nb_frame, acodec, (uint32_t)timestamp)) != srs_success) { - return srs_error_wrap(err, "write audio raw frame"); - } - } - } - - acache->dts = 0; - srs_freep(acache->audio); - srs_freep(acache->payload); - - return err; -} - -// TODO: modify return type -// can decode raw rtp+h264 or rtp+ps+h264 -int Srs28181TcpStreamConn::decode_packet(char* buf, int nb_buf) -{ - int ret = 0; - int status; - - pprint->elapse(); - - if (true) { - SrsBuffer stream(buf,nb_buf); - - //if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { - // return ret; - //} - - Srs2SRtpPacket pkt; - if ((ret = pkt.decode_v2(&stream)) != ERROR_SUCCESS) { - srs_error("28181: decode rtp packet failed. ret=%d", ret); - return ret; - } - - if (pkt.chunked) { - if (!cache_) { - cache_ = new Srs2SRtpPacket(); - } - cache_->copy(&pkt); - cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); - - /* - if (!cache->completed && pprint->can_print()) { - srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtsp: rtp chunked %dB, age=%d, vt=%d/%u, sts=%u/%#x/%#x, paylod=%dB", - nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc, - cache->payload->length() - ); - return ret; - }*/ - - //pengzhang: correct rtp decode bug - if (!cache_->completed) { - return ret; - } - - } - else { - // : NOTE:if u receive from middle or stream loss starting rtp, will also deal this uncompleted packet, - // the following progress will skip this ncompleted packet - srs_freep(cache_); - cache_ = new Srs2SRtpPacket(); - cache_->reap(&pkt); - - } - } - - // TODO: set stream_id when connected - stream_id = 50125; - - if (pprint->can_print()) { - srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtp #%d %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB, chunked=%d", - stream_id, nb_buf, pprint->age(), cache_->version, cache_->payload_type, cache_->sequence_number, cache_->timestamp, cache_->ssrc, - cache_->payload->length(), cache_->chunked - ); - } - - // always free it. - SrsAutoFree(Srs2SRtpPacket, cache_); - -#ifdef PS_IN_RTP - if ((status = cache_->decode_stream()) != ERROR_SUCCESS) { - if (status == ERROR_RTP_PS_HK_PRIVATE_PROTO) { - //private_proto = true; - //only mention once - srs_error(" rtp type 96 ps. stream_id:%d", stream_id); - } - } -#endif - - srs_error_t err = srs_success; - if ((err = on_rtp_packet(cache_, stream_id)) != srs_success) { - srs_error("28181: process rtp packet failed. ret=%d", ret); - return -1; - } - - return ret; -} - -// TODO: modify return type -int Srs28181TcpStreamConn::decode_packet_v2(char* buf, int nb_buf) -{ - int ret = 0; - int status; - - pprint->elapse(); - - if (true) { - SrsBuffer stream(buf,nb_buf); - - /*if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { - return ret; - }*/ - - Srs2SRtpPacket pkt; - if ((ret = pkt.decode_v2(&stream, boundary_type_)) != ERROR_SUCCESS) { - srs_error("rtp auto decoder: decode rtp packet failed. ret=%d", ret); - return ret; - } - - if (pkt.chunked) { - if (!cache_) { - cache_ = new Srs2SRtpPacket(); - } - - if (boundary_type_ == MarkerBoundary) { - cache_->copy(&pkt); - cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); - } - else if (boundary_type_ == TimestampBoundary) { - - // there is two conditions: - // 1.ts changing every rtp packet - // 2.ts changing every x rtp packets - // in any case, we should first copy the cached rtp packet from last loop - // cause we use ts boundary to decode rtp group, we determinte a group end after a new group beginng - if (first_rtp_tsb_enabled_) { - first_rtp_tsb_enabled_ = false; - - if (!first_rtp_tsb_) { - srs_error("rtp auto decoder: first_rtp_tsb_ is NULL!"); - ret = ERROR_RTP_PS_FIRST_TSB_LOSS; - return ret; - //srs_assert(first_rtp_tsb_==NULL); - } - - cache_->copy(first_rtp_tsb_); - cache_->payload->append(first_rtp_tsb_->payload->bytes(), first_rtp_tsb_->payload->length()); - srs_freep(first_rtp_tsb_); - } - - if (pkt.timestamp != cache_->timestamp) { - - // if timestamp change, enable flag and cache the first new rtp packet in group - first_rtp_tsb_enabled_ = true; - - srs_freep(first_rtp_tsb_); - first_rtp_tsb_ = new Srs2SRtpPacket(); - first_rtp_tsb_->copy(&pkt); - first_rtp_tsb_->payload->append(pkt.payload->bytes(), pkt.payload->length()); - - cache_->completed = true; - } - else { - cache_->copy(&pkt); - cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); - cache_->completed = false; - } - } - else { - srs_error("Unkonown rtp boundary type!"); - } - - /* - if (!cache->completed && pprint->can_print()) { - srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtsp: rtp chunked %dB, age=%d, vt=%d/%u, sts=%u/%#x/%#x, paylod=%dB", - nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc, - cache->payload->length() - ); - return ret; - }*/ - - //pengzhang: correct rtp decode bug - if (!cache_->completed) { - return ret; - } - - } - else { - // pengzhang: NOTE:if u receive from middle or stream loss starting rtp, will also deal this uncompleted packet, - // the following progress will skip this ncompleted packet - srs_freep(cache_); - cache_ = new Srs2SRtpPacket(); - cache_->reap(&pkt); - - } - } - - if (pprint->can_print()) { - srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtp #%d %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB, chunked=%d, boundary type=%s", - stream_id, nb_buf, pprint->age(), cache_->version, cache_->payload_type, cache_->sequence_number, cache_->timestamp, cache_->ssrc, - cache_->payload->length(), cache_->chunked, boundary_type_==MarkerBoundary?"MKR":"TSB" - ); - } - - // always free it. - SrsAutoFree(Srs2SRtpPacket, cache_); - - if ((status = cache_->decode_stream()) != 0) { - if (status == ERROR_RTP_PS_HK_PRIVATE_PROTO) { - //private_proto = true; - //only mention once - srs_error(" rtp type 96 ps. private proto port:%d, stream_id:%d", 0, stream_id); - } - } - - srs_error_t err = srs_success; - if ((err = on_rtp_packet(cache_, stream_id)) != srs_success) { - srs_error("rtp auto decoder: process rtp packet failed. ret=%d", srs_error_code(err)); - //invalid_rtp_num_++; - return -1; - } - - return ret; -} - -srs_error_t Srs28181TcpStreamConn::write_sequence_header() -{ - srs_error_t err = srs_success; - - // use the current dts. - int64_t dts = vjitter->timestamp() / 90; - - // send video sps/pps - if ((err = write_h264_sps_pps((uint32_t)dts, (uint32_t)dts)) != srs_success) { - return srs_error_wrap(err, "write sps/pps"); - } - - // generate audio sh by audio specific config. - if (true) { - std::string sh = aac_specific_config; - - SrsFormat* format = new SrsFormat(); - SrsAutoFree(SrsFormat, format); - - if ((err = format->on_aac_sequence_header((char*)sh.c_str(), (int)sh.length())) != srs_success) { - return srs_error_wrap(err, "on aac sequence header"); - } - - SrsAudioCodecConfig* dec = format->acodec; - - acodec->sound_format = SrsAudioCodecIdAAC; - acodec->sound_type = (dec->aac_channels == 2)? SrsAudioChannelsStereo : SrsAudioChannelsMono; - acodec->sound_size = SrsAudioSampleBits16bit; - acodec->aac_packet_type = 0; - - static int srs_aac_srates[] = { - 96000, 88200, 64000, 48000, - 44100, 32000, 24000, 22050, - 16000, 12000, 11025, 8000, - 7350, 0, 0, 0 - }; - switch (srs_aac_srates[dec->aac_sample_rate]) { - case 11025: - acodec->sound_rate = SrsAudioSampleRate11025; - break; - case 22050: - acodec->sound_rate = SrsAudioSampleRate22050; - break; - case 44100: - acodec->sound_rate = SrsAudioSampleRate44100; - break; - default: - break; - }; - - if ((err = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), acodec, (uint32_t)dts)) != srs_success) { - return srs_error_wrap(err, "write audio raw frame"); - } - } - - return err; -} - -srs_error_t Srs28181TcpStreamConn::write_h264_sps_pps(uint32_t dts, uint32_t pts) -{ - srs_error_t err = srs_success; - - // h264 raw to h264 packet. - std::string sh; - if ((err = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != srs_success) { - return srs_error_wrap(err, "mux sequence header"); - } - - // h264 packet to flv packet. - int8_t frame_type = SrsVideoAvcFrameTypeKeyFrame; - int8_t avc_packet_type = SrsVideoAvcFrameTraitSequenceHeader; - char* flv = NULL; - int nb_flv = 0; - if ((err = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { - return srs_error_wrap(err, "mux avc to flv"); - } - - // the timestamp in rtmp message header is dts. - uint32_t timestamp = dts; - if ((err = rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv)) != srs_success) { - return srs_error_wrap(err, "write packet"); - } - - return err; -} - -srs_error_t Srs28181TcpStreamConn::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) -{ - srs_error_t err = srs_success; - - // 5bits, 7.3.1 NAL unit syntax, - // ISO_IEC_14496-10-AVC-2003.pdf, page 44. - // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame - SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); - - // for IDR frame, the frame is keyframe. - SrsVideoAvcFrameType frame_type = SrsVideoAvcFrameTypeInterFrame; - if (nal_unit_type == SrsAvcNaluTypeIDR) { - frame_type = SrsVideoAvcFrameTypeKeyFrame; - } - - std::string ibp; - if ((err = avc->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { - return srs_error_wrap(err, "mux ibp frame"); - } - - int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU; - char* flv = NULL; - int nb_flv = 0; - if ((err = avc->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { - return srs_error_wrap(err, "mux avc to flv"); - } - - // the timestamp in rtmp message header is dts. - uint32_t timestamp = dts; - return rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv); -} - -srs_error_t Srs28181TcpStreamConn::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) -{ - srs_error_t err = srs_success; - - char* data = NULL; - int size = 0; - if ((err = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) { - return srs_error_wrap(err, "mux aac to flv"); - } - - return rtmp_write_packet(SrsFrameTypeAudio, dts, data, size); -} - -srs_error_t Srs28181TcpStreamConn::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) -{ - srs_error_t err = srs_success; - - if ((err = connect()) != srs_success) { - return srs_error_wrap(err, "connect"); - } - - SrsSharedPtrMessage* msg = NULL; - - if ((err = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != srs_success) { - return srs_error_wrap(err, "create message"); - } - srs_assert(msg); - - // send out encoded msg. - if ((err = sdk->send_and_free_message(msg)) != srs_success) { - close(); - return srs_error_wrap(err, "write message"); - } - - return err; -} - -srs_error_t Srs28181TcpStreamConn::connect() -{ - srs_error_t err = srs_success; - - // Ignore when connected. - if (sdk) { - return err; - } - - // generate rtmp url to connect to. - std::string url; - //if (!req) { - if(target_tcUrl != ""){ - std::string schema, host, vhost, app, param; - int port; - srs_discovery_tc_url(target_tcUrl, schema, host, vhost, app, stream_name, port, param); - - // generate output by template. - std::string output = output_template; - output = srs_string_replace(output, "[app]", app); - output = srs_string_replace(output, "[stream]", stream_name); - url = output; - } - - srs_trace("28181 stream - target_tcurl:%s,stream_name:%s, url:%s", - target_tcUrl.c_str(),stream_name.c_str(),url.c_str()); - - // connect host. - srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; - srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; - sdk = new SrsSimpleRtmpClient(url, cto, sto); - - if ((err = sdk->connect()) != srs_success) { - close(); - return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto)); - } - - // publish. - if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) { - close(); - return srs_error_wrap(err, "publish %s failed", url.c_str()); - } - - return write_sequence_header(); -} - -void Srs28181TcpStreamConn::close() -{ - srs_freep(sdk); -} \ No newline at end of file diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index ae16b70968..7c6f8a7b19 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -281,7 +281,7 @@ class Srs28181Jitter virtual srs_error_t correct(int64_t& ts); }; -// 28181 stream core functions +// 28181 stream core class Srs28181StreamCore { private: @@ -319,29 +319,18 @@ class Srs28181StreamCore Srs28181AudioCache* acache; private: // this param group using on rtp packet decode - + int stream_id; Srs2SRtpPacket* cache_; - - // the timestamp of a rtp group - uint32_t group_timestamp; - // true says using timestamp boundary - bool first_rtp_tsb_enabled_; - // first rtp with new timestamp in a rtp group - Srs2SRtpPacket * first_rtp_tsb_; - - // indicates rtp group boundary decode type: marker or timestamp - int boundary_type_; public: Srs28181StreamCore(std::string suuid); virtual ~Srs28181StreamCore(); public: - // decode rtp using MB boundary + // decode rtp virtual int decode_packet(char* buf, int nb_buf); - // decode rtp using TSB/MB boundary - virtual int decode_packet_v2(char* buf, int nb_buf); + // internal methods public: virtual srs_error_t on_stream_packet(Srs2SRtpPacket* pkt, int stream_id); @@ -373,89 +362,15 @@ class Srs28181TcpStreamConn : public ISrsCoroutineHandler std::string target_tcUrl; std::string stream_name; SrsPithyPrint* pprint; -private: - std::string session; - // video stream. - int video_id; - std::string video_codec; - // audio stream. - int audio_id; - std::string audio_codec; - int audio_sample_rate; - int audio_channel; -private: - srs_netfd_t stfd; - SrsStSocket* skt; - Srs28181TcpStreamListener* listener; - SrsCoroutine* trd; -private: - //SrsRequest* req; - SrsSimpleRtmpClient* sdk; - Srs28181Jitter* vjitter; - Srs28181Jitter* ajitter; -private: - SrsRawH264Stream* avc; - std::string h264_sps; - std::string h264_pps; - bool h264_sps_changed; - bool h264_pps_changed; - bool h264_sps_pps_sent; -private: - SrsRawAacStream* aac; - SrsRawAacStreamCodec* acodec; - std::string aac_specific_config; - Srs28181AudioCache* acache; -private: - // this param group using on rtp packet decode - - int stream_id; - - Srs2SRtpPacket* cache_; - - // the timestamp of a rtp group - uint32_t group_timestamp; - // if timestamp boundary flag enabled - // true says using rtp timestamp decode rtp group - bool first_rtp_tsb_enabled_; - // first rtp with new timestamp in a rtp group - Srs2SRtpPacket * first_rtp_tsb_; - - // indicates rtp group boundary decode type: marker or timestamp - int boundary_type_; public: Srs28181TcpStreamConn(Srs28181TcpStreamListener* l, srs_netfd_t fd, std::string o); virtual ~Srs28181TcpStreamConn(); -public: - virtual srs_error_t init(); -private: - virtual srs_error_t do_cycle(); -private: - // decode rtp using MB boundary - virtual int decode_packet(char* buf, int nb_buf); - // decode rtp using TSB/MB boundary - virtual int decode_packet_v2(char* buf, int nb_buf); -// internal methods public: - virtual srs_error_t on_rtp_packet(Srs2SRtpPacket* pkt, int stream_id); - virtual srs_error_t on_rtp_video_adv(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts); -// Interface ISrsOneCycleThreadHandler + srs_error_t init(); + public: virtual srs_error_t cycle(); -private: - virtual srs_error_t on_rtp_video(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts); - virtual srs_error_t on_rtp_audio(Srs2SRtpPacket* pkt, int64_t dts); - virtual srs_error_t kickoff_audio_cache(Srs2SRtpPacket* pkt, int64_t dts); -private: - virtual srs_error_t write_sequence_header(); - virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts); - virtual srs_error_t write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts); - virtual srs_error_t write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts); - virtual srs_error_t rtmp_write_packet(char type, uint32_t timestamp, char* data, int size); -private: - // Connect to RTMP server. - virtual srs_error_t connect(); - // Close the connection to RTMP server. - virtual void close(); + }; #endif \ No newline at end of file