Skip to content

Commit

Permalink
For #913, coroutine support complex error.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jun 11, 2017
1 parent 9ae5485 commit 9db2a04
Show file tree
Hide file tree
Showing 38 changed files with 614 additions and 408 deletions.
25 changes: 19 additions & 6 deletions trunk/src/app/srs_app_async_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,18 @@ int SrsAsyncCallWorker::count()
return (int)tasks.size();
}

int SrsAsyncCallWorker::start()
srs_error_t SrsAsyncCallWorker::start()
{
srs_error_t err = srs_success;

srs_freep(trd);
trd = new SrsSTCoroutine("async", this, _srs_context->get_id());
return trd->start();

if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "coroutine");
}

return err;
}

void SrsAsyncCallWorker::stop()
Expand All @@ -84,11 +91,15 @@ void SrsAsyncCallWorker::stop()
trd->stop();
}

int SrsAsyncCallWorker::cycle()
srs_error_t SrsAsyncCallWorker::cycle()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;

while (!trd->pull()) {
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "async call worker");
}

if (tasks.empty()) {
srs_cond_wait(wait);
}
Expand All @@ -99,14 +110,16 @@ int SrsAsyncCallWorker::cycle()
std::vector<ISrsAsyncCallTask*>::iterator it;
for (it = copy.begin(); it != copy.end(); ++it) {
ISrsAsyncCallTask* task = *it;

int ret = ERROR_SUCCESS;
if ((ret = task->call()) != ERROR_SUCCESS) {
srs_warn("ignore async callback %s, ret=%d", task->to_string().c_str(), ret);
}
srs_freep(task);
}
}

return ret;
return err;
}


4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_async_call.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ class SrsAsyncCallWorker : public ISrsCoroutineHandler
virtual int execute(ISrsAsyncCallTask* t);
virtual int count();
public:
virtual int start();
virtual srs_error_t start();
virtual void stop();
// interface ISrsReusableThreadHandler
public:
virtual int cycle();
virtual srs_error_t cycle();
};

#endif
Expand Down
13 changes: 11 additions & 2 deletions trunk/src/app/srs_app_caster_flv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ int SrsAppCasterFlv::initialize()
return ret;
}

if ((ret = manager->start()) != ERROR_SUCCESS) {
if ((err = manager->start()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);

return ret;
}

Expand All @@ -83,12 +87,17 @@ int SrsAppCasterFlv::initialize()
int SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;

string ip = srs_get_peer_ip(srs_netfd_fileno(stfd));
SrsHttpConn* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip);
conns.push_back(conn);

if ((ret = conn->start()) != ERROR_SUCCESS) {
if ((err = conn->start()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);

return ret;
}

Expand Down
35 changes: 18 additions & 17 deletions trunk/src/app/srs_app_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,42 +80,43 @@ void SrsConnection::dispose()
trd->interrupt();
}

int SrsConnection::start()
srs_error_t SrsConnection::start()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;

if ((ret = skt->initialize(stfd)) != ERROR_SUCCESS) {
return ret;
return srs_error_new(ret, "socket");
}

return trd->start();
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "coroutine");
}

return err;
}

int SrsConnection::cycle()
srs_error_t SrsConnection::cycle()
{
int ret = ERROR_SUCCESS;
int ret = do_cycle();

int oret = ret = do_cycle();

// if socket io error, set to closed.
if (srs_is_client_gracefully_close(ret)) {
ret = ERROR_SOCKET_CLOSED;
}
// Notify manager to remove it.
manager->remove(this);

// success.
if (ret == ERROR_SUCCESS) {
srs_trace("client finished.");
return srs_success;
}

// client close peer.
if (ret == ERROR_SOCKET_CLOSED) {
srs_warn("client disconnect peer. oret=%d, ret=%d", oret, ret);
// TODO: FIXME: Only reset the error when client closed it.
if (srs_is_client_gracefully_close(ret)) {
srs_warn("client disconnect peer. ret=%d", ret);
return srs_success;
}

// Notify manager to remove it.
manager->remove(this);

return ERROR_SUCCESS;
return srs_error_new(ret, "cycle");
}

int SrsConnection::srs_id()
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ class SrsConnection : virtual public ISrsConnection, virtual public ISrsCoroutin
* when client cycle thread stop, invoke the on_thread_stop(), which will use server
* to remove the client by server->remove(this).
*/
virtual int start();
virtual srs_error_t start();
// interface ISrsOneCycleThreadHandler
public:
/**
* the thread cycle function,
* when serve connection completed, terminate the loop which will terminate the thread,
* thread will invoke the on_thread_stop() when it terminated.
*/
virtual int cycle();
virtual srs_error_t cycle();
public:
/**
* get the srs id which identify the client.
Expand Down
5 changes: 2 additions & 3 deletions trunk/src/app/srs_app_dvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,6 @@ SrsDvrPlan::~SrsDvrPlan()

srs_error_t SrsDvrPlan::initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsRequest* r)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;

hub = h;
Expand All @@ -623,8 +622,8 @@ srs_error_t SrsDvrPlan::initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsReque
return srs_error_wrap(err, "segmenter");
}

if ((ret = async->start()) != ERROR_SUCCESS) {
return srs_error_new(ret, "async");
if ((err = async->start()) != srs_success) {
return srs_error_wrap(err, "async");
}

return err;
Expand Down
Loading

0 comments on commit 9db2a04

Please sign in to comment.