diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index b6f7fa47ba4..d497d2cad44 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -40,7 +40,7 @@ using namespace std; #include #include -SrsBufferCache::SrsBufferCache(SrsRequest* r) +SrsBufferCache::SrsBufferCache(SrsServer* s, SrsRequest* r) { req = r->copy()->as_http(); queue = new SrsMessageQueue(true); @@ -48,6 +48,7 @@ SrsBufferCache::SrsBufferCache(SrsRequest* r) // TODO: FIXME: support reload. fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); + server_ = s; } SrsBufferCache::~SrsBufferCache() @@ -122,10 +123,11 @@ srs_error_t SrsBufferCache::cycle() return err; } - SrsSharedPtr live_source = _srs_sources->fetch(req); - if (!live_source.get()) { - return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str()); + SrsSharedPtr live_source; + if ((err = _srs_sources->fetch_or_create(req, server_, live_source)) != srs_success) { + return srs_error_wrap(err, "source create"); } + srs_assert(live_source.get() != NULL); // the stream cache will create consumer to cache stream, // which will trigger to fetch stream from origin for edge. @@ -578,12 +580,13 @@ srs_error_t SrsBufferWriter::writev(const iovec* iov, int iovcnt, ssize_t* pnwri return writer->writev(iov, iovcnt, pnwrite); } -SrsLiveStream::SrsLiveStream(SrsRequest* r, SrsBufferCache* c) +SrsLiveStream::SrsLiveStream(SrsServer* s, SrsRequest* r, SrsBufferCache* c) { cache = c; req = r->copy()->as_http(); security_ = new SrsSecurity(); alive_viewers_ = 0; + server_ = s; } SrsLiveStream::~SrsLiveStream() @@ -692,10 +695,16 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess // Enter chunked mode, because we didn't set the content-length. w->write_header(SRS_CONSTS_HTTP_OK); - SrsSharedPtr live_source = _srs_sources->fetch(req); - if (!live_source.get()) { - return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str()); + SrsSharedPtr live_source; + if ((err = _srs_sources->fetch_or_create(req, server_, live_source)) != srs_success) { + return srs_error_wrap(err, "source create"); } + srs_assert(live_source.get() != NULL); + + bool enabled_cache = _srs_config->get_gop_cache(req->vhost); + int gcmf = _srs_config->get_gop_cache_max_frames(req->vhost); + live_source->set_cache(enabled_cache); + live_source->set_gop_cache_max_frames(gcmf); // create consumer of souce, ignore gop cache, use the audio gop cache. SrsLiveConsumer* consumer_raw = NULL; @@ -904,6 +913,7 @@ srs_error_t SrsLiveStream::streaming_send_messages(ISrsBufferEncoder* enc, SrsSh SrsLiveEntry::SrsLiveEntry(std::string m) { mount = m; + disposing = false; stream = NULL; cache = NULL; @@ -1015,8 +1025,8 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r) entry = new SrsLiveEntry(mount); entry->req = r->copy()->as_http(); - entry->cache = new SrsBufferCache(r); - entry->stream = new SrsLiveStream(r, entry->cache); + entry->cache = new SrsBufferCache(server, r); + entry->stream = new SrsLiveStream(server, r, entry->cache); // TODO: FIXME: maybe refine the logic of http remux service. // if user push streams followed: @@ -1045,6 +1055,12 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r) } else { // The entry exists, we reuse it and update the request of stream and cache. entry = streamHandlers[sid]; + + // Fail if system is disposing the entry. + if (entry->disposing) { + return srs_error_new(ERROR_NO_SOURCE, "stream is disposing"); + } + entry->stream->update_auth(r); entry->cache->update_auth(r); } @@ -1068,7 +1084,7 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r) // Free all HTTP resources. SrsUniquePtr entry(it->second); - streamHandlers.erase(it); + entry->disposing = true; SrsUniquePtr stream(entry->stream); SrsUniquePtr cache(entry->cache); @@ -1086,6 +1102,9 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r) srs_usleep(100 * SRS_UTIME_MILLISECONDS); } + // Remove the entry from handlers. + streamHandlers.erase(it); + // Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and // stream stopped for it uses it. mux.unhandle(entry->mount, stream.get()); @@ -1187,17 +1206,6 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle } } - SrsSharedPtr live_source; - if ((err = _srs_sources->fetch_or_create(r.get(), server, live_source)) != srs_success) { - return srs_error_wrap(err, "source create"); - } - srs_assert(live_source.get() != NULL); - - bool enabled_cache = _srs_config->get_gop_cache(r->vhost); - int gcmf = _srs_config->get_gop_cache_max_frames(r->vhost); - live_source->set_cache(enabled_cache); - live_source->set_gop_cache_max_frames(gcmf); - // create http streaming handler. if ((err = http_mount(r.get())) != srs_success) { return srs_error_wrap(err, "http mount"); @@ -1208,11 +1216,8 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle entry = streamHandlers[sid]; *ph = entry->stream; } - - // trigger edge to fetch from origin. - bool vhost_is_edge = _srs_config->get_vhost_is_edge(r->vhost); - srs_trace("flv: source url=%s, is_edge=%d, source_id=%s/%s", - r->get_stream_url().c_str(), vhost_is_edge, live_source->source_id().c_str(), live_source->pre_source_id().c_str()); + + srs_trace("flv: hijack %s ok", upath.c_str()); return err; } diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index 2c557972ba6..e2fb9505df5 100755 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -21,12 +21,13 @@ class SrsBufferCache : public ISrsCoroutineHandler { private: srs_utime_t fast_cache; + SrsServer* server_; private: SrsMessageQueue* queue; SrsRequest* req; SrsCoroutine* trd; public: - SrsBufferCache(SrsRequest* r); + SrsBufferCache(SrsServer* s, SrsRequest* r); virtual ~SrsBufferCache(); virtual srs_error_t update_auth(SrsRequest* r); public: @@ -182,12 +183,13 @@ class SrsLiveStream : public ISrsHttpHandler SrsRequest* req; SrsBufferCache* cache; SrsSecurity* security_; + SrsServer* server_; // For multiple viewers, which means there will more than one alive viewers for a live stream, so we must // use an int value to represent if there is any viewer is alive. We should never do cleanup unless all // viewers closed the connection. int alive_viewers_; public: - SrsLiveStream(SrsRequest* r, SrsBufferCache* c); + SrsLiveStream(SrsServer* s, SrsRequest* r, SrsBufferCache* c); virtual ~SrsLiveStream(); virtual srs_error_t update_auth(SrsRequest* r); public: @@ -218,6 +220,9 @@ struct SrsLiveEntry SrsLiveStream* stream; SrsBufferCache* cache; + + // Whether is disposing the entry. + bool disposing; SrsLiveEntry(std::string m); virtual ~SrsLiveEntry(); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 485f40d5957..956f7b2e3e2 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1909,9 +1909,6 @@ SrsLiveSource::SrsLiveSource() SrsLiveSource::~SrsLiveSource() { - // Because source maybe created by http stream, so we must notify the handler to cleanup. - handler->on_unpublish(req); - _srs_config->unsubscribe(this); // never free the consumers,