Skip to content

Commit

Permalink
refine the pithy print of ingesters.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jun 6, 2015
1 parent e4c27a5 commit 2a1db36
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 24 deletions.
98 changes: 78 additions & 20 deletions trunk/src/app/srs_app_ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,63 @@ using namespace std;
// ingest never sleep a long time, for we must start the stream ASAP.
#define SRS_AUTO_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL)

SrsIngesterFFMPEG::SrsIngesterFFMPEG(SrsFFMPEG* _ffmpeg, string _vhost, string _id)
SrsIngesterFFMPEG::SrsIngesterFFMPEG()
{
ffmpeg = _ffmpeg;
vhost = _vhost;
id = _id;
ffmpeg = NULL;
}

SrsIngesterFFMPEG::~SrsIngesterFFMPEG()
{
srs_freep(ffmpeg);
}

int SrsIngesterFFMPEG::initialize(SrsFFMPEG* ff, string v, string i)
{
int ret = ERROR_SUCCESS;

ffmpeg = ff;
vhost = v;
id = i;
starttime = srs_get_system_time_ms();

return ret;
}

string SrsIngesterFFMPEG::uri()
{
return vhost + "/" + id;
}

int SrsIngesterFFMPEG::alive()
{
return (int)(srs_get_system_time_ms() - starttime);
}

bool SrsIngesterFFMPEG::equals(string v)
{
return vhost == v;
}

bool SrsIngesterFFMPEG::equals(string v, string i)
{
return vhost == v && id == i;
}

int SrsIngesterFFMPEG::start()
{
return ffmpeg->start();
}

void SrsIngesterFFMPEG::stop()
{
ffmpeg->stop();
}

int SrsIngesterFFMPEG::cycle()
{
return ffmpeg->cycle();
}

void SrsIngesterFFMPEG::fast_stop()
{
ffmpeg->fast_stop();
Expand Down Expand Up @@ -129,6 +174,8 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest

// get all engines.
std::vector<SrsConfDirective*> engines = _srs_config->get_transcode_engines(ingest);

// create ingesters without engines.
if (engines.empty()) {
SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, NULL)) != ERROR_SUCCESS) {
Expand All @@ -139,12 +186,17 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest
return ret;
}

SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(ffmpeg, vhost->arg0(), ingest->arg0());
SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG();
if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) {
srs_freep(ingester);
return ret;
}

ingesters.push_back(ingester);
return ret;
}

// create engine
// create ingesters with engine
for (int i = 0; i < (int)engines.size(); i++) {
SrsConfDirective* engine = engines[i];
SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
Expand All @@ -156,8 +208,13 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest
}
return ret;
}

SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG();
if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) {
srs_freep(ingester);
return ret;
}

SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(ffmpeg, vhost->arg0(), ingest->arg0());
ingesters.push_back(ingester);
}

Expand Down Expand Up @@ -196,13 +253,13 @@ int SrsIngester::cycle()
SrsIngesterFFMPEG* ingester = *it;

// start all ffmpegs.
if ((ret = ingester->ffmpeg->start()) != ERROR_SUCCESS) {
if ((ret = ingester->start()) != ERROR_SUCCESS) {
srs_error("ingest ffmpeg start failed. ret=%d", ret);
return ret;
}

// check ffmpeg status.
if ((ret = ingester->ffmpeg->cycle()) != ERROR_SUCCESS) {
if ((ret = ingester->cycle()) != ERROR_SUCCESS) {
srs_error("ingest ffmpeg cycle failed. ret=%d", ret);
return ret;
}
Expand Down Expand Up @@ -376,11 +433,14 @@ void SrsIngester::show_ingest_log_message()
return;
}

// random choose one ingester to report.
int index = rand() % (int)ingesters.size();
SrsIngesterFFMPEG* ingester = ingesters.at(index);

// reportable
if (pprint->can_print()) {
// TODO: FIXME: show more info.
srs_trace("-> "SRS_CONSTS_LOG_INGESTER
" time=%"PRId64", ingesters=%d", pprint->age(), (int)ingesters.size());
srs_trace("-> "SRS_CONSTS_LOG_INGESTER" time=%"PRId64", ingesters=%d, #%d(alive=%ds, %s)",
pprint->age(), (int)ingesters.size(), index, ingester->alive() / 1000, ingester->uri().c_str());
}
}

Expand All @@ -407,16 +467,15 @@ int SrsIngester::on_reload_vhost_removed(string vhost)
for (it = ingesters.begin(); it != ingesters.end();) {
SrsIngesterFFMPEG* ingester = *it;

if (ingester->vhost != vhost) {
if (!ingester->equals(vhost)) {
++it;
continue;
}

// stop the ffmpeg and free it.
ingester->ffmpeg->stop();
ingester->stop();

srs_trace("reload stop ingester, "
"vhost=%s, id=%s", vhost.c_str(), ingester->id.c_str());
srs_trace("reload stop ingester, vhost=%s, id=%s", vhost.c_str(), ingester->uri().c_str());

srs_freep(ingester);

Expand All @@ -436,16 +495,15 @@ int SrsIngester::on_reload_ingest_removed(string vhost, string ingest_id)
for (it = ingesters.begin(); it != ingesters.end();) {
SrsIngesterFFMPEG* ingester = *it;

if (ingester->vhost != vhost || ingester->id != ingest_id) {
if (!ingester->equals(vhost, ingest_id)) {
++it;
continue;
}

// stop the ffmpeg and free it.
ingester->ffmpeg->stop();
ingester->stop();

srs_trace("reload stop ingester, "
"vhost=%s, id=%s", vhost.c_str(), ingester->id.c_str());
srs_trace("reload stop ingester, vhost=%s, id=%s", vhost.c_str(), ingester->uri().c_str());

srs_freep(ingester);

Expand Down
20 changes: 16 additions & 4 deletions trunk/src/app/srs_app_ingest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,26 @@ class SrsPithyPrint;
*/
class SrsIngesterFFMPEG
{
public:
private:
std::string vhost;
std::string id;
SrsFFMPEG* ffmpeg;

SrsIngesterFFMPEG(SrsFFMPEG* _ffmpeg, std::string _vhost, std::string _id);
int64_t starttime;
public:
SrsIngesterFFMPEG();
virtual ~SrsIngesterFFMPEG();

public:
virtual int initialize(SrsFFMPEG* ff, std::string v, std::string i);
// the ingest uri, [vhost]/[ingest id]
virtual std::string uri();
// the alive in ms.
virtual int alive();
virtual bool equals(std::string v, std::string i);
virtual bool equals(std::string v);
public:
virtual int start();
virtual void stop();
virtual int cycle();
// @see SrsFFMPEG.fast_stop().
virtual void fast_stop();
};
Expand Down

0 comments on commit 2a1db36

Please sign in to comment.