Skip to content

Commit

Permalink
Fix #2837: Callback: Support stream_url and stream_id. v5.0.55
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Aug 31, 2022
1 parent 9c6774b commit 6a108fa
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 80 deletions.
18 changes: 12 additions & 6 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,8 @@ vhost hooks.callback.srs.com {
# "action": "on_publish",
# "client_id": "9308h583",
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", "param":"?token=xxx&salt=yyy", "server_id": "vid-werty"
# "stream": "livestream", "param":"?token=xxx&salt=yyy", "server_id": "vid-werty",
# "stream_url": "video.test.com/live/livestream", "stream_id": "vid-124q9y3"
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):
Expand All @@ -1215,7 +1216,8 @@ vhost hooks.callback.srs.com {
# "action": "on_unpublish",
# "client_id": "9308h583",
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", "param":"?token=xxx&salt=yyy", "server_id": "vid-werty"
# "stream": "livestream", "param":"?token=xxx&salt=yyy", "server_id": "vid-werty",
# "stream_url": "video.test.com/live/livestream", "stream_id": "vid-124q9y3"
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):
Expand All @@ -1232,7 +1234,8 @@ vhost hooks.callback.srs.com {
# "client_id": "9308h583",
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", "param":"?token=xxx&salt=yyy",
# "pageUrl": "http://www.test.com/live.html", "server_id": "vid-werty"
# "pageUrl": "http://www.test.com/live.html", "server_id": "vid-werty",
# "stream_url": "video.test.com/live/livestream", "stream_id": "vid-124q9y3"
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):
Expand All @@ -1248,7 +1251,8 @@ vhost hooks.callback.srs.com {
# "action": "on_stop",
# "client_id": "9308h583",
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", "param":"?token=xxx&salt=yyy", "server_id": "vid-werty"
# "stream": "livestream", "param":"?token=xxx&salt=yyy", "server_id": "vid-werty",
# "stream_url": "video.test.com/live/livestream", "stream_id": "vid-124q9y3"
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):
Expand All @@ -1266,7 +1270,8 @@ vhost hooks.callback.srs.com {
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", "param":"?token=xxx&salt=yyy",
# "cwd": "/usr/local/srs",
# "file": "./objs/nginx/html/live/livestream.1420254068776.flv", "server_id": "vid-werty"
# "file": "./objs/nginx/html/live/livestream.1420254068776.flv", "server_id": "vid-werty",
# "stream_url": "video.test.com/live/livestream", "stream_id": "vid-124q9y3"
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):
Expand All @@ -1285,7 +1290,8 @@ vhost hooks.callback.srs.com {
# "url": "live/livestream/2015-04-23/01/476584165.ts",
# "m3u8": "./objs/nginx/html/live/livestream/live.m3u8",
# "m3u8_url": "live/livestream/live.m3u8",
# "seq_no": 100, "server_id": "vid-werty"
# "seq_no": 100, "server_id": "vid-werty",
# "stream_url": "video.test.com/live/livestream", "stream_id": "vid-124q9y3"
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):
Expand Down
1 change: 1 addition & 0 deletions trunk/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The changelog for SRS.

## SRS 5.0 Changelog

