diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 70f38a348e..4b96485a49 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -128,12 +128,12 @@ srs_error_t SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa o = o.substr(0, o.length() - 4); } - int ret = conn->proxy(w, r, o); - if (ret != ERROR_SUCCESS) { - return srs_error_new(ret, "proxy"); + srs_error_t err = conn->proxy(w, r, o); + if (err != srs_success) { + return srs_error_wrap(err, "proxy"); } - return srs_success; + return err; } SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip) @@ -154,9 +154,9 @@ srs_error_t SrsDynamicHttpConn::on_got_http_message(ISrsHttpMessage* msg) return srs_success; } -int SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o) +srs_error_t SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; output = o; srs_trace("flv: proxy %s to %s", r->uri().c_str(), output.c_str()); @@ -168,36 +168,29 @@ int SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std SrsHttpFileReader reader(rr); SrsFlvDecoder dec; - if ((ret = dec.initialize(&reader)) != ERROR_SUCCESS) { - return ret; + if ((err = dec.initialize(&reader)) != srs_success) { + return srs_error_wrap(err, "init decoder"); } char header[9]; - if ((ret = dec.read_header(header)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("flv: proxy flv header failed. ret=%d", ret); - } - return ret; + if ((err = dec.read_header(header)) != srs_success) { + return srs_error_wrap(err, "read header"); } - srs_trace("flv: proxy drop flv header."); char pps[4]; - if ((ret = dec.read_previous_tag_size(pps)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("flv: proxy flv header pps failed. ret=%d", ret); - } - return ret; + if ((err = dec.read_previous_tag_size(pps)) != srs_success) { + return srs_error_wrap(err, "read pts"); } - ret = do_proxy(rr, &dec); + err = do_proxy(rr, &dec); sdk->close(); - return ret; + return err; } -int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) +srs_error_t SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; srs_freep(sdk); @@ -205,14 +198,12 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) int64_t sto = SRS_CONSTS_RTMP_PULSE_TMMS; sdk = new SrsSimpleRtmpClient(output, cto, sto); - if ((ret = sdk->connect()) != ERROR_SUCCESS) { - srs_error("flv: connect %s failed, cto=%" PRId64 ", sto=%" PRId64 ". ret=%d", output.c_str(), cto, sto, ret); - return ret; + if ((err = sdk->connect()) != srs_success) { + return srs_error_wrap(err, "connect %s failed, cto=%" PRId64 ", sto=%" PRId64, output.c_str(), cto, sto); } - if ((ret = sdk->publish()) != ERROR_SUCCESS) { - srs_error("flv: publish failed. ret=%d", ret); - return ret; + if ((err = sdk->publish()) != srs_success) { + return srs_error_wrap(err, "publish"); } char pps[4]; @@ -222,48 +213,36 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) char type; int32_t size; uint32_t time; - if ((ret = dec->read_tag_header(&type, &size, &time)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("flv: proxy tag header failed. ret=%d", ret); - } - return ret; + if ((err = dec->read_tag_header(&type, &size, &time)) != srs_success) { + return srs_error_wrap(err, "read tag header"); } char* data = new char[size]; - if ((ret = dec->read_tag_data(data, size)) != ERROR_SUCCESS) { + if ((err = dec->read_tag_data(data, size)) != srs_success) { srs_freepa(data); - if (!srs_is_client_gracefully_close(ret)) { - srs_error("flv: proxy tag data failed. ret=%d", ret); - } - return ret; + return srs_error_wrap(err, "read tag data"); } SrsSharedPtrMessage* msg = NULL; - if ((ret = srs_rtmp_create_msg(type, time, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) { - return ret; + if ((err = srs_rtmp_create_msg(type, time, data, size, sdk->sid(), &msg)) != srs_success) { + return srs_error_wrap(err, "create message"); } // TODO: FIXME: for post flv, reconnect when error. - if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("flv: proxy rtmp packet failed. ret=%d", ret); - } - return ret; + if ((err = sdk->send_and_free_message(msg)) != srs_success) { + return srs_error_wrap(err, "send message"); } if (pprint->can_print()) { srs_trace("flv: send msg %d age=%d, dts=%d, size=%d", type, pprint->age(), time, size); } - if ((ret = dec->read_previous_tag_size(pps)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("flv: proxy tag header pps failed. ret=%d", ret); - } - return ret; + if ((err = dec->read_previous_tag_size(pps)) != srs_success) { + return srs_error_wrap(err, "read pts"); } } - return ret; + return err; } SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h) @@ -275,9 +254,9 @@ SrsHttpFileReader::~SrsHttpFileReader() { } -int SrsHttpFileReader::open(std::string /*file*/) +srs_error_t SrsHttpFileReader::open(std::string /*file*/) { - return ERROR_SUCCESS; + return srs_success; } void SrsHttpFileReader::close() @@ -308,26 +287,23 @@ int64_t SrsHttpFileReader::filesize() return 0; } -int SrsHttpFileReader::read(void* buf, size_t count, ssize_t* pnread) +srs_error_t SrsHttpFileReader::read(void* buf, size_t count, ssize_t* pnread) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (http->eof()) { - ret = ERROR_HTTP_REQUEST_EOF; - srs_error("flv: encoder EOF. ret=%d", ret); - return ret; + return srs_error_new(ERROR_HTTP_REQUEST_EOF, "EOF"); } int total_read = 0; while (total_read < (int)count) { int nread = 0; - if ((ret = http->read((char*)buf + total_read, (int)(count - total_read), &nread)) != ERROR_SUCCESS) { - return ret; + if ((err = http->read((char*)buf + total_read, (int)(count - total_read), &nread)) != srs_success) { + return srs_error_wrap(err, "read"); } if (nread == 0) { - ret = ERROR_HTTP_REQUEST_EOF; - srs_warn("flv: encoder read EOF. ret=%d", ret); + err = srs_error_new(ERROR_HTTP_REQUEST_EOF, "EOF"); break; } @@ -339,13 +315,13 @@ int SrsHttpFileReader::read(void* buf, size_t count, ssize_t* pnread) *pnread = total_read; } - return ret; + return err; } -int SrsHttpFileReader::lseek(off_t offset, int whence, off_t* seeked) +srs_error_t SrsHttpFileReader::lseek(off_t offset, int whence, off_t* seeked) { // TODO: FIXME: Use HTTP range for seek. - return ERROR_SUCCESS; + return srs_success; } #endif diff --git a/trunk/src/app/srs_app_hds.cpp b/trunk/src/app/srs_app_hds.cpp index 24b1d71d17..fcdd15e682 100644 --- a/trunk/src/app/srs_app_hds.cpp +++ b/trunk/src/app/srs_app_hds.cpp @@ -58,11 +58,11 @@ char flv_header[] = {'F', 'L', 'V', string serialFlv(SrsSharedPtrMessage *msg) { - SrsBuffer *stream = new SrsBuffer; - int size = 15 + msg->size; char *byte = new char[size]; - stream->initialize(byte, size); + + SrsBuffer *stream = new SrsBuffer(byte, size); + SrsAutoFree(SrsBuffer, stream); // tag header long long dts = msg->timestamp; @@ -70,7 +70,7 @@ string serialFlv(SrsSharedPtrMessage *msg) stream->write_1bytes(type); stream->write_3bytes(msg->size); - stream->write_3bytes(dts); + stream->write_3bytes((int32_t)dts); stream->write_1bytes(dts >> 24 & 0xFF); stream->write_3bytes(0); stream->write_bytes(msg->payload, msg->size); @@ -81,7 +81,6 @@ string serialFlv(SrsSharedPtrMessage *msg) string ret(stream->data(), stream->size()); - delete stream; delete [] byte; return ret; @@ -128,7 +127,7 @@ class SrsHdsFragment /*! flush data to disk. */ - int flush() + srs_error_t flush() { string data; if (videoSh) { @@ -148,9 +147,8 @@ class SrsHdsFragment } char box_header[8]; - SrsBuffer ss; - ss.initialize(box_header, 8); - ss.write_4bytes(8 + data.size()); + SrsBuffer ss(box_header, 8); + ss.write_4bytes((int32_t)(8 + data.size())); ss.write_string("mdat"); data = string(ss.data(), ss.size()) + data; @@ -158,20 +156,18 @@ class SrsHdsFragment const char *file_path = path.c_str(); int fd = open(file_path, O_WRONLY | O_CREAT, S_IRWXU | S_IRGRP | S_IROTH); if (fd < 0) { - srs_error("open fragment file failed, path=%s", file_path); - return -1; + return srs_error_new(-1, "open fragment file failed, path=%s", file_path); } if (write(fd, data.data(), data.size()) != (int)data.size()) { - srs_error("write fragment file failed, path=", file_path); close(fd); - return -1; + return srs_error_new(-1, "write fragment file failed, path=", file_path); } close(fd); srs_trace("build fragment success=%s", file_path); - return ERROR_SUCCESS; + return srs_success; } /*! @@ -192,7 +188,7 @@ class SrsHdsFragment SrsSharedPtrMessage *last_msg = msgs.back(); last_msg_ts = last_msg->timestamp; - duration_ms = last_msg_ts - first_msg_ts; + duration_ms = (int)(last_msg_ts - first_msg_ts); } return duration_ms; @@ -276,17 +272,18 @@ SrsHds::~SrsHds() { } -int SrsHds::on_publish(SrsRequest *req) +srs_error_t SrsHds::on_publish(SrsRequest *req) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; + if (hds_enabled) { - return ret; + return err; } std::string vhost = req->vhost; if (!_srs_config->get_hds_enabled(vhost)) { hds_enabled = false; - return ret; + return err; } hds_enabled = true; @@ -295,12 +292,12 @@ int SrsHds::on_publish(SrsRequest *req) return flush_mainfest(); } -int SrsHds::on_unpublish() +srs_error_t SrsHds::on_unpublish() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!hds_enabled) { - return ret; + return err; } hds_enabled = false; @@ -321,15 +318,15 @@ int SrsHds::on_unpublish() srs_trace("HDS un-published"); - return ret; + return err; } -int SrsHds::on_video(SrsSharedPtrMessage* msg) +srs_error_t SrsHds::on_video(SrsSharedPtrMessage* msg) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!hds_enabled) { - return ret; + return err; } if (SrsFlvVideo::sh(msg->payload, msg->size)) { @@ -354,9 +351,8 @@ int SrsHds::on_video(SrsSharedPtrMessage* msg) double fragment_duration = _srs_config->get_hds_fragment(hds_req->vhost) * 1000; if (currentSegment->duration() >= fragment_duration) { // flush segment - if ((ret = currentSegment->flush()) != ERROR_SUCCESS) { - srs_error("flush segment failed."); - return ret; + if ((err = currentSegment->flush()) != srs_success) { + return srs_error_wrap(err, "flush segment"); } srs_trace("flush Segment success."); @@ -365,23 +361,22 @@ int SrsHds::on_video(SrsSharedPtrMessage* msg) adjust_windows(); // flush bootstrap - if ((ret = flush_bootstrap()) != ERROR_SUCCESS) { - srs_error("flush bootstrap failed."); - return ret; + if ((err = flush_bootstrap()) != srs_success) { + return srs_error_wrap(err, "flush bootstrap"); } srs_trace("flush BootStrap success."); } - return ret; + return err; } -int SrsHds::on_audio(SrsSharedPtrMessage* msg) +srs_error_t SrsHds::on_audio(SrsSharedPtrMessage* msg) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!hds_enabled) { - return ret; + return err; } if (SrsFlvAudio::sh(msg->payload, msg->size)) { @@ -406,9 +401,8 @@ int SrsHds::on_audio(SrsSharedPtrMessage* msg) double fragment_duration = _srs_config->get_hds_fragment(hds_req->vhost) * 1000; if (currentSegment->duration() >= fragment_duration) { // flush segment - if ((ret = currentSegment->flush()) != ERROR_SUCCESS) { - srs_error("flush segment failed."); - return ret; + if ((err = currentSegment->flush()) != srs_success) { + return srs_error_wrap(err, "flush segment"); } srs_info("flush Segment success."); @@ -419,20 +413,18 @@ int SrsHds::on_audio(SrsSharedPtrMessage* msg) adjust_windows(); // flush bootstrap - if ((ret = flush_bootstrap()) != ERROR_SUCCESS) { - srs_error("flush bootstrap failed."); - return ret; + if ((err = flush_bootstrap()) != srs_success) { + return srs_error_wrap(err, "flush bootstrap"); } srs_info("flush BootStrap success."); } - return ret; + return err; } -int SrsHds::flush_mainfest() +srs_error_t SrsHds::flush_mainfest() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; char buf[1024] = {0}; @@ -448,40 +440,30 @@ int SrsHds::flush_mainfest() string dir = _srs_config->get_hds_path(hds_req->vhost) + "/" + hds_req->app; if ((err = srs_create_dir_recursively(dir)) != srs_success) { - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - srs_error("hds create dir failed. ret=%d", ret); - return ret; + return srs_error_wrap(err, "hds create dir failed"); } string path = dir + "/" + hds_req->stream + ".f4m"; int fd = open(path.c_str(), O_WRONLY | O_CREAT, S_IRWXU | S_IRGRP | S_IROTH); if (fd < 0) { - srs_error("open manifest file failed, path=%s", path.c_str()); - ret = ERROR_HDS_OPEN_F4M_FAILED; - return ret; + return srs_error_new(ERROR_HDS_OPEN_F4M_FAILED, "open manifest file failed, path=%s", path.c_str()); } - int f4m_size = strlen(buf); + int f4m_size = (int)strlen(buf); if (write(fd, buf, f4m_size) != f4m_size) { - srs_error("write manifest file failed, path=", path.c_str()); close(fd); - ret = ERROR_HDS_WRITE_F4M_FAILED; - return ret; + return srs_error_new(ERROR_HDS_WRITE_F4M_FAILED, "write manifest file failed, path=", path.c_str()); } close(fd); srs_trace("build manifest success=%s", path.c_str()); - return ERROR_SUCCESS; + return err; } -int SrsHds::flush_bootstrap() +srs_error_t SrsHds::flush_bootstrap() { - int ret = ERROR_SUCCESS; - - SrsBuffer abst; + srs_error_t err = srs_success; int size = 1024*100; @@ -494,9 +476,7 @@ int SrsHds::flush_bootstrap() char *start_afrt = NULL; int size_afrt = 0; - if ((ret = abst.initialize(start_abst, size)) != ERROR_SUCCESS) { - return ret; - } + SrsBuffer abst(start_abst, size); // @see video_file_format_spec_v10_1 // page: 46 @@ -695,7 +675,7 @@ int SrsHds::flush_bootstrap() The number of items in this FragmentRunEntryTable. The minimum value is 1. */ - abst.write_4bytes(fragments.size()); + abst.write_4bytes((int32_t)fragments.size()); size_afrt += 4; list::iterator iter; @@ -715,22 +695,18 @@ int SrsHds::flush_bootstrap() int fd = open(path.c_str(), O_WRONLY | O_CREAT, S_IRWXU | S_IRGRP | S_IROTH); if (fd < 0) { - srs_error("open bootstrap file failed, path=%s", path.c_str()); - ret = ERROR_HDS_OPEN_BOOTSTRAP_FAILED; - return ret; + return srs_error_new(ERROR_HDS_OPEN_BOOTSTRAP_FAILED, "open bootstrap file failed, path=%s", path.c_str()); } if (write(fd, start_abst, size_abst) != size_abst) { - srs_error("write bootstrap file failed, path=", path.c_str()); close(fd); - ret = ERROR_HDS_WRITE_BOOTSTRAP_FAILED; - return ret; + return srs_error_new(ERROR_HDS_WRITE_BOOTSTRAP_FAILED, "write bootstrap file failed, path=", path.c_str()); } close(fd); srs_trace("build bootstrap success=%s", path.c_str()); - return ERROR_SUCCESS; + return err; } void SrsHds::adjust_windows() diff --git a/trunk/src/app/srs_app_hourglass.cpp b/trunk/src/app/srs_app_hourglass.cpp index 8a3f6913df..cfdaeda71e 100644 --- a/trunk/src/app/srs_app_hourglass.cpp +++ b/trunk/src/app/srs_app_hourglass.cpp @@ -48,24 +48,22 @@ SrsHourGlass::~SrsHourGlass() { } -int SrsHourGlass::tick(int type, int interval) +srs_error_t SrsHourGlass::tick(int type, int interval) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (resolution > 0 && (interval % resolution) != 0) { - ret = ERROR_SYSTEM_HOURGLASS_RESOLUTION; - srs_error("hourglass interval=%d invalid, resolution=%d. ret=%d", interval, resolution, ret); - return ret; + return srs_error_new(ERROR_SYSTEM_HOURGLASS_RESOLUTION, "hourglass interval=%d invalid, resolution=%d", interval, resolution); } ticks[type] = interval; - return ret; + return err; } -int SrsHourGlass::cycle() +srs_error_t SrsHourGlass::cycle() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; map::iterator it; for (it = ticks.begin(); it != ticks.end(); ++it) { @@ -73,8 +71,8 @@ int SrsHourGlass::cycle() int interval = it->second; if (interval == 0 || (total_elapse % interval) == 0) { - if ((ret = handler->notify(type, interval, total_elapse)) != ERROR_SUCCESS) { - return ret; + if ((err = handler->notify(type, interval, total_elapse)) != srs_success) { + return srs_error_wrap(err, "notify"); } } } @@ -82,5 +80,5 @@ int SrsHourGlass::cycle() total_elapse += resolution; srs_usleep(resolution * 1000); - return ret; + return err; } diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index bd8d67c6b9..36728b6ce4 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -152,32 +152,27 @@ string SrsKafkaPartition::hostport() return ep; } -int SrsKafkaPartition::connect() +srs_error_t SrsKafkaPartition::connect() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; if (transport) { - return ret; + return err; } transport = new SrsTcpClient(host, port, SRS_KAFKA_PRODUCER_TIMEOUT); kafka = new SrsKafkaClient(transport); if ((err = transport->connect()) != srs_success) { disconnect(); - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - srs_error("connect to %s partition=%d failed. ret=%d", hostport().c_str(), id, ret); - return ret; + return srs_error_wrap(err, "connect to %s partition=%d failed", hostport().c_str(), id); } srs_trace("connect at %s, partition=%d, broker=%d", hostport().c_str(), id, broker); - return ret; + return err; } -int SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc) +srs_error_t SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc) { return kafka->write_messages(topic, id, *pc); } @@ -202,12 +197,12 @@ SrsKafkaMessage::~SrsKafkaMessage() srs_error_t SrsKafkaMessage::call() { - int ret = producer->send(key, obj); + srs_error_t err = producer->send(key, obj); // the obj is manged by producer now. obj = NULL; - return srs_error_new(ret, "kafka send"); + return srs_error_wrap(err, "kafka send"); } string SrsKafkaMessage::to_string() @@ -281,9 +276,9 @@ bool SrsKafkaCache::fetch(int* pkey, SrsKafkaPartitionCache** ppc) return false; } -int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc) +srs_error_t SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // ensure the key exists. srs_assert (cache.find(key) != cache.end()); @@ -292,19 +287,17 @@ int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitio // we remember the messages we have written and clear it when completed. int nb_msgs = (int)pc->size(); if (pc->empty()) { - return ret; + return err; } // connect transport. - if ((ret = partition->connect()) != ERROR_SUCCESS) { - srs_error("kafka connect to partition failed. ret=%d", ret); - return ret; + if ((err = partition->connect()) != srs_success) { + return srs_error_wrap(err, "connect partition"); } // write the json objects. - if ((ret = partition->flush(pc)) != ERROR_SUCCESS) { - srs_error("kafka write messages failed. ret=%d", ret); - return ret; + if ((err = partition->flush(pc)) != srs_success) { + return srs_error_wrap(err, "flush partition"); } // free all wrote messages. @@ -320,7 +313,7 @@ int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitio pc->erase(pc->begin(), pc->begin() + nb_msgs); } - return ret; + return err; } ISrsKafkaCluster::ISrsKafkaCluster() @@ -432,16 +425,16 @@ void SrsKafkaProducer::stop() worker->stop(); } -int SrsKafkaProducer::send(int key, SrsJsonObject* obj) +srs_error_t SrsKafkaProducer::send(int key, SrsJsonObject* obj) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // cache the json object. cache->append(key, obj); // too few messages, ignore. if (cache->size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) { - return ret; + return err; } // too many messages, warn user. @@ -454,10 +447,10 @@ int SrsKafkaProducer::send(int key, SrsJsonObject* obj) // flush message when metadata is ok. if (metadata_ok) { - ret = flush(); + err = flush(); } - return ret; + return err; } srs_error_t SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) @@ -537,7 +530,6 @@ void SrsKafkaProducer::clear_metadata() srs_error_t SrsKafkaProducer::do_cycle() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; // ignore when disabled. @@ -546,28 +538,27 @@ srs_error_t SrsKafkaProducer::do_cycle() } // when kafka enabled, request metadata when startup. - if ((ret = request_metadata()) != ERROR_SUCCESS) { - return srs_error_new(ret, "request metadata"); + if ((err = request_metadata()) != srs_success) { + return srs_error_wrap(err, "request metadata"); } return err; } -int SrsKafkaProducer::request_metadata() +srs_error_t SrsKafkaProducer::request_metadata() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; // ignore when disabled. if (!enabled) { - return ret; + return err; } // select one broker to connect to. SrsConfDirective* brokers = _srs_config->get_kafka_brokers(); if (!brokers) { srs_warn("ignore for empty brokers."); - return ret; + return err; } std::string server; @@ -594,18 +585,13 @@ int SrsKafkaProducer::request_metadata() // reconnect to kafka server. if ((err = transport->connect()) != srs_success) { - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret); - return ret; + return srs_error_wrap(err, "connect %s:%d failed", server.c_str(), port); } // do fetch medata from broker. SrsKafkaTopicMetadataResponse* metadata = NULL; - if ((ret = kafka->fetch_metadata(topic, &metadata)) != ERROR_SUCCESS) { - srs_error("kafka fetch metadata failed. ret=%d", ret); - return ret; + if ((err = kafka->fetch_metadata(topic, &metadata)) != srs_success) { + return srs_error_wrap(err, "fetch metadata"); } SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata); @@ -615,7 +601,7 @@ int SrsKafkaProducer::request_metadata() SrsKafkaTopicMetadata* topic = metadata->metadatas.at(0); if (topic->metadatas.empty()) { srs_warn("topic %s metadata empty, retry.", topic->name.to_str().c_str()); - return ret; + return err; } } @@ -632,7 +618,7 @@ int SrsKafkaProducer::request_metadata() metadata_ok = true; - return ret; + return err; } void SrsKafkaProducer::refresh_metadata() @@ -644,9 +630,9 @@ void SrsKafkaProducer::refresh_metadata() srs_trace("kafka async refresh metadata in background"); } -int SrsKafkaProducer::flush() +srs_error_t SrsKafkaProducer::flush() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // flush all available partition caches. while (true) { @@ -661,13 +647,12 @@ int SrsKafkaProducer::flush() // flush specified partition. srs_assert(key >= 0 && pc); SrsKafkaPartition* partition = partitions.at(key % partitions.size()); - if ((ret = cache->flush(partition, key, pc)) != ERROR_SUCCESS) { - srs_error("flush partition failed. ret=%d", ret); - return ret; + if ((err = cache->flush(partition, key, pc)) != srs_success) { + return srs_error_wrap(err, "flush partition"); } } - return ret; + return err; } #endif diff --git a/trunk/src/app/srs_app_mpegts_udp.cpp b/trunk/src/app/srs_app_mpegts_udp.cpp index aa856430ed..532271f4c1 100644 --- a/trunk/src/app/srs_app_mpegts_udp.cpp +++ b/trunk/src/app/srs_app_mpegts_udp.cpp @@ -67,9 +67,9 @@ SrsMpegtsQueue::~SrsMpegtsQueue() msgs.clear(); } -int SrsMpegtsQueue::push(SrsSharedPtrMessage* msg) +srs_error_t SrsMpegtsQueue::push(SrsSharedPtrMessage* msg) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // TODO: FIXME: use right way. for (int i = 0; i < 10; i++) { @@ -83,7 +83,7 @@ int SrsMpegtsQueue::push(SrsSharedPtrMessage* msg) if (i >= 5) { srs_warn("mpegts: free the msg for dts exists, dts=%" PRId64, msg->timestamp); srs_freep(msg); - return ret; + return err; } } @@ -97,7 +97,7 @@ int SrsMpegtsQueue::push(SrsSharedPtrMessage* msg) msgs[msg->timestamp] = msg; - return ret; + return err; } SrsSharedPtrMessage* SrsMpegtsQueue::dequeue() @@ -128,7 +128,6 @@ SrsSharedPtrMessage* SrsMpegtsQueue::dequeue() SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c) { - stream = new SrsBuffer(); context = new SrsTsContext(); buffer = new SrsSimpleStream(); output = _srs_config->get_stream_caster_output(c); @@ -149,7 +148,6 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp() close(); srs_freep(buffer); - srs_freep(stream); srs_freep(context); srs_freep(avc); srs_freep(aac); @@ -165,19 +163,15 @@ srs_error_t SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb // append to buffer. buffer->append(buf, nb_buf); - srs_info("udp: got %s:%d packet %d/%d bytes", - peer_ip.c_str(), peer_port, nb_buf, buffer->length()); - - int ret = on_udp_bytes(peer_ip, peer_port, buf, nb_buf); - if (ret != ERROR_SUCCESS) { - return srs_error_new(ret, "process udp"); + srs_error_t err = on_udp_bytes(peer_ip, peer_port, buf, nb_buf); + if (err != srs_success) { + return srs_error_wrap(err, "process udp"); } - return srs_success; + return err; } -int SrsMpegtsOverUdp::on_udp_bytes(string host, int port, char* buf, int nb_buf) +srs_error_t SrsMpegtsOverUdp::on_udp_bytes(string host, int port, char* buf, int nb_buf) { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; // collect nMB data to parse in a time. @@ -186,25 +180,25 @@ int SrsMpegtsOverUdp::on_udp_bytes(string host, int port, char* buf, int nb_buf) // TODO: FIXME: remove the debug to file. #if 0 SrsFileWriter fw; - if ((ret = fw.open("latest.ts")) != ERROR_SUCCESS) { - return ret; + if ((err = fw.open("latest.ts")) != srs_success) { + return srs_error_wrap(err, "open file"); } - if ((ret = fw.write(buffer->bytes(), buffer->length(), NULL)) != ERROR_SUCCESS) { - return ret; + if ((err = fw.write(buffer->bytes(), buffer->length(), NULL)) != srs_success) { + return srs_error_wrap(err, "write data"); } fw.close(); #endif #if 0 SrsFileReader fr; - if ((ret = fr.open("latest.ts")) != ERROR_SUCCESS) { - return ret; + if ((err = fr.open("latest.ts")) != srs_success) { + return srs_error_wrap(err, "open file"); } buffer->erase(buffer->length()); int nb_fbuf = fr.filesize(); char* fbuf = new char[nb_fbuf]; SrsAutoFreeA(char, fbuf); - if ((ret = fr.read(fbuf, nb_fbuf, NULL)) != ERROR_SUCCESS) { - return ret; + if ((err = fr.read(fbuf, nb_fbuf, NULL)) != srs_success) { + return srs_error_wrap(err, "read data"); } fr.close(); buffer->append(fbuf, nb_fbuf); @@ -226,40 +220,35 @@ int SrsMpegtsOverUdp::on_udp_bytes(string host, int port, char* buf, int nb_buf) // drop ts packet when size not modulus by 188 if (buffer->length() < SRS_TS_PACKET_SIZE) { srs_warn("udp: wait %s:%d packet %d/%d bytes", host.c_str(), port, nb_buf, buffer->length()); - return ret; + return err; } // use stream to parse ts packet. int nb_packet = buffer->length() / SRS_TS_PACKET_SIZE; for (int i = 0; i < nb_packet; i++) { char* p = buffer->bytes() + (i * SRS_TS_PACKET_SIZE); - if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) { - return ret; - } + + SrsBuffer* stream = new SrsBuffer(p, SRS_TS_PACKET_SIZE); + SrsAutoFree(SrsBuffer, stream); // process each ts packet if ((err = context->decode(stream, this)) != srs_success) { - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret); + srs_warn("parse ts packet err=%s", srs_error_desc(err).c_str()); + srs_error_reset(err); continue; } - srs_info("mpegts: parse ts packet completed"); } - srs_info("mpegts: parse udp packet completed"); // erase consumed bytes if (nb_packet > 0) { buffer->erase(nb_packet * SRS_TS_PACKET_SIZE); } - return ret; + return err; } srs_error_t SrsMpegtsOverUdp::on_ts_message(SrsTsMessage* msg) { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; pprint->elapse(); @@ -326,20 +315,17 @@ srs_error_t SrsMpegtsOverUdp::on_ts_message(SrsTsMessage* msg) } // parse the stream. - SrsBuffer avs; - if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) { - return srs_error_new(ret, "ts: init av stream"); - } + SrsBuffer avs(msg->payload->bytes(), msg->payload->length()); // publish audio or video. if (msg->channel->stream == SrsTsStreamVideoH264) { - if ((ret = on_ts_video(msg, &avs)) != ERROR_SUCCESS) { - return srs_error_new(ret, "ts: consume video"); + if ((err = on_ts_video(msg, &avs)) != srs_success) { + return srs_error_wrap(err, "ts: consume video"); } } if (msg->channel->stream == SrsTsStreamAudioAAC) { - if ((ret = on_ts_audio(msg, &avs)) != ERROR_SUCCESS) { - return srs_error_new(ret, "ts: consume audio"); + if ((err = on_ts_audio(msg, &avs)) != srs_success) { + return srs_error_wrap(err, "ts: consume audio"); } } @@ -347,13 +333,13 @@ srs_error_t SrsMpegtsOverUdp::on_ts_message(SrsTsMessage* msg) return err; } -int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) +srs_error_t SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // ensure rtmp connected. - if ((ret = connect()) != ERROR_SUCCESS) { - return ret; + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); } // ts tbn to flv tbn. @@ -364,8 +350,8 @@ int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) while (!avs->empty()) { char* frame = NULL; int frame_size = 0; - if ((ret = avc->annexb_demux(avs, &frame, &frame_size)) != ERROR_SUCCESS) { - return ret; + if ((err = avc->annexb_demux(avs, &frame, &frame_size)) != srs_success) { + return srs_error_wrap(err, "demux annexb"); } // 5bits, 7.3.1 NAL unit syntax, @@ -381,8 +367,8 @@ int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) // for sps if (avc->is_sps(frame, frame_size)) { std::string sps; - if ((ret = avc->sps_demux(frame, frame_size, sps)) != ERROR_SUCCESS) { - return ret; + if ((err = avc->sps_demux(frame, frame_size, sps)) != srs_success) { + return srs_error_wrap(err, "demux sps"); } if (h264_sps == sps) { @@ -391,8 +377,8 @@ int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) h264_sps_changed = true; h264_sps = sps; - if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) { - return ret; + if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); } continue; } @@ -400,8 +386,8 @@ int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) // for pps if (avc->is_pps(frame, frame_size)) { std::string pps; - if ((ret = avc->pps_demux(frame, frame_size, pps)) != ERROR_SUCCESS) { - return ret; + if ((err = avc->pps_demux(frame, frame_size, pps)) != srs_success) { + return srs_error_wrap(err, "demux pps"); } if (h264_pps == pps) { @@ -410,8 +396,8 @@ int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) h264_pps_changed = true; h264_pps = pps; - if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) { - return ret; + if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); } continue; } @@ -419,30 +405,30 @@ int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) // ibp frame. // TODO: FIXME: we should group all frames to a rtmp/flv message from one ts message. srs_info("mpegts: demux avc ibp frame size=%d, dts=%d", frame_size, dts); - if ((ret = write_h264_ipb_frame(frame, frame_size, dts, pts)) != ERROR_SUCCESS) { - return ret; + if ((err = write_h264_ipb_frame(frame, frame_size, dts, pts)) != srs_success) { + return srs_error_wrap(err, "write frame"); } } - return ret; + return err; } -int SrsMpegtsOverUdp::write_h264_sps_pps(uint32_t dts, uint32_t pts) +srs_error_t SrsMpegtsOverUdp::write_h264_sps_pps(uint32_t dts, uint32_t pts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // TODO: FIMXE: there exists bug, see following comments. // when sps or pps changed, update the sequence header, // for the pps maybe not changed while sps changed. // so, we must check when each video ts message frame parsed. if (!h264_sps_changed || !h264_pps_changed) { - return ret; + return err; } // h264 raw to h264 packet. std::string sh; - if ((ret = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != ERROR_SUCCESS) { - return ret; + if ((err = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != srs_success) { + return srs_error_wrap(err, "mux sequence header"); } // h264 packet to flv packet. @@ -450,14 +436,14 @@ int SrsMpegtsOverUdp::write_h264_sps_pps(uint32_t dts, uint32_t pts) int8_t avc_packet_type = SrsVideoAvcFrameTraitSequenceHeader; char* flv = NULL; int nb_flv = 0; - if ((ret = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) { - return ret; + if ((err = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "avc to flv"); } // the timestamp in rtmp message header is dts. uint32_t timestamp = dts; - if ((ret = rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv)) != ERROR_SUCCESS) { - return ret; + if ((err = rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv)) != srs_success) { + return srs_error_wrap(err, "write packet"); } // reset sps and pps. @@ -465,17 +451,17 @@ int SrsMpegtsOverUdp::write_h264_sps_pps(uint32_t dts, uint32_t pts) h264_pps_changed = false; h264_sps_pps_sent = true; - return ret; + return err; } -int SrsMpegtsOverUdp::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) +srs_error_t SrsMpegtsOverUdp::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // when sps or pps not sent, ignore the packet. // @see https://github.com/ossrs/srs/issues/203 if (!h264_sps_pps_sent) { - return ERROR_H264_DROP_BEFORE_SPS_PPS; + return srs_error_new(ERROR_H264_DROP_BEFORE_SPS_PPS, "drop sps/pps"); } // 5bits, 7.3.1 NAL unit syntax, @@ -490,15 +476,15 @@ int SrsMpegtsOverUdp::write_h264_ipb_frame(char* frame, int frame_size, uint32_t } std::string ibp; - if ((ret = avc->mux_ipb_frame(frame, frame_size, ibp)) != ERROR_SUCCESS) { - return ret; + if ((err = avc->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { + return srs_error_wrap(err, "mux frame"); } int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU; char* flv = NULL; int nb_flv = 0; - if ((ret = avc->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) { - return ret; + if ((err = avc->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "mux avc to flv"); } // the timestamp in rtmp message header is dts. @@ -506,13 +492,13 @@ int SrsMpegtsOverUdp::write_h264_ipb_frame(char* frame, int frame_size, uint32_t return rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv); } -int SrsMpegtsOverUdp::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs) +srs_error_t SrsMpegtsOverUdp::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // ensure rtmp connected. - if ((ret = connect()) != ERROR_SUCCESS) { - return ret; + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); } // ts tbn to flv tbn. @@ -523,8 +509,8 @@ int SrsMpegtsOverUdp::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs) char* frame = NULL; int frame_size = 0; SrsRawAacStreamCodec codec; - if ((ret = aac->adts_demux(avs, &frame, &frame_size, codec)) != ERROR_SUCCESS) { - return ret; + if ((err = aac->adts_demux(avs, &frame, &frame_size, codec)) != srs_success) { + return srs_error_wrap(err, "demux adts"); } // ignore invalid frame, @@ -537,61 +523,59 @@ int SrsMpegtsOverUdp::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs) // generate sh. if (aac_specific_config.empty()) { std::string sh; - if ((ret = aac->mux_sequence_header(&codec, sh)) != ERROR_SUCCESS) { - return ret; + if ((err = aac->mux_sequence_header(&codec, sh)) != srs_success) { + return srs_error_wrap(err, "mux sequence header"); } aac_specific_config = sh; codec.aac_packet_type = 0; - if ((ret = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, dts)) != ERROR_SUCCESS) { - return ret; + if ((err = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, dts)) != srs_success) { + return srs_error_wrap(err, "write raw audio frame"); } } // audio raw data. codec.aac_packet_type = 1; - if ((ret = write_audio_raw_frame(frame, frame_size, &codec, dts)) != ERROR_SUCCESS) { - return ret; + if ((err = write_audio_raw_frame(frame, frame_size, &codec, dts)) != srs_success) { + return srs_error_wrap(err, "write audio raw frame"); } } - return ret; + return err; } -int SrsMpegtsOverUdp::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) +srs_error_t SrsMpegtsOverUdp::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; char* data = NULL; int size = 0; - if ((ret = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != ERROR_SUCCESS) { - return ret; + if ((err = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) { + return srs_error_wrap(err, "mux aac to flv"); } return rtmp_write_packet(SrsFrameTypeAudio, dts, data, size); } -int SrsMpegtsOverUdp::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) +srs_error_t SrsMpegtsOverUdp::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - if ((ret = connect()) != ERROR_SUCCESS) { - return ret; + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); } SrsSharedPtrMessage* msg = NULL; - if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) { - srs_error("mpegts: create shared ptr msg failed. ret=%d", ret); - return ret; + if ((err = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != srs_success) { + return srs_error_wrap(err, "create message"); } srs_assert(msg); // push msg to queue. - if ((ret = queue->push(msg)) != ERROR_SUCCESS) { - srs_error("mpegts: push msg to queue failed. ret=%d", ret); - return ret; + if ((err = queue->push(msg)) != srs_success) { + return srs_error_wrap(err, "push to queue"); } // for all ready msg, dequeue and send out. @@ -606,41 +590,39 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, uint32_t timestamp, char* dat } // send out encoded msg. - if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) { + if ((err = sdk->send_and_free_message(msg)) != srs_success) { close(); - return ret; + return srs_error_wrap(err, "send messages"); } } - return ret; + return err; } -int SrsMpegtsOverUdp::connect() +srs_error_t SrsMpegtsOverUdp::connect() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // Ignore when connected. if (sdk) { - return ret; + return err; } int64_t cto = SRS_CONSTS_RTMP_TMMS; int64_t sto = SRS_CONSTS_RTMP_PULSE_TMMS; sdk = new SrsSimpleRtmpClient(output, cto, sto); - if ((ret = sdk->connect()) != ERROR_SUCCESS) { + if ((err = sdk->connect()) != srs_success) { close(); - srs_error("mpegts: connect %s failed, cto=%" PRId64 ", sto=%" PRId64 ". ret=%d", output.c_str(), cto, sto, ret); - return ret; + return srs_error_wrap(err, "connect %s failed, cto=%" PRId64 ", sto=%" PRId64, output.c_str(), cto, sto); } - if ((ret = sdk->publish()) != ERROR_SUCCESS) { + if ((err = sdk->publish()) != srs_success) { close(); - srs_error("mpegts: publish failed. ret=%d", ret); - return ret; + return srs_error_wrap(err, "publish"); } - return ret; + return err; } void SrsMpegtsOverUdp::close() diff --git a/trunk/src/app/srs_app_mpegts_udp.hpp b/trunk/src/app/srs_app_mpegts_udp.hpp index 77901163f3..819e506318 100644 --- a/trunk/src/app/srs_app_mpegts_udp.hpp +++ b/trunk/src/app/srs_app_mpegts_udp.hpp @@ -77,7 +77,6 @@ class SrsMpegtsOverUdp : virtual public ISrsTsHandler , virtual public ISrsUdpHandler { private: - SrsBuffer* stream; SrsTsContext* context; SrsSimpleStream* buffer; std::string output; diff --git a/trunk/src/app/srs_app_ng_exec.cpp b/trunk/src/app/srs_app_ng_exec.cpp index 2e985fa236..b16f360f0a 100644 --- a/trunk/src/app/srs_app_ng_exec.cpp +++ b/trunk/src/app/srs_app_ng_exec.cpp @@ -50,29 +50,23 @@ SrsNgExec::~SrsNgExec() srs_freep(pprint); } -int SrsNgExec::on_publish(SrsRequest* req) +srs_error_t SrsNgExec::on_publish(SrsRequest* req) { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; // when publish, parse the exec_publish. - if ((ret = parse_exec_publish(req)) != ERROR_SUCCESS) { - return ret; + if ((err = parse_exec_publish(req)) != srs_success) { + return srs_error_wrap(err, "exec publish"); } // start thread to run all processes. srs_freep(trd); trd = new SrsSTCoroutine("encoder", this, _srs_context->get_id()); if ((err = trd->start()) != srs_success) { - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - - srs_error("st_thread_create failed. ret=%d", ret); - return ret; + return srs_error_wrap(err, "start thread"); } - return ret; + return err; } void SrsNgExec::on_unpublish() @@ -112,7 +106,6 @@ srs_error_t SrsNgExec::cycle() srs_error_t SrsNgExec::do_cycle() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; // ignore when no exec. @@ -125,13 +118,13 @@ srs_error_t SrsNgExec::do_cycle() SrsProcess* process = *it; // start all processes. - if ((ret = process->start()) != ERROR_SUCCESS) { - return srs_error_new(ret, "process start"); + if ((err = process->start()) != srs_success) { + return srs_error_wrap(err, "process start"); } // check process status. - if ((ret = process->cycle()) != ERROR_SUCCESS) { - return srs_error_new(ret, "process cycle"); + if ((err = process->cycle()) != srs_success) { + return srs_error_wrap(err, "process cycle"); } } @@ -141,13 +134,13 @@ srs_error_t SrsNgExec::do_cycle() return err; } -int SrsNgExec::parse_exec_publish(SrsRequest* req) +srs_error_t SrsNgExec::parse_exec_publish(SrsRequest* req) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!_srs_config->get_exec_enabled(req->vhost)) { srs_trace("ignore disabled exec for vhost=%s", req->vhost.c_str()); - return ret; + return err; } // stream name: vhost/app/stream for print @@ -181,16 +174,15 @@ int SrsNgExec::parse_exec_publish(SrsRequest* req) argv.push_back(parse(req, epa)); } - if ((ret = process->initialize(binary, argv)) != ERROR_SUCCESS) { + if ((err = process->initialize(binary, argv)) != srs_success) { srs_freep(process); - srs_error("initialize process failed, binary=%s, vhost=%s, ret=%d", binary.c_str(), req->vhost.c_str(), ret); - return ret; + return srs_error_wrap(err, "initialize process failed, binary=%s, vhost=%s", binary.c_str(), req->vhost.c_str()); } exec_publishs.push_back(process); } - return ret; + return err; } void SrsNgExec::clear_exec_publish() diff --git a/trunk/src/app/srs_app_process.cpp b/trunk/src/app/srs_app_process.cpp index c9e83ad8da..0a9cbae1ef 100644 --- a/trunk/src/app/srs_app_process.cpp +++ b/trunk/src/app/srs_app_process.cpp @@ -66,9 +66,9 @@ bool SrsProcess::started() return is_started; } -int SrsProcess::initialize(string binary, vector argv) +srs_error_t SrsProcess::initialize(string binary, vector argv) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; bin = binary; cli = ""; @@ -138,16 +138,16 @@ int SrsProcess::initialize(string binary, vector argv) actual_cli = srs_join_vector_string(params, " "); cli = srs_join_vector_string(argv, " "); - return ret; + return err; } -int srs_redirect_output(string from_file, int to_fd) +srs_error_t srs_redirect_output(string from_file, int to_fd) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // use default output. if (from_file.empty()) { - return ret; + return err; } // redirect the fd to file. @@ -156,28 +156,24 @@ int srs_redirect_output(string from_file, int to_fd) mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH; if ((fd = ::open(from_file.c_str(), flags, mode)) < 0) { - ret = ERROR_FORK_OPEN_LOG; - fprintf(stderr, "open process %d %s failed. ret=%d", to_fd, from_file.c_str(), ret); - exit(ret); + return srs_error_new(ERROR_FORK_OPEN_LOG, "open process %d %s failed", to_fd, from_file.c_str()); } if (dup2(fd, to_fd) < 0) { - ret = ERROR_FORK_DUP2_LOG; - srs_error("dup2 process %d failed. ret=%d", to_fd, ret); - exit(ret); + return srs_error_new(ERROR_FORK_DUP2_LOG, "dup2 process %d failed", to_fd); } ::close(fd); - return ret; + return err; } -int SrsProcess::start() +srs_error_t SrsProcess::start() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (is_started) { - return ret; + return err; } // generate the argv of process. @@ -189,9 +185,7 @@ int SrsProcess::start() // TODO: fork or vfork? if ((pid = fork()) < 0) { - ret = ERROR_ENCODER_FORK; - srs_error("vfork process failed, cli=%s. ret=%d", cli.c_str(), ret); - return ret; + return srs_error_new(ERROR_ENCODER_FORK, "vfork process failed, cli=%s", cli.c_str()); } // for osx(lldb) to debug the child process. @@ -209,14 +203,14 @@ int SrsProcess::start() // for the stdout, ignore when not specified. // redirect stdout to file if possible. - if ((ret = srs_redirect_output(stdout_file, STDOUT_FILENO)) != ERROR_SUCCESS) { - return ret; + if ((err = srs_redirect_output(stdout_file, STDOUT_FILENO)) != srs_success) { + return srs_error_wrap(err, "redirect output"); } // for the stderr, ignore when not specified. // redirect stderr to file if possible. - if ((ret = srs_redirect_output(stderr_file, STDERR_FILENO)) != ERROR_SUCCESS) { - return ret; + if ((err = srs_redirect_output(stderr_file, STDERR_FILENO)) != srs_success) { + return srs_error_wrap(err, "redirect output"); } // should never close the fd 3+, for it myabe used. @@ -242,11 +236,11 @@ int SrsProcess::start() argv[params.size()] = NULL; // use execv to start the program. - ret = execv(bin.c_str(), argv); - if (ret < 0) { + int r0 = execv(bin.c_str(), argv); + if (r0 < 0) { fprintf(stderr, "fork process failed, errno=%d(%s)", errno, strerror(errno)); } - exit(ret); + exit(r0); } // parent. @@ -254,43 +248,41 @@ int SrsProcess::start() is_started = true; srs_trace("fored process, pid=%d, bin=%s, stdout=%s, stderr=%s, argv=%s", pid, bin.c_str(), stdout_file.c_str(), stderr_file.c_str(), actual_cli.c_str()); - return ret; + return err; } - return ret; + return err; } -int SrsProcess::cycle() +srs_error_t SrsProcess::cycle() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!is_started) { - return ret; + return err; } // ffmpeg is prepare to stop, donot cycle. if (fast_stopped) { - return ret; + return err; } int status = 0; pid_t p = waitpid(pid, &status, WNOHANG); if (p < 0) { - ret = ERROR_SYSTEM_WAITPID; - srs_error("process waitpid failed, pid=%d, ret=%d", pid, ret); - return ret; + return srs_error_new(ERROR_SYSTEM_WAITPID, "process waitpid failed, pid=%d", pid); } if (p == 0) { srs_info("process process pid=%d is running.", pid); - return ret; + return err; } srs_trace("process pid=%d terminate, please restart it.", pid); is_started = false; - return ret; + return err; } void SrsProcess::stop() @@ -303,9 +295,10 @@ void SrsProcess::stop() // when rewind, upstream will stop publish(unpublish), // unpublish event will stop all ffmpeg encoders, // then publish will start all ffmpeg encoders. - int ret = srs_kill_forced(pid); - if (ret != ERROR_SUCCESS) { - srs_warn("ignore kill the process failed, pid=%d. ret=%d", pid, ret); + srs_error_t err = srs_kill_forced(pid); + if (err != srs_success) { + srs_warn("ignore kill the process failed, pid=%d. err=%s", pid, srs_error_desc(err).c_str()); + srs_freep(err); return; } diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index 1d45d902c3..e7376e53ba 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -78,21 +78,16 @@ srs_error_t SrsRtpConn::listen() srs_error_t SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; pprint->elapse(); if (true) { - SrsBuffer stream; - - if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { - return srs_error_new(ret, "stream"); - } + SrsBuffer stream(buf, nb_buf); SrsRtpPacket pkt; - if ((ret = pkt.decode(&stream)) != ERROR_SUCCESS) { - return srs_error_new(ret, "decode"); + if ((err = pkt.decode(&stream)) != srs_success) { + return srs_error_wrap(err, "decode"); } if (pkt.chunked) { @@ -125,8 +120,8 @@ srs_error_t SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) // always free it. SrsAutoFree(SrsRtpPacket, cache); - if ((ret = rtsp->on_rtp_packet(cache, stream_id)) != ERROR_SUCCESS) { - return srs_error_new(ret, "process rtp packet"); + if ((err = rtsp->on_rtp_packet(cache, stream_id)) != srs_success) { + return srs_error_wrap(err, "process rtp packet"); } return err; @@ -161,9 +156,9 @@ int64_t SrsRtspJitter::timestamp() return pts; } -int SrsRtspJitter::correct(int64_t& ts) +srs_error_t SrsRtspJitter::correct(int64_t& ts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (previous_timestamp == 0) { previous_timestamp = ts; @@ -179,7 +174,7 @@ int SrsRtspJitter::correct(int64_t& ts) ts = pts + delta; pts = ts; - return ret; + return err; } SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o) @@ -246,7 +241,6 @@ srs_error_t SrsRtspConn::serve() srs_error_t SrsRtspConn::do_cycle() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; // retrieve ip of client. @@ -260,8 +254,8 @@ srs_error_t SrsRtspConn::do_cycle() } SrsRtspRequest* req = NULL; - if ((ret = rtsp->recv_message(&req)) != ERROR_SUCCESS) { - return srs_error_new(ret, "recv message"); + if ((err = rtsp->recv_message(&req)) != srs_success) { + return srs_error_wrap(err, "recv message"); } SrsAutoFree(SrsRtspRequest, req); srs_info("rtsp: got rtsp request"); @@ -269,8 +263,8 @@ srs_error_t SrsRtspConn::do_cycle() if (req->is_options()) { SrsRtspOptionsResponse* res = new SrsRtspOptionsResponse((int)req->seq); res->session = session; - if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { - return srs_error_new(ret, "response option"); + if ((err = rtsp->send_message(res)) != srs_success) { + return srs_error_wrap(err, "response option"); } } else if (req->is_announce()) { if (rtsp_tcUrl.empty()) { @@ -300,14 +294,14 @@ srs_error_t SrsRtspConn::do_cycle() SrsRtspResponse* res = new SrsRtspResponse((int)req->seq); res->session = session; - if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { - return srs_error_new(ret, "response announce"); + if ((err = rtsp->send_message(res)) != srs_success) { + return srs_error_wrap(err, "response announce"); } } else if (req->is_setup()) { srs_assert(req->transport); int lpm = 0; - if ((ret = caster->alloc_port(&lpm)) != ERROR_SUCCESS) { - return srs_error_new(ret, "alloc port"); + if ((err = caster->alloc_port(&lpm)) != srs_success) { + return srs_error_wrap(err, "alloc port"); } SrsRtpConn* rtp = NULL; @@ -338,14 +332,14 @@ srs_error_t SrsRtspConn::do_cycle() res->local_port_min = lpm; res->local_port_max = lpm + 1; res->session = session; - if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { - return srs_error_new(ret, "response setup"); + if ((err = rtsp->send_message(res)) != srs_success) { + return srs_error_wrap(err, "response setup"); } } else if (req->is_record()) { SrsRtspResponse* res = new SrsRtspResponse((int)req->seq); res->session = session; - if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { - return srs_error_new(ret, "response record"); + if ((err = rtsp->send_message(res)) != srs_success) { + return srs_error_wrap(err, "response record"); } } } @@ -353,21 +347,20 @@ srs_error_t SrsRtspConn::do_cycle() return err; } -int SrsRtspConn::on_rtp_packet(SrsRtpPacket* pkt, int stream_id) +srs_error_t SrsRtspConn::on_rtp_packet(SrsRtpPacket* pkt, int stream_id) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // ensure rtmp connected. - if ((ret = connect()) != ERROR_SUCCESS) { - return ret; + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); } if (stream_id == video_id) { // rtsp tbn is ts tbn. int64_t pts = pkt->timestamp; - if ((ret = vjitter->correct(pts)) != ERROR_SUCCESS) { - srs_error("rtsp: correct by jitter failed. ret=%d", ret); - return ret; + if ((err = vjitter->correct(pts)) != srs_success) { + return srs_error_wrap(err, "jitter"); } // TODO: FIXME: set dts to pts, please finger out the right dts. @@ -377,15 +370,14 @@ int SrsRtspConn::on_rtp_packet(SrsRtpPacket* pkt, int stream_id) } else { // rtsp tbn is ts tbn. int64_t pts = pkt->timestamp; - if ((ret = ajitter->correct(pts)) != ERROR_SUCCESS) { - srs_error("rtsp: correct by jitter failed. ret=%d", ret); - return ret; + if ((err = ajitter->correct(pts)) != srs_success) { + return srs_error_wrap(err, "jitter"); } return on_rtp_audio(pkt, pts); } - return ret; + return err; } srs_error_t SrsRtspConn::cycle() @@ -414,31 +406,31 @@ srs_error_t SrsRtspConn::cycle() return err; } -int SrsRtspConn::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts) +srs_error_t SrsRtspConn::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - if ((ret = kickoff_audio_cache(pkt, dts)) != ERROR_SUCCESS) { - return ret; + if ((err = kickoff_audio_cache(pkt, dts)) != srs_success) { + return srs_error_wrap(err, "kickoff audio cache"); } char* bytes = pkt->payload->bytes(); int length = pkt->payload->length(); uint32_t fdts = (uint32_t)(dts / 90); uint32_t fpts = (uint32_t)(pts / 90); - if ((ret = write_h264_ipb_frame(bytes, length, fdts, fpts)) != ERROR_SUCCESS) { - return ret; + if ((err = write_h264_ipb_frame(bytes, length, fdts, fpts)) != srs_success) { + return srs_error_wrap(err, "write ibp frame"); } - return ret; + return err; } -int SrsRtspConn::on_rtp_audio(SrsRtpPacket* pkt, int64_t dts) +srs_error_t SrsRtspConn::on_rtp_audio(SrsRtpPacket* pkt, int64_t dts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - if ((ret = kickoff_audio_cache(pkt, dts)) != ERROR_SUCCESS) { - return ret; + if ((err = kickoff_audio_cache(pkt, dts)) != srs_success) { + return srs_error_wrap(err, "kickoff audio cache"); } // cache current audio to kickoff. @@ -449,16 +441,16 @@ int SrsRtspConn::on_rtp_audio(SrsRtpPacket* pkt, int64_t dts) pkt->audio = NULL; pkt->payload = NULL; - return ret; + return err; } -int SrsRtspConn::kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts) +srs_error_t SrsRtspConn::kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // nothing to kick off. if (!acache->payload) { - return ret; + return err; } if (dts - acache->dts > 0 && acache->audio->nb_samples > 0) { @@ -468,8 +460,8 @@ int SrsRtspConn::kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts) int nb_frame = acache->audio->samples[i].size; int64_t timestamp = (acache->dts + delta * i) / 90; acodec->aac_packet_type = 1; - if ((ret = write_audio_raw_frame(frame, nb_frame, acodec, (uint32_t)timestamp)) != ERROR_SUCCESS) { - return ret; + if ((err = write_audio_raw_frame(frame, nb_frame, acodec, (uint32_t)timestamp)) != srs_success) { + return srs_error_wrap(err, "write audio raw frame"); } } } @@ -478,20 +470,19 @@ int SrsRtspConn::kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts) srs_freep(acache->audio); srs_freep(acache->payload); - return ret; + return err; } -int SrsRtspConn::write_sequence_header() +srs_error_t SrsRtspConn::write_sequence_header() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; // use the current dts. int64_t dts = vjitter->timestamp() / 90; // send video sps/pps - if ((ret = write_h264_sps_pps((uint32_t)dts, (uint32_t)dts)) != ERROR_SUCCESS) { - return ret; + if ((err = write_h264_sps_pps((uint32_t)dts, (uint32_t)dts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); } // generate audio sh by audio specific config. @@ -502,10 +493,7 @@ int SrsRtspConn::write_sequence_header() SrsAutoFree(SrsFormat, format); if ((err = format->on_aac_sequence_header((char*)sh.c_str(), (int)sh.length())) != srs_success) { - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - return ret; + return srs_error_wrap(err, "on aac sequence header"); } SrsAudioCodecConfig* dec = format->acodec; @@ -535,22 +523,22 @@ int SrsRtspConn::write_sequence_header() break; }; - if ((ret = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), acodec, (uint32_t)dts)) != ERROR_SUCCESS) { - return ret; + if ((err = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), acodec, (uint32_t)dts)) != srs_success) { + return srs_error_wrap(err, "write audio raw frame"); } } - return ret; + return err; } -int SrsRtspConn::write_h264_sps_pps(uint32_t dts, uint32_t pts) +srs_error_t SrsRtspConn::write_h264_sps_pps(uint32_t dts, uint32_t pts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // h264 raw to h264 packet. std::string sh; - if ((ret = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != ERROR_SUCCESS) { - return ret; + if ((err = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != srs_success) { + return srs_error_wrap(err, "mux sequence header"); } // h264 packet to flv packet. @@ -558,22 +546,22 @@ int SrsRtspConn::write_h264_sps_pps(uint32_t dts, uint32_t pts) int8_t avc_packet_type = SrsVideoAvcFrameTraitSequenceHeader; char* flv = NULL; int nb_flv = 0; - if ((ret = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) { - return ret; + if ((err = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "mux avc to flv"); } // the timestamp in rtmp message header is dts. uint32_t timestamp = dts; - if ((ret = rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv)) != ERROR_SUCCESS) { - return ret; + if ((err = rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv)) != srs_success) { + return srs_error_wrap(err, "write packet"); } - return ret; + return err; } -int SrsRtspConn::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) +srs_error_t SrsRtspConn::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // 5bits, 7.3.1 NAL unit syntax, // ISO_IEC_14496-10-AVC-2003.pdf, page 44. @@ -587,15 +575,15 @@ int SrsRtspConn::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, } std::string ibp; - if ((ret = avc->mux_ipb_frame(frame, frame_size, ibp)) != ERROR_SUCCESS) { - return ret; + if ((err = avc->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { + return srs_error_wrap(err, "mux ibp frame"); } int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU; char* flv = NULL; int nb_flv = 0; - if ((ret = avc->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) { - return ret; + if ((err = avc->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "mux avc to flv"); } // the timestamp in rtmp message header is dts. @@ -603,51 +591,50 @@ int SrsRtspConn::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, return rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv); } -int SrsRtspConn::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) +srs_error_t SrsRtspConn::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; char* data = NULL; int size = 0; - if ((ret = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != ERROR_SUCCESS) { - return ret; + if ((err = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) { + return srs_error_wrap(err, "mux aac to flv"); } return rtmp_write_packet(SrsFrameTypeAudio, dts, data, size); } -int SrsRtspConn::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) +srs_error_t SrsRtspConn::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - if ((ret = connect()) != ERROR_SUCCESS) { - return ret; + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); } SrsSharedPtrMessage* msg = NULL; - if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) { - srs_error("rtsp: create shared ptr msg failed. ret=%d", ret); - return ret; + if ((err = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != srs_success) { + return srs_error_wrap(err, "create message"); } srs_assert(msg); // send out encoded msg. - if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) { + if ((err = sdk->send_and_free_message(msg)) != srs_success) { close(); - return ret; + return srs_error_wrap(err, "write message"); } - return ret; + return err; } -int SrsRtspConn::connect() +srs_error_t SrsRtspConn::connect() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // Ignore when connected. if (sdk) { - return ret; + return err; } // generate rtmp url to connect to. @@ -668,17 +655,15 @@ int SrsRtspConn::connect() int64_t sto = SRS_CONSTS_RTMP_PULSE_TMMS; sdk = new SrsSimpleRtmpClient(url, cto, sto); - if ((ret = sdk->connect()) != ERROR_SUCCESS) { + if ((err = sdk->connect()) != srs_success) { close(); - srs_error("rtsp: connect %s failed, cto=%" PRId64 ", sto=%" PRId64 ". ret=%d", url.c_str(), cto, sto, ret); - return ret; + return srs_error_wrap(err, "connect %s failed, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto); } // publish. - if ((ret = sdk->publish()) != ERROR_SUCCESS) { + if ((err = sdk->publish()) != srs_success) { close(); - srs_error("rtsp: publish %s failed. ret=%d", url.c_str(), ret); - return ret; + return srs_error_wrap(err, "publish %s failed", url.c_str()); } return write_sequence_header(); @@ -708,9 +693,9 @@ SrsRtspCaster::~SrsRtspCaster() used_ports.clear(); } -int SrsRtspCaster::alloc_port(int* pport) +srs_error_t SrsRtspCaster::alloc_port(int* pport) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // use a pair of port. for (int i = local_port_min; i < local_port_max - 1; i += 2) { @@ -723,7 +708,7 @@ int SrsRtspCaster::alloc_port(int* pport) } srs_info("rtsp: alloc port=%d-%d", *pport, *pport + 1); - return ret; + return err; } void SrsRtspCaster::free_port(int lpmin, int lpmax) diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index 5c123dda3e..aca5a2a56c 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -155,9 +155,9 @@ class SrsRtspConn : public ISrsCoroutineHandler virtual srs_error_t serve(); private: virtual srs_error_t do_cycle(); - // internal methods +// internal methods public: - virtual int on_rtp_packet(SrsRtpPacket* pkt, int stream_id); + virtual srs_error_t on_rtp_packet(SrsRtpPacket* pkt, int stream_id); // interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); diff --git a/trunk/src/app/srs_app_security.cpp b/trunk/src/app/srs_app_security.cpp index 352c7b1526..7be676dd84 100644 --- a/trunk/src/app/srs_app_security.cpp +++ b/trunk/src/app/srs_app_security.cpp @@ -36,35 +36,36 @@ SrsSecurity::~SrsSecurity() { } -int SrsSecurity::check(SrsRtmpConnType type, string ip, SrsRequest* req) +srs_error_t SrsSecurity::check(SrsRtmpConnType type, string ip, SrsRequest* req) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // allow all if security disabled. if (!_srs_config->get_security_enabled(req->vhost)) { - return ret; + return err; } // default to deny all when security enabled. - ret = ERROR_SYSTEM_SECURITY; + err = srs_error_new(ERROR_SYSTEM_SECURITY, "allowed"); // rules to apply SrsConfDirective* rules = _srs_config->get_security_rules(req->vhost); if (!rules) { - return ret; + return err; } // allow if matches allow strategy. if (allow_check(rules, type, ip) == ERROR_SYSTEM_SECURITY_ALLOW) { - ret = ERROR_SUCCESS; + srs_error_reset(err); } // deny if matches deny strategy. if (deny_check(rules, type, ip) == ERROR_SYSTEM_SECURITY_DENY) { - ret = ERROR_SYSTEM_SECURITY_DENY; + srs_error_reset(err); + return srs_error_new(ERROR_SYSTEM_SECURITY_DENY, "denied"); } - return ret; + return err; } int SrsSecurity::allow_check(SrsConfDirective* rules, SrsRtmpConnType type, std::string ip) diff --git a/trunk/src/app/srs_app_security.hpp b/trunk/src/app/srs_app_security.hpp index 249f87e421..a2e10382d1 100644 --- a/trunk/src/app/srs_app_security.hpp +++ b/trunk/src/app/srs_app_security.hpp @@ -54,12 +54,12 @@ class SrsSecurity * security check the allow, * @return, if allowed, ERROR_SYSTEM_SECURITY_ALLOW. */ - virtual srs_error_t allow_check(SrsConfDirective* rules, SrsRtmpConnType type, std::string ip); + virtual int allow_check(SrsConfDirective* rules, SrsRtmpConnType type, std::string ip); /** * security check the deny, * @return, if denied, ERROR_SYSTEM_SECURITY_DENY. */ - virtual srs_error_t deny_check(SrsConfDirective* rules, SrsRtmpConnType type, std::string ip); + virtual int deny_check(SrsConfDirective* rules, SrsRtmpConnType type, std::string ip); }; #endif diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index acc901124c..ca5867a9f3 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -58,9 +58,9 @@ SrsStatisticVhost::~SrsStatisticVhost() srs_freep(kbps); } -int SrsStatisticVhost::dumps(SrsJsonObject* obj) +srs_error_t SrsStatisticVhost::dumps(SrsJsonObject* obj) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // dumps the config of vhost. bool hls_enabled = _srs_config->get_hls_enabled(vhost); @@ -88,7 +88,7 @@ int SrsStatisticVhost::dumps(SrsJsonObject* obj) hls->set("fragment", SrsJsonAny::number(_srs_config->get_hls_fragment(vhost))); } - return ret; + return err; } SrsStatisticStream::SrsStatisticStream() @@ -123,9 +123,9 @@ SrsStatisticStream::~SrsStatisticStream() srs_freep(kbps); } -int SrsStatisticStream::dumps(SrsJsonObject* obj) +srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; obj->set("id", SrsJsonAny::integer(id)); obj->set("name", SrsJsonAny::str(stream.c_str())); @@ -174,7 +174,7 @@ int SrsStatisticStream::dumps(SrsJsonObject* obj) audio->set("profile", SrsJsonAny::str(srs_aac_object2str(aac_object).c_str())); } - return ret; + return err; } void SrsStatisticStream::publish(int cid) @@ -208,9 +208,9 @@ SrsStatisticClient::~SrsStatisticClient() { } -int SrsStatisticClient::dumps(SrsJsonObject* obj) +srs_error_t SrsStatisticClient::dumps(SrsJsonObject* obj) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; obj->set("id", SrsJsonAny::integer(id)); obj->set("vhost", SrsJsonAny::integer(stream->vhost->id)); @@ -224,7 +224,7 @@ int SrsStatisticClient::dumps(SrsJsonObject* obj) obj->set("publish", SrsJsonAny::boolean(srs_client_type_is_publish(type))); obj->set("alive", SrsJsonAny::number((srs_get_system_time_ms() - create) / 1000.0)); - return ret; + return err; } SrsStatistic* SrsStatistic::_instance = new SrsStatistic(); @@ -310,9 +310,9 @@ SrsStatisticClient* SrsStatistic::find_client(int cid) return NULL; } -int SrsStatistic::on_video_info(SrsRequest* req, SrsVideoCodecId vcodec, SrsAvcProfile avc_profile, SrsAvcLevel avc_level, int width, int height) +srs_error_t SrsStatistic::on_video_info(SrsRequest* req, SrsVideoCodecId vcodec, SrsAvcProfile avc_profile, SrsAvcLevel avc_level, int width, int height) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsStatisticVhost* vhost = create_vhost(req); SrsStatisticStream* stream = create_stream(vhost, req); @@ -325,12 +325,12 @@ int SrsStatistic::on_video_info(SrsRequest* req, SrsVideoCodecId vcodec, SrsAvcP stream->width = width; stream->height = height; - return ret; + return err; } -int SrsStatistic::on_audio_info(SrsRequest* req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object) +srs_error_t SrsStatistic::on_audio_info(SrsRequest* req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsStatisticVhost* vhost = create_vhost(req); SrsStatisticStream* stream = create_stream(vhost, req); @@ -341,19 +341,19 @@ int SrsStatistic::on_audio_info(SrsRequest* req, SrsAudioCodecId acodec, SrsAudi stream->asound_type = asound_type; stream->aac_object = aac_object; - return ret; + return err; } -int SrsStatistic::on_video_frames(SrsRequest* req, int nb_frames) +srs_error_t SrsStatistic::on_video_frames(SrsRequest* req, int nb_frames) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsStatisticVhost* vhost = create_vhost(req); SrsStatisticStream* stream = create_stream(vhost, req); stream->nb_frames += nb_frames; - return ret; + return err; } void SrsStatistic::on_stream_publish(SrsRequest* req, int cid) @@ -387,9 +387,9 @@ void SrsStatistic::on_stream_close(SrsRequest* req) } } -int SrsStatistic::on_client(int id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type) +srs_error_t SrsStatistic::on_client(int id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsStatisticVhost* vhost = create_vhost(req); SrsStatisticStream* stream = create_stream(vhost, req); @@ -412,7 +412,7 @@ int SrsStatistic::on_client(int id, SrsRequest* req, SrsConnection* conn, SrsRtm stream->nb_clients++; vhost->nb_clients++; - return ret; + return err; } void SrsStatistic::on_disconnect(int id) @@ -481,9 +481,9 @@ int64_t SrsStatistic::server_id() return _server_id; } -int SrsStatistic::dumps_vhosts(SrsJsonArray* arr) +srs_error_t SrsStatistic::dumps_vhosts(SrsJsonArray* arr) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; std::map::iterator it; for (it = vhosts.begin(); it != vhosts.end(); it++) { @@ -492,17 +492,17 @@ int SrsStatistic::dumps_vhosts(SrsJsonArray* arr) SrsJsonObject* obj = SrsJsonAny::object(); arr->append(obj); - if ((ret = vhost->dumps(obj)) != ERROR_SUCCESS) { - return ret; + if ((err = vhost->dumps(obj)) != srs_success) { + return srs_error_wrap(err, "dump vhost"); } } - return ret; + return err; } -int SrsStatistic::dumps_streams(SrsJsonArray* arr) +srs_error_t SrsStatistic::dumps_streams(SrsJsonArray* arr) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; std::map::iterator it; for (it = streams.begin(); it != streams.end(); it++) { @@ -511,17 +511,17 @@ int SrsStatistic::dumps_streams(SrsJsonArray* arr) SrsJsonObject* obj = SrsJsonAny::object(); arr->append(obj); - if ((ret = stream->dumps(obj)) != ERROR_SUCCESS) { - return ret; + if ((err = stream->dumps(obj)) != srs_success) { + return srs_error_wrap(err, "dump stream"); } } - return ret; + return err; } -int SrsStatistic::dumps_clients(SrsJsonArray* arr, int start, int count) +srs_error_t SrsStatistic::dumps_clients(SrsJsonArray* arr, int start, int count) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; std::map::iterator it = clients.begin(); for (int i = 0; i < start + count && it != clients.end(); it++, i++) { @@ -534,12 +534,12 @@ int SrsStatistic::dumps_clients(SrsJsonArray* arr, int start, int count) SrsJsonObject* obj = SrsJsonAny::object(); arr->append(obj); - if ((ret = client->dumps(obj)) != ERROR_SUCCESS) { - return ret; + if ((err = client->dumps(obj)) != srs_success) { + return srs_error_wrap(err, "dump client"); } } - return ret; + return err; } SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req)