From c0e50265bd6bdcf47e252a7e6512aa91d37786cc Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 16 Feb 2015 14:05:01 +0800 Subject: [PATCH] for #133, create rtsp framework. --- trunk/conf/full.conf | 6 +- trunk/configure | 2 +- trunk/ide/srs_upp/srs_upp.upp | 2 + trunk/ide/srs_vs2010/srs.vcxproj | 4 + trunk/ide/srs_vs2010/srs.vcxproj.filters | 12 +++ trunk/src/app/srs_app_rtsp.cpp | 114 ++++++++++++++++++++++- trunk/src/app/srs_app_rtsp.hpp | 49 +++++++++- trunk/src/app/srs_app_server.cpp | 4 +- trunk/src/app/srs_app_thread.hpp | 86 ++++++++++++++--- trunk/src/protocol/srs_rtsp_stack.cpp | 40 ++++++++ trunk/src/protocol/srs_rtsp_stack.hpp | 55 +++++++++++ 11 files changed, 348 insertions(+), 26 deletions(-) create mode 100644 trunk/src/protocol/srs_rtsp_stack.cpp create mode 100644 trunk/src/protocol/srs_rtsp_stack.hpp diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index d20284aa6f..31d6def3cd 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -157,8 +157,8 @@ stream_caster { # rtmp://127.0.0.1/live/livestream output rtmp://127.0.0.1/live/livestream; # the listen port for stream caster. - # for mpegts_over_udp caster, listen at udp port. - # for rtsp caster, listen at tcp port. + # for mpegts_over_udp caster, listen at udp port. for example, 8935. + # for rtsp caster, listen at tcp port. for example, 554. listen 8935; } stream_caster { @@ -168,7 +168,7 @@ stream_caster { listen 8935; } stream_caster { - enabled on; + enabled off; caster rtsp; output rtmp://127.0.0.1/[app]/[stream]; listen 554; diff --git a/trunk/configure b/trunk/configure index 41630abb68..b1e79e28fe 100755 --- a/trunk/configure +++ b/trunk/configure @@ -376,7 +376,7 @@ MODULE_DEPENDS=("CORE" "KERNEL") ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSSLRoot}) MODULE_FILES=("srs_rtmp_amf0" "srs_rtmp_io" "srs_rtmp_stack" "srs_rtmp_sdk" "srs_rtmp_handshake" "srs_rtmp_utility" "srs_rtmp_msg_array" "srs_rtmp_buffer" - "srs_raw_avc") + "srs_raw_avc" "srs_rtsp_stack") RTMP_INCS="src/protocol"; MODULE_DIR=${RTMP_INCS} . auto/modules.sh RTMP_OBJS="${MODULE_OBJS[@]}" # diff --git a/trunk/ide/srs_upp/srs_upp.upp b/trunk/ide/srs_upp/srs_upp.upp index d617bc52dc..245dd7fbd0 100755 --- a/trunk/ide/srs_upp/srs_upp.upp +++ b/trunk/ide/srs_upp/srs_upp.upp @@ -36,6 +36,8 @@ file ../../src/kernel/srs_kernel_log.cpp, ../../src/kernel/srs_kernel_mp3.hpp, ../../src/kernel/srs_kernel_mp3.cpp, + ../../src/kernel/srs_rtsp_stack.hpp, + ../../src/kernel/srs_rtsp_stack.cpp, ../../src/kernel/srs_kernel_stream.hpp, ../../src/kernel/srs_kernel_stream.cpp, ../../src/kernel/srs_kernel_ts.cpp, diff --git a/trunk/ide/srs_vs2010/srs.vcxproj b/trunk/ide/srs_vs2010/srs.vcxproj index 0c0afb3018..3013b0eaeb 100755 --- a/trunk/ide/srs_vs2010/srs.vcxproj +++ b/trunk/ide/srs_vs2010/srs.vcxproj @@ -89,6 +89,7 @@ + @@ -125,6 +126,7 @@ + @@ -166,6 +168,7 @@ + @@ -203,6 +206,7 @@ + diff --git a/trunk/ide/srs_vs2010/srs.vcxproj.filters b/trunk/ide/srs_vs2010/srs.vcxproj.filters index a592ec709c..1c1ad77401 100755 --- a/trunk/ide/srs_vs2010/srs.vcxproj.filters +++ b/trunk/ide/srs_vs2010/srs.vcxproj.filters @@ -229,6 +229,12 @@ srs + + srs + + + srs + @@ -420,6 +426,12 @@ srs + + srs + + + srs + diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index 56c091bf39..1763363804 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -23,7 +23,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include +using namespace std; + #include +#include +#include +#include +#include +#include #ifdef SRS_AUTO_STREAM_CASTER @@ -35,13 +43,115 @@ ISrsRtspHandler::~ISrsRtspHandler() { } -SrsRtspConn::SrsRtspConn(SrsConfDirective* c) +SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o) { - output = _srs_config->get_stream_caster_output(c); + output = o; + caster = c; + stfd = fd; + skt = new SrsStSocket(fd); + rtsp = new SrsRtspStack(skt); + trd = new SrsThread("rtsp", this, 0, false); } SrsRtspConn::~SrsRtspConn() { + srs_close_stfd(stfd); + trd->stop(); + + srs_freep(trd); + srs_freep(skt); + srs_freep(rtsp); +} + +int SrsRtspConn::serve() +{ + return trd->start(); +} + +int SrsRtspConn::do_cycle() +{ + int ret = ERROR_SUCCESS; + + // retrieve ip of client. + std::string ip = srs_get_peer_ip(st_netfd_fileno(stfd)); + srs_trace("rtsp: serve %s", ip.c_str()); + + return ret; +} + +int SrsRtspConn::cycle() +{ + // serve the rtsp client. + int ret = do_cycle(); + + // if socket io error, set to closed. + if (srs_is_client_gracefully_close(ret)) { + ret = ERROR_SOCKET_CLOSED; + } + + // success. + if (ret == ERROR_SUCCESS) { + srs_trace("client finished."); + } + + // client close peer. + if (ret == ERROR_SOCKET_CLOSED) { + srs_warn("client disconnect peer. ret=%d", ret); + } + + // terminate thread in the thread cycle itself. + trd->stop_loop(); + + return ERROR_SUCCESS; +} + +void SrsRtspConn::on_thread_stop() +{ + caster->remove(this); +} + +SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c) +{ + // TODO: FIXME: support reload. + output = _srs_config->get_stream_caster_output(c); +} + +SrsRtspCaster::~SrsRtspCaster() +{ + std::vector::iterator it; + for (it = clients.begin(); it != clients.end(); ++it) { + SrsRtspConn* conn = *it; + srs_freep(conn); + } + clients.clear(); +} + +int SrsRtspCaster::serve_client(st_netfd_t stfd) +{ + int ret = ERROR_SUCCESS; + + SrsRtspConn* conn = new SrsRtspConn(this, stfd, output); + if ((ret = conn->serve()) != ERROR_SUCCESS) { + srs_error("rtsp: serve client failed. ret=%d", ret); + srs_freep(conn); + return ret; + } + + clients.push_back(conn); + srs_info("rtsp: start thread to serve client."); + + return ret; +} + +void SrsRtspCaster::remove(SrsRtspConn* conn) +{ + std::vector::iterator it = find(clients.begin(), clients.end(), conn); + if (it != clients.end()) { + clients.erase(it); + } + srs_info("rtsp: remove connection from caster."); + + srs_freep(conn); } #endif diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index acf7995c8f..0fd461e130 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -31,10 +31,17 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include + +#include +#include #ifdef SRS_AUTO_STREAM_CASTER class SrsConfDirective; +class SrsStSocket; +class SrsRtspStack; +class SrsRtspCaster; /** * the handler for rtsp handler. @@ -44,18 +51,54 @@ class ISrsRtspHandler public: ISrsRtspHandler(); virtual ~ISrsRtspHandler(); +public: + /** + * serve the rtsp connection. + */ + virtual int serve_client(st_netfd_t stfd) = 0; }; /** -* the connection for rtsp. +* the rtsp connection serve the fd. */ -class SrsRtspConn : public ISrsRtspHandler +class SrsRtspConn : public ISrsThreadHandler { private: std::string output; + st_netfd_t stfd; + SrsStSocket* skt; + SrsRtspStack* rtsp; + SrsRtspCaster* caster; + SrsThread* trd; public: - SrsRtspConn(SrsConfDirective* c); + SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o); virtual ~SrsRtspConn(); +public: + virtual int serve(); +private: + virtual int do_cycle(); +// interface ISrsThreadHandler +public: + virtual int cycle(); + virtual void on_thread_stop(); +}; + +/** +* the caster for rtsp. +*/ +class SrsRtspCaster : public ISrsRtspHandler +{ +private: + std::string output; + std::vector clients; +public: + SrsRtspCaster(SrsConfDirective* c); + virtual ~SrsRtspCaster(); +public: + virtual int serve_client(st_netfd_t stfd); +// internal methods. +public: + virtual void remove(SrsRtspConn* conn); }; #endif diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 626f20640c..100fc34421 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -240,7 +240,7 @@ SrsRtspListener::SrsRtspListener(SrsServer* server, SrsListenerType type, SrsCon // we just assert here for unknown stream caster. srs_assert(_type == SrsListenerRtsp); if (_type == SrsListenerRtsp) { - caster = new SrsRtspConn(c); + caster = new SrsRtspCaster(c); } } @@ -262,7 +262,7 @@ int SrsRtspListener::cycle() } srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd)); - if ((ret = _server->accept_client(_type, client_stfd)) != ERROR_SUCCESS) { + if ((ret = caster->serve_client(client_stfd)) != ERROR_SUCCESS) { srs_warn("accept client error. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index ff35305fa1..1af72a99e1 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -46,22 +46,78 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * which will cause the socket to return error and * terminate the cycle thread. * -* when thread interrupt, the socket maybe not got EINT, -* espectially on st_usleep(), so the cycle must check the loop, -* when handler->cycle() has loop itself, for example: -* while (true): -* if (read_from_socket(skt) < 0) break; -* if thread stop when read_from_socket, it's ok, the loop will break, -* but when thread stop interrupt the s_usleep(0), then the loop is -* death loop. -* in a word, the handler->cycle() must: -* while (pthread->can_loop()): -* if (read_from_socket(skt) < 0) break; -* check the loop, then it works. +* Usage 1: stop by other thread. +* user can create thread and stop then start again and again, +* generally must provides a start and stop method, @see SrsIngester. +* the step to create a thread stop by other thread: +* 1. create SrsThread field, with joinable true. +* 2. must use stop to stop and join the thread. +* for example: +* class SrsIngester : public ISrsThreadHandler { +* public: SrsIngester() { pthread = new SrsThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US, true); } +* public: virtual int start() { return pthread->start(); } +* public: virtual void stop() { pthread->stop(); } +* public: virtual int cycle() { +* // check status, start ffmpeg when stopped. +* } +* }; * -* in the thread itself, that is the cycle method, -* if itself want to terminate the thread, should never use stop(), -* but use stop_loop() to set the loop to false and terminate normally. +* Usage 2: stop by thread itself. +* user can create thread which stop itself, +* generally only need to provides a start method, +* the object will destroy itself then terminate the thread, @see SrsConnection +* 1. create SrsThread field, with joinable false. +* 2. owner stop thread loop, destroy itself when thread stop. +* for example: +* class SrsConnection : public ISrsThreadHandler { +* public: SrsConnection() { pthread = new SrsThread("conn", this, 0, false); } +* public: virtual int start() { return pthread->start(); } +* public: virtual int cycle() { +* // serve client. +* // set loop to stop to quit, stop thread itself. +* pthread->stop_loop(); +* } +* public: virtual int on_thread_stop() { +* // remove the connection in thread itself. +* server->remove(this); +* } +* }; +* +* Usage 3: loop in the cycle method. +* user can use loop code in the cycle method, @see SrsForwarder +* 1. create SrsThread field, with or without joinable is ok. +* 2. loop code in cycle method, check the can_loop() for thread to quit. +* for example: +* class SrsForwarder : public ISrsThreadHandler { +* public: virtual int cycle() { +* while (pthread->can_loop()) { +* // read msgs from queue and forward to server. +* } +* } +* }; +* +* @remark why should check can_loop() in cycle method? +* when thread interrupt, the socket maybe not got EINT, +* espectially on st_usleep(), so the cycle must check the loop, +* when handler->cycle() has loop itself, for example: +* while (true): +* if (read_from_socket(skt) < 0) break; +* if thread stop when read_from_socket, it's ok, the loop will break, +* but when thread stop interrupt the s_usleep(0), then the loop is +* death loop. +* in a word, the handler->cycle() must: +* while (pthread->can_loop()): +* if (read_from_socket(skt) < 0) break; +* check the loop, then it works. +* +* @remark why should use stop_loop() to terminate thread in itself? +* in the thread itself, that is the cycle method, +* if itself want to terminate the thread, should never use stop(), +* but use stop_loop() to set the loop to false and terminate normally. +* +* @remark when should set the interval_us, and when not? +* the cycle will invoke util cannot loop, eventhough the return code of cycle is error, +* so the interval_us used to sleep for each cycle. */ class ISrsThreadHandler { diff --git a/trunk/src/protocol/srs_rtsp_stack.cpp b/trunk/src/protocol/srs_rtsp_stack.cpp new file mode 100644 index 0000000000..220e9a3931 --- /dev/null +++ b/trunk/src/protocol/srs_rtsp_stack.cpp @@ -0,0 +1,40 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#include + +#ifdef SRS_AUTO_STREAM_CASTER + +SrsRtspStack::SrsRtspStack(ISrsProtocolReaderWriter* s) +{ + skt = s; +} + +SrsRtspStack::~SrsRtspStack() +{ +} + +#endif + diff --git a/trunk/src/protocol/srs_rtsp_stack.hpp b/trunk/src/protocol/srs_rtsp_stack.hpp new file mode 100644 index 0000000000..a011e7c699 --- /dev/null +++ b/trunk/src/protocol/srs_rtsp_stack.hpp @@ -0,0 +1,55 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_PROTOCOL_RTSP_STACK_HPP +#define SRS_PROTOCOL_RTSP_STACK_HPP + +/* +#include +*/ + +#include + +#ifdef SRS_AUTO_STREAM_CASTER + +class ISrsProtocolReaderWriter; + +/** +* the rtsp protocol stack to parse the rtsp packets. +*/ +class SrsRtspStack +{ +private: + /** + * underlayer socket object, send/recv bytes. + */ + ISrsProtocolReaderWriter* skt; +public: + SrsRtspStack(ISrsProtocolReaderWriter* s); + virtual ~SrsRtspStack(); +}; + +#endif + +#endif +