* v5.0, 2022-08-30, Fix [#2837](https://github.com/ossrs/srs/issues/2837): Callback: Support stream_url and stream_id. v5.0.55
* v5.0, 2022-08-30, STAT: Refine tcUrl for SRT/RTC. v5.0.54
* v5.0, 2022-08-30, Refactor: Extract SrsNetworkKbps from SrsKbps. v5.0.53
* v5.0, 2022-08-30, Remove bandwidth check because falsh is disabled. v5.0.52
Expand Down
11 changes: 8 additions & 3 deletions trunk/src/app/srs_app_http_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ SrsHttpxConn::SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd,

manager = cm;
skt = new SrsTcpConnection(fd);
enable_stat_ = false;

if (https) {
ssl = new SrsSslConnection(skt);
Expand All @@ -313,6 +314,11 @@ SrsHttpxConn::~SrsHttpxConn()
srs_freep(skt);
}

void SrsHttpxConn::set_enable_stat(bool v)
{
enable_stat_ = v;
}

srs_error_t SrsHttpxConn::pop_message(ISrsHttpMessage** preq)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -399,9 +405,8 @@ srs_error_t SrsHttpxConn::on_message_done(ISrsHttpMessage* r, SrsHttpResponseWri
srs_error_t SrsHttpxConn::on_conn_done(srs_error_t r0)
{
// Only stat the HTTP streaming clients, ignore all API clients.
bool exists = false;
SrsStatistic::instance()->on_disconnect(get_id().c_str(), &exists);
if (exists) {
if (enable_stat_) {
SrsStatistic::instance()->on_disconnect(get_id().c_str());
SrsStatistic::instance()->kbps_add_delta(get_id().c_str(), conn->delta());
}

Expand Down
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_http_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,14 @@ class SrsHttpxConn : public ISrsConnection, public ISrsStartable, public ISrsHtt
SrsTcpConnection* skt;
SrsSslConnection* ssl;
SrsHttpConn* conn;
// We should never enable the stat, unless HTTP stream connection requires.
bool enable_stat_;
public:
SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpxConn();
public:
// Require statistic about HTTP connection, for HTTP streaming clients only.
void set_enable_stat(bool v);
// Directly read a HTTP request message.
// It's exported for HTTP stream, such as HTTP FLV, only need to write to client when
// serving it, but we need to start a thread to read message to detect whether FD is closed.
Expand Down
43 changes: 42 additions & 1 deletion trunk/src/app/srs_app_http_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ srs_error_t SrsHttpHooks::on_publish(string url, SrsRequest* req)

SrsJsonObject* obj = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, obj);

obj->set("server_id", SrsJsonAny::str(stat->server_id().c_str()));
obj->set("action", SrsJsonAny::str("on_publish"));
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
Expand All @@ -137,6 +137,12 @@ srs_error_t SrsHttpHooks::on_publish(string url, SrsRequest* req)
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));

obj->set("stream_url", SrsJsonAny::str(req->get_stream_url().c_str()));
SrsStatisticStream* stream = stat->find_stream_by_url(req->get_stream_url());
if (stream) {
obj->set("stream_id", SrsJsonAny::str(stream->id.c_str()));
}

std::string data = obj->dumps();
std::string res;
Expand Down Expand Up @@ -171,8 +177,15 @@ void SrsHttpHooks::on_unpublish(string url, SrsRequest* req)
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));

obj->set("stream_url", SrsJsonAny::str(req->get_stream_url().c_str()));
SrsStatisticStream* stream = stat->find_stream_by_url(req->get_stream_url());
if (stream) {
obj->set("stream_id", SrsJsonAny::str(stream->id.c_str()));
}

std::string data = obj->dumps();
std::string res;
Expand Down Expand Up @@ -211,8 +224,15 @@ srs_error_t SrsHttpHooks::on_play(string url, SrsRequest* req)
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("pageUrl", SrsJsonAny::str(req->pageUrl.c_str()));

obj->set("stream_url", SrsJsonAny::str(req->get_stream_url().c_str()));
SrsStatisticStream* stream = stat->find_stream_by_url(req->get_stream_url());
if (stream) {
obj->set("stream_id", SrsJsonAny::str(stream->id.c_str()));
}

std::string data = obj->dumps();
std::string res;
Expand Down Expand Up @@ -247,8 +267,15 @@ void SrsHttpHooks::on_stop(string url, SrsRequest* req)
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));

obj->set("stream_url", SrsJsonAny::str(req->get_stream_url().c_str()));
SrsStatisticStream* stream = stat->find_stream_by_url(req->get_stream_url());
if (stream) {
obj->set("stream_id", SrsJsonAny::str(stream->id.c_str()));
}

