diff --git a/trunk/conf/forward.master.conf b/trunk/conf/forward.master.conf index 630a4c84eaa..f1ccb242613 100644 --- a/trunk/conf/forward.master.conf +++ b/trunk/conf/forward.master.conf @@ -11,5 +11,6 @@ vhost __defaultVhost__ { forward { enabled on; destination 127.0.0.1:19350; + backend http://127.0.0.1:8085/api/v1/forward; } } diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index e50af7c46ea..e28dd7d9d6c 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -805,6 +805,173 @@ def POST(self): def OPTIONS(self, *args, **kwargs): enable_crossdomain() +''' +handle the forward requests: dynamic forward url. +''' +class RESTForward(object): + exposed = True + + def __init__(self): + self.__forwards = [] + + self.__forwards.append({ + "vhost":"ossrs.net", + "app":"live", + "stream":"livestream", + "url":"push.ossrs.com", + }) + + self.__forwards.append({ + "app":"live", + "stream":"livestream", + "url":"push.ossrs.com", + }) + + self.__forwards.append({ + "app":"live", + "stream":"livestream", + "url":"rtmp://push.ossrs.com/test/teststream?auth_token=123456", + }) + + def GET(self): + enable_crossdomain() + + forwards = {} + return json.dumps(forwards) + + ''' + for SRS hook: on_forward + on_forward: + when srs reap a dvr file, call the hook, + the request in the POST data string is a object encode by json: + { + "action": "on_forward", + "server_id": "server_test", + "client_id": 1985, + "ip": "192.168.1.10", + "vhost": "video.test.com", + "app": "live", + "tcUrl": "rtmp://video.test.com/live?key=d2fa801d08e3f90ed1e1670e6e52651a", + "stream": "livestream", + "param":"?token=xxx&salt=yyy" + } + if valid, the hook must return HTTP code 200(Stauts OK) and response + an int value specifies the error code(0 corresponding to success): + 0 + ''' + def POST(self): + enable_crossdomain() + + # return the error code in str + code = Error.success + + req = cherrypy.request.body.read() + trace("post to forwards, req=%s"%(req)) + try: + json_req = json.loads(req) + except Exception, ex: + code = Error.system_parse_json + trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) + return json.dumps({"code": int(code), "data": None}) + + action = json_req["action"] + if action == "on_forward": + return self.__on_forward(json_req) + else: + trace("invalid request action: %s"%(json_req["action"])) + code = Error.request_invalid_action + + return json.dumps({"code": int(code), "data": None}) + + def OPTIONS(self, *args, **kwargs): + enable_crossdomain() + + def __on_forward(self, req): + code = Error.success + + trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, tcUrl=%s, stream=%s, param=%s"%( + req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["tcUrl"], req["stream"], req["param"] + )) + + # dynamic create forward config + forwards = [] + destinations = [] + + # handle param: ?forward=xxxxx&auth_token=xxxxx + # 1.delete ? + req_param = req["param"].replace('?', '', 1) + + # 2.delete 'forward=xxxxx' + new_req_param = "" + params = req_param.split("&") + for param in params: + result = param.split("=") + if result[0].find("forward") != -1: + destinations.append({ + "url": result[1], + }) + elif len(new_req_param) > 0: + new_req_param = new_req_param + "&" + param + else: + new_req_param = param + + # secne: dynamic config + for forward in self.__forwards: + # vhost exist + if hasattr(forward, "vhost"): + if len(forward["vhost"]) > 0 and req["vhost"] != forward["vhost"]: + continue + # app exist + if hasattr(forward, "app"): + if len(forward["app"]) > 0 and req["app"] != forward["app"]: + continue + # app exist + if hasattr(forward, "stream"): + if len(forward["stream"]) > 0 and req["stream"] != forward["stream"]: + continue + # no url + if forward["url"] is None: + continue + + # url maybe spell full rtmp address + url = forward["url"] + if url.find("rtmp://") == -1: + # format: xxx:xxx + # maybe you should use destination config + url = "rtmp://%s/%s"%(url, req['app']) + if len(req['vhost']) > 0 and req['vhost'] != "__defaultVhost__" and url.find(req['vhost']) == -1: + url = url + "?vhost=" + req['vhost'] + url = url + "/" + req['stream'] + if len(new_req_param) > 0: + url = url + "?" + new_req_param + + # append + forwards.append({ + "url": url, + }) + + # secne: parse client params, like: + # format1: rtmp://srs-server/live/stream?forward=aliyuncdn.com:1936&token=xxxxxx + # format2: rtmp://srs-server/live/stream?forward=rtmp://cdn.com/myapp/mystream?XXXXXX + for destination in destinations: + url = destination["url"] + if url.find("rtmp://") == -1: + # format: xxx:xxx + # maybe you should use destination config + url = "rtmp://%s/%s"%(url, req['app']) + if len(req['vhost']) > 0 and req['vhost'] != "__defaultVhost__" and url.find(req['vhost']) == -1: + url = url + "?vhost=" + req['vhost'] + url = url + "/" + req['stream'] + if len(new_req_param) > 0: + url = url + "?" + new_req_param + + # append + forwards.append({ + "url": url, + }) + + return json.dumps({"code": int(code), "data": {"forwards": forwards}}) + # HTTP RESTful path. class Root(object): exposed = True @@ -846,6 +1013,7 @@ def __init__(self): self.chats = RESTChats() self.servers = RESTServers() self.snapshots = RESTSnapshots() + self.forward = RESTForward() def GET(self): enable_crossdomain(); return json.dumps({"code":Error.success, "urls":{ diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index fa813737ceb..e9d376f8acb 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2795,7 +2795,7 @@ srs_error_t SrsConfig::check_normal_config() } else if (n == "forward") { for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; - if (m != "enabled" && m != "destination") { + if (m != "enabled" && m != "destination" && m != "backend") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.forward.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -4576,6 +4576,21 @@ SrsConfDirective* SrsConfig::get_forwards(string vhost) return conf->get("destination"); } +SrsConfDirective* SrsConfig::get_forward_backend(string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return NULL; + } + + conf = conf->get("forward"); + if (!conf) { + return NULL; + } + + return conf->get("backend"); +} + SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index c62351c8726..8a35eb89b98 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -604,6 +604,8 @@ class SrsConfig virtual bool get_forward_enabled(SrsConfDirective* vhost); // Get the forward directive of vhost. virtual SrsConfDirective* get_forwards(std::string vhost); + // Get the forward directive of backend. + virtual SrsConfDirective* get_forward_backend(std::string vhost); public: // Whether the srt sevice enabled diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 06cfd118c00..0ec145f2e93 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -52,6 +52,8 @@ SrsForwarder::~SrsForwarder() srs_freep(sh_video); srs_freep(sh_audio); + + srs_freep(req); } srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep) @@ -60,7 +62,7 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep) // it's ok to use the request object, // SrsLiveSource already copy it and never delete it. - req = r; + req = r->copy(); // the ep(endpoint) to forward to ep_forward = ep; diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 58ff8276d0f..9d8e1099c0c 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -482,6 +482,100 @@ srs_error_t SrsHttpHooks::discover_co_workers(string url, string& host, int& por return err; } +// Request: +// POST /api/v1/forward +// { +// "action": "on_forward", +// "server_id": "vid-k21d7y2", +// "client_id": "9o7g1330", +// "ip": "127.0.0.1", +// "vhost": "__defaultVhost__", +// "app": "live", +// "tcUrl": "rtmp://127.0.0.1:1935/live", +// "stream": "livestream", +// "param": "?forward=rtmp://ossrs.net/live/livestream" +// } +// Response: +// { +// "code": 0, +// "data": { +// "forwards":[{ +// "url": "rtmp://ossrs.net:1935/live/livestream?auth_token=xxx" +// },{ +// "url": "rtmp://aliyuncdn.com:1935/live/livestream?auth_token=xxx" +// }] +// } +// } +srs_error_t SrsHttpHooks::on_forward_backend(string url, SrsRequest* req, std::vector& rtmp_urls) +{ + srs_error_t err = srs_success; + + SrsContextId cid = _srs_context->get_id(); + + SrsStatistic* stat = SrsStatistic::instance(); + + SrsJsonObject* obj = SrsJsonAny::object(); + SrsAutoFree(SrsJsonObject, obj); + + obj->set("action", SrsJsonAny::str("on_forward")); + obj->set("server_id", SrsJsonAny::str(stat->server_id().c_str())); + obj->set("client_id", SrsJsonAny::str(cid.c_str())); + obj->set("ip", SrsJsonAny::str(req->ip.c_str())); + obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); + obj->set("app", SrsJsonAny::str(req->app.c_str())); + obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str())); + obj->set("stream", SrsJsonAny::str(req->stream.c_str())); + obj->set("param", SrsJsonAny::str(req->param.c_str())); + + std::string data = obj->dumps(); + std::string res; + int status_code; + + SrsHttpClient http; + if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { + return srs_error_wrap(err, "http: on_forward_backend failed, client_id=%s, url=%s, request=%s, response=%s, code=%d", + cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); + } + + // parse string res to json. + SrsJsonAny* info = SrsJsonAny::loads(res); + if (!info) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "load json from %s", res.c_str()); + } + SrsAutoFree(SrsJsonAny, info); + + // response error code in string. + if (!info->is_object()) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "response %s", res.c_str()); + } + + SrsJsonAny* prop = NULL; + // response standard object, format in json: {} + SrsJsonObject* res_info = info->to_object(); + if ((prop = res_info->ensure_property_object("data")) == NULL) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse data %s", res.c_str()); + } + + SrsJsonObject* p = prop->to_object(); + if ((prop = p->ensure_property_array("forwards")) == NULL) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse forwards %s", res.c_str()); + } + + SrsJsonArray* forwards = prop->to_array(); + for (int i = 0; i < forwards->count(); i++) { + prop = forwards->at(i); + SrsJsonObject* forward = prop->to_object(); + if ((prop = forward->ensure_property_string("url")) != NULL) { + rtmp_urls.push_back(prop->to_str()); + } + } + + srs_trace("http: on_forward_backend ok, client_id=%s, url=%s, request=%s, response=%s", + cid.c_str(), url.c_str(), data.c_str(), res.c_str()); + + return err; +} + srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, string& res) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_http_hooks.hpp b/trunk/src/app/srs_app_http_hooks.hpp index 547f6c53359..6ab8d893947 100644 --- a/trunk/src/app/srs_app_http_hooks.hpp +++ b/trunk/src/app/srs_app_http_hooks.hpp @@ -10,6 +10,7 @@ #include #include +#include class SrsHttpUri; class SrsStSocket; @@ -79,6 +80,10 @@ class SrsHttpHooks static srs_error_t on_hls_notify(SrsContextId cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify); // Discover co-workers for origin cluster. static srs_error_t discover_co_workers(std::string url, std::string& host, int& port); + // The on_forward_backend hook, when publish stream start to forward + // @param url the api server url, to valid the client. + // ignore if empty. + static srs_error_t on_forward_backend(std::string url, SrsRequest* req, std::vector& rtmp_urls); private: static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res); }; diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index d297f0f4aa2..06e7f877a88 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -34,6 +34,7 @@ using namespace std; #include #include #include +#include #define CONST_MAX_JITTER_MS 250 #define CONST_MAX_JITTER_MS_NEG -250 @@ -1472,8 +1473,54 @@ srs_error_t SrsOriginHub::create_forwarders() if (!_srs_config->get_forward_enabled(req->vhost)) { return err; } - - SrsConfDirective* conf = _srs_config->get_forwards(req->vhost); + + srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); + + SrsConfDirective* conf = _srs_config->get_forward_backend(req->vhost); + if (conf) { + int count = conf->args.size(); + for (int i = 0; i < count; i++) { + std::string backend_url = conf->args.at(i); + + // create forward by backend + std::vector urls; + if ((err = SrsHttpHooks::on_forward_backend(backend_url, req, urls)) != srs_success) { + // ignore + srs_trace("get backend failed, %s", srs_error_desc(err).c_str()); + continue; + } + + std::vector::iterator it; + for (it = urls.begin(); it != urls.end(); ++it) { + std::string url = *it; + + // create forwarder by url + SrsRequest* freq = new SrsRequest(); + srs_parse_rtmp_url(url, freq->tcUrl, freq->stream); + srs_discovery_tc_url(freq->tcUrl, freq->schema, freq->host, freq->vhost, freq->app, freq->stream, freq->port, freq->param); + + SrsForwarder* forwarder = new SrsForwarder(this); + forwarders.push_back(forwarder); + + std::stringstream forward_server; + forward_server << freq->host << ":" << freq->port; + + // initialize the forwarder with request. + if ((err = forwarder->initialize(freq, forward_server.str())) != srs_success) { + return srs_error_wrap(err, "init forwarder"); + } + + forwarder->set_queue_size(queue_size); + + if ((err = forwarder->on_publish()) != srs_success) { + return srs_error_wrap(err, "start backend forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s", + req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), forward_server.str().c_str()); + } + } + } + } + + conf = _srs_config->get_forwards(req->vhost); for (int i = 0; conf && i < (int)conf->args.size(); i++) { std::string forward_server = conf->args.at(i); @@ -1485,7 +1532,6 @@ srs_error_t SrsOriginHub::create_forwarders() return srs_error_wrap(err, "init forwarder"); } - srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); forwarder->set_queue_size(queue_size); if ((err = forwarder->on_publish()) != srs_success) { diff --git a/trunk/src/utest/srs_utest_config.cpp b/trunk/src/utest/srs_utest_config.cpp index 978a2b30d6a..74b987bf408 100644 --- a/trunk/src/utest/srs_utest_config.cpp +++ b/trunk/src/utest/srs_utest_config.cpp @@ -2914,6 +2914,13 @@ VOID TEST(ConfigMainTest, CheckVhostConfig2) EXPECT_EQ(5000000, conf.get_publish_normal_timeout("ossrs.net")); EXPECT_FALSE(conf.get_forward_enabled("ossrs.net")); EXPECT_TRUE(conf.get_forwards("ossrs.net") == NULL); + EXPECT_TRUE(conf.get_forward_backend("ossrs.net") == NULL); + } + + if (true) { + MockSrsConfig conf; + HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF "vhost ossrs.net{forward {backend xxx;}}")); + EXPECT_TRUE(conf.get_forward_backend("ossrs.net") != NULL); } if (true) { @@ -3115,6 +3122,7 @@ VOID TEST(ConfigMainTest, CheckVhostConfig3) EXPECT_EQ(0, (int)conf.get_vhost_coworkers("ossrs.net").size()); EXPECT_FALSE(conf.get_security_enabled("ossrs.net")); EXPECT_TRUE(conf.get_security_rules("ossrs.net") == NULL); + EXPECT_TRUE(conf.get_forward_backend("ossrs.net") == NULL); } if (true) {