diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp index f7904a041e..0c582db585 100644 --- a/trunk/src/app/srs_app_async_call.cpp +++ b/trunk/src/app/srs_app_async_call.cpp @@ -71,11 +71,18 @@ int SrsAsyncCallWorker::count() return (int)tasks.size(); } -int SrsAsyncCallWorker::start() +srs_error_t SrsAsyncCallWorker::start() { + srs_error_t err = srs_success; + srs_freep(trd); trd = new SrsSTCoroutine("async", this, _srs_context->get_id()); - return trd->start(); + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "coroutine"); + } + + return err; } void SrsAsyncCallWorker::stop() @@ -84,11 +91,15 @@ void SrsAsyncCallWorker::stop() trd->stop(); } -int SrsAsyncCallWorker::cycle() +srs_error_t SrsAsyncCallWorker::cycle() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - while (!trd->pull()) { + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "async call worker"); + } + if (tasks.empty()) { srs_cond_wait(wait); } @@ -99,6 +110,8 @@ int SrsAsyncCallWorker::cycle() std::vector::iterator it; for (it = copy.begin(); it != copy.end(); ++it) { ISrsAsyncCallTask* task = *it; + + int ret = ERROR_SUCCESS; if ((ret = task->call()) != ERROR_SUCCESS) { srs_warn("ignore async callback %s, ret=%d", task->to_string().c_str(), ret); } @@ -106,7 +119,7 @@ int SrsAsyncCallWorker::cycle() } } - return ret; + return err; } diff --git a/trunk/src/app/srs_app_async_call.hpp b/trunk/src/app/srs_app_async_call.hpp index 14fcce6f1a..12dc200765 100644 --- a/trunk/src/app/srs_app_async_call.hpp +++ b/trunk/src/app/srs_app_async_call.hpp @@ -77,11 +77,11 @@ class SrsAsyncCallWorker : public ISrsCoroutineHandler virtual int execute(ISrsAsyncCallTask* t); virtual int count(); public: - virtual int start(); + virtual srs_error_t start(); virtual void stop(); // interface ISrsReusableThreadHandler public: - virtual int cycle(); + virtual srs_error_t cycle(); }; #endif diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 6fdc18da82..42d464172b 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -73,7 +73,11 @@ int SrsAppCasterFlv::initialize() return ret; } - if ((ret = manager->start()) != ERROR_SUCCESS) { + if ((err = manager->start()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + return ret; } @@ -83,12 +87,17 @@ int SrsAppCasterFlv::initialize() int SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; string ip = srs_get_peer_ip(srs_netfd_fileno(stfd)); SrsHttpConn* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip); conns.push_back(conn); - if ((ret = conn->start()) != ERROR_SUCCESS) { + if ((err = conn->start()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + return ret; } diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 0dbb06d72c..8eebdaa601 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -80,42 +80,43 @@ void SrsConnection::dispose() trd->interrupt(); } -int SrsConnection::start() +srs_error_t SrsConnection::start() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if ((ret = skt->initialize(stfd)) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "socket"); } - return trd->start(); + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "coroutine"); + } + + return err; } -int SrsConnection::cycle() +srs_error_t SrsConnection::cycle() { - int ret = ERROR_SUCCESS; + int ret = do_cycle(); - int oret = ret = do_cycle(); - - // if socket io error, set to closed. - if (srs_is_client_gracefully_close(ret)) { - ret = ERROR_SOCKET_CLOSED; - } + // Notify manager to remove it. + manager->remove(this); // success. if (ret == ERROR_SUCCESS) { srs_trace("client finished."); + return srs_success; } // client close peer. - if (ret == ERROR_SOCKET_CLOSED) { - srs_warn("client disconnect peer. oret=%d, ret=%d", oret, ret); + // TODO: FIXME: Only reset the error when client closed it. + if (srs_is_client_gracefully_close(ret)) { + srs_warn("client disconnect peer. ret=%d", ret); + return srs_success; } - // Notify manager to remove it. - manager->remove(this); - - return ERROR_SUCCESS; + return srs_error_new(ret, "cycle"); } int SrsConnection::srs_id() diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 9e9c502dd0..7f1ef6f33d 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -99,7 +99,7 @@ class SrsConnection : virtual public ISrsConnection, virtual public ISrsCoroutin * when client cycle thread stop, invoke the on_thread_stop(), which will use server * to remove the client by server->remove(this). */ - virtual int start(); + virtual srs_error_t start(); // interface ISrsOneCycleThreadHandler public: /** @@ -107,7 +107,7 @@ class SrsConnection : virtual public ISrsConnection, virtual public ISrsCoroutin * when serve connection completed, terminate the loop which will terminate the thread, * thread will invoke the on_thread_stop() when it terminated. */ - virtual int cycle(); + virtual srs_error_t cycle(); public: /** * get the srs id which identify the client. diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index b6eccab663..1aefe65c7d 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -612,7 +612,6 @@ SrsDvrPlan::~SrsDvrPlan() srs_error_t SrsDvrPlan::initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsRequest* r) { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; hub = h; @@ -623,8 +622,8 @@ srs_error_t SrsDvrPlan::initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsReque return srs_error_wrap(err, "segmenter"); } - if ((ret = async->start()) != ERROR_SUCCESS) { - return srs_error_new(ret, "async"); + if ((err = async->start()) != srs_success) { + return srs_error_wrap(err, "async"); } return err; diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index fe3dec3c78..a4da33dd88 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -187,18 +187,23 @@ srs_error_t SrsEdgeIngester::initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest return srs_success; } -int SrsEdgeIngester::start() +srs_error_t SrsEdgeIngester::start() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if ((ret = source->on_publish()) != ERROR_SUCCESS) { - srs_error("edge pull stream then publish to edge failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "notify source"); } srs_freep(trd); trd = new SrsSTCoroutine("edge-igs", this); - return trd->start(); + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "coroutine"); + } + + return err; } void SrsEdgeIngester::stop() @@ -220,27 +225,36 @@ string SrsEdgeIngester::get_curr_origin() // when error, edge ingester sleep for a while and retry. #define SRS_EDGE_INGESTER_CIMS (3*1000) -int SrsEdgeIngester::cycle() +srs_error_t SrsEdgeIngester::cycle() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - while (!trd->pull()) { - if ((ret = do_cycle()) != ERROR_SUCCESS) { - srs_warn("EdgeIngester: Ignore error, ret=%d", ret); + while (true) { + if ((err = do_cycle()) != srs_success) { + srs_warn("EdgeIngester: Ignore error, %s", srs_error_desc(err).c_str()); + srs_freep(err); } - if (!trd->pull()) { - srs_usleep(SRS_EDGE_INGESTER_CIMS * 1000); + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "edge ingester"); } + + srs_usleep(SRS_EDGE_INGESTER_CIMS * 1000); } - return ret; + + return err; } -int SrsEdgeIngester::do_cycle() +srs_error_t SrsEdgeIngester::do_cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - while (!trd->pull()) { + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "do cycle pull"); + } + srs_freep(upstream); upstream = new SrsEdgeRtmpUpstream(redirect); @@ -249,15 +263,15 @@ int SrsEdgeIngester::do_cycle() redirect = ""; if ((ret = source->on_source_id_changed(_srs_context->get_id())) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "on source id changed"); } if ((ret = upstream->connect(req, lb)) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "connect upstream"); } if ((ret = edge->on_ingest_play()) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "notify edge play"); } ret = ingest(); @@ -275,7 +289,7 @@ int SrsEdgeIngester::do_cycle() break; } - return ret; + return srs_error_new(ret, "cycle"); } int SrsEdgeIngester::ingest() @@ -448,9 +462,10 @@ srs_error_t SrsEdgeForwarder::initialize(SrsSource* s, SrsPublishEdge* e, SrsReq return srs_success; } -int SrsEdgeForwarder::start() +srs_error_t SrsEdgeForwarder::start() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // reset the error code. send_error_code = ERROR_SUCCESS; @@ -480,18 +495,21 @@ int SrsEdgeForwarder::start() sdk = new SrsSimpleRtmpClient(url, cto, sto); if ((ret = sdk->connect()) != ERROR_SUCCESS) { - srs_warn("edge push %s failed, cto=%" PRId64 ", sto=%" PRId64 ". ret=%d", url.c_str(), cto, sto, ret); - return ret; + return srs_error_new(ret, "sdk connect %s failed, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto); } if ((ret = sdk->publish()) != ERROR_SUCCESS) { - srs_error("edge push publish failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "sdk publish"); } srs_freep(trd); trd = new SrsSTCoroutine("edge-fwr", this); - return trd->start(); + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "coroutine"); + } + + return err; } void SrsEdgeForwarder::stop() @@ -504,27 +522,31 @@ void SrsEdgeForwarder::stop() // when error, edge ingester sleep for a while and retry. #define SRS_EDGE_FORWARDER_CIMS (3*1000) -int SrsEdgeForwarder::cycle() +srs_error_t SrsEdgeForwarder::cycle() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - while (!trd->pull()) { - if ((ret = do_cycle()) != ERROR_SUCCESS) { - srs_warn("EdgeForwarder: Ignore error, ret=%d", ret); + while (true) { + if ((err = do_cycle()) != srs_success) { + return srs_error_wrap(err, "do cycle"); } - if (!trd->pull()) { - srs_usleep(SRS_EDGE_FORWARDER_CIMS * 1000); + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "thread pull"); } + + srs_usleep(SRS_EDGE_FORWARDER_CIMS * 1000); } - return ret; + + return err; } #define SYS_MAX_EDGE_SEND_MSGS 128 -int SrsEdgeForwarder::do_cycle() +srs_error_t SrsEdgeForwarder::do_cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TMMS); @@ -558,8 +580,7 @@ int SrsEdgeForwarder::do_cycle() // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. int count = 0; if ((ret = queue->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) { - srs_error("get message to push to origin failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "queue dumps packets"); } pprint->elapse(); @@ -577,12 +598,11 @@ int SrsEdgeForwarder::do_cycle() // sendout messages, all messages are freed by send_and_free_messages(). if ((ret = sdk->send_and_free_messages(msgs.msgs, count)) != ERROR_SUCCESS) { - srs_error("edge publish push message to server failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "send messages"); } } - return ret; + return err; } int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) @@ -644,11 +664,16 @@ srs_error_t SrsPlayEdge::initialize(SrsSource* source, SrsRequest* req) int SrsPlayEdge::on_client_play() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // start ingest when init state. if (state == SrsEdgeStateInit) { state = SrsEdgeStatePlay; - return ingester->start(); + err = ingester->start(); + + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); } return ret; @@ -727,6 +752,7 @@ bool SrsPublishEdge::can_publish() int SrsPublishEdge::on_client_publish() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // error when not init state. if (state != SrsEdgeStateInit) { @@ -746,7 +772,11 @@ int SrsPublishEdge::on_client_publish() } // start to forward stream to origin. - ret = forwarder->start(); + err = forwarder->start(); + + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); // @see https://github.com/ossrs/srs/issues/180 // when failed, revert to init diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 1dbf9314cd..9ab486b148 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -130,14 +130,14 @@ class SrsEdgeIngester : public ISrsCoroutineHandler virtual ~SrsEdgeIngester(); public: virtual srs_error_t initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r); - virtual int start(); + virtual srs_error_t start(); virtual void stop(); virtual std::string get_curr_origin(); // interface ISrsReusableThread2Handler public: - virtual int cycle(); + virtual srs_error_t cycle(); private: - virtual int do_cycle(); + virtual srs_error_t do_cycle(); private: virtual int ingest(); virtual int process_publish_message(SrsCommonMessage* msg); @@ -173,13 +173,13 @@ class SrsEdgeForwarder : public ISrsCoroutineHandler virtual void set_queue_size(double queue_size); public: virtual srs_error_t initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r); - virtual int start(); + virtual srs_error_t start(); virtual void stop(); // interface ISrsReusableThread2Handler public: - virtual int cycle(); + virtual srs_error_t cycle(); private: - virtual int do_cycle(); + virtual srs_error_t do_cycle(); public: virtual int proxy(SrsCommonMessage* msg); }; diff --git a/trunk/src/app/srs_app_encoder.cpp b/trunk/src/app/srs_app_encoder.cpp index 96da82bd7b..506f3c640c 100644 --- a/trunk/src/app/srs_app_encoder.cpp +++ b/trunk/src/app/srs_app_encoder.cpp @@ -56,6 +56,7 @@ SrsEncoder::~SrsEncoder() int SrsEncoder::on_publish(SrsRequest* req) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // parse the transcode engines for vhost and app and stream. ret = parse_scope_engines(req); @@ -75,8 +76,11 @@ int SrsEncoder::on_publish(SrsRequest* req) // start thread to run all encoding engines. srs_freep(trd); trd = new SrsSTCoroutine("encoder", this, _srs_context->get_id()); - if ((ret = trd->start()) != ERROR_SUCCESS) { - srs_error("st_thread_create failed. ret=%d", ret); + if ((err = trd->start()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + return ret; } @@ -92,18 +96,21 @@ void SrsEncoder::on_unpublish() // when error, encoder sleep for a while and retry. #define SRS_RTMP_ENCODER_CIMS (3000) -int SrsEncoder::cycle() +srs_error_t SrsEncoder::cycle() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - while (!trd->pull()) { - if ((ret = do_cycle()) != ERROR_SUCCESS) { - srs_warn("Encoder: Ignore error, ret=%d", ret); + while (true) { + if ((err = do_cycle()) != srs_success) { + srs_warn("Encoder: Ignore error, %s", srs_error_desc(err).c_str()); + srs_freep(err); } - if (!trd->pull()) { - srs_usleep(SRS_RTMP_ENCODER_CIMS * 1000); + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "encoder"); } + + srs_usleep(SRS_RTMP_ENCODER_CIMS * 1000); } // kill ffmpeg when finished and it alive @@ -114,12 +121,13 @@ int SrsEncoder::cycle() ffmpeg->stop(); } - return ret; + return err; } -int SrsEncoder::do_cycle() +srs_error_t SrsEncoder::do_cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; std::vector::iterator it; for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) { @@ -127,21 +135,19 @@ int SrsEncoder::do_cycle() // start all ffmpegs. if ((ret = ffmpeg->start()) != ERROR_SUCCESS) { - srs_error("transcode ffmpeg start failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "ffmpeg start"); } // check ffmpeg status. if ((ret = ffmpeg->cycle()) != ERROR_SUCCESS) { - srs_error("transcode ffmpeg cycle failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "ffmpeg cycle"); } } // pithy print show_encode_log_message(); - return ret; + return err; } void SrsEncoder::clear_engines() diff --git a/trunk/src/app/srs_app_encoder.hpp b/trunk/src/app/srs_app_encoder.hpp index ba83483d3f..f92c2f89f4 100644 --- a/trunk/src/app/srs_app_encoder.hpp +++ b/trunk/src/app/srs_app_encoder.hpp @@ -58,9 +58,9 @@ class SrsEncoder : public ISrsCoroutineHandler virtual void on_unpublish(); // interface ISrsReusableThreadHandler. public: - virtual int cycle(); + virtual srs_error_t cycle(); private: - virtual int do_cycle(); + virtual srs_error_t do_cycle(); private: virtual void clear_engines(); virtual SrsFFMPEG* at(int index); diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 05636cea7f..e5de37a50a 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -93,6 +93,7 @@ void SrsForwarder::set_queue_size(double queue_size) int SrsForwarder::on_publish() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // discovery the server port and tcUrl from req and ep_forward. std::string server; @@ -137,8 +138,11 @@ int SrsForwarder::on_publish() srs_freep(trd); trd = new SrsSTCoroutine("forward", this); - if ((ret = trd->start()) != ERROR_SUCCESS) { - srs_error("start srs thread failed. ret=%d", ret); + if ((err = trd->start()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + return ret; } @@ -221,26 +225,30 @@ int SrsForwarder::on_video(SrsSharedPtrMessage* shared_video) // when error, forwarder sleep for a while and retry. #define SRS_FORWARDER_CIMS (3000) -int SrsForwarder::cycle() +srs_error_t SrsForwarder::cycle() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - while (!trd->pull()) { - if ((ret = do_cycle()) != ERROR_SUCCESS) { - srs_warn("Forwarder: Ignore error, ret=%d", ret); + while (true) { + if ((err = do_cycle()) != srs_success) { + srs_warn("Forwarder: Ignore error, %s", srs_error_desc(err).c_str()); + srs_freep(err); } - if (!trd->pull()) { - srs_usleep(SRS_FORWARDER_CIMS * 1000); + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "forwarder"); } + + srs_usleep(SRS_FORWARDER_CIMS * 1000); } - return ret; + return err; } -int SrsForwarder::do_cycle() +srs_error_t SrsForwarder::do_cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; std::string url; if (true) { @@ -260,24 +268,22 @@ int SrsForwarder::do_cycle() sdk = new SrsSimpleRtmpClient(url, cto, sto); if ((ret = sdk->connect()) != ERROR_SUCCESS) { - srs_warn("forward failed, url=%s, cto=%" PRId64 ", sto=%" PRId64 ". ret=%d", url.c_str(), cto, sto, ret); - return ret; + return srs_error_new(ret, "sdk connect url=%s, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto); } if ((ret = sdk->publish()) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "sdk publish"); } if ((ret = hub->on_forwarder_start(this)) != ERROR_SUCCESS) { - srs_error("callback the source to feed the sequence header failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "notify hub start"); } if ((ret = forward()) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "forward"); } - return ret; + return err; } #define SYS_MAX_FORWARD_SEND_MSGS 128 diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index 29ec907af1..fdc67404c1 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -92,9 +92,9 @@ class SrsForwarder : public ISrsCoroutineHandler virtual int on_video(SrsSharedPtrMessage* shared_video); // interface ISrsReusableThread2Handler. public: - virtual int cycle(); + virtual srs_error_t cycle(); private: - virtual int do_cycle(); + virtual srs_error_t do_cycle(); private: virtual int forward(); }; diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 3c8275dd81..0a4439bb32 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -257,13 +257,13 @@ int SrsHlsMuxer::deviation() srs_error_t SrsHlsMuxer::initialize() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - if ((ret = async->start()) != ERROR_SUCCESS) { - return srs_error_new(ret, "async start"); + if ((err = async->start()) != srs_success) { + return srs_error_wrap(err, "async start"); } - return srs_success; + return err; } int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix, diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index c3b8bb5e11..e66ad21272 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -85,9 +85,15 @@ int SrsBufferCache::update(SrsSource* s, SrsRequest* r) return ret; } -int SrsBufferCache::start() +srs_error_t SrsBufferCache::start() { - return trd->start(); + srs_error_t err = srs_success; + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "corotine"); + } + + return err; } int SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) @@ -110,22 +116,22 @@ int SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jit return ret; } -int SrsBufferCache::cycle() +srs_error_t SrsBufferCache::cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // TODO: FIXME: support reload. if (fast_cache <= 0) { srs_usleep(SRS_STREAM_CACHE_CYCLE_SECONDS * 1000 * 1000); - return ret; + return err; } // the stream cache will create consumer to cache stream, // which will trigger to fetch stream from origin for edge. SrsConsumer* consumer = NULL; if ((ret = source->create_consumer(NULL, consumer, false, false, true)) != ERROR_SUCCESS) { - srs_error("http: create consumer failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "create consumer"); } SrsAutoFree(SrsConsumer, consumer); @@ -138,15 +144,18 @@ int SrsBufferCache::cycle() // TODO: FIXME: support reload. queue->set_queue_size(fast_cache); - while (!trd->pull()) { + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "buffer cache"); + } + pprint->elapse(); // get messages from consumer. // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. int count = 0; if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) { - srs_error("http: get messages from consumer failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "consumer dump packets"); } if (count <= 0) { @@ -170,7 +179,7 @@ int SrsBufferCache::cycle() } } - return ret; + return err; } ISrsBufferEncoder::ISrsBufferEncoder() @@ -472,6 +481,7 @@ int SrsLiveStream::update(SrsSource* s, SrsRequest* r) int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; ISrsBufferEncoder* enc = NULL; @@ -547,7 +557,11 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc); SrsAutoFree(SrsHttpRecvThread, trd); - if ((ret = trd->start()) != ERROR_SUCCESS) { + if ((err = trd->start()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + srs_error("http: start notify thread failed, ret=%d", ret); return ret; } @@ -778,7 +792,11 @@ int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r) } // start http stream cache thread - if ((ret = entry->cache->start()) != ERROR_SUCCESS) { + if ((err = entry->cache->start()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + srs_error("http: start stream cache failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index dbd9e78822..06b9fef3d7 100755 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -52,11 +52,11 @@ class SrsBufferCache : public ISrsCoroutineHandler virtual ~SrsBufferCache(); virtual int update(SrsSource* s, SrsRequest* r); public: - virtual int start(); + virtual srs_error_t start(); virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); // interface ISrsEndlessThreadHandler. public: - virtual int cycle(); + virtual srs_error_t cycle(); }; /** diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index dbbe59628b..4219e928e5 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -126,14 +126,14 @@ void SrsIngester::dispose() stop(); } -int SrsIngester::start() +srs_error_t SrsIngester::start() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if ((ret = parse()) != ERROR_SUCCESS) { clear_engines(); - ret = ERROR_SUCCESS; - return ret; + return srs_error_new(ret, "parse"); } // even no ingesters, we must also start it, @@ -142,12 +142,12 @@ int SrsIngester::start() // start thread to run all encoding engines. srs_freep(trd); trd = new SrsSTCoroutine("ingest", this, _srs_context->get_id()); - if ((ret = trd->start()) != ERROR_SUCCESS) { - srs_error("st_thread_create failed. ret=%d", ret); - return ret; + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "start coroutine"); } - return ret; + return err; } void SrsIngester::stop() @@ -173,26 +173,30 @@ void SrsIngester::fast_stop() // ingest never sleep a long time, for we must start the stream ASAP. #define SRS_AUTO_INGESTER_CIMS (3000) -int SrsIngester::cycle() +srs_error_t SrsIngester::cycle() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - while (!trd->pull()) { - if ((ret = do_cycle()) != ERROR_SUCCESS) { - srs_warn("Ingester: Ignore error, ret=%d", ret); + while (true) { + if ((err = do_cycle()) != srs_success) { + srs_warn("Ingester: Ignore error, %s", srs_error_desc(err).c_str()); + srs_freep(err); } - if (!trd->pull()) { - srs_usleep(SRS_AUTO_INGESTER_CIMS * 1000); + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "ingester"); } + + srs_usleep(SRS_AUTO_INGESTER_CIMS * 1000); } - return ret; + return err; } -int SrsIngester::do_cycle() +srs_error_t SrsIngester::do_cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // when expired, restart all ingesters. if (expired) { @@ -204,7 +208,7 @@ int SrsIngester::do_cycle() // re-prase the ingesters. if ((ret = parse()) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "parse"); } } @@ -215,21 +219,19 @@ int SrsIngester::do_cycle() // start all ffmpegs. if ((ret = ingester->start()) != ERROR_SUCCESS) { - srs_error("ingest ffmpeg start failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "ingester start"); } // check ffmpeg status. if ((ret = ingester->cycle()) != ERROR_SUCCESS) { - srs_error("ingest ffmpeg cycle failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "ingester cycle"); } } // pithy print show_ingest_log_message(); - return ret; + return err; } void SrsIngester::clear_engines() diff --git a/trunk/src/app/srs_app_ingest.hpp b/trunk/src/app/srs_app_ingest.hpp index a4f5ca9a42..4e5601cb88 100644 --- a/trunk/src/app/srs_app_ingest.hpp +++ b/trunk/src/app/srs_app_ingest.hpp @@ -88,15 +88,15 @@ class SrsIngester : public ISrsCoroutineHandler, public ISrsReloadHandler public: virtual void dispose(); public: - virtual int start(); + virtual srs_error_t start(); virtual void stop(); private: virtual void fast_stop(); // interface ISrsReusableThreadHandler. public: - virtual int cycle(); + virtual srs_error_t cycle(); private: - virtual int do_cycle(); + virtual srs_error_t do_cycle(); private: virtual void clear_engines(); virtual int parse(); diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 575c502e53..89b672d6e0 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -332,7 +332,6 @@ ISrsKafkaCluster* _srs_kafka = NULL; srs_error_t srs_initialize_kafka() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; SrsKafkaProducer* kafka = new SrsKafkaProducer(); @@ -342,8 +341,8 @@ srs_error_t srs_initialize_kafka() return srs_error_wrap(err, "initialize kafka producer"); } - if ((ret = kafka->start()) != ERROR_SUCCESS) { - return srs_error_new(ret, "start kafka producer"); + if ((err = kafka->start()) != srs_success) { + return srs_error_wrap(err, "start kafka producer"); } return err; @@ -396,28 +395,27 @@ srs_error_t SrsKafkaProducer::initialize() return srs_success; } -int SrsKafkaProducer::start() +srs_error_t SrsKafkaProducer::start() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!enabled) { - return ret; + return err; } - if ((ret = worker->start()) != ERROR_SUCCESS) { - srs_error("start kafka worker failed. ret=%d", ret); - return ret; + if ((err = worker->start()) != srs_success) { + return srs_error_wrap(err, "async worker"); } srs_freep(trd); trd = new SrsSTCoroutine("kafka", this, _srs_context->get_id()); - if ((ret = trd->start()) != ERROR_SUCCESS) { - srs_error("start kafka thread failed. ret=%d", ret); + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "coroutine"); } refresh_metadata(); - return ret; + return err; } void SrsKafkaProducer::stop() @@ -448,15 +446,13 @@ int SrsKafkaProducer::send(int key, SrsJsonObject* obj) } // sync with backgound metadata worker. - srs_mutex_lock(lock); + SrsLocker(lock); // flush message when metadata is ok. if (metadata_ok) { ret = flush(); } - srs_mutex_unlock(lock); - return ret; } @@ -493,25 +489,11 @@ int SrsKafkaProducer::on_close(int key) } #define SRS_KAKFA_CIMS 3000 -int SrsKafkaProducer::cycle() -{ - int ret = ERROR_SUCCESS; - - while (!trd->pull()) { - if ((ret = do_cycle()) != ERROR_SUCCESS) { - srs_warn("ignore kafka error. ret=%d", ret); - } - - if (!trd->pull()) { - srs_usleep(SRS_KAKFA_CIMS * 1000); - } - } - - return ret; -} -int SrsKafkaProducer::on_before_cycle() +srs_error_t SrsKafkaProducer::cycle() { + srs_error_t err = srs_success; + // wait for the metadata expired. // when metadata is ok, wait for it expired. if (metadata_ok) { @@ -519,16 +501,22 @@ int SrsKafkaProducer::on_before_cycle() } // request to lock to acquire the socket. - srs_mutex_lock(lock); + SrsLocker(lock); - return ERROR_SUCCESS; -} - -int SrsKafkaProducer::on_end_cycle() -{ - srs_mutex_unlock(lock); + while (true) { + if ((err = do_cycle()) != srs_success) { + srs_warn("KafkaProducer: Ignore error, %s", srs_error_desc(err).c_str()); + srs_freep(err); + } + + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "kafka cycle"); + } - return ERROR_SUCCESS; + srs_usleep(SRS_KAKFA_CIMS * 1000); + } + + return err; } void SrsKafkaProducer::clear_metadata() @@ -543,22 +531,22 @@ void SrsKafkaProducer::clear_metadata() partitions.clear(); } -int SrsKafkaProducer::do_cycle() +srs_error_t SrsKafkaProducer::do_cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // ignore when disabled. if (!enabled) { - return ret; + return err; } // when kafka enabled, request metadata when startup. if ((ret = request_metadata()) != ERROR_SUCCESS) { - srs_error("request kafka metadata failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "request metadata"); } - return ret; + return err; } int SrsKafkaProducer::request_metadata() diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index 916c00fa1a..7018f70b3e 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -179,7 +179,7 @@ class SrsKafkaProducer : virtual public ISrsCoroutineHandler, virtual public ISr virtual ~SrsKafkaProducer(); public: virtual srs_error_t initialize(); - virtual int start(); + virtual srs_error_t start(); virtual void stop(); // internal: for worker to call task to send object. public: @@ -196,12 +196,10 @@ class SrsKafkaProducer : virtual public ISrsCoroutineHandler, virtual public ISr virtual int on_close(int key); // interface ISrsReusableThreadHandler public: - virtual int cycle(); - virtual int on_before_cycle(); - virtual int on_end_cycle(); + virtual srs_error_t cycle(); private: virtual void clear_metadata(); - virtual int do_cycle(); + virtual srs_error_t do_cycle(); virtual int request_metadata(); // set the metadata to invalid and refresh it. virtual void refresh_metadata(); diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index a3e5db8b03..152d822141 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -42,7 +42,7 @@ using namespace std; #define SRS_UDP_MAX_PACKET_SIZE 65535 // sleep in ms for udp recv packet. -#define SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS 0 +#define SrsUdpPacketRecvCycleMS 0 // nginx also set to 512 #define SERVER_LISTEN_BACKLOG 512 @@ -110,6 +110,7 @@ srs_netfd_t SrsUdpListener::stfd() int SrsUdpListener::listen() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if ((_fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { ret = ERROR_SOCKET_CREATE; @@ -141,7 +142,11 @@ int SrsUdpListener::listen() srs_freep(trd); trd = new SrsSTCoroutine("udp", this); - if ((ret = trd->start()) != ERROR_SUCCESS) { + if ((err = trd->start()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret); return ret; } @@ -150,9 +155,10 @@ int SrsUdpListener::listen() return ret; } -int SrsUdpListener::cycle() +srs_error_t SrsUdpListener::cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; while (!trd->pull()) { // TODO: FIXME: support ipv6, @see man 7 ipv6 @@ -161,21 +167,19 @@ int SrsUdpListener::cycle() int nread = 0; if ((nread = srs_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) { - srs_warn("ignore recv udp packet failed, nread=%d", nread); - return ret; + return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread); } if ((ret = handler->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) { - srs_warn("handle udp packet failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "handle packet %d bytes", nread); } - if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) { - srs_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000); + if (SrsUdpPacketRecvCycleMS > 0) { + srs_usleep(SrsUdpPacketRecvCycleMS * 1000); } } - return ret; + return err; } SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p) @@ -205,6 +209,7 @@ int SrsTcpListener::fd() int SrsTcpListener::listen() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { ret = ERROR_SOCKET_CREATE; @@ -243,7 +248,11 @@ int SrsTcpListener::listen() srs_freep(trd); trd = new SrsSTCoroutine("tcp", this); - if ((ret = trd->start()) != ERROR_SUCCESS) { + if ((err = trd->start()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret); return ret; } @@ -252,9 +261,10 @@ int SrsTcpListener::listen() return ret; } -int SrsTcpListener::cycle() +srs_error_t SrsTcpListener::cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; while (!trd->pull()) { srs_netfd_t stfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT); @@ -263,20 +273,15 @@ int SrsTcpListener::cycle() srs_fd_close_exec(fd); if(stfd == NULL){ - // ignore error. - if (errno != EINTR) { - srs_error("ignore accept thread stoppped for accept client error"); - } - return ret; + return err; } srs_verbose("get a client. fd=%d", fd); if ((ret = handler->on_tcp_client(stfd)) != ERROR_SUCCESS) { - srs_warn("accept client error. ret=%d", ret); - return ret; + return srs_error_new(ret, "handle fd=%d", fd); } } - return ret; + return err; } diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 39af63c401..bd4be0518b 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -101,7 +101,7 @@ class SrsUdpListener : public ISrsCoroutineHandler virtual int listen(); // interface ISrsReusableThreadHandler. public: - virtual int cycle(); + virtual srs_error_t cycle(); }; /** @@ -126,7 +126,7 @@ class SrsTcpListener : public ISrsCoroutineHandler virtual int listen(); // interface ISrsReusableThreadHandler. public: - virtual int cycle(); + virtual srs_error_t cycle(); }; #endif diff --git a/trunk/src/app/srs_app_ng_exec.cpp b/trunk/src/app/srs_app_ng_exec.cpp index 371ac64d8a..2e985fa236 100644 --- a/trunk/src/app/srs_app_ng_exec.cpp +++ b/trunk/src/app/srs_app_ng_exec.cpp @@ -53,6 +53,7 @@ SrsNgExec::~SrsNgExec() int SrsNgExec::on_publish(SrsRequest* req) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // when publish, parse the exec_publish. if ((ret = parse_exec_publish(req)) != ERROR_SUCCESS) { @@ -62,7 +63,11 @@ int SrsNgExec::on_publish(SrsRequest* req) // start thread to run all processes. srs_freep(trd); trd = new SrsSTCoroutine("encoder", this, _srs_context->get_id()); - if ((ret = trd->start()) != ERROR_SUCCESS) { + if ((err = trd->start()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + srs_error("st_thread_create failed. ret=%d", ret); return ret; } @@ -78,18 +83,22 @@ void SrsNgExec::on_unpublish() // when error, ng-exec sleep for a while and retry. #define SRS_RTMP_EXEC_CIMS (3000) -int SrsNgExec::cycle() +srs_error_t SrsNgExec::cycle() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - while (!trd->pull()) { - if ((ret = do_cycle()) != ERROR_SUCCESS) { - srs_warn("EXEC: Ignore error, ret=%d", ret); + while (true) { + if ((err = do_cycle()) != srs_success) { + srs_warn("EXEC: Ignore error, %s", srs_error_desc(err).c_str()); + srs_freep(err); } - if (!trd->pull()) { - srs_usleep(SRS_RTMP_EXEC_CIMS * 1000); + if ((err = trd->pull()) != srs_success) { + err = srs_error_wrap(err, "ng exec cycle"); + break; } + + srs_usleep(SRS_RTMP_EXEC_CIMS * 1000); } std::vector::iterator it; @@ -98,16 +107,17 @@ int SrsNgExec::cycle() ep->stop(); } - return ret; + return err; } -int SrsNgExec::do_cycle() +srs_error_t SrsNgExec::do_cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // ignore when no exec. if (exec_publishs.empty()) { - return ret; + return err; } std::vector::iterator it; @@ -116,21 +126,19 @@ int SrsNgExec::do_cycle() // start all processes. if ((ret = process->start()) != ERROR_SUCCESS) { - srs_error("exec publish start failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "process start"); } // check process status. if ((ret = process->cycle()) != ERROR_SUCCESS) { - srs_error("exec publish cycle failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "process cycle"); } } // pithy print show_exec_log_message(); - return ret; + return err; } int SrsNgExec::parse_exec_publish(SrsRequest* req) diff --git a/trunk/src/app/srs_app_ng_exec.hpp b/trunk/src/app/srs_app_ng_exec.hpp index 1b963e71b9..23893cc091 100644 --- a/trunk/src/app/srs_app_ng_exec.hpp +++ b/trunk/src/app/srs_app_ng_exec.hpp @@ -55,9 +55,9 @@ class SrsNgExec : public ISrsCoroutineHandler virtual void on_unpublish(); // interface ISrsReusableThreadHandler. public: - virtual int cycle(); + virtual srs_error_t cycle(); private: - virtual int do_cycle(); + virtual srs_error_t do_cycle(); private: virtual int parse_exec_publish(SrsRequest* req); virtual void clear_exec_publish(); diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index bc77689026..9790f12b01 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -74,11 +74,18 @@ int SrsRecvThread::cid() return trd->cid(); } -int SrsRecvThread::start() +srs_error_t SrsRecvThread::start() { + srs_error_t err = srs_success; + srs_freep(trd); trd = new SrsSTCoroutine("recv", this); - return trd->start(); + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "recv thread"); + } + + return err; } void SrsRecvThread::stop() @@ -91,9 +98,9 @@ void SrsRecvThread::stop_loop() trd->interrupt(); } -int SrsRecvThread::cycle() +srs_error_t SrsRecvThread::cycle() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // the multiple messages writev improve performance large, // but the timeout recv will cause 33% sys call performance, @@ -104,21 +111,28 @@ int SrsRecvThread::cycle() pumper->on_start(); - ret = do_cycle(); + if ((err = do_cycle()) != srs_success) { + err = srs_error_wrap(err, "recv thread"); + } // reset the timeout to pulse mode. rtmp->set_recv_timeout(timeout * 1000); pumper->on_stop(); - return ret; + return err; } -int SrsRecvThread::do_cycle() +srs_error_t SrsRecvThread::do_cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - while (!trd->pull()) { + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "recv thread"); + } + // When the pumper is interrupted, wait then retry. if (pumper->interrupted()) { srs_usleep(timeout * 1000); @@ -143,12 +157,12 @@ int SrsRecvThread::do_cycle() // Notify the pumper to quit for error. pumper->interrupt(ret); - return ret; + return srs_error_new(ret, "recv thread"); } srs_verbose("thread loop recv message. ret=%d", ret); } - return ret; + return err; } SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms) @@ -172,9 +186,15 @@ SrsQueueRecvThread::~SrsQueueRecvThread() queue.clear(); } -int SrsQueueRecvThread::start() +srs_error_t SrsQueueRecvThread::start() { - return trd.start(); + srs_error_t err = srs_success; + + if ((err = trd.start()) != srs_success) { + return srs_error_wrap(err, "queue recv thread"); + } + + return err; } void SrsQueueRecvThread::stop() @@ -327,11 +347,17 @@ int SrsPublishRecvThread::get_cid() return ncid; } -int SrsPublishRecvThread::start() +srs_error_t SrsPublishRecvThread::start() { - int ret = trd.start(); + srs_error_t err = srs_success; + + if ((err = trd.start()) != srs_success) { + err = srs_error_wrap(err, "publish recv thread"); + } + ncid = cid = trd.cid(); - return ret; + + return err; } void SrsPublishRecvThread::stop() @@ -543,9 +569,15 @@ SrsHttpRecvThread::~SrsHttpRecvThread() srs_freep(trd); } -int SrsHttpRecvThread::start() +srs_error_t SrsHttpRecvThread::start() { - return trd->start(); + srs_error_t err = srs_success; + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "http recv thread"); + } + + return err; } int SrsHttpRecvThread::error_code() @@ -553,20 +585,26 @@ int SrsHttpRecvThread::error_code() return error; } -int SrsHttpRecvThread::cycle() +srs_error_t SrsHttpRecvThread::cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - while (!trd->pull()) { + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "http recv thread"); + } + ISrsHttpMessage* req = NULL; SrsAutoFree(ISrsHttpMessage, req); if ((ret = conn->pop_message(&req)) != ERROR_SUCCESS) { + err = srs_error_new(ret, "pop message"); error = ret; break; } } - return ret; + return err; } diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 3061a30188..2432d8aaa1 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -106,14 +106,14 @@ class SrsRecvThread : public ISrsCoroutineHandler public: virtual int cid(); public: - virtual int start(); + virtual srs_error_t start(); virtual void stop(); virtual void stop_loop(); // interface ISrsReusableThread2Handler public: - virtual int cycle(); + virtual srs_error_t cycle(); private: - virtual int do_cycle(); + virtual srs_error_t do_cycle(); }; /** @@ -135,7 +135,7 @@ class SrsQueueRecvThread : public ISrsMessagePumper SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms); virtual ~SrsQueueRecvThread(); public: - virtual int start(); + virtual srs_error_t start(); virtual void stop(); public: virtual bool empty(); @@ -201,7 +201,7 @@ class SrsPublishRecvThread : virtual public ISrsMessagePumper, virtual public IS virtual void set_cid(int v); virtual int get_cid(); public: - virtual int start(); + virtual srs_error_t start(); virtual void stop(); // interface ISrsMessagePumper public: @@ -239,12 +239,12 @@ class SrsHttpRecvThread : public ISrsCoroutineHandler SrsHttpRecvThread(SrsResponseOnlyHttpConn* c); virtual ~SrsHttpRecvThread(); public: - virtual int start(); + virtual srs_error_t start(); public: virtual int error_code(); // interface ISrsOneCycleThreadHandler public: - virtual int cycle(); + virtual srs_error_t cycle(); }; #endif diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 77cdb61c24..b0e0316291 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -656,6 +656,7 @@ int SrsRtmpConn::check_vhost(bool try_default_vhost) int SrsRtmpConn::playing(SrsSource* source) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // create consumer of souce. SrsConsumer* consumer = NULL; @@ -671,7 +672,11 @@ int SrsRtmpConn::playing(SrsSource* source) SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP); // start isolate recv thread. - if ((ret = trd.start()) != ERROR_SUCCESS) { + if ((err = trd.start()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + srs_error("start isolate recv thread failed. ret=%d", ret); return ret; } @@ -919,13 +924,18 @@ int SrsRtmpConn::publishing(SrsSource* source) int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* rtrd) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsRequest* req = info->req; SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish(); SrsAutoFree(SrsPithyPrint, pprint); // start isolate recv thread. - if ((ret = rtrd->start()) != ERROR_SUCCESS) { + if ((err = rtrd->start()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + srs_error("start isolate recv thread failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index 135ca9aa82..5d929c899c 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -230,19 +230,26 @@ SrsRtspConn::~SrsRtspConn() srs_freep(acache); } -int SrsRtspConn::serve() +srs_error_t SrsRtspConn::serve() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; + if ((ret = skt->initialize(stfd)) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "socket initialize"); + } + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "rtsp connection"); } - return trd->start(); + return err; } -int SrsRtspConn::do_cycle() +srs_error_t SrsRtspConn::do_cycle() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // retrieve ip of client. std::string ip = srs_get_peer_ip(srs_netfd_fileno(stfd)); @@ -252,10 +259,7 @@ int SrsRtspConn::do_cycle() while (!trd->pull()) { SrsRtspRequest* req = NULL; if ((ret = rtsp->recv_message(&req)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("rtsp: recv request failed. ret=%d", ret); - } - return ret; + return srs_error_new(ret, "recv message"); } SrsAutoFree(SrsRtspRequest, req); srs_info("rtsp: got rtsp request"); @@ -264,10 +268,7 @@ int SrsRtspConn::do_cycle() SrsRtspOptionsResponse* res = new SrsRtspOptionsResponse((int)req->seq); res->session = session; if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("rtsp: send OPTIONS response failed. ret=%d", ret); - } - return ret; + return srs_error_new(ret, "response option"); } } else if (req->is_announce()) { if (rtsp_tcUrl.empty()) { @@ -298,17 +299,13 @@ int SrsRtspConn::do_cycle() SrsRtspResponse* res = new SrsRtspResponse((int)req->seq); res->session = session; if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("rtsp: send ANNOUNCE response failed. ret=%d", ret); - } - return ret; + return srs_error_new(ret, "response announce"); } } else if (req->is_setup()) { srs_assert(req->transport); int lpm = 0; if ((ret = caster->alloc_port(&lpm)) != ERROR_SUCCESS) { - srs_error("rtsp: alloc port failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "alloc port"); } SrsRtpConn* rtp = NULL; @@ -320,15 +317,13 @@ int SrsRtspConn::do_cycle() rtp = audio_rtp = new SrsRtpConn(this, lpm, audio_id); } if ((ret = rtp->listen()) != ERROR_SUCCESS) { - srs_error("rtsp: rtp listen at port=%d failed. ret=%d", lpm, ret); - return ret; + return srs_error_new(ret, "rtp listen"); } srs_trace("rtsp: #%d %s over %s/%s/%s %s client-port=%d-%d, server-port=%d-%d", - req->stream_id, (req->stream_id == video_id)? "Video":"Audio", - req->transport->transport.c_str(), req->transport->profile.c_str(), req->transport->lower_transport.c_str(), - req->transport->cast_type.c_str(), req->transport->client_port_min, req->transport->client_port_max, - lpm, lpm + 1 - ); + req->stream_id, (req->stream_id == video_id)? "Video":"Audio", + req->transport->transport.c_str(), req->transport->profile.c_str(), req->transport->lower_transport.c_str(), + req->transport->cast_type.c_str(), req->transport->client_port_min, req->transport->client_port_max, + lpm, lpm + 1); // create session. if (session.empty()) { @@ -342,24 +337,18 @@ int SrsRtspConn::do_cycle() res->local_port_max = lpm + 1; res->session = session; if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("rtsp: send SETUP response failed. ret=%d", ret); - } - return ret; + return srs_error_new(ret, "response setup"); } } else if (req->is_record()) { SrsRtspResponse* res = new SrsRtspResponse((int)req->seq); res->session = session; if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("rtsp: send SETUP response failed. ret=%d", ret); - } - return ret; + return srs_error_new(ret, "response record"); } } } - return ret; + return err; } int SrsRtspConn::on_rtp_packet(SrsRtpPacket* pkt, int stream_id) @@ -397,24 +386,19 @@ int SrsRtspConn::on_rtp_packet(SrsRtpPacket* pkt, int stream_id) return ret; } -int SrsRtspConn::cycle() +srs_error_t SrsRtspConn::cycle() { // serve the rtsp client. - int ret = do_cycle(); + srs_error_t err = do_cycle(); - // if socket io error, set to closed. - if (srs_is_client_gracefully_close(ret)) { - ret = ERROR_SOCKET_CLOSED; - } + caster->remove(this); - // success. - if (ret == ERROR_SUCCESS) { + if (err == srs_success) { srs_trace("client finished."); - } - - // client close peer. - if (ret == ERROR_SOCKET_CLOSED) { - srs_warn("client disconnect peer. ret=%d", ret); + } else if (srs_is_client_gracefully_close(srs_error_code(err))) { + srs_warn("client disconnect peer. code=%d", srs_error_code(err)); + srs_freep(err); + err = srs_success; } if (video_rtp) { @@ -425,9 +409,7 @@ int SrsRtspConn::cycle() caster->free_port(audio_rtp->port(), audio_rtp->port() + 1); } - caster->remove(this); - - return ERROR_SUCCESS; + return err; } int SrsRtspConn::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts) @@ -749,10 +731,15 @@ void SrsRtspCaster::free_port(int lpmin, int lpmax) int SrsRtspCaster::on_tcp_client(srs_netfd_t stfd) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsRtspConn* conn = new SrsRtspConn(this, stfd, output); - if ((ret = conn->serve()) != ERROR_SUCCESS) { + if ((err = conn->serve()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + srs_error("rtsp: serve client failed. ret=%d", ret); srs_freep(conn); return ret; diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index 7eef307f8d..7373fd8ad8 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -152,15 +152,15 @@ class SrsRtspConn : public ISrsCoroutineHandler SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o); virtual ~SrsRtspConn(); public: - virtual int serve(); + virtual srs_error_t serve(); private: - virtual int do_cycle(); + virtual srs_error_t do_cycle(); // internal methods public: virtual int on_rtp_packet(SrsRtpPacket* pkt, int stream_id); // interface ISrsOneCycleThreadHandler public: - virtual int cycle(); + virtual srs_error_t cycle(); private: virtual int on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts); virtual int on_rtp_audio(SrsRtpPacket* pkt, int64_t dts); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index c052fb990c..af1862d1bb 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -394,8 +394,10 @@ srs_error_t SrsSignalManager::initialize() return srs_success; } -int SrsSignalManager::start() +srs_error_t SrsSignalManager::start() { + srs_error_t err = srs_success; + /** * Note that if multiple processes are used (see below), * the signal pipe should be initialized after the fork(2) call @@ -427,14 +429,22 @@ int SrsSignalManager::start() srs_trace("signal installed, reload=%d, reopen=%d, grace_quit=%d", SRS_SIGNAL_RELOAD, SRS_SIGNAL_REOPEN_LOG, SRS_SIGNAL_GRACEFULLY_QUIT); - return trd->start(); + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "signal manager"); + } + + return err; } -int SrsSignalManager::cycle() +srs_error_t SrsSignalManager::cycle() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - while (!trd->pull()) { + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "signal manager"); + } + int signo; /* Read the next signal from the pipe */ @@ -444,7 +454,7 @@ int SrsSignalManager::cycle() server->on_signal(signo); } - return ret; + return err; } void SrsSignalManager::sig_catcher(int signo) @@ -692,7 +702,6 @@ srs_error_t SrsServer::acquire_pid_file() srs_error_t SrsServer::listen() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; if ((err = listen_rtmp()) != srs_success) { @@ -711,8 +720,8 @@ srs_error_t SrsServer::listen() return srs_error_wrap(err, "stream caster listen"); } - if ((ret = conn_manager->start()) != ERROR_SUCCESS) { - return srs_error_new(ret, "connection manager"); + if ((err = conn_manager->start()) != srs_success) { + return srs_error_wrap(err, "connection manager"); } return err; @@ -720,12 +729,13 @@ srs_error_t SrsServer::listen() srs_error_t SrsServer::register_signal() { - // start signal process thread. - int ret = signal_manager->start(); - if (ret != ERROR_SUCCESS) { - return srs_error_new(ret, "signal manager start"); + srs_error_t err = srs_success; + + if ((err = signal_manager->start()) != srs_success) { + return srs_error_wrap(err, "signal manager start"); } - return srs_success; + + return err; } srs_error_t SrsServer::http_handle() @@ -808,15 +818,15 @@ srs_error_t SrsServer::http_handle() srs_error_t SrsServer::ingest() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; #ifdef SRS_AUTO_INGEST - if ((ret = ingester->start()) != ERROR_SUCCESS) { - return srs_error_new(ret, "ingest start"); + if ((err = ingester->start()) != srs_success) { + return srs_error_wrap(err, "ingest start"); } #endif - return srs_success; + return err; } srs_error_t SrsServer::cycle() @@ -1191,6 +1201,7 @@ void SrsServer::resample_kbps() int SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsConnection* conn = fd2conn(type, stfd); if (conn == NULL) { @@ -1205,7 +1216,11 @@ int SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) // cycle will start process thread and when finished remove the client. // @remark never use the conn, for it maybe destroyed. - if ((ret = conn->start()) != ERROR_SUCCESS) { + if ((err = conn->start()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + return ret; } srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index d513595092..e775d7f6e0 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -195,10 +195,10 @@ class SrsSignalManager : public ISrsCoroutineHandler virtual ~SrsSignalManager(); public: virtual srs_error_t initialize(); - virtual int start(); + virtual srs_error_t start(); // interface ISrsEndlessThreadHandler. public: - virtual int cycle(); + virtual srs_error_t cycle(); private: // global singleton instance static SrsSignalManager* instance; diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index f92becc78b..823abee51e 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -56,9 +56,9 @@ SrsDummyCoroutine::~SrsDummyCoroutine() { } -int SrsDummyCoroutine::start() +srs_error_t SrsDummyCoroutine::start() { - return ERROR_THREAD_DUMMY; + return srs_error_new(ERROR_THREAD_DUMMY, "dummy coroutine"); } void SrsDummyCoroutine::stop() @@ -69,9 +69,9 @@ void SrsDummyCoroutine::interrupt() { } -int SrsDummyCoroutine::pull() +srs_error_t SrsDummyCoroutine::pull() { - return ERROR_THREAD_DUMMY; + return srs_error_new(ERROR_THREAD_DUMMY, "dummy pull"); } int SrsDummyCoroutine::cid() @@ -85,35 +85,44 @@ SrsSTCoroutine::SrsSTCoroutine(const string& n, ISrsCoroutineHandler* h, int cid handler = h; context = cid; trd = NULL; - err = ERROR_SUCCESS; + trd_err = srs_success; started = interrupted = disposed = false; } SrsSTCoroutine::~SrsSTCoroutine() { stop(); + + srs_freep(trd_err); } -int SrsSTCoroutine::start() +srs_error_t SrsSTCoroutine::start() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (started || disposed) { - ret = ERROR_THREAD_DISPOSED; - err = (err == ERROR_SUCCESS? ret:err); - srs_error("Thread.start: Failed, started=%d, disposed=%d, ret=%d", started, disposed, ret); - return ret; + err = srs_error_new(ERROR_THREAD_DISPOSED, + "failed for disposed=%d, started=%d", disposed, started); + + if (trd_err == srs_success) { + trd_err = srs_error_copy(err); + } + + return err; } if((trd = (srs_thread_t)st_thread_create(pfn, this, 1, 0)) == NULL){ - ret = ERROR_ST_CREATE_CYCLE_THREAD; - srs_error("Thread.start: Create thread failed. ret=%d", ret); - return ret; + err = srs_error_new(ERROR_ST_CREATE_CYCLE_THREAD, "create failed"); + + srs_freep(trd_err); + trd_err = srs_error_copy(err); + + return err; } started = true; - return ret; + return err; } void SrsSTCoroutine::stop() @@ -126,15 +135,19 @@ void SrsSTCoroutine::stop() interrupt(); void* res = NULL; - int ret = st_thread_join((st_thread_t)trd, &res); - srs_info("Thread.stop: Terminated, ret=%d, err=%d", ret, err); - srs_assert(!ret); + int r0 = st_thread_join((st_thread_t)trd, &res); + srs_assert(!r0); + + // Always override the error by the error from worker. + if ((srs_error_t)res != srs_success) { + srs_freep(trd_err); + trd_err = (srs_error_t)res; + return; + } - // Always override the error by the worker. - if (!res) { - err = (int)(uint64_t)res; - } else { - err = ERROR_THREAD_TERMINATED; + // If there's no error occur from worker, try to set to interrupted error. + if (trd_err == srs_success) { + trd_err = srs_error_new(ERROR_THREAD_TERMINATED, "terminated"); } return; @@ -147,14 +160,16 @@ void SrsSTCoroutine::interrupt() } interrupted = true; - srs_info("Thread.interrupt: Interrupt thread, err=%d", err); - err = (err == ERROR_SUCCESS? ERROR_THREAD_INTERRUPED:err); + if (trd_err == srs_success) { + trd_err = srs_error_new(ERROR_THREAD_INTERRUPED, "interrupted"); + } + st_thread_interrupt((st_thread_t)trd); } -int SrsSTCoroutine::pull() +srs_error_t SrsSTCoroutine::pull() { - return err; + return srs_error_copy(trd_err); } int SrsSTCoroutine::cid() @@ -162,7 +177,7 @@ int SrsSTCoroutine::cid() return context; } -int SrsSTCoroutine::cycle() +srs_error_t SrsSTCoroutine::cycle() { if (_srs_context) { if (context) { @@ -171,17 +186,19 @@ int SrsSTCoroutine::cycle() context = _srs_context->generate_id(); } } - srs_info("Thread.cycle: Start with cid=%d, err=%d", context, err); - int ret = handler->cycle(); - srs_info("Thread.cycle: Finished with ret=%d, err=%d", ret, err); - return ret; + srs_error_t err = handler->cycle(); + if (err != srs_success) { + return srs_error_wrap(err, "coroutine cycle"); + } + + return err; } void* SrsSTCoroutine::pfn(void* arg) { SrsSTCoroutine* p = (SrsSTCoroutine*)arg; - void*res = (void*)(uint64_t)p->cycle(); + void* res = (void*)p->cycle(); return res; } diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index 480349181b..b7d30022d5 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -38,7 +38,7 @@ * Thread do a job then terminated normally, it's a SrsOneCycleThread: * class SrsOneCycleThread : public ISrsCoroutineHandler { * public: SrsCoroutine trd; - * public: virtual int cycle() { + * public: virtual srs_error_t cycle() { * // Do something, then return this cycle and thread terminated normally. * } * }; @@ -46,7 +46,7 @@ * Thread has its inside loop, such as the RTMP receive thread: * class SrsReceiveThread : public ISrsCoroutineHandler { * public: SrsCoroutine trd; - * public: virtual int cycle() { + * public: virtual srs_error_t cycle() { * while (!trd.pull()) { // Check whether thread interrupted. * // Do something, such as st_read() packets, it'll be wakeup * // when user stop or interrupt the thread. @@ -64,7 +64,7 @@ class ISrsCoroutineHandler * Do the work. The ST-coroutine will terminated normally if it returned. * @remark If the cycle has its own loop, it must check the thread pull. */ - virtual int cycle() = 0; + virtual srs_error_t cycle() = 0; }; /** @@ -76,10 +76,10 @@ class SrsCoroutine SrsCoroutine(); virtual ~SrsCoroutine(); public: - virtual int start() = 0; + virtual srs_error_t start() = 0; virtual void stop() = 0; virtual void interrupt() = 0; - virtual int pull() = 0; + virtual srs_error_t pull() = 0; virtual int cid() = 0; }; @@ -93,10 +93,10 @@ class SrsDummyCoroutine : public SrsCoroutine SrsDummyCoroutine(); virtual ~SrsDummyCoroutine(); public: - virtual int start(); + virtual srs_error_t start(); virtual void stop(); virtual void interrupt(); - virtual int pull(); + virtual srs_error_t pull(); virtual int cid(); }; @@ -122,7 +122,7 @@ class SrsSTCoroutine : public SrsCoroutine private: srs_thread_t trd; int context; - int err; + srs_error_t trd_err; private: bool started; bool interrupted; @@ -137,7 +137,7 @@ class SrsSTCoroutine : public SrsCoroutine * Start the thread. * @remark Should never start it when stopped or terminated. */ - virtual int start(); + virtual srs_error_t start(); /** * Interrupt the thread then wait to terminated. * @remark If user want to notify thread to quit async, for example if there are @@ -158,13 +158,13 @@ class SrsSTCoroutine : public SrsCoroutine * @remark Return ERROR_THREAD_TERMINATED when thread terminated normally without error. * @remark Return ERROR_THREAD_INTERRUPED when thread is interrupted. */ - virtual int pull(); + virtual srs_error_t pull(); /** * Get the context id of thread. */ virtual int cid(); private: - virtual int cycle(); + virtual srs_error_t cycle(); static void* pfn(void* arg); }; diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index 1db6647074..3b595f4c15 100755 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -43,19 +43,31 @@ SrsCoroutineManager::~SrsCoroutineManager() clear(); } -int SrsCoroutineManager::start() +srs_error_t SrsCoroutineManager::start() { - return trd->start(); + srs_error_t err = srs_success; + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "coroutine manager"); + } + + return err; } -int SrsCoroutineManager::cycle() +srs_error_t SrsCoroutineManager::cycle() { - while (!trd->pull()) { + srs_error_t err = srs_success; + + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "coroutine mansger"); + } + srs_cond_wait(cond); clear(); } - return ERROR_SUCCESS; + return err; } void SrsCoroutineManager::remove(ISrsConnection* c) diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index 5458b7e0e8..f2f4684543 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -47,10 +47,10 @@ class SrsCoroutineManager : virtual public ISrsCoroutineHandler, virtual public SrsCoroutineManager(); virtual ~SrsCoroutineManager(); public: - int start(); + srs_error_t start(); // ISrsCoroutineHandler public: - virtual int cycle(); + virtual srs_error_t cycle(); // IConnectionManager public: virtual void remove(ISrsConnection* c); diff --git a/trunk/src/kernel/srs_kernel_error.cpp b/trunk/src/kernel/srs_kernel_error.cpp index b601c07f47..92e1cc53cc 100644 --- a/trunk/src/kernel/srs_kernel_error.cpp +++ b/trunk/src/kernel/srs_kernel_error.cpp @@ -41,8 +41,7 @@ bool srs_is_client_gracefully_close(int error_code) { return error_code == ERROR_SOCKET_READ || error_code == ERROR_SOCKET_READ_FULLY - || error_code == ERROR_SOCKET_WRITE - || error_code == ERROR_SOCKET_TIMEOUT; + || error_code == ERROR_SOCKET_WRITE; } SrsError::SrsError() @@ -137,6 +136,27 @@ SrsError* SrsError::success() { return NULL; } +SrsError* SrsError::copy(SrsError* from) +{ + if (from == srs_success) { + return srs_success; + } + + SrsError* err = new SrsError(); + + err->code = from->code; + err->wrapped = srs_error_copy(from->wrapped); + err->msg = from->msg; + err->func = from->func; + err->file = from->file; + err->line = from->line; + err->cid = from->cid; + err->rerrno = from->rerrno; + err->desc = from->desc; + + return err; +} + string SrsError::description(SrsError* err) { return err? err->description() : "Success"; diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 0fb06adbc3..f644a646ae 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -330,6 +330,7 @@ /** * whether the error code is an system control error. */ +// TODO: FIXME: Remove it from underlayer for confused with error and logger. extern bool srs_is_system_control_error(int error_code); extern bool srs_is_client_gracefully_close(int error_code); @@ -359,6 +360,7 @@ class SrsError static SrsError* create(const char* func, const char* file, int line, int code, const char* fmt, ...); static SrsError* wrap(const char* func, const char* file, int line, SrsError* err, const char* fmt, ...); static SrsError* success(); + static SrsError* copy(SrsError* from); static std::string description(SrsError* err); static int error_code(SrsError* err); }; @@ -367,6 +369,7 @@ class SrsError #define srs_success SrsError::success() #define srs_error_new(ret, fmt, ...) SrsError::create(__FUNCTION__, __FILE__, __LINE__, ret, fmt, ##__VA_ARGS__) #define srs_error_wrap(err, fmt, ...) SrsError::wrap(__FUNCTION__, __FILE__, __LINE__, err, fmt, ##__VA_ARGS__) +#define srs_error_copy(err) SrsError::copy(err) #define srs_error_desc(err) SrsError::description(err) #define srs_error_code(err) SrsError::error_code(err) diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index dbd2b4d9e7..9e4b77a31b 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -69,7 +69,7 @@ extern const char* _srs_version; */ srs_error_t do_main(int argc, char** argv) { - srs_error_t err= srs_success; + srs_error_t err = srs_success; // TODO: support both little and big endian. srs_assert(srs_is_little_endian()); @@ -445,7 +445,7 @@ srs_error_t run_master(SrsServer* svr) return srs_error_wrap(err, "acquire pid file"); } - if ((err = svr->listen()) != ERROR_SUCCESS) { + if ((err = svr->listen()) != srs_success) { return srs_error_wrap(err, "listen"); } diff --git a/trunk/src/service/srs_service_st.hpp b/trunk/src/service/srs_service_st.hpp index 6df2a48823..0ad8bc49a8 100644 --- a/trunk/src/service/srs_service_st.hpp +++ b/trunk/src/service/srs_service_st.hpp @@ -84,6 +84,27 @@ extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addr extern ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout); +/** + * The mutex locker. + */ +#define SrsLocker(instance) \ + impl__SrsLocker _srs_auto_free_##instance(&instance) + +class impl__SrsLocker +{ +private: + srs_mutex_t* lock; +public: + impl__SrsLocker(srs_mutex_t* l) : lock(l) { + int r0 = srs_mutex_lock(lock); + srs_assert(!r0); + } + virtual ~impl__SrsLocker() { + int r0 = srs_mutex_unlock(lock); + srs_assert(!r0); + } +}; + /** * the socket provides TCP socket over st, * that is, the sync socket mechanism.