std::string data = obj->dumps();
std::string res;
Expand Down Expand Up @@ -287,10 +314,17 @@ srs_error_t SrsHttpHooks::on_dvr(SrsContextId c, string url, SrsRequest* req, st
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("cwd", SrsJsonAny::str(cwd.c_str()));
obj->set("file", SrsJsonAny::str(file.c_str()));

obj->set("stream_url", SrsJsonAny::str(req->get_stream_url().c_str()));
SrsStatisticStream* stream = stat->find_stream_by_url(req->get_stream_url());
if (stream) {
obj->set("stream_id", SrsJsonAny::str(stream->id.c_str()));
}

std::string data = obj->dumps();
std::string res;
Expand Down Expand Up @@ -332,6 +366,7 @@ srs_error_t SrsHttpHooks::on_hls(SrsContextId c, string url, SrsRequest* req, st
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("duration", SrsJsonAny::number(srsu2ms(duration)/1000.0));
Expand All @@ -341,6 +376,12 @@ srs_error_t SrsHttpHooks::on_hls(SrsContextId c, string url, SrsRequest* req, st
obj->set("m3u8", SrsJsonAny::str(m3u8.c_str()));
obj->set("m3u8_url", SrsJsonAny::str(m3u8_url.c_str()));
obj->set("seq_no", SrsJsonAny::integer(sn));

obj->set("stream_url", SrsJsonAny::str(req->get_stream_url().c_str()));
SrsStatisticStream* stream = stat->find_stream_by_url(req->get_stream_url());
if (stream) {
obj->set("stream_id", SrsJsonAny::str(stream->id.c_str()));
}

std::string data = obj->dumps();
std::string res;
Expand Down
20 changes: 14 additions & 6 deletions trunk/src/app/srs_app_http_static.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ srs_error_t SrsHlsStream::serve_new_session(ISrsHttpResponseWriter* w, ISrsHttpM
SrsContextRestore(_srs_context->get_id());
_srs_context->set_id(SrsContextId().set_value(ctx));

// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(ctx, req, NULL, SrsHlsPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}

// We must do hook after stat, because depends on it.
if ((err = http_hooks_on_play(req)) != srs_success) {
return srs_error_wrap(err, "HLS: http_hooks_on_play");
}
Expand Down Expand Up @@ -155,12 +162,6 @@ srs_error_t SrsHlsStream::serve_new_session(ISrsHttpResponseWriter* w, ISrsHttpM
return srs_error_wrap(err, "final request");
}

// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(ctx, req, NULL, SrsHlsPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}

return err;
}

Expand Down Expand Up @@ -497,6 +498,13 @@ srs_error_t SrsVodStream::serve_ts_ctx(ISrsHttpResponseWriter * w, ISrsHttpMessa

// SrsServer also stat all HTTP connections including this one, but it should be ignored because the id is not
// matched to any exists client. And we will do stat for the HLS streaming by session in hls_ctx.
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsHttpConn* hc = dynamic_cast<SrsHttpConn*>(hr->connection());
SrsHttpxConn* hxc = dynamic_cast<SrsHttpxConn*>(hc->handler());

// Note that we never enable the stat for the HTTP connection, because we always stat the pseudo HLS streaming
// session identified by hls_ctx, which served by an SrsHlsStream object.
hxc->set_enable_stat(false);

// Serve by default HLS handler.
err = SrsHttpFileServer::serve_ts_ctx(w, r, fullpath);
Expand Down
41 changes: 25 additions & 16 deletions trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,10 +524,28 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
{
srs_error_t err = srs_success;

SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsHttpConn* hc = dynamic_cast<SrsHttpConn*>(hr->connection());
SrsHttpxConn* hxc = dynamic_cast<SrsHttpxConn*>(hc->handler());

// Note that we should enable stat for HTTP streaming client, because each HTTP streaming connection is a real
// session that should have statistics for itself.
hxc->set_enable_stat(true);

// Correct the app and stream by path, which is created from template.
// @remark Be careful that the stream has extension now, might cause identify fail.
req->stream = srs_path_basename(r->path());


// update client ip
req->ip = hc->remote_ip();

// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req, hc, SrsFlvPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}

// We must do hook after stat, because depends on it.
if ((err = http_hooks_on_play(r)) != srs_success) {
return srs_error_wrap(err, "http hook");
}
Expand Down Expand Up @@ -591,15 +609,6 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsHttpConn* hc = dynamic_cast<SrsHttpConn*>(hr->connection());

// update client ip
req->ip = hc->remote_ip();

// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req, hc, SrsFlvPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}

// the memory writer.
SrsBufferWriter writer(w);
if ((err = enc->initialize(&writer, cache)) != srs_success) {
Expand All @@ -616,25 +625,25 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// Try to use fast flv encoder, remember that it maybe NULL.
SrsFlvStreamEncoder* ffe = dynamic_cast<SrsFlvStreamEncoder*>(enc);

// Note that the handler of hc now is rohc.
SrsHttpxConn* rohc = dynamic_cast<SrsHttpxConn*>(hc->handler());
srs_assert(rohc);
// Note that the handler of hc now is hxc.
SrsHttpxConn* hxc = dynamic_cast<SrsHttpxConn*>(hc->handler());
srs_assert(hxc);

// Set the socket options for transport.
bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost);
if (tcp_nodelay) {
if ((err = rohc->set_tcp_nodelay(tcp_nodelay)) != srs_success) {
if ((err = hxc->set_tcp_nodelay(tcp_nodelay)) != srs_success) {
return srs_error_wrap(err, "set tcp nodelay");
}
}

srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost);
if ((err = rohc->set_socket_buffer(mw_sleep)) != srs_success) {
if ((err = hxc->set_socket_buffer(mw_sleep)) != srs_success) {
return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep);
}

// Start a thread to receive all messages from client, then drop them.
SrsHttpRecvThread* trd = new SrsHttpRecvThread(rohc);
SrsHttpRecvThread* trd = new SrsHttpRecvThread(hxc);
SrsAutoFree(SrsHttpRecvThread, trd);

if ((err = trd->start()) != srs_success) {
Expand Down
Loading

0 comments on commit 6a108fa

Please sign in to comment.