From a3be0c8afae35af30f54149a3abec4fcc108ced8 Mon Sep 17 00:00:00 2001 From: beikesong <876740535@qq.com> Date: Sat, 14 Mar 2020 23:21:23 +0800 Subject: [PATCH] For #1500, move all 28181 codes to new files --- trunk/src/app/srs_app_gb28181.cpp | 204 +++--------- trunk/src/app/srs_app_gb28181.hpp | 82 ++--- trunk/src/app/srs_app_http_api.cpp | 4 + trunk/src/app/srs_app_rtsp.cpp | 22 +- trunk/src/app/srs_app_rtsp.hpp | 5 +- trunk/src/app/srs_app_server.cpp | 265 +-------------- trunk/src/app/srs_app_server.hpp | 83 ----- trunk/src/protocol/srs_rtsp_stack.cpp | 446 +------------------------- trunk/src/protocol/srs_rtsp_stack.hpp | 88 +---- 9 files changed, 106 insertions(+), 1093 deletions(-) diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 425d1f8c08..7d4d303e5f 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -28,7 +28,7 @@ using namespace std; #include #include -#include +#include #include #include #include @@ -49,6 +49,7 @@ using namespace std; #include //#include + Srs28181AudioCache::Srs28181AudioCache() { dts = 0; @@ -115,8 +116,6 @@ Srs28181StreamServer::Srs28181StreamServer(SrsConfDirective* c) output = _srs_config->get_stream_caster_output(c); local_port_min = _srs_config->get_stream_caster_rtp_port_min(c); local_port_max = _srs_config->get_stream_caster_rtp_port_max(c); - //trd = new SrsSTCoroutine(); - //trd->start(); } Srs28181StreamServer::~Srs28181StreamServer() @@ -252,21 +251,6 @@ Srs28181TcpStreamListener::Srs28181TcpStreamListener() listener = NULL; } -//(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t) -/* -Srs28181TcpStreamListener::Srs28181TcpStreamListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) -{ - listener = NULL; - - // the caller already ensure the type is ok, - // we just assert here for unknown stream caster. - srs_assert(type == SrsListenerGB28181TcpStream); - if (type == SrsListenerGB28181TcpStream) { - caster = new Srs28181Caster(c); - } -} -*/ - Srs28181TcpStreamListener::~Srs28181TcpStreamListener() { std::vector::iterator it; @@ -284,10 +268,6 @@ srs_error_t Srs28181TcpStreamListener::listen(string i, int p) { srs_error_t err = srs_success; - // the caller already ensure the type is ok, - // we just assert here for unknown stream caster. - //srs_assert(type == SrsListenerRtsp); - std::string ip = i; int port = p; @@ -422,7 +402,7 @@ void SrsOneCycleCoroutine::stop() void* res = NULL; int r0 = st_thread_join((st_thread_t)trd, &res); // will confirm with winlin - // i think it is not nessary + // i think it is not nessary in one cycle thread //srs_assert(!r0); srs_error_t err_res = (srs_error_t)res; @@ -455,11 +435,6 @@ void SrsOneCycleCoroutine::interrupt() st_thread_interrupt((st_thread_t)trd); } -// srs_error_t SrsOneCycleCoroutine::pull() -// { -// return srs_error_copy(trd_err); -// } - int SrsOneCycleCoroutine::cid() { return context; @@ -477,7 +452,7 @@ srs_error_t SrsOneCycleCoroutine::cycle() srs_error_t err = handler->cycle(); // Set cycle done, no need to interrupt it. - // besson: i think we should set cycle_down here + // besson: in one cycle thread, i think we should set cycle_down here // we don't need to interrupt or stop a thread if it return from cycle function anymore cycle_done = true; @@ -497,7 +472,7 @@ void* SrsOneCycleCoroutine::pfn(void* arg) srs_error_t err = p->cycle(); - // besson: should exit here in OneCyleCoroutine + // besson: it should exit here in OneCyleCoroutine st_thread_exit(NULL); // Set the err for function pull to fetch it. @@ -513,12 +488,6 @@ void* SrsOneCycleCoroutine::pfn(void* arg) } - - - - - - SrsLiveUdpListener::~SrsLiveUdpListener() { trd->stop(); @@ -627,7 +596,6 @@ Srs28181UdpStreamListener::Srs28181UdpStreamListener(Srs28181StreamServer* srv, listener = NULL; streamcore = new Srs28181StreamCore(suuid); lifeguard = NULL; - //workdone = true; } Srs28181UdpStreamListener::~Srs28181UdpStreamListener() @@ -659,6 +627,7 @@ srs_error_t Srs28181UdpStreamListener::cycle() { srs_warn("28181-udp-listener - recv timeout. we will release this listener[%d-%d]", DEFAULT_28181_SLEEP,nb_packet); + server->release_listener(this); return srs_error_new(13027,"28181-udp-listener recv timeout"); } @@ -681,7 +650,6 @@ srs_error_t Srs28181UdpStreamListener::listen(string i, int p) port = p; srs_freep(listener); - //listener = new SrsUdpListener(this, ip, port); listener = new SrsLiveUdpListener(this, ip, port); if ((err = listener->listen()) != srs_success) { @@ -698,6 +666,7 @@ srs_error_t Srs28181UdpStreamListener::listen(string i, int p) if ((err = lifeguard->start()) != srs_success) { return srs_error_wrap(err, "start thread"); } + //string v = srs_listener_type2string(type); srs_trace("%s listen 28181 stream at udp://%s:%d, fd=%d", "v.c_str()", ip.c_str(), port, listener->fd()); @@ -721,10 +690,9 @@ srs_error_t Srs28181UdpStreamListener::on_udp_packet(const sockaddr* from, const } -Srs28181StreamCore::Srs28181StreamCore(std::string suuid)//(Srs28181TcpStreamListener* l, std::string o) +Srs28181StreamCore::Srs28181StreamCore(std::string suuid) { - //output_template = o; - + // TODO: may rewrite stream name formation target_tcUrl = "rtmp://127.0.0.1:7935/live/"+suuid;//"rtmp://127.0.0.1:" + "7935" + "/live/test"; output_template = "rtmp://127.0.0.1:7935/[app]/[stream]"; @@ -734,7 +702,6 @@ Srs28181StreamCore::Srs28181StreamCore(std::string suuid)//(Srs28181TcpStreamLis video_id = stream_id; boundary_type_ = MarkerBoundary;//TimestampBoundary; - //req = NULL; sdk = NULL; vjitter = new Srs28181Jitter(); ajitter = new Srs28181Jitter(); @@ -759,7 +726,7 @@ Srs28181StreamCore::~Srs28181StreamCore() } #define GB28181_STREAM -srs_error_t Srs28181StreamCore::on_stream_packet(SrsRtpPacket* pkt, int stream_id) +srs_error_t Srs28181StreamCore::on_stream_packet(Srs2SRtpPacket* pkt, int stream_id) { srs_error_t err = srs_success; @@ -797,7 +764,7 @@ srs_error_t Srs28181StreamCore::on_stream_packet(SrsRtpPacket* pkt, int stream_i } -srs_error_t Srs28181StreamCore::on_stream_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts) +srs_error_t Srs28181StreamCore::on_stream_video(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts) { //int ret = ERROR_SUCCESS; @@ -846,14 +813,6 @@ srs_error_t Srs28181StreamCore::on_stream_video(SrsRtpPacket* pkt, int64_t dts, continue; } - char sc[3]; - sc[0] = (char)0x00; - sc[1] = (char)0x00; - sc[2] = (char)0x01; - stream2file("./h264_wframe.h264", sc, 3); - stream2file("./h264_wframe.h264", frame, frame_size); - stream2file("./h264_nosc.h264", frame, frame_size); - // for sps if (avc->is_sps(frame, frame_size)) { std::string sps = ""; @@ -902,7 +861,7 @@ srs_error_t Srs28181StreamCore::on_stream_video(SrsRtpPacket* pkt, int64_t dts, srs_warn("h264-ps stream: Re-write SPS-PPS Successful! frame size=%d, dts=%d", frame_size, dts); } - //pengzhang: make sure you control flows in one important function + //besson: make sure you control flows in one important function //dont spread controlers everythere. mpegts_upd is not a good example // attention: should ship sps/pps frame in every tsb rtp group @@ -925,36 +884,7 @@ srs_error_t Srs28181StreamCore::on_stream_video(SrsRtpPacket* pkt, int64_t dts, return err; } -/* -srs_error_t Srs28181StreamCore::cycle() -{ - // serve the rtsp client. - srs_error_t err = do_cycle(); - - //caster->remove(this); - if (err == srs_success) { - srs_trace("client finished."); - } else if (srs_is_client_gracefully_close(err)) { - srs_warn("client disconnect peer. code=%d", srs_error_code(err)); - srs_freep(err); - } - - listener->remove_conn(this); - - // do not need caster anymore - // if (video_rtp) { - // caster->free_port(video_rtp->port(), video_rtp->port() + 1); - // } - - // if (audio_rtp) { - // caster->free_port(audio_rtp->port(), audio_rtp->port() + 1); - // } - - return err; -} -*/ - -srs_error_t Srs28181StreamCore::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts) +srs_error_t Srs28181StreamCore::on_rtp_video(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts) { srs_error_t err = srs_success; @@ -973,7 +903,7 @@ srs_error_t Srs28181StreamCore::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int return err; } -srs_error_t Srs28181StreamCore::on_rtp_audio(SrsRtpPacket* pkt, int64_t dts) +srs_error_t Srs28181StreamCore::on_rtp_audio(Srs2SRtpPacket* pkt, int64_t dts) { srs_error_t err = srs_success; @@ -992,7 +922,7 @@ srs_error_t Srs28181StreamCore::on_rtp_audio(SrsRtpPacket* pkt, int64_t dts) return err; } -srs_error_t Srs28181StreamCore::kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts) +srs_error_t Srs28181StreamCore::kickoff_audio_cache(Srs2SRtpPacket* pkt, int64_t dts) { srs_error_t err = srs_success; @@ -1031,8 +961,6 @@ int Srs28181StreamCore::decode_packet(char* buf, int nb_buf) pprint->elapse(); - stream2file("rtp.mp4",buf,nb_buf); - if (true) { SrsBuffer stream(buf,nb_buf); @@ -1040,7 +968,7 @@ int Srs28181StreamCore::decode_packet(char* buf, int nb_buf) // return ret; //} - SrsRtpPacket pkt; + Srs2SRtpPacket pkt; if ((ret = pkt.decode_v2(&stream)) != ERROR_SUCCESS) { srs_error("28181: decode rtp packet failed. ret=%d", ret); return ret; @@ -1048,7 +976,7 @@ int Srs28181StreamCore::decode_packet(char* buf, int nb_buf) if (pkt.chunked) { if (!cache_) { - cache_ = new SrsRtpPacket(); + cache_ = new Srs2SRtpPacket(); } cache_->copy(&pkt); cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); @@ -1072,7 +1000,7 @@ int Srs28181StreamCore::decode_packet(char* buf, int nb_buf) // : NOTE:if u receive from middle or stream loss starting rtp, will also deal this uncompleted packet, // the following progress will skip this ncompleted packet srs_freep(cache_); - cache_ = new SrsRtpPacket(); + cache_ = new Srs2SRtpPacket(); cache_->reap(&pkt); } @@ -1086,11 +1014,9 @@ int Srs28181StreamCore::decode_packet(char* buf, int nb_buf) } // always free it. - SrsAutoFree(SrsRtpPacket, cache_); + SrsAutoFree(Srs2SRtpPacket, cache_); #ifdef PS_IN_RTP - stream2file("./ps.ps",cache_->payload->bytes(), cache_->payload->length()); - // ps stream if ((status = cache_->decode_stream()) != ERROR_SUCCESS) { if (status == ERROR_RTP_PS_HK_PRIVATE_PROTO) { //private_proto = true; @@ -1103,11 +1029,6 @@ int Srs28181StreamCore::decode_packet(char* buf, int nb_buf) cache_->tgtstream->append(cache_->payload->bytes(),cache_->payload->length()); #endif - stream2file("./h264.h264",cache_->tgtstream->bytes(),cache_->tgtstream->length()); - return 0; - // temporarily return on testing - //return ret; - srs_error_t err = srs_success; if ((err = on_stream_packet(cache_, stream_id)) != srs_success) { srs_error("28181: process rtp packet failed. ret=%d",err->error_code(err) ); @@ -1125,8 +1046,6 @@ int Srs28181StreamCore::decode_packet_v2(char* buf, int nb_buf) pprint->elapse(); - stream2file("rtp.mp4", buf, nb_buf); - if (true) { SrsBuffer stream(buf,nb_buf); @@ -1134,7 +1053,7 @@ int Srs28181StreamCore::decode_packet_v2(char* buf, int nb_buf) return ret; }*/ - SrsRtpPacket pkt; + Srs2SRtpPacket pkt; if ((ret = pkt.decode_v2(&stream, boundary_type_)) != ERROR_SUCCESS) { srs_error("rtp auto decoder: decode rtp packet failed. ret=%d", ret); return ret; @@ -1142,7 +1061,7 @@ int Srs28181StreamCore::decode_packet_v2(char* buf, int nb_buf) if (pkt.chunked) { if (!cache_) { - cache_ = new SrsRtpPacket(); + cache_ = new Srs2SRtpPacket(); } if (boundary_type_ == MarkerBoundary) { @@ -1177,7 +1096,7 @@ int Srs28181StreamCore::decode_packet_v2(char* buf, int nb_buf) first_rtp_tsb_enabled_ = true; srs_freep(first_rtp_tsb_); - first_rtp_tsb_ = new SrsRtpPacket(); + first_rtp_tsb_ = new Srs2SRtpPacket(); first_rtp_tsb_->copy(&pkt); first_rtp_tsb_->payload->append(pkt.payload->bytes(), pkt.payload->length()); @@ -1209,10 +1128,10 @@ int Srs28181StreamCore::decode_packet_v2(char* buf, int nb_buf) } else { - // pengzhang: NOTE:if u receive from middle or stream loss starting rtp, will also deal this uncompleted packet, - // the following progress will skip this ncompleted packet + // bession:NOTE:if u receive a stream in the middle of rtp groups or a stream losses starting rtp packets, + // the following progress will skip this ncompleted packets srs_freep(cache_); - cache_ = new SrsRtpPacket(); + cache_ = new Srs2SRtpPacket(); cache_->reap(&pkt); } @@ -1226,10 +1145,8 @@ int Srs28181StreamCore::decode_packet_v2(char* buf, int nb_buf) } // always free it. - SrsAutoFree(SrsRtpPacket, cache_); + SrsAutoFree(Srs2SRtpPacket, cache_); - stream2file("./ps.ps", cache_->payload->bytes(), cache_->payload->length()); - // ps stream if ((status = cache_->decode_stream()) != 0) { if (status == ERROR_RTP_PS_HK_PRIVATE_PROTO) { //private_proto = true; @@ -1238,10 +1155,6 @@ int Srs28181StreamCore::decode_packet_v2(char* buf, int nb_buf) } } - stream2file("./h264.h264", cache_->tgtstream->bytes(), cache_->tgtstream->length()); - // temporarily return on testing - //return ret; - srs_error_t err = srs_success; if ((err = on_stream_packet(cache_, stream_id)) != srs_success) { srs_error("rtp auto decoder: process rtp packet failed. ret=%d",srs_error_code(err) ); @@ -1466,12 +1379,6 @@ void Srs28181StreamCore::close() srs_freep(sdk); } - - - - - - Srs28181TcpStreamConn::Srs28181TcpStreamConn(Srs28181TcpStreamListener* l, srs_netfd_t fd, std::string o) { output_template = o; @@ -1480,8 +1387,6 @@ Srs28181TcpStreamConn::Srs28181TcpStreamConn(Srs28181TcpStreamListener* l, srs_n output_template = "rtmp://127.0.0.1:7935/[app]/[stream]"; session = ""; - // video_rtp = NULL; - //audio_rtp = NULL; // TODO: set stream_id when connected stream_id = 50125; @@ -1491,7 +1396,6 @@ Srs28181TcpStreamConn::Srs28181TcpStreamConn(Srs28181TcpStreamListener* l, srs_n //caster = c; stfd = fd; skt = new SrsStSocket(); - //rtsp = new SrsRtspStack(skt); trd = new SrsSTCoroutine("28181tcpstream", this); //req = NULL; @@ -1569,7 +1473,7 @@ srs_error_t Srs28181TcpStreamConn::do_cycle() } //#define GB28181_STREAM -srs_error_t Srs28181TcpStreamConn::on_rtp_packet(SrsRtpPacket* pkt, int stream_id) +srs_error_t Srs28181TcpStreamConn::on_rtp_packet(Srs2SRtpPacket* pkt, int stream_id) { srs_error_t err = srs_success; @@ -1607,7 +1511,7 @@ srs_error_t Srs28181TcpStreamConn::on_rtp_packet(SrsRtpPacket* pkt, int stream_i } -srs_error_t Srs28181TcpStreamConn::on_rtp_video_adv(SrsRtpPacket* pkt, int64_t dts, int64_t pts) +srs_error_t Srs28181TcpStreamConn::on_rtp_video_adv(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts) { //int ret = ERROR_SUCCESS; @@ -1640,10 +1544,6 @@ srs_error_t Srs28181TcpStreamConn::on_rtp_video_adv(SrsRtpPacket* pkt, int64_t d continue; } - //if ((ret = avc->annexb_demux(stream, &frame, &frame_size)) != ERROR_SUCCESS) { - // return ret; - //} - // ignore others. // 5bits, 7.3.1 NAL unit syntax, // H.264-AVC-ISO_IEC_14496-10.pdf, page 44. @@ -1657,14 +1557,6 @@ srs_error_t Srs28181TcpStreamConn::on_rtp_video_adv(SrsRtpPacket* pkt, int64_t d continue; } - char sc[3]; - sc[0] = (char)0x00; - sc[1] = (char)0x00; - sc[2] = (char)0x01; - stream2file("./h264_wframe.h264", sc, 3); - stream2file("./h264_wframe.h264", frame, frame_size); - stream2file("./h264_nosc.h264", frame, frame_size); - // for sps if (avc->is_sps(frame, frame_size)) { std::string sps = ""; @@ -1763,7 +1655,7 @@ srs_error_t Srs28181TcpStreamConn::cycle() return err; } -srs_error_t Srs28181TcpStreamConn::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts) +srs_error_t Srs28181TcpStreamConn::on_rtp_video(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts) { srs_error_t err = srs_success; @@ -1782,7 +1674,7 @@ srs_error_t Srs28181TcpStreamConn::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, return err; } -srs_error_t Srs28181TcpStreamConn::on_rtp_audio(SrsRtpPacket* pkt, int64_t dts) +srs_error_t Srs28181TcpStreamConn::on_rtp_audio(Srs2SRtpPacket* pkt, int64_t dts) { srs_error_t err = srs_success; @@ -1801,7 +1693,7 @@ srs_error_t Srs28181TcpStreamConn::on_rtp_audio(SrsRtpPacket* pkt, int64_t dts) return err; } -srs_error_t Srs28181TcpStreamConn::kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts) +srs_error_t Srs28181TcpStreamConn::kickoff_audio_cache(Srs2SRtpPacket* pkt, int64_t dts) { srs_error_t err = srs_success; @@ -1839,8 +1731,6 @@ int Srs28181TcpStreamConn::decode_packet(char* buf, int nb_buf) pprint->elapse(); - stream2file("rtp.mp4",buf,nb_buf); - if (true) { SrsBuffer stream(buf,nb_buf); @@ -1848,7 +1738,7 @@ int Srs28181TcpStreamConn::decode_packet(char* buf, int nb_buf) // return ret; //} - SrsRtpPacket pkt; + Srs2SRtpPacket pkt; if ((ret = pkt.decode_v2(&stream)) != ERROR_SUCCESS) { srs_error("28181: decode rtp packet failed. ret=%d", ret); return ret; @@ -1856,7 +1746,7 @@ int Srs28181TcpStreamConn::decode_packet(char* buf, int nb_buf) if (pkt.chunked) { if (!cache_) { - cache_ = new SrsRtpPacket(); + cache_ = new Srs2SRtpPacket(); } cache_->copy(&pkt); cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); @@ -1880,7 +1770,7 @@ int Srs28181TcpStreamConn::decode_packet(char* buf, int nb_buf) // : NOTE:if u receive from middle or stream loss starting rtp, will also deal this uncompleted packet, // the following progress will skip this ncompleted packet srs_freep(cache_); - cache_ = new SrsRtpPacket(); + cache_ = new Srs2SRtpPacket(); cache_->reap(&pkt); } @@ -1897,11 +1787,9 @@ int Srs28181TcpStreamConn::decode_packet(char* buf, int nb_buf) } // always free it. - SrsAutoFree(SrsRtpPacket, cache_); + SrsAutoFree(Srs2SRtpPacket, cache_); #ifdef PS_IN_RTP - stream2file("./ps.ps",cache_->payload->bytes(), cache_->payload->length()); - // ps stream if ((status = cache_->decode_stream()) != ERROR_SUCCESS) { if (status == ERROR_RTP_PS_HK_PRIVATE_PROTO) { //private_proto = true; @@ -1911,10 +1799,6 @@ int Srs28181TcpStreamConn::decode_packet(char* buf, int nb_buf) } #endif - stream2file("./h264.h264",cache_->tgtstream->bytes(),cache_->tgtstream->length()); - // temporarily return on testing - //return ret; - srs_error_t err = srs_success; if ((err = on_rtp_packet(cache_, stream_id)) != srs_success) { srs_error("28181: process rtp packet failed. ret=%d", ret); @@ -1932,8 +1816,6 @@ int Srs28181TcpStreamConn::decode_packet_v2(char* buf, int nb_buf) pprint->elapse(); - stream2file("rtp.mp4", buf, nb_buf); - if (true) { SrsBuffer stream(buf,nb_buf); @@ -1941,7 +1823,7 @@ int Srs28181TcpStreamConn::decode_packet_v2(char* buf, int nb_buf) return ret; }*/ - SrsRtpPacket pkt; + Srs2SRtpPacket pkt; if ((ret = pkt.decode_v2(&stream, boundary_type_)) != ERROR_SUCCESS) { srs_error("rtp auto decoder: decode rtp packet failed. ret=%d", ret); return ret; @@ -1949,7 +1831,7 @@ int Srs28181TcpStreamConn::decode_packet_v2(char* buf, int nb_buf) if (pkt.chunked) { if (!cache_) { - cache_ = new SrsRtpPacket(); + cache_ = new Srs2SRtpPacket(); } if (boundary_type_ == MarkerBoundary) { @@ -1984,7 +1866,7 @@ int Srs28181TcpStreamConn::decode_packet_v2(char* buf, int nb_buf) first_rtp_tsb_enabled_ = true; srs_freep(first_rtp_tsb_); - first_rtp_tsb_ = new SrsRtpPacket(); + first_rtp_tsb_ = new Srs2SRtpPacket(); first_rtp_tsb_->copy(&pkt); first_rtp_tsb_->payload->append(pkt.payload->bytes(), pkt.payload->length()); @@ -2019,7 +1901,7 @@ int Srs28181TcpStreamConn::decode_packet_v2(char* buf, int nb_buf) // pengzhang: NOTE:if u receive from middle or stream loss starting rtp, will also deal this uncompleted packet, // the following progress will skip this ncompleted packet srs_freep(cache_); - cache_ = new SrsRtpPacket(); + cache_ = new Srs2SRtpPacket(); cache_->reap(&pkt); } @@ -2033,10 +1915,8 @@ int Srs28181TcpStreamConn::decode_packet_v2(char* buf, int nb_buf) } // always free it. - SrsAutoFree(SrsRtpPacket, cache_); + SrsAutoFree(Srs2SRtpPacket, cache_); - stream2file("./ps.ps", cache_->payload->bytes(), cache_->payload->length()); - // ps stream if ((status = cache_->decode_stream()) != 0) { if (status == ERROR_RTP_PS_HK_PRIVATE_PROTO) { //private_proto = true; @@ -2045,10 +1925,6 @@ int Srs28181TcpStreamConn::decode_packet_v2(char* buf, int nb_buf) } } - stream2file("./h264.h264", cache_->tgtstream->bytes(), cache_->tgtstream->length()); - // temporarily return on testing - //return ret; - srs_error_t err = srs_success; if ((err = on_rtp_packet(cache_, stream_id)) != srs_success) { srs_error("rtp auto decoder: process rtp packet failed. ret=%d", srs_error_code(err)); diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index 6d51159b8e..ae16b70968 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -38,7 +38,7 @@ class SrsStSocket; class SrsConfDirective; -class SrsRtpPacket; +class Srs2SRtpPacket; class SrsRequest; class SrsStSocket; class SrsRtmpClient; @@ -92,18 +92,14 @@ class Srs28181StreamServer virtual void remove(); }; -// A common listener, for RTMP/HTTP server. +// A base listener class Srs28181Listener { -protected: - //SrsListenerType type; protected: std::string ip; int port; - //SrsServer* server; public: Srs28181Listener(); - //Srs28181Listener(SrsServer* svr, SrsListenerType t); virtual ~Srs28181Listener(); public: //virtual SrsListenerType listen_type(); @@ -111,7 +107,6 @@ class Srs28181Listener }; // A TCP listener -//class Srs28181TcpStreamListener : public Srs28181Listener, public SrsTcpListener class Srs28181TcpStreamListener : public Srs28181Listener, public ISrsTcpHandler { private: @@ -138,8 +133,9 @@ class Srs28181TcpStreamListener : public Srs28181Listener, public ISrsTcpHandler // problem in multiple threads. For SRS, we only use single thread module, // like NGINX to get very high performance, with asynchronous and non-blocking // sockets. + /* -* +* SrsOneCycleCoroutine has these features: * 1.will exit thread if it returns from cycle function * 2.no pull function * 3.can destory itself in cycle @@ -187,7 +183,7 @@ class SrsOneCycleCoroutine: public ISrsCoroutineHandler static void* pfn(void* arg); }; -// Bind a udp port, start thread to recv packet and handler it. +// Bind a udp port, start a thread to recv packet and handler it. class SrsLiveUdpListener : public ISrsCoroutineHandler { private: @@ -209,18 +205,16 @@ class SrsLiveUdpListener : public ISrsCoroutineHandler public: virtual int fd(); virtual srs_netfd_t stfd(); - // set timeout value //virtual srs_error_t wait(srs_utime_t tm); public: uint64_t nb_packet(); public: virtual srs_error_t listen(); -// Interface ISrsReusableThreadHandler. public: virtual srs_error_t cycle(); }; - +// A guard thread class SrsLifeGuardThread : public SrsOneCycleCoroutine { private: @@ -235,7 +229,6 @@ class SrsLifeGuardThread : public SrsOneCycleCoroutine virtual void wait(srs_utime_t tm); }; - // 28181 udp stream linstener class Srs28181UdpStreamListener : public Srs28181Listener, public ISrsUdpHandler, public ISrsCoroutineHandler { @@ -247,7 +240,6 @@ class Srs28181UdpStreamListener : public Srs28181Listener, public ISrsUdpHandler uint64_t nb_packet; bool workdone; public: - //Srs28181UdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c); Srs28181UdpStreamListener(Srs28181StreamServer * srv, std::string suuid); virtual ~Srs28181UdpStreamListener(); private: @@ -303,20 +295,11 @@ class Srs28181StreamCore // video stream. int video_id; std::string video_codec; - //SrsRtpConn* video_rtp; // audio stream. int audio_id; std::string audio_codec; int audio_sample_rate; int audio_channel; - //SrsRtpConn* audio_rtp; -private: - //srs_netfd_t stfd; - //SrsStSocket* skt; - ////SrsRtspStack* rtsp; - ////SrsRtspCaster* caster; - //Srs28181TcpStreamListener* listener; - //SrsCoroutine* trd; private: ////SrsRequest* req; SrsSimpleRtmpClient* sdk; @@ -339,29 +322,21 @@ class Srs28181StreamCore int stream_id; - SrsRtpPacket* cache_; + Srs2SRtpPacket* cache_; // the timestamp of a rtp group uint32_t group_timestamp; - // if timestamp boundary flag enabled - // true says using rtp timestamp decode rtp group + // true says using timestamp boundary bool first_rtp_tsb_enabled_; // first rtp with new timestamp in a rtp group - SrsRtpPacket * first_rtp_tsb_; + Srs2SRtpPacket * first_rtp_tsb_; // indicates rtp group boundary decode type: marker or timestamp int boundary_type_; public: - //Srs28181StreamCore(Srs28181TcpStreamListener* l, srs_netfd_t fd, std::string o); Srs28181StreamCore(std::string suuid); virtual ~Srs28181StreamCore(); - // used in tcp but not needed in udp -//public: - //virtual srs_error_t init(); -//private: - //virtual srs_error_t do_cycle(); - public: // decode rtp using MB boundary virtual int decode_packet(char* buf, int nb_buf); @@ -369,15 +344,12 @@ class Srs28181StreamCore virtual int decode_packet_v2(char* buf, int nb_buf); // internal methods public: - virtual srs_error_t on_stream_packet(SrsRtpPacket* pkt, int stream_id); - virtual srs_error_t on_stream_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts); -// Interface ISrsOneCycleThreadHandler -//public: - //virtual srs_error_t cycle(); + virtual srs_error_t on_stream_packet(Srs2SRtpPacket* pkt, int stream_id); + virtual srs_error_t on_stream_video(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts); private: - virtual srs_error_t on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts); - virtual srs_error_t on_rtp_audio(SrsRtpPacket* pkt, int64_t dts); - virtual srs_error_t kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts); + virtual srs_error_t on_rtp_video(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts); + virtual srs_error_t on_rtp_audio(Srs2SRtpPacket* pkt, int64_t dts); + virtual srs_error_t kickoff_audio_cache(Srs2SRtpPacket* pkt, int64_t dts); private: virtual srs_error_t write_sequence_header(); virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts); @@ -391,32 +363,29 @@ class Srs28181StreamCore virtual void close(); }; +// TODO: will rewrite blow codes for TCP mode in future // The 28181 tcp stream connection class Srs28181TcpStreamConn : public ISrsCoroutineHandler { private: std::string output; std::string output_template; - std::string target_tcUrl;//rtsp_tcUrl; - std::string stream_name;//rtsp_stream; + std::string target_tcUrl; + std::string stream_name; SrsPithyPrint* pprint; private: std::string session; // video stream. int video_id; std::string video_codec; - //SrsRtpConn* video_rtp; // audio stream. int audio_id; std::string audio_codec; int audio_sample_rate; int audio_channel; - //SrsRtpConn* audio_rtp; private: srs_netfd_t stfd; SrsStSocket* skt; - //SrsRtspStack* rtsp; - //SrsRtspCaster* caster; Srs28181TcpStreamListener* listener; SrsCoroutine* trd; private: @@ -441,7 +410,7 @@ class Srs28181TcpStreamConn : public ISrsCoroutineHandler int stream_id; - SrsRtpPacket* cache_; + Srs2SRtpPacket* cache_; // the timestamp of a rtp group uint32_t group_timestamp; @@ -449,7 +418,7 @@ class Srs28181TcpStreamConn : public ISrsCoroutineHandler // true says using rtp timestamp decode rtp group bool first_rtp_tsb_enabled_; // first rtp with new timestamp in a rtp group - SrsRtpPacket * first_rtp_tsb_; + Srs2SRtpPacket * first_rtp_tsb_; // indicates rtp group boundary decode type: marker or timestamp int boundary_type_; @@ -468,15 +437,15 @@ class Srs28181TcpStreamConn : public ISrsCoroutineHandler virtual int decode_packet_v2(char* buf, int nb_buf); // internal methods public: - virtual srs_error_t on_rtp_packet(SrsRtpPacket* pkt, int stream_id); - virtual srs_error_t on_rtp_video_adv(SrsRtpPacket* pkt, int64_t dts, int64_t pts); + virtual srs_error_t on_rtp_packet(Srs2SRtpPacket* pkt, int stream_id); + virtual srs_error_t on_rtp_video_adv(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts); // Interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); private: - virtual srs_error_t on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts); - virtual srs_error_t on_rtp_audio(SrsRtpPacket* pkt, int64_t dts); - virtual srs_error_t kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts); + virtual srs_error_t on_rtp_video(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts); + virtual srs_error_t on_rtp_audio(Srs2SRtpPacket* pkt, int64_t dts); + virtual srs_error_t kickoff_audio_cache(Srs2SRtpPacket* pkt, int64_t dts); private: virtual srs_error_t write_sequence_header(); virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts); @@ -488,4 +457,5 @@ class Srs28181TcpStreamConn : public ISrsCoroutineHandler virtual srs_error_t connect(); // Close the connection to RTMP server. virtual void close(); -}; \ No newline at end of file +}; +#endif \ No newline at end of file diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index c16b18a4f8..b69def576b 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1301,11 +1301,13 @@ SrsGoApi28181StreamCreation::~SrsGoApi28181StreamCreation() srs_error_t SrsGoApi28181StreamCreation::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { srs_error_t err = srs_success; + std::string suuid = ""; int port = 0; std::stringstream sstream; SrsJsonObject* content = SrsJsonAny::object(); SrsAutoFree(SrsJsonObject, content); + if((err = server->create_28181stream_listener(SrsListener28181UdpStream,port,suuid))!=srs_success) { srs_warn("SrsGoApi28181StreamCreation - create listener failed[%d]",srs_error_code(err)); @@ -1313,10 +1315,12 @@ srs_error_t SrsGoApi28181StreamCreation::serve_http(ISrsHttpResponseWriter* w, I content->set("desc",SrsJsonAny::str("create listener failed")); return srs_api_response(w, r, content->dumps()); } + sstream<set("status",SrsJsonAny::str("successful")); content->set("stream-uuid",SrsJsonAny::str(suuid.c_str())); content->set("listen-port",SrsJsonAny::str(sstream.str().c_str())); + return srs_api_response(w, r, content->dumps()); } diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index ed684d4b96..8c6967115e 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -240,6 +240,12 @@ srs_error_t SrsRtspConn::serve() return err; } +std::string SrsRtspConn::remote_ip() +{ + // TODO: FIXME: Implement it. + return ""; +} + srs_error_t SrsRtspConn::do_cycle() { srs_error_t err = srs_success; @@ -684,6 +690,7 @@ SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c) output = _srs_config->get_stream_caster_output(c); local_port_min = _srs_config->get_stream_caster_rtp_port_min(c); local_port_max = _srs_config->get_stream_caster_rtp_port_max(c); + manager = new SrsCoroutineManager(); } SrsRtspCaster::~SrsRtspCaster() @@ -691,10 +698,21 @@ SrsRtspCaster::~SrsRtspCaster() std::vector::iterator it; for (it = clients.begin(); it != clients.end(); ++it) { SrsRtspConn* conn = *it; - srs_freep(conn); + manager->remove(conn); } clients.clear(); used_ports.clear(); + + srs_freep(manager); +} + +srs_error_t SrsRtspCaster::initialize() +{ + srs_error_t err = srs_success; + if ((err = manager->start()) != srs_success) { + return srs_error_wrap(err, "start manager"); + } + return err; } srs_error_t SrsRtspCaster::alloc_port(int* pport) @@ -747,6 +765,6 @@ void SrsRtspCaster::remove(SrsRtspConn* conn) } srs_info("rtsp: remove connection from caster."); - srs_freep(conn); + manager->remove(conn); } diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index 7bbd526515..0ca748535b 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -100,7 +100,7 @@ class SrsRtspJitter }; // The rtsp connection serve the fd. -class SrsRtspConn : public ISrsCoroutineHandler +class SrsRtspConn : public ISrsCoroutineHandler, public ISrsConnection { private: std::string output_template; @@ -143,6 +143,7 @@ class SrsRtspConn : public ISrsCoroutineHandler virtual ~SrsRtspConn(); public: virtual srs_error_t serve(); + virtual std::string remote_ip(); private: virtual srs_error_t do_cycle(); // internal methods @@ -179,6 +180,7 @@ class SrsRtspCaster : public ISrsTcpHandler std::map used_ports; private: std::vector clients; + SrsCoroutineManager* manager; public: SrsRtspCaster(SrsConfDirective* c); virtual ~SrsRtspCaster(); @@ -188,6 +190,7 @@ class SrsRtspCaster : public ISrsTcpHandler virtual srs_error_t alloc_port(int* pport); // Free the alloced rtp port. virtual void free_port(int lpmin, int lpmax); + virtual srs_error_t initialize(); // Interface ISrsTcpHandler public: virtual srs_error_t on_tcp_client(srs_netfd_t stfd); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 3519e3e309..f9c9a24477 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -30,7 +30,6 @@ #include #include #include -#include using namespace std; #include @@ -44,6 +43,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -111,6 +111,8 @@ std::string srs_listener_type2string(SrsListenerType type) return "HTTP-FLV"; case SrsListener28181TcpStream: return "GB28181-Stream over TCP"; + case SrsListener28181UdpStream: + return "GB28181-Stream over UDP" default: return "UNKONWN"; } @@ -283,100 +285,6 @@ srs_error_t SrsHttpFlvListener::on_tcp_client(srs_netfd_t stfd) return err; } -SrsTcpStreamListener::SrsTcpStreamListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t) -{ - listener = NULL; - - // the caller already ensure the type is ok, - // we just assert here for unknown stream caster. - srs_assert(type == SrsListener28181TcpStream); - if (type == SrsListener28181TcpStream) { - //caster = new SrsGB28181TcpStreamCaster(c); - } -} - -SrsTcpStreamListener::~SrsTcpStreamListener() -{ - srs_freep(caster); - srs_freep(listener); -} - -srs_error_t SrsTcpStreamListener::listen(string i, int p) -{ - srs_error_t err = srs_success; - - // the caller already ensure the type is ok, - // we just assert here for unknown stream caster. - //srs_assert(type == SrsListenerRtsp); - - ip = i; - port = p; - - srs_freep(listener); - listener = new SrsTcpListener(this, ip, port); - - if ((err = listener->listen()) != srs_success) { - return srs_error_wrap(err, "tcp stream listen %s:%d", ip.c_str(), port); - } - - string v = srs_listener_type2string(type); - srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd()); - - return err; -} - -srs_error_t SrsTcpStreamListener::on_tcp_client(srs_netfd_t stfd) -{ - srs_error_t err = caster->on_tcp_client(stfd); - if (err != srs_success) { - srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str()); - srs_freep(err); - } - - return srs_success; -} - - -SrsUdpStreamListener::SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c) : SrsListener(svr, t) -{ - listener = NULL; - caster = c; -} - -SrsUdpStreamListener::~SrsUdpStreamListener() -{ - srs_freep(listener); -} - -srs_error_t SrsUdpStreamListener::listen(string i, int p) -{ - srs_error_t err = srs_success; - - // the caller already ensure the type is ok, - // we just assert here for unknown stream caster. - srs_assert(type == SrsListenerMpegTsOverUdp); - - ip = i; - port = p; - - srs_freep(listener); - listener = new SrsUdpListener(caster, ip, port); - - if ((err = listener->listen()) != srs_success) { - return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); - } - - // notify the handler the fd changed. - if ((err = caster->on_stfd_change(listener->stfd())) != srs_success) { - return srs_error_wrap(err, "notify fd change failed"); - } - - string v = srs_listener_type2string(type); - srs_trace("%s listen at udp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd()); - - return err; -} - SrsUdpCasterListener::SrsUdpCasterListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsUdpStreamListener(svr, t, NULL) { // the caller already ensure the type is ok, @@ -514,147 +422,6 @@ void SrsSignalManager::sig_catcher(int signo) errno = err; } -// Whether we are in docker, defined in main module. -extern bool _srs_in_docker; - -SrsInotifyWorker::SrsInotifyWorker(SrsServer* s) -{ - server = s; - trd = new SrsSTCoroutine("inotify", this); - inotify_fd = NULL; -} - -SrsInotifyWorker::~SrsInotifyWorker() -{ - srs_freep(trd); - srs_close_stfd(inotify_fd); -} - -srs_error_t SrsInotifyWorker::start() -{ - srs_error_t err = srs_success; - - // Whether enable auto reload config. - bool auto_reload = _srs_config->inotify_auto_reload(); - if (!auto_reload && _srs_in_docker && _srs_config->auto_reload_for_docker()) { - srs_warn("enable auto reload for docker"); - auto_reload = true; - } - - if (!auto_reload) { - return err; - } - - // Create inotify to watch config file. - int fd = ::inotify_init1(IN_NONBLOCK); - if (fd < 0) { - return srs_error_new(ERROR_INOTIFY_CREATE, "create inotify"); - } - - // Open as stfd to read by ST. - if ((inotify_fd = srs_netfd_open(fd)) == NULL) { - ::close(fd); - return srs_error_new(ERROR_INOTIFY_OPENFD, "open fd=%d", fd); - } - - if (((err = srs_fd_closeexec(fd))) != srs_success) { - return srs_error_new(ERROR_INOTIFY_OPENFD, "closeexec fd=%d", fd); - } - - // /* the following are legal, implemented events that user-space can watch for */ - // #define IN_ACCESS 0x00000001 /* File was accessed */ - // #define IN_MODIFY 0x00000002 /* File was modified */ - // #define IN_ATTRIB 0x00000004 /* Metadata changed */ - // #define IN_CLOSE_WRITE 0x00000008 /* Writtable file was closed */ - // #define IN_CLOSE_NOWRITE 0x00000010 /* Unwrittable file closed */ - // #define IN_OPEN 0x00000020 /* File was opened */ - // #define IN_MOVED_FROM 0x00000040 /* File was moved from X */ - // #define IN_MOVED_TO 0x00000080 /* File was moved to Y */ - // #define IN_CREATE 0x00000100 /* Subfile was created */ - // #define IN_DELETE 0x00000200 /* Subfile was deleted */ - // #define IN_DELETE_SELF 0x00000400 /* Self was deleted */ - // #define IN_MOVE_SELF 0x00000800 /* Self was moved */ - // - // /* the following are legal events. they are sent as needed to any watch */ - // #define IN_UNMOUNT 0x00002000 /* Backing fs was unmounted */ - // #define IN_Q_OVERFLOW 0x00004000 /* Event queued overflowed */ - // #define IN_IGNORED 0x00008000 /* File was ignored */ - // - // /* helper events */ - // #define IN_CLOSE (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE) /* close */ - // #define IN_MOVE (IN_MOVED_FROM | IN_MOVED_TO) /* moves */ - // - // /* special flags */ - // #define IN_ONLYDIR 0x01000000 /* only watch the path if it is a directory */ - // #define IN_DONT_FOLLOW 0x02000000 /* don't follow a sym link */ - // #define IN_EXCL_UNLINK 0x04000000 /* exclude events on unlinked objects */ - // #define IN_MASK_ADD 0x20000000 /* add to the mask of an already existing watch */ - // #define IN_ISDIR 0x40000000 /* event occurred against dir */ - // #define IN_ONESHOT 0x80000000 /* only send event once */ - - // Watch the config directory events. - string config_dir = srs_path_dirname(_srs_config->config()); - uint32_t mask = IN_MODIFY | IN_CREATE | IN_MOVED_TO; int watch_conf = 0; - if ((watch_conf = ::inotify_add_watch(fd, config_dir.c_str(), mask)) < 0) { - return srs_error_new(ERROR_INOTIFY_WATCH, "watch file=%s, fd=%d, watch=%d, mask=%#x", - config_dir.c_str(), fd, watch_conf, mask); - } - srs_trace("auto reload watching fd=%d, watch=%d, file=%s", fd, watch_conf, config_dir.c_str()); - - if ((err = trd->start()) != srs_success) { - return srs_error_wrap(err, "inotify"); - } - - return err; -} - -srs_error_t SrsInotifyWorker::cycle() -{ - srs_error_t err = srs_success; - - string config_path = _srs_config->config(); - string config_file = srs_path_basename(config_path); - string k8s_file = "..data"; - - while (true) { - char buf[4096]; - ssize_t nn = srs_read(inotify_fd, buf, (size_t)sizeof(buf), SRS_UTIME_NO_TIMEOUT); - if (nn < 0) { - srs_warn("inotify ignore read failed, nn=%d", (int)nn); - break; - } - - // Whether config file changed. - bool do_reload = false; - - // Parse all inotify events. - inotify_event* ie = NULL; - for (char* ptr = buf; ptr < buf + nn; ptr += sizeof(inotify_event) + ie->len) { - ie = (inotify_event*)ptr; - - if (!ie->len || !ie->name) { - continue; - } - - string name = ie->name; - if ((name == k8s_file || name == config_file) && ie->mask & (IN_MODIFY|IN_CREATE|IN_MOVED_TO)) { - do_reload = true; - } - - srs_trace("inotify event wd=%d, mask=%#x, len=%d, name=%s, reload=%d", ie->wd, ie->mask, ie->len, ie->name, do_reload); - } - - // Notify server to do reload. - if (do_reload && srs_path_exists(config_path)) { - server->on_signal(SRS_SIGNAL_RELOAD); - } - - srs_usleep(3000 * SRS_UTIME_MILLISECONDS); - } - - return err; -} - ISrsServerCycle::ISrsServerCycle() { } @@ -753,11 +520,9 @@ void SrsServer::gracefully_dispose() close_listeners(SrsListenerMpegTsOverUdp); close_listeners(SrsListenerRtsp); close_listeners(SrsListenerFlv); - srs_trace("listeners closed"); // Fast stop to notify FFMPEG to quit, wait for a while then fast kill. ingester->stop(); - srs_trace("ingesters stopped"); // Wait for connections to quit. // While gracefully quiting, user can requires SRS to fast quit. @@ -773,7 +538,6 @@ void SrsServer::gracefully_dispose() // dispose the source for hls and dvr. _srs_sources->dispose(); - srs_trace("source disposed"); #ifdef SRS_AUTO_MEM_WATCH srs_memory_report(); @@ -1047,16 +811,7 @@ srs_error_t SrsServer::ingest() srs_error_t SrsServer::cycle() { - srs_error_t err = srs_success; - - // Start the inotify auto reload by watching config file. - SrsInotifyWorker inotify(this); - if ((err = inotify.start()) != srs_success) { - return srs_error_wrap(err, "start inotify"); - } - - // Do server main cycle. - err = do_cycle(); + srs_error_t err = do_cycle(); #ifdef SRS_AUTO_GPERF_MC destroy(); @@ -1099,7 +854,6 @@ srs_error_t SrsServer::cycle() void SrsServer::on_signal(int signo) { if (signo == SRS_SIGNAL_RELOAD) { - srs_trace("reload config, signo=%d", signo); signal_reload = true; return; } @@ -1107,7 +861,7 @@ void SrsServer::on_signal(int signo) #ifndef SRS_AUTO_GPERF_MC if (signo == SRS_SIGNAL_REOPEN_LOG) { _srs_log->reopen(); - srs_warn("reopen log file, signo=%d", signo); + srs_warn("reopen log file"); return; } #endif @@ -1115,7 +869,7 @@ void SrsServer::on_signal(int signo) #ifdef SRS_AUTO_GPERF_MC if (signo == SRS_SIGNAL_REOPEN_LOG) { signal_gmc_stop = true; - srs_warn("for gmc, the SIGUSR1 used as SIGINT, signo=%d", signo); + srs_warn("for gmc, the SIGUSR1 used as SIGINT"); return; } #endif @@ -1127,7 +881,7 @@ void SrsServer::on_signal(int signo) if (signo == SIGINT) { #ifdef SRS_AUTO_GPERF_MC - srs_trace("gmc is on, main cycle will terminate normally, signo=%d", signo); + srs_trace("gmc is on, main cycle will terminate normally."); signal_gmc_stop = true; #else #ifdef SRS_AUTO_MEM_WATCH @@ -1401,8 +1155,8 @@ srs_error_t SrsServer::listen_stream_caster() } } + // create a 28181 stream server srs_28181_streams = new Srs28181StreamServer(); - //srs_28181_streams->create_listener(Listener_UDP); return err; } @@ -1412,8 +1166,9 @@ srs_error_t SrsServer::create_28181stream_listener(SrsListenerType type, int& po if(srs_28181_streams==NULL){ return srs_error_new(13025,"srs 28181 stream server is null!"); } + return srs_28181_streams->create_listener(type, port,suuid); - srs_trace("srsserver - create a new 28181 stream listener[port:%d]",port); + srs_trace("create a new 28181 stream listener[port:%d]",port); } void SrsServer::close_listeners(SrsListenerType type) diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 08717a469d..8b1e4d22f8 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -107,42 +107,6 @@ class SrsBufferListener : virtual public SrsListener, virtual public ISrsTcpHand virtual srs_error_t on_tcp_client(srs_netfd_t stfd); }; -/* -// a server for gb28181 stream. -class Srs28181StreamServer : public ISrsTcpHandler -{ -private: - std::string output; - int local_port_min; - int local_port_max; - // The key: port, value: whether used. - std::map used_ports; -private: - //std::vector clients; - std::vector listeners; -public: - Srs28181StreamServer(SrsConfDirective* c); - virtual ~SrsRtspCaster(); -public: - // create a stream listener - virtual srs_error_t create_listener(); - // release a listener - virtual srs_error_t release_listener(); - // Alloc a rtp port from local ports pool. - // @param pport output the rtp port. - virtual srs_error_t alloc_port(int* pport); - // Free the alloced rtp port. - virtual void free_port(int lpmin, int lpmax); -// Interface ISrsTcpHandler -public: - //virtual srs_error_t on_tcp_client(srs_netfd_t stfd); -// internal methods. -public: - virtual void remove(SrsRtspConn* conn); -}; -*/ - - // A TCP listener, for rtsp server. class SrsRtspListener : virtual public SrsListener, virtual public ISrsTcpHandler { @@ -175,35 +139,6 @@ class SrsHttpFlvListener : virtual public SrsListener, virtual public ISrsTcpHan virtual srs_error_t on_tcp_client(srs_netfd_t stfd); }; -// A tcp listener, for an extend tcp stream -class SrsTcpStreamListener : virtual public SrsListener, virtual public ISrsTcpHandler -{ -private: - SrsTcpListener* listener; - ISrsTcpHandler* caster; -public: - SrsTcpStreamListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c); - virtual ~SrsTcpStreamListener(); -public: - virtual srs_error_t listen(std::string i, int p); -// Interface ISrsTcpHandler -public: - virtual srs_error_t on_tcp_client(srs_netfd_t stfd); -}; - -// A UDP listener, for udp server. -class SrsUdpStreamListener : public SrsListener -{ -protected: - SrsUdpListener* listener; - ISrsUdpHandler* caster; -public: - SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c); - virtual ~SrsUdpStreamListener(); -public: - virtual srs_error_t listen(std::string i, int p); -}; - // A UDP listener, for udp stream caster server. class SrsUdpCasterListener : public SrsUdpStreamListener { @@ -241,24 +176,6 @@ class SrsSignalManager : public ISrsCoroutineHandler static void sig_catcher(int signo); }; -// Auto reload by inotify. -// @see https://github.com/ossrs/srs/issues/1635 -class SrsInotifyWorker : public ISrsCoroutineHandler -{ -private: - SrsServer* server; - SrsCoroutine* trd; - srs_netfd_t inotify_fd; -public: - SrsInotifyWorker(SrsServer* s); - virtual ~SrsInotifyWorker(); -public: - virtual srs_error_t start(); -// Interface ISrsEndlessThreadHandler. -public: - virtual srs_error_t cycle(); -}; - // A handler to the handle cycle in SRS RTMP server. class ISrsServerCycle { diff --git a/trunk/src/protocol/srs_rtsp_stack.cpp b/trunk/src/protocol/srs_rtsp_stack.cpp index 65dd187a8b..c1e828e446 100644 --- a/trunk/src/protocol/srs_rtsp_stack.cpp +++ b/trunk/src/protocol/srs_rtsp_stack.cpp @@ -26,7 +26,6 @@ #if !defined(SRS_EXPORT_LIBRTMP) #include -#include #include using namespace std; @@ -40,7 +39,6 @@ using namespace std; #include #include - #define SRS_RTSP_BUFFER 4096 // get the status text of code. @@ -135,8 +133,7 @@ SrsRtpPacket::SrsRtpPacket() timestamp = 0; ssrc = 0; - payload = new SrsSimpleBufferX(); - tgtstream = new SrsSimpleBufferX(); + payload = new SrsSimpleStream(); audio = new SrsAudioFrame(); chunked = false; completed = false; @@ -145,7 +142,6 @@ SrsRtpPacket::SrsRtpPacket() SrsRtpPacket::~SrsRtpPacket() { srs_freep(payload); - srs_freep(tgtstream); srs_freep(audio); } @@ -181,48 +177,6 @@ void SrsRtpPacket::reap(SrsRtpPacket* src) src->audio = NULL; } -void SrsRtpPacket::copy_v2(SrsRtpPacket* src) -{ - version = src->version; - padding = src->padding; - extension = src->extension; - csrc_count = src->csrc_count; - marker = src->marker; - payload_type = src->payload_type; - sequence_number = src->sequence_number; - timestamp = src->timestamp; - ssrc = src->ssrc; - - chunked = src->chunked; - completed = src->completed; - - //add code. only h264 format and chunked and last chank is completed - if (payload_type == 96 && chunked && completed) { - - srs_freep(audio); - audio = src->audio; - src->audio = NULL; - //srs_freep(audio_samples); - //audio_samples = src->audio_samples; - //src->audio_samples = NULL; - } - //srs_freep(audio); - //audio = new SrsAudioFrame(); -} - -void SrsRtpPacket::reap_v2(SrsRtpPacket* src) -{ - copy_v2(src); - - srs_freep(payload); - payload = src->payload; - src->payload = NULL; - - srs_freep(audio); - audio = src->audio; - src->audio = NULL; -} - srs_error_t SrsRtpPacket::decode(SrsBuffer* stream) { srs_error_t err = srs_success; @@ -362,404 +316,6 @@ srs_error_t SrsRtpPacket::decode_96(SrsBuffer* stream) return err; } - -// beisong: decode gb stream ps96/mpeg97/h26498/svac99 -int SrsRtpPacket::decode_v2(SrsBuffer* stream, int & boundary_type) -{ - int ret = 0; - - // 12bytes header - if (!stream->require(12)) { - ret = ERROR_RTP_HEADER_CORRUPT; - srs_error("rtp header corrupt. ret=%d", ret); - return ret; - } - - int8_t vv = stream->read_1bytes(); - version = (vv >> 6) & 0x03; - padding = (vv >> 5) & 0x01; - extension = (vv >> 4) & 0x01; - csrc_count = vv & 0x0f; - - int8_t mv = stream->read_1bytes(); - marker = (mv >> 7) & 0x01; - payload_type = mv & 0x7f; - - sequence_number = stream->read_2bytes(); - timestamp = stream->read_4bytes(); - ssrc = stream->read_4bytes(); - - // TODO: FIXME: check sequence number. - /* - if (marker && !marker_boundary) { - marker_boundary = true; - srs_warn("rtp decode: SWITCH to MKR decoder! Default is TSB"); - }*/ - - // video codec. - if (payload_type == 96){ - - return decode_96ps_rtp_tsb2(stream); - - if (boundary_type==MarkerBoundary) { - // use marker to decode - return decode_96ps_rtp(stream, marker); - } - else if(boundary_type==TimestampBoundary) { - // use timestamp to decode - return decode_96ps_rtp_tsb2(stream); - } - else {} - } - else - { - srs_error("rtp type is not 96 ps . ret=%d", ret); - } - - return ret; -} - - -int SrsRtpPacket::decode_v2(SrsBuffer* stream) -{ - int ret = ERROR_SUCCESS; - - // 12bytes header - if (!stream->require(12)) { - ret = ERROR_RTP_HEADER_CORRUPT; - srs_error("rtp header corrupt. ret=%d", ret); - return ret; - } - - int8_t vv = stream->read_1bytes(); - version = (vv >> 6) & 0x03; - padding = (vv >> 5) & 0x01; - extension = (vv >> 4) & 0x01; - csrc_count = vv & 0x0f; - - int8_t mv = stream->read_1bytes(); - marker = (mv >> 7) & 0x01; - payload_type = mv & 0x7f; - - sequence_number = stream->read_2bytes(); - timestamp = stream->read_4bytes(); - ssrc = stream->read_4bytes(); - - // TODO: FIXME: check sequence number. - - // video codec. - if (payload_type == 96) { - - // use marker to decode - return decode_96ps_rtp(stream, marker); - } - else{ - srs_error("rtp type is not 96 ps . ret=%d", ret); - } - - return ret; -} - - - -int SrsRtpPacket::decode_v3(SrsBuffer* stream) -{ - int ret = ERROR_SUCCESS; - - // 12bytes header - if (!stream->require(12)) { - ret = ERROR_RTP_HEADER_CORRUPT; - srs_error("rtp header corrupt. ret=%d", ret); - return ret; - } - - int8_t vv = stream->read_1bytes(); - version = (vv >> 6) & 0x03; - padding = (vv >> 5) & 0x01; - extension = (vv >> 4) & 0x01; - csrc_count = vv & 0x0f; - - int8_t mv = stream->read_1bytes(); - marker = (mv >> 7) & 0x01; - payload_type = mv & 0x7f; - - sequence_number = stream->read_2bytes(); - timestamp = stream->read_4bytes(); - ssrc = stream->read_4bytes(); - - // TODO: FIXME: check sequence number. - if (payload_type == 96) { - // use marker to decode - return decode_raw_rtp(stream, marker); - } - else{ - srs_error("rtp type is not 96 ps . ret=%d", ret); - } - - return ret; -} - - - -int SrsRtpPacket::decode_stream() -{ - int ret = 0; - //ret= decode_96ps_pkg(); - // only decode pesv+pesa - { - //real time - ret = decode_96ps_core(); - } - - payload->resetoft(); - - return ret; -} - -int SrsRtpPacket::decode_raw_rtp(SrsBuffer* stream, int8_t marker) -{ - int ret = 0; - - // atleast 2bytes content. - if (!stream->require(0)) { - ret = ERROR_RTP_TYPE96_CORRUPT; - srs_error("rtsp: rtp type 96 ps corrupt. ret=%d", ret); - return ret; - } - - //Basson: marker is used to indicate ps packet boundery - //so ps packet have 2 rtp packets at Least! one is begin as 0x8060, and one is 0x80e0 for end - if(marker == 0){ - chunked = true; - completed = false; - } - else { - // always chunked in ps - // considering compatibility for other streams, we set chunked as true always - chunked = true; - completed = true; - } - payload->append(stream->data() + stream->pos(), stream->size() - stream->pos()); - //tgtstream->append(stream->data() + stream->pos(), stream->size() - stream->pos()); - - return ret; -} - -int SrsRtpPacket::decode_96ps_rtp(SrsBuffer* stream, int8_t marker) -{ - int ret = 0; - - // atleast 2bytes content. - if (!stream->require(0)) { - ret = ERROR_RTP_TYPE96_CORRUPT; - srs_error("rtsp: rtp type 96 ps corrupt. ret=%d", ret); - return ret; - } - - //beisong: marker is used to indicate ps packet boundery - //so ps packet have 2 rtp packets at Least! one is begin as 0x8060, and one is 0x80e0 for end - if(marker == 0){ - chunked = true; - completed = false; - } - else { - // always chunked in ps - // considering compatibility for other streams, we set chunked as true always - chunked = true; - completed = true; - } - payload->append(stream->data() + stream->pos(), stream->size() - stream->pos()); - - return ret; -} - -int SrsRtpPacket::decode_96ps_rtp_tsb2(SrsBuffer* stream) -{ - int ret = 0; - - // atleast 2bytes content. - if (!stream->require(1)) { - ret = ERROR_RTP_TYPE96_CORRUPT; - srs_error("rtsp: rtp type 96 ps corrupt. ret=%d", ret); - return ret; - } - - // suitable for no standard ps stream - // only use rtp timestamp to find rtp group boundary - // always chunked in ps - // considering compatibility for other streams, we set chunked as true always - chunked = true; - //completed = true; - - payload->append(stream->data() + stream->pos(), stream->size() - stream->pos()); - - return ret; -} - - -int SrsRtpPacket::decode_96ps_rtp_tsb(SrsBuffer* stream, u_int32_t & group_ts) -{ - int ret = ERROR_SUCCESS; - - // atleast 2bytes content. - if (!stream->require(1)) { - ret = ERROR_RTP_TYPE96_CORRUPT; - srs_error("rtsp: rtp type 96 ps corrupt. ret=%d", ret); - return ret; - } - - // set current timestamp at the very beginning - if (group_ts == 0) { - group_ts = timestamp; - } - - // suitable for no standard ps stream - // only use rtp timestamp to find rtp group boundary - // always chunked in ps - // considering compatibility for other streams, we set chunked as true always - if (timestamp != group_ts) { - // - group_ts = timestamp; - start_new_timestamp = true; - - chunked = true; - //completed = true; - } - else { - // continuing reciving rtp - start_new_timestamp = false; - - chunked = true; - //completed = false; - } - - payload->append(stream->data() + stream->pos(), stream->size() - stream->pos()); - - return ret; -} - -int SrsRtpPacket::get_decodertype() -{ - if (marker_boundary) { - return MarkerBoundary; - } - else { - return TimestampBoundary; - } -} - -bool SrsRtpPacket::newtimestamp() -{ - return start_new_timestamp; -} - -#define PH_PSC 0x01ba -#define SYS_HEAD_PSC 0x01bb -#define PS_MAP_PSC 0x01bc -#define PES_V_PSC 0x01e0 -#define PES_A_PSC 0x01c0 -#define HK_PRIVATE_PSC 0x01bd -int SrsRtpPacket::decode_96ps_core() -{ - int ret = 0; - - bool a, b, c; - Packet_Start_Code psc; - int psc_len = sizeof(Packet_Start_Code); - PS_Packet_Header ps_ph; - int ph_len = sizeof(PS_Packet_Header); - PS_Sys_Header sys_header; - int sys_header_len = sizeof(PS_Sys_Header); - PS_Map ps_map; - int psm_len = sizeof(PS_Map); - PS_PES pes; - int pes_len = sizeof(PS_PES); - - //pesv1...n+pesa1...n - int p_skip_0 = 0; - int p_skip_1 = 0; - psc.start_code = 0; - while (payload->chk_bytes((char*)&psc, sizeof(Packet_Start_Code))) { - psc.start_code = htonl(psc.start_code); - if (psc.start_code == PES_V_PSC) { - psc.start_code = 0; - a = payload->read_bytes_x((char*)&pes, sizeof(PS_PES)); - b = payload->skip_x(pes.pes_header_data_length); - // pengzhang: see my note - pes.pes_packet_length = htons(pes.pes_packet_length); - u_int32_t load_len = pes.pes_packet_length - pes.pes_header_data_length - 3; - c = payload->require(load_len); - if (!a || !b) { - ret = ERROR_RTP_PS_CORRUPT; - srs_error(" core- rtp type 96 ps Currepted. size not enough at 4. ret=%d", ret); - return ret; - } - - if (!c) { - // may loss some packets, copy the last buffer - srs_warn("core- rtp type 96 ps Loss some packet pesVV len:%d, cursize:%d", load_len, payload->cursize()); - load_len = payload->cursize(); - - if (load_len <= 0) { - srs_warn("core- rtp type 96 pesVV len <=0 cursize:%d, oft:%d, payload len:%d, tgt len:%d will return!", - payload->cursize(), payload->getoft(), payload->length(), tgtstream->length()); - return ret; - } - } - - tgtstream->append(payload->curat(), load_len); - payload->skip_x(load_len); - - } - else if (psc.start_code == PES_A_PSC) { - //audio:do nothing - psc.start_code = 0; - a = payload->read_bytes_x((char*)&pes, sizeof(PS_PES)); - b = payload->skip_x(pes.pes_header_data_length); - //see my note - pes.pes_packet_length = htons(pes.pes_packet_length); - u_int32_t load_len = pes.pes_packet_length - pes.pes_header_data_length - 3; - c = payload->require(load_len); - if (!a || !b) { - ret = ERROR_RTP_PS_CORRUPT; - srs_error(" core- rtp type 96 ps Currepted. size not enough at 5. ret=%d", ret); - return ret; - } - // here len may not enough as packet Loss, but still can work well as we call require(x) in skip_x - // i will not modify this, as this is a very stable strategy - if (!c) { - // may loss some packets, copy the last buffer - srs_warn("core- rtp type 96 ps Loss some packet pesAAA len:%d, cursize:%d", load_len, payload->cursize()); - load_len = payload->cursize(); - - if (load_len <= 0) { - srs_warn("core- rtp type 96 pesAAA len <=0 cursize:%d, oft:%d, payload len:%d, tgt len:%d will return!", - payload->cursize(), payload->getoft(), payload->length(), tgtstream->length()); - return ret; - } - } - - payload->skip_x(load_len); - } - else { - // specially for history stream, as it has a bug - tgtstream->append(payload->curat(), 1); - payload->skip_x(1); - //p_skip_0++; - if (p_skip_0 <= 2) { - //srs_error(" core- rtp type 96 ps currept. miss some pes in I-Frame"); - } - - //ret = ERROR_RTP_PS_CORRUPT; - //srs_error(" rtp type 96 ps currept. miss some pes in I-Frame ret=%d", ret); - //return ret; - } - } //while pesn - - return ret; -} - - SrsRtspSdp::SrsRtspSdp() { state = SrsRtspSdpStateOthers; diff --git a/trunk/src/protocol/srs_rtsp_stack.hpp b/trunk/src/protocol/srs_rtsp_stack.hpp index 9643a2c853..87c07ce555 100644 --- a/trunk/src/protocol/srs_rtsp_stack.hpp +++ b/trunk/src/protocol/srs_rtsp_stack.hpp @@ -35,7 +35,6 @@ class SrsBuffer; class SrsSimpleStream; -class SrsSimpleBufferX; class SrsAudioFrame; class ISrsProtocolReadWriter; @@ -127,47 +126,6 @@ enum SrsRtspTokenState SrsRtspTokenStateEOF = 102, }; -enum SrsRtpDecoderType -{ - TimestampBoundary = 1, - MarkerBoundary = 2, -}; - -#define ERROR_RTP_PS_CORRUPT 12060 -#define ERROR_RTP_PS_HK_PRIVATE_PROTO 12061 -#define ERROR_RTP_PS_FIRST_TSB_LOSS 12062 - -#pragma pack (1) -struct Packet_Start_Code { - u_int32_t start_code; //4bytes,need htonl exchange - //u_int8_t start_code[3]; - //u_int8_t stream_id[1]; -}; - -struct PS_Packet_Header { - Packet_Start_Code psc; //4bytes - u_int8_t holder[9]; - u_int8_t pack_stuffing_length; //low 3bit,high 5 bits are reserved; -}; - -struct PS_Sys_Header { - Packet_Start_Code psc; - u_int16_t header_length; -}; - -struct PS_Map { - Packet_Start_Code psc; - u_int16_t psm_length; //2 bytes need htons exchange -}; - -struct PS_PES { - Packet_Start_Code psc; //4bytes - u_int16_t pes_packet_length; //2 bytes need htonl exchange - u_int8_t holder[2]; - u_int8_t pes_header_data_length; -}; -#pragma pack () - // The rtp packet. // 5. RTP Data Transfer Protocol, @see rfc3550-2003-rtp.pdf, page 12 class SrsRtpPacket @@ -286,73 +244,29 @@ class SrsRtpPacket uint32_t ssrc; //32bits // The payload. - SrsSimpleBufferX* payload; - - // Beikesong: target tgt h264 stream load or other types - SrsSimpleBufferX* tgtstream; - + SrsSimpleStream* payload; // Whether transport in chunked payload. bool chunked; // Whether message is completed. // normal message always completed. // while chunked completed when the last chunk arriaved. bool completed; - - // whether some private data in stream - bool private_proto; // The audio samples, one rtp packets may contains multiple audio samples. SrsAudioFrame* audio; - - private: - // pengzhang: indicate current rtp group timestamp - //u_int32_t group_ts; - bool start_new_timestamp; - -public: - // true says rtp decode is marker, otherwise is timestamp - bool marker_boundary; - // get rtp decode type: TimestampBoundary or MarkerBoundary - virtual int get_decodertype(); - - virtual bool newtimestamp(); - public: SrsRtpPacket(); virtual ~SrsRtpPacket(); public: // copy the header from src. virtual void copy(SrsRtpPacket* src); - // - virtual void copy_v2(SrsRtpPacket* src); // reap the src to this packet, reap the payload. virtual void reap(SrsRtpPacket* src); - // - virtual void reap_v2(SrsRtpPacket* src); // decode rtp packet from stream. virtual srs_error_t decode(SrsBuffer* stream); - - virtual int decode_v2(SrsBuffer* stream, int & boundary_type); - virtual int decode_v2(SrsBuffer* stream); - // not check payload type - virtual int decode_v3(SrsBuffer* stream); - virtual int decode_stream(); private: virtual srs_error_t decode_97(SrsBuffer* stream); virtual srs_error_t decode_96(SrsBuffer* stream); - -private: - virtual int decode_96ps_rtp(SrsBuffer* stream, int8_t marker); - - // decode rtp group by using timestamp as boundary - virtual int decode_96ps_rtp_tsb(SrsBuffer* stream, u_int32_t & group_timestamp); - // decode rtp by using timestamp boundary - // will not compare timestamp in this one - virtual int decode_96ps_rtp_tsb2(SrsBuffer* stream); - // only decode rtp not ps anymore - virtual int decode_raw_rtp(SrsBuffer* stream, int8_t marker); - // only aim on high stabilable - virtual int decode_96ps_core(); }; // The sdp in announce, @see rfc2326-1998-rtsp.pdf, page 159