diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 5355068eb3..c16b18a4f8 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1288,6 +1288,39 @@ srs_error_t SrsGoApiClusters::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess return srs_api_response(w, r, obj->dumps()); } + +SrsGoApi28181StreamCreation::SrsGoApi28181StreamCreation(SrsServer* srv) +{ + server = srv; +} + +SrsGoApi28181StreamCreation::~SrsGoApi28181StreamCreation() +{ +} + +srs_error_t SrsGoApi28181StreamCreation::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) +{ + srs_error_t err = srs_success; + std::string suuid = ""; + int port = 0; + std::stringstream sstream; + SrsJsonObject* content = SrsJsonAny::object(); + SrsAutoFree(SrsJsonObject, content); + if((err = server->create_28181stream_listener(SrsListener28181UdpStream,port,suuid))!=srs_success) + { + srs_warn("SrsGoApi28181StreamCreation - create listener failed[%d]",srs_error_code(err)); + content->set("status",SrsJsonAny::str("failed")); + content->set("desc",SrsJsonAny::str("create listener failed")); + return srs_api_response(w, r, content->dumps()); + } + sstream<set("status",SrsJsonAny::str("successful")); + content->set("stream-uuid",SrsJsonAny::str(suuid.c_str())); + content->set("listen-port",SrsJsonAny::str(sstream.str().c_str())); + return srs_api_response(w, r, content->dumps()); +} + + SrsGoApiError::SrsGoApiError() { } diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index 5957ff2f3f..6e6a49cead 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -201,6 +201,18 @@ class SrsGoApiClusters : public ISrsHttpHandler virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); }; +class SrsGoApi28181StreamCreation : public ISrsHttpHandler +{ +public: + SrsGoApi28181StreamCreation(SrsServer * srv); + virtual ~SrsGoApi28181StreamCreation(); +public: + virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); +private: + SrsServer * server; +}; + + class SrsGoApiError : public ISrsHttpHandler { public: diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index 8c6967115e..ceac849e3e 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -31,6 +31,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -45,6 +46,8 @@ using namespace std; #include #include #include +//#include + SrsRtpConn::SrsRtpConn(SrsRtspConn* r, int p, int sid) { @@ -128,6 +131,92 @@ srs_error_t SrsRtpConn::on_udp_packet(const sockaddr* from, const int fromlen, c return err; } + + +SrsRtpOverTcpConn::SrsRtpOverTcpConn(SrsRtspConn* r, int p, int sid) +{ + rtsp = r; + _port = p; + stream_id = sid; + // TODO: support listen at <[ip:]port> + listener = new SrsUdpListener(this, srs_any_address_for_listener(), p); + cache = new SrsRtpPacket(); + pprint = SrsPithyPrint::create_caster(); +} + +SrsRtpOverTcpConn::~SrsRtpOverTcpConn() +{ + srs_freep(listener); + srs_freep(cache); + srs_freep(pprint); +} + +int SrsRtpOverTcpConn::port() +{ + return _port; +} + +srs_error_t SrsRtpOverTcpConn::listen() +{ + return listener->listen(); +} + +srs_error_t SrsRtpOverTcpConn::on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) +{ + srs_error_t err = srs_success; + + pprint->elapse(); + + if (true) { + SrsBuffer stream(buf, nb_buf); + + SrsRtpPacket pkt; + if ((err = pkt.decode(&stream)) != srs_success) { + return srs_error_wrap(err, "decode"); + } + + if (pkt.chunked) { + if (!cache) { + cache = new SrsRtpPacket(); + } + cache->copy(&pkt); + cache->payload->append(pkt.payload->bytes(), pkt.payload->length()); + if (pprint->can_print()) { + srs_trace("<- " SRS_CONSTS_LOG_STREAM_CASTER " rtsp: rtp chunked %dB, age=%d, vt=%d/%u, sts=%u/%#x/%#x, paylod=%dB", + nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc, + cache->payload->length() + ); + } + + if (!cache->completed){ + return err; + } + } else { + srs_freep(cache); + cache = new SrsRtpPacket(); + cache->reap(&pkt); + } + } + + if (pprint->can_print()) { + srs_trace("<- " SRS_CONSTS_LOG_STREAM_CASTER " rtsp: rtp #%d %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB, chunked=%d", + stream_id, nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc, + cache->payload->length(), cache->chunked + ); + } + + // always free it. + SrsAutoFree(SrsRtpPacket, cache); + + if ((err = rtsp->on_rtp_packet(cache, stream_id)) != srs_success) { + return srs_error_wrap(err, "process rtp packet"); + } + + return err; +} + + + SrsRtspAudioCache::SrsRtspAudioCache() { dts = 0; @@ -240,12 +329,6 @@ srs_error_t SrsRtspConn::serve() return err; } -std::string SrsRtspConn::remote_ip() -{ - // TODO: FIXME: Implement it. - return ""; -} - srs_error_t SrsRtspConn::do_cycle() { srs_error_t err = srs_success; @@ -690,7 +773,6 @@ SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c) output = _srs_config->get_stream_caster_output(c); local_port_min = _srs_config->get_stream_caster_rtp_port_min(c); local_port_max = _srs_config->get_stream_caster_rtp_port_max(c); - manager = new SrsCoroutineManager(); } SrsRtspCaster::~SrsRtspCaster() @@ -698,21 +780,10 @@ SrsRtspCaster::~SrsRtspCaster() std::vector::iterator it; for (it = clients.begin(); it != clients.end(); ++it) { SrsRtspConn* conn = *it; - manager->remove(conn); + srs_freep(conn); } clients.clear(); used_ports.clear(); - - srs_freep(manager); -} - -srs_error_t SrsRtspCaster::initialize() -{ - srs_error_t err = srs_success; - if ((err = manager->start()) != srs_success) { - return srs_error_wrap(err, "start manager"); - } - return err; } srs_error_t SrsRtspCaster::alloc_port(int* pport) @@ -765,6 +836,1874 @@ void SrsRtspCaster::remove(SrsRtspConn* conn) } srs_info("rtsp: remove connection from caster."); - manager->remove(conn); + srs_freep(conn); +} + +Srs28181StreamServer::Srs28181StreamServer() +{ + // default value for testing + output = "output_rtmp_url"; + local_port_min = 57000; + local_port_max = 60000; +} + + +Srs28181StreamServer::Srs28181StreamServer(SrsConfDirective* c) +{ + // TODO: FIXME: support reload. + output = _srs_config->get_stream_caster_output(c); + local_port_min = _srs_config->get_stream_caster_rtp_port_min(c); + local_port_max = _srs_config->get_stream_caster_rtp_port_max(c); +} + +Srs28181StreamServer::~Srs28181StreamServer() +{ + std::vector::iterator it; + for (it = listeners.begin(); it != listeners.end(); ++it) { + Srs28181Listener* ltn = *it; + srs_freep(ltn); + } + listeners.clear(); + used_ports.clear(); + + srs_info("28181- server: deconstruction"); +} + +srs_error_t Srs28181StreamServer::create_listener(SrsListenerType type, int& ltn_port, std::string& suuid ) +{ + srs_error_t err = srs_success; + + //uuid_t sid; + //uuid_generate(sid); + //suuid.append((char*)sid,128); + + // Fix Me: should use uuid in future + std::string rd = ""; + srand(time(NULL)); + for(int i=0;i<32;i++) + { + rd = rd + char(rand()%10+'0'); + } + suuid = rd; + + Srs28181Listener * ltn = NULL; + if( type == SrsListener28181UdpStream ){ + ltn = new Srs28181UdpStreamListener(suuid); + } + else if(SrsListener28181UdpStream){ + ltn = new Srs28181TcpStreamListener(); + } + else{ + return srs_error_new(13026, "28181 listener creation"); + } + + int port = 0; + alloc_port(&port); + ltn_port= port; + + // using default port for testing + // port = 20090; + srs_trace("28181-stream-server: start a new listener on %s-%d stream_uuid:%s", + srs_any_address_for_listener().c_str(),port,suuid.c_str()); + + if ((err = ltn->listen(srs_any_address_for_listener(),port)) != srs_success) { + free_port(port,port+2); + srs_freep(ltn); + return srs_error_wrap(err, "28181 listener creation"); + } + + + + + listeners.push_back(ltn); + + return err; +} + +srs_error_t Srs28181StreamServer::release_listener() +{ + srs_error_t err = srs_success; + return err; +} + +srs_error_t Srs28181StreamServer::alloc_port(int* pport) +{ + srs_error_t err = srs_success; + + int i = 0; + // use a pair of port. + for (i = local_port_min; i < local_port_max - 1; i += 2) { + if (!used_ports[i]) { + used_ports[i] = true; + used_ports[i + 1] = true; + *pport = i; + break; + } + } + + if(i>= local_port_max - 1){ + return srs_error_new(10020,"listen port alloc failed!"); + } + + srs_info("28181 tcp stream: alloc port=%d-%d", *pport, *pport + 1); + + return err; +} + +void Srs28181StreamServer::free_port(int lpmin, int lpmax) +{ + for (int i = lpmin; i < lpmax; i++) { + used_ports[i] = false; + } + srs_trace("28181stream: free rtp port=%d-%d", lpmin, lpmax); +} + + +void Srs28181StreamServer::remove() +{ + /* + std::vector::iterator it = find(clients.begin(), clients.end(), conn); + if (it != clients.end()) { + clients.erase(it); + } + srs_info("rtsp: remove connection from caster."); + + srs_freep(conn); + */ +} + +Srs28181Listener::Srs28181Listener() +{ + port = 0; + //server = NULL; + //type = ""; +} + +Srs28181Listener::~Srs28181Listener() +{ +} +/* +SrsListenerType Srs28181Listener::listen_type() +{ + //return type; +}*/ + +Srs28181TcpStreamListener::Srs28181TcpStreamListener() +{ + listener = NULL; +} + +//(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t) +/* +Srs28181TcpStreamListener::Srs28181TcpStreamListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) +{ + listener = NULL; + + // the caller already ensure the type is ok, + // we just assert here for unknown stream caster. + srs_assert(type == SrsListenerGB28181TcpStream); + if (type == SrsListenerGB28181TcpStream) { + caster = new Srs28181Caster(c); + } +} +*/ + +Srs28181TcpStreamListener::~Srs28181TcpStreamListener() +{ + std::vector::iterator it; + for (it = clients.begin(); it != clients.end(); ++it) { + Srs28181TcpStreamConn* conn = *it; + srs_freep(conn); + } + clients.clear(); + + //srs_freep(caster); + srs_freep(listener); +} + +srs_error_t Srs28181TcpStreamListener::listen(string i, int p) +{ + srs_error_t err = srs_success; + + // the caller already ensure the type is ok, + // we just assert here for unknown stream caster. + //srs_assert(type == SrsListenerRtsp); + + std::string ip = i; + int port = p; + + srs_freep(listener); + listener = new SrsTcpListener(this, ip, port); + + if ((err = listener->listen()) != srs_success) { + return srs_error_wrap(err, "28181 listen %s:%d", ip.c_str(), port); + } + + //string v = srs_listener_type2string(type); + srs_trace("%s listen at tcp://%s:%d, fd=%d", "v.c_str()", ip.c_str(), port, listener->fd()); + + return err; +} + +srs_error_t Srs28181TcpStreamListener::on_tcp_client(srs_netfd_t stfd) +{ + srs_error_t err = srs_success; + + if(clients.size()>=1){ + return srs_error_wrap(err,"only allow one src!"); + } + + std::string output = "output_temple"; + Srs28181TcpStreamConn * conn = new Srs28181TcpStreamConn(this, stfd, output); + srs_trace("28181- listener(0x%x): accept a new connection(0x%x)",this,conn); + + if((err = conn->init())!=srs_success){ + srs_freep(conn); + return srs_error_wrap(err,"28181 stream conn init"); + } + clients.push_back(conn); + + return err; +} + +srs_error_t Srs28181TcpStreamListener::remove_conn(Srs28181TcpStreamConn* c) +{ + srs_error_t err = srs_success; + //srs_error_new(ERROR_THREAD_DISPOSED, "disposed"); + + std::vector::iterator it = find(clients.begin(), clients.end(), c); + if (it != clients.end()) { + clients.erase(it); + } + srs_info("28181 - listener: remove connection."); + + return err; +} + +Srs28181UdpStreamListener::Srs28181UdpStreamListener(std::string suuid) +{ + listener = NULL; + streamcore = new Srs28181StreamCore(suuid); +} + +Srs28181UdpStreamListener::~Srs28181UdpStreamListener() +{ + srs_freep(streamcore); + srs_freep(listener); + srs_trace("Srs28181UdpStreamListener deconstruction!"); +} + +srs_error_t Srs28181UdpStreamListener::listen(string i, int p) +{ + srs_error_t err = srs_success; + + ip = i; + port = p; + + srs_freep(listener); + listener = new SrsUdpListener(this, ip, port); + + if ((err = listener->listen()) != srs_success) { + return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); + } + + // notify the handler the fd changed. + if ((err = on_stfd_change(listener->stfd())) != srs_success) { + return srs_error_wrap(err, "notify fd change failed"); + } + + //string v = srs_listener_type2string(type); + srs_trace("%s listen 28181 stream at udp://%s:%d, fd=%d", "v.c_str()", ip.c_str(), port, listener->fd()); + + return err; } +srs_error_t Srs28181UdpStreamListener::on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) +{ + srs_error_t err = srs_success; + + int ret = streamcore->decode_packet_v2(buf,nb_buf); + //srs_trace("28181 udp stream: recv size:%d", nb_buf); + if (ret != 0) { + return srs_error_new(ret, "process 28181 udp stream"); + } + + return err; +} + + +Srs28181StreamCore::Srs28181StreamCore(std::string suuid)//(Srs28181TcpStreamListener* l, std::string o) +{ + //output_template = o; + + target_tcUrl = "rtmp://127.0.0.1:7935/live/"+suuid;//"rtmp://127.0.0.1:" + "7935" + "/live/test"; + output_template = "rtmp://127.0.0.1:7935/[app]/[stream]"; + + session = ""; + video_rtp = NULL; + audio_rtp = NULL; + + // TODO: set stream_id when connected + stream_id = 50125; + video_id = stream_id; + + boundary_type_ = TimestampBoundary; + + //listener = l; + ////caster = c; + //stfd = fd; + + //skt = new SrsStSocket(); + ////rtsp = new SrsRtspStack(skt); + //trd = new SrsSTCoroutine("28181tcpstream", this); + + //req = NULL; + sdk = NULL; + vjitter = new SrsRtspJitter(); + ajitter = new SrsRtspJitter(); + + avc = new SrsRawH264Stream(); + aac = new SrsRawAacStream(); + acodec = new SrsRawAacStreamCodec(); + acache = new SrsRtspAudioCache(); + pprint = SrsPithyPrint::create_caster(); +} + +Srs28181StreamCore::~Srs28181StreamCore() +{ + close(); + //srs_close_stfd(stfd); + + srs_freep(video_rtp); + srs_freep(audio_rtp); + + //srs_freep(trd); + //srs_freep(skt); + ////srs_freep(rtsp); + + srs_freep(sdk); + ////srs_freep(req); + + srs_freep(vjitter); + srs_freep(ajitter); + srs_freep(acodec); + srs_freep(acache); + srs_freep(pprint); +} +/* +srs_error_t Srs28181StreamCore::init() +{ + srs_error_t err = srs_success; + + if ((err = skt->initialize(stfd)) != srs_success) { + return srs_error_wrap(err, "socket initialize"); + } + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "rtsp connection"); + } + + return err; +}*/ + +/* +#define SRS_RECV_BUFFER_SIZE 1024*10 +srs_error_t Srs28181TcpStreamConn::do_cycle() +{ + srs_error_t err = srs_success; + + // retrieve ip of client. + std::string ip = srs_get_peer_ip(srs_netfd_fileno(stfd)); + if (ip.empty() && !_srs_config->empty_ip_ok()) { + srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd)); + } + srs_trace("28181: serve %s", ip.c_str()); + + char buffer[SRS_RECV_BUFFER_SIZE]; + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "28181 conn do_cycle"); + } + + ssize_t nb_read = 0; + if ((err = skt->read(buffer, SRS_RECV_BUFFER_SIZE, &nb_read)) != srs_success) { + return srs_error_wrap(err, "recv data"); + } + + decode_packet(buffer,nb_read); + } + + // make it happy + return err; +}*/ + +#define GB28181_STREAM +srs_error_t Srs28181StreamCore::on_stream_packet(SrsRtpPacket* pkt, int stream_id) +{ + srs_error_t err = srs_success; + + // ensure rtmp connected. + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); + } + + if (stream_id == video_id) { + // rtsp tbn is ts tbn. + int64_t pts = pkt->timestamp; + if ((err = vjitter->correct(pts)) != srs_success) { + return srs_error_wrap(err, "jitter"); + } + + // TODO: FIXME: set dts to pts, please finger out the right dts. + int64_t dts = pts; + #ifdef GB28181_STREAM + return on_stream_video(pkt,dts,pts); + #else + return on_rtp_video(pkt, dts, pts); + #endif + + } else { + // rtsp tbn is ts tbn. + int64_t pts = pkt->timestamp; + if ((err = ajitter->correct(pts)) != srs_success) { + return srs_error_wrap(err, "jitter"); + } + + return on_rtp_audio(pkt, pts); + } + + return err; +} + + +srs_error_t Srs28181StreamCore::on_stream_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts) +{ + + //int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; + + if (pkt->tgtstream->length() <= 0) { + srs_trace("28181streamcore - empty stream, will continue"); + return err; + } + + SrsBuffer stream(pkt->tgtstream->bytes(), pkt->tgtstream->length()); + + // send each frame. + // TODO: bks: find i frame then return directory. dont need compare every bytes + while (!stream.empty()) { + char* frame = NULL; + int frame_size = 0; + + if ((err = avc->annexb_demux(&stream, &frame, &frame_size)) != srs_success) { + // i do not care + srs_warn("28181streamcore - waring: no nalu in buffer.[%d]",srs_error_code(err)); + return srs_success; + //return srs_error_wrap(err,"annexb demux"); + } + + // for highly reliable. Only give notification but exit + if (frame_size <= 0) { + srs_warn("h264 stream: frame_size <=0, and continue for next loop!"); + continue; + } + + //if ((ret = avc->annexb_demux(stream, &frame, &frame_size)) != ERROR_SUCCESS) { + // return ret; + //} + + // ignore others. + // 5bits, 7.3.1 NAL unit syntax, + // H.264-AVC-ISO_IEC_14496-10.pdf, page 44. + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame, 9: AUD + SrsAvcNaluType nut = (SrsAvcNaluType)(frame[0] & 0x1f); + if (nut != SrsAvcNaluTypeSPS && nut != SrsAvcNaluTypePPS //&& nut != SrsAvcNaluTypeSEI + && nut != SrsAvcNaluTypeIDR && nut != SrsAvcNaluTypeNonIDR + && nut != SrsAvcNaluTypeAccessUnitDelimiter + ) { + //srs_trace("h264-ps stream: Ignore this frame size=%d, dts=%d", frame_size, dts); + continue; + } + + + // for sps + if (avc->is_sps(frame, frame_size)) { + std::string sps = ""; + if ((err = avc->sps_demux(frame, frame_size, sps)) != srs_success) { + srs_error("h264-ps: invalied sps in dts=%d",dts); + continue; + //return ret; + } + + if (h264_sps != sps) { + h264_sps = sps; + h264_sps_changed = true; + h264_sps_pps_sent = false; + srs_trace("h264-ps stream: set SPS frame size=%d, dts=%d", frame_size, dts); + } + } + + // for pps + if (avc->is_pps(frame, frame_size)) { + std::string pps = ""; + if ((err = avc->pps_demux(frame, frame_size, pps)) != srs_success) { + srs_error("h264-ps: invalied sps in dts=%d", dts); + continue; + //return ret; + } + + if (h264_pps != pps) { + h264_pps = pps; + h264_pps_changed = true; + h264_sps_pps_sent = false; + srs_trace("h264-ps stream: set PPS frame size=%d, dts=%d", frame_size, dts); + } + } + + // attention: now, we set sps/pps + if (h264_sps_changed && h264_pps_changed) { + + h264_sps_changed = false; + h264_pps_changed = false; + h264_sps_pps_sent = true; + + if ((err = write_h264_sps_pps(dts / 90, pts / 90)) != srs_success) { + srs_error("h264-ps stream: Re-write SPS-PPS Wrong! frame size=%d, dts=%d", frame_size, dts); + return srs_error_wrap(err,"re-write sps-pps failed"); + } + srs_warn("h264-ps stream: Re-write SPS-PPS Successful! frame size=%d, dts=%d", frame_size, dts); + } + + //besson: make sure you control flows in one important function + //dont spread controlers everythere. mpegts_upd is not a good example + + // attention: should ship sps/pps frame in every tsb rtp group + // otherwise sps/pps will be written as ipb frame! + if (h264_sps_pps_sent && nut != SrsAvcNaluTypeSPS && nut != SrsAvcNaluTypePPS) { + if ((err = kickoff_audio_cache(pkt, dts)) != srs_success) { + srs_warn("h264-ps stream: kickoff audio cache dts=%d", dts); + return srs_error_wrap(err,"killoff audio cache failed"); + } + + // ibp frame. + // TODO: FIXME: we should group all frames to a rtmp/flv message from one ts message. + srs_info("h264-ps stream: demux avc ibp frame size=%d, dts=%d", frame_size, dts); + if ((err = write_h264_ipb_frame(frame, frame_size, dts / 90, pts / 90)) != srs_success) { + return srs_error_wrap(err,"write ibp failed"); + } + } + }//while send frame + + return err; +} + +/* +srs_error_t Srs28181StreamCore::cycle() +{ + // serve the rtsp client. + srs_error_t err = do_cycle(); + + //caster->remove(this); + if (err == srs_success) { + srs_trace("client finished."); + } else if (srs_is_client_gracefully_close(err)) { + srs_warn("client disconnect peer. code=%d", srs_error_code(err)); + srs_freep(err); + } + + listener->remove_conn(this); + + // do not need caster anymore + // if (video_rtp) { + // caster->free_port(video_rtp->port(), video_rtp->port() + 1); + // } + + // if (audio_rtp) { + // caster->free_port(audio_rtp->port(), audio_rtp->port() + 1); + // } + + return err; +} +*/ + +srs_error_t Srs28181StreamCore::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts) +{ + srs_error_t err = srs_success; + + if ((err = kickoff_audio_cache(pkt, dts)) != srs_success) { + return srs_error_wrap(err, "kickoff audio cache"); + } + + char* bytes = pkt->payload->bytes(); + int length = pkt->payload->length(); + uint32_t fdts = (uint32_t)(dts / 90); + uint32_t fpts = (uint32_t)(pts / 90); + if ((err = write_h264_ipb_frame(bytes, length, fdts, fpts)) != srs_success) { + return srs_error_wrap(err, "write ibp frame"); + } + + return err; +} + +srs_error_t Srs28181StreamCore::on_rtp_audio(SrsRtpPacket* pkt, int64_t dts) +{ + srs_error_t err = srs_success; + + if ((err = kickoff_audio_cache(pkt, dts)) != srs_success) { + return srs_error_wrap(err, "kickoff audio cache"); + } + + // cache current audio to kickoff. + acache->dts = dts; + acache->audio = pkt->audio; + acache->payload = pkt->payload; + + pkt->audio = NULL; + pkt->payload = NULL; + + return err; +} + +srs_error_t Srs28181StreamCore::kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts) +{ + srs_error_t err = srs_success; + + // nothing to kick off. + if (!acache->payload) { + return err; + } + + if (dts - acache->dts > 0 && acache->audio->nb_samples > 0) { + int64_t delta = (dts - acache->dts) / acache->audio->nb_samples; + for (int i = 0; i < acache->audio->nb_samples; i++) { + char* frame = acache->audio->samples[i].bytes; + int nb_frame = acache->audio->samples[i].size; + int64_t timestamp = (acache->dts + delta * i) / 90; + acodec->aac_packet_type = 1; + if ((err = write_audio_raw_frame(frame, nb_frame, acodec, (uint32_t)timestamp)) != srs_success) { + return srs_error_wrap(err, "write audio raw frame"); + } + } + } + + acache->dts = 0; + srs_freep(acache->audio); + srs_freep(acache->payload); + + return err; +} + +// TODO: modify return type +// can decode raw rtp+h264 or rtp+ps+h264 +#define PS_IN_RTP +int Srs28181StreamCore::decode_packet(char* buf, int nb_buf) +{ + int ret = 0; + int status; + + pprint->elapse(); + + if (true) { + SrsBuffer stream(buf,nb_buf); + + //if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { + // return ret; + //} + + SrsRtpPacket pkt; + if ((ret = pkt.decode_v2(&stream)) != ERROR_SUCCESS) { + srs_error("28181: decode rtp packet failed. ret=%d", ret); + return ret; + } + + if (pkt.chunked) { + if (!cache_) { + cache_ = new SrsRtpPacket(); + } + cache_->copy(&pkt); + cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); + + /* + if (!cache->completed && pprint->can_print()) { + srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtsp: rtp chunked %dB, age=%d, vt=%d/%u, sts=%u/%#x/%#x, paylod=%dB", + nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc, + cache->payload->length() + ); + return ret; + }*/ + + //besson: correct rtp decode bug + if (!cache_->completed) { + return ret; + } + + } + else { + // : NOTE:if u receive from middle or stream loss starting rtp, will also deal this uncompleted packet, + // the following progress will skip this ncompleted packet + srs_freep(cache_); + cache_ = new SrsRtpPacket(); + cache_->reap(&pkt); + + } + } + + if (pprint->can_print()) { + srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtp #%d %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB, chunked=%d", + stream_id, nb_buf, pprint->age(), cache_->version, cache_->payload_type, cache_->sequence_number, cache_->timestamp, cache_->ssrc, + cache_->payload->length(), cache_->chunked + ); + } + + // always free it. + SrsAutoFree(SrsRtpPacket, cache_); + +#ifdef PS_IN_RTP + // ps stream + if ((status = cache_->decode_stream()) != ERROR_SUCCESS) { + if (status == ERROR_RTP_PS_HK_PRIVATE_PROTO) { + //private_proto = true; + //only mention once + srs_error(" rtp type 96 ps. stream_id:%d", stream_id); + } + } +#else + // only rtp no ps + cache_->tgtstream->append(cache_->payload->bytes(),cache_->payload->length()); +#endif + + srs_error_t err = srs_success; + if ((err = on_stream_packet(cache_, stream_id)) != srs_success) { + srs_error("28181: process rtp packet failed. ret=%d",err->error_code(err) ); + return -1; + } + + return ret; +} + +// TODO: modify return type +int Srs28181StreamCore::decode_packet_v2(char* buf, int nb_buf) +{ + int ret = 0; + int status; + + pprint->elapse(); + + if (true) { + SrsBuffer stream(buf,nb_buf); + + /*if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { + return ret; + }*/ + + SrsRtpPacket pkt; + if ((ret = pkt.decode_v2(&stream, boundary_type_)) != ERROR_SUCCESS) { + srs_error("rtp auto decoder: decode rtp packet failed. ret=%d", ret); + return ret; + } + + if (pkt.chunked) { + if (!cache_) { + cache_ = new SrsRtpPacket(); + } + + if (boundary_type_ == MarkerBoundary) { + cache_->copy(&pkt); + cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); + } + else if (boundary_type_ == TimestampBoundary) { + + // there is two conditions: + // 1.ts changing every rtp packet + // 2.ts changing every x rtp packets + // in any case, we should first copy the cached rtp packet from last loop + // cause we use ts boundary to decode rtp group, we determinte a group end after a new group beginng + if (first_rtp_tsb_enabled_) { + first_rtp_tsb_enabled_ = false; + + if (!first_rtp_tsb_) { + srs_error("rtp auto decoder: first_rtp_tsb_ is NULL!"); + ret = ERROR_RTP_PS_FIRST_TSB_LOSS; + return ret; + //srs_assert(first_rtp_tsb_==NULL); + } + + cache_->copy(first_rtp_tsb_); + cache_->payload->append(first_rtp_tsb_->payload->bytes(), first_rtp_tsb_->payload->length()); + srs_freep(first_rtp_tsb_); + } + + if (pkt.timestamp != cache_->timestamp) { + + // if timestamp change, enable flag and cache the first new rtp packet in group + first_rtp_tsb_enabled_ = true; + + srs_freep(first_rtp_tsb_); + first_rtp_tsb_ = new SrsRtpPacket(); + first_rtp_tsb_->copy(&pkt); + first_rtp_tsb_->payload->append(pkt.payload->bytes(), pkt.payload->length()); + + cache_->completed = true; + } + else { + cache_->copy(&pkt); + cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); + cache_->completed = false; + } + } + else { + srs_error("Unkonown rtp boundary type!"); + } + + /* + if (!cache->completed && pprint->can_print()) { + srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtsp: rtp chunked %dB, age=%d, vt=%d/%u, sts=%u/%#x/%#x, paylod=%dB", + nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc, + cache->payload->length() + ); + return ret; + }*/ + + // correct rtp decode bug + if (!cache_->completed) { + return ret; + } + + } + else { + // besson: NOTE:if u receive from middle or stream loss starting rtp, will also deal this uncompleted packet, + // the following progress will skip this ncompleted packet + srs_freep(cache_); + cache_ = new SrsRtpPacket(); + cache_->reap(&pkt); + + } + } + + if (pprint->can_print()) { + srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtp #%d %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB, chunked=%d, boundary type=%s", + stream_id, nb_buf, pprint->age(), cache_->version, cache_->payload_type, cache_->sequence_number, cache_->timestamp, cache_->ssrc, + cache_->payload->length(), cache_->chunked, boundary_type_==MarkerBoundary?"MKR":"TSB" + ); + } + + // always free it. + SrsAutoFree(SrsRtpPacket, cache_); + + // ps stream + if ((status = cache_->decode_stream()) != 0) { + if (status == ERROR_RTP_PS_HK_PRIVATE_PROTO) { + //private_proto = true; + //only mention once + srs_error(" rtp type 96 ps. private proto port:%d, stream_id:%d", 0, stream_id); + } + } + + srs_error_t err = srs_success; + if ((err = on_stream_packet(cache_, stream_id)) != srs_success) { + srs_error("rtp auto decoder: process rtp packet failed. ret=%d",srs_error_code(err) ); + //invalid_rtp_num_++; + return -1; + } + + return ret; +} + +srs_error_t Srs28181StreamCore::write_sequence_header() +{ + srs_error_t err = srs_success; + + // use the current dts. + int64_t dts = vjitter->timestamp() / 90; + + // send video sps/pps + if ((err = write_h264_sps_pps((uint32_t)dts, (uint32_t)dts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); + } + + // generate audio sh by audio specific config. + if (true) { + std::string sh = aac_specific_config; + + SrsFormat* format = new SrsFormat(); + SrsAutoFree(SrsFormat, format); + + if ((err = format->on_aac_sequence_header((char*)sh.c_str(), (int)sh.length())) != srs_success) { + return srs_error_wrap(err, "on aac sequence header"); + } + + SrsAudioCodecConfig* dec = format->acodec; + + acodec->sound_format = SrsAudioCodecIdAAC; + acodec->sound_type = (dec->aac_channels == 2)? SrsAudioChannelsStereo : SrsAudioChannelsMono; + acodec->sound_size = SrsAudioSampleBits16bit; + acodec->aac_packet_type = 0; + + static int srs_aac_srates[] = { + 96000, 88200, 64000, 48000, + 44100, 32000, 24000, 22050, + 16000, 12000, 11025, 8000, + 7350, 0, 0, 0 + }; + switch (srs_aac_srates[dec->aac_sample_rate]) { + case 11025: + acodec->sound_rate = SrsAudioSampleRate11025; + break; + case 22050: + acodec->sound_rate = SrsAudioSampleRate22050; + break; + case 44100: + acodec->sound_rate = SrsAudioSampleRate44100; + break; + default: + break; + }; + + if ((err = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), acodec, (uint32_t)dts)) != srs_success) { + return srs_error_wrap(err, "write audio raw frame"); + } + } + + return err; +} + +srs_error_t Srs28181StreamCore::write_h264_sps_pps(uint32_t dts, uint32_t pts) +{ + srs_error_t err = srs_success; + + // h264 raw to h264 packet. + std::string sh; + if ((err = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != srs_success) { + return srs_error_wrap(err, "mux sequence header"); + } + + // h264 packet to flv packet. + int8_t frame_type = SrsVideoAvcFrameTypeKeyFrame; + int8_t avc_packet_type = SrsVideoAvcFrameTraitSequenceHeader; + char* flv = NULL; + int nb_flv = 0; + if ((err = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "mux avc to flv"); + } + + // the timestamp in rtmp message header is dts. + uint32_t timestamp = dts; + if ((err = rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv)) != srs_success) { + return srs_error_wrap(err, "write packet"); + } + + return err; +} + +srs_error_t Srs28181StreamCore::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) +{ + srs_error_t err = srs_success; + + // 5bits, 7.3.1 NAL unit syntax, + // ISO_IEC_14496-10-AVC-2003.pdf, page 44. + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); + + // for IDR frame, the frame is keyframe. + SrsVideoAvcFrameType frame_type = SrsVideoAvcFrameTypeInterFrame; + if (nal_unit_type == SrsAvcNaluTypeIDR) { + frame_type = SrsVideoAvcFrameTypeKeyFrame; + } + + std::string ibp; + if ((err = avc->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { + return srs_error_wrap(err, "mux ibp frame"); + } + + int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU; + char* flv = NULL; + int nb_flv = 0; + if ((err = avc->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "mux avc to flv"); + } + + // the timestamp in rtmp message header is dts. + uint32_t timestamp = dts; + return rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv); +} + +srs_error_t Srs28181StreamCore::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) +{ + srs_error_t err = srs_success; + + char* data = NULL; + int size = 0; + if ((err = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) { + return srs_error_wrap(err, "mux aac to flv"); + } + + return rtmp_write_packet(SrsFrameTypeAudio, dts, data, size); +} + +srs_error_t Srs28181StreamCore::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) +{ + srs_error_t err = srs_success; + + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); + } + + SrsSharedPtrMessage* msg = NULL; + + if ((err = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != srs_success) { + return srs_error_wrap(err, "create message"); + } + srs_assert(msg); + + // send out encoded msg. + if ((err = sdk->send_and_free_message(msg)) != srs_success) { + close(); + return srs_error_wrap(err, "write message"); + } + + return err; +} + +#define H264PS_STREAM_TEST +srs_error_t Srs28181StreamCore::connect() +{ + srs_error_t err = srs_success; + + // Ignore when connected. + if (sdk) { + return err; + } + + // generate rtmp url to connect to. + std::string url; + //if (!req) { + if(target_tcUrl != ""){ + std::string schema, host, vhost, app, param; + int port; + srs_discovery_tc_url(target_tcUrl, schema, host, vhost, app, stream_name, port, param); + + // generate output by template. + std::string output = output_template; + output = srs_string_replace(output, "[app]", app); + output = srs_string_replace(output, "[stream]", stream_name); + url = output; + } + + // Fix Me: MUST use identified url in future + url = target_tcUrl; + + srs_trace("28181 stream - target_tcurl:%s,stream_name:%s, url:%s", + target_tcUrl.c_str(),stream_name.c_str(),url.c_str()); + + // connect host. + srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; + srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; + sdk = new SrsSimpleRtmpClient(url, cto, sto); + + if ((err = sdk->connect()) != srs_success) { + close(); + return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto)); + } + + // publish. + if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) { + close(); + return srs_error_wrap(err, "publish %s failed", url.c_str()); + } + +#ifdef H264PS_STREAM_TEST + return err; +#else + return write_sequence_header(); +#endif +} + +void Srs28181StreamCore::close() +{ + srs_freep(sdk); +} + + + + + + + +Srs28181TcpStreamConn::Srs28181TcpStreamConn(Srs28181TcpStreamListener* l, srs_netfd_t fd, std::string o) +{ + output_template = o; + + target_tcUrl = "rtmp://127.0.0.1:7935/live/test";//"rtmp://127.0.0.1:" + "7935" + "/live/test"; + output_template = "rtmp://127.0.0.1:7935/[app]/[stream]"; + + session = ""; + video_rtp = NULL; + audio_rtp = NULL; + + // TODO: set stream_id when connected + stream_id = 50125; + video_id = stream_id; + + listener = l; + //caster = c; + stfd = fd; + skt = new SrsStSocket(); + //rtsp = new SrsRtspStack(skt); + trd = new SrsSTCoroutine("28181tcpstream", this); + + //req = NULL; + sdk = NULL; + vjitter = new SrsRtspJitter(); + ajitter = new SrsRtspJitter(); + + avc = new SrsRawH264Stream(); + aac = new SrsRawAacStream(); + acodec = new SrsRawAacStreamCodec(); + acache = new SrsRtspAudioCache(); + pprint = SrsPithyPrint::create_caster(); +} + +Srs28181TcpStreamConn::~Srs28181TcpStreamConn() +{ + close(); + + srs_close_stfd(stfd); + + srs_freep(video_rtp); + srs_freep(audio_rtp); + + srs_freep(trd); + srs_freep(skt); + //srs_freep(rtsp); + + srs_freep(sdk); + //srs_freep(req); + + srs_freep(vjitter); + srs_freep(ajitter); + srs_freep(acodec); + srs_freep(acache); + srs_freep(pprint); +} + +srs_error_t Srs28181TcpStreamConn::init() +{ + srs_error_t err = srs_success; + + if ((err = skt->initialize(stfd)) != srs_success) { + return srs_error_wrap(err, "socket initialize"); + } + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "rtsp connection"); + } + + return err; +} + +#define SRS_RECV_BUFFER_SIZE 1024*10 +srs_error_t Srs28181TcpStreamConn::do_cycle() +{ + srs_error_t err = srs_success; + + // retrieve ip of client. + std::string ip = srs_get_peer_ip(srs_netfd_fileno(stfd)); + if (ip.empty() && !_srs_config->empty_ip_ok()) { + srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd)); + } + srs_trace("28181: serve %s", ip.c_str()); + + char buffer[SRS_RECV_BUFFER_SIZE]; + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "28181 conn do_cycle"); + } + + ssize_t nb_read = 0; + if ((err = skt->read(buffer, SRS_RECV_BUFFER_SIZE, &nb_read)) != srs_success) { + return srs_error_wrap(err, "recv data"); + } + + decode_packet(buffer,nb_read); + } + + // make it happy + return err; +} + +//#define GB28181_STREAM +srs_error_t Srs28181TcpStreamConn::on_rtp_packet(SrsRtpPacket* pkt, int stream_id) +{ + srs_error_t err = srs_success; + + // ensure rtmp connected. + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); + } + + if (stream_id == video_id) { + // rtsp tbn is ts tbn. + int64_t pts = pkt->timestamp; + if ((err = vjitter->correct(pts)) != srs_success) { + return srs_error_wrap(err, "jitter"); + } + + // TODO: FIXME: set dts to pts, please finger out the right dts. + int64_t dts = pts; + #ifdef GB28181_STREAM + return on_rtp_video_adv(pkt,dts,pts); + #else + return on_rtp_video(pkt, dts, pts); + #endif + + } else { + // rtsp tbn is ts tbn. + int64_t pts = pkt->timestamp; + if ((err = ajitter->correct(pts)) != srs_success) { + return srs_error_wrap(err, "jitter"); + } + + return on_rtp_audio(pkt, pts); + } + + return err; +} + + +srs_error_t Srs28181TcpStreamConn::on_rtp_video_adv(SrsRtpPacket* pkt, int64_t dts, int64_t pts) +{ + + //int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; + + if (pkt->tgtstream->length() <= 0) { + return srs_error_new(13002,"tetstream not enough"); + } + + SrsBuffer stream(pkt->tgtstream->bytes(), pkt->tgtstream->length()); + /*SrsBuffer stream; + if ((ret = stream.initialize(pkt->tgtstream->bytes(), pkt->tgtstream->length()))!=ERROR_SUCCESS) { + srs_trace("h264-ps stream: inValid Frame Size, frame size=%d, dts=%d", pkt->tgtstream->length(), dts); + return ret; + }*/ + + // send each frame. + // TODO: bks: find i frame then return directory. dont need compare every bytes + while (!stream.empty()) { + char* frame = NULL; + int frame_size = 0; + + if ((err = avc->annexb_demux(&stream, &frame, &frame_size)) != srs_success) { + return srs_error_wrap(err,"annexb demux"); + } + + // for highly reliable. Only give notification but exit + if (frame_size <= 0) { + srs_warn("h264 stream: frame_size <=0, and continue for next loop!"); + continue; + } + + //if ((ret = avc->annexb_demux(stream, &frame, &frame_size)) != ERROR_SUCCESS) { + // return ret; + //} + + // ignore others. + // 5bits, 7.3.1 NAL unit syntax, + // H.264-AVC-ISO_IEC_14496-10.pdf, page 44. + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame, 9: AUD + SrsAvcNaluType nut = (SrsAvcNaluType)(frame[0] & 0x1f); + if (nut != SrsAvcNaluTypeSPS && nut != SrsAvcNaluTypePPS //&& nut != SrsAvcNaluTypeSEI + && nut != SrsAvcNaluTypeIDR && nut != SrsAvcNaluTypeNonIDR + && nut != SrsAvcNaluTypeAccessUnitDelimiter + ) { + //srs_trace("h264-ps stream: Ignore this frame size=%d, dts=%d", frame_size, dts); + continue; + } + + + // for sps + if (avc->is_sps(frame, frame_size)) { + std::string sps = ""; + if ((err = avc->sps_demux(frame, frame_size, sps)) != srs_success) { + srs_error("h264-ps: invalied sps in dts=%d",dts); + continue; + //return ret; + } + + if (h264_sps != sps) { + h264_sps = sps; + h264_sps_changed = true; + h264_sps_pps_sent = false; + srs_trace("h264-ps stream: set SPS frame size=%d, dts=%d", frame_size, dts); + } + } + + // for pps + if (avc->is_pps(frame, frame_size)) { + std::string pps = ""; + if ((err = avc->pps_demux(frame, frame_size, pps)) != srs_success) { + srs_error("h264-ps: invalied sps in dts=%d", dts); + continue; + //return ret; + } + + if (h264_pps != pps) { + h264_pps = pps; + h264_pps_changed = true; + h264_sps_pps_sent = false; + srs_trace("h264-ps stream: set PPS frame size=%d, dts=%d", frame_size, dts); + } + } + + // attention: now, we set sps/pps + if (h264_sps_changed && h264_pps_changed) { + + h264_sps_changed = false; + h264_pps_changed = false; + h264_sps_pps_sent = true; + + if ((err = write_h264_sps_pps(dts / 90, pts / 90)) != srs_success) { + srs_error("h264-ps stream: Re-write SPS-PPS Wrong! frame size=%d, dts=%d", frame_size, dts); + return srs_error_wrap(err,"re-write sps-pps failed"); + } + srs_warn("h264-ps stream: Re-write SPS-PPS Successful! frame size=%d, dts=%d", frame_size, dts); + } + + //besson: make sure you control flows in one important function + //dont spread controlers everythere. mpegts_upd is not a good example + + // attention: should ship sps/pps frame in every tsb rtp group + // otherwise sps/pps will be written as ipb frame! + if (h264_sps_pps_sent && nut != SrsAvcNaluTypeSPS && nut != SrsAvcNaluTypePPS) { + if ((err = kickoff_audio_cache(pkt, dts)) != srs_success) { + srs_warn("h264-ps stream: kickoff audio cache dts=%d", dts); + return srs_error_wrap(err,"killoff audio cache failed"); + } + + // ibp frame. + // TODO: FIXME: we should group all frames to a rtmp/flv message from one ts message. + srs_info("h264-ps stream: demux avc ibp frame size=%d, dts=%d", frame_size, dts); + if ((err = write_h264_ipb_frame(frame, frame_size, dts / 90, pts / 90)) != srs_success) { + return srs_error_wrap(err,"write ibp failed"); + } + } + }//while send frame + + return err; +} + +srs_error_t Srs28181TcpStreamConn::cycle() +{ + // serve the rtsp client. + srs_error_t err = do_cycle(); + + //caster->remove(this); + if (err == srs_success) { + srs_trace("client finished."); + } else if (srs_is_client_gracefully_close(err)) { + srs_warn("client disconnect peer. code=%d", srs_error_code(err)); + srs_freep(err); + } + + listener->remove_conn(this); + + /* do not need caster anymore + if (video_rtp) { + caster->free_port(video_rtp->port(), video_rtp->port() + 1); + } + + if (audio_rtp) { + caster->free_port(audio_rtp->port(), audio_rtp->port() + 1); + }*/ + + return err; +} + +srs_error_t Srs28181TcpStreamConn::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts) +{ + srs_error_t err = srs_success; + + if ((err = kickoff_audio_cache(pkt, dts)) != srs_success) { + return srs_error_wrap(err, "kickoff audio cache"); + } + + char* bytes = pkt->payload->bytes(); + int length = pkt->payload->length(); + uint32_t fdts = (uint32_t)(dts / 90); + uint32_t fpts = (uint32_t)(pts / 90); + if ((err = write_h264_ipb_frame(bytes, length, fdts, fpts)) != srs_success) { + return srs_error_wrap(err, "write ibp frame"); + } + + return err; +} + +srs_error_t Srs28181TcpStreamConn::on_rtp_audio(SrsRtpPacket* pkt, int64_t dts) +{ + srs_error_t err = srs_success; + + if ((err = kickoff_audio_cache(pkt, dts)) != srs_success) { + return srs_error_wrap(err, "kickoff audio cache"); + } + + // cache current audio to kickoff. + acache->dts = dts; + acache->audio = pkt->audio; + acache->payload = pkt->payload; + + pkt->audio = NULL; + pkt->payload = NULL; + + return err; +} + +srs_error_t Srs28181TcpStreamConn::kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts) +{ + srs_error_t err = srs_success; + + // nothing to kick off. + if (!acache->payload) { + return err; + } + + if (dts - acache->dts > 0 && acache->audio->nb_samples > 0) { + int64_t delta = (dts - acache->dts) / acache->audio->nb_samples; + for (int i = 0; i < acache->audio->nb_samples; i++) { + char* frame = acache->audio->samples[i].bytes; + int nb_frame = acache->audio->samples[i].size; + int64_t timestamp = (acache->dts + delta * i) / 90; + acodec->aac_packet_type = 1; + if ((err = write_audio_raw_frame(frame, nb_frame, acodec, (uint32_t)timestamp)) != srs_success) { + return srs_error_wrap(err, "write audio raw frame"); + } + } + } + + acache->dts = 0; + srs_freep(acache->audio); + srs_freep(acache->payload); + + return err; +} + +// TODO: modify return type +// can decode raw rtp+h264 or rtp+ps+h264 +int Srs28181TcpStreamConn::decode_packet(char* buf, int nb_buf) +{ + int ret = 0; + int status; + + pprint->elapse(); + + if (true) { + SrsBuffer stream(buf,nb_buf); + + //if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { + // return ret; + //} + + SrsRtpPacket pkt; + if ((ret = pkt.decode_v2(&stream)) != ERROR_SUCCESS) { + srs_error("28181: decode rtp packet failed. ret=%d", ret); + return ret; + } + + if (pkt.chunked) { + if (!cache_) { + cache_ = new SrsRtpPacket(); + } + cache_->copy(&pkt); + cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); + + /* + if (!cache->completed && pprint->can_print()) { + srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtsp: rtp chunked %dB, age=%d, vt=%d/%u, sts=%u/%#x/%#x, paylod=%dB", + nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc, + cache->payload->length() + ); + return ret; + }*/ + + //besson: correct rtp decode bug + if (!cache_->completed) { + return ret; + } + + } + else { + // : NOTE:if u receive from middle or stream loss starting rtp, will also deal this uncompleted packet, + // the following progress will skip this ncompleted packet + srs_freep(cache_); + cache_ = new SrsRtpPacket(); + cache_->reap(&pkt); + + } + } + + // TODO: set stream_id when connected + stream_id = 50125; + + if (pprint->can_print()) { + srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtp #%d %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB, chunked=%d", + stream_id, nb_buf, pprint->age(), cache_->version, cache_->payload_type, cache_->sequence_number, cache_->timestamp, cache_->ssrc, + cache_->payload->length(), cache_->chunked + ); + } + + // always free it. + SrsAutoFree(SrsRtpPacket, cache_); + +#ifdef PS_IN_RTP + // ps stream + if ((status = cache_->decode_stream()) != ERROR_SUCCESS) { + if (status == ERROR_RTP_PS_HK_PRIVATE_PROTO) { + //private_proto = true; + //only mention once + srs_error(" rtp type 96 ps. stream_id:%d", stream_id); + } + } +#endif + + + srs_error_t err = srs_success; + if ((err = on_rtp_packet(cache_, stream_id)) != srs_success) { + srs_error("28181: process rtp packet failed. ret=%d", ret); + return -1; + } + + return ret; +} + +// TODO: modify return type +int Srs28181TcpStreamConn::decode_packet_v2(char* buf, int nb_buf) +{ + int ret = 0; + int status; + + pprint->elapse(); + + if (true) { + SrsBuffer stream(buf,nb_buf); + + /*if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { + return ret; + }*/ + + SrsRtpPacket pkt; + if ((ret = pkt.decode_v2(&stream, boundary_type_)) != ERROR_SUCCESS) { + srs_error("rtp auto decoder: decode rtp packet failed. ret=%d", ret); + return ret; + } + + if (pkt.chunked) { + if (!cache_) { + cache_ = new SrsRtpPacket(); + } + + if (boundary_type_ == MarkerBoundary) { + cache_->copy(&pkt); + cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); + } + else if (boundary_type_ == TimestampBoundary) { + + // there is two conditions: + // 1.ts changing every rtp packet + // 2.ts changing every x rtp packets + // in any case, we should first copy the cached rtp packet from last loop + // cause we use ts boundary to decode rtp group, we determinte a group end after a new group beginng + if (first_rtp_tsb_enabled_) { + first_rtp_tsb_enabled_ = false; + + if (!first_rtp_tsb_) { + srs_error("rtp auto decoder: first_rtp_tsb_ is NULL!"); + ret = ERROR_RTP_PS_FIRST_TSB_LOSS; + return ret; + //srs_assert(first_rtp_tsb_==NULL); + } + + cache_->copy(first_rtp_tsb_); + cache_->payload->append(first_rtp_tsb_->payload->bytes(), first_rtp_tsb_->payload->length()); + srs_freep(first_rtp_tsb_); + } + + if (pkt.timestamp != cache_->timestamp) { + + // if timestamp change, enable flag and cache the first new rtp packet in group + first_rtp_tsb_enabled_ = true; + + srs_freep(first_rtp_tsb_); + first_rtp_tsb_ = new SrsRtpPacket(); + first_rtp_tsb_->copy(&pkt); + first_rtp_tsb_->payload->append(pkt.payload->bytes(), pkt.payload->length()); + + cache_->completed = true; + } + else { + cache_->copy(&pkt); + cache_->payload->append(pkt.payload->bytes(), pkt.payload->length()); + cache_->completed = false; + } + } + else { + srs_error("Unkonown rtp boundary type!"); + } + + /* + if (!cache->completed && pprint->can_print()) { + srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtsp: rtp chunked %dB, age=%d, vt=%d/%u, sts=%u/%#x/%#x, paylod=%dB", + nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc, + cache->payload->length() + ); + return ret; + }*/ + + //besson: correct rtp decode bug + if (!cache_->completed) { + return ret; + } + + } + else { + // besson: NOTE:if u receive from middle or stream loss starting rtp, will also deal this uncompleted packet, + // the following progress will skip this ncompleted packet + srs_freep(cache_); + cache_ = new SrsRtpPacket(); + cache_->reap(&pkt); + + } + } + + if (pprint->can_print()) { + srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtp #%d %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB, chunked=%d, boundary type=%s", + stream_id, nb_buf, pprint->age(), cache_->version, cache_->payload_type, cache_->sequence_number, cache_->timestamp, cache_->ssrc, + cache_->payload->length(), cache_->chunked, boundary_type_==MarkerBoundary?"MKR":"TSB" + ); + } + + // always free it. + SrsAutoFree(SrsRtpPacket, cache_); + + // ps stream + if ((status = cache_->decode_stream()) != 0) { + if (status == ERROR_RTP_PS_HK_PRIVATE_PROTO) { + //private_proto = true; + //only mention once + srs_error(" rtp type 96 ps. private proto port:%d, stream_id:%d", 0, stream_id); + } + } + + srs_error_t err = srs_success; + if ((err = on_rtp_packet(cache_, stream_id)) != srs_success) { + srs_error("rtp auto decoder: process rtp packet failed. ret=%d", srs_error_code(err)); + //invalid_rtp_num_++; + return -1; + } + + return ret; +} + +srs_error_t Srs28181TcpStreamConn::write_sequence_header() +{ + srs_error_t err = srs_success; + + // use the current dts. + int64_t dts = vjitter->timestamp() / 90; + + // send video sps/pps + if ((err = write_h264_sps_pps((uint32_t)dts, (uint32_t)dts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); + } + + // generate audio sh by audio specific config. + if (true) { + std::string sh = aac_specific_config; + + SrsFormat* format = new SrsFormat(); + SrsAutoFree(SrsFormat, format); + + if ((err = format->on_aac_sequence_header((char*)sh.c_str(), (int)sh.length())) != srs_success) { + return srs_error_wrap(err, "on aac sequence header"); + } + + SrsAudioCodecConfig* dec = format->acodec; + + acodec->sound_format = SrsAudioCodecIdAAC; + acodec->sound_type = (dec->aac_channels == 2)? SrsAudioChannelsStereo : SrsAudioChannelsMono; + acodec->sound_size = SrsAudioSampleBits16bit; + acodec->aac_packet_type = 0; + + static int srs_aac_srates[] = { + 96000, 88200, 64000, 48000, + 44100, 32000, 24000, 22050, + 16000, 12000, 11025, 8000, + 7350, 0, 0, 0 + }; + switch (srs_aac_srates[dec->aac_sample_rate]) { + case 11025: + acodec->sound_rate = SrsAudioSampleRate11025; + break; + case 22050: + acodec->sound_rate = SrsAudioSampleRate22050; + break; + case 44100: + acodec->sound_rate = SrsAudioSampleRate44100; + break; + default: + break; + }; + + if ((err = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), acodec, (uint32_t)dts)) != srs_success) { + return srs_error_wrap(err, "write audio raw frame"); + } + } + + return err; +} + +srs_error_t Srs28181TcpStreamConn::write_h264_sps_pps(uint32_t dts, uint32_t pts) +{ + srs_error_t err = srs_success; + + // h264 raw to h264 packet. + std::string sh; + if ((err = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != srs_success) { + return srs_error_wrap(err, "mux sequence header"); + } + + // h264 packet to flv packet. + int8_t frame_type = SrsVideoAvcFrameTypeKeyFrame; + int8_t avc_packet_type = SrsVideoAvcFrameTraitSequenceHeader; + char* flv = NULL; + int nb_flv = 0; + if ((err = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "mux avc to flv"); + } + + // the timestamp in rtmp message header is dts. + uint32_t timestamp = dts; + if ((err = rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv)) != srs_success) { + return srs_error_wrap(err, "write packet"); + } + + return err; +} + +srs_error_t Srs28181TcpStreamConn::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) +{ + srs_error_t err = srs_success; + + // 5bits, 7.3.1 NAL unit syntax, + // ISO_IEC_14496-10-AVC-2003.pdf, page 44. + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); + + // for IDR frame, the frame is keyframe. + SrsVideoAvcFrameType frame_type = SrsVideoAvcFrameTypeInterFrame; + if (nal_unit_type == SrsAvcNaluTypeIDR) { + frame_type = SrsVideoAvcFrameTypeKeyFrame; + } + + std::string ibp; + if ((err = avc->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { + return srs_error_wrap(err, "mux ibp frame"); + } + + int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU; + char* flv = NULL; + int nb_flv = 0; + if ((err = avc->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "mux avc to flv"); + } + + // the timestamp in rtmp message header is dts. + uint32_t timestamp = dts; + return rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv); +} + +srs_error_t Srs28181TcpStreamConn::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) +{ + srs_error_t err = srs_success; + + char* data = NULL; + int size = 0; + if ((err = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) { + return srs_error_wrap(err, "mux aac to flv"); + } + + return rtmp_write_packet(SrsFrameTypeAudio, dts, data, size); +} + +srs_error_t Srs28181TcpStreamConn::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) +{ + srs_error_t err = srs_success; + + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); + } + + SrsSharedPtrMessage* msg = NULL; + + if ((err = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != srs_success) { + return srs_error_wrap(err, "create message"); + } + srs_assert(msg); + + // send out encoded msg. + if ((err = sdk->send_and_free_message(msg)) != srs_success) { + close(); + return srs_error_wrap(err, "write message"); + } + + return err; +} + +srs_error_t Srs28181TcpStreamConn::connect() +{ + srs_error_t err = srs_success; + + // Ignore when connected. + if (sdk) { + return err; + } + + // generate rtmp url to connect to. + std::string url; + //if (!req) { + if(target_tcUrl != ""){ + std::string schema, host, vhost, app, param; + int port; + srs_discovery_tc_url(target_tcUrl, schema, host, vhost, app, stream_name, port, param); + + // generate output by template. + std::string output = output_template; + output = srs_string_replace(output, "[app]", app); + output = srs_string_replace(output, "[stream]", stream_name); + url = output; + } + + srs_trace("28181 stream - target_tcurl:%s,stream_name:%s, url:%s", + target_tcUrl.c_str(),stream_name.c_str(),url.c_str()); + + // connect host. + srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; + srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; + sdk = new SrsSimpleRtmpClient(url, cto, sto); + + if ((err = sdk->connect()) != srs_success) { + close(); + return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto)); + } + + // publish. + if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) { + close(); + return srs_error_wrap(err, "publish %s failed", url.c_str()); + } + + return write_sequence_header(); +} + +void Srs28181TcpStreamConn::close() +{ + srs_freep(sdk); +} \ No newline at end of file diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index 0ca748535b..9ab89e3b7c 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -33,6 +33,8 @@ #include #include #include +#include + class SrsStSocket; class SrsRtspConn; @@ -49,8 +51,15 @@ struct SrsRawAacStreamCodec; class SrsSharedPtrMessage; class SrsAudioFrame; class SrsSimpleStream; +class SrsSimpleBufferX; class SrsPithyPrint; class SrsSimpleRtmpClient; +class SrsListener; +class Srs28181Listener; +class Srs28181TcpStreamListener; +class Srs28181TcpStreamConn; +class Srs28181StreamCore; + // A rtp connection which transport a stream. class SrsRtpConn: public ISrsUdpHandler @@ -73,12 +82,35 @@ class SrsRtpConn: public ISrsUdpHandler virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); }; +// may not need this +class SrsRtpOverTcpConn: public ISrsUdpHandler +{ +private: + SrsPithyPrint* pprint; + SrsUdpListener* listener; + SrsRtspConn* rtsp; + SrsRtpPacket* cache; + int stream_id; + int _port; +public: + SrsRtpOverTcpConn(SrsRtspConn* r, int p, int sid); + virtual ~SrsRtpOverTcpConn(); +public: + virtual int port(); + virtual srs_error_t listen(); +// Interface ISrsUdpHandler +public: + virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); +}; + // The audio cache, audio is grouped by frames. struct SrsRtspAudioCache { int64_t dts; SrsAudioFrame* audio; - SrsSimpleStream* payload; + //SrsSimpleStream* payload; + // TODO: may merge with 28181 someday + SrsSimpleBufferX* payload; SrsRtspAudioCache(); virtual ~SrsRtspAudioCache(); @@ -100,7 +132,7 @@ class SrsRtspJitter }; // The rtsp connection serve the fd. -class SrsRtspConn : public ISrsCoroutineHandler, public ISrsConnection +class SrsRtspConn : public ISrsCoroutineHandler { private: std::string output_template; @@ -143,7 +175,6 @@ class SrsRtspConn : public ISrsCoroutineHandler, public ISrsConnection virtual ~SrsRtspConn(); public: virtual srs_error_t serve(); - virtual std::string remote_ip(); private: virtual srs_error_t do_cycle(); // internal methods @@ -180,7 +211,6 @@ class SrsRtspCaster : public ISrsTcpHandler std::map used_ports; private: std::vector clients; - SrsCoroutineManager* manager; public: SrsRtspCaster(SrsConfDirective* c); virtual ~SrsRtspCaster(); @@ -190,7 +220,6 @@ class SrsRtspCaster : public ISrsTcpHandler virtual srs_error_t alloc_port(int* pport); // Free the alloced rtp port. virtual void free_port(int lpmin, int lpmax); - virtual srs_error_t initialize(); // Interface ISrsTcpHandler public: virtual srs_error_t on_tcp_client(srs_netfd_t stfd); @@ -198,6 +227,301 @@ class SrsRtspCaster : public ISrsTcpHandler public: virtual void remove(SrsRtspConn* conn); }; +/* +enum SrsRtpDecoderType +{ + TimestampBoundary = 1, + MarkerBoundary = 2, +};*/ + +// a gb28181 stream server +class Srs28181StreamServer +{ +private: + std::string output; + int local_port_min; + int local_port_max; + // The key: port, value: whether used. + std::map used_ports; +private: + // TODO: will expand for multi-listeners + std::vector listeners; +public: + Srs28181StreamServer(); + Srs28181StreamServer(SrsConfDirective* c); + virtual ~Srs28181StreamServer(); +public: + // create a 28181 stream listener + virtual srs_error_t create_listener(SrsListenerType type, int& ltn_port, std::string& suuid); + // release a listener + virtual srs_error_t release_listener(); + // Alloc a rtp port from local ports pool. + // @param pport output the rtp port. + virtual srs_error_t alloc_port(int* pport); + // Free the alloced rtp port. + virtual void free_port(int lpmin, int lpmax); + +public: + virtual void remove(); +}; + +// A common tcp listener, for RTMP/HTTP server. +class Srs28181Listener +{ +protected: + //SrsListenerType type; +protected: + std::string ip; + int port; + //SrsServer* server; +public: + Srs28181Listener(); + //Srs28181Listener(SrsServer* svr, SrsListenerType t); + virtual ~Srs28181Listener(); +public: + //virtual SrsListenerType listen_type(); + virtual srs_error_t listen(std::string i, int p) = 0; +}; + +// A TCP listener, for 28181 server. +//class Srs28181TcpStreamListener : public Srs28181Listener, public SrsTcpListener +class Srs28181TcpStreamListener : public Srs28181Listener, public ISrsTcpHandler +{ +private: + SrsTcpListener* listener; + //ISrsTcpHandler* caster; + + std::vector clients; +public: + Srs28181TcpStreamListener(); + //Srs28181TcpStreamListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c); + virtual ~Srs28181TcpStreamListener(); +public: + virtual srs_error_t listen(std::string i, int p); +// Interface ISrsTcpHandler +public: + virtual srs_error_t on_tcp_client(srs_netfd_t stfd); + virtual srs_error_t remove_conn(Srs28181TcpStreamConn* c); +}; + +// 28181 udp stream linstener +class Srs28181UdpStreamListener : public Srs28181Listener, public ISrsUdpHandler +{ +protected: + SrsUdpListener* listener; + Srs28181StreamCore* streamcore; +public: + //Srs28181UdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c); + Srs28181UdpStreamListener(std::string suuid); + virtual ~Srs28181UdpStreamListener(); +public: + virtual srs_error_t listen(std::string i, int p); + virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); +}; + + + +// The 28181 connection serve the fd. +// copy from SrsRtstConn +class Srs28181StreamCore +{ +private: + std::string output; + std::string output_template; + std::string target_tcUrl;//rtsp_tcUrl; + std::string stream_name;//rtsp_stream; + SrsPithyPrint* pprint; +private: + std::string session; + // video stream. + int video_id; + std::string video_codec; + SrsRtpConn* video_rtp; + // audio stream. + int audio_id; + std::string audio_codec; + int audio_sample_rate; + int audio_channel; + SrsRtpConn* audio_rtp; +private: + //srs_netfd_t stfd; + //SrsStSocket* skt; + ////SrsRtspStack* rtsp; + ////SrsRtspCaster* caster; + //Srs28181TcpStreamListener* listener; + //SrsCoroutine* trd; +private: + ////SrsRequest* req; + SrsSimpleRtmpClient* sdk; + SrsRtspJitter* vjitter; + SrsRtspJitter* ajitter; +private: + SrsRawH264Stream* avc; + std::string h264_sps; + std::string h264_pps; + bool h264_sps_changed; + bool h264_pps_changed; + bool h264_sps_pps_sent; +private: + SrsRawAacStream* aac; + SrsRawAacStreamCodec* acodec; + std::string aac_specific_config; + SrsRtspAudioCache* acache; +private: + // this param group using on rtp packet decode + + int stream_id; + + SrsRtpPacket* cache_; + + // the timestamp of a rtp group + uint32_t group_timestamp; + // if timestamp boundary flag enabled + // true says using rtp timestamp decode rtp group + bool first_rtp_tsb_enabled_; + // first rtp with new timestamp in a rtp group + SrsRtpPacket * first_rtp_tsb_; + + // indicates rtp group boundary decode type: marker or timestamp + int boundary_type_; +public: + //Srs28181StreamCore(Srs28181TcpStreamListener* l, srs_netfd_t fd, std::string o); + Srs28181StreamCore(std::string suuid); + virtual ~Srs28181StreamCore(); + + // used in tcp but not needed in udp +//public: + //virtual srs_error_t init(); +//private: + //virtual srs_error_t do_cycle(); + +public: + // decode rtp using MB boundary + virtual int decode_packet(char* buf, int nb_buf); + // decode rtp using TSB/MB boundary + virtual int decode_packet_v2(char* buf, int nb_buf); +// internal methods +public: + virtual srs_error_t on_stream_packet(SrsRtpPacket* pkt, int stream_id); + virtual srs_error_t on_stream_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts); +// Interface ISrsOneCycleThreadHandler +//public: + //virtual srs_error_t cycle(); +private: + virtual srs_error_t on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts); + virtual srs_error_t on_rtp_audio(SrsRtpPacket* pkt, int64_t dts); + virtual srs_error_t kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts); +private: + virtual srs_error_t write_sequence_header(); + virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts); + virtual srs_error_t write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts); + virtual srs_error_t write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts); + virtual srs_error_t rtmp_write_packet(char type, uint32_t timestamp, char* data, int size); +private: + // Connect to RTMP server. + virtual srs_error_t connect(); + // Close the connection to RTMP server. + virtual void close(); +}; + +// The 28181 connection serve the fd. +// copy from SrsRtstConn +class Srs28181TcpStreamConn : public ISrsCoroutineHandler +{ +private: + std::string output; + std::string output_template; + std::string target_tcUrl;//rtsp_tcUrl; + std::string stream_name;//rtsp_stream; + SrsPithyPrint* pprint; +private: + std::string session; + // video stream. + int video_id; + std::string video_codec; + SrsRtpConn* video_rtp; + // audio stream. + int audio_id; + std::string audio_codec; + int audio_sample_rate; + int audio_channel; + SrsRtpConn* audio_rtp; +private: + srs_netfd_t stfd; + SrsStSocket* skt; + //SrsRtspStack* rtsp; + //SrsRtspCaster* caster; + Srs28181TcpStreamListener* listener; + SrsCoroutine* trd; +private: + //SrsRequest* req; + SrsSimpleRtmpClient* sdk; + SrsRtspJitter* vjitter; + SrsRtspJitter* ajitter; +private: + SrsRawH264Stream* avc; + std::string h264_sps; + std::string h264_pps; + bool h264_sps_changed; + bool h264_pps_changed; + bool h264_sps_pps_sent; +private: + SrsRawAacStream* aac; + SrsRawAacStreamCodec* acodec; + std::string aac_specific_config; + SrsRtspAudioCache* acache; +private: + // this param group using on rtp packet decode + + int stream_id; + + SrsRtpPacket* cache_; + + // the timestamp of a rtp group + uint32_t group_timestamp; + // if timestamp boundary flag enabled + // true says using rtp timestamp decode rtp group + bool first_rtp_tsb_enabled_; + // first rtp with new timestamp in a rtp group + SrsRtpPacket * first_rtp_tsb_; + + // indicates rtp group boundary decode type: marker or timestamp + int boundary_type_; +public: + Srs28181TcpStreamConn(Srs28181TcpStreamListener* l, srs_netfd_t fd, std::string o); + virtual ~Srs28181TcpStreamConn(); +public: + virtual srs_error_t init(); +private: + virtual srs_error_t do_cycle(); +private: + // decode rtp using MB boundary + virtual int decode_packet(char* buf, int nb_buf); + // decode rtp using TSB/MB boundary + virtual int decode_packet_v2(char* buf, int nb_buf); +// internal methods +public: + virtual srs_error_t on_rtp_packet(SrsRtpPacket* pkt, int stream_id); + virtual srs_error_t on_rtp_video_adv(SrsRtpPacket* pkt, int64_t dts, int64_t pts); +// Interface ISrsOneCycleThreadHandler +public: + virtual srs_error_t cycle(); +private: + virtual srs_error_t on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts); + virtual srs_error_t on_rtp_audio(SrsRtpPacket* pkt, int64_t dts); + virtual srs_error_t kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts); +private: + virtual srs_error_t write_sequence_header(); + virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts); + virtual srs_error_t write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts); + virtual srs_error_t write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts); + virtual srs_error_t rtmp_write_packet(char type, uint32_t timestamp, char* data, int size); +private: + // Connect to RTMP server. + virtual srs_error_t connect(); + // Close the connection to RTMP server. + virtual void close(); +}; #endif diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index c35c27a423..38cefb2fe6 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -108,6 +108,8 @@ std::string srs_listener_type2string(SrsListenerType type) return "RTSP"; case SrsListenerFlv: return "HTTP-FLV"; + case SrsListener28181TcpStream: + return "GB28181-Stream over TCP"; default: return "UNKONWN"; } @@ -179,9 +181,6 @@ SrsRtspListener::SrsRtspListener(SrsServer* svr, SrsListenerType t, SrsConfDirec srs_assert(type == SrsListenerRtsp); if (type == SrsListenerRtsp) { caster = new SrsRtspCaster(c); - - // TODO: FIXME: Must check error. - caster->initialize(); } } @@ -283,6 +282,60 @@ srs_error_t SrsHttpFlvListener::on_tcp_client(srs_netfd_t stfd) return err; } +SrsTcpStreamListener::SrsTcpStreamListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t) +{ + listener = NULL; + + // the caller already ensure the type is ok, + // we just assert here for unknown stream caster. + srs_assert(type == SrsListener28181TcpStream); + if (type == SrsListener28181TcpStream) { + //caster = new SrsGB28181TcpStreamCaster(c); + } +} + +SrsTcpStreamListener::~SrsTcpStreamListener() +{ + srs_freep(caster); + srs_freep(listener); +} + +srs_error_t SrsTcpStreamListener::listen(string i, int p) +{ + srs_error_t err = srs_success; + + // the caller already ensure the type is ok, + // we just assert here for unknown stream caster. + //srs_assert(type == SrsListenerRtsp); + + ip = i; + port = p; + + srs_freep(listener); + listener = new SrsTcpListener(this, ip, port); + + if ((err = listener->listen()) != srs_success) { + return srs_error_wrap(err, "tcp stream listen %s:%d", ip.c_str(), port); + } + + string v = srs_listener_type2string(type); + srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd()); + + return err; +} + +srs_error_t SrsTcpStreamListener::on_tcp_client(srs_netfd_t stfd) +{ + srs_error_t err = caster->on_tcp_client(stfd); + if (err != srs_success) { + srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str()); + srs_freep(err); + } + + return srs_success; +} + + SrsUdpStreamListener::SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c) : SrsListener(svr, t) { listener = NULL; @@ -503,6 +556,7 @@ void SrsServer::destroy() dispose(); + srs_freep(srs_28181_streams); srs_freep(http_api_mux); srs_freep(http_server); srs_freep(http_heartbeat); @@ -802,6 +856,10 @@ srs_error_t SrsServer::http_handle() if ((err = http_api_mux->handle("/api/v1/clusters", new SrsGoApiClusters())) != srs_success) { return srs_error_wrap(err, "handle raw"); } + + if ((err = http_api_mux->handle("/api/v1/srs28181stream-request-listener", new SrsGoApi28181StreamCreation(this))) != srs_success) { + return srs_error_wrap(err, "handle srs28181stream-request-listener"); + } // test the request info. if ((err = http_api_mux->handle("/api/v1/tests/requests", new SrsGoApiRequests())) != srs_success) { @@ -1187,10 +1245,22 @@ srs_error_t SrsServer::listen_stream_caster() return srs_error_wrap(err, "listen at %d", port); } } + + srs_28181_streams = new Srs28181StreamServer(); + //srs_28181_streams->create_listener(Listener_UDP); return err; } +srs_error_t SrsServer::create_28181stream_listener(SrsListenerType type, int& port, std::string& suuid) +{ + if(srs_28181_streams==NULL){ + return srs_error_new(13025,"srs 28181 stream server is null!"); + } + return srs_28181_streams->create_listener(type, port,suuid); + srs_trace("srsserver - create a new 28181 stream listener[port:%d]",port); +} + void SrsServer::close_listeners(SrsListenerType type) { std::vector::iterator it; diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 87407bb9f8..612a2c0a9a 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -50,8 +50,8 @@ class ISrsUdpHandler; class SrsUdpListener; class SrsTcpListener; class SrsAppCasterFlv; -class SrsRtspCaster; class SrsCoroutineManager; +class Srs28181StreamServer; // The listener type for server to identify the connection, // that is, use different type to process the connection. @@ -69,6 +69,10 @@ enum SrsListenerType SrsListenerRtsp = 4, // TCP stream, FLV stream over HTTP. SrsListenerFlv = 5, + // TCP stream, gb28181 stream over tcp + SrsListener28181TcpStream = 6, + // 28181 udp stream linstener + SrsListener28181UdpStream = 7, }; // A common tcp listener, for RTMP/HTTP server. @@ -103,12 +107,48 @@ class SrsBufferListener : virtual public SrsListener, virtual public ISrsTcpHand virtual srs_error_t on_tcp_client(srs_netfd_t stfd); }; +/* +// a server for gb28181 stream. +class Srs28181StreamServer : public ISrsTcpHandler +{ +private: + std::string output; + int local_port_min; + int local_port_max; + // The key: port, value: whether used. + std::map used_ports; +private: + //std::vector clients; + std::vector listeners; +public: + Srs28181StreamServer(SrsConfDirective* c); + virtual ~SrsRtspCaster(); +public: + // create a stream listener + virtual srs_error_t create_listener(); + // release a listener + virtual srs_error_t release_listener(); + // Alloc a rtp port from local ports pool. + // @param pport output the rtp port. + virtual srs_error_t alloc_port(int* pport); + // Free the alloced rtp port. + virtual void free_port(int lpmin, int lpmax); +// Interface ISrsTcpHandler +public: + //virtual srs_error_t on_tcp_client(srs_netfd_t stfd); +// internal methods. +public: + virtual void remove(SrsRtspConn* conn); +}; +*/ + + // A TCP listener, for rtsp server. class SrsRtspListener : virtual public SrsListener, virtual public ISrsTcpHandler { private: SrsTcpListener* listener; - SrsRtspCaster* caster; + ISrsTcpHandler* caster; public: SrsRtspListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c); virtual ~SrsRtspListener(); @@ -135,6 +175,22 @@ class SrsHttpFlvListener : virtual public SrsListener, virtual public ISrsTcpHan virtual srs_error_t on_tcp_client(srs_netfd_t stfd); }; +// A tcp listener, for an extend tcp stream +class SrsTcpStreamListener : virtual public SrsListener, virtual public ISrsTcpHandler +{ +private: + SrsTcpListener* listener; + ISrsTcpHandler* caster; +public: + SrsTcpStreamListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c); + virtual ~SrsTcpStreamListener(); +public: + virtual srs_error_t listen(std::string i, int p); +// Interface ISrsTcpHandler +public: + virtual srs_error_t on_tcp_client(srs_netfd_t stfd); +}; + // A UDP listener, for udp server. class SrsUdpStreamListener : public SrsListener { @@ -210,6 +266,9 @@ class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHan SrsHttpHeartbeat* http_heartbeat; SrsIngester* ingester; SrsCoroutineManager* conn_manager; +private: + // for testing + Srs28181StreamServer* srs_28181_streams; private: // The pid file fd, lock the file write when server is running. // @remark the init.d script should cleanup the pid file, when stop service, @@ -290,6 +349,9 @@ class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHan virtual void close_listeners(SrsListenerType type); // Resample the server kbs. virtual void resample_kbps(); +public: + // create a 28181 stream listener + virtual srs_error_t create_28181stream_listener(SrsListenerType type, int& port, std::string& suuid); // For internal only public: // When listener got a fd, notice server to accept it. diff --git a/trunk/src/kernel/srs_kernel_buffer.cpp b/trunk/src/kernel/srs_kernel_buffer.cpp index e6fcfe1fa8..b2fb5433dc 100644 --- a/trunk/src/kernel/srs_kernel_buffer.cpp +++ b/trunk/src/kernel/srs_kernel_buffer.cpp @@ -100,6 +100,32 @@ void SrsBuffer::skip(int size) p += size; } +// make sure we have enough size in data buf before skipping +bool SrsBuffer::skip_x(int size) +{ + srs_assert(p); + + if (require(size)) { + p += size; + return true; + } + else { + return false; + } +} + +bool SrsBuffer::chk_bytes(char * data, int size) +{ + if (require(size)) { + memcpy(data, p, size); + return true; + } + else { + return false; + } +} + + int8_t SrsBuffer::read_1bytes() { srs_assert(require(1)); @@ -185,6 +211,19 @@ void SrsBuffer::read_bytes(char* data, int size) p += size; } +bool SrsBuffer::read_bytes_x(char* data, int size) +{ + if (require(size)) { + memcpy(data, p, size); + + p += size; + return true; + } + else { + return false; + } +} + void SrsBuffer::write_1bytes(int8_t value) { srs_assert(require(1)); diff --git a/trunk/src/kernel/srs_kernel_buffer.hpp b/trunk/src/kernel/srs_kernel_buffer.hpp index 3fdb7e74b5..a480e6c6e9 100644 --- a/trunk/src/kernel/srs_kernel_buffer.hpp +++ b/trunk/src/kernel/srs_kernel_buffer.hpp @@ -137,6 +137,15 @@ class SrsBuffer * @remark assert initialized, the data() not NULL. */ virtual void skip(int size); + + // will check if it has enough bytes to skip + virtual bool skip_x(int size); + + /* + * beikesong: check bytes number but not move pos pointer + * user should use skip() to move pointer + */ + virtual bool chk_bytes(char * data, int size); public: /** * get 1bytes char from stream. @@ -166,6 +175,9 @@ class SrsBuffer * get bytes from stream, length specifies by param len. */ virtual void read_bytes(char* data, int size); + + // check if it has enough bytes before reading + virtual bool read_bytes_x(char* data, int size); public: /** * write 1bytes char to stream. diff --git a/trunk/src/kernel/srs_kernel_stream.cpp b/trunk/src/kernel/srs_kernel_stream.cpp old mode 100755 new mode 100644 index 3a32bd159f..0d95077b36 --- a/trunk/src/kernel/srs_kernel_stream.cpp +++ b/trunk/src/kernel/srs_kernel_stream.cpp @@ -73,3 +73,114 @@ void SrsSimpleStream::append(SrsSimpleStream* src) { append(src->bytes(), src->length()); } + + + +/* +* beikesong: SrsSimpleBufferX +* +*/ +SrsSimpleBufferX::SrsSimpleBufferX() +{ + oft = 0; +} + +SrsSimpleBufferX::~SrsSimpleBufferX() +{ +} + +bool SrsSimpleBufferX::require(int require) +{ + int len = length(); + + return require <= len - oft; +} + +char * SrsSimpleBufferX::curat() +{ + return (length() == 0) ? NULL : &data.at(oft); +} + +int SrsSimpleBufferX::cursize() +{ + int len = length(); + return (len < oft) ? 0 : (len - oft); +} + +int SrsSimpleBufferX::getoft() +{ + return oft; +} + +bool SrsSimpleBufferX::skip_x(int size) +{ + if (require(size)) { + oft += size; + return true; + } + else { + return false; + } +} + +bool SrsSimpleBufferX::chk_bytes(char * cb, int size) +{ + if (require(size)){ + memcpy(cb, &data.at(oft), size); + return true; + } + else { + return false; + } +} + +bool SrsSimpleBufferX::read_bytes_x(char * cb, int size) +{ + if (require(size)) { + memcpy(cb, &data.at(oft), size); + oft += size; + return true; + } + else { + return false; + } +} + +void SrsSimpleBufferX::resetoft() +{ + oft = 0; +} + +int SrsSimpleBufferX::length() +{ + int len = (int)data.size(); + srs_assert(len >= 0); + return len; +} + +char* SrsSimpleBufferX::bytes() +{ + return (length() == 0) ? NULL : &data.at(0); +} + +void SrsSimpleBufferX::erase(int size) +{ + if (size <= 0) { + return; + } + + if (size >= length()) { + data.clear(); + return; + } + + data.erase(data.begin(), data.begin() + size); +} + +void SrsSimpleBufferX::append(const char* bytes, int size) +{ + srs_assert(size > 0); + + data.insert(data.end(), bytes, bytes + size); +} + diff --git a/trunk/src/kernel/srs_kernel_stream.hpp b/trunk/src/kernel/srs_kernel_stream.hpp index eb786559e2..71459e7471 100644 --- a/trunk/src/kernel/srs_kernel_stream.hpp +++ b/trunk/src/kernel/srs_kernel_stream.hpp @@ -66,4 +66,78 @@ class SrsSimpleStream virtual void append(SrsSimpleStream* src); }; + +/** +* a simple buffer based on vector +*/ +class SrsSimpleBufferX +{ +private: + std::vector data; + + // current offset in bytes from data.at(0) (data beginning) + u_int32_t oft; +public: + SrsSimpleBufferX(); + virtual ~SrsSimpleBufferX(); +public: + /** + * get the length of buffer. empty if zero. + * @remark assert length() is not negative. + */ + virtual int length(); + /** + * get the buffer bytes. + * @return the bytes, NULL if empty. + */ + virtual char* bytes(); + /** + * erase size of bytes from begin. + * @param size to erase size of bytes. + * clear if size greater than or equals to length() + * @remark ignore size is not positive. + */ + virtual void erase(int size); + /** + * append specified bytes to buffer. + * @param size the size of bytes + * @remark assert size is positive. + */ + virtual void append(const char* bytes, int size); + + // resocman: exhance this class by adding thoes functions + /** + * tell current position return char * p=data.at(oft) + */ + virtual char* curat(); + /* + * current size from oft to end: length()-oft + */ + virtual int cursize(); + /** + * get current oft value + */ + virtual int getoft(); + /** + * check if we have enough size in vector + */ + virtual bool require(int size); + /** + * move size bytes from cur position + */ + virtual bool skip_x(int size); + /** + * chek size bytes value, but dont move cur position + */ + virtual bool chk_bytes(char * cb, int size); + /** + * read size bytes and move cur positon + */ + virtual bool read_bytes_x(char * cb, int size); + /** + * reset cur position to the beginning of vector + */ + virtual void resetoft(); +}; + #endif diff --git a/trunk/src/protocol/srs_rtsp_stack.cpp b/trunk/src/protocol/srs_rtsp_stack.cpp index c1e828e446..65dd187a8b 100644 --- a/trunk/src/protocol/srs_rtsp_stack.cpp +++ b/trunk/src/protocol/srs_rtsp_stack.cpp @@ -26,6 +26,7 @@ #if !defined(SRS_EXPORT_LIBRTMP) #include +#include #include using namespace std; @@ -39,6 +40,7 @@ using namespace std; #include #include + #define SRS_RTSP_BUFFER 4096 // get the status text of code. @@ -133,7 +135,8 @@ SrsRtpPacket::SrsRtpPacket() timestamp = 0; ssrc = 0; - payload = new SrsSimpleStream(); + payload = new SrsSimpleBufferX(); + tgtstream = new SrsSimpleBufferX(); audio = new SrsAudioFrame(); chunked = false; completed = false; @@ -142,6 +145,7 @@ SrsRtpPacket::SrsRtpPacket() SrsRtpPacket::~SrsRtpPacket() { srs_freep(payload); + srs_freep(tgtstream); srs_freep(audio); } @@ -177,6 +181,48 @@ void SrsRtpPacket::reap(SrsRtpPacket* src) src->audio = NULL; } +void SrsRtpPacket::copy_v2(SrsRtpPacket* src) +{ + version = src->version; + padding = src->padding; + extension = src->extension; + csrc_count = src->csrc_count; + marker = src->marker; + payload_type = src->payload_type; + sequence_number = src->sequence_number; + timestamp = src->timestamp; + ssrc = src->ssrc; + + chunked = src->chunked; + completed = src->completed; + + //add code. only h264 format and chunked and last chank is completed + if (payload_type == 96 && chunked && completed) { + + srs_freep(audio); + audio = src->audio; + src->audio = NULL; + //srs_freep(audio_samples); + //audio_samples = src->audio_samples; + //src->audio_samples = NULL; + } + //srs_freep(audio); + //audio = new SrsAudioFrame(); +} + +void SrsRtpPacket::reap_v2(SrsRtpPacket* src) +{ + copy_v2(src); + + srs_freep(payload); + payload = src->payload; + src->payload = NULL; + + srs_freep(audio); + audio = src->audio; + src->audio = NULL; +} + srs_error_t SrsRtpPacket::decode(SrsBuffer* stream) { srs_error_t err = srs_success; @@ -316,6 +362,404 @@ srs_error_t SrsRtpPacket::decode_96(SrsBuffer* stream) return err; } + +// beisong: decode gb stream ps96/mpeg97/h26498/svac99 +int SrsRtpPacket::decode_v2(SrsBuffer* stream, int & boundary_type) +{ + int ret = 0; + + // 12bytes header + if (!stream->require(12)) { + ret = ERROR_RTP_HEADER_CORRUPT; + srs_error("rtp header corrupt. ret=%d", ret); + return ret; + } + + int8_t vv = stream->read_1bytes(); + version = (vv >> 6) & 0x03; + padding = (vv >> 5) & 0x01; + extension = (vv >> 4) & 0x01; + csrc_count = vv & 0x0f; + + int8_t mv = stream->read_1bytes(); + marker = (mv >> 7) & 0x01; + payload_type = mv & 0x7f; + + sequence_number = stream->read_2bytes(); + timestamp = stream->read_4bytes(); + ssrc = stream->read_4bytes(); + + // TODO: FIXME: check sequence number. + /* + if (marker && !marker_boundary) { + marker_boundary = true; + srs_warn("rtp decode: SWITCH to MKR decoder! Default is TSB"); + }*/ + + // video codec. + if (payload_type == 96){ + + return decode_96ps_rtp_tsb2(stream); + + if (boundary_type==MarkerBoundary) { + // use marker to decode + return decode_96ps_rtp(stream, marker); + } + else if(boundary_type==TimestampBoundary) { + // use timestamp to decode + return decode_96ps_rtp_tsb2(stream); + } + else {} + } + else + { + srs_error("rtp type is not 96 ps . ret=%d", ret); + } + + return ret; +} + + +int SrsRtpPacket::decode_v2(SrsBuffer* stream) +{ + int ret = ERROR_SUCCESS; + + // 12bytes header + if (!stream->require(12)) { + ret = ERROR_RTP_HEADER_CORRUPT; + srs_error("rtp header corrupt. ret=%d", ret); + return ret; + } + + int8_t vv = stream->read_1bytes(); + version = (vv >> 6) & 0x03; + padding = (vv >> 5) & 0x01; + extension = (vv >> 4) & 0x01; + csrc_count = vv & 0x0f; + + int8_t mv = stream->read_1bytes(); + marker = (mv >> 7) & 0x01; + payload_type = mv & 0x7f; + + sequence_number = stream->read_2bytes(); + timestamp = stream->read_4bytes(); + ssrc = stream->read_4bytes(); + + // TODO: FIXME: check sequence number. + + // video codec. + if (payload_type == 96) { + + // use marker to decode + return decode_96ps_rtp(stream, marker); + } + else{ + srs_error("rtp type is not 96 ps . ret=%d", ret); + } + + return ret; +} + + + +int SrsRtpPacket::decode_v3(SrsBuffer* stream) +{ + int ret = ERROR_SUCCESS; + + // 12bytes header + if (!stream->require(12)) { + ret = ERROR_RTP_HEADER_CORRUPT; + srs_error("rtp header corrupt. ret=%d", ret); + return ret; + } + + int8_t vv = stream->read_1bytes(); + version = (vv >> 6) & 0x03; + padding = (vv >> 5) & 0x01; + extension = (vv >> 4) & 0x01; + csrc_count = vv & 0x0f; + + int8_t mv = stream->read_1bytes(); + marker = (mv >> 7) & 0x01; + payload_type = mv & 0x7f; + + sequence_number = stream->read_2bytes(); + timestamp = stream->read_4bytes(); + ssrc = stream->read_4bytes(); + + // TODO: FIXME: check sequence number. + if (payload_type == 96) { + // use marker to decode + return decode_raw_rtp(stream, marker); + } + else{ + srs_error("rtp type is not 96 ps . ret=%d", ret); + } + + return ret; +} + + + +int SrsRtpPacket::decode_stream() +{ + int ret = 0; + //ret= decode_96ps_pkg(); + // only decode pesv+pesa + { + //real time + ret = decode_96ps_core(); + } + + payload->resetoft(); + + return ret; +} + +int SrsRtpPacket::decode_raw_rtp(SrsBuffer* stream, int8_t marker) +{ + int ret = 0; + + // atleast 2bytes content. + if (!stream->require(0)) { + ret = ERROR_RTP_TYPE96_CORRUPT; + srs_error("rtsp: rtp type 96 ps corrupt. ret=%d", ret); + return ret; + } + + //Basson: marker is used to indicate ps packet boundery + //so ps packet have 2 rtp packets at Least! one is begin as 0x8060, and one is 0x80e0 for end + if(marker == 0){ + chunked = true; + completed = false; + } + else { + // always chunked in ps + // considering compatibility for other streams, we set chunked as true always + chunked = true; + completed = true; + } + payload->append(stream->data() + stream->pos(), stream->size() - stream->pos()); + //tgtstream->append(stream->data() + stream->pos(), stream->size() - stream->pos()); + + return ret; +} + +int SrsRtpPacket::decode_96ps_rtp(SrsBuffer* stream, int8_t marker) +{ + int ret = 0; + + // atleast 2bytes content. + if (!stream->require(0)) { + ret = ERROR_RTP_TYPE96_CORRUPT; + srs_error("rtsp: rtp type 96 ps corrupt. ret=%d", ret); + return ret; + } + + //beisong: marker is used to indicate ps packet boundery + //so ps packet have 2 rtp packets at Least! one is begin as 0x8060, and one is 0x80e0 for end + if(marker == 0){ + chunked = true; + completed = false; + } + else { + // always chunked in ps + // considering compatibility for other streams, we set chunked as true always + chunked = true; + completed = true; + } + payload->append(stream->data() + stream->pos(), stream->size() - stream->pos()); + + return ret; +} + +int SrsRtpPacket::decode_96ps_rtp_tsb2(SrsBuffer* stream) +{ + int ret = 0; + + // atleast 2bytes content. + if (!stream->require(1)) { + ret = ERROR_RTP_TYPE96_CORRUPT; + srs_error("rtsp: rtp type 96 ps corrupt. ret=%d", ret); + return ret; + } + + // suitable for no standard ps stream + // only use rtp timestamp to find rtp group boundary + // always chunked in ps + // considering compatibility for other streams, we set chunked as true always + chunked = true; + //completed = true; + + payload->append(stream->data() + stream->pos(), stream->size() - stream->pos()); + + return ret; +} + + +int SrsRtpPacket::decode_96ps_rtp_tsb(SrsBuffer* stream, u_int32_t & group_ts) +{ + int ret = ERROR_SUCCESS; + + // atleast 2bytes content. + if (!stream->require(1)) { + ret = ERROR_RTP_TYPE96_CORRUPT; + srs_error("rtsp: rtp type 96 ps corrupt. ret=%d", ret); + return ret; + } + + // set current timestamp at the very beginning + if (group_ts == 0) { + group_ts = timestamp; + } + + // suitable for no standard ps stream + // only use rtp timestamp to find rtp group boundary + // always chunked in ps + // considering compatibility for other streams, we set chunked as true always + if (timestamp != group_ts) { + // + group_ts = timestamp; + start_new_timestamp = true; + + chunked = true; + //completed = true; + } + else { + // continuing reciving rtp + start_new_timestamp = false; + + chunked = true; + //completed = false; + } + + payload->append(stream->data() + stream->pos(), stream->size() - stream->pos()); + + return ret; +} + +int SrsRtpPacket::get_decodertype() +{ + if (marker_boundary) { + return MarkerBoundary; + } + else { + return TimestampBoundary; + } +} + +bool SrsRtpPacket::newtimestamp() +{ + return start_new_timestamp; +} + +#define PH_PSC 0x01ba +#define SYS_HEAD_PSC 0x01bb +#define PS_MAP_PSC 0x01bc +#define PES_V_PSC 0x01e0 +#define PES_A_PSC 0x01c0 +#define HK_PRIVATE_PSC 0x01bd +int SrsRtpPacket::decode_96ps_core() +{ + int ret = 0; + + bool a, b, c; + Packet_Start_Code psc; + int psc_len = sizeof(Packet_Start_Code); + PS_Packet_Header ps_ph; + int ph_len = sizeof(PS_Packet_Header); + PS_Sys_Header sys_header; + int sys_header_len = sizeof(PS_Sys_Header); + PS_Map ps_map; + int psm_len = sizeof(PS_Map); + PS_PES pes; + int pes_len = sizeof(PS_PES); + + //pesv1...n+pesa1...n + int p_skip_0 = 0; + int p_skip_1 = 0; + psc.start_code = 0; + while (payload->chk_bytes((char*)&psc, sizeof(Packet_Start_Code))) { + psc.start_code = htonl(psc.start_code); + if (psc.start_code == PES_V_PSC) { + psc.start_code = 0; + a = payload->read_bytes_x((char*)&pes, sizeof(PS_PES)); + b = payload->skip_x(pes.pes_header_data_length); + // pengzhang: see my note + pes.pes_packet_length = htons(pes.pes_packet_length); + u_int32_t load_len = pes.pes_packet_length - pes.pes_header_data_length - 3; + c = payload->require(load_len); + if (!a || !b) { + ret = ERROR_RTP_PS_CORRUPT; + srs_error(" core- rtp type 96 ps Currepted. size not enough at 4. ret=%d", ret); + return ret; + } + + if (!c) { + // may loss some packets, copy the last buffer + srs_warn("core- rtp type 96 ps Loss some packet pesVV len:%d, cursize:%d", load_len, payload->cursize()); + load_len = payload->cursize(); + + if (load_len <= 0) { + srs_warn("core- rtp type 96 pesVV len <=0 cursize:%d, oft:%d, payload len:%d, tgt len:%d will return!", + payload->cursize(), payload->getoft(), payload->length(), tgtstream->length()); + return ret; + } + } + + tgtstream->append(payload->curat(), load_len); + payload->skip_x(load_len); + + } + else if (psc.start_code == PES_A_PSC) { + //audio:do nothing + psc.start_code = 0; + a = payload->read_bytes_x((char*)&pes, sizeof(PS_PES)); + b = payload->skip_x(pes.pes_header_data_length); + //see my note + pes.pes_packet_length = htons(pes.pes_packet_length); + u_int32_t load_len = pes.pes_packet_length - pes.pes_header_data_length - 3; + c = payload->require(load_len); + if (!a || !b) { + ret = ERROR_RTP_PS_CORRUPT; + srs_error(" core- rtp type 96 ps Currepted. size not enough at 5. ret=%d", ret); + return ret; + } + // here len may not enough as packet Loss, but still can work well as we call require(x) in skip_x + // i will not modify this, as this is a very stable strategy + if (!c) { + // may loss some packets, copy the last buffer + srs_warn("core- rtp type 96 ps Loss some packet pesAAA len:%d, cursize:%d", load_len, payload->cursize()); + load_len = payload->cursize(); + + if (load_len <= 0) { + srs_warn("core- rtp type 96 pesAAA len <=0 cursize:%d, oft:%d, payload len:%d, tgt len:%d will return!", + payload->cursize(), payload->getoft(), payload->length(), tgtstream->length()); + return ret; + } + } + + payload->skip_x(load_len); + } + else { + // specially for history stream, as it has a bug + tgtstream->append(payload->curat(), 1); + payload->skip_x(1); + //p_skip_0++; + if (p_skip_0 <= 2) { + //srs_error(" core- rtp type 96 ps currept. miss some pes in I-Frame"); + } + + //ret = ERROR_RTP_PS_CORRUPT; + //srs_error(" rtp type 96 ps currept. miss some pes in I-Frame ret=%d", ret); + //return ret; + } + } //while pesn + + return ret; +} + + SrsRtspSdp::SrsRtspSdp() { state = SrsRtspSdpStateOthers; diff --git a/trunk/src/protocol/srs_rtsp_stack.hpp b/trunk/src/protocol/srs_rtsp_stack.hpp index 87c07ce555..9643a2c853 100644 --- a/trunk/src/protocol/srs_rtsp_stack.hpp +++ b/trunk/src/protocol/srs_rtsp_stack.hpp @@ -35,6 +35,7 @@ class SrsBuffer; class SrsSimpleStream; +class SrsSimpleBufferX; class SrsAudioFrame; class ISrsProtocolReadWriter; @@ -126,6 +127,47 @@ enum SrsRtspTokenState SrsRtspTokenStateEOF = 102, }; +enum SrsRtpDecoderType +{ + TimestampBoundary = 1, + MarkerBoundary = 2, +}; + +#define ERROR_RTP_PS_CORRUPT 12060 +#define ERROR_RTP_PS_HK_PRIVATE_PROTO 12061 +#define ERROR_RTP_PS_FIRST_TSB_LOSS 12062 + +#pragma pack (1) +struct Packet_Start_Code { + u_int32_t start_code; //4bytes,need htonl exchange + //u_int8_t start_code[3]; + //u_int8_t stream_id[1]; +}; + +struct PS_Packet_Header { + Packet_Start_Code psc; //4bytes + u_int8_t holder[9]; + u_int8_t pack_stuffing_length; //low 3bit,high 5 bits are reserved; +}; + +struct PS_Sys_Header { + Packet_Start_Code psc; + u_int16_t header_length; +}; + +struct PS_Map { + Packet_Start_Code psc; + u_int16_t psm_length; //2 bytes need htons exchange +}; + +struct PS_PES { + Packet_Start_Code psc; //4bytes + u_int16_t pes_packet_length; //2 bytes need htonl exchange + u_int8_t holder[2]; + u_int8_t pes_header_data_length; +}; +#pragma pack () + // The rtp packet. // 5. RTP Data Transfer Protocol, @see rfc3550-2003-rtp.pdf, page 12 class SrsRtpPacket @@ -244,29 +286,73 @@ class SrsRtpPacket uint32_t ssrc; //32bits // The payload. - SrsSimpleStream* payload; + SrsSimpleBufferX* payload; + + // Beikesong: target tgt h264 stream load or other types + SrsSimpleBufferX* tgtstream; + // Whether transport in chunked payload. bool chunked; // Whether message is completed. // normal message always completed. // while chunked completed when the last chunk arriaved. bool completed; + + // whether some private data in stream + bool private_proto; // The audio samples, one rtp packets may contains multiple audio samples. SrsAudioFrame* audio; + + private: + // pengzhang: indicate current rtp group timestamp + //u_int32_t group_ts; + bool start_new_timestamp; + +public: + // true says rtp decode is marker, otherwise is timestamp + bool marker_boundary; + // get rtp decode type: TimestampBoundary or MarkerBoundary + virtual int get_decodertype(); + + virtual bool newtimestamp(); + public: SrsRtpPacket(); virtual ~SrsRtpPacket(); public: // copy the header from src. virtual void copy(SrsRtpPacket* src); + // + virtual void copy_v2(SrsRtpPacket* src); // reap the src to this packet, reap the payload. virtual void reap(SrsRtpPacket* src); + // + virtual void reap_v2(SrsRtpPacket* src); // decode rtp packet from stream. virtual srs_error_t decode(SrsBuffer* stream); + + virtual int decode_v2(SrsBuffer* stream, int & boundary_type); + virtual int decode_v2(SrsBuffer* stream); + // not check payload type + virtual int decode_v3(SrsBuffer* stream); + virtual int decode_stream(); private: virtual srs_error_t decode_97(SrsBuffer* stream); virtual srs_error_t decode_96(SrsBuffer* stream); + +private: + virtual int decode_96ps_rtp(SrsBuffer* stream, int8_t marker); + + // decode rtp group by using timestamp as boundary + virtual int decode_96ps_rtp_tsb(SrsBuffer* stream, u_int32_t & group_timestamp); + // decode rtp by using timestamp boundary + // will not compare timestamp in this one + virtual int decode_96ps_rtp_tsb2(SrsBuffer* stream); + // only decode rtp not ps anymore + virtual int decode_raw_rtp(SrsBuffer* stream, int8_t marker); + // only aim on high stabilable + virtual int decode_96ps_core(); }; // The sdp in announce, @see rfc2326-1998-rtsp.pdf, page 159